[VOL-2801] : Following OLT reboot, the readIndication receive stream would have
already stopped. Sending a stop signal again, would cause subsequent readIndication
invocation to immediately stop. We need to first check if the readIndication is
running before sending a stop signal

Change-Id: I1c8940d426bbc715660d59db0d618049d7b86d31
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 04af054..7304b9a 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -89,14 +89,15 @@
 	eventMgr      *OpenOltEventMgr
 	resourceMgr   *rsrcMgr.OpenOltResourceMgr
 
-	discOnus           sync.Map
-	onus               sync.Map
-	portStats          *OpenOltStatisticsMgr
-	metrics            *pmmetrics.PmMetrics
-	stopCollector      chan bool
-	stopHeartbeatCheck chan bool
-	activePorts        sync.Map
-	stopIndications    chan bool
+	discOnus                      sync.Map
+	onus                          sync.Map
+	portStats                     *OpenOltStatisticsMgr
+	metrics                       *pmmetrics.PmMetrics
+	stopCollector                 chan bool
+	stopHeartbeatCheck            chan bool
+	activePorts                   sync.Map
+	stopIndications               chan bool
+	isReadIndicationRoutineActive bool
 
 	// pendingFlowRemoveDataPerSubscriber map is used to maintain the context on a per
 	// subscriber basis for the number of pending flow removes. This data is used
@@ -308,6 +309,11 @@
 // readIndications to read the indications from the OLT device
 func (dh *DeviceHandler) readIndications(ctx context.Context) error {
 	defer logger.Debugw("indications-ended", log.Fields{"device-id": dh.device.Id})
+	defer func() {
+		dh.lockDevice.Lock()
+		dh.isReadIndicationRoutineActive = false
+		dh.lockDevice.Unlock()
+	}()
 	indications, err := dh.startOpenOltIndicationStream(ctx)
 	if err != nil {
 		return err
@@ -334,6 +340,10 @@
 	indicationBackoff.MaxElapsedTime = 0
 	indicationBackoff.MaxInterval = 1 * time.Minute
 
+	dh.lockDevice.Lock()
+	dh.isReadIndicationRoutineActive = true
+	dh.lockDevice.Unlock()
+
 Loop:
 	for {
 		select {
@@ -392,6 +402,7 @@
 	}
 	// Close the send stream
 	_ = indications.CloseSend() // Ok to ignore error, as we stopping the readIndication anyway
+
 	return nil
 }
 
@@ -1804,7 +1815,16 @@
 		}
 		go dh.cleanupDeviceResources(ctx)
 
-		dh.stopIndications <- true
+		dh.lockDevice.RLock()
+		// Stop the read indication only if it the routine is active
+		// The read indication would have already stopped due to failure on the gRPC stream following OLT going unreachable
+		// Sending message on the 'stopIndication' channel again will cause the readIndication routine to immediately stop
+		// on next execution of the readIndication routine.
+		if dh.isReadIndicationRoutineActive {
+			dh.stopIndications <- true
+		}
+		dh.lockDevice.RUnlock()
+
 		dh.transitionMap.Handle(ctx, DeviceInit)
 
 	}