[VOL-3269] Use a single global lock (onuGemInfoLock) to protect the onuGemInfo map
Change-Id: Id6fb3ab27b3c5a6fca74d18d306d07e706a355bf
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index d14194e..ef2248c 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -218,8 +218,9 @@
flowsUsedByGemPort map[gemPortKey][]uint32 //gem port id to flow ids
packetInGemPort map[rsrcMgr.PacketInInfoKey]uint32 //packet in gem port local cache
// TODO create a type rsrcMgr.OnuGemInfos to be used instead of []rsrcMgr.OnuGemInfo
- onuGemInfo [][]rsrcMgr.OnuGemInfo //onu, gem and uni info local cache, indexed by IntfId
- onuGemInfoLock []sync.RWMutex // lock by Pon Port
+ onuGemInfo map[uint32][]rsrcMgr.OnuGemInfo //onu, gem and uni info local cache, indexed by IntfId
+ // We need to have a global lock on the onuGemInfo map
+ onuGemInfoLock sync.RWMutex
pendingFlowDelete sync.Map
// The mapmutex.Mutex can be fine tuned to use mapmutex.NewCustomizedMapMutex
perUserFlowHandleLock *mapmutex.Mutex
@@ -244,7 +245,7 @@
flowMgr.flowsUsedByGemPort = make(map[gemPortKey][]uint32)
flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
ponPorts := rMgr.DevInfo.GetPonPorts()
- flowMgr.onuGemInfo = make([][]rsrcMgr.OnuGemInfo, ponPorts)
+ flowMgr.onuGemInfo = make(map[uint32][]rsrcMgr.OnuGemInfo, ponPorts)
//Load the onugem info cache from kv store on flowmanager start
for idx = 0; idx < ponPorts; idx++ {
if flowMgr.onuGemInfo[idx], err = rMgr.GetOnuGemInfo(ctx, idx); err != nil {
@@ -253,7 +254,7 @@
//Load flowID list per gem map per interface from the kvstore.
flowMgr.loadFlowIDlistForGem(ctx, idx)
}
- flowMgr.onuGemInfoLock = make([]sync.RWMutex, ponPorts)
+ flowMgr.onuGemInfoLock = sync.RWMutex{}
flowMgr.pendingFlowDelete = sync.Map{}
flowMgr.perUserFlowHandleLock = mapmutex.NewCustomizedMapMutex(300, 100000000, 10000000, 1.1, 0.2)
flowMgr.interfaceToMcastQueueMap = make(map[uint32]*queueInfoBrief)
@@ -1558,29 +1559,6 @@
return &flows
}
-//func (f *OpenOltFlowMgr) getUpdatedFlowInfo(flow *openolt_pb2.Flow, flowStoreCookie uint64, flowCategory string) *[]rsrcMgr.FlowInfo {
-// var flows []rsrcMgr.FlowInfo = []rsrcMgr.FlowInfo{rsrcMgr.FlowInfo{Flow: flow, FlowCategory: flowCategory, FlowStoreCookie: flowStoreCookie}}
-// var intfId uint32
-// /* For flows which trap out of the NNI, the AccessIntfId is invalid
-// (set to -1). In such cases, we need to refer to the NetworkIntfId .
-// */
-// if flow.AccessIntfId != -1 {
-// intfId = uint32(flow.AccessIntfId)
-// } else {
-// intfId = uint32(flow.NetworkIntfId)
-// }
-// // Get existing flows matching flowid for given subscriber from KV store
-// existingFlows := f.resourceMgr.GetFlowIDInfo(intfId, uint32(flow.OnuId), uint32(flow.UniId), flow.FlowId)
-// if existingFlows != nil {
-// logger.Debugw("Flow exists for given flowID, appending it to current flow", log.Fields{"flowID": flow.FlowId})
-// for _, f := range *existingFlows {
-// flows = append(flows, f)
-// }
-// }
-// logger.Debugw("Updated flows for given flowID and onuid", log.Fields{"updatedflow": flows, "flowid": flow.FlowId, "onu": flow.OnuId})
-// return &flows
-//}
-
func (f *OpenOltFlowMgr) updateFlowInfoToKVStore(ctx context.Context, intfID int32, onuID int32, uniID int32, flowID uint32, flows *[]rsrcMgr.FlowInfo) error {
logger.Debugw("storing-flow(s)-into-kv-store", log.Fields{
"flow-id": flowID,
@@ -1680,26 +1658,6 @@
return nil
}
-/*func register_flow(deviceFlow *openolt_pb2.Flow, logicalFlow *ofp.OfpFlowStats){
- //update core flows_proxy : flows_proxy.update('/', flows)
-}
-
-func generateStoredId(flowId uint32, direction string)uint32{
-
- if direction == Upstream{
- logger.Debug("Upstream flow shifting flowid")
- return ((0x1 << 15) | flowId)
- }else if direction == Downstream{
- logger.Debug("Downstream flow not shifting flowid")
- return flowId
- }else{
- logger.Errorw("Unrecognized direction",log.Fields{"direction": direction})
- return flowId
- }
-}
-
-*/
-
func (f *OpenOltFlowMgr) addLLDPFlow(ctx context.Context, flow *ofp.OfpFlowStats, portNo uint32) error {
classifierInfo := make(map[string]interface{})
@@ -2004,17 +1962,18 @@
// is conveyed to ONOS during packet-in OF message.
func (f *OpenOltFlowMgr) deleteGemPortFromLocalCache(intfID uint32, onuID uint32, gemPortID uint32) {
- f.onuGemInfoLock[intfID].Lock()
- defer f.onuGemInfoLock[intfID].Unlock()
+ f.onuGemInfoLock.Lock()
+ defer f.onuGemInfoLock.Unlock()
logger.Infow("deleting-gem-from-local-cache",
log.Fields{
- "gem": gemPortID,
- "intf-id": intfID,
- "onu-id": onuID,
- "device-id": f.deviceHandler.device.Id,
- "onu-gem": f.onuGemInfo[intfID]})
+ "gem-port-id": gemPortID,
+ "intf-id": intfID,
+ "onu-id": onuID,
+ "device-id": f.deviceHandler.device.Id,
+ "onu-gem": f.onuGemInfo[intfID]})
onugem := f.onuGemInfo[intfID]
+deleteLoop:
for i, onu := range onugem {
if onu.OnuID == onuID {
for j, gem := range onu.GemPorts {
@@ -2029,10 +1988,10 @@
"deletedgemport-id": gemPortID,
"gemports": onu.GemPorts,
"device-id": f.deviceHandler.device.Id})
- break
+ break deleteLoop
}
}
- break
+ break deleteLoop
}
}
}
@@ -2939,8 +2898,8 @@
//UpdateOnuInfo function adds onu info to cache and kvstore
func (f *OpenOltFlowMgr) UpdateOnuInfo(ctx context.Context, intfID uint32, onuID uint32, serialNum string) error {
- f.onuGemInfoLock[intfID].Lock()
- defer f.onuGemInfoLock[intfID].Unlock()
+ f.onuGemInfoLock.Lock()
+ defer f.onuGemInfoLock.Unlock()
onu := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
f.onuGemInfo[intfID] = append(f.onuGemInfo[intfID], onu)
@@ -2960,16 +2919,16 @@
//addGemPortToOnuInfoMap function adds GEMport to ONU map
func (f *OpenOltFlowMgr) addGemPortToOnuInfoMap(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) {
- f.onuGemInfoLock[intfID].Lock()
- defer f.onuGemInfoLock[intfID].Unlock()
+ f.onuGemInfoLock.Lock()
+ defer f.onuGemInfoLock.Unlock()
logger.Infow("adding-gem-to-onu-info-map",
log.Fields{
- "gem": gemPort,
- "intf": intfID,
- "onu": onuID,
- "device-id": f.deviceHandler.device.Id,
- "onu-gem": f.onuGemInfo[intfID]})
+ "gem-port-id": gemPort,
+ "intf-id": intfID,
+ "onu-id": onuID,
+ "device-id": f.deviceHandler.device.Id,
+ "onu-gem": f.onuGemInfo[intfID]})
onugem := f.onuGemInfo[intfID]
// update the gem to the local cache as well as to kv strore
for idx, onu := range onugem {
@@ -3000,11 +2959,11 @@
}
logger.Infow("gem-added-to-onu-info-map",
log.Fields{
- "gem": gemPort,
- "intf": intfID,
- "onu": onuID,
- "device-id": f.deviceHandler.device.Id,
- "onu-gem": f.onuGemInfo[intfID]})
+ "gem-port-id": gemPort,
+ "intf-id": intfID,
+ "onu-id": onuID,
+ "device-id": f.deviceHandler.device.Id,
+ "onu-gem": f.onuGemInfo[intfID]})
}
// This function Lookup maps by serialNumber or (intfId, gemPort)
@@ -3012,8 +2971,8 @@
//getOnuIDfromGemPortMap Returns OnuID,nil if found or set 0,error if no onuId is found for serialNumber or (intfId, gemPort)
func (f *OpenOltFlowMgr) getOnuIDfromGemPortMap(intfID uint32, gemPortID uint32) (uint32, error) {
- f.onuGemInfoLock[intfID].Lock()
- defer f.onuGemInfoLock[intfID].Unlock()
+ f.onuGemInfoLock.RLock()
+ defer f.onuGemInfoLock.RUnlock()
logger.Infow("getting-onu-id-from-gem-port-and-pon-port",
log.Fields{
@@ -3032,6 +2991,11 @@
}
}
}
+ logger.Errorw("onu-id-from-gem-port-not-found", log.Fields{
+ "gem-port-id": gemPortID,
+ "interface-id": intfID,
+ "all-gems-on-port": onu,
+ })
return uint32(0), olterrors.NewErrNotFound("onu-id", log.Fields{
"interface-id": intfID,
"gem-port-id": gemPortID},
@@ -3075,18 +3039,18 @@
var gemPortID uint32
var err error
- f.onuGemInfoLock[intfID].Lock()
- defer f.onuGemInfoLock[intfID].Unlock()
-
+ f.onuGemInfoLock.RLock()
+ defer f.onuGemInfoLock.RUnlock()
pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: portNum}
-
- gemPortID, ok := f.packetInGemPort[pktInkey]
+ var ok bool
+ gemPortID, ok = f.packetInGemPort[pktInkey]
if ok {
logger.Debugw("found-gemport-for-pktin-key",
log.Fields{
"pktinkey": pktInkey,
"gem": gemPortID})
- return gemPortID, err
+
+ return gemPortID, nil
}
//If gem is not found in cache try to get it from kv store, if found in kv store, update the cache and return.
gemPortID, err = f.resourceMgr.GetGemPortFromOnuPktIn(ctx, intfID, onuID, portNum)
@@ -3104,6 +3068,7 @@
log.Fields{
"pktinkey": pktInkey,
"gem": gemPortID}, err)
+
}
// nolint: gocyclo
@@ -3830,11 +3795,11 @@
// UpdateGemPortForPktIn updates gemport for packet-in in to the cache and to the kv store as well.
func (f *OpenOltFlowMgr) UpdateGemPortForPktIn(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32, gemPort uint32) {
+
+ f.onuGemInfoLock.Lock()
+ defer f.onuGemInfoLock.Unlock()
+
pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: logicalPort}
-
- f.onuGemInfoLock[intfID].Lock()
- defer f.onuGemInfoLock[intfID].Unlock()
-
lookupGemPort, ok := f.packetInGemPort[pktInkey]
if ok {
if lookupGemPort == gemPort {
@@ -3853,13 +3818,14 @@
"pktinkey": pktInkey,
"gem": gemPort})
return
+
}
// AddUniPortToOnuInfo adds uni port to the onugem info both in cache and kvstore.
func (f *OpenOltFlowMgr) AddUniPortToOnuInfo(ctx context.Context, intfID uint32, onuID uint32, portNum uint32) {
- f.onuGemInfoLock[intfID].Lock()
- defer f.onuGemInfoLock[intfID].Unlock()
+ f.onuGemInfoLock.Lock()
+ defer f.onuGemInfoLock.Unlock()
onugem := f.onuGemInfo[intfID]
for idx, onu := range onugem {
@@ -3875,6 +3841,7 @@
}
}
f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNum)
+
}
func (f *OpenOltFlowMgr) loadFlowIDlistForGem(ctx context.Context, intf uint32) {