VOL-1493 VOL-1454:
Update to reference core_proxy and adapter_proxy.
Process omci messages using adapter_proxy
Implement and process inter adapter messages. Start and use
openomci state machine. Update handler and omci_cc
for new core/adapter proxy
Can now successfully mib upload and mib_sync is success
and run omci tasks
Change-Id: I94db6e9a0cd2aa23dad3a1224f2e66f2ff76b771
diff --git a/pyvoltha/adapters/extensions/omci/database/alarm_db_ext.py b/pyvoltha/adapters/extensions/omci/database/alarm_db_ext.py
index 575ef54..01ecc1a 100644
--- a/pyvoltha/adapters/extensions/omci/database/alarm_db_ext.py
+++ b/pyvoltha/adapters/extensions/omci/database/alarm_db_ext.py
@@ -48,7 +48,7 @@
:param omci_agent: (OpenOMCIAgent) OpenOMCI Agent
"""
super(AlarmDbExternal, self).__init__(omci_agent)
- self._core = omci_agent.core
+ self._core = omci_agent.core_proxy
def start(self):
"""
diff --git a/pyvoltha/adapters/extensions/omci/database/mib_db_ext.py b/pyvoltha/adapters/extensions/omci/database/mib_db_ext.py
index 4415987..7b3bea7 100644
--- a/pyvoltha/adapters/extensions/omci/database/mib_db_ext.py
+++ b/pyvoltha/adapters/extensions/omci/database/mib_db_ext.py
@@ -105,7 +105,7 @@
:param omci_agent: (OpenOMCIAgent) OpenOMCI Agent
"""
super(MibDbExternal, self).__init__(omci_agent)
- self._core = omci_agent.core
+ self._core = omci_agent.core_proxy
# Some statistics to help with debug/tuning/...
self._statistics = {
'get': MibDbStatistic('get'),
diff --git a/pyvoltha/adapters/extensions/omci/omci_cc.py b/pyvoltha/adapters/extensions/omci/omci_cc.py
index ce90243..ac0731d 100644
--- a/pyvoltha/adapters/extensions/omci/omci_cc.py
+++ b/pyvoltha/adapters/extensions/omci/omci_cc.py
@@ -28,6 +28,7 @@
from pyvoltha.adapters.extensions.omci.omci_defs import EntityOperations, ReasonCodes
from pyvoltha.adapters.extensions.omci.omci_entities import entity_id_to_class_map
from pyvoltha.common.event_bus import EventBusClient
+from voltha_protos.inter_container_pb2 import InterAdapterMessageType, InterAdapterOmciMessage
from enum import IntEnum
from binascii import hexlify
@@ -102,11 +103,13 @@
OmciGetAllAlarmsNextResponse.message_id: RxEvent.Get_ALARM_Get_Next
}
- def __init__(self, adapter_agent, device_id, me_map=None,
+ def __init__(self, core_proxy, adapter_proxy, device_id, me_map=None,
clock=None):
self.log = structlog.get_logger(device_id=device_id)
- self._adapter_agent = adapter_agent
+ self._core_proxy = core_proxy
+ self._adapter_proxy = adapter_proxy
self._device_id = device_id
+ self._device = None
self._proxy_address = None
self._enabled = False
self._extended_messaging = False
@@ -261,6 +264,7 @@
def max_lp_tx_queue(self):
return self._max_lp_tx_queue
+ @inlineCallbacks
def _start(self):
"""
Start the OMCI Communications Channel
@@ -268,8 +272,8 @@
assert self._enabled, 'Start should only be called if enabled'
self.flush()
- device = self._adapter_agent.get_device(self._device_id)
- self._proxy_address = device.proxy_address
+ self._device = yield self._core_proxy.get_device(self._device_id)
+ self._proxy_address = self._device.proxy_address
def _stop(self):
"""
@@ -858,6 +862,7 @@
other_msg_type = self._tx_request[other][OMCI_CC.REQUEST_FRAME].fields['message_type'] & 0x1f
return other_msg_type not in not_allowed
+ @inlineCallbacks
def _send_next_request(self, high_priority):
"""
Pull next tx request and send it
@@ -896,8 +901,20 @@
ts = arrow.utcnow().float_timestamp
try:
self._rx_response[index] = None
- self._adapter_agent.send_proxied_message(self._proxy_address,
- hexify(str(frame)))
+
+ omci_msg = InterAdapterOmciMessage(message=hexify(str(frame)))
+
+ self.log.debug('inter-adapter-send-omci', omci_msg=omci_msg)
+
+ yield self._adapter_proxy.send_inter_adapter_message(
+ msg=omci_msg,
+ type=InterAdapterMessageType.OMCI_REQUEST,
+ from_adapter=self._device.type,
+ to_adapter=self._proxy_address.device_type,
+ to_device_id=self._device_id,
+ proxy_device_id=self._proxy_address.device_id
+ )
+
finally:
global entity_id_to_class_map
entity_id_to_class_map = saved_me_map
diff --git a/pyvoltha/adapters/extensions/omci/onu_device_entry.py b/pyvoltha/adapters/extensions/omci/onu_device_entry.py
index 062edc4..dab0195 100644
--- a/pyvoltha/adapters/extensions/omci/onu_device_entry.py
+++ b/pyvoltha/adapters/extensions/omci/onu_device_entry.py
@@ -55,14 +55,15 @@
"""
An ONU Device entry in the MIB
"""
- def __init__(self, omci_agent, device_id, adapter_agent, custom_me_map,
+ def __init__(self, omci_agent, device_id, core_proxy, adapter_proxy, custom_me_map,
mib_db, alarm_db, support_classes, clock=None):
"""
Class initializer
:param omci_agent: (OpenOMCIAgent) Reference to OpenOMCI Agent
:param device_id: (str) ONU Device ID
- :param adapter_agent: (AdapterAgent) Adapter agent for ONU
+ :param core_proxy: (CoreProxy) Remote API to VOLTHA Core
+ :param adapter_proxy: (AdapterProxy) Remote API to other Adapters via VOLTHA Core
:param custom_me_map: (dict) Additional/updated ME to add to class map
:param mib_db: (MibDbApi) MIB Database reference
:param alarm_db: (MibDbApi) Alarm Table/Database reference
@@ -73,7 +74,8 @@
self._started = False
self._omci_agent = omci_agent # OMCI AdapterAgent
self._device_id = device_id # ONU Device ID
- self._adapter_agent = adapter_agent
+ self._core_proxy = core_proxy
+ self._adapter_proxy = adapter_proxy
self._runner = TaskRunner(device_id, clock=clock) # OMCI_CC Task runner
self._deferred = None
# self._img_download_deferred = None # deferred of image file download from server
@@ -167,7 +169,7 @@
self.event_bus = EventBusClient()
# Create OMCI communications channel
- self._omci_cc = OMCI_CC(adapter_agent, self.device_id, self._me_map, clock=clock)
+ self._omci_cc = OMCI_CC(core_proxy, adapter_proxy, self.device_id, self._me_map, clock=clock)
@staticmethod
def event_bus_topic(device_id, event):
@@ -190,8 +192,8 @@
return self._omci_cc
@property
- def adapter_agent(self):
- return self._adapter_agent
+ def core_proxy(self):
+ return self._core_proxy
@property
def task_runner(self):
diff --git a/pyvoltha/adapters/extensions/omci/openomci_agent.py b/pyvoltha/adapters/extensions/omci/openomci_agent.py
index 1addc60..672af5b 100644
--- a/pyvoltha/adapters/extensions/omci/openomci_agent.py
+++ b/pyvoltha/adapters/extensions/omci/openomci_agent.py
@@ -104,15 +104,17 @@
This will become the primary interface into OpenOMCI for ONU Device Adapters
in VOLTHA v1.3 sprint 3 time frame.
"""
- def __init__(self, core, support_classes=OpenOmciAgentDefaults, clock=None):
+ def __init__(self, core_proxy, adapter_proxy, support_classes=OpenOmciAgentDefaults, clock=None):
"""
Class initializer
- :param core: (VolthaCore) VOLTHA Core
+ :param core_proxy: (CoreProxy) Remote API to VOLTHA Core
+ :param adapter_proxy: (AdapterProxy) Remote API to other adapters via VOLTHA Core
:param support_classes: (Dict) Classes to support OMCI
"""
self.log = structlog.get_logger()
- self._core = core
+ self._core_proxy = core_proxy
+ self._adapter_proxy = adapter_proxy
self.reactor = clock if clock is not None else reactor
self._started = False
self._devices = dict() # device-id -> DeviceEntry
@@ -130,9 +132,9 @@
self._alarm_database_cls = support_classes['alarm-synchronizer']['database']
@property
- def core(self):
+ def core_proxy(self):
""" Return a reference to the VOLTHA Core component"""
- return self._core
+ return self._core_proxy
@property
def database_class(self):
@@ -214,7 +216,7 @@
except Exception as e:
self.log.exception('advertise-failure', e=e)
- def add_device(self, device_id, adapter_agent, custom_me_map=None,
+ def add_device(self, device_id, core_proxy, adapter_proxy, custom_me_map=None,
support_classes=OpenOmciAgentDefaults):
"""
Add a new ONU to be managed.
@@ -227,7 +229,8 @@
for this object.
:param device_id: (str) Device ID of ONU to add
- :param adapter_agent: (AdapterAgent) Adapter agent for ONU
+ :param core_proxy: (CoreProxy) Remote API to VOLTHA core
+ :param adapter_proxy: (AdapterProxy) Remote API to other adapters via VOLTHA core
:param custom_me_map: (dict) Additional/updated ME to add to class map
:param support_classes: (dict) State machines and tasks for this ONU
@@ -238,7 +241,7 @@
device = self._devices.get(device_id)
if device is None:
- device = OnuDeviceEntry(self, device_id, adapter_agent, custom_me_map,
+ device = OnuDeviceEntry(self, device_id, core_proxy, adapter_proxy, custom_me_map,
self._mib_db, self._alarm_db, support_classes, clock=self.reactor)
self._devices[device_id] = device