Adding lock on flowsUsedByGemPort map
Adding lock on OnuDevice.uniPorts map

Change-Id: Ia6d2832479b4b5449415a42cf6cd90d3c3fc07d0
diff --git a/VERSION b/VERSION
index b40e924..8fd9baf 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.4.13
+2.4.14-dev
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 4e124e5..f049cab 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -113,7 +113,6 @@
 	onuID         uint32
 	intfID        uint32
 	proxyDeviceID string
-	uniPorts      map[uint32]struct{}
 	losRaised     bool
 	rdiRaised     bool
 }
@@ -138,7 +137,6 @@
 	device.onuID = onuID
 	device.intfID = intfID
 	device.proxyDeviceID = proxyDevID
-	device.uniPorts = make(map[uint32]struct{})
 	device.losRaised = losRaised
 	return &device
 }
@@ -1367,19 +1365,6 @@
 	return nil
 }
 
-// AddUniPortToOnu adds the uni port to the onu device
-func (dh *DeviceHandler) AddUniPortToOnu(intfID, onuID, uniPort uint32) {
-	onuKey := dh.formOnuKey(intfID, onuID)
-
-	if onuDevice, ok := dh.onus.Load(onuKey); ok {
-		// add it to the uniPort map for the onu device
-		if _, ok = onuDevice.(*OnuDevice).uniPorts[uniPort]; !ok {
-			onuDevice.(*OnuDevice).uniPorts[uniPort] = struct{}{}
-			logger.Debugw("adding-uni-port", log.Fields{"port": uniPort, "intf-id": intfID, "onuId": onuID})
-		}
-	}
-}
-
 // UpdatePmConfig updates the pm metrics.
 func (dh *DeviceHandler) UpdatePmConfig(pmConfigs *voltha.PmConfigs) {
 
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 02e382b..9839ea3 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -215,6 +215,7 @@
 	deviceHandler      *DeviceHandler
 	resourceMgr        *rsrcMgr.OpenOltResourceMgr
 	onuIdsLock         sync.RWMutex
+	perGemPortLock     *mapmutex.Mutex                    // lock to be used to access the flowsUsedByGemPort map
 	flowsUsedByGemPort map[gemPortKey][]uint32            //gem port id to flow ids
 	packetInGemPort    map[rsrcMgr.PacketInInfoKey]uint32 //packet in gem port local cache
 	// TODO create a type rsrcMgr.OnuGemInfos to be used instead of []rsrcMgr.OnuGemInfo
@@ -257,6 +258,7 @@
 	flowMgr.onuGemInfoLock = sync.RWMutex{}
 	flowMgr.pendingFlowDelete = sync.Map{}
 	flowMgr.perUserFlowHandleLock = mapmutex.NewCustomizedMapMutex(300, 100000000, 10000000, 1.1, 0.2)
+	flowMgr.perGemPortLock = mapmutex.NewCustomizedMapMutex(300, 100000000, 10000000, 1.1, 0.2)
 	flowMgr.interfaceToMcastQueueMap = make(map[uint32]*queueInfoBrief)
 	//load interface to multicast queue map from kv store
 	flowMgr.loadInterfaceToMulticastQueueMap(ctx)
@@ -280,19 +282,30 @@
 }
 
 func (f *OpenOltFlowMgr) registerFlow(ctx context.Context, flowFromCore *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) {
-	logger.Debugw("registering-flow-for-device ",
-		log.Fields{
-			"flow":      flowFromCore,
-			"device-id": f.deviceHandler.device.Id})
+
 	gemPK := gemPortKey{uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId)}
-	flowIDList, ok := f.flowsUsedByGemPort[gemPK]
-	if !ok {
-		flowIDList = []uint32{deviceFlow.FlowId}
+	if f.perGemPortLock.TryLock(gemPK) {
+		logger.Debugw("registering-flow-for-device",
+			log.Fields{
+				"flow":      flowFromCore,
+				"device-id": f.deviceHandler.device.Id})
+		flowIDList, ok := f.flowsUsedByGemPort[gemPK]
+		if !ok {
+			flowIDList = []uint32{deviceFlow.FlowId}
+		}
+		flowIDList = appendUnique(flowIDList, deviceFlow.FlowId)
+		f.flowsUsedByGemPort[gemPK] = flowIDList
+		// update the flowids for a gem to the KVstore
+		f.resourceMgr.UpdateFlowIDsForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowIDList)
+		f.perGemPortLock.Unlock(gemPK)
+	} else {
+		logger.Error("failed-to-acquire-per-gem-port-lock",
+			log.Fields{
+				"flow":      flowFromCore,
+				"device-id": f.deviceHandler.device.Id,
+				"key":       gemPK,
+			})
 	}
-	flowIDList = appendUnique(flowIDList, deviceFlow.FlowId)
-	f.flowsUsedByGemPort[gemPK] = flowIDList
-	// update the flowids for a gem to the KVstore
-	f.resourceMgr.UpdateFlowIDsForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowIDList)
 }
 
 func (f *OpenOltFlowMgr) divideAndAddFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
@@ -2077,24 +2090,42 @@
 			}
 
 			gemPK := gemPortKey{Intf, uint32(gemPortID)}
