Improved FrameIO support and proxy messaging

Specific changes:
- FrameIO support for Mac OS X (making testing easier)
- Message passing between root and child devices implemented
  (example use in simulated_olt and simulated_onu adapters
- Making FrameIOMgr accessible via registry so that modules
  can easily reach it
- Making "main" to be a registered component so that command
  line args and config file based info is accessible to all.
- Minor clean-ups and improvements

Change-Id: I6812dd5b198fef5cb19f17fc8d7948d3fba8b625
diff --git a/common/frameio/frameio.py b/common/frameio/frameio.py
index daa7deb..0a222ad 100644
--- a/common/frameio/frameio.py
+++ b/common/frameio/frameio.py
@@ -22,8 +22,10 @@
 directly supported) we need to run the receiver select loop on a dedicated
 thread.
 """
+
 import os
 import socket
+import struct
 from pcapy import BPFProgram
 from threading import Thread, Condition
 
@@ -31,10 +33,18 @@
 
 import select
 import structlog
+import sys
 
+from scapy.data import ETH_P_ALL
 from twisted.internet import reactor
+from zope.interface import implementer
 
-from common.frameio.third_party.oftest import afpacket, netutils
+from voltha.registry import IComponent
+
+if sys.platform.startswith('linux'):
+    from common.frameio.third_party.oftest import afpacket, netutils
+elif sys.platform == 'darwin':
+    from scapy.arch import pcapdnet, BIOCIMMEDIATE
 
 log = structlog.get_logger()
 
@@ -107,22 +117,22 @@
         self.iface_name = iface_name
         self.callback = callback
         self.filter = filter
-        self.socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
-        afpacket.enable_auxdata(self.socket)
-        self.socket.bind((self.iface_name, self.ETH_P_ALL))
-        netutils.set_promisc(self.socket, self.iface_name)
-        self.socket.settimeout(self.RCV_TIMEOUT)
+        self.socket = self.open_socket(self.iface_name)
         log.debug('socket-opened', fn=self.fileno(), iface=iface_name)
-
         self.received = 0
         self.discarded = 0
 
+    def open_sockets(self, iface_name):
+        raise NotImplementedError('to be implemented by derived class')
+
+    def rcv_frame(self):
+        raise NotImplementedError('to be implemented by derived class')
+
     def __del__(self):
         if self.socket:
-            fn = self.fileno()
             self.socket.close()
             self.socket = None
-            log.debug('socket-closed', fn=fn, iface=self.iface_name)
+        log.debug('socket-closed', iface=self.iface_name)
 
     def fileno(self):
         return self.socket.fileno()
@@ -134,11 +144,11 @@
     def recv(self):
         """Called on the select thread when a packet arrives"""
         try:
-            frame = afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
+            frame = self.rcv_frame()
         except RuntimeError, e:
             # we observed this happens sometimes right after the socket was
             # attached to a newly created veth interface. So we log it, but
-            # allow to continue
+            # allow to continue.
             log.warn('afpacket-recv-error', code=-1)
             return
 
@@ -165,13 +175,55 @@
         return self
 
     def down(self):
-        os.system('ip link set {] down'.format(self.iface_name))
+        os.system('ip link set {} down'.format(self.iface_name))
         return self
 
     def statistics(self):
         return self.received, self.discarded
 
 
+class LinuxFrameIOPort(FrameIOPort):
+
+    def open_socket(self, iface_name):
+        s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
+        afpacket.enable_auxdata(s)
+        s.bind((self.iface_name, self.ETH_P_ALL))
+        netutils.set_promisc(s, iface_name)
+        s.settimeout(self.RCV_TIMEOUT)
+        return s
+
+    def rcv_frame(self):
+        return afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
+
+
+class DarwinFrameIOPort(FrameIOPort):
+
+    def open_socket(self, iface_name):
+        s = pcapdnet.open_pcap(iface_name, 1600, 1, 100)
+        try:
+            fcntl.ioctl(s.fileno(), BIOCIMMEDIATE, struct.pack("I",1))
+        except:
+            pass
+
+        return s
+
+    def rcv_frame(self):
+        pkt = self.socket.next()
+        if pkt is not None:
+            ts, pkt = pkt
+        return pkt
+
+
+if sys.platform == 'darwin':
+    _FrameIOPort = DarwinFrameIOPort
+elif sys.platform.startswith('linux'):
+    _FrameIOPort = LinuxFrameIOPort
+else:
+    raise Exception('Unsupported platform {}'.format(sys.platform))
+    sys.exit(1)
+
+
+@implementer(IComponent)
 class FrameIOManager(Thread):
     """
     Packet/Frame IO manager that can be used to send/receive raw frames
@@ -188,7 +240,7 @@
         self.stopped = False
         self.ports_changed = False
 
-    #~~~~~~~~~~~ exposed methods callable from main thread ~~~~~~~~~~~~~~~~~~~~~
+    # ~~~~~~~~~~~ exposed methods callable from main thread ~~~~~~~~~~~~~~~~~~~
 
     def start(self):
         """
@@ -233,7 +285,7 @@
         """
         """Add a new interface"""
         assert iface_name not in self.ports
-        port = FrameIOPort(iface_name, callback, filter)
+        port = _FrameIOPort(iface_name, callback, filter)
         self.ports[iface_name] = port
         # need to exit select loop to reconstruct select fd lists
         self.ports_changed = True
