[VOL-4478] Error Handling Changes
* clearResources method has been updated:
- After the gem port is removed from the OLT, free the gemPort-id, update the gem-related cache data, and send a gemPortDelete request to the ONU adapter.
- Remove US/DS scheduler/queues if gemports of the related instance are not used by other flows, and the associated alloc-id is not used by other UNI ports.
- Remove US/DS scheduler/queues if the related US/DS meter exists. As a result, meter removal now happens in the CreateSchedulerQueues method. This still covers the ATT use case.
- Free alloc-id after the US scheduler is removed from the OLT.
* DeleteFlowIDsForGem method has been updated:
- First, remove the data from the DB; then update the cache.
Change-Id: I1ba4a73e0ae55b59caf7216d873bbac7fdedd295
Change-Id: I8b60d31ce71e90221afce98ac24e0007c193c0e8
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index f47ccf1..e9809f2 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -378,15 +378,19 @@
func (rsrcMgr *OpenOltResourceMgr) UpdateAllocIdsForOnu(ctx context.Context, ponPort uint32, onuID uint32, uniID uint32, allocIDs []uint32) error {
intfOnuIDuniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
+
+ // Write to the DB first and return on failure, so the cache below is only updated after a successful DB write.
+ // A failed DB write leaves the cached alloc ids for this ONU/UNI untouched.
+ if err := rsrcMgr.PonRsrMgr.UpdateAllocIdsForOnu(ctx, intfOnuIDuniID, allocIDs); err != nil {
+ logger.Errorw(ctx, "Failed to update alloc ids for onu", log.Fields{"err": err})
+ return err
+ }
+
// update cache
rsrcMgr.allocIDsForOnuLock.Lock()
rsrcMgr.allocIDsForOnu[intfOnuIDuniID] = allocIDs
rsrcMgr.allocIDsForOnuLock.Unlock()
-
- // Note: in case the write to DB fails there could be inconsistent data between cache and db.
- // Although this is highly unlikely with DB retries in place, this is something we have to deal with in the next release
- return rsrcMgr.PonRsrMgr.UpdateAllocIdsForOnu(ctx, intfOnuIDuniID,
- allocIDs)
+ return nil
}
// GetCurrentGEMPortIDsForOnu returns gem ports for given pon interface , onu id and uni id
@@ -470,16 +474,17 @@
func (rsrcMgr *OpenOltResourceMgr) UpdateGEMPortIDsForOnu(ctx context.Context, ponPort uint32, onuID uint32,
uniID uint32, gemIDs []uint32) error {
intfOnuIDuniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
+
+ if err := rsrcMgr.PonRsrMgr.UpdateGEMPortIDsForOnu(ctx, intfOnuIDuniID, gemIDs); err != nil {
+ logger.Errorw(ctx, "Failed to update gem port ids for onu", log.Fields{"err": err})
+ return err
+ }
+
// update cache
rsrcMgr.gemPortIDsForOnuLock.Lock()
rsrcMgr.gemPortIDsForOnu[intfOnuIDuniID] = gemIDs
rsrcMgr.gemPortIDsForOnuLock.Unlock()
-
- // Note: in case the write to DB fails there could be inconsistent data between cache and db.
- // Although this is highly unlikely with DB retries in place, this is something we have to deal with in the next release
- return rsrcMgr.PonRsrMgr.UpdateGEMPortIDsForOnu(ctx, intfOnuIDuniID,
- gemIDs)
-
+ return nil
}
// FreeonuID releases(make free) onu id for a particular pon-port
@@ -544,10 +549,6 @@
AllocIDs := rsrcMgr.PonRsrMgr.GetCurrentAllocIDForOnu(ctx, intfOnuIDuniID)
- rsrcMgr.allocIDsForOnuLock.Lock()
- delete(rsrcMgr.allocIDsForOnu, intfOnuIDuniID)
- rsrcMgr.allocIDsForOnuLock.Unlock()
-
if err := rsrcMgr.TechprofileRef.FreeResourceID(ctx, intfID,
ponrmgr.ALLOC_ID,
AllocIDs); err != nil {
@@ -558,11 +559,12 @@
})
}
- GEMPortIDs := rsrcMgr.PonRsrMgr.GetCurrentGEMPortIDsForOnu(ctx, intfOnuIDuniID)
+ //update cache
+ rsrcMgr.allocIDsForOnuLock.Lock()
+ delete(rsrcMgr.allocIDsForOnu, intfOnuIDuniID)
+ rsrcMgr.allocIDsForOnuLock.Unlock()
- rsrcMgr.gemPortIDsForOnuLock.Lock()
- delete(rsrcMgr.gemPortIDsForOnu, intfOnuIDuniID)
- rsrcMgr.gemPortIDsForOnuLock.Unlock()
+ GEMPortIDs := rsrcMgr.PonRsrMgr.GetCurrentGEMPortIDsForOnu(ctx, intfOnuIDuniID)
if err := rsrcMgr.TechprofileRef.FreeResourceID(ctx, intfID,
ponrmgr.GEMPORT_ID,
@@ -574,6 +576,11 @@
})
}
+ // update cache
+ rsrcMgr.gemPortIDsForOnuLock.Lock()
+ delete(rsrcMgr.gemPortIDsForOnu, intfOnuIDuniID)
+ rsrcMgr.gemPortIDsForOnuLock.Unlock()
+
// Clear resource map associated with (pon_intf_id, gemport_id) tuple.
rsrcMgr.PonRsrMgr.RemoveResourceMap(ctx, intfOnuIDuniID)
}
@@ -641,15 +648,16 @@
// This path is formed as the following: {intfID, onuID, uniID}/tp_id
func (rsrcMgr *OpenOltResourceMgr) RemoveTechProfileIDsForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) error {
intfOnuUniID := fmt.Sprintf(tpIDPathSuffix, intfID, onuID, uniID)
- // update cache
- rsrcMgr.techProfileIDsForOnuLock.Lock()
- delete(rsrcMgr.techProfileIDsForOnu, intfOnuUniID)
- rsrcMgr.techProfileIDsForOnuLock.Unlock()
if err := rsrcMgr.KVStore.Delete(ctx, intfOnuUniID); err != nil {
logger.Errorw(ctx, "Failed to delete techprofile id resource in KV store", log.Fields{"path": intfOnuUniID})
return err
}
+
+ // update cache
+ rsrcMgr.techProfileIDsForOnuLock.Lock()
+ delete(rsrcMgr.techProfileIDsForOnu, intfOnuUniID)
+ rsrcMgr.techProfileIDsForOnuLock.Unlock()
return nil
}
@@ -663,10 +671,6 @@
}
}
intfOnuUniID := fmt.Sprintf(tpIDPathSuffix, intfID, onuID, uniID)
- // update cache
- rsrcMgr.techProfileIDsForOnuLock.Lock()
- rsrcMgr.techProfileIDsForOnu[intfOnuUniID] = tpIDList
- rsrcMgr.techProfileIDsForOnuLock.Unlock()
Value, err := json.Marshal(tpIDList)
if err != nil {
@@ -677,6 +681,11 @@
logger.Errorf(ctx, "Failed to update resource %s", intfOnuUniID)
return err
}
+
+ // update cache
+ rsrcMgr.techProfileIDsForOnuLock.Lock()
+ rsrcMgr.techProfileIDsForOnu[intfOnuUniID] = tpIDList
+ rsrcMgr.techProfileIDsForOnuLock.Unlock()
return err
}
@@ -699,11 +708,6 @@
logger.Debugf(ctx, "updating tp id %d on path %s", tpID, intfOnuUniID)
tpIDList = append(tpIDList, tpID)
- // update cache
- rsrcMgr.techProfileIDsForOnuLock.Lock()
- rsrcMgr.techProfileIDsForOnu[intfOnuUniID] = tpIDList
- rsrcMgr.techProfileIDsForOnuLock.Unlock()
-
Value, err = json.Marshal(tpIDList)
if err != nil {
logger.Error(ctx, "failed to Marshal")
@@ -713,6 +717,11 @@
logger.Errorf(ctx, "Failed to update resource %s", intfOnuUniID)
return err
}
+
+ // update cache
+ rsrcMgr.techProfileIDsForOnuLock.Lock()
+ rsrcMgr.techProfileIDsForOnu[intfOnuUniID] = tpIDList
+ rsrcMgr.techProfileIDsForOnuLock.Unlock()
return err
}
@@ -724,11 +733,6 @@
var err error
intfOnuUniID := fmt.Sprintf(MeterIDPathSuffix, intfID, onuID, uniID, tpID, Direction)
- // update cache
- rsrcMgr.meterInfoForOnuLock.Lock()
- rsrcMgr.meterInfoForOnu[intfOnuUniID] = meterInfo
- rsrcMgr.meterInfoForOnuLock.Unlock()
-
Value, err = json.Marshal(*meterInfo)
if err != nil {
logger.Error(ctx, "failed to Marshal meter config")
@@ -738,6 +742,11 @@
logger.Errorf(ctx, "Failed to store meter into KV store %s", intfOnuUniID)
return err
}
+
+ // update cache
+ rsrcMgr.meterInfoForOnuLock.Lock()
+ rsrcMgr.meterInfoForOnu[intfOnuUniID] = meterInfo
+ rsrcMgr.meterInfoForOnuLock.Unlock()
logger.Debugw(ctx, "meter info updated successfully", log.Fields{"path": intfOnuUniID, "meter-info": meterInfo})
return err
}
@@ -809,13 +818,6 @@
meterInfo.RefCnt++
} else {
meterInfo.RefCnt--
- // If RefCnt become 0 clear the meter information from the DB.
- if meterInfo.RefCnt == 0 {
- if err := rsrcMgr.RemoveMeterInfoForOnu(ctx, Direction, intfID, onuID, uniID, tpID); err != nil {
- return err
- }
- return nil
- }
}
if err := rsrcMgr.StoreMeterInfoForOnu(ctx, Direction, intfID, onuID, uniID, tpID, meterInfo); err != nil {
return err
@@ -829,15 +831,15 @@
uniID uint32, tpID uint32) error {
Path := fmt.Sprintf(MeterIDPathSuffix, intfID, onuID, uniID, tpID, Direction)
- // update cache
- rsrcMgr.meterInfoForOnuLock.Lock()
- delete(rsrcMgr.meterInfoForOnu, Path)
- rsrcMgr.meterInfoForOnuLock.Unlock()
-
if err := rsrcMgr.KVStore.Delete(ctx, Path); err != nil {
logger.Errorf(ctx, "Failed to delete meter id %s from kvstore ", Path)
return err
}
+
+ // update cache
+ rsrcMgr.meterInfoForOnuLock.Lock()
+ delete(rsrcMgr.meterInfoForOnu, Path)
+ rsrcMgr.meterInfoForOnuLock.Unlock()
return nil
}
@@ -949,10 +951,6 @@
var err error
Path := fmt.Sprintf(OnuGemInfoPath, intfID, onuID)
- rsrcMgr.onuGemInfoLock.Lock()
- rsrcMgr.onuGemInfo[Path] = &onuGem
- rsrcMgr.onuGemInfoLock.Unlock()
-
Value, err = json.Marshal(onuGem)
if err != nil {
logger.Error(ctx, "failed to Marshal")
@@ -964,21 +962,28 @@
return err
}
logger.Debugw(ctx, "added onu gem info to store", log.Fields{"onuGemInfo": onuGem})
+
+ //update cache
+ rsrcMgr.onuGemInfoLock.Lock()
+ rsrcMgr.onuGemInfo[Path] = &onuGem
+ rsrcMgr.onuGemInfoLock.Unlock()
return err
}
// DelOnuGemInfo deletes the onugem info from kvstore per ONU
func (rsrcMgr *OpenOltResourceMgr) DelOnuGemInfo(ctx context.Context, intfID uint32, onuID uint32) error {
path := fmt.Sprintf(OnuGemInfoPath, intfID, onuID)
- rsrcMgr.onuGemInfoLock.Lock()
- logger.Debugw(ctx, "removing onu gem info", log.Fields{"onuGemInfo": rsrcMgr.onuGemInfo[path]})
- delete(rsrcMgr.onuGemInfo, path)
- rsrcMgr.onuGemInfoLock.Unlock()
if err := rsrcMgr.KVStore.Delete(ctx, path); err != nil {
logger.Errorf(ctx, "failed to remove resource %s", path)
return err
}
+
+ //update cache
+ rsrcMgr.onuGemInfoLock.Lock()
+ logger.Debugw(ctx, "removing onu gem info", log.Fields{"onuGemInfo": rsrcMgr.onuGemInfo[path]})
+ delete(rsrcMgr.onuGemInfo, path)
+ rsrcMgr.onuGemInfoLock.Unlock()
return nil
}
@@ -1032,10 +1037,6 @@
func (rsrcMgr *OpenOltResourceMgr) UpdateGemPortForPktIn(ctx context.Context, pktIn PacketInInfoKey, gemPort uint32) {
path := fmt.Sprintf(OnuPacketInPath, pktIn.IntfID, pktIn.OnuID, pktIn.LogicalPort, pktIn.VlanID, pktIn.Priority)
- // update cache
- rsrcMgr.gemPortForPacketInInfoLock.Lock()
- rsrcMgr.gemPortForPacketInInfo[path] = gemPort
- rsrcMgr.gemPortForPacketInInfoLock.Unlock()
Value, err := json.Marshal(gemPort)
if err != nil {
@@ -1046,6 +1047,11 @@
logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"path": path, "value": gemPort})
return
}
+
+ // update cache
+ rsrcMgr.gemPortForPacketInInfoLock.Lock()
+ rsrcMgr.gemPortForPacketInInfo[path] = gemPort
+ rsrcMgr.gemPortForPacketInInfoLock.Unlock()
logger.Debugw(ctx, "added gem packet in successfully", log.Fields{"path": path, "gem": gemPort})
}
@@ -1100,6 +1106,12 @@
return errors.New("failed-to-read-value-from-path-" + path)
}
+ logger.Debugw(ctx, "delete-packetin-gem-port", log.Fields{"realPath": path})
+ if err := rsrcMgr.KVStore.DeleteWithPrefix(ctx, path); err != nil {
+ logger.Errorf(ctx, "failed-to-remove-resource-%s", path)
+ return err
+ }
+
//remove them one by one
for key := range value {
// Remove the PathPrefix from the path on KV key.
@@ -1115,12 +1127,6 @@
logger.Debugw(ctx, "removed-key-from-packetin-gem-port-cache", log.Fields{"key": key, "cache-len": len(rsrcMgr.gemPortForPacketInInfo)})
}
- logger.Debugw(ctx, "delete-packetin-gem-port", log.Fields{"realPath": path})
- if err := rsrcMgr.KVStore.DeleteWithPrefix(ctx, path); err != nil {
- logger.Errorf(ctx, "failed-to-remove-resource-%s", path)
- return err
- }
-
return nil
}
@@ -1168,11 +1174,6 @@
var val []byte
path := fmt.Sprintf(FlowIDsForGem, intf, gem)
- // update cache
- rsrcMgr.flowIDsForGemLock.Lock()
- rsrcMgr.flowIDsForGem[gem] = flowIDs
- rsrcMgr.flowIDsForGemLock.Unlock()
-
if flowIDs == nil {
if err := rsrcMgr.KVStore.Delete(ctx, path); err != nil {
logger.Errorw(ctx, "Failed to delete from kvstore", log.Fields{"err": err, "path": path})
@@ -1190,19 +1191,26 @@
return err
}
logger.Debugw(ctx, "added flowid list for gem to kv successfully", log.Fields{"path": path, "flowidlist": flowIDs})
+
+ // update cache
+ rsrcMgr.flowIDsForGemLock.Lock()
+ rsrcMgr.flowIDsForGem[gem] = flowIDs
+ rsrcMgr.flowIDsForGemLock.Unlock()
return nil
}
//DeleteFlowIDsForGem deletes the flowID list entry per gem from kvstore.
-func (rsrcMgr *OpenOltResourceMgr) DeleteFlowIDsForGem(ctx context.Context, intf uint32, gem uint32) {
+func (rsrcMgr *OpenOltResourceMgr) DeleteFlowIDsForGem(ctx context.Context, intf uint32, gem uint32) error {
path := fmt.Sprintf(FlowIDsForGem, intf, gem)
+ if err := rsrcMgr.KVStore.Delete(ctx, path); err != nil {
+ logger.Errorw(ctx, "Failed to delete from kvstore", log.Fields{"err": err, "path": path})
+ return err
+ }
// update cache
rsrcMgr.flowIDsForGemLock.Lock()
delete(rsrcMgr.flowIDsForGem, gem)
rsrcMgr.flowIDsForGemLock.Unlock()
- if err := rsrcMgr.KVStore.Delete(ctx, path); err != nil {
- logger.Errorw(ctx, "Failed to delete from kvstore", log.Fields{"err": err, "path": path})
- }
+ return nil
}
//DeleteAllFlowIDsForGemForIntf deletes all the flow ids associated for all the gems on the given pon interface
@@ -1317,10 +1325,6 @@
OutPorts: outPorts,
}
- rsrcMgr.groupInfoLock.Lock()
- rsrcMgr.groupInfo[path] = &groupInfo
- rsrcMgr.groupInfoLock.Unlock()
-
Value, err = json.Marshal(groupInfo)
if err != nil {
@@ -1332,6 +1336,11 @@
logger.Errorf(ctx, "Failed to update resource %s", path)
return err
}
+
+ // update cache
+ rsrcMgr.groupInfoLock.Lock()
+ rsrcMgr.groupInfo[path] = &groupInfo
+ rsrcMgr.groupInfoLock.Unlock()
return nil
}
@@ -1343,14 +1352,16 @@
} else {
path = fmt.Sprintf(FlowGroup, groupID)
}
- rsrcMgr.groupInfoLock.Lock()
- delete(rsrcMgr.groupInfo, path)
- rsrcMgr.groupInfoLock.Unlock()
if err := rsrcMgr.KVStore.Delete(ctx, path); err != nil {
logger.Errorf(ctx, "Failed to remove resource %s due to %s", path, err)
return err
}
+
+ // update cache
+ rsrcMgr.groupInfoLock.Lock()
+ delete(rsrcMgr.groupInfo, path)
+ rsrcMgr.groupInfoLock.Unlock()
return nil
}