Improved FrameIO support and proxy messaging
Specific changes:
- FrameIO support for Mac OS X (making testing easier)
- Message passing between root and child devices implemented
(example use in the simulated_olt and simulated_onu adapters)
- Making FrameIOMgr accessible via registry so that modules
can easily reach it
- Making "main" a registered component so that command
line args and config-file-based info are accessible to all.
- Minor clean-ups and improvements
Change-Id: I6812dd5b198fef5cb19f17fc8d7948d3fba8b625
diff --git a/common/frameio/frameio.py b/common/frameio/frameio.py
index daa7deb..0a222ad 100644
--- a/common/frameio/frameio.py
+++ b/common/frameio/frameio.py
@@ -22,8 +22,10 @@
directly supported) we need to run the receiver select loop on a dedicated
thread.
"""
+
import os
import socket
+import struct
from pcapy import BPFProgram
from threading import Thread, Condition
@@ -31,10 +33,18 @@
import select
import structlog
+import sys
+from scapy.data import ETH_P_ALL
from twisted.internet import reactor
+from zope.interface import implementer
-from common.frameio.third_party.oftest import afpacket, netutils
+from voltha.registry import IComponent
+
+if sys.platform.startswith('linux'):
+ from common.frameio.third_party.oftest import afpacket, netutils
+elif sys.platform == 'darwin':
+ from scapy.arch import pcapdnet, BIOCIMMEDIATE
log = structlog.get_logger()
@@ -107,22 +117,22 @@
self.iface_name = iface_name
self.callback = callback
self.filter = filter
- self.socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
- afpacket.enable_auxdata(self.socket)
- self.socket.bind((self.iface_name, self.ETH_P_ALL))
- netutils.set_promisc(self.socket, self.iface_name)
- self.socket.settimeout(self.RCV_TIMEOUT)
+ self.socket = self.open_socket(self.iface_name)
log.debug('socket-opened', fn=self.fileno(), iface=iface_name)
-
self.received = 0
self.discarded = 0
+ def open_sockets(self, iface_name):
+ raise NotImplementedError('to be implemented by derived class')
+
+ def rcv_frame(self):
+ raise NotImplementedError('to be implemented by derived class')
+
def __del__(self):
if self.socket:
- fn = self.fileno()
self.socket.close()
self.socket = None
- log.debug('socket-closed', fn=fn, iface=self.iface_name)
+ log.debug('socket-closed', iface=self.iface_name)
def fileno(self):
return self.socket.fileno()
@@ -134,11 +144,11 @@
def recv(self):
"""Called on the select thread when a packet arrives"""
try:
- frame = afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
+ frame = self.rcv_frame()
except RuntimeError, e:
# we observed this happens sometimes right after the socket was
# attached to a newly created veth interface. So we log it, but
- # allow to continue
+ # allow to continue.
log.warn('afpacket-recv-error', code=-1)
return
@@ -165,13 +175,55 @@
return self
def down(self):
- os.system('ip link set {] down'.format(self.iface_name))
+ os.system('ip link set {} down'.format(self.iface_name))
return self
def statistics(self):
return self.received, self.discarded
+class LinuxFrameIOPort(FrameIOPort):
+
+ def open_socket(self, iface_name):
+ s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
+ afpacket.enable_auxdata(s)
+ s.bind((self.iface_name, self.ETH_P_ALL))
+ netutils.set_promisc(s, iface_name)
+ s.settimeout(self.RCV_TIMEOUT)
+ return s
+
+ def rcv_frame(self):
+ return afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
+
+
+class DarwinFrameIOPort(FrameIOPort):
+
+ def open_socket(self, iface_name):
+ s = pcapdnet.open_pcap(iface_name, 1600, 1, 100)
+ try:
+ fcntl.ioctl(s.fileno(), BIOCIMMEDIATE, struct.pack("I",1))
+ except:
+ pass
+
+ return s
+
+ def rcv_frame(self):
+ pkt = self.socket.next()
+ if pkt is not None:
+ ts, pkt = pkt
+ return pkt
+
+
+if sys.platform == 'darwin':
+ _FrameIOPort = DarwinFrameIOPort
+elif sys.platform.startswith('linux'):
+ _FrameIOPort = LinuxFrameIOPort
+else:
+ raise Exception('Unsupported platform {}'.format(sys.platform))
+ sys.exit(1)
+
+
+@implementer(IComponent)
class FrameIOManager(Thread):
"""
Packet/Frame IO manager that can be used to send/receive raw frames
@@ -188,7 +240,7 @@
self.stopped = False
self.ports_changed = False
- #~~~~~~~~~~~ exposed methods callable from main thread ~~~~~~~~~~~~~~~~~~~~~
+ # ~~~~~~~~~~~ exposed methods callable from main thread ~~~~~~~~~~~~~~~~~~~
def start(self):
"""
@@ -233,7 +285,7 @@
"""
"""Add a new interface"""
assert iface_name not in self.ports
- port = FrameIOPort(iface_name, callback, filter)
+ port = _FrameIOPort(iface_name, callback, filter)
self.ports[iface_name] = port
# need to exit select loop to reconstruct select fd lists
self.ports_changed = True
@@ -248,7 +300,6 @@
"""
assert iface_name in self.ports
port = self.ports[iface_name]
- port.stop()
del self.ports[iface_name]
# need to exit select loop to reconstruct select fd lists
self.ports_changed = True
@@ -263,7 +314,7 @@
"""
return self.ports[iface_name].send(frame)
- #~~~~~~~~~~~~~~ Thread methods (running on non-main thread ~~~~~~~~~~~~~~~~
+ # ~~~~~~~~~~~~~ Thread methods (running on non-main thread ~~~~~~~~~~~~~~~~
def run(self):
"""