blob: 1d576a8cdca412d053c61a734608a288c64a0af4 [file] [log] [blame]
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Nathan Knuth418fdc82016-09-16 22:51:15 -070014import logging
15from oftest.testutils import *
16import oftest.base_tests as base_tests
17import ofp
18from scapy.all import Ether
19from hexdump import hexdump
20
21
# Port shared by both tests below: PacketOutTest captures on it and
# PacketInTest injects into it. The value is looked up under the
# "in_out_port" test parameter; "ep1" is passed as the second argument
# (presumably the fallback when the parameter is not supplied -- confirm
# against oftest.testutils.test_param_get).
in_out_port = test_param_get("in_out_port", "ep1")
23
24
class PacketOutTest(base_tests.SimpleDataPlane):
    """
    Exercise the packet-out path (controller -> data plane).

    An untagged UDP frame is handed to the agent in a packet-out message
    whose only action is an output to the in-out test port. The frame that
    shows up on that port must be the same frame wrapped in a Dot1Q shim
    whose vlan_id encodes the out_port.
    """

    def runTest(self):
        logging.info("Running %s", self.__class__.__name__)

        # Not strictly required for the test itself; issuing these requests
        # doubles as a check that the connection to the agent is alive.
        delete_all_flows(self.controller)
        delete_all_groups(self.controller)

        # Addressing shared by the frame we send and the frame we expect.
        frame_len = 60
        addressing = {
            'eth_src': 'de:ad:be:ef:00:01',
            'eth_dst': 'de:ad:be:ef:00:02',
            'ip_src': '192.168.0.1',
            'ip_dst': '192.168.0.2',
        }

        untagged = simple_udp_packet(pktlen=frame_len, **addressing)
        # The expected frame is 4 bytes longer to make room for the vlan tag.
        tagged = simple_udp_packet(frame_len + 4, dl_vlan_enable=True,
                                   vlan_vid=1, **addressing)

        # NOTE(review): the action targets in_out_port while the expected
        # frame carries vlan_id 1 -- confirm how the agent derives the
        # vlan_id from the action's port.
        payload = str(untagged)
        out_action = ofp.action.output(port=in_out_port)
        request = ofp.message.packet_out(
            in_port=ofp.OFPP_CONTROLLER,
            actions=[out_action],
            buffer_id=ofp.OFP_NO_BUFFER,
            data=payload
        )
        self.controller.message_send(request)
        verify_no_errors(self.controller)

        # The tagged frame must be seen on the in-out port.
        verify_packet(self, str(tagged), in_out_port)
63
64
class PacketInTest(base_tests.SimpleDataPlane):
    """
    Exercise the packet-in path (data plane -> controller).

    A Dot1Q-tagged UDP frame is injected into the in-out test port. The
    controller must receive it as a packet-in message whose in_port equals
    the vlan_id of the outer Dot1Q shim, with that shim already popped from
    the delivered frame.
    """

    def runTest(self):
        logging.info("Running %s", self.__class__.__name__)

        # Not strictly required for the test itself; issuing these requests
        # doubles as a check that the connection to the agent is alive.
        delete_all_flows(self.controller)
        delete_all_groups(self.controller)

        # Addressing shared by the frame we inject and the frame we expect.
        frame_len = 60
        addressing = {
            'eth_src': 'de:ad:be:ef:00:01',
            'eth_dst': 'de:ad:be:ef:00:02',
            'ip_src': '192.168.0.1',
            'ip_dst': '192.168.0.2',
        }

        # The same number is used as the vlan tag of the injected frame and
        # as the in_port expected in the resulting packet-in.
        vlan_id = 1

        # Injected frame: 4 bytes longer than the payload to hold the tag.
        tagged = simple_udp_packet(frame_len + 4, dl_vlan_enable=True,
                                   vlan_vid=vlan_id, **addressing)
        # Frame expected at the controller: the tag has been removed.
        untagged = simple_udp_packet(pktlen=frame_len, **addressing)

        # Push the tagged frame into the in-out port ...
        self.dataplane.send(in_out_port, str(tagged))

        # ... and expect the untagged frame to arrive as a packet-in.
        verify_packet_in(self, str(untagged), vlan_id, ofp.OFPR_ACTION)