VOL-4125: Respond with success/failure to flow insertion according the
          operation result in adapter
VOL-4323: Support serialization of flow messages (execute a flow to
          completion before picking up another flow)

Change-Id: I934fc68b103042971109923491a3f27aaf885328
diff --git a/internal/pkg/avcfg/omci_vlan_config.go b/internal/pkg/avcfg/omci_vlan_config.go
index 4a690f7..03cacf9 100755
--- a/internal/pkg/avcfg/omci_vlan_config.go
+++ b/internal/pkg/avcfg/omci_vlan_config.go
@@ -140,6 +140,7 @@
 	removeChannel    chan bool
 	cookie           uint64 //just the last cookie valid for removal
 	vlanRuleParams   cmn.UniVlanRuleParams
+	respChan         *chan error
 }
 
 //UniVlanConfigFsm defines the structure for the state machine for configuration of the VLAN related setting via OMCI
@@ -162,8 +163,7 @@
 	mutexIsAwaitingResponse     sync.RWMutex
 	mutexFlowParams             sync.RWMutex
 	chCookieDeleted             chan bool //channel to indicate that a specific cookie (related to the active rule) was deleted
-	actualUniVlanConfigRule     cmn.UniVlanRuleParams
-	actualUniVlanConfigMeter    *voltha.OfpMeterConfig
+	actualUniFlowParam          cmn.UniVlanFlowParams
 	uniVlanFlowParamsSlice      []cmn.UniVlanFlowParams
 	uniRemoveFlowsSlice         []uniRemoveVlanFlowParams
 	NumUniFlows                 uint8 // expected number of flows should be less than 12
