VOL-3419: OpenOLT adapter at scale constantly takes more that 10 seconds to react to flows
The patch addresses the following
- Create OpenOltFlowMgr per PON port (instead of one instance for the whole OLT device earlier)
- Create a separate OpenOltGroupMgr - currently one instance for the whole OLT device
- Remove redundant global lock around getting ONU-ID in DeviceHandler module as there exists a
  separate per-pon-port lock in ResourceManager module which suffices the required synchronization
- Remove redundant locks in OpenOltFlowMgr module to serialize FlowDelete before FlowAdd
- Rename divideAndAddFlow to processAddFlow. "divideAndAddFlow" was used in 1.x voltha days and
  had a different meaning and the name seems to have been blindly ported to 2.x adapter
  and does not make sense anymore

Change-Id: I99827963cf242f1db0c27943c97bd05b749ae129
diff --git a/internal/pkg/core/openolt_flowmgr_test.go b/internal/pkg/core/openolt_flowmgr_test.go
index 490a40c..b75bfac 100644
--- a/internal/pkg/core/openolt_flowmgr_test.go
+++ b/internal/pkg/core/openolt_flowmgr_test.go
@@ -42,7 +42,7 @@
 	tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
 )
 
-var flowMgr *OpenOltFlowMgr
+var flowMgr []*OpenOltFlowMgr
 
 func init() {
 	_, _ = log.SetDefaultLogger(log.JSON, log.DebugLevel, nil)
@@ -58,8 +58,8 @@
 
 	deviceinfo := &openolt.DeviceInfo{Vendor: "openolt", Model: "openolt", HardwareVersion: "1.0", FirmwareVersion: "1.0",
 		DeviceId: "olt", DeviceSerialNumber: "openolt", PonPorts: 16, Technology: "Default",
-		OnuIdStart: 1, OnuIdEnd: 1, AllocIdStart: 1, AllocIdEnd: 1,
-		GemportIdStart: 1, GemportIdEnd: 1, FlowIdStart: 1, FlowIdEnd: 1,
+		OnuIdStart: OnuIDStart, OnuIdEnd: OnuIDEnd, AllocIdStart: AllocIDStart, AllocIdEnd: AllocIDEnd,
+		GemportIdStart: GemIDStart, GemportIdEnd: GemIDEnd, FlowIdStart: FlowIDStart, FlowIdEnd: FlowIDEnd,
 		Ranges: ranges,
 	}
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -73,7 +73,7 @@
 	return rsrMgr
 }
 
