Async frame receive/send module
Change-Id: I75b6ecdecf05f2c72b3b0fa1191d849b7d8e67ef
diff --git a/common/frameio/frameio.py b/common/frameio/frameio.py
new file mode 100644
index 0000000..daa7deb
--- /dev/null
+++ b/common/frameio/frameio.py
@@ -0,0 +1,299 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A module that can send and receive raw ethernet frames on a set of interfaces
+and it can manage a set of vlan interfaces on top of existing
+interfaces. Due to reliance on raw sockets, this module requires
+root access. Also, raw sockets are hard to deal with in Twisted (not
+directly supported) we need to run the receiver select loop on a dedicated
+thread.
+"""
+import os
+import socket
+from pcapy import BPFProgram
+from threading import Thread, Condition
+
+import fcntl
+
+import select
+import structlog
+
+from twisted.internet import reactor
+
+from common.frameio.third_party.oftest import afpacket, netutils
+
+log = structlog.get_logger()
+
+
+def hexify(buffer):
+ """
+ Return a hexadecimal string encoding of input buffer
+ """
+ return ' '.join('%02x' % ord(c) for c in buffer)
+
+
+class _SelectWakerDescriptor(object):
+ """
+ A descriptor that can be mixed into a select loop to wake it up.
+ """
+ def __init__(self):
+ self.pipe_read, self.pipe_write = os.pipe()
+ fcntl.fcntl(self.pipe_write, fcntl.F_SETFL, os.O_NONBLOCK)
+
+ def __del__(self):
+ os.close(self.pipe_read)
+ os.close(self.pipe_write)
+
+ def fileno(self):
+ return self.pipe_read
+
+ def wait(self):
+ os.read(self.pipe_read, 1)
+
+ def notify(self):
+ """Trigger a select loop"""
+ os.write(self.pipe_write, '\x00')
+
+
+class BpfProgramFilter(object):
+ """
+ Convenience packet filter based on the well-tried Berkeley Packet Filter,
+ used by many well known open source tools such as pcap and tcpdump.
+ """
+ def __init__(self, program_string):
+ """
+ Create a filter using the BPF command syntax. To learn more,
+ consult 'man pcap-filter'.
+ :param program_string: The textual definition of the filter. Examples:
+ 'vlan 1000'
+ 'vlan 1000 and ip src host 10.10.10.10'
+ """
+ self.bpf = BPFProgram(program_string)
+
+ def __call__(self, frame):
+ """
+ Return 1 if frame passes filter.
+ :param frame: Raw frame provided as Python string
+ :return: 1 if frame satisfies filter, 0 otherwise.
+ """
+ return self.bpf.filter(frame)
+
+
+class FrameIOPort(object):
+ """
+ Represents a network interface which we can send/receive raw
+ Ethernet frames.
+ """
+
+ RCV_SIZE_DEFAULT = 4096
+ ETH_P_ALL = 0x03
+ RCV_TIMEOUT = 10000
+
+ def __init__(self, iface_name, callback, filter=None):
+ self.iface_name = iface_name
+ self.callback = callback
+ self.filter = filter
+ self.socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
+ afpacket.enable_auxdata(self.socket)
+ self.socket.bind((self.iface_name, self.ETH_P_ALL))
+ netutils.set_promisc(self.socket, self.iface_name)
+ self.socket.settimeout(self.RCV_TIMEOUT)
+ log.debug('socket-opened', fn=self.fileno(), iface=iface_name)
+
+ self.received = 0
+ self.discarded = 0
+
+ def __del__(self):
+ if self.socket:
+ fn = self.fileno()
+ self.socket.close()
+ self.socket = None
+ log.debug('socket-closed', fn=fn, iface=self.iface_name)
+
+ def fileno(self):
+ return self.socket.fileno()
+
+ def _dispatch(self, frame):
+ log.debug('calling-publisher', frame=hexify(frame))
+ self.callback(self, frame)
+
+ def recv(self):
+ """Called on the select thread when a packet arrives"""
+ try:
+ frame = afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
+ except RuntimeError, e:
+ # we observed this happens sometimes right after the socket was
+ # attached to a newly created veth interface. So we log it, but
+ # allow to continue
+ log.warn('afpacket-recv-error', code=-1)
+ return
+
+ log.debug('frame-received', iface=self.iface_name, len=len(frame),
+ hex=hexify(frame))
+ self.received +=1
+ if self.filter is None or self.filter(frame):
+ log.debug('frame-dispatched')
+ reactor.callFromThread(self._dispatch, frame)
+ else:
+ self.discarded += 1
+ log.debug('frame-discarded')
+
+ def send(self, frame):
+ log.debug('sending', len=len(frame), iface=self.iface_name)
+ sent_bytes = self.socket.send(frame)
+ if sent_bytes != len(frame):
+ log.error('send-error', iface=self.iface_name,
+ wanted_to_send=len(frame), actually_sent=sent_bytes)
+ return sent_bytes
+
+ def up(self):
+ os.system('ip link set {} up'.format(self.iface_name))
+ return self
+
+ def down(self):
+ os.system('ip link set {] down'.format(self.iface_name))
+ return self
+
+ def statistics(self):
+ return self.received, self.discarded
+
+
+class FrameIOManager(Thread):
+ """
+ Packet/Frame IO manager that can be used to send/receive raw frames
+ on a set of network interfaces.
+ """
+ def __init__(self):
+ super(FrameIOManager, self).__init__()
+
+ self.ports = {} # iface_name -> ActiveFrameReceiver
+ self.queue = {} # iface_name -> TODO
+
+ self.cvar = Condition()
+ self.waker = _SelectWakerDescriptor()
+ self.stopped = False
+ self.ports_changed = False
+
+ #~~~~~~~~~~~ exposed methods callable from main thread ~~~~~~~~~~~~~~~~~~~~~
+
+ def start(self):
+ """
+ Start the IO manager and its select loop thread
+ """
+ log.debug('starting')
+ super(FrameIOManager, self).start()
+ log.info('started')
+ return self
+
+ def stop(self):
+ """
+ Stop the IO manager and its thread with the select loop
+ """
+ log.debug('stopping')
+ self.stopped = True
+ self.waker.notify()
+ self.join()
+ del self.ports
+ log.info('stopped')
+
+ def list_interfaces(self):
+ """
+ Return list of interfaces listened on
+ :return: List of FrameIOPort objects
+ """
+ return self.ports
+
+ def add_interface(self, iface_name, callback, filter=None):
+ """
+ Add a new interface and start receiving on it.
+ :param iface_name: Name of the interface. Must be an existing Unix
+ interface (eth0, en0, etc.)
+ :param callback: Called on each received frame;
+ signature: def callback(port, frame) where port is the FrameIOPort
+ instance at which the frame was received, frame is the actual frame
+ received (as binay string)
+ :param filter: An optional filter (predicate), with signature:
+ def filter(frame). If provided, only frames for which filter evaluates
+ to True will be forwarded to callback.
+ :return: FrmaeIOPort instance.
+ """
+ """Add a new interface"""
+ assert iface_name not in self.ports
+ port = FrameIOPort(iface_name, callback, filter)
+ self.ports[iface_name] = port
+ # need to exit select loop to reconstruct select fd lists
+ self.ports_changed = True
+ self.waker.notify()
+ return port
+
+ def del_interface(self, iface_name):
+ """
+ Stop and remove named interface
+ :param iface_name: Name of previoysly registered interface
+ :return: None
+ """
+ assert iface_name in self.ports
+ port = self.ports[iface_name]
+ port.stop()
+ del self.ports[iface_name]
+ # need to exit select loop to reconstruct select fd lists
+ self.ports_changed = True
+ self.waker.notify()
+
+ def send(self, iface_name, frame):
+ """
+ Send frame on given interface
+ :param iface_name: Name of previously registered interface
+ :param frame: frame as string
+ :return: number of bytes sent
+ """
+ return self.ports[iface_name].send(frame)
+
+ #~~~~~~~~~~~~~~ Thread methods (running on non-main thread ~~~~~~~~~~~~~~~~
+
+ def run(self):
+ """
+ Called on the alien thread, this is the core multi-port receive loop
+ """
+
+ log.debug('select-loop-started')
+
+ # outer loop constructs sockets list for select
+ while not self.stopped:
+ sockets = [self.waker] + self.ports.values()
+ self.ports_changed = False
+ empty = []
+ # inner select loop
+
+ while not self.stopped:
+ try:
+ _in, _out, _err = select.select(sockets, empty, empty, 1)
+ except Exception, e:
+ log.exception('frame-io-select-error', e=e)
+ break
+ with self.cvar:
+ for port in _in:
+ if port is self.waker:
+ self.waker.wait()
+ continue
+ else:
+ port.recv()
+ self.cvar.notify_all()
+ if self.ports_changed:
+ break # break inner loop so we reconstruct sockets list
+
+ log.debug('select-loop-exited')