[VOL-3069]Pass Context down the execution call hierarchy across voltha codebase

Change-Id: I97a2630d9a4fe5dc3161113539edda476534f186
diff --git a/rw_core/flowdecomposition/flow_decomposer.go b/rw_core/flowdecomposition/flow_decomposer.go
index f16477f..439c64d 100644
--- a/rw_core/flowdecomposition/flow_decomposer.go
+++ b/rw_core/flowdecomposition/flow_decomposer.go
@@ -37,7 +37,7 @@
 }
 
 // NewFlowDecomposer creates flow decomposer instance
-func NewFlowDecomposer(deviceMgr coreif.DeviceManager) *FlowDecomposer {
+func NewFlowDecomposer(ctx context.Context, deviceMgr coreif.DeviceManager) *FlowDecomposer {
 	var decomposer FlowDecomposer
 	decomposer.deviceMgr = deviceMgr
 	return &decomposer
@@ -45,7 +45,7 @@
 
 //DecomposeRules decomposes per-device flows and flow-groups from the flows and groups defined on a logical device
 func (fd *FlowDecomposer) DecomposeRules(ctx context.Context, agent coreif.LogicalDeviceAgent, flows ofp.Flows, groups ofp.FlowGroups) (*fu.DeviceRules, error) {
-	deviceRules := *fu.NewDeviceRules()
+	deviceRules := *fu.NewDeviceRules(ctx)
 	devicesToUpdate := make(map[string]string)
 
 	groupMap := make(map[uint32]*ofp.OfpGroupEntry)
@@ -59,30 +59,30 @@
 			return nil, err
 		}
 		for deviceID, flowAndGroups := range decomposedRules.Rules {
-			deviceRules.CreateEntryIfNotExist(deviceID)
-			deviceRules.Rules[deviceID].AddFrom(flowAndGroups)
+			deviceRules.CreateEntryIfNotExist(ctx, deviceID)
+			deviceRules.Rules[deviceID].AddFrom(ctx, flowAndGroups)
 			devicesToUpdate[deviceID] = deviceID
 		}
 	}
-	return deviceRules.FilterRules(devicesToUpdate), nil
+	return deviceRules.FilterRules(ctx, devicesToUpdate), nil
 }
 
 // Handles special case of any controller-bound flow for a parent device
-func (fd *FlowDecomposer) updateOutputPortForControllerBoundFlowForParentDevide(flow *ofp.OfpFlowStats,
+func (fd *FlowDecomposer) updateOutputPortForControllerBoundFlowForParentDevide(ctx context.Context, flow *ofp.OfpFlowStats,
 	dr *fu.DeviceRules) (*fu.DeviceRules, error) {
-	EAPOL := fu.EthType(0x888e)
-	IGMP := fu.IpProto(2)
-	UDP := fu.IpProto(17)
+	EAPOL := fu.EthType(ctx, 0x888e)
+	IGMP := fu.IpProto(ctx, 2)
+	UDP := fu.IpProto(ctx, 17)
 
-	newDeviceRules := dr.Copy()
+	newDeviceRules := dr.Copy(ctx)
 	//	Check whether we are dealing with a parent device
-	for deviceID, fg := range dr.GetRules() {
-		if root, _ := fd.deviceMgr.IsRootDevice(deviceID); root {
-			newDeviceRules.ClearFlows(deviceID)
+	for deviceID, fg := range dr.GetRules(ctx) {
+		if root, _ := fd.deviceMgr.IsRootDevice(ctx, deviceID); root {
+			newDeviceRules.ClearFlows(ctx, deviceID)
 			for i := 0; i < fg.Flows.Len(); i++ {
-				f := fg.GetFlow(i)
+				f := fg.GetFlow(ctx, i)
 				UpdateOutPortNo := false
-				for _, field := range fu.GetOfbFields(f) {
+				for _, field := range fu.GetOfbFields(ctx, f) {
 					UpdateOutPortNo = (field.String() == EAPOL.String())
 					UpdateOutPortNo = UpdateOutPortNo || (field.String() == IGMP.String())
 					UpdateOutPortNo = UpdateOutPortNo || (field.String() == UDP.String())
@@ -91,7 +91,7 @@
 					}
 				}
 				if UpdateOutPortNo {
-					f = fu.UpdateOutputPortByActionType(f, uint32(ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS),
+					f = fu.UpdateOutputPortByActionType(ctx, f, uint32(ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS),
 						uint32(ofp.OfpPortNo_OFPP_CONTROLLER))
 				}
 				// Update flow Id as a change in the instruction field will result in a new flow ID
@@ -99,7 +99,7 @@
 				//if f.Id, err = fu.HashFlowStats(f); err != nil {
 				//return nil, err
 				//}
-				newDeviceRules.AddFlow(deviceID, (proto.Clone(f)).(*ofp.OfpFlowStats))
+				newDeviceRules.AddFlow(ctx, deviceID, (proto.Clone(f)).(*ofp.OfpFlowStats))
 			}
 		}
 	}
@@ -112,34 +112,34 @@
 	inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) (*fu.DeviceRules, error) {
 
 	logger.Debugw("trap-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo, "flow": flow})
-	deviceRules := fu.NewDeviceRules()
-	meterID := fu.GetMeterIdFromFlow(flow)
-	metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(flow)
+	deviceRules := fu.NewDeviceRules(ctx)
+	meterID := fu.GetMeterIdFromFlow(ctx, flow)
+	metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(ctx, flow)
 
 	ingressHop := path[0]
 	egressHop := path[1]
 
 	//case of packet_in from NNI port rule
-	if agent.GetDeviceRoutes().IsRootPort(inPortNo) {
+	if agent.GetDeviceRoutes(ctx).IsRootPort(ctx, inPortNo) {
 		// Trap flow for NNI port
 		logger.Debug("trap-nni")
 
 		fa := &fu.FlowArgs{
 			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
 			MatchFields: []*ofp.OfpOxmOfbField{
-				fu.InPort(egressHop.Egress),
+				fu.InPort(ctx, egressHop.Egress),
 			},
-			Actions: fu.GetActions(flow),
+			Actions: fu.GetActions(ctx, flow),
 		}
 		// Augment the matchfields with the ofpfields from the flow
-		fg := fu.NewFlowsAndGroups()
-		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
-		fs, err := fu.MkFlowStat(fa)
+		fg := fu.NewFlowsAndGroups(ctx)
+		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(ctx, flow, fu.IN_PORT)...)
+		fs, err := fu.MkFlowStat(ctx, fa)
 		if err != nil {
 			return nil, err
 		}
-		fg.AddFlow(fs)
-		deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
+		fg.AddFlow(ctx, fs)
+		deviceRules.AddFlowsAndGroup(ctx, egressHop.DeviceID, fg)
 	} else {
 		// Trap flow for UNI port
 		logger.Debug("trap-uni")
@@ -147,7 +147,7 @@
 		//inPortNo is 0 for wildcard input case, do not include upstream port for controller bound flow in input
 		var inPorts []uint32
 		if inPortNo == 0 {
-			inPorts = agent.GetWildcardInputPorts(egressHop.Egress) // exclude egress_hop.egress_port.port_no
+			inPorts = agent.GetWildcardInputPorts(ctx, egressHop.Egress) // exclude egress_hop.egress_port.port_no
 		} else {
 			inPorts = []uint32{inPortNo}
 		}
@@ -156,45 +156,45 @@
 			faParent := &fu.FlowArgs{
 				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterID), "write_metadata": metadataFromwriteMetadata},
 				MatchFields: []*ofp.OfpOxmOfbField{
-					fu.InPort(egressHop.Ingress),
-					fu.TunnelId(uint64(inputPort)),
+					fu.InPort(ctx, egressHop.Ingress),
+					fu.TunnelId(ctx, uint64(inputPort)),
 				},
 				Actions: []*ofp.OfpAction{
-					fu.Output(egressHop.Egress),
+					fu.Output(ctx, egressHop.Egress),
 				},
 			}
 			// Augment the matchfields with the ofpfields from the flow
-			faParent.MatchFields = append(faParent.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
-			fgParent := fu.NewFlowsAndGroups()
-			fs, err := fu.MkFlowStat(faParent)
+			faParent.MatchFields = append(faParent.MatchFields, fu.GetOfbFields(ctx, flow, fu.IN_PORT)...)
+			fgParent := fu.NewFlowsAndGroups(ctx)
+			fs, err := fu.MkFlowStat(ctx, faParent)
 			if err != nil {
 				return nil, err
 			}
-			fgParent.AddFlow(fs)
-			deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fgParent)
+			fgParent.AddFlow(ctx, fs)
+			deviceRules.AddFlowsAndGroup(ctx, egressHop.DeviceID, fgParent)
 			logger.Debugw("parent-trap-flow-set", log.Fields{"flow": faParent})
 
 			// Upstream flow on child (onu) device
 			var actions []*ofp.OfpAction
-			setvid := fu.GetVlanVid(flow)
+			setvid := fu.GetVlanVid(ctx, flow)
 			if setvid != nil {
 				// have this child push the vlan the parent is matching/trapping on above
 				actions = []*ofp.OfpAction{
-					fu.PushVlan(0x8100),
-					fu.SetField(fu.VlanVid(*setvid)),
-					fu.Output(ingressHop.Egress),
+					fu.PushVlan(ctx, 0x8100),
+					fu.SetField(ctx, fu.VlanVid(ctx, *setvid)),
+					fu.Output(ctx, ingressHop.Egress),
 				}
 			} else {
 				// otherwise just set the egress port
 				actions = []*ofp.OfpAction{
-					fu.Output(ingressHop.Egress),
+					fu.Output(ctx, ingressHop.Egress),
 				}
 			}
 			faChild := &fu.FlowArgs{
 				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterID), "write_metadata": metadataFromwriteMetadata},
 				MatchFields: []*ofp.OfpOxmOfbField{
-					fu.InPort(ingressHop.Ingress),
-					fu.TunnelId(uint64(inputPort)),
+					fu.InPort(ctx, ingressHop.Ingress),
+					fu.TunnelId(ctx, uint64(inputPort)),
 				},
 				Actions: actions,
 			}
@@ -202,17 +202,17 @@
 			// If the parent has a match vid and the child is setting that match vid exclude the the match vlan
 			// for the child given it will be setting that vlan and the parent will be matching on it
 			if setvid != nil {
-				faChild.MatchFields = append(faChild.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT, fu.VLAN_VID)...)
+				faChild.MatchFields = append(faChild.MatchFields, fu.GetOfbFields(ctx, flow, fu.IN_PORT, fu.VLAN_VID)...)
 			} else {
-				faChild.MatchFields = append(faChild.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
+				faChild.MatchFields = append(faChild.MatchFields, fu.GetOfbFields(ctx, flow, fu.IN_PORT)...)
 			}
-			fgChild := fu.NewFlowsAndGroups()
-			fs, err = fu.MkFlowStat(faChild)
+			fgChild := fu.NewFlowsAndGroups(ctx)
+			fs, err = fu.MkFlowStat(ctx, faChild)
 			if err != nil {
 				return nil, err
 			}
-			fgChild.AddFlow(fs)
-			deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fgChild)
+			fgChild.AddFlow(ctx, fs)
+			deviceRules.AddFlowsAndGroup(ctx, ingressHop.DeviceID, fgChild)
 			logger.Debugw("child-trap-flow-set", log.Fields{"flow": faChild})
 		}
 	}