-			if f.isGemPortUsedByAnotherFlow(gemPK) {
-				flowIDs := f.flowsUsedByGemPort[gemPK]
-				for i, flowIDinMap := range flowIDs {
-					if flowIDinMap == flowID {
-						flowIDs = append(flowIDs[:i], flowIDs[i+1:]...)
-						// everytime flowsUsedByGemPort cache is updated the same should be updated
-						// in kv store by calling UpdateFlowIDsForGem
-						f.flowsUsedByGemPort[gemPK] = flowIDs
-						f.resourceMgr.UpdateFlowIDsForGem(ctx, Intf, uint32(gemPortID), flowIDs)
-						break
+			used, err := f.isGemPortUsedByAnotherFlow(gemPK)
+			if err != nil {
+				return err
+			}
+			if used {
+				if f.perGemPortLock.TryLock(gemPK) {
+					flowIDs := f.flowsUsedByGemPort[gemPK]
+					for i, flowIDinMap := range flowIDs {
+						if flowIDinMap == flowID {
+							flowIDs = append(flowIDs[:i], flowIDs[i+1:]...)
+							// everytime flowsUsedByGemPort cache is updated the same should be updated
+							// in kv store by calling UpdateFlowIDsForGem
+							f.flowsUsedByGemPort[gemPK] = flowIDs
+							f.resourceMgr.UpdateFlowIDsForGem(ctx, Intf, uint32(gemPortID), flowIDs)
+							break
+						}
 					}
+					logger.Debugw("gem-port-id-is-still-used-by-other-flows",
+						log.Fields{
+							"gemport-id":  gemPortID,
+							"usedByFlows": flowIDs,
+							"device-id":   f.deviceHandler.device.Id})
+					f.perGemPortLock.Unlock(gemPK)
+					return nil
 				}
-				logger.Debugw("gem-port-id-is-still-used-by-other-flows",
+				logger.Error("failed-to-acquire-per-gem-port-lock",
 					log.Fields{
-						"gemport-id":  gemPortID,
-						"usedByFlows": flowIDs,
-						"device-id":   f.deviceHandler.device.Id})
-				return nil
+						"gemport-id": gemPortID,
+						"device-id":  f.deviceHandler.device.Id,
+						"key":        gemPK,
+					})
+				return olterrors.NewErrAdapter("failed-to-acquire-per-gem-port-lock", log.Fields{
+					"gemport-id": gemPortID,
+					"device-id":  f.deviceHandler.device.Id,
+					"key":        gemPK,
+				}, nil)
 			}
 			logger.Debugf("gem-port-id %d is-not-used-by-another-flow--releasing-the-gem-port", gemPortID)
 			f.resourceMgr.RemoveGemPortIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
@@ -2105,7 +2136,16 @@
 			f.onuIdsLock.Lock()
 			//everytime an entry is deleted from flowsUsedByGemPort cache, the same should be updated in kv as well
 			// by calling DeleteFlowIDsForGem
-			delete(f.flowsUsedByGemPort, gemPK)
+			if f.perGemPortLock.TryLock(gemPK) {
+				delete(f.flowsUsedByGemPort, gemPK)
+				f.perGemPortLock.Unlock(gemPK)
+			} else {
+				logger.Error("failed-to-acquire-per-gem-port-lock",
+					log.Fields{
+						"device-id": f.deviceHandler.device.Id,
+						"key":       gemPK,
+					})
+			}
 			f.resourceMgr.DeleteFlowIDsForGem(ctx, Intf, uint32(gemPortID))
 			f.resourceMgr.FreeGemPortID(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
 			f.onuIdsLock.Unlock()
@@ -2481,7 +2521,6 @@
 		return f.addIgmpTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
 	}
 
-	f.deviceHandler.AddUniPortToOnu(intfID, onuID, portNo)
 	f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNo)
 
 	TpID, err := getTpIDFromFlow(flow)
@@ -3525,12 +3564,24 @@
 	go f.sendTPDownloadMsgToChild(intfID, onuID, uniID, uni, tpID)
 }
 
-func (f *OpenOltFlowMgr) isGemPortUsedByAnotherFlow(gemPK gemPortKey) bool {
-	flowIDList := f.flowsUsedByGemPort[gemPK]
-	if len(flowIDList) > 1 {
-		return true
+func (f *OpenOltFlowMgr) isGemPortUsedByAnotherFlow(gemPK gemPortKey) (bool, error) {
+	if f.perGemPortLock.TryLock(gemPK) {
+		flowIDList := f.flowsUsedByGemPort[gemPK]
+		f.perGemPortLock.Unlock(gemPK)
+		if len(flowIDList) > 1 {
+			return true, nil
+		}
+		return false, nil
 	}
-	return false
+	logger.Error("failed-to-acquire-per-gem-port-lock",
+		log.Fields{
+			"device-id": f.deviceHandler.device.Id,
+			"key":       gemPK,
+		})
+	return false, olterrors.NewErrAdapter("failed-to-acquire-per-gem-port-lock", log.Fields{
+		"device-id": f.deviceHandler.device.Id,
+		"key":       gemPK,
+	}, nil)
 }
 
 func (f *OpenOltFlowMgr) isTechProfileUsedByAnotherGem(ctx context.Context, ponIntf uint32, onuID uint32, uniID uint32, tpID uint32, tpInst *tp.TechProfile, gemPortID uint32) (bool, uint32) {
@@ -3889,7 +3940,17 @@
 	}
 	for gem, FlowIDs := range flowIDsList {
 		gemPK := gemPortKey{intf, uint32(gem)}
-		f.flowsUsedByGemPort[gemPK] = FlowIDs
+		if f.perGemPortLock.TryLock(gemPK) {
+			f.flowsUsedByGemPort[gemPK] = FlowIDs
+			f.perGemPortLock.Unlock(gemPK)
+		} else {
+			logger.Error("failed-to-acquire-per-gem-port-lock",
+				log.Fields{
+					"intf-id":   intf,
+					"device-id": f.deviceHandler.device.Id,
+					"key":       gemPK,
+				})
+		}
 	}
 	return
 }