VOL-1623-meter support and handling techprofile and fix for flow delete, now migrated to onosproject/onos:1.13.9-rc4

Change in flowupdate API towards adapters

Remove meter_get API from adapter to core

Added dependent vendor library files downloaded by "dep ensure -update"

Added techprofile changes in the single commit

Review comments are addressed

Submitting patch for integration tests for meter changes and modifications in unit tests for the updated flow decomposer logic
  - submitting on behalf of "Salman.Siddiqui@radisys.com"

Load test for meter updated and other flow management test cases with meter
- Performed load test for 1K meters serially and in parallel, and added more test cases in flow management


Load test for meter updated and other flow management test cases with meter
- Performed load test for 1K meters serially and in parallel, and added more test cases in flow management
- submitting on behalf of "Salman.Siddiqui@radisys.com"

pulled latest protos

Verified EAPOL/DHCP/HSIA data with Edgecore OLT & TW ONT kit for one subscriber
Verified delete/re-add is working end to end for the same subscriber

Change-Id: Idb232b7a0f05dc0c7e68266ac885740a3adff317
diff --git a/rw_core/flow_decomposition/flow_decomposer.go b/rw_core/flow_decomposition/flow_decomposer.go
index 98d5092..09b29e8 100644
--- a/rw_core/flow_decomposition/flow_decomposer.go
+++ b/rw_core/flow_decomposition/flow_decomposer.go
@@ -105,6 +105,8 @@
 	log.Debugw("trap-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo, "flow": flow})
 	deviceRules := fu.NewDeviceRules()
+	meterId := fu.GetMeterIdFromFlow(flow)
+	metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(flow)
 	egressHop := route[1]
@@ -128,10 +130,9 @@
 			var fa *fu.FlowArgs
 			// Upstream flow
 			fa = &fu.FlowArgs{
-				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
+				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterId), "write_metadata": metadataFromwriteMetadata},
 				MatchFields: []*ofp.OfpOxmOfbField{
-					fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | inputPort),
 				Actions: []*ofp.OfpAction{
@@ -141,24 +142,7 @@
 			// Augment the matchfields with the ofpfields from the flow
-			fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT, fu.VLAN_VID)...)
-			fg.AddFlow(fu.MkFlowStat(fa))
-			// Downstream flow
-			fa = &fu.FlowArgs{
-				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority)},
-				MatchFields: []*ofp.OfpOxmOfbField{
-					fu.InPort(egressHop.Egress),
-					fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000),
-					fu.VlanPcp(0),
-					fu.Metadata_ofp(uint64(inputPort)),
-					fu.TunnelId(uint64(inputPort)),
-				},
-				Actions: []*ofp.OfpAction{
-					fu.PopVlan(),
-					fu.Output(egressHop.Ingress),
-				},
-			}
+			fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
@@ -177,17 +161,21 @@
 	log.Debugw("upstream-non-controller-bound-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
 	deviceRules := fu.NewDeviceRules()
+	meterId := fu.GetMeterIdFromFlow(flow)
+	metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(flow)
 	ingressHop := route[0]
 	egressHop := route[1]
-	if fu.HasNextTable(flow) {
-		log.Debugw("has-next-table", log.Fields{"table_id": flow.TableId})
+	if flow.TableId == 0 && fu.HasNextTable(flow) {
+		log.Debugw("decomposing-onu-flow-in-upstream-has-next-table", log.Fields{"table_id": flow.TableId})
 		if outPortNo != 0 {
 			log.Warnw("outPort-should-not-be-specified", log.Fields{"outPortNo": outPortNo})
+			return deviceRules
 		var fa *fu.FlowArgs
 		fa = &fu.FlowArgs{
-			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
+			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterId), "write_metadata": metadataFromwriteMetadata},
 			MatchFields: []*ofp.OfpOxmOfbField{
@@ -203,73 +191,27 @@
 		fg := fu.NewFlowsAndGroups()
 		deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
-	} else {
-		var actions []ofp.OfpActionType
-		var isOutputTypeInActions bool
-		for _, action := range fu.GetActions(flow) {
-			actions = append(actions, action.Type)
-			if !isOutputTypeInActions && action.Type == fu.OUTPUT {
-				isOutputTypeInActions = true
-			}
+	} else if flow.TableId == 1 && outPortNo != 0 {
+		log.Debugw("decomposing-olt-flow-in-upstream-has-next-table", log.Fields{"table_id": flow.TableId})
+		var fa *fu.FlowArgs
+		fa = &fu.FlowArgs{
+			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterId), "write_metadata": metadataFromwriteMetadata},
+			MatchFields: []*ofp.OfpOxmOfbField{
+				fu.InPort(egressHop.Ingress),
+				fu.TunnelId(uint64(inPortNo)),
+			},
-		if len(actions) == 1 && isOutputTypeInActions {
-			var fa *fu.FlowArgs
-			// child device flow
-			fa = &fu.FlowArgs{
-				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
-				MatchFields: []*ofp.OfpOxmOfbField{
-					fu.InPort(ingressHop.Ingress),
-				},
-				Actions: []*ofp.OfpAction{
-					fu.Output(ingressHop.Egress),
-				},
-			}
-			// Augment the matchfields with the ofpfields from the flow
-			fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
-			fg := fu.NewFlowsAndGroups()
-			fg.AddFlow(fu.MkFlowStat(fa))
-			deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
+		// Augment the matchfields with the ofpfields from the flow
+		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
-			// parent device flow
-			fa = &fu.FlowArgs{
-				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
-				MatchFields: []*ofp.OfpOxmOfbField{
-					fu.InPort(egressHop.Ingress), //egress_hop.ingress_port.port_no
-					fu.TunnelId(uint64(inPortNo)),
-				},
-				Actions: []*ofp.OfpAction{
-					fu.Output(egressHop.Egress),
-				},
-			}
-			// Augment the matchfields with the ofpfields from the flow
-			fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
-			fg = fu.NewFlowsAndGroups()
-			fg.AddFlow(fu.MkFlowStat(fa))
-			deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
-		} else {
-			if outPortNo == 0 {
-				log.Warnw("outPort-should-be-specified", log.Fields{"outPortNo": outPortNo})
-			}
-			var fa *fu.FlowArgs
-			fa = &fu.FlowArgs{
-				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
-				MatchFields: []*ofp.OfpOxmOfbField{
-					fu.InPort(egressHop.Ingress),
-					fu.TunnelId(uint64(inPortNo)),
-				},
-			}
-			// Augment the matchfields with the ofpfields from the flow
-			fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
+		//Augment the actions
+		filteredAction := fu.GetActions(flow, fu.OUTPUT)
+		filteredAction = append(filteredAction, fu.Output(egressHop.Egress))
+		fa.Actions = filteredAction
-			//Augment the actions
-			filteredAction := fu.GetActions(flow, fu.OUTPUT)
-			filteredAction = append(filteredAction, fu.Output(egressHop.Egress))
-			fa.Actions = filteredAction
-			fg := fu.NewFlowsAndGroups()
-			fg.AddFlow(fu.MkFlowStat(fa))
-			deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
-		}
+		fg := fu.NewFlowsAndGroups()
+		fg.AddFlow(fu.MkFlowStat(fa))
+		deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
 	return deviceRules
@@ -277,25 +219,32 @@
 // processDownstreamFlowWithNextTable decomposes downstream flows containing next table ID instructions
 func (fd *FlowDecomposer) processDownstreamFlowWithNextTable(agent coreIf.LogicalDeviceAgent, route []graph.RouteHop,
 	inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) *fu.DeviceRules {
-	log.Debugw("downstream-flow-with-next-table", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
+	log.Debugw("decomposing-olt-flow-in-downstream-flow-with-next-table", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
 	deviceRules := fu.NewDeviceRules()
+	meterId := fu.GetMeterIdFromFlow(flow)
+	metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(flow)
 	if outPortNo != 0 {
 		log.Warnw("outPort-should-not-be-specified", log.Fields{"outPortNo": outPortNo})
+		return deviceRules
+	if flow.TableId != 0 {
+		log.Warnw("This is not olt pipeline table, so skipping", log.Fields{"tableId": flow.TableId})
+		return deviceRules
+	}
 	ingressHop := route[0]
 	egressHop := route[1]
-	if fu.GetMetaData(flow) != 0 {
+	if metadataFromwriteMetadata != 0 {
 		log.Debugw("creating-metadata-flow", log.Fields{"flow": flow})
-		portNumber := uint32(fu.GetPortNumberFromMetadata(flow))
+		portNumber := fu.GetEgressPortNumberFromWriteMetadata(flow)
 		if portNumber != 0 {
 			recalculatedRoute := agent.GetRoute(inPortNo, portNumber)
 			switch len(recalculatedRoute) {
 			case 0:
-				log.Errorw("no-route-double-tag", log.Fields{"inPortNo": inPortNo, "outPortNo": portNumber, "comment": "deleting-flow", "metadata": fu.GetMetaData64Bit(flow)})
-				//	TODO: Delete flow
+				log.Errorw("no-route-double-tag", log.Fields{"inPortNo": inPortNo, "outPortNo": portNumber, "comment": "deleting-flow", "metadata": metadataFromwriteMetadata})
+				//TODO: Delete flow
 				return deviceRules
 			case 2:
 				log.Debugw("route-found", log.Fields{"ingressHop": ingressHop, "egressHop": egressHop})
@@ -308,16 +257,16 @@
 		innerTag := fu.GetInnerTagFromMetaData(flow)
 		if innerTag == 0 {
-			log.Errorw("no-inner-route-double-tag", log.Fields{"inPortNo": inPortNo, "outPortNo": portNumber, "comment": "deleting-flow", "metadata": fu.GetMetaData64Bit(flow)})
-			//	TODO: Delete flow
+			log.Errorw("no-inner-route-double-tag", log.Fields{"inPortNo": inPortNo, "outPortNo": portNumber, "comment": "deleting-flow", "metadata": metadataFromwriteMetadata})
+			//TODO: Delete flow
 			return deviceRules
 		var fa *fu.FlowArgs
 		fa = &fu.FlowArgs{
-			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
+			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterId), "write_metadata": metadataFromwriteMetadata},
 			MatchFields: []*ofp.OfpOxmOfbField{
-				fu.Metadata_ofp(innerTag),
+				fu.Metadata_ofp(uint64(innerTag)),
 			Actions: fu.GetActions(flow),
@@ -335,7 +284,7 @@
 		log.Debugw("creating-standard-flow", log.Fields{"flow": flow})
 		var fa *fu.FlowArgs
 		fa = &fu.FlowArgs{
-			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
+			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterId), "write_metadata": metadataFromwriteMetadata},
 			MatchFields: []*ofp.OfpOxmOfbField{
@@ -360,76 +309,31 @@
 func (fd *FlowDecomposer) processUnicastFlow(agent coreIf.LogicalDeviceAgent, route []graph.RouteHop,
 	inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) *fu.DeviceRules {
-	log.Debugw("unicast-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
+	log.Debugw("decomposing-onu-flow-in-downstream-unicast-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
 	deviceRules := fu.NewDeviceRules()
-	ingressHop := route[0]
 	egressHop := route[1]
-	var actions []ofp.OfpActionType
-	var isOutputTypeInActions bool
-	for _, action := range fu.GetActions(flow) {
-		actions = append(actions, action.Type)
-		if !isOutputTypeInActions && action.Type == fu.OUTPUT {
-			isOutputTypeInActions = true
-		}
+	meterId := fu.GetMeterIdFromFlow(flow)
+	metadataFromwriteMetadata := fu.GetMetadataFromWriteMetadataAction(flow)
+	var fa *fu.FlowArgs
+	fa = &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterId), "write_metadata": metadataFromwriteMetadata},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(egressHop.Ingress),
+		},
-	if len(actions) == 1 && isOutputTypeInActions {
-		var fa *fu.FlowArgs
-		// Parent device flow
-		fa = &fu.FlowArgs{
-			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
-			MatchFields: []*ofp.OfpOxmOfbField{
-				fu.InPort(ingressHop.Ingress),
-				fu.TunnelId(uint64(inPortNo)),
-			},
-			Actions: []*ofp.OfpAction{
-				fu.Output(ingressHop.Egress),
-			},
-		}
-		// Augment the matchfields with the ofpfields from the flow
-		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
+	// Augment the matchfields with the ofpfields from the flow
+	fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
-		fg := fu.NewFlowsAndGroups()
-		fg.AddFlow(fu.MkFlowStat(fa))
-		deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
+	// Augment the Actions
+	filteredAction := fu.GetActions(flow, fu.OUTPUT)
+	filteredAction = append(filteredAction, fu.Output(egressHop.Egress))
+	fa.Actions = filteredAction
-		// Child device flow
-		fa = &fu.FlowArgs{
-			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
-			MatchFields: []*ofp.OfpOxmOfbField{
-				fu.InPort(egressHop.Ingress),
-			},
-			Actions: []*ofp.OfpAction{
-				fu.Output(egressHop.Egress),
-			},
-		}
-		// Augment the matchfields with the ofpfields from the flow
-		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
-		fg = fu.NewFlowsAndGroups()
-		fg.AddFlow(fu.MkFlowStat(fa))
-		deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
-	} else {
-		var fa *fu.FlowArgs
-		fa = &fu.FlowArgs{
-			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
-			MatchFields: []*ofp.OfpOxmOfbField{
-				fu.InPort(egressHop.Ingress),
-			},
-		}
-		// Augment the matchfields with the ofpfields from the flow
-		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
-		// Augment the Actions
-		filteredAction := fu.GetActions(flow, fu.OUTPUT)
-		filteredAction = append(filteredAction, fu.Output(egressHop.Egress))
-		fa.Actions = filteredAction
-		fg := fu.NewFlowsAndGroups()
-		fg.AddFlow(fu.MkFlowStat(fa))
-		deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
-	}
+	fg := fu.NewFlowsAndGroups()
+	fg.AddFlow(fu.MkFlowStat(fa))
+	deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
 	return deviceRules
@@ -559,14 +463,20 @@
 			return deviceRules
 		isUpstream := !ingressDevice.Root
-		if isUpstream {
+		if isUpstream { // Unicast OLT and ONU UL
+			log.Info("processOltAndOnuUpstreamNonControllerBoundUnicastFlows", log.Fields{"flows": flow})
 			deviceRules = fd.processUpstreamNonControllerBoundFlow(agent, route, inPortNo, outPortNo, flow)
-		} else if fu.HasNextTable(flow) {
+		} else if fu.HasNextTable(flow) && flow.TableId == 0 { // Unicast OLT flow DL
+			log.Debugw("processOltDownstreamNonControllerBoundFlowWithNextTable", log.Fields{"flows": flow})
 			deviceRules = fd.processDownstreamFlowWithNextTable(agent, route, inPortNo, outPortNo, flow)
-		} else if outPortNo != 0 { // Unicast
+		} else if flow.TableId == 1 && outPortNo != 0 { // Unicast ONU flow DL
+			log.Debugw("processOnuDownstreamUnicastFlow", log.Fields{"flows": flow})
 			deviceRules = fd.processUnicastFlow(agent, route, inPortNo, outPortNo, flow)
-		} else if grpId := fu.GetGroup(flow); grpId != 0 { //Multicast
+		} else if grpId := fu.GetGroup(flow); grpId != 0 && flow.TableId == 0 { //Multicast
+			log.Debugw("processMulticastFlow", log.Fields{"flows": flow})
 			deviceRules = fd.processMulticastFlow(agent, route, inPortNo, outPortNo, flow, grpId, groupMap)
+		} else {
+			log.Errorw("unknown-downstream-flow", log.Fields{"flow": *flow})
 	deviceRules = fd.updateOutputPortForControllerBoundFlowForParentDevide(flow, deviceRules)
diff --git a/rw_core/flow_decomposition/flow_decomposer_test.go b/rw_core/flow_decomposition/flow_decomposer_test.go
index 41a93e4..f4632cd 100644
--- a/rw_core/flow_decomposition/flow_decomposer_test.go
+++ b/rw_core/flow_decomposition/flow_decomposer_test.go
@@ -28,9 +28,18 @@
 func init() {
-	log.AddPackage(log.JSON, log.WarnLevel, nil)
-	log.UpdateAllLoggers(log.Fields{"instanceId": "flow-decomposition"})
-	log.SetAllLogLevel(log.WarnLevel)
+	// Setup default logger - applies for packages that do not have specific logger set
+	if _, err := log.SetDefaultLogger(log.JSON, 0, log.Fields{"instanceId": 1}); err != nil {
+		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+	}
+	// Update all loggers (provisioned via init) with a common field
+	if err := log.UpdateAllLoggers(log.Fields{"instanceId": 1}); err != nil {
+		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+	}
+	// Update all loggers to log level specified as input parameter
+	log.SetAllLogLevel(0)
 type testDeviceManager struct {
@@ -473,7 +482,6 @@
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
 		Actions: []*ofp.OfpAction{
@@ -490,14 +498,13 @@
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
 	assert.Nil(t, onu1FlowAndGroup)
-	assert.Equal(t, 2, oltFlowAndGroup.Flows.Len())
+	assert.Equal(t, 1, oltFlowAndGroup.Flows.Len())
 	assert.Equal(t, 0, oltFlowAndGroup.Groups.Len())
 	fa = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 1),
@@ -510,24 +517,6 @@
 	expectedOltFlow := fu.MkFlowStat(fa)
 	derivedFlow := oltFlowAndGroup.GetFlow(0)
 	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
-	fa = &fu.FlowArgs{
-		KV: fu.OfpFlowModArgs{"priority": 1000},
-		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(2),
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000),
-			fu.VlanPcp(0),
-			fu.Metadata_ofp(1),
-			fu.TunnelId(uint64(1)),
-		},
-		Actions: []*ofp.OfpAction{
-			fu.PopVlan(),
-			fu.Output(1),
-		},
-	}
-	expectedOltFlow = fu.MkFlowStat(fa)
-	derivedFlow = oltFlowAndGroup.GetFlow(1)
-	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
 func TestDhcpReRouteRuleDecomposition(t *testing.T) {
@@ -537,7 +526,6 @@
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
@@ -557,14 +545,13 @@
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
 	assert.Nil(t, onu1FlowAndGroup)
-	assert.Equal(t, 2, oltFlowAndGroup.Flows.Len())
+	assert.Equal(t, 1, oltFlowAndGroup.Flows.Len())
 	assert.Equal(t, 0, oltFlowAndGroup.Groups.Len())
 	fa = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 1),
@@ -581,31 +568,12 @@
 	expectedOltFlow := fu.MkFlowStat(fa)
 	derivedFlow := oltFlowAndGroup.GetFlow(0)
 	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
-	fa = &fu.FlowArgs{
-		KV: fu.OfpFlowModArgs{"priority": 1000},
-		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(2),
-			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000),
-			fu.VlanPcp(0),
-			fu.Metadata_ofp(1),
-			fu.TunnelId(uint64(1)),
-		},
-		Actions: []*ofp.OfpAction{
-			fu.PopVlan(),
-			fu.Output(1),
-		},
-	}
-	expectedOltFlow = fu.MkFlowStat(fa)
-	derivedFlow = oltFlowAndGroup.GetFlow(1)
-	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
 func TestUnicastUpstreamRuleDecomposition(t *testing.T) {
 	var fa *fu.FlowArgs
 	fa = &fu.FlowArgs{
-		KV: fu.OfpFlowModArgs{"priority": 500, "table_id": 1},
+		KV: fu.OfpFlowModArgs{"priority": 5000, "table_id": 0},
 		MatchFields: []*ofp.OfpOxmOfbField{
 			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
@@ -618,7 +586,7 @@
 	var fa2 *fu.FlowArgs
 	fa2 = &fu.FlowArgs{
-		KV: fu.OfpFlowModArgs{"priority": 500},
+		KV: fu.OfpFlowModArgs{"priority": 500, "table_id": 1},
 		MatchFields: []*ofp.OfpOxmOfbField{
 			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101),
@@ -633,19 +601,29 @@
 	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fu.MkFlowStat(fa), fu.MkFlowStat(fa2)}}
+	flows.Items[0].Instructions = []*ofp.OfpInstruction{{
+		Type: uint32(ofp.OfpInstructionType_OFPIT_GOTO_TABLE),
+		Data: &ofp.OfpInstruction_GotoTable{
+			GotoTable: &ofp.OfpInstructionGotoTable{
+				TableId: 1,
+			},
+		}}}
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(newTestDeviceManager())
 	deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
+	assert.NotNil(t, onu1FlowAndGroup)
+	assert.NotNil(t, onu1FlowAndGroup.Flows)
 	assert.Equal(t, 1, onu1FlowAndGroup.Flows.Len())
 	assert.Equal(t, 0, onu1FlowAndGroup.Groups.Len())
 	assert.Equal(t, 1, oltFlowAndGroup.Flows.Len())
 	assert.Equal(t, 0, oltFlowAndGroup.Groups.Len())
 	fa = &fu.FlowArgs{
-		KV: fu.OfpFlowModArgs{"priority": 500},
+		KV: fu.OfpFlowModArgs{"priority": 5000},
 		MatchFields: []*ofp.OfpOxmOfbField{
@@ -657,8 +635,24 @@
-	expectedOnu1Flow := fu.MkFlowStat(fa)
 	derivedFlow := onu1FlowAndGroup.GetFlow(0)
+	// Form the expected flow
+	expectedOnu1Flow := fu.MkFlowStat(fa)
+	expectedOnu1Flow.Instructions = []*ofp.OfpInstruction{{
+		Type: uint32(ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS),
+		Data: &ofp.OfpInstruction_Actions{
+			Actions: &ofp.OfpInstructionActions{
+				Actions: []*ofp.OfpAction{{
+					Type: 0,
+					Action: &ofp.OfpAction_Output{
+						Output: &ofp.OfpActionOutput{
+							Port:   1,
+							MaxLen: 65509,
+						},
+					}}}}}}}
+	expectedOnu1Flow.Id = derivedFlow.Id //  Assign same flow ID as derived flowID to match completely
 	assert.Equal(t, expectedOnu1Flow.String(), derivedFlow.String())
 	fa = &fu.FlowArgs{
@@ -682,9 +676,10 @@
 func TestUnicastDownstreamRuleDecomposition(t *testing.T) {
+	log.Debugf("Starting Test Unicast Downstream")
 	var fa1 *fu.FlowArgs
 	fa1 = &fu.FlowArgs{
-		KV: fu.OfpFlowModArgs{"priority": 500, "table_id": 1},
+		KV: fu.OfpFlowModArgs{"priority": 500, "table_id": 0},
 		MatchFields: []*ofp.OfpOxmOfbField{
 			fu.Metadata_ofp((1000 << 32) | 1),
@@ -697,7 +692,7 @@
 	var fa2 *fu.FlowArgs
 	fa2 = &fu.FlowArgs{
-		KV: fu.OfpFlowModArgs{"priority": 500},
+		KV: fu.OfpFlowModArgs{"priority": 500, "table_id": 1},
 		MatchFields: []*ofp.OfpOxmOfbField{
 			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101),
@@ -710,10 +705,19 @@
 	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fu.MkFlowStat(fa1), fu.MkFlowStat(fa2)}}
+	flows.Items[0].Instructions = []*ofp.OfpInstruction{{
+		Type: uint32(ofp.OfpInstructionType_OFPIT_GOTO_TABLE),
+		Data: &ofp.OfpInstruction_GotoTable{
+			GotoTable: &ofp.OfpInstructionGotoTable{
+				TableId: 1,
+			},
+		}}}
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(newTestDeviceManager())
 	deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
 	assert.Equal(t, 1, onu1FlowAndGroup.Flows.Len())
@@ -725,8 +729,8 @@
 		KV: fu.OfpFlowModArgs{"priority": 500},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.Metadata_ofp(1000),
-			fu.TunnelId(uint64(1)),
+			fu.TunnelId(uint64(10)),
+			fu.Metadata_ofp(4294967296001),
 		Actions: []*ofp.OfpAction{
@@ -734,8 +738,22 @@
-	expectedOltFlow := fu.MkFlowStat(fa1)
 	derivedFlow := oltFlowAndGroup.GetFlow(0)
+	expectedOltFlow := fu.MkFlowStat(fa1)
+	expectedOltFlow.Instructions = []*ofp.OfpInstruction{{
+		Type: uint32(ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS),
+		Data: &ofp.OfpInstruction_Actions{
+			Actions: &ofp.OfpInstructionActions{
+				Actions: []*ofp.OfpAction{{
+					Type: 0,
+					Action: &ofp.OfpAction_Output{
+						Output: &ofp.OfpActionOutput{
+							Port:   1,
+							MaxLen: 65509,
+						},
+					}}}}}}}
+	expectedOltFlow.Id = derivedFlow.Id
 	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
 	fa1 = &fu.FlowArgs{