blob: 27aed12d48d1c36e46554f8789df1148cf1b6d97 [file] [log] [blame]
Zsolt Haraszti91350eb2016-11-05 15:33:53 -07001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
"""
A module that can send and receive raw Ethernet frames on a set of
interfaces, and can manage a set of VLAN interfaces on top of existing
interfaces. Due to its reliance on raw sockets, this module requires
root access. Also, because raw sockets are hard to deal with in Twisted
(they are not directly supported), we need to run the receiver select
loop on a dedicated thread.
"""
Zsolt Haraszti89a27302016-12-08 16:53:06 -080025
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070026import os
27import socket
Zsolt Haraszti89a27302016-12-08 16:53:06 -080028import struct
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070029from pcapy import BPFProgram
30from threading import Thread, Condition
31
32import fcntl
33
34import select
35import structlog
Zsolt Haraszti89a27302016-12-08 16:53:06 -080036import sys
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070037
Zsolt Haraszti89a27302016-12-08 16:53:06 -080038from scapy.data import ETH_P_ALL
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070039from twisted.internet import reactor
Zsolt Haraszti89a27302016-12-08 16:53:06 -080040from zope.interface import implementer
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070041
Zsolt Haraszti89a27302016-12-08 16:53:06 -080042from voltha.registry import IComponent
43
44if sys.platform.startswith('linux'):
45 from common.frameio.third_party.oftest import afpacket, netutils
46elif sys.platform == 'darwin':
Zsolt Haraszti348d1932016-12-10 01:10:07 -080047 from scapy.arch import pcapdnet, BIOCIMMEDIATE, dnet
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070048
49log = structlog.get_logger()
50
51
def hexify(buffer):
    """
    Return a hexadecimal string encoding of input buffer.

    :param buffer: The data to encode. Binary buffers (bytes, bytearray,
    memoryview) are encoded byte by byte; any other iterable of
    single-character strings is encoded via ord() of each character.
    :return: Lower-case hex string, two digits per byte.
    """
    if isinstance(buffer, (bytes, bytearray, memoryview)):
        # bytearray() yields integers on both Python 2 and Python 3, whereas
        # iterating a Python 3 bytes object directly would break ord().
        return ''.join('%02x' % b for b in bytearray(buffer))
    return ''.join('%02x' % ord(c) for c in buffer)
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070057
58
class _SelectWakerDescriptor(object):
    """
    A descriptor that can be mixed into a select loop to wake it up.

    Internally this is a pipe: select() watches the read end (exposed via
    fileno()), and notify() writes one byte to the write end to make the
    read end readable.
    """
    def __init__(self):
        self.pipe_read, self.pipe_write = os.pipe()
        # The write end is non-blocking so that notify() never stalls the
        # calling thread, even if nobody is draining the pipe.
        fcntl.fcntl(self.pipe_write, fcntl.F_SETFL, os.O_NONBLOCK)

    def __del__(self):
        # Guard with getattr: if __init__ failed before the pipe existed,
        # there is nothing to close.
        for attr in ('pipe_read', 'pipe_write'):
            fd = getattr(self, attr, None)
            if fd is not None:
                os.close(fd)
                setattr(self, attr, None)

    def fileno(self):
        """Return the readable end of the pipe, for use by select()"""
        return self.pipe_read

    def wait(self):
        """Consume one pending wake-up byte (blocks if none is pending)"""
        os.read(self.pipe_read, 1)

    def notify(self):
        """Trigger a select loop"""
        import errno  # local import: not part of the module-level imports

        try:
            os.write(self.pipe_write, b'\x00')
        except OSError as e:
            # A full pipe means plenty of wake-ups are already pending, so
            # the select loop will wake up regardless; anything else is a
            # genuine error.
            if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                raise
80
81
class BpfProgramFilter(object):
    """
    Frame predicate backed by a compiled Berkeley Packet Filter program,
    the same filter language used by tools such as pcap and tcpdump.
    """

    def __init__(self, program_string):
        """
        Compile a filter from BPF command syntax (see 'man pcap-filter').
        :param program_string: Filter expression in textual form, e.g.:
        'vlan 1000'
        'vlan 1000 and ip src host 10.10.10.10'
        """
        compiled = BPFProgram(program_string)
        self.bpf = compiled

    def __call__(self, frame):
        """
        Run the compiled filter against a single frame.
        :param frame: Raw frame provided as Python string
        :return: 1 if frame satisfies filter, 0 otherwise.
        """
        verdict = self.bpf.filter(frame)
        return verdict
