VOL-4312: Reboot OLT test fails on Edgecore GPON ASGvOLT64 OLT
- Fixed issue where 'N*N' (where N is the number of PON ports)
  number of TP Managers were getting created for an OLT device
  instead of just 'N' TP managers.
  It was seen that after an OLT reboot a burst of concurrent TP
  managers (N*N) tried to create etcd pool connections; at some
  point etcd stopped responding, which led to the re-connection
  handling to the OLT failing after the reboot.

VOL-4313: Stop stale go routines on monitoring flow and group
messages for the ONUs after OLT reboot or OLT delete
- There are stale go routines monitoring flow and group
  messages for the ONUs. Although these never caused a problem,
  it is always recommended to stop such routines as they consume
  memory and CPU.

Change-Id: Ie4d1ce9155dbd15e1831361d50cb959402045cc8
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 10bdd09..bc6ca7c 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -102,7 +102,9 @@
 
 	// Slice of channels. Each channel in slice, index by (mcast-group-id modulo MaxNumOfGroupHandlerChannels)
 	// A go routine per index, waits on a unique channel for incoming mcast flow or group (add/modify/remove).
-	incomingMcastFlowOrGroup []chan McastFlowOrGroupControlBlock
+	incomingMcastFlowOrGroup  []chan McastFlowOrGroupControlBlock
+	stopMcastHandlerRoutine   []chan bool
+	mcastHandlerRoutineActive []bool
 
 	adapterPreviouslyConnected bool
 	agentPreviouslyConnected   bool
@@ -186,13 +188,17 @@
 	dh.perPonOnuIndicationChannel = make(map[uint32]onuIndicationChannels)
 	// Create a slice of buffered channels for handling concurrent mcast flow/group.
 	dh.incomingMcastFlowOrGroup = make([]chan McastFlowOrGroupControlBlock, MaxNumOfGroupHandlerChannels)
+	dh.stopMcastHandlerRoutine = make([]chan bool, MaxNumOfGroupHandlerChannels)
+	dh.mcastHandlerRoutineActive = make([]bool, MaxNumOfGroupHandlerChannels)
 	for i := range dh.incomingMcastFlowOrGroup {
 		dh.incomingMcastFlowOrGroup[i] = make(chan McastFlowOrGroupControlBlock, MaxNumOfGroupHandlerChannels)
+		dh.stopMcastHandlerRoutine[i] = make(chan bool, 1)
 		// Spin up a go routine to handling incoming mcast flow/group (add/modify/remove).
 		// There will be MaxNumOfGroupHandlerChannels number of mcastFlowOrGroupChannelHandlerRoutine go routines.
 		// These routines will be blocked on the dh.incomingMcastFlowOrGroup[mcast-group-id modulo MaxNumOfGroupHandlerChannels] channel
 		// for incoming mcast flow/group to be processed serially.
-		go dh.mcastFlowOrGroupChannelHandlerRoutine(dh.incomingMcastFlowOrGroup[i])
+		dh.mcastHandlerRoutineActive[i] = true
+		go dh.mcastFlowOrGroupChannelHandlerRoutine(i, dh.incomingMcastFlowOrGroup[i], dh.stopMcastHandlerRoutine[i])
 	}
 	//TODO initialize the support classes.
 	return &dh
@@ -1749,6 +1755,10 @@
 	}
 	dh.lockDevice.RUnlock()
 	dh.removeOnuIndicationChannels(ctx)
+	go dh.StopAllMcastHandlerRoutines(ctx)
+	for _, flMgr := range dh.flowMgr {
+		go flMgr.StopAllFlowHandlerRoutines(ctx)
+	}
 	//Reset the state
 	if dh.Client != nil {
 		if _, err := dh.Client.Reboot(ctx, new(oop.Empty)); err != nil {
@@ -2053,6 +2063,11 @@
 		}
 		dh.lockDevice.RUnlock()
 
+		go dh.StopAllMcastHandlerRoutines(ctx)
+		for _, flMgr := range dh.flowMgr {
+			go flMgr.StopAllFlowHandlerRoutines(ctx)
+		}
+
 		//reset adapter reconcile flag
 		dh.adapterPreviouslyConnected = false
 
@@ -2421,66 +2436,86 @@
 	} else {
 		return errors.New("flow-and-group-both-nil")
 	}
-	// Derive the appropriate go routine to handle the request by a simple module operation.
-	// There are only MaxNumOfGroupHandlerChannels number of channels to handle the mcast flow or group
-	dh.incomingMcastFlowOrGroup[groupID%MaxNumOfGroupHandlerChannels] <- mcastFlowOrGroupCb
-	// Wait for handler to return error value
-	err := <-errChan
-	logger.Debugw(ctx, "process-flow-or-group--received-resp", log.Fields{"err": err, "totalTimeInSeconds": time.Since(startTime).Milliseconds()})
-	return err
+	mcastRoutineIdx := groupID % MaxNumOfGroupHandlerChannels
+	if dh.mcastHandlerRoutineActive[mcastRoutineIdx] {
+		// Derive the appropriate go routine to handle the request by a simple modulo operation.
+		// There are only MaxNumOfGroupHandlerChannels number of channels to handle the mcast flow or group
+		dh.incomingMcastFlowOrGroup[groupID%MaxNumOfGroupHandlerChannels] <- mcastFlowOrGroupCb
+		// Wait for handler to return error value
+		err := <-errChan
+		logger.Debugw(ctx, "process-flow-or-group--received-resp", log.Fields{"err": err, "totalTimeInSeconds": time.Since(startTime).Milliseconds()})
+		return err
+	}
+	logger.Errorw(ctx, "mcast handler routine not active for onu", log.Fields{"mcastRoutineIdx": mcastRoutineIdx})
+	return fmt.Errorf("mcast-handler-routine-not-active-for-index-%v", mcastRoutineIdx)
 }
 
 // mcastFlowOrGroupChannelHandlerRoutine routine to handle incoming mcast flow/group message
