Adding infrastructure to allow sending messages between adapters
using the adapter-agent event bus.

Change-Id: Ia3654b8944745bc76f8ac4ea3c1439a082979dd4
diff --git a/voltha/adapters/broadcom_onu/broadcom_onu.py b/voltha/adapters/broadcom_onu/broadcom_onu.py
index 59fd987..b8f7d7f 100644
--- a/voltha/adapters/broadcom_onu/broadcom_onu.py
+++ b/voltha/adapters/broadcom_onu/broadcom_onu.py
@@ -68,6 +68,9 @@
         )
         self.devices_handlers = dict()  # device_id -> BroadcomOnuHandler()
 
+        # register for adapter messages
+        self.adapter_agent.register_for_inter_adapter_messages()
+
     def start(self):
         log.debug('starting')
         log.info('started')
@@ -138,6 +141,9 @@
         log.info('packet-out', logical_device_id=logical_device_id,
                  egress_port_no=egress_port_no, msg_len=len(msg))
 
+    def receive_inter_adapter_message(self, msg):
+        log.info('receive_inter_adapter_message', msg=msg)
+
 
 class BroadcomOnuHandler(object):
 
@@ -165,6 +171,7 @@
         self.proxy_address = device.proxy_address
         self.adapter_agent.register_for_proxied_messages(device.proxy_address)
 
+
         # populate device info
         device.root = True
         device.vendor = 'Broadcom'
diff --git a/voltha/adapters/dpoe_onu/dpoe_onu.py b/voltha/adapters/dpoe_onu/dpoe_onu.py
index a73fd82..6084bc4 100644
--- a/voltha/adapters/dpoe_onu/dpoe_onu.py
+++ b/voltha/adapters/dpoe_onu/dpoe_onu.py
@@ -448,3 +448,6 @@
     def receive_packet_out(self, logical_device_id, egress_port_no, msg):
         log.info('packet-out', logical_device_id=logical_device_id,
                  egress_port_no=egress_port_no, msg_len=len(msg))
