VOL-4125: Respond with success/failure to flow insertion according to the
          operation result in the adapter
VOL-4323: Support serialization of flow messages (execute a flow to
          completion before picking up another flow)

Change-Id: I934fc68b103042971109923491a3f27aaf885328
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 449d96e..b84bb83 100755
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -120,6 +120,16 @@
 	cSkipOnuConfigReconciling
 )
 
+// FlowCb is the flow control block: one queued flow add/remove request together with the channel its result is reported on
+type FlowCb struct {
+	ctx          context.Context // context of the originating flow request, handed on to the flow handler
+	addFlow      bool            // true: flow is to be added, false: flow is to be removed
+	flowItem     *of.OfpFlowStats
+	uniPort      *cmn.OnuUniPort
+	flowMetaData *voltha.FlowMetadata
+	respChan     *chan error // channel on which the flow handling result (nil on success) is reported back
+}
+
 //deviceHandler will interact with the ONU ? device.
 type deviceHandler struct {
 	DeviceID         string
@@ -186,6 +196,11 @@
 	mutexDeletionInProgressFlag sync.RWMutex
 	pLastUpgradeImageState      *voltha.ImageState
 	upgradeFsmChan              chan struct{}
+
+	flowCbChan                     []chan FlowCb
+	mutexFlowMonitoringRoutineFlag sync.RWMutex
+	stopFlowMonitoringRoutine      []chan bool // length of slice equal to number of uni ports
+	isFlowMonitoringRoutineActive  []bool      // length of slice equal to number of uni ports
 }
 
 //newDeviceHandler creates a new device handler
@@ -532,8 +547,9 @@
 func (dh *deviceHandler) FlowUpdateIncremental(ctx context.Context,
 	apOfFlowChanges *of.FlowChanges,
 	apOfGroupChanges *of.FlowGroupChanges, apFlowMetaData *voltha.FlowMetadata) error {
-	logger.Debugw(ctx, "FlowUpdateIncremental started", log.Fields{"device-id": dh.DeviceID, "metadata": apFlowMetaData})
-	var retError error = nil
+	logger.Debugw(ctx, "FlowUpdateIncremental started", log.Fields{"device-id": dh.DeviceID, "flow": apOfFlowChanges, "metadata": apFlowMetaData})
+	var errorsList []error
+	var retError error
 	//Remove flows (always remove flows first - remove old and add new with same cookie may be part of the same request)
 	if apOfFlowChanges.ToRemove != nil {
 		for _, flowItem := range apOfFlowChanges.ToRemove.Items {
@@ -541,12 +557,14 @@
 				logger.Warnw(ctx, "flow-remove no cookie: ignore and continuing on checking further flows", log.Fields{
 					"device-id": dh.DeviceID})
 				retError = fmt.Errorf("flow-remove no cookie, device-id %s", dh.DeviceID)
+				errorsList = append(errorsList, retError)
 				continue
 			}
 			flowInPort := flow.GetInPort(flowItem)
 			if flowInPort == uint32(of.OfpPortNo_OFPP_INVALID) {
 				logger.Warnw(ctx, "flow-remove inPort invalid: ignore and continuing on checking further flows", log.Fields{"device-id": dh.DeviceID})
 				retError = fmt.Errorf("flow-remove inPort invalid, device-id %s", dh.DeviceID)
+				errorsList = append(errorsList, retError)
 				continue
 				//return fmt.Errorf("flow inPort invalid: %s", dh.DeviceID)
 			} else if flowInPort == dh.ponPortNumber {
@@ -564,22 +582,43 @@
 						log.Fields{"device-id": dh.DeviceID, "inPort": flowInPort})
 					retError = fmt.Errorf("flow-remove inPort not found in UniPorts, inPort %d, device-id %s",
 						flowInPort, dh.DeviceID)
+					errorsList = append(errorsList, retError)
 					continue
 				}
 				flowOutPort := flow.GetOutPort(flowItem)
 				logger.Debugw(ctx, "flow-remove port indications", log.Fields{
 					"device-id": dh.DeviceID, "inPort": flowInPort, "outPort": flowOutPort,
 					"uniPortName": loUniPort.Name})
-				err := dh.removeFlowItemFromUniPort(ctx, flowItem, loUniPort)
-				//try next flow after processing error
-				if err != nil {
-					logger.Warnw(ctx, "flow-remove processing error: continuing on checking further flows",
-						log.Fields{"device-id": dh.DeviceID, "error": err})
-					retError = err
-					continue
-					//return err
-				} else { // if last setting succeeds, overwrite possibly previously set error
-					retError = nil
+
+				if dh.GetFlowMonitoringIsRunning(loUniPort.UniID) {
+					// Step1 : Fill flowControlBlock
+					// Step2 : Push the flowControlBlock to ONU channel
+					// Step3 : Wait on response channel for response
+					// Step4 : Return error value
+					startTime := time.Now()
+					respChan := make(chan error)
+					flowCb := FlowCb{
+						ctx:          ctx,
+						addFlow:      false,
+						flowItem:     flowItem,
+						flowMetaData: nil,
+						uniPort:      loUniPort,
+						respChan:     &respChan,
+					}
+					dh.flowCbChan[loUniPort.UniID] <- flowCb
+					logger.Infow(ctx, "process-flow-remove-start", log.Fields{"device-id": dh.DeviceID})
+					// Wait on the channel for flow handlers return value
+					retError = <-respChan
+					logger.Infow(ctx, "process-flow-remove-end", log.Fields{"device-id": dh.DeviceID, "err": retError, "totalTimeSeconds": time.Since(startTime).Seconds()})
+					if retError != nil {
+						logger.Warnw(ctx, "flow-delete processing error: continuing on checking further flows",
+							log.Fields{"device-id": dh.DeviceID, "error": retError})
+						errorsList = append(errorsList, retError)
+						continue
+					}
+				} else {
+					retError = fmt.Errorf("flow-handler-routine-not-active-for-onu--device-id-%v", dh.DeviceID)
+					errorsList = append(errorsList, retError)
 				}
 			}
 		}
