blob: 544d462e9d50fc34640b67b805df9655ff5a59e0 [file] [log] [blame]
Zsolt Haraszti91350eb2016-11-05 15:33:53 -07001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Haraszti91350eb2016-11-05 15:33:53 -07003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18A module that can send and receive raw ethernet frames on a set of interfaces
19and it can manage a set of vlan interfaces on top of existing
20interfaces. Due to reliance on raw sockets, this module requires
21root access. Also, raw sockets are hard to deal with in Twisted (not
22directly supported) we need to run the receiver select loop on a dedicated
23thread.
24"""
Zsolt Haraszti89a27302016-12-08 16:53:06 -080025
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070026import os
27import socket
Zsolt Haraszti89a27302016-12-08 16:53:06 -080028import struct
Zsolt Haraszti3e6f0892017-01-19 11:51:40 -080029import uuid
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070030from pcapy import BPFProgram
31from threading import Thread, Condition
32
33import fcntl
34
35import select
36import structlog
Zsolt Haraszti89a27302016-12-08 16:53:06 -080037import sys
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070038
Zsolt Haraszti89a27302016-12-08 16:53:06 -080039from scapy.data import ETH_P_ALL
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070040from twisted.internet import reactor
Zsolt Haraszti89a27302016-12-08 16:53:06 -080041from zope.interface import implementer
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070042
Zsolt Haraszti89a27302016-12-08 16:53:06 -080043from voltha.registry import IComponent
44
45if sys.platform.startswith('linux'):
46 from common.frameio.third_party.oftest import afpacket, netutils
47elif sys.platform == 'darwin':
Zsolt Haraszti348d1932016-12-10 01:10:07 -080048 from scapy.arch import pcapdnet, BIOCIMMEDIATE, dnet
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070049
50log = structlog.get_logger()
51
52
53def hexify(buffer):
54 """
55 Return a hexadecimal string encoding of input buffer
56 """
Zsolt Harasztie3ece3c2016-12-19 23:32:38 -080057 return ''.join('%02x' % ord(c) for c in buffer)
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070058
59
60class _SelectWakerDescriptor(object):
61 """
62 A descriptor that can be mixed into a select loop to wake it up.
63 """
64 def __init__(self):
65 self.pipe_read, self.pipe_write = os.pipe()
66 fcntl.fcntl(self.pipe_write, fcntl.F_SETFL, os.O_NONBLOCK)
67
68 def __del__(self):
69 os.close(self.pipe_read)
70 os.close(self.pipe_write)
71
72 def fileno(self):
73 return self.pipe_read
74
75 def wait(self):
76 os.read(self.pipe_read, 1)
77
78 def notify(self):
79 """Trigger a select loop"""
80 os.write(self.pipe_write, '\x00')
81
82
83class BpfProgramFilter(object):
84 """
85 Convenience packet filter based on the well-tried Berkeley Packet Filter,
86 used by many well known open source tools such as pcap and tcpdump.
87 """
88 def __init__(self, program_string):
89 """
90 Create a filter using the BPF command syntax. To learn more,
91 consult 'man pcap-filter'.
92 :param program_string: The textual definition of the filter. Examples:
93 'vlan 1000'
94 'vlan 1000 and ip src host 10.10.10.10'
95 """
96 self.bpf = BPFProgram(program_string)
97
98 def __call__(self, frame):
99 """
100 Return 1 if frame passes filter.
101 :param frame: Raw frame provided as Python string
102 :return: 1 if frame satisfies filter, 0 otherwise.
103 """
104 return self.bpf.filter(frame)
105
106
107class FrameIOPort(object):
108 """
109 Represents a network interface which we can send/receive raw
110 Ethernet frames.
111 """
112
113 RCV_SIZE_DEFAULT = 4096
114 ETH_P_ALL = 0x03
115 RCV_TIMEOUT = 10000
116
Zsolt Haraszti3e6f0892017-01-19 11:51:40 -0800117 def __init__(self, iface_name):
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700118 self.iface_name = iface_name
Zsolt Haraszti3e6f0892017-01-19 11:51:40 -0800119 self.proxies = []
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800120 self.socket = self.open_socket(self.iface_name)
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700121 log.debug('socket-opened', fn=self.fileno(), iface=iface_name)
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700122 self.received = 0
123 self.discarded = 0
124
Zsolt Haraszti3e6f0892017-01-19 11:51:40 -0800125 def add_proxy(self, proxy):
126 self.proxies.append(proxy)
127
128 def del_proxy(self, proxy):
129 self.proxies = [p for p in self.proxies if p.name != proxy.name]
130
131 def open_socket(self, iface_name):
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800132 raise NotImplementedError('to be implemented by derived class')
133
134 def rcv_frame(self):
135 raise NotImplementedError('to be implemented by derived class')
136
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700137 def __del__(self):
138 if self.socket:
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700139 self.socket.close()
140 self.socket = None
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800141 log.debug('socket-closed', iface=self.iface_name)
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700142
143 def fileno(self):
144 return self.socket.fileno()
145
Zsolt Haraszti3e6f0892017-01-19 11:51:40 -0800146 def _dispatch(self, proxy, frame):
147 log.debug('calling-publisher', proxy=proxy.name, frame=hexify(frame))
Zsolt Haraszti80175202016-12-24 00:17:51 -0800148 try:
Zsolt Haraszti3e6f0892017-01-19 11:51:40 -0800149 proxy.callback(proxy, frame)
150 except Exception as e:
Zsolt Haraszti80175202016-12-24 00:17:51 -0800151 log.exception('callback-error',
152 explanation='Callback failed while processing frame',
153 e=e)
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700154
155 def recv(self):
156 """Called on the select thread when a packet arrives"""
157 try:
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800158 frame = self.rcv_frame()
Zsolt Haraszti3e6f0892017-01-19 11:51:40 -0800159 except RuntimeError as e:
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700160 # we observed this happens sometimes right after the socket was
161 # attached to a newly created veth interface. So we log it, but
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800162 # allow to continue.
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700163 log.warn('afpacket-recv-error', code=-1)
164 return
165
166 log.debug('frame-received', iface=self.iface_name, len=len(frame),
167 hex=hexify(frame))
168 self.received +=1
Zsolt Haraszti3e6f0892017-01-19 11:51:40 -0800169 dispatched = False
170 for proxy in self.proxies:
171 if proxy.filter is None or proxy.filter(frame):
172 log.debug('frame-dispatched')
173 dispatched = True
174 reactor.callFromThread(self._dispatch, proxy, frame)
175
176 if not dispatched:
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700177 self.discarded += 1
178 log.debug('frame-discarded')
179
180 def send(self, frame):
181 log.debug('sending', len=len(frame), iface=self.iface_name)
Zsolt Haraszti348d1932016-12-10 01:10:07 -0800182 sent_bytes = self.send_frame(frame)
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700183 if sent_bytes != len(frame):
184 log.error('send-error', iface=self.iface_name,
185 wanted_to_send=len(frame), actually_sent=sent_bytes)
186 return sent_bytes
187
Zsolt Haraszti348d1932016-12-10 01:10:07 -0800188 def send_frame(self, frame):
189 return self.socket.send(frame)
190
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700191 def up(self):
Zsolt Haraszti348d1932016-12-10 01:10:07 -0800192 if sys.platform.startswith('darwin'):
193 pass
194 else:
195 os.system('ip link set {} up'.format(self.iface_name))
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700196 return self
197
198 def down(self):
Zsolt Haraszti348d1932016-12-10 01:10:07 -0800199 if sys.platform.startswith('darwin'):
200 pass
201 else:
202 os.system('ip link set {} down'.format(self.iface_name))
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700203 return self
204
205 def statistics(self):
206 return self.received, self.discarded
207
208
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800209class LinuxFrameIOPort(FrameIOPort):
210
211 def open_socket(self, iface_name):
212 s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
213 afpacket.enable_auxdata(s)
214 s.bind((self.iface_name, self.ETH_P_ALL))
215 netutils.set_promisc(s, iface_name)
216 s.settimeout(self.RCV_TIMEOUT)
217 return s
218
219 def rcv_frame(self):
220 return afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
221
222
223class DarwinFrameIOPort(FrameIOPort):
224
225 def open_socket(self, iface_name):
Zsolt Haraszti348d1932016-12-10 01:10:07 -0800226 sin = pcapdnet.open_pcap(iface_name, 1600, 1, 100)
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800227 try:
Zsolt Haraszti348d1932016-12-10 01:10:07 -0800228 fcntl.ioctl(sin.fileno(), BIOCIMMEDIATE, struct.pack("I",1))
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800229 except:
230 pass
231
Zsolt Haraszti348d1932016-12-10 01:10:07 -0800232 # need a different kind of socket for sending out
233 self.sout = dnet.eth(iface_name)
234
235 return sin
236
237 def send_frame(self, frame):
238 return self.sout.send(frame)
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800239
240 def rcv_frame(self):
241 pkt = self.socket.next()
242 if pkt is not None:
243 ts, pkt = pkt
244 return pkt
245
246
247if sys.platform == 'darwin':
248 _FrameIOPort = DarwinFrameIOPort
249elif sys.platform.startswith('linux'):
250 _FrameIOPort = LinuxFrameIOPort
251else:
252 raise Exception('Unsupported platform {}'.format(sys.platform))
253 sys.exit(1)
254
255
Zsolt Haraszti3e6f0892017-01-19 11:51:40 -0800256class FrameIOPortProxy(object):
257 """Makes FrameIOPort sharable between multiple users"""
258
259 def __init__(self, frame_io_port, callback, filter=None, name=None):
260 self.frame_io_port = frame_io_port
261 self.callback = callback
262 self.filter = filter
263 self.name = uuid.uuid4().hex[:12] if name is None else name
264
265 @property
266 def iface_name(self):
267 return self.frame_io_port.iface_name
268
269 def get_iface_name(self):
270 return self.frame_io_port.iface_name
271
272 def send(self, frame):
273 return self.frame_io_port.send(frame)
274
275 def up(self):
276 self.frame_io_port.up()
277 return self
278
279 def down(self):
280 self.frame_io_port.down()
281 return self
282
283
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800284@implementer(IComponent)
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700285class FrameIOManager(Thread):
286 """
287 Packet/Frame IO manager that can be used to send/receive raw frames
288 on a set of network interfaces.
289 """
290 def __init__(self):
291 super(FrameIOManager, self).__init__()
292
293 self.ports = {} # iface_name -> ActiveFrameReceiver
294 self.queue = {} # iface_name -> TODO
295
296 self.cvar = Condition()
297 self.waker = _SelectWakerDescriptor()
298 self.stopped = False
299 self.ports_changed = False
300
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800301 # ~~~~~~~~~~~ exposed methods callable from main thread ~~~~~~~~~~~~~~~~~~~
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700302
303 def start(self):
304 """
305 Start the IO manager and its select loop thread
306 """
307 log.debug('starting')
308 super(FrameIOManager, self).start()
309 log.info('started')
310 return self
311
312 def stop(self):
313 """
314 Stop the IO manager and its thread with the select loop
315 """
316 log.debug('stopping')
317 self.stopped = True
318 self.waker.notify()
319 self.join()
320 del self.ports
321 log.info('stopped')
322
323 def list_interfaces(self):
324 """
325 Return list of interfaces listened on
326 :return: List of FrameIOPort objects
327 """
328 return self.ports
329
Zsolt Haraszti3e6f0892017-01-19 11:51:40 -0800330 def open_port(self, iface_name, callback, filter=None, name=None):
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700331 """
332 Add a new interface and start receiving on it.
333 :param iface_name: Name of the interface. Must be an existing Unix
334 interface (eth0, en0, etc.)
335 :param callback: Called on each received frame;
336 signature: def callback(port, frame) where port is the FrameIOPort
337 instance at which the frame was received, frame is the actual frame
338 received (as binay string)
339 :param filter: An optional filter (predicate), with signature:
340 def filter(frame). If provided, only frames for which filter evaluates
341 to True will be forwarded to callback.
Zsolt Haraszti3e6f0892017-01-19 11:51:40 -0800342 :return: FrmaeIOPortProxy instance.
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700343 """
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700344
Zsolt Haraszti3e6f0892017-01-19 11:51:40 -0800345 port = self.ports.get(iface_name)
346 if port is None:
347 port = _FrameIOPort(iface_name)
348 self.ports[iface_name] = port
349 self.ports_changed = True
350 self.waker.notify()
351
352 proxy = FrameIOPortProxy(port, callback, filter, name)
353 port.add_proxy(proxy)
354
355 return proxy
356
357 def close_port(self, proxy):
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700358 """
Zsolt Haraszti3e6f0892017-01-19 11:51:40 -0800359 Remove the proxy. If this is the last proxy on an interface, stop and
360 remove the named interface as well
361 :param proxy: FrameIOPortProxy reference
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700362 :return: None
363 """
Zsolt Haraszti3e6f0892017-01-19 11:51:40 -0800364 assert isinstance(proxy, FrameIOPortProxy)
365 iface_name = proxy.get_iface_name()
366 assert iface_name in self.ports, "iface_name {} unknown".format(iface_name)
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700367 port = self.ports[iface_name]
Zsolt Haraszti3e6f0892017-01-19 11:51:40 -0800368 port.del_proxy(proxy)
369
370 if not port.proxies:
371 del self.ports[iface_name]
372 # need to exit select loop to reconstruct select fd lists
373 self.ports_changed = True
374 self.waker.notify()
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700375
376 def send(self, iface_name, frame):
377 """
378 Send frame on given interface
379 :param iface_name: Name of previously registered interface
380 :param frame: frame as string
381 :return: number of bytes sent
382 """
383 return self.ports[iface_name].send(frame)
384
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800385 # ~~~~~~~~~~~~~ Thread methods (running on non-main thread ~~~~~~~~~~~~~~~~
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700386
387 def run(self):
388 """
389 Called on the alien thread, this is the core multi-port receive loop
390 """
391
392 log.debug('select-loop-started')
393
394 # outer loop constructs sockets list for select
395 while not self.stopped:
396 sockets = [self.waker] + self.ports.values()
397 self.ports_changed = False
398 empty = []
399 # inner select loop
400
401 while not self.stopped:
402 try:
403 _in, _out, _err = select.select(sockets, empty, empty, 1)
Zsolt Haraszti3e6f0892017-01-19 11:51:40 -0800404 except Exception as e:
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700405 log.exception('frame-io-select-error', e=e)
406 break
407 with self.cvar:
408 for port in _in:
409 if port is self.waker:
410 self.waker.wait()
411 continue
412 else:
413 port.recv()
414 self.cvar.notify_all()
415 if self.ports_changed:
416 break # break inner loop so we reconstruct sockets list
417
418 log.debug('select-loop-exited')
Rouzbahan Rashidi-Tabrizic35866b2017-02-23 14:57:58 -0500419
420 def del_interface(self, iface_name):
421 """
422 Delete interface for stopping
423 """
424
425 log.info('Delete interface')
426 del self.ports[iface_name]
427 log.info('Interface(port) is deleted')