[VOL-2786] : Handle flow deletes completely before proceeding to
flow adds for a subscriber.
Also serialize flow deletes on a per-subscriber basis.
Flow adds are already serialized on a per-subscriber basis.
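In outline: each subscriber key (intfID, onuID, uniID) tracks a
pending-remove count plus a channel that is closed when the count drops
back to zero, and the flow-add path blocks on that channel. A minimal,
runnable sketch of the pattern follows; the names (tracker, subKey,
pending) are illustrative only and do not appear in this change:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    type subKey struct{ intfID, onuID, uniID uint32 }

    type pending struct {
        count int
        done  chan struct{} // closed once count falls back to zero
    }

    type tracker struct {
        mu sync.Mutex
        m  map[subKey]*pending
    }

    func (t *tracker) inc(k subKey) {
        t.mu.Lock()
        defer t.mu.Unlock()
        p, ok := t.m[k]
        if !ok {
            p = &pending{done: make(chan struct{})}
            t.m[k] = p
        }
        p.count++
    }

    func (t *tracker) dec(k subKey) {
        t.mu.Lock()
        defer t.mu.Unlock()
        p, ok := t.m[k]
        if !ok {
            return
        }
        if p.count > 0 {
            p.count--
        }
        if p.count == 0 {
            close(p.done) // releases every waiter at once
            delete(t.m, k)
        }
    }

    func (t *tracker) wait(k subKey) {
        t.mu.Lock()
        p, ok := t.m[k]
        t.mu.Unlock()
        if !ok {
            return // nothing pending; the flow add can proceed immediately
        }
        <-p.done
    }

    func main() {
        t := &tracker{m: make(map[subKey]*pending)}
        k := subKey{intfID: 0, onuID: 1, uniID: 0}
        t.inc(k)
        go func() {
            time.Sleep(10 * time.Millisecond) // stand-in for a slow flow remove
            t.dec(k)
        }()
        t.wait(k) // the flow-add path blocks here
        fmt.Println("all pending removes done; safe to add flows")
    }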
Change-Id: If0a85780f76c3fdc6496074356131c5cdfbfd577
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 2b0b52a..111a87c 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -21,6 +21,7 @@
"context"
"encoding/hex"
"fmt"
+ "github.com/opencord/voltha-lib-go/v3/pkg/flows"
"io"
"net"
"strconv"
@@ -51,8 +52,24 @@
const (
MaxRetry = 10
MaxTimeOutInMs = 500
+ InvalidPort = 0xffffffff
)
+// pendingFlowRemoveDataKey is the key for the pendingFlowRemoveDataPerSubscriber map
+type pendingFlowRemoveDataKey struct {
+ intfID uint32
+ onuID uint32
+ uniID uint32
+}
+
+// pendingFlowRemoveData is the value stored in the pendingFlowRemoveDataPerSubscriber map.
+// It holds the number of pending flow removes and a signal channel used to
+// notify the receiver when all flow removes have been handled.
+type pendingFlowRemoveData struct {
+ pendingFlowRemoveCount uint32
+ allFlowsRemoved chan struct{}
+}
+
//DeviceHandler will interact with the OLT device.
type DeviceHandler struct {
deviceID string
@@ -80,6 +97,13 @@
stopHeartbeatCheck chan bool
activePorts sync.Map
stopIndications chan bool
+
+ // pendingFlowRemoveDataPerSubscriber map is used to maintain the context on a per
+ // subscriber basis for the number of pending flow removes. This data is used
+ // to process all the flow removes for a subscriber before handling flow adds.
+ // Interleaving flow delete and flow add processing has been known to cause PON resource
+ // management contention on a per-subscriber basis, so we need to ensure ordering.
+ pendingFlowRemoveDataPerSubscriber map[pendingFlowRemoveDataKey]pendingFlowRemoveData
}
//OnuDevice represents ONU related info
@@ -138,6 +162,8 @@
dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
dh.activePorts = sync.Map{}
dh.stopIndications = make(chan bool, 1)
+ dh.pendingFlowRemoveDataPerSubscriber = make(map[pendingFlowRemoveDataKey]pendingFlowRemoveData)
+
//TODO initialize the support classes.
return &dh
}
@@ -1258,12 +1284,21 @@
if flows != nil {
for _, flow := range flows.ToRemove.Items {
+ dh.incrementActiveFlowRemoveCount(flow)
+
log.Debug("Removing flow", log.Fields{"deviceId": device.Id, "flowToRemove": flow})
- dh.flowMgr.RemoveFlow(ctx, flow)
+ err := dh.flowMgr.RemoveFlow(ctx, flow)
+ if err != nil {
+ errorsList = append(errorsList, err)
+ }
+
+ dh.decrementActiveFlowRemoveCount(flow)
}
for _, flow := range flows.ToAdd.Items {
log.Debug("Adding flow", log.Fields{"deviceId": device.Id, "flowToAdd": flow})
+ // If there are active flow removes in progress for a given subscriber, wait until they complete
+ dh.waitForFlowRemoveToFinish(flow)
err := dh.flowMgr.AddFlow(ctx, flow, flowMetadata)
if err != nil {
errorsList = append(errorsList, err)
@@ -1277,6 +1312,7 @@
}
}
+ // TODO: Investigate whether multicast group adds and modifies need the same synchronization as flow adds and deletes
if groups != nil {
for _, group := range groups.ToAdd.Items {
err := dh.flowMgr.AddGroup(ctx, group)
@@ -1910,3 +1946,137 @@
dh.discOnus.Delete(onuDevice.(*OnuDevice).serialNumber)
return nil
}
+
+func getInPortFromFlow(flow *of.OfpFlowStats) uint32 {
+ for _, field := range flows.GetOfbFields(flow) {
+ if field.Type == flows.IN_PORT {
+ return field.GetPort()
+ }
+ }
+ return InvalidPort
+}
+
+func getOutPortFromFlow(flow *of.OfpFlowStats) uint32 {
+ for _, action := range flows.GetActions(flow) {
+ if action.Type == flows.OUTPUT {
+ if out := action.GetOutput(); out != nil {
+ return out.GetPort()
+ }
+ }
+ }
+ return InvalidPort
+}
+
+func (dh *DeviceHandler) incrementActiveFlowRemoveCount(flow *of.OfpFlowStats) {
+ inPort, outPort := getPorts(flow)
+ log.Debugw("increment flow remove count for inPort outPort", log.Fields{"inPort": inPort, "outPort": outPort})
+ if inPort != InvalidPort && outPort != InvalidPort {
+ _, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
+ key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
+ log.Debugw("increment flow remove count for subscriber", log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID})
+
+ dh.lockDevice.Lock()
+ defer dh.lockDevice.Unlock()
+ flowRemoveData, ok := dh.pendingFlowRemoveDataPerSubscriber[key]
+ if !ok {
+ flowRemoveData = pendingFlowRemoveData{
+ pendingFlowRemoveCount: 0,
+ allFlowsRemoved: make(chan struct{}),
+ }
+ }
+ flowRemoveData.pendingFlowRemoveCount++
+ dh.pendingFlowRemoveDataPerSubscriber[key] = flowRemoveData
+
+ log.Debugw("current flow remove count after increment",
+ log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID,
+ "currCnt": dh.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
+ }
+}
+
+func (dh *DeviceHandler) decrementActiveFlowRemoveCount(flow *of.OfpFlowStats) {
+ inPort, outPort := getPorts(flow)
+ log.Debugw("decrement flow remove count for inPort outPort", log.Fields{"inPort": inPort, "outPort": outPort})
+ if inPort != InvalidPort && outPort != InvalidPort {
+ _, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
+ key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
+ log.Debugw("decrement flow remove count for subscriber", log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID})
+
+ dh.lockDevice.Lock()
+ defer dh.lockDevice.Unlock()
+ if val, ok := dh.pendingFlowRemoveDataPerSubscriber[key]; !ok {
+ log.Fatalw("flow remove key not found", log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID})
+ } else {
+ if val.pendingFlowRemoveCount > 0 {
+ val.pendingFlowRemoveCount--
+ }
+ log.Debugw("current flow remove count after decrement",
+ log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID,
+ "currCnt": dh.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
+ // If all flow removes have finished, then close the channel to signal the receiver
+ // to go ahead with flow adds.
+ if val.pendingFlowRemoveCount == 0 {
+ close(val.allFlowsRemoved)
+ delete(dh.pendingFlowRemoveDataPerSubscriber, key)
+ return
+ }
+ dh.pendingFlowRemoveDataPerSubscriber[key] = val
+ }
+ }
+}
+
+func (dh *DeviceHandler) waitForFlowRemoveToFinish(flow *of.OfpFlowStats) {
+ var flowRemoveData pendingFlowRemoveData
+ var ok bool
+ inPort, outPort := getPorts(flow)
+ log.Debugw("wait for flow remove to finish for inPort outPort", log.Fields{"inPort": inPort, "outPort": outPort})
+ if inPort != InvalidPort && outPort != InvalidPort {
+ _, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
+ key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
+ log.Debugw("wait for flow remove to finish for subscriber", log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID})
+
+ dh.lockDevice.RLock()
+ if flowRemoveData, ok = dh.pendingFlowRemoveDataPerSubscriber[key]; !ok {
+ log.Debugw("no pending flow to remove", log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID})
+ dh.lockDevice.RUnlock()
+ return
+ }
+ dh.lockDevice.RUnlock()
+
+ // Wait for all flow removes to finish first
+ <-flowRemoveData.allFlowsRemoved
+
+ log.Debugw("all flows cleared, handling flow add now", log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID})
+ }
+}
+
+func getPorts(flow *of.OfpFlowStats) (uint32, uint32) {
+ inPort := getInPortFromFlow(flow)
+ outPort := getOutPortFromFlow(flow)
+
+ if inPort == InvalidPort || outPort == InvalidPort {
+ return inPort, outPort
+ }
+
+ if IsControllerBoundFlow(outPort) {
+ // Get the UNI port / IN port from the tunnel ID field for upstream controller-bound flows
+ if portType := IntfIDToPortTypeName(inPort); portType == voltha.Port_PON_OLT {
+ if uniPort := flows.GetChildPortFromTunnelId(flow); uniPort != 0 {
+ return uniPort, outPort
+ }
+ }
+ } else {
+ // Downstream flow from NNI to PON port; use tunnel ID as the new OUT port / UNI port
+ if portType := IntfIDToPortTypeName(outPort); portType == voltha.Port_PON_OLT {
+ if uniPort := flows.GetChildPortFromTunnelId(flow); uniPort != 0 {
+ return inPort, uniPort
+ }
+ } else if portType := IntfIDToPortTypeName(inPort); portType == voltha.Port_PON_OLT {
+ // Upstream flow from PON to NNI port; use tunnel ID as the new IN port / UNI port
+ if uniPort := flows.GetChildPortFromTunnelId(flow); uniPort != 0 {
+ return uniPort, outPort
+ }
+ }
+ }
+
+ return InvalidPort, InvalidPort
+}
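To make the branching in getPorts easier to follow, here is a stripped-down,
self-contained sketch of the same decision tree. The isPONPort helper and
the port numbers are stand-in assumptions (the real code uses
IntfIDToPortTypeName and IsControllerBoundFlow), and the tunnel-derived UNI
port is passed in directly and assumed nonzero:

    package main

    import "fmt"

    const (
        invalidPort    = 0xffffffff
        controllerPort = 0xfffffffd // OFPP_CONTROLLER in OpenFlow 1.3
    )

    // isPONPort is a stub for IntfIDToPortTypeName(...) == voltha.Port_PON_OLT;
    // here we simply pretend interface IDs below 16 are PON-side ports.
    func isPONPort(port uint32) bool { return port < 16 }

    // resolve mirrors the three branches of getPorts, with the UNI port that
    // getPorts parses out of the tunnel ID passed in directly.
    func resolve(inPort, outPort, tunnelUNI uint32) (uint32, uint32) {
        switch {
        case outPort == controllerPort && isPONPort(inPort):
            // upstream trap-to-controller: tunnel ID carries the UNI / IN port
            return tunnelUNI, outPort
        case isPONPort(outPort):
            // downstream NNI -> PON: tunnel ID carries the real OUT / UNI port
            return inPort, tunnelUNI
        case isPONPort(inPort):
            // upstream PON -> NNI: tunnel ID carries the real IN / UNI port
            return tunnelUNI, outPort
        }
        return invalidPort, invalidPort
    }

    func main() {
        in, out := resolve(1, 65536, 256) // PON -> NNI, UNI 256 from tunnel ID
        fmt.Println(in, out)              // prints: 256 65536
    }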
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index b229f82..0ca3a7f 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -22,6 +22,7 @@
"crypto/md5"
"encoding/hex"
"encoding/json"
+ "errors"
"fmt"
"math/big"
"strings"
@@ -1727,7 +1728,7 @@
}
//RemoveFlow removes the flow from the device
-func (f *OpenOltFlowMgr) RemoveFlow(ctx context.Context, flow *ofp.OfpFlowStats) {
+func (f *OpenOltFlowMgr) RemoveFlow(ctx context.Context, flow *ofp.OfpFlowStats) error {
log.Debugw("Removing Flow", log.Fields{"flow": flow})
var direction string
actionInfo := make(map[string]interface{})
@@ -1739,21 +1740,39 @@
log.Debugw("action-type-output", log.Fields{"out_port": actionInfo[Output].(uint32)})
} else {
log.Error("Invalid output port in action")
- return
+ return olterrors.NewErrInvalidValue(log.Fields{"invalid-out-port-action": 0}, nil)
}
}
}
if flows.HasGroup(flow) {
direction = Multicast
+ f.clearFlowFromResourceManager(ctx, flow, direction)
+ return nil
} else if IsUpstream(actionInfo[Output].(uint32)) {
direction = Upstream
} else {
direction = Downstream
}
- f.clearFlowFromResourceManager(ctx, flow, direction) //TODO: Take care of the limitations
- return
+ _, intfID, onuID, uniID, _, _, err := FlowExtractInfo(flow, direction)
+ if err != nil {
+ return err
+ }
+
+ userKey := tpLockKey{intfID, onuID, uniID}
+
+ // Serialize flow removes on a per subscriber basis
+ if f.perUserFlowHandleLock.TryLock(userKey) {
+ f.clearFlowFromResourceManager(ctx, flow, direction) //TODO: Take care of the limitations
+ f.perUserFlowHandleLock.Unlock(userKey)
+ } else {
+ // Ideally this should never happen
+ log.Errorw("failed to acquire lock to remove flow, flow remove aborted", log.Fields{"flow": flow})
+ return errors.New("failed-to-acquire-per-user-lock")
+ }
+
+ return nil
}
func (f *OpenOltFlowMgr) waitForFlowDeletesToCompleteForOnu(ctx context.Context, intfID uint32, onuID uint32,
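For context, perUserFlowHandleLock.TryLock above guards the per-subscriber
critical section. A minimal non-blocking per-key try-lock in the same
spirit is sketched below; this is an illustrative sketch under assumed
semantics, not the adapter's actual implementation (which may retry
internally before giving up):

    package main

    import (
        "fmt"
        "sync"
    )

    type userKey struct{ intfID, onuID, uniID uint32 }

    // keyedLock grants at most one holder per key; TryLock never blocks.
    type keyedLock struct {
        mu   sync.Mutex
        held map[userKey]bool
    }

    func newKeyedLock() *keyedLock {
        return &keyedLock{held: make(map[userKey]bool)}
    }

    func (l *keyedLock) TryLock(k userKey) bool {
        l.mu.Lock()
        defer l.mu.Unlock()
        if l.held[k] {
            return false // a flow remove for this subscriber is already running
        }
        l.held[k] = true
        return true
    }

    func (l *keyedLock) Unlock(k userKey) {
        l.mu.Lock()
        defer l.mu.Unlock()
        delete(l.held, k)
    }

    func main() {
        locks := newKeyedLock()
        k := userKey{intfID: 0, onuID: 1, uniID: 0}
        if locks.TryLock(k) {
            // critical section: clear the flow from resource management
            fmt.Println("serialized per-subscriber flow remove")
            locks.Unlock(k)
        } else {
            fmt.Println("remove already in progress for this subscriber")
        }
    }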