blob: daa7deb187379d0468dd7b6638fa8170290e6043 [file] [log] [blame]
Zsolt Haraszti91350eb2016-11-05 15:33:53 -07001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
A module that can send and receive raw Ethernet frames on a set of interfaces
and can manage a set of VLAN interfaces on top of existing interfaces.
Due to its reliance on raw sockets, this module requires root access.
Also, because raw sockets are hard to deal with in Twisted (they are not
directly supported), we need to run the receiver select loop on a dedicated
thread.
24"""
25import os
26import socket
27from pcapy import BPFProgram
28from threading import Thread, Condition
29
30import fcntl
31
32import select
33import structlog
34
35from twisted.internet import reactor
36
37from common.frameio.third_party.oftest import afpacket, netutils
38
39log = structlog.get_logger()
40
41
def hexify(buffer):
    """
    Return a hexadecimal string encoding of input buffer.

    Every byte is rendered as two lowercase hex digits and the bytes are
    joined with single spaces (the two bytes 0x00 0xff give '00 ff').
    :param buffer: Sequence of bytes to encode. Its items may be
    one-character strings (a plain string) or integers (a bytearray, or a
    bytes object on Python 3).
    :return: The space separated hex string; '' for an empty buffer.
    """
    # Iterating a bytearray (or Python 3 bytes) yields ints, for which ord()
    # raises TypeError, so only real characters are passed through ord().
    return ' '.join(
        '%02x' % (c if isinstance(c, int) else ord(c)) for c in buffer)
47
48
49class _SelectWakerDescriptor(object):
50 """
51 A descriptor that can be mixed into a select loop to wake it up.
52 """
53 def __init__(self):
54 self.pipe_read, self.pipe_write = os.pipe()
55 fcntl.fcntl(self.pipe_write, fcntl.F_SETFL, os.O_NONBLOCK)
56
57 def __del__(self):
58 os.close(self.pipe_read)
59 os.close(self.pipe_write)
60
61 def fileno(self):
62 return self.pipe_read
63
64 def wait(self):
65 os.read(self.pipe_read, 1)
66
67 def notify(self):
68 """Trigger a select loop"""
69 os.write(self.pipe_write, '\x00')
70
71
class BpfProgramFilter(object):
    """
    Frame predicate built on the Berkeley Packet Filter (BPF), the same
    well-tried filter language used by open source tools such as pcap and
    tcpdump.
    """
    def __init__(self, program_string):
        """
        Build the filter from an expression in BPF command syntax (see
        'man pcap-filter' for the full grammar).
        :param program_string: Filter definition as text, for instance:
            'vlan 1000'
            'vlan 1000 and ip src host 10.10.10.10'
        """
        compiled = BPFProgram(program_string)
        self.bpf = compiled

    def __call__(self, frame):
        """
        Run the BPF program against a single frame.
        :param frame: Raw frame provided as Python string
        :return: The result of the BPF program: 1 if the frame satisfies
        the filter, 0 otherwise.
        """
        verdict = self.bpf.filter(frame)
        return verdict
94
95
class FrameIOPort(object):
    """
    Represents a network interface which we can send/receive raw
    Ethernet frames.

    Frames read by recv() are counted, optionally screened by the filter
    and, when accepted, handed to the callback on the Twisted reactor
    thread.
    """

    RCV_SIZE_DEFAULT = 4096
    ETH_P_ALL = 0x03
    RCV_TIMEOUT = 10000

    def __init__(self, iface_name, callback, filter=None):
        """
        Open a raw AF_PACKET socket and bind it to the named interface.
        :param iface_name: Name of the interface to bind to
        :param callback: Called as callback(port, frame) for every frame
        that passes the filter
        :param filter: Optional predicate taking the frame; when given,
        only frames for which it evaluates to True reach the callback
        """
        self.iface_name = iface_name
        self.callback = callback
        self.filter = filter
        self.received = 0
        self.discarded = 0

        self.socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
        afpacket.enable_auxdata(self.socket)
        self.socket.bind((self.iface_name, self.ETH_P_ALL))
        netutils.set_promisc(self.socket, self.iface_name)
        self.socket.settimeout(self.RCV_TIMEOUT)
        log.debug('socket-opened', fn=self.fileno(), iface=iface_name)

    def __del__(self):
        self.stop()

    def stop(self):
        """
        Close the underlying socket. Safe to call more than once, and on
        an instance whose socket was never opened.
        :return: None
        """
        sock = getattr(self, 'socket', None)
        if not sock:
            return
        # grab the descriptor number first, it is gone after close()
        fn = sock.fileno()
        sock.close()
        self.socket = None
        log.debug('socket-closed', fn=fn, iface=self.iface_name)

    def fileno(self):
        """Return the socket's file descriptor, as required by select()"""
        return self.socket.fileno()

    def _dispatch(self, frame):
        """Hand a frame to the callback (scheduled onto the reactor)"""
        log.debug('calling-publisher', frame=hexify(frame))
        self.callback(self, frame)

    def recv(self):
        """Called on the select thread when a packet arrives"""
        try:
            frame = afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
        except RuntimeError:
            # we observed this happens sometimes right after the socket was
            # attached to a newly created veth interface. So we log it, but
            # allow to continue
            log.warn('afpacket-recv-error', code=-1)
            return

        log.debug('frame-received', iface=self.iface_name, len=len(frame),
                  hex=hexify(frame))
        self.received += 1
        if self.filter is None or self.filter(frame):
            log.debug('frame-dispatched')
            # the callback must not run on the select thread
            reactor.callFromThread(self._dispatch, frame)
        else:
            self.discarded += 1
            log.debug('frame-discarded')

    def send(self, frame):
        """
        Send a frame on the interface.
        :param frame: Frame to transmit
        :return: Number of bytes the socket reported as sent; a mismatch
        with the frame length is logged as an error
        """
        log.debug('sending', len=len(frame), iface=self.iface_name)
        sent_bytes = self.socket.send(frame)
        if sent_bytes != len(frame):
            log.error('send-error', iface=self.iface_name,
                      wanted_to_send=len(frame), actually_sent=sent_bytes)
        return sent_bytes

    def up(self):
        """
        Run 'ip link set <iface> up' for this interface.
        :return: self, so that calls can be chained
        """
        os.system('ip link set {} up'.format(self.iface_name))
        return self

    def down(self):
        """
        Run 'ip link set <iface> down' for this interface.
        :return: self, so that calls can be chained
        """
        os.system('ip link set {} down'.format(self.iface_name))
        return self

    def statistics(self):
        """
        :return: Tuple of (frames received, frames discarded by the filter)
        """
        return self.received, self.discarded
