# Source blob: d2572570c3fae3351f28a65242fbd8dbd7d2eed1
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import structlog
from datetime import datetime, timedelta
from transitions import Machine
from twisted.internet import reactor
from voltha.extensions.omci.omci_frame import OmciFrame
from voltha.extensions.omci.database.mib_db_api import MDS_KEY
from voltha.extensions.omci.omci_defs import EntityOperations, ReasonCodes, \
AttributeAccess
from voltha.extensions.omci.omci_cc import OmciCCRxEvents, OMCI_CC, TX_REQUEST_KEY, \
RX_RESPONSE_KEY
from voltha.extensions.omci.onu_device_entry import OnuDeviceEvents, OnuDeviceEntry, \
SUPPORTED_MESSAGE_ENTITY_KEY, SUPPORTED_MESSAGE_TYPES_KEY
from voltha.extensions.omci.omci_entities import OntData
from common.event_bus import EventBusClient
from voltha.protos.omci_mib_db_pb2 import OpenOmciEventType
RxEvent = OmciCCRxEvents
DevEvent = OnuDeviceEvents
OP = EntityOperations
RC = ReasonCodes
AA = AttributeAccess
class MibSynchronizer(object):
    """
    OpenOMCI MIB Synchronizer state machine

    The class-level tables below are the defaults handed to the
    'transitions' Machine when the constructor is not given its own
    'states' / 'transitions' arguments.
    """
    DEFAULT_STATES = [
        'disabled',
        'starting',
        'uploading',
        'examining_mds',
        'in_sync',
        'out_of_sync',
        'auditing',
        'resynchronizing',
    ]

    DEFAULT_TRANSITIONS = [
        # Start-up
        {'trigger': 'start', 'source': 'disabled', 'dest': 'starting'},
        {'trigger': 'upload_mib', 'source': 'starting', 'dest': 'uploading'},
        {'trigger': 'examine_mds', 'source': 'starting', 'dest': 'examining_mds'},

        # Results of the initial upload / MDS examination
        {'trigger': 'success', 'source': 'uploading', 'dest': 'in_sync'},
        {'trigger': 'success', 'source': 'examining_mds', 'dest': 'in_sync'},
        {'trigger': 'mismatch', 'source': 'examining_mds', 'dest': 'resynchronizing'},

        # Steady state: audits and reconciliation
        {'trigger': 'audit_mib', 'source': 'in_sync', 'dest': 'auditing'},
        {'trigger': 'success', 'source': 'out_of_sync', 'dest': 'in_sync'},
        {'trigger': 'audit_mib', 'source': 'out_of_sync', 'dest': 'auditing'},
        {'trigger': 'success', 'source': 'auditing', 'dest': 'in_sync'},
        {'trigger': 'mismatch', 'source': 'auditing', 'dest': 'resynchronizing'},
        {'trigger': 'force_resync', 'source': 'auditing', 'dest': 'resynchronizing'},
        {'trigger': 'success', 'source': 'resynchronizing', 'dest': 'in_sync'},
        {'trigger': 'diffs_found', 'source': 'resynchronizing', 'dest': 'out_of_sync'},

        # Do wildcard 'timeout' trigger that sends us back to start
        {'trigger': 'timeout', 'source': '*', 'dest': 'starting'},

        # Do wildcard 'stop' trigger last so it covers all previous states
        {'trigger': 'stop', 'source': '*', 'dest': 'disabled'},
    ]

    DEFAULT_TIMEOUT_RETRY = 5     # Seconds to delay after task failure/timeout
    DEFAULT_AUDIT_DELAY = 60      # Periodic tick to audit the MIB Data Sync
    DEFAULT_RESYNC_DELAY = 300    # Periodically force a resync
