VOL-3521 : scale: intermittent issue - voltha complains that different meter is in use for subscriber
- Process incoming flows on a per ONU basis using channels per ONU
Change-Id: I0f375d90d786a0135bb51ce18036e5297dc7297b
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index e1bc9aa..009b43c 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -35,7 +35,6 @@
"strings"
"sync"
- "github.com/EagleChen/mapmutex"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -141,23 +140,13 @@
//NoneUniID constant
NoneUniID = -1
- // MapMutex
- maxRetry = 300
- maxDelay = 100000000
- baseDelay = 10000000
- factor = 1.1
- jitter = 0.2
+ // Max number of flows that can be queued per ONU
+ maxConcurrentFlowsPerOnu = 20
bitMapPrefix = "0b"
pbit1 = '1'
)
-type tpLockKey struct {
- intfID uint32
- onuID uint32
- uniID uint32
-}
-
type schedQueue struct {
direction tp_pb.Direction
intfID uint32
@@ -170,13 +159,6 @@
flowMetadata *voltha.FlowMetadata
}
-// pendingFlowRemoveDataKey is key to pendingFlowRemoveDataPerSubscriber map
-type pendingFlowRemoveDataKey struct {
- intfID uint32
- onuID uint32
- uniID uint32
-}
-
// subscriberDataPathFlowIDKey is key to subscriberDataPathFlowIDMap map
type subscriberDataPathFlowIDKey struct {
intfID uint32
@@ -186,12 +168,16 @@
tpID uint32
}
-// pendingFlowRemoveData is value stored in pendingFlowRemoveDataPerSubscriber map
-// This holds the number of pending flow removes and also a signal channel to
-// to indicate the receiver when all flow removes are handled
-type pendingFlowRemoveData struct {
- pendingFlowRemoveCount uint32
- allFlowsRemoved chan struct{}
+// This control block is created per flow add/remove and pushed on the incomingFlows channel slice
+// The flowControlBlock is then picked by the perOnuFlowHandlerRoutine for further processing.
+// There is on perOnuFlowHandlerRoutine routine per ONU that constantly monitors for any incoming
+// flow and processes it serially
+type flowControlBlock struct {
+ ctx context.Context // Flow handler context
+ addFlow bool // if true flow to be added, else removed
+ flow *voltha.OfpFlowStats // Flow message
+ flowMetadata *voltha.FlowMetadata // FlowMetadata that contains flow meter information. This can be nil for Flow remove
+ errChan *chan error // channel to report the Flow handling error
}
//OpenOltFlowMgr creates the Structure of OpenOltFlowMgr obj
@@ -215,21 +201,14 @@
// We need to have a global lock on the onuGemInfo map
onuGemInfoLock sync.RWMutex
- // The mapmutex.Mutex can be fine tuned to use mapmutex.NewCustomizedMapMutex
- perUserFlowHandleLock *mapmutex.Mutex
-
- // pendingFlowRemoveDataPerSubscriber map is used to maintain the context on a per
- // subscriber basis for the number of pending flow removes. This data is used
- // to process all the flow removes for a subscriber before handling flow adds.
- // Interleaving flow delete and flow add processing has known to cause PON resource
- // management contentions on a per subscriber bases, so we need ensure ordering.
- pendingFlowRemoveDataPerSubscriber map[pendingFlowRemoveDataKey]pendingFlowRemoveData
- pendingFlowRemoveDataPerSubscriberLock sync.RWMutex
-
// Map of voltha flowID associated with subscriberDataPathFlowIDKey
// This information is not persisted on Kv store and hence should be reconciled on adapter restart
subscriberDataPathFlowIDMap map[subscriberDataPathFlowIDKey]uint64
subscriberDataPathFlowIDMapLock sync.RWMutex
+
+ // Slice of channels. Each channel in slice, index by ONU ID, queues flows per ONU.
+ // A go routine per ONU, waits on the unique channel (indexed by ONU ID) for incoming flows (add/remove)
+ incomingFlows []chan flowControlBlock
}
//NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
@@ -248,15 +227,24 @@
return nil
}
flowMgr.onuIdsLock = sync.RWMutex{}
- flowMgr.pendingFlowRemoveDataPerSubscriberLock = sync.RWMutex{}
flowMgr.flowsUsedByGemPort = make(map[uint32][]uint64)
flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
flowMgr.packetInGemPortLock = sync.RWMutex{}
flowMgr.onuGemInfoLock = sync.RWMutex{}
- flowMgr.perUserFlowHandleLock = mapmutex.NewCustomizedMapMutex(maxRetry, maxDelay, baseDelay, factor, jitter)
- flowMgr.pendingFlowRemoveDataPerSubscriber = make(map[pendingFlowRemoveDataKey]pendingFlowRemoveData)
flowMgr.subscriberDataPathFlowIDMap = make(map[subscriberDataPathFlowIDKey]uint64)
flowMgr.subscriberDataPathFlowIDMapLock = sync.RWMutex{}
+
+ // Create a slice of buffered channels for handling concurrent flows per ONU.
+ // The additional entry (+1) is to handle the NNI trap flows on a separate channel from individual ONUs channel
+ flowMgr.incomingFlows = make([]chan flowControlBlock, MaxOnusPerPon+1)
+ for i := range flowMgr.incomingFlows {
+ flowMgr.incomingFlows[i] = make(chan flowControlBlock, maxConcurrentFlowsPerOnu)
+ // Spin up a go routine to handling incoming flows (add/remove).
+ // There will be on go routine per ONU.
+ // This routine will be blocked on the flowMgr.incomingFlows[onu-id] channel for incoming flows.
+ go flowMgr.perOnuFlowHandlerRoutine(flowMgr.incomingFlows[i])
+ }
+
//Load the onugem info cache from kv store on flowmanager start
if flowMgr.onuGemInfo, err = rMgr.GetOnuGemInfo(ctx, ponPortIdx); err != nil {
logger.Error(ctx, "failed-to-load-onu-gem-info-cache")
@@ -337,51 +325,36 @@
"uni": uni,
"device-id": f.deviceHandler.device.Id})
- tpLockMapKey := tpLockKey{intfID, onuID, uniID}
- if f.perUserFlowHandleLock.TryLock(tpLockMapKey) {
- logger.Debugw(ctx, "dividing-flow-create-tcont-gem-ports", log.Fields{
- "device-id": f.deviceHandler.device.Id,
- "intf-id": intfID,
- "onu-id": onuID,
- "uni-id": uniID,
- "port-no": portNo,
- "classifier": classifierInfo,
- "action": actionInfo,
- "usmeter-id": UsMeterID,
- "dsmeter-id": DsMeterID,
- "tp-id": TpID})
- allocID, gemPorts, TpInst = f.createTcontGemports(ctx, intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
- if allocID == 0 || gemPorts == nil || TpInst == nil {
- logger.Error(ctx, "alloc-id-gem-ports-tp-unavailable")
- f.perUserFlowHandleLock.Unlock(tpLockMapKey)
- return olterrors.NewErrNotFound(
- "alloc-id-gem-ports-tp-unavailable",
- nil, nil)
- }
- args := make(map[string]uint32)
- args[IntfID] = intfID
- args[OnuID] = onuID
- args[UniID] = uniID
- args[PortNo] = portNo
- args[AllocID] = allocID
-
- /* Flows can be added specific to gemport if p-bits are received.
- * If no pbit mentioned then adding flows for all gemports
- */
- f.checkAndAddFlow(ctx, args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
- f.perUserFlowHandleLock.Unlock(tpLockMapKey)
- } else {
- cause := "failed-to-acquire-per-user-flow-handle-lock"
- fields := log.Fields{
- "intf-id": intfID,
- "onu-id": onuID,
- "uni-id": uniID,
- "flow-id": flow.Id,
- "flow-cookie": flow.Cookie,
- "device-id": f.deviceHandler.device.Id}
- logger.Errorw(ctx, cause, fields)
- return olterrors.NewErrAdapter(cause, fields, nil)
+ logger.Debugw(ctx, "dividing-flow-create-tcont-gem-ports", log.Fields{
+ "device-id": f.deviceHandler.device.Id,
+ "intf-id": intfID,
+ "onu-id": onuID,
+ "uni-id": uniID,
+ "port-no": portNo,
+ "classifier": classifierInfo,
+ "action": actionInfo,
+ "usmeter-id": UsMeterID,
+ "dsmeter-id": DsMeterID,
+ "tp-id": TpID})
+ allocID, gemPorts, TpInst = f.createTcontGemports(ctx, intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
+ if allocID == 0 || gemPorts == nil || TpInst == nil {
+ logger.Error(ctx, "alloc-id-gem-ports-tp-unavailable")
+ return olterrors.NewErrNotFound(
+ "alloc-id-gem-ports-tp-unavailable",
+ nil, nil)
}
+ args := make(map[string]uint32)
+ args[IntfID] = intfID
+ args[OnuID] = onuID
+ args[UniID] = uniID
+ args[PortNo] = portNo
+ args[AllocID] = allocID
+
+ /* Flows can be added specific to gemport if p-bits are received.
+ * If no pbit mentioned then adding flows for all gemports
+ */
+ f.checkAndAddFlow(ctx, args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
+
return nil
}
@@ -676,10 +649,18 @@
return olterrors.NewErrAdapter("unable-to-remove-traffic-schedulers-from-device",
log.Fields{
"intf-id": sq.intfID,
- "traffic-schedulers": TrafficSched}, err)
+ "traffic-schedulers": TrafficSched,
+ "onu-id": sq.onuID,
+ "uni-id": sq.uniID,
+ "uni-port": sq.uniPort}, err)
}
- logger.Infow(ctx, "removed-traffic-schedulers-successfully", log.Fields{"device-id": f.deviceHandler.device.Id})
+ logger.Infow(ctx, "removed-traffic-schedulers-successfully",
+ log.Fields{"device-id": f.deviceHandler.device.Id,
+ "intf-id": sq.intfID,
+ "onu-id": sq.onuID,
+ "uni-id": sq.uniID,
+ "uni-port": sq.uniPort})
/* After we successfully remove the scheduler configuration on the OLT device,
* delete the meter id on the KV store.
@@ -690,13 +671,21 @@
log.Fields{
"onu": sq.onuID,
"meter": KVStoreMeter.MeterId,
- "device-id": f.deviceHandler.device.Id}, err)
+ "device-id": f.deviceHandler.device.Id,
+ "intf-id": sq.intfID,
+ "onu-id": sq.onuID,
+ "uni-id": sq.uniID,
+ "uni-port": sq.uniPort}, err)
}
logger.Infow(ctx, "removed-meter-from-KV-store-successfully",
log.Fields{
"meter-id": KVStoreMeter.MeterId,
"dir": Direction,
- "device-id": f.deviceHandler.device.Id})
+ "device-id": f.deviceHandler.device.Id,
+ "intf-id": sq.intfID,
+ "onu-id": sq.onuID,
+ "uni-id": sq.uniID,
+ "uni-port": sq.uniPort})
return err
}
@@ -710,7 +699,6 @@
allocIDs = f.resourceMgr.GetCurrentAllocIDsForOnu(ctx, intfID, onuID, uniID)
allgemPortIDs = f.resourceMgr.GetCurrentGEMPortIDsForOnu(ctx, intfID, onuID, uniID)
-
tpPath := f.getTPpath(ctx, intfID, uni, TpID)
logger.Debugw(ctx, "creating-new-tcont-and-gem", log.Fields{
@@ -2058,9 +2046,6 @@
//RemoveFlow removes the flow from the device
func (f *OpenOltFlowMgr) RemoveFlow(ctx context.Context, flow *ofp.OfpFlowStats) error {
- f.incrementActiveFlowRemoveCount(ctx, flow)
- defer f.decrementActiveFlowRemoveCount(ctx, flow)
-
logger.Infow(ctx, "removing-flow", log.Fields{"flow": *flow})
var direction string
actionInfo := make(map[string]interface{})
@@ -2086,22 +2071,8 @@
direction = Downstream
}
- _, intfID, onuID, uniID, _, _, err := FlowExtractInfo(ctx, flow, direction)
- if err != nil {
- return err
- }
-
- userKey := tpLockKey{intfID, onuID, uniID}
-
// Serialize flow removes on a per subscriber basis
- if f.perUserFlowHandleLock.TryLock(userKey) {
- err = f.clearFlowFromDeviceAndResourceManager(ctx, flow, direction)
- f.perUserFlowHandleLock.Unlock(userKey)
- } else {
- // Ideally this should never happen
- logger.Errorw(ctx, "failed-to-acquire-lock-to-remove-flow--remove-aborted", log.Fields{"flow": flow})
- return errors.New("failed-to-acquire-per-user-lock")
- }
+ err := f.clearFlowFromDeviceAndResourceManager(ctx, flow, direction)
return err
}
@@ -2122,6 +2093,61 @@
return false
}
+// RouteFlowToOnuChannel routes incoming flow to ONU specific channel
+func (f *OpenOltFlowMgr) RouteFlowToOnuChannel(ctx context.Context, flow *voltha.OfpFlowStats, addFlow bool, flowMetadata *voltha.FlowMetadata) error {
+ // Step1 : Fill flowControlBlock
+ // Step2 : Push the flowControlBlock to ONU channel
+ // Step3 : Wait on response channel for response
+ // Step4 : Return error value
+ logger.Debugw(ctx, "process-flow", log.Fields{"flow": flow, "addFlow": addFlow})
+ errChan := make(chan error)
+ flowCb := flowControlBlock{
+ ctx: ctx,
+ addFlow: addFlow,
+ flow: flow,
+ flowMetadata: flowMetadata,
+ errChan: &errChan,
+ }
+ inPort, outPort := getPorts(flow)
+ var onuID uint32
+ if inPort != InvalidPort && outPort != InvalidPort {
+ _, _, onuID, _ = ExtractAccessFromFlow(inPort, outPort)
+ }
+ // inPort or outPort is InvalidPort for trap-from-nni flows.
+ // In the that case onuID is 0 which is the reserved index for trap-from-nni flows in the f.incomingFlows slice
+ // Send the flowCb on the ONU flow channel
+ f.incomingFlows[onuID] <- flowCb
+ // Wait on the channel for flow handlers return value
+ err := <-errChan
+ logger.Debugw(ctx, "process-flow--received-resp", log.Fields{"flow": flow, "addFlow": addFlow, "err": err})
+ return err
+}
+
+// This routine is unique per ONU ID and blocks on flowControlBlock channel for incoming flows
+// Each incoming flow is processed in a synchronous manner, i.e., the flow is processed to completion before picking another
+func (f *OpenOltFlowMgr) perOnuFlowHandlerRoutine(subscriberFlowChannel chan flowControlBlock) {
+ for {
+ // block on the channel to receive an incoming flow
+ // process the flow completely before proceeding to handle the next flow
+ flowCb := <-subscriberFlowChannel
+ if flowCb.addFlow {
+ logger.Debugw(flowCb.ctx, "adding-flow",
+ log.Fields{"device-id": f.deviceHandler.device.Id,
+ "flowToAdd": flowCb.flow})
+ err := f.AddFlow(flowCb.ctx, flowCb.flow, flowCb.flowMetadata)
+ // Pass the return value over the return channel
+ *flowCb.errChan <- err
+ } else {
+ logger.Debugw(flowCb.ctx, "removing-flow",
+ log.Fields{"device-id": f.deviceHandler.device.Id,
+ "flowToRemove": flowCb.flow})
+ err := f.RemoveFlow(flowCb.ctx, flowCb.flow)
+ // Pass the return value over the return channel
+ *flowCb.errChan <- err
+ }
+ }
+}
+
// AddFlow add flow to device
// nolint: gocyclo
func (f *OpenOltFlowMgr) AddFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) error {
@@ -2182,11 +2208,6 @@
return f.addIgmpTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
}
- // If we are here it is not a trap-from-nni flow, i.e., it is subscriber specific flow.
- // Wait for any FlowRemoves for that specific subscriber to finish first
- // The goal here is to serialize FlowRemove and FlowAdd. FlowRemove take priority
- f.waitForFlowRemoveToFinish(ctx, flow)
-
f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNo)
TpID, err := getTpIDFromFlow(ctx, flow)
@@ -2215,28 +2236,6 @@
return f.processAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, TpID, UsMeterID, DsMeterID, flowMetadata)
}
-//WaitForFlowRemoveToFinishForSubscriber blocks until flow removes are complete for a given subscriber
-func (f *OpenOltFlowMgr) WaitForFlowRemoveToFinishForSubscriber(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) {
- var flowRemoveData pendingFlowRemoveData
- var ok bool
-
- key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
- logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
- f.pendingFlowRemoveDataPerSubscriberLock.RLock()
- if flowRemoveData, ok = f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
- logger.Debugw(ctx, "no-pending-flow-to-remove", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
- f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
- return
- }
- f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
-
- // Wait for all flow removes to finish first
- <-flowRemoveData.allFlowsRemoved
-
- logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-}
-
// handleFlowWithGroup adds multicast flow to the device.
func (f *OpenOltFlowMgr) handleFlowWithGroup(ctx context.Context, actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) error {
classifierInfo[PacketTagType] = DoubleTag
@@ -2430,8 +2429,6 @@
"onu-gem": f.onuGemInfo})
}
-// This function Lookup maps by serialNumber or (intfId, gemPort)
-
//getOnuIDfromGemPortMap Returns OnuID,nil if found or set 0,error if no onuId is found for serialNumber or (intfId, gemPort)
func (f *OpenOltFlowMgr) getOnuIDfromGemPortMap(ctx context.Context, intfID uint32, gemPortID uint32) (uint32, error) {
@@ -3263,91 +3260,6 @@
return f.resourceMgr.RemoveFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flowID)
}
-func (f *OpenOltFlowMgr) incrementActiveFlowRemoveCount(ctx context.Context, flow *ofp.OfpFlowStats) {
-
- f.pendingFlowRemoveDataPerSubscriberLock.Lock()
- defer f.pendingFlowRemoveDataPerSubscriberLock.Unlock()
-
- inPort, outPort := getPorts(flow)
- logger.Debugw(ctx, "increment-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
- if inPort != InvalidPort && outPort != InvalidPort {
- _, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
- key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
- logger.Debugw(ctx, "increment-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
- flowRemoveData, ok := f.pendingFlowRemoveDataPerSubscriber[key]
- if !ok {
- flowRemoveData = pendingFlowRemoveData{
- pendingFlowRemoveCount: 0,
- allFlowsRemoved: make(chan struct{}),
- }
- }
- flowRemoveData.pendingFlowRemoveCount++
- f.pendingFlowRemoveDataPerSubscriber[key] = flowRemoveData
-
- logger.Debugw(ctx, "current-flow-remove-count–increment",
- log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
- "currCnt": f.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
- }
-}
-
-func (f *OpenOltFlowMgr) decrementActiveFlowRemoveCount(ctx context.Context, flow *ofp.OfpFlowStats) {
- f.pendingFlowRemoveDataPerSubscriberLock.Lock()
- defer f.pendingFlowRemoveDataPerSubscriberLock.Unlock()
-
- inPort, outPort := getPorts(flow)
- logger.Debugw(ctx, "decrement-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
- if inPort != InvalidPort && outPort != InvalidPort {
- _, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
- key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
- logger.Debugw(ctx, "decrement-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
- if val, ok := f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
- logger.Fatalf(ctx, "flow-remove-key-not-found", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
- } else {
- if val.pendingFlowRemoveCount > 0 {
- val.pendingFlowRemoveCount--
- }
- logger.Debugw(ctx, "current-flow-remove-count-after-decrement",
- log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
- "currCnt": f.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
- // If all flow removes have finished, then close the channel to signal the receiver
- // to go ahead with flow adds.
- if val.pendingFlowRemoveCount == 0 {
- close(val.allFlowsRemoved)
- delete(f.pendingFlowRemoveDataPerSubscriber, key)
- return
- }
- f.pendingFlowRemoveDataPerSubscriber[key] = val
- }
- }
-}
-
-func (f *OpenOltFlowMgr) waitForFlowRemoveToFinish(ctx context.Context, flow *ofp.OfpFlowStats) {
- var flowRemoveData pendingFlowRemoveData
- var ok bool
- inPort, outPort := getPorts(flow)
- logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
- if inPort != InvalidPort && outPort != InvalidPort {
- _, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
- key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
- logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
- f.pendingFlowRemoveDataPerSubscriberLock.RLock()
- if flowRemoveData, ok = f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
- logger.Debugw(ctx, "no-pending-flow-to-remove", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
- f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
- return
- }
- f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
-
- // Wait for all flow removes to finish first
- <-flowRemoveData.allFlowsRemoved
-
- logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
- }
-}
-
// reconcileSubscriberDataPathFlowIDMap reconciles subscriberDataPathFlowIDMap from KV store
func (f *OpenOltFlowMgr) reconcileSubscriberDataPathFlowIDMap(ctx context.Context) {
onuGemInfo, err := f.resourceMgr.GetOnuGemInfo(ctx, f.ponPortIdx)