@@ -228,15 +228,15 @@
 	path []route.Hop, inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) (*fu.DeviceRules, error) {
 
 	logger.Debugw("upstream-non-controller-bound-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
-	deviceRules := fu.NewDeviceRules()
+	deviceRules := fu.NewDeviceRules(ctx)
 
-	meterID := fu.GetMeterIdFromFlow(flow)
-	metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(flow)
+	meterID := fu.GetMeterIdFromFlow(ctx, flow)
+	metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(ctx, flow)
 
 	ingressHop := path[0]
 	egressHop := path[1]
 
-	if flow.TableId == 0 && fu.HasNextTable(flow) {
+	if flow.TableId == 0 && fu.HasNextTable(ctx, flow) {
 		logger.Debugw("decomposing-onu-flow-in-upstream-has-next-table", log.Fields{"table_id": flow.TableId})
 		if outPortNo != 0 {
 			logger.Warnw("outPort-should-not-be-specified", log.Fields{"outPortNo": outPortNo})
@@ -245,48 +245,48 @@
 		fa := &fu.FlowArgs{
 			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterID), "write_metadata": metadataFromwriteMetadata},
 			MatchFields: []*ofp.OfpOxmOfbField{
-				fu.InPort(ingressHop.Ingress),
-				fu.TunnelId(uint64(inPortNo)),
+				fu.InPort(ctx, ingressHop.Ingress),
+				fu.TunnelId(ctx, uint64(inPortNo)),
 			},
-			Actions: fu.GetActions(flow),
+			Actions: fu.GetActions(ctx, flow),
 		}
 		// Augment the matchfields with the ofpfields from the flow
-		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
+		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(ctx, flow, fu.IN_PORT)...)
 
 		// Augment the Actions
-		fa.Actions = append(fa.Actions, fu.Output(ingressHop.Egress))
+		fa.Actions = append(fa.Actions, fu.Output(ctx, ingressHop.Egress))
 
-		fg := fu.NewFlowsAndGroups()
-		fs, err := fu.MkFlowStat(fa)
+		fg := fu.NewFlowsAndGroups(ctx)
+		fs, err := fu.MkFlowStat(ctx, fa)
 		if err != nil {
 			return nil, err
 		}
-		fg.AddFlow(fs)
-		deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
+		fg.AddFlow(ctx, fs)
+		deviceRules.AddFlowsAndGroup(ctx, ingressHop.DeviceID, fg)
 	} else if flow.TableId == 1 && outPortNo != 0 {
 		logger.Debugw("decomposing-olt-flow-in-upstream-has-next-table", log.Fields{"table_id": flow.TableId})
 		fa := &fu.FlowArgs{
 			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterID), "write_metadata": metadataFromwriteMetadata},
 			MatchFields: []*ofp.OfpOxmOfbField{
-				fu.InPort(egressHop.Ingress),
-				fu.TunnelId(uint64(inPortNo)),
+				fu.InPort(ctx, egressHop.Ingress),
+				fu.TunnelId(ctx, uint64(inPortNo)),
 			},
 		}
 		// Augment the matchfields with the ofpfields from the flow
