blob: c13739e15feb346e5213aa956eecc35c7112778c [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import structlog
from transitions import Machine
from twisted.internet import reactor
from voltha.extensions.omci.onu_device_entry import OnuDeviceEntry, OnuDeviceEvents, IN_SYNC_KEY
from voltha.protos.omci_mib_db_pb2 import OpenOmciEventType
class OnuOmciCapabilities(object):
"""
OpenOMCI ONU OMCI Capabilities State machine
"""
DEFAULT_STATES = ['disabled', 'out_of_sync', 'in_sync', 'idle']
DEFAULT_TRANSITIONS = [
{'trigger': 'start', 'source': 'disabled', 'dest': 'out_of_sync'},
{'trigger': 'synchronized', 'source': 'out_of_sync', 'dest': 'in_sync'},
{'trigger': 'success', 'source': 'in_sync', 'dest': 'idle'},
{'trigger': 'failure', 'source': 'in_sync', 'dest': 'out_of_sync'},
{'trigger': 'not_synchronized', 'source': 'idle', 'dest': 'out_of_sync'},
# Do wildcard 'stop' trigger last so it covers all previous states
{'trigger': 'stop', 'source': '*', 'dest': 'disabled'},
]
DEFAULT_RETRY = 10 # Seconds to delay after task failure/timeout/poll
def __init__(self, agent, device_id, tasks,
advertise_events=False,
states=DEFAULT_STATES,
transitions=DEFAULT_TRANSITIONS,
initial_state='disabled',
timeout_delay=DEFAULT_RETRY):
"""
Class initialization
:param agent: (OpenOmciAgent) Agent
:param device_id: (str) ONU Device ID
:param tasks: (dict) Tasks to run
:param advertise_events: (bool) Advertise events on OpenOMCI Event Bus
:param states: (list) List of valid states
:param transitions: (dict) Dictionary of triggers and state changes
:param initial_state: (str) Initial state machine state
:param timeout_delay: (int/float) Number of seconds after a timeout or poll
"""
self.log = structlog.get_logger(device_id=device_id)
self._agent = agent
self._device_id = device_id
self._device = None
self._timeout_delay = timeout_delay
self._get_capabilities_task = tasks['get-capabilities']
self._advertise_events = advertise_events
self._deferred = None
self._current_task = None
self._task_deferred = None
self._supported_entities = frozenset()
self._supported_msg_types = frozenset()
self._subscriptions = { # RxEvent.enum -> Subscription Object
OnuDeviceEvents.MibDatabaseSyncEvent: None
}
self._sub_mapping = {
OnuDeviceEvents.MibDatabaseSyncEvent: self.on_mib_sync_event
}
# Statistics and attributes
# TODO: add any others if it will support problem diagnosis
# Set up state machine to manage states
self.machine = Machine(model=self, states=states,
transitions=transitions,
initial=initial_state,
queued=True,
name='{}-{}'.format(self.__class__.__name__,
device_id))
def _cancel_deferred(self):
d1, self._deferred = self._deferred, None
d2, self._task_deferred = self._task_deferred, None
for d in [d1, d2]:
try:
if d is not None and not d.called:
d.cancel()
except:
pass
def _cancel_tasks(self):
task, self._current_task = self._current_task, None
if task is not None:
task.stop()
def __str__(self):
return 'OnuOmciCapabilities: Device ID: {}, State:{}'.format(self._device_id, self.state)
def delete(self):
"""
Cleanup any state information
"""
self.stop()
@property
def device_id(self):
return self._device_id
@property
def supported_managed_entities(self):
"""
Return a set of the Managed Entity class IDs supported on this ONU
None is returned if no MEs have been discovered
:return: (set of ints)
"""
return self._supported_entities if len(self._supported_entities) else None
@property
def supported_message_types(self):
"""
Return a set of the Message Types supported on this ONU
None is returned if no message types have been discovered
:return: (set of EntityOperations)
"""
return self._supported_msg_types if len(self._supported_msg_types) else None
@property
def advertise_events(self):
return self._advertise_events
@advertise_events.setter
def advertise_events(self, value):
if not isinstance(value, bool):
raise TypeError('Advertise event is a boolean')
self._advertise_events = value
def advertise(self, event, info):
"""Advertise an event on the OpenOMCI event bus"""
from datetime import datetime
if self._advertise_events:
self._agent.advertise(event,
{
'state-machine': self.machine.name,
'info': info,
'time': str(datetime.utcnow())
})
def on_enter_disabled(self):
"""
State machine is being stopped
"""
self.advertise(OpenOmciEventType.state_change, self.state)
self._cancel_deferred()
self._cancel_tasks()
self._supported_entities = frozenset()
self._supported_msg_types = frozenset()
# Drop Response and Autonomous notification subscriptions
for event, sub in self._subscriptions.iteritems():
if sub is not None:
self._subscriptions[event] = None
self._device.event_bus.unsubscribe(sub)
def on_enter_out_of_sync(self):
"""
State machine has just started or the MIB database has transitioned
to an out-of-synchronization state
"""
self.advertise(OpenOmciEventType.state_change, self.state)
self._cancel_deferred()
self._device = self._agent.get_device(self._device_id)
# Subscribe to events of interest
try:
for event, sub in self._sub_mapping.iteritems():
if self._subscriptions[event] is None:
self._subscriptions[event] = \
self._device.event_bus.subscribe(
topic=OnuDeviceEntry.event_bus_topic(self._device_id,
event),
callback=sub)
except Exception as e:
self.log.exception('subscription-setup', e=e)
# Periodically check/poll for in-sync in case subscription was missed or
# already in sync
self._deferred = reactor.callLater(0, self.check_in_sync)
def check_in_sync(self):
if self._device.mib_db_in_sync:
self.synchronized()
else:
self._deferred = reactor.callLater(self._timeout_delay,
self.check_in_sync)
def on_enter_in_sync(self):
"""
State machine has just transitioned to an in-synchronization state
"""
self.advertise(OpenOmciEventType.state_change, self.state)
self._cancel_deferred()
def success(results):
self.log.debug('capabilities-success', results=results)
self._supported_entities = self._current_task.supported_managed_entities
self._supported_msg_types = self._current_task.supported_message_types
self._current_task = None
self._deferred = reactor.callLater(0, self.success)
def failure(reason):
self.log.info('capabilities-failure', reason=reason)
self._supported_entities = frozenset()
self._supported_msg_types = frozenset()
self._current_task = None
self._deferred = reactor.callLater(self._timeout_delay, self.failure)
# Schedule a task to read the ONU's OMCI capabilities
self._current_task = self._get_capabilities_task(self._agent, self._device_id)
self._task_deferred = self._device.task_runner.queue_task(self._current_task)
self._task_deferred.addCallbacks(success, failure)
def on_enter_idle(self):
"""
Notify any subscribers for a capabilities event and wait until
stopped or ONU MIB database goes out of sync
"""
self.advertise(OpenOmciEventType.state_change, self.state)
self._cancel_deferred()
self._device.publish_omci_capabilities_event()
def on_mib_sync_event(self, _topic, msg):
"""
Handle In-Sync/Out-of-Sync for the MIB database
:param _topic: (str) Subscription topic
:param msg: (dict) In-Sync event data
"""
if self._subscriptions.get(OnuDeviceEvents.MibDatabaseSyncEvent) is None:
return
if msg[IN_SYNC_KEY]:
self.synchronized()
else:
self.not_synchronized()