[VOL-4001] MPLS support in vOLTHA-Core
Change-Id: I6b46ccadbbccafe577d717d6fbf3ace7efa4d1aa
diff --git a/.gitignore b/.gitignore
index 9a6c87e..ca60b49 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,5 +60,6 @@
# CPU profile
**/profile.cpu
-# etcd - ?
+# etcd - locally generated during test
rw_core/core/voltha.rwcore.nb.etcd/
+rw_core/core/device/voltha.rwcore.da.etcd/
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index 622257d..5241a95 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -30,6 +30,10 @@
"testing"
"time"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
+
+ "github.com/golang/protobuf/jsonpb"
+
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/rw_core/config"
@@ -73,6 +77,20 @@
maxTimeout time.Duration
}
+var testLogger log.CLogger
+
+func init() {
+ var err error
+ testLogger, err = log.RegisterPackage(log.JSON, log.InfoLevel, log.Fields{"nbi-handler-test": true})
+ if err != nil {
+ panic(err)
+ }
+
+ if err = log.SetLogLevel(log.InfoLevel); err != nil {
+ panic(err)
+ }
+}
+
func newNBTest(ctx context.Context) *NBTest {
test := &NBTest{}
// Start the embedded etcd server
@@ -1429,6 +1447,244 @@
wg.Wait()
}
+func (nb *NBTest) testMPLSFlowsAddition(t *testing.T, nbi *NBIHandler) {
+ // Check whether Device already exist
+ devices, err := nbi.ListDevices(getContext(), &empty.Empty{})
+ assert.NoError(t, err)
+ testLogger.Infow(getContext(), "device-list", log.Fields{"devices": devices})
+ for _, dev := range devices.GetItems() {
+ // Delete the found device for fresh start
+ testLogger.Warnf(getContext(), "deleting-existing-device", dev.GetId())
+ _, err := nbi.DeleteDevice(context.Background(), &voltha.ID{
+ Id: dev.GetId(),
+ })
+ assert.NoError(t, err)
+ }
+
+ // Ensure there are no devices in the Core now - wait until condition satisfied or timeout
+ var vFunction isDevicesConditionSatisfied = func(devices *voltha.Devices) bool {
+ return devices != nil && len(devices.Items) == 0
+ }
+ err = waitUntilConditionForDevices(nb.maxTimeout, nbi, vFunction)
+ assert.NoError(t, err)
+
+ // Get list of devices, to make sure the above operation deleted all the devices
+ devices, err = nbi.ListDevices(getContext(), &empty.Empty{})
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(devices.Items))
+
+ // Create device
+ oltDevice, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: nb.oltAdapterName, MacAddress: "aa:bb:cc:cc:ee:ff"})
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+
+ // Verify oltDevice exist in the core
+ devices, err = nbi.ListDevices(getContext(), &empty.Empty{})
+ assert.Nil(t, err)
+ assert.Equal(t, 1, len(devices.Items))
+ assert.Equal(t, oltDevice.Id, devices.Items[0].Id)
+
+ // Enable the oltDevice
+ _, err = nbi.EnableDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ // Wait for the logical device to be in the ready state
+ var vldFunction = func(ports []*voltha.LogicalPort) bool {
+ return len(ports) == nb.numONUPerOLT+1
+ }
+ err = waitUntilLogicalDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, nbi, vldFunction)
+ assert.Nil(t, err)
+
+ // Verify that the devices have been setup correctly
+ nb.verifyDevices(t, nbi)
+
+ // Get latest oltDevice data
+ oltDevice, err = nbi.GetDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+ testLogger.Infow(getContext(), "olt-device-created-and-verified", log.Fields{"device-id": oltDevice.GetId()})
+
+ // Verify that the logical device has been setup correctly
+ nb.verifyLogicalDevices(t, oltDevice, nbi)
+
+ logicalDevices, err := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
+ assert.NoError(t, err)
+
+ testLogger.Infow(getContext(), "list-logical-devices", log.Fields{"logical-device": logicalDevices.GetItems()[0]})
+ // Add a meter to the logical device, which the flow can refer to
+ meterMod := &ofp.OfpMeterMod{
+ Command: ofp.OfpMeterModCommand_OFPMC_ADD,
+ Flags: rand.Uint32(),
+ MeterId: 1,
+ Bands: []*ofp.OfpMeterBandHeader{
+ {Type: ofp.OfpMeterBandType_OFPMBT_EXPERIMENTER,
+ Rate: rand.Uint32(),
+ BurstSize: rand.Uint32(),
+ Data: nil,
+ },
+ },
+ }
+ _, err = nbi.UpdateLogicalDeviceMeterTable(getContext(), &ofp.MeterModUpdate{
+ Id: logicalDevices.GetItems()[0].GetId(),
+ MeterMod: meterMod,
+ })
+ assert.NoError(t, err)
+
+ meters, err := nbi.ListLogicalDeviceMeters(getContext(), &voltha.ID{Id: logicalDevices.GetItems()[0].GetId()})
+ assert.NoError(t, err)
+
+ for _, item := range meters.GetItems() {
+ testLogger.Infow(getContext(), "list-logical-device-meters", log.Fields{"meter-config": item.GetConfig()})
+ }
+
+ logicalPorts, err := nbi.ListLogicalDevicePorts(context.Background(), &voltha.ID{Id: logicalDevices.GetItems()[0].GetId()})
+ assert.NoError(t, err)
+ m := jsonpb.Marshaler{}
+ logicalPortsJson, err := m.MarshalToString(logicalPorts)
+ assert.NoError(t, err)
+
+ testLogger.Infow(getContext(), "list-logical-ports", log.Fields{"ports": logicalPortsJson})
+
+ callables := []func() *ofp.OfpFlowMod{getOnuUpstreamRules, getOltUpstreamRules, getOLTDownstreamMplsSingleTagRules,
+ getOLTDownstreamMplsDoubleTagRules, getOLTDownstreamRules, getOnuDownstreamRules}
+
+ for _, callable := range callables {
+ _, err = nbi.UpdateLogicalDeviceFlowTable(getContext(), &ofp.FlowTableUpdate{Id: logicalDevices.GetItems()[0].GetId(), FlowMod: callable()})
+ assert.NoError(t, err)
+ }
+}
+
+func getOnuUpstreamRules() (flowMod *ofp.OfpFlowMod) {
+ fa := &flows.FlowArgs{
+ KV: flows.OfpFlowModArgs{"priority": 1000, "table_id": 1, "meter_id": 1, "write_metadata": 4100100000},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ flows.InPort(103),
+ flows.VlanVid(4096),
+ },
+ Actions: []*ofp.OfpAction{},
+ }
+
+ flowMod = makeSimpleFlowMod(fa)
+ flowMod.TableId = 0
+ m := jsonpb.Marshaler{}
+ flowModJson, _ := m.MarshalToString(flowMod)
+ testLogger.Infow(getContext(), "onu-upstream-flow", log.Fields{"flow-mod": flowModJson})
+ return
+}
+
+func getOltUpstreamRules() (flowMod *ofp.OfpFlowMod) {
+ fa := &flows.FlowArgs{
+ KV: flows.OfpFlowModArgs{"priority": 1000, "table_id": 1, "meter_id": 1, "write_metadata": 4100000000},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ flows.InPort(103),
+ flows.VlanVid(4096),
+ },
+ Actions: []*ofp.OfpAction{
+ flows.PushVlan(0x8100),
+ flows.SetField(flows.VlanVid(2)),
+ flows.SetField(flows.EthSrc(1111)),
+ flows.SetField(flows.EthDst(2222)),
+ flows.PushVlan(0x8847),
+ flows.SetField(flows.MplsLabel(100)),
+ flows.SetField(flows.MplsBos(1)),
+ flows.PushVlan(0x8847),
+ flows.SetField(flows.MplsLabel(200)),
+ flows.MplsTtl(64),
+ flows.Output(2),
+ },
+ }
+ flowMod = makeSimpleFlowMod(fa)
+ flowMod.TableId = 1
+ m := jsonpb.Marshaler{}
+ flowModJson, _ := m.MarshalToString(flowMod)
+ testLogger.Infow(getContext(), "olt-upstream-flow", log.Fields{"flow-mod": flowModJson})
+ return
+}
+
+func getOLTDownstreamMplsSingleTagRules() (flowMod *ofp.OfpFlowMod) {
+ fa := &flows.FlowArgs{
+ KV: flows.OfpFlowModArgs{"priority": 1000, "table_id": 1},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ flows.InPort(2),
+ flows.Metadata_ofp((1000 << 32) | 1),
+ flows.EthType(0x8847),
+ flows.MplsBos(1),
+ flows.EthSrc(2222),
+ },
+ Actions: []*ofp.OfpAction{
+ {Type: ofp.OfpActionType_OFPAT_DEC_MPLS_TTL, Action: &ofp.OfpAction_MplsTtl{MplsTtl: &ofp.OfpActionMplsTtl{MplsTtl: 62}}},
+ flows.PopMpls(0x8847),
+ },
+ }
+ flowMod = makeSimpleFlowMod(fa)
+ flowMod.TableId = 0
+ m := jsonpb.Marshaler{}
+ flowModJson, _ := m.MarshalToString(flowMod)
+ testLogger.Infow(getContext(), "olt-mpls-downstream-single-tag-flow", log.Fields{"flow-mod": flowModJson})
+ return
+}
+
+func getOLTDownstreamMplsDoubleTagRules() (flowMod *ofp.OfpFlowMod) {
+ fa := &flows.FlowArgs{
+ KV: flows.OfpFlowModArgs{"priority": 1000, "table_id": 1},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ flows.InPort(2),
+ flows.EthType(0x8847),
+ flows.EthSrc(2222),
+ },
+ Actions: []*ofp.OfpAction{
+ {Type: ofp.OfpActionType_OFPAT_DEC_MPLS_TTL, Action: &ofp.OfpAction_MplsTtl{MplsTtl: &ofp.OfpActionMplsTtl{MplsTtl: 62}}},
+ flows.PopMpls(0x8847),
+ flows.PopMpls(0x8847),
+ },
+ }
+ flowMod = makeSimpleFlowMod(fa)
+ flowMod.TableId = 0
+ m := jsonpb.Marshaler{}
+ flowModJson, _ := m.MarshalToString(flowMod)
+ testLogger.Infow(getContext(), "olt-mpls-downstream-double-tagged-flow", log.Fields{"flow-mod": flowModJson})
+ return
+}
+
+func getOLTDownstreamRules() (flowMod *ofp.OfpFlowMod) {
+ fa := &flows.FlowArgs{
+ KV: flows.OfpFlowModArgs{"priority": 1000, "table_id": 2, "meter_id": 1},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ flows.InPort(2),
+ flows.VlanVid(2),
+ },
+ Actions: []*ofp.OfpAction{
+ flows.PopVlan(),
+ },
+ }
+ flowMod = makeSimpleFlowMod(fa)
+ flowMod.TableId = 1
+ m := jsonpb.Marshaler{}
+ flowModJson, _ := m.MarshalToString(flowMod)
+ testLogger.Infow(getContext(), "olt-downstream-flow", log.Fields{"flow-mod": flowModJson})
+ return
+}
+
+func getOnuDownstreamRules() (flowMod *ofp.OfpFlowMod) {
+ fa := &flows.FlowArgs{
+ KV: flows.OfpFlowModArgs{"priority": 1000, "meter_id": 1},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ flows.InPort(2),
+ flows.Metadata_ofp((1000 << 32) | 1),
+ flows.VlanVid(4096),
+ },
+ Actions: []*ofp.OfpAction{
+ flows.Output(103),
+ },
+ }
+ flowMod = makeSimpleFlowMod(fa)
+ flowMod.TableId = 2
+ m := jsonpb.Marshaler{}
+ flowModJson, _ := m.MarshalToString(flowMod)
+ testLogger.Infow(getContext(), "onu-downstream-flow", log.Fields{"flow-mod": flowModJson})
+ return
+}
+
func TestSuiteNbiApiHandler(t *testing.T) {
ctx := context.Background()
f, err := os.Create("../../../tests/results/profile.cpu")
@@ -1513,7 +1769,62 @@
// 13. Test flow add failure
nb.testFlowAddFailure(t, nbi)
- // 14. Clean up
+ // 14. Clean up
nb.deleteAllDevices(t, nbi)
}
}
+
+func TestFlowAddition(t *testing.T) {
+ ctx := context.Background()
+ nb := newNBTest(ctx)
+ assert.NotNil(t, nb)
+
+ defer nb.stopAll(ctx)
+
+ // Start the Core
+ nb.startCore(false)
+
+ // Set the grpc API interface - no grpc server is running in unit test
+ nbi := NewNBIHandler(nb.deviceMgr, nb.logicalDeviceMgr, nb.adapterMgr)
+
+ // Create/register the adapters
+ nb.oltAdapter, nb.onuAdapter = tst.CreateAndregisterAdapters(ctx, t, nb.kClient, nb.coreInstanceID, nb.oltAdapterName, nb.onuAdapterName, nb.adapterMgr)
+ nb.numONUPerOLT = nb.oltAdapter.GetNumONUPerOLT()
+ nb.startingUNIPortNo = nb.oltAdapter.GetStartingUNIPortNo()
+
+ // 2. Test adapter registration
+ nb.testAdapterRegistration(t, nbi)
+
+ // 3. Test MPLS flows addition where:
+ /*
+ Upstream
+ ONU
+ ADDED, bytes=0, packets=0, table=0, priority=1000, selector=[IN_PORT:32, VLAN_VID:ANY], treatment=[immediate=[],
+ transition=TABLE:1, meter=METER:1, metadata=METADATA:4100010000/0]
+ OLT
+ ADDED, bytes=0, packets=0, table=1, priority=1000, selector=[IN_PORT:32, VLAN_VID:ANY], treatment=[immediate=[VLAN_PUSH:vlan,
+ VLAN_ID:2, MPLS_PUSH:mpls_unicast, MPLS_LABEL:YYY,MPLS_BOS:true, MPLS_PUSH:mpls_unicast ,MPLS_LABEL:XXX, MPLS_BOS:false,
+ EXTENSION:of:0000000000000227/VolthaPushL2Header{}, ETH_SRC:OLT_MAC, ETH_DST:LEAF_MAC, TTL:64, OUTPUT:65536],
+ meter=METER:1, metadata=METADATA:4100000000/0]
+
+ Downstream
+ OLT
+ //Below flow rule to pop L2 Ethernet headers from packets which have a single MPLS label
+ ADDED, bytes=0, packets=0, table=0, priority=1000, selector=[IN_PORT:65536, ETH_TYPE:mpls_unicast, MPLS_BOS:true, ETH_SRC:LEAF_MAC],
+ treatment=[DefaultTrafficTreatment{immediate=[DEC_MPLS_TTL, TTL_IN, MPLS_POP:mpls_unicast, EXTENSION:of:0000000000000227/VolthaPopL2Header{},
+ transition=TABLE:1]
+
+ //Below flow rule to pop L2 Ethernet headers from packets which have two MPLS label
+ ADDED, bytes=0, packets=0, table=0, priority=1000, selector=[IN_PORT:65536, ETH_TYPE:mpls_unicast, MPLS_BOS:false, ETH_SRC:LEAF_MAC],
+ treatment=[DefaultTrafficTreatment{immediate=[DEC_MPLS_TTL, TTL_IN, MPLS_POP:mpls_unicast, MPLS_POP:mpls_unicast ,
+ EXTENSION:of:0000000000000227/VolthaPopL2Header{}, transition=TABLE:1]
+
+ //Below flow rules are unchanged from the current implementations except for the table numbers
+ ADDED, bytes=0, packets=0, table=1, priority=1000, selector=[IN_PORT:65536, VLAN_VID:2], treatment=[immediate=[VLAN_POP], transition=TABLE:2,
+ meter=METER:2, metadata=METADATA:1000004100000020/0]
+ ONU
+ ADDED, bytes=0, packets=0, table=2, priority=1000, selector=[IN_PORT:65536, METADATA:20 VLAN_VID:ANY], treatment=[immediate=[OUTPUT:32],
+ meter=METER:2, metadata=METADATA:4100000000/0]
+ */
+ nb.testMPLSFlowsAddition(t, nbi)
+}
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index f4c88fa..72b915f 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -167,7 +167,7 @@
go func() {
// Wait for completion
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChannels...); res != nil {
- logger.Infow(ctx, "failed-to-add-flow-will-attempt-deletion", log.Fields{
+ logger.Errorw(ctx, "failed-to-add-flow-will-attempt-deletion", log.Fields{
"errors": res,
"logical-device-id": agent.logicalDeviceID,
"flow": flow,
diff --git a/rw_core/flowdecomposition/flow_decomposer.go b/rw_core/flowdecomposition/flow_decomposer.go
index ab04891..5a1affd 100644
--- a/rw_core/flowdecomposition/flow_decomposer.go
+++ b/rw_core/flowdecomposition/flow_decomposer.go
@@ -320,11 +320,6 @@
return deviceRules, nil
}
- if flow.TableId != 0 {
- logger.Warnw(ctx, "This is not olt pipeline table, so skipping", log.Fields{"tableId": flow.TableId})
- return deviceRules, nil
- }
-
ingressHop := path[0]
egressHop := path[1]
if metadataFromwriteMetadata != 0 {
@@ -517,19 +512,25 @@
}
isUpstream := !ingressDevice.Root
if isUpstream { // Unicast OLT and ONU UL
- logger.Debug(ctx, "process-olt-nd-onu-upstream-noncontrollerbound-unicast-flows", log.Fields{"flows": flow})
+ logger.Debug(ctx, "process-olt-and-onu-upstream-non-controller-bound-uni-cast-flows", log.Fields{"flows": flow})
deviceRules, err = fd.processUpstreamNonControllerBoundFlow(ctx, path, inPortNo, outPortNo, flow)
if err != nil {
return nil, err
}
- } else if fu.HasNextTable(flow) && flow.TableId == 0 { // Unicast OLT flow DL
- logger.Debugw(ctx, "process-olt-downstream-noncontrollerbound-flow-with-nexttable", log.Fields{"flows": flow})
+ } else if fu.HasNextTable(flow) && (flow.GetTableId() == 0 || flow.GetTableId() == 1) { // Unicast OLT Flow
+ // For 'Non-MPLS' flows, this condition will only be true for table-id 0 as only table-id 0 will have the
+ // 'go-to-next-table' instruction
+ // For 'MPLS' flows, this condition will be true for table-id 0 and table-id 1.
+ // So the flow here shall always be an OLT flow
+ logger.Debugw(ctx, "process-olt-downstream-non-controller-bound-flow-with-next-table", log.Fields{"flows": flow})
deviceRules, err = fd.processDownstreamFlowWithNextTable(ctx, agent, path, inPortNo, outPortNo, flow)
if err != nil {
return nil, err
}
- } else if flow.TableId == 1 && outPortNo != 0 { // Unicast ONU flow DL
- logger.Debugw(ctx, "process-onu-downstream-unicast-flow", log.Fields{"flows": flow})
+ } else if (flow.GetTableId() == 1 || flow.GetTableId() == 2) && outPortNo != 0 { // Unicast ONU flow DL
+ // If this is an MPLS OLT flow (table-id 1, transition-to-table 2), the condition above will already be hit.
+ // So if we are reaching this point, the flow shall always be an ONU flow
+ logger.Debugw(ctx, "process-onu-downstream-uni-cast-flow", log.Fields{"flows": flow})
deviceRules, err = fd.processUnicastFlow(ctx, path, inPortNo, outPortNo, flow)
if err != nil {
return nil, err
diff --git a/rw_core/flowdecomposition/flow_decomposer_test.go b/rw_core/flowdecomposition/flow_decomposer_test.go
index 944e524..279b39a 100644
--- a/rw_core/flowdecomposition/flow_decomposer_test.go
+++ b/rw_core/flowdecomposition/flow_decomposer_test.go
@@ -957,8 +957,9 @@
},
}
+ // If table-id is provided in the flow-args, the same is also used as go-to-next table
fa2 := &fu.FlowArgs{
- KV: fu.OfpFlowModArgs{"priority": 500, "table_id": 1},
+ KV: fu.OfpFlowModArgs{"priority": 500 /*"table_id": 1*/},
MatchFields: []*ofp.OfpOxmOfbField{
fu.InPort(10),
fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101),
@@ -974,6 +975,9 @@
assert.Nil(t, err)
fs2, err := fu.MkFlowStat(fa2)
assert.Nil(t, err)
+ // Table-1, without next table
+ fs2.TableId = 1
+
fs1.Instructions = []*ofp.OfpInstruction{{
Type: uint32(ofp.OfpInstructionType_OFPIT_GOTO_TABLE),
Data: &ofp.OfpInstruction_GotoTable{
@@ -1100,3 +1104,325 @@
derivedFlow := oltFlowAndGroup.GetFlow(0)
assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
}
+
+func TestMplsUpstreamFlowDecomposition(t *testing.T) {
+ // Note: Olt-Nni=10
+ // Onu1-Uni=1
+
+ /*
+ ADDED, bytes=0, packets=0, table=0, priority=1000, selector=[IN_PORT:UNI, VLAN_VID:ANY], treatment=[immediate=[],
+ transition=TABLE:1, meter=METER:1, metadata=METADATA:4100010000/0]
+ */
+
+ // Here, 'table_id=1' is present to add go-to-table action
+ faOnu := &fu.FlowArgs{
+ KV: fu.OfpFlowModArgs{"priority": 1000, "table_id": 1, "meter_id": 1, "write_metadata": 4100100000},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(1), // Onu Uni
+ fu.VlanVid(4096),
+ },
+ Actions: []*ofp.OfpAction{},
+ }
+ fsOnu, err := fu.MkFlowStat(faOnu)
+ assert.NoError(t, err)
+ assert.NotNil(t, fsOnu)
+ // Update table-id
+ fsOnu.TableId = 0
+
+ /*
+ ADDED, bytes=0, packets=0, table=1, priority=1000, selector=[IN_PORT:32, VLAN_VID:ANY], treatment=[immediate=[VLAN_PUSH:vlan,
+ VLAN_ID:2, MPLS_PUSH:mpls_unicast, MPLS_LABEL:YYY,MPLS_BOS:true, MPLS_PUSH:mpls_unicast ,MPLS_LABEL:XXX, MPLS_BOS:false,
+ EXTENSION:of:0000000000000227/VolthaPushL2Header{}, ETH_SRC:OLT_MAC, ETH_DST:LEAF_MAC, TTL:64, OUTPUT:65536],
+ meter=METER:1, metadata=METADATA:4100000000/0]
+ */
+ faOlt := &fu.FlowArgs{
+ KV: fu.OfpFlowModArgs{"priority": 1000, "meter_id": 1, "write_metadata": 4100000000},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(1), // Onu-Uni
+ fu.VlanVid(4096),
+ },
+ Actions: []*ofp.OfpAction{
+ fu.PushVlan(0x8100),
+ fu.SetField(fu.VlanVid(2)),
+ fu.SetField(fu.EthSrc(1111)),
+ fu.SetField(fu.EthDst(2222)),
+ fu.PushVlan(0x8847),
+ fu.SetField(fu.MplsLabel(100)),
+ fu.SetField(fu.MplsBos(1)),
+ fu.PushVlan(0x8847),
+ fu.SetField(fu.MplsLabel(200)),
+ fu.MplsTtl(64),
+ fu.Output(10), // Olt-Nni
+ },
+ }
+
+ fsOlt, err := fu.MkFlowStat(faOlt)
+ assert.NoError(t, err)
+ assert.NotNil(t, fsOlt)
+ // Update table-id
+ // table-id is skipped in flow-args above as that would also add the go-to-table action
+ fsOlt.TableId = 1
+
+ flows := map[uint64]*ofp.OfpFlowStats{fsOnu.Id: fsOnu, fsOlt.Id: fsOlt}
+
+ tfd := newTestFlowDecomposer(t, newTestDeviceManager())
+
+ deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, nil)
+ assert.Nil(t, err)
+ onuFlowAndGroup := deviceRules.Rules["onu1"]
+ oltFlowAndGroup := deviceRules.Rules["olt"]
+ assert.NotNil(t, onuFlowAndGroup)
+ assert.NotNil(t, onuFlowAndGroup.Flows)
+ assert.Equal(t, 1, onuFlowAndGroup.Flows.Len())
+ assert.Equal(t, 0, onuFlowAndGroup.Groups.Len())
+ assert.Equal(t, 1, oltFlowAndGroup.Flows.Len())
+ assert.Equal(t, 0, oltFlowAndGroup.Groups.Len())
+
+ // Form expected ONU flow args
+ expectedOnufa := &fu.FlowArgs{
+ KV: fu.OfpFlowModArgs{"priority": 1000, "meter_id": 1, "write_metadata": 4100100000},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(2), // Onu Uni
+ fu.TunnelId(uint64(1)),
+ fu.VlanVid(4096),
+ },
+ Actions: []*ofp.OfpAction{
+ fu.Output(1),
+ },
+ }
+
+ // Form the expected ONU flow
+ expectedOnuFlow, err := fu.MkFlowStat(expectedOnufa)
+ assert.Nil(t, err)
+ assert.NotNil(t, expectedOnuFlow)
+ expectedOnuFlow.TableId = 0
+
+ derivedOnuFlow := onuFlowAndGroup.GetFlow(0)
+ expectedOnuFlow.Id = derivedOnuFlow.Id // Assign same flow ID as derived flowID to match completely
+ assert.Equal(t, expectedOnuFlow.String(), derivedOnuFlow.String())
+
+ expectedOltfa := &fu.FlowArgs{
+ KV: fu.OfpFlowModArgs{"priority": 1000, "meter_id": 1, "write_metadata": 4100000000},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(1), // Onu-Uni
+ fu.TunnelId(uint64(1)),
+ fu.VlanVid(4096),
+ },
+ Actions: []*ofp.OfpAction{
+ fu.PushVlan(0x8100),
+ fu.SetField(fu.VlanVid(2)),
+ fu.SetField(fu.EthSrc(1111)),
+ fu.SetField(fu.EthDst(2222)),
+ fu.PushVlan(0x8847),
+ fu.SetField(fu.MplsLabel(100)),
+ fu.SetField(fu.MplsBos(1)),
+ fu.PushVlan(0x8847),
+ fu.SetField(fu.MplsLabel(200)),
+ fu.MplsTtl(64),
+ fu.Output(2), // Olt-Nni
+ },
+ }
+
+ expectedOltFlow, err := fu.MkFlowStat(expectedOltfa)
+ assert.NoError(t, err)
+ assert.NotNil(t, expectedOltFlow)
+
+ derivedOltFlow := oltFlowAndGroup.GetFlow(0)
+ expectedOltFlow.Id = derivedOltFlow.Id
+ assert.Equal(t, expectedOltFlow.String(), derivedOltFlow.String())
+}
+
+func TestMplsDownstreamFlowDecomposition(t *testing.T) {
+ faOltSingleMplsLable := &fu.FlowArgs{
+ KV: fu.OfpFlowModArgs{"priority": 1000, "table_id": 1},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(10),
+ fu.Metadata_ofp((1000 << 32) | 1),
+ fu.EthType(0x8847),
+ fu.MplsBos(1),
+ fu.EthSrc(2222),
+ },
+ Actions: []*ofp.OfpAction{
+ {Type: ofp.OfpActionType_OFPAT_DEC_MPLS_TTL, Action: &ofp.OfpAction_MplsTtl{MplsTtl: &ofp.OfpActionMplsTtl{MplsTtl: 62}}},
+ fu.PopMpls(0x8847),
+ },
+ }
+ fsOltSingleMplsLabel, err := fu.MkFlowStat(faOltSingleMplsLable)
+ assert.NoError(t, err)
+ assert.NotNil(t, fsOltSingleMplsLabel)
+ fsOltSingleMplsLabel.TableId = 0
+
+ flows := map[uint64]*ofp.OfpFlowStats{fsOltSingleMplsLabel.Id: fsOltSingleMplsLabel}
+
+ tfd := newTestFlowDecomposer(t, newTestDeviceManager())
+
+ deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, nil)
+ assert.Nil(t, err)
+ assert.NotNil(t, deviceRules)
+ oltFlowAndGroup := deviceRules.Rules["olt"]
+ assert.NotNil(t, oltFlowAndGroup)
+
+ derivedFlow := oltFlowAndGroup.GetFlow(0)
+ assert.NotNil(t, derivedFlow)
+
+ // Formulate expected
+ expectedFa := &fu.FlowArgs{
+ KV: fu.OfpFlowModArgs{"priority": 1000},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(2),
+ fu.TunnelId(10),
+ fu.Metadata_ofp((1000 << 32) | 1),
+ fu.EthType(0x8847),
+ fu.MplsBos(1),
+ fu.EthSrc(2222),
+ },
+ Actions: []*ofp.OfpAction{
+ {Type: ofp.OfpActionType_OFPAT_DEC_MPLS_TTL, Action: &ofp.OfpAction_MplsTtl{MplsTtl: &ofp.OfpActionMplsTtl{MplsTtl: 62}}},
+ fu.PopMpls(0x8847),
+ fu.Output(1),
+ },
+ }
+ expectedFs, err := fu.MkFlowStat(expectedFa)
+ assert.NoError(t, err)
+ expectedFs.Id = derivedFlow.Id
+
+ assert.Equal(t, expectedFs.String(), derivedFlow.String())
+
+ // Formulate Mpls double label
+ faOltDoubleMplsLabel := &fu.FlowArgs{
+ KV: fu.OfpFlowModArgs{"priority": 1000, "table_id": 1},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(10),
+ fu.EthType(0x8847),
+ fu.EthSrc(2222),
+ },
+ Actions: []*ofp.OfpAction{
+ {Type: ofp.OfpActionType_OFPAT_DEC_MPLS_TTL, Action: &ofp.OfpAction_MplsTtl{MplsTtl: &ofp.OfpActionMplsTtl{MplsTtl: 62}}},
+ fu.PopMpls(0x8847),
+ fu.PopMpls(0x8847),
+ },
+ }
+ fsOltDoubleMplsLabel, err := fu.MkFlowStat(faOltDoubleMplsLabel)
+ assert.NoError(t, err)
+ assert.NotNil(t, fsOltDoubleMplsLabel)
+
+ flows2 := map[uint64]*ofp.OfpFlowStats{fsOltDoubleMplsLabel.Id: fsOltDoubleMplsLabel}
+ assert.NotNil(t, flows2)
+
+ deviceRules, err = tfd.fd.DecomposeRules(context.Background(), tfd, flows2, nil)
+ assert.NoError(t, err)
+ assert.NotNil(t, deviceRules)
+ oltFlowAndGroup = deviceRules.Rules["olt"]
+ assert.NotNil(t, oltFlowAndGroup)
+ derivedFlow = oltFlowAndGroup.GetFlow(0)
+ assert.NotNil(t, derivedFlow)
+
+ expectedFa = &fu.FlowArgs{
+ KV: fu.OfpFlowModArgs{"priority": 1000},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(2),
+ fu.TunnelId(10),
+ fu.EthType(0x8847),
+ fu.EthSrc(2222),
+ },
+ Actions: []*ofp.OfpAction{
+ {Type: ofp.OfpActionType_OFPAT_DEC_MPLS_TTL, Action: &ofp.OfpAction_MplsTtl{MplsTtl: &ofp.OfpActionMplsTtl{MplsTtl: 62}}},
+ fu.PopMpls(0x8847),
+ fu.PopMpls(0x8847),
+ fu.Output(1),
+ },
+ }
+ expectedFs, err = fu.MkFlowStat(expectedFa)
+ assert.NoError(t, err)
+ assert.NotNil(t, expectedFs)
+ expectedFs.Id = derivedFlow.Id
+ assert.Equal(t, expectedFs.String(), derivedFlow.String())
+
+ //olt downstream flows (table-id=1)
+ faOlt := &fu.FlowArgs{
+ KV: fu.OfpFlowModArgs{"priority": 1000, "table_id": 2, "meter_id": 1},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(10),
+ fu.VlanVid(2),
+ },
+ Actions: []*ofp.OfpAction{
+ fu.PopVlan(),
+ },
+ }
+ fsOlt, err := fu.MkFlowStat(faOlt)
+ assert.NoError(t, err)
+ assert.NotNil(t, fsOlt)
+ fsOlt.TableId = 1
+
+ flows3 := map[uint64]*ofp.OfpFlowStats{fsOlt.Id: fsOlt}
+ assert.NotNil(t, flows3)
+ deviceRules, err = tfd.fd.DecomposeRules(context.Background(), tfd, flows3, nil)
+ assert.NoError(t, err)
+ assert.NotNil(t, deviceRules)
+ oltFlowAndGroup = deviceRules.Rules["olt"]
+ assert.NotNil(t, oltFlowAndGroup)
+ derivedFlow = oltFlowAndGroup.GetFlow(0)
+ assert.NotNil(t, derivedFlow)
+
+ faOltExpected := &fu.FlowArgs{
+ KV: fu.OfpFlowModArgs{"priority": 1000, "meter_id": 1},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(2),
+ fu.TunnelId(10),
+ fu.VlanVid(2),
+ },
+ Actions: []*ofp.OfpAction{
+ fu.PopVlan(),
+ fu.Output(1),
+ },
+ }
+ fsOltExpected, err := fu.MkFlowStat(faOltExpected)
+ assert.NoError(t, err)
+ assert.NotNil(t, fsOltExpected)
+ fsOltExpected.Id = derivedFlow.Id
+ assert.Equal(t, fsOltExpected.String(), derivedFlow.String())
+
+ // Onu Downstream
+ faOnu := &fu.FlowArgs{
+ KV: fu.OfpFlowModArgs{"priority": 1000, "meter_id": 1},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(10),
+ fu.Metadata_ofp((1000 << 32) | 1),
+ fu.VlanVid(4096),
+ },
+ Actions: []*ofp.OfpAction{
+ fu.Output(1),
+ },
+ }
+ fsOnu, err := fu.MkFlowStat(faOnu)
+ assert.NoError(t, err)
+ fsOnu.TableId = 2
+
+ flows4 := map[uint64]*ofp.OfpFlowStats{fsOnu.Id: fsOnu}
+ assert.NotNil(t, flows4)
+ deviceRules, err = tfd.fd.DecomposeRules(context.Background(), tfd, flows4, nil)
+ assert.NoError(t, err)
+ assert.NotNil(t, deviceRules)
+ onuFlowAndGroup := deviceRules.Rules["onu1"]
+ assert.NotNil(t, onuFlowAndGroup)
+ derivedFlow = onuFlowAndGroup.GetFlow(0)
+ assert.NotNil(t, derivedFlow)
+
+ faExpected := &fu.FlowArgs{
+ KV: fu.OfpFlowModArgs{"priority": 1000, "meter_id": 1},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(1),
+ fu.Metadata_ofp((1000 << 32) | 1),
+ fu.VlanVid(4096),
+ },
+ Actions: []*ofp.OfpAction{
+ fu.Output(2),
+ },
+ }
+ fsExpected, err := fu.MkFlowStat(faExpected)
+ assert.NoError(t, err)
+ assert.NotNil(t, fsExpected)
+ fsExpected.Id = derivedFlow.Id
+
+ assert.Equal(t, fsExpected.String(), derivedFlow.String())
+}