[VOL-3957]: Maintain meter reference counter and free up the meter from
KV store as soon as last flow referencing it is removed.
Change-Id: I1b32690e1d65a35e0d03a65aa7b2e38a38997521
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 31ca07d..4e10522 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -440,7 +440,7 @@
*/
var SchedCfg *tp_pb.SchedulerConfig
- KvStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
+ meterInfo, err := f.resourceMgr.GetMeterInfoForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
if err != nil {
return olterrors.NewErrNotFound("meter",
log.Fields{"intf-id": sq.intfID,
@@ -449,14 +449,17 @@
"device-id": f.deviceHandler.device.Id}, err)
}
- if KvStoreMeter != nil {
- if KvStoreMeter.MeterId == sq.meterID {
- logger.Debugw(ctx, "scheduler-already-created-for-upstream", log.Fields{"device-id": f.deviceHandler.device.Id})
+ if meterInfo != nil {
+ logger.Debugw(ctx, "scheduler-already-created-for-upstream", log.Fields{"device-id": f.deviceHandler.device.Id, "meter-id": sq.meterID})
+ if meterInfo.MeterConfig.MeterId == sq.meterID {
+ if err := f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, true); err != nil {
+ return err
+ }
return nil
}
return olterrors.NewErrInvalidValue(log.Fields{
"unsupported": "meter-id",
- "kv-store-meter-id": KvStoreMeter.MeterId,
+ "kv-store-meter-id": meterInfo.MeterConfig.MeterId,
"meter-id-in-flow": sq.meterID,
"device-id": f.deviceHandler.device.Id}, nil)
}
@@ -482,42 +485,48 @@
"device-id": f.deviceHandler.device.Id}, err)
}
- var meterConfig *ofp.OfpMeterConfig
+ found := false
+ meterInfo = &rsrcMgr.MeterInfo{}
if sq.flowMetadata != nil {
for _, meter := range sq.flowMetadata.Meters {
if sq.meterID == meter.MeterId {
- meterConfig = meter
+ meterInfo.MeterConfig = ofp.OfpMeterConfig{}
+ meterInfo.MeterConfig.MeterId = meter.MeterId
+ meterInfo.MeterConfig.Flags = meter.Flags
+ meterInfo.RefCnt = 1 // initialize it to 1, since this is the first flow that referenced the meter id.
+ meterInfo.MeterConfig.Bands = append(meterInfo.MeterConfig.Bands, meter.Bands...)
logger.Debugw(ctx, "found-meter-config-from-flowmetadata",
- log.Fields{"meterConfig": meterConfig,
+ log.Fields{"meterConfig": meterInfo.MeterConfig,
"device-id": f.deviceHandler.device.Id})
+ found = true
break
}
}
} else {
logger.Errorw(ctx, "flow-metadata-not-present-in-flow", log.Fields{"device-id": f.deviceHandler.device.Id})
}
- if meterConfig == nil {
+ if !found {
return olterrors.NewErrNotFound("meterbands", log.Fields{
"reason": "Could-not-get-meterbands-from-flowMetadata",
"flow-metadata": sq.flowMetadata,
"meter-id": sq.meterID,
"device-id": f.deviceHandler.device.Id}, nil)
- } else if len(meterConfig.Bands) < MaxMeterBand {
+ } else if len(meterInfo.MeterConfig.Bands) < MaxMeterBand {
logger.Errorw(ctx, "invalid-number-of-bands-in-meter",
- log.Fields{"Bands": meterConfig.Bands,
+ log.Fields{"Bands": meterInfo.MeterConfig.Bands,
"meter-id": sq.meterID,
"device-id": f.deviceHandler.device.Id})
return olterrors.NewErrInvalidValue(log.Fields{
"reason": "Invalid-number-of-bands-in-meter",
- "meterband-count": len(meterConfig.Bands),
- "metabands": meterConfig.Bands,
+ "meterband-count": len(meterInfo.MeterConfig.Bands),
+ "metabands": meterInfo.MeterConfig.Bands,
"meter-id": sq.meterID,
"device-id": f.deviceHandler.device.Id}, nil)
}
- cir := meterConfig.Bands[0].Rate
- cbs := meterConfig.Bands[0].BurstSize
- eir := meterConfig.Bands[1].Rate
- ebs := meterConfig.Bands[1].BurstSize
+ cir := meterInfo.MeterConfig.Bands[0].Rate
+ cbs := meterInfo.MeterConfig.Bands[0].BurstSize
+ eir := meterInfo.MeterConfig.Bands[1].Rate
+ ebs := meterInfo.MeterConfig.Bands[1].BurstSize
pir := cir + eir
pbs := cbs + ebs
TrafficShaping := &tp_pb.TrafficShapingInfo{Cir: cir, Cbs: cbs, Pir: pir, Pbs: pbs}
@@ -535,7 +544,7 @@
/* After we successfully applied the scheduler configuration on the OLT device,
* store the meter id on the KV store, for further reference.
*/
- if err := f.resourceMgr.UpdateMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, meterConfig); err != nil {
+ if err := f.resourceMgr.StoreMeterInfoForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, meterInfo); err != nil {
return olterrors.NewErrAdapter("failed-updating-meter-id",
log.Fields{"onu-id": sq.onuID,
"meter-id": sq.meterID,
@@ -543,8 +552,8 @@
}
logger.Infow(ctx, "updated-meter-info-into-kv-store-successfully",
log.Fields{"direction": Direction,
- "Meter": meterConfig,
- "device-id": f.deviceHandler.device.Id})
+ "meter-info": meterInfo,
+ "device-id": f.deviceHandler.device.Id})
return nil
}
@@ -649,31 +658,7 @@
"device-id": f.deviceHandler.device.Id}, err)
}
- KVStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
- if err != nil {
- return olterrors.NewErrNotFound("meter",
- log.Fields{
- "onu-id": sq.onuID,
- "device-id": f.deviceHandler.device.Id}, err)
- }
- if KVStoreMeter == nil {
- logger.Warnw(ctx, "no-meter-installed-yet",
- log.Fields{
- "direction": Direction,
- "intf-id": sq.intfID,
- "onu-id": sq.onuID,
- "uni-id": sq.uniID,
- "device-id": f.deviceHandler.device.Id})
- return nil
- }
- cir := KVStoreMeter.Bands[0].Rate
- cbs := KVStoreMeter.Bands[0].BurstSize
- eir := KVStoreMeter.Bands[1].Rate
- ebs := KVStoreMeter.Bands[1].BurstSize
- pir := cir + eir
- pbs := cbs + ebs
-
- TrafficShaping := &tp_pb.TrafficShapingInfo{Cir: cir, Cbs: cbs, Pir: pir, Pbs: pbs}
+ TrafficShaping := &tp_pb.TrafficShapingInfo{} // this info is not really useful for the agent during flow removal. Just use default values.
TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[sq.intfID].GetTrafficScheduler(sq.tpInst.(*tp.TechProfile), SchedCfg, TrafficShaping)}
TrafficSched[0].TechProfileId = sq.tpID
@@ -722,12 +707,11 @@
/* After we successfully remove the scheduler configuration on the OLT device,
* delete the meter id on the KV store.
*/
- err = f.resourceMgr.RemoveMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
+ err = f.resourceMgr.RemoveMeterInfoForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
if err != nil {
return olterrors.NewErrAdapter("unable-to-remove-meter",
log.Fields{
"onu": sq.onuID,
- "meter": KVStoreMeter.MeterId,
"device-id": f.deviceHandler.device.Id,
"intf-id": sq.intfID,
"onu-id": sq.onuID,
@@ -736,7 +720,6 @@
}
logger.Infow(ctx, "removed-meter-from-KV-store-successfully",
log.Fields{
- "meter-id": KVStoreMeter.MeterId,
"dir": Direction,
"device-id": f.deviceHandler.device.Id,
"intf-id": sq.intfID,
@@ -1892,27 +1875,27 @@
//clearResources clears pon resources in kv store and the device
// nolint: gocyclo
-func (f *OpenOltFlowMgr) clearResources(ctx context.Context, flow *ofp.OfpFlowStats, Intf uint32, onuID int32, uniID int32,
- gemPortID int32, flowID uint64, portNum uint32) error {
+func (f *OpenOltFlowMgr) clearResources(ctx context.Context, flow *ofp.OfpFlowStats, intfID uint32, onuID int32, uniID int32,
+ gemPortID int32, flowID uint64, portNum uint32, direction string) error {
tpID, err := getTpIDFromFlow(ctx, flow)
if err != nil {
return olterrors.NewErrNotFound("tp-id",
log.Fields{
"flow": flow,
- "intf": Intf,
+ "intf-id": intfID,
"onu-id": onuID,
"uni-id": uniID,
"device-id": f.deviceHandler.device.Id}, err)
}
- uni := getUniPortPath(f.deviceHandler.device.Id, Intf, onuID, uniID)
- tpPath := f.getTPpath(ctx, Intf, uni, tpID)
+ uni := getUniPortPath(f.deviceHandler.device.Id, intfID, onuID, uniID)
+ tpPath := f.getTPpath(ctx, intfID, uni, tpID)
logger.Debugw(ctx, "getting-techprofile-instance-for-subscriber",
log.Fields{
"tpPath": tpPath,
"device-id": f.deviceHandler.device.Id})
- techprofileInst, err := f.techprofile[Intf].GetTPInstanceFromKVStore(ctx, tpID, tpPath)
+ techprofileInst, err := f.techprofile[intfID].GetTPInstanceFromKVStore(ctx, tpID, tpPath)
if err != nil || techprofileInst == nil { // This should not happen, something wrong in KV backend transaction
return olterrors.NewErrNotFound("tech-profile-in-kv-store",
log.Fields{
@@ -1933,7 +1916,7 @@
// everytime flowsUsedByGemPort cache is updated the same should be updated
// in kv store by calling UpdateFlowIDsForGem
f.flowsUsedByGemPort[uint32(gemPortID)] = flowIDs
- if err := f.resourceMgr.UpdateFlowIDsForGem(ctx, Intf, uint32(gemPortID), flowIDs); err != nil {
+ if err := f.resourceMgr.UpdateFlowIDsForGem(ctx, intfID, uint32(gemPortID), flowIDs); err != nil {
return err
}
break
@@ -1944,19 +1927,23 @@
"gemport-id": gemPortID,
"usedByFlows": flowIDs,
"device-id": f.deviceHandler.device.Id})
+
+ if err := f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, direction, intfID, uint32(onuID), uint32(uniID), tpID, false); err != nil {
+ return err
+ }
return nil
}
logger.Debugf(ctx, "gem-port-id %d is-not-used-by-another-flow--releasing-the-gem-port", gemPortID)
- f.resourceMgr.RemoveGemPortIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
+ f.resourceMgr.RemoveGemPortIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID))
// TODO: The TrafficQueue corresponding to this gem-port also should be removed immediately.
// But it is anyway eventually removed later when the TechProfile is freed, so not a big issue for now.
- f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(ctx, uint32(gemPortID), Intf)
+ f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(ctx, uint32(gemPortID), intfID)
// also clear gem to uni cache
f.removeFromGemToUniMap(gemPortKey{
- intfID: Intf,
+ intfID: intfID,
gemPort: uint32(gemPortID),
})
- f.deleteGemPortFromLocalCache(ctx, Intf, uint32(onuID), uint32(gemPortID))
+ f.deleteGemPortFromLocalCache(ctx, intfID, uint32(onuID), uint32(gemPortID))
f.onuIdsLock.Lock() // TODO: What is this lock?
@@ -1965,17 +1952,17 @@
f.flowsUsedByGemPortKey.Lock()
delete(f.flowsUsedByGemPort, uint32(gemPortID))
f.flowsUsedByGemPortKey.Unlock()
- f.resourceMgr.DeleteFlowIDsForGem(ctx, Intf, uint32(gemPortID))
- f.resourceMgr.FreeGemPortID(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
+ f.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, uint32(gemPortID))
+ f.resourceMgr.FreeGemPortID(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID))
f.onuIdsLock.Unlock()
// Delete the gem port on the ONU.
- if err := f.sendDeleteGemPortToChild(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID), tpPath); err != nil {
+ if err := f.sendDeleteGemPortToChild(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID), tpPath); err != nil {
logger.Errorw(ctx, "error-processing-delete-gem-port-towards-onu",
log.Fields{
"err": err,
- "intf": Intf,
+ "intfID": intfID,
"onu-id": onuID,
"uni-id": uniID,
"device-id": f.deviceHandler.device.Id,
@@ -1983,26 +1970,26 @@
}
switch techprofileInst := techprofileInst.(type) {
case *tp.TechProfile:
- ok, _ := f.isTechProfileUsedByAnotherGem(ctx, Intf, uint32(onuID), uint32(uniID), tpID, techprofileInst, uint32(gemPortID))
+ ok, _ := f.isTechProfileUsedByAnotherGem(ctx, intfID, uint32(onuID), uint32(uniID), tpID, techprofileInst, uint32(gemPortID))
if !ok {
- if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), tpID); err != nil {
+ if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), tpID); err != nil {
logger.Warn(ctx, err)
}
- if err := f.DeleteTechProfileInstance(ctx, Intf, uint32(onuID), uint32(uniID), "", tpID); err != nil {
+ if err := f.DeleteTechProfileInstance(ctx, intfID, uint32(onuID), uint32(uniID), "", tpID); err != nil {
logger.Warn(ctx, err)
}
- if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
+ if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
logger.Warn(ctx, err)
}
- if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
+ if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: intfID, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
logger.Warn(ctx, err)
}
- f.resourceMgr.FreeAllocID(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID)
+ f.resourceMgr.FreeAllocID(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID)
// Delete the TCONT on the ONU.
- if err := f.sendDeleteTcontToChild(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID, tpPath); err != nil {
+ if err := f.sendDeleteTcontToChild(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID, tpPath); err != nil {
logger.Errorw(ctx, "error-processing-delete-tcont-towards-onu",
log.Fields{
- "intf": Intf,
+ "intf": intfID,
"onu-id": onuID,
"uni-id": uniID,
"device-id": f.deviceHandler.device.Id,
@@ -2010,18 +1997,18 @@
}
}
case *tp.EponProfile:
- if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), tpID); err != nil {
+ if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), tpID); err != nil {
logger.Warn(ctx, err)
}
- if err := f.DeleteTechProfileInstance(ctx, Intf, uint32(onuID), uint32(uniID), "", tpID); err != nil {
+ if err := f.DeleteTechProfileInstance(ctx, intfID, uint32(onuID), uint32(uniID), "", tpID); err != nil {
logger.Warn(ctx, err)
}
- f.resourceMgr.FreeAllocID(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.AllocID)
+ f.resourceMgr.FreeAllocID(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocID)
// Delete the TCONT on the ONU.
- if err := f.sendDeleteTcontToChild(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.AllocID, tpPath); err != nil {
+ if err := f.sendDeleteTcontToChild(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocID, tpPath); err != nil {
logger.Errorw(ctx, "error-processing-delete-tcont-towards-onu",
log.Fields{
- "intf": Intf,
+ "intf": intfID,
"onu-id": onuID,
"uni-id": uniID,
"device-id": f.deviceHandler.device.Id,
@@ -2100,7 +2087,7 @@
return err
}
if !flowInfo.Flow.ReplicateFlow {
- if err = f.clearResources(ctx, flow, Intf, onuID, uniID, flowInfo.Flow.GemportId, flowInfo.Flow.FlowId, portNum); err != nil {
+ if err = f.clearResources(ctx, flow, Intf, onuID, uniID, flowInfo.Flow.GemportId, flowInfo.Flow.FlowId, portNum, flowDirection); err != nil {
logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
"flow-id": flow.Id,
"stored-flow": flowInfo.Flow,
@@ -2118,7 +2105,7 @@
}
logger.Debugw(ctx, "gems-to-be-cleared", log.Fields{"gems": gems})
for _, gem := range gems {
- if err = f.clearResources(ctx, flow, Intf, onuID, uniID, int32(gem), flowInfo.Flow.FlowId, portNum); err != nil {
+ if err = f.clearResources(ctx, flow, Intf, onuID, uniID, int32(gem), flowInfo.Flow.FlowId, portNum, flowDirection); err != nil {
logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
"flow-id": flow.Id,
"stored-flow": flowInfo.Flow,