def __init__(self, agent, device_id, mib_sync_tasks, db,
             advertise_events=False,
             states=DEFAULT_STATES,
             transitions=DEFAULT_TRANSITIONS,
             initial_state='disabled',
             timeout_delay=DEFAULT_TIMEOUT_RETRY,
             audit_delay=DEFAULT_AUDIT_DELAY,
             resync_delay=DEFAULT_RESYNC_DELAY):
    """
    Class initialization

    :param agent: (OpenOmciAgent) Agent
    :param device_id: (str) ONU Device ID
    :param mib_sync_tasks: (dict) Tasks to run. Must hold the keys
                           'mib-upload', 'get-mds', 'mib-audit',
                           'mib-resync' and 'mib-reconcile'
    :param db: (MibDbVolatileDict) MIB Database
    :param advertise_events: (bool) Advertise events on OpenOMCI Event Bus
    :param states: (list) List of valid states
    :param transitions: (list) Trigger/source/dest dictionaries describing
                        the state changes
    :param initial_state: (str) Initial state machine state
    :param timeout_delay: (int/float) Number of seconds after a timeout to attempt
                          a retry (goes back to starting state)
    :param audit_delay: (int) Seconds between MIB audits while in sync. Set to
                        zero to disable audit. An operator can request
                        an audit manually by calling 'self.audit_mib'
    :param resync_delay: (int) Seconds in sync before performing a forced MIB
                         resynchronization
    """
    self.log = structlog.get_logger(device_id=device_id)

    # Collaborators and configuration
    self._agent = agent
    self._device_id = device_id
    self._device = None
    self._database = db
    self._timeout_delay = timeout_delay
    self._audit_delay = audit_delay
    self._resync_delay = resync_delay

    # Task factories, looked up in the same order as the keys are listed
    for attr_name, task_key in (('_upload_task', 'mib-upload'),
                                ('_get_mds_task', 'get-mds'),
                                ('_audit_task', 'mib-audit'),
                                ('_resync_task', 'mib-resync'),
                                ('_reconcile_task', 'mib-reconcile')):
        setattr(self, attr_name, mib_sync_tasks[task_key])

    # Run-time bookkeeping
    self._advertise_events = advertise_events
    self._deferred = None
    self._current_task = None  # TODO: Support multiple running tasks after v.2.0 release
    self._task_deferred = None
    self._mib_data_sync = 0
    self._last_mib_db_sync_value = None
    self._device_in_db = False
    self._next_resync = None

    # Results of the most recent audit/resync
    self._on_olt_only_diffs = None
    self._on_onu_only_diffs = None
    self._attr_diffs = None
    self._audited_olt_db = None
    self._audited_onu_db = None

    self._event_bus = EventBusClient()

    # RxEvent.enum -> handler to run when that OMCI-CC event is received
    self._omci_cc_sub_mapping = {
        RxEvent.MIB_Reset: self.on_mib_reset_response,
        RxEvent.AVC_Notification: self.on_avc_notification,
        RxEvent.MIB_Upload: self.on_mib_upload_response,
        RxEvent.MIB_Upload_Next: self.on_mib_upload_next_response,
        RxEvent.Create: self.on_create_response,
        RxEvent.Delete: self.on_delete_response,
        RxEvent.Set: self.on_set_response,
    }
    # RxEvent.enum -> Subscription Object (None while not subscribed)
    self._omci_cc_subscriptions = dict.fromkeys(self._omci_cc_sub_mapping)

    # DevEvent.enum -> handler to run for ONU device events
    self._onu_dev_sub_mapping = {
        DevEvent.OmciCapabilitiesEvent: self.on_capabilities_event
    }
    # DevEvent.enum -> Subscription Object (None while not subscribed)
    self._onu_dev_subscriptions = dict.fromkeys(self._onu_dev_sub_mapping)

    # Statistics and attributes
    # TODO: add any others if it will support problem diagnosis

    # Set up state machine to manage states
    machine_name = '{}-{}'.format(self.__class__.__name__, device_id)
    self.machine = Machine(model=self, states=states,
                           transitions=transitions,
                           initial=initial_state,
                           queued=True,
                           name=machine_name)
    try:
        # Raise the 'transitions' library logger to WARNING
        import logging
        logging.getLogger('transitions').setLevel(logging.WARNING)

    except Exception as e:
        self.log.exception('log-level-failed', e=e)
def _cancel_deferred(self):
    """
    Cancel any pending delayed call and any outstanding task deferred.

    Both '_deferred' and '_task_deferred' are cleared before cancellation is
    attempted. Cancellation is best-effort: an object that has already been
    called is skipped and any error raised by 'cancel()' is ignored.
    """
    pending = (self._deferred, self._task_deferred)
    self._deferred = None
    self._task_deferred = None

    for d in pending:
        if d is None:
            continue
        try:
            if not d.called:
                d.cancel()
        except Exception:
            # Best effort only; a failed cancel must not stop the other
            # object from being cancelled or break the caller.
            pass
def __str__(self):
    """Return a one-line summary with the device ID and the current state."""
    template = 'MIBSynchronizer: Device ID: {}, State:{}'
    return template.format(self._device_id, self.state)
def delete(self):
    """
    Cleanup any state information

    Fires the 'stop' trigger, detaches the database reference from this
    object and, if a database was attached, removes this device from it.
    """
    self.stop()

    database = self._database
    self._database = None
    if database is None:
        return

    database.remove(self._device_id)
@property
def device_id(self):
    """(str) ONU Device ID this synchronizer was constructed with"""
    return self._device_id
@property
def mib_data_sync(self):
    """The MIB Data Sync value currently held by this state machine"""
    return self._mib_data_sync
def increment_mib_data_sync(self):
    """
    Advance the local MIB Data Sync value by one.

    A value that would exceed 255 wraps back to zero. When a database is
    attached, the new value is saved for this device.
    """
    bumped = self._mib_data_sync + 1
    self._mib_data_sync = 0 if bumped > 255 else bumped

    database = self._database
    if database is None:
        return

    database.save_mib_data_sync(self._device_id, self._mib_data_sync)
@property
def last_mib_db_sync(self):
    """Value recorded at the last MIB database sync (None if never set)"""
    return self._last_mib_db_sync_value