-		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
+		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(ctx, flow, fu.IN_PORT)...)
 
 		//Augment the actions
-		filteredAction := fu.GetActions(flow, fu.OUTPUT)
-		filteredAction = append(filteredAction, fu.Output(egressHop.Egress))
+		filteredAction := fu.GetActions(ctx, flow, fu.OUTPUT)
+		filteredAction = append(filteredAction, fu.Output(ctx, egressHop.Egress))
 		fa.Actions = filteredAction
 
-		fg := fu.NewFlowsAndGroups()
-		fs, err := fu.MkFlowStat(fa)
+		fg := fu.NewFlowsAndGroups(ctx)
+		fs, err := fu.MkFlowStat(ctx, fa)
 		if err != nil {
 			return nil, err
 		}
-		fg.AddFlow(fs)
-		deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
+		fg.AddFlow(ctx, fs)
+		deviceRules.AddFlowsAndGroup(ctx, egressHop.DeviceID, fg)
 	}
 	return deviceRules, nil
 }
@@ -295,9 +295,9 @@
 func (fd *FlowDecomposer) processDownstreamFlowWithNextTable(ctx context.Context, agent coreif.LogicalDeviceAgent, path []route.Hop,
 	inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) (*fu.DeviceRules, error) {
 	logger.Debugw("decomposing-olt-flow-in-downstream-flow-with-next-table", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
-	deviceRules := fu.NewDeviceRules()
-	meterID := fu.GetMeterIdFromFlow(flow)
-	metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(flow)
+	deviceRules := fu.NewDeviceRules(ctx)
+	meterID := fu.GetMeterIdFromFlow(ctx, flow)
+	metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(ctx, flow)
 
 	if outPortNo != 0 {
 		logger.Warnw("outPort-should-not-be-specified", log.Fields{"outPortNo": outPortNo})
@@ -313,7 +313,7 @@
 	egressHop := path[1]
 	if metadataFromwriteMetadata != 0 {
 		logger.Debugw("creating-metadata-flow", log.Fields{"flow": flow})
-		portNumber := fu.GetEgressPortNumberFromWriteMetadata(flow)
+		portNumber := fu.GetEgressPortNumberFromWriteMetadata(ctx, flow)
 		if portNumber != 0 {
 			recalculatedRoute, err := agent.GetRoute(ctx, inPortNo, portNumber)
 			if err != nil {
@@ -333,7 +333,7 @@
 			}
 			ingressHop = recalculatedRoute[0]
 		}
-		innerTag := fu.GetInnerTagFromMetaData(flow)
+		innerTag := fu.GetInnerTagFromMetaData(ctx, flow)
 		if innerTag == 0 {
 			logger.Errorw("no-inner-route-double-tag", log.Fields{"inPortNo": inPortNo, "outPortNo": portNumber, "comment": "deleting-flow", "metadata": metadataFromwriteMetadata})
 			//TODO: Delete flow
@@ -342,48 +342,48 @@
 		fa := &fu.FlowArgs{
 			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterID), "write_metadata": metadataFromwriteMetadata},
 			MatchFields: []*ofp.OfpOxmOfbField{
-				fu.InPort(ingressHop.Ingress),
-				fu.Metadata_ofp(uint64(innerTag)),
-				fu.TunnelId(uint64(portNumber)),
+				fu.InPort(ctx, ingressHop.Ingress),
+				fu.Metadata_ofp(ctx, uint64(innerTag)),
+				fu.TunnelId(ctx, uint64(portNumber)),
 			},
-			Actions: fu.GetActions(flow),
+			Actions: fu.GetActions(ctx, flow),
 		}
 		// Augment the matchfields with the ofpfields from the flow
-		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT, fu.METADATA)...)
+		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(ctx, flow, fu.IN_PORT, fu.METADATA)...)
 
 		// Augment the Actions
-		fa.Actions = append(fa.Actions, fu.Output(ingressHop.Egress))
+		fa.Actions = append(fa.Actions, fu.Output(ctx, ingressHop.Egress))
 
-		fg := fu.NewFlowsAndGroups()
-		fs, err := fu.MkFlowStat(fa)
+		fg := fu.NewFlowsAndGroups(ctx)
+		fs, err := fu.MkFlowStat(ctx, fa)
 		if err != nil {
 			return nil, err
 		}
-		fg.AddFlow(fs)
-		deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
+		fg.AddFlow(ctx, fs)
+		deviceRules.AddFlowsAndGroup(ctx, ingressHop.DeviceID, fg)
 	} else { // Create standard flow
 		logger.Debugw("creating-standard-flow", log.Fields{"flow": flow})
 		fa := &fu.FlowArgs{
 			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterID), "write_metadata": metadataFromwriteMetadata},
 			MatchFields: []*ofp.OfpOxmOfbField{
-				fu.InPort(ingressHop.Ingress),
-				fu.TunnelId(uint64(inPortNo)),
+				fu.InPort(ctx, ingressHop.Ingress),
+				fu.TunnelId(ctx, uint64(inPortNo)),
 			},
-			Actions: fu.GetActions(flow),
+			Actions: fu.GetActions(ctx, flow),
 		}
 		// Augment the matchfields with the ofpfields from the flow
-		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
+		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(ctx, flow, fu.IN_PORT)...)
 
 		// Augment the Actions
-		fa.Actions = append(fa.Actions, fu.Output(ingressHop.Egress))
+		fa.Actions = append(fa.Actions, fu.Output(ctx, ingressHop.Egress))
 
-		fg := fu.NewFlowsAndGroups()
-		fs, err := fu.MkFlowStat(fa)
+		fg := fu.NewFlowsAndGroups(ctx)
+		fs, err := fu.MkFlowStat(ctx, fa)
 		if err != nil {
 			return nil, err
 		}
-		fg.AddFlow(fs)
-		deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
+		fg.AddFlow(ctx, fs)
+		deviceRules.AddFlowsAndGroup(ctx, ingressHop.DeviceID, fg)
 	}
 
 	return deviceRules, nil
