Adding lock on flowsUsedByGemPort map
Adding lock on OnuDevice.uniPorts map

Change-Id: Ia6d2832479b4b5449415a42cf6cd90d3c3fc07d0
(cherry picked from commit ac032b184d2ad9e512a1d5322646ef0df0845acc)
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 504526f..7c7dc06 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -115,7 +115,6 @@
 	onuID         uint32
 	intfID        uint32
 	proxyDeviceID string
-	uniPorts      map[uint32]struct{}
 	losRaised     bool
 	rdiRaised     bool
 }
@@ -140,7 +139,6 @@
 	device.onuID = onuID
 	device.intfID = intfID
 	device.proxyDeviceID = proxyDevID
-	device.uniPorts = make(map[uint32]struct{})
 	device.losRaised = losRaised
 	return &device
 }
@@ -1409,19 +1407,6 @@
 	return nil
 }
 
-// AddUniPortToOnu adds the uni port to the onu device
-func (dh *DeviceHandler) AddUniPortToOnu(ctx context.Context, intfID, onuID, uniPort uint32) {
-	onuKey := dh.formOnuKey(intfID, onuID)
-
-	if onuDevice, ok := dh.onus.Load(onuKey); ok {
-		// add it to the uniPort map for the onu device
-		if _, ok = onuDevice.(*OnuDevice).uniPorts[uniPort]; !ok {
-			onuDevice.(*OnuDevice).uniPorts[uniPort] = struct{}{}
-			logger.Debugw(ctx, "adding-uni-port", log.Fields{"port": uniPort, "intf-id": intfID, "onuId": onuID})
-		}
-	}
-}
-
 // UpdatePmConfig updates the pm metrics.
 func (dh *DeviceHandler) UpdatePmConfig(ctx context.Context, pmConfigs *voltha.PmConfigs) {
 	logger.Infow(ctx, "update-pm-configs", log.Fields{"device-id": dh.device.Id, "pm-configs": pmConfigs})
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 3d928c4..709735a 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -173,6 +173,13 @@
 	BinaryStringPrefix = "0b"
 	// BinaryBit1 is binary bit 1 expressed as a character
 	BinaryBit1 = '1'
+
+	// MapMutex
+	maxRetry  = 300
+	maxDelay  = 100000000
+	baseDelay = 10000000
+	factor    = 1.1
+	jitter    = 0.2
 )
 
 type gemPortKey struct {
@@ -215,6 +222,7 @@
 	deviceHandler      *DeviceHandler
 	resourceMgr        *rsrcMgr.OpenOltResourceMgr
 	onuIdsLock         sync.RWMutex
+	perGemPortLock     *mapmutex.Mutex                    // lock to be used to access the flowsUsedByGemPort map
 	flowsUsedByGemPort map[gemPortKey][]uint32            //gem port id to flow ids
 	packetInGemPort    map[rsrcMgr.PacketInInfoKey]uint32 //packet in gem port local cache
 	// TODO create a type rsrcMgr.OnuGemInfos to be used instead of []rsrcMgr.OnuGemInfo
@@ -256,7 +264,8 @@
 	}
 	flowMgr.onuGemInfoLock = sync.RWMutex{}
 	flowMgr.pendingFlowDelete = sync.Map{}
-	flowMgr.perUserFlowHandleLock = mapmutex.NewCustomizedMapMutex(300, 100000000, 10000000, 1.1, 0.2)
+	flowMgr.perUserFlowHandleLock = mapmutex.NewCustomizedMapMutex(maxRetry, maxDelay, baseDelay, factor, jitter)
+	flowMgr.perGemPortLock = mapmutex.NewCustomizedMapMutex(maxRetry, maxDelay, baseDelay, factor, jitter)
 	flowMgr.interfaceToMcastQueueMap = make(map[uint32]*queueInfoBrief)
 	//load interface to multicast queue map from kv store
 	flowMgr.loadInterfaceToMulticastQueueMap(ctx)
@@ -265,19 +274,35 @@
 }
 
 func (f *OpenOltFlowMgr) registerFlow(ctx context.Context, flowFromCore *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) error {
-	logger.Debugw(ctx, "registering-flow-for-device ",
-		log.Fields{
-			"flow":      flowFromCore,
-			"device-id": f.deviceHandler.device.Id})
 	gemPK := gemPortKey{uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId)}