@@ -248,7 +300,6 @@
         """
         assert iface_name in self.ports
         port = self.ports[iface_name]
-        port.stop()
         del self.ports[iface_name]
         # need to exit select loop to reconstruct select fd lists
         self.ports_changed = True
@@ -263,7 +314,7 @@
         """
         return self.ports[iface_name].send(frame)
 
-    #~~~~~~~~~~~~~~ Thread methods (running on non-main thread ~~~~~~~~~~~~~~~~
+    # ~~~~~~~~~~~~~ Thread methods (running on non-main thread ~~~~~~~~~~~~~~~~
 
     def run(self):
         """
diff --git a/env.sh b/env.sh
index 9efac4d..d1d023b 100644
--- a/env.sh
+++ b/env.sh
@@ -13,7 +13,7 @@
 . $VENVDIR/bin/activate
 
 # add top-level voltha dir to pythonpath
-export PYTHONPATH=$VENVDIR/lib/python2.7/site-packages:$PYTHONPATH:$VOLTHA_BASE:$VOLTHA_BASE/voltha/protos/third_party
+export PYTHONPATH=$VOLTHA_BASE/$VENVDIR/lib/python2.7/site-packages:$PYTHONPATH:$VOLTHA_BASE:$VOLTHA_BASE/voltha/protos/third_party
 
 # assign DOCKER_HOST_IP to be the main ip address of this host
 export DOCKER_HOST_IP=$(python common/utils/nethelpers.py)
diff --git a/tests/itests/run_as_root/test_frameio.py b/tests/itests/run_as_root/test_frameio.py
index b3863d9..057fa28 100644
--- a/tests/itests/run_as_root/test_frameio.py
+++ b/tests/itests/run_as_root/test_frameio.py
@@ -20,7 +20,7 @@
 
 docker run -ti --rm -v $(pwd):/voltha  --privileged cord/voltha-base \
     env PYTHONPATH=/voltha python \
-    /voltha/tests/itests/frameio_tests/run_as_root/test_frameio.py
+    /voltha/tests/itests/run_as_root/test_frameio.py
 
 """
 
@@ -88,7 +88,6 @@
         self.assertEqual(port, p1)
         self.assertEqual(frame, bogus_frame)
 
-
     @inlineCallbacks
     def test_packet_send_receive_with_filter(self):
         rcvd = DeferredWithTimeout()
@@ -108,7 +107,6 @@
         self.assertEqual(port, p1)
         self.assertEqual(frame, ip_packet)
 
-
     @inlineCallbacks
     def test_packet_send_drop_with_filter(self):
         rcvd = DeferredWithTimeout()
@@ -128,7 +126,6 @@
         else:
             self.fail('not timed out')
 
-
     @inlineCallbacks
     def test_concurrent_packet_send_receive(self):
 
diff --git a/voltha/adapters/interface.py b/voltha/adapters/interface.py
index f9426c6..bb40eb0 100644
--- a/voltha/adapters/interface.py
+++ b/voltha/adapters/interface.py
@@ -179,7 +179,8 @@
     def child_device_detected(parent_device_id,
                               parent_port_no,
                               child_device_type,
-                              child_device_address_kw):
+                              proxy_address,
+                              **kw):
         # TODO add doc
         """"""
 
diff --git a/voltha/adapters/loader.py b/voltha/adapters/loader.py
index dca8afa..4a26dff 100644
--- a/voltha/adapters/loader.py
+++ b/voltha/adapters/loader.py
@@ -69,21 +69,24 @@
 
     def _find_adapters(self):
         subdirs = os.walk(mydir).next()[1]
-        for subdir in subdirs:
-            adapter_name = subdir
-            py_file = os.path.join(mydir, subdir, subdir + '.py')
-            if os.path.isfile(py_file):
-                try:
-                    package_name = __package__ + '.' + subdir
-                    pkg = __import__(package_name, None, None, [adapter_name])
-                    module = getattr(pkg, adapter_name)
-                except ImportError, e:
-                    log.exception('cannot-load', file=py_file, e=e)
-                    continue
+        try:
+            for subdir in subdirs:
+                adapter_name = subdir
+                py_file = os.path.join(mydir, subdir, subdir + '.py')
+                if os.path.isfile(py_file):
+                    try:
+                        package_name = __package__ + '.' + subdir
+                        pkg = __import__(package_name, None, None, [adapter_name])
+                        module = getattr(pkg, adapter_name)
+                    except ImportError, e:
+                        log.exception('cannot-load', file=py_file, e=e)
+                        continue
 
-                for attr_name in dir(module):
-                    cls = getattr(module, attr_name)
-                    if isinstance(cls, type) and \
-                            IAdapterInterface.implementedBy(cls):
-                        verifyClass(IAdapterInterface, cls)
-                        yield adapter_name, cls
+                    for attr_name in dir(module):
+                        cls = getattr(module, attr_name)
+                        if isinstance(cls, type) and \
+                                IAdapterInterface.implementedBy(cls):
+                            verifyClass(IAdapterInterface, cls)
+                            yield adapter_name, cls
+        except Exception, e:
+            log.exception('failed', e=e)
diff --git a/voltha/adapters/simulated_olt/simulated_olt.py b/voltha/adapters/simulated_olt/simulated_olt.py
index c6a6e11..f693a73 100644
--- a/voltha/adapters/simulated_olt/simulated_olt.py
+++ b/voltha/adapters/simulated_olt/simulated_olt.py
@@ -321,13 +321,11 @@
                 parent_device_id=device.id,
                 parent_port_no=1,
                 child_device_type='simulated_onu',