@last_mib_db_sync.setter
def last_mib_db_sync(self, value):
    """Record the last-sync value and save it to the database, if one is attached"""
    self._last_mib_db_sync_value = value

    database = self._database
    if database is None:
        return

    database.save_last_sync(self.device_id, value)
@property
def is_new_onu(self):
    """
    Is this a new ONU (has never completed MIB synchronization)

    The decision rests only on 'last_mib_db_sync': an ONU with no recorded
    last-sync value (None) is treated as new.

    :return: (bool) True if this ONU should be considered new
    """
    return self.last_mib_db_sync is None
@property
def advertise_events(self):
    """(bool) True when events are advertised on the OpenOMCI event bus"""
    return self._advertise_events

@advertise_events.setter
def advertise_events(self, value):
    """
    Enable or disable event advertisement

    :param value: (bool) New setting
    :raises TypeError: If value is not a bool
    """
    if isinstance(value, bool):
        self._advertise_events = value
        return

    raise TypeError('Advertise event is a boolean')
def advertise(self, event, info):
    """
    Advertise an event on the OpenOMCI event bus

    Does nothing unless event advertisement is enabled. The payload passed
    to the agent carries the state machine name, the caller's info and the
    current UTC time rendered as a string.

    :param event: Event identifier passed through to the agent
    :param info: Event details passed through in the payload
    """
    if not self._advertise_events:
        return

    payload = {
        'state-machine': self.machine.name,
        'info': info,
        'time': str(datetime.utcnow())
    }
    self._agent.advertise(event, payload)
def on_enter_disabled(self):
    """
    State machine is being stopped

    Cancels pending deferreds, marks the device (if known) as not in sync,
    stops the current task and drops every active event subscription.
    """
    self.advertise(OpenOmciEventType.state_change, self.state)
    self._cancel_deferred()

    if self._device is not None:
        self._device.mib_db_in_sync = False

    running_task = self._current_task
    self._current_task = None
    if running_task is not None:
        running_task.stop()

    # Drop Response and Autonomous notification subscriptions
    omci_subs = self._omci_cc_subscriptions
    for rx_event, subscription in omci_subs.iteritems():
        if subscription is None:
            continue
        # Only the value of an existing key is replaced, so the dictionary
        # keeps its size while being iterated
        omci_subs[rx_event] = None
        self._device.omci_cc.event_bus.unsubscribe(subscription)

    dev_subs = self._onu_dev_subscriptions
    for dev_event, subscription in dev_subs.iteritems():
        if subscription is None:
            continue
        dev_subs[dev_event] = None
        self._device.event_bus.unsubscribe(subscription)

    # TODO: Stop and remove any currently running or scheduled tasks
    # TODO: Anything else?
def _seed_database(self):
    """
    Make sure this device has an entry in the MIB database.

    Runs at most once successfully; afterwards '_device_in_db' short-circuits
    the call. If adding the device raises KeyError the device is taken to be
    present already and its saved MIB Data Sync and last-sync values are
    loaded. Any other failure is logged and the seeding is retried on the
    next call.
    """
    if self._device_in_db:
        return

    try:
        try:
            self._database.start()
            self._database.add(self._device_id)
            self.log.debug('seed-db-does-not-exist', device_id=self._device_id)

        except KeyError:
            # Device already is in database
            self.log.debug('seed-db-exist', device_id=self._device_id)
            self._mib_data_sync = self._database.get_mib_data_sync(self._device_id)
            self._last_mib_db_sync_value = self._database.get_last_sync(self._device_id)

        self._device_in_db = True

    except Exception as e:
        self.log.exception('seed-database-failure', e=e)
def on_enter_starting(self):
    """
    Determine ONU status and start/re-start MIB Synchronization tasks
    """
    self._device = self._agent.get_device(self._device_id)
    self.advertise(OpenOmciEventType.state_change, self.state)

    # Make sure root of external MIB Database exists
    self._seed_database()

    # Set up Response and Autonomous notification subscriptions
    try:
        for rx_event, handler in self._omci_cc_sub_mapping.iteritems():
            if self._omci_cc_subscriptions[rx_event] is not None:
                continue    # Still subscribed from an earlier pass

            topic = OMCI_CC.event_bus_topic(self._device_id, rx_event)
            self._omci_cc_subscriptions[rx_event] = \
                self._device.omci_cc.event_bus.subscribe(topic=topic,
                                                         callback=handler)
    except Exception as e:
        self.log.exception('omci-cc-subscription-setup', e=e)

    # Set up ONU device subscriptions
    try:
        for dev_event, handler in self._onu_dev_sub_mapping.iteritems():
            if self._onu_dev_subscriptions[dev_event] is not None:
                continue

            topic = OnuDeviceEntry.event_bus_topic(self._device_id, dev_event)
            self._onu_dev_subscriptions[dev_event] = \
                self._device.event_bus.subscribe(topic=topic,
                                                 callback=handler)
    except Exception as e:
        self.log.exception('dev-subscription-setup', e=e)

    # Clear any previous audit results
    self._on_olt_only_diffs = None
    self._on_onu_only_diffs = None
    self._attr_diffs = None
    self._audited_olt_db = None
    self._audited_onu_db = None

    # Determine if this ONU has ever synchronized: a new ONU gets a full
    # MIB upload, otherwise only the MIB Data Sync is examined
    next_trigger = self.upload_mib if self.is_new_onu else self.examine_mds
    self._deferred = reactor.callLater(0, next_trigger)
