VOL-1398: Adtran-ONU - Initial containerization commit
Change-Id: I7afcc1ad65b9ef80da994b0b0ddf74860911bb46
diff --git a/adapters/adtran_onu/omci/omci.py b/adapters/adtran_onu/omci/omci.py
new file mode 100644
index 0000000..8469a3f
--- /dev/null
+++ b/adapters/adtran_onu/omci/omci.py
@@ -0,0 +1,367 @@
+# Copyright 2018-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import structlog
+from twisted.internet.defer import inlineCallbacks, returnValue, TimeoutError
+from twisted.internet import reactor
+
+from voltha.protos.device_pb2 import Image
+
+from voltha.protos.common_pb2 import OperStatus, ConnectStatus
+from voltha.extensions.omci.onu_configuration import OMCCVersion
+
+from omci_entities import onu_custom_me_entities
+from voltha.extensions.omci.omci_me import *
+
+_STARTUP_RETRY_WAIT = 5
+# abbreviations
+OP = EntityOperations
+
+
+class OMCI(object):
+ """
+ OpenOMCI Support
+ """
+ DEFAULT_UNTAGGED_VLAN = 4091 # To be equivalent to BroadCom Defaults
+
+ def __init__(self, handler, omci_agent):
+ self.log = structlog.get_logger(device_id=handler.device_id)
+ self._handler = handler
+ self._openomci_agent = omci_agent
+ self._enabled = False
+ self._connected = False
+ self._deferred = None
+ self._bridge_initialized = False
+ self._in_sync_reached = False
+ self._omcc_version = OMCCVersion.Unknown
+ self._total_tcont_count = 0 # From ANI-G ME
+ self._qos_flexibility = 0 # From ONT2_G ME
+
+ self._in_sync_subscription = None
+ self._connectivity_subscription = None
+ self._capabilities_subscription = None
+
+ # self._service_downloaded = False
+ self._mib_downloaded = False
+ self._mib_download_task = None
+ self._mib_download_deferred = None
+
+ self._onu_omci_device = omci_agent.add_device(handler.device_id,
+ handler.adapter_agent,
+ custom_me_map=onu_custom_me_entities(),
+ support_classes=handler.adapter.adtran_omci)
+
+ def __str__(self):
+ return "OMCI"
+
+ @property
+ def omci_agent(self):
+ return self._openomci_agent
+
+ @property
+ def omci_cc(self):
+ # TODO: Decrement access to Communications channel at this point? What about current PM stuff?
+ return self.onu_omci_device.omci_cc if self._onu_omci_device is not None else None
+
+ def receive_message(self, msg):
+ if self.enabled:
+ # TODO: Have OpenOMCI actually receive the messages
+ self.omci_cc.receive_message(msg)
+
+ def _start(self):
+ self._cancel_deferred()
+
+ # Subscriber to events of interest in OpenOMCI
+ self._subscribe_to_events()
+ self._onu_omci_device.start()
+
+ device = self._handler.adapter_agent.get_device(self._handler.device_id)
+ device.reason = 'Performing MIB Upload'
+ self._handler.adapter_agent.update_device(device)
+
+ if self._onu_omci_device.mib_db_in_sync:
+ self._deferred = reactor.callLater(0, self._mib_in_sync)
+
+ def _stop(self):
+ self._cancel_deferred()
+
+ # Unsubscribe to OpenOMCI Events
+ self._unsubscribe_to_events()
+ self._onu_omci_device.stop() # Will also cancel any running tasks/state-machines
+
+ self._mib_downloaded = False
+ self._mib_download_task = None
+ self._bridge_initialized = False
+ self._in_sync_reached = False
+
+ def _cancel_deferred(self):
+ d1, self._deferred = self._deferred, None
+ d2, self._mib_download_deferred = self._mib_download_deferred, None
+
+ for d in [d1, d2]:
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def delete(self):
+ self.enabled = False
+
+ agent, self._openomci_agent = self._openomci_agent, None
+ device_id = self._handler.device_id
+ self._onu_omci_device = None
+ self._handler = None
+
+ if agent is not None:
+ agent.remove_device(device_id, cleanup=True)
+
+ @property
+ def enabled(self):
+ return self._enabled
+
+ @enabled.setter
+ def enabled(self, value):
+ if self._enabled != value:
+ self._enabled = value
+
+ if value:
+ self._start()
+ else:
+ self._stop()
+
+ @property
+ def connected(self):
+ return self._connected
+
+ @property
+ def onu_omci_device(self):
+ return self._onu_omci_device
+
+ def set_pm_config(self, pm_config):
+ """
+ Set PM interval configuration
+
+ :param pm_config: (OnuPmIntervalMetrics) PM Interval configuration
+ :return:
+ """
+ self.onu_omci_device.set_pm_config(pm_config)
+
+ def _mib_in_sync(self):
+ """
+ This method is ran whenever the ONU MIB database is in-sync. This is often after
+ the initial MIB Upload during ONU startup, or after it has gone out-of-sync and
+ then back in. This second case could be due a reboot of the ONU and a new version
+ of firmware is running on the ONU hardware.
+ """
+ self.log.info('mib-in-sync')
+
+ device = self._handler.adapter_agent.get_device(self._handler.device_id)
+ device.oper_status = OperStatus.ACTIVE
+ device.connect_status = ConnectStatus.REACHABLE
+ device.reason = ''
+ self._handler.adapter_agent.update_device(device)
+
+ omci_dev = self._onu_omci_device
+ config = omci_dev.configuration
+
+ # In Sync, we can register logical ports now. Ideally this could occur on
+ # the first time we received a successful (no timeout) OMCI Rx response.
+ try:
+ device = self._handler.adapter_agent.get_device(self._handler.device_id)
+
+ ani_g = config.ani_g_entities
+ uni_g = config.uni_g_entities
+ pon_ports = len(ani_g) if ani_g is not None else 0
+ uni_ports = len(uni_g) if uni_g is not None else 0
+
+ # For the UNI ports below, they are created after the MIB Sync event occurs
+ # and the onu handler adds the ONU
+ assert pon_ports == 1, 'Expected one PON/ANI port, got {}'.format(pon_ports)
+ assert uni_ports == len(self._handler.uni_ports), \
+ 'Expected {} UNI port(s), got {}'.format(len(self._handler.uni_ports), uni_ports)
+
+ # serial_number = omci_dev.configuration.serial_number
+ # self.log.info('serial-number', serial_number=serial_number)
+
+ # Save entity_id of PON ports
+ self._handler.pon_ports[0].entity_id = ani_g.keys()[0]
+
+ self._total_tcont_count = ani_g.get('total-tcont-count')
+ self._qos_flexibility = config.qos_configuration_flexibility or 0
+ self._omcc_version = config.omcc_version or OMCCVersion.Unknown
+
+ # vendorProductCode = str(config.vendor_product_code or 'unknown').rstrip('\0')
+
+ host_info = omci_dev.query_mib(IpHostConfigData.class_id)
+ mgmt_mac_address = next((host_info[inst].get('attributes').get('mac_address')
+ for inst in host_info
+ if isinstance(inst, int)), 'unknown')
+ device.mac_address = str(mgmt_mac_address)
+ device.model = str(config.version or 'unknown').rstrip('\0')
+
+ equipment_id = config.equipment_id or " unknown unknown "
+ eqpt_boot_version = str(equipment_id).rstrip('\0')
+ # eqptId = eqpt_boot_version[:10] # ie) BVMDZ10DRA
+ boot_version = eqpt_boot_version[12:] # ie) CML.D55~
+
+ images = [Image(name='boot-code',
+ version=boot_version.rstrip('\0'),
+ is_active=False,
+ is_committed=True,
+ is_valid=True,
+ install_datetime='Not Available',
+ hash='Not Available')] + \
+ config.software_images
+
+ del (device.images.image[:]) # Clear previous entries
+ device.images.image.extend(images)
+
+ # Save our device information
+ self._handler.adapter_agent.update_device(device)
+
+ # Start MIB download TODO: This will be replaced with a MIB Download task soon
+ self._in_sync_reached = True
+
+ except Exception as e:
+ self.log.exception('device-info-load', e=e)
+ self._deferred = reactor.callLater(_STARTUP_RETRY_WAIT, self._mib_in_sync)
+
+ def _subscribe_to_events(self):
+ from voltha.extensions.omci.onu_device_entry import OnuDeviceEvents, \
+ OnuDeviceEntry
+ from voltha.extensions.omci.omci_cc import OMCI_CC, OmciCCRxEvents
+
+ # OMCI MIB Database sync status
+ bus = self._onu_omci_device.event_bus
+ topic = OnuDeviceEntry.event_bus_topic(self._handler.device_id,
+ OnuDeviceEvents.MibDatabaseSyncEvent)
+ self._in_sync_subscription = bus.subscribe(topic, self.in_sync_handler)
+
+ # OMCI Capabilities (MEs and Message Types
+ bus = self._onu_omci_device.event_bus
+ topic = OnuDeviceEntry.event_bus_topic(self._handler.device_id,
+ OnuDeviceEvents.OmciCapabilitiesEvent)
+ self._capabilities_subscription = bus.subscribe(topic, self.capabilities_handler)
+
+ # OMCI-CC Connectivity Events (for reachability/heartbeat)
+ bus = self._onu_omci_device.omci_cc.event_bus
+ topic = OMCI_CC.event_bus_topic(self._handler.device_id,
+ OmciCCRxEvents.Connectivity)
+ self._connectivity_subscription = bus.subscribe(topic, self.onu_is_reachable)
+
+ # TODO: Watch for any MIB RESET events or detection of an ONU reboot.
+ # If it occurs, set _service_downloaded and _mib_download to false
+ # and make sure that we get 'new' capabilities
+
+ def _unsubscribe_to_events(self):
+ insync, self._in_sync_subscription = self._in_sync_subscription, None
+ connect, self._connectivity_subscription = self._connectivity_subscription, None
+ caps, self._capabilities_subscription = self._capabilities_subscription, None
+
+ if insync is not None:
+ bus = self._onu_omci_device.event_bus
+ bus.unsubscribe(insync)
+
+ if connect is not None:
+ bus = self._onu_omci_device.omci_cc.event_bus
+ bus.unsubscribe(connect)
+
+ if caps is not None:
+ bus = self._onu_omci_device.event_bus
+ bus.unsubscribe(caps)
+
+ def in_sync_handler(self, _topic, msg):
+ if self._in_sync_subscription is not None:
+ try:
+ from voltha.extensions.omci.onu_device_entry import IN_SYNC_KEY
+
+ if msg[IN_SYNC_KEY]:
+ # Start up device_info load from MIB DB
+ reactor.callLater(0, self._mib_in_sync)
+ else:
+ # Cancel any running/scheduled MIB download task
+ try:
+ d, self._mib_download_deferred = self._mib_download_deferred, None
+ d.cancel()
+ except:
+ pass
+
+ except Exception as e:
+ self.log.exception('in-sync', e=e)
+
+ def capabilities_handler(self, _topic, _msg):
+ """
+ This event occurs after an ONU reaches the In-Sync state and the OMCI ME has
+ been queried for supported ME and message types.
+
+ At this point, we can act upon any download device and/or service Technology
+ profiles (when they exist). For now, just run our somewhat fixed script
+ """
+ if self._capabilities_subscription is not None:
+ from adtn_mib_download_task import AdtnMibDownloadTask
+ self._mib_download_task = None
+
+ def success(_results):
+ dev = self._handler.adapter_agent.get_device(self._handler.device_id)
+ dev.reason = ''
+ self._handler.adapter_agent.update_device(dev)
+ self._mib_downloaded = True
+ self._mib_download_task = None
+
+ def failure(reason):
+ self.log.error('mib-download-failure', reason=reason)
+ self._mib_download_task = None
+ dev = self._handler.adapter_agent.get_device(self._handler.device_id)
+ self._handler.adapter_agent.update_device(dev)
+ self._mib_download_deferred = reactor.callLater(_STARTUP_RETRY_WAIT,
+ self.capabilities_handler,
+ None, None)
+ if not self._mib_downloaded:
+ device = self._handler.adapter_agent.get_device(self._handler.device_id)
+ device.reason = 'Initial MIB Download'
+ self._handler.adapter_agent.update_device(device)
+ self._mib_download_task = AdtnMibDownloadTask(self.omci_agent,
+ self._handler)
+ if self._mib_download_task is not None:
+ self._mib_download_deferred = \
+ self._onu_omci_device.task_runner.queue_task(self._mib_download_task)
+ self._mib_download_deferred.addCallbacks(success, failure)
+
+ def onu_is_reachable(self, _topic, msg):
+ """
+ Reach-ability change event
+ :param _topic: (str) subscription topic, not used
+ :param msg: (dict) 'connected' key holds True if reachable
+ """
+ from voltha.extensions.omci.omci_cc import CONNECTED_KEY
+ if self._connectivity_subscription is not None:
+ try:
+ connected = msg[CONNECTED_KEY]
+
+ # TODO: For now, only care about the first connect occurrence.
+ # Later we could use this for a heartbeat, but may want some hysteresis
+ # Cancel any 'reachable' subscriptions
+ if connected:
+ evt_bus = self._onu_omci_device.omci_cc.event_bus
+ evt_bus.unsubscribe(self._connectivity_subscription)
+ self._connectivity_subscription = None
+ self._connected = True
+
+ device = self._handler.adapter_agent.get_device(self._handler.device_id)
+ device.oper_status = OperStatus.ACTIVE
+ device.connect_status = ConnectStatus.REACHABLE
+ self._handler.adapter_agent.update_device(device)
+
+ except Exception as e:
+ self.log.exception('onu-reachable', e=e)