-                child_device_address_kw=dict(
-                    proxy_device=Device.ProxyAddress(
-                        device_id=device.id,
-                        channel_id=vlan_id
-                    ),
-                    vlan=100 + i
-                )
+                proxy_address=Device.ProxyAddress(
+                    device_id=device.id,
+                    channel_id=vlan_id
+                ),
+                vlan=100 + i
             )
 
     def _olt_side_onu_activation(self, seq):
@@ -349,7 +347,14 @@
         raise NotImplementedError()
 
     def send_proxied_message(self, proxy_address, msg):
-        raise NotImplementedError()
+        log.info('send-proxied-message', proxy_address=proxy_address, msg=msg)
+        # we mimick a response by sending the same message back in a short time
+        reactor.callLater(
+            0.2,
+            self.adapter_agent.receive_proxied_message,
+            proxy_address,
+            msg
+        )
 
     def receive_proxied_message(self, proxy_address, msg):
         raise NotImplementedError()
diff --git a/voltha/adapters/simulated_onu/simulated_onu.py b/voltha/adapters/simulated_onu/simulated_onu.py
index 38cb2c5..73a2f8f 100644
--- a/voltha/adapters/simulated_onu/simulated_onu.py
+++ b/voltha/adapters/simulated_onu/simulated_onu.py
@@ -21,7 +21,7 @@
 
 import structlog
 from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import inlineCallbacks, DeferredQueue
 from zope.interface import implementer
 
 from common.utils.asleep import asleep
@@ -62,6 +62,7 @@
             version='0.1',
             config=AdapterConfig(log_level=LogLevel.INFO)
         )
+        self.incoming_messages = DeferredQueue()
 
     def start(self):
         log.debug('starting')
@@ -96,10 +97,11 @@
 
     @inlineCallbacks
     def _simulate_device_activation(self, device):
+
         # first we verify that we got parent reference and proxy info
         assert device.parent_id
-        assert device.proxy_device.device_id
-        assert device.proxy_device.channel_id
+        assert device.proxy_address.device_id
+        assert device.proxy_address.channel_id
 
         # we pretend that we were able to contact the device and obtain
         # additional information about it
@@ -150,7 +152,7 @@
         # and name for the virtual ports, as this is guaranteed to be unique
         # in the context of the OLT port, so it is also unique in the context
         # of the logical device
-        port_no = device.proxy_device.channel_id
+        port_no = device.proxy_address.channel_id
         cap = OFPPF_1GB_FD | OFPPF_FIBER
         self.adapter_agent.add_logical_port(logical_device_id, LogicalPort(
             id=str(port_no),
@@ -170,7 +172,10 @@
             device_port_no=uni_port.port_no
         ))
 
-        # and finally update to active
+        # simulate a proxied message sending and receving a reply
+        reply = yield self._simulate_message_exchange(device)
+
+        # and finally update to "ACTIVE"
         device = self.adapter_agent.get_device(device.id)
         device.oper_status = OperStatus.ACTIVE
         self.adapter_agent.update_device(device)
@@ -186,4 +191,27 @@
         raise NotImplementedError()
 
     def receive_proxied_message(self, proxy_address, msg):
