VOL-2180: Code changes to add context.Context propagation
Integrate the InterContainerProxy interface changes
Change-Id: Ia20c5ac3093b7845acf80cce801ec0c1d90c125f
diff --git a/adaptercore/openolt_flowmgr.go b/adaptercore/openolt_flowmgr.go
index e882e18..892663f 100644
--- a/adaptercore/openolt_flowmgr.go
+++ b/adaptercore/openolt_flowmgr.go
@@ -221,7 +221,7 @@
}
//NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
-func NewFlowManager(dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr) *OpenOltFlowMgr {
+func NewFlowManager(ctx context.Context, dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr) *OpenOltFlowMgr {
log.Info("Initializing flow manager")
var flowMgr OpenOltFlowMgr
var err error
@@ -241,18 +241,18 @@
ponPorts := rMgr.DevInfo.GetPonPorts()
//Load the onugem info cache from kv store on flowmanager start
for idx = 0; idx < ponPorts; idx++ {
- if flowMgr.onuGemInfo[idx], err = rMgr.GetOnuGemInfo(idx); err != nil {
+ if flowMgr.onuGemInfo[idx], err = rMgr.GetOnuGemInfo(ctx, idx); err != nil {
log.Error("Failed to load onu gem info cache")
}
//Load flowID list per gem map per interface from the kvstore.
- flowMgr.loadFlowIDlistForGem(idx)
+ flowMgr.loadFlowIDlistForGem(ctx, idx)
}
flowMgr.lockCache = sync.RWMutex{}
flowMgr.pendingFlowDelete = sync.Map{}
flowMgr.perUserFlowHandleLock = mapmutex.NewMapMutex()
flowMgr.interfaceToMcastQueueMap = make(map[uint32]*queueInfoBrief)
//load interface to multicast queue map from kv store
- flowMgr.loadInterfaceToMulticastQueueMap()
+ flowMgr.loadInterfaceToMulticastQueueMap(ctx)
log.Info("Initialization of flow manager success!!")
return &flowMgr
}
@@ -273,7 +273,7 @@
}
}
-func (f *OpenOltFlowMgr) registerFlow(flowFromCore *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) {
+func (f *OpenOltFlowMgr) registerFlow(ctx context.Context, flowFromCore *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) {
log.Debug("Registering Flow for Device ", log.Fields{"flow": flowFromCore},
log.Fields{"device": f.deviceHandler.deviceID})
gemPK := gemPortKey{uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId)}
@@ -284,10 +284,10 @@
flowIDList = appendUnique(flowIDList, deviceFlow.FlowId)
f.flowsUsedByGemPort[gemPK] = flowIDList
// update the flowids for a gem to the KVstore
- f.resourceMgr.UpdateFlowIDsForGem(uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowIDList)
+ f.resourceMgr.UpdateFlowIDsForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowIDList)
}
-func (f *OpenOltFlowMgr) divideAndAddFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32,
+func (f *OpenOltFlowMgr) divideAndAddFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
classifierInfo map[string]interface{}, actionInfo map[string]interface{}, flow *ofp.OfpFlowStats, TpID uint32,
UsMeterID uint32, DsMeterID uint32, flowMetadata *voltha.FlowMetadata) {
var allocID uint32
@@ -309,7 +309,7 @@
tpLockMapKey := tpLockKey{intfID, onuID, uniID}
if f.perUserFlowHandleLock.TryLock(tpLockMapKey) {
- allocID, gemPorts, TpInst = f.createTcontGemports(intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
+ allocID, gemPorts, TpInst = f.createTcontGemports(ctx, intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
if allocID == 0 || gemPorts == nil || TpInst == nil {
log.Error("alloc-id-gem-ports-tp-unavailable")
f.perUserFlowHandleLock.Unlock(tpLockMapKey)
@@ -325,7 +325,7 @@
/* Flows can be added specific to gemport if p-bits are received.
* If no pbit mentioned then adding flows for all gemports
*/
- f.checkAndAddFlow(args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
+ f.checkAndAddFlow(ctx, args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
f.perUserFlowHandleLock.Unlock(tpLockMapKey)
} else {
log.Errorw("failed to acquire per user flow handle lock",
@@ -335,7 +335,7 @@
}
// CreateSchedulerQueues creates traffic schedulers on the device with the given scheduler configuration and traffic shaping info
-func (f *OpenOltFlowMgr) CreateSchedulerQueues(sq schedQueue) error {
+func (f *OpenOltFlowMgr) CreateSchedulerQueues(ctx context.Context, sq schedQueue) error {
log.Debugw("CreateSchedulerQueues", log.Fields{"Dir": sq.direction, "IntfID": sq.intfID,
"OnuID": sq.onuID, "UniID": sq.uniID, "TpID": sq.tpID, "MeterID": sq.meterID,
@@ -352,7 +352,7 @@
*/
var SchedCfg *tp_pb.SchedulerConfig
- KvStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
+ KvStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
if err != nil {
log.Error("Failed to get meter for intf %d, onuid %d, uniid %d", sq.intfID, sq.onuID, sq.uniID)
return err
@@ -409,7 +409,7 @@
TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[sq.intfID].GetTrafficScheduler(sq.tpInst, SchedCfg, TrafficShaping)}
- if err := f.pushSchedulerQueuesToDevice(sq, TrafficShaping, TrafficSched); err != nil {
+ if err := f.pushSchedulerQueuesToDevice(ctx, sq, TrafficShaping, TrafficSched); err != nil {
log.Errorw("Failed to push traffic scheduler and queues to device", log.Fields{"intfID": sq.intfID, "direction": sq.direction})
return err
}
@@ -417,7 +417,7 @@
/* After we successfully applied the scheduler configuration on the OLT device,
* store the meter id on the KV store, for further reference.
*/
- if err := f.resourceMgr.UpdateMeterIDForOnu(Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, meterConfig); err != nil {
+ if err := f.resourceMgr.UpdateMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, meterConfig); err != nil {
log.Error("Failed to update meter id for onu %d, meterid %d", sq.onuID, sq.meterID)
return err
}
@@ -426,7 +426,7 @@
return nil
}
-func (f *OpenOltFlowMgr) pushSchedulerQueuesToDevice(sq schedQueue, TrafficShaping *tp_pb.TrafficShapingInfo, TrafficSched []*tp_pb.TrafficScheduler) error {
+func (f *OpenOltFlowMgr) pushSchedulerQueuesToDevice(ctx context.Context, sq schedQueue, TrafficShaping *tp_pb.TrafficShapingInfo, TrafficSched []*tp_pb.TrafficScheduler) error {
trafficQueues, err := f.techprofile[sq.intfID].GetTrafficQueues(sq.tpInst, sq.direction)
@@ -436,7 +436,7 @@
}
log.Debugw("Sending Traffic scheduler create to device", log.Fields{"Direction": sq.direction, "TrafficScheds": TrafficSched})
- if _, err := f.deviceHandler.Client.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
+ if _, err := f.deviceHandler.Client.CreateTrafficSchedulers(ctx, &tp_pb.TrafficSchedulers{
IntfId: sq.intfID, OnuId: sq.onuID,
UniId: sq.uniID, PortNo: sq.uniPort,
TrafficScheds: TrafficSched}); err != nil {
@@ -447,7 +447,7 @@
// On receiving the CreateTrafficQueues request, the driver should create corresponding
// downstream queues.
log.Debugw("Sending Traffic Queues create to device", log.Fields{"Direction": sq.direction, "TrafficQueues": trafficQueues})
- if _, err := f.deviceHandler.Client.CreateTrafficQueues(context.Background(),
+ if _, err := f.deviceHandler.Client.CreateTrafficQueues(ctx,
&tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
UniId: sq.uniID, PortNo: sq.uniPort,
TrafficQueues: trafficQueues}); err != nil {
@@ -468,7 +468,7 @@
servicePriority: multicastQueuePerPonPort.Priority,
}
//also store the queue info in kv store
- f.resourceMgr.AddMcastQueueForIntf(sq.intfID,
+ f.resourceMgr.AddMcastQueueForIntf(ctx, sq.intfID,
multicastQueuePerPonPort.GemportId,
multicastQueuePerPonPort.Priority)
}
@@ -478,7 +478,7 @@
}
// RemoveSchedulerQueues removes the traffic schedulers from the device based on the given scheduler configuration and traffic shaping info
-func (f *OpenOltFlowMgr) RemoveSchedulerQueues(sq schedQueue) error {
+func (f *OpenOltFlowMgr) RemoveSchedulerQueues(ctx context.Context, sq schedQueue) error {
var Direction string
var SchedCfg *tp_pb.SchedulerConfig
@@ -498,7 +498,7 @@
return err
}
- KVStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
+ KVStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
if err != nil {
log.Errorf("Failed to get Meter for Onu %d", sq.onuID)
return err
@@ -524,7 +524,7 @@
return err
}
- if _, err = f.deviceHandler.Client.RemoveTrafficQueues(context.Background(),
+ if _, err = f.deviceHandler.Client.RemoveTrafficQueues(ctx,
&tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
UniId: sq.uniID, PortNo: sq.uniPort,
TrafficQueues: TrafficQueues}); err != nil {
@@ -532,7 +532,7 @@
return err
}
log.Debug("Removed traffic queues successfully")
- if _, err = f.deviceHandler.Client.RemoveTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
+ if _, err = f.deviceHandler.Client.RemoveTrafficSchedulers(ctx, &tp_pb.TrafficSchedulers{
IntfId: sq.intfID, OnuId: sq.onuID,
UniId: sq.uniID, PortNo: sq.uniPort,
TrafficScheds: TrafficSched}); err != nil {
@@ -545,7 +545,7 @@
/* After we successfully remove the scheduler configuration on the OLT device,
* delete the meter id on the KV store.
*/
- err = f.resourceMgr.RemoveMeterIDForOnu(Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
+ err = f.resourceMgr.RemoveMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
if err != nil {
log.Errorf("Failed to remove meter for onu %d, meter id %d", sq.onuID, KVStoreMeter.MeterId)
return err
@@ -555,31 +555,31 @@
}
// This function allocates tconts and GEM ports for an ONU
-func (f *OpenOltFlowMgr) createTcontGemports(intfID uint32, onuID uint32, uniID uint32, uni string, uniPort uint32, TpID uint32, UsMeterID uint32, DsMeterID uint32, flowMetadata *voltha.FlowMetadata) (uint32, []uint32, *tp.TechProfile) {
+func (f *OpenOltFlowMgr) createTcontGemports(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, uni string, uniPort uint32, TpID uint32, UsMeterID uint32, DsMeterID uint32, flowMetadata *voltha.FlowMetadata) (uint32, []uint32, *tp.TechProfile) {
var allocIDs []uint32
var allgemPortIDs []uint32
var gemPortIDs []uint32
tpInstanceExists := false
var err error
- allocIDs = f.resourceMgr.GetCurrentAllocIDsForOnu(intfID, onuID, uniID)
- allgemPortIDs = f.resourceMgr.GetCurrentGEMPortIDsForOnu(intfID, onuID, uniID)
+ allocIDs = f.resourceMgr.GetCurrentAllocIDsForOnu(ctx, intfID, onuID, uniID)
+ allgemPortIDs = f.resourceMgr.GetCurrentGEMPortIDsForOnu(ctx, intfID, onuID, uniID)
tpPath := f.getTPpath(intfID, uni, TpID)
log.Infow("creating-new-tcont-and-gem", log.Fields{"pon": intfID, "onu": onuID, "uni": uniID})
// Check tech profile instance already exists for derived port name
- techProfileInstance, _ := f.techprofile[intfID].GetTPInstanceFromKVStore(TpID, tpPath)
+ techProfileInstance, _ := f.techprofile[intfID].GetTPInstanceFromKVStore(ctx, TpID, tpPath)
if techProfileInstance == nil {
log.Infow("tp-instance-not-found--creating-new", log.Fields{"path": tpPath})
- techProfileInstance, err = f.techprofile[intfID].CreateTechProfInstance(TpID, uni, intfID)
+ techProfileInstance, err = f.techprofile[intfID].CreateTechProfInstance(ctx, TpID, uni, intfID)
if err != nil {
// This should not happen, something wrong in KV backend transaction
log.Errorw("tp-instance-create-failed", log.Fields{"error": err, "tpID": TpID})
return 0, nil, nil
}
- f.resourceMgr.UpdateTechProfileIDForOnu(intfID, onuID, uniID, TpID)
+ f.resourceMgr.UpdateTechProfileIDForOnu(ctx, intfID, onuID, uniID, TpID)
} else {
log.Debugw("Tech-profile-instance-already-exist-for-given port-name", log.Fields{"uni": uni})
tpInstanceExists = true
@@ -587,7 +587,7 @@
if UsMeterID != 0 {
sq := schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: onuID, uniID: uniID, tpID: TpID,
uniPort: uniPort, tpInst: techProfileInstance, meterID: UsMeterID, flowMetadata: flowMetadata}
- if err := f.CreateSchedulerQueues(sq); err != nil {
+ if err := f.CreateSchedulerQueues(ctx, sq); err != nil {
log.Errorw("CreateSchedulerQueues Failed-upstream", log.Fields{"error": err, "meterID": UsMeterID})
return 0, nil, nil
}
@@ -595,7 +595,7 @@
if DsMeterID != 0 {
sq := schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: intfID, onuID: onuID, uniID: uniID, tpID: TpID,
uniPort: uniPort, tpInst: techProfileInstance, meterID: DsMeterID, flowMetadata: flowMetadata}
- if err := f.CreateSchedulerQueues(sq); err != nil {
+ if err := f.CreateSchedulerQueues(ctx, sq); err != nil {
log.Errorw("CreateSchedulerQueues Failed-downstream", log.Fields{"error": err, "meterID": DsMeterID})
return 0, nil, nil
}
@@ -617,27 +617,27 @@
log.Debugw("Allocated Tcont and GEM ports", log.Fields{"allocIDs": allocIDs, "gemports": allgemPortIDs})
// Send Tconts and GEM ports to KV store
- f.storeTcontsGEMPortsIntoKVStore(intfID, onuID, uniID, allocIDs, allgemPortIDs)
+ f.storeTcontsGEMPortsIntoKVStore(ctx, intfID, onuID, uniID, allocIDs, allgemPortIDs)
return allocID, gemPortIDs, techProfileInstance
}
-func (f *OpenOltFlowMgr) storeTcontsGEMPortsIntoKVStore(intfID uint32, onuID uint32, uniID uint32, allocID []uint32, gemPortIDs []uint32) {
+func (f *OpenOltFlowMgr) storeTcontsGEMPortsIntoKVStore(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, allocID []uint32, gemPortIDs []uint32) {
log.Debugw("Storing allocated Tconts and GEM ports into KV store",
log.Fields{"intfId": intfID, "onuId": onuID, "uniId": uniID, "allocID": allocID, "gemPortIDs": gemPortIDs})
/* Update the allocated alloc_id and gem_port_id for the ONU/UNI to KV store */
- if err := f.resourceMgr.UpdateAllocIdsForOnu(intfID, onuID, uniID, allocID); err != nil {
+ if err := f.resourceMgr.UpdateAllocIdsForOnu(ctx, intfID, onuID, uniID, allocID); err != nil {
log.Error("Errow while uploading allocID to KV store")
}
- if err := f.resourceMgr.UpdateGEMPortIDsForOnu(intfID, onuID, uniID, gemPortIDs); err != nil {
+ if err := f.resourceMgr.UpdateGEMPortIDsForOnu(ctx, intfID, onuID, uniID, gemPortIDs); err != nil {
log.Error("Errow while uploading GEMports to KV store")
}
- if err := f.resourceMgr.UpdateGEMportsPonportToOnuMapOnKVStore(gemPortIDs, intfID, onuID, uniID); err != nil {
+ if err := f.resourceMgr.UpdateGEMportsPonportToOnuMapOnKVStore(ctx, gemPortIDs, intfID, onuID, uniID); err != nil {
log.Error("Errow while uploading gemtopon map to KV store")
}
log.Debug("Stored tconts and GEM into KV store successfully")
for _, gemPort := range gemPortIDs {
- f.addGemPortToOnuInfoMap(intfID, onuID, gemPort)
+ f.addGemPortToOnuInfoMap(ctx, intfID, onuID, gemPort)
}
}
@@ -661,18 +661,18 @@
return nil
}
-func (f *OpenOltFlowMgr) addUpstreamDataFlow(intfID uint32, onuID uint32, uniID uint32,
+func (f *OpenOltFlowMgr) addUpstreamDataFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32,
portNo uint32, uplinkClassifier map[string]interface{},
uplinkAction map[string]interface{}, logicalFlow *ofp.OfpFlowStats,
allocID uint32, gemportID uint32) {
uplinkClassifier[PacketTagType] = SingleTag
log.Debugw("Adding upstream data flow", log.Fields{"uplinkClassifier": uplinkClassifier, "uplinkAction": uplinkAction})
- f.addHSIAFlow(intfID, onuID, uniID, portNo, uplinkClassifier, uplinkAction,
+ f.addHSIAFlow(ctx, intfID, onuID, uniID, portNo, uplinkClassifier, uplinkAction,
Upstream, logicalFlow, allocID, gemportID)
/* TODO: Install Secondary EAP on the subscriber vlan */
}
-func (f *OpenOltFlowMgr) addDownstreamDataFlow(intfID uint32, onuID uint32, uniID uint32,
+func (f *OpenOltFlowMgr) addDownstreamDataFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32,
portNo uint32, downlinkClassifier map[string]interface{},
downlinkAction map[string]interface{}, logicalFlow *ofp.OfpFlowStats,
allocID uint32, gemportID uint32) {
@@ -702,11 +702,11 @@
return
}
- f.addHSIAFlow(intfID, onuID, uniID, portNo, downlinkClassifier, downlinkAction,
+ f.addHSIAFlow(ctx, intfID, onuID, uniID, portNo, downlinkClassifier, downlinkAction,
Downstream, logicalFlow, allocID, gemportID)
}
-func (f *OpenOltFlowMgr) addHSIAFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
+func (f *OpenOltFlowMgr) addHSIAFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
action map[string]interface{}, direction string, logicalFlow *ofp.OfpFlowStats,
allocID uint32, gemPortID uint32) {
var networkIntfID uint32
@@ -726,11 +726,11 @@
log.Debugw("Found pbit in the flow", log.Fields{"VlanPbit": vlanPbit})
}
flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
- if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
log.Debug("Flow-exists--not-re-adding")
return
}
- flowID, err := f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, HsiaFlow, vlanPbit)
+ flowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, HsiaFlow, vlanPbit)
if err != nil {
log.Errorw("Flow id unavailable for HSIA flow", log.Fields{"direction": direction})
return
@@ -765,10 +765,10 @@
Priority: int32(logicalFlow.Priority),
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(logicalFlow, &flow); ok {
+ if ok := f.addFlowToDevice(ctx, logicalFlow, &flow); ok {
log.Debug("HSIA flow added to device successfully", log.Fields{"direction": direction})
- flowsToKVStore := f.getUpdatedFlowInfo(&flow, flowStoreCookie, HsiaFlow, flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(flow.AccessIntfId,
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &flow, flowStoreCookie, HsiaFlow, flowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, flow.AccessIntfId,
flow.OnuId,
flow.UniId,
flow.FlowId /*flowCategory,*/, flowsToKVStore); err != nil {
@@ -778,7 +778,7 @@
}
}
-func (f *OpenOltFlowMgr) addDHCPTrapFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{}, action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32) {
+func (f *OpenOltFlowMgr) addDHCPTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{}, action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32) {
var dhcpFlow openoltpb2.Flow
var actionProto *openoltpb2.Action
@@ -802,12 +802,12 @@
delete(classifier, VlanVid)
flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
- if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
log.Debug("Flow-exists--not-re-adding")
return
}
- flowID, err = f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, DhcpFlow, 0 /*classifier[VLAN_PCP].(uint32)*/)
+ flowID, err = f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, DhcpFlow, 0 /*classifier[VLAN_PCP].(uint32)*/)
if err != nil {
log.Errorw("flowId unavailable for UL DHCP", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
@@ -840,10 +840,10 @@
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(logicalFlow, &dhcpFlow); ok {
+ if ok := f.addFlowToDevice(ctx, logicalFlow, &dhcpFlow); ok {
log.Debug("DHCP UL flow added to device successfully")
- flowsToKVStore := f.getUpdatedFlowInfo(&dhcpFlow, flowStoreCookie, "DHCP", flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(dhcpFlow.AccessIntfId,
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &dhcpFlow, flowStoreCookie, "DHCP", flowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, dhcpFlow.AccessIntfId,
dhcpFlow.OnuId,
dhcpFlow.UniId,
dhcpFlow.FlowId, flowsToKVStore); err != nil {
@@ -856,13 +856,13 @@
}
//addIGMPTrapFlow creates IGMP trap-to-host flow
-func (f *OpenOltFlowMgr) addIGMPTrapFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
+func (f *OpenOltFlowMgr) addIGMPTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32) {
- f.addUpstreamTrapFlow(intfID, onuID, uniID, portNo, classifier, action, logicalFlow, allocID, gemPortID, IgmpFlow)
+ f.addUpstreamTrapFlow(ctx, intfID, onuID, uniID, portNo, classifier, action, logicalFlow, allocID, gemPortID, IgmpFlow)
}
//addUpstreamTrapFlow creates a trap-to-host flow
-func (f *OpenOltFlowMgr) addUpstreamTrapFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
+func (f *OpenOltFlowMgr) addUpstreamTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, flowType string) {
var flow openoltpb2.Flow
@@ -885,12 +885,12 @@
delete(classifier, VlanVid)
flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
- if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(networkIntfID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkIntfID), int32(onuID), int32(uniID), flowStoreCookie); present {
log.Debug("Flow-exists--not-re-adding")
return
}
- flowID, err := f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, flowType, 0, 0 /*classifier[VLAN_PCP].(uint32)*/)
+ flowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, flowType, 0, 0 /*classifier[VLAN_PCP].(uint32)*/)
if err != nil {
log.Errorw("flowId unavailable for upstream trap flow", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie, "flowType": flowType})
@@ -923,11 +923,11 @@
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(logicalFlow, &flow); ok {
+ if ok := f.addFlowToDevice(ctx, logicalFlow, &flow); ok {
log.Debugf("%s UL flow added to device successfully", flowType)
- flowsToKVStore := f.getUpdatedFlowInfo(&flow, flowStoreCookie, flowType, flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(flow.AccessIntfId,
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &flow, flowStoreCookie, flowType, flowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, flow.AccessIntfId,
flow.OnuId,
flow.UniId,
flow.FlowId, flowsToKVStore); err != nil {
@@ -940,7 +940,7 @@
}
// Add EAPOL flow to device with mac, vlanId as classifier for upstream and downstream
-func (f *OpenOltFlowMgr) addEAPOLFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, vlanID uint32, classifier map[string]interface{}, action map[string]interface{}) {
+func (f *OpenOltFlowMgr) addEAPOLFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, vlanID uint32, classifier map[string]interface{}, action map[string]interface{}) {
log.Debugw("Adding EAPOL to device", log.Fields{"intfId": intfID, "onuId": onuID, "portNo": portNo, "allocId": allocID, "gemPortId": gemPortID, "vlanId": vlanID, "flow": logicalFlow})
uplinkClassifier := make(map[string]interface{})
@@ -956,12 +956,12 @@
// Fill action
uplinkAction[TrapToHost] = true
flowStoreCookie := getFlowStoreCookie(uplinkClassifier, gemPortID)
- if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
log.Debug("Flow-exists--not-re-adding")
return
}
//Add Uplink EAPOL Flow
- uplinkFlowID, err := f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0)
+ uplinkFlowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0)
if err != nil {
log.Errorw("flowId unavailable for UL EAPOL", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
return
@@ -999,11 +999,11 @@
Priority: int32(logicalFlow.Priority),
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(logicalFlow, &upstreamFlow); ok {
+ if ok := f.addFlowToDevice(ctx, logicalFlow, &upstreamFlow); ok {
log.Debug("EAPOL UL flow added to device successfully")
flowCategory := "EAPOL"
- flowsToKVStore := f.getUpdatedFlowInfo(&upstreamFlow, flowStoreCookie, flowCategory, uplinkFlowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(upstreamFlow.AccessIntfId,
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &upstreamFlow, flowStoreCookie, flowCategory, uplinkFlowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, upstreamFlow.AccessIntfId,
upstreamFlow.OnuId,
upstreamFlow.UniId,
upstreamFlow.FlowId,
@@ -1085,11 +1085,11 @@
}
// DeleteTechProfileInstances removes the tech profile instances from persistent storage
-func (f *OpenOltFlowMgr) DeleteTechProfileInstances(intfID uint32, onuID uint32, uniID uint32, sn string) error {
- tpIDList := f.resourceMgr.GetTechProfileIDForOnu(intfID, onuID, uniID)
+func (f *OpenOltFlowMgr) DeleteTechProfileInstances(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, sn string) error {
+ tpIDList := f.resourceMgr.GetTechProfileIDForOnu(ctx, intfID, onuID, uniID)
uniPortName := fmt.Sprintf("pon-{%d}/onu-{%d}/uni-{%d}", intfID, onuID, uniID)
for _, tpID := range tpIDList {
- if err := f.DeleteTechProfileInstance(intfID, onuID, uniID, uniPortName, tpID); err != nil {
+ if err := f.DeleteTechProfileInstance(ctx, intfID, onuID, uniID, uniPortName, tpID); err != nil {
log.Debugw("Failed-to-delete-tp-instance-from-kv-store", log.Fields{"tp-id": tpID, "uni-port-name": uniPortName})
// return err
// We should continue to delete tech-profile instances for other TP IDs
@@ -1099,11 +1099,11 @@
}
// DeleteTechProfileInstance removes the tech profile instance from persistent storage
-func (f *OpenOltFlowMgr) DeleteTechProfileInstance(intfID uint32, onuID uint32, uniID uint32, uniPortName string, tpID uint32) error {
+func (f *OpenOltFlowMgr) DeleteTechProfileInstance(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, uniPortName string, tpID uint32) error {
if uniPortName == "" {
uniPortName = fmt.Sprintf("pon-{%d}/onu-{%d}/uni-{%d}", intfID, onuID, uniID)
}
- if err := f.techprofile[intfID].DeleteTechProfileInstance(tpID, uniPortName); err != nil {
+ if err := f.techprofile[intfID].DeleteTechProfileInstance(ctx, tpID, uniPortName); err != nil {
log.Debugw("Failed-to-delete-tp-instance-from-kv-store", log.Fields{"tp-id": tpID, "uni-port-name": uniPortName})
return err
}
@@ -1137,7 +1137,7 @@
return generatedHash
}
-func (f *OpenOltFlowMgr) getUpdatedFlowInfo(flow *openoltpb2.Flow, flowStoreCookie uint64, flowCategory string, deviceFlowID uint32, logicalFlowID uint64) *[]rsrcMgr.FlowInfo {
+func (f *OpenOltFlowMgr) getUpdatedFlowInfo(ctx context.Context, flow *openoltpb2.Flow, flowStoreCookie uint64, flowCategory string, deviceFlowID uint32, logicalFlowID uint64) *[]rsrcMgr.FlowInfo {
var flows = []rsrcMgr.FlowInfo{{Flow: flow, FlowCategory: flowCategory, FlowStoreCookie: flowStoreCookie, LogicalFlowID: logicalFlowID}}
var intfID uint32
/* For flows which trap out of the NNI, the AccessIntfId is invalid
@@ -1149,7 +1149,7 @@
intfID = uint32(flow.NetworkIntfId)
}
// Get existing flows matching flowid for given subscriber from KV store
- existingFlows := f.resourceMgr.GetFlowIDInfo(intfID, flow.OnuId, flow.UniId, flow.FlowId)
+ existingFlows := f.resourceMgr.GetFlowIDInfo(ctx, intfID, flow.OnuId, flow.UniId, flow.FlowId)
if existingFlows != nil {
log.Debugw("Flow exists for given flowID, appending it to current flow", log.Fields{"flowID": flow.FlowId})
//for _, f := range *existingFlows {
@@ -1184,9 +1184,9 @@
// return &flows
//}
-func (f *OpenOltFlowMgr) updateFlowInfoToKVStore(intfID int32, onuID int32, uniID int32, flowID uint32, flows *[]rsrcMgr.FlowInfo) error {
+func (f *OpenOltFlowMgr) updateFlowInfoToKVStore(ctx context.Context, intfID int32, onuID int32, uniID int32, flowID uint32, flows *[]rsrcMgr.FlowInfo) error {
log.Debugw("Storing flow(s) into KV store", log.Fields{"flows": *flows})
- if err := f.resourceMgr.UpdateFlowIDInfo(intfID, onuID, uniID, flowID, flows); err != nil {
+ if err := f.resourceMgr.UpdateFlowIDInfo(ctx, intfID, onuID, uniID, flowID, flows); err != nil {
log.Debug("Error while Storing flow into KV store")
return err
}
@@ -1194,7 +1194,7 @@
return nil
}
-func (f *OpenOltFlowMgr) addFlowToDevice(logicalFlow *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) bool {
+func (f *OpenOltFlowMgr) addFlowToDevice(ctx context.Context, logicalFlow *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) bool {
var intfID uint32
/* For flows which trap out of the NNI, the AccessIntfId is invalid
@@ -1218,12 +1218,12 @@
if err != nil {
log.Errorw("Failed to Add flow to device", log.Fields{"err": err, "deviceFlow": deviceFlow})
- f.resourceMgr.FreeFlowID(intfID, deviceFlow.OnuId, deviceFlow.UniId, deviceFlow.FlowId)
+ f.resourceMgr.FreeFlowID(ctx, intfID, deviceFlow.OnuId, deviceFlow.UniId, deviceFlow.FlowId)
return false
}
if deviceFlow.GemportId != -1 {
// No need to register the flow if it is a trap on nni flow.
- f.registerFlow(logicalFlow, deviceFlow)
+ f.registerFlow(ctx, logicalFlow, deviceFlow)
}
log.Debugw("Flow added to device successfully ", log.Fields{"flow": *deviceFlow})
return true
@@ -1266,7 +1266,7 @@
*/
-func (f *OpenOltFlowMgr) addLLDPFlow(flow *ofp.OfpFlowStats, portNo uint32) {
+func (f *OpenOltFlowMgr) addLLDPFlow(ctx context.Context, flow *ofp.OfpFlowStats, portNo uint32) {
classifierInfo := make(map[string]interface{})
actionInfo := make(map[string]interface{})
@@ -1293,11 +1293,11 @@
var networkInterfaceID = IntfIDFromNniPortNum(portNo)
var flowStoreCookie = getFlowStoreCookie(classifierInfo, uint32(0))
- if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
log.Debug("Flow-exists--not-re-adding")
return
}
- flowID, err := f.resourceMgr.GetFlowID(uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
+ flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
if err != nil {
log.Errorw("Flow id unavailable for LLDP traponNNI flow", log.Fields{"error": err})
@@ -1328,10 +1328,10 @@
Priority: int32(flow.Priority),
Cookie: flow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(flow, &downstreamflow); ok {
+ if ok := f.addFlowToDevice(ctx, flow, &downstreamflow); ok {
log.Debug("LLDP trap on NNI flow added to device successfully")
- flowsToKVStore := f.getUpdatedFlowInfo(&downstreamflow, flowStoreCookie, "", flowID, flow.Id)
- if err := f.updateFlowInfoToKVStore(int32(networkInterfaceID),
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, flow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
int32(onuID),
int32(uniID),
flowID, flowsToKVStore); err != nil {
@@ -1472,7 +1472,7 @@
}
//clearResources clears pon resources in kv store and the device
-func (f *OpenOltFlowMgr) clearResources(flow *ofp.OfpFlowStats, Intf uint32, onuID int32, uniID int32,
+func (f *OpenOltFlowMgr) clearResources(ctx context.Context, flow *ofp.OfpFlowStats, Intf uint32, onuID int32, uniID int32,
gemPortID int32, flowID uint32, flowDirection string,
portNum uint32, updatedFlows []rsrcMgr.FlowInfo) error {
@@ -1487,7 +1487,7 @@
// So the flow should not be freed yet.
// For ex: Case of HSIA where same flow is shared
// between DS and US.
- f.updateFlowInfoToKVStore(int32(Intf), int32(onuID), int32(uniID), flowID, &updatedFlows)
+ f.updateFlowInfoToKVStore(ctx, int32(Intf), int32(onuID), int32(uniID), flowID, &updatedFlows)
if len(updatedFlows) == 0 {
// Do this for subscriber flows only (not trap from NNI flows)
if onuID != -1 && uniID != -1 {
@@ -1507,12 +1507,12 @@
}
log.Debugw("Releasing flow Id to resource manager", log.Fields{"Intf": Intf, "onuId": onuID, "uniId": uniID, "flowId": flowID})
- f.resourceMgr.FreeFlowID(Intf, int32(onuID), int32(uniID), flowID)
+ f.resourceMgr.FreeFlowID(ctx, Intf, int32(onuID), int32(uniID), flowID)
uni := getUniPortPath(Intf, onuID, uniID)
tpPath := f.getTPpath(Intf, uni, tpID)
log.Debugw("Getting-techprofile-instance-for-subscriber", log.Fields{"TP-PATH": tpPath})
- techprofileInst, err := f.techprofile[Intf].GetTPInstanceFromKVStore(tpID, tpPath)
+ techprofileInst, err := f.techprofile[Intf].GetTPInstanceFromKVStore(ctx, tpID, tpPath)
if err != nil { // This should not happen, something wrong in KV backend transaction
log.Errorw("Error in fetching tech profile instance from KV store", log.Fields{"tpID": 20, "path": tpPath})
return err
@@ -1531,7 +1531,7 @@
// everytime flowsUsedByGemPort cache is updated the same should be updated
// in kv store by calling UpdateFlowIDsForGem
f.flowsUsedByGemPort[gemPK] = flowIDs
- f.resourceMgr.UpdateFlowIDsForGem(Intf, uint32(gemPortID), flowIDs)
+ f.resourceMgr.UpdateFlowIDsForGem(ctx, Intf, uint32(gemPortID), flowIDs)
break
}
}
@@ -1539,17 +1539,17 @@
return nil
}
log.Debugf("Gem port id %d is not used by another flow - releasing the gem port", gemPortID)
- f.resourceMgr.RemoveGemPortIDForOnu(Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
+ f.resourceMgr.RemoveGemPortIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
// TODO: The TrafficQueue corresponding to this gem-port also should be removed immediately.
// But it is anyway eventually removed later when the TechProfile is freed, so not a big issue for now.
- f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(uint32(gemPortID), Intf)
+ f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(ctx, uint32(gemPortID), Intf)
f.deleteGemPortFromLocalCache(Intf, uint32(onuID), uint32(gemPortID))
f.onuIdsLock.Lock()
//everytime an entry is deleted from flowsUsedByGemPort cache, the same should be updated in kv as well
// by calling DeleteFlowIDsForGem
delete(f.flowsUsedByGemPort, gemPK)
- f.resourceMgr.DeleteFlowIDsForGem(Intf, uint32(gemPortID))
- f.resourceMgr.FreeGemPortID(Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
+ f.resourceMgr.DeleteFlowIDsForGem(ctx, Intf, uint32(gemPortID))
+ f.resourceMgr.FreeGemPortID(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
f.onuIdsLock.Unlock()
// Delete the gem port on the ONU.
if err := f.sendDeleteGemPortToChild(Intf, uint32(onuID), uint32(uniID), uint32(gemPortID), tpPath); err != nil {
@@ -1557,13 +1557,13 @@
log.Fields{"err": err, "pon": Intf, "onuID": onuID, "uniID": uniID, "gemPortId": gemPortID})
}
- ok, _ := f.isTechProfileUsedByAnotherGem(Intf, uint32(onuID), uint32(uniID), tpID, techprofileInst, uint32(gemPortID))
+ ok, _ := f.isTechProfileUsedByAnotherGem(ctx, Intf, uint32(onuID), uint32(uniID), tpID, techprofileInst, uint32(gemPortID))
if !ok {
- f.resourceMgr.RemoveTechProfileIDForOnu(Intf, uint32(onuID), uint32(uniID), tpID)
- f.RemoveSchedulerQueues(schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst})
- f.RemoveSchedulerQueues(schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst})
- f.DeleteTechProfileInstance(Intf, uint32(onuID), uint32(uniID), "", tpID)
- f.resourceMgr.FreeAllocID(Intf, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID)
+ f.resourceMgr.RemoveTechProfileIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), tpID)
+ f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst})
+ f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst})
+ f.DeleteTechProfileInstance(ctx, Intf, uint32(onuID), uint32(uniID), "", tpID)
+ f.resourceMgr.FreeAllocID(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID)
// Delete the TCONT on the ONU.
if err := f.sendDeleteTcontToChild(Intf, uint32(onuID), uint32(uniID), uint32(techprofileInst.UsScheduler.AllocID), tpPath); err != nil {
log.Errorw("error processing delete tcont towards onu",
@@ -1575,12 +1575,12 @@
return nil
}
-func (f *OpenOltFlowMgr) clearFlowFromResourceManager(flow *ofp.OfpFlowStats, flowDirection string) {
+func (f *OpenOltFlowMgr) clearFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats, flowDirection string) {
log.Debugw("clearFlowFromResourceManager", log.Fields{"flowDirection": flowDirection, "flow": *flow})
if flowDirection == Multicast {
- f.clearMulticastFlowFromResourceManager(flow)
+ f.clearMulticastFlowFromResourceManager(ctx, flow)
return
}
@@ -1613,9 +1613,9 @@
log.Debug("Trap on nni flow set oni, uni to -1")
Intf = IntfIDFromNniPortNum(inPort)
}
- flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(Intf, onuID, uniID)
+ flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, Intf, onuID, uniID)
for _, flowID = range flowIds {
- flowInfo := f.resourceMgr.GetFlowIDInfo(Intf, onuID, uniID, flowID)
+ flowInfo := f.resourceMgr.GetFlowIDInfo(ctx, Intf, onuID, uniID, flowID)
if flowInfo == nil {
log.Debugw("No FlowInfo found found in KV store",
log.Fields{"Intf": Intf, "onuID": onuID, "uniID": uniID, "flowID": flowID})
@@ -1634,7 +1634,7 @@
log.Debug("Flow removed from device successfully")
//Remove the Flow from FlowInfo
updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
- err = f.clearResources(flow, Intf, onuID, uniID, storedFlow.Flow.GemportId,
+ err = f.clearResources(ctx, flow, Intf, onuID, uniID, storedFlow.Flow.GemportId,
flowID, flowDirection, portNum, updatedFlows)
if err != nil {
log.Error("Failed to clear resources for flow", log.Fields{"flow": storedFlow})
@@ -1651,10 +1651,10 @@
//clearMulticastFlowFromResourceManager removes a multicast flow from the KV store and
// clears resources reserved for this multicast flow
-func (f *OpenOltFlowMgr) clearMulticastFlowFromResourceManager(flow *ofp.OfpFlowStats) {
+func (f *OpenOltFlowMgr) clearMulticastFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats) {
classifierInfo := make(map[string]interface{})
formulateClassifierInfoFromFlow(classifierInfo, flow)
- inPort, err := f.getInPortOfMulticastFlow(classifierInfo)
+ inPort, err := f.getInPortOfMulticastFlow(ctx, classifierInfo)
if err != nil {
log.Warnw("No inPort found. Cannot release resources of the multicast flow.", log.Fields{"flowId:": flow.Id})
@@ -1667,10 +1667,10 @@
var flowID uint32
var updatedFlows []rsrcMgr.FlowInfo
- flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(networkInterfaceID, onuID, uniID)
+ flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, networkInterfaceID, onuID, uniID)
for _, flowID = range flowIds {
- flowInfo := f.resourceMgr.GetFlowIDInfo(networkInterfaceID, onuID, uniID, flowID)
+ flowInfo := f.resourceMgr.GetFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flowID)
if flowInfo == nil {
log.Debugw("No multicast FlowInfo found in the KV store",
log.Fields{"Intf": networkInterfaceID, "onuID": onuID, "uniID": uniID, "flowID": flowID})
@@ -1692,20 +1692,20 @@
log.Debugw("Multicast flow removed from device successfully", log.Fields{"flowId": flow.Id})
//Remove the Flow from FlowInfo
updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
- if err := f.updateFlowInfoToKVStore(int32(networkInterfaceID), NoneOnuID, NoneUniID, flowID, &updatedFlows); err != nil {
+ if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID), NoneOnuID, NoneUniID, flowID, &updatedFlows); err != nil {
log.Error("Failed to delete multicast flow from the KV store", log.Fields{"flow": storedFlow, "err": err})
return
}
//release flow id
log.Debugw("Releasing multicast flow id", log.Fields{"flowId": flowID, "interfaceID": networkInterfaceID})
- f.resourceMgr.FreeFlowID(uint32(networkInterfaceID), NoneOnuID, NoneUniID, flowID)
+ f.resourceMgr.FreeFlowID(ctx, uint32(networkInterfaceID), NoneOnuID, NoneUniID, flowID)
}
}
}
}
//RemoveFlow removes the flow from the device
-func (f *OpenOltFlowMgr) RemoveFlow(flow *ofp.OfpFlowStats) {
+func (f *OpenOltFlowMgr) RemoveFlow(ctx context.Context, flow *ofp.OfpFlowStats) {
log.Debugw("Removing Flow", log.Fields{"flow": flow})
var direction string
actionInfo := make(map[string]interface{})
@@ -1729,7 +1729,7 @@
} else {
direction = Downstream
}
- f.clearFlowFromResourceManager(flow, direction) //TODO: Take care of the limitations
+ f.clearFlowFromResourceManager(ctx, flow, direction) //TODO: Take care of the limitations
return
}
@@ -1770,7 +1770,7 @@
// AddFlow add flow to device
// nolint: gocyclo
-func (f *OpenOltFlowMgr) AddFlow(flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) {
+func (f *OpenOltFlowMgr) AddFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) {
classifierInfo := make(map[string]interface{})
actionInfo := make(map[string]interface{})
var UsMeterID uint32
@@ -1788,7 +1788,7 @@
if flows.HasGroup(flow) {
// handle multicast flow
- f.handleFlowWithGroup(actionInfo, classifierInfo, flow)
+ f.handleFlowWithGroup(ctx, actionInfo, classifierInfo, flow)
return
}
@@ -1805,7 +1805,7 @@
if ethType, ok := classifierInfo[EthType]; ok {
if ethType.(uint32) == LldpEthType {
log.Info("Adding LLDP flow")
- f.addLLDPFlow(flow, portNo)
+ f.addLLDPFlow(ctx, flow, portNo)
return
}
}
@@ -1814,7 +1814,7 @@
if udpSrc, ok := classifierInfo[UDPSrc]; ok {
if udpSrc.(uint32) == uint32(67) || udpSrc.(uint32) == uint32(546) {
log.Debug("trap-dhcp-from-nni-flow")
- f.addDHCPTrapFlowOnNNI(flow, classifierInfo, portNo)
+ f.addDHCPTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
return
}
}
@@ -1822,12 +1822,12 @@
}
if isIgmpTrapDownstreamFlow(classifierInfo) {
log.Debug("trap-igmp-from-nni-flow")
- f.addIgmpTrapFlowOnNNI(flow, classifierInfo, portNo)
+ f.addIgmpTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
return
}
f.deviceHandler.AddUniPortToOnu(intfID, onuID, portNo)
- f.resourceMgr.AddUniPortToOnuInfo(intfID, onuID, portNo)
+ f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNo)
TpID, err := getTpIDFromFlow(flow)
if err != nil {
@@ -1847,17 +1847,14 @@
pnFlDelKey := pendingFlowDeleteKey{intfID, onuID, uniID}
if _, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok {
log.Debugw("no pending flows found, going ahead with flow install", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
- f.divideAndAddFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
+ f.divideAndAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
} else {
- ctx := context.Background()
- ctx, cancel := context.WithCancel(ctx)
- defer cancel()
pendingFlowDelComplete := make(chan bool)
go f.waitForFlowDeletesToCompleteForOnu(ctx, intfID, onuID, uniID, pendingFlowDelComplete)
select {
case <-pendingFlowDelComplete:
log.Debugw("all pending flow deletes completed", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
- f.divideAndAddFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
+ f.divideAndAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
case <-time.After(10 * time.Second):
log.Errorw("pending flow deletes not completed after timeout", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
@@ -1866,11 +1863,11 @@
}
// handleFlowWithGroup adds multicast flow to the device.
-func (f *OpenOltFlowMgr) handleFlowWithGroup(actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) {
+func (f *OpenOltFlowMgr) handleFlowWithGroup(ctx context.Context, actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) {
classifierInfo[PacketTagType] = DoubleTag
log.Debugw("add-multicast-flow", log.Fields{"classifierInfo": classifierInfo, "actionInfo": actionInfo})
- inPort, err := f.getInPortOfMulticastFlow(classifierInfo)
+ inPort, err := f.getInPortOfMulticastFlow(ctx, classifierInfo)
if err != nil {
log.Warnw("No inPort found. Ignoring multicast flow.", log.Fields{"flowId:": flow.Id})
return
@@ -1893,11 +1890,11 @@
networkInterfaceID := IntfIDFromNniPortNum(inPort)
var flowStoreCookie = getFlowStoreCookie(classifierInfo, uint32(0))
- if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
log.Debugw("multicast-flow-exists--not-re-adding", log.Fields{"classifierInfo": classifierInfo})
return
}
- flowID, err := f.resourceMgr.GetFlowID(uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0, 0)
+ flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0, 0)
if err != nil {
log.Errorw("Flow id unavailable for multicast flow", log.Fields{"error": err})
return
@@ -1917,20 +1914,20 @@
Priority: int32(flow.Priority),
Cookie: flow.Cookie}
- if ok := f.addFlowToDevice(flow, &multicastFlow); ok {
+ if ok := f.addFlowToDevice(ctx, flow, &multicastFlow); ok {
log.Debug("multicast flow added to device successfully")
//get cached group
- group, _, err := f.GetFlowGroupFromKVStore(groupID, true)
+ group, _, err := f.GetFlowGroupFromKVStore(ctx, groupID, true)
if err == nil {
//calling groupAdd to set group members after multicast flow creation
- if f.ModifyGroup(group) {
+ if f.ModifyGroup(ctx, group) {
//cached group can be removed now
- f.resourceMgr.RemoveFlowGroupFromKVStore(groupID, true)
+ f.resourceMgr.RemoveFlowGroupFromKVStore(ctx, groupID, true)
}
}
- flowsToKVStore := f.getUpdatedFlowInfo(&multicastFlow, flowStoreCookie, MulticastFlow, flowID, flow.Id)
- if err := f.updateFlowInfoToKVStore(int32(networkInterfaceID),
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &multicastFlow, flowStoreCookie, MulticastFlow, flowID, flow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
int32(onuID),
int32(uniID),
flowID, flowsToKVStore); err != nil {
@@ -1941,12 +1938,12 @@
}
//getInPortOfMulticastFlow return inPort criterion if exists; returns NNI interface of the device otherwise
-func (f *OpenOltFlowMgr) getInPortOfMulticastFlow(classifierInfo map[string]interface{}) (uint32, error) {
+func (f *OpenOltFlowMgr) getInPortOfMulticastFlow(ctx context.Context, classifierInfo map[string]interface{}) (uint32, error) {
if _, ok := classifierInfo[InPort]; ok {
return classifierInfo[InPort].(uint32), nil
}
// find first NNI port of the device
- nniPorts, e := f.resourceMgr.GetNNIFromKVStore()
+ nniPorts, e := f.resourceMgr.GetNNIFromKVStore(ctx)
if e == nil && len(nniPorts) > 0 {
return nniPorts[0], nil
}
@@ -1954,7 +1951,7 @@
}
// AddGroup add or update the group
-func (f *OpenOltFlowMgr) AddGroup(group *ofp.OfpGroupEntry) {
+func (f *OpenOltFlowMgr) AddGroup(ctx context.Context, group *ofp.OfpGroupEntry) {
log.Infow("add-group", log.Fields{"group": group})
if group == nil {
log.Warn("skipping nil group")
@@ -1968,13 +1965,13 @@
}
log.Debugw("Sending group to device", log.Fields{"groupToOlt": groupToOlt})
- _, err := f.deviceHandler.Client.PerformGroupOperation(context.Background(), &groupToOlt)
+ _, err := f.deviceHandler.Client.PerformGroupOperation(ctx, &groupToOlt)
if err != nil {
log.Errorw("add-group operation failed", log.Fields{"err": err, "groupToOlt": groupToOlt})
return
}
// group members not created yet. So let's store the group
- if err := f.resourceMgr.AddFlowGroupToKVStore(group, true); err != nil {
+ if err := f.resourceMgr.AddFlowGroupToKVStore(ctx, group, true); err != nil {
log.Errorw("Group cannot be stored in KV store", log.Fields{"groupId": group.Desc.GroupId, "err": err})
} else {
log.Debugw("add-group operation performed on the device successfully ", log.Fields{"groupToOlt": groupToOlt})
@@ -1992,7 +1989,7 @@
}
// ModifyGroup updates the group
-func (f *OpenOltFlowMgr) ModifyGroup(group *ofp.OfpGroupEntry) bool {
+func (f *OpenOltFlowMgr) ModifyGroup(ctx context.Context, group *ofp.OfpGroupEntry) bool {
log.Infow("modify-group", log.Fields{"group": group})
if group == nil || group.Desc == nil {
log.Warn("cannot modify group; group is nil")
@@ -2001,7 +1998,7 @@
new := f.buildGroup(group.Desc.GroupId, group.Desc.Buckets)
//get existing members of the group
- val, groupExists, err := f.GetFlowGroupFromKVStore(group.Desc.GroupId, false)
+ val, groupExists, err := f.GetFlowGroupFromKVStore(ctx, group.Desc.GroupId, false)
if err != nil {
log.Errorw("Failed to retrieve the group from the store. Cannot modify group.",
@@ -2039,7 +2036,7 @@
}
if isSuccess {
- if err := f.resourceMgr.AddFlowGroupToKVStore(group, false); err != nil {
+ if err := f.resourceMgr.AddFlowGroupToKVStore(ctx, group, false); err != nil {
log.Errorw("Failed to save the group into kv store", log.Fields{"groupId": group.Desc.GroupId})
}
log.Debugw("modify-group was success. Storing the group", log.Fields{"group": group, "existingGroup": current})
@@ -2173,13 +2170,13 @@
}
//UpdateOnuInfo function adds onu info to cache and kvstore
-func (f *OpenOltFlowMgr) UpdateOnuInfo(intfID uint32, onuID uint32, serialNum string) {
+func (f *OpenOltFlowMgr) UpdateOnuInfo(ctx context.Context, intfID uint32, onuID uint32, serialNum string) {
f.lockCache.Lock()
defer f.lockCache.Unlock()
onu := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
f.onuGemInfo[intfID] = append(f.onuGemInfo[intfID], onu)
- if err := f.resourceMgr.AddOnuInfo(intfID, onu); err != nil {
+ if err := f.resourceMgr.AddOnuInfo(ctx, intfID, onu); err != nil {
log.Errorw("failed to add onu info", log.Fields{"onu": onu})
return
}
@@ -2187,7 +2184,7 @@
}
//addGemPortToOnuInfoMap function adds GEMport to ONU map
-func (f *OpenOltFlowMgr) addGemPortToOnuInfoMap(intfID uint32, onuID uint32, gemPort uint32) {
+func (f *OpenOltFlowMgr) addGemPortToOnuInfoMap(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) {
f.lockCache.Lock()
defer f.lockCache.Unlock()
onugem := f.onuGemInfo[intfID]
@@ -2206,7 +2203,7 @@
f.onuGemInfo[intfID] = onugem
}
}
- err := f.resourceMgr.AddGemToOnuGemInfo(intfID, onuID, gemPort)
+ err := f.resourceMgr.AddGemToOnuGemInfo(ctx, intfID, onuID, gemPort)
if err != nil {
log.Errorw("Failed to add gem to onu", log.Fields{"intfId": intfID, "onuId": onuID, "gemPort": gemPort})
return
@@ -2236,7 +2233,7 @@
}
//GetLogicalPortFromPacketIn function computes logical port UNI/NNI port from packet-in indication and returns the same
-func (f *OpenOltFlowMgr) GetLogicalPortFromPacketIn(packetIn *openoltpb2.PacketIndication) (uint32, error) {
+func (f *OpenOltFlowMgr) GetLogicalPortFromPacketIn(ctx context.Context, packetIn *openoltpb2.PacketIndication) (uint32, error) {
var logicalPortNum uint32
var onuID uint32
var err error
@@ -2254,7 +2251,7 @@
logicalPortNum = MkUniPortNum(packetIn.IntfId, onuID, uniID)
}
// Store the gem port through which the packet_in came. Use the same gem port for packet_out
- f.UpdateGemPortForPktIn(packetIn.IntfId, onuID, logicalPortNum, packetIn.GemportId)
+ f.UpdateGemPortForPktIn(ctx, packetIn.IntfId, onuID, logicalPortNum, packetIn.GemportId)
} else if packetIn.IntfType == "nni" {
logicalPortNum = IntfIDToPortNo(packetIn.IntfId, voltha.Port_ETHERNET_NNI)
}
@@ -2267,7 +2264,7 @@
}
//GetPacketOutGemPortID returns gemPortId
-func (f *OpenOltFlowMgr) GetPacketOutGemPortID(intfID uint32, onuID uint32, portNum uint32) (uint32, error) {
+func (f *OpenOltFlowMgr) GetPacketOutGemPortID(ctx context.Context, intfID uint32, onuID uint32, portNum uint32) (uint32, error) {
var gemPortID uint32
var err error
@@ -2281,7 +2278,7 @@
return gemPortID, err
}
//If gem is not found in cache try to get it from kv store, if found in kv store, update the cache and return.
- gemPortID, err = f.resourceMgr.GetGemPortFromOnuPktIn(intfID, onuID, portNum)
+ gemPortID, err = f.resourceMgr.GetGemPortFromOnuPktIn(ctx, intfID, onuID, portNum)
if err == nil {
if gemPortID != 0 {
f.packetInGemPort[pktInkey] = gemPortID
@@ -2294,11 +2291,11 @@
return uint32(0), err
}
-func installFlowOnAllGemports(
- f1 func(intfId uint32, onuId uint32, uniId uint32,
+func installFlowOnAllGemports(ctx context.Context,
+ f1 func(ctx context.Context, intfId uint32, onuId uint32, uniId uint32,
portNo uint32, classifier map[string]interface{}, action map[string]interface{},
logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32),
- f2 func(intfId uint32, onuId uint32, uniId uint32, portNo uint32,
+ f2 func(ctx context.Context, intfId uint32, onuId uint32, uniId uint32, portNo uint32,
logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32, vlanId uint32,
classifier map[string]interface{}, action map[string]interface{}),
args map[string]uint32,
@@ -2310,9 +2307,9 @@
log.Debugw("Installing flow on all GEM ports", log.Fields{"FlowType": FlowType, "gemPorts": gemPorts, "vlan": vlanID})
for _, gemPortID := range gemPorts {
if FlowType == HsiaFlow || FlowType == DhcpFlow {
- f1(args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID)
+ f1(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID)
} else if FlowType == EapolFlow {
- f2(args["intfId"], args["onuId"], args["uniId"], args["portNo"], logicalFlow, args["allocId"], gemPortID, vlanID[0], classifier, action)
+ f2(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], logicalFlow, args["allocId"], gemPortID, vlanID[0], classifier, action)
} else {
log.Errorw("Unrecognized Flow Type", log.Fields{"FlowType": FlowType})
return
@@ -2320,7 +2317,7 @@
}
}
-func (f *OpenOltFlowMgr) addDHCPTrapFlowOnNNI(logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) {
+func (f *OpenOltFlowMgr) addDHCPTrapFlowOnNNI(ctx context.Context, logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) {
log.Debug("Adding trap-dhcp-of-nni-flow")
action := make(map[string]interface{})
classifier[PacketTagType] = DoubleTag
@@ -2349,11 +2346,11 @@
}
flowStoreCookie := getFlowStoreCookie(classifier, uint32(0))
- if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
log.Debug("Flow-exists--not-re-adding")
return
}
- flowID, err := f.resourceMgr.GetFlowID(uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
+ flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
if err != nil {
log.Errorw("Flow id unavailable for DHCP traponNNI flow", log.Fields{"error": err})
return
@@ -2383,10 +2380,10 @@
Priority: int32(logicalFlow.Priority),
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(logicalFlow, &downstreamflow); ok {
+ if ok := f.addFlowToDevice(ctx, logicalFlow, &downstreamflow); ok {
log.Debug("DHCP trap on NNI flow added to device successfully")
- flowsToKVStore := f.getUpdatedFlowInfo(&downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(int32(networkInterfaceID),
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
int32(onuID),
int32(uniID),
flowID, flowsToKVStore); err != nil {
@@ -2423,7 +2420,7 @@
}
//addIgmpTrapFlowOnNNI adds a trap-to-host flow on NNI
-func (f *OpenOltFlowMgr) addIgmpTrapFlowOnNNI(logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) {
+func (f *OpenOltFlowMgr) addIgmpTrapFlowOnNNI(ctx context.Context, logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) {
log.Debugw("Adding igmp-trap-of-nni-flow", log.Fields{"classifierInfo": classifier})
action := make(map[string]interface{})
classifier[PacketTagType] = getPacketTypeFromClassifiers(classifier)
@@ -2449,11 +2446,11 @@
return
}
flowStoreCookie := getFlowStoreCookie(classifier, uint32(0))
- if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
log.Debug("igmp-flow-exists--not-re-adding")
return
}
- flowID, err := f.resourceMgr.GetFlowID(uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0, 0)
+ flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0, 0)
if err != nil {
log.Errorw("IGMP flow id unavailable for trap-on-NNI flow", log.Fields{"error": err})
return
@@ -2483,10 +2480,10 @@
Priority: int32(logicalFlow.Priority),
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(logicalFlow, &downstreamflow); ok {
+ if ok := f.addFlowToDevice(ctx, logicalFlow, &downstreamflow); ok {
log.Debug("IGMP Trap on NNI flow added to device successfully")
- flowsToKVStore := f.getUpdatedFlowInfo(&downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(int32(networkInterfaceID),
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
int32(onuID),
int32(uniID),
flowID, flowsToKVStore); err != nil {
@@ -2509,7 +2506,7 @@
return "", nil
}
-func (f *OpenOltFlowMgr) checkAndAddFlow(args map[string]uint32, classifierInfo map[string]interface{},
+func (f *OpenOltFlowMgr) checkAndAddFlow(ctx context.Context, args map[string]uint32, classifierInfo map[string]interface{},
actionInfo map[string]interface{}, flow *ofp.OfpFlowStats, TpInst *tp.TechProfile, gemPorts []uint32,
TpID uint32, uni string) {
var gemPort uint32
@@ -2526,10 +2523,10 @@
tp_pb.Direction_UPSTREAM,
pcp.(uint32))
//Adding DHCP upstream flow
- f.addDHCPTrapFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort)
+ f.addDHCPTrapFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort)
} else {
//Adding DHCP upstream flow to all gemports
- installFlowOnAllGemports(f.addDHCPTrapFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, DhcpFlow)
+ installFlowOnAllGemports(ctx, f.addDHCPTrapFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, DhcpFlow)
}
} else if ipProto == IgmpProto {
@@ -2538,10 +2535,10 @@
gemPort = f.techprofile[intfID].GetGemportIDForPbit(TpInst,
tp_pb.Direction_UPSTREAM,
pcp.(uint32))
- f.addIGMPTrapFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort)
+ f.addIGMPTrapFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort)
} else {
//Adding IGMP upstream flow to all gem ports
- installFlowOnAllGemports(f.addIGMPTrapFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, IgmpFlow)
+ installFlowOnAllGemports(ctx, f.addIGMPTrapFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, IgmpFlow)
}
} else {
log.Errorw("Invalid-Classifier-to-handle", log.Fields{"classifier": classifierInfo, "action": actionInfo})
@@ -2561,9 +2558,9 @@
tp_pb.Direction_UPSTREAM,
pcp.(uint32))
- f.addEAPOLFlow(intfID, onuID, uniID, portNo, flow, allocID, gemPort, vlanID, classifierInfo, actionInfo)
+ f.addEAPOLFlow(ctx, intfID, onuID, uniID, portNo, flow, allocID, gemPort, vlanID, classifierInfo, actionInfo)
} else {
- installFlowOnAllGemports(nil, f.addEAPOLFlow, args, classifierInfo, actionInfo, flow, gemPorts, EapolFlow, vlanID)
+ installFlowOnAllGemports(ctx, nil, f.addEAPOLFlow, args, classifierInfo, actionInfo, flow, gemPorts, EapolFlow, vlanID)
}
}
} else if _, ok := actionInfo[PushVlan]; ok {
@@ -2573,10 +2570,10 @@
tp_pb.Direction_UPSTREAM,
pcp.(uint32))
//Adding HSIA upstream flow
- f.addUpstreamDataFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort)
+ f.addUpstreamDataFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort)
} else {
//Adding HSIA upstream flow to all gemports
- installFlowOnAllGemports(f.addUpstreamDataFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, HsiaFlow)
+ installFlowOnAllGemports(ctx, f.addUpstreamDataFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, HsiaFlow)
}
} else if _, ok := actionInfo[PopVlan]; ok {
log.Info("Adding Downstream data rule")
@@ -2585,10 +2582,10 @@
tp_pb.Direction_DOWNSTREAM,
pcp.(uint32))
//Adding HSIA downstream flow
- f.addDownstreamDataFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort)
+ f.addDownstreamDataFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort)
} else {
//Adding HSIA downstream flow to all gemports
- installFlowOnAllGemports(f.addDownstreamDataFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, HsiaFlow)
+ installFlowOnAllGemports(ctx, f.addDownstreamDataFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, HsiaFlow)
}
} else {
log.Errorw("Invalid-flow-type-to-handle", log.Fields{"classifier": classifierInfo, "action": actionInfo, "flow": flow})
@@ -2606,8 +2603,8 @@
return false
}
-func (f *OpenOltFlowMgr) isTechProfileUsedByAnotherGem(ponIntf uint32, onuID uint32, uniID uint32, tpID uint32, tpInst *tp.TechProfile, gemPortID uint32) (bool, uint32) {
- currentGemPorts := f.resourceMgr.GetCurrentGEMPortIDsForOnu(ponIntf, onuID, uniID)
+func (f *OpenOltFlowMgr) isTechProfileUsedByAnotherGem(ctx context.Context, ponIntf uint32, onuID uint32, uniID uint32, tpID uint32, tpInst *tp.TechProfile, gemPortID uint32) (bool, uint32) {
+ currentGemPorts := f.resourceMgr.GetCurrentGEMPortIDsForOnu(ctx, ponIntf, onuID, uniID)
tpGemPorts := tpInst.UpstreamGemPortAttributeList
for _, currentGemPort := range currentGemPorts {
for _, tpGemPort := range tpGemPorts {
@@ -2618,14 +2615,14 @@
}
if tpInst.InstanceCtrl.Onu == "single-instance" {
// The TP information for the given TP ID, PON ID, ONU ID, UNI ID should be removed.
- f.resourceMgr.RemoveTechProfileIDForOnu(ponIntf, uint32(onuID), uint32(uniID), tpID)
- f.DeleteTechProfileInstance(ponIntf, uint32(onuID), uint32(uniID), "", tpID)
+ f.resourceMgr.RemoveTechProfileIDForOnu(ctx, ponIntf, uint32(onuID), uint32(uniID), tpID)
+ f.DeleteTechProfileInstance(ctx, ponIntf, uint32(onuID), uint32(uniID), "", tpID)
// Although we cleaned up TP Instance for the given (PON ID, ONU ID, UNI ID), the TP might
// still be used on other uni ports.
// So, we need to check and make sure that no other gem port is referring to the given TP ID
// on any other uni port.
- tpInstances := f.techprofile[ponIntf].FindAllTpInstances(tpID, ponIntf, onuID)
+ tpInstances := f.techprofile[ponIntf].FindAllTpInstances(ctx, tpID, ponIntf, onuID)
log.Debugw("got single instance tp instances", log.Fields{"tpInstances": tpInstances})
for i := 0; i < len(tpInstances); i++ {
tpI := tpInstances[i]
@@ -2839,7 +2836,7 @@
}
// UpdateGemPortForPktIn updates the gemport for packet-in in both the cache and the kv store.
-func (f *OpenOltFlowMgr) UpdateGemPortForPktIn(intfID uint32, onuID uint32, logicalPort uint32, gemPort uint32) {
+func (f *OpenOltFlowMgr) UpdateGemPortForPktIn(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32, gemPort uint32) {
pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: logicalPort}
f.lockCache.Lock()
@@ -2854,13 +2851,13 @@
}
f.packetInGemPort[pktInkey] = gemPort
- f.resourceMgr.UpdateGemPortForPktIn(pktInkey, gemPort)
+ f.resourceMgr.UpdateGemPortForPktIn(ctx, pktInkey, gemPort)
log.Debugw("pktin key not found in local cache or value is different. updating cache and kv store", log.Fields{"pktinkey": pktInkey, "gem": gemPort})
return
}
// AddUniPortToOnuInfo adds uni port to the onugem info both in cache and kvstore.
-func (f *OpenOltFlowMgr) AddUniPortToOnuInfo(intfID uint32, onuID uint32, portNum uint32) {
+func (f *OpenOltFlowMgr) AddUniPortToOnuInfo(ctx context.Context, intfID uint32, onuID uint32, portNum uint32) {
f.lockCache.Lock()
defer f.lockCache.Unlock()
@@ -2878,11 +2875,11 @@
f.onuGemInfo[intfID] = onugem
}
}
- f.resourceMgr.AddUniPortToOnuInfo(intfID, onuID, portNum)
+ f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNum)
}
-func (f *OpenOltFlowMgr) loadFlowIDlistForGem(intf uint32) {
- flowIDsList, err := f.resourceMgr.GetFlowIDsGemMapForInterface(intf)
+func (f *OpenOltFlowMgr) loadFlowIDlistForGem(ctx context.Context, intf uint32) {
+ flowIDsList, err := f.resourceMgr.GetFlowIDsGemMapForInterface(ctx, intf)
if err != nil {
log.Error("Failed to get flowid list per gem", log.Fields{"intf": intf})
return
@@ -2896,8 +2893,8 @@
//loadInterfaceToMulticastQueueMap reads multicast queues per interface from the KV store
//and puts them into interfaceToMcastQueueMap.
-func (f *OpenOltFlowMgr) loadInterfaceToMulticastQueueMap() {
- storedMulticastQueueMap, err := f.resourceMgr.GetMcastQueuePerInterfaceMap()
+func (f *OpenOltFlowMgr) loadInterfaceToMulticastQueueMap(ctx context.Context) {
+ storedMulticastQueueMap, err := f.resourceMgr.GetMcastQueuePerInterfaceMap(ctx)
if err != nil {
log.Error("Failed to get pon interface to multicast queue map")
return
@@ -2914,8 +2911,8 @@
//GetFlowGroupFromKVStore fetches and returns flow group from the KV store. Returns (nil, false, error) if any problem occurs during
//fetching the data. Returns (group, true, nil) if the group is fetched and returned successfully.
//Returns (nil, false, nil) if the group does not exist in the KV store.
-func (f *OpenOltFlowMgr) GetFlowGroupFromKVStore(groupID uint32, cached bool) (*ofp.OfpGroupEntry, bool, error) {
- exists, groupInfo, err := f.resourceMgr.GetFlowGroupFromKVStore(groupID, cached)
+func (f *OpenOltFlowMgr) GetFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) (*ofp.OfpGroupEntry, bool, error) {
+ exists, groupInfo, err := f.resourceMgr.GetFlowGroupFromKVStore(ctx, groupID, cached)
if err != nil {
log.Errorw("Failed to get the flow group from KV store", log.Fields{"groupId": groupID, "err": err})
return nil, false, errors.New("failed to retrieve the flow group")