blob: 1687ce0c83a06ccc7f0606c1e8d0e87b8eda8198 [file] [log] [blame]
Rich Laneea873262013-01-11 08:15:55 -08001"""
2Test cases for testing actions taken on packets
3
4See basic.py for other info.
5
6It is recommended that these definitions be kept in their own
7namespace as different groups of tests will likely define
8similar identifiers.
9
10 The switch is actively attempting to contact the controller at the address
11indicated in oft_config
12
13"""
14
15import logging
16import ipaddr
17import os.path
18import subprocess
19
20from oftest import config
21import ofp
22import oftest.oft12.testutils as testutils
23import oftest.base_tests as base_tests
24
25TEST_VID_DEFAULT = 2
26
27IPV6_ETHERTYPE = 0x86dd
28ETHERTYPE_VLAN = 0x8100
29ETHERTYPE_MPLS = 0x8847
30TCP_PROTOCOL = 0x6
31UDP_PROTOCOL = 0x11
32ICMPV6_PROTOCOL = 0x3a
33
34
35# TESTS
class MatchIPv6Simple(base_tests.SimpleDataPlane):
    """
    Send an IPv6 packet that matches a simple entry in the matching table.

    The flow matches on ingress port, IPv6 ethertype, destination MAC and
    IPv6 source address, and its only action is an output to the egress
    port, where the packet is then checked with receive_pkt_verify.
    """
    def runTest(self):
        # sorted() builds a new list, so this does not rely on keys()
        # returning a list that can be sorted in place.
        of_ports = sorted(config["port_map"].keys())
        # The egress port is the fourth port; fail with a readable message
        # rather than an IndexError when fewer ports are configured.
        self.assertTrue(len(of_ports) >= 4, "Not enough ports for test")
        ing_port = of_ports[0]
        egr_port = of_ports[3]

        # Fields shared by the flow match and the test packet.
        dl_dst = '00:01:02:03:04:05'
        ip_src = 'fe80::2420:52ff:fe8f:5189'

        # Start from an empty flow table.
        rc = testutils.delete_all_flows(self.controller, logging)
        self.assertEqual(rc, 0, "Failed to delete all flows")

        # Build the flow entry: OXM match fields, appended in a fixed order.
        request = ofp.message.flow_mod()
        request.match.type = ofp.OFPMT_OXM
        match_fields = [
            ofp.match.in_port(ing_port),
            ofp.match.eth_type(IPV6_ETHERTYPE),
            ofp.match.eth_dst(ofp.parse.parse_mac(dl_dst)),
            ofp.match.ipv6_src(ipaddr.IPv6Address(ip_src)),
        ]
        for field in match_fields:
            request.match_fields.tlvs.append(field)

        # Single apply-actions instruction: output to the egress port.
        act = ofp.action.action_output()
        act.port = egr_port
        inst = ofp.instruction.instruction_apply_actions()
        inst.actions.add(act)
        request.instructions.add(inst)
        request.buffer_id = 0xffffffff
        request.priority = 1000

        logging.debug("Adding flow ")
        rv = self.controller.message_send(request)
        self.assertTrue(rv != -1, "Failed to insert test flow")

        # Send packet
        pkt_args = dict(dl_dst=dl_dst, ip_src=ip_src)
        pkt = testutils.simple_ipv6_packet(**pkt_args)
        logging.info("Sending IPv6 packet to %s", ing_port)
        logging.debug("Data: %s", str(pkt).encode('hex'))
        self.dataplane.send(ing_port, str(pkt))

        # Receive packet. The flow only outputs the packet, so the expected
        # packet is built from the same arguments as the one sent instead of
        # depending on the helper's defaults happening to be the same.
        exp_pkt = testutils.simple_ipv6_packet(**pkt_args)
        testutils.receive_pkt_verify(self, egr_port, exp_pkt)

        # Remove flows
        rc = testutils.delete_all_flows(self.controller, logging)
        self.assertEqual(rc, 0, "Failed to delete all flows")
