VOL-3419: OpenOLT adapter at scale constantly takes more than 10 seconds to react to flows
The patch addresses the following:
- Create OpenOltFlowMgr per PON port (instead of one instance for the whole OLT device earlier)
- Create a separate OpenOltGroupMgr - currently one instance for the whole OLT device
- Remove redundant global lock around getting ONU-ID in DeviceHandler module, as there exists a
  separate per-pon-port lock in ResourceManager module which suffices for the required synchronization
- Remove redundant locks in OpenOltFlowMgr module to serialize FlowDelete before FlowAdd
- Rename divideAndAddFlow to processAddFlow. "divideAndAddFlow" was used in the VOLTHA 1.x days,
  where it had a different meaning; the name seems to have been blindly ported to the 2.x adapter
  and no longer makes sense

Change-Id: I99827963cf242f1db0c27943c97bd05b749ae129
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 0dfb781..f342bcf 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -57,21 +57,6 @@
 	InvalidPort    = 0xffffffff
 )
 
-// pendingFlowRemoveDataKey is key to pendingFlowRemoveDataPerSubscriber map
-type pendingFlowRemoveDataKey struct {
-	intfID uint32
-	onuID  uint32
-	uniID  uint32
-}
-
-// pendingFlowRemoveData is value stored in pendingFlowRemoveDataPerSubscriber map
-// This holds the number of pending flow removes and also a signal channel to
-// to indicate the receiver when all flow removes are handled
-type pendingFlowRemoveData struct {
-	pendingFlowRemoveCount uint32
-	allFlowsRemoved        chan struct{}
-}
-
 //DeviceHandler will interact with the OLT device.
 type DeviceHandler struct {
 	device        *voltha.Device
@@ -84,7 +69,8 @@
 	Client        oop.OpenoltClient
 	transitionMap *TransitionMap
 	clientCon     *grpc.ClientConn
-	flowMgr       *OpenOltFlowMgr
+	flowMgr       []*OpenOltFlowMgr
+	groupMgr      *OpenOltGroupMgr
 	eventMgr      *OpenOltEventMgr
 	resourceMgr   *rsrcMgr.OpenOltResourceMgr
 
@@ -98,12 +84,7 @@
 	stopIndications               chan bool
 	isReadIndicationRoutineActive bool
 
-	// pendingFlowRemoveDataPerSubscriber map is used to maintain the context on a per
-	// subscriber basis for the number of pending flow removes. This data is used
-	// to process all the flow removes for a subscriber before handling flow adds.
-	// Interleaving flow delete and flow add processing has known to cause PON resource
-	// management contentions on a per subscriber bases, so we need ensure ordering.
-	pendingFlowRemoveDataPerSubscriber map[pendingFlowRemoveDataKey]pendingFlowRemoveData
+	totalPonPorts uint32
 }
 
 //OnuDevice represents ONU related info
@@ -158,7 +139,6 @@
 	dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
 	dh.activePorts = sync.Map{}
 	dh.stopIndications = make(chan bool, 1)
-	dh.pendingFlowRemoveDataPerSubscriber = make(map[pendingFlowRemoveDataKey]pendingFlowRemoveData)
 
 	//TODO initialize the support classes.
 	return &dh
@@ -590,7 +570,7 @@
 		defer span.Finish()
 
 		portStats := indication.GetPortStats()
-		go dh.portStats.PortStatisticsIndication(ctx, portStats, dh.resourceMgr.DevInfo.GetPonPorts())
+		go dh.portStats.PortStatisticsIndication(ctx, portStats, dh.totalPonPorts)
 	case *oop.Indication_FlowStats:
 		span, ctx := log.CreateChildSpan(ctx, "flow-stats-indication", log.Fields{"device-id": dh.device.Id})
 		defer span.Finish()
@@ -777,16 +757,23 @@
 	if err != nil {
 		return olterrors.NewErrAdapter("populate-device-info-failed", log.Fields{"device-id": dh.device.Id}, err)
 	}
+	dh.totalPonPorts = deviceInfo.GetPonPorts()
+
 	// Instantiate resource manager
 	if dh.resourceMgr = rsrcMgr.NewResourceMgr(ctx, dh.device.Id, dh.openOLT.KVStoreAddress, dh.openOLT.KVStoreType, dh.device.Type, deviceInfo); dh.resourceMgr == nil {
 		return olterrors.ErrResourceManagerInstantiating
 	}
 