@@ -394,33 +394,33 @@
 	inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) (*fu.DeviceRules, error) {
 
 	logger.Debugw("decomposing-onu-flow-in-downstream-unicast-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
-	deviceRules := fu.NewDeviceRules()
+	deviceRules := fu.NewDeviceRules(ctx)
 
 	egressHop := path[1]
 
-	meterID := fu.GetMeterIdFromFlow(flow)
-	metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(flow)
+	meterID := fu.GetMeterIdFromFlow(ctx, flow)
+	metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(ctx, flow)
 	fa := &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterID), "write_metadata": metadataFromwriteMetadata},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(egressHop.Ingress),
+			fu.InPort(ctx, egressHop.Ingress),
 		},
 	}
 	// Augment the matchfields with the ofpfields from the flow
-	fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
+	fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(ctx, flow, fu.IN_PORT)...)
 
 	// Augment the Actions
-	filteredAction := fu.GetActions(flow, fu.OUTPUT)
-	filteredAction = append(filteredAction, fu.Output(egressHop.Egress))
+	filteredAction := fu.GetActions(ctx, flow, fu.OUTPUT)
+	filteredAction = append(filteredAction, fu.Output(ctx, egressHop.Egress))
 	fa.Actions = filteredAction
 
-	fg := fu.NewFlowsAndGroups()
-	fs, err := fu.MkFlowStat(fa)
+	fg := fu.NewFlowsAndGroups(ctx)
+	fs, err := fu.MkFlowStat(ctx, fa)
 	if err != nil {
 		return nil, err
 	}
-	fg.AddFlow(fs)
-	deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
+	fg.AddFlow(ctx, fs)
+	deviceRules.AddFlowsAndGroup(ctx, egressHop.DeviceID, fg)
 	return deviceRules, nil
 }
 
@@ -430,7 +430,7 @@
 	groupMap map[uint32]*ofp.OfpGroupEntry) *fu.DeviceRules {
 
 	logger.Debugw("multicast-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
-	deviceRules := fu.NewDeviceRules()
+	deviceRules := fu.NewDeviceRules(ctx)
 
 	//having no Group yet is the same as having a Group with no buckets
 	var grp *ofp.OfpGroupEntry
@@ -444,11 +444,11 @@
 		return deviceRules
 	}
 
-	deviceRules.CreateEntryIfNotExist(path[0].DeviceID)
-	fg := fu.NewFlowsAndGroups()
-	fg.AddFlow(flow)
+	deviceRules.CreateEntryIfNotExist(ctx, path[0].DeviceID)
+	fg := fu.NewFlowsAndGroups(ctx)
+	fg.AddFlow(ctx, flow)
 	//return the multicast flow without decomposing it
-	deviceRules.AddFlowsAndGroup(path[0].DeviceID, fg)
+	deviceRules.AddFlowsAndGroup(ctx, path[0].DeviceID, fg)
 	return deviceRules
 }
 
@@ -456,18 +456,18 @@
 func (fd *FlowDecomposer) decomposeFlow(ctx context.Context, agent coreif.LogicalDeviceAgent, flow *ofp.OfpFlowStats,
 	groupMap map[uint32]*ofp.OfpGroupEntry) (*fu.DeviceRules, error) {
 
-	inPortNo := fu.GetInPort(flow)
-	if fu.HasGroup(flow) && inPortNo == 0 {
+	inPortNo := fu.GetInPort(ctx, flow)
+	if fu.HasGroup(ctx, flow) && inPortNo == 0 {
 		//if no in-port specified for a multicast flow, put NNI port as in-port
 		//so that a valid path can be found for the flow
-		nniPorts := agent.GetNNIPorts()
+		nniPorts := agent.GetNNIPorts(ctx)
 		if len(nniPorts) > 0 {
 			inPortNo = nniPorts[0]
 			logger.Debugw("assigning-nni-port-as-in-port-for-multicast-flow", log.Fields{"nni": nniPorts[0], "flow:": flow})
 		}
 	}
-	outPortNo := fu.GetOutPort(flow)
-	deviceRules := fu.NewDeviceRules()
+	outPortNo := fu.GetOutPort(ctx, flow)
+	deviceRules := fu.NewDeviceRules(ctx)
 	path, err := agent.GetRoute(ctx, inPortNo, outPortNo)
 	if err != nil {
 		return deviceRules, err
@@ -503,7 +503,7 @@
 			if err != nil {
 				return nil, err
 			}
-		} else if fu.HasNextTable(flow) && flow.TableId == 0 { // Unicast OLT flow DL
+		} else if fu.HasNextTable(ctx, flow) && flow.TableId == 0 { // Unicast OLT flow DL
 			logger.Debugw("process-olt-downstream-noncontrollerbound-flow-with-nexttable", log.Fields{"flows": flow})
 			deviceRules, err = fd.processDownstreamFlowWithNextTable(ctx, agent, path, inPortNo, outPortNo, flow)
 			if err != nil {
@@ -515,13 +515,13 @@
 			if err != nil {
 				return nil, err
 			}
-		} else if grpID := fu.GetGroup(flow); grpID != 0 && flow.TableId == 0 { //Multicast
+		} else if grpID := fu.GetGroup(ctx, flow); grpID != 0 && flow.TableId == 0 { //Multicast
 			logger.Debugw("process-multicast-flow", log.Fields{"flows": flow})
 			deviceRules = fd.processMulticastFlow(ctx, path, inPortNo, outPortNo, flow, grpID, groupMap)
 		} else {
 			return deviceRules, status.Errorf(codes.Aborted, "unknown downstream flow %v", *flow)
 		}
 	}
-	deviceRules, err = fd.updateOutputPortForControllerBoundFlowForParentDevide(flow, deviceRules)
+	deviceRules, err = fd.updateOutputPortForControllerBoundFlowForParentDevide(ctx, flow, deviceRules)
 	return deviceRules, err
 }
diff --git a/rw_core/flowdecomposition/flow_decomposer_test.go b/rw_core/flowdecomposition/flow_decomposer_test.go
index eb7ad75..211ffe6 100644
--- a/rw_core/flowdecomposition/flow_decomposer_test.go
+++ b/rw_core/flowdecomposition/flow_decomposer_test.go
@@ -92,7 +92,7 @@
 	}
 	return nil, errors.New("ABSENT")
 }
-func (tdm *testDeviceManager) IsRootDevice(deviceID string) (bool, error) {
+func (tdm *testDeviceManager) IsRootDevice(ctx context.Context, deviceID string) (bool, error) {
 	if d, ok := tdm.devices[deviceID]; ok {
 		return d.Root, nil
 	}
@@ -111,6 +111,7 @@
 
 func newTestFlowDecomposer(t *testing.T, deviceMgr *testDeviceManager) *testFlowDecomposer {
 	var tfd testFlowDecomposer
+	ctx := context.Background()
 	tfd.dMgr = deviceMgr
 
 	tfd.logicalPorts = make(map[uint32]*voltha.LogicalPort)
@@ -318,77 +319,77 @@
 
 	// DEFAULT RULES
 
-	tfd.defaultRules = fu.NewDeviceRules()
-	fg := fu.NewFlowsAndGroups()
+	tfd.defaultRules = fu.NewDeviceRules(ctx)
+	fg := fu.NewFlowsAndGroups(ctx)
 	fa := &fu.FlowArgs{
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(2),
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.InPort(ctx, 2),
+			fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|0),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
-			fu.Output(1),
+			fu.SetField(ctx, fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|101)),
+			fu.Output(ctx, 1),
 		},
 	}
