VOL-3888: Provide resiliency for L2 PM Data
Change-Id: I38b6e5a25612b829e16feeaf6543a91cd63545d8
diff --git a/internal/pkg/onuadaptercore/onu_metrics_manager.go b/internal/pkg/onuadaptercore/onu_metrics_manager.go
index 747ba8a..4a1de70 100644
--- a/internal/pkg/onuadaptercore/onu_metrics_manager.go
+++ b/internal/pkg/onuadaptercore/onu_metrics_manager.go
@@ -19,10 +19,13 @@
import (
"context"
+ "encoding/json"
"fmt"
"github.com/looplab/fsm"
"github.com/opencord/omci-lib-go"
me "github.com/opencord/omci-lib-go/generated"
+ "github.com/opencord/voltha-lib-go/v4/pkg/db"
+ "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
"github.com/opencord/voltha-protos/v4/go/voltha"
"sync"
@@ -77,6 +80,7 @@
// UniStatusGroupMetrics are supported UNI status names
var UniStatusGroupMetrics = map[string]voltha.PmConfig_PmType{
"uni_port_no": voltha.PmConfig_CONTEXT,
+ "entity_id": voltha.PmConfig_CONTEXT,
"ethernet_type": voltha.PmConfig_GAUGE,
"oper_status": voltha.PmConfig_GAUGE,
"uni_admin_state": voltha.PmConfig_GAUGE,
@@ -208,11 +212,26 @@
GemPortHistoryFrequency = L2PmCollectionInterval
)
+// KV Store related constants
+const (
+ cPmKvStorePrefix = "%s/openonu/pm-data/%s" // <some-base-path>/openonu/pm-data/<onu-device-id>
+ cPmAdd = "add"
+ cPmAdded = "added"
+ cPmRemove = "remove"
+ cPmRemoved = "removed"
+)
+
// Defines the type for generic metric population function
type groupMetricPopulateFunc func(context.Context, me.ClassID, uint16, me.AttributeValueMap, me.AttributeValueMap, map[string]float32, *int) error
// *** Classical L2 PM Counters end ***
+type pmMEData struct {
+ InstancesActive []uint16 `json:"instances_active"` // list of active ME instance IDs for the group
+ InstancesToDelete []uint16 `json:"instances_to_delete"` // list of ME instance IDs marked for deletion for the group
+ InstancesToAdd []uint16 `json:"instances_to_add"` // list of ME instance IDs marked for addition for the group
+}
+
type groupMetric struct {
groupName string
enabled bool
@@ -221,8 +240,7 @@
nextCollectionInterval time.Time // valid only if FrequencyOverride is enabled.
isL2PMCounter bool // true for only L2 PM counters
collectAttempts uint32 // number of attempts to collect L2 PM data
- createRetryAttempts uint32 // number of attempts to try creating the L2 PM ME
- deleteRetryAttempts uint32 // number of attempts to try deleting the L2 PM ME
+ pmMEData *pmMEData
}
type standaloneMetric struct {
@@ -246,10 +264,6 @@
l2PmToDelete []string // list of L2 PMs to delete
l2PmToAdd []string // list of L2 PM to add
- gemPortNCTPPerfHistInstToAdd []uint16
- gemPortNCTPPerfHistInstToDelete []uint16
- gemPortNCTPPerfHistInstActive []uint16
-
groupMetricMap map[string]*groupMetric
standaloneMetricMap map[string]*standaloneMetric
@@ -260,13 +274,17 @@
nextGlobalMetricCollectionTime time.Time // valid only if pmConfig.FreqOverride is set to false.
onuMetricsManagerLock sync.RWMutex
+
+ pmKvStore *db.Backend
}
// newonuMetricsManager returns a new instance of the newonuMetricsManager
-// Note that none of the context stored internally in onuMetricsManager is backed up on KV store for resiliency.
-// Metric collection is not a critical operation that needs support for resiliency. On adapter restart, some context
-// could be lost (except for Device.PmConfigs which is backed up the rw-core on KV store). An example of information
-// that is lost on adapter restart is nextCollectionInterval time.
+// The metrics manager module is responsible for configuration and management of individual and group metrics.
+// Currently all the metrics are managed as groups, which fall into two categories - L2 PM and "all others".
+// The L2 PM counters have a fixed 15min interval for PM collection while all other group counters have
+// the collection interval configurable.
+// The global PM config is part of the voltha.Device struct and is backed up on KV store (by rw-core).
+// This module also implements resiliency for L2 PM ME instances that are active/pending-delete/pending-add.
func newonuMetricsManager(ctx context.Context, dh *deviceHandler) *onuMetricsManager {
var metricsManager onuMetricsManager
@@ -299,6 +317,15 @@
// initialize the next metric collection intervals.
metricsManager.initializeMetricCollectionTime(ctx)
+
+ baseKvStorePath := fmt.Sprintf(cPmKvStorePrefix, dh.pOpenOnuAc.cm.Backend.PathPrefix, dh.deviceID)
+ metricsManager.pmKvStore = dh.setBackend(ctx, baseKvStorePath)
+ if metricsManager.pmKvStore == nil {
+ logger.Errorw(ctx, "Can't initialize pmKvStore - no backend connection to PM module",
+ log.Fields{"device-id": dh.deviceID, "service": baseKvStorePath})
+ return nil
+ }
+
logger.Info(ctx, "init-onuMetricsManager completed", log.Fields{"device-id": dh.deviceID})
return &metricsManager
}
@@ -728,14 +755,17 @@
// do nothing
}
}
- var entityID uint32
if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
- entityID = uint32(val.(uint16))
+ entityID := val.(uint16)
+ unigMetrics["entity_id"] = float32(entityID)
+ // TODO: Rlock needed for reading uniEntityMap? May not be needed given that uniEntityMap is populated at initial ONU bring up
+ for _, uni := range mm.pDeviceHandler.uniEntityMap {
+ if uni.entityID == entityID {
+ unigMetrics["uni_port_no"] = float32(uni.portNo)
+ }
+ }
}
- // TODO: Rlock needed for reading uniEntityMap?
- if uniPort, ok := mm.pDeviceHandler.uniEntityMap[entityID]; ok && uniPort != nil {
- unigMetrics["uni_port_no"] = float32(uniPort.portNo)
- }
+
// create slice of metrics given that there could be more than one UNI-G instance
metricInfo := voltha.MetricInformation{Metadata: &mmd, Metrics: unigMetrics}
metricInfoSlice = append(metricInfoSlice, &metricInfo)
@@ -783,13 +813,15 @@
}
}
}
- var entityID uint32
if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
- entityID = uint32(val.(uint16))
- }
- // TODO: Rlock needed for reading uniEntityMap?
- if uniPort, ok := mm.pDeviceHandler.uniEntityMap[entityID]; ok && uniPort != nil {
- pptpMetrics["uni_port_no"] = float32(uniPort.portNo)
+ entityID := val.(uint16)
+ pptpMetrics["entity_id"] = float32(entityID)
+ // TODO: Rlock needed for reading uniEntityMap? May not be needed given that uniEntityMap is populated at initial ONU bring up
+ for _, uni := range mm.pDeviceHandler.uniEntityMap {
+ if uni.entityID == entityID {
+ pptpMetrics["uni_port_no"] = float32(uni.portNo)
+ }
+ }
}
// create slice of metrics given that there could be more than one PPTP instance and
@@ -835,13 +867,15 @@
}
}
- var entityID uint32
if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
- entityID = uint32(meAttributes["ManagedEntityId"].(uint16))
- }
- // TODO: Rlock needed for reading uniEntityMap?
- if uniPort, ok := mm.pDeviceHandler.uniEntityMap[entityID]; ok && uniPort != nil {
- veipMetrics["uni_port_no"] = float32(uniPort.portNo)
+ entityID := val.(uint16)
+ veipMetrics["entity_id"] = float32(entityID)
+ // TODO: Rlock needed for reading uniEntityMap? May not be needed given that uniEntityMap is populated at initial ONU bring up
+ for _, uni := range mm.pDeviceHandler.uniEntityMap {
+ if uni.entityID == entityID {
+ veipMetrics["uni_port_no"] = float32(uni.portNo)
+ }
+ }
}
// create slice of metrics given that there could be more than one VEIP instance
@@ -1035,6 +1069,14 @@
// ** L2 PM FSM Handlers start **
func (mm *onuMetricsManager) l2PMFsmStarting(ctx context.Context, e *fsm.Event) {
+ // restore data from KV store
+ if err := mm.restorePmData(ctx); err != nil {
+ logger.Errorw(ctx, "error restoring pm data", log.Fields{"err": err})
+ // we continue given that it does not affect the actual services for the ONU,
+ // but there may be some negative effect on PM collection (there may be some mismatch in
+ // the actual PM config and what is present on the device).
+ }
+
// Loop through all the group metrics
// If it is a L2 PM Interval metric and it is enabled, then if it is not in the
// list of active L2 PM list then mark it for creation
@@ -1109,6 +1151,12 @@
mm.l2PmToAdd = nil
mm.l2PmToDelete = nil
mm.onuMetricsManagerLock.Unlock()
+ // If the FSM was stopped, then clear PM data from KV store
+ // The FSM is stopped when ONU goes down. It is time to clear its data from store
+ if e.Event == l2PmEventStop {
+ _ = mm.clearPmGroupData(ctx) // ignore error
+ }
+
}
func (mm *onuMetricsManager) l2PMFsmIdle(ctx context.Context, e *fsm.Event) {
logger.Debugw(ctx, "Enter state idle", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
@@ -1145,12 +1193,17 @@
for _, n := range copyOfActiveL2Pms {
var metricInfoSlice []*voltha.MetricInformation
+
+ // mm.groupMetricMap[n].pmMEData.InstancesActive could dynamically change, so make a copy
+ mm.onuMetricsManagerLock.RLock()
+ copyOfEntityIDs := make([]uint16, len(mm.groupMetricMap[n].pmMEData.InstancesActive))
+ _ = copy(copyOfEntityIDs, mm.groupMetricMap[n].pmMEData.InstancesActive)
+ mm.onuMetricsManagerLock.RUnlock()
+
switch n {
case EthernetBridgeHistoryName:
logger.Debugw(ctx, "state collect data - collecting data for EthernetFramePerformanceMonitoringHistoryData ME", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
- for _, uniPort := range mm.pDeviceHandler.uniEntityMap {
- // Attach the EthernetFramePerformanceMonitoringHistoryData ME to MacBridgePortConfigData on the UNI port
- entityID := macBridgePortAniEID + uniPort.entityID
+ for _, entityID := range copyOfEntityIDs {
if metricInfo := mm.collectEthernetFramePerformanceMonitoringHistoryData(ctx, true, entityID); metricInfo != nil { // upstream
metricInfoSlice = append(metricInfoSlice, metricInfo)
}
@@ -1160,30 +1213,21 @@
}
case EthernetUniHistoryName:
logger.Debugw(ctx, "state collect data - collecting data for EthernetPerformanceMonitoringHistoryData ME", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
- for _, uniPort := range mm.pDeviceHandler.uniEntityMap {
- if uniPort.portType == uniPPTP { // This metric is only applicable for PPTP Uni Type
- // Attach the EthernetFramePerformanceMonitoringHistoryData ME to MacBridgePortConfigData on the UNI port
- entityID := uniPort.entityID
- if metricInfo := mm.collectEthernetUniHistoryData(ctx, entityID); metricInfo != nil { // upstream
- metricInfoSlice = append(metricInfoSlice, metricInfo)
- }
+ for _, entityID := range copyOfEntityIDs {
+ if metricInfo := mm.collectEthernetUniHistoryData(ctx, entityID); metricInfo != nil { // upstream
+ metricInfoSlice = append(metricInfoSlice, metricInfo)
}
}
+
case FecHistoryName:
- // get the ANI-G instance IDs as FecHistory is tied to ANI-G instance id
- anigInstKeys := mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.AniGClassID)
- for _, anigInstID := range anigInstKeys {
- if metricInfo := mm.collectFecHistoryData(ctx, anigInstID); metricInfo != nil { // upstream
+ for _, entityID := range copyOfEntityIDs {
+ if metricInfo := mm.collectFecHistoryData(ctx, entityID); metricInfo != nil { // upstream
metricInfoSlice = append(metricInfoSlice, metricInfo)
}
}
case GemPortHistoryName:
- mm.onuMetricsManagerLock.RLock()
- copyOfActiveGemPortInstIDs := make([]uint16, len(mm.gemPortNCTPPerfHistInstActive))
- _ = copy(copyOfActiveGemPortInstIDs, mm.gemPortNCTPPerfHistInstActive)
- mm.onuMetricsManagerLock.RUnlock()
- for _, v := range copyOfActiveGemPortInstIDs {
- if metricInfo := mm.collectGemHistoryData(ctx, v); metricInfo != nil { // upstream
+ for _, entityID := range copyOfEntityIDs {
+ if metricInfo := mm.collectGemHistoryData(ctx, entityID); metricInfo != nil { // upstream
metricInfoSlice = append(metricInfoSlice, metricInfo)
}
}
@@ -1203,6 +1247,7 @@
}()
}
+// nolint: gocyclo
func (mm *onuMetricsManager) l2PmFsmCreatePM(ctx context.Context, e *fsm.Event) {
// Copy the l2PmToAdd for which we want to collect the metrics since l2PmToAdd can change dynamically
mm.onuMetricsManagerLock.RLock()
@@ -1213,92 +1258,131 @@
logger.Debugw(ctx, "state create pm - start", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "pms-to-add": copyOfL2PmToAdd})
for _, n := range copyOfL2PmToAdd {
resp := false
+ atLeastOneSuccess := false // flag indicates if at least one ME instance of the PM was successfully created.
+ cnt := 0
switch n {
case EthernetBridgeHistoryName:
boolForDirection := make([]bool, 2) // stores true and false to indicate upstream and downstream directions.
boolForDirection = append(boolForDirection, true, false)
// Create ME twice, one for each direction. Boolean true is used to indicate upstream and false for downstream.
for _, direction := range boolForDirection {
- inner1:
for _, uniPort := range mm.pDeviceHandler.uniEntityMap {
// Attach the EthernetFramePerformanceMonitoringHistoryData ME to MacBridgePortConfigData on the UNI port
entityID := macBridgePortAniEID + uniPort.entityID
- mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteEthernetPerformanceMonitoringHistoryME(
- ctx, ConstDefaultOmciTimeout, true, direction, true, mm.pAdaptFsm.commChan, entityID)
- if resp = mm.waitForResponseOrTimeout(ctx, true, entityID, "EthernetFramePerformanceMonitoringHistoryData"); !resp {
- break inner1
+ _ = mm.updatePmData(ctx, n, entityID, cPmAdd) // TODO: ignore error for now
+ inner1:
+ // retry L2PmCreateAttempts times to create the instance of PM
+ for cnt = 0; cnt < L2PmCreateAttempts; cnt++ {
+ mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteEthernetPerformanceMonitoringHistoryME(
+ ctx, ConstDefaultOmciTimeout, true, direction, true, mm.pAdaptFsm.commChan, entityID)
+ if resp = mm.waitForResponseOrTimeout(ctx, true, entityID, "EthernetFramePerformanceMonitoringHistoryData"); resp {
+ atLeastOneSuccess = true
+ _ = mm.updatePmData(ctx, n, entityID, cPmAdded) // TODO: ignore error for now
+ break inner1
+ }
+ }
+ if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
+ _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
}
}
}
case EthernetUniHistoryName:
-
- inner2:
for _, uniPort := range mm.pDeviceHandler.uniEntityMap {
if uniPort.portType == uniPPTP { // This metric is only applicable for PPTP Uni Type
- // Attach the EthernetPerformanceMonitoringHistoryData ME to MacBridgePortConfigData on the UNI port
+ // Attach the EthernetPerformanceMonitoringHistoryData ME to PPTP port instance
entityID := uniPort.entityID
- mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteEthernetUniHistoryME(
- ctx, ConstDefaultOmciTimeout, true, true, mm.pAdaptFsm.commChan, entityID)
- if resp = mm.waitForResponseOrTimeout(ctx, true, entityID, "EthernetPerformanceMonitoringHistoryData"); !resp {
- break inner2
+ _ = mm.updatePmData(ctx, n, entityID, cPmAdd) // TODO: ignore error for now
+ inner2:
+ // retry L2PmCreateAttempts times to create the instance of PM
+ for cnt = 0; cnt < L2PmCreateAttempts; cnt++ {
+ mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteEthernetUniHistoryME(
+ ctx, ConstDefaultOmciTimeout, true, true, mm.pAdaptFsm.commChan, entityID)
+ if resp = mm.waitForResponseOrTimeout(ctx, true, entityID, "EthernetPerformanceMonitoringHistoryData"); resp {
+ atLeastOneSuccess = true
+ _ = mm.updatePmData(ctx, n, entityID, cPmAdded) // TODO: ignore error for now
+ break inner2
+ }
+ }
+ if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
+ _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
}
}
}
case FecHistoryName:
-
- inner3:
for _, anigInstID := range mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.AniGClassID) {
- // Attach the EthernetPerformanceMonitoringHistoryData ME to MacBridgePortConfigData on the UNI port
+ // Attach the FecPerformanceMonitoringHistoryData ME to the ANI-G ME instance
mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteFecHistoryME(
ctx, ConstDefaultOmciTimeout, true, true, mm.pAdaptFsm.commChan, anigInstID)
- if resp = mm.waitForResponseOrTimeout(ctx, true, anigInstID, "FecPerformanceMonitoringHistoryData"); !resp {
- break inner3
+ _ = mm.updatePmData(ctx, n, anigInstID, cPmAdd) // TODO: ignore error for now
+ inner3:
+ // retry L2PmCreateAttempts times to create the instance of PM
+ for cnt = 0; cnt < L2PmCreateAttempts; cnt++ {
+ if resp = mm.waitForResponseOrTimeout(ctx, true, anigInstID, "FecPerformanceMonitoringHistoryData"); resp {
+ atLeastOneSuccess = true
+ _ = mm.updatePmData(ctx, n, anigInstID, cPmAdded) // TODO: ignore error for now
+ break inner3
+ }
+ }
+ if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
+ _ = mm.updatePmData(ctx, n, anigInstID, cPmRemoved) // TODO: ignore error for now
}
}
case GemPortHistoryName:
mm.onuMetricsManagerLock.RLock()
- copyOfGemPortInstIDsToAdd := make([]uint16, len(mm.gemPortNCTPPerfHistInstToAdd))
- _ = copy(copyOfGemPortInstIDsToAdd, mm.gemPortNCTPPerfHistInstToAdd)
+ copyOfGemPortInstIDsToAdd := make([]uint16, len(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd))
+ _ = copy(copyOfGemPortInstIDsToAdd, mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd)
mm.onuMetricsManagerLock.RUnlock()
- inner4:
+
+ if len(copyOfGemPortInstIDsToAdd) == 0 {
+ // If there are no gemport history MEs to be created, just skip further processing
+ // Otherwise down below (after 'switch' case handling) we assume the ME creation failed because resp and atLeastOneSuccess flag are false.
+ // Normally there are no GemPortHistory MEs to create at start up. They come in only after provisioning service on the ONU.
+ mm.onuMetricsManagerLock.Lock()
+ mm.l2PmToAdd = mm.removeIfFoundString(mm.l2PmToAdd, n)
+ mm.onuMetricsManagerLock.Unlock()
+ continue
+ }
+
for _, v := range copyOfGemPortInstIDsToAdd {
mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteGemPortHistoryME(
ctx, ConstDefaultOmciTimeout, true, true, mm.pAdaptFsm.commChan, v)
- if resp = mm.waitForResponseOrTimeout(ctx, true, v, "GemPortNetworkCtpPerformanceMonitoringHistoryData"); !resp {
- break inner4
+ _ = mm.updatePmData(ctx, n, v, cPmAdd) // TODO: ignore error for now
+ inner4:
+ // retry L2PmCreateAttempts times to create the instance of PM
+ for cnt = 0; cnt < L2PmCreateAttempts; cnt++ {
+ if resp = mm.waitForResponseOrTimeout(ctx, true, v, "GemPortNetworkCtpPerformanceMonitoringHistoryData"); resp {
+ atLeastOneSuccess = true
+ _ = mm.updatePmData(ctx, n, v, cPmAdded) // TODO: ignore error for now
+ break inner4
+ }
}
- mm.onuMetricsManagerLock.Lock()
- mm.gemPortNCTPPerfHistInstActive = mm.appendIfMissingUnt16(mm.gemPortNCTPPerfHistInstActive, v)
- mm.onuMetricsManagerLock.Unlock()
+ if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
+ _ = mm.updatePmData(ctx, n, v, cPmRemoved) // TODO: ignore error for now
+ }
}
default:
logger.Errorw(ctx, "unsupported l2 pm", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "name": n})
}
- // On success Update the local list maintained for active PMs and PMs to add
- if resp {
+ // On success of at least one instance of the PM for a given ME, update the local list maintained for active PMs and PMs to add
+ if atLeastOneSuccess {
mm.onuMetricsManagerLock.Lock()
mm.activeL2Pms = mm.appendIfMissingString(mm.activeL2Pms, n)
mm.l2PmToAdd = mm.removeIfFoundString(mm.l2PmToAdd, n)
logger.Debugw(ctx, "success-resp", log.Fields{"pm-name": n, "active-l2-pms": mm.activeL2Pms, "pms-to-add": mm.l2PmToAdd})
mm.onuMetricsManagerLock.Unlock()
} else {
- // If createRetryAttempts exceeds L2PmCreateAttempts then locally disable the PM
+ // If we are here then no instance of the given PM ME was created successfully, so locally disable the PM
// and also remove it from l2PmToAdd slice so that we do not try to create the PM ME anymore
mm.onuMetricsManagerLock.Lock()
- mm.groupMetricMap[n].createRetryAttempts++
- if mm.groupMetricMap[n].createRetryAttempts > L2PmCreateAttempts {
- logger.Debugw(ctx, "exceeded-max-add-retry-attempts--disabling-group", log.Fields{"groupName": n})
- mm.groupMetricMap[n].enabled = false
- mm.groupMetricMap[n].createRetryAttempts = 0 // reset counter
- mm.l2PmToAdd = mm.removeIfFoundString(mm.l2PmToAdd, n)
+ logger.Debugw(ctx, "exceeded-max-add-retry-attempts--disabling-group", log.Fields{"groupName": n})
+ mm.groupMetricMap[n].enabled = false
+ mm.l2PmToAdd = mm.removeIfFoundString(mm.l2PmToAdd, n)
- }
logger.Warnw(ctx, "state create pm - failed to create pm",
log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": n,
- "createRetryAttempts": mm.groupMetricMap[n].createRetryAttempts,
- "active-l2-pms": mm.activeL2Pms, "pms-to-add": mm.l2PmToAdd})
+ "active-l2-pms": mm.activeL2Pms, "pms-to-add": mm.l2PmToAdd})
mm.onuMetricsManagerLock.Unlock()
}
}
@@ -1313,6 +1397,7 @@
}()
}
+// nolint: gocyclo
func (mm *onuMetricsManager) l2PmFsmDeletePM(ctx context.Context, e *fsm.Event) {
// Copy the l2PmToDelete for which we want to collect the metrics since l2PmToDelete can change dynamically
mm.onuMetricsManagerLock.RLock()
@@ -1323,90 +1408,124 @@
logger.Debugw(ctx, "state delete pm", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "pms-to-delete": mm.l2PmToDelete})
for _, n := range copyOfL2PmToDelete {
resp := false
+ cnt := 0
+ atLeastOneDeleteFailure := false
+
+ // mm.groupMetricMap[n].pmMEData.InstancesActive could dynamically change, so make a copy
+ mm.onuMetricsManagerLock.RLock()
+ copyOfEntityIDs := make([]uint16, len(mm.groupMetricMap[n].pmMEData.InstancesActive))
+ _ = copy(copyOfEntityIDs, mm.groupMetricMap[n].pmMEData.InstancesActive)
+ mm.onuMetricsManagerLock.RUnlock()
+
+ if len(copyOfEntityIDs) == 0 {
+ // if there are no entityIDs to remove for the PM ME just clear the PM name entry from cache and continue
+ mm.onuMetricsManagerLock.Lock()
+ mm.activeL2Pms = mm.removeIfFoundString(mm.activeL2Pms, n)
+ mm.l2PmToDelete = mm.removeIfFoundString(mm.l2PmToDelete, n)
+ logger.Debugw(ctx, "success-resp", log.Fields{"pm-name": n, "active-l2-pms": mm.activeL2Pms, "pms-to-delete": mm.l2PmToDelete})
+ mm.onuMetricsManagerLock.Unlock()
+ continue
+ }
+ logger.Debugw(ctx, "entities to delete", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": n, "entityIDs": copyOfEntityIDs})
switch n {
case EthernetBridgeHistoryName:
boolForDirection := make([]bool, 2) // stores true and false to indicate upstream and downstream directions.
boolForDirection = append(boolForDirection, true, false)
// Create ME twice, one for each direction. Boolean true is used to indicate upstream and false for downstream.
for _, direction := range boolForDirection {
- inner1:
- for _, uniPort := range mm.pDeviceHandler.uniEntityMap {
- // Attach the EthernetFramePerformanceMonitoringHistoryData ME to MacBridgePortConfigData on the UNI port
- entityID := macBridgePortAniEID + uniPort.entityID
- mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteEthernetPerformanceMonitoringHistoryME(
- ctx, ConstDefaultOmciTimeout, true, direction, false, mm.pAdaptFsm.commChan, entityID)
- if resp = mm.waitForResponseOrTimeout(ctx, false, entityID, "EthernetFramePerformanceMonitoringHistoryData"); !resp {
- break inner1
+ for _, entityID := range copyOfEntityIDs {
+ inner1:
+ // retry L2PmDeleteAttempts times to delete the instance of PM
+ for cnt = 0; cnt < L2PmDeleteAttempts; cnt++ {
+ mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteEthernetPerformanceMonitoringHistoryME(
+ ctx, ConstDefaultOmciTimeout, true, direction, false, mm.pAdaptFsm.commChan, entityID)
+ _ = mm.updatePmData(ctx, n, entityID, cPmRemove) // TODO: ignore error for now
+ if resp = mm.waitForResponseOrTimeout(ctx, false, entityID, "EthernetFramePerformanceMonitoringHistoryData"); !resp {
+ atLeastOneDeleteFailure = true
+ } else {
+ _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
+ break inner1
+ }
+ }
+ if cnt == L2PmDeleteAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
+ _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
}
}
}
case EthernetUniHistoryName:
-
- inner2:
- for _, uniPort := range mm.pDeviceHandler.uniEntityMap {
- if uniPort.portType == uniPPTP { // This metric is only applicable for PPTP Uni Type
- // Attach the EthernetPerformanceMonitoringHistoryData ME to MacBridgePortConfigData on the UNI port
- entityID := uniPort.entityID
+ for _, entityID := range copyOfEntityIDs {
+ inner2:
+ // retry L2PmDeleteAttempts times to delete the instance of PM
+ for cnt = 0; cnt < L2PmDeleteAttempts; cnt++ {
mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteEthernetUniHistoryME(
ctx, ConstDefaultOmciTimeout, true, false, mm.pAdaptFsm.commChan, entityID)
if resp = mm.waitForResponseOrTimeout(ctx, false, entityID, "EthernetPerformanceMonitoringHistoryData"); !resp {
+ atLeastOneDeleteFailure = true
+ } else {
+ _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
break inner2
}
}
+ if cnt == L2PmDeleteAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
+ _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
+ }
}
case FecHistoryName:
-
- inner3:
- for _, anigInstID := range mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.AniGClassID) {
- // Attach the EthernetPerformanceMonitoringHistoryData ME to MacBridgePortConfigData on the UNI port
- mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteFecHistoryME(
- ctx, ConstDefaultOmciTimeout, true, false, mm.pAdaptFsm.commChan, anigInstID)
- if resp := mm.waitForResponseOrTimeout(ctx, false, anigInstID, "FecPerformanceMonitoringHistoryData"); !resp {
- break inner3
+ for _, entityID := range copyOfEntityIDs {
+ inner3:
+ // retry L2PmDeleteAttempts times to delete the instance of PM
+ for cnt = 0; cnt < L2PmDeleteAttempts; cnt++ {
+ mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteFecHistoryME(
+ ctx, ConstDefaultOmciTimeout, true, false, mm.pAdaptFsm.commChan, entityID)
+ if resp := mm.waitForResponseOrTimeout(ctx, false, entityID, "FecPerformanceMonitoringHistoryData"); !resp {
+ atLeastOneDeleteFailure = true
+ } else {
+ _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
+ break inner3
+ }
+ }
+ if cnt == L2PmDeleteAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
+ _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
}
}
case GemPortHistoryName:
- mm.onuMetricsManagerLock.RLock()
- copyOfGemPortInstIDsToDelete := make([]uint16, len(mm.gemPortNCTPPerfHistInstToDelete))
- _ = copy(copyOfGemPortInstIDsToDelete, mm.gemPortNCTPPerfHistInstToDelete)
- mm.onuMetricsManagerLock.RUnlock()
- inner4:
- for _, v := range copyOfGemPortInstIDsToDelete {
- mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteGemPortHistoryME(
- ctx, ConstDefaultOmciTimeout, true, false, mm.pAdaptFsm.commChan, v)
- if resp = mm.waitForResponseOrTimeout(ctx, false, v, "GemPortNetworkCtpPerformanceMonitoringHistoryData"); !resp {
- break inner4
+ for _, entityID := range copyOfEntityIDs {
+ inner4:
+ // retry L2PmDeleteAttempts times to delete the instance of PM
+ for cnt = 0; cnt < L2PmDeleteAttempts; cnt++ {
+ mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteGemPortHistoryME(
+ ctx, ConstDefaultOmciTimeout, true, false, mm.pAdaptFsm.commChan, entityID)
+ if resp = mm.waitForResponseOrTimeout(ctx, false, entityID, "GemPortNetworkCtpPerformanceMonitoringHistoryData"); !resp {
+ atLeastOneDeleteFailure = true
+ } else {
+ _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
+ break inner4
+ }
}
- mm.onuMetricsManagerLock.Lock()
- mm.gemPortNCTPPerfHistInstActive = mm.removeIfFoundUint16(mm.gemPortNCTPPerfHistInstActive, v)
- mm.onuMetricsManagerLock.Unlock()
+ if cnt == L2PmDeleteAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
+ _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
+ }
}
-
default:
logger.Errorw(ctx, "unsupported l2 pm", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "name": n})
}
- // On success Update the local list maintained for active PMs and PMs to delete
- if resp {
+ // If we could not completely clean up the PM ME then just give up.
+ if atLeastOneDeleteFailure {
+ logger.Warnw(ctx, "state delete pm - failed to delete at least one instance of the PM ME",
+ log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": n,
+ "active-l2-pms": mm.activeL2Pms, "pms-to-delete": mm.l2PmToDelete})
+ mm.onuMetricsManagerLock.Lock()
+ logger.Debugw(ctx, "exceeded-max-delete-retry-attempts--disabling-group", log.Fields{"groupName": n})
+ mm.activeL2Pms = mm.removeIfFoundString(mm.activeL2Pms, n)
+ mm.l2PmToDelete = mm.removeIfFoundString(mm.l2PmToDelete, n)
+ mm.groupMetricMap[n].enabled = false
+ mm.onuMetricsManagerLock.Unlock()
+ } else { // success case
mm.onuMetricsManagerLock.Lock()
mm.activeL2Pms = mm.removeIfFoundString(mm.activeL2Pms, n)
mm.l2PmToDelete = mm.removeIfFoundString(mm.l2PmToDelete, n)
logger.Debugw(ctx, "success-resp", log.Fields{"pm-name": n, "active-l2-pms": mm.activeL2Pms, "pms-to-delete": mm.l2PmToDelete})
mm.onuMetricsManagerLock.Unlock()
- } else {
- logger.Warnw(ctx, "state delete pm - failed to delete pm",
- log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": n,
- "deleteRetryAttempts": mm.groupMetricMap[n].deleteRetryAttempts,
- "active-l2-pms": mm.activeL2Pms, "pms-to-delete": mm.l2PmToDelete})
- // If deleteRetryAttempts exceeds L2PmDeleteAttempts then just give up
- mm.onuMetricsManagerLock.Lock()
- mm.groupMetricMap[n].deleteRetryAttempts++
- if mm.groupMetricMap[n].deleteRetryAttempts > L2PmDeleteAttempts {
- logger.Debugw(ctx, "exceeded-max-delete-retry-attempts--disabling-group", log.Fields{"groupName": n})
- mm.activeL2Pms = mm.removeIfFoundString(mm.activeL2Pms, n)
- mm.l2PmToDelete = mm.removeIfFoundString(mm.l2PmToDelete, n)
- mm.groupMetricMap[n].deleteRetryAttempts = 0 // reset counter
- }
- mm.onuMetricsManagerLock.Unlock()
}
}
logger.Debugw(ctx, "state delete pm - done", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "active-l2-pms": mm.activeL2Pms, "pms-to-delete": mm.l2PmToDelete})
@@ -1608,6 +1727,10 @@
// the attributes in multiple iterations for a given L2 PM ME as there is a limit on the max OMCI GET payload size.
if _, ok := ethPMHistData[k]; !ok {
switch k {
+ case "entity_id":
+ if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
+ ethPMHistData[k] = float32(val.(uint16))
+ }
case "drop_events":
if val, ok := meAttributes["DropEvents"]; ok && val != nil {
ethPMHistData[k] = float32(val.(uint32))
@@ -1700,6 +1823,10 @@
// the attributes in multiple iterations for a given L2 PM ME as there is a limit on the max OMCI GET payload size.
if _, ok := ethPMUniHistData[k]; !ok {
switch k {
+ case "entity_id":
+ if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
+ ethPMUniHistData[k] = float32(val.(uint16))
+ }
case "fcs_errors":
if val, ok := meAttributes["FcsErrors"]; ok && val != nil {
ethPMUniHistData[k] = float32(val.(uint32))
@@ -1792,6 +1919,10 @@
// the attributes in multiple iterations for a given L2 PM ME as there is a limit on the max OMCI GET payload size.
if _, ok := fecHistData[k]; !ok {
switch k {
+ case "entity_id":
+ if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
+ fecHistData[k] = float32(val.(uint16))
+ }
case "corrected_bytes":
if val, ok := meAttributes["CorrectedBytes"]; ok && val != nil {
fecHistData[k] = float32(val.(uint32))
@@ -1848,6 +1979,10 @@
// the attributes in multiple iterations for a given L2 PM ME as there is a limit on the max OMCI GET payload size.
if _, ok := gemPortHistData[k]; !ok {
switch k {
+ case "entity_id":
+ if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
+ gemPortHistData[k] = float32(val.(uint16))
+ }
case "transmitted_gem_frames":
if val, ok := meAttributes["TransmittedGemFrames"]; ok && val != nil {
gemPortHistData[k] = float32(val.(uint32))
@@ -2082,7 +2217,12 @@
func (mm *onuMetricsManager) initializeGroupMetric(grpMtrcs map[string]voltha.PmConfig_PmType, grpName string, grpEnabled bool, grpFreq uint32) {
var pmConfigSlice []*voltha.PmConfig
for k, v := range grpMtrcs {
- pmConfigSlice = append(pmConfigSlice, &voltha.PmConfig{Name: k, Type: v})
+ pmConfigSlice = append(pmConfigSlice,
+ &voltha.PmConfig{
+ Name: k,
+ Type: v,
+ Enabled: grpEnabled && mm.pDeviceHandler.pOpenOnuAc.metricsEnabled,
+ SampleFreq: grpFreq})
}
groupMetric := voltha.PmGroupConfig{
GroupName: grpName,
@@ -2214,9 +2354,9 @@
mm.onuMetricsManagerLock.Lock()
defer mm.onuMetricsManagerLock.Unlock()
// mark the instance for addition
- mm.gemPortNCTPPerfHistInstToAdd = mm.appendIfMissingUnt16(mm.gemPortNCTPPerfHistInstToAdd, gemPortNTPInstID)
+ mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd = mm.appendIfMissingUnt16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, gemPortNTPInstID)
// If the instance presence toggles too soon, we need to remove it from gemPortNCTPPerfHistInstToDelete slice
- mm.gemPortNCTPPerfHistInstToDelete = mm.removeIfFoundUint16(mm.gemPortNCTPPerfHistInstToDelete, gemPortNTPInstID)
+ mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete = mm.removeIfFoundUint16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete, gemPortNTPInstID)
mm.l2PmToAdd = mm.appendIfMissingString(mm.l2PmToAdd, GemPortHistoryName)
// We do not need to remove from l2PmToDelete slice as we could have Add and Delete of
@@ -2228,9 +2368,9 @@
func (mm *onuMetricsManager) RemoveGemPortForPerfMonitoring(gemPortNTPInstID uint16) {
mm.onuMetricsManagerLock.Lock()
defer mm.onuMetricsManagerLock.Unlock()
- mm.gemPortNCTPPerfHistInstToDelete = mm.appendIfMissingUnt16(mm.gemPortNCTPPerfHistInstToDelete, gemPortNTPInstID)
+ mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete = mm.appendIfMissingUnt16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete, gemPortNTPInstID)
// If the instance presence toggles too soon, we need to remove it from gemPortNCTPPerfHistInstToAdd slice
- mm.gemPortNCTPPerfHistInstToAdd = mm.removeIfFoundUint16(mm.gemPortNCTPPerfHistInstToAdd, gemPortNTPInstID)
+ mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd = mm.removeIfFoundUint16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, gemPortNTPInstID)
mm.l2PmToDelete = mm.appendIfMissingString(mm.l2PmToDelete, GemPortHistoryName)
// We do not need to remove from l2PmToAdd slice as we could have Add and Delete of
@@ -2245,12 +2385,12 @@
// NOTE: It is expected that caller of this function has acquired the required mutex for synchronization purposes
for _, v := range gemPortInstIDs {
// mark the instance for addition
- mm.gemPortNCTPPerfHistInstToAdd = mm.appendIfMissingUnt16(mm.gemPortNCTPPerfHistInstToAdd, v)
+ mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd = mm.appendIfMissingUnt16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, v)
// If the instance presence toggles too soon, we need to remove it from gemPortNCTPPerfHistInstToDelete slice
- mm.gemPortNCTPPerfHistInstToDelete = mm.removeIfFoundUint16(mm.gemPortNCTPPerfHistInstToDelete, v)
+ mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete = mm.removeIfFoundUint16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete, v)
}
logger.Debugw(ctx, "updateGemPortNTPInstanceToAddForPerfMonitoring",
- log.Fields{"deviceID": mm.pDeviceHandler.deviceID, "gemToAdd": mm.gemPortNCTPPerfHistInstToAdd, "gemToDel": mm.gemPortNCTPPerfHistInstToDelete})
+ log.Fields{"deviceID": mm.pDeviceHandler.deviceID, "gemToAdd": mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, "gemToDel": mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete})
}
}
@@ -2259,13 +2399,185 @@
gemPortInstIDs := mm.pDeviceHandler.pOnuTP.GetAllBidirectionalGemPortIDsForOnu()
// NOTE: It is expected that caller of this function has acquired the required mutex for synchronization purposes
for _, v := range gemPortInstIDs {
- mm.gemPortNCTPPerfHistInstToDelete = mm.appendIfMissingUnt16(mm.gemPortNCTPPerfHistInstToDelete, v)
+ mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete = mm.appendIfMissingUnt16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete, v)
// If the instance presence toggles too soon, we need to remove it from gemPortNCTPPerfHistInstToAdd slice
- mm.gemPortNCTPPerfHistInstToAdd = mm.removeIfFoundUint16(mm.gemPortNCTPPerfHistInstToAdd, v)
+ mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd = mm.removeIfFoundUint16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, v)
}
}
logger.Debugw(ctx, "updateGemPortNTPInstanceToDeleteForPerfMonitoring",
- log.Fields{"deviceID": mm.pDeviceHandler.deviceID, "gemToAdd": mm.gemPortNCTPPerfHistInstToAdd, "gemToDel": mm.gemPortNCTPPerfHistInstToDelete})
+ log.Fields{"deviceID": mm.pDeviceHandler.deviceID, "gemToAdd": mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, "gemToDel": mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete})
+}
+
+// restorePmData loads the persisted pmMEData of every group in groupMetricMap from the PM KV store
+// into the local cache. Each group's cached record is first reset to an empty pmMEData and is replaced
+// only after a stored value has been read and decoded successfully. A failure for one group does not
+// stop the remaining groups; all failures are reported together in the returned error.
+// Returns an error if pmKvStore is not set or if any group could not be read or decoded.
+func (mm *onuMetricsManager) restorePmData(ctx context.Context) error {
+	deviceID := mm.pDeviceHandler.deviceID
+	logger.Debugw(ctx, "restorePmData - start", log.Fields{"device-id": deviceID})
+	if mm.pmKvStore == nil {
+		logger.Errorw(ctx, "pmKvStore not set - abort", log.Fields{"device-id": deviceID})
+		return fmt.Errorf("pmKvStore-not-set-abort-%s", deviceID)
+	}
+	var errorsList []error
+	for groupName, group := range mm.groupMetricMap {
+		// Start from an empty record so a group without stored data still has a usable cache entry.
+		group.pmMEData = &pmMEData{}
+		kvPair, err := mm.pmKvStore.Get(ctx, groupName)
+		if err != nil {
+			logger.Errorw(ctx, "restorePmData - fail", log.Fields{"device-id": deviceID, "groupName": groupName, "err": err})
+			errorsList = append(errorsList, fmt.Errorf("unable-to-read-from-KVstore-%s-for-group-%s", deviceID, groupName))
+			continue
+		}
+		if kvPair == nil {
+			logger.Debugw(ctx, "no PM data found", log.Fields{"groupName": groupName, "device-id": deviceID})
+			continue
+		}
+		logger.Debugw(ctx, "PM data read", log.Fields{"Key": kvPair.Key, "device-id": deviceID})
+
+		// Decode into a scratch record: unmarshalling straight into &group.pmMEData would set the
+		// cached pointer to nil on a JSON null and could leave it half-filled on a decode error.
+		restored := &pmMEData{}
+		tmpBytes, err := kvstore.ToByte(kvPair.Value)
+		if err == nil {
+			err = json.Unmarshal(tmpBytes, restored)
+		}
+		if err != nil {
+			logger.Errorw(ctx, "unable to unmarshal PM data", log.Fields{"error": err, "groupName": groupName, "device-id": deviceID})
+			errorsList = append(errorsList, fmt.Errorf("unable-to-unmarshal-PM-data-%s-for-group-%s", deviceID, groupName))
+			continue
+		}
+		group.pmMEData = restored
+		logger.Debugw(ctx, "restorePmData - success", log.Fields{"pmData": group.pmMEData, "groupName": groupName, "device-id": deviceID})
+	}
+	if len(errorsList) > 0 {
+		return fmt.Errorf("errors-restoring-pm-data-for-one-or-more-groups--errors:%v", errorsList)
+	}
+	logger.Debugw(ctx, "restorePmData - complete success", log.Fields{"device-id": deviceID})
+	return nil
+}
+
+// getPmData returns the pmMEData for groupName. The cached record in groupMetricMap is returned when
+// one exists; otherwise the record is read from the PM KV store, without being inserted into the cache.
+// When the store has no entry for the group, an empty pmMEData and a nil error are returned.
+// On error the returned pmMEData is an empty (or partially decoded) record and should not be used.
+// Note, it is expected that caller of this function manages the required synchronization (like using locks etc.).
+func (mm *onuMetricsManager) getPmData(ctx context.Context, groupName string) (*pmMEData, error) {
+	// A missing group, a nil group entry and a not-yet-initialised record all fall through to the
+	// store read, so the caller never receives a nil record together with a nil error.
+	if grp, ok := mm.groupMetricMap[groupName]; ok && grp != nil && grp.pmMEData != nil {
+		return grp.pmMEData, nil
+	}
+	deviceID := mm.pDeviceHandler.deviceID
+	// Data not in cache, try to fetch from kv store.
+	data := &pmMEData{}
+	if mm.pmKvStore == nil {
+		logger.Errorw(ctx, "pmKvStore not set - abort", log.Fields{"device-id": deviceID})
+		return data, fmt.Errorf("pmKvStore not set. device-id - %s", deviceID)
+	}
+	kvPair, err := mm.pmKvStore.Get(ctx, groupName)
+	if err != nil {
+		logger.Errorw(ctx, "unable to read from KVstore", log.Fields{"device-id": deviceID, "groupName": groupName, "err": err})
+		return data, err
+	}
+	if kvPair == nil {
+		logger.Debugw(ctx, "no PM data found", log.Fields{"groupName": groupName, "device-id": deviceID})
+		return data, nil
+	}
+	logger.Debugw(ctx, "PM data read", log.Fields{"Key": kvPair.Key, "device-id": deviceID})
+
+	tmpBytes, err := kvstore.ToByte(kvPair.Value)
+	if err != nil {
+		logger.Errorw(ctx, "unable to convert PM data to bytes", log.Fields{"error": err, "groupName": groupName, "device-id": deviceID})
+		return data, err
+	}
+	if err = json.Unmarshal(tmpBytes, data); err != nil {
+		logger.Errorw(ctx, "unable to unmarshal PM data", log.Fields{"error": err, "groupName": groupName, "device-id": deviceID})
+		return data, err
+	}
+	logger.Debugw(ctx, "PM data", log.Fields{"pmData": data, "groupName": groupName, "device-id": deviceID})
+
+	return data, nil
+}
+
+// updatePmData applies pmAction for meInstanceID to the pmMEData of groupName and persists the result.
+// It is a write through cache: the cached record is updated first and then written to the PM KV store.
+// Supported actions:
+//   - cPmAdd:     instance is queued in InstancesToAdd and dropped from InstancesToDelete/InstancesActive
+//   - cPmAdded:   instance is recorded in InstancesActive and dropped from InstancesToAdd/InstancesToDelete
+//   - cPmRemove:  instance is queued in InstancesToDelete and dropped from InstancesToAdd/InstancesActive
+//   - cPmRemoved: instance is dropped from all three lists
+// The function takes onuMetricsManagerLock itself, so the caller must not already hold it.
+// Returns an error if pmKvStore is not set, the group is unknown, no record is available, the action
+// is unknown, or marshalling/writing the record fails.
+func (mm *onuMetricsManager) updatePmData(ctx context.Context, groupName string, meInstanceID uint16, pmAction string) error {
+	deviceID := mm.pDeviceHandler.deviceID
+	logger.Debugw(ctx, "updatePmData - start", log.Fields{"device-id": deviceID, "groupName": groupName, "entityID": meInstanceID, "pmAction": pmAction})
+	mm.onuMetricsManagerLock.Lock()
+	defer mm.onuMetricsManagerLock.Unlock()
+
+	if mm.pmKvStore == nil {
+		logger.Errorw(ctx, "pmKvStore not set - abort", log.Fields{"device-id": deviceID})
+		return fmt.Errorf("pmKvStore-not-set-abort-%s", deviceID)
+	}
+
+	// The cache write below goes through the group entry; reject unknown groups instead of
+	// dereferencing a nil entry.
+	grp, ok := mm.groupMetricMap[groupName]
+	if !ok || grp == nil {
+		logger.Errorw(ctx, "unknown pm group", log.Fields{"device-id": deviceID, "groupName": groupName, "pmAction": pmAction})
+		return fmt.Errorf("unknown-pm-group-deviceid-%s-groupName-%s", deviceID, groupName)
+	}
+
+	data, err := mm.getPmData(ctx, groupName)
+	if err != nil {
+		// error already logged in called function.
+		return err
+	}
+	if data == nil {
+		// A nil record without an error must not be reported as a successful update.
+		logger.Errorw(ctx, "pm data not initialized", log.Fields{"device-id": deviceID, "groupName": groupName, "pmAction": pmAction})
+		return fmt.Errorf("pm-data-not-initialized-deviceid-%s-groupName-%s", deviceID, groupName)
+	}
+
+	switch pmAction {
+	case cPmAdd:
+		data.InstancesToAdd = mm.appendIfMissingUnt16(data.InstancesToAdd, meInstanceID)
+		data.InstancesToDelete = mm.removeIfFoundUint16(data.InstancesToDelete, meInstanceID)
+		data.InstancesActive = mm.removeIfFoundUint16(data.InstancesActive, meInstanceID)
+	case cPmAdded:
+		data.InstancesActive = mm.appendIfMissingUnt16(data.InstancesActive, meInstanceID)
+		data.InstancesToAdd = mm.removeIfFoundUint16(data.InstancesToAdd, meInstanceID)
+		data.InstancesToDelete = mm.removeIfFoundUint16(data.InstancesToDelete, meInstanceID)
+	case cPmRemove:
+		data.InstancesToDelete = mm.appendIfMissingUnt16(data.InstancesToDelete, meInstanceID)
+		data.InstancesToAdd = mm.removeIfFoundUint16(data.InstancesToAdd, meInstanceID)
+		data.InstancesActive = mm.removeIfFoundUint16(data.InstancesActive, meInstanceID)
+	case cPmRemoved:
+		data.InstancesToDelete = mm.removeIfFoundUint16(data.InstancesToDelete, meInstanceID)
+		data.InstancesToAdd = mm.removeIfFoundUint16(data.InstancesToAdd, meInstanceID)
+		data.InstancesActive = mm.removeIfFoundUint16(data.InstancesActive, meInstanceID)
+	default:
+		logger.Errorw(ctx, "unknown pm action", log.Fields{"device-id": deviceID, "pmAction": pmAction, "groupName": groupName})
+		return fmt.Errorf("unknown-pm-action-deviceid-%s-groupName-%s-pmaction-%s", deviceID, groupName, pmAction)
+	}
+	// write through cache
+	// NOTE(review): the cache is updated before the store write, so a failed Put below leaves the
+	// cache ahead of the KV store.
+	grp.pmMEData = data
+
+	encoded, err := json.Marshal(*data)
+	if err != nil {
+		logger.Errorw(ctx, "unable to marshal PM data", log.Fields{"groupName": groupName, "pmAction": pmAction, "pmData": *data, "err": err})
+		return err
+	}
+	// Update back to kv store
+	if err = mm.pmKvStore.Put(ctx, groupName, encoded); err != nil {
+		logger.Errorw(ctx, "unable to put PM data to kv store", log.Fields{"groupName": groupName, "pmData": *data, "pmAction": pmAction, "err": err})
+		return err
+	}
+	logger.Debugw(ctx, "updatePmData - success", log.Fields{"device-id": deviceID, "groupName": groupName, "pmData": *data, "pmAction": pmAction})
+
+	return nil
+}
+
+// clearPmGroupData deletes the stored PM data of every group in groupMetricMap from the PM KV store.
+// Deletion is best effort: a failed delete is logged and the remaining groups are still processed,
+// so the only error returned is the one for a missing pmKvStore. The in-memory records are not
+// modified by this function. It takes onuMetricsManagerLock for the whole operation.
+func (mm *onuMetricsManager) clearPmGroupData(ctx context.Context) error {
+	mm.onuMetricsManagerLock.Lock()
+	defer mm.onuMetricsManagerLock.Unlock()
+	deviceID := mm.pDeviceHandler.deviceID
+	logger.Debugw(ctx, "clearPmGroupData - start", log.Fields{"device-id": deviceID})
+	if mm.pmKvStore == nil {
+		logger.Errorw(ctx, "pmKvStore not set - abort", log.Fields{"device-id": deviceID})
+		return fmt.Errorf("pmKvStore-not-set-abort-%s", deviceID)
+	}
+
+	for groupName := range mm.groupMetricMap {
+		if err := mm.pmKvStore.Delete(ctx, groupName); err != nil {
+			// do not abort this procedure. continue to delete next group.
+			logger.Errorw(ctx, "clearPmGroupData - fail", log.Fields{"device-id": deviceID, "groupName": groupName, "err": err})
+			continue
+		}
+		logger.Debugw(ctx, "clearPmGroupData - success", log.Fields{"device-id": deviceID, "groupName": groupName})
+	}
+
+	return nil
+}
+
+// clearAllPmData clears all PM data associated with the device from KV store by issuing a single
+// Delete with an empty key on pmKvStore. It takes onuMetricsManagerLock for the whole operation.
+// Returns an error if pmKvStore is not set or if the delete fails.
+func (mm *onuMetricsManager) clearAllPmData(ctx context.Context) error {
+	mm.onuMetricsManagerLock.Lock()
+	defer mm.onuMetricsManagerLock.Unlock()
+	deviceID := mm.pDeviceHandler.deviceID
+	logger.Debugw(ctx, "clearAllPmData - start", log.Fields{"device-id": deviceID})
+	if mm.pmKvStore == nil {
+		logger.Errorw(ctx, "pmKvStore not set - abort", log.Fields{"device-id": deviceID})
+		return fmt.Errorf("pmKvStore-not-set-abort-%s", deviceID)
+	}
+
+	// NOTE(review): presumably the empty key addresses the device's whole PM path prefix; confirm
+	// that the backend's Delete removes the per-group sub-keys and not only the exact prefix key.
+	if err := mm.pmKvStore.Delete(ctx, ""); err != nil {
+		logger.Errorw(ctx, "unable to delete PM data from kv store", log.Fields{"device-id": deviceID, "err": err})
+		return err
+	}
+	logger.Debugw(ctx, "clearAllPmData - success", log.Fields{"device-id": deviceID})
+	return nil
}
func (mm *onuMetricsManager) appendIfMissingString(slice []string, n string) []string {
diff --git a/internal/pkg/onuadaptercore/openonu.go b/internal/pkg/onuadaptercore/openonu.go
index 357219b..eb1aaba 100644
--- a/internal/pkg/onuadaptercore/openonu.go
+++ b/internal/pkg/onuadaptercore/openonu.go
@@ -378,11 +378,23 @@
func (oo *OpenONUAC) Delete_device(ctx context.Context, device *voltha.Device) error {
logger.Infow(ctx, "delete-device", log.Fields{"device-id": device.Id, "SerialNumber": device.SerialNumber})
if handler := oo.getDeviceHandler(ctx, device.Id, false); handler != nil {
- err := handler.deleteDevicePersistencyData(ctx)
+ var errorsList []error
+ if err := handler.deleteDevicePersistencyData(ctx); err != nil {
+ errorsList = append(errorsList, err)
+ }
handler.stopCollector <- true // stop the metric collector routine
+ if handler.pOnuMetricsMgr != nil {
+ if err := handler.pOnuMetricsMgr.clearAllPmData(ctx); err != nil {
+ errorsList = append(errorsList, err)
+ }
+ }
//don't leave any garbage - even in error case
oo.deleteDeviceHandlerToMap(handler)
- return err
+ if len(errorsList) > 0 {
+ logger.Errorw(ctx, "one-or-more-error-during-device-delete", log.Fields{"device-id": device.Id})
+ return fmt.Errorf("one-or-more-error-during-device-delete, errors:%v", errorsList)
+ }
+ return nil
}
logger.Warnw(ctx, "no handler found for device-deletion", log.Fields{"device-id": device.Id})
return fmt.Errorf(fmt.Sprintf("handler-not-found-%s", device.Id))