blob: 3f5bcf6dd883d6aa793d93fbd98791f408ad7f1a [file] [log] [blame]
Zsolt Haraszti91350eb2016-11-05 15:33:53 -07001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Haraszti91350eb2016-11-05 15:33:53 -07003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
"""
A module that can send and receive raw ethernet frames on a set of interfaces
and can manage a set of vlan interfaces on top of existing
interfaces. Due to its reliance on raw sockets, this module requires
root access. Also, because raw sockets are hard to deal with in Twisted (they
are not directly supported), we need to run the receiver select loop on a
dedicated thread.
"""
Zsolt Haraszti89a27302016-12-08 16:53:06 -080025
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070026import os
27import socket
Zsolt Haraszti89a27302016-12-08 16:53:06 -080028import struct
Zsolt Haraszti3e6f0892017-01-19 11:51:40 -080029import uuid
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070030from pcapy import BPFProgram
31from threading import Thread, Condition
32
33import fcntl
34
35import select
36import structlog
Zsolt Haraszti89a27302016-12-08 16:53:06 -080037import sys
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070038
Zsolt Haraszti89a27302016-12-08 16:53:06 -080039from scapy.data import ETH_P_ALL
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070040from twisted.internet import reactor
Zsolt Haraszti89a27302016-12-08 16:53:06 -080041from zope.interface import implementer
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070042
Zsolt Haraszti89a27302016-12-08 16:53:06 -080043from voltha.registry import IComponent
44
45if sys.platform.startswith('linux'):
46 from common.frameio.third_party.oftest import afpacket, netutils
47elif sys.platform == 'darwin':
Zsolt Haraszti348d1932016-12-10 01:10:07 -080048 from scapy.arch import pcapdnet, BIOCIMMEDIATE, dnet
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070049
50log = structlog.get_logger()
51
52
def hexify(buffer):
    """
    Return a hexadecimal string encoding of input buffer

    :param buffer: Data to encode. Either a byte string / bytearray, or a
    text string (in which case each character's code point is encoded).
    :return: Lower-case hex string with at least two digits per element.
    """
    if isinstance(buffer, (bytes, bytearray)):
        # Iterating a byte string yields 1-char strings on Python 2 but
        # integers on Python 3; going through bytearray yields integers
        # everywhere, so the same formatting works on both.
        return ''.join('%02x' % b for b in bytearray(buffer))
    return ''.join('%02x' % ord(c) for c in buffer)
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070058
59
class _SelectWakerDescriptor(object):
    """
    A descriptor that can be mixed into a select loop to wake it up.

    It wraps an OS pipe: the read end is what select() watches (see
    fileno()), and notify() makes it readable by writing one byte to the
    write end. wait() consumes one such byte.
    """
    def __init__(self):
        self.pipe_read, self.pipe_write = os.pipe()
        # the notifying thread must never block on a full pipe
        fcntl.fcntl(self.pipe_write, fcntl.F_SETFL, os.O_NONBLOCK)

    def __del__(self):
        # If os.pipe() failed in __init__ neither attribute exists, and
        # there is nothing to close.
        if hasattr(self, 'pipe_read'):
            os.close(self.pipe_read)
        if hasattr(self, 'pipe_write'):
            os.close(self.pipe_write)

    def fileno(self):
        """Return the read end of the pipe, so select() can watch us"""
        return self.pipe_read

    def wait(self):
        """Consume one pending wake-up byte from the pipe"""
        os.read(self.pipe_read, 1)

    def notify(self):
        """Trigger a select loop"""
        import errno
        try:
            # byte literal: os.write() requires bytes on Python 3
            os.write(self.pipe_write, b'\x00')
        except OSError as e:
            # A full non-blocking pipe means unread wake-up bytes are
            # already pending, so the select loop will wake up anyway.
            if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                raise
81
82
class BpfProgramFilter(object):
    """
    Frame predicate backed by a Berkeley Packet Filter program, the same
    filtering mechanism used by well known open source tools such as pcap
    and tcpdump.
    """
    def __init__(self, program_string):
        """
        Build the filter from a textual BPF expression (see
        'man pcap-filter' for the syntax).
        :param program_string: The textual definition of the filter. Examples:
        'vlan 1000'
        'vlan 1000 and ip src host 10.10.10.10'
        """
        compiled = BPFProgram(program_string)
        self.bpf = compiled

    def __call__(self, frame):
        """
        Run the filter program against one frame.
        :param frame: Raw frame provided as Python string
        :return: 1 if frame satisfies filter, 0 otherwise.
        """
        verdict = self.bpf.filter(frame)
        return verdict