90
91
class MatchICMPv6Simple(base_tests.SimpleDataPlane):
    """
    Match on an ICMPv6 packet.

    The flow matches on ingress port, IPv6 ethertype, IPv6 source address,
    IP protocol ICMPv6 and ICMPv6 type 128, and its only action is an output
    to the egress port, where the packet is checked with receive_pkt_verify.
    """
    def runTest(self):
        # sorted() builds a new list, so this does not rely on keys()
        # returning a list that can be sorted in place.
        of_ports = sorted(config["port_map"].keys())
        # The egress port is the fourth port; fail with a readable message
        # rather than an IndexError when fewer ports are configured.
        self.assertTrue(len(of_ports) >= 4, "Not enough ports for test")
        ing_port = of_ports[0]
        egr_port = of_ports[3]

        # Start from an empty flow table.
        rc = testutils.delete_all_flows(self.controller, logging)
        self.assertEqual(rc, 0, "Failed to delete all flows")

        # Build the flow entry: OXM match fields, appended in a fixed order.
        request = ofp.message.flow_mod()
        request.match.type = ofp.OFPMT_OXM
        match_fields = [
            ofp.match.in_port(ing_port),
            ofp.match.eth_type(IPV6_ETHERTYPE),
            ofp.match.ipv6_src(ipaddr.IPv6Address('fe80::2420:52ff:fe8f:5189')),
            ofp.match.ip_proto(ICMPV6_PROTOCOL),
            ofp.match.icmpv6_type(128),
        ]
        for field in match_fields:
            request.match_fields.tlvs.append(field)

        # Single apply-actions instruction: output to the egress port.
        act = ofp.action.action_output()
        act.port = egr_port
        inst = ofp.instruction.instruction_apply_actions()
        inst.actions.add(act)
        request.instructions.add(inst)
        request.buffer_id = 0xffffffff
        request.priority = 1000

        logging.debug("Adding flow ")
        rv = self.controller.message_send(request)
        self.assertTrue(rv != -1, "Failed to insert test flow")

        # Send packet
        # NOTE(review): presumably the defaults of simple_icmpv6_packet()
        # carry the source address and ICMPv6 type matched above -- confirm
        # against testutils.
        pkt = testutils.simple_icmpv6_packet()
        logging.info("Sending IPv6 packet to %s", ing_port)
        logging.debug("Data: %s", str(pkt).encode('hex'))
        self.dataplane.send(ing_port, str(pkt))

        # Receive packet: built the same way as the one sent, since the flow
        # only outputs it.
        exp_pkt = testutils.simple_icmpv6_packet()
        testutils.receive_pkt_verify(self, egr_port, exp_pkt)

        # Remove flows
        rc = testutils.delete_all_flows(self.controller, logging)
        self.assertEqual(rc, 0, "Failed to delete all flows")
148
149
class IPv6SetField(base_tests.SimpleDataPlane):
    """
    Rewrite the IPv6 destination address with a set-field action.

    The flow matches on ingress port, IPv6 ethertype and IPv6 source
    address; its apply-actions instruction first sets ipv6_dst and then
    outputs to the egress port. The packet seen on the egress port is
    checked against one carrying the new destination address.
    """
    def runTest(self):
        # sorted() builds a new list, so this does not rely on keys()
        # returning a list that can be sorted in place.
        of_ports = sorted(config["port_map"].keys())
        # The egress port is the fourth port; fail with a readable message
        # rather than an IndexError when fewer ports are configured.
        self.assertTrue(len(of_ports) >= 4, "Not enough ports for test")
        ing_port = of_ports[0]
        egr_port = of_ports[3]

        ip_src = 'fe80::2420:52ff:fe8f:5189'
        new_ip_dst = 'fe80::2420:52ff:fe8f:DDDD'

        # Start from an empty flow table.
        rc = testutils.delete_all_flows(self.controller, logging)
        self.assertEqual(rc, 0, "Failed to delete all flows")

        # Build the flow entry: OXM match fields, appended in a fixed order.
        request = ofp.message.flow_mod()
        request.match.type = ofp.OFPMT_OXM
        match_fields = [
            ofp.match.in_port(ing_port),
            ofp.match.eth_type(IPV6_ETHERTYPE),
            ofp.match.ipv6_src(ipaddr.IPv6Address(ip_src)),
        ]
        for field in match_fields:
            request.match_fields.tlvs.append(field)

        # Set-field action carrying the replacement destination address.
        act_setfield = ofp.action.action_set_field()
        act_setfield.field = ofp.match.ipv6_dst(ipaddr.IPv6Address(new_ip_dst))

        act_out = ofp.action.action_output()
        act_out.port = egr_port

        # Order matters: the rewrite is added before the output action.
        inst = ofp.instruction.instruction_apply_actions()
        inst.actions.add(act_setfield)
        inst.actions.add(act_out)
        request.instructions.add(inst)
        request.buffer_id = 0xffffffff
        request.priority = 1000

        logging.debug("Adding flow ")
        rv = self.controller.message_send(request)
        self.assertTrue(rv != -1, "Failed to insert test flow")

        # Send packet
        pkt = testutils.simple_ipv6_packet(ip_src=ip_src,
                                           ip_dst='fe80::2420:52ff:fe8f:5190')
        logging.info("Sending IPv6 packet to %s", ing_port)
        logging.debug("Data: %s", str(pkt).encode('hex'))
        self.dataplane.send(ing_port, str(pkt))

        # Receive packet. Only the destination address is rewritten by the
        # flow, so the expected packet keeps the source address that was
        # sent instead of depending on the helper's default.
        exp_pkt = testutils.simple_ipv6_packet(ip_src=ip_src,
                                               ip_dst=new_ip_dst)
        testutils.receive_pkt_verify(self, egr_port, exp_pkt)

        # See flow match
        response = testutils.flow_stats_get(self)
        logging.debug("Response" + response.show())

        # Remove flows
        rc = testutils.delete_all_flows(self.controller, logging)
        self.assertEqual(rc, 0, "Failed to delete all flows")
