VOL-4471: Stale data in resource manager

Change-Id: I026774317ba577b1d5d0748c3d177b4b7bf2ac94
diff --git a/VERSION b/VERSION
index 6284111..efe3085 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.5.9
+3.5.10
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 3f23fb4..f7d1405 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -102,7 +102,9 @@
 
 	// Slice of channels. Each channel in slice, index by (mcast-group-id modulo MaxNumOfGroupHandlerChannels)
 	// A go routine per index, waits on a unique channel for incoming mcast flow or group (add/modify/remove).
-	incomingMcastFlowOrGroup []chan McastFlowOrGroupControlBlock
+	incomingMcastFlowOrGroup  []chan McastFlowOrGroupControlBlock
+	stopMcastHandlerRoutine   []chan bool
+	mcastHandlerRoutineActive []bool
 
 	adapterPreviouslyConnected bool
 	agentPreviouslyConnected   bool
@@ -188,13 +190,17 @@
 	dh.perPonOnuIndicationChannel = make(map[uint32]onuIndicationChannels)
 	// Create a slice of buffered channels for handling concurrent mcast flow/group.
 	dh.incomingMcastFlowOrGroup = make([]chan McastFlowOrGroupControlBlock, MaxNumOfGroupHandlerChannels)
+	dh.stopMcastHandlerRoutine = make([]chan bool, MaxNumOfGroupHandlerChannels)
+	dh.mcastHandlerRoutineActive = make([]bool, MaxNumOfGroupHandlerChannels)
 	for i := range dh.incomingMcastFlowOrGroup {
 		dh.incomingMcastFlowOrGroup[i] = make(chan McastFlowOrGroupControlBlock, MaxNumOfGroupHandlerChannels)
+		dh.stopMcastHandlerRoutine[i] = make(chan bool, 1)
 		// Spin up a go routine to handling incoming mcast flow/group (add/modify/remove).
 		// There will be MaxNumOfGroupHandlerChannels number of mcastFlowOrGroupChannelHandlerRoutine go routines.
 		// These routines will be blocked on the dh.incomingMcastFlowOrGroup[mcast-group-id modulo MaxNumOfGroupHandlerChannels] channel
 		// for incoming mcast flow/group to be processed serially.
-		go dh.mcastFlowOrGroupChannelHandlerRoutine(dh.incomingMcastFlowOrGroup[i])
+		dh.mcastHandlerRoutineActive[i] = true
+		go dh.mcastFlowOrGroupChannelHandlerRoutine(i, dh.incomingMcastFlowOrGroup[i], dh.stopMcastHandlerRoutine[i])
 	}
 	//TODO initialize the support classes.
 	return &dh
@@ -639,6 +645,20 @@
 	//starting the stat collector
 	go startCollector(ctx, dh)
 
+	// instantiate the mcast handler routines.
+	for i := range dh.incomingMcastFlowOrGroup {
+		// We land inside the below "if" code path, after the OLT comes back from a reboot, otherwise the routines
+		// are already active when the DeviceHandler module is first instantiated (as part of Adopt_device RPC invocation).
+		if !dh.mcastHandlerRoutineActive[i] {
+			// Spin up a go routine to handle incoming mcast flow/group (add/modify/remove).
+			// There will be MaxNumOfGroupHandlerChannels number of mcastFlowOrGroupChannelHandlerRoutine go routines.
+			// These routines will be blocked on the dh.incomingMcastFlowOrGroup[mcast-group-id modulo MaxNumOfGroupHandlerChannels] channel
+			// for incoming mcast flow/group to be processed serially.
+			dh.mcastHandlerRoutineActive[i] = true
+			go dh.mcastFlowOrGroupChannelHandlerRoutine(i, dh.incomingMcastFlowOrGroup[i], dh.stopMcastHandlerRoutine[i])
+		}
+	}
+
 	// Synchronous call to update device state - this method is run in its own go routine
 	if err := dh.coreProxy.DeviceStateUpdate(ctx, dh.device.Id, voltha.ConnectStatus_REACHABLE,
 		voltha.OperStatus_ACTIVE); err != nil {
@@ -1525,9 +1545,7 @@
 	}
 }
 
