[VOL-4478] Error Handling Changes
* clearResources method has been updated:
- After the gem port is removed from the OLT, free gemPort-id, update gem-related cache data, send gemPortDelete request to the ONU adapter.
- Remove US/DS scheduler/queues if gemports of the related instance are not used by other flows, and the associated alloc-id is not used by other UNI ports.
- Remove US/DS scheduler/queues if the related US/DS meter exists. So, meter removal is now in the CreateSchedulerQueues method. It still covers ATT use case.
- Free alloc-id after the US scheduler is removed from the OLT.
* DeleteFlowIDsForGem method has been updated:
- Firstly, remove the data from the DB. Then, update the cache.
Change-Id: I1ba4a73e0ae55b59caf7216d873bbac7fdedd295
Change-Id: I8b60d31ce71e90221afce98ac24e0007c193c0e8
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 3f09ba4..78a7480 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -422,18 +422,22 @@
}
if meterInfo != nil {
- logger.Debugw(ctx, "scheduler-already-created-for-upstream", log.Fields{"device-id": f.deviceHandler.device.Id, "meter-id": sq.meterID})
- if meterInfo.MeterID == sq.meterID {
- if err := f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, true); err != nil {
+ // If RefCnt become 0 clear the meter information from the DB.
+ if meterInfo.MeterID != sq.meterID && meterInfo.RefCnt == 0 {
+ if err := f.resourceMgr.RemoveMeterInfoForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID); err != nil {
return err
}
- return nil
+ } else {
+ logger.Debugw(ctx, "scheduler-already-created-for-upstream", log.Fields{"device-id": f.deviceHandler.device.Id, "meter-id": sq.meterID})
+ if meterInfo.MeterID == sq.meterID {
+ return f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, true)
+ }
+ return olterrors.NewErrInvalidValue(log.Fields{
+ "unsupported": "meter-id",
+ "kv-store-meter-id": meterInfo.MeterID,
+ "meter-id-in-flow": sq.meterID,
+ "device-id": f.deviceHandler.device.Id}, nil)
}
- return olterrors.NewErrInvalidValue(log.Fields{
- "unsupported": "meter-id",
- "kv-store-meter-id": meterInfo.MeterID,
- "meter-id-in-flow": sq.meterID,
- "device-id": f.deviceHandler.device.Id}, nil)
}
logger.Debugw(ctx, "meter-does-not-exist-creating-new",
@@ -654,7 +658,6 @@
if sq.direction == tp_pb.Direction_UPSTREAM {
allocID := sq.tpInst.(*tp_pb.TechProfileInstance).UsScheduler.AllocId
- f.resourceMgr.FreeAllocID(ctx, sq.intfID, sq.onuID, sq.uniID, allocID)
// Delete the TCONT on the ONU.
uni := getUniPortPath(f.deviceHandler.device.Id, sq.intfID, int32(sq.onuID), int32(sq.uniID))
tpPath := f.getTPpath(ctx, sq.intfID, uni, sq.tpID)
@@ -1901,7 +1904,9 @@
//clearResources clears pon resources in kv store and the device
// nolint: gocyclo
func (f *OpenOltFlowMgr) clearResources(ctx context.Context, intfID uint32, onuID int32, uniID int32,
- gemPortID int32, flowID uint64, portNum uint32, tpID uint32) error {
+ flowID uint64, portNum uint32, tpID uint32) error {
+
+ logger.Debugw(ctx, "clearing-resources", log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID, "tpID": tpID})
uni := getUniPortPath(f.deviceHandler.device.Id, intfID, onuID, uniID)
tpPath := f.getTPpath(ctx, intfID, uni, tpID)
@@ -1910,52 +1915,6 @@
"tpPath": tpPath,
"device-id": f.deviceHandler.device.Id})
- used := f.isGemPortUsedByAnotherFlow(uint32(gemPortID))
-
- if used {
- f.gemToFlowIDsKey.RLock()
- flowIDs := f.gemToFlowIDs[uint32(gemPortID)]
- f.gemToFlowIDsKey.RUnlock()
-
- for i, flowIDinMap := range flowIDs {
- if flowIDinMap == flowID {
- flowIDs = append(flowIDs[:i], flowIDs[i+1:]...)
- f.gemToFlowIDsKey.Lock()
- f.gemToFlowIDs[uint32(gemPortID)] = flowIDs
- f.gemToFlowIDsKey.Unlock()
- // everytime gemToFlowIDs cache is updated the same should be updated
- // in kv store by calling UpdateFlowIDsForGem
- if err := f.resourceMgr.UpdateFlowIDsForGem(ctx, intfID, uint32(gemPortID), flowIDs); err != nil {
- return err
- }
- break
- }
- }
- logger.Debugw(ctx, "gem-port-id-is-still-used-by-other-flows",
- log.Fields{
- "gemport-id": gemPortID,
- "usedByFlows": flowIDs,
- "device-id": f.deviceHandler.device.Id})
-
- return nil
- }
- logger.Debugf(ctx, "gem-port-id %d is-not-used-by-another-flow--releasing-the-gem-port", gemPortID)
- f.deleteGemPortFromLocalCache(ctx, intfID, uint32(onuID), uint32(gemPortID))
- _ = f.resourceMgr.RemoveGemFromOnuGemInfo(ctx, intfID, uint32(onuID), uint32(gemPortID)) // ignore error and proceed.
- //everytime an entry is deleted from gemToFlowIDs cache, the same should be updated in kv as well
- // by calling DeleteFlowIDsForGem
- f.gemToFlowIDsKey.Lock()
- delete(f.gemToFlowIDs, uint32(gemPortID))
- f.gemToFlowIDsKey.Unlock()
-
- f.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, uint32(gemPortID))
-
- f.resourceMgr.FreeGemPortID(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID))
-
- //First remove TCONT from child if needed. Then remove the GEM.
- //It is expected from child to clean ani side conf if all GEMs of TP are deleted.
- //Before this, ensure that the related TCONT deletions are informed to child.
- //Refer to VOL-4215.
techprofileInst, err := f.techprofile.GetTPInstance(ctx, tpPath)
if err != nil || techprofileInst == nil {
// The child device is possibly deleted which in turn had cleaned up all the resources (including tp instances), check..
@@ -1971,21 +1930,108 @@
"tp-id": tpID,
"path": tpPath}, err)
}
+
+ var allGemPortsFree = true
switch techprofileInst := techprofileInst.(type) {
case *tp_pb.TechProfileInstance:
- ok, _ := f.isTechProfileUsedByAnotherGem(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst, uint32(gemPortID))
- if !ok {
+ for _, gemPort := range techprofileInst.UpstreamGemPortAttributeList {
+ gemPortID := gemPort.GemportId
+ used := f.isGemPortUsedByAnotherFlow(gemPortID, flowID)
+ if used {
+ f.gemToFlowIDsKey.RLock()
+ flowIDs := f.gemToFlowIDs[gemPortID]
+ f.gemToFlowIDsKey.RUnlock()
+
+ for i, flowIDinMap := range flowIDs {
+ if flowIDinMap == flowID {
+ flowIDs = append(flowIDs[:i], flowIDs[i+1:]...)
+ f.gemToFlowIDsKey.Lock()
+ f.gemToFlowIDs[gemPortID] = flowIDs
+ f.gemToFlowIDsKey.Unlock()
+ // everytime gemToFlowIDs cache is updated the same should be updated
+ // in kv store by calling UpdateFlowIDsForGem
+ if err := f.resourceMgr.UpdateFlowIDsForGem(ctx, intfID, gemPortID, flowIDs); err != nil {
+ return err
+ }
+ break
+ }
+ }
+ logger.Debugw(ctx, "gem-port-id-is-still-used-by-other-flows",
+ log.Fields{
+ "gemport-id": gemPortID,
+ "usedByFlows": flowIDs,
+ "currentFlow": flowID,
+ "device-id": f.deviceHandler.device.Id})
+ allGemPortsFree = false
+ }
+ }
+ if !allGemPortsFree {
+ return nil
+ }
+ }
+
+ logger.Debugw(ctx, "all-gem-ports-are-free-to-be-deleted", log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID, "tpID": tpID})
+ switch techprofileInst := techprofileInst.(type) {
+ case *tp_pb.TechProfileInstance:
+ schedQueue := schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}
+ allocExists := f.isAllocUsedByAnotherUNI(ctx, schedQueue)
+ if !allocExists {
if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), tpID); err != nil {
logger.Warn(ctx, err)
}
if err := f.DeleteTechProfileInstance(ctx, intfID, uint32(onuID), uint32(uniID), "", tpID); err != nil {
logger.Warn(ctx, err)
}
- if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
- logger.Warn(ctx, err)
+
+ if KvStoreMeter, _ := f.resourceMgr.GetMeterInfoForOnu(ctx, "upstream", intfID, uint32(onuID), uint32(uniID), tpID); KvStoreMeter != nil {
+ if err := f.RemoveSchedulerQueues(ctx, schedQueue); err != nil {
+ logger.Warn(ctx, err)
+ }
}
- if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: intfID, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
- logger.Warn(ctx, err)
+ f.resourceMgr.FreeAllocID(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocId)
+
+ schedQueue.direction = tp_pb.Direction_DOWNSTREAM
+ if KvStoreMeter, _ := f.resourceMgr.GetMeterInfoForOnu(ctx, "downstream", intfID, uint32(onuID), uint32(uniID), tpID); KvStoreMeter != nil {
+ if err := f.RemoveSchedulerQueues(ctx, schedQueue); err != nil {
+ logger.Warn(ctx, err)
+ }
+ }
+
+ for _, gemPort := range techprofileInst.UpstreamGemPortAttributeList {
+ gemPortID := gemPort.GemportId
+ f.deleteGemPortFromLocalCache(ctx, intfID, uint32(onuID), gemPortID)
+ _ = f.resourceMgr.RemoveGemFromOnuGemInfo(ctx, intfID, uint32(onuID), gemPortID) // ignore error and proceed.
+
+ if err := f.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, gemPortID); err == nil {
+ //everytime an entry is deleted from gemToFlowIDs cache, the same should be updated in kv as well
+ // by calling DeleteFlowIDsForGem
+ f.gemToFlowIDsKey.Lock()
+ delete(f.gemToFlowIDs, gemPortID)
+ f.gemToFlowIDsKey.Unlock()
+ } else {
+ logger.Errorw(ctx, "error-removing-flow-ids-of-gem-port",
+ log.Fields{
+ "err": err,
+ "intf": intfID,
+ "onu-id": onuID,
+ "uni-id": uniID,
+ "device-id": f.deviceHandler.device.Id,
+ "gemport-id": gemPortID})
+ }
+
+ f.resourceMgr.FreeGemPortID(ctx, intfID, uint32(onuID), uint32(uniID), gemPortID)
+
+ // Delete the gem port on the ONU.
+ if err := f.sendDeleteGemPortToChild(ctx, intfID, uint32(onuID), uint32(uniID), gemPortID, tpPath); err != nil {
+ logger.Errorw(ctx, "error-processing-delete-gem-port-towards-onu",
+ log.Fields{
+ "err": err,
+ "intfID": intfID,
+ "onu-id": onuID,
+ "uni-id": uniID,
+ "device-id": f.deviceHandler.device.Id,
+ "gemport-id": gemPortID})
+ }
}
}
case *tp_pb.EponTechProfileInstance:
@@ -2012,18 +2058,6 @@
log.Fields{
"techprofileInst": techprofileInst})
}
-
- // Delete the gem port on the ONU. Send Gem Removal After TCONT removal.
- if err := f.sendDeleteGemPortToChild(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID), tpPath); err != nil {
- logger.Errorw(ctx, "error-processing-delete-gem-port-towards-onu",
- log.Fields{
- "err": err,
- "intfID": intfID,
- "onu-id": onuID,
- "uni-id": uniID,
- "device-id": f.deviceHandler.device.Id,
- "gemport-id": gemPortID})
- }
return nil
}
@@ -2093,32 +2127,20 @@
return err
}
- f.flowIDToGemsLock.Lock()
- gems, ok := f.flowIDToGems[flow.Id]
- if !ok {
- logger.Errorw(ctx, "flow-id-to-gem-map-not-found", log.Fields{"flowID": flow.Id})
- f.flowIDToGemsLock.Unlock()
- return olterrors.NewErrNotFound("flow-id-to-gem-map-not-found", log.Fields{"flowID": flow.Id}, nil)
- }
- copyOfGems := make([]uint32, len(gems))
- _ = copy(copyOfGems, gems)
// Delete the flow-id to gemport list entry from the map now the flow is deleted.
+ f.flowIDToGemsLock.Lock()
delete(f.flowIDToGems, flow.Id)
f.flowIDToGemsLock.Unlock()
- logger.Debugw(ctx, "gems-to-be-cleared", log.Fields{"gems": copyOfGems})
- for _, gem := range copyOfGems {
- if err = f.clearResources(ctx, Intf, onuID, uniID, int32(gem), flow.Id, portNum, tpID); err != nil {
- logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
- "flow-id": flow.Id,
- "device-id": f.deviceHandler.device.Id,
- "onu-id": onuID,
- "intf": Intf,
- "gem": gem,
- "err": err,
- })
- return err
- }
+ if err = f.clearResources(ctx, Intf, onuID, uniID, flow.Id, portNum, tpID); err != nil {
+ logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
+ "flow-id": flow.Id,
+ "device-id": f.deviceHandler.device.Id,
+ "onu-id": onuID,
+ "intf": Intf,
+ "err": err,
+ })
+ return err
}
// Decrement reference count for the meter associated with the given <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
@@ -2989,26 +3011,18 @@
return nil
}
-func (f *OpenOltFlowMgr) isGemPortUsedByAnotherFlow(gemPortID uint32) bool {
+func (f *OpenOltFlowMgr) isGemPortUsedByAnotherFlow(gemPortID uint32, flowID uint64) bool {
f.gemToFlowIDsKey.RLock()
flowIDList := f.gemToFlowIDs[gemPortID]
f.gemToFlowIDsKey.RUnlock()
- return len(flowIDList) > 1
-
-}
-
-func (f *OpenOltFlowMgr) isTechProfileUsedByAnotherGem(ctx context.Context, ponIntf uint32, onuID uint32, uniID uint32, tpInst *tp_pb.TechProfileInstance, gemPortID uint32) (bool, uint32) {
- currentGemPorts := f.resourceMgr.GetCurrentGEMPortIDsForOnu(ctx, ponIntf, onuID, uniID)
- tpGemPorts := tpInst.UpstreamGemPortAttributeList
- for _, currentGemPort := range currentGemPorts {
- for _, tpGemPort := range tpGemPorts {
- if (currentGemPort == tpGemPort.GemportId) && (currentGemPort != gemPortID) {
- return true, currentGemPort
+ if len(flowIDList) > 0 {
+ for _, id := range flowIDList {
+ if flowID != id {
+ return true
}
}
}
- logger.Debug(ctx, "tech-profile-is-not-in-use-by-any-gem")
- return false, 0
+ return false
}
func (f *OpenOltFlowMgr) isAllocUsedByAnotherUNI(ctx context.Context, sq schedQueue) bool {