FrameIO to allow sharing same Linux interface

Change-Id: I30a8dd660477980069801952861d38e0dbe09739
diff --git a/common/frameio/frameio.py b/common/frameio/frameio.py
index a0633f6..4ed999e 100644
--- a/common/frameio/frameio.py
+++ b/common/frameio/frameio.py
@@ -26,6 +26,7 @@
 import os
 import socket
 import struct
+import uuid
 from pcapy import BPFProgram
 from threading import Thread, Condition
 
@@ -113,16 +114,21 @@
     ETH_P_ALL = 0x03
     RCV_TIMEOUT = 10000
 
-    def __init__(self, iface_name, callback, filter=None):
+    def __init__(self, iface_name):
         self.iface_name = iface_name
-        self.callback = callback
-        self.filter = filter
+        self.proxies = []
         self.socket = self.open_socket(self.iface_name)
         log.debug('socket-opened', fn=self.fileno(), iface=iface_name)
         self.received = 0
         self.discarded = 0
 
-    def open_sockets(self, iface_name):
+    def add_proxy(self, proxy):
+        self.proxies.append(proxy)
+
+    def del_proxy(self, proxy):
+        self.proxies = [p for p in self.proxies if p.name != proxy.name]
+
+    def open_socket(self, iface_name):
         raise NotImplementedError('to be implemented by derived class')
 
     def rcv_frame(self):
@@ -137,11 +143,11 @@
     def fileno(self):
         return self.socket.fileno()
 
-    def _dispatch(self, frame):
-        log.debug('calling-publisher', frame=hexify(frame))
+    def _dispatch(self, proxy, frame):
+        log.debug('calling-publisher', proxy=proxy.name, frame=hexify(frame))
         try:
-            self.callback(self, frame)
-        except Exception, e:
+            proxy.callback(proxy, frame)
+        except Exception as e:
             log.exception('callback-error',
                           explanation='Callback failed while processing frame',
                           e=e)
@@ -150,7 +156,7 @@
         """Called on the select thread when a packet arrives"""
         try:
             frame = self.rcv_frame()
-        except RuntimeError, e:
+        except RuntimeError as e:
             # we observed this happens sometimes right after the socket was
             # attached to a newly created veth interface. So we log it, but
             # allow to continue.
@@ -160,10 +166,14 @@
         log.debug('frame-received', iface=self.iface_name, len=len(frame),
                   hex=hexify(frame))
         self.received +=1
-        if self.filter is None or self.filter(frame):
-            log.debug('frame-dispatched')
-            reactor.callFromThread(self._dispatch, frame)
-        else:
+        dispatched = False
+        for proxy in self.proxies:
+            if proxy.filter is None or proxy.filter(frame):
+                log.debug('frame-dispatched')
+                dispatched = True
+                reactor.callFromThread(self._dispatch, proxy, frame)
+
+        if not dispatched:
             self.discarded += 1
             log.debug('frame-discarded')
 
@@ -243,6 +253,34 @@
     sys.exit(1)
 
 
+class FrameIOPortProxy(object):
+    """Makes FrameIOPort sharable between multiple users"""
+
+    def __init__(self, frame_io_port, callback, filter=None, name=None):
+        self.frame_io_port = frame_io_port
+        self.callback = callback
+        self.filter = filter
+        self.name = uuid.uuid4().hex[:12] if name is None else name
+
+    @property
+    def iface_name(self):
+        return self.frame_io_port.iface_name
+
+    def get_iface_name(self):
+        return self.frame_io_port.iface_name
+
+    def send(self, frame):
+        return self.frame_io_port.send(frame)
+
+    def up(self):
+        self.frame_io_port.up()
+        return self
+
+    def down(self):
+        self.frame_io_port.down()
+        return self
+
+
 @implementer(IComponent)
 class FrameIOManager(Thread):
     """
@@ -289,7 +327,7 @@
         """
         return self.ports
 
-    def add_interface(self, iface_name, callback, filter=None):
+    def open_port(self, iface_name, callback, filter=None, name=None):
         """
         Add a new interface and start receiving on it.
         :param iface_name: Name of the interface. Must be an existing Unix
@@ -301,29 +339,39 @@
         :param filter: An optional filter (predicate), with signature:
         def filter(frame). If provided, only frames for which filter evaluates
         to True will be forwarded to callback.
-        :return: FrmaeIOPort instance.
+        :return: FrmaeIOPortProxy instance.
         """
