VOL-3419: OpenOLT adapter at scale constantly takes more than 10 seconds to react to flows
The patch addresses the following
- Create OpenOltFlowMgr per PON port (instead of one instance for the whole OLT device earlier)
- Create a separate OpenOltGroupMgr - currently one instance for the whole OLT device
- Remove the redundant global lock around getting the ONU-ID in the DeviceHandler module, as a
  separate per-pon-port lock in the ResourceManager module already provides the required synchronization
- Remove redundant locks in OpenOltFlowMgr module to serialize FlowDelete before FlowAdd
- Rename divideAndAddFlow to processAddFlow. The name "divideAndAddFlow" dates from the VOLTHA 1.x
  days, where it had a different meaning; it seems to have been blindly ported to the 2.x adapter
  and no longer makes sense

Change-Id: I99827963cf242f1db0c27943c97bd05b749ae129
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index d68b8f8..83da756 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -24,11 +24,6 @@
 	"encoding/json"
 	"errors"
 	"fmt"
-	"math/big"
-	"strings"
-	"sync"
-	"time"
-
 	"github.com/opencord/voltha-lib-go/v3/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	tp "github.com/opencord/voltha-lib-go/v3/pkg/techprofile"
@@ -39,6 +34,9 @@
 	openoltpb2 "github.com/opencord/voltha-protos/v3/go/openolt"
 	tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"math/big"
+	"strings"
+	"sync"
 
 	//deepcopy "github.com/getlantern/deepcopy"
 	"github.com/EagleChen/mapmutex"
@@ -187,12 +185,6 @@
 	gemPort uint32
 }
 
-type pendingFlowDeleteKey struct {
-	intfID uint32
-	onuID  uint32
-	uniID  uint32
-}
-
 type tpLockKey struct {
 	intfID uint32
 	onuID  uint32
@@ -211,15 +203,26 @@
 	flowMetadata *voltha.FlowMetadata
 }
 
-type queueInfoBrief struct {
-	gemPortID       uint32
-	servicePriority uint32
+// pendingFlowRemoveDataKey is key to pendingFlowRemoveDataPerSubscriber map
+type pendingFlowRemoveDataKey struct {
+	intfID uint32
+	onuID  uint32
+	uniID  uint32
+}
+
+// pendingFlowRemoveData is the value stored in the pendingFlowRemoveDataPerSubscriber map.
+// It holds the number of pending flow removes and also a signal channel
+// to indicate to the receiver when all flow removes are handled
+type pendingFlowRemoveData struct {
+	pendingFlowRemoveCount uint32
+	allFlowsRemoved        chan struct{}
 }
 
 //OpenOltFlowMgr creates the Structure of OpenOltFlowMgr obj
 type OpenOltFlowMgr struct {
 	techprofile        map[uint32]tp.TechProfileIf
 	deviceHandler      *DeviceHandler
+	grpMgr             *OpenOltGroupMgr
 	resourceMgr        *rsrcMgr.OpenOltResourceMgr
 	onuIdsLock         sync.RWMutex
 	perGemPortLock     *mapmutex.Mutex                    // lock to be used to access the flowsUsedByGemPort map
@@ -228,21 +231,28 @@
 	// TODO create a type rsrcMgr.OnuGemInfos to be used instead of []rsrcMgr.OnuGemInfo
 	onuGemInfo map[uint32][]rsrcMgr.OnuGemInfo //onu, gem and uni info local cache, indexed by IntfId
 	// We need to have a global lock on the onuGemInfo map
-	onuGemInfoLock    sync.RWMutex
-	pendingFlowDelete sync.Map
+	onuGemInfoLock sync.RWMutex
 	// The mapmutex.Mutex can be fine tuned to use mapmutex.NewCustomizedMapMutex
-	perUserFlowHandleLock    *mapmutex.Mutex
-	interfaceToMcastQueueMap map[uint32]*queueInfoBrief /*pon interface -> multicast queue map. Required to assign GEM to a bucket during group population*/
+	perUserFlowHandleLock *mapmutex.Mutex
+
+	// pendingFlowRemoveDataPerSubscriber map is used to maintain the context on a per
+	// subscriber basis for the number of pending flow removes. This data is used
+	// to process all the flow removes for a subscriber before handling flow adds.
+	// Interleaving flow delete and flow add processing is known to cause PON resource
+	// management contention on a per-subscriber basis, so we need to ensure ordering.
+	pendingFlowRemoveDataPerSubscriber     map[pendingFlowRemoveDataKey]pendingFlowRemoveData
+	pendingFlowRemoveDataPerSubscriberLock sync.RWMutex
 }
 
 //NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
-func NewFlowManager(ctx context.Context, dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr) *OpenOltFlowMgr {
+func NewFlowManager(ctx context.Context, dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr, grpMgr *OpenOltGroupMgr) *OpenOltFlowMgr {
 	logger.Infow(ctx, "initializing-flow-manager", log.Fields{"device-id": dh.device.Id})
 	var flowMgr OpenOltFlowMgr
 	var err error
 	var idx uint32
 
 	flowMgr.deviceHandler = dh
+	flowMgr.grpMgr = grpMgr
 	flowMgr.resourceMgr = rMgr
 	flowMgr.techprofile = make(map[uint32]tp.TechProfileIf)
 	if err = flowMgr.populateTechProfilePerPonPort(ctx); err != nil {
@@ -250,15 +260,15 @@
 		return nil
 	}
 	flowMgr.onuIdsLock = sync.RWMutex{}
+	flowMgr.pendingFlowRemoveDataPerSubscriberLock = sync.RWMutex{}
 	flowMgr.flowsUsedByGemPort = make(map[gemPortKey][]uint32)
 	flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
 	ponPorts := rMgr.DevInfo.GetPonPorts()
 	flowMgr.onuGemInfo = make(map[uint32][]rsrcMgr.OnuGemInfo, ponPorts)
 	flowMgr.onuGemInfoLock = sync.RWMutex{}
-	flowMgr.pendingFlowDelete = sync.Map{}
 	flowMgr.perUserFlowHandleLock = mapmutex.NewCustomizedMapMutex(maxRetry, maxDelay, baseDelay, factor, jitter)
 	flowMgr.perGemPortLock = mapmutex.NewCustomizedMapMutex(maxRetry, maxDelay, baseDelay, factor, jitter)
-	flowMgr.interfaceToMcastQueueMap = make(map[uint32]*queueInfoBrief)
+	flowMgr.pendingFlowRemoveDataPerSubscriber = make(map[pendingFlowRemoveDataKey]pendingFlowRemoveData)
 	//Load the onugem info cache from kv store on flowmanager start
 	for idx = 0; idx < ponPorts; idx++ {
 		if flowMgr.onuGemInfo[idx], err = rMgr.GetOnuGemInfo(ctx, idx); err != nil {
@@ -268,7 +278,7 @@
 		flowMgr.loadFlowIDlistForGem(ctx, idx)
 	}
 	//load interface to multicast queue map from kv store
-	flowMgr.loadInterfaceToMulticastQueueMap(ctx)
+	flowMgr.grpMgr.LoadInterfaceToMulticastQueueMap(ctx)
 	logger.Info(ctx, "initialization-of-flow-manager-success")
 	return &flowMgr
 }
@@ -305,7 +315,7 @@
 	}, nil)
 }
 
