[VOL-4699] Fix panic in PM module in openonu-go adapter during Device Delete in Memory leak tests

Change-Id: I1baaa044ff311048d9ee3f6a5e368cdafe7220bc
diff --git a/internal/pkg/pmmgr/onu_metrics_manager.go b/internal/pkg/pmmgr/onu_metrics_manager.go
index 2271864..25befe2 100755
--- a/internal/pkg/pmmgr/onu_metrics_manager.go
+++ b/internal/pkg/pmmgr/onu_metrics_manager.go
@@ -67,6 +67,7 @@
 	GroupMetricEnabled               = true    // This is READONLY and cannot be changed from VOLTHA NBI
 	DefaultFrequencyOverrideEnabled  = true    // This is READONLY and cannot be changed from VOLTHA NBI
 	FrequencyGranularity             = 5       // The frequency (in seconds) has to be multiple of 5. This setting cannot changed later.
+	MaxTimeForPmFsmShutDown          = 120     // in seconds
 )
 
 // constants for ethernet frame extended pm collection
@@ -311,6 +312,9 @@
 	StopTicks            chan bool
 	tickGenerationActive bool
 
+	deviceDeletionInProgress  bool
+	GarbageCollectionComplete chan bool
+
 	NextGlobalMetricCollectionTime time.Time // valid only if pmConfig.FreqOverride is set to false.
 
 	OnuMetricsManagerLock sync.RWMutex
@@ -360,6 +364,9 @@
 	metricsManager.ethernetFrameExtendedPmUpStreamMEByEntityID = make(map[uint16]*me.ManagedEntity)
 	metricsManager.ethernetFrameExtendedPmDownStreamMEByEntityID = make(map[uint16]*me.ManagedEntity)
 
+	// make this a buffered channel so that the sender is not blocked for any reason if there is no listener
+	metricsManager.GarbageCollectionComplete = make(chan bool, 1)
+
 	if dh.GetPmConfigs() == nil { // dh.GetPmConfigs() is NOT nil if adapter comes back from a restart. We should NOT go back to defaults in this case
 		metricsManager.initializeAllGroupMetrics()
 	}
