[VOL-2786] : Handle flow deletes completely before proceeding onto
             flow adds for a subscriber.
             Also serialize flow deletes on a per subscriber basis.
             Flow Adds are already serialized on a per subscriber basis.

Change-Id: If0a85780f76c3fdc6496074356131c5cdfbfd577
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index b229f82..0ca3a7f 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -22,6 +22,7 @@
 	"crypto/md5"
 	"encoding/hex"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"math/big"
 	"strings"
@@ -1727,7 +1728,7 @@
 }
 
 //RemoveFlow removes the flow from the device
-func (f *OpenOltFlowMgr) RemoveFlow(ctx context.Context, flow *ofp.OfpFlowStats) {
+func (f *OpenOltFlowMgr) RemoveFlow(ctx context.Context, flow *ofp.OfpFlowStats) error {
 	log.Debugw("Removing Flow", log.Fields{"flow": flow})
 	var direction string
 	actionInfo := make(map[string]interface{})
@@ -1739,21 +1740,39 @@
 				log.Debugw("action-type-output", log.Fields{"out_port": actionInfo[Output].(uint32)})
 			} else {
 				log.Error("Invalid output port in action")
-				return
+				return olterrors.NewErrInvalidValue(log.Fields{"invalid-out-port-action": 0}, nil)
 			}
 		}
 	}
 
 	if flows.HasGroup(flow) {
 		direction = Multicast
+		f.clearFlowFromResourceManager(ctx, flow, direction)
+		return nil
 	} else if IsUpstream(actionInfo[Output].(uint32)) {
 		direction = Upstream
 	} else {
 		direction = Downstream
 	}
-	f.clearFlowFromResourceManager(ctx, flow, direction) //TODO: Take care of the limitations
 
-	return
+	_, intfID, onuID, uniID, _, _, err := FlowExtractInfo(flow, direction)
+	if err != nil {
+		return err
+	}
+
+	userKey := tpLockKey{intfID, onuID, uniID}
+
+	// Serialize flow removes on a per subscriber basis
+	if f.perUserFlowHandleLock.TryLock(userKey) {
+		f.clearFlowFromResourceManager(ctx, flow, direction) //TODO: Take care of the limitations
+		f.perUserFlowHandleLock.Unlock(userKey)
+	} else {
+		// Ideally this should never happen
+		log.Errorw("failed to acquire lock to remove flow, flow remove aborted", log.Fields{"flow": flow})
+		return errors.New("failed-to-acquire-per-user-lock")
+	}
+
+	return nil
 }
 
 func (f *OpenOltFlowMgr) waitForFlowDeletesToCompleteForOnu(ctx context.Context, intfID uint32, onuID uint32,