@@ -590,12 +629,14 @@
 				logger.Debugw(ctx, "incremental flow-add no cookie: ignore and continuing on checking further flows", log.Fields{
 					"device-id": dh.DeviceID})
 				retError = fmt.Errorf("flow-add no cookie, device-id %s", dh.DeviceID)
+				errorsList = append(errorsList, retError)
 				continue
 			}
 			flowInPort := flow.GetInPort(flowItem)
 			if flowInPort == uint32(of.OfpPortNo_OFPP_INVALID) {
 				logger.Warnw(ctx, "flow-add inPort invalid: ignore and continuing on checking further flows", log.Fields{"device-id": dh.DeviceID})
 				retError = fmt.Errorf("flow-add inPort invalid, device-id %s", dh.DeviceID)
+				errorsList = append(errorsList, retError)
 				continue
 				//return fmt.Errorf("flow inPort invalid: %s", dh.DeviceID)
 			} else if flowInPort == dh.ponPortNumber {
@@ -613,8 +654,8 @@
 						log.Fields{"device-id": dh.DeviceID, "inPort": flowInPort})
 					retError = fmt.Errorf("flow-add inPort not found in UniPorts, inPort %d, device-id %s",
 						flowInPort, dh.DeviceID)
+					errorsList = append(errorsList, retError)
 					continue
-					//return fmt.Errorf("flow-parameter inPort %d not found in internal UniPorts", flowInPort)
 				}
 				// let's still assume that we receive the flow-add only in some 'active' device state (as so far observed)
 				// if not, we just throw some error here to have an indication about that, if we really need to support that
@@ -625,28 +666,53 @@
 				if !dh.IsReadyForOmciConfig() {
 					logger.Errorw(ctx, "flow-add rejected: improper device state", log.Fields{"device-id": dh.DeviceID,
 						"last device-reason": dh.GetDeviceReasonString()})
-					return fmt.Errorf("improper device state on device %s", dh.DeviceID)
+					retError = fmt.Errorf("improper device state on device %s", dh.DeviceID)
+					errorsList = append(errorsList, retError)
+					continue
 				}
 
 				flowOutPort := flow.GetOutPort(flowItem)
 				logger.Debugw(ctx, "flow-add port indications", log.Fields{
 					"device-id": dh.DeviceID, "inPort": flowInPort, "outPort": flowOutPort,
 					"uniPortName": loUniPort.Name})
