VOL-3521 : scale: intermittent issue - voltha complains that a different meter is in use for the subscriber
- Process incoming flows on a per ONU basis using channels per ONU

Change-Id: I0f375d90d786a0135bb51ce18036e5297dc7297b
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index b87ca12..a098387 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -1424,26 +1424,25 @@
 
 	if flows != nil {
 		for _, flow := range flows.ToRemove.Items {
-			ponIf := dh.getPonIfFromFlow(ctx, flow)
+			ponIf := dh.getPonIfFromFlow(flow)
 
 			logger.Debugw(ctx, "removing-flow",
 				log.Fields{"device-id": device.Id,
 					"ponIf":        ponIf,
 					"flowToRemove": flow})
-			err := dh.flowMgr[ponIf].RemoveFlow(ctx, flow)
+			err := dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, false, nil)
 			if err != nil {
 				errorsList = append(errorsList, err)
 			}
 		}
 
 		for _, flow := range flows.ToAdd.Items {
-			ponIf := dh.getPonIfFromFlow(ctx, flow)
+			ponIf := dh.getPonIfFromFlow(flow)
 			logger.Debugw(ctx, "adding-flow",
 				log.Fields{"device-id": device.Id,
 					"ponIf":     ponIf,
 					"flowToAdd": flow})
-			// If there are active Flow Remove in progress for a given subscriber, wait until it completes
-			err := dh.flowMgr[ponIf].AddFlow(ctx, flow, flowMetadata)
+			err := dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, true, flowMetadata)
 			if err != nil {
 				errorsList = append(errorsList, err)
 			}
@@ -2082,15 +2081,6 @@
 				"serial-number": onuDevice.(*OnuDevice).serialNumber}, err).Log()
 	}
 
