VOL-4312: Reboot OLT test fails on Edgecore GPON ASGvOLT64 OLT
- Fixed issue where 'N*N' (where N is the number of PON ports)
  number of TP Managers were getting created for an OLT device
  instead of just 'N' TP managers.
  It was seen that after OLT reboot burst of conncurrent TP
  managers (N*N) were trying to create etcd pool connections
  and at some point the etcd stopped responding and this lead
  re-connection handling to OLT after reboot failing.

VOL-4313: Stop stale go routines on monitoring flow and group
messages for the ONUs after OLT reboot or OLT delete
- There are stale go routines for the ONU monitoring for flow
  and group messages. Although these never caused problem it
  is always recomendded to stop such routines as they consume
  memory and cpu.

Change-Id: Ie4d1ce9155dbd15e1831361d50cb959402045cc8
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 6136798..61da77d 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -219,7 +219,9 @@
 
 	// Slice of channels. Each channel in slice, index by ONU ID, queues flows per ONU.
 	// A go routine per ONU, waits on the unique channel (indexed by ONU ID) for incoming flows (add/remove)
-	incomingFlows []chan flowControlBlock
+	incomingFlows            []chan flowControlBlock
+	stopFlowHandlerRoutine   []chan bool
+	flowHandlerRoutineActive []bool
 }
 
 //NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
@@ -232,7 +234,7 @@
 	flowMgr.ponPortIdx = ponPortIdx
 	flowMgr.grpMgr = grpMgr
 	flowMgr.resourceMgr = rMgr
-	if err = flowMgr.populateTechProfilePerPonPort(ctx); err != nil {
+	if err = flowMgr.populateTechProfileForCurrentPonPort(ctx); err != nil {
 		logger.Errorw(ctx, "error-while-populating-tech-profile-mgr", log.Fields{"err": err})
 		return nil
 	}
@@ -243,12 +245,16 @@
 	// Create a slice of buffered channels for handling concurrent flows per ONU.
 	// The additional entry (+1) is to handle the NNI trap flows on a separate channel from individual ONUs channel
 	flowMgr.incomingFlows = make([]chan flowControlBlock, MaxOnusPerPon+1)
+	flowMgr.stopFlowHandlerRoutine = make([]chan bool, MaxOnusPerPon+1)
+	flowMgr.flowHandlerRoutineActive = make([]bool, MaxOnusPerPon+1)
 	for i := range flowMgr.incomingFlows {
 		flowMgr.incomingFlows[i] = make(chan flowControlBlock, maxConcurrentFlowsPerOnu)
+		flowMgr.stopFlowHandlerRoutine[i] = make(chan bool, 1)
 		// Spin up a go routine to handling incoming flows (add/remove).
 		// There will be on go routine per ONU.
 		// This routine will be blocked on the flowMgr.incomingFlows[onu-id] channel for incoming flows.
-		go flowMgr.perOnuFlowHandlerRoutine(flowMgr.incomingFlows[i])
+		flowMgr.flowHandlerRoutineActive[i] = true
+		go flowMgr.perOnuFlowHandlerRoutine(i, flowMgr.incomingFlows[i], flowMgr.stopFlowHandlerRoutine[i])
 	}
 	flowMgr.onuGemInfoMap = make(map[uint32]*rsrcMgr.OnuGemInfo)
 	//Load the onugem info cache from kv store on flowmanager start
@@ -937,38 +943,27 @@
 	}
 }
 
