VOL-2339: DHCP issue on second ONU, when two ONUs exist on same PON

- Process to_remove before to_add items in UpdateFlowsIncrementally
  method
- Sequentially add flows for a given pon, onu, uni in order to avoid
  PON resource collisions. Use a per-key lock so that different
  subscribers do not have to wait for each other to add flows.
- Wait for any pending flow removals for the subscriber to complete
  before adding flows.

Change-Id: I095291b9a53fd0f19dc79f2b44923ec786a26d6e
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index 7e09fc8..9c65cee 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -1081,14 +1081,15 @@
 func (dh *DeviceHandler) UpdateFlowsIncrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("Received-incremental-flowupdate-in-device-handler", log.Fields{"deviceID": device.Id, "flows": flows, "groups": groups, "flowMetadata": flowMetadata})
 	if flows != nil {
-		for _, flow := range flows.ToAdd.Items {
-			log.Debug("Adding flow", log.Fields{"deviceId": device.Id, "flowToAdd": flow})
-			dh.flowMgr.AddFlow(flow, flowMetadata)
-		}
 		for _, flow := range flows.ToRemove.Items {
 			log.Debug("Removing flow", log.Fields{"deviceId": device.Id, "flowToRemove": flow})
 			dh.flowMgr.RemoveFlow(flow)
 		}
+
+		for _, flow := range flows.ToAdd.Items {
+			log.Debug("Adding flow", log.Fields{"deviceId": device.Id, "flowToAdd": flow})
+			dh.flowMgr.AddFlow(flow, flowMetadata)
+		}
 	}
 	if groups != nil && flows != nil {
 		for _, flow := range flows.ToRemove.Items {
diff --git a/adaptercore/olt_platform.go b/adaptercore/olt_platform.go
index 0186712..8c7cf8d 100644
--- a/adaptercore/olt_platform.go
+++ b/adaptercore/olt_platform.go
@@ -259,5 +259,9 @@
 	onuID = OnuIDFromUniPortNum(uniPortNo)
 	uniID = UniIDFromPortNum(uniPortNo)
 
+	log.Debugw("flow extract info result",
+		log.Fields{"uniPortNo": uniPortNo, "ponIntf": ponIntf,
+			"onuID": onuID, "uniID": uniID, "inPort": inPort, "ethType": ethType})
+
 	return uniPortNo, ponIntf, onuID, uniID, inPort, ethType, nil
 }
diff --git a/adaptercore/openolt_flowmgr.go b/adaptercore/openolt_flowmgr.go
index 565c8c8..71a0850 100644
--- a/adaptercore/openolt_flowmgr.go
+++ b/adaptercore/openolt_flowmgr.go
@@ -26,6 +26,7 @@
 	"fmt"
 	"math/big"
 	"sync"
+	"time"
 
 	"github.com/opencord/voltha-lib-go/v2/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v2/pkg/log"
@@ -39,6 +40,7 @@
 	"github.com/opencord/voltha-protos/v2/go/voltha"
 
 	//deepcopy "github.com/getlantern/deepcopy"
+	"github.com/EagleChen/mapmutex"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -152,6 +154,18 @@
 	gemPort uint32
 }
 
+type pendingFlowDeleteKey struct {
+	intfID uint32
+	onuID  uint32
+	uniID  uint32
+}
+
+type tpLockKey struct {
+	intfID uint32
+	onuID  uint32
+	uniID  uint32
+}
+
 type schedQueue struct {
 	direction    tp_pb.Direction
 	intfID       uint32
@@ -174,6 +188,9 @@
 	packetInGemPort    map[rsrcMgr.PacketInInfoKey]uint32 //packet in gem port local cache
 	onuGemInfo         map[uint32][]rsrcMgr.OnuGemInfo    //onu, gem and uni info local cache
 	lockCache          sync.RWMutex
+	pendingFlowDelete  sync.Map
+	// The mapmutex.Mutex can be fine tuned to use mapmutex.NewCustomizedMapMutex
+	perUserFlowHandleLock *mapmutex.Mutex
 }
 
 //NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
