[VOL-2801] : readIndication routine going in an infinite loop on delete device.
This is part of many fixes to come while debugging intermittent issues seen
where auth/dhcp/ping fail sometimes in various scenarios.
Change-Id: I7da8bd35d3f6e1694724c6b0ae26f68d95972a99
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 86eea67..04af054 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -308,12 +308,9 @@
// readIndications to read the indications from the OLT device
func (dh *DeviceHandler) readIndications(ctx context.Context) error {
defer logger.Debugw("indications-ended", log.Fields{"device-id": dh.device.Id})
- indications, err := dh.Client.EnableIndication(ctx, new(oop.Empty))
+ indications, err := dh.startOpenOltIndicationStream(ctx)
if err != nil {
- return olterrors.NewErrCommunication("fail-to-read-indications", log.Fields{"device-id": dh.device.Id}, err)
- }
- if indications == nil {
- return olterrors.NewErrInvalidValue(log.Fields{"indications": nil, "device-id": dh.device.Id}, nil)
+ return err
}
/* get device state */
device, err := dh.coreProxy.GetDevice(ctx, dh.device.Id, dh.device.Id)
@@ -336,11 +333,13 @@
indicationBackoff := backoff.NewExponentialBackOff()
indicationBackoff.MaxElapsedTime = 0
indicationBackoff.MaxInterval = 1 * time.Minute
+
+Loop:
for {
select {
case <-dh.stopIndications:
logger.Debugw("Stopping-collecting-indications-for-OLT", log.Fields{"deviceID:": dh.deviceID})
- break
+ break Loop
default:
indication, err := indications.Recv()
if err == io.EOF {
@@ -355,20 +354,27 @@
indicationBackoff.Reset()
}
time.Sleep(indicationBackoff.NextBackOff())
- indications, err = dh.Client.EnableIndication(ctx, new(oop.Empty))
- if err != nil {
- return olterrors.NewErrCommunication("indication-read-failure", log.Fields{"device-id": dh.device.Id}, err).Log()
- }
- if indications == nil {
- return olterrors.NewErrInvalidValue(log.Fields{"indications": nil, "device-id": dh.device.Id}, nil).Log()
+ if indications, err = dh.startOpenOltIndicationStream(ctx); err != nil {
+ return err
}
continue
}
if err != nil {
+ logger.Errorw("Read indication error", log.Fields{"err": err})
if dh.adminState == "deleted" {
- logger.Debug("Device deleted stoping the read indication thread")
- break
+ logger.Debug("Device deleted stopping the read indication thread")
+ break Loop
}
+ // Close the stream, and re-initialize it
+ if err = indications.CloseSend(); err != nil {
+ // Ok to ignore here, because we landed here due to a problem on the stream
+ // In all probability, the closeSend call may fail
+ logger.Debugw("error closing send stream, error ignored", log.Fields{"err": err})
+ }
+ if indications, err = dh.startOpenOltIndicationStream(ctx); err != nil {
+ return err
+ }
+ // once we re-initialized the indication stream, continue to read indications
continue
}
// Reset backoff if we have a successful receive
@@ -384,6 +390,22 @@
dh.handleIndication(ctx, indication)
}
}
+ // Close the send stream
+ _ = indications.CloseSend() // Ok to ignore error, as we stopping the readIndication anyway
+ return nil
+}
+
+func (dh *DeviceHandler) startOpenOltIndicationStream(ctx context.Context) (oop.Openolt_EnableIndicationClient, error) {
+
+ indications, err := dh.Client.EnableIndication(ctx, new(oop.Empty))
+ if err != nil {
+ return nil, olterrors.NewErrCommunication("indication-read-failure", log.Fields{"device-id": dh.device.Id}, err).Log()
+ }
+ if indications == nil {
+ return nil, olterrors.NewErrInvalidValue(log.Fields{"indications": nil, "device-id": dh.device.Id}, nil).Log()
+ }
+
+ return indications, nil
}
// isIndicationAllowedDuringOltAdminDown returns true if the indication is allowed during OLT Admin down, else false