[VOL-5450]: Fix ONU adapter exception during metrics collection
Change-Id: I887fe6398606e4e75374e72ea065ff4270a83271
Signed-off-by: balaji.nagarajan <balaji.nagarajan@radisys.com>
diff --git a/internal/pkg/pmmgr/onu_metrics_manager.go b/internal/pkg/pmmgr/onu_metrics_manager.go
index b60e5f3..2736b65 100755
--- a/internal/pkg/pmmgr/onu_metrics_manager.go
+++ b/internal/pkg/pmmgr/onu_metrics_manager.go
@@ -34,6 +34,8 @@
cmn "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/common"
"github.com/opencord/voltha-protos/v5/go/extension"
"github.com/opencord/voltha-protos/v5/go/voltha"
+ codes "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
// events of L2 PM FSM
@@ -776,9 +778,12 @@
opticalMetrics := make(map[string]float32)
// Get the ANI-G instance optical power attributes
requestedAttributes := me.AttributeValueMap{me.AniG_OpticalSignalLevel: 0, me.AniG_TransmitOpticalLevel: 0}
- meInstance, err := mm.pOnuDeviceEntry.GetDevOmciCC().SendGetMe(ctx, me.AniGClassID, anigInstID, requestedAttributes,
+ meInstance, err := mm.GetMeInstance(ctx, me.AniGClassID, anigInstID, requestedAttributes,
mm.pDeviceHandler.GetOmciTimeout(), true, mm.PAdaptFsm.CommChan, mm.isExtendedOmci)
if err != nil {
+ if CheckMeInstanceStatusCode(err) {
+ return nil, err // Device is being deleted, so we stop processing
+ }
logger.Errorw(ctx, "GetMe failed, failure PM FSM!", log.Fields{"device-id": mm.deviceID})
_ = mm.PAdaptFsm.PFsm.Event(L2PmEventFailure)
return nil, err
@@ -865,9 +870,12 @@
var meAttributes me.AttributeValueMap
// Get the UNI-G instance optical power attributes
requestedAttributes := me.AttributeValueMap{me.UniG_AdministrativeState: 0}
- meInstance, err := mm.pOnuDeviceEntry.GetDevOmciCC().SendGetMe(ctx, me.UniGClassID, unigInstID, requestedAttributes,
+ meInstance, err := mm.GetMeInstance(ctx, me.UniGClassID, unigInstID, requestedAttributes,
mm.pDeviceHandler.GetOmciTimeout(), true, mm.PAdaptFsm.CommChan, mm.isExtendedOmci)
if err != nil {
+ if CheckMeInstanceStatusCode(err) {
+ return nil, err // Device is being deleted, so we stop processing
+ }
logger.Errorw(ctx, "UNI-G failed, failure PM FSM!", log.Fields{"device-id": mm.deviceID})
_ = mm.PAdaptFsm.PFsm.Event(L2PmEventFailure)
return nil, err
@@ -930,9 +938,12 @@
me.PhysicalPathTerminationPointEthernetUni_ConfigurationInd: 0,
me.PhysicalPathTerminationPointEthernetUni_OperationalState: 0,
me.PhysicalPathTerminationPointEthernetUni_AdministrativeState: 0}
- meInstance, err := mm.pOnuDeviceEntry.GetDevOmciCC().SendGetMe(ctx, me.PhysicalPathTerminationPointEthernetUniClassID,
+ meInstance, err := mm.GetMeInstance(ctx, me.PhysicalPathTerminationPointEthernetUniClassID,
pptpInstID, requestedAttributes, mm.pDeviceHandler.GetOmciTimeout(), true, mm.PAdaptFsm.CommChan, mm.isExtendedOmci)
if err != nil {
+ if CheckMeInstanceStatusCode(err) {
+ return nil, err // Device is being deleted, so we stop processing
+ }
logger.Errorw(ctx, "GetMe failed, failure PM FSM!", log.Fields{"device-id": mm.deviceID})
_ = mm.PAdaptFsm.PFsm.Event(L2PmEventFailure)
return nil, err
@@ -1002,9 +1013,12 @@
requestedAttributes := me.AttributeValueMap{me.VirtualEthernetInterfacePoint_OperationalState: 0,
me.VirtualEthernetInterfacePoint_AdministrativeState: 0}
- meInstance, err := mm.pOnuDeviceEntry.GetDevOmciCC().SendGetMe(ctx, me.VirtualEthernetInterfacePointClassID, veipInstID, requestedAttributes,
+ meInstance, err := mm.GetMeInstance(ctx, me.VirtualEthernetInterfacePointClassID, veipInstID, requestedAttributes,
mm.pDeviceHandler.GetOmciTimeout(), true, mm.PAdaptFsm.CommChan, mm.isExtendedOmci)
if err != nil {
+ if CheckMeInstanceStatusCode(err) {
+ return nil, err // Device is being deleted, so we stop processing
+ }
logger.Errorw(ctx, "GetMe failed, failure PM FSM!", log.Fields{"device-id": mm.deviceID})
_ = mm.PAdaptFsm.PFsm.Event(L2PmEventFailure)
return nil, err
@@ -1491,50 +1505,58 @@
mm.OnuMetricsManagerLock.RUnlock()
for _, n := range copyOfActiveL2Pms {
- var metricInfoSlice []*voltha.MetricInformation
-
- // mm.GroupMetricMap[n].pmMEData.InstancesActive could dynamically change, so make a copy
- mm.OnuMetricsManagerLock.RLock()
- copyOfEntityIDs := make([]uint16, len(mm.GroupMetricMap[n].pmMEData.InstancesActive))
- _ = copy(copyOfEntityIDs, mm.GroupMetricMap[n].pmMEData.InstancesActive)
- mm.OnuMetricsManagerLock.RUnlock()
-
- switch n {
- case EthernetBridgeHistoryName:
- logger.Debugw(ctx, "state collect data - collecting data for EthernetFramePerformanceMonitoringHistoryData ME", log.Fields{"device-id": mm.deviceID})
- for _, entityID := range copyOfEntityIDs {
- if metricInfo := mm.collectEthernetFramePerformanceMonitoringHistoryData(ctx, true, entityID); metricInfo != nil { // upstream
- metricInfoSlice = append(metricInfoSlice, metricInfo)
- }
- if metricInfo := mm.collectEthernetFramePerformanceMonitoringHistoryData(ctx, false, entityID); metricInfo != nil { // downstream
- metricInfoSlice = append(metricInfoSlice, metricInfo)
- }
+ select {
+ case _, ok := <-mm.pDeviceHandler.GetDeviceDeleteCommChan(ctx):
+ if !ok {
+ logger.Warnw(ctx, "Deleting the device, stopping l2PmFsmCollectData for the device ", log.Fields{"device-id": mm.deviceID})
+ return
}
- case EthernetUniHistoryName:
- logger.Debugw(ctx, "state collect data - collecting data for EthernetPerformanceMonitoringHistoryData ME", log.Fields{"device-id": mm.deviceID})
- for _, entityID := range copyOfEntityIDs {
- if metricInfo := mm.collectEthernetUniHistoryData(ctx, entityID); metricInfo != nil { // upstream
- metricInfoSlice = append(metricInfoSlice, metricInfo)
- }
- }
-
- case FecHistoryName:
- for _, entityID := range copyOfEntityIDs {
- if metricInfo := mm.collectFecHistoryData(ctx, entityID); metricInfo != nil { // upstream
- metricInfoSlice = append(metricInfoSlice, metricInfo)
- }
- }
- case GemPortHistoryName:
- for _, entityID := range copyOfEntityIDs {
- if metricInfo := mm.collectGemHistoryData(ctx, entityID); metricInfo != nil { // upstream
- metricInfoSlice = append(metricInfoSlice, metricInfo)
- }
- }
-
default:
- logger.Errorw(ctx, "unsupported l2 pm", log.Fields{"device-id": mm.deviceID, "name": n})
+ var metricInfoSlice []*voltha.MetricInformation
+
+ // mm.GroupMetricMap[n].pmMEData.InstancesActive could dynamically change, so make a copy
+ mm.OnuMetricsManagerLock.RLock()
+ copyOfEntityIDs := make([]uint16, len(mm.GroupMetricMap[n].pmMEData.InstancesActive))
+ _ = copy(copyOfEntityIDs, mm.GroupMetricMap[n].pmMEData.InstancesActive)
+ mm.OnuMetricsManagerLock.RUnlock()
+
+ switch n {
+ case EthernetBridgeHistoryName:
+ logger.Debugw(ctx, "state collect data - collecting data for EthernetFramePerformanceMonitoringHistoryData ME", log.Fields{"device-id": mm.deviceID})
+ for _, entityID := range copyOfEntityIDs {
+ if metricInfo := mm.collectEthernetFramePerformanceMonitoringHistoryData(ctx, true, entityID); metricInfo != nil { // upstream
+ metricInfoSlice = append(metricInfoSlice, metricInfo)
+ }
+ if metricInfo := mm.collectEthernetFramePerformanceMonitoringHistoryData(ctx, false, entityID); metricInfo != nil { // downstream
+ metricInfoSlice = append(metricInfoSlice, metricInfo)
+ }
+ }
+ case EthernetUniHistoryName:
+ logger.Debugw(ctx, "state collect data - collecting data for EthernetPerformanceMonitoringHistoryData ME", log.Fields{"device-id": mm.deviceID})
+ for _, entityID := range copyOfEntityIDs {
+ if metricInfo := mm.collectEthernetUniHistoryData(ctx, entityID); metricInfo != nil { // upstream
+ metricInfoSlice = append(metricInfoSlice, metricInfo)
+ }
+ }
+
+ case FecHistoryName:
+ for _, entityID := range copyOfEntityIDs {
+ if metricInfo := mm.collectFecHistoryData(ctx, entityID); metricInfo != nil { // upstream
+ metricInfoSlice = append(metricInfoSlice, metricInfo)
+ }
+ }
+ case GemPortHistoryName:
+ for _, entityID := range copyOfEntityIDs {
+ if metricInfo := mm.collectGemHistoryData(ctx, entityID); metricInfo != nil { // upstream
+ metricInfoSlice = append(metricInfoSlice, metricInfo)
+ }
+ }
+
+ default:
+ logger.Errorw(ctx, "unsupported l2 pm", log.Fields{"device-id": mm.deviceID, "name": n})
+ }
+ mm.handleMetricsPublish(ctx, n, metricInfoSlice)
}
- mm.handleMetricsPublish(ctx, n, metricInfoSlice)
}
// Does not matter we send success or failure here.
// Those PMs that we failed to collect data will be attempted to collect again in the next PM collection cycle (assuming
@@ -2136,9 +2158,12 @@
}
// Insert "IntervalEndTime" as part of the requested attributes as we need this to compare the get responses when get request is multipart
requestedAttributes[me.EthernetFramePerformanceMonitoringHistoryDataUpstream_IntervalEndTime] = 0
- meInstance, err := mm.pOnuDeviceEntry.GetDevOmciCC().SendGetMe(ctx, classID, entityID, requestedAttributes,
+ meInstance, err := mm.GetMeInstance(ctx, classID, entityID, requestedAttributes,
mm.pDeviceHandler.GetOmciTimeout(), true, mm.PAdaptFsm.CommChan, mm.isExtendedOmci)
if err != nil {
+ if CheckMeInstanceStatusCode(err) {
+ return err // Device is being deleted, so we stop processing
+ }
logger.Errorw(ctx, "GetME failed, failure PM FSM!", log.Fields{"device-id": mm.deviceID})
pmFsm := mm.PAdaptFsm
if pmFsm != nil {
@@ -2159,6 +2184,9 @@
log.Fields{"device-id": mm.deviceID, "upstream": upstream, "entityID": entityID})
// The metrics will be empty in this case
return fmt.Errorf("timeout-during-l2-pm-collection-for-ethernet-bridge-history-%v", mm.deviceID)
+ case <-mm.pDeviceHandler.GetDeviceDeleteCommChan(ctx):
+ logger.Warnw(ctx, "Deleting the device, stopping Ethernet Performance metrics collection for the device ", log.Fields{"device-id": mm.deviceID})
+ return fmt.Errorf("deleting the device, stopping Ethernet Performance metrics collection for the device %v", mm.deviceID)
}
// verify that interval end time has not changed during metric collection. If it changed, we abort the procedure
if valid := mm.updateAndValidateIntervalEndTime(ctx, entityID, meAttributes, intervalEndTime); !valid {
@@ -2245,15 +2273,21 @@
if _, ok := requestedAttributes["IntervalEndTime"]; !ok {
requestedAttributes["IntervalEndTime"] = 0
}
- meInstance, err := mm.pOnuDeviceEntry.GetDevOmciCC().SendGetMe(ctx, classID, entityID, requestedAttributes,
+ meInstance, err := mm.GetMeInstance(ctx, classID, entityID, requestedAttributes,
mm.pDeviceHandler.GetOmciTimeout(), true, mm.PAdaptFsm.CommChan, mm.isExtendedOmci)
if err != nil {
+ if CheckMeInstanceStatusCode(err) {
+ return err // Device is being deleted, so we stop processing
+ }
logger.Errorw(ctx, "GetMe failed, failure PM FSM!", log.Fields{"device-id": mm.deviceID})
_ = mm.PAdaptFsm.PFsm.Event(L2PmEventFailure)
return fmt.Errorf("GetME-failed-%s-%s", mm.deviceID, err)
}
if meInstance != nil {
select {
+ case <-mm.pDeviceHandler.GetDeviceDeleteCommChan(ctx):
+ logger.Warnw(ctx, "Deleting the device, stopping EthernetUniHistory metrics collection for the device ", log.Fields{"device-id": mm.deviceID})
+ return fmt.Errorf("deleting the device, stopping EthernetUniHistory metrics collection for the device %v", mm.deviceID)
case meAttributes = <-mm.l2PmChan:
logger.Debugw(ctx, "received ethernet uni history data metrics",
log.Fields{"device-id": mm.deviceID, "entityID": entityID})
@@ -2348,15 +2382,23 @@
if _, ok := requestedAttributes[me.FecPerformanceMonitoringHistoryData_IntervalEndTime]; !ok {
requestedAttributes[me.FecPerformanceMonitoringHistoryData_IntervalEndTime] = 0
}
- meInstance, err := mm.pOnuDeviceEntry.GetDevOmciCC().SendGetMe(ctx, classID, entityID, requestedAttributes,
+ meInstance, err := mm.GetMeInstance(ctx, classID, entityID, requestedAttributes,
mm.pDeviceHandler.GetOmciTimeout(), true, mm.PAdaptFsm.CommChan, mm.isExtendedOmci)
if err != nil {
+ if CheckMeInstanceStatusCode(err) {
+ return err // Device is being deleted, so we stop processing
+ }
logger.Errorw(ctx, "GetMe failed, failure PM FSM!", log.Fields{"device-id": mm.deviceID})
_ = mm.PAdaptFsm.PFsm.Event(L2PmEventFailure)
return fmt.Errorf("GetME-failed-%s-%s", mm.deviceID, err)
}
if meInstance != nil {
select {
+ case _, ok := <-mm.pDeviceHandler.GetDeviceDeleteCommChan(ctx):
+ if !ok {
+ logger.Warnw(ctx, "Deleting the device, stopping FEC history metrics collection for the device ", log.Fields{"device-id": mm.deviceID})
+ return fmt.Errorf("deleting the device, stopping FEC history metrics collection for the device %v", mm.deviceID)
+ }
case meAttributes = <-mm.l2PmChan:
logger.Debugw(ctx, "received fec history data metrics",
log.Fields{"device-id": mm.deviceID, "entityID": entityID})
@@ -2408,6 +2450,35 @@
return nil
}
+// GetMeInstance sends an OMCI Get request for the given ME class/instance and
+// attributes through the device entry's OMCI CC, unless device deletion has
+// already been signalled on the device delete channel.
+//
+// Return values:
+//   - a gRPC status error with codes.NotFound when the delete channel is ready
+//     (callers detect this with CheckMeInstanceStatusCode and stop quietly);
+//   - a gRPC status error with codes.Unavailable when the device entry has no
+//     OMCI CC, instead of an indistinguishable (nil, nil);
+//   - otherwise exactly what SendGetMe returns for the supplied arguments.
+func (mm *OnuMetricsManager) GetMeInstance(ctx context.Context, classID me.ClassID, entityID uint16, requestedAttributes me.AttributeValueMap,
+ timeout int, highPrio bool, rxChan chan cmn.Message, isExtendedOmci bool) (*me.ManagedEntity, error) {
+
+ // Non-blocking check: bail out if device deletion has been signalled.
+ select {
+ case <-mm.pDeviceHandler.GetDeviceDeleteCommChan(ctx):
+ errMsg := "deleting the device, stopping GetMeInstance for the device " + mm.deviceID
+ logger.Warn(ctx, errMsg)
+ return nil, status.Error(codes.NotFound, errMsg)
+ default:
+ }
+ // Fetch the OMCI CC exactly once. It can be nil, so re-fetching it after a nil
+ // check could still hand SendGetMe a nil receiver.
+ // NOTE(review): presumably it is cleared during device teardown — confirm.
+ omciCC := mm.pOnuDeviceEntry.GetDevOmciCC()
+ if omciCC == nil {
+ errMsg := "no OMCI channel available, stopping GetMeInstance for the device " + mm.deviceID
+ logger.Warn(ctx, errMsg)
+ return nil, status.Error(codes.Unavailable, errMsg)
+ }
+ // Honour the caller-supplied arguments; they used to be silently replaced by manager state.
+ return omciCC.SendGetMe(ctx, classID, entityID, requestedAttributes, timeout, highPrio, rxChan, isExtendedOmci)
+}
+
+// CheckMeInstanceStatusCode reports whether err is a gRPC status error carrying
+// codes.NotFound, the code GetMeInstance uses to signal that the device is being
+// deleted. Callers use it to stop processing without raising a failure.
+// A nil error, or an error that is not a gRPC status, yields false.
+func CheckMeInstanceStatusCode(err error) bool {
+ if err == nil {
+ return false
+ }
+ st, isStatus := status.FromError(err)
+ return isStatus && st.Code() == codes.NotFound
+}
+
// nolint: gocyclo
func (mm *OnuMetricsManager) populateGemPortMetrics(ctx context.Context, classID me.ClassID, entityID uint16,
meAttributes me.AttributeValueMap, requestedAttributes me.AttributeValueMap, gemPortHistData map[string]float32, intervalEndTime *int) error {
@@ -2415,9 +2486,12 @@
if _, ok := requestedAttributes[me.GemPortNetworkCtpPerformanceMonitoringHistoryData_IntervalEndTime]; !ok {
requestedAttributes[me.GemPortNetworkCtpPerformanceMonitoringHistoryData_IntervalEndTime] = 0
}
- meInstance, err := mm.pOnuDeviceEntry.GetDevOmciCC().SendGetMe(ctx, classID, entityID, requestedAttributes,
+ meInstance, err := mm.GetMeInstance(ctx, classID, entityID, requestedAttributes,
mm.pDeviceHandler.GetOmciTimeout(), true, mm.PAdaptFsm.CommChan, mm.isExtendedOmci)
if err != nil {
+ if CheckMeInstanceStatusCode(err) {
+ return err // Device is being deleted, so we stop processing
+ }
logger.Errorw(ctx, "GetMe failed", log.Fields{"device-id": mm.deviceID})
_ = mm.PAdaptFsm.PFsm.Event(L2PmEventFailure)
return fmt.Errorf("GetME-failed-%s-%s", mm.deviceID, err)
@@ -2432,6 +2506,9 @@
log.Fields{"device-id": mm.deviceID, "entityID": entityID})
// The metrics will be empty in this case
return fmt.Errorf("timeout-during-l2-pm-collection-for-gemport-history-%v", mm.deviceID)
+ case <-mm.pDeviceHandler.GetDeviceDeleteCommChan(ctx):
+ logger.Warnw(ctx, "Deleting the device, stopping GEM port history metrics collection for the device ", log.Fields{"device-id": mm.deviceID})
+ return fmt.Errorf("deleting the device, stopping GEM port history metrics collection for the device %v", mm.deviceID)
}
// verify that interval end time has not changed during metric collection. If it changed, we abort the procedure
if valid := mm.updateAndValidateIntervalEndTime(ctx, entityID, meAttributes, intervalEndTime); !valid {