VOL-4077: Improve storage usage on etcd
- Do away with unnecessary data storage on etcd if it can be
reconciled on adapter restart
- For data that needs storage, use lesser footprint if possible
- Use write-through-cache for all data stored on etcd via
resource manager module
- Use ResourceManager module per interface to localize lock
contention per PON port
Change-Id: I21d38216fab195d738a446b3f96a00251569e38b
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 4ae86ac..9c1cb09 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -22,14 +22,15 @@
"encoding/hex"
"errors"
"fmt"
- "github.com/opencord/voltha-lib-go/v4/pkg/meters"
+ "github.com/opencord/voltha-lib-go/v5/pkg/meters"
"strconv"
"strings"
"sync"
+ "time"
- "github.com/opencord/voltha-lib-go/v4/pkg/flows"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
- tp "github.com/opencord/voltha-lib-go/v4/pkg/techprofile"
+ "github.com/opencord/voltha-lib-go/v5/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
+ tp "github.com/opencord/voltha-lib-go/v5/pkg/techprofile"
rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
"github.com/opencord/voltha-protos/v4/go/common"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
@@ -154,11 +155,6 @@
pbit1 = '1'
)
-type gemPortKey struct {
- intfID uint32
- gemPort uint32
-}
-
type schedQueue struct {
direction tp_pb.Direction
intfID uint32
@@ -186,15 +182,6 @@
gemToAes map[uint32]bool
}
-// subscriberDataPathFlowIDKey is key to subscriberDataPathFlowIDMap map
-type subscriberDataPathFlowIDKey struct {
- intfID uint32
- onuID uint32
- uniID uint32
- direction string
- tpID uint32
-}
-
// This control block is created per flow add/remove and pushed on the incomingFlows channel slice
// The flowControlBlock is then picked by the perOnuFlowHandlerRoutine for further processing.
// There is on perOnuFlowHandlerRoutine routine per ONU that constantly monitors for any incoming
@@ -210,37 +197,28 @@
//OpenOltFlowMgr creates the Structure of OpenOltFlowMgr obj
type OpenOltFlowMgr struct {
ponPortIdx uint32 // Pon Port this FlowManager is responsible for
- techprofile map[uint32]tp.TechProfileIf
+ techprofile tp.TechProfileIf
deviceHandler *DeviceHandler
grpMgr *OpenOltGroupMgr
resourceMgr *rsrcMgr.OpenOltResourceMgr
- onuIdsLock sync.RWMutex // TODO: Do we need this?
-
- flowsUsedByGemPort map[uint32][]uint64 // gem port id to flow ids
- flowsUsedByGemPortKey sync.RWMutex // lock to be used to access the flowsUsedByGemPort map
+ gemToFlowIDs map[uint32][]uint64 // gem port id to flow ids
+ gemToFlowIDsKey sync.RWMutex // lock to be used to access the gemToFlowIDs map
packetInGemPort map[rsrcMgr.PacketInInfoKey]uint32 //packet in gem port local cache
packetInGemPortLock sync.RWMutex
// TODO create a type rsrcMgr.OnuGemInfos to be used instead of []rsrcMgr.OnuGemInfo
- onuGemInfo []rsrcMgr.OnuGemInfo //onu, gem and uni info local cache
+ onuGemInfoMap map[uint32]*rsrcMgr.OnuGemInfo //onu, gem and uni info local cache -> map of onuID to OnuGemInfo
// We need to have a global lock on the onuGemInfo map
onuGemInfoLock sync.RWMutex
- // Map of voltha flowID associated with subscriberDataPathFlowIDKey
- // This information is not persisted on Kv store and hence should be reconciled on adapter restart
- subscriberDataPathFlowIDMap map[subscriberDataPathFlowIDKey]uint64
- subscriberDataPathFlowIDMapLock sync.RWMutex
+ flowIDToGems map[uint64][]uint32
+ flowIDToGemsLock sync.RWMutex
// Slice of channels. Each channel in slice, index by ONU ID, queues flows per ONU.
// A go routine per ONU, waits on the unique channel (indexed by ONU ID) for incoming flows (add/remove)
incomingFlows []chan flowControlBlock
-
- //this map keeps uni port info by gem and pon port. This relation shall be used for packet-out operations
- gemToUniMap map[gemPortKey][]uint32
- //We need to have a global lock on the gemToUniLock map
- gemToUniLock sync.RWMutex
}
//NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
@@ -248,23 +226,18 @@
logger.Infow(ctx, "initializing-flow-manager", log.Fields{"device-id": dh.device.Id})
var flowMgr OpenOltFlowMgr
var err error
- var idx uint32
flowMgr.deviceHandler = dh
+ flowMgr.ponPortIdx = ponPortIdx
flowMgr.grpMgr = grpMgr
flowMgr.resourceMgr = rMgr
- flowMgr.techprofile = make(map[uint32]tp.TechProfileIf)
if err = flowMgr.populateTechProfilePerPonPort(ctx); err != nil {
- logger.Errorw(ctx, "error-while-populating-tech-profile-mgr", log.Fields{"error": err})
+ logger.Errorw(ctx, "error-while-populating-tech-profile-mgr", log.Fields{"err": err})
return nil
}
- flowMgr.onuIdsLock = sync.RWMutex{}
- flowMgr.flowsUsedByGemPort = make(map[uint32][]uint64)
+ flowMgr.gemToFlowIDs = make(map[uint32][]uint64)
flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
- flowMgr.packetInGemPortLock = sync.RWMutex{}
- flowMgr.onuGemInfoLock = sync.RWMutex{}
- flowMgr.subscriberDataPathFlowIDMap = make(map[subscriberDataPathFlowIDKey]uint64)
- flowMgr.subscriberDataPathFlowIDMapLock = sync.RWMutex{}
+ flowMgr.flowIDToGems = make(map[uint64][]uint32)
// Create a slice of buffered channels for handling concurrent flows per ONU.
// The additional entry (+1) is to handle the NNI trap flows on a separate channel from individual ONUs channel
@@ -276,54 +249,35 @@
// This routine will be blocked on the flowMgr.incomingFlows[onu-id] channel for incoming flows.
go flowMgr.perOnuFlowHandlerRoutine(flowMgr.incomingFlows[i])
}
-
+ flowMgr.onuGemInfoMap = make(map[uint32]*rsrcMgr.OnuGemInfo)
//Load the onugem info cache from kv store on flowmanager start
- if flowMgr.onuGemInfo, err = rMgr.GetOnuGemInfo(ctx, ponPortIdx); err != nil {
- logger.Error(ctx, "failed-to-load-onu-gem-info-cache")
+ onuIDStart := flowMgr.deviceHandler.deviceInfo.OnuIdStart
+ onuIDEnd := flowMgr.deviceHandler.deviceInfo.OnuIdEnd
+ for onuID := onuIDStart; onuID <= onuIDEnd; onuID++ {
+ // check for a valid serial number in onuGem as GetOnuGemInfo can return nil error in case of nothing found in the path.
+ onugem, err := rMgr.GetOnuGemInfo(ctx, onuID, ponPortIdx)
+ if err == nil && onugem != nil && onugem.SerialNumber != "" {
+ flowMgr.onuGemInfoMap[onuID] = onugem
+ }
}
- //Load flowID list per gem map per interface from the kvstore.
- flowMgr.loadFlowIDlistForGem(ctx, idx)
+
+ //Load flowID list per gem map And gemIDs per flow per interface from the kvstore.
+ flowMgr.loadFlowIDsForGemAndGemIDsForFlow(ctx)
+
//load interface to multicast queue map from kv store
-
- flowMgr.gemToUniMap = make(map[gemPortKey][]uint32)
- flowMgr.gemToUniLock = sync.RWMutex{}
-
flowMgr.grpMgr.LoadInterfaceToMulticastQueueMap(ctx)
- flowMgr.reconcileSubscriberDataPathFlowIDMap(ctx)
logger.Info(ctx, "initialization-of-flow-manager-success")
return &flowMgr
}
-// toGemToUniMap adds uni info consisting of onu and uni ID to the map and associates it with a gem port
-func (f *OpenOltFlowMgr) toGemToUniMap(ctx context.Context, gemPK gemPortKey, onuID uint32, uniID uint32) {
- f.gemToUniLock.Lock()
- f.gemToUniMap[gemPK] = []uint32{onuID, uniID}
- f.gemToUniLock.Unlock()
-}
-
-// fromGemToUniMap returns onu and uni ID associated with the given key
-func (f *OpenOltFlowMgr) fromGemToUniMap(key gemPortKey) ([]uint32, bool) {
- f.gemToUniLock.RLock()
- defer f.gemToUniLock.RUnlock()
- val, ok := f.gemToUniMap[key]
- return val, ok
-}
-
-// removeFromGemToUniMap removes an entry associated with the given key from gemToUniMap
-func (f *OpenOltFlowMgr) removeFromGemToUniMap(key gemPortKey) {
- f.gemToUniLock.Lock()
- defer f.gemToUniLock.Unlock()
- delete(f.gemToUniMap, key)
-}
-
func (f *OpenOltFlowMgr) registerFlow(ctx context.Context, flowFromCore *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) error {
if !deviceFlow.ReplicateFlow && deviceFlow.GemportId > 0 {
// Flow is not replicated in this case, we need to register the flow for a single gem-port
- return f.registerFlowIDForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowFromCore)
+ return f.registerFlowIDForGemAndGemIDForFlow(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowFromCore)
} else if deviceFlow.ReplicateFlow && len(deviceFlow.PbitToGemport) > 0 {
// Flow is replicated in this case. We need to register the flow for all the gem-ports it is replicated to.
for _, gemPort := range deviceFlow.PbitToGemport {
- if err := f.registerFlowIDForGem(ctx, uint32(deviceFlow.AccessIntfId), gemPort, flowFromCore); err != nil {
+ if err := f.registerFlowIDForGemAndGemIDForFlow(ctx, uint32(deviceFlow.AccessIntfId), gemPort, flowFromCore); err != nil {
return err
}
}
@@ -331,15 +285,26 @@
return nil
}
-func (f *OpenOltFlowMgr) registerFlowIDForGem(ctx context.Context, accessIntfID uint32, gemPortID uint32, flowFromCore *ofp.OfpFlowStats) error {
- f.flowsUsedByGemPortKey.Lock()
- flowIDList, ok := f.flowsUsedByGemPort[gemPortID]
+func (f *OpenOltFlowMgr) registerFlowIDForGemAndGemIDForFlow(ctx context.Context, accessIntfID uint32, gemPortID uint32, flowFromCore *ofp.OfpFlowStats) error {
+ // update gem->flows map
+ f.gemToFlowIDsKey.Lock()
+ flowIDList, ok := f.gemToFlowIDs[gemPortID]
if !ok {
flowIDList = []uint64{flowFromCore.Id}
+ } else {
+ flowIDList = appendUnique64bit(flowIDList, flowFromCore.Id)
}
- flowIDList = appendUnique64bit(flowIDList, flowFromCore.Id)
- f.flowsUsedByGemPort[gemPortID] = flowIDList
- f.flowsUsedByGemPortKey.Unlock()
+ f.gemToFlowIDs[gemPortID] = flowIDList
+ f.gemToFlowIDsKey.Unlock()
+
+ // update flow->gems map
+ f.flowIDToGemsLock.Lock()
+ if _, ok := f.flowIDToGems[flowFromCore.Id]; !ok {
+ f.flowIDToGems[flowFromCore.Id] = []uint32{gemPortID}
+ } else {
+ f.flowIDToGems[flowFromCore.Id] = appendUnique32bit(f.flowIDToGems[flowFromCore.Id], gemPortID)
+ }
+ f.flowIDToGemsLock.Unlock()
// update the flowids for a gem to the KVstore
return f.resourceMgr.UpdateFlowIDsForGem(ctx, accessIntfID, gemPortID, flowIDList)
@@ -452,7 +417,7 @@
if meterInfo != nil {
logger.Debugw(ctx, "scheduler-already-created-for-upstream", log.Fields{"device-id": f.deviceHandler.device.Id, "meter-id": sq.meterID})
- if meterInfo.MeterConfig.MeterId == sq.meterID {
+ if meterInfo.MeterID == sq.meterID {
if err := f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, true); err != nil {
return err
}
@@ -460,7 +425,7 @@
}
return olterrors.NewErrInvalidValue(log.Fields{
"unsupported": "meter-id",
- "kv-store-meter-id": meterInfo.MeterConfig.MeterId,
+ "kv-store-meter-id": meterInfo.MeterID,
"meter-id-in-flow": sq.meterID,
"device-id": f.deviceHandler.device.Id}, nil)
}
@@ -472,18 +437,9 @@
"device-id": f.deviceHandler.device.Id})
if sq.direction == tp_pb.Direction_UPSTREAM {
- SchedCfg, err = f.techprofile[sq.intfID].GetUsScheduler(ctx, sq.tpInst.(*tp.TechProfile))
+ SchedCfg = f.techprofile.GetUsScheduler(sq.tpInst.(*tp_pb.TechProfileInstance))
} else if sq.direction == tp_pb.Direction_DOWNSTREAM {
- SchedCfg, err = f.techprofile[sq.intfID].GetDsScheduler(ctx, sq.tpInst.(*tp.TechProfile))
- }
-
- if err != nil {
- return olterrors.NewErrNotFound("scheduler-config",
- log.Fields{
- "intf-id": sq.intfID,
- "direction": sq.direction,
- "tp-inst": sq.tpInst,
- "device-id": f.deviceHandler.device.Id}, err)
+ SchedCfg = f.techprofile.GetDsScheduler(sq.tpInst.(*tp_pb.TechProfileInstance))
}
found := false
@@ -491,13 +447,10 @@
if sq.flowMetadata != nil {
for _, meter := range sq.flowMetadata.Meters {
if sq.meterID == meter.MeterId {
- meterInfo.MeterConfig = ofp.OfpMeterConfig{}
- meterInfo.MeterConfig.MeterId = meter.MeterId
- meterInfo.MeterConfig.Flags = meter.Flags
+ meterInfo.MeterID = meter.MeterId
meterInfo.RefCnt = 1 // initialize it to 1, since this is the first flow that referenced the meter id.
- meterInfo.MeterConfig.Bands = append(meterInfo.MeterConfig.Bands, meter.Bands...)
logger.Debugw(ctx, "found-meter-config-from-flowmetadata",
- log.Fields{"meterConfig": meterInfo.MeterConfig,
+ log.Fields{"meter": meter,
"device-id": f.deviceHandler.device.Id})
found = true
break
@@ -515,14 +468,14 @@
}
var TrafficShaping *tp_pb.TrafficShapingInfo
- if TrafficShaping, err = meters.GetTrafficShapingInfo(ctx, &meterInfo.MeterConfig); err != nil {
+ if TrafficShaping, err = meters.GetTrafficShapingInfo(ctx, sq.flowMetadata.Meters[0]); err != nil {
return olterrors.NewErrInvalidValue(log.Fields{
"reason": "invalid-meter-config",
"meter-id": sq.meterID,
"device-id": f.deviceHandler.device.Id}, nil)
}
- TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[sq.intfID].GetTrafficScheduler(sq.tpInst.(*tp.TechProfile), SchedCfg, TrafficShaping)}
+ TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile.GetTrafficScheduler(sq.tpInst.(*tp_pb.TechProfileInstance), SchedCfg, TrafficShaping)}
TrafficSched[0].TechProfileId = sq.tpID
if err := f.pushSchedulerQueuesToDevice(ctx, sq, TrafficSched); err != nil {
@@ -549,7 +502,7 @@
}
func (f *OpenOltFlowMgr) pushSchedulerQueuesToDevice(ctx context.Context, sq schedQueue, TrafficSched []*tp_pb.TrafficScheduler) error {
- trafficQueues, err := f.techprofile[sq.intfID].GetTrafficQueues(ctx, sq.tpInst.(*tp.TechProfile), sq.direction)
+ trafficQueues, err := f.techprofile.GetTrafficQueues(ctx, sq.tpInst.(*tp_pb.TechProfileInstance), sq.direction)
if err != nil {
return olterrors.NewErrAdapter("unable-to-construct-traffic-queue-configuration",
@@ -598,10 +551,9 @@
"device-id": f.deviceHandler.device.Id})
if sq.direction == tp_pb.Direction_DOWNSTREAM {
- multicastTrafficQueues := f.techprofile[sq.intfID].GetMulticastTrafficQueues(ctx, sq.tpInst.(*tp.TechProfile))
+ multicastTrafficQueues := f.techprofile.GetMulticastTrafficQueues(ctx, sq.tpInst.(*tp_pb.TechProfileInstance))
if len(multicastTrafficQueues) > 0 {
- if _, present := f.grpMgr.GetInterfaceToMcastQueueMap(sq.intfID); !present {
- //assumed that there is only one queue per PON for the multicast service
+ if _, present := f.grpMgr.GetInterfaceToMcastQueueMap(sq.intfID); !present { //assumed that there is only one queue per PON for the multicast service
//the default queue with multicastQueuePerPonPort.Priority per a pon interface is used for multicast service
//just put it in interfaceToMcastQueueMap to use for building group members
logger.Debugw(ctx, "multicast-traffic-queues", log.Fields{"device-id": f.deviceHandler.device.Id})
@@ -613,7 +565,7 @@
f.grpMgr.UpdateInterfaceToMcastQueueMap(sq.intfID, val)
//also store the queue info in kv store
if err := f.resourceMgr.AddMcastQueueForIntf(ctx, sq.intfID, multicastQueuePerPonPort.GemportId, multicastQueuePerPonPort.Priority); err != nil {
- logger.Errorw(ctx, "failed-to-add-mcast-queue", log.Fields{"error": err})
+ logger.Errorw(ctx, "failed-to-add-mcast-queue", log.Fields{"err": err})
return err
}
@@ -639,27 +591,19 @@
"uni-port": sq.uniPort,
"device-id": f.deviceHandler.device.Id})
if sq.direction == tp_pb.Direction_UPSTREAM {
- SchedCfg, err = f.techprofile[sq.intfID].GetUsScheduler(ctx, sq.tpInst.(*tp.TechProfile))
+ SchedCfg = f.techprofile.GetUsScheduler(sq.tpInst.(*tp_pb.TechProfileInstance))
Direction = "upstream"
} else if sq.direction == tp_pb.Direction_DOWNSTREAM {
- SchedCfg, err = f.techprofile[sq.intfID].GetDsScheduler(ctx, sq.tpInst.(*tp.TechProfile))
+ SchedCfg = f.techprofile.GetDsScheduler(sq.tpInst.(*tp_pb.TechProfileInstance))
Direction = "downstream"
}
- if err != nil {
- return olterrors.NewErrNotFound("scheduler-config",
- log.Fields{
- "int-id": sq.intfID,
- "direction": sq.direction,
- "device-id": f.deviceHandler.device.Id}, err)
- }
-
TrafficShaping := &tp_pb.TrafficShapingInfo{} // this info is not really useful for the agent during flow removal. Just use default values.
- TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[sq.intfID].GetTrafficScheduler(sq.tpInst.(*tp.TechProfile), SchedCfg, TrafficShaping)}
+ TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile.GetTrafficScheduler(sq.tpInst.(*tp_pb.TechProfileInstance), SchedCfg, TrafficShaping)}
TrafficSched[0].TechProfileId = sq.tpID
- TrafficQueues, err := f.techprofile[sq.intfID].GetTrafficQueues(ctx, sq.tpInst.(*tp.TechProfile), sq.direction)
+ TrafficQueues, err := f.techprofile.GetTrafficQueues(ctx, sq.tpInst.(*tp_pb.TechProfileInstance), sq.direction)
if err != nil {
return olterrors.NewErrAdapter("unable-to-construct-traffic-queue-configuration",
log.Fields{
@@ -703,7 +647,7 @@
"uni-port": sq.uniPort})
if sq.direction == tp_pb.Direction_UPSTREAM {
- allocID := sq.tpInst.(*tp.TechProfile).UsScheduler.AllocID
+ allocID := sq.tpInst.(*tp_pb.TechProfileInstance).UsScheduler.AllocId
f.resourceMgr.FreeAllocID(ctx, sq.intfID, sq.onuID, sq.uniID, allocID)
// Delete the TCONT on the ONU.
uni := getUniPortPath(f.deviceHandler.device.Id, sq.intfID, int32(sq.onuID), int32(sq.uniID))
@@ -752,7 +696,6 @@
var gemPortIDs []uint32
tpInstanceExists := false
var err error
-
allocIDs = f.resourceMgr.GetCurrentAllocIDsForOnu(ctx, intfID, onuID, uniID)
allgemPortIDs = f.resourceMgr.GetCurrentGEMPortIDsForOnu(ctx, intfID, onuID, uniID)
tpPath := f.getTPpath(ctx, intfID, uni, TpID)
@@ -765,24 +708,24 @@
"tp-id": TpID})
// Check tech profile instance already exists for derived port name
- techProfileInstance, _ := f.techprofile[intfID].GetTPInstanceFromKVStore(ctx, TpID, tpPath)
+ techProfileInstance, _ := f.techprofile.GetTPInstance(ctx, tpPath)
if techProfileInstance == nil {
logger.Infow(ctx, "tp-instance-not-found--creating-new",
log.Fields{
"path": tpPath,
"device-id": f.deviceHandler.device.Id})
- techProfileInstance, err = f.techprofile[intfID].CreateTechProfInstance(ctx, TpID, uni, intfID)
+ techProfileInstance, err = f.techprofile.CreateTechProfileInstance(ctx, TpID, uni, intfID)
if err != nil {
// This should not happen, something wrong in KV backend transaction
logger.Errorw(ctx, "tp-instance-create-failed",
log.Fields{
- "error": err,
+ "err": err,
"tp-id": TpID,
"device-id": f.deviceHandler.device.Id})
return 0, nil, nil
}
if err := f.resourceMgr.UpdateTechProfileIDForOnu(ctx, intfID, onuID, uniID, TpID); err != nil {
- logger.Warnw(ctx, "failed-to-update-tech-profile-id", log.Fields{"error": err})
+ logger.Warnw(ctx, "failed-to-update-tech-profile-id", log.Fields{"err": err})
}
} else {
logger.Debugw(ctx, "tech-profile-instance-already-exist-for-given port-name",
@@ -793,14 +736,14 @@
}
switch tpInst := techProfileInstance.(type) {
- case *tp.TechProfile:
+ case *tp_pb.TechProfileInstance:
if UsMeterID != 0 {
sq := schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: onuID, uniID: uniID, tpID: TpID,
uniPort: uniPort, tpInst: techProfileInstance, meterID: UsMeterID, flowMetadata: flowMetadata}
if err := f.CreateSchedulerQueues(ctx, sq); err != nil {
logger.Errorw(ctx, "CreateSchedulerQueues-failed-upstream",
log.Fields{
- "error": err,
+ "err": err,
"onu-id": onuID,
"uni-id": uniID,
"intf-id": intfID,
@@ -815,7 +758,7 @@
if err := f.CreateSchedulerQueues(ctx, sq); err != nil {
logger.Errorw(ctx, "CreateSchedulerQueues-failed-downstream",
log.Fields{
- "error": err,
+ "err": err,
"onu-id": onuID,
"uni-id": uniID,
"intf-id": intfID,
@@ -824,9 +767,9 @@
return 0, nil, nil
}
}
- allocID := tpInst.UsScheduler.AllocID
+ allocID := tpInst.UsScheduler.AllocId
for _, gem := range tpInst.UpstreamGemPortAttributeList {
- gemPortIDs = append(gemPortIDs, gem.GemportID)
+ gemPortIDs = append(gemPortIDs, gem.GemportId)
}
allocIDs = appendUnique32bit(allocIDs, allocID)
@@ -848,12 +791,12 @@
// Send Tconts and GEM ports to KV store
f.storeTcontsGEMPortsIntoKVStore(ctx, intfID, onuID, uniID, allocIDs, allgemPortIDs)
return allocID, gemPortIDs, techProfileInstance
- case *tp.EponProfile:
+ case *openoltpb2.EponTechProfileInstance:
// CreateSchedulerQueues for EPON needs to be implemented here
// when voltha-protos for EPON is completed.
- allocID := tpInst.AllocID
+ allocID := tpInst.AllocId
for _, gem := range tpInst.UpstreamQueueAttributeList {
- gemPortIDs = append(gemPortIDs, gem.GemportID)
+ gemPortIDs = append(gemPortIDs, gem.GemportId)
}
allocIDs = appendUnique32bit(allocIDs, allocID)
@@ -897,36 +840,18 @@
if err := f.resourceMgr.UpdateGEMPortIDsForOnu(ctx, intfID, onuID, uniID, gemPortIDs); err != nil {
logger.Errorw(ctx, "error-while-uploading-gemports-to-kv-store", log.Fields{"device-id": f.deviceHandler.device.Id})
}
- if err := f.resourceMgr.UpdateGEMportsPonportToOnuMapOnKVStore(ctx, gemPortIDs, intfID, onuID, uniID); err != nil {
- logger.Error(ctx, "error-while-uploading-gemtopon-map-to-kv-store", log.Fields{"device-id": f.deviceHandler.device.Id})
- } else {
- //add to gem to uni cache
- f.addGemPortUniAssociationsToCache(ctx, intfID, onuID, uniID, gemPortIDs)
- }
+
logger.Infow(ctx, "stored-tconts-and-gem-into-kv-store-successfully", log.Fields{"device-id": f.deviceHandler.device.Id})
for _, gemPort := range gemPortIDs {
f.addGemPortToOnuInfoMap(ctx, intfID, onuID, gemPort)
}
}
-//addGemPortUniAssociationsToCache
-func (f *OpenOltFlowMgr) addGemPortUniAssociationsToCache(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, gemPortIDs []uint32) {
- for _, gemPortID := range gemPortIDs {
- key := gemPortKey{
- intfID: intfID,
- gemPort: gemPortID,
- }
- f.toGemToUniMap(ctx, key, onuID, uniID)
- }
- logger.Debugw(ctx, "gem-to-uni-info-added-to-cache", log.Fields{"device-id": f.deviceHandler.device.Id, "intfID": intfID,
- "gemPortIDs": gemPortIDs, "onuID": onuID, "uniID": uniID})
-}
-
func (f *OpenOltFlowMgr) populateTechProfilePerPonPort(ctx context.Context) error {
var tpCount int
for _, techRange := range f.resourceMgr.DevInfo.Ranges {
for _, intfID := range techRange.IntfIds {
- f.techprofile[intfID] = f.resourceMgr.ResourceMgrs[intfID].TechProfileMgr
+ f.techprofile = f.resourceMgr.PonRsrMgr.TechProfileMgr
tpCount++
logger.Debugw(ctx, "init-tech-profile-done",
log.Fields{
@@ -1004,13 +929,6 @@
func (f *OpenOltFlowMgr) addSymmetricDataPathFlow(ctx context.Context, flowContext *flowContext, direction string) error {
- var inverseDirection string
- if direction == Upstream {
- inverseDirection = Downstream
- } else {
- inverseDirection = Upstream
- }
-
intfID := flowContext.intfID
onuID := flowContext.onuID
uniID := flowContext.uniID
@@ -1067,33 +985,23 @@
}, err).Log()
}
- // Get symmetric flowID if it exists
- // This symmetric flowID will be needed by agent software to use the same device flow-id that was used for the
- // symmetric flow earlier
- // symmetric flowID 0 is considered by agent as non-existent symmetric flow
- keySymm := subscriberDataPathFlowIDKey{intfID: intfID, onuID: onuID, uniID: uniID, direction: inverseDirection, tpID: tpID}
- f.subscriberDataPathFlowIDMapLock.RLock()
- symmFlowID := f.subscriberDataPathFlowIDMap[keySymm]
- f.subscriberDataPathFlowIDMapLock.RUnlock()
-
flow := openoltpb2.Flow{AccessIntfId: int32(intfID),
- OnuId: int32(onuID),
- UniId: int32(uniID),
- FlowId: logicalFlow.Id,
- FlowType: direction,
- AllocId: int32(allocID),
- NetworkIntfId: int32(networkIntfID),
- GemportId: int32(gemPortID),
- Classifier: classifierProto,
- Action: actionProto,
- Priority: int32(logicalFlow.Priority),
- Cookie: logicalFlow.Cookie,
- PortNo: flowContext.portNo,
- TechProfileId: tpID,
- ReplicateFlow: len(flowContext.pbitToGem) > 0,
- PbitToGemport: flowContext.pbitToGem,
- SymmetricFlowId: symmFlowID,
- GemportToAes: flowContext.gemToAes,
+ OnuId: int32(onuID),
+ UniId: int32(uniID),
+ FlowId: logicalFlow.Id,
+ FlowType: direction,
+ AllocId: int32(allocID),
+ NetworkIntfId: int32(networkIntfID),
+ GemportId: int32(gemPortID),
+ Classifier: classifierProto,
+ Action: actionProto,
+ Priority: int32(logicalFlow.Priority),
+ Cookie: logicalFlow.Cookie,
+ PortNo: flowContext.portNo,
+ TechProfileId: tpID,
+ ReplicateFlow: len(flowContext.pbitToGem) > 0,
+ PbitToGemport: flowContext.pbitToGem,
+ GemportToAes: flowContext.gemToAes,
}
if err := f.addFlowToDevice(ctx, logicalFlow, &flow); err != nil {
return olterrors.NewErrFlowOp("add", logicalFlow.Id, nil, err).Log()
@@ -1104,21 +1012,6 @@
"flow": flow,
"intf-id": intfID,
"onu-id": onuID})
- flowInfo := rsrcMgr.FlowInfo{Flow: &flow, IsSymmtricFlow: true}
- if err := f.resourceMgr.UpdateFlowIDInfo(ctx, uint32(flow.AccessIntfId), flow.OnuId, flow.UniId, flow.FlowId, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", logicalFlow.Id,
- log.Fields{
- "flow": flow,
- "device-id": f.deviceHandler.device.Id,
- "intf-id": intfID,
- "onu-id": onuID}, err).Log()
- }
-
- // Update the current flowID to the map
- keyCurr := subscriberDataPathFlowIDKey{intfID: intfID, onuID: onuID, uniID: uniID, direction: direction, tpID: tpID}
- f.subscriberDataPathFlowIDMapLock.Lock()
- f.subscriberDataPathFlowIDMap[keyCurr] = logicalFlow.Id
- f.subscriberDataPathFlowIDMapLock.Unlock()
return nil
}
@@ -1206,13 +1099,6 @@
"flow-id": logicalFlow.Id,
"intf-id": intfID,
"onu-id": onuID})
- flowInfo := rsrcMgr.FlowInfo{Flow: &dhcpFlow}
- if err := f.resourceMgr.UpdateFlowIDInfo(ctx, uint32(dhcpFlow.AccessIntfId), dhcpFlow.OnuId, dhcpFlow.UniId, dhcpFlow.FlowId, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", dhcpFlow.FlowId,
- log.Fields{
- "flow": dhcpFlow,
- "device-id": f.deviceHandler.device.Id}, err).Log()
- }
return nil
}
@@ -1301,11 +1187,6 @@
return olterrors.NewErrFlowOp("add", logicalFlow.Id, log.Fields{"flow": flow, "device-id": f.deviceHandler.device.Id}, err).Log()
}
- flowInfo := rsrcMgr.FlowInfo{Flow: &flow}
- if err := f.resourceMgr.UpdateFlowIDInfo(ctx, uint32(flow.AccessIntfId), flow.OnuId, flow.UniId, flow.FlowId, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", flow.FlowId, log.Fields{"flow": flow, "device-id": f.deviceHandler.device.Id}, err).Log()
- }
-
return nil
}
@@ -1336,7 +1217,7 @@
uplinkAction := make(map[string]interface{})
// Fill Classfier
- uplinkClassifier[EthType] = uint32(ethType)
+ uplinkClassifier[EthType] = ethType
uplinkClassifier[PacketTagType] = SingleTag
uplinkClassifier[VlanVid] = vlanID
uplinkClassifier[VlanPcp] = classifier[VlanPcp]
@@ -1415,13 +1296,7 @@
"intf-id": intfID,
"ethType": ethType,
})
- flowInfo := rsrcMgr.FlowInfo{Flow: &upstreamFlow}
- if err := f.resourceMgr.UpdateFlowIDInfo(ctx, uint32(upstreamFlow.AccessIntfId), upstreamFlow.OnuId, upstreamFlow.UniId, upstreamFlow.FlowId, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", upstreamFlow.FlowId,
- log.Fields{
- "flow": upstreamFlow,
- "device-id": f.deviceHandler.device.Id}, err).Log()
- }
+
return nil
}
@@ -1502,7 +1377,7 @@
// getTPpath return the ETCD path for a given UNI port
func (f *OpenOltFlowMgr) getTPpath(ctx context.Context, intfID uint32, uniPath string, TpID uint32) string {
- return f.techprofile[intfID].GetTechProfileInstanceKVPath(ctx, TpID, uniPath)
+ return f.techprofile.GetTechProfileInstanceKey(ctx, TpID, uniPath)
}
// DeleteTechProfileInstances removes the tech profile instances from persistent storage
@@ -1512,11 +1387,11 @@
for _, tpID := range tpIDList {
if err := f.DeleteTechProfileInstance(ctx, intfID, onuID, uniID, uniPortName, tpID); err != nil {
- _ = olterrors.NewErrAdapter("delete-tech-profile-failed", log.Fields{"device-id": f.deviceHandler.device.Id}, err).Log()
+ logger.Errorw(ctx, "delete-tech-profile-failed", log.Fields{"err": err, "device-id": f.deviceHandler.device.Id})
// return err
// We should continue to delete tech-profile instances for other TP IDs
}
- logger.Debugw(ctx, "tech-profile-deleted", log.Fields{"device-id": f.deviceHandler.device.Id, "tp-id": tpID})
+ logger.Debugw(ctx, "tech-profile-instance-deleted", log.Fields{"device-id": f.deviceHandler.device.Id, "uniPortName": uniPortName, "tp-id": tpID})
}
return nil
}
@@ -1526,7 +1401,7 @@
if uniPortName == "" {
uniPortName = getUniPortPath(f.deviceHandler.device.Id, intfID, int32(onuID), int32(uniID))
}
- if err := f.techprofile[intfID].DeleteTechProfileInstance(ctx, tpID, uniPortName); err != nil {
+ if err := f.techprofile.DeleteTechProfileInstance(ctx, tpID, uniPortName); err != nil {
return olterrors.NewErrAdapter("failed-to-delete-tp-instance-from-kv-store",
log.Fields{
"tp-id": tpID,
@@ -1698,13 +1573,7 @@
"device-id": f.deviceHandler.device.Id,
"onu-id": onuID,
"flow-id": flow.Id})
- flowInfo := rsrcMgr.FlowInfo{Flow: &downstreamflow}
- if err := f.resourceMgr.UpdateFlowIDInfo(ctx, networkInterfaceID, int32(onuID), int32(uniID), flow.Id, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", flow.Id,
- log.Fields{
- "flow": downstreamflow,
- "device-id": f.deviceHandler.device.Id}, err)
- }
+
return nil
}
@@ -1781,8 +1650,8 @@
return err
}
- delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: uniID, TpPath: tpPath, GemPortId: gemPortID}
- logger.Debugw(ctx, "sending-gem-port-delete-to-openonu-adapter",
+ delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: uniID, TpInstancePath: tpPath, GemPortId: gemPortID}
+ logger.Infow(ctx, "sending-gem-port-delete-to-openonu-adapter",
log.Fields{
"msg": *delGemPortMsg,
"device-id": f.deviceHandler.device.Id})
@@ -1822,7 +1691,7 @@
return err
}
- delTcontMsg := &ic.InterAdapterDeleteTcontMessage{UniId: uniID, TpPath: tpPath, AllocId: allocID}
+ delTcontMsg := &ic.InterAdapterDeleteTcontMessage{UniId: uniID, TpInstancePath: tpPath, AllocId: allocID}
logger.Debugw(ctx, "sending-tcont-delete-to-openonu-adapter",
log.Fields{
"msg": *delTcontMsg,
@@ -1853,37 +1722,38 @@
// Otherwise stale info continues to exist after gemport is freed and wrong logicalPortNo
// is conveyed to ONOS during packet-in OF message.
func (f *OpenOltFlowMgr) deleteGemPortFromLocalCache(ctx context.Context, intfID uint32, onuID uint32, gemPortID uint32) {
-
- f.onuGemInfoLock.Lock()
- defer f.onuGemInfoLock.Unlock()
-
logger.Infow(ctx, "deleting-gem-from-local-cache",
log.Fields{
"gem-port-id": gemPortID,
"intf-id": intfID,
"onu-id": onuID,
- "device-id": f.deviceHandler.device.Id,
- "onu-gem": f.onuGemInfo})
-
- onugem := f.onuGemInfo
+ "device-id": f.deviceHandler.device.Id})
+ f.onuGemInfoLock.RLock()
+ onugem, ok := f.onuGemInfoMap[onuID]
+ f.onuGemInfoLock.RUnlock()
+ if !ok {
+ logger.Warnw(ctx, "onu gem info already cleared from cache", log.Fields{
+ "gem-port-id": gemPortID,
+ "intf-id": intfID,
+ "onu-id": onuID,
+ "device-id": f.deviceHandler.device.Id})
+ return
+ }
deleteLoop:
- for i, onu := range onugem {
- if onu.OnuID == onuID {
- for j, gem := range onu.GemPorts {
- // If the gemport is found, delete it from local cache.
- if gem == gemPortID {
- onu.GemPorts = append(onu.GemPorts[:j], onu.GemPorts[j+1:]...)
- onugem[i] = onu
- logger.Infow(ctx, "removed-gemport-from-local-cache",
- log.Fields{
- "intf-id": intfID,
- "onu-id": onuID,
- "deletedgemport-id": gemPortID,
- "gemports": onu.GemPorts,
- "device-id": f.deviceHandler.device.Id})
- break deleteLoop
- }
- }
+ for j, gem := range onugem.GemPorts {
+ // If the gemport is found, delete it from local cache.
+ if gem == gemPortID {
+ onugem.GemPorts = append(onugem.GemPorts[:j], onugem.GemPorts[j+1:]...)
+ f.onuGemInfoLock.Lock()
+ f.onuGemInfoMap[onuID] = onugem
+ f.onuGemInfoLock.Unlock()
+ logger.Infow(ctx, "removed-gemport-from-local-cache",
+ log.Fields{
+ "intf-id": intfID,
+ "onu-id": onuID,
+ "deletedgemport-id": gemPortID,
+ "gemports": onugem.GemPorts,
+ "device-id": f.deviceHandler.device.Id})
break deleteLoop
}
}
@@ -1891,7 +1761,7 @@
//clearResources clears pon resources in kv store and the device
// nolint: gocyclo
-func (f *OpenOltFlowMgr) clearResources(ctx context.Context, flow *ofp.OfpFlowStats, intfID uint32, onuID int32, uniID int32,
+func (f *OpenOltFlowMgr) clearResources(ctx context.Context, intfID uint32, onuID int32, uniID int32,
gemPortID int32, flowID uint64, portNum uint32, tpID uint32) error {
uni := getUniPortPath(f.deviceHandler.device.Id, intfID, onuID, uniID)
@@ -1900,27 +1770,22 @@
log.Fields{
"tpPath": tpPath,
"device-id": f.deviceHandler.device.Id})
- techprofileInst, err := f.techprofile[intfID].GetTPInstanceFromKVStore(ctx, tpID, tpPath)
- if err != nil || techprofileInst == nil { // This should not happen, something wrong in KV backend transaction
- return olterrors.NewErrNotFound("tech-profile-in-kv-store",
- log.Fields{
- "tp-id": tpID,
- "path": tpPath}, err)
- }
used := f.isGemPortUsedByAnotherFlow(uint32(gemPortID))
if used {
- f.flowsUsedByGemPortKey.Lock()
- defer f.flowsUsedByGemPortKey.Unlock()
+ f.gemToFlowIDsKey.RLock()
+ flowIDs := f.gemToFlowIDs[uint32(gemPortID)]
+ f.gemToFlowIDsKey.RUnlock()
- flowIDs := f.flowsUsedByGemPort[uint32(gemPortID)]
for i, flowIDinMap := range flowIDs {
if flowIDinMap == flowID {
flowIDs = append(flowIDs[:i], flowIDs[i+1:]...)
- // everytime flowsUsedByGemPort cache is updated the same should be updated
+ f.gemToFlowIDsKey.Lock()
+ f.gemToFlowIDs[uint32(gemPortID)] = flowIDs
+ f.gemToFlowIDsKey.Unlock()
+ // everytime gemToFlowIDs cache is updated the same should be updated
// in kv store by calling UpdateFlowIDsForGem
- f.flowsUsedByGemPort[uint32(gemPortID)] = flowIDs
if err := f.resourceMgr.UpdateFlowIDsForGem(ctx, intfID, uint32(gemPortID), flowIDs); err != nil {
return err
}
@@ -1936,28 +1801,17 @@
return nil
}
logger.Debugf(ctx, "gem-port-id %d is-not-used-by-another-flow--releasing-the-gem-port", gemPortID)
- f.resourceMgr.RemoveGemPortIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID))
- // TODO: The TrafficQueue corresponding to this gem-port also should be removed immediately.
- // But it is anyway eventually removed later when the TechProfile is freed, so not a big issue for now.
- f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(ctx, uint32(gemPortID), intfID)
- // also clear gem to uni cache
- f.removeFromGemToUniMap(gemPortKey{
- intfID: intfID,
- gemPort: uint32(gemPortID),
- })
f.deleteGemPortFromLocalCache(ctx, intfID, uint32(onuID), uint32(gemPortID))
-
- f.onuIdsLock.Lock() // TODO: What is this lock?
-
- //everytime an entry is deleted from flowsUsedByGemPort cache, the same should be updated in kv as well
+ _ = f.resourceMgr.RemoveGemFromOnuGemInfo(ctx, intfID, uint32(onuID), uint32(gemPortID)) // ignore error and proceed.
+ //everytime an entry is deleted from gemToFlowIDs cache, the same should be updated in kv as well
// by calling DeleteFlowIDsForGem
- f.flowsUsedByGemPortKey.Lock()
- delete(f.flowsUsedByGemPort, uint32(gemPortID))
- f.flowsUsedByGemPortKey.Unlock()
- f.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, uint32(gemPortID))
- f.resourceMgr.FreeGemPortID(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID))
+ f.gemToFlowIDsKey.Lock()
+ delete(f.gemToFlowIDs, uint32(gemPortID))
+ f.gemToFlowIDsKey.Unlock()
- f.onuIdsLock.Unlock()
+ f.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, uint32(gemPortID))
+
+ f.resourceMgr.FreeGemPortID(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID))
// Delete the gem port on the ONU.
if err := f.sendDeleteGemPortToChild(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID), tpPath); err != nil {
@@ -1970,8 +1824,15 @@
"device-id": f.deviceHandler.device.Id,
"gemport-id": gemPortID})
}
+ techprofileInst, err := f.techprofile.GetTPInstance(ctx, tpPath)
+ if err != nil || techprofileInst == nil { // This should not happen, something wrong in KV backend transaction
+ return olterrors.NewErrNotFound("tech-profile-in-kv-store",
+ log.Fields{
+ "tp-id": tpID,
+ "path": tpPath}, err)
+ }
switch techprofileInst := techprofileInst.(type) {
- case *tp.TechProfile:
+ case *tp_pb.TechProfileInstance:
ok, _ := f.isTechProfileUsedByAnotherGem(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst, uint32(gemPortID))
if !ok {
if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), tpID); err != nil {
@@ -1987,23 +1848,23 @@
logger.Warn(ctx, err)
}
}
- case *tp.EponProfile:
+ case *tp_pb.EponTechProfileInstance:
if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), tpID); err != nil {
logger.Warn(ctx, err)
}
if err := f.DeleteTechProfileInstance(ctx, intfID, uint32(onuID), uint32(uniID), "", tpID); err != nil {
logger.Warn(ctx, err)
}
- f.resourceMgr.FreeAllocID(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocID)
+ f.resourceMgr.FreeAllocID(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocId)
// Delete the TCONT on the ONU.
- if err := f.sendDeleteTcontToChild(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocID, tpPath); err != nil {
+ if err := f.sendDeleteTcontToChild(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocId, tpPath); err != nil {
logger.Errorw(ctx, "error-processing-delete-tcont-towards-onu",
log.Fields{
"intf": intfID,
"onu-id": onuID,
"uni-id": uniID,
"device-id": f.deviceHandler.device.Id,
- "alloc-id": techprofileInst.AllocID})
+ "alloc-id": techprofileInst.AllocId})
}
default:
logger.Errorw(ctx, "error-unknown-tech",
@@ -2016,7 +1877,6 @@
// nolint: gocyclo
func (f *OpenOltFlowMgr) clearFlowFromDeviceAndResourceManager(ctx context.Context, flow *ofp.OfpFlowStats, flowDirection string) error {
- var flowInfo *rsrcMgr.FlowInfo
logger.Infow(ctx, "clear-flow-from-resource-manager",
log.Fields{
"flowDirection": flowDirection,
@@ -2037,6 +1897,16 @@
onuID := int32(onu)
uniID := int32(uni)
+ tpID, err := getTpIDFromFlow(ctx, flow)
+ if err != nil {
+ return olterrors.NewErrNotFound("tp-id",
+ log.Fields{
+ "flow": flow,
+ "intf-id": Intf,
+ "onu-id": onuID,
+ "uni-id": uniID,
+ "device-id": f.deviceHandler.device.Id}, err)
+ }
for _, field := range flows.GetOfbFields(flow) {
if field.Type == flows.IP_PROTO {
@@ -2060,86 +1930,45 @@
logger.Errorw(ctx, "invalid-in-port-number",
log.Fields{
"port-number": inPort,
- "error": err})
+ "err": err})
return err
}
}
- if flowInfo = f.resourceMgr.GetFlowIDInfo(ctx, Intf, onuID, uniID, flow.Id); flowInfo == nil {
- logger.Errorw(ctx, "flow-info-not-found-for-flow-to-be-removed", log.Fields{"flow-id": flow.Id, "intf-id": Intf, "onu-id": onuID, "uni-id": uniID})
- return olterrors.NewErrPersistence("remove", "flow", flow.Id, log.Fields{"flow": flow}, err)
- }
- removeFlowMessage := openoltpb2.Flow{FlowId: flowInfo.Flow.FlowId, FlowType: flowInfo.Flow.FlowType}
- logger.Debugw(ctx, "flow-to-be-deleted", log.Fields{"flow": flowInfo.Flow})
+
+ removeFlowMessage := openoltpb2.Flow{FlowId: flow.Id, AccessIntfId: int32(Intf), OnuId: onuID, UniId: uniID, TechProfileId: tpID, FlowType: flowDirection}
+ logger.Debugw(ctx, "flow-to-be-deleted", log.Fields{"flow": flow})
if err = f.removeFlowFromDevice(ctx, &removeFlowMessage, flow.Id); err != nil {
return err
}
- if err = f.resourceMgr.RemoveFlowIDInfo(ctx, Intf, onuID, uniID, flow.Id); err != nil {
- logger.Errorw(ctx, "failed-to-remove-flow-on-kv-store", log.Fields{"error": err})
- return err
- }
- tpID, err := getTpIDFromFlow(ctx, flow)
- if err != nil {
- return olterrors.NewErrNotFound("tp-id",
- log.Fields{
- "flow": flow,
- "intf-id": Intf,
- "onu-id": onuID,
- "uni-id": uniID,
- "device-id": f.deviceHandler.device.Id}, err)
- }
- if !flowInfo.Flow.ReplicateFlow {
- if err = f.clearResources(ctx, flow, Intf, onuID, uniID, flowInfo.Flow.GemportId, flowInfo.Flow.FlowId, portNum, tpID); err != nil {
+ f.flowIDToGemsLock.Lock()
+ gems, ok := f.flowIDToGems[flow.Id]
+ if !ok {
+ logger.Errorw(ctx, "flow-id-to-gem-map-not-found", log.Fields{"flowID": flow.Id})
+ f.flowIDToGemsLock.Unlock()
+ return olterrors.NewErrNotFound("flow-id-to-gem-map-not-found", log.Fields{"flowID": flow.Id}, nil)
+ }
+ copyOfGems := make([]uint32, len(gems))
+ _ = copy(copyOfGems, gems)
+ // Delete the flow-id to gemport list entry from the map now the flow is deleted.
+ delete(f.flowIDToGems, flow.Id)
+ f.flowIDToGemsLock.Unlock()
+
+ logger.Debugw(ctx, "gems-to-be-cleared", log.Fields{"gems": copyOfGems})
+ for _, gem := range copyOfGems {
+ if err = f.clearResources(ctx, Intf, onuID, uniID, int32(gem), flow.Id, portNum, tpID); err != nil {
logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
- "flow-id": flow.Id,
- "stored-flow": flowInfo.Flow,
- "device-id": f.deviceHandler.device.Id,
- "stored-flow-id": flowInfo.Flow.FlowId,
- "onu-id": onuID,
- "intf": Intf,
- "err": err,
+ "flow-id": flow.Id,
+ "device-id": f.deviceHandler.device.Id,
+ "onu-id": onuID,
+ "intf": Intf,
+ "gem": gem,
+ "err": err,
})
return err
}
- } else {
- gems := make([]uint32, 0)
- for _, gem := range flowInfo.Flow.PbitToGemport {
- gems = appendUnique32bit(gems, gem)
- }
- logger.Debugw(ctx, "gems-to-be-cleared", log.Fields{"gems": gems})
- for _, gem := range gems {
- if err = f.clearResources(ctx, flow, Intf, onuID, uniID, int32(gem), flowInfo.Flow.FlowId, portNum, tpID); err != nil {
- logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
- "flow-id": flow.Id,
- "stored-flow": flowInfo.Flow,
- "device-id": f.deviceHandler.device.Id,
- "stored-flow-id": flowInfo.Flow.FlowId,
- "onu-id": onuID,
- "intf": Intf,
- "gem": gem,
- "err": err,
- })
- return err
- }
- }
}
- // If datapath flow, clear the symmetric flow data from the subscriberDataPathFlowIDMap map
- if isDatapathFlow(flow) {
- if tpID, err := getTpIDFromFlow(ctx, flow); err != nil {
- var inverseDirection string
- if flowDirection == Upstream {
- inverseDirection = Downstream
- } else {
- inverseDirection = Upstream
- }
-
- keySymm := subscriberDataPathFlowIDKey{intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), direction: inverseDirection, tpID: tpID}
- f.subscriberDataPathFlowIDMapLock.Lock()
- delete(f.subscriberDataPathFlowIDMap, keySymm)
- f.subscriberDataPathFlowIDMapLock.Unlock()
- }
- }
// Decrement reference count for the meter associated with the given <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
if err := f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, flowDirection, Intf, uint32(onuID), uint32(uniID), tpID, false); err != nil {
return err
@@ -2203,7 +2032,8 @@
// Step2 : Push the flowControlBlock to ONU channel
// Step3 : Wait on response channel for response
// Step4 : Return error value
- logger.Debugw(ctx, "process-flow", log.Fields{"flow": flow, "addFlow": addFlow})
+ startTime := time.Now()
+ logger.Infow(ctx, "process-flow", log.Fields{"flow": flow, "addFlow": addFlow})
errChan := make(chan error)
flowCb := flowControlBlock{
ctx: ctx,
@@ -2223,7 +2053,7 @@
f.incomingFlows[onuID] <- flowCb
// Wait on the channel for flow handlers return value
err := <-errChan
- logger.Debugw(ctx, "process-flow--received-resp", log.Fields{"flow": flow, "addFlow": addFlow, "err": err})
+ logger.Infow(ctx, "process-flow--received-resp", log.Fields{"err": err, "totalTimeSeconds": time.Since(startTime).Seconds()})
return err
}
@@ -2235,17 +2065,17 @@
// process the flow completely before proceeding to handle the next flow
flowCb := <-subscriberFlowChannel
if flowCb.addFlow {
- logger.Debugw(flowCb.ctx, "adding-flow",
- log.Fields{"device-id": f.deviceHandler.device.Id,
- "flowToAdd": flowCb.flow})
+ logger.Info(flowCb.ctx, "adding-flow-start")
+ startTime := time.Now()
err := f.AddFlow(flowCb.ctx, flowCb.flow, flowCb.flowMetadata)
+ logger.Infow(flowCb.ctx, "adding-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
// Pass the return value over the return channel
*flowCb.errChan <- err
} else {
- logger.Debugw(flowCb.ctx, "removing-flow",
- log.Fields{"device-id": f.deviceHandler.device.Id,
- "flowToRemove": flowCb.flow})
+ logger.Info(flowCb.ctx, "removing-flow-start")
+ startTime := time.Now()
err := f.RemoveFlow(flowCb.ctx, flowCb.flow)
+ logger.Infow(flowCb.ctx, "removing-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
// Pass the return value over the return channel
*flowCb.errChan <- err
}
@@ -2393,14 +2223,10 @@
}
//cached group can be removed now
if err := f.resourceMgr.RemoveFlowGroupFromKVStore(ctx, groupID, true); err != nil {
- logger.Warnw(ctx, "failed-to-remove-flow-group", log.Fields{"group-id": groupID, "error": err})
+ logger.Warnw(ctx, "failed-to-remove-flow-group", log.Fields{"group-id": groupID, "err": err})
}
}
- flowInfo := rsrcMgr.FlowInfo{Flow: &multicastFlow}
- if err = f.resourceMgr.UpdateFlowIDInfo(ctx, networkInterfaceID, int32(onuID), int32(uniID), flow.Id, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", flow.Id, log.Fields{"flow": multicastFlow}, err)
- }
return nil
}
@@ -2413,16 +2239,13 @@
}
return nniInterfaceID, nil
}
- // find the first NNI interface id of the device
- nniPorts, e := f.resourceMgr.GetNNIFromKVStore(ctx)
- if e == nil && len(nniPorts) > 0 {
- return nniPorts[0], nil
- }
- return 0, olterrors.NewErrNotFound("nni-port", nil, e).Log()
+
+ // TODO: For now we support only one NNI port in VOLTHA. We shall use only the first NNI port, i.e., interface-id 0.
+ return 0, nil
}
//sendTPDownloadMsgToChild send payload
-func (f *OpenOltFlowMgr) sendTPDownloadMsgToChild(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, uni string, TpID uint32) error {
+func (f *OpenOltFlowMgr) sendTPDownloadMsgToChild(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, uni string, TpID uint32, tpInst tp_pb.TechProfileInstance) error {
onuDev, err := f.getOnuDevice(ctx, intfID, onuID)
if err != nil {
@@ -2436,7 +2259,11 @@
logger.Debugw(ctx, "got-child-device-from-olt-device-handler", log.Fields{"onu-id": onuDev.deviceID})
tpPath := f.getTPpath(ctx, intfID, uni, TpID)
- tpDownloadMsg := &ic.InterAdapterTechProfileDownloadMessage{UniId: uniID, Path: tpPath}
+ tpDownloadMsg := &ic.InterAdapterTechProfileDownloadMessage{
+ UniId: uniID,
+ TpInstancePath: tpPath,
+ TechTpInstance: &ic.InterAdapterTechProfileDownloadMessage_TpInstance{TpInstance: &tpInst},
+ }
logger.Debugw(ctx, "sending-load-tech-profile-request-to-brcm-onu-adapter", log.Fields{"tpDownloadMsg": *tpDownloadMsg})
sendErr := f.deviceHandler.AdapterProxy.SendInterAdapterMessage(log.WithSpanFromContext(context.Background(), ctx),
tpDownloadMsg,
@@ -2458,24 +2285,25 @@
}
//UpdateOnuInfo function adds onu info to cache and kvstore
+//UpdateOnuInfo function adds onu info to cache and kvstore
func (f *OpenOltFlowMgr) UpdateOnuInfo(ctx context.Context, intfID uint32, onuID uint32, serialNum string) error {
- f.onuGemInfoLock.Lock()
- defer f.onuGemInfoLock.Unlock()
- onugem := f.onuGemInfo
+ f.onuGemInfoLock.RLock()
+ _, ok := f.onuGemInfoMap[onuID]
+ f.onuGemInfoLock.RUnlock()
// If the ONU already exists in onuGemInfo list, nothing to do
- for _, onu := range onugem {
- if onu.OnuID == onuID && onu.SerialNumber == serialNum {
- logger.Debugw(ctx, "onu-id-already-exists-in-cache",
- log.Fields{"onuID": onuID,
- "serialNum": serialNum})
- return nil
- }
+ if ok {
+ logger.Debugw(ctx, "onu-id-already-exists-in-cache",
+ log.Fields{"onuID": onuID,
+ "serialNum": serialNum})
+ return nil
}
- onu := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
- f.onuGemInfo = append(f.onuGemInfo, onu)
- if err := f.resourceMgr.AddOnuGemInfo(ctx, intfID, onu); err != nil {
+ onuGemInfo := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
+ f.onuGemInfoLock.Lock()
+ f.onuGemInfoMap[onuID] = &onuGemInfo
+ f.onuGemInfoLock.Unlock()
+ if err := f.resourceMgr.AddOnuGemInfo(ctx, intfID, onuID, onuGemInfo); err != nil {
return err
}
logger.Infow(ctx, "updated-onuinfo",
@@ -2483,7 +2311,7 @@
"intf-id": intfID,
"onu-id": onuID,
"serial-num": serialNum,
- "onu": onu,
+ "onu": onuGemInfo,
"device-id": f.deviceHandler.device.Id})
return nil
}
@@ -2491,34 +2319,46 @@
//addGemPortToOnuInfoMap function adds GEMport to ONU map
func (f *OpenOltFlowMgr) addGemPortToOnuInfoMap(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) {
- f.onuGemInfoLock.Lock()
- defer f.onuGemInfoLock.Unlock()
-
logger.Infow(ctx, "adding-gem-to-onu-info-map",
log.Fields{
"gem-port-id": gemPort,
"intf-id": intfID,
"onu-id": onuID,
- "device-id": f.deviceHandler.device.Id,
- "onu-gem": f.onuGemInfo})
- onugem := f.onuGemInfo
- // update the gem to the local cache as well as to kv strore
- for idx, onu := range onugem {
- if onu.OnuID == onuID {
- // check if gem already exists , else update the cache and kvstore
- for _, gem := range onu.GemPorts {
- if gem == gemPort {
- logger.Debugw(ctx, "gem-already-in-cache-no-need-to-update-cache-and-kv-store",
- log.Fields{
- "gem": gemPort,
- "device-id": f.deviceHandler.device.Id})
- return
- }
+ "device-id": f.deviceHandler.device.Id})
+ f.onuGemInfoLock.RLock()
+ onugem, ok := f.onuGemInfoMap[onuID]
+ f.onuGemInfoLock.RUnlock()
+ if !ok {
+ logger.Warnw(ctx, "onu gem info is missing", log.Fields{
+ "gem-port-id": gemPort,
+ "intf-id": intfID,
+ "onu-id": onuID,
+ "device-id": f.deviceHandler.device.Id})
+ return
+ }
+
+ if onugem.OnuID == onuID {
+ // check if gem already exists , else update the cache and kvstore
+ for _, gem := range onugem.GemPorts {
+ if gem == gemPort {
+ logger.Debugw(ctx, "gem-already-in-cache-no-need-to-update-cache-and-kv-store",
+ log.Fields{
+ "gem": gemPort,
+ "device-id": f.deviceHandler.device.Id})
+ return
}
- onugem[idx].GemPorts = append(onugem[idx].GemPorts, gemPort)
- f.onuGemInfo = onugem
- break
}
+ onugem.GemPorts = append(onugem.GemPorts, gemPort)
+ f.onuGemInfoLock.Lock()
+ f.onuGemInfoMap[onuID] = onugem
+ f.onuGemInfoLock.Unlock()
+ } else {
+ logger.Warnw(ctx, "mismatched onu id", log.Fields{
+ "gem-port-id": gemPort,
+ "intf-id": intfID,
+ "onu-id": onuID,
+ "device-id": f.deviceHandler.device.Id})
+ return
}
err := f.resourceMgr.AddGemToOnuGemInfo(ctx, intfID, onuID, gemPort)
if err != nil {
@@ -2535,24 +2375,18 @@
"gem-port-id": gemPort,
"intf-id": intfID,
"onu-id": onuID,
- "device-id": f.deviceHandler.device.Id,
- "onu-gem": f.onuGemInfo})
+ "device-id": f.deviceHandler.device.Id})
}
//GetLogicalPortFromPacketIn function computes logical port UNI/NNI port from packet-in indication and returns the same
func (f *OpenOltFlowMgr) GetLogicalPortFromPacketIn(ctx context.Context, packetIn *openoltpb2.PacketIndication) (uint32, error) {
var logicalPortNum uint32
- var onuID, uniID uint32
- var err error
if packetIn.IntfType == "pon" {
// packet indication does not have serial number , so sending as nil
// get onu and uni ids associated with the given pon and gem ports
- if onuID, uniID, err = f.GetUniPortByPonPortGemPort(ctx, packetIn.IntfId, packetIn.GemportId); err != nil {
- // Called method is returning error with all data populated; just return the same
- return logicalPortNum, err
- }
- logger.Debugf(ctx, "retrieved ONU and UNI IDs [%d, %d] by interface:%d, gem:%d")
+ onuID, uniID := packetIn.OnuId, packetIn.UniId
+ logger.Debugf(ctx, "retrieved ONU and UNI IDs [%d, %d] by interface:%d, gem:%d", packetIn.OnuId, packetIn.UniId, packetIn.GemportId)
if packetIn.PortNo != 0 {
logicalPortNum = packetIn.PortNo
@@ -2576,40 +2410,6 @@
return logicalPortNum, nil
}
-//GetUniPortByPonPortGemPort return onu and uni IDs associated with given pon and gem ports
-func (f *OpenOltFlowMgr) GetUniPortByPonPortGemPort(ctx context.Context, intfID uint32, gemPortID uint32) (uint32, uint32, error) {
- key := gemPortKey{
- intfID: intfID,
- gemPort: gemPortID,
- }
- uniPortInfo, ok := f.fromGemToUniMap(key) //try to get from the cache first
- if ok {
- if len(uniPortInfo) > 1 {
- //return onu ID and uni port from the cache
- logger.Debugw(ctx, "found-uni-port-by-pon-and-gem-ports",
- log.Fields{
- "intfID": intfID,
- "gemPortID": gemPortID,
- "onuID, uniID": uniPortInfo})
- return uniPortInfo[0], uniPortInfo[1], nil
- }
- }
- //If uni port is not found in cache try to get it from kv store. if it is found in kv store, update the cache and return.
- onuID, uniID, err := f.resourceMgr.GetUniPortByPonPortGemPortFromKVStore(ctx, intfID, gemPortID)
- if err == nil {
- f.toGemToUniMap(ctx, key, onuID, uniID)
- logger.Infow(ctx, "found-uni-port-by-pon-and-gem-port-from-kv-store-and-updating-cache-with-uni-port",
- log.Fields{
- "gemPortKey": key,
- "onuID": onuID,
- "uniID": uniID})
- return onuID, uniID, nil
- }
- return uint32(0), uint32(0), olterrors.NewErrNotFound("uni-id",
- log.Fields{"interfaceID": intfID, "gemPortID": gemPortID},
- errors.New("no uni port found"))
-}
-
//GetPacketOutGemPortID returns gemPortId
func (f *OpenOltFlowMgr) GetPacketOutGemPortID(ctx context.Context, intfID uint32, onuID uint32, portNum uint32, packet []byte) (uint32, error) {
var gemPortID uint32
@@ -2721,10 +2521,6 @@
return olterrors.NewErrFlowOp("add", logicalFlow.Id, log.Fields{"flow": downstreamflow}, err)
}
logger.Info(ctx, "trap-on-nni-flow-added–to-device-successfully")
- flowInfo := rsrcMgr.FlowInfo{Flow: &downstreamflow}
- if err := f.resourceMgr.UpdateFlowIDInfo(ctx, networkInterfaceID, int32(onuID), int32(uniID), logicalFlow.Id, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", logicalFlow.Id, log.Fields{"flow": downstreamflow}, err)
- }
return nil
}
@@ -2814,10 +2610,7 @@
return olterrors.NewErrFlowOp("add", logicalFlow.Id, log.Fields{"flow": downstreamflow}, err)
}
logger.Info(ctx, "igmp-trap-on-nni-flow-added-to-device-successfully")
- flowInfo := rsrcMgr.FlowInfo{Flow: &downstreamflow}
- if err := f.resourceMgr.UpdateFlowIDInfo(ctx, networkInterfaceID, int32(onuID), int32(uniID), logicalFlow.Id, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", logicalFlow.Id, log.Fields{"flow": downstreamflow}, err)
- }
+
return nil
}
@@ -2846,10 +2639,10 @@
pbitToGem := make(map[uint32]uint32)
gemToAes := make(map[uint32]bool)
- var attributes []tp.IGemPortAttribute
+ var attributes []*tp_pb.GemPortAttributes
var direction = tp_pb.Direction_UPSTREAM
switch TpInst := TpInst.(type) {
- case *tp.TechProfile:
+ case *tp_pb.TechProfileInstance:
if IsUpstream(actionInfo[Output].(uint32)) {
attributes = TpInst.UpstreamGemPortAttributeList
} else {
@@ -2881,9 +2674,9 @@
}
}
} else { // Extract the exact gemport which maps to the PCP classifier in the flow
- if gem := f.techprofile[intfID].GetGemportForPbit(ctx, TpInst, direction, pcp.(uint32)); gem != nil {
- gemPortID = gem.(tp.IGemPortAttribute).GemportID
- gemToAes[gemPortID], _ = strconv.ParseBool(gem.(tp.IGemPortAttribute).AesEncryption)
+ if gem := f.techprofile.GetGemportForPbit(ctx, TpInst, direction, pcp.(uint32)); gem != nil {
+ gemPortID = gem.(*tp_pb.GemPortAttributes).GemportId
+ gemToAes[gemPortID], _ = strconv.ParseBool(gem.(*tp_pb.GemPortAttributes).AesEncryption)
}
}
@@ -2981,26 +2774,26 @@
}
// Send Techprofile download event to child device in go routine as it takes time
go func() {
- if err := f.sendTPDownloadMsgToChild(ctx, intfID, onuID, uniID, uni, tpID); err != nil {
+ if err := f.sendTPDownloadMsgToChild(ctx, intfID, onuID, uniID, uni, tpID, *(TpInst.(*tp_pb.TechProfileInstance))); err != nil {
logger.Warn(ctx, err)
}
}()
}
func (f *OpenOltFlowMgr) isGemPortUsedByAnotherFlow(gemPortID uint32) bool {
- f.flowsUsedByGemPortKey.RLock()
- flowIDList := f.flowsUsedByGemPort[gemPortID]
- f.flowsUsedByGemPortKey.RUnlock()
+ f.gemToFlowIDsKey.RLock()
+ flowIDList := f.gemToFlowIDs[gemPortID]
+ f.gemToFlowIDsKey.RUnlock()
return len(flowIDList) > 1
}
-func (f *OpenOltFlowMgr) isTechProfileUsedByAnotherGem(ctx context.Context, ponIntf uint32, onuID uint32, uniID uint32, tpInst *tp.TechProfile, gemPortID uint32) (bool, uint32) {
+func (f *OpenOltFlowMgr) isTechProfileUsedByAnotherGem(ctx context.Context, ponIntf uint32, onuID uint32, uniID uint32, tpInst *tp_pb.TechProfileInstance, gemPortID uint32) (bool, uint32) {
currentGemPorts := f.resourceMgr.GetCurrentGEMPortIDsForOnu(ctx, ponIntf, onuID, uniID)
tpGemPorts := tpInst.UpstreamGemPortAttributeList
for _, currentGemPort := range currentGemPorts {
for _, tpGemPort := range tpGemPorts {
- if (currentGemPort == tpGemPort.GemportID) && (currentGemPort != gemPortID) {
+ if (currentGemPort == tpGemPort.GemportId) && (currentGemPort != gemPortID) {
return true, currentGemPort
}
}
@@ -3010,21 +2803,21 @@
}
func (f *OpenOltFlowMgr) isAllocUsedByAnotherUNI(ctx context.Context, sq schedQueue) bool {
- tpInst := sq.tpInst.(*tp.TechProfile)
- if tpInst.InstanceCtrl.Onu == "single-instance" && sq.direction == tp_pb.Direction_UPSTREAM {
- tpInstances := f.techprofile[sq.intfID].FindAllTpInstances(ctx, f.deviceHandler.device.Id, sq.tpID, sq.intfID, sq.onuID).([]tp.TechProfile)
+ tpInst := sq.tpInst.(*tp_pb.TechProfileInstance)
+ if tpInst.InstanceControl.Onu == "single-instance" && sq.direction == tp_pb.Direction_UPSTREAM {
+ tpInstances := f.techprofile.FindAllTpInstances(ctx, f.deviceHandler.device.Id, sq.tpID, sq.intfID, sq.onuID).([]tp_pb.TechProfileInstance)
logger.Debugw(ctx, "got-single-instance-tp-instances", log.Fields{"tp-instances": tpInstances})
for i := 0; i < len(tpInstances); i++ {
tpI := tpInstances[i]
if tpI.SubscriberIdentifier != tpInst.SubscriberIdentifier &&
- tpI.UsScheduler.AllocID == tpInst.UsScheduler.AllocID {
+ tpI.UsScheduler.AllocId == tpInst.UsScheduler.AllocId {
logger.Debugw(ctx, "alloc-is-in-use",
log.Fields{
"device-id": f.deviceHandler.device.Id,
"intfID": sq.intfID,
"onuID": sq.onuID,
"uniID": sq.uniID,
- "allocID": tpI.UsScheduler.AllocID,
+ "allocID": tpI.UsScheduler.AllocId,
})
return true
}
@@ -3250,7 +3043,7 @@
logger.Debugw(ctx, "invalid-action-port-number",
log.Fields{
"port-number": action[Output].(uint32),
- "error": err})
+ "err": err})
return uint32(0), err
}
logger.Infow(ctx, "output-nni-intfId-is", log.Fields{"intf-id": intfID})
@@ -3261,7 +3054,7 @@
logger.Debugw(ctx, "invalid-classifier-port-number",
log.Fields{
"port-number": action[Output].(uint32),
- "error": err})
+ "err": err})
return uint32(0), err
}
logger.Infow(ctx, "input-nni-intfId-is", log.Fields{"intf-id": intfID})
@@ -3331,69 +3124,39 @@
return 0, 0, nil
}
-// AddUniPortToOnuInfo adds uni port to the onugem info both in cache and kvstore.
-func (f *OpenOltFlowMgr) AddUniPortToOnuInfo(ctx context.Context, intfID uint32, onuID uint32, portNum uint32) {
-
- f.onuGemInfoLock.Lock()
- defer f.onuGemInfoLock.Unlock()
-
- onugem := f.onuGemInfo
- for idx, onu := range onugem {
- if onu.OnuID == onuID {
- for _, uni := range onu.UniPorts {
- if uni == portNum {
- logger.Infow(ctx, "uni-already-in-cache--no-need-to-update-cache-and-kv-store", log.Fields{"uni": portNum})
- return
+func (f *OpenOltFlowMgr) loadFlowIDsForGemAndGemIDsForFlow(ctx context.Context) {
+ logger.Debug(ctx, "loadFlowIDsForGemAndGemIDsForFlow - start")
+ f.onuGemInfoLock.RLock()
+ f.gemToFlowIDsKey.Lock()
+ f.flowIDToGemsLock.Lock()
+ for _, og := range f.onuGemInfoMap {
+ for _, gem := range og.GemPorts {
+ flowIDs, err := f.resourceMgr.GetFlowIDsForGem(ctx, f.ponPortIdx, gem)
+ if err != nil {
+ f.gemToFlowIDs[gem] = flowIDs
+ for _, flowID := range flowIDs {
+ if _, ok := f.flowIDToGems[flowID]; !ok {
+ f.flowIDToGems[flowID] = []uint32{gem}
+ } else {
+ f.flowIDToGems[flowID] = appendUnique32bit(f.flowIDToGems[flowID], gem)
+ }
}
}
- onugem[idx].UniPorts = append(onugem[idx].UniPorts, portNum)
- f.onuGemInfo = onugem
}
}
- f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNum)
-
-}
-
-func (f *OpenOltFlowMgr) loadFlowIDlistForGem(ctx context.Context, intf uint32) {
- flowIDsList, err := f.resourceMgr.GetFlowIDsGemMapForInterface(ctx, intf)
- if err != nil {
- logger.Error(ctx, "failed-to-get-flowid-list-per-gem", log.Fields{"intf": intf})
- return
- }
- f.flowsUsedByGemPortKey.Lock()
- for gem, FlowIDs := range flowIDsList {
- f.flowsUsedByGemPort[gem] = FlowIDs
- }
- f.flowsUsedByGemPortKey.Unlock()
+ f.flowIDToGemsLock.Unlock()
+ f.gemToFlowIDsKey.Unlock()
+ f.onuGemInfoLock.RUnlock()
+ logger.Debug(ctx, "loadFlowIDsForGemAndGemIDsForFlow - end")
}
//clearMulticastFlowFromResourceManager removes a multicast flow from the KV store and
// clears resources reserved for this multicast flow
func (f *OpenOltFlowMgr) clearMulticastFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats) error {
- classifierInfo := make(map[string]interface{})
- var flowInfo *rsrcMgr.FlowInfo
- formulateClassifierInfoFromFlow(ctx, classifierInfo, flow)
- networkInterfaceID, err := f.getNNIInterfaceIDOfMulticastFlow(ctx, classifierInfo)
-
- if err != nil {
- logger.Warnw(ctx, "no-inport-found--cannot-release-resources-of-the-multicast-flow", log.Fields{"flowId:": flow.Id})
- return err
- }
-
- var onuID = int32(NoneOnuID)
- var uniID = int32(NoneUniID)
- if flowInfo = f.resourceMgr.GetFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flow.Id); flowInfo == nil {
- return olterrors.NewErrPersistence("remove", "flow", flow.Id,
- log.Fields{
- "flow": flow,
- "device-id": f.deviceHandler.device.Id,
- "intf-id": networkInterfaceID,
- "onu-id": onuID}, err).Log()
- }
- removeFlowMessage := openoltpb2.Flow{FlowId: flow.Id, FlowType: flowInfo.Flow.FlowType}
+ removeFlowMessage := openoltpb2.Flow{FlowId: flow.Id, FlowType: Multicast}
logger.Debugw(ctx, "multicast-flow-to-be-deleted",
log.Fields{
- "flow": flowInfo.Flow,
+ "flow": flow,
"flow-id": flow.Id,
"device-id": f.deviceHandler.device.Id})
// Remove from device
@@ -3402,60 +3165,44 @@
logger.Errorw(ctx, "failed-to-remove-multicast-flow",
log.Fields{
"flow-id": flow.Id,
- "error": err})
+ "err": err})
return err
}
- // Remove flow from KV store
- return f.resourceMgr.RemoveFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flow.Id)
+
+ return nil
}
-// reconcileSubscriberDataPathFlowIDMap reconciles subscriberDataPathFlowIDMap from KV store
-func (f *OpenOltFlowMgr) reconcileSubscriberDataPathFlowIDMap(ctx context.Context) {
- onuGemInfo, err := f.resourceMgr.GetOnuGemInfo(ctx, f.ponPortIdx)
+func (f *OpenOltFlowMgr) getTechProfileDownloadMessage(ctx context.Context, tpPath string, ponID uint32, onuID uint32, uniID uint32) *ic.InterAdapterTechProfileDownloadMessage {
+ tpInst, err := f.techprofile.GetTPInstance(ctx, tpPath)
if err != nil {
- _ = olterrors.NewErrNotFound("onu", log.Fields{
- "pon-port": f.ponPortIdx}, err).Log()
- return
+ logger.Errorw(ctx, "error-fetching-tp-instance", log.Fields{"tpPath": tpPath})
+ return nil
}
- f.subscriberDataPathFlowIDMapLock.Lock()
- defer f.subscriberDataPathFlowIDMapLock.Unlock()
-
- for _, onu := range onuGemInfo {
- for _, uniID := range onu.UniPorts {
- flowIDs, err := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, onu.IntfID, int32(onu.OnuID), int32(uniID))
- if err != nil {
- logger.Fatalf(ctx, "failed-to-read-flow-ids-of-onu-during-reconciliation")
- }
- for _, flowID := range flowIDs {
- flowInfo := f.resourceMgr.GetFlowIDInfo(ctx, onu.IntfID, int32(onu.OnuID), int32(uniID), flowID)
- if flowInfo == nil {
- // Error is already logged in the called function
- continue
- }
- if flowInfo.Flow.Classifier.PktTagType == DoubleTag &&
- flowInfo.Flow.FlowType == Downstream &&
- flowInfo.Flow.Classifier.OVid > 0 &&
- flowInfo.Flow.TechProfileId > 0 {
- key := subscriberDataPathFlowIDKey{intfID: onu.IntfID, onuID: onu.OnuID, uniID: uniID, direction: flowInfo.Flow.FlowType, tpID: flowInfo.Flow.TechProfileId}
- if _, ok := f.subscriberDataPathFlowIDMap[key]; !ok {
- f.subscriberDataPathFlowIDMap[key] = flowInfo.Flow.FlowId
- }
- } else if flowInfo.Flow.Classifier.PktTagType == SingleTag &&
- flowInfo.Flow.FlowType == Upstream &&
- flowInfo.Flow.Action.OVid > 0 &&
- flowInfo.Flow.TechProfileId > 0 {
- key := subscriberDataPathFlowIDKey{intfID: onu.IntfID, onuID: onu.OnuID, uniID: uniID, direction: flowInfo.Flow.FlowType, tpID: flowInfo.Flow.TechProfileId}
- if _, ok := f.subscriberDataPathFlowIDMap[key]; !ok {
- f.subscriberDataPathFlowIDMap[key] = flowInfo.Flow.FlowId
- }
- }
- }
+ switch tpInst := tpInst.(type) {
+ case *tp_pb.TechProfileInstance:
+ logger.Debugw(ctx, "fetched-tp-instance-successfully--formulating-tp-download-msg", log.Fields{"tpPath": tpPath})
+ return &ic.InterAdapterTechProfileDownloadMessage{UniId: uniID,
+ TpInstancePath: tpPath,
+ TechTpInstance: &ic.InterAdapterTechProfileDownloadMessage_TpInstance{TpInstance: tpInst},
}
+ case *openoltpb2.EponTechProfileInstance:
+ return &ic.InterAdapterTechProfileDownloadMessage{UniId: uniID,
+ TpInstancePath: tpPath,
+ TechTpInstance: &ic.InterAdapterTechProfileDownloadMessage_EponTpInstance{EponTpInstance: tpInst},
+ }
+ default:
+ logger.Errorw(ctx, "unknown-tech", log.Fields{"tpPath": tpPath})
}
+ return nil
}
-// isDatapathFlow declares a flow as datapath flow if it is not a controller bound flow and the flow does not have group
-func isDatapathFlow(flow *ofp.OfpFlowStats) bool {
- return !IsControllerBoundFlow(flows.GetOutPort(flow)) && !flows.HasGroup(flow)
+func (f *OpenOltFlowMgr) getOnuGemInfoList() []rsrcMgr.OnuGemInfo {
+ var onuGemInfoLst []rsrcMgr.OnuGemInfo
+ f.onuGemInfoLock.RLock()
+ defer f.onuGemInfoLock.RUnlock()
+ for _, v := range f.onuGemInfoMap {
+ onuGemInfoLst = append(onuGemInfoLst, *v)
+ }
+ return onuGemInfoLst
}