blob: 52de1f44589db7560bc959076b0259ddc02740ee [file] [log] [blame]
#
# Copyright 2018 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import structlog
from copy import deepcopy
from voltha_protos.device_pb2 import ImageDownload
from pyvoltha.adapters.extensions.omci.omci_defs import EntityOperations, ReasonCodes
import pyvoltha.adapters.extensions.omci.omci_entities as omci_entities
from pyvoltha.adapters.extensions.omci.omci_cc import OMCI_CC
from pyvoltha.common.event_bus import EventBusClient
from pyvoltha.adapters.extensions.omci.tasks.task_runner import TaskRunner
from pyvoltha.adapters.extensions.omci.onu_configuration import OnuConfiguration
from pyvoltha.adapters.extensions.omci.tasks.reboot_task import OmciRebootRequest, RebootFlags
from pyvoltha.adapters.extensions.omci.tasks.omci_modify_request import OmciModifyRequest
from pyvoltha.adapters.extensions.omci.omci_me import OntGFrame
from pyvoltha.adapters.extensions.omci.state_machines.image_agent import ImageAgent
from twisted.internet import reactor, defer
from enum import IntEnum
OP = EntityOperations
RC = ReasonCodes
ACTIVE_KEY = 'active'
IN_SYNC_KEY = 'in-sync'
LAST_IN_SYNC_KEY = 'last-in-sync-time'
SUPPORTED_MESSAGE_ENTITY_KEY = 'managed-entities'
SUPPORTED_MESSAGE_TYPES_KEY = 'message-type'
class OnuDeviceEvents(IntEnum):
# Events of interest to Device Adapters and OpenOMCI State Machines
DeviceStatusEvent = 0 # OnuDeviceEntry running status changed
MibDatabaseSyncEvent = 1 # MIB database sync changed
OmciCapabilitiesEvent = 2 # OMCI ME and message type capabilities
AlarmDatabaseSyncEvent = 3 # Alarm database sync changed
# TODO: Add other events here as needed
class OnuDeviceEntry(object):
"""
An ONU Device entry in the MIB
"""
def __init__(self, omci_agent, device_id, core_proxy, adapter_proxy, custom_me_map,
mib_db, alarm_db, support_classes, clock=None):
"""
Class initializer
:param omci_agent: (OpenOMCIAgent) Reference to OpenOMCI Agent
:param device_id: (str) ONU Device ID
:param core_proxy: (CoreProxy) Remote API to VOLTHA Core
:param adapter_proxy: (AdapterProxy) Remote API to other Adapters via VOLTHA Core
:param custom_me_map: (dict) Additional/updated ME to add to class map
:param mib_db: (MibDbApi) MIB Database reference
:param alarm_db: (MibDbApi) Alarm Table/Database reference
:param support_classes: (dict) State machines and tasks for this ONU
"""
self.log = structlog.get_logger(device_id=device_id)
self._started = False
self._omci_agent = omci_agent # OMCI AdapterAgent
self._device_id = device_id # ONU Device ID
self._core_proxy = core_proxy
self._adapter_proxy = adapter_proxy
self._runner = TaskRunner(device_id, clock=clock) # OMCI_CC Task runner
self._deferred = None
# self._img_download_deferred = None # deferred of image file download from server
self._omci_upgrade_deferred = None # deferred of ONU OMCI upgrading procedure
self._omci_activate_deferred = None # deferred of ONU OMCI Softwre Image Activate
self._img_deferred = None # deferred returned to caller of do_onu_software_download
self._first_in_sync = False
self._first_capabilities = False
self._timestamp = None
# self._image_download = None # (voltha_pb2.ImageDownload)
self.reactor = clock if clock is not None else reactor
# OMCI related databases are on a per-agent basis. State machines and tasks
# are per ONU Vendor
#
self._support_classes = support_classes
self._configuration = None
try:
# MIB Synchronization state machine
self._mib_db_in_sync = False
mib_synchronizer_info = support_classes.get('mib-synchronizer')
advertise = mib_synchronizer_info['advertise-events']
self._mib_sync_sm = mib_synchronizer_info['state-machine'](self._omci_agent,
device_id,
mib_synchronizer_info['tasks'],
mib_db,
advertise_events=advertise)
# ONU OMCI Capabilities state machine
capabilities_info = support_classes.get('omci-capabilities')
advertise = capabilities_info['advertise-events']
self._capabilities_sm = capabilities_info['state-machine'](self._omci_agent,
device_id,
capabilities_info['tasks'],
advertise_events=advertise)
# ONU Performance Monitoring Intervals state machine
interval_info = support_classes.get('performance-intervals')
advertise = interval_info['advertise-events']
self._pm_intervals_sm = interval_info['state-machine'](self._omci_agent, device_id,
interval_info['tasks'],
advertise_events=advertise)
# ONU ALARM Synchronization state machine
self._alarm_db_in_sync = False
alarm_synchronizer_info = support_classes.get('alarm-synchronizer')
advertise = alarm_synchronizer_info['advertise-events']
self._alarm_sync_sm = alarm_synchronizer_info['state-machine'](self._omci_agent,
device_id,
alarm_synchronizer_info['tasks'],
alarm_db,
advertise_events=advertise)
# State machine of downloading image file from server
downloader_info = support_classes.get('image_downloader')
image_upgrader_info = support_classes.get('image_upgrader')
# image_activate_info = support_classes.get('image_activator')
advertise = downloader_info['advertise-event']
# self._img_download_sm = downloader_info['state-machine'](self._omci_agent, device_id,
# downloader_info['tasks'],
# advertise_events=advertise)
self._image_agent = ImageAgent(self._omci_agent, device_id,
downloader_info['state-machine'], downloader_info['tasks'],
image_upgrader_info['state-machine'], image_upgrader_info['tasks'],
# image_activate_info['state-machine'],
advertise_events=advertise, clock=clock)
# self._omci_upgrade_sm = image_upgrader_info['state-machine'](device_id, advertise_events=advertise)
except Exception as e:
self.log.exception('state-machine-create-failed', e=e)
raise
# Put state machines in the order you wish to start them
self._state_machines = []
self._on_start_state_machines = [ # Run when 'start()' called
self._mib_sync_sm,
self._capabilities_sm,
]
self._on_sync_state_machines = [ # Run after first in_sync event
self._alarm_sync_sm,
]
self._on_capabilities_state_machines = [ # Run after first capabilities events
self._pm_intervals_sm
]
self._custom_me_map = custom_me_map
self._me_map = omci_entities.entity_id_to_class_map.copy()
if custom_me_map is not None:
self._me_map.update(custom_me_map)
self.event_bus = EventBusClient()
# Create OMCI communications channel
self._omci_cc = OMCI_CC(core_proxy, adapter_proxy, self.device_id, self._me_map, clock=clock)
@staticmethod
def event_bus_topic(device_id, event):
"""
Get the topic name for a given event for this ONU Device
:param device_id: (str) ONU Device ID
:param event: (OnuDeviceEvents) Type of event
:return: (str) Topic string
"""
assert event in OnuDeviceEvents, \
'Event {} is not an ONU Device Event'.format(event.name)
return 'omci-device:{}:{}'.format(device_id, event.name)
@property
def device_id(self):
return self._device_id
@property
def omci_cc(self):
return self._omci_cc
@property
def core_proxy(self):
return self._core_proxy
@property
def task_runner(self):
return self._runner
@property
def mib_synchronizer(self):
"""
Reference to the OpenOMCI MIB Synchronization state machine for this ONU
"""
return self._mib_sync_sm
@property
def omci_capabilities(self):
"""
Reference to the OpenOMCI OMCI Capabilities state machine for this ONU
"""
return self._capabilities_sm
@property
def pm_intervals_state_machine(self):
"""
Reference to the OpenOMCI PM Intervals state machine for this ONU
"""
return self._pm_intervals_sm
def set_pm_config(self, pm_config):
"""
Set PM interval configuration
:param pm_config: (OnuPmIntervalMetrics) PM Interval configuration
"""
self._pm_intervals_sm.set_pm_config(pm_config)
@property
def timestamp(self):
"""Pollable Metrics last collected timestamp"""
return self._timestamp
@timestamp.setter
def timestamp(self, value):
self._timestamp = value
@property
def alarm_synchronizer(self):
"""
Reference to the OpenOMCI Alarm Synchronization state machine for this ONU
"""
return self._alarm_sync_sm
@property
def active(self):
"""
Is the ONU device currently active/running
"""
return self._started
@property
def custom_me_map(self):
""" Vendor-specific Managed Entity Map for this vendor's device"""
return self._custom_me_map
@property
def me_map(self):
""" Combined ME and Vendor-specific Managed Entity Map for this device"""
return self._me_map
def _cancel_deferred(self):
d, self._deferred = self._deferred, None
try:
if d is not None and not d.called:
d.cancel()
except:
pass
@property
def mib_db_in_sync(self):
return self._mib_db_in_sync
@mib_db_in_sync.setter
def mib_db_in_sync(self, value):
if self._mib_db_in_sync != value:
# Save value
self._mib_db_in_sync = value
# Start up other state machines if needed
if self._first_in_sync:
self.first_in_sync_event()
# Notify any event listeners
topic = OnuDeviceEntry.event_bus_topic(self.device_id,
OnuDeviceEvents.MibDatabaseSyncEvent)
msg = {
IN_SYNC_KEY: self._mib_db_in_sync,
LAST_IN_SYNC_KEY: self.mib_synchronizer.last_mib_db_sync
}
self.event_bus.publish(topic=topic, msg=msg)
@property
def alarm_db_in_sync(self):
return self._alarm_db_in_sync
@alarm_db_in_sync.setter
def alarm_db_in_sync(self, value):
if self._alarm_db_in_sync != value:
# Save value
self._alarm_db_in_sync = value
# Start up other state machines if needed
if self._first_in_sync:
self.first_in_sync_event()
# Notify any event listeners
topic = OnuDeviceEntry.event_bus_topic(self.device_id,
OnuDeviceEvents.AlarmDatabaseSyncEvent)
msg = {
IN_SYNC_KEY: self._alarm_db_in_sync
}
self.event_bus.publish(topic=topic, msg=msg)
@property
def configuration(self):
"""
Get the OMCI Configuration object for this ONU. This is a class that provides some
common database access functions for ONU capabilities and read-only configuration values.
:return: (OnuConfiguration)
"""
return self._configuration
@property
def image_agent(self):
return self._image_agent
# @property
# def image_download(self):
# return self._image_download
def start(self):
"""
Start the ONU Device Entry state machines
"""
self.log.debug('OnuDeviceEntry.start', previous=self._started)
if self._started:
return
self._started = True
self._omci_cc.enabled = True
self._first_in_sync = True
self._first_capabilities = True
self._runner.start()
self._configuration = OnuConfiguration(self._omci_agent, self._device_id)
# Start MIB Sync and other state machines that can run before the first
# MIB Synchronization event occurs. Start 'later' so that any
# ONU Device, OMCI DB, OMCI Agent, and others are fully started before
# performing the start.
self._state_machines = []
def start_state_machines(machines):
for sm in machines:
self._state_machines.append(sm)
sm.start()
self._deferred = reactor.callLater(1, start_state_machines,
self._on_start_state_machines)
# Notify any event listeners
self._publish_device_status_event()
def stop(self):
"""
Stop the ONU Device Entry state machines
"""
if not self._started:
return
self._started = False
self._cancel_deferred()
self._omci_cc.enabled = False
# Halt MIB Sync and other state machines
for sm in self._state_machines:
sm.stop()
self._state_machines = []
# Stop task runner
self._runner.stop()
# Notify any event listeners
self._publish_device_status_event()
def first_in_sync_event(self):
"""
This event is called on the first MIB synchronization event after
OpenOMCI has been started. It is responsible for starting any
other state machine and to initiate an ONU Capabilities report
"""
if self._first_in_sync:
self._first_in_sync = False
# Start up the ONU Capabilities task
self._configuration.reset()
# Insure that the ONU-G Administrative lock is disabled
def failure(reason):
self.log.error('disable-admin-state-lock', reason=reason)
frame = OntGFrame(attributes={'administrative_state': 0}).set()
task = OmciModifyRequest(self._omci_agent, self.device_id, frame)
self.task_runner.queue_task(task).addErrback(failure)
# Start up any other remaining OpenOMCI state machines
def start_state_machines(machines):
for sm in machines:
self._state_machines.append(sm)
reactor.callLater(0, sm.start)
self._deferred = reactor.callLater(0, start_state_machines,
self._on_sync_state_machines)
# if an ongoing upgrading is not accomplished, restart it
if self._img_deferred is not None:
self._image_agent.onu_bootup()
def first_in_capabilities_event(self):
"""
This event is called on the first capabilities event after
OpenOMCI has been started. It is responsible for starting any
other state machine. These are often state machines that have tasks
that are dependent upon knowing if various MEs are supported
"""
if self._first_capabilities:
self._first_capabilities = False
# Start up any other remaining OpenOMCI state machines
def start_state_machines(machines):
for sm in machines:
self._state_machines.append(sm)
reactor.callLater(0, sm.start)
self._deferred = reactor.callLater(0, start_state_machines,
self._on_capabilities_state_machines)
# def __on_omci_download_success(self, image_download):
# self.log.debug("__on_omci_download_success", image=image_download)
# self._omci_upgrade_deferred = None
# # self._ret_deferred = None
# self._omci_activate_deferred = self._image_agent.activate_onu_image(image_download.name)
# self._omci_activate_deferred.addCallbacks(self.__on_omci_image_activate_success,
# self.__on_omci_image_activate_fail, errbackArgs=(image_name,))
# return image_name
# def __on_omci_download_fail(self, fail, image_name):
# self.log.debug("__on_omci_download_fail", failure=fail, image_name=image_name)
# self.reactor.callLater(0, self._img_deferred.errback, fail)
# self._omci_upgrade_deferred = None
# self._img_deferred = None
def __on_omci_image_activate_success(self, image_name):
self.log.debug("__on_omci_image_activate_success", image_name=image_name)
self._omci_activate_deferred = None
self._img_deferred.callback(image_name)
self._img_deferred = None
return image_name
def __on_omci_image_activate_fail(self, fail, image_name):
self.log.debug("__on_omci_image_activate_fail", faile=fail, image_name=image_name)
self._omci_activate_deferred = None
self._img_deferred.errback(fail)
self._img_deferred = None
def _publish_device_status_event(self):
"""
Publish the ONU Device start/start status.
"""
topic = OnuDeviceEntry.event_bus_topic(self.device_id,
OnuDeviceEvents.DeviceStatusEvent)
msg = {ACTIVE_KEY: self._started}
self.event_bus.publish(topic=topic, msg=msg)
def publish_omci_capabilities_event(self):
"""
Publish the ONU Device start/start status.
"""
if self.first_in_capabilities_event:
self.first_in_capabilities_event()
topic = OnuDeviceEntry.event_bus_topic(self.device_id,
OnuDeviceEvents.OmciCapabilitiesEvent)
msg = {
SUPPORTED_MESSAGE_ENTITY_KEY: self.omci_capabilities.supported_managed_entities,
SUPPORTED_MESSAGE_TYPES_KEY: self.omci_capabilities.supported_message_types
}
self.event_bus.publish(topic=topic, msg=msg)
def delete(self):
"""
Stop the ONU Device's state machine and remove the ONU, and any related
OMCI state information from the OpenOMCI Framework
"""
self.stop()
self.mib_synchronizer.delete()
# OpenOMCI cleanup
if self._omci_agent is not None:
self._omci_agent.remove_device(self._device_id, cleanup=True)
def query_mib(self, class_id=None, instance_id=None, attributes=None):
"""
Get MIB database information.
This method can be used to request information from the database to the detailed
level requested
:param class_id: (int) Managed Entity class ID
:param instance_id: (int) Managed Entity instance
:param attributes: (list or str) Managed Entity instance's attributes
:return: (dict) The value(s) requested. If class/inst/attribute is
not found, an empty dictionary is returned
:raises DatabaseStateError: If the database is not enabled
"""
self.log.debug('query', class_id=class_id, instance_id=instance_id,
attributes=attributes)
return self.mib_synchronizer.query_mib(class_id=class_id, instance_id=instance_id,
attributes=attributes)
def query_mib_single_attribute(self, class_id, instance_id, attribute):
"""
Get MIB database information for a single specific attribute
This method can be used to request information from the database to the detailed
level requested
:param class_id: (int) Managed Entity class ID
:param instance_id: (int) Managed Entity instance
:param attribute: (str) Managed Entity instance's attribute
:return: (varies) The value requested. If class/inst/attribute is
not found, None is returned
:raises DatabaseStateError: If the database is not enabled
"""
self.log.debug('query-single', class_id=class_id,
instance_id=instance_id, attributes=attribute)
assert isinstance(attribute, basestring), \
'Only a single attribute value can be retrieved'
entry = self.mib_synchronizer.query_mib(class_id=class_id,
instance_id=instance_id,
attributes=attribute)
return entry[attribute] if attribute in entry else None
def query_alarm_table(self, class_id=None, instance_id=None):
"""
Get Alarm information
This method can be used to request information from the alarm database to
the detailed level requested
:param class_id: (int) Managed Entity class ID
:param instance_id: (int) Managed Entity instance
:return: (dict) The value(s) requested. If class/inst/attribute is
not found, an empty dictionary is returned
:raises DatabaseStateError: If the database is not enabled
"""
self.log.debug('query', class_id=class_id, instance_id=instance_id)
return self.alarm_synchronizer.query_mib(class_id=class_id, instance_id=instance_id)
def reboot(self,
flags=RebootFlags.Reboot_Unconditionally,
timeout=OmciRebootRequest.DEFAULT_REBOOT_TIMEOUT):
"""
Request a reboot of the ONU
:param flags: (RebootFlags) Reboot condition
:param timeout: (int) Reboot task priority
:return: (deferred) Fires upon completion or error
"""
assert self.active, 'This device is not active'
return self.task_runner.queue_task(OmciRebootRequest(self._omci_agent,
self.device_id,
flags=flags,
timeout=timeout))
# def get_imagefile(self, local_name, local_dir, remote_url=None):
# """
# Return a Deferred that will be triggered if the file is locally available
# or downloaded successfully
# """
# self.log.info('start download from {}'.format(remote_url))
# # for debug purpose, start runner here to queue downloading task
# # self._runner.start()
# return self._image_agent.get_image(self._image_download)
def do_onu_software_download(self, image_dnld):
"""
image_dnld: (ImageDownload)
: Return a Deferred that will be triggered when upgrading results in success or failure
"""
self.log.debug('do_onu_software_download')
image_download = deepcopy(image_dnld)
# self._img_download_deferred = self._image_agent.get_image(self._image_download)
# self._img_download_deferred.addCallbacks(self.__on_download_success, self.__on_download_fail, errbackArgs=(self._image_download,))
# self._ret_deferred = defer.Deferred()
# return self._ret_deferred
return self._image_agent.get_image(image_download)
# def do_onu_software_switch(self):
def do_onu_image_activate(self, image_dnld_name):
"""
Return a Deferred that will be triggered when switching software image results in success or failure
"""
if self._img_deferred is None:
self.log.debug('do_onu_image_activate')
self._img_deferred = defer.Deferred()
self._omci_upgrade_deferred = self._image_agent.onu_omci_download(image_dnld_name)
self._omci_upgrade_deferred.addCallbacks(self.__on_omci_image_activate_success,
self.__on_omci_image_activate_fail, errbackArgs=(image_dnld_name,))
return self._img_deferred
def cancel_onu_software_download(self, image_name):
self.log.debug('cancel_onu_software_download')
self._image_agent.cancel_download_image(image_name)
self._image_agent.cancel_upgrade_onu()
if self._img_deferred and not self._img_deferred.called:
self._img_deferred.cancel()
self._img_deferred = None
# self._image_download = None
def get_image_download_status(self, image_name):
return self._image_agent.get_image_status(image_name)