-func (f *OpenOltFlowMgr) divideAndAddFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
+func (f *OpenOltFlowMgr) processAddFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
 	classifierInfo map[string]interface{}, actionInfo map[string]interface{}, flow *ofp.OfpFlowStats, TpID uint32,
 	UsMeterID uint32, DsMeterID uint32, flowMetadata *voltha.FlowMetadata) error {
 	var allocID uint32
@@ -571,16 +581,17 @@
 	if sq.direction == tp_pb.Direction_DOWNSTREAM {
 		multicastTrafficQueues := f.techprofile[sq.intfID].GetMulticastTrafficQueues(ctx, sq.tpInst.(*tp.TechProfile))
 		if len(multicastTrafficQueues) > 0 {
-			if _, present := f.interfaceToMcastQueueMap[sq.intfID]; !present {
+			if _, present := f.grpMgr.GetInterfaceToMcastQueueMap(sq.intfID); !present {
 				//assumed that there is only one queue per PON for the multicast service
 				//the default queue with multicastQueuePerPonPort.Priority per a pon interface is used for multicast service
 				//just put it in interfaceToMcastQueueMap to use for building group members
 				logger.Debugw(ctx, "multicast-traffic-queues", log.Fields{"device-id": f.deviceHandler.device.Id})
 				multicastQueuePerPonPort := multicastTrafficQueues[0]
-				f.interfaceToMcastQueueMap[sq.intfID] = &queueInfoBrief{
+				val := &QueueInfoBrief{
 					gemPortID:       multicastQueuePerPonPort.GemportId,
 					servicePriority: multicastQueuePerPonPort.Priority,
 				}
+				f.grpMgr.UpdateInterfaceToMcastQueueMap(sq.intfID, val)
 				//also store the queue info in kv store
 				if err := f.resourceMgr.AddMcastQueueForIntf(ctx, sq.intfID, multicastQueuePerPonPort.GemportId, multicastQueuePerPonPort.Priority); err != nil {
 					logger.Errorw(ctx, "failed-to-add-mcast-queue", log.Fields{"error": err})
@@ -1923,42 +1934,6 @@
 	return nil
 }
 
-func (f *OpenOltFlowMgr) deletePendingFlows(ctx context.Context, Intf uint32, onuID int32, uniID int32) {
-	pnFlDelKey := pendingFlowDeleteKey{Intf, uint32(onuID), uint32(uniID)}
-	if val, ok := f.pendingFlowDelete.Load(pnFlDelKey); ok {
-		if val.(int) > 0 {
-			pnFlDels := val.(int) - 1
-			if pnFlDels > 0 {
-				logger.Debugw(ctx, "flow-delete-succeeded--more-pending",
-					log.Fields{
-						"intf":               Intf,
-						"onu-id":             onuID,
-						"uni-id":             uniID,
-						"currpendingflowcnt": pnFlDels,
-						"device-id":          f.deviceHandler.device.Id})
-				f.pendingFlowDelete.Store(pnFlDelKey, pnFlDels)
-			} else {
-				logger.Debugw(ctx, "all-pending-flow-deletes-handled--removing-entry-from-map",
-					log.Fields{
-						"intf":      Intf,
-						"onu-id":    onuID,
-						"uni-id":    uniID,
-						"device-id": f.deviceHandler.device.Id})
-				f.pendingFlowDelete.Delete(pnFlDelKey)
-			}
-		}
-	} else {
-		logger.Debugw(ctx, "no-pending-delete-flows-found",
-			log.Fields{
-				"intf":      Intf,
-				"onu-id":    onuID,
-				"uni-id":    uniID,
-				"device-id": f.deviceHandler.device.Id})
-
-	}
-
-}
-
 // Once the gemport is released for a given onu, it also has to be cleared from local cache
 // which was used for deriving the gemport->logicalPortNo during packet-in.
 // Otherwise stale info continues to exist after gemport is freed and wrong logicalPortNo
@@ -1975,6 +1950,7 @@
 			"onu-id":      onuID,
 			"device-id":   f.deviceHandler.device.Id,
 			"onu-gem":     f.onuGemInfo[intfID]})
+
 	onugem := f.onuGemInfo[intfID]
 deleteLoop:
 	for i, onu := range onugem {
@@ -2028,34 +2004,6 @@
 					"device-id": f.deviceHandler.device.Id}, err).Log()
 		}
 		if len(updatedFlows) == 0 {
-			// Do this for subscriber flows only (not trap from NNI flows)
-			if onuID != -1 && uniID != -1 {
-				pnFlDelKey := pendingFlowDeleteKey{Intf, uint32(onuID), uint32(uniID)}
-				if val, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok {
-					logger.Debugw(ctx, "creating-entry-for-pending-flow-delete",
-						log.Fields{
-							"flow-id":   flowID,
-							"intf":      Intf,
-							"onu-id":    onuID,
-							"uni-id":    uniID,
-							"device-id": f.deviceHandler.device.Id})
-					f.pendingFlowDelete.Store(pnFlDelKey, 1)
-				} else {
-					pnFlDels := val.(int) + 1
-					logger.Debugw(ctx, "updating-flow-delete-entry",
-						log.Fields{
-							"flow-id":            flowID,
-							"intf":               Intf,
-							"onu-id":             onuID,
-							"uni-id":             uniID,
-							"currPendingFlowCnt": pnFlDels,
-							"device-id":          f.deviceHandler.device.Id})
-					f.pendingFlowDelete.Store(pnFlDelKey, pnFlDels)
-				}
-
-				defer f.deletePendingFlows(ctx, Intf, onuID, uniID)
-			}
-
 			logger.Debugw(ctx, "releasing-flow-id-to-resource-manager",
 				log.Fields{
 					"Intf":      Intf,
@@ -2310,72 +2258,6 @@
 	}
 }
 
-//clearMulticastFlowFromResourceManager  removes a multicast flow from the KV store and
-// clears resources reserved for this multicast flow
-func (f *OpenOltFlowMgr) clearMulticastFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats) {
-	classifierInfo := make(map[string]interface{})
-	formulateClassifierInfoFromFlow(ctx, classifierInfo, flow)
-	networkInterfaceID, err := f.getNNIInterfaceIDOfMulticastFlow(ctx, classifierInfo)
-
-	if err != nil {
-		logger.Warnw(ctx, "no-inport-found--cannot-release-resources-of-the-multicast-flow", log.Fields{"flowId:": flow.Id})
-		return
-	}
-
-	var onuID = int32(NoneOnuID)
-	var uniID = int32(NoneUniID)
-	var flowID uint32
-
-	flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, networkInterfaceID, onuID, uniID)
-
-	for _, flowID = range flowIds {
-		flowInfo := f.resourceMgr.GetFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flowID)
-		if flowInfo == nil {
-			logger.Debugw(ctx, "no-multicast-flowinfo-found-in-the-kv-store",
-				log.Fields{
-					"intf":    networkInterfaceID,
-					"onu-id":  onuID,
-					"uni-id":  uniID,
-					"flow-id": flowID})
-			continue
-		}
-		updatedFlows := *flowInfo
-		for i, storedFlow := range updatedFlows {
-			if flow.Id == storedFlow.LogicalFlowID {
-				removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: storedFlow.Flow.FlowType}
-				logger.Debugw(ctx, "multicast-flow-to-be-deleted",
-					log.Fields{
-						"flow":      storedFlow,
-						"flow-id":   flow.Id,
-						"device-id": f.deviceHandler.device.Id})
-				//remove from device
-				if err := f.removeFlowFromDevice(ctx, &removeFlowMessage, flow.Id); err != nil {
-					// DKB
-					logger.Errorw(ctx, "failed-to-remove-multicast-flow",
-						log.Fields{
-							"flow-id": flow.Id,
-							"error":   err})
-					return
-				}
-				logger.Infow(ctx, "multicast-flow-removed-from-device-successfully", log.Fields{"flow-id": flow.Id})
-				//Remove the Flow from FlowInfo
-				updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
-				if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID), NoneOnuID, NoneUniID, flowID, &updatedFlows); err != nil {
-					logger.Errorw(ctx, "failed-to-delete-multicast-flow-from-the-kv-store",
-						log.Fields{"flow": storedFlow,
-							"err": err})
-					return
-				}
-				//release flow id
-				logger.Debugw(ctx, "releasing-multicast-flow-id",
-					log.Fields{"flow-id": flowID,
-						"interfaceID": networkInterfaceID})
-				f.resourceMgr.FreeFlowID(ctx, uint32(networkInterfaceID), NoneOnuID, NoneUniID, flowID)
-			}
-		}
-	}
-}
-
 //RemoveFlow removes the flow from the device
 func (f *OpenOltFlowMgr) RemoveFlow(ctx context.Context, flow *ofp.OfpFlowStats) error {
 	logger.Infow(ctx, "removing-flow", log.Fields{"flow": *flow})
@@ -2394,6 +2276,9 @@
 		}
 	}
 