-	fs, err := fu.MkFlowStat(fa)
+	fs, err := fu.MkFlowStat(ctx, fa)
 	assert.Nil(t, err)
-	fg.AddFlow(fs)
-	tfd.defaultRules.AddFlowsAndGroup("onu1", fg)
+	fg.AddFlow(ctx, fs)
+	tfd.defaultRules.AddFlowsAndGroup(ctx, "onu1", fg)
 
-	fg = fu.NewFlowsAndGroups()
+	fg = fu.NewFlowsAndGroups(ctx)
 	fa = &fu.FlowArgs{
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(2),
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.InPort(ctx, 2),
+			fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|0),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 102)),
-			fu.Output(1),
+			fu.SetField(ctx, fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|102)),
+			fu.Output(ctx, 1),
 		},
 	}
-	fs, err = fu.MkFlowStat(fa)
+	fs, err = fu.MkFlowStat(ctx, fa)
 	assert.Nil(t, err)
-	fg.AddFlow(fs)
-	tfd.defaultRules.AddFlowsAndGroup("onu2", fg)
+	fg.AddFlow(ctx, fs)
+	tfd.defaultRules.AddFlowsAndGroup(ctx, "onu2", fg)
 
-	fg = fu.NewFlowsAndGroups()
+	fg = fu.NewFlowsAndGroups(ctx)
 	fa = &fu.FlowArgs{
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(2),
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.InPort(ctx, 2),
+			fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|0),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 103)),
-			fu.Output(1),
+			fu.SetField(ctx, fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|103)),
+			fu.Output(ctx, 1),
 		},
 	}
-	fs, err = fu.MkFlowStat(fa)
+	fs, err = fu.MkFlowStat(ctx, fa)
 	assert.Nil(t, err)
-	fg.AddFlow(fs)
-	tfd.defaultRules.AddFlowsAndGroup("onu3", fg)
+	fg.AddFlow(ctx, fs)
+	tfd.defaultRules.AddFlowsAndGroup(ctx, "onu3", fg)
 
-	fg = fu.NewFlowsAndGroups()
+	fg = fu.NewFlowsAndGroups(ctx)
 	fa = &fu.FlowArgs{
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(2),
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.InPort(ctx, 2),
+			fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|0),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 104)),
-			fu.Output(1),
+			fu.SetField(ctx, fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|104)),
+			fu.Output(ctx, 1),
 		},
 	}
-	fs, err = fu.MkFlowStat(fa)
+	fs, err = fu.MkFlowStat(ctx, fa)
 	assert.Nil(t, err)
-	fg.AddFlow(fs)
-	tfd.defaultRules.AddFlowsAndGroup("onu4", fg)
+	fg.AddFlow(ctx, fs)
+	tfd.defaultRules.AddFlowsAndGroup(ctx, "onu4", fg)
 
 	//Set up the device graph - flow decomposer uses it only to verify whether a port is a root port.
-	tfd.deviceRoutes = route.NewDeviceRoutes("ldid", tfd.getDeviceHelper)
+	tfd.deviceRoutes = route.NewDeviceRoutes(ctx, "ldid", tfd.getDeviceHelper)
 	tfd.deviceRoutes.RootPorts = make(map[uint32]uint32)
 	tfd.deviceRoutes.RootPorts[10] = 10
 
-	tfd.fd = NewFlowDecomposer(tfd.dMgr)
+	tfd.fd = NewFlowDecomposer(ctx, tfd.dMgr)
 
 	return &tfd
 }
@@ -405,7 +406,7 @@
 	return nil, nil
 }
 
-func (tfd *testFlowDecomposer) GetDeviceRoutes() *route.DeviceRoutes {
+func (tfd *testFlowDecomposer) GetDeviceRoutes(ctx context.Context) *route.DeviceRoutes {
 	return tfd.deviceRoutes
 }
 
@@ -413,7 +414,7 @@
 	return tfd.defaultRules
 }
 
-func (tfd *testFlowDecomposer) GetWildcardInputPorts(excludePort ...uint32) []uint32 {
+func (tfd *testFlowDecomposer) GetWildcardInputPorts(ctx context.Context, excludePort ...uint32) []uint32 {
 	lPorts := make([]uint32, 0)
 	var exclPort uint32
 	if len(excludePort) == 1 {
@@ -449,7 +450,7 @@
 	return nil, status.Errorf(codes.FailedPrecondition, "no route from:%d to:%d", ingressPortNo, egressPortNo)
 }
 
-func (tfd *testFlowDecomposer) GetNNIPorts() []uint32 {
+func (tfd *testFlowDecomposer) GetNNIPorts(ctx context.Context) []uint32 {
 	nniPorts := make([]uint32, 0)
 	for portNo, nni := range tfd.logicalPortsNo {
 		if nni {
@@ -460,27 +461,27 @@
 }
 
 func TestEapolReRouteRuleVlanDecomposition(t *testing.T) {
-
+	ctx := context.Background()
 	fa := &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(1),
-			fu.VlanVid(50),
-			fu.EthType(0x888e),
+			fu.InPort(ctx, 1),
+			fu.VlanVid(ctx, 50),
+			fu.EthType(ctx, 0x888e),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
-			fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
+			fu.SetField(ctx, fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|101)),
+			fu.Output(ctx, uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
 		},
 	}
 
-	fs, err := fu.MkFlowStat(fa)
+	fs, err := fu.MkFlowStat(ctx, fa)
 	assert.Nil(t, err)
 	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fs}}
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(t, newTestDeviceManager())
 
-	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)
+	deviceRules, err := tfd.fd.DecomposeRules(ctx, tfd, flows, groups)
 	assert.Nil(t, err)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
@@ -491,61 +492,61 @@
 	faParent := &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(1),
-			fu.TunnelId(uint64(1)),
-			fu.VlanVid(50),
-			fu.EthType(0x888e),
+			fu.InPort(ctx, 1),
+			fu.TunnelId(ctx, uint64(1)),
+			fu.VlanVid(ctx, 50),
+			fu.EthType(ctx, 0x888e),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
+			fu.Output(ctx, uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
 		},
 	}
-	expectedOltFlow, err := fu.MkFlowStat(faParent)
+	expectedOltFlow, err := fu.MkFlowStat(ctx, faParent)
 	assert.Nil(t, err)
-	derivedFlow := oltFlowAndGroup.GetFlow(0)
+	derivedFlow := oltFlowAndGroup.GetFlow(ctx, 0)
 	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
 
 	faChild := &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(2),
