VOL-3419: OpenOLT adapter at scale constantly takes more than 10 seconds to react to flows
- Pass information to agent to do the flow replication
- Consolidate various locks in the adapter and remove redundant locks
- use voltha-proto version 4.0.2 and voltha-lib-go version 4.0.0
- Bump adapter version to 3.0.0
Change-Id: Ic053c54e5319bb1736ec74facfc79dd10058ecf5
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index b82d7f6..b936a90 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -19,26 +19,22 @@
import (
"context"
- "crypto/md5"
"encoding/hex"
- "encoding/json"
"errors"
"fmt"
- "github.com/opencord/voltha-lib-go/v3/pkg/flows"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
- tp "github.com/opencord/voltha-lib-go/v3/pkg/techprofile"
+ "github.com/opencord/voltha-lib-go/v4/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ tp "github.com/opencord/voltha-lib-go/v4/pkg/techprofile"
rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
- "github.com/opencord/voltha-protos/v3/go/common"
- ic "github.com/opencord/voltha-protos/v3/go/inter_container"
- ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
- openoltpb2 "github.com/opencord/voltha-protos/v3/go/openolt"
- tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
- "github.com/opencord/voltha-protos/v3/go/voltha"
- "math/big"
+ "github.com/opencord/voltha-protos/v4/go/common"
+ ic "github.com/opencord/voltha-protos/v4/go/inter_container"
+ ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
+ openoltpb2 "github.com/opencord/voltha-protos/v4/go/openolt"
+ tp_pb "github.com/opencord/voltha-protos/v4/go/tech_profile"
+ "github.com/opencord/voltha-protos/v4/go/voltha"
"strings"
"sync"
- //deepcopy "github.com/getlantern/deepcopy"
"github.com/EagleChen/mapmutex"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
"google.golang.org/grpc/codes"
@@ -46,28 +42,11 @@
)
const (
- // Flow categories
-
- //HsiaFlow flow category
- HsiaFlow = "HSIA_FLOW"
-
- //EapolFlow flow category
- EapolFlow = "EAPOL_FLOW"
-
- //DhcpFlow flow category
- DhcpFlow = "DHCP_FLOW"
-
- //MulticastFlow flow category
- MulticastFlow = "MULTICAST_FLOW"
-
- //IgmpFlow flow category
- IgmpFlow = "IGMP_FLOW"
-
//IPProtoDhcp flow category
IPProtoDhcp = 17
- //IPProtoIgmp flow category
- IPProtoIgmp = 2
+ //IgmpProto proto value
+ IgmpProto = 2
//EapEthType eapethtype value
EapEthType = 0x888e
@@ -76,9 +55,6 @@
//IPv4EthType IPv4 ethernet type value
IPv4EthType = 0x800
- //IgmpProto proto value
- IgmpProto = 2
-
//ReservedVlan Transparent Vlan (Masked Vlan, VLAN_ANY in ONOS Flows)
ReservedVlan = 4096
@@ -164,13 +140,6 @@
NoneOnuID = -1
//NoneUniID constant
NoneUniID = -1
- //NoneGemPortID constant
- NoneGemPortID = -1
-
- // BinaryStringPrefix is binary string prefix
- BinaryStringPrefix = "0b"
- // BinaryBit1 is binary bit 1 expressed as a character
- BinaryBit1 = '1'
// MapMutex
maxRetry = 300
@@ -178,12 +147,10 @@
baseDelay = 10000000
factor = 1.1
jitter = 0.2
-)
-type gemPortKey struct {
- intfID uint32
- gemPort uint32
-}
+ bitMapPrefix = "0b"
+ pbit1 = '1'
+)
type tpLockKey struct {
intfID uint32
@@ -210,6 +177,15 @@
uniID uint32
}
+// subscriberDataPathFlowIDKey is key to subscriberDataPathFlowIDMap map
+type subscriberDataPathFlowIDKey struct {
+ intfID uint32
+ onuID uint32
+ uniID uint32
+ direction string
+ tpID uint32
+}
+
// pendingFlowRemoveData is value stored in pendingFlowRemoveDataPerSubscriber map
// This holds the number of pending flow removes and also a signal channel to
// to indicate the receiver when all flow removes are handled
@@ -220,18 +196,25 @@
//OpenOltFlowMgr creates the Structure of OpenOltFlowMgr obj
type OpenOltFlowMgr struct {
- techprofile map[uint32]tp.TechProfileIf
- deviceHandler *DeviceHandler
- grpMgr *OpenOltGroupMgr
- resourceMgr *rsrcMgr.OpenOltResourceMgr
- onuIdsLock sync.RWMutex
- perGemPortLock *mapmutex.Mutex // lock to be used to access the flowsUsedByGemPort map
- flowsUsedByGemPort map[gemPortKey][]uint32 //gem port id to flow ids
- packetInGemPort map[rsrcMgr.PacketInInfoKey]uint32 //packet in gem port local cache
+ ponPortIdx uint32 // Pon Port this FlowManager is responsible for
+ techprofile map[uint32]tp.TechProfileIf
+ deviceHandler *DeviceHandler
+ grpMgr *OpenOltGroupMgr
+ resourceMgr *rsrcMgr.OpenOltResourceMgr
+
+ onuIdsLock sync.RWMutex // TODO: Do we need this?
+
+ flowsUsedByGemPort map[uint32][]uint64 // gem port id to flow ids
+ flowsUsedByGemPortKey sync.RWMutex // lock to be used to access the flowsUsedByGemPort map
+
+ packetInGemPort map[rsrcMgr.PacketInInfoKey]uint32 //packet in gem port local cache
+ packetInGemPortLock sync.RWMutex
+
// TODO create a type rsrcMgr.OnuGemInfos to be used instead of []rsrcMgr.OnuGemInfo
- onuGemInfo map[uint32][]rsrcMgr.OnuGemInfo //onu, gem and uni info local cache, indexed by IntfId
+ onuGemInfo []rsrcMgr.OnuGemInfo //onu, gem and uni info local cache
// We need to have a global lock on the onuGemInfo map
onuGemInfoLock sync.RWMutex
+
// The mapmutex.Mutex can be fine tuned to use mapmutex.NewCustomizedMapMutex
perUserFlowHandleLock *mapmutex.Mutex
@@ -242,10 +225,15 @@
// management contentions on a per subscriber bases, so we need ensure ordering.
pendingFlowRemoveDataPerSubscriber map[pendingFlowRemoveDataKey]pendingFlowRemoveData
pendingFlowRemoveDataPerSubscriberLock sync.RWMutex
+
+ // Map of voltha flowID associated with subscriberDataPathFlowIDKey
+ // This information is not persisted on Kv store and hence should be reconciled on adapter restart
+ subscriberDataPathFlowIDMap map[subscriberDataPathFlowIDKey]uint64
+ subscriberDataPathFlowIDMapLock sync.RWMutex
}
//NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
-func NewFlowManager(ctx context.Context, dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr, grpMgr *OpenOltGroupMgr) *OpenOltFlowMgr {
+func NewFlowManager(ctx context.Context, dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr, grpMgr *OpenOltGroupMgr, ponPortIdx uint32) *OpenOltFlowMgr {
logger.Infow(ctx, "initializing-flow-manager", log.Fields{"device-id": dh.device.Id})
var flowMgr OpenOltFlowMgr
var err error
@@ -261,58 +249,54 @@
}
flowMgr.onuIdsLock = sync.RWMutex{}
flowMgr.pendingFlowRemoveDataPerSubscriberLock = sync.RWMutex{}
- flowMgr.flowsUsedByGemPort = make(map[gemPortKey][]uint32)
+ flowMgr.flowsUsedByGemPort = make(map[uint32][]uint64)
flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
- ponPorts := rMgr.DevInfo.GetPonPorts()
- flowMgr.onuGemInfo = make(map[uint32][]rsrcMgr.OnuGemInfo, ponPorts)
+ flowMgr.packetInGemPortLock = sync.RWMutex{}
flowMgr.onuGemInfoLock = sync.RWMutex{}
flowMgr.perUserFlowHandleLock = mapmutex.NewCustomizedMapMutex(maxRetry, maxDelay, baseDelay, factor, jitter)
- flowMgr.perGemPortLock = mapmutex.NewCustomizedMapMutex(maxRetry, maxDelay, baseDelay, factor, jitter)
flowMgr.pendingFlowRemoveDataPerSubscriber = make(map[pendingFlowRemoveDataKey]pendingFlowRemoveData)
+ flowMgr.subscriberDataPathFlowIDMap = make(map[subscriberDataPathFlowIDKey]uint64)
+ flowMgr.subscriberDataPathFlowIDMapLock = sync.RWMutex{}
//Load the onugem info cache from kv store on flowmanager start
- for idx = 0; idx < ponPorts; idx++ {
- if flowMgr.onuGemInfo[idx], err = rMgr.GetOnuGemInfo(ctx, idx); err != nil {
- logger.Error(ctx, "failed-to-load-onu-gem-info-cache")
- }
- //Load flowID list per gem map per interface from the kvstore.
- flowMgr.loadFlowIDlistForGem(ctx, idx)
+ if flowMgr.onuGemInfo, err = rMgr.GetOnuGemInfo(ctx, ponPortIdx); err != nil {
+ logger.Error(ctx, "failed-to-load-onu-gem-info-cache")
}
+ //Load flowID list per gem map per interface from the kvstore.
+ flowMgr.loadFlowIDlistForGem(ctx, idx)
//load interface to multicast queue map from kv store
flowMgr.grpMgr.LoadInterfaceToMulticastQueueMap(ctx)
+ flowMgr.reconcileSubscriberDataPathFlowIDMap(ctx)
logger.Info(ctx, "initialization-of-flow-manager-success")
return &flowMgr
}
func (f *OpenOltFlowMgr) registerFlow(ctx context.Context, flowFromCore *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) error {
- gemPK := gemPortKey{uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId)}
- if f.perGemPortLock.TryLock(gemPK) {
- logger.Debugw(ctx, "registering-flow-for-device ",
- log.Fields{
- "flow": flowFromCore,
- "device-id": f.deviceHandler.device.Id})
- flowIDList, ok := f.flowsUsedByGemPort[gemPK]
- if !ok {
- flowIDList = []uint32{deviceFlow.FlowId}
+ if !deviceFlow.ReplicateFlow && deviceFlow.GemportId > 0 {
+ // Flow is not replicated in this case, we need to register the flow for a single gem-port
+ return f.registerFlowIDForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowFromCore)
+ } else if deviceFlow.ReplicateFlow && len(deviceFlow.PbitToGemport) > 0 {
+ // Flow is replicated in this case. We need to register the flow for all the gem-ports it is replicated to.
+ for _, gemPort := range deviceFlow.PbitToGemport {
+ if err := f.registerFlowIDForGem(ctx, uint32(deviceFlow.AccessIntfId), gemPort, flowFromCore); err != nil {
+ return err
+ }
}
- flowIDList = appendUnique(flowIDList, deviceFlow.FlowId)
- f.flowsUsedByGemPort[gemPK] = flowIDList
-
- f.perGemPortLock.Unlock(gemPK)
-
- // update the flowids for a gem to the KVstore
- return f.resourceMgr.UpdateFlowIDsForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowIDList)
}
- logger.Error(ctx, "failed-to-acquire-per-gem-port-lock",
- log.Fields{
- "flow-from-core": flowFromCore,
- "device-id": f.deviceHandler.device.Id,
- "key": gemPK,
- })
- return olterrors.NewErrAdapter("failed-to-acquire-per-gem-port-lock", log.Fields{
- "flow-from-core": flowFromCore,
- "device-id": f.deviceHandler.device.Id,
- "key": gemPK,
- }, nil)
+ return nil
+}
+
+func (f *OpenOltFlowMgr) registerFlowIDForGem(ctx context.Context, accessIntfID uint32, gemPortID uint32, flowFromCore *ofp.OfpFlowStats) error {
+ f.flowsUsedByGemPortKey.Lock()
+ flowIDList, ok := f.flowsUsedByGemPort[gemPortID]
+ if !ok {
+ flowIDList = []uint64{flowFromCore.Id}
+ }
+ flowIDList = appendUnique64bit(flowIDList, flowFromCore.Id)
+ f.flowsUsedByGemPort[gemPortID] = flowIDList
+ f.flowsUsedByGemPortKey.Unlock()
+
+ // update the flowids for a gem to the KVstore
+ return f.resourceMgr.UpdateFlowIDsForGem(ctx, accessIntfID, gemPortID, flowIDList)
}
func (f *OpenOltFlowMgr) processAddFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
@@ -511,7 +495,7 @@
TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[sq.intfID].GetTrafficScheduler(sq.tpInst.(*tp.TechProfile), SchedCfg, TrafficShaping)}
TrafficSched[0].TechProfileId = sq.tpID
- if err := f.pushSchedulerQueuesToDevice(ctx, sq, TrafficShaping, TrafficSched); err != nil {
+ if err := f.pushSchedulerQueuesToDevice(ctx, sq, TrafficSched); err != nil {
return olterrors.NewErrAdapter("failure-pushing-traffic-scheduler-and-queues-to-device",
log.Fields{"intf-id": sq.intfID,
"direction": sq.direction,
@@ -534,7 +518,7 @@
return nil
}
-func (f *OpenOltFlowMgr) pushSchedulerQueuesToDevice(ctx context.Context, sq schedQueue, TrafficShaping *tp_pb.TrafficShapingInfo, TrafficSched []*tp_pb.TrafficScheduler) error {
+func (f *OpenOltFlowMgr) pushSchedulerQueuesToDevice(ctx context.Context, sq schedQueue, TrafficSched []*tp_pb.TrafficScheduler) error {
trafficQueues, err := f.techprofile[sq.intfID].GetTrafficQueues(ctx, sq.tpInst.(*tp.TechProfile), sq.direction)
if err != nil {
@@ -800,14 +784,14 @@
for _, gem := range tpInst.UpstreamGemPortAttributeList {
gemPortIDs = append(gemPortIDs, gem.GemportID)
}
- allocIDs = appendUnique(allocIDs, allocID)
+ allocIDs = appendUnique32bit(allocIDs, allocID)
if tpInstanceExists {
return allocID, gemPortIDs, techProfileInstance
}
for _, gemPortID := range gemPortIDs {
- allgemPortIDs = appendUnique(allgemPortIDs, gemPortID)
+ allgemPortIDs = appendUnique32bit(allgemPortIDs, gemPortID)
}
logger.Infow(ctx, "allocated-tcont-and-gem-ports",
log.Fields{
@@ -824,14 +808,14 @@
for _, gem := range tpInst.UpstreamQueueAttributeList {
gemPortIDs = append(gemPortIDs, gem.GemportID)
}
- allocIDs = appendUnique(allocIDs, allocID)
+ allocIDs = appendUnique32bit(allocIDs, allocID)
if tpInstanceExists {
return allocID, gemPortIDs, techProfileInstance
}
for _, gemPortID := range gemPortIDs {
- allgemPortIDs = appendUnique(allgemPortIDs, gemPortID)
+ allgemPortIDs = appendUnique32bit(allgemPortIDs, gemPortID)
}
logger.Infow(ctx, "allocated-tcont-and-gem-ports",
log.Fields{
@@ -879,7 +863,7 @@
var tpCount int
for _, techRange := range f.resourceMgr.DevInfo.Ranges {
for _, intfID := range techRange.IntfIds {
- f.techprofile[intfID] = f.resourceMgr.ResourceMgrs[uint32(intfID)].TechProfileMgr
+ f.techprofile[intfID] = f.resourceMgr.ResourceMgrs[intfID].TechProfileMgr
tpCount++
logger.Debugw(ctx, "init-tech-profile-done",
log.Fields{
@@ -903,24 +887,24 @@
return nil
}
-func (f *OpenOltFlowMgr) addUpstreamDataFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32,
+func (f *OpenOltFlowMgr) addUpstreamDataPathFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32,
portNo uint32, uplinkClassifier map[string]interface{},
uplinkAction map[string]interface{}, logicalFlow *ofp.OfpFlowStats,
- allocID uint32, gemportID uint32, tpID uint32) error {
+ allocID uint32, gemportID uint32, tpID uint32, pbitToGem map[uint32]uint32) error {
uplinkClassifier[PacketTagType] = SingleTag
logger.Debugw(ctx, "adding-upstream-data-flow",
log.Fields{
"uplinkClassifier": uplinkClassifier,
"uplinkAction": uplinkAction})
- return f.addHSIAFlow(ctx, intfID, onuID, uniID, portNo, uplinkClassifier, uplinkAction,
- Upstream, logicalFlow, allocID, gemportID, tpID)
+ return f.addSymmetricDataPathFlow(ctx, intfID, onuID, uniID, portNo, uplinkClassifier, uplinkAction,
+ Upstream, logicalFlow, allocID, gemportID, tpID, pbitToGem)
/* TODO: Install Secondary EAP on the subscriber vlan */
}
-func (f *OpenOltFlowMgr) addDownstreamDataFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32,
+func (f *OpenOltFlowMgr) addDownstreamDataPathFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32,
portNo uint32, downlinkClassifier map[string]interface{},
downlinkAction map[string]interface{}, logicalFlow *ofp.OfpFlowStats,
- allocID uint32, gemportID uint32, tpID uint32) error {
+ allocID uint32, gemportID uint32, tpID uint32, pbitToGem map[uint32]uint32) error {
downlinkClassifier[PacketTagType] = DoubleTag
logger.Debugw(ctx, "adding-downstream-data-flow",
log.Fields{
@@ -956,20 +940,21 @@
"device-id": f.deviceHandler.device.Id}, nil).Log()
}
- return f.addHSIAFlow(ctx, intfID, onuID, uniID, portNo, downlinkClassifier, downlinkAction,
- Downstream, logicalFlow, allocID, gemportID, tpID)
+ return f.addSymmetricDataPathFlow(ctx, intfID, onuID, uniID, portNo, downlinkClassifier, downlinkAction,
+ Downstream, logicalFlow, allocID, gemportID, tpID, pbitToGem)
}
-func (f *OpenOltFlowMgr) addHSIAFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
+func (f *OpenOltFlowMgr) addSymmetricDataPathFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
action map[string]interface{}, direction string, logicalFlow *ofp.OfpFlowStats,
- allocID uint32, gemPortID uint32, tpID uint32) error {
- /* One of the OLT platform (Broadcom BAL) requires that symmetric
- flows require the same flow_id to be used across UL and DL.
- Since HSIA flow is the only symmetric flow currently, we need to
- re-use the flow_id across both direction. The 'flow_category'
- takes priority over flow_cookie to find any available HSIA_FLOW
- id for the ONU.
- */
+ allocID uint32, gemPortID uint32, tpID uint32, pbitToGem map[uint32]uint32) error {
+
+ var inverseDirection string
+ if direction == Upstream {
+ inverseDirection = Downstream
+ } else {
+ inverseDirection = Upstream
+ }
+
logger.Infow(ctx, "adding-hsia-flow",
log.Fields{
"intf-id": intfID,
@@ -982,35 +967,8 @@
"alloc-id": allocID,
"gemport-id": gemPortID,
"logicalflow": *logicalFlow})
- var vlanPbit uint32 = 0xff // means no pbit
- var vlanVid uint32
- if _, ok := classifier[VlanPcp]; ok {
- vlanPbit = classifier[VlanPcp].(uint32)
- logger.Debugw(ctx, "found-pbit-in-flow",
- log.Fields{
- "vlan-pbit": vlanPbit,
- "intf-id": intfID,
- "onu-id": onuID,
- "device-id": f.deviceHandler.device.Id})
- } else {
- logger.Debugw(ctx, "pbit-not-found-in-flow",
- log.Fields{
- "vlan-pcp": VlanPcp,
- "intf-id": intfID,
- "onu-id": onuID,
- "device-id": f.deviceHandler.device.Id})
- }
- if _, ok := classifier[VlanVid]; ok {
- vlanVid = classifier[VlanVid].(uint32)
- logger.Debugw(ctx, "found-vlan-in-the-flow",
- log.Fields{
- "vlan-vid": vlanVid,
- "intf-id": intfID,
- "onu-id": onuID,
- "device-id": f.deviceHandler.device.Id})
- }
- flowStoreCookie := getFlowStoreCookie(ctx, classifier, gemPortID)
- if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
+
+ if present := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), int32(uniID), logicalFlow.Id); present {
logger.Infow(ctx, "flow-already-exists",
log.Fields{
"device-id": f.deviceHandler.device.Id,
@@ -1018,16 +976,6 @@
"onu-id": onuID})
return nil
}
- flowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, HsiaFlow, vlanVid, vlanPbit)
- if err != nil {
- return olterrors.NewErrNotFound("hsia-flow-id",
- log.Fields{
- "direction": direction,
- "device-id": f.deviceHandler.device.Id,
- "intf-id": intfID,
- "onu-id": onuID,
- }, err).Log()
- }
classifierProto, err := makeOpenOltClassifierField(classifier)
if err != nil {
return olterrors.NewErrInvalidValue(log.Fields{"classifier": classifier, "device-id": f.deviceHandler.device.Id}, err).Log()
@@ -1053,23 +1001,36 @@
"device-id": f.deviceHandler.device.Id,
}, err).Log()
}
+
+ // Get symmetric flowID if it exists
+ // This symmetric flowID will be needed by agent software to use the same device flow-id that was used for the
+ // symmetric flow earlier
+ // symmetric flowID 0 is considered by agent as non-existent symmetric flow
+ keySymm := subscriberDataPathFlowIDKey{intfID: intfID, onuID: onuID, uniID: uniID, direction: inverseDirection, tpID: tpID}
+ f.subscriberDataPathFlowIDMapLock.RLock()
+ symmFlowID := f.subscriberDataPathFlowIDMap[keySymm]
+ f.subscriberDataPathFlowIDMapLock.RUnlock()
+
flow := openoltpb2.Flow{AccessIntfId: int32(intfID),
- OnuId: int32(onuID),
- UniId: int32(uniID),
- FlowId: flowID,
- FlowType: direction,
- AllocId: int32(allocID),
- NetworkIntfId: int32(networkIntfID),
- GemportId: int32(gemPortID),
- Classifier: classifierProto,
- Action: actionProto,
- Priority: int32(logicalFlow.Priority),
- Cookie: logicalFlow.Cookie,
- PortNo: portNo,
- TechProfileId: tpID,
+ OnuId: int32(onuID),
+ UniId: int32(uniID),
+ FlowId: logicalFlow.Id,
+ FlowType: direction,
+ AllocId: int32(allocID),
+ NetworkIntfId: int32(networkIntfID),
+ GemportId: int32(gemPortID),
+ Classifier: classifierProto,
+ Action: actionProto,
+ Priority: int32(logicalFlow.Priority),
+ Cookie: logicalFlow.Cookie,
+ PortNo: portNo,
+ TechProfileId: tpID,
+ ReplicateFlow: len(pbitToGem) > 0,
+ PbitToGemport: pbitToGem,
+ SymmetricFlowId: symmFlowID,
}
if err := f.addFlowToDevice(ctx, logicalFlow, &flow); err != nil {
- return olterrors.NewErrFlowOp("add", flowID, nil, err).Log()
+ return olterrors.NewErrFlowOp("add", logicalFlow.Id, nil, err).Log()
}
logger.Infow(ctx, "hsia-flow-added-to-device-successfully",
log.Fields{"direction": direction,
@@ -1077,24 +1038,28 @@
"flow": flow,
"intf-id": intfID,
"onu-id": onuID})
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &flow, flowStoreCookie, HsiaFlow, flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(ctx, flow.AccessIntfId,
- flow.OnuId,
- flow.UniId,
- flow.FlowId /*flowCategory,*/, flowsToKVStore); err != nil {
- return olterrors.NewErrPersistence("update", "flow", flowID,
+ flowInfo := rsrcMgr.FlowInfo{Flow: &flow, IsSymmtricFlow: true}
+ if err := f.resourceMgr.UpdateFlowIDInfo(ctx, uint32(flow.AccessIntfId), flow.OnuId, flow.UniId, flow.FlowId, flowInfo); err != nil {
+ return olterrors.NewErrPersistence("update", "flow", logicalFlow.Id,
log.Fields{
"flow": flow,
"device-id": f.deviceHandler.device.Id,
"intf-id": intfID,
"onu-id": onuID}, err).Log()
}
+
+ // Update the current flowID to the map
+ keyCurr := subscriberDataPathFlowIDKey{intfID: intfID, onuID: onuID, uniID: uniID, direction: direction, tpID: tpID}
+ f.subscriberDataPathFlowIDMapLock.Lock()
+ f.subscriberDataPathFlowIDMap[keyCurr] = logicalFlow.Id
+ f.subscriberDataPathFlowIDMapLock.Unlock()
+
return nil
}
func (f *OpenOltFlowMgr) addDHCPTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
classifier map[string]interface{}, action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32,
- gemPortID uint32, tpID uint32) error {
+ gemPortID uint32, tpID uint32, pbitToGem map[uint32]uint32) error {
networkIntfID, err := getNniIntfID(ctx, classifier, action)
if err != nil {
@@ -1115,8 +1080,7 @@
classifier[UDPDst] = uint32(67)
classifier[PacketTagType] = SingleTag
- flowStoreCookie := getFlowStoreCookie(ctx, classifier, gemPortID)
- if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ if present := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), int32(uniID), logicalFlow.Id); present {
logger.Infow(ctx, "flow-exists--not-re-adding",
log.Fields{
"device-id": f.deviceHandler.device.Id,
@@ -1125,23 +1089,11 @@
return nil
}
- flowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, DhcpFlow, 0 /*classifier[VLAN_PCP].(uint32)*/)
-
- if err != nil {
- return olterrors.NewErrNotFound("flow",
- log.Fields{
- "interface-id": intfID,
- "gem-port": gemPortID,
- "cookie": flowStoreCookie,
- "device-id": f.deviceHandler.device.Id},
- err).Log()
- }
-
logger.Debugw(ctx, "creating-ul-dhcp-flow",
log.Fields{
"ul_classifier": classifier,
"ul_action": action,
- "uplinkFlowId": flowID,
+ "uplinkFlowId": logicalFlow.Id,
"intf-id": intfID,
"onu-id": onuID,
"device-id": f.deviceHandler.device.Id})
@@ -1159,7 +1111,7 @@
dhcpFlow := openoltpb2.Flow{AccessIntfId: int32(intfID),
OnuId: int32(onuID),
UniId: int32(uniID),
- FlowId: flowID,
+ FlowId: logicalFlow.Id,
FlowType: Upstream,
AllocId: int32(allocID),
NetworkIntfId: int32(networkIntfID),
@@ -1170,21 +1122,20 @@
Cookie: logicalFlow.Cookie,
PortNo: portNo,
TechProfileId: tpID,
+ ReplicateFlow: len(pbitToGem) > 0,
+ PbitToGemport: pbitToGem,
}
if err := f.addFlowToDevice(ctx, logicalFlow, &dhcpFlow); err != nil {
- return olterrors.NewErrFlowOp("add", flowID, log.Fields{"dhcp-flow": dhcpFlow}, err).Log()
+ return olterrors.NewErrFlowOp("add", logicalFlow.Id, log.Fields{"dhcp-flow": dhcpFlow}, err).Log()
}
logger.Infow(ctx, "dhcp-ul-flow-added-to-device-successfully",
log.Fields{
"device-id": f.deviceHandler.device.Id,
- "flow-id": flowID,
+ "flow-id": logicalFlow.Id,
"intf-id": intfID,
"onu-id": onuID})
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &dhcpFlow, flowStoreCookie, "DHCP", flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(ctx, dhcpFlow.AccessIntfId,
- dhcpFlow.OnuId,
- dhcpFlow.UniId,
- dhcpFlow.FlowId, flowsToKVStore); err != nil {
+ flowInfo := rsrcMgr.FlowInfo{Flow: &dhcpFlow}
+ if err := f.resourceMgr.UpdateFlowIDInfo(ctx, uint32(dhcpFlow.AccessIntfId), dhcpFlow.OnuId, dhcpFlow.UniId, dhcpFlow.FlowId, flowInfo); err != nil {
return olterrors.NewErrPersistence("update", "flow", dhcpFlow.FlowId,
log.Fields{
"flow": dhcpFlow,
@@ -1196,13 +1147,13 @@
//addIGMPTrapFlow creates IGMP trap-to-host flow
func (f *OpenOltFlowMgr) addIGMPTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
- action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, tpID uint32) error {
- return f.addUpstreamTrapFlow(ctx, intfID, onuID, uniID, portNo, classifier, action, logicalFlow, allocID, gemPortID, IgmpFlow, tpID)
+ action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, tpID uint32, pbitToGem map[uint32]uint32) error {
+ return f.addUpstreamTrapFlow(ctx, intfID, onuID, uniID, portNo, classifier, action, logicalFlow, allocID, gemPortID, tpID, pbitToGem)
}
//addUpstreamTrapFlow creates a trap-to-host flow
func (f *OpenOltFlowMgr) addUpstreamTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
- action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, flowType string, tpID uint32) error {
+ action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, tpID uint32, pbitToGem map[uint32]uint32) error {
networkIntfID, err := getNniIntfID(ctx, classifier, action)
if err != nil {
@@ -1223,32 +1174,16 @@
classifier[PacketTagType] = SingleTag
delete(classifier, VlanVid)
- flowStoreCookie := getFlowStoreCookie(ctx, classifier, gemPortID)
- if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkIntfID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkIntfID, int32(onuID), int32(uniID), logicalFlow.Id); present {
logger.Infow(ctx, "flow-exists-not-re-adding", log.Fields{"device-id": f.deviceHandler.device.Id})
return nil
}
- flowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, flowType, 0, 0 /*classifier[VLAN_PCP].(uint32)*/)
-
- if err != nil {
- return olterrors.NewErrNotFound("flow-id",
- log.Fields{
- "intf-id": intfID,
- "oni-id": onuID,
- "cookie": flowStoreCookie,
- "flow-type": flowType,
- "device-id": f.deviceHandler.device.Id,
- "onu-id": onuID},
- err).Log()
- }
-
logger.Debugw(ctx, "creating-upstream-trap-flow",
log.Fields{
"ul_classifier": classifier,
"ul_action": action,
- "uplinkFlowId": flowID,
- "flowType": flowType,
+ "uplinkFlowId": logicalFlow.Id,
"device-id": f.deviceHandler.device.Id,
"intf-id": intfID,
"onu-id": onuID})
@@ -1269,7 +1204,7 @@
flow := openoltpb2.Flow{AccessIntfId: int32(intfID),
OnuId: int32(onuID),
UniId: int32(uniID),
- FlowId: flowID,
+ FlowId: logicalFlow.Id,
FlowType: Upstream,
AllocId: int32(allocID),
NetworkIntfId: int32(networkIntfID),
@@ -1280,18 +1215,16 @@
Cookie: logicalFlow.Cookie,
PortNo: portNo,
TechProfileId: tpID,
+ ReplicateFlow: len(pbitToGem) > 0,
+ PbitToGemport: pbitToGem,
}
if err := f.addFlowToDevice(ctx, logicalFlow, &flow); err != nil {
- return olterrors.NewErrFlowOp("add", flowID, log.Fields{"flow": flow, "device-id": f.deviceHandler.device.Id}, err).Log()
+ return olterrors.NewErrFlowOp("add", logicalFlow.Id, log.Fields{"flow": flow, "device-id": f.deviceHandler.device.Id}, err).Log()
}
- logger.Infof(ctx, "%s ul-flow-added-to-device-successfully", flowType)
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &flow, flowStoreCookie, flowType, flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(ctx, flow.AccessIntfId,
- flow.OnuId,
- flow.UniId,
- flow.FlowId, flowsToKVStore); err != nil {
+ flowInfo := rsrcMgr.FlowInfo{Flow: &flow}
+ if err := f.resourceMgr.UpdateFlowIDInfo(ctx, uint32(flow.AccessIntfId), flow.OnuId, flow.UniId, flow.FlowId, flowInfo); err != nil {
return olterrors.NewErrPersistence("update", "flow", flow.FlowId, log.Fields{"flow": flow, "device-id": f.deviceHandler.device.Id}, err).Log()
}
@@ -1301,7 +1234,7 @@
// Add EAPOL flow to device with mac, vlanId as classifier for upstream and downstream
func (f *OpenOltFlowMgr) addEAPOLFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
classifier map[string]interface{}, action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32,
- gemPortID uint32, vlanID uint32, tpID uint32) error {
+ gemPortID uint32, vlanID uint32, tpID uint32, pbitToGem map[uint32]uint32) error {
logger.Infow(ctx, "adding-eapol-to-device",
log.Fields{
"intf-id": intfID,
@@ -1322,8 +1255,7 @@
uplinkClassifier[VlanPcp] = classifier[VlanPcp]
// Fill action
uplinkAction[TrapToHost] = true
- flowStoreCookie := getFlowStoreCookie(ctx, uplinkClassifier, gemPortID)
- if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ if present := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), int32(uniID), logicalFlow.Id); present {
logger.Infow(ctx, "flow-exists-not-re-adding", log.Fields{
"device-id": f.deviceHandler.device.Id,
"onu-id": onuID,
@@ -1331,21 +1263,11 @@
return nil
}
//Add Uplink EAPOL Flow
- uplinkFlowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0, 0)
- if err != nil {
- return olterrors.NewErrNotFound("flow-id",
- log.Fields{
- "intf-id": intfID,
- "onu-id": onuID,
- "coookie": flowStoreCookie,
- "device-id": f.deviceHandler.device.Id},
- err).Log()
- }
logger.Debugw(ctx, "creating-ul-eapol-flow",
log.Fields{
"ul_classifier": uplinkClassifier,
"ul_action": uplinkAction,
- "uplinkFlowId": uplinkFlowID,
+ "uplinkFlowId": logicalFlow.Id,
"device-id": f.deviceHandler.device.Id,
"intf-id": intfID,
"onu-id": onuID})
@@ -1380,7 +1302,7 @@
upstreamFlow := openoltpb2.Flow{AccessIntfId: int32(intfID),
OnuId: int32(onuID),
UniId: int32(uniID),
- FlowId: uplinkFlowID,
+ FlowId: logicalFlow.Id,
FlowType: Upstream,
AllocId: int32(allocID),
NetworkIntfId: int32(networkIntfID),
@@ -1391,9 +1313,11 @@
Cookie: logicalFlow.Cookie,
PortNo: portNo,
TechProfileId: tpID,
+ ReplicateFlow: len(pbitToGem) > 0,
+ PbitToGemport: pbitToGem,
}
if err := f.addFlowToDevice(ctx, logicalFlow, &upstreamFlow); err != nil {
- return olterrors.NewErrFlowOp("add", uplinkFlowID, log.Fields{"flow": upstreamFlow}, err).Log()
+ return olterrors.NewErrFlowOp("add", logicalFlow.Id, log.Fields{"flow": upstreamFlow}, err).Log()
}
logger.Infow(ctx, "eapol-ul-flow-added-to-device-successfully",
log.Fields{
@@ -1401,14 +1325,8 @@
"onu-id": onuID,
"intf-id": intfID,
})
- flowCategory := "EAPOL"
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &upstreamFlow, flowStoreCookie, flowCategory, uplinkFlowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(ctx, upstreamFlow.AccessIntfId,
- upstreamFlow.OnuId,
- upstreamFlow.UniId,
- upstreamFlow.FlowId,
- /* lowCategory, */
- flowsToKVStore); err != nil {
+ flowInfo := rsrcMgr.FlowInfo{Flow: &upstreamFlow}
+ if err := f.resourceMgr.UpdateFlowIDInfo(ctx, uint32(upstreamFlow.AccessIntfId), upstreamFlow.OnuId, upstreamFlow.UniId, upstreamFlow.FlowId, flowInfo); err != nil {
return olterrors.NewErrPersistence("update", "flow", upstreamFlow.FlowId,
log.Fields{
"flow": upstreamFlow,
@@ -1498,7 +1416,7 @@
}
// DeleteTechProfileInstances removes the tech profile instances from persistent storage
-func (f *OpenOltFlowMgr) DeleteTechProfileInstances(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, sn string) error {
+func (f *OpenOltFlowMgr) DeleteTechProfileInstances(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) error {
tpIDList := f.resourceMgr.GetTechProfileIDForOnu(ctx, intfID, onuID, uniID)
uniPortName := getUniPortPath(f.deviceHandler.device.Id, intfID, int32(onuID), int32(uniID))
@@ -1528,92 +1446,6 @@
return nil
}
-func getFlowStoreCookie(ctx context.Context, classifier map[string]interface{}, gemPortID uint32) uint64 {
- if len(classifier) == 0 { // should never happen
- logger.Error(ctx, "invalid-classfier-object")
- return 0
- }
- logger.Debugw(ctx, "generating-flow-store-cookie",
- log.Fields{
- "classifier": classifier,
- "gemport-id": gemPortID})
- var jsonData []byte
- var flowString string
- var err error
- // TODO: Do we need to marshall ??
- if jsonData, err = json.Marshal(classifier); err != nil {
- logger.Error(ctx, "failed-to-encode-classifier")
- return 0
- }
- flowString = string(jsonData)
- if gemPortID != 0 {
- flowString = fmt.Sprintf("%s%s", string(jsonData), string(gemPortID))
- }
- h := md5.New()
- _, _ = h.Write([]byte(flowString))
- hash := big.NewInt(0)
- hash.SetBytes(h.Sum(nil))
- generatedHash := hash.Uint64()
- logger.Debugw(ctx, "hash-generated", log.Fields{"hash": generatedHash})
- return generatedHash
-}
-
-func (f *OpenOltFlowMgr) getUpdatedFlowInfo(ctx context.Context, flow *openoltpb2.Flow, flowStoreCookie uint64, flowCategory string, deviceFlowID uint32, logicalFlowID uint64) *[]rsrcMgr.FlowInfo {
- var flows = []rsrcMgr.FlowInfo{{Flow: flow, FlowCategory: flowCategory, FlowStoreCookie: flowStoreCookie, LogicalFlowID: logicalFlowID}}
- var intfID uint32
- /* For flows which trap out of the NNI, the AccessIntfId is invalid
- (set to -1). In such cases, we need to refer to the NetworkIntfId .
- */
- if flow.AccessIntfId != -1 {
- intfID = uint32(flow.AccessIntfId)
- } else {
- intfID = uint32(flow.NetworkIntfId)
- }
- // Get existing flows matching flowid for given subscriber from KV store
- existingFlows := f.resourceMgr.GetFlowIDInfo(ctx, intfID, flow.OnuId, flow.UniId, flow.FlowId)
- if existingFlows != nil {
- logger.Debugw(ctx, "flow-exists-for-given-flowID--appending-it-to-current-flow",
- log.Fields{
- "flow-id": flow.FlowId,
- "device-id": f.deviceHandler.device.Id,
- "intf-id": intfID,
- "onu-id": flow.OnuId})
- //for _, f := range *existingFlows {
- // flows = append(flows, f)
- //}
- flows = append(flows, *existingFlows...)
- }
- logger.Debugw(ctx, "updated-flows-for-given-flowID-and-onuid",
- log.Fields{
- "updatedflow": flows,
- "flow-id": flow.FlowId,
- "onu-id": flow.OnuId,
- "device-id": f.deviceHandler.device.Id})
- return &flows
-}
-
-func (f *OpenOltFlowMgr) updateFlowInfoToKVStore(ctx context.Context, intfID int32, onuID int32, uniID int32, flowID uint32, flows *[]rsrcMgr.FlowInfo) error {
- logger.Debugw(ctx, "storing-flow(s)-into-kv-store", log.Fields{
- "flow-id": flowID,
- "device-id": f.deviceHandler.device.Id,
- "intf-id": intfID,
- "onu-id": onuID})
- if err := f.resourceMgr.UpdateFlowIDInfo(ctx, intfID, onuID, uniID, flowID, flows); err != nil {
- logger.Warnw(ctx, "error-while-storing-flow-into-kv-store", log.Fields{
- "device-id": f.deviceHandler.device.Id,
- "onu-id": onuID,
- "intf-id": intfID,
- "flow-id": flowID})
- return err
- }
- logger.Infow(ctx, "stored-flow(s)-into-kv-store-successfully!", log.Fields{
- "device-id": f.deviceHandler.device.Id,
- "onu-id": onuID,
- "intf-id": intfID,
- "flow-id": flowID})
- return nil
-}
-
func (f *OpenOltFlowMgr) addFlowToDevice(ctx context.Context, logicalFlow *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) error {
var intfID uint32
@@ -1623,7 +1455,8 @@
if deviceFlow.AccessIntfId != -1 {
intfID = uint32(deviceFlow.AccessIntfId)
} else {
- // REVIST : Why ponport is given as network port?
+ // We need to log the valid interface ID.
+ // For trap-on-nni flows, the access_intf_id is invalid (-1), so choose the network_intf_id.
intfID = uint32(deviceFlow.NetworkIntfId)
}
@@ -1649,21 +1482,22 @@
"device-flow": deviceFlow,
"device-id": f.deviceHandler.device.Id,
"intf-id": intfID})
- f.resourceMgr.FreeFlowID(ctx, intfID, deviceFlow.OnuId, deviceFlow.UniId, deviceFlow.FlowId)
return err
}
- if deviceFlow.GemportId != -1 {
- // No need to register the flow if it is a trap on nni flow.
- if err := f.registerFlow(ctx, logicalFlow, deviceFlow); err != nil {
- logger.Errorw(ctx, "failed-to-register-flow", log.Fields{"err": err})
- return err
- }
- }
logger.Infow(ctx, "flow-added-to-device-successfully ",
log.Fields{
"flow": *deviceFlow,
"device-id": f.deviceHandler.device.Id,
"intf-id": intfID})
+
+	// A trap-on-nni flow has an invalid deviceFlow.AccessIntfId (-1); only flows with a valid access interface are registered.
+ if deviceFlow.AccessIntfId != -1 {
+ // No need to register the flow if it is a trap on nni flow.
+ if err := f.registerFlow(ctx, logicalFlow, deviceFlow); err != nil {
+ logger.Errorw(ctx, "failed-to-register-flow", log.Fields{"err": err})
+ return err
+ }
+ }
return nil
}
@@ -1723,24 +1557,11 @@
if err != nil {
return olterrors.NewErrInvalidValue(log.Fields{"nni-port-number": portNo}, err).Log()
}
- var flowStoreCookie = getFlowStoreCookie(ctx, classifierInfo, uint32(0))
- if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), int32(uniID), flow.Id); present {
logger.Infow(ctx, "flow-exists--not-re-adding", log.Fields{"device-id": f.deviceHandler.device.Id})
return nil
}
- flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
- if err != nil {
- return olterrors.NewErrNotFound("flow-id",
- log.Fields{
- "interface-id": networkInterfaceID,
- "onu-id": onuID,
- "uni-id": uniID,
- "gem-port-id": gemPortID,
- "cookie": flowStoreCookie,
- "device-id": f.deviceHandler.device.Id},
- err)
- }
classifierProto, err := makeOpenOltClassifierField(classifierInfo)
if err != nil {
return olterrors.NewErrInvalidValue(
@@ -1767,7 +1588,7 @@
downstreamflow := openoltpb2.Flow{AccessIntfId: int32(-1), // AccessIntfId not required
OnuId: int32(onuID), // OnuId not required
UniId: int32(uniID), // UniId not used
- FlowId: flowID,
+ FlowId: flow.Id,
FlowType: Downstream,
NetworkIntfId: int32(networkInterfaceID),
GemportId: int32(gemPortID),
@@ -1777,7 +1598,7 @@
Cookie: flow.Cookie,
PortNo: portNo}
if err := f.addFlowToDevice(ctx, flow, &downstreamflow); err != nil {
- return olterrors.NewErrFlowOp("add", flowID,
+ return olterrors.NewErrFlowOp("add", flow.Id,
log.Fields{
"flow": downstreamflow,
"device-id": f.deviceHandler.device.Id}, err)
@@ -1786,13 +1607,10 @@
log.Fields{
"device-id": f.deviceHandler.device.Id,
"onu-id": onuID,
- "flow-id": flowID})
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, flow.Id)
- if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
- int32(onuID),
- int32(uniID),
- flowID, flowsToKVStore); err != nil {
- return olterrors.NewErrPersistence("update", "flow", flowID,
+ "flow-id": flow.Id})
+ flowInfo := rsrcMgr.FlowInfo{Flow: &downstreamflow}
+ if err := f.resourceMgr.UpdateFlowIDInfo(ctx, networkInterfaceID, int32(onuID), int32(uniID), flow.Id, flowInfo); err != nil {
+ return olterrors.NewErrPersistence("update", "flow", flow.Id,
log.Fields{
"flow": downstreamflow,
"device-id": f.deviceHandler.device.Id}, err)
@@ -1955,9 +1773,9 @@
"intf-id": intfID,
"onu-id": onuID,
"device-id": f.deviceHandler.device.Id,
- "onu-gem": f.onuGemInfo[intfID]})
+ "onu-gem": f.onuGemInfo})
- onugem := f.onuGemInfo[intfID]
+ onugem := f.onuGemInfo
deleteLoop:
for i, onu := range onugem {
if onu.OnuID == onuID {
@@ -1984,8 +1802,7 @@
//clearResources clears pon resources in kv store and the device
// nolint: gocyclo
func (f *OpenOltFlowMgr) clearResources(ctx context.Context, flow *ofp.OfpFlowStats, Intf uint32, onuID int32, uniID int32,
- gemPortID int32, flowID uint32, flowDirection string,
- portNum uint32, updatedFlows []rsrcMgr.FlowInfo) error {
+ gemPortID int32, flowID uint64, portNum uint32) error {
tpID, err := getTpIDFromFlow(ctx, flow)
if err != nil {
@@ -1998,174 +1815,133 @@
"device-id": f.deviceHandler.device.Id}, err)
}
- if len(updatedFlows) >= 0 {
- // There are still flows referencing the same flow_id.
- // So the flow should not be freed yet.
- // For ex: Case of HSIA where same flow is shared
- // between DS and US.
- if err := f.updateFlowInfoToKVStore(ctx, int32(Intf), int32(onuID), int32(uniID), flowID, &updatedFlows); err != nil {
- _ = olterrors.NewErrPersistence("update", "flow", flowID,
- log.Fields{
- "flow": updatedFlows,
- "device-id": f.deviceHandler.device.Id}, err).Log()
+ uni := getUniPortPath(f.deviceHandler.device.Id, Intf, onuID, uniID)
+ tpPath := f.getTPpath(ctx, Intf, uni, tpID)
+ logger.Debugw(ctx, "getting-techprofile-instance-for-subscriber",
+ log.Fields{
+ "tpPath": tpPath,
+ "device-id": f.deviceHandler.device.Id})
+ techprofileInst, err := f.techprofile[Intf].GetTPInstanceFromKVStore(ctx, tpID, tpPath)
+ if err != nil || techprofileInst == nil { // This should not happen, something wrong in KV backend transaction
+ return olterrors.NewErrNotFound("tech-profile-in-kv-store",
+ log.Fields{
+ "tp-id": tpID,
+ "path": tpPath}, err)
+ }
+
+ used := f.isGemPortUsedByAnotherFlow(uint32(gemPortID))
+
+ if used {
+ f.flowsUsedByGemPortKey.Lock()
+ defer f.flowsUsedByGemPortKey.Unlock()
+
+ flowIDs := f.flowsUsedByGemPort[uint32(gemPortID)]
+ for i, flowIDinMap := range flowIDs {
+ if flowIDinMap == flowID {
+ flowIDs = append(flowIDs[:i], flowIDs[i+1:]...)
+				// Every time the flowsUsedByGemPort cache is updated, the same change must be written
+				// to the KV store by calling UpdateFlowIDsForGem.
+ f.flowsUsedByGemPort[uint32(gemPortID)] = flowIDs
+ if err := f.resourceMgr.UpdateFlowIDsForGem(ctx, Intf, uint32(gemPortID), flowIDs); err != nil {
+ return err
+ }
+ break
+ }
}
- if len(updatedFlows) == 0 {
- logger.Debugw(ctx, "releasing-flow-id-to-resource-manager",
+ logger.Debugw(ctx, "gem-port-id-is-still-used-by-other-flows",
+ log.Fields{
+ "gemport-id": gemPortID,
+ "usedByFlows": flowIDs,
+ "device-id": f.deviceHandler.device.Id})
+ return nil
+ }
+ logger.Debugf(ctx, "gem-port-id %d is-not-used-by-another-flow--releasing-the-gem-port", gemPortID)
+ f.resourceMgr.RemoveGemPortIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
+ // TODO: The TrafficQueue corresponding to this gem-port also should be removed immediately.
+ // But it is anyway eventually removed later when the TechProfile is freed, so not a big issue for now.
+ f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(ctx, uint32(gemPortID), Intf)
+ f.deleteGemPortFromLocalCache(ctx, Intf, uint32(onuID), uint32(gemPortID))
+ f.onuIdsLock.Lock() // TODO: What is this lock?
+
+	// Every time an entry is deleted from the flowsUsedByGemPort cache, the KV store must be updated as well
+	// by calling DeleteFlowIDsForGem.
+ f.flowsUsedByGemPortKey.Lock()
+ delete(f.flowsUsedByGemPort, uint32(gemPortID))
+ f.flowsUsedByGemPortKey.Unlock()
+ f.resourceMgr.DeleteFlowIDsForGem(ctx, Intf, uint32(gemPortID))
+ f.resourceMgr.FreeGemPortID(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
+
+ f.onuIdsLock.Unlock()
+
+ // Delete the gem port on the ONU.
+ if err := f.sendDeleteGemPortToChild(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID), tpPath); err != nil {
+ logger.Errorw(ctx, "error-processing-delete-gem-port-towards-onu",
+ log.Fields{
+ "err": err,
+ "intf": Intf,
+ "onu-id": onuID,
+ "uni-id": uniID,
+ "device-id": f.deviceHandler.device.Id,
+ "gemport-id": gemPortID})
+ }
+ switch techprofileInst := techprofileInst.(type) {
+ case *tp.TechProfile:
+ ok, _ := f.isTechProfileUsedByAnotherGem(ctx, Intf, uint32(onuID), uint32(uniID), tpID, techprofileInst, uint32(gemPortID))
+ if !ok {
+ if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), tpID); err != nil {
+ logger.Warn(ctx, err)
+ }
+ if err := f.DeleteTechProfileInstance(ctx, Intf, uint32(onuID), uint32(uniID), "", tpID); err != nil {
+ logger.Warn(ctx, err)
+ }
+ if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
+ logger.Warn(ctx, err)
+ }
+ if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
+ logger.Warn(ctx, err)
+ }
+ f.resourceMgr.FreeAllocID(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID)
+ // Delete the TCONT on the ONU.
+ if err := f.sendDeleteTcontToChild(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID, tpPath); err != nil {
+ logger.Errorw(ctx, "error-processing-delete-tcont-towards-onu",
+ log.Fields{
+ "intf": Intf,
+ "onu-id": onuID,
+ "uni-id": uniID,
+ "device-id": f.deviceHandler.device.Id,
+ "alloc-id": techprofileInst.UsScheduler.AllocID})
+ }
+ }
+ case *tp.EponProfile:
+ if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), tpID); err != nil {
+ logger.Warn(ctx, err)
+ }
+ if err := f.DeleteTechProfileInstance(ctx, Intf, uint32(onuID), uint32(uniID), "", tpID); err != nil {
+ logger.Warn(ctx, err)
+ }
+ f.resourceMgr.FreeAllocID(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.AllocID)
+ // Delete the TCONT on the ONU.
+ if err := f.sendDeleteTcontToChild(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.AllocID, tpPath); err != nil {
+ logger.Errorw(ctx, "error-processing-delete-tcont-towards-onu",
log.Fields{
- "Intf": Intf,
+ "intf": Intf,
"onu-id": onuID,
"uni-id": uniID,
- "flow-id": flowID,
- "device-id": f.deviceHandler.device.Id})
- f.resourceMgr.FreeFlowID(ctx, Intf, int32(onuID), int32(uniID), flowID)
-
- uni := getUniPortPath(f.deviceHandler.device.Id, Intf, onuID, uniID)
- tpPath := f.getTPpath(ctx, Intf, uni, tpID)
- logger.Debugw(ctx, "getting-techprofile-instance-for-subscriber",
- log.Fields{
- "TP-PATH": tpPath,
- "device-id": f.deviceHandler.device.Id})
- techprofileInst, err := f.techprofile[Intf].GetTPInstanceFromKVStore(ctx, tpID, tpPath)
- if err != nil || techprofileInst == nil { // This should not happen, something wrong in KV backend transaction
- return olterrors.NewErrNotFound("tech-profile-in-kv-store",
- log.Fields{
- "tp-id": tpID,
- "path": tpPath}, err)
- }
-
- gemPK := gemPortKey{Intf, uint32(gemPortID)}
- used, err := f.isGemPortUsedByAnotherFlow(ctx, gemPK)
- if err != nil {
- return err
- }
- if used {
- if f.perGemPortLock.TryLock(gemPK) {
- flowIDs := f.flowsUsedByGemPort[gemPK]
- for i, flowIDinMap := range flowIDs {
- if flowIDinMap == flowID {
- flowIDs = append(flowIDs[:i], flowIDs[i+1:]...)
- // everytime flowsUsedByGemPort cache is updated the same should be updated
- // in kv store by calling UpdateFlowIDsForGem
- f.flowsUsedByGemPort[gemPK] = flowIDs
- if err := f.resourceMgr.UpdateFlowIDsForGem(ctx, Intf, uint32(gemPortID), flowIDs); err != nil {
- return err
- }
- break
- }
- }
- logger.Debugw(ctx, "gem-port-id-is-still-used-by-other-flows",
- log.Fields{
- "gemport-id": gemPortID,
- "usedByFlows": flowIDs,
- "device-id": f.deviceHandler.device.Id})
- f.perGemPortLock.Unlock(gemPK)
- return nil
- }
-
- logger.Error(ctx, "failed-to-acquire-per-gem-port-lock",
- log.Fields{
- "gemport-id": gemPortID,
- "device-id": f.deviceHandler.device.Id,
- "key": gemPK,
- })
- return olterrors.NewErrAdapter("failed-to-acquire-per-gem-port-lock", log.Fields{
- "gemport-id": gemPortID,
- "device-id": f.deviceHandler.device.Id,
- "key": gemPK,
- }, nil)
- }
- logger.Debugf(ctx, "gem-port-id %d is-not-used-by-another-flow--releasing-the-gem-port", gemPortID)
- f.resourceMgr.RemoveGemPortIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
- // TODO: The TrafficQueue corresponding to this gem-port also should be removed immediately.
- // But it is anyway eventually removed later when the TechProfile is freed, so not a big issue for now.
- f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(ctx, uint32(gemPortID), Intf)
- f.deleteGemPortFromLocalCache(ctx, Intf, uint32(onuID), uint32(gemPortID))
- f.onuIdsLock.Lock()
- //everytime an entry is deleted from flowsUsedByGemPort cache, the same should be updated in kv as well
- // by calling DeleteFlowIDsForGem
- if f.perGemPortLock.TryLock(gemPK) {
- delete(f.flowsUsedByGemPort, gemPK)
- f.perGemPortLock.Unlock(gemPK)
- } else {
- logger.Error(ctx, "failed-to-acquire-per-gem-port-lock",
- log.Fields{
- "device-id": f.deviceHandler.device.Id,
- "key": gemPK,
- })
- }
- f.resourceMgr.DeleteFlowIDsForGem(ctx, Intf, uint32(gemPortID))
- f.resourceMgr.FreeGemPortID(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
- f.onuIdsLock.Unlock()
- // Delete the gem port on the ONU.
- if err := f.sendDeleteGemPortToChild(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID), tpPath); err != nil {
- logger.Errorw(ctx, "error-processing-delete-gem-port-towards-onu",
- log.Fields{
- "err": err,
- "intf": Intf,
- "onu-id": onuID,
- "uni-id": uniID,
- "device-id": f.deviceHandler.device.Id,
- "gemport-id": gemPortID})
- }
- switch techprofileInst := techprofileInst.(type) {
- case *tp.TechProfile:
- ok, _ := f.isTechProfileUsedByAnotherGem(ctx, Intf, uint32(onuID), uint32(uniID), tpID, techprofileInst, uint32(gemPortID))
- if !ok {
- if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), tpID); err != nil {
- logger.Warn(ctx, err)
- }
- if err := f.DeleteTechProfileInstance(ctx, Intf, uint32(onuID), uint32(uniID), "", tpID); err != nil {
- logger.Warn(ctx, err)
- }
- if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
- logger.Warn(ctx, err)
- }
- if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
- logger.Warn(ctx, err)
- }
- f.resourceMgr.FreeAllocID(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID)
- // Delete the TCONT on the ONU.
- if err := f.sendDeleteTcontToChild(ctx, Intf, uint32(onuID), uint32(uniID), uint32(techprofileInst.UsScheduler.AllocID), tpPath); err != nil {
- logger.Errorw(ctx, "error-processing-delete-tcont-towards-onu",
- log.Fields{
- "intf": Intf,
- "onu-id": onuID,
- "uni-id": uniID,
- "device-id": f.deviceHandler.device.Id,
- "alloc-id": techprofileInst.UsScheduler.AllocID})
- }
- }
- case *tp.EponProfile:
- if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), tpID); err != nil {
- logger.Warn(ctx, err)
- }
- if err := f.DeleteTechProfileInstance(ctx, Intf, uint32(onuID), uint32(uniID), "", tpID); err != nil {
- logger.Warn(ctx, err)
- }
- f.resourceMgr.FreeAllocID(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.AllocID)
- // Delete the TCONT on the ONU.
- if err := f.sendDeleteTcontToChild(ctx, Intf, uint32(onuID), uint32(uniID), uint32(techprofileInst.AllocID), tpPath); err != nil {
- logger.Errorw(ctx, "error-processing-delete-tcont-towards-onu",
- log.Fields{
- "intf": Intf,
- "onu-id": onuID,
- "uni-id": uniID,
- "device-id": f.deviceHandler.device.Id,
- "alloc-id": techprofileInst.AllocID})
- }
- default:
- logger.Errorw(ctx, "error-unknown-tech",
- log.Fields{
- "techprofileInst": techprofileInst})
- }
+ "device-id": f.deviceHandler.device.Id,
+ "alloc-id": techprofileInst.AllocID})
}
+ default:
+ logger.Errorw(ctx, "error-unknown-tech",
+ log.Fields{
+ "techprofileInst": techprofileInst})
}
+
return nil
}
// nolint: gocyclo
-func (f *OpenOltFlowMgr) clearFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats, flowDirection string) {
-
+func (f *OpenOltFlowMgr) clearFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats, flowDirection string) error {
+ var flowInfo *rsrcMgr.FlowInfo
logger.Infow(ctx, "clear-flow-from-resource-manager",
log.Fields{
"flowDirection": flowDirection,
@@ -2173,8 +1949,7 @@
"device-id": f.deviceHandler.device.Id})
if flowDirection == Multicast {
- f.clearMulticastFlowFromResourceManager(ctx, flow)
- return
+ return f.clearMulticastFlowFromResourceManager(ctx, flow)
}
classifierInfo := make(map[string]interface{})
@@ -2182,7 +1957,7 @@
portNum, Intf, onu, uni, inPort, ethType, err := FlowExtractInfo(ctx, flow, flowDirection)
if err != nil {
logger.Error(ctx, err)
- return
+ return err
}
onuID := int32(onu)
@@ -2211,61 +1986,79 @@
log.Fields{
"port-number": inPort,
"error": err})
- return
+ return err
}
}
- flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, Intf, onuID, uniID)
- for _, flowID := range flowIds {
- flowInfo := f.resourceMgr.GetFlowIDInfo(ctx, Intf, onuID, uniID, flowID)
- if flowInfo == nil {
- logger.Debugw(ctx, "no-flowinfo-found-in-kv-store",
- log.Fields{
- "intf": Intf,
- "onu-id": onuID,
- "uni-id": uniID,
- "flow-id": flowID})
- return
+ if flowInfo = f.resourceMgr.GetFlowIDInfo(ctx, Intf, onuID, uniID, flow.Id); flowInfo == nil {
+ logger.Errorw(ctx, "flow-info-not-found-for-flow-to-be-removed", log.Fields{"flow-id": flow.Id, "intf-id": Intf, "onu-id": onuID, "uni-id": uniID})
+ return olterrors.NewErrPersistence("remove", "flow", flow.Id, log.Fields{"flow": flow}, err)
+ }
+ removeFlowMessage := openoltpb2.Flow{FlowId: flowInfo.Flow.FlowId, FlowType: flowInfo.Flow.FlowType}
+ logger.Debugw(ctx, "flow-to-be-deleted", log.Fields{"flow": flowInfo.Flow})
+ if err = f.removeFlowFromDevice(ctx, &removeFlowMessage, flow.Id); err != nil {
+ return err
+ }
+ if err = f.resourceMgr.RemoveFlowIDInfo(ctx, Intf, onuID, uniID, flow.Id); err != nil {
+ logger.Errorw(ctx, "failed-to-remove-flow-on-kv-store", log.Fields{"error": err})
+ return err
+ }
+	if !flowInfo.Flow.ReplicateFlow {
+		if err = f.clearResources(ctx, flow, Intf, onuID, uniID, flowInfo.Flow.GemportId, flowInfo.Flow.FlowId, portNum); err != nil {
+			logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
+				"flow-id": flow.Id,
+				"stored-flow": flowInfo.Flow,
+				"device-id": f.deviceHandler.device.Id,
+				"stored-flow-id": flowInfo.Flow.FlowId,
+				"onu-id": onuID,
+				"intf": Intf,
+			})
+			return err
}
-
- updatedFlows := *flowInfo
- for i, storedFlow := range updatedFlows {
- if flow.Id == storedFlow.LogicalFlowID {
- removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: storedFlow.Flow.FlowType}
- logger.Debugw(ctx, "flow-to-be-deleted", log.Fields{"flow": storedFlow})
- // DKB
- if err = f.removeFlowFromDevice(ctx, &removeFlowMessage, flow.Id); err != nil {
- logger.Errorw(ctx, "failed-to-remove-flow", log.Fields{"error": err})
- return
- }
- logger.Info(ctx, "flow-removed-from-device-successfully", log.Fields{
+ } else {
+ gems := make([]uint32, 0)
+ for _, gem := range flowInfo.Flow.PbitToGemport {
+ gems = appendUnique32bit(gems, gem)
+ }
+ logger.Debugw(ctx, "gems-to-be-cleared", log.Fields{"gems": gems})
+		for _, gem := range gems {
+			if err = f.clearResources(ctx, flow, Intf, onuID, uniID, int32(gem), flowInfo.Flow.FlowId, portNum); err != nil {
+				logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
"flow-id": flow.Id,
- "stored-flow": storedFlow,
+ "stored-flow": flowInfo.Flow,
"device-id": f.deviceHandler.device.Id,
- "stored-flow-id": flowID,
+ "stored-flow-id": flowInfo.Flow.FlowId,
"onu-id": onuID,
"intf": Intf,
+ "gem": gem,
})
- //Remove the Flow from FlowInfo
- updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
- if err = f.clearResources(ctx, flow, Intf, onuID, uniID, storedFlow.Flow.GemportId,
- flowID, flowDirection, portNum, updatedFlows); err != nil {
- logger.Error(ctx, "failed-to-clear-resources-for-flow", log.Fields{
- "flow-id": flow.Id,
- "stored-flow": storedFlow,
- "device-id": f.deviceHandler.device.Id,
- "stored-flow-id": flowID,
- "onu-id": onuID,
- "intf": Intf,
- })
- return
- }
+ return err
}
}
}
+
+	// For a datapath flow, also drop the symmetric (opposite-direction) entry from the subscriberDataPathFlowIDMap map.
+	if isDatapathFlow(flow) {
+		if tpID, err := getTpIDFromFlow(ctx, flow); err == nil {
+			var inverseDirection string
+			if flowDirection == Upstream {
+				inverseDirection = Downstream
+			} else {
+				inverseDirection = Upstream
+			}
+			// The key is only meaningful when the tech profile ID was resolved, hence the err == nil guard above.
+			keySymm := subscriberDataPathFlowIDKey{intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), direction: inverseDirection, tpID: tpID}
+			delete(f.subscriberDataPathFlowIDMap, keySymm)
+		}
+	}
+ return nil
}
//RemoveFlow removes the flow from the device
func (f *OpenOltFlowMgr) RemoveFlow(ctx context.Context, flow *ofp.OfpFlowStats) error {
+
+ f.incrementActiveFlowRemoveCount(ctx, flow)
+ defer f.decrementActiveFlowRemoveCount(ctx, flow)
+
logger.Infow(ctx, "removing-flow", log.Fields{"flow": *flow})
var direction string
actionInfo := make(map[string]interface{})
@@ -2282,13 +2075,9 @@
}
}
- f.incrementActiveFlowRemoveCount(ctx, flow)
- defer f.decrementActiveFlowRemoveCount(ctx, flow)
-
if flows.HasGroup(flow) {
direction = Multicast
- f.clearFlowFromResourceManager(ctx, flow, direction)
- return nil
+ return f.clearFlowFromResourceManager(ctx, flow, direction)
} else if IsUpstream(actionInfo[Output].(uint32)) {
direction = Upstream
} else {
@@ -2304,7 +2093,7 @@
// Serialize flow removes on a per subscriber basis
if f.perUserFlowHandleLock.TryLock(userKey) {
- f.clearFlowFromResourceManager(ctx, flow, direction) //TODO: Take care of the limitations
+ err = f.clearFlowFromResourceManager(ctx, flow, direction) //TODO: Take care of the limitations
f.perUserFlowHandleLock.Unlock(userKey)
} else {
// Ideally this should never happen
@@ -2312,7 +2101,7 @@
return errors.New("failed-to-acquire-per-user-lock")
}
- return nil
+ return err
}
//isIgmpTrapDownstreamFlow return true if the flow is a downsteam IGMP trap-to-host flow; false otherwise
@@ -2421,7 +2210,7 @@
logger.Debugw(ctx, "downstream-flow-meter-id", log.Fields{"ds-meter-id": DsMeterID})
}
- return f.processAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
+ return f.processAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, TpID, UsMeterID, DsMeterID, flowMetadata)
}
//WaitForFlowRemoveToFinishForSubscriber blocks until flow removes are complete for a given subscriber
@@ -2457,54 +2246,23 @@
if err != nil {
return olterrors.NewErrNotFound("multicast-in-port", log.Fields{"classifier": classifierInfo}, err)
}
- //this variable acts like a switch. When it is set, multicast flows are classified by eth_dst.
- //otherwise, classification is based on ipv4_dst by default.
- //the variable can be configurable in the future; it can be read from a configuration path in the kv store.
- mcastFlowClassificationByEthDst := false
- if mcastFlowClassificationByEthDst {
- //replace ipDst with ethDst
- if ipv4Dst, ok := classifierInfo[Ipv4Dst]; ok &&
- flows.IsMulticastIp(ipv4Dst.(uint32)) {
- // replace ipv4_dst classifier with eth_dst
- multicastMac := flows.ConvertToMulticastMacBytes(ipv4Dst.(uint32))
- delete(classifierInfo, Ipv4Dst)
- classifierInfo[EthDst] = multicastMac
- logger.Debugw(ctx, "multicast-ip-to-mac-conversion-success",
- log.Fields{
- "ip:": ipv4Dst.(uint32),
- "mac:": multicastMac})
- }
- }
delete(classifierInfo, EthType)
onuID := NoneOnuID
uniID := NoneUniID
- gemPortID := NoneGemPortID
- flowStoreCookie := getFlowStoreCookie(ctx, classifierInfo, uint32(0))
- if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), int32(uniID), flow.Id); present {
logger.Infow(ctx, "multicast-flow-exists-not-re-adding", log.Fields{"classifier-info": classifierInfo})
return nil
}
- flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0, 0)
- if err != nil {
- return olterrors.NewErrNotFound("multicast-flow-id",
- log.Fields{
- "interface-id": networkInterfaceID,
- "onu-id": onuID,
- "uni-id": uniID,
- "gem-port-id": gemPortID,
- "cookie": flowStoreCookie},
- err)
- }
classifierProto, err := makeOpenOltClassifierField(classifierInfo)
if err != nil {
return olterrors.NewErrInvalidValue(log.Fields{"classifier": classifierInfo}, err)
}
groupID := actionInfo[GroupID].(uint32)
multicastFlow := openoltpb2.Flow{
- FlowId: flowID,
+ FlowId: flow.Id,
FlowType: Multicast,
NetworkIntfId: int32(networkInterfaceID),
GroupId: groupID,
@@ -2513,7 +2271,7 @@
Cookie: flow.Cookie}
if err := f.addFlowToDevice(ctx, flow, &multicastFlow); err != nil {
- return olterrors.NewErrFlowOp("add", flowID, log.Fields{"flow": multicastFlow}, err)
+ return olterrors.NewErrFlowOp("add", flow.Id, log.Fields{"flow": multicastFlow}, err)
}
logger.Info(ctx, "multicast-flow-added-to-device-successfully")
//get cached group
@@ -2528,12 +2286,9 @@
}
}
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &multicastFlow, flowStoreCookie, MulticastFlow, flowID, flow.Id)
- if err = f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
- int32(onuID),
- int32(uniID),
- flowID, flowsToKVStore); err != nil {
- return olterrors.NewErrPersistence("update", "flow", flowID, log.Fields{"flow": multicastFlow}, err)
+ flowInfo := rsrcMgr.FlowInfo{Flow: &multicastFlow}
+ if err = f.resourceMgr.UpdateFlowIDInfo(ctx, networkInterfaceID, int32(onuID), int32(uniID), flow.Id, flowInfo); err != nil {
+ return olterrors.NewErrPersistence("update", "flow", flow.Id, log.Fields{"flow": multicastFlow}, err)
}
return nil
}
@@ -2596,7 +2351,7 @@
f.onuGemInfoLock.Lock()
defer f.onuGemInfoLock.Unlock()
- onugem := f.onuGemInfo[intfID]
+ onugem := f.onuGemInfo
// If the ONU already exists in onuGemInfo list, nothing to do
for _, onu := range onugem {
if onu.OnuID == onuID && onu.SerialNumber == serialNum {
@@ -2608,7 +2363,7 @@
}
onu := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
- f.onuGemInfo[intfID] = append(f.onuGemInfo[intfID], onu)
+ f.onuGemInfo = append(f.onuGemInfo, onu)
if err := f.resourceMgr.AddOnuGemInfo(ctx, intfID, onu); err != nil {
return err
}
@@ -2634,8 +2389,8 @@
"intf-id": intfID,
"onu-id": onuID,
"device-id": f.deviceHandler.device.Id,
- "onu-gem": f.onuGemInfo[intfID]})
- onugem := f.onuGemInfo[intfID]
+ "onu-gem": f.onuGemInfo})
+ onugem := f.onuGemInfo
// update the gem to the local cache as well as to kv strore
for idx, onu := range onugem {
if onu.OnuID == onuID {
@@ -2650,7 +2405,7 @@
}
}
onugem[idx].GemPorts = append(onugem[idx].GemPorts, gemPort)
- f.onuGemInfo[intfID] = onugem
+ f.onuGemInfo = onugem
break
}
}
@@ -2670,7 +2425,7 @@
"intf-id": intfID,
"onu-id": onuID,
"device-id": f.deviceHandler.device.Id,
- "onu-gem": f.onuGemInfo[intfID]})
+ "onu-gem": f.onuGemInfo})
}
// This function Lookup maps by serialNumber or (intfId, gemPort)
@@ -2681,14 +2436,14 @@
f.onuGemInfoLock.RLock()
defer f.onuGemInfoLock.RUnlock()
- logger.Infow(ctx, "getting-onu-id-from-gem-port-and-pon-port",
+ logger.Debugw(ctx, "getting-onu-id-from-gem-port-and-pon-port",
log.Fields{
"device-id": f.deviceHandler.device.Id,
- "onu-geminfo": f.onuGemInfo[intfID],
+ "onu-geminfo": f.onuGemInfo,
"intf-id": intfID,
"gemport-id": gemPortID})
// get onuid from the onugem info cache
- onugem := f.onuGemInfo[intfID]
+ onugem := f.onuGemInfo
for _, onu := range onugem {
for _, gem := range onu.GemPorts {
@@ -2749,11 +2504,11 @@
return 0, err
}
- f.onuGemInfoLock.RLock()
- defer f.onuGemInfoLock.RUnlock()
pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: portNum, VlanID: ctag, Priority: priority}
var ok bool
+ f.packetInGemPortLock.RLock()
gemPortID, ok = f.packetInGemPort[pktInkey]
+ f.packetInGemPortLock.RUnlock()
if ok {
logger.Debugw(ctx, "found-gemport-for-pktin-key",
log.Fields{
@@ -2766,7 +2521,9 @@
gemPortID, err = f.resourceMgr.GetGemPortFromOnuPktIn(ctx, pktInkey)
if err == nil {
if gemPortID != 0 {
+ f.packetInGemPortLock.Lock()
f.packetInGemPort[pktInkey] = gemPortID
+ f.packetInGemPortLock.Unlock()
logger.Infow(ctx, "found-gem-port-from-kv-store-and-updating-cache-with-gemport",
log.Fields{
"pktinkey": pktInkey,
@@ -2781,159 +2538,6 @@
}
-// nolint: gocyclo
-func installFlowOnAllGemports(ctx context.Context,
- f1 func(ctx context.Context, intfId uint32, onuId uint32, uniId uint32,
- portNo uint32, classifier map[string]interface{}, action map[string]interface{},
- logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32, tpID uint32) error,
- f2 func(ctx context.Context, intfId uint32, onuId uint32, uniId uint32, portNo uint32,
- classifier map[string]interface{}, action map[string]interface{},
- logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32, vlanId uint32,
- tpID uint32) error,
- args map[string]uint32,
- classifier map[string]interface{}, action map[string]interface{},
- logicalFlow *ofp.OfpFlowStats,
- gemPorts []uint32,
- TpInst interface{},
- FlowType string,
- direction string,
- tpID uint32,
- vlanID ...uint32) {
- logger.Debugw(ctx, "installing-flow-on-all-gem-ports",
- log.Fields{
- "FlowType": FlowType,
- "gemPorts": gemPorts,
- "vlan": vlanID})
-
- // The bit mapping for a gemport is expressed in tech-profile as a binary string. For example, 0b00000001
- // We need to trim prefix "0b", before further processing
- // Once the "0b" prefix is trimmed, we iterate each character in the string to identify which index
- // in the string is set to binary bit 1 (expressed as char '1' in the binary string).
-
- // If a particular character in the string is set to '1', identify the index of this character from
- // the LSB position which marks the PCP bit consumed by the given gem port.
- // This PCP bit now becomes a classifier in the flow.
-
- switch TpInst := TpInst.(type) {
- case *tp.TechProfile:
- attributes := TpInst.DownstreamGemPortAttributeList
- if direction == Upstream {
- attributes = TpInst.UpstreamGemPortAttributeList
- }
-
- for _, gemPortAttribute := range attributes {
- if direction == Downstream && strings.ToUpper(gemPortAttribute.IsMulticast) == "TRUE" {
- continue
- }
- gemPortID := gemPortAttribute.GemportID
- if allPbitsMarked(gemPortAttribute.PbitMap) {
- classifier[VlanPcp] = uint32(VlanPCPMask)
- if FlowType == DhcpFlow || FlowType == IgmpFlow || FlowType == HsiaFlow {
- if err := f1(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, tpID); err != nil {
- logger.Warn(ctx, err)
- }
- } else if FlowType == EapolFlow {
- if err := f2(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, vlanID[0], tpID); err != nil {
- logger.Warn(ctx, err)
- }
- }
- } else {
- for pos, pbitSet := range strings.TrimPrefix(gemPortAttribute.PbitMap, BinaryStringPrefix) {
- if pbitSet == BinaryBit1 {
- classifier[VlanPcp] = uint32(len(strings.TrimPrefix(gemPortAttribute.PbitMap, BinaryStringPrefix))) - 1 - uint32(pos)
- if FlowType == DhcpFlow || FlowType == IgmpFlow || FlowType == HsiaFlow {
- if err := f1(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, tpID); err != nil {
- logger.Warn(ctx, err)
- }
- } else if FlowType == EapolFlow {
- if err := f2(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, vlanID[0], tpID); err != nil {
- logger.Warn(ctx, err)
- }
- }
- }
- }
- }
- }
- case *tp.EponProfile:
- if direction == Upstream {
- attributes := TpInst.UpstreamQueueAttributeList
- for _, queueAttribute := range attributes {
- gemPortID := queueAttribute.GemportID
- if allPbitsMarked(queueAttribute.PbitMap) {
- classifier[VlanPcp] = uint32(VlanPCPMask)
- if FlowType == DhcpFlow || FlowType == IgmpFlow || FlowType == HsiaFlow {
- if err := f1(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, tpID); err != nil {
- logger.Warn(ctx, err)
- }
- } else if FlowType == EapolFlow {
- if err := f2(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, vlanID[0], tpID); err != nil {
- logger.Warn(ctx, err)
- }
- }
- } else {
- for pos, pbitSet := range strings.TrimPrefix(queueAttribute.PbitMap, BinaryStringPrefix) {
- if pbitSet == BinaryBit1 {
- classifier[VlanPcp] = uint32(len(strings.TrimPrefix(queueAttribute.PbitMap, BinaryStringPrefix))) - 1 - uint32(pos)
- if FlowType == DhcpFlow || FlowType == IgmpFlow || FlowType == HsiaFlow {
- if err := f1(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, tpID); err != nil {
- logger.Warn(ctx, err)
- }
- } else if FlowType == EapolFlow {
- if err := f2(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, vlanID[0], tpID); err != nil {
- logger.Warn(ctx, err)
- }
- }
- }
- }
- }
- }
- } else {
- attributes := TpInst.DownstreamQueueAttributeList
- for _, queueAttribute := range attributes {
- gemPortID := queueAttribute.GemportID
- if allPbitsMarked(queueAttribute.PbitMap) {
- classifier[VlanPcp] = uint32(VlanPCPMask)
- if FlowType == DhcpFlow || FlowType == IgmpFlow || FlowType == HsiaFlow {
- if err := f1(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, tpID); err != nil {
- logger.Warn(ctx, err)
- }
- } else if FlowType == EapolFlow {
- if err := f2(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, vlanID[0], tpID); err != nil {
- logger.Warn(ctx, err)
- }
- }
- } else {
- for pos, pbitSet := range strings.TrimPrefix(queueAttribute.PbitMap, BinaryStringPrefix) {
- if pbitSet == BinaryBit1 {
- classifier[VlanPcp] = uint32(len(strings.TrimPrefix(queueAttribute.PbitMap, BinaryStringPrefix))) - 1 - uint32(pos)
- if FlowType == DhcpFlow || FlowType == IgmpFlow || FlowType == HsiaFlow {
- if err := f1(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, tpID); err != nil {
- logger.Warn(ctx, err)
- }
- } else if FlowType == EapolFlow {
- if err := f2(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, vlanID[0], tpID); err != nil {
- logger.Warn(ctx, err)
- }
- }
- }
- }
- }
- }
- }
- default:
- logger.Errorw(ctx, "unknown-tech", log.Fields{"tpInst": TpInst})
- }
-}
-
-func allPbitsMarked(pbitMap string) bool {
- for pos, pBit := range pbitMap {
- if pos >= 2 && pBit != BinaryBit1 {
- return false
- }
- }
- return true
-}
-
func (f *OpenOltFlowMgr) addDHCPTrapFlowOnNNI(ctx context.Context, logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) error {
logger.Debug(ctx, "adding-trap-dhcp-of-nni-flow")
action := make(map[string]interface{})
@@ -2963,22 +2567,11 @@
err)
}
- flowStoreCookie := getFlowStoreCookie(ctx, classifier, uint32(0))
- if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), int32(uniID), logicalFlow.Id); present {
logger.Info(ctx, "flow-exists-not-re-adding")
return nil
}
- flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0, 0)
- if err != nil {
- return olterrors.NewErrNotFound("dhcp-trap-nni-flow-id",
- log.Fields{
- "interface-id": networkInterfaceID,
- "onu-id": onuID,
- "uni-id": uniID,
- "gem-port-id": gemPortID,
- "cookie": flowStoreCookie},
- err)
- }
+
classifierProto, err := makeOpenOltClassifierField(classifier)
if err != nil {
return olterrors.NewErrInvalidValue(log.Fields{"classifier": classifier}, err)
@@ -2992,7 +2585,7 @@
downstreamflow := openoltpb2.Flow{AccessIntfId: int32(-1), // AccessIntfId not required
OnuId: int32(onuID), // OnuId not required
UniId: int32(uniID), // UniId not used
- FlowId: flowID,
+ FlowId: logicalFlow.Id,
FlowType: Downstream,
AllocId: int32(allocID), // AllocId not used
NetworkIntfId: int32(networkInterfaceID),
@@ -3003,15 +2596,12 @@
Cookie: logicalFlow.Cookie,
PortNo: portNo}
if err := f.addFlowToDevice(ctx, logicalFlow, &downstreamflow); err != nil {
- return olterrors.NewErrFlowOp("add", flowID, log.Fields{"flow": downstreamflow}, err)
+ return olterrors.NewErrFlowOp("add", logicalFlow.Id, log.Fields{"flow": downstreamflow}, err)
}
logger.Info(ctx, "dhcp-trap-on-nni-flow-added–to-device-successfully")
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
- int32(onuID),
- int32(uniID),
- flowID, flowsToKVStore); err != nil {
- return olterrors.NewErrPersistence("update", "flow", flowID, log.Fields{"flow": downstreamflow}, err)
+ flowInfo := rsrcMgr.FlowInfo{Flow: &downstreamflow}
+ if err := f.resourceMgr.UpdateFlowIDInfo(ctx, networkInterfaceID, int32(onuID), int32(uniID), logicalFlow.Id, flowInfo); err != nil {
+ return olterrors.NewErrPersistence("update", "flow", logicalFlow.Id, log.Fields{"flow": downstreamflow}, err)
}
return nil
}
@@ -3070,22 +2660,11 @@
"action": action},
err)
}
- flowStoreCookie := getFlowStoreCookie(ctx, classifier, uint32(0))
- if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), int32(uniID), logicalFlow.Id); present {
logger.Info(ctx, "igmp-flow-exists-not-re-adding")
return nil
}
- flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0, 0)
- if err != nil {
- return olterrors.NewErrNotFound("igmp-flow-id",
- log.Fields{
- "interface-id": networkInterfaceID,
- "onu-id": onuID,
- "uni-id": uniID,
- "gem-port-id": gemPortID,
- "cookie": flowStoreCookie},
- err)
- }
+
classifierProto, err := makeOpenOltClassifierField(classifier)
if err != nil {
return olterrors.NewErrInvalidValue(log.Fields{"classifier": classifier}, err)
@@ -3099,7 +2678,7 @@
downstreamflow := openoltpb2.Flow{AccessIntfId: int32(-1), // AccessIntfId not required
OnuId: int32(onuID), // OnuId not required
UniId: int32(uniID), // UniId not used
- FlowId: flowID,
+ FlowId: logicalFlow.Id,
FlowType: Downstream,
AllocId: int32(allocID), // AllocId not used
NetworkIntfId: int32(networkInterfaceID),
@@ -3110,15 +2689,12 @@
Cookie: logicalFlow.Cookie,
PortNo: portNo}
if err := f.addFlowToDevice(ctx, logicalFlow, &downstreamflow); err != nil {
- return olterrors.NewErrFlowOp("add", flowID, log.Fields{"flow": downstreamflow}, err)
+ return olterrors.NewErrFlowOp("add", logicalFlow.Id, log.Fields{"flow": downstreamflow}, err)
}
logger.Info(ctx, "igmp-trap-on-nni-flow-added-to-device-successfully")
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
- int32(onuID),
- int32(uniID),
- flowID, flowsToKVStore); err != nil {
- return olterrors.NewErrPersistence("update", "flow", flowID, log.Fields{"flow": downstreamflow}, err)
+ flowInfo := rsrcMgr.FlowInfo{Flow: &downstreamflow}
+ if err := f.resourceMgr.UpdateFlowIDInfo(ctx, networkInterfaceID, int32(onuID), int32(uniID), logicalFlow.Id, flowInfo); err != nil {
+ return olterrors.NewErrPersistence("update", "flow", logicalFlow.Id, log.Fields{"flow": downstreamflow}, err)
}
return nil
}
@@ -3145,6 +2721,37 @@
uniID := args[UniID]
portNo := args[PortNo]
allocID := args[AllocID]
+ pbitToGem := make(map[uint32]uint32)
+
+ if len(gemPorts) == 1 {
+ // If there is only single gemport use that and do not populate pbitToGem map
+ gemPort = gemPorts[0]
+ } else if pcp, ok := classifierInfo[VlanPcp]; !ok {
+ for idx, gemID := range gemPorts {
+ switch TpInst := TpInst.(type) {
+ case *tp.TechProfile:
+ pBitMap := TpInst.UpstreamGemPortAttributeList[idx].PbitMap
+ // Trim the bitMapPrefix from the binary string and then iterate over each character in the binary string.
+ // If the character is set to pbit1, extract the pcp value from the position of this character in the string.
+ // Update the pbitToGem map with key being the pcp bit and the value being the gemPortID that consumes
+ // this pcp bit traffic.
+ for pos, pbitSet := range strings.TrimPrefix(pBitMap, bitMapPrefix) {
+ if pbitSet == pbit1 {
+ pcp := uint32(len(strings.TrimPrefix(pBitMap, bitMapPrefix))) - 1 - uint32(pos)
+ pbitToGem[pcp] = gemID
+ }
+ }
+ default:
+ logger.Errorw(ctx, "unsupported-tech", log.Fields{"tpInst": TpInst})
+ return
+ }
+ }
+ } else { // Extract the exact gemport which maps to the PCP classifier in the flow
+ gemPort = f.techprofile[intfID].GetGemportIDForPbit(ctx, TpInst,
+ tp_pb.Direction_UPSTREAM,
+ pcp.(uint32))
+ }
+
if ipProto, ok := classifierInfo[IPProto]; ok {
if ipProto.(uint32) == IPProtoDhcp {
logger.Infow(ctx, "adding-dhcp-flow", log.Fields{
@@ -3154,18 +2761,9 @@
"onu-id": onuID,
"uni-id": uniID,
})
- if pcp, ok := classifierInfo[VlanPcp]; ok {
- gemPort = f.techprofile[intfID].GetGemportIDForPbit(ctx, TpInst,
- tp_pb.Direction_UPSTREAM,
- pcp.(uint32))
- //Adding DHCP upstream flow
-
- if err := f.addDHCPTrapFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort, tpID); err != nil {
- logger.Warn(ctx, err)
- }
- } else {
- //Adding DHCP upstream flow to all gemports
- installFlowOnAllGemports(ctx, f.addDHCPTrapFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, TpInst, DhcpFlow, Upstream, tpID)
+ //Adding DHCP upstream flow
+ if err := f.addDHCPTrapFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort, tpID, pbitToGem); err != nil {
+ logger.Warn(ctx, err)
}
} else if ipProto.(uint32) == IgmpProto {
@@ -3175,16 +2773,8 @@
"onu-id": onuID,
"uni-id": uniID,
"classifier-info:": classifierInfo})
- if pcp, ok := classifierInfo[VlanPcp]; ok {
- gemPort = f.techprofile[intfID].GetGemportIDForPbit(ctx, TpInst,
- tp_pb.Direction_UPSTREAM,
- pcp.(uint32))
- if err := f.addIGMPTrapFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort, tpID); err != nil {
- logger.Warn(ctx, err)
- }
- } else {
- //Adding IGMP upstream flow to all gem ports
- installFlowOnAllGemports(ctx, f.addIGMPTrapFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, TpInst, IgmpFlow, Upstream, tpID)
+ if err := f.addIGMPTrapFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort, tpID, pbitToGem); err != nil {
+ logger.Warn(ctx, err)
}
} else {
logger.Errorw(ctx, "invalid-classifier-to-handle", log.Fields{"classifier": classifierInfo, "action": actionInfo})
@@ -3203,16 +2793,8 @@
} else {
vlanID = DefaultMgmtVlan
}
- if pcp, ok := classifierInfo[VlanPcp]; ok {
- gemPort = f.techprofile[intfID].GetGemportIDForPbit(ctx, TpInst,
- tp_pb.Direction_UPSTREAM,
- pcp.(uint32))
-
- if err := f.addEAPOLFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort, vlanID, tpID); err != nil {
- logger.Warn(ctx, err)
- }
- } else {
- installFlowOnAllGemports(ctx, nil, f.addEAPOLFlow, args, classifierInfo, actionInfo, flow, gemPorts, TpInst, EapolFlow, Upstream, tpID, vlanID)
+ if err := f.addEAPOLFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort, vlanID, tpID, pbitToGem); err != nil {
+ logger.Warn(ctx, err)
}
}
} else if _, ok := actionInfo[PushVlan]; ok {
@@ -3221,17 +2803,9 @@
"onu-id": onuID,
"uni-id": uniID,
})
- if pcp, ok := classifierInfo[VlanPcp]; ok {
- gemPort = f.techprofile[intfID].GetGemportIDForPbit(ctx, TpInst,
- tp_pb.Direction_UPSTREAM,
- pcp.(uint32))
- //Adding HSIA upstream flow
- if err := f.addUpstreamDataFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort, tpID); err != nil {
- logger.Warn(ctx, err)
- }
- } else {
- //Adding HSIA upstream flow to all gemports
- installFlowOnAllGemports(ctx, f.addUpstreamDataFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, TpInst, HsiaFlow, Upstream, tpID)
+ //Adding HSIA upstream flow
+ if err := f.addUpstreamDataPathFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort, tpID, pbitToGem); err != nil {
+ logger.Warn(ctx, err)
}
} else if _, ok := actionInfo[PopVlan]; ok {
logger.Infow(ctx, "adding-downstream-data-rule", log.Fields{
@@ -3239,17 +2813,9 @@
"onu-id": onuID,
"uni-id": uniID,
})
- if pcp, ok := classifierInfo[VlanPcp]; ok {
- gemPort = f.techprofile[intfID].GetGemportIDForPbit(ctx, TpInst,
- tp_pb.Direction_DOWNSTREAM,
- pcp.(uint32))
- //Adding HSIA downstream flow
- if err := f.addDownstreamDataFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort, tpID); err != nil {
- logger.Warn(ctx, err)
- }
- } else {
- //Adding HSIA downstream flow to all gemports
- installFlowOnAllGemports(ctx, f.addDownstreamDataFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, TpInst, HsiaFlow, Downstream, tpID)
+ //Adding HSIA downstream flow
+ if err := f.addDownstreamDataPathFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort, tpID, pbitToGem); err != nil {
+ logger.Warn(ctx, err)
}
} else {
logger.Errorw(ctx, "invalid-flow-type-to-handle",
@@ -3270,21 +2836,12 @@
}()
}
-func (f *OpenOltFlowMgr) isGemPortUsedByAnotherFlow(ctx context.Context, gemPK gemPortKey) (bool, error) {
- if f.perGemPortLock.TryLock(gemPK) {
- flowIDList := f.flowsUsedByGemPort[gemPK]
- f.perGemPortLock.Unlock(gemPK)
- return len(flowIDList) > 1, nil
- }
- logger.Error(ctx, "failed-to-acquire-per-gem-port-lock",
- log.Fields{
- "device-id": f.deviceHandler.device.Id,
- "key": gemPK,
- })
- return false, olterrors.NewErrAdapter("failed-to-acquire-per-gem-port-lock", log.Fields{
- "device-id": f.deviceHandler.device.Id,
- "key": gemPK,
- }, nil)
+// isGemPortUsedByAnotherFlow reports whether more than one flow ID is recorded against gemPortID
+func (f *OpenOltFlowMgr) isGemPortUsedByAnotherFlow(gemPortID uint32) bool {
+ f.flowsUsedByGemPortKey.RLock()
+ defer f.flowsUsedByGemPortKey.RUnlock()
+ // "another" means strictly more than one flow ID is listed for this gem port
+ return len(f.flowsUsedByGemPort[gemPortID]) > 1
}
func (f *OpenOltFlowMgr) isTechProfileUsedByAnotherGem(ctx context.Context, ponIntf uint32, onuID uint32, uniID uint32, tpID uint32, tpInst *tp.TechProfile, gemPortID uint32) (bool, uint32) {
@@ -3299,10 +2856,10 @@
}
if tpInst.InstanceCtrl.Onu == "single-instance" {
// The TP information for the given TP ID, PON ID, ONU ID, UNI ID should be removed.
- if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, ponIntf, uint32(onuID), uint32(uniID), tpID); err != nil {
+ if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, ponIntf, onuID, uniID, tpID); err != nil {
logger.Warn(ctx, err)
}
- if err := f.DeleteTechProfileInstance(ctx, ponIntf, uint32(onuID), uint32(uniID), "", tpID); err != nil {
+ if err := f.DeleteTechProfileInstance(ctx, ponIntf, onuID, uniID, "", tpID); err != nil {
logger.Warn(ctx, err)
}
@@ -3516,7 +3073,16 @@
return uint32(TpID), nil
}
-func appendUnique(slice []uint32, item uint32) []uint32 {
+func appendUnique64bit(slice []uint64, item uint64) []uint64 {
+ for idx := range slice {
+ if slice[idx] == item {
+ return slice // item is already present: hand the slice back untouched
+ }
+ }
+ return append(slice, item) // first occurrence of item: grow the slice by one
+}
+
+func appendUnique32bit(slice []uint32, item uint32) []uint32 {
for _, sliceElement := range slice {
if sliceElement == item {
return slice
@@ -3565,10 +3131,10 @@
}
pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: logicalPort, VlanID: cTag, Priority: priority}
- f.onuGemInfoLock.Lock()
- defer f.onuGemInfoLock.Unlock()
-
+ f.packetInGemPortLock.RLock()
lookupGemPort, ok := f.packetInGemPort[pktInkey]
+ f.packetInGemPortLock.RUnlock()
+
if ok {
if lookupGemPort == gemPort {
logger.Infow(ctx, "pktin-key/value-found-in-cache--no-need-to-update-kv--assume-both-in-sync",
@@ -3578,7 +3144,9 @@
return
}
}
+ f.packetInGemPortLock.Lock()
f.packetInGemPort[pktInkey] = gemPort
+ f.packetInGemPortLock.Unlock()
f.resourceMgr.UpdateGemPortForPktIn(ctx, pktInkey, gemPort)
logger.Infow(ctx, "pktin-key-not-found-in-local-cache-value-is-different--updating-cache-and-kv-store",
@@ -3620,7 +3188,7 @@
f.onuGemInfoLock.Lock()
defer f.onuGemInfoLock.Unlock()
- onugem := f.onuGemInfo[intfID]
+ onugem := f.onuGemInfo
for idx, onu := range onugem {
if onu.OnuID == onuID {
for _, uni := range onu.UniPorts {
@@ -3630,7 +3198,7 @@
}
}
onugem[idx].UniPorts = append(onugem[idx].UniPorts, portNum)
- f.onuGemInfo[intfID] = onugem
+ f.onuGemInfo = onugem
}
}
f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNum)
@@ -3643,89 +3211,61 @@
logger.Error(ctx, "failed-to-get-flowid-list-per-gem", log.Fields{"intf": intf})
return
}
+ f.flowsUsedByGemPortKey.Lock()
for gem, FlowIDs := range flowIDsList {
- gemPK := gemPortKey{intf, uint32(gem)}
- if f.perGemPortLock.TryLock(gemPK) {
- f.flowsUsedByGemPort[gemPK] = FlowIDs
- f.perGemPortLock.Unlock(gemPK)
- } else {
- logger.Error(ctx, "failed-to-acquire-per-gem-port-lock",
- log.Fields{
- "intf-id": intf,
- "device-id": f.deviceHandler.device.Id,
- "key": gemPK,
- })
- }
+ f.flowsUsedByGemPort[gem] = FlowIDs
}
+ f.flowsUsedByGemPortKey.Unlock()
}
//clearMulticastFlowFromResourceManager removes a multicast flow from the KV store and
// clears resources reserved for this multicast flow
-func (f *OpenOltFlowMgr) clearMulticastFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats) {
+func (f *OpenOltFlowMgr) clearMulticastFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats) error {
classifierInfo := make(map[string]interface{})
+ var flowInfo *rsrcMgr.FlowInfo
formulateClassifierInfoFromFlow(ctx, classifierInfo, flow)
networkInterfaceID, err := f.getNNIInterfaceIDOfMulticastFlow(ctx, classifierInfo)
if err != nil {
logger.Warnw(ctx, "no-inport-found--cannot-release-resources-of-the-multicast-flow", log.Fields{"flowId:": flow.Id})
- return
+ return err
}
var onuID = int32(NoneOnuID)
var uniID = int32(NoneUniID)
- var flowID uint32
-
- flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, networkInterfaceID, onuID, uniID)
-
- for _, flowID = range flowIds {
- flowInfo := f.resourceMgr.GetFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flowID)
- if flowInfo == nil {
- logger.Debugw(ctx, "no-multicast-flowinfo-found-in-the-kv-store",
- log.Fields{
- "intf": networkInterfaceID,
- "onu-id": onuID,
- "uni-id": uniID,
- "flow-id": flowID})
- continue
- }
- updatedFlows := *flowInfo
- for i, storedFlow := range updatedFlows {
- if flow.Id == storedFlow.LogicalFlowID {
- removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: storedFlow.Flow.FlowType}
- logger.Debugw(ctx, "multicast-flow-to-be-deleted",
- log.Fields{
- "flow": storedFlow,
- "flow-id": flow.Id,
- "device-id": f.deviceHandler.device.Id})
- //remove from device
- if err := f.removeFlowFromDevice(ctx, &removeFlowMessage, flow.Id); err != nil {
- // DKB
- logger.Errorw(ctx, "failed-to-remove-multicast-flow",
- log.Fields{
- "flow-id": flow.Id,
- "error": err})
- return
- }
- logger.Infow(ctx, "multicast-flow-removed-from-device-successfully", log.Fields{"flow-id": flow.Id})
- //Remove the Flow from FlowInfo
- updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
- if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID), NoneOnuID, NoneUniID, flowID, &updatedFlows); err != nil {
- logger.Errorw(ctx, "failed-to-delete-multicast-flow-from-the-kv-store",
- log.Fields{"flow": storedFlow,
- "err": err})
- return
- }
- //release flow id
- logger.Debugw(ctx, "releasing-multicast-flow-id",
- log.Fields{"flow-id": flowID,
- "interfaceID": networkInterfaceID})
- f.resourceMgr.FreeFlowID(ctx, uint32(networkInterfaceID), NoneOnuID, NoneUniID, flowID)
- }
- }
+ flowID := flow.Id // the KV-store record is keyed by the logical flow ID (same key as the lookup below)
+ if flowInfo = f.resourceMgr.GetFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flow.Id); flowInfo == nil {
+ return olterrors.NewErrPersistence("remove", "flow", flow.Id,
+ log.Fields{
+ "flow": flow,
+ "device-id": f.deviceHandler.device.Id,
+ "intf-id": networkInterfaceID,
+ "onu-id": onuID}, err).Log() // err is nil on this path; the failure is the missing flow info itself
}
+ removeFlowMessage := openoltpb2.Flow{FlowId: flow.Id, FlowType: flowInfo.Flow.FlowType} // only ID and stored type are sent
+ logger.Debugw(ctx, "multicast-flow-to-be-deleted",
+ log.Fields{
+ "flow": flowInfo.Flow,
+ "flow-id": flow.Id,
+ "device-id": f.deviceHandler.device.Id})
+ // Remove from device
+ if err := f.removeFlowFromDevice(ctx, &removeFlowMessage, flow.Id); err != nil {
+ // DKB
+ logger.Errorw(ctx, "failed-to-remove-multicast-flow",
+ log.Fields{
+ "flow-id": flow.Id,
+ "error": err})
+ return err
+ }
+ // Remove flow from KV store (must use the flow's own ID, not a zero value, or the record is left behind)
+ return f.resourceMgr.RemoveFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flowID)
}
func (f *OpenOltFlowMgr) incrementActiveFlowRemoveCount(ctx context.Context, flow *ofp.OfpFlowStats) {
+
+ f.pendingFlowRemoveDataPerSubscriberLock.Lock()
+ defer f.pendingFlowRemoveDataPerSubscriberLock.Unlock()
+
inPort, outPort := getPorts(flow)
logger.Debugw(ctx, "increment-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
if inPort != InvalidPort && outPort != InvalidPort {
@@ -3733,8 +3273,6 @@
key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
logger.Debugw(ctx, "increment-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
- f.pendingFlowRemoveDataPerSubscriberLock.Lock()
- defer f.pendingFlowRemoveDataPerSubscriberLock.Unlock()
flowRemoveData, ok := f.pendingFlowRemoveDataPerSubscriber[key]
if !ok {
flowRemoveData = pendingFlowRemoveData{
@@ -3752,15 +3290,16 @@
}
func (f *OpenOltFlowMgr) decrementActiveFlowRemoveCount(ctx context.Context, flow *ofp.OfpFlowStats) {
+ f.pendingFlowRemoveDataPerSubscriberLock.Lock()
+ defer f.pendingFlowRemoveDataPerSubscriberLock.Unlock()
+
inPort, outPort := getPorts(flow)
logger.Debugw(ctx, "decrement-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
if inPort != InvalidPort && outPort != InvalidPort {
- _, intfID, onuID, uniID := ExtractAccessFromFlow(uint32(inPort), uint32(outPort))
+ _, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
logger.Debugw(ctx, "decrement-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
- f.pendingFlowRemoveDataPerSubscriberLock.Lock()
- defer f.pendingFlowRemoveDataPerSubscriberLock.Unlock()
if val, ok := f.pendingFlowRemoveDataPerSubscriber[key]; !ok {
logger.Fatalf(ctx, "flow-remove-key-not-found", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
} else {
@@ -3806,3 +3345,54 @@
logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
}
}
+
+// reconcileSubscriberDataPathFlowIDMap rebuilds subscriberDataPathFlowIDMap from the flow records held in the KV store
+func (f *OpenOltFlowMgr) reconcileSubscriberDataPathFlowIDMap(ctx context.Context) {
+ onuList, err := f.resourceMgr.GetOnuGemInfo(ctx, f.ponPortIdx)
+ if err != nil {
+ _ = olterrors.NewErrNotFound("onu", log.Fields{"pon-port": f.ponPortIdx}, err).Log()
+ return
+ }
+
+ // The map lock is taken once and held until every ONU/UNI has been walked
+ f.subscriberDataPathFlowIDMapLock.Lock()
+ defer f.subscriberDataPathFlowIDMapLock.Unlock()
+
+ for _, onu := range onuList {
+ for _, uniID := range onu.UniPorts {
+ storedIDs, err := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, onu.IntfID, int32(onu.OnuID), int32(uniID))
+ if err != nil {
+ logger.Fatalf(ctx, "failed-to-read-flow-ids-of-onu-during-reconciliation")
+ }
+ for _, storedID := range storedIDs {
+ info := f.resourceMgr.GetFlowIDInfo(ctx, onu.IntfID, int32(onu.OnuID), int32(uniID), storedID)
+ if info == nil {
+ // Error is already logged in the called function
+ continue
+ }
+ devFlow := info.Flow
+ // Downstream candidate: DoubleTag classifier with OVid set and a tech profile ID assigned
+ isDownstreamDataPath := devFlow.Classifier.PktTagType == DoubleTag &&
+ devFlow.FlowType == Downstream &&
+ devFlow.Classifier.OVid > 0 &&
+ devFlow.TechProfileId > 0
+ // Upstream candidate (checked only when not downstream): SingleTag classifier with Action.OVid set
+ if !isDownstreamDataPath && !(devFlow.Classifier.PktTagType == SingleTag &&
+ devFlow.FlowType == Upstream &&
+ devFlow.Action.OVid > 0 &&
+ devFlow.TechProfileId > 0) {
+ continue
+ }
+ key := subscriberDataPathFlowIDKey{intfID: onu.IntfID, onuID: onu.OnuID, uniID: uniID, direction: devFlow.FlowType, tpID: devFlow.TechProfileId}
+ if _, ok := f.subscriberDataPathFlowIDMap[key]; !ok {
+ f.subscriberDataPathFlowIDMap[key] = devFlow.FlowId // an existing entry for the key is never overwritten
+ }
+ }
+ }
+ }
+}
+
+// isDatapathFlow reports whether flow is a datapath flow, i.e. it is neither controller bound nor associated with a group
+func isDatapathFlow(flow *ofp.OfpFlowStats) bool {
+ return !(IsControllerBoundFlow(flows.GetOutPort(flow)) || flows.HasGroup(flow))
+}