[VOL-3069]Pass Context down the execution call hierarchy across voltha codebase
Change-Id: I97a2630d9a4fe5dc3161113539edda476534f186
diff --git a/rw_core/flowdecomposition/flow_decomposer.go b/rw_core/flowdecomposition/flow_decomposer.go
index f16477f..439c64d 100644
--- a/rw_core/flowdecomposition/flow_decomposer.go
+++ b/rw_core/flowdecomposition/flow_decomposer.go
@@ -37,7 +37,7 @@
}
// NewFlowDecomposer creates flow decomposer instance
-func NewFlowDecomposer(deviceMgr coreif.DeviceManager) *FlowDecomposer {
+func NewFlowDecomposer(ctx context.Context, deviceMgr coreif.DeviceManager) *FlowDecomposer {
var decomposer FlowDecomposer
decomposer.deviceMgr = deviceMgr
return &decomposer
@@ -45,7 +45,7 @@
//DecomposeRules decomposes per-device flows and flow-groups from the flows and groups defined on a logical device
func (fd *FlowDecomposer) DecomposeRules(ctx context.Context, agent coreif.LogicalDeviceAgent, flows ofp.Flows, groups ofp.FlowGroups) (*fu.DeviceRules, error) {
- deviceRules := *fu.NewDeviceRules()
+ deviceRules := *fu.NewDeviceRules(ctx)
devicesToUpdate := make(map[string]string)
groupMap := make(map[uint32]*ofp.OfpGroupEntry)
@@ -59,30 +59,30 @@
return nil, err
}
for deviceID, flowAndGroups := range decomposedRules.Rules {
- deviceRules.CreateEntryIfNotExist(deviceID)
- deviceRules.Rules[deviceID].AddFrom(flowAndGroups)
+ deviceRules.CreateEntryIfNotExist(ctx, deviceID)
+ deviceRules.Rules[deviceID].AddFrom(ctx, flowAndGroups)
devicesToUpdate[deviceID] = deviceID
}
}
- return deviceRules.FilterRules(devicesToUpdate), nil
+ return deviceRules.FilterRules(ctx, devicesToUpdate), nil
}
// Handles special case of any controller-bound flow for a parent device
-func (fd *FlowDecomposer) updateOutputPortForControllerBoundFlowForParentDevide(flow *ofp.OfpFlowStats,
+func (fd *FlowDecomposer) updateOutputPortForControllerBoundFlowForParentDevide(ctx context.Context, flow *ofp.OfpFlowStats,
dr *fu.DeviceRules) (*fu.DeviceRules, error) {
- EAPOL := fu.EthType(0x888e)
- IGMP := fu.IpProto(2)
- UDP := fu.IpProto(17)
+ EAPOL := fu.EthType(ctx, 0x888e)
+ IGMP := fu.IpProto(ctx, 2)
+ UDP := fu.IpProto(ctx, 17)
- newDeviceRules := dr.Copy()
+ newDeviceRules := dr.Copy(ctx)
// Check whether we are dealing with a parent device
- for deviceID, fg := range dr.GetRules() {
- if root, _ := fd.deviceMgr.IsRootDevice(deviceID); root {
- newDeviceRules.ClearFlows(deviceID)
+ for deviceID, fg := range dr.GetRules(ctx) {
+ if root, _ := fd.deviceMgr.IsRootDevice(ctx, deviceID); root {
+ newDeviceRules.ClearFlows(ctx, deviceID)
for i := 0; i < fg.Flows.Len(); i++ {
- f := fg.GetFlow(i)
+ f := fg.GetFlow(ctx, i)
UpdateOutPortNo := false
- for _, field := range fu.GetOfbFields(f) {
+ for _, field := range fu.GetOfbFields(ctx, f) {
UpdateOutPortNo = (field.String() == EAPOL.String())
UpdateOutPortNo = UpdateOutPortNo || (field.String() == IGMP.String())
UpdateOutPortNo = UpdateOutPortNo || (field.String() == UDP.String())
@@ -91,7 +91,7 @@
}
}
if UpdateOutPortNo {
- f = fu.UpdateOutputPortByActionType(f, uint32(ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS),
+ f = fu.UpdateOutputPortByActionType(ctx, f, uint32(ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS),
uint32(ofp.OfpPortNo_OFPP_CONTROLLER))
}
// Update flow Id as a change in the instruction field will result in a new flow ID
@@ -99,7 +99,7 @@
//if f.Id, err = fu.HashFlowStats(f); err != nil {
//return nil, err
//}
- newDeviceRules.AddFlow(deviceID, (proto.Clone(f)).(*ofp.OfpFlowStats))
+ newDeviceRules.AddFlow(ctx, deviceID, (proto.Clone(f)).(*ofp.OfpFlowStats))
}
}
}
@@ -112,34 +112,34 @@
inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) (*fu.DeviceRules, error) {
logger.Debugw("trap-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo, "flow": flow})
- deviceRules := fu.NewDeviceRules()
- meterID := fu.GetMeterIdFromFlow(flow)
- metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(flow)
+ deviceRules := fu.NewDeviceRules(ctx)
+ meterID := fu.GetMeterIdFromFlow(ctx, flow)
+ metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(ctx, flow)
ingressHop := path[0]
egressHop := path[1]
//case of packet_in from NNI port rule
- if agent.GetDeviceRoutes().IsRootPort(inPortNo) {
+ if agent.GetDeviceRoutes(ctx).IsRootPort(ctx, inPortNo) {
// Trap flow for NNI port
logger.Debug("trap-nni")
fa := &fu.FlowArgs{
KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
MatchFields: []*ofp.OfpOxmOfbField{
- fu.InPort(egressHop.Egress),
+ fu.InPort(ctx, egressHop.Egress),
},
- Actions: fu.GetActions(flow),
+ Actions: fu.GetActions(ctx, flow),
}
// Augment the matchfields with the ofpfields from the flow
- fg := fu.NewFlowsAndGroups()
- fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
- fs, err := fu.MkFlowStat(fa)
+ fg := fu.NewFlowsAndGroups(ctx)
+ fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(ctx, flow, fu.IN_PORT)...)
+ fs, err := fu.MkFlowStat(ctx, fa)
if err != nil {
return nil, err
}
- fg.AddFlow(fs)
- deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
+ fg.AddFlow(ctx, fs)
+ deviceRules.AddFlowsAndGroup(ctx, egressHop.DeviceID, fg)
} else {
// Trap flow for UNI port
logger.Debug("trap-uni")
@@ -147,7 +147,7 @@
//inPortNo is 0 for wildcard input case, do not include upstream port for controller bound flow in input
var inPorts []uint32
if inPortNo == 0 {
- inPorts = agent.GetWildcardInputPorts(egressHop.Egress) // exclude egress_hop.egress_port.port_no
+ inPorts = agent.GetWildcardInputPorts(ctx, egressHop.Egress) // exclude egress_hop.egress_port.port_no
} else {
inPorts = []uint32{inPortNo}
}
@@ -156,45 +156,45 @@
faParent := &fu.FlowArgs{
KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterID), "write_metadata": metadataFromwriteMetadata},
MatchFields: []*ofp.OfpOxmOfbField{
- fu.InPort(egressHop.Ingress),
- fu.TunnelId(uint64(inputPort)),
+ fu.InPort(ctx, egressHop.Ingress),
+ fu.TunnelId(ctx, uint64(inputPort)),
},
Actions: []*ofp.OfpAction{
- fu.Output(egressHop.Egress),
+ fu.Output(ctx, egressHop.Egress),
},
}
// Augment the matchfields with the ofpfields from the flow
- faParent.MatchFields = append(faParent.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
- fgParent := fu.NewFlowsAndGroups()
- fs, err := fu.MkFlowStat(faParent)
+ faParent.MatchFields = append(faParent.MatchFields, fu.GetOfbFields(ctx, flow, fu.IN_PORT)...)
+ fgParent := fu.NewFlowsAndGroups(ctx)
+ fs, err := fu.MkFlowStat(ctx, faParent)
if err != nil {
return nil, err
}
- fgParent.AddFlow(fs)
- deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fgParent)
+ fgParent.AddFlow(ctx, fs)
+ deviceRules.AddFlowsAndGroup(ctx, egressHop.DeviceID, fgParent)
logger.Debugw("parent-trap-flow-set", log.Fields{"flow": faParent})
// Upstream flow on child (onu) device
var actions []*ofp.OfpAction
- setvid := fu.GetVlanVid(flow)
+ setvid := fu.GetVlanVid(ctx, flow)
if setvid != nil {
// have this child push the vlan the parent is matching/trapping on above
actions = []*ofp.OfpAction{
- fu.PushVlan(0x8100),
- fu.SetField(fu.VlanVid(*setvid)),
- fu.Output(ingressHop.Egress),
+ fu.PushVlan(ctx, 0x8100),
+ fu.SetField(ctx, fu.VlanVid(ctx, *setvid)),
+ fu.Output(ctx, ingressHop.Egress),
}
} else {
// otherwise just set the egress port
actions = []*ofp.OfpAction{
- fu.Output(ingressHop.Egress),
+ fu.Output(ctx, ingressHop.Egress),
}
}
faChild := &fu.FlowArgs{
KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterID), "write_metadata": metadataFromwriteMetadata},
MatchFields: []*ofp.OfpOxmOfbField{
- fu.InPort(ingressHop.Ingress),
- fu.TunnelId(uint64(inputPort)),
+ fu.InPort(ctx, ingressHop.Ingress),
+ fu.TunnelId(ctx, uint64(inputPort)),
},
Actions: actions,
}
@@ -202,17 +202,17 @@
// If the parent has a match vid and the child is setting that match vid exclude the the match vlan
// for the child given it will be setting that vlan and the parent will be matching on it
if setvid != nil {
- faChild.MatchFields = append(faChild.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT, fu.VLAN_VID)...)
+ faChild.MatchFields = append(faChild.MatchFields, fu.GetOfbFields(ctx, flow, fu.IN_PORT, fu.VLAN_VID)...)
} else {
- faChild.MatchFields = append(faChild.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
+ faChild.MatchFields = append(faChild.MatchFields, fu.GetOfbFields(ctx, flow, fu.IN_PORT)...)
}
- fgChild := fu.NewFlowsAndGroups()
- fs, err = fu.MkFlowStat(faChild)
+ fgChild := fu.NewFlowsAndGroups(ctx)
+ fs, err = fu.MkFlowStat(ctx, faChild)
if err != nil {
return nil, err
}
- fgChild.AddFlow(fs)
- deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fgChild)
+ fgChild.AddFlow(ctx, fs)
+ deviceRules.AddFlowsAndGroup(ctx, ingressHop.DeviceID, fgChild)
logger.Debugw("child-trap-flow-set", log.Fields{"flow": faChild})
}
}
@@ -228,15 +228,15 @@
path []route.Hop, inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) (*fu.DeviceRules, error) {
logger.Debugw("upstream-non-controller-bound-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
- deviceRules := fu.NewDeviceRules()
+ deviceRules := fu.NewDeviceRules(ctx)
- meterID := fu.GetMeterIdFromFlow(flow)
- metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(flow)
+ meterID := fu.GetMeterIdFromFlow(ctx, flow)
+ metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(ctx, flow)
ingressHop := path[0]
egressHop := path[1]
- if flow.TableId == 0 && fu.HasNextTable(flow) {
+ if flow.TableId == 0 && fu.HasNextTable(ctx, flow) {
logger.Debugw("decomposing-onu-flow-in-upstream-has-next-table", log.Fields{"table_id": flow.TableId})
if outPortNo != 0 {
logger.Warnw("outPort-should-not-be-specified", log.Fields{"outPortNo": outPortNo})
@@ -245,48 +245,48 @@
fa := &fu.FlowArgs{
KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterID), "write_metadata": metadataFromwriteMetadata},
MatchFields: []*ofp.OfpOxmOfbField{
- fu.InPort(ingressHop.Ingress),
- fu.TunnelId(uint64(inPortNo)),
+ fu.InPort(ctx, ingressHop.Ingress),
+ fu.TunnelId(ctx, uint64(inPortNo)),
},
- Actions: fu.GetActions(flow),
+ Actions: fu.GetActions(ctx, flow),
}
// Augment the matchfields with the ofpfields from the flow
- fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
+ fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(ctx, flow, fu.IN_PORT)...)
// Augment the Actions
- fa.Actions = append(fa.Actions, fu.Output(ingressHop.Egress))
+ fa.Actions = append(fa.Actions, fu.Output(ctx, ingressHop.Egress))
- fg := fu.NewFlowsAndGroups()
- fs, err := fu.MkFlowStat(fa)
+ fg := fu.NewFlowsAndGroups(ctx)
+ fs, err := fu.MkFlowStat(ctx, fa)
if err != nil {
return nil, err
}
- fg.AddFlow(fs)
- deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
+ fg.AddFlow(ctx, fs)
+ deviceRules.AddFlowsAndGroup(ctx, ingressHop.DeviceID, fg)
} else if flow.TableId == 1 && outPortNo != 0 {
logger.Debugw("decomposing-olt-flow-in-upstream-has-next-table", log.Fields{"table_id": flow.TableId})
fa := &fu.FlowArgs{
KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterID), "write_metadata": metadataFromwriteMetadata},
MatchFields: []*ofp.OfpOxmOfbField{
- fu.InPort(egressHop.Ingress),
- fu.TunnelId(uint64(inPortNo)),
+ fu.InPort(ctx, egressHop.Ingress),
+ fu.TunnelId(ctx, uint64(inPortNo)),
},
}
// Augment the matchfields with the ofpfields from the flow
- fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
+ fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(ctx, flow, fu.IN_PORT)...)
//Augment the actions
- filteredAction := fu.GetActions(flow, fu.OUTPUT)
- filteredAction = append(filteredAction, fu.Output(egressHop.Egress))
+ filteredAction := fu.GetActions(ctx, flow, fu.OUTPUT)
+ filteredAction = append(filteredAction, fu.Output(ctx, egressHop.Egress))
fa.Actions = filteredAction
- fg := fu.NewFlowsAndGroups()
- fs, err := fu.MkFlowStat(fa)
+ fg := fu.NewFlowsAndGroups(ctx)
+ fs, err := fu.MkFlowStat(ctx, fa)
if err != nil {
return nil, err
}
- fg.AddFlow(fs)
- deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
+ fg.AddFlow(ctx, fs)
+ deviceRules.AddFlowsAndGroup(ctx, egressHop.DeviceID, fg)
}
return deviceRules, nil
}
@@ -295,9 +295,9 @@
func (fd *FlowDecomposer) processDownstreamFlowWithNextTable(ctx context.Context, agent coreif.LogicalDeviceAgent, path []route.Hop,
inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) (*fu.DeviceRules, error) {
logger.Debugw("decomposing-olt-flow-in-downstream-flow-with-next-table", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
- deviceRules := fu.NewDeviceRules()
- meterID := fu.GetMeterIdFromFlow(flow)
- metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(flow)
+ deviceRules := fu.NewDeviceRules(ctx)
+ meterID := fu.GetMeterIdFromFlow(ctx, flow)
+ metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(ctx, flow)
if outPortNo != 0 {
logger.Warnw("outPort-should-not-be-specified", log.Fields{"outPortNo": outPortNo})
@@ -313,7 +313,7 @@
egressHop := path[1]
if metadataFromwriteMetadata != 0 {
logger.Debugw("creating-metadata-flow", log.Fields{"flow": flow})
- portNumber := fu.GetEgressPortNumberFromWriteMetadata(flow)
+ portNumber := fu.GetEgressPortNumberFromWriteMetadata(ctx, flow)
if portNumber != 0 {
recalculatedRoute, err := agent.GetRoute(ctx, inPortNo, portNumber)
if err != nil {
@@ -333,7 +333,7 @@
}
ingressHop = recalculatedRoute[0]
}
- innerTag := fu.GetInnerTagFromMetaData(flow)
+ innerTag := fu.GetInnerTagFromMetaData(ctx, flow)
if innerTag == 0 {
logger.Errorw("no-inner-route-double-tag", log.Fields{"inPortNo": inPortNo, "outPortNo": portNumber, "comment": "deleting-flow", "metadata": metadataFromwriteMetadata})
//TODO: Delete flow
@@ -342,48 +342,48 @@
fa := &fu.FlowArgs{
KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterID), "write_metadata": metadataFromwriteMetadata},
MatchFields: []*ofp.OfpOxmOfbField{
- fu.InPort(ingressHop.Ingress),
- fu.Metadata_ofp(uint64(innerTag)),
- fu.TunnelId(uint64(portNumber)),
+ fu.InPort(ctx, ingressHop.Ingress),
+ fu.Metadata_ofp(ctx, uint64(innerTag)),
+ fu.TunnelId(ctx, uint64(portNumber)),
},
- Actions: fu.GetActions(flow),
+ Actions: fu.GetActions(ctx, flow),
}
// Augment the matchfields with the ofpfields from the flow
- fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT, fu.METADATA)...)
+ fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(ctx, flow, fu.IN_PORT, fu.METADATA)...)
// Augment the Actions
- fa.Actions = append(fa.Actions, fu.Output(ingressHop.Egress))
+ fa.Actions = append(fa.Actions, fu.Output(ctx, ingressHop.Egress))
- fg := fu.NewFlowsAndGroups()
- fs, err := fu.MkFlowStat(fa)
+ fg := fu.NewFlowsAndGroups(ctx)
+ fs, err := fu.MkFlowStat(ctx, fa)
if err != nil {
return nil, err
}
- fg.AddFlow(fs)
- deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
+ fg.AddFlow(ctx, fs)
+ deviceRules.AddFlowsAndGroup(ctx, ingressHop.DeviceID, fg)
} else { // Create standard flow
logger.Debugw("creating-standard-flow", log.Fields{"flow": flow})
fa := &fu.FlowArgs{
KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterID), "write_metadata": metadataFromwriteMetadata},
MatchFields: []*ofp.OfpOxmOfbField{
- fu.InPort(ingressHop.Ingress),
- fu.TunnelId(uint64(inPortNo)),
+ fu.InPort(ctx, ingressHop.Ingress),
+ fu.TunnelId(ctx, uint64(inPortNo)),
},
- Actions: fu.GetActions(flow),
+ Actions: fu.GetActions(ctx, flow),
}
// Augment the matchfields with the ofpfields from the flow
- fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
+ fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(ctx, flow, fu.IN_PORT)...)
// Augment the Actions
- fa.Actions = append(fa.Actions, fu.Output(ingressHop.Egress))
+ fa.Actions = append(fa.Actions, fu.Output(ctx, ingressHop.Egress))
- fg := fu.NewFlowsAndGroups()
- fs, err := fu.MkFlowStat(fa)
+ fg := fu.NewFlowsAndGroups(ctx)
+ fs, err := fu.MkFlowStat(ctx, fa)
if err != nil {
return nil, err
}
- fg.AddFlow(fs)
- deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
+ fg.AddFlow(ctx, fs)
+ deviceRules.AddFlowsAndGroup(ctx, ingressHop.DeviceID, fg)
}
return deviceRules, nil
@@ -394,33 +394,33 @@
inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) (*fu.DeviceRules, error) {
logger.Debugw("decomposing-onu-flow-in-downstream-unicast-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
- deviceRules := fu.NewDeviceRules()
+ deviceRules := fu.NewDeviceRules(ctx)
egressHop := path[1]
- meterID := fu.GetMeterIdFromFlow(flow)
- metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(flow)
+ meterID := fu.GetMeterIdFromFlow(ctx, flow)
+ metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(ctx, flow)
fa := &fu.FlowArgs{
KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterID), "write_metadata": metadataFromwriteMetadata},
MatchFields: []*ofp.OfpOxmOfbField{
- fu.InPort(egressHop.Ingress),
+ fu.InPort(ctx, egressHop.Ingress),
},
}
// Augment the matchfields with the ofpfields from the flow
- fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
+ fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(ctx, flow, fu.IN_PORT)...)
// Augment the Actions
- filteredAction := fu.GetActions(flow, fu.OUTPUT)
- filteredAction = append(filteredAction, fu.Output(egressHop.Egress))
+ filteredAction := fu.GetActions(ctx, flow, fu.OUTPUT)
+ filteredAction = append(filteredAction, fu.Output(ctx, egressHop.Egress))
fa.Actions = filteredAction
- fg := fu.NewFlowsAndGroups()
- fs, err := fu.MkFlowStat(fa)
+ fg := fu.NewFlowsAndGroups(ctx)
+ fs, err := fu.MkFlowStat(ctx, fa)
if err != nil {
return nil, err
}
- fg.AddFlow(fs)
- deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
+ fg.AddFlow(ctx, fs)
+ deviceRules.AddFlowsAndGroup(ctx, egressHop.DeviceID, fg)
return deviceRules, nil
}
@@ -430,7 +430,7 @@
groupMap map[uint32]*ofp.OfpGroupEntry) *fu.DeviceRules {
logger.Debugw("multicast-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
- deviceRules := fu.NewDeviceRules()
+ deviceRules := fu.NewDeviceRules(ctx)
//having no Group yet is the same as having a Group with no buckets
var grp *ofp.OfpGroupEntry
@@ -444,11 +444,11 @@
return deviceRules
}
- deviceRules.CreateEntryIfNotExist(path[0].DeviceID)
- fg := fu.NewFlowsAndGroups()
- fg.AddFlow(flow)
+ deviceRules.CreateEntryIfNotExist(ctx, path[0].DeviceID)
+ fg := fu.NewFlowsAndGroups(ctx)
+ fg.AddFlow(ctx, flow)
//return the multicast flow without decomposing it
- deviceRules.AddFlowsAndGroup(path[0].DeviceID, fg)
+ deviceRules.AddFlowsAndGroup(ctx, path[0].DeviceID, fg)
return deviceRules
}
@@ -456,18 +456,18 @@
func (fd *FlowDecomposer) decomposeFlow(ctx context.Context, agent coreif.LogicalDeviceAgent, flow *ofp.OfpFlowStats,
groupMap map[uint32]*ofp.OfpGroupEntry) (*fu.DeviceRules, error) {
- inPortNo := fu.GetInPort(flow)
- if fu.HasGroup(flow) && inPortNo == 0 {
+ inPortNo := fu.GetInPort(ctx, flow)
+ if fu.HasGroup(ctx, flow) && inPortNo == 0 {
//if no in-port specified for a multicast flow, put NNI port as in-port
//so that a valid path can be found for the flow
- nniPorts := agent.GetNNIPorts()
+ nniPorts := agent.GetNNIPorts(ctx)
if len(nniPorts) > 0 {
inPortNo = nniPorts[0]
logger.Debugw("assigning-nni-port-as-in-port-for-multicast-flow", log.Fields{"nni": nniPorts[0], "flow:": flow})
}
}
- outPortNo := fu.GetOutPort(flow)
- deviceRules := fu.NewDeviceRules()
+ outPortNo := fu.GetOutPort(ctx, flow)
+ deviceRules := fu.NewDeviceRules(ctx)
path, err := agent.GetRoute(ctx, inPortNo, outPortNo)
if err != nil {
return deviceRules, err
@@ -503,7 +503,7 @@
if err != nil {
return nil, err
}
- } else if fu.HasNextTable(flow) && flow.TableId == 0 { // Unicast OLT flow DL
+ } else if fu.HasNextTable(ctx, flow) && flow.TableId == 0 { // Unicast OLT flow DL
logger.Debugw("process-olt-downstream-noncontrollerbound-flow-with-nexttable", log.Fields{"flows": flow})
deviceRules, err = fd.processDownstreamFlowWithNextTable(ctx, agent, path, inPortNo, outPortNo, flow)
if err != nil {
@@ -515,13 +515,13 @@
if err != nil {
return nil, err
}
- } else if grpID := fu.GetGroup(flow); grpID != 0 && flow.TableId == 0 { //Multicast
+ } else if grpID := fu.GetGroup(ctx, flow); grpID != 0 && flow.TableId == 0 { //Multicast
logger.Debugw("process-multicast-flow", log.Fields{"flows": flow})
deviceRules = fd.processMulticastFlow(ctx, path, inPortNo, outPortNo, flow, grpID, groupMap)
} else {
return deviceRules, status.Errorf(codes.Aborted, "unknown downstream flow %v", *flow)
}
}
- deviceRules, err = fd.updateOutputPortForControllerBoundFlowForParentDevide(flow, deviceRules)
+ deviceRules, err = fd.updateOutputPortForControllerBoundFlowForParentDevide(ctx, flow, deviceRules)
return deviceRules, err
}