[VOL-4699] Fix panic in the openonu-go adapter PM module during device deletion in memory-leak tests
Change-Id: I1baaa044ff311048d9ee3f6a5e368cdafe7220bc
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index fbf2ea7..6465a0c 100755
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -4548,6 +4548,12 @@
}
if dh.pOnuMetricsMgr != nil {
dh.pOnuMetricsMgr.PrepareForGarbageCollection(ctx, aDeviceID)
+ select { // block until the PM FSM reports it has dropped its references, bounded by MaxTimeForPmFsmShutDown
+ case <-dh.pOnuMetricsMgr.GarbageCollectionComplete:
+ logger.Debugw(ctx, "pm fsm shut down and garbage collection complete", log.Fields{"deviceID": aDeviceID})
+ case <-time.After(pmmgr.MaxTimeForPmFsmShutDown * time.Second): // NOTE(review): if the PM FSM never reaches the state that signals, delete waits the full timeout — confirm acceptable
+ logger.Errorw(ctx, "fsm did not shut down in time", log.Fields{"deviceID": aDeviceID})
+ }
}
if dh.pAlarmMgr != nil {
dh.pAlarmMgr.PrepareForGarbageCollection(ctx, aDeviceID)
diff --git a/internal/pkg/core/openonu.go b/internal/pkg/core/openonu.go
index 60d2f81..0069794 100755
--- a/internal/pkg/core/openonu.go
+++ b/internal/pkg/core/openonu.go
@@ -331,6 +331,11 @@
handler.deletionInProgress = true
handler.mutexDeletionInProgressFlag.Unlock()
+ // Setting the device deletion progress flag will cause the PM FSM to clean up for GC after the FSM moves to the NULL state
+ if handler.pOnuMetricsMgr != nil { // the metrics manager may not exist (device_handler nil-checks it too); avoid a nil dereference panic
+ handler.pOnuMetricsMgr.SetdeviceDeletionInProgress(true)
+ }
+
if err := handler.resetFsms(ctx, true); err != nil {
errorsList = append(errorsList, err)
}
diff --git a/internal/pkg/pmmgr/onu_metrics_manager.go b/internal/pkg/pmmgr/onu_metrics_manager.go
index 2271864..25befe2 100755
--- a/internal/pkg/pmmgr/onu_metrics_manager.go
+++ b/internal/pkg/pmmgr/onu_metrics_manager.go
@@ -67,6 +67,7 @@
GroupMetricEnabled = true // This is READONLY and cannot be changed from VOLTHA NBI
DefaultFrequencyOverrideEnabled = true // This is READONLY and cannot be changed from VOLTHA NBI
FrequencyGranularity = 5 // The frequency (in seconds) has to be multiple of 5. This setting cannot changed later.
+ MaxTimeForPmFsmShutDown = 120 // in seconds; upper bound the device handler waits for the PM FSM to release references on delete
)
// constants for ethernet frame extended pm collection
@@ -311,6 +312,9 @@
StopTicks chan bool
tickGenerationActive bool
+ deviceDeletionInProgress bool // guarded by OnuMetricsManagerLock via GetdeviceDeletionInProgress/SetdeviceDeletionInProgress
+ GarbageCollectionComplete chan bool // receives true once pDeviceHandler/pOnuDeviceEntry have been cleared during deletion
+
NextGlobalMetricCollectionTime time.Time // valid only if pmConfig.FreqOverride is set to false.
OnuMetricsManagerLock sync.RWMutex
@@ -360,6 +364,9 @@
metricsManager.ethernetFrameExtendedPmUpStreamMEByEntityID = make(map[uint16]*me.ManagedEntity)
metricsManager.ethernetFrameExtendedPmDownStreamMEByEntityID = make(map[uint16]*me.ManagedEntity)
+ // make this a buffered channel so that the sender is not blocked for any reason if there is no listener
+ metricsManager.GarbageCollectionComplete = make(chan bool, 1) // capacity 1: only the first unreceived signal is absorbed without blocking
+
if dh.GetPmConfigs() == nil { // dh.GetPmConfigs() is NOT nil if adapter comes back from a restart. We should NOT go back to defaults in this case
@@ -1280,6 +1287,10 @@
// ** L2 PM FSM Handlers start **
func (mm *OnuMetricsManager) l2PMFsmStarting(ctx context.Context, e *fsm.Event) {
+ if mm.GetdeviceDeletionInProgress() { // nothing to start for a device being deleted
+ logger.Infow(ctx, "device already deleted, return", log.Fields{"curr-state": mm.PAdaptFsm.PFsm.Current(), "deviceID": mm.deviceID})
+ return
+ }
// Loop through all the group metrics
// If it is a L2 PM Interval metric and it is enabled, then if it is not in the
@@ -1328,8 +1339,17 @@
}
func (mm *OnuMetricsManager) l2PMFsmSyncTime(ctx context.Context, e *fsm.Event) {
+ if mm.GetdeviceDeletionInProgress() {
+ logger.Infow(ctx, "device already deleted, return", log.Fields{"curr-state": mm.PAdaptFsm.PFsm.Current(), "deviceID": mm.deviceID})
+ return
+ }
// Sync time with the ONU to establish 15min boundary for PM collection.
if err := mm.syncTime(ctx); err != nil {
+ // device could be deleted while waiting on sync time response
+ if mm.GetdeviceDeletionInProgress() { // skip the retry goroutine below once deletion has started
+ logger.Infow(ctx, "device already deleted, return", log.Fields{"curr-state": mm.PAdaptFsm.PFsm.Current(), "deviceID": mm.deviceID})
+ return
+ }
go func() {
time.Sleep(SyncTimeRetryInterval * time.Second) // retry to sync time after this timeout
// This will result in FSM attempting to sync time again
@@ -1361,9 +1381,18 @@
_ = mm.clearPmGroupData(ctx) // ignore error
}
+ if mm.GetdeviceDeletionInProgress() {
+ mm.pDeviceHandler = nil // release references only now, after the FSM has stopped using them
+ mm.pOnuDeviceEntry = nil
+ mm.GarbageCollectionComplete <- true // channel is buffered (cap 1), so the first signal does not block without a listener
+ }
}
func (mm *OnuMetricsManager) l2PMFsmIdle(ctx context.Context, e *fsm.Event) {
logger.Debugw(ctx, "Enter state idle", log.Fields{"device-id": mm.deviceID})
+ if mm.GetdeviceDeletionInProgress() {
+ logger.Infow(ctx, "device already deleted, return", log.Fields{"curr-state": mm.PAdaptFsm.PFsm.Current(), "deviceID": mm.deviceID})
+ return
+ }
mm.OnuMetricsManagerLock.RLock()
numOfPmToDelete := len(mm.l2PmToDelete)
@@ -1389,6 +1418,10 @@
func (mm *OnuMetricsManager) l2PmFsmCollectData(ctx context.Context, e *fsm.Event) {
logger.Debugw(ctx, "state collect data", log.Fields{"device-id": mm.deviceID})
+ if mm.GetdeviceDeletionInProgress() { // do not start OMCI collection for a device being deleted
+ logger.Infow(ctx, "device already deleted, return", log.Fields{"curr-state": mm.PAdaptFsm.PFsm.Current(), "deviceID": mm.deviceID})
+ return
+ }
// Copy the activeL2Pms for which we want to collect the metrics since activeL2Pms can change dynamically
mm.OnuMetricsManagerLock.RLock()
copyOfActiveL2Pms := make([]string, len(mm.activeL2Pms))
@@ -1453,6 +1486,10 @@
// nolint: gocyclo
func (mm *OnuMetricsManager) l2PmFsmCreatePM(ctx context.Context, e *fsm.Event) error {
+ if mm.GetdeviceDeletionInProgress() {
+ logger.Infow(ctx, "device already deleted, return", log.Fields{"curr-state": mm.PAdaptFsm.PFsm.Current(), "deviceID": mm.deviceID})
+ return nil
+ }
// Copy the l2PmToAdd for which we want to collect the metrics since l2PmToAdd can change dynamically
mm.OnuMetricsManagerLock.RLock()
copyOfL2PmToAdd := make([]string, len(mm.l2PmToAdd))
@@ -1494,6 +1531,10 @@
_ = mm.updatePmData(ctx, n, entityID, cPmAdded) // TODO: ignore error for now
break inner1
}
+ if mm.GetdeviceDeletionInProgress() { // stop retrying creation once deletion has started
+ logger.Debugw(ctx, "device deleted, no more pm processing", log.Fields{"deviceID": mm.deviceID})
+ return nil
+ }
}
if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
_ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
@@ -1523,6 +1564,10 @@
_ = mm.updatePmData(ctx, n, entityID, cPmAdded) // TODO: ignore error for now
break inner2
}
+ if mm.GetdeviceDeletionInProgress() {
+ logger.Debugw(ctx, "device deleted, no more pm processing", log.Fields{"deviceID": mm.deviceID})
+ return nil
+ }
}
if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
_ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
@@ -1550,6 +1595,10 @@
_ = mm.updatePmData(ctx, n, anigInstID, cPmAdded) // TODO: ignore error for now
break inner3
}
+ if mm.GetdeviceDeletionInProgress() {
+ logger.Debugw(ctx, "device deleted, no more pm processing", log.Fields{"deviceID": mm.deviceID})
+ return nil
+ }
}
if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
_ = mm.updatePmData(ctx, n, anigInstID, cPmRemoved) // TODO: ignore error for now
@@ -1591,6 +1640,10 @@
_ = mm.updatePmData(ctx, n, v, cPmAdded) // TODO: ignore error for now
break inner4
}
+ if mm.GetdeviceDeletionInProgress() {
+ logger.Debugw(ctx, "device deleted, no more pm processing", log.Fields{"deviceID": mm.deviceID})
+ return nil
+ }
}
if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
_ = mm.updatePmData(ctx, n, v, cPmRemoved) // TODO: ignore error for now
@@ -1641,6 +1694,10 @@
// nolint: gocyclo
func (mm *OnuMetricsManager) l2PmFsmDeletePM(ctx context.Context, e *fsm.Event) error {
+ if mm.GetdeviceDeletionInProgress() {
+ logger.Infow(ctx, "device already deleted, return", log.Fields{"curr-state": mm.PAdaptFsm.PFsm.Current(), "deviceID": mm.deviceID})
+ return nil
+ }
// Copy the l2PmToDelete for which we want to collect the metrics since l2PmToDelete can change dynamically
mm.OnuMetricsManagerLock.RLock()
copyOfL2PmToDelete := make([]string, len(mm.l2PmToDelete))
@@ -1693,6 +1750,10 @@
}
_ = mm.updatePmData(ctx, n, entityID, cPmRemove) // TODO: ignore error for now
if resp = mm.waitForResponseOrTimeout(ctx, false, entityID, "EthernetFramePerformanceMonitoringHistoryData"); !resp {
+ if mm.GetdeviceDeletionInProgress() { // a failed wait during deletion is expected; stop instead of recording a failure
+ logger.Debugw(ctx, "device deleted, no more pm processing", log.Fields{"deviceID": mm.deviceID})
+ return nil
+ }
atLeastOneDeleteFailure = true
} else {
_ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
@@ -1725,6 +1786,10 @@
mm.deviceID, err))
}
if resp = mm.waitForResponseOrTimeout(ctx, false, entityID, "EthernetPerformanceMonitoringHistoryData"); !resp {
+ if mm.GetdeviceDeletionInProgress() {
+ logger.Debugw(ctx, "device deleted, no more pm processing", log.Fields{"deviceID": mm.deviceID})
+ return nil
+ }
atLeastOneDeleteFailure = true
} else {
_ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
@@ -1750,6 +1815,10 @@
mm.deviceID, err))
}
if resp := mm.waitForResponseOrTimeout(ctx, false, entityID, "FecPerformanceMonitoringHistoryData"); !resp {
+ if mm.GetdeviceDeletionInProgress() {
+ logger.Debugw(ctx, "device deleted, no more pm processing", log.Fields{"deviceID": mm.deviceID})
+ return nil
+ }
atLeastOneDeleteFailure = true
} else {
_ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
@@ -1775,6 +1844,10 @@
mm.deviceID, err))
}
if resp = mm.waitForResponseOrTimeout(ctx, false, entityID, "GemPortNetworkCtpPerformanceMonitoringHistoryData"); !resp {
+ if mm.GetdeviceDeletionInProgress() {
+ logger.Debugw(ctx, "device deleted, no more pm processing", log.Fields{"deviceID": mm.deviceID})
+ return nil
+ }
atLeastOneDeleteFailure = true
} else {
_ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
@@ -1828,6 +1901,16 @@
// syncTime synchronizes time with the ONU to establish a 15 min boundary for PM collection and reporting.
func (mm *OnuMetricsManager) syncTime(ctx context.Context) error {
+ if mm.GetdeviceDeletionInProgress() {
+ logger.Infow(ctx, "device already deleted, return", log.Fields{"curr-state": mm.PAdaptFsm.PFsm.Current(), "deviceID": mm.deviceID})
+ return nil // NOTE(review): reports success although no sync-time request was sent — confirm callers expect this
+ }
+
+ if !mm.GetOmciProcessingStatus() { // sending now would wait on a response that can never be processed
+ logger.Errorw(ctx, "not sending sync time, because the omci resp processor is shutdown due to device down/delete", log.Fields{"device-id": mm.deviceID})
+ return fmt.Errorf("omci-resp-processor-not-running")
+ }
+
if err := mm.pOnuDeviceEntry.GetDevOmciCC().SendSyncTime(ctx, mm.pDeviceHandler.GetOmciTimeout(), true, mm.PAdaptFsm.CommChan); err != nil {
logger.Errorw(ctx, "cannot send sync time request", log.Fields{"device-id": mm.deviceID})
return err
@@ -2527,6 +2610,10 @@
func (mm *OnuMetricsManager) waitForResponseOrTimeout(ctx context.Context, create bool, instID uint16, meClassName string) bool {
logger.Debugw(ctx, "waitForResponseOrTimeout", log.Fields{"create": create, "instID": instID, "meClassName": meClassName})
+ if !mm.GetOmciProcessingStatus() { // fail fast instead of waiting on a response that will never arrive
+ logger.Errorw(ctx, "not waiting for omci resp, because the omci resp processor is shutdown due to device down/delete", log.Fields{"device-id": mm.deviceID})
+ return false
+ }
select {
case resp := <-mm.l2PmCreateOrDeleteResponseChan:
logger.Debugw(ctx, "received l2 pm me response",
@@ -3033,6 +3120,10 @@
func (mm *OnuMetricsManager) waitForEthernetFrameCreateOrDeleteResponseOrTimeout(ctx context.Context, create bool, instID uint16, meClassID me.ClassID, upstream bool) (bool, error) {
logger.Debugw(ctx, "wait-for-ethernet-frame-create-or-delete-response-or-timeout", log.Fields{"create": create, "instID": instID, "meClassID": meClassID})
+ if !mm.GetOmciProcessingStatus() || mm.GetdeviceDeletionInProgress() {
+ logger.Errorw(ctx, "not waiting for omci resp, because the omci resp processor is shutdown due to device down/delete", log.Fields{"device-id": mm.deviceID})
+ return false, fmt.Errorf("omci-processor-shutdown")
+ }
select {
case resp := <-mm.extendedPMMeResponseChan:
logger.Debugw(ctx, "received-extended-pm-me-response",
@@ -3193,6 +3284,10 @@
func (mm *OnuMetricsManager) waitForResetResponseOrTimeout(ctx context.Context, instID uint16, meClassName string) bool {
logger.Debugw(ctx, "wait-for-ethernet-frame-reset-counters-response-or-timeout", log.Fields{"instID": instID, "meClassName": meClassName})
+ if !mm.GetOmciProcessingStatus() {
+ logger.Errorw(ctx, "not waiting for omci resp, because the omci resp processor is shutdown due to device down/delete", log.Fields{"device-id": mm.deviceID})
+ return false
+ }
select {
case resp := <-mm.extendedPMMeResponseChan:
logger.Debugw(ctx, "received-extended-pm-me-reset-response",
@@ -3866,9 +3961,21 @@
return controlBlock
}
+// GetdeviceDeletionInProgress reports whether device deletion has started (read under OnuMetricsManagerLock)
+func (mm *OnuMetricsManager) GetdeviceDeletionInProgress() bool {
+ mm.OnuMetricsManagerLock.RLock()
+ defer mm.OnuMetricsManagerLock.RUnlock()
+ return mm.deviceDeletionInProgress
+}
+
+// SetdeviceDeletionInProgress sets the deviceDeletionInProgress flag to the given value (written under OnuMetricsManagerLock)
+func (mm *OnuMetricsManager) SetdeviceDeletionInProgress(deleted bool) {
+ mm.OnuMetricsManagerLock.Lock()
+ defer mm.OnuMetricsManagerLock.Unlock()
+ mm.deviceDeletionInProgress = deleted
+}
+
// PrepareForGarbageCollection - remove references to prepare for garbage collection
func (mm *OnuMetricsManager) PrepareForGarbageCollection(ctx context.Context, aDeviceID string) {
- logger.Debugw(ctx, "prepare for garbage collection", log.Fields{"device-id": aDeviceID})
- mm.pDeviceHandler = nil
- mm.pOnuDeviceEntry = nil
+ logger.Debugw(ctx, "prepare for garbage collection - no action, garbage collection done when PM FSM is stopped", log.Fields{"device-id": aDeviceID}) // references are cleared by the L2 PM FSM when deletion is in progress
}