Adding infrastructure to allow sending messages between adapters
using the adapter-agent event bus.
Change-Id: Ia3654b8944745bc76f8ac4ea3c1439a082979dd4
diff --git a/voltha/adapters/broadcom_onu/broadcom_onu.py b/voltha/adapters/broadcom_onu/broadcom_onu.py
index 59fd987..b8f7d7f 100644
--- a/voltha/adapters/broadcom_onu/broadcom_onu.py
+++ b/voltha/adapters/broadcom_onu/broadcom_onu.py
@@ -68,6 +68,9 @@
)
self.devices_handlers = dict() # device_id -> BroadcomOnuHandler()
+ # register for adapter messages
+ self.adapter_agent.register_for_inter_adapter_messages()
+
def start(self):
log.debug('starting')
log.info('started')
@@ -138,6 +141,9 @@
log.info('packet-out', logical_device_id=logical_device_id,
egress_port_no=egress_port_no, msg_len=len(msg))
+ def receive_inter_adapter_message(self, msg):
+ log.info('receive_inter_adapter_message', msg=msg)
+
class BroadcomOnuHandler(object):
@@ -165,6 +171,7 @@
self.proxy_address = device.proxy_address
self.adapter_agent.register_for_proxied_messages(device.proxy_address)
+
# populate device info
device.root = True
device.vendor = 'Broadcom'
diff --git a/voltha/adapters/dpoe_onu/dpoe_onu.py b/voltha/adapters/dpoe_onu/dpoe_onu.py
index a73fd82..6084bc4 100644
--- a/voltha/adapters/dpoe_onu/dpoe_onu.py
+++ b/voltha/adapters/dpoe_onu/dpoe_onu.py
@@ -448,3 +448,6 @@
def receive_packet_out(self, logical_device_id, egress_port_no, msg):
log.info('packet-out', logical_device_id=logical_device_id,
egress_port_no=egress_port_no, msg_len=len(msg))
+
+ def receive_inter_adapter_message(self, msg):
+ raise NotImplementedError()
diff --git a/voltha/adapters/interface.py b/voltha/adapters/interface.py
index fcba8af..60c27f1 100644
--- a/voltha/adapters/interface.py
+++ b/voltha/adapters/interface.py
@@ -192,6 +192,17 @@
:return: None
"""
+ def receive_inter_adapter_message(msg):
+ """
+ Called when the adapter recieves a message that was sent to it directly
+ from another adapter. An adapter may register for these messages by calling
+ the register_for_inter_adapter_messages() method in the adapter agent.
+ Note that it is the responsibility of the sending and receiving
+ adapters to properly encode and decode the message.
+ :param msg: The message contents.
+ :return: None
+ """
+
# TODO work in progress
# ...
diff --git a/voltha/adapters/maple_olt/maple_olt.py b/voltha/adapters/maple_olt/maple_olt.py
index 926e8d9..09e4440 100644
--- a/voltha/adapters/maple_olt/maple_olt.py
+++ b/voltha/adapters/maple_olt/maple_olt.py
@@ -293,7 +293,15 @@
# event: 'disable-completed'
# event_data: {'serial_num-vendor_id': <str>
# 'serial_num-vendor_specific: <str>}
- pass
+ event_dict = {'event':event, 'event_data':event_data}
+
+ # Get child_device from onu_id
+ child_device = self.adapter_agent.get_child_device(self.device_id, onu_id=key['onu_id'])
+ assert child_device is not None
+
+ # Send the event message to the ONU adapter
+ self.adapter_agent.publish_inter_adapter_message(child_device.id, event_dict)
+
elif _object == 'alloc_id':
# key: {'device_id': <int>, 'pon_ni': <int>, 'onu_id': <int>, 'alloc_id': ,<int>}
pass
@@ -385,6 +393,9 @@
self.devices_handlers = dict() # device_id -> MapleOltHandler()
self.logical_device_id_to_root_device_id = dict()
+ # register for adapter messages
+ self.adapter_agent.register_for_inter_adapter_messages()
+
def start(self):
log.debug('starting')
log.info('started')
@@ -465,6 +476,8 @@
handler = self.devices_handlers[device_id]
handler.packet_out(egress_port_no, msg)
+ def receive_inter_adapter_message(self, msg):
+ pass
class MaplePBClientFactory(pb.PBClientFactory, ReconnectingClientFactory):
channel = None
diff --git a/voltha/adapters/microsemi_olt/microsemi_olt.py b/voltha/adapters/microsemi_olt/microsemi_olt.py
index b98e340..8ac93c5 100644
--- a/voltha/adapters/microsemi_olt/microsemi_olt.py
+++ b/voltha/adapters/microsemi_olt/microsemi_olt.py
@@ -165,6 +165,9 @@
log.info('packet-out', logical_device_id=logical_device_id,
egress_port_no=egress_port_no, msg_len=len(msg))
+ def receive_inter_adapter_message(self, msg):
+ raise NotImplementedError()
+
##
# Private methods
##
diff --git a/voltha/adapters/pmcs_onu/pmcs_onu.py b/voltha/adapters/pmcs_onu/pmcs_onu.py
index 57672ba..50aada1 100644
--- a/voltha/adapters/pmcs_onu/pmcs_onu.py
+++ b/voltha/adapters/pmcs_onu/pmcs_onu.py
@@ -142,6 +142,9 @@
log.info('packet-out', logical_device_id=logical_device_id,
egress_port_no=egress_port_no, msg_len=len(msg))
+ def receive_inter_adapter_message(self, msg):
+ raise NotImplementedError()
+
@inlineCallbacks
def _onu_device_activation(self, device):
# first we verify that we got parent reference and proxy info
diff --git a/voltha/adapters/ponsim_olt/ponsim_olt.py b/voltha/adapters/ponsim_olt/ponsim_olt.py
index c05d7fd..58cba69 100644
--- a/voltha/adapters/ponsim_olt/ponsim_olt.py
+++ b/voltha/adapters/ponsim_olt/ponsim_olt.py
@@ -294,6 +294,9 @@
handler = self.devices_handlers[device_id]
handler.packet_out(egress_port_no, msg)
+ def receive_inter_adapter_message(self, msg):
+ raise NotImplementedError()
+
class PonSimOltHandler(object):
def __init__(self, adapter, device_id):
diff --git a/voltha/adapters/ponsim_onu/ponsim_onu.py b/voltha/adapters/ponsim_onu/ponsim_onu.py
index a131052..2098a22 100644
--- a/voltha/adapters/ponsim_onu/ponsim_onu.py
+++ b/voltha/adapters/ponsim_onu/ponsim_onu.py
@@ -143,6 +143,9 @@
log.info('packet-out', logical_device_id=logical_device_id,
egress_port_no=egress_port_no, msg_len=len(msg))
+ def receive_inter_adapter_message(self, msg):
+ raise NotImplementedError()
+
class PonSimOnuHandler(object):
def __init__(self, adapter, device_id):
diff --git a/voltha/adapters/simulated_olt/simulated_olt.py b/voltha/adapters/simulated_olt/simulated_olt.py
index 6ed10a9..d6421f0 100644
--- a/voltha/adapters/simulated_olt/simulated_olt.py
+++ b/voltha/adapters/simulated_olt/simulated_olt.py
@@ -667,6 +667,9 @@
log.info('packet-out', logical_device_id=logical_device_id,
egress_port_no=egress_port_no, msg_len=len(msg))
+ def receive_inter_adapter_message(self, msg):
+ raise NotImplementedError()
+
def start_kpi_collection(self, device_id):
"""Simulate periodic KPI metric collection from the device"""
diff --git a/voltha/adapters/simulated_onu/simulated_onu.py b/voltha/adapters/simulated_onu/simulated_onu.py
index e25004f..8abe3a3 100644
--- a/voltha/adapters/simulated_onu/simulated_onu.py
+++ b/voltha/adapters/simulated_onu/simulated_onu.py
@@ -379,3 +379,6 @@
def receive_packet_out(self, logical_device_id, egress_port_no, msg):
log.info('packet-out', logical_device_id=logical_device_id,
egress_port_no=egress_port_no, msg_len=len(msg))
+
+ def receive_inter_adapter_message(self, msg):
+ raise NotImplementedError()
diff --git a/voltha/adapters/tibit_olt/tibit_olt.py b/voltha/adapters/tibit_olt/tibit_olt.py
index 22ca111..914c47b 100644
--- a/voltha/adapters/tibit_olt/tibit_olt.py
+++ b/voltha/adapters/tibit_olt/tibit_olt.py
@@ -919,6 +919,9 @@
self.io_port.send(str(frame))
+ def receive_inter_adapter_message(self, msg):
+ raise NotImplementedError()
+
def start_kpi_collection(self, device_id):
""" Periodic KPI metric collection from the device """
import random
diff --git a/voltha/adapters/tibit_onu/tibit_onu.py b/voltha/adapters/tibit_onu/tibit_onu.py
index 849bf5a..00477b4 100644
--- a/voltha/adapters/tibit_onu/tibit_onu.py
+++ b/voltha/adapters/tibit_onu/tibit_onu.py
@@ -531,6 +531,9 @@
log.info('packet-out', logical_device_id=logical_device_id,
egress_port_no=egress_port_no, msg_len=len(msg))
+ def receive_inter_adapter_message(self, msg):
+ raise NotImplementedError()
+
def start_kpi_collection(self, device_id):
"""TMP Simulate periodic KPI metric collection from the device"""
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index 7ef9a69..5798045 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -186,6 +186,43 @@
def get_device(self, device_id):
return self.root_proxy.get('/devices/{}'.format(device_id))
+ def get_child_device(self, parent_device_id, **kwargs):
+ """
+ Retrieve a child device object belonging
+ to the specified parent device based on some match
+ criteria. The first child device that matches the
+ provided criteria is returned.
+ :param parent_device_id: parent's device id
+ :param **kwargs: arbitrary list of match criteria
+ :return: Child Device Object or None
+ """
+ # Get all arguments to be used for comparison
+ # Note that for now we are only matching on the ONU ID
+ # Other matching fields can be added as required in the future
+ onu_id = kwargs.pop('onu_id', None)
+ if onu_id is None: return None
+
+ # Get all devices
+ devices = self.root_proxy.get('/devices')
+
+ # Get all child devices with the same parent ID
+ children_ids = set(d.id for d in devices if d.parent_id == parent_device_id)
+
+ # Loop through all the child devices with this parent ID
+ for child_id in children_ids:
+ found = True
+ device = self.get_device(child_id)
+
+ # Does this child device match the passed in ONU ID?
+ if device.proxy_address.onu_id != onu_id:
+ found = False
+
+ # Return the matched child device
+ if found is True:
+ return device
+
+ return None
+
def add_device(self, device):
assert isinstance(device, Device)
self._make_up_to_date('/devices', device.id, device)
@@ -507,6 +544,22 @@
topic = self._gen_rx_proxy_address_topic(proxy_address)
self.event_bus.publish(topic, msg)
+ def register_for_inter_adapter_messages(self):
+ self.event_bus.subscribe(self.adapter_name,
+ lambda t, m: self.adapter.receive_inter_adapter_message(m))
+
+ def unregister_for_inter_adapter_messages(self):
+ self.event_bus.unsubscribe(self.adapter_name)
+
+ def publish_inter_adapter_message(self, device_id, msg):
+ # Get the device from the device_id
+ device = self.get_device(device_id)
+ assert device is not None
+
+ # Publish a message to the adapter that is responsible
+ # for managing this device
+ self.event_bus.publish(device.type, msg)
+
# ~~~~~~~~~~~~~~~~~~ Handling packet-in and packet-out ~~~~~~~~~~~~~~~~~~~~
def send_packet_in(self, logical_device_id, logical_port_no, packet):