@@ -204,6 +221,8 @@
 		flowMgr.loadFlowIDlistForGem(idx)
 	}
 	flowMgr.lockCache = sync.RWMutex{}
+	flowMgr.pendingFlowDelete = sync.Map{}
+	flowMgr.perUserFlowHandleLock = mapmutex.NewMapMutex()
 	log.Info("Initialization of  flow manager success!!")
 	return &flowMgr
 }
@@ -254,24 +273,32 @@
 
 	uni := getUniPortPath(intfID, int32(onuID), int32(uniID))
 	log.Debugw("Uni port name", log.Fields{"uni": uni})
-	allocID, gemPorts, TpInst = f.createTcontGemports(intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
-	if allocID == 0 || gemPorts == nil || TpInst == nil {
-		log.Error("alloc-id-gem-ports-tp-unavailable")
+
+	tpLockMapKey := tpLockKey{intfID, onuID, uniID}
+	if f.perUserFlowHandleLock.TryLock(tpLockMapKey) {
+		allocID, gemPorts, TpInst = f.createTcontGemports(intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
+		if allocID == 0 || gemPorts == nil || TpInst == nil {
+			log.Error("alloc-id-gem-ports-tp-unavailable")
+			f.perUserFlowHandleLock.Unlock(tpLockMapKey)
+			return
+		}
+		args := make(map[string]uint32)
+		args[IntfID] = intfID
+		args[OnuID] = onuID
+		args[UniID] = uniID
+		args[PortNo] = portNo
+		args[AllocID] = allocID
+
+		/* Flows can be added specific to gemport if p-bits are received.
+		 * If no pbit mentioned then adding flows for all gemports
+		 */
+		f.checkAndAddFlow(args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
+		f.perUserFlowHandleLock.Unlock(tpLockMapKey)
+	} else {
+		log.Errorw("failed to acquire per user flow handle lock",
+			log.Fields{"intfId": intfID, "onuId": onuID, "uniId": uniID})
 		return
 	}
-
-	/* Flows can be added specific to gemport if p-bits are received.
-	 * If no pbit mentioned then adding flows for all gemports
-	 */
-
-	args := make(map[string]uint32)
-	args[IntfID] = intfID
-	args[OnuID] = onuID
-	args[UniID] = uniID
-	args[PortNo] = portNo
-	args[AllocID] = allocID
-
-	f.checkAndAddFlow(args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
 }
 
 // CreateSchedulerQueues creates traffic schedulers on the device with the given scheduler configuration and traffic shaping info
@@ -445,6 +472,7 @@
 	var allocIDs []uint32
 	var allgemPortIDs []uint32
 	var gemPortIDs []uint32
+	tpInstanceExists := false
 
 	allocIDs = f.resourceMgr.GetCurrentAllocIDsForOnu(intfID, onuID, uniID)
 	allgemPortIDs = f.resourceMgr.GetCurrentGEMPortIDsForOnu(intfID, onuID, uniID)
@@ -466,6 +494,7 @@
 		f.resourceMgr.UpdateTechProfileIDForOnu(intfID, onuID, uniID, TpID)
 	} else {
 		log.Debugw("Tech-profile-instance-already-exist-for-given port-name", log.Fields{"uni": uni})
+		tpInstanceExists = true
 	}
 	if UsMeterID != 0 {
 		sq := schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: onuID, uniID: uniID, tpID: TpID,
@@ -485,13 +514,19 @@
 	}
 
 	allocID := techProfileInstance.UsScheduler.AllocID
-	allocIDs = appendUnique(allocIDs, allocID)
-
 	for _, gem := range techProfileInstance.UpstreamGemPortAttributeList {
-		allgemPortIDs = appendUnique(allgemPortIDs, gem.GemportID)
 		gemPortIDs = append(gemPortIDs, gem.GemportID)
 	}
 
+	if tpInstanceExists {
+		return allocID, gemPortIDs, techProfileInstance
+	}
+
+	allocIDs = appendUnique(allocIDs, allocID)
+	for _, gemPortID := range gemPortIDs {
+		allgemPortIDs = appendUnique(allgemPortIDs, gemPortID)
+	}
+
 	log.Debugw("Allocated Tcont and GEM ports", log.Fields{"allocIDs": allocIDs, "gemports": allgemPortIDs})
 	// Send Tconts and GEM ports to KV store
 	f.storeTcontsGEMPortsIntoKVStore(intfID, onuID, uniID, allocIDs, allgemPortIDs)
@@ -603,6 +638,10 @@
 		log.Debugw("Found pbit in the flow", log.Fields{"VlanPbit": vlanPbit})
 	}
 	flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
+	if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
+		log.Debug("Flow-exists--not-re-adding")
+		return
+	}
 	flowID, err := f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, HsiaFlow, vlanPbit)
 	if err != nil {
 		log.Errorw("Flow id unavailable for HSIA flow", log.Fields{"direction": direction})
@@ -674,11 +713,15 @@
 	delete(classifier, VlanVid)
 
 	flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
+	if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
+		log.Debug("Flow-exists--not-re-adding")
+		return
+	}
 