def on_enter_uploading(self):
    """
    Begin full MIB data upload, starting with a MIB RESET

    Queues the configured upload task on the device's task runner. On
    completion the 'success' trigger is scheduled; on failure the 'timeout'
    trigger is scheduled after the retry delay.
    """
    self.advertise(OpenOmciEventType.state_change, self.state)

    def upload_done(results):
        self.log.debug('mib-upload-success', results=results)
        self._current_task = None
        resync_interval = timedelta(seconds=self._resync_delay)
        self._next_resync = datetime.utcnow() + resync_interval
        self._deferred = reactor.callLater(0, self.success)

    def upload_failed(reason):
        self.log.info('mib-upload-failure', reason=reason)
        self._current_task = None
        self._deferred = reactor.callLater(self._timeout_delay, self.timeout)

    self._device.mib_db_in_sync = False

    task = self._upload_task(self._agent, self._device_id)
    self._current_task = task
    self._task_deferred = self._device.task_runner.queue_task(task)
    self._task_deferred.addCallbacks(upload_done, upload_failed)
def on_enter_examining_mds(self):
    """
    Create a simple task to fetch the MIB Data Sync value and
    determine if the ONU value matches what is in the MIB database
    """
    self.advertise(OpenOmciEventType.state_change, self.state)

    # Refresh the local value from the database (a falsy value becomes 0)
    self._mib_data_sync = self._database.get_mib_data_sync(self._device_id) or 0

    def mds_fetched(onu_mds_value):
        self.log.debug('examine-mds-success', onu_mds_value=onu_mds_value, olt_mds_value=self.mib_data_sync)
        self._current_task = None

        # Examine MDS value
        if self.mib_data_sync != onu_mds_value:
            self._deferred = reactor.callLater(0, self.mismatch)
            return

        self._next_resync = datetime.utcnow() + timedelta(seconds=self._resync_delay)
        self._deferred = reactor.callLater(0, self.success)

    def mds_fetch_failed(reason):
        self.log.info('examine-mds-failure', reason=reason)
        self._current_task = None
        self._deferred = reactor.callLater(self._timeout_delay, self.timeout)

    self._device.mib_db_in_sync = False

    task = self._get_mds_task(self._agent, self._device_id)
    self._current_task = task
    self._task_deferred = self._device.task_runner.queue_task(task)
    self._task_deferred.addCallbacks(mds_fetched, mds_fetch_failed)
def on_enter_in_sync(self):
    """
    The OLT/OpenOMCI MIB Database is in sync with the ONU MIB Database.

    Records the current UTC time as the last sync, marks the device as in
    sync and, when the audit delay is positive, schedules the next audit.
    """
    self.advertise(OpenOmciEventType.state_change, self.state)

    self.last_mib_db_sync = datetime.utcnow()
    self._device.mib_db_in_sync = True

    audit_delay = self._audit_delay
    if audit_delay > 0:
        self._deferred = reactor.callLater(audit_delay, self.audit_mib)
def on_enter_out_of_sync(self):
    """
    The MIB in OpenOMCI and the ONU are out of sync. This can happen if:

       o the MIB_Data_Sync values are not equal, or
       o the MIBs were compared and differences were found.

    Schedule a task to reconcile the differences
    """
    self.advertise(OpenOmciEventType.state_change, self.state)

    # We are only out-of-sync if there were differences. If here due to MDS
    # value differences, still run the reconcile so we update the ONU's MDS
    # value to match ours.
    recorded_diffs = (self._attr_diffs,
                      self._on_onu_only_diffs,
                      self._on_olt_only_diffs)
    self._device.mib_db_in_sync = all(diff is None for diff in recorded_diffs)

    def reconciled(onu_mds_value):
        self.log.debug('reconcile-success', mds_value=onu_mds_value)
        self._current_task = None
        self._next_resync = datetime.utcnow() + timedelta(seconds=self._resync_delay)
        self._deferred = reactor.callLater(0, self.success)

    def reconcile_failed(reason):
        self.log.info('reconcile-failure', reason=reason)
        self._current_task = None
        self._deferred = reactor.callLater(self._timeout_delay, self.timeout)

    diff_collection = {
        'onu-only': self._on_onu_only_diffs,
        'olt-only': self._on_olt_only_diffs,
        'attributes': self._attr_diffs,
        'olt-db': self._audited_olt_db,
        'onu-db': self._audited_onu_db
    }
    # Clear out results since reconciliation task will be handling them
    self._on_olt_only_diffs = None
    self._on_onu_only_diffs = None
    self._attr_diffs = None
    self._audited_olt_db = None
    self._audited_onu_db = None

    task = self._reconcile_task(self._agent, self._device_id, diff_collection)
    self._current_task = task
    self._task_deferred = self._device.task_runner.queue_task(task)
    self._task_deferred.addCallbacks(reconciled, reconcile_failed)