-				err := dh.addFlowItemToUniPort(ctx, flowItem, loUniPort, apFlowMetaData)
-				//try next flow after processing error
-				if err != nil {
-					logger.Warnw(ctx, "flow-add processing error: continuing on checking further flows",
-						log.Fields{"device-id": dh.DeviceID, "error": err})
-					retError = err
-					continue
-					//return err
-				} else { // if last setting succeeds, overwrite possibly previously set error
-					retError = nil
+				if dh.GetFlowMonitoringIsRunning(loUniPort.UniID) {
+					// Step1 : Fill flowControlBlock
+					// Step2 : Push the flowControlBlock to ONU channel
+					// Step3 : Wait on response channel for response
+					// Step4 : Return error value
+					startTime := time.Now()
+					respChan := make(chan error)
+					flowCb := FlowCb{
+						ctx:          ctx,
+						addFlow:      true,
+						flowItem:     flowItem,
+						flowMetaData: apFlowMetaData,
+						uniPort:      loUniPort,
+						respChan:     &respChan,
+					}
+					dh.flowCbChan[loUniPort.UniID] <- flowCb
+					logger.Infow(ctx, "process-flow-add-start", log.Fields{"device-id": dh.DeviceID})
+					// Wait on the channel for flow handlers return value
+					retError = <-respChan
+					logger.Infow(ctx, "process-flow-add-end", log.Fields{"device-id": dh.DeviceID, "err": retError, "totalTimeSeconds": time.Since(startTime).Seconds()})
+					if retError != nil {
+						logger.Warnw(ctx, "flow-add processing error: continuing on checking further flows",
+							log.Fields{"device-id": dh.DeviceID, "error": retError})
+						errorsList = append(errorsList, retError)
+						continue
+					}
+				} else {
+					retError = fmt.Errorf("flow-handler-routine-not-active-for-onu--device-id-%v", dh.DeviceID)
+					errorsList = append(errorsList, retError)
 				}
 			}
 		}
 	}
-	return retError
+	if len(errorsList) > 0 {
+		logger.Errorw(ctx, "error-processing-flow", log.Fields{"device-id": dh.DeviceID, "errList": errorsList})
+		return fmt.Errorf("errors-installing-one-or-more-flows-groups, errors:%v", errorsList)
+	}
+	return nil
 }
 
 //disableDevice locks the ONU and its UNI/VEIP ports (admin lock via OMCI)
@@ -939,13 +1005,13 @@
 			if _, exist = dh.UniVlanConfigFsmMap[uniData.PersUniID]; exist {
 				if err := dh.UniVlanConfigFsmMap[uniData.PersUniID].SetUniFlowParams(ctx, flowData.VlanRuleParams.TpID,
 					flowData.CookieSlice, uint16(flowData.VlanRuleParams.MatchVid), uint16(flowData.VlanRuleParams.SetVid),
-					uint8(flowData.VlanRuleParams.SetPcp), lastFlowToReconcile, flowData.Meter); err != nil {
+					uint8(flowData.VlanRuleParams.SetPcp), lastFlowToReconcile, flowData.Meter, nil); err != nil {
 					logger.Errorw(ctx, err.Error(), log.Fields{"device-id": dh.DeviceID})
 				}
 			} else {
 				if err := dh.createVlanFilterFsm(ctx, uniPort, flowData.VlanRuleParams.TpID, flowData.CookieSlice,
 					uint16(flowData.VlanRuleParams.MatchVid), uint16(flowData.VlanRuleParams.SetVid),
-					uint8(flowData.VlanRuleParams.SetPcp), cmn.OmciVlanFilterAddDone, lastFlowToReconcile, flowData.Meter); err != nil {
+					uint8(flowData.VlanRuleParams.SetPcp), cmn.OmciVlanFilterAddDone, lastFlowToReconcile, flowData.Meter, nil); err != nil {
 					logger.Errorw(ctx, err.Error(), log.Fields{"device-id": dh.DeviceID})
 				}
 			}
@@ -2025,6 +2091,8 @@
 		dh.pSelfTestHdlr.StopSelfTestModule <- true
 	}
 
