VOL-4559: Once all the GEM references are cleared, delete the TP
instance and TP ID first, before checking for other TP instances with
the same alloc-id when instance-control is single-instance per ONU.
Also includes more fixes for handling instance-control onu
single-instance during setup and removal of schedulers/queues.
Also fixes stale data left behind for mcast queues after device delete.
Change-Id: Iaca358128e91c3f1ba23dd4bea3d05dccc67bb02
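
With instance-control onu = single-instance, all UNIs of an ONU share one
alloc-id, so the per-UNI state (TP instance, TP ID, GEM ports, queues) has to
be released before deciding whether the shared alloc-id and upstream scheduler
can be freed. A simplified sketch of that decision as ordered by this change
(condensed from the openolt_flowmgr.go hunks below; allocID stands for
techprofileInst.UsScheduler.AllocId, error handling elided):

    // Per-UNI cleanup happens first: TP instance/ID, GEM ports, queues.
    // Only then is the shared alloc-id checked against the other UNIs.
    if f.isAllocUsedByAnotherUNI(ctx, sq) {
        // Another UNI still references the alloc: keep the scheduler and the
        // resource-pool entry, drop only this pon/onu/uni's alloc-id data.
        f.resourceMgr.FreeAllocID(ctx, sq.intfID, sq.onuID, sq.uniID, allocID, false)
    } else {
        // Last reference is gone: remove the scheduler and fully release the alloc-id.
        _ = f.RemoveScheduler(ctx, sq)
        f.resourceMgr.FreeAllocID(ctx, sq.intfID, sq.onuID, sq.uniID, allocID, true)
    }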
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 9ae21dd..a8c33f0 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -2046,6 +2046,7 @@
}
_ = dh.resourceMgr[ponPort].DeleteAllFlowIDsForGemForIntf(ctx, ponPort)
_ = dh.resourceMgr[ponPort].DeleteAllOnuGemInfoForIntf(ctx, ponPort)
+ dh.resourceMgr[ponPort].DeleteMcastQueueForIntf(ctx)
if err := dh.resourceMgr[ponPort].Delete(ctx, ponPort); err != nil {
logger.Debug(ctx, err)
}
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 50c1707..818b167 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -388,9 +388,10 @@
}
// CreateSchedulerQueues creates traffic schedulers on the device with the given scheduler configuration and traffic shaping info
+// nolint: gocyclo
func (f *OpenOltFlowMgr) CreateSchedulerQueues(ctx context.Context, sq schedQueue) error {
- logger.Debugw(ctx, "CreateSchedulerQueues",
+ logger.Infow(ctx, "CreateSchedulerQueues",
log.Fields{"dir": sq.direction,
"intf-id": sq.intfID,
"onu-id": sq.onuID,
@@ -406,12 +407,34 @@
return err
}
+ var TrafficShaping *tp_pb.TrafficShapingInfo
+ if sq.flowMetadata == nil || len(sq.flowMetadata.Meters) != 1 {
+ return olterrors.NewErrInvalidValue(log.Fields{
+ "reason": "invalid-meter-config",
+ "meter-id": sq.meterID,
+ "device-id": f.deviceHandler.device.Id}, nil)
+ }
+
+ if TrafficShaping, err = meters.GetTrafficShapingInfo(ctx, sq.flowMetadata.Meters[0]); err != nil {
+ return olterrors.NewErrInvalidValue(log.Fields{
+ "reason": "invalid-meter-config",
+ "meter-id": sq.meterID,
+ "device-id": f.deviceHandler.device.Id}, nil)
+ }
+
+ var SchedCfg *tp_pb.SchedulerConfig
+ if sq.direction == tp_pb.Direction_UPSTREAM {
+ SchedCfg = f.techprofile.GetUsScheduler(sq.tpInst.(*tp_pb.TechProfileInstance))
+ } else if sq.direction == tp_pb.Direction_DOWNSTREAM {
+ SchedCfg = f.techprofile.GetDsScheduler(sq.tpInst.(*tp_pb.TechProfileInstance))
+ }
+ TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile.GetTrafficScheduler(sq.tpInst.(*tp_pb.TechProfileInstance), SchedCfg, TrafficShaping)}
+ TrafficSched[0].TechProfileId = sq.tpID
+
/* Lets make a simple assumption that if the meter-id is present on the KV store,
* then the scheduler and queues configuration is applied on the OLT device
* in the given direction.
*/
-
- var SchedCfg *tp_pb.SchedulerConfig
meterInfo, err := f.resourceMgr.GetMeterInfoForOnu(ctx, direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
if err != nil {
return olterrors.NewErrNotFound("meter",
@@ -421,11 +444,42 @@
"device-id": f.deviceHandler.device.Id}, err)
}
- // update referernce count and return if the meter was already installed before
+ // update reference count and return if the meter was already installed before
if meterInfo != nil && meterInfo.MeterID == sq.meterID {
- logger.Debugw(ctx, "scheduler-already-created-for-direction",
+ logger.Infow(ctx, "scheduler-already-created-for-direction",
log.Fields{"device-id": f.deviceHandler.device.Id, "direction": direction, "meter-id": sq.meterID})
- return f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, true)
+ if err = f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, true); err != nil {
+ return err
+ }
+
+ if allocExists := f.isAllocUsedByAnotherUNI(ctx, sq); allocExists {
+ // The alloc object was already created as part of flow setup on another uni of the onu for the same service.
+ // Just create the gem ports and traffic queues on the current uni for the given service.
+ logger.Infow(ctx, "alloc in use on another uni, schedulers already created, creating queues only",
+ log.Fields{"intf-id": sq.intfID,
+ "onu-id": sq.onuID,
+ "uni-id": sq.uniID,
+ "tp-id": sq.tpID,
+ "device-id": f.deviceHandler.device.Id})
+ // The upstream scheduler is already created. We only need to create the queues.
+ // If there are multiple upstream flows on a given uni, it is possible that
+ // we call pushTrafficQueues multiple times, but that is OK as BAL returns OK.
+ // TODO: Find a better mechanism to avoid duplicating the request.
+ if err = f.pushTrafficQueues(ctx, sq, TrafficSched); err != nil {
+ return olterrors.NewErrAdapter("failure-pushing-traffic-queues-to-device",
+ log.Fields{"intf-id": sq.intfID,
+ "direction": sq.direction,
+ "device-id": f.deviceHandler.device.Id}, err)
+ }
+ } else {
+ logger.Infow(ctx, "alloc not in use on another uni, only meter ref cnt updated",
+ log.Fields{"intf-id": sq.intfID,
+ "onu-id": sq.onuID,
+ "uni-id": sq.uniID,
+ "tp-id": sq.tpID,
+ "device-id": f.deviceHandler.device.Id})
+ }
+ return err
}
logger.Debugw(ctx, "meter-does-not-exist-creating-new",
@@ -434,12 +488,6 @@
"direction": direction,
"device-id": f.deviceHandler.device.Id})
- if sq.direction == tp_pb.Direction_UPSTREAM {
- SchedCfg = f.techprofile.GetUsScheduler(sq.tpInst.(*tp_pb.TechProfileInstance))
- } else if sq.direction == tp_pb.Direction_DOWNSTREAM {
- SchedCfg = f.techprofile.GetDsScheduler(sq.tpInst.(*tp_pb.TechProfileInstance))
- }
-
found := false
meterInfo = &rsrcMgr.MeterInfo{}
if sq.flowMetadata != nil {
@@ -465,17 +513,6 @@
"device-id": f.deviceHandler.device.Id}, nil)
}
- var TrafficShaping *tp_pb.TrafficShapingInfo
- if TrafficShaping, err = meters.GetTrafficShapingInfo(ctx, sq.flowMetadata.Meters[0]); err != nil {
- return olterrors.NewErrInvalidValue(log.Fields{
- "reason": "invalid-meter-config",
- "meter-id": sq.meterID,
- "device-id": f.deviceHandler.device.Id}, nil)
- }
-
- TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile.GetTrafficScheduler(sq.tpInst.(*tp_pb.TechProfileInstance), SchedCfg, TrafficShaping)}
- TrafficSched[0].TechProfileId = sq.tpID
-
if err := f.pushSchedulerQueuesToDevice(ctx, sq, TrafficSched); err != nil {
return olterrors.NewErrAdapter("failure-pushing-traffic-scheduler-and-queues-to-device",
log.Fields{"intf-id": sq.intfID,
@@ -499,6 +536,32 @@
return nil
}
+func (f *OpenOltFlowMgr) pushTrafficQueues(ctx context.Context, sq schedQueue, TrafficSched []*tp_pb.TrafficScheduler) error {
+ trafficQueues, err := f.techprofile.GetTrafficQueues(ctx, sq.tpInst.(*tp_pb.TechProfileInstance), sq.direction)
+ if err != nil {
+ return olterrors.NewErrAdapter("unable-to-construct-traffic-queue-configuration",
+ log.Fields{"intf-id": sq.intfID,
+ "direction": sq.direction,
+ "device-id": f.deviceHandler.device.Id}, err)
+ }
+ logger.Debugw(ctx, "sending-traffic-queues-create-to-device",
+ log.Fields{"direction": sq.direction,
+ "traffic-queues": trafficQueues,
+ "device-id": f.deviceHandler.device.Id})
+ queues := &tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
+ UniId: sq.uniID, PortNo: sq.uniPort,
+ TrafficQueues: trafficQueues,
+ TechProfileId: TrafficSched[0].TechProfileId}
+ if _, err := f.deviceHandler.Client.CreateTrafficQueues(ctx, queues); err != nil {
+ if len(queues.TrafficQueues) > 1 {
+ logger.Debugw(ctx, "removing-queues-for-1tcont-multi-gem", log.Fields{"intfID": sq.intfID, "onuID": sq.onuID, "dir": sq.direction})
+ _, _ = f.deviceHandler.Client.RemoveTrafficQueues(ctx, queues)
+ }
+ return olterrors.NewErrAdapter("failed-to-create-traffic-queues-in-device", log.Fields{"traffic-queues": trafficQueues}, err)
+ }
+ return err
+}
+
func (f *OpenOltFlowMgr) pushSchedulerQueuesToDevice(ctx context.Context, sq schedQueue, TrafficSched []*tp_pb.TrafficScheduler) error {
trafficQueues, err := f.techprofile.GetTrafficQueues(ctx, sq.tpInst.(*tp_pb.TechProfileInstance), sq.direction)
@@ -579,13 +642,49 @@
return nil
}
-// RemoveSchedulerQueues removes the traffic schedulers from the device based on the given scheduler configuration and traffic shaping info
-func (f *OpenOltFlowMgr) RemoveSchedulerQueues(ctx context.Context, sq schedQueue) error {
+// RemoveQueues removes the traffic queues from the device based on the given schedQueue info
+func (f *OpenOltFlowMgr) RemoveQueues(ctx context.Context, sq schedQueue) error {
+ var err error
+ logger.Infow(ctx, "removing-queue-in-olt",
+ log.Fields{
+ "direction": sq.direction,
+ "intf-id": sq.intfID,
+ "onu-id": sq.onuID,
+ "uni-id": sq.uniID,
+ "uni-port": sq.uniPort,
+ "device-id": f.deviceHandler.device.Id})
+ TrafficQueues, err := f.techprofile.GetTrafficQueues(ctx, sq.tpInst.(*tp_pb.TechProfileInstance), sq.direction)
+ if err != nil {
+ return olterrors.NewErrAdapter("unable-to-construct-traffic-queue-configuration",
+ log.Fields{
+ "intf-id": sq.intfID,
+ "direction": sq.direction,
+ "device-id": f.deviceHandler.device.Id}, err)
+ }
+
+ if _, err = f.deviceHandler.Client.RemoveTrafficQueues(ctx,
+ &tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
+ UniId: sq.uniID, PortNo: sq.uniPort,
+ TrafficQueues: TrafficQueues,
+ TechProfileId: sq.tpID}); err != nil {
+ return olterrors.NewErrAdapter("unable-to-remove-traffic-queues-from-device",
+ log.Fields{
+ "intf-id": sq.intfID,
+ "traffic-queues": TrafficQueues,
+ "device-id": f.deviceHandler.device.Id}, err)
+ }
+ logger.Infow(ctx, "removed-traffic-queues-successfully", log.Fields{"device-id": f.deviceHandler.device.Id, "trafficQueues": TrafficQueues})
+
+ return err
+}
+
+// RemoveScheduler removes the traffic scheduler from the device based on the given schedQueue info
+func (f *OpenOltFlowMgr) RemoveScheduler(ctx context.Context, sq schedQueue) error {
var Direction string
var SchedCfg *tp_pb.SchedulerConfig
var err error
- logger.Infow(ctx, "removing-schedulers-and-queues-in-olt",
+ logger.Infow(ctx, "removing-scheduler-in-olt",
log.Fields{
"direction": sq.direction,
"intf-id": sq.intfID,
@@ -606,88 +705,46 @@
TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile.GetTrafficScheduler(sq.tpInst.(*tp_pb.TechProfileInstance), SchedCfg, TrafficShaping)}
TrafficSched[0].TechProfileId = sq.tpID
- TrafficQueues, err := f.techprofile.GetTrafficQueues(ctx, sq.tpInst.(*tp_pb.TechProfileInstance), sq.direction)
- if err != nil {
- return olterrors.NewErrAdapter("unable-to-construct-traffic-queue-configuration",
+ if _, err = f.deviceHandler.Client.RemoveTrafficSchedulers(ctx, &tp_pb.TrafficSchedulers{
+ IntfId: sq.intfID, OnuId: sq.onuID,
+ UniId: sq.uniID, PortNo: sq.uniPort,
+ TrafficScheds: TrafficSched}); err != nil {
+ return olterrors.NewErrAdapter("unable-to-remove-traffic-schedulers-from-device",
log.Fields{
- "intf-id": sq.intfID,
- "direction": sq.direction,
- "device-id": f.deviceHandler.device.Id}, err)
+ "intf-id": sq.intfID,
+ "traffic-schedulers": TrafficSched,
+ "onu-id": sq.onuID,
+ "uni-id": sq.uniID,
+ "uni-port": sq.uniPort}, err)
}
- if _, err = f.deviceHandler.Client.RemoveTrafficQueues(ctx,
- &tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
- UniId: sq.uniID, PortNo: sq.uniPort,
- TrafficQueues: TrafficQueues,
- TechProfileId: TrafficSched[0].TechProfileId}); err != nil {
- return olterrors.NewErrAdapter("unable-to-remove-traffic-queues-from-device",
- log.Fields{
- "intf-id": sq.intfID,
- "traffic-queues": TrafficQueues,
- "device-id": f.deviceHandler.device.Id}, err)
- }
- logger.Infow(ctx, "removed-traffic-queues-successfully", log.Fields{"device-id": f.deviceHandler.device.Id, "trafficQueues": TrafficQueues})
+ logger.Infow(ctx, "removed-traffic-schedulers-successfully",
+ log.Fields{"device-id": f.deviceHandler.device.Id,
+ "intf-id": sq.intfID,
+ "onu-id": sq.onuID,
+ "uni-id": sq.uniID,
+ "uni-port": sq.uniPort})
- if allocExists := f.isAllocUsedByAnotherUNI(ctx, sq); !allocExists {
- if _, err = f.deviceHandler.Client.RemoveTrafficSchedulers(ctx, &tp_pb.TrafficSchedulers{
- IntfId: sq.intfID, OnuId: sq.onuID,
- UniId: sq.uniID, PortNo: sq.uniPort,
- TrafficScheds: TrafficSched}); err != nil {
- return olterrors.NewErrAdapter("unable-to-remove-traffic-schedulers-from-device",
+ if sq.direction == tp_pb.Direction_UPSTREAM {
+ allocID := sq.tpInst.(*tp_pb.TechProfileInstance).UsScheduler.AllocId
+ // Delete the TCONT on the ONU.
+ uni := getUniPortPath(f.deviceHandler.device.Id, sq.intfID, int32(sq.onuID), int32(sq.uniID))
+ tpPath := f.getTPpath(ctx, sq.intfID, uni, sq.tpID)
+ if err := f.sendDeleteTcontToChild(ctx, sq.intfID, sq.onuID, sq.uniID, allocID, tpPath); err != nil {
+ logger.Errorw(ctx, "error-processing-delete-tcont-towards-onu",
log.Fields{
- "intf-id": sq.intfID,
- "traffic-schedulers": TrafficSched,
- "onu-id": sq.onuID,
- "uni-id": sq.uniID,
- "uni-port": sq.uniPort}, err)
- }
-
- logger.Infow(ctx, "removed-traffic-schedulers-successfully",
- log.Fields{"device-id": f.deviceHandler.device.Id,
- "intf-id": sq.intfID,
- "onu-id": sq.onuID,
- "uni-id": sq.uniID,
- "uni-port": sq.uniPort})
-
- if sq.direction == tp_pb.Direction_UPSTREAM {
- allocID := sq.tpInst.(*tp_pb.TechProfileInstance).UsScheduler.AllocId
- // Delete the TCONT on the ONU.
- uni := getUniPortPath(f.deviceHandler.device.Id, sq.intfID, int32(sq.onuID), int32(sq.uniID))
- tpPath := f.getTPpath(ctx, sq.intfID, uni, sq.tpID)
- if err := f.sendDeleteTcontToChild(ctx, sq.intfID, sq.onuID, sq.uniID, allocID, tpPath); err != nil {
- logger.Errorw(ctx, "error-processing-delete-tcont-towards-onu",
- log.Fields{
- "intf": sq.intfID,
- "onu-id": sq.onuID,
- "uni-id": sq.uniID,
- "device-id": f.deviceHandler.device.Id,
- "alloc-id": allocID})
- }
+ "intf": sq.intfID,
+ "onu-id": sq.onuID,
+ "uni-id": sq.uniID,
+ "device-id": f.deviceHandler.device.Id,
+ "alloc-id": allocID})
}
}
/* After we successfully remove the scheduler configuration on the OLT device,
* delete the meter id on the KV store.
*/
- err = f.resourceMgr.RemoveMeterInfoForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
- if err != nil {
- return olterrors.NewErrAdapter("unable-to-remove-meter",
- log.Fields{
- "onu": sq.onuID,
- "device-id": f.deviceHandler.device.Id,
- "intf-id": sq.intfID,
- "onu-id": sq.onuID,
- "uni-id": sq.uniID,
- "uni-port": sq.uniPort}, err)
- }
- logger.Infow(ctx, "removed-meter-from-KV-store-successfully",
- log.Fields{
- "dir": Direction,
- "device-id": f.deviceHandler.device.Id,
- "intf-id": sq.intfID,
- "onu-id": sq.onuID,
- "uni-id": sq.uniID,
- "uni-port": sq.uniPort})
+ err = f.removeMeterReference(ctx, Direction, sq)
return err
}
@@ -1966,69 +2023,81 @@
}
logger.Debugw(ctx, "all-gem-ports-are-free-to-be-deleted", log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID, "tpID": tpID})
+
+ // Free the TP instance, TP ID, GEM ports and traffic queues. The alloc-id and schedulers are cleared later, only if they are not shared with other UNIs on the ONU.
switch techprofileInst := techprofileInst.(type) {
case *tp_pb.TechProfileInstance:
+ if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), tpID); err != nil {
+ logger.Warn(ctx, err)
+ }
+ if err := f.DeleteTechProfileInstance(ctx, intfID, uint32(onuID), uint32(uniID), "", tpID); err != nil {
+ logger.Warn(ctx, err)
+ }
+
+ for _, gemPort := range techprofileInst.UpstreamGemPortAttributeList {
+ gemPortID := gemPort.GemportId
+ f.deleteGemPortFromLocalCache(ctx, intfID, uint32(onuID), gemPortID)
+ _ = f.resourceMgr.RemoveGemFromOnuGemInfo(ctx, intfID, uint32(onuID), gemPortID) // ignore error and proceed.
+
+ if err := f.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, gemPortID); err == nil {
+ // every time an entry is deleted from the gemToFlowIDs cache, the same entry
+ // must be removed from the KV store as well, by calling DeleteFlowIDsForGem
+ f.gemToFlowIDsKey.Lock()
+ delete(f.gemToFlowIDs, gemPortID)
+ f.gemToFlowIDsKey.Unlock()
+ } else {
+ logger.Errorw(ctx, "error-removing-flow-ids-of-gem-port",
+ log.Fields{
+ "err": err,
+ "intf": intfID,
+ "onu-id": onuID,
+ "uni-id": uniID,
+ "device-id": f.deviceHandler.device.Id,
+ "gemport-id": gemPortID})
+ }
+
+ }
+ // Remove the queues at the OLT in both the upstream and downstream directions
+ schedQueue := schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}
+ if err := f.RemoveQueues(ctx, schedQueue); err != nil {
+ logger.Warn(ctx, err)
+ }
+ schedQueue.direction = tp_pb.Direction_DOWNSTREAM
+ if err := f.RemoveQueues(ctx, schedQueue); err != nil {
+ logger.Warn(ctx, err)
+ }
+ }
+
+ switch techprofileInst := techprofileInst.(type) {
+ case *tp_pb.TechProfileInstance:
+ // Proceed to free the alloc-id and clean up the schedulers (US/DS) only if no other UNI on the ONU references this TP
schedQueue := schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}
allocExists := f.isAllocUsedByAnotherUNI(ctx, schedQueue)
if !allocExists {
- if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), tpID); err != nil {
- logger.Warn(ctx, err)
- }
- if err := f.DeleteTechProfileInstance(ctx, intfID, uint32(onuID), uint32(uniID), "", tpID); err != nil {
- logger.Warn(ctx, err)
- }
-
+ // All alloc object references are removed; remove the upstream scheduler
if KvStoreMeter, _ := f.resourceMgr.GetMeterInfoForOnu(ctx, "upstream", intfID, uint32(onuID), uint32(uniID), tpID); KvStoreMeter != nil {
- if err := f.RemoveSchedulerQueues(ctx, schedQueue); err != nil {
+ if err := f.RemoveScheduler(ctx, schedQueue); err != nil {
logger.Warn(ctx, err)
}
}
- f.resourceMgr.FreeAllocID(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocId)
-
- schedQueue.direction = tp_pb.Direction_DOWNSTREAM
- if KvStoreMeter, _ := f.resourceMgr.GetMeterInfoForOnu(ctx, "downstream", intfID, uint32(onuID), uint32(uniID), tpID); KvStoreMeter != nil {
- if err := f.RemoveSchedulerQueues(ctx, schedQueue); err != nil {
- logger.Warn(ctx, err)
- }
+ // Remove the alloc-id from the resource pool by setting 'freeFromResourcePool' to true
+ f.resourceMgr.FreeAllocID(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocId, true)
+ } else {
+ // Just remove the meter reference for the upstream direction for the current pon/onu/uni.
+ // The upstream scheduler, alloc-id and meter reference for the last remaining pon/onu/uni are removed once no other UNI references the alloc.
+ if err := f.removeMeterReference(ctx, "upstream", schedQueue); err != nil {
+ return err
}
+ // Setting 'freeFromResourcePool' to false in resourceMgr.FreeAllocID only removes the alloc-id data for the given pon/onu/uni
+ // while preserving the alloc-id in the resource pool.
+ f.resourceMgr.FreeAllocID(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocId, false)
+ }
- for _, gemPort := range techprofileInst.UpstreamGemPortAttributeList {
- gemPortID := gemPort.GemportId
- f.deleteGemPortFromLocalCache(ctx, intfID, uint32(onuID), gemPortID)
- _ = f.resourceMgr.RemoveGemFromOnuGemInfo(ctx, intfID, uint32(onuID), gemPortID) // ignore error and proceed.
-
- if err := f.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, gemPortID); err == nil {
- //everytime an entry is deleted from gemToFlowIDs cache, the same should be updated in kv as well
- // by calling DeleteFlowIDsForGem
- f.gemToFlowIDsKey.Lock()
- delete(f.gemToFlowIDs, gemPortID)
- f.gemToFlowIDsKey.Unlock()
- } else {
- logger.Errorw(ctx, "error-removing-flow-ids-of-gem-port",
- log.Fields{
- "err": err,
- "intf": intfID,
- "onu-id": onuID,
- "uni-id": uniID,
- "device-id": f.deviceHandler.device.Id,
- "gemport-id": gemPortID})
- }
-
- f.resourceMgr.FreeGemPortID(ctx, intfID, uint32(onuID), uint32(uniID), gemPortID)
-
- // Delete the gem port on the ONU.
- if sendDeleteGemRequest {
- if err := f.sendDeleteGemPortToChild(ctx, intfID, uint32(onuID), uint32(uniID), gemPortID, tpPath); err != nil {
- logger.Errorw(ctx, "error-processing-delete-gem-port-towards-onu",
- log.Fields{
- "err": err,
- "intfID": intfID,
- "onu-id": onuID,
- "uni-id": uniID,
- "device-id": f.deviceHandler.device.Id,
- "gemport-id": gemPortID})
- }
- }
+ // Downstream scheduler removal is simpler: just invoke RemoveScheduler, without the complex alloc-object handling needed in the upstream direction.
+ schedQueue.direction = tp_pb.Direction_DOWNSTREAM
+ if KvStoreMeter, _ := f.resourceMgr.GetMeterInfoForOnu(ctx, "downstream", intfID, uint32(onuID), uint32(uniID), tpID); KvStoreMeter != nil {
+ if err := f.RemoveScheduler(ctx, schedQueue); err != nil {
+ logger.Warn(ctx, err)
}
}
case *tp_pb.EponTechProfileInstance:
@@ -2038,7 +2107,6 @@
if err := f.DeleteTechProfileInstance(ctx, intfID, uint32(onuID), uint32(uniID), "", tpID); err != nil {
logger.Warn(ctx, err)
}
- f.resourceMgr.FreeAllocID(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocId)
// Delete the TCONT on the ONU.
if err := f.sendDeleteTcontToChild(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocId, tpPath); err != nil {
logger.Errorw(ctx, "error-processing-delete-tcont-towards-onu",
@@ -2050,11 +2118,34 @@
"alloc-id": techprofileInst.AllocId,
"error": err})
}
+ f.resourceMgr.FreeAllocID(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocId, true)
default:
logger.Errorw(ctx, "error-unknown-tech",
log.Fields{
"techprofileInst": techprofileInst})
}
+
+ // Now that all other references are cleared, send delete-gem requests to the ONU (if requested) and free the GEM port IDs.
+ switch techprofileInst := techprofileInst.(type) {
+ case *tp_pb.TechProfileInstance:
+ for _, gemPort := range techprofileInst.UpstreamGemPortAttributeList {
+ // Delete the gem port on the ONU.
+ if sendDeleteGemRequest {
+ if err := f.sendDeleteGemPortToChild(ctx, intfID, uint32(onuID), uint32(uniID), gemPort.GemportId, tpPath); err != nil {
+ logger.Errorw(ctx, "error-processing-delete-gem-port-towards-onu",
+ log.Fields{
+ "err": err,
+ "intfID": intfID,
+ "onu-id": onuID,
+ "uni-id": uniID,
+ "device-id": f.deviceHandler.device.Id,
+ "gemport-id": gemPort.GemportId})
+ }
+ f.resourceMgr.FreeGemPortID(ctx, intfID, uint32(onuID), uint32(uniID), gemPort.GemportId)
+ }
+ }
+ }
+
return nil
}
@@ -3055,12 +3146,11 @@
tpInst := sq.tpInst.(*tp_pb.TechProfileInstance)
if tpInst.InstanceControl.Onu == "single-instance" && sq.direction == tp_pb.Direction_UPSTREAM {
tpInstances := f.techprofile.FindAllTpInstances(ctx, f.deviceHandler.device.Id, sq.tpID, sq.intfID, sq.onuID).([]tp_pb.TechProfileInstance)
- logger.Debugw(ctx, "got-single-instance-tp-instances", log.Fields{"tp-instances": tpInstances})
for i := 0; i < len(tpInstances); i++ {
tpI := tpInstances[i]
if tpI.SubscriberIdentifier != tpInst.SubscriberIdentifier &&
tpI.UsScheduler.AllocId == tpInst.UsScheduler.AllocId {
- logger.Debugw(ctx, "alloc-is-in-use",
+ logger.Debugw(ctx, "alloc-is-in-use-on-another-uni",
log.Fields{
"device-id": f.deviceHandler.device.Id,
"intfID": sq.intfID,
@@ -3500,7 +3590,7 @@
for _, gem := range techprofileInst.UpstreamGemPortAttributeList {
f.resourceMgr.FreeGemPortID(ctx, intfID, onuID, uniID, gem.GemportId)
}
- f.resourceMgr.FreeAllocID(ctx, intfID, onuID, uniID, techprofileInst.UsScheduler.AllocId)
+ f.resourceMgr.FreeAllocID(ctx, intfID, onuID, uniID, techprofileInst.UsScheduler.AllocId, true)
}
}
@@ -3546,3 +3636,29 @@
}
return nil
}
+
+func (f *OpenOltFlowMgr) removeMeterReference(ctx context.Context, direction string, sq schedQueue) error {
+ /* After we successfully remove the scheduler configuration on the OLT device,
+ * delete the meter id from the KV store.
+ */
+ err := f.resourceMgr.RemoveMeterInfoForOnu(ctx, direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
+ if err != nil {
+ return olterrors.NewErrAdapter("unable-to-remove-meter",
+ log.Fields{
+ "onu": sq.onuID,
+ "device-id": f.deviceHandler.device.Id,
+ "intf-id": sq.intfID,
+ "onu-id": sq.onuID,
+ "uni-id": sq.uniID,
+ "uni-port": sq.uniPort}, err)
+ }
+ logger.Debugw(ctx, "removed-meter-from-KV-store-successfully",
+ log.Fields{
+ "dir": direction,
+ "device-id": f.deviceHandler.device.Id,
+ "intf-id": sq.intfID,
+ "onu-id": sq.onuID,
+ "uni-id": sq.uniID,
+ "uni-port": sq.uniPort})
+ return err
+}
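
Taken together, these hunks split the old RemoveSchedulerQueues into
independent queue and scheduler removal. A condensed sketch of how the
TP-removal path above sequences them (same identifiers as the hunks;
meter-existence checks and some error handling elided):

    sq := schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID,
        onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID,
        uniPort: portNum, tpInst: techprofileInst}
    // Queues for this UNI are always removed, in both directions.
    if err := f.RemoveQueues(ctx, sq); err != nil {
        logger.Warn(ctx, err)
    }
    sq.direction = tp_pb.Direction_DOWNSTREAM
    if err := f.RemoveQueues(ctx, sq); err != nil {
        logger.Warn(ctx, err)
    }
    // The upstream scheduler goes only when no other UNI shares the alloc-id;
    // otherwise only this UNI's meter reference is dropped.
    sq.direction = tp_pb.Direction_UPSTREAM
    if !f.isAllocUsedByAnotherUNI(ctx, sq) {
        if err := f.RemoveScheduler(ctx, sq); err != nil {
            logger.Warn(ctx, err)
        }
    } else if err := f.removeMeterReference(ctx, "upstream", sq); err != nil {
        return err
    }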
diff --git a/internal/pkg/core/openolt_flowmgr_test.go b/internal/pkg/core/openolt_flowmgr_test.go
index af7b147..5f64a71 100644
--- a/internal/pkg/core/openolt_flowmgr_test.go
+++ b/internal/pkg/core/openolt_flowmgr_test.go
@@ -98,17 +98,16 @@
{"CreateSchedulerQueues-19", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 1, createFlowMetadata(tprofile, 5, Upstream)}, false},
{"CreateSchedulerQueues-20", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, createFlowMetadata(tprofile2, 5, Downstream)}, false},
- {"CreateSchedulerQueues-1", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 1, createFlowMetadata(tprofile, 0, Upstream)}, false},
- {"CreateSchedulerQueues-2", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, createFlowMetadata(tprofile2, 0, Downstream)}, false},
+ // Negative test cases
+ {"CreateSchedulerQueues-1", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 1, createFlowMetadata(tprofile, 0, Upstream)}, true},
+ {"CreateSchedulerQueues-2", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, createFlowMetadata(tprofile2, 0, Downstream)}, true},
{"CreateSchedulerQueues-3", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 2, createFlowMetadata(tprofile, 2, Upstream)}, true},
{"CreateSchedulerQueues-4", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 2, createFlowMetadata(tprofile2, 2, Downstream)}, true},
{"CreateSchedulerQueues-5", schedQueue{tp_pb.Direction_UPSTREAM, 1, 2, 2, 64, 2, tprofile, 2, createFlowMetadata(tprofile, 3, Upstream)}, true},
{"CreateSchedulerQueues-6", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 2, 2, 65, 2, tprofile2, 2, createFlowMetadata(tprofile2, 3, Downstream)}, true},
-
- //Negative testcases
- {"CreateSchedulerQueues-7", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 1, &ofp.FlowMetadata{}}, false},
+ {"CreateSchedulerQueues-7", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 1, &ofp.FlowMetadata{}}, true},
{"CreateSchedulerQueues-8", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 0, &ofp.FlowMetadata{}}, true},
- {"CreateSchedulerQueues-9", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, &ofp.FlowMetadata{}}, false},
+ {"CreateSchedulerQueues-9", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, &ofp.FlowMetadata{}}, true},
{"CreateSchedulerQueues-10", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 2, &ofp.FlowMetadata{}}, true},
{"CreateSchedulerQueues-11", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 2, &ofp.FlowMetadata{}}, true},
{"CreateSchedulerQueues-12", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 2, nil}, true},
@@ -168,7 +167,7 @@
Meters: []*ofp.OfpMeterConfig{ofpMeterConfig}}
}
-func TestOpenOltFlowMgr_RemoveSchedulerQueues(t *testing.T) {
+func TestOpenOltFlowMgr_RemoveScheduler(t *testing.T) {
tprofile := &tp_pb.TechProfileInstance{Name: "tp1", SubscriberIdentifier: "subscriber1",
ProfileType: "pt1", NumGemPorts: 1, Version: 1,
InstanceControl: &tp_pb.InstanceControl{Onu: "1", Uni: "1", MaxGemPayloadSize: "1"},
@@ -189,19 +188,82 @@
schedQueue schedQueue
wantErr bool
}{
- // TODO: Add test cases.
- {"RemoveSchedulerQueues", schedQueue{tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, tprofile, 0, nil}, false},
- {"RemoveSchedulerQueues", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 0, nil}, false},
- // negative test cases
- {"RemoveSchedulerQueues", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 0, nil}, false},
- {"RemoveSchedulerQueues", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 0, nil}, false},
+ {"RemoveScheduler-1", schedQueue{tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, tprofile, 0, nil}, false},
+ {"RemoveScheduler-2", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 0, nil}, false},
+ {"RemoveScheduler-3", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 0, nil}, false},
+ {"RemoveScheduler-4", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 0, nil}, false},
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if err := flowMgr[tt.schedQueue.intfID].RemoveSchedulerQueues(ctx, tt.schedQueue); (err != nil) != tt.wantErr {
- t.Errorf("OpenOltFlowMgr.RemoveSchedulerQueues() error = %v, wantErr %v", err, tt.wantErr)
+ if err := flowMgr[tt.schedQueue.intfID].RemoveScheduler(ctx, tt.schedQueue); (err != nil) != tt.wantErr {
+ t.Errorf("OpenOltFlowMgr.RemoveScheduler() error = %v, wantErr %v", err, tt.wantErr)
+ }
+ })
+ }
+
+}
+
+func TestOpenOltFlowMgr_RemoveQueues(t *testing.T) {
+ tprofile := &tp_pb.TechProfileInstance{Name: "tp1", SubscriberIdentifier: "subscriber1",
+ ProfileType: "pt1", NumGemPorts: 1, Version: 1,
+ InstanceControl: &tp_pb.InstanceControl{Onu: "single-instance", Uni: "single-instance", MaxGemPayloadSize: "1"},
+ }
+ tprofile.UsScheduler = &tp_pb.SchedulerAttributes{}
+ tprofile.UsScheduler.Direction = tp_pb.Direction_UPSTREAM
+ tprofile.UsScheduler.AdditionalBw = tp_pb.AdditionalBW_AdditionalBW_None
+ tprofile.UsScheduler.QSchedPolicy = tp_pb.SchedulingPolicy_WRR
+ tprofile.DsScheduler = &tp_pb.SchedulerAttributes{}
+ tprofile.DsScheduler.Direction = tp_pb.Direction_DOWNSTREAM
+ tprofile.DsScheduler.AdditionalBw = tp_pb.AdditionalBW_AdditionalBW_BestEffort
+ tprofile.DsScheduler.QSchedPolicy = tp_pb.SchedulingPolicy_StrictPriority
+ tprofile.UpstreamGemPortAttributeList = make([]*tp_pb.GemPortAttributes, 0)
+ tprofile.UpstreamGemPortAttributeList = append(tprofile.UpstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 1, PbitMap: "0b00000011"})
+ tprofile.UpstreamGemPortAttributeList = append(tprofile.UpstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 2, PbitMap: "0b00001100"})
+ tprofile.UpstreamGemPortAttributeList = append(tprofile.UpstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 3, PbitMap: "0b00110000"})
+ tprofile.UpstreamGemPortAttributeList = append(tprofile.UpstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 4, PbitMap: "0b11000000"})
+
+ tprofile.DownstreamGemPortAttributeList = make([]*tp_pb.GemPortAttributes, 0)
+ tprofile.DownstreamGemPortAttributeList = append(tprofile.DownstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 1, PbitMap: "0b00000011"})
+ tprofile.DownstreamGemPortAttributeList = append(tprofile.DownstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 2, PbitMap: "0b00001100"})
+ tprofile.DownstreamGemPortAttributeList = append(tprofile.DownstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 3, PbitMap: "0b00110000"})
+ tprofile.DownstreamGemPortAttributeList = append(tprofile.DownstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 4, PbitMap: "0b11000000"})
+
+ tprofile2 := &tp_pb.TechProfileInstance{Name: "tp2", SubscriberIdentifier: "subscriber2",
+ ProfileType: "pt1", NumGemPorts: 1, Version: 1,
+ InstanceControl: &tp_pb.InstanceControl{Onu: "multi-instance", Uni: "single-instance", MaxGemPayloadSize: "1"},
+ }
+ tprofile2.UsScheduler = &tp_pb.SchedulerAttributes{}
+ tprofile2.UsScheduler.Direction = tp_pb.Direction_UPSTREAM
+ tprofile2.UsScheduler.AdditionalBw = tp_pb.AdditionalBW_AdditionalBW_None
+ tprofile2.UsScheduler.QSchedPolicy = tp_pb.SchedulingPolicy_WRR
+ tprofile2.DsScheduler = &tp_pb.SchedulerAttributes{}
+ tprofile2.DsScheduler.Direction = tp_pb.Direction_DOWNSTREAM
+ tprofile2.DsScheduler.AdditionalBw = tp_pb.AdditionalBW_AdditionalBW_BestEffort
+ tprofile2.DsScheduler.QSchedPolicy = tp_pb.SchedulingPolicy_StrictPriority
+ tprofile2.UpstreamGemPortAttributeList = make([]*tp_pb.GemPortAttributes, 0)
+ tprofile2.UpstreamGemPortAttributeList = append(tprofile2.UpstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 1, PbitMap: "0b11111111"})
+ tprofile2.DownstreamGemPortAttributeList = make([]*tp_pb.GemPortAttributes, 0)
+ tprofile2.DownstreamGemPortAttributeList = append(tprofile2.DownstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 1, PbitMap: "0b11111111"})
+
+ //defTprofile := &tp.DefaultTechProfile{}
+ tests := []struct {
+ name string
+ schedQueue schedQueue
+ wantErr bool
+ }{
+ {"RemoveQueues-1", schedQueue{tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, tprofile, 0, nil}, false},
+ {"RemoveQueues-2", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 0, nil}, false},
+ {"RemoveQueues-3", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 0, nil}, false},
+ {"RemoveQueues-4", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 0, nil}, false},
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if err := flowMgr[tt.schedQueue.intfID].RemoveQueues(ctx, tt.schedQueue); (err != nil) != tt.wantErr {
+ t.Errorf("OpenOltFlowMgr.RemoveQueues() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index e9809f2..476bbf5 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -508,18 +508,22 @@
// for the given OLT device.
// The caller should ensure that this is a blocking call and this operation is serialized for
// the ONU so as not cause resource corruption since there are no mutexes used here.
+// Setting freeFromResourcePool to false does not clear the alloc-id from the
+// resource pool; it only clears the alloc-id data for the given pon/onu/uni.
func (rsrcMgr *OpenOltResourceMgr) FreeAllocID(ctx context.Context, intfID uint32, onuID uint32,
- uniID uint32, allocID uint32) {
+ uniID uint32, allocID uint32, freeFromResourcePool bool) {
rsrcMgr.RemoveAllocIDForOnu(ctx, intfID, onuID, uniID, allocID)
- allocIDs := make([]uint32, 0)
- allocIDs = append(allocIDs, allocID)
- if err := rsrcMgr.TechprofileRef.FreeResourceID(ctx, intfID, ponrmgr.ALLOC_ID, allocIDs); err != nil {
- logger.Errorw(ctx, "error-while-freeing-alloc-id", log.Fields{
- "intf-id": intfID,
- "onu-id": onuID,
- "err": err.Error(),
- })
+ if freeFromResourcePool {
+ allocIDs := make([]uint32, 0)
+ allocIDs = append(allocIDs, allocID)
+ if err := rsrcMgr.TechprofileRef.FreeResourceID(ctx, intfID, ponrmgr.ALLOC_ID, allocIDs); err != nil {
+ logger.Errorw(ctx, "error-while-freeing-alloc-id", log.Fields{
+ "intf-id": intfID,
+ "onu-id": onuID,
+ "err": err.Error(),
+ })
+ }
}
}
@@ -1301,6 +1305,17 @@
return nil
}
+//DeleteMcastQueueForIntf deletes multicast queue info for the current pon interface from kvstore
+func (rsrcMgr *OpenOltResourceMgr) DeleteMcastQueueForIntf(ctx context.Context) {
+ path := McastQueuesForIntf
+
+ if err := rsrcMgr.KVStore.Delete(ctx, path); err != nil {
+ logger.Errorw(ctx, "Failed to delete multicast queue info from kvstore", log.Fields{"err": err, "interfaceId": rsrcMgr.PonIntfID})
+ return
+ }
+ logger.Debugw(ctx, "deleted multicast queue info from KV store successfully", log.Fields{"interfaceId": rsrcMgr.PonIntfID})
+}
+
//AddFlowGroupToKVStore adds flow group into KV store
func (rsrcMgr *OpenOltResourceMgr) AddFlowGroupToKVStore(ctx context.Context, groupEntry *ofp.OfpGroupEntry, cached bool) error {
var Value []byte
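
The FreeAllocID change above makes alloc-id release two-phase. A short usage
sketch of the new boolean (caller values illustrative, not from this patch):

    // Another UNI on the ONU still uses the alloc-id: drop only this
    // pon/onu/uni's bookkeeping and keep the pool allocation.
    rsrcMgr.FreeAllocID(ctx, intfID, onuID, uniID, allocID, false)

    // Last reference gone: also return the alloc-id to the PON resource pool.
    rsrcMgr.FreeAllocID(ctx, intfID, onuID, uniID, allocID, true)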
diff --git a/internal/pkg/resourcemanager/resourcemanager_test.go b/internal/pkg/resourcemanager/resourcemanager_test.go
index c4a6751..5392eb7 100644
--- a/internal/pkg/resourcemanager/resourcemanager_test.go
+++ b/internal/pkg/resourcemanager/resourcemanager_test.go
@@ -864,6 +864,23 @@
}
}
+func TestOpenOltResourceMgr_DeleteMcastQueueForIntf(t *testing.T) {
+ tests := []struct {
+ name string
+ fields *fields
+ }{
+ {"DeleteMcastQueueForIntf-1", getResMgr()},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ RsrcMgr := testResMgrObject(tt.fields)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ RsrcMgr.DeleteMcastQueueForIntf(ctx)
+ })
+ }
+}
+
func newGroup(groupID uint32, outPorts []uint32) *ofp.OfpGroupEntry {
groupDesc := ofp.OfpGroupDesc{
Type: ofp.OfpGroupType_OFPGT_ALL,