FrameIO to allow sharing same Linux interface
Change-Id: I30a8dd660477980069801952861d38e0dbe09739
diff --git a/common/frameio/frameio.py b/common/frameio/frameio.py
index a0633f6..4ed999e 100644
--- a/common/frameio/frameio.py
+++ b/common/frameio/frameio.py
@@ -26,6 +26,7 @@
import os
import socket
import struct
+import uuid
from pcapy import BPFProgram
from threading import Thread, Condition
@@ -113,16 +114,21 @@
ETH_P_ALL = 0x03
RCV_TIMEOUT = 10000
- def __init__(self, iface_name, callback, filter=None):
+ def __init__(self, iface_name):
self.iface_name = iface_name
- self.callback = callback
- self.filter = filter
+ self.proxies = []
self.socket = self.open_socket(self.iface_name)
log.debug('socket-opened', fn=self.fileno(), iface=iface_name)
self.received = 0
self.discarded = 0
- def open_sockets(self, iface_name):
+ def add_proxy(self, proxy):
+ self.proxies.append(proxy)
+
+ def del_proxy(self, proxy):
+ self.proxies = [p for p in self.proxies if p.name != proxy.name]
+
+ def open_socket(self, iface_name):
raise NotImplementedError('to be implemented by derived class')
def rcv_frame(self):
@@ -137,11 +143,11 @@
def fileno(self):
return self.socket.fileno()
- def _dispatch(self, frame):
- log.debug('calling-publisher', frame=hexify(frame))
+ def _dispatch(self, proxy, frame):
+ log.debug('calling-publisher', proxy=proxy.name, frame=hexify(frame))
try:
- self.callback(self, frame)
- except Exception, e:
+ proxy.callback(proxy, frame)
+ except Exception as e:
log.exception('callback-error',
explanation='Callback failed while processing frame',
e=e)
@@ -150,7 +156,7 @@
"""Called on the select thread when a packet arrives"""
try:
frame = self.rcv_frame()
- except RuntimeError, e:
+ except RuntimeError as e:
# we observed this happens sometimes right after the socket was
# attached to a newly created veth interface. So we log it, but
# allow to continue.
@@ -160,10 +166,14 @@
log.debug('frame-received', iface=self.iface_name, len=len(frame),
hex=hexify(frame))
self.received +=1
- if self.filter is None or self.filter(frame):
- log.debug('frame-dispatched')
- reactor.callFromThread(self._dispatch, frame)
- else:
+ dispatched = False
+ for proxy in self.proxies:
+ if proxy.filter is None or proxy.filter(frame):
+ log.debug('frame-dispatched')
+ dispatched = True
+ reactor.callFromThread(self._dispatch, proxy, frame)
+
+ if not dispatched:
self.discarded += 1
log.debug('frame-discarded')
@@ -243,6 +253,34 @@
sys.exit(1)
+class FrameIOPortProxy(object):
+ """Makes FrameIOPort sharable between multiple users"""
+
+ def __init__(self, frame_io_port, callback, filter=None, name=None):
+ self.frame_io_port = frame_io_port
+ self.callback = callback
+ self.filter = filter
+ self.name = uuid.uuid4().hex[:12] if name is None else name
+
+ @property
+ def iface_name(self):
+ return self.frame_io_port.iface_name
+
+ def get_iface_name(self):
+ return self.frame_io_port.iface_name
+
+ def send(self, frame):
+ return self.frame_io_port.send(frame)
+
+ def up(self):
+ self.frame_io_port.up()
+ return self
+
+ def down(self):
+ self.frame_io_port.down()
+ return self
+
+
@implementer(IComponent)
class FrameIOManager(Thread):
"""
@@ -289,7 +327,7 @@
"""
return self.ports
- def add_interface(self, iface_name, callback, filter=None):
+ def open_port(self, iface_name, callback, filter=None, name=None):
"""
Add a new interface and start receiving on it.
:param iface_name: Name of the interface. Must be an existing Unix
@@ -301,29 +339,39 @@
:param filter: An optional filter (predicate), with signature:
def filter(frame). If provided, only frames for which filter evaluates
to True will be forwarded to callback.
- :return: FrmaeIOPort instance.
+ :return: FrmaeIOPortProxy instance.
"""
- """Add a new interface"""
- assert iface_name not in self.ports
- port = _FrameIOPort(iface_name, callback, filter)
- self.ports[iface_name] = port
- # need to exit select loop to reconstruct select fd lists
- self.ports_changed = True
- self.waker.notify()
- return port
- def del_interface(self, iface_name):
+ port = self.ports.get(iface_name)
+ if port is None:
+ port = _FrameIOPort(iface_name)
+ self.ports[iface_name] = port
+ self.ports_changed = True
+ self.waker.notify()
+
+ proxy = FrameIOPortProxy(port, callback, filter, name)
+ port.add_proxy(proxy)
+
+ return proxy
+
+ def close_port(self, proxy):
"""
- Stop and remove named interface
- :param iface_name: Name of previoysly registered interface
+ Remove the proxy. If this is the last proxy on an interface, stop and
+ remove the named interface as well
+ :param proxy: FrameIOPortProxy reference
:return: None
"""
- assert iface_name in self.ports
+ assert isinstance(proxy, FrameIOPortProxy)
+ iface_name = proxy.get_iface_name()
+ assert iface_name in self.ports, "iface_name {} unknown".format(iface_name)
port = self.ports[iface_name]
- del self.ports[iface_name]
- # need to exit select loop to reconstruct select fd lists
- self.ports_changed = True
- self.waker.notify()
+ port.del_proxy(proxy)
+
+ if not port.proxies:
+ del self.ports[iface_name]
+ # need to exit select loop to reconstruct select fd lists
+ self.ports_changed = True
+ self.waker.notify()
def send(self, iface_name, frame):
"""
@@ -353,7 +401,7 @@
while not self.stopped:
try:
_in, _out, _err = select.select(sockets, empty, empty, 1)
- except Exception, e:
+ except Exception as e:
log.exception('frame-io-select-error', e=e)
break
with self.cvar:
diff --git a/ponsim/realio.py b/ponsim/realio.py
index bca3382..84d08ba 100644
--- a/ponsim/realio.py
+++ b/ponsim/realio.py
@@ -37,7 +37,7 @@
log.debug('starting')
yield self.frame_io.start()
for port, iface_name in self.port_to_iface_name.items():
- io_port = self.frame_io.add_interface(iface_name, self.ingress)
+ io_port = self.frame_io.open_port(iface_name, self.ingress)
self.io_ports[port] = io_port
log.info('started')
returnValue(self)
diff --git a/tests/itests/run_as_root/test_frameio.py b/tests/itests/run_as_root/test_frameio.py
index 057fa28..bd8f770 100644
--- a/tests/itests/run_as_root/test_frameio.py
+++ b/tests/itests/run_as_root/test_frameio.py
@@ -31,11 +31,12 @@
from scapy.layers.inet import IP
from scapy.layers.l2 import Ether, Dot1Q
from twisted.internet import reactor
-from twisted.internet.defer import Deferred, inlineCallbacks
+from twisted.internet.defer import Deferred, inlineCallbacks, DeferredQueue
from twisted.internet.error import AlreadyCalled
from twisted.trial.unittest import TestCase
-from common.frameio.frameio import FrameIOManager, BpfProgramFilter
+from common.frameio.frameio import FrameIOManager, BpfProgramFilter, \
+ FrameIOPortProxy
from common.utils.asleep import asleep
from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
@@ -75,8 +76,8 @@
@inlineCallbacks
def test_packet_send_receive(self):
rcvd = DeferredWithTimeout()
- p0 = self.mgr.add_interface('veth0', none).up()
- p1 = self.mgr.add_interface('veth1',
+ p0 = self.mgr.open_port('veth0', none).up()
+ p1 = self.mgr.open_port('veth1',
lambda p, f: rcvd.callback((p, f))).up()
# sending to veth0 should result in receiving on veth1 and vice versa
@@ -93,10 +94,10 @@
rcvd = DeferredWithTimeout()
filter = BpfProgramFilter('ip dst host 123.123.123.123')
- p0 = self.mgr.add_interface('veth0', none).up()
- p1 = self.mgr.add_interface('veth1',
- lambda p, f: rcvd.callback((p, f)),
- filter=filter).up()
+ p0 = self.mgr.open_port('veth0', none).up()
+ p1 = self.mgr.open_port('veth1',
+ lambda p, f: rcvd.callback((p, f)),
+ filter=filter).up()
# sending bogus packet would not be received
ip_packet = str(Ether()/IP(dst='123.123.123.123'))
@@ -112,9 +113,9 @@
rcvd = DeferredWithTimeout()
filter = BpfProgramFilter('ip dst host 123.123.123.123')
- p0 = self.mgr.add_interface('veth0', none).up()
- self.mgr.add_interface('veth1', lambda p, f: rcvd.callback((p, f)),
- filter=filter).up()
+ p0 = self.mgr.open_port('veth0', none).up()
+ self.mgr.open_port('veth1', lambda p, f: rcvd.callback((p, f)),
+ filter=filter).up()
# sending bogus packet would not be received
p0.send('bogus packet')
@@ -142,10 +143,10 @@
done.callback(None)
return _append
- p1in = self.mgr.add_interface('veth0', none).up()
- self.mgr.add_interface('veth1', append(queue1)).up()
- p2in = self.mgr.add_interface('veth2', none).up()
- self.mgr.add_interface('veth3', append(queue2)).up()
+ p1in = self.mgr.open_port('veth0', none).up()
+ self.mgr.open_port('veth1', append(queue1)).up()
+ p2in = self.mgr.open_port('veth2', none).up()
+ self.mgr.open_port('veth3', append(queue2)).up()
@inlineCallbacks
def send_packets(port, n):
@@ -177,10 +178,10 @@
return _append
filter = BpfProgramFilter('vlan 100')
- p1in = self.mgr.add_interface('veth0', none).up()
- self.mgr.add_interface('veth1', append(queue1), filter).up()
- p2in = self.mgr.add_interface('veth2', none).up()
- self.mgr.add_interface('veth3', append(queue2), filter).up()
+ p1in = self.mgr.open_port('veth0', none).up()
+ self.mgr.open_port('veth1', append(queue1), filter).up()
+ p2in = self.mgr.open_port('veth2', none).up()
+ self.mgr.open_port('veth3', append(queue2), filter).up()
@inlineCallbacks
def send_packets(port, n):
@@ -197,6 +198,70 @@
# verify that both queue got all packets
yield done
+ @inlineCallbacks
+ def test_shared_interface(self):
+
+ queue1 = DeferredQueue()
+ queue2 = DeferredQueue()
+
+ # two senders hooked up to the same interface (sharing it)
+ # here we test if they can both send
+ pin1 = self.mgr.open_port('veth0', none).up()
+ pin2 = self.mgr.open_port('veth0', none).up()
+
+ pout1 = self.mgr.open_port(
+ 'veth1', lambda p, f: queue1.put((p, f))).up()
+ filter = BpfProgramFilter('ip dst host 123.123.123.123')
+ pout2 = self.mgr.open_port(
+ 'veth1', lambda p, f: queue2.put((p, f)), filter=filter).up()
+
+ # sending from pin1, should be received by pout1
+ bogus_frame = 'bogus packet'
+ pin1.send(bogus_frame)
+ port, frame = yield queue1.get()
+ self.assertEqual(port, pout1)
+ self.assertEqual(frame, bogus_frame)
+ self.assertEqual(len(queue1.pending), 0)
+ self.assertEqual(len(queue2.pending), 0)
+
+ # sending from pin2, should be received by pout1
+ bogus_frame = 'bogus packet'
+ pin2.send(bogus_frame)
+ port, frame = yield queue1.get()
+ self.assertEqual(port, pout1)
+ self.assertEqual(frame, bogus_frame)
+ self.assertEqual(len(queue1.pending), 0)
+ self.assertEqual(len(queue2.pending), 0)
+
+ # sending from pin1, should be received by both pouts
+ ip_packet = str(Ether()/IP(dst='123.123.123.123'))
+ pin1.send(ip_packet)
+ port, frame = yield queue1.get()
+ self.assertEqual(port, pout1)
+ self.assertEqual(frame, ip_packet)
+ self.assertEqual(len(queue1.pending), 0)
+ port, frame = yield queue2.get()
+ self.assertEqual(port, pout2)
+ self.assertEqual(frame, ip_packet)
+ self.assertEqual(len(queue2.pending), 0)
+
+ # sending from pin2, should be received by pout1
+ ip_packet = str(Ether()/IP(dst='123.123.123.123'))
+ pin2.send(ip_packet)
+ port, frame = yield queue1.get()
+ self.assertEqual(port, pout1)
+ self.assertEqual(frame, ip_packet)
+ self.assertEqual(len(queue1.pending), 0)
+ port, frame = yield queue2.get()
+ self.assertEqual(port, pout2)
+ self.assertEqual(frame, ip_packet)
+ self.assertEqual(len(queue2.pending), 0)
+
+ self.mgr.close_port(pin1)
+ self.mgr.close_port(pin2)
+ self.mgr.close_port(pout1)
+ self.mgr.close_port(pout2)
+
if __name__ == '__main__':
import unittest
diff --git a/voltha/adapters/maple_olt/maple_olt.py b/voltha/adapters/maple_olt/maple_olt.py
index 1293561..83906f5 100644
--- a/voltha/adapters/maple_olt/maple_olt.py
+++ b/voltha/adapters/maple_olt/maple_olt.py
@@ -193,7 +193,7 @@
def __del__(self):
if self.io_port is not None:
- registry('frameio').del_interface(self.interface)
+ registry('frameio').close_port(self.io_port)
def get_channel(self):
return self.channel
@@ -505,7 +505,7 @@
# finally, open the frameio port to receive in-band packet_in messages
self.log.info('registering-frameio')
- self.io_port = registry('frameio').add_interface(
+ self.io_port = registry('frameio').open_port(
self.interface, self.rcv_io, is_inband_frame)
def rcv_io(self, port, frame):
diff --git a/voltha/adapters/ponsim_olt/ponsim_olt.py b/voltha/adapters/ponsim_olt/ponsim_olt.py
index 01f008e..dcc55a7 100644
--- a/voltha/adapters/ponsim_olt/ponsim_olt.py
+++ b/voltha/adapters/ponsim_olt/ponsim_olt.py
@@ -158,7 +158,7 @@
def __del__(self):
if self.io_port is not None:
- registry('frameio').del_interface(self.interface)
+ registry('frameio').close_port(self.io_port)
def get_channel(self):
if self.channel is None:
@@ -266,7 +266,7 @@
# finally, open the frameio port to receive in-band packet_in messages
self.log.info('registering-frameio')
- self.io_port = registry('frameio').add_interface(
+ self.io_port = registry('frameio').open_port(
self.interface, self.rcv_io, is_inband_frame)
self.log.info('registered-frameio')
diff --git a/voltha/adapters/tibit_olt/tibit_olt.py b/voltha/adapters/tibit_olt/tibit_olt.py
index d1ee6d1..5d9ab81 100644
--- a/voltha/adapters/tibit_olt/tibit_olt.py
+++ b/voltha/adapters/tibit_olt/tibit_olt.py
@@ -133,7 +133,7 @@
def stop(self):
log.debug('stopping')
if self.io_port is not None:
- registry('frameio').del_interface(self.interface)
+ registry('frameio').close_port(self.io_port)
log.info('stopped')
def adapter_descriptor(self):
@@ -155,7 +155,7 @@
def _activate_io_port(self):
if self.io_port is None:
- self.io_port = registry('frameio').add_interface(
+ self.io_port = registry('frameio').open_port(
self.interface, self._rcv_io, is_tibit_frame)
@inlineCallbacks
@@ -182,7 +182,7 @@
if 1: # TODO check if it is really what we expect, and wait if not
break
- except Exception, e:
+ except Exception as e:
log.exception('launch device failed', e=e)
# if we got response, we can fill out the device info, mark the device