-//UpdateFlowsIncrementally updates the device flow
-func (dh *DeviceHandler) UpdateFlowsIncrementally(ctx context.Context, device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
-	logger.Debugw(ctx, "received-incremental-flowupdate-in-device-handler", log.Fields{"device-id": device.Id, "flows": flows, "groups": groups, "flowMetadata": flowMetadata})
+func (dh *DeviceHandler) handleFlows(ctx context.Context, device *voltha.Device, flows *of.FlowChanges, flowMetadata *voltha.FlowMetadata) []error {
 	var err error
 	var errorsList []error
 
@@ -1564,7 +1582,13 @@
 			if flow_utils.HasGroup(flow) {
 				err = dh.RouteMcastFlowOrGroupMsgToChannel(ctx, flow, nil, McastFlowOrGroupAdd)
 			} else {
-				err = dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, true, flowMetadata)
+				if dh.flowMgr == nil || dh.flowMgr[ponIf] == nil {
+					// The flow manager module could be uninitialized if the flow arrives too soon before the device has reconciled fully
+					logger.Errorw(ctx, "flow-manager-uninitialized", log.Fields{"device-id": device.Id})
+					err = fmt.Errorf("flow-manager-uninitialized-%v", device.Id)
+				} else {
+					err = dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, true, flowMetadata)
+				}
 			}
 			if err != nil {
 				errorsList = append(errorsList, err)
@@ -1572,6 +1596,19 @@
 		}
 	}
 
+	return errorsList
+}
+
+func (dh *DeviceHandler) handleGroups(ctx context.Context, groups *of.FlowGroupChanges) []error {
+	var err error
+	var errorsList []error
+
+	if dh.getDeviceDeletionInProgressFlag() {
+		// The device itself is going to be reset as part of deletion. So nothing to be done.
+		logger.Infow(ctx, "device-deletion-in-progress--not-handling-flows-or-groups", log.Fields{"device-id": dh.device.Id})
+		return nil
+	}
+
 	// Whether we need to synchronize multicast group adds and modifies like flow add and delete needs to be investigated
 	if groups != nil {
 		for _, group := range groups.ToAdd.Items {
@@ -1596,6 +1633,24 @@
 			}
 		}
 	}
+
+	return errorsList
+}
+
+//UpdateFlowsIncrementally updates the device flow
+func (dh *DeviceHandler) UpdateFlowsIncrementally(ctx context.Context, device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
+
+	var errorsList []error
+
+	if dh.getDeviceDeletionInProgressFlag() {
+		// The device itself is going to be reset as part of deletion. So nothing to be done.
+		logger.Infow(ctx, "device-deletion-in-progress--not-handling-flows-or-groups", log.Fields{"device-id": device.Id})
+		return nil
+	}
+
+	logger.Debugw(ctx, "received-incremental-flowupdate-in-device-handler", log.Fields{"device-id": device.Id, "flows": flows, "groups": groups, "flowMetadata": flowMetadata})
+	errorsList = append(errorsList, dh.handleFlows(ctx, device, flows, flowMetadata)...)
+	errorsList = append(errorsList, dh.handleGroups(ctx, groups)...)
 	if len(errorsList) > 0 {
 		return fmt.Errorf("errors-installing-flows-groups, errors:%v", errorsList)
 	}
@@ -1744,6 +1799,18 @@
 	*/
 
 	dh.setDeviceDeletionInProgressFlag(true)
+	var wg sync.WaitGroup
+	wg.Add(1) // for the mcast routine below to finish
+	go dh.StopAllMcastHandlerRoutines(ctx, &wg)
+	for _, flMgr := range dh.flowMgr {
+		wg.Add(1) // for the flow handler routine below to finish
+		go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
+	}
+	if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
+		logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": device.Id})
+	} else {
+		logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": device.Id})
+	}
 
 	dh.cleanupDeviceResources(ctx)
 	logger.Debugw(ctx, "removed-device-from-Resource-manager-KV-store", log.Fields{"device-id": dh.device.Id})
@@ -2054,6 +2121,18 @@
 		}
 		dh.lockDevice.RUnlock()
 
+		var wg sync.WaitGroup
+		wg.Add(1) // for the multicast handler routine
+		go dh.StopAllMcastHandlerRoutines(ctx, &wg)
+		for _, flMgr := range dh.flowMgr {
+			wg.Add(1) // for the flow handler routine
+			go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
+		}
+		if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
+			logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": device.Id})
+		} else {
+			logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": device.Id})
+		}
 		//reset adapter reconcile flag
 		dh.adapterPreviouslyConnected = false
 
