blob: e1f4214a8e9001c97ae28950561e79db6df970e9 [file] [log] [blame]
Rich Laneea873262013-01-11 08:15:55 -08001"""
2Test cases for testing actions taken on packets
3
4See basic.py for other info.
5
6It is recommended that these definitions be kept in their own
7namespace as different groups of tests will likely define
8similar identifiers.
9
10 The switch is actively attempting to contact the controller at the address
11indicated in oft_config
12
13"""
14
15import logging
16import ipaddr
Rich Laneea873262013-01-11 08:15:55 -080017
18from oftest import config
19import ofp
20import oftest.oft12.testutils as testutils
21import oftest.base_tests as base_tests
22
# Default VLAN id for tests (presumably; not referenced in this part of the file).
TEST_VID_DEFAULT = 2

# EtherType values used in eth_type matches.
IPV6_ETHERTYPE = 0x86DD
ETHERTYPE_VLAN = 0x8100
ETHERTYPE_MPLS = 0x8847

# IP protocol / IPv6 next-header numbers used in ip_proto matches.
TCP_PROTOCOL = 6        # 0x06
UDP_PROTOCOL = 17       # 0x11
ICMPV6_PROTOCOL = 58    # 0x3a
31
32
33# TESTS
class MatchIPv6Simple(base_tests.SimpleDataPlane):
    """
    Just send a packet IPv6 to match a simple entry on the matching table

    A flow matching on ingress port, IPv6 EtherType, destination MAC and
    IPv6 source address is installed with a single output action.  A packet
    carrying those fields is sent on the ingress port and is expected to
    arrive unmodified on the egress port.
    """
    def runTest(self):
        of_ports = sorted(config["port_map"].keys())
        # The egress port is of_ports[3]; report a test failure with a clear
        # message instead of an IndexError when fewer ports are configured.
        self.assertTrue(len(of_ports) >= 4, "Not enough ports for test")
        ing_port = of_ports[0]
        egr_port = of_ports[3]

        # Header values shared by the flow match and the test packet, so the
        # two cannot drift apart.
        dl_dst = "00:01:02:03:04:05"
        ip_src = "fe80::2420:52ff:fe8f:5189"

        # Start from an empty flow table
        rc = testutils.delete_all_flows(self.controller, logging)
        self.assertEqual(rc, 0, "Failed to delete all flows")

        # Build the flow entry: match fields
        request = ofp.message.flow_mod()
        request.match.type = ofp.OFPMT_OXM
        port = ofp.match.in_port(ing_port)
        eth_type = ofp.match.eth_type(IPV6_ETHERTYPE)
        eth_dst = ofp.match.eth_dst(ofp.parse.parse_mac(dl_dst))
        ipv6_src = ofp.match.ipv6_src(ipaddr.IPv6Address(ip_src))

        request.match_fields.tlvs.append(port)
        request.match_fields.tlvs.append(eth_type)
        request.match_fields.tlvs.append(eth_dst)
        request.match_fields.tlvs.append(ipv6_src)

        # Build the flow entry: forward matching packets to the egress port
        act = ofp.action.action_output()
        act.port = egr_port
        inst = ofp.instruction.instruction_apply_actions()
        inst.actions.add(act)
        request.instructions.add(inst)
        request.buffer_id = 0xffffffff

        request.priority = 1000
        logging.debug("Adding flow ")

        rv = self.controller.message_send(request)
        self.assertTrue(rv != -1, "Failed to insert test flow")

        # Send packet
        pkt = testutils.simple_ipv6_packet(dl_dst=dl_dst, ip_src=ip_src)
        logging.info("Sending IPv6 packet to " + str(ing_port))
        logging.debug("Data: " + str(pkt).encode('hex'))
        self.dataplane.send(ing_port, str(pkt))

        # Receive packet: the flow only outputs, so the packet expected on
        # the egress port is built from the same fields as the one sent.
        exp_pkt = testutils.simple_ipv6_packet(dl_dst=dl_dst, ip_src=ip_src)
        testutils.receive_pkt_verify(self, egr_port, exp_pkt)

        # Remove flows
        rc = testutils.delete_all_flows(self.controller, logging)
        self.assertEqual(rc, 0, "Failed to delete all flows")