@@ -1280,6 +1287,10 @@
 // ** L2 PM FSM Handlers start **
 
 func (mm *OnuMetricsManager) l2PMFsmStarting(ctx context.Context, e *fsm.Event) {
+	if mm.GetdeviceDeletionInProgress() {
+		logger.Infow(ctx, "device already deleted, return", log.Fields{"curr-state": mm.PAdaptFsm.PFsm.Current, "deviceID": mm.deviceID})
+		return
+	}
 
 	// Loop through all the group metrics
 	// If it is a L2 PM Interval metric and it is enabled, then if it is not in the
@@ -1328,8 +1339,17 @@
 }
 
 func (mm *OnuMetricsManager) l2PMFsmSyncTime(ctx context.Context, e *fsm.Event) {
+	if mm.GetdeviceDeletionInProgress() {
+		logger.Infow(ctx, "device already deleted, return", log.Fields{"curr-state": mm.PAdaptFsm.PFsm.Current, "deviceID": mm.deviceID})
+		return
+	}
 	// Sync time with the ONU to establish 15min boundary for PM collection.
 	if err := mm.syncTime(ctx); err != nil {
+		// device could be deleted while waiting on sync time response
+		if mm.GetdeviceDeletionInProgress() {
+			logger.Infow(ctx, "device already deleted, return", log.Fields{"curr-state": mm.PAdaptFsm.PFsm.Current, "deviceID": mm.deviceID})
+			return
+		}
 		go func() {
 			time.Sleep(SyncTimeRetryInterval * time.Second) // retry to sync time after this timeout
 			// This will result in FSM attempting to sync time again
@@ -1361,9 +1381,18 @@
 		_ = mm.clearPmGroupData(ctx) // ignore error
 	}
 
+	if mm.GetdeviceDeletionInProgress() {
+		mm.pDeviceHandler = nil
+		mm.pOnuDeviceEntry = nil
+		mm.GarbageCollectionComplete <- true
+	}
 }
 func (mm *OnuMetricsManager) l2PMFsmIdle(ctx context.Context, e *fsm.Event) {
 	logger.Debugw(ctx, "Enter state idle", log.Fields{"device-id": mm.deviceID})
+	if mm.GetdeviceDeletionInProgress() {
+		logger.Infow(ctx, "device already deleted, return", log.Fields{"curr-state": mm.PAdaptFsm.PFsm.Current, "deviceID": mm.deviceID})
+		return
+	}
 
 	mm.OnuMetricsManagerLock.RLock()
 	numOfPmToDelete := len(mm.l2PmToDelete)
@@ -1389,6 +1418,10 @@
 
 func (mm *OnuMetricsManager) l2PmFsmCollectData(ctx context.Context, e *fsm.Event) {
 	logger.Debugw(ctx, "state collect data", log.Fields{"device-id": mm.deviceID})
+	if mm.GetdeviceDeletionInProgress() {
+		logger.Infow(ctx, "device already deleted, return", log.Fields{"curr-state": mm.PAdaptFsm.PFsm.Current, "deviceID": mm.deviceID})
+		return
+	}
 	// Copy the activeL2Pms for which we want to collect the metrics since activeL2Pms can change dynamically
 	mm.OnuMetricsManagerLock.RLock()
 	copyOfActiveL2Pms := make([]string, len(mm.activeL2Pms))
@@ -1453,6 +1486,10 @@
 
 // nolint: gocyclo
 func (mm *OnuMetricsManager) l2PmFsmCreatePM(ctx context.Context, e *fsm.Event) error {
+	if mm.GetdeviceDeletionInProgress() {
+		logger.Infow(ctx, "device already deleted, return", log.Fields{"curr-state": mm.PAdaptFsm.PFsm.Current, "deviceID": mm.deviceID})
+		return nil
+	}
 	// Copy the l2PmToAdd for which we want to collect the metrics since l2PmToAdd can change dynamically
 	mm.OnuMetricsManagerLock.RLock()
 	copyOfL2PmToAdd := make([]string, len(mm.l2PmToAdd))
@@ -1494,6 +1531,10 @@
 							_ = mm.updatePmData(ctx, n, entityID, cPmAdded) // TODO: ignore error for now
 							break inner1
 						}
+						if mm.GetdeviceDeletionInProgress() {
+							logger.Debugw(ctx, "device deleted, no more pm processing", log.Fields{"deviceID": mm.deviceID})
+							return nil
+						}
 					}
 					if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
 						_ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
@@ -1523,6 +1564,10 @@
 							_ = mm.updatePmData(ctx, n, entityID, cPmAdded) // TODO: ignore error for now
 							break inner2
 						}
+						if mm.GetdeviceDeletionInProgress() {
+							logger.Debugw(ctx, "device deleted, no more pm processing", log.Fields{"deviceID": mm.deviceID})
+							return nil
+						}
 					}
 					if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
 						_ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
@@ -1550,6 +1595,10 @@
 						_ = mm.updatePmData(ctx, n, anigInstID, cPmAdded) // TODO: ignore error for now
 						break inner3
 					}
+					if mm.GetdeviceDeletionInProgress() {
+						logger.Debugw(ctx, "device deleted, no more pm processing", log.Fields{"deviceID": mm.deviceID})
+						return nil
+					}
 				}
 				if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
 					_ = mm.updatePmData(ctx, n, anigInstID, cPmRemoved) // TODO: ignore error for now
@@ -1591,6 +1640,10 @@
 						_ = mm.updatePmData(ctx, n, v, cPmAdded) // TODO: ignore error for now
 						break inner4
 					}
+					if mm.GetdeviceDeletionInProgress() {
+						logger.Debugw(ctx, "device deleted, no more pm processing", log.Fields{"deviceID": mm.deviceID})
+						return nil
+					}
 				}
 				if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
 					_ = mm.updatePmData(ctx, n, v, cPmRemoved) // TODO: ignore error for now
