VOL-1274: Third attempt at Alarm State Machine simplification
Change-Id: Ie50b0871db63c6783d6a05e6afd9f945611eb143
diff --git a/voltha/extensions/omci/onu_device_entry.py b/voltha/extensions/omci/onu_device_entry.py
index 086e1d7..8ddfe87 100644
--- a/voltha/extensions/omci/onu_device_entry.py
+++ b/voltha/extensions/omci/onu_device_entry.py
@@ -135,9 +135,9 @@
self._on_start_state_machines = [ # Run when 'start()' called
self._mib_sync_sm,
self._capabilities_sm,
- self._alarm_sync_sm,
]
self._on_sync_state_machines = [ # Run after first in_sync event
+ self._alarm_sync_sm,
]
self._on_capabilities_state_machines = [ # Run after first capabilities events
self._pm_intervals_sm
@@ -488,6 +488,24 @@
return entry[attribute] if attribute in entry else None
+ def query_alarm_table(self, class_id=None, instance_id=None):
+ """
+ Get Alarm information
+
+ This method can be used to request information from the alarm database to
+ the detailed level requested
+
+ :param class_id: (int) Managed Entity class ID
+ :param instance_id: (int) Managed Entity instance
+
+ :return: (dict) The value(s) requested. If class/inst/attribute is
+ not found, an empty dictionary is returned
+ :raises DatabaseStateError: If the database is not enabled
+ """
+ self.log.debug('query', class_id=class_id, instance_id=instance_id)
+
+ return self.alarm_synchronizer.query_mib(class_id=class_id, instance_id=instance_id)
+
def reboot(self,
flags=RebootFlags.Reboot_Unconditionally,
timeout=OmciRebootRequest.DEFAULT_REBOOT_TIMEOUT):
diff --git a/voltha/extensions/omci/openomci_agent.py b/voltha/extensions/omci/openomci_agent.py
index 8c90b31..9e2cdde 100644
--- a/voltha/extensions/omci/openomci_agent.py
+++ b/voltha/extensions/omci/openomci_agent.py
@@ -22,9 +22,7 @@
from voltha.extensions.omci.tasks.mib_resync_task import MibResyncTask
from voltha.extensions.omci.tasks.mib_reconcile_task import MibReconcileTask
from voltha.extensions.omci.tasks.sync_time_task import SyncTimeTask
-from voltha.extensions.omci.state_machines.Alarm_sync import AlarmSynchronizer
-from voltha.extensions.omci.tasks.alarm_sync_data import AlarmSyncDataTask
-from voltha.extensions.omci.tasks.alarm_check_task import AlarmDataTask
+from voltha.extensions.omci.state_machines.alarm_sync import AlarmSynchronizer
from voltha.extensions.omci.tasks.alarm_resync_task import AlarmResyncTask
from voltha.extensions.omci.database.alarm_db_ext import AlarmDbExternal
from voltha.extensions.omci.tasks.interval_data_task import IntervalDataTask
@@ -73,10 +71,7 @@
'database': AlarmDbExternal, # For any State storage needs
'advertise-events': True, # Advertise events on OpenOMCI event bus
'tasks': {
- 'alarm-sync': AlarmSyncDataTask,
- 'alarm-check': AlarmDataTask,
- 'alarm-resync': AlarmResyncTask,
- 'alarm-audit': AlarmDataTask
+ 'alarm-resync': AlarmResyncTask
}
},
'image_downloader': {
diff --git a/voltha/extensions/omci/state_machines/Alarm_sync.py b/voltha/extensions/omci/state_machines/Alarm_sync.py
deleted file mode 100644
index 15c06d0..0000000
--- a/voltha/extensions/omci/state_machines/Alarm_sync.py
+++ /dev/null
@@ -1,538 +0,0 @@
-#
-# Copyright 2017 the original author or authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import structlog
-from datetime import datetime, timedelta
-from transitions import Machine
-from twisted.internet import reactor
-from voltha.extensions.omci.omci_defs import EntityOperations, ReasonCodes, \
- AttributeAccess
-from voltha.extensions.omci.omci_cc import OmciCCRxEvents, OMCI_CC, TX_REQUEST_KEY, \
- RX_RESPONSE_KEY
-from voltha.extensions.omci.omci_entities import OntData
-import voltha.extensions.omci.omci_entities as omci_entities
-from common.event_bus import EventBusClient
-from voltha.protos.omci_alarm_db_pb2 import AlarmOpenOmciEventType
-
-RxEvent = OmciCCRxEvents
-RC = ReasonCodes
-
-
-class AlarmSynchronizer(object):
- """
- OpenOMCI Alarm Synchronizer state machine
- """
- DEFAULT_STATES = ['disabled', 'starting', 'updating', 'syncing_alarm',
- 'in_sync', 'out_of_sync', 'auditing', 'resynchronizing']
-
- DEFAULT_TRANSITIONS = [
- {'trigger': 'start', 'source': 'disabled', 'dest': 'starting'},
-
- {'trigger': 'update_alarm', 'source': 'starting', 'dest': 'updating'},
- {'trigger': 'sync_alarm', 'source': 'starting', 'dest': 'syncing_alarm'},
-
- {'trigger': 'success', 'source': 'updating', 'dest': 'in_sync'},
- {'trigger': 'timeout', 'source': 'updating', 'dest': 'starting'},
-
- {'trigger': 'success', 'source': 'syncing_alarm', 'dest': 'in_sync'},
- {'trigger': 'timeout', 'source': 'syncing_alarm', 'dest': 'starting'},
- {'trigger': 'mismatch', 'source': 'syncing_alarm', 'dest': 'updating'},
-
- {'trigger': 'audit_alarm', 'source': 'in_sync', 'dest': 'auditing'},
- {'trigger': 'audit_alarm', 'source': 'out_of_sync', 'dest': 'auditing'},
-
- {'trigger': 'success', 'source': 'auditing', 'dest': 'in_sync'},
- {'trigger': 'timeout', 'source': 'auditing', 'dest': 'starting'},
- {'trigger': 'mismatch', 'source': 'auditing', 'dest': 'resynchronizing'},
- {'trigger': 'force_resync', 'source': 'auditing', 'dest': 'resynchronizing'},
-
- {'trigger': 'timeout', 'source': 'resynchronizing', 'dest': 'out_of_sync'},
- {'trigger': 'success', 'source': 'resynchronizing', 'dest': 'in_sync'},
-
- # Do wildcard 'stop' trigger last so it covers all previous states
- {'trigger': 'stop', 'source': '*', 'dest': 'disabled'},
- ]
- DEFAULT_TIMEOUT_RETRY = 60 # Seconds to delay after task failure/timeout
- DEFAULT_AUDIT_DELAY = 15 # Periodic tick to audit the MIB Data Sync
- DEFAULT_RESYNC_DELAY = 300 # Periodically force a resync
-
- def __init__(self, agent, device_id, alarm_sync_tasks, db,
- advertise_events=False,
- states=DEFAULT_STATES,
- transitions=DEFAULT_TRANSITIONS,
- initial_state='disabled',
- timeout_delay=DEFAULT_TIMEOUT_RETRY,
- audit_delay=DEFAULT_AUDIT_DELAY,
- resync_delay=DEFAULT_RESYNC_DELAY):
- """
- Class initialization
-
- :param agent: (OpenOmciAgent) Agent
- :param device_id: (str) ONU Device ID
- :param db: (MibDbApi) MIB/Alarm Database
- :param advertise_events: (bool) Advertise events on OpenOMCI Event Bus
- :param alarm_sync_tasks: (dict) Tasks to run
- :param states: (list) List of valid states
- :param transitions: (dict) Dictionary of triggers and state changes
- :param initial_state: (str) Initial state machine state
- :param timeout_delay: (int/float) Number of seconds after a timeout to attempt
- a retry (goes back to starting state)
- :param audit_delay: (int) Seconds between Alarm audits while in sync. Set to
- zero to disable audit. An operator can request
- an audit manually by calling 'self.audit_alarm'
- :param resync_delay: (int) Seconds in sync before performing a forced Alarm
- resynchronization
- """
- self.log = structlog.get_logger(device_id=device_id)
-
- self._agent = agent
- self._device_id = device_id
- self._device = None
- self._database = db
- self._timeout_delay = timeout_delay
- self._audit_delay = audit_delay
- self._resync_delay = resync_delay
-
- self._update_task = alarm_sync_tasks['alarm-sync']
- self._check_task = alarm_sync_tasks['alarm-check']
- self._resync_task = alarm_sync_tasks['alarm-resync']
- self._audit_task = alarm_sync_tasks['alarm-audit']
- self._advertise_events = advertise_events
-
- self._deferred = None
- self._current_task = None # TODO: Support multiple running tasks after v.1.3.0 release
- self._task_deferred = None
- self._last_alarm_sync_time = None
- self._last_alarm_sequence_value = None
- self._device_in_db = False
- self._alarm_class_id = None
- self._alarm_entity_id = None
- self._commands_retrieved = None
- self._alarm_table = None
-
- self._event_bus = EventBusClient()
- self._omci_cc_subscriptions = { # RxEvent.enum -> Subscription Object
- RxEvent.Get_ALARM_Get: None,
- RxEvent.Get_ALARM_Get_Next: None
- }
- self._omci_cc_sub_mapping = {
- RxEvent.Get_ALARM_Get: self.on_alarm_update_response,
- RxEvent.Get_ALARM_Get_Next: self.on_alarm_update_next_response
- }
-
- # Statistics and attributes
- # TODO: add any others if it will support problem diagnosis
-
- # Set up state machine to manage states
- self.machine = Machine(model=self, states=states,
- transitions=transitions,
- initial=initial_state,
- queued=True,
- name='{}-{}'.format(self.__class__.__name__,
- device_id))
-
- def _cancel_deferred(self):
- d1, self._deferred = self._deferred, None
- d2, self._task_deferred = self._task_deferred, None
-
- for d in [d1, d1]:
- try:
- if d is not None and not d.called:
- d.cancel()
- except:
- pass
-
- def __str__(self):
- return 'Alarm Synchronizer: Device ID: {}, State:{}'.format(self._device_id, self.state)
-
- def delete(self):
- """
- Cleanup any state information
- """
- self.stop()
- db, self._database = self._database, None
-
- if db is not None:
- db.remove(self._device_id)
-
- @property
- def device_id(self):
- return self._device_id
-
- @property
- def last_alarm_sequence(self):
- return self._last_alarm_sequence_value
-
- @last_alarm_sequence.setter
- def last_alarm_sequence(self, value):
- if self._last_alarm_sequence_value != value:
- self._last_alarm_sequence_value = value
- if self._database is not None:
- self._database.save_alarm_last_sync(self.device_id, value)
-
- @property
- def last_alarm_sync_time(self):
- return self._last_alarm_sync_time
-
- @last_alarm_sync_time.setter
- def last_alarm_sync_time(self, value):
- self._last_alarm_sync_time = value
- if self._database is not None:
- self._database.save_last_sync_time(self.device_id, value)
-
- @property
- def is_updated_alarm(self):
- """
- Is this a new ONU (has never completed Alarm synchronization)
- :return: (bool) True if this ONU should be considered new
- """
- return self.last_alarm_sequence is None
-
- @property
- def advertise_events(self):
- return self._advertise_events
-
- @advertise_events.setter
- def advertise_events(self, value):
- if not isinstance(value, bool):
- raise TypeError('Advertise event is a boolean')
- self._advertise_events = value
-
- def advertise(self, event, info):
- """Advertise an event on the OpenOMCI event bus"""
- if self._advertise_events:
- self._agent.advertise(event,
- {
- 'state-machine': self.machine.name,
- 'info': info,
- 'time': str(datetime.utcnow())
- })
-
- def on_enter_disabled(self):
- """
- State machine is being stopped
- """
- self.advertise(AlarmOpenOmciEventType.state_change, self.state)
-
- self._cancel_deferred()
-
- task, self._current_task = self._current_task, None
- if task is not None:
- task.stop()
-
- # Drop Response and Autonomous notification subscriptions
- for event, sub in self._omci_cc_subscriptions.iteritems():
- if sub is not None:
- self._omci_cc_subscriptions[event] = None
- self._device.omci_cc.event_bus.unsubscribe(sub)
-
- # TODO: Stop and remove any currently running or scheduled tasks
- # TODO: Anything else?
-
- def _seed_database(self):
- if not self._device_in_db:
- try:
- try:
- self._database.start()
- self._database.add(self._device_id)
- self.log.debug('seed-db-does-not-exist', device_id=self._device_id)
-
- except KeyError:
- # Device already is in database
- self.log.debug('seed-db-exist', device_id=self._device_id)
- self._last_alarm_sequence_value = \
- self._database.get_alarm_last_sync(self._device_id)
-
- self._device_in_db = True
-
- except Exception as e:
- self.log.exception('seed-database-failure', e=e)
-
- def on_enter_starting(self):
- """
- Determine ONU status and start Alarm Synchronization tasks
- """
- self._device = self._agent.get_device(self._device_id)
- self.advertise(AlarmOpenOmciEventType.state_change, self.state)
-
- # Make sure root of external Alarm Database exists
- self._seed_database()
-
- # Set up Response and Autonomous notification subscriptions
- try:
- for event, sub in self._omci_cc_sub_mapping.iteritems():
- if self._omci_cc_subscriptions[event] is None:
- self._omci_cc_subscriptions[event] = \
- self._device.omci_cc.event_bus.subscribe(
- topic=OMCI_CC.event_bus_topic(self._device_id, event),
- callback=sub)
-
- except Exception as e:
- self.log.exception('omci-cc-subscription-setup', e=e)
-
- except Exception as e:
- self.log.exception('dev-subscription-setup', e=e)
-
- # Determine if this ONU has ever synchronized
- if self.is_updated_alarm:
- self._deferred = reactor.callLater(0, self.update_alarm)
- else:
- self._deferred = reactor.callLater(0, self.sync_alarm)
-
- def on_enter_updating(self):
- """
- Begin full Alarm data sync, starting with a Alarm RESET
- """
- self.advertise(AlarmOpenOmciEventType.state_change, self.state)
-
- def success(results):
- self.log.debug('alarm-update-success', results='the sequence_number is {}'.
- format(results))
- self._current_task = None
- # The new ONU is up
- self._deferred = reactor.callLater(0, self.success)
-
- def failure(reason):
- self.log.info('alarm-update-failure', reason=reason)
- self._current_task = None
- self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
-
- self._current_task = self._update_task(self._agent, self._device_id)
-
- self._task_deferred = self._device.task_runner.queue_task(self._current_task)
- self._task_deferred.addCallbacks(success, failure)
-
- def on_enter_syncing_alarm(self):
- """
- Create a simple task to fetch the Alarm value
- """
- self.advertise(AlarmOpenOmciEventType.state_change, self.state)
-
- def success(sequence):
- self.log.debug('sync-alarm-success', sequence_value=sequence)
- self._current_task = None
-
- # Examine Alarm value
- if self.last_alarm_sequence == sequence:
- self._deferred = reactor.callLater(0, self.success)
- else:
- self._deferred = reactor.callLater(0, self.mismatch)
-
- def failure(reason):
- self.log.info('sync-alarm-failure', reason=reason)
- self._current_task = None
- self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
-
- self._current_task = self._check_task(self._agent, self._device_id)
-
- self._task_deferred = self._device.task_runner.queue_task(self._current_task)
- self._task_deferred.addCallbacks(success, failure)
-
- def on_enter_in_sync(self):
- """
- Schedule a tick to occur to in the future to request an audit
- """
- self.advertise(AlarmOpenOmciEventType.state_change, self.state)
-
- if not self._device.alarm_db_in_sync:
- self.last_alarm_sync_time = datetime.utcnow()
- self._device.alarm_db_in_sync = True
-
- if self._audit_delay > 0:
- self._deferred = reactor.callLater(self._audit_delay, self.audit_alarm)
-
- def on_enter_out_of_sync(self):
- """
- The Alarm re-synchronization state does not match alarm status currently.
- The condition would happen the following as below:
- 1. The alarm sequence is not equal the last alarm status.
- 2. The ONU alarm does not happen right now.
-
- Condition 1: Something happen on the alarm table does not match the sequence.
- Opening display of message to examine the alarm table.
- Condition 2: In this state, why happen this situation?
- Has ONU recover the alarm table in this meanwhile?
-
- Schedule a tick to occur to in the future to request an audit
- """
- self.advertise(AlarmOpenOmciEventType.state_change, self.state)
- self._device.alarm_db_in_sync = False
-
- step = 'Nothing'
- class_id = 0
- entity_id = 0
- attribute = self._alarm_table
-
- try:
- if self._commands_retrieved is not self.last_alarm_sequence :
- # The alarm sequence does not match last saving value. Has happen Alarm?
- step = 'alarm-table'
- for sequence in xrange(self._commands_retrieved):
- self.log.info(step, class_id=self._alarm_class_id[sequence],
- entity_id=self._alarm_entity_id[sequence],
- alarm_sequence=self._alarm_table[sequence])
- pass
- elif self._commands_retrieved is None:
- # The alarm sequence does not get alarm at present.
- # TODO: need to update the database here ?
- step = 'None_of_alarm'
- self._alarm_table = None
- pass
-
- self._deferred = reactor.callLater(1, self.audit_alarm)
-
- except Exception as e:
- self.log.exception('alarm-out-of-update', e=e, step=step, class_id=class_id,
- entity_id=entity_id, attribute=attribute)
- # Retry the Audit process
- self._deferred = reactor.callLater(1, self.audit_alarm)
-
- def on_enter_auditing(self):
- """
- Perform a Alarm Audit. If our last Alarm resync was too long in the
- past, perform a resynchronization anyway
- """
- next_resync = self.last_alarm_sync_time + timedelta(seconds=self._resync_delay)\
- if self.last_alarm_sync_time is not None else datetime.utcnow()
-
- self.advertise(AlarmOpenOmciEventType.state_change, self.state)
-
- if datetime.utcnow() >= next_resync:
- self._deferred = reactor.callLater(0, self.force_resync)
- else:
- def success(sequence):
- self.log.debug('get-alarm-success', alarm_sequence=sequence)
- self._current_task = None
-
- # Examine alarm sequence value
- if self.last_alarm_sequence == sequence:
- self._deferred = reactor.callLater(0, self.success)
- else:
- self._device.alarm_db_in_sync = False
- self._deferred = reactor.callLater(0, self.mismatch)
-
- def failure(reason):
- self.log.info('get-alarm-failure', reason=reason)
- self._current_task = None
- self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
-
- self._current_task = self._audit_task(self._agent, self._device_id)
- self._task_deferred = self._device.task_runner.queue_task(self._current_task)
- self._task_deferred.addCallbacks(success, failure)
-
- def on_enter_resynchronizing(self):
- """
- Perform a resynchronization of the Alarm database
-
- First calculate any differences
- """
- self.advertise(AlarmOpenOmciEventType.state_change, self.state)
-
- def success(results):
- self.log.debug('resync-success', results=results)
-
- self._alarm_class_id = results.get('alarm_class_id')
- self._alarm_entity_id = results.get('alarm_entity_id')
- self._commands_retrieved = results.get('commands_retrieved')
- self._alarm_table = results.get('alarm_table')
-
- if self._commands_retrieved is not None and all(self._alarm_table):
- self._deferred = reactor.callLater(0, self.success)
- else:
- self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
-
- def failure(reason):
- self.log.info('resync-failure', reason=reason)
- self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
-
- self._current_task = self._resync_task(self._agent, self._device_id)
- self._task_deferred = self._device.task_runner.queue_task(self._current_task)
- self._task_deferred.addCallbacks(success, failure)
-
- def on_alarm_update_next_response(self, _topic, msg):
- """
- Process a Get All Alarm Next response
-
- :param _topic: (str) OMCI-RX topic
- :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
- """
- self.log.debug('on-alarm-upload-next-response', state=self.state)
-
- if self._omci_cc_subscriptions[RxEvent.Get_ALARM_Get_Next]:
- try:
- # Check if expected in current alarm_sync state
- if self.state == 'disabled':
- self.log.error('rx-in-invalid-state', state=self.state)
-
- else:
- response = msg[RX_RESPONSE_KEY]
-
- # Extract entity instance information
- omci_msg = response.fields['omci_message'].fields
-
- class_id = omci_msg['entity_class']
- entity_id = omci_msg['entity_id']
- alarm_entity_class = omci_msg['alarmed_entity_class']
- alarm_entity_id = omci_msg['alarmed_entity_id']
- alarm_bit_map = omci_msg['alarm_bit_map']
-
- self.log.info('set-response-failure',
- class_id=class_id, entity_id=entity_id,
- alarm_entity_class=alarm_entity_class,
- alarm_entity_id=alarm_entity_id,
- alarm_bit_map=alarm_bit_map)
-
- if class_id == OntData.class_id:
- return
-
- # Save to the database
- self._database.set(self._device_id, class_id, entity_id, alarm_bit_map)
-
- except KeyError:
- pass # NOP
- except Exception as e:
- self.log.exception('upload-next', e=e)
-
- def on_alarm_update_response(self, _topic, msg):
- """
- Process a Get All Alarms response
-
- :param _topic: (str) OMCI-RX topic
- :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
- """
- self.log.debug('on-alarm-update-response', state=self.state)
-
- if self._omci_cc_subscriptions[RxEvent.Get_ALARM_Get]:
- if self.state == 'disabled':
- self.log.error('rx-in-invalid-state', state=self.state)
- return
- try:
- response = msg[RX_RESPONSE_KEY]
- omci_msg = response.fields['omci_message'].fields
- class_id = omci_msg['entity_class']
- entity_id = omci_msg['entity_id']
- number_of_commands = omci_msg['number_of_commands']
-
- # ONU will reset its last alarm sequence number to 0 on receipt of the
- # Get All Alarms request
- self.last_alarm_sequence = 0
-
- self.log.info('received alarm response',
- class_id=class_id,
- instance_id=entity_id,
- number_of_commands=number_of_commands)
-
- except Exception as e:
- self.log.exception('upload-alarm-failure', e=e)
diff --git a/voltha/extensions/omci/state_machines/alarm_sync.py b/voltha/extensions/omci/state_machines/alarm_sync.py
new file mode 100644
index 0000000..ea30b74
--- /dev/null
+++ b/voltha/extensions/omci/state_machines/alarm_sync.py
@@ -0,0 +1,408 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from datetime import datetime
+from transitions import Machine
+from twisted.internet import reactor
+from voltha.extensions.omci.omci_defs import ReasonCodes
+from voltha.extensions.omci.omci_cc import OmciCCRxEvents, OMCI_CC, RX_RESPONSE_KEY
+from voltha.extensions.omci.omci_messages import OmciGetAllAlarmsResponse
+from voltha.extensions.omci.omci_frame import OmciFrame
+from common.event_bus import EventBusClient
+from voltha.protos.omci_alarm_db_pb2 import AlarmOpenOmciEventType
+
+RxEvent = OmciCCRxEvents
+RC = ReasonCodes
+
+
+class AlarmSynchronizer(object):
+ """
+ OpenOMCI Alarm Synchronizer state machine
+ """
+ DEFAULT_STATES = ['disabled', 'starting', 'auditing', 'in_sync']
+
+ DEFAULT_TRANSITIONS = [
+ {'trigger': 'start', 'source': 'disabled', 'dest': 'starting'},
+
+ {'trigger': 'audit_alarm', 'source': 'starting', 'dest': 'auditing'},
+ {'trigger': 'sync_alarm', 'source': 'starting', 'dest': 'in_sync'},
+
+ {'trigger': 'success', 'source': 'auditing', 'dest': 'in_sync'},
+ {'trigger': 'audit_alarm', 'source': 'auditing', 'dest': 'auditing'},
+ {'trigger': 'failure', 'source': 'auditing', 'dest': 'auditing'},
+
+ {'trigger': 'audit_alarm', 'source': 'in_sync', 'dest': 'auditing'},
+
+ # Do wildcard 'stop' trigger last so it covers all previous states
+ {'trigger': 'stop', 'source': '*', 'dest': 'disabled'},
+ ]
+ DEFAULT_TIMEOUT_RETRY = 15 # Seconds to delay after task failure/timeout
+ DEFAULT_AUDIT_DELAY = 0 # 300 # Periodic tick to audit the ONU's alarm table
+
+ def __init__(self, agent, device_id, alarm_sync_tasks, db,
+ advertise_events=False,
+ states=DEFAULT_STATES,
+ transitions=DEFAULT_TRANSITIONS,
+ initial_state='disabled',
+ timeout_delay=DEFAULT_TIMEOUT_RETRY,
+ audit_delay=DEFAULT_AUDIT_DELAY):
+ """
+ Class initialization
+
+ :param agent: (OpenOmciAgent) Agent
+ :param device_id: (str) ONU Device ID
+ :param db: (MibDbApi) MIB/Alarm Database
+ :param advertise_events: (bool) Advertise events on OpenOMCI Event Bus
+ :param alarm_sync_tasks: (dict) Tasks to run
+ :param states: (list) List of valid states
+ :param transitions: (dict) Dictionary of triggers and state changes
+ :param initial_state: (str) Initial state machine state
+ :param timeout_delay: (int/float) Number of seconds after a timeout to attempt
+ a retry (goes back to starting state)
+ :param audit_delay: (int) Seconds between Alarm audits while in sync. Set to
+ zero to disable audit. An operator can request
+ an audit manually by calling 'self.audit_alarm'
+ """
+
+ self.log = structlog.get_logger(device_id=device_id)
+
+ self._agent = agent
+ self._device_id = device_id
+ self._device = None
+ self._database = db
+ self._timeout_delay = timeout_delay
+ self._audit_delay = audit_delay
+ self._resync_task = alarm_sync_tasks['alarm-resync']
+ self._advertise_events = advertise_events
+
+ self._deferred = None
+ self._current_task = None
+ self._task_deferred = None
+ self._last_alarm_sequence_value = None
+ self._device_in_db = False
+ # self._alarm_bit_map_notification = dict()
+ # self._alarm_sequence_number_notification = dict()
+
+ self._event_bus = EventBusClient()
+ self._omci_cc_subscriptions = { # RxEvent.enum -> Subscription Object
+ RxEvent.Get_ALARM_Get: None,
+ RxEvent.Alarm_Notification: None
+ }
+ self._omci_cc_sub_mapping = {
+ RxEvent.Get_ALARM_Get: self.on_alarm_update_response,
+ RxEvent.Alarm_Notification: self.on_alarm_notification
+ }
+
+ # Statistics and attributes
+ # TODO: add any others if it will support problem diagnosis
+
+ # Set up state machine to manage states
+ self.machine = Machine(model=self, states=states,
+ transitions=transitions,
+ initial=initial_state,
+ queued=True,
+ name='{}-{}'.format(self.__class__.__name__,
+ device_id))
+
+ def _cancel_deferred(self):
+ d1, self._deferred = self._deferred, None
+ d2, self._task_deferred = self._task_deferred, None
+
+ for d in [d1, d1]:
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def __str__(self):
+ return 'Alarm Synchronizer: Device ID: {}, State:{}'.format(self._device_id, self.state)
+
+ def delete(self):
+ """
+ Cleanup any state information
+ """
+ self.stop()
+ db, self._database = self._database, None
+
+ if db is not None:
+ db.remove(self._device_id)
+
+ @property
+ def device_id(self):
+ return self._device_id
+
+ @property
+ def last_alarm_sequence(self):
+ return self._last_alarm_sequence_value
+
+ def reset_alarm_sequence(self):
+ if self._last_alarm_sequence_value != 0:
+ self._last_alarm_sequence_value = 0
+
+ def increment_alarm_sequence(self):
+ self._last_alarm_sequence_value += 1
+ if self._last_alarm_sequence_value > 255:
+ self._last_alarm_sequence_value = 1
+
+ @property
+ def advertise_events(self):
+ return self._advertise_events
+
+ @advertise_events.setter
+ def advertise_events(self, value):
+ if not isinstance(value, bool):
+ raise TypeError('Advertise event is a boolean')
+ self._advertise_events = value
+
+ def advertise(self, event, info):
+ """Advertise an event on the OpenOMCI event bus"""
+ if self._advertise_events:
+ self._agent.advertise(event,
+ {
+ 'state-machine': self.machine.name,
+ 'info': info,
+ 'time': str(datetime.utcnow())
+ })
+
+ def on_enter_disabled(self):
+ """
+ State machine is being stopped
+ """
+ self.advertise(AlarmOpenOmciEventType.state_change, self.state)
+
+ self._cancel_deferred()
+
+ task, self._current_task = self._current_task, None
+ if task is not None:
+ task.stop()
+
+ # Drop Response and Autonomous notification subscriptions
+ for event, sub in self._omci_cc_subscriptions.iteritems():
+ if sub is not None:
+ self._omci_cc_subscriptions[event] = None
+ self._device.omci_cc.event_bus.unsubscribe(sub)
+
+ def _seed_database(self):
+ if not self._device_in_db:
+ try:
+ try:
+ self._database.start()
+ self._database.add(self._device_id)
+ self.log.debug('seed-db-does-not-exist', device_id=self._device_id)
+
+ except KeyError:
+ # Device already is in database
+ self.log.debug('seed-db-exist', device_id=self._device_id)
+
+ self._device_in_db = True
+
+ except Exception as e:
+ self.log.exception('seed-database-failure', e=e)
+
+ def on_enter_starting(self):
+ """
+ Determine ONU status and start Alarm Synchronization tasks
+ """
+ self._device = self._agent.get_device(self._device_id)
+ self.advertise(AlarmOpenOmciEventType.state_change, self.state)
+
+ # Make sure root of external Alarm Database exists
+ self._seed_database()
+
+ # Set up Response and Autonomous notification subscriptions
+ try:
+ for event, sub in self._omci_cc_sub_mapping.iteritems():
+ if self._omci_cc_subscriptions[event] is None:
+ self._omci_cc_subscriptions[event] = \
+ self._device.omci_cc.event_bus.subscribe(
+ topic=OMCI_CC.event_bus_topic(self._device_id, event),
+ callback=sub)
+
+ except Exception as e:
+ self.log.exception('omci-cc-subscription-setup', e=e)
+
+ # Schedule first audit if enabled
+ if self._audit_delay > 0:
+ # Note using the shorter timeout delay here since this is the first
+ # audit after startup
+ self._deferred = reactor.callLater(self._timeout_delay, self.audit_alarm)
+ else:
+ self._deferred = reactor.callLater(0, self.sync_alarm)
+
+ def on_enter_in_sync(self):
+ """
+ Schedule a tick to occur to in the future to request an audit
+ """
+ self.advertise(AlarmOpenOmciEventType.state_change, self.state)
+
+ if self._audit_delay > 0:
+ # Note using the shorter timeout delay here since this is the first
+ # audit after startup
+ self._deferred = reactor.callLater(self._audit_delay, self.audit_alarm)
+
+ def on_enter_auditing(self):
+ """
+ Begin full Alarm data sync, Comparing the all alarms
+ """
+ self.advertise(AlarmOpenOmciEventType.state_change, self.state)
+
+ def success(results):
+ self.log.debug('alarm-diff-success')
+ self._current_task = None
+
+ # Any differences found between ONU and OpenOMCI Alarm tables?
+ if results is None:
+ self._device.alarm_db_in_sync = True
+ self._deferred = reactor.callLater(0, self.success)
+ else:
+ # Reconcile the alarm table and re-run audit
+ self.reconcile_alarm_table(results)
+ self._deferred = reactor.callLater(5, self.audit_alarm)
+
+ def failure(reason):
+ self.log.info('alarm-update-failure', reason=reason)
+ self._current_task = None
+ self._deferred = reactor.callLater(self._timeout_delay, self.failure)
+
+ self._current_task = self._resync_task(self._agent, self._device_id)
+ self._task_deferred = self._device.task_runner.queue_task(self._current_task)
+ self._task_deferred.addCallbacks(success, failure)
+
+ def reconcile_alarm_table(self, results):
+ self.log.info('alarm-reconcile', state=self.state, results=results)
+
+ onu_only = results['onu-only']
+ olt_only = results['olt-only']
+ attr_diffs = results['attr-diffs']
+
+ # Compare the differences. During upload, if there are no alarms at all,
+ # then the ONU alarm table retrieved may be empty (instead of MEs with all
+ # bits cleared) depending upon the ONU's OMCI Stack.
+
+ if onu_only is not None:
+ pass
+ # ONU only alarms will typically occur when doing the first audit as our
+ # database is clear and we are seeding the alarm table. Save the entries
+ # and if any are set, we need to raise that alarm.
+ #
+ # self._database.set(self._device_id, class_id, entity_id, alarm_bit_map)
+
+ if olt_only is not None:
+ pass
+
+ if attr_diffs is not None:
+ pass
+
+ def on_alarm_update_response(self, _topic, msg):
+ """
+ Process a Get All Alarms response
+
+ :param _topic: (str) OMCI-RX topic
+ :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
+ """
+ self.log.info('on-alarm-update-response', state=self.state, msg=msg)
+
+ if self._omci_cc_subscriptions[RxEvent.Get_ALARM_Get]:
+ if self.state == 'disabled':
+ self.log.error('rx-in-invalid-state', state=self.state)
+ return
+
+ try:
+ response = msg.get(RX_RESPONSE_KEY)
+
+ if isinstance(response, OmciFrame) and \
+ isinstance(response.fields.get('omci_message'), OmciGetAllAlarmsResponse):
+ # ONU will reset its last alarm sequence number to 0 on receipt of the
+ # Get All Alarms request
+ self.log.info('received alarm response')
+ self.reset_alarm_sequence()
+
+ except Exception as e:
+ self.log.exception('upload-alarm-failure', e=e)
+
+ def on_alarm_notification(self, _topic, msg):
+ """
+ Process an alarm Notification
+
+ :param _topic: (str) OMCI-RX topic
+ :param msg: (dict) Dictionary with keys:
+ TX_REQUEST_KEY -> None (this is an autonomous msg)
+ RX_RESPONSE_KEY -> OmciMessage (Alarm notification frame)
+ """
+ self.log.info('on-alarm-update-response', state=self.state, msg=msg)
+
+ alarm_msg = msg.get(RX_RESPONSE_KEY)
+ if alarm_msg is not None:
+ # TODO: Process alarm
+ # decode message, note that the seq number should never
+ # be zero.
+
+ # increment alarm number & compare to alarm # in message
+ self.increment_alarm_sequence()
+
+ # Signal early audit if no match and audits are enabled
+ # if self.last_alarm_sequence != msg_seq_no and self._audit_delay > 0:
+ # self._deferred = reactor.callLater(0, self.audit_alarm)
+
+ # update alarm table/db (compare current db with alarm msg)
+ # notify ONU Device Handler, ...
+ pass
+ # Note that right now we do not alarm anyone or save it to the database, so
+ # if we can create (or clear) an alarm on the ONU, then the audit logic
+ # should detect the difference. So we can test the audit that way.
+
+ def raise_alarm(self, class_id, entity_id, alarm_number):
+ """
+ Raise an alarm on the ONU
+
+ :param class_id: (int) Class ID of the Alarm ME
+ :param entity_id: (int) Entity ID of the Alarm
+ :param alarm_number: (int) Alarm number (bit) that is alarmed
+ """
+ pass # TODO: Implement this
+
+ def clear_alarm(self, class_id, entity_id, alarm_number):
+ """
+ Lower/clear an alarm on the ONU
+
+ :param class_id: (int) Class ID of the Alarm ME
+ :param entity_id: (int) Entity ID of the Alarm
+ :param alarm_number: (int) Alarm number (bit) that is alarmed
+ """
+ pass # TODO: Implement this
+
+ def query_mib(self, class_id=None, instance_id=None):
+ """
+ Get Alarm database information.
+
+ This method can be used to request information from the database to the detailed
+ level requested
+
+ :param class_id: (int) Managed Entity class ID
+ :param instance_id: (int) Managed Entity instance
+
+ :return: (dict) The value(s) requested. If class/inst/attribute is
+ not found, an empty dictionary is returned
+ :raises DatabaseStateError: If the database is not enabled or does not exist
+ """
+ from voltha.extensions.omci.database.mib_db_api import DatabaseStateError
+
+ self.log.debug('query', class_id=class_id, instance_id=instance_id)
+ if self._database is None:
+ raise DatabaseStateError('Database does not yet exist')
+
+ return self._database.query(self._device_id, class_id=class_id, instance_id=instance_id)
diff --git a/voltha/extensions/omci/tasks/alarm_check_task.py b/voltha/extensions/omci/tasks/alarm_check_task.py
deleted file mode 100644
index 505f0f8..0000000
--- a/voltha/extensions/omci/tasks/alarm_check_task.py
+++ /dev/null
@@ -1,115 +0,0 @@
-#
-# Copyright 2018 the original author or authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-from task import Task
-from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks, TimeoutError, failure
-from voltha.extensions.omci.omci_defs import ReasonCodes
-
-
-class AlarmDataTaskFailure(Exception):
- pass
-
-
-class AlarmDataTask(Task):
- """
- OpenOMCI Alarm Data Get Request
- """
- task_priority = Task.DEFAULT_PRIORITY
- name = "Alarm Data Task"
- max_payload = 29
-
- def __init__(self, omci_agent, device_id):
- """
- Class initialization
-
- :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
- :param device_id: (str) ONU Device ID
- :param class_id: (int) ME Class ID
- :param entity_id: (int) ME entity ID
- """
- super(AlarmDataTask, self).__init__(AlarmDataTask.name,
- omci_agent,
- device_id,
- priority=AlarmDataTask.task_priority,
- exclusive=False)
- self._local_deferred = None
- self._last_number_of_commands = None
-
- def cancel_deferred(self):
- super(AlarmDataTask, self).cancel_deferred()
-
- d, self._local_deferred = self._local_deferred, None
- try:
- if d is not None and not d.called:
- d.cancel()
- except:
- pass
-
- def start(self):
- """
- Start the tasks
- """
- super(AlarmDataTask, self).start()
- self._local_deferred = reactor.callLater(0, self.check_alarm_data)
-
- def stop(self):
- """
- Shutdown the tasks
- """
- self.log.debug('stopping')
-
- self.cancel_deferred()
- super(AlarmDataTask, self).stop()
-
- @inlineCallbacks
- def check_alarm_data(self):
- """
- Sync the current alarm sequence number
- """
- self.log.info('perform-get-interval')
-
- try:
- device = self.omci_agent.get_device(self.device_id)
-
- results = yield device.omci_cc.send_get_all_alarm()
- omci_msg = results.fields['omci_message'].fields
- status = omci_msg['success_code']
- alarm_sequence_number = omci_msg['number_of_commands']
- self.log.debug('alarm-data', alarm_sequence_number=alarm_sequence_number)
-
- if status != ReasonCodes.Success:
- raise AlarmDataTaskFailure('Unexpected Response Status: {}'.
- format(status))
-
- if self._last_number_of_commands is None:
- self._last_number_of_commands = alarm_sequence_number
-
- elif alarm_sequence_number != self._last_number_of_commands:
- msg = 'The last number of sequence does not match {} to {}' \
- .format(self._last_number_of_commands, alarm_sequence_number)
- self.log.info('interval-roll-over', msg=msg)
- raise AlarmDataTaskFailure(msg)
-
- # Successful if here
- self.deferred.callback(alarm_sequence_number)
-
- except TimeoutError as e:
- self.log.warn('alarm_retrieval_mode', e=e)
- self.deferred.errback(failure.Failure(e))
-
- except Exception as e:
- self.log.exception('alarm-get-failure', e=e)
- self.deferred.errback(failure.Failure(e))
diff --git a/voltha/extensions/omci/tasks/alarm_resync_task.py b/voltha/extensions/omci/tasks/alarm_resync_task.py
index 2278296..826bf00 100644
--- a/voltha/extensions/omci/tasks/alarm_resync_task.py
+++ b/voltha/extensions/omci/tasks/alarm_resync_task.py
@@ -45,7 +45,7 @@
The ONU can still source AVC's and the the OLT can still send config
commands to the actual.
"""
- task_priority = 240
+ task_priority = Task.DEFAULT_PRIORITY
name = "ALARM Resynchronization Task"
max_retries = 3
@@ -53,7 +53,6 @@
max_alarm_upload_next_retries = 3
alarm_upload_next_delay = 10 # Max * delay < 60 seconds
- watchdog_timeout = 15 # Should be > any retry delay
def __init__(self, omci_agent, device_id):
"""
@@ -66,8 +65,7 @@
omci_agent,
device_id,
priority=AlarmResyncTask.task_priority,
- exclusive=False,
- watchdog_timeout=AlarmResyncTask.watchdog_timeout)
+ exclusive=False)
self._local_deferred = None
self._device = omci_agent.get_device(device_id)
self._db_active = MibDbVolatileDict(omci_agent)
@@ -121,22 +119,52 @@
try:
self.strobe_watchdog()
- command_sequence_number = yield self.snapshot_alarm()
+ results = yield self.snapshot_alarm()
+ olt_db_copy = results[0]
+ number_of_commands = results[1]
- # Start the ALARM upload sequence, save alarms to the table
- self.strobe_watchdog()
- commands_retrieved, alarm_table = yield self.upload_alarm(command_sequence_number)
-
- if commands_retrieved < command_sequence_number:
- e = AlarmDownloadException('Only retrieved {} of {} instances'.
- format(commands_retrieved, command_sequence_number))
+ if olt_db_copy is None:
+ e = AlarmCopyException('Failed to get local database copy')
self.deferred.errback(failure.Failure(e))
+ else:
+ # Start the ALARM upload sequence, save alarms to the table
- self.deferred.callback(
- {
- 'commands_retrieved': commands_retrieved,
- 'alarm_table': alarm_table
- })
+ self.strobe_watchdog()
+ if number_of_commands > 0:
+ commands_retrieved = yield self.upload_alarm(number_of_commands)
+ else:
+ commands_retrieved = 0
+
+ if commands_retrieved != number_of_commands:
+ e = AlarmDownloadException('Only retrieved {} of {} instances'.
+ format(commands_retrieved, number_of_commands))
+ self.deferred.errback(failure.Failure(e))
+ else:
+ # Compare the databases
+ onu_db_copy = self._db_active.query(self.device_id)
+
+ on_olt_only, on_onu_only, attr_diffs = \
+ self.compare_mibs(olt_db_copy, onu_db_copy)
+
+ on_olt_only = on_olt_only if len(on_olt_only) else None
+ on_onu_only = on_onu_only if len(on_onu_only) else None
+ attr_diffs = attr_diffs if len(attr_diffs) else None
+
+ on_olt_only_diffs = on_olt_only if on_olt_only and len(on_olt_only) else None
+ on_onu_only_diffs = on_onu_only if on_onu_only and len(on_onu_only) else None
+ attr_diffs = attr_diffs if attr_diffs and len(attr_diffs) else None
+
+ if all(diff is None for diff in [on_olt_only_diffs, on_onu_only_diffs, attr_diffs]):
+ results = None
+ else:
+ results = {
+ 'onu-only': on_onu_only_diffs,
+ 'olt-only': on_olt_only_diffs,
+ 'attr-diffs': attr_diffs,
+ 'onu-db': onu_db_copy,
+ 'olt-db': olt_db_copy
+ }
+ self.deferred.callback(results)
except Exception as e:
self.log.exception('resync', e=e)
@@ -149,6 +177,7 @@
:return: (pair) (command_sequence_number)
"""
+ olt_db_copy = None
command_sequence_number = None
try:
@@ -162,6 +191,7 @@
if command_sequence_number is None:
if retries >= max_tries:
+ olt_db_copy = None
break
except TimeoutError as e:
@@ -173,17 +203,22 @@
yield asleep(AlarmResyncTask.retry_delay)
continue
+ # Get a snapshot of the local MIB database
+ olt_db_copy = self._device.query_alarm_table()
+ # if we made it this far, no need to keep trying
+ break
+
except Exception as e:
self.log.exception('alarm-resync', e=e)
raise
# Handle initial failures
- if command_sequence_number is None:
+ if olt_db_copy is None or command_sequence_number is None:
raise AlarmCopyException('Failed to snapshot ALARM copy after {} retries'.
format(AlarmResyncTask.max_retries))
- returnValue(command_sequence_number)
+ returnValue((olt_db_copy, command_sequence_number))
@inlineCallbacks
def send_alarm_upload(self):
@@ -195,12 +230,11 @@
########################################
# Begin ALARM Upload
try:
- self.strobe_watchdog()
results = yield self._device.omci_cc.send_get_all_alarm()
-
+ self.strobe_watchdog()
command_sequence_number = results.fields['omci_message'].fields['number_of_commands']
- if command_sequence_number is None or command_sequence_number <= 0:
+ if command_sequence_number < 0:
raise ValueError('Number of commands was {}'.format(command_sequence_number))
returnValue(command_sequence_number)
@@ -217,31 +251,36 @@
for seq_no in xrange(command_sequence_number):
max_tries = AlarmResyncTask.max_alarm_upload_next_retries
- alarm_class_id = {}
- alarm_entity_id = {}
- attributes = {}
for retries in xrange(0, max_tries):
try:
+ response = yield self._device.omci_cc.send_get_all_alarm_next(seq_no)
self.strobe_watchdog()
- response = yield self._device.omci_cc.get_all_alarm_next(seq_no)
omci_msg = response.fields['omci_message'].fields
- alarm_class_id[seq_no] = omci_msg['alarmed_entity_class']
- alarm_entity_id[seq_no] = omci_msg['alarmed_entity_id']
+ alarm_class_id = omci_msg['alarmed_entity_class']
+ alarm_entity_id = omci_msg['alarmed_entity_id']
# Filter out the 'alarm_data_sync' from the database. We save that at
# the device level and do not want it showing up during a re-sync
# during data comparison
- if alarm_class_id[seq_no] == OntData.class_id:
+ if alarm_class_id == OntData.class_id:
break
- attributes[seq_no] = omci_msg['alarm_bit_map']
+ bit_map = omci_msg['alarm_bit_map'].encode('hex')
+ bit_map_hex = "{0:b}".format(int(bit_map, 16))
+ alarm_bit_map = eval(bit_map_hex)
+
+ # alarm bit map space is 28 bytes * 8 = 224 bits
+ if len(bit_map_hex) != 224:
+ continue
+
+ attributes = alarm_bit_map
# Save to the database
- self._db_active.set(self.device_id, alarm_class_id[seq_no],
- alarm_entity_id[seq_no], attributes[seq_no])
+ self._db_active.set(self.device_id, alarm_class_id,
+ alarm_entity_id, attributes)
break
except TimeoutError:
@@ -249,8 +288,8 @@
command_sequence_number=command_sequence_number)
if retries < max_tries - 1:
- self.strobe_watchdog()
yield asleep(AlarmResyncTask.alarm_upload_next_delay)
+ self.strobe_watchdog()
else:
raise
@@ -258,6 +297,110 @@
self.log.exception('resync', e=e, seq_no=seq_no,
command_sequence_number=command_sequence_number)
- self.strobe_watchdog()
- returnValue((seq_no + 1, alarm_class_id, alarm_entity_id, attributes)) # seq_no is zero based and alarm table.
+ returnValue(seq_no + 1) # seq_no is zero based and alarm table.
+ def compare_mibs(self, db_copy, db_active):
+ """
+ Compare the our db_copy with the ONU's active copy
+
+ :param db_copy: (dict) OpenOMCI's copy of the database
+ :param db_active: (dict) ONU's database snapshot
+ :return: (dict), (dict), dict() Differences
+ """
+ self.strobe_watchdog()
+
+ # Class & Entities only in local copy (OpenOMCI)
+ on_olt_only = self.get_lsh_only_dict(db_copy, db_active)
+
+ # Class & Entities only on remote (ONU)
+ on_onu_only = self.get_lsh_only_dict(db_active, db_copy)
+
+ # Class & Entities on both local & remote, but one or more attributes
+ # are different on the ONU. This is the value that the local (OpenOMCI)
+ # thinks should be on the remote (ONU)
+
+ me_map = self.omci_agent.get_device(self.device_id).me_map
+ attr_diffs = self.get_attribute_diffs(db_copy, db_active, me_map)
+
+ return on_olt_only, on_onu_only, attr_diffs
+
+ def get_lsh_only_dict(self, lhs, rhs):
+ """
+ Compare two MIB database dictionaries and return the ME Class ID and
+ instances that are unique to the lhs dictionary. Both parameters
+ should be in the common MIB Database output dictionary format that
+ is returned by the mib 'query' command.
+
+ :param lhs: (dict) Left-hand-side argument.
+ :param rhs: (dict) Right-hand-side argument
+
+ return: (list(int,int)) List of tuples where (class_id, inst_id)
+ """
+ results = list()
+
+ for cls_id, cls_data in lhs.items():
+ # Get unique classes
+ #
+ # Skip keys that are not class IDs
+ if not isinstance(cls_id, int):
+ continue
+
+ if cls_id not in rhs:
+ results.extend([(cls_id, inst_id) for inst_id in cls_data.keys()
+ if isinstance(inst_id, int)])
+ else:
+ # Get unique instances of a class
+ lhs_cls = cls_data
+ rhs_cls = rhs[cls_id]
+
+ for inst_id, _ in lhs_cls.items():
+ # Skip keys that are not instance IDs
+ if isinstance(cls_id, int) and inst_id not in rhs_cls:
+ results.extend([(cls_id, inst_id)])
+
+ return results
+
+ def get_attribute_diffs(self, omci_copy, onu_copy, me_map):
+ """
+ Compare two OMCI MIBs and return the ME class and instance IDs that exists
+ on both the local copy and the remote ONU that have different attribute
+ values. Both parameters should be in the common MIB Database output
+ dictionary format that is returned by the mib 'query' command.
+
+ :param omci_copy: (dict) OpenOMCI copy (OLT-side) of the MIB Database
+ :param onu_copy: (dict) active ONU latest copy its database
+ :param me_map: (dict) ME Class ID MAP for this ONU
+
+ return: (list(int,int,str)) List of tuples where (class_id, inst_id, attribute)
+ points to the specific ME instance where attributes
+ are different
+ """
+ results = list()
+
+ # Get class ID's that are in both
+ class_ids = {cls_id for cls_id, _ in omci_copy.items()
+ if isinstance(cls_id, int) and cls_id in onu_copy}
+
+ for cls_id in class_ids:
+ # Get unique instances of a class
+ olt_cls = omci_copy[cls_id]
+ onu_cls = onu_copy[cls_id]
+
+ # Get set of common instance IDs
+ inst_ids = {inst_id for inst_id, _ in olt_cls.items()
+ if isinstance(inst_id, int) and inst_id in onu_cls}
+
+ for inst_id in inst_ids:
+ omci_attributes = {k for k in olt_cls[inst_id][ATTRIBUTES_KEY].iterkeys()}
+ onu_attributes = {k for k in onu_cls[inst_id][ATTRIBUTES_KEY].iterkeys()}
+
+ # Get attributes that exist in one database, but not the other
+ sym_diffs = (omci_attributes ^ onu_attributes)
+ results.extend([(cls_id, inst_id, attr) for attr in sym_diffs])
+
+ # Get common attributes with different values
+ common_attributes = (omci_attributes & onu_attributes)
+ results.extend([(cls_id, inst_id, attr) for attr in common_attributes
+ if olt_cls[inst_id][ATTRIBUTES_KEY][attr] !=
+ onu_cls[inst_id][ATTRIBUTES_KEY][attr]])
+ return results
diff --git a/voltha/extensions/omci/tasks/alarm_sync_data.py b/voltha/extensions/omci/tasks/alarm_sync_data.py
deleted file mode 100644
index 331fe1b..0000000
--- a/voltha/extensions/omci/tasks/alarm_sync_data.py
+++ /dev/null
@@ -1,130 +0,0 @@
-#
-# Copyright 2018 the original author or authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-from task import Task
-from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks, TimeoutError, failure
-from voltha.extensions.omci.omci_defs import ReasonCodes as RC
-
-
-class AlarmSyncDataFailure(Exception):
- """
- This error is raised by default when the upload fails
- """
-
-
-class AlarmSyncDataTask(Task):
- """
- OpenOMCI - Synchronize the ONU data
- """
- task_priority = Task.DEFAULT_PRIORITY + 10
- name = "Alarm Sync Time Task"
-
- def __init__(self, omci_agent, device_id):
- """
- Class initialization
-
- :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
- :param device_id: (str) ONU Device ID
- """
- super(AlarmSyncDataTask, self).__init__(AlarmSyncDataTask.name,
- omci_agent,
- device_id,
- priority=AlarmSyncDataTask.task_priority,
- exclusive=False)
- self._local_deferred = None
-
- def cancel_deferred(self):
- super(AlarmSyncDataTask, self).cancel_deferred()
-
- d, self._local_deferred = self._local_deferred, None
- try:
- if d is not None and not d.called:
- d.cancel()
- except:
- pass
-
- def start(self):
- """
- Start the tasks
- """
- super(AlarmSyncDataTask, self).start()
- self._local_deferred = reactor.callLater(0, self.perform_alarm_sync_data)
-
- def stop(self):
- """
- Shutdown the tasks
- """
- self.log.debug('stopping')
-
- self.cancel_deferred()
- super(AlarmSyncDataTask, self).stop()
-
- @inlineCallbacks
- def perform_alarm_sync_data(self):
- """
- Sync the time
- """
- self.log.info('perform-alarm-sync-data')
-
- try:
- device = self.omci_agent.get_device(self.device_id)
-
- #########################################
- # ONU Data (ME #2)
- # alarm_retrieval_mode=1, time=DEFAULT_OMCI_TIMEOUT
- self.strobe_watchdog()
- results = yield device.omci_cc.send_get_all_alarm(alarm_retrieval_mode=1)
-
- command_sequence_number = results.fields['omci_message'].fields['number_of_commands']
-
- for seq_no in xrange(command_sequence_number):
- if not device.active or not device.omci_cc.enabled:
- raise AlarmSyncDataFailure('OMCI and/or ONU is not active')
-
- for retry in range(0, 3):
- try:
- self.log.debug('alarm-data-next-request', seq_no=seq_no,
- retry=retry,
- command_sequence_number=command_sequence_number)
- self.strobe_watchdog()
- yield device.omci_cc.send_get_all_alarm_next(seq_no)
-
- self.log.debug('alarm-data-next-success', seq_no=seq_no,
- command_sequence_number=command_sequence_number)
- break
-
- except TimeoutError as e:
- from common.utils.asleep import asleep
- self.log.warn('alarm-data-timeout', e=e, seq_no=seq_no,
- command_sequence_number=command_sequence_number)
- if retry >= 2:
- raise AlarmSyncDataFailure('Alarm timeout failure on req {} of {}'.
- format(seq_no + 1, command_sequence_number))
-
- self.strobe_watchdog()
- yield asleep(0.3)
-
- # Successful if here
- self.log.info('alarm-synchronized')
- self.deferred.callback(command_sequence_number)
-
- except TimeoutError as e:
- self.log.warn('alarm-sync-time-timeout', e=e)
- self.deferred.errback(failure.Failure(e))
-
- except Exception as e:
- self.log.exception('alarm-sync-time', e=e)
- self.deferred.errback(failure.Failure(e))