VOL-3419: OpenOLT adapter at scale constantly takes more that 10 seconds to react to flows
The patch addresses the following
- Create OpenOltFlowMgr per PON port (instead of one instance for the whole OLT device earlier)
- Create a separate OpenOltGroupMgr - currently one instance for the whole OLT device
- Remove redundant global lock around getting ONU-ID in DeviceHandler module as there exists a
  separate per-pon-port lock in ResourceManager module which suffices the required synchronization
- Remove redundant locks in OpenOltFlowMgr module to serialize FlowDelete before FlowAdd
- Rename divideAndAddFlow to processAddFlow. "divideAndAddFlow" was used in 1.x voltha days and
  had a different meaning and the name seems to have been blindly ported to 2.x adapter
  and does not make sense anymore

Change-Id: I99827963cf242f1db0c27943c97bd05b749ae129
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 0dfb781..f342bcf 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -57,21 +57,6 @@
 	InvalidPort    = 0xffffffff
 )
 
-// pendingFlowRemoveDataKey is key to pendingFlowRemoveDataPerSubscriber map
-type pendingFlowRemoveDataKey struct {
-	intfID uint32
-	onuID  uint32
-	uniID  uint32
-}
-
-// pendingFlowRemoveData is value stored in pendingFlowRemoveDataPerSubscriber map
-// This holds the number of pending flow removes and also a signal channel to
-// to indicate the receiver when all flow removes are handled
-type pendingFlowRemoveData struct {
-	pendingFlowRemoveCount uint32
-	allFlowsRemoved        chan struct{}
-}
-
 //DeviceHandler will interact with the OLT device.
 type DeviceHandler struct {
 	device        *voltha.Device
@@ -84,7 +69,8 @@
 	Client        oop.OpenoltClient
 	transitionMap *TransitionMap
 	clientCon     *grpc.ClientConn
-	flowMgr       *OpenOltFlowMgr
+	flowMgr       []*OpenOltFlowMgr
+	groupMgr      *OpenOltGroupMgr
 	eventMgr      *OpenOltEventMgr
 	resourceMgr   *rsrcMgr.OpenOltResourceMgr
 
@@ -98,12 +84,7 @@
 	stopIndications               chan bool
 	isReadIndicationRoutineActive bool
 
-	// pendingFlowRemoveDataPerSubscriber map is used to maintain the context on a per
-	// subscriber basis for the number of pending flow removes. This data is used
-	// to process all the flow removes for a subscriber before handling flow adds.
-	// Interleaving flow delete and flow add processing has known to cause PON resource
-	// management contentions on a per subscriber bases, so we need ensure ordering.
-	pendingFlowRemoveDataPerSubscriber map[pendingFlowRemoveDataKey]pendingFlowRemoveData
+	totalPonPorts uint32
 }
 
 //OnuDevice represents ONU related info
@@ -158,7 +139,6 @@
 	dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
 	dh.activePorts = sync.Map{}
 	dh.stopIndications = make(chan bool, 1)
-	dh.pendingFlowRemoveDataPerSubscriber = make(map[pendingFlowRemoveDataKey]pendingFlowRemoveData)
 
 	//TODO initialize the support classes.
 	return &dh
@@ -590,7 +570,7 @@
 		defer span.Finish()
 
 		portStats := indication.GetPortStats()
-		go dh.portStats.PortStatisticsIndication(ctx, portStats, dh.resourceMgr.DevInfo.GetPonPorts())
+		go dh.portStats.PortStatisticsIndication(ctx, portStats, dh.totalPonPorts)
 	case *oop.Indication_FlowStats:
 		span, ctx := log.CreateChildSpan(ctx, "flow-stats-indication", log.Fields{"device-id": dh.device.Id})
 		defer span.Finish()
@@ -777,16 +757,23 @@
 	if err != nil {
 		return olterrors.NewErrAdapter("populate-device-info-failed", log.Fields{"device-id": dh.device.Id}, err)
 	}
+	dh.totalPonPorts = deviceInfo.GetPonPorts()
+
 	// Instantiate resource manager
 	if dh.resourceMgr = rsrcMgr.NewResourceMgr(ctx, dh.device.Id, dh.openOLT.KVStoreAddress, dh.openOLT.KVStoreType, dh.device.Type, deviceInfo); dh.resourceMgr == nil {
 		return olterrors.ErrResourceManagerInstantiating
 	}
 
-	// Instantiate flow manager
-	if dh.flowMgr = NewFlowManager(ctx, dh, dh.resourceMgr); dh.flowMgr == nil {
-		return olterrors.ErrResourceManagerInstantiating
+	dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr)
 
+	dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts)
+	for i := range dh.flowMgr {
+		// Instantiate flow manager
+		if dh.flowMgr[i] = NewFlowManager(ctx, dh, dh.resourceMgr, dh.groupMgr); dh.flowMgr[i] == nil {
+			return olterrors.ErrResourceManagerInstantiating
+		}
 	}
+
 	/* TODO: Instantiate Alarm , stats , BW managers */
 	/* Instantiating Event Manager to handle Alarms and KPIs */
 	dh.eventMgr = NewEventMgr(dh.EventProxy, dh)