-        raise NotImplementedError()
+        # just place incoming message to a list
+        self.incoming_messages.put((proxy_address, msg))
+
+    @inlineCallbacks
+    def _simulate_message_exchange(self, device):
+
+        # register for receiving async messages
+        self.adapter_agent.register_for_proxied_messages(device.proxy_address)
+
+        # reset incoming message queue
+        while self.incoming_messages.pending:
+            _ = yield self.incoming_messages.get()
+
+        # construct message
+        msg = 'test message'
+
+        # send message
+        self.adapter_agent.send_proxied_message(device.proxy_address, msg)
+
+        # wait till we detect incoming message
+        yield self.incoming_messages.get()
+
+        # by returning we allow the device to be shown as active, which
+        # indirectly verified that message passing works
diff --git a/voltha/adapters/tibit_olt/tb_json_version.py b/voltha/adapters/tibit_olt/tb_json_version.py
new file mode 100644
index 0000000..4b46619
--- /dev/null
+++ b/voltha/adapters/tibit_olt/tb_json_version.py
@@ -0,0 +1,107 @@
+#! /usr/bin/env python
+""" JSON layer for scapy """
+
+# Set log level to benefit from Scapy warnings
+import logging
+import json
+import argparse
+logging.getLogger("scapy").setLevel(1)
+
+from scapy.packet import Packet, bind_layers
+from scapy.fields import StrField
+from scapy.layers.l2 import Ether
+from scapy.sendrecv import sendp
+from scapy.sendrecv import srp1
+# from scapy.main import interact
+
+from uuid import getnode as get_srcmac
+
+
+class TBJSON(Packet):
+    """ TBJSON 'packet' layer. """
+    name = "TBJSON"
+    fields_desc = [StrField("data", default="")]
+
+
+def tb_json_packet_from_dict(json_operation_dict, dst_macid):
+    """ Given an command matrix operation dictionary, return a packet """
+    json_op_string = json.dumps(json_operation_dict, dst_macid)
+    return tb_json_packet_from_str(json_op_string, dst_macid)
+
+
+def tb_json_packet_from_str(json_operation_str, dst_macid):
+    """ Given an command matrix operation as json string, return a packet """
+    base_packet = Ether()/TBJSON(data='json %s' % json_operation_str)
+    base_packet.type = int("9001", 16)
+    mac = '%012x' % get_srcmac()
+    base_packet.src = ':'.join(s.encode('hex') for s in mac.decode('hex'))
+    base_packet.dst = dst_macid
+    bind_layers(Ether, TBJSON, type=0x9001)
+    return base_packet
+
+
+def tb_macid_to_scapy(macid):
+    """ convert a tibit macid (xxxxxxxxxxxx) to scapy (xx:xx:xx:xx:xx:xx) """
+    if len(macid) != 12:
+        print('tb_macid_to_scapy: unexpected macid length (%s)' % macid)
+        return '00:00:00:00:00:00'
+    new_macid = ''
+    for i in [0, 2, 4, 6, 8]:
+        new_macid += macid[i:i+2] + ':'
+    new_macid += macid[10:12]
+    return new_macid
+
+
+def scapy_to_tb_macid(macid):
+    """ convert a scapy macid (xx:xx:xx:xx:xx:xx) to tibit (xxxxxxxxxxxx) """
+    if len(macid) != 17:
+        print('tb_macid_to_scapy: unexpected macid length (%s)' % macid)
+        return '000000000000'
+    new_macid = ''
+    for i in [0, 3, 6, 9, 12, 15]:
+        new_macid += macid[i:i+2]
+    return new_macid
+
+
+def tb_json_packet(json_operation_dict):
+    """ Given an command matrix operation dictionary, return a packet """
+    json_op_string = json.dumps(json_operation_dict)
+    base_packet = Ether()/TBJSON(data='json %s' % json_op_string)
+
+    base_packet.type = int("9001", 16)
+    mac = '%012x' % get_srcmac()
+    base_packet.src = ':'.join(s.encode('hex') for s in mac.decode('hex'))
+    base_packet.dst = args.dstAddress
+
+    bind_layers(Ether, TBJSON, type=0x9001)
+
+    return base_packet
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--dst', dest='dstAddress', action='store',
+                        help='MAC address to use as destination.')
+
+    args = parser.parse_args()
+
+    if (args.dstAddress == None):
+        args.dstAddress = '00:0c:e2:31:10:00'
+
+    # Create a json packet
+    PACKET = tb_json_packet_from_dict({"operation":"version"}, args.dstAddress)
+
+    # Send the packet
+    PACKET.show()
+    p = srp1(PACKET, iface="eth0")
+    if p:
+        print "============================================================================="
+        p.show()
+
+    print "============================================================================="
+    print "Stripping off the \"json\" and quotes yields...\n%s" % p.data[5:]
+    print "============================================================================="
+    print "Load the JSON..."
+    print json.loads(p.data[5:])
+    print "============================================================================="
+
+    # interact(mydict=globals(), mybanner="===( TBJSON MODE )===")
diff --git a/voltha/adapters/tibit_olt/tibit_olt.py b/voltha/adapters/tibit_olt/tibit_olt.py
index 1cb25c8..a1f9417 100644
--- a/voltha/adapters/tibit_olt/tibit_olt.py
+++ b/voltha/adapters/tibit_olt/tibit_olt.py
@@ -17,20 +17,37 @@
 """
 Tibit OLT device adapter
 """
-
+import scapy
 import structlog
+from scapy.layers.inet import ICMP, IP
+from scapy.layers.l2 import Ether
+from twisted.internet.defer import DeferredQueue, inlineCallbacks
+from twisted.internet import reactor
+
 from zope.interface import implementer
 
+from common.frameio.frameio import BpfProgramFilter
 from voltha.registry import registry
 from voltha.adapters.interface import IAdapterInterface
 from voltha.protos.adapter_pb2 import Adapter, AdapterConfig
 from voltha.protos.device_pb2 import DeviceType, DeviceTypes
 from voltha.protos.health_pb2 import HealthStatus
-from voltha.protos.common_pb2 import LogLevel
+from voltha.protos.common_pb2 import LogLevel, ConnectStatus
+
+from scapy.packet import Packet, bind_layers
+from scapy.fields import StrField
 
 log = structlog.get_logger()
 
 
+is_tibit_frame = BpfProgramFilter('ether[12:2] = 0x9001')
+
+# To be removed
+class TBJSON(Packet):
+    """ TBJSON 'packet' layer. """
+    name = "TBJSON"
+    fields_desc = [StrField("data", default="")]
+
 @implementer(IAdapterInterface)
 class TibitOltAdapter(object):
 
@@ -54,6 +71,8 @@
             config=AdapterConfig(log_level=LogLevel.INFO)
         )
         self.interface = registry('main').get_args().interface
+        self.io_port = None
+        self.incoming_queues = {}  # mac_address -> DeferredQueue()
 
     def start(self):
         log.debug('starting', interface=self.interface)