+	f.incrementActiveFlowRemoveCount(ctx, flow)
+	defer f.decrementActiveFlowRemoveCount(ctx, flow)
+
 	if flows.HasGroup(flow) {
 		direction = Multicast
 		f.clearFlowFromResourceManager(ctx, flow, direction)
@@ -2424,24 +2309,6 @@
 	return nil
 }
 
-func (f *OpenOltFlowMgr) waitForFlowDeletesToCompleteForOnu(ctx context.Context, intfID uint32, onuID uint32,
-	uniID uint32, ch chan bool) {
-	pnFlDelKey := pendingFlowDeleteKey{intfID, onuID, uniID}
-	for {
-		select {
-		case <-time.After(20 * time.Millisecond):
-			if flowDelRefCnt, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok || flowDelRefCnt == 0 {
-				logger.Debug(ctx, "pending-flow-deletes-completed")
-				ch <- true
-				return
-			}
-		case <-ctx.Done():
-			logger.Error(ctx, "flow-delete-wait-handler-routine-canceled")
-			return
-		}
-	}
-}
-
 //isIgmpTrapDownstreamFlow return true if the flow is a downsteam IGMP trap-to-host flow; false otherwise
 func isIgmpTrapDownstreamFlow(classifierInfo map[string]interface{}) bool {
 	if portType := IntfIDToPortTypeName(classifierInfo[InPort].(uint32)); portType == voltha.Port_ETHERNET_NNI {
@@ -2518,6 +2385,11 @@
 		return f.addIgmpTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
 	}
 
+	// If we are here it is not a trap-from-nni flow, i.e., it is subscriber specific flow.
+	// Wait for any FlowRemoves for that specific subscriber to finish first
+	// The goal here is to serialize FlowRemove and FlowAdd. FlowRemove takes priority
+	f.waitForFlowRemoveToFinish(ctx, flow)
+
 	f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNo)
 
 	TpID, err := getTpIDFromFlow(ctx, flow)
