VOL-3521 : scale: intermittent issue - voltha complains that different meter is in use for subscriber
- Process incoming flows on a per ONU basis using channels per ONU

Change-Id: I0f375d90d786a0135bb51ce18036e5297dc7297b
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index e1bc9aa..009b43c 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -35,7 +35,6 @@
 	"strings"
 	"sync"
 
-	"github.com/EagleChen/mapmutex"
 	"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -141,23 +140,13 @@
 	//NoneUniID constant
 	NoneUniID = -1
 
-	// MapMutex
-	maxRetry  = 300
-	maxDelay  = 100000000
-	baseDelay = 10000000
-	factor    = 1.1
-	jitter    = 0.2
+	// Max number of flows that can be queued per ONU
+	maxConcurrentFlowsPerOnu = 20
 
 	bitMapPrefix = "0b"
 	pbit1        = '1'
 )
 
-type tpLockKey struct {
-	intfID uint32
-	onuID  uint32
-	uniID  uint32
-}
-
 type schedQueue struct {
 	direction    tp_pb.Direction
 	intfID       uint32
@@ -170,13 +159,6 @@
 	flowMetadata *voltha.FlowMetadata
 }
 
-// pendingFlowRemoveDataKey is key to pendingFlowRemoveDataPerSubscriber map
-type pendingFlowRemoveDataKey struct {
-	intfID uint32
-	onuID  uint32
-	uniID  uint32
-}
-
 // subscriberDataPathFlowIDKey is key to subscriberDataPathFlowIDMap map
 type subscriberDataPathFlowIDKey struct {
 	intfID    uint32
@@ -186,12 +168,16 @@
 	tpID      uint32
 }
 
-// pendingFlowRemoveData is value stored in pendingFlowRemoveDataPerSubscriber map
-// This holds the number of pending flow removes and also a signal channel to
-// to indicate the receiver when all flow removes are handled
-type pendingFlowRemoveData struct {
-	pendingFlowRemoveCount uint32
-	allFlowsRemoved        chan struct{}
+// This control block is created per flow add/remove and pushed on the incomingFlows channel slice
+// The flowControlBlock is then picked by the perOnuFlowHandlerRoutine for further processing.
+// There is one perOnuFlowHandlerRoutine routine per ONU that constantly monitors for any incoming
+// flow and processes it serially
+type flowControlBlock struct {
+	ctx          context.Context      // Flow handler context
+	addFlow      bool                 // if true flow to be added, else removed
+	flow         *voltha.OfpFlowStats // Flow message
+	flowMetadata *voltha.FlowMetadata // FlowMetadata that contains flow meter information. This can be nil for Flow remove
+	errChan      *chan error          // channel to report the Flow handling error
 }
 
 //OpenOltFlowMgr creates the Structure of OpenOltFlowMgr obj
@@ -215,21 +201,14 @@
 	// We need to have a global lock on the onuGemInfo map
 	onuGemInfoLock sync.RWMutex
 
-	// The mapmutex.Mutex can be fine tuned to use mapmutex.NewCustomizedMapMutex
-	perUserFlowHandleLock *mapmutex.Mutex
-
-	// pendingFlowRemoveDataPerSubscriber map is used to maintain the context on a per
-	// subscriber basis for the number of pending flow removes. This data is used
-	// to process all the flow removes for a subscriber before handling flow adds.
-	// Interleaving flow delete and flow add processing has known to cause PON resource
-	// management contentions on a per subscriber bases, so we need ensure ordering.
-	pendingFlowRemoveDataPerSubscriber     map[pendingFlowRemoveDataKey]pendingFlowRemoveData
-	pendingFlowRemoveDataPerSubscriberLock sync.RWMutex
-
 	// Map of voltha flowID associated with subscriberDataPathFlowIDKey
 	// This information is not persisted on Kv store and hence should be reconciled on adapter restart
 	subscriberDataPathFlowIDMap     map[subscriberDataPathFlowIDKey]uint64
 	subscriberDataPathFlowIDMapLock sync.RWMutex
+
+	// Slice of channels. Each channel in the slice, indexed by ONU ID, queues flows per ONU.
+	// A go routine per ONU, waits on the unique channel (indexed by ONU ID) for incoming flows (add/remove)
+	incomingFlows []chan flowControlBlock
 }
 
 //NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
@@ -248,15 +227,24 @@
 		return nil
 	}
 	flowMgr.onuIdsLock = sync.RWMutex{}