@@ -61,6 +80,8 @@
 
     def stop(self):
         log.debug('stopping')
+        if self.io_port is not None:
+            registry('frameio').del_interface(self.interface)
         log.info('stopped')
 
     def adapter_descriptor(self):
@@ -77,8 +98,76 @@
 
     def adopt_device(self, device):
         log.info('adopt-device', device=device)
+        self._activate_io_port()
+        reactor.callLater(0, self._launch_device_activation, device)
         return device
 
+    def _activate_io_port(self):
+        if self.io_port is None:
+            self.io_port = registry('frameio').add_interface(
+                self.interface, self._rcv_io, is_tibit_frame)
+
+    @inlineCallbacks
+    def _launch_device_activation(self, device):
+
+
+        try:
+            log.debug('launch_dev_activation')
+            # prepare receive queue
+            self.incoming_queues[device.mac_address] = DeferredQueue(size=100)
+
+            # send out ping to OLT device
+            olt_mac = device.mac_address
+            ping_frame = self._make_ping_frame(mac_address=olt_mac)
+            self.io_port.send(ping_frame)
+
+            # wait till we receive a response
+            # TODO add timeout mechanism so we can signal if we cannot reach device
+            while True:
+                response = yield self.incoming_queues[olt_mac].get()
+                # verify response and if not the expected response
+                if 1: # TODO check if it is really what we expect, and wait if not
+                    break
+
+        except Exception, e:
+            log.exception('launch device failed', e=e)
+
+        # if we got response, we can fill out the device info, mark the device
+        # reachable
+        import pdb
+        pdb.set_trace()
+
+        device.root = True
+        device.vendor = 'Tibit stuff'
+        device.model = 'n/a'
+        device.hardware_version = 'n/a'
+        device.firmware_version = 'n/a'
+        device.software_version = '1.0'
+        device.serial_number = 'add junk here'
+        device.connect_status = ConnectStatus.REACHABLE
+        self.adapter_agent.update_device(device)
+
+    def _rcv_io(self, port, frame):
+
+        log.info('frame-recieved')
+
+        # extract source mac
+        response = Ether(frame)
+
+        # enqueue incoming parsed frame to rigth device
+        self.incoming_queues[response.src].put(response)
+
+    def _make_ping_frame(self, mac_address):
+        # TODO Nathan to make this to be an actual OLT ping
+        # Create a json packet
+        json_operation_str = '{\"operation\":\"version\"}'
+        frame = Ether()/TBJSON(data='json %s' % json_operation_str)
+        frame.type = int("9001", 16)
+        frame.dst = '00:0c:e2:31:25:00'
+        bind_layers(Ether, TBJSON, type=0x9001)
+        frame.show()
+        return str(frame)
+
     def abandon_device(self, device):
         raise NotImplementedError(0
                                   )
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index 5659f85..83b4e89 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -23,6 +23,7 @@
 from twisted.internet.defer import inlineCallbacks, returnValue
 from zope.interface import implementer
 
+from common.event_bus import EventBusClient
 from voltha.adapters.interface import IAdapterAgent
 from voltha.protos import third_party
 from voltha.protos.device_pb2 import Device, Port
@@ -31,8 +32,6 @@
 from voltha.registry import registry
 
 
-log = structlog.get_logger()
-
 @implementer(IAdapterAgent)
 class AdapterAgent(object):
     """
@@ -52,26 +51,33 @@
         self.adapter = None
         self.adapter_node_proxy = None
         self.root_proxy = self.core.get_proxy('/')
+        self._rx_event_subscriptions = {}
+        self._tx_event_subscriptions = {}
+        self.event_bus = EventBusClient()
+        self.log = structlog.get_logger(adapter_name=adapter_name)
 
     @inlineCallbacks
     def start(self):
-        log.debug('starting')
+        self.log.debug('starting')
         config = self._get_adapter_config()  # this may be None
-        adapter = self.adapter_cls(self, config)
-        yield adapter.start()
+        try:
+            adapter = self.adapter_cls(self, config)
+            yield adapter.start()
+        except Exception, e:
+            self.log.exception(e)
         self.adapter = adapter
         self.adapter_node_proxy = self._update_adapter_node()
         self._update_device_types()
-        log.info('started')
+        self.log.info('started')
         returnValue(self)
 
     @inlineCallbacks
     def stop(self):
-        log.debug('stopping')
+        self.log.debug('stopping')
         if self.adapter is not None:
             yield self.adapter.stop()
             self.adapter = None
-        log.info('stopped')
+        self.log.info('stopped')
 
     def _get_adapter_config(self):
         """
@@ -197,16 +203,51 @@
                               parent_device_id,
                               parent_port_no,
                               child_device_type,
-                              child_device_address_kw):
+                              proxy_address,
+                              **kw):
         # we create new ONU device objects and insert them into the config
-        # TODO should we auto-enable the freshly created device? Probably
+        # TODO should we auto-enable the freshly created device? Probably.
         device = Device(
             id=uuid4().hex[:12],
             type=child_device_type,
             parent_id=parent_device_id,
             parent_port_no=parent_port_no,
             admin_state=AdminState.ENABLED,
-            **child_device_address_kw
+            proxy_address=proxy_address,
+            **kw
         )
         self._make_up_to_date(
             '/devices', device.id, device)
