[VOL-4001] MPLS support in vOLTHA-Core

Change-Id: I6b46ccadbbccafe577d717d6fbf3ace7efa4d1aa
diff --git a/.gitignore b/.gitignore
index 9a6c87e..ca60b49 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,5 +60,6 @@
 # CPU profile
 **/profile.cpu
 
-# etcd - ?
+# etcd - locally generated during test
 rw_core/core/voltha.rwcore.nb.etcd/
+rw_core/core/device/voltha.rwcore.da.etcd/
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index 622257d..5241a95 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -30,6 +30,10 @@
 	"testing"
 	"time"
 
+	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+
+	"github.com/golang/protobuf/jsonpb"
+
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/rw_core/config"
@@ -73,6 +77,20 @@
 	maxTimeout        time.Duration
 }
 
+var testLogger log.CLogger
+
+func init() {
+	var err error
+	testLogger, err = log.RegisterPackage(log.JSON, log.InfoLevel, log.Fields{"nbi-handler-test": true})
+	if err != nil {
+		panic(err)
+	}
+
+	if err = log.SetLogLevel(log.InfoLevel); err != nil {
+		panic(err)
+	}
+}
+
 func newNBTest(ctx context.Context) *NBTest {
 	test := &NBTest{}
 	// Start the embedded etcd server
@@ -1429,6 +1447,244 @@
 	wg.Wait()
 }
 
