[VOL-2786] : Handle flow deletes completely before proceeding to
flow adds for a subscriber.
Also serialize flow deletes on a per-subscriber basis.
Flow adds are already serialized on a per-subscriber basis.
Change-Id: If0a85780f76c3fdc6496074356131c5cdfbfd577
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index b229f82..0ca3a7f 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -22,6 +22,7 @@
"crypto/md5"
"encoding/hex"
"encoding/json"
+ "errors"
"fmt"
"math/big"
"strings"
@@ -1727,7 +1728,7 @@
}
//RemoveFlow removes the flow from the device
-func (f *OpenOltFlowMgr) RemoveFlow(ctx context.Context, flow *ofp.OfpFlowStats) {
+func (f *OpenOltFlowMgr) RemoveFlow(ctx context.Context, flow *ofp.OfpFlowStats) error {
log.Debugw("Removing Flow", log.Fields{"flow": flow})
var direction string
actionInfo := make(map[string]interface{})
@@ -1739,21 +1740,39 @@
log.Debugw("action-type-output", log.Fields{"out_port": actionInfo[Output].(uint32)})
} else {
log.Error("Invalid output port in action")
- return
+ return olterrors.NewErrInvalidValue(log.Fields{"invalid-out-port-action": 0}, nil)
}
}
}
if flows.HasGroup(flow) {
direction = Multicast
+ f.clearFlowFromResourceManager(ctx, flow, direction)
+ return nil
} else if IsUpstream(actionInfo[Output].(uint32)) {
direction = Upstream
} else {
direction = Downstream
}
- f.clearFlowFromResourceManager(ctx, flow, direction) //TODO: Take care of the limitations
- return
+ _, intfID, onuID, uniID, _, _, err := FlowExtractInfo(flow, direction)
+ if err != nil {
+ return err
+ }
+
+ userKey := tpLockKey{intfID, onuID, uniID}
+
+ // Serialize flow removes on a per subscriber basis
+ if f.perUserFlowHandleLock.TryLock(userKey) {
+ f.clearFlowFromResourceManager(ctx, flow, direction) //TODO: Take care of the limitations
+ f.perUserFlowHandleLock.Unlock(userKey)
+ } else {
+ // Ideally this should never happen
+ log.Errorw("failed to acquire lock to remove flow, flow remove aborted", log.Fields{"flow": flow})
+ return errors.New("failed-to-acquire-per-user-lock")
+ }
+
+ return nil
}
func (f *OpenOltFlowMgr) waitForFlowDeletesToCompleteForOnu(ctx context.Context, intfID uint32, onuID uint32,