VOL-3521 : scale: intermittent issue - voltha complains that different meter is in use for subscriber
- Process incoming flows on a per ONU basis using channels per ONU

Change-Id: I0f375d90d786a0135bb51ce18036e5297dc7297b
diff --git a/VERSION b/VERSION
index 05d78bc..cb2b00e 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.0.1-dev
+3.0.1
diff --git a/go.mod b/go.mod
index faca99e..8cccee9 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,6 @@
 go 1.13
 
 require (
-	github.com/EagleChen/mapmutex v0.0.0-20180418073615-e1a5ae258d8d
 	github.com/cenkalti/backoff/v3 v3.1.1
 	github.com/gogo/protobuf v1.3.1
 	github.com/golang/protobuf v1.3.2
diff --git a/go.sum b/go.sum
index 91da69d..bb10b31 100644
--- a/go.sum
+++ b/go.sum
@@ -4,8 +4,6 @@
 github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
 github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM=
 github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
-github.com/EagleChen/mapmutex v0.0.0-20180418073615-e1a5ae258d8d h1:j5hduAppx4gHqltfZ1cm7jHbXR0LuQulnF4VkBU8esw=
-github.com/EagleChen/mapmutex v0.0.0-20180418073615-e1a5ae258d8d/go.mod h1:H87WPRkM4YDLkW5tC6biLEzWaKtNse5xL1AR91FXC74=
 github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/Shopify/sarama v1.23.1 h1:XxJBCZEoWJtoWjf/xRbmGUpAmTZGnuuF0ON0EvxxBrs=
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index b87ca12..a098387 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -1424,26 +1424,25 @@
 
 	if flows != nil {
 		for _, flow := range flows.ToRemove.Items {
-			ponIf := dh.getPonIfFromFlow(ctx, flow)
+			ponIf := dh.getPonIfFromFlow(flow)
 
 			logger.Debugw(ctx, "removing-flow",
 				log.Fields{"device-id": device.Id,
 					"ponIf":        ponIf,
 					"flowToRemove": flow})
-			err := dh.flowMgr[ponIf].RemoveFlow(ctx, flow)
+			err := dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, false, nil)
 			if err != nil {
 				errorsList = append(errorsList, err)
 			}
 		}
 
 		for _, flow := range flows.ToAdd.Items {
-			ponIf := dh.getPonIfFromFlow(ctx, flow)
+			ponIf := dh.getPonIfFromFlow(flow)
 			logger.Debugw(ctx, "adding-flow",
 				log.Fields{"device-id": device.Id,
 					"ponIf":     ponIf,
 					"flowToAdd": flow})
-			// If there are active Flow Remove in progress for a given subscriber, wait until it completes
-			err := dh.flowMgr[ponIf].AddFlow(ctx, flow, flowMetadata)
+			err := dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, true, flowMetadata)
 			if err != nil {
 				errorsList = append(errorsList, err)
 			}
@@ -2082,15 +2081,6 @@
 				"serial-number": onuDevice.(*OnuDevice).serialNumber}, err).Log()
 	}
 