104
105
class FrameIOPort(object):
    """
    Represents a network interface on which we can send/receive raw
    Ethernet frames.

    This is the platform-independent base class. Derived classes must
    implement open_socket() and rcv_frame(), and may override send_frame().
    """

    # Maximum number of bytes requested per receive (used by the Linux port)
    RCV_SIZE_DEFAULT = 4096
    # Protocol value the Linux port passes to bind()
    ETH_P_ALL = 0x03
    # Value the Linux port passes to socket.settimeout()
    RCV_TIMEOUT = 10000

    def __init__(self, iface_name, callback, filter=None):
        """
        :param iface_name: Name of the interface to attach to
        :param callback: Called as callback(port, frame) for each accepted
        frame; invoked via reactor.callFromThread
        :param filter: Optional predicate filter(frame); when given, only
        frames for which it returns a true value are passed to callback
        """
        self.iface_name = iface_name
        self.callback = callback
        self.filter = filter
        self.received = 0
        self.discarded = 0
        # Pre-set so that __del__ is safe even if open_socket() raises
        self.socket = None
        self.socket = self.open_socket(self.iface_name)
        log.debug('socket-opened', fn=self.fileno(), iface=iface_name)

    def open_socket(self, iface_name):
        """Open and return the receive socket; platform specific"""
        raise NotImplementedError('to be implemented by derived class')

    def open_sockets(self, iface_name):
        """Misspelled legacy name of open_socket(), kept for compatibility"""
        return self.open_socket(iface_name)

    def rcv_frame(self):
        """Read and return one frame from the socket; platform specific"""
        raise NotImplementedError('to be implemented by derived class')

    def __del__(self):
        # getattr guards against instances whose __init__ never completed
        sock = getattr(self, 'socket', None)
        if sock is not None:
            sock.close()
            self.socket = None
            log.debug('socket-closed',
                      iface=getattr(self, 'iface_name', None))

    def fileno(self):
        """Return the socket's file descriptor, for use by select()"""
        return self.socket.fileno()

    def _dispatch(self, frame):
        log.debug('calling-publisher', frame=hexify(frame))
        self.callback(self, frame)

    def recv(self):
        """Called on the select thread when a packet arrives"""
        try:
            frame = self.rcv_frame()
        except RuntimeError as e:
            # we observed this happens sometimes right after the socket was
            # attached to a newly created veth interface. So we log it, but
            # allow to continue.
            log.warn('afpacket-recv-error', code=-1,
                     iface=self.iface_name, e=e)
            return

        if frame is None:
            # rcv_frame() implementations may return None when no packet
            # was actually available; there is nothing to count or dispatch.
            return

        log.debug('frame-received', iface=self.iface_name, len=len(frame),
                  hex=hexify(frame))
        self.received += 1
        if self.filter is None or self.filter(frame):
            log.debug('frame-dispatched')
            # hand the frame over to the reactor thread
            reactor.callFromThread(self._dispatch, frame)
        else:
            self.discarded += 1
            log.debug('frame-discarded')

    def send(self, frame):
        """
        Send a frame out on this interface.
        :param frame: The raw frame to send
        :return: The number of bytes sent, as reported by send_frame()
        """
        log.debug('sending', len=len(frame), iface=self.iface_name)
        sent_bytes = self.send_frame(frame)
        if sent_bytes != len(frame):
            log.error('send-error', iface=self.iface_name,
                      wanted_to_send=len(frame), actually_sent=sent_bytes)
        return sent_bytes

    def send_frame(self, frame):
        """Low-level send; derived classes may override"""
        return self.socket.send(frame)

    def up(self):
        """Bring the interface up (no-op on darwin); returns self"""
        if sys.platform.startswith('darwin'):
            pass
        else:
            os.system('ip link set {} up'.format(self.iface_name))
        return self

    def down(self):
        """Bring the interface down (no-op on darwin); returns self"""
        if sys.platform.startswith('darwin'):
            pass
        else:
            os.system('ip link set {} down'.format(self.iface_name))
        return self

    def statistics(self):
        """Return the tuple (frames received, frames discarded by filter)"""
        return self.received, self.discarded
192
193
class LinuxFrameIOPort(FrameIOPort):
    """
    FrameIOPort implementation for Linux, based on an AF_PACKET raw socket.
    """

    def open_socket(self, iface_name):
        """
        Open a raw packet socket bound to the given interface.
        :param iface_name: Name of the interface to bind to
        :return: The configured socket
        """
        s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
        try:
            afpacket.enable_auxdata(s)
            s.bind((iface_name, self.ETH_P_ALL))
            netutils.set_promisc(s, iface_name)
            s.settimeout(self.RCV_TIMEOUT)
        except Exception:
            # do not leak the descriptor if any setup step fails
            s.close()
            raise
        return s

    def rcv_frame(self):
        """Receive one frame (up to RCV_SIZE_DEFAULT bytes) via afpacket"""
        return afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
206
207
class DarwinFrameIOPort(FrameIOPort):
    """
    FrameIOPort implementation for macOS (darwin): receives through a pcap
    handle and sends through a separate dnet ethernet handle.
    """

    def open_socket(self, iface_name):
        """
        Open the pcap receive handle and, as a side effect, the dnet send
        handle (stored as self.sout).
        :param iface_name: Name of the interface to attach to
        :return: The pcap handle used for receiving
        """
        # NOTE(review): the numeric arguments are presumably snaplen,
        # promiscuous flag and read timeout -- confirm against scapy.
        sin = pcapdnet.open_pcap(iface_name, 1600, 1, 100)
        try:
            fcntl.ioctl(sin.fileno(), BIOCIMMEDIATE, struct.pack("I", 1))
        except Exception as e:
            # Best effort only: keep going without immediate mode, but leave
            # a trace instead of hiding the failure completely.
            log.warn('biocimmediate-failed', iface=iface_name, e=e)

        # need a different kind of socket for sending out
        self.sout = dnet.eth(iface_name)

        return sin

    def send_frame(self, frame):
        """Send the frame through the dnet handle"""
        return self.sout.send(frame)

    def rcv_frame(self):
        """
        Fetch the next packet from the pcap handle.
        :return: The packet payload, or None if pcap returned no packet
        """
        pkt = self.socket.next()
        if pkt is not None:
            # pcap hands back a (timestamp, payload) pair
            ts, pkt = pkt
        return pkt