+	// Note: We want flow deletes to be processed on onu down, so do not stop flow monitoring routines
+
 	//reset a possibly running upgrade FSM
 	//  (note the Upgrade FSM may stay alive e.g. in state UpgradeStWaitForCommit to endure the ONU reboot)
 	dh.lockUpgradeFsm.RLock()
@@ -2146,6 +2214,16 @@
 		go dh.StartAlarmManager(ctx)
 	}
 
+	// Start flow handler routines per UNI
+	for _, uniPort := range dh.uniEntityMap {
+		// only if this port was enabled for use by the operator at startup
+		if (1<<uniPort.UniID)&dh.pOpenOnuAc.config.UniPortMask == (1 << uniPort.UniID) {
+			if !dh.GetFlowMonitoringIsRunning(uniPort.UniID) {
+				go dh.PerOnuFlowHandlerRoutine(uniPort.UniID)
+			}
+		}
+	}
+
 	// Initialize classical L2 PM Interval Counters
 	if err := dh.pOnuMetricsMgr.PAdaptFsm.PFsm.Event(pmmgr.L2PmEventInit); err != nil {
 		// There is no way we should be landing here, but if we do then
@@ -2405,14 +2483,14 @@
 		logger.Errorw(ctx, "No valid OnuDevice - aborting", log.Fields{"device-id": dh.DeviceID})
 		return
 	}
-	i := uint8(0) //UNI Port limit: see MaxUnisPerOnu (by now 16) (OMCI supports max 255 p.b.)
+	uniCnt := uint8(0) //UNI Port limit: see MaxUnisPerOnu (by now 16) (OMCI supports max 255 p.b.)
 	if pptpInstKeys := pDevEntry.GetOnuDB().GetSortedInstKeys(
 		ctx, me.PhysicalPathTerminationPointEthernetUniClassID); len(pptpInstKeys) > 0 {
 		for _, mgmtEntityID := range pptpInstKeys {
 			logger.Debugw(ctx, "Add PPTPEthUni port for MIB-stored instance:", log.Fields{
 				"device-id": dh.DeviceID, "PPTPEthUni EntityID": mgmtEntityID})
-			dh.addUniPort(ctx, mgmtEntityID, i, cmn.UniPPTP)
-			i++
+			dh.addUniPort(ctx, mgmtEntityID, uniCnt, cmn.UniPPTP)
+			uniCnt++
 		}
 	} else {
 		logger.Debugw(ctx, "No PPTP instances found", log.Fields{"device-id": dh.DeviceID})
@@ -2422,8 +2500,8 @@
 		for _, mgmtEntityID := range veipInstKeys {
 			logger.Debugw(ctx, "Add VEIP for MIB-stored instance:", log.Fields{
 				"device-id": dh.DeviceID, "VEIP EntityID": mgmtEntityID})
-			dh.addUniPort(ctx, mgmtEntityID, i, cmn.UniVEIP)
-			i++
+			dh.addUniPort(ctx, mgmtEntityID, uniCnt, cmn.UniVEIP)
+			uniCnt++
 		}
 	} else {
 		logger.Debugw(ctx, "No VEIP instances found", log.Fields{"device-id": dh.DeviceID})
@@ -2433,14 +2511,23 @@
 		for _, mgmtEntityID := range potsInstKeys {
 			logger.Debugw(ctx, "Add PPTP Pots UNI for MIB-stored instance:", log.Fields{
 				"device-id": dh.DeviceID, "PPTP Pots UNI EntityID": mgmtEntityID})
-			dh.addUniPort(ctx, mgmtEntityID, i, cmn.UniPPTPPots)
-			i++
+			dh.addUniPort(ctx, mgmtEntityID, uniCnt, cmn.UniPPTPPots)
+			uniCnt++
 		}
 	} else {
 		logger.Debugw(ctx, "No PPTP Pots UNI instances found", log.Fields{"device-id": dh.DeviceID})
 	}
-	if i == 0 {
+	if uniCnt == 0 {
 		logger.Warnw(ctx, "No UniG instances found", log.Fields{"device-id": dh.DeviceID})
+		return
+	}
+
+	dh.flowCbChan = make([]chan FlowCb, uniCnt)
+	dh.stopFlowMonitoringRoutine = make([]chan bool, uniCnt)
+	dh.isFlowMonitoringRoutineActive = make([]bool, uniCnt)
+	for i := 0; i < int(uniCnt); i++ {
+		dh.flowCbChan[i] = make(chan FlowCb, dh.pOpenOnuAc.config.MaxConcurrentFlowsPerUni)
+		dh.stopFlowMonitoringRoutine[i] = make(chan bool)
 	}
 }
 
