blob: 01fabb9604d46f4eb68dcb5a2b362bc478be1105 [file] [log] [blame]
Pier23784aa2016-09-19 20:08:21 -07001"""
2Check README file
3"""
4import Queue
Pier265ad5f2017-02-28 17:46:28 +01005import time
Pier23784aa2016-09-19 20:08:21 -07006
7from oftest import config
8import inspect
9import logging
10import oftest.base_tests as base_tests
11import ofp
12from oftest.testutils import *
13from accton_util import *
14from utils import *
15
16class UntaggedPWInitiation_2_Labels( base_tests.SimpleDataPlane ):
17 """
18 This is meant to test the PW Initiation. The traffic
19 arrives untagged to the MPLS-TP CE device, it goes out
20 untagged, with a new ethernet header and 2 mpls labels.
21 """
22 def runTest( self ):
23
24 Groups = Queue.LifoQueue( )
25 Groups2 = Queue.LifoQueue( )
26 try:
27 if len( config[ "port_map" ] ) < 1:
28 logging.info( "Port count less than 1, can't run this case" )
29 assert (False)
30 return
31 ports = config[ "port_map" ].keys( )
32
33 for pair in itertools.product(ports, ports):
34 # we generate all possible products
35 in_port = pair[0]
36 out_port = pair[1]
37 if out_port == in_port:
38 continue
39 # we fill the pipeline for the initiation
40 (
41 port_to_mpls_label_2,
42 port_to_mpls_label_1,
43 port_to_mpls_label_pw,
44 port_to_in_vlan_3,
45 port_to_in_vlan_2,
46 port_to_in_vlan_1,
47 port_to_src_mac_str,
48 port_to_dst_mac_str,
49 Groups ) = fill_pw_initiation_pipeline(
50 controller=self.controller,
51 logging=logging,
52 in_port=in_port,
53 out_port=out_port,
54 ingress_tags=0,
55 egress_tag=EGRESS_UNTAGGED,
56 mpls_labels=1
57 )
58 # we fill the pipeline for the termination
59 # on the reverse path
60 (
61 port_to_mpls_label_pw_x,
62 port_to_vlan_2_x,
63 port_to_vlan_1_x,
64 port_to_switch_mac_str_x,
65 Groups2 ) = fill_pw_termination_pipeline(
66 controller=self.controller,
67 logging=logging,
68 in_port=out_port,
69 out_port=in_port,
70 egress_tags=0
71 )
72 # we send a simple tcp packet
73 parsed_pkt = simple_tcp_packet(
74 pktlen=104,
75 )
76 pkt = str( parsed_pkt )
77 self.dataplane.send( in_port, pkt )
78 # we verify the pw packet has been generated
79 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
80 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
81 cw = (0, 0, 0, 0)
82 parsed_pkt = pw_packet(
83 pktlen=130,
84 out_eth_dst=port_to_dst_mac_str[in_port],
85 out_eth_src=port_to_src_mac_str[out_port],
86 label=[label_1, label_pw],
87 cw=cw
88 )
89 pkt = str( parsed_pkt )
90 # Assertions
91 verify_packet( self, pkt, out_port )
92 verify_no_packet( self, pkt, in_port )
93 verify_no_other_packets( self )
94 # Flush all the rules for the next couple
95 delete_all_flows( self.controller )
96 delete_groups( self.controller, Groups )
97 delete_groups( self.controller, Groups2 )
98 delete_all_groups( self.controller )
99
100 finally:
101 delete_all_flows( self.controller )
102 delete_groups( self.controller, Groups )
103 delete_groups( self.controller, Groups2 )
104 delete_all_groups( self.controller )
105
106class Untagged2PWInitiation_2_Labels( base_tests.SimpleDataPlane ):
107 """
108 This is meant to test the PW Initiation. The traffic
109 arrives untagged to the MPLS-TP CE device, it goes out
110 tagged, with a new ethernet header and 2 mpls labels.
111 """
112 def runTest( self ):
113
114 Groups = Queue.LifoQueue( )
115 Groups2 = Queue.LifoQueue( )
116 try:
117 if len( config[ "port_map" ] ) < 1:
118 logging.info( "Port count less than 1, can't run this case" )
119 assert (False)
120 return
121 ports = config[ "port_map" ].keys( )
122
123 for pair in itertools.product(ports, ports):
124 # we generate all possible products
125 in_port = pair[0]
126 out_port = pair[1]
127 if out_port == in_port:
128 continue
129 # we fill the pipeline for the initiation
130 (
131 port_to_mpls_label_2,
132 port_to_mpls_label_1,
133 port_to_mpls_label_pw,
134 port_to_in_vlan_3,
135 port_to_in_vlan_2,
136 port_to_in_vlan_1,
137 port_to_src_mac_str,
138 port_to_dst_mac_str,
139 Groups ) = fill_pw_initiation_pipeline(
140 controller=self.controller,
141 logging=logging,
142 in_port=in_port,
143 out_port=out_port,
144 ingress_tags=0,
145 egress_tag=EGRESS_TAGGED,
146 mpls_labels=1
147 )
148 # we fill the pipeline for the termination
149 # on the reverse path
150 (
151 port_to_mpls_label_pw_x,
152 port_to_vlan_2_x,
153 port_to_vlan_1_x,
154 port_to_switch_mac_str_x,
155 Groups2 ) = fill_pw_termination_pipeline(
156 controller=self.controller,
157 logging=logging,
158 in_port=out_port,
159 out_port=in_port,
160 egress_tags=0
161 )
162 # we send a simple tcp packet
163 parsed_pkt = simple_tcp_packet(
164 pktlen=104,
165 )
166 pkt = str( parsed_pkt )
167 self.dataplane.send( in_port, pkt )
168 # we verify the pw packet has been generated
169 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
170 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
171 cw = (0, 0, 0, 0)
172 parsed_pkt = pw_packet(
173 pktlen=134,
174 out_eth_dst=port_to_dst_mac_str[in_port],
175 out_eth_src=port_to_src_mac_str[out_port],
176 label=[label_1, label_pw],
177 cw=cw,
178 out_dl_vlan_enable=True,
179 out_vlan_vid=port_to_in_vlan_1[in_port],
180 )
181 pkt = str( parsed_pkt )
182 # Assertions
183 verify_packet( self, pkt, out_port )
184 verify_no_packet( self, pkt, in_port )
185 verify_no_other_packets( self )
186 # Flush all the rules for the next couple
187 delete_all_flows( self.controller )
188 delete_groups( self.controller, Groups )
189 delete_groups( self.controller, Groups2 )
190 delete_all_groups( self.controller )
191
192 finally:
193 delete_all_flows( self.controller )
194 delete_groups( self.controller, Groups )
195 delete_groups( self.controller, Groups2 )
196 delete_all_groups( self.controller )
197
198class UntaggedPWInitiation_3_Labels( base_tests.SimpleDataPlane ):
199 """
200 This is meant to test the PW Initiation. The traffic
201 arrives untagged to the MPLS-TP CE device, it goes out
202 untagged, with a new ethernet header and 3 mpls labels.
203 """
204 def runTest( self ):
205
206 Groups = Queue.LifoQueue( )
207 Groups2 = Queue.LifoQueue( )
208 try:
209 if len( config[ "port_map" ] ) < 1:
210 logging.info( "Port count less than 1, can't run this case" )
211 assert (False)
212 return
213 ports = config[ "port_map" ].keys( )
214
215 for pair in itertools.product(ports, ports):
216 # we generate all possible products
217 in_port = pair[0]
218 out_port = pair[1]
219 if out_port == in_port:
220 continue
221 # we fill the pipeline for the pw initiation
222 (
223 port_to_mpls_label_2,
224 port_to_mpls_label_1,
225 port_to_mpls_label_pw,
226 port_to_in_vlan_3,
227 port_to_in_vlan_2,
228 port_to_in_vlan_1,
229 port_to_src_mac_str,
230 port_to_dst_mac_str,
231 Groups ) = fill_pw_initiation_pipeline(
232 controller=self.controller,
233 logging=logging,
234 in_port=in_port,
235 out_port=out_port,
236 ingress_tags=0,
237 egress_tag=EGRESS_UNTAGGED,
238 mpls_labels=2
239 )
240 # we fill the pipeline for the pw termination
241 # on the reverse path
242 (
243 port_to_mpls_label_pw_x,
244 port_to_vlan_2_x,
245 port_to_vlan_1_x,
246 port_to_switch_mac_str_x,
247 Groups2 ) = fill_pw_termination_pipeline(
248 controller=self.controller,
249 logging=logging,
250 in_port=out_port,
251 out_port=in_port,
252 egress_tags=0
253 )
254 # we generate a simple tcp packet
255 parsed_pkt = simple_tcp_packet(
256 pktlen=104,
257 )
258 pkt = str( parsed_pkt )
259 self.dataplane.send( in_port, pkt )
260 # we generate the pw packet we expect on the out port
261 label_2 = (port_to_mpls_label_2[in_port], 0, 0, 63)
262 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
263 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
264 cw = (0, 0, 0, 0)
265 parsed_pkt = pw_packet(
266 pktlen=134,
267 out_eth_dst=port_to_dst_mac_str[in_port],
268 out_eth_src=port_to_src_mac_str[out_port],
269 label=[label_2, label_1, label_pw],
270 cw=cw
271 )
272 pkt = str( parsed_pkt )
273 # Assertions
274 verify_packet( self, pkt, out_port )
275 verify_no_packet( self, pkt, in_port )
276 verify_no_other_packets( self )
277 delete_all_flows( self.controller )
278 delete_groups( self.controller, Groups )
279 delete_groups( self.controller, Groups2 )
280 delete_all_groups( self.controller )
281
282 finally:
283 delete_all_flows( self.controller )
284 delete_groups( self.controller, Groups )
285 delete_groups( self.controller, Groups2 )
286 delete_all_groups( self.controller )
287
288class Untagged2PWInitiation_3_Labels( base_tests.SimpleDataPlane ):
289 """
290 This is meant to test the PW Initiation. The traffic
291 arrives untagged to the MPLS-TP CE device, it goes out
292 tagged with a new ethernet header and 3 mpls labels.
293 """
294 def runTest( self ):
295
296 Groups = Queue.LifoQueue( )
297 Groups2 = Queue.LifoQueue( )
298 try:
299 if len( config[ "port_map" ] ) < 1:
300 logging.info( "Port count less than 1, can't run this case" )
301 assert (False)
302 return
303 ports = config[ "port_map" ].keys( )
304
305 for pair in itertools.product(ports, ports):
306 # we generate all possible products
307 in_port = pair[0]
308 out_port = pair[1]
309 if out_port == in_port:
310 continue
311 # we fill the pipeline for the pw initiation
312 (
313 port_to_mpls_label_2,
314 port_to_mpls_label_1,
315 port_to_mpls_label_pw,
316 port_to_in_vlan_3,
317 port_to_in_vlan_2,
318 port_to_in_vlan_1,
319 port_to_src_mac_str,
320 port_to_dst_mac_str,
321 Groups ) = fill_pw_initiation_pipeline(
322 controller=self.controller,
323 logging=logging,
324 in_port=in_port,
325 out_port=out_port,
326 ingress_tags=0,
327 egress_tag=EGRESS_TAGGED,
328 mpls_labels=2
329 )
330 # we fill the pipeline for the pw termination
331 # on the reverse path
332 (
333 port_to_mpls_label_pw_x,
334 port_to_vlan_2_x,
335 port_to_vlan_1_x,
336 port_to_switch_mac_str_x,
337 Groups2 ) = fill_pw_termination_pipeline(
338 controller=self.controller,
339 logging=logging,
340 in_port=out_port,
341 out_port=in_port,
342 egress_tags=0
343 )
344 # we generate a simple tcp packet
345 parsed_pkt = simple_tcp_packet(
346 pktlen=104,
347 )
348 pkt = str( parsed_pkt )
349 self.dataplane.send( in_port, pkt )
350 # we generate the pw packet we expect on the out port
351 label_2 = (port_to_mpls_label_2[in_port], 0, 0, 63)
352 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
353 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
354 cw = (0, 0, 0, 0)
355 parsed_pkt = pw_packet(
356 pktlen=138,
357 out_eth_dst=port_to_dst_mac_str[in_port],
358 out_eth_src=port_to_src_mac_str[out_port],
359 label=[label_2, label_1, label_pw],
360 cw=cw,
361 out_dl_vlan_enable=True,
362 out_vlan_vid=port_to_in_vlan_1[in_port],
363 )
364 pkt = str( parsed_pkt )
365 # Asserions
366 verify_packet( self, pkt, out_port )
367 verify_no_packet( self, pkt, in_port )
368 verify_no_other_packets( self )
369 delete_all_flows( self.controller )
370 delete_groups( self.controller, Groups )
371 delete_groups( self.controller, Groups2 )
372 delete_all_groups( self.controller )
373
374 finally:
375 delete_all_flows( self.controller )
376 delete_groups( self.controller, Groups )
377 delete_groups( self.controller, Groups2 )
378 delete_all_groups( self.controller )
379
380class TaggedPWInitiation_2_Labels( base_tests.SimpleDataPlane ):
381 """
382 This is meant to test the PW Initiation. The traffic
383 arrives tagged to the MPLS-TP CE device, it goes out
384 with the same tag, a new ethernet header and 2 mpls labels.
385 """
386 def runTest( self ):
387
388 Groups = Queue.LifoQueue( )
389 Groups2 = Queue.LifoQueue( )
390 try:
391 if len( config[ "port_map" ] ) < 1:
392 logging.info( "Port count less than 1, can't run this case" )
393 assert (False)
394 return
395 ports = config[ "port_map" ].keys( )
396 for pair in itertools.product(ports, ports):
397 # we generate all possible products
398 in_port = pair[0]
399 out_port = pair[1]
400 if out_port == in_port:
401 continue
402 # we fill the pipeline for the pw initiation
403 (
404 port_to_mpls_label_2,
405 port_to_mpls_label_1,
406 port_to_mpls_label_pw,
407 port_to_in_vlan_3,
408 port_to_in_vlan_2,
409 port_to_in_vlan_1,
410 port_to_src_mac_str,
411 port_to_dst_mac_str,
412 Groups ) = fill_pw_initiation_pipeline(
413 controller=self.controller,
414 logging=logging,
415 in_port=in_port,
416 out_port=out_port,
417 ingress_tags=1,
418 egress_tag=EGRESS_TAGGED,
419 mpls_labels=1
420 )
421 # we fill the pipeline for the pw termination
422 # on the reverse path
423 (
424 port_to_mpls_label_pw_x,
425 port_to_vlan_2_x,
426 port_to_vlan_1_x,
427 port_to_switch_mac_str_x,
428 Groups2 ) = fill_pw_termination_pipeline(
429 controller=self.controller,
430 logging=logging,
431 in_port=out_port,
432 out_port=in_port,
433 egress_tags=1
434 )
435 # we generate a simple tcp packet tagged
436 parsed_pkt = simple_tcp_packet(
437 pktlen=104,
438 dl_vlan_enable=True,
439 vlan_vid=port_to_in_vlan_1[in_port]
440 )
441 pkt = str( parsed_pkt )
442 self.dataplane.send( in_port, pkt )
443 # we generate the expected pw packet
444 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
445 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
446 cw = (0, 0, 0, 0)
447 parsed_pkt = pw_packet(
448 pktlen=130,
449 out_eth_dst=port_to_dst_mac_str[in_port],
450 out_eth_src=port_to_src_mac_str[out_port],
451 label=[label_1, label_pw],
452 cw=cw,
453 out_dl_vlan_enable=True,
454 out_vlan_vid=port_to_in_vlan_1[in_port],
455 )
456 pkt = str( parsed_pkt )
457 # Assertions
458 verify_packet( self, pkt, out_port )
459 verify_no_packet( self, pkt, in_port )
460 verify_no_other_packets( self )
461 delete_all_flows( self.controller )
462 delete_groups( self.controller, Groups )
463 delete_groups( self.controller, Groups2 )
464 delete_all_groups( self.controller )
465
466 finally:
467 delete_all_flows( self.controller )
468 delete_groups( self.controller, Groups )
469 delete_groups( self.controller, Groups2 )
470 delete_all_groups( self.controller )
471
472class Tagged2PWInitiation_2_Labels( base_tests.SimpleDataPlane ):
473 """
474 This is meant to test the PW Initiation. The traffic
475 arrives tagged to the MPLS-TP CE device, it goes out
476 with a different vlan, with a new ethernet header and 2 mpls labels.
477 """
478 def runTest( self ):
479
480 Groups = Queue.LifoQueue( )
481 Groups2 = Queue.LifoQueue( )
482 try:
483 if len( config[ "port_map" ] ) < 1:
484 logging.info( "Port count less than 1, can't run this case" )
485 assert (False)
486 return
487 ports = config[ "port_map" ].keys( )
488
489 for pair in itertools.product(ports, ports):
490 # we generate all possible products
491 in_port = pair[0]
492 out_port = pair[1]
493 if out_port == in_port:
494 continue
495 # we fill the pipeline for the pw initiation
496 (
497 port_to_mpls_label_2,
498 port_to_mpls_label_1,
499 port_to_mpls_label_pw,
500 port_to_in_vlan_3,
501 port_to_in_vlan_2,
502 port_to_in_vlan_1,
503 port_to_src_mac_str,
504 port_to_dst_mac_str,
505 Groups ) = fill_pw_initiation_pipeline(
506 controller=self.controller,
507 logging=logging,
508 in_port=in_port,
509 out_port=out_port,
510 ingress_tags=1,
511 egress_tag=EGRESS_TAGGED_TRANS,
512 mpls_labels=1
513 )
514 # we fill the pipeline for the pw termination
515 # on the reverse path
516 (
517 port_to_mpls_label_pw_x,
518 port_to_vlan_2_x,
519 port_to_vlan_1_x,
520 port_to_switch_mac_str_x,
521 Groups2 ) = fill_pw_termination_pipeline(
522 controller=self.controller,
523 logging=logging,
524 in_port=out_port,
525 out_port=in_port,
526 egress_tags=1
527 )
528 # we generate a simple tcp packet tagged
529 parsed_pkt = simple_tcp_packet(
530 pktlen=104,
531 dl_vlan_enable=True,
532 vlan_vid=port_to_in_vlan_1[in_port]
533 )
534 pkt = str( parsed_pkt )
535 self.dataplane.send( in_port, pkt )
536 # we generate the expected pw packet
537 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
538 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
539 cw = (0, 0, 0, 0)
540 parsed_pkt = pw_packet(
541 pktlen=130,
542 out_eth_dst=port_to_dst_mac_str[in_port],
543 out_eth_src=port_to_src_mac_str[out_port],
544 label=[label_1, label_pw],
545 cw=cw,
546 out_dl_vlan_enable=True,
547 out_vlan_vid=port_to_in_vlan_3[in_port],
548 )
549 pkt = str( parsed_pkt )
550 # Assertions
551 verify_packet( self, pkt, out_port )
552 verify_no_packet( self, pkt, in_port )
553 verify_no_other_packets( self )
554 delete_all_flows( self.controller )
555 delete_groups( self.controller, Groups )
556 delete_groups( self.controller, Groups2 )
557 delete_all_groups( self.controller )
558
559 finally:
560 delete_all_flows( self.controller )
561 delete_groups( self.controller, Groups )
562 delete_groups( self.controller, Groups2 )
563 delete_all_groups( self.controller )
564
565class TaggedPWInitiation_3_Labels( base_tests.SimpleDataPlane ):
566 """
567 This is meant to test the PW Initiation. The traffic
568 arrives tagged to the MPLS-TP CE device, it goes out
569 with the same vlan, with a new ethernet header and 3 mpls labels.
570 """
571 def runTest( self ):
572
573 Groups = Queue.LifoQueue( )
574 Groups2 = Queue.LifoQueue( )
575 try:
576 if len( config[ "port_map" ] ) < 1:
577 logging.info( "Port count less than 1, can't run this case" )
578 assert (False)
579 return
580 ports = config[ "port_map" ].keys( )
581
582 for pair in itertools.product(ports, ports):
583 # we generate all possible products
584 in_port = pair[0]
585 out_port = pair[1]
586 if out_port == in_port:
587 continue
588 # we fill the pipeline for the pw initiation
589 (
590 port_to_mpls_label_2,
591 port_to_mpls_label_1,
592 port_to_mpls_label_pw,
593 port_to_in_vlan_3,
594 port_to_in_vlan_2,
595 port_to_in_vlan_1,
596 port_to_src_mac_str,
597 port_to_dst_mac_str,
598 Groups ) = fill_pw_initiation_pipeline(
599 controller=self.controller,
600 logging=logging,
601 in_port=in_port,
602 out_port=out_port,
603 ingress_tags=1,
604 egress_tag=EGRESS_TAGGED,
605 mpls_labels=2
606 )
607 # we fill the pipeline for the pw termination
608 # on the reverse path
609 (
610 port_to_mpls_label_pw_x,
611 port_to_vlan_2_x,
612 port_to_vlan_1_x,
613 port_to_switch_mac_str_x,
614 Groups2 ) = fill_pw_termination_pipeline(
615 controller=self.controller,
616 logging=logging,
617 in_port=out_port,
618 out_port=in_port,
619 egress_tags=1
620 )
621 # we generate a simple tcp packet tagged
622 parsed_pkt = simple_tcp_packet(
623 pktlen=104,
624 dl_vlan_enable=True,
625 vlan_vid=port_to_in_vlan_1[in_port]
626 )
627 pkt = str( parsed_pkt )
628 self.dataplane.send( in_port, pkt )
629 # we generate the expect pw packet
630 label_2 = (port_to_mpls_label_2[in_port], 0, 0, 63)
631 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
632 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
633 cw = (0, 0, 0, 0)
634 parsed_pkt = pw_packet(
635 pktlen=134,
636 out_eth_dst=port_to_dst_mac_str[in_port],
637 out_eth_src=port_to_src_mac_str[out_port],
638 label=[label_2, label_1, label_pw],
639 cw=cw,
640 out_dl_vlan_enable=True,
641 out_vlan_vid=port_to_in_vlan_1[in_port],
642 )
643 pkt = str( parsed_pkt )
644 # Assertions
645 verify_packet( self, pkt, out_port )
646 verify_no_packet( self, pkt, in_port )
647 verify_no_other_packets( self )
648 delete_all_flows( self.controller )
649 delete_groups( self.controller, Groups )
650 delete_groups( self.controller, Groups2)
651 delete_all_groups( self.controller )
652
653 finally:
654 delete_all_flows( self.controller )
655 delete_groups( self.controller, Groups )
656 delete_groups( self.controller, Groups2)
657 delete_all_groups( self.controller )
658
659class Tagged2PWInitiation_3_Labels( base_tests.SimpleDataPlane ):
660 """
661 This is meant to test the PW Initiation. The traffic
662 arrives tagged to the MPLS-TP CE device, it goes out
663 with a different vlam, with a new ethernet header and 3 mpls labels.
664 """
665 def runTest( self ):
666
667 Groups = Queue.LifoQueue( )
668 Groups2 = Queue.LifoQueue( )
669 try:
670 if len( config[ "port_map" ] ) < 1:
671 logging.info( "Port count less than 1, can't run this case" )
672 assert (False)
673 return
674 ports = config[ "port_map" ].keys( )
675
676 for pair in itertools.product(ports, ports):
677 # we generate all possible products
678 in_port = pair[0]
679 out_port = pair[1]
680 if out_port == in_port:
681 continue
682 # we fill the pipeline for the pw initiation
683 (
684 port_to_mpls_label_2,
685 port_to_mpls_label_1,
686 port_to_mpls_label_pw,
687 port_to_in_vlan_3,
688 port_to_in_vlan_2,
689 port_to_in_vlan_1,
690 port_to_src_mac_str,
691 port_to_dst_mac_str,
692 Groups ) = fill_pw_initiation_pipeline(
693 controller=self.controller,
694 logging=logging,
695 in_port=in_port,
696 out_port=out_port,
697 ingress_tags=1,
698 egress_tag=EGRESS_TAGGED_TRANS,
699 mpls_labels=2
700 )
701 # we fill the pipeline for the pw termination
702 # on the reverse path
703 (
704 port_to_mpls_label_pw_x,
705 port_to_vlan_2_x,
706 port_to_vlan_1_x,
707 port_to_switch_mac_str_x,
708 Groups2 ) = fill_pw_termination_pipeline(
709 controller=self.controller,
710 logging=logging,
711 in_port=out_port,
712 out_port=in_port,
713 egress_tags=1
714 )
715 # we generate a simple tcp packet tagged
716 parsed_pkt = simple_tcp_packet(
717 pktlen=104,
718 dl_vlan_enable=True,
719 vlan_vid=port_to_in_vlan_1[in_port]
720 )
721 pkt = str( parsed_pkt )
722 self.dataplane.send( in_port, pkt )
723 # we generate the expect pw packet
724 label_2 = (port_to_mpls_label_2[in_port], 0, 0, 63)
725 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
726 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
727 cw = (0, 0, 0, 0)
728 parsed_pkt = pw_packet(
729 pktlen=134,
730 out_eth_dst=port_to_dst_mac_str[in_port],
731 out_eth_src=port_to_src_mac_str[out_port],
732 label=[label_2, label_1, label_pw],
733 cw=cw,
734 out_dl_vlan_enable=True,
735 out_vlan_vid=port_to_in_vlan_3[in_port],
736 )
737 pkt = str( parsed_pkt )
738 # Assertions
739 verify_packet( self, pkt, out_port )
740 verify_no_packet( self, pkt, in_port )
741 verify_no_other_packets( self )
742 delete_all_flows( self.controller )
743 delete_groups( self.controller, Groups )
744 delete_groups( self.controller, Groups2 )
745 delete_all_groups( self.controller )
746
747 finally:
748 delete_all_flows( self.controller )
749 delete_groups( self.controller, Groups )
750 delete_groups( self.controller, Groups2 )
751 delete_all_groups( self.controller )
752
753class DoubleTaggedPWInitiation_2_Labels( base_tests.SimpleDataPlane ):
754 """
755 This is meant to test the PW Initiation. The traffic
756 arrives double tagged to the MPLS-TP CE device, it goes out
757 with the same outer vlan, with a new ethernet header and 2 mpls labels.
758 """
759 def runTest( self ):
760
761 Groups = Queue.LifoQueue( )
762 Groups2 = Queue.LifoQueue( )
763 try:
764 if len( config[ "port_map" ] ) < 1:
765 logging.info( "Port count less than 1, can't run this case" )
766 assert (False)
767 return
768 ports = config[ "port_map" ].keys( )
769
770 for pair in itertools.product(ports, ports):
771 # we generate all possible products
772 in_port = pair[0]
773 out_port = pair[1]
774 if out_port == in_port:
775 continue
776 # we fill the pipeline for the pw initiation
777 (
778 port_to_mpls_label_2,
779 port_to_mpls_label_1,
780 port_to_mpls_label_pw,
781 port_to_in_vlan_3,
782 port_to_in_vlan_2,
783 port_to_in_vlan_1,
784 port_to_src_mac_str,
785 port_to_dst_mac_str,
786 Groups ) = fill_pw_initiation_pipeline(
787 controller=self.controller,
788 logging=logging,
789 in_port=in_port,
790 out_port=out_port,
791 ingress_tags=2,
792 egress_tag=EGRESS_TAGGED,
793 mpls_labels=1
794 )
795 # we fill the pipeline for the pw termination
796 # on the reverse path
797 (
798 port_to_mpls_label_pw_x,
799 port_to_vlan_2_x,
800 port_to_vlan_1_x,
801 port_to_switch_mac_str_x,
802 Groups2 ) = fill_pw_termination_pipeline(
803 controller=self.controller,
804 logging=logging,
805 in_port=out_port,
806 out_port=in_port,
807 egress_tags=2
808 )
809 # we generate a simple tcp packet with two vlans
810 parsed_pkt = simple_tcp_packet_two_vlan(
811 pktlen=108,
812 out_dl_vlan_enable=True,
813 out_vlan_vid=port_to_in_vlan_2[in_port],
814 in_dl_vlan_enable=True,
815 in_vlan_vid=port_to_in_vlan_1[in_port],
816 )
817 pkt = str( parsed_pkt )
818 self.dataplane.send( in_port, pkt )
819 # we generate the expected pw packet
820 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
821 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
822 cw = (0, 0, 0, 0)
823 parsed_pkt = pw_packet(
824 pktlen=134,
825 out_eth_dst=port_to_dst_mac_str[in_port],
826 out_eth_src=port_to_src_mac_str[out_port],
827 label=[label_1, label_pw],
828 cw=cw,
829 out_dl_vlan_enable=True,
830 out_vlan_vid=port_to_in_vlan_2[in_port],
831 in_dl_vlan_enable=True,
832 in_vlan_vid=port_to_in_vlan_1[in_port],
833 )
834 pkt = str( parsed_pkt )
835 # Assertions
836 verify_packet( self, pkt, out_port )
837 verify_no_packet( self, pkt, in_port )
838 verify_no_other_packets( self )
839 delete_all_flows( self.controller )
840 delete_groups( self.controller, Groups )
841 delete_groups( self.controller, Groups2 )
842 delete_all_groups( self.controller )
843
844 finally:
845 delete_all_flows( self.controller )
846 delete_groups( self.controller, Groups )
847 delete_groups( self.controller, Groups2 )
848 delete_all_groups( self.controller )
849
850class DoubleTagged2PWInitiation_2_Labels( base_tests.SimpleDataPlane ):
851 """
852 This is meant to test the PW Initiation. The traffic
853 arrives double tagged to the MPLS-TP CE device and goes out
854 with a new ethernet header and 2 mpls labels.
855 """
856 def runTest( self ):
857
858 Groups = Queue.LifoQueue( )
859 Groups2 = Queue.LifoQueue( )
860 try:
861 if len( config[ "port_map" ] ) < 1:
862 logging.info( "Port count less than 1, can't run this case" )
863 assert (False)
864 return
865 ports = config[ "port_map" ].keys( )
866
867 for pair in itertools.product(ports, ports):
868 # we generate all possible products
869 in_port = pair[0]
870 out_port = pair[1]
871 if out_port == in_port:
872 continue
873 # we fill the pipeline for the pw initiation
874 (
875 port_to_mpls_label_2,
876 port_to_mpls_label_1,
877 port_to_mpls_label_pw,
878 port_to_in_vlan_3,
879 port_to_in_vlan_2,
880 port_to_in_vlan_1,
881 port_to_src_mac_str,
882 port_to_dst_mac_str,
883 Groups ) = fill_pw_initiation_pipeline(
884 controller=self.controller,
885 logging=logging,
886 in_port=in_port,
887 out_port=out_port,
888 ingress_tags=2,
889 egress_tag=EGRESS_TAGGED_TRANS,
890 mpls_labels=1
891 )
892 # we fill the pipeline for the pw termination
893 # on the reverse path
894 (
895 port_to_mpls_label_pw_x,
896 port_to_vlan_2_x,
897 port_to_vlan_1_x,
898 port_to_switch_mac_str_x,
899 Groups2 ) = fill_pw_termination_pipeline(
900 controller=self.controller,
901 logging=logging,
902 in_port=out_port,
903 out_port=in_port,
904 egress_tags=2
905 )
906 # we generate a simple tcp packet with two vlans
907 parsed_pkt = simple_tcp_packet_two_vlan(
908 pktlen=108,
909 out_dl_vlan_enable=True,
910 out_vlan_vid=port_to_in_vlan_2[in_port],
911 in_dl_vlan_enable=True,
912 in_vlan_vid=port_to_in_vlan_1[in_port],
913 )
914 pkt = str( parsed_pkt )
915 self.dataplane.send( in_port, pkt )
916 # we generate the expected pw packet
917 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
918 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
919 cw = (0, 0, 0, 0)
920 parsed_pkt = pw_packet(
921 pktlen=134,
922 out_eth_dst=port_to_dst_mac_str[in_port],
923 out_eth_src=port_to_src_mac_str[out_port],
924 label=[label_1, label_pw],
925 cw=cw,
926 out_dl_vlan_enable=True,
927 out_vlan_vid=port_to_in_vlan_3[in_port],
928 in_dl_vlan_enable=True,
929 in_vlan_vid=port_to_in_vlan_1[in_port],
930 )
931 pkt = str( parsed_pkt )
932 # Assertions
933 verify_packet( self, pkt, out_port )
934 verify_no_packet( self, pkt, in_port )
935 verify_no_other_packets( self )
936 delete_all_flows( self.controller )
937 delete_groups( self.controller, Groups )
938 delete_groups( self.controller, Groups2 )
939 delete_all_groups( self.controller )
940
941 finally:
942 delete_all_flows( self.controller )
943 delete_groups( self.controller, Groups )
944 delete_groups( self.controller, Groups2 )
945 delete_all_groups( self.controller )
946
947class DoubleTaggedPWInitiation_3_Labels( base_tests.SimpleDataPlane ):
948 """
949 This is meant to test the PW Initiation. The traffic
950 arrives double tagged to the MPLS-TP CE device and goes out
951 with a new ethernet header and 3 mpls labels.
952 """
953 def runTest( self ):
954
955 Groups = Queue.LifoQueue( )
956 Groups2 = Queue.LifoQueue( )
957 try:
958 if len( config[ "port_map" ] ) < 1:
959 logging.info( "Port count less than 1, can't run this case" )
960 assert (False)
961 return
962 ports = config[ "port_map" ].keys( )
963
964 for pair in itertools.product(ports, ports):
965 # we generate all possible products
966 in_port = pair[0]
967 out_port = pair[1]
968 if out_port == in_port:
969 continue
970 # we fill the pipeline for the pw initiation
971 (
972 port_to_mpls_label_2,
973 port_to_mpls_label_1,
974 port_to_mpls_label_pw,
975 port_to_in_vlan_3,
976 port_to_in_vlan_2,
977 port_to_in_vlan_1,
978 port_to_src_mac_str,
979 port_to_dst_mac_str,
980 Groups ) = fill_pw_initiation_pipeline(
981 controller=self.controller,
982 logging=logging,
983 in_port=in_port,
984 out_port=out_port,
985 ingress_tags=2,
986 egress_tag=EGRESS_TAGGED,
987 mpls_labels=2
988 )
989 # we fill the pipeline for the pw termination
990 # on the reverse path
991 (
992 port_to_mpls_label_pw_x,
993 port_to_vlan_2_x,
994 port_to_vlan_1_x,
995 port_to_switch_mac_str_x,
996 Groups2 ) = fill_pw_termination_pipeline(
997 controller=self.controller,
998 logging=logging,
999 in_port=out_port,
1000 out_port=in_port,
1001 egress_tags=2
1002 )
1003 # we generate a simple tcp packet with two wlan
1004 parsed_pkt = simple_tcp_packet_two_vlan(
1005 pktlen=108,
1006 out_dl_vlan_enable=True,
1007 out_vlan_vid=port_to_in_vlan_2[in_port],
1008 in_dl_vlan_enable=True,
1009 in_vlan_vid=port_to_in_vlan_1[in_port],
1010 )
1011 pkt = str( parsed_pkt )
1012 self.dataplane.send( in_port, pkt )
1013 # we generate the expected pw packet
1014 label_2 = (port_to_mpls_label_2[in_port], 0, 0, 63)
1015 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
1016 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
1017 cw = (0, 0, 0, 0)
1018 parsed_pkt = pw_packet(
1019 pktlen=138,
1020 out_eth_dst=port_to_dst_mac_str[in_port],
1021 out_eth_src=port_to_src_mac_str[out_port],
1022 label=[label_2, label_1, label_pw],
1023 cw=cw,
1024 out_dl_vlan_enable=True,
1025 out_vlan_vid=port_to_in_vlan_2[in_port],
1026 in_dl_vlan_enable=True,
1027 in_vlan_vid=port_to_in_vlan_1[in_port],
1028 )
1029 pkt = str( parsed_pkt )
1030 # Assertions
1031 verify_packet( self, pkt, out_port )
1032 verify_no_packet( self, pkt, in_port )
1033 verify_no_other_packets( self )
1034 delete_all_flows( self.controller )
1035 delete_groups( self.controller, Groups )
1036 delete_groups( self.controller, Groups2 )
1037 delete_all_groups( self.controller )
1038
1039 finally:
1040 delete_all_flows( self.controller )
1041 delete_groups( self.controller, Groups )
1042 delete_groups( self.controller, Groups2 )
1043 delete_all_groups( self.controller )
1044
1045class DoubleTagged2PWInitiation_3_Labels( base_tests.SimpleDataPlane ):
1046 """
1047 This is meant to test the PW Initiation. The traffic
1048 arrives double tagged to the MPLS-TP CE device and goes out
1049 with a new ethernet header and 3 mpls labels.
1050 """
1051 def runTest( self ):
1052
1053 Groups = Queue.LifoQueue( )
1054 Groups2 = Queue.LifoQueue( )
1055 try:
1056 if len( config[ "port_map" ] ) < 1:
1057 logging.info( "Port count less than 1, can't run this case" )
1058 assert (False)
1059 return
1060 ports = config[ "port_map" ].keys( )
1061
1062 for pair in itertools.product(ports, ports):
1063 # we generate all possible products
1064 in_port = pair[0]
1065 out_port = pair[1]
1066 if out_port == in_port:
1067 continue
1068 # we fill the pipeline for the pw initiation
1069 (
1070 port_to_mpls_label_2,
1071 port_to_mpls_label_1,
1072 port_to_mpls_label_pw,
1073 port_to_in_vlan_3,
1074 port_to_in_vlan_2,
1075 port_to_in_vlan_1,
1076 port_to_src_mac_str,
1077 port_to_dst_mac_str,
1078 Groups ) = fill_pw_initiation_pipeline(
1079 controller=self.controller,
1080 logging=logging,
1081 in_port=in_port,
1082 out_port=out_port,
1083 ingress_tags=2,
1084 egress_tag=EGRESS_TAGGED_TRANS,
1085 mpls_labels=2
1086 )
1087 # we fill the pipeline for the pw termination
1088 # on the reverse path
1089 (
1090 port_to_mpls_label_pw_x,
1091 port_to_vlan_2_x,
1092 port_to_vlan_1_x,
1093 port_to_switch_mac_str_x,
1094 Groups2 ) = fill_pw_termination_pipeline(
1095 controller=self.controller,
1096 logging=logging,
1097 in_port=out_port,
1098 out_port=in_port,
1099 egress_tags=2
1100 )
1101 # we generate a simple tcp packet with two wlan
1102 parsed_pkt = simple_tcp_packet_two_vlan(
1103 pktlen=108,
1104 out_dl_vlan_enable=True,
1105 out_vlan_vid=port_to_in_vlan_2[in_port],
1106 in_dl_vlan_enable=True,
1107 in_vlan_vid=port_to_in_vlan_1[in_port],
1108 )
1109 pkt = str( parsed_pkt )
1110 self.dataplane.send( in_port, pkt )
1111 # we generate the expected pw packet
1112 label_2 = (port_to_mpls_label_2[in_port], 0, 0, 63)
1113 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
1114 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
1115 cw = (0, 0, 0, 0)
1116 parsed_pkt = pw_packet(
1117 pktlen=138,
1118 out_eth_dst=port_to_dst_mac_str[in_port],
1119 out_eth_src=port_to_src_mac_str[out_port],
1120 label=[label_2, label_1, label_pw],
1121 cw=cw,
1122 out_dl_vlan_enable=True,
1123 out_vlan_vid=port_to_in_vlan_3[in_port],
1124 in_dl_vlan_enable=True,
1125 in_vlan_vid=port_to_in_vlan_1[in_port],
1126 )
1127 pkt = str( parsed_pkt )
1128 # Assertions
1129 verify_packet( self, pkt, out_port )
1130 verify_no_packet( self, pkt, in_port )
1131 verify_no_other_packets( self )
1132 delete_all_flows( self.controller )
1133 delete_groups( self.controller, Groups )
1134 delete_groups( self.controller, Groups2 )
1135 delete_all_groups( self.controller )
1136
1137 finally:
1138 delete_all_flows( self.controller )
1139 delete_groups( self.controller, Groups )
1140 delete_groups( self.controller, Groups2 )
1141 delete_all_groups( self.controller )
1142
1143class IntraCO_2_Labels( base_tests.SimpleDataPlane ):
1144 """
1145 This is meant to test the PW intermediate transport.
1146 Incoming packet has 2 labels (SR/PW) (intermediate leaf switch).
1147 There is no VLAN tag in the incoming packet. Pop outer MPLS label
1148 """
1149 def runTest( self ):
1150
1151 Groups = Queue.LifoQueue( )
1152 try:
1153 if len( config[ "port_map" ] ) < 1:
1154 logging.info( "Port count less than 1, can't run this case" )
1155 assert (False)
1156 return
1157 ports = config[ "port_map" ].keys( )
1158 # we fill the pw pipeline for the intermediate transport
1159 (
1160 port_to_mpls_label_2,
1161 port_to_mpls_label_1,
1162 port_to_mpls_label_pw,
1163 port_to_switch_mac_str,
1164 port_to_src_mac_str,
1165 port_to_dst_mac_str,
1166 Groups
1167 ) = fill_pw_intermediate_transport_pipeline(
1168 self.controller,
1169 logging,
1170 ports,
1171 mpls_labels=3
1172 )
1173
1174 for pair in itertools.product(ports, ports):
1175 # we generate all possible products
1176 in_port = pair[0]
1177 out_port = pair[1]
1178 if out_port == in_port:
1179 continue
1180 # we geneate the pw packet
1181 label_1 = (port_to_mpls_label_2[in_port], 0, 0, 32)
1182 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 32)
1183 parsed_pkt = mpls_packet(
1184 pktlen=104,
1185 ip_ttl=63,
1186 eth_dst=port_to_switch_mac_str[in_port],
1187 label=[ label_1, label_pw ]
1188 )
1189 pkt = str( parsed_pkt )
1190 self.dataplane.send( in_port, pkt )
1191 # we geneate the expected pw packet
1192 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 31)
1193 parsed_pkt = mpls_packet(
1194 pktlen=100,
1195 ip_ttl=63,
1196 eth_dst=port_to_dst_mac_str[in_port],
1197 eth_src=port_to_src_mac_str[out_port],
1198 label=[ label_pw ]
1199 )
1200 pkt = str( parsed_pkt )
1201 # Assertions
1202 verify_packet( self, pkt, out_port )
1203 verify_no_packet( self, pkt, in_port )
1204 verify_no_other_packets( self )
1205
1206 finally:
1207 delete_all_flows( self.controller )
1208 delete_groups( self.controller, Groups )
1209 delete_all_groups( self.controller )
1210
1211class IntraCO_3_Labels( base_tests.SimpleDataPlane ):
1212 """
1213 This is meant to test the PW intermediate transport.
1214 Incoming packet has 3 labels (SR/SR/PW) (spine switch).
1215 There is no VLAN tag in the incoming packet. Pop outer MPLS label
1216 """
1217 def runTest( self ):
1218
1219 Groups = Queue.LifoQueue( )
1220 try:
1221 if len( config[ "port_map" ] ) < 1:
1222 logging.info( "Port count less than 1, can't run this case" )
1223 assert (False)
1224 return
1225 ports = config[ "port_map" ].keys( )
1226 # we fill the pipeline for the intermediate transport
1227 (
1228 port_to_mpls_label_2,
1229 port_to_mpls_label_1,
1230 port_to_mpls_label_pw,
1231 port_to_switch_mac_str,
1232 port_to_src_mac_str,
1233 port_to_dst_mac_str,
1234 Groups
1235 ) = fill_pw_intermediate_transport_pipeline(
1236 self.controller,
1237 logging,
1238 ports,
1239 mpls_labels=3
1240 )
1241 for pair in itertools.product(ports, ports):
1242 # we generate all possible products
1243 in_port = pair[0]
1244 out_port = pair[1]
1245 if out_port == in_port:
1246 continue
1247 # we generate the pw packet
1248 label_2 = (port_to_mpls_label_2[in_port], 0, 0, 32)
1249 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 32)
1250 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 32)
1251 parsed_pkt = mpls_packet(
1252 pktlen=104,
1253 ip_ttl=63,
1254 eth_dst=port_to_switch_mac_str[in_port],
1255 label=[ label_2, label_1, label_pw ]
1256 )
1257 pkt = str( parsed_pkt )
1258 self.dataplane.send( in_port, pkt )
1259 # we generate the expected pw packet
1260 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 31)
1261 parsed_pkt = mpls_packet(
1262 pktlen=100,
1263 ip_ttl=63,
1264 eth_dst=port_to_dst_mac_str[in_port],
1265 eth_src=port_to_src_mac_str[out_port],
1266 label=[ label_1, label_pw ]
1267 )
1268 pkt = str( parsed_pkt )
1269 # Assertions
1270 verify_packet( self, pkt, out_port )
1271 verify_no_packet( self, pkt, in_port )
1272 verify_no_other_packets( self )
1273
1274 finally:
1275 delete_all_flows( self.controller )
1276 delete_groups( self.controller, Groups )
1277 delete_all_groups( self.controller )
1278
1279class InterCO( base_tests.SimpleDataPlane ):
1280 """
1281 This is meant to test the PW intermediate transport.
1282 Incoming packet has 1 labels (PW) (Intermediate CO leaf switch).
1283 There is no VLAN tag in the incoming packet. Push up to 2 MPLS labels
1284 """
1285 def runTest( self ):
1286
1287 Groups = Queue.LifoQueue( )
1288 try:
1289 if len( config[ "port_map" ] ) < 1:
1290 logging.info( "Port count less than 1, can't run this case" )
1291 assert (False)
1292 return
1293 ports = config[ "port_map" ].keys( )
1294 # we fill the pipeline for the intermediate transport
1295 (
1296 port_to_mpls_label_2,
1297 port_to_mpls_label_1,
1298 port_to_mpls_label_pw,
1299 port_to_switch_mac_str,
1300 port_to_src_mac_str,
1301 port_to_dst_mac_str,
1302 Groups
1303 ) = fill_pw_intermediate_transport_pipeline(
1304 self.controller,
1305 logging,
1306 ports,
1307 mpls_labels=1
1308 )
1309 for pair in itertools.product(ports, ports):
1310 # we generate all possible products
1311 in_port = pair[0]
1312 out_port = pair[1]
1313 if out_port == in_port:
1314 continue
1315 # we generate the pw packet
1316 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 32)
1317 parsed_pkt = mpls_packet(
1318 pktlen=104,
1319 ip_ttl=63,
1320 eth_dst=port_to_switch_mac_str[in_port],
1321 label=[ label_pw ]
1322 )
1323 pkt = str( parsed_pkt )
1324 self.dataplane.send( in_port, pkt )
1325 # we generate the expected pw packet
1326 label_2 = (port_to_mpls_label_2[in_port], 0, 0, 31)
1327 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 31)
1328 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 31)
1329 parsed_pkt = mpls_packet(
1330 pktlen=112,
1331 ip_ttl=63,
1332 eth_dst=port_to_dst_mac_str[in_port],
1333 eth_src=port_to_src_mac_str[out_port],
1334 label=[ label_2, label_1, label_pw ]
1335 )
1336 pkt = str( parsed_pkt )
1337 # Assertions
1338 verify_packet( self, pkt, out_port )
1339 verify_no_packet( self, pkt, in_port )
1340 verify_no_other_packets( self )
1341
1342 finally:
1343 delete_all_flows( self.controller )
1344 delete_groups( self.controller, Groups )
1345 delete_all_groups( self.controller )
1346
1347class UntaggedPWTermination( base_tests.SimpleDataPlane ):
1348 """
1349 This is meant to test the PW Termination. The traffic
1350 arrives untagged to the MPLS-TP CE device and goes out
1351 without the outer ethernet header and untagged.
1352 """
1353 def runTest( self ):
1354
1355 Groups = Queue.LifoQueue( )
1356 Groups2 = Queue.LifoQueue( )
1357 try:
1358 if len( config[ "port_map" ] ) < 1:
1359 logging.info( "Port count less than 1, can't run this case" )
1360 assert (False)
1361 return
1362 ports = config[ "port_map" ].keys( )
1363 for pair in itertools.product(ports, ports):
1364 # we generate all possible products
1365 in_port = pair[0]
1366 out_port = pair[1]
1367 if out_port == in_port:
1368 continue
1369 # we fill the pipeline for the pw initiation
1370 # on the reverse path
1371 (
1372 port_to_mpls_label_2,
1373 port_to_mpls_label_1,
1374 port_to_mpls_label_pw,
1375 port_to_in_vlan_3,
1376 port_to_in_vlan_2,
1377 port_to_in_vlan_1,
1378 port_to_src_mac_str,
1379 port_to_dst_mac_str,
1380 Groups ) = fill_pw_initiation_pipeline(
1381 controller=self.controller,
1382 logging=logging,
1383 in_port=out_port,
1384 out_port=in_port,
1385 ingress_tags=0,
1386 egress_tag=EGRESS_UNTAGGED,
1387 mpls_labels=1
1388 )
1389 # we fill the pipeline for the pw termination
1390 (
1391 port_to_mpls_label_pw,
1392 port_to_in_vlan_2,
1393 port_to_in_vlan_1,
1394 port_to_dst_mac_str,
1395 Groups2 ) = fill_pw_termination_pipeline(
1396 controller=self.controller,
1397 logging=logging,
1398 in_port=in_port,
1399 out_port=out_port,
1400 egress_tags=0
1401 )
1402 # we generate the pw packet
1403 label_pw = (port_to_mpls_label_pw[out_port], 0, 1, 63)
1404 cw = (0, 0, 0, 0)
1405 parsed_pkt = pw_packet(
1406 pktlen=104,
1407 out_eth_dst=port_to_dst_mac_str[in_port],
1408 label=[label_pw],
1409 cw=cw
1410 )
1411 pkt = str( parsed_pkt )
1412 self.dataplane.send( in_port, pkt )
1413 # we generate the expected tcp packet
1414 parsed_pkt = simple_tcp_packet(
1415 pktlen=82,
1416 )
1417 pkt = str( parsed_pkt )
1418 # Assertions
1419 verify_packet( self, pkt, out_port )
1420 verify_no_packet( self, pkt, in_port )
1421 verify_no_other_packets( self )
1422 delete_all_flows( self.controller )
1423 delete_groups( self.controller, Groups )
1424 delete_groups( self.controller, Groups2 )
1425 delete_all_groups( self.controller )
1426 finally:
1427 delete_all_flows( self.controller )
1428 delete_groups( self.controller, Groups )
1429 delete_groups( self.controller, Groups2 )
1430 delete_all_groups( self.controller )
1431
1432class Untagged2PWTermination( base_tests.SimpleDataPlane ):
1433 """
1434 This is meant to test the PW Termination. The traffic
1435 arrives untagged to the MPLS-TP CE device and goes out
1436 without the outer ethernet header and untagged
1437 but was originally tagged.
1438 """
1439 def runTest( self ):
1440
1441 Groups = Queue.LifoQueue( )
1442 Groups2 = Queue.LifoQueue( )
1443 try:
1444 if len( config[ "port_map" ] ) < 1:
1445 logging.info( "Port count less than 1, can't run this case" )
1446 assert (False)
1447 return
1448 ports = config[ "port_map" ].keys( )
1449 for pair in itertools.product(ports, ports):
1450 # we generate all possible products
1451 in_port = pair[0]
1452 out_port = pair[1]
1453 if out_port == in_port:
1454 continue
1455 # we fill the pipeline for the pw initiation
1456 # on the reverse path
1457 (
1458 port_to_mpls_label_2,
1459 port_to_mpls_label_1,
1460 port_to_mpls_label_pw,
1461 port_to_in_vlan_3,
1462 port_to_in_vlan_2,
1463 port_to_in_vlan_1,
1464 port_to_src_mac_str,
1465 port_to_dst_mac_str,
1466 Groups ) = fill_pw_initiation_pipeline(
1467 controller=self.controller,
1468 logging=logging,
1469 in_port=out_port,
1470 out_port=in_port,
1471 ingress_tags=0,
1472 egress_tag=EGRESS_TAGGED,
1473 mpls_labels=1
1474 )
1475 # we fill the pipeline for the pw termination
1476 (
1477 port_to_mpls_label_pw,
1478 port_to_in_vlan_2,
1479 port_to_in_vlan_1,
1480 port_to_dst_mac_str,
1481 Groups2 ) = fill_pw_termination_pipeline(
1482 controller=self.controller,
1483 logging=logging,
1484 in_port=in_port,
1485 out_port=out_port,
1486 egress_tags=0
1487 )
1488 # we generate the pw packet
1489 label_pw = (port_to_mpls_label_pw[out_port], 0, 1, 63)
1490 cw = (0, 0, 0, 0)
1491 parsed_pkt = pw_packet(
1492 pktlen=104,
1493 out_eth_dst=port_to_dst_mac_str[in_port],
1494 label=[label_pw],
1495 cw=cw,
1496 out_dl_vlan_enable=True,
1497 out_vlan_vid=port_to_in_vlan_1[out_port],
1498 )
1499 pkt = str( parsed_pkt )
1500 self.dataplane.send( in_port, pkt )
1501 # we generate the expected tcp packet
1502 parsed_pkt = simple_tcp_packet(
1503 pktlen=78,
1504 )
1505 pkt = str( parsed_pkt )
1506 # Assertions
1507 verify_packet( self, pkt, out_port )
1508 verify_no_packet( self, pkt, in_port )
1509 verify_no_other_packets( self )
1510 delete_all_flows( self.controller )
1511 delete_groups( self.controller, Groups )
1512 delete_groups( self.controller, Groups2 )
1513 delete_all_groups( self.controller )
1514 finally:
1515 delete_all_flows( self.controller )
1516 delete_groups( self.controller, Groups )
1517 delete_groups( self.controller, Groups2 )
1518 delete_all_groups( self.controller )
1519
1520class TaggedPWTermination( base_tests.SimpleDataPlane ):
1521 """
1522 This is meant to test the PW Termination. The traffic
1523 arrives untagged to the MPLS-TP CE device and goes out
1524 without the outer ethernet header and with a vlan tag.
1525 """
1526 def runTest( self ):
1527
1528 Groups = Queue.LifoQueue( )
1529 Groups2 = Queue.LifoQueue( )
1530 try:
1531 if len( config[ "port_map" ] ) < 1:
1532 logging.info( "Port count less than 1, can't run this case" )
1533 assert (False)
1534 return
1535 ports = config[ "port_map" ].keys( )
1536 for pair in itertools.product(ports, ports):
1537 # we generate all possible products
1538 in_port = pair[0]
1539 out_port = pair[1]
1540 if out_port == in_port:
1541 continue
1542 # we fill the pipeline for the pw initiation
1543 # on the reverse path
1544 (
1545 port_to_mpls_label_2,
1546 port_to_mpls_label_1,
1547 port_to_mpls_label_pw,
1548 port_to_in_vlan_3,
1549 port_to_in_vlan_2,
1550 port_to_in_vlan_1,
1551 port_to_src_mac_str,
1552 port_to_dst_mac_str,
1553 Groups ) = fill_pw_initiation_pipeline(
1554 controller=self.controller,
1555 logging=logging,
1556 in_port=out_port,
1557 out_port=in_port,
1558 ingress_tags=1,
1559 egress_tag=EGRESS_TAGGED,
1560 mpls_labels=1
1561 )
1562 # we fill the pipeline for the pw termination
1563 (
1564 port_to_mpls_label_pw,
1565 port_to_in_vlan_2,
1566 port_to_in_vlan_1,
1567 port_to_dst_mac_str,
1568 Groups2 ) = fill_pw_termination_pipeline(
1569 controller=self.controller,
1570 logging=logging,
1571 in_port=in_port,
1572 out_port=out_port,
1573 egress_tags=1
1574 )
1575 # we generate the pw packet
1576 label_pw = (port_to_mpls_label_pw[out_port], 0, 1, 63)
1577 cw = (0, 0, 0, 0)
1578 parsed_pkt = pw_packet(
1579 pktlen=104,
1580 out_eth_dst=port_to_dst_mac_str[in_port],
1581 label=[label_pw],
1582 cw=cw,
1583 out_dl_vlan_enable=True,
1584 out_vlan_vid=port_to_in_vlan_1[out_port],
1585 )
1586 pkt = str( parsed_pkt )
1587 self.dataplane.send( in_port, pkt )
1588 # we generate the expected tcp packet
1589 # with a vlan tag
1590 parsed_pkt = simple_tcp_packet(
1591 pktlen=82,
1592 dl_vlan_enable=True,
1593 vlan_vid=port_to_in_vlan_1[out_port]
1594 )
1595 pkt = str( parsed_pkt )
1596 # Assertions
1597 verify_packet( self, pkt, out_port )
1598 verify_no_packet( self, pkt, in_port )
1599 verify_no_other_packets( self )
1600 delete_all_flows( self.controller )
1601 delete_groups( self.controller, Groups )
1602 delete_groups( self.controller, Groups2 )
1603 delete_all_groups( self.controller )
1604 finally:
1605 delete_all_flows( self.controller )
1606 delete_groups( self.controller, Groups )
1607 delete_groups( self.controller, Groups2 )
1608 delete_all_groups( self.controller )
1609
1610class DoubleTaggedPWTermination( base_tests.SimpleDataPlane ):
1611 """
1612 This is meant to test the PW Termination. The traffic
1613 arrives untagged to the MPLS-TP CE device and goes out
1614 without the outer ethernet header and 2 vlan tags.
1615 """
1616 def runTest( self ):
1617
1618 Groups = Queue.LifoQueue( )
1619 Groups2 = Queue.LifoQueue( )
1620 try:
1621 if len( config[ "port_map" ] ) < 1:
1622 logging.info( "Port count less than 1, can't run this case" )
1623 assert (False)
1624 return
1625 ports = config[ "port_map" ].keys( )
1626 for pair in itertools.product(ports, ports):
1627 # we generate all possible products
1628 in_port = pair[0]
1629 out_port = pair[1]
1630 if out_port == in_port:
1631 continue
1632 # we fill the pipeline for the pw initiation
1633 # on the reverse path
1634 (
1635 port_to_mpls_label_2,
1636 port_to_mpls_label_1,
1637 port_to_mpls_label_pw,
1638 port_to_in_vlan_3,
1639 port_to_in_vlan_2,
1640 port_to_in_vlan_1,
1641 port_to_src_mac_str,
1642 port_to_dst_mac_str,
1643 Groups ) = fill_pw_initiation_pipeline(
1644 controller=self.controller,
1645 logging=logging,
1646 in_port=out_port,
1647 out_port=in_port,
1648 ingress_tags=2,
1649 egress_tag = EGRESS_TAGGED,
1650 mpls_labels=1
1651 )
1652 # we fill the pipeline for the pw termination
1653 (
1654 port_to_mpls_label_pw,
1655 port_to_in_vlan_2,
1656 port_to_in_vlan_1,
1657 port_to_dst_mac_str,
1658 Groups2 ) = fill_pw_termination_pipeline(
1659 controller=self.controller,
1660 logging=logging,
1661 in_port=in_port,
1662 out_port=out_port,
1663 egress_tags=2
1664 )
1665 # we generate the pw packet
1666 label_pw = (port_to_mpls_label_pw[out_port], 0, 1, 63)
1667 cw = (0, 0, 0, 0)
1668 parsed_pkt = pw_packet(
1669 pktlen=104,
1670 out_eth_dst=port_to_dst_mac_str[in_port],
1671 label=[label_pw],
1672 cw=cw,
1673 out_dl_vlan_enable=True,
1674 out_vlan_vid=port_to_in_vlan_2[out_port],
1675 in_dl_vlan_enable=True,
1676 in_vlan_vid=port_to_in_vlan_1[out_port]
1677 )
1678 pkt = str( parsed_pkt )
1679 self.dataplane.send( in_port, pkt )
1680 # we generate the expected tcp
1681 # packet with two vlan tags
1682 parsed_pkt = simple_tcp_packet_two_vlan(
1683 pktlen=82,
1684 out_dl_vlan_enable=True,
1685 out_vlan_vid=port_to_in_vlan_2[out_port],
1686 in_dl_vlan_enable=True,
1687 in_vlan_vid=port_to_in_vlan_1[out_port]
1688 )
1689 pkt = str( parsed_pkt )
1690 # Assertions
1691 verify_packet( self, pkt, out_port )
1692 verify_no_packet( self, pkt, in_port )
1693 verify_no_other_packets( self )
1694 delete_all_flows( self.controller )
1695 delete_groups( self.controller, Groups )
1696 delete_groups( self.controller, Groups2 )
1697 delete_all_groups( self.controller )
1698 finally:
1699 delete_all_flows( self.controller )
1700 delete_groups( self.controller, Groups )
1701 delete_groups( self.controller, Groups2 )
1702 delete_all_groups( self.controller )
Pier265ad5f2017-02-28 17:46:28 +01001703
1704class BoSBug( base_tests.SimpleDataPlane ):
1705 """
1706 This test is meant to verify the forwarding of the default traffic
1707 when the rule for the PW transport (BoS=0) has been installed in the
1708 switch, together with the rule for the transport of default routing
1709 traffic. There is a bug in OFDPA 3.0EA4, which requires BOS=0 flow
1710 to be installed before BOS=1 flow to generate correct packets. Incoming
1711 packet has 1 label, and there is no VLAN tag in the incoming packet.
1712 The expected behvior is the Pop of the outer MPLS label and plain IP
1713 packet should exit from the switch.
1714 """
1715 def runTest( self ):
1716 Groups = Queue.LifoQueue( )
1717 try:
1718 if len( config[ "port_map" ] ) < 1:
1719 logging.info( "Port count less than 1, can't run this case" )
1720 assert (False)
1721 return
1722 ports = config[ "port_map" ].keys( )
1723 in_port = ports[0]
1724 out_port = ports[1]
1725 out_vlan = 4094
1726 src_mac = [ 0x00, 0x00, 0x00, 0x00, 0x11, 0x01 ]
1727 src_mac_str = ':'.join( [ '%02X' % x for x in src_mac ] )
1728 dst_mac = [ 0x00, 0x00, 0x00, 0x11, 0x11, 0x01 ]
1729 dst_mac_str = ':'.join( [ '%02X' % x for x in dst_mac ] )
1730 mpls_label = 100
1731 # Add l2 interface group, we have to pop the VLAN;
1732 l2_intf_gid, l2_intf_msg = add_one_l2_interface_group(
1733 ctrl=self.controller,
1734 port=out_port,
1735 vlan_id=out_vlan,
1736 is_tagged=False,
1737 send_barrier=False
1738 )
1739 Groups._put( l2_intf_gid )
1740 # add MPLS interface group
1741 mpls_intf_gid, mpls_intf_msg = add_mpls_intf_group(
1742 ctrl=self.controller,
1743 ref_gid=l2_intf_gid,
1744 dst_mac=dst_mac,
1745 src_mac=src_mac,
1746 vid=out_vlan,
1747 index=in_port
1748 )
1749 Groups._put( mpls_intf_gid )
1750 # Add L3 Unicast group
1751 l3_msg = add_l3_unicast_group(
1752 ctrl=self.controller,
1753 port=out_port,
1754 vlanid=out_vlan,
1755 id=in_port,
1756 src_mac=src_mac,
1757 dst_mac=dst_mac
1758 )
1759 Groups._put( l3_msg.group_id )
1760 # Add L3 ecmp group
1761 ecmp_msg = add_l3_ecmp_group(
1762 ctrl=self.controller,
1763 id=in_port,
1764 l3_ucast_groups=[ l3_msg.group_id ]
1765 )
1766 Groups._put( ecmp_msg.group_id )
1767 # Add MPLS flow with BoS=1
1768 add_mpls_flow(
1769 ctrl=self.controller,
1770 action_group_id=ecmp_msg.group_id,
1771 label=mpls_label
1772 )
1773 # add MPLS flow with BoS=0
1774 add_mpls_flow_pw(
1775 ctrl=self.controller,
1776 action_group_id=mpls_intf_gid,
1777 label=mpls_label,
1778 ethertype=0x8847,
1779 tunnel_index=1,
1780 bos=0
1781 )
1782 # add Termination flow
1783 add_termination_flow(
1784 ctrl=self.controller,
1785 in_port=in_port,
1786 eth_type=0x8847,
1787 dst_mac=src_mac,
1788 vlanid=out_vlan,
1789 goto_table=23
1790 )
1791 # add VLAN flows
1792 add_one_vlan_table_flow(
1793 ctrl=self.controller,
1794 of_port=in_port,
1795 vlan_id=out_vlan,
1796 flag=VLAN_TABLE_FLAG_ONLY_TAG,
1797 )
1798 add_one_vlan_table_flow(
1799 ctrl=self.controller,
1800 of_port=in_port,
1801 vlan_id=out_vlan,
1802 flag=VLAN_TABLE_FLAG_ONLY_UNTAG
1803 )
1804 # Packet generation with sleep
1805 time.sleep(2)
1806 label = (mpls_label, 0, 1, 32)
1807 parsed_pkt = mpls_packet(
1808 pktlen=104,
1809 vlan_vid=out_vlan,
1810 ip_ttl=63,
1811 eth_dst=src_mac_str,
1812 label=[ label]
1813 )
1814 pkt = str( parsed_pkt )
1815 self.dataplane.send( in_port, pkt )
1816 # we geneate the expected pw packet
1817 parsed_pkt = simple_tcp_packet(
1818 pktlen=100,
1819 vlan_vid=out_vlan,
1820 ip_ttl=31,
1821 eth_dst=dst_mac_str,
1822 eth_src=src_mac_str,
1823 )
1824 pkt = str( parsed_pkt )
1825 # Assertions
1826 verify_packet( self, pkt, out_port )
1827 verify_no_packet( self, pkt, in_port )
1828 verify_no_other_packets( self )
1829 finally:
1830 delete_all_flows( self.controller )
1831 delete_groups( self.controller, Groups )
1832 delete_all_groups( self.controller )