Async frame receive/send module

Change-Id: I75b6ecdecf05f2c72b3b0fa1191d849b7d8e67ef
diff --git a/Dockerfile.base b/Dockerfile.base
index 5e1601f..ee20521 100644
--- a/Dockerfile.base
+++ b/Dockerfile.base
@@ -23,7 +23,7 @@
 
 # Update to have latest images
 RUN apt-get update && \
-    apt-get install -y python python-pip openssl
+    apt-get install -y python python-pip openssl iproute2 libpcap-dev
 
 COPY requirements.txt /tmp/requirements.txt
 
diff --git a/common/frameio/__init__.py b/common/frameio/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/common/frameio/__init__.py
diff --git a/common/frameio/frameio.py b/common/frameio/frameio.py
new file mode 100644
index 0000000..daa7deb
--- /dev/null
+++ b/common/frameio/frameio.py
@@ -0,0 +1,299 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A module that can send and receive raw ethernet frames on a set of interfaces
+and it can manage a set of vlan interfaces on top of existing
+interfaces. Due to reliance on raw sockets, this module requires
+root access. Also, raw sockets are hard to deal with in Twisted (not
+directly supported) we need to run the receiver select loop on a dedicated
+thread.
+"""
+import os
+import socket
+from pcapy import BPFProgram
+from threading import Thread, Condition
+
+import fcntl
+
+import select
+import structlog
+
+from twisted.internet import reactor
+
+from common.frameio.third_party.oftest import afpacket, netutils
+
+log = structlog.get_logger()
+
+
+def hexify(buffer):
+    """
+    Return a hexadecimal string encoding of input buffer
+    """
+    return ' '.join('%02x' % ord(c) for c in buffer)
+
+
+class _SelectWakerDescriptor(object):
+    """
+    A descriptor that can be mixed into a select loop to wake it up.
+    """
+    def __init__(self):
+        self.pipe_read, self.pipe_write = os.pipe()
+        fcntl.fcntl(self.pipe_write, fcntl.F_SETFL, os.O_NONBLOCK)
+
+    def __del__(self):
+        os.close(self.pipe_read)
+        os.close(self.pipe_write)
+
+    def fileno(self):
+        return self.pipe_read
+
+    def wait(self):
+        os.read(self.pipe_read, 1)
+
+    def notify(self):
+        """Trigger a select loop"""
+        os.write(self.pipe_write, '\x00')
+
+
+class BpfProgramFilter(object):
+    """
+    Convenience packet filter based on the well-tried Berkeley Packet Filter,
+    used by many well known open source tools such as pcap and tcpdump.
+    """
+    def __init__(self, program_string):
+        """
+        Create a filter using the BPF command syntax. To learn more,
+        consult 'man pcap-filter'.
+        :param program_string: The textual definition of the filter. Examples:
+        'vlan 1000'
+        'vlan 1000 and ip src host 10.10.10.10'
+        """
+        self.bpf = BPFProgram(program_string)
+
+    def __call__(self, frame):
+        """
+        Return 1 if frame passes filter.
+        :param frame: Raw frame provided as Python string
+        :return: 1 if frame satisfies filter, 0 otherwise.
+        """
+        return self.bpf.filter(frame)
+
+
+class FrameIOPort(object):
+    """
+    Represents a network interface which we can send/receive raw
+    Ethernet frames.
+    """
+
+    RCV_SIZE_DEFAULT = 4096
+    ETH_P_ALL = 0x03
+    RCV_TIMEOUT = 10000
+
+    def __init__(self, iface_name, callback, filter=None):
+        self.iface_name = iface_name
+        self.callback = callback
+        self.filter = filter
+        self.socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
+        afpacket.enable_auxdata(self.socket)
+        self.socket.bind((self.iface_name, self.ETH_P_ALL))
+        netutils.set_promisc(self.socket, self.iface_name)
+        self.socket.settimeout(self.RCV_TIMEOUT)
+        log.debug('socket-opened', fn=self.fileno(), iface=iface_name)
+
+        self.received = 0
+        self.discarded = 0
+
+    def __del__(self):
+        if self.socket:
+            fn = self.fileno()
+            self.socket.close()
+            self.socket = None
+            log.debug('socket-closed', fn=fn, iface=self.iface_name)
+
+    def fileno(self):
+        return self.socket.fileno()
+
+    def _dispatch(self, frame):
+        log.debug('calling-publisher', frame=hexify(frame))
+        self.callback(self, frame)
+
+    def recv(self):
+        """Called on the select thread when a packet arrives"""
+        try:
+            frame = afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
+        except RuntimeError, e:
+            # we observed this happens sometimes right after the socket was
+            # attached to a newly created veth interface. So we log it, but
+            # allow to continue
+            log.warn('afpacket-recv-error', code=-1)
+            return
+
+        log.debug('frame-received', iface=self.iface_name, len=len(frame),
+                  hex=hexify(frame))
+        self.received +=1
+        if self.filter is None or self.filter(frame):
+            log.debug('frame-dispatched')
+            reactor.callFromThread(self._dispatch, frame)
+        else:
+            self.discarded += 1
+            log.debug('frame-discarded')
+
+    def send(self, frame):
+        log.debug('sending', len=len(frame), iface=self.iface_name)
+        sent_bytes = self.socket.send(frame)
+        if sent_bytes != len(frame):
+            log.error('send-error', iface=self.iface_name,
+                      wanted_to_send=len(frame), actually_sent=sent_bytes)
+        return sent_bytes
+
+    def up(self):
+        os.system('ip link set {} up'.format(self.iface_name))
+        return self
+
+    def down(self):
+        os.system('ip link set {] down'.format(self.iface_name))
+        return self
+
+    def statistics(self):
+        return self.received, self.discarded
+
+
+class FrameIOManager(Thread):
+    """
+    Packet/Frame IO manager that can be used to send/receive raw frames
+    on a set of network interfaces.
+    """
+    def __init__(self):
+        super(FrameIOManager, self).__init__()
+
+        self.ports = {}  # iface_name -> ActiveFrameReceiver
+        self.queue = {}  # iface_name -> TODO
+
+        self.cvar = Condition()
+        self.waker = _SelectWakerDescriptor()
+        self.stopped = False
+        self.ports_changed = False
+
+    #~~~~~~~~~~~ exposed methods callable from main thread ~~~~~~~~~~~~~~~~~~~~~
+
+    def start(self):
+        """
+        Start the IO manager and its select loop thread
+        """
+        log.debug('starting')
+        super(FrameIOManager, self).start()
+        log.info('started')
+        return self
+
+    def stop(self):
+        """
+        Stop the IO manager and its thread with the select loop
+        """
+        log.debug('stopping')
+        self.stopped = True
+        self.waker.notify()
+        self.join()
+        del self.ports
+        log.info('stopped')
+
+    def list_interfaces(self):
+        """
+        Return list of interfaces listened on
+        :return: List of FrameIOPort objects
+        """
+        return self.ports
+
+    def add_interface(self, iface_name, callback, filter=None):
+        """
+        Add a new interface and start receiving on it.
+        :param iface_name: Name of the interface. Must be an existing Unix
+        interface (eth0, en0, etc.)
+        :param callback: Called on each received frame;
+        signature: def callback(port, frame) where port is the FrameIOPort
+        instance at which the frame was received, frame is the actual frame
+        received (as binay string)
+        :param filter: An optional filter (predicate), with signature:
+        def filter(frame). If provided, only frames for which filter evaluates
+        to True will be forwarded to callback.
+        :return: FrmaeIOPort instance.
+        """
+        """Add a new interface"""
+        assert iface_name not in self.ports
+        port = FrameIOPort(iface_name, callback, filter)
+        self.ports[iface_name] = port
+        # need to exit select loop to reconstruct select fd lists
+        self.ports_changed = True
+        self.waker.notify()
+        return port
+
+    def del_interface(self, iface_name):
+        """
+        Stop and remove named interface
+        :param iface_name: Name of previoysly registered interface
+        :return: None
+        """
+        assert iface_name in self.ports
+        port = self.ports[iface_name]
+        port.stop()
+        del self.ports[iface_name]
+        # need to exit select loop to reconstruct select fd lists
+        self.ports_changed = True
+        self.waker.notify()
+
+    def send(self, iface_name, frame):
+        """
+        Send frame on given interface
+        :param iface_name: Name of previously registered interface
+        :param frame: frame as string
+        :return: number of bytes sent
+        """
+        return self.ports[iface_name].send(frame)
+
+    #~~~~~~~~~~~~~~ Thread methods (running on non-main thread ~~~~~~~~~~~~~~~~
+
+    def run(self):
+        """
+        Called on the alien thread, this is the core multi-port receive loop
+        """
+
+        log.debug('select-loop-started')
+
+        # outer loop constructs sockets list for select
+        while not self.stopped:
+            sockets = [self.waker] + self.ports.values()
+            self.ports_changed = False
+            empty = []
+            # inner select loop
+
+            while not self.stopped:
+                try:
+                    _in, _out, _err = select.select(sockets, empty, empty, 1)
+                except Exception, e:
+                    log.exception('frame-io-select-error', e=e)
+                    break
+                with self.cvar:
+                    for port in _in:
+                        if port is self.waker:
+                            self.waker.wait()
+                            continue
+                        else:
+                            port.recv()
+                    self.cvar.notify_all()
+                if self.ports_changed:
+                    break  # break inner loop so we reconstruct sockets list
+
+        log.debug('select-loop-exited')
diff --git a/common/frameio/third_party/__init__.py b/common/frameio/third_party/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/common/frameio/third_party/__init__.py
diff --git a/common/frameio/third_party/oftest/LICENSE b/common/frameio/third_party/oftest/LICENSE
new file mode 100644
index 0000000..3216042
--- /dev/null
+++ b/common/frameio/third_party/oftest/LICENSE
@@ -0,0 +1,36 @@
+OpenFlow Test Framework
+
+Copyright (c) 2010 The Board of Trustees of The Leland Stanford
+Junior University
+
+Except where otherwise noted, this software is distributed under
+the OpenFlow Software License.  See
+http://www.openflowswitch.org/wp/legal/ for current details.
+
+We are making the OpenFlow specification and associated documentation
+(Software) available for public use and benefit with the expectation
+that others will use, modify and enhance the Software and contribute
+those enhancements back to the community. However, since we would like
+to make the Software available for broadest use, with as few
+restrictions as possible permission is hereby granted, free of charge,
+to any person obtaining a copy of this Software to deal in the
+Software under the copyrights without restriction, including without
+limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED -Y´AS IS¡, WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+The name and trademarks of copyright holder(s) may NOT be used in
+advertising or publicity pertaining to the Software or any derivatives
+without specific, written prior permission.
diff --git a/common/frameio/third_party/oftest/README.md b/common/frameio/third_party/oftest/README.md
new file mode 100644
index 0000000..f0cb649
--- /dev/null
+++ b/common/frameio/third_party/oftest/README.md
@@ -0,0 +1,6 @@
+Files in this directory are derived from the respective files
+in oftest (http://github.com/floodlight/oftest).
+ 
+For the licensing terms of these files, see LICENSE in this dir.
+ 
+
diff --git a/common/frameio/third_party/oftest/__init__.py b/common/frameio/third_party/oftest/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/common/frameio/third_party/oftest/__init__.py
diff --git a/common/frameio/third_party/oftest/afpacket.py b/common/frameio/third_party/oftest/afpacket.py
new file mode 100644
index 0000000..340fc38
--- /dev/null
+++ b/common/frameio/third_party/oftest/afpacket.py
@@ -0,0 +1,111 @@
+"""
+AF_PACKET receive support
+
+When VLAN offload is enabled on the NIC Linux will not deliver the VLAN tag
+in the data returned by recv. Instead, it delivers the VLAN TCI in a control
+message. Python 2.x doesn't have built-in support for recvmsg, so we have to
+use ctypes to call it. The recv function exported by this module reconstructs
+the VLAN tag if it was offloaded.
+"""
+
+import struct
+from ctypes import *
+
+ETH_P_8021Q = 0x8100
+SOL_PACKET = 263
+PACKET_AUXDATA = 8
+TP_STATUS_VLAN_VALID = 1 << 4
+
+class struct_iovec(Structure):
+    _fields_ = [
+        ("iov_base", c_void_p),
+        ("iov_len", c_size_t),
+    ]
+
+class struct_msghdr(Structure):
+    _fields_ = [
+        ("msg_name", c_void_p),
+        ("msg_namelen", c_uint32),
+        ("msg_iov", POINTER(struct_iovec)),
+        ("msg_iovlen", c_size_t),
+        ("msg_control", c_void_p),
+        ("msg_controllen", c_size_t),
+        ("msg_flags", c_int),
+    ]
+
+class struct_cmsghdr(Structure):
+    _fields_ = [
+        ("cmsg_len", c_size_t),
+        ("cmsg_level", c_int),
+        ("cmsg_type", c_int),
+    ]
+
+class struct_tpacket_auxdata(Structure):
+    _fields_ = [
+        ("tp_status", c_uint),
+        ("tp_len", c_uint),
+        ("tp_snaplen", c_uint),
+        ("tp_mac", c_ushort),
+        ("tp_net", c_ushort),
+        ("tp_vlan_tci", c_ushort),
+        ("tp_padding", c_ushort),
+    ]
+
+libc = CDLL("libc.so.6")
+recvmsg = libc.recvmsg
+recvmsg.argtypes = [c_int, POINTER(struct_msghdr), c_int]
+recvmsg.retype = c_int
+
+def enable_auxdata(sk):
+    """
+    Ask the kernel to return the VLAN tag in a control message
+
+    Must be called on the socket before afpacket.recv.
+    """
+    sk.setsockopt(SOL_PACKET, PACKET_AUXDATA, 1)
+
+def recv(sk, bufsize):
+    """
+    Receive a packet from an AF_PACKET socket
+    @sk Socket
+    @bufsize Maximum packet size
+    """
+    buf = create_string_buffer(bufsize)
+
+    ctrl_bufsize = sizeof(struct_cmsghdr) + sizeof(struct_tpacket_auxdata) + sizeof(c_size_t)
+    ctrl_buf = create_string_buffer(ctrl_bufsize)
+
+    iov = struct_iovec()
+    iov.iov_base = cast(buf, c_void_p)
+    iov.iov_len = bufsize
+
+    msghdr = struct_msghdr()
+    msghdr.msg_name = None
+    msghdr.msg_namelen = 0
+    msghdr.msg_iov = pointer(iov)
+    msghdr.msg_iovlen = 1
+    msghdr.msg_control = cast(ctrl_buf, c_void_p)
+    msghdr.msg_controllen = ctrl_bufsize
+    msghdr.msg_flags = 0
+
+    rv = recvmsg(sk.fileno(), byref(msghdr), 0)
+    if rv < 0:
+        raise RuntimeError("recvmsg failed: rv=%d", rv)
+
+    # The kernel only delivers control messages we ask for. We
+    # only enabled PACKET_AUXDATA, so we can assume it's the
+    # only control message.
+    assert msghdr.msg_controllen >= sizeof(struct_cmsghdr)
+
+    cmsghdr = struct_cmsghdr.from_buffer(ctrl_buf) # pylint: disable=E1101
+    assert cmsghdr.cmsg_level == SOL_PACKET
+    assert cmsghdr.cmsg_type == PACKET_AUXDATA
+
+    auxdata = struct_tpacket_auxdata.from_buffer(ctrl_buf, sizeof(struct_cmsghdr)) # pylint: disable=E1101
+
+    if auxdata.tp_vlan_tci != 0 or auxdata.tp_status & TP_STATUS_VLAN_VALID:
+        # Insert VLAN tag
+        tag = struct.pack("!HH", ETH_P_8021Q, auxdata.tp_vlan_tci)
+        return buf.raw[:12] + tag + buf.raw[12:rv]
+    else:
+        return buf.raw[:rv]
diff --git a/common/frameio/third_party/oftest/netutils.py b/common/frameio/third_party/oftest/netutils.py
new file mode 100644
index 0000000..4d2ec8f
--- /dev/null
+++ b/common/frameio/third_party/oftest/netutils.py
@@ -0,0 +1,60 @@
+"""
+Network utilities for the OpenFlow test framework
+"""
+
+###########################################################################
+##                                                                         ##
+## Promiscuous mode enable/disable                                         ##
+##                                                                         ##
+## Based on code from Scapy by Phillippe Biondi                            ##
+##                                                                         ##
+##                                                                         ##
+## This program is free software; you can redistribute it and/or modify it ##
+## under the terms of the GNU General Public License as published by the   ##
+## Free Software Foundation; either version 2, or (at your option) any     ##
+## later version.                                                          ##
+##                                                                         ##
+## This program is distributed in the hope that it will be useful, but     ##
+## WITHOUT ANY WARRANTY; without even the implied warranty of              ##
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU       ##
+## General Public License for more details.                                ##
+##                                                                         ##
+#############################################################################
+
+import socket
+from fcntl import ioctl
+import struct
+
+# From net/if_arp.h
+ARPHDR_ETHER = 1
+ARPHDR_LOOPBACK = 772
+
+# From bits/ioctls.h
+SIOCGIFHWADDR  = 0x8927          # Get hardware address
+SIOCGIFINDEX   = 0x8933          # name -> if_index mapping
+
+# From netpacket/packet.h
+PACKET_ADD_MEMBERSHIP  = 1
+PACKET_DROP_MEMBERSHIP = 2
+PACKET_MR_PROMISC      = 1
+
+# From bits/socket.h
+SOL_PACKET = 263
+
+def get_if(iff,cmd):
+  s=socket.socket()
+  ifreq = ioctl(s, cmd, struct.pack("16s16x",iff))
+  s.close()
+  return ifreq
+
+def get_if_index(iff):
+  return int(struct.unpack("I",get_if(iff, SIOCGIFINDEX)[16:20])[0])
+
+def set_promisc(s,iff,val=1):
+  mreq = struct.pack("IHH8s", get_if_index(iff), PACKET_MR_PROMISC, 0, "")
+  if val:
+      cmd = PACKET_ADD_MEMBERSHIP
+  else:
+      cmd = PACKET_DROP_MEMBERSHIP
+  s.setsockopt(SOL_PACKET, cmd, mreq)
+
diff --git a/common/utils/deferred_utils.py b/common/utils/deferred_utils.py
new file mode 100644
index 0000000..30e773a
--- /dev/null
+++ b/common/utils/deferred_utils.py
@@ -0,0 +1,43 @@
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred
+from twisted.internet.error import AlreadyCalled
+
+
+class TimeOutError(Exception): pass
+
+
+class DeferredWithTimeout(Deferred):
+    """
+    Deferred with a timeout. If neither the callback nor the errback method
+    is not called within the given time, the deferred's errback will be called
+    with a TimeOutError() exception.
+
+    All other uses are the same as of Deferred().
+    """
+    def __init__(self, timeout=1.0):
+        Deferred.__init__(self)
+        self._timeout = timeout
+        self.timer = reactor.callLater(timeout, self.timed_out)
+
+    def timed_out(self):
+        self.errback(
+            TimeOutError('timed out after {} seconds'.format(self._timeout)))
+
+    def callback(self, result):
+        self._cancel_timer()
+        return Deferred.callback(self, result)
+
+    def errback(self, fail):
+        self._cancel_timer()
+        return Deferred.errback(self, fail)
+
+    def cancel(self):
+        self._cancel_timer()
+        return Deferred.cancel(self)
+
+    def _cancel_timer(self):
+        try:
+            self.timer.cancel()
+        except AlreadyCalled:
+            pass
+
diff --git a/requirements.txt b/requirements.txt
index ade7f69..cbdf9c1 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -15,6 +15,7 @@
 nose-exclude>=0.5.0
 mock>=1.3.0
 netifaces>=0.10.4
+pcapy>=0.10.10
 pep8>=1.5.7
 pep8-naming>=0.3.3
 protobuf-to-dict>=0.1.0
diff --git a/tests/itests/test_frameio.py b/tests/itests/test_frameio.py
new file mode 100644
index 0000000..c367a9d
--- /dev/null
+++ b/tests/itests/test_frameio.py
@@ -0,0 +1,205 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Run this test inside a docker container using the following syntax:
+
+docker run -ti --rm -v $(pwd):/voltha  --privileged cord/voltha-base \
+    env PYTHONPATH=/voltha python /voltha/tests/itests/test_frameio.py
+
+"""
+
+import os
+import random
+from time import sleep
+
+from scapy.layers.inet import IP
+from scapy.layers.l2 import Ether, Dot1Q
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred, inlineCallbacks
+from twisted.internet.error import AlreadyCalled
+from twisted.trial.unittest import TestCase
+
+from common.frameio.frameio import FrameIOManager, BpfProgramFilter
+from common.utils.asleep import asleep
+from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
+
+ident = lambda frame: frame
+none = lambda *args, **kw: None
+
+
+class TestFrameIO(TestCase):
+
+    @inlineCallbacks
+    def make_veth_pairs_if_needed(self):
+
+        def has_iface(iface):
+            return os.system('ip link show {}'.format(iface)) == 0
+
+        def make_veth(iface):
+            os.system('ip link add type veth')
+            os.system('ip link set {} up'.format(iface))
+            peer = iface[:len('veth')] + str(int(iface[len('veth'):]) + 1)
+            os.system('ip link set {} up'.format(peer))
+            assert has_iface(iface)
+
+        for iface_number in (0, 2):
+            iface = 'veth{}'.format(iface_number)
+            if not has_iface(iface):
+                make_veth(iface)
+                yield asleep(2)
+
+    @inlineCallbacks
+    def setUp(self):
+        yield self.make_veth_pairs_if_needed()
+        self.mgr = FrameIOManager().start()
+
+    def tearDown(self):
+        self.mgr.stop()
+
+    @inlineCallbacks
+    def test_packet_send_receive(self):
+        rcvd = DeferredWithTimeout()
+        p0 = self.mgr.add_interface('veth0', none).up()
+        p1 = self.mgr.add_interface('veth1',
+                                    lambda p, f: rcvd.callback((p, f))).up()
+
+        # sending to veth0 should result in receiving on veth1 and vice versa
+        bogus_frame = 'bogus packet'
+        p0.send(bogus_frame)
+
+        # check that we receved packet
+        port, frame = yield rcvd
+        self.assertEqual(port, p1)
+        self.assertEqual(frame, bogus_frame)
+
+
+    @inlineCallbacks
+    def test_packet_send_receive_with_filter(self):
+        rcvd = DeferredWithTimeout()
+
+        filter = BpfProgramFilter('ip dst host 123.123.123.123')
+        p0 = self.mgr.add_interface('veth0', none).up()
+        p1 = self.mgr.add_interface('veth1',
+                                    lambda p, f: rcvd.callback((p, f)),
+                                    filter=filter).up()
+
+        # sending bogus packet would not be received
+        ip_packet = str(Ether()/IP(dst='123.123.123.123'))
+        p0.send(ip_packet)
+
+        # check that we receved packet
+        port, frame = yield rcvd
+        self.assertEqual(port, p1)
+        self.assertEqual(frame, ip_packet)
+
+
+    @inlineCallbacks
+    def test_packet_send_drop_with_filter(self):
+        rcvd = DeferredWithTimeout()
+
+        filter = BpfProgramFilter('ip dst host 123.123.123.123')
+        p0 = self.mgr.add_interface('veth0', none).up()
+        self.mgr.add_interface('veth1', lambda p, f: rcvd.callback((p, f)),
+                               filter=filter).up()
+
+        # sending bogus packet would not be received
+        p0.send('bogus packet')
+
+        try:
+            _ = yield rcvd
+        except TimeOutError:
+            pass
+        else:
+            self.fail('not timed out')
+
+
+    @inlineCallbacks
+    def test_concurrent_packet_send_receive(self):
+
+        done = Deferred()
+        queue1 = []
+        queue2 = []
+
+        n = 100
+
+        def append(queue):
+            def _append(_, frame):
+                queue.append(frame)
+                if len(queue1) == n and len(queue2) == n:
+                    done.callback(None)
+            return _append
+
+        p1in = self.mgr.add_interface('veth0', none).up()
+        self.mgr.add_interface('veth1', append(queue1)).up()
+        p2in = self.mgr.add_interface('veth2', none).up()
+        self.mgr.add_interface('veth3', append(queue2)).up()
+
+        @inlineCallbacks
+        def send_packets(port, n):
+            for i in xrange(n):
+                port.send(str(i))
+                yield asleep(0.00001 * random.random())  # to interleave
+
+        # sending two concurrent streams
+        send_packets(p1in, n)
+        send_packets(p2in, n)
+
+        # verify that both queue got all packets
+        yield done
+
+    @inlineCallbacks
+    def test_concurrent_packet_send_receive_with_filter(self):
+
+        done = Deferred()
+        queue1 = []
+        queue2 = []
+
+        n = 100
+
+        def append(queue):
+            def _append(_, frame):
+                queue.append(frame)
+                if len(queue1) == n / 2 and len(queue2) == n / 2:
+                    done.callback(None)
+            return _append
+
+        filter = BpfProgramFilter('vlan 100')
+        p1in = self.mgr.add_interface('veth0', none).up()
+        self.mgr.add_interface('veth1', append(queue1), filter).up()
+        p2in = self.mgr.add_interface('veth2', none).up()
+        self.mgr.add_interface('veth3', append(queue2), filter).up()
+
+        @inlineCallbacks
+        def send_packets(port, n):
+            for i in xrange(n):
+                # packets have alternating VLAN ids 100 and 101
+                pkt = Ether()/Dot1Q(vlan=100 + i % 2)
+                port.send(str(pkt))
+                yield asleep(0.00001 * random.random())  # to interleave
+
+        # sending two concurrent streams
+        send_packets(p1in, n)
+        send_packets(p2in, n)
+
+        # verify that both queue got all packets
+        yield done
+
+
+if __name__ == '__main__':
+    import unittest
+    unittest.main()