@@ -2543,33 +2415,29 @@
 		logger.Debugw(ctx, "downstream-flow-meter-id", log.Fields{"ds-meter-id": DsMeterID})
 
 	}
+	return f.processAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
+}
 
-	pnFlDelKey := pendingFlowDeleteKey{intfID, onuID, uniID}
-	if _, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok {
-		logger.Debugw(ctx, "no-pending-flows-found--going-ahead-with-flow-install",
-			log.Fields{
-				"intf-id": intfID,
-				"onu-id":  onuID,
-				"uni-id":  uniID})
-		return f.divideAndAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
+//WaitForFlowRemoveToFinishForSubscriber blocks until flow removes are complete for a given subscriber
+func (f *OpenOltFlowMgr) WaitForFlowRemoveToFinishForSubscriber(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) {
+	var flowRemoveData pendingFlowRemoveData
+	var ok bool
+
+	key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
+	logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+
+	f.pendingFlowRemoveDataPerSubscriberLock.RLock()
+	if flowRemoveData, ok = f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
+		logger.Debugw(ctx, "no-pending-flow-to-remove", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+		f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
+		return
 	}
-	pendingFlowDelComplete := make(chan bool)
-	go f.waitForFlowDeletesToCompleteForOnu(ctx, intfID, onuID, uniID, pendingFlowDelComplete)
-	select {
-	case <-pendingFlowDelComplete:
-		logger.Debugw(ctx, "all-pending-flow-deletes-completed",
-			log.Fields{
-				"intf-id": intfID,
-				"onu-id":  onuID,
-				"uni-id":  uniID})
-		return f.divideAndAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
-	case <-time.After(10 * time.Second):
-		return olterrors.NewErrTimeout("pending-flow-deletes",
-			log.Fields{
-				"intf-id": intfID,
-				"onu-id":  onuID,
-				"uni-id":  uniID}, nil)
-	}
+	f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
+
+	// Wait for all flow removes to finish first
+	<-flowRemoveData.allFlowsRemoved
+
+	logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
 }
 
 // handleFlowWithGroup adds multicast flow to the device.
@@ -2643,9 +2511,9 @@
 	}
 	logger.Info(ctx, "multicast-flow-added-to-device-successfully")
 	//get cached group
