VOL-4200 suport reset of extended pm counters VOL-4270 fetching counters per uni

Change-Id: Icd1c507368e4097b03a34231d47f62218a790a16
diff --git a/internal/pkg/pmmgr/onu_metrics_manager.go b/internal/pkg/pmmgr/onu_metrics_manager.go
index 95fcd60..f1abbe4 100755
--- a/internal/pkg/pmmgr/onu_metrics_manager.go
+++ b/internal/pkg/pmmgr/onu_metrics_manager.go
@@ -301,13 +301,13 @@
 	pOnuDeviceEntry cmn.IonuDeviceEntry
 	PAdaptFsm       *cmn.AdapterFsm
 
-	opticalMetricsChan                   chan me.AttributeValueMap
-	uniStatusMetricsChan                 chan me.AttributeValueMap
-	l2PmChan                             chan me.AttributeValueMap
-	extendedPmMeChan                     chan me.AttributeValueMap
-	syncTimeResponseChan                 chan bool       // true is success, false is fail
-	l2PmCreateOrDeleteResponseChan       chan bool       // true is success, false is fail
-	extendedPMCreateOrDeleteResponseChan chan me.Results // true is sucesss, false is fail
+	opticalMetricsChan             chan me.AttributeValueMap
+	uniStatusMetricsChan           chan me.AttributeValueMap
+	l2PmChan                       chan me.AttributeValueMap
+	extendedPmMeChan               chan me.AttributeValueMap
+	syncTimeResponseChan           chan bool       // true is success, false is fail
+	l2PmCreateOrDeleteResponseChan chan bool       // true is success, false is fail
+	extendedPMMeResponseChan       chan me.Results // true is sucesss, false is fail
 
 	activeL2Pms  []string // list of active l2 pm MEs created on the ONU.
 	l2PmToDelete []string // list of L2 PMs to delete
@@ -334,6 +334,7 @@
 	extPmKvStore                                  *db.Backend
 	onuEthernetFrameExtendedPmLock                sync.RWMutex
 	isDeviceReadyToCollectExtendedPmStats         bool
+	isEthernetFrameExtendedPmOperationOngoing     bool
 }
 
 // NewOnuMetricsManager returns a new instance of the NewOnuMetricsManager
@@ -359,7 +360,7 @@
 
 	metricsManager.syncTimeResponseChan = make(chan bool)
 	metricsManager.l2PmCreateOrDeleteResponseChan = make(chan bool)
-	metricsManager.extendedPMCreateOrDeleteResponseChan = make(chan me.Results)
+	metricsManager.extendedPMMeResponseChan = make(chan me.Results)
 
 	metricsManager.StopProcessingOmciResponses = make(chan bool)
 	metricsManager.StopTicks = make(chan bool)
@@ -1078,6 +1079,8 @@
 		_ = mm.handleOmciDeleteResponseMessage(ctx, msg)
 	case omci.GetCurrentDataResponseType:
 		_ = mm.handleOmciGetCurrentDataResponseMessage(ctx, msg)
+	case omci.SetResponseType:
+		_ = mm.handleOmciSetResponseMessage(ctx, msg)
 	default:
 		logger.Warnw(ctx, "Unknown Message Type", log.Fields{"msgType": msg.OmciMsg.MessageType})
 
@@ -1211,6 +1214,30 @@
 	return fmt.Errorf("unhandled-omci-synchronize-time-response-message--error-code-%v", msgObj.Result)
 }
 