def on_enter_auditing(self):
    """
    Perform a MIB Audit. If our last MIB resync was too long in the
    past, perform a resynchronization anyway

    If no next-resync time has been recorded, an error is logged, the
    'timeout' trigger is scheduled after the retry delay and nothing else is
    done in this state.
    """
    self.advertise(OpenOmciEventType.state_change, self.state)

    if self._next_resync is None:
        self.log.error('next-forced-resync-error', msg='Next Resync should always be valid at this point')
        self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
        # Stop here: comparing a datetime against None below raises
        # TypeError and would also replace the timeout scheduled above.
        return

    if datetime.utcnow() >= self._next_resync:
        self._deferred = reactor.callLater(0, self.force_resync)
        return

    def success(onu_mds_value):
        self.log.debug('audit-success', onu_mds_value=onu_mds_value, olt_mds_value=self.mib_data_sync)
        self._current_task = None

        # Examine MDS value
        if self.mib_data_sync == onu_mds_value:
            self._deferred = reactor.callLater(0, self.success)
        else:
            self._device.mib_db_in_sync = False
            self._deferred = reactor.callLater(0, self.mismatch)

    def failure(reason):
        self.log.info('audit-failure', reason=reason)
        self._current_task = None
        self._deferred = reactor.callLater(self._timeout_delay, self.timeout)

    self._current_task = self._audit_task(self._agent, self._device_id)
    self._task_deferred = self._device.task_runner.queue_task(self._current_task)
    self._task_deferred.addCallbacks(success, failure)
def on_enter_resynchronizing(self):
    """
    Perform a resynchronization of the MIB database

    First calculate any differences
    """
    self.advertise(OpenOmciEventType.state_change, self.state)

    def non_empty_or_none(diffs):
        # An empty/missing difference collection is recorded as None
        return diffs if diffs and len(diffs) else None

    def resync_done(results):
        self.log.debug('resync-success', results=results)

        on_olt_only = results.get('on-olt-only')
        on_onu_only = results.get('on-onu-only')
        attr_diffs = results.get('attr-diffs')
        olt_db = results.get('olt-db')
        onu_db = results.get('onu-db')

        self._current_task = None
        self._on_olt_only_diffs = non_empty_or_none(on_olt_only)
        self._on_onu_only_diffs = non_empty_or_none(on_onu_only)
        self._attr_diffs = non_empty_or_none(attr_diffs)
        self._audited_olt_db = olt_db
        self._audited_onu_db = onu_db

        mds_equal = self.mib_data_sync == self._audited_onu_db[MDS_KEY]
        found = [self._on_olt_only_diffs,
                 self._on_onu_only_diffs,
                 self._attr_diffs]

        if not (mds_equal and all(diff is None for diff in found)):
            self._deferred = reactor.callLater(0, self.diffs_found)
            return

        self._next_resync = datetime.utcnow() + timedelta(seconds=self._resync_delay)
        self._deferred = reactor.callLater(0, self.success)

    def resync_failed(reason):
        self.log.info('resync-failure', reason=reason)
        self._current_task = None
        self._deferred = reactor.callLater(self._timeout_delay, self.timeout)

    task = self._resync_task(self._agent, self._device_id)
    self._current_task = task
    self._task_deferred = self._device.task_runner.queue_task(task)
    self._task_deferred.addCallbacks(resync_done, resync_failed)
def on_mib_reset_response(self, _topic, msg):
    """
    Called upon receipt of a MIB Reset Response for this ONU

    :param _topic: (str) OMCI-RX topic
    :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
    :raises TypeError: If the response is not an OmciFrame
    """
    self.log.debug('on-mib-reset-response', state=self.state)
    try:
        response = msg[RX_RESPONSE_KEY]

        # Check if expected in current mib_sync state
        expected = self.state == 'uploading' and \
            self._omci_cc_subscriptions[RxEvent.MIB_Reset] is not None
        if not expected:
            self.log.error('rx-in-invalid-state', state=self.state)
            return

        now = datetime.utcnow()

        if not isinstance(response, OmciFrame):
            raise TypeError('Response should be an OmciFrame')

        omci_msg = response.fields['omci_message'].fields
        status = omci_msg['success_code']

        assert status == RC.Success, 'Unexpected MIB reset response status: {}'. \
            format(status)

        # The reset succeeded: local sync state starts over
        self._device.mib_db_in_sync = False
        self._mib_data_sync = 0
        self._device._modified = now
        self._database.on_mib_reset(self._device_id)

    except KeyError:
        pass            # NOP
