[VOL-3193] Moving OpenoltFlowManager lock on a per pon base

Change-Id: I8f28cc750d658bbcc51a97d80b5c82dc6b42a85e
diff --git a/VERSION b/VERSION
index f041bc6..1a45714 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.4.8
+2.4.9-dev
diff --git a/internal/pkg/core/device_handler_test.go b/internal/pkg/core/device_handler_test.go
index bd97db8..53641fa 100644
--- a/internal/pkg/core/device_handler_test.go
+++ b/internal/pkg/core/device_handler_test.go
@@ -924,9 +924,9 @@
 		//{"test1", args{egressPortNo: 0, packet: &ofp.OfpPacketOut{}}, true},
 		{"PacketOut-1", dh1, args{egressPortNo: 0, packet: pktout}, false},
 		{"PacketOut-2", dh2, args{egressPortNo: 1, packet: pktout}, false},
-		{"PacketOut-2", dh2, args{egressPortNo: 115000, packet: pktout}, false},
-		{"PacketOut-3", dh1, args{egressPortNo: 65536, packet: pktout}, false},
-		{"PacketOut-4", dh2, args{egressPortNo: 65535, packet: pktout}, false},
+		{"PacketOut-3", dh2, args{egressPortNo: 4112, packet: pktout}, false},
+		{"PacketOut-4", dh1, args{egressPortNo: 1048577, packet: pktout}, false},
+		{"PacketOut-5", dh2, args{egressPortNo: 1048576, packet: pktout}, false},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 7fce2f3..b489c0a 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -217,9 +217,10 @@
 	onuIdsLock         sync.RWMutex
 	flowsUsedByGemPort map[gemPortKey][]uint32            //gem port id to flow ids
 	packetInGemPort    map[rsrcMgr.PacketInInfoKey]uint32 //packet in gem port local cache
-	onuGemInfo         map[uint32][]rsrcMgr.OnuGemInfo    //onu, gem and uni info local cache
-	lockCache          sync.RWMutex
-	pendingFlowDelete  sync.Map
+	// TODO create a type rsrcMgr.OnuGemInfos to be used instead of []rsrcMgr.OnuGemInfo
+	onuGemInfo        [][]rsrcMgr.OnuGemInfo //onu, gem and uni info local cache, indexed by IntfId
+	onuGemInfoLock    []sync.RWMutex         // lock by Pon Port
+	pendingFlowDelete sync.Map
 	// The mapmutex.Mutex can be fine tuned to use mapmutex.NewCustomizedMapMutex
 	perUserFlowHandleLock    *mapmutex.Mutex
 	interfaceToMcastQueueMap map[uint32]*queueInfoBrief /*pon interface -> multicast queue map. Required to assign GEM to a bucket during group population*/
@@ -242,8 +243,8 @@
 	flowMgr.onuIdsLock = sync.RWMutex{}
 	flowMgr.flowsUsedByGemPort = make(map[gemPortKey][]uint32)
 	flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
-	flowMgr.onuGemInfo = make(map[uint32][]rsrcMgr.OnuGemInfo)
 	ponPorts := rMgr.DevInfo.GetPonPorts()