-	for uniID := 0; uniID < MaxUnisPerOnu; uniID++ {
-		logger.Debugw(ctx, "wait-for-flow-remove-complete-before-processing-child-device-lost",
-			log.Fields{"int-id": intfID, "onu-id": onuID, "uni-id": uniID})
-		dh.flowMgr[intfID].WaitForFlowRemoveToFinishForSubscriber(ctx, intfID, onuID, uint32(uniID))
-		logger.Debugw(ctx, "flow-removes-complete-for-subscriber",
-			log.Fields{"int-id": intfID, "onu-id": onuID, "uni-id": uniID})
-		// TODO: Would be good to delete the subscriber entry from flowMgr.pendingFlowRemoveDataPerSubscriber map
-	}
-
 	onu := &oop.Onu{IntfId: intfID, OnuId: onuID, SerialNumber: sn}
 	if _, err := dh.Client.DeleteOnu(log.WithSpanFromContext(context.Background(), ctx), onu); err != nil {
 		return olterrors.NewErrAdapter("failed-to-delete-onu", log.Fields{
@@ -2245,7 +2235,7 @@
 	return resp, nil
 }
 
-func (dh *DeviceHandler) getPonIfFromFlow(ctx context.Context, flow *of.OfpFlowStats) uint32 {
+func (dh *DeviceHandler) getPonIfFromFlow(flow *of.OfpFlowStats) uint32 {
 	// Default to PON0
 	var intfID uint32
 	inPort, outPort := getPorts(flow)
diff --git a/internal/pkg/core/device_handler_test.go b/internal/pkg/core/device_handler_test.go
index 311a371..68309ad 100644
--- a/internal/pkg/core/device_handler_test.go
+++ b/internal/pkg/core/device_handler_test.go
@@ -44,7 +44,7 @@
 )
 
 const (
-	NumPonPorts  = 2
+	NumPonPorts  = 16
 	OnuIDStart   = 1
 	OnuIDEnd     = 32
 	AllocIDStart = 1
@@ -160,7 +160,7 @@
 	openOLT := &OpenOLT{coreProxy: cp, adapterProxy: ap, eventProxy: ep, config: cfg}
 	dh := NewDeviceHandler(cp, ap, ep, device, openOLT)
 	oopRanges := []*oop.DeviceInfo_DeviceResourceRanges{{
-		IntfIds:    []uint32{0, 1},
+		IntfIds:    []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
 		Technology: "xgs-pon",
 		Pools:      []*oop.DeviceInfo_DeviceResourceRanges_Pool{{}},
 	}}
@@ -193,15 +193,16 @@
 
 	ponmgr := &ponrmgr.PONResourceManager{
 		DeviceID: "onu-1",
-		IntfIDs:  []uint32{0, 1},
+		IntfIDs:  []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
 		KVStore: &db.Backend{
 			Client: &mocks.MockKVClient{},
 		},
 		PonResourceRanges: ranges,
 		SharedIdxByType:   sharedIdxByType,
 	}
-	dh.resourceMgr.ResourceMgrs[0] = ponmgr
-	dh.resourceMgr.ResourceMgrs[1] = ponmgr
+	for i := 0; i < NumPonPorts; i++ {
+		dh.resourceMgr.ResourceMgrs[uint32(i)] = ponmgr
+	}
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
 	dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr)
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index e1bc9aa..009b43c 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -35,7 +35,6 @@
 	"strings"
 	"sync"
 
-	"github.com/EagleChen/mapmutex"
 	"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -141,23 +140,13 @@
 	//NoneUniID constant
 	NoneUniID = -1
 
-	// MapMutex
-	maxRetry  = 300
-	maxDelay  = 100000000
-	baseDelay = 10000000
-	factor    = 1.1
-	jitter    = 0.2
+	// Max number of flows that can be queued per ONU
+	maxConcurrentFlowsPerOnu = 20
 
 	bitMapPrefix = "0b"
 	pbit1        = '1'
 )
 
-type tpLockKey struct {
-	intfID uint32
-	onuID  uint32
-	uniID  uint32
-}
-
 type schedQueue struct {
 	direction    tp_pb.Direction
 	intfID       uint32
@@ -170,13 +159,6 @@
 	flowMetadata *voltha.FlowMetadata
 }
 
-// pendingFlowRemoveDataKey is key to pendingFlowRemoveDataPerSubscriber map
-type pendingFlowRemoveDataKey struct {
-	intfID uint32
-	onuID  uint32
-	uniID  uint32
-}
-
 // subscriberDataPathFlowIDKey is key to subscriberDataPathFlowIDMap map
 type subscriberDataPathFlowIDKey struct {
 	intfID    uint32
@@ -186,12 +168,16 @@
 	tpID      uint32
 }
 
-// pendingFlowRemoveData is value stored in pendingFlowRemoveDataPerSubscriber map
-// This holds the number of pending flow removes and also a signal channel to
-// to indicate the receiver when all flow removes are handled
-type pendingFlowRemoveData struct {
-	pendingFlowRemoveCount uint32
-	allFlowsRemoved        chan struct{}
+// This control block is created per flow add/remove and pushed onto a channel in the incomingFlows slice.
+// The flowControlBlock is then picked up by the perOnuFlowHandlerRoutine for further processing.
+// There is one perOnuFlowHandlerRoutine per ONU that constantly monitors for any incoming
+// flow and processes it serially.
+type flowControlBlock struct {
+	ctx          context.Context      // Flow handler context
+	addFlow      bool                 // if true flow to be added, else removed
+	flow         *voltha.OfpFlowStats // Flow message
+	flowMetadata *voltha.FlowMetadata // FlowMetadata that contains flow meter information. This can be nil for Flow remove
+	errChan      *chan error          // channel to report the Flow handling error
 }
 
 //OpenOltFlowMgr creates the Structure of OpenOltFlowMgr obj
@@ -215,21 +201,14 @@
 	// We need to have a global lock on the onuGemInfo map
 	onuGemInfoLock sync.RWMutex
 
-	// The mapmutex.Mutex can be fine tuned to use mapmutex.NewCustomizedMapMutex
-	perUserFlowHandleLock *mapmutex.Mutex
-
-	// pendingFlowRemoveDataPerSubscriber map is used to maintain the context on a per
-	// subscriber basis for the number of pending flow removes. This data is used
-	// to process all the flow removes for a subscriber before handling flow adds.
-	// Interleaving flow delete and flow add processing has known to cause PON resource
-	// management contentions on a per subscriber bases, so we need ensure ordering.
-	pendingFlowRemoveDataPerSubscriber     map[pendingFlowRemoveDataKey]pendingFlowRemoveData
-	pendingFlowRemoveDataPerSubscriberLock sync.RWMutex
-
 	// Map of voltha flowID associated with subscriberDataPathFlowIDKey
 	// This information is not persisted on Kv store and hence should be reconciled on adapter restart
 	subscriberDataPathFlowIDMap     map[subscriberDataPathFlowIDKey]uint64
 	subscriberDataPathFlowIDMapLock sync.RWMutex
+
+	// Slice of channels. Each channel in the slice, indexed by ONU ID, queues flows per ONU.
+	// A goroutine per ONU waits on its unique channel (indexed by ONU ID) for incoming flows (add/remove).
+	incomingFlows []chan flowControlBlock
 }
 
 //NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
@@ -248,15 +227,24 @@
 		return nil
 	}
 	flowMgr.onuIdsLock = sync.RWMutex{}
-	flowMgr.pendingFlowRemoveDataPerSubscriberLock = sync.RWMutex{}
 	flowMgr.flowsUsedByGemPort = make(map[uint32][]uint64)
 	flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
 	flowMgr.packetInGemPortLock = sync.RWMutex{}
 	flowMgr.onuGemInfoLock = sync.RWMutex{}
-	flowMgr.perUserFlowHandleLock = mapmutex.NewCustomizedMapMutex(maxRetry, maxDelay, baseDelay, factor, jitter)
-	flowMgr.pendingFlowRemoveDataPerSubscriber = make(map[pendingFlowRemoveDataKey]pendingFlowRemoveData)
 	flowMgr.subscriberDataPathFlowIDMap = make(map[subscriberDataPathFlowIDKey]uint64)
 	flowMgr.subscriberDataPathFlowIDMapLock = sync.RWMutex{}
+
+	// Create a slice of buffered channels for handling concurrent flows per ONU.
+	// The additional entry (+1) handles the NNI trap flows on a channel separate from the individual ONU channels.
+	flowMgr.incomingFlows = make([]chan flowControlBlock, MaxOnusPerPon+1)
+	for i := range flowMgr.incomingFlows {
+		flowMgr.incomingFlows[i] = make(chan flowControlBlock, maxConcurrentFlowsPerOnu)
+		// Spin up a goroutine to handle incoming flows (add/remove).
+		// There will be one goroutine per ONU.
+		// This routine will be blocked on the flowMgr.incomingFlows[onu-id] channel for incoming flows.
+		go flowMgr.perOnuFlowHandlerRoutine(flowMgr.incomingFlows[i])
+	}
+
 	//Load the onugem info cache from kv store on flowmanager start
 	if flowMgr.onuGemInfo, err = rMgr.GetOnuGemInfo(ctx, ponPortIdx); err != nil {
 		logger.Error(ctx, "failed-to-load-onu-gem-info-cache")
@@ -337,51 +325,36 @@
 		"uni":       uni,
 		"device-id": f.deviceHandler.device.Id})
 