-	// Instantiate flow manager
-	if dh.flowMgr = NewFlowManager(ctx, dh, dh.resourceMgr); dh.flowMgr == nil {
-		return olterrors.ErrResourceManagerInstantiating
+	dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr)
 
+	dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts)
+	for i := range dh.flowMgr {
+		// Instantiate flow manager
+		if dh.flowMgr[i] = NewFlowManager(ctx, dh, dh.resourceMgr, dh.groupMgr); dh.flowMgr[i] == nil {
+			return olterrors.ErrResourceManagerInstantiating
+		}
 	}
+
 	/* TODO: Instantiate Alarm , stats , BW managers */
 	/* Instantiating Event Manager to handle Alarms and KPIs */
 	dh.eventMgr = NewEventMgr(dh.EventProxy, dh)
@@ -1060,7 +1047,7 @@
 
 func (dh *DeviceHandler) activateONU(ctx context.Context, intfID uint32, onuID int64, serialNum *oop.SerialNumber, serialNumber string) error {
 	logger.Debugw(ctx, "activate-onu", log.Fields{"intf-id": intfID, "onu-id": onuID, "serialNum": serialNum, "serialNumber": serialNumber, "device-id": dh.device.Id, "OmccEncryption": dh.openOLT.config.OmccEncryption})
-	if err := dh.flowMgr.UpdateOnuInfo(ctx, intfID, uint32(onuID), serialNumber); err != nil {
+	if err := dh.flowMgr[intfID].UpdateOnuInfo(ctx, intfID, uint32(onuID), serialNumber); err != nil {
 		return olterrors.NewErrAdapter("onu-activate-failed", log.Fields{"onu": onuID, "intf-id": intfID}, err)
 	}
 	// TODO: need resource manager
@@ -1153,9 +1140,7 @@
 		logger.Debugw(ctx, "creating-new-onu", log.Fields{"sn": sn})
 		// we need to create a new ChildDevice
 		ponintfid := onuDiscInd.GetIntfId()
-		dh.lockDevice.Lock()
 		onuID, err = dh.resourceMgr.GetONUID(ctx, ponintfid)
-		dh.lockDevice.Unlock()
 
 		logger.Infow(ctx, "creating-new-onu-got-onu-id", log.Fields{"sn": sn, "onuId": onuID})
 
@@ -1437,26 +1422,26 @@
 
 	if flows != nil {
 		for _, flow := range flows.ToRemove.Items {
-			dh.incrementActiveFlowRemoveCount(ctx, flow)
+			ponIf := dh.getPonIfFromFlow(ctx, flow)
 
 			logger.Debugw(ctx, "removing-flow",
 				log.Fields{"device-id": device.Id,
+					"ponIf":        ponIf,
 					"flowToRemove": flow})
-			err := dh.flowMgr.RemoveFlow(ctx, flow)
+			err := dh.flowMgr[ponIf].RemoveFlow(ctx, flow)
 			if err != nil {
 				errorsList = append(errorsList, err)
 			}
-
-			dh.decrementActiveFlowRemoveCount(ctx, flow)
 		}
 
 		for _, flow := range flows.ToAdd.Items {
+			ponIf := dh.getPonIfFromFlow(ctx, flow)
 			logger.Debugw(ctx, "adding-flow",
 				log.Fields{"device-id": device.Id,
+					"ponIf":     ponIf,
 					"flowToAdd": flow})
 			// If there are active Flow Remove in progress for a given subscriber, wait until it completes
-			dh.waitForFlowRemoveToFinish(ctx, flow)
-			err := dh.flowMgr.AddFlow(ctx, flow, flowMetadata)
+			err := dh.flowMgr[ponIf].AddFlow(ctx, flow, flowMetadata)
 			if err != nil {
 				errorsList = append(errorsList, err)
 			}
@@ -1466,19 +1451,19 @@
 	// Whether we need to synchronize multicast group adds and modifies like flow add and delete needs to be investigated
 	if groups != nil {
 		for _, group := range groups.ToAdd.Items {
-			err := dh.flowMgr.AddGroup(ctx, group)
+			err := dh.groupMgr.AddGroup(ctx, group)
 			if err != nil {
 				errorsList = append(errorsList, err)
 			}
 		}
 		for _, group := range groups.ToUpdate.Items {
-			err := dh.flowMgr.ModifyGroup(ctx, group)
+			err := dh.groupMgr.ModifyGroup(ctx, group)
 			if err != nil {
 				errorsList = append(errorsList, err)
 			}
 		}
 		for _, group := range groups.ToRemove.Items {
-			err := dh.flowMgr.DeleteGroup(ctx, group)
+			err := dh.groupMgr.DeleteGroup(ctx, group)
 			if err != nil {
 				errorsList = append(errorsList, err)
 			}
@@ -1596,7 +1581,7 @@
 		uniID = UniIDFromPortNum(uint32(port))
 		logger.Debugw(ctx, "clearing-resource-data-for-uni-port", log.Fields{"port": port, "uni-id": uniID})
 		/* Delete tech-profile instance from the KV store */
-		if err = dh.flowMgr.DeleteTechProfileInstances(ctx, onu.IntfID, onu.OnuID, uniID, onu.SerialNumber); err != nil {
+		if err = dh.flowMgr[onu.IntfID].DeleteTechProfileInstances(ctx, onu.IntfID, onu.OnuID, uniID, onu.SerialNumber); err != nil {
 			logger.Debugw(ctx, "failed-to-remove-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
 		}
 		logger.Debugw(ctx, "deleted-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
@@ -1687,9 +1672,8 @@
 func (dh *DeviceHandler) cleanupDeviceResources(ctx context.Context) {
 
 	if dh.resourceMgr != nil {
-		noOfPonPorts := dh.resourceMgr.DevInfo.GetPonPorts()
 		var ponPort uint32
-		for ponPort = 0; ponPort < noOfPonPorts; ponPort++ {
+		for ponPort = 0; ponPort < dh.totalPonPorts; ponPort++ {
 			var onuGemData []rsrcMgr.OnuGemInfo
 			err := dh.resourceMgr.ResourceMgrs[ponPort].GetOnuGemInfo(ctx, ponPort, &onuGemData)
 			if err != nil {
@@ -1762,7 +1746,7 @@
 			"packet":            hex.EncodeToString(packetIn.Pkt),
 		})
 	}
-	logicalPortNum, err := dh.flowMgr.GetLogicalPortFromPacketIn(ctx, packetIn)
+	logicalPortNum, err := dh.flowMgr[packetIn.IntfId].GetLogicalPortFromPacketIn(ctx, packetIn)
 	if err != nil {
 		return olterrors.NewErrNotFound("logical-port", log.Fields{"packet": hex.EncodeToString(packetIn.Pkt)}, err)
 	}
@@ -1834,7 +1818,7 @@
 		onuID := OnuIDFromPortNum(uint32(egressPortNo))
 		uniID := UniIDFromPortNum(uint32(egressPortNo))
 
-		gemPortID, err := dh.flowMgr.GetPacketOutGemPortID(ctx, intfID, onuID, uint32(egressPortNo), packet.Data)
+		gemPortID, err := dh.flowMgr[intfID].GetPacketOutGemPortID(ctx, intfID, onuID, uint32(egressPortNo), packet.Data)
 		if err != nil {
 			// In this case the openolt agent will receive the gemPortID as 0.
 			// The agent tries to retrieve the gemPortID in this case.
@@ -2098,21 +2082,12 @@
 	}
 
 	for uniID := 0; uniID < MaxUnisPerOnu; uniID++ {
-		var flowRemoveData pendingFlowRemoveData
-		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uint32(uniID)}
-		dh.lockDevice.RLock()
-		if flowRemoveData, ok = dh.pendingFlowRemoveDataPerSubscriber[key]; !ok {
-			dh.lockDevice.RUnlock()
-			continue
-		}
-		dh.lockDevice.RUnlock()
-
 		logger.Debugw(ctx, "wait-for-flow-remove-complete-before-processing-child-device-lost",
 			log.Fields{"int-id": intfID, "onu-id": onuID, "uni-id": uniID})
-		// Wait for all flow removes to finish first
-		<-flowRemoveData.allFlowsRemoved
+		dh.flowMgr[intfID].WaitForFlowRemoveToFinishForSubscriber(ctx, intfID, onuID, uint32(uniID))
 		logger.Debugw(ctx, "flow-removes-complete-for-subscriber",
 			log.Fields{"int-id": intfID, "onu-id": onuID, "uni-id": uniID})
+		// TODO: Would be good to delete the subscriber entry from flowMgr.pendingFlowRemoveDataPerSubscriber map
 	}
 
 	onu := &oop.Onu{IntfId: intfID, OnuId: onuID, SerialNumber: sn}
@@ -2189,88 +2164,6 @@
 	return InvalidPort
 }
 
-func (dh *DeviceHandler) incrementActiveFlowRemoveCount(ctx context.Context, flow *of.OfpFlowStats) {
-	inPort, outPort := getPorts(flow)
-	logger.Debugw(ctx, "increment-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
-	if inPort != InvalidPort && outPort != InvalidPort {
-		_, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
-		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
-		logger.Debugw(ctx, "increment-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
-		dh.lockDevice.Lock()
-		defer dh.lockDevice.Unlock()
-		flowRemoveData, ok := dh.pendingFlowRemoveDataPerSubscriber[key]
-		if !ok {
-			flowRemoveData = pendingFlowRemoveData{
-				pendingFlowRemoveCount: 0,
-				allFlowsRemoved:        make(chan struct{}),
-			}
-		}
-		flowRemoveData.pendingFlowRemoveCount++
-		dh.pendingFlowRemoveDataPerSubscriber[key] = flowRemoveData
-
-		logger.Debugw(ctx, "current-flow-remove-count–increment",
-			log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
-				"currCnt": dh.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
-	}
-}
-
-func (dh *DeviceHandler) decrementActiveFlowRemoveCount(ctx context.Context, flow *of.OfpFlowStats) {
-	inPort, outPort := getPorts(flow)
-	logger.Debugw(ctx, "decrement-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
-	if inPort != InvalidPort && outPort != InvalidPort {
-		_, intfID, onuID, uniID := ExtractAccessFromFlow(uint32(inPort), uint32(outPort))
-		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
-		logger.Debugw(ctx, "decrement-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
-		dh.lockDevice.Lock()
-		defer dh.lockDevice.Unlock()
-		if val, ok := dh.pendingFlowRemoveDataPerSubscriber[key]; !ok {
-			logger.Fatalf(ctx, "flow-remove-key-not-found", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-		} else {
-			if val.pendingFlowRemoveCount > 0 {
-				val.pendingFlowRemoveCount--
-			}
-			logger.Debugw(ctx, "current-flow-remove-count-after-decrement",
-				log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
-					"currCnt": dh.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
-			// If all flow removes have finished, then close the channel to signal the receiver
-			// to go ahead with flow adds.
-			if val.pendingFlowRemoveCount == 0 {
-				close(val.allFlowsRemoved)
-				delete(dh.pendingFlowRemoveDataPerSubscriber, key)
-				return
-			}
-			dh.pendingFlowRemoveDataPerSubscriber[key] = val
-		}
-	}
-}
-
-func (dh *DeviceHandler) waitForFlowRemoveToFinish(ctx context.Context, flow *of.OfpFlowStats) {
-	var flowRemoveData pendingFlowRemoveData
-	var ok bool
-	inPort, outPort := getPorts(flow)
-	logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
-	if inPort != InvalidPort && outPort != InvalidPort {
-		_, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
-		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
-		logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
-		dh.lockDevice.RLock()
-		if flowRemoveData, ok = dh.pendingFlowRemoveDataPerSubscriber[key]; !ok {
-			logger.Debugw(ctx, "no-pending-flow-to-remove", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-			dh.lockDevice.RUnlock()
-			return
-		}
-		dh.lockDevice.RUnlock()
-
-		// Wait for all flow removes to finish first
-		<-flowRemoveData.allFlowsRemoved
-
-		logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-	}
-}
-
 func getPorts(flow *of.OfpFlowStats) (uint32, uint32) {
 	inPort := getInPortFromFlow(flow)
 	outPort := getOutPortFromFlow(flow)
@@ -2350,3 +2243,14 @@
 	logger.Infow(ctx, "get-ext-value", log.Fields{"resp": resp, "device-id": dh.device, "onu-id": device.Id, "pon-intf": device.ParentPortNo})
 	return resp, nil
 }
+
+func (dh *DeviceHandler) getPonIfFromFlow(ctx context.Context, flow *of.OfpFlowStats) uint32 {
+	// Default to PON0
+	var intfID uint32
+	inPort, outPort := getPorts(flow)
+	logger.Debugw(ctx, "increment-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
+	if inPort != InvalidPort && outPort != InvalidPort {
+		_, intfID, _, _ = ExtractAccessFromFlow(inPort, outPort)
+	}
+	return intfID
+}