[VOL-3269] Using a global lock on onuGemInfoLock

Change-Id: Id6fb3ab27b3c5a6fca74d18d306d07e706a355bf
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index b31c312..28d2065 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -218,8 +218,9 @@
 	flowsUsedByGemPort map[gemPortKey][]uint32            //gem port id to flow ids
 	packetInGemPort    map[rsrcMgr.PacketInInfoKey]uint32 //packet in gem port local cache
 	// TODO create a type rsrcMgr.OnuGemInfos to be used instead of []rsrcMgr.OnuGemInfo
-	onuGemInfo        [][]rsrcMgr.OnuGemInfo //onu, gem and uni info local cache, indexed by IntfId
-	onuGemInfoLock    []sync.RWMutex         // lock by Pon Port
+	onuGemInfo map[uint32][]rsrcMgr.OnuGemInfo //onu, gem and uni info local cache, indexed by IntfId
+	// We need to have a global lock on the onuGemInfo map
+	onuGemInfoLock    sync.RWMutex
 	pendingFlowDelete sync.Map
 	// The mapmutex.Mutex can be fine tuned to use mapmutex.NewCustomizedMapMutex
 	perUserFlowHandleLock    *mapmutex.Mutex
@@ -244,7 +245,7 @@
 	flowMgr.flowsUsedByGemPort = make(map[gemPortKey][]uint32)
 	flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
 	ponPorts := rMgr.DevInfo.GetPonPorts()
-	flowMgr.onuGemInfo = make([][]rsrcMgr.OnuGemInfo, ponPorts)
+	flowMgr.onuGemInfo = make(map[uint32][]rsrcMgr.OnuGemInfo, ponPorts)
 	//Load the onugem info cache from kv store on flowmanager start
 	for idx = 0; idx < ponPorts; idx++ {
 		if flowMgr.onuGemInfo[idx], err = rMgr.GetOnuGemInfo(ctx, idx); err != nil {
@@ -253,7 +254,7 @@
 		//Load flowID list per gem map per interface from the kvstore.
 		flowMgr.loadFlowIDlistForGem(ctx, idx)
 	}
-	flowMgr.onuGemInfoLock = make([]sync.RWMutex, ponPorts)
+	flowMgr.onuGemInfoLock = sync.RWMutex{}
 	flowMgr.pendingFlowDelete = sync.Map{}
 	flowMgr.perUserFlowHandleLock = mapmutex.NewCustomizedMapMutex(300, 100000000, 10000000, 1.1, 0.2)
 	flowMgr.interfaceToMcastQueueMap = make(map[uint32]*queueInfoBrief)
@@ -1556,29 +1557,6 @@
 	return &flows
 }
 