-	if group, _, err := f.GetFlowGroupFromKVStore(ctx, groupID, true); err == nil {
+	if group, _, err := f.grpMgr.getFlowGroupFromKVStore(ctx, groupID, true); err == nil {
 		//calling groupAdd to set group members after multicast flow creation
-		if err := f.ModifyGroup(ctx, group); err != nil {
+		if err := f.grpMgr.ModifyGroup(ctx, group); err != nil {
 			return olterrors.NewErrGroupOp("modify", groupID, log.Fields{"group": group}, err)
 		}
 		//cached group can be removed now
@@ -2681,241 +2549,6 @@
 	return 0, olterrors.NewErrNotFound("nni-port", nil, e).Log()
 }
 
-// AddGroup add or update the group
-func (f *OpenOltFlowMgr) AddGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
-	logger.Infow(ctx, "add-group", log.Fields{"group": group})
-	if group == nil {
-		return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
-	}
-
-	groupToOlt := openoltpb2.Group{
-		GroupId: group.Desc.GroupId,
-		Command: openoltpb2.Group_SET_MEMBERS,
-		Action:  f.buildGroupAction(),
-	}
-
-	logger.Debugw(ctx, "sending-group-to-device", log.Fields{"groupToOlt": groupToOlt})
-	_, err := f.deviceHandler.Client.PerformGroupOperation(ctx, &groupToOlt)
-	if err != nil {
-		return olterrors.NewErrAdapter("add-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
-	}
-	// group members not created yet. So let's store the group
-	if err := f.resourceMgr.AddFlowGroupToKVStore(ctx, group, true); err != nil {
-		return olterrors.NewErrPersistence("add", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
-	}
-	logger.Infow(ctx, "add-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt})
-	return nil
-}
-
-// DeleteGroup deletes a group from the device
-func (f *OpenOltFlowMgr) DeleteGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
-	logger.Debugw(ctx, "delete-group", log.Fields{"group": group})
-	if group == nil {
-		logger.Error(ctx, "unable-to-delete-group--invalid-argument--group-is-nil")
-		return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
-	}
-
-	groupToOlt := openoltpb2.Group{
-		GroupId: group.Desc.GroupId,
-	}
-
-	logger.Debugw(ctx, "deleting-group-from-device", log.Fields{"groupToOlt": groupToOlt})
-	_, err := f.deviceHandler.Client.DeleteGroup(ctx, &groupToOlt)
-	if err != nil {
-		logger.Errorw(ctx, "delete-group-failed-on-dev", log.Fields{"groupToOlt": groupToOlt, "err": err})
-		return olterrors.NewErrAdapter("delete-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
-	}
-	//remove group from the store
-	if err := f.resourceMgr.RemoveFlowGroupFromKVStore(ctx, group.Desc.GroupId, false); err != nil {
-		return olterrors.NewErrPersistence("delete", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
-	}
-	logger.Debugw(ctx, "delete-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt})
-	return nil
-}
-
-//buildGroupAction creates and returns a group action
-func (f *OpenOltFlowMgr) buildGroupAction() *openoltpb2.Action {
-	var actionCmd openoltpb2.ActionCmd
-	var action openoltpb2.Action
-	action.Cmd = &actionCmd
-	//pop outer vlan
-	action.Cmd.RemoveOuterTag = true
-	return &action
-}
-
-// ModifyGroup updates the group
-func (f *OpenOltFlowMgr) ModifyGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
-	logger.Infow(ctx, "modify-group", log.Fields{"group": group})
-	if group == nil || group.Desc == nil {
-		return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
-	}
-
-	newGroup := f.buildGroup(ctx, group.Desc.GroupId, group.Desc.Buckets)
-	//get existing members of the group
-	val, groupExists, err := f.GetFlowGroupFromKVStore(ctx, group.Desc.GroupId, false)
-
-	if err != nil {
-		return olterrors.NewErrNotFound("flow-group-in-kv-store", log.Fields{"groupId": group.Desc.GroupId}, err)
-	}
-
-	var current *openoltpb2.Group // represents the group on the device
-	if groupExists {
-		// group already exists
-		current = f.buildGroup(ctx, group.Desc.GroupId, val.Desc.GetBuckets())
-		logger.Debugw(ctx, "modify-group--group exists",
-			log.Fields{
-				"group on the device": val,
-				"new":                 group})
-	} else {
-		current = f.buildGroup(ctx, group.Desc.GroupId, nil)
-	}
-
-	logger.Debugw(ctx, "modify-group--comparing-current-and-new",
-		log.Fields{
-			"group on the device": current,
-			"new":                 newGroup})
-	// get members to be added
-	membersToBeAdded := f.findDiff(current, newGroup)
-	// get members to be removed
-	membersToBeRemoved := f.findDiff(newGroup, current)
-
-	logger.Infow(ctx, "modify-group--differences found", log.Fields{
-		"membersToBeAdded":   membersToBeAdded,
-		"membersToBeRemoved": membersToBeRemoved,
-		"groupId":            group.Desc.GroupId})
-
-	groupToOlt := openoltpb2.Group{
-		GroupId: group.Desc.GroupId,
-	}
-	var errAdd, errRemoved error
-	if len(membersToBeAdded) > 0 {
-		groupToOlt.Command = openoltpb2.Group_ADD_MEMBERS
-		groupToOlt.Members = membersToBeAdded
-		//execute addMembers
-		errAdd = f.callGroupAddRemove(ctx, &groupToOlt)
-	}
-	if len(membersToBeRemoved) > 0 {
-		groupToOlt.Command = openoltpb2.Group_REMOVE_MEMBERS
-		groupToOlt.Members = membersToBeRemoved
-		//execute removeMembers
-		errRemoved = f.callGroupAddRemove(ctx, &groupToOlt)
-	}
-
-	//save the modified group
-	if errAdd == nil && errRemoved == nil {
-		if err := f.resourceMgr.AddFlowGroupToKVStore(ctx, group, false); err != nil {
-			return olterrors.NewErrPersistence("add", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
-		}
-		logger.Infow(ctx, "modify-group-was-success--storing-group",
-			log.Fields{
-				"group":         group,
-				"existingGroup": current})
-	} else {
-		logger.Warnw(ctx, "one-of-the-group-add/remove-operations-failed--cannot-save-group-modifications",
-			log.Fields{"group": group})
-		if errAdd != nil {
-			return errAdd
-		}
-		return errRemoved
-	}
-	return nil
-}
-
-//callGroupAddRemove performs add/remove buckets operation for the indicated group
-func (f *OpenOltFlowMgr) callGroupAddRemove(ctx context.Context, group *openoltpb2.Group) error {
-	if err := f.performGroupOperation(ctx, group); err != nil {
-		st, _ := status.FromError(err)
-		//ignore already exists error code
-		if st.Code() != codes.AlreadyExists {
-			return olterrors.NewErrGroupOp("groupAddRemove", group.GroupId, log.Fields{"status": st}, err)
-		}
-	}
-	return nil
-}
-
-//findDiff compares group members and finds members which only exists in groups2
-func (f *OpenOltFlowMgr) findDiff(group1 *openoltpb2.Group, group2 *openoltpb2.Group) []*openoltpb2.GroupMember {
-	var members []*openoltpb2.GroupMember
-	for _, bucket := range group2.Members {
-		if !f.contains(group1.Members, bucket) {
-			// bucket does not exist and must be added
-			members = append(members, bucket)
-		}
-	}
-	return members
-}
-
-//contains returns true if the members list contains the given member; false otherwise
-func (f *OpenOltFlowMgr) contains(members []*openoltpb2.GroupMember, member *openoltpb2.GroupMember) bool {
-	for _, groupMember := range members {
-		if groupMember.InterfaceId == member.InterfaceId {
-			return true
-		}
-	}
-	return false
-}
-
-//performGroupOperation call performGroupOperation operation of openolt proto
-func (f *OpenOltFlowMgr) performGroupOperation(ctx context.Context, group *openoltpb2.Group) error {
-	logger.Debugw(ctx, "sending-group-to-device",
-		log.Fields{
-			"groupToOlt": group,
-			"command":    group.Command})
-	_, err := f.deviceHandler.Client.PerformGroupOperation(log.WithSpanFromContext(context.Background(), ctx), group)
-	if err != nil {
-		return olterrors.NewErrAdapter("group-operation-failed", log.Fields{"groupToOlt": group}, err)
-	}
-	return nil
-}
-
-//buildGroup build openoltpb2.Group from given group id and bucket list
-func (f *OpenOltFlowMgr) buildGroup(ctx context.Context, groupID uint32, buckets []*ofp.OfpBucket) *openoltpb2.Group {
-	group := openoltpb2.Group{
-		GroupId: groupID}
-	// create members of the group
-	for _, ofBucket := range buckets {
-		member := f.buildMember(ctx, ofBucket)
-		if member != nil && !f.contains(group.Members, member) {
-			group.Members = append(group.Members, member)
-		}
-	}
-	return &group
-}
-
-//buildMember builds openoltpb2.GroupMember from an OpenFlow bucket
-func (f *OpenOltFlowMgr) buildMember(ctx context.Context, ofBucket *ofp.OfpBucket) *openoltpb2.GroupMember {
-	var outPort uint32
-	outPortFound := false
-	for _, ofAction := range ofBucket.Actions {
-		if ofAction.Type == ofp.OfpActionType_OFPAT_OUTPUT {
-			outPort = ofAction.GetOutput().Port
-			outPortFound = true
-		}
-	}
-
-	if !outPortFound {
-		logger.Debugw(ctx, "bucket-skipped-since-no-out-port-found-in-it", log.Fields{"ofBucket": ofBucket})
-		return nil
-	}
-	interfaceID := IntfIDFromUniPortNum(outPort)
-	logger.Debugw(ctx, "got-associated-interface-id-of-the-port",
-		log.Fields{
-			"portNumber:":  outPort,
-			"interfaceId:": interfaceID})
-	if groupInfo, ok := f.interfaceToMcastQueueMap[interfaceID]; ok {
-		member := openoltpb2.GroupMember{
-			InterfaceId:   interfaceID,
-			InterfaceType: openoltpb2.GroupMember_PON,
-			GemPortId:     groupInfo.gemPortID,
-			Priority:      groupInfo.servicePriority,
-		}
-		//add member to the group
-		return &member
-	}
-	logger.Warnf(ctx, "bucket-skipped-since-interface-2-gem-mapping-cannot-be-found", log.Fields{"ofBucket": ofBucket})
-	return nil
-}
-
 //sendTPDownloadMsgToChild send payload
 func (f *OpenOltFlowMgr) sendTPDownloadMsgToChild(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, uni string, TpID uint32) error {
 
@@ -2957,6 +2590,16 @@
 
 	f.onuGemInfoLock.Lock()
 	defer f.onuGemInfoLock.Unlock()
+	onugem := f.onuGemInfo[intfID]
+	// If the ONU already exists in onuGemInfo list, nothing to do
+	for _, onu := range onugem {
+		if onu.OnuID == onuID && onu.SerialNumber == serialNum {
+			logger.Debugw(ctx, "onu-id-already-exists-in-cache",
+				log.Fields{"onuID": onuID,
+					"serialNum": serialNum})
+			return nil
+		}
+	}
 
 	onu := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
 	f.onuGemInfo[intfID] = append(f.onuGemInfo[intfID], onu)
@@ -3002,6 +2645,7 @@
 			}
 			onugem[idx].GemPorts = append(onugem[idx].GemPorts, gemPort)
 			f.onuGemInfo[intfID] = onugem
+			break
 		}
 	}
 	err := f.resourceMgr.AddGemToOnuGemInfo(ctx, intfID, onuID, gemPort)
@@ -3037,7 +2681,6 @@
 			"onu-geminfo": f.onuGemInfo[intfID],
 			"intf-id":     intfID,
 			"gemport-id":  gemPortID})
-
 	// get onuid from the onugem info cache
 	onugem := f.onuGemInfo[intfID]
 
@@ -4010,52 +3653,150 @@
 	}
 }
 