-func newMockFlowmgr() *OpenOltFlowMgr {
+func newMockFlowmgr() []*OpenOltFlowMgr {
 	rMgr := newMockResourceMgr()
 	dh := newMockDeviceHandler()
 
@@ -81,40 +81,32 @@
 	rMgr.KVStore.Client = &mocks.MockKVClient{}
 
 	dh.resourceMgr = rMgr
-	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-	defer cancel()
-	flwMgr := NewFlowManager(ctx, dh, rMgr)
 
-	onuGemInfo1 := make([]rsrcMgr.OnuGemInfo, 2)
-	onuGemInfo2 := make([]rsrcMgr.OnuGemInfo, 2)
-	onuGemInfo1[0] = rsrcMgr.OnuGemInfo{OnuID: 1, SerialNumber: "1", IntfID: 1, GemPorts: []uint32{1}}
-	onuGemInfo2[1] = rsrcMgr.OnuGemInfo{OnuID: 2, SerialNumber: "2", IntfID: 2, GemPorts: []uint32{2}}
-	flwMgr.onuGemInfo[1] = onuGemInfo1
-	flwMgr.onuGemInfo[2] = onuGemInfo2
+	// onuGemInfo := make([]rsrcMgr.OnuGemInfo, NumPonPorts)
+	var i uint32
 
-	packetInGemPort := make(map[rsrcMgr.PacketInInfoKey]uint32)
-	packetInGemPort[rsrcMgr.PacketInInfoKey{IntfID: 1, OnuID: 1, LogicalPort: 1}] = 1
-	packetInGemPort[rsrcMgr.PacketInInfoKey{IntfID: 2, OnuID: 2, LogicalPort: 2}] = 2
+	for i = 0; i < NumPonPorts; i++ {
+		packetInGemPort := make(map[rsrcMgr.PacketInInfoKey]uint32)
+		packetInGemPort[rsrcMgr.PacketInInfoKey{IntfID: i, OnuID: i + 1, LogicalPort: i + 1, VlanID: uint16(i), Priority: uint8(i)}] = i + 1
 
-	flwMgr.packetInGemPort = packetInGemPort
-	tps := make(map[uint32]tp.TechProfileIf)
-	for key := range rMgr.ResourceMgrs {
-		tps[key] = mocks.MockTechProfile{TpID: key}
+		dh.flowMgr[i].packetInGemPort = packetInGemPort
+		tps := make(map[uint32]tp.TechProfileIf)
+		for key := range rMgr.ResourceMgrs {
+			tps[key] = mocks.MockTechProfile{TpID: key}
+		}
+		dh.flowMgr[i].techprofile = tps
+		interface2mcastQeueuMap := make(map[uint32]*QueueInfoBrief)
+		interface2mcastQeueuMap[0] = &QueueInfoBrief{
+			gemPortID:       4000,
+			servicePriority: 3,
+		}
+		dh.flowMgr[i].grpMgr.interfaceToMcastQueueMap = interface2mcastQeueuMap
 	}
-	flwMgr.techprofile = tps
 
-	interface2mcastQeueuMap := make(map[uint32]*queueInfoBrief)
-	interface2mcastQeueuMap[0] = &queueInfoBrief{
-		gemPortID:       4000,
-		servicePriority: 3,
-	}
-	flwMgr.interfaceToMcastQueueMap = interface2mcastQeueuMap
-	return flwMgr
+	return dh.flowMgr
 }
 
 func TestOpenOltFlowMgr_CreateSchedulerQueues(t *testing.T) {
-	// flowMgr := newMockFlowmgr()
-
 	tprofile := &tp.TechProfile{Name: "tp1", SubscriberIdentifier: "subscriber1",
 		ProfileType: "pt1", NumGemPorts: 1, Version: 1,
 		InstanceCtrl: tp.InstanceControl{Onu: "1", Uni: "1", MaxGemPayloadSize: "1"},
@@ -140,26 +132,26 @@
 		wantErr    bool
 	}{
 		// TODO: Add test cases.
-		{"CreateSchedulerQueues-1", schedQueue{tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, tprofile, 1, flowmetadata}, false},
-		{"CreateSchedulerQueues-2", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 1, flowmetadata}, false},
-		{"CreateSchedulerQueues-3", schedQueue{tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, tprofile, 2, flowmetadata}, true},
-		{"CreateSchedulerQueues-4", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 2, flowmetadata}, true},
-		{"CreateSchedulerQueues-5", schedQueue{tp_pb.Direction_UPSTREAM, 2, 2, 2, 64, 2, tprofile, 2, flowmetadata}, true},
-		{"CreateSchedulerQueues-6", schedQueue{tp_pb.Direction_DOWNSTREAM, 2, 2, 2, 65, 2, tprofile2, 2, flowmetadata}, true},
-		{"CreateSchedulerQueues-13", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 1, flowmetadata}, false},
+		{"CreateSchedulerQueues-1", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 1, flowmetadata}, false},
+		{"CreateSchedulerQueues-2", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, flowmetadata}, false},
+		{"CreateSchedulerQueues-3", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 2, flowmetadata}, true},
+		{"CreateSchedulerQueues-4", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 2, flowmetadata}, true},
+		{"CreateSchedulerQueues-5", schedQueue{tp_pb.Direction_UPSTREAM, 1, 2, 2, 64, 2, tprofile, 2, flowmetadata}, true},
+		{"CreateSchedulerQueues-6", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 2, 2, 65, 2, tprofile2, 2, flowmetadata}, true},
+		{"CreateSchedulerQueues-13", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, flowmetadata}, false},
 		//Negative testcases
