FrameIO to allow sharing the same Linux interface

Change-Id: I30a8dd660477980069801952861d38e0dbe09739
diff --git a/common/frameio/frameio.py b/common/frameio/frameio.py
index a0633f6..4ed999e 100644
--- a/common/frameio/frameio.py
+++ b/common/frameio/frameio.py
@@ -26,6 +26,7 @@
 import os
 import socket
 import struct
+import uuid
 from pcapy import BPFProgram
 from threading import Thread, Condition
 
@@ -113,16 +114,21 @@
     ETH_P_ALL = 0x03
     RCV_TIMEOUT = 10000
 
-    def __init__(self, iface_name, callback, filter=None):
+    def __init__(self, iface_name):
         self.iface_name = iface_name
-        self.callback = callback
-        self.filter = filter
+        self.proxies = []
         self.socket = self.open_socket(self.iface_name)
         log.debug('socket-opened', fn=self.fileno(), iface=iface_name)
         self.received = 0
         self.discarded = 0
 
-    def open_sockets(self, iface_name):
+    def add_proxy(self, proxy):
+        self.proxies.append(proxy)
+
+    def del_proxy(self, proxy):
+        self.proxies = [p for p in self.proxies if p.name != proxy.name]
+
+    def open_socket(self, iface_name):
         raise NotImplementedError('to be implemented by derived class')
 
     def rcv_frame(self):
@@ -137,11 +143,11 @@
     def fileno(self):
         return self.socket.fileno()
 
-    def _dispatch(self, frame):
-        log.debug('calling-publisher', frame=hexify(frame))
+    def _dispatch(self, proxy, frame):
+        log.debug('calling-publisher', proxy=proxy.name, frame=hexify(frame))
         try:
-            self.callback(self, frame)
-        except Exception, e:
+            proxy.callback(proxy, frame)
+        except Exception as e:
             log.exception('callback-error',
                           explanation='Callback failed while processing frame',
                           e=e)
@@ -150,7 +156,7 @@
         """Called on the select thread when a packet arrives"""
         try:
             frame = self.rcv_frame()
-        except RuntimeError, e:
+        except RuntimeError as e:
             # we observed this happens sometimes right after the socket was
             # attached to a newly created veth interface. So we log it, but
             # allow to continue.
@@ -160,10 +166,14 @@
         log.debug('frame-received', iface=self.iface_name, len=len(frame),
                   hex=hexify(frame))
         self.received +=1
-        if self.filter is None or self.filter(frame):
-            log.debug('frame-dispatched')
-            reactor.callFromThread(self._dispatch, frame)
-        else:
+        dispatched = False
+        for proxy in self.proxies:
+            if proxy.filter is None or proxy.filter(frame):
+                log.debug('frame-dispatched')
+                dispatched = True
+                reactor.callFromThread(self._dispatch, proxy, frame)
+
+        if not dispatched:
             self.discarded += 1
             log.debug('frame-discarded')
 
@@ -243,6 +253,34 @@
     sys.exit(1)
 
 
+class FrameIOPortProxy(object):
+    """Makes FrameIOPort sharable between multiple users"""
+
+    def __init__(self, frame_io_port, callback, filter=None, name=None):
+        self.frame_io_port = frame_io_port
+        self.callback = callback
+        self.filter = filter
+        self.name = uuid.uuid4().hex[:12] if name is None else name
+
+    @property
+    def iface_name(self):
+        return self.frame_io_port.iface_name
+
+    def get_iface_name(self):
+        return self.frame_io_port.iface_name
+
+    def send(self, frame):
+        return self.frame_io_port.send(frame)
+
+    def up(self):
+        self.frame_io_port.up()
+        return self
+
+    def down(self):
+        self.frame_io_port.down()
+        return self
+
+
 @implementer(IComponent)
 class FrameIOManager(Thread):
     """
@@ -289,7 +327,7 @@
         """
         return self.ports
 
-    def add_interface(self, iface_name, callback, filter=None):
+    def open_port(self, iface_name, callback, filter=None, name=None):
         """
         Add a new interface and start receiving on it.
         :param iface_name: Name of the interface. Must be an existing Unix
@@ -301,29 +339,39 @@
         :param filter: An optional filter (predicate), with signature:
         def filter(frame). If provided, only frames for which filter evaluates
         to True will be forwarded to callback.
-        :return: FrmaeIOPort instance.
+        :return: FrameIOPortProxy instance.
         """
-        """Add a new interface"""
-        assert iface_name not in self.ports
-        port = _FrameIOPort(iface_name, callback, filter)
-        self.ports[iface_name] = port
-        # need to exit select loop to reconstruct select fd lists
-        self.ports_changed = True
-        self.waker.notify()
-        return port
 
-    def del_interface(self, iface_name):
+        port = self.ports.get(iface_name)
+        if port is None:
+            port = _FrameIOPort(iface_name)
+            self.ports[iface_name] = port
+            self.ports_changed = True
+            self.waker.notify()
+
+        proxy = FrameIOPortProxy(port, callback, filter, name)
+        port.add_proxy(proxy)
+
+        return proxy
+
+    def close_port(self, proxy):
         """
-        Stop and remove named interface
-        :param iface_name: Name of previoysly registered interface
+        Remove the proxy. If this is the last proxy on an interface, stop and
+        remove the named interface as well
+        :param proxy: FrameIOPortProxy reference
         :return: None
         """
-        assert iface_name in self.ports
+        assert isinstance(proxy, FrameIOPortProxy)
+        iface_name = proxy.get_iface_name()
+        assert iface_name in self.ports, "iface_name {} unknown".format(iface_name)
         port = self.ports[iface_name]
-        del self.ports[iface_name]
-        # need to exit select loop to reconstruct select fd lists
-        self.ports_changed = True
-        self.waker.notify()
+        port.del_proxy(proxy)
+
+        if not port.proxies:
+            del self.ports[iface_name]
+            # need to exit select loop to reconstruct select fd lists
+            self.ports_changed = True
+            self.waker.notify()
 
     def send(self, iface_name, frame):
         """
@@ -353,7 +401,7 @@
             while not self.stopped:
                 try:
                     _in, _out, _err = select.select(sockets, empty, empty, 1)
-                except Exception, e:
+                except Exception as e:
                     log.exception('frame-io-select-error', e=e)
                     break
                 with self.cvar: