[VOL-4001] MPLS support in vOLTHA-Core

Change-Id: I6b46ccadbbccafe577d717d6fbf3ace7efa4d1aa
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index 622257d..5241a95 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -30,6 +30,10 @@
 	"testing"
 	"time"
 
+	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+
+	"github.com/golang/protobuf/jsonpb"
+
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/rw_core/config"
@@ -73,6 +77,20 @@
 	maxTimeout        time.Duration
 }
 
+var testLogger log.CLogger
+
+func init() {
+	var err error
+	testLogger, err = log.RegisterPackage(log.JSON, log.InfoLevel, log.Fields{"nbi-handler-test": true})
+	if err != nil {
+		panic(err)
+	}
+
+	if err = log.SetLogLevel(log.InfoLevel); err != nil {
+		panic(err)
+	}
+}
+
 func newNBTest(ctx context.Context) *NBTest {
 	test := &NBTest{}
 	// Start the embedded etcd server
@@ -1429,6 +1447,244 @@
 	wg.Wait()
 }
 
+func (nb *NBTest) testMPLSFlowsAddition(t *testing.T, nbi *NBIHandler) {
+	// Check whether Device already exist
+	devices, err := nbi.ListDevices(getContext(), &empty.Empty{})
+	assert.NoError(t, err)
+	testLogger.Infow(getContext(), "device-list", log.Fields{"devices": devices})
+	for _, dev := range devices.GetItems() {
+		// Delete the found device for fresh start
+		testLogger.Warnf(getContext(), "deleting-existing-device", dev.GetId())
+		_, err := nbi.DeleteDevice(context.Background(), &voltha.ID{
+			Id: dev.GetId(),
+		})
+		assert.NoError(t, err)
+	}
+
+	// Ensure there are no devices in the Core now - wait until condition satisfied or timeout
+	var vFunction isDevicesConditionSatisfied = func(devices *voltha.Devices) bool {
+		return devices != nil && len(devices.Items) == 0
+	}
+	err = waitUntilConditionForDevices(nb.maxTimeout, nbi, vFunction)
+	assert.NoError(t, err)
+
+	// Get list of devices, to make sure the above operation deleted all the devices
+	devices, err = nbi.ListDevices(getContext(), &empty.Empty{})
+	assert.NoError(t, err)
+	assert.Equal(t, 0, len(devices.Items))
+
+	// Create device
+	oltDevice, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: nb.oltAdapterName, MacAddress: "aa:bb:cc:cc:ee:ff"})
+	assert.Nil(t, err)
+	assert.NotNil(t, oltDevice)
+
+	// Verify oltDevice exist in the core
+	devices, err = nbi.ListDevices(getContext(), &empty.Empty{})
+	assert.Nil(t, err)
+	assert.Equal(t, 1, len(devices.Items))
+	assert.Equal(t, oltDevice.Id, devices.Items[0].Id)
+
+	// Enable the oltDevice
+	_, err = nbi.EnableDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+	assert.Nil(t, err)
+
+	// Wait for the logical device to be in the ready state
+	var vldFunction = func(ports []*voltha.LogicalPort) bool {
+		return len(ports) == nb.numONUPerOLT+1
+	}
+	err = waitUntilLogicalDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, nbi, vldFunction)
+	assert.Nil(t, err)
+
+	// Verify that the devices have been setup correctly
+	nb.verifyDevices(t, nbi)
+
+	// Get latest oltDevice data
+	oltDevice, err = nbi.GetDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+	assert.Nil(t, err)
+	assert.NotNil(t, oltDevice)
+	testLogger.Infow(getContext(), "olt-device-created-and-verified", log.Fields{"device-id": oltDevice.GetId()})
+
+	// Verify that the logical device has been setup correctly
+	nb.verifyLogicalDevices(t, oltDevice, nbi)
+
+	logicalDevices, err := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
+	assert.NoError(t, err)
+
+	testLogger.Infow(getContext(), "list-logical-devices", log.Fields{"logical-device": logicalDevices.GetItems()[0]})
+	// Add a meter to the logical device, which the flow can refer to
+	meterMod := &ofp.OfpMeterMod{
+		Command: ofp.OfpMeterModCommand_OFPMC_ADD,
+		Flags:   rand.Uint32(),
+		MeterId: 1,
+		Bands: []*ofp.OfpMeterBandHeader{
+			{Type: ofp.OfpMeterBandType_OFPMBT_EXPERIMENTER,
+				Rate:      rand.Uint32(),
+				BurstSize: rand.Uint32(),
+				Data:      nil,
+			},
+		},
+	}
+	_, err = nbi.UpdateLogicalDeviceMeterTable(getContext(), &ofp.MeterModUpdate{
+		Id:       logicalDevices.GetItems()[0].GetId(),
+		MeterMod: meterMod,
+	})
+	assert.NoError(t, err)
+
+	meters, err := nbi.ListLogicalDeviceMeters(getContext(), &voltha.ID{Id: logicalDevices.GetItems()[0].GetId()})
+	assert.NoError(t, err)
+
+	for _, item := range meters.GetItems() {
+		testLogger.Infow(getContext(), "list-logical-device-meters", log.Fields{"meter-config": item.GetConfig()})
+	}
+
+	logicalPorts, err := nbi.ListLogicalDevicePorts(context.Background(), &voltha.ID{Id: logicalDevices.GetItems()[0].GetId()})
+	assert.NoError(t, err)
+	m := jsonpb.Marshaler{}
+	logicalPortsJson, err := m.MarshalToString(logicalPorts)
+	assert.NoError(t, err)
+
+	testLogger.Infow(getContext(), "list-logical-ports", log.Fields{"ports": logicalPortsJson})
+
+	callables := []func() *ofp.OfpFlowMod{getOnuUpstreamRules, getOltUpstreamRules, getOLTDownstreamMplsSingleTagRules,
+		getOLTDownstreamMplsDoubleTagRules, getOLTDownstreamRules, getOnuDownstreamRules}
+
+	for _, callable := range callables {
+		_, err = nbi.UpdateLogicalDeviceFlowTable(getContext(), &ofp.FlowTableUpdate{Id: logicalDevices.GetItems()[0].GetId(), FlowMod: callable()})
+		assert.NoError(t, err)
+	}
+}
+
+func getOnuUpstreamRules() (flowMod *ofp.OfpFlowMod) {
+	fa := &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 1000, "table_id": 1, "meter_id": 1, "write_metadata": 4100100000},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(103),
+			flows.VlanVid(4096),
+		},
+		Actions: []*ofp.OfpAction{},
+	}
+
+	flowMod = makeSimpleFlowMod(fa)
+	flowMod.TableId = 0
+	m := jsonpb.Marshaler{}
+	flowModJson, _ := m.MarshalToString(flowMod)
+	testLogger.Infow(getContext(), "onu-upstream-flow", log.Fields{"flow-mod": flowModJson})
+	return
+}
+
+func getOltUpstreamRules() (flowMod *ofp.OfpFlowMod) {
+	fa := &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 1000, "table_id": 1, "meter_id": 1, "write_metadata": 4100000000},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(103),
+			flows.VlanVid(4096),
+		},
+		Actions: []*ofp.OfpAction{
+			flows.PushVlan(0x8100),
+			flows.SetField(flows.VlanVid(2)),
+			flows.SetField(flows.EthSrc(1111)),
+			flows.SetField(flows.EthDst(2222)),
+			flows.PushVlan(0x8847),
+			flows.SetField(flows.MplsLabel(100)),
+			flows.SetField(flows.MplsBos(1)),
+			flows.PushVlan(0x8847),
+			flows.SetField(flows.MplsLabel(200)),
+			flows.MplsTtl(64),
+			flows.Output(2),
+		},
+	}
+	flowMod = makeSimpleFlowMod(fa)
+	flowMod.TableId = 1
+	m := jsonpb.Marshaler{}
+	flowModJson, _ := m.MarshalToString(flowMod)
+	testLogger.Infow(getContext(), "olt-upstream-flow", log.Fields{"flow-mod": flowModJson})
+	return
+}
+
+func getOLTDownstreamMplsSingleTagRules() (flowMod *ofp.OfpFlowMod) {
+	fa := &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 1000, "table_id": 1},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(2),
+			flows.Metadata_ofp((1000 << 32) | 1),
+			flows.EthType(0x8847),
+			flows.MplsBos(1),
+			flows.EthSrc(2222),
+		},
+		Actions: []*ofp.OfpAction{
+			{Type: ofp.OfpActionType_OFPAT_DEC_MPLS_TTL, Action: &ofp.OfpAction_MplsTtl{MplsTtl: &ofp.OfpActionMplsTtl{MplsTtl: 62}}},
+			flows.PopMpls(0x8847),
+		},
+	}
+	flowMod = makeSimpleFlowMod(fa)
+	flowMod.TableId = 0
+	m := jsonpb.Marshaler{}
+	flowModJson, _ := m.MarshalToString(flowMod)
+	testLogger.Infow(getContext(), "olt-mpls-downstream-single-tag-flow", log.Fields{"flow-mod": flowModJson})
+	return
+}
+
+func getOLTDownstreamMplsDoubleTagRules() (flowMod *ofp.OfpFlowMod) {
+	fa := &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 1000, "table_id": 1},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(2),
+			flows.EthType(0x8847),
+			flows.EthSrc(2222),
+		},
+		Actions: []*ofp.OfpAction{
+			{Type: ofp.OfpActionType_OFPAT_DEC_MPLS_TTL, Action: &ofp.OfpAction_MplsTtl{MplsTtl: &ofp.OfpActionMplsTtl{MplsTtl: 62}}},
+			flows.PopMpls(0x8847),
+			flows.PopMpls(0x8847),
+		},
+	}
+	flowMod = makeSimpleFlowMod(fa)
+	flowMod.TableId = 0
+	m := jsonpb.Marshaler{}
+	flowModJson, _ := m.MarshalToString(flowMod)
+	testLogger.Infow(getContext(), "olt-mpls-downstream-double-tagged-flow", log.Fields{"flow-mod": flowModJson})
+	return
+}
+
+func getOLTDownstreamRules() (flowMod *ofp.OfpFlowMod) {
+	fa := &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 1000, "table_id": 2, "meter_id": 1},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(2),
+			flows.VlanVid(2),
+		},
+		Actions: []*ofp.OfpAction{
+			flows.PopVlan(),
+		},
+	}
+	flowMod = makeSimpleFlowMod(fa)
+	flowMod.TableId = 1
+	m := jsonpb.Marshaler{}
+	flowModJson, _ := m.MarshalToString(flowMod)
+	testLogger.Infow(getContext(), "olt-downstream-flow", log.Fields{"flow-mod": flowModJson})
+	return
+}
+
+func getOnuDownstreamRules() (flowMod *ofp.OfpFlowMod) {
+	fa := &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 1000, "meter_id": 1},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(2),
+			flows.Metadata_ofp((1000 << 32) | 1),
+			flows.VlanVid(4096),
+		},
+		Actions: []*ofp.OfpAction{
+			flows.Output(103),
+		},
+	}
+	flowMod = makeSimpleFlowMod(fa)
+	flowMod.TableId = 2
+	m := jsonpb.Marshaler{}
+	flowModJson, _ := m.MarshalToString(flowMod)
+	testLogger.Infow(getContext(), "onu-downstream-flow", log.Fields{"flow-mod": flowModJson})
+	return
+}
+
 func TestSuiteNbiApiHandler(t *testing.T) {
 	ctx := context.Background()
 	f, err := os.Create("../../../tests/results/profile.cpu")
@@ -1513,7 +1769,62 @@
 		// 13. Test flow add failure
 		nb.testFlowAddFailure(t, nbi)
 
-		// 14.  Clean up
+		// 14. Clean up
 		nb.deleteAllDevices(t, nbi)
 	}
 }