-	flowID, err = f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0 /*classifier[VLAN_PCP].(uint32)*/)
+	flowID, err = f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, DhcpFlow, 0 /*classifier[VLAN_PCP].(uint32)*/)
 
 	if err != nil {
-		log.Errorw("flowId unavailable for UL EAPOL", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
+		log.Errorw("flowId unavailable for UL DHCP", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
 		return
 	}
 
@@ -729,10 +772,8 @@
 
 	uplinkClassifier := make(map[string]interface{})
 	uplinkAction := make(map[string]interface{})
-	downlinkClassifier := make(map[string]interface{})
-	downlinkAction := make(map[string]interface{})
+
 	var upstreamFlow openoltpb2.Flow
-	var downstreamFlow openoltpb2.Flow
 	var networkIntfID uint32
 
 	// Fill Classfier
@@ -742,6 +783,10 @@
 	// Fill action
 	uplinkAction[TrapToHost] = true
 	flowStoreCookie := getFlowStoreCookie(uplinkClassifier, gemPortID)
+	if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
+		log.Debug("Flow-exists--not-re-adding")
+		return
+	}
 	//Add Uplink EAPOL Flow
 	uplinkFlowID, err := f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0)
 	if err != nil {
@@ -795,78 +840,7 @@
 			return
 		}
 	}