@@ -190,7 +190,7 @@
 func NewUniVlanConfigFsm(ctx context.Context, apDeviceHandler cmn.IdeviceHandler, apOnuDeviceEntry cmn.IonuDeviceEntry, apDevOmciCC *cmn.OmciCC, apUniPort *cmn.OnuUniPort,
 	apUniTechProf *OnuUniTechProf, apOnuDB *devdb.OnuDeviceDB, aTechProfileID uint8,
 	aRequestEvent cmn.OnuDeviceEvent, aName string, aCommChannel chan cmn.Message, aAcceptIncrementalEvto bool,
-	aCookieSlice []uint64, aMatchVlan uint16, aSetVlan uint16, aSetPcp uint8, lastFlowToRec bool, aMeter *voltha.OfpMeterConfig) *UniVlanConfigFsm {
+	aCookieSlice []uint64, aMatchVlan uint16, aSetVlan uint16, aSetPcp uint8, lastFlowToRec bool, aMeter *voltha.OfpMeterConfig, respChan *chan error) *UniVlanConfigFsm {
 	instFsm := &UniVlanConfigFsm{
 		pDeviceHandler:              apDeviceHandler,
 		pOnuDeviceEntry:             apOnuDeviceEntry,
@@ -212,6 +212,8 @@
 	if instFsm.PAdaptFsm == nil {
 		logger.Errorw(ctx, "UniVlanConfigFsm's cmn.AdapterFsm could not be instantiated!!", log.Fields{
 			"device-id": instFsm.deviceID})
+		// Push response on the response channel
+		instFsm.PushReponseOnFlowResponseChannel(ctx, respChan, fmt.Errorf("adapter-fsm-could-not-be-instantiated"))
 		return nil
 	}
 	instFsm.PAdaptFsm.PFsm = fsm.NewFSM(
@@ -268,10 +270,12 @@
 	if instFsm.PAdaptFsm.PFsm == nil {
 		logger.Errorw(ctx, "UniVlanConfigFsm's Base FSM could not be instantiated!!", log.Fields{
 			"device-id": instFsm.deviceID})
+		// Push response on the response channel
+		instFsm.PushReponseOnFlowResponseChannel(ctx, respChan, fmt.Errorf("adapter-base-fsm-could-not-be-instantiated"))
 		return nil
 	}
 
-	_ = instFsm.initUniFlowParams(ctx, aTechProfileID, aCookieSlice, aMatchVlan, aSetVlan, aSetPcp, aMeter)
+	_ = instFsm.initUniFlowParams(ctx, aTechProfileID, aCookieSlice, aMatchVlan, aSetVlan, aSetPcp, aMeter, respChan)
 
 	logger.Debugw(ctx, "UniVlanConfigFsm created", log.Fields{"device-id": instFsm.deviceID,
 		"accIncrEvto": instFsm.acceptIncrementalEvtoOption})
@@ -280,7 +284,7 @@
 
 //initUniFlowParams is a simplified form of SetUniFlowParams() used for first flow parameters configuration
 func (oFsm *UniVlanConfigFsm) initUniFlowParams(ctx context.Context, aTpID uint8, aCookieSlice []uint64,
-	aMatchVlan uint16, aSetVlan uint16, aSetPcp uint8, aMeter *voltha.OfpMeterConfig) error {
+	aMatchVlan uint16, aSetVlan uint16, aSetPcp uint8, aMeter *voltha.OfpMeterConfig, respChan *chan error) error {
 	loRuleParams := cmn.UniVlanRuleParams{
 		TpID:     aTpID,
 		MatchVid: uint32(aMatchVlan),
@@ -315,7 +319,7 @@
 		}
 	}
 
-	loFlowParams := cmn.UniVlanFlowParams{VlanRuleParams: loRuleParams}
+	loFlowParams := cmn.UniVlanFlowParams{VlanRuleParams: loRuleParams, RespChan: respChan}
 	loFlowParams.CookieSlice = make([]uint64, 0)
 	loFlowParams.CookieSlice = append(loFlowParams.CookieSlice, aCookieSlice...)
 	if aMeter != nil {
@@ -395,13 +399,14 @@
 // ignore complexity by now
 // nolint: gocyclo
 func (oFsm *UniVlanConfigFsm) SetUniFlowParams(ctx context.Context, aTpID uint8, aCookieSlice []uint64,
-	aMatchVlan uint16, aSetVlan uint16, aSetPcp uint8, lastFlowToReconcile bool, aMeter *voltha.OfpMeterConfig) error {
+	aMatchVlan uint16, aSetVlan uint16, aSetPcp uint8, lastFlowToReconcile bool, aMeter *voltha.OfpMeterConfig, respChan *chan error) error {
 	loRuleParams := cmn.UniVlanRuleParams{
 		TpID:     aTpID,
 		MatchVid: uint32(aMatchVlan),
 		SetVid:   uint32(aSetVlan),
 		SetPcp:   uint32(aSetPcp),
 	}
+	var err error
 	// some automatic adjustments on the filter/treat parameters as not specifically configured/ensured by flow configuration parameters
 	loRuleParams.TagsToRemove = 1            //one tag to remove as default setting
 	loRuleParams.MatchPcp = cPrioDoNotFilter // do not Filter on prio as default
@@ -443,14 +448,19 @@
 					logger.Errorw(ctx, "abort UniVlanConfigFsm flow add - inconsistent RemoveFlowsSlice", log.Fields{
 						"device-id": oFsm.deviceID, "slice length": len(oFsm.uniRemoveFlowsSlice)})
 					oFsm.mutexFlowParams.RUnlock()
-					return fmt.Errorf("abort UniVlanConfigFsm flow add - inconsistent RemoveFlowsSlice %s", oFsm.deviceID)
+					err = fmt.Errorf("abort UniVlanConfigFsm flow add - inconsistent RemoveFlowsSlice %s", oFsm.deviceID)
+					oFsm.PushReponseOnFlowResponseChannel(ctx, respChan, err)
+					return err
+
 				}
 				pRemoveParams := &oFsm.uniRemoveFlowsSlice[flow] //wants to modify the uniRemoveFlowsSlice element directly!
 				oFsm.mutexFlowParams.RUnlock()
 				if err := oFsm.suspendAddRule(ctx, pRemoveParams); err != nil {
 					logger.Errorw(ctx, "UniVlanConfigFsm suspension on add aborted - abort complete add-request", log.Fields{
 						"device-id": oFsm.deviceID, "cookie": removeUniFlowParams.cookie})
-					return fmt.Errorf("abort UniVlanConfigFsm suspension on add %s", oFsm.deviceID)
+					err = fmt.Errorf("abort UniVlanConfigFsm suspension on add %s", oFsm.deviceID)
+					oFsm.PushReponseOnFlowResponseChannel(ctx, respChan, err)
+					return err
 				}
 				oFsm.mutexFlowParams.RLock()
 				break //this specific rule should only exist once per uniRemoveFlowsSlice
@@ -495,7 +505,9 @@
 						if deleteSuccess := oFsm.suspendNewRule(ctx); !deleteSuccess {
 							logger.Errorw(ctx, "UniVlanConfigFsm suspended add-cookie-to-rule aborted", log.Fields{
 								"device-id": oFsm.deviceID, "cookie": delayedCookie})
-							return fmt.Errorf(" UniVlanConfigFsm suspended add-cookie-to-rule aborted %s", oFsm.deviceID)
+							err = fmt.Errorf(" UniVlanConfigFsm suspended add-cookie-to-rule aborted %s", oFsm.deviceID)
+							oFsm.PushReponseOnFlowResponseChannel(ctx, respChan, err)
+							return err
 						}
 						flowCookieModify, requestAppendRule = oFsm.reviseFlowConstellation(ctx, delayedCookie, loRuleParams)
 						oFsm.mutexFlowParams.Lock()
@@ -519,7 +531,9 @@
 		if !deleteSuccess {
 			logger.Errorw(ctx, "UniVlanConfigFsm suspended add-new-rule aborted", log.Fields{
 				"device-id": oFsm.deviceID, "cookie": delayedCookie})
-			return fmt.Errorf(" UniVlanConfigFsm suspended add-new-rule aborted %s", oFsm.deviceID)
+			err = fmt.Errorf(" UniVlanConfigFsm suspended add-new-rule aborted %s", oFsm.deviceID)
+			oFsm.PushReponseOnFlowResponseChannel(ctx, respChan, err)
+			return err
 		}
 		requestAppendRule = true //default assumption here is that rule is to be appended
 		flowCookieModify = true  //and that the the flow data base is to be updated
@@ -531,7 +545,7 @@
 	if requestAppendRule {
 		oFsm.mutexFlowParams.Lock()
 		if oFsm.NumUniFlows < cMaxAllowedFlows {
-			loFlowParams := cmn.UniVlanFlowParams{VlanRuleParams: loRuleParams}
+			loFlowParams := cmn.UniVlanFlowParams{VlanRuleParams: loRuleParams, RespChan: respChan}
 			loFlowParams.CookieSlice = make([]uint64, 0)
 			loFlowParams.CookieSlice = append(loFlowParams.CookieSlice, aCookieSlice...)
 			if aMeter != nil {
@@ -561,6 +575,9 @@
 					if fsmErr := pConfigVlanStateBaseFsm.Event(VlanEvSkipOmciConfig); fsmErr != nil {
 						logger.Errorw(ctx, "error in FsmEvent handling UniVlanConfigFsm!",
 							log.Fields{"fsmState": oFsm.PAdaptFsm.PFsm.Current(), "error": fsmErr, "device-id": oFsm.deviceID})
+						err = fsmErr
+						oFsm.PushReponseOnFlowResponseChannel(ctx, respChan, err)
+						return err
 					}
 				}
 				return nil
@@ -591,14 +608,16 @@
 							log.Fields{"ConfiguredUniFlow": oFsm.ConfiguredUniFlow,
 								"sliceLen": len(oFsm.uniVlanFlowParamsSlice), "device-id": oFsm.deviceID})
 						oFsm.mutexFlowParams.Unlock()
-						return fmt.Errorf("abort UniVlanConfigFsm on add due to internal counter mismatch %s", oFsm.deviceID)
+						err = fmt.Errorf("abort UniVlanConfigFsm on add due to internal counter mismatch %s", oFsm.deviceID)
+						oFsm.PushReponseOnFlowResponseChannel(ctx, respChan, err)
+						return err
 					}
-					oFsm.actualUniVlanConfigRule = oFsm.uniVlanFlowParamsSlice[oFsm.ConfiguredUniFlow].VlanRuleParams
-					oFsm.actualUniVlanConfigMeter = oFsm.uniVlanFlowParamsSlice[oFsm.ConfiguredUniFlow].Meter
+
+					oFsm.actualUniFlowParam = oFsm.uniVlanFlowParamsSlice[oFsm.ConfiguredUniFlow]
 					//tpId of the next rule to be configured
-					tpID := oFsm.actualUniVlanConfigRule.TpID
+					tpID := oFsm.actualUniFlowParam.VlanRuleParams.TpID
 					oFsm.TpIDWaitingFor = tpID
-					loSetVlan := oFsm.actualUniVlanConfigRule.SetVid
+					loSetVlan := oFsm.actualUniFlowParam.VlanRuleParams.SetVid
 					//attention: take care to release the mutexFlowParams when calling the FSM directly -
 					//  synchronous FSM 'event/state' functions may rely on this mutex
 					//  but it must be released already before calling getTechProfileDone() as it may already be locked
@@ -620,6 +639,8 @@
 					if fsmErr != nil {
 						logger.Errorw(ctx, "error in FsmEvent handling UniVlanConfigFsm!",
 							log.Fields{"fsmState": pConfigVlanStateBaseFsm.Current(), "error": fsmErr, "device-id": oFsm.deviceID})
+						oFsm.PushReponseOnFlowResponseChannel(ctx, respChan, fsmErr)
+						return fsmErr
 					}
 				}
 			} else {
@@ -631,11 +652,16 @@
 			logger.Errorw(ctx, "UniVlanConfigFsm flow limit exceeded", log.Fields{
 				"device-id": oFsm.deviceID, "flow-number": oFsm.NumUniFlows})
 			oFsm.mutexFlowParams.Unlock()
-			return fmt.Errorf(" UniVlanConfigFsm flow limit exceeded %s", oFsm.deviceID)
+			err = fmt.Errorf(" UniVlanConfigFsm flow limit exceeded %s", oFsm.deviceID)
+			oFsm.PushReponseOnFlowResponseChannel(ctx, respChan, err)
+			return err
 		}
 	} else {
 		// no activity within the FSM for OMCI processing, the deviceReason may be updated immediately
 		kvStoreWrite = true // ensure actual data write to kvStore immediately (no FSM activity)
+		// push response on response channel as there is nothing to be done for this flow
+		oFsm.PushReponseOnFlowResponseChannel(ctx, respChan, nil)
+
 		oFsm.mutexFlowParams.RLock()
 		if oFsm.NumUniFlows == oFsm.ConfiguredUniFlow {
 			//all requested rules really have been configured
@@ -793,7 +819,7 @@
 //RemoveUniFlowParams verifies on existence of flow cookie,
 // if found removes cookie from flow cookie list and if this is empty
 // initiates removal of the flow related configuration from the ONU (via OMCI)
-func (oFsm *UniVlanConfigFsm) RemoveUniFlowParams(ctx context.Context, aCookie uint64) error {
+func (oFsm *UniVlanConfigFsm) RemoveUniFlowParams(ctx context.Context, aCookie uint64, respChan *chan error) error {
 	var deletedCookie uint64
 	flowCookieMatch := false
 	//mutex protection is required for possible concurrent access to FSM members
@@ -809,7 +835,7 @@
 				//remove the cookie from the cookie slice and verify it is getting empty
 				if len(storedUniFlowParams.CookieSlice) == 1 {
 					// had to shift content to function due to sca complexity
-					flowCookieMatch = oFsm.removeRuleComplete(ctx, storedUniFlowParams, aCookie)
+					flowCookieMatch = oFsm.removeRuleComplete(ctx, storedUniFlowParams, aCookie, respChan)
 					//persistencyData write is now part of removeRuleComplete() (on success)
 				} else {
 					flowCookieMatch = true
@@ -833,6 +859,8 @@
 						logger.Debugw(ctx, "UniVlanConfigFsm remaining cookie awaited for deletion before new rule add", log.Fields{
 							"device-id": oFsm.deviceID, "cookie": oFsm.delayNewRuleCookie})
 					}
+					// Push response on the response channel
+					oFsm.PushReponseOnFlowResponseChannel(ctx, respChan, nil)
 					//permanently store the modified flow config for reconcile case and immediately write to KvStore
 					if oFsm.pDeviceHandler != nil {
 						if err := oFsm.pDeviceHandler.StorePersUniFlowConfig(ctx, oFsm.pOnuUniPort.UniID,
@@ -856,6 +884,8 @@
 			// success indication without the need to write to kvStore (no change)
 			go oFsm.pDeviceHandler.DeviceProcStatusUpdate(ctx, cmn.OnuDeviceEvent(oFsm.requestEvent+cDeviceEventOffsetRemoveNoKvStore))
 		}
+		// Push response on the response channel
+		oFsm.PushReponseOnFlowResponseChannel(ctx, respChan, nil)
 		return nil
 	} //unknown cookie
 
@@ -865,7 +895,7 @@
 // removeRuleComplete initiates the complete removal of a VLAN rule (from single cookie element)
 // requires mutexFlowParams to be locked at call
 func (oFsm *UniVlanConfigFsm) removeRuleComplete(ctx context.Context,
-	aUniFlowParams cmn.UniVlanFlowParams, aCookie uint64) bool {
+	aUniFlowParams cmn.UniVlanFlowParams, aCookie uint64, respChan *chan error) bool {
 	pConfigVlanStateBaseFsm := oFsm.PAdaptFsm.PFsm
 	var cancelPendingConfig bool = false
 	var loRemoveParams uniRemoveVlanFlowParams = uniRemoveVlanFlowParams{}
@@ -884,6 +914,7 @@
 		loRemoveParams = uniRemoveVlanFlowParams{
 			vlanRuleParams: aUniFlowParams.VlanRuleParams,
 			cookie:         aCookie,
+			respChan:       respChan,
 		}
 		loRemoveParams.removeChannel = make(chan bool)
 		oFsm.uniRemoveFlowsSlice = append(oFsm.uniRemoveFlowsSlice, loRemoveParams)
@@ -1003,6 +1034,12 @@
 				if aWasConfigured && oFsm.ConfiguredUniFlow > 0 {
 					oFsm.ConfiguredUniFlow--
 				}
+				if !aWasConfigured {
+					// We did not actually process this flow but was removed before that.
+					// Indicate success response for the flow to caller who is blocking on a response
+					oFsm.PushReponseOnFlowResponseChannel(ctx, storedUniFlowParams.RespChan, nil)
+				}
+
 				//cut off the requested flow by slicing out this element
 				oFsm.uniVlanFlowParamsSlice = append(
 					oFsm.uniVlanFlowParamsSlice[:flow], oFsm.uniVlanFlowParamsSlice[flow+1:]...)
@@ -1105,13 +1142,12 @@
 		}
 		//access to uniVlanFlowParamsSlice is done on first element only here per definition
 		//store the actual rule that shall be worked upon in the following transient states
-		oFsm.actualUniVlanConfigRule = oFsm.uniVlanFlowParamsSlice[0].VlanRuleParams
-		oFsm.actualUniVlanConfigMeter = oFsm.uniVlanFlowParamsSlice[0].Meter
-		tpID := oFsm.actualUniVlanConfigRule.TpID
+		oFsm.actualUniFlowParam = oFsm.uniVlanFlowParamsSlice[0]
+		tpID := oFsm.actualUniFlowParam.VlanRuleParams.TpID
 		oFsm.TpIDWaitingFor = tpID
 		//cmp also usage in EVTOCDE create in omci_cc
 		oFsm.evtocdID = cmn.MacBridgeServiceProfileEID + uint16(oFsm.pOnuUniPort.MacBpNo)
-		loSetVlan := oFsm.actualUniVlanConfigRule.SetVid
+		loSetVlan := oFsm.actualUniFlowParam.VlanRuleParams.SetVid
 		//attention: take care to release the mutexFlowParams when calling the FSM directly -
 		//  synchronous FSM 'event/state' functions may rely on this mutex
 		//  but it must be released already before calling getTechProfileDone() as it may already be locked
@@ -1146,7 +1182,7 @@
 	//mutex protection is required for possible concurrent access to FSM members
 	oFsm.mutexFlowParams.Lock()
 	oFsm.TpIDWaitingFor = 0 //reset indication to avoid misinterpretation
-	if oFsm.actualUniVlanConfigRule.SetVid == uint32(of.OfpVlanId_OFPVID_PRESENT) {
+	if oFsm.actualUniFlowParam.VlanRuleParams.SetVid == uint32(of.OfpVlanId_OFPVID_PRESENT) {
 		// meaning transparent setup - no specific VTFD setting required
 		oFsm.mutexFlowParams.Unlock()
 		logger.Debugw(ctx, "UniVlanConfigFsm: no VTFD config required", log.Fields{
@@ -1160,13 +1196,13 @@
 	} else {
 		// This attribute uniquely identifies each instance of this managed entity. Through an identical ID,
 		// this managed entity is implicitly linked to an instance of the MAC bridge port configuration data ME.
-		vtfdID, _ := cmn.GenerateANISideMBPCDEID(uint16(oFsm.pOnuUniPort.MacBpNo), uint16(oFsm.actualUniVlanConfigRule.TpID))
+		vtfdID, _ := cmn.GenerateANISideMBPCDEID(uint16(oFsm.pOnuUniPort.MacBpNo), uint16(oFsm.actualUniFlowParam.VlanRuleParams.TpID))
 		logger.Debugw(ctx, "UniVlanConfigFsm create VTFD", log.Fields{
 			"EntitytId": strconv.FormatInt(int64(vtfdID), 16),
 			"in state":  e.FSM.Current(), "device-id": oFsm.deviceID,
-			"macBpNo": oFsm.pOnuUniPort.MacBpNo, "TpID": oFsm.actualUniVlanConfigRule.TpID})
+			"macBpNo": oFsm.pOnuUniPort.MacBpNo, "TpID": oFsm.actualUniFlowParam.VlanRuleParams.TpID})
 		// setVid is assumed to be masked already by the caller to 12 bit
-		oFsm.vlanFilterList[0] = uint16(oFsm.actualUniVlanConfigRule.SetVid)
+		oFsm.vlanFilterList[0] = uint16(oFsm.actualUniFlowParam.VlanRuleParams.SetVid)
 		oFsm.mutexFlowParams.Unlock()
 		vtfdFilterList := make([]uint16, cVtfdTableSize) //needed for parameter serialization
 		vtfdFilterList[0] = oFsm.vlanFilterList[0]
@@ -1216,8 +1252,8 @@
 		//This is correct passing scenario
 		if errEvto == nil {
 			oFsm.mutexFlowParams.RLock()
-			tpID := oFsm.actualUniVlanConfigRule.TpID
-			vlanID := oFsm.actualUniVlanConfigRule.SetVid
+			tpID := oFsm.actualUniFlowParam.VlanRuleParams.TpID
+			vlanID := oFsm.actualUniFlowParam.VlanRuleParams.SetVid
 			configuredUniFlows := oFsm.ConfiguredUniFlow
 			// ensure mutexFlowParams not locked before calling some TPProcessing activity (that might already be pending on it)
 			oFsm.mutexFlowParams.RUnlock()
@@ -1233,11 +1269,11 @@
 				}
 			}
 			//If this first flow contains a meter, then create TD for related gems.
-			if oFsm.actualUniVlanConfigMeter != nil {
-				logger.Debugw(ctx, "Creating Traffic Descriptor", log.Fields{"device-id": oFsm.deviceID, "meter": oFsm.actualUniVlanConfigMeter})
+			if oFsm.actualUniFlowParam.Meter != nil {
+				logger.Debugw(ctx, "Creating Traffic Descriptor", log.Fields{"device-id": oFsm.deviceID, "meter": oFsm.actualUniFlowParam.Meter})
 				for _, gemPort := range oFsm.pUniTechProf.getBidirectionalGemPortIDsForTP(ctx, oFsm.pOnuUniPort.UniID, tpID) {
-					logger.Debugw(ctx, "Creating Traffic Descriptor for gem", log.Fields{"device-id": oFsm.deviceID, "meter": oFsm.actualUniVlanConfigMeter, "gem": gemPort})
-					errCreateTrafficDescriptor := oFsm.createTrafficDescriptor(ctx, oFsm.actualUniVlanConfigMeter, tpID,
+					logger.Debugw(ctx, "Creating Traffic Descriptor for gem", log.Fields{"device-id": oFsm.deviceID, "meter": oFsm.actualUniFlowParam.Meter, "gem": gemPort})
+					errCreateTrafficDescriptor := oFsm.createTrafficDescriptor(ctx, oFsm.actualUniFlowParam.Meter, tpID,
 						oFsm.pOnuUniPort.UniID, gemPort)
 					if errCreateTrafficDescriptor != nil {
 						logger.Errorw(ctx, "Create Traffic Descriptor create failed, aborting Ani Config FSM!",
@@ -1260,6 +1296,10 @@
 	logger.Infow(ctx, "UniVlanConfigFsm config done - checking on more flows", log.Fields{
 		"device-id":         oFsm.deviceID,
 		"overall-uni-rules": oFsm.NumUniFlows, "configured-uni-rules": oFsm.ConfiguredUniFlow})
+	if len(oFsm.uniVlanFlowParamsSlice) > 0 {
+		oFsm.PushReponseOnFlowResponseChannel(ctx, oFsm.actualUniFlowParam.RespChan, nil)
+	}
+
 	pConfigVlanStateAFsm := oFsm.PAdaptFsm
 	if pConfigVlanStateAFsm == nil {
 		oFsm.mutexFlowParams.Unlock()
@@ -1319,12 +1359,11 @@
 			}(pConfigVlanStateAFsm)
 			return
 		}
-		oFsm.actualUniVlanConfigRule = oFsm.uniVlanFlowParamsSlice[oFsm.ConfiguredUniFlow].VlanRuleParams
-		oFsm.actualUniVlanConfigMeter = oFsm.uniVlanFlowParamsSlice[oFsm.ConfiguredUniFlow].Meter
+		oFsm.actualUniFlowParam = oFsm.uniVlanFlowParamsSlice[oFsm.ConfiguredUniFlow]
 		//tpId of the next rule to be configured
-		tpID := oFsm.actualUniVlanConfigRule.TpID
+		tpID := oFsm.actualUniFlowParam.VlanRuleParams.TpID
 		oFsm.TpIDWaitingFor = tpID
-		loSetVlan := oFsm.actualUniVlanConfigRule.SetVid
+		loSetVlan := oFsm.actualUniFlowParam.VlanRuleParams.SetVid
 		//attention: take care to release the mutexFlowParams when calling the FSM directly -
 		//  synchronous FSM 'event/state' functions may rely on this mutex
 		//  but it must be released already before calling getTechProfileDone() as it may already be locked
@@ -1375,7 +1414,7 @@
 		"device-id":          oFsm.deviceID})
 	oFsm.TpIDWaitingFor = 0 //reset indication to avoid misinterpretation
 
-	if oFsm.actualUniVlanConfigRule.SetVid == uint32(of.OfpVlanId_OFPVID_PRESENT) {
+	if oFsm.actualUniFlowParam.VlanRuleParams.SetVid == uint32(of.OfpVlanId_OFPVID_PRESENT) {
 		// meaning transparent setup - no specific VTFD setting required
 		logger.Debugw(ctx, "UniVlanConfigFsm: no VTFD config required", log.Fields{
 			"in state": e.FSM.Current(), "device-id": oFsm.deviceID})
@@ -1387,14 +1426,14 @@
 		if oFsm.numVlanFilterEntries == 0 {
 			// This attribute uniquely identifies each instance of this managed entity. Through an identical ID,
 			// this managed entity is implicitly linked to an instance of the MAC bridge port configuration data ME.
-			vtfdID, _ := cmn.GenerateANISideMBPCDEID(uint16(oFsm.pOnuUniPort.MacBpNo), uint16(oFsm.actualUniVlanConfigRule.TpID))
+			vtfdID, _ := cmn.GenerateANISideMBPCDEID(uint16(oFsm.pOnuUniPort.MacBpNo), uint16(oFsm.actualUniFlowParam.VlanRuleParams.TpID))
 			//no VTFD yet created
 			logger.Debugw(ctx, "UniVlanConfigFsm create VTFD", log.Fields{
 				"EntitytId": strconv.FormatInt(int64(vtfdID), 16),
 				"device-id": oFsm.deviceID,
-				"macBpNo":   oFsm.pOnuUniPort.MacBpNo, "TpID": oFsm.actualUniVlanConfigRule.TpID})
+				"macBpNo":   oFsm.pOnuUniPort.MacBpNo, "TpID": oFsm.actualUniFlowParam.VlanRuleParams.TpID})
 			// 'SetVid' below is assumed to be masked already by the caller to 12 bit
-			oFsm.vlanFilterList[0] = uint16(oFsm.actualUniVlanConfigRule.SetVid)
+			oFsm.vlanFilterList[0] = uint16(oFsm.actualUniFlowParam.VlanRuleParams.SetVid)
 
 			vtfdFilterList := make([]uint16, cVtfdTableSize) //needed for parameter serialization
 			vtfdFilterList[0] = oFsm.vlanFilterList[0]
@@ -1433,21 +1472,21 @@
 		} else {
 			// This attribute uniquely identifies each instance of this managed entity. Through an identical ID,
 			// this managed entity is implicitly linked to an instance of the MAC bridge port configuration data ME.
-			vtfdID, _ := cmn.GenerateANISideMBPCDEID(uint16(oFsm.pOnuUniPort.MacBpNo), uint16(oFsm.actualUniVlanConfigRule.TpID))
+			vtfdID, _ := cmn.GenerateANISideMBPCDEID(uint16(oFsm.pOnuUniPort.MacBpNo), uint16(oFsm.actualUniFlowParam.VlanRuleParams.TpID))
 
 			logger.Debugw(ctx, "UniVlanConfigFsm set VTFD", log.Fields{
 				"EntitytId": strconv.FormatInt(int64(vtfdID), 16),
 				"device-id": oFsm.deviceID,
-				"macBpNo":   oFsm.pOnuUniPort.MacBpNo, "TpID": oFsm.actualUniVlanConfigRule.TpID})
+				"macBpNo":   oFsm.pOnuUniPort.MacBpNo, "TpID": oFsm.actualUniFlowParam.VlanRuleParams.TpID})
 			// setVid is assumed to be masked already by the caller to 12 bit
 			oFsm.vlanFilterList[oFsm.numVlanFilterEntries] =
-				uint16(oFsm.actualUniVlanConfigRule.SetVid)
+				uint16(oFsm.actualUniFlowParam.VlanRuleParams.SetVid)
 			vtfdFilterList := make([]uint16, cVtfdTableSize) //needed for parameter serialization
 
 			// FIXME: VOL-3685: Issues with resetting a table entry in EVTOCD ME
 			// VTFD has to be created afresh with a new entity ID that has the same entity ID as the MBPCD ME for every
 			// new vlan associated with a different TP.
-			vtfdFilterList[0] = uint16(oFsm.actualUniVlanConfigRule.SetVid)
+			vtfdFilterList[0] = uint16(oFsm.actualUniFlowParam.VlanRuleParams.SetVid)
 
 			oFsm.numVlanFilterEntries++
 			meParams := me.ParamData{
@@ -1496,20 +1535,20 @@
 	oFsm.mutexFlowParams.Unlock()
 	go func() {
 		oFsm.mutexFlowParams.RLock()
-		tpID := oFsm.actualUniVlanConfigRule.TpID
-		ConfiguredUniFlow := oFsm.ConfiguredUniFlow
+		tpID := oFsm.actualUniFlowParam.VlanRuleParams.TpID
+		configuredUniFlow := oFsm.ConfiguredUniFlow
 		// ensure mutexFlowParams not locked before calling some TPProcessing activity (that might already be pending on it)
 		oFsm.mutexFlowParams.RUnlock()
-		errEvto := oFsm.performConfigEvtocdEntries(ctx, ConfiguredUniFlow)
+		errEvto := oFsm.performConfigEvtocdEntries(ctx, configuredUniFlow)
 		//This is correct passing scenario
 		if errEvto == nil {
 			//TODO Possibly insert new state for multicast --> possibly another jira/later time.
 			for _, gemPort := range oFsm.pUniTechProf.getMulticastGemPorts(ctx, oFsm.pOnuUniPort.UniID, uint8(tpID)) {
 				oFsm.mutexFlowParams.RLock()
-				vlanID := oFsm.actualUniVlanConfigRule.SetVid
+				vlanID := oFsm.actualUniFlowParam.VlanRuleParams.SetVid
 				logger.Infow(ctx, "Setting multicast MEs for additional flows", log.Fields{"deviceID": oFsm.deviceID,
 					"techProfile": tpID, "gemPort": gemPort,
-					"vlanID": vlanID, "ConfiguredUniFlow": ConfiguredUniFlow})
+					"vlanID": vlanID, "ConfiguredUniFlow": configuredUniFlow})
 				oFsm.mutexFlowParams.RUnlock()
 				errCreateAllMulticastME := oFsm.performSettingMulticastME(ctx, tpID, gemPort, vlanID)
 				if errCreateAllMulticastME != nil {
@@ -1519,10 +1558,10 @@
 				}
 			}
 			//If this incremental flow contains a meter, then create TD for related gems.
-			if oFsm.actualUniVlanConfigMeter != nil {
+			if oFsm.actualUniFlowParam.Meter != nil {
 				for _, gemPort := range oFsm.pUniTechProf.getBidirectionalGemPortIDsForTP(ctx, oFsm.pOnuUniPort.UniID, tpID) {
-					logger.Debugw(ctx, "Creating Traffic Descriptor for gem", log.Fields{"device-id": oFsm.deviceID, "meter": oFsm.actualUniVlanConfigMeter, "gem": gemPort})
-					errCreateTrafficDescriptor := oFsm.createTrafficDescriptor(ctx, oFsm.actualUniVlanConfigMeter, tpID,
+					logger.Debugw(ctx, "Creating Traffic Descriptor for gem", log.Fields{"device-id": oFsm.deviceID, "meter": oFsm.actualUniFlowParam.Meter, "gem": gemPort})
+					errCreateTrafficDescriptor := oFsm.createTrafficDescriptor(ctx, oFsm.actualUniFlowParam.Meter, tpID,
 						oFsm.pOnuUniPort.UniID, gemPort)
 					if errCreateTrafficDescriptor != nil {
 						logger.Errorw(ctx, "Create Traffic Descriptor create failed, aborting Ani Config FSM!",
@@ -1726,6 +1765,9 @@
 		"in state": e.FSM.Current(), "device-id": oFsm.deviceID,
 		"removed cookie": deletedCookie, "waitForDeleteCookie": oFsm.delayNewRuleCookie})
 
+	// Store the reference to the flow response channel before this entry in the slice is deleted
+	flowRespChan := oFsm.uniRemoveFlowsSlice[0].respChan
+
 	if len(oFsm.uniRemoveFlowsSlice) <= 1 {
 		oFsm.uniRemoveFlowsSlice = nil //reset the slice
 		logger.Debugw(ctx, "UniVlanConfigFsm flow removal - last remove-flow deleted", log.Fields{
@@ -1768,6 +1810,9 @@
 		}
 	}
 	oFsm.mutexFlowParams.Unlock()
+
+	// send response on the response channel for the removed flow.
+	oFsm.PushReponseOnFlowResponseChannel(ctx, flowRespChan, nil)
 }
 
 func (oFsm *UniVlanConfigFsm) enterResetting(ctx context.Context, e *fsm.Event) {
@@ -1817,6 +1862,8 @@
 				removeChannel <- false
 				oFsm.mutexFlowParams.RLock()
 			}
+			// Send response on response channel if the caller is waiting on it.
+			oFsm.PushReponseOnFlowResponseChannel(ctx, removeUniFlowParams.respChan, fmt.Errorf("internal-error"))
 		}
 	}
 
@@ -1828,6 +1875,10 @@
 		if oFsm.clearPersistency {
 			//permanently remove possibly stored persistent data
 			if len(oFsm.uniVlanFlowParamsSlice) > 0 {
+				for _, vlanRule := range oFsm.uniVlanFlowParamsSlice {
+					// Send response on response channel if the caller is waiting on it.
+					oFsm.PushReponseOnFlowResponseChannel(ctx, vlanRule.RespChan, fmt.Errorf("internal-error"))
+				}
 				var emptySlice = make([]cmn.UniVlanFlowParams, 0)
 				_ = oFsm.pDeviceHandler.StorePersUniFlowConfig(ctx, oFsm.pOnuUniPort.UniID, &emptySlice, true) //ignore errors
 			}
@@ -2138,7 +2189,7 @@
 	} //first flow element
 
 	oFsm.mutexFlowParams.RLock()
-	if oFsm.actualUniVlanConfigRule.SetVid == uint32(of.OfpVlanId_OFPVID_PRESENT) {
+	if oFsm.actualUniFlowParam.VlanRuleParams.SetVid == uint32(of.OfpVlanId_OFPVID_PRESENT) {
 		//transparent transmission required
 		oFsm.mutexFlowParams.RUnlock()
 		logger.Debugw(ctx, "UniVlanConfigFsm Tx Set::EVTOCD single tagged transparent rule", log.Fields{
@@ -2201,10 +2252,10 @@
 	} else {
 		// according to py-code acceptIncrementalEvto program option decides upon stacking or translation scenario
 		if oFsm.acceptIncrementalEvtoOption {
-			matchPcp := oFsm.actualUniVlanConfigRule.MatchPcp
-			matchVid := oFsm.actualUniVlanConfigRule.MatchVid
-			setPcp := oFsm.actualUniVlanConfigRule.SetPcp
-			setVid := oFsm.actualUniVlanConfigRule.SetVid
+			matchPcp := oFsm.actualUniFlowParam.VlanRuleParams.MatchPcp
+			matchVid := oFsm.actualUniFlowParam.VlanRuleParams.MatchVid
+			setPcp := oFsm.actualUniFlowParam.VlanRuleParams.SetPcp
+			setVid := oFsm.actualUniFlowParam.VlanRuleParams.SetVid
 			// this defines VID translation scenario: singletagged->singletagged (if not transparent)
 			logger.Debugw(ctx, "UniVlanConfigFsm Tx Set::EVTOCD single tagged translation rule", log.Fields{
 				"match-pcp": matchPcp, "match-vid": matchVid, "set-pcp": setPcp, "set-vid:": setVid, "device-id": oFsm.deviceID})
@@ -2216,20 +2267,20 @@
 					cDoNotFilterTPID<<cFilterTpidOffset) // Do not filter on outer TPID field
 
 			binary.BigEndian.PutUint32(sliceEvtocdRule[cFilterInnerOffset:],
-				oFsm.actualUniVlanConfigRule.MatchPcp<<cFilterPrioOffset| // either DNFonPrio or ignore tag (default) on innerVLAN
-					oFsm.actualUniVlanConfigRule.MatchVid<<cFilterVidOffset| // either DNFonVid or real filter VID
+				oFsm.actualUniFlowParam.VlanRuleParams.MatchPcp<<cFilterPrioOffset| // either DNFonPrio or ignore tag (default) on innerVLAN
+					oFsm.actualUniFlowParam.VlanRuleParams.MatchVid<<cFilterVidOffset| // either DNFonVid or real filter VID
 					cDoNotFilterTPID<<cFilterTpidOffset| // Do not filter on inner TPID field
 					cDoNotFilterEtherType<<cFilterEtherTypeOffset) // Do not filter of EtherType
 
 			binary.BigEndian.PutUint32(sliceEvtocdRule[cTreatOuterOffset:],
-				oFsm.actualUniVlanConfigRule.TagsToRemove<<cTreatTTROffset| // either 1 or 0
+				oFsm.actualUniFlowParam.VlanRuleParams.TagsToRemove<<cTreatTTROffset| // either 1 or 0
 					cDoNotAddPrio<<cTreatPrioOffset| // do not add outer tag
 					cDontCareVid<<cTreatVidOffset| // Outer VID don't care
 					cDontCareTpid<<cTreatTpidOffset) // Outer TPID field don't care
 
 			binary.BigEndian.PutUint32(sliceEvtocdRule[cTreatInnerOffset:],
-				oFsm.actualUniVlanConfigRule.SetPcp<<cTreatPrioOffset| // as configured in flow
-					oFsm.actualUniVlanConfigRule.SetVid<<cTreatVidOffset| //as configured in flow
+				oFsm.actualUniFlowParam.VlanRuleParams.SetPcp<<cTreatPrioOffset| // as configured in flow
+					oFsm.actualUniFlowParam.VlanRuleParams.SetVid<<cTreatVidOffset| //as configured in flow
 					cSetOutputTpidCopyDei<<cTreatTpidOffset) // Set TPID = 0x8100
 			oFsm.mutexFlowParams.RUnlock()
 
@@ -2291,7 +2342,7 @@
 				binary.BigEndian.PutUint32(sliceEvtocdRule[cTreatInnerOffset:],
 					0<<cTreatPrioOffset| // vlan prio set to 0
 						//   (as done in Py code, maybe better option would be setPcp here, which still could be 0?)
-						oFsm.actualUniVlanConfigRule.SetVid<<cTreatVidOffset| // Outer VID don't care
+						oFsm.actualUniFlowParam.VlanRuleParams.SetVid<<cTreatVidOffset| // Outer VID don't care
 						cSetOutputTpidCopyDei<<cTreatTpidOffset) // Set TPID = 0x8100
 
 				oFsm.mutexFlowParams.RUnlock()
@@ -2354,7 +2405,7 @@
 				binary.BigEndian.PutUint32(sliceEvtocdRule[cTreatInnerOffset:],
 					cCopyPrioFromInner<<cTreatPrioOffset| // vlan copy from PrioTag
 						//   (as done in Py code, maybe better option would be setPcp here, which still could be PrioCopy?)
-						oFsm.actualUniVlanConfigRule.SetVid<<cTreatVidOffset| // Outer VID as configured
+						oFsm.actualUniFlowParam.VlanRuleParams.SetVid<<cTreatVidOffset| // Outer VID as configured
 						cSetOutputTpidCopyDei<<cTreatTpidOffset) // Set TPID = 0x8100
 				oFsm.mutexFlowParams.RUnlock()
 
@@ -3121,3 +3172,15 @@
 		oFsm.numVlanFilterEntries++
 	}
 }
+
+// PushReponseOnFlowResponseChannel pushes response on the response channel if available
+func (oFsm *UniVlanConfigFsm) PushReponseOnFlowResponseChannel(ctx context.Context, respChan *chan error, err error) {
+	if respChan != nil {
+		// Do it in a non blocking fashion, so that in case the flow handler routine has shutdown for any reason, we do not block here
+		select {
+		case *respChan <- err:
+			logger.Debugw(ctx, "submitted-response-for-flow", log.Fields{"device-id": oFsm.deviceID, "err": err})
+		default:
+		}
+	}
+}
diff --git a/internal/pkg/common/defines.go b/internal/pkg/common/defines.go
index 2c597c1..23a16dd 100755
--- a/internal/pkg/common/defines.go
+++ b/internal/pkg/common/defines.go
@@ -314,6 +314,7 @@
 	CookieSlice    []uint64               `json:"cookie_slice"`
 	VlanRuleParams UniVlanRuleParams      `json:"vlan_rule_params"`
 	Meter          *voltha.OfpMeterConfig `json:"flow_meter"`
+	RespChan       *chan error            `json:"-"`
 }
 
 ///////////////////////////////////////////////////////////
diff --git a/internal/pkg/common/interfaces.go b/internal/pkg/common/interfaces.go
index a15bf35..ef5e004 100755
--- a/internal/pkg/common/interfaces.go
+++ b/internal/pkg/common/interfaces.go
@@ -76,6 +76,8 @@
 	GetAlarmManagerIsRunning(context.Context) bool
 	StartAlarmManager(context.Context)
 
+	GetFlowMonitoringIsRunning(uniID uint8) bool
+
 	CheckAuditStartCondition(context.Context, UsedOmciConfigFsms) bool
 
 	RemoveOnuUpgradeFsm(context.Context, *voltha.ImageState)
@@ -112,6 +114,8 @@
 
 	SendOMCIRequest(context.Context, string, *ic.OmciMessage) error
 	CreatePortInCore(context.Context, *voltha.Port) error
+
+	PerOnuFlowHandlerRoutine(uniID uint8)
 }
 
 // IonuDeviceEntry interface to onuDeviceEntry
diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go
index aff1de3..98c7eac 100644
--- a/internal/pkg/config/config.go
+++ b/internal/pkg/config/config.go
@@ -71,6 +71,7 @@
 	GrpcAddress                 string
 	CoreEndpoint                string
 	RPCTimeout                  time.Duration
+	MaxConcurrentFlowsPerUni    int
 }
 
 // ParseCommandArguments parses the arguments when running read-write adaptercore service
@@ -270,6 +271,10 @@
 		"max_retry_delay",
 		10*time.Second,
 		"The maximum number of milliseconds to delay before a connection retry attempt")
+	fs.IntVar(&(so.MaxConcurrentFlowsPerUni),
+		"max_concurrent_flows_per_uni",
+		16,
+		"The max number of concurrent flows (add/remove) that can be queued per UNI")
 
 	_ = fs.Parse(args)
 	containerName := getContainerInfo()
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 449d96e..b84bb83 100755
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -120,6 +120,16 @@
 	cSkipOnuConfigReconciling
 )
 
+// FlowCb is the flow control block containing flow add/delete information along with a response channel
+type FlowCb struct {
+	ctx          context.Context // Flow handler context
+	addFlow      bool            // if true flow to be added, else removed
+	flowItem     *of.OfpFlowStats
+	uniPort      *cmn.OnuUniPort
+	flowMetaData *voltha.FlowMetadata
+	respChan     *chan error // channel to report the Flow handling error
+}
+
 //deviceHandler will interact with the ONU ? device.
 type deviceHandler struct {
 	DeviceID         string
@@ -186,6 +196,11 @@
 	mutexDeletionInProgressFlag sync.RWMutex
 	pLastUpgradeImageState      *voltha.ImageState
 	upgradeFsmChan              chan struct{}
+
+	flowCbChan                     []chan FlowCb
+	mutexFlowMonitoringRoutineFlag sync.RWMutex
+	stopFlowMonitoringRoutine      []chan bool // length of slice equal to number of uni ports
+	isFlowMonitoringRoutineActive  []bool      // length of slice equal to number of uni ports
 }
 
 //newDeviceHandler creates a new device handler
@@ -532,8 +547,9 @@
 func (dh *deviceHandler) FlowUpdateIncremental(ctx context.Context,
 	apOfFlowChanges *of.FlowChanges,
 	apOfGroupChanges *of.FlowGroupChanges, apFlowMetaData *voltha.FlowMetadata) error {
-	logger.Debugw(ctx, "FlowUpdateIncremental started", log.Fields{"device-id": dh.DeviceID, "metadata": apFlowMetaData})
-	var retError error = nil
+	logger.Debugw(ctx, "FlowUpdateIncremental started", log.Fields{"device-id": dh.DeviceID, "flow": apOfFlowChanges, "metadata": apFlowMetaData})
+	var errorsList []error
+	var retError error
 	//Remove flows (always remove flows first - remove old and add new with same cookie may be part of the same request)
 	if apOfFlowChanges.ToRemove != nil {
 		for _, flowItem := range apOfFlowChanges.ToRemove.Items {
@@ -541,12 +557,14 @@
 				logger.Warnw(ctx, "flow-remove no cookie: ignore and continuing on checking further flows", log.Fields{
 					"device-id": dh.DeviceID})
 				retError = fmt.Errorf("flow-remove no cookie, device-id %s", dh.DeviceID)
+				errorsList = append(errorsList, retError)
 				continue
 			}
 			flowInPort := flow.GetInPort(flowItem)
 			if flowInPort == uint32(of.OfpPortNo_OFPP_INVALID) {
 				logger.Warnw(ctx, "flow-remove inPort invalid: ignore and continuing on checking further flows", log.Fields{"device-id": dh.DeviceID})
 				retError = fmt.Errorf("flow-remove inPort invalid, device-id %s", dh.DeviceID)
+				errorsList = append(errorsList, retError)
 				continue
 				//return fmt.Errorf("flow inPort invalid: %s", dh.DeviceID)
 			} else if flowInPort == dh.ponPortNumber {
@@ -564,22 +582,43 @@
 						log.Fields{"device-id": dh.DeviceID, "inPort": flowInPort})
 					retError = fmt.Errorf("flow-remove inPort not found in UniPorts, inPort %d, device-id %s",
 						flowInPort, dh.DeviceID)
+					errorsList = append(errorsList, retError)
 					continue
 				}
 				flowOutPort := flow.GetOutPort(flowItem)
 				logger.Debugw(ctx, "flow-remove port indications", log.Fields{
 					"device-id": dh.DeviceID, "inPort": flowInPort, "outPort": flowOutPort,
 					"uniPortName": loUniPort.Name})
-				err := dh.removeFlowItemFromUniPort(ctx, flowItem, loUniPort)
-				//try next flow after processing error
-				if err != nil {
-					logger.Warnw(ctx, "flow-remove processing error: continuing on checking further flows",
-						log.Fields{"device-id": dh.DeviceID, "error": err})
-					retError = err
-					continue
-					//return err
-				} else { // if last setting succeeds, overwrite possibly previously set error
-					retError = nil
+
+				if dh.GetFlowMonitoringIsRunning(loUniPort.UniID) {
+					// Step1 : Fill flowControlBlock
+					// Step2 : Push the flowControlBlock to ONU channel
+					// Step3 : Wait on response channel for response
+					// Step4 : Return error value
+					startTime := time.Now()
+					respChan := make(chan error)
+					flowCb := FlowCb{
+						ctx:          ctx,
+						addFlow:      false,
+						flowItem:     flowItem,
+						flowMetaData: nil,
+						uniPort:      loUniPort,
+						respChan:     &respChan,
+					}
+					dh.flowCbChan[loUniPort.UniID] <- flowCb
+					logger.Infow(ctx, "process-flow-remove-start", log.Fields{"device-id": dh.DeviceID})
+					// Wait on the channel for flow handlers return value
+					retError = <-respChan
+					logger.Infow(ctx, "process-flow-remove-end", log.Fields{"device-id": dh.DeviceID, "err": retError, "totalTimeSeconds": time.Since(startTime).Seconds()})
+					if retError != nil {
+						logger.Warnw(ctx, "flow-delete processing error: continuing on checking further flows",
+							log.Fields{"device-id": dh.DeviceID, "error": retError})
+						errorsList = append(errorsList, retError)
+						continue
+					}
+				} else {
+					retError = fmt.Errorf("flow-handler-routine-not-active-for-onu--device-id-%v", dh.DeviceID)
+					errorsList = append(errorsList, retError)
 				}
 			}
 		}
@@ -590,12 +629,14 @@
 				logger.Debugw(ctx, "incremental flow-add no cookie: ignore and continuing on checking further flows", log.Fields{
 					"device-id": dh.DeviceID})
 				retError = fmt.Errorf("flow-add no cookie, device-id %s", dh.DeviceID)
+				errorsList = append(errorsList, retError)
 				continue
 			}
 			flowInPort := flow.GetInPort(flowItem)
 			if flowInPort == uint32(of.OfpPortNo_OFPP_INVALID) {
 				logger.Warnw(ctx, "flow-add inPort invalid: ignore and continuing on checking further flows", log.Fields{"device-id": dh.DeviceID})
 				retError = fmt.Errorf("flow-add inPort invalid, device-id %s", dh.DeviceID)
+				errorsList = append(errorsList, retError)
 				continue
 				//return fmt.Errorf("flow inPort invalid: %s", dh.DeviceID)
 			} else if flowInPort == dh.ponPortNumber {
@@ -613,8 +654,8 @@
 						log.Fields{"device-id": dh.DeviceID, "inPort": flowInPort})
 					retError = fmt.Errorf("flow-add inPort not found in UniPorts, inPort %d, device-id %s",
 						flowInPort, dh.DeviceID)
+					errorsList = append(errorsList, retError)
 					continue
-					//return fmt.Errorf("flow-parameter inPort %d not found in internal UniPorts", flowInPort)
 				}
 				// let's still assume that we receive the flow-add only in some 'active' device state (as so far observed)
 				// if not, we just throw some error here to have an indication about that, if we really need to support that
@@ -625,28 +666,53 @@
 				if !dh.IsReadyForOmciConfig() {
 					logger.Errorw(ctx, "flow-add rejected: improper device state", log.Fields{"device-id": dh.DeviceID,
 						"last device-reason": dh.GetDeviceReasonString()})
-					return fmt.Errorf("improper device state on device %s", dh.DeviceID)
+					retError = fmt.Errorf("improper device state on device %s", dh.DeviceID)
+					errorsList = append(errorsList, retError)
+					continue
 				}
 
 				flowOutPort := flow.GetOutPort(flowItem)
 				logger.Debugw(ctx, "flow-add port indications", log.Fields{
 					"device-id": dh.DeviceID, "inPort": flowInPort, "outPort": flowOutPort,
 					"uniPortName": loUniPort.Name})
-				err := dh.addFlowItemToUniPort(ctx, flowItem, loUniPort, apFlowMetaData)
-				//try next flow after processing error
-				if err != nil {
-					logger.Warnw(ctx, "flow-add processing error: continuing on checking further flows",
-						log.Fields{"device-id": dh.DeviceID, "error": err})
-					retError = err
-					continue
-					//return err
-				} else { // if last setting succeeds, overwrite possibly previously set error
-					retError = nil
+				if dh.GetFlowMonitoringIsRunning(loUniPort.UniID) {
+					// Step1 : Fill flowControlBlock
+					// Step2 : Push the flowControlBlock to ONU channel
+					// Step3 : Wait on response channel for response
+					// Step4 : Return error value
+					startTime := time.Now()
+					respChan := make(chan error)
+					flowCb := FlowCb{
+						ctx:          ctx,
+						addFlow:      true,
+						flowItem:     flowItem,
+						flowMetaData: apFlowMetaData,
+						uniPort:      loUniPort,
+						respChan:     &respChan,
+					}
+					dh.flowCbChan[loUniPort.UniID] <- flowCb
+					logger.Infow(ctx, "process-flow-add-start", log.Fields{"device-id": dh.DeviceID})
+					// Wait on the channel for flow handlers return value
+					retError = <-respChan
+					logger.Infow(ctx, "process-flow-add-end", log.Fields{"device-id": dh.DeviceID, "err": retError, "totalTimeSeconds": time.Since(startTime).Seconds()})
+					if retError != nil {
+						logger.Warnw(ctx, "flow-add processing error: continuing on checking further flows",
+							log.Fields{"device-id": dh.DeviceID, "error": retError})
+						errorsList = append(errorsList, retError)
+						continue
+					}
+				} else {
+					retError = fmt.Errorf("flow-handler-routine-not-active-for-onu--device-id-%v", dh.DeviceID)
+					errorsList = append(errorsList, retError)
 				}
 			}
 		}
 	}
-	return retError
+	if len(errorsList) > 0 {
+		logger.Errorw(ctx, "error-processing-flow", log.Fields{"device-id": dh.DeviceID, "errList": errorsList})
+		return fmt.Errorf("errors-installing-one-or-more-flows-groups, errors:%v", errorsList)
+	}
+	return nil
 }
 
 //disableDevice locks the ONU and its UNI/VEIP ports (admin lock via OMCI)
@@ -939,13 +1005,13 @@
 			if _, exist = dh.UniVlanConfigFsmMap[uniData.PersUniID]; exist {
 				if err := dh.UniVlanConfigFsmMap[uniData.PersUniID].SetUniFlowParams(ctx, flowData.VlanRuleParams.TpID,
 					flowData.CookieSlice, uint16(flowData.VlanRuleParams.MatchVid), uint16(flowData.VlanRuleParams.SetVid),
-					uint8(flowData.VlanRuleParams.SetPcp), lastFlowToReconcile, flowData.Meter); err != nil {
+					uint8(flowData.VlanRuleParams.SetPcp), lastFlowToReconcile, flowData.Meter, nil); err != nil {
 					logger.Errorw(ctx, err.Error(), log.Fields{"device-id": dh.DeviceID})
 				}
 			} else {
 				if err := dh.createVlanFilterFsm(ctx, uniPort, flowData.VlanRuleParams.TpID, flowData.CookieSlice,
 					uint16(flowData.VlanRuleParams.MatchVid), uint16(flowData.VlanRuleParams.SetVid),
-					uint8(flowData.VlanRuleParams.SetPcp), cmn.OmciVlanFilterAddDone, lastFlowToReconcile, flowData.Meter); err != nil {
+					uint8(flowData.VlanRuleParams.SetPcp), cmn.OmciVlanFilterAddDone, lastFlowToReconcile, flowData.Meter, nil); err != nil {
 					logger.Errorw(ctx, err.Error(), log.Fields{"device-id": dh.DeviceID})
 				}
 			}
@@ -2025,6 +2091,8 @@
 		dh.pSelfTestHdlr.StopSelfTestModule <- true
 	}
 
+	// Note: We want flow deletes to be processed on onu down, so do not stop flow monitoring routines
+
 	//reset a possibly running upgrade FSM
 	//  (note the Upgrade FSM may stay alive e.g. in state UpgradeStWaitForCommit to endure the ONU reboot)
 	dh.lockUpgradeFsm.RLock()
@@ -2146,6 +2214,16 @@
 		go dh.StartAlarmManager(ctx)
 	}
 
+	// Start flow handler routines per UNI
+	for _, uniPort := range dh.uniEntityMap {
+		// only if this port was enabled for use by the operator at startup
+		if (1<<uniPort.UniID)&dh.pOpenOnuAc.config.UniPortMask == (1 << uniPort.UniID) {
+			if !dh.GetFlowMonitoringIsRunning(uniPort.UniID) {
+				go dh.PerOnuFlowHandlerRoutine(uniPort.UniID)
+			}
+		}
+	}
+
 	// Initialize classical L2 PM Interval Counters
 	if err := dh.pOnuMetricsMgr.PAdaptFsm.PFsm.Event(pmmgr.L2PmEventInit); err != nil {
 		// There is no way we should be landing here, but if we do then
@@ -2405,14 +2483,14 @@
 		logger.Errorw(ctx, "No valid OnuDevice - aborting", log.Fields{"device-id": dh.DeviceID})
 		return
 	}
-	i := uint8(0) //UNI Port limit: see MaxUnisPerOnu (by now 16) (OMCI supports max 255 p.b.)
+	uniCnt := uint8(0) //UNI Port limit: see MaxUnisPerOnu (by now 16) (OMCI supports max 255 p.b.)
 	if pptpInstKeys := pDevEntry.GetOnuDB().GetSortedInstKeys(
 		ctx, me.PhysicalPathTerminationPointEthernetUniClassID); len(pptpInstKeys) > 0 {
 		for _, mgmtEntityID := range pptpInstKeys {
 			logger.Debugw(ctx, "Add PPTPEthUni port for MIB-stored instance:", log.Fields{
 				"device-id": dh.DeviceID, "PPTPEthUni EntityID": mgmtEntityID})
-			dh.addUniPort(ctx, mgmtEntityID, i, cmn.UniPPTP)
-			i++
+			dh.addUniPort(ctx, mgmtEntityID, uniCnt, cmn.UniPPTP)
+			uniCnt++
 		}
 	} else {
 		logger.Debugw(ctx, "No PPTP instances found", log.Fields{"device-id": dh.DeviceID})
@@ -2422,8 +2500,8 @@
 		for _, mgmtEntityID := range veipInstKeys {
 			logger.Debugw(ctx, "Add VEIP for MIB-stored instance:", log.Fields{
 				"device-id": dh.DeviceID, "VEIP EntityID": mgmtEntityID})
-			dh.addUniPort(ctx, mgmtEntityID, i, cmn.UniVEIP)
-			i++
+			dh.addUniPort(ctx, mgmtEntityID, uniCnt, cmn.UniVEIP)
+			uniCnt++
 		}
 	} else {
 		logger.Debugw(ctx, "No VEIP instances found", log.Fields{"device-id": dh.DeviceID})
@@ -2433,14 +2511,23 @@
 		for _, mgmtEntityID := range potsInstKeys {
 			logger.Debugw(ctx, "Add PPTP Pots UNI for MIB-stored instance:", log.Fields{
 				"device-id": dh.DeviceID, "PPTP Pots UNI EntityID": mgmtEntityID})
-			dh.addUniPort(ctx, mgmtEntityID, i, cmn.UniPPTPPots)
-			i++
+			dh.addUniPort(ctx, mgmtEntityID, uniCnt, cmn.UniPPTPPots)
+			uniCnt++
 		}
 	} else {
 		logger.Debugw(ctx, "No PPTP Pots UNI instances found", log.Fields{"device-id": dh.DeviceID})
 	}
-	if i == 0 {
+	if uniCnt == 0 {
 		logger.Warnw(ctx, "No UniG instances found", log.Fields{"device-id": dh.DeviceID})
+		return
+	}
+
+	dh.flowCbChan = make([]chan FlowCb, uniCnt)
+	dh.stopFlowMonitoringRoutine = make([]chan bool, uniCnt)
+	dh.isFlowMonitoringRoutineActive = make([]bool, uniCnt)
+	for i := 0; i < int(uniCnt); i++ {
+		dh.flowCbChan[i] = make(chan FlowCb, dh.pOpenOnuAc.config.MaxConcurrentFlowsPerUni)
+		dh.stopFlowMonitoringRoutine[i] = make(chan bool)
 	}
 }
 
@@ -2931,7 +3018,7 @@
 
 //addFlowItemToUniPort parses the actual flow item to add it to the UniPort
 func (dh *deviceHandler) addFlowItemToUniPort(ctx context.Context, apFlowItem *of.OfpFlowStats, apUniPort *cmn.OnuUniPort,
-	apFlowMetaData *voltha.FlowMetadata) error {
+	apFlowMetaData *voltha.FlowMetadata, respChan *chan error) {
 	var loSetVlan uint16 = uint16(of.OfpVlanId_OFPVID_NONE)      //noValidEntry
 	var loMatchVlan uint16 = uint16(of.OfpVlanId_OFPVID_PRESENT) //reserved VLANID entry
 	var loAddPcp, loSetPcp uint8
@@ -2950,7 +3037,7 @@
 	if metadata == 0 {
 		logger.Debugw(ctx, "flow-add invalid metadata - abort",
 			log.Fields{"device-id": dh.DeviceID})
-		return fmt.Errorf("flow-add invalid metadata: %s", dh.DeviceID)
+		*respChan <- fmt.Errorf("flow-add invalid metadata: %s", dh.DeviceID)
 	}
 	loTpID := uint8(flow.GetTechProfileIDFromWriteMetaData(ctx, metadata))
 	loCookie := apFlowItem.GetCookie()
@@ -2977,7 +3064,7 @@
 			"match_vid": strconv.FormatInt(int64(loMatchVlan), 16)})
 		//TODO!!: Use DeviceId within the error response to rwCore
 		//  likewise also in other error response cases to calling components as requested in [VOL-3458]
