Async frame receive/send module

Change-Id: I75b6ecdecf05f2c72b3b0fa1191d849b7d8e67ef
diff --git a/tests/itests/test_frameio.py b/tests/itests/test_frameio.py
new file mode 100644
index 0000000..c367a9d
--- /dev/null
+++ b/tests/itests/test_frameio.py
@@ -0,0 +1,205 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Run this test inside a docker container using the following syntax:
+
+docker run -ti --rm -v $(pwd):/voltha  --privileged cord/voltha-base \
+    env PYTHONPATH=/voltha python /voltha/tests/itests/test_frameio.py
+
+"""
+
+import os
+import random
+from time import sleep
+
+from scapy.layers.inet import IP
+from scapy.layers.l2 import Ether, Dot1Q
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred, inlineCallbacks
+from twisted.internet.error import AlreadyCalled
+from twisted.trial.unittest import TestCase
+
+from common.frameio.frameio import FrameIOManager, BpfProgramFilter
+from common.utils.asleep import asleep
+from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
+
+ident = lambda frame: frame
+none = lambda *args, **kw: None
+
+
+class TestFrameIO(TestCase):
+
+    @inlineCallbacks
+    def make_veth_pairs_if_needed(self):
+
+        def has_iface(iface):
+            return os.system('ip link show {}'.format(iface)) == 0
+
+        def make_veth(iface):
+            os.system('ip link add type veth')
+            os.system('ip link set {} up'.format(iface))
+            peer = iface[:len('veth')] + str(int(iface[len('veth'):]) + 1)
+            os.system('ip link set {} up'.format(peer))
+            assert has_iface(iface)
+
+        for iface_number in (0, 2):
+            iface = 'veth{}'.format(iface_number)
+            if not has_iface(iface):
+                make_veth(iface)
+                yield asleep(2)
+
+    @inlineCallbacks
+    def setUp(self):
+        yield self.make_veth_pairs_if_needed()
+        self.mgr = FrameIOManager().start()
+
+    def tearDown(self):
+        self.mgr.stop()
+
+    @inlineCallbacks
+    def test_packet_send_receive(self):
+        rcvd = DeferredWithTimeout()
+        p0 = self.mgr.add_interface('veth0', none).up()
+        p1 = self.mgr.add_interface('veth1',
+                                    lambda p, f: rcvd.callback((p, f))).up()
+
+        # sending to veth0 should result in receiving on veth1 and vice versa
+        bogus_frame = 'bogus packet'
+        p0.send(bogus_frame)
+
+        # check that we receved packet
+        port, frame = yield rcvd
+        self.assertEqual(port, p1)
+        self.assertEqual(frame, bogus_frame)
+
+
+    @inlineCallbacks
+    def test_packet_send_receive_with_filter(self):
+        rcvd = DeferredWithTimeout()
+
+        filter = BpfProgramFilter('ip dst host 123.123.123.123')
+        p0 = self.mgr.add_interface('veth0', none).up()
+        p1 = self.mgr.add_interface('veth1',
+                                    lambda p, f: rcvd.callback((p, f)),
+                                    filter=filter).up()
+
+        # sending bogus packet would not be received
+        ip_packet = str(Ether()/IP(dst='123.123.123.123'))
+        p0.send(ip_packet)
+
+        # check that we receved packet
+        port, frame = yield rcvd
+        self.assertEqual(port, p1)
+        self.assertEqual(frame, ip_packet)
+
+
+    @inlineCallbacks
+    def test_packet_send_drop_with_filter(self):
+        rcvd = DeferredWithTimeout()
+
+        filter = BpfProgramFilter('ip dst host 123.123.123.123')
+        p0 = self.mgr.add_interface('veth0', none).up()
+        self.mgr.add_interface('veth1', lambda p, f: rcvd.callback((p, f)),
+                               filter=filter).up()
+
+        # sending bogus packet would not be received
+        p0.send('bogus packet')
+
+        try:
+            _ = yield rcvd
+        except TimeOutError:
+            pass
+        else:
+            self.fail('not timed out')
+
+
+    @inlineCallbacks
+    def test_concurrent_packet_send_receive(self):
+
+        done = Deferred()
+        queue1 = []
+        queue2 = []
+
+        n = 100
+
+        def append(queue):
+            def _append(_, frame):
+                queue.append(frame)
+                if len(queue1) == n and len(queue2) == n:
+                    done.callback(None)
+            return _append
+
+        p1in = self.mgr.add_interface('veth0', none).up()
+        self.mgr.add_interface('veth1', append(queue1)).up()
+        p2in = self.mgr.add_interface('veth2', none).up()
+        self.mgr.add_interface('veth3', append(queue2)).up()
+
+        @inlineCallbacks
+        def send_packets(port, n):
+            for i in xrange(n):
+                port.send(str(i))
+                yield asleep(0.00001 * random.random())  # to interleave
+
+        # sending two concurrent streams
+        send_packets(p1in, n)
+        send_packets(p2in, n)
+
+        # verify that both queue got all packets
+        yield done
+
+    @inlineCallbacks
+    def test_concurrent_packet_send_receive_with_filter(self):
+
+        done = Deferred()
+        queue1 = []
+        queue2 = []
+
+        n = 100
+
+        def append(queue):
+            def _append(_, frame):
+                queue.append(frame)
+                if len(queue1) == n / 2 and len(queue2) == n / 2:
+                    done.callback(None)
+            return _append
+
+        filter = BpfProgramFilter('vlan 100')
+        p1in = self.mgr.add_interface('veth0', none).up()
+        self.mgr.add_interface('veth1', append(queue1), filter).up()
+        p2in = self.mgr.add_interface('veth2', none).up()
+        self.mgr.add_interface('veth3', append(queue2), filter).up()
+
+        @inlineCallbacks
+        def send_packets(port, n):
+            for i in xrange(n):
+                # packets have alternating VLAN ids 100 and 101
+                pkt = Ether()/Dot1Q(vlan=100 + i % 2)
+                port.send(str(pkt))
+                yield asleep(0.00001 * random.random())  # to interleave
+
+        # sending two concurrent streams
+        send_packets(p1in, n)
+        send_packets(p2in, n)
+
+        # verify that both queue got all packets
+        yield done
+
+
+if __name__ == '__main__':
+    import unittest
+    unittest.main()