+func (mm *OnuMetricsManager) handleOmciSetResponseMessage(ctx context.Context, msg cmn.OmciMessage) error {
+	msgLayer := (*msg.OmciPacket).Layer(omci.LayerTypeSetResponse)
+	if msgLayer == nil {
+		logger.Errorw(ctx, "omci Msg layer could not be detected for SetResponse - handling stopped", log.Fields{"device-id": mm.deviceID})
+		return fmt.Errorf("omci Msg layer could not be detected for SetResponse - handling stopped: %s", mm.deviceID)
+	}
+	msgObj, msgOk := msgLayer.(*omci.SetResponse)
+	if !msgOk {
+		logger.Errorw(ctx, "omci Msg layer could not be assigned for SetResponse - handling stopped", log.Fields{"device-id": mm.deviceID})
+		return fmt.Errorf("omci Msg layer could not be assigned for SetResponse - handling stopped: %s", mm.deviceID)
+	}
+	logger.Debugw(ctx, "OMCI SetResponse Data", log.Fields{"device-id": mm.deviceID, "data-fields": msgObj, "result": msgObj.Result})
+	switch msgObj.EntityClass {
+	case me.EthernetFrameExtendedPmClassID,
+		me.EthernetFrameExtendedPm64BitClassID:
+		mm.extendedPMMeResponseChan <- msgObj.Result
+		return nil
+	default:
+		logger.Errorw(ctx, "unhandled omci set response message",
+			log.Fields{"device-id": mm.deviceID, "class-id": msgObj.EntityClass})
+	}
+	return fmt.Errorf("unhandled-omci-set-response-message-%v", mm.deviceID)
+}
+
 // flushMetricCollectionChannels flushes all metric collection channels for any stale OMCI responses
 func (mm *OnuMetricsManager) flushMetricCollectionChannels(ctx context.Context) {
 	// flush commMetricsChan
@@ -2333,7 +2360,7 @@
 		return nil
 	case me.EthernetFrameExtendedPmClassID,
 		me.EthernetFrameExtendedPm64BitClassID:
-		mm.extendedPMCreateOrDeleteResponseChan <- msgObj.Result
+		mm.extendedPMMeResponseChan <- msgObj.Result
 		return nil
 	default:
 		logger.Errorw(ctx, "unhandled omci create response message",
@@ -3011,7 +3038,7 @@
 func (mm *OnuMetricsManager) waitForEthernetFrameCreateOrDeleteResponseOrTimeout(ctx context.Context, create bool, instID uint16, meClassID me.ClassID, upstream bool) (bool, error) {
 	logger.Debugw(ctx, "wait-for-ethernet-frame-create-or-delete-response-or-timeout", log.Fields{"create": create, "instID": instID, "meClassID": meClassID})
 	select {
-	case resp := <-mm.extendedPMCreateOrDeleteResponseChan:
+	case resp := <-mm.extendedPMMeResponseChan:
 		logger.Debugw(ctx, "received-extended-pm-me-response",
 			log.Fields{"device-id": mm.deviceID, "resp": resp, "create": create, "meClassID": meClassID, "instID": instID, "upstream": upstream})
 		// If the result is me.InstanceExists it means the entity was already created. It is ok handled that as success
@@ -3042,9 +3069,9 @@
 			} else {
 				entityID = uniPort.EntityID
 			}
-
+			logger.Debugw(ctx, "try-creating-extended-pm-me", log.Fields{"device-id": mm.deviceID, "entityID": entityID})
 			// parent entity id will be same for both direction
-			controlBlock := mm.getControlBlockForExtendedPMDirection(ctx, direction, uniPort.EntityID)
+			controlBlock := mm.getControlBlockForExtendedPMDirection(ctx, direction, uniPort.EntityID, false)
 
 		inner1:
 			// retry ExtendedPmCreateAttempts times to create the instance of PM
@@ -3057,7 +3084,8 @@
 						log.Fields{"device-id": mm.deviceID})
 					return false, err
 				}
-				if supported, err := mm.waitForEthernetFrameCreateOrDeleteResponseOrTimeout(ctx, true, entityID, meType, direction); err == nil && supported {
+				if supported, err := mm.waitForEthernetFrameCreateOrDeleteResponseOrTimeout(ctx, true, entityID,
+					meType, direction); err == nil && supported {
 					if direction {
 						mm.ethernetFrameExtendedPmUpStreamMEByEntityID[entityID] = meEnt
 					} else {
@@ -3104,7 +3132,7 @@
 	mm.onuEthernetFrameExtendedPmLock.Unlock()
 }
 
-// CreateEthernetFrameExtendedPMME - TODO: add comment
+// CreateEthernetFrameExtendedPMME - This method tries to create the possible me type for extended pms
 func (mm *OnuMetricsManager) CreateEthernetFrameExtendedPMME(ctx context.Context) {
 	//get the type of extended frame pm me supported by onu first
 	exist, err := mm.getEthernetFrameExtendedMETypeFromKvStore(ctx)
@@ -3145,9 +3173,105 @@
 	}
 }
 
-// CollectEthernetFrameExtendedPMCounters - TODO: add comment
-func (mm *OnuMetricsManager) CollectEthernetFrameExtendedPMCounters(ctx context.Context) *extension.SingleGetValueResponse {
-	errFunc := func(reason extension.GetValueResponse_ErrorReason) *extension.SingleGetValueResponse {
+func (mm *OnuMetricsManager) setControlBlockResetFlagForEthernetExtendedPMME(ctx context.Context, upstream bool,
+	entityID uint16, meName string, reset bool) (extension.GetValueResponse_ErrorReason, error) {
+	uniPortEntityID := entityID
+	if upstream {
+		uniPortEntityID = entityID - 0x100
+	}
+	controlBlock := mm.getControlBlockForExtendedPMDirection(ctx, upstream, uniPortEntityID, reset)
+	_, err := mm.pOnuDeviceEntry.GetDevOmciCC().SendSetEthernetFrameExtendedPMME(ctx,
+		mm.pDeviceHandler.GetOmciTimeout(), true,
+		mm.PAdaptFsm.CommChan, entityID, mm.supportedEthernetFrameExtendedPMClass, controlBlock)
+	if err != nil {
+		logger.Errorw(ctx, "EthernetFrameExtendedPMME-set-reset-bit-failed",
+			log.Fields{"device-id": mm.deviceID})
+		return extension.GetValueResponse_INTERNAL_ERROR, err
+	}
+
+	if resp := mm.waitForResetResponseOrTimeout(ctx, entityID, meName); resp {
+		return extension.GetValueResponse_REASON_UNDEFINED, nil
+	}
+	return extension.GetValueResponse_INTERNAL_ERROR, fmt.Errorf("unable-to-reset-pm-counters")
+}
+
+func (mm *OnuMetricsManager) waitForResetResponseOrTimeout(ctx context.Context, instID uint16, meClassName string) bool {
+	logger.Debugw(ctx, "wait-for-ethernet-frame-reset-counters-response-or-timeout", log.Fields{"instID": instID, "meClassName": meClassName})
+	select {
+	case resp := <-mm.extendedPMMeResponseChan:
+		logger.Debugw(ctx, "received-extended-pm-me-reset-response",
+			log.Fields{"device-id": mm.deviceID, "resp": resp, "meClassName": meClassName, "instID": instID})
+		if resp == me.Success {
+			return true
+		}
+		return false
+	case <-time.After(mm.pOnuDeviceEntry.GetDevOmciCC().GetMaxOmciTimeoutWithRetries() * time.Second):
+		logger.Errorw(ctx, "timeout-waiting-for-ext-pm-me-reset-response",
+			log.Fields{"device-id": mm.deviceID, "resp": false, "meClassName": meClassName, "instID": instID})
+	}
+	return false
+}
+
+func (mm *OnuMetricsManager) resetEthernetFrameExtendedPMCounters(ctx context.Context,
+	upstreamEntityMap map[uint16]*me.ManagedEntity, downstreamEntityMap map[uint16]*me.ManagedEntity) (extension.GetValueResponse_ErrorReason, error) {
+	className := "EthernetFrameExtendedPm64Bit"
+	if mm.supportedEthernetFrameExtendedPMClass == me.EthernetFrameExtendedPmClassID {
+		className = "EthernetFrameExtendedPm"
+	}
+	// Reset the counters if option is specified
+	for entityID := range upstreamEntityMap {
+		errReason, err := mm.setControlBlockResetFlagForEthernetExtendedPMME(ctx, true, entityID, className,
+			true)
+		if err != nil {
+			return errReason, err
+		}
+	}
+
+	for entityID := range downstreamEntityMap {
+		errReason, err := mm.setControlBlockResetFlagForEthernetExtendedPMME(ctx, false, entityID, className,
+			true)
+		if err != nil {
+			return errReason, err
+		}
+	}
+	// This is currently done as a workaround for sercomm glasfaser onu as the reset bit is not getting cleared by the
+	// device itself.
+	// Unset the reset bit if option is specified
+	for entityID := range upstreamEntityMap {
+		errReason, err := mm.setControlBlockResetFlagForEthernetExtendedPMME(ctx, true, entityID, className,
+			false)
+		if err != nil {
+			return errReason, err
+		}
+	}
+
+	for entityID := range downstreamEntityMap {
+		errReason, err := mm.setControlBlockResetFlagForEthernetExtendedPMME(ctx, false, entityID, className,
+			false)
+		if err != nil {
+			return errReason, err
+		}
+	}
+	return extension.GetValueResponse_REASON_UNDEFINED, nil
+}
+
+func (mm *OnuMetricsManager) setEthernetFrameExtendedPmCounterOperationFlag(val bool) {
+	mm.onuEthernetFrameExtendedPmLock.Lock()
+	defer mm.onuEthernetFrameExtendedPmLock.Unlock()
+	mm.isEthernetFrameExtendedPmOperationOngoing = val
+}
+
+func (mm *OnuMetricsManager) getEthernetFrameExtendedPmCounterOperationFlag() bool {
+	mm.onuEthernetFrameExtendedPmLock.Lock()
+	defer mm.onuEthernetFrameExtendedPmLock.Unlock()
+	return mm.isEthernetFrameExtendedPmOperationOngoing
+}
+
+// CollectEthernetFrameExtendedPMCounters - This method collects the ethernet frame extended pm counters from the device
+func (mm *OnuMetricsManager) CollectEthernetFrameExtendedPMCounters(ctx context.Context,
+	onuInfo *extension.GetOmciEthernetFrameExtendedPmRequest) *extension.SingleGetValueResponse {
+	errFunc := func(reason extension.GetValueResponse_ErrorReason, err string) *extension.SingleGetValueResponse {
+		logger.Error(ctx, err)
 		return &extension.SingleGetValueResponse{
 			Response: &extension.GetValueResponse{
 				Status:    extension.GetValueResponse_ERROR,
@@ -3158,9 +3282,54 @@
 	mm.onuEthernetFrameExtendedPmLock.RLock()
 	if !mm.isDeviceReadyToCollectExtendedPmStats {
 		mm.onuEthernetFrameExtendedPmLock.RUnlock()
-		return errFunc(extension.GetValueResponse_INTERNAL_ERROR)
+		return errFunc(extension.GetValueResponse_INTERNAL_ERROR, fmt.Sprintf("onu-%v-not-ready-to-collect-stats", mm.deviceID))
 	}
 	mm.onuEthernetFrameExtendedPmLock.RUnlock()
+
+	if mm.getEthernetFrameExtendedPmCounterOperationFlag() {
+		return errFunc(extension.GetValueResponse_INTERNAL_ERROR,
+			fmt.Sprintf("extended-pm-reset-or-get-operation-is-still-going-on-for-onu-%v", mm.deviceID))
+	}
+	mm.setEthernetFrameExtendedPmCounterOperationFlag(true)
+	defer mm.setEthernetFrameExtendedPmCounterOperationFlag(false)
+
+	upstreamEntityMap := make(map[uint16]*me.ManagedEntity)
+	downstreamEntityMap := make(map[uint16]*me.ManagedEntity)
+	if onuInfo.IsUniIndex != nil {
+		for _, uniPort := range *mm.pDeviceHandler.GetUniEntityMap() {
+			if uniPort.UniID == uint8(onuInfo.GetUniIndex()) {
+				logger.Debugw(ctx, "mapped-uni-index-to-uni-port", log.Fields{"device-id": mm.deviceID, "uni-index": onuInfo.GetUniIndex()})
+				upstreamEntityMap[uniPort.EntityID+0x100] = mm.ethernetFrameExtendedPmUpStreamMEByEntityID[uniPort.EntityID+0x100]
+				downstreamEntityMap[uniPort.EntityID] = mm.ethernetFrameExtendedPmDownStreamMEByEntityID[uniPort.EntityID]
+				break
+			}
+		}
+		if len(downstreamEntityMap) == 0 {
+			logger.Errorw(ctx, "invalid-uni-index-provided-while-fetching-the-extended-pm",
+				log.Fields{"device-id": mm.deviceID, "uni-index": onuInfo.GetUniIndex()})
+			return errFunc(extension.GetValueResponse_INVALID_REQ_TYPE,
+				fmt.Sprintf("onu-%s-invalid-uni-%v", mm.deviceID, onuInfo.GetUniIndex()))
+		}
+	} else {
+		// make a copy of all downstream and upstream maps in the local ones
+		for entityID, meEnt := range mm.ethernetFrameExtendedPmUpStreamMEByEntityID {
+			upstreamEntityMap[entityID] = meEnt
+		}
+		for entityID, meEnt := range mm.ethernetFrameExtendedPmDownStreamMEByEntityID {
+			downstreamEntityMap[entityID] = meEnt
+		}
+	}
+	logger.Debugw(ctx, "extended-pm-collection-me-count", log.Fields{"device-id": mm.deviceID,
+		"count": len(upstreamEntityMap) + len(downstreamEntityMap)})
+	// Reset the metrics first for all required me's
+	if onuInfo.Reset_ {
+		errReason, err := mm.resetEthernetFrameExtendedPMCounters(ctx, upstreamEntityMap, downstreamEntityMap)
+		if err != nil {
+			logger.Errorw(ctx, "unable-to-reset-ethernet-frame-extended-pm-counters",
+				log.Fields{"device-id": mm.deviceID})
+			return errFunc(errReason, fmt.Sprintf("%v", err.Error()))
+		}
+	}
 	// Collect metrics for upstream for all the PM Mes per uni port and aggregate
 	var pmUpstream extension.OmciEthernetFrameExtendedPm
 	var pmDownstream extension.OmciEthernetFrameExtendedPm
@@ -3168,41 +3337,45 @@
 	if mm.supportedEthernetFrameExtendedPMClass == me.EthernetFrameExtendedPmClassID {
 		counterFormat = extension.GetOmciEthernetFrameExtendedPmResponse_THIRTY_TWO_BIT
 	}
-	for entityID, meEnt := range mm.ethernetFrameExtendedPmUpStreamMEByEntityID {
-		var receivedMask uint16
-		if metricInfo, errResp := mm.collectEthernetFrameExtendedPMData(ctx, meEnt, entityID, true, &receivedMask); metricInfo != nil { // upstream
-			if receivedMask == 0 {
-				pmUpstream = mm.aggregateEthernetFrameExtendedPM(metricInfo, pmUpstream, false)
-				logger.Error(ctx, "all-the-attributes-of-ethernet-frame-extended-pm-counters-are-unsupported")
-				pmDownstream = pmUpstream
-				singleValResp := extension.SingleGetValueResponse{
-					Response: &extension.GetValueResponse{
-						Status: extension.GetValueResponse_OK,
-						Response: &extension.GetValueResponse_OnuCounters{
-							OnuCounters: &extension.GetOmciEthernetFrameExtendedPmResponse{
-								Upstream:                          &pmUpstream,
-								Downstream:                        &pmDownstream,
-								OmciEthernetFrameExtendedPmFormat: counterFormat,
+	if !onuInfo.Reset_ {
+		for entityID, meEnt := range upstreamEntityMap {
+			logger.Debugw(ctx, "collect-upstream-pm-counters-for-entity-id", log.Fields{"device-id": mm.deviceID, "entityID": entityID})
+			var receivedMask uint16
+			if metricInfo, errResp, err := mm.collectEthernetFrameExtendedPMData(ctx, meEnt, entityID, true, &receivedMask); metricInfo != nil { // upstream
+				if receivedMask == 0 {
+					pmUpstream = mm.aggregateEthernetFrameExtendedPM(metricInfo, pmUpstream, false)
+					logger.Error(ctx, "all-the-attributes-of-ethernet-frame-extended-pm-counters-are-unsupported")
+					pmDownstream = pmUpstream
+					singleValResp := extension.SingleGetValueResponse{
+						Response: &extension.GetValueResponse{
+							Status: extension.GetValueResponse_OK,
+							Response: &extension.GetValueResponse_OnuCounters{
+								OnuCounters: &extension.GetOmciEthernetFrameExtendedPmResponse{
+									Upstream:                          &pmUpstream,
+									Downstream:                        &pmDownstream,
+									OmciEthernetFrameExtendedPmFormat: counterFormat,
+								},
 							},
 						},
-					},
+					}
+					return &singleValResp
 				}
-				return &singleValResp
+				// Aggregate the result for upstream
+				pmUpstream = mm.aggregateEthernetFrameExtendedPM(metricInfo, pmUpstream, true)
+			} else {
+				return errFunc(errResp, fmt.Sprintf("%v", err.Error()))
 			}
-			// Aggregate the result for upstream
-			pmUpstream = mm.aggregateEthernetFrameExtendedPM(metricInfo, pmUpstream, true)
-		} else {
-			return errFunc(errResp)
 		}
-	}
 
-	for entityID, meEnt := range mm.ethernetFrameExtendedPmDownStreamMEByEntityID {
-		var receivedMask uint16
-		if metricInfo, errResp := mm.collectEthernetFrameExtendedPMData(ctx, meEnt, entityID, false, &receivedMask); metricInfo != nil { // downstream
-			// Aggregate the result for downstream
-			pmDownstream = mm.aggregateEthernetFrameExtendedPM(metricInfo, pmDownstream, true)
-		} else {
-			return errFunc(errResp)
+		for entityID, meEnt := range downstreamEntityMap {
+			logger.Debugw(ctx, "collect-downstream-pm-counters-for-entity-id", log.Fields{"device-id": mm.deviceID, "entityID": entityID})
+			var receivedMask uint16
+			if metricInfo, errResp, err := mm.collectEthernetFrameExtendedPMData(ctx, meEnt, entityID, false, &receivedMask); metricInfo != nil { // downstream
+				// Aggregate the result for downstream
+				pmDownstream = mm.aggregateEthernetFrameExtendedPM(metricInfo, pmDownstream, true)
+			} else {
+				return errFunc(errResp, fmt.Sprintf("%v", err.Error()))
+			}
 		}
 	}
 	singleValResp := extension.SingleGetValueResponse{
@@ -3220,7 +3393,7 @@
 	return &singleValResp
 }
 
-func (mm *OnuMetricsManager) collectEthernetFrameExtendedPMData(ctx context.Context, meEnt *me.ManagedEntity, entityID uint16, upstream bool, receivedMask *uint16) (map[string]uint64, extension.GetValueResponse_ErrorReason) {
+func (mm *OnuMetricsManager) collectEthernetFrameExtendedPMData(ctx context.Context, meEnt *me.ManagedEntity, entityID uint16, upstream bool, receivedMask *uint16) (map[string]uint64, extension.GetValueResponse_ErrorReason, error) {
 	var classID me.ClassID
 	logger.Debugw(ctx, "collecting-data-for-ethernet-frame-extended-pm", log.Fields{"device-id": mm.deviceID, "entityID": entityID, "upstream": upstream})
 
@@ -3235,7 +3408,7 @@
 		if errResp, err := mm.populateEthernetFrameExtendedPMMetrics(ctx, classID, entityID, mask, ethPMData, upstream, &sumReceivedMask); err != nil {
 			logger.Errorw(ctx, "error-during-metric-collection",
 				log.Fields{"device-id": mm.deviceID, "entityID": entityID, "err": err})
-			return nil, errResp
+			return nil, errResp, err
 		}
 		if (mask == 0x3F00 || mask == 0x3800) && sumReceivedMask == 0 {
 			//It means the first attributes fetch was a failure, hence instead of sending multiple failure get requests
@@ -3245,7 +3418,7 @@
 		}
 	}
 	*receivedMask = sumReceivedMask
-	return ethPMData, extension.GetValueResponse_REASON_UNDEFINED
+	return ethPMData, extension.GetValueResponse_REASON_UNDEFINED, nil
 }
 
 // nolint: gocyclo
@@ -3576,9 +3749,8 @@
 	return receivedMask
 }
 
-func (mm *OnuMetricsManager) aggregateEthernetFrameExtendedPM(pmDataIn map[string]uint64, pmData extension.OmciEthernetFrameExtendedPm, aggregate bool) extension.OmciEthernetFrameExtendedPm {
-	mm.onuEthernetFrameExtendedPmLock.Lock()
-	defer mm.onuEthernetFrameExtendedPmLock.Unlock()
+func (mm *OnuMetricsManager) aggregateEthernetFrameExtendedPM(pmDataIn map[string]uint64,
+	pmData extension.OmciEthernetFrameExtendedPm, aggregate bool) extension.OmciEthernetFrameExtendedPm {
 	errorCounterValue := UnsupportedCounterValue64bit
 	if mm.supportedEthernetFrameExtendedPMClass == me.EthernetFrameExtendedPmClassID {
 		errorCounterValue = UnsupportedCounterValue32bit
@@ -3674,7 +3846,7 @@
 	return pmDataOut
 }
 
-func (mm *OnuMetricsManager) getControlBlockForExtendedPMDirection(ctx context.Context, upstream bool, entityID uint16) []uint16 {
+func (mm *OnuMetricsManager) getControlBlockForExtendedPMDirection(ctx context.Context, upstream bool, entityID uint16, reset bool) []uint16 {
 	controlBlock := make([]uint16, 8)
 	// Control Block First two bytes are for threshold data 1/2 id - does not matter here
 	controlBlock[0] = 0
@@ -3683,7 +3855,11 @@
 	// Next two bytes are for the parent me instance id
 	controlBlock[2] = entityID
 	// Next two bytes are for accumulation disable
-	controlBlock[3] = 0
+	if reset {
+		controlBlock[3] = 1 << 15 //Set the 16th bit of AD to reset the counters.
+	} else {
+		controlBlock[3] = 0
+	}
 	// Next two bytes are for tca disable
 	controlBlock[4] = 0x4000 //tca global disable
 	// Next two bytes are for control fields - bit 1(lsb) as 1 for continuous accumulation and bit 2(0 for upstream)