+
+        topic = self._gen_tx_proxy_address_topic(proxy_address)
+        self._tx_event_subscriptions[topic] = self.event_bus.subscribe(
+            topic, lambda t, m: self._send_proxied_message(proxy_address, m))
+
+    def _gen_rx_proxy_address_topic(self, proxy_address):
+        """Generate unique topic name specific to this proxy address for rx"""
+        topic = 'rx:' + proxy_address.SerializeToString()
+        return topic
+
+    def _gen_tx_proxy_address_topic(self, proxy_address):
+        """Generate unique topic name specific to this proxy address for tx"""
+        topic = 'tx:' + proxy_address.SerializeToString()
+        return topic
+
+    def register_for_proxied_messages(self, proxy_address):
+        topic = self._gen_rx_proxy_address_topic(proxy_address)
+        self._rx_event_subscriptions[topic] = self.event_bus.subscribe(
+            topic, lambda t, m: self._receive_proxied_message(proxy_address, m))
+
+    def _receive_proxied_message(self, proxy_address, msg):
+        self.adapter.receive_proxied_message(proxy_address, msg)
+
+    def send_proxied_message(self, proxy_address, msg):
+        topic = self._gen_tx_proxy_address_topic(proxy_address)
+        self.event_bus.publish(topic, msg)
+
+    def _send_proxied_message(self, proxy_address, msg):
+        self.adapter.send_proxied_message(proxy_address, msg)
+
+    def receive_proxied_message(self, proxy_address, msg):
+        topic = self._gen_rx_proxy_address_topic(proxy_address)
+        self.event_bus.publish(topic, msg)
diff --git a/voltha/core/device_agent.py b/voltha/core/device_agent.py
index 17ec620..acd5bc8 100644
--- a/voltha/core/device_agent.py
+++ b/voltha/core/device_agent.py
@@ -26,8 +26,6 @@
 from voltha.protos.common_pb2 import AdminState, OperStatus
 from voltha.registry import registry
 
-log = structlog.get_logger()
-
 
 class InvalidStateTransition(Exception): pass
 
@@ -61,23 +59,24 @@
             '/device_types/{}'.format(initial_data.type)).get()
 
         self.adapter_agent = None
+        self.log = structlog.get_logger(device_id=initial_data.id)
 
     @inlineCallbacks
     def start(self):
-        log.debug('starting')
+        self.log.debug('starting')
         self._set_adapter_agent()
         yield self._process_update(self._tmp_initial_data)
         del self._tmp_initial_data
-        log.info('started')
+        self.log.info('started')
         returnValue(self)
 
     def stop(self):
-        log.debug('stopping')
+        self.log.debug('stopping')
         self.proxy.unregister_callback(
             CallbackType.PRE_UPDATE, self._validate_update)
         self.proxy.unregister_callback(
             CallbackType.POST_UPDATE, self._process_update)
-        log.info('stopped')
+        self.log.info('stopped')
 
     def _set_adapter_agent(self):
         adapter_name = self._tmp_initial_data.adapter
@@ -97,7 +96,7 @@
         (by raising an exception), or even the augmentation of the incoming
         data.
         """
-        log.debug('device-pre-update', device=device)
+        self.log.debug('device-pre-update', device=device)
         yield self._process_state_transitions(device, dry_run=True)
         returnValue(device)
 
@@ -108,7 +107,7 @@
         a transaction), and it is used to propagate the change down to the
         adapter
         """
-        log.debug('device-post-update', device=device)
+        self.log.debug('device-post-update', device=device)
 
         # first, process any potential state transition
         yield self._process_state_transitions(device)
@@ -135,7 +134,7 @@
 
     @inlineCallbacks
     def _activate_device(self, device, dry_run=False):
-        log.info('activate-device', device=device, dry_run=dry_run)
+        self.log.info('activate-device', device=device, dry_run=dry_run)
         if not dry_run:
             device = yield self.adapter_agent.adopt_device(device)
             device.oper_status = OperStatus.ACTIVATING
@@ -151,22 +150,22 @@
         raise NotImplementedError()
 
     def _propagate_change(self, device, dry_run=False):
-        log.info('propagate-change', device=device, dry_run=dry_run)
+        self.log.info('propagate-change', device=device, dry_run=dry_run)
         if device != self.last_data:
             raise NotImplementedError()
         else:
-            log.debug('no-op')
+            self.log.debug('no-op')
 
     def _abandon_device(self, device, dry_run=False):
-        log.info('abandon-device', device=device, dry_run=dry_run)
+        self.log.info('abandon-device', device=device, dry_run=dry_run)
         raise NotImplementedError()
 
     def _disable_device(self, device, dry_run=False):
-        log.info('disable-device', device=device, dry_run=dry_run)
+        self.log.info('disable-device', device=device, dry_run=dry_run)
         raise NotImplementedError()
 
     def _reenable_device(self, device, dry_run=False):
