VOL-3521 : scale: intermittent issue - voltha complains that different meter is in use for subscriber
- Process incoming flows on a per ONU basis using channels per ONU
Change-Id: I0f375d90d786a0135bb51ce18036e5297dc7297b
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index b87ca12..a098387 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -1424,26 +1424,25 @@
if flows != nil {
for _, flow := range flows.ToRemove.Items {
- ponIf := dh.getPonIfFromFlow(ctx, flow)
+ ponIf := dh.getPonIfFromFlow(flow)
logger.Debugw(ctx, "removing-flow",
log.Fields{"device-id": device.Id,
"ponIf": ponIf,
"flowToRemove": flow})
- err := dh.flowMgr[ponIf].RemoveFlow(ctx, flow)
+ err := dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, false, nil)
if err != nil {
errorsList = append(errorsList, err)
}
}
for _, flow := range flows.ToAdd.Items {
- ponIf := dh.getPonIfFromFlow(ctx, flow)
+ ponIf := dh.getPonIfFromFlow(flow)
logger.Debugw(ctx, "adding-flow",
log.Fields{"device-id": device.Id,
"ponIf": ponIf,
"flowToAdd": flow})
- // If there are active Flow Remove in progress for a given subscriber, wait until it completes
- err := dh.flowMgr[ponIf].AddFlow(ctx, flow, flowMetadata)
+ err := dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, true, flowMetadata)
if err != nil {
errorsList = append(errorsList, err)
}
@@ -2082,15 +2081,6 @@
"serial-number": onuDevice.(*OnuDevice).serialNumber}, err).Log()
}
- for uniID := 0; uniID < MaxUnisPerOnu; uniID++ {
- logger.Debugw(ctx, "wait-for-flow-remove-complete-before-processing-child-device-lost",
- log.Fields{"int-id": intfID, "onu-id": onuID, "uni-id": uniID})
- dh.flowMgr[intfID].WaitForFlowRemoveToFinishForSubscriber(ctx, intfID, onuID, uint32(uniID))
- logger.Debugw(ctx, "flow-removes-complete-for-subscriber",
- log.Fields{"int-id": intfID, "onu-id": onuID, "uni-id": uniID})
- // TODO: Would be good to delete the subscriber entry from flowMgr.pendingFlowRemoveDataPerSubscriber map
- }
-
onu := &oop.Onu{IntfId: intfID, OnuId: onuID, SerialNumber: sn}
if _, err := dh.Client.DeleteOnu(log.WithSpanFromContext(context.Background(), ctx), onu); err != nil {
return olterrors.NewErrAdapter("failed-to-delete-onu", log.Fields{
@@ -2245,7 +2235,7 @@
return resp, nil
}
-func (dh *DeviceHandler) getPonIfFromFlow(ctx context.Context, flow *of.OfpFlowStats) uint32 {
+func (dh *DeviceHandler) getPonIfFromFlow(flow *of.OfpFlowStats) uint32 {
// Default to PON0
var intfID uint32
inPort, outPort := getPorts(flow)
diff --git a/internal/pkg/core/device_handler_test.go b/internal/pkg/core/device_handler_test.go
index 311a371..68309ad 100644
--- a/internal/pkg/core/device_handler_test.go
+++ b/internal/pkg/core/device_handler_test.go
@@ -44,7 +44,7 @@
)
const (
- NumPonPorts = 2
+ NumPonPorts = 16
OnuIDStart = 1
OnuIDEnd = 32
AllocIDStart = 1
@@ -160,7 +160,7 @@
openOLT := &OpenOLT{coreProxy: cp, adapterProxy: ap, eventProxy: ep, config: cfg}
dh := NewDeviceHandler(cp, ap, ep, device, openOLT)
oopRanges := []*oop.DeviceInfo_DeviceResourceRanges{{
- IntfIds: []uint32{0, 1},
+ IntfIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
Technology: "xgs-pon",
Pools: []*oop.DeviceInfo_DeviceResourceRanges_Pool{{}},
}}
@@ -193,15 +193,16 @@
ponmgr := &ponrmgr.PONResourceManager{
DeviceID: "onu-1",
- IntfIDs: []uint32{0, 1},
+ IntfIDs: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
KVStore: &db.Backend{
Client: &mocks.MockKVClient{},
},
PonResourceRanges: ranges,
SharedIdxByType: sharedIdxByType,
}
- dh.resourceMgr.ResourceMgrs[0] = ponmgr
- dh.resourceMgr.ResourceMgrs[1] = ponmgr
+ for i := 0; i < NumPonPorts; i++ {
+ dh.resourceMgr.ResourceMgrs[uint32(i)] = ponmgr
+ }
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr)
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index e1bc9aa..009b43c 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -35,7 +35,6 @@
"strings"
"sync"
- "github.com/EagleChen/mapmutex"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -141,23 +140,13 @@
//NoneUniID constant
NoneUniID = -1
- // MapMutex
- maxRetry = 300
- maxDelay = 100000000
- baseDelay = 10000000
- factor = 1.1
- jitter = 0.2
+ // Max number of flows that can be queued per ONU
+ maxConcurrentFlowsPerOnu = 20
bitMapPrefix = "0b"
pbit1 = '1'
)
-type tpLockKey struct {
- intfID uint32
- onuID uint32
- uniID uint32
-}
-
type schedQueue struct {
direction tp_pb.Direction
intfID uint32
@@ -170,13 +159,6 @@
flowMetadata *voltha.FlowMetadata
}
-// pendingFlowRemoveDataKey is key to pendingFlowRemoveDataPerSubscriber map
-type pendingFlowRemoveDataKey struct {
- intfID uint32
- onuID uint32
- uniID uint32
-}
-
// subscriberDataPathFlowIDKey is key to subscriberDataPathFlowIDMap map
type subscriberDataPathFlowIDKey struct {
intfID uint32
@@ -186,12 +168,16 @@
tpID uint32
}
-// pendingFlowRemoveData is value stored in pendingFlowRemoveDataPerSubscriber map
-// This holds the number of pending flow removes and also a signal channel to
-// to indicate the receiver when all flow removes are handled
-type pendingFlowRemoveData struct {
- pendingFlowRemoveCount uint32
- allFlowsRemoved chan struct{}
+// This control block is created per flow add/remove and pushed on the incomingFlows channel slice
+// The flowControlBlock is then picked by the perOnuFlowHandlerRoutine for further processing.
+// There is on perOnuFlowHandlerRoutine routine per ONU that constantly monitors for any incoming
+// flow and processes it serially
+type flowControlBlock struct {
+ ctx context.Context // Flow handler context
+ addFlow bool // if true flow to be added, else removed
+ flow *voltha.OfpFlowStats // Flow message
+ flowMetadata *voltha.FlowMetadata // FlowMetadata that contains flow meter information. This can be nil for Flow remove
+ errChan *chan error // channel to report the Flow handling error
}
//OpenOltFlowMgr creates the Structure of OpenOltFlowMgr obj
@@ -215,21 +201,14 @@
// We need to have a global lock on the onuGemInfo map
onuGemInfoLock sync.RWMutex
- // The mapmutex.Mutex can be fine tuned to use mapmutex.NewCustomizedMapMutex
- perUserFlowHandleLock *mapmutex.Mutex
-
- // pendingFlowRemoveDataPerSubscriber map is used to maintain the context on a per
- // subscriber basis for the number of pending flow removes. This data is used
- // to process all the flow removes for a subscriber before handling flow adds.
- // Interleaving flow delete and flow add processing has known to cause PON resource
- // management contentions on a per subscriber bases, so we need ensure ordering.
- pendingFlowRemoveDataPerSubscriber map[pendingFlowRemoveDataKey]pendingFlowRemoveData
- pendingFlowRemoveDataPerSubscriberLock sync.RWMutex
-
// Map of voltha flowID associated with subscriberDataPathFlowIDKey
// This information is not persisted on Kv store and hence should be reconciled on adapter restart
subscriberDataPathFlowIDMap map[subscriberDataPathFlowIDKey]uint64
subscriberDataPathFlowIDMapLock sync.RWMutex
+
+ // Slice of channels. Each channel in slice, index by ONU ID, queues flows per ONU.
+ // A go routine per ONU, waits on the unique channel (indexed by ONU ID) for incoming flows (add/remove)
+ incomingFlows []chan flowControlBlock
}
//NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
@@ -248,15 +227,24 @@
return nil
}
flowMgr.onuIdsLock = sync.RWMutex{}
- flowMgr.pendingFlowRemoveDataPerSubscriberLock = sync.RWMutex{}
flowMgr.flowsUsedByGemPort = make(map[uint32][]uint64)
flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
flowMgr.packetInGemPortLock = sync.RWMutex{}
flowMgr.onuGemInfoLock = sync.RWMutex{}
- flowMgr.perUserFlowHandleLock = mapmutex.NewCustomizedMapMutex(maxRetry, maxDelay, baseDelay, factor, jitter)
- flowMgr.pendingFlowRemoveDataPerSubscriber = make(map[pendingFlowRemoveDataKey]pendingFlowRemoveData)
flowMgr.subscriberDataPathFlowIDMap = make(map[subscriberDataPathFlowIDKey]uint64)
flowMgr.subscriberDataPathFlowIDMapLock = sync.RWMutex{}
+
+ // Create a slice of buffered channels for handling concurrent flows per ONU.
+ // The additional entry (+1) is to handle the NNI trap flows on a separate channel from individual ONUs channel
+ flowMgr.incomingFlows = make([]chan flowControlBlock, MaxOnusPerPon+1)
+ for i := range flowMgr.incomingFlows {
+ flowMgr.incomingFlows[i] = make(chan flowControlBlock, maxConcurrentFlowsPerOnu)
+ // Spin up a go routine to handling incoming flows (add/remove).
+ // There will be on go routine per ONU.
+ // This routine will be blocked on the flowMgr.incomingFlows[onu-id] channel for incoming flows.
+ go flowMgr.perOnuFlowHandlerRoutine(flowMgr.incomingFlows[i])
+ }
+
//Load the onugem info cache from kv store on flowmanager start
if flowMgr.onuGemInfo, err = rMgr.GetOnuGemInfo(ctx, ponPortIdx); err != nil {
logger.Error(ctx, "failed-to-load-onu-gem-info-cache")
@@ -337,51 +325,36 @@
"uni": uni,
"device-id": f.deviceHandler.device.Id})
- tpLockMapKey := tpLockKey{intfID, onuID, uniID}
- if f.perUserFlowHandleLock.TryLock(tpLockMapKey) {
- logger.Debugw(ctx, "dividing-flow-create-tcont-gem-ports", log.Fields{
- "device-id": f.deviceHandler.device.Id,
- "intf-id": intfID,
- "onu-id": onuID,
- "uni-id": uniID,
- "port-no": portNo,
- "classifier": classifierInfo,
- "action": actionInfo,
- "usmeter-id": UsMeterID,
- "dsmeter-id": DsMeterID,
- "tp-id": TpID})
- allocID, gemPorts, TpInst = f.createTcontGemports(ctx, intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
- if allocID == 0 || gemPorts == nil || TpInst == nil {
- logger.Error(ctx, "alloc-id-gem-ports-tp-unavailable")
- f.perUserFlowHandleLock.Unlock(tpLockMapKey)
- return olterrors.NewErrNotFound(
- "alloc-id-gem-ports-tp-unavailable",
- nil, nil)
- }
- args := make(map[string]uint32)
- args[IntfID] = intfID
- args[OnuID] = onuID
- args[UniID] = uniID
- args[PortNo] = portNo
- args[AllocID] = allocID
-
- /* Flows can be added specific to gemport if p-bits are received.
- * If no pbit mentioned then adding flows for all gemports
- */
- f.checkAndAddFlow(ctx, args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
- f.perUserFlowHandleLock.Unlock(tpLockMapKey)
- } else {
- cause := "failed-to-acquire-per-user-flow-handle-lock"
- fields := log.Fields{
- "intf-id": intfID,
- "onu-id": onuID,
- "uni-id": uniID,
- "flow-id": flow.Id,
- "flow-cookie": flow.Cookie,
- "device-id": f.deviceHandler.device.Id}
- logger.Errorw(ctx, cause, fields)
- return olterrors.NewErrAdapter(cause, fields, nil)
+ logger.Debugw(ctx, "dividing-flow-create-tcont-gem-ports", log.Fields{
+ "device-id": f.deviceHandler.device.Id,
+ "intf-id": intfID,
+ "onu-id": onuID,
+ "uni-id": uniID,
+ "port-no": portNo,
+ "classifier": classifierInfo,
+ "action": actionInfo,
+ "usmeter-id": UsMeterID,
+ "dsmeter-id": DsMeterID,
+ "tp-id": TpID})
+ allocID, gemPorts, TpInst = f.createTcontGemports(ctx, intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
+ if allocID == 0 || gemPorts == nil || TpInst == nil {
+ logger.Error(ctx, "alloc-id-gem-ports-tp-unavailable")
+ return olterrors.NewErrNotFound(
+ "alloc-id-gem-ports-tp-unavailable",
+ nil, nil)
}
+ args := make(map[string]uint32)
+ args[IntfID] = intfID
+ args[OnuID] = onuID
+ args[UniID] = uniID
+ args[PortNo] = portNo
+ args[AllocID] = allocID
+
+ /* Flows can be added specific to gemport if p-bits are received.
+ * If no pbit mentioned then adding flows for all gemports
+ */
+ f.checkAndAddFlow(ctx, args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
+
return nil
}
@@ -676,10 +649,18 @@
return olterrors.NewErrAdapter("unable-to-remove-traffic-schedulers-from-device",
log.Fields{
"intf-id": sq.intfID,
- "traffic-schedulers": TrafficSched}, err)
+ "traffic-schedulers": TrafficSched,
+ "onu-id": sq.onuID,
+ "uni-id": sq.uniID,
+ "uni-port": sq.uniPort}, err)
}
- logger.Infow(ctx, "removed-traffic-schedulers-successfully", log.Fields{"device-id": f.deviceHandler.device.Id})
+ logger.Infow(ctx, "removed-traffic-schedulers-successfully",
+ log.Fields{"device-id": f.deviceHandler.device.Id,
+ "intf-id": sq.intfID,
+ "onu-id": sq.onuID,
+ "uni-id": sq.uniID,
+ "uni-port": sq.uniPort})
/* After we successfully remove the scheduler configuration on the OLT device,
* delete the meter id on the KV store.
@@ -690,13 +671,21 @@
log.Fields{
"onu": sq.onuID,
"meter": KVStoreMeter.MeterId,
- "device-id": f.deviceHandler.device.Id}, err)
+ "device-id": f.deviceHandler.device.Id,
+ "intf-id": sq.intfID,
+ "onu-id": sq.onuID,
+ "uni-id": sq.uniID,
+ "uni-port": sq.uniPort}, err)
}
logger.Infow(ctx, "removed-meter-from-KV-store-successfully",
log.Fields{
"meter-id": KVStoreMeter.MeterId,
"dir": Direction,
- "device-id": f.deviceHandler.device.Id})
+ "device-id": f.deviceHandler.device.Id,
+ "intf-id": sq.intfID,
+ "onu-id": sq.onuID,
+ "uni-id": sq.uniID,
+ "uni-port": sq.uniPort})
return err
}
@@ -710,7 +699,6 @@
allocIDs = f.resourceMgr.GetCurrentAllocIDsForOnu(ctx, intfID, onuID, uniID)
allgemPortIDs = f.resourceMgr.GetCurrentGEMPortIDsForOnu(ctx, intfID, onuID, uniID)
-
tpPath := f.getTPpath(ctx, intfID, uni, TpID)
logger.Debugw(ctx, "creating-new-tcont-and-gem", log.Fields{
@@ -2058,9 +2046,6 @@
//RemoveFlow removes the flow from the device
func (f *OpenOltFlowMgr) RemoveFlow(ctx context.Context, flow *ofp.OfpFlowStats) error {
- f.incrementActiveFlowRemoveCount(ctx, flow)
- defer f.decrementActiveFlowRemoveCount(ctx, flow)
-
logger.Infow(ctx, "removing-flow", log.Fields{"flow": *flow})
var direction string
actionInfo := make(map[string]interface{})
@@ -2086,22 +2071,8 @@
direction = Downstream
}
- _, intfID, onuID, uniID, _, _, err := FlowExtractInfo(ctx, flow, direction)
- if err != nil {
- return err
- }
-
- userKey := tpLockKey{intfID, onuID, uniID}
-
// Serialize flow removes on a per subscriber basis
- if f.perUserFlowHandleLock.TryLock(userKey) {
- err = f.clearFlowFromDeviceAndResourceManager(ctx, flow, direction)
- f.perUserFlowHandleLock.Unlock(userKey)
- } else {
- // Ideally this should never happen
- logger.Errorw(ctx, "failed-to-acquire-lock-to-remove-flow--remove-aborted", log.Fields{"flow": flow})
- return errors.New("failed-to-acquire-per-user-lock")
- }
+ err := f.clearFlowFromDeviceAndResourceManager(ctx, flow, direction)
return err
}
@@ -2122,6 +2093,61 @@
return false
}
+// RouteFlowToOnuChannel routes incoming flow to ONU specific channel
+func (f *OpenOltFlowMgr) RouteFlowToOnuChannel(ctx context.Context, flow *voltha.OfpFlowStats, addFlow bool, flowMetadata *voltha.FlowMetadata) error {
+ // Step1 : Fill flowControlBlock
+ // Step2 : Push the flowControlBlock to ONU channel
+ // Step3 : Wait on response channel for response
+ // Step4 : Return error value
+ logger.Debugw(ctx, "process-flow", log.Fields{"flow": flow, "addFlow": addFlow})
+ errChan := make(chan error)
+ flowCb := flowControlBlock{
+ ctx: ctx,
+ addFlow: addFlow,
+ flow: flow,
+ flowMetadata: flowMetadata,
+ errChan: &errChan,
+ }
+ inPort, outPort := getPorts(flow)
+ var onuID uint32
+ if inPort != InvalidPort && outPort != InvalidPort {
+ _, _, onuID, _ = ExtractAccessFromFlow(inPort, outPort)
+ }
+ // inPort or outPort is InvalidPort for trap-from-nni flows.
+ // In the that case onuID is 0 which is the reserved index for trap-from-nni flows in the f.incomingFlows slice
+ // Send the flowCb on the ONU flow channel
+ f.incomingFlows[onuID] <- flowCb
+ // Wait on the channel for flow handlers return value
+ err := <-errChan
+ logger.Debugw(ctx, "process-flow--received-resp", log.Fields{"flow": flow, "addFlow": addFlow, "err": err})
+ return err
+}
+
+// This routine is unique per ONU ID and blocks on flowControlBlock channel for incoming flows
+// Each incoming flow is processed in a synchronous manner, i.e., the flow is processed to completion before picking another
+func (f *OpenOltFlowMgr) perOnuFlowHandlerRoutine(subscriberFlowChannel chan flowControlBlock) {
+ for {
+ // block on the channel to receive an incoming flow
+ // process the flow completely before proceeding to handle the next flow
+ flowCb := <-subscriberFlowChannel
+ if flowCb.addFlow {
+ logger.Debugw(flowCb.ctx, "adding-flow",
+ log.Fields{"device-id": f.deviceHandler.device.Id,
+ "flowToAdd": flowCb.flow})
+ err := f.AddFlow(flowCb.ctx, flowCb.flow, flowCb.flowMetadata)
+ // Pass the return value over the return channel
+ *flowCb.errChan <- err
+ } else {
+ logger.Debugw(flowCb.ctx, "removing-flow",
+ log.Fields{"device-id": f.deviceHandler.device.Id,
+ "flowToRemove": flowCb.flow})
+ err := f.RemoveFlow(flowCb.ctx, flowCb.flow)
+ // Pass the return value over the return channel
+ *flowCb.errChan <- err
+ }
+ }
+}
+
// AddFlow add flow to device
// nolint: gocyclo
func (f *OpenOltFlowMgr) AddFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) error {
@@ -2182,11 +2208,6 @@
return f.addIgmpTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
}
- // If we are here it is not a trap-from-nni flow, i.e., it is subscriber specific flow.
- // Wait for any FlowRemoves for that specific subscriber to finish first
- // The goal here is to serialize FlowRemove and FlowAdd. FlowRemove take priority
- f.waitForFlowRemoveToFinish(ctx, flow)
-
f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNo)
TpID, err := getTpIDFromFlow(ctx, flow)
@@ -2215,28 +2236,6 @@
return f.processAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, TpID, UsMeterID, DsMeterID, flowMetadata)
}
-//WaitForFlowRemoveToFinishForSubscriber blocks until flow removes are complete for a given subscriber
-func (f *OpenOltFlowMgr) WaitForFlowRemoveToFinishForSubscriber(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) {
- var flowRemoveData pendingFlowRemoveData
- var ok bool
-
- key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
- logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
- f.pendingFlowRemoveDataPerSubscriberLock.RLock()
- if flowRemoveData, ok = f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
- logger.Debugw(ctx, "no-pending-flow-to-remove", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
- f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
- return
- }
- f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
-
- // Wait for all flow removes to finish first
- <-flowRemoveData.allFlowsRemoved
-
- logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-}
-
// handleFlowWithGroup adds multicast flow to the device.
func (f *OpenOltFlowMgr) handleFlowWithGroup(ctx context.Context, actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) error {
classifierInfo[PacketTagType] = DoubleTag
@@ -2430,8 +2429,6 @@
"onu-gem": f.onuGemInfo})
}
-// This function Lookup maps by serialNumber or (intfId, gemPort)
-
//getOnuIDfromGemPortMap Returns OnuID,nil if found or set 0,error if no onuId is found for serialNumber or (intfId, gemPort)
func (f *OpenOltFlowMgr) getOnuIDfromGemPortMap(ctx context.Context, intfID uint32, gemPortID uint32) (uint32, error) {
@@ -3263,91 +3260,6 @@
return f.resourceMgr.RemoveFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flowID)
}
-func (f *OpenOltFlowMgr) incrementActiveFlowRemoveCount(ctx context.Context, flow *ofp.OfpFlowStats) {
-
- f.pendingFlowRemoveDataPerSubscriberLock.Lock()
- defer f.pendingFlowRemoveDataPerSubscriberLock.Unlock()
-
- inPort, outPort := getPorts(flow)
- logger.Debugw(ctx, "increment-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
- if inPort != InvalidPort && outPort != InvalidPort {
- _, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
- key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
- logger.Debugw(ctx, "increment-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
- flowRemoveData, ok := f.pendingFlowRemoveDataPerSubscriber[key]
- if !ok {
- flowRemoveData = pendingFlowRemoveData{
- pendingFlowRemoveCount: 0,
- allFlowsRemoved: make(chan struct{}),
- }
- }
- flowRemoveData.pendingFlowRemoveCount++
- f.pendingFlowRemoveDataPerSubscriber[key] = flowRemoveData
-
- logger.Debugw(ctx, "current-flow-remove-count–increment",
- log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
- "currCnt": f.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
- }
-}
-
-func (f *OpenOltFlowMgr) decrementActiveFlowRemoveCount(ctx context.Context, flow *ofp.OfpFlowStats) {
- f.pendingFlowRemoveDataPerSubscriberLock.Lock()
- defer f.pendingFlowRemoveDataPerSubscriberLock.Unlock()
-
- inPort, outPort := getPorts(flow)
- logger.Debugw(ctx, "decrement-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
- if inPort != InvalidPort && outPort != InvalidPort {
- _, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
- key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
- logger.Debugw(ctx, "decrement-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
- if val, ok := f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
- logger.Fatalf(ctx, "flow-remove-key-not-found", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
- } else {
- if val.pendingFlowRemoveCount > 0 {
- val.pendingFlowRemoveCount--
- }
- logger.Debugw(ctx, "current-flow-remove-count-after-decrement",
- log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
- "currCnt": f.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
- // If all flow removes have finished, then close the channel to signal the receiver
- // to go ahead with flow adds.
- if val.pendingFlowRemoveCount == 0 {
- close(val.allFlowsRemoved)
- delete(f.pendingFlowRemoveDataPerSubscriber, key)
- return
- }
- f.pendingFlowRemoveDataPerSubscriber[key] = val
- }
- }
-}
-
-func (f *OpenOltFlowMgr) waitForFlowRemoveToFinish(ctx context.Context, flow *ofp.OfpFlowStats) {
- var flowRemoveData pendingFlowRemoveData
- var ok bool
- inPort, outPort := getPorts(flow)
- logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
- if inPort != InvalidPort && outPort != InvalidPort {
- _, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
- key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
- logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
- f.pendingFlowRemoveDataPerSubscriberLock.RLock()
- if flowRemoveData, ok = f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
- logger.Debugw(ctx, "no-pending-flow-to-remove", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
- f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
- return
- }
- f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
-
- // Wait for all flow removes to finish first
- <-flowRemoveData.allFlowsRemoved
-
- logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
- }
-}
-
// reconcileSubscriberDataPathFlowIDMap reconciles subscriberDataPathFlowIDMap from KV store
func (f *OpenOltFlowMgr) reconcileSubscriberDataPathFlowIDMap(ctx context.Context) {
onuGemInfo, err := f.resourceMgr.GetOnuGemInfo(ctx, f.ponPortIdx)
diff --git a/internal/pkg/core/openolt_flowmgr_test.go b/internal/pkg/core/openolt_flowmgr_test.go
index 9464830..d8caa38 100644
--- a/internal/pkg/core/openolt_flowmgr_test.go
+++ b/internal/pkg/core/openolt_flowmgr_test.go
@@ -927,37 +927,37 @@
actionInfo3 := make(map[string]interface{})
classifierInfo4 := make(map[string]interface{})
actionInfo4 := make(map[string]interface{})
- flowState, _ := fu.MkFlowStat(fa)
- flowState2, _ := fu.MkFlowStat(fa2)
- flowState3, _ := fu.MkFlowStat(fa3)
- flowState4, _ := fu.MkFlowStat(fa4)
- formulateClassifierInfoFromFlow(ctx, classifierInfo, flowState)
- formulateClassifierInfoFromFlow(ctx, classifierInfo2, flowState2)
- formulateClassifierInfoFromFlow(ctx, classifierInfo3, flowState3)
- formulateClassifierInfoFromFlow(ctx, classifierInfo4, flowState4)
+ flow, _ := fu.MkFlowStat(fa)
+ flow2, _ := fu.MkFlowStat(fa2)
+ flow3, _ := fu.MkFlowStat(fa3)
+ flow4, _ := fu.MkFlowStat(fa4)
+ formulateClassifierInfoFromFlow(ctx, classifierInfo, flow)
+ formulateClassifierInfoFromFlow(ctx, classifierInfo2, flow2)
+ formulateClassifierInfoFromFlow(ctx, classifierInfo3, flow3)
+ formulateClassifierInfoFromFlow(ctx, classifierInfo4, flow4)
- err := formulateActionInfoFromFlow(ctx, actionInfo, classifierInfo, flowState)
+ err := formulateActionInfoFromFlow(ctx, actionInfo, classifierInfo, flow)
if err != nil {
// Error logging is already done in the called function
// So just return in case of error
return
}
- err = formulateActionInfoFromFlow(ctx, actionInfo2, classifierInfo2, flowState2)
+ err = formulateActionInfoFromFlow(ctx, actionInfo2, classifierInfo2, flow2)
if err != nil {
// Error logging is already done in the called function
// So just return in case of error
return
}
- err = formulateActionInfoFromFlow(ctx, actionInfo3, classifierInfo3, flowState3)
+ err = formulateActionInfoFromFlow(ctx, actionInfo3, classifierInfo3, flow3)
if err != nil {
// Error logging is already done in the called function
// So just return in case of error
return
}
- err = formulateActionInfoFromFlow(ctx, actionInfo4, classifierInfo4, flowState4)
+ err = formulateActionInfoFromFlow(ctx, actionInfo4, classifierInfo4, flow4)
if err != nil {
// Error logging is already done in the called function
// So just return in case of error
@@ -1035,7 +1035,7 @@
args: nil,
classifierInfo: classifierInfo,
actionInfo: actionInfo,
- flow: flowState,
+ flow: flow,
gemPort: 1,
intfID: 1,
onuID: 1,
@@ -1054,7 +1054,7 @@
args: nil,
classifierInfo: classifierInfo2,
actionInfo: actionInfo2,
- flow: flowState2,
+ flow: flow2,
gemPort: 1,
intfID: 1,
onuID: 1,
@@ -1073,7 +1073,7 @@
args: nil,
classifierInfo: classifierInfo3,
actionInfo: actionInfo3,
- flow: flowState3,
+ flow: flow3,
gemPort: 1,
intfID: 1,
onuID: 1,
@@ -1092,7 +1092,7 @@
args: nil,
classifierInfo: classifierInfo4,
actionInfo: actionInfo4,
- flow: flowState4,
+ flow: flow4,
gemPort: 1,
intfID: 1,
onuID: 1,
@@ -1168,3 +1168,301 @@
return
}
}
+
+func TestOpenOltFlowMgr_TestRouteFlowToOnuChannel(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ log.SetPackageLogLevel("github.com/opencord/voltha-openolt-adapter/internal/pkg/core", log.DebugLevel)
+ log.SetPackageLogLevel("github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager", log.DebugLevel)
+ log.SetPackageLogLevel("github.com/opencord/voltha-openolt-adapter/pkg/mocks", log.DebugLevel)
+ kw := make(map[string]uint64)
+ kw["table_id"] = 1
+ kw["meter_id"] = 1
+ kw["write_metadata"] = 0x4000000000 // Tech-Profile-ID 64
+
+ flowMetadata1 := voltha.FlowMetadata{Meters: []*voltha.OfpMeterConfig{
+ {
+ Flags: 5,
+ MeterId: 1,
+ Bands: []*voltha.OfpMeterBandHeader{
+ {
+ Type: voltha.OfpMeterBandType_OFPMBT_DROP,
+ Rate: 16000,
+ BurstSize: 30,
+ },
+ {
+ Type: voltha.OfpMeterBandType_OFPMBT_DROP,
+ Rate: 32000,
+ BurstSize: 30,
+ },
+ {
+ Type: voltha.OfpMeterBandType_OFPMBT_DROP,
+ Rate: 64000,
+ BurstSize: 30,
+ },
+ },
+ },
+ }}
+
+ flowMetadata2 := voltha.FlowMetadata{Meters: []*voltha.OfpMeterConfig{
+ {
+ Flags: 5,
+ MeterId: 2,
+ Bands: []*voltha.OfpMeterBandHeader{
+ {
+ Type: voltha.OfpMeterBandType_OFPMBT_DROP,
+ Rate: 16000,
+ BurstSize: 30,
+ },
+ },
+ },
+ }}
+
+ // Downstream LLDP Trap from NNI0 flow
+ fa0 := &fu.FlowArgs{
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(1048576),
+ fu.EthType(35020),
+ },
+ Actions: []*ofp.OfpAction{
+ fu.Output(4294967293),
+ },
+ KV: kw,
+ }
+
+ // Upstream flow DHCP flow - ONU1 UNI0 PON0
+ fa1 := &fu.FlowArgs{
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(536870912),
+ fu.Metadata_ofp(1),
+ fu.IpProto(17), // dhcp
+ fu.VlanPcp(0),
+ fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+ fu.TunnelId(16),
+ },
+ Actions: []*ofp.OfpAction{
+ //fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
+ fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 257)),
+ fu.Output(2147483645),
+ fu.PushVlan(0x8100),
+ },
+ KV: kw,
+ }
+
+ // Upstream EAPOL - ONU1 UNI0 PON0
+ fa2 := &fu.FlowArgs{
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(536870912),
+ fu.Metadata_ofp(1),
+ fu.EthType(0x888E),
+ fu.VlanPcp(1),
+ fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 257),
+ fu.TunnelId(16),
+ },
+ Actions: []*ofp.OfpAction{
+ //fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
+ fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 257)),
+ fu.Output(2147483645),
+ fu.PushVlan(0x8100),
+ },
+ KV: kw,
+ }
+
+ // Upstream HSIA - ONU1 UNI0 PON0
+ fa3 := &fu.FlowArgs{
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(536870912),
+ fu.Metadata_ofp(1),
+ //fu.EthType(0x8100),
+ fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+ },
+ Actions: []*ofp.OfpAction{
+ //fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
+ fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0)),
+ fu.Output(1048576),
+ fu.PushVlan(0x8100),
+ },
+ KV: kw,
+ }
+
+ // Downstream HSIA - ONU1 UNI0 PON0
+ fa4 := &fu.FlowArgs{
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(1048576),
+ fu.Metadata_ofp(1),
+ fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+ fu.VlanPcp(1),
+ },
+ Actions: []*ofp.OfpAction{
+ //fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
+ fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0)),
+ fu.Output(536870912),
+ fu.PopVlan(),
+ },
+ KV: kw,
+ }
+
+ // Upstream flow DHCP flow - ONU1 UNI0 PON15
+ fa5 := &fu.FlowArgs{
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(536870927),
+ fu.Metadata_ofp(1),
+ fu.IpProto(17), // dhcp
+ fu.VlanPcp(0),
+ fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+ fu.TunnelId(61456),
+ },
+ Actions: []*ofp.OfpAction{
+ //fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
+ fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 259)),
+ fu.Output(2147483645),
+ fu.PushVlan(0x8100),
+ },
+ KV: kw,
+ }
+ // Upstream EAPOL - ONU1 UNI0 PON15
+ fa6 := &fu.FlowArgs{
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(536870927),
+ fu.Metadata_ofp(1),
+ fu.EthType(0x888E),
+ fu.VlanPcp(1),
+ fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 259),
+ fu.TunnelId(61456),
+ },
+ Actions: []*ofp.OfpAction{
+ //fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
+ fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 257)),
+ fu.Output(2147483645),
+ fu.PushVlan(0x8100),
+ },
+ KV: kw,
+ }
+ flow0, _ := fu.MkFlowStat(fa0)
+ flow1, _ := fu.MkFlowStat(fa1)
+ flow2, _ := fu.MkFlowStat(fa2)
+ flow3, _ := fu.MkFlowStat(fa3)
+ flow4, _ := fu.MkFlowStat(fa4)
+
+ flow5, _ := fu.MkFlowStat(fa5)
+ flow6, _ := fu.MkFlowStat(fa6)
+
+ type args struct {
+ ctx context.Context
+ flow *ofp.OfpFlowStats
+ addFlow bool
+ flowMetadata *voltha.FlowMetadata
+ }
+ tests := []struct {
+ name string
+ args args
+ wantErr bool
+ returnedErr error
+ }{
+ {
+ name: "RouteFlowToOnuChannel-0",
+ args: args{
+ ctx: ctx,
+ flow: flow0,
+ addFlow: true,
+ flowMetadata: &flowMetadata1,
+ },
+ wantErr: false,
+ },
+ {
+ name: "RouteFlowToOnuChannel-1",
+ args: args{
+ ctx: ctx,
+ flow: flow1,
+ addFlow: true,
+ flowMetadata: &flowMetadata1,
+ },
+ wantErr: false,
+ },
+ {
+ name: "RouteFlowToOnuChannel-2",
+ args: args{
+ ctx: ctx,
+ flow: flow2,
+ addFlow: true,
+ flowMetadata: &flowMetadata1,
+ },
+ wantErr: false,
+ },
+ {
+ name: "RouteFlowToOnuChannel-3",
+ args: args{
+ ctx: ctx,
+ flow: flow3,
+ addFlow: true,
+ flowMetadata: &flowMetadata1,
+ },
+ wantErr: false,
+ },
+ {
+ name: "RouteFlowToOnuChannel-4",
+ args: args{
+ ctx: ctx,
+ flow: flow4,
+ addFlow: true,
+ flowMetadata: &flowMetadata1,
+ },
+ wantErr: false,
+ },
+ {
+ name: "RouteFlowToOnuChannel-5",
+ args: args{
+ ctx: ctx,
+ flow: flow1,
+ addFlow: false,
+ flowMetadata: &flowMetadata1,
+ },
+ wantErr: false,
+ },
+ {
+ name: "RouteFlowToOnuChannel-6",
+ args: args{
+ ctx: ctx,
+ flow: flow1,
+ addFlow: true,
+ flowMetadata: &flowMetadata2,
+ },
+ wantErr: true,
+ },
+ {
+ name: "RouteFlowToOnuChannel-7",
+ args: args{
+ ctx: ctx,
+ flow: flow5,
+ addFlow: true,
+ flowMetadata: &flowMetadata1,
+ },
+ wantErr: false,
+ },
+ {
+ name: "RouteFlowToOnuChannel-8",
+ args: args{
+ ctx: ctx,
+ flow: flow6,
+ addFlow: true,
+ flowMetadata: &flowMetadata1,
+ },
+ wantErr: false,
+ },
+ }
+
+ var wg sync.WaitGroup
+ defer wg.Wait() // wait for all go routines to complete
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ wg.Add(1) // one per go routine
+ go func() {
+ defer wg.Done()
+ tt.returnedErr = flowMgr[0].RouteFlowToOnuChannel(tt.args.ctx, tt.args.flow, tt.args.addFlow, tt.args.flowMetadata)
+ if (tt.wantErr == false && tt.returnedErr != nil) || (tt.wantErr == true && tt.returnedErr == nil) {
+ t.Errorf("OpenOltFlowMgr.RouteFlowToOnuChannel() error = %v, wantErr %v", tt.returnedErr, tt.wantErr)
+ }
+ }()
+ })
+ }
+}