VOL-3419: OpenOLT adapter at scale constantly takes more that 10 seconds to react to flows
The patch addresses the following
- Create OpenOltFlowMgr per PON port (instead of one instance for the whole OLT device earlier)
- Create a separate OpenOltGroupMgr - currently one instance for the whole OLT device
- Remove redundant global lock around getting ONU-ID in DeviceHandler module as there exists a
separate per-pon-port lock in ResourceManager module which suffices the required synchronization
- Remove redundant locks in OpenOltFlowMgr module to serialize FlowDelete before FlowAdd
- Rename divideAndAddFlow to processAddFlow. "divideAndAddFlow" was used in 1.x voltha days and
had a different meaning and the name seems to have been blindly ported to 2.x adapter
and does not make sense anymore
Change-Id: I99827963cf242f1db0c27943c97bd05b749ae129
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index d68b8f8..83da756 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -24,11 +24,6 @@
"encoding/json"
"errors"
"fmt"
- "math/big"
- "strings"
- "sync"
- "time"
-
"github.com/opencord/voltha-lib-go/v3/pkg/flows"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
tp "github.com/opencord/voltha-lib-go/v3/pkg/techprofile"
@@ -39,6 +34,9 @@
openoltpb2 "github.com/opencord/voltha-protos/v3/go/openolt"
tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
"github.com/opencord/voltha-protos/v3/go/voltha"
+ "math/big"
+ "strings"
+ "sync"
//deepcopy "github.com/getlantern/deepcopy"
"github.com/EagleChen/mapmutex"
@@ -187,12 +185,6 @@
gemPort uint32
}
-type pendingFlowDeleteKey struct {
- intfID uint32
- onuID uint32
- uniID uint32
-}
-
type tpLockKey struct {
intfID uint32
onuID uint32
@@ -211,15 +203,26 @@
flowMetadata *voltha.FlowMetadata
}
-type queueInfoBrief struct {
- gemPortID uint32
- servicePriority uint32
+// pendingFlowRemoveDataKey is key to pendingFlowRemoveDataPerSubscriber map
+type pendingFlowRemoveDataKey struct {
+ intfID uint32
+ onuID uint32
+ uniID uint32
+}
+
+// pendingFlowRemoveData is value stored in pendingFlowRemoveDataPerSubscriber map
+// This holds the number of pending flow removes and also a signal channel to
+// to indicate the receiver when all flow removes are handled
+type pendingFlowRemoveData struct {
+ pendingFlowRemoveCount uint32
+ allFlowsRemoved chan struct{}
}
//OpenOltFlowMgr creates the Structure of OpenOltFlowMgr obj
type OpenOltFlowMgr struct {
techprofile map[uint32]tp.TechProfileIf
deviceHandler *DeviceHandler
+ grpMgr *OpenOltGroupMgr
resourceMgr *rsrcMgr.OpenOltResourceMgr
onuIdsLock sync.RWMutex
perGemPortLock *mapmutex.Mutex // lock to be used to access the flowsUsedByGemPort map
@@ -228,21 +231,28 @@
// TODO create a type rsrcMgr.OnuGemInfos to be used instead of []rsrcMgr.OnuGemInfo
onuGemInfo map[uint32][]rsrcMgr.OnuGemInfo //onu, gem and uni info local cache, indexed by IntfId
// We need to have a global lock on the onuGemInfo map
- onuGemInfoLock sync.RWMutex
- pendingFlowDelete sync.Map
+ onuGemInfoLock sync.RWMutex
// The mapmutex.Mutex can be fine tuned to use mapmutex.NewCustomizedMapMutex
- perUserFlowHandleLock *mapmutex.Mutex
- interfaceToMcastQueueMap map[uint32]*queueInfoBrief /*pon interface -> multicast queue map. Required to assign GEM to a bucket during group population*/
+ perUserFlowHandleLock *mapmutex.Mutex
+
+ // pendingFlowRemoveDataPerSubscriber map is used to maintain the context on a per
+ // subscriber basis for the number of pending flow removes. This data is used
+ // to process all the flow removes for a subscriber before handling flow adds.
+ // Interleaving flow delete and flow add processing has known to cause PON resource
+ // management contentions on a per subscriber bases, so we need ensure ordering.
+ pendingFlowRemoveDataPerSubscriber map[pendingFlowRemoveDataKey]pendingFlowRemoveData
+ pendingFlowRemoveDataPerSubscriberLock sync.RWMutex
}
//NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
-func NewFlowManager(ctx context.Context, dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr) *OpenOltFlowMgr {
+func NewFlowManager(ctx context.Context, dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr, grpMgr *OpenOltGroupMgr) *OpenOltFlowMgr {
logger.Infow(ctx, "initializing-flow-manager", log.Fields{"device-id": dh.device.Id})
var flowMgr OpenOltFlowMgr
var err error
var idx uint32
flowMgr.deviceHandler = dh
+ flowMgr.grpMgr = grpMgr
flowMgr.resourceMgr = rMgr
flowMgr.techprofile = make(map[uint32]tp.TechProfileIf)
if err = flowMgr.populateTechProfilePerPonPort(ctx); err != nil {
@@ -250,15 +260,15 @@
return nil
}
flowMgr.onuIdsLock = sync.RWMutex{}
+ flowMgr.pendingFlowRemoveDataPerSubscriberLock = sync.RWMutex{}
flowMgr.flowsUsedByGemPort = make(map[gemPortKey][]uint32)
flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
ponPorts := rMgr.DevInfo.GetPonPorts()
flowMgr.onuGemInfo = make(map[uint32][]rsrcMgr.OnuGemInfo, ponPorts)
flowMgr.onuGemInfoLock = sync.RWMutex{}
- flowMgr.pendingFlowDelete = sync.Map{}
flowMgr.perUserFlowHandleLock = mapmutex.NewCustomizedMapMutex(maxRetry, maxDelay, baseDelay, factor, jitter)
flowMgr.perGemPortLock = mapmutex.NewCustomizedMapMutex(maxRetry, maxDelay, baseDelay, factor, jitter)
- flowMgr.interfaceToMcastQueueMap = make(map[uint32]*queueInfoBrief)
+ flowMgr.pendingFlowRemoveDataPerSubscriber = make(map[pendingFlowRemoveDataKey]pendingFlowRemoveData)
//Load the onugem info cache from kv store on flowmanager start
for idx = 0; idx < ponPorts; idx++ {
if flowMgr.onuGemInfo[idx], err = rMgr.GetOnuGemInfo(ctx, idx); err != nil {
@@ -268,7 +278,7 @@
flowMgr.loadFlowIDlistForGem(ctx, idx)
}
//load interface to multicast queue map from kv store
- flowMgr.loadInterfaceToMulticastQueueMap(ctx)
+ flowMgr.grpMgr.LoadInterfaceToMulticastQueueMap(ctx)
logger.Info(ctx, "initialization-of-flow-manager-success")
return &flowMgr
}
@@ -305,7 +315,7 @@
}, nil)
}
-func (f *OpenOltFlowMgr) divideAndAddFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
+func (f *OpenOltFlowMgr) processAddFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
classifierInfo map[string]interface{}, actionInfo map[string]interface{}, flow *ofp.OfpFlowStats, TpID uint32,
UsMeterID uint32, DsMeterID uint32, flowMetadata *voltha.FlowMetadata) error {
var allocID uint32
@@ -571,16 +581,17 @@
if sq.direction == tp_pb.Direction_DOWNSTREAM {
multicastTrafficQueues := f.techprofile[sq.intfID].GetMulticastTrafficQueues(ctx, sq.tpInst.(*tp.TechProfile))
if len(multicastTrafficQueues) > 0 {
- if _, present := f.interfaceToMcastQueueMap[sq.intfID]; !present {
+ if _, present := f.grpMgr.GetInterfaceToMcastQueueMap(sq.intfID); !present {
//assumed that there is only one queue per PON for the multicast service
//the default queue with multicastQueuePerPonPort.Priority per a pon interface is used for multicast service
//just put it in interfaceToMcastQueueMap to use for building group members
logger.Debugw(ctx, "multicast-traffic-queues", log.Fields{"device-id": f.deviceHandler.device.Id})
multicastQueuePerPonPort := multicastTrafficQueues[0]
- f.interfaceToMcastQueueMap[sq.intfID] = &queueInfoBrief{
+ val := &QueueInfoBrief{
gemPortID: multicastQueuePerPonPort.GemportId,
servicePriority: multicastQueuePerPonPort.Priority,
}
+ f.grpMgr.UpdateInterfaceToMcastQueueMap(sq.intfID, val)
//also store the queue info in kv store
if err := f.resourceMgr.AddMcastQueueForIntf(ctx, sq.intfID, multicastQueuePerPonPort.GemportId, multicastQueuePerPonPort.Priority); err != nil {
logger.Errorw(ctx, "failed-to-add-mcast-queue", log.Fields{"error": err})
@@ -1923,42 +1934,6 @@
return nil
}
-func (f *OpenOltFlowMgr) deletePendingFlows(ctx context.Context, Intf uint32, onuID int32, uniID int32) {
- pnFlDelKey := pendingFlowDeleteKey{Intf, uint32(onuID), uint32(uniID)}
- if val, ok := f.pendingFlowDelete.Load(pnFlDelKey); ok {
- if val.(int) > 0 {
- pnFlDels := val.(int) - 1
- if pnFlDels > 0 {
- logger.Debugw(ctx, "flow-delete-succeeded--more-pending",
- log.Fields{
- "intf": Intf,
- "onu-id": onuID,
- "uni-id": uniID,
- "currpendingflowcnt": pnFlDels,
- "device-id": f.deviceHandler.device.Id})
- f.pendingFlowDelete.Store(pnFlDelKey, pnFlDels)
- } else {
- logger.Debugw(ctx, "all-pending-flow-deletes-handled--removing-entry-from-map",
- log.Fields{
- "intf": Intf,
- "onu-id": onuID,
- "uni-id": uniID,
- "device-id": f.deviceHandler.device.Id})
- f.pendingFlowDelete.Delete(pnFlDelKey)
- }
- }
- } else {
- logger.Debugw(ctx, "no-pending-delete-flows-found",
- log.Fields{
- "intf": Intf,
- "onu-id": onuID,
- "uni-id": uniID,
- "device-id": f.deviceHandler.device.Id})
-
- }
-
-}
-
// Once the gemport is released for a given onu, it also has to be cleared from local cache
// which was used for deriving the gemport->logicalPortNo during packet-in.
// Otherwise stale info continues to exist after gemport is freed and wrong logicalPortNo
@@ -1975,6 +1950,7 @@
"onu-id": onuID,
"device-id": f.deviceHandler.device.Id,
"onu-gem": f.onuGemInfo[intfID]})
+
onugem := f.onuGemInfo[intfID]
deleteLoop:
for i, onu := range onugem {
@@ -2028,34 +2004,6 @@
"device-id": f.deviceHandler.device.Id}, err).Log()
}
if len(updatedFlows) == 0 {
- // Do this for subscriber flows only (not trap from NNI flows)
- if onuID != -1 && uniID != -1 {
- pnFlDelKey := pendingFlowDeleteKey{Intf, uint32(onuID), uint32(uniID)}
- if val, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok {
- logger.Debugw(ctx, "creating-entry-for-pending-flow-delete",
- log.Fields{
- "flow-id": flowID,
- "intf": Intf,
- "onu-id": onuID,
- "uni-id": uniID,
- "device-id": f.deviceHandler.device.Id})
- f.pendingFlowDelete.Store(pnFlDelKey, 1)
- } else {
- pnFlDels := val.(int) + 1
- logger.Debugw(ctx, "updating-flow-delete-entry",
- log.Fields{
- "flow-id": flowID,
- "intf": Intf,
- "onu-id": onuID,
- "uni-id": uniID,
- "currPendingFlowCnt": pnFlDels,
- "device-id": f.deviceHandler.device.Id})
- f.pendingFlowDelete.Store(pnFlDelKey, pnFlDels)
- }
-
- defer f.deletePendingFlows(ctx, Intf, onuID, uniID)
- }
-
logger.Debugw(ctx, "releasing-flow-id-to-resource-manager",
log.Fields{
"Intf": Intf,
@@ -2310,72 +2258,6 @@
}
}
-//clearMulticastFlowFromResourceManager removes a multicast flow from the KV store and
-// clears resources reserved for this multicast flow
-func (f *OpenOltFlowMgr) clearMulticastFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats) {
- classifierInfo := make(map[string]interface{})
- formulateClassifierInfoFromFlow(ctx, classifierInfo, flow)
- networkInterfaceID, err := f.getNNIInterfaceIDOfMulticastFlow(ctx, classifierInfo)
-
- if err != nil {
- logger.Warnw(ctx, "no-inport-found--cannot-release-resources-of-the-multicast-flow", log.Fields{"flowId:": flow.Id})
- return
- }
-
- var onuID = int32(NoneOnuID)
- var uniID = int32(NoneUniID)
- var flowID uint32
-
- flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, networkInterfaceID, onuID, uniID)
-
- for _, flowID = range flowIds {
- flowInfo := f.resourceMgr.GetFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flowID)
- if flowInfo == nil {
- logger.Debugw(ctx, "no-multicast-flowinfo-found-in-the-kv-store",
- log.Fields{
- "intf": networkInterfaceID,
- "onu-id": onuID,
- "uni-id": uniID,
- "flow-id": flowID})
- continue
- }
- updatedFlows := *flowInfo
- for i, storedFlow := range updatedFlows {
- if flow.Id == storedFlow.LogicalFlowID {
- removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: storedFlow.Flow.FlowType}
- logger.Debugw(ctx, "multicast-flow-to-be-deleted",
- log.Fields{
- "flow": storedFlow,
- "flow-id": flow.Id,
- "device-id": f.deviceHandler.device.Id})
- //remove from device
- if err := f.removeFlowFromDevice(ctx, &removeFlowMessage, flow.Id); err != nil {
- // DKB
- logger.Errorw(ctx, "failed-to-remove-multicast-flow",
- log.Fields{
- "flow-id": flow.Id,
- "error": err})
- return
- }
- logger.Infow(ctx, "multicast-flow-removed-from-device-successfully", log.Fields{"flow-id": flow.Id})
- //Remove the Flow from FlowInfo
- updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
- if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID), NoneOnuID, NoneUniID, flowID, &updatedFlows); err != nil {
- logger.Errorw(ctx, "failed-to-delete-multicast-flow-from-the-kv-store",
- log.Fields{"flow": storedFlow,
- "err": err})
- return
- }
- //release flow id
- logger.Debugw(ctx, "releasing-multicast-flow-id",
- log.Fields{"flow-id": flowID,
- "interfaceID": networkInterfaceID})
- f.resourceMgr.FreeFlowID(ctx, uint32(networkInterfaceID), NoneOnuID, NoneUniID, flowID)
- }
- }
- }
-}
-
//RemoveFlow removes the flow from the device
func (f *OpenOltFlowMgr) RemoveFlow(ctx context.Context, flow *ofp.OfpFlowStats) error {
logger.Infow(ctx, "removing-flow", log.Fields{"flow": *flow})
@@ -2394,6 +2276,9 @@
}
}
+ f.incrementActiveFlowRemoveCount(ctx, flow)
+ defer f.decrementActiveFlowRemoveCount(ctx, flow)
+
if flows.HasGroup(flow) {
direction = Multicast
f.clearFlowFromResourceManager(ctx, flow, direction)
@@ -2424,24 +2309,6 @@
return nil
}
-func (f *OpenOltFlowMgr) waitForFlowDeletesToCompleteForOnu(ctx context.Context, intfID uint32, onuID uint32,
- uniID uint32, ch chan bool) {
- pnFlDelKey := pendingFlowDeleteKey{intfID, onuID, uniID}
- for {
- select {
- case <-time.After(20 * time.Millisecond):
- if flowDelRefCnt, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok || flowDelRefCnt == 0 {
- logger.Debug(ctx, "pending-flow-deletes-completed")
- ch <- true
- return
- }
- case <-ctx.Done():
- logger.Error(ctx, "flow-delete-wait-handler-routine-canceled")
- return
- }
- }
-}
-
//isIgmpTrapDownstreamFlow return true if the flow is a downsteam IGMP trap-to-host flow; false otherwise
func isIgmpTrapDownstreamFlow(classifierInfo map[string]interface{}) bool {
if portType := IntfIDToPortTypeName(classifierInfo[InPort].(uint32)); portType == voltha.Port_ETHERNET_NNI {
@@ -2518,6 +2385,11 @@
return f.addIgmpTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
}
+ // If we are here it is not a trap-from-nni flow, i.e., it is subscriber specific flow.
+ // Wait for any FlowRemoves for that specific subscriber to finish first
+ // The goal here is to serialize FlowRemove and FlowAdd. FlowRemove take priority
+ f.waitForFlowRemoveToFinish(ctx, flow)
+
f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNo)
TpID, err := getTpIDFromFlow(ctx, flow)
@@ -2543,33 +2415,29 @@
logger.Debugw(ctx, "downstream-flow-meter-id", log.Fields{"ds-meter-id": DsMeterID})
}
+ return f.processAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
+}
- pnFlDelKey := pendingFlowDeleteKey{intfID, onuID, uniID}
- if _, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok {
- logger.Debugw(ctx, "no-pending-flows-found--going-ahead-with-flow-install",
- log.Fields{
- "intf-id": intfID,
- "onu-id": onuID,
- "uni-id": uniID})
- return f.divideAndAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
+//WaitForFlowRemoveToFinishForSubscriber blocks until flow removes are complete for a given subscriber
+func (f *OpenOltFlowMgr) WaitForFlowRemoveToFinishForSubscriber(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) {
+ var flowRemoveData pendingFlowRemoveData
+ var ok bool
+
+ key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
+ logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+
+ f.pendingFlowRemoveDataPerSubscriberLock.RLock()
+ if flowRemoveData, ok = f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
+ logger.Debugw(ctx, "no-pending-flow-to-remove", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+ f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
+ return
}
- pendingFlowDelComplete := make(chan bool)
- go f.waitForFlowDeletesToCompleteForOnu(ctx, intfID, onuID, uniID, pendingFlowDelComplete)
- select {
- case <-pendingFlowDelComplete:
- logger.Debugw(ctx, "all-pending-flow-deletes-completed",
- log.Fields{
- "intf-id": intfID,
- "onu-id": onuID,
- "uni-id": uniID})
- return f.divideAndAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
- case <-time.After(10 * time.Second):
- return olterrors.NewErrTimeout("pending-flow-deletes",
- log.Fields{
- "intf-id": intfID,
- "onu-id": onuID,
- "uni-id": uniID}, nil)
- }
+ f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
+
+ // Wait for all flow removes to finish first
+ <-flowRemoveData.allFlowsRemoved
+
+ logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
}
// handleFlowWithGroup adds multicast flow to the device.
@@ -2643,9 +2511,9 @@
}
logger.Info(ctx, "multicast-flow-added-to-device-successfully")
//get cached group
- if group, _, err := f.GetFlowGroupFromKVStore(ctx, groupID, true); err == nil {
+ if group, _, err := f.grpMgr.getFlowGroupFromKVStore(ctx, groupID, true); err == nil {
//calling groupAdd to set group members after multicast flow creation
- if err := f.ModifyGroup(ctx, group); err != nil {
+ if err := f.grpMgr.ModifyGroup(ctx, group); err != nil {
return olterrors.NewErrGroupOp("modify", groupID, log.Fields{"group": group}, err)
}
//cached group can be removed now
@@ -2681,241 +2549,6 @@
return 0, olterrors.NewErrNotFound("nni-port", nil, e).Log()
}
-// AddGroup add or update the group
-func (f *OpenOltFlowMgr) AddGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
- logger.Infow(ctx, "add-group", log.Fields{"group": group})
- if group == nil {
- return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
- }
-
- groupToOlt := openoltpb2.Group{
- GroupId: group.Desc.GroupId,
- Command: openoltpb2.Group_SET_MEMBERS,
- Action: f.buildGroupAction(),
- }
-
- logger.Debugw(ctx, "sending-group-to-device", log.Fields{"groupToOlt": groupToOlt})
- _, err := f.deviceHandler.Client.PerformGroupOperation(ctx, &groupToOlt)
- if err != nil {
- return olterrors.NewErrAdapter("add-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
- }
- // group members not created yet. So let's store the group
- if err := f.resourceMgr.AddFlowGroupToKVStore(ctx, group, true); err != nil {
- return olterrors.NewErrPersistence("add", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
- }
- logger.Infow(ctx, "add-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt})
- return nil
-}
-
-// DeleteGroup deletes a group from the device
-func (f *OpenOltFlowMgr) DeleteGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
- logger.Debugw(ctx, "delete-group", log.Fields{"group": group})
- if group == nil {
- logger.Error(ctx, "unable-to-delete-group--invalid-argument--group-is-nil")
- return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
- }
-
- groupToOlt := openoltpb2.Group{
- GroupId: group.Desc.GroupId,
- }
-
- logger.Debugw(ctx, "deleting-group-from-device", log.Fields{"groupToOlt": groupToOlt})
- _, err := f.deviceHandler.Client.DeleteGroup(ctx, &groupToOlt)
- if err != nil {
- logger.Errorw(ctx, "delete-group-failed-on-dev", log.Fields{"groupToOlt": groupToOlt, "err": err})
- return olterrors.NewErrAdapter("delete-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
- }
- //remove group from the store
- if err := f.resourceMgr.RemoveFlowGroupFromKVStore(ctx, group.Desc.GroupId, false); err != nil {
- return olterrors.NewErrPersistence("delete", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
- }
- logger.Debugw(ctx, "delete-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt})
- return nil
-}
-
-//buildGroupAction creates and returns a group action
-func (f *OpenOltFlowMgr) buildGroupAction() *openoltpb2.Action {
- var actionCmd openoltpb2.ActionCmd
- var action openoltpb2.Action
- action.Cmd = &actionCmd
- //pop outer vlan
- action.Cmd.RemoveOuterTag = true
- return &action
-}
-
-// ModifyGroup updates the group
-func (f *OpenOltFlowMgr) ModifyGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
- logger.Infow(ctx, "modify-group", log.Fields{"group": group})
- if group == nil || group.Desc == nil {
- return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
- }
-
- newGroup := f.buildGroup(ctx, group.Desc.GroupId, group.Desc.Buckets)
- //get existing members of the group
- val, groupExists, err := f.GetFlowGroupFromKVStore(ctx, group.Desc.GroupId, false)
-
- if err != nil {
- return olterrors.NewErrNotFound("flow-group-in-kv-store", log.Fields{"groupId": group.Desc.GroupId}, err)
- }
-
- var current *openoltpb2.Group // represents the group on the device
- if groupExists {
- // group already exists
- current = f.buildGroup(ctx, group.Desc.GroupId, val.Desc.GetBuckets())
- logger.Debugw(ctx, "modify-group--group exists",
- log.Fields{
- "group on the device": val,
- "new": group})
- } else {
- current = f.buildGroup(ctx, group.Desc.GroupId, nil)
- }
-
- logger.Debugw(ctx, "modify-group--comparing-current-and-new",
- log.Fields{
- "group on the device": current,
- "new": newGroup})
- // get members to be added
- membersToBeAdded := f.findDiff(current, newGroup)
- // get members to be removed
- membersToBeRemoved := f.findDiff(newGroup, current)
-
- logger.Infow(ctx, "modify-group--differences found", log.Fields{
- "membersToBeAdded": membersToBeAdded,
- "membersToBeRemoved": membersToBeRemoved,
- "groupId": group.Desc.GroupId})
-
- groupToOlt := openoltpb2.Group{
- GroupId: group.Desc.GroupId,
- }
- var errAdd, errRemoved error
- if len(membersToBeAdded) > 0 {
- groupToOlt.Command = openoltpb2.Group_ADD_MEMBERS
- groupToOlt.Members = membersToBeAdded
- //execute addMembers
- errAdd = f.callGroupAddRemove(ctx, &groupToOlt)
- }
- if len(membersToBeRemoved) > 0 {
- groupToOlt.Command = openoltpb2.Group_REMOVE_MEMBERS
- groupToOlt.Members = membersToBeRemoved
- //execute removeMembers
- errRemoved = f.callGroupAddRemove(ctx, &groupToOlt)
- }
-
- //save the modified group
- if errAdd == nil && errRemoved == nil {
- if err := f.resourceMgr.AddFlowGroupToKVStore(ctx, group, false); err != nil {
- return olterrors.NewErrPersistence("add", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
- }
- logger.Infow(ctx, "modify-group-was-success--storing-group",
- log.Fields{
- "group": group,
- "existingGroup": current})
- } else {
- logger.Warnw(ctx, "one-of-the-group-add/remove-operations-failed--cannot-save-group-modifications",
- log.Fields{"group": group})
- if errAdd != nil {
- return errAdd
- }
- return errRemoved
- }
- return nil
-}
-
-//callGroupAddRemove performs add/remove buckets operation for the indicated group
-func (f *OpenOltFlowMgr) callGroupAddRemove(ctx context.Context, group *openoltpb2.Group) error {
- if err := f.performGroupOperation(ctx, group); err != nil {
- st, _ := status.FromError(err)
- //ignore already exists error code
- if st.Code() != codes.AlreadyExists {
- return olterrors.NewErrGroupOp("groupAddRemove", group.GroupId, log.Fields{"status": st}, err)
- }
- }
- return nil
-}
-
-//findDiff compares group members and finds members which only exists in groups2
-func (f *OpenOltFlowMgr) findDiff(group1 *openoltpb2.Group, group2 *openoltpb2.Group) []*openoltpb2.GroupMember {
- var members []*openoltpb2.GroupMember
- for _, bucket := range group2.Members {
- if !f.contains(group1.Members, bucket) {
- // bucket does not exist and must be added
- members = append(members, bucket)
- }
- }
- return members
-}
-
-//contains returns true if the members list contains the given member; false otherwise
-func (f *OpenOltFlowMgr) contains(members []*openoltpb2.GroupMember, member *openoltpb2.GroupMember) bool {
- for _, groupMember := range members {
- if groupMember.InterfaceId == member.InterfaceId {
- return true
- }
- }
- return false
-}
-
-//performGroupOperation call performGroupOperation operation of openolt proto
-func (f *OpenOltFlowMgr) performGroupOperation(ctx context.Context, group *openoltpb2.Group) error {
- logger.Debugw(ctx, "sending-group-to-device",
- log.Fields{
- "groupToOlt": group,
- "command": group.Command})
- _, err := f.deviceHandler.Client.PerformGroupOperation(log.WithSpanFromContext(context.Background(), ctx), group)
- if err != nil {
- return olterrors.NewErrAdapter("group-operation-failed", log.Fields{"groupToOlt": group}, err)
- }
- return nil
-}
-
-//buildGroup build openoltpb2.Group from given group id and bucket list
-func (f *OpenOltFlowMgr) buildGroup(ctx context.Context, groupID uint32, buckets []*ofp.OfpBucket) *openoltpb2.Group {
- group := openoltpb2.Group{
- GroupId: groupID}
- // create members of the group
- for _, ofBucket := range buckets {
- member := f.buildMember(ctx, ofBucket)
- if member != nil && !f.contains(group.Members, member) {
- group.Members = append(group.Members, member)
- }
- }
- return &group
-}
-
-//buildMember builds openoltpb2.GroupMember from an OpenFlow bucket
-func (f *OpenOltFlowMgr) buildMember(ctx context.Context, ofBucket *ofp.OfpBucket) *openoltpb2.GroupMember {
- var outPort uint32
- outPortFound := false
- for _, ofAction := range ofBucket.Actions {
- if ofAction.Type == ofp.OfpActionType_OFPAT_OUTPUT {
- outPort = ofAction.GetOutput().Port
- outPortFound = true
- }
- }
-
- if !outPortFound {
- logger.Debugw(ctx, "bucket-skipped-since-no-out-port-found-in-it", log.Fields{"ofBucket": ofBucket})
- return nil
- }
- interfaceID := IntfIDFromUniPortNum(outPort)
- logger.Debugw(ctx, "got-associated-interface-id-of-the-port",
- log.Fields{
- "portNumber:": outPort,
- "interfaceId:": interfaceID})
- if groupInfo, ok := f.interfaceToMcastQueueMap[interfaceID]; ok {
- member := openoltpb2.GroupMember{
- InterfaceId: interfaceID,
- InterfaceType: openoltpb2.GroupMember_PON,
- GemPortId: groupInfo.gemPortID,
- Priority: groupInfo.servicePriority,
- }
- //add member to the group
- return &member
- }
- logger.Warnf(ctx, "bucket-skipped-since-interface-2-gem-mapping-cannot-be-found", log.Fields{"ofBucket": ofBucket})
- return nil
-}
-
//sendTPDownloadMsgToChild send payload
func (f *OpenOltFlowMgr) sendTPDownloadMsgToChild(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, uni string, TpID uint32) error {
@@ -2957,6 +2590,16 @@
f.onuGemInfoLock.Lock()
defer f.onuGemInfoLock.Unlock()
+ onugem := f.onuGemInfo[intfID]
+ // If the ONU already exists in onuGemInfo list, nothing to do
+ for _, onu := range onugem {
+ if onu.OnuID == onuID && onu.SerialNumber == serialNum {
+ logger.Debugw(ctx, "onu-id-already-exists-in-cache",
+ log.Fields{"onuID": onuID,
+ "serialNum": serialNum})
+ return nil
+ }
+ }
onu := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
f.onuGemInfo[intfID] = append(f.onuGemInfo[intfID], onu)
@@ -3002,6 +2645,7 @@
}
onugem[idx].GemPorts = append(onugem[idx].GemPorts, gemPort)
f.onuGemInfo[intfID] = onugem
+ break
}
}
err := f.resourceMgr.AddGemToOnuGemInfo(ctx, intfID, onuID, gemPort)
@@ -3037,7 +2681,6 @@
"onu-geminfo": f.onuGemInfo[intfID],
"intf-id": intfID,
"gemport-id": gemPortID})
-
// get onuid from the onugem info cache
onugem := f.onuGemInfo[intfID]
@@ -4010,52 +3653,150 @@
}
}
-//loadInterfaceToMulticastQueueMap reads multicast queues per interface from the KV store
-//and put them into interfaceToMcastQueueMap.
-func (f *OpenOltFlowMgr) loadInterfaceToMulticastQueueMap(ctx context.Context) {
- storedMulticastQueueMap, err := f.resourceMgr.GetMcastQueuePerInterfaceMap(ctx)
+//clearMulticastFlowFromResourceManager removes a multicast flow from the KV store and
+// clears resources reserved for this multicast flow
+func (f *OpenOltFlowMgr) clearMulticastFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats) {
+ classifierInfo := make(map[string]interface{})
+ formulateClassifierInfoFromFlow(ctx, classifierInfo, flow)
+ networkInterfaceID, err := f.getNNIInterfaceIDOfMulticastFlow(ctx, classifierInfo)
+
if err != nil {
- logger.Error(ctx, "failed-to-get-pon-interface-to-multicast-queue-map")
+ logger.Warnw(ctx, "no-inport-found--cannot-release-resources-of-the-multicast-flow", log.Fields{"flowId:": flow.Id})
return
}
- for intf, queueInfo := range storedMulticastQueueMap {
- q := queueInfoBrief{
- gemPortID: queueInfo[0],
- servicePriority: queueInfo[1],
+
+ var onuID = int32(NoneOnuID)
+ var uniID = int32(NoneUniID)
+ var flowID uint32
+
+ flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, networkInterfaceID, onuID, uniID)
+
+ for _, flowID = range flowIds {
+ flowInfo := f.resourceMgr.GetFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flowID)
+ if flowInfo == nil {
+ logger.Debugw(ctx, "no-multicast-flowinfo-found-in-the-kv-store",
+ log.Fields{
+ "intf": networkInterfaceID,
+ "onu-id": onuID,
+ "uni-id": uniID,
+ "flow-id": flowID})
+ continue
}
- f.interfaceToMcastQueueMap[intf] = &q
+ updatedFlows := *flowInfo
+ for i, storedFlow := range updatedFlows {
+ if flow.Id == storedFlow.LogicalFlowID {
+ removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: storedFlow.Flow.FlowType}
+ logger.Debugw(ctx, "multicast-flow-to-be-deleted",
+ log.Fields{
+ "flow": storedFlow,
+ "flow-id": flow.Id,
+ "device-id": f.deviceHandler.device.Id})
+ //remove from device
+ if err := f.removeFlowFromDevice(ctx, &removeFlowMessage, flow.Id); err != nil {
+ // DKB
+ logger.Errorw(ctx, "failed-to-remove-multicast-flow",
+ log.Fields{
+ "flow-id": flow.Id,
+ "error": err})
+ return
+ }
+ logger.Infow(ctx, "multicast-flow-removed-from-device-successfully", log.Fields{"flow-id": flow.Id})
+ //Remove the Flow from FlowInfo
+ updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
+ if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID), NoneOnuID, NoneUniID, flowID, &updatedFlows); err != nil {
+ logger.Errorw(ctx, "failed-to-delete-multicast-flow-from-the-kv-store",
+ log.Fields{"flow": storedFlow,
+ "err": err})
+ return
+ }
+ //release flow id
+ logger.Debugw(ctx, "releasing-multicast-flow-id",
+ log.Fields{"flow-id": flowID,
+ "interfaceID": networkInterfaceID})
+ f.resourceMgr.FreeFlowID(ctx, uint32(networkInterfaceID), NoneOnuID, NoneUniID, flowID)
+ }
+ }
}
}
-//GetFlowGroupFromKVStore fetches and returns flow group from the KV store. Returns (nil, false, error) if any problem occurs during
-//fetching the data. Returns (group, true, nil) if the group is fetched and returned successfully.
-//Returns (nil, false, nil) if the group does not exists in the KV store.
-func (f *OpenOltFlowMgr) GetFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) (*ofp.OfpGroupEntry, bool, error) {
- exists, groupInfo, err := f.resourceMgr.GetFlowGroupFromKVStore(ctx, groupID, cached)
- if err != nil {
- return nil, false, olterrors.NewErrNotFound("flow-group", log.Fields{"group-id": groupID}, err)
+func (f *OpenOltFlowMgr) incrementActiveFlowRemoveCount(ctx context.Context, flow *ofp.OfpFlowStats) {
+ inPort, outPort := getPorts(flow)
+ logger.Debugw(ctx, "increment-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
+ if inPort != InvalidPort && outPort != InvalidPort {
+ _, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
+ key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
+ logger.Debugw(ctx, "increment-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+
+ f.pendingFlowRemoveDataPerSubscriberLock.Lock()
+ defer f.pendingFlowRemoveDataPerSubscriberLock.Unlock()
+ flowRemoveData, ok := f.pendingFlowRemoveDataPerSubscriber[key]
+ if !ok {
+ flowRemoveData = pendingFlowRemoveData{
+ pendingFlowRemoveCount: 0,
+ allFlowsRemoved: make(chan struct{}),
+ }
+ }
+ flowRemoveData.pendingFlowRemoveCount++
+ f.pendingFlowRemoveDataPerSubscriber[key] = flowRemoveData
+
+ logger.Debugw(ctx, "current-flow-remove-count–increment",
+ log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
+ "currCnt": f.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
}
- if exists {
- return newGroup(groupInfo.GroupID, groupInfo.OutPorts), exists, nil
- }
- return nil, exists, nil
}
-func newGroup(groupID uint32, outPorts []uint32) *ofp.OfpGroupEntry {
- groupDesc := ofp.OfpGroupDesc{
- Type: ofp.OfpGroupType_OFPGT_ALL,
- GroupId: groupID,
- }
- groupEntry := ofp.OfpGroupEntry{
- Desc: &groupDesc,
- }
- for i := 0; i < len(outPorts); i++ {
- var acts []*ofp.OfpAction
- acts = append(acts, flows.Output(outPorts[i]))
- bucket := ofp.OfpBucket{
- Actions: acts,
+func (f *OpenOltFlowMgr) decrementActiveFlowRemoveCount(ctx context.Context, flow *ofp.OfpFlowStats) {
+ inPort, outPort := getPorts(flow)
+ logger.Debugw(ctx, "decrement-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
+ if inPort != InvalidPort && outPort != InvalidPort {
+ _, intfID, onuID, uniID := ExtractAccessFromFlow(uint32(inPort), uint32(outPort))
+ key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
+ logger.Debugw(ctx, "decrement-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+
+ f.pendingFlowRemoveDataPerSubscriberLock.Lock()
+ defer f.pendingFlowRemoveDataPerSubscriberLock.Unlock()
+ if val, ok := f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
+ logger.Fatalf(ctx, "flow-remove-key-not-found", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+ } else {
+ if val.pendingFlowRemoveCount > 0 {
+ val.pendingFlowRemoveCount--
+ }
+ logger.Debugw(ctx, "current-flow-remove-count-after-decrement",
+ log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
+ "currCnt": f.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
+ // If all flow removes have finished, then close the channel to signal the receiver
+ // to go ahead with flow adds.
+ if val.pendingFlowRemoveCount == 0 {
+ close(val.allFlowsRemoved)
+ delete(f.pendingFlowRemoveDataPerSubscriber, key)
+ return
+ }
+ f.pendingFlowRemoveDataPerSubscriber[key] = val
}
- groupDesc.Buckets = append(groupDesc.Buckets, &bucket)
}
- return &groupEntry
+}
+
+func (f *OpenOltFlowMgr) waitForFlowRemoveToFinish(ctx context.Context, flow *ofp.OfpFlowStats) {
+ var flowRemoveData pendingFlowRemoveData
+ var ok bool
+ inPort, outPort := getPorts(flow)
+ logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
+ if inPort != InvalidPort && outPort != InvalidPort {
+ _, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
+ key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
+ logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+
+ f.pendingFlowRemoveDataPerSubscriberLock.RLock()
+ if flowRemoveData, ok = f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
+ logger.Debugw(ctx, "no-pending-flow-to-remove", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+ f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
+ return
+ }
+ f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
+
+ // Wait for all flow removes to finish first
+ <-flowRemoveData.allFlowsRemoved
+
+ logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+ }
}