-//func (f *OpenOltFlowMgr) getUpdatedFlowInfo(flow *openolt_pb2.Flow, flowStoreCookie uint64, flowCategory string) *[]rsrcMgr.FlowInfo {
-//	var flows []rsrcMgr.FlowInfo = []rsrcMgr.FlowInfo{rsrcMgr.FlowInfo{Flow: flow, FlowCategory: flowCategory, FlowStoreCookie: flowStoreCookie}}
-//	var intfId uint32
-//	/* For flows which trap out of the NNI, the AccessIntfId is invalid
-//	   (set to -1). In such cases, we need to refer to the NetworkIntfId .
-//	*/
-//	if flow.AccessIntfId != -1 {
-//		intfId = uint32(flow.AccessIntfId)
-//	} else {
-//		intfId = uint32(flow.NetworkIntfId)
-//	}
-//	// Get existing flows matching flowid for given subscriber from KV store
-//	existingFlows := f.resourceMgr.GetFlowIDInfo(intfId, uint32(flow.OnuId), uint32(flow.UniId), flow.FlowId)
-//	if existingFlows != nil {
-//		logger.Debugw(ctx, "Flow exists for given flowID, appending it to current flow", log.Fields{"flowID": flow.FlowId})
-//		for _, f := range *existingFlows {
-//			flows = append(flows, f)
-//		}
-//	}
-//	logger.Debugw(ctx, "Updated flows for given flowID and onuid", log.Fields{"updatedflow": flows, "flowid": flow.FlowId, "onu": flow.OnuId})
-//	return &flows
-//}
-
 func (f *OpenOltFlowMgr) updateFlowInfoToKVStore(ctx context.Context, intfID int32, onuID int32, uniID int32, flowID uint32, flows *[]rsrcMgr.FlowInfo) error {
 	logger.Debugw(ctx, "storing-flow(s)-into-kv-store", log.Fields{
 		"flow-id":   flowID,
@@ -1678,26 +1656,6 @@
 	return nil
 }
 
-/*func register_flow(deviceFlow *openolt_pb2.Flow, logicalFlow *ofp.OfpFlowStats){
- //update core flows_proxy : flows_proxy.update('/', flows)
-}
-
-func generateStoredId(flowId uint32, direction string)uint32{
-
-	if direction == Upstream{
-		logger.Debug(ctx, "Upstream flow shifting flowid")
-		return ((0x1 << 15) | flowId)
-	}else if direction == Downstream{
-		logger.Debug(ctx, "Downstream flow not shifting flowid")
-		return flowId
-	}else{
-		logger.Errorw(ctx, "Unrecognized direction",log.Fields{"direction": direction})
-		return flowId
-	}
-}
-
-*/
-
 func (f *OpenOltFlowMgr) addLLDPFlow(ctx context.Context, flow *ofp.OfpFlowStats, portNo uint32) error {
 
 	classifierInfo := make(map[string]interface{})
@@ -2002,17 +1960,18 @@
 // is conveyed to ONOS during packet-in OF message.
 func (f *OpenOltFlowMgr) deleteGemPortFromLocalCache(ctx context.Context, intfID uint32, onuID uint32, gemPortID uint32) {
 
-	f.onuGemInfoLock[intfID].Lock()
-	defer f.onuGemInfoLock[intfID].Unlock()
+	f.onuGemInfoLock.Lock()
+	defer f.onuGemInfoLock.Unlock()
 
 	logger.Infow(ctx, "deleting-gem-from-local-cache",
 		log.Fields{
-			"gem":       gemPortID,
-			"intf-id":   intfID,
-			"onu-id":    onuID,
-			"device-id": f.deviceHandler.device.Id,
-			"onu-gem":   f.onuGemInfo[intfID]})
+			"gem-port-id": gemPortID,
+			"intf-id":     intfID,
+			"onu-id":      onuID,
+			"device-id":   f.deviceHandler.device.Id,
+			"onu-gem":     f.onuGemInfo[intfID]})
 	onugem := f.onuGemInfo[intfID]
+deleteLoop:
 	for i, onu := range onugem {
 		if onu.OnuID == onuID {
 			for j, gem := range onu.GemPorts {
@@ -2027,10 +1986,10 @@
 							"deletedgemport-id": gemPortID,
 							"gemports":          onu.GemPorts,
 							"device-id":         f.deviceHandler.device.Id})
-					break
+					break deleteLoop
 				}
 			}
-			break
+			break deleteLoop
 		}
 	}
 }
@@ -2937,8 +2896,8 @@
 //UpdateOnuInfo function adds onu info to cache and kvstore
 func (f *OpenOltFlowMgr) UpdateOnuInfo(ctx context.Context, intfID uint32, onuID uint32, serialNum string) error {
 
-	f.onuGemInfoLock[intfID].Lock()
-	defer f.onuGemInfoLock[intfID].Unlock()
+	f.onuGemInfoLock.Lock()
+	defer f.onuGemInfoLock.Unlock()
 
 	onu := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
 	f.onuGemInfo[intfID] = append(f.onuGemInfo[intfID], onu)
@@ -2958,16 +2917,16 @@
 //addGemPortToOnuInfoMap function adds GEMport to ONU map
 func (f *OpenOltFlowMgr) addGemPortToOnuInfoMap(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) {
 
-	f.onuGemInfoLock[intfID].Lock()
-	defer f.onuGemInfoLock[intfID].Unlock()
+	f.onuGemInfoLock.Lock()
+	defer f.onuGemInfoLock.Unlock()
 
 	logger.Infow(ctx, "adding-gem-to-onu-info-map",
 		log.Fields{
-			"gem":       gemPort,
-			"intf":      intfID,
-			"onu":       onuID,
-			"device-id": f.deviceHandler.device.Id,
-			"onu-gem":   f.onuGemInfo[intfID]})
+			"gem-port-id": gemPort,
+			"intf-id":     intfID,
+			"onu-id":      onuID,
+			"device-id":   f.deviceHandler.device.Id,
+			"onu-gem":     f.onuGemInfo[intfID]})
 	onugem := f.onuGemInfo[intfID]
 	// update the gem to the local cache as well as to kv strore
 	for idx, onu := range onugem {
@@ -2998,11 +2957,11 @@
 	}
 	logger.Infow(ctx, "gem-added-to-onu-info-map",
 		log.Fields{
-			"gem":       gemPort,
-			"intf":      intfID,
-			"onu":       onuID,
-			"device-id": f.deviceHandler.device.Id,
-			"onu-gem":   f.onuGemInfo[intfID]})
+			"gem-port-id": gemPort,
+			"intf-id":     intfID,
+			"onu-id":      onuID,
+			"device-id":   f.deviceHandler.device.Id,
+			"onu-gem":     f.onuGemInfo[intfID]})
 }
 
 // This function Lookup maps  by serialNumber or (intfId, gemPort)
