blob: d4762ef81e3055c0ee71411ac224c3a9ac0ff9b3 [file] [log] [blame]
Sreeju Sreedhare3fefd92019-04-02 15:57:15 -07001
2# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16
17#!/usr/bin/env python
18import unittest
19import parse
20import packet as scapy
21
class TestPacketToFlowMatchV3(unittest.TestCase):
    """Check parse.packet_to_flow_match_v3 against hand-written OXM lists.

    Each test builds one packet, spells out the OXM entries expected in the
    resulting match (in order), and compares the two lists entry by entry.
    """

    # Never truncate the diff unittest prints when the two lists differ.
    maxDiff = None

    def _assert_oxm_list(self, pkt, expected):
        """Assert that *pkt* parses to exactly the *expected* OXM entries.

        Both lists are compared through each entry's show() text, so a
        failure is reported as a diff of those textual renderings.
        """
        result = parse.packet_to_flow_match_v3(pkt).oxm_list
        # assertEqual, not the deprecated assertEquals alias (gone in 3.12).
        self.assertEqual([x.show() for x in expected],
                         [x.show() for x in result])

    def test_tcp(self):
        """Untagged IPv4/TCP packet."""
        import loxi.of12 as ofp
        # tos carries DSCP 32 in the upper six bits and ECN 2 in the low two.
        pkt = (
            scapy.Ether(dst='00:01:02:03:04:05', src='00:06:07:08:09:0a') /
            scapy.IP(src='192.168.0.1', dst='192.168.0.2',
                     tos=2 | (32 << 2), ttl=64) /
            scapy.TCP(sport=1234, dport=80)
        )
        expected = [
            ofp.oxm.eth_dst([0x00, 0x01, 0x02, 0x03, 0x04, 0x05]),
            ofp.oxm.eth_src([0x00, 0x06, 0x07, 0x08, 0x09, 0x0a]),
            ofp.oxm.eth_type(0x0800),
            ofp.oxm.vlan_vid(ofp.OFP_VLAN_NONE),
            ofp.oxm.ip_proto(6),
            ofp.oxm.ip_dscp(32),
            ofp.oxm.ip_ecn(2),
            ofp.oxm.ipv4_src(0xc0a80001),
            ofp.oxm.ipv4_dst(0xc0a80002),
            ofp.oxm.tcp_src(1234),
            ofp.oxm.tcp_dst(80),
        ]
        self._assert_oxm_list(pkt, expected)

    def test_udp(self):
        """Untagged IPv4/UDP packet."""
        import loxi.of12 as ofp
        pkt = (
            scapy.Ether(dst='00:01:02:03:04:05', src='00:06:07:08:09:0a') /
            scapy.IP(src='192.168.0.1', dst='192.168.0.2',
                     tos=2 | (32 << 2), ttl=64) /
            scapy.UDP(sport=1234, dport=80)
        )
        expected = [
            ofp.oxm.eth_dst([0x00, 0x01, 0x02, 0x03, 0x04, 0x05]),
            ofp.oxm.eth_src([0x00, 0x06, 0x07, 0x08, 0x09, 0x0a]),
            ofp.oxm.eth_type(0x0800),
            ofp.oxm.vlan_vid(ofp.OFP_VLAN_NONE),
            ofp.oxm.ip_proto(17),
            ofp.oxm.ip_dscp(32),
            ofp.oxm.ip_ecn(2),
            ofp.oxm.ipv4_src(0xc0a80001),
            ofp.oxm.ipv4_dst(0xc0a80002),
            ofp.oxm.udp_src(1234),
            ofp.oxm.udp_dst(80),
        ]
        self._assert_oxm_list(pkt, expected)

    def test_icmp(self):
        """Untagged IPv4/ICMP packet with type 8, code 1."""
        import loxi.of12 as ofp
        pkt = (
            scapy.Ether(dst='00:01:02:03:04:05', src='00:06:07:08:09:0a') /
            scapy.IP(src='192.168.0.1', dst='192.168.0.2',
                     tos=2 | (32 << 2), ttl=64) /
            scapy.ICMP(type=8, code=1)
        )
        expected = [
            ofp.oxm.eth_dst([0x00, 0x01, 0x02, 0x03, 0x04, 0x05]),
            ofp.oxm.eth_src([0x00, 0x06, 0x07, 0x08, 0x09, 0x0a]),
            ofp.oxm.eth_type(0x0800),
            ofp.oxm.vlan_vid(ofp.OFP_VLAN_NONE),
            ofp.oxm.ip_proto(1),
            ofp.oxm.ip_dscp(32),
            ofp.oxm.ip_ecn(2),
            ofp.oxm.ipv4_src(0xc0a80001),
            ofp.oxm.ipv4_dst(0xc0a80002),
            ofp.oxm.icmpv4_type(8),
            ofp.oxm.icmpv4_code(1),
        ]
        self._assert_oxm_list(pkt, expected)

    def test_arp(self):
        """Untagged ARP packet with op=1."""
        import loxi.of12 as ofp
        pkt = (
            scapy.Ether(dst='00:01:02:03:04:05', src='00:06:07:08:09:0a') /
            scapy.ARP(hwsrc='00:01:02:03:04:05', hwdst='00:06:07:08:09:0a',
                      psrc='192.168.0.1', pdst='192.168.0.2', op=1)
        )
        expected = [
            ofp.oxm.eth_dst([0x00, 0x01, 0x02, 0x03, 0x04, 0x05]),
            ofp.oxm.eth_src([0x00, 0x06, 0x07, 0x08, 0x09, 0x0a]),
            ofp.oxm.eth_type(0x0806),
            ofp.oxm.vlan_vid(ofp.OFP_VLAN_NONE),
            ofp.oxm.arp_op(1),
            ofp.oxm.arp_spa(0xc0a80001),
            ofp.oxm.arp_tpa(0xc0a80002),
            ofp.oxm.arp_sha([0x00, 0x01, 0x02, 0x03, 0x04, 0x05]),
            ofp.oxm.arp_tha([0x00, 0x06, 0x07, 0x08, 0x09, 0x0a]),
        ]
        self._assert_oxm_list(pkt, expected)

    def test_tcpv6(self):
        """Untagged IPv6/TCP packet, including the flow label."""
        import loxi.of12 as ofp
        # tc carries DSCP 32 in the upper six bits and ECN 2 in the low two.
        pkt = (
            scapy.Ether(dst='00:01:02:03:04:05', src='00:06:07:08:09:0a') /
            scapy.IPv6(src="::1", dst="::2", nh=6, tc=2 | (32 << 2), fl=7) /
            scapy.TCP(sport=1234, dport=80)
        )
        expected = [
            ofp.oxm.eth_dst([0x00, 0x01, 0x02, 0x03, 0x04, 0x05]),
            ofp.oxm.eth_src([0x00, 0x06, 0x07, 0x08, 0x09, 0x0a]),
            ofp.oxm.eth_type(0x86dd),
            ofp.oxm.vlan_vid(ofp.OFP_VLAN_NONE),
            ofp.oxm.ip_proto(6),
            ofp.oxm.ip_dscp(32),
            ofp.oxm.ip_ecn(2),
            ofp.oxm.ipv6_src("\x00" * 15 + "\x01"),
            ofp.oxm.ipv6_dst("\x00" * 15 + "\x02"),
            ofp.oxm.ipv6_flabel(7),
            ofp.oxm.tcp_src(1234),
            ofp.oxm.tcp_dst(80),
        ]
        self._assert_oxm_list(pkt, expected)

    def test_icmpv6(self):
        """Untagged IPv6 packet carrying an ICMPv6 echo request."""
        import loxi.of12 as ofp
        pkt = (
            scapy.Ether(dst='00:01:02:03:04:05', src='00:06:07:08:09:0a') /
            scapy.IPv6(src="::1", dst="::2", tc=2 | (32 << 2), fl=7) /
            scapy.ICMPv6EchoRequest()
        )
        expected = [
            ofp.oxm.eth_dst([0x00, 0x01, 0x02, 0x03, 0x04, 0x05]),
            ofp.oxm.eth_src([0x00, 0x06, 0x07, 0x08, 0x09, 0x0a]),
            ofp.oxm.eth_type(0x86dd),
            ofp.oxm.vlan_vid(ofp.OFP_VLAN_NONE),
            ofp.oxm.ip_proto(0x3a),
            ofp.oxm.ip_dscp(32),
            ofp.oxm.ip_ecn(2),
            ofp.oxm.ipv6_src("\x00" * 15 + "\x01"),
            ofp.oxm.ipv6_dst("\x00" * 15 + "\x02"),
            ofp.oxm.ipv6_flabel(7),
            ofp.oxm.icmpv6_type(128),
            ofp.oxm.icmpv6_code(0),
        ]
        self._assert_oxm_list(pkt, expected)

    def test_vlan(self):
        """802.1Q-tagged IPv4/TCP packet: expects vlan_vid 50 and vlan_pcp 5."""
        import loxi.of12 as ofp
        pkt = (
            scapy.Ether(dst='00:01:02:03:04:05', src='00:06:07:08:09:0a') /
            scapy.Dot1Q(vlan=50, prio=5) /
            scapy.IP(src='192.168.0.1', dst='192.168.0.2',
                     tos=2 | (32 << 2), ttl=64) /
            scapy.TCP(sport=1234, dport=80)
        )
        expected = [
            ofp.oxm.eth_dst([0x00, 0x01, 0x02, 0x03, 0x04, 0x05]),
            ofp.oxm.eth_src([0x00, 0x06, 0x07, 0x08, 0x09, 0x0a]),
            ofp.oxm.eth_type(0x0800),
            ofp.oxm.vlan_vid(50),
            ofp.oxm.vlan_pcp(5),
            ofp.oxm.ip_proto(6),
            ofp.oxm.ip_dscp(32),
            ofp.oxm.ip_ecn(2),
            ofp.oxm.ipv4_src(0xc0a80001),
            ofp.oxm.ipv4_dst(0xc0a80002),
            ofp.oxm.tcp_src(1234),
            ofp.oxm.tcp_dst(80),
        ]
        self._assert_oxm_list(pkt, expected)

    def test_unknown_ethertype(self):
        """Frame with ethertype 0x0801: only the Ethernet/VLAN entries expected."""
        import loxi.of12 as ofp
        pkt = (
            scapy.Ether(dst='00:01:02:03:04:05', src='00:06:07:08:09:0a',
                        type=0x0801) /
            ('\x11' * 20)
        )
        expected = [
            ofp.oxm.eth_dst([0x00, 0x01, 0x02, 0x03, 0x04, 0x05]),
            ofp.oxm.eth_src([0x00, 0x06, 0x07, 0x08, 0x09, 0x0a]),
            ofp.oxm.eth_type(0x0801),
            ofp.oxm.vlan_vid(ofp.OFP_VLAN_NONE),
        ]
        self._assert_oxm_list(pkt, expected)
192
193
if __name__ == "__main__":
    # Executed as a script: hand control to unittest's command-line runner,
    # with verbosity 2 so every test is reported on its own line.
    unittest.main(verbosity=2)