VOL-3888: Provide resiliency for L2 PM Data
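
Persist the per-PM-group lists of L2 PM ME instances (active, pending-add,
pending-delete) on the KV store, restore them when the L2 PM FSM starts, and
clear them when the ONU goes down, so that L2 PM collection state survives an
adapter restart.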

Change-Id: I38b6e5a25612b829e16feeaf6543a91cd63545d8
diff --git a/internal/pkg/onuadaptercore/onu_metrics_manager.go b/internal/pkg/onuadaptercore/onu_metrics_manager.go
index 747ba8a..4a1de70 100644
--- a/internal/pkg/onuadaptercore/onu_metrics_manager.go
+++ b/internal/pkg/onuadaptercore/onu_metrics_manager.go
@@ -19,10 +19,13 @@
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"github.com/looplab/fsm"
 	"github.com/opencord/omci-lib-go"
 	me "github.com/opencord/omci-lib-go/generated"
+	"github.com/opencord/voltha-lib-go/v4/pkg/db"
+	"github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
 	"github.com/opencord/voltha-lib-go/v4/pkg/log"
 	"github.com/opencord/voltha-protos/v4/go/voltha"
 	"sync"
@@ -77,6 +80,7 @@
 // UniStatusGroupMetrics are supported UNI status names
 var UniStatusGroupMetrics = map[string]voltha.PmConfig_PmType{
 	"uni_port_no":     voltha.PmConfig_CONTEXT,
+	"entity_id":       voltha.PmConfig_CONTEXT,
 	"ethernet_type":   voltha.PmConfig_GAUGE,
 	"oper_status":     voltha.PmConfig_GAUGE,
 	"uni_admin_state": voltha.PmConfig_GAUGE,
@@ -208,11 +212,26 @@
 	GemPortHistoryFrequency = L2PmCollectionInterval
 )
 
+// KV Store related constants
+const (
+	cPmKvStorePrefix = "%s/openonu/pm-data/%s" // <some-base-path>/openonu/pm-data/<onu-device-id>
+	cPmAdd           = "add"
+	cPmAdded         = "added"
+	cPmRemove        = "remove"
+	cPmRemoved       = "removed"
+)
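+
+// For illustration (the prefix value is an assumption; it comes from the adapter KV store
+// configuration): with a path prefix of "service/voltha", the PM data for an ONU is
+// persisted under "service/voltha/openonu/pm-data/<onu-device-id>", with one KV entry
+// per PM group name.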
+
 // Defines the type for generic metric population function
 type groupMetricPopulateFunc func(context.Context, me.ClassID, uint16, me.AttributeValueMap, me.AttributeValueMap, map[string]float32, *int) error
 
 // *** Classical L2 PM Counters end   ***
 
+type pmMEData struct {
+	InstancesActive   []uint16 `json:"instances_active"`    // list of active ME instance IDs for the group
+	InstancesToDelete []uint16 `json:"instances_to_delete"` // list of ME instance IDs marked for deletion for the group
+	InstancesToAdd    []uint16 `json:"instances_to_add"`    // list of ME instance IDs marked for addition for the group
+}
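+
+// As an illustrative example (the entity IDs below are hypothetical), a pmMEData entry
+// is marshaled to JSON and persisted per PM group in the following shape:
+//   {"instances_active":[257,258],"instances_to_delete":[],"instances_to_add":[259]}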
+
 type groupMetric struct {
 	groupName              string
 	enabled                bool
@@ -221,8 +240,7 @@
 	nextCollectionInterval time.Time // valid only if FrequencyOverride is enabled.
 	isL2PMCounter          bool      // true for only L2 PM counters
 	collectAttempts        uint32    // number of attempts to collect L2 PM data
-	createRetryAttempts    uint32    // number of attempts to try creating the L2 PM ME
-	deleteRetryAttempts    uint32    // number of attempts to try deleting the L2 PM ME
+	pmMEData               *pmMEData
 }
 
 type standaloneMetric struct {
@@ -246,10 +264,6 @@
 	l2PmToDelete []string // list of L2 PMs to delete
 	l2PmToAdd    []string // list of L2 PM to add
 
-	gemPortNCTPPerfHistInstToAdd    []uint16
-	gemPortNCTPPerfHistInstToDelete []uint16
-	gemPortNCTPPerfHistInstActive   []uint16
-
 	groupMetricMap      map[string]*groupMetric
 	standaloneMetricMap map[string]*standaloneMetric
 
@@ -260,13 +274,17 @@
 	nextGlobalMetricCollectionTime time.Time // valid only if pmConfig.FreqOverride is set to false.
 
 	onuMetricsManagerLock sync.RWMutex
+
+	pmKvStore *db.Backend
 }
 
-// newonuMetricsManager returns a new instance of the newonuMetricsManager
-// Note that none of the context stored internally in onuMetricsManager is backed up on KV store for resiliency.
-// Metric collection is not a critical operation that needs support for resiliency. On adapter restart, some context
-// could be lost (except for Device.PmConfigs which is backed up the rw-core on KV store). An example of information
-// that is lost on adapter restart is nextCollectionInterval time.
+// newonuMetricsManager returns a new instance of onuMetricsManager
+// The metrics manager module is responsible for configuration and management of individual and group metrics.
+// Currently all metrics are managed as groups, which fall into two categories - L2 PM and "all others".
+// The L2 PM counters are collected at a fixed 15-minute interval, while the collection interval
+// for all other group counters is configurable.
+// The global PM config is part of the voltha.Device struct and is backed up on KV store (by rw-core).
+// This module also implements resiliency for L2 PM ME instances that are active/pending-delete/pending-add.
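+// A minimal usage sketch (illustrative only): callers must handle a nil return, which
+// indicates the KV store backend for PM data could not be set up:
+//   if mm := newonuMetricsManager(ctx, dh); mm == nil {
+//       // KV store backend unavailable - treat device initialization as failed
+//   }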
 func newonuMetricsManager(ctx context.Context, dh *deviceHandler) *onuMetricsManager {
 
 	var metricsManager onuMetricsManager
@@ -299,6 +317,15 @@
 
 	// initialize the next metric collection intervals.
 	metricsManager.initializeMetricCollectionTime(ctx)
+
+	baseKvStorePath := fmt.Sprintf(cPmKvStorePrefix, dh.pOpenOnuAc.cm.Backend.PathPrefix, dh.deviceID)
+	metricsManager.pmKvStore = dh.setBackend(ctx, baseKvStorePath)
+	if metricsManager.pmKvStore == nil {
+		logger.Errorw(ctx, "Can't initialize pmKvStore - no backend connection to PM module",
+			log.Fields{"device-id": dh.deviceID, "service": baseKvStorePath})
+		return nil
+	}
+
 	logger.Info(ctx, "init-onuMetricsManager completed", log.Fields{"device-id": dh.deviceID})
 	return &metricsManager
 }
@@ -728,14 +755,17 @@
 					// do nothing
 				}
 			}
-			var entityID uint32
 			if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
-				entityID = uint32(val.(uint16))
+				entityID := val.(uint16)
+				unigMetrics["entity_id"] = float32(entityID)
+				// TODO: RLock needed for reading uniEntityMap? May not be needed given uniEntityMap is populated at initial ONU bring-up
+				for _, uni := range mm.pDeviceHandler.uniEntityMap {
+					if uni.entityID == entityID {
+						unigMetrics["uni_port_no"] = float32(uni.portNo)
+					}
+				}
 			}
-			// TODO: Rlock needed for reading uniEntityMap?
-			if uniPort, ok := mm.pDeviceHandler.uniEntityMap[entityID]; ok && uniPort != nil {
-				unigMetrics["uni_port_no"] = float32(uniPort.portNo)
-			}
+
 			// create slice of metrics given that there could be more than one UNI-G instance
 			metricInfo := voltha.MetricInformation{Metadata: &mmd, Metrics: unigMetrics}
 			metricInfoSlice = append(metricInfoSlice, &metricInfo)
@@ -783,13 +813,15 @@
 				}
 			}
 		}
-		var entityID uint32
 		if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
-			entityID = uint32(val.(uint16))
-		}
-		// TODO: Rlock needed for reading uniEntityMap?
-		if uniPort, ok := mm.pDeviceHandler.uniEntityMap[entityID]; ok && uniPort != nil {
-			pptpMetrics["uni_port_no"] = float32(uniPort.portNo)
+			entityID := val.(uint16)
+			pptpMetrics["entity_id"] = float32(entityID)
+			// TODO: RLock needed for reading uniEntityMap? May not be needed given uniEntityMap is populated at initial ONU bring-up
+			for _, uni := range mm.pDeviceHandler.uniEntityMap {
+				if uni.entityID == entityID {
+					pptpMetrics["uni_port_no"] = float32(uni.portNo)
+				}
+			}
 		}
 
 		// create slice of metrics given that there could be more than one PPTP instance and
@@ -835,13 +867,15 @@
 			}
 		}
 
-		var entityID uint32
 		if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
-			entityID = uint32(meAttributes["ManagedEntityId"].(uint16))
-		}
-		// TODO: Rlock needed for reading uniEntityMap?
-		if uniPort, ok := mm.pDeviceHandler.uniEntityMap[entityID]; ok && uniPort != nil {
-			veipMetrics["uni_port_no"] = float32(uniPort.portNo)
+			entityID := val.(uint16)
+			veipMetrics["entity_id"] = float32(entityID)
+			// TODO: RLock needed for reading uniEntityMap? May not be needed given uniEntityMap is populated at initial ONU bring-up
+			for _, uni := range mm.pDeviceHandler.uniEntityMap {
+				if uni.entityID == entityID {
+					veipMetrics["uni_port_no"] = float32(uni.portNo)
+				}
+			}
 		}
 
 		// create slice of metrics given that there could be more than one VEIP instance
@@ -1035,6 +1069,14 @@
 // ** L2 PM FSM Handlers start **
 
 func (mm *onuMetricsManager) l2PMFsmStarting(ctx context.Context, e *fsm.Event) {
+	// restore data from KV store
+	if err := mm.restorePmData(ctx); err != nil {
+		logger.Errorw(ctx, "error restoring pm data", log.Fields{"err": err})
+		// we continue given that it does not affect the actual services for the ONU,
+		// but there may be some negative effect on PM collection (e.g., a mismatch between
+		// the actual PM config and what is present on the device).
+	}
+
 	// Loop through all the group metrics
 	// If it is a L2 PM Interval metric and it is enabled, then if it is not in the
 	// list of active L2 PM list then mark it for creation
@@ -1109,6 +1151,12 @@
 	mm.l2PmToAdd = nil
 	mm.l2PmToDelete = nil
 	mm.onuMetricsManagerLock.Unlock()
+	// If the FSM was stopped, then clear PM data from the KV store.
+	// The FSM is stopped when the ONU goes down, so it is time to clear its data from the store.
+	if e.Event == l2PmEventStop {
+		_ = mm.clearPmGroupData(ctx) // ignore error
+	}
+
 }
 func (mm *onuMetricsManager) l2PMFsmIdle(ctx context.Context, e *fsm.Event) {
 	logger.Debugw(ctx, "Enter state idle", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
@@ -1145,12 +1193,17 @@
 
 	for _, n := range copyOfActiveL2Pms {
 		var metricInfoSlice []*voltha.MetricInformation
+
+		// mm.groupMetricMap[n].pmMEData.InstancesActive could dynamically change, so make a copy
+		mm.onuMetricsManagerLock.RLock()
+		copyOfEntityIDs := make([]uint16, len(mm.groupMetricMap[n].pmMEData.InstancesActive))
+		_ = copy(copyOfEntityIDs, mm.groupMetricMap[n].pmMEData.InstancesActive)
+		mm.onuMetricsManagerLock.RUnlock()
+
 		switch n {
 		case EthernetBridgeHistoryName:
 			logger.Debugw(ctx, "state collect data - collecting data for EthernetFramePerformanceMonitoringHistoryData ME", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
-			for _, uniPort := range mm.pDeviceHandler.uniEntityMap {
-				// Attach the EthernetFramePerformanceMonitoringHistoryData ME to MacBridgePortConfigData on the UNI port
-				entityID := macBridgePortAniEID + uniPort.entityID
+			for _, entityID := range copyOfEntityIDs {
 				if metricInfo := mm.collectEthernetFramePerformanceMonitoringHistoryData(ctx, true, entityID); metricInfo != nil { // upstream
 					metricInfoSlice = append(metricInfoSlice, metricInfo)
 				}
@@ -1160,30 +1213,21 @@
 			}
 		case EthernetUniHistoryName:
 			logger.Debugw(ctx, "state collect data - collecting data for EthernetPerformanceMonitoringHistoryData ME", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
-			for _, uniPort := range mm.pDeviceHandler.uniEntityMap {
-				if uniPort.portType == uniPPTP { // This metric is only applicable for PPTP Uni Type
-					// Attach the EthernetFramePerformanceMonitoringHistoryData ME to MacBridgePortConfigData on the UNI port
-					entityID := uniPort.entityID
-					if metricInfo := mm.collectEthernetUniHistoryData(ctx, entityID); metricInfo != nil { // upstream
-						metricInfoSlice = append(metricInfoSlice, metricInfo)
-					}
+			for _, entityID := range copyOfEntityIDs {
+				if metricInfo := mm.collectEthernetUniHistoryData(ctx, entityID); metricInfo != nil { // upstream
+					metricInfoSlice = append(metricInfoSlice, metricInfo)
 				}
 			}
+
 		case FecHistoryName:
-			// get the ANI-G instance IDs as FecHistory is tied to ANI-G instance id
-			anigInstKeys := mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.AniGClassID)
-			for _, anigInstID := range anigInstKeys {
-				if metricInfo := mm.collectFecHistoryData(ctx, anigInstID); metricInfo != nil { // upstream
+			for _, entityID := range copyOfEntityIDs {
+				if metricInfo := mm.collectFecHistoryData(ctx, entityID); metricInfo != nil { // upstream
 					metricInfoSlice = append(metricInfoSlice, metricInfo)
 				}
 			}
 		case GemPortHistoryName:
-			mm.onuMetricsManagerLock.RLock()
-			copyOfActiveGemPortInstIDs := make([]uint16, len(mm.gemPortNCTPPerfHistInstActive))
-			_ = copy(copyOfActiveGemPortInstIDs, mm.gemPortNCTPPerfHistInstActive)
-			mm.onuMetricsManagerLock.RUnlock()
-			for _, v := range copyOfActiveGemPortInstIDs {
-				if metricInfo := mm.collectGemHistoryData(ctx, v); metricInfo != nil { // upstream
+			for _, entityID := range copyOfEntityIDs {
+				if metricInfo := mm.collectGemHistoryData(ctx, entityID); metricInfo != nil { // upstream
 					metricInfoSlice = append(metricInfoSlice, metricInfo)
 				}
 			}
@@ -1203,6 +1247,7 @@
 	}()
 }
 
+// nolint: gocyclo
 func (mm *onuMetricsManager) l2PmFsmCreatePM(ctx context.Context, e *fsm.Event) {
 	// Copy the l2PmToAdd for which we want to collect the metrics since l2PmToAdd can change dynamically
 	mm.onuMetricsManagerLock.RLock()
@@ -1213,92 +1258,131 @@
 	logger.Debugw(ctx, "state create pm - start", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "pms-to-add": copyOfL2PmToAdd})
 	for _, n := range copyOfL2PmToAdd {
 		resp := false
+		atLeastOneSuccess := false // flag indicates if at least one ME instance of the PM was successfully created.
+		cnt := 0
 		switch n {
 		case EthernetBridgeHistoryName:
-			boolForDirection := make([]bool, 2) // stores true and false to indicate upstream and downstream directions.
-			boolForDirection = append(boolForDirection, true, false)
+			boolForDirection := []bool{true, false} // one entry per direction: true for upstream, false for downstream
 			// Create ME twice, one for each direction. Boolean true is used to indicate upstream and false for downstream.
 			for _, direction := range boolForDirection {
-			inner1:
 				for _, uniPort := range mm.pDeviceHandler.uniEntityMap {
 					// Attach the EthernetFramePerformanceMonitoringHistoryData ME to MacBridgePortConfigData on the UNI port
 					entityID := macBridgePortAniEID + uniPort.entityID
-					mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteEthernetPerformanceMonitoringHistoryME(
-						ctx, ConstDefaultOmciTimeout, true, direction, true, mm.pAdaptFsm.commChan, entityID)
-					if resp = mm.waitForResponseOrTimeout(ctx, true, entityID, "EthernetFramePerformanceMonitoringHistoryData"); !resp {
-						break inner1
+					_ = mm.updatePmData(ctx, n, entityID, cPmAdd) // TODO: ignore error for now
+				inner1:
+					// retry L2PmCreateAttempts times to create the instance of PM
+					for cnt = 0; cnt < L2PmCreateAttempts; cnt++ {
+						mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteEthernetPerformanceMonitoringHistoryME(
+							ctx, ConstDefaultOmciTimeout, true, direction, true, mm.pAdaptFsm.commChan, entityID)
+						if resp = mm.waitForResponseOrTimeout(ctx, true, entityID, "EthernetFramePerformanceMonitoringHistoryData"); resp {
+							atLeastOneSuccess = true
+							_ = mm.updatePmData(ctx, n, entityID, cPmAdded) // TODO: ignore error for now
+							break inner1
+						}
+					}
+					if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
+						_ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
 					}
 				}
 			}
 		case EthernetUniHistoryName:
-
-		inner2:
 			for _, uniPort := range mm.pDeviceHandler.uniEntityMap {
 				if uniPort.portType == uniPPTP { // This metric is only applicable for PPTP Uni Type
-					// Attach the EthernetPerformanceMonitoringHistoryData ME to MacBridgePortConfigData on the UNI port
+					// Attach the EthernetPerformanceMonitoringHistoryData ME to PPTP port instance
 					entityID := uniPort.entityID
-					mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteEthernetUniHistoryME(
-						ctx, ConstDefaultOmciTimeout, true, true, mm.pAdaptFsm.commChan, entityID)
-					if resp = mm.waitForResponseOrTimeout(ctx, true, entityID, "EthernetPerformanceMonitoringHistoryData"); !resp {
-						break inner2
+					_ = mm.updatePmData(ctx, n, entityID, cPmAdd) // TODO: ignore error for now
+				inner2:
+					// retry L2PmCreateAttempts times to create the instance of PM
+					for cnt = 0; cnt < L2PmCreateAttempts; cnt++ {
+						mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteEthernetUniHistoryME(
+							ctx, ConstDefaultOmciTimeout, true, true, mm.pAdaptFsm.commChan, entityID)
+						if resp = mm.waitForResponseOrTimeout(ctx, true, entityID, "EthernetPerformanceMonitoringHistoryData"); resp {
+							atLeastOneSuccess = true
+							_ = mm.updatePmData(ctx, n, entityID, cPmAdded) // TODO: ignore error for now
+							break inner2
+						}
+					}
+					if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
+						_ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
 					}
 				}
 			}
 		case FecHistoryName:
-
-		inner3:
 			for _, anigInstID := range mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.AniGClassID) {
-				// Attach the EthernetPerformanceMonitoringHistoryData ME to MacBridgePortConfigData on the UNI port
-				mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteFecHistoryME(
-					ctx, ConstDefaultOmciTimeout, true, true, mm.pAdaptFsm.commChan, anigInstID)
-				if resp = mm.waitForResponseOrTimeout(ctx, true, anigInstID, "FecPerformanceMonitoringHistoryData"); !resp {
-					break inner3
+				// Attach the FecPerformanceMonitoringHistoryData ME to the ANI-G ME instance
+				_ = mm.updatePmData(ctx, n, anigInstID, cPmAdd) // TODO: ignore error for now
+			inner3:
+				// retry L2PmCreateAttempts times to create the instance of PM; resend the create request on each attempt
+				for cnt = 0; cnt < L2PmCreateAttempts; cnt++ {
+					mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteFecHistoryME(
+						ctx, ConstDefaultOmciTimeout, true, true, mm.pAdaptFsm.commChan, anigInstID)
+					if resp = mm.waitForResponseOrTimeout(ctx, true, anigInstID, "FecPerformanceMonitoringHistoryData"); resp {
+						atLeastOneSuccess = true
+						_ = mm.updatePmData(ctx, n, anigInstID, cPmAdded) // TODO: ignore error for now
+						break inner3
+					}
+				}
+				if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
+					_ = mm.updatePmData(ctx, n, anigInstID, cPmRemoved) // TODO: ignore error for now
 				}
 			}
 		case GemPortHistoryName:
 
 			mm.onuMetricsManagerLock.RLock()
-			copyOfGemPortInstIDsToAdd := make([]uint16, len(mm.gemPortNCTPPerfHistInstToAdd))
-			_ = copy(copyOfGemPortInstIDsToAdd, mm.gemPortNCTPPerfHistInstToAdd)
+			copyOfGemPortInstIDsToAdd := make([]uint16, len(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd))
+			_ = copy(copyOfGemPortInstIDsToAdd, mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd)
 			mm.onuMetricsManagerLock.RUnlock()
-		inner4:
+
+			if len(copyOfGemPortInstIDsToAdd) == 0 {
+				// If there are no gemport history MEs to be created, just skip further processing.
+				// Otherwise down below (after the 'switch' case handling) we would assume the ME creation failed, because the resp and atLeastOneSuccess flags are false.
+				// Normally there are no GemPortHistory MEs to create at start-up. They come in only after provisioning a service on the ONU.
+				mm.onuMetricsManagerLock.Lock()
+				mm.l2PmToAdd = mm.removeIfFoundString(mm.l2PmToAdd, n)
+				mm.onuMetricsManagerLock.Unlock()
+				continue
+			}
+
 			for _, v := range copyOfGemPortInstIDsToAdd {
-				mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteGemPortHistoryME(
-					ctx, ConstDefaultOmciTimeout, true, true, mm.pAdaptFsm.commChan, v)
-				if resp = mm.waitForResponseOrTimeout(ctx, true, v, "GemPortNetworkCtpPerformanceMonitoringHistoryData"); !resp {
-					break inner4
+				_ = mm.updatePmData(ctx, n, v, cPmAdd) // TODO: ignore error for now
+			inner4:
+				// retry L2PmCreateAttempts times to create the instance of PM; resend the create request on each attempt
+				for cnt = 0; cnt < L2PmCreateAttempts; cnt++ {
+					mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteGemPortHistoryME(
+						ctx, ConstDefaultOmciTimeout, true, true, mm.pAdaptFsm.commChan, v)
+					if resp = mm.waitForResponseOrTimeout(ctx, true, v, "GemPortNetworkCtpPerformanceMonitoringHistoryData"); resp {
+						atLeastOneSuccess = true
+						_ = mm.updatePmData(ctx, n, v, cPmAdded) // TODO: ignore error for now
+						break inner4
+					}
 				}
-				mm.onuMetricsManagerLock.Lock()
-				mm.gemPortNCTPPerfHistInstActive = mm.appendIfMissingUnt16(mm.gemPortNCTPPerfHistInstActive, v)
-				mm.onuMetricsManagerLock.Unlock()
+				if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
+					_ = mm.updatePmData(ctx, n, v, cPmRemoved) // TODO: ignore error for now
+				}
 			}
 
 		default:
 			logger.Errorw(ctx, "unsupported l2 pm", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "name": n})
 		}
-		// On success Update the local list maintained for active PMs and PMs to add
-		if resp {
+		// If at least one instance of the PM ME was created successfully, update the local lists maintained for active PMs and PMs to add
+		if atLeastOneSuccess {
 			mm.onuMetricsManagerLock.Lock()
 			mm.activeL2Pms = mm.appendIfMissingString(mm.activeL2Pms, n)
 			mm.l2PmToAdd = mm.removeIfFoundString(mm.l2PmToAdd, n)
 			logger.Debugw(ctx, "success-resp", log.Fields{"pm-name": n, "active-l2-pms": mm.activeL2Pms, "pms-to-add": mm.l2PmToAdd})
 			mm.onuMetricsManagerLock.Unlock()
 		} else {
-			// If createRetryAttempts exceeds L2PmCreateAttempts then locally disable the PM
+			// If we are here then no instance of the given PM ME was created successfully, so locally disable the PM
 			// and also remove it from l2PmToAdd slice so that we do not try to create the PM ME anymore
 			mm.onuMetricsManagerLock.Lock()
-			mm.groupMetricMap[n].createRetryAttempts++
-			if mm.groupMetricMap[n].createRetryAttempts > L2PmCreateAttempts {
-				logger.Debugw(ctx, "exceeded-max-add-retry-attempts--disabling-group", log.Fields{"groupName": n})
-				mm.groupMetricMap[n].enabled = false
-				mm.groupMetricMap[n].createRetryAttempts = 0 // reset counter
-				mm.l2PmToAdd = mm.removeIfFoundString(mm.l2PmToAdd, n)
+			logger.Debugw(ctx, "exceeded-max-add-retry-attempts--disabling-group", log.Fields{"groupName": n})
+			mm.groupMetricMap[n].enabled = false
+			mm.l2PmToAdd = mm.removeIfFoundString(mm.l2PmToAdd, n)
 
-			}
 			logger.Warnw(ctx, "state create pm - failed to create pm",
 				log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": n,
-					"createRetryAttempts": mm.groupMetricMap[n].createRetryAttempts,
-					"active-l2-pms":       mm.activeL2Pms, "pms-to-add": mm.l2PmToAdd})
+					"active-l2-pms": mm.activeL2Pms, "pms-to-add": mm.l2PmToAdd})
 			mm.onuMetricsManagerLock.Unlock()
 		}
 	}
@@ -1313,6 +1397,7 @@
 	}()
 }
 
+// nolint: gocyclo
 func (mm *onuMetricsManager) l2PmFsmDeletePM(ctx context.Context, e *fsm.Event) {
 	// Copy the l2PmToDelete for which we want to collect the metrics since l2PmToDelete can change dynamically
 	mm.onuMetricsManagerLock.RLock()
@@ -1323,90 +1408,124 @@
 	logger.Debugw(ctx, "state delete pm", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "pms-to-delete": mm.l2PmToDelete})
 	for _, n := range copyOfL2PmToDelete {
 		resp := false
+		cnt := 0
+		atLeastOneDeleteFailure := false
+
+		// mm.groupMetricMap[n].pmMEData.InstancesActive could dynamically change, so make a copy
+		mm.onuMetricsManagerLock.RLock()
+		copyOfEntityIDs := make([]uint16, len(mm.groupMetricMap[n].pmMEData.InstancesActive))
+		_ = copy(copyOfEntityIDs, mm.groupMetricMap[n].pmMEData.InstancesActive)
+		mm.onuMetricsManagerLock.RUnlock()
+
+		if len(copyOfEntityIDs) == 0 {
+			// if there are no entityIDs to remove for the PM ME, just clear the PM name entry from the cache and continue
+			mm.onuMetricsManagerLock.Lock()
+			mm.activeL2Pms = mm.removeIfFoundString(mm.activeL2Pms, n)
+			mm.l2PmToDelete = mm.removeIfFoundString(mm.l2PmToDelete, n)
+			logger.Debugw(ctx, "success-resp", log.Fields{"pm-name": n, "active-l2-pms": mm.activeL2Pms, "pms-to-delete": mm.l2PmToDelete})
+			mm.onuMetricsManagerLock.Unlock()
+			continue
+		}
+		logger.Debugw(ctx, "entities to delete", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": n, "entityIDs": copyOfEntityIDs})
 		switch n {
 		case EthernetBridgeHistoryName:
-			boolForDirection := make([]bool, 2) // stores true and false to indicate upstream and downstream directions.
-			boolForDirection = append(boolForDirection, true, false)
-			// Create ME twice, one for each direction. Boolean true is used to indicate upstream and false for downstream.
+			boolForDirection := []bool{true, false} // one entry per direction: true for upstream, false for downstream
+			// Delete ME twice, one for each direction. Boolean true is used to indicate upstream and false for downstream.
 			for _, direction := range boolForDirection {
-			inner1:
-				for _, uniPort := range mm.pDeviceHandler.uniEntityMap {
-					// Attach the EthernetFramePerformanceMonitoringHistoryData ME to MacBridgePortConfigData on the UNI port
-					entityID := macBridgePortAniEID + uniPort.entityID
-					mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteEthernetPerformanceMonitoringHistoryME(
-						ctx, ConstDefaultOmciTimeout, true, direction, false, mm.pAdaptFsm.commChan, entityID)
-					if resp = mm.waitForResponseOrTimeout(ctx, false, entityID, "EthernetFramePerformanceMonitoringHistoryData"); !resp {
-						break inner1
+				for _, entityID := range copyOfEntityIDs {
+				inner1:
+					// retry L2PmDeleteAttempts times to delete the instance of PM
+					for cnt = 0; cnt < L2PmDeleteAttempts; cnt++ {
+						mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteEthernetPerformanceMonitoringHistoryME(
+							ctx, ConstDefaultOmciTimeout, true, direction, false, mm.pAdaptFsm.commChan, entityID)
+						_ = mm.updatePmData(ctx, n, entityID, cPmRemove) // TODO: ignore error for now
+						if resp = mm.waitForResponseOrTimeout(ctx, false, entityID, "EthernetFramePerformanceMonitoringHistoryData"); !resp {
+							atLeastOneDeleteFailure = true
+						} else {
+							_ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
+							break inner1
+						}
+					}
+					if cnt == L2PmDeleteAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
+						_ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
 					}
 				}
 			}
 		case EthernetUniHistoryName:
-
-		inner2:
-			for _, uniPort := range mm.pDeviceHandler.uniEntityMap {
-				if uniPort.portType == uniPPTP { // This metric is only applicable for PPTP Uni Type
-					// Attach the EthernetPerformanceMonitoringHistoryData ME to MacBridgePortConfigData on the UNI port
-					entityID := uniPort.entityID
+			for _, entityID := range copyOfEntityIDs {
+			inner2:
+				// retry L2PmDeleteAttempts times to delete the instance of PM
+				for cnt = 0; cnt < L2PmDeleteAttempts; cnt++ {
 					mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteEthernetUniHistoryME(
 						ctx, ConstDefaultOmciTimeout, true, false, mm.pAdaptFsm.commChan, entityID)
 					if resp = mm.waitForResponseOrTimeout(ctx, false, entityID, "EthernetPerformanceMonitoringHistoryData"); !resp {
+						atLeastOneDeleteFailure = true
+					} else {
+						_ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
 						break inner2
 					}
 				}
+				if cnt == L2PmDeleteAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
+					_ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
+				}
 			}
 		case FecHistoryName:
-
-		inner3:
-			for _, anigInstID := range mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.AniGClassID) {
-				// Attach the EthernetPerformanceMonitoringHistoryData ME to MacBridgePortConfigData on the UNI port
-				mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteFecHistoryME(
-					ctx, ConstDefaultOmciTimeout, true, false, mm.pAdaptFsm.commChan, anigInstID)
-				if resp := mm.waitForResponseOrTimeout(ctx, false, anigInstID, "FecPerformanceMonitoringHistoryData"); !resp {
-					break inner3
+			for _, entityID := range copyOfEntityIDs {
+			inner3:
+				// retry L2PmDeleteAttempts times to delete the instance of PM
+				for cnt = 0; cnt < L2PmDeleteAttempts; cnt++ {
+					mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteFecHistoryME(
+						ctx, ConstDefaultOmciTimeout, true, false, mm.pAdaptFsm.commChan, entityID)
+					if resp = mm.waitForResponseOrTimeout(ctx, false, entityID, "FecPerformanceMonitoringHistoryData"); !resp {
+						atLeastOneDeleteFailure = true
+					} else {
+						_ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
+						break inner3
+					}
+				}
+				if cnt == L2PmDeleteAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
+					_ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
 				}
 			}
 		case GemPortHistoryName:
-			mm.onuMetricsManagerLock.RLock()
-			copyOfGemPortInstIDsToDelete := make([]uint16, len(mm.gemPortNCTPPerfHistInstToDelete))
-			_ = copy(copyOfGemPortInstIDsToDelete, mm.gemPortNCTPPerfHistInstToDelete)
-			mm.onuMetricsManagerLock.RUnlock()
-		inner4:
-			for _, v := range copyOfGemPortInstIDsToDelete {
-				mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteGemPortHistoryME(
-					ctx, ConstDefaultOmciTimeout, true, false, mm.pAdaptFsm.commChan, v)
-				if resp = mm.waitForResponseOrTimeout(ctx, false, v, "GemPortNetworkCtpPerformanceMonitoringHistoryData"); !resp {
-					break inner4
+			for _, entityID := range copyOfEntityIDs {
+			inner4:
+				// retry L2PmDeleteAttempts times to delete the instance of PM
+				for cnt = 0; cnt < L2PmDeleteAttempts; cnt++ {
+					mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteGemPortHistoryME(
+						ctx, ConstDefaultOmciTimeout, true, false, mm.pAdaptFsm.commChan, entityID)
+					if resp = mm.waitForResponseOrTimeout(ctx, false, entityID, "GemPortNetworkCtpPerformanceMonitoringHistoryData"); !resp {
+						atLeastOneDeleteFailure = true
+					} else {
+						_ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
+						break inner4
+					}
 				}
-				mm.onuMetricsManagerLock.Lock()
-				mm.gemPortNCTPPerfHistInstActive = mm.removeIfFoundUint16(mm.gemPortNCTPPerfHistInstActive, v)
-				mm.onuMetricsManagerLock.Unlock()
+				if cnt == L2PmDeleteAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
+					_ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
+				}
 			}
-
 		default:
 			logger.Errorw(ctx, "unsupported l2 pm", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "name": n})
 		}
-		// On success Update the local list maintained for active PMs and PMs to delete
-		if resp {
+		// If we could not completely clean up the PM ME then just give up.
+		if atLeastOneDeleteFailure {
+			logger.Warnw(ctx, "state delete pm - failed to delete at least one instance of the PM ME",
+				log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": n,
+					"active-l2-pms": mm.activeL2Pms, "pms-to-delete": mm.l2PmToDelete})
+			mm.onuMetricsManagerLock.Lock()
+			logger.Debugw(ctx, "exceeded-max-delete-retry-attempts--disabling-group", log.Fields{"groupName": n})
+			mm.activeL2Pms = mm.removeIfFoundString(mm.activeL2Pms, n)
+			mm.l2PmToDelete = mm.removeIfFoundString(mm.l2PmToDelete, n)
+			mm.groupMetricMap[n].enabled = false
+			mm.onuMetricsManagerLock.Unlock()
+		} else { // success case
 			mm.onuMetricsManagerLock.Lock()
 			mm.activeL2Pms = mm.removeIfFoundString(mm.activeL2Pms, n)
 			mm.l2PmToDelete = mm.removeIfFoundString(mm.l2PmToDelete, n)
 			logger.Debugw(ctx, "success-resp", log.Fields{"pm-name": n, "active-l2-pms": mm.activeL2Pms, "pms-to-delete": mm.l2PmToDelete})
 			mm.onuMetricsManagerLock.Unlock()
-		} else {
-			logger.Warnw(ctx, "state delete pm - failed to delete pm",
-				log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": n,
-					"deleteRetryAttempts": mm.groupMetricMap[n].deleteRetryAttempts,
-					"active-l2-pms":       mm.activeL2Pms, "pms-to-delete": mm.l2PmToDelete})
-			// If deleteRetryAttempts exceeds L2PmDeleteAttempts then just give up
-			mm.onuMetricsManagerLock.Lock()
-			mm.groupMetricMap[n].deleteRetryAttempts++
-			if mm.groupMetricMap[n].deleteRetryAttempts > L2PmDeleteAttempts {
-				logger.Debugw(ctx, "exceeded-max-delete-retry-attempts--disabling-group", log.Fields{"groupName": n})
-				mm.activeL2Pms = mm.removeIfFoundString(mm.activeL2Pms, n)
-				mm.l2PmToDelete = mm.removeIfFoundString(mm.l2PmToDelete, n)
-				mm.groupMetricMap[n].deleteRetryAttempts = 0 // reset counter
-			}
-			mm.onuMetricsManagerLock.Unlock()
 		}
 	}
 	logger.Debugw(ctx, "state delete pm - done", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "active-l2-pms": mm.activeL2Pms, "pms-to-delete": mm.l2PmToDelete})
@@ -1608,6 +1727,10 @@
 		// the attributes in multiple iterations for a given L2 PM ME as there is a limit on the max OMCI GET payload size.
 		if _, ok := ethPMHistData[k]; !ok {
 			switch k {
+			case "entity_id":
+				if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
+					ethPMHistData[k] = float32(val.(uint16))
+				}
 			case "drop_events":
 				if val, ok := meAttributes["DropEvents"]; ok && val != nil {
 					ethPMHistData[k] = float32(val.(uint32))
@@ -1700,6 +1823,10 @@
 		// the attributes in multiple iterations for a given L2 PM ME as there is a limit on the max OMCI GET payload size.
 		if _, ok := ethPMUniHistData[k]; !ok {
 			switch k {
+			case "entity_id":
+				if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
+					ethPMUniHistData[k] = float32(val.(uint16))
+				}
 			case "fcs_errors":
 				if val, ok := meAttributes["FcsErrors"]; ok && val != nil {
 					ethPMUniHistData[k] = float32(val.(uint32))
@@ -1792,6 +1919,10 @@
 		// the attributes in multiple iterations for a given L2 PM ME as there is a limit on the max OMCI GET payload size.
 		if _, ok := fecHistData[k]; !ok {
 			switch k {
+			case "entity_id":
+				if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
+					fecHistData[k] = float32(val.(uint16))
+				}
 			case "corrected_bytes":
 				if val, ok := meAttributes["CorrectedBytes"]; ok && val != nil {
 					fecHistData[k] = float32(val.(uint32))
@@ -1848,6 +1979,10 @@
 		// the attributes in multiple iterations for a given L2 PM ME as there is a limit on the max OMCI GET payload size.
 		if _, ok := gemPortHistData[k]; !ok {
 			switch k {
+			case "entity_id":
+				if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
+					gemPortHistData[k] = float32(val.(uint16))
+				}
 			case "transmitted_gem_frames":
 				if val, ok := meAttributes["TransmittedGemFrames"]; ok && val != nil {
 					gemPortHistData[k] = float32(val.(uint32))
@@ -2082,7 +2217,12 @@
 func (mm *onuMetricsManager) initializeGroupMetric(grpMtrcs map[string]voltha.PmConfig_PmType, grpName string, grpEnabled bool, grpFreq uint32) {
 	var pmConfigSlice []*voltha.PmConfig
 	for k, v := range grpMtrcs {
-		pmConfigSlice = append(pmConfigSlice, &voltha.PmConfig{Name: k, Type: v})
+		pmConfigSlice = append(pmConfigSlice,
+			&voltha.PmConfig{
+				Name:       k,
+				Type:       v,
+				Enabled:    grpEnabled && mm.pDeviceHandler.pOpenOnuAc.metricsEnabled,
+				SampleFreq: grpFreq})
 	}
 	groupMetric := voltha.PmGroupConfig{
 		GroupName: grpName,
@@ -2214,9 +2354,9 @@
 	mm.onuMetricsManagerLock.Lock()
 	defer mm.onuMetricsManagerLock.Unlock()
 	// mark the instance for addition
-	mm.gemPortNCTPPerfHistInstToAdd = mm.appendIfMissingUnt16(mm.gemPortNCTPPerfHistInstToAdd, gemPortNTPInstID)
+	mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd = mm.appendIfMissingUnt16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, gemPortNTPInstID)
-	// If the instance presence toggles too soon, we need to remove it from gemPortNCTPPerfHistInstToDelete slice
+	// If the instance presence toggles too soon, we need to remove it from the InstancesToDelete slice
-	mm.gemPortNCTPPerfHistInstToDelete = mm.removeIfFoundUint16(mm.gemPortNCTPPerfHistInstToDelete, gemPortNTPInstID)
+	mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete = mm.removeIfFoundUint16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete, gemPortNTPInstID)
 
 	mm.l2PmToAdd = mm.appendIfMissingString(mm.l2PmToAdd, GemPortHistoryName)
 	// We do not need to remove from l2PmToDelete slice as we could have Add and Delete of
@@ -2228,9 +2368,9 @@
 func (mm *onuMetricsManager) RemoveGemPortForPerfMonitoring(gemPortNTPInstID uint16) {
 	mm.onuMetricsManagerLock.Lock()
 	defer mm.onuMetricsManagerLock.Unlock()
-	mm.gemPortNCTPPerfHistInstToDelete = mm.appendIfMissingUnt16(mm.gemPortNCTPPerfHistInstToDelete, gemPortNTPInstID)
+	mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete = mm.appendIfMissingUnt16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete, gemPortNTPInstID)
-	// If the instance presence toggles too soon, we need to remove it from gemPortNCTPPerfHistInstToAdd slice
+	// If the instance presence toggles too soon, we need to remove it from the InstancesToAdd slice
-	mm.gemPortNCTPPerfHistInstToAdd = mm.removeIfFoundUint16(mm.gemPortNCTPPerfHistInstToAdd, gemPortNTPInstID)
+	mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd = mm.removeIfFoundUint16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, gemPortNTPInstID)
 
 	mm.l2PmToDelete = mm.appendIfMissingString(mm.l2PmToDelete, GemPortHistoryName)
 	// We do not need to remove from l2PmToAdd slice as we could have Add and Delete of
@@ -2245,12 +2385,12 @@
 		// NOTE: It is expected that caller of this function has acquired the required mutex for synchronization purposes
 		for _, v := range gemPortInstIDs {
 			// mark the instance for addition
-			mm.gemPortNCTPPerfHistInstToAdd = mm.appendIfMissingUnt16(mm.gemPortNCTPPerfHistInstToAdd, v)
+			mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd = mm.appendIfMissingUnt16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, v)
-			// If the instance presence toggles too soon, we need to remove it from gemPortNCTPPerfHistInstToDelete slice
+			// If the instance presence toggles too soon, we need to remove it from the InstancesToDelete slice
-			mm.gemPortNCTPPerfHistInstToDelete = mm.removeIfFoundUint16(mm.gemPortNCTPPerfHistInstToDelete, v)
+			mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete = mm.removeIfFoundUint16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete, v)
 		}
 		logger.Debugw(ctx, "updateGemPortNTPInstanceToAddForPerfMonitoring",
-			log.Fields{"deviceID": mm.pDeviceHandler.deviceID, "gemToAdd": mm.gemPortNCTPPerfHistInstToAdd, "gemToDel": mm.gemPortNCTPPerfHistInstToDelete})
+			log.Fields{"deviceID": mm.pDeviceHandler.deviceID, "gemToAdd": mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, "gemToDel": mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete})
 	}
 }
 
@@ -2259,13 +2399,185 @@
 		gemPortInstIDs := mm.pDeviceHandler.pOnuTP.GetAllBidirectionalGemPortIDsForOnu()
 		// NOTE: It is expected that caller of this function has acquired the required mutex for synchronization purposes
 		for _, v := range gemPortInstIDs {
-			mm.gemPortNCTPPerfHistInstToDelete = mm.appendIfMissingUnt16(mm.gemPortNCTPPerfHistInstToDelete, v)
+			mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete = mm.appendIfMissingUnt16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete, v)
-			// If the instance presence toggles too soon, we need to remove it from gemPortNCTPPerfHistInstToAdd slice
+			// If the instance presence toggles too soon, we need to remove it from the InstancesToAdd slice
-			mm.gemPortNCTPPerfHistInstToAdd = mm.removeIfFoundUint16(mm.gemPortNCTPPerfHistInstToAdd, v)
+			mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd = mm.removeIfFoundUint16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, v)
 		}
 	}
 	logger.Debugw(ctx, "updateGemPortNTPInstanceToDeleteForPerfMonitoring",
-		log.Fields{"deviceID": mm.pDeviceHandler.deviceID, "gemToAdd": mm.gemPortNCTPPerfHistInstToAdd, "gemToDel": mm.gemPortNCTPPerfHistInstToDelete})
+		log.Fields{"deviceID": mm.pDeviceHandler.deviceID, "gemToAdd": mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, "gemToDel": mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete})
+}
+
+// restorePmData restores any PM data available on the KV store to local cache
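+// It is invoked from the L2 PM FSM "starting" state handler (l2PMFsmStarting) so that the
+// per-group ME instance lists survive an adapter restart.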
+func (mm *onuMetricsManager) restorePmData(ctx context.Context) error {
+	logger.Debugw(ctx, "restorePmData - start", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
+	if mm.pmKvStore == nil {
+		logger.Errorw(ctx, "pmKvStore not set - abort", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
+		return fmt.Errorf(fmt.Sprintf("pmKvStore-not-set-abort-%s", mm.pDeviceHandler.deviceID))
+	}
+	var errorsList []error
+	for groupName, group := range mm.groupMetricMap {
+		group.pmMEData = &pmMEData{}
+		Value, err := mm.pmKvStore.Get(ctx, groupName)
+		if err == nil {
+			if Value != nil {
+				logger.Debugw(ctx, "PM data read",
+					log.Fields{"Key": Value.Key, "device-id": mm.pDeviceHandler.deviceID})
+				tmpBytes, _ := kvstore.ToByte(Value.Value)
+
+				if err = json.Unmarshal(tmpBytes, &group.pmMEData); err != nil {
+					logger.Errorw(ctx, "unable to unmarshal PM data", log.Fields{"error": err, "device-id": mm.pDeviceHandler.deviceID})
+					errorsList = append(errorsList, fmt.Errorf("unable-to-unmarshal-PM-data-%s-for-group-%s", mm.pDeviceHandler.deviceID, groupName))
+					continue
+				}
+				logger.Debugw(ctx, "restorePmData - success", log.Fields{"pmData": group.pmMEData, "groupName": groupName, "device-id": mm.pDeviceHandler.deviceID})
+			} else {
+				logger.Debugw(ctx, "no PM data found", log.Fields{"groupName": groupName, "device-id": mm.pDeviceHandler.deviceID})
+				continue
+			}
+		} else {
+			logger.Errorw(ctx, "restorePmData - fail", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": groupName, "err": err})
+			errorsList = append(errorsList, fmt.Errorf("unable-to-read-from-KVstore-%s-for-group-%s", mm.pDeviceHandler.deviceID, groupName))
+			continue
+		}
+	}
+	if len(errorsList) > 0 {
+		return fmt.Errorf("errors-restoring-pm-data-for-one-or-more-groups--errors:%v", errorsList)
+	}
+	logger.Debugw(ctx, "restorePmData - complete success", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
+	return nil
+}
+
+// getPmData gets pmMEData from the cache. Since the pmMEData cache is write-through, the data should
+// normally be available in the cache; if it is not, it is fetched from the KV store as a fallback.
+// Note: it is expected that the caller of this function manages the required synchronization (e.g., using locks).
+func (mm *onuMetricsManager) getPmData(ctx context.Context, groupName string) (*pmMEData, error) {
+	if grp, ok := mm.groupMetricMap[groupName]; ok {
+		return grp.pmMEData, nil
+	}
+	// Data not in cache, try to fetch from kv store.
+	data := &pmMEData{}
+	if mm.pmKvStore == nil {
+		logger.Errorw(ctx, "pmKvStore not set - abort", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
+		return data, fmt.Errorf("pmKvStore not set. device-id - %s", mm.pDeviceHandler.deviceID)
+	}
+	Value, err := mm.pmKvStore.Get(ctx, groupName)
+	if err == nil {
+		if Value != nil {
+			logger.Debugw(ctx, "PM data read",
+				log.Fields{"Key": Value.Key, "device-id": mm.pDeviceHandler.deviceID})
+			tmpBytes, _ := kvstore.ToByte(Value.Value)
+
+			if err = json.Unmarshal(tmpBytes, data); err != nil {
+				logger.Errorw(ctx, "unable to unmarshal PM data", log.Fields{"error": err, "device-id": mm.pDeviceHandler.deviceID})
+				return data, err
+			}
+			logger.Debugw(ctx, "PM data", log.Fields{"pmData": data, "groupName": groupName, "device-id": mm.pDeviceHandler.deviceID})
+		} else {
+			logger.Debugw(ctx, "no PM data found", log.Fields{"groupName": groupName, "device-id": mm.pDeviceHandler.deviceID})
+			return data, err
+		}
+	} else {
+		logger.Errorw(ctx, "unable to read from KVstore", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
+		return data, err
+	}
+
+	return data, nil
+}
+
+// updatePmData updates pmMEData in the store. The cache is write-through, i.e., the local cache is updated first and then the KV store.
+func (mm *onuMetricsManager) updatePmData(ctx context.Context, groupName string, meInstanceID uint16, pmAction string) error {
+	logger.Debugw(ctx, "updatePmData - start", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": groupName, "entityID": meInstanceID, "pmAction": pmAction})
+	mm.onuMetricsManagerLock.Lock()
+	defer mm.onuMetricsManagerLock.Unlock()
+
+	if mm.pmKvStore == nil {
+		logger.Errorw(ctx, "pmKvStore not set - abort", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
+		return fmt.Errorf(fmt.Sprintf("pmKvStore-not-set-abort-%s", mm.pDeviceHandler.deviceID))
+	}
+
+	pmMEData, err := mm.getPmData(ctx, groupName)
+	if err != nil || pmMEData == nil {
+		// error already logged in called function.
+		return err
+	}
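+	// The requested action moves the ME instance between the three persisted lists:
+	//   cPmAdd:     InstancesToAdd    += id (removed from the other two lists)
+	//   cPmAdded:   InstancesActive   += id (removed from the other two lists)
+	//   cPmRemove:  InstancesToDelete += id (removed from the other two lists)
+	//   cPmRemoved: removed from all three lists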
+	switch pmAction {
+	case cPmAdd:
+		pmMEData.InstancesToAdd = mm.appendIfMissingUnt16(pmMEData.InstancesToAdd, meInstanceID)
+		pmMEData.InstancesToDelete = mm.removeIfFoundUint16(pmMEData.InstancesToDelete, meInstanceID)
+		pmMEData.InstancesActive = mm.removeIfFoundUint16(pmMEData.InstancesActive, meInstanceID)
+	case cPmAdded:
+		pmMEData.InstancesActive = mm.appendIfMissingUnt16(pmMEData.InstancesActive, meInstanceID)
+		pmMEData.InstancesToAdd = mm.removeIfFoundUint16(pmMEData.InstancesToAdd, meInstanceID)
+		pmMEData.InstancesToDelete = mm.removeIfFoundUint16(pmMEData.InstancesToDelete, meInstanceID)
+	case cPmRemove:
+		pmMEData.InstancesToDelete = mm.appendIfMissingUnt16(pmMEData.InstancesToDelete, meInstanceID)
+		pmMEData.InstancesToAdd = mm.removeIfFoundUint16(pmMEData.InstancesToAdd, meInstanceID)
+		pmMEData.InstancesActive = mm.removeIfFoundUint16(pmMEData.InstancesActive, meInstanceID)
+	case cPmRemoved:
+		pmMEData.InstancesToDelete = mm.removeIfFoundUint16(pmMEData.InstancesToDelete, meInstanceID)
+		pmMEData.InstancesToAdd = mm.removeIfFoundUint16(pmMEData.InstancesToAdd, meInstanceID)
+		pmMEData.InstancesActive = mm.removeIfFoundUint16(pmMEData.InstancesActive, meInstanceID)
+	default:
+		logger.Errorw(ctx, "unknown pm action", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "pmAction": pmAction, "groupName": groupName})
+		return fmt.Errorf(fmt.Sprintf("unknown-pm-action-deviceid-%s-groupName-%s-pmaction-%s", mm.pDeviceHandler.deviceID, groupName, pmAction))
+	}
+	// write through cache
+	mm.groupMetricMap[groupName].pmMEData = pmMEData
+
+	Value, err := json.Marshal(*pmMEData)
+	if err != nil {
+		logger.Errorw(ctx, "unable to marshal PM data", log.Fields{"groupName": groupName, "pmAction": pmAction, "pmData": *pmMEData, "err": err})
+		return err
+	}
+	// Update back to kv store
+	if err = mm.pmKvStore.Put(ctx, groupName, Value); err != nil {
+		logger.Errorw(ctx, "unable to put PM data to kv store", log.Fields{"groupName": groupName, "pmData": *pmMEData, "pmAction": pmAction, "err": err})
+		return err
+	}
+	logger.Debugw(ctx, "updatePmData - success", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": groupName, "pmData": *pmMEData, "pmAction": pmAction})
+
+	return nil
+}
+
+// clearPmGroupData clears PM group data from the KV store
+func (mm *onuMetricsManager) clearPmGroupData(ctx context.Context) error {
+	mm.onuMetricsManagerLock.Lock()
+	defer mm.onuMetricsManagerLock.Unlock()
+	logger.Debugw(ctx, "clearPmGroupData - start", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
+	if mm.pmKvStore == nil {
+		logger.Errorw(ctx, "pmKvStore not set - abort", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
+		return fmt.Errorf(fmt.Sprintf("pmKvStore-not-set-abort-%s", mm.pDeviceHandler.deviceID))
+	}
+
+	for n := range mm.groupMetricMap {
+		if err := mm.pmKvStore.Delete(ctx, n); err != nil {
+			logger.Errorw(ctx, "clearPmGroupData - fail", log.Fields{"deviceID": mm.pDeviceHandler.deviceID, "groupName": n, "err": err})
+			// do not abort this procedure. continue to delete next group.
+		} else {
+			logger.Debugw(ctx, "clearPmGroupData - success", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": n})
+		}
+	}
+
+	return nil
+}
+
+// clearAllPmData clears all PM data associated with the device from the KV store
+func (mm *onuMetricsManager) clearAllPmData(ctx context.Context) error {
+	mm.onuMetricsManagerLock.Lock()
+	defer mm.onuMetricsManagerLock.Unlock()
+	logger.Debugw(ctx, "clearAllPmData - start", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
+	if mm.pmKvStore == nil {
+		logger.Errorw(ctx, "pmKvStore not set - abort", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
+		return fmt.Errorf(fmt.Sprintf("pmKvStore-not-set-abort-%s", mm.pDeviceHandler.deviceID))
+	}
+
+	if err := mm.pmKvStore.Delete(ctx, ""); err != nil {
+		logger.Errorw(ctx, "unable to delete PM data from kv store", log.Fields{"deviceID": mm.pDeviceHandler.deviceID, "err": err})
+		return err
+	}
+	logger.Debugw(ctx, "clearAllPmData - success", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
+	return nil
 }
 
 func (mm *onuMetricsManager) appendIfMissingString(slice []string, n string) []string {