@@ -2931,7 +3018,7 @@
 
 //addFlowItemToUniPort parses the actual flow item to add it to the UniPort
 func (dh *deviceHandler) addFlowItemToUniPort(ctx context.Context, apFlowItem *of.OfpFlowStats, apUniPort *cmn.OnuUniPort,
-	apFlowMetaData *voltha.FlowMetadata) error {
+	apFlowMetaData *voltha.FlowMetadata, respChan *chan error) {
 	var loSetVlan uint16 = uint16(of.OfpVlanId_OFPVID_NONE)      //noValidEntry
 	var loMatchVlan uint16 = uint16(of.OfpVlanId_OFPVID_PRESENT) //reserved VLANID entry
 	var loAddPcp, loSetPcp uint8
@@ -2950,7 +3037,9 @@
 	if metadata == 0 {
 		logger.Debugw(ctx, "flow-add invalid metadata - abort",
 			log.Fields{"device-id": dh.DeviceID})
-		return fmt.Errorf("flow-add invalid metadata: %s", dh.DeviceID)
+		// report the error on the response channel and abort - the rejected flow must not be processed any further
+		*respChan <- fmt.Errorf("flow-add invalid metadata: %s", dh.DeviceID)
+		return
 	}
 	loTpID := uint8(flow.GetTechProfileIDFromWriteMetaData(ctx, metadata))
 	loCookie := apFlowItem.GetCookie()
@@ -2977,7 +3066,9 @@
 			"match_vid": strconv.FormatInt(int64(loMatchVlan), 16)})
 		//TODO!!: Use DeviceId within the error response to rwCore
 		//  likewise also in other error response cases to calling components as requested in [VOL-3458]
-		return fmt.Errorf("flow-add Set/Match VlanId inconsistent: %s", dh.DeviceID)
+		// report the error on the response channel and abort - the rejected flow must not be processed any further
+		*respChan <- fmt.Errorf("flow-add Set/Match VlanId inconsistent: %s", dh.DeviceID)
+		return
 	}
 	if loSetVlan == uint16(of.OfpVlanId_OFPVID_NONE) && loMatchVlan == uint16(of.OfpVlanId_OFPVID_PRESENT) {
 		logger.Debugw(ctx, "flow-add vlan-any/copy", log.Fields{"device-id": dh.DeviceID})
@@ -3005,23 +3092,26 @@
 	if _, exist := dh.UniVlanConfigFsmMap[apUniPort.UniID]; exist {
 		//SetUniFlowParams() may block on some rule that is suspended-to-add
 		//  in order to allow for according flow removal lockVlanConfig may only be used with RLock here
-		err := dh.UniVlanConfigFsmMap[apUniPort.UniID].SetUniFlowParams(ctx, loTpID, loCookieSlice,
-			loMatchVlan, loSetVlan, loSetPcp, false, meter)
+		// Also the error is returned to caller via response channel
+		_ = dh.UniVlanConfigFsmMap[apUniPort.UniID].SetUniFlowParams(ctx, loTpID, loCookieSlice,
+			loMatchVlan, loSetVlan, loSetPcp, false, meter, respChan)
 		dh.lockVlanConfig.RUnlock()
 		dh.lockVlanAdd.Unlock() //re-admit new Add-flow-processing
-		return err
+		return
 	}
 	dh.lockVlanConfig.RUnlock()
 	dh.lockVlanConfig.Lock() //createVlanFilterFsm should always be a non-blocking operation and requires r+w lock
 	err := dh.createVlanFilterFsm(ctx, apUniPort, loTpID, loCookieSlice,
-		loMatchVlan, loSetVlan, loSetPcp, cmn.OmciVlanFilterAddDone, false, meter)
+		loMatchVlan, loSetVlan, loSetPcp, cmn.OmciVlanFilterAddDone, false, meter, respChan)
 	dh.lockVlanConfig.Unlock()
 	dh.lockVlanAdd.Unlock() //re-admit new Add-flow-processing