173
174
class FrameIOManager(Thread):
    """
    Packet/Frame IO manager that can be used to send/receive raw frames
    on a set of network interfaces.

    The manager is itself the thread that runs the select loop; the other
    public methods are meant to be called from the main thread.
    """
    def __init__(self):
        super(FrameIOManager, self).__init__()

        self.ports = {}  # iface_name -> FrameIOPort
        self.queue = {}  # iface_name -> TODO

        self.cvar = Condition()
        self.waker = _SelectWakerDescriptor()
        self.stopped = False
        self.ports_changed = False

    # ~~~~~~~~~~ exposed methods callable from main thread ~~~~~~~~~~~~~~~~~~~

    def start(self):
        """
        Start the IO manager and its select loop thread
        :return: self, so that calls can be chained
        """
        log.debug('starting')
        super(FrameIOManager, self).start()
        log.info('started')
        return self

    def stop(self):
        """
        Stop the IO manager and its thread with the select loop
        """
        log.debug('stopping')
        self.stopped = True
        # wake the select loop so that it notices the stopped flag
        self.waker.notify()
        self.join()
        del self.ports
        log.info('stopped')

    def list_interfaces(self):
        """
        Return the interfaces listened on
        :return: Dict mapping interface name to its FrameIOPort object
        """
        return self.ports

    def add_interface(self, iface_name, callback, filter=None):
        """
        Add a new interface and start receiving on it.
        :param iface_name: Name of the interface. Must be an existing Unix
        interface (eth0, en0, etc.)
        :param callback: Called on each received frame;
        signature: def callback(port, frame) where port is the FrameIOPort
        instance at which the frame was received, frame is the actual frame
        received (as binary string)
        :param filter: An optional filter (predicate), with signature:
        def filter(frame). If provided, only frames for which filter evaluates
        to True will be forwarded to callback.
        :return: FrameIOPort instance.
        """
        assert iface_name not in self.ports
        port = FrameIOPort(iface_name, callback, filter)
        self.ports[iface_name] = port
        # need to exit select loop to reconstruct select fd lists
        self.ports_changed = True
        self.waker.notify()
        return port

    def del_interface(self, iface_name):
        """
        Stop and remove named interface
        :param iface_name: Name of previously registered interface
        :return: None
        """
        assert iface_name in self.ports
        # Unregister the port and wake the select loop before stopping the
        # port, so that a rebuilt fd list can no longer pick it up.
        port = self.ports.pop(iface_name)
        # need to exit select loop to reconstruct select fd lists
        self.ports_changed = True
        self.waker.notify()
        port.stop()

    def send(self, iface_name, frame):
        """
        Send frame on given interface
        :param iface_name: Name of previously registered interface
        :param frame: frame as string
        :return: number of bytes sent
        """
        return self.ports[iface_name].send(frame)

    # ~~~~~~~~~~~~ Thread methods (running on non-main thread) ~~~~~~~~~~~~~~~

    def run(self):
        """
        Called on the alien thread, this is the core multi-port receive loop
        """

        log.debug('select-loop-started')

        # outer loop constructs sockets list for select
        while not self.stopped:
            # Clear the flag *before* taking the snapshot: a port added or
            # removed while the list is being built then leaves the flag
            # set and forces one more rebuild, instead of being missed.
            self.ports_changed = False
            sockets = [self.waker] + list(self.ports.values())
            empty = []

            # inner select loop
            while not self.stopped:
                try:
                    _in, _out, _err = select.select(sockets, empty, empty, 1)
                except Exception as e:
                    log.exception('frame-io-select-error', e=e)
                    break
                with self.cvar:
                    for port in _in:
                        if port is self.waker:
                            # drain the wake-up byte, nothing to receive
                            self.waker.wait()
                        else:
                            port.recv()
                    self.cvar.notify_all()
                if self.ports_changed:
                    break  # break inner loop so we reconstruct sockets list

        log.debug('select-loop-exited')