Add automated tests for pseudowire (PW)
Changes:
- Adds 12 procedures to test all initiation scenarios
- Adds 3 procedures to test all int. transport scenarios
- Adds 4 procedures to test all termination scenarios
- Implements PW generation
- Updates README
Change-Id: Ic56fa02a46ede2b5bf04298338ef839378d09235
diff --git a/src/python/oftest/testutils.py b/src/python/oftest/testutils.py
index 323119a..1d48c60 100755
--- a/src/python/oftest/testutils.py
+++ b/src/python/oftest/testutils.py
@@ -69,7 +69,9 @@
"""
logging.info("Deleting groups")
while (not group_queue.empty()):
- msg = ofp.message.group_delete(group_id=group_queue.get())
+ group_id = group_queue.get()
+ print "0x%x" % group_id
+ msg = ofp.message.group_delete(group_id=group_id)
ctrl.message_send(msg)
do_barrier(ctrl)
@@ -450,6 +452,87 @@
return pkt
+def pw_packet(pktlen=100,
+              out_eth_dst='00:01:02:03:04:05',
+              out_eth_src='00:06:07:08:09:0a',
+              label=None,
+              cw=None,
+              in_eth_dst='00:01:02:03:04:05',
+              in_eth_src='00:06:07:08:09:0a',
+              out_dl_vlan_enable=False,
+              in_dl_vlan_enable=False,
+              out_vlan_vid=0, out_vlan_pcp=0, out_dl_vlan_cfi=0,
+              in_vlan_vid=0, in_vlan_pcp=0, in_dl_vlan_cfi=0,
+              ip_src='192.168.0.1',
+              ip_dst='192.168.0.2',
+              ip_tos=0,
+              ip_ttl=64,
+              tcp_sport=1234,
+              tcp_dport=80,
+              tcp_flags="S",
+              ip_ihl=None,
+              ip_options=False
+              ):
+    """
+    Return a simple dataplane TCP packet encapsulated in a pseudowire (PW)
+    packet. Layers, outermost first: outer Ethernet, MPLS label stack,
+    control word, inner Ethernet, optional Dot1Q tag(s), IP, TCP, padding.
+
+    label: list of (label, cos, s, ttl) tuples, one MPLS header each, in
+           the order given; None or an empty list adds no MPLS header.
+    cw: (label, cos, s, ttl) tuple emitted as one more scapy.MPLS header
+        after the label stack; None omits it.
+    pktlen: the packet is padded with "D" bytes up to this length (at
+            least MINSIZE); no padding bytes are added beyond that.
+    ip_options: only applied when neither VLAN tag is enabled.
+
+    Raises AssertionError when in_dl_vlan_enable is set without
+    out_dl_vlan_enable.
+    """
+    if MINSIZE > pktlen:
+        pktlen = MINSIZE
+
+    # Outer ethernet header
+    pkt = scapy.Ether(dst=out_eth_dst, src=out_eth_src)
+
+    # MPLS label stack; a missing stack is treated as empty
+    for lbl, cos, s, ttl in (label or []):
+        pkt = pkt/scapy.MPLS(label=lbl, cos=cos, s=s, ttl=ttl)
+
+    # PW control word, skipped when the caller supplies none
+    if cw is not None:
+        lbl, cos, s, ttl = cw
+        pkt = pkt/scapy.MPLS(label=lbl, cos=cos, s=s, ttl=ttl)
+
+    # An inner VLAN tag is only valid together with an outer one. Raise
+    # explicitly: a bare assert would be stripped under "python -O".
+    if in_dl_vlan_enable and not out_dl_vlan_enable:
+        raise AssertionError("inner VLAN tag requires the outer VLAN tag")
+
+    # Inner ethernet header
+    pkt = pkt/scapy.Ether(dst=in_eth_dst, src=in_eth_src)
+
+    # Note Dot1Q.id is really CFI
+    if out_dl_vlan_enable:
+        pkt = pkt/scapy.Dot1Q(prio=out_vlan_pcp, id=out_dl_vlan_cfi,
+                              vlan=out_vlan_vid)
+    if in_dl_vlan_enable:
+        pkt = pkt/scapy.Dot1Q(prio=in_vlan_pcp, id=in_dl_vlan_cfi,
+                              vlan=in_vlan_vid)
+
+    # IP options are only honoured for untagged inner frames
+    ip_args = dict(src=ip_src, dst=ip_dst, tos=ip_tos, ttl=ip_ttl, ihl=ip_ihl)
+    if ip_options and not out_dl_vlan_enable:
+        ip_args['options'] = ip_options
+
+    pkt = pkt/scapy.IP(**ip_args)/ \
+        scapy.TCP(sport=tcp_sport, dport=tcp_dport, flags=tcp_flags)
+
+    # Pad with "D" up to pktlen; a no-op when the headers already reach it
+    pkt = pkt/("D" * (pktlen - len(pkt)))
+
+    return pkt
+
def simple_mpls_packet(eth_dst='00:01:02:03:04:05',
eth_src='00:06:07:08:09:0a',
dl_vlan_enable=False,