blob: 0a222add1c7aca893fe7eded9d4ca99c28754d86 [file] [log] [blame]
Zsolt Haraszti91350eb2016-11-05 15:33:53 -07001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18A module that can send and receive raw ethernet frames on a set of interfaces
19and it can manage a set of vlan interfaces on top of existing
20interfaces. Due to reliance on raw sockets, this module requires
21root access. Also, raw sockets are hard to deal with in Twisted (not
22directly supported) we need to run the receiver select loop on a dedicated
23thread.
24"""
Zsolt Haraszti89a27302016-12-08 16:53:06 -080025
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070026import os
27import socket
Zsolt Haraszti89a27302016-12-08 16:53:06 -080028import struct
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070029from pcapy import BPFProgram
30from threading import Thread, Condition
31
32import fcntl
33
34import select
35import structlog
Zsolt Haraszti89a27302016-12-08 16:53:06 -080036import sys
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070037
Zsolt Haraszti89a27302016-12-08 16:53:06 -080038from scapy.data import ETH_P_ALL
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070039from twisted.internet import reactor
Zsolt Haraszti89a27302016-12-08 16:53:06 -080040from zope.interface import implementer
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070041
Zsolt Haraszti89a27302016-12-08 16:53:06 -080042from voltha.registry import IComponent
43
44if sys.platform.startswith('linux'):
45 from common.frameio.third_party.oftest import afpacket, netutils
46elif sys.platform == 'darwin':
47 from scapy.arch import pcapdnet, BIOCIMMEDIATE
Zsolt Haraszti91350eb2016-11-05 15:33:53 -070048
49log = structlog.get_logger()
50
51
52def hexify(buffer):
53 """
54 Return a hexadecimal string encoding of input buffer
55 """
56 return ' '.join('%02x' % ord(c) for c in buffer)
57
58
59class _SelectWakerDescriptor(object):
60 """
61 A descriptor that can be mixed into a select loop to wake it up.
62 """
63 def __init__(self):
64 self.pipe_read, self.pipe_write = os.pipe()
65 fcntl.fcntl(self.pipe_write, fcntl.F_SETFL, os.O_NONBLOCK)
66
67 def __del__(self):
68 os.close(self.pipe_read)
69 os.close(self.pipe_write)
70
71 def fileno(self):
72 return self.pipe_read
73
74 def wait(self):
75 os.read(self.pipe_read, 1)
76
77 def notify(self):
78 """Trigger a select loop"""
79 os.write(self.pipe_write, '\x00')
80
81
82class BpfProgramFilter(object):
83 """
84 Convenience packet filter based on the well-tried Berkeley Packet Filter,
85 used by many well known open source tools such as pcap and tcpdump.
86 """
87 def __init__(self, program_string):
88 """
89 Create a filter using the BPF command syntax. To learn more,
90 consult 'man pcap-filter'.
91 :param program_string: The textual definition of the filter. Examples:
92 'vlan 1000'
93 'vlan 1000 and ip src host 10.10.10.10'
94 """
95 self.bpf = BPFProgram(program_string)
96
97 def __call__(self, frame):
98 """
99 Return 1 if frame passes filter.
100 :param frame: Raw frame provided as Python string
101 :return: 1 if frame satisfies filter, 0 otherwise.
102 """
103 return self.bpf.filter(frame)
104
105
106class FrameIOPort(object):
107 """
108 Represents a network interface which we can send/receive raw
109 Ethernet frames.
110 """
111
112 RCV_SIZE_DEFAULT = 4096
113 ETH_P_ALL = 0x03
114 RCV_TIMEOUT = 10000
115
116 def __init__(self, iface_name, callback, filter=None):
117 self.iface_name = iface_name
118 self.callback = callback
119 self.filter = filter
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800120 self.socket = self.open_socket(self.iface_name)
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700121 log.debug('socket-opened', fn=self.fileno(), iface=iface_name)
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700122 self.received = 0
123 self.discarded = 0
124
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800125 def open_sockets(self, iface_name):
126 raise NotImplementedError('to be implemented by derived class')
127
128 def rcv_frame(self):
129 raise NotImplementedError('to be implemented by derived class')
130
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700131 def __del__(self):
132 if self.socket:
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700133 self.socket.close()
134 self.socket = None
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800135 log.debug('socket-closed', iface=self.iface_name)
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700136
137 def fileno(self):
138 return self.socket.fileno()
139
140 def _dispatch(self, frame):
141 log.debug('calling-publisher', frame=hexify(frame))
142 self.callback(self, frame)
143
144 def recv(self):
145 """Called on the select thread when a packet arrives"""
146 try:
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800147 frame = self.rcv_frame()
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700148 except RuntimeError, e:
149 # we observed this happens sometimes right after the socket was
150 # attached to a newly created veth interface. So we log it, but
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800151 # allow to continue.
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700152 log.warn('afpacket-recv-error', code=-1)
153 return
154
155 log.debug('frame-received', iface=self.iface_name, len=len(frame),
156 hex=hexify(frame))
157 self.received +=1
158 if self.filter is None or self.filter(frame):
159 log.debug('frame-dispatched')
160 reactor.callFromThread(self._dispatch, frame)
161 else:
162 self.discarded += 1
163 log.debug('frame-discarded')
164
165 def send(self, frame):
166 log.debug('sending', len=len(frame), iface=self.iface_name)
167 sent_bytes = self.socket.send(frame)
168 if sent_bytes != len(frame):
169 log.error('send-error', iface=self.iface_name,
170 wanted_to_send=len(frame), actually_sent=sent_bytes)
171 return sent_bytes
172
173 def up(self):
174 os.system('ip link set {} up'.format(self.iface_name))
175 return self
176
177 def down(self):
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800178 os.system('ip link set {} down'.format(self.iface_name))
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700179 return self
180
181 def statistics(self):
182 return self.received, self.discarded
183
184
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800185class LinuxFrameIOPort(FrameIOPort):
186
187 def open_socket(self, iface_name):
188 s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
189 afpacket.enable_auxdata(s)
190 s.bind((self.iface_name, self.ETH_P_ALL))
191 netutils.set_promisc(s, iface_name)
192 s.settimeout(self.RCV_TIMEOUT)
193 return s
194
195 def rcv_frame(self):
196 return afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
197
198
199class DarwinFrameIOPort(FrameIOPort):
200
201 def open_socket(self, iface_name):
202 s = pcapdnet.open_pcap(iface_name, 1600, 1, 100)
203 try:
204 fcntl.ioctl(s.fileno(), BIOCIMMEDIATE, struct.pack("I",1))
205 except:
206 pass
207
208 return s
209
210 def rcv_frame(self):
211 pkt = self.socket.next()
212 if pkt is not None:
213 ts, pkt = pkt
214 return pkt
215
216
217if sys.platform == 'darwin':
218 _FrameIOPort = DarwinFrameIOPort
219elif sys.platform.startswith('linux'):
220 _FrameIOPort = LinuxFrameIOPort
221else:
222 raise Exception('Unsupported platform {}'.format(sys.platform))
223 sys.exit(1)
224
225
226@implementer(IComponent)
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700227class FrameIOManager(Thread):
228 """
229 Packet/Frame IO manager that can be used to send/receive raw frames
230 on a set of network interfaces.
231 """
232 def __init__(self):
233 super(FrameIOManager, self).__init__()
234
235 self.ports = {} # iface_name -> ActiveFrameReceiver
236 self.queue = {} # iface_name -> TODO
237
238 self.cvar = Condition()
239 self.waker = _SelectWakerDescriptor()
240 self.stopped = False
241 self.ports_changed = False
242
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800243 # ~~~~~~~~~~~ exposed methods callable from main thread ~~~~~~~~~~~~~~~~~~~
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700244
245 def start(self):
246 """
247 Start the IO manager and its select loop thread
248 """
249 log.debug('starting')
250 super(FrameIOManager, self).start()
251 log.info('started')
252 return self
253
254 def stop(self):
255 """
256 Stop the IO manager and its thread with the select loop
257 """
258 log.debug('stopping')
259 self.stopped = True
260 self.waker.notify()
261 self.join()
262 del self.ports
263 log.info('stopped')
264
265 def list_interfaces(self):
266 """
267 Return list of interfaces listened on
268 :return: List of FrameIOPort objects
269 """
270 return self.ports
271
272 def add_interface(self, iface_name, callback, filter=None):
273 """
274 Add a new interface and start receiving on it.
275 :param iface_name: Name of the interface. Must be an existing Unix
276 interface (eth0, en0, etc.)
277 :param callback: Called on each received frame;
278 signature: def callback(port, frame) where port is the FrameIOPort
279 instance at which the frame was received, frame is the actual frame
280 received (as binay string)
281 :param filter: An optional filter (predicate), with signature:
282 def filter(frame). If provided, only frames for which filter evaluates
283 to True will be forwarded to callback.
284 :return: FrmaeIOPort instance.
285 """
286 """Add a new interface"""
287 assert iface_name not in self.ports
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800288 port = _FrameIOPort(iface_name, callback, filter)
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700289 self.ports[iface_name] = port
290 # need to exit select loop to reconstruct select fd lists
291 self.ports_changed = True
292 self.waker.notify()
293 return port
294
295 def del_interface(self, iface_name):
296 """
297 Stop and remove named interface
298 :param iface_name: Name of previoysly registered interface
299 :return: None
300 """
301 assert iface_name in self.ports
302 port = self.ports[iface_name]
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700303 del self.ports[iface_name]
304 # need to exit select loop to reconstruct select fd lists
305 self.ports_changed = True
306 self.waker.notify()
307
308 def send(self, iface_name, frame):
309 """
310 Send frame on given interface
311 :param iface_name: Name of previously registered interface
312 :param frame: frame as string
313 :return: number of bytes sent
314 """
315 return self.ports[iface_name].send(frame)
316
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800317 # ~~~~~~~~~~~~~ Thread methods (running on non-main thread ~~~~~~~~~~~~~~~~
Zsolt Haraszti91350eb2016-11-05 15:33:53 -0700318
319 def run(self):
320 """
321 Called on the alien thread, this is the core multi-port receive loop
322 """
323
324 log.debug('select-loop-started')
325
326 # outer loop constructs sockets list for select
327 while not self.stopped:
328 sockets = [self.waker] + self.ports.values()
329 self.ports_changed = False
330 empty = []
331 # inner select loop
332
333 while not self.stopped:
334 try:
335 _in, _out, _err = select.select(sockets, empty, empty, 1)
336 except Exception, e:
337 log.exception('frame-io-select-error', e=e)
338 break
339 with self.cvar:
340 for port in _in:
341 if port is self.waker:
342 self.waker.wait()
343 continue
344 else:
345 port.recv()
346 self.cvar.notify_all()
347 if self.ports_changed:
348 break # break inner loop so we reconstruct sockets list
349
350 log.debug('select-loop-exited')