@@ -1641,6 +1694,10 @@
 
 // nolint: gocyclo
 func (mm *OnuMetricsManager) l2PmFsmDeletePM(ctx context.Context, e *fsm.Event) error {
+	if mm.GetdeviceDeletionInProgress() {
+		logger.Infow(ctx, "device already deleted, return", log.Fields{"curr-state": mm.PAdaptFsm.PFsm.Current, "deviceID": mm.deviceID})
+		return nil
+	}
 	// Copy the l2PmToDelete for which we want to collect the metrics since l2PmToDelete can change dynamically
 	mm.OnuMetricsManagerLock.RLock()
 	copyOfL2PmToDelete := make([]string, len(mm.l2PmToDelete))
@@ -1693,6 +1750,10 @@
 						}
 						_ = mm.updatePmData(ctx, n, entityID, cPmRemove) // TODO: ignore error for now
 						if resp = mm.waitForResponseOrTimeout(ctx, false, entityID, "EthernetFramePerformanceMonitoringHistoryData"); !resp {
+							if mm.GetdeviceDeletionInProgress() {
+								logger.Debugw(ctx, "device deleted, no more pm processing", log.Fields{"deviceID": mm.deviceID})
+								return nil
+							}
 							atLeastOneDeleteFailure = true
 						} else {
 							_ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
@@ -1725,6 +1786,10 @@
 							mm.deviceID, err))
 					}
 					if resp = mm.waitForResponseOrTimeout(ctx, false, entityID, "EthernetPerformanceMonitoringHistoryData"); !resp {
+						if mm.GetdeviceDeletionInProgress() {
+							logger.Debugw(ctx, "device deleted, no more pm processing", log.Fields{"deviceID": mm.deviceID})
+							return nil
+						}
 						atLeastOneDeleteFailure = true
 					} else {
 						_ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
@@ -1750,6 +1815,10 @@
 							mm.deviceID, err))
 					}
 					if resp := mm.waitForResponseOrTimeout(ctx, false, entityID, "FecPerformanceMonitoringHistoryData"); !resp {
+						if mm.GetdeviceDeletionInProgress() {
+							logger.Debugw(ctx, "device deleted, no more pm processing", log.Fields{"deviceID": mm.deviceID})
+							return nil
+						}
 						atLeastOneDeleteFailure = true
 					} else {
 						_ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
@@ -1775,6 +1844,10 @@
 							mm.deviceID, err))
 					}
 					if resp = mm.waitForResponseOrTimeout(ctx, false, entityID, "GemPortNetworkCtpPerformanceMonitoringHistoryData"); !resp {
+						if mm.GetdeviceDeletionInProgress() {
+							logger.Debugw(ctx, "device deleted, no more pm processing", log.Fields{"deviceID": mm.deviceID})
+							return nil
+						}
 						atLeastOneDeleteFailure = true
 					} else {
 						_ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
@@ -1828,6 +1901,16 @@
 
 // syncTime synchronizes time with the ONU to establish a 15 min boundary for PM collection and reporting.
 func (mm *OnuMetricsManager) syncTime(ctx context.Context) error {
+	if mm.GetdeviceDeletionInProgress() {
+		logger.Infow(ctx, "device already deleted, return", log.Fields{"curr-state": mm.PAdaptFsm.PFsm.Current, "deviceID": mm.deviceID})
+		return nil
+	}
+
+	if !mm.GetOmciProcessingStatus() {
+		logger.Errorw(ctx, "not sending sync time, because the omci resp processor is shutdown due to device down/delete", log.Fields{"device-id": mm.deviceID})
+		return fmt.Errorf("omci-resp-processor-not-running")
+	}
+
 	if err := mm.pOnuDeviceEntry.GetDevOmciCC().SendSyncTime(ctx, mm.pDeviceHandler.GetOmciTimeout(), true, mm.PAdaptFsm.CommChan); err != nil {
 		logger.Errorw(ctx, "cannot send sync time request", log.Fields{"device-id": mm.deviceID})
 		return err
@@ -2527,6 +2610,10 @@
 
 func (mm *OnuMetricsManager) waitForResponseOrTimeout(ctx context.Context, create bool, instID uint16, meClassName string) bool {
 	logger.Debugw(ctx, "waitForResponseOrTimeout", log.Fields{"create": create, "instID": instID, "meClassName": meClassName})
+	if !mm.GetOmciProcessingStatus() {
+		logger.Errorw(ctx, "not waiting for omci resp, because the omci resp processor is shutdown due to device down/delete", log.Fields{"device-id": mm.deviceID})
+		return false
+	}
 	select {
 	case resp := <-mm.l2PmCreateOrDeleteResponseChan:
 		logger.Debugw(ctx, "received l2 pm me response",
@@ -3033,6 +3120,10 @@
 
 func (mm *OnuMetricsManager) waitForEthernetFrameCreateOrDeleteResponseOrTimeout(ctx context.Context, create bool, instID uint16, meClassID me.ClassID, upstream bool) (bool, error) {
 	logger.Debugw(ctx, "wait-for-ethernet-frame-create-or-delete-response-or-timeout", log.Fields{"create": create, "instID": instID, "meClassID": meClassID})
+	if !mm.GetOmciProcessingStatus() || mm.GetdeviceDeletionInProgress() {
+		logger.Errorw(ctx, "not waiting for omci resp, because the omci resp processor is shutdown due to device down/delete", log.Fields{"device-id": mm.deviceID})
+		return false, fmt.Errorf("omci-processor-shutdown")
+	}
 	select {
 	case resp := <-mm.extendedPMMeResponseChan:
 		logger.Debugw(ctx, "received-extended-pm-me-response",
@@ -3193,6 +3284,10 @@
 
 func (mm *OnuMetricsManager) waitForResetResponseOrTimeout(ctx context.Context, instID uint16, meClassName string) bool {
 	logger.Debugw(ctx, "wait-for-ethernet-frame-reset-counters-response-or-timeout", log.Fields{"instID": instID, "meClassName": meClassName})
+	if !mm.GetOmciProcessingStatus() {
+		logger.Errorw(ctx, "not waiting for omci resp, because the omci resp processor is shutdown due to device down/delete", log.Fields{"device-id": mm.deviceID})
+		return false
+	}
 	select {
 	case resp := <-mm.extendedPMMeResponseChan:
 		logger.Debugw(ctx, "received-extended-pm-me-reset-response",
@@ -3866,9 +3961,21 @@
 	return controlBlock
 }
 
+// GetdeviceDeletionInProgress returns the deviceDeletionInProgress flag, read while holding the OnuMetricsManagerLock read lock
+func (mm *OnuMetricsManager) GetdeviceDeletionInProgress() bool {
+	mm.OnuMetricsManagerLock.RLock()
+	defer mm.OnuMetricsManagerLock.RUnlock()
+	return mm.deviceDeletionInProgress
+}
+
+// SetdeviceDeletionInProgress sets the deviceDeletionInProgress flag to the given value while holding the OnuMetricsManagerLock write lock
+func (mm *OnuMetricsManager) SetdeviceDeletionInProgress(deleted bool) {
+	mm.OnuMetricsManagerLock.Lock()
+	defer mm.OnuMetricsManagerLock.Unlock()
+	mm.deviceDeletionInProgress = deleted // honor the argument instead of unconditionally forcing the flag to true
+}
+
 // PrepareForGarbageCollection - remove references to prepare for garbage collection
 func (mm *OnuMetricsManager) PrepareForGarbageCollection(ctx context.Context, aDeviceID string) {
-	logger.Debugw(ctx, "prepare for garbage collection", log.Fields{"device-id": aDeviceID})
-	mm.pDeviceHandler = nil
-	mm.pOnuDeviceEntry = nil
+	logger.Debugw(ctx, "prepare for garbage collection - no action, garbage collection done when PM FSM is stopped", log.Fields{"device-id": aDeviceID}) // NOTE(review): now only logs; the handler references are cleared in a PM FSM handler once deletion is flagged (presumably the stop handler - confirm)
 }