Add automated tests for PW
Changes:
- Adds 12 procedures to test all initiation scenarios
- Adds 3 procedures to test all int. transport scenarios
- Adds 4 procedures to test all termination scenarios
- Implements PW generation
- Updates README
Change-Id: Ic56fa02a46ede2b5bf04298338ef839378d09235
diff --git a/README.md b/README.md
index b93eecc..fdc088f 100755
--- a/README.md
+++ b/README.md
@@ -132,7 +132,7 @@
The following tests are implemented in vlan_flows.py and these are their results.
Test Results | 3.0 EA0
-------- | -------
+----------------- | -------
L2ForwardingStackedVLAN1 | ok
L2ForwardingStackedVLAN2 | ok
L2ForwardingStackedVLAN3 | ok
@@ -141,3 +141,32 @@
For major details on the test look the comments in the code.
+# Pseudowire Test Result Summary
+
+The following tests are implemented in pw_flows.py and these are their results.
+
+Test Results | 3.0 EA0
+---------------------------------- | -------
+UntaggedPWInitiation_2_Labels | ok
+Untagged2PWInitiation_2_Labels | ok
+UntaggedPWInitiation_3_Labels | ok
+Untagged2PWInitiation_3_Labels | ok
+TaggedPWInitiation_2_Labels | ?
+Tagged2PWInitiation_2_Labels | ?
+TaggedPWInitiation_3_Labels | ?
+Tagged2PWInitiation_3_Labels | ?
+DoubleTaggedPWInitiation_2_Labels | ?
+DoubleTagged2PWInitiation_2_Labels | ?
+DoubleTaggedPWInitiation_3_Labels | ?
+DoubleTagged2PWInitiation_3_Labels | ?
+IntraCO_2_Labels | ok
+IntraCO_3_Labels | ok
+InterCO | ok
+UntaggedPWTermination | ok
+Untagged2PWTermination | ?
+TaggedPWTermination | ok
+DoubleTaggedPWTermination | ok
+
+
+For major details on the test look the comments in the code.
+
diff --git a/accton/accton_util.py b/accton/accton_util.py
index 15e6e99..176ef28 100755
--- a/accton/accton_util.py
+++ b/accton/accton_util.py
@@ -693,9 +693,7 @@
actions=[]
if vrf!=0:
- actions.append(ofp.action.set_field(ofp.oxm.exp2ByteValue(exp_type=1, value=vrf)))
- if mpls_type!=None:
- actions.append(ofp.action.set_field(ofp.oxm.exp2ByteValue(exp_type=ofp.oxm.OFDPA_EXP_TYPE_MPLS_TYPE, value=mpls_type)))
+ actions.append(ofp.action.set_field(ofp.oxm.exp2ByteValue(exp_type=1, value=vrf)))
#actions.append(ofp.action.set_field(ofp.oxm.vlan_vid(value=vlan_id)))
@@ -1736,6 +1734,7 @@
match.oxm_list.append(ofp.oxm.exp4ByteValue(exp_type=ofp.oxm.OFDPA_EXP_TYPE_MPLS_L2_PORT, value=0x00000000 + of_port))
match.oxm_list.append(ofp.oxm.tunnel_id(tunnel_index + ofp.oxm.TUNNEL_ID_BASE))
+
action = []
action.append(ofp.action.group(ref_gid))
assert(qos_index>=0)
diff --git a/ofdpa/flows.py b/ofdpa/flows.py
index dd04fa9..53e0edd 100755
--- a/ofdpa/flows.py
+++ b/ofdpa/flows.py
@@ -12,7 +12,6 @@
from accton_util import *
from utils import *
-
class PacketInUDP( base_tests.SimpleDataPlane ):
"""
Verify ACL rule for IP_PROTO=2 wont match a UDP packet and a rule for IP_PROTO=17 WILL match a UDP packet.
@@ -1924,6 +1923,3 @@
delete_all_flows( self.controller )
delete_groups( self.controller, groups )
delete_all_groups( self.controller )
-
-
-
diff --git a/ofdpa/pw_flows.py b/ofdpa/pw_flows.py
new file mode 100644
index 0000000..564ca56
--- /dev/null
+++ b/ofdpa/pw_flows.py
@@ -0,0 +1,1701 @@
+"""
+Check README file
+"""
+import Queue
+
+from oftest import config
+import inspect
+import logging
+import oftest.base_tests as base_tests
+import ofp
+from oftest.testutils import *
+from accton_util import *
+from utils import *
+
+class UntaggedPWInitiation_2_Labels( base_tests.SimpleDataPlane ):
+ """
+ This is meant to test the PW Initiation. The traffic
+ arrives untagged to the MPLS-TP CE device, it goes out
+ untagged, with a new ethernet header and 2 mpls labels.
+ """
+ def runTest( self ):
+
+ Groups = Queue.LifoQueue( )
+ Groups2 = Queue.LifoQueue( )
+ try:
+ if len( config[ "port_map" ] ) < 1:
+ logging.info( "Port count less than 1, can't run this case" )
+ assert (False)
+ return
+ ports = config[ "port_map" ].keys( )
+
+ for pair in itertools.product(ports, ports):
+ # we generate all possible products
+ in_port = pair[0]
+ out_port = pair[1]
+ if out_port == in_port:
+ continue
+ # we fill the pipeline for the initiation
+ (
+ port_to_mpls_label_2,
+ port_to_mpls_label_1,
+ port_to_mpls_label_pw,
+ port_to_in_vlan_3,
+ port_to_in_vlan_2,
+ port_to_in_vlan_1,
+ port_to_src_mac_str,
+ port_to_dst_mac_str,
+ Groups ) = fill_pw_initiation_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=in_port,
+ out_port=out_port,
+ ingress_tags=0,
+ egress_tag=EGRESS_UNTAGGED,
+ mpls_labels=1
+ )
+ # we fill the pipeline for the termination
+ # on the reverse path
+ (
+ port_to_mpls_label_pw_x,
+ port_to_vlan_2_x,
+ port_to_vlan_1_x,
+ port_to_switch_mac_str_x,
+ Groups2 ) = fill_pw_termination_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=out_port,
+ out_port=in_port,
+ egress_tags=0
+ )
+ # we send a simple tcp packet
+ parsed_pkt = simple_tcp_packet(
+ pktlen=104,
+ )
+ pkt = str( parsed_pkt )
+ self.dataplane.send( in_port, pkt )
+ # we verify the pw packet has been generated
+ label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
+ label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
+ cw = (0, 0, 0, 0)
+ parsed_pkt = pw_packet(
+ pktlen=130,
+ out_eth_dst=port_to_dst_mac_str[in_port],
+ out_eth_src=port_to_src_mac_str[out_port],
+ label=[label_1, label_pw],
+ cw=cw
+ )
+ pkt = str( parsed_pkt )
+ # Assertions
+ verify_packet( self, pkt, out_port )
+ verify_no_packet( self, pkt, in_port )
+ verify_no_other_packets( self )
+ # Flush all the rules for the next couple
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+ finally:
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+class Untagged2PWInitiation_2_Labels( base_tests.SimpleDataPlane ):
+ """
+ This is meant to test the PW Initiation. The traffic
+ arrives untagged to the MPLS-TP CE device, it goes out
+ tagged, with a new ethernet header and 2 mpls labels.
+ """
+ def runTest( self ):
+
+ Groups = Queue.LifoQueue( )
+ Groups2 = Queue.LifoQueue( )
+ try:
+ if len( config[ "port_map" ] ) < 1:
+ logging.info( "Port count less than 1, can't run this case" )
+ assert (False)
+ return
+ ports = config[ "port_map" ].keys( )
+
+ for pair in itertools.product(ports, ports):
+ # we generate all possible products
+ in_port = pair[0]
+ out_port = pair[1]
+ if out_port == in_port:
+ continue
+ # we fill the pipeline for the initiation
+ (
+ port_to_mpls_label_2,
+ port_to_mpls_label_1,
+ port_to_mpls_label_pw,
+ port_to_in_vlan_3,
+ port_to_in_vlan_2,
+ port_to_in_vlan_1,
+ port_to_src_mac_str,
+ port_to_dst_mac_str,
+ Groups ) = fill_pw_initiation_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=in_port,
+ out_port=out_port,
+ ingress_tags=0,
+ egress_tag=EGRESS_TAGGED,
+ mpls_labels=1
+ )
+ # we fill the pipeline for the termination
+ # on the reverse path
+ (
+ port_to_mpls_label_pw_x,
+ port_to_vlan_2_x,
+ port_to_vlan_1_x,
+ port_to_switch_mac_str_x,
+ Groups2 ) = fill_pw_termination_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=out_port,
+ out_port=in_port,
+ egress_tags=0
+ )
+ # we send a simple tcp packet
+ parsed_pkt = simple_tcp_packet(
+ pktlen=104,
+ )
+ pkt = str( parsed_pkt )
+ self.dataplane.send( in_port, pkt )
+ # we verify the pw packet has been generated
+ label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
+ label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
+ cw = (0, 0, 0, 0)
+ parsed_pkt = pw_packet(
+ pktlen=134,
+ out_eth_dst=port_to_dst_mac_str[in_port],
+ out_eth_src=port_to_src_mac_str[out_port],
+ label=[label_1, label_pw],
+ cw=cw,
+ out_dl_vlan_enable=True,
+ out_vlan_vid=port_to_in_vlan_1[in_port],
+ )
+ pkt = str( parsed_pkt )
+ # Assertions
+ verify_packet( self, pkt, out_port )
+ verify_no_packet( self, pkt, in_port )
+ verify_no_other_packets( self )
+ # Flush all the rules for the next couple
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+ finally:
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+class UntaggedPWInitiation_3_Labels( base_tests.SimpleDataPlane ):
+ """
+ This is meant to test the PW Initiation. The traffic
+ arrives untagged to the MPLS-TP CE device, it goes out
+ untagged, with a new ethernet header and 3 mpls labels.
+ """
+ def runTest( self ):
+
+ Groups = Queue.LifoQueue( )
+ Groups2 = Queue.LifoQueue( )
+ try:
+ if len( config[ "port_map" ] ) < 1:
+ logging.info( "Port count less than 1, can't run this case" )
+ assert (False)
+ return
+ ports = config[ "port_map" ].keys( )
+
+ for pair in itertools.product(ports, ports):
+ # we generate all possible products
+ in_port = pair[0]
+ out_port = pair[1]
+ if out_port == in_port:
+ continue
+ # we fill the pipeline for the pw initiation
+ (
+ port_to_mpls_label_2,
+ port_to_mpls_label_1,
+ port_to_mpls_label_pw,
+ port_to_in_vlan_3,
+ port_to_in_vlan_2,
+ port_to_in_vlan_1,
+ port_to_src_mac_str,
+ port_to_dst_mac_str,
+ Groups ) = fill_pw_initiation_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=in_port,
+ out_port=out_port,
+ ingress_tags=0,
+ egress_tag=EGRESS_UNTAGGED,
+ mpls_labels=2
+ )
+ # we fill the pipeline for the pw termination
+ # on the reverse path
+ (
+ port_to_mpls_label_pw_x,
+ port_to_vlan_2_x,
+ port_to_vlan_1_x,
+ port_to_switch_mac_str_x,
+ Groups2 ) = fill_pw_termination_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=out_port,
+ out_port=in_port,
+ egress_tags=0
+ )
+ # we generate a simple tcp packet
+ parsed_pkt = simple_tcp_packet(
+ pktlen=104,
+ )
+ pkt = str( parsed_pkt )
+ self.dataplane.send( in_port, pkt )
+ # we generate the pw packet we expect on the out port
+ label_2 = (port_to_mpls_label_2[in_port], 0, 0, 63)
+ label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
+ label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
+ cw = (0, 0, 0, 0)
+ parsed_pkt = pw_packet(
+ pktlen=134,
+ out_eth_dst=port_to_dst_mac_str[in_port],
+ out_eth_src=port_to_src_mac_str[out_port],
+ label=[label_2, label_1, label_pw],
+ cw=cw
+ )
+ pkt = str( parsed_pkt )
+ # Assertions
+ verify_packet( self, pkt, out_port )
+ verify_no_packet( self, pkt, in_port )
+ verify_no_other_packets( self )
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+ finally:
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+class Untagged2PWInitiation_3_Labels( base_tests.SimpleDataPlane ):
+ """
+ This is meant to test the PW Initiation. The traffic
+ arrives untagged to the MPLS-TP CE device, it goes out
+ tagged with a new ethernet header and 3 mpls labels.
+ """
+ def runTest( self ):
+
+ Groups = Queue.LifoQueue( )
+ Groups2 = Queue.LifoQueue( )
+ try:
+ if len( config[ "port_map" ] ) < 1:
+ logging.info( "Port count less than 1, can't run this case" )
+ assert (False)
+ return
+ ports = config[ "port_map" ].keys( )
+
+ for pair in itertools.product(ports, ports):
+ # we generate all possible products
+ in_port = pair[0]
+ out_port = pair[1]
+ if out_port == in_port:
+ continue
+ # we fill the pipeline for the pw initiation
+ (
+ port_to_mpls_label_2,
+ port_to_mpls_label_1,
+ port_to_mpls_label_pw,
+ port_to_in_vlan_3,
+ port_to_in_vlan_2,
+ port_to_in_vlan_1,
+ port_to_src_mac_str,
+ port_to_dst_mac_str,
+ Groups ) = fill_pw_initiation_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=in_port,
+ out_port=out_port,
+ ingress_tags=0,
+ egress_tag=EGRESS_TAGGED,
+ mpls_labels=2
+ )
+ # we fill the pipeline for the pw termination
+ # on the reverse path
+ (
+ port_to_mpls_label_pw_x,
+ port_to_vlan_2_x,
+ port_to_vlan_1_x,
+ port_to_switch_mac_str_x,
+ Groups2 ) = fill_pw_termination_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=out_port,
+ out_port=in_port,
+ egress_tags=0
+ )
+ # we generate a simple tcp packet
+ parsed_pkt = simple_tcp_packet(
+ pktlen=104,
+ )
+ pkt = str( parsed_pkt )
+ self.dataplane.send( in_port, pkt )
+ # we generate the pw packet we expect on the out port
+ label_2 = (port_to_mpls_label_2[in_port], 0, 0, 63)
+ label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
+ label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
+ cw = (0, 0, 0, 0)
+ parsed_pkt = pw_packet(
+ pktlen=138,
+ out_eth_dst=port_to_dst_mac_str[in_port],
+ out_eth_src=port_to_src_mac_str[out_port],
+ label=[label_2, label_1, label_pw],
+ cw=cw,
+ out_dl_vlan_enable=True,
+ out_vlan_vid=port_to_in_vlan_1[in_port],
+ )
+ pkt = str( parsed_pkt )
+ # Asserions
+ verify_packet( self, pkt, out_port )
+ verify_no_packet( self, pkt, in_port )
+ verify_no_other_packets( self )
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+ finally:
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+class TaggedPWInitiation_2_Labels( base_tests.SimpleDataPlane ):
+ """
+ This is meant to test the PW Initiation. The traffic
+ arrives tagged to the MPLS-TP CE device, it goes out
+ with the same tag, a new ethernet header and 2 mpls labels.
+ """
+ def runTest( self ):
+
+ Groups = Queue.LifoQueue( )
+ Groups2 = Queue.LifoQueue( )
+ try:
+ if len( config[ "port_map" ] ) < 1:
+ logging.info( "Port count less than 1, can't run this case" )
+ assert (False)
+ return
+ ports = config[ "port_map" ].keys( )
+ for pair in itertools.product(ports, ports):
+ # we generate all possible products
+ in_port = pair[0]
+ out_port = pair[1]
+ if out_port == in_port:
+ continue
+ # we fill the pipeline for the pw initiation
+ (
+ port_to_mpls_label_2,
+ port_to_mpls_label_1,
+ port_to_mpls_label_pw,
+ port_to_in_vlan_3,
+ port_to_in_vlan_2,
+ port_to_in_vlan_1,
+ port_to_src_mac_str,
+ port_to_dst_mac_str,
+ Groups ) = fill_pw_initiation_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=in_port,
+ out_port=out_port,
+ ingress_tags=1,
+ egress_tag=EGRESS_TAGGED,
+ mpls_labels=1
+ )
+ # we fill the pipeline for the pw termination
+ # on the reverse path
+ (
+ port_to_mpls_label_pw_x,
+ port_to_vlan_2_x,
+ port_to_vlan_1_x,
+ port_to_switch_mac_str_x,
+ Groups2 ) = fill_pw_termination_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=out_port,
+ out_port=in_port,
+ egress_tags=1
+ )
+ # we generate a simple tcp packet tagged
+ parsed_pkt = simple_tcp_packet(
+ pktlen=104,
+ dl_vlan_enable=True,
+ vlan_vid=port_to_in_vlan_1[in_port]
+ )
+ pkt = str( parsed_pkt )
+ self.dataplane.send( in_port, pkt )
+ # we generate the expected pw packet
+ label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
+ label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
+ cw = (0, 0, 0, 0)
+ parsed_pkt = pw_packet(
+ pktlen=130,
+ out_eth_dst=port_to_dst_mac_str[in_port],
+ out_eth_src=port_to_src_mac_str[out_port],
+ label=[label_1, label_pw],
+ cw=cw,
+ out_dl_vlan_enable=True,
+ out_vlan_vid=port_to_in_vlan_1[in_port],
+ )
+ pkt = str( parsed_pkt )
+ # Assertions
+ verify_packet( self, pkt, out_port )
+ verify_no_packet( self, pkt, in_port )
+ verify_no_other_packets( self )
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+ finally:
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+class Tagged2PWInitiation_2_Labels( base_tests.SimpleDataPlane ):
+ """
+ This is meant to test the PW Initiation. The traffic
+ arrives tagged to the MPLS-TP CE device, it goes out
+ with a different vlan, with a new ethernet header and 2 mpls labels.
+ """
+ def runTest( self ):
+
+ Groups = Queue.LifoQueue( )
+ Groups2 = Queue.LifoQueue( )
+ try:
+ if len( config[ "port_map" ] ) < 1:
+ logging.info( "Port count less than 1, can't run this case" )
+ assert (False)
+ return
+ ports = config[ "port_map" ].keys( )
+
+ for pair in itertools.product(ports, ports):
+ # we generate all possible products
+ in_port = pair[0]
+ out_port = pair[1]
+ if out_port == in_port:
+ continue
+ # we fill the pipeline for the pw initiation
+ (
+ port_to_mpls_label_2,
+ port_to_mpls_label_1,
+ port_to_mpls_label_pw,
+ port_to_in_vlan_3,
+ port_to_in_vlan_2,
+ port_to_in_vlan_1,
+ port_to_src_mac_str,
+ port_to_dst_mac_str,
+ Groups ) = fill_pw_initiation_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=in_port,
+ out_port=out_port,
+ ingress_tags=1,
+ egress_tag=EGRESS_TAGGED_TRANS,
+ mpls_labels=1
+ )
+ # we fill the pipeline for the pw termination
+ # on the reverse path
+ (
+ port_to_mpls_label_pw_x,
+ port_to_vlan_2_x,
+ port_to_vlan_1_x,
+ port_to_switch_mac_str_x,
+ Groups2 ) = fill_pw_termination_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=out_port,
+ out_port=in_port,
+ egress_tags=1
+ )
+ # we generate a simple tcp packet tagged
+ parsed_pkt = simple_tcp_packet(
+ pktlen=104,
+ dl_vlan_enable=True,
+ vlan_vid=port_to_in_vlan_1[in_port]
+ )
+ pkt = str( parsed_pkt )
+ self.dataplane.send( in_port, pkt )
+ # we generate the expected pw packet
+ label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
+ label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
+ cw = (0, 0, 0, 0)
+ parsed_pkt = pw_packet(
+ pktlen=130,
+ out_eth_dst=port_to_dst_mac_str[in_port],
+ out_eth_src=port_to_src_mac_str[out_port],
+ label=[label_1, label_pw],
+ cw=cw,
+ out_dl_vlan_enable=True,
+ out_vlan_vid=port_to_in_vlan_3[in_port],
+ )
+ pkt = str( parsed_pkt )
+ # Assertions
+ verify_packet( self, pkt, out_port )
+ verify_no_packet( self, pkt, in_port )
+ verify_no_other_packets( self )
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+ finally:
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+class TaggedPWInitiation_3_Labels( base_tests.SimpleDataPlane ):
+ """
+ This is meant to test the PW Initiation. The traffic
+ arrives tagged to the MPLS-TP CE device, it goes out
+ with the same vlan, with a new ethernet header and 3 mpls labels.
+ """
+ def runTest( self ):
+
+ Groups = Queue.LifoQueue( )
+ Groups2 = Queue.LifoQueue( )
+ try:
+ if len( config[ "port_map" ] ) < 1:
+ logging.info( "Port count less than 1, can't run this case" )
+ assert (False)
+ return
+ ports = config[ "port_map" ].keys( )
+
+ for pair in itertools.product(ports, ports):
+ # we generate all possible products
+ in_port = pair[0]
+ out_port = pair[1]
+ if out_port == in_port:
+ continue
+ # we fill the pipeline for the pw initiation
+ (
+ port_to_mpls_label_2,
+ port_to_mpls_label_1,
+ port_to_mpls_label_pw,
+ port_to_in_vlan_3,
+ port_to_in_vlan_2,
+ port_to_in_vlan_1,
+ port_to_src_mac_str,
+ port_to_dst_mac_str,
+ Groups ) = fill_pw_initiation_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=in_port,
+ out_port=out_port,
+ ingress_tags=1,
+ egress_tag=EGRESS_TAGGED,
+ mpls_labels=2
+ )
+ # we fill the pipeline for the pw termination
+ # on the reverse path
+ (
+ port_to_mpls_label_pw_x,
+ port_to_vlan_2_x,
+ port_to_vlan_1_x,
+ port_to_switch_mac_str_x,
+ Groups2 ) = fill_pw_termination_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=out_port,
+ out_port=in_port,
+ egress_tags=1
+ )
+ # we generate a simple tcp packet tagged
+ parsed_pkt = simple_tcp_packet(
+ pktlen=104,
+ dl_vlan_enable=True,
+ vlan_vid=port_to_in_vlan_1[in_port]
+ )
+ pkt = str( parsed_pkt )
+ self.dataplane.send( in_port, pkt )
+ # we generate the expect pw packet
+ label_2 = (port_to_mpls_label_2[in_port], 0, 0, 63)
+ label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
+ label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
+ cw = (0, 0, 0, 0)
+ parsed_pkt = pw_packet(
+ pktlen=134,
+ out_eth_dst=port_to_dst_mac_str[in_port],
+ out_eth_src=port_to_src_mac_str[out_port],
+ label=[label_2, label_1, label_pw],
+ cw=cw,
+ out_dl_vlan_enable=True,
+ out_vlan_vid=port_to_in_vlan_1[in_port],
+ )
+ pkt = str( parsed_pkt )
+ # Assertions
+ verify_packet( self, pkt, out_port )
+ verify_no_packet( self, pkt, in_port )
+ verify_no_other_packets( self )
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2)
+ delete_all_groups( self.controller )
+
+ finally:
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2)
+ delete_all_groups( self.controller )
+
+class Tagged2PWInitiation_3_Labels( base_tests.SimpleDataPlane ):
+ """
+ This is meant to test the PW Initiation. The traffic
+ arrives tagged to the MPLS-TP CE device, it goes out
+ with a different vlam, with a new ethernet header and 3 mpls labels.
+ """
+ def runTest( self ):
+
+ Groups = Queue.LifoQueue( )
+ Groups2 = Queue.LifoQueue( )
+ try:
+ if len( config[ "port_map" ] ) < 1:
+ logging.info( "Port count less than 1, can't run this case" )
+ assert (False)
+ return
+ ports = config[ "port_map" ].keys( )
+
+ for pair in itertools.product(ports, ports):
+ # we generate all possible products
+ in_port = pair[0]
+ out_port = pair[1]
+ if out_port == in_port:
+ continue
+ # we fill the pipeline for the pw initiation
+ (
+ port_to_mpls_label_2,
+ port_to_mpls_label_1,
+ port_to_mpls_label_pw,
+ port_to_in_vlan_3,
+ port_to_in_vlan_2,
+ port_to_in_vlan_1,
+ port_to_src_mac_str,
+ port_to_dst_mac_str,
+ Groups ) = fill_pw_initiation_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=in_port,
+ out_port=out_port,
+ ingress_tags=1,
+ egress_tag=EGRESS_TAGGED_TRANS,
+ mpls_labels=2
+ )
+ # we fill the pipeline for the pw termination
+ # on the reverse path
+ (
+ port_to_mpls_label_pw_x,
+ port_to_vlan_2_x,
+ port_to_vlan_1_x,
+ port_to_switch_mac_str_x,
+ Groups2 ) = fill_pw_termination_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=out_port,
+ out_port=in_port,
+ egress_tags=1
+ )
+ # we generate a simple tcp packet tagged
+ parsed_pkt = simple_tcp_packet(
+ pktlen=104,
+ dl_vlan_enable=True,
+ vlan_vid=port_to_in_vlan_1[in_port]
+ )
+ pkt = str( parsed_pkt )
+ self.dataplane.send( in_port, pkt )
+ # we generate the expect pw packet
+ label_2 = (port_to_mpls_label_2[in_port], 0, 0, 63)
+ label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
+ label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
+ cw = (0, 0, 0, 0)
+ parsed_pkt = pw_packet(
+ pktlen=134,
+ out_eth_dst=port_to_dst_mac_str[in_port],
+ out_eth_src=port_to_src_mac_str[out_port],
+ label=[label_2, label_1, label_pw],
+ cw=cw,
+ out_dl_vlan_enable=True,
+ out_vlan_vid=port_to_in_vlan_3[in_port],
+ )
+ pkt = str( parsed_pkt )
+ # Assertions
+ verify_packet( self, pkt, out_port )
+ verify_no_packet( self, pkt, in_port )
+ verify_no_other_packets( self )
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+ finally:
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+class DoubleTaggedPWInitiation_2_Labels( base_tests.SimpleDataPlane ):
+ """
+ This is meant to test the PW Initiation. The traffic
+ arrives double tagged to the MPLS-TP CE device, it goes out
+ with the same outer vlan, with a new ethernet header and 2 mpls labels.
+ """
+ def runTest( self ):
+
+ Groups = Queue.LifoQueue( )
+ Groups2 = Queue.LifoQueue( )
+ try:
+ if len( config[ "port_map" ] ) < 1:
+ logging.info( "Port count less than 1, can't run this case" )
+ assert (False)
+ return
+ ports = config[ "port_map" ].keys( )
+
+ for pair in itertools.product(ports, ports):
+ # we generate all possible products
+ in_port = pair[0]
+ out_port = pair[1]
+ if out_port == in_port:
+ continue
+ # we fill the pipeline for the pw initiation
+ (
+ port_to_mpls_label_2,
+ port_to_mpls_label_1,
+ port_to_mpls_label_pw,
+ port_to_in_vlan_3,
+ port_to_in_vlan_2,
+ port_to_in_vlan_1,
+ port_to_src_mac_str,
+ port_to_dst_mac_str,
+ Groups ) = fill_pw_initiation_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=in_port,
+ out_port=out_port,
+ ingress_tags=2,
+ egress_tag=EGRESS_TAGGED,
+ mpls_labels=1
+ )
+ # we fill the pipeline for the pw termination
+ # on the reverse path
+ (
+ port_to_mpls_label_pw_x,
+ port_to_vlan_2_x,
+ port_to_vlan_1_x,
+ port_to_switch_mac_str_x,
+ Groups2 ) = fill_pw_termination_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=out_port,
+ out_port=in_port,
+ egress_tags=2
+ )
+ # we generate a simple tcp packet with two vlans
+ parsed_pkt = simple_tcp_packet_two_vlan(
+ pktlen=108,
+ out_dl_vlan_enable=True,
+ out_vlan_vid=port_to_in_vlan_2[in_port],
+ in_dl_vlan_enable=True,
+ in_vlan_vid=port_to_in_vlan_1[in_port],
+ )
+ pkt = str( parsed_pkt )
+ self.dataplane.send( in_port, pkt )
+ # we generate the expected pw packet
+ label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
+ label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
+ cw = (0, 0, 0, 0)
+ parsed_pkt = pw_packet(
+ pktlen=134,
+ out_eth_dst=port_to_dst_mac_str[in_port],
+ out_eth_src=port_to_src_mac_str[out_port],
+ label=[label_1, label_pw],
+ cw=cw,
+ out_dl_vlan_enable=True,
+ out_vlan_vid=port_to_in_vlan_2[in_port],
+ in_dl_vlan_enable=True,
+ in_vlan_vid=port_to_in_vlan_1[in_port],
+ )
+ pkt = str( parsed_pkt )
+ # Assertions
+ verify_packet( self, pkt, out_port )
+ verify_no_packet( self, pkt, in_port )
+ verify_no_other_packets( self )
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+ finally:
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+class DoubleTagged2PWInitiation_2_Labels( base_tests.SimpleDataPlane ):
+ """
+ This is meant to test the PW Initiation. The traffic
+ arrives double tagged to the MPLS-TP CE device and goes out
+ with a new ethernet header and 2 mpls labels.
+ """
+ def runTest( self ):
+
+ Groups = Queue.LifoQueue( )
+ Groups2 = Queue.LifoQueue( )
+ try:
+ if len( config[ "port_map" ] ) < 1:
+ logging.info( "Port count less than 1, can't run this case" )
+ assert (False)
+ return
+ ports = config[ "port_map" ].keys( )
+
+ for pair in itertools.product(ports, ports):
+ # we generate all possible products
+ in_port = pair[0]
+ out_port = pair[1]
+ if out_port == in_port:
+ continue
+ # we fill the pipeline for the pw initiation
+ (
+ port_to_mpls_label_2,
+ port_to_mpls_label_1,
+ port_to_mpls_label_pw,
+ port_to_in_vlan_3,
+ port_to_in_vlan_2,
+ port_to_in_vlan_1,
+ port_to_src_mac_str,
+ port_to_dst_mac_str,
+ Groups ) = fill_pw_initiation_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=in_port,
+ out_port=out_port,
+ ingress_tags=2,
+ egress_tag=EGRESS_TAGGED_TRANS,
+ mpls_labels=1
+ )
+ # we fill the pipeline for the pw termination
+ # on the reverse path
+ (
+ port_to_mpls_label_pw_x,
+ port_to_vlan_2_x,
+ port_to_vlan_1_x,
+ port_to_switch_mac_str_x,
+ Groups2 ) = fill_pw_termination_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=out_port,
+ out_port=in_port,
+ egress_tags=2
+ )
+ # we generate a simple tcp packet with two vlans
+ parsed_pkt = simple_tcp_packet_two_vlan(
+ pktlen=108,
+ out_dl_vlan_enable=True,
+ out_vlan_vid=port_to_in_vlan_2[in_port],
+ in_dl_vlan_enable=True,
+ in_vlan_vid=port_to_in_vlan_1[in_port],
+ )
+ pkt = str( parsed_pkt )
+ self.dataplane.send( in_port, pkt )
+ # we generate the expected pw packet
+ label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
+ label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
+ cw = (0, 0, 0, 0)
+ parsed_pkt = pw_packet(
+ pktlen=134,
+ out_eth_dst=port_to_dst_mac_str[in_port],
+ out_eth_src=port_to_src_mac_str[out_port],
+ label=[label_1, label_pw],
+ cw=cw,
+ out_dl_vlan_enable=True,
+ out_vlan_vid=port_to_in_vlan_3[in_port],
+ in_dl_vlan_enable=True,
+ in_vlan_vid=port_to_in_vlan_1[in_port],
+ )
+ pkt = str( parsed_pkt )
+ # Assertions
+ verify_packet( self, pkt, out_port )
+ verify_no_packet( self, pkt, in_port )
+ verify_no_other_packets( self )
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+ finally:
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+class DoubleTaggedPWInitiation_3_Labels( base_tests.SimpleDataPlane ):
+ """
+ This is meant to test the PW Initiation. The traffic
+ arrives double tagged to the MPLS-TP CE device and goes out
+ with a new ethernet header and 3 mpls labels.
+ """
+ def runTest( self ):
+
+ Groups = Queue.LifoQueue( )
+ Groups2 = Queue.LifoQueue( )
+ try:
+ if len( config[ "port_map" ] ) < 1:
+ logging.info( "Port count less than 1, can't run this case" )
+ assert (False)
+ return
+ ports = config[ "port_map" ].keys( )
+
+ for pair in itertools.product(ports, ports):
+ # we generate all possible products
+ in_port = pair[0]
+ out_port = pair[1]
+ if out_port == in_port:
+ continue
+ # we fill the pipeline for the pw initiation
+ (
+ port_to_mpls_label_2,
+ port_to_mpls_label_1,
+ port_to_mpls_label_pw,
+ port_to_in_vlan_3,
+ port_to_in_vlan_2,
+ port_to_in_vlan_1,
+ port_to_src_mac_str,
+ port_to_dst_mac_str,
+ Groups ) = fill_pw_initiation_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=in_port,
+ out_port=out_port,
+ ingress_tags=2,
+ egress_tag=EGRESS_TAGGED,
+ mpls_labels=2
+ )
+ # we fill the pipeline for the pw termination
+ # on the reverse path
+ (
+ port_to_mpls_label_pw_x,
+ port_to_vlan_2_x,
+ port_to_vlan_1_x,
+ port_to_switch_mac_str_x,
+ Groups2 ) = fill_pw_termination_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=out_port,
+ out_port=in_port,
+ egress_tags=2
+ )
+ # we generate a simple tcp packet with two wlan
+ parsed_pkt = simple_tcp_packet_two_vlan(
+ pktlen=108,
+ out_dl_vlan_enable=True,
+ out_vlan_vid=port_to_in_vlan_2[in_port],
+ in_dl_vlan_enable=True,
+ in_vlan_vid=port_to_in_vlan_1[in_port],
+ )
+ pkt = str( parsed_pkt )
+ self.dataplane.send( in_port, pkt )
+ # we generate the expected pw packet
+ label_2 = (port_to_mpls_label_2[in_port], 0, 0, 63)
+ label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
+ label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
+ cw = (0, 0, 0, 0)
+ parsed_pkt = pw_packet(
+ pktlen=138,
+ out_eth_dst=port_to_dst_mac_str[in_port],
+ out_eth_src=port_to_src_mac_str[out_port],
+ label=[label_2, label_1, label_pw],
+ cw=cw,
+ out_dl_vlan_enable=True,
+ out_vlan_vid=port_to_in_vlan_2[in_port],
+ in_dl_vlan_enable=True,
+ in_vlan_vid=port_to_in_vlan_1[in_port],
+ )
+ pkt = str( parsed_pkt )
+ # Assertions
+ verify_packet( self, pkt, out_port )
+ verify_no_packet( self, pkt, in_port )
+ verify_no_other_packets( self )
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+ finally:
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+class DoubleTagged2PWInitiation_3_Labels( base_tests.SimpleDataPlane ):
+ """
+ This is meant to test the PW Initiation. The traffic
+ arrives double tagged to the MPLS-TP CE device and goes out
+ with a new ethernet header and 3 mpls labels.
+ """
+ def runTest( self ):
+
+ Groups = Queue.LifoQueue( )
+ Groups2 = Queue.LifoQueue( )
+ try:
+ if len( config[ "port_map" ] ) < 1:
+ logging.info( "Port count less than 1, can't run this case" )
+ assert (False)
+ return
+ ports = config[ "port_map" ].keys( )
+
+ for pair in itertools.product(ports, ports):
+ # we generate all possible products
+ in_port = pair[0]
+ out_port = pair[1]
+ if out_port == in_port:
+ continue
+ # we fill the pipeline for the pw initiation
+ (
+ port_to_mpls_label_2,
+ port_to_mpls_label_1,
+ port_to_mpls_label_pw,
+ port_to_in_vlan_3,
+ port_to_in_vlan_2,
+ port_to_in_vlan_1,
+ port_to_src_mac_str,
+ port_to_dst_mac_str,
+ Groups ) = fill_pw_initiation_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=in_port,
+ out_port=out_port,
+ ingress_tags=2,
+ egress_tag=EGRESS_TAGGED_TRANS,
+ mpls_labels=2
+ )
+ # we fill the pipeline for the pw termination
+ # on the reverse path
+ (
+ port_to_mpls_label_pw_x,
+ port_to_vlan_2_x,
+ port_to_vlan_1_x,
+ port_to_switch_mac_str_x,
+ Groups2 ) = fill_pw_termination_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=out_port,
+ out_port=in_port,
+ egress_tags=2
+ )
+ # we generate a simple tcp packet with two wlan
+ parsed_pkt = simple_tcp_packet_two_vlan(
+ pktlen=108,
+ out_dl_vlan_enable=True,
+ out_vlan_vid=port_to_in_vlan_2[in_port],
+ in_dl_vlan_enable=True,
+ in_vlan_vid=port_to_in_vlan_1[in_port],
+ )
+ pkt = str( parsed_pkt )
+ self.dataplane.send( in_port, pkt )
+ # we generate the expected pw packet
+ label_2 = (port_to_mpls_label_2[in_port], 0, 0, 63)
+ label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
+ label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
+ cw = (0, 0, 0, 0)
+ parsed_pkt = pw_packet(
+ pktlen=138,
+ out_eth_dst=port_to_dst_mac_str[in_port],
+ out_eth_src=port_to_src_mac_str[out_port],
+ label=[label_2, label_1, label_pw],
+ cw=cw,
+ out_dl_vlan_enable=True,
+ out_vlan_vid=port_to_in_vlan_3[in_port],
+ in_dl_vlan_enable=True,
+ in_vlan_vid=port_to_in_vlan_1[in_port],
+ )
+ pkt = str( parsed_pkt )
+ # Assertions
+ verify_packet( self, pkt, out_port )
+ verify_no_packet( self, pkt, in_port )
+ verify_no_other_packets( self )
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+ finally:
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+class IntraCO_2_Labels( base_tests.SimpleDataPlane ):
+ """
+ This is meant to test the PW intermediate transport.
+ Incoming packet has 2 labels (SR/PW) (intermediate leaf switch).
+ There is no VLAN tag in the incoming packet. Pop outer MPLS label
+ """
+ def runTest( self ):
+
+ Groups = Queue.LifoQueue( )
+ try:
+ if len( config[ "port_map" ] ) < 1:
+ logging.info( "Port count less than 1, can't run this case" )
+ assert (False)
+ return
+ ports = config[ "port_map" ].keys( )
+ # we fill the pw pipeline for the intermediate transport
+ (
+ port_to_mpls_label_2,
+ port_to_mpls_label_1,
+ port_to_mpls_label_pw,
+ port_to_switch_mac_str,
+ port_to_src_mac_str,
+ port_to_dst_mac_str,
+ Groups
+ ) = fill_pw_intermediate_transport_pipeline(
+ self.controller,
+ logging,
+ ports,
+ mpls_labels=3
+ )
+
+ for pair in itertools.product(ports, ports):
+ # we generate all possible products
+ in_port = pair[0]
+ out_port = pair[1]
+ if out_port == in_port:
+ continue
+ # we geneate the pw packet
+ label_1 = (port_to_mpls_label_2[in_port], 0, 0, 32)
+ label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 32)
+ parsed_pkt = mpls_packet(
+ pktlen=104,
+ ip_ttl=63,
+ eth_dst=port_to_switch_mac_str[in_port],
+ label=[ label_1, label_pw ]
+ )
+ pkt = str( parsed_pkt )
+ self.dataplane.send( in_port, pkt )
+ # we geneate the expected pw packet
+ label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 31)
+ parsed_pkt = mpls_packet(
+ pktlen=100,
+ ip_ttl=63,
+ eth_dst=port_to_dst_mac_str[in_port],
+ eth_src=port_to_src_mac_str[out_port],
+ label=[ label_pw ]
+ )
+ pkt = str( parsed_pkt )
+ # Assertions
+ verify_packet( self, pkt, out_port )
+ verify_no_packet( self, pkt, in_port )
+ verify_no_other_packets( self )
+
+ finally:
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_all_groups( self.controller )
+
+class IntraCO_3_Labels( base_tests.SimpleDataPlane ):
+ """
+ This is meant to test the PW intermediate transport.
+ Incoming packet has 3 labels (SR/SR/PW) (spine switch).
+ There is no VLAN tag in the incoming packet. Pop outer MPLS label
+ """
+ def runTest( self ):
+
+ Groups = Queue.LifoQueue( )
+ try:
+ if len( config[ "port_map" ] ) < 1:
+ logging.info( "Port count less than 1, can't run this case" )
+ assert (False)
+ return
+ ports = config[ "port_map" ].keys( )
+ # we fill the pipeline for the intermediate transport
+ (
+ port_to_mpls_label_2,
+ port_to_mpls_label_1,
+ port_to_mpls_label_pw,
+ port_to_switch_mac_str,
+ port_to_src_mac_str,
+ port_to_dst_mac_str,
+ Groups
+ ) = fill_pw_intermediate_transport_pipeline(
+ self.controller,
+ logging,
+ ports,
+ mpls_labels=3
+ )
+ for pair in itertools.product(ports, ports):
+ # we generate all possible products
+ in_port = pair[0]
+ out_port = pair[1]
+ if out_port == in_port:
+ continue
+ # we generate the pw packet
+ label_2 = (port_to_mpls_label_2[in_port], 0, 0, 32)
+ label_1 = (port_to_mpls_label_1[in_port], 0, 0, 32)
+ label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 32)
+ parsed_pkt = mpls_packet(
+ pktlen=104,
+ ip_ttl=63,
+ eth_dst=port_to_switch_mac_str[in_port],
+ label=[ label_2, label_1, label_pw ]
+ )
+ pkt = str( parsed_pkt )
+ self.dataplane.send( in_port, pkt )
+ # we generate the expected pw packet
+ label_1 = (port_to_mpls_label_1[in_port], 0, 0, 31)
+ parsed_pkt = mpls_packet(
+ pktlen=100,
+ ip_ttl=63,
+ eth_dst=port_to_dst_mac_str[in_port],
+ eth_src=port_to_src_mac_str[out_port],
+ label=[ label_1, label_pw ]
+ )
+ pkt = str( parsed_pkt )
+ # Assertions
+ verify_packet( self, pkt, out_port )
+ verify_no_packet( self, pkt, in_port )
+ verify_no_other_packets( self )
+
+ finally:
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_all_groups( self.controller )
+
+class InterCO( base_tests.SimpleDataPlane ):
+ """
+ This is meant to test the PW intermediate transport.
+ Incoming packet has 1 labels (PW) (Intermediate CO leaf switch).
+ There is no VLAN tag in the incoming packet. Push up to 2 MPLS labels
+ """
+ def runTest( self ):
+
+ Groups = Queue.LifoQueue( )
+ try:
+ if len( config[ "port_map" ] ) < 1:
+ logging.info( "Port count less than 1, can't run this case" )
+ assert (False)
+ return
+ ports = config[ "port_map" ].keys( )
+ # we fill the pipeline for the intermediate transport
+ (
+ port_to_mpls_label_2,
+ port_to_mpls_label_1,
+ port_to_mpls_label_pw,
+ port_to_switch_mac_str,
+ port_to_src_mac_str,
+ port_to_dst_mac_str,
+ Groups
+ ) = fill_pw_intermediate_transport_pipeline(
+ self.controller,
+ logging,
+ ports,
+ mpls_labels=1
+ )
+ for pair in itertools.product(ports, ports):
+ # we generate all possible products
+ in_port = pair[0]
+ out_port = pair[1]
+ if out_port == in_port:
+ continue
+ # we generate the pw packet
+ label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 32)
+ parsed_pkt = mpls_packet(
+ pktlen=104,
+ ip_ttl=63,
+ eth_dst=port_to_switch_mac_str[in_port],
+ label=[ label_pw ]
+ )
+ pkt = str( parsed_pkt )
+ self.dataplane.send( in_port, pkt )
+ # we generate the expected pw packet
+ label_2 = (port_to_mpls_label_2[in_port], 0, 0, 31)
+ label_1 = (port_to_mpls_label_1[in_port], 0, 0, 31)
+ label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 31)
+ parsed_pkt = mpls_packet(
+ pktlen=112,
+ ip_ttl=63,
+ eth_dst=port_to_dst_mac_str[in_port],
+ eth_src=port_to_src_mac_str[out_port],
+ label=[ label_2, label_1, label_pw ]
+ )
+ pkt = str( parsed_pkt )
+ # Assertions
+ verify_packet( self, pkt, out_port )
+ verify_no_packet( self, pkt, in_port )
+ verify_no_other_packets( self )
+
+ finally:
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_all_groups( self.controller )
+
+class UntaggedPWTermination( base_tests.SimpleDataPlane ):
+ """
+ This is meant to test the PW Termination. The traffic
+ arrives untagged to the MPLS-TP CE device and goes out
+ without the outer ethernet header and untagged.
+ """
+ def runTest( self ):
+
+ Groups = Queue.LifoQueue( )
+ Groups2 = Queue.LifoQueue( )
+ try:
+ if len( config[ "port_map" ] ) < 1:
+ logging.info( "Port count less than 1, can't run this case" )
+ assert (False)
+ return
+ ports = config[ "port_map" ].keys( )
+ for pair in itertools.product(ports, ports):
+ # we generate all possible products
+ in_port = pair[0]
+ out_port = pair[1]
+ if out_port == in_port:
+ continue
+ # we fill the pipeline for the pw initiation
+ # on the reverse path
+ (
+ port_to_mpls_label_2,
+ port_to_mpls_label_1,
+ port_to_mpls_label_pw,
+ port_to_in_vlan_3,
+ port_to_in_vlan_2,
+ port_to_in_vlan_1,
+ port_to_src_mac_str,
+ port_to_dst_mac_str,
+ Groups ) = fill_pw_initiation_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=out_port,
+ out_port=in_port,
+ ingress_tags=0,
+ egress_tag=EGRESS_UNTAGGED,
+ mpls_labels=1
+ )
+ # we fill the pipeline for the pw termination
+ (
+ port_to_mpls_label_pw,
+ port_to_in_vlan_2,
+ port_to_in_vlan_1,
+ port_to_dst_mac_str,
+ Groups2 ) = fill_pw_termination_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=in_port,
+ out_port=out_port,
+ egress_tags=0
+ )
+ # we generate the pw packet
+ label_pw = (port_to_mpls_label_pw[out_port], 0, 1, 63)
+ cw = (0, 0, 0, 0)
+ parsed_pkt = pw_packet(
+ pktlen=104,
+ out_eth_dst=port_to_dst_mac_str[in_port],
+ label=[label_pw],
+ cw=cw
+ )
+ pkt = str( parsed_pkt )
+ self.dataplane.send( in_port, pkt )
+ # we generate the expected tcp packet
+ parsed_pkt = simple_tcp_packet(
+ pktlen=82,
+ )
+ pkt = str( parsed_pkt )
+ # Assertions
+ verify_packet( self, pkt, out_port )
+ verify_no_packet( self, pkt, in_port )
+ verify_no_other_packets( self )
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+ finally:
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+class Untagged2PWTermination( base_tests.SimpleDataPlane ):
+ """
+ This is meant to test the PW Termination. The traffic
+ arrives untagged to the MPLS-TP CE device and goes out
+ without the outer ethernet header and untagged
+ but was originally tagged.
+ """
+ def runTest( self ):
+
+ Groups = Queue.LifoQueue( )
+ Groups2 = Queue.LifoQueue( )
+ try:
+ if len( config[ "port_map" ] ) < 1:
+ logging.info( "Port count less than 1, can't run this case" )
+ assert (False)
+ return
+ ports = config[ "port_map" ].keys( )
+ for pair in itertools.product(ports, ports):
+ # we generate all possible products
+ in_port = pair[0]
+ out_port = pair[1]
+ if out_port == in_port:
+ continue
+ # we fill the pipeline for the pw initiation
+ # on the reverse path
+ (
+ port_to_mpls_label_2,
+ port_to_mpls_label_1,
+ port_to_mpls_label_pw,
+ port_to_in_vlan_3,
+ port_to_in_vlan_2,
+ port_to_in_vlan_1,
+ port_to_src_mac_str,
+ port_to_dst_mac_str,
+ Groups ) = fill_pw_initiation_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=out_port,
+ out_port=in_port,
+ ingress_tags=0,
+ egress_tag=EGRESS_TAGGED,
+ mpls_labels=1
+ )
+ # we fill the pipeline for the pw termination
+ (
+ port_to_mpls_label_pw,
+ port_to_in_vlan_2,
+ port_to_in_vlan_1,
+ port_to_dst_mac_str,
+ Groups2 ) = fill_pw_termination_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=in_port,
+ out_port=out_port,
+ egress_tags=0
+ )
+ # we generate the pw packet
+ label_pw = (port_to_mpls_label_pw[out_port], 0, 1, 63)
+ cw = (0, 0, 0, 0)
+ parsed_pkt = pw_packet(
+ pktlen=104,
+ out_eth_dst=port_to_dst_mac_str[in_port],
+ label=[label_pw],
+ cw=cw,
+ out_dl_vlan_enable=True,
+ out_vlan_vid=port_to_in_vlan_1[out_port],
+ )
+ pkt = str( parsed_pkt )
+ self.dataplane.send( in_port, pkt )
+ # we generate the expected tcp packet
+ parsed_pkt = simple_tcp_packet(
+ pktlen=78,
+ )
+ pkt = str( parsed_pkt )
+ # Assertions
+ verify_packet( self, pkt, out_port )
+ verify_no_packet( self, pkt, in_port )
+ verify_no_other_packets( self )
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+ finally:
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+class TaggedPWTermination( base_tests.SimpleDataPlane ):
+ """
+ This is meant to test the PW Termination. The traffic
+ arrives untagged to the MPLS-TP CE device and goes out
+ without the outer ethernet header and with a vlan tag.
+ """
+ def runTest( self ):
+
+ Groups = Queue.LifoQueue( )
+ Groups2 = Queue.LifoQueue( )
+ try:
+ if len( config[ "port_map" ] ) < 1:
+ logging.info( "Port count less than 1, can't run this case" )
+ assert (False)
+ return
+ ports = config[ "port_map" ].keys( )
+ for pair in itertools.product(ports, ports):
+ # we generate all possible products
+ in_port = pair[0]
+ out_port = pair[1]
+ if out_port == in_port:
+ continue
+ # we fill the pipeline for the pw initiation
+ # on the reverse path
+ (
+ port_to_mpls_label_2,
+ port_to_mpls_label_1,
+ port_to_mpls_label_pw,
+ port_to_in_vlan_3,
+ port_to_in_vlan_2,
+ port_to_in_vlan_1,
+ port_to_src_mac_str,
+ port_to_dst_mac_str,
+ Groups ) = fill_pw_initiation_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=out_port,
+ out_port=in_port,
+ ingress_tags=1,
+ egress_tag=EGRESS_TAGGED,
+ mpls_labels=1
+ )
+ # we fill the pipeline for the pw termination
+ (
+ port_to_mpls_label_pw,
+ port_to_in_vlan_2,
+ port_to_in_vlan_1,
+ port_to_dst_mac_str,
+ Groups2 ) = fill_pw_termination_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=in_port,
+ out_port=out_port,
+ egress_tags=1
+ )
+ # we generate the pw packet
+ label_pw = (port_to_mpls_label_pw[out_port], 0, 1, 63)
+ cw = (0, 0, 0, 0)
+ parsed_pkt = pw_packet(
+ pktlen=104,
+ out_eth_dst=port_to_dst_mac_str[in_port],
+ label=[label_pw],
+ cw=cw,
+ out_dl_vlan_enable=True,
+ out_vlan_vid=port_to_in_vlan_1[out_port],
+ )
+ pkt = str( parsed_pkt )
+ self.dataplane.send( in_port, pkt )
+ # we generate the expected tcp packet
+ # with a vlan tag
+ parsed_pkt = simple_tcp_packet(
+ pktlen=82,
+ dl_vlan_enable=True,
+ vlan_vid=port_to_in_vlan_1[out_port]
+ )
+ pkt = str( parsed_pkt )
+ # Assertions
+ verify_packet( self, pkt, out_port )
+ verify_no_packet( self, pkt, in_port )
+ verify_no_other_packets( self )
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+ finally:
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+
+class DoubleTaggedPWTermination( base_tests.SimpleDataPlane ):
+ """
+ This is meant to test the PW Termination. The traffic
+ arrives untagged to the MPLS-TP CE device and goes out
+ without the outer ethernet header and 2 vlan tags.
+ """
+ def runTest( self ):
+
+ Groups = Queue.LifoQueue( )
+ Groups2 = Queue.LifoQueue( )
+ try:
+ if len( config[ "port_map" ] ) < 1:
+ logging.info( "Port count less than 1, can't run this case" )
+ assert (False)
+ return
+ ports = config[ "port_map" ].keys( )
+ for pair in itertools.product(ports, ports):
+ # we generate all possible products
+ in_port = pair[0]
+ out_port = pair[1]
+ if out_port == in_port:
+ continue
+ # we fill the pipeline for the pw initiation
+ # on the reverse path
+ (
+ port_to_mpls_label_2,
+ port_to_mpls_label_1,
+ port_to_mpls_label_pw,
+ port_to_in_vlan_3,
+ port_to_in_vlan_2,
+ port_to_in_vlan_1,
+ port_to_src_mac_str,
+ port_to_dst_mac_str,
+ Groups ) = fill_pw_initiation_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=out_port,
+ out_port=in_port,
+ ingress_tags=2,
+ egress_tag = EGRESS_TAGGED,
+ mpls_labels=1
+ )
+ # we fill the pipeline for the pw termination
+ (
+ port_to_mpls_label_pw,
+ port_to_in_vlan_2,
+ port_to_in_vlan_1,
+ port_to_dst_mac_str,
+ Groups2 ) = fill_pw_termination_pipeline(
+ controller=self.controller,
+ logging=logging,
+ in_port=in_port,
+ out_port=out_port,
+ egress_tags=2
+ )
+ # we generate the pw packet
+ label_pw = (port_to_mpls_label_pw[out_port], 0, 1, 63)
+ cw = (0, 0, 0, 0)
+ parsed_pkt = pw_packet(
+ pktlen=104,
+ out_eth_dst=port_to_dst_mac_str[in_port],
+ label=[label_pw],
+ cw=cw,
+ out_dl_vlan_enable=True,
+ out_vlan_vid=port_to_in_vlan_2[out_port],
+ in_dl_vlan_enable=True,
+ in_vlan_vid=port_to_in_vlan_1[out_port]
+ )
+ pkt = str( parsed_pkt )
+ self.dataplane.send( in_port, pkt )
+ # we generate the expected tcp
+ # packet with two vlan tags
+ parsed_pkt = simple_tcp_packet_two_vlan(
+ pktlen=82,
+ out_dl_vlan_enable=True,
+ out_vlan_vid=port_to_in_vlan_2[out_port],
+ in_dl_vlan_enable=True,
+ in_vlan_vid=port_to_in_vlan_1[out_port]
+ )
+ pkt = str( parsed_pkt )
+ # Assertions
+ verify_packet( self, pkt, out_port )
+ verify_no_packet( self, pkt, in_port )
+ verify_no_other_packets( self )
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
+ finally:
+ delete_all_flows( self.controller )
+ delete_groups( self.controller, Groups )
+ delete_groups( self.controller, Groups2 )
+ delete_all_groups( self.controller )
diff --git a/ofdpa/utils.py b/ofdpa/utils.py
index cf797e8..167763b 100644
--- a/ofdpa/utils.py
+++ b/ofdpa/utils.py
@@ -69,7 +69,6 @@
controller.message_send( request )
do_barrier( controller )
-
"""
MULTICAST Pipelines
"""
@@ -331,7 +330,6 @@
port_to_src_mac_str = {}
port_to_dst_mac = {}
port_to_dst_mac_str = {}
-
port_to_mpls_label_1 = {}
port_to_mpls_label_2 = {}
port_to_mpls_label_pw = {}
@@ -408,7 +406,6 @@
)
Groups._put( mpls_tunnel_gid )
mpls_gid = mpls_tunnel_gid
-
# add MPLS L2 VPN group
mpls_l2_vpn_gid, mpls_l2_vpn_msg = add_mpls_label_group(
ctrl=controller,
@@ -423,7 +420,6 @@
set_bos=1,
cpy_ttl_outward=True
)
-
Groups._put( mpls_l2_vpn_gid )
# add MPLS L2 port flow
add_mpls_l2_port_flow(
@@ -641,7 +637,6 @@
of_port=in_port,
vlan_id=out_vlan,
flag=VLAN_TABLE_FLAG_ONLY_TAG,
- mpls_type=None
)
add_one_vlan_table_flow(
ctrl=controller,
@@ -665,7 +660,7 @@
logging,
in_port,
out_port,
- egress_tags,
+ egress_tags
):
"""
This method, according to the scenario, fills properly
@@ -747,7 +742,6 @@
of_port=in_port,
vlan_id=out_vlan,
flag=VLAN_TABLE_FLAG_ONLY_TAG,
- mpls_type=None
)
add_one_vlan_table_flow(
ctrl=controller,
@@ -759,7 +753,7 @@
return (
port_to_mpls_label_pw,
port_to_vlan_2,
- port_to_in_vlan_1,
+ port_to_vlan_1,
port_to_switch_mac_str,
Groups
)
diff --git a/src/python/oftest/testutils.py b/src/python/oftest/testutils.py
index 323119a..1d48c60 100755
--- a/src/python/oftest/testutils.py
+++ b/src/python/oftest/testutils.py
@@ -69,7 +69,9 @@
"""
logging.info("Deleting groups")
while (not group_queue.empty()):
- msg = ofp.message.group_delete(group_id=group_queue.get())
+ group_id = group_queue.get()
+ print "0x%x" % group_id
+ msg = ofp.message.group_delete(group_id=group_id)
ctrl.message_send(msg)
do_barrier(ctrl)
@@ -450,6 +452,87 @@
return pkt
+def pw_packet(pktlen=100,
+ out_eth_dst='00:01:02:03:04:05',
+ out_eth_src='00:06:07:08:09:0a',
+ label=None,
+ cw=None,
+ in_eth_dst='00:01:02:03:04:05',
+ in_eth_src='00:06:07:08:09:0a',
+ out_dl_vlan_enable=False,
+ in_dl_vlan_enable=False,
+ out_vlan_vid=0,
+ out_vlan_pcp=0,
+ out_dl_vlan_cfi=0,
+ in_vlan_vid=0,
+ in_vlan_pcp=0,
+ in_dl_vlan_cfi=0,
+ ip_src='192.168.0.1',
+ ip_dst='192.168.0.2',
+ ip_tos=0,
+ ip_ttl=64,
+ tcp_sport=1234,
+ tcp_dport=80,
+ tcp_flags="S",
+ ip_ihl=None,
+ ip_options=False
+ ):
+ """
+ Return a simple dataplane TCP packet encapsulated
+ in a pw packet
+ """
+
+ # Add the outer ethernet header
+ if MINSIZE > pktlen:
+ pktlen = MINSIZE
+
+ pkt = scapy.Ether(dst=out_eth_dst, src=out_eth_src)
+
+ #add MPLS header
+ for i in range(len(label)):
+ l,c,s,t=label[i]
+ pkt = pkt/scapy.MPLS(label=l, cos=c, s=s, ttl=t)
+
+ #add the PW CW
+ l,c,s,t=cw
+ pkt = pkt/scapy.MPLS(label=l, cos=c, s=s, ttl=t)
+
+ # Note Dot1Q.id is really CFI
+ if (out_dl_vlan_enable and in_dl_vlan_enable):
+
+ pkt = pkt/scapy.Ether(dst=in_eth_dst, src=in_eth_src)/ \
+ scapy.Dot1Q(prio=out_vlan_pcp, id=out_dl_vlan_cfi, vlan=out_vlan_vid)
+
+ pkt = pkt/scapy.Dot1Q(prio=in_vlan_pcp, id=in_dl_vlan_cfi, vlan=in_vlan_vid)
+
+ pkt = pkt/scapy.IP(src=ip_src, dst=ip_dst, tos=ip_tos, ttl=ip_ttl, ihl=ip_ihl)/ \
+ scapy.TCP(sport=tcp_sport, dport=tcp_dport, flags=tcp_flags)
+
+ elif (out_dl_vlan_enable and (not in_dl_vlan_enable)):
+
+ pkt = pkt/scapy.Ether(dst=in_eth_dst, src=in_eth_src)/ \
+ scapy.Dot1Q(prio=out_vlan_pcp, id=out_dl_vlan_cfi, vlan=out_vlan_vid)/ \
+ scapy.IP(src=ip_src, dst=ip_dst, tos=ip_tos, ttl=ip_ttl, ihl=ip_ihl)/ \
+ scapy.TCP(sport=tcp_sport, dport=tcp_dport, flags=tcp_flags)
+
+ elif ((not out_dl_vlan_enable) and in_dl_vlan_enable):
+
+ assert(0) #shall not have this caes
+
+ else:
+ if not ip_options:
+ pkt = pkt/scapy.Ether(dst=in_eth_dst, src=in_eth_src)/ \
+ scapy.IP(src=ip_src, dst=ip_dst, tos=ip_tos, ttl=ip_ttl, ihl=ip_ihl)/ \
+ scapy.TCP(sport=tcp_sport, dport=tcp_dport, flags=tcp_flags)
+ else:
+ pkt = pkt/scapy.Ether(dst=in_eth_dst, src=in_eth_src)/ \
+ scapy.IP(src=ip_src, dst=ip_dst, tos=ip_tos, ttl=ip_ttl, ihl=ip_ihl, options=ip_options)/ \
+ scapy.TCP(sport=tcp_sport, dport=tcp_dport, flags=tcp_flags)
+
+ pkt = pkt/("D" * (pktlen - len(pkt)))
+
+ return pkt
+
def simple_mpls_packet(eth_dst='00:01:02:03:04:05',
eth_src='00:06:07:08:09:0a',
dl_vlan_enable=False,