-			fu.TunnelId(uint64(1)),
-			fu.EthType(0x888e),
+			fu.InPort(ctx, 2),
+			fu.TunnelId(ctx, uint64(1)),
+			fu.EthType(ctx, 0x888e),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.PushVlan(0x8100),
-			fu.SetField(fu.VlanVid(50)),
-			fu.Output(1),
+			fu.PushVlan(ctx, 0x8100),
+			fu.SetField(ctx, fu.VlanVid(ctx, 50)),
+			fu.Output(ctx, 1),
 		},
 	}
-	expectedOnuFlow, err := fu.MkFlowStat(faChild)
+	expectedOnuFlow, err := fu.MkFlowStat(ctx, faChild)
 	assert.Nil(t, err)
-	derivedFlow = onu1FlowAndGroup.GetFlow(0)
+	derivedFlow = onu1FlowAndGroup.GetFlow(ctx, 0)
 	assert.Equal(t, expectedOnuFlow.String(), derivedFlow.String())
 }
 
 func TestEapolReRouteRuleZeroVlanDecomposition(t *testing.T) {
-
+	ctx := context.Background()
 	fa := &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(1),
-			fu.VlanVid(0),
-			fu.EthType(0x888e),
+			fu.InPort(ctx, 1),
+			fu.VlanVid(ctx, 0),
+			fu.EthType(ctx, 0x888e),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
-			fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
+			fu.SetField(ctx, fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|101)),
+			fu.Output(ctx, uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
 		},
 	}
 
-	fs, err := fu.MkFlowStat(fa)
+	fs, err := fu.MkFlowStat(ctx, fa)
 	assert.Nil(t, err)
 	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fs}}
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(t, newTestDeviceManager())
 
-	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)
+	deviceRules, err := tfd.fd.DecomposeRules(ctx, tfd, flows, groups)
 	assert.Nil(t, err)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
@@ -556,60 +557,60 @@
 	faParent := &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(1),
-			fu.TunnelId(uint64(1)),
-			fu.VlanVid(0),
-			fu.EthType(0x888e),
+			fu.InPort(ctx, 1),
+			fu.TunnelId(ctx, uint64(1)),
+			fu.VlanVid(ctx, 0),
+			fu.EthType(ctx, 0x888e),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
+			fu.Output(ctx, uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
 		},
 	}
-	expectedOltFlow, err := fu.MkFlowStat(faParent)
+	expectedOltFlow, err := fu.MkFlowStat(ctx, faParent)
 	assert.Nil(t, err)
-	derivedFlow := oltFlowAndGroup.GetFlow(0)
+	derivedFlow := oltFlowAndGroup.GetFlow(ctx, 0)
 	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
 
 	faChild := &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(2),
-			fu.TunnelId(uint64(1)),
-			fu.EthType(0x888e),
+			fu.InPort(ctx, 2),
+			fu.TunnelId(ctx, uint64(1)),
+			fu.EthType(ctx, 0x888e),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.PushVlan(0x8100),
-			fu.SetField(fu.VlanVid(0)),
-			fu.Output(1),
+			fu.PushVlan(ctx, 0x8100),
+			fu.SetField(ctx, fu.VlanVid(ctx, 0)),
+			fu.Output(ctx, 1),
 		},
 	}
-	expectedOnuFlow, err := fu.MkFlowStat(faChild)
+	expectedOnuFlow, err := fu.MkFlowStat(ctx, faChild)
 	assert.Nil(t, err)
-	derivedFlow = onu1FlowAndGroup.GetFlow(0)
+	derivedFlow = onu1FlowAndGroup.GetFlow(ctx, 0)
 	assert.Equal(t, expectedOnuFlow.String(), derivedFlow.String())
 }
 
 func TestEapolReRouteRuleNoVlanDecomposition(t *testing.T) {
-
+	ctx := context.Background()
 	fa := &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(1),
-			fu.EthType(0x888e),
+			fu.InPort(ctx, 1),
+			fu.EthType(ctx, 0x888e),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
-			fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
+			fu.SetField(ctx, fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|101)),
+			fu.Output(ctx, uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
 		},
 	}
 
-	fs, err := fu.MkFlowStat(fa)
+	fs, err := fu.MkFlowStat(ctx, fa)
 	assert.Nil(t, err)
 	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fs}}
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(t, newTestDeviceManager())
 
-	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)
+	deviceRules, err := tfd.fd.DecomposeRules(ctx, tfd, flows, groups)
 	assert.Nil(t, err)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
@@ -620,60 +621,60 @@
 	faParent := &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(1),
-			fu.TunnelId(uint64(1)),
-			fu.EthType(0x888e),
+			fu.InPort(ctx, 1),
+			fu.TunnelId(ctx, uint64(1)),
+			fu.EthType(ctx, 0x888e),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
+			fu.Output(ctx, uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
 		},
 	}
-	expectedOltFlow, err := fu.MkFlowStat(faParent)
+	expectedOltFlow, err := fu.MkFlowStat(ctx, faParent)
 	assert.Nil(t, err)
-	derivedFlow := oltFlowAndGroup.GetFlow(0)
+	derivedFlow := oltFlowAndGroup.GetFlow(ctx, 0)
 	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
 
 	faChild := &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(2),
-			fu.TunnelId(uint64(1)),
-			fu.EthType(0x888e),
+			fu.InPort(ctx, 2),
+			fu.TunnelId(ctx, uint64(1)),
+			fu.EthType(ctx, 0x888e),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.Output(1),
+			fu.Output(ctx, 1),
 		},
 	}
-	expectedOnuFlow, err := fu.MkFlowStat(faChild)
+	expectedOnuFlow, err := fu.MkFlowStat(ctx, faChild)
 	assert.Nil(t, err)
-	derivedFlow = onu1FlowAndGroup.GetFlow(0)
+	derivedFlow = onu1FlowAndGroup.GetFlow(ctx, 0)
 	assert.Equal(t, expectedOnuFlow.String(), derivedFlow.String())
 }
 
 func TestDhcpReRouteRuleDecomposition(t *testing.T) {
-
+	ctx := context.Background()
 	fa := &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(1),
-			fu.EthType(0x0800),
-			fu.Ipv4Dst(0xffffffff),
-			fu.IpProto(17),
-			fu.UdpSrc(68),
-			fu.UdpDst(67),
+			fu.InPort(ctx, 1),
+			fu.EthType(ctx, 0x0800),
+			fu.Ipv4Dst(ctx, 0xffffffff),
+			fu.IpProto(ctx, 17),
+			fu.UdpSrc(ctx, 68),
+			fu.UdpDst(ctx, 67),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
+			fu.Output(ctx, uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
 		},
 	}
 
-	fs, err := fu.MkFlowStat(fa)
+	fs, err := fu.MkFlowStat(ctx, fa)
 	assert.Nil(t, err)
 	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fs}}
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(t, newTestDeviceManager())
 