@@ -1060,7 +1047,7 @@
 
 func (dh *DeviceHandler) activateONU(ctx context.Context, intfID uint32, onuID int64, serialNum *oop.SerialNumber, serialNumber string) error {
 	logger.Debugw(ctx, "activate-onu", log.Fields{"intf-id": intfID, "onu-id": onuID, "serialNum": serialNum, "serialNumber": serialNumber, "device-id": dh.device.Id, "OmccEncryption": dh.openOLT.config.OmccEncryption})
-	if err := dh.flowMgr.UpdateOnuInfo(ctx, intfID, uint32(onuID), serialNumber); err != nil {
+	if err := dh.flowMgr[intfID].UpdateOnuInfo(ctx, intfID, uint32(onuID), serialNumber); err != nil {
 		return olterrors.NewErrAdapter("onu-activate-failed", log.Fields{"onu": onuID, "intf-id": intfID}, err)
 	}
 	// TODO: need resource manager
@@ -1153,9 +1140,7 @@
 		logger.Debugw(ctx, "creating-new-onu", log.Fields{"sn": sn})
 		// we need to create a new ChildDevice
 		ponintfid := onuDiscInd.GetIntfId()
-		dh.lockDevice.Lock()
 		onuID, err = dh.resourceMgr.GetONUID(ctx, ponintfid)
-		dh.lockDevice.Unlock()
 
 		logger.Infow(ctx, "creating-new-onu-got-onu-id", log.Fields{"sn": sn, "onuId": onuID})
 
@@ -1437,26 +1422,26 @@
 
 	if flows != nil {
 		for _, flow := range flows.ToRemove.Items {
-			dh.incrementActiveFlowRemoveCount(ctx, flow)
+			ponIf := dh.getPonIfFromFlow(ctx, flow)
 
 			logger.Debugw(ctx, "removing-flow",
 				log.Fields{"device-id": device.Id,
+					"ponIf":        ponIf,
 					"flowToRemove": flow})
-			err := dh.flowMgr.RemoveFlow(ctx, flow)
+			err := dh.flowMgr[ponIf].RemoveFlow(ctx, flow)
 			if err != nil {
 				errorsList = append(errorsList, err)
 			}
-
-			dh.decrementActiveFlowRemoveCount(ctx, flow)
 		}
 
 		for _, flow := range flows.ToAdd.Items {
+			ponIf := dh.getPonIfFromFlow(ctx, flow)
 			logger.Debugw(ctx, "adding-flow",
 				log.Fields{"device-id": device.Id,
+					"ponIf":     ponIf,
 					"flowToAdd": flow})
 			// If there are active Flow Remove in progress for a given subscriber, wait until it completes
-			dh.waitForFlowRemoveToFinish(ctx, flow)
-			err := dh.flowMgr.AddFlow(ctx, flow, flowMetadata)
+			err := dh.flowMgr[ponIf].AddFlow(ctx, flow, flowMetadata)
 			if err != nil {
 				errorsList = append(errorsList, err)
 			}
@@ -1466,19 +1451,19 @@
 	// Whether we need to synchronize multicast group adds and modifies like flow add and delete needs to be investigated
 	if groups != nil {
 		for _, group := range groups.ToAdd.Items {
-			err := dh.flowMgr.AddGroup(ctx, group)
+			err := dh.groupMgr.AddGroup(ctx, group)
 			if err != nil {
 				errorsList = append(errorsList, err)
 			}
 		}
 		for _, group := range groups.ToUpdate.Items {
-			err := dh.flowMgr.ModifyGroup(ctx, group)
+			err := dh.groupMgr.ModifyGroup(ctx, group)
 			if err != nil {
 				errorsList = append(errorsList, err)
 			}
 		}
 		for _, group := range groups.ToRemove.Items {
-			err := dh.flowMgr.DeleteGroup(ctx, group)
+			err := dh.groupMgr.DeleteGroup(ctx, group)
 			if err != nil {
 				errorsList = append(errorsList, err)
 			}
@@ -1596,7 +1581,7 @@
 		uniID = UniIDFromPortNum(uint32(port))
 		logger.Debugw(ctx, "clearing-resource-data-for-uni-port", log.Fields{"port": port, "uni-id": uniID})
 		/* Delete tech-profile instance from the KV store */
-		if err = dh.flowMgr.DeleteTechProfileInstances(ctx, onu.IntfID, onu.OnuID, uniID, onu.SerialNumber); err != nil {
+		if err = dh.flowMgr[onu.IntfID].DeleteTechProfileInstances(ctx, onu.IntfID, onu.OnuID, uniID, onu.SerialNumber); err != nil {
 			logger.Debugw(ctx, "failed-to-remove-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
 		}
 		logger.Debugw(ctx, "deleted-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
@@ -1687,9 +1672,8 @@
 func (dh *DeviceHandler) cleanupDeviceResources(ctx context.Context) {
 
 	if dh.resourceMgr != nil {
-		noOfPonPorts := dh.resourceMgr.DevInfo.GetPonPorts()
 		var ponPort uint32
-		for ponPort = 0; ponPort < noOfPonPorts; ponPort++ {
+		for ponPort = 0; ponPort < dh.totalPonPorts; ponPort++ {
 			var onuGemData []rsrcMgr.OnuGemInfo
 			err := dh.resourceMgr.ResourceMgrs[ponPort].GetOnuGemInfo(ctx, ponPort, &onuGemData)
 			if err != nil {
@@ -1762,7 +1746,7 @@
 			"packet":            hex.EncodeToString(packetIn.Pkt),
 		})
 	}
-	logicalPortNum, err := dh.flowMgr.GetLogicalPortFromPacketIn(ctx, packetIn)
+	logicalPortNum, err := dh.flowMgr[packetIn.IntfId].GetLogicalPortFromPacketIn(ctx, packetIn)
 	if err != nil {
 		return olterrors.NewErrNotFound("logical-port", log.Fields{"packet": hex.EncodeToString(packetIn.Pkt)}, err)
 	}
@@ -1834,7 +1818,7 @@
 		onuID := OnuIDFromPortNum(uint32(egressPortNo))
 		uniID := UniIDFromPortNum(uint32(egressPortNo))
 
-		gemPortID, err := dh.flowMgr.GetPacketOutGemPortID(ctx, intfID, onuID, uint32(egressPortNo), packet.Data)
+		gemPortID, err := dh.flowMgr[intfID].GetPacketOutGemPortID(ctx, intfID, onuID, uint32(egressPortNo), packet.Data)
 		if err != nil {
 			// In this case the openolt agent will receive the gemPortID as 0.
 			// The agent tries to retrieve the gemPortID in this case.
@@ -2098,21 +2082,12 @@
 	}
 
 	for uniID := 0; uniID < MaxUnisPerOnu; uniID++ {
-		var flowRemoveData pendingFlowRemoveData
-		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uint32(uniID)}
-		dh.lockDevice.RLock()
-		if flowRemoveData, ok = dh.pendingFlowRemoveDataPerSubscriber[key]; !ok {
-			dh.lockDevice.RUnlock()
-			continue
-		}
-		dh.lockDevice.RUnlock()
-
 		logger.Debugw(ctx, "wait-for-flow-remove-complete-before-processing-child-device-lost",
 			log.Fields{"int-id": intfID, "onu-id": onuID, "uni-id": uniID})
-		// Wait for all flow removes to finish first
-		<-flowRemoveData.allFlowsRemoved
+		dh.flowMgr[intfID].WaitForFlowRemoveToFinishForSubscriber(ctx, intfID, onuID, uint32(uniID))
 		logger.Debugw(ctx, "flow-removes-complete-for-subscriber",
 			log.Fields{"int-id": intfID, "onu-id": onuID, "uni-id": uniID})
+		// TODO: Would be good to delete the subscriber entry from flowMgr.pendingFlowRemoveDataPerSubscriber map
 	}
 
 	onu := &oop.Onu{IntfId: intfID, OnuId: onuID, SerialNumber: sn}
@@ -2189,88 +2164,6 @@
 	return InvalidPort
 }
 
-func (dh *DeviceHandler) incrementActiveFlowRemoveCount(ctx context.Context, flow *of.OfpFlowStats) {
-	inPort, outPort := getPorts(flow)
-	logger.Debugw(ctx, "increment-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
-	if inPort != InvalidPort && outPort != InvalidPort {
-		_, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
-		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
-		logger.Debugw(ctx, "increment-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
-		dh.lockDevice.Lock()
-		defer dh.lockDevice.Unlock()
-		flowRemoveData, ok := dh.pendingFlowRemoveDataPerSubscriber[key]
-		if !ok {
-			flowRemoveData = pendingFlowRemoveData{
-				pendingFlowRemoveCount: 0,
-				allFlowsRemoved:        make(chan struct{}),
-			}
-		}
-		flowRemoveData.pendingFlowRemoveCount++
-		dh.pendingFlowRemoveDataPerSubscriber[key] = flowRemoveData
-
-		logger.Debugw(ctx, "current-flow-remove-count–increment",
-			log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
-				"currCnt": dh.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
-	}
-}
-
-func (dh *DeviceHandler) decrementActiveFlowRemoveCount(ctx context.Context, flow *of.OfpFlowStats) {
-	inPort, outPort := getPorts(flow)
-	logger.Debugw(ctx, "decrement-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
-	if inPort != InvalidPort && outPort != InvalidPort {
-		_, intfID, onuID, uniID := ExtractAccessFromFlow(uint32(inPort), uint32(outPort))
-		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
-		logger.Debugw(ctx, "decrement-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
-		dh.lockDevice.Lock()
-		defer dh.lockDevice.Unlock()
-		if val, ok := dh.pendingFlowRemoveDataPerSubscriber[key]; !ok {
-			logger.Fatalf(ctx, "flow-remove-key-not-found", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-		} else {
-			if val.pendingFlowRemoveCount > 0 {
-				val.pendingFlowRemoveCount--
-			}
-			logger.Debugw(ctx, "current-flow-remove-count-after-decrement",
-				log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
-					"currCnt": dh.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
-			// If all flow removes have finished, then close the channel to signal the receiver
-			// to go ahead with flow adds.
-			if val.pendingFlowRemoveCount == 0 {
-				close(val.allFlowsRemoved)
-				delete(dh.pendingFlowRemoveDataPerSubscriber, key)
-				return
-			}
-			dh.pendingFlowRemoveDataPerSubscriber[key] = val
-		}
-	}
-}
-
-func (dh *DeviceHandler) waitForFlowRemoveToFinish(ctx context.Context, flow *of.OfpFlowStats) {
-	var flowRemoveData pendingFlowRemoveData
-	var ok bool
-	inPort, outPort := getPorts(flow)
-	logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
-	if inPort != InvalidPort && outPort != InvalidPort {
-		_, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
-		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
-		logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
-		dh.lockDevice.RLock()
-		if flowRemoveData, ok = dh.pendingFlowRemoveDataPerSubscriber[key]; !ok {
-			logger.Debugw(ctx, "no-pending-flow-to-remove", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-			dh.lockDevice.RUnlock()
-			return
-		}
-		dh.lockDevice.RUnlock()
-
-		// Wait for all flow removes to finish first
-		<-flowRemoveData.allFlowsRemoved
-
-		logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-	}
-}
-
 func getPorts(flow *of.OfpFlowStats) (uint32, uint32) {
 	inPort := getInPortFromFlow(flow)
 	outPort := getOutPortFromFlow(flow)
@@ -2350,3 +2243,14 @@
 	logger.Infow(ctx, "get-ext-value", log.Fields{"resp": resp, "device-id": dh.device, "onu-id": device.Id, "pon-intf": device.ParentPortNo})
 	return resp, nil
 }
+
+func (dh *DeviceHandler) getPonIfFromFlow(ctx context.Context, flow *of.OfpFlowStats) uint32 {
+	// Default to PON0
+	var intfID uint32
+	inPort, outPort := getPorts(flow)
+	logger.Debugw(ctx, "increment-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
+	if inPort != InvalidPort && outPort != InvalidPort {
+		_, intfID, _, _ = ExtractAccessFromFlow(inPort, outPort)
+	}
+	return intfID
+}
diff --git a/internal/pkg/core/device_handler_test.go b/internal/pkg/core/device_handler_test.go
index 855f42c..1a11b92 100644
--- a/internal/pkg/core/device_handler_test.go
+++ b/internal/pkg/core/device_handler_test.go
@@ -43,6 +43,18 @@
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 )
 
+const (
+	NumPonPorts  = 2
+	OnuIDStart   = 1
+	OnuIDEnd     = 32
+	AllocIDStart = 1
+	AllocIDEnd   = 10
+	GemIDStart   = 1
+	GemIDEnd     = 10
+	FlowIDStart  = 1
+	FlowIDEnd    = 10
+)
+
 func newMockCoreProxy() *mocks.MockCoreProxy {
 	mcp := mocks.MockCoreProxy{
 		Devices:     make(map[string]*voltha.Device),
@@ -153,7 +165,7 @@
 		Pools:      []*oop.DeviceInfo_DeviceResourceRanges_Pool{{}},
 	}}
 
-	deviceInf := &oop.DeviceInfo{Vendor: "openolt", Ranges: oopRanges, Model: "openolt", DeviceId: dh.device.Id, PonPorts: 2}
+	deviceInf := &oop.DeviceInfo{Vendor: "openolt", Ranges: oopRanges, Model: "openolt", DeviceId: dh.device.Id, PonPorts: NumPonPorts}
 	rsrMgr := resourcemanager.OpenOltResourceMgr{DeviceID: dh.device.Id, DeviceType: dh.device.Type, DevInfo: deviceInf,
 		KVStore: &db.Backend{
 			Client: &mocks.MockKVClient{},
@@ -192,7 +204,15 @@
 	dh.resourceMgr.ResourceMgrs[1] = ponmgr
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
-	dh.flowMgr = NewFlowManager(ctx, dh, dh.resourceMgr)
+	dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr)
+	dh.totalPonPorts = NumPonPorts
+	dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts)
+	for i := 0; i < int(dh.totalPonPorts); i++ {
+		// Instantiate flow manager
+		if dh.flowMgr[i] = NewFlowManager(ctx, dh, dh.resourceMgr, dh.groupMgr); dh.flowMgr[i] == nil {
+			return nil
+		}
+	}
 	dh.Client = &mocks.MockOpenoltClient{}
 	dh.eventMgr = &OpenOltEventMgr{eventProxy: &mocks.MockEventProxy{}, handler: dh}
 	dh.transitionMap = &TransitionMap{}
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index d68b8f8..83da756 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -24,11 +24,6 @@
 	"encoding/json"
 	"errors"
 	"fmt"
-	"math/big"
-	"strings"
-	"sync"
-	"time"
-
 	"github.com/opencord/voltha-lib-go/v3/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	tp "github.com/opencord/voltha-lib-go/v3/pkg/techprofile"
@@ -39,6 +34,9 @@
 	openoltpb2 "github.com/opencord/voltha-protos/v3/go/openolt"
 	tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"math/big"
+	"strings"
+	"sync"
 
 	//deepcopy "github.com/getlantern/deepcopy"
 	"github.com/EagleChen/mapmutex"
@@ -187,12 +185,6 @@
 	gemPort uint32
 }
 
-type pendingFlowDeleteKey struct {
-	intfID uint32
-	onuID  uint32
-	uniID  uint32
-}
-
 type tpLockKey struct {
 	intfID uint32
 	onuID  uint32
@@ -211,15 +203,26 @@
 	flowMetadata *voltha.FlowMetadata
 }
 
-type queueInfoBrief struct {
-	gemPortID       uint32
-	servicePriority uint32
+// pendingFlowRemoveDataKey is key to pendingFlowRemoveDataPerSubscriber map
+type pendingFlowRemoveDataKey struct {
+	intfID uint32
+	onuID  uint32
+	uniID  uint32
+}
+
+// pendingFlowRemoveData is value stored in pendingFlowRemoveDataPerSubscriber map
+// This holds the number of pending flow removes and also a signal channel to
+// to indicate the receiver when all flow removes are handled
+type pendingFlowRemoveData struct {
+	pendingFlowRemoveCount uint32
+	allFlowsRemoved        chan struct{}
 }
 
 //OpenOltFlowMgr creates the Structure of OpenOltFlowMgr obj
 type OpenOltFlowMgr struct {
 	techprofile        map[uint32]tp.TechProfileIf
 	deviceHandler      *DeviceHandler
+	grpMgr             *OpenOltGroupMgr
 	resourceMgr        *rsrcMgr.OpenOltResourceMgr
 	onuIdsLock         sync.RWMutex
 	perGemPortLock     *mapmutex.Mutex                    // lock to be used to access the flowsUsedByGemPort map
@@ -228,21 +231,28 @@
 	// TODO create a type rsrcMgr.OnuGemInfos to be used instead of []rsrcMgr.OnuGemInfo
 	onuGemInfo map[uint32][]rsrcMgr.OnuGemInfo //onu, gem and uni info local cache, indexed by IntfId
 	// We need to have a global lock on the onuGemInfo map
-	onuGemInfoLock    sync.RWMutex
-	pendingFlowDelete sync.Map
+	onuGemInfoLock sync.RWMutex
 	// The mapmutex.Mutex can be fine tuned to use mapmutex.NewCustomizedMapMutex
-	perUserFlowHandleLock    *mapmutex.Mutex
-	interfaceToMcastQueueMap map[uint32]*queueInfoBrief /*pon interface -> multicast queue map. Required to assign GEM to a bucket during group population*/
+	perUserFlowHandleLock *mapmutex.Mutex
+
+	// pendingFlowRemoveDataPerSubscriber map is used to maintain the context on a per
+	// subscriber basis for the number of pending flow removes. This data is used
+	// to process all the flow removes for a subscriber before handling flow adds.
+	// Interleaving flow delete and flow add processing has known to cause PON resource
+	// management contentions on a per subscriber bases, so we need ensure ordering.
+	pendingFlowRemoveDataPerSubscriber     map[pendingFlowRemoveDataKey]pendingFlowRemoveData
+	pendingFlowRemoveDataPerSubscriberLock sync.RWMutex
 }
 
 //NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
-func NewFlowManager(ctx context.Context, dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr) *OpenOltFlowMgr {
+func NewFlowManager(ctx context.Context, dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr, grpMgr *OpenOltGroupMgr) *OpenOltFlowMgr {
 	logger.Infow(ctx, "initializing-flow-manager", log.Fields{"device-id": dh.device.Id})
 	var flowMgr OpenOltFlowMgr
 	var err error
 	var idx uint32
 
 	flowMgr.deviceHandler = dh
+	flowMgr.grpMgr = grpMgr
 	flowMgr.resourceMgr = rMgr
 	flowMgr.techprofile = make(map[uint32]tp.TechProfileIf)
 	if err = flowMgr.populateTechProfilePerPonPort(ctx); err != nil {
@@ -250,15 +260,15 @@
 		return nil
 	}
 	flowMgr.onuIdsLock = sync.RWMutex{}
+	flowMgr.pendingFlowRemoveDataPerSubscriberLock = sync.RWMutex{}
 	flowMgr.flowsUsedByGemPort = make(map[gemPortKey][]uint32)
 	flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
 	ponPorts := rMgr.DevInfo.GetPonPorts()
 	flowMgr.onuGemInfo = make(map[uint32][]rsrcMgr.OnuGemInfo, ponPorts)
 	flowMgr.onuGemInfoLock = sync.RWMutex{}
-	flowMgr.pendingFlowDelete = sync.Map{}
 	flowMgr.perUserFlowHandleLock = mapmutex.NewCustomizedMapMutex(maxRetry, maxDelay, baseDelay, factor, jitter)
 	flowMgr.perGemPortLock = mapmutex.NewCustomizedMapMutex(maxRetry, maxDelay, baseDelay, factor, jitter)
-	flowMgr.interfaceToMcastQueueMap = make(map[uint32]*queueInfoBrief)
+	flowMgr.pendingFlowRemoveDataPerSubscriber = make(map[pendingFlowRemoveDataKey]pendingFlowRemoveData)
 	//Load the onugem info cache from kv store on flowmanager start
 	for idx = 0; idx < ponPorts; idx++ {
 		if flowMgr.onuGemInfo[idx], err = rMgr.GetOnuGemInfo(ctx, idx); err != nil {
@@ -268,7 +278,7 @@
 		flowMgr.loadFlowIDlistForGem(ctx, idx)
 	}
 	//load interface to multicast queue map from kv store
-	flowMgr.loadInterfaceToMulticastQueueMap(ctx)
+	flowMgr.grpMgr.LoadInterfaceToMulticastQueueMap(ctx)
 	logger.Info(ctx, "initialization-of-flow-manager-success")
 	return &flowMgr
 }
@@ -305,7 +315,7 @@
 	}, nil)
 }
 
-func (f *OpenOltFlowMgr) divideAndAddFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
+func (f *OpenOltFlowMgr) processAddFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
 	classifierInfo map[string]interface{}, actionInfo map[string]interface{}, flow *ofp.OfpFlowStats, TpID uint32,
 	UsMeterID uint32, DsMeterID uint32, flowMetadata *voltha.FlowMetadata) error {
 	var allocID uint32
@@ -571,16 +581,17 @@
 	if sq.direction == tp_pb.Direction_DOWNSTREAM {
 		multicastTrafficQueues := f.techprofile[sq.intfID].GetMulticastTrafficQueues(ctx, sq.tpInst.(*tp.TechProfile))
 		if len(multicastTrafficQueues) > 0 {
-			if _, present := f.interfaceToMcastQueueMap[sq.intfID]; !present {
+			if _, present := f.grpMgr.GetInterfaceToMcastQueueMap(sq.intfID); !present {
 				//assumed that there is only one queue per PON for the multicast service
 				//the default queue with multicastQueuePerPonPort.Priority per a pon interface is used for multicast service
 				//just put it in interfaceToMcastQueueMap to use for building group members
 				logger.Debugw(ctx, "multicast-traffic-queues", log.Fields{"device-id": f.deviceHandler.device.Id})
 				multicastQueuePerPonPort := multicastTrafficQueues[0]
-				f.interfaceToMcastQueueMap[sq.intfID] = &queueInfoBrief{
+				val := &QueueInfoBrief{
 					gemPortID:       multicastQueuePerPonPort.GemportId,
 					servicePriority: multicastQueuePerPonPort.Priority,
 				}
+				f.grpMgr.UpdateInterfaceToMcastQueueMap(sq.intfID, val)
 				//also store the queue info in kv store
 				if err := f.resourceMgr.AddMcastQueueForIntf(ctx, sq.intfID, multicastQueuePerPonPort.GemportId, multicastQueuePerPonPort.Priority); err != nil {
 					logger.Errorw(ctx, "failed-to-add-mcast-queue", log.Fields{"error": err})
@@ -1923,42 +1934,6 @@
 	return nil
 }
 
-func (f *OpenOltFlowMgr) deletePendingFlows(ctx context.Context, Intf uint32, onuID int32, uniID int32) {
-	pnFlDelKey := pendingFlowDeleteKey{Intf, uint32(onuID), uint32(uniID)}
-	if val, ok := f.pendingFlowDelete.Load(pnFlDelKey); ok {
-		if val.(int) > 0 {
-			pnFlDels := val.(int) - 1
-			if pnFlDels > 0 {
-				logger.Debugw(ctx, "flow-delete-succeeded--more-pending",
-					log.Fields{
-						"intf":               Intf,
-						"onu-id":             onuID,
-						"uni-id":             uniID,
-						"currpendingflowcnt": pnFlDels,
-						"device-id":          f.deviceHandler.device.Id})
-				f.pendingFlowDelete.Store(pnFlDelKey, pnFlDels)
-			} else {
-				logger.Debugw(ctx, "all-pending-flow-deletes-handled--removing-entry-from-map",
-					log.Fields{
-						"intf":      Intf,
-						"onu-id":    onuID,
-						"uni-id":    uniID,
-						"device-id": f.deviceHandler.device.Id})
-				f.pendingFlowDelete.Delete(pnFlDelKey)
-			}
-		}
-	} else {
-		logger.Debugw(ctx, "no-pending-delete-flows-found",
-			log.Fields{
-				"intf":      Intf,
-				"onu-id":    onuID,
-				"uni-id":    uniID,
-				"device-id": f.deviceHandler.device.Id})
-
-	}
-
-}
-
 // Once the gemport is released for a given onu, it also has to be cleared from local cache
 // which was used for deriving the gemport->logicalPortNo during packet-in.
 // Otherwise stale info continues to exist after gemport is freed and wrong logicalPortNo
@@ -1975,6 +1950,7 @@
 			"onu-id":      onuID,
 			"device-id":   f.deviceHandler.device.Id,
 			"onu-gem":     f.onuGemInfo[intfID]})
+
 	onugem := f.onuGemInfo[intfID]
 deleteLoop:
 	for i, onu := range onugem {
@@ -2028,34 +2004,6 @@
 					"device-id": f.deviceHandler.device.Id}, err).Log()
 		}
 		if len(updatedFlows) == 0 {
-			// Do this for subscriber flows only (not trap from NNI flows)
-			if onuID != -1 && uniID != -1 {
-				pnFlDelKey := pendingFlowDeleteKey{Intf, uint32(onuID), uint32(uniID)}
-				if val, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok {
-					logger.Debugw(ctx, "creating-entry-for-pending-flow-delete",
-						log.Fields{
-							"flow-id":   flowID,
-							"intf":      Intf,
-							"onu-id":    onuID,
-							"uni-id":    uniID,
-							"device-id": f.deviceHandler.device.Id})
-					f.pendingFlowDelete.Store(pnFlDelKey, 1)
-				} else {
-					pnFlDels := val.(int) + 1
-					logger.Debugw(ctx, "updating-flow-delete-entry",
-						log.Fields{
-							"flow-id":            flowID,
-							"intf":               Intf,
-							"onu-id":             onuID,
-							"uni-id":             uniID,
-							"currPendingFlowCnt": pnFlDels,
-							"device-id":          f.deviceHandler.device.Id})
-					f.pendingFlowDelete.Store(pnFlDelKey, pnFlDels)
-				}
-
-				defer f.deletePendingFlows(ctx, Intf, onuID, uniID)
-			}
-
 			logger.Debugw(ctx, "releasing-flow-id-to-resource-manager",
 				log.Fields{
 					"Intf":      Intf,
@@ -2310,72 +2258,6 @@
 	}
 }
 
-//clearMulticastFlowFromResourceManager  removes a multicast flow from the KV store and
-// clears resources reserved for this multicast flow
-func (f *OpenOltFlowMgr) clearMulticastFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats) {
-	classifierInfo := make(map[string]interface{})
-	formulateClassifierInfoFromFlow(ctx, classifierInfo, flow)
-	networkInterfaceID, err := f.getNNIInterfaceIDOfMulticastFlow(ctx, classifierInfo)
-
-	if err != nil {
-		logger.Warnw(ctx, "no-inport-found--cannot-release-resources-of-the-multicast-flow", log.Fields{"flowId:": flow.Id})
-		return
-	}
-
-	var onuID = int32(NoneOnuID)
-	var uniID = int32(NoneUniID)
-	var flowID uint32
-
-	flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, networkInterfaceID, onuID, uniID)
-
-	for _, flowID = range flowIds {
-		flowInfo := f.resourceMgr.GetFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flowID)
-		if flowInfo == nil {
-			logger.Debugw(ctx, "no-multicast-flowinfo-found-in-the-kv-store",
-				log.Fields{
-					"intf":    networkInterfaceID,
-					"onu-id":  onuID,
-					"uni-id":  uniID,
-					"flow-id": flowID})
-			continue
-		}
-		updatedFlows := *flowInfo
-		for i, storedFlow := range updatedFlows {
-			if flow.Id == storedFlow.LogicalFlowID {
-				removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: storedFlow.Flow.FlowType}
-				logger.Debugw(ctx, "multicast-flow-to-be-deleted",
-					log.Fields{
-						"flow":      storedFlow,
-						"flow-id":   flow.Id,
-						"device-id": f.deviceHandler.device.Id})
-				//remove from device
-				if err := f.removeFlowFromDevice(ctx, &removeFlowMessage, flow.Id); err != nil {
-					// DKB
-					logger.Errorw(ctx, "failed-to-remove-multicast-flow",
-						log.Fields{
-							"flow-id": flow.Id,
-							"error":   err})
-					return
-				}
-				logger.Infow(ctx, "multicast-flow-removed-from-device-successfully", log.Fields{"flow-id": flow.Id})
-				//Remove the Flow from FlowInfo
-				updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
-				if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID), NoneOnuID, NoneUniID, flowID, &updatedFlows); err != nil {
-					logger.Errorw(ctx, "failed-to-delete-multicast-flow-from-the-kv-store",
-						log.Fields{"flow": storedFlow,
-							"err": err})
-					return
-				}
-				//release flow id
-				logger.Debugw(ctx, "releasing-multicast-flow-id",
-					log.Fields{"flow-id": flowID,
-						"interfaceID": networkInterfaceID})
-				f.resourceMgr.FreeFlowID(ctx, uint32(networkInterfaceID), NoneOnuID, NoneUniID, flowID)
-			}
-		}
-	}
-}
-
 //RemoveFlow removes the flow from the device
 func (f *OpenOltFlowMgr) RemoveFlow(ctx context.Context, flow *ofp.OfpFlowStats) error {
 	logger.Infow(ctx, "removing-flow", log.Fields{"flow": *flow})
@@ -2394,6 +2276,9 @@
 		}
 	}
 
+	f.incrementActiveFlowRemoveCount(ctx, flow)
+	defer f.decrementActiveFlowRemoveCount(ctx, flow)
+
 	if flows.HasGroup(flow) {
 		direction = Multicast
 		f.clearFlowFromResourceManager(ctx, flow, direction)
@@ -2424,24 +2309,6 @@
 	return nil
 }
 
-func (f *OpenOltFlowMgr) waitForFlowDeletesToCompleteForOnu(ctx context.Context, intfID uint32, onuID uint32,
-	uniID uint32, ch chan bool) {
-	pnFlDelKey := pendingFlowDeleteKey{intfID, onuID, uniID}
-	for {
-		select {
-		case <-time.After(20 * time.Millisecond):
-			if flowDelRefCnt, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok || flowDelRefCnt == 0 {
-				logger.Debug(ctx, "pending-flow-deletes-completed")
-				ch <- true
-				return
-			}
-		case <-ctx.Done():
-			logger.Error(ctx, "flow-delete-wait-handler-routine-canceled")
-			return
-		}
-	}
-}
-
 //isIgmpTrapDownstreamFlow return true if the flow is a downsteam IGMP trap-to-host flow; false otherwise
 func isIgmpTrapDownstreamFlow(classifierInfo map[string]interface{}) bool {
 	if portType := IntfIDToPortTypeName(classifierInfo[InPort].(uint32)); portType == voltha.Port_ETHERNET_NNI {
@@ -2518,6 +2385,11 @@
 		return f.addIgmpTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
 	}
 
+	// If we are here it is not a trap-from-nni flow, i.e., it is subscriber specific flow.
+	// Wait for any FlowRemoves for that specific subscriber to finish first
+	// The goal here is to serialize FlowRemove and FlowAdd. FlowRemove take priority
+	f.waitForFlowRemoveToFinish(ctx, flow)
+
 	f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNo)
 
 	TpID, err := getTpIDFromFlow(ctx, flow)
@@ -2543,33 +2415,29 @@
 		logger.Debugw(ctx, "downstream-flow-meter-id", log.Fields{"ds-meter-id": DsMeterID})
 
 	}
+	return f.processAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
+}
 
-	pnFlDelKey := pendingFlowDeleteKey{intfID, onuID, uniID}
-	if _, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok {
-		logger.Debugw(ctx, "no-pending-flows-found--going-ahead-with-flow-install",
-			log.Fields{
-				"intf-id": intfID,
-				"onu-id":  onuID,
-				"uni-id":  uniID})
-		return f.divideAndAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
+//WaitForFlowRemoveToFinishForSubscriber blocks until flow removes are complete for a given subscriber
+func (f *OpenOltFlowMgr) WaitForFlowRemoveToFinishForSubscriber(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) {
+	var flowRemoveData pendingFlowRemoveData
+	var ok bool
+
+	key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
+	logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+
+	f.pendingFlowRemoveDataPerSubscriberLock.RLock()
+	if flowRemoveData, ok = f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
+		logger.Debugw(ctx, "no-pending-flow-to-remove", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+		f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
+		return
 	}
-	pendingFlowDelComplete := make(chan bool)
-	go f.waitForFlowDeletesToCompleteForOnu(ctx, intfID, onuID, uniID, pendingFlowDelComplete)
-	select {
-	case <-pendingFlowDelComplete:
-		logger.Debugw(ctx, "all-pending-flow-deletes-completed",
-			log.Fields{
-				"intf-id": intfID,
-				"onu-id":  onuID,
-				"uni-id":  uniID})
-		return f.divideAndAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
-	case <-time.After(10 * time.Second):
-		return olterrors.NewErrTimeout("pending-flow-deletes",
-			log.Fields{
-				"intf-id": intfID,
-				"onu-id":  onuID,
-				"uni-id":  uniID}, nil)
-	}
+	f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
+
+	// Wait for all flow removes to finish first
+	<-flowRemoveData.allFlowsRemoved
+
+	logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
 }
 
 // handleFlowWithGroup adds multicast flow to the device.
@@ -2643,9 +2511,9 @@
 	}
 	logger.Info(ctx, "multicast-flow-added-to-device-successfully")
 	//get cached group
-	if group, _, err := f.GetFlowGroupFromKVStore(ctx, groupID, true); err == nil {
+	if group, _, err := f.grpMgr.getFlowGroupFromKVStore(ctx, groupID, true); err == nil {
 		//calling groupAdd to set group members after multicast flow creation
-		if err := f.ModifyGroup(ctx, group); err != nil {
+		if err := f.grpMgr.ModifyGroup(ctx, group); err != nil {
 			return olterrors.NewErrGroupOp("modify", groupID, log.Fields{"group": group}, err)
 		}
 		//cached group can be removed now
@@ -2681,241 +2549,6 @@
 	return 0, olterrors.NewErrNotFound("nni-port", nil, e).Log()
 }
 
-// AddGroup add or update the group
-func (f *OpenOltFlowMgr) AddGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
-	logger.Infow(ctx, "add-group", log.Fields{"group": group})
-	if group == nil {
-		return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
-	}
-
-	groupToOlt := openoltpb2.Group{
-		GroupId: group.Desc.GroupId,
-		Command: openoltpb2.Group_SET_MEMBERS,
-		Action:  f.buildGroupAction(),
-	}
-
-	logger.Debugw(ctx, "sending-group-to-device", log.Fields{"groupToOlt": groupToOlt})
-	_, err := f.deviceHandler.Client.PerformGroupOperation(ctx, &groupToOlt)
-	if err != nil {
-		return olterrors.NewErrAdapter("add-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
-	}
-	// group members not created yet. So let's store the group
-	if err := f.resourceMgr.AddFlowGroupToKVStore(ctx, group, true); err != nil {
-		return olterrors.NewErrPersistence("add", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
-	}
-	logger.Infow(ctx, "add-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt})
-	return nil
-}
-
-// DeleteGroup deletes a group from the device
-func (f *OpenOltFlowMgr) DeleteGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
-	logger.Debugw(ctx, "delete-group", log.Fields{"group": group})
-	if group == nil {
-		logger.Error(ctx, "unable-to-delete-group--invalid-argument--group-is-nil")
-		return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
-	}
-
-	groupToOlt := openoltpb2.Group{
-		GroupId: group.Desc.GroupId,
-	}
-
-	logger.Debugw(ctx, "deleting-group-from-device", log.Fields{"groupToOlt": groupToOlt})
-	_, err := f.deviceHandler.Client.DeleteGroup(ctx, &groupToOlt)
-	if err != nil {
-		logger.Errorw(ctx, "delete-group-failed-on-dev", log.Fields{"groupToOlt": groupToOlt, "err": err})
-		return olterrors.NewErrAdapter("delete-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
-	}
-	//remove group from the store
-	if err := f.resourceMgr.RemoveFlowGroupFromKVStore(ctx, group.Desc.GroupId, false); err != nil {
-		return olterrors.NewErrPersistence("delete", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
-	}
-	logger.Debugw(ctx, "delete-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt})
-	return nil
-}
-
-//buildGroupAction creates and returns a group action
-func (f *OpenOltFlowMgr) buildGroupAction() *openoltpb2.Action {
-	var actionCmd openoltpb2.ActionCmd
-	var action openoltpb2.Action
-	action.Cmd = &actionCmd
-	//pop outer vlan
-	action.Cmd.RemoveOuterTag = true
-	return &action
-}
-
-// ModifyGroup updates the group
-func (f *OpenOltFlowMgr) ModifyGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
-	logger.Infow(ctx, "modify-group", log.Fields{"group": group})
-	if group == nil || group.Desc == nil {
-		return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
-	}
-
-	newGroup := f.buildGroup(ctx, group.Desc.GroupId, group.Desc.Buckets)
-	//get existing members of the group
-	val, groupExists, err := f.GetFlowGroupFromKVStore(ctx, group.Desc.GroupId, false)
-
-	if err != nil {
-		return olterrors.NewErrNotFound("flow-group-in-kv-store", log.Fields{"groupId": group.Desc.GroupId}, err)
-	}
-
-	var current *openoltpb2.Group // represents the group on the device
-	if groupExists {
-		// group already exists
-		current = f.buildGroup(ctx, group.Desc.GroupId, val.Desc.GetBuckets())
-		logger.Debugw(ctx, "modify-group--group exists",
-			log.Fields{
-				"group on the device": val,
-				"new":                 group})
-	} else {
-		current = f.buildGroup(ctx, group.Desc.GroupId, nil)
-	}
-
-	logger.Debugw(ctx, "modify-group--comparing-current-and-new",
-		log.Fields{
-			"group on the device": current,
-			"new":                 newGroup})
-	// get members to be added
-	membersToBeAdded := f.findDiff(current, newGroup)
-	// get members to be removed
-	membersToBeRemoved := f.findDiff(newGroup, current)
-
-	logger.Infow(ctx, "modify-group--differences found", log.Fields{
-		"membersToBeAdded":   membersToBeAdded,
-		"membersToBeRemoved": membersToBeRemoved,
-		"groupId":            group.Desc.GroupId})
-
-	groupToOlt := openoltpb2.Group{
-		GroupId: group.Desc.GroupId,
-	}
-	var errAdd, errRemoved error
-	if len(membersToBeAdded) > 0 {
-		groupToOlt.Command = openoltpb2.Group_ADD_MEMBERS
-		groupToOlt.Members = membersToBeAdded
-		//execute addMembers
-		errAdd = f.callGroupAddRemove(ctx, &groupToOlt)
-	}
-	if len(membersToBeRemoved) > 0 {
-		groupToOlt.Command = openoltpb2.Group_REMOVE_MEMBERS
-		groupToOlt.Members = membersToBeRemoved
-		//execute removeMembers
-		errRemoved = f.callGroupAddRemove(ctx, &groupToOlt)
-	}
-
-	//save the modified group
-	if errAdd == nil && errRemoved == nil {
-		if err := f.resourceMgr.AddFlowGroupToKVStore(ctx, group, false); err != nil {
-			return olterrors.NewErrPersistence("add", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
-		}
-		logger.Infow(ctx, "modify-group-was-success--storing-group",
-			log.Fields{
-				"group":         group,
-				"existingGroup": current})
-	} else {
-		logger.Warnw(ctx, "one-of-the-group-add/remove-operations-failed--cannot-save-group-modifications",
-			log.Fields{"group": group})
-		if errAdd != nil {
-			return errAdd
-		}
-		return errRemoved
-	}
-	return nil
-}
-
-//callGroupAddRemove performs add/remove buckets operation for the indicated group
-func (f *OpenOltFlowMgr) callGroupAddRemove(ctx context.Context, group *openoltpb2.Group) error {
-	if err := f.performGroupOperation(ctx, group); err != nil {
-		st, _ := status.FromError(err)
-		//ignore already exists error code
-		if st.Code() != codes.AlreadyExists {
-			return olterrors.NewErrGroupOp("groupAddRemove", group.GroupId, log.Fields{"status": st}, err)
-		}
-	}
-	return nil
-}
-
-//findDiff compares group members and finds members which only exists in groups2
-func (f *OpenOltFlowMgr) findDiff(group1 *openoltpb2.Group, group2 *openoltpb2.Group) []*openoltpb2.GroupMember {
-	var members []*openoltpb2.GroupMember
-	for _, bucket := range group2.Members {
-		if !f.contains(group1.Members, bucket) {
-			// bucket does not exist and must be added
-			members = append(members, bucket)
-		}
-	}
-	return members
-}
-
-//contains returns true if the members list contains the given member; false otherwise
-func (f *OpenOltFlowMgr) contains(members []*openoltpb2.GroupMember, member *openoltpb2.GroupMember) bool {
-	for _, groupMember := range members {
-		if groupMember.InterfaceId == member.InterfaceId {
-			return true
-		}
-	}
-	return false
-}
-
-//performGroupOperation call performGroupOperation operation of openolt proto
-func (f *OpenOltFlowMgr) performGroupOperation(ctx context.Context, group *openoltpb2.Group) error {
-	logger.Debugw(ctx, "sending-group-to-device",
-		log.Fields{
-			"groupToOlt": group,
-			"command":    group.Command})
-	_, err := f.deviceHandler.Client.PerformGroupOperation(log.WithSpanFromContext(context.Background(), ctx), group)
-	if err != nil {
-		return olterrors.NewErrAdapter("group-operation-failed", log.Fields{"groupToOlt": group}, err)
-	}
-	return nil
-}
-
-//buildGroup build openoltpb2.Group from given group id and bucket list
-func (f *OpenOltFlowMgr) buildGroup(ctx context.Context, groupID uint32, buckets []*ofp.OfpBucket) *openoltpb2.Group {
-	group := openoltpb2.Group{
-		GroupId: groupID}
-	// create members of the group
-	for _, ofBucket := range buckets {
-		member := f.buildMember(ctx, ofBucket)
-		if member != nil && !f.contains(group.Members, member) {
-			group.Members = append(group.Members, member)
-		}
-	}
-	return &group
-}
-
-//buildMember builds openoltpb2.GroupMember from an OpenFlow bucket
-func (f *OpenOltFlowMgr) buildMember(ctx context.Context, ofBucket *ofp.OfpBucket) *openoltpb2.GroupMember {
-	var outPort uint32
-	outPortFound := false
-	for _, ofAction := range ofBucket.Actions {
-		if ofAction.Type == ofp.OfpActionType_OFPAT_OUTPUT {
-			outPort = ofAction.GetOutput().Port
-			outPortFound = true
-		}
-	}
-
-	if !outPortFound {
-		logger.Debugw(ctx, "bucket-skipped-since-no-out-port-found-in-it", log.Fields{"ofBucket": ofBucket})
-		return nil
-	}
-	interfaceID := IntfIDFromUniPortNum(outPort)
-	logger.Debugw(ctx, "got-associated-interface-id-of-the-port",
-		log.Fields{
-			"portNumber:":  outPort,
-			"interfaceId:": interfaceID})
-	if groupInfo, ok := f.interfaceToMcastQueueMap[interfaceID]; ok {
-		member := openoltpb2.GroupMember{
-			InterfaceId:   interfaceID,
-			InterfaceType: openoltpb2.GroupMember_PON,
-			GemPortId:     groupInfo.gemPortID,
-			Priority:      groupInfo.servicePriority,
-		}
-		//add member to the group
-		return &member
-	}
-	logger.Warnf(ctx, "bucket-skipped-since-interface-2-gem-mapping-cannot-be-found", log.Fields{"ofBucket": ofBucket})
-	return nil
-}
-
 //sendTPDownloadMsgToChild send payload
 func (f *OpenOltFlowMgr) sendTPDownloadMsgToChild(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, uni string, TpID uint32) error {
 
@@ -2957,6 +2590,16 @@
 
 	f.onuGemInfoLock.Lock()
 	defer f.onuGemInfoLock.Unlock()
+	onugem := f.onuGemInfo[intfID]
+	// If the ONU already exists in onuGemInfo list, nothing to do
+	for _, onu := range onugem {
+		if onu.OnuID == onuID && onu.SerialNumber == serialNum {
+			logger.Debugw(ctx, "onu-id-already-exists-in-cache",
+				log.Fields{"onuID": onuID,
+					"serialNum": serialNum})
+			return nil
+		}
+	}
 
 	onu := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
 	f.onuGemInfo[intfID] = append(f.onuGemInfo[intfID], onu)
@@ -3002,6 +2645,7 @@
 			}
 			onugem[idx].GemPorts = append(onugem[idx].GemPorts, gemPort)
 			f.onuGemInfo[intfID] = onugem
+			break
 		}
 	}
 	err := f.resourceMgr.AddGemToOnuGemInfo(ctx, intfID, onuID, gemPort)
@@ -3037,7 +2681,6 @@
 			"onu-geminfo": f.onuGemInfo[intfID],
 			"intf-id":     intfID,
 			"gemport-id":  gemPortID})
-
 	// get onuid from the onugem info cache
 	onugem := f.onuGemInfo[intfID]
 
@@ -4010,52 +3653,150 @@
 	}
 }
 
-//loadInterfaceToMulticastQueueMap reads multicast queues per interface from the KV store
-//and put them into interfaceToMcastQueueMap.
-func (f *OpenOltFlowMgr) loadInterfaceToMulticastQueueMap(ctx context.Context) {
-	storedMulticastQueueMap, err := f.resourceMgr.GetMcastQueuePerInterfaceMap(ctx)
+//clearMulticastFlowFromResourceManager  removes a multicast flow from the KV store and
+// clears resources reserved for this multicast flow
+func (f *OpenOltFlowMgr) clearMulticastFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats) {
+	classifierInfo := make(map[string]interface{})
+	formulateClassifierInfoFromFlow(ctx, classifierInfo, flow)
+	networkInterfaceID, err := f.getNNIInterfaceIDOfMulticastFlow(ctx, classifierInfo)
+
 	if err != nil {
-		logger.Error(ctx, "failed-to-get-pon-interface-to-multicast-queue-map")
+		logger.Warnw(ctx, "no-inport-found--cannot-release-resources-of-the-multicast-flow", log.Fields{"flowId:": flow.Id})
 		return
 	}
-	for intf, queueInfo := range storedMulticastQueueMap {
-		q := queueInfoBrief{
-			gemPortID:       queueInfo[0],
-			servicePriority: queueInfo[1],
+
+	var onuID = int32(NoneOnuID)
+	var uniID = int32(NoneUniID)
+	var flowID uint32
+
+	flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, networkInterfaceID, onuID, uniID)
+
+	for _, flowID = range flowIds {
+		flowInfo := f.resourceMgr.GetFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flowID)
+		if flowInfo == nil {
+			logger.Debugw(ctx, "no-multicast-flowinfo-found-in-the-kv-store",
+				log.Fields{
+					"intf":    networkInterfaceID,
+					"onu-id":  onuID,
+					"uni-id":  uniID,
+					"flow-id": flowID})
+			continue
 		}
-		f.interfaceToMcastQueueMap[intf] = &q
+		updatedFlows := *flowInfo
+		for i, storedFlow := range updatedFlows {
+			if flow.Id == storedFlow.LogicalFlowID {
+				removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: storedFlow.Flow.FlowType}
+				logger.Debugw(ctx, "multicast-flow-to-be-deleted",
+					log.Fields{
+						"flow":      storedFlow,
+						"flow-id":   flow.Id,
+						"device-id": f.deviceHandler.device.Id})
+				//remove from device
+				if err := f.removeFlowFromDevice(ctx, &removeFlowMessage, flow.Id); err != nil {
+					// DKB
+					logger.Errorw(ctx, "failed-to-remove-multicast-flow",
+						log.Fields{
+							"flow-id": flow.Id,
+							"error":   err})
+					return
+				}
+				logger.Infow(ctx, "multicast-flow-removed-from-device-successfully", log.Fields{"flow-id": flow.Id})
+				//Remove the Flow from FlowInfo
+				updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
+				if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID), NoneOnuID, NoneUniID, flowID, &updatedFlows); err != nil {
+					logger.Errorw(ctx, "failed-to-delete-multicast-flow-from-the-kv-store",
+						log.Fields{"flow": storedFlow,
+							"err": err})
+					return
+				}
+				//release flow id
+				logger.Debugw(ctx, "releasing-multicast-flow-id",
+					log.Fields{"flow-id": flowID,
+						"interfaceID": networkInterfaceID})
+				f.resourceMgr.FreeFlowID(ctx, uint32(networkInterfaceID), NoneOnuID, NoneUniID, flowID)
+			}
+		}
 	}
 }
 
-//GetFlowGroupFromKVStore fetches and returns flow group from the KV store. Returns (nil, false, error) if any problem occurs during
-//fetching the data. Returns (group, true, nil) if the group is fetched and returned successfully.
-//Returns (nil, false, nil) if the group does not exists in the KV store.
-func (f *OpenOltFlowMgr) GetFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) (*ofp.OfpGroupEntry, bool, error) {
-	exists, groupInfo, err := f.resourceMgr.GetFlowGroupFromKVStore(ctx, groupID, cached)
-	if err != nil {
-		return nil, false, olterrors.NewErrNotFound("flow-group", log.Fields{"group-id": groupID}, err)
+func (f *OpenOltFlowMgr) incrementActiveFlowRemoveCount(ctx context.Context, flow *ofp.OfpFlowStats) {
+	inPort, outPort := getPorts(flow)
+	logger.Debugw(ctx, "increment-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
+	if inPort != InvalidPort && outPort != InvalidPort {
+		_, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
+		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
+		logger.Debugw(ctx, "increment-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+
+		f.pendingFlowRemoveDataPerSubscriberLock.Lock()
+		defer f.pendingFlowRemoveDataPerSubscriberLock.Unlock()
+		flowRemoveData, ok := f.pendingFlowRemoveDataPerSubscriber[key]
+		if !ok {
+			flowRemoveData = pendingFlowRemoveData{
+				pendingFlowRemoveCount: 0,
+				allFlowsRemoved:        make(chan struct{}),
+			}
+		}
+		flowRemoveData.pendingFlowRemoveCount++
+		f.pendingFlowRemoveDataPerSubscriber[key] = flowRemoveData
+
+		logger.Debugw(ctx, "current-flow-remove-count–increment",
+			log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
+				"currCnt": f.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
 	}
-	if exists {
-		return newGroup(groupInfo.GroupID, groupInfo.OutPorts), exists, nil
-	}
-	return nil, exists, nil
 }
 
-func newGroup(groupID uint32, outPorts []uint32) *ofp.OfpGroupEntry {
-	groupDesc := ofp.OfpGroupDesc{
-		Type:    ofp.OfpGroupType_OFPGT_ALL,
-		GroupId: groupID,
-	}
-	groupEntry := ofp.OfpGroupEntry{
-		Desc: &groupDesc,
-	}
-	for i := 0; i < len(outPorts); i++ {
-		var acts []*ofp.OfpAction
-		acts = append(acts, flows.Output(outPorts[i]))
-		bucket := ofp.OfpBucket{
-			Actions: acts,
+func (f *OpenOltFlowMgr) decrementActiveFlowRemoveCount(ctx context.Context, flow *ofp.OfpFlowStats) {
+	inPort, outPort := getPorts(flow)
+	logger.Debugw(ctx, "decrement-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
+	if inPort != InvalidPort && outPort != InvalidPort {
+		_, intfID, onuID, uniID := ExtractAccessFromFlow(uint32(inPort), uint32(outPort))
+		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
+		logger.Debugw(ctx, "decrement-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+
+		f.pendingFlowRemoveDataPerSubscriberLock.Lock()
+		defer f.pendingFlowRemoveDataPerSubscriberLock.Unlock()
+		if val, ok := f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
+			logger.Fatalf(ctx, "flow-remove-key-not-found", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+		} else {
+			if val.pendingFlowRemoveCount > 0 {
+				val.pendingFlowRemoveCount--
+			}
+			logger.Debugw(ctx, "current-flow-remove-count-after-decrement",
+				log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
+					"currCnt": f.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
+			// If all flow removes have finished, then close the channel to signal the receiver
+			// to go ahead with flow adds.
+			if val.pendingFlowRemoveCount == 0 {
+				close(val.allFlowsRemoved)
+				delete(f.pendingFlowRemoveDataPerSubscriber, key)
+				return
+			}
+			f.pendingFlowRemoveDataPerSubscriber[key] = val
 		}
-		groupDesc.Buckets = append(groupDesc.Buckets, &bucket)
 	}
-	return &groupEntry
+}
+
+func (f *OpenOltFlowMgr) waitForFlowRemoveToFinish(ctx context.Context, flow *ofp.OfpFlowStats) {
+	var flowRemoveData pendingFlowRemoveData
+	var ok bool
+	inPort, outPort := getPorts(flow)
+	logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
+	if inPort != InvalidPort && outPort != InvalidPort {
+		_, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
+		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
+		logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+
+		f.pendingFlowRemoveDataPerSubscriberLock.RLock()
+		if flowRemoveData, ok = f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
+			logger.Debugw(ctx, "no-pending-flow-to-remove", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+			f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
+			return
+		}
+		f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
+
+		// Wait for all flow removes to finish first
+		<-flowRemoveData.allFlowsRemoved
+
+		logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+	}
 }
diff --git a/internal/pkg/core/openolt_flowmgr_test.go b/internal/pkg/core/openolt_flowmgr_test.go
index 490a40c..b75bfac 100644
--- a/internal/pkg/core/openolt_flowmgr_test.go
+++ b/internal/pkg/core/openolt_flowmgr_test.go
@@ -42,7 +42,7 @@
 	tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
 )
 
-var flowMgr *OpenOltFlowMgr
+var flowMgr []*OpenOltFlowMgr
 
 func init() {
 	_, _ = log.SetDefaultLogger(log.JSON, log.DebugLevel, nil)
@@ -58,8 +58,8 @@
 
 	deviceinfo := &openolt.DeviceInfo{Vendor: "openolt", Model: "openolt", HardwareVersion: "1.0", FirmwareVersion: "1.0",
 		DeviceId: "olt", DeviceSerialNumber: "openolt", PonPorts: 16, Technology: "Default",
-		OnuIdStart: 1, OnuIdEnd: 1, AllocIdStart: 1, AllocIdEnd: 1,
-		GemportIdStart: 1, GemportIdEnd: 1, FlowIdStart: 1, FlowIdEnd: 1,
+		OnuIdStart: OnuIDStart, OnuIdEnd: OnuIDEnd, AllocIdStart: AllocIDStart, AllocIdEnd: AllocIDEnd,
+		GemportIdStart: GemIDStart, GemportIdEnd: GemIDEnd, FlowIdStart: FlowIDStart, FlowIdEnd: FlowIDEnd,
 		Ranges: ranges,
 	}
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -73,7 +73,7 @@
 	return rsrMgr
 }
 
-func newMockFlowmgr() *OpenOltFlowMgr {
+func newMockFlowmgr() []*OpenOltFlowMgr {
 	rMgr := newMockResourceMgr()
 	dh := newMockDeviceHandler()
 
@@ -81,40 +81,32 @@
 	rMgr.KVStore.Client = &mocks.MockKVClient{}
 
 	dh.resourceMgr = rMgr
-	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-	defer cancel()
-	flwMgr := NewFlowManager(ctx, dh, rMgr)
 
-	onuGemInfo1 := make([]rsrcMgr.OnuGemInfo, 2)
-	onuGemInfo2 := make([]rsrcMgr.OnuGemInfo, 2)
-	onuGemInfo1[0] = rsrcMgr.OnuGemInfo{OnuID: 1, SerialNumber: "1", IntfID: 1, GemPorts: []uint32{1}}
-	onuGemInfo2[1] = rsrcMgr.OnuGemInfo{OnuID: 2, SerialNumber: "2", IntfID: 2, GemPorts: []uint32{2}}
-	flwMgr.onuGemInfo[1] = onuGemInfo1
-	flwMgr.onuGemInfo[2] = onuGemInfo2
+	// onuGemInfo := make([]rsrcMgr.OnuGemInfo, NumPonPorts)
+	var i uint32
 
-	packetInGemPort := make(map[rsrcMgr.PacketInInfoKey]uint32)
-	packetInGemPort[rsrcMgr.PacketInInfoKey{IntfID: 1, OnuID: 1, LogicalPort: 1}] = 1
-	packetInGemPort[rsrcMgr.PacketInInfoKey{IntfID: 2, OnuID: 2, LogicalPort: 2}] = 2
+	for i = 0; i < NumPonPorts; i++ {
+		packetInGemPort := make(map[rsrcMgr.PacketInInfoKey]uint32)
+		packetInGemPort[rsrcMgr.PacketInInfoKey{IntfID: i, OnuID: i + 1, LogicalPort: i + 1, VlanID: uint16(i), Priority: uint8(i)}] = i + 1
 
-	flwMgr.packetInGemPort = packetInGemPort
-	tps := make(map[uint32]tp.TechProfileIf)
-	for key := range rMgr.ResourceMgrs {
-		tps[key] = mocks.MockTechProfile{TpID: key}
+		dh.flowMgr[i].packetInGemPort = packetInGemPort
+		tps := make(map[uint32]tp.TechProfileIf)
+		for key := range rMgr.ResourceMgrs {
+			tps[key] = mocks.MockTechProfile{TpID: key}
+		}
+		dh.flowMgr[i].techprofile = tps
+		interface2mcastQeueuMap := make(map[uint32]*QueueInfoBrief)
+		interface2mcastQeueuMap[0] = &QueueInfoBrief{
+			gemPortID:       4000,
+			servicePriority: 3,
+		}
+		dh.flowMgr[i].grpMgr.interfaceToMcastQueueMap = interface2mcastQeueuMap
 	}
-	flwMgr.techprofile = tps
 
-	interface2mcastQeueuMap := make(map[uint32]*queueInfoBrief)
-	interface2mcastQeueuMap[0] = &queueInfoBrief{
-		gemPortID:       4000,
-		servicePriority: 3,
-	}
-	flwMgr.interfaceToMcastQueueMap = interface2mcastQeueuMap
-	return flwMgr
+	return dh.flowMgr
 }
 
 func TestOpenOltFlowMgr_CreateSchedulerQueues(t *testing.T) {
-	// flowMgr := newMockFlowmgr()
-
 	tprofile := &tp.TechProfile{Name: "tp1", SubscriberIdentifier: "subscriber1",
 		ProfileType: "pt1", NumGemPorts: 1, Version: 1,
 		InstanceCtrl: tp.InstanceControl{Onu: "1", Uni: "1", MaxGemPayloadSize: "1"},
@@ -140,26 +132,26 @@
 		wantErr    bool
 	}{
 		// TODO: Add test cases.
-		{"CreateSchedulerQueues-1", schedQueue{tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, tprofile, 1, flowmetadata}, false},
-		{"CreateSchedulerQueues-2", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 1, flowmetadata}, false},
-		{"CreateSchedulerQueues-3", schedQueue{tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, tprofile, 2, flowmetadata}, true},
-		{"CreateSchedulerQueues-4", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 2, flowmetadata}, true},
-		{"CreateSchedulerQueues-5", schedQueue{tp_pb.Direction_UPSTREAM, 2, 2, 2, 64, 2, tprofile, 2, flowmetadata}, true},
-		{"CreateSchedulerQueues-6", schedQueue{tp_pb.Direction_DOWNSTREAM, 2, 2, 2, 65, 2, tprofile2, 2, flowmetadata}, true},
-		{"CreateSchedulerQueues-13", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 1, flowmetadata}, false},
+		{"CreateSchedulerQueues-1", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 1, flowmetadata}, false},
+		{"CreateSchedulerQueues-2", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, flowmetadata}, false},
+		{"CreateSchedulerQueues-3", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 2, flowmetadata}, true},
+		{"CreateSchedulerQueues-4", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 2, flowmetadata}, true},
+		{"CreateSchedulerQueues-5", schedQueue{tp_pb.Direction_UPSTREAM, 1, 2, 2, 64, 2, tprofile, 2, flowmetadata}, true},
+		{"CreateSchedulerQueues-6", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 2, 2, 65, 2, tprofile2, 2, flowmetadata}, true},
+		{"CreateSchedulerQueues-13", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, flowmetadata}, false},
 		//Negative testcases
-		{"CreateSchedulerQueues-7", schedQueue{tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, tprofile, 1, &voltha.FlowMetadata{}}, true},
-		{"CreateSchedulerQueues-8", schedQueue{tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, tprofile, 0, &voltha.FlowMetadata{}}, true},
-		{"CreateSchedulerQueues-9", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 1, &voltha.FlowMetadata{}}, true},
-		{"CreateSchedulerQueues-10", schedQueue{tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, tprofile, 2, &voltha.FlowMetadata{}}, true},
-		{"CreateSchedulerQueues-11", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 2, &voltha.FlowMetadata{}}, true},
-		{"CreateSchedulerQueues-12", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 2, nil}, true},
+		{"CreateSchedulerQueues-7", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 1, &voltha.FlowMetadata{}}, true},
+		{"CreateSchedulerQueues-8", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 0, &voltha.FlowMetadata{}}, true},
+		{"CreateSchedulerQueues-9", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, &voltha.FlowMetadata{}}, true},
+		{"CreateSchedulerQueues-10", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 2, &voltha.FlowMetadata{}}, true},
+		{"CreateSchedulerQueues-11", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 2, &voltha.FlowMetadata{}}, true},
+		{"CreateSchedulerQueues-12", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 2, nil}, true},
 	}
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			if err := flowMgr.CreateSchedulerQueues(ctx, tt.schedQueue); (err != nil) != tt.wantErr {
+			if err := flowMgr[tt.schedQueue.intfID].CreateSchedulerQueues(ctx, tt.schedQueue); (err != nil) != tt.wantErr {
 				t.Errorf("OpenOltFlowMgr.CreateSchedulerQueues() error = %v, wantErr %v", err, tt.wantErr)
 			}
 		})
