VOL-3419: OpenOLT adapter at scale constantly takes more than 10 seconds to react to flows
The patch addresses the following
- Create OpenOltFlowMgr per PON port (instead of one instance for the whole OLT device earlier)
- Create a separate OpenOltGroupMgr - currently one instance for the whole OLT device
- Remove redundant global lock around getting ONU-ID in DeviceHandler module as there exists a
separate per-pon-port lock in ResourceManager module which provides the required synchronization
- Remove redundant locks in OpenOltFlowMgr module to serialize FlowDelete before FlowAdd
- Rename divideAndAddFlow to processAddFlow. "divideAndAddFlow" was used in 1.x voltha days and
had a different meaning; the name seems to have been blindly ported to the 2.x adapter
and does not make sense anymore
Change-Id: I99827963cf242f1db0c27943c97bd05b749ae129
diff --git a/internal/pkg/core/openolt_flowmgr_test.go b/internal/pkg/core/openolt_flowmgr_test.go
index 490a40c..b75bfac 100644
--- a/internal/pkg/core/openolt_flowmgr_test.go
+++ b/internal/pkg/core/openolt_flowmgr_test.go
@@ -42,7 +42,7 @@
tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
)
-var flowMgr *OpenOltFlowMgr
+var flowMgr []*OpenOltFlowMgr
func init() {
_, _ = log.SetDefaultLogger(log.JSON, log.DebugLevel, nil)
@@ -58,8 +58,8 @@
deviceinfo := &openolt.DeviceInfo{Vendor: "openolt", Model: "openolt", HardwareVersion: "1.0", FirmwareVersion: "1.0",
DeviceId: "olt", DeviceSerialNumber: "openolt", PonPorts: 16, Technology: "Default",
- OnuIdStart: 1, OnuIdEnd: 1, AllocIdStart: 1, AllocIdEnd: 1,
- GemportIdStart: 1, GemportIdEnd: 1, FlowIdStart: 1, FlowIdEnd: 1,
+ OnuIdStart: OnuIDStart, OnuIdEnd: OnuIDEnd, AllocIdStart: AllocIDStart, AllocIdEnd: AllocIDEnd,
+ GemportIdStart: GemIDStart, GemportIdEnd: GemIDEnd, FlowIdStart: FlowIDStart, FlowIdEnd: FlowIDEnd,
Ranges: ranges,
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -73,7 +73,7 @@
return rsrMgr
}
-func newMockFlowmgr() *OpenOltFlowMgr {
+func newMockFlowmgr() []*OpenOltFlowMgr {
rMgr := newMockResourceMgr()
dh := newMockDeviceHandler()
@@ -81,40 +81,32 @@
rMgr.KVStore.Client = &mocks.MockKVClient{}
dh.resourceMgr = rMgr
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- flwMgr := NewFlowManager(ctx, dh, rMgr)
- onuGemInfo1 := make([]rsrcMgr.OnuGemInfo, 2)
- onuGemInfo2 := make([]rsrcMgr.OnuGemInfo, 2)
- onuGemInfo1[0] = rsrcMgr.OnuGemInfo{OnuID: 1, SerialNumber: "1", IntfID: 1, GemPorts: []uint32{1}}
- onuGemInfo2[1] = rsrcMgr.OnuGemInfo{OnuID: 2, SerialNumber: "2", IntfID: 2, GemPorts: []uint32{2}}
- flwMgr.onuGemInfo[1] = onuGemInfo1
- flwMgr.onuGemInfo[2] = onuGemInfo2
+ // onuGemInfo := make([]rsrcMgr.OnuGemInfo, NumPonPorts)
+ var i uint32
- packetInGemPort := make(map[rsrcMgr.PacketInInfoKey]uint32)
- packetInGemPort[rsrcMgr.PacketInInfoKey{IntfID: 1, OnuID: 1, LogicalPort: 1}] = 1
- packetInGemPort[rsrcMgr.PacketInInfoKey{IntfID: 2, OnuID: 2, LogicalPort: 2}] = 2
+ for i = 0; i < NumPonPorts; i++ {
+ packetInGemPort := make(map[rsrcMgr.PacketInInfoKey]uint32)
+ packetInGemPort[rsrcMgr.PacketInInfoKey{IntfID: i, OnuID: i + 1, LogicalPort: i + 1, VlanID: uint16(i), Priority: uint8(i)}] = i + 1
- flwMgr.packetInGemPort = packetInGemPort
- tps := make(map[uint32]tp.TechProfileIf)
- for key := range rMgr.ResourceMgrs {
- tps[key] = mocks.MockTechProfile{TpID: key}
+ dh.flowMgr[i].packetInGemPort = packetInGemPort
+ tps := make(map[uint32]tp.TechProfileIf)
+ for key := range rMgr.ResourceMgrs {
+ tps[key] = mocks.MockTechProfile{TpID: key}
+ }
+ dh.flowMgr[i].techprofile = tps
+ interface2mcastQeueuMap := make(map[uint32]*QueueInfoBrief)
+ interface2mcastQeueuMap[0] = &QueueInfoBrief{
+ gemPortID: 4000,
+ servicePriority: 3,
+ }
+ dh.flowMgr[i].grpMgr.interfaceToMcastQueueMap = interface2mcastQeueuMap
}
- flwMgr.techprofile = tps
- interface2mcastQeueuMap := make(map[uint32]*queueInfoBrief)
- interface2mcastQeueuMap[0] = &queueInfoBrief{
- gemPortID: 4000,
- servicePriority: 3,
- }
- flwMgr.interfaceToMcastQueueMap = interface2mcastQeueuMap
- return flwMgr
+ return dh.flowMgr
}
func TestOpenOltFlowMgr_CreateSchedulerQueues(t *testing.T) {
- // flowMgr := newMockFlowmgr()
-
tprofile := &tp.TechProfile{Name: "tp1", SubscriberIdentifier: "subscriber1",
ProfileType: "pt1", NumGemPorts: 1, Version: 1,
InstanceCtrl: tp.InstanceControl{Onu: "1", Uni: "1", MaxGemPayloadSize: "1"},
@@ -140,26 +132,26 @@
wantErr bool
}{
// TODO: Add test cases.
- {"CreateSchedulerQueues-1", schedQueue{tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, tprofile, 1, flowmetadata}, false},
- {"CreateSchedulerQueues-2", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 1, flowmetadata}, false},
- {"CreateSchedulerQueues-3", schedQueue{tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, tprofile, 2, flowmetadata}, true},
- {"CreateSchedulerQueues-4", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 2, flowmetadata}, true},
- {"CreateSchedulerQueues-5", schedQueue{tp_pb.Direction_UPSTREAM, 2, 2, 2, 64, 2, tprofile, 2, flowmetadata}, true},
- {"CreateSchedulerQueues-6", schedQueue{tp_pb.Direction_DOWNSTREAM, 2, 2, 2, 65, 2, tprofile2, 2, flowmetadata}, true},
- {"CreateSchedulerQueues-13", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 1, flowmetadata}, false},
+ {"CreateSchedulerQueues-1", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 1, flowmetadata}, false},
+ {"CreateSchedulerQueues-2", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, flowmetadata}, false},
+ {"CreateSchedulerQueues-3", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 2, flowmetadata}, true},
+ {"CreateSchedulerQueues-4", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 2, flowmetadata}, true},
+ {"CreateSchedulerQueues-5", schedQueue{tp_pb.Direction_UPSTREAM, 1, 2, 2, 64, 2, tprofile, 2, flowmetadata}, true},
+ {"CreateSchedulerQueues-6", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 2, 2, 65, 2, tprofile2, 2, flowmetadata}, true},
+ {"CreateSchedulerQueues-13", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, flowmetadata}, false},
//Negative testcases
- {"CreateSchedulerQueues-7", schedQueue{tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, tprofile, 1, &voltha.FlowMetadata{}}, true},
- {"CreateSchedulerQueues-8", schedQueue{tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, tprofile, 0, &voltha.FlowMetadata{}}, true},
- {"CreateSchedulerQueues-9", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 1, &voltha.FlowMetadata{}}, true},
- {"CreateSchedulerQueues-10", schedQueue{tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, tprofile, 2, &voltha.FlowMetadata{}}, true},
- {"CreateSchedulerQueues-11", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 2, &voltha.FlowMetadata{}}, true},
- {"CreateSchedulerQueues-12", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 2, nil}, true},
+ {"CreateSchedulerQueues-7", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 1, &voltha.FlowMetadata{}}, true},
+ {"CreateSchedulerQueues-8", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 0, &voltha.FlowMetadata{}}, true},
+ {"CreateSchedulerQueues-9", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, &voltha.FlowMetadata{}}, true},
+ {"CreateSchedulerQueues-10", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 2, &voltha.FlowMetadata{}}, true},
+ {"CreateSchedulerQueues-11", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 2, &voltha.FlowMetadata{}}, true},
+ {"CreateSchedulerQueues-12", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 2, nil}, true},
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if err := flowMgr.CreateSchedulerQueues(ctx, tt.schedQueue); (err != nil) != tt.wantErr {
+ if err := flowMgr[tt.schedQueue.intfID].CreateSchedulerQueues(ctx, tt.schedQueue); (err != nil) != tt.wantErr {
t.Errorf("OpenOltFlowMgr.CreateSchedulerQueues() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -167,8 +159,6 @@
}
func TestOpenOltFlowMgr_RemoveSchedulerQueues(t *testing.T) {
-
- // flowMgr := newMockFlowmgr()
tprofile := &tp.TechProfile{Name: "tp1", SubscriberIdentifier: "subscriber1",
ProfileType: "pt1", NumGemPorts: 1, Version: 1,
InstanceCtrl: tp.InstanceControl{Onu: "1", Uni: "1", MaxGemPayloadSize: "1"},
@@ -198,7 +188,7 @@
defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if err := flowMgr.RemoveSchedulerQueues(ctx, tt.schedQueue); (err != nil) != tt.wantErr {
+ if err := flowMgr[tt.schedQueue.intfID].RemoveSchedulerQueues(ctx, tt.schedQueue); (err != nil) != tt.wantErr {
t.Errorf("OpenOltFlowMgr.RemoveSchedulerQueues() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -207,7 +197,6 @@
}
func TestOpenOltFlowMgr_createTcontGemports(t *testing.T) {
- // flowMgr := newMockFlowmgr()
bands := make([]*ofp.OfpMeterBandHeader, 2)
bands[0] = &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 1000, BurstSize: 5000, Data: &ofp.OfpMeterBandHeader_Drop{}}
bands[1] = &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 2000, BurstSize: 5000, Data: &ofp.OfpMeterBandHeader_Drop{}}
@@ -237,7 +226,7 @@
defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- _, _, tpInst := flowMgr.createTcontGemports(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID, tt.args.uni, tt.args.uniPort, tt.args.TpID, tt.args.UsMeterID, tt.args.DsMeterID, tt.args.flowMetadata)
+ _, _, tpInst := flowMgr[tt.args.intfID].createTcontGemports(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID, tt.args.uni, tt.args.uniPort, tt.args.TpID, tt.args.UsMeterID, tt.args.DsMeterID, tt.args.flowMetadata)
switch tpInst := tpInst.(type) {
case *tp.TechProfile:
if tt.args.TpID != 64 {
@@ -256,7 +245,6 @@
func TestOpenOltFlowMgr_RemoveFlow(t *testing.T) {
ctx := context.Background()
- // flowMgr := newMockFlowmgr()
logger.Debug(ctx, "Info Warning Error: Starting RemoveFlow() test")
fa := &fu.FlowArgs{
MatchFields: []*ofp.OfpOxmOfbField{
@@ -334,7 +322,7 @@
defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if err := flowMgr.RemoveFlow(ctx, tt.args.flow); err != nil {
+ if err := flowMgr[0].RemoveFlow(ctx, tt.args.flow); err != nil {
logger.Warn(ctx, err)
}
})
@@ -343,7 +331,6 @@
}
func TestOpenOltFlowMgr_AddFlow(t *testing.T) {
- // flowMgr := newMockFlowmgr()
kw := make(map[string]uint64)
kw["table_id"] = 1
kw["meter_id"] = 1
@@ -581,29 +568,27 @@
defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- _ = flowMgr.AddFlow(ctx, tt.args.flow, tt.args.flowMetadata)
+ _ = flowMgr[0].AddFlow(ctx, tt.args.flow, tt.args.flowMetadata)
// TODO: actually verify test cases
})
}
}
func TestOpenOltFlowMgr_UpdateOnuInfo(t *testing.T) {
- flwMgr := newMockFlowmgr()
-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
wg := sync.WaitGroup{}
- intfCount := 16
- onuCount := 32
+ intfCount := NumPonPorts
+ onuCount := OnuIDEnd - OnuIDStart + 1
for i := 0; i < intfCount; i++ {
- for j := 0; j < onuCount; j++ {
+ for j := 1; j <= onuCount; j++ {
wg.Add(1)
go func(i uint32, j uint32) {
// TODO: actually verify success
- _ = flwMgr.UpdateOnuInfo(ctx, i, i, fmt.Sprintf("onu-%d", i))
+ _ = flowMgr[i].UpdateOnuInfo(ctx, i, i, fmt.Sprintf("onu-%d", i))
wg.Done()
}(uint32(i), uint32(j))
}
@@ -614,34 +599,35 @@
}
func TestOpenOltFlowMgr_addGemPortToOnuInfoMap(t *testing.T) {
- flowMgr = newMockFlowmgr()
- intfNum := 16
- onuNum := 32
+ intfNum := NumPonPorts
+ onuNum := OnuIDEnd - OnuIDStart + 1
// clean the flowMgr
- flowMgr.onuGemInfo = make(map[uint32][]rsrcMgr.OnuGemInfo, intfNum)
+ for i := 0; i < intfNum; i++ {
+ flowMgr[i].onuGemInfo = make(map[uint32][]rsrcMgr.OnuGemInfo, intfNum)
+ }
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Create OnuInfo
for i := 0; i < intfNum; i++ {
- for o := 0; o < onuNum; o++ {
+ for o := 1; o <= onuNum; o++ {
// TODO: actually verify success
- _ = flowMgr.UpdateOnuInfo(ctx, uint32(i), uint32(o), fmt.Sprintf("i%do%d", i, o))
+ _ = flowMgr[i].UpdateOnuInfo(ctx, uint32(i), uint32(o), fmt.Sprintf("i%do%d", i, o-1))
}
}
// Add gemPorts to OnuInfo in parallel threads
wg := sync.WaitGroup{}
- for o := 0; o < onuNum; o++ {
+ for o := 1; o <= onuNum; o++ {
for i := 0; i < intfNum; i++ {
wg.Add(1)
go func(intfId uint32, onuId uint32) {
- gemID, _ := strconv.Atoi(fmt.Sprintf("90%d%d", intfId, onuId))
+ gemID, _ := strconv.Atoi(fmt.Sprintf("90%d%d", intfId, onuId-1))
- flowMgr.addGemPortToOnuInfoMap(ctx, intfId, onuId, uint32(gemID))
+ flowMgr[intfId].addGemPortToOnuInfoMap(ctx, intfId, onuId, uint32(gemID))
wg.Done()
}(uint32(i), uint32(o))
}
@@ -651,21 +637,21 @@
// check that each entry of onuGemInfo has the correct number of ONUs
for i := 0; i < intfNum; i++ {
- lenofOnu := len(flowMgr.onuGemInfo[uint32(i)])
+ lenofOnu := len(flowMgr[i].onuGemInfo[uint32(i)])
if onuNum != lenofOnu {
t.Errorf("OnuGemInfo length is not as expected len = %d, want %d", lenofOnu, onuNum)
}
- for o := 0; o < onuNum; o++ {
- lenOfGemPorts := len(flowMgr.onuGemInfo[uint32(i)][o].GemPorts)
+ for o := 1; o <= onuNum; o++ {
+ lenOfGemPorts := len(flowMgr[i].onuGemInfo[uint32(i)][o-1].GemPorts)
// check that each onuEntry has 1 gemPort
if lenOfGemPorts != 1 {
t.Errorf("Expected 1 GemPort per ONU, found %d", lenOfGemPorts)
}
// check that the value of the gemport is correct
- gemID, _ := strconv.Atoi(fmt.Sprintf("90%d%d", i, o))
- currentValue := flowMgr.onuGemInfo[uint32(i)][o].GemPorts[0]
+ gemID, _ := strconv.Atoi(fmt.Sprintf("90%d%d", i, o-1))
+ currentValue := flowMgr[i].onuGemInfo[uint32(i)][o-1].GemPorts[0]
if uint32(gemID) != currentValue {
t.Errorf("Expected GemPort value to be %d, found %d", gemID, currentValue)
}
@@ -674,7 +660,8 @@
}
func TestOpenOltFlowMgr_deleteGemPortFromLocalCache(t *testing.T) {
- flwMgr := newMockFlowmgr()
+ // Create fresh flowMgr instance
+ flowMgr = newMockFlowmgr()
type args struct {
intfID uint32
onuID uint32
@@ -704,18 +691,18 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// TODO: should check returned errors are as expected?
- _ = flwMgr.UpdateOnuInfo(ctx, tt.args.intfID, tt.args.onuID, tt.args.serialNum)
+ _ = flowMgr[tt.args.intfID].UpdateOnuInfo(ctx, tt.args.intfID, tt.args.onuID, tt.args.serialNum)
for _, gemPort := range tt.args.gemPortIDs {
- flwMgr.addGemPortToOnuInfoMap(ctx, tt.args.intfID, tt.args.onuID, gemPort)
+ flowMgr[tt.args.intfID].addGemPortToOnuInfoMap(ctx, tt.args.intfID, tt.args.onuID, gemPort)
}
for _, gemPortDeleted := range tt.args.gemPortIDsToBeDeleted {
- flwMgr.deleteGemPortFromLocalCache(ctx, tt.args.intfID, tt.args.onuID, gemPortDeleted)
+ flowMgr[tt.args.intfID].deleteGemPortFromLocalCache(ctx, tt.args.intfID, tt.args.onuID, gemPortDeleted)
}
- lenofGemPorts := len(flwMgr.onuGemInfo[tt.args.intfID][0].GemPorts)
+ lenofGemPorts := len(flowMgr[tt.args.intfID].onuGemInfo[tt.args.intfID][0].GemPorts)
if lenofGemPorts != tt.args.finalLength {
t.Errorf("GemPorts length is not as expected len = %d, want %d", lenofGemPorts, tt.args.finalLength)
}
- gemPorts := flwMgr.onuGemInfo[tt.args.intfID][0].GemPorts
+ gemPorts := flowMgr[tt.args.intfID].onuGemInfo[tt.args.intfID][0].GemPorts
if !reflect.DeepEqual(tt.args.gemPortIDsRemaining, gemPorts) {
t.Errorf("GemPorts are not as expected = %v, want %v", gemPorts, tt.args.gemPortIDsRemaining)
}
@@ -725,7 +712,6 @@
}
func TestOpenOltFlowMgr_GetLogicalPortFromPacketIn(t *testing.T) {
- flwMgr := newMockFlowmgr()
type args struct {
packetIn *openoltpb2.PacketIndication
}
@@ -736,18 +722,18 @@
wantErr bool
}{
// TODO: Add test cases.
- {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 1, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1, false},
- {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "nni", IntfId: 1, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1048577, false},
+ {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 0, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1, false},
+ {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "nni", IntfId: 0, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1048576, false},
// Negative Test cases.
- {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 2, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 0, true},
- {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 1, GemportId: 1, FlowId: 100, PortNo: 0, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 4112, false},
+ {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 1, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 0, true},
+ {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 0, GemportId: 1, FlowId: 100, PortNo: 0, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 16, false},
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- got, err := flwMgr.GetLogicalPortFromPacketIn(ctx, tt.args.packetIn)
+ got, err := flowMgr[tt.args.packetIn.IntfId].GetLogicalPortFromPacketIn(ctx, tt.args.packetIn)
if (err != nil) != tt.wantErr {
t.Errorf("OpenOltFlowMgr.GetLogicalPortFromPacketIn() error = %v, wantErr %v", err, tt.wantErr)
return
@@ -760,7 +746,8 @@
}
func TestOpenOltFlowMgr_GetPacketOutGemPortID(t *testing.T) {
- // flwMgr := newMockFlowmgr()
+ // Create fresh flowMgr instance
+ flowMgr = newMockFlowmgr()
//untagged packet in hex string
untaggedStr := "01005e000002000000000001080046c00020000040000102fa140a000001e00000029404000017000705e10000fa"
@@ -769,15 +756,15 @@
t.Error("Unable to parse hex string", err)
panic(err)
}
- //single-tagged packet in hex string. vlanID.pbit: 540.0
- singleTaggedStr := "01005e0000010025ba48172481000225080046c0002000004000010257deab140023e0000001940400001164ee9b0000000000000000000000000000"
+ //single-tagged packet in hex string. vlanID.pbit: 1.1
+ singleTaggedStr := "01005e0000010025ba48172481002001080046c0002000004000010257deab140023e0000001940400001164ee9b0000000000000000000000000000"
singleTagged, err := hex.DecodeString(singleTaggedStr)
if err != nil {
t.Error("Unable to parse hex string", err)
panic(err)
}
- //double-tagged packet in hex string. vlanID.pbit: 210.0-48.7
- doubleTaggedStr := "01005e000016deadbeefba11810002108100e030080046000028000000000102c5b87f000001e0000016940400002200f8030000000104000000e10000fa"
+ //double-tagged packet in hex string. vlanID.pbit: 210.0-0.0
+ doubleTaggedStr := "01005e000016deadbeefba118100021081000000080046000028000000000102c5b87f000001e0000016940400002200f8030000000104000000e10000fa"
doubleTagged, err := hex.DecodeString(doubleTaggedStr)
if err != nil {
t.Error("Unable to parse hex string", err)
@@ -797,11 +784,11 @@
wantErr bool
}{
// TODO: Add test cases.
- {"GetPacketOutGemPortID", args{intfID: 1, onuID: 1, portNum: 3, packet: untagged}, 3, false},
- {"GetPacketOutGemPortID", args{intfID: 2, onuID: 2, portNum: 4, packet: singleTagged}, 4, false},
- {"GetPacketOutGemPortID", args{intfID: 1, onuID: 2, portNum: 2, packet: doubleTagged}, 2, false},
- {"GetPacketOutGemPortID", args{intfID: 1, onuID: 10, portNum: 10, packet: untagged}, 2, true},
- {"GetPacketOutGemPortID", args{intfID: 1, onuID: 1, portNum: 3, packet: []byte{}}, 3, true},
+ {"GetPacketOutGemPortID", args{intfID: 0, onuID: 1, portNum: 1, packet: untagged}, 1, false},
+ {"GetPacketOutGemPortID", args{intfID: 1, onuID: 2, portNum: 2, packet: singleTagged}, 2, false},
+ {"GetPacketOutGemPortID", args{intfID: 0, onuID: 1, portNum: 1, packet: doubleTagged}, 1, false},
+ {"GetPacketOutGemPortID", args{intfID: 0, onuID: 10, portNum: 10, packet: untagged}, 2, true},
+ {"GetPacketOutGemPortID", args{intfID: 0, onuID: 1, portNum: 3, packet: []byte{}}, 3, true},
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -809,7 +796,7 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- got, err := flowMgr.GetPacketOutGemPortID(ctx, tt.args.intfID, tt.args.onuID, tt.args.portNum, tt.args.packet)
+ got, err := flowMgr[tt.args.intfID].GetPacketOutGemPortID(ctx, tt.args.intfID, tt.args.onuID, tt.args.portNum, tt.args.packet)
if tt.wantErr {
if err == nil {
//error expected but got value
@@ -830,7 +817,6 @@
}
func TestOpenOltFlowMgr_DeleteTechProfileInstance(t *testing.T) {
- // flwMgr := newMockFlowmgr()
type args struct {
intfID uint32
onuID uint32
@@ -850,7 +836,7 @@
defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if err := flowMgr.DeleteTechProfileInstance(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID, tt.args.sn, tt.args.tpID); (err != nil) != tt.wantErr {
+ if err := flowMgr[tt.args.intfID].DeleteTechProfileInstance(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID, tt.args.sn, tt.args.tpID); (err != nil) != tt.wantErr {
t.Errorf("OpenOltFlowMgr.DeleteTechProfileInstance() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -859,7 +845,6 @@
func TestOpenOltFlowMgr_checkAndAddFlow(t *testing.T) {
ctx := context.Background()
- // flowMgr := newMockFlowmgr()
kw := make(map[string]uint64)
kw["table_id"] = 1
kw["meter_id"] = 1
@@ -1097,7 +1082,7 @@
defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- flowMgr.checkAndAddFlow(ctx, tt.args.args, tt.args.classifierInfo, tt.args.actionInfo, tt.args.flow,
+ flowMgr[tt.args.intfID].checkAndAddFlow(ctx, tt.args.args, tt.args.classifierInfo, tt.args.actionInfo, tt.args.flow,
tt.args.TpInst, tt.args.gemPorts, tt.args.TpID, tt.args.uni)
})
}
@@ -1108,7 +1093,7 @@
defer cancel()
//create group
group := newGroup(2, []uint32{1})
- err := flowMgr.AddGroup(ctx, group)
+ err := flowMgr[0].grpMgr.AddGroup(ctx, group)
if err != nil {
t.Error("group-add failed", err)
return
@@ -1128,7 +1113,7 @@
}
ofpStats, _ := fu.MkFlowStat(multicastFlowArgs)
fmt.Println(ofpStats.Id)
- err = flowMgr.AddFlow(ctx, ofpStats, &voltha.FlowMetadata{})
+ err = flowMgr[0].AddFlow(ctx, ofpStats, &voltha.FlowMetadata{})
if err != nil {
t.Error("Multicast flow-add failed", err)
return
@@ -1136,20 +1121,20 @@
//add bucket to the group
group = newGroup(2, []uint32{1, 2})
- err = flowMgr.ModifyGroup(ctx, group)
+ err = flowMgr[0].grpMgr.ModifyGroup(ctx, group)
if err != nil {
t.Error("modify-group failed", err)
return
}
//remove the multicast flow
- err = flowMgr.RemoveFlow(ctx, ofpStats)
+ err = flowMgr[0].RemoveFlow(ctx, ofpStats)
if err != nil {
t.Error("Multicast flow-remove failed", err)
return
}
//remove the group
- err = flowMgr.DeleteGroup(ctx, group)
+ err = flowMgr[0].grpMgr.DeleteGroup(ctx, group)
if err != nil {
t.Error("delete-group failed", err)
return