-	return err
+	if err != nil {
+		*respChan <- err
+	}
 }
 
 //removeFlowItemFromUniPort parses the actual flow item to remove it from the UniPort
-func (dh *deviceHandler) removeFlowItemFromUniPort(ctx context.Context, apFlowItem *of.OfpFlowStats, apUniPort *cmn.OnuUniPort) error {
+func (dh *deviceHandler) removeFlowItemFromUniPort(ctx context.Context, apFlowItem *of.OfpFlowStats, apUniPort *cmn.OnuUniPort, respChan *chan error) {
 	//optimization and assumption: the flow cookie uniquely identifies the flow and with that the internal rule
 	//hence only the cookie is used here to find the relevant flow and possibly remove the rule
 	//no extra check is done on the rule parameters
@@ -3054,22 +3144,30 @@
 	defer dh.lockVlanConfig.RUnlock()
 	logger.Debugw(ctx, "flow-remove got RLock", log.Fields{"device-id": dh.DeviceID, "uniID": apUniPort.UniID})
 	if _, exist := dh.UniVlanConfigFsmMap[apUniPort.UniID]; exist {
-		return dh.UniVlanConfigFsmMap[apUniPort.UniID].RemoveUniFlowParams(ctx, loCookie)
+		_ = dh.UniVlanConfigFsmMap[apUniPort.UniID].RemoveUniFlowParams(ctx, loCookie, respChan)
+		return
 	}
 	logger.Debugw(ctx, "flow-remove called, but no flow is configured (no VlanConfigFsm, flow already removed) ",
 		log.Fields{"device-id": dh.DeviceID})
 	//but as we regard the flow as not existing = removed we respond just ok
 	// and treat the reason accordingly (which in the normal removal procedure is initiated by the FSM)
+	// Push response on the response channel
+	if respChan != nil {
+		// Do it in a non blocking fashion, so that in case the flow handler routine has shutdown for any reason, we do not block here
+		select {
+		case *respChan <- nil:
+			logger.Debugw(ctx, "submitted-response-for-flow", log.Fields{"device-id": dh.DeviceID, "err": nil})
+		default:
+		}
+	}
 	go dh.DeviceProcStatusUpdate(ctx, cmn.OmciVlanFilterRemDone)
-
-	return nil
 }
 
 // createVlanFilterFsm initializes and runs the VlanFilter FSM to transfer OMCI related VLAN config
 // if this function is called from possibly concurrent processes it must be mutex-protected from the caller!
 // precondition: dh.lockVlanConfig is locked by the caller!
 func (dh *deviceHandler) createVlanFilterFsm(ctx context.Context, apUniPort *cmn.OnuUniPort, aTpID uint8, aCookieSlice []uint64,
-	aMatchVlan uint16, aSetVlan uint16, aSetPcp uint8, aDevEvent cmn.OnuDeviceEvent, lastFlowToReconcile bool, aMeter *voltha.OfpMeterConfig) error {
+	aMatchVlan uint16, aSetVlan uint16, aSetPcp uint8, aDevEvent cmn.OnuDeviceEvent, lastFlowToReconcile bool, aMeter *voltha.OfpMeterConfig, respChan *chan error) error {
 	chVlanFilterFsm := make(chan cmn.Message, 2048)
 
 	pDevEntry := dh.GetOnuDeviceEntry(ctx, true)
@@ -3080,7 +3178,7 @@
 
 	pVlanFilterFsm := avcfg.NewUniVlanConfigFsm(ctx, dh, pDevEntry, pDevEntry.PDevOmciCC, apUniPort, dh.pOnuTP,
 		pDevEntry.GetOnuDB(), aTpID, aDevEvent, "UniVlanConfigFsm", chVlanFilterFsm,
-		dh.pOpenOnuAc.AcceptIncrementalEvto, aCookieSlice, aMatchVlan, aSetVlan, aSetPcp, lastFlowToReconcile, aMeter)
+		dh.pOpenOnuAc.AcceptIncrementalEvto, aCookieSlice, aMatchVlan, aSetVlan, aSetPcp, lastFlowToReconcile, aMeter, respChan)
 	if pVlanFilterFsm != nil {
 		//dh.lockVlanConfig is locked (by caller) throughout the state transition to 'starting'
 		// to prevent unintended (ignored) events to be sent there (from parallel processing)
@@ -3636,6 +3734,21 @@
 	}
 }
 