+
+    def receive_inter_adapter_message(self, msg):
+        raise NotImplementedError()
diff --git a/voltha/adapters/interface.py b/voltha/adapters/interface.py
index fcba8af..60c27f1 100644
--- a/voltha/adapters/interface.py
+++ b/voltha/adapters/interface.py
@@ -192,6 +192,17 @@
         :return: None
         """
 
+    def receive_inter_adapter_message(msg):
+        """
+        Called when the adapter recieves a message that was sent to it directly
+        from another adapter. An adapter may register for these messages by calling
+        the register_for_inter_adapter_messages() method in the adapter agent.
+        Note that it is the responsibility of the sending and receiving
+        adapters to properly encode and decode the message.
+        :param msg: The message contents.
+        :return: None
+        """
+
         # TODO work in progress
         # ...
 
diff --git a/voltha/adapters/maple_olt/maple_olt.py b/voltha/adapters/maple_olt/maple_olt.py
index 926e8d9..09e4440 100644
--- a/voltha/adapters/maple_olt/maple_olt.py
+++ b/voltha/adapters/maple_olt/maple_olt.py
@@ -293,7 +293,15 @@
             # event: 'disable-completed'
             #     event_data: {'serial_num-vendor_id': <str>
             #                  'serial_num-vendor_specific: <str>}
-            pass
+            event_dict = {'event':event, 'event_data':event_data}
+
+            # Get child_device from onu_id
+            child_device = self.adapter_agent.get_child_device(self.device_id, onu_id=key['onu_id'])
+            assert child_device is not None
+
+            # Send the event message to the ONU adapter
+            self.adapter_agent.publish_inter_adapter_message(child_device.id, event_dict)
+
         elif _object == 'alloc_id':
             # key: {'device_id': <int>, 'pon_ni': <int>, 'onu_id': <int>, 'alloc_id': ,<int>}
             pass
@@ -385,6 +393,9 @@
         self.devices_handlers = dict()  # device_id -> MapleOltHandler()
         self.logical_device_id_to_root_device_id = dict()
 
+        # register for adapter messages
+        self.adapter_agent.register_for_inter_adapter_messages()
+
     def start(self):
         log.debug('starting')
         log.info('started')
@@ -465,6 +476,8 @@
         handler = self.devices_handlers[device_id]
         handler.packet_out(egress_port_no, msg)
 
+    def receive_inter_adapter_message(self, msg):
+        pass
 
 class MaplePBClientFactory(pb.PBClientFactory, ReconnectingClientFactory):
     channel = None
diff --git a/voltha/adapters/microsemi_olt/microsemi_olt.py b/voltha/adapters/microsemi_olt/microsemi_olt.py
index b98e340..8ac93c5 100644
--- a/voltha/adapters/microsemi_olt/microsemi_olt.py
+++ b/voltha/adapters/microsemi_olt/microsemi_olt.py
@@ -165,6 +165,9 @@
         log.info('packet-out', logical_device_id=logical_device_id,
                  egress_port_no=egress_port_no, msg_len=len(msg))
 
+    def receive_inter_adapter_message(self, msg):
+        raise NotImplementedError()
+
     ##
     # Private methods
     ##
diff --git a/voltha/adapters/pmcs_onu/pmcs_onu.py b/voltha/adapters/pmcs_onu/pmcs_onu.py
index 57672ba..50aada1 100644
--- a/voltha/adapters/pmcs_onu/pmcs_onu.py
+++ b/voltha/adapters/pmcs_onu/pmcs_onu.py
@@ -142,6 +142,9 @@
         log.info('packet-out', logical_device_id=logical_device_id,
                  egress_port_no=egress_port_no, msg_len=len(msg))
 
+    def receive_inter_adapter_message(self, msg):
+        raise NotImplementedError()
+
     @inlineCallbacks
     def _onu_device_activation(self, device):
         # first we verify that we got parent reference and proxy info
diff --git a/voltha/adapters/ponsim_olt/ponsim_olt.py b/voltha/adapters/ponsim_olt/ponsim_olt.py
index c05d7fd..58cba69 100644
--- a/voltha/adapters/ponsim_olt/ponsim_olt.py
+++ b/voltha/adapters/ponsim_olt/ponsim_olt.py
@@ -294,6 +294,9 @@
         handler = self.devices_handlers[device_id]
         handler.packet_out(egress_port_no, msg)
 
+    def receive_inter_adapter_message(self, msg):
+        raise NotImplementedError()
+
 
 class PonSimOltHandler(object):
     def __init__(self, adapter, device_id):
diff --git a/voltha/adapters/ponsim_onu/ponsim_onu.py b/voltha/adapters/ponsim_onu/ponsim_onu.py
index a131052..2098a22 100644
--- a/voltha/adapters/ponsim_onu/ponsim_onu.py
+++ b/voltha/adapters/ponsim_onu/ponsim_onu.py
@@ -143,6 +143,9 @@
         log.info('packet-out', logical_device_id=logical_device_id,
                  egress_port_no=egress_port_no, msg_len=len(msg))
 
+    def receive_inter_adapter_message(self, msg):
+        raise NotImplementedError()
+
 
 class PonSimOnuHandler(object):
     def __init__(self, adapter, device_id):
diff --git a/voltha/adapters/simulated_olt/simulated_olt.py b/voltha/adapters/simulated_olt/simulated_olt.py
index 6ed10a9..d6421f0 100644
--- a/voltha/adapters/simulated_olt/simulated_olt.py
+++ b/voltha/adapters/simulated_olt/simulated_olt.py
@@ -667,6 +667,9 @@
         log.info('packet-out', logical_device_id=logical_device_id,
                  egress_port_no=egress_port_no, msg_len=len(msg))
 
+    def receive_inter_adapter_message(self, msg):
+        raise NotImplementedError()
+
     def start_kpi_collection(self, device_id):
 
         """Simulate periodic KPI metric collection from the device"""
diff --git a/voltha/adapters/simulated_onu/simulated_onu.py b/voltha/adapters/simulated_onu/simulated_onu.py
index e25004f..8abe3a3 100644
--- a/voltha/adapters/simulated_onu/simulated_onu.py
+++ b/voltha/adapters/simulated_onu/simulated_onu.py
@@ -379,3 +379,6 @@
     def receive_packet_out(self, logical_device_id, egress_port_no, msg):
         log.info('packet-out', logical_device_id=logical_device_id,
                  egress_port_no=egress_port_no, msg_len=len(msg))
+
+    def receive_inter_adapter_message(self, msg):
+        raise NotImplementedError()
diff --git a/voltha/adapters/tibit_olt/tibit_olt.py b/voltha/adapters/tibit_olt/tibit_olt.py
index 22ca111..914c47b 100644
--- a/voltha/adapters/tibit_olt/tibit_olt.py
+++ b/voltha/adapters/tibit_olt/tibit_olt.py
@@ -919,6 +919,9 @@
 
         self.io_port.send(str(frame))
 
+    def receive_inter_adapter_message(self, msg):
+        raise NotImplementedError()
+
     def start_kpi_collection(self, device_id):
         """ Periodic KPI metric collection from the device """
         import random
diff --git a/voltha/adapters/tibit_onu/tibit_onu.py b/voltha/adapters/tibit_onu/tibit_onu.py
index 849bf5a..00477b4 100644
--- a/voltha/adapters/tibit_onu/tibit_onu.py
+++ b/voltha/adapters/tibit_onu/tibit_onu.py
@@ -531,6 +531,9 @@
         log.info('packet-out', logical_device_id=logical_device_id,
                  egress_port_no=egress_port_no, msg_len=len(msg))
 
+    def receive_inter_adapter_message(self, msg):
+        raise NotImplementedError()
+
     def start_kpi_collection(self, device_id):
 
         """TMP Simulate periodic KPI metric collection from the device"""
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index 7ef9a69..5798045 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -186,6 +186,43 @@
     def get_device(self, device_id):
         return self.root_proxy.get('/devices/{}'.format(device_id))
 
+    def get_child_device(self, parent_device_id, **kwargs):
+        """
+        Retrieve a child device object belonging
+        to the specified parent device based on some match
+        criteria. The first child device that matches the
+        provided criteria is returned.
+        :param parent_device_id: parent's device id
+        :param **kwargs: arbitrary list of match criteria
+        :return: Child Device Object or None
+        """
+        # Get all arguments to be used for comparison
+        # Note that for now we are only matching on the ONU ID
+        # Other matching fields can be added as required in the future
+        onu_id = kwargs.pop('onu_id', None)
+        if onu_id is None: return None
+
+        # Get all devices
+        devices = self.root_proxy.get('/devices')
+
+        # Get all child devices with the same parent ID
+        children_ids = set(d.id for d in devices if d.parent_id == parent_device_id)
+
+        # Loop through all the child devices with this parent ID
+        for child_id in children_ids:
+            found = True
+            device = self.get_device(child_id)
+
+            # Does this child device match the passed in ONU ID?
+            if device.proxy_address.onu_id != onu_id:
+                found = False
+
+            # Return the matched child device
+            if found is True:
+                return device
+
+        return None
+
     def add_device(self, device):
         assert isinstance(device, Device)
         self._make_up_to_date('/devices', device.id, device)
@@ -507,6 +544,22 @@
         topic = self._gen_rx_proxy_address_topic(proxy_address)
         self.event_bus.publish(topic, msg)
 
+    def register_for_inter_adapter_messages(self):
+        self.event_bus.subscribe(self.adapter_name,
+            lambda t, m: self.adapter.receive_inter_adapter_message(m))
+
+    def unregister_for_inter_adapter_messages(self):
+        self.event_bus.unsubscribe(self.adapter_name)
+
+    def publish_inter_adapter_message(self, device_id, msg):
+        # Get the device from the device_id
+        device = self.get_device(device_id)
+        assert device is not None
+
+        # Publish a message to the adapter that is responsible
+        # for managing this device
+        self.event_bus.publish(device.type, msg)
+
     # ~~~~~~~~~~~~~~~~~~ Handling packet-in and packet-out ~~~~~~~~~~~~~~~~~~~~
 
     def send_packet_in(self, logical_device_id, logical_port_no, packet):