105
106
class FrameIOPort(object):
    """
    Represents a network interface which we can send/receive raw
    Ethernet frames.

    This is the platform independent base class; derived classes must
    provide open_socket() and rcv_frame(). Each received frame is offered to
    every registered proxy whose filter accepts it; the proxy callbacks are
    scheduled onto the Twisted reactor thread.
    """

    RCV_SIZE_DEFAULT = 4096
    ETH_P_ALL = 0x03
    RCV_TIMEOUT = 10000
    MIN_PKT_SIZE = 60  # shorter frames are zero-padded if the send fails

    def __init__(self, iface_name):
        """
        Open the underlying socket on the named interface.
        :param iface_name: Name of an existing network interface
        """
        self.iface_name = iface_name
        self.proxies = []
        self.socket = self.open_socket(self.iface_name)
        log.debug('socket-opened', fn=self.fileno(), iface=iface_name)
        self.received = 0
        self.discarded = 0

    def add_proxy(self, proxy):
        """Register a proxy as a consumer of frames received here"""
        self.proxies.append(proxy)

    def del_proxy(self, proxy):
        """Unregister every proxy that carries the same name as proxy"""
        self.proxies = [p for p in self.proxies if p.name != proxy.name]

    def open_socket(self, iface_name):
        raise NotImplementedError('to be implemented by derived class')

    def rcv_frame(self):
        raise NotImplementedError('to be implemented by derived class')

    def __del__(self):
        # open_socket() may have raised in __init__, in which case the
        # socket attribute was never set.
        sock = getattr(self, 'socket', None)
        if sock:
            sock.close()
            self.socket = None
            log.debug('socket-closed', iface=self.iface_name)

    def fileno(self):
        """Return the socket's file descriptor, so select() can watch us"""
        return self.socket.fileno()

    def _dispatch(self, proxy, frame):
        """Invoke the proxy's callback; a failing callback is only logged"""
        log.debug('calling-publisher', proxy=proxy.name, frame=hexify(frame))
        try:
            proxy.callback(proxy, frame)
        except Exception as e:
            log.exception('callback-error',
                          explanation='Callback failed while processing frame',
                          e=e)

    def recv(self):
        """Called on the select thread when a packet arrives"""
        try:
            frame = self.rcv_frame()
        except RuntimeError:
            # we observed this happens sometimes right after the socket was
            # attached to a newly created veth interface. So we log it, but
            # allow to continue.
            log.warn('afpacket-recv-error', code=-1)
            return

        if frame is None:
            # A derived rcv_frame() may come back empty handed; there is
            # nothing to count or dispatch in that case.
            return

        log.debug('frame-received', iface=self.iface_name, len=len(frame),
                  hex=hexify(frame))
        self.received += 1
        dispatched = False
        for proxy in self.proxies:
            if proxy.filter is None or proxy.filter(frame):
                log.debug('frame-dispatched')
                dispatched = True
                # callbacks must run on the reactor thread, not here
                reactor.callFromThread(self._dispatch, proxy, frame)

        if not dispatched:
            self.discarded += 1
            log.debug('frame-discarded')

    def send(self, frame):
        """
        Send a frame and log an error if it was not sent completely.
        :param frame: Raw frame to send
        :return: Number of bytes reported sent by send_frame()
        """
        log.debug('sending', len=len(frame), iface=self.iface_name)
        sent_bytes = self.send_frame(frame)
        if sent_bytes != len(frame):
            log.error('send-error', iface=self.iface_name,
                      wanted_to_send=len(frame), actually_sent=sent_bytes)
        return sent_bytes

    def send_frame(self, frame):
        """
        Write the frame to the socket. If the send fails with EINVAL and the
        frame is shorter than MIN_PKT_SIZE, it is zero-padded to that size
        and sent again.
        :param frame: Raw frame to send
        :return: Number of bytes of the original frame that were sent
        :raises socket.error: on any send failure not cured by padding
        """
        import errno
        try:
            return self.socket.send(frame)
        except socket.error as err:
            # Only an EINVAL on a short frame is worth a retry; anything
            # else (including EINVAL on a full size frame) is a real error
            # and must not be swallowed.
            if err.errno != errno.EINVAL or len(frame) >= self.MIN_PKT_SIZE:
                raise
        padding = b'\x00' * (self.MIN_PKT_SIZE - len(frame))
        sent = self.socket.send(frame + padding)
        # Do not count our own padding, otherwise send() would flag a
        # fully sent short frame as a length mismatch.
        return min(sent, len(frame))

    def up(self):
        """Bring the interface up (no-op on darwin); returns self"""
        if not sys.platform.startswith('darwin'):
            os.system('ip link set {} up'.format(self.iface_name))
        return self

    def down(self):
        """Bring the interface down (no-op on darwin); returns self"""
        if not sys.platform.startswith('darwin'):
            os.system('ip link set {} down'.format(self.iface_name))
        return self

    def statistics(self):
        """Return the (received, discarded) frame counters"""
        return self.received, self.discarded
