blob: c367a9d9dd6a65d0805a2f5453b5c0f494c14f35 [file] [log] [blame]
Zsolt Haraszti91350eb2016-11-05 15:33:53 -07001#!/usr/bin/env python
2#
3# Copyright 2016 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""
19Run this test inside a docker container using the following syntax:
20
21docker run -ti --rm -v $(pwd):/voltha --privileged cord/voltha-base \
22 env PYTHONPATH=/voltha python /voltha/tests/itests/test_frameio.py
23
24"""
25
26import os
27import random
28from time import sleep
29
30from scapy.layers.inet import IP
31from scapy.layers.l2 import Ether, Dot1Q
32from twisted.internet import reactor
33from twisted.internet.defer import Deferred, inlineCallbacks
34from twisted.internet.error import AlreadyCalled
35from twisted.trial.unittest import TestCase
36
37from common.frameio.frameio import FrameIOManager, BpfProgramFilter
38from common.utils.asleep import asleep
39from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
40
41ident = lambda frame: frame
42none = lambda *args, **kw: None
43
44
45class TestFrameIO(TestCase):
46
47 @inlineCallbacks
48 def make_veth_pairs_if_needed(self):
49
50 def has_iface(iface):
51 return os.system('ip link show {}'.format(iface)) == 0
52
53 def make_veth(iface):
54 os.system('ip link add type veth')
55 os.system('ip link set {} up'.format(iface))
56 peer = iface[:len('veth')] + str(int(iface[len('veth'):]) + 1)
57 os.system('ip link set {} up'.format(peer))
58 assert has_iface(iface)
59
60 for iface_number in (0, 2):
61 iface = 'veth{}'.format(iface_number)
62 if not has_iface(iface):
63 make_veth(iface)
64 yield asleep(2)
65
66 @inlineCallbacks
67 def setUp(self):
68 yield self.make_veth_pairs_if_needed()
69 self.mgr = FrameIOManager().start()
70
71 def tearDown(self):
72 self.mgr.stop()
73
74 @inlineCallbacks
75 def test_packet_send_receive(self):
76 rcvd = DeferredWithTimeout()
77 p0 = self.mgr.add_interface('veth0', none).up()
78 p1 = self.mgr.add_interface('veth1',
79 lambda p, f: rcvd.callback((p, f))).up()
80
81 # sending to veth0 should result in receiving on veth1 and vice versa
82 bogus_frame = 'bogus packet'
83 p0.send(bogus_frame)
84
85 # check that we receved packet
86 port, frame = yield rcvd
87 self.assertEqual(port, p1)
88 self.assertEqual(frame, bogus_frame)
89
90
91 @inlineCallbacks
92 def test_packet_send_receive_with_filter(self):
93 rcvd = DeferredWithTimeout()
94
95 filter = BpfProgramFilter('ip dst host 123.123.123.123')
96 p0 = self.mgr.add_interface('veth0', none).up()
97 p1 = self.mgr.add_interface('veth1',
98 lambda p, f: rcvd.callback((p, f)),
99 filter=filter).up()
100
101 # sending bogus packet would not be received
102 ip_packet = str(Ether()/IP(dst='123.123.123.123'))
103 p0.send(ip_packet)
104
105 # check that we receved packet
106 port, frame = yield rcvd
107 self.assertEqual(port, p1)
108 self.assertEqual(frame, ip_packet)
109
110
111 @inlineCallbacks
112 def test_packet_send_drop_with_filter(self):
113 rcvd = DeferredWithTimeout()
114
115 filter = BpfProgramFilter('ip dst host 123.123.123.123')
116 p0 = self.mgr.add_interface('veth0', none).up()
117 self.mgr.add_interface('veth1', lambda p, f: rcvd.callback((p, f)),
118 filter=filter).up()
119
120 # sending bogus packet would not be received
121 p0.send('bogus packet')
122
123 try:
124 _ = yield rcvd
125 except TimeOutError:
126 pass
127 else:
128 self.fail('not timed out')
129
130
131 @inlineCallbacks
132 def test_concurrent_packet_send_receive(self):
133
134 done = Deferred()
135 queue1 = []
136 queue2 = []
137
138 n = 100
139
140 def append(queue):
141 def _append(_, frame):
142 queue.append(frame)
143 if len(queue1) == n and len(queue2) == n:
144 done.callback(None)
145 return _append
146
147 p1in = self.mgr.add_interface('veth0', none).up()
148 self.mgr.add_interface('veth1', append(queue1)).up()
149 p2in = self.mgr.add_interface('veth2', none).up()
150 self.mgr.add_interface('veth3', append(queue2)).up()
151
152 @inlineCallbacks
153 def send_packets(port, n):
154 for i in xrange(n):
155 port.send(str(i))
156 yield asleep(0.00001 * random.random()) # to interleave
157
158 # sending two concurrent streams
159 send_packets(p1in, n)
160 send_packets(p2in, n)
161
162 # verify that both queue got all packets
163 yield done
164
165 @inlineCallbacks
166 def test_concurrent_packet_send_receive_with_filter(self):
167
168 done = Deferred()
169 queue1 = []
170 queue2 = []
171
172 n = 100
173
174 def append(queue):
175 def _append(_, frame):
176 queue.append(frame)
177 if len(queue1) == n / 2 and len(queue2) == n / 2:
178 done.callback(None)
179 return _append
180
181 filter = BpfProgramFilter('vlan 100')
182 p1in = self.mgr.add_interface('veth0', none).up()
183 self.mgr.add_interface('veth1', append(queue1), filter).up()
184 p2in = self.mgr.add_interface('veth2', none).up()
185 self.mgr.add_interface('veth3', append(queue2), filter).up()
186
187 @inlineCallbacks
188 def send_packets(port, n):
189 for i in xrange(n):
190 # packets have alternating VLAN ids 100 and 101
191 pkt = Ether()/Dot1Q(vlan=100 + i % 2)
192 port.send(str(pkt))
193 yield asleep(0.00001 * random.random()) # to interleave
194
195 # sending two concurrent streams
196 send_packets(p1in, n)
197 send_packets(p2in, n)
198
199 # verify that both queue got all packets
200 yield done
201
202
203if __name__ == '__main__':
204 import unittest
205 unittest.main()