230
231
# Select the FrameIOPort implementation matching the host platform.
if sys.platform == 'darwin':
    _FrameIOPort = DarwinFrameIOPort
elif sys.platform.startswith('linux'):
    _FrameIOPort = LinuxFrameIOPort
else:
    # Importing this module on any other platform fails right here.
    raise Exception('Unsupported platform {}'.format(sys.platform))
239
240
@implementer(IComponent)
class FrameIOManager(Thread):
    """
    Packet/Frame IO manager that can be used to send/receive raw frames
    on a set of network interfaces.

    The manager is itself a thread: run() executes the select loop, while
    the remaining public methods are meant to be called from the main thread.
    """
    def __init__(self):
        super(FrameIOManager, self).__init__()

        self.ports = {}  # iface_name -> FrameIOPort
        self.queue = {}  # iface_name -> TODO

        self.cvar = Condition()
        self.waker = _SelectWakerDescriptor()
        self.stopped = False
        self.ports_changed = False

    # ~~~~~~~~~~~ exposed methods callable from main thread ~~~~~~~~~~~~~~~~~~~

    def start(self):
        """
        Start the IO manager and its select loop thread
        :return: self
        """
        log.debug('starting')
        super(FrameIOManager, self).start()
        log.info('started')
        return self

    def stop(self):
        """
        Stop the IO manager and its thread with the select loop
        """
        log.debug('stopping')
        self.stopped = True
        # wake the select loop so it notices the stop flag right away
        self.waker.notify()
        self.join()
        # drop the manager's references to all registered ports
        del self.ports
        log.info('stopped')

    def list_interfaces(self):
        """
        Return the interfaces listened on
        :return: The manager's own dict (not a copy) mapping interface name
        to FrameIOPort object
        """
        return self.ports

    def add_interface(self, iface_name, callback, filter=None):
        """
        Add a new interface and start receiving on it.
        :param iface_name: Name of the interface. Must be an existing Unix
        interface (eth0, en0, etc.)
        :param callback: Called on each received frame;
        signature: def callback(port, frame) where port is the FrameIOPort
        instance at which the frame was received, frame is the actual frame
        received (as binary string)
        :param filter: An optional filter (predicate), with signature:
        def filter(frame). If provided, only frames for which filter evaluates
        to True will be forwarded to callback.
        :return: FrameIOPort instance.
        """
        assert iface_name not in self.ports
        port = _FrameIOPort(iface_name, callback, filter)
        self.ports[iface_name] = port
        # need to exit select loop to reconstruct select fd lists
        self.ports_changed = True
        self.waker.notify()
        return port

    def del_interface(self, iface_name):
        """
        Stop and remove named interface
        :param iface_name: Name of previously registered interface
        :return: None
        """
        assert iface_name in self.ports
        del self.ports[iface_name]
        # need to exit select loop to reconstruct select fd lists
        self.ports_changed = True
        self.waker.notify()

    def send(self, iface_name, frame):
        """
        Send frame on given interface
        :param iface_name: Name of previously registered interface
        :param frame: frame as string
        :return: number of bytes sent
        :raises KeyError: if iface_name is not a registered interface
        """
        return self.ports[iface_name].send(frame)

    # ~~~~~~~~~~~~~ Thread methods (running on non-main thread) ~~~~~~~~~~~~~~~

    def run(self):
        """
        Called on the alien thread, this is the core multi-port receive loop
        """

        log.debug('select-loop-started')

        # outer loop constructs sockets list for select
        while not self.stopped:
            # Clear the flag *before* taking the snapshot: a port added or
            # removed after this point sets the flag again and forces another
            # rebuild. Clearing it after the snapshot could erase a change
            # that the snapshot did not yet contain.
            self.ports_changed = False
            sockets = [self.waker] + list(self.ports.values())
            empty = []

            # inner select loop
            while not self.stopped:
                try:
                    _in, _out, _err = select.select(sockets, empty, empty, 1)
                except Exception as e:
                    log.exception('frame-io-select-error', e=e)
                    break
                with self.cvar:
                    for port in _in:
                        if port is self.waker:
                            # consume the wake-up byte; nothing to receive
                            self.waker.wait()
                        else:
                            port.recv()
                    self.cvar.notify_all()
                if self.ports_changed:
                    break  # break inner loop so we reconstruct sockets list

        log.debug('select-loop-exited')
365 log.debug('select-loop-exited')