-		return fmt.Errorf("flow-add Set/Match VlanId inconsistent: %s", dh.DeviceID)
+		*respChan <- fmt.Errorf("flow-add Set/Match VlanId inconsistent: %s", dh.DeviceID)
 	}
 	if loSetVlan == uint16(of.OfpVlanId_OFPVID_NONE) && loMatchVlan == uint16(of.OfpVlanId_OFPVID_PRESENT) {
 		logger.Debugw(ctx, "flow-add vlan-any/copy", log.Fields{"device-id": dh.DeviceID})
@@ -3005,23 +3092,26 @@
 	if _, exist := dh.UniVlanConfigFsmMap[apUniPort.UniID]; exist {
 		//SetUniFlowParams() may block on some rule that is suspended-to-add
 		//  in order to allow for according flow removal lockVlanConfig may only be used with RLock here
-		err := dh.UniVlanConfigFsmMap[apUniPort.UniID].SetUniFlowParams(ctx, loTpID, loCookieSlice,
-			loMatchVlan, loSetVlan, loSetPcp, false, meter)
+		// Also the error is returned to caller via response channel
+		_ = dh.UniVlanConfigFsmMap[apUniPort.UniID].SetUniFlowParams(ctx, loTpID, loCookieSlice,
+			loMatchVlan, loSetVlan, loSetPcp, false, meter, respChan)
 		dh.lockVlanConfig.RUnlock()
 		dh.lockVlanAdd.Unlock() //re-admit new Add-flow-processing
-		return err
+		return
 	}
 	dh.lockVlanConfig.RUnlock()
 	dh.lockVlanConfig.Lock() //createVlanFilterFsm should always be a non-blocking operation and requires r+w lock
 	err := dh.createVlanFilterFsm(ctx, apUniPort, loTpID, loCookieSlice,
-		loMatchVlan, loSetVlan, loSetPcp, cmn.OmciVlanFilterAddDone, false, meter)
+		loMatchVlan, loSetVlan, loSetPcp, cmn.OmciVlanFilterAddDone, false, meter, respChan)
 	dh.lockVlanConfig.Unlock()
 	dh.lockVlanAdd.Unlock() //re-admit new Add-flow-processing