-	flowIDList, ok := f.flowsUsedByGemPort[gemPK]
-	if !ok {
-		flowIDList = []uint32{deviceFlow.FlowId}
+	if f.perGemPortLock.TryLock(gemPK) {
+		logger.Debugw(ctx, "registering-flow-for-device ",
+			log.Fields{
+				"flow":      flowFromCore,
+				"device-id": f.deviceHandler.device.Id})
+		flowIDList, ok := f.flowsUsedByGemPort[gemPK]
+		if !ok {
+			flowIDList = []uint32{deviceFlow.FlowId}
+		}
+		flowIDList = appendUnique(flowIDList, deviceFlow.FlowId)
+		f.flowsUsedByGemPort[gemPK] = flowIDList
+
+		f.perGemPortLock.Unlock(gemPK)
+
+		// update the flowids for a gem to the KVstore
+		return f.resourceMgr.UpdateFlowIDsForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowIDList)
 	}
-	flowIDList = appendUnique(flowIDList, deviceFlow.FlowId)
-	f.flowsUsedByGemPort[gemPK] = flowIDList
-	// update the flowids for a gem to the KVstore
-	return f.resourceMgr.UpdateFlowIDsForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowIDList)
+	logger.Error(ctx, "failed-to-acquire-per-gem-port-lock",
+		log.Fields{
+			"flow-from-core": flowFromCore,
+			"device-id":      f.deviceHandler.device.Id,
+			"key":            gemPK,
+		})
+	return olterrors.NewErrAdapter("failed-to-acquire-per-gem-port-lock", log.Fields{
+		"flow-from-core": flowFromCore,
+		"device-id":      f.deviceHandler.device.Id,
+		"key":            gemPK,
+	}, nil)
 }
 
 func (f *OpenOltFlowMgr) divideAndAddFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
@@ -2049,26 +2074,45 @@
 			}
 
 			gemPK := gemPortKey{Intf, uint32(gemPortID)}
