Add a per-gem-port lock around accesses to the flowsUsedByGemPort map
Add a lock around accesses to the OnuDevice.uniPorts map
Change-Id: Ia6d2832479b4b5449415a42cf6cd90d3c3fc07d0
(cherry picked from commit ac032b184d2ad9e512a1d5322646ef0df0845acc)
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 3d928c4..709735a 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -173,6 +173,13 @@
BinaryStringPrefix = "0b"
// BinaryBit1 is binary bit 1 expressed as a character
BinaryBit1 = '1'
+
+ // Settings passed to mapmutex.NewCustomizedMapMutex when creating the per-key locks
+ maxRetry = 300
+ maxDelay = 100000000
+ baseDelay = 10000000
+ factor = 1.1
+ jitter = 0.2
)
type gemPortKey struct {
@@ -215,6 +222,7 @@
deviceHandler *DeviceHandler
resourceMgr *rsrcMgr.OpenOltResourceMgr
onuIdsLock sync.RWMutex
+ perGemPortLock *mapmutex.Mutex // lock to be used to access the flowsUsedByGemPort map
flowsUsedByGemPort map[gemPortKey][]uint32 //gem port id to flow ids
packetInGemPort map[rsrcMgr.PacketInInfoKey]uint32 //packet in gem port local cache
// TODO create a type rsrcMgr.OnuGemInfos to be used instead of []rsrcMgr.OnuGemInfo
@@ -256,7 +264,8 @@
}
flowMgr.onuGemInfoLock = sync.RWMutex{}
flowMgr.pendingFlowDelete = sync.Map{}
- flowMgr.perUserFlowHandleLock = mapmutex.NewCustomizedMapMutex(300, 100000000, 10000000, 1.1, 0.2)
+ flowMgr.perUserFlowHandleLock = mapmutex.NewCustomizedMapMutex(maxRetry, maxDelay, baseDelay, factor, jitter)
+ flowMgr.perGemPortLock = mapmutex.NewCustomizedMapMutex(maxRetry, maxDelay, baseDelay, factor, jitter)
flowMgr.interfaceToMcastQueueMap = make(map[uint32]*queueInfoBrief)
//load interface to multicast queue map from kv store
flowMgr.loadInterfaceToMulticastQueueMap(ctx)
@@ -265,19 +274,35 @@
}
func (f *OpenOltFlowMgr) registerFlow(ctx context.Context, flowFromCore *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) error {
- logger.Debugw(ctx, "registering-flow-for-device ",
- log.Fields{
- "flow": flowFromCore,
- "device-id": f.deviceHandler.device.Id})
gemPK := gemPortKey{uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId)}
- flowIDList, ok := f.flowsUsedByGemPort[gemPK]
- if !ok {
- flowIDList = []uint32{deviceFlow.FlowId}
+ if f.perGemPortLock.TryLock(gemPK) {
+ logger.Debugw(ctx, "registering-flow-for-device ",
+ log.Fields{
+ "flow": flowFromCore,
+ "device-id": f.deviceHandler.device.Id})
+ flowIDList, ok := f.flowsUsedByGemPort[gemPK]
+ if !ok {
+ flowIDList = []uint32{deviceFlow.FlowId}
+ }
+ flowIDList = appendUnique(flowIDList, deviceFlow.FlowId)
+ f.flowsUsedByGemPort[gemPK] = flowIDList
+
+ f.perGemPortLock.Unlock(gemPK)
+
+ // update the flowids for a gem to the KVstore
+ return f.resourceMgr.UpdateFlowIDsForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowIDList)
}
- flowIDList = appendUnique(flowIDList, deviceFlow.FlowId)
- f.flowsUsedByGemPort[gemPK] = flowIDList
- // update the flowids for a gem to the KVstore
- return f.resourceMgr.UpdateFlowIDsForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowIDList)
+ logger.Error(ctx, "failed-to-acquire-per-gem-port-lock",
+ log.Fields{
+ "flow-from-core": flowFromCore,
+ "device-id": f.deviceHandler.device.Id,
+ "key": gemPK,
+ })
+ return olterrors.NewErrAdapter("failed-to-acquire-per-gem-port-lock", log.Fields{
+ "flow-from-core": flowFromCore,
+ "device-id": f.deviceHandler.device.Id,
+ "key": gemPK,
+ }, nil)
}
func (f *OpenOltFlowMgr) divideAndAddFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
@@ -2049,26 +2074,45 @@
}
gemPK := gemPortKey{Intf, uint32(gemPortID)}
- if f.isGemPortUsedByAnotherFlow(gemPK) {
- flowIDs := f.flowsUsedByGemPort[gemPK]
- for i, flowIDinMap := range flowIDs {
- if flowIDinMap == flowID {
- flowIDs = append(flowIDs[:i], flowIDs[i+1:]...)
- // everytime flowsUsedByGemPort cache is updated the same should be updated
- // in kv store by calling UpdateFlowIDsForGem
- f.flowsUsedByGemPort[gemPK] = flowIDs
- if err := f.resourceMgr.UpdateFlowIDsForGem(ctx, Intf, uint32(gemPortID), flowIDs); err != nil {
- return err
+ used, err := f.isGemPortUsedByAnotherFlow(ctx, gemPK)
+ if err != nil {
+ return err
+ }
+ if used {
+ if f.perGemPortLock.TryLock(gemPK) {
+ flowIDs := f.flowsUsedByGemPort[gemPK]
+ for i, flowIDinMap := range flowIDs {
+ if flowIDinMap == flowID {
+ flowIDs = append(flowIDs[:i], flowIDs[i+1:]...)
+ // every time the flowsUsedByGemPort cache is updated, the kv store must be
+ // updated as well by calling UpdateFlowIDsForGem
+ f.flowsUsedByGemPort[gemPK] = flowIDs
+ if err := f.resourceMgr.UpdateFlowIDsForGem(ctx, Intf, uint32(gemPortID), flowIDs); err != nil {
+ return err
+ }
+ break
}
- break
}
+ logger.Debugw(ctx, "gem-port-id-is-still-used-by-other-flows",
+ log.Fields{
+ "gemport-id": gemPortID,
+ "usedByFlows": flowIDs,
+ "device-id": f.deviceHandler.device.Id})
+ f.perGemPortLock.Unlock(gemPK)
+ return nil
}
- logger.Debugw(ctx, "gem-port-id-is-still-used-by-other-flows",
+
+ logger.Error(ctx, "failed-to-acquire-per-gem-port-lock",
log.Fields{
- "gemport-id": gemPortID,
- "usedByFlows": flowIDs,
- "device-id": f.deviceHandler.device.Id})
- return nil
+ "gemport-id": gemPortID,
+ "device-id": f.deviceHandler.device.Id,
+ "key": gemPK,
+ })
+ return olterrors.NewErrAdapter("failed-to-acquire-per-gem-port-lock", log.Fields{
+ "gemport-id": gemPortID,
+ "device-id": f.deviceHandler.device.Id,
+ "key": gemPK,
+ }, nil)
}
logger.Debugf(ctx, "gem-port-id %d is-not-used-by-another-flow--releasing-the-gem-port", gemPortID)
f.resourceMgr.RemoveGemPortIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
@@ -2079,7 +2123,16 @@
f.onuIdsLock.Lock()
//everytime an entry is deleted from flowsUsedByGemPort cache, the same should be updated in kv as well
// by calling DeleteFlowIDsForGem
- delete(f.flowsUsedByGemPort, gemPK)
+ if f.perGemPortLock.TryLock(gemPK) {
+ delete(f.flowsUsedByGemPort, gemPK)
+ f.perGemPortLock.Unlock(gemPK)
+ } else {
+ logger.Error(ctx, "failed-to-acquire-per-gem-port-lock",
+ log.Fields{
+ "device-id": f.deviceHandler.device.Id,
+ "key": gemPK,
+ })
+ }
f.resourceMgr.DeleteFlowIDsForGem(ctx, Intf, uint32(gemPortID))
f.resourceMgr.FreeGemPortID(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
f.onuIdsLock.Unlock()
@@ -2459,7 +2512,6 @@
return f.addIgmpTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
}
- f.deviceHandler.AddUniPortToOnu(ctx, intfID, onuID, portNo)
f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNo)
TpID, err := getTpIDFromFlow(ctx, flow)
@@ -3566,9 +3618,21 @@
}()
}
-func (f *OpenOltFlowMgr) isGemPortUsedByAnotherFlow(gemPK gemPortKey) bool {
- flowIDList := f.flowsUsedByGemPort[gemPK]
- return len(flowIDList) > 1
+func (f *OpenOltFlowMgr) isGemPortUsedByAnotherFlow(ctx context.Context, gemPK gemPortKey) (bool, error) {
+ if f.perGemPortLock.TryLock(gemPK) {
+ flowIDList := f.flowsUsedByGemPort[gemPK]
+ f.perGemPortLock.Unlock(gemPK)
+ return len(flowIDList) > 1, nil
+ }
+ logger.Error(ctx, "failed-to-acquire-per-gem-port-lock",
+ log.Fields{
+ "device-id": f.deviceHandler.device.Id,
+ "key": gemPK,
+ })
+ return false, olterrors.NewErrAdapter("failed-to-acquire-per-gem-port-lock", log.Fields{
+ "device-id": f.deviceHandler.device.Id,
+ "key": gemPK,
+ }, nil)
}
func (f *OpenOltFlowMgr) isTechProfileUsedByAnotherGem(ctx context.Context, ponIntf uint32, onuID uint32, uniID uint32, tpID uint32, tpInst *tp.TechProfile, gemPortID uint32) (bool, uint32) {
@@ -3929,7 +3993,17 @@
}
for gem, FlowIDs := range flowIDsList {
gemPK := gemPortKey{intf, uint32(gem)}
- f.flowsUsedByGemPort[gemPK] = FlowIDs
+ if f.perGemPortLock.TryLock(gemPK) {
+ f.flowsUsedByGemPort[gemPK] = FlowIDs
+ f.perGemPortLock.Unlock(gemPK)
+ } else {
+ logger.Error(ctx, "failed-to-acquire-per-gem-port-lock",
+ log.Fields{
+ "intf-id": intf,
+ "device-id": f.deviceHandler.device.Id,
+ "key": gemPK,
+ })
+ }
}
}