@@ -2400,6 +2479,11 @@
 // RouteMcastFlowOrGroupMsgToChannel routes incoming mcast flow or group to a channel to be handled by the a specific
 // instance of mcastFlowOrGroupChannelHandlerRoutine meant to handle messages for that group.
 func (dh *DeviceHandler) RouteMcastFlowOrGroupMsgToChannel(ctx context.Context, flow *voltha.OfpFlowStats, group *voltha.OfpGroupEntry, action string) error {
+	if dh.getDeviceDeletionInProgressFlag() {
+		// The device itself is going to be reset as part of deletion. So nothing to be done.
+		logger.Infow(ctx, "device-deletion-in-progress--not-handling-flows-or-groups", log.Fields{"device-id": dh.device.Id})
+		return nil
+	}
 	// Step1 : Fill McastFlowOrGroupControlBlock
 	// Step2 : Push the McastFlowOrGroupControlBlock to appropriate channel
 	// Step3 : Wait on response channel for response
@@ -2422,64 +2506,89 @@
 	} else {
 		return errors.New("flow-and-group-both-nil")
 	}
-	// Derive the appropriate go routine to handle the request by a simple module operation.
-	// There are only MaxNumOfGroupHandlerChannels number of channels to handle the mcast flow or group
-	dh.incomingMcastFlowOrGroup[groupID%MaxNumOfGroupHandlerChannels] <- mcastFlowOrGroupCb
-	// Wait for handler to return error value
-	err := <-errChan
-	logger.Debugw(ctx, "process-flow-or-group--received-resp", log.Fields{"err": err, "totalTimeInSeconds": time.Since(startTime).Milliseconds()})
-	return err
+	mcastRoutineIdx := groupID % MaxNumOfGroupHandlerChannels
+	if dh.mcastHandlerRoutineActive[mcastRoutineIdx] {
+		// Derive the appropriate go routine to handle the request by a simple modulo operation.
+		// There are only MaxNumOfGroupHandlerChannels number of channels to handle the mcast flow or group
+		dh.incomingMcastFlowOrGroup[groupID%MaxNumOfGroupHandlerChannels] <- mcastFlowOrGroupCb
+		// Wait for handler to return error value
+		err := <-errChan
+		logger.Debugw(ctx, "process-flow-or-group--received-resp", log.Fields{"err": err, "totalTimeInSeconds": time.Since(startTime).Milliseconds()})
+		return err
+	}
+	logger.Errorw(ctx, "mcast handler routine not active for onu", log.Fields{"mcastRoutineIdx": mcastRoutineIdx})
+	return fmt.Errorf("mcast-handler-routine-not-active-for-index-%v", mcastRoutineIdx)
 }
 
 // mcastFlowOrGroupChannelHandlerRoutine routine to handle incoming mcast flow/group message
-func (dh *DeviceHandler) mcastFlowOrGroupChannelHandlerRoutine(mcastFlowOrGroupChannel chan McastFlowOrGroupControlBlock) {
+func (dh *DeviceHandler) mcastFlowOrGroupChannelHandlerRoutine(routineIndex int, mcastFlowOrGroupChannel chan McastFlowOrGroupControlBlock, stopHandler chan bool) {
 	for {
+		select { // serve requests one at a time until a stop signal is received on stopHandler
 		// block on the channel to receive an incoming mcast flow/group
 		// process the flow completely before proceeding to handle the next flow
-		mcastFlowOrGroupCb := <-mcastFlowOrGroupChannel
-		if mcastFlowOrGroupCb.flow != nil {
-			if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
-				logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-flow",
-					log.Fields{"device-id": dh.device.Id,
-						"flowToAdd": mcastFlowOrGroupCb.flow})
-				// The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
-				err := dh.flowMgr[0].AddFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow, nil)
-				// Pass the return value over the return channel
-				*mcastFlowOrGroupCb.errChan <- err
-			} else { // flow remove
-				logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-flow",
-					log.Fields{"device-id": dh.device.Id,
-						"flowToRemove": mcastFlowOrGroupCb.flow})
-				// The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
-				err := dh.flowMgr[0].RemoveFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow)
-				// Pass the return value over the return channel
-				*mcastFlowOrGroupCb.errChan <- err
+		case mcastFlowOrGroupCb := <-mcastFlowOrGroupChannel: // one flow (flow != nil) or group request; every branch below reports its result on the request's errChan
+			if mcastFlowOrGroupCb.flow != nil {
+				if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
+					logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-flow",
+						log.Fields{"device-id": dh.device.Id,
+							"flowToAdd": mcastFlowOrGroupCb.flow})
+					// The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
+					err := dh.flowMgr[0].AddFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow, nil) // NOTE(review): dh.flowMgr[0] is not nil-checked here — confirm it is always initialized before mcast flows arrive
+					// Pass the return value over the return channel
+					*mcastFlowOrGroupCb.errChan <- err
+				} else { // flow remove
+					logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-flow",
+						log.Fields{"device-id": dh.device.Id,
+							"flowToRemove": mcastFlowOrGroupCb.flow})
+					// The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
+					err := dh.flowMgr[0].RemoveFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow)
+					// Pass the return value over the return channel
+					*mcastFlowOrGroupCb.errChan <- err
+				}
+			} else { // mcast group
+				if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
+					logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-group",
+						log.Fields{"device-id": dh.device.Id,
+							"groupToAdd": mcastFlowOrGroupCb.group})
+					err := dh.groupMgr.AddGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
+					// Pass the return value over the return channel
+					*mcastFlowOrGroupCb.errChan <- err
+				} else if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupModify { // group modify
+					logger.Debugw(mcastFlowOrGroupCb.ctx, "modifying-mcast-group",
+						log.Fields{"device-id": dh.device.Id,
+							"groupToModify": mcastFlowOrGroupCb.group})
+					err := dh.groupMgr.ModifyGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
+					// Pass the return value over the return channel
+					*mcastFlowOrGroupCb.errChan <- err
+				} else { // group remove
+					logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-group",
+						log.Fields{"device-id": dh.device.Id,
+							"groupToRemove": mcastFlowOrGroupCb.group})
+					err := dh.groupMgr.DeleteGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
+					// Pass the return value over the return channel
+					*mcastFlowOrGroupCb.errChan <- err
+				}
 			}