-	// Dummy Downstream flow due to BAL 2.6 limitation
-	{
-		/* Add Downstream EAPOL Flow, Only for first EAP flow (BAL
-		# requirement)
-		# On one of the platforms (Broadcom BAL), when same DL classifier
-		# vlan was used across multiple ONUs, eapol flow re-adds after
-		# flow delete (cases of onu reboot/disable) fails.
-		# In order to generate unique vlan, a combination of intf_id
-		# onu_id and uniId is used.
-		# uniId defaults to 0, so add 1 to it.
-		*/
-		log.Debugw("Creating DL EAPOL flow with default vlan", log.Fields{"vlan": vlanID})
-		specialVlanDlFlow := 4090 - intfID*onuID*(uniID+1)
-		// Assert that we do not generate invalid vlans under no condition
-		if specialVlanDlFlow <= 2 {
-			log.Fatalw("invalid-vlan-generated", log.Fields{"vlan": specialVlanDlFlow})
-			return
-		}
-		log.Debugw("specialVlanEAPOLDlFlow:", log.Fields{"dl_vlan": specialVlanDlFlow})
-		// Fill Classfier
-		downlinkClassifier[PacketTagType] = SingleTag
-		downlinkClassifier[EthType] = uint32(EapEthType)
-		downlinkClassifier[VlanVid] = uint32(specialVlanDlFlow)
-		// Fill action
-		downlinkAction[PushVlan] = true
-		downlinkAction[VlanVid] = vlanID
-		flowStoreCookie := getFlowStoreCookie(downlinkClassifier, gemPortID)
-		downlinkFlowID, err := f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0)
-		if err != nil {
-			log.Errorw("flowId unavailable for DL EAPOL",
-				log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
-			return
-		}
-		log.Debugw("Creating DL EAPOL flow",
-			log.Fields{"dl_classifier": downlinkClassifier, "dl_action": downlinkAction, "downlinkFlowID": downlinkFlowID})
-		if classifierProto = makeOpenOltClassifierField(downlinkClassifier); classifierProto == nil {
-			log.Error("Error in making classifier protobuf for downlink flow")
-			return
-		}
-		if actionProto = makeOpenOltActionField(downlinkAction); actionProto == nil {
-			log.Error("Error in making action protobuf for dl flow")
-			return
-		}
-		// Downstream flow in grpc protobuf
-		downstreamFlow = openoltpb2.Flow{AccessIntfId: int32(intfID),
-			OnuId:         int32(onuID),
-			UniId:         int32(uniID),
-			FlowId:        downlinkFlowID,
-			FlowType:      Downstream,
-			AllocId:       int32(allocID),
-			NetworkIntfId: int32(networkIntfID),
-			GemportId:     int32(gemPortID),
-			Classifier:    classifierProto,
-			Action:        actionProto,
-			Priority:      int32(logicalFlow.Priority),
-			Cookie:        logicalFlow.Cookie,
-			PortNo:        portNo}
-		if ok := f.addFlowToDevice(logicalFlow, &downstreamFlow); ok {
-			log.Debug("EAPOL DL flow added to device successfully")
-			flowCategory := ""
-			flowsToKVStore := f.getUpdatedFlowInfo(&downstreamFlow, flowStoreCookie, flowCategory, downlinkFlowID, logicalFlow.Id)
-			if err := f.updateFlowInfoToKVStore(downstreamFlow.AccessIntfId,
-				downstreamFlow.OnuId,
-				downstreamFlow.UniId,
-				downstreamFlow.FlowId,
-				/* flowCategory, */
-				flowsToKVStore); err != nil {
-				log.Errorw("Error uploading EAPOL DL flow into KV store", log.Fields{"flow": upstreamFlow, "error": err})
-				return
-			}
-		}
-	}
+
 	log.Debugw("Added EAPOL flows to device successfully", log.Fields{"flow": logicalFlow})
 }
 
@@ -967,6 +941,7 @@
 		log.Error("Invalid classfier object")
 		return 0
 	}
+	log.Debugw("generating flow store cookie", log.Fields{"classifier": classifier, "gemPortID": gemPortID})
 	var jsonData []byte
 	var flowString string
 	var err error
@@ -983,7 +958,9 @@
 	_, _ = h.Write([]byte(flowString))
 	hash := big.NewInt(0)
 	hash.SetBytes(h.Sum(nil))
