blob: 564ca56567dd9c0e440167e14847dabe387bda70 [file] [log] [blame]
Pier23784aa2016-09-19 20:08:21 -07001"""
2Check README file
3"""
4import Queue
5
6from oftest import config
7import inspect
8import logging
9import oftest.base_tests as base_tests
10import ofp
11from oftest.testutils import *
12from accton_util import *
13from utils import *
14
15class UntaggedPWInitiation_2_Labels( base_tests.SimpleDataPlane ):
16 """
17 This is meant to test the PW Initiation. The traffic
18 arrives untagged to the MPLS-TP CE device, it goes out
19 untagged, with a new ethernet header and 2 mpls labels.
20 """
21 def runTest( self ):
22
23 Groups = Queue.LifoQueue( )
24 Groups2 = Queue.LifoQueue( )
25 try:
26 if len( config[ "port_map" ] ) < 1:
27 logging.info( "Port count less than 1, can't run this case" )
28 assert (False)
29 return
30 ports = config[ "port_map" ].keys( )
31
32 for pair in itertools.product(ports, ports):
33 # we generate all possible products
34 in_port = pair[0]
35 out_port = pair[1]
36 if out_port == in_port:
37 continue
38 # we fill the pipeline for the initiation
39 (
40 port_to_mpls_label_2,
41 port_to_mpls_label_1,
42 port_to_mpls_label_pw,
43 port_to_in_vlan_3,
44 port_to_in_vlan_2,
45 port_to_in_vlan_1,
46 port_to_src_mac_str,
47 port_to_dst_mac_str,
48 Groups ) = fill_pw_initiation_pipeline(
49 controller=self.controller,
50 logging=logging,
51 in_port=in_port,
52 out_port=out_port,
53 ingress_tags=0,
54 egress_tag=EGRESS_UNTAGGED,
55 mpls_labels=1
56 )
57 # we fill the pipeline for the termination
58 # on the reverse path
59 (
60 port_to_mpls_label_pw_x,
61 port_to_vlan_2_x,
62 port_to_vlan_1_x,
63 port_to_switch_mac_str_x,
64 Groups2 ) = fill_pw_termination_pipeline(
65 controller=self.controller,
66 logging=logging,
67 in_port=out_port,
68 out_port=in_port,
69 egress_tags=0
70 )
71 # we send a simple tcp packet
72 parsed_pkt = simple_tcp_packet(
73 pktlen=104,
74 )
75 pkt = str( parsed_pkt )
76 self.dataplane.send( in_port, pkt )
77 # we verify the pw packet has been generated
78 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
79 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
80 cw = (0, 0, 0, 0)
81 parsed_pkt = pw_packet(
82 pktlen=130,
83 out_eth_dst=port_to_dst_mac_str[in_port],
84 out_eth_src=port_to_src_mac_str[out_port],
85 label=[label_1, label_pw],
86 cw=cw
87 )
88 pkt = str( parsed_pkt )
89 # Assertions
90 verify_packet( self, pkt, out_port )
91 verify_no_packet( self, pkt, in_port )
92 verify_no_other_packets( self )
93 # Flush all the rules for the next couple
94 delete_all_flows( self.controller )
95 delete_groups( self.controller, Groups )
96 delete_groups( self.controller, Groups2 )
97 delete_all_groups( self.controller )
98
99 finally:
100 delete_all_flows( self.controller )
101 delete_groups( self.controller, Groups )
102 delete_groups( self.controller, Groups2 )
103 delete_all_groups( self.controller )
104
105class Untagged2PWInitiation_2_Labels( base_tests.SimpleDataPlane ):
106 """
107 This is meant to test the PW Initiation. The traffic
108 arrives untagged to the MPLS-TP CE device, it goes out
109 tagged, with a new ethernet header and 2 mpls labels.
110 """
111 def runTest( self ):
112
113 Groups = Queue.LifoQueue( )
114 Groups2 = Queue.LifoQueue( )
115 try:
116 if len( config[ "port_map" ] ) < 1:
117 logging.info( "Port count less than 1, can't run this case" )
118 assert (False)
119 return
120 ports = config[ "port_map" ].keys( )
121
122 for pair in itertools.product(ports, ports):
123 # we generate all possible products
124 in_port = pair[0]
125 out_port = pair[1]
126 if out_port == in_port:
127 continue
128 # we fill the pipeline for the initiation
129 (
130 port_to_mpls_label_2,
131 port_to_mpls_label_1,
132 port_to_mpls_label_pw,
133 port_to_in_vlan_3,
134 port_to_in_vlan_2,
135 port_to_in_vlan_1,
136 port_to_src_mac_str,
137 port_to_dst_mac_str,
138 Groups ) = fill_pw_initiation_pipeline(
139 controller=self.controller,
140 logging=logging,
141 in_port=in_port,
142 out_port=out_port,
143 ingress_tags=0,
144 egress_tag=EGRESS_TAGGED,
145 mpls_labels=1
146 )
147 # we fill the pipeline for the termination
148 # on the reverse path
149 (
150 port_to_mpls_label_pw_x,
151 port_to_vlan_2_x,
152 port_to_vlan_1_x,
153 port_to_switch_mac_str_x,
154 Groups2 ) = fill_pw_termination_pipeline(
155 controller=self.controller,
156 logging=logging,
157 in_port=out_port,
158 out_port=in_port,
159 egress_tags=0
160 )
161 # we send a simple tcp packet
162 parsed_pkt = simple_tcp_packet(
163 pktlen=104,
164 )
165 pkt = str( parsed_pkt )
166 self.dataplane.send( in_port, pkt )
167 # we verify the pw packet has been generated
168 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
169 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
170 cw = (0, 0, 0, 0)
171 parsed_pkt = pw_packet(
172 pktlen=134,
173 out_eth_dst=port_to_dst_mac_str[in_port],
174 out_eth_src=port_to_src_mac_str[out_port],
175 label=[label_1, label_pw],
176 cw=cw,
177 out_dl_vlan_enable=True,
178 out_vlan_vid=port_to_in_vlan_1[in_port],
179 )
180 pkt = str( parsed_pkt )
181 # Assertions
182 verify_packet( self, pkt, out_port )
183 verify_no_packet( self, pkt, in_port )
184 verify_no_other_packets( self )
185 # Flush all the rules for the next couple
186 delete_all_flows( self.controller )
187 delete_groups( self.controller, Groups )
188 delete_groups( self.controller, Groups2 )
189 delete_all_groups( self.controller )
190
191 finally:
192 delete_all_flows( self.controller )
193 delete_groups( self.controller, Groups )
194 delete_groups( self.controller, Groups2 )
195 delete_all_groups( self.controller )
196
197class UntaggedPWInitiation_3_Labels( base_tests.SimpleDataPlane ):
198 """
199 This is meant to test the PW Initiation. The traffic
200 arrives untagged to the MPLS-TP CE device, it goes out
201 untagged, with a new ethernet header and 3 mpls labels.
202 """
203 def runTest( self ):
204
205 Groups = Queue.LifoQueue( )
206 Groups2 = Queue.LifoQueue( )
207 try:
208 if len( config[ "port_map" ] ) < 1:
209 logging.info( "Port count less than 1, can't run this case" )
210 assert (False)
211 return
212 ports = config[ "port_map" ].keys( )
213
214 for pair in itertools.product(ports, ports):
215 # we generate all possible products
216 in_port = pair[0]
217 out_port = pair[1]
218 if out_port == in_port:
219 continue
220 # we fill the pipeline for the pw initiation
221 (
222 port_to_mpls_label_2,
223 port_to_mpls_label_1,
224 port_to_mpls_label_pw,
225 port_to_in_vlan_3,
226 port_to_in_vlan_2,
227 port_to_in_vlan_1,
228 port_to_src_mac_str,
229 port_to_dst_mac_str,
230 Groups ) = fill_pw_initiation_pipeline(
231 controller=self.controller,
232 logging=logging,
233 in_port=in_port,
234 out_port=out_port,
235 ingress_tags=0,
236 egress_tag=EGRESS_UNTAGGED,
237 mpls_labels=2
238 )
239 # we fill the pipeline for the pw termination
240 # on the reverse path
241 (
242 port_to_mpls_label_pw_x,
243 port_to_vlan_2_x,
244 port_to_vlan_1_x,
245 port_to_switch_mac_str_x,
246 Groups2 ) = fill_pw_termination_pipeline(
247 controller=self.controller,
248 logging=logging,
249 in_port=out_port,
250 out_port=in_port,
251 egress_tags=0
252 )
253 # we generate a simple tcp packet
254 parsed_pkt = simple_tcp_packet(
255 pktlen=104,
256 )
257 pkt = str( parsed_pkt )
258 self.dataplane.send( in_port, pkt )
259 # we generate the pw packet we expect on the out port
260 label_2 = (port_to_mpls_label_2[in_port], 0, 0, 63)
261 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
262 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
263 cw = (0, 0, 0, 0)
264 parsed_pkt = pw_packet(
265 pktlen=134,
266 out_eth_dst=port_to_dst_mac_str[in_port],
267 out_eth_src=port_to_src_mac_str[out_port],
268 label=[label_2, label_1, label_pw],
269 cw=cw
270 )
271 pkt = str( parsed_pkt )
272 # Assertions
273 verify_packet( self, pkt, out_port )
274 verify_no_packet( self, pkt, in_port )
275 verify_no_other_packets( self )
276 delete_all_flows( self.controller )
277 delete_groups( self.controller, Groups )
278 delete_groups( self.controller, Groups2 )
279 delete_all_groups( self.controller )
280
281 finally:
282 delete_all_flows( self.controller )
283 delete_groups( self.controller, Groups )
284 delete_groups( self.controller, Groups2 )
285 delete_all_groups( self.controller )
286
287class Untagged2PWInitiation_3_Labels( base_tests.SimpleDataPlane ):
288 """
289 This is meant to test the PW Initiation. The traffic
290 arrives untagged to the MPLS-TP CE device, it goes out
291 tagged with a new ethernet header and 3 mpls labels.
292 """
293 def runTest( self ):
294
295 Groups = Queue.LifoQueue( )
296 Groups2 = Queue.LifoQueue( )
297 try:
298 if len( config[ "port_map" ] ) < 1:
299 logging.info( "Port count less than 1, can't run this case" )
300 assert (False)
301 return
302 ports = config[ "port_map" ].keys( )
303
304 for pair in itertools.product(ports, ports):
305 # we generate all possible products
306 in_port = pair[0]
307 out_port = pair[1]
308 if out_port == in_port:
309 continue
310 # we fill the pipeline for the pw initiation
311 (
312 port_to_mpls_label_2,
313 port_to_mpls_label_1,
314 port_to_mpls_label_pw,
315 port_to_in_vlan_3,
316 port_to_in_vlan_2,
317 port_to_in_vlan_1,
318 port_to_src_mac_str,
319 port_to_dst_mac_str,
320 Groups ) = fill_pw_initiation_pipeline(
321 controller=self.controller,
322 logging=logging,
323 in_port=in_port,
324 out_port=out_port,
325 ingress_tags=0,
326 egress_tag=EGRESS_TAGGED,
327 mpls_labels=2
328 )
329 # we fill the pipeline for the pw termination
330 # on the reverse path
331 (
332 port_to_mpls_label_pw_x,
333 port_to_vlan_2_x,
334 port_to_vlan_1_x,
335 port_to_switch_mac_str_x,
336 Groups2 ) = fill_pw_termination_pipeline(
337 controller=self.controller,
338 logging=logging,
339 in_port=out_port,
340 out_port=in_port,
341 egress_tags=0
342 )
343 # we generate a simple tcp packet
344 parsed_pkt = simple_tcp_packet(
345 pktlen=104,
346 )
347 pkt = str( parsed_pkt )
348 self.dataplane.send( in_port, pkt )
349 # we generate the pw packet we expect on the out port
350 label_2 = (port_to_mpls_label_2[in_port], 0, 0, 63)
351 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
352 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
353 cw = (0, 0, 0, 0)
354 parsed_pkt = pw_packet(
355 pktlen=138,
356 out_eth_dst=port_to_dst_mac_str[in_port],
357 out_eth_src=port_to_src_mac_str[out_port],
358 label=[label_2, label_1, label_pw],
359 cw=cw,
360 out_dl_vlan_enable=True,
361 out_vlan_vid=port_to_in_vlan_1[in_port],
362 )
363 pkt = str( parsed_pkt )
364 # Asserions
365 verify_packet( self, pkt, out_port )
366 verify_no_packet( self, pkt, in_port )
367 verify_no_other_packets( self )
368 delete_all_flows( self.controller )
369 delete_groups( self.controller, Groups )
370 delete_groups( self.controller, Groups2 )
371 delete_all_groups( self.controller )
372
373 finally:
374 delete_all_flows( self.controller )
375 delete_groups( self.controller, Groups )
376 delete_groups( self.controller, Groups2 )
377 delete_all_groups( self.controller )
378
379class TaggedPWInitiation_2_Labels( base_tests.SimpleDataPlane ):
380 """
381 This is meant to test the PW Initiation. The traffic
382 arrives tagged to the MPLS-TP CE device, it goes out
383 with the same tag, a new ethernet header and 2 mpls labels.
384 """
385 def runTest( self ):
386
387 Groups = Queue.LifoQueue( )
388 Groups2 = Queue.LifoQueue( )
389 try:
390 if len( config[ "port_map" ] ) < 1:
391 logging.info( "Port count less than 1, can't run this case" )
392 assert (False)
393 return
394 ports = config[ "port_map" ].keys( )
395 for pair in itertools.product(ports, ports):
396 # we generate all possible products
397 in_port = pair[0]
398 out_port = pair[1]
399 if out_port == in_port:
400 continue
401 # we fill the pipeline for the pw initiation
402 (
403 port_to_mpls_label_2,
404 port_to_mpls_label_1,
405 port_to_mpls_label_pw,
406 port_to_in_vlan_3,
407 port_to_in_vlan_2,
408 port_to_in_vlan_1,
409 port_to_src_mac_str,
410 port_to_dst_mac_str,
411 Groups ) = fill_pw_initiation_pipeline(
412 controller=self.controller,
413 logging=logging,
414 in_port=in_port,
415 out_port=out_port,
416 ingress_tags=1,
417 egress_tag=EGRESS_TAGGED,
418 mpls_labels=1
419 )
420 # we fill the pipeline for the pw termination
421 # on the reverse path
422 (
423 port_to_mpls_label_pw_x,
424 port_to_vlan_2_x,
425 port_to_vlan_1_x,
426 port_to_switch_mac_str_x,
427 Groups2 ) = fill_pw_termination_pipeline(
428 controller=self.controller,
429 logging=logging,
430 in_port=out_port,
431 out_port=in_port,
432 egress_tags=1
433 )
434 # we generate a simple tcp packet tagged
435 parsed_pkt = simple_tcp_packet(
436 pktlen=104,
437 dl_vlan_enable=True,
438 vlan_vid=port_to_in_vlan_1[in_port]
439 )
440 pkt = str( parsed_pkt )
441 self.dataplane.send( in_port, pkt )
442 # we generate the expected pw packet
443 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
444 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
445 cw = (0, 0, 0, 0)
446 parsed_pkt = pw_packet(
447 pktlen=130,
448 out_eth_dst=port_to_dst_mac_str[in_port],
449 out_eth_src=port_to_src_mac_str[out_port],
450 label=[label_1, label_pw],
451 cw=cw,
452 out_dl_vlan_enable=True,
453 out_vlan_vid=port_to_in_vlan_1[in_port],
454 )
455 pkt = str( parsed_pkt )
456 # Assertions
457 verify_packet( self, pkt, out_port )
458 verify_no_packet( self, pkt, in_port )
459 verify_no_other_packets( self )
460 delete_all_flows( self.controller )
461 delete_groups( self.controller, Groups )
462 delete_groups( self.controller, Groups2 )
463 delete_all_groups( self.controller )
464
465 finally:
466 delete_all_flows( self.controller )
467 delete_groups( self.controller, Groups )
468 delete_groups( self.controller, Groups2 )
469 delete_all_groups( self.controller )
470
471class Tagged2PWInitiation_2_Labels( base_tests.SimpleDataPlane ):
472 """
473 This is meant to test the PW Initiation. The traffic
474 arrives tagged to the MPLS-TP CE device, it goes out
475 with a different vlan, with a new ethernet header and 2 mpls labels.
476 """
477 def runTest( self ):
478
479 Groups = Queue.LifoQueue( )
480 Groups2 = Queue.LifoQueue( )
481 try:
482 if len( config[ "port_map" ] ) < 1:
483 logging.info( "Port count less than 1, can't run this case" )
484 assert (False)
485 return
486 ports = config[ "port_map" ].keys( )
487
488 for pair in itertools.product(ports, ports):
489 # we generate all possible products
490 in_port = pair[0]
491 out_port = pair[1]
492 if out_port == in_port:
493 continue
494 # we fill the pipeline for the pw initiation
495 (
496 port_to_mpls_label_2,
497 port_to_mpls_label_1,
498 port_to_mpls_label_pw,
499 port_to_in_vlan_3,
500 port_to_in_vlan_2,
501 port_to_in_vlan_1,
502 port_to_src_mac_str,
503 port_to_dst_mac_str,
504 Groups ) = fill_pw_initiation_pipeline(
505 controller=self.controller,
506 logging=logging,
507 in_port=in_port,
508 out_port=out_port,
509 ingress_tags=1,
510 egress_tag=EGRESS_TAGGED_TRANS,
511 mpls_labels=1
512 )
513 # we fill the pipeline for the pw termination
514 # on the reverse path
515 (
516 port_to_mpls_label_pw_x,
517 port_to_vlan_2_x,
518 port_to_vlan_1_x,
519 port_to_switch_mac_str_x,
520 Groups2 ) = fill_pw_termination_pipeline(
521 controller=self.controller,
522 logging=logging,
523 in_port=out_port,
524 out_port=in_port,
525 egress_tags=1
526 )
527 # we generate a simple tcp packet tagged
528 parsed_pkt = simple_tcp_packet(
529 pktlen=104,
530 dl_vlan_enable=True,
531 vlan_vid=port_to_in_vlan_1[in_port]
532 )
533 pkt = str( parsed_pkt )
534 self.dataplane.send( in_port, pkt )
535 # we generate the expected pw packet
536 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
537 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
538 cw = (0, 0, 0, 0)
539 parsed_pkt = pw_packet(
540 pktlen=130,
541 out_eth_dst=port_to_dst_mac_str[in_port],
542 out_eth_src=port_to_src_mac_str[out_port],
543 label=[label_1, label_pw],
544 cw=cw,
545 out_dl_vlan_enable=True,
546 out_vlan_vid=port_to_in_vlan_3[in_port],
547 )
548 pkt = str( parsed_pkt )
549 # Assertions
550 verify_packet( self, pkt, out_port )
551 verify_no_packet( self, pkt, in_port )
552 verify_no_other_packets( self )
553 delete_all_flows( self.controller )
554 delete_groups( self.controller, Groups )
555 delete_groups( self.controller, Groups2 )
556 delete_all_groups( self.controller )
557
558 finally:
559 delete_all_flows( self.controller )
560 delete_groups( self.controller, Groups )
561 delete_groups( self.controller, Groups2 )
562 delete_all_groups( self.controller )
563
564class TaggedPWInitiation_3_Labels( base_tests.SimpleDataPlane ):
565 """
566 This is meant to test the PW Initiation. The traffic
567 arrives tagged to the MPLS-TP CE device, it goes out
568 with the same vlan, with a new ethernet header and 3 mpls labels.
569 """
570 def runTest( self ):
571
572 Groups = Queue.LifoQueue( )
573 Groups2 = Queue.LifoQueue( )
574 try:
575 if len( config[ "port_map" ] ) < 1:
576 logging.info( "Port count less than 1, can't run this case" )
577 assert (False)
578 return
579 ports = config[ "port_map" ].keys( )
580
581 for pair in itertools.product(ports, ports):
582 # we generate all possible products
583 in_port = pair[0]
584 out_port = pair[1]
585 if out_port == in_port:
586 continue
587 # we fill the pipeline for the pw initiation
588 (
589 port_to_mpls_label_2,
590 port_to_mpls_label_1,
591 port_to_mpls_label_pw,
592 port_to_in_vlan_3,
593 port_to_in_vlan_2,
594 port_to_in_vlan_1,
595 port_to_src_mac_str,
596 port_to_dst_mac_str,
597 Groups ) = fill_pw_initiation_pipeline(
598 controller=self.controller,
599 logging=logging,
600 in_port=in_port,
601 out_port=out_port,
602 ingress_tags=1,
603 egress_tag=EGRESS_TAGGED,
604 mpls_labels=2
605 )
606 # we fill the pipeline for the pw termination
607 # on the reverse path
608 (
609 port_to_mpls_label_pw_x,
610 port_to_vlan_2_x,
611 port_to_vlan_1_x,
612 port_to_switch_mac_str_x,
613 Groups2 ) = fill_pw_termination_pipeline(
614 controller=self.controller,
615 logging=logging,
616 in_port=out_port,
617 out_port=in_port,
618 egress_tags=1
619 )
620 # we generate a simple tcp packet tagged
621 parsed_pkt = simple_tcp_packet(
622 pktlen=104,
623 dl_vlan_enable=True,
624 vlan_vid=port_to_in_vlan_1[in_port]
625 )
626 pkt = str( parsed_pkt )
627 self.dataplane.send( in_port, pkt )
628 # we generate the expect pw packet
629 label_2 = (port_to_mpls_label_2[in_port], 0, 0, 63)
630 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
631 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
632 cw = (0, 0, 0, 0)
633 parsed_pkt = pw_packet(
634 pktlen=134,
635 out_eth_dst=port_to_dst_mac_str[in_port],
636 out_eth_src=port_to_src_mac_str[out_port],
637 label=[label_2, label_1, label_pw],
638 cw=cw,
639 out_dl_vlan_enable=True,
640 out_vlan_vid=port_to_in_vlan_1[in_port],
641 )
642 pkt = str( parsed_pkt )
643 # Assertions
644 verify_packet( self, pkt, out_port )
645 verify_no_packet( self, pkt, in_port )
646 verify_no_other_packets( self )
647 delete_all_flows( self.controller )
648 delete_groups( self.controller, Groups )
649 delete_groups( self.controller, Groups2)
650 delete_all_groups( self.controller )
651
652 finally:
653 delete_all_flows( self.controller )
654 delete_groups( self.controller, Groups )
655 delete_groups( self.controller, Groups2)
656 delete_all_groups( self.controller )
657
658class Tagged2PWInitiation_3_Labels( base_tests.SimpleDataPlane ):
659 """
660 This is meant to test the PW Initiation. The traffic
661 arrives tagged to the MPLS-TP CE device, it goes out
662 with a different vlam, with a new ethernet header and 3 mpls labels.
663 """
664 def runTest( self ):
665
666 Groups = Queue.LifoQueue( )
667 Groups2 = Queue.LifoQueue( )
668 try:
669 if len( config[ "port_map" ] ) < 1:
670 logging.info( "Port count less than 1, can't run this case" )
671 assert (False)
672 return
673 ports = config[ "port_map" ].keys( )
674
675 for pair in itertools.product(ports, ports):
676 # we generate all possible products
677 in_port = pair[0]
678 out_port = pair[1]
679 if out_port == in_port:
680 continue
681 # we fill the pipeline for the pw initiation
682 (
683 port_to_mpls_label_2,
684 port_to_mpls_label_1,
685 port_to_mpls_label_pw,
686 port_to_in_vlan_3,
687 port_to_in_vlan_2,
688 port_to_in_vlan_1,
689 port_to_src_mac_str,
690 port_to_dst_mac_str,
691 Groups ) = fill_pw_initiation_pipeline(
692 controller=self.controller,
693 logging=logging,
694 in_port=in_port,
695 out_port=out_port,
696 ingress_tags=1,
697 egress_tag=EGRESS_TAGGED_TRANS,
698 mpls_labels=2
699 )
700 # we fill the pipeline for the pw termination
701 # on the reverse path
702 (
703 port_to_mpls_label_pw_x,
704 port_to_vlan_2_x,
705 port_to_vlan_1_x,
706 port_to_switch_mac_str_x,
707 Groups2 ) = fill_pw_termination_pipeline(
708 controller=self.controller,
709 logging=logging,
710 in_port=out_port,
711 out_port=in_port,
712 egress_tags=1
713 )
714 # we generate a simple tcp packet tagged
715 parsed_pkt = simple_tcp_packet(
716 pktlen=104,
717 dl_vlan_enable=True,
718 vlan_vid=port_to_in_vlan_1[in_port]
719 )
720 pkt = str( parsed_pkt )
721 self.dataplane.send( in_port, pkt )
722 # we generate the expect pw packet
723 label_2 = (port_to_mpls_label_2[in_port], 0, 0, 63)
724 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
725 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
726 cw = (0, 0, 0, 0)
727 parsed_pkt = pw_packet(
728 pktlen=134,
729 out_eth_dst=port_to_dst_mac_str[in_port],
730 out_eth_src=port_to_src_mac_str[out_port],
731 label=[label_2, label_1, label_pw],
732 cw=cw,
733 out_dl_vlan_enable=True,
734 out_vlan_vid=port_to_in_vlan_3[in_port],
735 )
736 pkt = str( parsed_pkt )
737 # Assertions
738 verify_packet( self, pkt, out_port )
739 verify_no_packet( self, pkt, in_port )
740 verify_no_other_packets( self )
741 delete_all_flows( self.controller )
742 delete_groups( self.controller, Groups )
743 delete_groups( self.controller, Groups2 )
744 delete_all_groups( self.controller )
745
746 finally:
747 delete_all_flows( self.controller )
748 delete_groups( self.controller, Groups )
749 delete_groups( self.controller, Groups2 )
750 delete_all_groups( self.controller )
751
752class DoubleTaggedPWInitiation_2_Labels( base_tests.SimpleDataPlane ):
753 """
754 This is meant to test the PW Initiation. The traffic
755 arrives double tagged to the MPLS-TP CE device, it goes out
756 with the same outer vlan, with a new ethernet header and 2 mpls labels.
757 """
758 def runTest( self ):
759
760 Groups = Queue.LifoQueue( )
761 Groups2 = Queue.LifoQueue( )
762 try:
763 if len( config[ "port_map" ] ) < 1:
764 logging.info( "Port count less than 1, can't run this case" )
765 assert (False)
766 return
767 ports = config[ "port_map" ].keys( )
768
769 for pair in itertools.product(ports, ports):
770 # we generate all possible products
771 in_port = pair[0]
772 out_port = pair[1]
773 if out_port == in_port:
774 continue
775 # we fill the pipeline for the pw initiation
776 (
777 port_to_mpls_label_2,
778 port_to_mpls_label_1,
779 port_to_mpls_label_pw,
780 port_to_in_vlan_3,
781 port_to_in_vlan_2,
782 port_to_in_vlan_1,
783 port_to_src_mac_str,
784 port_to_dst_mac_str,
785 Groups ) = fill_pw_initiation_pipeline(
786 controller=self.controller,
787 logging=logging,
788 in_port=in_port,
789 out_port=out_port,
790 ingress_tags=2,
791 egress_tag=EGRESS_TAGGED,
792 mpls_labels=1
793 )
794 # we fill the pipeline for the pw termination
795 # on the reverse path
796 (
797 port_to_mpls_label_pw_x,
798 port_to_vlan_2_x,
799 port_to_vlan_1_x,
800 port_to_switch_mac_str_x,
801 Groups2 ) = fill_pw_termination_pipeline(
802 controller=self.controller,
803 logging=logging,
804 in_port=out_port,
805 out_port=in_port,
806 egress_tags=2
807 )
808 # we generate a simple tcp packet with two vlans
809 parsed_pkt = simple_tcp_packet_two_vlan(
810 pktlen=108,
811 out_dl_vlan_enable=True,
812 out_vlan_vid=port_to_in_vlan_2[in_port],
813 in_dl_vlan_enable=True,
814 in_vlan_vid=port_to_in_vlan_1[in_port],
815 )
816 pkt = str( parsed_pkt )
817 self.dataplane.send( in_port, pkt )
818 # we generate the expected pw packet
819 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
820 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
821 cw = (0, 0, 0, 0)
822 parsed_pkt = pw_packet(
823 pktlen=134,
824 out_eth_dst=port_to_dst_mac_str[in_port],
825 out_eth_src=port_to_src_mac_str[out_port],
826 label=[label_1, label_pw],
827 cw=cw,
828 out_dl_vlan_enable=True,
829 out_vlan_vid=port_to_in_vlan_2[in_port],
830 in_dl_vlan_enable=True,
831 in_vlan_vid=port_to_in_vlan_1[in_port],
832 )
833 pkt = str( parsed_pkt )
834 # Assertions
835 verify_packet( self, pkt, out_port )
836 verify_no_packet( self, pkt, in_port )
837 verify_no_other_packets( self )
838 delete_all_flows( self.controller )
839 delete_groups( self.controller, Groups )
840 delete_groups( self.controller, Groups2 )
841 delete_all_groups( self.controller )
842
843 finally:
844 delete_all_flows( self.controller )
845 delete_groups( self.controller, Groups )
846 delete_groups( self.controller, Groups2 )
847 delete_all_groups( self.controller )
848
849class DoubleTagged2PWInitiation_2_Labels( base_tests.SimpleDataPlane ):
850 """
851 This is meant to test the PW Initiation. The traffic
852 arrives double tagged to the MPLS-TP CE device and goes out
853 with a new ethernet header and 2 mpls labels.
854 """
855 def runTest( self ):
856
857 Groups = Queue.LifoQueue( )
858 Groups2 = Queue.LifoQueue( )
859 try:
860 if len( config[ "port_map" ] ) < 1:
861 logging.info( "Port count less than 1, can't run this case" )
862 assert (False)
863 return
864 ports = config[ "port_map" ].keys( )
865
866 for pair in itertools.product(ports, ports):
867 # we generate all possible products
868 in_port = pair[0]
869 out_port = pair[1]
870 if out_port == in_port:
871 continue
872 # we fill the pipeline for the pw initiation
873 (
874 port_to_mpls_label_2,
875 port_to_mpls_label_1,
876 port_to_mpls_label_pw,
877 port_to_in_vlan_3,
878 port_to_in_vlan_2,
879 port_to_in_vlan_1,
880 port_to_src_mac_str,
881 port_to_dst_mac_str,
882 Groups ) = fill_pw_initiation_pipeline(
883 controller=self.controller,
884 logging=logging,
885 in_port=in_port,
886 out_port=out_port,
887 ingress_tags=2,
888 egress_tag=EGRESS_TAGGED_TRANS,
889 mpls_labels=1
890 )
891 # we fill the pipeline for the pw termination
892 # on the reverse path
893 (
894 port_to_mpls_label_pw_x,
895 port_to_vlan_2_x,
896 port_to_vlan_1_x,
897 port_to_switch_mac_str_x,
898 Groups2 ) = fill_pw_termination_pipeline(
899 controller=self.controller,
900 logging=logging,
901 in_port=out_port,
902 out_port=in_port,
903 egress_tags=2
904 )
905 # we generate a simple tcp packet with two vlans
906 parsed_pkt = simple_tcp_packet_two_vlan(
907 pktlen=108,
908 out_dl_vlan_enable=True,
909 out_vlan_vid=port_to_in_vlan_2[in_port],
910 in_dl_vlan_enable=True,
911 in_vlan_vid=port_to_in_vlan_1[in_port],
912 )
913 pkt = str( parsed_pkt )
914 self.dataplane.send( in_port, pkt )
915 # we generate the expected pw packet
916 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
917 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
918 cw = (0, 0, 0, 0)
919 parsed_pkt = pw_packet(
920 pktlen=134,
921 out_eth_dst=port_to_dst_mac_str[in_port],
922 out_eth_src=port_to_src_mac_str[out_port],
923 label=[label_1, label_pw],
924 cw=cw,
925 out_dl_vlan_enable=True,
926 out_vlan_vid=port_to_in_vlan_3[in_port],
927 in_dl_vlan_enable=True,
928 in_vlan_vid=port_to_in_vlan_1[in_port],
929 )
930 pkt = str( parsed_pkt )
931 # Assertions
932 verify_packet( self, pkt, out_port )
933 verify_no_packet( self, pkt, in_port )
934 verify_no_other_packets( self )
935 delete_all_flows( self.controller )
936 delete_groups( self.controller, Groups )
937 delete_groups( self.controller, Groups2 )
938 delete_all_groups( self.controller )
939
940 finally:
941 delete_all_flows( self.controller )
942 delete_groups( self.controller, Groups )
943 delete_groups( self.controller, Groups2 )
944 delete_all_groups( self.controller )
945
946class DoubleTaggedPWInitiation_3_Labels( base_tests.SimpleDataPlane ):
947 """
948 This is meant to test the PW Initiation. The traffic
949 arrives double tagged to the MPLS-TP CE device and goes out
950 with a new ethernet header and 3 mpls labels.
951 """
952 def runTest( self ):
953
954 Groups = Queue.LifoQueue( )
955 Groups2 = Queue.LifoQueue( )
956 try:
957 if len( config[ "port_map" ] ) < 1:
958 logging.info( "Port count less than 1, can't run this case" )
959 assert (False)
960 return
961 ports = config[ "port_map" ].keys( )
962
963 for pair in itertools.product(ports, ports):
964 # we generate all possible products
965 in_port = pair[0]
966 out_port = pair[1]
967 if out_port == in_port:
968 continue
969 # we fill the pipeline for the pw initiation
970 (
971 port_to_mpls_label_2,
972 port_to_mpls_label_1,
973 port_to_mpls_label_pw,
974 port_to_in_vlan_3,
975 port_to_in_vlan_2,
976 port_to_in_vlan_1,
977 port_to_src_mac_str,
978 port_to_dst_mac_str,
979 Groups ) = fill_pw_initiation_pipeline(
980 controller=self.controller,
981 logging=logging,
982 in_port=in_port,
983 out_port=out_port,
984 ingress_tags=2,
985 egress_tag=EGRESS_TAGGED,
986 mpls_labels=2
987 )
988 # we fill the pipeline for the pw termination
989 # on the reverse path
990 (
991 port_to_mpls_label_pw_x,
992 port_to_vlan_2_x,
993 port_to_vlan_1_x,
994 port_to_switch_mac_str_x,
995 Groups2 ) = fill_pw_termination_pipeline(
996 controller=self.controller,
997 logging=logging,
998 in_port=out_port,
999 out_port=in_port,
1000 egress_tags=2
1001 )
1002 # we generate a simple tcp packet with two wlan
1003 parsed_pkt = simple_tcp_packet_two_vlan(
1004 pktlen=108,
1005 out_dl_vlan_enable=True,
1006 out_vlan_vid=port_to_in_vlan_2[in_port],
1007 in_dl_vlan_enable=True,
1008 in_vlan_vid=port_to_in_vlan_1[in_port],
1009 )
1010 pkt = str( parsed_pkt )
1011 self.dataplane.send( in_port, pkt )
1012 # we generate the expected pw packet
1013 label_2 = (port_to_mpls_label_2[in_port], 0, 0, 63)
1014 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
1015 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
1016 cw = (0, 0, 0, 0)
1017 parsed_pkt = pw_packet(
1018 pktlen=138,
1019 out_eth_dst=port_to_dst_mac_str[in_port],
1020 out_eth_src=port_to_src_mac_str[out_port],
1021 label=[label_2, label_1, label_pw],
1022 cw=cw,
1023 out_dl_vlan_enable=True,
1024 out_vlan_vid=port_to_in_vlan_2[in_port],
1025 in_dl_vlan_enable=True,
1026 in_vlan_vid=port_to_in_vlan_1[in_port],
1027 )
1028 pkt = str( parsed_pkt )
1029 # Assertions
1030 verify_packet( self, pkt, out_port )
1031 verify_no_packet( self, pkt, in_port )
1032 verify_no_other_packets( self )
1033 delete_all_flows( self.controller )
1034 delete_groups( self.controller, Groups )
1035 delete_groups( self.controller, Groups2 )
1036 delete_all_groups( self.controller )
1037
1038 finally:
1039 delete_all_flows( self.controller )
1040 delete_groups( self.controller, Groups )
1041 delete_groups( self.controller, Groups2 )
1042 delete_all_groups( self.controller )
1043
1044class DoubleTagged2PWInitiation_3_Labels( base_tests.SimpleDataPlane ):
1045 """
1046 This is meant to test the PW Initiation. The traffic
1047 arrives double tagged to the MPLS-TP CE device and goes out
1048 with a new ethernet header and 3 mpls labels.
1049 """
1050 def runTest( self ):
1051
1052 Groups = Queue.LifoQueue( )
1053 Groups2 = Queue.LifoQueue( )
1054 try:
1055 if len( config[ "port_map" ] ) < 1:
1056 logging.info( "Port count less than 1, can't run this case" )
1057 assert (False)
1058 return
1059 ports = config[ "port_map" ].keys( )
1060
1061 for pair in itertools.product(ports, ports):
1062 # we generate all possible products
1063 in_port = pair[0]
1064 out_port = pair[1]
1065 if out_port == in_port:
1066 continue
1067 # we fill the pipeline for the pw initiation
1068 (
1069 port_to_mpls_label_2,
1070 port_to_mpls_label_1,
1071 port_to_mpls_label_pw,
1072 port_to_in_vlan_3,
1073 port_to_in_vlan_2,
1074 port_to_in_vlan_1,
1075 port_to_src_mac_str,
1076 port_to_dst_mac_str,
1077 Groups ) = fill_pw_initiation_pipeline(
1078 controller=self.controller,
1079 logging=logging,
1080 in_port=in_port,
1081 out_port=out_port,
1082 ingress_tags=2,
1083 egress_tag=EGRESS_TAGGED_TRANS,
1084 mpls_labels=2
1085 )
1086 # we fill the pipeline for the pw termination
1087 # on the reverse path
1088 (
1089 port_to_mpls_label_pw_x,
1090 port_to_vlan_2_x,
1091 port_to_vlan_1_x,
1092 port_to_switch_mac_str_x,
1093 Groups2 ) = fill_pw_termination_pipeline(
1094 controller=self.controller,
1095 logging=logging,
1096 in_port=out_port,
1097 out_port=in_port,
1098 egress_tags=2
1099 )
1100 # we generate a simple tcp packet with two wlan
1101 parsed_pkt = simple_tcp_packet_two_vlan(
1102 pktlen=108,
1103 out_dl_vlan_enable=True,
1104 out_vlan_vid=port_to_in_vlan_2[in_port],
1105 in_dl_vlan_enable=True,
1106 in_vlan_vid=port_to_in_vlan_1[in_port],
1107 )
1108 pkt = str( parsed_pkt )
1109 self.dataplane.send( in_port, pkt )
1110 # we generate the expected pw packet
1111 label_2 = (port_to_mpls_label_2[in_port], 0, 0, 63)
1112 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 63)
1113 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 63)
1114 cw = (0, 0, 0, 0)
1115 parsed_pkt = pw_packet(
1116 pktlen=138,
1117 out_eth_dst=port_to_dst_mac_str[in_port],
1118 out_eth_src=port_to_src_mac_str[out_port],
1119 label=[label_2, label_1, label_pw],
1120 cw=cw,
1121 out_dl_vlan_enable=True,
1122 out_vlan_vid=port_to_in_vlan_3[in_port],
1123 in_dl_vlan_enable=True,
1124 in_vlan_vid=port_to_in_vlan_1[in_port],
1125 )
1126 pkt = str( parsed_pkt )
1127 # Assertions
1128 verify_packet( self, pkt, out_port )
1129 verify_no_packet( self, pkt, in_port )
1130 verify_no_other_packets( self )
1131 delete_all_flows( self.controller )
1132 delete_groups( self.controller, Groups )
1133 delete_groups( self.controller, Groups2 )
1134 delete_all_groups( self.controller )
1135
1136 finally:
1137 delete_all_flows( self.controller )
1138 delete_groups( self.controller, Groups )
1139 delete_groups( self.controller, Groups2 )
1140 delete_all_groups( self.controller )
1141
1142class IntraCO_2_Labels( base_tests.SimpleDataPlane ):
1143 """
1144 This is meant to test the PW intermediate transport.
1145 Incoming packet has 2 labels (SR/PW) (intermediate leaf switch).
1146 There is no VLAN tag in the incoming packet. Pop outer MPLS label
1147 """
1148 def runTest( self ):
1149
1150 Groups = Queue.LifoQueue( )
1151 try:
1152 if len( config[ "port_map" ] ) < 1:
1153 logging.info( "Port count less than 1, can't run this case" )
1154 assert (False)
1155 return
1156 ports = config[ "port_map" ].keys( )
1157 # we fill the pw pipeline for the intermediate transport
1158 (
1159 port_to_mpls_label_2,
1160 port_to_mpls_label_1,
1161 port_to_mpls_label_pw,
1162 port_to_switch_mac_str,
1163 port_to_src_mac_str,
1164 port_to_dst_mac_str,
1165 Groups
1166 ) = fill_pw_intermediate_transport_pipeline(
1167 self.controller,
1168 logging,
1169 ports,
1170 mpls_labels=3
1171 )
1172
1173 for pair in itertools.product(ports, ports):
1174 # we generate all possible products
1175 in_port = pair[0]
1176 out_port = pair[1]
1177 if out_port == in_port:
1178 continue
1179 # we geneate the pw packet
1180 label_1 = (port_to_mpls_label_2[in_port], 0, 0, 32)
1181 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 32)
1182 parsed_pkt = mpls_packet(
1183 pktlen=104,
1184 ip_ttl=63,
1185 eth_dst=port_to_switch_mac_str[in_port],
1186 label=[ label_1, label_pw ]
1187 )
1188 pkt = str( parsed_pkt )
1189 self.dataplane.send( in_port, pkt )
1190 # we geneate the expected pw packet
1191 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 31)
1192 parsed_pkt = mpls_packet(
1193 pktlen=100,
1194 ip_ttl=63,
1195 eth_dst=port_to_dst_mac_str[in_port],
1196 eth_src=port_to_src_mac_str[out_port],
1197 label=[ label_pw ]
1198 )
1199 pkt = str( parsed_pkt )
1200 # Assertions
1201 verify_packet( self, pkt, out_port )
1202 verify_no_packet( self, pkt, in_port )
1203 verify_no_other_packets( self )
1204
1205 finally:
1206 delete_all_flows( self.controller )
1207 delete_groups( self.controller, Groups )
1208 delete_all_groups( self.controller )
1209
1210class IntraCO_3_Labels( base_tests.SimpleDataPlane ):
1211 """
1212 This is meant to test the PW intermediate transport.
1213 Incoming packet has 3 labels (SR/SR/PW) (spine switch).
1214 There is no VLAN tag in the incoming packet. Pop outer MPLS label
1215 """
1216 def runTest( self ):
1217
1218 Groups = Queue.LifoQueue( )
1219 try:
1220 if len( config[ "port_map" ] ) < 1:
1221 logging.info( "Port count less than 1, can't run this case" )
1222 assert (False)
1223 return
1224 ports = config[ "port_map" ].keys( )
1225 # we fill the pipeline for the intermediate transport
1226 (
1227 port_to_mpls_label_2,
1228 port_to_mpls_label_1,
1229 port_to_mpls_label_pw,
1230 port_to_switch_mac_str,
1231 port_to_src_mac_str,
1232 port_to_dst_mac_str,
1233 Groups
1234 ) = fill_pw_intermediate_transport_pipeline(
1235 self.controller,
1236 logging,
1237 ports,
1238 mpls_labels=3
1239 )
1240 for pair in itertools.product(ports, ports):
1241 # we generate all possible products
1242 in_port = pair[0]
1243 out_port = pair[1]
1244 if out_port == in_port:
1245 continue
1246 # we generate the pw packet
1247 label_2 = (port_to_mpls_label_2[in_port], 0, 0, 32)
1248 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 32)
1249 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 32)
1250 parsed_pkt = mpls_packet(
1251 pktlen=104,
1252 ip_ttl=63,
1253 eth_dst=port_to_switch_mac_str[in_port],
1254 label=[ label_2, label_1, label_pw ]
1255 )
1256 pkt = str( parsed_pkt )
1257 self.dataplane.send( in_port, pkt )
1258 # we generate the expected pw packet
1259 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 31)
1260 parsed_pkt = mpls_packet(
1261 pktlen=100,
1262 ip_ttl=63,
1263 eth_dst=port_to_dst_mac_str[in_port],
1264 eth_src=port_to_src_mac_str[out_port],
1265 label=[ label_1, label_pw ]
1266 )
1267 pkt = str( parsed_pkt )
1268 # Assertions
1269 verify_packet( self, pkt, out_port )
1270 verify_no_packet( self, pkt, in_port )
1271 verify_no_other_packets( self )
1272
1273 finally:
1274 delete_all_flows( self.controller )
1275 delete_groups( self.controller, Groups )
1276 delete_all_groups( self.controller )
1277
1278class InterCO( base_tests.SimpleDataPlane ):
1279 """
1280 This is meant to test the PW intermediate transport.
1281 Incoming packet has 1 labels (PW) (Intermediate CO leaf switch).
1282 There is no VLAN tag in the incoming packet. Push up to 2 MPLS labels
1283 """
1284 def runTest( self ):
1285
1286 Groups = Queue.LifoQueue( )
1287 try:
1288 if len( config[ "port_map" ] ) < 1:
1289 logging.info( "Port count less than 1, can't run this case" )
1290 assert (False)
1291 return
1292 ports = config[ "port_map" ].keys( )
1293 # we fill the pipeline for the intermediate transport
1294 (
1295 port_to_mpls_label_2,
1296 port_to_mpls_label_1,
1297 port_to_mpls_label_pw,
1298 port_to_switch_mac_str,
1299 port_to_src_mac_str,
1300 port_to_dst_mac_str,
1301 Groups
1302 ) = fill_pw_intermediate_transport_pipeline(
1303 self.controller,
1304 logging,
1305 ports,
1306 mpls_labels=1
1307 )
1308 for pair in itertools.product(ports, ports):
1309 # we generate all possible products
1310 in_port = pair[0]
1311 out_port = pair[1]
1312 if out_port == in_port:
1313 continue
1314 # we generate the pw packet
1315 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 32)
1316 parsed_pkt = mpls_packet(
1317 pktlen=104,
1318 ip_ttl=63,
1319 eth_dst=port_to_switch_mac_str[in_port],
1320 label=[ label_pw ]
1321 )
1322 pkt = str( parsed_pkt )
1323 self.dataplane.send( in_port, pkt )
1324 # we generate the expected pw packet
1325 label_2 = (port_to_mpls_label_2[in_port], 0, 0, 31)
1326 label_1 = (port_to_mpls_label_1[in_port], 0, 0, 31)
1327 label_pw = (port_to_mpls_label_pw[in_port], 0, 1, 31)
1328 parsed_pkt = mpls_packet(
1329 pktlen=112,
1330 ip_ttl=63,
1331 eth_dst=port_to_dst_mac_str[in_port],
1332 eth_src=port_to_src_mac_str[out_port],
1333 label=[ label_2, label_1, label_pw ]
1334 )
1335 pkt = str( parsed_pkt )
1336 # Assertions
1337 verify_packet( self, pkt, out_port )
1338 verify_no_packet( self, pkt, in_port )
1339 verify_no_other_packets( self )
1340
1341 finally:
1342 delete_all_flows( self.controller )
1343 delete_groups( self.controller, Groups )
1344 delete_all_groups( self.controller )
1345
1346class UntaggedPWTermination( base_tests.SimpleDataPlane ):
1347 """
1348 This is meant to test the PW Termination. The traffic
1349 arrives untagged to the MPLS-TP CE device and goes out
1350 without the outer ethernet header and untagged.
1351 """
1352 def runTest( self ):
1353
1354 Groups = Queue.LifoQueue( )
1355 Groups2 = Queue.LifoQueue( )
1356 try:
1357 if len( config[ "port_map" ] ) < 1:
1358 logging.info( "Port count less than 1, can't run this case" )
1359 assert (False)
1360 return
1361 ports = config[ "port_map" ].keys( )
1362 for pair in itertools.product(ports, ports):
1363 # we generate all possible products
1364 in_port = pair[0]
1365 out_port = pair[1]
1366 if out_port == in_port:
1367 continue
1368 # we fill the pipeline for the pw initiation
1369 # on the reverse path
1370 (
1371 port_to_mpls_label_2,
1372 port_to_mpls_label_1,
1373 port_to_mpls_label_pw,
1374 port_to_in_vlan_3,
1375 port_to_in_vlan_2,
1376 port_to_in_vlan_1,
1377 port_to_src_mac_str,
1378 port_to_dst_mac_str,
1379 Groups ) = fill_pw_initiation_pipeline(
1380 controller=self.controller,
1381 logging=logging,
1382 in_port=out_port,
1383 out_port=in_port,
1384 ingress_tags=0,
1385 egress_tag=EGRESS_UNTAGGED,
1386 mpls_labels=1
1387 )
1388 # we fill the pipeline for the pw termination
1389 (
1390 port_to_mpls_label_pw,
1391 port_to_in_vlan_2,
1392 port_to_in_vlan_1,
1393 port_to_dst_mac_str,
1394 Groups2 ) = fill_pw_termination_pipeline(
1395 controller=self.controller,
1396 logging=logging,
1397 in_port=in_port,
1398 out_port=out_port,
1399 egress_tags=0
1400 )
1401 # we generate the pw packet
1402 label_pw = (port_to_mpls_label_pw[out_port], 0, 1, 63)
1403 cw = (0, 0, 0, 0)
1404 parsed_pkt = pw_packet(
1405 pktlen=104,
1406 out_eth_dst=port_to_dst_mac_str[in_port],
1407 label=[label_pw],
1408 cw=cw
1409 )
1410 pkt = str( parsed_pkt )
1411 self.dataplane.send( in_port, pkt )
1412 # we generate the expected tcp packet
1413 parsed_pkt = simple_tcp_packet(
1414 pktlen=82,
1415 )
1416 pkt = str( parsed_pkt )
1417 # Assertions
1418 verify_packet( self, pkt, out_port )
1419 verify_no_packet( self, pkt, in_port )
1420 verify_no_other_packets( self )
1421 delete_all_flows( self.controller )
1422 delete_groups( self.controller, Groups )
1423 delete_groups( self.controller, Groups2 )
1424 delete_all_groups( self.controller )
1425 finally:
1426 delete_all_flows( self.controller )
1427 delete_groups( self.controller, Groups )
1428 delete_groups( self.controller, Groups2 )
1429 delete_all_groups( self.controller )
1430
1431class Untagged2PWTermination( base_tests.SimpleDataPlane ):
1432 """
1433 This is meant to test the PW Termination. The traffic
1434 arrives untagged to the MPLS-TP CE device and goes out
1435 without the outer ethernet header and untagged
1436 but was originally tagged.
1437 """
1438 def runTest( self ):
1439
1440 Groups = Queue.LifoQueue( )
1441 Groups2 = Queue.LifoQueue( )
1442 try:
1443 if len( config[ "port_map" ] ) < 1:
1444 logging.info( "Port count less than 1, can't run this case" )
1445 assert (False)
1446 return
1447 ports = config[ "port_map" ].keys( )
1448 for pair in itertools.product(ports, ports):
1449 # we generate all possible products
1450 in_port = pair[0]
1451 out_port = pair[1]
1452 if out_port == in_port:
1453 continue
1454 # we fill the pipeline for the pw initiation
1455 # on the reverse path
1456 (
1457 port_to_mpls_label_2,
1458 port_to_mpls_label_1,
1459 port_to_mpls_label_pw,
1460 port_to_in_vlan_3,
1461 port_to_in_vlan_2,
1462 port_to_in_vlan_1,
1463 port_to_src_mac_str,
1464 port_to_dst_mac_str,
1465 Groups ) = fill_pw_initiation_pipeline(
1466 controller=self.controller,
1467 logging=logging,
1468 in_port=out_port,
1469 out_port=in_port,
1470 ingress_tags=0,
1471 egress_tag=EGRESS_TAGGED,
1472 mpls_labels=1
1473 )
1474 # we fill the pipeline for the pw termination
1475 (
1476 port_to_mpls_label_pw,
1477 port_to_in_vlan_2,
1478 port_to_in_vlan_1,
1479 port_to_dst_mac_str,
1480 Groups2 ) = fill_pw_termination_pipeline(
1481 controller=self.controller,
1482 logging=logging,
1483 in_port=in_port,
1484 out_port=out_port,
1485 egress_tags=0
1486 )
1487 # we generate the pw packet
1488 label_pw = (port_to_mpls_label_pw[out_port], 0, 1, 63)
1489 cw = (0, 0, 0, 0)
1490 parsed_pkt = pw_packet(
1491 pktlen=104,
1492 out_eth_dst=port_to_dst_mac_str[in_port],
1493 label=[label_pw],
1494 cw=cw,
1495 out_dl_vlan_enable=True,
1496 out_vlan_vid=port_to_in_vlan_1[out_port],
1497 )
1498 pkt = str( parsed_pkt )
1499 self.dataplane.send( in_port, pkt )
1500 # we generate the expected tcp packet
1501 parsed_pkt = simple_tcp_packet(
1502 pktlen=78,
1503 )
1504 pkt = str( parsed_pkt )
1505 # Assertions
1506 verify_packet( self, pkt, out_port )
1507 verify_no_packet( self, pkt, in_port )
1508 verify_no_other_packets( self )
1509 delete_all_flows( self.controller )
1510 delete_groups( self.controller, Groups )
1511 delete_groups( self.controller, Groups2 )
1512 delete_all_groups( self.controller )
1513 finally:
1514 delete_all_flows( self.controller )
1515 delete_groups( self.controller, Groups )
1516 delete_groups( self.controller, Groups2 )
1517 delete_all_groups( self.controller )
1518
1519class TaggedPWTermination( base_tests.SimpleDataPlane ):
1520 """
1521 This is meant to test the PW Termination. The traffic
1522 arrives untagged to the MPLS-TP CE device and goes out
1523 without the outer ethernet header and with a vlan tag.
1524 """
1525 def runTest( self ):
1526
1527 Groups = Queue.LifoQueue( )
1528 Groups2 = Queue.LifoQueue( )
1529 try:
1530 if len( config[ "port_map" ] ) < 1:
1531 logging.info( "Port count less than 1, can't run this case" )
1532 assert (False)
1533 return
1534 ports = config[ "port_map" ].keys( )
1535 for pair in itertools.product(ports, ports):
1536 # we generate all possible products
1537 in_port = pair[0]
1538 out_port = pair[1]
1539 if out_port == in_port:
1540 continue
1541 # we fill the pipeline for the pw initiation
1542 # on the reverse path
1543 (
1544 port_to_mpls_label_2,
1545 port_to_mpls_label_1,
1546 port_to_mpls_label_pw,
1547 port_to_in_vlan_3,
1548 port_to_in_vlan_2,
1549 port_to_in_vlan_1,
1550 port_to_src_mac_str,
1551 port_to_dst_mac_str,
1552 Groups ) = fill_pw_initiation_pipeline(
1553 controller=self.controller,
1554 logging=logging,
1555 in_port=out_port,
1556 out_port=in_port,
1557 ingress_tags=1,
1558 egress_tag=EGRESS_TAGGED,
1559 mpls_labels=1
1560 )
1561 # we fill the pipeline for the pw termination
1562 (
1563 port_to_mpls_label_pw,
1564 port_to_in_vlan_2,
1565 port_to_in_vlan_1,
1566 port_to_dst_mac_str,
1567 Groups2 ) = fill_pw_termination_pipeline(
1568 controller=self.controller,
1569 logging=logging,
1570 in_port=in_port,
1571 out_port=out_port,
1572 egress_tags=1
1573 )
1574 # we generate the pw packet
1575 label_pw = (port_to_mpls_label_pw[out_port], 0, 1, 63)
1576 cw = (0, 0, 0, 0)
1577 parsed_pkt = pw_packet(
1578 pktlen=104,
1579 out_eth_dst=port_to_dst_mac_str[in_port],
1580 label=[label_pw],
1581 cw=cw,
1582 out_dl_vlan_enable=True,
1583 out_vlan_vid=port_to_in_vlan_1[out_port],
1584 )
1585 pkt = str( parsed_pkt )
1586 self.dataplane.send( in_port, pkt )
1587 # we generate the expected tcp packet
1588 # with a vlan tag
1589 parsed_pkt = simple_tcp_packet(
1590 pktlen=82,
1591 dl_vlan_enable=True,
1592 vlan_vid=port_to_in_vlan_1[out_port]
1593 )
1594 pkt = str( parsed_pkt )
1595 # Assertions
1596 verify_packet( self, pkt, out_port )
1597 verify_no_packet( self, pkt, in_port )
1598 verify_no_other_packets( self )
1599 delete_all_flows( self.controller )
1600 delete_groups( self.controller, Groups )
1601 delete_groups( self.controller, Groups2 )
1602 delete_all_groups( self.controller )
1603 finally:
1604 delete_all_flows( self.controller )
1605 delete_groups( self.controller, Groups )
1606 delete_groups( self.controller, Groups2 )
1607 delete_all_groups( self.controller )
1608
1609class DoubleTaggedPWTermination( base_tests.SimpleDataPlane ):
1610 """
1611 This is meant to test the PW Termination. The traffic
1612 arrives untagged to the MPLS-TP CE device and goes out
1613 without the outer ethernet header and 2 vlan tags.
1614 """
1615 def runTest( self ):
1616
1617 Groups = Queue.LifoQueue( )
1618 Groups2 = Queue.LifoQueue( )
1619 try:
1620 if len( config[ "port_map" ] ) < 1:
1621 logging.info( "Port count less than 1, can't run this case" )
1622 assert (False)
1623 return
1624 ports = config[ "port_map" ].keys( )
1625 for pair in itertools.product(ports, ports):
1626 # we generate all possible products
1627 in_port = pair[0]
1628 out_port = pair[1]
1629 if out_port == in_port:
1630 continue
1631 # we fill the pipeline for the pw initiation
1632 # on the reverse path
1633 (
1634 port_to_mpls_label_2,
1635 port_to_mpls_label_1,
1636 port_to_mpls_label_pw,
1637 port_to_in_vlan_3,
1638 port_to_in_vlan_2,
1639 port_to_in_vlan_1,
1640 port_to_src_mac_str,
1641 port_to_dst_mac_str,
1642 Groups ) = fill_pw_initiation_pipeline(
1643 controller=self.controller,
1644 logging=logging,
1645 in_port=out_port,
1646 out_port=in_port,
1647 ingress_tags=2,
1648 egress_tag = EGRESS_TAGGED,
1649 mpls_labels=1
1650 )
1651 # we fill the pipeline for the pw termination
1652 (
1653 port_to_mpls_label_pw,
1654 port_to_in_vlan_2,
1655 port_to_in_vlan_1,
1656 port_to_dst_mac_str,
1657 Groups2 ) = fill_pw_termination_pipeline(
1658 controller=self.controller,
1659 logging=logging,
1660 in_port=in_port,
1661 out_port=out_port,
1662 egress_tags=2
1663 )
1664 # we generate the pw packet
1665 label_pw = (port_to_mpls_label_pw[out_port], 0, 1, 63)
1666 cw = (0, 0, 0, 0)
1667 parsed_pkt = pw_packet(
1668 pktlen=104,
1669 out_eth_dst=port_to_dst_mac_str[in_port],
1670 label=[label_pw],
1671 cw=cw,
1672 out_dl_vlan_enable=True,
1673 out_vlan_vid=port_to_in_vlan_2[out_port],
1674 in_dl_vlan_enable=True,
1675 in_vlan_vid=port_to_in_vlan_1[out_port]
1676 )
1677 pkt = str( parsed_pkt )
1678 self.dataplane.send( in_port, pkt )
1679 # we generate the expected tcp
1680 # packet with two vlan tags
1681 parsed_pkt = simple_tcp_packet_two_vlan(
1682 pktlen=82,
1683 out_dl_vlan_enable=True,
1684 out_vlan_vid=port_to_in_vlan_2[out_port],
1685 in_dl_vlan_enable=True,
1686 in_vlan_vid=port_to_in_vlan_1[out_port]
1687 )
1688 pkt = str( parsed_pkt )
1689 # Assertions
1690 verify_packet( self, pkt, out_port )
1691 verify_no_packet( self, pkt, in_port )
1692 verify_no_other_packets( self )
1693 delete_all_flows( self.controller )
1694 delete_groups( self.controller, Groups )
1695 delete_groups( self.controller, Groups2 )
1696 delete_all_groups( self.controller )
1697 finally:
1698 delete_all_flows( self.controller )
1699 delete_groups( self.controller, Groups )
1700 delete_groups( self.controller, Groups2 )
1701 delete_all_groups( self.controller )