-	flowMgr.pendingFlowRemoveDataPerSubscriberLock = sync.RWMutex{}
 	flowMgr.flowsUsedByGemPort = make(map[uint32][]uint64)
 	flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
 	flowMgr.packetInGemPortLock = sync.RWMutex{}
 	flowMgr.onuGemInfoLock = sync.RWMutex{}
-	flowMgr.perUserFlowHandleLock = mapmutex.NewCustomizedMapMutex(maxRetry, maxDelay, baseDelay, factor, jitter)
-	flowMgr.pendingFlowRemoveDataPerSubscriber = make(map[pendingFlowRemoveDataKey]pendingFlowRemoveData)
 	flowMgr.subscriberDataPathFlowIDMap = make(map[subscriberDataPathFlowIDKey]uint64)
 	flowMgr.subscriberDataPathFlowIDMapLock = sync.RWMutex{}
+
+	// Create a slice of buffered channels for handling concurrent flows per ONU.
+	// The additional entry (+1) is to handle the NNI trap flows on a separate channel from the individual ONU channels
+	flowMgr.incomingFlows = make([]chan flowControlBlock, MaxOnusPerPon+1)
+	for i := range flowMgr.incomingFlows {
+		flowMgr.incomingFlows[i] = make(chan flowControlBlock, maxConcurrentFlowsPerOnu)
+		// Spin up a go routine to handle incoming flows (add/remove).
+		// There will be one go routine per ONU.
+		// This routine will be blocked on the flowMgr.incomingFlows[onu-id] channel for incoming flows.
+		go flowMgr.perOnuFlowHandlerRoutine(flowMgr.incomingFlows[i])
+	}
+
 	//Load the onugem info cache from kv store on flowmanager start
 	if flowMgr.onuGemInfo, err = rMgr.GetOnuGemInfo(ctx, ponPortIdx); err != nil {
 		logger.Error(ctx, "failed-to-load-onu-gem-info-cache")
@@ -337,51 +325,36 @@
 		"uni":       uni,
 		"device-id": f.deviceHandler.device.Id})
 
