VOL-4200 support reset of extended pm counters VOL-4270 fetching counters per uni
Change-Id: Icd1c507368e4097b03a34231d47f62218a790a16
diff --git a/internal/pkg/common/omci_cc.go b/internal/pkg/common/omci_cc.go
index d643a9b..9224d61 100755
--- a/internal/pkg/common/omci_cc.go
+++ b/internal/pkg/common/omci_cc.go
@@ -4609,3 +4609,53 @@
func (oo *OmciCC) SetChMonitoredRequest(omciTransID uint16, chVal bool) {
oo.monitoredRequests[omciTransID].chSuccess <- chVal
}
+
+// SendSetEthernetFrameExtendedPMME builds an OMCI set request that carries the given control block
+// for an ethernet frame extended PM ME instance and passes it to Send.
+// The 32-bit ME is generated when classID equals me.EthernetFrameExtendedPmClassID, the 64-bit ME
+// for every other classID. rxChan and oo.receiveOmciResponse are registered as the callback pair
+// under the allocated transaction id.
+// It returns the generated ME instance, or nil and the error of the step (ME generation, encoding,
+// serialization or sending) that failed.
+func (oo *OmciCC) SendSetEthernetFrameExtendedPMME(ctx context.Context, timeout int, highPrio bool,
+	rxChan chan Message, entityID uint16, classID me.ClassID, controlBlock []uint16) (*me.ManagedEntity, error) {
+	tid := oo.GetNextTid(highPrio)
+	// hex representation of the instance id, shared by all log statements below
+	instID := strconv.FormatInt(int64(entityID), 16)
+	logger.Debugw(ctx, "send-set-ethernet-frame-extended-pm-me-control-block:", log.Fields{"device-id": oo.deviceID,
+		"SequNo": strconv.FormatInt(int64(tid), 16), "InstId": instID})
+
+	meParams := me.ParamData{
+		EntityID:   entityID,
+		Attributes: me.AttributeValueMap{"ControlBlock": controlBlock},
+	}
+	var (
+		meInstance *me.ManagedEntity
+		omciErr    me.OmciErrors
+	)
+	switch classID {
+	case me.EthernetFrameExtendedPmClassID:
+		meInstance, omciErr = me.NewEthernetFrameExtendedPm(meParams)
+	default:
+		meInstance, omciErr = me.NewEthernetFrameExtendedPm64Bit(meParams)
+	}
+	if omciErr.GetError() != nil {
+		logger.Errorw(ctx, "cannot-generate-ethernet-frame-extended-pm-me-instance",
+			log.Fields{"Err": omciErr.GetError(), "device-id": oo.deviceID, "inst-id": instID})
+		return nil, omciErr.GetError()
+	}
+
+	omciLayer, msgLayer, err := omci.EncodeFrame(meInstance, omci.SetRequestType, omci.TransactionID(tid))
+	if err != nil {
+		logger.Errorw(ctx, "cannot-encode-ethernet-frame-extended-pm-me",
+			log.Fields{"err": err, "device-id": oo.deviceID, "inst-id": instID})
+		return nil, err
+	}
+	pkt, err := serializeOmciLayer(ctx, omciLayer, msgLayer)
+	if err != nil {
+		logger.Errorw(ctx, "cannot-serialize-ethernet-frame-extended-pm-me-set-msg",
+			log.Fields{"err": err, "device-id": oo.deviceID, "inst-id": instID})
+		return nil, err
+	}
+	omciRxCallbackPair := CallbackPair{
+		CbKey:   tid,
+		CbEntry: CallbackPairEntry{rxChan, oo.receiveOmciResponse, true},
+	}
+	if err = oo.Send(ctx, pkt, timeout, CDefaultRetries, highPrio, omciRxCallbackPair); err != nil {
+		logger.Errorw(ctx, "Cannot send ethernet-frame-extended-pm-me",
+			log.Fields{"Err": err, "device-id": oo.deviceID, "inst-id": instID})
+		return nil, err
+	}
+	logger.Debugw(ctx, "send-ethernet-frame-extended-pm-me-set-msg-done",
+		log.Fields{"device-id": oo.deviceID, "inst-id": instID})
+	return meInstance, nil
+}
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index a6bba99..41a8771 100755
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -3600,7 +3600,7 @@
},
}
}
- resp := dh.pOnuMetricsMgr.CollectEthernetFrameExtendedPMCounters(ctx)
+ resp := dh.pOnuMetricsMgr.CollectEthernetFrameExtendedPMCounters(ctx, onuInfo)
return resp
}
diff --git a/internal/pkg/pmmgr/onu_metrics_manager.go b/internal/pkg/pmmgr/onu_metrics_manager.go
index 95fcd60..f1abbe4 100755
--- a/internal/pkg/pmmgr/onu_metrics_manager.go
+++ b/internal/pkg/pmmgr/onu_metrics_manager.go
@@ -301,13 +301,13 @@
pOnuDeviceEntry cmn.IonuDeviceEntry
PAdaptFsm *cmn.AdapterFsm
- opticalMetricsChan chan me.AttributeValueMap
- uniStatusMetricsChan chan me.AttributeValueMap
- l2PmChan chan me.AttributeValueMap
- extendedPmMeChan chan me.AttributeValueMap
- syncTimeResponseChan chan bool // true is success, false is fail
- l2PmCreateOrDeleteResponseChan chan bool // true is success, false is fail
- extendedPMCreateOrDeleteResponseChan chan me.Results // true is sucesss, false is fail
+ opticalMetricsChan chan me.AttributeValueMap
+ uniStatusMetricsChan chan me.AttributeValueMap
+ l2PmChan chan me.AttributeValueMap
+ extendedPmMeChan chan me.AttributeValueMap
+ syncTimeResponseChan chan bool // true is success, false is fail
+ l2PmCreateOrDeleteResponseChan chan bool // true is success, false is fail
+ extendedPMMeResponseChan chan me.Results // true is sucesss, false is fail
activeL2Pms []string // list of active l2 pm MEs created on the ONU.
l2PmToDelete []string // list of L2 PMs to delete
@@ -334,6 +334,7 @@
extPmKvStore *db.Backend
onuEthernetFrameExtendedPmLock sync.RWMutex
isDeviceReadyToCollectExtendedPmStats bool
+ isEthernetFrameExtendedPmOperationOngoing bool
}
// NewOnuMetricsManager returns a new instance of the NewOnuMetricsManager
@@ -359,7 +360,7 @@
metricsManager.syncTimeResponseChan = make(chan bool)
metricsManager.l2PmCreateOrDeleteResponseChan = make(chan bool)
- metricsManager.extendedPMCreateOrDeleteResponseChan = make(chan me.Results)
+ metricsManager.extendedPMMeResponseChan = make(chan me.Results)
metricsManager.StopProcessingOmciResponses = make(chan bool)
metricsManager.StopTicks = make(chan bool)
@@ -1078,6 +1079,8 @@
_ = mm.handleOmciDeleteResponseMessage(ctx, msg)
case omci.GetCurrentDataResponseType:
_ = mm.handleOmciGetCurrentDataResponseMessage(ctx, msg)
+ case omci.SetResponseType:
+ _ = mm.handleOmciSetResponseMessage(ctx, msg)
default:
logger.Warnw(ctx, "Unknown Message Type", log.Fields{"msgType": msg.OmciMsg.MessageType})
@@ -1211,6 +1214,30 @@
return fmt.Errorf("unhandled-omci-synchronize-time-response-message--error-code-%v", msgObj.Result)
}
+// handleOmciSetResponseMessage decodes an OMCI set response and, for the ethernet frame extended PM
+// classes (32 and 64 bit), forwards the result code to extendedPMMeResponseChan.
+// An error is returned when the set response layer cannot be extracted or the entity class is not handled here.
+func (mm *OnuMetricsManager) handleOmciSetResponseMessage(ctx context.Context, msg cmn.OmciMessage) error {
+	setRespLayer := (*msg.OmciPacket).Layer(omci.LayerTypeSetResponse)
+	if setRespLayer == nil {
+		logger.Errorw(ctx, "omci Msg layer could not be detected for SetResponse - handling stopped", log.Fields{"device-id": mm.deviceID})
+		return fmt.Errorf("omci Msg layer could not be detected for SetResponse - handling stopped: %s", mm.deviceID)
+	}
+	setResp, isSetResp := setRespLayer.(*omci.SetResponse)
+	if !isSetResp {
+		logger.Errorw(ctx, "omci Msg layer could not be assigned for SetResponse - handling stopped", log.Fields{"device-id": mm.deviceID})
+		return fmt.Errorf("omci Msg layer could not be assigned for SetResponse - handling stopped: %s", mm.deviceID)
+	}
+	logger.Debugw(ctx, "OMCI SetResponse Data", log.Fields{"device-id": mm.deviceID, "data-fields": setResp, "result": setResp.Result})
+
+	if setResp.EntityClass == me.EthernetFrameExtendedPmClassID ||
+		setResp.EntityClass == me.EthernetFrameExtendedPm64BitClassID {
+		// hand the result code over to whoever reads extendedPMMeResponseChan
+		mm.extendedPMMeResponseChan <- setResp.Result
+		return nil
+	}
+	logger.Errorw(ctx, "unhandled omci set response message",
+		log.Fields{"device-id": mm.deviceID, "class-id": setResp.EntityClass})
+	return fmt.Errorf("unhandled-omci-set-response-message-%v", mm.deviceID)
+}
+
// flushMetricCollectionChannels flushes all metric collection channels for any stale OMCI responses
func (mm *OnuMetricsManager) flushMetricCollectionChannels(ctx context.Context) {
// flush commMetricsChan
@@ -2333,7 +2360,7 @@
return nil
case me.EthernetFrameExtendedPmClassID,
me.EthernetFrameExtendedPm64BitClassID:
- mm.extendedPMCreateOrDeleteResponseChan <- msgObj.Result
+ mm.extendedPMMeResponseChan <- msgObj.Result
return nil
default:
logger.Errorw(ctx, "unhandled omci create response message",
@@ -3011,7 +3038,7 @@
func (mm *OnuMetricsManager) waitForEthernetFrameCreateOrDeleteResponseOrTimeout(ctx context.Context, create bool, instID uint16, meClassID me.ClassID, upstream bool) (bool, error) {
logger.Debugw(ctx, "wait-for-ethernet-frame-create-or-delete-response-or-timeout", log.Fields{"create": create, "instID": instID, "meClassID": meClassID})
select {
- case resp := <-mm.extendedPMCreateOrDeleteResponseChan:
+ case resp := <-mm.extendedPMMeResponseChan:
logger.Debugw(ctx, "received-extended-pm-me-response",
log.Fields{"device-id": mm.deviceID, "resp": resp, "create": create, "meClassID": meClassID, "instID": instID, "upstream": upstream})
// If the result is me.InstanceExists it means the entity was already created. It is ok handled that as success
@@ -3042,9 +3069,9 @@
} else {
entityID = uniPort.EntityID
}
-
+ logger.Debugw(ctx, "try-creating-extended-pm-me", log.Fields{"device-id": mm.deviceID, "entityID": entityID})
// parent entity id will be same for both direction
- controlBlock := mm.getControlBlockForExtendedPMDirection(ctx, direction, uniPort.EntityID)
+ controlBlock := mm.getControlBlockForExtendedPMDirection(ctx, direction, uniPort.EntityID, false)
inner1:
// retry ExtendedPmCreateAttempts times to create the instance of PM
@@ -3057,7 +3084,8 @@
log.Fields{"device-id": mm.deviceID})
return false, err
}
- if supported, err := mm.waitForEthernetFrameCreateOrDeleteResponseOrTimeout(ctx, true, entityID, meType, direction); err == nil && supported {
+ if supported, err := mm.waitForEthernetFrameCreateOrDeleteResponseOrTimeout(ctx, true, entityID,
+ meType, direction); err == nil && supported {
if direction {
mm.ethernetFrameExtendedPmUpStreamMEByEntityID[entityID] = meEnt
} else {
@@ -3104,7 +3132,7 @@
mm.onuEthernetFrameExtendedPmLock.Unlock()
}
-// CreateEthernetFrameExtendedPMME - TODO: add comment
+// CreateEthernetFrameExtendedPMME - This method tries to create the possible me type for extended pms
func (mm *OnuMetricsManager) CreateEthernetFrameExtendedPMME(ctx context.Context) {
//get the type of extended frame pm me supported by onu first
exist, err := mm.getEthernetFrameExtendedMETypeFromKvStore(ctx)
@@ -3145,9 +3173,105 @@
}
}
-// CollectEthernetFrameExtendedPMCounters - TODO: add comment
-func (mm *OnuMetricsManager) CollectEthernetFrameExtendedPMCounters(ctx context.Context) *extension.SingleGetValueResponse {
- errFunc := func(reason extension.GetValueResponse_ErrorReason) *extension.SingleGetValueResponse {
+// setControlBlockResetFlagForEthernetExtendedPMME sends a high priority set request to the extended PM ME
+// instance entityID with a control block built for the given direction and reset flag, then waits
+// for the response.
+// For upstream the entity id placed into the control block is entityID - 0x100.
+// It returns REASON_UNDEFINED and nil on success, INTERNAL_ERROR and an error when sending fails or
+// no successful response is received.
+func (mm *OnuMetricsManager) setControlBlockResetFlagForEthernetExtendedPMME(ctx context.Context, upstream bool,
+	entityID uint16, meName string, reset bool) (extension.GetValueResponse_ErrorReason, error) {
+	parentEntityID := entityID
+	if upstream {
+		parentEntityID -= 0x100
+	}
+	controlBlock := mm.getControlBlockForExtendedPMDirection(ctx, upstream, parentEntityID, reset)
+
+	omciCC := mm.pOnuDeviceEntry.GetDevOmciCC()
+	omciTimeout := mm.pDeviceHandler.GetOmciTimeout()
+	if _, err := omciCC.SendSetEthernetFrameExtendedPMME(ctx, omciTimeout, true,
+		mm.PAdaptFsm.CommChan, entityID, mm.supportedEthernetFrameExtendedPMClass, controlBlock); err != nil {
+		logger.Errorw(ctx, "EthernetFrameExtendedPMME-set-reset-bit-failed",
+			log.Fields{"device-id": mm.deviceID})
+		return extension.GetValueResponse_INTERNAL_ERROR, err
+	}
+
+	if !mm.waitForResetResponseOrTimeout(ctx, entityID, meName) {
+		return extension.GetValueResponse_INTERNAL_ERROR, fmt.Errorf("unable-to-reset-pm-counters")
+	}
+	return extension.GetValueResponse_REASON_UNDEFINED, nil
+}
+
+// waitForResetResponseOrTimeout blocks until a result arrives on extendedPMMeResponseChan or the maximum
+// OMCI timeout (including retries) elapses.
+// It returns true only when the received result is me.Success; any other result or a timeout yields false.
+// instID and meClassName are used for logging only.
+func (mm *OnuMetricsManager) waitForResetResponseOrTimeout(ctx context.Context, instID uint16, meClassName string) bool {
+	logger.Debugw(ctx, "wait-for-ethernet-frame-reset-counters-response-or-timeout", log.Fields{"instID": instID, "meClassName": meClassName})
+	maxWait := mm.pOnuDeviceEntry.GetDevOmciCC().GetMaxOmciTimeoutWithRetries() * time.Second
+	select {
+	case result := <-mm.extendedPMMeResponseChan:
+		logger.Debugw(ctx, "received-extended-pm-me-reset-response",
+			log.Fields{"device-id": mm.deviceID, "resp": result, "meClassName": meClassName, "instID": instID})
+		return result == me.Success
+	case <-time.After(maxWait):
+		logger.Errorw(ctx, "timeout-waiting-for-ext-pm-me-reset-response",
+			log.Fields{"device-id": mm.deviceID, "resp": false, "meClassName": meClassName, "instID": instID})
+		return false
+	}
+}
+
+// resetEthernetFrameExtendedPMCounters resets the counters of all given upstream and downstream extended PM
+// ME instances. The reset bit is first set on every upstream and then every downstream instance, and
+// afterwards cleared again in the same order.
+// Processing stops at the first failing instance and its error reason and error are returned;
+// REASON_UNDEFINED and nil are returned when every request succeeded.
+func (mm *OnuMetricsManager) resetEthernetFrameExtendedPMCounters(ctx context.Context,
+	upstreamEntityMap map[uint16]*me.ManagedEntity, downstreamEntityMap map[uint16]*me.ManagedEntity) (extension.GetValueResponse_ErrorReason, error) {
+	var className string
+	if mm.supportedEthernetFrameExtendedPMClass == me.EthernetFrameExtendedPmClassID {
+		className = "EthernetFrameExtendedPm"
+	} else {
+		className = "EthernetFrameExtendedPm64Bit"
+	}
+
+	// First pass (reset == true) sets the reset bit, second pass (reset == false) clears it again.
+	// Clearing is currently done as a workaround for sercomm glasfaser onu as the reset bit is not
+	// getting cleared by the device itself.
+	for _, reset := range []bool{true, false} {
+		for entityID := range upstreamEntityMap {
+			if errReason, err := mm.setControlBlockResetFlagForEthernetExtendedPMME(ctx, true, entityID,
+				className, reset); err != nil {
+				return errReason, err
+			}
+		}
+		for entityID := range downstreamEntityMap {
+			if errReason, err := mm.setControlBlockResetFlagForEthernetExtendedPMME(ctx, false, entityID,
+				className, reset); err != nil {
+				return errReason, err
+			}
+		}
+	}
+	return extension.GetValueResponse_REASON_UNDEFINED, nil
+}
+
+// setEthernetFrameExtendedPmCounterOperationFlag stores val as the "extended PM operation ongoing" marker
+// while holding onuEthernetFrameExtendedPmLock for writing.
+func (mm *OnuMetricsManager) setEthernetFrameExtendedPmCounterOperationFlag(val bool) {
+	mm.onuEthernetFrameExtendedPmLock.Lock()
+	mm.isEthernetFrameExtendedPmOperationOngoing = val
+	mm.onuEthernetFrameExtendedPmLock.Unlock()
+}
+
+// getEthernetFrameExtendedPmCounterOperationFlag reports whether an extended PM reset or get operation
+// is currently marked as ongoing.
+// The flag is only read here, so the shared (read) side of onuEthernetFrameExtendedPmLock is sufficient
+// and does not block other readers of that lock.
+// NOTE(review): reading the flag here and setting it afterwards through the setter are two separate
+// critical sections, so a check-then-set sequence in a caller is not atomic - confirm that concurrent
+// callers are excluded by other means.
+func (mm *OnuMetricsManager) getEthernetFrameExtendedPmCounterOperationFlag() bool {
+	mm.onuEthernetFrameExtendedPmLock.RLock()
+	defer mm.onuEthernetFrameExtendedPmLock.RUnlock()
+	return mm.isEthernetFrameExtendedPmOperationOngoing
+}
+
+// CollectEthernetFrameExtendedPMCounters - This method collects the ethernet frame extended pm counters from the device
+func (mm *OnuMetricsManager) CollectEthernetFrameExtendedPMCounters(ctx context.Context,
+ onuInfo *extension.GetOmciEthernetFrameExtendedPmRequest) *extension.SingleGetValueResponse {
+ errFunc := func(reason extension.GetValueResponse_ErrorReason, err string) *extension.SingleGetValueResponse {
+ logger.Error(ctx, err)
return &extension.SingleGetValueResponse{
Response: &extension.GetValueResponse{
Status: extension.GetValueResponse_ERROR,
@@ -3158,9 +3282,54 @@
mm.onuEthernetFrameExtendedPmLock.RLock()
if !mm.isDeviceReadyToCollectExtendedPmStats {
mm.onuEthernetFrameExtendedPmLock.RUnlock()
- return errFunc(extension.GetValueResponse_INTERNAL_ERROR)
+ return errFunc(extension.GetValueResponse_INTERNAL_ERROR, fmt.Sprintf("onu-%v-not-ready-to-collect-stats", mm.deviceID))
}
mm.onuEthernetFrameExtendedPmLock.RUnlock()
+
+ if mm.getEthernetFrameExtendedPmCounterOperationFlag() {
+ return errFunc(extension.GetValueResponse_INTERNAL_ERROR,
+ fmt.Sprintf("extended-pm-reset-or-get-operation-is-still-going-on-for-onu-%v", mm.deviceID))
+ }
+ mm.setEthernetFrameExtendedPmCounterOperationFlag(true)
+ defer mm.setEthernetFrameExtendedPmCounterOperationFlag(false)
+
+ upstreamEntityMap := make(map[uint16]*me.ManagedEntity)
+ downstreamEntityMap := make(map[uint16]*me.ManagedEntity)
+ if onuInfo.IsUniIndex != nil {
+ for _, uniPort := range *mm.pDeviceHandler.GetUniEntityMap() {
+ if uniPort.UniID == uint8(onuInfo.GetUniIndex()) {
+ logger.Debugw(ctx, "mapped-uni-index-to-uni-port", log.Fields{"device-id": mm.deviceID, "uni-index": onuInfo.GetUniIndex()})
+ upstreamEntityMap[uniPort.EntityID+0x100] = mm.ethernetFrameExtendedPmUpStreamMEByEntityID[uniPort.EntityID+0x100]
+ downstreamEntityMap[uniPort.EntityID] = mm.ethernetFrameExtendedPmDownStreamMEByEntityID[uniPort.EntityID]
+ break
+ }
+ }
+ if len(downstreamEntityMap) == 0 {
+ logger.Errorw(ctx, "invalid-uni-index-provided-while-fetching-the-extended-pm",
+ log.Fields{"device-id": mm.deviceID, "uni-index": onuInfo.GetUniIndex()})
+ return errFunc(extension.GetValueResponse_INVALID_REQ_TYPE,
+ fmt.Sprintf("onu-%s-invalid-uni-%v", mm.deviceID, onuInfo.GetUniIndex()))
+ }
+ } else {
+ // make a copy of all downstream and upstream maps in the local ones
+ for entityID, meEnt := range mm.ethernetFrameExtendedPmUpStreamMEByEntityID {
+ upstreamEntityMap[entityID] = meEnt
+ }
+ for entityID, meEnt := range mm.ethernetFrameExtendedPmDownStreamMEByEntityID {
+ downstreamEntityMap[entityID] = meEnt
+ }
+ }
+ logger.Debugw(ctx, "extended-pm-collection-me-count", log.Fields{"device-id": mm.deviceID,
+ "count": len(upstreamEntityMap) + len(downstreamEntityMap)})
+ // Reset the metrics first for all required me's
+ if onuInfo.Reset_ {
+ errReason, err := mm.resetEthernetFrameExtendedPMCounters(ctx, upstreamEntityMap, downstreamEntityMap)
+ if err != nil {
+ logger.Errorw(ctx, "unable-to-reset-ethernet-frame-extended-pm-counters",
+ log.Fields{"device-id": mm.deviceID})
+ return errFunc(errReason, fmt.Sprintf("%v", err.Error()))
+ }
+ }
// Collect metrics for upstream for all the PM Mes per uni port and aggregate
var pmUpstream extension.OmciEthernetFrameExtendedPm
var pmDownstream extension.OmciEthernetFrameExtendedPm
@@ -3168,41 +3337,45 @@
if mm.supportedEthernetFrameExtendedPMClass == me.EthernetFrameExtendedPmClassID {
counterFormat = extension.GetOmciEthernetFrameExtendedPmResponse_THIRTY_TWO_BIT
}
- for entityID, meEnt := range mm.ethernetFrameExtendedPmUpStreamMEByEntityID {
- var receivedMask uint16
- if metricInfo, errResp := mm.collectEthernetFrameExtendedPMData(ctx, meEnt, entityID, true, &receivedMask); metricInfo != nil { // upstream
- if receivedMask == 0 {
- pmUpstream = mm.aggregateEthernetFrameExtendedPM(metricInfo, pmUpstream, false)
- logger.Error(ctx, "all-the-attributes-of-ethernet-frame-extended-pm-counters-are-unsupported")
- pmDownstream = pmUpstream
- singleValResp := extension.SingleGetValueResponse{
- Response: &extension.GetValueResponse{
- Status: extension.GetValueResponse_OK,
- Response: &extension.GetValueResponse_OnuCounters{
- OnuCounters: &extension.GetOmciEthernetFrameExtendedPmResponse{
- Upstream: &pmUpstream,
- Downstream: &pmDownstream,
- OmciEthernetFrameExtendedPmFormat: counterFormat,
+ if !onuInfo.Reset_ {
+ for entityID, meEnt := range upstreamEntityMap {
+ logger.Debugw(ctx, "collect-upstream-pm-counters-for-entity-id", log.Fields{"device-id": mm.deviceID, "entityID": entityID})
+ var receivedMask uint16
+ if metricInfo, errResp, err := mm.collectEthernetFrameExtendedPMData(ctx, meEnt, entityID, true, &receivedMask); metricInfo != nil { // upstream
+ if receivedMask == 0 {
+ pmUpstream = mm.aggregateEthernetFrameExtendedPM(metricInfo, pmUpstream, false)
+ logger.Error(ctx, "all-the-attributes-of-ethernet-frame-extended-pm-counters-are-unsupported")
+ pmDownstream = pmUpstream
+ singleValResp := extension.SingleGetValueResponse{
+ Response: &extension.GetValueResponse{
+ Status: extension.GetValueResponse_OK,
+ Response: &extension.GetValueResponse_OnuCounters{
+ OnuCounters: &extension.GetOmciEthernetFrameExtendedPmResponse{
+ Upstream: &pmUpstream,
+ Downstream: &pmDownstream,
+ OmciEthernetFrameExtendedPmFormat: counterFormat,
+ },
},
},
- },
+ }
+ return &singleValResp
}
- return &singleValResp
+ // Aggregate the result for upstream
+ pmUpstream = mm.aggregateEthernetFrameExtendedPM(metricInfo, pmUpstream, true)
+ } else {
+ return errFunc(errResp, fmt.Sprintf("%v", err.Error()))
}
- // Aggregate the result for upstream
- pmUpstream = mm.aggregateEthernetFrameExtendedPM(metricInfo, pmUpstream, true)
- } else {
- return errFunc(errResp)
}
- }
- for entityID, meEnt := range mm.ethernetFrameExtendedPmDownStreamMEByEntityID {
- var receivedMask uint16
- if metricInfo, errResp := mm.collectEthernetFrameExtendedPMData(ctx, meEnt, entityID, false, &receivedMask); metricInfo != nil { // downstream
- // Aggregate the result for downstream
- pmDownstream = mm.aggregateEthernetFrameExtendedPM(metricInfo, pmDownstream, true)
- } else {
- return errFunc(errResp)
+ for entityID, meEnt := range downstreamEntityMap {
+ logger.Debugw(ctx, "collect-downstream-pm-counters-for-entity-id", log.Fields{"device-id": mm.deviceID, "entityID": entityID})
+ var receivedMask uint16
+ if metricInfo, errResp, err := mm.collectEthernetFrameExtendedPMData(ctx, meEnt, entityID, false, &receivedMask); metricInfo != nil { // downstream
+ // Aggregate the result for downstream
+ pmDownstream = mm.aggregateEthernetFrameExtendedPM(metricInfo, pmDownstream, true)
+ } else {
+ return errFunc(errResp, fmt.Sprintf("%v", err.Error()))
+ }
}
}
singleValResp := extension.SingleGetValueResponse{
@@ -3220,7 +3393,7 @@
return &singleValResp
}
-func (mm *OnuMetricsManager) collectEthernetFrameExtendedPMData(ctx context.Context, meEnt *me.ManagedEntity, entityID uint16, upstream bool, receivedMask *uint16) (map[string]uint64, extension.GetValueResponse_ErrorReason) {
+func (mm *OnuMetricsManager) collectEthernetFrameExtendedPMData(ctx context.Context, meEnt *me.ManagedEntity, entityID uint16, upstream bool, receivedMask *uint16) (map[string]uint64, extension.GetValueResponse_ErrorReason, error) {
var classID me.ClassID
logger.Debugw(ctx, "collecting-data-for-ethernet-frame-extended-pm", log.Fields{"device-id": mm.deviceID, "entityID": entityID, "upstream": upstream})
@@ -3235,7 +3408,7 @@
if errResp, err := mm.populateEthernetFrameExtendedPMMetrics(ctx, classID, entityID, mask, ethPMData, upstream, &sumReceivedMask); err != nil {
logger.Errorw(ctx, "error-during-metric-collection",
log.Fields{"device-id": mm.deviceID, "entityID": entityID, "err": err})
- return nil, errResp
+ return nil, errResp, err
}
if (mask == 0x3F00 || mask == 0x3800) && sumReceivedMask == 0 {
//It means the first attributes fetch was a failure, hence instead of sending multiple failure get requests
@@ -3245,7 +3418,7 @@
}
}
*receivedMask = sumReceivedMask
- return ethPMData, extension.GetValueResponse_REASON_UNDEFINED
+ return ethPMData, extension.GetValueResponse_REASON_UNDEFINED, nil
}
// nolint: gocyclo
@@ -3576,9 +3749,8 @@
return receivedMask
}
-func (mm *OnuMetricsManager) aggregateEthernetFrameExtendedPM(pmDataIn map[string]uint64, pmData extension.OmciEthernetFrameExtendedPm, aggregate bool) extension.OmciEthernetFrameExtendedPm {
- mm.onuEthernetFrameExtendedPmLock.Lock()
- defer mm.onuEthernetFrameExtendedPmLock.Unlock()
+func (mm *OnuMetricsManager) aggregateEthernetFrameExtendedPM(pmDataIn map[string]uint64,
+ pmData extension.OmciEthernetFrameExtendedPm, aggregate bool) extension.OmciEthernetFrameExtendedPm {
errorCounterValue := UnsupportedCounterValue64bit
if mm.supportedEthernetFrameExtendedPMClass == me.EthernetFrameExtendedPmClassID {
errorCounterValue = UnsupportedCounterValue32bit
@@ -3674,7 +3846,7 @@
return pmDataOut
}
-func (mm *OnuMetricsManager) getControlBlockForExtendedPMDirection(ctx context.Context, upstream bool, entityID uint16) []uint16 {
+func (mm *OnuMetricsManager) getControlBlockForExtendedPMDirection(ctx context.Context, upstream bool, entityID uint16, reset bool) []uint16 {
controlBlock := make([]uint16, 8)
// Control Block First two bytes are for threshold data 1/2 id - does not matter here
controlBlock[0] = 0
@@ -3683,7 +3855,11 @@
// Next two bytes are for the parent me instance id
controlBlock[2] = entityID
// Next two bytes are for accumulation disable
- controlBlock[3] = 0
+ if reset {
+ controlBlock[3] = 1 << 15 //Set the 16th bit of AD to reset the counters.
+ } else {
+ controlBlock[3] = 0
+ }
// Next two bytes are for tca disable
controlBlock[4] = 0x4000 //tca global disable
// Next two bytes are for control fields - bit 1(lsb) as 1 for continuous accumulation and bit 2(0 for upstream)