[VOL-3737] : Fix synchronization issues with mcast flow and group delete
Change-Id: Ib535d7cd9c83595098e8fa957ca01fee9553e701
diff --git a/VERSION b/VERSION
index 0f9d6b1..94ff29c 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.1.0-dev
+3.1.1
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index c8137d0..e288838 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -21,6 +21,7 @@
"context"
"encoding/binary"
"encoding/hex"
+ "errors"
"fmt"
"io"
"net"
@@ -37,7 +38,7 @@
"github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif"
"github.com/opencord/voltha-lib-go/v4/pkg/config"
"github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
- "github.com/opencord/voltha-lib-go/v4/pkg/flows"
+ flow_utils "github.com/opencord/voltha-lib-go/v4/pkg/flows"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
"github.com/opencord/voltha-lib-go/v4/pkg/pmmetrics"
@@ -55,7 +56,12 @@
// Constants for number of retries and for timeout
const (
- InvalidPort = 0xffffffff
+ InvalidPort = 0xffffffff
+ MaxNumOfGroupHandlerChannels = 256
+
+ McastFlowOrGroupAdd = "McastFlowOrGroupAdd"
+ McastFlowOrGroupModify = "McastFlowOrGroupModify"
+ McastFlowOrGroupRemove = "McastFlowOrGroupRemove"
)
//DeviceHandler will interact with the OLT device.
@@ -89,6 +95,10 @@
totalPonPorts uint32
perOnuChannel map[string]onuIndicationChannels
perOnuChannelLock sync.Mutex
+
+ // Slice of channels. Each channel in slice, index by (mcast-group-id modulo MaxNumOfGroupHandlerChannels)
+ // A go routine per index, waits on a unique channel for incoming mcast flow or group (add/modify/remove).
+ incomingMcastFlowOrGroup []chan McastFlowOrGroupControlBlock
}
//OnuDevice represents ONU related info
@@ -114,6 +124,18 @@
stopChannel chan struct{}
}
+//McastFlowOrGroupControlBlock is created per mcast flow/group add/modify/remove and pushed on the incomingMcastFlowOrGroup channel slice
+//The McastFlowOrGroupControlBlock is then picked by the mcastFlowOrGroupChannelHandlerRoutine for further processing.
+//There are MaxNumOfGroupHandlerChannels number of mcastFlowOrGroupChannelHandlerRoutine routines which monitor for any incoming mcast flow/group messages
+//and process them serially. The mcast flow/group are assigned these routines based on formula (group-id modulo MaxNumOfGroupHandlerChannels)
+type McastFlowOrGroupControlBlock struct {
+ ctx context.Context // Flow/group handler context
+ flowOrGroupAction string // one of McastFlowOrGroupAdd, McastFlowOrGroupModify or McastFlowOrGroupDelete
+ flow *voltha.OfpFlowStats // Flow message (can be nil or valid flow)
+ group *voltha.OfpGroupEntry // Group message (can be nil or valid group)
+ errChan *chan error // channel to report the mcast Flow/group handling error
+}
+
var pmNames = []string{
"rx_bytes",
"rx_packets",
@@ -156,6 +178,16 @@
dh.activePorts = sync.Map{}
dh.stopIndications = make(chan bool, 1)
dh.perOnuChannel = make(map[string]onuIndicationChannels)
+ // Create a slice of buffered channels for handling concurrent mcast flow/group.
+ dh.incomingMcastFlowOrGroup = make([]chan McastFlowOrGroupControlBlock, MaxNumOfGroupHandlerChannels)
+ for i := range dh.incomingMcastFlowOrGroup {
+ dh.incomingMcastFlowOrGroup[i] = make(chan McastFlowOrGroupControlBlock, MaxNumOfGroupHandlerChannels)
+ // Spin up a go routine to handling incoming mcast flow/group (add/modify/remove).
+ // There will be MaxNumOfGroupHandlerChannels number of mcastFlowOrGroupChannelHandlerRoutine go routines.
+ // These routines will be blocked on the dh.incomingMcastFlowOrGroup[mcast-group-id modulo MaxNumOfGroupHandlerChannels] channel
+ // for incoming mcast flow/group to be processed serially.
+ go dh.mcastFlowOrGroupChannelHandlerRoutine(dh.incomingMcastFlowOrGroup[i])
+ }
//TODO initialize the support classes.
return &dh
}
@@ -1435,6 +1467,7 @@
func (dh *DeviceHandler) UpdateFlowsIncrementally(ctx context.Context, device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
logger.Debugw(ctx, "received-incremental-flowupdate-in-device-handler", log.Fields{"device-id": device.Id, "flows": flows, "groups": groups, "flowMetadata": flowMetadata})
+ var err error
var errorsList []error
if flows != nil {
@@ -1445,7 +1478,11 @@
log.Fields{"device-id": device.Id,
"ponIf": ponIf,
"flowToRemove": flow})
- err := dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, false, nil)
+ if flow_utils.HasGroup(flow) {
+ err = dh.RouteMcastFlowOrGroupMsgToChannel(ctx, flow, nil, McastFlowOrGroupRemove)
+ } else {
+ err = dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, false, nil)
+ }
if err != nil {
errorsList = append(errorsList, err)
}
@@ -1457,7 +1494,11 @@
log.Fields{"device-id": device.Id,
"ponIf": ponIf,
"flowToAdd": flow})
- err := dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, true, flowMetadata)
+ if flow_utils.HasGroup(flow) {
+ err = dh.RouteMcastFlowOrGroupMsgToChannel(ctx, flow, nil, McastFlowOrGroupAdd)
+ } else {
+ err = dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, true, flowMetadata)
+ }
if err != nil {
errorsList = append(errorsList, err)
}
@@ -1467,19 +1508,22 @@
// Whether we need to synchronize multicast group adds and modifies like flow add and delete needs to be investigated
if groups != nil {
for _, group := range groups.ToAdd.Items {
- err := dh.groupMgr.AddGroup(ctx, group)
+ // err = dh.groupMgr.AddGroup(ctx, group)
+ err = dh.RouteMcastFlowOrGroupMsgToChannel(ctx, nil, group, McastFlowOrGroupAdd)
if err != nil {
errorsList = append(errorsList, err)
}
}
for _, group := range groups.ToUpdate.Items {
- err := dh.groupMgr.ModifyGroup(ctx, group)
+ // err = dh.groupMgr.ModifyGroup(ctx, group)
+ err = dh.RouteMcastFlowOrGroupMsgToChannel(ctx, nil, group, McastFlowOrGroupModify)
if err != nil {
errorsList = append(errorsList, err)
}
}
for _, group := range groups.ToRemove.Items {
- err := dh.groupMgr.DeleteGroup(ctx, group)
+ // err = dh.groupMgr.DeleteGroup(ctx, group)
+ err = dh.RouteMcastFlowOrGroupMsgToChannel(ctx, nil, group, McastFlowOrGroupRemove)
if err != nil {
errorsList = append(errorsList, err)
}
@@ -2164,8 +2208,8 @@
}
func getInPortFromFlow(flow *of.OfpFlowStats) uint32 {
- for _, field := range flows.GetOfbFields(flow) {
- if field.Type == flows.IN_PORT {
+ for _, field := range flow_utils.GetOfbFields(flow) {
+ if field.Type == flow_utils.IN_PORT {
return field.GetPort()
}
}
@@ -2173,8 +2217,8 @@
}
func getOutPortFromFlow(flow *of.OfpFlowStats) uint32 {
- for _, action := range flows.GetActions(flow) {
- if action.Type == flows.OUTPUT {
+ for _, action := range flow_utils.GetActions(flow) {
+ if action.Type == flow_utils.OUTPUT {
if out := action.GetOutput(); out != nil {
return out.GetPort()
}
@@ -2194,19 +2238,19 @@
if isControllerFlow := IsControllerBoundFlow(outPort); isControllerFlow {
/* Get UNI port/ IN Port from tunnel ID field for upstream controller bound flows */
if portType := IntfIDToPortTypeName(inPort); portType == voltha.Port_PON_OLT {
- if uniPort := flows.GetChildPortFromTunnelId(flow); uniPort != 0 {
+ if uniPort := flow_utils.GetChildPortFromTunnelId(flow); uniPort != 0 {
return uniPort, outPort
}
}
} else {
// Downstream flow from NNI to PON port , Use tunnel ID as new OUT port / UNI port
if portType := IntfIDToPortTypeName(outPort); portType == voltha.Port_PON_OLT {
- if uniPort := flows.GetChildPortFromTunnelId(flow); uniPort != 0 {
+ if uniPort := flow_utils.GetChildPortFromTunnelId(flow); uniPort != 0 {
return inPort, uniPort
}
// Upstream flow from PON to NNI port , Use tunnel ID as new IN port / UNI port
} else if portType := IntfIDToPortTypeName(inPort); portType == voltha.Port_PON_OLT {
- if uniPort := flows.GetChildPortFromTunnelId(flow); uniPort != 0 {
+ if uniPort := flow_utils.GetChildPortFromTunnelId(flow); uniPort != 0 {
return uniPort, outPort
}
}
@@ -2345,3 +2389,87 @@
}
}
}
+
+// RouteMcastFlowOrGroupMsgToChannel routes incoming mcast flow or group to a channel to be handled by the a specific
+// instance of mcastFlowOrGroupChannelHandlerRoutine meant to handle messages for that group.
+func (dh *DeviceHandler) RouteMcastFlowOrGroupMsgToChannel(ctx context.Context, flow *voltha.OfpFlowStats, group *voltha.OfpGroupEntry, action string) error {
+ // Step1 : Fill McastFlowOrGroupControlBlock
+ // Step2 : Push the McastFlowOrGroupControlBlock to appropriate channel
+ // Step3 : Wait on response channel for response
+ // Step4 : Return error value
+ logger.Debugw(ctx, "process-flow-or-group", log.Fields{"flow": flow, "group": group, "action": action})
+ errChan := make(chan error)
+ var groupID uint32
+ mcastFlowOrGroupCb := McastFlowOrGroupControlBlock{
+ ctx: ctx,
+ flowOrGroupAction: action,
+ flow: flow,
+ group: group,
+ errChan: &errChan,
+ }
+ if flow != nil {
+ groupID = flow_utils.GetGroup(flow)
+ } else if group != nil {
+ groupID = group.Desc.GroupId
+ } else {
+ return errors.New("flow-and-group-both-nil")
+ }
+ // Derive the appropriate go routine to handle the request by a simple module operation.
+ // There are only MaxNumOfGroupHandlerChannels number of channels to handle the mcast flow or group
+ dh.incomingMcastFlowOrGroup[groupID%MaxNumOfGroupHandlerChannels] <- mcastFlowOrGroupCb
+ // Wait for handler to return error value
+ err := <-errChan
+ logger.Debugw(ctx, "process-flow-or-group--received-resp", log.Fields{"flow": flow, "group": group, "action": action, "err": err})
+ return err
+}
+
+// mcastFlowOrGroupChannelHandlerRoutine routine to handle incoming mcast flow/group message
+func (dh *DeviceHandler) mcastFlowOrGroupChannelHandlerRoutine(mcastFlowOrGroupChannel chan McastFlowOrGroupControlBlock) {
+ for {
+ // block on the channel to receive an incoming mcast flow/group
+ // process the flow completely before proceeding to handle the next flow
+ mcastFlowOrGroupCb := <-mcastFlowOrGroupChannel
+ if mcastFlowOrGroupCb.flow != nil {
+ if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
+ logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-flow",
+ log.Fields{"device-id": dh.device.Id,
+ "flowToAdd": mcastFlowOrGroupCb.flow})
+ // The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
+ err := dh.flowMgr[0].AddFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow, nil)
+ // Pass the return value over the return channel
+ *mcastFlowOrGroupCb.errChan <- err
+ } else { // flow remove
+ logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-flow",
+ log.Fields{"device-id": dh.device.Id,
+ "flowToRemove": mcastFlowOrGroupCb.flow})
+ // The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
+ err := dh.flowMgr[0].RemoveFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow)
+ // Pass the return value over the return channel
+ *mcastFlowOrGroupCb.errChan <- err
+ }
+ } else { // mcast group
+ if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
+ logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-group",
+ log.Fields{"device-id": dh.device.Id,
+ "groupToAdd": mcastFlowOrGroupCb.group})
+ err := dh.groupMgr.AddGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
+ // Pass the return value over the return channel
+ *mcastFlowOrGroupCb.errChan <- err
+ } else if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupModify { // group modify
+ logger.Debugw(mcastFlowOrGroupCb.ctx, "modifying-mcast-group",
+ log.Fields{"device-id": dh.device.Id,
+ "groupToModify": mcastFlowOrGroupCb.group})
+ err := dh.groupMgr.ModifyGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
+ // Pass the return value over the return channel
+ *mcastFlowOrGroupCb.errChan <- err
+ } else { // group remove
+ logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-group",
+ log.Fields{"device-id": dh.device.Id,
+ "groupToRemove": mcastFlowOrGroupCb.group})
+ err := dh.groupMgr.DeleteGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
+ // Pass the return value over the return channel
+ *mcastFlowOrGroupCb.errChan <- err
+ }
+ }
+ }
+}