-func (f *OpenOltFlowMgr) populateTechProfilePerPonPort(ctx context.Context) error {
-	var tpCount int
+func (f *OpenOltFlowMgr) populateTechProfileForCurrentPonPort(ctx context.Context) error {
 	for _, techRange := range f.resourceMgr.DevInfo.Ranges {
 		for _, intfID := range techRange.IntfIds {
-			var err error
-			f.techprofile, err = tp.NewTechProfile(ctx, f.resourceMgr.PonRsrMgr, f.resourceMgr.PonRsrMgr.Backend,
-				f.resourceMgr.PonRsrMgr.Address, f.deviceHandler.cm.Backend.PathPrefix)
-			if err != nil || f.techprofile == nil {
-				logger.Errorw(ctx, "failed-to-allocate-to-techprofile-for-pon-port", log.Fields{"intfID": intfID, "err": err})
-				return fmt.Errorf("failed-to-allocate-tech-profile-for-pon-port--pon-%v-err-%v", intfID, err)
+			if intfID == f.ponPortIdx { // initialize only for the pon port that this flow manager is managing
+				var err error
+				f.techprofile, err = tp.NewTechProfile(ctx, f.resourceMgr.PonRsrMgr, f.resourceMgr.PonRsrMgr.Backend,
+					f.resourceMgr.PonRsrMgr.Address, f.deviceHandler.cm.Backend.PathPrefix)
+				if err != nil || f.techprofile == nil {
+					logger.Errorw(ctx, "failed-to-allocate-to-techprofile-for-pon-port", log.Fields{"intfID": intfID, "err": err})
+					return fmt.Errorf("failed-to-allocate-tech-profile-for-pon-port--pon-%v-err-%v", intfID, err)
+				}
+				logger.Debugw(ctx, "init-tech-profile-done",
+					log.Fields{
+						"intf-id":   intfID,
+						"device-id": f.deviceHandler.device.Id})
+				return nil
 			}
-			tpCount++
-			logger.Debugw(ctx, "init-tech-profile-done",
-				log.Fields{
-					"intf-id":   intfID,
-					"device-id": f.deviceHandler.device.Id})
 		}
 	}
-	//Make sure we have as many tech_profiles as there are pon ports on the device
-	if tpCount != int(f.resourceMgr.DevInfo.GetPonPorts()) {
-		return olterrors.NewErrInvalidValue(log.Fields{
-			"reason":             "tP-count-does-not-match-number-of-pon-ports",
-			"tech-profile-count": tpCount,
-			"pon-port-count":     f.resourceMgr.DevInfo.GetPonPorts(),
-			"device-id":          f.deviceHandler.device.Id}, nil)
-	}
-	logger.Infow(ctx, "populated-techprofile-for-ponports-successfully",
-		log.Fields{
-			"numofTech":   tpCount,
-			"numPonPorts": f.resourceMgr.DevInfo.GetPonPorts(),
-			"device-id":   f.deviceHandler.device.Id})
-	return nil
+	logger.Errorw(ctx, "pon port not found in the the device pon port range", log.Fields{"intfID": f.ponPortIdx})
+	return fmt.Errorf("pon-port-idx-not-found-in-the-device-info-pon-port-range-%v", f.ponPortIdx)
 }
 
 func (f *OpenOltFlowMgr) addUpstreamDataPathFlow(ctx context.Context, flowContext *flowContext) error {
@@ -2176,41 +2171,60 @@
 	if inPort != InvalidPort && outPort != InvalidPort {
 		_, _, onuID, _ = ExtractAccessFromFlow(inPort, outPort)
 	}
-	// inPort or outPort is InvalidPort for trap-from-nni flows.
-	// In the that case onuID is 0 which is the reserved index for trap-from-nni flows in the f.incomingFlows slice
-	// Send the flowCb on the ONU flow channel
-	f.incomingFlows[onuID] <- flowCb
-	// Wait on the channel for flow handlers return value
-	err := <-errChan
-	logger.Infow(ctx, "process-flow--received-resp", log.Fields{"err": err, "totalTimeSeconds": time.Since(startTime).Seconds()})
-	return err
+	if f.flowHandlerRoutineActive[onuID] {
+		// inPort or outPort is InvalidPort for trap-from-nni flows.
+		// In the that case onuID is 0 which is the reserved index for trap-from-nni flows in the f.incomingFlows slice
+		// Send the flowCb on the ONU flow channel
+		f.incomingFlows[onuID] <- flowCb
+		// Wait on the channel for flow handlers return value
+		err := <-errChan
+		logger.Infow(ctx, "process-flow--received-resp", log.Fields{"err": err, "totalTimeSeconds": time.Since(startTime).Seconds()})
+		return err
+	}
+	logger.Errorw(ctx, "flow handler routine not active for onu", log.Fields{"onuID": onuID, "ponPortIdx": f.ponPortIdx})
+	return fmt.Errorf("flow-handler-routine-not-active-for-onu-%v-pon-%d", onuID, f.ponPortIdx)
 }
 
 // This routine is unique per ONU ID and blocks on flowControlBlock channel for incoming flows
 // Each incoming flow is processed in a synchronous manner, i.e., the flow is processed to completion before picking another
-func (f *OpenOltFlowMgr) perOnuFlowHandlerRoutine(subscriberFlowChannel chan flowControlBlock) {
+func (f *OpenOltFlowMgr) perOnuFlowHandlerRoutine(handlerRoutineIndex int, subscriberFlowChannel chan flowControlBlock, stopHandler chan bool) {
 	for {
+		select {
 		// block on the channel to receive an incoming flow
 		// process the flow completely before proceeding to handle the next flow
-		flowCb := <-subscriberFlowChannel
-		if flowCb.addFlow {
-			logger.Info(flowCb.ctx, "adding-flow-start")
-			startTime := time.Now()
-			err := f.AddFlow(flowCb.ctx, flowCb.flow, flowCb.flowMetadata)
-			logger.Infow(flowCb.ctx, "adding-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
-			// Pass the return value over the return channel
-			*flowCb.errChan <- err
-		} else {
-			logger.Info(flowCb.ctx, "removing-flow-start")
-			startTime := time.Now()
-			err := f.RemoveFlow(flowCb.ctx, flowCb.flow)
-			logger.Infow(flowCb.ctx, "removing-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
-			// Pass the return value over the return channel
-			*flowCb.errChan <- err
+		case flowCb := <-subscriberFlowChannel:
+			if flowCb.addFlow {
+				logger.Info(flowCb.ctx, "adding-flow-start")
+				startTime := time.Now()
+				err := f.AddFlow(flowCb.ctx, flowCb.flow, flowCb.flowMetadata)
+				logger.Infow(flowCb.ctx, "adding-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
+				// Pass the return value over the return channel
+				*flowCb.errChan <- err
+			} else {
+				logger.Info(flowCb.ctx, "removing-flow-start")
+				startTime := time.Now()
+				err := f.RemoveFlow(flowCb.ctx, flowCb.flow)
+				logger.Infow(flowCb.ctx, "removing-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
+				// Pass the return value over the return channel
+				*flowCb.errChan <- err
+			}
+		case <-stopHandler:
+			f.flowHandlerRoutineActive[handlerRoutineIndex] = false
+			return
 		}
 	}
 }
 
+// StopAllFlowHandlerRoutines stops all flow handler routines. Call this when device is being rebooted or deleted
+func (f *OpenOltFlowMgr) StopAllFlowHandlerRoutines(ctx context.Context) {
+	for i, v := range f.stopFlowHandlerRoutine {
+		if f.flowHandlerRoutineActive[i] {
+			v <- true
+		}
+	}
+	logger.Debugw(ctx, "stopped all flow handler routines", log.Fields{"ponPortIdx": f.ponPortIdx})
+}
+
 // AddFlow add flow to device
 // nolint: gocyclo
 func (f *OpenOltFlowMgr) AddFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) error {