+
+func TestFlowAddition(t *testing.T) {
+	ctx := context.Background()
+	nb := newNBTest(ctx)
+	assert.NotNil(t, nb)
+
+	defer nb.stopAll(ctx)
+
+	// Start the Core
+	nb.startCore(false)
+
+	// Set the grpc API interface - no grpc server is running in unit test
+	nbi := NewNBIHandler(nb.deviceMgr, nb.logicalDeviceMgr, nb.adapterMgr)
+
+	// Create/register the adapters
+	nb.oltAdapter, nb.onuAdapter = tst.CreateAndregisterAdapters(ctx, t, nb.kClient, nb.coreInstanceID, nb.oltAdapterName, nb.onuAdapterName, nb.adapterMgr)
+	nb.numONUPerOLT = nb.oltAdapter.GetNumONUPerOLT()
+	nb.startingUNIPortNo = nb.oltAdapter.GetStartingUNIPortNo()
+
+	// 2. Test adapter registration
+	nb.testAdapterRegistration(t, nbi)
+
+	// 3. Test MPLS flows addition where:
+	/*
+		Upstream
+		ONU
+		ADDED, bytes=0, packets=0, table=0, priority=1000, selector=[IN_PORT:32, VLAN_VID:ANY], treatment=[immediate=[],
+		transition=TABLE:1, meter=METER:1, metadata=METADATA:4100010000/0]
+		OLT
+		ADDED, bytes=0, packets=0, table=1, priority=1000, selector=[IN_PORT:32, VLAN_VID:ANY], treatment=[immediate=[VLAN_PUSH:vlan,
+		VLAN_ID:2, MPLS_PUSH:mpls_unicast, MPLS_LABEL:YYY,MPLS_BOS:true, MPLS_PUSH:mpls_unicast ,MPLS_LABEL:XXX, MPLS_BOS:false,
+		EXTENSION:of:0000000000000227/VolthaPushL2Header{​​​​​​​}​​​​​​​, ETH_SRC:OLT_MAC, ETH_DST:LEAF_MAC,  TTL:64, OUTPUT:65536],
+		meter=METER:1, metadata=METADATA:4100000000/0]
+
+		Downstream
+		OLT
+		//Below flow rule to pop L2 Ethernet headers from packets which have a single MPLS label
+		ADDED, bytes=0, packets=0, table=0, priority=1000, selector=[IN_PORT:65536, ETH_TYPE:mpls_unicast, MPLS_BOS:true, ETH_SRC:LEAF_MAC],
+		treatment=[DefaultTrafficTreatment{immediate=[DEC_MPLS_TTL, TTL_IN, MPLS_POP:mpls_unicast, EXTENSION:of:0000000000000227/VolthaPopL2Header{},
+		transition=TABLE:1]
+
+		//Below flow rule to pop L2 Ethernet headers from packets which have two MPLS label
+		ADDED, bytes=0, packets=0, table=0, priority=1000, selector=[IN_PORT:65536, ETH_TYPE:mpls_unicast, MPLS_BOS:false, ETH_SRC:LEAF_MAC],
+		treatment=[DefaultTrafficTreatment{immediate=[DEC_MPLS_TTL, TTL_IN, MPLS_POP:mpls_unicast, MPLS_POP:mpls_unicast ,
+		EXTENSION:of:0000000000000227/VolthaPopL2Header{}, transition=TABLE:1]
+
+		//Below flow rules are unchanged from the current implementations except for the table numbers
+		ADDED, bytes=0, packets=0, table=1, priority=1000, selector=[IN_PORT:65536, VLAN_VID:2], treatment=[immediate=[VLAN_POP], transition=TABLE:2,
+		meter=METER:2, metadata=METADATA:1000004100000020/0]
+		ONU
+		ADDED, bytes=0, packets=0, table=2, priority=1000, selector=[IN_PORT:65536, METADATA:20 VLAN_VID:ANY], treatment=[immediate=[OUTPUT:32],
+		meter=METER:2, metadata=METADATA:4100000000/0]
+	*/
+	nb.testMPLSFlowsAddition(t, nbi)
+}