@@ -167,8 +159,6 @@
 }
 
 func TestOpenOltFlowMgr_RemoveSchedulerQueues(t *testing.T) {
-
-	// flowMgr := newMockFlowmgr()
 	tprofile := &tp.TechProfile{Name: "tp1", SubscriberIdentifier: "subscriber1",
 		ProfileType: "pt1", NumGemPorts: 1, Version: 1,
 		InstanceCtrl: tp.InstanceControl{Onu: "1", Uni: "1", MaxGemPayloadSize: "1"},
@@ -198,7 +188,7 @@
 	defer cancel()
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			if err := flowMgr.RemoveSchedulerQueues(ctx, tt.schedQueue); (err != nil) != tt.wantErr {
+			if err := flowMgr[tt.schedQueue.intfID].RemoveSchedulerQueues(ctx, tt.schedQueue); (err != nil) != tt.wantErr {
 				t.Errorf("OpenOltFlowMgr.RemoveSchedulerQueues() error = %v, wantErr %v", err, tt.wantErr)
 			}
 		})
@@ -207,7 +197,6 @@
 }
 
 func TestOpenOltFlowMgr_createTcontGemports(t *testing.T) {
-	// flowMgr := newMockFlowmgr()
 	bands := make([]*ofp.OfpMeterBandHeader, 2)
 	bands[0] = &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 1000, BurstSize: 5000, Data: &ofp.OfpMeterBandHeader_Drop{}}
 	bands[1] = &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 2000, BurstSize: 5000, Data: &ofp.OfpMeterBandHeader_Drop{}}