+func (dh *deviceHandler) setFlowMonitoringIsRunning(uniID uint8, flag bool) {
+	dh.mutexFlowMonitoringRoutineFlag.Lock() // write lock guarding the per-UNI activity flags
+	defer dh.mutexFlowMonitoringRoutineFlag.Unlock()
+	logger.Debugw(context.Background(), "set-flow-monitoring-routine", log.Fields{"flag": flag})
+	dh.isFlowMonitoringRoutineActive[uniID] = flag // NOTE(review): no bounds check - assumes uniID indexes an allocated slot, confirm with callers
+}
+
+// GetFlowMonitoringIsRunning reports whether the flow handler routine of uniID is active; false if uniID has no flag slot (yet)
+func (dh *deviceHandler) GetFlowMonitoringIsRunning(uniID uint8) bool {
+	dh.mutexFlowMonitoringRoutineFlag.RLock()
+	defer dh.mutexFlowMonitoringRoutineFlag.RUnlock()
+	logger.Debugw(context.Background(), "get-flow-monitoring-routine", log.Fields{"isFlowMonitoringRoutineActive": dh.isFlowMonitoringRoutineActive})
+	return int(uniID) < len(dh.isFlowMonitoringRoutineActive) && dh.isFlowMonitoringRoutineActive[uniID]
+}
+
 func (dh *deviceHandler) StartReconciling(ctx context.Context, skipOnuConfig bool) {
 	logger.Debugw(ctx, "start reconciling", log.Fields{"skipOnuConfig": skipOnuConfig, "device-id": dh.DeviceID})
 
@@ -3891,6 +4004,36 @@
 	return pgClient.GetTechProfileInstance(subCtx, request)
 }
 
+// PerOnuFlowHandlerRoutine serves the flowControlBlock channel of one UNI port (uniID) until a stop is signalled for that UNI
+// Each incoming flow is processed in a synchronous manner, i.e., the flow is processed to completion before picking another
+func (dh *deviceHandler) PerOnuFlowHandlerRoutine(uniID uint8) {
+	logger.Infow(context.Background(), "starting-flow-handler-routine", log.Fields{"device-id": dh.DeviceID})
+	dh.setFlowMonitoringIsRunning(uniID, true) // the active flag is set from inside the routine itself
+	for {
+		select {
+		// wait for the next flow control block queued for this UNI
+		// the flow is handled completely (result forwarded) before the next one is received
+		case flowCb := <-dh.flowCbChan[uniID]:
+			startTime := time.Now()
+			logger.Debugw(flowCb.ctx, "serial-flow-processor--start", log.Fields{"device-id": dh.DeviceID})
+			respChan := make(chan error) // per-flow result channel handed to the add/remove handler
+			if flowCb.addFlow {
+				go dh.addFlowItemToUniPort(flowCb.ctx, flowCb.flowItem, flowCb.uniPort, flowCb.flowMetaData, &respChan)
+			} else {
+				go dh.removeFlowItemFromUniPort(flowCb.ctx, flowCb.flowItem, flowCb.uniPort, &respChan)
+			}
+			// Block until the handler reports its result, then forward that result on the requester's response channel
+			*flowCb.respChan <- <-respChan
+			logger.Debugw(flowCb.ctx, "serial-flow-processor--end",
+				log.Fields{"device-id": dh.DeviceID, "absoluteTimeForFlowProcessingInSecs": time.Since(startTime).Seconds()})
+		case <-dh.stopFlowMonitoringRoutine[uniID]:
+			logger.Infow(context.Background(), "stopping-flow-handler-routine", log.Fields{"device-id": dh.DeviceID})
+			dh.setFlowMonitoringIsRunning(uniID, false)
+			return
+		}
+	}
+}
+
 func (dh *deviceHandler) SendOMCIRequest(ctx context.Context, parentEndpoint string, request *ic.OmciMessage) error {
 	pgClient, err := dh.pOpenOnuAc.getParentAdapterServiceClient(parentEndpoint)
 	if err != nil || pgClient == nil {