-	tpLockMapKey := tpLockKey{intfID, onuID, uniID}
-	if f.perUserFlowHandleLock.TryLock(tpLockMapKey) {
-		logger.Debugw(ctx, "dividing-flow-create-tcont-gem-ports", log.Fields{
-			"device-id":  f.deviceHandler.device.Id,
-			"intf-id":    intfID,
-			"onu-id":     onuID,
-			"uni-id":     uniID,
-			"port-no":    portNo,
-			"classifier": classifierInfo,
-			"action":     actionInfo,
-			"usmeter-id": UsMeterID,
-			"dsmeter-id": DsMeterID,
-			"tp-id":      TpID})
-		allocID, gemPorts, TpInst = f.createTcontGemports(ctx, intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
-		if allocID == 0 || gemPorts == nil || TpInst == nil {
-			logger.Error(ctx, "alloc-id-gem-ports-tp-unavailable")
-			f.perUserFlowHandleLock.Unlock(tpLockMapKey)
-			return olterrors.NewErrNotFound(
-				"alloc-id-gem-ports-tp-unavailable",
-				nil, nil)
-		}
-		args := make(map[string]uint32)
-		args[IntfID] = intfID
-		args[OnuID] = onuID
-		args[UniID] = uniID
-		args[PortNo] = portNo
-		args[AllocID] = allocID
-
-		/* Flows can be added specific to gemport if p-bits are received.
-		 * If no pbit mentioned then adding flows for all gemports
-		 */
-		f.checkAndAddFlow(ctx, args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
-		f.perUserFlowHandleLock.Unlock(tpLockMapKey)
-	} else {
-		cause := "failed-to-acquire-per-user-flow-handle-lock"
-		fields := log.Fields{
-			"intf-id":     intfID,
-			"onu-id":      onuID,
-			"uni-id":      uniID,
-			"flow-id":     flow.Id,
-			"flow-cookie": flow.Cookie,
-			"device-id":   f.deviceHandler.device.Id}
-		logger.Errorw(ctx, cause, fields)
-		return olterrors.NewErrAdapter(cause, fields, nil)
+	logger.Debugw(ctx, "dividing-flow-create-tcont-gem-ports", log.Fields{
+		"device-id":  f.deviceHandler.device.Id,
+		"intf-id":    intfID,
+		"onu-id":     onuID,
+		"uni-id":     uniID,
+		"port-no":    portNo,
+		"classifier": classifierInfo,
+		"action":     actionInfo,
+		"usmeter-id": UsMeterID,
+		"dsmeter-id": DsMeterID,
+		"tp-id":      TpID})
+	allocID, gemPorts, TpInst = f.createTcontGemports(ctx, intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
+	if allocID == 0 || gemPorts == nil || TpInst == nil {
+		logger.Error(ctx, "alloc-id-gem-ports-tp-unavailable")
+		return olterrors.NewErrNotFound(
+			"alloc-id-gem-ports-tp-unavailable",
+			nil, nil)
 	}
+	args := make(map[string]uint32)
+	args[IntfID] = intfID
+	args[OnuID] = onuID
+	args[UniID] = uniID
+	args[PortNo] = portNo
+	args[AllocID] = allocID
+
+	/* Flows can be added specific to gemport if p-bits are received.
+	 * If no pbit mentioned then adding flows for all gemports
+	 */
+	f.checkAndAddFlow(ctx, args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
+
 	return nil
 }
 
@@ -676,10 +649,18 @@
 		return olterrors.NewErrAdapter("unable-to-remove-traffic-schedulers-from-device",
 			log.Fields{
 				"intf-id":            sq.intfID,
-				"traffic-schedulers": TrafficSched}, err)
+				"traffic-schedulers": TrafficSched,
+				"onu-id":             sq.onuID,
+				"uni-id":             sq.uniID,
+				"uni-port":           sq.uniPort}, err)
 	}
 
-	logger.Infow(ctx, "removed-traffic-schedulers-successfully", log.Fields{"device-id": f.deviceHandler.device.Id})
+	logger.Infow(ctx, "removed-traffic-schedulers-successfully",
+		log.Fields{"device-id": f.deviceHandler.device.Id,
+			"intf-id":  sq.intfID,
+			"onu-id":   sq.onuID,
+			"uni-id":   sq.uniID,
+			"uni-port": sq.uniPort})
 
 	/* After we successfully remove the scheduler configuration on the OLT device,
 	 * delete the meter id on the KV store.
@@ -690,13 +671,21 @@
 			log.Fields{
 				"onu":       sq.onuID,
 				"meter":     KVStoreMeter.MeterId,
-				"device-id": f.deviceHandler.device.Id}, err)
+				"device-id": f.deviceHandler.device.Id,
+				"intf-id":   sq.intfID,
+				"onu-id":    sq.onuID,
+				"uni-id":    sq.uniID,
+				"uni-port":  sq.uniPort}, err)
 	}
 	logger.Infow(ctx, "removed-meter-from-KV-store-successfully",
 		log.Fields{
 			"meter-id":  KVStoreMeter.MeterId,
 			"dir":       Direction,
-			"device-id": f.deviceHandler.device.Id})
+			"device-id": f.deviceHandler.device.Id,
+			"intf-id":   sq.intfID,
+			"onu-id":    sq.onuID,
+			"uni-id":    sq.uniID,
+			"uni-port":  sq.uniPort})
 	return err
 }
 
@@ -710,7 +699,6 @@
 
 	allocIDs = f.resourceMgr.GetCurrentAllocIDsForOnu(ctx, intfID, onuID, uniID)
 	allgemPortIDs = f.resourceMgr.GetCurrentGEMPortIDsForOnu(ctx, intfID, onuID, uniID)
-
 	tpPath := f.getTPpath(ctx, intfID, uni, TpID)
 
 	logger.Debugw(ctx, "creating-new-tcont-and-gem", log.Fields{
@@ -2058,9 +2046,6 @@
 //RemoveFlow removes the flow from the device
 func (f *OpenOltFlowMgr) RemoveFlow(ctx context.Context, flow *ofp.OfpFlowStats) error {
 
-	f.incrementActiveFlowRemoveCount(ctx, flow)
-	defer f.decrementActiveFlowRemoveCount(ctx, flow)
-
 	logger.Infow(ctx, "removing-flow", log.Fields{"flow": *flow})
 	var direction string
 	actionInfo := make(map[string]interface{})
@@ -2086,22 +2071,8 @@
 		direction = Downstream
 	}
 
-	_, intfID, onuID, uniID, _, _, err := FlowExtractInfo(ctx, flow, direction)
-	if err != nil {
-		return err
-	}
-
-	userKey := tpLockKey{intfID, onuID, uniID}
-
 	// Serialize flow removes on a per subscriber basis
-	if f.perUserFlowHandleLock.TryLock(userKey) {
-		err = f.clearFlowFromDeviceAndResourceManager(ctx, flow, direction)
-		f.perUserFlowHandleLock.Unlock(userKey)
-	} else {
-		// Ideally this should never happen
-		logger.Errorw(ctx, "failed-to-acquire-lock-to-remove-flow--remove-aborted", log.Fields{"flow": flow})
-		return errors.New("failed-to-acquire-per-user-lock")
-	}
+	err := f.clearFlowFromDeviceAndResourceManager(ctx, flow, direction)
 
 	return err
 }
@@ -2122,6 +2093,61 @@
 	return false
 }
 
+// RouteFlowToOnuChannel routes incoming flow to ONU specific channel
+func (f *OpenOltFlowMgr) RouteFlowToOnuChannel(ctx context.Context, flow *voltha.OfpFlowStats, addFlow bool, flowMetadata *voltha.FlowMetadata) error {
+	// Step1 : Fill flowControlBlock
+	// Step2 : Push the flowControlBlock to ONU channel
+	// Step3 : Wait on response channel for response
+	// Step4 : Return error value
+	logger.Debugw(ctx, "process-flow", log.Fields{"flow": flow, "addFlow": addFlow})
+	errChan := make(chan error)
+	flowCb := flowControlBlock{
+		ctx:          ctx,
+		addFlow:      addFlow,
+		flow:         flow,
+		flowMetadata: flowMetadata,
+		errChan:      &errChan,
+	}
+	inPort, outPort := getPorts(flow)
+	var onuID uint32
+	if inPort != InvalidPort && outPort != InvalidPort {
+		_, _, onuID, _ = ExtractAccessFromFlow(inPort, outPort)
+	}
+	// inPort or outPort is InvalidPort for trap-from-nni flows.
+	// In that case onuID is 0, which is the reserved index for trap-from-nni flows in the f.incomingFlows slice.
+	// Send the flowCb on the ONU flow channel
+	f.incomingFlows[onuID] <- flowCb
+	// Wait on the channel for flow handlers return value
+	err := <-errChan
+	logger.Debugw(ctx, "process-flow--received-resp", log.Fields{"flow": flow, "addFlow": addFlow, "err": err})
+	return err
+}
+
+// This routine is unique per ONU ID and blocks on flowControlBlock channel for incoming flows
+// Each incoming flow is processed in a synchronous manner, i.e., the flow is processed to completion before picking another
+func (f *OpenOltFlowMgr) perOnuFlowHandlerRoutine(subscriberFlowChannel chan flowControlBlock) {
+	for {
+		// block on the channel to receive an incoming flow
+		// process the flow completely before proceeding to handle the next flow
+		flowCb := <-subscriberFlowChannel
+		if flowCb.addFlow {
+			logger.Debugw(flowCb.ctx, "adding-flow",
+				log.Fields{"device-id": f.deviceHandler.device.Id,
+					"flowToAdd": flowCb.flow})
+			err := f.AddFlow(flowCb.ctx, flowCb.flow, flowCb.flowMetadata)
+			// Pass the return value over the return channel
+			*flowCb.errChan <- err
+		} else {
+			logger.Debugw(flowCb.ctx, "removing-flow",
+				log.Fields{"device-id": f.deviceHandler.device.Id,
+					"flowToRemove": flowCb.flow})
+			err := f.RemoveFlow(flowCb.ctx, flowCb.flow)
+			// Pass the return value over the return channel
+			*flowCb.errChan <- err
+		}
+	}
+}
+
 // AddFlow add flow to device
 // nolint: gocyclo
 func (f *OpenOltFlowMgr) AddFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) error {
@@ -2182,11 +2208,6 @@
 		return f.addIgmpTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
 	}
 
-	// If we are here it is not a trap-from-nni flow, i.e., it is subscriber specific flow.
-	// Wait for any FlowRemoves for that specific subscriber to finish first
-	// The goal here is to serialize FlowRemove and FlowAdd. FlowRemove take priority
-	f.waitForFlowRemoveToFinish(ctx, flow)
-
 	f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNo)
 
 	TpID, err := getTpIDFromFlow(ctx, flow)
@@ -2215,28 +2236,6 @@
 	return f.processAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, TpID, UsMeterID, DsMeterID, flowMetadata)
 }
 
-//WaitForFlowRemoveToFinishForSubscriber blocks until flow removes are complete for a given subscriber
-func (f *OpenOltFlowMgr) WaitForFlowRemoveToFinishForSubscriber(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) {
-	var flowRemoveData pendingFlowRemoveData
-	var ok bool
-
-	key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
-	logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
-	f.pendingFlowRemoveDataPerSubscriberLock.RLock()
-	if flowRemoveData, ok = f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
-		logger.Debugw(ctx, "no-pending-flow-to-remove", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-		f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
-		return
-	}
-	f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
-
-	// Wait for all flow removes to finish first
-	<-flowRemoveData.allFlowsRemoved
-
-	logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-}
-
 // handleFlowWithGroup adds multicast flow to the device.
 func (f *OpenOltFlowMgr) handleFlowWithGroup(ctx context.Context, actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) error {
 	classifierInfo[PacketTagType] = DoubleTag
@@ -2430,8 +2429,6 @@
 			"onu-gem":     f.onuGemInfo})
 }
 
-// This function Lookup maps  by serialNumber or (intfId, gemPort)
-
 //getOnuIDfromGemPortMap Returns OnuID,nil if found or set 0,error if no onuId is found for serialNumber or (intfId, gemPort)
 func (f *OpenOltFlowMgr) getOnuIDfromGemPortMap(ctx context.Context, intfID uint32, gemPortID uint32) (uint32, error) {
 
@@ -3263,91 +3260,6 @@
 	return f.resourceMgr.RemoveFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flowID)
 }
 
-func (f *OpenOltFlowMgr) incrementActiveFlowRemoveCount(ctx context.Context, flow *ofp.OfpFlowStats) {
-
-	f.pendingFlowRemoveDataPerSubscriberLock.Lock()
-	defer f.pendingFlowRemoveDataPerSubscriberLock.Unlock()
-
-	inPort, outPort := getPorts(flow)
-	logger.Debugw(ctx, "increment-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
-	if inPort != InvalidPort && outPort != InvalidPort {
-		_, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
-		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
-		logger.Debugw(ctx, "increment-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
-		flowRemoveData, ok := f.pendingFlowRemoveDataPerSubscriber[key]
-		if !ok {
-			flowRemoveData = pendingFlowRemoveData{
-				pendingFlowRemoveCount: 0,
-				allFlowsRemoved:        make(chan struct{}),
-			}
-		}
-		flowRemoveData.pendingFlowRemoveCount++
-		f.pendingFlowRemoveDataPerSubscriber[key] = flowRemoveData
-
-		logger.Debugw(ctx, "current-flow-remove-count–increment",
-			log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
-				"currCnt": f.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
-	}
-}
-
-func (f *OpenOltFlowMgr) decrementActiveFlowRemoveCount(ctx context.Context, flow *ofp.OfpFlowStats) {
-	f.pendingFlowRemoveDataPerSubscriberLock.Lock()
-	defer f.pendingFlowRemoveDataPerSubscriberLock.Unlock()
-
-	inPort, outPort := getPorts(flow)
-	logger.Debugw(ctx, "decrement-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
-	if inPort != InvalidPort && outPort != InvalidPort {
-		_, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
-		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
-		logger.Debugw(ctx, "decrement-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
-		if val, ok := f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
-			logger.Fatalf(ctx, "flow-remove-key-not-found", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-		} else {
-			if val.pendingFlowRemoveCount > 0 {
-				val.pendingFlowRemoveCount--
-			}
-			logger.Debugw(ctx, "current-flow-remove-count-after-decrement",
-				log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
-					"currCnt": f.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
-			// If all flow removes have finished, then close the channel to signal the receiver
-			// to go ahead with flow adds.
-			if val.pendingFlowRemoveCount == 0 {
-				close(val.allFlowsRemoved)
-				delete(f.pendingFlowRemoveDataPerSubscriber, key)
-				return
-			}
-			f.pendingFlowRemoveDataPerSubscriber[key] = val
-		}
-	}
-}
-
-func (f *OpenOltFlowMgr) waitForFlowRemoveToFinish(ctx context.Context, flow *ofp.OfpFlowStats) {
-	var flowRemoveData pendingFlowRemoveData
-	var ok bool
-	inPort, outPort := getPorts(flow)
-	logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
-	if inPort != InvalidPort && outPort != InvalidPort {
-		_, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
-		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
-		logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
-		f.pendingFlowRemoveDataPerSubscriberLock.RLock()
-		if flowRemoveData, ok = f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
-			logger.Debugw(ctx, "no-pending-flow-to-remove", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-			f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
-			return
-		}
-		f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
-
-		// Wait for all flow removes to finish first
-		<-flowRemoveData.allFlowsRemoved
-
-		logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-	}
-}
-
 // reconcileSubscriberDataPathFlowIDMap reconciles subscriberDataPathFlowIDMap from KV store
 func (f *OpenOltFlowMgr) reconcileSubscriberDataPathFlowIDMap(ctx context.Context) {
 	onuGemInfo, err := f.resourceMgr.GetOnuGemInfo(ctx, f.ponPortIdx)
diff --git a/internal/pkg/core/openolt_flowmgr_test.go b/internal/pkg/core/openolt_flowmgr_test.go
index 9464830..d8caa38 100644
--- a/internal/pkg/core/openolt_flowmgr_test.go
+++ b/internal/pkg/core/openolt_flowmgr_test.go
@@ -927,37 +927,37 @@
 	actionInfo3 := make(map[string]interface{})
 	classifierInfo4 := make(map[string]interface{})
 	actionInfo4 := make(map[string]interface{})
-	flowState, _ := fu.MkFlowStat(fa)
-	flowState2, _ := fu.MkFlowStat(fa2)
-	flowState3, _ := fu.MkFlowStat(fa3)
-	flowState4, _ := fu.MkFlowStat(fa4)
-	formulateClassifierInfoFromFlow(ctx, classifierInfo, flowState)
-	formulateClassifierInfoFromFlow(ctx, classifierInfo2, flowState2)
-	formulateClassifierInfoFromFlow(ctx, classifierInfo3, flowState3)
-	formulateClassifierInfoFromFlow(ctx, classifierInfo4, flowState4)
+	flow, _ := fu.MkFlowStat(fa)
+	flow2, _ := fu.MkFlowStat(fa2)
+	flow3, _ := fu.MkFlowStat(fa3)
+	flow4, _ := fu.MkFlowStat(fa4)
+	formulateClassifierInfoFromFlow(ctx, classifierInfo, flow)
+	formulateClassifierInfoFromFlow(ctx, classifierInfo2, flow2)
+	formulateClassifierInfoFromFlow(ctx, classifierInfo3, flow3)
+	formulateClassifierInfoFromFlow(ctx, classifierInfo4, flow4)
 
-	err := formulateActionInfoFromFlow(ctx, actionInfo, classifierInfo, flowState)
+	err := formulateActionInfoFromFlow(ctx, actionInfo, classifierInfo, flow)
 	if err != nil {
 		// Error logging is already done in the called function
 		// So just return in case of error
 		return
 	}
 
-	err = formulateActionInfoFromFlow(ctx, actionInfo2, classifierInfo2, flowState2)
+	err = formulateActionInfoFromFlow(ctx, actionInfo2, classifierInfo2, flow2)
 	if err != nil {
 		// Error logging is already done in the called function
 		// So just return in case of error
 		return
 	}
 
-	err = formulateActionInfoFromFlow(ctx, actionInfo3, classifierInfo3, flowState3)
+	err = formulateActionInfoFromFlow(ctx, actionInfo3, classifierInfo3, flow3)
 	if err != nil {
 		// Error logging is already done in the called function
 		// So just return in case of error
 		return
 	}
 
-	err = formulateActionInfoFromFlow(ctx, actionInfo4, classifierInfo4, flowState4)
+	err = formulateActionInfoFromFlow(ctx, actionInfo4, classifierInfo4, flow4)
 	if err != nil {
 		// Error logging is already done in the called function
 		// So just return in case of error
@@ -1035,7 +1035,7 @@
 				args:           nil,
 				classifierInfo: classifierInfo,
 				actionInfo:     actionInfo,
-				flow:           flowState,
+				flow:           flow,
 				gemPort:        1,
 				intfID:         1,
 				onuID:          1,
@@ -1054,7 +1054,7 @@
 				args:           nil,
 				classifierInfo: classifierInfo2,
 				actionInfo:     actionInfo2,
-				flow:           flowState2,
+				flow:           flow2,
 				gemPort:        1,
 				intfID:         1,
 				onuID:          1,
@@ -1073,7 +1073,7 @@
 				args:           nil,
 				classifierInfo: classifierInfo3,
 				actionInfo:     actionInfo3,
-				flow:           flowState3,
+				flow:           flow3,
 				gemPort:        1,
 				intfID:         1,
 				onuID:          1,
@@ -1092,7 +1092,7 @@
 				args:           nil,
 				classifierInfo: classifierInfo4,
 				actionInfo:     actionInfo4,
-				flow:           flowState4,
+				flow:           flow4,
 				gemPort:        1,
 				intfID:         1,
 				onuID:          1,
@@ -1168,3 +1168,301 @@
 		return
 	}
 }
+
+// TestOpenOltFlowMgr_TestRouteFlowToOnuChannel sends flow add/remove requests to
+// RouteFlowToOnuChannel concurrently and checks each result against wantErr.
+func TestOpenOltFlowMgr_TestRouteFlowToOnuChannel(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	log.SetPackageLogLevel("github.com/opencord/voltha-openolt-adapter/internal/pkg/core", log.DebugLevel)
+	log.SetPackageLogLevel("github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager", log.DebugLevel)
+	log.SetPackageLogLevel("github.com/opencord/voltha-openolt-adapter/pkg/mocks", log.DebugLevel)
+	kw := make(map[string]uint64)
+	kw["table_id"] = 1
+	kw["meter_id"] = 1
+	kw["write_metadata"] = 0x4000000000 // Tech-Profile-ID 64
+
+	flowMetadata1 := voltha.FlowMetadata{Meters: []*voltha.OfpMeterConfig{
+		{
+			Flags:   5,
+			MeterId: 1,
+			Bands: []*voltha.OfpMeterBandHeader{
+				{
+					Type:      voltha.OfpMeterBandType_OFPMBT_DROP,
+					Rate:      16000,
+					BurstSize: 30,
+				},
+				{
+					Type:      voltha.OfpMeterBandType_OFPMBT_DROP,
+					Rate:      32000,
+					BurstSize: 30,
+				},
+				{
+					Type:      voltha.OfpMeterBandType_OFPMBT_DROP,
+					Rate:      64000,
+					BurstSize: 30,
+				},
+			},
+		},
+	}}
+
+	flowMetadata2 := voltha.FlowMetadata{Meters: []*voltha.OfpMeterConfig{
+		{
+			Flags:   5,
+			MeterId: 2,
+			Bands: []*voltha.OfpMeterBandHeader{
+				{
+					Type:      voltha.OfpMeterBandType_OFPMBT_DROP,
+					Rate:      16000,
+					BurstSize: 30,
+				},
+			},
+		},
+	}}
+
+	// Downstream LLDP Trap from NNI0 flow
+	fa0 := &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(1048576),
+			fu.EthType(35020),
+		},
+		Actions: []*ofp.OfpAction{
+			fu.Output(4294967293),
+		},
+		KV: kw,
+	}
+
+	// Upstream flow DHCP flow - ONU1 UNI0 PON0
+	fa1 := &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(536870912),
+			fu.Metadata_ofp(1),
+			fu.IpProto(17), // dhcp
+			fu.VlanPcp(0),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.TunnelId(16),
+		},
+		Actions: []*ofp.OfpAction{
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 257)),
+			fu.Output(2147483645),
+			fu.PushVlan(0x8100),
+		},
+		KV: kw,
+	}
+
+	// Upstream EAPOL - ONU1 UNI0 PON0
+	fa2 := &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(536870912),
+			fu.Metadata_ofp(1),
+			fu.EthType(0x888E),
+			fu.VlanPcp(1),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 257),
+			fu.TunnelId(16),
+		},
+		Actions: []*ofp.OfpAction{
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 257)),
+			fu.Output(2147483645),
+			fu.PushVlan(0x8100),
+		},
+		KV: kw,
+	}
+
+	// Upstream HSIA - ONU1 UNI0 PON0
+	fa3 := &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(536870912),
+			fu.Metadata_ofp(1),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+		},
+		Actions: []*ofp.OfpAction{
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0)),
+			fu.Output(1048576),
+			fu.PushVlan(0x8100),
+		},
+		KV: kw,
+	}
+
+	// Downstream HSIA - ONU1 UNI0 PON0
+	fa4 := &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(1048576),
+			fu.Metadata_ofp(1),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.VlanPcp(1),
+		},
+		Actions: []*ofp.OfpAction{
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0)),
+			fu.Output(536870912),
+			fu.PopVlan(),
+		},
+		KV: kw,
+	}
+
+	// Upstream flow DHCP flow - ONU1 UNI0 PON15
+	fa5 := &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(536870927),
+			fu.Metadata_ofp(1),
+			fu.IpProto(17), // dhcp
+			fu.VlanPcp(0),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.TunnelId(61456),
+		},
+		Actions: []*ofp.OfpAction{
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 259)),
+			fu.Output(2147483645),
+			fu.PushVlan(0x8100),
+		},
+		KV: kw,
+	}
+	// Upstream EAPOL - ONU1 UNI0 PON15
+	fa6 := &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(536870927),
+			fu.Metadata_ofp(1),
+			fu.EthType(0x888E),
+			fu.VlanPcp(1),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 259),
+			fu.TunnelId(61456),
+		},
+		Actions: []*ofp.OfpAction{
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 257)),
+			fu.Output(2147483645),
+			fu.PushVlan(0x8100),
+		},
+		KV: kw,
+	}
+	flow0, _ := fu.MkFlowStat(fa0)
+	flow1, _ := fu.MkFlowStat(fa1)
+	flow2, _ := fu.MkFlowStat(fa2)
+	flow3, _ := fu.MkFlowStat(fa3)
+	flow4, _ := fu.MkFlowStat(fa4)
+	flow5, _ := fu.MkFlowStat(fa5)
+	flow6, _ := fu.MkFlowStat(fa6)
+
+	type args struct {
+		ctx          context.Context
+		flow         *ofp.OfpFlowStats
+		addFlow      bool
+		flowMetadata *voltha.FlowMetadata
+	}
+	tests := []struct {
+		name        string
+		args        args
+		wantErr     bool
+		returnedErr error
+	}{
+		{
+			name: "RouteFlowToOnuChannel-0",
+			args: args{
+				ctx:          ctx,
+				flow:         flow0,
+				addFlow:      true,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+		{
+			name: "RouteFlowToOnuChannel-1",
+			args: args{
+				ctx:          ctx,
+				flow:         flow1,
+				addFlow:      true,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+		{
+			name: "RouteFlowToOnuChannel-2",
+			args: args{
+				ctx:          ctx,
+				flow:         flow2,
+				addFlow:      true,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+		{
+			name: "RouteFlowToOnuChannel-3",
+			args: args{
+				ctx:          ctx,
+				flow:         flow3,
+				addFlow:      true,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+		{
+			name: "RouteFlowToOnuChannel-4",
+			args: args{
+				ctx:          ctx,
+				flow:         flow4,
+				addFlow:      true,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+		{
+			name: "RouteFlowToOnuChannel-5",
+			args: args{
+				ctx:          ctx,
+				flow:         flow1,
+				addFlow:      false,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+		{
+			name: "RouteFlowToOnuChannel-6",
+			args: args{
+				ctx:          ctx,
+				flow:         flow1,
+				addFlow:      true,
+				flowMetadata: &flowMetadata2,
+			},
+			wantErr: true,
+		},
+		{
+			name: "RouteFlowToOnuChannel-7",
+			args: args{
+				ctx:          ctx,
+				flow:         flow5,
+				addFlow:      true,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+		{
+			name: "RouteFlowToOnuChannel-8",
+			args: args{
+				ctx:          ctx,
+				flow:         flow6,
+				addFlow:      true,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+	}
+
+	// Dispatch every request from its own goroutine so they reach the flow manager
+	// concurrently; each goroutine writes only to its own table entry.
+	var wg sync.WaitGroup
+	for i := range tests {
+		tt := &tests[i]
+		wg.Add(1) // one per go routine
+		go func() {
+			defer wg.Done()
+			tt.returnedErr = flowMgr[0].RouteFlowToOnuChannel(tt.args.ctx, tt.args.flow, tt.args.addFlow, tt.args.flowMetadata)
+		}()
+	}
+	wg.Wait() // wait for all go routines to complete
+	// Assert only after Wait: reporting on a subtest that already returned panics.
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if (tt.returnedErr != nil) != tt.wantErr {
+				t.Errorf("OpenOltFlowMgr.RouteFlowToOnuChannel() error = %v, wantErr %v", tt.returnedErr, tt.wantErr)
+			}
+		})
+	}
+}
diff --git a/internal/pkg/resourcemanager/resourcemanager_test.go b/internal/pkg/resourcemanager/resourcemanager_test.go
index fd69e94..5901c7c 100644
--- a/internal/pkg/resourcemanager/resourcemanager_test.go
+++ b/internal/pkg/resourcemanager/resourcemanager_test.go
@@ -104,19 +104,20 @@
 	ranges["alloc_id_shared"] = uint32(0)
 	ranges["gemport_id_shared"] = uint32(0)
 	ranges["flow_id_shared"] = uint32(0)
-	resMgr.NumOfPonPorts = 2
+	resMgr.NumOfPonPorts = 16
 	ponMgr := &ponrmgr.PONResourceManager{
 		DeviceID: "onu-1",
-		IntfIDs:  []uint32{1, 2},
+		IntfIDs:  []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
 		KVStore: &db.Backend{
 			Client: &MockResKVClient{},
 		},
 		PonResourceRanges: ranges,
 		SharedIdxByType:   sharedIdxByType,
 	}
-	resMgr.ResourceMgrs[1] = ponMgr
-	resMgr.ResourceMgrs[2] = ponMgr
-
+	var ponIntf uint32
+	for ponIntf = 0; ponIntf < resMgr.NumOfPonPorts; ponIntf++ {
+		resMgr.ResourceMgrs[ponIntf] = ponMgr
+	}
 	return &resMgr
 }