blob: daa7deb187379d0468dd7b6638fa8170290e6043 [file] [log] [blame]
#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
A module that can send and receive raw ethernet frames on a set of interfaces
and it can manage a set of vlan interfaces on top of existing
interfaces. Due to reliance on raw sockets, this module requires
root access. Also, raw sockets are hard to deal with in Twisted (not
directly supported) we need to run the receiver select loop on a dedicated
thread.
"""
import os
import socket
from pcapy import BPFProgram
from threading import Thread, Condition
import fcntl
import select
import structlog
from twisted.internet import reactor
from common.frameio.third_party.oftest import afpacket, netutils
log = structlog.get_logger()
def hexify(buffer):
"""
Return a hexadecimal string encoding of input buffer
"""
return ' '.join('%02x' % ord(c) for c in buffer)
class _SelectWakerDescriptor(object):
"""
A descriptor that can be mixed into a select loop to wake it up.
"""
def __init__(self):
self.pipe_read, self.pipe_write = os.pipe()
fcntl.fcntl(self.pipe_write, fcntl.F_SETFL, os.O_NONBLOCK)
def __del__(self):
os.close(self.pipe_read)
os.close(self.pipe_write)
def fileno(self):
return self.pipe_read
def wait(self):
os.read(self.pipe_read, 1)
def notify(self):
"""Trigger a select loop"""
os.write(self.pipe_write, '\x00')
class BpfProgramFilter(object):
"""
Convenience packet filter based on the well-tried Berkeley Packet Filter,
used by many well known open source tools such as pcap and tcpdump.
"""
def __init__(self, program_string):
"""
Create a filter using the BPF command syntax. To learn more,
consult 'man pcap-filter'.
:param program_string: The textual definition of the filter. Examples:
'vlan 1000'
'vlan 1000 and ip src host 10.10.10.10'
"""
self.bpf = BPFProgram(program_string)
def __call__(self, frame):
"""
Return 1 if frame passes filter.
:param frame: Raw frame provided as Python string
:return: 1 if frame satisfies filter, 0 otherwise.
"""
return self.bpf.filter(frame)
class FrameIOPort(object):
"""
Represents a network interface which we can send/receive raw
Ethernet frames.
"""
RCV_SIZE_DEFAULT = 4096
ETH_P_ALL = 0x03
RCV_TIMEOUT = 10000
def __init__(self, iface_name, callback, filter=None):
self.iface_name = iface_name
self.callback = callback
self.filter = filter
self.socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
afpacket.enable_auxdata(self.socket)
self.socket.bind((self.iface_name, self.ETH_P_ALL))
netutils.set_promisc(self.socket, self.iface_name)
self.socket.settimeout(self.RCV_TIMEOUT)
log.debug('socket-opened', fn=self.fileno(), iface=iface_name)
self.received = 0
self.discarded = 0
def __del__(self):
if self.socket:
fn = self.fileno()
self.socket.close()
self.socket = None
log.debug('socket-closed', fn=fn, iface=self.iface_name)
def fileno(self):
return self.socket.fileno()
def _dispatch(self, frame):
log.debug('calling-publisher', frame=hexify(frame))
self.callback(self, frame)
def recv(self):
"""Called on the select thread when a packet arrives"""
try:
frame = afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
except RuntimeError, e:
# we observed this happens sometimes right after the socket was
# attached to a newly created veth interface. So we log it, but
# allow to continue
log.warn('afpacket-recv-error', code=-1)
return
log.debug('frame-received', iface=self.iface_name, len=len(frame),
hex=hexify(frame))
self.received +=1
if self.filter is None or self.filter(frame):
log.debug('frame-dispatched')
reactor.callFromThread(self._dispatch, frame)
else:
self.discarded += 1
log.debug('frame-discarded')
def send(self, frame):
log.debug('sending', len=len(frame), iface=self.iface_name)
sent_bytes = self.socket.send(frame)
if sent_bytes != len(frame):
log.error('send-error', iface=self.iface_name,
wanted_to_send=len(frame), actually_sent=sent_bytes)
return sent_bytes
def up(self):
os.system('ip link set {} up'.format(self.iface_name))
return self
def down(self):
os.system('ip link set {] down'.format(self.iface_name))
return self
def statistics(self):
return self.received, self.discarded
class FrameIOManager(Thread):
"""
Packet/Frame IO manager that can be used to send/receive raw frames
on a set of network interfaces.
"""
def __init__(self):
super(FrameIOManager, self).__init__()
self.ports = {} # iface_name -> ActiveFrameReceiver
self.queue = {} # iface_name -> TODO
self.cvar = Condition()
self.waker = _SelectWakerDescriptor()
self.stopped = False
self.ports_changed = False
#~~~~~~~~~~~ exposed methods callable from main thread ~~~~~~~~~~~~~~~~~~~~~
def start(self):
"""
Start the IO manager and its select loop thread
"""
log.debug('starting')
super(FrameIOManager, self).start()
log.info('started')
return self
def stop(self):
"""
Stop the IO manager and its thread with the select loop
"""
log.debug('stopping')
self.stopped = True
self.waker.notify()
self.join()
del self.ports
log.info('stopped')
def list_interfaces(self):
"""
Return list of interfaces listened on
:return: List of FrameIOPort objects
"""
return self.ports
def add_interface(self, iface_name, callback, filter=None):
"""
Add a new interface and start receiving on it.
:param iface_name: Name of the interface. Must be an existing Unix
interface (eth0, en0, etc.)
:param callback: Called on each received frame;
signature: def callback(port, frame) where port is the FrameIOPort
instance at which the frame was received, frame is the actual frame
received (as binay string)
:param filter: An optional filter (predicate), with signature:
def filter(frame). If provided, only frames for which filter evaluates
to True will be forwarded to callback.
:return: FrmaeIOPort instance.
"""
"""Add a new interface"""
assert iface_name not in self.ports
port = FrameIOPort(iface_name, callback, filter)
self.ports[iface_name] = port
# need to exit select loop to reconstruct select fd lists
self.ports_changed = True
self.waker.notify()
return port
def del_interface(self, iface_name):
"""
Stop and remove named interface
:param iface_name: Name of previoysly registered interface
:return: None
"""
assert iface_name in self.ports
port = self.ports[iface_name]
port.stop()
del self.ports[iface_name]
# need to exit select loop to reconstruct select fd lists
self.ports_changed = True
self.waker.notify()
def send(self, iface_name, frame):
"""
Send frame on given interface
:param iface_name: Name of previously registered interface
:param frame: frame as string
:return: number of bytes sent
"""
return self.ports[iface_name].send(frame)
#~~~~~~~~~~~~~~ Thread methods (running on non-main thread ~~~~~~~~~~~~~~~~
def run(self):
"""
Called on the alien thread, this is the core multi-port receive loop
"""
log.debug('select-loop-started')
# outer loop constructs sockets list for select
while not self.stopped:
sockets = [self.waker] + self.ports.values()
self.ports_changed = False
empty = []
# inner select loop
while not self.stopped:
try:
_in, _out, _err = select.select(sockets, empty, empty, 1)
except Exception, e:
log.exception('frame-io-select-error', e=e)
break
with self.cvar:
for port in _in:
if port is self.waker:
self.waker.wait()
continue
else:
port.recv()
self.cvar.notify_all()
if self.ports_changed:
break # break inner loop so we reconstruct sockets list
log.debug('select-loop-exited')