VOL-1097 : Ofagent integration for voltha 2.0
- Created a common location for python based components
- Adjusted the ofagent component to interact with voltha 2.0
- Added streaming rpc methods for rcv/send of packets to voltha api
- Adjusted voltha.proto
Change-Id: I47fb7b80878ead060b4b42bd16cb4f8aa384fdb6
diff --git a/python/common/frameio/frameio.py b/python/common/frameio/frameio.py
new file mode 100644
index 0000000..3f5bcf6
--- /dev/null
+++ b/python/common/frameio/frameio.py
@@ -0,0 +1,437 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A module that can send and receive raw ethernet frames on a set of interfaces
+and it can manage a set of vlan interfaces on top of existing
+interfaces. Due to reliance on raw sockets, this module requires
+root access. Also, raw sockets are hard to deal with in Twisted (not
+directly supported) we need to run the receiver select loop on a dedicated
+thread.
+"""
+
+import os
+import socket
+import struct
+import uuid
+from pcapy import BPFProgram
+from threading import Thread, Condition
+
+import fcntl
+
+import select
+import structlog
+import sys
+
+from scapy.data import ETH_P_ALL
+from twisted.internet import reactor
+from zope.interface import implementer
+
+from voltha.registry import IComponent
+
+if sys.platform.startswith('linux'):
+ from common.frameio.third_party.oftest import afpacket, netutils
+elif sys.platform == 'darwin':
+ from scapy.arch import pcapdnet, BIOCIMMEDIATE, dnet
+
+log = structlog.get_logger()
+
+
+def hexify(buffer):
+ """
+ Return a hexadecimal string encoding of input buffer
+ """
+ return ''.join('%02x' % ord(c) for c in buffer)
+
+
+class _SelectWakerDescriptor(object):
+ """
+ A descriptor that can be mixed into a select loop to wake it up.
+ """
+ def __init__(self):
+ self.pipe_read, self.pipe_write = os.pipe()
+ fcntl.fcntl(self.pipe_write, fcntl.F_SETFL, os.O_NONBLOCK)
+
+ def __del__(self):
+ os.close(self.pipe_read)
+ os.close(self.pipe_write)
+
+ def fileno(self):
+ return self.pipe_read
+
+ def wait(self):
+ os.read(self.pipe_read, 1)
+
+ def notify(self):
+ """Trigger a select loop"""
+ os.write(self.pipe_write, '\x00')
+
+
+class BpfProgramFilter(object):
+ """
+ Convenience packet filter based on the well-tried Berkeley Packet Filter,
+ used by many well known open source tools such as pcap and tcpdump.
+ """
+ def __init__(self, program_string):
+ """
+ Create a filter using the BPF command syntax. To learn more,
+ consult 'man pcap-filter'.
+ :param program_string: The textual definition of the filter. Examples:
+ 'vlan 1000'
+ 'vlan 1000 and ip src host 10.10.10.10'
+ """
+ self.bpf = BPFProgram(program_string)
+
+ def __call__(self, frame):
+ """
+ Return 1 if frame passes filter.
+ :param frame: Raw frame provided as Python string
+ :return: 1 if frame satisfies filter, 0 otherwise.
+ """
+ return self.bpf.filter(frame)
+
+
+class FrameIOPort(object):
+ """
+ Represents a network interface which we can send/receive raw
+ Ethernet frames.
+ """
+
+ RCV_SIZE_DEFAULT = 4096
+ ETH_P_ALL = 0x03
+ RCV_TIMEOUT = 10000
+ MIN_PKT_SIZE = 60
+
+ def __init__(self, iface_name):
+ self.iface_name = iface_name
+ self.proxies = []
+ self.socket = self.open_socket(self.iface_name)
+ log.debug('socket-opened', fn=self.fileno(), iface=iface_name)
+ self.received = 0
+ self.discarded = 0
+
+ def add_proxy(self, proxy):
+ self.proxies.append(proxy)
+
+ def del_proxy(self, proxy):
+ self.proxies = [p for p in self.proxies if p.name != proxy.name]
+
+ def open_socket(self, iface_name):
+ raise NotImplementedError('to be implemented by derived class')
+
+ def rcv_frame(self):
+ raise NotImplementedError('to be implemented by derived class')
+
+ def __del__(self):
+ if self.socket:
+ self.socket.close()
+ self.socket = None
+ log.debug('socket-closed', iface=self.iface_name)
+
+ def fileno(self):
+ return self.socket.fileno()
+
+ def _dispatch(self, proxy, frame):
+ log.debug('calling-publisher', proxy=proxy.name, frame=hexify(frame))
+ try:
+ proxy.callback(proxy, frame)
+ except Exception as e:
+ log.exception('callback-error',
+ explanation='Callback failed while processing frame',
+ e=e)
+
+ def recv(self):
+ """Called on the select thread when a packet arrives"""
+ try:
+ frame = self.rcv_frame()
+ except RuntimeError as e:
+ # we observed this happens sometimes right after the socket was
+ # attached to a newly created veth interface. So we log it, but
+ # allow to continue.
+ log.warn('afpacket-recv-error', code=-1)
+ return
+
+ log.debug('frame-received', iface=self.iface_name, len=len(frame),
+ hex=hexify(frame))
+ self.received +=1
+ dispatched = False
+ for proxy in self.proxies:
+ if proxy.filter is None or proxy.filter(frame):
+ log.debug('frame-dispatched')
+ dispatched = True
+ reactor.callFromThread(self._dispatch, proxy, frame)
+
+ if not dispatched:
+ self.discarded += 1
+ log.debug('frame-discarded')
+
+ def send(self, frame):
+ log.debug('sending', len=len(frame), iface=self.iface_name)
+ sent_bytes = self.send_frame(frame)
+ if sent_bytes != len(frame):
+ log.error('send-error', iface=self.iface_name,
+ wanted_to_send=len(frame), actually_sent=sent_bytes)
+ return sent_bytes
+
+ def send_frame(self, frame):
+ try:
+ return self.socket.send(frame)
+ except socket.error, err:
+ if err[0] == os.errno.EINVAL:
+ if len(frame) < self.MIN_PKT_SIZE:
+ padding = '\x00' * (self.MIN_PKT_SIZE - len(frame))
+ frame = frame + padding
+ return self.socket.send(frame)
+ else:
+ raise
+
+ def up(self):
+ if sys.platform.startswith('darwin'):
+ pass
+ else:
+ os.system('ip link set {} up'.format(self.iface_name))
+ return self
+
+ def down(self):
+ if sys.platform.startswith('darwin'):
+ pass
+ else:
+ os.system('ip link set {} down'.format(self.iface_name))
+ return self
+
+ def statistics(self):
+ return self.received, self.discarded
+
+
+class LinuxFrameIOPort(FrameIOPort):
+
+ def open_socket(self, iface_name):
+ s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
+ afpacket.enable_auxdata(s)
+ s.bind((self.iface_name, self.ETH_P_ALL))
+ netutils.set_promisc(s, iface_name)
+ s.settimeout(self.RCV_TIMEOUT)
+ return s
+
+ def rcv_frame(self):
+ return afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
+
+
+class DarwinFrameIOPort(FrameIOPort):
+
+ def open_socket(self, iface_name):
+ sin = pcapdnet.open_pcap(iface_name, 1600, 1, 100)
+ try:
+ fcntl.ioctl(sin.fileno(), BIOCIMMEDIATE, struct.pack("I",1))
+ except:
+ pass
+
+ # need a different kind of socket for sending out
+ self.sout = dnet.eth(iface_name)
+
+ return sin
+
+ def send_frame(self, frame):
+ return self.sout.send(frame)
+
+ def rcv_frame(self):
+ pkt = self.socket.next()
+ if pkt is not None:
+ ts, pkt = pkt
+ return pkt
+
+
+if sys.platform == 'darwin':
+ _FrameIOPort = DarwinFrameIOPort
+elif sys.platform.startswith('linux'):
+ _FrameIOPort = LinuxFrameIOPort
+else:
+ raise Exception('Unsupported platform {}'.format(sys.platform))
+ sys.exit(1)
+
+
+class FrameIOPortProxy(object):
+ """Makes FrameIOPort sharable between multiple users"""
+
+ def __init__(self, frame_io_port, callback, filter=None, name=None):
+ self.frame_io_port = frame_io_port
+ self.callback = callback
+ self.filter = filter
+ self.name = uuid.uuid4().hex[:12] if name is None else name
+
+ @property
+ def iface_name(self):
+ return self.frame_io_port.iface_name
+
+ def get_iface_name(self):
+ return self.frame_io_port.iface_name
+
+ def send(self, frame):
+ return self.frame_io_port.send(frame)
+
+ def up(self):
+ self.frame_io_port.up()
+ return self
+
+ def down(self):
+ self.frame_io_port.down()
+ return self
+
+
+@implementer(IComponent)
+class FrameIOManager(Thread):
+ """
+ Packet/Frame IO manager that can be used to send/receive raw frames
+ on a set of network interfaces.
+ """
+ def __init__(self):
+ super(FrameIOManager, self).__init__()
+
+ self.ports = {} # iface_name -> ActiveFrameReceiver
+ self.queue = {} # iface_name -> TODO
+
+ self.cvar = Condition()
+ self.waker = _SelectWakerDescriptor()
+ self.stopped = False
+ self.ports_changed = False
+
+ # ~~~~~~~~~~~ exposed methods callable from main thread ~~~~~~~~~~~~~~~~~~~
+
+ def start(self):
+ """
+ Start the IO manager and its select loop thread
+ """
+ log.debug('starting')
+ super(FrameIOManager, self).start()
+ log.info('started')
+ return self
+
+ def stop(self):
+ """
+ Stop the IO manager and its thread with the select loop
+ """
+ log.debug('stopping')
+ self.stopped = True
+ self.waker.notify()
+ self.join()
+ del self.ports
+ log.info('stopped')
+
+ def list_interfaces(self):
+ """
+ Return list of interfaces listened on
+ :return: List of FrameIOPort objects
+ """
+ return self.ports
+
+ def open_port(self, iface_name, callback, filter=None, name=None):
+ """
+ Add a new interface and start receiving on it.
+ :param iface_name: Name of the interface. Must be an existing Unix
+ interface (eth0, en0, etc.)
+ :param callback: Called on each received frame;
+ signature: def callback(port, frame) where port is the FrameIOPort
+ instance at which the frame was received, frame is the actual frame
+ received (as binay string)
+ :param filter: An optional filter (predicate), with signature:
+ def filter(frame). If provided, only frames for which filter evaluates
+ to True will be forwarded to callback.
+ :return: FrmaeIOPortProxy instance.
+ """
+
+ port = self.ports.get(iface_name)
+ if port is None:
+ port = _FrameIOPort(iface_name)
+ self.ports[iface_name] = port
+ self.ports_changed = True
+ self.waker.notify()
+
+ proxy = FrameIOPortProxy(port, callback, filter, name)
+ port.add_proxy(proxy)
+
+ return proxy
+
+ def close_port(self, proxy):
+ """
+ Remove the proxy. If this is the last proxy on an interface, stop and
+ remove the named interface as well
+ :param proxy: FrameIOPortProxy reference
+ :return: None
+ """
+ assert isinstance(proxy, FrameIOPortProxy)
+ iface_name = proxy.get_iface_name()
+ assert iface_name in self.ports, "iface_name {} unknown".format(iface_name)
+ port = self.ports[iface_name]
+ port.del_proxy(proxy)
+
+ if not port.proxies:
+ del self.ports[iface_name]
+ # need to exit select loop to reconstruct select fd lists
+ self.ports_changed = True
+ self.waker.notify()
+
+ def send(self, iface_name, frame):
+ """
+ Send frame on given interface
+ :param iface_name: Name of previously registered interface
+ :param frame: frame as string
+ :return: number of bytes sent
+ """
+ return self.ports[iface_name].send(frame)
+
+ # ~~~~~~~~~~~~~ Thread methods (running on non-main thread ~~~~~~~~~~~~~~~~
+
+ def run(self):
+ """
+ Called on the alien thread, this is the core multi-port receive loop
+ """
+
+ log.debug('select-loop-started')
+
+ # outer loop constructs sockets list for select
+ while not self.stopped:
+ sockets = [self.waker] + self.ports.values()
+ self.ports_changed = False
+ empty = []
+ # inner select loop
+
+ while not self.stopped:
+ try:
+ _in, _out, _err = select.select(sockets, empty, empty, 1)
+ except Exception as e:
+ log.exception('frame-io-select-error', e=e)
+ break
+ with self.cvar:
+ for port in _in:
+ if port is self.waker:
+ self.waker.wait()
+ continue
+ else:
+ port.recv()
+ self.cvar.notify_all()
+ if self.ports_changed:
+ break # break inner loop so we reconstruct sockets list
+
+ log.debug('select-loop-exited')
+
+ def del_interface(self, iface_name):
+ """
+ Delete interface for stopping
+ """
+
+ log.info('Delete interface')
+ del self.ports[iface_name]
+ log.info('Interface(port) is deleted')