-func (dh *DeviceHandler) mcastFlowOrGroupChannelHandlerRoutine(mcastFlowOrGroupChannel chan McastFlowOrGroupControlBlock) {
+func (dh *DeviceHandler) mcastFlowOrGroupChannelHandlerRoutine(routineIndex int, mcastFlowOrGroupChannel chan McastFlowOrGroupControlBlock, stopHandler chan bool) {
 	for {
+		select {
 		// block on the channel to receive an incoming mcast flow/group
 		// process the flow completely before proceeding to handle the next flow
-		mcastFlowOrGroupCb := <-mcastFlowOrGroupChannel
-		if mcastFlowOrGroupCb.flow != nil {
-			if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
-				logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-flow",
-					log.Fields{"device-id": dh.device.Id,
-						"flowToAdd": mcastFlowOrGroupCb.flow})
-				// The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
-				err := dh.flowMgr[0].AddFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow, nil)
-				// Pass the return value over the return channel
-				*mcastFlowOrGroupCb.errChan <- err
-			} else { // flow remove
-				logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-flow",
-					log.Fields{"device-id": dh.device.Id,
-						"flowToRemove": mcastFlowOrGroupCb.flow})
-				// The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
-				err := dh.flowMgr[0].RemoveFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow)
-				// Pass the return value over the return channel
-				*mcastFlowOrGroupCb.errChan <- err
+		case mcastFlowOrGroupCb := <-mcastFlowOrGroupChannel:
+			if mcastFlowOrGroupCb.flow != nil {
+				if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
+					logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-flow",
+						log.Fields{"device-id": dh.device.Id,
+							"flowToAdd": mcastFlowOrGroupCb.flow})
+					// The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
+					err := dh.flowMgr[0].AddFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow, nil)
+					// Pass the return value over the return channel
+					*mcastFlowOrGroupCb.errChan <- err
+				} else { // flow remove
+					logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-flow",
+						log.Fields{"device-id": dh.device.Id,
+							"flowToRemove": mcastFlowOrGroupCb.flow})
+					// The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
+					err := dh.flowMgr[0].RemoveFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow)
+					// Pass the return value over the return channel
+					*mcastFlowOrGroupCb.errChan <- err
+				}
+			} else { // mcast group
+				if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
+					logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-group",
+						log.Fields{"device-id": dh.device.Id,
+							"groupToAdd": mcastFlowOrGroupCb.group})
+					err := dh.groupMgr.AddGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
+					// Pass the return value over the return channel
+					*mcastFlowOrGroupCb.errChan <- err
+				} else if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupModify { // group modify
+					logger.Debugw(mcastFlowOrGroupCb.ctx, "modifying-mcast-group",
+						log.Fields{"device-id": dh.device.Id,
+							"groupToModify": mcastFlowOrGroupCb.group})
+					err := dh.groupMgr.ModifyGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
+					// Pass the return value over the return channel
+					*mcastFlowOrGroupCb.errChan <- err
+				} else { // group remove
+					logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-group",
+						log.Fields{"device-id": dh.device.Id,
+							"groupToRemove": mcastFlowOrGroupCb.group})
+					err := dh.groupMgr.DeleteGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
+					// Pass the return value over the return channel
+					*mcastFlowOrGroupCb.errChan <- err
+				}
 			}
-		} else { // mcast group
-			if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
-				logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-group",
-					log.Fields{"device-id": dh.device.Id,
-						"groupToAdd": mcastFlowOrGroupCb.group})
-				err := dh.groupMgr.AddGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
-				// Pass the return value over the return channel
-				*mcastFlowOrGroupCb.errChan <- err
-			} else if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupModify { // group modify
-				logger.Debugw(mcastFlowOrGroupCb.ctx, "modifying-mcast-group",
-					log.Fields{"device-id": dh.device.Id,
-						"groupToModify": mcastFlowOrGroupCb.group})
-				err := dh.groupMgr.ModifyGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
-				// Pass the return value over the return channel
-				*mcastFlowOrGroupCb.errChan <- err
-			} else { // group remove
-				logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-group",
-					log.Fields{"device-id": dh.device.Id,
-						"groupToRemove": mcastFlowOrGroupCb.group})
-				err := dh.groupMgr.DeleteGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
-				// Pass the return value over the return channel
-				*mcastFlowOrGroupCb.errChan <- err
-			}
+		case <-stopHandler:
+			dh.mcastHandlerRoutineActive[routineIndex] = false
+			return
 		}
 	}
 }
 
+// StopAllMcastHandlerRoutines stops all mcast handler routines. Call this when the device is being rebooted or deleted.
+func (dh *DeviceHandler) StopAllMcastHandlerRoutines(ctx context.Context) {
+	for i, v := range dh.stopMcastHandlerRoutine {
+		if dh.mcastHandlerRoutineActive[i] {
+			v <- true
+		}
+	}
+	logger.Debug(ctx, "stopped all mcast handler routines")
+}
+
 func (dh *DeviceHandler) getOltPortCounters(ctx context.Context, oltPortInfo *extension.GetOltPortCounters) *extension.SingleGetValueResponse {
 
 	singleValResp := extension.SingleGetValueResponse{