88
89
class MatchICMPv6Simple(base_tests.SimpleDataPlane):
    """
    Match on an ICMPv6 packet

    A flow matching on ingress port, IPv6 EtherType, IPv6 source address,
    IP protocol ICMPv6 and ICMPv6 type 128 is installed with a single output
    action.  A default ICMPv6 test packet is sent on the ingress port and is
    expected to arrive unmodified on the egress port.
    """
    def runTest(self):
        of_ports = sorted(config["port_map"].keys())
        # The egress port is of_ports[3]; report a test failure with a clear
        # message instead of an IndexError when fewer ports are configured.
        self.assertTrue(len(of_ports) >= 4, "Not enough ports for test")
        ing_port = of_ports[0]
        egr_port = of_ports[3]

        # Start from an empty flow table
        rc = testutils.delete_all_flows(self.controller, logging)
        self.assertEqual(rc, 0, "Failed to delete all flows")

        # Build the flow entry: match fields
        request = ofp.message.flow_mod()
        request.match.type = ofp.OFPMT_OXM
        port = ofp.match.in_port(ing_port)
        eth_type = ofp.match.eth_type(IPV6_ETHERTYPE)
        ipv6_src = ofp.match.ipv6_src(ipaddr.IPv6Address('fe80::2420:52ff:fe8f:5189'))
        ip_proto = ofp.match.ip_proto(ICMPV6_PROTOCOL)
        # ICMPv6 type 128 is Echo Request (RFC 4443)
        icmpv6_type = ofp.match.icmpv6_type(128)

        request.match_fields.tlvs.append(port)
        request.match_fields.tlvs.append(eth_type)
        request.match_fields.tlvs.append(ipv6_src)
        request.match_fields.tlvs.append(ip_proto)
        request.match_fields.tlvs.append(icmpv6_type)

        # Build the flow entry: forward matching packets to the egress port
        act = ofp.action.action_output()
        act.port = egr_port
        inst = ofp.instruction.instruction_apply_actions()
        inst.actions.add(act)
        request.instructions.add(inst)
        request.buffer_id = 0xffffffff

        request.priority = 1000
        logging.debug("Adding flow ")

        rv = self.controller.message_send(request)
        self.assertTrue(rv != -1, "Failed to insert test flow")

        # Send packet
        # NOTE(review): relies on simple_icmpv6_packet() defaults producing
        # the source address and ICMPv6 type matched above -- confirm
        # against testutils.
        pkt = testutils.simple_icmpv6_packet()
        logging.info("Sending IPv6 packet to " + str(ing_port))
        logging.debug("Data: " + str(pkt).encode('hex'))
        self.dataplane.send(ing_port, str(pkt))

        # Receive packet: the flow only outputs, so an identical packet is
        # expected on the egress port.
        exp_pkt = testutils.simple_icmpv6_packet()
        testutils.receive_pkt_verify(self, egr_port, exp_pkt)

        # Remove flows
        rc = testutils.delete_all_flows(self.controller, logging)
        self.assertEqual(rc, 0, "Failed to delete all flows")
146
147
class IPv6SetField(base_tests.SimpleDataPlane):
    """
    Rewrite the IPv6 destination address with a set-field action

    A flow matching on ingress port, IPv6 EtherType and IPv6 source address
    is installed with two actions: set-field on ipv6_dst, then output.  A
    matching packet is sent on the ingress port and is expected on the
    egress port with its destination address rewritten.
    """
    def runTest(self):
        of_ports = sorted(config["port_map"].keys())
        # The egress port is of_ports[3]; report a test failure with a clear
        # message instead of an IndexError when fewer ports are configured.
        self.assertTrue(len(of_ports) >= 4, "Not enough ports for test")
        ing_port = of_ports[0]
        egr_port = of_ports[3]

        # Addresses shared by the flow entry and the test packets, so the
        # sent and expected packets differ only in the rewritten field.
        ip_src = 'fe80::2420:52ff:fe8f:5189'
        ip_dst_sent = 'fe80::2420:52ff:fe8f:5190'
        ip_dst_rewritten = 'fe80::2420:52ff:fe8f:DDDD'

        # Start from an empty flow table
        rc = testutils.delete_all_flows(self.controller, logging)
        self.assertEqual(rc, 0, "Failed to delete all flows")

        # Build the flow entry: match fields
        request = ofp.message.flow_mod()
        request.match.type = ofp.OFPMT_OXM
        port = ofp.match.in_port(ing_port)
        eth_type = ofp.match.eth_type(IPV6_ETHERTYPE)
        ipv6_src = ofp.match.ipv6_src(ipaddr.IPv6Address(ip_src))

        request.match_fields.tlvs.append(port)
        request.match_fields.tlvs.append(eth_type)
        request.match_fields.tlvs.append(ipv6_src)

        # Build the flow entry: rewrite ipv6_dst, then output
        field_2b_set = ofp.match.ipv6_dst(ipaddr.IPv6Address(ip_dst_rewritten))
        act_setfield = ofp.action.action_set_field()
        act_setfield.field = field_2b_set

        # TODO: insert action set field properly
        act_out = ofp.action.action_output()
        act_out.port = egr_port

        inst = ofp.instruction.instruction_apply_actions()
        # Order matters: the rewrite must be applied before the output.
        inst.actions.add(act_setfield)
        inst.actions.add(act_out)
        request.instructions.add(inst)
        request.buffer_id = 0xffffffff

        request.priority = 1000
        logging.debug("Adding flow ")

        rv = self.controller.message_send(request)
        self.assertTrue(rv != -1, "Failed to insert test flow")

        # Send packet
        pkt = testutils.simple_ipv6_packet(ip_src=ip_src, ip_dst=ip_dst_sent)
        logging.info("Sending IPv6 packet to " + str(ing_port))
        logging.debug("Data: " + str(pkt).encode('hex'))
        self.dataplane.send(ing_port, str(pkt))

        # Receive packet: same source as sent, destination rewritten
        exp_pkt = testutils.simple_ipv6_packet(ip_src=ip_src,
                                               ip_dst=ip_dst_rewritten)
        testutils.receive_pkt_verify(self, egr_port, exp_pkt)

        # See flow match
        response = testutils.flow_stats_get(self)
        logging.debug("Response" + response.show())

        # Remove flows
        rc = testutils.delete_all_flows(self.controller, logging)
        self.assertEqual(rc, 0, "Failed to delete all flows")