-	return err
+	if err != nil {
+		*respChan <- err
+	}
 }
 
 //removeFlowItemFromUniPort parses the actual flow item to remove it from the UniPort
-func (dh *deviceHandler) removeFlowItemFromUniPort(ctx context.Context, apFlowItem *of.OfpFlowStats, apUniPort *cmn.OnuUniPort) error {
+func (dh *deviceHandler) removeFlowItemFromUniPort(ctx context.Context, apFlowItem *of.OfpFlowStats, apUniPort *cmn.OnuUniPort, respChan *chan error) {
 	//optimization and assumption: the flow cookie uniquely identifies the flow and with that the internal rule
 	//hence only the cookie is used here to find the relevant flow and possibly remove the rule
 	//no extra check is done on the rule parameters
@@ -3054,22 +3144,30 @@
 	defer dh.lockVlanConfig.RUnlock()
 	logger.Debugw(ctx, "flow-remove got RLock", log.Fields{"device-id": dh.DeviceID, "uniID": apUniPort.UniID})
 	if _, exist := dh.UniVlanConfigFsmMap[apUniPort.UniID]; exist {
-		return dh.UniVlanConfigFsmMap[apUniPort.UniID].RemoveUniFlowParams(ctx, loCookie)
+		_ = dh.UniVlanConfigFsmMap[apUniPort.UniID].RemoveUniFlowParams(ctx, loCookie, respChan)
+		return
 	}
 	logger.Debugw(ctx, "flow-remove called, but no flow is configured (no VlanConfigFsm, flow already removed) ",
 		log.Fields{"device-id": dh.DeviceID})
 	//but as we regard the flow as not existing = removed we respond just ok
 	// and treat the reason accordingly (which in the normal removal procedure is initiated by the FSM)
+	// Push response on the response channel
+	if respChan != nil {
+		// Do it in a non blocking fashion, so that in case the flow handler routine has shutdown for any reason, we do not block here
+		select {
+		case *respChan <- nil:
+			logger.Debugw(ctx, "submitted-response-for-flow", log.Fields{"device-id": dh.DeviceID, "err": nil})
+		default:
+		}
+	}
 	go dh.DeviceProcStatusUpdate(ctx, cmn.OmciVlanFilterRemDone)
-
-	return nil
 }
 
 // createVlanFilterFsm initializes and runs the VlanFilter FSM to transfer OMCI related VLAN config
 // if this function is called from possibly concurrent processes it must be mutex-protected from the caller!
 // precondition: dh.lockVlanConfig is locked by the caller!
 func (dh *deviceHandler) createVlanFilterFsm(ctx context.Context, apUniPort *cmn.OnuUniPort, aTpID uint8, aCookieSlice []uint64,
-	aMatchVlan uint16, aSetVlan uint16, aSetPcp uint8, aDevEvent cmn.OnuDeviceEvent, lastFlowToReconcile bool, aMeter *voltha.OfpMeterConfig) error {
+	aMatchVlan uint16, aSetVlan uint16, aSetPcp uint8, aDevEvent cmn.OnuDeviceEvent, lastFlowToReconcile bool, aMeter *voltha.OfpMeterConfig, respChan *chan error) error {
 	chVlanFilterFsm := make(chan cmn.Message, 2048)
 
 	pDevEntry := dh.GetOnuDeviceEntry(ctx, true)
@@ -3080,7 +3178,7 @@
 
 	pVlanFilterFsm := avcfg.NewUniVlanConfigFsm(ctx, dh, pDevEntry, pDevEntry.PDevOmciCC, apUniPort, dh.pOnuTP,
 		pDevEntry.GetOnuDB(), aTpID, aDevEvent, "UniVlanConfigFsm", chVlanFilterFsm,
-		dh.pOpenOnuAc.AcceptIncrementalEvto, aCookieSlice, aMatchVlan, aSetVlan, aSetPcp, lastFlowToReconcile, aMeter)
+		dh.pOpenOnuAc.AcceptIncrementalEvto, aCookieSlice, aMatchVlan, aSetVlan, aSetPcp, lastFlowToReconcile, aMeter, respChan)
 	if pVlanFilterFsm != nil {
 		//dh.lockVlanConfig is locked (by caller) throughout the state transition to 'starting'
 		// to prevent unintended (ignored) events to be sent there (from parallel processing)
@@ -3636,6 +3734,21 @@
 	}
 }
 