-		} else { // mcast group
-			if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
-				logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-group",
-					log.Fields{"device-id": dh.device.Id,
-						"groupToAdd": mcastFlowOrGroupCb.group})
-				err := dh.groupMgr.AddGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
-				// Pass the return value over the return channel
-				*mcastFlowOrGroupCb.errChan <- err
-			} else if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupModify { // group modify
-				logger.Debugw(mcastFlowOrGroupCb.ctx, "modifying-mcast-group",
-					log.Fields{"device-id": dh.device.Id,
-						"groupToModify": mcastFlowOrGroupCb.group})
-				err := dh.groupMgr.ModifyGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
-				// Pass the return value over the return channel
-				*mcastFlowOrGroupCb.errChan <- err
-			} else { // group remove
-				logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-group",
-					log.Fields{"device-id": dh.device.Id,
-						"groupToRemove": mcastFlowOrGroupCb.group})
-				err := dh.groupMgr.DeleteGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
-				// Pass the return value over the return channel
-				*mcastFlowOrGroupCb.errChan <- err
+		case <-stopHandler: // stop requested via this routine's stop channel
+			dh.mcastHandlerRoutineActive[routineIndex] = false // mark this routine inactive; NOTE(review): plain write with no visible synchronization against readers in other goroutines — confirm
+			return // requests still queued on mcastFlowOrGroupChannel are left unprocessed by this routine
+		}
+	}
+}
+
+// StopAllMcastHandlerRoutines sends a stop signal to every mcast handler routine still marked active, then marks wg done. Call this when device is being rebooted or deleted
+func (dh *DeviceHandler) StopAllMcastHandlerRoutines(ctx context.Context, wg *sync.WaitGroup) {
+	defer wg.Done() // release the caller's WaitGroup on every exit path
+	for i, v := range dh.stopMcastHandlerRoutine {
+		if dh.mcastHandlerRoutineActive[i] {
+			select {
+			case v <- true: // stop signal handed to (or queued for) routine i
+			case <-time.After(time.Second * 5): // give up on this routine rather than block the whole shutdown
+				logger.Warnw(ctx, "timeout stopping mcast handler routine", log.Fields{"idx": i, "deviceID": dh.device.Id})
+			}
 		}
 	}
+	logger.Debug(ctx, "stopped all mcast handler routines")
 }
 
 func (dh *DeviceHandler) getOltPortCounters(ctx context.Context, oltPortInfo *extension.GetOltPortCounters) *extension.SingleGetValueResponse {
@@ -2659,3 +2768,19 @@
 	defer dh.lockDevice.RUnlock()
 	return dh.isDeviceDeletionInProgress
 }