-	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)
+	deviceRules, err := tfd.fd.DecomposeRules(ctx, tfd, flows, groups)
 	assert.Nil(t, err)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
@@ -685,62 +686,63 @@
 	faParent := &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(1),
-			fu.TunnelId(uint64(1)),
-			fu.EthType(0x0800),
-			fu.Ipv4Dst(0xffffffff),
-			fu.IpProto(17),
-			fu.UdpSrc(68),
-			fu.UdpDst(67),
+			fu.InPort(ctx, 1),
+			fu.TunnelId(ctx, uint64(1)),
+			fu.EthType(ctx, 0x0800),
+			fu.Ipv4Dst(ctx, 0xffffffff),
+			fu.IpProto(ctx, 17),
+			fu.UdpSrc(ctx, 68),
+			fu.UdpDst(ctx, 67),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
+			fu.Output(ctx, uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
 		},
 	}
-	expectedOltFlow, err := fu.MkFlowStat(faParent)
+	expectedOltFlow, err := fu.MkFlowStat(ctx, faParent)
 	assert.Nil(t, err)
-	derivedFlow := oltFlowAndGroup.GetFlow(0)
+	derivedFlow := oltFlowAndGroup.GetFlow(ctx, 0)
 	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
 
 	faChild := &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(2),
-			fu.TunnelId(uint64(1)),
-			fu.EthType(0x0800),
-			fu.Ipv4Dst(0xffffffff),
-			fu.IpProto(17),
-			fu.UdpSrc(68),
-			fu.UdpDst(67),
+			fu.InPort(ctx, 2),
+			fu.TunnelId(ctx, uint64(1)),
+			fu.EthType(ctx, 0x0800),
+			fu.Ipv4Dst(ctx, 0xffffffff),
+			fu.IpProto(ctx, 17),
+			fu.UdpSrc(ctx, 68),
+			fu.UdpDst(ctx, 67),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.Output(1),
+			fu.Output(ctx, 1),
 		},
 	}
-	expectedOnuFlow, err := fu.MkFlowStat(faChild)
+	expectedOnuFlow, err := fu.MkFlowStat(ctx, faChild)
 	assert.Nil(t, err)
-	derivedFlow = onu1FlowAndGroup.GetFlow(0)
+	derivedFlow = onu1FlowAndGroup.GetFlow(ctx, 0)
 	assert.Equal(t, expectedOnuFlow.String(), derivedFlow.String())
 }
 
 func TestLldpReRouteRuleDecomposition(t *testing.T) {
+	ctx := context.Background()
 	fa := &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(10),
-			fu.EthType(0x88CC),
+			fu.InPort(ctx, 10),
+			fu.EthType(ctx, 0x88CC),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
+			fu.Output(ctx, uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
 		},
 	}
 
-	fs, err := fu.MkFlowStat(fa)
+	fs, err := fu.MkFlowStat(ctx, fa)
 	assert.Nil(t, err)
 	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fs}}
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(t, newTestDeviceManager())
-	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)
+	deviceRules, err := tfd.fd.DecomposeRules(ctx, tfd, flows, groups)
 	assert.Nil(t, err)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
@@ -751,50 +753,51 @@
 	fa = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(2),
-			fu.EthType(0x88CC),
+			fu.InPort(ctx, 2),
+			fu.EthType(ctx, 0x88CC),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
+			fu.Output(ctx, uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
 		},
 	}
-	expectedOltFlow, err := fu.MkFlowStat(fa)
+	expectedOltFlow, err := fu.MkFlowStat(ctx, fa)
 	assert.Nil(t, err)
-	derivedFlow := oltFlowAndGroup.GetFlow(0)
+	derivedFlow := oltFlowAndGroup.GetFlow(ctx, 0)
 	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
 }
 
 func TestUnicastUpstreamRuleDecomposition(t *testing.T) {
+	ctx := context.Background()
 	fa := &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 5000, "table_id": 0},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(1),
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
-			fu.VlanPcp(0),
+			fu.InPort(ctx, 1),
+			fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|0),
+			fu.VlanPcp(ctx, 0),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
+			fu.SetField(ctx, fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|101)),
 		},
 	}
 
 	fa2 := &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 500, "table_id": 1},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(1),
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101),
-			fu.VlanPcp(0),
+			fu.InPort(ctx, 1),
+			fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|101),
+			fu.VlanPcp(ctx, 0),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.PushVlan(0x8100),
-			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 1000)),
-			fu.SetField(fu.VlanPcp(0)),
-			fu.Output(10),
+			fu.PushVlan(ctx, 0x8100),
+			fu.SetField(ctx, fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|1000)),
+			fu.SetField(ctx, fu.VlanPcp(ctx, 0)),
+			fu.Output(ctx, 10),
 		},
 	}
 
-	fs, err := fu.MkFlowStat(fa)
+	fs, err := fu.MkFlowStat(ctx, fa)
 	assert.Nil(t, err)
-	fs2, err := fu.MkFlowStat(fa2)
+	fs2, err := fu.MkFlowStat(ctx, fa2)
 	assert.Nil(t, err)
 	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fs, fs2}}
 	flows.Items[0].Instructions = []*ofp.OfpInstruction{{
@@ -808,7 +811,7 @@
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(t, newTestDeviceManager())
 
-	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)
+	deviceRules, err := tfd.fd.DecomposeRules(ctx, tfd, flows, groups)
 	assert.Nil(t, err)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
@@ -822,20 +825,20 @@
 	fa = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 5000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(2),
-			fu.TunnelId(uint64(1)),
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
-			fu.VlanPcp(0),
+			fu.InPort(ctx, 2),
+			fu.TunnelId(ctx, uint64(1)),
+			fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|0),
+			fu.VlanPcp(ctx, 0),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
-			fu.Output(1),
+			fu.SetField(ctx, fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|101)),
+			fu.Output(ctx, 1),
 		},
 	}
 
-	derivedFlow := onu1FlowAndGroup.GetFlow(0)
+	derivedFlow := onu1FlowAndGroup.GetFlow(ctx, 0)
 	// Form the expected flow
-	expectedOnu1Flow, err := fu.MkFlowStat(fa)
+	expectedOnu1Flow, err := fu.MkFlowStat(ctx, fa)
 	assert.Nil(t, err)
 	expectedOnu1Flow.Instructions = []*ofp.OfpInstruction{{
 		Type: uint32(ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS),
@@ -856,54 +859,55 @@
 	fa = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 500},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(1),
-			fu.TunnelId(uint64(1)),
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101),
-			fu.VlanPcp(0),
+			fu.InPort(ctx, 1),
+			fu.TunnelId(ctx, uint64(1)),
+			fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|101),
+			fu.VlanPcp(ctx, 0),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.PushVlan(0x8100),
-			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 1000)),
-			fu.SetField(fu.VlanPcp(0)),
-			fu.Output(2),
+			fu.PushVlan(ctx, 0x8100),
+			fu.SetField(ctx, fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|1000)),
+			fu.SetField(ctx, fu.VlanPcp(ctx, 0)),
+			fu.Output(ctx, 2),
 		},
 	}