def on_avc_notification(self, _topic, msg):
    """
    Process an Attribute Value Change Notification

    The changed attribute values are written to the MIB database. A KeyError
    raised anywhere while handling the notification (missing message key,
    or raised by the database calls) silently ends processing.

    :param _topic: (str) OMCI-RX topic
    :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
    """
    self.log.debug('on-avc-notification', state=self.state)

    if not self._omci_cc_subscriptions[RxEvent.AVC_Notification]:
        return

    try:
        notification = msg[RX_RESPONSE_KEY]

        if self.state == 'disabled':
            self.log.error('rx-in-invalid-state', state=self.state)

        # Inspect the notification
        omci_msg = notification.fields['omci_message'].fields
        class_id = omci_msg['entity_class']
        instance_id = omci_msg['entity_id']
        data = omci_msg['data']

        # Flat list of attribute names (previously the key list was wrapped
        # in a second list, so no attribute name could ever match it)
        attributes = list(data.keys())

        # Look up ME Instance in Database. Not-found can occur if a MIB
        # reset has occurred. The result itself is not used yet.
        self._database.query(self.device_id, class_id, instance_id, attributes)
        # TODO: Add old/new info to log message
        self.log.debug('avc-change', class_id=class_id, instance_id=instance_id)

        # Save the changed data to the MIB.
        self._database.set(self.device_id, class_id, instance_id, data)

        # Autonomous creation and deletion of managed entities do not
        # result in an increment of the MIB data sync value. However,
        # AVC's in response to a change by the Operator do incur an
        # increment of the MIB Data Sync. If here during uploading,
        # we issued a MIB-Reset which may generate AVC. (TODO: Focus testing during hardening)
        if self.state == 'uploading':
            self.increment_mib_data_sync()

    except KeyError:
        pass            # NOP
def on_mib_upload_response(self, _topic, msg):
    """
    Process a MIB Upload response

    Nothing is done with the response itself; the handler only reports when
    one arrives in a state other than 'uploading' or 'resynchronizing'.

    :param _topic: (str) OMCI-RX topic
    :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
    """
    # Tag names this handler (it previously reused the upload-next tag)
    self.log.debug('on-mib-upload-response', state=self.state)

    if not self._omci_cc_subscriptions[RxEvent.MIB_Upload]:
        return

    # Check if expected in current mib_sync state
    if self.state == 'resynchronizing':
        # The resync task handles this
        # TODO: Remove this subscription if we never do anything with the response
        return

    if self.state != 'uploading':
        self.log.error('rx-in-invalid-state', state=self.state)
def on_mib_upload_next_response(self, _topic, msg):
    """
    Process a MIB Upload Next response

    While in the 'uploading' state, the entity instance carried by the
    response is saved to the MIB database.

    :param _topic: (str) OMCI-RX topic
    :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
    """
    self.log.debug('on-mib-upload-next-response', state=self.state)

    if not self._omci_cc_subscriptions[RxEvent.MIB_Upload_Next]:
        return

    try:
        if self.state == 'resynchronizing':
            # The resync task handles this
            return

        # Check if expected in current mib_sync state
        if self.state != 'uploading':
            self.log.error('rx-in-invalid-state', state=self.state)
            return

        response = msg[RX_RESPONSE_KEY]

        # Extract entity instance information
        omci_msg = response.fields['omci_message'].fields

        class_id = omci_msg['object_entity_class']
        entity_id = omci_msg['object_entity_id']

        # Filter out the 'mib_data_sync' from the database. We save that at
        # the device level and do not want it showing up during a re-sync
        # during data compares
        if class_id == OntData.class_id:
            return

        attributes = dict(omci_msg['object_data'].items())

        # Save to the database
        self._database.set(self._device_id, class_id, entity_id, attributes)

    except KeyError:
        pass            # NOP
    except Exception as e:
        self.log.exception('upload-next', e=e)