+
+// waitForTimeoutOrCompletion blocks until wg completes or timeout elapses, whichever comes first.
+// It returns true if wg completed in time and false if the wait timed out; after a timeout the
+// helper goroutine started here keeps waiting on wg and exits only once wg finally completes.
+func (dh *DeviceHandler) waitForTimeoutOrCompletion(wg *sync.WaitGroup, timeout time.Duration) bool {
+	done := make(chan struct{})
+	go func() { defer close(done); wg.Wait() }()
+	timer := time.NewTimer(timeout)
+	defer timer.Stop() // release the timer right away when wg completes before the timeout
+	select {
+	case <-done:
+		return true // completed normally
+	case <-timer.C:
+		return false // timed out
+	}
+}
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 8c855a0..b596412 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -219,7 +219,9 @@
 
 	// Slice of channels. Each channel in slice, index by ONU ID, queues flows per ONU.
 	// A go routine per ONU, waits on the unique channel (indexed by ONU ID) for incoming flows (add/remove)
-	incomingFlows []chan flowControlBlock
+	incomingFlows            []chan flowControlBlock
+	stopFlowHandlerRoutine   []chan bool
+	flowHandlerRoutineActive []bool
 }
 
 //NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
@@ -243,12 +245,16 @@
 	// Create a slice of buffered channels for handling concurrent flows per ONU.
 	// The additional entry (+1) is to handle the NNI trap flows on a separate channel from individual ONUs channel
 	flowMgr.incomingFlows = make([]chan flowControlBlock, MaxOnusPerPon+1)
+	flowMgr.stopFlowHandlerRoutine = make([]chan bool, MaxOnusPerPon+1)
+	flowMgr.flowHandlerRoutineActive = make([]bool, MaxOnusPerPon+1)
 	for i := range flowMgr.incomingFlows {
 		flowMgr.incomingFlows[i] = make(chan flowControlBlock, maxConcurrentFlowsPerOnu)
+		flowMgr.stopFlowHandlerRoutine[i] = make(chan bool, 1)
 		// Spin up a go routine to handling incoming flows (add/remove).
 		// There will be on go routine per ONU.
 		// This routine will be blocked on the flowMgr.incomingFlows[onu-id] channel for incoming flows.
-		go flowMgr.perOnuFlowHandlerRoutine(flowMgr.incomingFlows[i])
+		flowMgr.flowHandlerRoutineActive[i] = true
+		go flowMgr.perOnuFlowHandlerRoutine(i, flowMgr.incomingFlows[i], flowMgr.stopFlowHandlerRoutine[i])
 	}
 	flowMgr.onuGemInfoMap = make(map[uint32]*rsrcMgr.OnuGemInfo)
 	//Load the onugem info cache from kv store on flowmanager start
@@ -2151,6 +2157,11 @@
 
 // RouteFlowToOnuChannel routes incoming flow to ONU specific channel
 func (f *OpenOltFlowMgr) RouteFlowToOnuChannel(ctx context.Context, flow *voltha.OfpFlowStats, addFlow bool, flowMetadata *voltha.FlowMetadata) error {
+	if f.deviceHandler.getDeviceDeletionInProgressFlag() {
+		// The device itself is going to be reset as part of deletion. So nothing to be done.
+		logger.Infow(ctx, "device-deletion-in-progress--not-handling-flows-or-groups", log.Fields{"device-id": f.deviceHandler.device.Id})
+		return nil
+	}
 	// Step1 : Fill flowControlBlock
 	// Step2 : Push the flowControlBlock to ONU channel
 	// Step3 : Wait on response channel for response
@@ -2170,41 +2181,66 @@
 	if inPort != InvalidPort && outPort != InvalidPort {
 		_, _, onuID, _ = ExtractAccessFromFlow(inPort, outPort)
 	}
-	// inPort or outPort is InvalidPort for trap-from-nni flows.
-	// In the that case onuID is 0 which is the reserved index for trap-from-nni flows in the f.incomingFlows slice
-	// Send the flowCb on the ONU flow channel
-	f.incomingFlows[onuID] <- flowCb
-	// Wait on the channel for flow handlers return value
-	err := <-errChan
-	logger.Infow(ctx, "process-flow--received-resp", log.Fields{"err": err, "totalTimeSeconds": time.Since(startTime).Seconds()})
-	return err
+	if f.flowHandlerRoutineActive[onuID] {
+		// inPort or outPort is InvalidPort for trap-from-nni flows.
+		// In that case onuID is 0, which is the reserved index for trap-from-nni flows in the f.incomingFlows slice
+		// Send the flowCb on the ONU flow channel
+		f.incomingFlows[onuID] <- flowCb
+		// Wait on the channel for flow handlers return value
+		err := <-errChan
+		logger.Infow(ctx, "process-flow-received-resp", log.Fields{"err": err, "totalTimeSeconds": time.Since(startTime).Seconds()})
+		return err
+	}
+	logger.Errorw(ctx, "flow handler routine not active for onu", log.Fields{"onuID": onuID, "ponPortIdx": f.ponPortIdx})
+	return fmt.Errorf("flow-handler-routine-not-active-for-onu-%v-pon-%d", onuID, f.ponPortIdx)
 }
 
 // This routine is unique per ONU ID and blocks on flowControlBlock channel for incoming flows
 // Each incoming flow is processed in a synchronous manner, i.e., the flow is processed to completion before picking another
