VOL-4471: Stale data in resource manager

Change-Id: I026774317ba577b1d5d0748c3d177b4b7bf2ac94
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 8c855a0..b596412 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -219,7 +219,9 @@
 
 	// Slice of channels. Each channel in slice, index by ONU ID, queues flows per ONU.
 	// A go routine per ONU, waits on the unique channel (indexed by ONU ID) for incoming flows (add/remove)
-	incomingFlows []chan flowControlBlock
+	incomingFlows            []chan flowControlBlock
+	stopFlowHandlerRoutine   []chan bool
+	flowHandlerRoutineActive []bool
 }
 
 //NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
@@ -243,12 +245,16 @@
 	// Create a slice of buffered channels for handling concurrent flows per ONU.
 	// The additional entry (+1) is to handle the NNI trap flows on a separate channel from individual ONUs channel
 	flowMgr.incomingFlows = make([]chan flowControlBlock, MaxOnusPerPon+1)
+	flowMgr.stopFlowHandlerRoutine = make([]chan bool, MaxOnusPerPon+1)
+	flowMgr.flowHandlerRoutineActive = make([]bool, MaxOnusPerPon+1)
 	for i := range flowMgr.incomingFlows {
 		flowMgr.incomingFlows[i] = make(chan flowControlBlock, maxConcurrentFlowsPerOnu)
+		flowMgr.stopFlowHandlerRoutine[i] = make(chan bool, 1)
 		// Spin up a go routine to handling incoming flows (add/remove).
 		// There will be on go routine per ONU.
 		// This routine will be blocked on the flowMgr.incomingFlows[onu-id] channel for incoming flows.
-		go flowMgr.perOnuFlowHandlerRoutine(flowMgr.incomingFlows[i])
+		flowMgr.flowHandlerRoutineActive[i] = true
+		go flowMgr.perOnuFlowHandlerRoutine(i, flowMgr.incomingFlows[i], flowMgr.stopFlowHandlerRoutine[i])
 	}
 	flowMgr.onuGemInfoMap = make(map[uint32]*rsrcMgr.OnuGemInfo)
 	//Load the onugem info cache from kv store on flowmanager start
@@ -2151,6 +2157,11 @@
 
 // RouteFlowToOnuChannel routes incoming flow to ONU specific channel
 func (f *OpenOltFlowMgr) RouteFlowToOnuChannel(ctx context.Context, flow *voltha.OfpFlowStats, addFlow bool, flowMetadata *voltha.FlowMetadata) error {
+	if f.deviceHandler.getDeviceDeletionInProgressFlag() {
+		// The device itself is going to be reset as part of deletion. So nothing to be done.
+		logger.Infow(ctx, "device-deletion-in-progress--not-handling-flows-or-groups", log.Fields{"device-id": f.deviceHandler.device.Id})
+		return nil
+	}
 	// Step1 : Fill flowControlBlock
 	// Step2 : Push the flowControlBlock to ONU channel
 	// Step3 : Wait on response channel for response
@@ -2170,41 +2181,66 @@
 	if inPort != InvalidPort && outPort != InvalidPort {
 		_, _, onuID, _ = ExtractAccessFromFlow(inPort, outPort)
 	}
-	// inPort or outPort is InvalidPort for trap-from-nni flows.
-	// In the that case onuID is 0 which is the reserved index for trap-from-nni flows in the f.incomingFlows slice
-	// Send the flowCb on the ONU flow channel
-	f.incomingFlows[onuID] <- flowCb
-	// Wait on the channel for flow handlers return value
-	err := <-errChan
-	logger.Infow(ctx, "process-flow--received-resp", log.Fields{"err": err, "totalTimeSeconds": time.Since(startTime).Seconds()})
-	return err
+	if f.flowHandlerRoutineActive[onuID] {
+		// inPort or outPort is InvalidPort for trap-from-nni flows.
+		// In that case onuID is 0 which is the reserved index for trap-from-nni flows in the f.incomingFlows slice
+		// Send the flowCb on the ONU flow channel
+		f.incomingFlows[onuID] <- flowCb
+		// Wait on the channel for flow handlers return value
+		err := <-errChan
+		logger.Infow(ctx, "process-flow-received-resp", log.Fields{"err": err, "totalTimeSeconds": time.Since(startTime).Seconds()})
+		return err
+	}
+	logger.Errorw(ctx, "flow handler routine not active for onu", log.Fields{"onuID": onuID, "ponPortIdx": f.ponPortIdx})
+	return fmt.Errorf("flow-handler-routine-not-active-for-onu-%v-pon-%d", onuID, f.ponPortIdx)
 }
 
 // This routine is unique per ONU ID and blocks on flowControlBlock channel for incoming flows
 // Each incoming flow is processed in a synchronous manner, i.e., the flow is processed to completion before picking another
