VOL-3419: OpenOLT adapter at scale constantly takes more that 10 seconds to react to flows
The patch addresses the following
- Create OpenOltFlowMgr per PON port (instead of one instance for the whole OLT device earlier)
- Create a separate OpenOltGroupMgr - currently one instance for the whole OLT device
- Remove redundant global lock around getting ONU-ID in DeviceHandler module as there exists a
separate per-pon-port lock in ResourceManager module which suffices the required synchronization
- Remove redundant locks in OpenOltFlowMgr module to serialize FlowDelete before FlowAdd
- Rename divideAndAddFlow to processAddFlow. "divideAndAddFlow" was used in 1.x voltha days and
had a different meaning and the name seems to have been blindly ported to 2.x adapter
and does not make sense anymore
Change-Id: I99827963cf242f1db0c27943c97bd05b749ae129
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 0dfb781..f342bcf 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -57,21 +57,6 @@
InvalidPort = 0xffffffff
)
-// pendingFlowRemoveDataKey is key to pendingFlowRemoveDataPerSubscriber map
-type pendingFlowRemoveDataKey struct {
- intfID uint32
- onuID uint32
- uniID uint32
-}
-
-// pendingFlowRemoveData is value stored in pendingFlowRemoveDataPerSubscriber map
-// This holds the number of pending flow removes and also a signal channel to
-// to indicate the receiver when all flow removes are handled
-type pendingFlowRemoveData struct {
- pendingFlowRemoveCount uint32
- allFlowsRemoved chan struct{}
-}
-
//DeviceHandler will interact with the OLT device.
type DeviceHandler struct {
device *voltha.Device
@@ -84,7 +69,8 @@
Client oop.OpenoltClient
transitionMap *TransitionMap
clientCon *grpc.ClientConn
- flowMgr *OpenOltFlowMgr
+ flowMgr []*OpenOltFlowMgr
+ groupMgr *OpenOltGroupMgr
eventMgr *OpenOltEventMgr
resourceMgr *rsrcMgr.OpenOltResourceMgr
@@ -98,12 +84,7 @@
stopIndications chan bool
isReadIndicationRoutineActive bool
- // pendingFlowRemoveDataPerSubscriber map is used to maintain the context on a per
- // subscriber basis for the number of pending flow removes. This data is used
- // to process all the flow removes for a subscriber before handling flow adds.
- // Interleaving flow delete and flow add processing has known to cause PON resource
- // management contentions on a per subscriber bases, so we need ensure ordering.
- pendingFlowRemoveDataPerSubscriber map[pendingFlowRemoveDataKey]pendingFlowRemoveData
+ totalPonPorts uint32
}
//OnuDevice represents ONU related info
@@ -158,7 +139,6 @@
dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
dh.activePorts = sync.Map{}
dh.stopIndications = make(chan bool, 1)
- dh.pendingFlowRemoveDataPerSubscriber = make(map[pendingFlowRemoveDataKey]pendingFlowRemoveData)
//TODO initialize the support classes.
return &dh
@@ -590,7 +570,7 @@
defer span.Finish()
portStats := indication.GetPortStats()
- go dh.portStats.PortStatisticsIndication(ctx, portStats, dh.resourceMgr.DevInfo.GetPonPorts())
+ go dh.portStats.PortStatisticsIndication(ctx, portStats, dh.totalPonPorts)
case *oop.Indication_FlowStats:
span, ctx := log.CreateChildSpan(ctx, "flow-stats-indication", log.Fields{"device-id": dh.device.Id})
defer span.Finish()
@@ -777,16 +757,23 @@
if err != nil {
return olterrors.NewErrAdapter("populate-device-info-failed", log.Fields{"device-id": dh.device.Id}, err)
}
+ dh.totalPonPorts = deviceInfo.GetPonPorts()
+
// Instantiate resource manager
if dh.resourceMgr = rsrcMgr.NewResourceMgr(ctx, dh.device.Id, dh.openOLT.KVStoreAddress, dh.openOLT.KVStoreType, dh.device.Type, deviceInfo); dh.resourceMgr == nil {
return olterrors.ErrResourceManagerInstantiating
}
- // Instantiate flow manager
- if dh.flowMgr = NewFlowManager(ctx, dh, dh.resourceMgr); dh.flowMgr == nil {
- return olterrors.ErrResourceManagerInstantiating
+ dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr)
+ dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts)
+ for i := range dh.flowMgr {
+ // Instantiate flow manager
+ if dh.flowMgr[i] = NewFlowManager(ctx, dh, dh.resourceMgr, dh.groupMgr); dh.flowMgr[i] == nil {
+ return olterrors.ErrResourceManagerInstantiating
+ }
}
+
/* TODO: Instantiate Alarm , stats , BW managers */
/* Instantiating Event Manager to handle Alarms and KPIs */
dh.eventMgr = NewEventMgr(dh.EventProxy, dh)
@@ -1060,7 +1047,7 @@
func (dh *DeviceHandler) activateONU(ctx context.Context, intfID uint32, onuID int64, serialNum *oop.SerialNumber, serialNumber string) error {
logger.Debugw(ctx, "activate-onu", log.Fields{"intf-id": intfID, "onu-id": onuID, "serialNum": serialNum, "serialNumber": serialNumber, "device-id": dh.device.Id, "OmccEncryption": dh.openOLT.config.OmccEncryption})
- if err := dh.flowMgr.UpdateOnuInfo(ctx, intfID, uint32(onuID), serialNumber); err != nil {
+ if err := dh.flowMgr[intfID].UpdateOnuInfo(ctx, intfID, uint32(onuID), serialNumber); err != nil {
return olterrors.NewErrAdapter("onu-activate-failed", log.Fields{"onu": onuID, "intf-id": intfID}, err)
}
// TODO: need resource manager
@@ -1153,9 +1140,7 @@
logger.Debugw(ctx, "creating-new-onu", log.Fields{"sn": sn})
// we need to create a new ChildDevice
ponintfid := onuDiscInd.GetIntfId()
- dh.lockDevice.Lock()
onuID, err = dh.resourceMgr.GetONUID(ctx, ponintfid)
- dh.lockDevice.Unlock()
logger.Infow(ctx, "creating-new-onu-got-onu-id", log.Fields{"sn": sn, "onuId": onuID})
@@ -1437,26 +1422,26 @@
if flows != nil {
for _, flow := range flows.ToRemove.Items {
- dh.incrementActiveFlowRemoveCount(ctx, flow)
+ ponIf := dh.getPonIfFromFlow(ctx, flow)
logger.Debugw(ctx, "removing-flow",
log.Fields{"device-id": device.Id,
+ "ponIf": ponIf,
"flowToRemove": flow})
- err := dh.flowMgr.RemoveFlow(ctx, flow)
+ err := dh.flowMgr[ponIf].RemoveFlow(ctx, flow)
if err != nil {
errorsList = append(errorsList, err)
}
-
- dh.decrementActiveFlowRemoveCount(ctx, flow)
}
for _, flow := range flows.ToAdd.Items {
+ ponIf := dh.getPonIfFromFlow(ctx, flow)
logger.Debugw(ctx, "adding-flow",
log.Fields{"device-id": device.Id,
+ "ponIf": ponIf,
"flowToAdd": flow})
// If there are active Flow Remove in progress for a given subscriber, wait until it completes
- dh.waitForFlowRemoveToFinish(ctx, flow)
- err := dh.flowMgr.AddFlow(ctx, flow, flowMetadata)
+ err := dh.flowMgr[ponIf].AddFlow(ctx, flow, flowMetadata)
if err != nil {
errorsList = append(errorsList, err)
}
@@ -1466,19 +1451,19 @@
// Whether we need to synchronize multicast group adds and modifies like flow add and delete needs to be investigated
if groups != nil {
for _, group := range groups.ToAdd.Items {
- err := dh.flowMgr.AddGroup(ctx, group)
+ err := dh.groupMgr.AddGroup(ctx, group)
if err != nil {
errorsList = append(errorsList, err)
}
}
for _, group := range groups.ToUpdate.Items {
- err := dh.flowMgr.ModifyGroup(ctx, group)
+ err := dh.groupMgr.ModifyGroup(ctx, group)
if err != nil {
errorsList = append(errorsList, err)
}
}
for _, group := range groups.ToRemove.Items {
- err := dh.flowMgr.DeleteGroup(ctx, group)
+ err := dh.groupMgr.DeleteGroup(ctx, group)
if err != nil {
errorsList = append(errorsList, err)
}
@@ -1596,7 +1581,7 @@
uniID = UniIDFromPortNum(uint32(port))
logger.Debugw(ctx, "clearing-resource-data-for-uni-port", log.Fields{"port": port, "uni-id": uniID})
/* Delete tech-profile instance from the KV store */
- if err = dh.flowMgr.DeleteTechProfileInstances(ctx, onu.IntfID, onu.OnuID, uniID, onu.SerialNumber); err != nil {
+ if err = dh.flowMgr[onu.IntfID].DeleteTechProfileInstances(ctx, onu.IntfID, onu.OnuID, uniID, onu.SerialNumber); err != nil {
logger.Debugw(ctx, "failed-to-remove-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
}
logger.Debugw(ctx, "deleted-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
@@ -1687,9 +1672,8 @@
func (dh *DeviceHandler) cleanupDeviceResources(ctx context.Context) {
if dh.resourceMgr != nil {
- noOfPonPorts := dh.resourceMgr.DevInfo.GetPonPorts()
var ponPort uint32
- for ponPort = 0; ponPort < noOfPonPorts; ponPort++ {
+ for ponPort = 0; ponPort < dh.totalPonPorts; ponPort++ {
var onuGemData []rsrcMgr.OnuGemInfo
err := dh.resourceMgr.ResourceMgrs[ponPort].GetOnuGemInfo(ctx, ponPort, &onuGemData)
if err != nil {
@@ -1762,7 +1746,7 @@
"packet": hex.EncodeToString(packetIn.Pkt),
})
}
- logicalPortNum, err := dh.flowMgr.GetLogicalPortFromPacketIn(ctx, packetIn)
+ logicalPortNum, err := dh.flowMgr[packetIn.IntfId].GetLogicalPortFromPacketIn(ctx, packetIn)
if err != nil {
return olterrors.NewErrNotFound("logical-port", log.Fields{"packet": hex.EncodeToString(packetIn.Pkt)}, err)
}
@@ -1834,7 +1818,7 @@
onuID := OnuIDFromPortNum(uint32(egressPortNo))
uniID := UniIDFromPortNum(uint32(egressPortNo))
- gemPortID, err := dh.flowMgr.GetPacketOutGemPortID(ctx, intfID, onuID, uint32(egressPortNo), packet.Data)
+ gemPortID, err := dh.flowMgr[intfID].GetPacketOutGemPortID(ctx, intfID, onuID, uint32(egressPortNo), packet.Data)
if err != nil {
// In this case the openolt agent will receive the gemPortID as 0.
// The agent tries to retrieve the gemPortID in this case.
@@ -2098,21 +2082,12 @@
}
for uniID := 0; uniID < MaxUnisPerOnu; uniID++ {
- var flowRemoveData pendingFlowRemoveData
- key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uint32(uniID)}
- dh.lockDevice.RLock()
- if flowRemoveData, ok = dh.pendingFlowRemoveDataPerSubscriber[key]; !ok {
- dh.lockDevice.RUnlock()
- continue
- }
- dh.lockDevice.RUnlock()
-
logger.Debugw(ctx, "wait-for-flow-remove-complete-before-processing-child-device-lost",
log.Fields{"int-id": intfID, "onu-id": onuID, "uni-id": uniID})
- // Wait for all flow removes to finish first
- <-flowRemoveData.allFlowsRemoved
+ dh.flowMgr[intfID].WaitForFlowRemoveToFinishForSubscriber(ctx, intfID, onuID, uint32(uniID))
logger.Debugw(ctx, "flow-removes-complete-for-subscriber",
log.Fields{"int-id": intfID, "onu-id": onuID, "uni-id": uniID})
+ // TODO: Would be good to delete the subscriber entry from flowMgr.pendingFlowRemoveDataPerSubscriber map
}
onu := &oop.Onu{IntfId: intfID, OnuId: onuID, SerialNumber: sn}
@@ -2189,88 +2164,6 @@
return InvalidPort
}
-func (dh *DeviceHandler) incrementActiveFlowRemoveCount(ctx context.Context, flow *of.OfpFlowStats) {
- inPort, outPort := getPorts(flow)
- logger.Debugw(ctx, "increment-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
- if inPort != InvalidPort && outPort != InvalidPort {
- _, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
- key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
- logger.Debugw(ctx, "increment-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
- dh.lockDevice.Lock()
- defer dh.lockDevice.Unlock()
- flowRemoveData, ok := dh.pendingFlowRemoveDataPerSubscriber[key]
- if !ok {
- flowRemoveData = pendingFlowRemoveData{
- pendingFlowRemoveCount: 0,
- allFlowsRemoved: make(chan struct{}),
- }
- }
- flowRemoveData.pendingFlowRemoveCount++
- dh.pendingFlowRemoveDataPerSubscriber[key] = flowRemoveData
-
- logger.Debugw(ctx, "current-flow-remove-count–increment",
- log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
- "currCnt": dh.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
- }
-}
-
-func (dh *DeviceHandler) decrementActiveFlowRemoveCount(ctx context.Context, flow *of.OfpFlowStats) {
- inPort, outPort := getPorts(flow)
- logger.Debugw(ctx, "decrement-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
- if inPort != InvalidPort && outPort != InvalidPort {
- _, intfID, onuID, uniID := ExtractAccessFromFlow(uint32(inPort), uint32(outPort))
- key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
- logger.Debugw(ctx, "decrement-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
- dh.lockDevice.Lock()
- defer dh.lockDevice.Unlock()
- if val, ok := dh.pendingFlowRemoveDataPerSubscriber[key]; !ok {
- logger.Fatalf(ctx, "flow-remove-key-not-found", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
- } else {
- if val.pendingFlowRemoveCount > 0 {
- val.pendingFlowRemoveCount--
- }
- logger.Debugw(ctx, "current-flow-remove-count-after-decrement",
- log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
- "currCnt": dh.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
- // If all flow removes have finished, then close the channel to signal the receiver
- // to go ahead with flow adds.
- if val.pendingFlowRemoveCount == 0 {
- close(val.allFlowsRemoved)
- delete(dh.pendingFlowRemoveDataPerSubscriber, key)
- return
- }
- dh.pendingFlowRemoveDataPerSubscriber[key] = val
- }
- }
-}
-
-func (dh *DeviceHandler) waitForFlowRemoveToFinish(ctx context.Context, flow *of.OfpFlowStats) {
- var flowRemoveData pendingFlowRemoveData
- var ok bool
- inPort, outPort := getPorts(flow)
- logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
- if inPort != InvalidPort && outPort != InvalidPort {
- _, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
- key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
- logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
- dh.lockDevice.RLock()
- if flowRemoveData, ok = dh.pendingFlowRemoveDataPerSubscriber[key]; !ok {
- logger.Debugw(ctx, "no-pending-flow-to-remove", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
- dh.lockDevice.RUnlock()
- return
- }
- dh.lockDevice.RUnlock()
-
- // Wait for all flow removes to finish first
- <-flowRemoveData.allFlowsRemoved
-
- logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
- }
-}
-
func getPorts(flow *of.OfpFlowStats) (uint32, uint32) {
inPort := getInPortFromFlow(flow)
outPort := getOutPortFromFlow(flow)
@@ -2350,3 +2243,14 @@
logger.Infow(ctx, "get-ext-value", log.Fields{"resp": resp, "device-id": dh.device, "onu-id": device.Id, "pon-intf": device.ParentPortNo})
return resp, nil
}
+
+func (dh *DeviceHandler) getPonIfFromFlow(ctx context.Context, flow *of.OfpFlowStats) uint32 {
+ // Default to PON0
+ var intfID uint32
+ inPort, outPort := getPorts(flow)
+ logger.Debugw(ctx, "increment-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
+ if inPort != InvalidPort && outPort != InvalidPort {
+ _, intfID, _, _ = ExtractAccessFromFlow(inPort, outPort)
+ }
+ return intfID
+}
diff --git a/internal/pkg/core/device_handler_test.go b/internal/pkg/core/device_handler_test.go
index 855f42c..1a11b92 100644
--- a/internal/pkg/core/device_handler_test.go
+++ b/internal/pkg/core/device_handler_test.go
@@ -43,6 +43,18 @@
"github.com/opencord/voltha-protos/v3/go/voltha"
)
+const (
+ NumPonPorts = 2
+ OnuIDStart = 1
+ OnuIDEnd = 32
+ AllocIDStart = 1
+ AllocIDEnd = 10
+ GemIDStart = 1
+ GemIDEnd = 10
+ FlowIDStart = 1
+ FlowIDEnd = 10
+)
+
func newMockCoreProxy() *mocks.MockCoreProxy {
mcp := mocks.MockCoreProxy{
Devices: make(map[string]*voltha.Device),
@@ -153,7 +165,7 @@
Pools: []*oop.DeviceInfo_DeviceResourceRanges_Pool{{}},
}}
- deviceInf := &oop.DeviceInfo{Vendor: "openolt", Ranges: oopRanges, Model: "openolt", DeviceId: dh.device.Id, PonPorts: 2}
+ deviceInf := &oop.DeviceInfo{Vendor: "openolt", Ranges: oopRanges, Model: "openolt", DeviceId: dh.device.Id, PonPorts: NumPonPorts}
rsrMgr := resourcemanager.OpenOltResourceMgr{DeviceID: dh.device.Id, DeviceType: dh.device.Type, DevInfo: deviceInf,
KVStore: &db.Backend{
Client: &mocks.MockKVClient{},
@@ -192,7 +204,15 @@
dh.resourceMgr.ResourceMgrs[1] = ponmgr
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- dh.flowMgr = NewFlowManager(ctx, dh, dh.resourceMgr)
+ dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr)
+ dh.totalPonPorts = NumPonPorts
+ dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts)
+ for i := 0; i < int(dh.totalPonPorts); i++ {
+ // Instantiate flow manager
+ if dh.flowMgr[i] = NewFlowManager(ctx, dh, dh.resourceMgr, dh.groupMgr); dh.flowMgr[i] == nil {
+ return nil
+ }
+ }
dh.Client = &mocks.MockOpenoltClient{}
dh.eventMgr = &OpenOltEventMgr{eventProxy: &mocks.MockEventProxy{}, handler: dh}
dh.transitionMap = &TransitionMap{}
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index d68b8f8..83da756 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -24,11 +24,6 @@
"encoding/json"
"errors"
"fmt"
- "math/big"
- "strings"
- "sync"
- "time"
-
"github.com/opencord/voltha-lib-go/v3/pkg/flows"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
tp "github.com/opencord/voltha-lib-go/v3/pkg/techprofile"
@@ -39,6 +34,9 @@
openoltpb2 "github.com/opencord/voltha-protos/v3/go/openolt"
tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
"github.com/opencord/voltha-protos/v3/go/voltha"
+ "math/big"
+ "strings"
+ "sync"
//deepcopy "github.com/getlantern/deepcopy"
"github.com/EagleChen/mapmutex"
@@ -187,12 +185,6 @@
gemPort uint32
}
-type pendingFlowDeleteKey struct {
- intfID uint32
- onuID uint32
- uniID uint32
-}
-
type tpLockKey struct {
intfID uint32
onuID uint32
@@ -211,15 +203,26 @@
flowMetadata *voltha.FlowMetadata
}
-type queueInfoBrief struct {
- gemPortID uint32
- servicePriority uint32
+// pendingFlowRemoveDataKey is key to pendingFlowRemoveDataPerSubscriber map
+type pendingFlowRemoveDataKey struct {
+ intfID uint32
+ onuID uint32
+ uniID uint32
+}
+
+// pendingFlowRemoveData is value stored in pendingFlowRemoveDataPerSubscriber map
+// This holds the number of pending flow removes and also a signal channel to
+// to indicate the receiver when all flow removes are handled
+type pendingFlowRemoveData struct {
+ pendingFlowRemoveCount uint32
+ allFlowsRemoved chan struct{}
}
//OpenOltFlowMgr creates the Structure of OpenOltFlowMgr obj
type OpenOltFlowMgr struct {
techprofile map[uint32]tp.TechProfileIf
deviceHandler *DeviceHandler
+ grpMgr *OpenOltGroupMgr
resourceMgr *rsrcMgr.OpenOltResourceMgr
onuIdsLock sync.RWMutex
perGemPortLock *mapmutex.Mutex // lock to be used to access the flowsUsedByGemPort map
@@ -228,21 +231,28 @@
// TODO create a type rsrcMgr.OnuGemInfos to be used instead of []rsrcMgr.OnuGemInfo
onuGemInfo map[uint32][]rsrcMgr.OnuGemInfo //onu, gem and uni info local cache, indexed by IntfId
// We need to have a global lock on the onuGemInfo map
- onuGemInfoLock sync.RWMutex
- pendingFlowDelete sync.Map
+ onuGemInfoLock sync.RWMutex
// The mapmutex.Mutex can be fine tuned to use mapmutex.NewCustomizedMapMutex
- perUserFlowHandleLock *mapmutex.Mutex
- interfaceToMcastQueueMap map[uint32]*queueInfoBrief /*pon interface -> multicast queue map. Required to assign GEM to a bucket during group population*/
+ perUserFlowHandleLock *mapmutex.Mutex
+
+ // pendingFlowRemoveDataPerSubscriber map is used to maintain the context on a per
+ // subscriber basis for the number of pending flow removes. This data is used
+ // to process all the flow removes for a subscriber before handling flow adds.
+ // Interleaving flow delete and flow add processing has known to cause PON resource
+ // management contentions on a per subscriber bases, so we need ensure ordering.
+ pendingFlowRemoveDataPerSubscriber map[pendingFlowRemoveDataKey]pendingFlowRemoveData
+ pendingFlowRemoveDataPerSubscriberLock sync.RWMutex
}
//NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
-func NewFlowManager(ctx context.Context, dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr) *OpenOltFlowMgr {
+func NewFlowManager(ctx context.Context, dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr, grpMgr *OpenOltGroupMgr) *OpenOltFlowMgr {
logger.Infow(ctx, "initializing-flow-manager", log.Fields{"device-id": dh.device.Id})
var flowMgr OpenOltFlowMgr
var err error
var idx uint32
flowMgr.deviceHandler = dh
+ flowMgr.grpMgr = grpMgr
flowMgr.resourceMgr = rMgr
flowMgr.techprofile = make(map[uint32]tp.TechProfileIf)
if err = flowMgr.populateTechProfilePerPonPort(ctx); err != nil {
@@ -250,15 +260,15 @@
return nil
}
flowMgr.onuIdsLock = sync.RWMutex{}
+ flowMgr.pendingFlowRemoveDataPerSubscriberLock = sync.RWMutex{}
flowMgr.flowsUsedByGemPort = make(map[gemPortKey][]uint32)
flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
ponPorts := rMgr.DevInfo.GetPonPorts()
flowMgr.onuGemInfo = make(map[uint32][]rsrcMgr.OnuGemInfo, ponPorts)
flowMgr.onuGemInfoLock = sync.RWMutex{}
- flowMgr.pendingFlowDelete = sync.Map{}
flowMgr.perUserFlowHandleLock = mapmutex.NewCustomizedMapMutex(maxRetry, maxDelay, baseDelay, factor, jitter)
flowMgr.perGemPortLock = mapmutex.NewCustomizedMapMutex(maxRetry, maxDelay, baseDelay, factor, jitter)
- flowMgr.interfaceToMcastQueueMap = make(map[uint32]*queueInfoBrief)
+ flowMgr.pendingFlowRemoveDataPerSubscriber = make(map[pendingFlowRemoveDataKey]pendingFlowRemoveData)
//Load the onugem info cache from kv store on flowmanager start
for idx = 0; idx < ponPorts; idx++ {
if flowMgr.onuGemInfo[idx], err = rMgr.GetOnuGemInfo(ctx, idx); err != nil {
@@ -268,7 +278,7 @@
flowMgr.loadFlowIDlistForGem(ctx, idx)
}
//load interface to multicast queue map from kv store
- flowMgr.loadInterfaceToMulticastQueueMap(ctx)
+ flowMgr.grpMgr.LoadInterfaceToMulticastQueueMap(ctx)
logger.Info(ctx, "initialization-of-flow-manager-success")
return &flowMgr
}
@@ -305,7 +315,7 @@
}, nil)
}
-func (f *OpenOltFlowMgr) divideAndAddFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
+func (f *OpenOltFlowMgr) processAddFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
classifierInfo map[string]interface{}, actionInfo map[string]interface{}, flow *ofp.OfpFlowStats, TpID uint32,
UsMeterID uint32, DsMeterID uint32, flowMetadata *voltha.FlowMetadata) error {
var allocID uint32
@@ -571,16 +581,17 @@
if sq.direction == tp_pb.Direction_DOWNSTREAM {
multicastTrafficQueues := f.techprofile[sq.intfID].GetMulticastTrafficQueues(ctx, sq.tpInst.(*tp.TechProfile))
if len(multicastTrafficQueues) > 0 {
- if _, present := f.interfaceToMcastQueueMap[sq.intfID]; !present {
+ if _, present := f.grpMgr.GetInterfaceToMcastQueueMap(sq.intfID); !present {
//assumed that there is only one queue per PON for the multicast service
//the default queue with multicastQueuePerPonPort.Priority per a pon interface is used for multicast service
//just put it in interfaceToMcastQueueMap to use for building group members
logger.Debugw(ctx, "multicast-traffic-queues", log.Fields{"device-id": f.deviceHandler.device.Id})
multicastQueuePerPonPort := multicastTrafficQueues[0]
- f.interfaceToMcastQueueMap[sq.intfID] = &queueInfoBrief{
+ val := &QueueInfoBrief{
gemPortID: multicastQueuePerPonPort.GemportId,
servicePriority: multicastQueuePerPonPort.Priority,
}
+ f.grpMgr.UpdateInterfaceToMcastQueueMap(sq.intfID, val)
//also store the queue info in kv store
if err := f.resourceMgr.AddMcastQueueForIntf(ctx, sq.intfID, multicastQueuePerPonPort.GemportId, multicastQueuePerPonPort.Priority); err != nil {
logger.Errorw(ctx, "failed-to-add-mcast-queue", log.Fields{"error": err})
@@ -1923,42 +1934,6 @@
return nil
}
-func (f *OpenOltFlowMgr) deletePendingFlows(ctx context.Context, Intf uint32, onuID int32, uniID int32) {
- pnFlDelKey := pendingFlowDeleteKey{Intf, uint32(onuID), uint32(uniID)}
- if val, ok := f.pendingFlowDelete.Load(pnFlDelKey); ok {
- if val.(int) > 0 {
- pnFlDels := val.(int) - 1
- if pnFlDels > 0 {
- logger.Debugw(ctx, "flow-delete-succeeded--more-pending",
- log.Fields{
- "intf": Intf,
- "onu-id": onuID,
- "uni-id": uniID,
- "currpendingflowcnt": pnFlDels,
- "device-id": f.deviceHandler.device.Id})
- f.pendingFlowDelete.Store(pnFlDelKey, pnFlDels)
- } else {
- logger.Debugw(ctx, "all-pending-flow-deletes-handled--removing-entry-from-map",
- log.Fields{
- "intf": Intf,
- "onu-id": onuID,
- "uni-id": uniID,
- "device-id": f.deviceHandler.device.Id})
- f.pendingFlowDelete.Delete(pnFlDelKey)
- }
- }
- } else {
- logger.Debugw(ctx, "no-pending-delete-flows-found",
- log.Fields{
- "intf": Intf,
- "onu-id": onuID,
- "uni-id": uniID,
- "device-id": f.deviceHandler.device.Id})
-
- }
-
-}
-
// Once the gemport is released for a given onu, it also has to be cleared from local cache
// which was used for deriving the gemport->logicalPortNo during packet-in.
// Otherwise stale info continues to exist after gemport is freed and wrong logicalPortNo
@@ -1975,6 +1950,7 @@
"onu-id": onuID,
"device-id": f.deviceHandler.device.Id,
"onu-gem": f.onuGemInfo[intfID]})
+
onugem := f.onuGemInfo[intfID]
deleteLoop:
for i, onu := range onugem {
@@ -2028,34 +2004,6 @@
"device-id": f.deviceHandler.device.Id}, err).Log()
}
if len(updatedFlows) == 0 {
- // Do this for subscriber flows only (not trap from NNI flows)
- if onuID != -1 && uniID != -1 {
- pnFlDelKey := pendingFlowDeleteKey{Intf, uint32(onuID), uint32(uniID)}
- if val, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok {
- logger.Debugw(ctx, "creating-entry-for-pending-flow-delete",
- log.Fields{
- "flow-id": flowID,
- "intf": Intf,
- "onu-id": onuID,
- "uni-id": uniID,
- "device-id": f.deviceHandler.device.Id})
- f.pendingFlowDelete.Store(pnFlDelKey, 1)
- } else {
- pnFlDels := val.(int) + 1
- logger.Debugw(ctx, "updating-flow-delete-entry",
- log.Fields{
- "flow-id": flowID,
- "intf": Intf,
- "onu-id": onuID,
- "uni-id": uniID,
- "currPendingFlowCnt": pnFlDels,
- "device-id": f.deviceHandler.device.Id})
- f.pendingFlowDelete.Store(pnFlDelKey, pnFlDels)
- }
-
- defer f.deletePendingFlows(ctx, Intf, onuID, uniID)
- }
-
logger.Debugw(ctx, "releasing-flow-id-to-resource-manager",
log.Fields{
"Intf": Intf,
@@ -2310,72 +2258,6 @@
}
}
-//clearMulticastFlowFromResourceManager removes a multicast flow from the KV store and
-// clears resources reserved for this multicast flow
-func (f *OpenOltFlowMgr) clearMulticastFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats) {
- classifierInfo := make(map[string]interface{})
- formulateClassifierInfoFromFlow(ctx, classifierInfo, flow)
- networkInterfaceID, err := f.getNNIInterfaceIDOfMulticastFlow(ctx, classifierInfo)
-
- if err != nil {
- logger.Warnw(ctx, "no-inport-found--cannot-release-resources-of-the-multicast-flow", log.Fields{"flowId:": flow.Id})
- return
- }
-
- var onuID = int32(NoneOnuID)
- var uniID = int32(NoneUniID)
- var flowID uint32
-
- flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, networkInterfaceID, onuID, uniID)
-
- for _, flowID = range flowIds {
- flowInfo := f.resourceMgr.GetFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flowID)
- if flowInfo == nil {
- logger.Debugw(ctx, "no-multicast-flowinfo-found-in-the-kv-store",
- log.Fields{
- "intf": networkInterfaceID,
- "onu-id": onuID,
- "uni-id": uniID,
- "flow-id": flowID})
- continue
- }
- updatedFlows := *flowInfo
- for i, storedFlow := range updatedFlows {
- if flow.Id == storedFlow.LogicalFlowID {
- removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: storedFlow.Flow.FlowType}
- logger.Debugw(ctx, "multicast-flow-to-be-deleted",
- log.Fields{
- "flow": storedFlow,
- "flow-id": flow.Id,
- "device-id": f.deviceHandler.device.Id})
- //remove from device
- if err := f.removeFlowFromDevice(ctx, &removeFlowMessage, flow.Id); err != nil {
- // DKB
- logger.Errorw(ctx, "failed-to-remove-multicast-flow",
- log.Fields{
- "flow-id": flow.Id,
- "error": err})
- return
- }
- logger.Infow(ctx, "multicast-flow-removed-from-device-successfully", log.Fields{"flow-id": flow.Id})
- //Remove the Flow from FlowInfo
- updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
- if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID), NoneOnuID, NoneUniID, flowID, &updatedFlows); err != nil {
- logger.Errorw(ctx, "failed-to-delete-multicast-flow-from-the-kv-store",
- log.Fields{"flow": storedFlow,
- "err": err})
- return
- }
- //release flow id
- logger.Debugw(ctx, "releasing-multicast-flow-id",
- log.Fields{"flow-id": flowID,
- "interfaceID": networkInterfaceID})
- f.resourceMgr.FreeFlowID(ctx, uint32(networkInterfaceID), NoneOnuID, NoneUniID, flowID)
- }
- }
- }
-}
-
//RemoveFlow removes the flow from the device
func (f *OpenOltFlowMgr) RemoveFlow(ctx context.Context, flow *ofp.OfpFlowStats) error {
logger.Infow(ctx, "removing-flow", log.Fields{"flow": *flow})
@@ -2394,6 +2276,9 @@
}
}
+ f.incrementActiveFlowRemoveCount(ctx, flow)
+ defer f.decrementActiveFlowRemoveCount(ctx, flow)
+
if flows.HasGroup(flow) {
direction = Multicast
f.clearFlowFromResourceManager(ctx, flow, direction)
@@ -2424,24 +2309,6 @@
return nil
}
-func (f *OpenOltFlowMgr) waitForFlowDeletesToCompleteForOnu(ctx context.Context, intfID uint32, onuID uint32,
- uniID uint32, ch chan bool) {
- pnFlDelKey := pendingFlowDeleteKey{intfID, onuID, uniID}
- for {
- select {
- case <-time.After(20 * time.Millisecond):
- if flowDelRefCnt, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok || flowDelRefCnt == 0 {
- logger.Debug(ctx, "pending-flow-deletes-completed")
- ch <- true
- return
- }
- case <-ctx.Done():
- logger.Error(ctx, "flow-delete-wait-handler-routine-canceled")
- return
- }
- }
-}
-
//isIgmpTrapDownstreamFlow return true if the flow is a downsteam IGMP trap-to-host flow; false otherwise
func isIgmpTrapDownstreamFlow(classifierInfo map[string]interface{}) bool {
if portType := IntfIDToPortTypeName(classifierInfo[InPort].(uint32)); portType == voltha.Port_ETHERNET_NNI {
@@ -2518,6 +2385,11 @@
return f.addIgmpTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
}
+ // If we are here it is not a trap-from-nni flow, i.e., it is subscriber specific flow.
+ // Wait for any FlowRemoves for that specific subscriber to finish first
+ // The goal here is to serialize FlowRemove and FlowAdd. FlowRemove take priority
+ f.waitForFlowRemoveToFinish(ctx, flow)
+
f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNo)
TpID, err := getTpIDFromFlow(ctx, flow)
@@ -2543,33 +2415,29 @@
logger.Debugw(ctx, "downstream-flow-meter-id", log.Fields{"ds-meter-id": DsMeterID})
}
+ return f.processAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
+}
- pnFlDelKey := pendingFlowDeleteKey{intfID, onuID, uniID}
- if _, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok {
- logger.Debugw(ctx, "no-pending-flows-found--going-ahead-with-flow-install",
- log.Fields{
- "intf-id": intfID,
- "onu-id": onuID,
- "uni-id": uniID})
- return f.divideAndAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
+//WaitForFlowRemoveToFinishForSubscriber blocks until flow removes are complete for a given subscriber
+func (f *OpenOltFlowMgr) WaitForFlowRemoveToFinishForSubscriber(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) {
+ var flowRemoveData pendingFlowRemoveData
+ var ok bool
+
+ key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
+ logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+
+ f.pendingFlowRemoveDataPerSubscriberLock.RLock()
+ if flowRemoveData, ok = f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
+ logger.Debugw(ctx, "no-pending-flow-to-remove", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+ f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
+ return
}
- pendingFlowDelComplete := make(chan bool)
- go f.waitForFlowDeletesToCompleteForOnu(ctx, intfID, onuID, uniID, pendingFlowDelComplete)
- select {
- case <-pendingFlowDelComplete:
- logger.Debugw(ctx, "all-pending-flow-deletes-completed",
- log.Fields{
- "intf-id": intfID,
- "onu-id": onuID,
- "uni-id": uniID})
- return f.divideAndAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
- case <-time.After(10 * time.Second):
- return olterrors.NewErrTimeout("pending-flow-deletes",
- log.Fields{
- "intf-id": intfID,
- "onu-id": onuID,
- "uni-id": uniID}, nil)
- }
+ f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
+
+ // Wait for all flow removes to finish first
+ <-flowRemoveData.allFlowsRemoved
+
+ logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
}
// handleFlowWithGroup adds multicast flow to the device.
@@ -2643,9 +2511,9 @@
}
logger.Info(ctx, "multicast-flow-added-to-device-successfully")
//get cached group
- if group, _, err := f.GetFlowGroupFromKVStore(ctx, groupID, true); err == nil {
+ if group, _, err := f.grpMgr.getFlowGroupFromKVStore(ctx, groupID, true); err == nil {
//calling groupAdd to set group members after multicast flow creation
- if err := f.ModifyGroup(ctx, group); err != nil {
+ if err := f.grpMgr.ModifyGroup(ctx, group); err != nil {
return olterrors.NewErrGroupOp("modify", groupID, log.Fields{"group": group}, err)
}
//cached group can be removed now
@@ -2681,241 +2549,6 @@
return 0, olterrors.NewErrNotFound("nni-port", nil, e).Log()
}
-// AddGroup add or update the group
-func (f *OpenOltFlowMgr) AddGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
- logger.Infow(ctx, "add-group", log.Fields{"group": group})
- if group == nil {
- return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
- }
-
- groupToOlt := openoltpb2.Group{
- GroupId: group.Desc.GroupId,
- Command: openoltpb2.Group_SET_MEMBERS,
- Action: f.buildGroupAction(),
- }
-
- logger.Debugw(ctx, "sending-group-to-device", log.Fields{"groupToOlt": groupToOlt})
- _, err := f.deviceHandler.Client.PerformGroupOperation(ctx, &groupToOlt)
- if err != nil {
- return olterrors.NewErrAdapter("add-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
- }
- // group members not created yet. So let's store the group
- if err := f.resourceMgr.AddFlowGroupToKVStore(ctx, group, true); err != nil {
- return olterrors.NewErrPersistence("add", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
- }
- logger.Infow(ctx, "add-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt})
- return nil
-}
-
-// DeleteGroup deletes a group from the device
-func (f *OpenOltFlowMgr) DeleteGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
- logger.Debugw(ctx, "delete-group", log.Fields{"group": group})
- if group == nil {
- logger.Error(ctx, "unable-to-delete-group--invalid-argument--group-is-nil")
- return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
- }
-
- groupToOlt := openoltpb2.Group{
- GroupId: group.Desc.GroupId,
- }
-
- logger.Debugw(ctx, "deleting-group-from-device", log.Fields{"groupToOlt": groupToOlt})
- _, err := f.deviceHandler.Client.DeleteGroup(ctx, &groupToOlt)
- if err != nil {
- logger.Errorw(ctx, "delete-group-failed-on-dev", log.Fields{"groupToOlt": groupToOlt, "err": err})
- return olterrors.NewErrAdapter("delete-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
- }
- //remove group from the store
- if err := f.resourceMgr.RemoveFlowGroupFromKVStore(ctx, group.Desc.GroupId, false); err != nil {
- return olterrors.NewErrPersistence("delete", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
- }
- logger.Debugw(ctx, "delete-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt})
- return nil
-}
-
-//buildGroupAction creates and returns a group action
-func (f *OpenOltFlowMgr) buildGroupAction() *openoltpb2.Action {
- var actionCmd openoltpb2.ActionCmd
- var action openoltpb2.Action
- action.Cmd = &actionCmd
- //pop outer vlan
- action.Cmd.RemoveOuterTag = true
- return &action
-}
-
-// ModifyGroup updates the group
-func (f *OpenOltFlowMgr) ModifyGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
- logger.Infow(ctx, "modify-group", log.Fields{"group": group})
- if group == nil || group.Desc == nil {
- return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
- }
-
- newGroup := f.buildGroup(ctx, group.Desc.GroupId, group.Desc.Buckets)
- //get existing members of the group
- val, groupExists, err := f.GetFlowGroupFromKVStore(ctx, group.Desc.GroupId, false)
-
- if err != nil {
- return olterrors.NewErrNotFound("flow-group-in-kv-store", log.Fields{"groupId": group.Desc.GroupId}, err)
- }
-
- var current *openoltpb2.Group // represents the group on the device
- if groupExists {
- // group already exists
- current = f.buildGroup(ctx, group.Desc.GroupId, val.Desc.GetBuckets())
- logger.Debugw(ctx, "modify-group--group exists",
- log.Fields{
- "group on the device": val,
- "new": group})
- } else {
- current = f.buildGroup(ctx, group.Desc.GroupId, nil)
- }
-
- logger.Debugw(ctx, "modify-group--comparing-current-and-new",
- log.Fields{
- "group on the device": current,
- "new": newGroup})
- // get members to be added
- membersToBeAdded := f.findDiff(current, newGroup)
- // get members to be removed
- membersToBeRemoved := f.findDiff(newGroup, current)
-
- logger.Infow(ctx, "modify-group--differences found", log.Fields{
- "membersToBeAdded": membersToBeAdded,
- "membersToBeRemoved": membersToBeRemoved,
- "groupId": group.Desc.GroupId})
-
- groupToOlt := openoltpb2.Group{
- GroupId: group.Desc.GroupId,
- }
- var errAdd, errRemoved error
- if len(membersToBeAdded) > 0 {
- groupToOlt.Command = openoltpb2.Group_ADD_MEMBERS
- groupToOlt.Members = membersToBeAdded
- //execute addMembers
- errAdd = f.callGroupAddRemove(ctx, &groupToOlt)
- }
- if len(membersToBeRemoved) > 0 {
- groupToOlt.Command = openoltpb2.Group_REMOVE_MEMBERS
- groupToOlt.Members = membersToBeRemoved
- //execute removeMembers
- errRemoved = f.callGroupAddRemove(ctx, &groupToOlt)
- }
-
- //save the modified group
- if errAdd == nil && errRemoved == nil {
- if err := f.resourceMgr.AddFlowGroupToKVStore(ctx, group, false); err != nil {
- return olterrors.NewErrPersistence("add", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
- }
- logger.Infow(ctx, "modify-group-was-success--storing-group",
- log.Fields{
- "group": group,
- "existingGroup": current})
- } else {
- logger.Warnw(ctx, "one-of-the-group-add/remove-operations-failed--cannot-save-group-modifications",
- log.Fields{"group": group})
- if errAdd != nil {
- return errAdd
- }
- return errRemoved
- }
- return nil
-}
-
-//callGroupAddRemove performs add/remove buckets operation for the indicated group
-func (f *OpenOltFlowMgr) callGroupAddRemove(ctx context.Context, group *openoltpb2.Group) error {
- if err := f.performGroupOperation(ctx, group); err != nil {
- st, _ := status.FromError(err)
- //ignore already exists error code
- if st.Code() != codes.AlreadyExists {
- return olterrors.NewErrGroupOp("groupAddRemove", group.GroupId, log.Fields{"status": st}, err)
- }
- }
- return nil
-}
-
-//findDiff compares group members and finds members which only exists in groups2
-func (f *OpenOltFlowMgr) findDiff(group1 *openoltpb2.Group, group2 *openoltpb2.Group) []*openoltpb2.GroupMember {
- var members []*openoltpb2.GroupMember
- for _, bucket := range group2.Members {
- if !f.contains(group1.Members, bucket) {
- // bucket does not exist and must be added
- members = append(members, bucket)
- }
- }
- return members
-}
-
-//contains returns true if the members list contains the given member; false otherwise
-func (f *OpenOltFlowMgr) contains(members []*openoltpb2.GroupMember, member *openoltpb2.GroupMember) bool {
- for _, groupMember := range members {
- if groupMember.InterfaceId == member.InterfaceId {
- return true
- }
- }
- return false
-}
-
-//performGroupOperation call performGroupOperation operation of openolt proto
-func (f *OpenOltFlowMgr) performGroupOperation(ctx context.Context, group *openoltpb2.Group) error {
- logger.Debugw(ctx, "sending-group-to-device",
- log.Fields{
- "groupToOlt": group,
- "command": group.Command})
- _, err := f.deviceHandler.Client.PerformGroupOperation(log.WithSpanFromContext(context.Background(), ctx), group)
- if err != nil {
- return olterrors.NewErrAdapter("group-operation-failed", log.Fields{"groupToOlt": group}, err)
- }
- return nil
-}
-
-//buildGroup build openoltpb2.Group from given group id and bucket list
-func (f *OpenOltFlowMgr) buildGroup(ctx context.Context, groupID uint32, buckets []*ofp.OfpBucket) *openoltpb2.Group {
- group := openoltpb2.Group{
- GroupId: groupID}
- // create members of the group
- for _, ofBucket := range buckets {
- member := f.buildMember(ctx, ofBucket)
- if member != nil && !f.contains(group.Members, member) {
- group.Members = append(group.Members, member)
- }
- }
- return &group
-}
-
-//buildMember builds openoltpb2.GroupMember from an OpenFlow bucket
-func (f *OpenOltFlowMgr) buildMember(ctx context.Context, ofBucket *ofp.OfpBucket) *openoltpb2.GroupMember {
- var outPort uint32
- outPortFound := false
- for _, ofAction := range ofBucket.Actions {
- if ofAction.Type == ofp.OfpActionType_OFPAT_OUTPUT {
- outPort = ofAction.GetOutput().Port
- outPortFound = true
- }
- }
-
- if !outPortFound {
- logger.Debugw(ctx, "bucket-skipped-since-no-out-port-found-in-it", log.Fields{"ofBucket": ofBucket})
- return nil
- }
- interfaceID := IntfIDFromUniPortNum(outPort)
- logger.Debugw(ctx, "got-associated-interface-id-of-the-port",
- log.Fields{
- "portNumber:": outPort,
- "interfaceId:": interfaceID})
- if groupInfo, ok := f.interfaceToMcastQueueMap[interfaceID]; ok {
- member := openoltpb2.GroupMember{
- InterfaceId: interfaceID,
- InterfaceType: openoltpb2.GroupMember_PON,
- GemPortId: groupInfo.gemPortID,
- Priority: groupInfo.servicePriority,
- }
- //add member to the group
- return &member
- }
- logger.Warnf(ctx, "bucket-skipped-since-interface-2-gem-mapping-cannot-be-found", log.Fields{"ofBucket": ofBucket})
- return nil
-}
-
//sendTPDownloadMsgToChild send payload
func (f *OpenOltFlowMgr) sendTPDownloadMsgToChild(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, uni string, TpID uint32) error {
@@ -2957,6 +2590,16 @@
f.onuGemInfoLock.Lock()
defer f.onuGemInfoLock.Unlock()
+ onugem := f.onuGemInfo[intfID]
+ // If the ONU already exists in onuGemInfo list, nothing to do
+ for _, onu := range onugem {
+ if onu.OnuID == onuID && onu.SerialNumber == serialNum {
+ logger.Debugw(ctx, "onu-id-already-exists-in-cache",
+ log.Fields{"onuID": onuID,
+ "serialNum": serialNum})
+ return nil
+ }
+ }
onu := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
f.onuGemInfo[intfID] = append(f.onuGemInfo[intfID], onu)
@@ -3002,6 +2645,7 @@
}
onugem[idx].GemPorts = append(onugem[idx].GemPorts, gemPort)
f.onuGemInfo[intfID] = onugem
+ break
}
}
err := f.resourceMgr.AddGemToOnuGemInfo(ctx, intfID, onuID, gemPort)
@@ -3037,7 +2681,6 @@
"onu-geminfo": f.onuGemInfo[intfID],
"intf-id": intfID,
"gemport-id": gemPortID})
-
// get onuid from the onugem info cache
onugem := f.onuGemInfo[intfID]
@@ -4010,52 +3653,150 @@
}
}
-//loadInterfaceToMulticastQueueMap reads multicast queues per interface from the KV store
-//and put them into interfaceToMcastQueueMap.
-func (f *OpenOltFlowMgr) loadInterfaceToMulticastQueueMap(ctx context.Context) {
- storedMulticastQueueMap, err := f.resourceMgr.GetMcastQueuePerInterfaceMap(ctx)
+//clearMulticastFlowFromResourceManager removes a multicast flow from the KV store and
+// clears resources reserved for this multicast flow
+func (f *OpenOltFlowMgr) clearMulticastFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats) {
+ classifierInfo := make(map[string]interface{})
+ formulateClassifierInfoFromFlow(ctx, classifierInfo, flow)
+ networkInterfaceID, err := f.getNNIInterfaceIDOfMulticastFlow(ctx, classifierInfo)
+
if err != nil {
- logger.Error(ctx, "failed-to-get-pon-interface-to-multicast-queue-map")
+ logger.Warnw(ctx, "no-inport-found--cannot-release-resources-of-the-multicast-flow", log.Fields{"flowId:": flow.Id})
return
}
- for intf, queueInfo := range storedMulticastQueueMap {
- q := queueInfoBrief{
- gemPortID: queueInfo[0],
- servicePriority: queueInfo[1],
+
+ var onuID = int32(NoneOnuID)
+ var uniID = int32(NoneUniID)
+ var flowID uint32
+
+ flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, networkInterfaceID, onuID, uniID)
+
+ for _, flowID = range flowIds {
+ flowInfo := f.resourceMgr.GetFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flowID)
+ if flowInfo == nil {
+ logger.Debugw(ctx, "no-multicast-flowinfo-found-in-the-kv-store",
+ log.Fields{
+ "intf": networkInterfaceID,
+ "onu-id": onuID,
+ "uni-id": uniID,
+ "flow-id": flowID})
+ continue
}
- f.interfaceToMcastQueueMap[intf] = &q
+ updatedFlows := *flowInfo
+ for i, storedFlow := range updatedFlows {
+ if flow.Id == storedFlow.LogicalFlowID {
+ removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: storedFlow.Flow.FlowType}
+ logger.Debugw(ctx, "multicast-flow-to-be-deleted",
+ log.Fields{
+ "flow": storedFlow,
+ "flow-id": flow.Id,
+ "device-id": f.deviceHandler.device.Id})
+ //remove from device
+ if err := f.removeFlowFromDevice(ctx, &removeFlowMessage, flow.Id); err != nil {
+ // DKB
+ logger.Errorw(ctx, "failed-to-remove-multicast-flow",
+ log.Fields{
+ "flow-id": flow.Id,
+ "error": err})
+ return
+ }
+ logger.Infow(ctx, "multicast-flow-removed-from-device-successfully", log.Fields{"flow-id": flow.Id})
+ //Remove the Flow from FlowInfo
+ updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
+ if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID), NoneOnuID, NoneUniID, flowID, &updatedFlows); err != nil {
+ logger.Errorw(ctx, "failed-to-delete-multicast-flow-from-the-kv-store",
+ log.Fields{"flow": storedFlow,
+ "err": err})
+ return
+ }
+ //release flow id
+ logger.Debugw(ctx, "releasing-multicast-flow-id",
+ log.Fields{"flow-id": flowID,
+ "interfaceID": networkInterfaceID})
+ f.resourceMgr.FreeFlowID(ctx, uint32(networkInterfaceID), NoneOnuID, NoneUniID, flowID)
+ }
+ }
}
}
-//GetFlowGroupFromKVStore fetches and returns flow group from the KV store. Returns (nil, false, error) if any problem occurs during
-//fetching the data. Returns (group, true, nil) if the group is fetched and returned successfully.
-//Returns (nil, false, nil) if the group does not exists in the KV store.
-func (f *OpenOltFlowMgr) GetFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) (*ofp.OfpGroupEntry, bool, error) {
- exists, groupInfo, err := f.resourceMgr.GetFlowGroupFromKVStore(ctx, groupID, cached)
- if err != nil {
- return nil, false, olterrors.NewErrNotFound("flow-group", log.Fields{"group-id": groupID}, err)
+func (f *OpenOltFlowMgr) incrementActiveFlowRemoveCount(ctx context.Context, flow *ofp.OfpFlowStats) {
+ inPort, outPort := getPorts(flow)
+ logger.Debugw(ctx, "increment-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
+ if inPort != InvalidPort && outPort != InvalidPort {
+ _, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
+ key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
+ logger.Debugw(ctx, "increment-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+
+ f.pendingFlowRemoveDataPerSubscriberLock.Lock()
+ defer f.pendingFlowRemoveDataPerSubscriberLock.Unlock()
+ flowRemoveData, ok := f.pendingFlowRemoveDataPerSubscriber[key]
+ if !ok {
+ flowRemoveData = pendingFlowRemoveData{
+ pendingFlowRemoveCount: 0,
+ allFlowsRemoved: make(chan struct{}),
+ }
+ }
+ flowRemoveData.pendingFlowRemoveCount++
+ f.pendingFlowRemoveDataPerSubscriber[key] = flowRemoveData
+
+ logger.Debugw(ctx, "current-flow-remove-count–increment",
+ log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
+ "currCnt": f.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
}
- if exists {
- return newGroup(groupInfo.GroupID, groupInfo.OutPorts), exists, nil
- }
- return nil, exists, nil
}
-func newGroup(groupID uint32, outPorts []uint32) *ofp.OfpGroupEntry {
- groupDesc := ofp.OfpGroupDesc{
- Type: ofp.OfpGroupType_OFPGT_ALL,
- GroupId: groupID,
- }
- groupEntry := ofp.OfpGroupEntry{
- Desc: &groupDesc,
- }
- for i := 0; i < len(outPorts); i++ {
- var acts []*ofp.OfpAction
- acts = append(acts, flows.Output(outPorts[i]))
- bucket := ofp.OfpBucket{
- Actions: acts,
+func (f *OpenOltFlowMgr) decrementActiveFlowRemoveCount(ctx context.Context, flow *ofp.OfpFlowStats) {
+ inPort, outPort := getPorts(flow)
+ logger.Debugw(ctx, "decrement-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
+ if inPort != InvalidPort && outPort != InvalidPort {
+ _, intfID, onuID, uniID := ExtractAccessFromFlow(uint32(inPort), uint32(outPort))
+ key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
+ logger.Debugw(ctx, "decrement-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+
+ f.pendingFlowRemoveDataPerSubscriberLock.Lock()
+ defer f.pendingFlowRemoveDataPerSubscriberLock.Unlock()
+ if val, ok := f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
+ logger.Fatalf(ctx, "flow-remove-key-not-found", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+ } else {
+ if val.pendingFlowRemoveCount > 0 {
+ val.pendingFlowRemoveCount--
+ }
+ logger.Debugw(ctx, "current-flow-remove-count-after-decrement",
+ log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
+ "currCnt": f.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
+ // If all flow removes have finished, then close the channel to signal the receiver
+ // to go ahead with flow adds.
+ if val.pendingFlowRemoveCount == 0 {
+ close(val.allFlowsRemoved)
+ delete(f.pendingFlowRemoveDataPerSubscriber, key)
+ return
+ }
+ f.pendingFlowRemoveDataPerSubscriber[key] = val
}
- groupDesc.Buckets = append(groupDesc.Buckets, &bucket)
}
- return &groupEntry
+}
+
+func (f *OpenOltFlowMgr) waitForFlowRemoveToFinish(ctx context.Context, flow *ofp.OfpFlowStats) {
+ var flowRemoveData pendingFlowRemoveData
+ var ok bool
+ inPort, outPort := getPorts(flow)
+ logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
+ if inPort != InvalidPort && outPort != InvalidPort {
+ _, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
+ key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
+ logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+
+ f.pendingFlowRemoveDataPerSubscriberLock.RLock()
+ if flowRemoveData, ok = f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
+ logger.Debugw(ctx, "no-pending-flow-to-remove", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+ f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
+ return
+ }
+ f.pendingFlowRemoveDataPerSubscriberLock.RUnlock()
+
+ // Wait for all flow removes to finish first
+ <-flowRemoveData.allFlowsRemoved
+
+ logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
+ }
}
diff --git a/internal/pkg/core/openolt_flowmgr_test.go b/internal/pkg/core/openolt_flowmgr_test.go
index 490a40c..b75bfac 100644
--- a/internal/pkg/core/openolt_flowmgr_test.go
+++ b/internal/pkg/core/openolt_flowmgr_test.go
@@ -42,7 +42,7 @@
tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
)
-var flowMgr *OpenOltFlowMgr
+var flowMgr []*OpenOltFlowMgr
func init() {
_, _ = log.SetDefaultLogger(log.JSON, log.DebugLevel, nil)
@@ -58,8 +58,8 @@
deviceinfo := &openolt.DeviceInfo{Vendor: "openolt", Model: "openolt", HardwareVersion: "1.0", FirmwareVersion: "1.0",
DeviceId: "olt", DeviceSerialNumber: "openolt", PonPorts: 16, Technology: "Default",
- OnuIdStart: 1, OnuIdEnd: 1, AllocIdStart: 1, AllocIdEnd: 1,
- GemportIdStart: 1, GemportIdEnd: 1, FlowIdStart: 1, FlowIdEnd: 1,
+ OnuIdStart: OnuIDStart, OnuIdEnd: OnuIDEnd, AllocIdStart: AllocIDStart, AllocIdEnd: AllocIDEnd,
+ GemportIdStart: GemIDStart, GemportIdEnd: GemIDEnd, FlowIdStart: FlowIDStart, FlowIdEnd: FlowIDEnd,
Ranges: ranges,
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -73,7 +73,7 @@
return rsrMgr
}
-func newMockFlowmgr() *OpenOltFlowMgr {
+func newMockFlowmgr() []*OpenOltFlowMgr {
rMgr := newMockResourceMgr()
dh := newMockDeviceHandler()
@@ -81,40 +81,32 @@
rMgr.KVStore.Client = &mocks.MockKVClient{}
dh.resourceMgr = rMgr
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- flwMgr := NewFlowManager(ctx, dh, rMgr)
- onuGemInfo1 := make([]rsrcMgr.OnuGemInfo, 2)
- onuGemInfo2 := make([]rsrcMgr.OnuGemInfo, 2)
- onuGemInfo1[0] = rsrcMgr.OnuGemInfo{OnuID: 1, SerialNumber: "1", IntfID: 1, GemPorts: []uint32{1}}
- onuGemInfo2[1] = rsrcMgr.OnuGemInfo{OnuID: 2, SerialNumber: "2", IntfID: 2, GemPorts: []uint32{2}}
- flwMgr.onuGemInfo[1] = onuGemInfo1
- flwMgr.onuGemInfo[2] = onuGemInfo2
+ // onuGemInfo := make([]rsrcMgr.OnuGemInfo, NumPonPorts)
+ var i uint32
- packetInGemPort := make(map[rsrcMgr.PacketInInfoKey]uint32)
- packetInGemPort[rsrcMgr.PacketInInfoKey{IntfID: 1, OnuID: 1, LogicalPort: 1}] = 1
- packetInGemPort[rsrcMgr.PacketInInfoKey{IntfID: 2, OnuID: 2, LogicalPort: 2}] = 2
+ for i = 0; i < NumPonPorts; i++ {
+ packetInGemPort := make(map[rsrcMgr.PacketInInfoKey]uint32)
+ packetInGemPort[rsrcMgr.PacketInInfoKey{IntfID: i, OnuID: i + 1, LogicalPort: i + 1, VlanID: uint16(i), Priority: uint8(i)}] = i + 1
- flwMgr.packetInGemPort = packetInGemPort
- tps := make(map[uint32]tp.TechProfileIf)
- for key := range rMgr.ResourceMgrs {
- tps[key] = mocks.MockTechProfile{TpID: key}
+ dh.flowMgr[i].packetInGemPort = packetInGemPort
+ tps := make(map[uint32]tp.TechProfileIf)
+ for key := range rMgr.ResourceMgrs {
+ tps[key] = mocks.MockTechProfile{TpID: key}
+ }
+ dh.flowMgr[i].techprofile = tps
+ interface2mcastQeueuMap := make(map[uint32]*QueueInfoBrief)
+ interface2mcastQeueuMap[0] = &QueueInfoBrief{
+ gemPortID: 4000,
+ servicePriority: 3,
+ }
+ dh.flowMgr[i].grpMgr.interfaceToMcastQueueMap = interface2mcastQeueuMap
}
- flwMgr.techprofile = tps
- interface2mcastQeueuMap := make(map[uint32]*queueInfoBrief)
- interface2mcastQeueuMap[0] = &queueInfoBrief{
- gemPortID: 4000,
- servicePriority: 3,
- }
- flwMgr.interfaceToMcastQueueMap = interface2mcastQeueuMap
- return flwMgr
+ return dh.flowMgr
}
func TestOpenOltFlowMgr_CreateSchedulerQueues(t *testing.T) {
- // flowMgr := newMockFlowmgr()
-
tprofile := &tp.TechProfile{Name: "tp1", SubscriberIdentifier: "subscriber1",
ProfileType: "pt1", NumGemPorts: 1, Version: 1,
InstanceCtrl: tp.InstanceControl{Onu: "1", Uni: "1", MaxGemPayloadSize: "1"},
@@ -140,26 +132,26 @@
wantErr bool
}{
// TODO: Add test cases.
- {"CreateSchedulerQueues-1", schedQueue{tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, tprofile, 1, flowmetadata}, false},
- {"CreateSchedulerQueues-2", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 1, flowmetadata}, false},
- {"CreateSchedulerQueues-3", schedQueue{tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, tprofile, 2, flowmetadata}, true},
- {"CreateSchedulerQueues-4", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 2, flowmetadata}, true},
- {"CreateSchedulerQueues-5", schedQueue{tp_pb.Direction_UPSTREAM, 2, 2, 2, 64, 2, tprofile, 2, flowmetadata}, true},
- {"CreateSchedulerQueues-6", schedQueue{tp_pb.Direction_DOWNSTREAM, 2, 2, 2, 65, 2, tprofile2, 2, flowmetadata}, true},
- {"CreateSchedulerQueues-13", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 1, flowmetadata}, false},
+ {"CreateSchedulerQueues-1", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 1, flowmetadata}, false},
+ {"CreateSchedulerQueues-2", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, flowmetadata}, false},
+ {"CreateSchedulerQueues-3", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 2, flowmetadata}, true},
+ {"CreateSchedulerQueues-4", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 2, flowmetadata}, true},
+ {"CreateSchedulerQueues-5", schedQueue{tp_pb.Direction_UPSTREAM, 1, 2, 2, 64, 2, tprofile, 2, flowmetadata}, true},
+ {"CreateSchedulerQueues-6", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 2, 2, 65, 2, tprofile2, 2, flowmetadata}, true},
+ {"CreateSchedulerQueues-13", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, flowmetadata}, false},
//Negative testcases
- {"CreateSchedulerQueues-7", schedQueue{tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, tprofile, 1, &voltha.FlowMetadata{}}, true},
- {"CreateSchedulerQueues-8", schedQueue{tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, tprofile, 0, &voltha.FlowMetadata{}}, true},
- {"CreateSchedulerQueues-9", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 1, &voltha.FlowMetadata{}}, true},
- {"CreateSchedulerQueues-10", schedQueue{tp_pb.Direction_UPSTREAM, 1, 1, 1, 64, 1, tprofile, 2, &voltha.FlowMetadata{}}, true},
- {"CreateSchedulerQueues-11", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 2, &voltha.FlowMetadata{}}, true},
- {"CreateSchedulerQueues-12", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 2, nil}, true},
+ {"CreateSchedulerQueues-7", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 1, &voltha.FlowMetadata{}}, true},
+ {"CreateSchedulerQueues-8", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 0, &voltha.FlowMetadata{}}, true},
+ {"CreateSchedulerQueues-9", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, &voltha.FlowMetadata{}}, true},
+ {"CreateSchedulerQueues-10", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 2, &voltha.FlowMetadata{}}, true},
+ {"CreateSchedulerQueues-11", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 2, &voltha.FlowMetadata{}}, true},
+ {"CreateSchedulerQueues-12", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 2, nil}, true},
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if err := flowMgr.CreateSchedulerQueues(ctx, tt.schedQueue); (err != nil) != tt.wantErr {
+ if err := flowMgr[tt.schedQueue.intfID].CreateSchedulerQueues(ctx, tt.schedQueue); (err != nil) != tt.wantErr {
t.Errorf("OpenOltFlowMgr.CreateSchedulerQueues() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -167,8 +159,6 @@
}
func TestOpenOltFlowMgr_RemoveSchedulerQueues(t *testing.T) {
-
- // flowMgr := newMockFlowmgr()
tprofile := &tp.TechProfile{Name: "tp1", SubscriberIdentifier: "subscriber1",
ProfileType: "pt1", NumGemPorts: 1, Version: 1,
InstanceCtrl: tp.InstanceControl{Onu: "1", Uni: "1", MaxGemPayloadSize: "1"},
@@ -198,7 +188,7 @@
defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if err := flowMgr.RemoveSchedulerQueues(ctx, tt.schedQueue); (err != nil) != tt.wantErr {
+ if err := flowMgr[tt.schedQueue.intfID].RemoveSchedulerQueues(ctx, tt.schedQueue); (err != nil) != tt.wantErr {
t.Errorf("OpenOltFlowMgr.RemoveSchedulerQueues() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -207,7 +197,6 @@
}
func TestOpenOltFlowMgr_createTcontGemports(t *testing.T) {
- // flowMgr := newMockFlowmgr()
bands := make([]*ofp.OfpMeterBandHeader, 2)
bands[0] = &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 1000, BurstSize: 5000, Data: &ofp.OfpMeterBandHeader_Drop{}}
bands[1] = &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 2000, BurstSize: 5000, Data: &ofp.OfpMeterBandHeader_Drop{}}
@@ -237,7 +226,7 @@
defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- _, _, tpInst := flowMgr.createTcontGemports(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID, tt.args.uni, tt.args.uniPort, tt.args.TpID, tt.args.UsMeterID, tt.args.DsMeterID, tt.args.flowMetadata)
+ _, _, tpInst := flowMgr[tt.args.intfID].createTcontGemports(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID, tt.args.uni, tt.args.uniPort, tt.args.TpID, tt.args.UsMeterID, tt.args.DsMeterID, tt.args.flowMetadata)
switch tpInst := tpInst.(type) {
case *tp.TechProfile:
if tt.args.TpID != 64 {
@@ -256,7 +245,6 @@
func TestOpenOltFlowMgr_RemoveFlow(t *testing.T) {
ctx := context.Background()
- // flowMgr := newMockFlowmgr()
logger.Debug(ctx, "Info Warning Error: Starting RemoveFlow() test")
fa := &fu.FlowArgs{
MatchFields: []*ofp.OfpOxmOfbField{
@@ -334,7 +322,7 @@
defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if err := flowMgr.RemoveFlow(ctx, tt.args.flow); err != nil {
+ if err := flowMgr[0].RemoveFlow(ctx, tt.args.flow); err != nil {
logger.Warn(ctx, err)
}
})
@@ -343,7 +331,6 @@
}
func TestOpenOltFlowMgr_AddFlow(t *testing.T) {
- // flowMgr := newMockFlowmgr()
kw := make(map[string]uint64)
kw["table_id"] = 1
kw["meter_id"] = 1
@@ -581,29 +568,27 @@
defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- _ = flowMgr.AddFlow(ctx, tt.args.flow, tt.args.flowMetadata)
+ _ = flowMgr[0].AddFlow(ctx, tt.args.flow, tt.args.flowMetadata)
// TODO: actually verify test cases
})
}
}
func TestOpenOltFlowMgr_UpdateOnuInfo(t *testing.T) {
- flwMgr := newMockFlowmgr()
-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
wg := sync.WaitGroup{}
- intfCount := 16
- onuCount := 32
+ intfCount := NumPonPorts
+ onuCount := OnuIDEnd - OnuIDStart + 1
for i := 0; i < intfCount; i++ {
- for j := 0; j < onuCount; j++ {
+ for j := 1; j <= onuCount; j++ {
wg.Add(1)
go func(i uint32, j uint32) {
// TODO: actually verify success
- _ = flwMgr.UpdateOnuInfo(ctx, i, i, fmt.Sprintf("onu-%d", i))
+ _ = flowMgr[i].UpdateOnuInfo(ctx, i, i, fmt.Sprintf("onu-%d", i))
wg.Done()
}(uint32(i), uint32(j))
}
@@ -614,34 +599,35 @@
}
func TestOpenOltFlowMgr_addGemPortToOnuInfoMap(t *testing.T) {
- flowMgr = newMockFlowmgr()
- intfNum := 16
- onuNum := 32
+ intfNum := NumPonPorts
+ onuNum := OnuIDEnd - OnuIDStart + 1
// clean the flowMgr
- flowMgr.onuGemInfo = make(map[uint32][]rsrcMgr.OnuGemInfo, intfNum)
+ for i := 0; i < intfNum; i++ {
+ flowMgr[i].onuGemInfo = make(map[uint32][]rsrcMgr.OnuGemInfo, intfNum)
+ }
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Create OnuInfo
for i := 0; i < intfNum; i++ {
- for o := 0; o < onuNum; o++ {
+ for o := 1; o <= onuNum; o++ {
// TODO: actually verify success
- _ = flowMgr.UpdateOnuInfo(ctx, uint32(i), uint32(o), fmt.Sprintf("i%do%d", i, o))
+ _ = flowMgr[i].UpdateOnuInfo(ctx, uint32(i), uint32(o), fmt.Sprintf("i%do%d", i, o-1))
}
}
// Add gemPorts to OnuInfo in parallel threads
wg := sync.WaitGroup{}
- for o := 0; o < onuNum; o++ {
+ for o := 1; o <= onuNum; o++ {
for i := 0; i < intfNum; i++ {
wg.Add(1)
go func(intfId uint32, onuId uint32) {
- gemID, _ := strconv.Atoi(fmt.Sprintf("90%d%d", intfId, onuId))
+ gemID, _ := strconv.Atoi(fmt.Sprintf("90%d%d", intfId, onuId-1))
- flowMgr.addGemPortToOnuInfoMap(ctx, intfId, onuId, uint32(gemID))
+ flowMgr[intfId].addGemPortToOnuInfoMap(ctx, intfId, onuId, uint32(gemID))
wg.Done()
}(uint32(i), uint32(o))
}
@@ -651,21 +637,21 @@
// check that each entry of onuGemInfo has the correct number of ONUs
for i := 0; i < intfNum; i++ {
- lenofOnu := len(flowMgr.onuGemInfo[uint32(i)])
+ lenofOnu := len(flowMgr[i].onuGemInfo[uint32(i)])
if onuNum != lenofOnu {
t.Errorf("OnuGemInfo length is not as expected len = %d, want %d", lenofOnu, onuNum)
}
- for o := 0; o < onuNum; o++ {
- lenOfGemPorts := len(flowMgr.onuGemInfo[uint32(i)][o].GemPorts)
+ for o := 1; o <= onuNum; o++ {
+ lenOfGemPorts := len(flowMgr[i].onuGemInfo[uint32(i)][o-1].GemPorts)
// check that each onuEntry has 1 gemPort
if lenOfGemPorts != 1 {
t.Errorf("Expected 1 GemPort per ONU, found %d", lenOfGemPorts)
}
// check that the value of the gemport is correct
- gemID, _ := strconv.Atoi(fmt.Sprintf("90%d%d", i, o))
- currentValue := flowMgr.onuGemInfo[uint32(i)][o].GemPorts[0]
+ gemID, _ := strconv.Atoi(fmt.Sprintf("90%d%d", i, o-1))
+ currentValue := flowMgr[i].onuGemInfo[uint32(i)][o-1].GemPorts[0]
if uint32(gemID) != currentValue {
t.Errorf("Expected GemPort value to be %d, found %d", gemID, currentValue)
}
@@ -674,7 +660,8 @@
}
func TestOpenOltFlowMgr_deleteGemPortFromLocalCache(t *testing.T) {
- flwMgr := newMockFlowmgr()
+ // Create fresh flowMgr instance
+ flowMgr = newMockFlowmgr()
type args struct {
intfID uint32
onuID uint32
@@ -704,18 +691,18 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// TODO: should check returned errors are as expected?
- _ = flwMgr.UpdateOnuInfo(ctx, tt.args.intfID, tt.args.onuID, tt.args.serialNum)
+ _ = flowMgr[tt.args.intfID].UpdateOnuInfo(ctx, tt.args.intfID, tt.args.onuID, tt.args.serialNum)
for _, gemPort := range tt.args.gemPortIDs {
- flwMgr.addGemPortToOnuInfoMap(ctx, tt.args.intfID, tt.args.onuID, gemPort)
+ flowMgr[tt.args.intfID].addGemPortToOnuInfoMap(ctx, tt.args.intfID, tt.args.onuID, gemPort)
}
for _, gemPortDeleted := range tt.args.gemPortIDsToBeDeleted {
- flwMgr.deleteGemPortFromLocalCache(ctx, tt.args.intfID, tt.args.onuID, gemPortDeleted)
+ flowMgr[tt.args.intfID].deleteGemPortFromLocalCache(ctx, tt.args.intfID, tt.args.onuID, gemPortDeleted)
}
- lenofGemPorts := len(flwMgr.onuGemInfo[tt.args.intfID][0].GemPorts)
+ lenofGemPorts := len(flowMgr[tt.args.intfID].onuGemInfo[tt.args.intfID][0].GemPorts)
if lenofGemPorts != tt.args.finalLength {
t.Errorf("GemPorts length is not as expected len = %d, want %d", lenofGemPorts, tt.args.finalLength)
}
- gemPorts := flwMgr.onuGemInfo[tt.args.intfID][0].GemPorts
+ gemPorts := flowMgr[tt.args.intfID].onuGemInfo[tt.args.intfID][0].GemPorts
if !reflect.DeepEqual(tt.args.gemPortIDsRemaining, gemPorts) {
t.Errorf("GemPorts are not as expected = %v, want %v", gemPorts, tt.args.gemPortIDsRemaining)
}
@@ -725,7 +712,6 @@
}
func TestOpenOltFlowMgr_GetLogicalPortFromPacketIn(t *testing.T) {
- flwMgr := newMockFlowmgr()
type args struct {
packetIn *openoltpb2.PacketIndication
}
@@ -736,18 +722,18 @@
wantErr bool
}{
// TODO: Add test cases.
- {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 1, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1, false},
- {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "nni", IntfId: 1, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1048577, false},
+ {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 0, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1, false},
+ {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "nni", IntfId: 0, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1048576, false},
// Negative Test cases.
- {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 2, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 0, true},
- {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 1, GemportId: 1, FlowId: 100, PortNo: 0, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 4112, false},
+ {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 1, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 0, true},
+ {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 0, GemportId: 1, FlowId: 100, PortNo: 0, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 16, false},
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- got, err := flwMgr.GetLogicalPortFromPacketIn(ctx, tt.args.packetIn)
+ got, err := flowMgr[tt.args.packetIn.IntfId].GetLogicalPortFromPacketIn(ctx, tt.args.packetIn)
if (err != nil) != tt.wantErr {
t.Errorf("OpenOltFlowMgr.GetLogicalPortFromPacketIn() error = %v, wantErr %v", err, tt.wantErr)
return
@@ -760,7 +746,8 @@
}
func TestOpenOltFlowMgr_GetPacketOutGemPortID(t *testing.T) {
- // flwMgr := newMockFlowmgr()
+ // Create fresh flowMgr instance
+ flowMgr = newMockFlowmgr()
//untagged packet in hex string
untaggedStr := "01005e000002000000000001080046c00020000040000102fa140a000001e00000029404000017000705e10000fa"
@@ -769,15 +756,15 @@
t.Error("Unable to parse hex string", err)
panic(err)
}
- //single-tagged packet in hex string. vlanID.pbit: 540.0
- singleTaggedStr := "01005e0000010025ba48172481000225080046c0002000004000010257deab140023e0000001940400001164ee9b0000000000000000000000000000"
+ //single-tagged packet in hex string. vlanID.pbit: 1.1
+ singleTaggedStr := "01005e0000010025ba48172481002001080046c0002000004000010257deab140023e0000001940400001164ee9b0000000000000000000000000000"
singleTagged, err := hex.DecodeString(singleTaggedStr)
if err != nil {
t.Error("Unable to parse hex string", err)
panic(err)
}
- //double-tagged packet in hex string. vlanID.pbit: 210.0-48.7
- doubleTaggedStr := "01005e000016deadbeefba11810002108100e030080046000028000000000102c5b87f000001e0000016940400002200f8030000000104000000e10000fa"
+ //double-tagged packet in hex string. vlanID.pbit: 210.0-0.0
+ doubleTaggedStr := "01005e000016deadbeefba118100021081000000080046000028000000000102c5b87f000001e0000016940400002200f8030000000104000000e10000fa"
doubleTagged, err := hex.DecodeString(doubleTaggedStr)
if err != nil {
t.Error("Unable to parse hex string", err)
@@ -797,11 +784,11 @@
wantErr bool
}{
// TODO: Add test cases.
- {"GetPacketOutGemPortID", args{intfID: 1, onuID: 1, portNum: 3, packet: untagged}, 3, false},
- {"GetPacketOutGemPortID", args{intfID: 2, onuID: 2, portNum: 4, packet: singleTagged}, 4, false},
- {"GetPacketOutGemPortID", args{intfID: 1, onuID: 2, portNum: 2, packet: doubleTagged}, 2, false},
- {"GetPacketOutGemPortID", args{intfID: 1, onuID: 10, portNum: 10, packet: untagged}, 2, true},
- {"GetPacketOutGemPortID", args{intfID: 1, onuID: 1, portNum: 3, packet: []byte{}}, 3, true},
+ {"GetPacketOutGemPortID", args{intfID: 0, onuID: 1, portNum: 1, packet: untagged}, 1, false},
+ {"GetPacketOutGemPortID", args{intfID: 1, onuID: 2, portNum: 2, packet: singleTagged}, 2, false},
+ {"GetPacketOutGemPortID", args{intfID: 0, onuID: 1, portNum: 1, packet: doubleTagged}, 1, false},
+ {"GetPacketOutGemPortID", args{intfID: 0, onuID: 10, portNum: 10, packet: untagged}, 2, true},
+ {"GetPacketOutGemPortID", args{intfID: 0, onuID: 1, portNum: 3, packet: []byte{}}, 3, true},
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -809,7 +796,7 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- got, err := flowMgr.GetPacketOutGemPortID(ctx, tt.args.intfID, tt.args.onuID, tt.args.portNum, tt.args.packet)
+ got, err := flowMgr[tt.args.intfID].GetPacketOutGemPortID(ctx, tt.args.intfID, tt.args.onuID, tt.args.portNum, tt.args.packet)
if tt.wantErr {
if err == nil {
//error expected but got value
@@ -830,7 +817,6 @@
}
func TestOpenOltFlowMgr_DeleteTechProfileInstance(t *testing.T) {
- // flwMgr := newMockFlowmgr()
type args struct {
intfID uint32
onuID uint32
@@ -850,7 +836,7 @@
defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if err := flowMgr.DeleteTechProfileInstance(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID, tt.args.sn, tt.args.tpID); (err != nil) != tt.wantErr {
+ if err := flowMgr[tt.args.intfID].DeleteTechProfileInstance(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID, tt.args.sn, tt.args.tpID); (err != nil) != tt.wantErr {
t.Errorf("OpenOltFlowMgr.DeleteTechProfileInstance() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -859,7 +845,6 @@
func TestOpenOltFlowMgr_checkAndAddFlow(t *testing.T) {
ctx := context.Background()
- // flowMgr := newMockFlowmgr()
kw := make(map[string]uint64)
kw["table_id"] = 1
kw["meter_id"] = 1
@@ -1097,7 +1082,7 @@
defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- flowMgr.checkAndAddFlow(ctx, tt.args.args, tt.args.classifierInfo, tt.args.actionInfo, tt.args.flow,
+ flowMgr[tt.args.intfID].checkAndAddFlow(ctx, tt.args.args, tt.args.classifierInfo, tt.args.actionInfo, tt.args.flow,
tt.args.TpInst, tt.args.gemPorts, tt.args.TpID, tt.args.uni)
})
}
@@ -1108,7 +1093,7 @@
defer cancel()
//create group
group := newGroup(2, []uint32{1})
- err := flowMgr.AddGroup(ctx, group)
+ err := flowMgr[0].grpMgr.AddGroup(ctx, group)
if err != nil {
t.Error("group-add failed", err)
return
@@ -1128,7 +1113,7 @@
}
ofpStats, _ := fu.MkFlowStat(multicastFlowArgs)
fmt.Println(ofpStats.Id)
- err = flowMgr.AddFlow(ctx, ofpStats, &voltha.FlowMetadata{})
+ err = flowMgr[0].AddFlow(ctx, ofpStats, &voltha.FlowMetadata{})
if err != nil {
t.Error("Multicast flow-add failed", err)
return
@@ -1136,20 +1121,20 @@
//add bucket to the group
group = newGroup(2, []uint32{1, 2})
- err = flowMgr.ModifyGroup(ctx, group)
+ err = flowMgr[0].grpMgr.ModifyGroup(ctx, group)
if err != nil {
t.Error("modify-group failed", err)
return
}
//remove the multicast flow
- err = flowMgr.RemoveFlow(ctx, ofpStats)
+ err = flowMgr[0].RemoveFlow(ctx, ofpStats)
if err != nil {
t.Error("Multicast flow-remove failed", err)
return
}
//remove the group
- err = flowMgr.DeleteGroup(ctx, group)
+ err = flowMgr[0].grpMgr.DeleteGroup(ctx, group)
if err != nil {
t.Error("delete-group failed", err)
return
diff --git a/internal/pkg/core/openolt_groupmgr.go b/internal/pkg/core/openolt_groupmgr.go
new file mode 100644
index 0000000..d14d24b
--- /dev/null
+++ b/internal/pkg/core/openolt_groupmgr.go
@@ -0,0 +1,350 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//Package core provides the utility for olt devices, flows, groups and statistics
+package core
+
+import (
+ "context"
+ "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
+ rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ openoltpb2 "github.com/opencord/voltha-protos/v3/go/openolt"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "sync"
+)
+
+//QueueInfoBrief has information about gemPortID and service priority associated with Mcast group
+type QueueInfoBrief struct {
+ gemPortID uint32
+ servicePriority uint32
+}
+
+//OpenOltGroupMgr creates the Structure of OpenOltGroupMgr obj
+type OpenOltGroupMgr struct {
+ deviceHandler *DeviceHandler
+ resourceMgr *rsrcMgr.OpenOltResourceMgr
+ interfaceToMcastQueueMap map[uint32]*QueueInfoBrief /*pon interface -> multicast queue map. Required to assign GEM to a bucket during group population*/
+ interfaceToMcastQueueMapLock sync.RWMutex
+}
+
+//////////////////////////////////////////////
+// EXPORTED FUNCTIONS //
+//////////////////////////////////////////////
+
+//NewGroupManager creates OpenOltGroupMgr object and initializes the parameters
+func NewGroupManager(ctx context.Context, dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr) *OpenOltGroupMgr {
+ logger.Infow(ctx, "initializing-flow-manager", log.Fields{"device-id": dh.device.Id})
+ var grpMgr OpenOltGroupMgr
+ grpMgr.deviceHandler = dh
+ grpMgr.resourceMgr = rMgr
+ grpMgr.interfaceToMcastQueueMap = make(map[uint32]*QueueInfoBrief)
+ grpMgr.interfaceToMcastQueueMapLock = sync.RWMutex{}
+ logger.Info(ctx, "initialization-of-group-manager-success")
+ return &grpMgr
+}
+
+// AddGroup add or update the group
+func (g *OpenOltGroupMgr) AddGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
+ logger.Infow(ctx, "add-group", log.Fields{"group": group})
+ if group == nil {
+ return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
+ }
+ groupToOlt := openoltpb2.Group{
+ GroupId: group.Desc.GroupId,
+ Command: openoltpb2.Group_SET_MEMBERS,
+ Action: g.buildGroupAction(),
+ }
+ logger.Debugw(ctx, "sending-group-to-device", log.Fields{"groupToOlt": groupToOlt})
+ _, err := g.deviceHandler.Client.PerformGroupOperation(ctx, &groupToOlt)
+ if err != nil {
+ return olterrors.NewErrAdapter("add-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
+ }
+ // group members not created yet. So let's store the group
+ if err := g.resourceMgr.AddFlowGroupToKVStore(ctx, group, true); err != nil {
+ return olterrors.NewErrPersistence("add", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
+ }
+ logger.Infow(ctx, "add-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt})
+ return nil
+}
+
+// DeleteGroup deletes a group from the device
+func (g *OpenOltGroupMgr) DeleteGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
+ logger.Debugw(ctx, "delete-group", log.Fields{"group": group})
+ if group == nil {
+ logger.Error(ctx, "unable-to-delete-group--invalid-argument--group-is-nil")
+ return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
+ }
+ groupToOlt := openoltpb2.Group{
+ GroupId: group.Desc.GroupId,
+ }
+ logger.Debugw(ctx, "deleting-group-from-device", log.Fields{"groupToOlt": groupToOlt})
+ _, err := g.deviceHandler.Client.DeleteGroup(ctx, &groupToOlt)
+ if err != nil {
+ logger.Errorw(ctx, "delete-group-failed-on-dev", log.Fields{"groupToOlt": groupToOlt, "err": err})
+ return olterrors.NewErrAdapter("delete-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
+ }
+ //remove group from the store
+ if err := g.resourceMgr.RemoveFlowGroupFromKVStore(ctx, group.Desc.GroupId, false); err != nil {
+ return olterrors.NewErrPersistence("delete", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
+ }
+ logger.Debugw(ctx, "delete-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt})
+ return nil
+}
+
+// ModifyGroup updates the group
+func (g *OpenOltGroupMgr) ModifyGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
+ logger.Infow(ctx, "modify-group", log.Fields{"group": group})
+ if group == nil || group.Desc == nil {
+ return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
+ }
+ newGroup := g.buildGroup(ctx, group.Desc.GroupId, group.Desc.Buckets)
+ //get existing members of the group
+ val, groupExists, err := g.getFlowGroupFromKVStore(ctx, group.Desc.GroupId, false)
+ if err != nil {
+ return olterrors.NewErrNotFound("flow-group-in-kv-store", log.Fields{"groupId": group.Desc.GroupId}, err)
+ }
+ var current *openoltpb2.Group // represents the group on the device
+ if groupExists {
+ // group already exists
+ current = g.buildGroup(ctx, group.Desc.GroupId, val.Desc.GetBuckets())
+ logger.Debugw(ctx, "modify-group--group exists",
+ log.Fields{
+ "group on the device": val,
+ "new": group})
+ } else {
+ current = g.buildGroup(ctx, group.Desc.GroupId, nil)
+ }
+ logger.Debugw(ctx, "modify-group--comparing-current-and-new",
+ log.Fields{
+ "group on the device": current,
+ "new": newGroup})
+ // get members to be added
+ membersToBeAdded := g.findDiff(current, newGroup)
+ // get members to be removed
+ membersToBeRemoved := g.findDiff(newGroup, current)
+ logger.Infow(ctx, "modify-group--differences found", log.Fields{
+ "membersToBeAdded": membersToBeAdded,
+ "membersToBeRemoved": membersToBeRemoved,
+ "groupId": group.Desc.GroupId})
+ groupToOlt := openoltpb2.Group{
+ GroupId: group.Desc.GroupId,
+ }
+ var errAdd, errRemoved error
+ if len(membersToBeAdded) > 0 {
+ groupToOlt.Command = openoltpb2.Group_ADD_MEMBERS
+ groupToOlt.Members = membersToBeAdded
+ //execute addMembers
+ errAdd = g.callGroupAddRemove(ctx, &groupToOlt)
+ }
+ if len(membersToBeRemoved) > 0 {
+ groupToOlt.Command = openoltpb2.Group_REMOVE_MEMBERS
+ groupToOlt.Members = membersToBeRemoved
+ //execute removeMembers
+ errRemoved = g.callGroupAddRemove(ctx, &groupToOlt)
+ }
+ //save the modified group
+ if errAdd == nil && errRemoved == nil {
+ if err := g.resourceMgr.AddFlowGroupToKVStore(ctx, group, false); err != nil {
+ return olterrors.NewErrPersistence("add", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
+ }
+ logger.Infow(ctx, "modify-group-was-success--storing-group",
+ log.Fields{
+ "group": group,
+ "existingGroup": current})
+ } else {
+ logger.Warnw(ctx, "one-of-the-group-add/remove-operations-failed--cannot-save-group-modifications",
+ log.Fields{"group": group})
+ if errAdd != nil {
+ return errAdd
+ }
+ return errRemoved
+ }
+ return nil
+}
+
+//LoadInterfaceToMulticastQueueMap reads multicast queues per interface from the KV store
+//and put them into interfaceToMcastQueueMap.
+func (g *OpenOltGroupMgr) LoadInterfaceToMulticastQueueMap(ctx context.Context) {
+ storedMulticastQueueMap, err := g.resourceMgr.GetMcastQueuePerInterfaceMap(ctx)
+ if err != nil {
+ logger.Error(ctx, "failed-to-get-pon-interface-to-multicast-queue-map")
+ return
+ }
+ for intf, queueInfo := range storedMulticastQueueMap {
+ q := QueueInfoBrief{
+ gemPortID: queueInfo[0],
+ servicePriority: queueInfo[1],
+ }
+ g.interfaceToMcastQueueMap[intf] = &q
+ }
+}
+
+//GetInterfaceToMcastQueueMap gets the mcast queue mapped to to the PON interface
+func (g *OpenOltGroupMgr) GetInterfaceToMcastQueueMap(intfID uint32) (*QueueInfoBrief, bool) {
+ g.interfaceToMcastQueueMapLock.RLock()
+ defer g.interfaceToMcastQueueMapLock.RUnlock()
+ val, present := g.interfaceToMcastQueueMap[intfID]
+ return val, present
+}
+
+//UpdateInterfaceToMcastQueueMap updates the mcast queue information mapped to a given PON interface
+func (g *OpenOltGroupMgr) UpdateInterfaceToMcastQueueMap(intfID uint32, val *QueueInfoBrief) {
+ g.interfaceToMcastQueueMapLock.Lock()
+ defer g.interfaceToMcastQueueMapLock.Unlock()
+ g.interfaceToMcastQueueMap[intfID] = val
+}
+
+////////////////////////////////////////////////
+// INTERNAL or UNEXPORTED FUNCTIONS //
+////////////////////////////////////////////////
+//getFlowGroupFromKVStore fetches and returns flow group from the KV store. Returns (nil, false, error) if any problem occurs during
+//fetching the data. Returns (group, true, nil) if the group is fetched and returned successfully.
+//Returns (nil, false, nil) if the group does not exists in the KV store.
+func (g *OpenOltGroupMgr) getFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) (*ofp.OfpGroupEntry, bool, error) {
+ exists, groupInfo, err := g.resourceMgr.GetFlowGroupFromKVStore(ctx, groupID, cached)
+ if err != nil {
+ return nil, false, olterrors.NewErrNotFound("flow-group", log.Fields{"group-id": groupID}, err)
+ }
+ if exists {
+ return newGroup(groupInfo.GroupID, groupInfo.OutPorts), exists, nil
+ }
+ return nil, exists, nil
+}
+func newGroup(groupID uint32, outPorts []uint32) *ofp.OfpGroupEntry {
+ groupDesc := ofp.OfpGroupDesc{
+ Type: ofp.OfpGroupType_OFPGT_ALL,
+ GroupId: groupID,
+ }
+ groupEntry := ofp.OfpGroupEntry{
+ Desc: &groupDesc,
+ }
+ for i := 0; i < len(outPorts); i++ {
+ var acts []*ofp.OfpAction
+ acts = append(acts, flows.Output(outPorts[i]))
+ bucket := ofp.OfpBucket{
+ Actions: acts,
+ }
+ groupDesc.Buckets = append(groupDesc.Buckets, &bucket)
+ }
+ return &groupEntry
+}
+
+//buildGroupAction creates and returns a group action
+func (g *OpenOltGroupMgr) buildGroupAction() *openoltpb2.Action {
+ var actionCmd openoltpb2.ActionCmd
+ var action openoltpb2.Action
+ action.Cmd = &actionCmd
+ //pop outer vlan
+ action.Cmd.RemoveOuterTag = true
+ return &action
+}
+
+//callGroupAddRemove performs add/remove buckets operation for the indicated group
+func (g *OpenOltGroupMgr) callGroupAddRemove(ctx context.Context, group *openoltpb2.Group) error {
+ if err := g.performGroupOperation(ctx, group); err != nil {
+ st, _ := status.FromError(err)
+ //ignore already exists error code
+ if st.Code() != codes.AlreadyExists {
+ return olterrors.NewErrGroupOp("groupAddRemove", group.GroupId, log.Fields{"status": st}, err)
+ }
+ }
+ return nil
+}
+
+//findDiff compares group members and finds members which only exists in groups2
+func (g *OpenOltGroupMgr) findDiff(group1 *openoltpb2.Group, group2 *openoltpb2.Group) []*openoltpb2.GroupMember {
+ var members []*openoltpb2.GroupMember
+ for _, bucket := range group2.Members {
+ if !g.contains(group1.Members, bucket) {
+ // bucket does not exist and must be added
+ members = append(members, bucket)
+ }
+ }
+ return members
+}
+
+//contains returns true if the members list contains the given member; false otherwise
+func (g *OpenOltGroupMgr) contains(members []*openoltpb2.GroupMember, member *openoltpb2.GroupMember) bool {
+ for _, groupMember := range members {
+ if groupMember.InterfaceId == member.InterfaceId {
+ return true
+ }
+ }
+ return false
+}
+
+//performGroupOperation call performGroupOperation operation of openolt proto
+func (g *OpenOltGroupMgr) performGroupOperation(ctx context.Context, group *openoltpb2.Group) error {
+ logger.Debugw(ctx, "sending-group-to-device",
+ log.Fields{
+ "groupToOlt": group,
+ "command": group.Command})
+ _, err := g.deviceHandler.Client.PerformGroupOperation(log.WithSpanFromContext(context.Background(), ctx), group)
+ if err != nil {
+ return olterrors.NewErrAdapter("group-operation-failed", log.Fields{"groupToOlt": group}, err)
+ }
+ return nil
+}
+
+//buildGroup build openoltpb2.Group from given group id and bucket list
+func (g *OpenOltGroupMgr) buildGroup(ctx context.Context, groupID uint32, buckets []*ofp.OfpBucket) *openoltpb2.Group {
+ group := openoltpb2.Group{
+ GroupId: groupID}
+ // create members of the group
+ for _, ofBucket := range buckets {
+ member := g.buildMember(ctx, ofBucket)
+ if member != nil && !g.contains(group.Members, member) {
+ group.Members = append(group.Members, member)
+ }
+ }
+ return &group
+}
+
+//buildMember builds openoltpb2.GroupMember from an OpenFlow bucket
+func (g *OpenOltGroupMgr) buildMember(ctx context.Context, ofBucket *ofp.OfpBucket) *openoltpb2.GroupMember {
+ var outPort uint32
+ outPortFound := false
+ for _, ofAction := range ofBucket.Actions {
+ if ofAction.Type == ofp.OfpActionType_OFPAT_OUTPUT {
+ outPort = ofAction.GetOutput().Port
+ outPortFound = true
+ }
+ }
+ if !outPortFound {
+ logger.Debugw(ctx, "bucket-skipped-since-no-out-port-found-in-it", log.Fields{"ofBucket": ofBucket})
+ return nil
+ }
+ interfaceID := IntfIDFromUniPortNum(outPort)
+ logger.Debugw(ctx, "got-associated-interface-id-of-the-port",
+ log.Fields{
+ "portNumber:": outPort,
+ "interfaceId:": interfaceID})
+ g.interfaceToMcastQueueMapLock.RLock()
+ defer g.interfaceToMcastQueueMapLock.RUnlock()
+ if groupInfo, ok := g.interfaceToMcastQueueMap[interfaceID]; ok {
+ member := openoltpb2.GroupMember{
+ InterfaceId: interfaceID,
+ InterfaceType: openoltpb2.GroupMember_PON,
+ GemPortId: groupInfo.gemPortID,
+ Priority: groupInfo.servicePriority,
+ }
+ //add member to the group
+ return &member
+ }
+ logger.Warnf(ctx, "bucket-skipped-since-interface-2-gem-mapping-cannot-be-found", log.Fields{"ofBucket": ofBucket})
+ return nil
+}