blob: b3863d97f7bf81fd706e7e822c5b0375f3ac72fe [file] [log] [blame]
Zsolt Haraszti91350eb2016-11-05 15:33:53 -07001#!/usr/bin/env python
2#
3# Copyright 2016 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""
Run this test inside a docker container using the following syntax:

docker run -ti --rm -v $(pwd):/voltha --privileged cord/voltha-base \
    env PYTHONPATH=/voltha python \
    /voltha/tests/itests/frameio_tests/run_as_root/test_frameio.py
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070024
25"""
26
import os
import random
from time import sleep

from scapy.layers.inet import IP
from scapy.layers.l2 import Ether, Dot1Q
from twisted.internet import reactor
from twisted.internet.defer import Deferred, gatherResults, inlineCallbacks
from twisted.internet.error import AlreadyCalled
from twisted.trial.unittest import TestCase

from common.frameio.frameio import FrameIOManager, BpfProgramFilter
from common.utils.asleep import asleep
from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
41
def ident(frame):
    """Return *frame* unchanged (identity callback)."""
    return frame


def none(*args, **kw):
    """Accept any arguments, do nothing and return None.

    Used as the receive callback for interfaces whose incoming frames the
    tests do not care about.
    """
    return None
44
45
class TestFrameIO(TestCase):
    """Integration tests for FrameIOManager, run over local veth pairs.

    The tests shell out to ``ip link`` to create and bring up interfaces,
    so they need sufficient privileges (see the module docstring for the
    docker invocation).
    """

    @inlineCallbacks
    def make_veth_pairs_if_needed(self):
        """Ensure veth0 and veth2 exist, creating veth pairs when missing."""

        def has_iface(iface):
            # `ip link show` exits with status 0 only if the device exists
            return os.system('ip link show {}'.format(iface)) == 0

        def make_veth(iface):
            # NOTE(review): no names are passed to `ip link add`, so this
            # presumably relies on the kernel naming the new pair `iface`
            # and `iface + 1` (e.g. veth0/veth1) -- confirm on the target.
            os.system('ip link add type veth')
            os.system('ip link set {} up'.format(iface))
            peer = iface[:len('veth')] + str(int(iface[len('veth'):]) + 1)
            os.system('ip link set {} up'.format(peer))
            assert has_iface(iface)

        for iface_number in (0, 2):
            iface = 'veth{}'.format(iface_number)
            if not has_iface(iface):
                make_veth(iface)
                # give the freshly created interfaces time to settle
                yield asleep(2)

    @inlineCallbacks
    def setUp(self):
        """Make sure the veth pairs exist, then start a FrameIOManager."""
        yield self.make_veth_pairs_if_needed()
        self.mgr = FrameIOManager().start()

    def tearDown(self):
        """Stop the FrameIOManager started in setUp."""
        self.mgr.stop()

    @inlineCallbacks
    def test_packet_send_receive(self):
        """A frame sent on veth0 is delivered to the veth1 callback."""
        rcvd = DeferredWithTimeout()
        p0 = self.mgr.add_interface('veth0', none).up()
        p1 = self.mgr.add_interface(
            'veth1', lambda p, f: rcvd.callback((p, f))).up()

        # sending to veth0 should result in receiving on veth1 and vice versa
        bogus_frame = 'bogus packet'
        p0.send(bogus_frame)

        # check that we received the packet
        port, frame = yield rcvd
        self.assertEqual(port, p1)
        self.assertEqual(frame, bogus_frame)

    @inlineCallbacks
    def test_packet_send_receive_with_filter(self):
        """A frame matching the BPF filter is delivered to the callback."""
        rcvd = DeferredWithTimeout()

        # named bpf_filter so the builtin `filter` is not shadowed
        bpf_filter = BpfProgramFilter('ip dst host 123.123.123.123')
        p0 = self.mgr.add_interface('veth0', none).up()
        p1 = self.mgr.add_interface(
            'veth1', lambda p, f: rcvd.callback((p, f)),
            filter=bpf_filter).up()

        # an IP packet addressed to the filtered host should be received
        ip_packet = str(Ether()/IP(dst='123.123.123.123'))
        p0.send(ip_packet)

        # check that we received the packet
        port, frame = yield rcvd
        self.assertEqual(port, p1)
        self.assertEqual(frame, ip_packet)

    @inlineCallbacks
    def test_packet_send_drop_with_filter(self):
        """A frame not matching the BPF filter never reaches the callback."""
        rcvd = DeferredWithTimeout()

        bpf_filter = BpfProgramFilter('ip dst host 123.123.123.123')
        p0 = self.mgr.add_interface('veth0', none).up()
        self.mgr.add_interface(
            'veth1', lambda p, f: rcvd.callback((p, f)),
            filter=bpf_filter).up()

        # a bogus packet does not match the filter, so it must not arrive
        p0.send('bogus packet')

        # success here means the wait for a frame timed out
        try:
            yield rcvd
        except TimeOutError:
            pass
        else:
            self.fail('not timed out')

    @inlineCallbacks
    def test_concurrent_packet_send_receive(self):
        """Two interleaved streams of n frames each are fully received."""

        done = Deferred()
        queue1 = []
        queue2 = []

        n = 100

        def append(queue):
            def _append(_, frame):
                queue.append(frame)
                # fire once, when both receivers have seen all n frames
                if len(queue1) == n and len(queue2) == n:
                    done.callback(None)
            return _append

        p1in = self.mgr.add_interface('veth0', none).up()
        self.mgr.add_interface('veth1', append(queue1)).up()
        p2in = self.mgr.add_interface('veth2', none).up()
        self.mgr.add_interface('veth3', append(queue2)).up()

        @inlineCallbacks
        def send_packets(port, n):
            for i in xrange(n):
                port.send(str(i))
                yield asleep(0.00001 * random.random())  # to interleave

        # sending two concurrent streams
        senders = [send_packets(p1in, n), send_packets(p2in, n)]

        # verify that both queues got all packets; the senders are gathered
        # too so that a failure while sending fails the test right away
        # instead of being dropped while we wait on `done`
        yield gatherResults([done] + senders, consumeErrors=True)

    @inlineCallbacks
    def test_concurrent_packet_send_receive_with_filter(self):
        """With a 'vlan 100' filter, half of each interleaved stream arrives."""

        done = Deferred()
        queue1 = []
        queue2 = []

        n = 100

        def append(queue):
            def _append(_, frame):
                queue.append(frame)
                # only every other frame carries VLAN 100, so n // 2 frames
                # are expected per receiver
                if len(queue1) == n // 2 and len(queue2) == n // 2:
                    done.callback(None)
            return _append

        bpf_filter = BpfProgramFilter('vlan 100')
        p1in = self.mgr.add_interface('veth0', none).up()
        self.mgr.add_interface('veth1', append(queue1), bpf_filter).up()
        p2in = self.mgr.add_interface('veth2', none).up()
        self.mgr.add_interface('veth3', append(queue2), bpf_filter).up()

        @inlineCallbacks
        def send_packets(port, n):
            for i in xrange(n):
                # packets have alternating VLAN ids 100 and 101
                pkt = Ether()/Dot1Q(vlan=100 + i % 2)
                port.send(str(pkt))
                yield asleep(0.00001 * random.random())  # to interleave

        # sending two concurrent streams
        senders = [send_packets(p1in, n), send_packets(p2in, n)]

        # verify that both queues got the VLAN 100 half of the packets; the
        # senders are gathered too so that a send failure surfaces at once
        yield gatherResults([done] + senders, consumeErrors=True)
202
203
# Allow running this module directly as a script.
if __name__ == '__main__':
    from unittest import main
    main()