[VOL-3737] : Fix synchronization issues with mcast flow and group delete

Change-Id: Ib535d7cd9c83595098e8fa957ca01fee9553e701
diff --git a/VERSION b/VERSION
index 0f9d6b1..94ff29c 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.1.0-dev
+3.1.1
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index c8137d0..e288838 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -21,6 +21,7 @@
 	"context"
 	"encoding/binary"
 	"encoding/hex"
+	"errors"
 	"fmt"
 	"io"
 	"net"
@@ -37,7 +38,7 @@
 	"github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif"
 	"github.com/opencord/voltha-lib-go/v4/pkg/config"
 	"github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
-	"github.com/opencord/voltha-lib-go/v4/pkg/flows"
+	flow_utils "github.com/opencord/voltha-lib-go/v4/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v4/pkg/log"
 	"github.com/opencord/voltha-lib-go/v4/pkg/pmmetrics"
 
@@ -55,7 +56,12 @@
 
 // Constants for number of retries and for timeout
 const (
-	InvalidPort = 0xffffffff
+	InvalidPort                  = 0xffffffff
+	MaxNumOfGroupHandlerChannels = 256
+
+	McastFlowOrGroupAdd    = "McastFlowOrGroupAdd"
+	McastFlowOrGroupModify = "McastFlowOrGroupModify"
+	McastFlowOrGroupRemove = "McastFlowOrGroupRemove"
 )
 
 //DeviceHandler will interact with the OLT device.
@@ -89,6 +95,10 @@
 	totalPonPorts     uint32
 	perOnuChannel     map[string]onuIndicationChannels
 	perOnuChannelLock sync.Mutex
+
+	// Slice of channels. Each channel in slice, index by (mcast-group-id modulo MaxNumOfGroupHandlerChannels)
+	// A go routine per index, waits on a unique channel for incoming mcast flow or group (add/modify/remove).
+	incomingMcastFlowOrGroup []chan McastFlowOrGroupControlBlock
 }
 
 //OnuDevice represents ONU related info
@@ -114,6 +124,18 @@
 	stopChannel       chan struct{}
 }
 
+//McastFlowOrGroupControlBlock is created per mcast flow/group add/modify/remove and pushed on the incomingMcastFlowOrGroup channel slice
+//The McastFlowOrGroupControlBlock is then picked by the mcastFlowOrGroupChannelHandlerRoutine for further processing.
+//There are MaxNumOfGroupHandlerChannels number of mcastFlowOrGroupChannelHandlerRoutine routines which monitor for any incoming mcast flow/group messages
+//and process them serially. The mcast flow/group are assigned these routines based on formula (group-id modulo MaxNumOfGroupHandlerChannels)
+type McastFlowOrGroupControlBlock struct {
+	ctx               context.Context       // Flow/group handler context
+	flowOrGroupAction string                // one of McastFlowOrGroupAdd, McastFlowOrGroupModify or McastFlowOrGroupDelete
+	flow              *voltha.OfpFlowStats  // Flow message (can be nil or valid flow)
+	group             *voltha.OfpGroupEntry // Group message (can be nil or valid group)
+	errChan           *chan error           // channel to report the mcast Flow/group handling error
+}
+
 var pmNames = []string{
 	"rx_bytes",
 	"rx_packets",
@@ -156,6 +178,16 @@
 	dh.activePorts = sync.Map{}
 	dh.stopIndications = make(chan bool, 1)
 	dh.perOnuChannel = make(map[string]onuIndicationChannels)
+	// Create a slice of buffered channels for handling concurrent mcast flow/group.
+	dh.incomingMcastFlowOrGroup = make([]chan McastFlowOrGroupControlBlock, MaxNumOfGroupHandlerChannels)
+	for i := range dh.incomingMcastFlowOrGroup {
+		dh.incomingMcastFlowOrGroup[i] = make(chan McastFlowOrGroupControlBlock, MaxNumOfGroupHandlerChannels)
+		// Spin up a go routine to handling incoming mcast flow/group (add/modify/remove).
+		// There will be MaxNumOfGroupHandlerChannels number of mcastFlowOrGroupChannelHandlerRoutine go routines.
+		// These routines will be blocked on the dh.incomingMcastFlowOrGroup[mcast-group-id modulo MaxNumOfGroupHandlerChannels] channel
+		// for incoming mcast flow/group to be processed serially.
+		go dh.mcastFlowOrGroupChannelHandlerRoutine(dh.incomingMcastFlowOrGroup[i])
+	}
 	//TODO initialize the support classes.
 	return &dh
 }