-	for uniID := 0; uniID < MaxUnisPerOnu; uniID++ {
-		logger.Debugw(ctx, "wait-for-flow-remove-complete-before-processing-child-device-lost",
-			log.Fields{"int-id": intfID, "onu-id": onuID, "uni-id": uniID})
-		dh.flowMgr[intfID].WaitForFlowRemoveToFinishForSubscriber(ctx, intfID, onuID, uint32(uniID))
-		logger.Debugw(ctx, "flow-removes-complete-for-subscriber",
-			log.Fields{"int-id": intfID, "onu-id": onuID, "uni-id": uniID})
-		// TODO: Would be good to delete the subscriber entry from flowMgr.pendingFlowRemoveDataPerSubscriber map
-	}
-
 	onu := &oop.Onu{IntfId: intfID, OnuId: onuID, SerialNumber: sn}
 	if _, err := dh.Client.DeleteOnu(log.WithSpanFromContext(context.Background(), ctx), onu); err != nil {
 		return olterrors.NewErrAdapter("failed-to-delete-onu", log.Fields{
@@ -2245,7 +2235,7 @@
 	return resp, nil
 }
 
-func (dh *DeviceHandler) getPonIfFromFlow(ctx context.Context, flow *of.OfpFlowStats) uint32 {
+func (dh *DeviceHandler) getPonIfFromFlow(flow *of.OfpFlowStats) uint32 {
 	// Default to PON0
 	var intfID uint32
 	inPort, outPort := getPorts(flow)
diff --git a/internal/pkg/core/device_handler_test.go b/internal/pkg/core/device_handler_test.go
index 311a371..68309ad 100644
--- a/internal/pkg/core/device_handler_test.go
+++ b/internal/pkg/core/device_handler_test.go
@@ -44,7 +44,7 @@
 )
 
 const (
-	NumPonPorts  = 2
+	NumPonPorts  = 16
 	OnuIDStart   = 1
 	OnuIDEnd     = 32
 	AllocIDStart = 1
@@ -160,7 +160,7 @@
 	openOLT := &OpenOLT{coreProxy: cp, adapterProxy: ap, eventProxy: ep, config: cfg}
 	dh := NewDeviceHandler(cp, ap, ep, device, openOLT)
 	oopRanges := []*oop.DeviceInfo_DeviceResourceRanges{{
-		IntfIds:    []uint32{0, 1},
+		IntfIds:    []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
 		Technology: "xgs-pon",
 		Pools:      []*oop.DeviceInfo_DeviceResourceRanges_Pool{{}},
 	}}
@@ -193,15 +193,16 @@
 
 	ponmgr := &ponrmgr.PONResourceManager{
 		DeviceID: "onu-1",
-		IntfIDs:  []uint32{0, 1},
+		IntfIDs:  []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
 		KVStore: &db.Backend{
 			Client: &mocks.MockKVClient{},
 		},
 		PonResourceRanges: ranges,
 		SharedIdxByType:   sharedIdxByType,
 	}
-	dh.resourceMgr.ResourceMgrs[0] = ponmgr
-	dh.resourceMgr.ResourceMgrs[1] = ponmgr
+	for i := 0; i < NumPonPorts; i++ {
+		dh.resourceMgr.ResourceMgrs[uint32(i)] = ponmgr
+	}
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
 	dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr)
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index e1bc9aa..009b43c 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -35,7 +35,6 @@
 	"strings"
 	"sync"
 
-	"github.com/EagleChen/mapmutex"
 	"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -141,23 +140,13 @@
 	//NoneUniID constant
 	NoneUniID = -1
 
-	// MapMutex
-	maxRetry  = 300
-	maxDelay  = 100000000
-	baseDelay = 10000000
-	factor    = 1.1
-	jitter    = 0.2
+	// Max number of flows that can be queued per ONU
+	maxConcurrentFlowsPerOnu = 20
 
 	bitMapPrefix = "0b"
 	pbit1        = '1'
 )
 
-type tpLockKey struct {
-	intfID uint32
-	onuID  uint32
-	uniID  uint32
-}
-
 type schedQueue struct {
 	direction    tp_pb.Direction
 	intfID       uint32
@@ -170,13 +159,6 @@
 	flowMetadata *voltha.FlowMetadata
 }
 
-// pendingFlowRemoveDataKey is key to pendingFlowRemoveDataPerSubscriber map
-type pendingFlowRemoveDataKey struct {
-	intfID uint32
-	onuID  uint32
-	uniID  uint32
-}
-
 // subscriberDataPathFlowIDKey is key to subscriberDataPathFlowIDMap map
 type subscriberDataPathFlowIDKey struct {
 	intfID    uint32
@@ -186,12 +168,16 @@
 	tpID      uint32
 }
 
-// pendingFlowRemoveData is value stored in pendingFlowRemoveDataPerSubscriber map
-// This holds the number of pending flow removes and also a signal channel to
-// to indicate the receiver when all flow removes are handled
-type pendingFlowRemoveData struct {
-	pendingFlowRemoveCount uint32
-	allFlowsRemoved        chan struct{}
+// This control block is created per flow add/remove and pushed on the incomingFlows channel slice
+// The flowControlBlock is then picked by the perOnuFlowHandlerRoutine for further processing.
+// There is on perOnuFlowHandlerRoutine routine per ONU that constantly monitors for any incoming
+// flow and processes it serially
+type flowControlBlock struct {
+	ctx          context.Context      // Flow handler context
+	addFlow      bool                 // if true flow to be added, else removed
+	flow         *voltha.OfpFlowStats // Flow message
+	flowMetadata *voltha.FlowMetadata // FlowMetadata that contains flow meter information. This can be nil for Flow remove
+	errChan      *chan error          // channel to report the Flow handling error
 }
 
 //OpenOltFlowMgr creates the Structure of OpenOltFlowMgr obj
@@ -215,21 +201,14 @@
 	// We need to have a global lock on the onuGemInfo map
 	onuGemInfoLock sync.RWMutex
 
-	// The mapmutex.Mutex can be fine tuned to use mapmutex.NewCustomizedMapMutex
-	perUserFlowHandleLock *mapmutex.Mutex
-
-	// pendingFlowRemoveDataPerSubscriber map is used to maintain the context on a per
-	// subscriber basis for the number of pending flow removes. This data is used
-	// to process all the flow removes for a subscriber before handling flow adds.
-	// Interleaving flow delete and flow add processing has known to cause PON resource
-	// management contentions on a per subscriber bases, so we need ensure ordering.
-	pendingFlowRemoveDataPerSubscriber     map[pendingFlowRemoveDataKey]pendingFlowRemoveData
-	pendingFlowRemoveDataPerSubscriberLock sync.RWMutex
-
 	// Map of voltha flowID associated with subscriberDataPathFlowIDKey
 	// This information is not persisted on Kv store and hence should be reconciled on adapter restart
 	subscriberDataPathFlowIDMap     map[subscriberDataPathFlowIDKey]uint64
 	subscriberDataPathFlowIDMapLock sync.RWMutex
+
+	// Slice of channels. Each channel in slice, index by ONU ID, queues flows per ONU.
+	// A go routine per ONU, waits on the unique channel (indexed by ONU ID) for incoming flows (add/remove)
+	incomingFlows []chan flowControlBlock
 }
 
 //NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
@@ -248,15 +227,24 @@
 		return nil
 	}
 	flowMgr.onuIdsLock = sync.RWMutex{}
-	flowMgr.pendingFlowRemoveDataPerSubscriberLock = sync.RWMutex{}
 	flowMgr.flowsUsedByGemPort = make(map[uint32][]uint64)
 	flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
 	flowMgr.packetInGemPortLock = sync.RWMutex{}
 	flowMgr.onuGemInfoLock = sync.RWMutex{}
-	flowMgr.perUserFlowHandleLock = mapmutex.NewCustomizedMapMutex(maxRetry, maxDelay, baseDelay, factor, jitter)
-	flowMgr.pendingFlowRemoveDataPerSubscriber = make(map[pendingFlowRemoveDataKey]pendingFlowRemoveData)
 	flowMgr.subscriberDataPathFlowIDMap = make(map[subscriberDataPathFlowIDKey]uint64)
 	flowMgr.subscriberDataPathFlowIDMapLock = sync.RWMutex{}
+
+	// Create a slice of buffered channels for handling concurrent flows per ONU.
+	// The additional entry (+1) is to handle the NNI trap flows on a separate channel from individual ONUs channel
+	flowMgr.incomingFlows = make([]chan flowControlBlock, MaxOnusPerPon+1)
+	for i := range flowMgr.incomingFlows {
+		flowMgr.incomingFlows[i] = make(chan flowControlBlock, maxConcurrentFlowsPerOnu)
+		// Spin up a go routine to handling incoming flows (add/remove).
+		// There will be on go routine per ONU.
+		// This routine will be blocked on the flowMgr.incomingFlows[onu-id] channel for incoming flows.
+		go flowMgr.perOnuFlowHandlerRoutine(flowMgr.incomingFlows[i])
+	}
+
 	//Load the onugem info cache from kv store on flowmanager start
 	if flowMgr.onuGemInfo, err = rMgr.GetOnuGemInfo(ctx, ponPortIdx); err != nil {
 		logger.Error(ctx, "failed-to-load-onu-gem-info-cache")
@@ -337,51 +325,36 @@
 		"uni":       uni,
 		"device-id": f.deviceHandler.device.Id})
 