-		{"CreateSchedulerQueues-7", schedQueue{tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, tprofile, 1, &voltha.FlowMetadata{}}, true},
-		{"CreateSchedulerQueues-8", schedQueue{tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, tprofile, 0, &voltha.FlowMetadata{}}, true},
-		{"CreateSchedulerQueues-9", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 1, &voltha.FlowMetadata{}}, true},
-		{"CreateSchedulerQueues-10", schedQueue{tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, tprofile, 2, &voltha.FlowMetadata{}}, true},
-		{"CreateSchedulerQueues-11", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 2, &voltha.FlowMetadata{}}, true},
-		{"CreateSchedulerQueues-12", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 2, nil}, true},
+		{"CreateSchedulerQueues-7", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 1, &voltha.FlowMetadata{}}, true},
+		{"CreateSchedulerQueues-8", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 0, &voltha.FlowMetadata{}}, true},
+		{"CreateSchedulerQueues-9", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, &voltha.FlowMetadata{}}, true},
+		{"CreateSchedulerQueues-10", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 2, &voltha.FlowMetadata{}}, true},
+		{"CreateSchedulerQueues-11", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 2, &voltha.FlowMetadata{}}, true},
+		{"CreateSchedulerQueues-12", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 2, nil}, true},
 	}
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			if err := flowMgr.CreateSchedulerQueues(ctx, tt.schedQueue); (err != nil) != tt.wantErr {
+			if err := flowMgr[tt.schedQueue.intfID].CreateSchedulerQueues(ctx, tt.schedQueue); (err != nil) != tt.wantErr {
 				t.Errorf("OpenOltFlowMgr.CreateSchedulerQueues() error = %v, wantErr %v", err, tt.wantErr)
 			}
 		})
@@ -167,8 +159,6 @@
 }
 
 func TestOpenOltFlowMgr_RemoveSchedulerQueues(t *testing.T) {
-
-	// flowMgr := newMockFlowmgr()
 	tprofile := &tp.TechProfile{Name: "tp1", SubscriberIdentifier: "subscriber1",
 		ProfileType: "pt1", NumGemPorts: 1, Version: 1,
 		InstanceCtrl: tp.InstanceControl{Onu: "1", Uni: "1", MaxGemPayloadSize: "1"},
@@ -198,7 +188,7 @@
 	defer cancel()
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			if err := flowMgr.RemoveSchedulerQueues(ctx, tt.schedQueue); (err != nil) != tt.wantErr {
+			if err := flowMgr[tt.schedQueue.intfID].RemoveSchedulerQueues(ctx, tt.schedQueue); (err != nil) != tt.wantErr {
 				t.Errorf("OpenOltFlowMgr.RemoveSchedulerQueues() error = %v, wantErr %v", err, tt.wantErr)
 			}
 		})
@@ -207,7 +197,6 @@
 }
 
 func TestOpenOltFlowMgr_createTcontGemports(t *testing.T) {
-	// flowMgr := newMockFlowmgr()
 	bands := make([]*ofp.OfpMeterBandHeader, 2)
 	bands[0] = &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 1000, BurstSize: 5000, Data: &ofp.OfpMeterBandHeader_Drop{}}
 	bands[1] = &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 2000, BurstSize: 5000, Data: &ofp.OfpMeterBandHeader_Drop{}}