212
213
class MatchIPv6TCP(base_tests.SimpleDataPlane):
    """
    Match on a TCP segment carried over IPv6.

    The flow matches on ingress port, IPv6 ethertype, IPv6 source address,
    IP protocol TCP and TCP source port 80, and its only action is an output
    to the egress port, where the packet is checked with receive_pkt_verify.
    """
    def runTest(self):
        # Config
        # sorted() builds a new list, so this does not rely on keys()
        # returning a list that can be sorted in place.
        of_ports = sorted(config["port_map"].keys())
        # The egress port is the fourth port; fail with a readable message
        # rather than an IndexError when fewer ports are configured.
        self.assertTrue(len(of_ports) >= 4, "Not enough ports for test")
        ing_port = of_ports[0]
        egr_port = of_ports[3]

        # Remove flows
        rc = testutils.delete_all_flows(self.controller, logging)
        self.assertEqual(rc, 0, "Failed to delete all flows")

        # Build the flow entry: OXM match fields, appended in a fixed order.
        request = ofp.message.flow_mod()
        request.match.type = ofp.OFPMT_OXM
        match_fields = [
            ofp.match.in_port(ing_port),
            ofp.match.eth_type(IPV6_ETHERTYPE),
            ofp.match.ipv6_src(ipaddr.IPv6Address('fe80::2420:52ff:fe8f:5189')),
            ofp.match.ip_proto(TCP_PROTOCOL),
            ofp.match.tcp_src(80),
        ]
        for field in match_fields:
            request.match_fields.tlvs.append(field)

        # Single apply-actions instruction: output to the egress port.
        act = ofp.action.action_output()
        act.port = egr_port
        inst = ofp.instruction.instruction_apply_actions()
        inst.actions.add(act)
        request.instructions.add(inst)
        request.buffer_id = 0xffffffff
        request.priority = 1000

        logging.debug("Adding flow ")
        rv = self.controller.message_send(request)
        self.assertTrue(rv != -1, "Failed to send test flow")

        # Send packet
        # NOTE(review): presumably the default ip_src of simple_ipv6_packet()
        # is the address matched above -- confirm against testutils.
        pkt_args = dict(tcp_sport=80, tcp_dport=8080)
        pkt = testutils.simple_ipv6_packet(**pkt_args)
        logging.info("Sending IPv6 packet to %s", ing_port)
        logging.debug("Data: %s", str(pkt).encode('hex'))
        self.dataplane.send(ing_port, str(pkt))

        # Receive packet: built from the same arguments as the one sent,
        # since the flow only outputs it.
        exp_pkt = testutils.simple_ipv6_packet(**pkt_args)
        testutils.receive_pkt_verify(self, egr_port, exp_pkt)

        # Remove flows
        rc = testutils.delete_all_flows(self.controller, logging)
        self.assertEqual(rc, 0, "Failed to delete all flows")
273
if __name__ == "__main__":
    # Running this module directly only prints a hint pointing at the oft
    # script. The call form with a single string argument prints the same
    # text on Python 2 and Python 3.
    print("Please run through oft script: ./oft --test-spec=ipv6")