[VOL-2801] : readIndication routine going in an infinite loop on delete device.
This is part of many fixes to come while debugging intermittent issues seen
where auth/dhcp/ping fail sometimes in various scenarios.

Change-Id: I7da8bd35d3f6e1694724c6b0ae26f68d95972a99
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 86eea67..04af054 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -308,12 +308,9 @@
 // readIndications to read the indications from the OLT device
 func (dh *DeviceHandler) readIndications(ctx context.Context) error {
 	defer logger.Debugw("indications-ended", log.Fields{"device-id": dh.device.Id})
-	indications, err := dh.Client.EnableIndication(ctx, new(oop.Empty))
+	indications, err := dh.startOpenOltIndicationStream(ctx)
 	if err != nil {
-		return olterrors.NewErrCommunication("fail-to-read-indications", log.Fields{"device-id": dh.device.Id}, err)
-	}
-	if indications == nil {
-		return olterrors.NewErrInvalidValue(log.Fields{"indications": nil, "device-id": dh.device.Id}, nil)
+		return err
 	}
 	/* get device state */
 	device, err := dh.coreProxy.GetDevice(ctx, dh.device.Id, dh.device.Id)
@@ -336,11 +333,13 @@
 	indicationBackoff := backoff.NewExponentialBackOff()
 	indicationBackoff.MaxElapsedTime = 0
 	indicationBackoff.MaxInterval = 1 * time.Minute
+
+Loop:
 	for {
 		select {
 		case <-dh.stopIndications:
 			logger.Debugw("Stopping-collecting-indications-for-OLT", log.Fields{"deviceID:": dh.deviceID})
-			break
+			break Loop
 		default:
 			indication, err := indications.Recv()
 			if err == io.EOF {
@@ -355,20 +354,27 @@
 					indicationBackoff.Reset()
 				}
 				time.Sleep(indicationBackoff.NextBackOff())
-				indications, err = dh.Client.EnableIndication(ctx, new(oop.Empty))
-				if err != nil {
-					return olterrors.NewErrCommunication("indication-read-failure", log.Fields{"device-id": dh.device.Id}, err).Log()
-				}
-				if indications == nil {
-					return olterrors.NewErrInvalidValue(log.Fields{"indications": nil, "device-id": dh.device.Id}, nil).Log()
+				if indications, err = dh.startOpenOltIndicationStream(ctx); err != nil {
+					return err
 				}
 				continue
 			}
 			if err != nil {
+				logger.Errorw("Read indication error", log.Fields{"err": err})
 				if dh.adminState == "deleted" {
-					logger.Debug("Device deleted stoping the read indication thread")
-					break
+					logger.Debug("Device deleted stopping the read indication thread")
+					break Loop
 				}
+				// Close the stream, and re-initialize it
+				if err = indications.CloseSend(); err != nil {
+					// Ok to ignore here, because we landed here due to a problem on the stream
+					// In all probability, the closeSend call may fail
+					logger.Debugw("error closing send stream, error ignored", log.Fields{"err": err})
+				}
+				if indications, err = dh.startOpenOltIndicationStream(ctx); err != nil {
+					return err
+				}
+				// once we re-initialized the indication stream, continue to read indications
 				continue
 			}
 			// Reset backoff if we have a successful receive
@@ -384,6 +390,22 @@
 			dh.handleIndication(ctx, indication)
 		}
 	}
+	// Close the send stream
+	_ = indications.CloseSend() // Ok to ignore error, as we stopping the readIndication anyway
+	return nil
+}
+
+func (dh *DeviceHandler) startOpenOltIndicationStream(ctx context.Context) (oop.Openolt_EnableIndicationClient, error) {
+
+	indications, err := dh.Client.EnableIndication(ctx, new(oop.Empty))
+	if err != nil {
+		return nil, olterrors.NewErrCommunication("indication-read-failure", log.Fields{"device-id": dh.device.Id}, err).Log()
+	}
+	if indications == nil {
+		return nil, olterrors.NewErrInvalidValue(log.Fields{"indications": nil, "device-id": dh.device.Id}, nil).Log()
+	}
+
+	return indications, nil
 }
 
 // isIndicationAllowedDuringOltAdminDown returns true if the indication is allowed during OLT Admin down, else false