217
218
class LinuxFrameIOPort(FrameIOPort):
    """FrameIOPort implementation on top of a Linux AF_PACKET raw socket"""

    def open_socket(self, iface_name):
        """
        Create a raw packet socket, bind it to the interface and switch the
        interface to promiscuous mode.
        :param iface_name: Name of the interface to attach to
        :return: The configured socket
        """
        s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
        try:
            afpacket.enable_auxdata(s)
            s.bind((self.iface_name, self.ETH_P_ALL))
            netutils.set_promisc(s, iface_name)
            s.settimeout(self.RCV_TIMEOUT)
        except Exception:
            # do not leak the descriptor if any setup step fails
            s.close()
            raise
        return s

    def rcv_frame(self):
        """Read one frame (up to RCV_SIZE_DEFAULT bytes) via afpacket"""
        return afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
231
232
class DarwinFrameIOPort(FrameIOPort):
    """
    FrameIOPort implementation for darwin: frames are received through a
    pcap handle and sent through a separate dnet ethernet handle.
    """

    def open_socket(self, iface_name):
        """
        Open the receive (pcap) handle and, as a side effect, the send
        handle stored in self.sout.
        :param iface_name: Name of the interface to attach to
        :return: The pcap handle used for receiving
        """
        sin = pcapdnet.open_pcap(iface_name, 1600, 1, 100)
        try:
            fcntl.ioctl(sin.fileno(), BIOCIMMEDIATE, struct.pack("I", 1))
        except Exception as e:
            # Setting BIOCIMMEDIATE is best effort: carry on without it,
            # but leave a trace instead of hiding the failure.
            log.warn('biocimmediate-failed', iface=iface_name, e=e)

        # need a different kind of socket for sending out
        self.sout = dnet.eth(iface_name)

        return sin

    def send_frame(self, frame):
        """Send the frame through the dedicated outbound handle"""
        return self.sout.send(frame)

    def rcv_frame(self):
        """
        Fetch the next packet from the pcap handle.
        :return: The raw packet, or None if the handle returned nothing
        """
        pkt = self.socket.next()
        if pkt is not None:
            # next() yields a (timestamp, packet) pair; drop the timestamp
            ts, pkt = pkt
        return pkt
255
256
# Pick the FrameIOPort implementation matching the platform we run on.
if sys.platform == 'darwin':
    _FrameIOPort = DarwinFrameIOPort
elif sys.platform.startswith('linux'):
    _FrameIOPort = LinuxFrameIOPort
else:
    # The raise aborts the import; nothing placed after it would ever run.
    raise Exception('Unsupported platform {}'.format(sys.platform))
264
265
class FrameIOPortProxy(object):
    """
    Per-user handle onto a FrameIOPort, so that one port can be shared by
    multiple users. Each proxy carries its own callback, optional filter
    and name.
    """

    def __init__(self, frame_io_port, callback, filter=None, name=None):
        # proxies without an explicit name get a random 12 hex digit one
        if name is None:
            name = uuid.uuid4().hex[:12]
        self.name = name
        self.filter = filter
        self.callback = callback
        self.frame_io_port = frame_io_port

    @property
    def iface_name(self):
        """Name of the interface of the underlying port"""
        port = self.frame_io_port
        return port.iface_name

    def get_iface_name(self):
        """Method flavor of the iface_name property"""
        return self.iface_name

    def send(self, frame):
        """Send the frame through the underlying port; returns its result"""
        port = self.frame_io_port
        return port.send(frame)

    def up(self):
        """Bring the underlying port up; returns this proxy"""
        port = self.frame_io_port
        port.up()
        return self

    def down(self):
        """Bring the underlying port down; returns this proxy"""
        port = self.frame_io_port
        port.down()
        return self