def on_create_response(self, _topic, msg):
    """
    Process a Create response

    On success (or 'instance exists') the attributes from the original
    request are saved to the MIB database. If that created a new entry the
    MIB Data Sync is incremented and any Set-By-Create/Writable attributes
    missing from the request are fetched from the ONU.

    :param _topic: (str) OMCI-RX topic
    :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
    """
    self.log.debug('on-create-response', state=self.state)

    if not self._omci_cc_subscriptions[RxEvent.Create]:
        return

    if self.state in ['disabled', 'uploading']:
        self.log.error('rx-in-invalid-state', state=self.state)
        return

    try:
        request = msg[TX_REQUEST_KEY]
        response = msg[RX_RESPONSE_KEY]
        status = response.fields['omci_message'].fields['success_code']

        if status != RC.Success and status != RC.InstanceExists:
            # TODO: Support offline ONTs in post VOLTHA v1.3.0
            omci_msg = response.fields['omci_message']
            # Tag names the Create operation (it previously reused the
            # Set handler's tag)
            self.log.warn('create-response-failure',
                          class_id=omci_msg.fields['entity_class'],
                          instance_id=omci_msg.fields['entity_id'],
                          status=omci_msg.fields['success_code'],
                          status_text=self._status_to_text(omci_msg.fields['success_code']),
                          parameter_error_attributes_mask=omci_msg.fields['parameter_error_attributes_mask'])
            return

        omci_msg = request.fields['omci_message'].fields
        class_id = omci_msg['entity_class']
        entity_id = omci_msg['entity_id']
        attributes = {k: v for k, v in omci_msg['data'].items()}

        # Save to the database
        created = self._database.set(self._device_id, class_id, entity_id, attributes)
        if not created:
            return

        self.increment_mib_data_sync()

        # If the ME contains set-by-create or writeable values that were
        # not specified in the create command, the ONU will have
        # initialized those fields
        if class_id in self._device.me_map:
            sbc_w_set = {attr.field.name for attr in self._device.me_map[class_id].attributes
                         if (AA.SBC in attr.access or AA.W in attr.access)
                         and attr.field.name != 'managed_entity_id'}

            missing = sbc_w_set - set(attributes)

            if len(missing):
                # Request the missing attributes
                self.update_sbc_w_items(class_id, entity_id, missing)

    except KeyError:
        pass            # NOP
    except Exception as e:
        self.log.exception('create', e=e)
def update_sbc_w_items(self, class_id, entity_id, missing_attributes):
    """
    Perform a get-request for Set-By-Create (SBC) or writable (w) attributes
    that were not specified in the original Create request.

    Nothing is queued when there are no missing attributes or the class is
    not in the device's ME map.

    :param class_id: (int) Class ID
    :param entity_id: (int) Instance ID
    :param missing_attributes: (set) Missing SBC or Writable attribute
    """
    if not len(missing_attributes) or class_id not in self._device.me_map:
        return

    from voltha.extensions.omci.tasks.omci_get_request import OmciGetRequest

    self.log.info('update-sbc-items', class_id=class_id, entity_id=entity_id,
                  attributes=missing_attributes)

    def attributes_fetched(results):
        self._database.set(self._device_id, class_id, entity_id, results.attributes)

    def fetch_failed(reason):
        self.log.warn('update-sbc-w-failed', reason=reason, class_id=class_id,
                      entity_id=entity_id, attributes=missing_attributes)

    get_task = OmciGetRequest(self._agent, self._device_id,
                              self._device.me_map[class_id],
                              entity_id, missing_attributes,
                              allow_failure=True)
    d = self._device.task_runner.queue_task(get_task)
    d.addCallbacks(attributes_fetched, fetch_failed)
def on_delete_response(self, _topic, msg):
    """
    Process a Delete response

    On success the entity named in the original request is removed from the
    MIB database and, if something was deleted, the MIB Data Sync is
    incremented.

    :param _topic: (str) OMCI-RX topic
    :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
    """
    self.log.debug('on-delete-response', state=self.state)

    if not self._omci_cc_subscriptions[RxEvent.Delete]:
        return

    if self.state in ['disabled', 'uploading']:
        self.log.error('rx-in-invalid-state', state=self.state)
        return

    try:
        request = msg[TX_REQUEST_KEY]
        response = msg[RX_RESPONSE_KEY]

        if response.fields['omci_message'].fields['success_code'] != RC.Success:
            # TODO: Support offline ONTs in post VOLTHA v1.3.0
            omci_msg = response.fields['omci_message']
            # Tag names the Delete operation (it previously reused the
            # Set handler's tag)
            self.log.warn('delete-response-failure',
                          class_id=omci_msg.fields['entity_class'],
                          instance_id=omci_msg.fields['entity_id'],
                          status=omci_msg.fields['success_code'],
                          status_text=self._status_to_text(omci_msg.fields['success_code']))
            return

        omci_msg = request.fields['omci_message'].fields
        class_id = omci_msg['entity_class']
        entity_id = omci_msg['entity_id']

        # Remove from the database
        deleted = self._database.delete(self._device_id, class_id, entity_id)

        if deleted:
            self.increment_mib_data_sync()

    except KeyError:
        pass            # NOP
    except Exception as e:
        self.log.exception('delete', e=e)
