VOL-3957: Re-org the meter reference count update code to account
for scenarios involing flow replication across multiple p-bits.
Change-Id: I5f97fba1d75e2dc604007a151ed807a05c1165cd
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 4e10522..15c7e9c 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -1876,18 +1876,7 @@
//clearResources clears pon resources in kv store and the device
// nolint: gocyclo
func (f *OpenOltFlowMgr) clearResources(ctx context.Context, flow *ofp.OfpFlowStats, intfID uint32, onuID int32, uniID int32,
- gemPortID int32, flowID uint64, portNum uint32, direction string) error {
-
- tpID, err := getTpIDFromFlow(ctx, flow)
- if err != nil {
- return olterrors.NewErrNotFound("tp-id",
- log.Fields{
- "flow": flow,
- "intf-id": intfID,
- "onu-id": onuID,
- "uni-id": uniID,
- "device-id": f.deviceHandler.device.Id}, err)
- }
+ gemPortID int32, flowID uint64, portNum uint32, tpID uint32) error {
uni := getUniPortPath(f.deviceHandler.device.Id, intfID, onuID, uniID)
tpPath := f.getTPpath(ctx, intfID, uni, tpID)
@@ -1928,9 +1917,6 @@
"usedByFlows": flowIDs,
"device-id": f.deviceHandler.device.Id})
- if err := f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, direction, intfID, uint32(onuID), uint32(uniID), tpID, false); err != nil {
- return err
- }
return nil
}
logger.Debugf(ctx, "gem-port-id %d is-not-used-by-another-flow--releasing-the-gem-port", gemPortID)
@@ -2086,8 +2072,19 @@
logger.Errorw(ctx, "failed-to-remove-flow-on-kv-store", log.Fields{"error": err})
return err
}
+ tpID, err := getTpIDFromFlow(ctx, flow)
+ if err != nil {
+ return olterrors.NewErrNotFound("tp-id",
+ log.Fields{
+ "flow": flow,
+ "intf-id": Intf,
+ "onu-id": onuID,
+ "uni-id": uniID,
+ "device-id": f.deviceHandler.device.Id}, err)
+ }
+
if !flowInfo.Flow.ReplicateFlow {
- if err = f.clearResources(ctx, flow, Intf, onuID, uniID, flowInfo.Flow.GemportId, flowInfo.Flow.FlowId, portNum, flowDirection); err != nil {
+ if err = f.clearResources(ctx, flow, Intf, onuID, uniID, flowInfo.Flow.GemportId, flowInfo.Flow.FlowId, portNum, tpID); err != nil {
logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
"flow-id": flow.Id,
"stored-flow": flowInfo.Flow,
@@ -2095,6 +2092,7 @@
"stored-flow-id": flowInfo.Flow.FlowId,
"onu-id": onuID,
"intf": Intf,
+ "err": err,
})
return err
}
@@ -2105,7 +2103,7 @@
}
logger.Debugw(ctx, "gems-to-be-cleared", log.Fields{"gems": gems})
for _, gem := range gems {
- if err = f.clearResources(ctx, flow, Intf, onuID, uniID, int32(gem), flowInfo.Flow.FlowId, portNum, flowDirection); err != nil {
+ if err = f.clearResources(ctx, flow, Intf, onuID, uniID, int32(gem), flowInfo.Flow.FlowId, portNum, tpID); err != nil {
logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
"flow-id": flow.Id,
"stored-flow": flowInfo.Flow,
@@ -2114,6 +2112,7 @@
"onu-id": onuID,
"intf": Intf,
"gem": gem,
+ "err": err,
})
return err
}
@@ -2136,6 +2135,10 @@
f.subscriberDataPathFlowIDMapLock.Unlock()
}
}
+ // Decrement reference count for the meter associated with the given <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
+ if err := f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, flowDirection, Intf, uint32(onuID), uint32(uniID), tpID, false); err != nil {
+ return err
+ }
return nil
}
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index 4e7aa79..a501310 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -1107,13 +1107,25 @@
func (RsrcMgr *OpenOltResourceMgr) HandleMeterInfoRefCntUpdate(ctx context.Context, Direction string,
IntfID uint32, OnuID uint32, UniID uint32, TpID uint32, increment bool) error {
meterInfo, err := RsrcMgr.GetMeterInfoForOnu(ctx, Direction, IntfID, OnuID, UniID, TpID)
- if meterInfo == nil || err != nil {
- return fmt.Errorf("error-fetching-meter-info-for-intf-%d-onu-%d-uni-%d-tp-id-%d-direction-%s", IntfID, OnuID, UniID, TpID, Direction)
+ if err != nil {
+ return err
+ } else if meterInfo == nil {
+ // If we are increasing the reference count, we expect the meter information to be present on KV store.
+ // But if decrementing the reference count, the meter is possibly already cleared from KV store. Just log warn but do not return error.
+ if increment {
+ logger.Errorf(ctx, "error-fetching-meter-info-for-intf-%d-onu-%d-uni-%d-tp-id-%d-direction-%s", IntfID, OnuID, UniID, TpID, Direction)
+ return fmt.Errorf("error-fetching-meter-info-for-intf-%d-onu-%d-uni-%d-tp-id-%d-direction-%s", IntfID, OnuID, UniID, TpID, Direction)
+ }
+ logger.Warnw(ctx, "meter is already cleared",
+ log.Fields{"intfID": IntfID, "onuID": OnuID, "uniID": UniID, "direction": Direction, "increment": increment})
+ return nil
}
+
if increment {
meterInfo.RefCnt++
} else {
meterInfo.RefCnt--
+ // If RefCnt become 0 clear the meter information from the DB.
if meterInfo.RefCnt == 0 {
if err := RsrcMgr.RemoveMeterInfoForOnu(ctx, Direction, IntfID, OnuID, UniID, TpID); err != nil {
return err