@@ -3010,8 +2969,8 @@
 //getOnuIDfromGemPortMap Returns OnuID,nil if found or set 0,error if no onuId is found for serialNumber or (intfId, gemPort)
 func (f *OpenOltFlowMgr) getOnuIDfromGemPortMap(ctx context.Context, intfID uint32, gemPortID uint32) (uint32, error) {
 
-	f.onuGemInfoLock[intfID].Lock()
-	defer f.onuGemInfoLock[intfID].Unlock()
+	f.onuGemInfoLock.RLock()
+	defer f.onuGemInfoLock.RUnlock()
 
 	logger.Infow(ctx, "getting-onu-id-from-gem-port-and-pon-port",
 		log.Fields{
@@ -3030,6 +2989,11 @@
 			}
 		}
 	}
+	logger.Errorw(ctx, "onu-id-from-gem-port-not-found", log.Fields{
+		"gem-port-id":      gemPortID,
+		"interface-id":     intfID,
+		"all-gems-on-port": onu,
+	})
 	return uint32(0), olterrors.NewErrNotFound("onu-id", log.Fields{
 		"interface-id": intfID,
 		"gem-port-id":  gemPortID},
@@ -3073,18 +3037,18 @@
 	var gemPortID uint32
 	var err error
 
-	f.onuGemInfoLock[intfID].Lock()
-	defer f.onuGemInfoLock[intfID].Unlock()
-
+	f.onuGemInfoLock.RLock()
+	defer f.onuGemInfoLock.RUnlock()
 	pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: portNum}
-
-	gemPortID, ok := f.packetInGemPort[pktInkey]
+	var ok bool
+	gemPortID, ok = f.packetInGemPort[pktInkey]
 	if ok {
 		logger.Debugw(ctx, "found-gemport-for-pktin-key",
 			log.Fields{
 				"pktinkey": pktInkey,
 				"gem":      gemPortID})
-		return gemPortID, err
+
+		return gemPortID, nil
 	}
 	//If gem is not found in cache try to get it from kv store, if found in kv store, update the cache and return.
 	gemPortID, err = f.resourceMgr.GetGemPortFromOnuPktIn(ctx, intfID, onuID, portNum)
@@ -3102,6 +3066,7 @@
 		log.Fields{
 			"pktinkey": pktInkey,
 			"gem":      gemPortID}, err)
+
 }
 
 // nolint: gocyclo
@@ -3828,11 +3793,11 @@
 
 // UpdateGemPortForPktIn updates gemport for packet-in in to the cache and to the kv store as well.
 func (f *OpenOltFlowMgr) UpdateGemPortForPktIn(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32, gemPort uint32) {
+
+	f.onuGemInfoLock.Lock()
+	defer f.onuGemInfoLock.Unlock()
+
 	pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: logicalPort}
-
-	f.onuGemInfoLock[intfID].Lock()
-	defer f.onuGemInfoLock[intfID].Unlock()
-
 	lookupGemPort, ok := f.packetInGemPort[pktInkey]
 	if ok {
 		if lookupGemPort == gemPort {
@@ -3851,13 +3816,14 @@
 			"pktinkey": pktInkey,
 			"gem":      gemPort})
 	return
+
 }
 
 // AddUniPortToOnuInfo adds uni port to the onugem info both in cache and kvstore.
 func (f *OpenOltFlowMgr) AddUniPortToOnuInfo(ctx context.Context, intfID uint32, onuID uint32, portNum uint32) {
 
-	f.onuGemInfoLock[intfID].Lock()
-	defer f.onuGemInfoLock[intfID].Unlock()
+	f.onuGemInfoLock.Lock()
+	defer f.onuGemInfoLock.Unlock()
 
 	onugem := f.onuGemInfo[intfID]
 	for idx, onu := range onugem {
@@ -3873,6 +3839,7 @@
 		}
 	}
 	f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNum)
+
 }
 
 func (f *OpenOltFlowMgr) loadFlowIDlistForGem(ctx context.Context, intf uint32) {