@@ -1435,6 +1467,7 @@
 func (dh *DeviceHandler) UpdateFlowsIncrementally(ctx context.Context, device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
 	logger.Debugw(ctx, "received-incremental-flowupdate-in-device-handler", log.Fields{"device-id": device.Id, "flows": flows, "groups": groups, "flowMetadata": flowMetadata})
 
+	var err error
 	var errorsList []error
 
 	if flows != nil {
@@ -1445,7 +1478,11 @@
 				log.Fields{"device-id": device.Id,
 					"ponIf":        ponIf,
 					"flowToRemove": flow})
-			err := dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, false, nil)
+			if flow_utils.HasGroup(flow) {
+				err = dh.RouteMcastFlowOrGroupMsgToChannel(ctx, flow, nil, McastFlowOrGroupRemove)
+			} else {
+				err = dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, false, nil)
+			}
 			if err != nil {
 				errorsList = append(errorsList, err)
 			}
@@ -1457,7 +1494,11 @@
 				log.Fields{"device-id": device.Id,
 					"ponIf":     ponIf,
 					"flowToAdd": flow})
-			err := dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, true, flowMetadata)
+			if flow_utils.HasGroup(flow) {
+				err = dh.RouteMcastFlowOrGroupMsgToChannel(ctx, flow, nil, McastFlowOrGroupAdd)
+			} else {
+				err = dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, true, flowMetadata)
+			}
 			if err != nil {
 				errorsList = append(errorsList, err)
 			}
