[VOL-3957]: Maintain meter reference counter and free up the meter from
KV store as soon as last flow referencing it is removed.
Change-Id: I1b32690e1d65a35e0d03a65aa7b2e38a38997521
diff --git a/VERSION b/VERSION
index 944880f..8ee49b2 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.2.0
+3.2.1-dev
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 8b3947e..53b33f5 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -1677,11 +1677,11 @@
logger.Debugw(ctx, "deleted-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
tpIDList := dh.resourceMgr.GetTechProfileIDForOnu(ctx, onu.IntfID, onu.OnuID, uniID)
for _, tpID := range tpIDList {
- if err = dh.resourceMgr.RemoveMeterIDForOnu(ctx, "upstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
+ if err = dh.resourceMgr.RemoveMeterInfoForOnu(ctx, "upstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
logger.Debugw(ctx, "failed-to-remove-meter-id-for-onu-upstream", log.Fields{"onu-id": onu.OnuID})
}
logger.Debugw(ctx, "removed-meter-id-for-onu-upstream", log.Fields{"onu-id": onu.OnuID})
- if err = dh.resourceMgr.RemoveMeterIDForOnu(ctx, "downstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
+ if err = dh.resourceMgr.RemoveMeterInfoForOnu(ctx, "downstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
logger.Debugw(ctx, "failed-to-remove-meter-id-for-onu-downstream", log.Fields{"onu-id": onu.OnuID})
}
logger.Debugw(ctx, "removed-meter-id-for-onu-downstream", log.Fields{"onu-id": onu.OnuID})
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 31ca07d..4e10522 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -440,7 +440,7 @@
*/
var SchedCfg *tp_pb.SchedulerConfig
- KvStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
+ meterInfo, err := f.resourceMgr.GetMeterInfoForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
if err != nil {
return olterrors.NewErrNotFound("meter",
log.Fields{"intf-id": sq.intfID,
@@ -449,14 +449,17 @@
"device-id": f.deviceHandler.device.Id}, err)
}
- if KvStoreMeter != nil {
- if KvStoreMeter.MeterId == sq.meterID {
- logger.Debugw(ctx, "scheduler-already-created-for-upstream", log.Fields{"device-id": f.deviceHandler.device.Id})
+ if meterInfo != nil {
+ logger.Debugw(ctx, "scheduler-already-created-for-upstream", log.Fields{"device-id": f.deviceHandler.device.Id, "meter-id": sq.meterID})
+ if meterInfo.MeterConfig.MeterId == sq.meterID {
+ if err := f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, true); err != nil {
+ return err
+ }
return nil
}
return olterrors.NewErrInvalidValue(log.Fields{
"unsupported": "meter-id",
- "kv-store-meter-id": KvStoreMeter.MeterId,
+ "kv-store-meter-id": meterInfo.MeterConfig.MeterId,
"meter-id-in-flow": sq.meterID,
"device-id": f.deviceHandler.device.Id}, nil)
}
@@ -482,42 +485,48 @@
"device-id": f.deviceHandler.device.Id}, err)
}
- var meterConfig *ofp.OfpMeterConfig
+ found := false
+ meterInfo = &rsrcMgr.MeterInfo{}
if sq.flowMetadata != nil {
for _, meter := range sq.flowMetadata.Meters {
if sq.meterID == meter.MeterId {
- meterConfig = meter
+ meterInfo.MeterConfig = ofp.OfpMeterConfig{}
+ meterInfo.MeterConfig.MeterId = meter.MeterId
+ meterInfo.MeterConfig.Flags = meter.Flags
+ meterInfo.RefCnt = 1 // initialize it to 1, since this is the first flow that referenced the meter id.
+ meterInfo.MeterConfig.Bands = append(meterInfo.MeterConfig.Bands, meter.Bands...)
logger.Debugw(ctx, "found-meter-config-from-flowmetadata",
- log.Fields{"meterConfig": meterConfig,
+ log.Fields{"meterConfig": meterInfo.MeterConfig,
"device-id": f.deviceHandler.device.Id})
+ found = true
break
}
}
} else {
logger.Errorw(ctx, "flow-metadata-not-present-in-flow", log.Fields{"device-id": f.deviceHandler.device.Id})
}
- if meterConfig == nil {
+ if !found {
return olterrors.NewErrNotFound("meterbands", log.Fields{
"reason": "Could-not-get-meterbands-from-flowMetadata",
"flow-metadata": sq.flowMetadata,
"meter-id": sq.meterID,
"device-id": f.deviceHandler.device.Id}, nil)
- } else if len(meterConfig.Bands) < MaxMeterBand {
+ } else if len(meterInfo.MeterConfig.Bands) < MaxMeterBand {
logger.Errorw(ctx, "invalid-number-of-bands-in-meter",
- log.Fields{"Bands": meterConfig.Bands,
+ log.Fields{"Bands": meterInfo.MeterConfig.Bands,
"meter-id": sq.meterID,
"device-id": f.deviceHandler.device.Id})
return olterrors.NewErrInvalidValue(log.Fields{
"reason": "Invalid-number-of-bands-in-meter",
- "meterband-count": len(meterConfig.Bands),
- "metabands": meterConfig.Bands,
+ "meterband-count": len(meterInfo.MeterConfig.Bands),
+ "metabands": meterInfo.MeterConfig.Bands,
"meter-id": sq.meterID,
"device-id": f.deviceHandler.device.Id}, nil)
}
- cir := meterConfig.Bands[0].Rate
- cbs := meterConfig.Bands[0].BurstSize
- eir := meterConfig.Bands[1].Rate
- ebs := meterConfig.Bands[1].BurstSize
+ cir := meterInfo.MeterConfig.Bands[0].Rate
+ cbs := meterInfo.MeterConfig.Bands[0].BurstSize
+ eir := meterInfo.MeterConfig.Bands[1].Rate
+ ebs := meterInfo.MeterConfig.Bands[1].BurstSize
pir := cir + eir
pbs := cbs + ebs
TrafficShaping := &tp_pb.TrafficShapingInfo{Cir: cir, Cbs: cbs, Pir: pir, Pbs: pbs}
@@ -535,7 +544,7 @@
/* After we successfully applied the scheduler configuration on the OLT device,
* store the meter id on the KV store, for further reference.
*/
- if err := f.resourceMgr.UpdateMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, meterConfig); err != nil {
+ if err := f.resourceMgr.StoreMeterInfoForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, meterInfo); err != nil {
return olterrors.NewErrAdapter("failed-updating-meter-id",
log.Fields{"onu-id": sq.onuID,
"meter-id": sq.meterID,
@@ -543,8 +552,8 @@
}
logger.Infow(ctx, "updated-meter-info-into-kv-store-successfully",
log.Fields{"direction": Direction,
- "Meter": meterConfig,
- "device-id": f.deviceHandler.device.Id})
+ "meter-info": meterInfo,
+ "device-id": f.deviceHandler.device.Id})
return nil
}
@@ -649,31 +658,7 @@
"device-id": f.deviceHandler.device.Id}, err)
}
- KVStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
- if err != nil {
- return olterrors.NewErrNotFound("meter",
- log.Fields{
- "onu-id": sq.onuID,
- "device-id": f.deviceHandler.device.Id}, err)
- }
- if KVStoreMeter == nil {
- logger.Warnw(ctx, "no-meter-installed-yet",
- log.Fields{
- "direction": Direction,
- "intf-id": sq.intfID,
- "onu-id": sq.onuID,
- "uni-id": sq.uniID,
- "device-id": f.deviceHandler.device.Id})
- return nil
- }
- cir := KVStoreMeter.Bands[0].Rate
- cbs := KVStoreMeter.Bands[0].BurstSize
- eir := KVStoreMeter.Bands[1].Rate
- ebs := KVStoreMeter.Bands[1].BurstSize
- pir := cir + eir
- pbs := cbs + ebs
-
- TrafficShaping := &tp_pb.TrafficShapingInfo{Cir: cir, Cbs: cbs, Pir: pir, Pbs: pbs}
+ TrafficShaping := &tp_pb.TrafficShapingInfo{} // this info is not really useful for the agent during flow removal. Just use default values.
TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[sq.intfID].GetTrafficScheduler(sq.tpInst.(*tp.TechProfile), SchedCfg, TrafficShaping)}
TrafficSched[0].TechProfileId = sq.tpID
@@ -722,12 +707,11 @@
/* After we successfully remove the scheduler configuration on the OLT device,
* delete the meter id on the KV store.
*/
- err = f.resourceMgr.RemoveMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
+ err = f.resourceMgr.RemoveMeterInfoForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
if err != nil {
return olterrors.NewErrAdapter("unable-to-remove-meter",
log.Fields{
"onu": sq.onuID,
- "meter": KVStoreMeter.MeterId,
"device-id": f.deviceHandler.device.Id,
"intf-id": sq.intfID,
"onu-id": sq.onuID,
@@ -736,7 +720,6 @@
}
logger.Infow(ctx, "removed-meter-from-KV-store-successfully",
log.Fields{
- "meter-id": KVStoreMeter.MeterId,
"dir": Direction,
"device-id": f.deviceHandler.device.Id,
"intf-id": sq.intfID,
@@ -1892,27 +1875,27 @@
//clearResources clears pon resources in kv store and the device
// nolint: gocyclo
-func (f *OpenOltFlowMgr) clearResources(ctx context.Context, flow *ofp.OfpFlowStats, Intf uint32, onuID int32, uniID int32,
- gemPortID int32, flowID uint64, portNum uint32) error {
+func (f *OpenOltFlowMgr) clearResources(ctx context.Context, flow *ofp.OfpFlowStats, intfID uint32, onuID int32, uniID int32,
+ gemPortID int32, flowID uint64, portNum uint32, direction string) error {
tpID, err := getTpIDFromFlow(ctx, flow)
if err != nil {
return olterrors.NewErrNotFound("tp-id",
log.Fields{
"flow": flow,
- "intf": Intf,
+ "intf-id": intfID,
"onu-id": onuID,
"uni-id": uniID,
"device-id": f.deviceHandler.device.Id}, err)
}
- uni := getUniPortPath(f.deviceHandler.device.Id, Intf, onuID, uniID)
- tpPath := f.getTPpath(ctx, Intf, uni, tpID)
+ uni := getUniPortPath(f.deviceHandler.device.Id, intfID, onuID, uniID)
+ tpPath := f.getTPpath(ctx, intfID, uni, tpID)
logger.Debugw(ctx, "getting-techprofile-instance-for-subscriber",
log.Fields{
"tpPath": tpPath,
"device-id": f.deviceHandler.device.Id})
- techprofileInst, err := f.techprofile[Intf].GetTPInstanceFromKVStore(ctx, tpID, tpPath)
+ techprofileInst, err := f.techprofile[intfID].GetTPInstanceFromKVStore(ctx, tpID, tpPath)
if err != nil || techprofileInst == nil { // This should not happen, something wrong in KV backend transaction
return olterrors.NewErrNotFound("tech-profile-in-kv-store",
log.Fields{
@@ -1933,7 +1916,7 @@
// everytime flowsUsedByGemPort cache is updated the same should be updated
// in kv store by calling UpdateFlowIDsForGem
f.flowsUsedByGemPort[uint32(gemPortID)] = flowIDs
- if err := f.resourceMgr.UpdateFlowIDsForGem(ctx, Intf, uint32(gemPortID), flowIDs); err != nil {
+ if err := f.resourceMgr.UpdateFlowIDsForGem(ctx, intfID, uint32(gemPortID), flowIDs); err != nil {
return err
}
break
@@ -1944,19 +1927,23 @@
"gemport-id": gemPortID,
"usedByFlows": flowIDs,
"device-id": f.deviceHandler.device.Id})
+
+ if err := f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, direction, intfID, uint32(onuID), uint32(uniID), tpID, false); err != nil {
+ return err
+ }
return nil
}
logger.Debugf(ctx, "gem-port-id %d is-not-used-by-another-flow--releasing-the-gem-port", gemPortID)
- f.resourceMgr.RemoveGemPortIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
+ f.resourceMgr.RemoveGemPortIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID))
// TODO: The TrafficQueue corresponding to this gem-port also should be removed immediately.
// But it is anyway eventually removed later when the TechProfile is freed, so not a big issue for now.
- f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(ctx, uint32(gemPortID), Intf)
+ f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(ctx, uint32(gemPortID), intfID)
// also clear gem to uni cache
f.removeFromGemToUniMap(gemPortKey{
- intfID: Intf,
+ intfID: intfID,
gemPort: uint32(gemPortID),
})
- f.deleteGemPortFromLocalCache(ctx, Intf, uint32(onuID), uint32(gemPortID))
+ f.deleteGemPortFromLocalCache(ctx, intfID, uint32(onuID), uint32(gemPortID))
f.onuIdsLock.Lock() // TODO: What is this lock?
@@ -1965,17 +1952,17 @@
f.flowsUsedByGemPortKey.Lock()
delete(f.flowsUsedByGemPort, uint32(gemPortID))
f.flowsUsedByGemPortKey.Unlock()
- f.resourceMgr.DeleteFlowIDsForGem(ctx, Intf, uint32(gemPortID))
- f.resourceMgr.FreeGemPortID(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
+ f.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, uint32(gemPortID))
+ f.resourceMgr.FreeGemPortID(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID))
f.onuIdsLock.Unlock()
// Delete the gem port on the ONU.
- if err := f.sendDeleteGemPortToChild(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID), tpPath); err != nil {
+ if err := f.sendDeleteGemPortToChild(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID), tpPath); err != nil {
logger.Errorw(ctx, "error-processing-delete-gem-port-towards-onu",
log.Fields{
"err": err,
- "intf": Intf,
+ "intfID": intfID,
"onu-id": onuID,
"uni-id": uniID,
"device-id": f.deviceHandler.device.Id,
@@ -1983,26 +1970,26 @@
}
switch techprofileInst := techprofileInst.(type) {
case *tp.TechProfile:
- ok, _ := f.isTechProfileUsedByAnotherGem(ctx, Intf, uint32(onuID), uint32(uniID), tpID, techprofileInst, uint32(gemPortID))
+ ok, _ := f.isTechProfileUsedByAnotherGem(ctx, intfID, uint32(onuID), uint32(uniID), tpID, techprofileInst, uint32(gemPortID))
if !ok {
- if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), tpID); err != nil {
+ if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), tpID); err != nil {
logger.Warn(ctx, err)
}
- if err := f.DeleteTechProfileInstance(ctx, Intf, uint32(onuID), uint32(uniID), "", tpID); err != nil {
+ if err := f.DeleteTechProfileInstance(ctx, intfID, uint32(onuID), uint32(uniID), "", tpID); err != nil {
logger.Warn(ctx, err)
}
- if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
+ if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
logger.Warn(ctx, err)
}
- if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
+ if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: intfID, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
logger.Warn(ctx, err)
}
- f.resourceMgr.FreeAllocID(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID)
+ f.resourceMgr.FreeAllocID(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID)
// Delete the TCONT on the ONU.
- if err := f.sendDeleteTcontToChild(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID, tpPath); err != nil {
+ if err := f.sendDeleteTcontToChild(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID, tpPath); err != nil {
logger.Errorw(ctx, "error-processing-delete-tcont-towards-onu",
log.Fields{
- "intf": Intf,
+ "intf": intfID,
"onu-id": onuID,
"uni-id": uniID,
"device-id": f.deviceHandler.device.Id,
@@ -2010,18 +1997,18 @@
}
}
case *tp.EponProfile:
- if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), tpID); err != nil {
+ if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), tpID); err != nil {
logger.Warn(ctx, err)
}
- if err := f.DeleteTechProfileInstance(ctx, Intf, uint32(onuID), uint32(uniID), "", tpID); err != nil {
+ if err := f.DeleteTechProfileInstance(ctx, intfID, uint32(onuID), uint32(uniID), "", tpID); err != nil {
logger.Warn(ctx, err)
}
- f.resourceMgr.FreeAllocID(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.AllocID)
+ f.resourceMgr.FreeAllocID(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocID)
// Delete the TCONT on the ONU.
- if err := f.sendDeleteTcontToChild(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.AllocID, tpPath); err != nil {
+ if err := f.sendDeleteTcontToChild(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocID, tpPath); err != nil {
logger.Errorw(ctx, "error-processing-delete-tcont-towards-onu",
log.Fields{
- "intf": Intf,
+ "intf": intfID,
"onu-id": onuID,
"uni-id": uniID,
"device-id": f.deviceHandler.device.Id,
@@ -2100,7 +2087,7 @@
return err
}
if !flowInfo.Flow.ReplicateFlow {
- if err = f.clearResources(ctx, flow, Intf, onuID, uniID, flowInfo.Flow.GemportId, flowInfo.Flow.FlowId, portNum); err != nil {
+ if err = f.clearResources(ctx, flow, Intf, onuID, uniID, flowInfo.Flow.GemportId, flowInfo.Flow.FlowId, portNum, flowDirection); err != nil {
logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
"flow-id": flow.Id,
"stored-flow": flowInfo.Flow,
@@ -2118,7 +2105,7 @@
}
logger.Debugw(ctx, "gems-to-be-cleared", log.Fields{"gems": gems})
for _, gem := range gems {
- if err = f.clearResources(ctx, flow, Intf, onuID, uniID, int32(gem), flowInfo.Flow.FlowId, portNum); err != nil {
+ if err = f.clearResources(ctx, flow, Intf, onuID, uniID, int32(gem), flowInfo.Flow.FlowId, portNum, flowDirection); err != nil {
logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
"flow-id": flow.Id,
"stored-flow": flowInfo.Flow,
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index 7a3f8ec..4e7aa79 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -109,6 +109,12 @@
OutPorts []uint32
}
+// MeterInfo store meter information at path <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
+type MeterInfo struct {
+ RefCnt uint8 // number of flow references for this meter. When RefCnt is 0, the MeterInfo should be deleted.
+ MeterConfig ofp.OfpMeterConfig
+}
+
// OpenOltResourceMgr holds resource related information as provided below for each field
type OpenOltResourceMgr struct {
DeviceID string // OLT device id
@@ -1046,15 +1052,14 @@
return err
}
-// UpdateMeterIDForOnu updates the meter id in the KV-Store for the given onu based on the path
+// StoreMeterInfoForOnu updates the meter id in the KV-Store for the given onu based on the path
// This path is formed as the following: <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
-func (RsrcMgr *OpenOltResourceMgr) UpdateMeterIDForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
- UniID uint32, TpID uint32, MeterConfig *ofp.OfpMeterConfig) error {
+func (RsrcMgr *OpenOltResourceMgr) StoreMeterInfoForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
+ UniID uint32, TpID uint32, meterInfo *MeterInfo) error {
var Value []byte
var err error
-
IntfOnuUniID := fmt.Sprintf(MeterIDPathSuffix, IntfID, OnuID, UniID, TpID, Direction)
- Value, err = json.Marshal(*MeterConfig)
+ Value, err = json.Marshal(*meterInfo)
if err != nil {
logger.Error(ctx, "failed to Marshal meter config")
return err
@@ -1063,26 +1068,27 @@
logger.Errorf(ctx, "Failed to store meter into KV store %s", IntfOnuUniID)
return err
}
+ logger.Debugw(ctx, "meter info updated successfully", log.Fields{"path": IntfOnuUniID, "meter-info": meterInfo})
return err
}
-// GetMeterIDForOnu fetches the meter id from the kv store for the given onu based on the path
+// GetMeterInfoForOnu fetches the meter id from the kv store for the given onu based on the path
// This path is formed as the following: <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
-func (RsrcMgr *OpenOltResourceMgr) GetMeterIDForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
- UniID uint32, TpID uint32) (*ofp.OfpMeterConfig, error) {
+func (RsrcMgr *OpenOltResourceMgr) GetMeterInfoForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
+ UniID uint32, TpID uint32) (*MeterInfo, error) {
Path := fmt.Sprintf(MeterIDPathSuffix, IntfID, OnuID, UniID, TpID, Direction)
- var meterConfig ofp.OfpMeterConfig
+ var meterInfo MeterInfo
Value, err := RsrcMgr.KVStore.Get(ctx, Path)
if err == nil {
if Value != nil {
- logger.Debug(ctx, "Found meter in KV store", log.Fields{"Direction": Direction})
+ logger.Debug(ctx, "Found meter info in KV store", log.Fields{"Direction": Direction})
Val, er := kvstore.ToByte(Value.Value)
if er != nil {
logger.Errorw(ctx, "Failed to convert into byte array", log.Fields{"error": er})
return nil, er
}
- if er = json.Unmarshal(Val, &meterConfig); er != nil {
- logger.Error(ctx, "Failed to unmarshal meterconfig", log.Fields{"error": er})
+ if er = json.Unmarshal(Val, &meterInfo); er != nil {
+ logger.Error(ctx, "Failed to unmarshal meter info", log.Fields{"error": er})
return nil, er
}
} else {
@@ -1093,12 +1099,37 @@
logger.Errorf(ctx, "Failed to get Meter config from kvstore for path %s", Path)
}
- return &meterConfig, err
+ return &meterInfo, err
}
-// RemoveMeterIDForOnu deletes the meter id from the kV-Store for the given onu based on the path
+// HandleMeterInfoRefCntUpdate increments or decrements the reference counter for a given meter.
+// When reference count becomes 0, it clears the meter information from the kv store
+func (RsrcMgr *OpenOltResourceMgr) HandleMeterInfoRefCntUpdate(ctx context.Context, Direction string,
+ IntfID uint32, OnuID uint32, UniID uint32, TpID uint32, increment bool) error {
+ meterInfo, err := RsrcMgr.GetMeterInfoForOnu(ctx, Direction, IntfID, OnuID, UniID, TpID)
+ if meterInfo == nil || err != nil {
+ return fmt.Errorf("error-fetching-meter-info-for-intf-%d-onu-%d-uni-%d-tp-id-%d-direction-%s", IntfID, OnuID, UniID, TpID, Direction)
+ }
+ if increment {
+ meterInfo.RefCnt++
+ } else {
+ meterInfo.RefCnt--
+ if meterInfo.RefCnt == 0 {
+ if err := RsrcMgr.RemoveMeterInfoForOnu(ctx, Direction, IntfID, OnuID, UniID, TpID); err != nil {
+ return err
+ }
+ return nil
+ }
+ }
+ if err := RsrcMgr.StoreMeterInfoForOnu(ctx, Direction, IntfID, OnuID, UniID, TpID, meterInfo); err != nil {
+ return err
+ }
+ return nil
+}
+
+// RemoveMeterInfoForOnu deletes the meter id from the kV-Store for the given onu based on the path
// This path is formed as the following: <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
-func (RsrcMgr *OpenOltResourceMgr) RemoveMeterIDForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
+func (RsrcMgr *OpenOltResourceMgr) RemoveMeterInfoForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
UniID uint32, TpID uint32) error {
Path := fmt.Sprintf(MeterIDPathSuffix, IntfID, OnuID, UniID, TpID, Direction)
if err := RsrcMgr.KVStore.Delete(ctx, Path); err != nil {
diff --git a/internal/pkg/resourcemanager/resourcemanager_test.go b/internal/pkg/resourcemanager/resourcemanager_test.go
index e71d07e..53f8898 100644
--- a/internal/pkg/resourcemanager/resourcemanager_test.go
+++ b/internal/pkg/resourcemanager/resourcemanager_test.go
@@ -518,7 +518,7 @@
}
}
-func TestOpenOltResourceMgr_GetMeterIDForOnu(t *testing.T) {
+func TestOpenOltResourceMgr_GetMeterInfoForOnu(t *testing.T) {
type args struct {
Direction string
IntfID uint32
@@ -530,22 +530,22 @@
name string
fields *fields
args args
- want *ofp.OfpMeterConfig
+ want *MeterInfo
wantErr error
}{
- {"GetMeterIDOnu", getResMgr(), args{"DOWNSTREAM", 0, 1, 1, 64},
- &ofp.OfpMeterConfig{}, errors.New("failed to get Meter config from kvstore for path")},
- {"GetMeterIDOnu", getResMgr(), args{"DOWNSTREAM", 1, 2, 2, 65},
- &ofp.OfpMeterConfig{}, errors.New("failed to get Meter config from kvstore for path")},
+ {"GetMeterInfoForOnu", getResMgr(), args{"DOWNSTREAM", 0, 1, 1, 64},
+ &MeterInfo{}, errors.New("failed to get Meter config from kvstore for path")},
+ {"GetMeterInfoForOnu", getResMgr(), args{"DOWNSTREAM", 1, 2, 2, 65},
+ &MeterInfo{}, errors.New("failed to get Meter config from kvstore for path")},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- got, err := RsrcMgr.GetMeterIDForOnu(ctx, tt.args.Direction, tt.args.IntfID, tt.args.OnuID, tt.args.UniID, tt.args.tpID)
+ got, err := RsrcMgr.GetMeterInfoForOnu(ctx, tt.args.Direction, tt.args.IntfID, tt.args.OnuID, tt.args.UniID, tt.args.tpID)
if reflect.TypeOf(got) != reflect.TypeOf(tt.want) && err != nil {
- t.Errorf("GetMeterIDForOnu() got = %v, want %v", got, tt.want)
+ t.Errorf("GetMeterInfoForOnu() got = %v, want %v", got, tt.want)
}
})
}
@@ -628,7 +628,7 @@
RsrcMgr := testResMgrObject(tt.fields)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- if err := RsrcMgr.RemoveMeterIDForOnu(ctx, tt.args.Direction, tt.args.IntfID, tt.args.OnuID, tt.args.UniID,
+ if err := RsrcMgr.RemoveMeterInfoForOnu(ctx, tt.args.Direction, tt.args.IntfID, tt.args.OnuID, tt.args.UniID,
tt.args.tpID); reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) && err != nil {
t.Errorf("RemoveMeterIDForOnu() error = %v, wantErr %v", err, tt.wantErr)
}
@@ -781,12 +781,12 @@
func TestOpenOltResourceMgr_UpdateMeterIDForOnu(t *testing.T) {
type args struct {
- Direction string
- IntfID uint32
- OnuID uint32
- UniID uint32
- tpID uint32
- MeterConfig *ofp.OfpMeterConfig
+ Direction string
+ IntfID uint32
+ OnuID uint32
+ UniID uint32
+ tpID uint32
+ MeterInfo *MeterInfo
}
tests := []struct {
name string
@@ -795,15 +795,15 @@
wantErr error
}{
{"UpdateMeterIDForOnu-1", getResMgr(), args{"DOWNSTREAM", 1, 2,
- 2, 64, &ofp.OfpMeterConfig{}}, errors.New("failed to get Meter config from kvstore for path")},
+ 2, 64, &MeterInfo{}}, errors.New("failed to get Meter config from kvstore for path")},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- if err := RsrcMgr.UpdateMeterIDForOnu(ctx, tt.args.Direction, tt.args.IntfID, tt.args.OnuID, tt.args.UniID,
- tt.args.tpID, tt.args.MeterConfig); reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) && err != nil {
+ if err := RsrcMgr.StoreMeterInfoForOnu(ctx, tt.args.Direction, tt.args.IntfID, tt.args.OnuID, tt.args.UniID,
+ tt.args.tpID, tt.args.MeterInfo); reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) && err != nil {
t.Errorf("UpdateMeterIDForOnu() got = %v, want %v", err, tt.wantErr)
}
})