blob: 7143350d2221ac7fdc02e1ec8101fd7f6419ec25 [file] [log] [blame]
Rich Laneb6255512013-05-07 14:58:43 -07001#!/usr/bin/env python
2import unittest
3import parse
4import scapy.all as scapy
5
6class TestPacketToFlowMatchV3(unittest.TestCase):
7 def test_tcp(self):
8 import loxi.of12 as ofp
9 self.maxDiff = None
10 pkt = scapy.Ether(dst='00:01:02:03:04:05', src='00:06:07:08:09:0a')/ \
11 scapy.IP(src='192.168.0.1', dst='192.168.0.2', tos=2 | (32 << 2), ttl=64)/ \
12 scapy.TCP(sport=1234, dport=80)
13 expected = [
14 ofp.oxm.eth_dst([0x00, 0x01, 0x02, 0x03, 0x04, 0x05]),
15 ofp.oxm.eth_src([0x00, 0x06, 0x07, 0x08, 0x09, 0x0a]),
16 ofp.oxm.eth_type(0x0800),
17 ofp.oxm.vlan_vid(ofp.OFP_VLAN_NONE),
18 ofp.oxm.ip_proto(6),
19 ofp.oxm.ip_dscp(32),
20 ofp.oxm.ip_ecn(2),
21 ofp.oxm.ipv4_src(0xc0a80001),
22 ofp.oxm.ipv4_dst(0xc0a80002),
23 ofp.oxm.tcp_src(1234),
24 ofp.oxm.tcp_dst(80)
25 ]
26 result = parse.packet_to_flow_match_v3(pkt).oxm_list
27 self.assertEquals([x.show() for x in expected], [x.show() for x in result])
28
29 def test_udp(self):
30 import loxi.of12 as ofp
31 self.maxDiff = None
32 pkt = scapy.Ether(dst='00:01:02:03:04:05', src='00:06:07:08:09:0a')/ \
33 scapy.IP(src='192.168.0.1', dst='192.168.0.2', tos=2 | (32 << 2), ttl=64)/ \
34 scapy.UDP(sport=1234, dport=80)
35 expected = [
36 ofp.oxm.eth_dst([0x00, 0x01, 0x02, 0x03, 0x04, 0x05]),
37 ofp.oxm.eth_src([0x00, 0x06, 0x07, 0x08, 0x09, 0x0a]),
38 ofp.oxm.eth_type(0x0800),
39 ofp.oxm.vlan_vid(ofp.OFP_VLAN_NONE),
40 ofp.oxm.ip_proto(17),
41 ofp.oxm.ip_dscp(32),
42 ofp.oxm.ip_ecn(2),
43 ofp.oxm.ipv4_src(0xc0a80001),
44 ofp.oxm.ipv4_dst(0xc0a80002),
45 ofp.oxm.udp_src(1234),
46 ofp.oxm.udp_dst(80)
47 ]
48 result = parse.packet_to_flow_match_v3(pkt).oxm_list
49 self.assertEquals([x.show() for x in expected], [x.show() for x in result])
50
51 def test_icmp(self):
52 import loxi.of12 as ofp
53 self.maxDiff = None
54 pkt = scapy.Ether(dst='00:01:02:03:04:05', src='00:06:07:08:09:0a')/ \
55 scapy.IP(src='192.168.0.1', dst='192.168.0.2', tos=2 | (32 << 2), ttl=64)/ \
56 scapy.ICMP(type=8, code=1)
57 expected = [
58 ofp.oxm.eth_dst([0x00, 0x01, 0x02, 0x03, 0x04, 0x05]),
59 ofp.oxm.eth_src([0x00, 0x06, 0x07, 0x08, 0x09, 0x0a]),
60 ofp.oxm.eth_type(0x0800),
61 ofp.oxm.vlan_vid(ofp.OFP_VLAN_NONE),
62 ofp.oxm.ip_proto(1),
63 ofp.oxm.ip_dscp(32),
64 ofp.oxm.ip_ecn(2),
65 ofp.oxm.ipv4_src(0xc0a80001),
66 ofp.oxm.ipv4_dst(0xc0a80002),
67 ofp.oxm.icmpv4_type(8),
68 ofp.oxm.icmpv4_code(1)
69 ]
70 result = parse.packet_to_flow_match_v3(pkt).oxm_list
71 self.assertEquals([x.show() for x in expected], [x.show() for x in result])
72
73 def test_arp(self):
74 import loxi.of12 as ofp
75 self.maxDiff = None
76 pkt = scapy.Ether(dst='00:01:02:03:04:05', src='00:06:07:08:09:0a')/ \
77 scapy.ARP(hwsrc='00:01:02:03:04:05', hwdst='00:06:07:08:09:0a', \
78 psrc='192.168.0.1', pdst='192.168.0.2', op=1)
79 expected = [
80 ofp.oxm.eth_dst([0x00, 0x01, 0x02, 0x03, 0x04, 0x05]),
81 ofp.oxm.eth_src([0x00, 0x06, 0x07, 0x08, 0x09, 0x0a]),
82 ofp.oxm.eth_type(0x0806),
83 ofp.oxm.vlan_vid(ofp.OFP_VLAN_NONE),
84 ofp.oxm.arp_op(1),
85 ofp.oxm.arp_spa(0xc0a80001),
86 ofp.oxm.arp_tpa(0xc0a80002),
87 ofp.oxm.arp_sha([0x00, 0x01, 0x02, 0x03, 0x04, 0x05]),
88 ofp.oxm.arp_tha([0x00, 0x06, 0x07, 0x08, 0x09, 0x0a]),
89 ]
90 result = parse.packet_to_flow_match_v3(pkt).oxm_list
91 self.assertEquals([x.show() for x in expected], [x.show() for x in result])
92
93 def test_tcpv6(self):
94 import loxi.of12 as ofp
95 self.maxDiff = None
96 pkt = scapy.Ether(dst='00:01:02:03:04:05', src='00:06:07:08:09:0a')/ \
97 scapy.IPv6(src="::1", dst="::2", nh=6, tc=2 | (32 << 2), fl=7)/ \
98 scapy.TCP(sport=1234, dport=80)
99 expected = [
100 ofp.oxm.eth_dst([0x00, 0x01, 0x02, 0x03, 0x04, 0x05]),
101 ofp.oxm.eth_src([0x00, 0x06, 0x07, 0x08, 0x09, 0x0a]),
102 ofp.oxm.eth_type(0x86dd),
103 ofp.oxm.vlan_vid(ofp.OFP_VLAN_NONE),
104 ofp.oxm.ip_proto(6),
105 ofp.oxm.ip_dscp(32),
106 ofp.oxm.ip_ecn(2),
107 ofp.oxm.ipv6_src("\x00" * 15 + "\x01"),
108 ofp.oxm.ipv6_dst("\x00" * 15 + "\x02"),
109 ofp.oxm.ipv6_flabel(7),
110 ofp.oxm.tcp_src(1234),
111 ofp.oxm.tcp_dst(80)
112 ]
113 result = parse.packet_to_flow_match_v3(pkt).oxm_list
114 self.assertEquals([x.show() for x in expected], [x.show() for x in result])
115
116 def test_icmpv6(self):
117 import loxi.of12 as ofp
118 self.maxDiff = None
119 pkt = scapy.Ether(dst='00:01:02:03:04:05', src='00:06:07:08:09:0a')/ \
120 scapy.IPv6(src="::1", dst="::2", tc=2 | (32 << 2), fl=7)/ \
121 scapy.ICMPv6EchoRequest()
122 expected = [
123 ofp.oxm.eth_dst([0x00, 0x01, 0x02, 0x03, 0x04, 0x05]),
124 ofp.oxm.eth_src([0x00, 0x06, 0x07, 0x08, 0x09, 0x0a]),
125 ofp.oxm.eth_type(0x86dd),
126 ofp.oxm.vlan_vid(ofp.OFP_VLAN_NONE),
127 ofp.oxm.ip_proto(0x3a),
128 ofp.oxm.ip_dscp(32),
129 ofp.oxm.ip_ecn(2),
130 ofp.oxm.ipv6_src("\x00" * 15 + "\x01"),
131 ofp.oxm.ipv6_dst("\x00" * 15 + "\x02"),
132 ofp.oxm.ipv6_flabel(7),
133 ofp.oxm.icmpv6_type(128),
134 ofp.oxm.icmpv6_code(0)
135 ]
136 result = parse.packet_to_flow_match_v3(pkt).oxm_list
137 self.assertEquals([x.show() for x in expected], [x.show() for x in result])
138
139 def test_vlan(self):
140 import loxi.of12 as ofp
141 self.maxDiff = None
142 pkt = scapy.Ether(dst='00:01:02:03:04:05', src='00:06:07:08:09:0a')/ \
143 scapy.Dot1Q(vlan=50, prio=5)/ \
144 scapy.IP(src='192.168.0.1', dst='192.168.0.2', tos=2 | (32 << 2), ttl=64)/ \
145 scapy.TCP(sport=1234, dport=80)
146 expected = [
147 ofp.oxm.eth_dst([0x00, 0x01, 0x02, 0x03, 0x04, 0x05]),
148 ofp.oxm.eth_src([0x00, 0x06, 0x07, 0x08, 0x09, 0x0a]),
149 ofp.oxm.eth_type(0x0800),
150 ofp.oxm.vlan_vid(50),
151 ofp.oxm.vlan_pcp(5),
152 ofp.oxm.ip_proto(6),
153 ofp.oxm.ip_dscp(32),
154 ofp.oxm.ip_ecn(2),
155 ofp.oxm.ipv4_src(0xc0a80001),
156 ofp.oxm.ipv4_dst(0xc0a80002),
157 ofp.oxm.tcp_src(1234),
158 ofp.oxm.tcp_dst(80)
159 ]
160 result = parse.packet_to_flow_match_v3(pkt).oxm_list
161 self.assertEquals([x.show() for x in expected], [x.show() for x in result])
162
163 def test_unknown_ethertype(self):
164 import loxi.of12 as ofp
165 self.maxDiff = None
166 pkt = scapy.Ether(dst='00:01:02:03:04:05', src='00:06:07:08:09:0a', type=0x0801)/ \
167 ('\x11' * 20)
168 expected = [
169 ofp.oxm.eth_dst([0x00, 0x01, 0x02, 0x03, 0x04, 0x05]),
170 ofp.oxm.eth_src([0x00, 0x06, 0x07, 0x08, 0x09, 0x0a]),
171 ofp.oxm.eth_type(0x0801),
172 ofp.oxm.vlan_vid(ofp.OFP_VLAN_NONE),
173 ]
174 result = parse.packet_to_flow_match_v3(pkt).oxm_list
175 self.assertEquals([x.show() for x in expected], [x.show() for x in result])
176
177
178if __name__ == '__main__':
179 unittest.main(verbosity=2)