Async frame receive/send module
Change-Id: I75b6ecdecf05f2c72b3b0fa1191d849b7d8e67ef
diff --git a/Dockerfile.base b/Dockerfile.base
index 5e1601f..ee20521 100644
--- a/Dockerfile.base
+++ b/Dockerfile.base
@@ -23,7 +23,7 @@
# Update to have latest images
RUN apt-get update && \
- apt-get install -y python python-pip openssl
+ apt-get install -y python python-pip openssl iproute2 libpcap-dev
COPY requirements.txt /tmp/requirements.txt
diff --git a/common/frameio/__init__.py b/common/frameio/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/common/frameio/__init__.py
diff --git a/common/frameio/frameio.py b/common/frameio/frameio.py
new file mode 100644
index 0000000..daa7deb
--- /dev/null
+++ b/common/frameio/frameio.py
@@ -0,0 +1,299 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A module that can send and receive raw ethernet frames on a set of interfaces
+and it can manage a set of vlan interfaces on top of existing
+interfaces. Due to reliance on raw sockets, this module requires
+root access. Also, raw sockets are hard to deal with in Twisted (not
+directly supported) we need to run the receiver select loop on a dedicated
+thread.
+"""
+import os
+import socket
+from pcapy import BPFProgram
+from threading import Thread, Condition
+
+import fcntl
+
+import select
+import structlog
+
+from twisted.internet import reactor
+
+from common.frameio.third_party.oftest import afpacket, netutils
+
+log = structlog.get_logger()
+
+
+def hexify(buffer):
+ """
+ Return a hexadecimal string encoding of input buffer
+ """
+ return ' '.join('%02x' % ord(c) for c in buffer)
+
+
+class _SelectWakerDescriptor(object):
+ """
+ A descriptor that can be mixed into a select loop to wake it up.
+ """
+ def __init__(self):
+ self.pipe_read, self.pipe_write = os.pipe()
+ fcntl.fcntl(self.pipe_write, fcntl.F_SETFL, os.O_NONBLOCK)
+
+ def __del__(self):
+ os.close(self.pipe_read)
+ os.close(self.pipe_write)
+
+ def fileno(self):
+ return self.pipe_read
+
+ def wait(self):
+ os.read(self.pipe_read, 1)
+
+ def notify(self):
+ """Trigger a select loop"""
+ os.write(self.pipe_write, '\x00')
+
+
+class BpfProgramFilter(object):
+ """
+ Convenience packet filter based on the well-tried Berkeley Packet Filter,
+ used by many well known open source tools such as pcap and tcpdump.
+ """
+ def __init__(self, program_string):
+ """
+ Create a filter using the BPF command syntax. To learn more,
+ consult 'man pcap-filter'.
+ :param program_string: The textual definition of the filter. Examples:
+ 'vlan 1000'
+ 'vlan 1000 and ip src host 10.10.10.10'
+ """
+ self.bpf = BPFProgram(program_string)
+
+ def __call__(self, frame):
+ """
+ Return 1 if frame passes filter.
+ :param frame: Raw frame provided as Python string
+ :return: 1 if frame satisfies filter, 0 otherwise.
+ """
+ return self.bpf.filter(frame)
+
+
+class FrameIOPort(object):
+ """
+ Represents a network interface which we can send/receive raw
+ Ethernet frames.
+ """
+
+ RCV_SIZE_DEFAULT = 4096
+ ETH_P_ALL = 0x03
+ RCV_TIMEOUT = 10000
+
+ def __init__(self, iface_name, callback, filter=None):
+ self.iface_name = iface_name
+ self.callback = callback
+ self.filter = filter
+ self.socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
+ afpacket.enable_auxdata(self.socket)
+ self.socket.bind((self.iface_name, self.ETH_P_ALL))
+ netutils.set_promisc(self.socket, self.iface_name)
+ self.socket.settimeout(self.RCV_TIMEOUT)
+ log.debug('socket-opened', fn=self.fileno(), iface=iface_name)
+
+ self.received = 0
+ self.discarded = 0
+
+ def __del__(self):
+ if self.socket:
+ fn = self.fileno()
+ self.socket.close()
+ self.socket = None
+ log.debug('socket-closed', fn=fn, iface=self.iface_name)
+
+ def fileno(self):
+ return self.socket.fileno()
+
+ def _dispatch(self, frame):
+ log.debug('calling-publisher', frame=hexify(frame))
+ self.callback(self, frame)
+
+ def recv(self):
+ """Called on the select thread when a packet arrives"""
+ try:
+ frame = afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
+ except RuntimeError, e:
+ # we observed this happens sometimes right after the socket was
+ # attached to a newly created veth interface. So we log it, but
+ # allow to continue
+ log.warn('afpacket-recv-error', code=-1)
+ return
+
+ log.debug('frame-received', iface=self.iface_name, len=len(frame),
+ hex=hexify(frame))
+ self.received +=1
+ if self.filter is None or self.filter(frame):
+ log.debug('frame-dispatched')
+ reactor.callFromThread(self._dispatch, frame)
+ else:
+ self.discarded += 1
+ log.debug('frame-discarded')
+
+ def send(self, frame):
+ log.debug('sending', len=len(frame), iface=self.iface_name)
+ sent_bytes = self.socket.send(frame)
+ if sent_bytes != len(frame):
+ log.error('send-error', iface=self.iface_name,
+ wanted_to_send=len(frame), actually_sent=sent_bytes)
+ return sent_bytes
+
+ def up(self):
+ os.system('ip link set {} up'.format(self.iface_name))
+ return self
+
+ def down(self):
+ os.system('ip link set {] down'.format(self.iface_name))
+ return self
+
+ def statistics(self):
+ return self.received, self.discarded
+
+
+class FrameIOManager(Thread):
+ """
+ Packet/Frame IO manager that can be used to send/receive raw frames
+ on a set of network interfaces.
+ """
+ def __init__(self):
+ super(FrameIOManager, self).__init__()
+
+ self.ports = {} # iface_name -> ActiveFrameReceiver
+ self.queue = {} # iface_name -> TODO
+
+ self.cvar = Condition()
+ self.waker = _SelectWakerDescriptor()
+ self.stopped = False
+ self.ports_changed = False
+
+ #~~~~~~~~~~~ exposed methods callable from main thread ~~~~~~~~~~~~~~~~~~~~~
+
+ def start(self):
+ """
+ Start the IO manager and its select loop thread
+ """
+ log.debug('starting')
+ super(FrameIOManager, self).start()
+ log.info('started')
+ return self
+
+ def stop(self):
+ """
+ Stop the IO manager and its thread with the select loop
+ """
+ log.debug('stopping')
+ self.stopped = True
+ self.waker.notify()
+ self.join()
+ del self.ports
+ log.info('stopped')
+
+ def list_interfaces(self):
+ """
+ Return list of interfaces listened on
+ :return: List of FrameIOPort objects
+ """
+ return self.ports
+
+ def add_interface(self, iface_name, callback, filter=None):
+ """
+ Add a new interface and start receiving on it.
+ :param iface_name: Name of the interface. Must be an existing Unix
+ interface (eth0, en0, etc.)
+ :param callback: Called on each received frame;
+ signature: def callback(port, frame) where port is the FrameIOPort
+ instance at which the frame was received, frame is the actual frame
+ received (as binay string)
+ :param filter: An optional filter (predicate), with signature:
+ def filter(frame). If provided, only frames for which filter evaluates
+ to True will be forwarded to callback.
+ :return: FrmaeIOPort instance.
+ """
+ """Add a new interface"""
+ assert iface_name not in self.ports
+ port = FrameIOPort(iface_name, callback, filter)
+ self.ports[iface_name] = port
+ # need to exit select loop to reconstruct select fd lists
+ self.ports_changed = True
+ self.waker.notify()
+ return port
+
+ def del_interface(self, iface_name):
+ """
+ Stop and remove named interface
+ :param iface_name: Name of previoysly registered interface
+ :return: None
+ """
+ assert iface_name in self.ports
+ port = self.ports[iface_name]
+ port.stop()
+ del self.ports[iface_name]
+ # need to exit select loop to reconstruct select fd lists
+ self.ports_changed = True
+ self.waker.notify()
+
+ def send(self, iface_name, frame):
+ """
+ Send frame on given interface
+ :param iface_name: Name of previously registered interface
+ :param frame: frame as string
+ :return: number of bytes sent
+ """
+ return self.ports[iface_name].send(frame)
+
+ #~~~~~~~~~~~~~~ Thread methods (running on non-main thread ~~~~~~~~~~~~~~~~
+
+ def run(self):
+ """
+ Called on the alien thread, this is the core multi-port receive loop
+ """
+
+ log.debug('select-loop-started')
+
+ # outer loop constructs sockets list for select
+ while not self.stopped:
+ sockets = [self.waker] + self.ports.values()
+ self.ports_changed = False
+ empty = []
+ # inner select loop
+
+ while not self.stopped:
+ try:
+ _in, _out, _err = select.select(sockets, empty, empty, 1)
+ except Exception, e:
+ log.exception('frame-io-select-error', e=e)
+ break
+ with self.cvar:
+ for port in _in:
+ if port is self.waker:
+ self.waker.wait()
+ continue
+ else:
+ port.recv()
+ self.cvar.notify_all()
+ if self.ports_changed:
+ break # break inner loop so we reconstruct sockets list
+
+ log.debug('select-loop-exited')
diff --git a/common/frameio/third_party/__init__.py b/common/frameio/third_party/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/common/frameio/third_party/__init__.py
diff --git a/common/frameio/third_party/oftest/LICENSE b/common/frameio/third_party/oftest/LICENSE
new file mode 100644
index 0000000..3216042
--- /dev/null
+++ b/common/frameio/third_party/oftest/LICENSE
@@ -0,0 +1,36 @@
+OpenFlow Test Framework
+
+Copyright (c) 2010 The Board of Trustees of The Leland Stanford
+Junior University
+
+Except where otherwise noted, this software is distributed under
+the OpenFlow Software License. See
+http://www.openflowswitch.org/wp/legal/ for current details.
+
+We are making the OpenFlow specification and associated documentation
+(Software) available for public use and benefit with the expectation
+that others will use, modify and enhance the Software and contribute
+those enhancements back to the community. However, since we would like
+to make the Software available for broadest use, with as few
+restrictions as possible permission is hereby granted, free of charge,
+to any person obtaining a copy of this Software to deal in the
+Software under the copyrights without restriction, including without
+limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED -Y´AS IS¡, WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+The name and trademarks of copyright holder(s) may NOT be used in
+advertising or publicity pertaining to the Software or any derivatives
+without specific, written prior permission.
diff --git a/common/frameio/third_party/oftest/README.md b/common/frameio/third_party/oftest/README.md
new file mode 100644
index 0000000..f0cb649
--- /dev/null
+++ b/common/frameio/third_party/oftest/README.md
@@ -0,0 +1,6 @@
+Files in this directory are derived from the respective files
+in oftest (http://github.com/floodlight/oftest).
+
+For the licensing terms of these files, see LICENSE in this dir.
+
+
diff --git a/common/frameio/third_party/oftest/__init__.py b/common/frameio/third_party/oftest/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/common/frameio/third_party/oftest/__init__.py
diff --git a/common/frameio/third_party/oftest/afpacket.py b/common/frameio/third_party/oftest/afpacket.py
new file mode 100644
index 0000000..340fc38
--- /dev/null
+++ b/common/frameio/third_party/oftest/afpacket.py
@@ -0,0 +1,111 @@
+"""
+AF_PACKET receive support
+
+When VLAN offload is enabled on the NIC Linux will not deliver the VLAN tag
+in the data returned by recv. Instead, it delivers the VLAN TCI in a control
+message. Python 2.x doesn't have built-in support for recvmsg, so we have to
+use ctypes to call it. The recv function exported by this module reconstructs
+the VLAN tag if it was offloaded.
+"""
+
+import struct
+from ctypes import *
+
+ETH_P_8021Q = 0x8100
+SOL_PACKET = 263
+PACKET_AUXDATA = 8
+TP_STATUS_VLAN_VALID = 1 << 4
+
+class struct_iovec(Structure):
+ _fields_ = [
+ ("iov_base", c_void_p),
+ ("iov_len", c_size_t),
+ ]
+
+class struct_msghdr(Structure):
+ _fields_ = [
+ ("msg_name", c_void_p),
+ ("msg_namelen", c_uint32),
+ ("msg_iov", POINTER(struct_iovec)),
+ ("msg_iovlen", c_size_t),
+ ("msg_control", c_void_p),
+ ("msg_controllen", c_size_t),
+ ("msg_flags", c_int),
+ ]
+
+class struct_cmsghdr(Structure):
+ _fields_ = [
+ ("cmsg_len", c_size_t),
+ ("cmsg_level", c_int),
+ ("cmsg_type", c_int),
+ ]
+
+class struct_tpacket_auxdata(Structure):
+ _fields_ = [
+ ("tp_status", c_uint),
+ ("tp_len", c_uint),
+ ("tp_snaplen", c_uint),
+ ("tp_mac", c_ushort),
+ ("tp_net", c_ushort),
+ ("tp_vlan_tci", c_ushort),
+ ("tp_padding", c_ushort),
+ ]
+
+libc = CDLL("libc.so.6")
+recvmsg = libc.recvmsg
+recvmsg.argtypes = [c_int, POINTER(struct_msghdr), c_int]
+recvmsg.retype = c_int
+
+def enable_auxdata(sk):
+ """
+ Ask the kernel to return the VLAN tag in a control message
+
+ Must be called on the socket before afpacket.recv.
+ """
+ sk.setsockopt(SOL_PACKET, PACKET_AUXDATA, 1)
+
+def recv(sk, bufsize):
+ """
+ Receive a packet from an AF_PACKET socket
+ @sk Socket
+ @bufsize Maximum packet size
+ """
+ buf = create_string_buffer(bufsize)
+
+ ctrl_bufsize = sizeof(struct_cmsghdr) + sizeof(struct_tpacket_auxdata) + sizeof(c_size_t)
+ ctrl_buf = create_string_buffer(ctrl_bufsize)
+
+ iov = struct_iovec()
+ iov.iov_base = cast(buf, c_void_p)
+ iov.iov_len = bufsize
+
+ msghdr = struct_msghdr()
+ msghdr.msg_name = None
+ msghdr.msg_namelen = 0
+ msghdr.msg_iov = pointer(iov)
+ msghdr.msg_iovlen = 1
+ msghdr.msg_control = cast(ctrl_buf, c_void_p)
+ msghdr.msg_controllen = ctrl_bufsize
+ msghdr.msg_flags = 0
+
+ rv = recvmsg(sk.fileno(), byref(msghdr), 0)
+ if rv < 0:
+ raise RuntimeError("recvmsg failed: rv=%d", rv)
+
+ # The kernel only delivers control messages we ask for. We
+ # only enabled PACKET_AUXDATA, so we can assume it's the
+ # only control message.
+ assert msghdr.msg_controllen >= sizeof(struct_cmsghdr)
+
+ cmsghdr = struct_cmsghdr.from_buffer(ctrl_buf) # pylint: disable=E1101
+ assert cmsghdr.cmsg_level == SOL_PACKET
+ assert cmsghdr.cmsg_type == PACKET_AUXDATA
+
+ auxdata = struct_tpacket_auxdata.from_buffer(ctrl_buf, sizeof(struct_cmsghdr)) # pylint: disable=E1101
+
+ if auxdata.tp_vlan_tci != 0 or auxdata.tp_status & TP_STATUS_VLAN_VALID:
+ # Insert VLAN tag
+ tag = struct.pack("!HH", ETH_P_8021Q, auxdata.tp_vlan_tci)
+ return buf.raw[:12] + tag + buf.raw[12:rv]
+ else:
+ return buf.raw[:rv]
diff --git a/common/frameio/third_party/oftest/netutils.py b/common/frameio/third_party/oftest/netutils.py
new file mode 100644
index 0000000..4d2ec8f
--- /dev/null
+++ b/common/frameio/third_party/oftest/netutils.py
@@ -0,0 +1,60 @@
+"""
+Network utilities for the OpenFlow test framework
+"""
+
+###########################################################################
+## ##
+## Promiscuous mode enable/disable ##
+## ##
+## Based on code from Scapy by Phillippe Biondi ##
+## ##
+## ##
+## This program is free software; you can redistribute it and/or modify it ##
+## under the terms of the GNU General Public License as published by the ##
+## Free Software Foundation; either version 2, or (at your option) any ##
+## later version. ##
+## ##
+## This program is distributed in the hope that it will be useful, but ##
+## WITHOUT ANY WARRANTY; without even the implied warranty of ##
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ##
+## General Public License for more details. ##
+## ##
+#############################################################################
+
+import socket
+from fcntl import ioctl
+import struct
+
+# From net/if_arp.h
+ARPHDR_ETHER = 1
+ARPHDR_LOOPBACK = 772
+
+# From bits/ioctls.h
+SIOCGIFHWADDR = 0x8927 # Get hardware address
+SIOCGIFINDEX = 0x8933 # name -> if_index mapping
+
+# From netpacket/packet.h
+PACKET_ADD_MEMBERSHIP = 1
+PACKET_DROP_MEMBERSHIP = 2
+PACKET_MR_PROMISC = 1
+
+# From bits/socket.h
+SOL_PACKET = 263
+
+def get_if(iff,cmd):
+ s=socket.socket()
+ ifreq = ioctl(s, cmd, struct.pack("16s16x",iff))
+ s.close()
+ return ifreq
+
+def get_if_index(iff):
+ return int(struct.unpack("I",get_if(iff, SIOCGIFINDEX)[16:20])[0])
+
+def set_promisc(s,iff,val=1):
+ mreq = struct.pack("IHH8s", get_if_index(iff), PACKET_MR_PROMISC, 0, "")
+ if val:
+ cmd = PACKET_ADD_MEMBERSHIP
+ else:
+ cmd = PACKET_DROP_MEMBERSHIP
+ s.setsockopt(SOL_PACKET, cmd, mreq)
+
diff --git a/common/utils/deferred_utils.py b/common/utils/deferred_utils.py
new file mode 100644
index 0000000..30e773a
--- /dev/null
+++ b/common/utils/deferred_utils.py
@@ -0,0 +1,43 @@
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred
+from twisted.internet.error import AlreadyCalled
+
+
+class TimeOutError(Exception): pass
+
+
+class DeferredWithTimeout(Deferred):
+ """
+ Deferred with a timeout. If neither the callback nor the errback method
+ is not called within the given time, the deferred's errback will be called
+ with a TimeOutError() exception.
+
+ All other uses are the same as of Deferred().
+ """
+ def __init__(self, timeout=1.0):
+ Deferred.__init__(self)
+ self._timeout = timeout
+ self.timer = reactor.callLater(timeout, self.timed_out)
+
+ def timed_out(self):
+ self.errback(
+ TimeOutError('timed out after {} seconds'.format(self._timeout)))
+
+ def callback(self, result):
+ self._cancel_timer()
+ return Deferred.callback(self, result)
+
+ def errback(self, fail):
+ self._cancel_timer()
+ return Deferred.errback(self, fail)
+
+ def cancel(self):
+ self._cancel_timer()
+ return Deferred.cancel(self)
+
+ def _cancel_timer(self):
+ try:
+ self.timer.cancel()
+ except AlreadyCalled:
+ pass
+
diff --git a/requirements.txt b/requirements.txt
index ade7f69..cbdf9c1 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -15,6 +15,7 @@
nose-exclude>=0.5.0
mock>=1.3.0
netifaces>=0.10.4
+pcapy>=0.10.10
pep8>=1.5.7
pep8-naming>=0.3.3
protobuf-to-dict>=0.1.0
diff --git a/tests/itests/test_frameio.py b/tests/itests/test_frameio.py
new file mode 100644
index 0000000..c367a9d
--- /dev/null
+++ b/tests/itests/test_frameio.py
@@ -0,0 +1,205 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Run this test inside a docker container using the following syntax:
+
+docker run -ti --rm -v $(pwd):/voltha --privileged cord/voltha-base \
+ env PYTHONPATH=/voltha python /voltha/tests/itests/test_frameio.py
+
+"""
+
+import os
+import random
+from time import sleep
+
+from scapy.layers.inet import IP
+from scapy.layers.l2 import Ether, Dot1Q
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred, inlineCallbacks
+from twisted.internet.error import AlreadyCalled
+from twisted.trial.unittest import TestCase
+
+from common.frameio.frameio import FrameIOManager, BpfProgramFilter
+from common.utils.asleep import asleep
+from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
+
+ident = lambda frame: frame
+none = lambda *args, **kw: None
+
+
+class TestFrameIO(TestCase):
+
+ @inlineCallbacks
+ def make_veth_pairs_if_needed(self):
+
+ def has_iface(iface):
+ return os.system('ip link show {}'.format(iface)) == 0
+
+ def make_veth(iface):
+ os.system('ip link add type veth')
+ os.system('ip link set {} up'.format(iface))
+ peer = iface[:len('veth')] + str(int(iface[len('veth'):]) + 1)
+ os.system('ip link set {} up'.format(peer))
+ assert has_iface(iface)
+
+ for iface_number in (0, 2):
+ iface = 'veth{}'.format(iface_number)
+ if not has_iface(iface):
+ make_veth(iface)
+ yield asleep(2)
+
+ @inlineCallbacks
+ def setUp(self):
+ yield self.make_veth_pairs_if_needed()
+ self.mgr = FrameIOManager().start()
+
+ def tearDown(self):
+ self.mgr.stop()
+
+ @inlineCallbacks
+ def test_packet_send_receive(self):
+ rcvd = DeferredWithTimeout()
+ p0 = self.mgr.add_interface('veth0', none).up()
+ p1 = self.mgr.add_interface('veth1',
+ lambda p, f: rcvd.callback((p, f))).up()
+
+ # sending to veth0 should result in receiving on veth1 and vice versa
+ bogus_frame = 'bogus packet'
+ p0.send(bogus_frame)
+
+ # check that we receved packet
+ port, frame = yield rcvd
+ self.assertEqual(port, p1)
+ self.assertEqual(frame, bogus_frame)
+
+
+ @inlineCallbacks
+ def test_packet_send_receive_with_filter(self):
+ rcvd = DeferredWithTimeout()
+
+ filter = BpfProgramFilter('ip dst host 123.123.123.123')
+ p0 = self.mgr.add_interface('veth0', none).up()
+ p1 = self.mgr.add_interface('veth1',
+ lambda p, f: rcvd.callback((p, f)),
+ filter=filter).up()
+
+ # sending bogus packet would not be received
+ ip_packet = str(Ether()/IP(dst='123.123.123.123'))
+ p0.send(ip_packet)
+
+ # check that we receved packet
+ port, frame = yield rcvd
+ self.assertEqual(port, p1)
+ self.assertEqual(frame, ip_packet)
+
+
+ @inlineCallbacks
+ def test_packet_send_drop_with_filter(self):
+ rcvd = DeferredWithTimeout()
+
+ filter = BpfProgramFilter('ip dst host 123.123.123.123')
+ p0 = self.mgr.add_interface('veth0', none).up()
+ self.mgr.add_interface('veth1', lambda p, f: rcvd.callback((p, f)),
+ filter=filter).up()
+
+ # sending bogus packet would not be received
+ p0.send('bogus packet')
+
+ try:
+ _ = yield rcvd
+ except TimeOutError:
+ pass
+ else:
+ self.fail('not timed out')
+
+
+ @inlineCallbacks
+ def test_concurrent_packet_send_receive(self):
+
+ done = Deferred()
+ queue1 = []
+ queue2 = []
+
+ n = 100
+
+ def append(queue):
+ def _append(_, frame):
+ queue.append(frame)
+ if len(queue1) == n and len(queue2) == n:
+ done.callback(None)
+ return _append
+
+ p1in = self.mgr.add_interface('veth0', none).up()
+ self.mgr.add_interface('veth1', append(queue1)).up()
+ p2in = self.mgr.add_interface('veth2', none).up()
+ self.mgr.add_interface('veth3', append(queue2)).up()
+
+ @inlineCallbacks
+ def send_packets(port, n):
+ for i in xrange(n):
+ port.send(str(i))
+ yield asleep(0.00001 * random.random()) # to interleave
+
+ # sending two concurrent streams
+ send_packets(p1in, n)
+ send_packets(p2in, n)
+
+ # verify that both queue got all packets
+ yield done
+
+ @inlineCallbacks
+ def test_concurrent_packet_send_receive_with_filter(self):
+
+ done = Deferred()
+ queue1 = []
+ queue2 = []
+
+ n = 100
+
+ def append(queue):
+ def _append(_, frame):
+ queue.append(frame)
+ if len(queue1) == n / 2 and len(queue2) == n / 2:
+ done.callback(None)
+ return _append
+
+ filter = BpfProgramFilter('vlan 100')
+ p1in = self.mgr.add_interface('veth0', none).up()
+ self.mgr.add_interface('veth1', append(queue1), filter).up()
+ p2in = self.mgr.add_interface('veth2', none).up()
+ self.mgr.add_interface('veth3', append(queue2), filter).up()
+
+ @inlineCallbacks
+ def send_packets(port, n):
+ for i in xrange(n):
+ # packets have alternating VLAN ids 100 and 101
+ pkt = Ether()/Dot1Q(vlan=100 + i % 2)
+ port.send(str(pkt))
+ yield asleep(0.00001 * random.random()) # to interleave
+
+ # sending two concurrent streams
+ send_packets(p1in, n)
+ send_packets(p2in, n)
+
+ # verify that both queue got all packets
+ yield done
+
+
+if __name__ == '__main__':
+ import unittest
+ unittest.main()