-//loadInterfaceToMulticastQueueMap reads multicast queues per interface from the KV store
-//and put them into interfaceToMcastQueueMap.
-func (f *OpenOltFlowMgr) loadInterfaceToMulticastQueueMap(ctx context.Context) {
-	storedMulticastQueueMap, err := f.resourceMgr.GetMcastQueuePerInterfaceMap(ctx)
+// clearMulticastFlowFromResourceManager removes the stored flows matching the given multicast flow from the
+// device and the KV store and frees their flow IDs; it logs and returns on the first failure.
+func (f *OpenOltFlowMgr) clearMulticastFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats) {
+	classifierInfo := make(map[string]interface{})
+	formulateClassifierInfoFromFlow(ctx, classifierInfo, flow)
+	networkInterfaceID, err := f.getNNIInterfaceIDOfMulticastFlow(ctx, classifierInfo)
+
 	if err != nil {
-		logger.Error(ctx, "failed-to-get-pon-interface-to-multicast-queue-map")
+		logger.Warnw(ctx, "no-inport-found--cannot-release-resources-of-the-multicast-flow", log.Fields{"flowId:": flow.Id})
 		return
 	}
-	for intf, queueInfo := range storedMulticastQueueMap {
-		q := queueInfoBrief{
-			gemPortID:       queueInfo[0],
-			servicePriority: queueInfo[1],
+
+	var onuID = int32(NoneOnuID)
+	var uniID = int32(NoneUniID)
+	var flowID uint32
+
+	flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, networkInterfaceID, onuID, uniID)
+
+	for _, flowID = range flowIds {
+		flowInfo := f.resourceMgr.GetFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flowID)
+		if flowInfo == nil {
+			logger.Debugw(ctx, "no-multicast-flowinfo-found-in-the-kv-store",
+				log.Fields{
+					"intf":    networkInterfaceID,
+					"onu-id":  onuID,
+					"uni-id":  uniID,
+					"flow-id": flowID})
+			continue
 		}
-		f.interfaceToMcastQueueMap[intf] = &q
+		updatedFlows := *flowInfo
+		// Iterate backwards: entries are deleted in place below, and a forward range over the
+		// original slice header would skip the element shifted into the freed slot.
+		for i := len(updatedFlows) - 1; i >= 0; i-- {
+			storedFlow := updatedFlows[i]
+			if flow.Id == storedFlow.LogicalFlowID {
+				removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: storedFlow.Flow.FlowType}
+				logger.Debugw(ctx, "multicast-flow-to-be-deleted",
+					log.Fields{
+						"flow":      storedFlow,
+						"flow-id":   flow.Id,
+						"device-id": f.deviceHandler.device.Id})
+				//remove from device
+				if err := f.removeFlowFromDevice(ctx, &removeFlowMessage, flow.Id); err != nil {
+					logger.Errorw(ctx, "failed-to-remove-multicast-flow",
+						log.Fields{
+							"flow-id": flow.Id,
+							"error":   err})
+					return
+				}
+				logger.Infow(ctx, "multicast-flow-removed-from-device-successfully", log.Fields{"flow-id": flow.Id})
+				//Remove the Flow from FlowInfo
+				updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
+				if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID), NoneOnuID, NoneUniID, flowID, &updatedFlows); err != nil {
+					logger.Errorw(ctx, "failed-to-delete-multicast-flow-from-the-kv-store",
+						log.Fields{"flow": storedFlow, "err": err})
+					return
+				}
+				//release flow id
+				logger.Debugw(ctx, "releasing-multicast-flow-id",
+					log.Fields{"flow-id": flowID, "interfaceID": networkInterfaceID})
+				f.resourceMgr.FreeFlowID(ctx, uint32(networkInterfaceID), NoneOnuID, NoneUniID, flowID)
+			}
+		}
 	}
 }
 