292
293
@implementer(IComponent)
class FrameIOManager(Thread):
    """
    Packet/Frame IO manager that can be used to send/receive raw frames
    on a set of network interfaces.

    The manager is a thread running a select loop over all open ports plus
    a waker descriptor; the waker is used by the other methods to interrupt
    select() whenever the set of ports changes or the manager is stopped.
    """
    def __init__(self):
        super(FrameIOManager, self).__init__()

        self.ports = {}  # iface_name -> FrameIOPort
        self.queue = {}  # iface_name -> TODO

        self.cvar = Condition()
        self.waker = _SelectWakerDescriptor()
        self.stopped = False
        self.ports_changed = False

    # ~~~~~~~~~~~ exposed methods callable from main thread ~~~~~~~~~~~~~~~~~~~

    def start(self):
        """
        Start the IO manager and its select loop thread
        :return: self, to allow call chaining
        """
        log.debug('starting')
        super(FrameIOManager, self).start()
        log.info('started')
        return self

    def stop(self):
        """
        Stop the IO manager and its thread with the select loop
        """
        log.debug('stopping')
        self.stopped = True
        # kick the thread out of select() so it notices the stop flag
        self.waker.notify()
        self.join()
        del self.ports
        log.info('stopped')

    def list_interfaces(self):
        """
        Return the interfaces listened on
        :return: Dict mapping interface name to its FrameIOPort object
        """
        return self.ports

    def open_port(self, iface_name, callback, filter=None, name=None):
        """
        Add a new interface and start receiving on it.
        :param iface_name: Name of the interface. Must be an existing Unix
        interface (eth0, en0, etc.)
        :param callback: Called on each received frame;
        signature: def callback(proxy, frame) where proxy is the
        FrameIOPortProxy instance the frame was dispatched to, frame is the
        actual frame received (as binary string)
        :param filter: An optional filter (predicate), with signature:
        def filter(frame). If provided, only frames for which filter evaluates
        to True will be forwarded to callback.
        :param name: Optional name for the proxy; a random one is generated
        if omitted.
        :return: FrameIOPortProxy instance.
        """

        port = self.ports.get(iface_name)
        if port is None:
            port = _FrameIOPort(iface_name)
            self.ports[iface_name] = port
            # need to exit select loop to reconstruct select fd lists
            self.ports_changed = True
            self.waker.notify()

        proxy = FrameIOPortProxy(port, callback, filter, name)
        port.add_proxy(proxy)

        return proxy

    def close_port(self, proxy):
        """
        Remove the proxy. If this is the last proxy on an interface, stop and
        remove the named interface as well
        :param proxy: FrameIOPortProxy reference
        :return: None
        """
        assert isinstance(proxy, FrameIOPortProxy)
        iface_name = proxy.get_iface_name()
        assert iface_name in self.ports, "iface_name {} unknown".format(iface_name)
        port = self.ports[iface_name]
        port.del_proxy(proxy)

        if not port.proxies:
            del self.ports[iface_name]
            # need to exit select loop to reconstruct select fd lists
            self.ports_changed = True
            self.waker.notify()

    def send(self, iface_name, frame):
        """
        Send frame on given interface
        :param iface_name: Name of previously registered interface
        :param frame: frame as string
        :return: number of bytes sent
        """
        return self.ports[iface_name].send(frame)

    # ~~~~~~~~~~~~~ Thread methods (running on non-main thread ~~~~~~~~~~~~~~~~

    def run(self):
        """
        Called on the alien thread, this is the core multi-port receive loop
        """

        log.debug('select-loop-started')

        # outer loop constructs sockets list for select
        while not self.stopped:
            # Clear the flag before taking the snapshot: a port change
            # racing with us is then either part of the snapshot or leaves
            # the flag set, forcing another rebuild. It is never lost.
            self.ports_changed = False
            # list() is needed where dict.values() returns a view
            sockets = [self.waker] + list(self.ports.values())
            empty = []

            # inner select loop
            while not self.stopped:
                try:
                    _in, _out, _err = select.select(sockets, empty, empty, 1)
                except Exception as e:
                    log.exception('frame-io-select-error', e=e)
                    break
                with self.cvar:
                    for port in _in:
                        if port is self.waker:
                            # drain the wake-up byte; flags are checked below
                            self.waker.wait()
                            continue
                        else:
                            port.recv()
                    self.cvar.notify_all()
                if self.ports_changed:
                    break  # break inner loop so we reconstruct sockets list

        log.debug('select-loop-exited')

    def del_interface(self, iface_name):
        """
        Delete interface for stopping
        :param iface_name: Name of a previously opened interface
        :raises KeyError: if the interface is not known
        """

        log.info('Delete interface')
        del self.ports[iface_name]
        # As in close_port(): make the select loop drop the removed port
        # from its fd list instead of keeping on polling it.
        self.ports_changed = True
        self.waker.notify()
        log.info('Interface(port) is deleted')
437 log.info('Interface(port) is deleted')