Async frame receive/send module

Change-Id: I75b6ecdecf05f2c72b3b0fa1191d849b7d8e67ef
diff --git a/common/frameio/frameio.py b/common/frameio/frameio.py
new file mode 100644
index 0000000..daa7deb
--- /dev/null
+++ b/common/frameio/frameio.py
@@ -0,0 +1,299 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A module that can send and receive raw ethernet frames on a set of interfaces
+and it can manage a set of vlan interfaces on top of existing
+interfaces. Due to reliance on raw sockets, this module requires
+root access. Also, raw sockets are hard to deal with in Twisted (not
+directly supported) we need to run the receiver select loop on a dedicated
+thread.
+"""
+import os
+import socket
+from pcapy import BPFProgram
+from threading import Thread, Condition
+
+import fcntl
+
+import select
+import structlog
+
+from twisted.internet import reactor
+
+from common.frameio.third_party.oftest import afpacket, netutils
+
+log = structlog.get_logger()
+
+
+def hexify(buffer):
+    """
+    Return a hexadecimal string encoding of input buffer
+    """
+    return ' '.join('%02x' % ord(c) for c in buffer)
+
+
+class _SelectWakerDescriptor(object):
+    """
+    A descriptor that can be mixed into a select loop to wake it up.
+    """
+    def __init__(self):
+        self.pipe_read, self.pipe_write = os.pipe()
+        fcntl.fcntl(self.pipe_write, fcntl.F_SETFL, os.O_NONBLOCK)
+
+    def __del__(self):
+        os.close(self.pipe_read)
+        os.close(self.pipe_write)
+
+    def fileno(self):
+        return self.pipe_read
+
+    def wait(self):
+        os.read(self.pipe_read, 1)
+
+    def notify(self):
+        """Trigger a select loop"""
+        os.write(self.pipe_write, '\x00')
+
+
+class BpfProgramFilter(object):
+    """
+    Convenience packet filter based on the well-tried Berkeley Packet Filter,
+    used by many well known open source tools such as pcap and tcpdump.
+    """
+    def __init__(self, program_string):
+        """
+        Create a filter using the BPF command syntax. To learn more,
+        consult 'man pcap-filter'.
+        :param program_string: The textual definition of the filter. Examples:
+        'vlan 1000'
+        'vlan 1000 and ip src host 10.10.10.10'
+        """
+        self.bpf = BPFProgram(program_string)
+
+    def __call__(self, frame):
+        """
+        Return 1 if frame passes filter.
+        :param frame: Raw frame provided as Python string
+        :return: 1 if frame satisfies filter, 0 otherwise.
+        """
+        return self.bpf.filter(frame)
+
+
+class FrameIOPort(object):
+    """
+    Represents a network interface which we can send/receive raw
+    Ethernet frames.
+    """
+
+    RCV_SIZE_DEFAULT = 4096
+    ETH_P_ALL = 0x03
+    RCV_TIMEOUT = 10000
+
+    def __init__(self, iface_name, callback, filter=None):
+        self.iface_name = iface_name
+        self.callback = callback
+        self.filter = filter
+        self.socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
+        afpacket.enable_auxdata(self.socket)
+        self.socket.bind((self.iface_name, self.ETH_P_ALL))
+        netutils.set_promisc(self.socket, self.iface_name)
+        self.socket.settimeout(self.RCV_TIMEOUT)
+        log.debug('socket-opened', fn=self.fileno(), iface=iface_name)
+
+        self.received = 0
+        self.discarded = 0
+
+    def __del__(self):
+        if self.socket:
+            fn = self.fileno()
+            self.socket.close()
+            self.socket = None
+            log.debug('socket-closed', fn=fn, iface=self.iface_name)
+
+    def fileno(self):
+        return self.socket.fileno()
+
+    def _dispatch(self, frame):
+        log.debug('calling-publisher', frame=hexify(frame))
+        self.callback(self, frame)
+
+    def recv(self):
+        """Called on the select thread when a packet arrives"""
+        try:
+            frame = afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
+        except RuntimeError, e:
+            # we observed this happens sometimes right after the socket was
+            # attached to a newly created veth interface. So we log it, but
+            # allow to continue
+            log.warn('afpacket-recv-error', code=-1)
+            return
+
+        log.debug('frame-received', iface=self.iface_name, len=len(frame),
+                  hex=hexify(frame))
+        self.received +=1
+        if self.filter is None or self.filter(frame):
+            log.debug('frame-dispatched')
+            reactor.callFromThread(self._dispatch, frame)
+        else:
+            self.discarded += 1
+            log.debug('frame-discarded')
+
+    def send(self, frame):
+        log.debug('sending', len=len(frame), iface=self.iface_name)
+        sent_bytes = self.socket.send(frame)
+        if sent_bytes != len(frame):
+            log.error('send-error', iface=self.iface_name,
+                      wanted_to_send=len(frame), actually_sent=sent_bytes)
+        return sent_bytes
+
+    def up(self):
+        os.system('ip link set {} up'.format(self.iface_name))
+        return self
+
+    def down(self):
+        os.system('ip link set {] down'.format(self.iface_name))
+        return self
+
+    def statistics(self):
+        return self.received, self.discarded
+
+
+class FrameIOManager(Thread):
+    """
+    Packet/Frame IO manager that can be used to send/receive raw frames
+    on a set of network interfaces.
+    """
+    def __init__(self):
+        super(FrameIOManager, self).__init__()
+
+        self.ports = {}  # iface_name -> ActiveFrameReceiver
+        self.queue = {}  # iface_name -> TODO
+
+        self.cvar = Condition()
+        self.waker = _SelectWakerDescriptor()
+        self.stopped = False
+        self.ports_changed = False
+
+    #~~~~~~~~~~~ exposed methods callable from main thread ~~~~~~~~~~~~~~~~~~~~~
+
+    def start(self):
+        """
+        Start the IO manager and its select loop thread
+        """
+        log.debug('starting')
+        super(FrameIOManager, self).start()
+        log.info('started')
+        return self
+
+    def stop(self):
+        """
+        Stop the IO manager and its thread with the select loop
+        """
+        log.debug('stopping')
+        self.stopped = True
+        self.waker.notify()
+        self.join()
+        del self.ports
+        log.info('stopped')
+
+    def list_interfaces(self):
+        """
+        Return list of interfaces listened on
+        :return: List of FrameIOPort objects
+        """
+        return self.ports
+
+    def add_interface(self, iface_name, callback, filter=None):
+        """
+        Add a new interface and start receiving on it.
+        :param iface_name: Name of the interface. Must be an existing Unix
+        interface (eth0, en0, etc.)
+        :param callback: Called on each received frame;
+        signature: def callback(port, frame) where port is the FrameIOPort
+        instance at which the frame was received, frame is the actual frame
+        received (as binay string)
+        :param filter: An optional filter (predicate), with signature:
+        def filter(frame). If provided, only frames for which filter evaluates
+        to True will be forwarded to callback.
+        :return: FrmaeIOPort instance.
+        """
+        """Add a new interface"""
+        assert iface_name not in self.ports
+        port = FrameIOPort(iface_name, callback, filter)
+        self.ports[iface_name] = port
+        # need to exit select loop to reconstruct select fd lists
+        self.ports_changed = True
+        self.waker.notify()
+        return port
+
+    def del_interface(self, iface_name):
+        """
+        Stop and remove named interface
+        :param iface_name: Name of previoysly registered interface
+        :return: None
+        """
+        assert iface_name in self.ports
+        port = self.ports[iface_name]
+        port.stop()
+        del self.ports[iface_name]
+        # need to exit select loop to reconstruct select fd lists
+        self.ports_changed = True
+        self.waker.notify()
+
+    def send(self, iface_name, frame):
+        """
+        Send frame on given interface
+        :param iface_name: Name of previously registered interface
+        :param frame: frame as string
+        :return: number of bytes sent
+        """
+        return self.ports[iface_name].send(frame)
+
+    #~~~~~~~~~~~~~~ Thread methods (running on non-main thread ~~~~~~~~~~~~~~~~
+
+    def run(self):
+        """
+        Called on the alien thread, this is the core multi-port receive loop
+        """
+
+        log.debug('select-loop-started')
+
+        # outer loop constructs sockets list for select
+        while not self.stopped:
+            sockets = [self.waker] + self.ports.values()
+            self.ports_changed = False
+            empty = []
+            # inner select loop
+
+            while not self.stopped:
+                try:
+                    _in, _out, _err = select.select(sockets, empty, empty, 1)
+                except Exception, e:
+                    log.exception('frame-io-select-error', e=e)
+                    break
+                with self.cvar:
+                    for port in _in:
+                        if port is self.waker:
+                            self.waker.wait()
+                            continue
+                        else:
+                            port.recv()
+                    self.cvar.notify_all()
+                if self.ports_changed:
+                    break  # break inner loop so we reconstruct sockets list
+
+        log.debug('select-loop-exited')