def on_set_response(self, _topic, msg):
    """
    Process a Set response

    On success the attributes from the original request are saved to the
    MIB database (except for sets on the ONT Data class) and, if the
    database reports a modification, the MIB Data Sync is incremented.

    :param _topic: (str) OMCI-RX topic
    :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
    """
    self.log.debug('on-set-response', state=self.state)

    if not self._omci_cc_subscriptions[RxEvent.Set]:
        return

    if self.state in ['disabled', 'uploading']:
        self.log.error('rx-in-invalid-state', state=self.state)
        # Do not touch the database in these states; this matches the
        # Create and Delete response handlers
        return

    try:
        request = msg[TX_REQUEST_KEY]
        response = msg[RX_RESPONSE_KEY]

        if response.fields['omci_message'].fields['success_code'] != RC.Success:
            # TODO: Support offline ONTs in post VOLTHA v1.3.0
            omci_msg = response.fields['omci_message']
            self.log.warn('set-response-failure',
                          class_id=omci_msg.fields['entity_class'],
                          instance_id=omci_msg.fields['entity_id'],
                          status=omci_msg.fields['success_code'],
                          status_text=self._status_to_text(omci_msg.fields['success_code']),
                          unsupported_attribute_mask=omci_msg.fields['unsupported_attributes_mask'],
                          failed_attribute_mask=omci_msg.fields['failed_attributes_mask'])
            return

        omci_msg = request.fields['omci_message'].fields
        class_id = omci_msg['entity_class']
        entity_id = omci_msg['entity_id']
        attributes = {k: v for k, v in omci_msg['data'].items()}

        # Save to the database (Do not save 'sets' of the mib-data-sync however)
        if class_id != OntData.class_id:
            modified = self._database.set(self._device_id, class_id, entity_id, attributes)
            if modified:
                self.increment_mib_data_sync()

    except KeyError:
        pass            # NOP
    except Exception as e:
        self.log.exception('set', e=e)

# TODO: Future -> Monitor Software download start, section, activate, and commit responses
#       and increment MIB Data Sync per Table 11.2.2-1 of ITUT-T G.988 (11/2017)
#       on page 515.  Eventually also monitor set-table responses once the
#       extended message set is supported.
def on_capabilities_event(self, _topic, msg):
    """
    Process an OMCI capabilities event

    Both the supported managed entities and the supported message types
    found in the message are passed on to the MIB database for this device.

    :param _topic: (str) OnuDeviceEntry Capabilities event
    :param msg: (dict) Message Entities & Message Types supported
    """
    supported_entities = msg[SUPPORTED_MESSAGE_ENTITY_KEY]
    self._database.update_supported_managed_entities(self.device_id,
                                                     supported_entities)

    supported_msg_types = msg[SUPPORTED_MESSAGE_TYPES_KEY]
    self._database.update_supported_message_types(self.device_id,
                                                  supported_msg_types)
def _status_to_text(self, success_code):
    """
    Convert an OMCI response status code to readable text

    :param success_code: Status (reason) code from an OMCI response
    :return: (str) Text for the code, or 'Unknown status code: <code>' if
             the code is not one of the known reason codes
    """
    known_codes = {
        RC.Success: "Success",
        RC.ProcessingError: "Processing Error",
        RC.NotSupported: "Not Supported",
        RC.ParameterError: "Parameter Error",
        RC.UnknownEntity: "Unknown Entity",
        RC.UnknownInstance: "Unknown Instance",
        RC.DeviceBusy: "Device Busy",
        RC.InstanceExists: "Instance Exists"
    }
    return known_codes.get(success_code, 'Unknown status code: {}'.format(success_code))
def query_mib(self, class_id=None, instance_id=None, attributes=None):
    """
    Get MIB database information.

    This method can be used to request information from the database to the detailed
    level requested

    :param class_id:  (int) Managed Entity class ID
    :param instance_id: (int) Managed Entity instance
    :param attributes: (list or str) Managed Entity instance's attributes

    :return: (dict) The value(s) requested. If class/inst/attribute is
                    not found, an empty dictionary is returned
    :raises DatabaseStateError: If the database is not enabled or does not exist
    """
    from voltha.extensions.omci.database.mib_db_api import DatabaseStateError

    self.log.debug('query', class_id=class_id,
                   instance_id=instance_id, attributes=attributes)

    database = self._database
    if database is None:
        raise DatabaseStateError('Database does not yet exist')

    return database.query(self._device_id,
                          class_id=class_id,
                          instance_id=instance_id,
                          attributes=attributes)
def mib_set(self, class_id, entity_id, attributes):
    """
    Set attributes of an existing ME Class instance

    This method is primarily used by other state machines to save ME specific
    information to the persistent database. Access by objects external to the
    OpenOMCI library is discouraged.

    Nothing is written when 'attributes' is not a non-empty dictionary.

    :param class_id: (int) ME Class ID
    :param entity_id: (int) ME Class entity ID
    :param attributes: (dict) attribute -> value pairs to set
    """
    if not isinstance(attributes, dict) or not len(attributes):
        return

    # It must exist first (but attributes can be new)
    # NOTE(review): query_mib documents an empty dictionary for "not found",
    # which this 'is None' test lets through -- confirm the intended check.
    if self.query_mib(class_id, entity_id) is None:
        return

    self._database.set(self._device_id, class_id, entity_id, attributes)
def mib_delete(self, class_id, entity_id):
    """
    Delete an existing ME Class instance

    This method is primarily used by other state machines to delete an ME
    from the MIB database. The call is passed straight to the database for
    this synchronizer's device.

    :param class_id: (int) ME Class ID
    :param entity_id: (int) ME Class entity ID

    :raises KeyError: If device does not exist
    :raises DatabaseStateError: If the database is not enabled
    """
    database = self._database
    database.delete(self._device_id, class_id, entity_id)