VOL-3419: OpenOLT adapter at scale constantly takes more than 10 seconds to react to flows
The patch addresses the following
- Create OpenOltFlowMgr per PON port (instead of one instance for the whole OLT device earlier)
- Create a separate OpenOltGroupMgr - currently one instance for the whole OLT device
- Remove redundant global lock around getting ONU-ID in DeviceHandler module, as there exists a
  separate per-pon-port lock in ResourceManager module which provides the required synchronization
- Remove redundant locks in OpenOltFlowMgr module to serialize FlowDelete before FlowAdd
- Rename divideAndAddFlow to processAddFlow. "divideAndAddFlow" was used in 1.x voltha days and
had a different meaning and the name seems to have been blindly ported to 2.x adapter
and does not make sense anymore
Change-Id: I99827963cf242f1db0c27943c97bd05b749ae129
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 0dfb781..f342bcf 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -57,21 +57,6 @@
InvalidPort = 0xffffffff
)
-// pendingFlowRemoveDataKey is key to pendingFlowRemoveDataPerSubscriber map
-type pendingFlowRemoveDataKey struct {
- intfID uint32
- onuID uint32
- uniID uint32
-}
-
-// pendingFlowRemoveData is value stored in pendingFlowRemoveDataPerSubscriber map
-// This holds the number of pending flow removes and also a signal channel to
-// to indicate the receiver when all flow removes are handled
-type pendingFlowRemoveData struct {
- pendingFlowRemoveCount uint32
- allFlowsRemoved chan struct{}
-}
-
//DeviceHandler will interact with the OLT device.
type DeviceHandler struct {
device *voltha.Device
@@ -84,7 +69,8 @@
Client oop.OpenoltClient
transitionMap *TransitionMap
clientCon *grpc.ClientConn
- flowMgr *OpenOltFlowMgr
+ flowMgr []*OpenOltFlowMgr
+ groupMgr *OpenOltGroupMgr
eventMgr *OpenOltEventMgr
resourceMgr *rsrcMgr.OpenOltResourceMgr
@@ -98,12 +84,7 @@
stopIndications chan bool
isReadIndicationRoutineActive bool
- // pendingFlowRemoveDataPerSubscriber map is used to maintain the context on a per
- // subscriber basis for the number of pending flow removes. This data is used
- // to process all the flow removes for a subscriber before handling flow adds.
- // Interleaving flow delete and flow add processing has known to cause PON resource
- // management contentions on a per subscriber bases, so we need ensure ordering.
- pendingFlowRemoveDataPerSubscriber map[pendingFlowRemoveDataKey]pendingFlowRemoveData
+ totalPonPorts uint32
}
//OnuDevice represents ONU related info
@@ -158,7 +139,6 @@
dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
dh.activePorts = sync.Map{}
dh.stopIndications = make(chan bool, 1)
- dh.pendingFlowRemoveDataPerSubscriber = make(map[pendingFlowRemoveDataKey]pendingFlowRemoveData)
//TODO initialize the support classes.
return &dh
@@ -590,7 +570,7 @@
defer span.Finish()
portStats := indication.GetPortStats()
- go dh.portStats.PortStatisticsIndication(ctx, portStats, dh.resourceMgr.DevInfo.GetPonPorts())
+ go dh.portStats.PortStatisticsIndication(ctx, portStats, dh.totalPonPorts)
case *oop.Indication_FlowStats:
span, ctx := log.CreateChildSpan(ctx, "flow-stats-indication", log.Fields{"device-id": dh.device.Id})
defer span.Finish()
@@ -777,16 +757,23 @@
if err != nil {
return olterrors.NewErrAdapter("populate-device-info-failed", log.Fields{"device-id": dh.device.Id}, err)
}
+ dh.totalPonPorts = deviceInfo.GetPonPorts()
+
// Instantiate resource manager
if dh.resourceMgr = rsrcMgr.NewResourceMgr(ctx, dh.device.Id, dh.openOLT.KVStoreAddress, dh.openOLT.KVStoreType, dh.device.Type, deviceInfo); dh.resourceMgr == nil {
return olterrors.ErrResourceManagerInstantiating
}
- // Instantiate flow manager
- if dh.flowMgr = NewFlowManager(ctx, dh, dh.resourceMgr); dh.flowMgr == nil {
- return olterrors.ErrResourceManagerInstantiating
+ dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr)
+ dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts)
+ for i := range dh.flowMgr {
+ // Instantiate flow manager
+ if dh.flowMgr[i] = NewFlowManager(ctx, dh, dh.resourceMgr, dh.groupMgr); dh.flowMgr[i] == nil {
+ return olterrors.ErrResourceManagerInstantiating
+ }
}
+
/* TODO: Instantiate Alarm , stats , BW managers */
/* Instantiating Event Manager to handle Alarms and KPIs */
dh.eventMgr = NewEventMgr(dh.EventProxy, dh)
@@ -1060,7 +1047,7 @@
func (dh *DeviceHandler) activateONU(ctx context.Context, intfID uint32, onuID int64, serialNum *oop.SerialNumber, serialNumber string) error {
logger.Debugw(ctx, "activate-onu", log.Fields{"intf-id": intfID, "onu-id": onuID, "serialNum": serialNum, "serialNumber": serialNumber, "device-id": dh.device.Id, "OmccEncryption": dh.openOLT.config.OmccEncryption})
- if err := dh.flowMgr.UpdateOnuInfo(ctx, intfID, uint32(onuID), serialNumber); err != nil {
+ if err := dh.flowMgr[intfID].UpdateOnuInfo(ctx, intfID, uint32(onuID), serialNumber); err != nil {
return olterrors.NewErrAdapter("onu-activate-failed", log.Fields{"onu": onuID, "intf-id": intfID}, err)
}
// TODO: need resource manager
@@ -1153,9 +1140,7 @@
logger.Debugw(ctx, "creating-new-onu", log.Fields{"sn": sn})
// we need to create a new ChildDevice
ponintfid := onuDiscInd.GetIntfId()
- dh.lockDevice.Lock()
onuID, err = dh.resourceMgr.GetONUID(ctx, ponintfid)
- dh.lockDevice.Unlock()
logger.Infow(ctx, "creating-new-onu-got-onu-id", log.Fields{"sn": sn, "onuId": onuID})
@@ -1437,26 +1422,26 @@
if flows != nil {
for _, flow := range flows.ToRemove.Items {
- dh.incrementActiveFlowRemoveCount(ctx, flow)
+ ponIf := dh.getPonIfFromFlow(ctx, flow)
logger.Debugw(ctx, "removing-flow",
log.Fields{"device-id": device.Id,
+ "ponIf": ponIf,
"flowToRemove": flow})
- err := dh.flowMgr.RemoveFlow(ctx, flow)
+ err := dh.flowMgr[ponIf].RemoveFlow(ctx, flow)
if err != nil {
errorsList = append(errorsList, err)
}
-
- dh.decrementActiveFlowRemoveCount(ctx, flow)
}
for _, flow := range flows.ToAdd.Items {
+ ponIf := dh.getPonIfFromFlow(ctx, flow)
logger.Debugw(ctx, "adding-flow",
log.Fields{"device-id": device.Id,
+ "ponIf": ponIf,
"flowToAdd": flow})
// If there are active Flow Remove in progress for a given subscriber, wait until it completes
- dh.waitForFlowRemoveToFinish(ctx, flow)
- err := dh.flowMgr.AddFlow(ctx, flow, flowMetadata)
+ err := dh.flowMgr[ponIf].AddFlow(ctx, flow, flowMetadata)
if err != nil {
errorsList = append(errorsList, err)
}
@@ -1466,19 +1451,19 @@
// Whether we need to synchronize multicast group adds and modifies like flow add and delete needs to be investigated
if groups != nil {
for _, group := range groups.ToAdd.Items {
- err := dh.flowMgr.AddGroup(ctx, group)
+ err := dh.groupMgr.AddGroup(ctx, group)
if err != nil {
errorsList = append(errorsList, err)
}
}
for _, group := range groups.ToUpdate.Items {
- err := dh.flowMgr.ModifyGroup(ctx, group)
+ err := dh.groupMgr.ModifyGroup(ctx, group)
if err != nil {
errorsList = append(errorsList, err)
}
}
for _, group := range groups.ToRemove.Items {
- err := dh.flowMgr.DeleteGroup(ctx, group)
+ err := dh.groupMgr.DeleteGroup(ctx, group)
if err != nil {
errorsList = append(errorsList, err)
}
@@ -1596,7 +1581,7 @@
uniID = UniIDFromPortNum(uint32(port))
logger.Debugw(ctx, "clearing-resource-data-for-uni-port", log.Fields{"port": port, "uni-id": uniID})
/* Delete tech-profile instance from the KV store */
- if err = dh.flowMgr.DeleteTechProfileInstances(ctx, onu.IntfID, onu.OnuID, uniID, onu.SerialNumber); err != nil {
+ if err = dh.flowMgr[onu.IntfID].DeleteTechProfileInstances(ctx, onu.IntfID, onu.OnuID, uniID, onu.SerialNumber); err != nil {
logger.Debugw(ctx, "failed-to-remove-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
}
logger.Debugw(ctx, "deleted-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
@@ -1687,9 +1672,8 @@
func (dh *DeviceHandler) cleanupDeviceResources(ctx context.Context) {
if dh.resourceMgr != nil {
- noOfPonPorts := dh.resourceMgr.DevInfo.GetPonPorts()
var ponPort uint32
- for ponPort = 0; ponPort < noOfPonPorts; ponPort++ {
+ for ponPort = 0; ponPort < dh.totalPonPorts; ponPort++ {
var onuGemData []rsrcMgr.OnuGemInfo
err := dh.resourceMgr.ResourceMgrs[ponPort].GetOnuGemInfo(ctx, ponPort, &onuGemData)
if err != nil {
@@ -1762,7 +1746,7 @@
"packet": hex.EncodeToString(packetIn.Pkt),
})
}
- logicalPortNum, err := dh.flowMgr.GetLogicalPortFromPacketIn(ctx, packetIn)
+ logicalPortNum, err := dh.flowMgr[packetIn.IntfId].GetLogicalPortFromPacketIn(ctx, packetIn)
if err != nil {
return olterrors.NewErrNotFound("logical-port", log.Fields{"packet": hex.EncodeToString(packetIn.Pkt)}, err)
}
@@ -1834,7 +1818,7 @@
onuID := OnuIDFromPortNum(uint32(egressPortNo))
uniID := UniIDFromPortNum(uint32(egressPortNo))
- gemPortID, err := dh.flowMgr.GetPacketOutGemPortID(ctx, intfID, onuID, uint32(egressPortNo), packet.Data)
+ gemPortID, err := dh.flowMgr[intfID].GetPacketOutGemPortID(ctx, intfID, onuID, uint32(egressPortNo), packet.Data)
if err != nil {
// In this case the openolt agent will receive the gemPortID as 0.
// The agent tries to retrieve the gemPortID in this case.
@@ -2098,21 +2082,12 @@
}
for uniID := 0; uniID < MaxUnisPerOnu; uniID++ {
- var flowRemoveData pendingFlowRemoveData
- key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uint32(uniID)}
- dh.lockDevice.RLock()
- if flowRemoveData, ok = dh.pendingFlowRemoveDataPerSubscriber[key]; !ok {
- dh.lockDevice.RUnlock()
- continue
- }
- dh.lockDevice.RUnlock()
-
logger.Debugw(ctx, "wait-for-flow-remove-complete-before-processing-child-device-lost",
log.Fields{"int-id": intfID, "onu-id": onuID, "uni-id": uniID})
- // Wait for all flow removes to finish first
- <-flowRemoveData.allFlowsRemoved
+ dh.flowMgr[intfID].WaitForFlowRemoveToFinishForSubscriber(ctx, intfID, onuID, uint32(uniID))
logger.Debugw(ctx, "flow-removes-complete-for-subscriber",
log.Fields{"int-id": intfID, "onu-id": onuID, "uni-id": uniID})
+ // TODO: Would be good to delete the subscriber entry from flowMgr.pendingFlowRemoveDataPerSubscriber map
}
onu := &oop.Onu{IntfId: intfID, OnuId: onuID, SerialNumber: sn}
@@ -2189,88 +2164,6 @@
return InvalidPort
}
-func (dh *DeviceHandler) incrementActiveFlowRemoveCount(ctx context.Context, flow *of.OfpFlowStats) {
- inPort, outPort := getPorts(flow)
- logger.Debugw(ctx, "increment-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
- if inPort != InvalidPort && outPort != InvalidPort {
- _, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
- key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
- logger.Debugw(ctx, "increment-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
- dh.lockDevice.Lock()
- defer dh.lockDevice.Unlock()
- flowRemoveData, ok := dh.pendingFlowRemoveDataPerSubscriber[key]
- if !ok {
- flowRemoveData = pendingFlowRemoveData{
- pendingFlowRemoveCount: 0,
- allFlowsRemoved: make(chan struct{}),
- }
- }
- flowRemoveData.pendingFlowRemoveCount++
- dh.pendingFlowRemoveDataPerSubscriber[key] = flowRemoveData
-
- logger.Debugw(ctx, "current-flow-remove-count–increment",
- log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
- "currCnt": dh.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
- }
-}
-
-func (dh *DeviceHandler) decrementActiveFlowRemoveCount(ctx context.Context, flow *of.OfpFlowStats) {
- inPort, outPort := getPorts(flow)
- logger.Debugw(ctx, "decrement-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
- if inPort != InvalidPort && outPort != InvalidPort {
- _, intfID, onuID, uniID := ExtractAccessFromFlow(uint32(inPort), uint32(outPort))
- key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
- logger.Debugw(ctx, "decrement-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
- dh.lockDevice.Lock()
- defer dh.lockDevice.Unlock()
- if val, ok := dh.pendingFlowRemoveDataPerSubscriber[key]; !ok {
- logger.Fatalf(ctx, "flow-remove-key-not-found", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
- } else {
- if val.pendingFlowRemoveCount > 0 {
- val.pendingFlowRemoveCount--
- }
- logger.Debugw(ctx, "current-flow-remove-count-after-decrement",
- log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
- "currCnt": dh.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
- // If all flow removes have finished, then close the channel to signal the receiver
- // to go ahead with flow adds.
- if val.pendingFlowRemoveCount == 0 {
- close(val.allFlowsRemoved)
- delete(dh.pendingFlowRemoveDataPerSubscriber, key)
- return
- }
- dh.pendingFlowRemoveDataPerSubscriber[key] = val
- }
- }
-}
-
-func (dh *DeviceHandler) waitForFlowRemoveToFinish(ctx context.Context, flow *of.OfpFlowStats) {
- var flowRemoveData pendingFlowRemoveData
- var ok bool
- inPort, outPort := getPorts(flow)
- logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
- if inPort != InvalidPort && outPort != InvalidPort {
- _, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
- key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
- logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
-
- dh.lockDevice.RLock()
- if flowRemoveData, ok = dh.pendingFlowRemoveDataPerSubscriber[key]; !ok {
- logger.Debugw(ctx, "no-pending-flow-to-remove", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
- dh.lockDevice.RUnlock()
- return
- }
- dh.lockDevice.RUnlock()
-
- // Wait for all flow removes to finish first
- <-flowRemoveData.allFlowsRemoved
-
- logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
- }
-}
-
func getPorts(flow *of.OfpFlowStats) (uint32, uint32) {
inPort := getInPortFromFlow(flow)
outPort := getOutPortFromFlow(flow)
@@ -2350,3 +2243,14 @@
logger.Infow(ctx, "get-ext-value", log.Fields{"resp": resp, "device-id": dh.device, "onu-id": device.Id, "pon-intf": device.ParentPortNo})
return resp, nil
}
+
+func (dh *DeviceHandler) getPonIfFromFlow(ctx context.Context, flow *of.OfpFlowStats) uint32 {
+	// Default to PON0 when the flow does not carry valid access (UNI-side) ports.
+	var intfID uint32
+	inPort, outPort := getPorts(flow)
+	logger.Debugw(ctx, "get-pon-if-for-in-port-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
+	if inPort != InvalidPort && outPort != InvalidPort {
+		_, intfID, _, _ = ExtractAccessFromFlow(inPort, outPort)
+	}
+	return intfID
+}