@@ -1467,19 +1508,22 @@
 	// Whether we need to synchronize multicast group adds and modifies like flow add and delete needs to be investigated
 	if groups != nil {
 		for _, group := range groups.ToAdd.Items {
-			err := dh.groupMgr.AddGroup(ctx, group)
+			// err = dh.groupMgr.AddGroup(ctx, group)
+			err = dh.RouteMcastFlowOrGroupMsgToChannel(ctx, nil, group, McastFlowOrGroupAdd)
 			if err != nil {
 				errorsList = append(errorsList, err)
 			}
 		}
 		for _, group := range groups.ToUpdate.Items {
-			err := dh.groupMgr.ModifyGroup(ctx, group)
+			// err = dh.groupMgr.ModifyGroup(ctx, group)
+			err = dh.RouteMcastFlowOrGroupMsgToChannel(ctx, nil, group, McastFlowOrGroupModify)
 			if err != nil {
 				errorsList = append(errorsList, err)
 			}
 		}
 		for _, group := range groups.ToRemove.Items {
-			err := dh.groupMgr.DeleteGroup(ctx, group)
+			// err = dh.groupMgr.DeleteGroup(ctx, group)
+			err = dh.RouteMcastFlowOrGroupMsgToChannel(ctx, nil, group, McastFlowOrGroupRemove)
 			if err != nil {
 				errorsList = append(errorsList, err)
 			}
@@ -2164,8 +2208,8 @@
 }
 
 func getInPortFromFlow(flow *of.OfpFlowStats) uint32 {
-	for _, field := range flows.GetOfbFields(flow) {
-		if field.Type == flows.IN_PORT {
+	for _, field := range flow_utils.GetOfbFields(flow) {
+		if field.Type == flow_utils.IN_PORT {
 			return field.GetPort()
 		}
 	}
@@ -2173,8 +2217,8 @@
 }
 
 func getOutPortFromFlow(flow *of.OfpFlowStats) uint32 {
-	for _, action := range flows.GetActions(flow) {
-		if action.Type == flows.OUTPUT {
+	for _, action := range flow_utils.GetActions(flow) {
+		if action.Type == flow_utils.OUTPUT {
 			if out := action.GetOutput(); out != nil {
 				return out.GetPort()
 			}
@@ -2194,19 +2238,19 @@
 	if isControllerFlow := IsControllerBoundFlow(outPort); isControllerFlow {
 		/* Get UNI port/ IN Port from tunnel ID field for upstream controller bound flows  */
 		if portType := IntfIDToPortTypeName(inPort); portType == voltha.Port_PON_OLT {
-			if uniPort := flows.GetChildPortFromTunnelId(flow); uniPort != 0 {
+			if uniPort := flow_utils.GetChildPortFromTunnelId(flow); uniPort != 0 {
 				return uniPort, outPort
 			}
 		}
 	} else {
 		// Downstream flow from NNI to PON port , Use tunnel ID as new OUT port / UNI port
 		if portType := IntfIDToPortTypeName(outPort); portType == voltha.Port_PON_OLT {
-			if uniPort := flows.GetChildPortFromTunnelId(flow); uniPort != 0 {
+			if uniPort := flow_utils.GetChildPortFromTunnelId(flow); uniPort != 0 {
 				return inPort, uniPort
 			}
 			// Upstream flow from PON to NNI port , Use tunnel ID as new IN port / UNI port
 		} else if portType := IntfIDToPortTypeName(inPort); portType == voltha.Port_PON_OLT {
-			if uniPort := flows.GetChildPortFromTunnelId(flow); uniPort != 0 {
+			if uniPort := flow_utils.GetChildPortFromTunnelId(flow); uniPort != 0 {
 				return uniPort, outPort
 			}
 		}
@@ -2345,3 +2389,87 @@
 		}
 	}
 }
+
+// RouteMcastFlowOrGroupMsgToChannel routes incoming mcast flow or group to a channel to be handled by the a specific
+// instance of mcastFlowOrGroupChannelHandlerRoutine meant to handle messages for that group.
+func (dh *DeviceHandler) RouteMcastFlowOrGroupMsgToChannel(ctx context.Context, flow *voltha.OfpFlowStats, group *voltha.OfpGroupEntry, action string) error {
+	// Step1 : Fill McastFlowOrGroupControlBlock
+	// Step2 : Push the McastFlowOrGroupControlBlock to appropriate channel
+	// Step3 : Wait on response channel for response
+	// Step4 : Return error value
+	logger.Debugw(ctx, "process-flow-or-group", log.Fields{"flow": flow, "group": group, "action": action})
+	errChan := make(chan error)
+	var groupID uint32
+	mcastFlowOrGroupCb := McastFlowOrGroupControlBlock{
+		ctx:               ctx,
+		flowOrGroupAction: action,
+		flow:              flow,
+		group:             group,
+		errChan:           &errChan,
+	}
+	if flow != nil {
+		groupID = flow_utils.GetGroup(flow)
+	} else if group != nil {
+		groupID = group.Desc.GroupId
+	} else {
+		return errors.New("flow-and-group-both-nil")
+	}
+	// Derive the appropriate go routine to handle the request by a simple module operation.
+	// There are only MaxNumOfGroupHandlerChannels number of channels to handle the mcast flow or group
+	dh.incomingMcastFlowOrGroup[groupID%MaxNumOfGroupHandlerChannels] <- mcastFlowOrGroupCb
+	// Wait for handler to return error value
+	err := <-errChan
+	logger.Debugw(ctx, "process-flow-or-group--received-resp", log.Fields{"flow": flow, "group": group, "action": action, "err": err})
+	return err
+}
+
+// mcastFlowOrGroupChannelHandlerRoutine routine to handle incoming mcast flow/group message
+func (dh *DeviceHandler) mcastFlowOrGroupChannelHandlerRoutine(mcastFlowOrGroupChannel chan McastFlowOrGroupControlBlock) {
+	for {
+		// block on the channel to receive an incoming mcast flow/group
+		// process the flow completely before proceeding to handle the next flow
+		mcastFlowOrGroupCb := <-mcastFlowOrGroupChannel
+		if mcastFlowOrGroupCb.flow != nil {
+			if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
+				logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-flow",
+					log.Fields{"device-id": dh.device.Id,
+						"flowToAdd": mcastFlowOrGroupCb.flow})
+				// The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
+				err := dh.flowMgr[0].AddFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow, nil)
+				// Pass the return value over the return channel
+				*mcastFlowOrGroupCb.errChan <- err
+			} else { // flow remove
+				logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-flow",
+					log.Fields{"device-id": dh.device.Id,
+						"flowToRemove": mcastFlowOrGroupCb.flow})
+				// The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
+				err := dh.flowMgr[0].RemoveFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow)
+				// Pass the return value over the return channel
+				*mcastFlowOrGroupCb.errChan <- err
+			}
+		} else { // mcast group
+			if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
+				logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-group",
+					log.Fields{"device-id": dh.device.Id,
+						"groupToAdd": mcastFlowOrGroupCb.group})
+				err := dh.groupMgr.AddGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
+				// Pass the return value over the return channel
+				*mcastFlowOrGroupCb.errChan <- err
+			} else if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupModify { // group modify
+				logger.Debugw(mcastFlowOrGroupCb.ctx, "modifying-mcast-group",
+					log.Fields{"device-id": dh.device.Id,
+						"groupToModify": mcastFlowOrGroupCb.group})
+				err := dh.groupMgr.ModifyGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
+				// Pass the return value over the return channel
+				*mcastFlowOrGroupCb.errChan <- err
+			} else { // group remove
+				logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-group",
+					log.Fields{"device-id": dh.device.Id,
+						"groupToRemove": mcastFlowOrGroupCb.group})
+				err := dh.groupMgr.DeleteGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
+				// Pass the return value over the return channel
+				*mcastFlowOrGroupCb.errChan <- err
+			}
+		}
+	}
+}