-	tpLockMapKey := tpLockKey{intfID, onuID, uniID}
-	if f.perUserFlowHandleLock.TryLock(tpLockMapKey) {
-		logger.Debugw(ctx, "dividing-flow-create-tcont-gem-ports", log.Fields{
-			"device-id":  f.deviceHandler.device.Id,
-			"intf-id":    intfID,
-			"onu-id":     onuID,
-			"uni-id":     uniID,
-			"port-no":    portNo,
-			"classifier": classifierInfo,
-			"action":     actionInfo,
-			"usmeter-id": UsMeterID,
-			"dsmeter-id": DsMeterID,
-			"tp-id":      TpID})
-		allocID, gemPorts, TpInst = f.createTcontGemports(ctx, intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
-		if allocID == 0 || gemPorts == nil || TpInst == nil {
-			logger.Error(ctx, "alloc-id-gem-ports-tp-unavailable")
-			f.perUserFlowHandleLock.Unlock(tpLockMapKey)
-			return olterrors.NewErrNotFound(
-				"alloc-id-gem-ports-tp-unavailable",
-				nil, nil)
-		}
-		args := make(map[string]uint32)
-		args[IntfID] = intfID
-		args[OnuID] = onuID
-		args[UniID] = uniID
-		args[PortNo] = portNo
-		args[AllocID] = allocID
-
-		/* Flows can be added specific to gemport if p-bits are received.
-		 * If no pbit mentioned then adding flows for all gemports
-		 */
-		f.checkAndAddFlow(ctx, args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
-		f.perUserFlowHandleLock.Unlock(tpLockMapKey)
-	} else {
-		cause := "failed-to-acquire-per-user-flow-handle-lock"
-		fields := log.Fields{
-			"intf-id":     intfID,
-			"onu-id":      onuID,
-			"uni-id":      uniID,
-			"flow-id":     flow.Id,
-			"flow-cookie": flow.Cookie,
-			"device-id":   f.deviceHandler.device.Id}
-		logger.Errorw(ctx, cause, fields)
-		return olterrors.NewErrAdapter(cause, fields, nil)
+	logger.Debugw(ctx, "dividing-flow-create-tcont-gem-ports", log.Fields{
+		"device-id":  f.deviceHandler.device.Id,
+		"intf-id":    intfID,
+		"onu-id":     onuID,
+		"uni-id":     uniID,
+		"port-no":    portNo,
+		"classifier": classifierInfo,
+		"action":     actionInfo,
+		"usmeter-id": UsMeterID,
+		"dsmeter-id": DsMeterID,
+		"tp-id":      TpID})
+	allocID, gemPorts, TpInst = f.createTcontGemports(ctx, intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
+	if allocID == 0 || gemPorts == nil || TpInst == nil {
+		logger.Error(ctx, "alloc-id-gem-ports-tp-unavailable")
+		return olterrors.NewErrNotFound(
+			"alloc-id-gem-ports-tp-unavailable",
+			nil, nil)
 	}
+	args := make(map[string]uint32)
+	args[IntfID] = intfID
+	args[OnuID] = onuID
+	args[UniID] = uniID
+	args[PortNo] = portNo
+	args[AllocID] = allocID
+
+	/* Flows can be added specific to gemport if p-bits are received.
+	 * If no pbit mentioned then adding flows for all gemports
+	 */
+	f.checkAndAddFlow(ctx, args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
+
 	return nil
 }
 
@@ -676,10 +649,18 @@
 		return olterrors.NewErrAdapter("unable-to-remove-traffic-schedulers-from-device",
 			log.Fields{
 				"intf-id":            sq.intfID,
-				"traffic-schedulers": TrafficSched}, err)
+				"traffic-schedulers": TrafficSched,
+				"onu-id":             sq.onuID,
+				"uni-id":             sq.uniID,
+				"uni-port":           sq.uniPort}, err)
 	}
 
-	logger.Infow(ctx, "removed-traffic-schedulers-successfully", log.Fields{"device-id": f.deviceHandler.device.Id})
+	logger.Infow(ctx, "removed-traffic-schedulers-successfully",
+		log.Fields{"device-id": f.deviceHandler.device.Id,
+			"intf-id":  sq.intfID,
+			"onu-id":   sq.onuID,
+			"uni-id":   sq.uniID,
+			"uni-port": sq.uniPort})
 
 	/* After we successfully remove the scheduler configuration on the OLT device,
 	 * delete the meter id on the KV store.
@@ -690,13 +671,21 @@
 			log.Fields{
 				"onu":       sq.onuID,
 				"meter":     KVStoreMeter.MeterId,
-				"device-id": f.deviceHandler.device.Id}, err)
+				"device-id": f.deviceHandler.device.Id,
+				"intf-id":   sq.intfID,
+				"onu-id":    sq.onuID,
+				"uni-id":    sq.uniID,
+				"uni-port":  sq.uniPort}, err)
 	}
 	logger.Infow(ctx, "removed-meter-from-KV-store-successfully",
 		log.Fields{
 			"meter-id":  KVStoreMeter.MeterId,
 			"dir":       Direction,
-			"device-id": f.deviceHandler.device.Id})
+			"device-id": f.deviceHandler.device.Id,
+			"intf-id":   sq.intfID,
+			"onu-id":    sq.onuID,
+			"uni-id":    sq.uniID,
+			"uni-port":  sq.uniPort})
 	return err
 }
 
@@ -710,7 +699,6 @@
 
 	allocIDs = f.resourceMgr.GetCurrentAllocIDsForOnu(ctx, intfID, onuID, uniID)
 	allgemPortIDs = f.resourceMgr.GetCurrentGEMPortIDsForOnu(ctx, intfID, onuID, uniID)
-
 	tpPath := f.getTPpath(ctx, intfID, uni, TpID)
 
 	logger.Debugw(ctx, "creating-new-tcont-and-gem", log.Fields{
@@ -2058,9 +2046,6 @@
 //RemoveFlow removes the flow from the device
 func (f *OpenOltFlowMgr) RemoveFlow(ctx context.Context, flow *ofp.OfpFlowStats) error {
 
-	f.incrementActiveFlowRemoveCount(ctx, flow)
-	defer f.decrementActiveFlowRemoveCount(ctx, flow)
-
 	logger.Infow(ctx, "removing-flow", log.Fields{"flow": *flow})
 	var direction string
 	actionInfo := make(map[string]interface{})
@@ -2086,22 +2071,8 @@
 		direction = Downstream
 	}
 
-	_, intfID, onuID, uniID, _, _, err := FlowExtractInfo(ctx, flow, direction)
-	if err != nil {
-		return err
-	}
-
-	userKey := tpLockKey{intfID, onuID, uniID}
-
 	// Serialize flow removes on a per subscriber basis
-	if f.perUserFlowHandleLock.TryLock(userKey) {
-		err = f.clearFlowFromDeviceAndResourceManager(ctx, flow, direction)
-		f.perUserFlowHandleLock.Unlock(userKey)
-	} else {
-		// Ideally this should never happen
-		logger.Errorw(ctx, "failed-to-acquire-lock-to-remove-flow--remove-aborted", log.Fields{"flow": flow})
-		return errors.New("failed-to-acquire-per-user-lock")
-	}
+	err := f.clearFlowFromDeviceAndResourceManager(ctx, flow, direction)
 
 	return err
 }
@@ -2122,6 +2093,61 @@
 	return false
 }
 
+// RouteFlowToOnuChannel routes incoming flow to ONU specific channel
+func (f *OpenOltFlowMgr) RouteFlowToOnuChannel(ctx context.Context, flow *voltha.OfpFlowStats, addFlow bool, flowMetadata *voltha.FlowMetadata) error {
+	// Step1 : Fill flowControlBlock
+	// Step2 : Push the flowControlBlock to ONU channel
+	// Step3 : Wait on response channel for response
+	// Step4 : Return error value
+	logger.Debugw(ctx, "process-flow", log.Fields{"flow": flow, "addFlow": addFlow})
+	errChan := make(chan error)
+	flowCb := flowControlBlock{
+		ctx:          ctx,
+		addFlow:      addFlow,
+		flow:         flow,
+		flowMetadata: flowMetadata,
+		errChan:      &errChan,
+	}
+	inPort, outPort := getPorts(flow)
+	var onuID uint32
+	if inPort != InvalidPort && outPort != InvalidPort {
+		_, _, onuID, _ = ExtractAccessFromFlow(inPort, outPort)
+	}
+	// inPort or outPort is InvalidPort for trap-from-nni flows.
+	// In that case, onuID is 0, which is the reserved index for trap-from-nni flows in the f.incomingFlows slice
+	// Send the flowCb on the ONU flow channel
+	f.incomingFlows[onuID] <- flowCb
+	// Wait on the channel for flow handlers return value
+	err := <-errChan
+	logger.Debugw(ctx, "process-flow--received-resp", log.Fields{"flow": flow, "addFlow": addFlow, "err": err})
+	return err
+}
+
+// This routine is unique per ONU ID and blocks on flowControlBlock channel for incoming flows
+// Each incoming flow is processed in a synchronous manner, i.e., the flow is processed to completion before picking another
+func (f *OpenOltFlowMgr) perOnuFlowHandlerRoutine(subscriberFlowChannel chan flowControlBlock) {
+	for {
+		// block on the channel to receive an incoming flow
+		// process the flow completely before proceeding to handle the next flow
+		flowCb := <-subscriberFlowChannel
+		if flowCb.addFlow {
+			logger.Debugw(flowCb.ctx, "adding-flow",
+				log.Fields{"device-id": f.deviceHandler.device.Id,
+					"flowToAdd": flowCb.flow})
+			err := f.AddFlow(flowCb.ctx, flowCb.flow, flowCb.flowMetadata)
+			// Pass the return value over the return channel
+			*flowCb.errChan <- err
+		} else {
+			logger.Debugw(flowCb.ctx, "removing-flow",
+				log.Fields{"device-id": f.deviceHandler.device.Id,
+					"flowToRemove": flowCb.flow})
+			err := f.RemoveFlow(flowCb.ctx, flowCb.flow)
+			// Pass the return value over the return channel
+			*flowCb.errChan <- err
+		}
+	}
+}
+
 // AddFlow add flow to device
 // nolint: gocyclo
 func (f *OpenOltFlowMgr) AddFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) error {
@@ -2182,11 +2208,6 @@
 		return f.addIgmpTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
 	}
 
-	// If we are here it is not a trap-from-nni flow, i.e., it is subscriber specific flow.
-	// Wait for any FlowRemoves for that specific subscriber to finish first
-	// The goal here is to serialize FlowRemove and FlowAdd. FlowRemove take priority
-	f.waitForFlowRemoveToFinish(ctx, flow)
-
 	f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNo)
 
 	TpID, err := getTpIDFromFlow(ctx, flow)
@@ -2215,28 +2236,6 @@
 	return f.processAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, TpID, UsMeterID, DsMeterID, flowMetadata)
 }
 
-//WaitForFlowRemoveToFinishForSubscriber blocks until flow removes are complete for a given subscriber
-func (f *OpenOltFlowMgr) WaitForFlowRemoveToFinishForSubscriber(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) {
-	var flowRemoveData pendingFlowRemoveData
-	var ok bool
-
-	key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
-	logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
-	f.pendingFlowRemoveDataPerSubscriberLock.RLock()
-	if flowRemoveData, ok = f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
-		logger.Debugw(ctx, "no-pending-flow-to-remove", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-		f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
-		return
-	}
-	f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
-
-	// Wait for all flow removes to finish first
-	<-flowRemoveData.allFlowsRemoved
-
-	logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-}
-
 // handleFlowWithGroup adds multicast flow to the device.
 func (f *OpenOltFlowMgr) handleFlowWithGroup(ctx context.Context, actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) error {
 	classifierInfo[PacketTagType] = DoubleTag
@@ -2430,8 +2429,6 @@
 			"onu-gem":     f.onuGemInfo})
 }
 
-// This function Lookup maps  by serialNumber or (intfId, gemPort)
-
 //getOnuIDfromGemPortMap Returns OnuID,nil if found or set 0,error if no onuId is found for serialNumber or (intfId, gemPort)
 func (f *OpenOltFlowMgr) getOnuIDfromGemPortMap(ctx context.Context, intfID uint32, gemPortID uint32) (uint32, error) {
 
@@ -3263,91 +3260,6 @@
 	return f.resourceMgr.RemoveFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flowID)
 }
 
-func (f *OpenOltFlowMgr) incrementActiveFlowRemoveCount(ctx context.Context, flow *ofp.OfpFlowStats) {
-
-	f.pendingFlowRemoveDataPerSubscriberLock.Lock()
-	defer f.pendingFlowRemoveDataPerSubscriberLock.Unlock()
-
-	inPort, outPort := getPorts(flow)
-	logger.Debugw(ctx, "increment-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
-	if inPort != InvalidPort && outPort != InvalidPort {
-		_, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
-		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
-		logger.Debugw(ctx, "increment-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
-		flowRemoveData, ok := f.pendingFlowRemoveDataPerSubscriber[key]
-		if !ok {
-			flowRemoveData = pendingFlowRemoveData{
-				pendingFlowRemoveCount: 0,
-				allFlowsRemoved:        make(chan struct{}),
-			}
-		}
-		flowRemoveData.pendingFlowRemoveCount++
-		f.pendingFlowRemoveDataPerSubscriber[key] = flowRemoveData
-
-		logger.Debugw(ctx, "current-flow-remove-count–increment",
-			log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
-				"currCnt": f.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
-	}
-}
-
-func (f *OpenOltFlowMgr) decrementActiveFlowRemoveCount(ctx context.Context, flow *ofp.OfpFlowStats) {
-	f.pendingFlowRemoveDataPerSubscriberLock.Lock()
-	defer f.pendingFlowRemoveDataPerSubscriberLock.Unlock()
-
-	inPort, outPort := getPorts(flow)
-	logger.Debugw(ctx, "decrement-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
-	if inPort != InvalidPort && outPort != InvalidPort {
-		_, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
-		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
-		logger.Debugw(ctx, "decrement-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
-		if val, ok := f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
-			logger.Fatalf(ctx, "flow-remove-key-not-found", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-		} else {
-			if val.pendingFlowRemoveCount > 0 {
-				val.pendingFlowRemoveCount--
-			}
-			logger.Debugw(ctx, "current-flow-remove-count-after-decrement",
-				log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
-					"currCnt": f.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
-			// If all flow removes have finished, then close the channel to signal the receiver
-			// to go ahead with flow adds.
-			if val.pendingFlowRemoveCount == 0 {
-				close(val.allFlowsRemoved)
-				delete(f.pendingFlowRemoveDataPerSubscriber, key)
-				return
-			}
-			f.pendingFlowRemoveDataPerSubscriber[key] = val
-		}
-	}
-}
-
-func (f *OpenOltFlowMgr) waitForFlowRemoveToFinish(ctx context.Context, flow *ofp.OfpFlowStats) {
-	var flowRemoveData pendingFlowRemoveData
-	var ok bool
-	inPort, outPort := getPorts(flow)
-	logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
-	if inPort != InvalidPort && outPort != InvalidPort {
-		_, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
-		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
-		logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
-		f.pendingFlowRemoveDataPerSubscriberLock.RLock()
-		if flowRemoveData, ok = f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
-			logger.Debugw(ctx, "no-pending-flow-to-remove", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-			f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
-			return
-		}
-		f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
-
-		// Wait for all flow removes to finish first
-		<-flowRemoveData.allFlowsRemoved
-
-		logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-	}
-}
-
 // reconcileSubscriberDataPathFlowIDMap reconciles subscriberDataPathFlowIDMap from KV store
 func (f *OpenOltFlowMgr) reconcileSubscriberDataPathFlowIDMap(ctx context.Context) {
 	onuGemInfo, err := f.resourceMgr.GetOnuGemInfo(ctx, f.ponPortIdx)