210
211
class MatchIPv6TCP(base_tests.SimpleDataPlane):
    """
    Match on the TCP source port of an IPv6 packet

    A flow matching on ingress port, IPv6 EtherType, IPv6 source address,
    IP protocol TCP and TCP source port 80 is installed with a single output
    action.  A matching packet is sent on the ingress port and is expected
    to arrive unmodified on the egress port.
    """
    def runTest(self):
        # Config
        of_ports = sorted(config["port_map"].keys())
        # The egress port is of_ports[3]; report a test failure with a clear
        # message instead of an IndexError when fewer ports are configured.
        self.assertTrue(len(of_ports) >= 4, "Not enough ports for test")
        ing_port = of_ports[0]
        egr_port = of_ports[3]

        # Header values shared by the flow match and the test packet, so the
        # packet hits the flow regardless of the packet helper's defaults.
        ip_src = 'fe80::2420:52ff:fe8f:5189'
        tcp_sport = 80
        tcp_dport = 8080

        # Remove flows
        rc = testutils.delete_all_flows(self.controller, logging)
        self.assertEqual(rc, 0, "Failed to delete all flows")

        # Build the flow entry: match fields
        request = ofp.message.flow_mod()
        request.match.type = ofp.OFPMT_OXM
        port = ofp.match.in_port(ing_port)
        eth_type = ofp.match.eth_type(IPV6_ETHERTYPE)
        ipv6_src = ofp.match.ipv6_src(ipaddr.IPv6Address(ip_src))
        ip_proto = ofp.match.ip_proto(TCP_PROTOCOL)
        tcp_port = ofp.match.tcp_src(tcp_sport)

        request.match_fields.tlvs.append(port)
        request.match_fields.tlvs.append(eth_type)
        request.match_fields.tlvs.append(ipv6_src)
        request.match_fields.tlvs.append(ip_proto)
        request.match_fields.tlvs.append(tcp_port)

        # Build the flow entry: forward matching packets to the egress port
        act = ofp.action.action_output()
        act.port = egr_port
        inst = ofp.instruction.instruction_apply_actions()
        inst.actions.add(act)
        request.instructions.add(inst)
        request.buffer_id = 0xffffffff

        request.priority = 1000
        logging.debug("Adding flow ")

        rv = self.controller.message_send(request)
        self.assertTrue(rv != -1, "Failed to send test flow")

        # Send packet
        pkt = testutils.simple_ipv6_packet(ip_src=ip_src,
                                           tcp_sport=tcp_sport,
                                           tcp_dport=tcp_dport)

        logging.info("Sending IPv6 packet to " + str(ing_port))
        logging.debug("Data: " + str(pkt).encode('hex'))

        self.dataplane.send(ing_port, str(pkt))

        # Receive packet: the flow only outputs, so the packet expected on
        # the egress port is built from the same fields as the one sent.
        exp_pkt = testutils.simple_ipv6_packet(ip_src=ip_src,
                                               tcp_sport=tcp_sport,
                                               tcp_dport=tcp_dport)

        testutils.receive_pkt_verify(self, egr_port, exp_pkt)

        # Remove flows
        rc = testutils.delete_all_flows(self.controller, logging)
        self.assertEqual(rc, 0, "Failed to delete all flows")
271
# These tests need the oft harness; when executed directly, just tell the
# user how to run them.
if "__main__" == __name__:
    print("Please run through oft script: ./oft --test-spec=ipv6")