VOL-4471: Stale data in resource manager
- Make sure to stop flow and mcast group handlers before deleting
  data on the DB. Otherwise it is possible that some stray handling
  in these routines can create entries on the DB post cleanup.

Change-Id: If73c4ea5f972b7333bd2d6c7a8b88dcf0c31638d
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 646b8f9..86cce43 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -1612,6 +1612,12 @@
 	var err error
 	var errorsList []error
 
+	if dh.getDeviceDeletionInProgressFlag() {
+		// The device itself is going to be reset as part of deletion. So nothing to be done.
+		logger.Infow(ctx, "device-deletion-in-progress--not-handling-flows-or-groups", log.Fields{"device-id": device.Id})
+		return nil
+	}
+
 	if flows != nil {
 		for _, flow := range flows.ToRemove.Items {
 			ponIf := dh.getPonIfFromFlow(flow)
@@ -1660,6 +1666,12 @@
 	var err error
 	var errorsList []error
 
+	if dh.getDeviceDeletionInProgressFlag() {
+		// The device itself is going to be reset as part of deletion. So nothing to be done.
+		logger.Infow(ctx, "device-deletion-in-progress--not-handling-flows-or-groups", log.Fields{"device-id": dh.device.Id})
+		return nil
+	}
+
 	// Whether we need to synchronize multicast group adds and modifies like flow add and delete needs to be investigated
 	if groups != nil {
 		for _, group := range groups.ToAdd.Items {
@@ -1868,6 +1880,18 @@
 	*/
 
 	dh.setDeviceDeletionInProgressFlag(true)
+	var wg sync.WaitGroup
+	wg.Add(1) // for the mcast routine below to finish
+	go dh.StopAllMcastHandlerRoutines(ctx, &wg)
+	for _, flMgr := range dh.flowMgr {
+		wg.Add(1) // for the flow handler routine below to finish
+		go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
+	}
+	if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
+		logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": device.Id})
+	} else {
+		logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": device.Id})
+	}
 
 	dh.cleanupDeviceResources(ctx)
 	logger.Debugw(ctx, "removed-device-from-Resource-manager-KV-store", log.Fields{"device-id": dh.device.Id})
@@ -1882,10 +1906,6 @@
 	}
 	dh.lockDevice.RUnlock()
 	dh.removeOnuIndicationChannels(ctx)
-	go dh.StopAllMcastHandlerRoutines(ctx)
-	for _, flMgr := range dh.flowMgr {
-		go flMgr.StopAllFlowHandlerRoutines(ctx)
-	}
 	//Reset the state
 	if dh.Client != nil {
 		if _, err := dh.Client.Reboot(ctx, new(oop.Empty)); err != nil {
@@ -2198,9 +2218,17 @@
 		}
 		dh.lockDevice.RUnlock()
 
-		go dh.StopAllMcastHandlerRoutines(ctx)
+		var wg sync.WaitGroup
+		wg.Add(1) // for the multicast handler routine
+		go dh.StopAllMcastHandlerRoutines(ctx, &wg)
 		for _, flMgr := range dh.flowMgr {
-			go flMgr.StopAllFlowHandlerRoutines(ctx)
+			wg.Add(1) // for the flow handler routine
+			go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
+		}
+		if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
+			logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": device.Id})
+		} else {
+			logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": device.Id})
 		}
 
 		//reset adapter reconcile flag
@@ -2556,6 +2584,12 @@
 // RouteMcastFlowOrGroupMsgToChannel routes incoming mcast flow or group to a channel to be handled by the a specific
 // instance of mcastFlowOrGroupChannelHandlerRoutine meant to handle messages for that group.
 func (dh *DeviceHandler) RouteMcastFlowOrGroupMsgToChannel(ctx context.Context, flow *of.OfpFlowStats, group *of.OfpGroupEntry, action string) error {
+	if dh.getDeviceDeletionInProgressFlag() {
+		// The device itself is going to be reset as part of deletion. So nothing to be done.
+		logger.Infow(ctx, "device-deletion-in-progress--not-handling-flows-or-groups", log.Fields{"device-id": dh.device.Id})
+		return nil
+	}
+
 	// Step1 : Fill McastFlowOrGroupControlBlock
 	// Step2 : Push the McastFlowOrGroupControlBlock to appropriate channel
 	// Step3 : Wait on response channel for response
@@ -2649,12 +2683,17 @@
 }
 
 // StopAllMcastHandlerRoutines stops all flow handler routines. Call this when device is being rebooted or deleted
-func (dh *DeviceHandler) StopAllMcastHandlerRoutines(ctx context.Context) {
+func (dh *DeviceHandler) StopAllMcastHandlerRoutines(ctx context.Context, wg *sync.WaitGroup) {
 	for i, v := range dh.stopMcastHandlerRoutine {
 		if dh.mcastHandlerRoutineActive[i] {
-			v <- true
+			select {
+			case v <- true:
+			case <-time.After(time.Second * 5):
+				logger.Warnw(ctx, "timeout stopping mcast handler routine", log.Fields{"idx": i, "deviceID": dh.device.Id})
+			}
 		}
 	}
+	wg.Done()
 	logger.Debug(ctx, "stopped all mcast handler routines")
 }
 
@@ -3126,3 +3165,19 @@
 	defer dh.lockDevice.RUnlock()
 	return dh.isDeviceDeletionInProgress
 }
+
+// waitForTimeoutOrCompletion blocks until wg has completed or timeout has elapsed, whichever happens first.
+// It reports true when wg completed within the timeout and false when the wait timed out.
+func (dh *DeviceHandler) waitForTimeoutOrCompletion(wg *sync.WaitGroup, timeout time.Duration) bool {
+	done := make(chan struct{})
+	go func() {
+		wg.Wait()
+		close(done) // signals completion to the select below
+	}()
+	select {
+	case <-time.After(timeout):
+		return false // timed out
+	case <-done:
+		return true // completed normally
+	}
+}