blob: 2f68ef86792d8c71f435a51bbc5d3e769158573a [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
A module that can send and receive raw Ethernet frames on a set of interfaces
and can manage a set of VLAN interfaces on top of existing interfaces. Because
it relies on raw sockets, this module requires root access. Also, since raw
sockets are hard to deal with in Twisted (they are not directly supported),
the receiver select loop runs on a dedicated thread.
24"""
25
26import os
27import socket
28import struct
29import uuid
30from pcapy import BPFProgram
31from threading import Thread, Condition
32
33import fcntl
34
35import select
36import structlog
37import sys
38
39from scapy.data import ETH_P_ALL
40from twisted.internet import reactor
41from zope.interface import implementer
42
43from adapters.common.utils.registry import IComponent
44
45if sys.platform.startswith('linux'):
46 from adapters.common.frameio.third_party.oftest import afpacket, netutils
47elif sys.platform == 'darwin':
48 from scapy.arch import pcapdnet, BIOCIMMEDIATE, dnet
49
50log = structlog.get_logger()
51
52
def hexify(buffer):
    """
    Return a hexadecimal string encoding of input buffer.

    :param buffer: Data to encode. May be a byte string (``str`` on
        Python 2, ``bytes`` or ``bytearray`` on Python 3) or a text string,
        in which case each character contributes its code point.
    :return: Lower-case hex string with at least two digits per element.
    """
    if isinstance(buffer, (bytes, bytearray)):
        # Iterating bytes yields 1-char strings on Python 2 but integers on
        # Python 3; bytearray yields integers on both, so go through it.
        return ''.join('%02x' % octet for octet in bytearray(buffer))
    return ''.join('%02x' % ord(c) for c in buffer)
58
59
class _SelectWakerDescriptor(object):
    """
    A descriptor that can be mixed into a select loop to wake it up.

    Internally this is an OS pipe: ``fileno()`` exposes the read end so the
    object can be passed to ``select.select``, ``notify()`` writes one byte
    to the write end, and ``wait()`` consumes one byte from the read end.
    """
    def __init__(self):
        self.pipe_read, self.pipe_write = os.pipe()
        # The write end is non-blocking so notify() never stalls its caller.
        fcntl.fcntl(self.pipe_write, fcntl.F_SETFL, os.O_NONBLOCK)

    def __del__(self):
        # __init__ may have failed before both ends were created, so only
        # close what actually exists.
        for attr in ('pipe_read', 'pipe_write'):
            fd = getattr(self, attr, None)
            if fd is None:
                continue
            try:
                os.close(fd)
            except OSError:
                # Nothing useful can be done about a failed close while the
                # object is being finalized.
                pass
            setattr(self, attr, None)

    def fileno(self):
        """Return the read end of the pipe, for use by select()."""
        return self.pipe_read

    def wait(self):
        """Consume one pending wake-up byte from the pipe."""
        os.read(self.pipe_read, 1)

    def notify(self):
        """Trigger a select loop"""
        import errno  # local import: the module header does not import it

        try:
            # A bytes literal is required on Python 3 and is a plain str on
            # Python 2.
            os.write(self.pipe_write, b'\x00')
        except OSError as e:
            # A full pipe means wake-ups are already pending, so the select
            # loop will wake anyway; any other failure is a real error.
            if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                raise
81
82
class BpfProgramFilter(object):
    """
    Frame predicate built on a Berkeley Packet Filter program, the same
    filter language used by tools such as pcap and tcpdump.
    """
    def __init__(self, program_string):
        """
        Build the filter from a BPF expression; see 'man pcap-filter' for
        the syntax.
        :param program_string: The textual definition of the filter. Examples:
            'vlan 1000'
            'vlan 1000 and ip src host 10.10.10.10'
        """
        compiled_program = BPFProgram(program_string)
        self.bpf = compiled_program

    def __call__(self, frame):
        """
        Evaluate the filter against one frame.
        :param frame: Raw frame provided as Python string
        :return: Result of ``BPFProgram.filter(frame)``; expected to be 1 if
            the frame satisfies the filter and 0 otherwise.
        """
        verdict = self.bpf.filter(frame)
        return verdict
105
106
class FrameIOPort(object):
    """
    Represents a network interface which we can send/receive raw
    Ethernet frames.

    This is the platform-independent base: derived classes must implement
    ``open_socket`` and ``rcv_frame`` and may override ``send_frame``.
    Received frames are offered to every registered proxy whose filter
    accepts them.
    """

    # Receive size handed to the platform receive call by derived classes.
    RCV_SIZE_DEFAULT = 4096
    # Protocol value used by derived classes when binding the raw socket.
    ETH_P_ALL = 0x03
    # Socket timeout applied by derived classes via settimeout().
    RCV_TIMEOUT = 10000
    # Frames shorter than this are zero-padded when a send fails with EINVAL.
    MIN_PKT_SIZE = 60

    def __init__(self, iface_name):
        """
        Open the underlying socket for the given interface.
        :param iface_name: Name of the network interface to attach to.
        """
        self.iface_name = iface_name
        self.proxies = []
        self.socket = self.open_socket(self.iface_name)
        log.debug('socket-opened', fn=self.fileno(), iface=iface_name)
        self.received = 0
        self.discarded = 0

    def add_proxy(self, proxy):
        """Register a proxy so it is offered received frames."""
        self.proxies.append(proxy)

    def del_proxy(self, proxy):
        """Unregister every proxy that has the same name as ``proxy``."""
        self.proxies = [p for p in self.proxies if p.name != proxy.name]

    def open_socket(self, iface_name):
        """Create and return the receive socket; platform specific."""
        raise NotImplementedError('to be implemented by derived class')

    def rcv_frame(self):
        """Read and return one frame from the socket; platform specific."""
        raise NotImplementedError('to be implemented by derived class')

    def __del__(self):
        # open_socket() may have raised inside __init__, in which case the
        # attribute was never assigned.
        sock = getattr(self, 'socket', None)
        if sock:
            sock.close()
            self.socket = None
            log.debug('socket-closed', iface=self.iface_name)

    def fileno(self):
        """Return the socket's file descriptor, for use by select()."""
        return self.socket.fileno()

    def _dispatch(self, proxy, frame):
        """Invoke the proxy callback, logging (not propagating) failures."""
        log.debug('calling-publisher', proxy=proxy.name, frame=hexify(frame))
        try:
            proxy.callback(proxy, frame)
        except Exception as e:
            log.exception('callback-error',
                          explanation='Callback failed while processing frame',
                          e=e)

    def recv(self):
        """Called on the select thread when a packet arrives"""
        try:
            frame = self.rcv_frame()
        except RuntimeError:
            # we observed this happens sometimes right after the socket was
            # attached to a newly created veth interface. So we log it, but
            # allow to continue.
            log.warn('afpacket-recv-error', code=-1)
            return

        if frame is None:
            # A rcv_frame implementation may return None when nothing was
            # read; there is nothing to count or dispatch in that case.
            return

        log.debug('frame-received', iface=self.iface_name, len=len(frame),
                  hex=hexify(frame))
        self.received += 1
        dispatched = False
        for proxy in self.proxies:
            if proxy.filter is None or proxy.filter(frame):
                log.debug('frame-dispatched')
                dispatched = True
                # Callbacks run on the reactor thread, not the select thread.
                reactor.callFromThread(self._dispatch, proxy, frame)

        if not dispatched:
            self.discarded += 1
            log.debug('frame-discarded')

    def send(self, frame):
        """
        Send a frame and log an error if the sent length differs.
        :param frame: Raw frame to transmit.
        :return: Value returned by ``send_frame``.
        """
        log.debug('sending', len=len(frame), iface=self.iface_name)
        sent_bytes = self.send_frame(frame)
        if sent_bytes != len(frame):
            log.error('send-error', iface=self.iface_name,
                      wanted_to_send=len(frame), actually_sent=sent_bytes)
        return sent_bytes

    def send_frame(self, frame):
        """
        Write the frame to the socket, padding short frames when needed.

        If the send fails with EINVAL and the frame is shorter than
        ``MIN_PKT_SIZE``, the frame is zero-padded to that size and sent
        once more. Every other socket error is re-raised, including EINVAL
        for a frame that is already long enough.
        :param frame: Raw frame to transmit.
        :return: Number of bytes reported sent by the socket.
        :raises socket.error: If the send fails and padding does not apply.
        """
        import errno  # local import: the module header does not import it

        try:
            return self.socket.send(frame)
        except socket.error as err:
            too_short = len(frame) < self.MIN_PKT_SIZE
            if err.errno != errno.EINVAL or not too_short:
                raise
            # A bytes literal keeps this valid for Python 3 byte frames and
            # is a plain str on Python 2.
            padding = b'\x00' * (self.MIN_PKT_SIZE - len(frame))
            return self.socket.send(frame + padding)

    def up(self):
        """Bring the interface up (no-op on darwin); returns self."""
        if not sys.platform.startswith('darwin'):
            os.system('ip link set {} up'.format(self.iface_name))
        return self

    def down(self):
        """Bring the interface down (no-op on darwin); returns self."""
        if not sys.platform.startswith('darwin'):
            os.system('ip link set {} down'.format(self.iface_name))
        return self

    def statistics(self):
        """Return the tuple (frames received, frames discarded)."""
        return self.received, self.discarded
217
218
class LinuxFrameIOPort(FrameIOPort):
    """FrameIOPort implementation backed by a Linux AF_PACKET raw socket."""

    def open_socket(self, iface_name):
        """
        Create a raw packet socket bound to the interface, with auxdata and
        promiscuous mode enabled and the receive timeout applied.
        :param iface_name: Name of the interface to attach to.
        :return: The configured socket.
        """
        raw_sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
        afpacket.enable_auxdata(raw_sock)
        bind_address = (self.iface_name, self.ETH_P_ALL)
        raw_sock.bind(bind_address)
        netutils.set_promisc(raw_sock, iface_name)
        raw_sock.settimeout(self.RCV_TIMEOUT)
        return raw_sock

    def rcv_frame(self):
        """Read one frame from the socket through the afpacket helper."""
        max_bytes = self.RCV_SIZE_DEFAULT
        return afpacket.recv(self.socket, max_bytes)
231
232
class DarwinFrameIOPort(FrameIOPort):
    """
    FrameIOPort implementation for macOS: receives through a pcap handle
    and sends through a separate dnet Ethernet handle.
    """

    def open_socket(self, iface_name):
        """
        Open the pcap receive handle and the dnet send handle.
        :param iface_name: Name of the interface to attach to.
        :return: The pcap handle used for receiving.
        """
        # NOTE(review): arguments are presumably snaplen, promiscuous flag
        # and read timeout - confirm against scapy's pcapdnet.open_pcap.
        sin = pcapdnet.open_pcap(iface_name, 1600, 1, 100)
        try:
            fcntl.ioctl(sin.fileno(), BIOCIMMEDIATE, struct.pack("I", 1))
        except Exception as e:
            # Best effort: carry on without the ioctl, but do not swallow
            # KeyboardInterrupt/SystemExit as a bare except would.
            log.debug('biocimmediate-not-set', iface=iface_name, e=e)

        # need a different kind of socket for sending out
        self.sout = dnet.eth(iface_name)

        return sin

    def send_frame(self, frame):
        """Send the frame through the dnet handle."""
        return self.sout.send(frame)

    def rcv_frame(self):
        """
        Fetch the next packet from the pcap handle.
        :return: The packet payload, or None when the handle returned None.
        """
        pkt = self.socket.next()
        if pkt is not None:
            # next() yields a (timestamp, payload) pair; keep the payload.
            ts, pkt = pkt
        return pkt
255
256
# Select the port implementation matching the running platform.
if sys.platform == 'darwin':
    _FrameIOPort = DarwinFrameIOPort
elif sys.platform.startswith('linux'):
    _FrameIOPort = LinuxFrameIOPort
else:
    # Fail at import time; the raise already aborts, so no exit call follows.
    raise Exception('Unsupported platform {}'.format(sys.platform))
264
265
class FrameIOPortProxy(object):
    """
    Per-user handle on a FrameIOPort, so that several users can share the
    same port, each with its own callback, filter and name.
    """

    def __init__(self, frame_io_port, callback, filter=None, name=None):
        if name is None:
            # No explicit name: use the first 12 hex digits of a random UUID.
            name = uuid.uuid4().hex[:12]
        self.name = name
        self.filter = filter
        self.callback = callback
        self.frame_io_port = frame_io_port

    @property
    def iface_name(self):
        """Interface name of the port behind this proxy."""
        underlying = self.frame_io_port
        return underlying.iface_name

    def get_iface_name(self):
        """Method form of the ``iface_name`` property."""
        underlying = self.frame_io_port
        return underlying.iface_name

    def send(self, frame):
        """Send the frame through the port and return the port's result."""
        result = self.frame_io_port.send(frame)
        return result

    def up(self):
        """Bring the port up; returns this proxy."""
        underlying = self.frame_io_port
        underlying.up()
        return self

    def down(self):
        """Bring the port down; returns this proxy."""
        underlying = self.frame_io_port
        underlying.down()
        return self
292
293
@implementer(IComponent)
class FrameIOManager(Thread):
    """
    Packet/Frame IO manager that can be used to send/receive raw frames
    on a set of network interfaces.

    The manager is itself the thread that runs the select loop. Methods in
    the first section change the set of ports and wake the loop through the
    waker descriptor so that it rebuilds its descriptor list.
    """
    def __init__(self):
        super(FrameIOManager, self).__init__()

        self.ports = {}  # iface_name -> FrameIOPort
        self.queue = {}  # iface_name -> TODO

        self.cvar = Condition()
        self.waker = _SelectWakerDescriptor()
        self.stopped = False
        self.ports_changed = False

    # ~~~~~~~~~~~ exposed methods callable from main thread ~~~~~~~~~~~~~~~~~~~

    def start(self):
        """
        Start the IO manager and its select loop thread
        :return: self, so calls can be chained
        """
        log.debug('starting')
        super(FrameIOManager, self).start()
        log.info('started')
        return self

    def stop(self):
        """
        Stop the IO manager and its thread with the select loop
        """
        log.debug('stopping')
        self.stopped = True
        # Wake the loop so it notices the stop flag without waiting for the
        # select timeout.
        self.waker.notify()
        self.join()
        del self.ports
        log.info('stopped')

    def list_interfaces(self):
        """
        Return the interfaces listened on
        :return: Dict mapping interface name to its FrameIOPort object
        """
        return self.ports

    def open_port(self, iface_name, callback, filter=None, name=None):
        """
        Add a new interface and start receiving on it.
        :param iface_name: Name of the interface. Must be an existing Unix
        interface (eth0, en0, etc.)
        :param callback: Called on each received frame;
        signature: def callback(proxy, frame) where proxy is the
        FrameIOPortProxy returned by this call and frame is the actual frame
        received (as binary string)
        :param filter: An optional filter (predicate), with signature:
        def filter(frame). If provided, only frames for which filter evaluates
        to True will be forwarded to callback.
        :param name: Optional proxy name; a random one is generated if None.
        :return: FrameIOPortProxy instance.
        """

        port = self.ports.get(iface_name)
        if port is None:
            port = _FrameIOPort(iface_name)
            self.ports[iface_name] = port
            # need to exit select loop to reconstruct select fd lists
            self.ports_changed = True
            self.waker.notify()

        proxy = FrameIOPortProxy(port, callback, filter, name)
        port.add_proxy(proxy)

        return proxy

    def close_port(self, proxy):
        """
        Remove the proxy. If this is the last proxy on an interface, stop and
        remove the named interface as well
        :param proxy: FrameIOPortProxy reference
        :return: None
        """
        assert isinstance(proxy, FrameIOPortProxy)
        iface_name = proxy.get_iface_name()
        assert iface_name in self.ports, "iface_name {} unknown".format(iface_name)
        port = self.ports[iface_name]
        port.del_proxy(proxy)

        if not port.proxies:
            del self.ports[iface_name]
            # need to exit select loop to reconstruct select fd lists
            self.ports_changed = True
            self.waker.notify()

    def send(self, iface_name, frame):
        """
        Send frame on given interface
        :param iface_name: Name of previously registered interface
        :param frame: frame as string
        :return: number of bytes sent
        """
        return self.ports[iface_name].send(frame)

    # ~~~~~~~~~~~~ Thread methods (running on non-main thread) ~~~~~~~~~~~~~~~~

    def run(self):
        """
        Called on the alien thread, this is the core multi-port receive loop
        """

        log.debug('select-loop-started')

        # outer loop constructs sockets list for select
        while not self.stopped:
            # Clear the flag BEFORE taking the snapshot: a port change that
            # lands in between is then either in the snapshot or leaves the
            # flag set, so it can never be lost.
            self.ports_changed = False
            # list() is needed because dict.values() is a view on Python 3.
            sockets = [self.waker] + list(self.ports.values())
            empty = []
            # inner select loop

            while not self.stopped:
                try:
                    _in, _out, _err = select.select(sockets, empty, empty, 1)
                except Exception as e:
                    log.exception('frame-io-select-error', e=e)
                    break
                with self.cvar:
                    for port in _in:
                        if port is self.waker:
                            # drain the wake-up byte; nothing to receive
                            self.waker.wait()
                            continue
                        else:
                            port.recv()
                    self.cvar.notify_all()
                if self.ports_changed:
                    break  # break inner loop so we reconstruct sockets list

        log.debug('select-loop-exited')

    def del_interface(self, iface_name):
        """
        Delete interface for stopping
        :param iface_name: Name of a currently registered interface
        :raises KeyError: If the interface is not registered
        """

        log.info('Delete interface')
        del self.ports[iface_name]
        # As in close_port: make the select loop rebuild its descriptor list
        # so it stops polling the removed port.
        self.ports_changed = True
        self.waker.notify()
        log.info('Interface(port) is deleted')
437 log.info('Interface(port) is deleted')