-func (f *OpenOltFlowMgr) perOnuFlowHandlerRoutine(subscriberFlowChannel chan flowControlBlock) {
+func (f *OpenOltFlowMgr) perOnuFlowHandlerRoutine(handlerRoutineIndex int, subscriberFlowChannel chan flowControlBlock, stopHandler chan bool) {
+	var flowCb flowControlBlock // reused for every flow received in the loop below
 	for {
+		select { // serve flows one at a time until a stop signal is received on stopHandler
 		// block on the channel to receive an incoming flow
 		// process the flow completely before proceeding to handle the next flow
-		flowCb := <-subscriberFlowChannel
-		if flowCb.addFlow {
-			logger.Info(flowCb.ctx, "adding-flow-start")
-			startTime := time.Now()
-			err := f.AddFlow(flowCb.ctx, flowCb.flow, flowCb.flowMetadata)
-			logger.Infow(flowCb.ctx, "adding-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
-			// Pass the return value over the return channel
-			*flowCb.errChan <- err
-		} else {
-			logger.Info(flowCb.ctx, "removing-flow-start")
-			startTime := time.Now()
-			err := f.RemoveFlow(flowCb.ctx, flowCb.flow)
-			logger.Infow(flowCb.ctx, "removing-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
-			// Pass the return value over the return channel
-			*flowCb.errChan <- err
+		case flowCb = <-subscriberFlowChannel: // add or remove the flow, then report the result on flowCb.errChan
+			if flowCb.addFlow {
+				logger.Info(flowCb.ctx, "adding-flow-start")
+				startTime := time.Now()
+				err := f.AddFlow(flowCb.ctx, flowCb.flow, flowCb.flowMetadata)
+				logger.Infow(flowCb.ctx, "adding-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
+				// Pass the return value over the return channel
+				*flowCb.errChan <- err
+			} else {
+				logger.Info(flowCb.ctx, "removing-flow-start")
+				startTime := time.Now()
+				err := f.RemoveFlow(flowCb.ctx, flowCb.flow)
+				logger.Infow(flowCb.ctx, "removing-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
+				// Pass the return value over the return channel
+				*flowCb.errChan <- err
+			}
+		case <-stopHandler: // stop requested via this routine's stop channel
+			f.flowHandlerRoutineActive[handlerRoutineIndex] = false // mark this routine inactive; NOTE(review): plain write with no visible synchronization against readers in other goroutines — confirm
+			return // flows still queued on subscriberFlowChannel are left unprocessed by this routine
 		}
 	}
 }
 
+// StopAllFlowHandlerRoutines sends a stop signal to every flow handler routine still marked active, then marks wg done. Call this when device is being rebooted or deleted
+func (f *OpenOltFlowMgr) StopAllFlowHandlerRoutines(ctx context.Context, wg *sync.WaitGroup) {
+	defer wg.Done() // release the caller's WaitGroup on every exit path
+	for i, v := range f.stopFlowHandlerRoutine {
+		if f.flowHandlerRoutineActive[i] {
+			select {
+			case v <- true: // stop signal handed to (or queued for) routine i
+			case <-time.After(time.Second * 5): // give up on this routine rather than block the whole shutdown
+				logger.Warnw(ctx, "timeout stopping flow handler routine", log.Fields{"onuID": i, "deviceID": f.deviceHandler.device.Id})
+			}
+		}
+	}
+	logger.Debugw(ctx, "stopped all flow handler routines", log.Fields{"ponPortIdx": f.ponPortIdx})
+}
+
 // AddFlow add flow to device
 // nolint: gocyclo
 func (f *OpenOltFlowMgr) AddFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) error {