-			if f.isGemPortUsedByAnotherFlow(gemPK) {
-				flowIDs := f.flowsUsedByGemPort[gemPK]
-				for i, flowIDinMap := range flowIDs {
-					if flowIDinMap == flowID {
-						flowIDs = append(flowIDs[:i], flowIDs[i+1:]...)
-						// everytime flowsUsedByGemPort cache is updated the same should be updated
-						// in kv store by calling UpdateFlowIDsForGem
-						f.flowsUsedByGemPort[gemPK] = flowIDs
-						if err := f.resourceMgr.UpdateFlowIDsForGem(ctx, Intf, uint32(gemPortID), flowIDs); err != nil {
-							return err
+			used, err := f.isGemPortUsedByAnotherFlow(ctx, gemPK)
+			if err != nil {
+				return err
+			}
+			if used {
+				if f.perGemPortLock.TryLock(gemPK) {
+					flowIDs := f.flowsUsedByGemPort[gemPK]
+					for i, flowIDinMap := range flowIDs {
+						if flowIDinMap == flowID {
+							flowIDs = append(flowIDs[:i], flowIDs[i+1:]...)
+							// everytime flowsUsedByGemPort cache is updated the same should be updated
+							// in kv store by calling UpdateFlowIDsForGem
+							f.flowsUsedByGemPort[gemPK] = flowIDs
+							if err := f.resourceMgr.UpdateFlowIDsForGem(ctx, Intf, uint32(gemPortID), flowIDs); err != nil {
+								return err
+							}
+							break
 						}
-						break
 					}
+					logger.Debugw(ctx, "gem-port-id-is-still-used-by-other-flows",
+						log.Fields{
+							"gemport-id":  gemPortID,
+							"usedByFlows": flowIDs,
+							"device-id":   f.deviceHandler.device.Id})
+					f.perGemPortLock.Unlock(gemPK)
+					return nil
 				}
-				logger.Debugw(ctx, "gem-port-id-is-still-used-by-other-flows",
+
+				logger.Error(ctx, "failed-to-acquire-per-gem-port-lock",
 					log.Fields{
-						"gemport-id":  gemPortID,
-						"usedByFlows": flowIDs,
-						"device-id":   f.deviceHandler.device.Id})
-				return nil
+						"gemport-id": gemPortID,
+						"device-id":  f.deviceHandler.device.Id,
+						"key":        gemPK,
+					})
+				return olterrors.NewErrAdapter("failed-to-acquire-per-gem-port-lock", log.Fields{
+					"gemport-id": gemPortID,
+					"device-id":  f.deviceHandler.device.Id,
+					"key":        gemPK,
+				}, nil)
 			}
 			logger.Debugf(ctx, "gem-port-id %d is-not-used-by-another-flow--releasing-the-gem-port", gemPortID)
 			f.resourceMgr.RemoveGemPortIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
@@ -2079,7 +2123,16 @@
 			f.onuIdsLock.Lock()
 			//everytime an entry is deleted from flowsUsedByGemPort cache, the same should be updated in kv as well
 			// by calling DeleteFlowIDsForGem
-			delete(f.flowsUsedByGemPort, gemPK)
+			if f.perGemPortLock.TryLock(gemPK) {
+				delete(f.flowsUsedByGemPort, gemPK)
+				f.perGemPortLock.Unlock(gemPK)
+			} else {
+				logger.Error(ctx, "failed-to-acquire-per-gem-port-lock",
+					log.Fields{
+						"device-id": f.deviceHandler.device.Id,
+						"key":       gemPK,
+					})
+			}
 			f.resourceMgr.DeleteFlowIDsForGem(ctx, Intf, uint32(gemPortID))
 			f.resourceMgr.FreeGemPortID(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
 			f.onuIdsLock.Unlock()
@@ -2459,7 +2512,6 @@
 		return f.addIgmpTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
 	}
 
-	f.deviceHandler.AddUniPortToOnu(ctx, intfID, onuID, portNo)
 	f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNo)
 
 	TpID, err := getTpIDFromFlow(ctx, flow)
@@ -3566,9 +3618,21 @@
 	}()
 }
 
-func (f *OpenOltFlowMgr) isGemPortUsedByAnotherFlow(gemPK gemPortKey) bool {
-	flowIDList := f.flowsUsedByGemPort[gemPK]
-	return len(flowIDList) > 1
+func (f *OpenOltFlowMgr) isGemPortUsedByAnotherFlow(ctx context.Context, gemPK gemPortKey) (bool, error) {
+	if f.perGemPortLock.TryLock(gemPK) {
+		flowIDList := f.flowsUsedByGemPort[gemPK]
+		f.perGemPortLock.Unlock(gemPK)
+		return len(flowIDList) > 1, nil
+	}
+	logger.Error(ctx, "failed-to-acquire-per-gem-port-lock",
+		log.Fields{
+			"device-id": f.deviceHandler.device.Id,
+			"key":       gemPK,
+		})
+	return false, olterrors.NewErrAdapter("failed-to-acquire-per-gem-port-lock", log.Fields{
+		"device-id": f.deviceHandler.device.Id,
+		"key":       gemPK,
+	}, nil)
 }
 
 func (f *OpenOltFlowMgr) isTechProfileUsedByAnotherGem(ctx context.Context, ponIntf uint32, onuID uint32, uniID uint32, tpID uint32, tpInst *tp.TechProfile, gemPortID uint32) (bool, uint32) {
@@ -3929,7 +3993,17 @@
 	}
 	for gem, FlowIDs := range flowIDsList {
 		gemPK := gemPortKey{intf, uint32(gem)}
-		f.flowsUsedByGemPort[gemPK] = FlowIDs
+		if f.perGemPortLock.TryLock(gemPK) {
+			f.flowsUsedByGemPort[gemPK] = FlowIDs
+			f.perGemPortLock.Unlock(gemPK)
+		} else {
+			logger.Error(ctx, "failed-to-acquire-per-gem-port-lock",
+				log.Fields{
+					"intf-id":   intf,
+					"device-id": f.deviceHandler.device.Id,
+					"key":       gemPK,
+				})
+		}
 	}
 }