+	flowMgr.onuGemInfo = make([][]rsrcMgr.OnuGemInfo, ponPorts)
 	//Load the onugem info cache from kv store on flowmanager start
 	for idx = 0; idx < ponPorts; idx++ {
 		if flowMgr.onuGemInfo[idx], err = rMgr.GetOnuGemInfo(ctx, idx); err != nil {
@@ -252,7 +253,7 @@
 		//Load flowID list per gem map per interface from the kvstore.
 		flowMgr.loadFlowIDlistForGem(ctx, idx)
 	}
-	flowMgr.lockCache = sync.RWMutex{}
+	flowMgr.onuGemInfoLock = make([]sync.RWMutex, ponPorts)
 	flowMgr.pendingFlowDelete = sync.Map{}
 	flowMgr.perUserFlowHandleLock = mapmutex.NewCustomizedMapMutex(300, 100000000, 10000000, 1.1, 0.2)
 	flowMgr.interfaceToMcastQueueMap = make(map[uint32]*queueInfoBrief)
@@ -1967,8 +1968,10 @@
 // Otherwise stale info continues to exist after gemport is freed and wrong logicalPortNo
 // is conveyed to ONOS during packet-in OF message.
 func (f *OpenOltFlowMgr) deleteGemPortFromLocalCache(intfID uint32, onuID uint32, gemPortID uint32) {
-	f.lockCache.Lock()
-	defer f.lockCache.Unlock()
+
+	f.onuGemInfoLock[intfID].Lock()
+	defer f.onuGemInfoLock[intfID].Unlock()
+
 	logger.Infow("deleting-gem-from-local-cache",
 		log.Fields{
 			"gem":       gemPortID,
@@ -2871,8 +2874,9 @@
 //UpdateOnuInfo function adds onu info to cache and kvstore
 func (f *OpenOltFlowMgr) UpdateOnuInfo(ctx context.Context, intfID uint32, onuID uint32, serialNum string) error {
 
-	f.lockCache.Lock()
-	defer f.lockCache.Unlock()
+	f.onuGemInfoLock[intfID].Lock()
+	defer f.onuGemInfoLock[intfID].Unlock()
+
 	onu := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
 	f.onuGemInfo[intfID] = append(f.onuGemInfo[intfID], onu)
 	if err := f.resourceMgr.AddOnuGemInfo(ctx, intfID, onu); err != nil {
@@ -2890,8 +2894,10 @@
 
 //addGemPortToOnuInfoMap function adds GEMport to ONU map
 func (f *OpenOltFlowMgr) addGemPortToOnuInfoMap(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) {
-	f.lockCache.Lock()
-	defer f.lockCache.Unlock()
+
+	f.onuGemInfoLock[intfID].Lock()
+	defer f.onuGemInfoLock[intfID].Unlock()
+
 	logger.Infow("adding-gem-to-onu-info-map",
 		log.Fields{
 			"gem":       gemPort,
@@ -2939,18 +2945,17 @@
 // This function Lookup maps  by serialNumber or (intfId, gemPort)
 
 //getOnuIDfromGemPortMap Returns OnuID,nil if found or set 0,error if no onuId is found for serialNumber or (intfId, gemPort)
-func (f *OpenOltFlowMgr) getOnuIDfromGemPortMap(serialNumber string, intfID uint32, gemPortID uint32) (uint32, error) {
+func (f *OpenOltFlowMgr) getOnuIDfromGemPortMap(intfID uint32, gemPortID uint32) (uint32, error) {
 
-	f.lockCache.Lock()
-	defer f.lockCache.Unlock()
+	f.onuGemInfoLock[intfID].Lock()
+	defer f.onuGemInfoLock[intfID].Unlock()
 
 	logger.Infow("getting-onu-id-from-gem-port-and-pon-port",
 		log.Fields{
-			"device-id":     f.deviceHandler.device.Id,
-			"onu-geminfo":   f.onuGemInfo[intfID],
-			"serial-number": serialNumber,
-			"intf-id":       intfID,
-			"gemport-id":    gemPortID})
+			"device-id":   f.deviceHandler.device.Id,
+			"onu-geminfo": f.onuGemInfo[intfID],
+			"intf-id":     intfID,
+			"gemport-id":  gemPortID})
 
 	// get onuid from the onugem info cache
 	onugem := f.onuGemInfo[intfID]
@@ -2963,9 +2968,8 @@
 		}
 	}
 	return uint32(0), olterrors.NewErrNotFound("onu-id", log.Fields{
-		"serial-number": serialNumber,
-		"interface-id":  intfID,
-		"gem-port-id":   gemPortID},
+		"interface-id": intfID,
+		"gem-port-id":  gemPortID},
 		nil)
 }
 
@@ -2977,7 +2981,7 @@
 
 	if packetIn.IntfType == "pon" {
 		// packet indication does not have serial number , so sending as nil
-		if onuID, err = f.getOnuIDfromGemPortMap("", packetIn.IntfId, packetIn.GemportId); err != nil {
+		if onuID, err = f.getOnuIDfromGemPortMap(packetIn.IntfId, packetIn.GemportId); err != nil {
 			// Called method is returning error with all data populated; just return the same
 			return logicalPortNum, err
 		}
@@ -3006,8 +3010,9 @@
 	var gemPortID uint32
 	var err error
 
-	f.lockCache.Lock()
-	defer f.lockCache.Unlock()
+	f.onuGemInfoLock[intfID].Lock()
+	defer f.onuGemInfoLock[intfID].Unlock()
+
 	pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: portNum}
 
 	gemPortID, ok := f.packetInGemPort[pktInkey]
@@ -3685,8 +3690,9 @@
 func (f *OpenOltFlowMgr) UpdateGemPortForPktIn(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32, gemPort uint32) {
 	pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: logicalPort}
 
-	f.lockCache.Lock()
-	defer f.lockCache.Unlock()
+	f.onuGemInfoLock[intfID].Lock()
+	defer f.onuGemInfoLock[intfID].Unlock()
+
 	lookupGemPort, ok := f.packetInGemPort[pktInkey]
 	if ok {
 		if lookupGemPort == gemPort {
@@ -3709,8 +3715,10 @@
 
 // AddUniPortToOnuInfo adds uni port to the onugem info both in cache and kvstore.
 func (f *OpenOltFlowMgr) AddUniPortToOnuInfo(ctx context.Context, intfID uint32, onuID uint32, portNum uint32) {
-	f.lockCache.Lock()
-	defer f.lockCache.Unlock()
+
+	f.onuGemInfoLock[intfID].Lock()
+	defer f.onuGemInfoLock[intfID].Unlock()
+
 	onugem := f.onuGemInfo[intfID]
 	for idx, onu := range onugem {
 		if onu.OnuID == onuID {