-	expectedOltFlow, err := fu.MkFlowStat(fa)
+	expectedOltFlow, err := fu.MkFlowStat(ctx, fa)
 	assert.Nil(t, err)
-	derivedFlow = oltFlowAndGroup.GetFlow(0)
+	derivedFlow = oltFlowAndGroup.GetFlow(ctx, 0)
 	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
 }
 
 func TestUnicastDownstreamRuleDecomposition(t *testing.T) {
+	ctx := context.Background()
 	logger.Debugf("Starting Test Unicast Downstream")
 	fa1 := &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 500, "table_id": 0},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(10),
-			fu.Metadata_ofp((1000 << 32) | 1),
-			fu.VlanPcp(0),
+			fu.InPort(ctx, 10),
+			fu.Metadata_ofp(ctx, (1000<<32)|1),
+			fu.VlanPcp(ctx, 0),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.PopVlan(),
+			fu.PopVlan(ctx),
 		},
 	}
 
 	fa2 := &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 500, "table_id": 1},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(10),
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101),
-			fu.VlanPcp(0),
+			fu.InPort(ctx, 10),
+			fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|101),
+			fu.VlanPcp(ctx, 0),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0)),
-			fu.Output(1),
+			fu.SetField(ctx, fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|0)),
+			fu.Output(ctx, 1),
 		},
 	}
 
-	fs1, err := fu.MkFlowStat(fa1)
+	fs1, err := fu.MkFlowStat(ctx, fa1)
 	assert.Nil(t, err)
-	fs2, err := fu.MkFlowStat(fa2)
+	fs2, err := fu.MkFlowStat(ctx, fa2)
 	assert.Nil(t, err)
 	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fs1, fs2}}
 	flows.Items[0].Instructions = []*ofp.OfpInstruction{{
@@ -917,7 +921,7 @@
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(t, newTestDeviceManager())
 
-	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)
+	deviceRules, err := tfd.fd.DecomposeRules(ctx, tfd, flows, groups)
 	assert.Nil(t, err)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
@@ -929,19 +933,19 @@
 	fa1 = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 500},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(2),
-			fu.TunnelId(uint64(10)),
-			fu.Metadata_ofp(4294967296001),
-			fu.VlanPcp(0),
+			fu.InPort(ctx, 2),
+			fu.TunnelId(ctx, uint64(10)),
+			fu.Metadata_ofp(ctx, 4294967296001),
+			fu.VlanPcp(ctx, 0),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.PopVlan(),
-			fu.Output(1),
+			fu.PopVlan(ctx),
+			fu.Output(ctx, 1),
 		},
 	}
 
-	derivedFlow := oltFlowAndGroup.GetFlow(0)
-	expectedOltFlow, err := fu.MkFlowStat(fa1)
+	derivedFlow := oltFlowAndGroup.GetFlow(ctx, 0)
+	expectedOltFlow, err := fu.MkFlowStat(ctx, fa1)
 	assert.Nil(t, err)
 	expectedOltFlow.Instructions = []*ofp.OfpInstruction{{
 		Type: uint32(ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS),
@@ -961,33 +965,34 @@
 	fa1 = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 500},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(1),
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101),
-			fu.VlanPcp(0),
+			fu.InPort(ctx, 1),
+			fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|101),
+			fu.VlanPcp(ctx, 0),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0)),
-			fu.Output(2),
+			fu.SetField(ctx, fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|0)),
+			fu.Output(ctx, 2),
 		},
 	}
-	expectedOnu1Flow, err := fu.MkFlowStat(fa1)
+	expectedOnu1Flow, err := fu.MkFlowStat(ctx, fa1)
 	assert.Nil(t, err)
-	derivedFlow = onu1FlowAndGroup.GetFlow(0)
+	derivedFlow = onu1FlowAndGroup.GetFlow(ctx, 0)
 	assert.Equal(t, expectedOnu1Flow.String(), derivedFlow.String())
 }
 
 func TestMulticastDownstreamRuleDecomposition(t *testing.T) {
+	ctx := context.Background()
 	fa := &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 500},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(10),
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 170),
-			fu.VlanPcp(0),
-			fu.EthType(0x800),
-			fu.Ipv4Dst(0xe00a0a0a),
+			fu.InPort(ctx, 10),
+			fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|170),
+			fu.VlanPcp(ctx, 0),
+			fu.EthType(ctx, 0x800),
+			fu.Ipv4Dst(ctx, 0xe00a0a0a),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.Group(10),
+			fu.Group(ctx, 10),
 		},
 	}
 
@@ -995,20 +1000,20 @@
 		GroupId: 10,
 		Buckets: []*ofp.OfpBucket{
 			{Actions: []*ofp.OfpAction{
-				fu.PopVlan(),
-				fu.Output(1),
+				fu.PopVlan(ctx),
+				fu.Output(ctx, 1),
 			},
 			},
 		},
 	}
 
-	fs, err := fu.MkFlowStat(fa)
+	fs, err := fu.MkFlowStat(ctx, fa)
 	assert.Nil(t, err)
 	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fs}}
-	groups := ofp.FlowGroups{Items: []*ofp.OfpGroupEntry{fu.MkGroupStat(ga)}}
+	groups := ofp.FlowGroups{Items: []*ofp.OfpGroupEntry{fu.MkGroupStat(ctx, ga)}}
 	tfd := newTestFlowDecomposer(t, newTestDeviceManager())
 
-	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)
+	deviceRules, err := tfd.fd.DecomposeRules(ctx, tfd, flows, groups)
 	assert.Nil(t, err)
 	oltFlowAndGroup := deviceRules.Rules["olt"]
 	assert.Equal(t, 1, oltFlowAndGroup.Flows.Len())
@@ -1017,18 +1022,18 @@
 	fa = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 500},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(10),
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 170),
-			fu.VlanPcp(0),
-			fu.EthType(0x800),
-			fu.Ipv4Dst(0xe00a0a0a),
+			fu.InPort(ctx, 10),
+			fu.VlanVid(ctx, uint32(ofp.OfpVlanId_OFPVID_PRESENT)|170),
+			fu.VlanPcp(ctx, 0),
+			fu.EthType(ctx, 0x800),
+			fu.Ipv4Dst(ctx, 0xe00a0a0a),
 		},
 		Actions: []*ofp.OfpAction{
-			fu.Group(10),
+			fu.Group(ctx, 10),
 		},
 	}
-	expectedOltFlow, err := fu.MkFlowStat(fa)
+	expectedOltFlow, err := fu.MkFlowStat(ctx, fa)
 	assert.Nil(t, err)
-	derivedFlow := oltFlowAndGroup.GetFlow(0)
+	derivedFlow := oltFlowAndGroup.GetFlow(ctx, 0)
 	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
 }