-        log.info('reenable-device', device=device, dry_run=dry_run)
+        self.log.info('reenable-device', device=device, dry_run=dry_run)
         raise NotImplementedError()
 
     admin_state_fsm = {
@@ -194,7 +193,7 @@
 
     @inlineCallbacks
     def _flow_table_updated(self, flows):
-        log.debug('flow-table-updated',
+        self.log.debug('flow-table-updated',
                   logical_device_id=self.last_data.id, flows=flows)
 
         # if device accepts bulk flow update, lets just call that
@@ -216,7 +215,7 @@
 
     @inlineCallbacks
     def _group_table_updated(self, groups):
-        log.debug('group-table-updated',
+        self.log.debug('group-table-updated',
                   logical_device_id=self.last_data.id,
                   flow_groups=groups)
 
diff --git a/voltha/core/logical_device_agent.py b/voltha/core/logical_device_agent.py
index 5b629ca..41badb4 100644
--- a/voltha/core/logical_device_agent.py
+++ b/voltha/core/logical_device_agent.py
@@ -33,7 +33,6 @@
 from voltha.protos.openflow_13_pb2 import Flows, FlowGroups
 from voltha.registry import registry
 
-log = structlog.get_logger()
 _ = third_party
 
 def mac_str_to_tuple(mac):
@@ -64,13 +63,15 @@
         self.self_proxy.register_callback(
             CallbackType.POST_REMOVE, self._port_list_updated)
 
+        self.log = structlog.get_logger(logical_device_id=logical_device.id)
+
     def start(self):
-        log.debug('starting')
-        log.info('started')
+        self.log.debug('starting')
+        self.log.info('started')
         return self
 
     def stop(self):
-        log.debug('stopping')
+        self.log.debug('stopping')
         self.flows_proxy.unregister_callback(
             CallbackType.POST_UPDATE, self._flow_table_updated)
         self.groups_proxy.unregister_callback(
@@ -79,7 +80,7 @@
             CallbackType.POST_ADD, self._port_list_updated)
         self.self_proxy.unregister_callback(
             CallbackType.POST_REMOVE, self._port_list_updated)
-        log.info('stopped')
+        self.log.info('stopped')
 
     def announce_flows_deleted(self, flows):
         for f in flows:
@@ -118,7 +119,7 @@
             self.flow_modify_strict(flow_mod)
 
         else:
-            log.warn('unhandled-flow-mod', command=command, flow_mod=flow_mod)
+            self.log.warn('unhandled-flow-mod', command=command, flow_mod=flow_mod)
 
     # def list_flows(self):
     #     return self.flows
@@ -137,7 +138,7 @@
             self.group_modify(group_mod)
 
         else:
-            log.warn('unhandled-group-mod', command=command,
+            self.log.warn('unhandled-group-mod', command=command,
                      group_mod=group_mod)
 
     def list_groups(self):
@@ -163,7 +164,7 @@
                 flow = flow_stats_entry_from_flow_mod_message(mod)
                 flows.append(flow)
                 changed = True
-                log.debug('flow-added', flow=mod)
+                self.log.debug('flow-added', flow=mod)
 
         else:
             flow = flow_stats_entry_from_flow_mod_message(mod)
@@ -175,12 +176,12 @@
                     flow.packet_count = old_flow.packet_count
                 flows[idx] = flow
                 changed = True
-                log.debug('flow-updated', flow=flow)
+                self.log.debug('flow-updated', flow=flow)
 
             else:
                 flows.append(flow)
                 changed = True
-                log.debug('flow-added', flow=mod)
+                self.log.debug('flow-added', flow=mod)
 
         # write back to model
         if changed:
@@ -225,7 +226,7 @@
             changed = True
         else:
             # TODO need to check what to do with this case
-            log.warn('flow-cannot-delete', flow=flow)
+            self.log.warn('flow-cannot-delete', flow=flow)
 
         if changed:
             self.flows_proxy.update('/', Flows(items=flows))
@@ -403,7 +404,7 @@
             # signal controller as requested by flow's flag
             groups = OrderedDict()
             groups_changed = True
-            log.debug('all-groups-deleted')
+            self.log.debug('all-groups-deleted')
 
         else:
             if group_id not in groups:
@@ -415,7 +416,7 @@
                 flows_changed, flows = self.flows_delete_by_group_id(flows, group_id)
                 del groups[group_id]
                 groups_changed = True
-                log.debug('group-deleted', group_id=group_id)
+                self.log.debug('group-deleted', group_id=group_id)
 
         if groups_changed:
             self.groups_proxy.update('/', FlowGroups(items=groups.values()))
@@ -444,7 +445,7 @@
     ## <=============== PACKET_OUT ===========================================>
 
     def packet_out(self, ofp_packet_out):
-        log.debug('packet-out', packet=ofp_packet_out)
+        self.log.debug('packet-out', packet=ofp_packet_out)
         print threading.current_thread().name
         print 'PACKET_OUT:', ofp_packet_out
         # TODO for debug purposes, lets turn this around and send it back
@@ -465,7 +466,7 @@
     ## <======================== FLOW TABLE UPDATE HANDLING ===================
 
     def _flow_table_updated(self, flows):
-        log.debug('flow-table-updated',
+        self.log.debug('flow-table-updated',
                   logical_device_id=self.logical_device_id, flows=flows)
 
         # TODO we have to evolve this into a policy-based, event based pattern
@@ -484,7 +485,7 @@
     ## <======================= GROUP TABLE UPDATE HANDLING ===================
 
     def _group_table_updated(self, flow_groups):
-        log.debug('group-table-updated',
+        self.log.debug('group-table-updated',
                   logical_device_id=self.logical_device_id,
                   flow_groups=flow_groups)
 
diff --git a/voltha/main.py b/voltha/main.py
index 44cd416..18ed7ad 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -22,6 +22,7 @@
 import time
 
 import yaml
+from simplejson import dumps
 from twisted.internet.defer import inlineCallbacks
 from zope.interface import implementer
 
@@ -37,6 +38,8 @@
 from voltha.northbound.rest.health_check import init_rest_service
 from voltha.protos.common_pb2 import LogLevel
 from voltha.registry import registry, IComponent
+from common.frameio.frameio import FrameIOManager
+
 
 VERSION = '0.9.0'
 
@@ -229,7 +232,7 @@
 
         if not args.no_heartbeat:
             self.start_heartbeat()
-            self.start_kafka_heartbeat()
+            self.start_kafka_heartbeat(args.instance_id)
 
     def start(self):
         self.start_reactor()  # will not return except Keyboard interrupt
@@ -285,6 +288,11 @@
             ).start()
 
             yield registry.register(
+                'frameio',
+                FrameIOManager()
+            ).start()
+
+            yield registry.register(
                 'adapter_loader',
                 AdapterLoader(config=self.config.get('adapter_loader', {}))
             ).start()
@@ -323,20 +331,26 @@
 
     # Temporary function to send a heartbeat message to the external kafka
     # broker
-    def start_kafka_heartbeat(self):
+    def start_kafka_heartbeat(self, instance_id):
         # For heartbeat we will send a message to a specific "voltha-heartbeat"
         #  topic.  The message is a protocol buf
         # message
-        message = 'Heartbeat message:{}'.format(get_my_primary_local_ipv4())
-        topic = "voltha-heartbeat"
+        message = dumps(dict(
+            type='heartbeat',
+            voltha_instance=instance_id,
+            ip=get_my_primary_local_ipv4()
+        ))
+        topic = "heartbeat.voltha"
 
         from twisted.internet.task import LoopingCall
         kafka_proxy = get_kafka_proxy()
         if kafka_proxy:
             lc = LoopingCall(kafka_proxy.send_message, topic, message)
             lc.start(10)
+            pass
         else:
             self.log.error('Kafka proxy has not been created!')
 
+
 if __name__ == '__main__':
     Main().start()
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
index a710400..341f9b1 100644
--- a/voltha/northbound/kafka/kafka_proxy.py
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -100,24 +100,22 @@
         # first check whether we have a kafka producer.  If there is none
         # then try to get one - this happens only when we try to lookup the
         # kafka service from consul
-        if self.kproducer is None:
-            self._get_kafka_producer()
-            # Lets the next message request do the retry if still a failure
-            if self.kproducer is None:
-                log.error('No kafka producer available at {}'.format(
-                    self.kafka_endpoint))
-                return
-
-        log.info('Sending message {} to kafka topic {}'.format(msg,
-                                                                    topic))
         try:
-            msg_list = [msg]
-            yield self.kproducer.send_messages(topic, msgs=msg_list)
-            log.info('Successfully sent message {} to kafka topic '
-                          '{}'.format(msg, topic))
-        except Exception as e:
-            log.error('Failure to send message {} to kafka topic {}: '
-                           '{}'.format(msg, topic, repr(e)))
+            if self.kproducer is None:
+                self._get_kafka_producer()
+                # Lets the next message request do the retry if still a failure
+                if self.kproducer is None:
+                    log.error('no-kafka-producer', endpoint=self.kafka_endpoint)
+                    return
+
+            log.debug('sending-kafka-msg', topic=topic, msg=msg)
+            msgs = [msg]
+            yield self.kproducer.send_messages(topic, msgs=msgs)
+            log.debug('sent-kafka-msg', topic=topic, msg=msg)
+
+        except Exception, e:
+            log.error('failed-to-send-kafka-msg', topic=topic, msg=msg, e=e)
+
             # set the kafka producer to None.  This is needed if the
             # kafka docker went down and comes back up with a different
             # port number.
diff --git a/voltha/protos/device.proto b/voltha/protos/device.proto
index 0a1e9b2..b3b1711 100644
--- a/voltha/protos/device.proto
+++ b/voltha/protos/device.proto
@@ -110,7 +110,7 @@
         // ("xxxx:xxxx:xxxx:xxxx:xxxx:xxxx:xxxx:xxxx")
         string ipv6_address = 15;
 
-        ProxyAddress proxy_device = 19;
+        ProxyAddress proxy_address = 19;
     };
 
     AdminState.AdminState admin_state = 16;
diff --git a/voltha/protos/meta.proto b/voltha/protos/meta.proto
index d16668a..b78c9e3 100644
--- a/voltha/protos/meta.proto
+++ b/voltha/protos/meta.proto
@@ -26,8 +26,17 @@
 }
 
 enum Access {
-    CONFIG = 0;  // read-write, stored
-    READ_ONLY = 1;      // read-only field
+
+    // read-write, stored attribute
+    CONFIG = 0;
+
+    // read-only field, stored with the model, covered by its hash
+    READ_ONLY = 1;
+
+    // A read-only attribute that is not stored in the model, not covered
+    // by its hash, its value is filled real-time upon each request.
+    REAL_TIME = 2;
+
 }
 
 extend google.protobuf.FieldOptions {