+func (dh *deviceHandler) setFlowMonitoringIsRunning(uniID uint8, flag bool) {
+	dh.mutexFlowMonitoringRoutineFlag.Lock()
+	defer dh.mutexFlowMonitoringRoutineFlag.Unlock()
+	logger.Debugw(context.Background(), "set-flow-monitoring-routine", log.Fields{"flag": flag})
+	dh.isFlowMonitoringRoutineActive[uniID] = flag
+}
+
+func (dh *deviceHandler) GetFlowMonitoringIsRunning(uniID uint8) bool {
+	dh.mutexFlowMonitoringRoutineFlag.RLock()
+	defer dh.mutexFlowMonitoringRoutineFlag.RUnlock()
+	logger.Debugw(context.Background(), "get-flow-monitoring-routine",
+		log.Fields{"isFlowMonitoringRoutineActive": dh.isFlowMonitoringRoutineActive})
+	return dh.isFlowMonitoringRoutineActive[uniID]
+}
+
 func (dh *deviceHandler) StartReconciling(ctx context.Context, skipOnuConfig bool) {
 	logger.Debugw(ctx, "start reconciling", log.Fields{"skipOnuConfig": skipOnuConfig, "device-id": dh.DeviceID})
 
@@ -3891,6 +4004,36 @@
 	return pgClient.GetTechProfileInstance(subCtx, request)
 }
 
