[VOL-2786] : Handle flow deletes completely before proceeding onto
             flow adds for a subscriber.
             Also serialize flow deletes on a per subscriber basis.
             Flow Adds are already serialized on a per subscriber basis.

Change-Id: If0a85780f76c3fdc6496074356131c5cdfbfd577
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 2b0b52a..111a87c 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -21,6 +21,7 @@
 	"context"
 	"encoding/hex"
 	"fmt"
+	"github.com/opencord/voltha-lib-go/v3/pkg/flows"
 	"io"
 	"net"
 	"strconv"
@@ -51,8 +52,24 @@
 const (
 	MaxRetry       = 10
 	MaxTimeOutInMs = 500
+	InvalidPort    = 0xffffffff
 )
 
+// pendingFlowRemoveDataKey is key to pendingFlowRemoveDataPerSubscriber map
+type pendingFlowRemoveDataKey struct {
+	intfID uint32
+	onuID  uint32
+	uniID  uint32
+}
+
+// pendingFlowRemoveData is value stored in pendingFlowRemoveDataPerSubscriber map
+// This holds the number of pending flow removes and also a signal channel to
+// to indicate the receiver when all flow removes are handled
+type pendingFlowRemoveData struct {
+	pendingFlowRemoveCount uint32
+	allFlowsRemoved        chan struct{}
+}
+
 //DeviceHandler will interact with the OLT device.
 type DeviceHandler struct {
 	deviceID      string
@@ -80,6 +97,13 @@
 	stopHeartbeatCheck chan bool
 	activePorts        sync.Map
 	stopIndications    chan bool
+
+	// pendingFlowRemoveDataPerSubscriber map is used to maintain the context on a per
+	// subscriber basis for the number of pending flow removes. This data is used
+	// to process all the flow removes for a subscriber before handling flow adds.
+	// Interleaving flow delete and flow add processing has known to cause PON resource
+	// management contentions on a per subscriber bases, so we need ensure ordering.
+	pendingFlowRemoveDataPerSubscriber map[pendingFlowRemoveDataKey]pendingFlowRemoveData
 }
 
 //OnuDevice represents ONU related info
@@ -138,6 +162,8 @@
 	dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
 	dh.activePorts = sync.Map{}
 	dh.stopIndications = make(chan bool, 1)
+	dh.pendingFlowRemoveDataPerSubscriber = make(map[pendingFlowRemoveDataKey]pendingFlowRemoveData)
+
 	//TODO initialize the support classes.
 	return &dh
 }