-//GetFlowGroupFromKVStore fetches and returns flow group from the KV store. Returns (nil, false, error) if any problem occurs during
-//fetching the data. Returns (group, true, nil) if the group is fetched and returned successfully.
-//Returns (nil, false, nil) if the group does not exists in the KV store.
-func (f *OpenOltFlowMgr) GetFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) (*ofp.OfpGroupEntry, bool, error) {
-	exists, groupInfo, err := f.resourceMgr.GetFlowGroupFromKVStore(ctx, groupID, cached)
-	if err != nil {
-		return nil, false, olterrors.NewErrNotFound("flow-group", log.Fields{"group-id": groupID}, err)
+// incrementActiveFlowRemoveCount bumps the pending flow-remove counter of the subscriber (intf/onu/uni) derived from
+// the flow's in/out ports, creating the entry and its channel on first use. It does nothing if either port is InvalidPort.
+func (f *OpenOltFlowMgr) incrementActiveFlowRemoveCount(ctx context.Context, flow *ofp.OfpFlowStats) {
+	inPort, outPort := getPorts(flow)
+	logger.Debugw(ctx, "increment-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
+	if inPort != InvalidPort && outPort != InvalidPort {
+		_, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
+		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
+		logger.Debugw(ctx, "increment-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+		f.pendingFlowRemoveDataPerSubscriberLock.Lock()
+		defer f.pendingFlowRemoveDataPerSubscriberLock.Unlock()
+		flowRemoveData, ok := f.pendingFlowRemoveDataPerSubscriber[key]
+		if !ok {
+			flowRemoveData = pendingFlowRemoveData{
+				pendingFlowRemoveCount: 0,
+				allFlowsRemoved:        make(chan struct{}),
+			}
+		}
+		flowRemoveData.pendingFlowRemoveCount++
+		f.pendingFlowRemoveDataPerSubscriber[key] = flowRemoveData // write the modified copy back into the map
+		logger.Debugw(ctx, "current-flow-remove-count–increment",
+			log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
+				"currCnt": f.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
 	}
-	if exists {
-		return newGroup(groupInfo.GroupID, groupInfo.OutPorts), exists, nil
-	}
-	return nil, exists, nil
 }
 
-func newGroup(groupID uint32, outPorts []uint32) *ofp.OfpGroupEntry {
-	groupDesc := ofp.OfpGroupDesc{
-		Type:    ofp.OfpGroupType_OFPGT_ALL,
-		GroupId: groupID,
-	}
-	groupEntry := ofp.OfpGroupEntry{
-		Desc: &groupDesc,
-	}
-	for i := 0; i < len(outPorts); i++ {
-		var acts []*ofp.OfpAction
-		acts = append(acts, flows.Output(outPorts[i]))
-		bucket := ofp.OfpBucket{
-			Actions: acts,
+// decrementActiveFlowRemoveCount lowers the pending flow-remove counter of the subscriber the flow's ports resolve to.
+// When the counter reaches zero it closes allFlowsRemoved and deletes the entry; a missing entry is logged as fatal.
+func (f *OpenOltFlowMgr) decrementActiveFlowRemoveCount(ctx context.Context, flow *ofp.OfpFlowStats) {
+	inPort, outPort := getPorts(flow)
+	logger.Debugw(ctx, "decrement-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
+	if inPort != InvalidPort && outPort != InvalidPort {
+		_, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
+		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
+		logger.Debugw(ctx, "decrement-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+		f.pendingFlowRemoveDataPerSubscriberLock.Lock()
+		defer f.pendingFlowRemoveDataPerSubscriberLock.Unlock()
+		if val, ok := f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
+			logger.Fatalw(ctx, "flow-remove-key-not-found", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+		} else {
+			if val.pendingFlowRemoveCount > 0 {
+				val.pendingFlowRemoveCount--
+			}
+			// val is a copy of the map entry, so log it rather than the not-yet-updated map value.
+			logger.Debugw(ctx, "current-flow-remove-count-after-decrement",
+				log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID, "currCnt": val.pendingFlowRemoveCount})
+			// If all flow removes have finished, close the channel to signal the receiver to go ahead with flow adds.
+			if val.pendingFlowRemoveCount == 0 {
+				close(val.allFlowsRemoved)
+				delete(f.pendingFlowRemoveDataPerSubscriber, key)
+				return
+			}
+			f.pendingFlowRemoveDataPerSubscriber[key] = val
 		}
-		groupDesc.Buckets = append(groupDesc.Buckets, &bucket)
 	}
-	return &groupEntry
+}
+
+// waitForFlowRemoveToFinish blocks until the pending flow removes of the subscriber (intf/onu/uni) derived from the
+// flow's ports are done. It returns at once if either port is InvalidPort or no entry is pending for the subscriber.
+func (f *OpenOltFlowMgr) waitForFlowRemoveToFinish(ctx context.Context, flow *ofp.OfpFlowStats) {
+	var flowRemoveData pendingFlowRemoveData
+	var ok bool
+	inPort, outPort := getPorts(flow)
+	logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
+	if inPort != InvalidPort && outPort != InvalidPort {
+		_, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
+		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
+		logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+		// Copy the entry under the read lock; the lock is released before the blocking receive below.
+		f.pendingFlowRemoveDataPerSubscriberLock.RLock()
+		if flowRemoveData, ok = f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
+			logger.Debugw(ctx, "no-pending-flow-to-remove", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+			f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
+			return
+		}
+		f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
+		// Wait for all flow removes to finish first; decrementActiveFlowRemoveCount closes this channel at zero.
+		<-flowRemoveData.allFlowsRemoved
+		logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+	}
 }