@@ -237,7 +226,7 @@
 	defer cancel()
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			_, _, tpInst := flowMgr.createTcontGemports(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID, tt.args.uni, tt.args.uniPort, tt.args.TpID, tt.args.UsMeterID, tt.args.DsMeterID, tt.args.flowMetadata)
+			_, _, tpInst := flowMgr[tt.args.intfID].createTcontGemports(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID, tt.args.uni, tt.args.uniPort, tt.args.TpID, tt.args.UsMeterID, tt.args.DsMeterID, tt.args.flowMetadata)
 			switch tpInst := tpInst.(type) {
 			case *tp.TechProfile:
 				if tt.args.TpID != 64 {
@@ -256,7 +245,6 @@
 
 func TestOpenOltFlowMgr_RemoveFlow(t *testing.T) {
 	ctx := context.Background()
-	// flowMgr := newMockFlowmgr()
 	logger.Debug(ctx, "Info Warning Error: Starting RemoveFlow() test")
 	fa := &fu.FlowArgs{
 		MatchFields: []*ofp.OfpOxmOfbField{
@@ -334,7 +322,7 @@
 	defer cancel()
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			if err := flowMgr.RemoveFlow(ctx, tt.args.flow); err != nil {
+			if err := flowMgr[0].RemoveFlow(ctx, tt.args.flow); err != nil {
 				logger.Warn(ctx, err)
 			}
 		})
@@ -343,7 +331,6 @@
 }
 
 func TestOpenOltFlowMgr_AddFlow(t *testing.T) {
-	// flowMgr := newMockFlowmgr()
 	kw := make(map[string]uint64)
 	kw["table_id"] = 1
 	kw["meter_id"] = 1
@@ -581,29 +568,27 @@
 	defer cancel()
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			_ = flowMgr.AddFlow(ctx, tt.args.flow, tt.args.flowMetadata)
+			_ = flowMgr[0].AddFlow(ctx, tt.args.flow, tt.args.flowMetadata)
 			// TODO: actually verify test cases
 		})
 	}
 }
 
 func TestOpenOltFlowMgr_UpdateOnuInfo(t *testing.T) {
-	flwMgr := newMockFlowmgr()
-
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
 
 	wg := sync.WaitGroup{}
 
-	intfCount := 16
-	onuCount := 32
+	intfCount := NumPonPorts
+	onuCount := OnuIDEnd - OnuIDStart + 1
 
 	for i := 0; i < intfCount; i++ {
-		for j := 0; j < onuCount; j++ {
+		for j := 1; j <= onuCount; j++ {
 			wg.Add(1)
 			go func(i uint32, j uint32) {
 				// TODO: actually verify success
-				_ = flwMgr.UpdateOnuInfo(ctx, i, i, fmt.Sprintf("onu-%d", i))
+				_ = flowMgr[i].UpdateOnuInfo(ctx, i, i, fmt.Sprintf("onu-%d", i))
 				wg.Done()
 			}(uint32(i), uint32(j))
 		}
@@ -614,34 +599,35 @@
 }
 
 func TestOpenOltFlowMgr_addGemPortToOnuInfoMap(t *testing.T) {
-	flowMgr = newMockFlowmgr()
-	intfNum := 16
-	onuNum := 32
+	intfNum := NumPonPorts
+	onuNum := OnuIDEnd - OnuIDStart + 1
 
 	// clean the flowMgr
-	flowMgr.onuGemInfo = make(map[uint32][]rsrcMgr.OnuGemInfo, intfNum)
+	for i := 0; i < intfNum; i++ {
+		flowMgr[i].onuGemInfo = make(map[uint32][]rsrcMgr.OnuGemInfo, intfNum)
+	}
 
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
 
 	// Create OnuInfo
 	for i := 0; i < intfNum; i++ {
-		for o := 0; o < onuNum; o++ {
+		for o := 1; o <= onuNum; o++ {
 			// TODO: actually verify success
-			_ = flowMgr.UpdateOnuInfo(ctx, uint32(i), uint32(o), fmt.Sprintf("i%do%d", i, o))
+			_ = flowMgr[i].UpdateOnuInfo(ctx, uint32(i), uint32(o), fmt.Sprintf("i%do%d", i, o-1))
 		}
 	}
 
 	// Add gemPorts to OnuInfo in parallel threads
 	wg := sync.WaitGroup{}
 
-	for o := 0; o < onuNum; o++ {
+	for o := 1; o <= onuNum; o++ {
 		for i := 0; i < intfNum; i++ {
 			wg.Add(1)
 			go func(intfId uint32, onuId uint32) {
-				gemID, _ := strconv.Atoi(fmt.Sprintf("90%d%d", intfId, onuId))
+				gemID, _ := strconv.Atoi(fmt.Sprintf("90%d%d", intfId, onuId-1))
 
-				flowMgr.addGemPortToOnuInfoMap(ctx, intfId, onuId, uint32(gemID))
+				flowMgr[intfId].addGemPortToOnuInfoMap(ctx, intfId, onuId, uint32(gemID))
 				wg.Done()
 			}(uint32(i), uint32(o))
 		}
@@ -651,21 +637,21 @@
 
 	// check that each entry of onuGemInfo has the correct number of ONUs
 	for i := 0; i < intfNum; i++ {
-		lenofOnu := len(flowMgr.onuGemInfo[uint32(i)])
+		lenofOnu := len(flowMgr[i].onuGemInfo[uint32(i)])
 		if onuNum != lenofOnu {
 			t.Errorf("OnuGemInfo length is not as expected len = %d, want %d", lenofOnu, onuNum)
 		}
 
-		for o := 0; o < onuNum; o++ {
-			lenOfGemPorts := len(flowMgr.onuGemInfo[uint32(i)][o].GemPorts)
+		for o := 1; o <= onuNum; o++ {
+			lenOfGemPorts := len(flowMgr[i].onuGemInfo[uint32(i)][o-1].GemPorts)
 			// check that each onuEntry has 1 gemPort
 			if lenOfGemPorts != 1 {
 				t.Errorf("Expected 1 GemPort per ONU, found %d", lenOfGemPorts)
 			}
 
 			// check that the value of the gemport is correct
-			gemID, _ := strconv.Atoi(fmt.Sprintf("90%d%d", i, o))
-			currentValue := flowMgr.onuGemInfo[uint32(i)][o].GemPorts[0]
+			gemID, _ := strconv.Atoi(fmt.Sprintf("90%d%d", i, o-1))
+			currentValue := flowMgr[i].onuGemInfo[uint32(i)][o-1].GemPorts[0]
 			if uint32(gemID) != currentValue {
 				t.Errorf("Expected GemPort value to be %d, found %d", gemID, currentValue)
 			}
@@ -674,7 +660,8 @@
 }
 
 func TestOpenOltFlowMgr_deleteGemPortFromLocalCache(t *testing.T) {
-	flwMgr := newMockFlowmgr()
+	// Create fresh flowMgr instance
+	flowMgr = newMockFlowmgr()
 	type args struct {
 		intfID                uint32
 		onuID                 uint32
@@ -704,18 +691,18 @@
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			// TODO: should check returned errors are as expected?
-			_ = flwMgr.UpdateOnuInfo(ctx, tt.args.intfID, tt.args.onuID, tt.args.serialNum)
+			_ = flowMgr[tt.args.intfID].UpdateOnuInfo(ctx, tt.args.intfID, tt.args.onuID, tt.args.serialNum)
 			for _, gemPort := range tt.args.gemPortIDs {
-				flwMgr.addGemPortToOnuInfoMap(ctx, tt.args.intfID, tt.args.onuID, gemPort)
+				flowMgr[tt.args.intfID].addGemPortToOnuInfoMap(ctx, tt.args.intfID, tt.args.onuID, gemPort)
 			}
 			for _, gemPortDeleted := range tt.args.gemPortIDsToBeDeleted {
-				flwMgr.deleteGemPortFromLocalCache(ctx, tt.args.intfID, tt.args.onuID, gemPortDeleted)
+				flowMgr[tt.args.intfID].deleteGemPortFromLocalCache(ctx, tt.args.intfID, tt.args.onuID, gemPortDeleted)
 			}
-			lenofGemPorts := len(flwMgr.onuGemInfo[tt.args.intfID][0].GemPorts)
+			lenofGemPorts := len(flowMgr[tt.args.intfID].onuGemInfo[tt.args.intfID][0].GemPorts)
 			if lenofGemPorts != tt.args.finalLength {
 				t.Errorf("GemPorts length is not as expected len = %d, want %d", lenofGemPorts, tt.args.finalLength)
 			}
-			gemPorts := flwMgr.onuGemInfo[tt.args.intfID][0].GemPorts
+			gemPorts := flowMgr[tt.args.intfID].onuGemInfo[tt.args.intfID][0].GemPorts
 			if !reflect.DeepEqual(tt.args.gemPortIDsRemaining, gemPorts) {
 				t.Errorf("GemPorts are not as expected = %v, want %v", gemPorts, tt.args.gemPortIDsRemaining)
 			}
@@ -725,7 +712,6 @@
 }
 
 func TestOpenOltFlowMgr_GetLogicalPortFromPacketIn(t *testing.T) {
-	flwMgr := newMockFlowmgr()
 	type args struct {
 		packetIn *openoltpb2.PacketIndication
 	}
@@ -736,18 +722,18 @@
 		wantErr bool
 	}{
 		// TODO: Add test cases.
-		{"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 1, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1, false},
-		{"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "nni", IntfId: 1, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1048577, false},
+		{"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 0, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1, false},
+		{"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "nni", IntfId: 0, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1048576, false},
 		// Negative Test cases.
-		{"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 2, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 0, true},
-		{"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 1, GemportId: 1, FlowId: 100, PortNo: 0, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 4112, false},
+		{"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 1, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 0, true},
+		{"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 0, GemportId: 1, FlowId: 100, PortNo: 0, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 16, false},
 	}
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 
-			got, err := flwMgr.GetLogicalPortFromPacketIn(ctx, tt.args.packetIn)
+			got, err := flowMgr[tt.args.packetIn.IntfId].GetLogicalPortFromPacketIn(ctx, tt.args.packetIn)
 			if (err != nil) != tt.wantErr {
 				t.Errorf("OpenOltFlowMgr.GetLogicalPortFromPacketIn() error = %v, wantErr %v", err, tt.wantErr)
 				return
@@ -760,7 +746,8 @@
 }
 
 func TestOpenOltFlowMgr_GetPacketOutGemPortID(t *testing.T) {
-	// flwMgr := newMockFlowmgr()
+	// Create fresh flowMgr instance
+	flowMgr = newMockFlowmgr()
 
 	//untagged packet in hex string
 	untaggedStr := "01005e000002000000000001080046c00020000040000102fa140a000001e00000029404000017000705e10000fa"
@@ -769,15 +756,15 @@
 		t.Error("Unable to parse hex string", err)
 		panic(err)
 	}
-	//single-tagged packet in hex string. vlanID.pbit: 540.0
-	singleTaggedStr := "01005e0000010025ba48172481000225080046c0002000004000010257deab140023e0000001940400001164ee9b0000000000000000000000000000"
+	//single-tagged packet in hex string. vlanID.pbit: 1.1
+	singleTaggedStr := "01005e0000010025ba48172481002001080046c0002000004000010257deab140023e0000001940400001164ee9b0000000000000000000000000000"
 	singleTagged, err := hex.DecodeString(singleTaggedStr)
 	if err != nil {
 		t.Error("Unable to parse hex string", err)
 		panic(err)
 	}
-	//double-tagged packet in hex string. vlanID.pbit: 210.0-48.7
-	doubleTaggedStr := "01005e000016deadbeefba11810002108100e030080046000028000000000102c5b87f000001e0000016940400002200f8030000000104000000e10000fa"
+	//double-tagged packet in hex string. vlanID.pbit: 210.0-0.0
+	doubleTaggedStr := "01005e000016deadbeefba118100021081000000080046000028000000000102c5b87f000001e0000016940400002200f8030000000104000000e10000fa"
 	doubleTagged, err := hex.DecodeString(doubleTaggedStr)
 	if err != nil {
 		t.Error("Unable to parse hex string", err)
@@ -797,11 +784,11 @@
 		wantErr bool
 	}{
 		// TODO: Add test cases.
-		{"GetPacketOutGemPortID", args{intfID: 1, onuID: 1, portNum: 3, packet: untagged}, 3, false},
-		{"GetPacketOutGemPortID", args{intfID: 2, onuID: 2, portNum: 4, packet: singleTagged}, 4, false},
-		{"GetPacketOutGemPortID", args{intfID: 1, onuID: 2, portNum: 2, packet: doubleTagged}, 2, false},
-		{"GetPacketOutGemPortID", args{intfID: 1, onuID: 10, portNum: 10, packet: untagged}, 2, true},
-		{"GetPacketOutGemPortID", args{intfID: 1, onuID: 1, portNum: 3, packet: []byte{}}, 3, true},
+		{"GetPacketOutGemPortID", args{intfID: 0, onuID: 1, portNum: 1, packet: untagged}, 1, false},
+		{"GetPacketOutGemPortID", args{intfID: 1, onuID: 2, portNum: 2, packet: singleTagged}, 2, false},
+		{"GetPacketOutGemPortID", args{intfID: 0, onuID: 1, portNum: 1, packet: doubleTagged}, 1, false},
+		{"GetPacketOutGemPortID", args{intfID: 0, onuID: 10, portNum: 10, packet: untagged}, 2, true},
+		{"GetPacketOutGemPortID", args{intfID: 0, onuID: 1, portNum: 3, packet: []byte{}}, 3, true},
 	}
 
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -809,7 +796,7 @@
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 
-			got, err := flowMgr.GetPacketOutGemPortID(ctx, tt.args.intfID, tt.args.onuID, tt.args.portNum, tt.args.packet)
+			got, err := flowMgr[tt.args.intfID].GetPacketOutGemPortID(ctx, tt.args.intfID, tt.args.onuID, tt.args.portNum, tt.args.packet)
 			if tt.wantErr {
 				if err == nil {
 					//error expected but got value
@@ -830,7 +817,6 @@
 }
 
 func TestOpenOltFlowMgr_DeleteTechProfileInstance(t *testing.T) {
-	// flwMgr := newMockFlowmgr()
 	type args struct {
 		intfID uint32
 		onuID  uint32
@@ -850,7 +836,7 @@
 	defer cancel()
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			if err := flowMgr.DeleteTechProfileInstance(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID, tt.args.sn, tt.args.tpID); (err != nil) != tt.wantErr {
+			if err := flowMgr[tt.args.intfID].DeleteTechProfileInstance(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID, tt.args.sn, tt.args.tpID); (err != nil) != tt.wantErr {
 				t.Errorf("OpenOltFlowMgr.DeleteTechProfileInstance() error = %v, wantErr %v", err, tt.wantErr)
 			}
 		})
@@ -859,7 +845,6 @@
 
 func TestOpenOltFlowMgr_checkAndAddFlow(t *testing.T) {
 	ctx := context.Background()
-	// flowMgr := newMockFlowmgr()
 	kw := make(map[string]uint64)
 	kw["table_id"] = 1
 	kw["meter_id"] = 1
@@ -1097,7 +1082,7 @@
 	defer cancel()
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			flowMgr.checkAndAddFlow(ctx, tt.args.args, tt.args.classifierInfo, tt.args.actionInfo, tt.args.flow,
+			flowMgr[tt.args.intfID].checkAndAddFlow(ctx, tt.args.args, tt.args.classifierInfo, tt.args.actionInfo, tt.args.flow,
 				tt.args.TpInst, tt.args.gemPorts, tt.args.TpID, tt.args.uni)
 		})
 	}
@@ -1108,7 +1093,7 @@
 	defer cancel()
 	//create group
 	group := newGroup(2, []uint32{1})
-	err := flowMgr.AddGroup(ctx, group)
+	err := flowMgr[0].grpMgr.AddGroup(ctx, group)
 	if err != nil {
 		t.Error("group-add failed", err)
 		return
@@ -1128,7 +1113,7 @@
 	}
 	ofpStats, _ := fu.MkFlowStat(multicastFlowArgs)
 	fmt.Println(ofpStats.Id)
-	err = flowMgr.AddFlow(ctx, ofpStats, &voltha.FlowMetadata{})
+	err = flowMgr[0].AddFlow(ctx, ofpStats, &voltha.FlowMetadata{})
 	if err != nil {
 		t.Error("Multicast flow-add failed", err)
 		return
@@ -1136,20 +1121,20 @@
 
 	//add bucket to the group
 	group = newGroup(2, []uint32{1, 2})
-	err = flowMgr.ModifyGroup(ctx, group)
+	err = flowMgr[0].grpMgr.ModifyGroup(ctx, group)
 	if err != nil {
 		t.Error("modify-group failed", err)
 		return
 	}
 	//remove the multicast flow
-	err = flowMgr.RemoveFlow(ctx, ofpStats)
+	err = flowMgr[0].RemoveFlow(ctx, ofpStats)
 	if err != nil {
 		t.Error("Multicast flow-remove failed", err)
 		return
 	}
 
 	//remove the group
-	err = flowMgr.DeleteGroup(ctx, group)
+	err = flowMgr[0].grpMgr.DeleteGroup(ctx, group)
 	if err != nil {
 		t.Error("delete-group failed", err)
 		return
diff --git a/internal/pkg/core/openolt_groupmgr.go b/internal/pkg/core/openolt_groupmgr.go
new file mode 100644
index 0000000..d14d24b
--- /dev/null
+++ b/internal/pkg/core/openolt_groupmgr.go
@@ -0,0 +1,350 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//Package core provides the utility for olt devices, flows, groups and statistics
+package core
+
+import (
+	"context"
+	"github.com/opencord/voltha-lib-go/v3/pkg/flows"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
+	rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
+	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+	openoltpb2 "github.com/opencord/voltha-protos/v3/go/openolt"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"sync"
+)
+
+//QueueInfoBrief has information about gemPortID and service priority associated with Mcast group
+type QueueInfoBrief struct {
+	gemPortID       uint32
+	servicePriority uint32
+}
+
+//OpenOltGroupMgr creates the Structure of OpenOltGroupMgr obj
+type OpenOltGroupMgr struct {
+	deviceHandler                *DeviceHandler
+	resourceMgr                  *rsrcMgr.OpenOltResourceMgr
+	interfaceToMcastQueueMap     map[uint32]*QueueInfoBrief /*pon interface -> multicast queue map. Required to assign GEM to a bucket during group population*/
+	interfaceToMcastQueueMapLock sync.RWMutex
+}
+
+//////////////////////////////////////////////
+//            EXPORTED FUNCTIONS            //
+//////////////////////////////////////////////
+
+//NewGroupManager creates OpenOltGroupMgr object and initializes the parameters
+func NewGroupManager(ctx context.Context, dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr) *OpenOltGroupMgr {
+	logger.Infow(ctx, "initializing-flow-manager", log.Fields{"device-id": dh.device.Id})
+	var grpMgr OpenOltGroupMgr
+	grpMgr.deviceHandler = dh
+	grpMgr.resourceMgr = rMgr
+	grpMgr.interfaceToMcastQueueMap = make(map[uint32]*QueueInfoBrief)
+	grpMgr.interfaceToMcastQueueMapLock = sync.RWMutex{}
+	logger.Info(ctx, "initialization-of-group-manager-success")
+	return &grpMgr
+}
+
+// AddGroup add or update the group
+func (g *OpenOltGroupMgr) AddGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
+	logger.Infow(ctx, "add-group", log.Fields{"group": group})
+	if group == nil {
+		return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
+	}
+	groupToOlt := openoltpb2.Group{
+		GroupId: group.Desc.GroupId,
+		Command: openoltpb2.Group_SET_MEMBERS,
+		Action:  g.buildGroupAction(),
+	}
+	logger.Debugw(ctx, "sending-group-to-device", log.Fields{"groupToOlt": groupToOlt})
+	_, err := g.deviceHandler.Client.PerformGroupOperation(ctx, &groupToOlt)
+	if err != nil {
+		return olterrors.NewErrAdapter("add-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
+	}
+	// group members not created yet. So let's store the group
+	if err := g.resourceMgr.AddFlowGroupToKVStore(ctx, group, true); err != nil {
+		return olterrors.NewErrPersistence("add", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
+	}
+	logger.Infow(ctx, "add-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt})
+	return nil
+}
+
+// DeleteGroup deletes a group from the device
+func (g *OpenOltGroupMgr) DeleteGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
+	logger.Debugw(ctx, "delete-group", log.Fields{"group": group})
+	if group == nil {
+		logger.Error(ctx, "unable-to-delete-group--invalid-argument--group-is-nil")
+		return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
+	}
+	groupToOlt := openoltpb2.Group{
+		GroupId: group.Desc.GroupId,
+	}
+	logger.Debugw(ctx, "deleting-group-from-device", log.Fields{"groupToOlt": groupToOlt})
+	_, err := g.deviceHandler.Client.DeleteGroup(ctx, &groupToOlt)
+	if err != nil {
+		logger.Errorw(ctx, "delete-group-failed-on-dev", log.Fields{"groupToOlt": groupToOlt, "err": err})
+		return olterrors.NewErrAdapter("delete-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
+	}
+	//remove group from the store
+	if err := g.resourceMgr.RemoveFlowGroupFromKVStore(ctx, group.Desc.GroupId, false); err != nil {
+		return olterrors.NewErrPersistence("delete", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
+	}
+	logger.Debugw(ctx, "delete-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt})
+	return nil
+}
+
+// ModifyGroup updates the group
+func (g *OpenOltGroupMgr) ModifyGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
+	logger.Infow(ctx, "modify-group", log.Fields{"group": group})
+	if group == nil || group.Desc == nil {
+		return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
+	}
+	newGroup := g.buildGroup(ctx, group.Desc.GroupId, group.Desc.Buckets)
+	//get existing members of the group
+	val, groupExists, err := g.getFlowGroupFromKVStore(ctx, group.Desc.GroupId, false)
+	if err != nil {
+		return olterrors.NewErrNotFound("flow-group-in-kv-store", log.Fields{"groupId": group.Desc.GroupId}, err)
+	}
+	var current *openoltpb2.Group // represents the group on the device
+	if groupExists {
+		// group already exists
+		current = g.buildGroup(ctx, group.Desc.GroupId, val.Desc.GetBuckets())
+		logger.Debugw(ctx, "modify-group--group exists",
+			log.Fields{
+				"group on the device": val,
+				"new":                 group})
+	} else {
+		current = g.buildGroup(ctx, group.Desc.GroupId, nil)
+	}
+	logger.Debugw(ctx, "modify-group--comparing-current-and-new",
+		log.Fields{
+			"group on the device": current,
+			"new":                 newGroup})
+	// get members to be added
+	membersToBeAdded := g.findDiff(current, newGroup)
+	// get members to be removed
+	membersToBeRemoved := g.findDiff(newGroup, current)
+	logger.Infow(ctx, "modify-group--differences found", log.Fields{
+		"membersToBeAdded":   membersToBeAdded,
+		"membersToBeRemoved": membersToBeRemoved,
+		"groupId":            group.Desc.GroupId})
+	groupToOlt := openoltpb2.Group{
+		GroupId: group.Desc.GroupId,
+	}
+	var errAdd, errRemoved error
+	if len(membersToBeAdded) > 0 {
+		groupToOlt.Command = openoltpb2.Group_ADD_MEMBERS
+		groupToOlt.Members = membersToBeAdded
+		//execute addMembers
+		errAdd = g.callGroupAddRemove(ctx, &groupToOlt)
+	}
+	if len(membersToBeRemoved) > 0 {
+		groupToOlt.Command = openoltpb2.Group_REMOVE_MEMBERS
+		groupToOlt.Members = membersToBeRemoved
+		//execute removeMembers
+		errRemoved = g.callGroupAddRemove(ctx, &groupToOlt)
+	}
+	//save the modified group
+	if errAdd == nil && errRemoved == nil {
+		if err := g.resourceMgr.AddFlowGroupToKVStore(ctx, group, false); err != nil {
+			return olterrors.NewErrPersistence("add", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
+		}
+		logger.Infow(ctx, "modify-group-was-success--storing-group",
+			log.Fields{
+				"group":         group,
+				"existingGroup": current})
+	} else {
+		logger.Warnw(ctx, "one-of-the-group-add/remove-operations-failed--cannot-save-group-modifications",
+			log.Fields{"group": group})
+		if errAdd != nil {
+			return errAdd
+		}
+		return errRemoved
+	}
+	return nil
+}
+
+//LoadInterfaceToMulticastQueueMap reads multicast queues per interface from the KV store
+//and put them into interfaceToMcastQueueMap.
+func (g *OpenOltGroupMgr) LoadInterfaceToMulticastQueueMap(ctx context.Context) {
+	storedMulticastQueueMap, err := g.resourceMgr.GetMcastQueuePerInterfaceMap(ctx)
+	if err != nil {
+		logger.Error(ctx, "failed-to-get-pon-interface-to-multicast-queue-map")
+		return
+	}
+	for intf, queueInfo := range storedMulticastQueueMap {
+		q := QueueInfoBrief{
+			gemPortID:       queueInfo[0],
+			servicePriority: queueInfo[1],
+		}
+		g.interfaceToMcastQueueMap[intf] = &q
+	}
+}
+
+//GetInterfaceToMcastQueueMap gets the mcast queue mapped to to the PON interface
+func (g *OpenOltGroupMgr) GetInterfaceToMcastQueueMap(intfID uint32) (*QueueInfoBrief, bool) {
+	g.interfaceToMcastQueueMapLock.RLock()
+	defer g.interfaceToMcastQueueMapLock.RUnlock()
+	val, present := g.interfaceToMcastQueueMap[intfID]
+	return val, present
+}
+
+//UpdateInterfaceToMcastQueueMap updates the mcast queue information mapped to a given PON interface
+func (g *OpenOltGroupMgr) UpdateInterfaceToMcastQueueMap(intfID uint32, val *QueueInfoBrief) {
+	g.interfaceToMcastQueueMapLock.Lock()
+	defer g.interfaceToMcastQueueMapLock.Unlock()
+	g.interfaceToMcastQueueMap[intfID] = val
+}
+
+////////////////////////////////////////////////
+//      INTERNAL or UNEXPORTED FUNCTIONS      //
+////////////////////////////////////////////////
+//getFlowGroupFromKVStore fetches and returns flow group from the KV store. Returns (nil, false, error) if any problem occurs during
+//fetching the data. Returns (group, true, nil) if the group is fetched and returned successfully.
+//Returns (nil, false, nil) if the group does not exists in the KV store.
+func (g *OpenOltGroupMgr) getFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) (*ofp.OfpGroupEntry, bool, error) {
+	exists, groupInfo, err := g.resourceMgr.GetFlowGroupFromKVStore(ctx, groupID, cached)
+	if err != nil {
+		return nil, false, olterrors.NewErrNotFound("flow-group", log.Fields{"group-id": groupID}, err)
+	}
+	if exists {
+		return newGroup(groupInfo.GroupID, groupInfo.OutPorts), exists, nil
+	}
+	return nil, exists, nil
+}
+func newGroup(groupID uint32, outPorts []uint32) *ofp.OfpGroupEntry {
+	groupDesc := ofp.OfpGroupDesc{
+		Type:    ofp.OfpGroupType_OFPGT_ALL,
+		GroupId: groupID,
+	}
+	groupEntry := ofp.OfpGroupEntry{
+		Desc: &groupDesc,
+	}
+	for i := 0; i < len(outPorts); i++ {
+		var acts []*ofp.OfpAction
+		acts = append(acts, flows.Output(outPorts[i]))
+		bucket := ofp.OfpBucket{
+			Actions: acts,
+		}
+		groupDesc.Buckets = append(groupDesc.Buckets, &bucket)
+	}
+	return &groupEntry
+}
+
+//buildGroupAction creates and returns a group action
+func (g *OpenOltGroupMgr) buildGroupAction() *openoltpb2.Action {
+	var actionCmd openoltpb2.ActionCmd
+	var action openoltpb2.Action
+	action.Cmd = &actionCmd
+	//pop outer vlan
+	action.Cmd.RemoveOuterTag = true
+	return &action
+}
+
+//callGroupAddRemove performs add/remove buckets operation for the indicated group
+func (g *OpenOltGroupMgr) callGroupAddRemove(ctx context.Context, group *openoltpb2.Group) error {
+	if err := g.performGroupOperation(ctx, group); err != nil {
+		st, _ := status.FromError(err)
+		//ignore already exists error code
+		if st.Code() != codes.AlreadyExists {
+			return olterrors.NewErrGroupOp("groupAddRemove", group.GroupId, log.Fields{"status": st}, err)
+		}
+	}
+	return nil
+}
+
+//findDiff compares group members and finds members which only exists in groups2
+func (g *OpenOltGroupMgr) findDiff(group1 *openoltpb2.Group, group2 *openoltpb2.Group) []*openoltpb2.GroupMember {
+	var members []*openoltpb2.GroupMember
+	for _, bucket := range group2.Members {
+		if !g.contains(group1.Members, bucket) {
+			// bucket does not exist and must be added
+			members = append(members, bucket)
+		}
+	}
+	return members
+}
+
+//contains returns true if the members list contains the given member; false otherwise
+func (g *OpenOltGroupMgr) contains(members []*openoltpb2.GroupMember, member *openoltpb2.GroupMember) bool {
+	for _, groupMember := range members {
+		if groupMember.InterfaceId == member.InterfaceId {
+			return true
+		}
+	}
+	return false
+}
+
+//performGroupOperation call performGroupOperation operation of openolt proto
+func (g *OpenOltGroupMgr) performGroupOperation(ctx context.Context, group *openoltpb2.Group) error {
+	logger.Debugw(ctx, "sending-group-to-device",
+		log.Fields{
+			"groupToOlt": group,
+			"command":    group.Command})
+	_, err := g.deviceHandler.Client.PerformGroupOperation(log.WithSpanFromContext(context.Background(), ctx), group)
+	if err != nil {
+		return olterrors.NewErrAdapter("group-operation-failed", log.Fields{"groupToOlt": group}, err)
+	}
+	return nil
+}
+
+//buildGroup build openoltpb2.Group from given group id and bucket list
+func (g *OpenOltGroupMgr) buildGroup(ctx context.Context, groupID uint32, buckets []*ofp.OfpBucket) *openoltpb2.Group {
+	group := openoltpb2.Group{
+		GroupId: groupID}
+	// create members of the group
+	for _, ofBucket := range buckets {
+		member := g.buildMember(ctx, ofBucket)
+		if member != nil && !g.contains(group.Members, member) {
+			group.Members = append(group.Members, member)
+		}
+	}
+	return &group
+}
+
+//buildMember builds openoltpb2.GroupMember from an OpenFlow bucket
+func (g *OpenOltGroupMgr) buildMember(ctx context.Context, ofBucket *ofp.OfpBucket) *openoltpb2.GroupMember {
+	var outPort uint32
+	outPortFound := false
+	for _, ofAction := range ofBucket.Actions {
+		if ofAction.Type == ofp.OfpActionType_OFPAT_OUTPUT {
+			outPort = ofAction.GetOutput().Port
+			outPortFound = true
+		}
+	}
+	if !outPortFound {
+		logger.Debugw(ctx, "bucket-skipped-since-no-out-port-found-in-it", log.Fields{"ofBucket": ofBucket})
+		return nil
+	}
+	interfaceID := IntfIDFromUniPortNum(outPort)
+	logger.Debugw(ctx, "got-associated-interface-id-of-the-port",
+		log.Fields{
+			"portNumber:":  outPort,
+			"interfaceId:": interfaceID})
+	g.interfaceToMcastQueueMapLock.RLock()
+	defer g.interfaceToMcastQueueMapLock.RUnlock()
+	if groupInfo, ok := g.interfaceToMcastQueueMap[interfaceID]; ok {
+		member := openoltpb2.GroupMember{
+			InterfaceId:   interfaceID,
+			InterfaceType: openoltpb2.GroupMember_PON,
+			GemPortId:     groupInfo.gemPortID,
+			Priority:      groupInfo.servicePriority,
+		}
+		//add member to the group
+		return &member
+	}
+	logger.Warnf(ctx, "bucket-skipped-since-interface-2-gem-mapping-cannot-be-found", log.Fields{"ofBucket": ofBucket})
+	return nil
+}