blob: a0633f69d750043ed601ec2777fb85b7fd0d0af3 [file] [log] [blame]
Zsolt Haraszti91350eb2016-11-05 15:33:53 -07001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Haraszti91350eb2016-11-05 15:33:53 -07003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18A module that can send and receive raw ethernet frames on a set of interfaces
19and it can manage a set of vlan interfaces on top of existing
20interfaces. Due to reliance on raw sockets, this module requires
21root access. Also, raw sockets are hard to deal with in Twisted (not
22directly supported) we need to run the receiver select loop on a dedicated
23thread.
24"""
Zsolt Haraszti89a27302016-12-08 16:53:06 -080025
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070026import os
27import socket
Zsolt Haraszti89a27302016-12-08 16:53:06 -080028import struct
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070029from pcapy import BPFProgram
30from threading import Thread, Condition
31
32import fcntl
33
34import select
35import structlog
Zsolt Haraszti89a27302016-12-08 16:53:06 -080036import sys
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070037
Zsolt Haraszti89a27302016-12-08 16:53:06 -080038from scapy.data import ETH_P_ALL
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070039from twisted.internet import reactor
Zsolt Haraszti89a27302016-12-08 16:53:06 -080040from zope.interface import implementer
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070041
Zsolt Haraszti89a27302016-12-08 16:53:06 -080042from voltha.registry import IComponent
43
44if sys.platform.startswith('linux'):
45 from common.frameio.third_party.oftest import afpacket, netutils
46elif sys.platform == 'darwin':
Zsolt Haraszti348d1932016-12-10 01:10:07 -080047 from scapy.arch import pcapdnet, BIOCIMMEDIATE, dnet
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070048
49log = structlog.get_logger()
50
51
52def hexify(buffer):
53 """
54 Return a hexadecimal string encoding of input buffer
55 """
Zsolt Harasztie3ece3c2016-12-19 23:32:38 -080056 return ''.join('%02x' % ord(c) for c in buffer)
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070057
58
59class _SelectWakerDescriptor(object):
60 """
61 A descriptor that can be mixed into a select loop to wake it up.
62 """
63 def __init__(self):
64 self.pipe_read, self.pipe_write = os.pipe()
65 fcntl.fcntl(self.pipe_write, fcntl.F_SETFL, os.O_NONBLOCK)
66
67 def __del__(self):
68 os.close(self.pipe_read)
69 os.close(self.pipe_write)
70
71 def fileno(self):
72 return self.pipe_read
73
74 def wait(self):
75 os.read(self.pipe_read, 1)
76
77 def notify(self):
78 """Trigger a select loop"""
79 os.write(self.pipe_write, '\x00')
80
81
82class BpfProgramFilter(object):
83 """
84 Convenience packet filter based on the well-tried Berkeley Packet Filter,
85 used by many well known open source tools such as pcap and tcpdump.
86 """
87 def __init__(self, program_string):
88 """
89 Create a filter using the BPF command syntax. To learn more,
90 consult 'man pcap-filter'.
91 :param program_string: The textual definition of the filter. Examples:
92 'vlan 1000'
93 'vlan 1000 and ip src host 10.10.10.10'
94 """
95 self.bpf = BPFProgram(program_string)
96
97 def __call__(self, frame):
98 """
99 Return 1 if frame passes filter.
100 :param frame: Raw frame provided as Python string
101 :return: 1 if frame satisfies filter, 0 otherwise.
102 """
103 return self.bpf.filter(frame)
104
105
106class FrameIOPort(object):
107 """
108 Represents a network interface which we can send/receive raw
109 Ethernet frames.
110 """
111
112 RCV_SIZE_DEFAULT = 4096
113 ETH_P_ALL = 0x03
114 RCV_TIMEOUT = 10000
115
116 def __init__(self, iface_name, callback, filter=None):
117 self.iface_name = iface_name
118 self.callback = callback
119 self.filter = filter
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800120 self.socket = self.open_socket(self.iface_name)
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700121 log.debug('socket-opened', fn=self.fileno(), iface=iface_name)
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700122 self.received = 0
123 self.discarded = 0
124
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800125 def open_sockets(self, iface_name):
126 raise NotImplementedError('to be implemented by derived class')
127
128 def rcv_frame(self):
129 raise NotImplementedError('to be implemented by derived class')
130
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700131 def __del__(self):
132 if self.socket:
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700133 self.socket.close()
134 self.socket = None
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800135 log.debug('socket-closed', iface=self.iface_name)
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700136
137 def fileno(self):
138 return self.socket.fileno()
139
140 def _dispatch(self, frame):
141 log.debug('calling-publisher', frame=hexify(frame))
Zsolt Haraszti80175202016-12-24 00:17:51 -0800142 try:
143 self.callback(self, frame)
144 except Exception, e:
145 log.exception('callback-error',
146 explanation='Callback failed while processing frame',
147 e=e)
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700148
149 def recv(self):
150 """Called on the select thread when a packet arrives"""
151 try:
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800152 frame = self.rcv_frame()
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700153 except RuntimeError, e:
154 # we observed this happens sometimes right after the socket was
155 # attached to a newly created veth interface. So we log it, but
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800156 # allow to continue.
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700157 log.warn('afpacket-recv-error', code=-1)
158 return
159
160 log.debug('frame-received', iface=self.iface_name, len=len(frame),
161 hex=hexify(frame))
162 self.received +=1
163 if self.filter is None or self.filter(frame):
164 log.debug('frame-dispatched')
165 reactor.callFromThread(self._dispatch, frame)
166 else:
167 self.discarded += 1
168 log.debug('frame-discarded')
169
170 def send(self, frame):
171 log.debug('sending', len=len(frame), iface=self.iface_name)
Zsolt Haraszti348d1932016-12-10 01:10:07 -0800172 sent_bytes = self.send_frame(frame)
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700173 if sent_bytes != len(frame):
174 log.error('send-error', iface=self.iface_name,
175 wanted_to_send=len(frame), actually_sent=sent_bytes)
176 return sent_bytes
177
Zsolt Haraszti348d1932016-12-10 01:10:07 -0800178 def send_frame(self, frame):
179 return self.socket.send(frame)
180
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700181 def up(self):
Zsolt Haraszti348d1932016-12-10 01:10:07 -0800182 if sys.platform.startswith('darwin'):
183 pass
184 else:
185 os.system('ip link set {} up'.format(self.iface_name))
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700186 return self
187
188 def down(self):
Zsolt Haraszti348d1932016-12-10 01:10:07 -0800189 if sys.platform.startswith('darwin'):
190 pass
191 else:
192 os.system('ip link set {} down'.format(self.iface_name))
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700193 return self
194
195 def statistics(self):
196 return self.received, self.discarded
197
198
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800199class LinuxFrameIOPort(FrameIOPort):
200
201 def open_socket(self, iface_name):
202 s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
203 afpacket.enable_auxdata(s)
204 s.bind((self.iface_name, self.ETH_P_ALL))
205 netutils.set_promisc(s, iface_name)
206 s.settimeout(self.RCV_TIMEOUT)
207 return s
208
209 def rcv_frame(self):
210 return afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
211
212
213class DarwinFrameIOPort(FrameIOPort):
214
215 def open_socket(self, iface_name):
Zsolt Haraszti348d1932016-12-10 01:10:07 -0800216 sin = pcapdnet.open_pcap(iface_name, 1600, 1, 100)
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800217 try:
Zsolt Haraszti348d1932016-12-10 01:10:07 -0800218 fcntl.ioctl(sin.fileno(), BIOCIMMEDIATE, struct.pack("I",1))
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800219 except:
220 pass
221
Zsolt Haraszti348d1932016-12-10 01:10:07 -0800222 # need a different kind of socket for sending out
223 self.sout = dnet.eth(iface_name)
224
225 return sin
226
227 def send_frame(self, frame):
228 return self.sout.send(frame)
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800229
230 def rcv_frame(self):
231 pkt = self.socket.next()
232 if pkt is not None:
233 ts, pkt = pkt
234 return pkt
235
236
237if sys.platform == 'darwin':
238 _FrameIOPort = DarwinFrameIOPort
239elif sys.platform.startswith('linux'):
240 _FrameIOPort = LinuxFrameIOPort
241else:
242 raise Exception('Unsupported platform {}'.format(sys.platform))
243 sys.exit(1)
244
245
246@implementer(IComponent)
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700247class FrameIOManager(Thread):
248 """
249 Packet/Frame IO manager that can be used to send/receive raw frames
250 on a set of network interfaces.
251 """
252 def __init__(self):
253 super(FrameIOManager, self).__init__()
254
255 self.ports = {} # iface_name -> ActiveFrameReceiver
256 self.queue = {} # iface_name -> TODO
257
258 self.cvar = Condition()
259 self.waker = _SelectWakerDescriptor()
260 self.stopped = False
261 self.ports_changed = False
262
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800263 # ~~~~~~~~~~~ exposed methods callable from main thread ~~~~~~~~~~~~~~~~~~~
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700264
265 def start(self):
266 """
267 Start the IO manager and its select loop thread
268 """
269 log.debug('starting')
270 super(FrameIOManager, self).start()
271 log.info('started')
272 return self
273
274 def stop(self):
275 """
276 Stop the IO manager and its thread with the select loop
277 """
278 log.debug('stopping')
279 self.stopped = True
280 self.waker.notify()
281 self.join()
282 del self.ports
283 log.info('stopped')
284
285 def list_interfaces(self):
286 """
287 Return list of interfaces listened on
288 :return: List of FrameIOPort objects
289 """
290 return self.ports
291
292 def add_interface(self, iface_name, callback, filter=None):
293 """
294 Add a new interface and start receiving on it.
295 :param iface_name: Name of the interface. Must be an existing Unix
296 interface (eth0, en0, etc.)
297 :param callback: Called on each received frame;
298 signature: def callback(port, frame) where port is the FrameIOPort
299 instance at which the frame was received, frame is the actual frame
300 received (as binay string)
301 :param filter: An optional filter (predicate), with signature:
302 def filter(frame). If provided, only frames for which filter evaluates
303 to True will be forwarded to callback.
304 :return: FrmaeIOPort instance.
305 """
306 """Add a new interface"""
307 assert iface_name not in self.ports
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800308 port = _FrameIOPort(iface_name, callback, filter)
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700309 self.ports[iface_name] = port
310 # need to exit select loop to reconstruct select fd lists
311 self.ports_changed = True
312 self.waker.notify()
313 return port
314
315 def del_interface(self, iface_name):
316 """
317 Stop and remove named interface
318 :param iface_name: Name of previoysly registered interface
319 :return: None
320 """
321 assert iface_name in self.ports
322 port = self.ports[iface_name]
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700323 del self.ports[iface_name]
324 # need to exit select loop to reconstruct select fd lists
325 self.ports_changed = True
326 self.waker.notify()
327
328 def send(self, iface_name, frame):
329 """
330 Send frame on given interface
331 :param iface_name: Name of previously registered interface
332 :param frame: frame as string
333 :return: number of bytes sent
334 """
335 return self.ports[iface_name].send(frame)
336
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800337 # ~~~~~~~~~~~~~ Thread methods (running on non-main thread ~~~~~~~~~~~~~~~~
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700338
339 def run(self):
340 """
341 Called on the alien thread, this is the core multi-port receive loop
342 """
343
344 log.debug('select-loop-started')
345
346 # outer loop constructs sockets list for select
347 while not self.stopped:
348 sockets = [self.waker] + self.ports.values()
349 self.ports_changed = False
350 empty = []
351 # inner select loop
352
353 while not self.stopped:
354 try:
355 _in, _out, _err = select.select(sockets, empty, empty, 1)
356 except Exception, e:
357 log.exception('frame-io-select-error', e=e)
358 break
359 with self.cvar:
360 for port in _in:
361 if port is self.waker:
362 self.waker.wait()
363 continue
364 else:
365 port.recv()
366 self.cvar.notify_all()
367 if self.ports_changed:
368 break # break inner loop so we reconstruct sockets list
369
370 log.debug('select-loop-exited')