FrameIO: allow multiple users to share the same Linux interface
Change-Id: I30a8dd660477980069801952861d38e0dbe09739
diff --git a/common/frameio/frameio.py b/common/frameio/frameio.py
index a0633f6..4ed999e 100644
--- a/common/frameio/frameio.py
+++ b/common/frameio/frameio.py
@@ -26,6 +26,7 @@
import os
import socket
import struct
+import uuid
from pcapy import BPFProgram
from threading import Thread, Condition
@@ -113,16 +114,21 @@
ETH_P_ALL = 0x03
RCV_TIMEOUT = 10000
- def __init__(self, iface_name, callback, filter=None):
+ def __init__(self, iface_name):
self.iface_name = iface_name
- self.callback = callback
- self.filter = filter
+ self.proxies = []
self.socket = self.open_socket(self.iface_name)
log.debug('socket-opened', fn=self.fileno(), iface=iface_name)
self.received = 0
self.discarded = 0
- def open_sockets(self, iface_name):
+ def add_proxy(self, proxy):
+ self.proxies.append(proxy)
+
+ def del_proxy(self, proxy):
+ self.proxies = [p for p in self.proxies if p.name != proxy.name]
+
+ def open_socket(self, iface_name):
raise NotImplementedError('to be implemented by derived class')
def rcv_frame(self):
@@ -137,11 +143,11 @@
def fileno(self):
return self.socket.fileno()
- def _dispatch(self, frame):
- log.debug('calling-publisher', frame=hexify(frame))
+ def _dispatch(self, proxy, frame):
+ log.debug('calling-publisher', proxy=proxy.name, frame=hexify(frame))
try:
- self.callback(self, frame)
- except Exception, e:
+ proxy.callback(proxy, frame)
+ except Exception as e:
log.exception('callback-error',
explanation='Callback failed while processing frame',
e=e)
@@ -150,7 +156,7 @@
"""Called on the select thread when a packet arrives"""
try:
frame = self.rcv_frame()
- except RuntimeError, e:
+ except RuntimeError as e:
# we observed this happens sometimes right after the socket was
# attached to a newly created veth interface. So we log it, but
# allow to continue.
@@ -160,10 +166,14 @@
log.debug('frame-received', iface=self.iface_name, len=len(frame),
hex=hexify(frame))
self.received +=1
- if self.filter is None or self.filter(frame):
- log.debug('frame-dispatched')
- reactor.callFromThread(self._dispatch, frame)
- else:
+ dispatched = False
+ for proxy in self.proxies:
+ if proxy.filter is None or proxy.filter(frame):
+ log.debug('frame-dispatched')
+ dispatched = True
+ reactor.callFromThread(self._dispatch, proxy, frame)
+
+ if not dispatched:
self.discarded += 1
log.debug('frame-discarded')
@@ -243,6 +253,34 @@
sys.exit(1)
+class FrameIOPortProxy(object):
+ """Makes FrameIOPort sharable between multiple users"""
+
+ def __init__(self, frame_io_port, callback, filter=None, name=None):
+ self.frame_io_port = frame_io_port
+ self.callback = callback
+ self.filter = filter
+ self.name = uuid.uuid4().hex[:12] if name is None else name
+
+ @property
+ def iface_name(self):
+ return self.frame_io_port.iface_name
+
+ def get_iface_name(self):
+ return self.frame_io_port.iface_name
+
+ def send(self, frame):
+ return self.frame_io_port.send(frame)
+
+ def up(self):
+ self.frame_io_port.up()
+ return self
+
+ def down(self):
+ self.frame_io_port.down()
+ return self
+
+
@implementer(IComponent)
class FrameIOManager(Thread):
"""
@@ -289,7 +327,7 @@
"""
return self.ports
- def add_interface(self, iface_name, callback, filter=None):
+ def open_port(self, iface_name, callback, filter=None, name=None):
"""
Add a new interface and start receiving on it.
:param iface_name: Name of the interface. Must be an existing Unix
@@ -301,29 +339,39 @@
:param filter: An optional filter (predicate), with signature:
def filter(frame). If provided, only frames for which filter evaluates
to True will be forwarded to callback.
- :return: FrmaeIOPort instance.
+ :return: FrameIOPortProxy instance.
"""
- """Add a new interface"""
- assert iface_name not in self.ports
- port = _FrameIOPort(iface_name, callback, filter)
- self.ports[iface_name] = port
- # need to exit select loop to reconstruct select fd lists
- self.ports_changed = True
- self.waker.notify()
- return port
- def del_interface(self, iface_name):
+ port = self.ports.get(iface_name)
+ if port is None:
+ port = _FrameIOPort(iface_name)
+ self.ports[iface_name] = port
+ self.ports_changed = True
+ self.waker.notify()
+
+ proxy = FrameIOPortProxy(port, callback, filter, name)
+ port.add_proxy(proxy)
+
+ return proxy
+
+ def close_port(self, proxy):
"""
- Stop and remove named interface
- :param iface_name: Name of previoysly registered interface
+ Remove the proxy. If this is the last proxy on an interface, stop and
+ remove the named interface as well
+ :param proxy: FrameIOPortProxy reference
:return: None
"""
- assert iface_name in self.ports
+ assert isinstance(proxy, FrameIOPortProxy)
+ iface_name = proxy.get_iface_name()
+ assert iface_name in self.ports, "iface_name {} unknown".format(iface_name)
port = self.ports[iface_name]
- del self.ports[iface_name]
- # need to exit select loop to reconstruct select fd lists
- self.ports_changed = True
- self.waker.notify()
+ port.del_proxy(proxy)
+
+ if not port.proxies:
+ del self.ports[iface_name]
+ # need to exit select loop to reconstruct select fd lists
+ self.ports_changed = True
+ self.waker.notify()
def send(self, iface_name, frame):
"""
@@ -353,7 +401,7 @@
while not self.stopped:
try:
_in, _out, _err = select.select(sockets, empty, empty, 1)
- except Exception, e:
+ except Exception as e:
log.exception('frame-io-select-error', e=e)
break
with self.cvar: