[VOL-2895] : Pings fail intermittently after OLT reboot and ONU disable
- There was possible data corruption because the lock was not held
over the entire routine in which a new FlowID was being allocated.
There could be similar corruption in other PON resource allocations
as well, so the locks are now held over the entire routine wherever PON
resources are being managed. This comes at a slightly increased cost in
end-to-end flow handling transaction time when there are many subscribers,
but guarantees the sanity of the data.
Change-Id: I0644aab4ffd6a636ea9eadccea13e2ed1ccb5d7b
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index b874591..540a457 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -439,15 +439,16 @@
// GetONUID returns the available OnuID for the given pon-port
func (RsrcMgr *OpenOltResourceMgr) GetONUID(ctx context.Context, ponIntfID uint32) (uint32, error) {
// Check if Pon Interface ID is present in Resource-manager-map
+ RsrcMgr.OnuIDMgmtLock[ponIntfID].Lock()
+ defer RsrcMgr.OnuIDMgmtLock[ponIntfID].Unlock()
+
if _, ok := RsrcMgr.ResourceMgrs[ponIntfID]; !ok {
err := errors.New("invalid-pon-interface-" + strconv.Itoa(int(ponIntfID)))
return 0, err
}
- RsrcMgr.OnuIDMgmtLock[ponIntfID].Lock()
// Get ONU id for a provided pon interface ID.
ONUID, err := RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(ctx, ponIntfID,
ponrmgr.ONU_ID, 1)
- RsrcMgr.OnuIDMgmtLock[ponIntfID].Unlock()
if err != nil {
logger.Errorf("Failed to get resource for interface %d for type %s",
ponIntfID, ponrmgr.ONU_ID)
@@ -508,6 +509,10 @@
var err error
FlowPath := fmt.Sprintf("%d,%d,%d", ponIntfID, ONUID, uniID)
+
+ RsrcMgr.FlowIDMgmtLock.Lock()
+ defer RsrcMgr.FlowIDMgmtLock.Unlock()
+
FlowIDs := RsrcMgr.ResourceMgrs[ponIntfID].GetCurrentFlowIDsForOnu(ctx, FlowPath)
if FlowIDs != nil {
logger.Debugw("Found flowId(s) for this ONU", log.Fields{"pon": ponIntfID, "ONUID": ONUID, "uniID": uniID, "KVpath": FlowPath})
@@ -520,10 +525,8 @@
}
}
logger.Debug("No matching flows with flow cookie or flow category, allocating new flowid")
- RsrcMgr.FlowIDMgmtLock.Lock()
FlowIDs, err = RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(ctx, ponIntfID,
ponrmgr.FLOW_ID, 1)
- RsrcMgr.FlowIDMgmtLock.Unlock()
if err != nil {
logger.Errorf("Failed to get resource for interface %d for type %s",
ponIntfID, ponrmgr.FLOW_ID)
@@ -544,6 +547,10 @@
var err error
IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
+
+ RsrcMgr.AllocIDMgmtLock[intfID].Lock()
+ defer RsrcMgr.AllocIDMgmtLock[intfID].Unlock()
+
AllocID := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(ctx, IntfOnuIDUniID)
if AllocID != nil {
// Since we support only one alloc_id for the ONU at the moment,
@@ -552,10 +559,8 @@
logger.Debugw("Retrieved alloc ID from pon resource mgr", log.Fields{"AllocID": AllocID})
return AllocID[0]
}
- RsrcMgr.AllocIDMgmtLock[intfID].Lock()
AllocID, err = RsrcMgr.ResourceMgrs[intfID].GetResourceID(ctx, intfID,
ponrmgr.ALLOC_ID, 1)
- RsrcMgr.AllocIDMgmtLock[intfID].Unlock()
if AllocID == nil || err != nil {
logger.Error("Failed to allocate alloc id")
@@ -679,15 +684,16 @@
var err error
IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
+ RsrcMgr.GemPortIDMgmtLock[ponPort].Lock()
+ defer RsrcMgr.GemPortIDMgmtLock[ponPort].Unlock()
+
GEMPortList := RsrcMgr.ResourceMgrs[ponPort].GetCurrentGEMPortIDsForOnu(ctx, IntfOnuIDUniID)
if GEMPortList != nil {
return GEMPortList, nil
}
- RsrcMgr.GemPortIDMgmtLock[ponPort].Lock()
GEMPortList, err = RsrcMgr.ResourceMgrs[ponPort].GetResourceID(ctx, ponPort,
ponrmgr.GEMPORT_ID, NumOfPorts)
- RsrcMgr.GemPortIDMgmtLock[ponPort].Unlock()
if err != nil && GEMPortList == nil {
logger.Errorf("Failed to get gem port id for %s", IntfOnuIDUniID)
return nil, err
@@ -719,8 +725,9 @@
func (RsrcMgr *OpenOltResourceMgr) FreeonuID(ctx context.Context, intfID uint32, onuID []uint32) {
RsrcMgr.OnuIDMgmtLock[intfID].Lock()
+ defer RsrcMgr.OnuIDMgmtLock[intfID].Unlock()
+
RsrcMgr.ResourceMgrs[intfID].FreeResourceID(ctx, intfID, ponrmgr.ONU_ID, onuID)
- RsrcMgr.OnuIDMgmtLock[intfID].Unlock()
/* Free onu id for a particular interface.*/
var IntfonuID string
@@ -735,8 +742,11 @@
uniID int32, FlowID uint32) {
var IntfONUID string
var err error
- FlowIds := make([]uint32, 0)
+ RsrcMgr.FlowIDMgmtLock.Lock()
+ defer RsrcMgr.FlowIDMgmtLock.Unlock()
+
+ FlowIds := make([]uint32, 0)
FlowIds = append(FlowIds, FlowID)
IntfONUID = fmt.Sprintf("%d,%d,%d", IntfID, onuID, uniID)
err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(ctx, IntfONUID, FlowID, false)
@@ -744,8 +754,7 @@
logger.Errorw("Failed to Update flow id for", log.Fields{"intf": IntfONUID})
}
RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(ctx, IntfONUID, FlowID)
- RsrcMgr.FlowIDMgmtLock.Lock()
- defer RsrcMgr.FlowIDMgmtLock.Unlock()
+
RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.FLOW_ID, FlowIds)
}
@@ -753,8 +762,9 @@
func (RsrcMgr *OpenOltResourceMgr) FreeFlowIDs(ctx context.Context, IntfID uint32, onuID uint32,
uniID uint32, FlowID []uint32) {
RsrcMgr.FlowIDMgmtLock.Lock()
+ defer RsrcMgr.FlowIDMgmtLock.Unlock()
+
RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.FLOW_ID, FlowID)
- RsrcMgr.FlowIDMgmtLock.Unlock()
var IntfOnuIDUniID string
var err error
@@ -772,11 +782,12 @@
// for the given OLT device.
func (RsrcMgr *OpenOltResourceMgr) FreeAllocID(ctx context.Context, IntfID uint32, onuID uint32,
uniID uint32, allocID uint32) {
+ RsrcMgr.AllocIDMgmtLock[IntfID].Lock()
+ defer RsrcMgr.AllocIDMgmtLock[IntfID].Unlock()
+
RsrcMgr.RemoveAllocIDForOnu(ctx, IntfID, onuID, uniID, allocID)
allocIDs := make([]uint32, 0)
allocIDs = append(allocIDs, allocID)
- RsrcMgr.AllocIDMgmtLock[IntfID].Lock()
- defer RsrcMgr.AllocIDMgmtLock[IntfID].Unlock()
RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.ALLOC_ID, allocIDs)
}
@@ -784,11 +795,12 @@
// for the given OLT device.
func (RsrcMgr *OpenOltResourceMgr) FreeGemPortID(ctx context.Context, IntfID uint32, onuID uint32,
uniID uint32, gemPortID uint32) {
+ RsrcMgr.GemPortIDMgmtLock[IntfID].Lock()
+ defer RsrcMgr.GemPortIDMgmtLock[IntfID].Unlock()
+
RsrcMgr.RemoveGemPortIDForOnu(ctx, IntfID, onuID, uniID, gemPortID)
gemPortIDs := make([]uint32, 0)
gemPortIDs = append(gemPortIDs, gemPortID)
- RsrcMgr.GemPortIDMgmtLock[IntfID].Lock()
- defer RsrcMgr.GemPortIDMgmtLock[IntfID].Unlock()
RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.GEMPORT_ID, gemPortIDs)
}
@@ -798,9 +810,8 @@
IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
- AllocIDs := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(ctx, IntfOnuIDUniID)
-
RsrcMgr.AllocIDMgmtLock[onuID].Lock()
+ AllocIDs := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(ctx, IntfOnuIDUniID)
RsrcMgr.ResourceMgrs[intfID].FreeResourceID(ctx, intfID,
ponrmgr.ALLOC_ID,
AllocIDs)
@@ -1272,6 +1283,7 @@
logger.Error("Failed to marshal data", log.Fields{"error": err})
return err
}
+
RsrcMgr.flowIDToGemInfoLock.Lock()
defer RsrcMgr.flowIDToGemInfoLock.Unlock()
if err = RsrcMgr.KVStore.Put(ctx, path, val); err != nil {
@@ -1307,7 +1319,6 @@
RsrcMgr.flowIDToGemInfoLock.Lock()
defer RsrcMgr.flowIDToGemInfoLock.Unlock()
-
if err = RsrcMgr.KVStore.Put(ctx, path, val); err != nil {
logger.Errorw("Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
return