@@ -1258,12 +1284,21 @@
 
 	if flows != nil {
 		for _, flow := range flows.ToRemove.Items {
+			dh.incrementActiveFlowRemoveCount(flow)
+
 			log.Debug("Removing flow", log.Fields{"deviceId": device.Id, "flowToRemove": flow})
-			dh.flowMgr.RemoveFlow(ctx, flow)
+			err := dh.flowMgr.RemoveFlow(ctx, flow)
+			if err != nil {
+				errorsList = append(errorsList, err)
+			}
+
+			dh.decrementActiveFlowRemoveCount(flow)
 		}
 
 		for _, flow := range flows.ToAdd.Items {
 			log.Debug("Adding flow", log.Fields{"deviceId": device.Id, "flowToAdd": flow})
+			// If there are active Flow Remove in progress for a given subscriber, wait until it completes
+			dh.waitForFlowRemoveToFinish(flow)
 			err := dh.flowMgr.AddFlow(ctx, flow, flowMetadata)
 			if err != nil {
 				errorsList = append(errorsList, err)
@@ -1277,6 +1312,7 @@
 		}
 	}
 
+	// Whether we need to synchronize multicast group adds and modifies like flow add and delete needs to be investigated
 	if groups != nil {
 		for _, group := range groups.ToAdd.Items {
 			err := dh.flowMgr.AddGroup(ctx, group)
@@ -1910,3 +1946,137 @@
 	dh.discOnus.Delete(onuDevice.(*OnuDevice).serialNumber)
 	return nil
 }
+
+func getInPortFromFlow(flow *of.OfpFlowStats) uint32 {
+	for _, field := range flows.GetOfbFields(flow) {
+		if field.Type == flows.IN_PORT {
+			return field.GetPort()
+		}
+	}
+	return InvalidPort
+}
+
+func getOutPortFromFlow(flow *of.OfpFlowStats) uint32 {
+	for _, action := range flows.GetActions(flow) {
+		if action.Type == flows.OUTPUT {
+			if out := action.GetOutput(); out != nil {
+				return out.GetPort()
+			}
+		}
+	}
+	return InvalidPort
+}
+
+func (dh *DeviceHandler) incrementActiveFlowRemoveCount(flow *of.OfpFlowStats) {
+	inPort, outPort := getPorts(flow)
+	log.Debugw("increment flow remove count for inPort outPort", log.Fields{"inPort": inPort, "outPort": outPort})
+	if inPort != InvalidPort && outPort != InvalidPort {
+		_, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
+		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
+		log.Debugw("increment flow remove count for subscriber", log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID})
+
+		dh.lockDevice.Lock()
+		defer dh.lockDevice.Unlock()
+		flowRemoveData, ok := dh.pendingFlowRemoveDataPerSubscriber[key]
+		if !ok {
+			flowRemoveData = pendingFlowRemoveData{
+				pendingFlowRemoveCount: 0,
+				allFlowsRemoved:        make(chan struct{}),
+			}
+		}
+		flowRemoveData.pendingFlowRemoveCount++
+		dh.pendingFlowRemoveDataPerSubscriber[key] = flowRemoveData
+
+		log.Debugw("current flow remove count after increment",
+			log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID,
+				"currCnt": dh.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
+	}
+}
+
+func (dh *DeviceHandler) decrementActiveFlowRemoveCount(flow *of.OfpFlowStats) {
+	inPort, outPort := getPorts(flow)
+	log.Debugw("decrement flow remove count for inPort outPort", log.Fields{"inPort": inPort, "outPort": outPort})
+	if inPort != InvalidPort && outPort != InvalidPort {
+		_, intfID, onuID, uniID := ExtractAccessFromFlow(uint32(inPort), uint32(outPort))
+		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
+		log.Debugw("decrement flow remove count for subscriber", log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID})
+
+		dh.lockDevice.Lock()
+		defer dh.lockDevice.Unlock()
+		if val, ok := dh.pendingFlowRemoveDataPerSubscriber[key]; !ok {
+			log.Fatalf("flow remove key not found", log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID})
+		} else {
+			if val.pendingFlowRemoveCount > 0 {
+				val.pendingFlowRemoveCount--
+			}
+			log.Debugw("current flow remove count after decrement",
+				log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID,
+					"currCnt": dh.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
+			// If all flow removes have finished, then close the channel to signal the receiver
+			// to go ahead with flow adds.
+			if val.pendingFlowRemoveCount == 0 {
+				close(val.allFlowsRemoved)
+				delete(dh.pendingFlowRemoveDataPerSubscriber, key)
+				return
+			}
+			dh.pendingFlowRemoveDataPerSubscriber[key] = val
+		}
+	}
+}
+
+func (dh *DeviceHandler) waitForFlowRemoveToFinish(flow *of.OfpFlowStats) {
+	var flowRemoveData pendingFlowRemoveData
+	var ok bool
+	inPort, outPort := getPorts(flow)
+	log.Debugw("wait for flow remove to finish for inPort outPort", log.Fields{"inPort": inPort, "outPort": outPort})
+	if inPort != InvalidPort && outPort != InvalidPort {
+		_, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
+		key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
+		log.Debugw("wait for flow remove to finish for subscriber", log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID})
+
+		dh.lockDevice.RLock()
+		if flowRemoveData, ok = dh.pendingFlowRemoveDataPerSubscriber[key]; !ok {
+			log.Debugw("no pending flow to remove", log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID})
+			dh.lockDevice.RUnlock()
+			return
+		}
+		dh.lockDevice.RUnlock()
+
+		// Wait for all flow removes to finish first
+		<-flowRemoveData.allFlowsRemoved
+
+		log.Debugw("all flows cleared, handling flow add now", log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID})
+	}
+}
+
+func getPorts(flow *of.OfpFlowStats) (uint32, uint32) {
+	inPort := getInPortFromFlow(flow)
+	outPort := getOutPortFromFlow(flow)
+
+	if inPort == InvalidPort || outPort == InvalidPort {
+		return inPort, outPort
+	}
+
+	if isControllerFlow := IsControllerBoundFlow(outPort); isControllerFlow {
+		/* Get UNI port/ IN Port from tunnel ID field for upstream controller bound flows  */
+		if portType := IntfIDToPortTypeName(inPort); portType == voltha.Port_PON_OLT {
+			if uniPort := flows.GetChildPortFromTunnelId(flow); uniPort != 0 {
+				return uniPort, outPort
+			}
+		}
+	} else {
+		// Downstream flow from NNI to PON port , Use tunnel ID as new OUT port / UNI port
+		if portType := IntfIDToPortTypeName(outPort); portType == voltha.Port_PON_OLT {
+			if uniPort := flows.GetChildPortFromTunnelId(flow); uniPort != 0 {
+				return inPort, uniPort
+			}
+			// Upstream flow from PON to NNI port , Use tunnel ID as new IN port / UNI port
+		} else if portType := IntfIDToPortTypeName(inPort); portType == voltha.Port_PON_OLT {
+			if uniPort := flows.GetChildPortFromTunnelId(flow); uniPort != 0 {
+				return uniPort, outPort
+			}
+		}
+	}
+
+	return InvalidPort, InvalidPort
+}
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index b229f82..0ca3a7f 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -22,6 +22,7 @@
 	"crypto/md5"
 	"encoding/hex"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"math/big"
 	"strings"