@@ -237,7 +226,7 @@
 	defer cancel()
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			_, _, tpInst := flowMgr.createTcontGemports(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID, tt.args.uni, tt.args.uniPort, tt.args.TpID, tt.args.UsMeterID, tt.args.DsMeterID, tt.args.flowMetadata)
+			_, _, tpInst := flowMgr[tt.args.intfID].createTcontGemports(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID, tt.args.uni, tt.args.uniPort, tt.args.TpID, tt.args.UsMeterID, tt.args.DsMeterID, tt.args.flowMetadata)
 			switch tpInst := tpInst.(type) {
 			case *tp.TechProfile:
 				if tt.args.TpID != 64 {
@@ -256,7 +245,6 @@
 
 func TestOpenOltFlowMgr_RemoveFlow(t *testing.T) {
 	ctx := context.Background()
-	// flowMgr := newMockFlowmgr()
 	logger.Debug(ctx, "Info Warning Error: Starting RemoveFlow() test")
 	fa := &fu.FlowArgs{
 		MatchFields: []*ofp.OfpOxmOfbField{
@@ -334,7 +322,7 @@
 	defer cancel()
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			if err := flowMgr.RemoveFlow(ctx, tt.args.flow); err != nil {
+			if err := flowMgr[0].RemoveFlow(ctx, tt.args.flow); err != nil {
 				logger.Warn(ctx, err)
 			}
 		})
@@ -343,7 +331,6 @@
 }
 
 func TestOpenOltFlowMgr_AddFlow(t *testing.T) {
-	// flowMgr := newMockFlowmgr()
 	kw := make(map[string]uint64)
 	kw["table_id"] = 1
 	kw["meter_id"] = 1
@@ -581,29 +568,27 @@
 	defer cancel()
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			_ = flowMgr.AddFlow(ctx, tt.args.flow, tt.args.flowMetadata)
+			_ = flowMgr[0].AddFlow(ctx, tt.args.flow, tt.args.flowMetadata)
 			// TODO: actually verify test cases
 		})
 	}
 }
 
 func TestOpenOltFlowMgr_UpdateOnuInfo(t *testing.T) {
-	flwMgr := newMockFlowmgr()
-
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
 
 	wg := sync.WaitGroup{}
 
-	intfCount := 16
-	onuCount := 32
+	intfCount := NumPonPorts
+	onuCount := OnuIDEnd - OnuIDStart + 1
 
 	for i := 0; i < intfCount; i++ {
-		for j := 0; j < onuCount; j++ {
+		for j := 1; j <= onuCount; j++ {
 			wg.Add(1)
 			go func(i uint32, j uint32) {
 				// TODO: actually verify success
-				_ = flwMgr.UpdateOnuInfo(ctx, i, i, fmt.Sprintf("onu-%d", i))
+				_ = flowMgr[i].UpdateOnuInfo(ctx, i, i, fmt.Sprintf("onu-%d", i))
 				wg.Done()
 			}(uint32(i), uint32(j))
 		}
@@ -614,34 +599,35 @@
 }
 
 func TestOpenOltFlowMgr_addGemPortToOnuInfoMap(t *testing.T) {
-	flowMgr = newMockFlowmgr()
-	intfNum := 16
-	onuNum := 32
+	intfNum := NumPonPorts
+	onuNum := OnuIDEnd - OnuIDStart + 1
 
 	// clean the flowMgr
-	flowMgr.onuGemInfo = make(map[uint32][]rsrcMgr.OnuGemInfo, intfNum)
+	for i := 0; i < intfNum; i++ {
+		flowMgr[i].onuGemInfo = make(map[uint32][]rsrcMgr.OnuGemInfo, intfNum)
+	}
 
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
 
 	// Create OnuInfo
 	for i := 0; i < intfNum; i++ {
-		for o := 0; o < onuNum; o++ {
+		for o := 1; o <= onuNum; o++ {
 			// TODO: actually verify success
-			_ = flowMgr.UpdateOnuInfo(ctx, uint32(i), uint32(o), fmt.Sprintf("i%do%d", i, o))
+			_ = flowMgr[i].UpdateOnuInfo(ctx, uint32(i), uint32(o), fmt.Sprintf("i%do%d", i, o-1))
 		}
 	}
 
 	// Add gemPorts to OnuInfo in parallel threads
 	wg := sync.WaitGroup{}
 
-	for o := 0; o < onuNum; o++ {
+	for o := 1; o <= onuNum; o++ {
 		for i := 0; i < intfNum; i++ {
 			wg.Add(1)
 			go func(intfId uint32, onuId uint32) {
-				gemID, _ := strconv.Atoi(fmt.Sprintf("90%d%d", intfId, onuId))
+				gemID, _ := strconv.Atoi(fmt.Sprintf("90%d%d", intfId, onuId-1))
 
-				flowMgr.addGemPortToOnuInfoMap(ctx, intfId, onuId, uint32(gemID))
+				flowMgr[intfId].addGemPortToOnuInfoMap(ctx, intfId, onuId, uint32(gemID))
 				wg.Done()
 			}(uint32(i), uint32(o))
 		}
@@ -651,21 +637,21 @@
 
 	// check that each entry of onuGemInfo has the correct number of ONUs
 	for i := 0; i < intfNum; i++ {
-		lenofOnu := len(flowMgr.onuGemInfo[uint32(i)])
+		lenofOnu := len(flowMgr[i].onuGemInfo[uint32(i)])
 		if onuNum != lenofOnu {
 			t.Errorf("OnuGemInfo length is not as expected len = %d, want %d", lenofOnu, onuNum)
 		}
 
-		for o := 0; o < onuNum; o++ {
-			lenOfGemPorts := len(flowMgr.onuGemInfo[uint32(i)][o].GemPorts)
+		for o := 1; o <= onuNum; o++ {
+			lenOfGemPorts := len(flowMgr[i].onuGemInfo[uint32(i)][o-1].GemPorts)
 			// check that each onuEntry has 1 gemPort
 			if lenOfGemPorts != 1 {
 				t.Errorf("Expected 1 GemPort per ONU, found %d", lenOfGemPorts)
 			}
 
 			// check that the value of the gemport is correct
-			gemID, _ := strconv.Atoi(fmt.Sprintf("90%d%d", i, o))
-			currentValue := flowMgr.onuGemInfo[uint32(i)][o].GemPorts[0]
+			gemID, _ := strconv.Atoi(fmt.Sprintf("90%d%d", i, o-1))
+			currentValue := flowMgr[i].onuGemInfo[uint32(i)][o-1].GemPorts[0]
 			if uint32(gemID) != currentValue {
 				t.Errorf("Expected GemPort value to be %d, found %d", gemID, currentValue)
 			}
@@ -674,7 +660,8 @@
 }
 
 func TestOpenOltFlowMgr_deleteGemPortFromLocalCache(t *testing.T) {
-	flwMgr := newMockFlowmgr()
+	// Create fresh flowMgr instance
+	flowMgr = newMockFlowmgr()
 	type args struct {
 		intfID                uint32
 		onuID                 uint32
@@ -704,18 +691,18 @@
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			// TODO: should check returned errors are as expected?
-			_ = flwMgr.UpdateOnuInfo(ctx, tt.args.intfID, tt.args.onuID, tt.args.serialNum)
+			_ = flowMgr[tt.args.intfID].UpdateOnuInfo(ctx, tt.args.intfID, tt.args.onuID, tt.args.serialNum)
 			for _, gemPort := range tt.args.gemPortIDs {
-				flwMgr.addGemPortToOnuInfoMap(ctx, tt.args.intfID, tt.args.onuID, gemPort)
+				flowMgr[tt.args.intfID].addGemPortToOnuInfoMap(ctx, tt.args.intfID, tt.args.onuID, gemPort)
 			}
 			for _, gemPortDeleted := range tt.args.gemPortIDsToBeDeleted {
-				flwMgr.deleteGemPortFromLocalCache(ctx, tt.args.intfID, tt.args.onuID, gemPortDeleted)
+				flowMgr[tt.args.intfID].deleteGemPortFromLocalCache(ctx, tt.args.intfID, tt.args.onuID, gemPortDeleted)
 			}
-			lenofGemPorts := len(flwMgr.onuGemInfo[tt.args.intfID][0].GemPorts)
+			lenofGemPorts := len(flowMgr[tt.args.intfID].onuGemInfo[tt.args.intfID][0].GemPorts)
 			if lenofGemPorts != tt.args.finalLength {
 				t.Errorf("GemPorts length is not as expected len = %d, want %d", lenofGemPorts, tt.args.finalLength)
 			}
-			gemPorts := flwMgr.onuGemInfo[tt.args.intfID][0].GemPorts
+			gemPorts := flowMgr[tt.args.intfID].onuGemInfo[tt.args.intfID][0].GemPorts
 			if !reflect.DeepEqual(tt.args.gemPortIDsRemaining, gemPorts) {
 				t.Errorf("GemPorts are not as expected = %v, want %v", gemPorts, tt.args.gemPortIDsRemaining)
 			}
@@ -725,7 +712,6 @@
 }
 
 func TestOpenOltFlowMgr_GetLogicalPortFromPacketIn(t *testing.T) {
-	flwMgr := newMockFlowmgr()
 	type args struct {
 		packetIn *openoltpb2.PacketIndication
 	}
@@ -736,18 +722,18 @@
 		wantErr bool
 	}{
 		// TODO: Add test cases.
-		{"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 1, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1, false},
-		{"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "nni", IntfId: 1, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1048577, false},
+		{"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 0, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1, false},
+		{"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "nni", IntfId: 0, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1048576, false},
 		// Negative Test cases.
-		{"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 2, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 0, true},
-		{"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 1, GemportId: 1, FlowId: 100, PortNo: 0, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 4112, false},
+		{"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 1, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 0, true},
+		{"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 0, GemportId: 1, FlowId: 100, PortNo: 0, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 16, false},
 	}
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 
-			got, err := flwMgr.GetLogicalPortFromPacketIn(ctx, tt.args.packetIn)
+			got, err := flowMgr[tt.args.packetIn.IntfId].GetLogicalPortFromPacketIn(ctx, tt.args.packetIn)
 			if (err != nil) != tt.wantErr {
 				t.Errorf("OpenOltFlowMgr.GetLogicalPortFromPacketIn() error = %v, wantErr %v", err, tt.wantErr)
 				return
@@ -760,7 +746,8 @@
 }
 
 func TestOpenOltFlowMgr_GetPacketOutGemPortID(t *testing.T) {
-	// flwMgr := newMockFlowmgr()
+	// Create fresh flowMgr instance
+	flowMgr = newMockFlowmgr()
 
 	//untagged packet in hex string
 	untaggedStr := "01005e000002000000000001080046c00020000040000102fa140a000001e00000029404000017000705e10000fa"
@@ -769,15 +756,15 @@
 		t.Error("Unable to parse hex string", err)
 		panic(err)
 	}
-	//single-tagged packet in hex string. vlanID.pbit: 540.0
-	singleTaggedStr := "01005e0000010025ba48172481000225080046c0002000004000010257deab140023e0000001940400001164ee9b0000000000000000000000000000"
+	//single-tagged packet in hex string. vlanID.pbit: 1.1
+	singleTaggedStr := "01005e0000010025ba48172481002001080046c0002000004000010257deab140023e0000001940400001164ee9b0000000000000000000000000000"
 	singleTagged, err := hex.DecodeString(singleTaggedStr)
 	if err != nil {
 		t.Error("Unable to parse hex string", err)
 		panic(err)
 	}
-	//double-tagged packet in hex string. vlanID.pbit: 210.0-48.7
-	doubleTaggedStr := "01005e000016deadbeefba11810002108100e030080046000028000000000102c5b87f000001e0000016940400002200f8030000000104000000e10000fa"
+	//double-tagged packet in hex string. vlanID.pbit: 210.0-0.0
+	doubleTaggedStr := "01005e000016deadbeefba118100021081000000080046000028000000000102c5b87f000001e0000016940400002200f8030000000104000000e10000fa"
 	doubleTagged, err := hex.DecodeString(doubleTaggedStr)
 	if err != nil {
 		t.Error("Unable to parse hex string", err)
@@ -797,11 +784,11 @@
 		wantErr bool
 	}{
 		// TODO: Add test cases.
-		{"GetPacketOutGemPortID", args{intfID: 1, onuID: 1, portNum: 3, packet: untagged}, 3, false},
-		{"GetPacketOutGemPortID", args{intfID: 2, onuID: 2, portNum: 4, packet: singleTagged}, 4, false},
-		{"GetPacketOutGemPortID", args{intfID: 1, onuID: 2, portNum: 2, packet: doubleTagged}, 2, false},
-		{"GetPacketOutGemPortID", args{intfID: 1, onuID: 10, portNum: 10, packet: untagged}, 2, true},
-		{"GetPacketOutGemPortID", args{intfID: 1, onuID: 1, portNum: 3, packet: []byte{}}, 3, true},
+		{"GetPacketOutGemPortID", args{intfID: 0, onuID: 1, portNum: 1, packet: untagged}, 1, false},
+		{"GetPacketOutGemPortID", args{intfID: 1, onuID: 2, portNum: 2, packet: singleTagged}, 2, false},
+		{"GetPacketOutGemPortID", args{intfID: 0, onuID: 1, portNum: 1, packet: doubleTagged}, 1, false},
+		{"GetPacketOutGemPortID", args{intfID: 0, onuID: 10, portNum: 10, packet: untagged}, 2, true},
+		{"GetPacketOutGemPortID", args{intfID: 0, onuID: 1, portNum: 3, packet: []byte{}}, 3, true},
 	}
 
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -809,7 +796,7 @@
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 
-			got, err := flowMgr.GetPacketOutGemPortID(ctx, tt.args.intfID, tt.args.onuID, tt.args.portNum, tt.args.packet)
+			got, err := flowMgr[tt.args.intfID].GetPacketOutGemPortID(ctx, tt.args.intfID, tt.args.onuID, tt.args.portNum, tt.args.packet)
 			if tt.wantErr {
 				if err == nil {
 					//error expected but got value
@@ -830,7 +817,6 @@
 }
 
 func TestOpenOltFlowMgr_DeleteTechProfileInstance(t *testing.T) {
-	// flwMgr := newMockFlowmgr()
 	type args struct {
 		intfID uint32
 		onuID  uint32
@@ -850,7 +836,7 @@
 	defer cancel()
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			if err := flowMgr.DeleteTechProfileInstance(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID, tt.args.sn, tt.args.tpID); (err != nil) != tt.wantErr {
+			if err := flowMgr[tt.args.intfID].DeleteTechProfileInstance(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID, tt.args.sn, tt.args.tpID); (err != nil) != tt.wantErr {
 				t.Errorf("OpenOltFlowMgr.DeleteTechProfileInstance() error = %v, wantErr %v", err, tt.wantErr)
 			}
 		})
@@ -859,7 +845,6 @@
 
 func TestOpenOltFlowMgr_checkAndAddFlow(t *testing.T) {
 	ctx := context.Background()
-	// flowMgr := newMockFlowmgr()
 	kw := make(map[string]uint64)
 	kw["table_id"] = 1
 	kw["meter_id"] = 1
@@ -1097,7 +1082,7 @@
 	defer cancel()
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			flowMgr.checkAndAddFlow(ctx, tt.args.args, tt.args.classifierInfo, tt.args.actionInfo, tt.args.flow,
+			flowMgr[tt.args.intfID].checkAndAddFlow(ctx, tt.args.args, tt.args.classifierInfo, tt.args.actionInfo, tt.args.flow,
 				tt.args.TpInst, tt.args.gemPorts, tt.args.TpID, tt.args.uni)
 		})
 	}
@@ -1108,7 +1093,7 @@
 	defer cancel()
 	//create group
 	group := newGroup(2, []uint32{1})
-	err := flowMgr.AddGroup(ctx, group)
+	err := flowMgr[0].grpMgr.AddGroup(ctx, group)
 	if err != nil {
 		t.Error("group-add failed", err)
 		return
@@ -1128,7 +1113,7 @@
 	}
 	ofpStats, _ := fu.MkFlowStat(multicastFlowArgs)
 	fmt.Println(ofpStats.Id)
-	err = flowMgr.AddFlow(ctx, ofpStats, &voltha.FlowMetadata{})
+	err = flowMgr[0].AddFlow(ctx, ofpStats, &voltha.FlowMetadata{})
 	if err != nil {
 		t.Error("Multicast flow-add failed", err)
 		return
@@ -1136,20 +1121,20 @@
 
 	//add bucket to the group
 	group = newGroup(2, []uint32{1, 2})
-	err = flowMgr.ModifyGroup(ctx, group)
+	err = flowMgr[0].grpMgr.ModifyGroup(ctx, group)
 	if err != nil {
 		t.Error("modify-group failed", err)
 		return
 	}
 	//remove the multicast flow
-	err = flowMgr.RemoveFlow(ctx, ofpStats)
+	err = flowMgr[0].RemoveFlow(ctx, ofpStats)
 	if err != nil {
 		t.Error("Multicast flow-remove failed", err)
 		return
 	}
 
 	//remove the group
-	err = flowMgr.DeleteGroup(ctx, group)
+	err = flowMgr[0].grpMgr.DeleteGroup(ctx, group)
 	if err != nil {
 		t.Error("delete-group failed", err)
 		return