Adding lock on flowsUsedByGemPort map
Adding lock on OnuDevice.uniPorts map
Change-Id: Ia6d2832479b4b5449415a42cf6cd90d3c3fc07d0
diff --git a/VERSION b/VERSION
index b40e924..8fd9baf 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.4.13
+2.4.14-dev
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 4e124e5..f049cab 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -113,7 +113,6 @@
onuID uint32
intfID uint32
proxyDeviceID string
- uniPorts map[uint32]struct{}
losRaised bool
rdiRaised bool
}
@@ -138,7 +137,6 @@
device.onuID = onuID
device.intfID = intfID
device.proxyDeviceID = proxyDevID
- device.uniPorts = make(map[uint32]struct{})
device.losRaised = losRaised
return &device
}
@@ -1367,19 +1365,6 @@
return nil
}
-// AddUniPortToOnu adds the uni port to the onu device
-func (dh *DeviceHandler) AddUniPortToOnu(intfID, onuID, uniPort uint32) {
- onuKey := dh.formOnuKey(intfID, onuID)
-
- if onuDevice, ok := dh.onus.Load(onuKey); ok {
- // add it to the uniPort map for the onu device
- if _, ok = onuDevice.(*OnuDevice).uniPorts[uniPort]; !ok {
- onuDevice.(*OnuDevice).uniPorts[uniPort] = struct{}{}
- logger.Debugw("adding-uni-port", log.Fields{"port": uniPort, "intf-id": intfID, "onuId": onuID})
- }
- }
-}
-
// UpdatePmConfig updates the pm metrics.
func (dh *DeviceHandler) UpdatePmConfig(pmConfigs *voltha.PmConfigs) {
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 02e382b..9839ea3 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -215,6 +215,7 @@
deviceHandler *DeviceHandler
resourceMgr *rsrcMgr.OpenOltResourceMgr
onuIdsLock sync.RWMutex
+ perGemPortLock *mapmutex.Mutex // lock to be used to access the flowsUsedByGemPort map
flowsUsedByGemPort map[gemPortKey][]uint32 //gem port id to flow ids
packetInGemPort map[rsrcMgr.PacketInInfoKey]uint32 //packet in gem port local cache
// TODO create a type rsrcMgr.OnuGemInfos to be used instead of []rsrcMgr.OnuGemInfo
@@ -257,6 +258,7 @@
flowMgr.onuGemInfoLock = sync.RWMutex{}
flowMgr.pendingFlowDelete = sync.Map{}
flowMgr.perUserFlowHandleLock = mapmutex.NewCustomizedMapMutex(300, 100000000, 10000000, 1.1, 0.2)
+ flowMgr.perGemPortLock = mapmutex.NewCustomizedMapMutex(300, 100000000, 10000000, 1.1, 0.2)
flowMgr.interfaceToMcastQueueMap = make(map[uint32]*queueInfoBrief)
//load interface to multicast queue map from kv store
flowMgr.loadInterfaceToMulticastQueueMap(ctx)
@@ -280,19 +282,30 @@
}
func (f *OpenOltFlowMgr) registerFlow(ctx context.Context, flowFromCore *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) {
- logger.Debugw("registering-flow-for-device ",
- log.Fields{
- "flow": flowFromCore,
- "device-id": f.deviceHandler.device.Id})
+
gemPK := gemPortKey{uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId)}
- flowIDList, ok := f.flowsUsedByGemPort[gemPK]
- if !ok {
- flowIDList = []uint32{deviceFlow.FlowId}
+ if f.perGemPortLock.TryLock(gemPK) {
+ logger.Debugw("registering-flow-for-device",
+ log.Fields{
+ "flow": flowFromCore,
+ "device-id": f.deviceHandler.device.Id})
+ flowIDList, ok := f.flowsUsedByGemPort[gemPK]
+ if !ok {
+ flowIDList = []uint32{deviceFlow.FlowId}
+ }
+ flowIDList = appendUnique(flowIDList, deviceFlow.FlowId)
+ f.flowsUsedByGemPort[gemPK] = flowIDList
+ // update the flowids for a gem to the KVstore
+ f.resourceMgr.UpdateFlowIDsForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowIDList)
+ f.perGemPortLock.Unlock(gemPK)
+ } else {
+ logger.Error("failed-to-acquire-per-gem-port-lock",
+ log.Fields{
+ "flow": flowFromCore,
+ "device-id": f.deviceHandler.device.Id,
+ "key": gemPK,
+ })
}
- flowIDList = appendUnique(flowIDList, deviceFlow.FlowId)
- f.flowsUsedByGemPort[gemPK] = flowIDList
- // update the flowids for a gem to the KVstore
- f.resourceMgr.UpdateFlowIDsForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowIDList)
}
func (f *OpenOltFlowMgr) divideAndAddFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
@@ -2077,24 +2090,42 @@
}
gemPK := gemPortKey{Intf, uint32(gemPortID)}
- if f.isGemPortUsedByAnotherFlow(gemPK) {
- flowIDs := f.flowsUsedByGemPort[gemPK]
- for i, flowIDinMap := range flowIDs {
- if flowIDinMap == flowID {
- flowIDs = append(flowIDs[:i], flowIDs[i+1:]...)
- // everytime flowsUsedByGemPort cache is updated the same should be updated
- // in kv store by calling UpdateFlowIDsForGem
- f.flowsUsedByGemPort[gemPK] = flowIDs
- f.resourceMgr.UpdateFlowIDsForGem(ctx, Intf, uint32(gemPortID), flowIDs)
- break
+ used, err := f.isGemPortUsedByAnotherFlow(gemPK)
+ if err != nil {
+ return err
+ }
+ if used {
+ if f.perGemPortLock.TryLock(gemPK) {
+ flowIDs := f.flowsUsedByGemPort[gemPK]
+ for i, flowIDinMap := range flowIDs {
+ if flowIDinMap == flowID {
+ flowIDs = append(flowIDs[:i], flowIDs[i+1:]...)
+ // everytime flowsUsedByGemPort cache is updated the same should be updated
+ // in kv store by calling UpdateFlowIDsForGem
+ f.flowsUsedByGemPort[gemPK] = flowIDs
+ f.resourceMgr.UpdateFlowIDsForGem(ctx, Intf, uint32(gemPortID), flowIDs)
+ break
+ }
}
+ logger.Debugw("gem-port-id-is-still-used-by-other-flows",
+ log.Fields{
+ "gemport-id": gemPortID,
+ "usedByFlows": flowIDs,
+ "device-id": f.deviceHandler.device.Id})
+ f.perGemPortLock.Unlock(gemPK)
+ return nil
}
- logger.Debugw("gem-port-id-is-still-used-by-other-flows",
+ logger.Error("failed-to-acquire-per-gem-port-lock",
log.Fields{
- "gemport-id": gemPortID,
- "usedByFlows": flowIDs,
- "device-id": f.deviceHandler.device.Id})
- return nil
+ "gemport-id": gemPortID,
+ "device-id": f.deviceHandler.device.Id,
+ "key": gemPK,
+ })
+ return olterrors.NewErrAdapter("failed-to-acquire-per-gem-port-lock", log.Fields{
+ "gemport-id": gemPortID,
+ "device-id": f.deviceHandler.device.Id,
+ "key": gemPK,
+ }, nil)
}
logger.Debugf("gem-port-id %d is-not-used-by-another-flow--releasing-the-gem-port", gemPortID)
f.resourceMgr.RemoveGemPortIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
@@ -2105,7 +2136,16 @@
f.onuIdsLock.Lock()
//everytime an entry is deleted from flowsUsedByGemPort cache, the same should be updated in kv as well
// by calling DeleteFlowIDsForGem
- delete(f.flowsUsedByGemPort, gemPK)
+ if f.perGemPortLock.TryLock(gemPK) {
+ delete(f.flowsUsedByGemPort, gemPK)
+ f.perGemPortLock.Unlock(gemPK)
+ } else {
+ logger.Error("failed-to-acquire-per-gem-port-lock",
+ log.Fields{
+ "device-id": f.deviceHandler.device.Id,
+ "key": gemPK,
+ })
+ }
f.resourceMgr.DeleteFlowIDsForGem(ctx, Intf, uint32(gemPortID))
f.resourceMgr.FreeGemPortID(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
f.onuIdsLock.Unlock()
@@ -2481,7 +2521,6 @@
return f.addIgmpTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
}
- f.deviceHandler.AddUniPortToOnu(intfID, onuID, portNo)
f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNo)
TpID, err := getTpIDFromFlow(flow)
@@ -3525,12 +3564,24 @@
go f.sendTPDownloadMsgToChild(intfID, onuID, uniID, uni, tpID)
}
-func (f *OpenOltFlowMgr) isGemPortUsedByAnotherFlow(gemPK gemPortKey) bool {
- flowIDList := f.flowsUsedByGemPort[gemPK]
- if len(flowIDList) > 1 {
- return true
+func (f *OpenOltFlowMgr) isGemPortUsedByAnotherFlow(gemPK gemPortKey) (bool, error) {
+ if f.perGemPortLock.TryLock(gemPK) {
+ flowIDList := f.flowsUsedByGemPort[gemPK]
+ f.perGemPortLock.Unlock(gemPK)
+ if len(flowIDList) > 1 {
+ return true, nil
+ }
+ return false, nil
}
- return false
+ logger.Error("failed-to-acquire-per-gem-port-lock",
+ log.Fields{
+ "device-id": f.deviceHandler.device.Id,
+ "key": gemPK,
+ })
+ return false, olterrors.NewErrAdapter("failed-to-acquire-per-gem-port-lock", log.Fields{
+ "device-id": f.deviceHandler.device.Id,
+ "key": gemPK,
+ }, nil)
}
func (f *OpenOltFlowMgr) isTechProfileUsedByAnotherGem(ctx context.Context, ponIntf uint32, onuID uint32, uniID uint32, tpID uint32, tpInst *tp.TechProfile, gemPortID uint32) (bool, uint32) {
@@ -3889,7 +3940,17 @@
}
for gem, FlowIDs := range flowIDsList {
gemPK := gemPortKey{intf, uint32(gem)}
- f.flowsUsedByGemPort[gemPK] = FlowIDs
+ if f.perGemPortLock.TryLock(gemPK) {
+ f.flowsUsedByGemPort[gemPK] = FlowIDs
+ f.perGemPortLock.Unlock(gemPK)
+ } else {
+ logger.Error("failed-to-acquire-per-gem-port-lock",
+ log.Fields{
+ "intf-id": intf,
+ "device-id": f.deviceHandler.device.Id,
+ "key": gemPK,
+ })
+ }
}
return
}