+// This routine is unique per ONU ID and blocks on flowControlBlock channel for incoming flows
+// Each incoming flow is processed in a synchronous manner, i.e., the flow is processed to completion before picking another
+func (dh *deviceHandler) PerOnuFlowHandlerRoutine(uniID uint8) {
+	logger.Infow(context.Background(), "starting-flow-handler-routine", log.Fields{"device-id": dh.DeviceID})
+	dh.setFlowMonitoringIsRunning(uniID, true)
+	for {
+		select {
+		// block on the channel to receive an incoming flow
+		// process the flow completely before proceeding to handle the next flow
+		case flowCb := <-dh.flowCbChan[uniID]:
+			startTime := time.Now()
+			logger.Debugw(flowCb.ctx, "serial-flow-processor--start", log.Fields{"device-id": dh.DeviceID})
+			respChan := make(chan error)
+			if flowCb.addFlow {
+				go dh.addFlowItemToUniPort(flowCb.ctx, flowCb.flowItem, flowCb.uniPort, flowCb.flowMetaData, &respChan)
+			} else {
+				go dh.removeFlowItemFromUniPort(flowCb.ctx, flowCb.flowItem, flowCb.uniPort, &respChan)
+			}
+			// Block on response and tunnel it back to the caller
+			*flowCb.respChan <- <-respChan
+			logger.Debugw(flowCb.ctx, "serial-flow-processor--end",
+				log.Fields{"device-id": dh.DeviceID, "absoluteTimeForFlowProcessingInSecs": time.Since(startTime).Seconds()})
+		case <-dh.stopFlowMonitoringRoutine[uniID]:
+			logger.Infow(context.Background(), "stopping-flow-handler-routine", log.Fields{"device-id": dh.DeviceID})
+			dh.setFlowMonitoringIsRunning(uniID, false)
+			return
+		}
+	}
+}
+
 func (dh *deviceHandler) SendOMCIRequest(ctx context.Context, parentEndpoint string, request *ic.OmciMessage) error {
 	pgClient, err := dh.pOpenOnuAc.getParentAdapterServiceClient(parentEndpoint)
 	if err != nil || pgClient == nil {
diff --git a/internal/pkg/core/openonu.go b/internal/pkg/core/openonu.go
index c14655e..f0d9ecc 100755
--- a/internal/pkg/core/openonu.go
+++ b/internal/pkg/core/openonu.go
@@ -79,6 +79,7 @@
 	alarmAuditInterval         time.Duration
 	dlToOnuTimeout4M           time.Duration
 	rpcTimeout                 time.Duration
+	maxConcurrentFlowsPerUni   int
 }
 
 //NewOpenONUAC returns a new instance of OpenONU_AC
@@ -114,6 +115,7 @@
 	openOnuAc.alarmAuditInterval = cfg.AlarmAuditInterval
 	openOnuAc.dlToOnuTimeout4M = cfg.DownloadToOnuTimeout4MB
 	openOnuAc.rpcTimeout = cfg.RPCTimeout
+	openOnuAc.maxConcurrentFlowsPerUni = cfg.MaxConcurrentFlowsPerUni
 
 	openOnuAc.pSupportedFsms = &cmn.OmciDeviceFsms{
 		"mib-synchronizer": {
@@ -331,6 +333,12 @@
 			handler.pSelfTestHdlr.StopSelfTestModule <- true
 			logger.Debugw(ctx, "sent stop signal to self test handler module", log.Fields{"device-id": device.Id})
 		}
+		for _, uni := range handler.uniEntityMap {
+			if handler.GetFlowMonitoringIsRunning(uni.UniID) {
+				handler.stopFlowMonitoringRoutine[uni.UniID] <- true
+				logger.Debugw(ctx, "sent stop signal to self flow monitoring routine", log.Fields{"device-id": device.Id})
+			}
+		}
 
 		// Clear PM data on the KV store
 		if handler.pOnuMetricsMgr != nil {
diff --git a/internal/pkg/mib/mib_sync.go b/internal/pkg/mib/mib_sync.go
index 871b6de..a26422d 100755
--- a/internal/pkg/mib/mib_sync.go
+++ b/internal/pkg/mib/mib_sync.go
@@ -314,6 +314,16 @@
 		if !oo.baseDeviceHandler.GetAlarmManagerIsRunning(ctx) {
 			go oo.baseDeviceHandler.StartAlarmManager(ctx)
 		}
+
+		for _, uniPort := range *oo.baseDeviceHandler.GetUniEntityMap() {
+			// only if this port was enabled for use by the operator at startup
+			if (1<<uniPort.UniID)&oo.baseDeviceHandler.GetUniPortMask() == (1 << uniPort.UniID) {
+				if !oo.baseDeviceHandler.GetFlowMonitoringIsRunning(uniPort.UniID) {
+					go oo.baseDeviceHandler.PerOnuFlowHandlerRoutine(uniPort.UniID)
+				}
+			}
+		}
+
 		// no need to reconcile additional data for MibDownloadFsm, LockStateFsm, or UnlockStateFsm
 		oo.baseDeviceHandler.ReconcileDeviceTechProf(ctx)