+func (nb *NBTest) testMPLSFlowsAddition(t *testing.T, nbi *NBIHandler) {
+	// Check whether Device already exist
+	devices, err := nbi.ListDevices(getContext(), &empty.Empty{})
+	assert.NoError(t, err)
+	testLogger.Infow(getContext(), "device-list", log.Fields{"devices": devices})
+	for _, dev := range devices.GetItems() {
+		// Delete the found device for fresh start
+		testLogger.Warnf(getContext(), "deleting-existing-device", dev.GetId())
+		_, err := nbi.DeleteDevice(context.Background(), &voltha.ID{
+			Id: dev.GetId(),
+		})
+		assert.NoError(t, err)
+	}
+
+	// Ensure there are no devices in the Core now - wait until condition satisfied or timeout
+	var vFunction isDevicesConditionSatisfied = func(devices *voltha.Devices) bool {
+		return devices != nil && len(devices.Items) == 0
+	}
+	err = waitUntilConditionForDevices(nb.maxTimeout, nbi, vFunction)
+	assert.NoError(t, err)
+
+	// Get list of devices, to make sure the above operation deleted all the devices
+	devices, err = nbi.ListDevices(getContext(), &empty.Empty{})
+	assert.NoError(t, err)
+	assert.Equal(t, 0, len(devices.Items))
+
+	// Create device
+	oltDevice, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: nb.oltAdapterName, MacAddress: "aa:bb:cc:cc:ee:ff"})
+	assert.Nil(t, err)
+	assert.NotNil(t, oltDevice)
+
+	// Verify oltDevice exist in the core
+	devices, err = nbi.ListDevices(getContext(), &empty.Empty{})
+	assert.Nil(t, err)
+	assert.Equal(t, 1, len(devices.Items))
+	assert.Equal(t, oltDevice.Id, devices.Items[0].Id)
+
+	// Enable the oltDevice
+	_, err = nbi.EnableDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+	assert.Nil(t, err)
+
+	// Wait for the logical device to be in the ready state
+	var vldFunction = func(ports []*voltha.LogicalPort) bool {
+		return len(ports) == nb.numONUPerOLT+1
+	}
+	err = waitUntilLogicalDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, nbi, vldFunction)
+	assert.Nil(t, err)
+
+	// Verify that the devices have been setup correctly
+	nb.verifyDevices(t, nbi)
+
+	// Get latest oltDevice data
+	oltDevice, err = nbi.GetDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+	assert.Nil(t, err)
+	assert.NotNil(t, oltDevice)
+	testLogger.Infow(getContext(), "olt-device-created-and-verified", log.Fields{"device-id": oltDevice.GetId()})
+
+	// Verify that the logical device has been setup correctly
+	nb.verifyLogicalDevices(t, oltDevice, nbi)
+
+	logicalDevices, err := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
+	assert.NoError(t, err)
+
+	testLogger.Infow(getContext(), "list-logical-devices", log.Fields{"logical-device": logicalDevices.GetItems()[0]})
+	// Add a meter to the logical device, which the flow can refer to
+	meterMod := &ofp.OfpMeterMod{
+		Command: ofp.OfpMeterModCommand_OFPMC_ADD,
+		Flags:   rand.Uint32(),
+		MeterId: 1,
+		Bands: []*ofp.OfpMeterBandHeader{
+			{Type: ofp.OfpMeterBandType_OFPMBT_EXPERIMENTER,
+				Rate:      rand.Uint32(),
+				BurstSize: rand.Uint32(),
+				Data:      nil,
+			},
+		},
+	}
+	_, err = nbi.UpdateLogicalDeviceMeterTable(getContext(), &ofp.MeterModUpdate{
+		Id:       logicalDevices.GetItems()[0].GetId(),
+		MeterMod: meterMod,
+	})
+	assert.NoError(t, err)
+
+	meters, err := nbi.ListLogicalDeviceMeters(getContext(), &voltha.ID{Id: logicalDevices.GetItems()[0].GetId()})
+	assert.NoError(t, err)
+
+	for _, item := range meters.GetItems() {
+		testLogger.Infow(getContext(), "list-logical-device-meters", log.Fields{"meter-config": item.GetConfig()})
+	}
+
+	logicalPorts, err := nbi.ListLogicalDevicePorts(context.Background(), &voltha.ID{Id: logicalDevices.GetItems()[0].GetId()})
+	assert.NoError(t, err)
+	m := jsonpb.Marshaler{}
+	logicalPortsJson, err := m.MarshalToString(logicalPorts)
+	assert.NoError(t, err)
+
+	testLogger.Infow(getContext(), "list-logical-ports", log.Fields{"ports": logicalPortsJson})
+
+	callables := []func() *ofp.OfpFlowMod{getOnuUpstreamRules, getOltUpstreamRules, getOLTDownstreamMplsSingleTagRules,
+		getOLTDownstreamMplsDoubleTagRules, getOLTDownstreamRules, getOnuDownstreamRules}
+
+	for _, callable := range callables {
+		_, err = nbi.UpdateLogicalDeviceFlowTable(getContext(), &ofp.FlowTableUpdate{Id: logicalDevices.GetItems()[0].GetId(), FlowMod: callable()})
+		assert.NoError(t, err)
+	}
+}
+
+func getOnuUpstreamRules() (flowMod *ofp.OfpFlowMod) {
+	fa := &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 1000, "table_id": 1, "meter_id": 1, "write_metadata": 4100100000},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(103),
+			flows.VlanVid(4096),
+		},
+		Actions: []*ofp.OfpAction{},
+	}
+
+	flowMod = makeSimpleFlowMod(fa)
+	flowMod.TableId = 0
+	m := jsonpb.Marshaler{}
+	flowModJson, _ := m.MarshalToString(flowMod)
+	testLogger.Infow(getContext(), "onu-upstream-flow", log.Fields{"flow-mod": flowModJson})
+	return
+}
+
+func getOltUpstreamRules() (flowMod *ofp.OfpFlowMod) {
+	fa := &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 1000, "table_id": 1, "meter_id": 1, "write_metadata": 4100000000},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(103),
+			flows.VlanVid(4096),
+		},
+		Actions: []*ofp.OfpAction{
+			flows.PushVlan(0x8100),
+			flows.SetField(flows.VlanVid(2)),
+			flows.SetField(flows.EthSrc(1111)),
+			flows.SetField(flows.EthDst(2222)),
+			flows.PushVlan(0x8847),
+			flows.SetField(flows.MplsLabel(100)),
+			flows.SetField(flows.MplsBos(1)),
+			flows.PushVlan(0x8847),
+			flows.SetField(flows.MplsLabel(200)),
+			flows.MplsTtl(64),
+			flows.Output(2),
+		},
+	}
+	flowMod = makeSimpleFlowMod(fa)
+	flowMod.TableId = 1
+	m := jsonpb.Marshaler{}
+	flowModJson, _ := m.MarshalToString(flowMod)
+	testLogger.Infow(getContext(), "olt-upstream-flow", log.Fields{"flow-mod": flowModJson})
+	return
+}
+
+func getOLTDownstreamMplsSingleTagRules() (flowMod *ofp.OfpFlowMod) {
+	fa := &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 1000, "table_id": 1},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(2),
+			flows.Metadata_ofp((1000 << 32) | 1),
+			flows.EthType(0x8847),
+			flows.MplsBos(1),
+			flows.EthSrc(2222),
+		},
+		Actions: []*ofp.OfpAction{
+			{Type: ofp.OfpActionType_OFPAT_DEC_MPLS_TTL, Action: &ofp.OfpAction_MplsTtl{MplsTtl: &ofp.OfpActionMplsTtl{MplsTtl: 62}}},
+			flows.PopMpls(0x8847),
+		},
+	}
+	flowMod = makeSimpleFlowMod(fa)
+	flowMod.TableId = 0
+	m := jsonpb.Marshaler{}
+	flowModJson, _ := m.MarshalToString(flowMod)
+	testLogger.Infow(getContext(), "olt-mpls-downstream-single-tag-flow", log.Fields{"flow-mod": flowModJson})
+	return
+}
+
+func getOLTDownstreamMplsDoubleTagRules() (flowMod *ofp.OfpFlowMod) {
+	fa := &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 1000, "table_id": 1},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(2),
+			flows.EthType(0x8847),
+			flows.EthSrc(2222),
+		},
+		Actions: []*ofp.OfpAction{
+			{Type: ofp.OfpActionType_OFPAT_DEC_MPLS_TTL, Action: &ofp.OfpAction_MplsTtl{MplsTtl: &ofp.OfpActionMplsTtl{MplsTtl: 62}}},
+			flows.PopMpls(0x8847),
+			flows.PopMpls(0x8847),
+		},
+	}
+	flowMod = makeSimpleFlowMod(fa)
+	flowMod.TableId = 0
+	m := jsonpb.Marshaler{}
+	flowModJson, _ := m.MarshalToString(flowMod)
+	testLogger.Infow(getContext(), "olt-mpls-downstream-double-tagged-flow", log.Fields{"flow-mod": flowModJson})
+	return
+}
+
+func getOLTDownstreamRules() (flowMod *ofp.OfpFlowMod) {
+	fa := &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 1000, "table_id": 2, "meter_id": 1},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(2),
+			flows.VlanVid(2),
+		},
+		Actions: []*ofp.OfpAction{
+			flows.PopVlan(),
+		},
+	}
+	flowMod = makeSimpleFlowMod(fa)
+	flowMod.TableId = 1
+	m := jsonpb.Marshaler{}
+	flowModJson, _ := m.MarshalToString(flowMod)
+	testLogger.Infow(getContext(), "olt-downstream-flow", log.Fields{"flow-mod": flowModJson})
+	return
+}
+
+func getOnuDownstreamRules() (flowMod *ofp.OfpFlowMod) {
+	fa := &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 1000, "meter_id": 1},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(2),
+			flows.Metadata_ofp((1000 << 32) | 1),
+			flows.VlanVid(4096),
+		},
+		Actions: []*ofp.OfpAction{
+			flows.Output(103),
+		},
+	}
+	flowMod = makeSimpleFlowMod(fa)
+	flowMod.TableId = 2
+	m := jsonpb.Marshaler{}
+	flowModJson, _ := m.MarshalToString(flowMod)
+	testLogger.Infow(getContext(), "onu-downstream-flow", log.Fields{"flow-mod": flowModJson})
+	return
+}
+
 func TestSuiteNbiApiHandler(t *testing.T) {
 	ctx := context.Background()
 	f, err := os.Create("../../../tests/results/profile.cpu")
@@ -1513,7 +1769,62 @@
 		// 13. Test flow add failure
 		nb.testFlowAddFailure(t, nbi)
 
-		// 14.  Clean up
+		// 14. Clean up
 		nb.deleteAllDevices(t, nbi)
 	}
 }