-        """Add a new interface"""
-        assert iface_name not in self.ports
-        port = _FrameIOPort(iface_name, callback, filter)
-        self.ports[iface_name] = port
-        # need to exit select loop to reconstruct select fd lists
-        self.ports_changed = True
-        self.waker.notify()
-        return port
 
-    def del_interface(self, iface_name):
+        port = self.ports.get(iface_name)
+        if port is None:
+            port = _FrameIOPort(iface_name)
+            self.ports[iface_name] = port
+            self.ports_changed = True
+            self.waker.notify()
+
+        proxy = FrameIOPortProxy(port, callback, filter, name)
+        port.add_proxy(proxy)
+
+        return proxy
+
+    def close_port(self, proxy):
         """
-        Stop and remove named interface
-        :param iface_name: Name of previoysly registered interface
+        Remove the proxy. If this is the last proxy on an interface, stop and
+        remove the named interface as well
+        :param proxy: FrameIOPortProxy reference
         :return: None
         """
-        assert iface_name in self.ports
+        assert isinstance(proxy, FrameIOPortProxy)
+        iface_name = proxy.get_iface_name()
+        assert iface_name in self.ports, "iface_name {} unknown".format(iface_name)
         port = self.ports[iface_name]
-        del self.ports[iface_name]
-        # need to exit select loop to reconstruct select fd lists
-        self.ports_changed = True
-        self.waker.notify()
+        port.del_proxy(proxy)
+
+        if not port.proxies:
+            del self.ports[iface_name]
+            # need to exit select loop to reconstruct select fd lists
+            self.ports_changed = True
+            self.waker.notify()
 
     def send(self, iface_name, frame):
         """
@@ -353,7 +401,7 @@
             while not self.stopped:
                 try:
                     _in, _out, _err = select.select(sockets, empty, empty, 1)
-                except Exception, e:
+                except Exception as e:
                     log.exception('frame-io-select-error', e=e)
                     break
                 with self.cvar:
diff --git a/ponsim/realio.py b/ponsim/realio.py
index bca3382..84d08ba 100644
--- a/ponsim/realio.py
+++ b/ponsim/realio.py
@@ -37,7 +37,7 @@
         log.debug('starting')
         yield self.frame_io.start()
         for port, iface_name in self.port_to_iface_name.items():
-            io_port = self.frame_io.add_interface(iface_name, self.ingress)
+            io_port = self.frame_io.open_port(iface_name, self.ingress)
             self.io_ports[port] = io_port
         log.info('started')
         returnValue(self)
diff --git a/tests/itests/run_as_root/test_frameio.py b/tests/itests/run_as_root/test_frameio.py
index 057fa28..bd8f770 100644
--- a/tests/itests/run_as_root/test_frameio.py
+++ b/tests/itests/run_as_root/test_frameio.py
@@ -31,11 +31,12 @@
 from scapy.layers.inet import IP
 from scapy.layers.l2 import Ether, Dot1Q
 from twisted.internet import reactor
-from twisted.internet.defer import Deferred, inlineCallbacks
+from twisted.internet.defer import Deferred, inlineCallbacks, DeferredQueue
 from twisted.internet.error import AlreadyCalled
 from twisted.trial.unittest import TestCase
 
-from common.frameio.frameio import FrameIOManager, BpfProgramFilter
+from common.frameio.frameio import FrameIOManager, BpfProgramFilter, \
+    FrameIOPortProxy
 from common.utils.asleep import asleep
 from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
 
@@ -75,8 +76,8 @@
     @inlineCallbacks
     def test_packet_send_receive(self):
         rcvd = DeferredWithTimeout()
-        p0 = self.mgr.add_interface('veth0', none).up()
-        p1 = self.mgr.add_interface('veth1',
+        p0 = self.mgr.open_port('veth0', none).up()
+        p1 = self.mgr.open_port('veth1',
                                     lambda p, f: rcvd.callback((p, f))).up()
 
         # sending to veth0 should result in receiving on veth1 and vice versa
@@ -93,10 +94,10 @@
         rcvd = DeferredWithTimeout()
 
         filter = BpfProgramFilter('ip dst host 123.123.123.123')
-        p0 = self.mgr.add_interface('veth0', none).up()
-        p1 = self.mgr.add_interface('veth1',
-                                    lambda p, f: rcvd.callback((p, f)),
-                                    filter=filter).up()
+        p0 = self.mgr.open_port('veth0', none).up()
+        p1 = self.mgr.open_port('veth1',
+                                lambda p, f: rcvd.callback((p, f)),
+                                filter=filter).up()
 
         # sending bogus packet would not be received
         ip_packet = str(Ether()/IP(dst='123.123.123.123'))
@@ -112,9 +113,9 @@
         rcvd = DeferredWithTimeout()
 
         filter = BpfProgramFilter('ip dst host 123.123.123.123')
-        p0 = self.mgr.add_interface('veth0', none).up()
-        self.mgr.add_interface('veth1', lambda p, f: rcvd.callback((p, f)),
-                               filter=filter).up()
+        p0 = self.mgr.open_port('veth0', none).up()
+        self.mgr.open_port('veth1', lambda p, f: rcvd.callback((p, f)),
+                           filter=filter).up()
 
         # sending bogus packet would not be received
         p0.send('bogus packet')
@@ -142,10 +143,10 @@
                     done.callback(None)
             return _append
 
-        p1in = self.mgr.add_interface('veth0', none).up()
-        self.mgr.add_interface('veth1', append(queue1)).up()
-        p2in = self.mgr.add_interface('veth2', none).up()
-        self.mgr.add_interface('veth3', append(queue2)).up()
+        p1in = self.mgr.open_port('veth0', none).up()
+        self.mgr.open_port('veth1', append(queue1)).up()
+        p2in = self.mgr.open_port('veth2', none).up()
+        self.mgr.open_port('veth3', append(queue2)).up()
 
         @inlineCallbacks
         def send_packets(port, n):
@@ -177,10 +178,10 @@
             return _append
 
         filter = BpfProgramFilter('vlan 100')
-        p1in = self.mgr.add_interface('veth0', none).up()
-        self.mgr.add_interface('veth1', append(queue1), filter).up()
-        p2in = self.mgr.add_interface('veth2', none).up()
-        self.mgr.add_interface('veth3', append(queue2), filter).up()
+        p1in = self.mgr.open_port('veth0', none).up()
+        self.mgr.open_port('veth1', append(queue1), filter).up()
+        p2in = self.mgr.open_port('veth2', none).up()
+        self.mgr.open_port('veth3', append(queue2), filter).up()
 
         @inlineCallbacks
         def send_packets(port, n):
@@ -197,6 +198,70 @@
         # verify that both queue got all packets
         yield done
 
+    @inlineCallbacks
+    def test_shared_interface(self):
+
+        queue1 = DeferredQueue()
+        queue2 = DeferredQueue()
+
+        # two senders hooked up to the same interface (sharing it)
+        # here we test if they can both send
+        pin1 = self.mgr.open_port('veth0', none).up()
+        pin2 = self.mgr.open_port('veth0', none).up()
+
+        pout1 = self.mgr.open_port(
+            'veth1', lambda p, f: queue1.put((p, f))).up()
+        filter = BpfProgramFilter('ip dst host 123.123.123.123')
+        pout2 = self.mgr.open_port(
+            'veth1', lambda p, f: queue2.put((p, f)), filter=filter).up()
+
+        # sending from pin1, should be received by pout1
+        bogus_frame = 'bogus packet'
+        pin1.send(bogus_frame)
+        port, frame = yield queue1.get()
+        self.assertEqual(port, pout1)
+        self.assertEqual(frame, bogus_frame)
+        self.assertEqual(len(queue1.pending), 0)
+        self.assertEqual(len(queue2.pending), 0)
+
+        # sending from pin2, should be received by pout1
+        bogus_frame = 'bogus packet'
+        pin2.send(bogus_frame)
+        port, frame = yield queue1.get()
+        self.assertEqual(port, pout1)
+        self.assertEqual(frame, bogus_frame)
+        self.assertEqual(len(queue1.pending), 0)
+        self.assertEqual(len(queue2.pending), 0)
+
+        # sending from pin1, should be received by both pouts
+        ip_packet = str(Ether()/IP(dst='123.123.123.123'))
+        pin1.send(ip_packet)
+        port, frame = yield queue1.get()
+        self.assertEqual(port, pout1)
+        self.assertEqual(frame, ip_packet)
+        self.assertEqual(len(queue1.pending), 0)
+        port, frame = yield queue2.get()
+        self.assertEqual(port, pout2)
+        self.assertEqual(frame, ip_packet)
+        self.assertEqual(len(queue2.pending), 0)
+
+        # sending from pin2, should be received by pout1
+        ip_packet = str(Ether()/IP(dst='123.123.123.123'))
+        pin2.send(ip_packet)
+        port, frame = yield queue1.get()
+        self.assertEqual(port, pout1)
+        self.assertEqual(frame, ip_packet)
+        self.assertEqual(len(queue1.pending), 0)
+        port, frame = yield queue2.get()
+        self.assertEqual(port, pout2)
+        self.assertEqual(frame, ip_packet)
+        self.assertEqual(len(queue2.pending), 0)
+
+        self.mgr.close_port(pin1)
+        self.mgr.close_port(pin2)
+        self.mgr.close_port(pout1)
+        self.mgr.close_port(pout2)
+
 
 if __name__ == '__main__':
     import unittest
diff --git a/voltha/adapters/maple_olt/maple_olt.py b/voltha/adapters/maple_olt/maple_olt.py
index 1293561..83906f5 100644
--- a/voltha/adapters/maple_olt/maple_olt.py
+++ b/voltha/adapters/maple_olt/maple_olt.py
@@ -193,7 +193,7 @@
 
     def __del__(self):
         if self.io_port is not None:
-            registry('frameio').del_interface(self.interface)
+            registry('frameio').close_port(self.io_port)
 
     def get_channel(self):
         return self.channel
@@ -505,7 +505,7 @@
 
         # finally, open the frameio port to receive in-band packet_in messages
         self.log.info('registering-frameio')
-        self.io_port = registry('frameio').add_interface(
+        self.io_port = registry('frameio').open_port(
             self.interface, self.rcv_io, is_inband_frame)
 
     def rcv_io(self, port, frame):
diff --git a/voltha/adapters/ponsim_olt/ponsim_olt.py b/voltha/adapters/ponsim_olt/ponsim_olt.py
index 01f008e..dcc55a7 100644
--- a/voltha/adapters/ponsim_olt/ponsim_olt.py
+++ b/voltha/adapters/ponsim_olt/ponsim_olt.py
@@ -158,7 +158,7 @@
 
     def __del__(self):
         if self.io_port is not None:
-            registry('frameio').del_interface(self.interface)
+            registry('frameio').close_port(self.io_port)
 
     def get_channel(self):
         if self.channel is None:
@@ -266,7 +266,7 @@
 
         # finally, open the frameio port to receive in-band packet_in messages
         self.log.info('registering-frameio')
-        self.io_port = registry('frameio').add_interface(
+        self.io_port = registry('frameio').open_port(
             self.interface, self.rcv_io, is_inband_frame)
         self.log.info('registered-frameio')
 
diff --git a/voltha/adapters/tibit_olt/tibit_olt.py b/voltha/adapters/tibit_olt/tibit_olt.py
index d1ee6d1..5d9ab81 100644
--- a/voltha/adapters/tibit_olt/tibit_olt.py
+++ b/voltha/adapters/tibit_olt/tibit_olt.py
@@ -133,7 +133,7 @@
     def stop(self):
         log.debug('stopping')
         if self.io_port is not None:
-            registry('frameio').del_interface(self.interface)
+            registry('frameio').close_port(self.io_port)
         log.info('stopped')
 
     def adapter_descriptor(self):
@@ -155,7 +155,7 @@
 
     def _activate_io_port(self):
         if self.io_port is None:
-            self.io_port = registry('frameio').add_interface(
+            self.io_port = registry('frameio').open_port(
                 self.interface, self._rcv_io, is_tibit_frame)
 
     @inlineCallbacks
@@ -182,7 +182,7 @@
                 if 1: # TODO check if it is really what we expect, and wait if not
                     break
 
-        except Exception, e:
+        except Exception as e:
             log.exception('launch device failed', e=e)
 
         # if we got response, we can fill out the device info, mark the device