-	return hash.Uint64()
+	generatedHash := hash.Uint64()
+	log.Debugw("hash generated", log.Fields{"hash": generatedHash})
+	return generatedHash
 }
 
 func (f *OpenOltFlowMgr) getUpdatedFlowInfo(flow *openoltpb2.Flow, flowStoreCookie uint64, flowCategory string, deviceFlowID uint32, logicalFlowID uint64) *[]rsrcMgr.FlowInfo {
@@ -1062,7 +1039,7 @@
 	st, _ := status.FromError(err)
 	if st.Code() == codes.AlreadyExists {
 		log.Debug("Flow already exists", log.Fields{"err": err, "deviceFlow": deviceFlow})
-		return false
+		return true
 	}
 
 	if err != nil {
@@ -1270,6 +1247,29 @@
 	return nil
 }
 
+func (f *OpenOltFlowMgr) deletePendingFlows(Intf uint32, onuID int32, uniID int32) {
+	pnFlDelKey := pendingFlowDeleteKey{Intf, uint32(onuID), uint32(uniID)}
+	if val, ok := f.pendingFlowDelete.Load(pnFlDelKey); ok {
+		if val.(int) > 0 {
+			pnFlDels := val.(int) - 1
+			if pnFlDels > 0 {
+				log.Debugw("flow delete succeeded, more pending",
+					log.Fields{"intf": Intf, "onuID": onuID, "uniID": uniID, "currPendingFlowCnt": pnFlDels})
+				f.pendingFlowDelete.Store(pnFlDelKey, pnFlDels)
+			} else {
+				log.Debugw("all pending flow deletes handled, removing entry from map",
+					log.Fields{"intf": Intf, "onuID": onuID, "uniID": uniID})
+				f.pendingFlowDelete.Delete(pnFlDelKey)
+			}
+		}
+	} else {
+		log.Debugw("no pending delete flows found",
+			log.Fields{"intf": Intf, "onuID": onuID, "uniID": uniID})
+
+	}
+
+}
+
 //clearResources clears pon resources in kv store and the device
 func (f *OpenOltFlowMgr) clearResources(flow *ofp.OfpFlowStats, Intf uint32, onuID int32, uniID int32,
 	gemPortID int32, flowID uint32, flowDirection string,
@@ -1288,6 +1288,23 @@
 		// between DS and US.
 		f.updateFlowInfoToKVStore(int32(Intf), int32(onuID), int32(uniID), flowID, &updatedFlows)
 		if len(updatedFlows) == 0 {
+			// Do this for subscriber flows only (not trap from NNI flows)
+			if onuID != -1 && uniID != -1 {
+				pnFlDelKey := pendingFlowDeleteKey{Intf, uint32(onuID), uint32(uniID)}
+				if val, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok {
+					log.Debugw("creating entry for pending flow delete",
+						log.Fields{"intf": Intf, "onuID": onuID, "uniID": uniID})
+					f.pendingFlowDelete.Store(pnFlDelKey, 1)
+				} else {
+					pnFlDels := val.(int) + 1
+					log.Debugw("updating flow delete entry",
+						log.Fields{"intf": Intf, "onuID": onuID, "uniID": uniID, "currPendingFlowCnt": pnFlDels})
+					f.pendingFlowDelete.Store(pnFlDelKey, pnFlDels)
+				}
+
+				defer f.deletePendingFlows(Intf, onuID, uniID)
+			}
+
 			log.Debugw("Releasing flow Id to resource manager", log.Fields{"Intf": Intf, "onuId": onuID, "uniId": uniID, "flowId": flowID})
 			f.resourceMgr.FreeFlowID(Intf, int32(onuID), int32(uniID), flowID)
 
@@ -1369,6 +1386,7 @@
 		log.Error(err)
 		return
 	}
+
 	onuID = int32(onu)
 	uniID = int32(uni)
 
@@ -1451,6 +1469,24 @@
 	return
 }
 
+func (f *OpenOltFlowMgr) waitForFlowDeletesToCompleteForOnu(ctx context.Context, intfID uint32, onuID uint32,
+	uniID uint32, ch chan bool) {
+	pnFlDelKey := pendingFlowDeleteKey{intfID, onuID, uniID}
+	for {
+		select {
+		case <-time.After(20 * time.Millisecond):
+			if flowDelRefCnt, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok || flowDelRefCnt == 0 {
+				log.Debug("pending flow deletes completed")
+				ch <- true
+				return
+			}
+		case <-ctx.Done():
+			log.Error("flow delete wait handler routine canceled")
+			return
+		}
+	}
+}
+
 // AddFlow add flow to device
 func (f *OpenOltFlowMgr) AddFlow(flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) {
 	classifierInfo := make(map[string]interface{})
@@ -1514,7 +1550,26 @@
 		log.Debugw("Downstream-flow-meter-id", log.Fields{"DsMeterID": DsMeterID})
 
 	}
-	f.divideAndAddFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
+
+	pnFlDelKey := pendingFlowDeleteKey{intfID, onuID, uniID}
+	if _, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok {
+		log.Debugw("no pending flows found, going ahead with flow install", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
+		f.divideAndAddFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
+	} else {
+		ctx := context.Background()
+		ctx, cancel := context.WithCancel(ctx)
+		defer cancel()
+		pendingFlowDelComplete := make(chan bool)
+		go f.waitForFlowDeletesToCompleteForOnu(ctx, intfID, onuID, uniID, pendingFlowDelComplete)
+		select {
+		case <-pendingFlowDelComplete:
+			log.Debugw("all pending flow deletes completed", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
+			f.divideAndAddFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
+
+		case <-time.After(10 * time.Second):
+			log.Errorw("pending flow deletes not completed after timeout", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
+		}
+	}
 }
 
 //sendTPDownloadMsgToChild send payload