+
+func TestFlowAddition(t *testing.T) {
+	ctx := context.Background()
+	nb := newNBTest(ctx)
+	assert.NotNil(t, nb)
+
+	defer nb.stopAll(ctx)
+
+	// Start the Core
+	nb.startCore(false)
+
+	// Set the grpc API interface - no grpc server is running in unit test
+	nbi := NewNBIHandler(nb.deviceMgr, nb.logicalDeviceMgr, nb.adapterMgr)
+
+	// Create/register the adapters
+	nb.oltAdapter, nb.onuAdapter = tst.CreateAndregisterAdapters(ctx, t, nb.kClient, nb.coreInstanceID, nb.oltAdapterName, nb.onuAdapterName, nb.adapterMgr)
+	nb.numONUPerOLT = nb.oltAdapter.GetNumONUPerOLT()
+	nb.startingUNIPortNo = nb.oltAdapter.GetStartingUNIPortNo()
+
+	// 2. Test adapter registration
+	nb.testAdapterRegistration(t, nbi)
+
+	// 3. Test MPLS flows addition where:
+	/*
+		Upstream
+		ONU
+		ADDED, bytes=0, packets=0, table=0, priority=1000, selector=[IN_PORT:32, VLAN_VID:ANY], treatment=[immediate=[],
+		transition=TABLE:1, meter=METER:1, metadata=METADATA:4100010000/0]
+		OLT
+		ADDED, bytes=0, packets=0, table=1, priority=1000, selector=[IN_PORT:32, VLAN_VID:ANY], treatment=[immediate=[VLAN_PUSH:vlan,
+		VLAN_ID:2, MPLS_PUSH:mpls_unicast, MPLS_LABEL:YYY,MPLS_BOS:true, MPLS_PUSH:mpls_unicast ,MPLS_LABEL:XXX, MPLS_BOS:false,
+		EXTENSION:of:0000000000000227/VolthaPushL2Header{​​​​​​​}​​​​​​​, ETH_SRC:OLT_MAC, ETH_DST:LEAF_MAC,  TTL:64, OUTPUT:65536],
+		meter=METER:1, metadata=METADATA:4100000000/0]
+
+		Downstream
+		OLT
+		//Below flow rule to pop L2 Ethernet headers from packets which have a single MPLS label
+		ADDED, bytes=0, packets=0, table=0, priority=1000, selector=[IN_PORT:65536, ETH_TYPE:mpls_unicast, MPLS_BOS:true, ETH_SRC:LEAF_MAC],
+		treatment=[DefaultTrafficTreatment{immediate=[DEC_MPLS_TTL, TTL_IN, MPLS_POP:mpls_unicast, EXTENSION:of:0000000000000227/VolthaPopL2Header{},
+		transition=TABLE:1]
+
+		//Below flow rule to pop L2 Ethernet headers from packets which have two MPLS label
+		ADDED, bytes=0, packets=0, table=0, priority=1000, selector=[IN_PORT:65536, ETH_TYPE:mpls_unicast, MPLS_BOS:false, ETH_SRC:LEAF_MAC],
+		treatment=[DefaultTrafficTreatment{immediate=[DEC_MPLS_TTL, TTL_IN, MPLS_POP:mpls_unicast, MPLS_POP:mpls_unicast ,
+		EXTENSION:of:0000000000000227/VolthaPopL2Header{}, transition=TABLE:1]
+
+		//Below flow rules are unchanged from the current implementations except for the table numbers
+		ADDED, bytes=0, packets=0, table=1, priority=1000, selector=[IN_PORT:65536, VLAN_VID:2], treatment=[immediate=[VLAN_POP], transition=TABLE:2,
+		meter=METER:2, metadata=METADATA:1000004100000020/0]
+		ONU
+		ADDED, bytes=0, packets=0, table=2, priority=1000, selector=[IN_PORT:65536, METADATA:20 VLAN_VID:ANY], treatment=[immediate=[OUTPUT:32],
+		meter=METER:2, metadata=METADATA:4100000000/0]
+	*/
+	nb.testMPLSFlowsAddition(t, nbi)
+}
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index f4c88fa..72b915f 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -167,7 +167,7 @@
 		go func() {
 			// Wait for completion
 			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChannels...); res != nil {
-				logger.Infow(ctx, "failed-to-add-flow-will-attempt-deletion", log.Fields{
+				logger.Errorw(ctx, "failed-to-add-flow-will-attempt-deletion", log.Fields{
 					"errors":            res,
 					"logical-device-id": agent.logicalDeviceID,
 					"flow":              flow,
diff --git a/rw_core/flowdecomposition/flow_decomposer.go b/rw_core/flowdecomposition/flow_decomposer.go
index ab04891..5a1affd 100644
--- a/rw_core/flowdecomposition/flow_decomposer.go
+++ b/rw_core/flowdecomposition/flow_decomposer.go
@@ -320,11 +320,6 @@
 		return deviceRules, nil
 	}
 
-	if flow.TableId != 0 {
-		logger.Warnw(ctx, "This is not olt pipeline table, so skipping", log.Fields{"tableId": flow.TableId})
-		return deviceRules, nil
-	}
-
 	ingressHop := path[0]
 	egressHop := path[1]
 	if metadataFromwriteMetadata != 0 {
@@ -517,19 +512,25 @@
 		}
 		isUpstream := !ingressDevice.Root
 		if isUpstream { // Unicast OLT and ONU UL
-			logger.Debug(ctx, "process-olt-nd-onu-upstream-noncontrollerbound-unicast-flows", log.Fields{"flows": flow})
+			logger.Debug(ctx, "process-olt-and-onu-upstream-non-controller-bound-uni-cast-flows", log.Fields{"flows": flow})
 			deviceRules, err = fd.processUpstreamNonControllerBoundFlow(ctx, path, inPortNo, outPortNo, flow)
 			if err != nil {
 				return nil, err
 			}
-		} else if fu.HasNextTable(flow) && flow.TableId == 0 { // Unicast OLT flow DL
-			logger.Debugw(ctx, "process-olt-downstream-noncontrollerbound-flow-with-nexttable", log.Fields{"flows": flow})
+		} else if fu.HasNextTable(flow) && (flow.GetTableId() == 0 || flow.GetTableId() == 1) { // Unicast OLT Flow
+			// For 'Non-MPLS' flows, this condition will only be true for table-id 0 as only table-id 0 will have the
+			// 'go-to-next-table' instruction
+			// For 'MPLS' flows, this condition will be true for table-id 0 and table-id 1.
+			// So the flow here shall always be an OLT flow
+			logger.Debugw(ctx, "process-olt-downstream-non-controller-bound-flow-with-next-table", log.Fields{"flows": flow})
 			deviceRules, err = fd.processDownstreamFlowWithNextTable(ctx, agent, path, inPortNo, outPortNo, flow)
 			if err != nil {
 				return nil, err
 			}
-		} else if flow.TableId == 1 && outPortNo != 0 { // Unicast ONU flow DL
-			logger.Debugw(ctx, "process-onu-downstream-unicast-flow", log.Fields{"flows": flow})
+		} else if (flow.GetTableId() == 1 || flow.GetTableId() == 2) && outPortNo != 0 { // Unicast ONU flow DL
+			// If this is an MPLS OLT flow (table-id 1, transition-to-table 2), the condition above will already be hit.
+			// So if we are reaching this point, the flow shall always be an ONU flow
+			logger.Debugw(ctx, "process-onu-downstream-uni-cast-flow", log.Fields{"flows": flow})
 			deviceRules, err = fd.processUnicastFlow(ctx, path, inPortNo, outPortNo, flow)
 			if err != nil {
 				return nil, err
diff --git a/rw_core/flowdecomposition/flow_decomposer_test.go b/rw_core/flowdecomposition/flow_decomposer_test.go
index 944e524..279b39a 100644
--- a/rw_core/flowdecomposition/flow_decomposer_test.go
+++ b/rw_core/flowdecomposition/flow_decomposer_test.go
@@ -957,8 +957,9 @@
 		},
 	}
 
+	// If table-id is provided in the flow-args, the same is also used as go-to-next table
 	fa2 := &fu.FlowArgs{
-		KV: fu.OfpFlowModArgs{"priority": 500, "table_id": 1},
+		KV: fu.OfpFlowModArgs{"priority": 500 /*"table_id": 1*/},
 		MatchFields: []*ofp.OfpOxmOfbField{
 			fu.InPort(10),
 			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101),
@@ -974,6 +975,9 @@
 	assert.Nil(t, err)
 	fs2, err := fu.MkFlowStat(fa2)
 	assert.Nil(t, err)
+	// Table-1, without next table
+	fs2.TableId = 1
+
 	fs1.Instructions = []*ofp.OfpInstruction{{
 		Type: uint32(ofp.OfpInstructionType_OFPIT_GOTO_TABLE),
 		Data: &ofp.OfpInstruction_GotoTable{
@@ -1100,3 +1104,325 @@
 	derivedFlow := oltFlowAndGroup.GetFlow(0)
 	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
 }
+
+func TestMplsUpstreamFlowDecomposition(t *testing.T) {
+	// Note: 	Olt-Nni=10
+	// 			Onu1-Uni=1
+
+	/*
+		ADDED, bytes=0, packets=0, table=0, priority=1000, selector=[IN_PORT:UNI, VLAN_VID:ANY], treatment=[immediate=[],
+		transition=TABLE:1, meter=METER:1, metadata=METADATA:4100010000/0]
+	*/
+
+	// Here, 'table_id=1' is present to add go-to-table action
+	faOnu := &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 1000, "table_id": 1, "meter_id": 1, "write_metadata": 4100100000},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(1), // Onu Uni
+			fu.VlanVid(4096),
+		},
+		Actions: []*ofp.OfpAction{},
+	}
+	fsOnu, err := fu.MkFlowStat(faOnu)
+	assert.NoError(t, err)
+	assert.NotNil(t, fsOnu)
+	// Update table-id
+	fsOnu.TableId = 0
+
+	/*
+		ADDED, bytes=0, packets=0, table=1, priority=1000, selector=[IN_PORT:32, VLAN_VID:ANY], treatment=[immediate=[VLAN_PUSH:vlan,
+		VLAN_ID:2, MPLS_PUSH:mpls_unicast, MPLS_LABEL:YYY,MPLS_BOS:true, MPLS_PUSH:mpls_unicast ,MPLS_LABEL:XXX, MPLS_BOS:false,
+		EXTENSION:of:0000000000000227/VolthaPushL2Header{​​​​​​​}​​​​​​​, ETH_SRC:OLT_MAC, ETH_DST:LEAF_MAC,  TTL:64, OUTPUT:65536],
+		meter=METER:1, metadata=METADATA:4100000000/0]
+	*/
+	faOlt := &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 1000, "meter_id": 1, "write_metadata": 4100000000},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(1), // Onu-Uni
+			fu.VlanVid(4096),
+		},
+		Actions: []*ofp.OfpAction{
+			fu.PushVlan(0x8100),
+			fu.SetField(fu.VlanVid(2)),
+			fu.SetField(fu.EthSrc(1111)),
+			fu.SetField(fu.EthDst(2222)),
+			fu.PushVlan(0x8847),
+			fu.SetField(fu.MplsLabel(100)),
+			fu.SetField(fu.MplsBos(1)),
+			fu.PushVlan(0x8847),
+			fu.SetField(fu.MplsLabel(200)),
+			fu.MplsTtl(64),
+			fu.Output(10), // Olt-Nni
+		},
+	}
+
+	fsOlt, err := fu.MkFlowStat(faOlt)
+	assert.NoError(t, err)
+	assert.NotNil(t, fsOlt)
+	// Update table-id
+	// table-id is skipped in flow-args above as that would also add the go-to-table action
+	fsOlt.TableId = 1
+
+	flows := map[uint64]*ofp.OfpFlowStats{fsOnu.Id: fsOnu, fsOlt.Id: fsOlt}
+
+	tfd := newTestFlowDecomposer(t, newTestDeviceManager())
+
+	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, nil)
+	assert.Nil(t, err)
+	onuFlowAndGroup := deviceRules.Rules["onu1"]
+	oltFlowAndGroup := deviceRules.Rules["olt"]
+	assert.NotNil(t, onuFlowAndGroup)
+	assert.NotNil(t, onuFlowAndGroup.Flows)
+	assert.Equal(t, 1, onuFlowAndGroup.Flows.Len())
+	assert.Equal(t, 0, onuFlowAndGroup.Groups.Len())
+	assert.Equal(t, 1, oltFlowAndGroup.Flows.Len())
+	assert.Equal(t, 0, oltFlowAndGroup.Groups.Len())
+
+	// Form expected ONU flow args
+	expectedOnufa := &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 1000, "meter_id": 1, "write_metadata": 4100100000},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(2), // Onu Uni
+			fu.TunnelId(uint64(1)),
+			fu.VlanVid(4096),
+		},
+		Actions: []*ofp.OfpAction{
+			fu.Output(1),
+		},
+	}
+
+	// Form the expected ONU flow
+	expectedOnuFlow, err := fu.MkFlowStat(expectedOnufa)
+	assert.Nil(t, err)
+	assert.NotNil(t, expectedOnuFlow)
+	expectedOnuFlow.TableId = 0
+
+	derivedOnuFlow := onuFlowAndGroup.GetFlow(0)
+	expectedOnuFlow.Id = derivedOnuFlow.Id //  Assign same flow ID as derived flowID to match completely
+	assert.Equal(t, expectedOnuFlow.String(), derivedOnuFlow.String())
+
+	expectedOltfa := &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 1000, "meter_id": 1, "write_metadata": 4100000000},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(1), // Onu-Uni
+			fu.TunnelId(uint64(1)),
+			fu.VlanVid(4096),
+		},
+		Actions: []*ofp.OfpAction{
+			fu.PushVlan(0x8100),
+			fu.SetField(fu.VlanVid(2)),
+			fu.SetField(fu.EthSrc(1111)),
+			fu.SetField(fu.EthDst(2222)),
+			fu.PushVlan(0x8847),
+			fu.SetField(fu.MplsLabel(100)),
+			fu.SetField(fu.MplsBos(1)),
+			fu.PushVlan(0x8847),
+			fu.SetField(fu.MplsLabel(200)),
+			fu.MplsTtl(64),
+			fu.Output(2), // Olt-Nni
+		},
+	}
+
+	expectedOltFlow, err := fu.MkFlowStat(expectedOltfa)
+	assert.NoError(t, err)
+	assert.NotNil(t, expectedOltFlow)
+
+	derivedOltFlow := oltFlowAndGroup.GetFlow(0)
+	expectedOltFlow.Id = derivedOltFlow.Id
+	assert.Equal(t, expectedOltFlow.String(), derivedOltFlow.String())
+}
+
+func TestMplsDownstreamFlowDecomposition(t *testing.T) {
+	faOltSingleMplsLable := &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 1000, "table_id": 1},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(10),
+			fu.Metadata_ofp((1000 << 32) | 1),
+			fu.EthType(0x8847),
+			fu.MplsBos(1),
+			fu.EthSrc(2222),
+		},
+		Actions: []*ofp.OfpAction{
+			{Type: ofp.OfpActionType_OFPAT_DEC_MPLS_TTL, Action: &ofp.OfpAction_MplsTtl{MplsTtl: &ofp.OfpActionMplsTtl{MplsTtl: 62}}},
+			fu.PopMpls(0x8847),
+		},
+	}
+	fsOltSingleMplsLabel, err := fu.MkFlowStat(faOltSingleMplsLable)
+	assert.NoError(t, err)
+	assert.NotNil(t, fsOltSingleMplsLabel)
+	fsOltSingleMplsLabel.TableId = 0
+
+	flows := map[uint64]*ofp.OfpFlowStats{fsOltSingleMplsLabel.Id: fsOltSingleMplsLabel}
+
+	tfd := newTestFlowDecomposer(t, newTestDeviceManager())
+
+	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, nil)
+	assert.Nil(t, err)
+	assert.NotNil(t, deviceRules)
+	oltFlowAndGroup := deviceRules.Rules["olt"]
+	assert.NotNil(t, oltFlowAndGroup)
+
+	derivedFlow := oltFlowAndGroup.GetFlow(0)
+	assert.NotNil(t, derivedFlow)
+
+	// Formulate expected
+	expectedFa := &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 1000},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(2),
+			fu.TunnelId(10),
+			fu.Metadata_ofp((1000 << 32) | 1),
+			fu.EthType(0x8847),
+			fu.MplsBos(1),
+			fu.EthSrc(2222),
+		},
+		Actions: []*ofp.OfpAction{
+			{Type: ofp.OfpActionType_OFPAT_DEC_MPLS_TTL, Action: &ofp.OfpAction_MplsTtl{MplsTtl: &ofp.OfpActionMplsTtl{MplsTtl: 62}}},
+			fu.PopMpls(0x8847),
+			fu.Output(1),
+		},
+	}
+	expectedFs, err := fu.MkFlowStat(expectedFa)
+	assert.NoError(t, err)
+	expectedFs.Id = derivedFlow.Id
+
+	assert.Equal(t, expectedFs.String(), derivedFlow.String())
+
+	// Formulate Mpls double label
+	faOltDoubleMplsLabel := &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 1000, "table_id": 1},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(10),
+			fu.EthType(0x8847),
+			fu.EthSrc(2222),
+		},
+		Actions: []*ofp.OfpAction{
+			{Type: ofp.OfpActionType_OFPAT_DEC_MPLS_TTL, Action: &ofp.OfpAction_MplsTtl{MplsTtl: &ofp.OfpActionMplsTtl{MplsTtl: 62}}},
+			fu.PopMpls(0x8847),
+			fu.PopMpls(0x8847),
+		},
+	}
+	fsOltDoubleMplsLabel, err := fu.MkFlowStat(faOltDoubleMplsLabel)
+	assert.NoError(t, err)
+	assert.NotNil(t, fsOltDoubleMplsLabel)
+
+	flows2 := map[uint64]*ofp.OfpFlowStats{fsOltDoubleMplsLabel.Id: fsOltDoubleMplsLabel}
+	assert.NotNil(t, flows2)
+
+	deviceRules, err = tfd.fd.DecomposeRules(context.Background(), tfd, flows2, nil)
+	assert.NoError(t, err)
+	assert.NotNil(t, deviceRules)
+	oltFlowAndGroup = deviceRules.Rules["olt"]
+	assert.NotNil(t, oltFlowAndGroup)
+	derivedFlow = oltFlowAndGroup.GetFlow(0)
+	assert.NotNil(t, derivedFlow)
+
+	expectedFa = &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 1000},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(2),
+			fu.TunnelId(10),
+			fu.EthType(0x8847),
+			fu.EthSrc(2222),
+		},
+		Actions: []*ofp.OfpAction{
+			{Type: ofp.OfpActionType_OFPAT_DEC_MPLS_TTL, Action: &ofp.OfpAction_MplsTtl{MplsTtl: &ofp.OfpActionMplsTtl{MplsTtl: 62}}},
+			fu.PopMpls(0x8847),
+			fu.PopMpls(0x8847),
+			fu.Output(1),
+		},
+	}
+	expectedFs, err = fu.MkFlowStat(expectedFa)
+	assert.NoError(t, err)
+	assert.NotNil(t, expectedFs)
+	expectedFs.Id = derivedFlow.Id
+	assert.Equal(t, expectedFs.String(), derivedFlow.String())
+
+	//olt downstream flows (table-id=1)
+	faOlt := &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 1000, "table_id": 2, "meter_id": 1},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(10),
+			fu.VlanVid(2),
+		},
+		Actions: []*ofp.OfpAction{
+			fu.PopVlan(),
+		},
+	}
+	fsOlt, err := fu.MkFlowStat(faOlt)
+	assert.NoError(t, err)
+	assert.NotNil(t, fsOlt)
+	fsOlt.TableId = 1
+
+	flows3 := map[uint64]*ofp.OfpFlowStats{fsOlt.Id: fsOlt}
+	assert.NotNil(t, flows3)
+	deviceRules, err = tfd.fd.DecomposeRules(context.Background(), tfd, flows3, nil)
+	assert.NoError(t, err)
+	assert.NotNil(t, deviceRules)
+	oltFlowAndGroup = deviceRules.Rules["olt"]
+	assert.NotNil(t, oltFlowAndGroup)
+	derivedFlow = oltFlowAndGroup.GetFlow(0)
+	assert.NotNil(t, derivedFlow)
+
+	faOltExpected := &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 1000, "meter_id": 1},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(2),
+			fu.TunnelId(10),
+			fu.VlanVid(2),
+		},
+		Actions: []*ofp.OfpAction{
+			fu.PopVlan(),
+			fu.Output(1),
+		},
+	}
+	fsOltExpected, err := fu.MkFlowStat(faOltExpected)
+	assert.NoError(t, err)
+	assert.NotNil(t, fsOltExpected)
+	fsOltExpected.Id = derivedFlow.Id
+	assert.Equal(t, fsOltExpected.String(), derivedFlow.String())
+
+	// Onu Downstream
+	faOnu := &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 1000, "meter_id": 1},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(10),
+			fu.Metadata_ofp((1000 << 32) | 1),
+			fu.VlanVid(4096),
+		},
+		Actions: []*ofp.OfpAction{
+			fu.Output(1),
+		},
+	}
+	fsOnu, err := fu.MkFlowStat(faOnu)
+	assert.NoError(t, err)
+	fsOnu.TableId = 2
+
+	flows4 := map[uint64]*ofp.OfpFlowStats{fsOnu.Id: fsOnu}
+	assert.NotNil(t, flows4)
+	deviceRules, err = tfd.fd.DecomposeRules(context.Background(), tfd, flows4, nil)
+	assert.NoError(t, err)
+	assert.NotNil(t, deviceRules)
+	onuFlowAndGroup := deviceRules.Rules["onu1"]
+	assert.NotNil(t, onuFlowAndGroup)
+	derivedFlow = onuFlowAndGroup.GetFlow(0)
+	assert.NotNil(t, derivedFlow)
+
+	faExpected := &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 1000, "meter_id": 1},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(1),
+			fu.Metadata_ofp((1000 << 32) | 1),
+			fu.VlanVid(4096),
+		},
+		Actions: []*ofp.OfpAction{
+			fu.Output(2),
+		},
+	}
+	fsExpected, err := fu.MkFlowStat(faExpected)
+	assert.NoError(t, err)
+	assert.NotNil(t, fsExpected)
+	fsExpected.Id = derivedFlow.Id
+
+	assert.Equal(t, fsExpected.String(), derivedFlow.String())
+}