@@ -1727,7 +1728,7 @@
 }
 
 //RemoveFlow removes the flow from the device
-func (f *OpenOltFlowMgr) RemoveFlow(ctx context.Context, flow *ofp.OfpFlowStats) {
+func (f *OpenOltFlowMgr) RemoveFlow(ctx context.Context, flow *ofp.OfpFlowStats) error {
 	log.Debugw("Removing Flow", log.Fields{"flow": flow})
 	var direction string
 	actionInfo := make(map[string]interface{})
@@ -1739,21 +1740,39 @@
 				log.Debugw("action-type-output", log.Fields{"out_port": actionInfo[Output].(uint32)})
 			} else {
 				log.Error("Invalid output port in action")
-				return
+				return olterrors.NewErrInvalidValue(log.Fields{"invalid-out-port-action": 0}, nil)
 			}
 		}
 	}
 
 	if flows.HasGroup(flow) {
 		direction = Multicast
+		f.clearFlowFromResourceManager(ctx, flow, direction)
+		return nil
 	} else if IsUpstream(actionInfo[Output].(uint32)) {
 		direction = Upstream
 	} else {
 		direction = Downstream
 	}
-	f.clearFlowFromResourceManager(ctx, flow, direction) //TODO: Take care of the limitations
 
-	return
+	_, intfID, onuID, uniID, _, _, err := FlowExtractInfo(flow, direction)
+	if err != nil {
+		return err
+	}
+
+	userKey := tpLockKey{intfID, onuID, uniID}
+
+	// Serialize flow removes on a per subscriber basis
+	if f.perUserFlowHandleLock.TryLock(userKey) {
+		f.clearFlowFromResourceManager(ctx, flow, direction) //TODO: Take care of the limitations
+		f.perUserFlowHandleLock.Unlock(userKey)
+	} else {
+		// Ideally this should never happen
+		log.Errorw("failed to acquire lock to remove flow, flow remove aborted", log.Fields{"flow": flow})
+		return errors.New("failed-to-acquire-per-user-lock")
+	}
+
+	return nil
 }
 
 func (f *OpenOltFlowMgr) waitForFlowDeletesToCompleteForOnu(ctx context.Context, intfID uint32, onuID uint32,