-func (f *OpenOltFlowMgr) perOnuFlowHandlerRoutine(subscriberFlowChannel chan flowControlBlock) {
+func (f *OpenOltFlowMgr) perOnuFlowHandlerRoutine(handlerRoutineIndex int, subscriberFlowChannel chan flowControlBlock, stopHandler chan bool) {
+	var flowCb flowControlBlock
 	for {
+		select {
 		// block on the channel to receive an incoming flow
 		// process the flow completely before proceeding to handle the next flow
-		flowCb := <-subscriberFlowChannel
-		if flowCb.addFlow {
-			logger.Info(flowCb.ctx, "adding-flow-start")
-			startTime := time.Now()
-			err := f.AddFlow(flowCb.ctx, flowCb.flow, flowCb.flowMetadata)
-			logger.Infow(flowCb.ctx, "adding-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
-			// Pass the return value over the return channel
-			*flowCb.errChan <- err
-		} else {
-			logger.Info(flowCb.ctx, "removing-flow-start")
-			startTime := time.Now()
-			err := f.RemoveFlow(flowCb.ctx, flowCb.flow)
-			logger.Infow(flowCb.ctx, "removing-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
-			// Pass the return value over the return channel
-			*flowCb.errChan <- err
+		case flowCb = <-subscriberFlowChannel:
+			if flowCb.addFlow {
+				logger.Info(flowCb.ctx, "adding-flow-start")
+				startTime := time.Now()
+				err := f.AddFlow(flowCb.ctx, flowCb.flow, flowCb.flowMetadata)
+				logger.Infow(flowCb.ctx, "adding-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
+				// Pass the return value over the return channel
+				*flowCb.errChan <- err
+			} else {
+				logger.Info(flowCb.ctx, "removing-flow-start")
+				startTime := time.Now()
+				err := f.RemoveFlow(flowCb.ctx, flowCb.flow)
+				logger.Infow(flowCb.ctx, "removing-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
+				// Pass the return value over the return channel
+				*flowCb.errChan <- err
+			}
+		case <-stopHandler: // stop signal for this routine; StopAllFlowHandlerRoutines sends on this channel
+			f.flowHandlerRoutineActive[handlerRoutineIndex] = false // NOTE(review): unsynchronized write; RouteFlowToOnuChannel reads this flag from another goroutine - confirm this is race-free
+			return // NOTE(review): flows still queued in subscriberFlowChannel get no reply on errChan - verify callers cannot block forever
 		}
 	}
 }
 
+// StopAllFlowHandlerRoutines signals every active per-ONU flow handler routine to stop, then marks wg done. Call this when device is being rebooted or deleted
+func (f *OpenOltFlowMgr) StopAllFlowHandlerRoutines(ctx context.Context, wg *sync.WaitGroup) {
+	defer wg.Done() // deferred so the waiter is always released, even if this function exits abnormally
+	for i, v := range f.stopFlowHandlerRoutine {
+		if f.flowHandlerRoutineActive[i] {
+			select {
+			case v <- true: // hand the stop signal to the routine's stop channel
+			case <-time.After(time.Second * 5): // give up on this routine after 5s and move on to the next
+				logger.Warnw(ctx, "timeout stopping flow handler routine", log.Fields{"onuID": i, "deviceID": f.deviceHandler.device.Id})
+			}
+		}
+	}
+	logger.Debugw(ctx, "stopped all flow handler routines", log.Fields{"ponPortIdx": f.ponPortIdx})
+}
+
 // AddFlow add flow to device
 // nolint: gocyclo
 func (f *OpenOltFlowMgr) AddFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) error {