-	tpLockMapKey := tpLockKey{intfID, onuID, uniID}
-	if f.perUserFlowHandleLock.TryLock(tpLockMapKey) {
-		logger.Debugw(ctx, "dividing-flow-create-tcont-gem-ports", log.Fields{
-			"device-id":  f.deviceHandler.device.Id,
-			"intf-id":    intfID,
-			"onu-id":     onuID,
-			"uni-id":     uniID,
-			"port-no":    portNo,
-			"classifier": classifierInfo,
-			"action":     actionInfo,
-			"usmeter-id": UsMeterID,
-			"dsmeter-id": DsMeterID,
-			"tp-id":      TpID})
-		allocID, gemPorts, TpInst = f.createTcontGemports(ctx, intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
-		if allocID == 0 || gemPorts == nil || TpInst == nil {
-			logger.Error(ctx, "alloc-id-gem-ports-tp-unavailable")
-			f.perUserFlowHandleLock.Unlock(tpLockMapKey)
-			return olterrors.NewErrNotFound(
-				"alloc-id-gem-ports-tp-unavailable",
-				nil, nil)
-		}
-		args := make(map[string]uint32)
-		args[IntfID] = intfID
-		args[OnuID] = onuID
-		args[UniID] = uniID
-		args[PortNo] = portNo
-		args[AllocID] = allocID
-
-		/* Flows can be added specific to gemport if p-bits are received.
-		 * If no pbit mentioned then adding flows for all gemports
-		 */
-		f.checkAndAddFlow(ctx, args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
-		f.perUserFlowHandleLock.Unlock(tpLockMapKey)
-	} else {
-		cause := "failed-to-acquire-per-user-flow-handle-lock"
-		fields := log.Fields{
-			"intf-id":     intfID,
-			"onu-id":      onuID,
-			"uni-id":      uniID,
-			"flow-id":     flow.Id,
-			"flow-cookie": flow.Cookie,
-			"device-id":   f.deviceHandler.device.Id}
-		logger.Errorw(ctx, cause, fields)
-		return olterrors.NewErrAdapter(cause, fields, nil)
+	logger.Debugw(ctx, "dividing-flow-create-tcont-gem-ports", log.Fields{
+		"device-id":  f.deviceHandler.device.Id,
+		"intf-id":    intfID,
+		"onu-id":     onuID,
+		"uni-id":     uniID,
+		"port-no":    portNo,
+		"classifier": classifierInfo,
+		"action":     actionInfo,
+		"usmeter-id": UsMeterID,
+		"dsmeter-id": DsMeterID,
+		"tp-id":      TpID})
+	allocID, gemPorts, TpInst = f.createTcontGemports(ctx, intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
+	if allocID == 0 || gemPorts == nil || TpInst == nil {
+		logger.Error(ctx, "alloc-id-gem-ports-tp-unavailable")
+		return olterrors.NewErrNotFound(
+			"alloc-id-gem-ports-tp-unavailable",
+			nil, nil)
 	}
+	args := make(map[string]uint32)
+	args[IntfID] = intfID
+	args[OnuID] = onuID
+	args[UniID] = uniID
+	args[PortNo] = portNo
+	args[AllocID] = allocID
+
+	/* Flows can be added specific to gemport if p-bits are received.
+	 * If no pbit mentioned then adding flows for all gemports
+	 */
+	f.checkAndAddFlow(ctx, args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
+
 	return nil
 }
 
@@ -676,10 +649,18 @@
 		return olterrors.NewErrAdapter("unable-to-remove-traffic-schedulers-from-device",
 			log.Fields{
 				"intf-id":            sq.intfID,
-				"traffic-schedulers": TrafficSched}, err)
+				"traffic-schedulers": TrafficSched,
+				"onu-id":             sq.onuID,
+				"uni-id":             sq.uniID,
+				"uni-port":           sq.uniPort}, err)
 	}
 
-	logger.Infow(ctx, "removed-traffic-schedulers-successfully", log.Fields{"device-id": f.deviceHandler.device.Id})
+	logger.Infow(ctx, "removed-traffic-schedulers-successfully",
+		log.Fields{"device-id": f.deviceHandler.device.Id,
+			"intf-id":  sq.intfID,
+			"onu-id":   sq.onuID,
+			"uni-id":   sq.uniID,
+			"uni-port": sq.uniPort})
 
 	/* After we successfully remove the scheduler configuration on the OLT device,
 	 * delete the meter id on the KV store.
@@ -690,13 +671,21 @@
 			log.Fields{
 				"onu":       sq.onuID,
 				"meter":     KVStoreMeter.MeterId,
-				"device-id": f.deviceHandler.device.Id}, err)
+				"device-id": f.deviceHandler.device.Id,
+				"intf-id":   sq.intfID,
+				"onu-id":    sq.onuID,
+				"uni-id":    sq.uniID,
+				"uni-port":  sq.uniPort}, err)
 	}
 	logger.Infow(ctx, "removed-meter-from-KV-store-successfully",
 		log.Fields{
 			"meter-id":  KVStoreMeter.MeterId,
 			"dir":       Direction,
-			"device-id": f.deviceHandler.device.Id})
+			"device-id": f.deviceHandler.device.Id,
+			"intf-id":   sq.intfID,
+			"onu-id":    sq.onuID,
+			"uni-id":    sq.uniID,
+			"uni-port":  sq.uniPort})
 	return err
 }
 
@@ -710,7 +699,6 @@
 
 	allocIDs = f.resourceMgr.GetCurrentAllocIDsForOnu(ctx, intfID, onuID, uniID)
 	allgemPortIDs = f.resourceMgr.GetCurrentGEMPortIDsForOnu(ctx, intfID, onuID, uniID)
-
 	tpPath := f.getTPpath(ctx, intfID, uni, TpID)
 
 	logger.Debugw(ctx, "creating-new-tcont-and-gem", log.Fields{
@@ -2058,9 +2046,6 @@
 //RemoveFlow removes the flow from the device
 func (f *OpenOltFlowMgr) RemoveFlow(ctx context.Context, flow *ofp.OfpFlowStats) error {
 
-	f.incrementActiveFlowRemoveCount(ctx, flow)
-	defer f.decrementActiveFlowRemoveCount(ctx, flow)
-
 	logger.Infow(ctx, "removing-flow", log.Fields{"flow": *flow})
 	var direction string
 	actionInfo := make(map[string]interface{})
@@ -2086,22 +2071,8 @@
 		direction = Downstream
 	}
 
-	_, intfID, onuID, uniID, _, _, err := FlowExtractInfo(ctx, flow, direction)
-	if err != nil {
-		return err
-	}
-
-	userKey := tpLockKey{intfID, onuID, uniID}
-
 	// Serialize flow removes on a per subscriber basis
-	if f.perUserFlowHandleLock.TryLock(userKey) {
-		err = f.clearFlowFromDeviceAndResourceManager(ctx, flow, direction)
-		f.perUserFlowHandleLock.Unlock(userKey)
-	} else {
-		// Ideally this should never happen
-		logger.Errorw(ctx, "failed-to-acquire-lock-to-remove-flow--remove-aborted", log.Fields{"flow": flow})
-		return errors.New("failed-to-acquire-per-user-lock")
-	}
+	err := f.clearFlowFromDeviceAndResourceManager(ctx, flow, direction)
 
 	return err
 }
@@ -2122,6 +2093,61 @@
 	return false
 }
 
+// RouteFlowToOnuChannel routes incoming flow to ONU specific channel
+func (f *OpenOltFlowMgr) RouteFlowToOnuChannel(ctx context.Context, flow *voltha.OfpFlowStats, addFlow bool, flowMetadata *voltha.FlowMetadata) error {
+	// Step1 : Fill flowControlBlock
+	// Step2 : Push the flowControlBlock to ONU channel
+	// Step3 : Wait on response channel for response
+	// Step4 : Return error value
+	logger.Debugw(ctx, "process-flow", log.Fields{"flow": flow, "addFlow": addFlow})
+	errChan := make(chan error)
+	flowCb := flowControlBlock{
+		ctx:          ctx,
+		addFlow:      addFlow,
+		flow:         flow,
+		flowMetadata: flowMetadata,
+		errChan:      &errChan,
+	}
+	inPort, outPort := getPorts(flow)
+	var onuID uint32
+	if inPort != InvalidPort && outPort != InvalidPort {
+		_, _, onuID, _ = ExtractAccessFromFlow(inPort, outPort)
+	}
+	// inPort or outPort is InvalidPort for trap-from-nni flows.
+	// In the that case onuID is 0 which is the reserved index for trap-from-nni flows in the f.incomingFlows slice
+	// Send the flowCb on the ONU flow channel
+	f.incomingFlows[onuID] <- flowCb
+	// Wait on the channel for flow handlers return value
+	err := <-errChan
+	logger.Debugw(ctx, "process-flow--received-resp", log.Fields{"flow": flow, "addFlow": addFlow, "err": err})
+	return err
+}
+
+// This routine is unique per ONU ID and blocks on flowControlBlock channel for incoming flows
+// Each incoming flow is processed in a synchronous manner, i.e., the flow is processed to completion before picking another
+func (f *OpenOltFlowMgr) perOnuFlowHandlerRoutine(subscriberFlowChannel chan flowControlBlock) {
+	for {
+		// block on the channel to receive an incoming flow
+		// process the flow completely before proceeding to handle the next flow
+		flowCb := <-subscriberFlowChannel
+		if flowCb.addFlow {
+			logger.Debugw(flowCb.ctx, "adding-flow",
+				log.Fields{"device-id": f.deviceHandler.device.Id,
+					"flowToAdd": flowCb.flow})
+			err := f.AddFlow(flowCb.ctx, flowCb.flow, flowCb.flowMetadata)
+			// Pass the return value over the return channel
+			*flowCb.errChan <- err
+		} else {
+			logger.Debugw(flowCb.ctx, "removing-flow",
+				log.Fields{"device-id": f.deviceHandler.device.Id,
+					"flowToRemove": flowCb.flow})
+			err := f.RemoveFlow(flowCb.ctx, flowCb.flow)
+			// Pass the return value over the return channel
+			*flowCb.errChan <- err
+		}
+	}
+}
+
 // AddFlow add flow to device
 // nolint: gocyclo
 func (f *OpenOltFlowMgr) AddFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) error {
@@ -2182,11 +2208,6 @@
 		return f.addIgmpTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
 	}
 
-	// If we are here it is not a trap-from-nni flow, i.e., it is subscriber specific flow.
-	// Wait for any FlowRemoves for that specific subscriber to finish first
-	// The goal here is to serialize FlowRemove and FlowAdd. FlowRemove take priority
-	f.waitForFlowRemoveToFinish(ctx, flow)
-
 	f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNo)
 
 	TpID, err := getTpIDFromFlow(ctx, flow)
@@ -2215,28 +2236,6 @@
 	return f.processAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, TpID, UsMeterID, DsMeterID, flowMetadata)
 }
 
-//WaitForFlowRemoveToFinishForSubscriber blocks until flow removes are complete for a given subscriber
-func (f *OpenOltFlowMgr) WaitForFlowRemoveToFinishForSubscriber(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) {
-	var flowRemoveData pendingFlowRemoveData
-	var ok bool
-
-	key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
-	logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
-	f.pendingFlowRemoveDataPerSubscriberLock.RLock()
-	if flowRemoveData, ok = f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
-		logger.Debugw(ctx, "no-pending-flow-to-remove", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-		f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
-		return
-	}
-	f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
-
-	// Wait for all flow removes to finish first
-	<-flowRemoveData.allFlowsRemoved
-
-	logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-}
-
 // handleFlowWithGroup adds multicast flow to the device.
 func (f *OpenOltFlowMgr) handleFlowWithGroup(ctx context.Context, actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) error {
 	classifierInfo[PacketTagType] = DoubleTag
@@ -2430,8 +2429,6 @@
 			"onu-gem":     f.onuGemInfo})
 }
 
-// This function Lookup maps  by serialNumber or (intfId, gemPort)
-
 //getOnuIDfromGemPortMap Returns OnuID,nil if found or set 0,error if no onuId is found for serialNumber or (intfId, gemPort)
 func (f *OpenOltFlowMgr) getOnuIDfromGemPortMap(ctx context.Context, intfID uint32, gemPortID uint32) (uint32, error) {
 
@@ -3263,91 +3260,6 @@
 	return f.resourceMgr.RemoveFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flowID)
 }
 
-func (f *OpenOltFlowMgr) incrementActiveFlowRemoveCount(ctx context.Context, flow *ofp.OfpFlowStats) {
-
-	f.pendingFlowRemoveDataPerSubscriberLock.Lock()
-	defer f.pendingFlowRemoveDataPerSubscriberLock.Unlock()
-
-	inPort, outPort := getPorts(flow)
-	logger.Debugw(ctx, "increment-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
-	if inPort != InvalidPort && outPort != InvalidPort {
-		_, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
-		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
-		logger.Debugw(ctx, "increment-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
-		flowRemoveData, ok := f.pendingFlowRemoveDataPerSubscriber[key]
-		if !ok {
-			flowRemoveData = pendingFlowRemoveData{
-				pendingFlowRemoveCount: 0,
-				allFlowsRemoved:        make(chan struct{}),
-			}
-		}
-		flowRemoveData.pendingFlowRemoveCount++
-		f.pendingFlowRemoveDataPerSubscriber[key] = flowRemoveData
-
-		logger.Debugw(ctx, "current-flow-remove-count–increment",
-			log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
-				"currCnt": f.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
-	}
-}
-
-func (f *OpenOltFlowMgr) decrementActiveFlowRemoveCount(ctx context.Context, flow *ofp.OfpFlowStats) {
-	f.pendingFlowRemoveDataPerSubscriberLock.Lock()
-	defer f.pendingFlowRemoveDataPerSubscriberLock.Unlock()
-
-	inPort, outPort := getPorts(flow)
-	logger.Debugw(ctx, "decrement-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
-	if inPort != InvalidPort && outPort != InvalidPort {
-		_, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
-		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
-		logger.Debugw(ctx, "decrement-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
-		if val, ok := f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
-			logger.Fatalf(ctx, "flow-remove-key-not-found", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-		} else {
-			if val.pendingFlowRemoveCount > 0 {
-				val.pendingFlowRemoveCount--
-			}
-			logger.Debugw(ctx, "current-flow-remove-count-after-decrement",
-				log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
-					"currCnt": f.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
-			// If all flow removes have finished, then close the channel to signal the receiver
-			// to go ahead with flow adds.
-			if val.pendingFlowRemoveCount == 0 {
-				close(val.allFlowsRemoved)
-				delete(f.pendingFlowRemoveDataPerSubscriber, key)
-				return
-			}
-			f.pendingFlowRemoveDataPerSubscriber[key] = val
-		}
-	}
-}
-
-func (f *OpenOltFlowMgr) waitForFlowRemoveToFinish(ctx context.Context, flow *ofp.OfpFlowStats) {
-	var flowRemoveData pendingFlowRemoveData
-	var ok bool
-	inPort, outPort := getPorts(flow)
-	logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
-	if inPort != InvalidPort && outPort != InvalidPort {
-		_, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
-		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
-		logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
-		f.pendingFlowRemoveDataPerSubscriberLock.RLock()
-		if flowRemoveData, ok = f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
-			logger.Debugw(ctx, "no-pending-flow-to-remove", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-			f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
-			return
-		}
-		f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
-
-		// Wait for all flow removes to finish first
-		<-flowRemoveData.allFlowsRemoved
-
-		logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-	}
-}
-
 // reconcileSubscriberDataPathFlowIDMap reconciles subscriberDataPathFlowIDMap from KV store
 func (f *OpenOltFlowMgr) reconcileSubscriberDataPathFlowIDMap(ctx context.Context) {
 	onuGemInfo, err := f.resourceMgr.GetOnuGemInfo(ctx, f.ponPortIdx)
diff --git a/internal/pkg/core/openolt_flowmgr_test.go b/internal/pkg/core/openolt_flowmgr_test.go
index 9464830..d8caa38 100644
--- a/internal/pkg/core/openolt_flowmgr_test.go
+++ b/internal/pkg/core/openolt_flowmgr_test.go
@@ -927,37 +927,37 @@
 	actionInfo3 := make(map[string]interface{})
 	classifierInfo4 := make(map[string]interface{})
 	actionInfo4 := make(map[string]interface{})
-	flowState, _ := fu.MkFlowStat(fa)
-	flowState2, _ := fu.MkFlowStat(fa2)
-	flowState3, _ := fu.MkFlowStat(fa3)
-	flowState4, _ := fu.MkFlowStat(fa4)
-	formulateClassifierInfoFromFlow(ctx, classifierInfo, flowState)
-	formulateClassifierInfoFromFlow(ctx, classifierInfo2, flowState2)
-	formulateClassifierInfoFromFlow(ctx, classifierInfo3, flowState3)
-	formulateClassifierInfoFromFlow(ctx, classifierInfo4, flowState4)
+	flow, _ := fu.MkFlowStat(fa)
+	flow2, _ := fu.MkFlowStat(fa2)
+	flow3, _ := fu.MkFlowStat(fa3)
+	flow4, _ := fu.MkFlowStat(fa4)
+	formulateClassifierInfoFromFlow(ctx, classifierInfo, flow)
+	formulateClassifierInfoFromFlow(ctx, classifierInfo2, flow2)
+	formulateClassifierInfoFromFlow(ctx, classifierInfo3, flow3)
+	formulateClassifierInfoFromFlow(ctx, classifierInfo4, flow4)
 
-	err := formulateActionInfoFromFlow(ctx, actionInfo, classifierInfo, flowState)
+	err := formulateActionInfoFromFlow(ctx, actionInfo, classifierInfo, flow)
 	if err != nil {
 		// Error logging is already done in the called function
 		// So just return in case of error
 		return
 	}
 
-	err = formulateActionInfoFromFlow(ctx, actionInfo2, classifierInfo2, flowState2)
+	err = formulateActionInfoFromFlow(ctx, actionInfo2, classifierInfo2, flow2)
 	if err != nil {
 		// Error logging is already done in the called function
 		// So just return in case of error
 		return
 	}
 
-	err = formulateActionInfoFromFlow(ctx, actionInfo3, classifierInfo3, flowState3)
+	err = formulateActionInfoFromFlow(ctx, actionInfo3, classifierInfo3, flow3)
 	if err != nil {
 		// Error logging is already done in the called function
 		// So just return in case of error
 		return
 	}
 
-	err = formulateActionInfoFromFlow(ctx, actionInfo4, classifierInfo4, flowState4)
+	err = formulateActionInfoFromFlow(ctx, actionInfo4, classifierInfo4, flow4)
 	if err != nil {
 		// Error logging is already done in the called function
 		// So just return in case of error
@@ -1035,7 +1035,7 @@
 				args:           nil,
 				classifierInfo: classifierInfo,
 				actionInfo:     actionInfo,
-				flow:           flowState,
+				flow:           flow,
 				gemPort:        1,
 				intfID:         1,
 				onuID:          1,
@@ -1054,7 +1054,7 @@
 				args:           nil,
 				classifierInfo: classifierInfo2,
 				actionInfo:     actionInfo2,
-				flow:           flowState2,
+				flow:           flow2,
 				gemPort:        1,
 				intfID:         1,
 				onuID:          1,
@@ -1073,7 +1073,7 @@
 				args:           nil,
 				classifierInfo: classifierInfo3,
 				actionInfo:     actionInfo3,
-				flow:           flowState3,
+				flow:           flow3,
 				gemPort:        1,
 				intfID:         1,
 				onuID:          1,
@@ -1092,7 +1092,7 @@
 				args:           nil,
 				classifierInfo: classifierInfo4,
 				actionInfo:     actionInfo4,
-				flow:           flowState4,
+				flow:           flow4,
 				gemPort:        1,
 				intfID:         1,
 				onuID:          1,
@@ -1168,3 +1168,301 @@
 		return
 	}
 }
+
+func TestOpenOltFlowMgr_TestRouteFlowToOnuChannel(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	log.SetPackageLogLevel("github.com/opencord/voltha-openolt-adapter/internal/pkg/core", log.DebugLevel)
+	log.SetPackageLogLevel("github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager", log.DebugLevel)
+	log.SetPackageLogLevel("github.com/opencord/voltha-openolt-adapter/pkg/mocks", log.DebugLevel)
+	kw := make(map[string]uint64)
+	kw["table_id"] = 1
+	kw["meter_id"] = 1
+	kw["write_metadata"] = 0x4000000000 // Tech-Profile-ID 64
+
+	flowMetadata1 := voltha.FlowMetadata{Meters: []*voltha.OfpMeterConfig{
+		{
+			Flags:   5,
+			MeterId: 1,
+			Bands: []*voltha.OfpMeterBandHeader{
+				{
+					Type:      voltha.OfpMeterBandType_OFPMBT_DROP,
+					Rate:      16000,
+					BurstSize: 30,
+				},
+				{
+					Type:      voltha.OfpMeterBandType_OFPMBT_DROP,
+					Rate:      32000,
+					BurstSize: 30,
+				},
+				{
+					Type:      voltha.OfpMeterBandType_OFPMBT_DROP,
+					Rate:      64000,
+					BurstSize: 30,
+				},
+			},
+		},
+	}}
+
+	flowMetadata2 := voltha.FlowMetadata{Meters: []*voltha.OfpMeterConfig{
+		{
+			Flags:   5,
+			MeterId: 2,
+			Bands: []*voltha.OfpMeterBandHeader{
+				{
+					Type:      voltha.OfpMeterBandType_OFPMBT_DROP,
+					Rate:      16000,
+					BurstSize: 30,
+				},
+			},
+		},
+	}}
+
+	// Downstream LLDP Trap from NNI0 flow
+	fa0 := &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(1048576),
+			fu.EthType(35020),
+		},
+		Actions: []*ofp.OfpAction{
+			fu.Output(4294967293),
+		},
+		KV: kw,
+	}
+
+	// Upstream flow DHCP flow - ONU1 UNI0 PON0
+	fa1 := &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(536870912),
+			fu.Metadata_ofp(1),
+			fu.IpProto(17), // dhcp
+			fu.VlanPcp(0),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.TunnelId(16),
+		},
+		Actions: []*ofp.OfpAction{
+			//fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 257)),
+			fu.Output(2147483645),
+			fu.PushVlan(0x8100),
+		},
+		KV: kw,
+	}
+
+	// Upstream EAPOL - ONU1 UNI0 PON0
+	fa2 := &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(536870912),
+			fu.Metadata_ofp(1),
+			fu.EthType(0x888E),
+			fu.VlanPcp(1),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 257),
+			fu.TunnelId(16),
+		},
+		Actions: []*ofp.OfpAction{
+			//fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 257)),
+			fu.Output(2147483645),
+			fu.PushVlan(0x8100),
+		},
+		KV: kw,
+	}
+
+	// Upstream HSIA - ONU1 UNI0 PON0
+	fa3 := &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(536870912),
+			fu.Metadata_ofp(1),
+			//fu.EthType(0x8100),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+		},
+		Actions: []*ofp.OfpAction{
+			//fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0)),
+			fu.Output(1048576),
+			fu.PushVlan(0x8100),
+		},
+		KV: kw,
+	}
+
+	// Downstream HSIA - ONU1 UNI0 PON0
+	fa4 := &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(1048576),
+			fu.Metadata_ofp(1),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.VlanPcp(1),
+		},
+		Actions: []*ofp.OfpAction{
+			//fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0)),
+			fu.Output(536870912),
+			fu.PopVlan(),
+		},
+		KV: kw,
+	}
+
+	// Upstream flow DHCP flow - ONU1 UNI0 PON15
+	fa5 := &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(536870927),
+			fu.Metadata_ofp(1),
+			fu.IpProto(17), // dhcp
+			fu.VlanPcp(0),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.TunnelId(61456),
+		},
+		Actions: []*ofp.OfpAction{
+			//fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 259)),
+			fu.Output(2147483645),
+			fu.PushVlan(0x8100),
+		},
+		KV: kw,
+	}
+	// Upstream EAPOL - ONU1 UNI0 PON15
+	fa6 := &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(536870927),
+			fu.Metadata_ofp(1),
+			fu.EthType(0x888E),
+			fu.VlanPcp(1),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 259),
+			fu.TunnelId(61456),
+		},
+		Actions: []*ofp.OfpAction{
+			//fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 257)),
+			fu.Output(2147483645),
+			fu.PushVlan(0x8100),
+		},
+		KV: kw,
+	}
+	flow0, _ := fu.MkFlowStat(fa0)
+	flow1, _ := fu.MkFlowStat(fa1)
+	flow2, _ := fu.MkFlowStat(fa2)
+	flow3, _ := fu.MkFlowStat(fa3)
+	flow4, _ := fu.MkFlowStat(fa4)
+
+	flow5, _ := fu.MkFlowStat(fa5)
+	flow6, _ := fu.MkFlowStat(fa6)
+
+	type args struct {
+		ctx          context.Context
+		flow         *ofp.OfpFlowStats
+		addFlow      bool
+		flowMetadata *voltha.FlowMetadata
+	}
+	tests := []struct {
+		name        string
+		args        args
+		wantErr     bool
+		returnedErr error
+	}{
+		{
+			name: "RouteFlowToOnuChannel-0",
+			args: args{
+				ctx:          ctx,
+				flow:         flow0,
+				addFlow:      true,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+		{
+			name: "RouteFlowToOnuChannel-1",
+			args: args{
+				ctx:          ctx,
+				flow:         flow1,
+				addFlow:      true,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+		{
+			name: "RouteFlowToOnuChannel-2",
+			args: args{
+				ctx:          ctx,
+				flow:         flow2,
+				addFlow:      true,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+		{
+			name: "RouteFlowToOnuChannel-3",
+			args: args{
+				ctx:          ctx,
+				flow:         flow3,
+				addFlow:      true,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+		{
+			name: "RouteFlowToOnuChannel-4",
+			args: args{
+				ctx:          ctx,
+				flow:         flow4,
+				addFlow:      true,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+		{
+			name: "RouteFlowToOnuChannel-5",
+			args: args{
+				ctx:          ctx,
+				flow:         flow1,
+				addFlow:      false,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+		{
+			name: "RouteFlowToOnuChannel-6",
+			args: args{
+				ctx:          ctx,
+				flow:         flow1,
+				addFlow:      true,
+				flowMetadata: &flowMetadata2,
+			},
+			wantErr: true,
+		},
+		{
+			name: "RouteFlowToOnuChannel-7",
+			args: args{
+				ctx:          ctx,
+				flow:         flow5,
+				addFlow:      true,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+		{
+			name: "RouteFlowToOnuChannel-8",
+			args: args{
+				ctx:          ctx,
+				flow:         flow6,
+				addFlow:      true,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+	}
+
+	var wg sync.WaitGroup
+	defer wg.Wait() // wait for all go routines to complete
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			wg.Add(1) // one per go routine
+			go func() {
+				defer wg.Done()
+				tt.returnedErr = flowMgr[0].RouteFlowToOnuChannel(tt.args.ctx, tt.args.flow, tt.args.addFlow, tt.args.flowMetadata)
+				if (tt.wantErr == false && tt.returnedErr != nil) || (tt.wantErr == true && tt.returnedErr == nil) {
+					t.Errorf("OpenOltFlowMgr.RouteFlowToOnuChannel() error = %v, wantErr %v", tt.returnedErr, tt.wantErr)
+				}
+			}()
+		})
+	}
+}
diff --git a/internal/pkg/resourcemanager/resourcemanager_test.go b/internal/pkg/resourcemanager/resourcemanager_test.go
index fd69e94..5901c7c 100644
--- a/internal/pkg/resourcemanager/resourcemanager_test.go
+++ b/internal/pkg/resourcemanager/resourcemanager_test.go
@@ -104,19 +104,20 @@
 	ranges["alloc_id_shared"] = uint32(0)
 	ranges["gemport_id_shared"] = uint32(0)
 	ranges["flow_id_shared"] = uint32(0)
-	resMgr.NumOfPonPorts = 2
+	resMgr.NumOfPonPorts = 16
 	ponMgr := &ponrmgr.PONResourceManager{
 		DeviceID: "onu-1",
-		IntfIDs:  []uint32{1, 2},
+		IntfIDs:  []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
 		KVStore: &db.Backend{
 			Client: &MockResKVClient{},
 		},
 		PonResourceRanges: ranges,
 		SharedIdxByType:   sharedIdxByType,
 	}
-	resMgr.ResourceMgrs[1] = ponMgr
-	resMgr.ResourceMgrs[2] = ponMgr
-
+	var ponIntf uint32
+	for ponIntf = 0; ponIntf < resMgr.NumOfPonPorts; ponIntf++ {
+		resMgr.ResourceMgrs[ponIntf] = ponMgr
+	}
 	return &resMgr
 }
 
diff --git a/pkg/mocks/mockKVClient.go b/pkg/mocks/mockKVClient.go
index a034f87..662e236 100644
--- a/pkg/mocks/mockKVClient.go
+++ b/pkg/mocks/mockKVClient.go
@@ -54,6 +54,8 @@
 	FlowGroupCached = "flow_groups_cached"
 	//OnuPacketIn to extract gem port from packet-in
 	OnuPacketIn = "onu_packetin"
+	// OnuGemInfoPath has path on the kvstore to store OnuGemInfo info per PON interface
+	OnuGemInfoPath = "onu_gem_info"
 )
 
 // MockKVClient mocks the AdapterProxy interface.
@@ -71,6 +73,7 @@
 }
 
 // Get mock function implementation for KVClient
+// nolint: gocyclo
 func (kvclient *MockKVClient) Get(ctx context.Context, key string) (*kvstore.KVPair, error) {
 	logger.Debugw(ctx, "Get of MockKVClient called", log.Fields{"key": key})
 	if key != "" {
@@ -114,7 +117,8 @@
 			return nil, errors.New("invalid meter")
 		}
 		if strings.Contains(key, TpIDPathSuffix) {
-			str, _ := json.Marshal(64)
+			data := []uint32{64}
+			str, _ := json.Marshal(data)
 			return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
 		}
 		if strings.Contains(key, FlowIDpool) {
@@ -152,12 +156,14 @@
 		}
 		if strings.Contains(key, GemportIDs) {
 			logger.Debug(ctx, "Error Error Error Key:", GemportIDs)
-			str, _ := json.Marshal(1)
+			data := []uint32{1}
+			str, _ := json.Marshal(data)
 			return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
 		}
 		if strings.Contains(key, AllocIDs) {
 			logger.Debug(ctx, "Error Error Error Key:", AllocIDs)
-			str, _ := json.Marshal(1)
+			data := []uint32{1}
+			str, _ := json.Marshal(data)
 			return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
 		}
 		if strings.Contains(key, FlowGroup) || strings.Contains(key, FlowGroupCached) {
@@ -169,10 +175,14 @@
 			str, _ := json.Marshal(&groupInfo)
 			return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
 		}
-
 		if strings.Contains(key, OnuPacketIn) {
 			return getPacketInGemPort(key)
 		}
+		if strings.Contains(key, OnuGemInfoPath) {
+			var data []resourcemanager.OnuGemInfo
+			str, _ := json.Marshal(data)
+			return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
+		}
 
 		maps := make(map[string]*kvstore.KVPair)
 		maps[key] = &kvstore.KVPair{Key: key}
@@ -222,7 +232,6 @@
 // Put mock function implementation for KVClient
 func (kvclient *MockKVClient) Put(ctx context.Context, key string, value interface{}) error {
 	if key != "" {
-
 		return nil
 	}
 	return errors.New("key didn't find")
diff --git a/pkg/mocks/mockTechprofile.go b/pkg/mocks/mockTechprofile.go
index 367f5e3..548a6fd 100644
--- a/pkg/mocks/mockTechprofile.go
+++ b/pkg/mocks/mockTechprofile.go
@@ -53,13 +53,37 @@
 
 	if techProfiletblID == 64 {
 		return &tp.TechProfile{
-			Name:                           "mock-tech-profile",
-			SubscriberIdentifier:           "257",
-			ProfileType:                    "mock",
-			Version:                        0,
-			NumGemPorts:                    2,
-			UpstreamGemPortAttributeList:   nil,
-			DownstreamGemPortAttributeList: nil,
+			Name:                 "mock-tech-profile",
+			SubscriberIdentifier: "257",
+			ProfileType:          "mock",
+			Version:              0,
+			NumGemPorts:          1,
+			UsScheduler: tp.IScheduler{
+				AllocID:      1,
+				Direction:    "upstream",
+				AdditionalBw: "None",
+				Priority:     0,
+				Weight:       0,
+				QSchedPolicy: "",
+			},
+			DsScheduler: tp.IScheduler{
+				AllocID:      1,
+				Direction:    "downstream",
+				AdditionalBw: "None",
+				Priority:     0,
+				Weight:       0,
+				QSchedPolicy: "",
+			},
+			UpstreamGemPortAttributeList: []tp.IGemPortAttribute{{
+				GemportID: 1,
+				PbitMap:   "0b11111111",
+			},
+			},
+			DownstreamGemPortAttributeList: []tp.IGemPortAttribute{{
+				GemportID: 1,
+				PbitMap:   "0b11111111",
+			},
+			},
 		}, nil
 	} else if techProfiletblID == 65 {
 		return &tp.EponProfile{
diff --git a/vendor/github.com/EagleChen/mapmutex/README.md b/vendor/github.com/EagleChen/mapmutex/README.md
deleted file mode 100644
index 9ee890d..0000000
--- a/vendor/github.com/EagleChen/mapmutex/README.md
+++ /dev/null
@@ -1,73 +0,0 @@
-# mapmutex
-
-mapmutex is a simple implementation to act as a group of mutex.
-
-## What's it for?
-Synchronization is needed in many cases. But in some cases, you don't want a gaint lock to block totally irrelevant actions. Instead, you need many fine-grained tiny locks to only block on same resource.
-
-Take an example. A website have many users. Each user has a different counter. While one user want to increment the counter at the same time in different devices(say, from a pad and a phone), these increments need to happen one by one. But user A's incremntation has nothing to do with user B's incrementation, they don't have to affect each other.
-This is where this package comes in. You can lock for each user (by using user id as key) without blocking other users.
-
-## Performance
-As shown by the result of benchmark(in `mutex_test.go`), it's several times faster than one giant mutex.
-```
-(11 times faster)
-BenchmarkMutex1000_100_20_20-4          	       1	20164937908 ns/op
-BenchmarkMapMutex1000_100_20_20-4       	       1	1821899222 ns/op 
-
-(7 times faster)
-BenchmarkMutex1000_20_20_20-4           	       1	19726327623 ns/op
-BenchmarkMapMutex1000_20_20_20-4        	       1	2759654813 ns/op
-
-(11 times faster)
-BenchmarkMutex1000_20_40_20-4           	       1	20380128848 ns/op
-BenchmarkMapMutex1000_20_40_20-4        	       1	1828899343 ns/op
-
-(only 2 keys in map, 2 times faster)
-(in case of only one key in map, it's the same as one gaint lock)
-BenchmarkMutex1000_2_40_20-4            	       1	20721092007 ns/op
-BenchmarkMapMutex1000_2_40_20-4         	       1	10818512020 ns/op (989 of 1000 success)
-
-(9 times faster)
-BenchmarkMutex1000_20_40_60-4           	       1	60341833247 ns/op
-BenchmarkMapMutex1000_20_40_60-4        	       1	6240238975 ns/op
-
-(11 times faster)
-BenchmarkMutex10000_20_40_20-4          	       1	205493472245 ns/op
-BenchmarkMapMutex10000_20_40_20-4       	       1	18677416055 ns/op
-```
-
-## How to get
-```
-go get github.com/EagleChen/mapmutex
-```
-
-## How to use
-```
-mutex := mapmutex.NewMapMutex()
-if mutex.TryLock(key) { // for example, key can be user id
-    // do the real job here
-
-    mutex.Unlock(key)
-}
-```
-
-TryLock itself will retry several times to aquire the lock. But in the application level, you can also try several times when the lock cannot be got.
-```
-got := false
-for i := 0; && i < retryTimes; i++ {
-    if got = mutex.TryLock(key); got {
-        break
-    }
-}
-if got {
-    // do the real job here
-
-    mutex.Unlock(key)
-}
-```
-
-## How to tune
-1. Use `NewCustomizedMapMutex` to customize how hard 'TryLock' will try to get the lock. The parameters controls how many times to try, how long to wait before another try when failing to aquire the lock, etc. They may be very different for various use cases.
-
-2. Change some source code for your use case. For general use, `map[interface{}]interface{}` is used for storing 'locks'. But it can be changed to `map[int]bool` if your `key` is `int` and `map[string]bool` if you `key` is `string`. As far as i know, this trick will improve the performance, a little bit.
\ No newline at end of file
diff --git a/vendor/github.com/EagleChen/mapmutex/mutex.go b/vendor/github.com/EagleChen/mapmutex/mutex.go
deleted file mode 100644
index 2555e55..0000000
--- a/vendor/github.com/EagleChen/mapmutex/mutex.go
+++ /dev/null
@@ -1,90 +0,0 @@
-package mapmutex
-
-import (
-	"math/rand"
-	"sync"
-	"time"
-)
-
-// Mutex is the mutex with synchronized map
-// it's for reducing unnecessary locks among different keys
-type Mutex struct {
-	locks     map[interface{}]interface{}
-	m         *sync.Mutex
-	maxRetry  int
-	maxDelay  float64 // in nanosend
-	baseDelay float64 // in nanosecond
-	factor    float64
-	jitter    float64
-}
-
-// TryLock tries to aquire the lock.
-func (m *Mutex) TryLock(key interface{}) (gotLock bool) {
-	for i := 0; i < m.maxRetry; i++ {
-		m.m.Lock()
-		if _, ok := m.locks[key]; ok { // if locked
-			m.m.Unlock()
-			time.Sleep(m.backoff(i))
-		} else { // if unlock, lockit
-			m.locks[key] = struct{}{}
-			m.m.Unlock()
-			return true
-		}
-	}
-
-	return false
-}
-
-// Unlock unlocks for the key
-// please call Unlock only after having aquired the lock
-func (m *Mutex) Unlock(key interface{}) {
-	m.m.Lock()
-	delete(m.locks, key)
-	m.m.Unlock()
-}
-
-// borrowed from grpc
-func (m *Mutex) backoff(retries int) time.Duration {
-	if retries == 0 {
-		return time.Duration(m.baseDelay) * time.Nanosecond
-	}
-	backoff, max := m.baseDelay, m.maxDelay
-	for backoff < max && retries > 0 {
-		backoff *= m.factor
-		retries--
-	}
-	if backoff > max {
-		backoff = max
-	}
-	backoff *= 1 + m.jitter*(rand.Float64()*2-1)
-	if backoff < 0 {
-		return 0
-	}
-	return time.Duration(backoff) * time.Nanosecond
-}
-
-// NewMapMutex returns a mapmutex with default configs
-func NewMapMutex() *Mutex {
-	return &Mutex{
-		locks:     make(map[interface{}]interface{}),
-		m:         &sync.Mutex{},
-		maxRetry:  200,
-		maxDelay:  100000000, // 0.1 second
-		baseDelay: 10,        // 10 nanosecond
-		factor:    1.1,
-		jitter:    0.2,
-	}
-}
-
-// NewCustomizedMapMutex returns a customized mapmutex
-func NewCustomizedMapMutex(mRetry int, mDelay, bDelay, factor, jitter float64) *Mutex {
-	return &Mutex{
-		locks:     make(map[interface{}]interface{}),
-		m:         &sync.Mutex{},
-		maxRetry:  mRetry,
-		maxDelay:  mDelay,
-		baseDelay: bDelay,
-		factor:    factor,
-		jitter:    jitter,
-	}
-}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index ad94dd7..181b84c 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1,7 +1,5 @@
 # github.com/DataDog/zstd v1.4.1
 github.com/DataDog/zstd
-# github.com/EagleChen/mapmutex v0.0.0-20180418073615-e1a5ae258d8d
-github.com/EagleChen/mapmutex
 # github.com/Shopify/sarama v1.23.1
 github.com/Shopify/sarama
 # github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878