[VOL-2801] : Following OLT reboot, the readIndication receive stream would have
already stopped. Sending a stop signal again, would cause subsequent readIndication
invocation to immediately stop. We need to first check if the readIndication is
running before sending a stop signal
Change-Id: I1c8940d426bbc715660d59db0d618049d7b86d31
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 04af054..7304b9a 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -89,14 +89,15 @@
eventMgr *OpenOltEventMgr
resourceMgr *rsrcMgr.OpenOltResourceMgr
- discOnus sync.Map
- onus sync.Map
- portStats *OpenOltStatisticsMgr
- metrics *pmmetrics.PmMetrics
- stopCollector chan bool
- stopHeartbeatCheck chan bool
- activePorts sync.Map
- stopIndications chan bool
+ discOnus sync.Map
+ onus sync.Map
+ portStats *OpenOltStatisticsMgr
+ metrics *pmmetrics.PmMetrics
+ stopCollector chan bool
+ stopHeartbeatCheck chan bool
+ activePorts sync.Map
+ stopIndications chan bool
+ isReadIndicationRoutineActive bool
// pendingFlowRemoveDataPerSubscriber map is used to maintain the context on a per
// subscriber basis for the number of pending flow removes. This data is used
@@ -308,6 +309,11 @@
// readIndications to read the indications from the OLT device
func (dh *DeviceHandler) readIndications(ctx context.Context) error {
defer logger.Debugw("indications-ended", log.Fields{"device-id": dh.device.Id})
+ defer func() {
+ dh.lockDevice.Lock()
+ dh.isReadIndicationRoutineActive = false
+ dh.lockDevice.Unlock()
+ }()
indications, err := dh.startOpenOltIndicationStream(ctx)
if err != nil {
return err
@@ -334,6 +340,10 @@
indicationBackoff.MaxElapsedTime = 0
indicationBackoff.MaxInterval = 1 * time.Minute
+ dh.lockDevice.Lock()
+ dh.isReadIndicationRoutineActive = true
+ dh.lockDevice.Unlock()
+
Loop:
for {
select {
@@ -392,6 +402,7 @@
}
// Close the send stream
_ = indications.CloseSend() // Ok to ignore error, as we stopping the readIndication anyway
+
return nil
}
@@ -1804,7 +1815,16 @@
}
go dh.cleanupDeviceResources(ctx)
- dh.stopIndications <- true
+ dh.lockDevice.RLock()
+ // Stop the read indication only if it the routine is active
+ // The read indication would have already stopped due to failure on the gRPC stream following OLT going unreachable
+ // Sending message on the 'stopIndication' channel again will cause the readIndication routine to immediately stop
+ // on next execution of the readIndication routine.
+ if dh.isReadIndicationRoutineActive {
+ dh.stopIndications <- true
+ }
+ dh.lockDevice.RUnlock()
+
dh.transitionMap.Handle(ctx, DeviceInit)
}