# blob: 8469a3f6b66f68d5bfadaae2649e45a25a51b99b [file] [log] [blame]
# Copyright 2018-present Adtran, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import structlog
from twisted.internet.defer import inlineCallbacks, returnValue, TimeoutError
from twisted.internet import reactor
from voltha.protos.device_pb2 import Image
from voltha.protos.common_pb2 import OperStatus, ConnectStatus
from voltha.extensions.omci.onu_configuration import OMCCVersion
from omci_entities import onu_custom_me_entities
from voltha.extensions.omci.omci_me import *
# Delay handed to reactor.callLater() before a failed startup step (device-info
# load from the MIB, or the MIB download) is attempted again.
_STARTUP_RETRY_WAIT = 5
# abbreviations
# NOTE(review): EntityOperations is not imported by name in this file; presumably
# it is provided by the wildcard import of voltha.extensions.omci.omci_me -- confirm.
OP = EntityOperations
class OMCI(object):
    """
    OpenOMCI Support

    Registers the handler's ONU with the OpenOMCI agent and, while enabled,
    subscribes to the device entry's MIB-sync, capabilities and connectivity
    events. Information read from the ONU MIB is copied into the device record
    held by the handler's adapter agent.
    """
    DEFAULT_UNTAGGED_VLAN = 4091    # To be equivalent to BroadCom Defaults

    def __init__(self, handler, omci_agent):
        """
        :param handler: ONU handler. This class reads its device_id, adapter_agent,
                        adapter.adtran_omci, pon_ports and uni_ports attributes.
        :param omci_agent: OpenOMCI agent; used to add (and later remove) the
                           ONU device entry for handler.device_id
        """
        self.log = structlog.get_logger(device_id=handler.device_id)
        self._handler = handler
        self._openomci_agent = omci_agent
        self._enabled = False
        self._connected = False
        self._deferred = None               # Pending delayed call of _mib_in_sync
        self._bridge_initialized = False
        self._in_sync_reached = False
        self._omcc_version = OMCCVersion.Unknown
        self._total_tcont_count = 0         # From ANI-G ME
        self._qos_flexibility = 0           # From ONT2_G ME

        self._in_sync_subscription = None
        self._connectivity_subscription = None
        self._capabilities_subscription = None

        # self._service_downloaded = False
        self._mib_downloaded = False
        self._mib_download_task = None
        # Either the deferred of a queued download task or the delayed call
        # that will retry the download
        self._mib_download_deferred = None

        self._onu_omci_device = omci_agent.add_device(handler.device_id,
                                                      handler.adapter_agent,
                                                      custom_me_map=onu_custom_me_entities(),
                                                      support_classes=handler.adapter.adtran_omci)

    def __str__(self):
        return "OMCI"

    @property
    def omci_agent(self):
        """The OpenOMCI agent passed to the constructor (None after delete())"""
        return self._openomci_agent

    @property
    def omci_cc(self):
        """OMCI communications channel of the device entry, or None without one"""
        # TODO: Decrement access to Communications channel at this point?  What about current PM stuff?
        return self.onu_omci_device.omci_cc if self._onu_omci_device is not None else None

    def receive_message(self, msg):
        """
        Pass a received OMCI frame to the communications channel. Frames that
        arrive while this object is disabled are dropped.

        :param msg: received OMCI message, forwarded unchanged
        """
        if self.enabled:
            # TODO: Have OpenOMCI actually receive the messages
            self.omci_cc.receive_message(msg)

    def _start(self):
        """Subscribe to events, start the device entry and flag 'Performing MIB Upload'"""
        self._cancel_deferred()

        # Subscriber to events of interest in OpenOMCI
        self._subscribe_to_events()
        self._onu_omci_device.start()

        device = self._handler.adapter_agent.get_device(self._handler.device_id)
        device.reason = 'Performing MIB Upload'
        self._handler.adapter_agent.update_device(device)

        if self._onu_omci_device.mib_db_in_sync:
            # Already in sync, so no in-sync event is awaited; load device info now
            self._schedule_mib_in_sync(0)

    def _stop(self):
        """Cancel pending work, drop the subscriptions and stop the device entry"""
        self._cancel_deferred()

        # Unsubscribe to OpenOMCI Events
        self._unsubscribe_to_events()
        self._onu_omci_device.stop()        # Will also cancel any running tasks/state-machines

        self._mib_downloaded = False
        self._mib_download_task = None
        self._bridge_initialized = False
        self._in_sync_reached = False

    def _cancel_call(self, d):
        """
        Best-effort cancel of a deferred or delayed call.

        :param d: object with 'called' and 'cancel()', or None
        """
        if d is None:
            return
        try:
            if not d.called:
                d.cancel()
        except Exception as e:
            # Cancelling is best effort; record the problem rather than hide it
            self.log.debug('cancel-failed', e=e)

    def _cancel_deferred(self):
        """Cancel and forget both the pending in-sync call and the MIB download"""
        # The attributes are cleared before cancelling so that callbacks fired
        # by the cancellation see that nothing is pending any more.
        d1, self._deferred = self._deferred, None
        d2, self._mib_download_deferred = self._mib_download_deferred, None

        for d in (d1, d2):
            self._cancel_call(d)

    def _schedule_mib_in_sync(self, delay):
        """
        Schedule _mib_in_sync, replacing any run that is still pending. The
        delayed call is kept in self._deferred so that _cancel_deferred()
        (and therefore _stop()/delete()) can cancel it.

        :param delay: value passed to reactor.callLater() as the delay
        """
        pending, self._deferred = self._deferred, None
        self._cancel_call(pending)
        self._deferred = reactor.callLater(delay, self._mib_in_sync)

    def delete(self):
        """
        Disable this object and remove the ONU from the OpenOMCI agent.
        Calling it again after the first call does nothing.
        """
        self.enabled = False

        agent, self._openomci_agent = self._openomci_agent, None
        handler, self._handler = self._handler, None
        self._onu_omci_device = None

        if agent is not None and handler is not None:
            agent.remove_device(handler.device_id, cleanup=True)

    @property
    def enabled(self):
        return self._enabled

    @enabled.setter
    def enabled(self, value):
        # Only an actual change of state starts or stops anything
        if self._enabled != value:
            self._enabled = value

            if value:
                self._start()
            else:
                self._stop()

    @property
    def connected(self):
        """True once a 'connected' connectivity event has been handled"""
        return self._connected

    @property
    def onu_omci_device(self):
        """Device entry returned by the agent's add_device() (None after delete())"""
        return self._onu_omci_device

    def set_pm_config(self, pm_config):
        """
        Set PM interval configuration

        :param pm_config: (OnuPmIntervalMetrics) PM Interval configuration
        :return:
        """
        self.onu_omci_device.set_pm_config(pm_config)

    @staticmethod
    def _ani_g_tcont_count(ani_g, entity_id):
        """
        Look up the 'total-tcont-count' value for one ANI-G instance.

        :param ani_g: (dict) ANI-G entities, keyed by entity id
        :param entity_id: key of the ANI-G instance of interest
        :return: the stored value, or None when it cannot be found
        """
        # NOTE(review): assumes each ANI-G entry is a dict of attributes that
        # holds 'total-tcont-count' -- confirm against OnuConfiguration.
        entry = ani_g.get(entity_id)
        count = entry.get('total-tcont-count') if isinstance(entry, dict) else None

        if count is None:
            # Fall back to the original top-level lookup
            count = ani_g.get('total-tcont-count')
        return count

    def _mib_in_sync(self):
        """
        This method is ran whenever the ONU MIB database is in-sync. This is often after
        the initial MIB Upload during ONU startup, or after it has gone out-of-sync and
        then back in. This second case could be due a reboot of the ONU and a new version
        of firmware is running on the ONU hardware.

        The device is marked ACTIVE/REACHABLE and its MAC address, model and
        image list are filled in from the MIB. If that load fails for any reason,
        the failure is logged and another attempt is scheduled.
        """
        handler = self._handler
        if handler is None:
            # delete() has already released the handler; nothing left to update
            return

        self.log.info('mib-in-sync')

        device = handler.adapter_agent.get_device(handler.device_id)
        device.oper_status = OperStatus.ACTIVE
        device.connect_status = ConnectStatus.REACHABLE
        device.reason = ''
        handler.adapter_agent.update_device(device)

        omci_dev = self._onu_omci_device
        config = omci_dev.configuration

        # In Sync, we can register logical ports now. Ideally this could occur on
        # the first time we received a successful (no timeout) OMCI Rx response.
        try:
            device = handler.adapter_agent.get_device(handler.device_id)

            ani_g = config.ani_g_entities
            uni_g = config.uni_g_entities
            pon_ports = len(ani_g) if ani_g is not None else 0
            uni_ports = len(uni_g) if uni_g is not None else 0

            # For the UNI ports below, they are created after the MIB Sync event occurs
            # and the onu handler adds the ONU.
            # Explicit checks rather than 'assert', which is stripped under -O.
            if pon_ports != 1:
                raise ValueError('Expected one PON/ANI port, got {}'.format(pon_ports))

            if uni_ports != len(handler.uni_ports):
                raise ValueError('Expected {} UNI port(s), got {}'.format(len(handler.uni_ports),
                                                                          uni_ports))

            # Save entity_id of PON ports
            ani_entity_id = next(iter(ani_g))
            handler.pon_ports[0].entity_id = ani_entity_id

            self._total_tcont_count = self._ani_g_tcont_count(ani_g, ani_entity_id)
            self._qos_flexibility = config.qos_configuration_flexibility or 0
            self._omcc_version = config.omcc_version or OMCCVersion.Unknown

            # Management MAC address comes from the first integer-keyed
            # IP Host Config Data instance in the MIB
            host_info = omci_dev.query_mib(IpHostConfigData.class_id)
            mgmt_mac_address = next((host_info[inst].get('attributes').get('mac_address')
                                     for inst in host_info
                                     if isinstance(inst, int)), 'unknown')
            device.mac_address = str(mgmt_mac_address)
            device.model = str(config.version or 'unknown').rstrip('\0')

            equipment_id = config.equipment_id or " unknown unknown "
            eqpt_boot_version = str(equipment_id).rstrip('\0')
            # eqptId = eqpt_boot_version[:10]         # ie) BVMDZ10DRA
            boot_version = eqpt_boot_version[12:]     # ie) CML.D55~

            boot_image = Image(name='boot-code',
                               version=boot_version.rstrip('\0'),
                               is_active=False,
                               is_committed=True,
                               is_valid=True,
                               install_datetime='Not Available',
                               hash='Not Available')
            images = [boot_image] + list(config.software_images or [])

            del (device.images.image[:])       # Clear previous entries
            device.images.image.extend(images)

            # Save our device information
            handler.adapter_agent.update_device(device)

            # Start MIB download  TODO: This will be replaced with a MIB Download task soon
            self._in_sync_reached = True

        except Exception as e:
            self.log.exception('device-info-load', e=e)
            self._schedule_mib_in_sync(_STARTUP_RETRY_WAIT)

    def _subscribe_to_events(self):
        """Subscribe to the MIB-sync, capabilities and connectivity events of this ONU"""
        from voltha.extensions.omci.onu_device_entry import OnuDeviceEvents, \
            OnuDeviceEntry
        from voltha.extensions.omci.omci_cc import OMCI_CC, OmciCCRxEvents

        # OMCI MIB Database sync status
        bus = self._onu_omci_device.event_bus
        topic = OnuDeviceEntry.event_bus_topic(self._handler.device_id,
                                               OnuDeviceEvents.MibDatabaseSyncEvent)
        self._in_sync_subscription = bus.subscribe(topic, self.in_sync_handler)

        # OMCI Capabilities (MEs and Message Types)
        bus = self._onu_omci_device.event_bus
        topic = OnuDeviceEntry.event_bus_topic(self._handler.device_id,
                                               OnuDeviceEvents.OmciCapabilitiesEvent)
        self._capabilities_subscription = bus.subscribe(topic, self.capabilities_handler)

        # OMCI-CC Connectivity Events (for reachability/heartbeat)
        bus = self._onu_omci_device.omci_cc.event_bus
        topic = OMCI_CC.event_bus_topic(self._handler.device_id,
                                        OmciCCRxEvents.Connectivity)
        self._connectivity_subscription = bus.subscribe(topic, self.onu_is_reachable)

        # TODO: Watch for any MIB RESET events or detection of an ONU reboot.
        #       If it occurs, set _service_downloaded and _mib_download to false
        #       and make sure that we get 'new' capabilities

    def _unsubscribe_to_events(self):
        """Drop whichever of the three event subscriptions are still held"""
        insync, self._in_sync_subscription = self._in_sync_subscription, None
        connect, self._connectivity_subscription = self._connectivity_subscription, None
        caps, self._capabilities_subscription = self._capabilities_subscription, None

        if insync is not None:
            bus = self._onu_omci_device.event_bus
            bus.unsubscribe(insync)

        if connect is not None:
            bus = self._onu_omci_device.omci_cc.event_bus
            bus.unsubscribe(connect)

        if caps is not None:
            bus = self._onu_omci_device.event_bus
            bus.unsubscribe(caps)

    def in_sync_handler(self, _topic, msg):
        """
        MIB database sync-status event

        :param _topic: (str) subscription topic, not used
        :param msg: (dict) the IN_SYNC_KEY entry is truthy when the MIB is in sync
        """
        if self._in_sync_subscription is None:
            return

        try:
            from voltha.extensions.omci.onu_device_entry import IN_SYNC_KEY

            if msg[IN_SYNC_KEY]:
                # Start up device_info load from MIB DB. The call is tracked
                # so that _stop()/delete() can cancel it.
                self._schedule_mib_in_sync(0)
            else:
                # Cancel any running/scheduled MIB download task
                d, self._mib_download_deferred = self._mib_download_deferred, None
                self._cancel_call(d)

        except Exception as e:
            self.log.exception('in-sync', e=e)

    def capabilities_handler(self, _topic, _msg):
        """
        This event occurs after an ONU reaches the In-Sync state and the OMCI ME has
        been queried for supported ME and message types.

        At this point, we can act upon any download device and/or service Technology
        profiles (when they exist).  For now, just run our somewhat fixed script

        :param _topic: (str) subscription topic, not used
        :param _msg: event message, not used
        """
        if self._capabilities_subscription is None:
            return

        from adtn_mib_download_task import AdtnMibDownloadTask
        self._mib_download_task = None

        def success(_results):
            dev = self._handler.adapter_agent.get_device(self._handler.device_id)
            dev.reason = ''
            self._handler.adapter_agent.update_device(dev)
            self._mib_downloaded = True
            self._mib_download_task = None

        def failure(reason):
            self.log.error('mib-download-failure', reason=reason)
            self._mib_download_task = None

            handler = self._handler
            if handler is None:
                return      # deleted while the download was outstanding

            # NOTE(review): the device is written back unchanged here; presumably
            # a failure 'reason' was meant to be set first -- confirm.
            dev = handler.adapter_agent.get_device(handler.device_id)
            handler.adapter_agent.update_device(dev)

            if self._mib_download_deferred is None:
                # _cancel_deferred() and in_sync_handler() clear this attribute
                # before cancelling, so None here means the download was cancelled
                # on purpose. Scheduling a retry would undo that cancellation.
                return

            self._mib_download_deferred = reactor.callLater(_STARTUP_RETRY_WAIT,
                                                            self.capabilities_handler,
                                                            None, None)

        if not self._mib_downloaded:
            device = self._handler.adapter_agent.get_device(self._handler.device_id)
            device.reason = 'Initial MIB Download'
            self._handler.adapter_agent.update_device(device)
            self._mib_download_task = AdtnMibDownloadTask(self.omci_agent,
                                                          self._handler)

        if self._mib_download_task is not None:
            self._mib_download_deferred = \
                self._onu_omci_device.task_runner.queue_task(self._mib_download_task)
            self._mib_download_deferred.addCallbacks(success, failure)

    def onu_is_reachable(self, _topic, msg):
        """
        Reach-ability change event

        :param _topic: (str) subscription topic, not used
        :param msg: (dict) 'connected' key holds True if reachable
        """
        from voltha.extensions.omci.omci_cc import CONNECTED_KEY

        if self._connectivity_subscription is None:
            return

        try:
            connected = msg[CONNECTED_KEY]

            # TODO: For now, only care about the first connect occurrence.
            # Later we could use this for a heartbeat, but may want some hysteresis
            # Cancel any 'reachable' subscriptions
            if connected:
                evt_bus = self._onu_omci_device.omci_cc.event_bus
                evt_bus.unsubscribe(self._connectivity_subscription)
                self._connectivity_subscription = None
                self._connected = True

                device = self._handler.adapter_agent.get_device(self._handler.device_id)
                device.oper_status = OperStatus.ACTIVE
                device.connect_status = ConnectStatus.REACHABLE
                self._handler.adapter_agent.update_device(device)

        except Exception as e:
            self.log.exception('onu-reachable', e=e)