Improved FrameIO support and proxy messaging
Specific changes:
- FrameIO support for Mac OS X (making testing easier)
- Message passing between root and child devices implemented
  (example use in the simulated_olt and simulated_onu adapters)
- Making FrameIOManager accessible via the registry so that modules
  can easily reach it
- Making "main" a registered component so that command-line args
  and config-file based info are accessible to all modules
- Minor clean-ups and improvements
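
Example: modules can now reach these services through the registry. A
minimal usage sketch (rx_callback, bpf_filter and frame are
placeholders; see the tibit_olt adapter for a real use):

    from voltha.registry import registry

    interface = registry('main').get_args().interface
    io_port = registry('frameio').add_interface(interface, rx_callback,
                                                bpf_filter)
    io_port.send(frame)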
Change-Id: I6812dd5b198fef5cb19f17fc8d7948d3fba8b625
diff --git a/common/frameio/frameio.py b/common/frameio/frameio.py
index daa7deb..0a222ad 100644
--- a/common/frameio/frameio.py
+++ b/common/frameio/frameio.py
@@ -22,8 +22,10 @@
directly supported) we need to run the receiver select loop on a dedicated
thread.
"""
+
+import fcntl
import os
import socket
+import struct
from pcapy import BPFProgram
from threading import Thread, Condition
@@ -31,10 +33,18 @@
import select
import structlog
+import sys
+from scapy.data import ETH_P_ALL
from twisted.internet import reactor
+from zope.interface import implementer
-from common.frameio.third_party.oftest import afpacket, netutils
+from voltha.registry import IComponent
+
+if sys.platform.startswith('linux'):
+ from common.frameio.third_party.oftest import afpacket, netutils
+elif sys.platform == 'darwin':
+ from scapy.arch import pcapdnet, BIOCIMMEDIATE
log = structlog.get_logger()
@@ -107,22 +117,22 @@
self.iface_name = iface_name
self.callback = callback
self.filter = filter
- self.socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
- afpacket.enable_auxdata(self.socket)
- self.socket.bind((self.iface_name, self.ETH_P_ALL))
- netutils.set_promisc(self.socket, self.iface_name)
- self.socket.settimeout(self.RCV_TIMEOUT)
+ self.socket = self.open_socket(self.iface_name)
log.debug('socket-opened', fn=self.fileno(), iface=iface_name)
-
self.received = 0
self.discarded = 0
+ def open_socket(self, iface_name):
+ raise NotImplementedError('to be implemented by derived class')
+
+ def rcv_frame(self):
+ raise NotImplementedError('to be implemented by derived class')
+
def __del__(self):
if self.socket:
- fn = self.fileno()
self.socket.close()
self.socket = None
- log.debug('socket-closed', fn=fn, iface=self.iface_name)
+ log.debug('socket-closed', iface=self.iface_name)
def fileno(self):
return self.socket.fileno()
@@ -134,11 +144,11 @@
def recv(self):
"""Called on the select thread when a packet arrives"""
try:
- frame = afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
+ frame = self.rcv_frame()
except RuntimeError, e:
# we observed this happens sometimes right after the socket was
# attached to a newly created veth interface. So we log it, but
- # allow to continue
+ # allow to continue.
log.warn('afpacket-recv-error', code=-1)
return
@@ -165,13 +175,55 @@
return self
def down(self):
- os.system('ip link set {] down'.format(self.iface_name))
+ os.system('ip link set {} down'.format(self.iface_name))
return self
def statistics(self):
return self.received, self.discarded
+class LinuxFrameIOPort(FrameIOPort):
+
+ def open_socket(self, iface_name):
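+ # raw AF_PACKET socket bound to the interface, in promiscuous mode,
+ # with a receive timeout so the select loop can wake up periodically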
+ s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
+ afpacket.enable_auxdata(s)
+ s.bind((self.iface_name, self.ETH_P_ALL))
+ netutils.set_promisc(s, iface_name)
+ s.settimeout(self.RCV_TIMEOUT)
+ return s
+
+ def rcv_frame(self):
+ return afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
+
+
+class DarwinFrameIOPort(FrameIOPort):
+
+ def open_socket(self, iface_name):
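+ # open a libpcap capture handle: snaplen 1600, promiscuous mode,
+ # 100 ms read timeout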
+ s = pcapdnet.open_pcap(iface_name, 1600, 1, 100)
+ try:
+ fcntl.ioctl(s.fileno(), BIOCIMMEDIATE, struct.pack("I", 1))
+ except:
+ pass  # BIOCIMMEDIATE is best-effort; fall back to buffered delivery
+
+ return s
+
+ def rcv_frame(self):
+ pkt = self.socket.next()
+ if pkt is not None:
+ ts, pkt = pkt
+ return pkt
+
+
+if sys.platform == 'darwin':
+ _FrameIOPort = DarwinFrameIOPort
+elif sys.platform.startswith('linux'):
+ _FrameIOPort = LinuxFrameIOPort
+else:
+ raise Exception('Unsupported platform {}'.format(sys.platform))
+
+
+@implementer(IComponent)
class FrameIOManager(Thread):
"""
Packet/Frame IO manager that can be used to send/receive raw frames
@@ -188,7 +240,7 @@
self.stopped = False
self.ports_changed = False
- #~~~~~~~~~~~ exposed methods callable from main thread ~~~~~~~~~~~~~~~~~~~~~
+ # ~~~~~~~~~~~ exposed methods callable from main thread ~~~~~~~~~~~~~~~~~~~
def start(self):
"""
@@ -233,7 +285,7 @@
"""
"""Add a new interface"""
assert iface_name not in self.ports
- port = FrameIOPort(iface_name, callback, filter)
+ port = _FrameIOPort(iface_name, callback, filter)
self.ports[iface_name] = port
# need to exit select loop to reconstruct select fd lists
self.ports_changed = True
@@ -248,7 +300,6 @@
"""
assert iface_name in self.ports
port = self.ports[iface_name]
- port.stop()
del self.ports[iface_name]
# need to exit select loop to reconstruct select fd lists
self.ports_changed = True
@@ -263,7 +314,7 @@
"""
return self.ports[iface_name].send(frame)
- #~~~~~~~~~~~~~~ Thread methods (running on non-main thread ~~~~~~~~~~~~~~~~
+ # ~~~~~~~~~~~~~ Thread methods (running on non-main thread) ~~~~~~~~~~~~~~~
def run(self):
"""
diff --git a/env.sh b/env.sh
index 9efac4d..d1d023b 100644
--- a/env.sh
+++ b/env.sh
@@ -13,7 +13,7 @@
. $VENVDIR/bin/activate
# add top-level voltha dir to pythonpath
-export PYTHONPATH=$VENVDIR/lib/python2.7/site-packages:$PYTHONPATH:$VOLTHA_BASE:$VOLTHA_BASE/voltha/protos/third_party
+export PYTHONPATH=$VOLTHA_BASE/$VENVDIR/lib/python2.7/site-packages:$PYTHONPATH:$VOLTHA_BASE:$VOLTHA_BASE/voltha/protos/third_party
# assign DOCKER_HOST_IP to be the main ip address of this host
export DOCKER_HOST_IP=$(python common/utils/nethelpers.py)
diff --git a/tests/itests/run_as_root/test_frameio.py b/tests/itests/run_as_root/test_frameio.py
index b3863d9..057fa28 100644
--- a/tests/itests/run_as_root/test_frameio.py
+++ b/tests/itests/run_as_root/test_frameio.py
@@ -20,7 +20,7 @@
docker run -ti --rm -v $(pwd):/voltha --privileged cord/voltha-base \
env PYTHONPATH=/voltha python \
- /voltha/tests/itests/frameio_tests/run_as_root/test_frameio.py
+ /voltha/tests/itests/run_as_root/test_frameio.py
"""
@@ -88,7 +88,6 @@
self.assertEqual(port, p1)
self.assertEqual(frame, bogus_frame)
-
@inlineCallbacks
def test_packet_send_receive_with_filter(self):
rcvd = DeferredWithTimeout()
@@ -108,7 +107,6 @@
self.assertEqual(port, p1)
self.assertEqual(frame, ip_packet)
-
@inlineCallbacks
def test_packet_send_drop_with_filter(self):
rcvd = DeferredWithTimeout()
@@ -128,7 +126,6 @@
else:
self.fail('not timed out')
-
@inlineCallbacks
def test_concurrent_packet_send_receive(self):
diff --git a/voltha/adapters/interface.py b/voltha/adapters/interface.py
index f9426c6..bb40eb0 100644
--- a/voltha/adapters/interface.py
+++ b/voltha/adapters/interface.py
@@ -179,7 +179,8 @@
def child_device_detected(parent_device_id,
parent_port_no,
child_device_type,
- child_device_address_kw):
+ proxy_address,
+ **kw):
# TODO add doc
""""""
diff --git a/voltha/adapters/loader.py b/voltha/adapters/loader.py
index dca8afa..4a26dff 100644
--- a/voltha/adapters/loader.py
+++ b/voltha/adapters/loader.py
@@ -69,21 +69,24 @@
def _find_adapters(self):
subdirs = os.walk(mydir).next()[1]
- for subdir in subdirs:
- adapter_name = subdir
- py_file = os.path.join(mydir, subdir, subdir + '.py')
- if os.path.isfile(py_file):
- try:
- package_name = __package__ + '.' + subdir
- pkg = __import__(package_name, None, None, [adapter_name])
- module = getattr(pkg, adapter_name)
- except ImportError, e:
- log.exception('cannot-load', file=py_file, e=e)
- continue
+ try:
+ for subdir in subdirs:
+ adapter_name = subdir
+ py_file = os.path.join(mydir, subdir, subdir + '.py')
+ if os.path.isfile(py_file):
+ try:
+ package_name = __package__ + '.' + subdir
+ pkg = __import__(package_name, None, None, [adapter_name])
+ module = getattr(pkg, adapter_name)
+ except ImportError, e:
+ log.exception('cannot-load', file=py_file, e=e)
+ continue
- for attr_name in dir(module):
- cls = getattr(module, attr_name)
- if isinstance(cls, type) and \
- IAdapterInterface.implementedBy(cls):
- verifyClass(IAdapterInterface, cls)
- yield adapter_name, cls
+ for attr_name in dir(module):
+ cls = getattr(module, attr_name)
+ if isinstance(cls, type) and \
+ IAdapterInterface.implementedBy(cls):
+ verifyClass(IAdapterInterface, cls)
+ yield adapter_name, cls
+ except Exception, e:
+ log.exception('failed', e=e)
diff --git a/voltha/adapters/simulated_olt/simulated_olt.py b/voltha/adapters/simulated_olt/simulated_olt.py
index c6a6e11..f693a73 100644
--- a/voltha/adapters/simulated_olt/simulated_olt.py
+++ b/voltha/adapters/simulated_olt/simulated_olt.py
@@ -321,13 +321,11 @@
parent_device_id=device.id,
parent_port_no=1,
child_device_type='simulated_onu',
- child_device_address_kw=dict(
- proxy_device=Device.ProxyAddress(
- device_id=device.id,
- channel_id=vlan_id
- ),
- vlan=100 + i
- )
+ proxy_address=Device.ProxyAddress(
+ device_id=device.id,
+ channel_id=vlan_id
+ ),
+ vlan=100 + i
)
def _olt_side_onu_activation(self, seq):
@@ -349,7 +347,14 @@
raise NotImplementedError()
def send_proxied_message(self, proxy_address, msg):
- raise NotImplementedError()
+ log.info('send-proxied-message', proxy_address=proxy_address, msg=msg)
+ # we mimic a response by sending the same message back after a short delay
+ reactor.callLater(
+ 0.2,
+ self.adapter_agent.receive_proxied_message,
+ proxy_address,
+ msg
+ )
def receive_proxied_message(self, proxy_address, msg):
raise NotImplementedError()
diff --git a/voltha/adapters/simulated_onu/simulated_onu.py b/voltha/adapters/simulated_onu/simulated_onu.py
index 38cb2c5..73a2f8f 100644
--- a/voltha/adapters/simulated_onu/simulated_onu.py
+++ b/voltha/adapters/simulated_onu/simulated_onu.py
@@ -21,7 +21,7 @@
import structlog
from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import inlineCallbacks, DeferredQueue
from zope.interface import implementer
from common.utils.asleep import asleep
@@ -62,6 +62,7 @@
version='0.1',
config=AdapterConfig(log_level=LogLevel.INFO)
)
+ self.incoming_messages = DeferredQueue()
def start(self):
log.debug('starting')
@@ -96,10 +97,11 @@
@inlineCallbacks
def _simulate_device_activation(self, device):
+
# first we verify that we got parent reference and proxy info
assert device.parent_id
- assert device.proxy_device.device_id
- assert device.proxy_device.channel_id
+ assert device.proxy_address.device_id
+ assert device.proxy_address.channel_id
# we pretend that we were able to contact the device and obtain
# additional information about it
@@ -150,7 +152,7 @@
# and name for the virtual ports, as this is guaranteed to be unique
# in the context of the OLT port, so it is also unique in the context
# of the logical device
- port_no = device.proxy_device.channel_id
+ port_no = device.proxy_address.channel_id
cap = OFPPF_1GB_FD | OFPPF_FIBER
self.adapter_agent.add_logical_port(logical_device_id, LogicalPort(
id=str(port_no),
@@ -170,7 +172,10 @@
device_port_no=uni_port.port_no
))
- # and finally update to active
+ # simulate sending a proxied message and receiving the reply
+ reply = yield self._simulate_message_exchange(device)
+
+ # and finally update to "ACTIVE"
device = self.adapter_agent.get_device(device.id)
device.oper_status = OperStatus.ACTIVE
self.adapter_agent.update_device(device)
@@ -186,4 +191,27 @@
raise NotImplementedError()
def receive_proxied_message(self, proxy_address, msg):
- raise NotImplementedError()
+ # just place the incoming message into the incoming-message queue
+ self.incoming_messages.put((proxy_address, msg))
+
+ @inlineCallbacks
+ def _simulate_message_exchange(self, device):
+
+ # register for receiving async messages
+ self.adapter_agent.register_for_proxied_messages(device.proxy_address)
+
+ # reset incoming message queue
+ while self.incoming_messages.pending:
+ _ = yield self.incoming_messages.get()
+
+ # construct message
+ msg = 'test message'
+
+ # send message
+ self.adapter_agent.send_proxied_message(device.proxy_address, msg)
+
+ # wait till we detect incoming message
+ yield self.incoming_messages.get()
+
+ # by returning we allow the device to be shown as active, which
+ # indirectly verifies that message passing works
diff --git a/voltha/adapters/tibit_olt/tb_json_version.py b/voltha/adapters/tibit_olt/tb_json_version.py
new file mode 100644
index 0000000..4b46619
--- /dev/null
+++ b/voltha/adapters/tibit_olt/tb_json_version.py
@@ -0,0 +1,107 @@
+#! /usr/bin/env python
+""" JSON layer for scapy """
+
+# Set log level to benefit from Scapy warnings
+import logging
+import json
+import argparse
+logging.getLogger("scapy").setLevel(1)
+
+from scapy.packet import Packet, bind_layers
+from scapy.fields import StrField
+from scapy.layers.l2 import Ether
+from scapy.sendrecv import sendp
+from scapy.sendrecv import srp1
+# from scapy.main import interact
+
+from uuid import getnode as get_srcmac
+
+
+class TBJSON(Packet):
+ """ TBJSON 'packet' layer. """
+ name = "TBJSON"
+ fields_desc = [StrField("data", default="")]
+
+
+def tb_json_packet_from_dict(json_operation_dict, dst_macid):
+ """ Given an command matrix operation dictionary, return a packet """
+ json_op_string = json.dumps(json_operation_dict, dst_macid)
+ return tb_json_packet_from_str(json_op_string, dst_macid)
+
+
+def tb_json_packet_from_str(json_operation_str, dst_macid):
+ """ Given an command matrix operation as json string, return a packet """
+ base_packet = Ether()/TBJSON(data='json %s' % json_operation_str)
+ base_packet.type = int("9001", 16)
+ mac = '%012x' % get_srcmac()
+ base_packet.src = ':'.join(s.encode('hex') for s in mac.decode('hex'))
+ base_packet.dst = dst_macid
+ bind_layers(Ether, TBJSON, type=0x9001)
+ return base_packet
+
+
+def tb_macid_to_scapy(macid):
+ """ convert a tibit macid (xxxxxxxxxxxx) to scapy (xx:xx:xx:xx:xx:xx) """
+ if len(macid) != 12:
+ print('tb_macid_to_scapy: unexpected macid length (%s)' % macid)
+ return '00:00:00:00:00:00'
+ new_macid = ''
+ for i in [0, 2, 4, 6, 8]:
+ new_macid += macid[i:i+2] + ':'
+ new_macid += macid[10:12]
+ return new_macid
+
+
+def scapy_to_tb_macid(macid):
+ """ convert a scapy macid (xx:xx:xx:xx:xx:xx) to tibit (xxxxxxxxxxxx) """
+ if len(macid) != 17:
+ print('scapy_to_tb_macid: unexpected macid length (%s)' % macid)
+ return '000000000000'
+ new_macid = ''
+ for i in [0, 3, 6, 9, 12, 15]:
+ new_macid += macid[i:i+2]
+ return new_macid
+
+
+def tb_json_packet(json_operation_dict, dst_macid):
+ """ Given a command matrix operation dictionary, return a packet """
+ json_op_string = json.dumps(json_operation_dict)
+ base_packet = Ether()/TBJSON(data='json %s' % json_op_string)
+
+ base_packet.type = int("9001", 16)
+ mac = '%012x' % get_srcmac()
+ base_packet.src = ':'.join(s.encode('hex') for s in mac.decode('hex'))
+ base_packet.dst = dst_macid
+
+ bind_layers(Ether, TBJSON, type=0x9001)
+
+ return base_packet
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--dst', dest='dstAddress', action='store',
+ help='MAC address to use as destination.')
+
+ args = parser.parse_args()
+
+ if args.dstAddress is None:
+ args.dstAddress = '00:0c:e2:31:10:00'
+
+ # Create a json packet
+ PACKET = tb_json_packet_from_dict({"operation":"version"}, args.dstAddress)
+
+ # Send the packet
+ PACKET.show()
+ p = srp1(PACKET, iface="eth0")
+ if p:
+ print "============================================================================="
+ p.show()
+
+ print "============================================================================="
+ print "Stripping off the \"json\" and quotes yields...\n%s" % p.data[5:]
+ print "============================================================================="
+ print "Load the JSON..."
+ print json.loads(p.data[5:])
+ print "============================================================================="
+
+ # interact(mydict=globals(), mybanner="===( TBJSON MODE )===")
diff --git a/voltha/adapters/tibit_olt/tibit_olt.py b/voltha/adapters/tibit_olt/tibit_olt.py
index 1cb25c8..a1f9417 100644
--- a/voltha/adapters/tibit_olt/tibit_olt.py
+++ b/voltha/adapters/tibit_olt/tibit_olt.py
@@ -17,20 +17,37 @@
"""
Tibit OLT device adapter
"""
-
+import scapy
import structlog
+from scapy.layers.inet import ICMP, IP
+from scapy.layers.l2 import Ether
+from twisted.internet.defer import DeferredQueue, inlineCallbacks
+from twisted.internet import reactor
+
from zope.interface import implementer
+from common.frameio.frameio import BpfProgramFilter
from voltha.registry import registry
from voltha.adapters.interface import IAdapterInterface
from voltha.protos.adapter_pb2 import Adapter, AdapterConfig
from voltha.protos.device_pb2 import DeviceType, DeviceTypes
from voltha.protos.health_pb2 import HealthStatus
-from voltha.protos.common_pb2 import LogLevel
+from voltha.protos.common_pb2 import LogLevel, ConnectStatus
+
+from scapy.packet import Packet, bind_layers
+from scapy.fields import StrField
log = structlog.get_logger()
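+# BPF filter matching frames whose Ethertype is the Tibit-specific 0x9001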
+is_tibit_frame = BpfProgramFilter('ether[12:2] = 0x9001')
+
+# To be removed
+class TBJSON(Packet):
+ """ TBJSON 'packet' layer. """
+ name = "TBJSON"
+ fields_desc = [StrField("data", default="")]
+
@implementer(IAdapterInterface)
class TibitOltAdapter(object):
@@ -54,6 +71,8 @@
config=AdapterConfig(log_level=LogLevel.INFO)
)
self.interface = registry('main').get_args().interface
+ self.io_port = None
+ self.incoming_queues = {} # mac_address -> DeferredQueue()
def start(self):
log.debug('starting', interface=self.interface)
@@ -61,6 +80,8 @@
def stop(self):
log.debug('stopping')
+ if self.io_port is not None:
+ registry('frameio').del_interface(self.interface)
log.info('stopped')
def adapter_descriptor(self):
@@ -77,8 +98,76 @@
def adopt_device(self, device):
log.info('adopt-device', device=device)
+ self._activate_io_port()
+ reactor.callLater(0, self._launch_device_activation, device)
return device
+ def _activate_io_port(self):
+ if self.io_port is None:
+ self.io_port = registry('frameio').add_interface(
+ self.interface, self._rcv_io, is_tibit_frame)
+
+ @inlineCallbacks
+ def _launch_device_activation(self, device):
+
+ try:
+ log.debug('launch-device-activation')
+ # prepare receive queue
+ self.incoming_queues[device.mac_address] = DeferredQueue(size=100)
+
+ # send out ping to OLT device
+ olt_mac = device.mac_address
+ ping_frame = self._make_ping_frame(mac_address=olt_mac)
+ self.io_port.send(ping_frame)
+
+ # wait till we receive a response
+ # TODO add timeout mechanism so we can signal if we cannot reach device
+ while True:
+ response = yield self.incoming_queues[olt_mac].get()
+ # verify the response; if it is not what we expect, keep waiting
+ if 1: # TODO check if it is really what we expect, and wait if not
+ break
+
+ except Exception, e:
+ log.exception('launch-device-failed', e=e)
+
+ # if we got a response, we can fill out the device info and mark the
+ # device as reachable
+
+ device.root = True
+ device.vendor = 'Tibit stuff'
+ device.model = 'n/a'
+ device.hardware_version = 'n/a'
+ device.firmware_version = 'n/a'
+ device.software_version = '1.0'
+ device.serial_number = 'add junk here'
+ device.connect_status = ConnectStatus.REACHABLE
+ self.adapter_agent.update_device(device)
+
+ def _rcv_io(self, port, frame):
+
+ log.info('frame-received')
+
+ # extract source mac
+ response = Ether(frame)
+
+ # enqueue incoming parsed frame to the right device
+ self.incoming_queues[response.src].put(response)
+
+ def _make_ping_frame(self, mac_address):
+ # TODO Nathan to make this to be an actual OLT ping
+ # Create a json packet
+ json_operation_str = '{\"operation\":\"version\"}'
+ frame = Ether()/TBJSON(data='json %s' % json_operation_str)
+ frame.type = int("9001", 16)
+ frame.dst = '00:0c:e2:31:25:00'
+ bind_layers(Ether, TBJSON, type=0x9001)
+ frame.show()
+ return str(frame)
+
def abandon_device(self, device):
raise NotImplementedError()
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index 5659f85..83b4e89 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -23,6 +23,7 @@
from twisted.internet.defer import inlineCallbacks, returnValue
from zope.interface import implementer
+from common.event_bus import EventBusClient
from voltha.adapters.interface import IAdapterAgent
from voltha.protos import third_party
from voltha.protos.device_pb2 import Device, Port
@@ -31,8 +32,6 @@
from voltha.registry import registry
-log = structlog.get_logger()
-
@implementer(IAdapterAgent)
class AdapterAgent(object):
"""
@@ -52,26 +51,33 @@
self.adapter = None
self.adapter_node_proxy = None
self.root_proxy = self.core.get_proxy('/')
+ self._rx_event_subscriptions = {}
+ self._tx_event_subscriptions = {}
+ self.event_bus = EventBusClient()
+ self.log = structlog.get_logger(adapter_name=adapter_name)
@inlineCallbacks
def start(self):
- log.debug('starting')
+ self.log.debug('starting')
config = self._get_adapter_config() # this may be None
- adapter = self.adapter_cls(self, config)
- yield adapter.start()
+ try:
+ adapter = self.adapter_cls(self, config)
+ yield adapter.start()
+ except Exception, e:
+ self.log.exception('failed-to-start-adapter', e=e)
self.adapter = adapter
self.adapter_node_proxy = self._update_adapter_node()
self._update_device_types()
- log.info('started')
+ self.log.info('started')
returnValue(self)
@inlineCallbacks
def stop(self):
- log.debug('stopping')
+ self.log.debug('stopping')
if self.adapter is not None:
yield self.adapter.stop()
self.adapter = None
- log.info('stopped')
+ self.log.info('stopped')
def _get_adapter_config(self):
"""
@@ -197,16 +203,51 @@
parent_device_id,
parent_port_no,
child_device_type,
- child_device_address_kw):
+ proxy_address,
+ **kw):
# we create new ONU device objects and insert them into the config
- # TODO should we auto-enable the freshly created device? Probably
+ # TODO should we auto-enable the freshly created device? Probably.
device = Device(
id=uuid4().hex[:12],
type=child_device_type,
parent_id=parent_device_id,
parent_port_no=parent_port_no,
admin_state=AdminState.ENABLED,
- **child_device_address_kw
+ proxy_address=proxy_address,
+ **kw
)
self._make_up_to_date(
'/devices', device.id, device)
+
+ topic = self._gen_tx_proxy_address_topic(proxy_address)
+ self._tx_event_subscriptions[topic] = self.event_bus.subscribe(
+ topic, lambda t, m: self._send_proxied_message(proxy_address, m))
+
+ def _gen_rx_proxy_address_topic(self, proxy_address):
+ """Generate unique topic name specific to this proxy address for rx"""
+ topic = 'rx:' + proxy_address.SerializeToString()
+ return topic
+
+ def _gen_tx_proxy_address_topic(self, proxy_address):
+ """Generate unique topic name specific to this proxy address for tx"""
+ topic = 'tx:' + proxy_address.SerializeToString()
+ return topic
+
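+ # Proxied messaging rides on the event bus: the child-device adapter
+ # subscribes to the rx:<proxy_address> topic and publishes to the
+ # tx:<proxy_address> topic; the parent adapter was subscribed to the
+ # tx topic when the child device was created (child_device_detected).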
+ def register_for_proxied_messages(self, proxy_address):
+ topic = self._gen_rx_proxy_address_topic(proxy_address)
+ self._rx_event_subscriptions[topic] = self.event_bus.subscribe(
+ topic, lambda t, m: self._receive_proxied_message(proxy_address, m))
+
+ def _receive_proxied_message(self, proxy_address, msg):
+ self.adapter.receive_proxied_message(proxy_address, msg)
+
+ def send_proxied_message(self, proxy_address, msg):
+ topic = self._gen_tx_proxy_address_topic(proxy_address)
+ self.event_bus.publish(topic, msg)
+
+ def _send_proxied_message(self, proxy_address, msg):
+ self.adapter.send_proxied_message(proxy_address, msg)
+
+ def receive_proxied_message(self, proxy_address, msg):
+ topic = self._gen_rx_proxy_address_topic(proxy_address)
+ self.event_bus.publish(topic, msg)
diff --git a/voltha/core/device_agent.py b/voltha/core/device_agent.py
index 17ec620..acd5bc8 100644
--- a/voltha/core/device_agent.py
+++ b/voltha/core/device_agent.py
@@ -26,8 +26,6 @@
from voltha.protos.common_pb2 import AdminState, OperStatus
from voltha.registry import registry
-log = structlog.get_logger()
-
class InvalidStateTransition(Exception): pass
@@ -61,23 +59,24 @@
'/device_types/{}'.format(initial_data.type)).get()
self.adapter_agent = None
+ self.log = structlog.get_logger(device_id=initial_data.id)
@inlineCallbacks
def start(self):
- log.debug('starting')
+ self.log.debug('starting')
self._set_adapter_agent()
yield self._process_update(self._tmp_initial_data)
del self._tmp_initial_data
- log.info('started')
+ self.log.info('started')
returnValue(self)
def stop(self):
- log.debug('stopping')
+ self.log.debug('stopping')
self.proxy.unregister_callback(
CallbackType.PRE_UPDATE, self._validate_update)
self.proxy.unregister_callback(
CallbackType.POST_UPDATE, self._process_update)
- log.info('stopped')
+ self.log.info('stopped')
def _set_adapter_agent(self):
adapter_name = self._tmp_initial_data.adapter
@@ -97,7 +96,7 @@
(by raising an exception), or even the augmentation of the incoming
data.
"""
- log.debug('device-pre-update', device=device)
+ self.log.debug('device-pre-update', device=device)
yield self._process_state_transitions(device, dry_run=True)
returnValue(device)
@@ -108,7 +107,7 @@
a transaction), and it is used to propagate the change down to the
adapter
"""
- log.debug('device-post-update', device=device)
+ self.log.debug('device-post-update', device=device)
# first, process any potential state transition
yield self._process_state_transitions(device)
@@ -135,7 +134,7 @@
@inlineCallbacks
def _activate_device(self, device, dry_run=False):
- log.info('activate-device', device=device, dry_run=dry_run)
+ self.log.info('activate-device', device=device, dry_run=dry_run)
if not dry_run:
device = yield self.adapter_agent.adopt_device(device)
device.oper_status = OperStatus.ACTIVATING
@@ -151,22 +150,22 @@
raise NotImplementedError()
def _propagate_change(self, device, dry_run=False):
- log.info('propagate-change', device=device, dry_run=dry_run)
+ self.log.info('propagate-change', device=device, dry_run=dry_run)
if device != self.last_data:
raise NotImplementedError()
else:
- log.debug('no-op')
+ self.log.debug('no-op')
def _abandon_device(self, device, dry_run=False):
- log.info('abandon-device', device=device, dry_run=dry_run)
+ self.log.info('abandon-device', device=device, dry_run=dry_run)
raise NotImplementedError()
def _disable_device(self, device, dry_run=False):
- log.info('disable-device', device=device, dry_run=dry_run)
+ self.log.info('disable-device', device=device, dry_run=dry_run)
raise NotImplementedError()
def _reenable_device(self, device, dry_run=False):
- log.info('reenable-device', device=device, dry_run=dry_run)
+ self.log.info('reenable-device', device=device, dry_run=dry_run)
raise NotImplementedError()
admin_state_fsm = {
@@ -194,7 +193,7 @@
@inlineCallbacks
def _flow_table_updated(self, flows):
- log.debug('flow-table-updated',
+ self.log.debug('flow-table-updated',
logical_device_id=self.last_data.id, flows=flows)
# if device accepts bulk flow update, lets just call that
@@ -216,7 +215,7 @@
@inlineCallbacks
def _group_table_updated(self, groups):
- log.debug('group-table-updated',
+ self.log.debug('group-table-updated',
logical_device_id=self.last_data.id,
flow_groups=groups)
diff --git a/voltha/core/logical_device_agent.py b/voltha/core/logical_device_agent.py
index 5b629ca..41badb4 100644
--- a/voltha/core/logical_device_agent.py
+++ b/voltha/core/logical_device_agent.py
@@ -33,7 +33,6 @@
from voltha.protos.openflow_13_pb2 import Flows, FlowGroups
from voltha.registry import registry
-log = structlog.get_logger()
_ = third_party
def mac_str_to_tuple(mac):
@@ -64,13 +63,15 @@
self.self_proxy.register_callback(
CallbackType.POST_REMOVE, self._port_list_updated)
+ self.log = structlog.get_logger(logical_device_id=logical_device.id)
+
def start(self):
- log.debug('starting')
- log.info('started')
+ self.log.debug('starting')
+ self.log.info('started')
return self
def stop(self):
- log.debug('stopping')
+ self.log.debug('stopping')
self.flows_proxy.unregister_callback(
CallbackType.POST_UPDATE, self._flow_table_updated)
self.groups_proxy.unregister_callback(
@@ -79,7 +80,7 @@
CallbackType.POST_ADD, self._port_list_updated)
self.self_proxy.unregister_callback(
CallbackType.POST_REMOVE, self._port_list_updated)
- log.info('stopped')
+ self.log.info('stopped')
def announce_flows_deleted(self, flows):
for f in flows:
@@ -118,7 +119,7 @@
self.flow_modify_strict(flow_mod)
else:
- log.warn('unhandled-flow-mod', command=command, flow_mod=flow_mod)
+ self.log.warn('unhandled-flow-mod', command=command, flow_mod=flow_mod)
# def list_flows(self):
# return self.flows
@@ -137,7 +138,7 @@
self.group_modify(group_mod)
else:
- log.warn('unhandled-group-mod', command=command,
+ self.log.warn('unhandled-group-mod', command=command,
group_mod=group_mod)
def list_groups(self):
@@ -163,7 +164,7 @@
flow = flow_stats_entry_from_flow_mod_message(mod)
flows.append(flow)
changed = True
- log.debug('flow-added', flow=mod)
+ self.log.debug('flow-added', flow=mod)
else:
flow = flow_stats_entry_from_flow_mod_message(mod)
@@ -175,12 +176,12 @@
flow.packet_count = old_flow.packet_count
flows[idx] = flow
changed = True
- log.debug('flow-updated', flow=flow)
+ self.log.debug('flow-updated', flow=flow)
else:
flows.append(flow)
changed = True
- log.debug('flow-added', flow=mod)
+ self.log.debug('flow-added', flow=mod)
# write back to model
if changed:
@@ -225,7 +226,7 @@
changed = True
else:
# TODO need to check what to do with this case
- log.warn('flow-cannot-delete', flow=flow)
+ self.log.warn('flow-cannot-delete', flow=flow)
if changed:
self.flows_proxy.update('/', Flows(items=flows))
@@ -403,7 +404,7 @@
# signal controller as requested by flow's flag
groups = OrderedDict()
groups_changed = True
- log.debug('all-groups-deleted')
+ self.log.debug('all-groups-deleted')
else:
if group_id not in groups:
@@ -415,7 +416,7 @@
flows_changed, flows = self.flows_delete_by_group_id(flows, group_id)
del groups[group_id]
groups_changed = True
- log.debug('group-deleted', group_id=group_id)
+ self.log.debug('group-deleted', group_id=group_id)
if groups_changed:
self.groups_proxy.update('/', FlowGroups(items=groups.values()))
@@ -444,7 +445,7 @@
## <=============== PACKET_OUT ===========================================>
def packet_out(self, ofp_packet_out):
- log.debug('packet-out', packet=ofp_packet_out)
+ self.log.debug('packet-out', packet=ofp_packet_out)
print threading.current_thread().name
print 'PACKET_OUT:', ofp_packet_out
# TODO for debug purposes, lets turn this around and send it back
@@ -465,7 +466,7 @@
## <======================== FLOW TABLE UPDATE HANDLING ===================
def _flow_table_updated(self, flows):
- log.debug('flow-table-updated',
+ self.log.debug('flow-table-updated',
logical_device_id=self.logical_device_id, flows=flows)
# TODO we have to evolve this into a policy-based, event based pattern
@@ -484,7 +485,7 @@
## <======================= GROUP TABLE UPDATE HANDLING ===================
def _group_table_updated(self, flow_groups):
- log.debug('group-table-updated',
+ self.log.debug('group-table-updated',
logical_device_id=self.logical_device_id,
flow_groups=flow_groups)
diff --git a/voltha/main.py b/voltha/main.py
index 44cd416..18ed7ad 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -22,6 +22,7 @@
import time
import yaml
+from simplejson import dumps
from twisted.internet.defer import inlineCallbacks
from zope.interface import implementer
@@ -37,6 +38,8 @@
from voltha.northbound.rest.health_check import init_rest_service
from voltha.protos.common_pb2 import LogLevel
from voltha.registry import registry, IComponent
+from common.frameio.frameio import FrameIOManager
+
VERSION = '0.9.0'
@@ -229,7 +232,7 @@
if not args.no_heartbeat:
self.start_heartbeat()
- self.start_kafka_heartbeat()
+ self.start_kafka_heartbeat(args.instance_id)
def start(self):
self.start_reactor() # will not return except Keyboard interrupt
@@ -285,6 +288,11 @@
).start()
yield registry.register(
+ 'frameio',
+ FrameIOManager()
+ ).start()
+
+ yield registry.register(
'adapter_loader',
AdapterLoader(config=self.config.get('adapter_loader', {}))
).start()
@@ -323,20 +331,26 @@
# Temporary function to send a heartbeat message to the external kafka
# broker
- def start_kafka_heartbeat(self):
+ def start_kafka_heartbeat(self, instance_id):
- # For heartbeat we will send a message to a specific "voltha-heartbeat"
- # topic. The message is a protocol buf
- # message
+ # For heartbeat we publish a small JSON message to a dedicated
+ # heartbeat topic
- message = 'Heartbeat message:{}'.format(get_my_primary_local_ipv4())
- topic = "voltha-heartbeat"
+ message = dumps(dict(
+ type='heartbeat',
+ voltha_instance=instance_id,
+ ip=get_my_primary_local_ipv4()
+ ))
+ topic = "heartbeat.voltha"
from twisted.internet.task import LoopingCall
kafka_proxy = get_kafka_proxy()
if kafka_proxy:
lc = LoopingCall(kafka_proxy.send_message, topic, message)
lc.start(10)
else:
self.log.error('Kafka proxy has not been created!')
+
if __name__ == '__main__':
Main().start()
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
index a710400..341f9b1 100644
--- a/voltha/northbound/kafka/kafka_proxy.py
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -100,24 +100,22 @@
# first check whether we have a kafka producer. If there is none
# then try to get one - this happens only when we try to lookup the
# kafka service from consul
- if self.kproducer is None:
- self._get_kafka_producer()
- # Lets the next message request do the retry if still a failure
- if self.kproducer is None:
- log.error('No kafka producer available at {}'.format(
- self.kafka_endpoint))
- return
-
- log.info('Sending message {} to kafka topic {}'.format(msg,
- topic))
try:
- msg_list = [msg]
- yield self.kproducer.send_messages(topic, msgs=msg_list)
- log.info('Successfully sent message {} to kafka topic '
- '{}'.format(msg, topic))
- except Exception as e:
- log.error('Failure to send message {} to kafka topic {}: '
- '{}'.format(msg, topic, repr(e)))
+ if self.kproducer is None:
+ self._get_kafka_producer()
+ # Lets the next message request do the retry if still a failure
+ if self.kproducer is None:
+ log.error('no-kafka-producer', endpoint=self.kafka_endpoint)
+ return
+
+ log.debug('sending-kafka-msg', topic=topic, msg=msg)
+ msgs = [msg]
+ yield self.kproducer.send_messages(topic, msgs=msgs)
+ log.debug('sent-kafka-msg', topic=topic, msg=msg)
+
+ except Exception, e:
+ log.error('failed-to-send-kafka-msg', topic=topic, msg=msg, e=e)
+
# set the kafka producer to None. This is needed if the
# kafka docker went down and comes back up with a different
# port number.
diff --git a/voltha/protos/device.proto b/voltha/protos/device.proto
index 0a1e9b2..b3b1711 100644
--- a/voltha/protos/device.proto
+++ b/voltha/protos/device.proto
@@ -110,7 +110,7 @@
// ("xxxx:xxxx:xxxx:xxxx:xxxx:xxxx:xxxx:xxxx")
string ipv6_address = 15;
- ProxyAddress proxy_device = 19;
+ ProxyAddress proxy_address = 19;
};
AdminState.AdminState admin_state = 16;
diff --git a/voltha/protos/meta.proto b/voltha/protos/meta.proto
index d16668a..b78c9e3 100644
--- a/voltha/protos/meta.proto
+++ b/voltha/protos/meta.proto
@@ -26,8 +26,17 @@
}
enum Access {
- CONFIG = 0; // read-write, stored
- READ_ONLY = 1; // read-only field
+
+ // read-write, stored attribute
+ CONFIG = 0;
+
+ // read-only field, stored with the model, covered by its hash
+ READ_ONLY = 1;
+
+ // A read-only attribute that is not stored in the model and not covered
+ // by its hash; its value is filled in real time upon each request.
+ REAL_TIME = 2;
+
}
extend google.protobuf.FieldOptions {