[VOL-3193] Moving OpenoltFlowManager lock on a per pon base
Change-Id: I8f28cc750d658bbcc51a97d80b5c82dc6b42a85e
diff --git a/VERSION b/VERSION
index f041bc6..1a45714 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.4.8
+2.4.9-dev
diff --git a/internal/pkg/core/device_handler_test.go b/internal/pkg/core/device_handler_test.go
index bd97db8..53641fa 100644
--- a/internal/pkg/core/device_handler_test.go
+++ b/internal/pkg/core/device_handler_test.go
@@ -924,9 +924,9 @@
//{"test1", args{egressPortNo: 0, packet: &ofp.OfpPacketOut{}}, true},
{"PacketOut-1", dh1, args{egressPortNo: 0, packet: pktout}, false},
{"PacketOut-2", dh2, args{egressPortNo: 1, packet: pktout}, false},
- {"PacketOut-2", dh2, args{egressPortNo: 115000, packet: pktout}, false},
- {"PacketOut-3", dh1, args{egressPortNo: 65536, packet: pktout}, false},
- {"PacketOut-4", dh2, args{egressPortNo: 65535, packet: pktout}, false},
+ {"PacketOut-3", dh2, args{egressPortNo: 4112, packet: pktout}, false},
+ {"PacketOut-4", dh1, args{egressPortNo: 1048577, packet: pktout}, false},
+ {"PacketOut-5", dh2, args{egressPortNo: 1048576, packet: pktout}, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 7fce2f3..b489c0a 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -217,9 +217,10 @@
onuIdsLock sync.RWMutex
flowsUsedByGemPort map[gemPortKey][]uint32 //gem port id to flow ids
packetInGemPort map[rsrcMgr.PacketInInfoKey]uint32 //packet in gem port local cache
- onuGemInfo map[uint32][]rsrcMgr.OnuGemInfo //onu, gem and uni info local cache
- lockCache sync.RWMutex
- pendingFlowDelete sync.Map
+ // TODO create a type rsrcMgr.OnuGemInfos to be used instead of []rsrcMgr.OnuGemInfo
+ onuGemInfo [][]rsrcMgr.OnuGemInfo //onu, gem and uni info local cache, indexed by IntfId
+ onuGemInfoLock []sync.RWMutex // lock by Pon Port
+ pendingFlowDelete sync.Map
// The mapmutex.Mutex can be fine tuned to use mapmutex.NewCustomizedMapMutex
perUserFlowHandleLock *mapmutex.Mutex
interfaceToMcastQueueMap map[uint32]*queueInfoBrief /*pon interface -> multicast queue map. Required to assign GEM to a bucket during group population*/
@@ -242,8 +243,8 @@
flowMgr.onuIdsLock = sync.RWMutex{}
flowMgr.flowsUsedByGemPort = make(map[gemPortKey][]uint32)
flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
- flowMgr.onuGemInfo = make(map[uint32][]rsrcMgr.OnuGemInfo)
ponPorts := rMgr.DevInfo.GetPonPorts()
+ flowMgr.onuGemInfo = make([][]rsrcMgr.OnuGemInfo, ponPorts)
//Load the onugem info cache from kv store on flowmanager start
for idx = 0; idx < ponPorts; idx++ {
if flowMgr.onuGemInfo[idx], err = rMgr.GetOnuGemInfo(ctx, idx); err != nil {
@@ -252,7 +253,7 @@
//Load flowID list per gem map per interface from the kvstore.
flowMgr.loadFlowIDlistForGem(ctx, idx)
}
- flowMgr.lockCache = sync.RWMutex{}
+ flowMgr.onuGemInfoLock = make([]sync.RWMutex, ponPorts)
flowMgr.pendingFlowDelete = sync.Map{}
flowMgr.perUserFlowHandleLock = mapmutex.NewCustomizedMapMutex(300, 100000000, 10000000, 1.1, 0.2)
flowMgr.interfaceToMcastQueueMap = make(map[uint32]*queueInfoBrief)
@@ -1967,8 +1968,10 @@
// Otherwise stale info continues to exist after gemport is freed and wrong logicalPortNo
// is conveyed to ONOS during packet-in OF message.
func (f *OpenOltFlowMgr) deleteGemPortFromLocalCache(intfID uint32, onuID uint32, gemPortID uint32) {
- f.lockCache.Lock()
- defer f.lockCache.Unlock()
+
+ f.onuGemInfoLock[intfID].Lock()
+ defer f.onuGemInfoLock[intfID].Unlock()
+
logger.Infow("deleting-gem-from-local-cache",
log.Fields{
"gem": gemPortID,
@@ -2871,8 +2874,9 @@
//UpdateOnuInfo function adds onu info to cache and kvstore
func (f *OpenOltFlowMgr) UpdateOnuInfo(ctx context.Context, intfID uint32, onuID uint32, serialNum string) error {
- f.lockCache.Lock()
- defer f.lockCache.Unlock()
+ f.onuGemInfoLock[intfID].Lock()
+ defer f.onuGemInfoLock[intfID].Unlock()
+
onu := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
f.onuGemInfo[intfID] = append(f.onuGemInfo[intfID], onu)
if err := f.resourceMgr.AddOnuGemInfo(ctx, intfID, onu); err != nil {
@@ -2890,8 +2894,10 @@
//addGemPortToOnuInfoMap function adds GEMport to ONU map
func (f *OpenOltFlowMgr) addGemPortToOnuInfoMap(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) {
- f.lockCache.Lock()
- defer f.lockCache.Unlock()
+
+ f.onuGemInfoLock[intfID].Lock()
+ defer f.onuGemInfoLock[intfID].Unlock()
+
logger.Infow("adding-gem-to-onu-info-map",
log.Fields{
"gem": gemPort,
@@ -2939,18 +2945,17 @@
// This function Lookup maps by serialNumber or (intfId, gemPort)
//getOnuIDfromGemPortMap Returns OnuID,nil if found or set 0,error if no onuId is found for serialNumber or (intfId, gemPort)
-func (f *OpenOltFlowMgr) getOnuIDfromGemPortMap(serialNumber string, intfID uint32, gemPortID uint32) (uint32, error) {
+func (f *OpenOltFlowMgr) getOnuIDfromGemPortMap(intfID uint32, gemPortID uint32) (uint32, error) {
- f.lockCache.Lock()
- defer f.lockCache.Unlock()
+ f.onuGemInfoLock[intfID].Lock()
+ defer f.onuGemInfoLock[intfID].Unlock()
logger.Infow("getting-onu-id-from-gem-port-and-pon-port",
log.Fields{
- "device-id": f.deviceHandler.device.Id,
- "onu-geminfo": f.onuGemInfo[intfID],
- "serial-number": serialNumber,
- "intf-id": intfID,
- "gemport-id": gemPortID})
+ "device-id": f.deviceHandler.device.Id,
+ "onu-geminfo": f.onuGemInfo[intfID],
+ "intf-id": intfID,
+ "gemport-id": gemPortID})
// get onuid from the onugem info cache
onugem := f.onuGemInfo[intfID]
@@ -2963,9 +2968,8 @@
}
}
return uint32(0), olterrors.NewErrNotFound("onu-id", log.Fields{
- "serial-number": serialNumber,
- "interface-id": intfID,
- "gem-port-id": gemPortID},
+ "interface-id": intfID,
+ "gem-port-id": gemPortID},
nil)
}
@@ -2977,7 +2981,7 @@
if packetIn.IntfType == "pon" {
// packet indication does not have serial number , so sending as nil
- if onuID, err = f.getOnuIDfromGemPortMap("", packetIn.IntfId, packetIn.GemportId); err != nil {
+ if onuID, err = f.getOnuIDfromGemPortMap(packetIn.IntfId, packetIn.GemportId); err != nil {
// Called method is returning error with all data populated; just return the same
return logicalPortNum, err
}
@@ -3006,8 +3010,9 @@
var gemPortID uint32
var err error
- f.lockCache.Lock()
- defer f.lockCache.Unlock()
+ f.onuGemInfoLock[intfID].Lock()
+ defer f.onuGemInfoLock[intfID].Unlock()
+
pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: portNum}
gemPortID, ok := f.packetInGemPort[pktInkey]
@@ -3685,8 +3690,9 @@
func (f *OpenOltFlowMgr) UpdateGemPortForPktIn(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32, gemPort uint32) {
pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: logicalPort}
- f.lockCache.Lock()
- defer f.lockCache.Unlock()
+ f.onuGemInfoLock[intfID].Lock()
+ defer f.onuGemInfoLock[intfID].Unlock()
+
lookupGemPort, ok := f.packetInGemPort[pktInkey]
if ok {
if lookupGemPort == gemPort {
@@ -3709,8 +3715,10 @@
// AddUniPortToOnuInfo adds uni port to the onugem info both in cache and kvstore.
func (f *OpenOltFlowMgr) AddUniPortToOnuInfo(ctx context.Context, intfID uint32, onuID uint32, portNum uint32) {
- f.lockCache.Lock()
- defer f.lockCache.Unlock()
+
+ f.onuGemInfoLock[intfID].Lock()
+ defer f.onuGemInfoLock[intfID].Unlock()
+
onugem := f.onuGemInfo[intfID]
for idx, onu := range onugem {
if onu.OnuID == onuID {