VOL-717: Alarm Synchronization State Machine
Intial the codes for the Alarm synchronization state machine
which perform the getAllAlarms/getAllalarmsResponse/
getAllAlarmsNext and getAllAlarmNextResponse and provides
database to save related alarms AlarmDbExternal.
Change-Id: I2d4c6b1027a8bf466ed821adfc01e2721c5a4702
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index 61e434c..fd9f9c3 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -151,12 +151,19 @@
self.channel = grpc.insecure_channel(self.host_and_port)
self.channel_ready_future = grpc.channel_ready_future(self.channel)
- # Start indications thread
- self.indications_thread_handle = threading.Thread(
- target=self.indications_thread)
- self.indications_thread_handle.daemon = True
- self.indications_thread_handle.start()
-
+ # Catch RuntimeError exception
+ try:
+ # Start indications thread
+ self.indications_thread_handle = threading.Thread(
+ target=self.indications_thread)
+ # Old getter/setter API for daemon; use it directly as a
+ # property instead. The Jinkins error will happon on the reason of
+ # Exception in thread Thread-1 (most likely raised # during
+ # interpreter shutdown)
+ self.indications_thread_handle.setDaemon(True)
+ self.indications_thread_handle.start()
+ except Exception as e:
+ self.log.exception('do_state_init failed', e=e)
'''
# FIXME - Move to oper_up state without connecting to OLT?
if is_reconciliation:
diff --git a/voltha/core/config/config_rev_persisted.py b/voltha/core/config/config_rev_persisted.py
index 78d45f9..8b25b82 100644
--- a/voltha/core/config/config_rev_persisted.py
+++ b/voltha/core/config/config_rev_persisted.py
@@ -138,5 +138,6 @@
bbf_fiber_tcont_body_pb2, bbf_fiber_gemport_body_pb2, \
bbf_fiber_multicast_gemport_body_pb2, \
bbf_fiber_multicast_distribution_set_body_pb2, \
- omci_mib_db_pb2
+ omci_mib_db_pb2, \
+ omci_alarm_db_pb2
return getattr(locals()[module_name], cls_name)
diff --git a/voltha/core/global_handler.py b/voltha/core/global_handler.py
index b0bcef9..aed018e 100644
--- a/voltha/core/global_handler.py
+++ b/voltha/core/global_handler.py
@@ -46,6 +46,7 @@
MulticastGemportsConfigData
from voltha.protos.bbf_fiber_multicast_distribution_set_body_pb2 import \
MulticastDistributionSetData
+from voltha.protos.omci_alarm_db_pb2 import AlarmDeviceData
log = structlog.get_logger()
@@ -1759,3 +1760,21 @@
else:
log.debug('grpc-success-response', response=response)
returnValue(response)
+
+ @twisted_async
+ @inlineCallbacks
+ def GetAlarmDeviceData(self, request, context):
+ log.info('grpc-request', request=request)
+ response = yield self.dispatcher.dispatch('GetAlarmDeviceData',
+ request,
+ context,
+ id=request.id)
+ log.debug('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.warn('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(AlarmDeviceData())
+ else:
+ log.debug('grpc-success-response', response=response)
+ returnValue(response)
\ No newline at end of file
diff --git a/voltha/core/local_handler.py b/voltha/core/local_handler.py
index 75a5c7a..81067b4 100644
--- a/voltha/core/local_handler.py
+++ b/voltha/core/local_handler.py
@@ -37,6 +37,7 @@
from voltha.protos.bbf_fiber_base_pb2 import AllMulticastDistributionSetData, AllMulticastGemportsConfigData
from voltha.registry import registry
from voltha.protos.omci_mib_db_pb2 import MibDeviceData
+from voltha.protos.omci_alarm_db_pb2 import AlarmDeviceData
from requests.api import request
from common.utils.asleep import asleep
@@ -1355,3 +1356,23 @@
context.set_code(StatusCode.NOT_FOUND)
return MibDeviceData()
+ @twisted_async
+ def GetAlarmDeviceData(self, request, context):
+ log.info('grpc-request', request=request)
+
+ depth = int(dict(context.invocation_metadata()).get('get-depth', -1))
+
+ if '/' in request.id:
+ context.set_details(
+ 'Malformed device id \'{}\''.format(request.id))
+ context.set_code(StatusCode.INVALID_ARGUMENT)
+ return AlarmDeviceData()
+
+ try:
+ return self.root.get('/omci_alarms/' + request.id, depth=depth)
+
+ except KeyError:
+ context.set_details(
+ 'OMCI ALARM for Device \'{}\' not found'.format(request.id))
+ context.set_code(StatusCode.NOT_FOUND)
+ return AlarmDeviceData()
\ No newline at end of file
diff --git a/voltha/extensions/omci/database/alarm_db_ext.py b/voltha/extensions/omci/database/alarm_db_ext.py
new file mode 100644
index 0000000..c4054d3
--- /dev/null
+++ b/voltha/extensions/omci/database/alarm_db_ext.py
@@ -0,0 +1,828 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from mib_db_api import *
+from voltha.protos.omci_alarm_db_pb2 import AlarmInstanceData, AlarmClassData, \
+ AlarmDeviceData, AlarmAttributeData
+from voltha.extensions.omci.omci_entities import *
+from scapy.fields import StrField, FieldListField
+
+
+class AlarmDbExternal(MibDbApi):
+ """
+ A persistent external OpenOMCI Alarm Database
+ """
+ CURRENT_VERSION = 1 # VOLTHA v1.3.0 release
+
+ _TIME_FORMAT = '%Y%m%d-%H%M%S.%f'
+
+ # Paths from root proxy
+ ALARM_PATH = '/omci_alarms'
+ DEVICE_PATH = ALARM_PATH + '/{}' # .format(device_id)
+
+ # Classes, Instances, and Attributes as lists from root proxy
+ CLASSES_PATH = DEVICE_PATH + '/classes' # .format(device_id)
+ INSTANCES_PATH = DEVICE_PATH +'/classes/{}/instances' # .format(device_id, class_id)
+ ATTRIBUTES_PATH = DEVICE_PATH + '/classes/{}/instances/{}/attributes' # .format(device_id, class_id, instance_id)
+
+ # Single Class, Instance, and Attribute as objects from device proxy
+ CLASS_PATH = '/classes/{}' # .format(class_id)
+ INSTANCE_PATH = '/classes/{}/instances/{}' # .format(class_id, instance_id)
+ ATTRIBUTE_PATH = '/classes/{}/instances/{}/attributes/{}' # .format(class_id, instance_id
+ # attribute_name)
+
+ def __init__(self, omci_agent):
+ """
+ Class initializer
+ :param omci_agent: (OpenOMCIAgent) OpenOMCI Agent
+ """
+ super(AlarmDbExternal, self).__init__(omci_agent)
+ self._core = omci_agent.core
+
+ def start(self):
+ """
+ Start up/restore the database
+ """
+ self.log.debug('start')
+
+ if not self._started:
+ super(AlarmDbExternal, self).start()
+ root_proxy = self._core.get_proxy('/')
+
+ try:
+ base = root_proxy.get(AlarmDbExternal.ALARM_PATH)
+ self.log.info('db-exists', num_devices=len(base))
+
+ except Exception as e:
+ self.log.exception('start-failure', e=e)
+ raise
+
+ def stop(self):
+ """
+ Start up the database
+ """
+ self.log.debug('stop')
+
+ if self._started:
+ super(AlarmDbExternal, self).stop()
+ # TODO: Delete this method if nothing else is done except calling the base class
+
+ def _time_to_string(self, time):
+ return time.strftime(AlarmDbExternal._TIME_FORMAT) if time is not None else ''
+
+ def _string_to_time(self, time):
+ return datetime.strptime(time, AlarmDbExternal._TIME_FORMAT) if len(time) else None
+
+ def _attribute_to_string(self, device_id, class_id, attr_name, value):
+ """
+ Convert an ME's attribute value to string representation
+
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) Class ID
+ :param attr_name: (str) Attribute Name (see EntityClasses)
+ :param value: (various) Attribute Value
+
+ :return: (str) String representation of the value
+ :raises KeyError: Device, Class ID, or Attribute does not exist
+ """
+ try:
+ me_map = self._omci_agent.get_device(device_id).me_map
+
+ if class_id in me_map:
+ entity = me_map[class_id]
+ attr_index = entity.attribute_name_to_index_map[attr_name]
+ eca = entity.attributes[attr_index]
+ field = eca.field
+ else:
+ # Here for auto-defined MEs (ones not defined in ME Map)
+ from voltha.extensions.omci.omci_cc import UNKNOWN_CLASS_ATTRIBUTE_KEY
+ field = StrFixedLenField(UNKNOWN_CLASS_ATTRIBUTE_KEY, None, 24)
+
+ if isinstance(field, StrFixedLenField):
+ from scapy.base_classes import Packet_metaclass
+ # For StrFixedLenField, value is a str already (or possibly JSON encoded)
+ if hasattr(value, 'to_json'):
+ # Packet Class to string
+ str_value = value.to_json()
+ elif isinstance(field.default, Packet_metaclass) \
+ and hasattr(field.default, 'json_from_value'):
+ # Value/hex of Packet Class to string
+ str_value = field.default.json_from_value(value)
+ else:
+ str_value = str(value)
+
+ elif isinstance(field, (StrField, MACField, IPField)):
+ # For StrField, value is an str already
+ # For MACField, value is a string in ':' delimited form
+ # For IPField, value is a string in '.' delimited form
+ str_value = str(value)
+
+ elif isinstance(field, (ByteField, ShortField, IntField, LongField)):
+ # For ByteField, ShortField, IntField, and LongField value is an int
+ str_value = str(value)
+
+ elif isinstance(field, BitField):
+ # For BitField, value is a long
+ #
+ str_value = str(value)
+
+ elif isinstance(field, FieldListField):
+ str_value = json.dumps(value, separators=(',', ':'))
+
+ else:
+ self.log.warning('default-conversion', type=type(field),
+ class_id=class_id, attribute=attr_name, value=str(value))
+ str_value = str(value)
+
+ return str_value
+
+ except Exception as e:
+ self.log.exception('attr-to-string', device_id=device_id,
+ class_id=class_id, attr=attr_name,
+ value=value, e=e)
+ raise
+
+ def _string_to_attribute(self, device_id, class_id, attr_name, str_value):
+ """
+ Convert an ME's attribute value-string to its Scapy decode equivalent
+
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) Class ID
+ :param attr_name: (str) Attribute Name (see EntityClasses)
+ :param str_value: (str) Attribute Value in string form
+
+ :return: (various) String representation of the value
+ :raises KeyError: Device, Class ID, or Attribute does not exist
+ """
+ try:
+ me_map = self._omci_agent.get_device(device_id).me_map
+
+ if class_id in me_map:
+ entity = me_map[class_id]
+ attr_index = entity.attribute_name_to_index_map[attr_name]
+ eca = entity.attributes[attr_index]
+ field = eca.field
+ else:
+ # Here for auto-defined MEs (ones not defined in ME Map)
+ from voltha.extensions.omci.omci_cc import UNKNOWN_CLASS_ATTRIBUTE_KEY
+ field = StrFixedLenField(UNKNOWN_CLASS_ATTRIBUTE_KEY, None, 24)
+
+ if isinstance(field, StrFixedLenField):
+ from scapy.base_classes import Packet_metaclass
+ if isinstance(field.default, Packet_metaclass) and \
+ hasattr(field.default, 'to_json'):
+ value = json.loads(str_value)
+ else:
+ value = str_value
+
+ elif isinstance(field, MACField):
+ value = str_value
+
+ elif isinstance(field, IPField):
+ value = str_value
+
+ elif isinstance(field, (ByteField, ShortField, IntField, LongField)):
+ if str_value.lower() in ('true', 'false'):
+ str_value = '1' if str_value.lower() == 'true' else '0'
+ value = int(str_value)
+
+ elif isinstance(field, BitField):
+ value = long(str_value)
+
+ elif isinstance(field, FieldListField):
+ value = json.loads(str_value)
+
+ else:
+ self.log.warning('default-conversion', type=type(field),
+ class_id=class_id, attribute=attr_name, value=str_value)
+ value = None
+
+ return value
+
+ except Exception as e:
+ self.log.exception('attr-to-string', device_id=device_id,
+ class_id=class_id, attr=attr_name,
+ value=str_value, e=e)
+ raise
+
+ def add(self, device_id, overwrite=False):
+ """
+ Add a new ONU to database
+
+ :param device_id: (str) Device ID of ONU to add
+ :param overwrite: (bool) Overwrite existing entry if found.
+
+ :raises KeyError: If device already exists and 'overwrite' is False
+ """
+ self.log.debug('add-device', device_id=device_id, overwrite=overwrite)
+
+ now = datetime.utcnow()
+ found = False
+ root_proxy = self._core.get_proxy('/')
+
+ data = AlarmDeviceData(device_id=device_id,
+ created=self._time_to_string(now),
+ version=AlarmDbExternal.CURRENT_VERSION,
+ last_alarm_sequence=0)
+ try:
+ dev_proxy = self._device_proxy(device_id)
+ found = True
+
+ if not overwrite:
+ # Device already exists
+ raise KeyError('Device with ID {} already exists in Alarm database'.
+ format(device_id))
+
+ # Overwrite with new data
+ data = dev_proxy.get('/', depth=0)
+ self._root_proxy.update(AlarmDbExternal.DEVICE_PATH.format(device_id), data)
+ self._modified = now
+
+ except KeyError:
+ if found:
+ raise
+ # Did not exist, add it now
+ root_proxy.add(AlarmDbExternal.ALARM_PATH, data)
+ self._created = now
+ self._modified = now
+
+ def remove(self, device_id):
+ """
+ Remove an ONU from the database
+
+ :param device_id: (str) Device ID of ONU to remove from database
+ """
+ self.log.debug('remove-device', device_id=device_id)
+
+ if not self._started:
+ raise DatabaseStateError('The Database is not currently active')
+
+ if not isinstance(device_id, basestring):
+ raise TypeError('Device ID should be an string')
+
+ try:
+ # self._root_proxy.get(AlarmDbExternal.DEVICE_PATH.format(device_id))
+ self._root_proxy.remove(AlarmDbExternal.DEVICE_PATH.format(device_id))
+ self._modified = datetime.utcnow()
+
+ except KeyError:
+ # Did not exists, which is not a failure
+ pass
+
+ except Exception as e:
+ self.log.exception('remove-exception', device_id=device_id, e=e)
+ raise
+
+ @property
+ def _root_proxy(self):
+ return self._core.get_proxy('/')
+
+ def _device_proxy(self, device_id):
+ """
+ Return a config proxy to the OMCI Alarm_DB leaf for a given device
+
+ :param device_id: (str) ONU Device ID
+ :return: (ConfigProxy) Configuration proxy rooted at OMCI Alarm DB
+ :raises KeyError: If the device does not exist in the database
+ """
+ if not isinstance(device_id, basestring):
+ raise TypeError('Device ID should be an string')
+
+ if not self._started:
+ raise DatabaseStateError('The Database is not currently active')
+
+ return self._core.get_proxy(AlarmDbExternal.DEVICE_PATH.format(device_id))
+
+ def _class_proxy(self, device_id, class_id, create=False):
+ """
+ Get a config proxy to a specific managed entity class
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) Class ID
+ :param create: (bool) If true, create default instance (and class)
+ :return: (ConfigProxy) Class configuration proxy
+
+ :raises DatabaseStateError: If database is not started
+ :raises KeyError: If Instance does not exist and 'create' is False
+ """
+ if not self._started:
+ raise DatabaseStateError('The Database is not currently active')
+
+ if not 0 <= class_id <= 0xFFFF:
+ raise ValueError('class-id is 0..0xFFFF')
+
+ fmt = AlarmDbExternal.DEVICE_PATH + AlarmDbExternal.CLASS_PATH
+ path = fmt.format(device_id, class_id)
+
+ try:
+ return self._core.get_proxy(path)
+
+ except KeyError:
+ if not create:
+ self.log.error('class-proxy-does-not-exist', device_id=device_id,
+ class_id=class_id)
+ raise
+
+ # Create class
+ data = AlarmClassData(class_id=class_id)
+ root_path = AlarmDbExternal.CLASSES_PATH.format(device_id)
+ self._root_proxy.add(root_path, data)
+
+ return self._core.get_proxy(path)
+
+ def _instance_proxy(self, device_id, class_id, instance_id, create=False):
+ """
+ Get a config proxy to a specific managed entity instance
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) Class ID
+ :param instance_id: (int) Instance ID
+ :param create: (bool) If true, create default instance (and class)
+ :return: (ConfigProxy) Instance configuration proxy
+
+ :raises DatabaseStateError: If database is not started
+ :raises KeyError: If Instance does not exist and 'create' is False
+ """
+ if not self._started:
+ raise DatabaseStateError('The Database is not currently active')
+
+ if not isinstance(device_id, basestring):
+ raise TypeError('Device ID is a string')
+
+ if not 0 <= class_id <= 0xFFFF:
+ raise ValueError('class-id is 0..0xFFFF')
+
+ if not 0 <= instance_id <= 0xFFFF:
+ raise ValueError('instance-id is 0..0xFFFF')
+
+ fmt = AlarmDbExternal.DEVICE_PATH + AlarmDbExternal.INSTANCE_PATH
+ path = fmt.format(device_id, class_id, instance_id)
+
+ try:
+ return self._core.get_proxy(path)
+
+ except KeyError:
+ if not create:
+ self.log.error('instance-proxy-does-not-exist', device_id=device_id,
+ class_id=class_id, instance_id=instance_id)
+ raise
+
+ # Create instance, first make sure class exists
+ self._class_proxy(device_id, class_id, create=True)
+
+ now = self._time_to_string(datetime.utcnow())
+ data = AlarmInstanceData(instance_id=instance_id, created=now, modified=now)
+ root_path = AlarmDbExternal.INSTANCES_PATH.format(device_id, class_id)
+ self._root_proxy.add(root_path, data)
+
+ return self._core.get_proxy(path)
+
+ def save_last_sync_time(self, device_id, value):
+ """
+ Save the Last Sync time to the database in an easy location to access
+
+ :param device_id: (str) ONU Device ID
+ :param value: (DateTime) Value to save
+ """
+ self.log.debug('save-last-sync-time', device_id=device_id, time=str(value))
+
+ try:
+ if not isinstance(value, datetime):
+ raise TypeError('Expected a datetime object, got {}'.
+ format(type(datetime)))
+
+ device_proxy = self._device_proxy(device_id)
+ data = device_proxy.get(depth=0)
+
+ now = datetime.utcnow()
+ data.last_sync_time = self._time_to_string(value)
+
+ # Update
+ self._root_proxy.update(MibDbExternal.DEVICE_PATH.format(device_id),
+ data)
+ self._modified = now
+ self.log.debug('save-sync-time-complete', device_id=device_id)
+
+ except Exception as e:
+ self.log.exception('save-last-sync-exception', device_id=device_id, e=e)
+ raise
+
+ def get_last_sync_time(self, device_id):
+ """
+ Get the Last Sync Time saved to the database for a device
+
+ :param device_id: (str) ONU Device ID
+ :return: (int) The Value or None if not found
+ """
+ self.log.debug('get-last-sync-time', device_id=device_id)
+
+ try:
+ device_proxy = self._device_proxy(device_id)
+ data = device_proxy.get(depth=0)
+ return self._string_to_time(data.last_sync_time)
+
+ except KeyError:
+ return None # OMCI MIB_DB entry has not yet been created
+
+ except Exception as e:
+ self.log.exception('get-last-sync-time-exception', e=e)
+ raise
+
+ def save_alarm_last_sync(self, device_id, value):
+ """
+ Save the Last Sync time to the database in an easy location to access
+
+ :param device_id: (str) ONU Device ID
+ :param value: (DateTime) Value to save
+ """
+ self.log.debug('save-last-sync', device_id=device_id, seq=str(value))
+
+ try:
+ if not isinstance(value, datetime):
+ raise TypeError('Expected a datetime object, got {}'.
+ format(type(datetime)))
+
+ device_proxy = self._device_proxy(device_id)
+ data = device_proxy.get(depth=0)
+
+ now = datetime.utcnow()
+ data.last_alarm_sequence = int(value)
+
+ # Update
+ self._root_proxy.update(AlarmDbExternal.DEVICE_PATH.format(device_id),
+ data)
+ self._modified = now
+ self.log.debug('save-sequence-complete', device_id=device_id)
+
+ except Exception as e:
+ self.log.exception('save-last-sync-exception', device_id=device_id, e=e)
+ raise
+
+ def get_alarm_last_sync(self, device_id):
+ """
+ Get the Last Sync Time saved to the database for a device
+
+ :param device_id: (str) ONU Device ID
+ :return: (int) The Value or None if not found
+ """
+ self.log.debug('get-last-sync', device_id=device_id)
+
+ try:
+ device_proxy = self._device_proxy(device_id)
+ data = device_proxy.get(depth=0)
+ return int(data.last_alarm_sequence)
+
+ except KeyError:
+ return None # OMCI ALARM_DB entry has not yet been created
+
+ except Exception as e:
+ self.log.exception('get-last-alarm-exception', e=e)
+ raise
+
+ def _add_new_class(self, device_id, class_id, instance_id, attributes):
+ """
+ Create an entry for a new class in the external database
+
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) ME Class ID
+ :param instance_id: (int) ME Entity ID
+ :param attributes: (dict) Attribute dictionary
+
+ :returns: (bool) True if the value was saved to the database. False if the
+ value was identical to the current instance
+ """
+ self.log.debug('add', device_id=device_id, class_id=class_id,
+ instance_id=instance_id, attributes=attributes)
+
+ now = self._time_to_string(datetime.utcnow())
+ attrs = [AlarmAttributeData(name=k,
+ value=self._attribute_to_string(device_id,
+ class_id,
+ k,
+ v)) for k, v in attributes.items()]
+ class_data = AlarmClassData(class_id=class_id,
+ instances=[AlarmInstanceData(instance_id=instance_id,
+ created=now,
+ modified=now,
+ attributes=attrs)])
+
+ self._root_proxy.add(AlarmDbExternal.CLASSES_PATH.format(device_id), class_data)
+ self.log.debug('set-complete', device_id=device_id, class_id=class_id,
+ entity_id=instance_id, attributes=attributes)
+ return True
+
+ def _add_new_instance(self, device_id, class_id, instance_id, attributes):
+ """
+ Create an entry for a instance of an existing class in the external database
+
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) ME Class ID
+ :param instance_id: (int) ME Entity ID
+ :param attributes: (dict) Attribute dictionary
+
+ :returns: (bool) True if the value was saved to the database. False if the
+ value was identical to the current instance
+ """
+ self.log.debug('add', device_id=device_id, class_id=class_id,
+ instance_id=instance_id, attributes=attributes)
+
+ now = self._time_to_string(datetime.utcnow())
+ attrs = [AlarmAttributeData(name=k,
+ value=self._attribute_to_string(device_id,
+ class_id,
+ k,
+ v)) for k, v in attributes.items()]
+ instance_data = AlarmInstanceData(instance_id=instance_id,
+ created=now,
+ modified=now,
+ attributes=attrs)
+
+ self._root_proxy.add(AlarmDbExternal.INSTANCES_PATH.format(device_id, class_id),
+ instance_data)
+
+ self.log.debug('set-complete', device_id=device_id, class_id=class_id,
+ entity_id=instance_id, attributes=attributes)
+ return True
+
+ def set(self, device_id, class_id, instance_id, attributes):
+ """
+ Set a database value. This should only be called by the Alarm synchronizer
+ and its related tasks
+
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) ME Class ID
+ :param instance_id: (int) ME Entity ID
+ :param attributes: (dict) Attribute dictionary
+
+ :returns: (bool) True if the value was saved to the database. False if the
+ value was identical to the current instance
+
+ :raises KeyError: If device does not exist
+ :raises DatabaseStateError: If the database is not enabled
+ """
+ self.log.debug('set', device_id=device_id, class_id=class_id,
+ instance_id=instance_id, attributes=attributes)
+ try:
+ if not isinstance(device_id, basestring):
+ raise TypeError('Device ID should be a string')
+
+ if not 0 <= class_id <= 0xFFFF:
+ raise ValueError("Invalid Class ID: {}, should be 0..65535".format(class_id))
+
+ if not 0 <= instance_id <= 0xFFFF:
+ raise ValueError("Invalid Instance ID: {}, should be 0..65535".format(instance_id))
+
+ if not isinstance(attributes, dict):
+ raise TypeError("Attributes should be a dictionary")
+
+ if not self._started:
+ raise DatabaseStateError('The Database is not currently active')
+
+ # Determine the best strategy to add the information
+ dev_proxy = self._device_proxy(device_id)
+
+ try:
+ class_data = dev_proxy.get(AlarmDbExternal.CLASS_PATH.format(class_id), deep=True)
+
+ inst_data = next((inst for inst in class_data.instances
+ if inst.instance_id == instance_id), None)
+
+ if inst_data is None:
+ return self._add_new_instance(device_id, class_id, instance_id, attributes)
+
+ # Possibly adding to or updating an existing instance
+ # Get instance proxy, creating it if needed
+
+ exist_attr_indexes = dict()
+ attr_len = len(inst_data.attributes)
+
+ for index in xrange(0, attr_len):
+ exist_attr_indexes[inst_data.attributes[index].name] = index
+
+ modified = False
+ new_attributes = []
+
+ for k, v in attributes.items():
+ try:
+ str_value = self._attribute_to_string(device_id, class_id, k, v)
+ new_attributes.append(AlarmAttributeData(name=k, value=str_value))
+
+ except Exception as e:
+ self.log.exception('save-error', e=e, class_id=class_id,
+ attr=k, value_type=type(v))
+
+ if k not in exist_attr_indexes or \
+ inst_data.attributes[exist_attr_indexes[k]].value != str_value:
+ modified = True
+
+ if modified:
+ now = datetime.utcnow()
+ new_data = AlarmInstanceData(instance_id=instance_id,
+ created=inst_data.created,
+ modified=self._time_to_string(now),
+ attributes=new_attributes)
+ dev_proxy.remove(AlarmDbExternal.INSTANCE_PATH.format(class_id, instance_id))
+ self._root_proxy.add(AlarmDbExternal.INSTANCES_PATH.format(device_id,
+ class_id), new_data)
+
+ self.log.debug('set-complete', device_id=device_id, class_id=class_id,
+ entity_id=instance_id, attributes=attributes, modified=modified)
+ return modified
+
+ except KeyError:
+ # Here if the class-id does not yet exist in the database
+ return self._add_new_class(device_id, class_id, instance_id,
+ attributes)
+ except Exception as e:
+ self.log.exception('set-exception', device_id=device_id, class_id=class_id,
+ instance_id=instance_id, attributes=attributes, e=e)
+ raise
+
+ def delete(self, device_id, class_id, entity_id):
+ """
+ Delete an entity from the database if it exists. If all instances
+ of a class are deleted, the class is deleted as well.
+
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) ME Class ID
+ :param entity_id: (int) ME Entity ID
+
+ :returns: (bool) True if the instance was found and deleted. False
+ if it did not exist.
+
+ :raises KeyError: If device does not exist
+ :raises DatabaseStateError: If the database is not enabled
+ """
+ self.log.debug('delete', device_id=device_id, class_id=class_id,
+ entity_id=entity_id)
+
+ if not self._started:
+ raise DatabaseStateError('The Database is not currently active')
+
+ if not isinstance(device_id, basestring):
+ raise TypeError('Device ID should be an string')
+
+ if not 0 <= class_id <= 0xFFFF:
+ raise ValueError('class-id is 0..0xFFFF')
+
+ if not 0 <= entity_id <= 0xFFFF:
+ raise ValueError('instance-id is 0..0xFFFF')
+
+ try:
+ # Remove instance
+ self._instance_proxy(device_id, class_id, entity_id).remove('/')
+ now = datetime.utcnow()
+
+ # If resulting class has no instance, remove it as well
+ class_proxy = self._class_proxy(device_id, class_id)
+ class_data = class_proxy.get('/', depth=1)
+
+ if len(class_data.instances) == 0:
+ class_proxy.remove('/')
+
+ self._modified = now
+ return True
+
+ except KeyError:
+ return False # Not found
+
+ except Exception as e:
+ self.log.exception('get-last-data-exception', device_id=device_id, e=e)
+ raise
+
+ def query(self, device_id, class_id=None, instance_id=None, attributes=None):
+ """
+ Get database information.
+
+ This method can be used to request information from the database to the detailed
+ level requested
+
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) Managed Entity class ID
+ :param instance_id: (int) Managed Entity instance
+ :param attributes: (list/set or str) Managed Entity instance's attributes
+
+ :return: (dict) The value(s) requested. If class/inst/attribute is
+ not found, an empty dictionary is returned
+ :raises KeyError: If the requested device does not exist
+ :raises DatabaseStateError: If the database is not enabled
+ """
+ self.log.debug('query', device_id=device_id, class_id=class_id,
+ instance_id=instance_id, attributes=attributes)
+ try:
+ if class_id is None:
+ # Get full device info
+ dev_data = self._device_proxy(device_id).get('/', depth=-1)
+ data = self._device_to_dict(dev_data)
+
+ elif instance_id is None:
+ # Get all instances of the class
+ try:
+ cls_data = self._class_proxy(device_id, class_id).get('/', depth=-1)
+ data = self._class_to_dict(device_id, cls_data)
+
+ except KeyError:
+ data = dict()
+
+ else:
+ # Get all attributes of a specific ME
+ try:
+ inst_data = self._instance_proxy(device_id, class_id, instance_id).\
+ get('/', depth=-1)
+
+ if attributes is None:
+ # All Attributes
+ data = self._instance_to_dict(device_id, class_id, inst_data)
+
+ else:
+ # Specific attribute(s)
+ if isinstance(attributes, basestring):
+ attributes = {attributes}
+
+ data = {
+ attr.name: self._string_to_attribute(device_id,
+ class_id,
+ attr.name,
+ attr.value)
+ for attr in inst_data.attributes if attr.name in attributes}
+
+ except KeyError:
+ data = dict()
+
+ return data
+
+ except KeyError:
+ self.log.warn('query-no-device', device_id=device_id)
+ raise
+
+ except Exception as e:
+ self.log.exception('get-last-sync-exception', device_id=device_id, e=e)
+ raise
+
+ def _instance_to_dict(self, device_id, class_id, instance):
+ if not isinstance(instance, AlarmInstanceData):
+ raise TypeError('{} is not of type AlarmInstanceData'.format(type(instance)))
+
+ data = {
+ INSTANCE_ID_KEY: instance.instance_id,
+ CREATED_KEY: self._string_to_time(instance.created),
+ MODIFIED_KEY: self._string_to_time(instance.modified),
+ ATTRIBUTES_KEY: dict()
+ }
+ for attribute in instance.attributes:
+ data[ATTRIBUTES_KEY][attribute.name] = self._string_to_attribute(device_id,
+ class_id,
+ attribute.name,
+ attribute.value)
+ return data
+
+ def _class_to_dict(self, device_id, val):
+ if not isinstance(val, AlarmClassData):
+ raise TypeError('{} is not of type AlarmClassData'.format(type(val)))
+
+ data = {
+ CLASS_ID_KEY: val.class_id,
+ }
+ for instance in val.instances:
+ data[instance.instance_id] = self._instance_to_dict(device_id,
+ val.class_id,
+ instance)
+ return data
+
+ def _device_to_dict(self, val):
+ if not isinstance(val, AlarmDeviceData):
+ raise TypeError('{} is not of type AlarmDeviceData'.format(type(val)))
+
+ data = {
+ DEVICE_ID_KEY: val.device_id,
+ CREATED_KEY: self._string_to_time(val.created),
+ VERSION_KEY: val.version,
+ ME_KEY: dict(),
+ MSG_TYPE_KEY: set()
+ }
+ for class_data in val.classes:
+ data[class_data.class_id] = self._class_to_dict(val.device_id,
+ class_data)
+ for managed_entity in val.managed_entities:
+ data[ME_KEY][managed_entity.class_id] = managed_entity.name
+
+ for msg_type in val.message_types:
+ data[MSG_TYPE_KEY].add(msg_type.message_type)
+
+ return data
+
+ def _managed_entity_to_name(self, device_id, class_id):
+ me_map = self._omci_agent.get_device(device_id).me_map
+ entity = me_map.get(class_id)
+
+ return entity.__name__ if entity is not None else 'UnknownManagedEntity'
diff --git a/voltha/extensions/omci/me_frame.py b/voltha/extensions/omci/me_frame.py
index 7ce812f..ce8f146 100644
--- a/voltha/extensions/omci/me_frame.py
+++ b/voltha/extensions/omci/me_frame.py
@@ -339,3 +339,35 @@
minute=dt.minute,
second=dt.second,
))
+
+ def get_all_alarm(self, alarm_retrieval_mode):
+ """
+ Create a Alarm request from for this ME
+ :return: (OmciFrame) OMCI Frame
+ """
+ self._check_operation(OP.GetAllAlarms)
+
+ return OmciFrame(
+ transaction_id=None,
+ message_type=OmciGetAllAlarms.message_id,
+ omci_message=OmciGetAllAlarms(
+ entity_class=getattr(self.entity_class, 'class_id'),
+ entity_id=getattr(self, 'entity_id'),
+ alarm_retrieval_mode=self._entity_id.mask_for(alarm_retrieval_mode)
+ ))
+
+ def get_all_alarm_next(self, command_sequence_number):
+ """
+ Create a Alarm request from for this ME
+ :return: (OmciFrame) OMCI Frame
+ """
+ self._check_operation(OP.GetAllAlarmsNext)
+
+ return OmciFrame(
+ transaction_id=None,
+ message_type=OmciGetAllAlarmsNext.message_id,
+ omci_message=OmciGetAllAlarmsNext(
+ entity_class=getattr(self.entity_class, 'class_id'),
+ entity_id=getattr(self, 'entity_id'),
+ command_sequence_number=command_sequence_number
+ ))
\ No newline at end of file
diff --git a/voltha/extensions/omci/omci_cc.py b/voltha/extensions/omci/omci_cc.py
index db39abe..c89aded 100644
--- a/voltha/extensions/omci/omci_cc.py
+++ b/voltha/extensions/omci/omci_cc.py
@@ -53,7 +53,9 @@
Alarm_Notification = 6,
Test_Result = 7,
MIB_Reset = 8,
- Connectivity = 9
+ Connectivity = 9,
+ Get_ALARM_Get = 10,
+ Get_ALARM_Get_Next = 11
# abbreviations
@@ -71,6 +73,9 @@
OmciCreateResponse.message_id: RxEvent.Create,
OmciDeleteResponse.message_id: RxEvent.Delete,
OmciSetResponse.message_id: RxEvent.Set,
+ OmciGetAllAlarmsResponse.message_id: RxEvent.Get_ALARM_Get,
+ OmciGetAllAlarmsNextResponse.message_id: RxEvent.Get_ALARM_Get_Next
+
}
def __init__(self, adapter_agent, device_id, me_map=None,
@@ -655,3 +660,15 @@
frame = OntGFrame().reboot()
return self.send(frame, timeout)
+
+ def send_get_all_alarm(self, alarm_retrival_mode=0, timeout=DEFAULT_OMCI_TIMEOUT):
+ self.log.debug('send_get_alarm')
+
+ frame = OntDataFrame().get_all_alarm(alarm_retrival_mode)
+ return self.send(frame, timeout)
+
+ def send_get_all_alarm_next(self, seq_no, timeout=DEFAULT_OMCI_TIMEOUT):
+ self.log.debug('send_get_alarm_next')
+
+ frame = OntDataFrame().get_all_alarm_next(seq_no)
+ return self.send(frame, timeout)
\ No newline at end of file
diff --git a/voltha/extensions/omci/onu_device_entry.py b/voltha/extensions/omci/onu_device_entry.py
index 02d7419..569bb97 100644
--- a/voltha/extensions/omci/onu_device_entry.py
+++ b/voltha/extensions/omci/onu_device_entry.py
@@ -43,6 +43,7 @@
DeviceStatusEvent = 0 # OnuDeviceEntry running status changed
MibDatabaseSyncEvent = 1 # MIB database sync changed
OmciCapabilitiesEvent = 2 # OMCI ME and message type capabilities
+ AlarmDatabaseSyncEvent = 3 # Alarm database sync changed
# TODO: Add other events here as needed
@@ -101,6 +102,16 @@
self._pm_intervals_sm = interval_info['state-machine'](self._omci_agent, device_id,
interval_info['tasks'],
advertise_events=advertise)
+
+ # ONU ALARM Synchronization state machine
+ self._alarm_db_in_sync = False
+ alarm_synchronizer_info = support_classes.get('alarm-syncronizer')
+ advertise = alarm_synchronizer_info['advertise-events']
+ self._alarm_sync_sm = alarm_synchronizer_info['state-machine'](self._omci_agent,
+ device_id,
+ alarm_synchronizer_info['tasks'],
+ mib_db,
+ advertise_events=advertise)
except Exception as e:
self.log.exception('state-machine-create-failed', e=e)
raise
@@ -111,6 +122,7 @@
self._on_start_state_machines = [ # Run when 'start()' called
self._mib_sync_sm,
self._capabilities_sm,
+ self._alarm_sync_sm,
]
self._on_sync_state_machines = [ # Run after first in_sync event
self._pm_intervals_sm
@@ -172,6 +184,13 @@
return self._pm_intervals_sm
@property
+ def alarm_synchronizer(self):
+ """
+ Reference to the OpenOMCI Alarm Synchronization state machine for this ONU
+ """
+ return self._alarm_sync_sm
+
+ @property
def active(self):
"""
Is the ONU device currently active/running
@@ -220,6 +239,29 @@
self.event_bus.publish(topic=topic, msg=msg)
@property
+ def alarm_db_in_sync(self):
+ return self._alarm_db_in_sync
+
+ @alarm_db_in_sync.setter
+ def alarm_db_in_sync(self, value):
+ if self._alarm_db_in_sync != value:
+ # Save value
+ self._alarm_db_in_sync = value
+
+ # Start up other state machines if needed
+ if self._first_in_sync:
+ self.first_in_sync_event()
+
+ # Notify any event listeners
+ topic = OnuDeviceEntry.event_bus_topic(self.device_id,
+ OnuDeviceEvents.AlarmDatabaseSyncEvent)
+ msg = {
+ IN_SYNC_KEY: self._alarm_db_in_sync,
+ LAST_IN_SYNC_KEY: self.alarm_synchronizer.last_alarm_sync_time
+ }
+ self.event_bus.publish(topic=topic, msg=msg)
+
+ @property
def configuration(self):
"""
Get the OMCI Configuration object for this ONU. This is a class that provides some
diff --git a/voltha/extensions/omci/openomci_agent.py b/voltha/extensions/omci/openomci_agent.py
index 57cf496..76a8076 100644
--- a/voltha/extensions/omci/openomci_agent.py
+++ b/voltha/extensions/omci/openomci_agent.py
@@ -21,6 +21,11 @@
from voltha.extensions.omci.tasks.get_mds_task import GetMdsTask
from voltha.extensions.omci.tasks.mib_resync_task import MibResyncTask
from voltha.extensions.omci.tasks.sync_time_task import SyncTimeTask
+from voltha.extensions.omci.state_machines.Alarm_sync import AlarmSynchronizer
+from voltha.extensions.omci.tasks.alarm_sync_data import AlarmSyncDataTask
+from voltha.extensions.omci.tasks.alarm_check_task import AlarmDataTask
+from voltha.extensions.omci.tasks.alarm_resync_task import AlarmResyncTask
+from voltha.extensions.omci.database.alarm_db_ext import AlarmDbExternal
from voltha.extensions.omci.tasks.interval_data_task import IntervalDataTask
from voltha.extensions.omci.onu_device_entry import OnuDeviceEntry
from voltha.extensions.omci.state_machines.omci_onu_capabilities import OnuOmciCapabilities
@@ -59,15 +64,18 @@
'create-pm': OmciCreatePMRequest,
'delete-pm': OmciDeletePMRequest,
},
- }
- # 'alarm-syncronizer': {
- # 'state-machine': AlarmSynchronizer, # Implements the Alarm sync state machine
- # 'database': AlarmDb, # For any State storage needs
- # 'tasks': {
- # 'task-1': needToWrite,
- # 'task-2': needToWrite,
- # }
- # }
+ },
+ 'alarm-syncronizer': {
+ 'state-machine': AlarmSynchronizer, # Implements the Alarm sync state machine
+ 'database': AlarmDbExternal, # For any State storage needs
+ 'advertise-events': True, # Advertise events on OpenOMCI event bus
+ 'tasks': {
+ 'alarm-sync': AlarmSyncDataTask,
+ 'alarm-check': AlarmDataTask,
+ 'alarm-resync': AlarmResyncTask,
+ 'alarm-audit': AlarmDataTask
+ }
+ }
}
@@ -99,8 +107,8 @@
self._mib_database_cls = support_classes['mib-synchronizer']['database']
# Alarm Synchronization Database # TODO: Stretch goal for VOLTHA v1.3.0
- # self._alarm_db = None
- # self._alarm_database_cls = self._alarm_synchronizer_info['database']
+ self._alarm_db = None
+ self._alarm_database_cls = support_classes['alarm-syncronizer']['database']
@property
def core(self):
@@ -129,10 +137,13 @@
self._mib_db = self._mib_database_cls(self)
# TODO Alarm DB
+ if self._alarm_db is None:
+ self._alarm_db = self._alarm_database_cls(self)
# Start/restore databases
self._mib_db.start()
+ self._alarm_db.start()
for device in self._devices.itervalues():
device.start()
@@ -157,6 +168,7 @@
# DB shutdown
self._mib_db.stop()
+ self._alarm_db.stop()
def mk_event_bus(self):
""" Get the event bus for OpenOMCI"""
diff --git a/voltha/extensions/omci/openomci_event_bus.py b/voltha/extensions/omci/openomci_event_bus.py
index c432032..5c67865 100644
--- a/voltha/extensions/omci/openomci_event_bus.py
+++ b/voltha/extensions/omci/openomci_event_bus.py
@@ -18,6 +18,7 @@
from simplejson import dumps
from common.event_bus import EventBusClient
from voltha.protos.omci_mib_db_pb2 import OpenOmciEvent
+from voltha.protos.omci_alarm_db_pb2 import AlarmOpenOmciEvent
from common.utils.json_format import MessageToDict
@@ -43,8 +44,11 @@
else:
msg = str(data)
- event = OpenOmciEvent(
- type=event_type,
- data=msg
+ event_func = AlarmOpenOmciEvent if 'AlarmSynchronizer' in msg \
+ else OpenOmciEvent
+ event = event_func(
+ type=event_type,
+ data=msg
)
+
self._event_bus_client.publish(self._topic, event)
diff --git a/voltha/extensions/omci/state_machines/Alarm_sync.py b/voltha/extensions/omci/state_machines/Alarm_sync.py
new file mode 100644
index 0000000..6ad46c9
--- /dev/null
+++ b/voltha/extensions/omci/state_machines/Alarm_sync.py
@@ -0,0 +1,544 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from datetime import datetime, timedelta
+from transitions import Machine
+from twisted.internet import reactor
+from voltha.extensions.omci.omci_defs import EntityOperations, ReasonCodes, \
+ AttributeAccess
+from voltha.extensions.omci.omci_cc import OmciCCRxEvents, OMCI_CC, TX_REQUEST_KEY, \
+ RX_RESPONSE_KEY
+from voltha.extensions.omci.omci_entities import OntData
+import voltha.extensions.omci.omci_entities as omci_entities
+from common.event_bus import EventBusClient
+from voltha.protos.omci_alarm_db_pb2 import AlarmOpenOmciEventType
+
+RxEvent = OmciCCRxEvents
+RC = ReasonCodes
+
+
+class AlarmSynchronizer(object):
+ """
+ OpenOMCI Alarm Synchronizer state machine
+ """
+ DEFAULT_STATES = ['disabled', 'starting', 'updating', 'syncing_alarm',
+ 'in_sync', 'out_of_sync', 'auditing', 'resynchronizing']
+
+ DEFAULT_TRANSITIONS = [
+ {'trigger': 'start', 'source': 'disabled', 'dest': 'starting'},
+
+ {'trigger': 'update_alarm', 'source': 'starting', 'dest': 'updating'},
+ {'trigger': 'sync_alarm', 'source': 'starting', 'dest': 'syncing_alarm'},
+
+ {'trigger': 'success', 'source': 'updating', 'dest': 'in_sync'},
+ {'trigger': 'timeout', 'source': 'updating', 'dest': 'starting'},
+
+ {'trigger': 'success', 'source': 'syncing_alarm', 'dest': 'in_sync'},
+ {'trigger': 'timeout', 'source': 'syncing_alarm', 'dest': 'starting'},
+ {'trigger': 'mismatch', 'source': 'syncing_alarm', 'dest': 'updating'},
+
+ {'trigger': 'audit_alarm', 'source': 'in_sync', 'dest': 'auditing'},
+ {'trigger': 'audit_alarm', 'source': 'out_of_sync', 'dest': 'auditing'},
+
+ {'trigger': 'success', 'source': 'auditing', 'dest': 'in_sync'},
+ {'trigger': 'timeout', 'source': 'auditing', 'dest': 'starting'},
+ {'trigger': 'mismatch', 'source': 'auditing', 'dest': 'resynchronizing'},
+ {'trigger': 'force_resync', 'source': 'auditing', 'dest': 'resynchronizing'},
+
+ {'trigger': 'timeout', 'source': 'resynchronizing', 'dest': 'out_of_sync'},
+ {'trigger': 'success', 'source': 'resynchronizing', 'dest': 'in_sync'},
+
+ # Do wildcard 'stop' trigger last so it covers all previous states
+ {'trigger': 'stop', 'source': '*', 'dest': 'disabled'},
+ ]
+ DEFAULT_TIMEOUT_RETRY = 60 # Seconds to delay after task failure/timeout
+ DEFAULT_AUDIT_DELAY = 15 # Periodic tick to audit the MIB Data Sync
+ DEFAULT_RESYNC_DELAY = 300 # Periodically force a resync
+
+ def __init__(self, agent, device_id, alarm_sync_tasks, db,
+ advertise_events=False,
+ states=DEFAULT_STATES,
+ transitions=DEFAULT_TRANSITIONS,
+ initial_state='disabled',
+ timeout_delay=DEFAULT_TIMEOUT_RETRY,
+ audit_delay=DEFAULT_AUDIT_DELAY,
+ resync_delay=DEFAULT_RESYNC_DELAY):
+ """
+ Class initialization
+
+ :param agent: (OpenOmciAgent) Agent
+ :param device_id: (str) ONU Device ID
+ :param db: (MibDbVolatileDict) MIB/Alarm Database
+ :param advertise_events: (bool) Advertise events on OpenOMCI Event Bus
+ :param alarm_sync_tasks: (dict) Tasks to run
+ :param states: (list) List of valid states
+ :param transitions: (dict) Dictionary of triggers and state changes
+ :param initial_state: (str) Initial state machine state
+ :param timeout_delay: (int/float) Number of seconds after a timeout to attempt
+ a retry (goes back to starting state)
+ :param audit_delay: (int) Seconds between Alarm audits while in sync. Set to
+ zero to disable audit. An operator can request
+ an audit manually by calling 'self.audit_alarm'
+ :param resync_delay: (int) Seconds in sync before performing a forced Alarm
+ resynchronization
+ """
+ self.log = structlog.get_logger(device_id=device_id)
+
+ self._agent = agent
+ self._device_id = device_id
+ self._device = None
+ self._database = db
+ self._timeout_delay = timeout_delay
+ self._audit_delay = audit_delay
+ self._resync_delay = resync_delay
+
+ self._update_task = alarm_sync_tasks['alarm-sync']
+ self._check_task = alarm_sync_tasks['alarm-check']
+ self._resync_task = alarm_sync_tasks['alarm-resync']
+ self._audit_task = alarm_sync_tasks['alarm-audit']
+ self._advertise_events = advertise_events
+
+ self._deferred = None
+ self._current_task = None # TODO: Support multiple running tasks after v.1.3.0 release
+ self._task_deferred = None
+ self._last_alarm_sync_time = None
+ self._last_alarm_sequence_value = None
+ self._device_in_db = False
+ self._alarm_class_id = None
+ self._alarm_entity_id = None
+ self._commands_retrieved = None
+ self._alarm_table = None
+
+ self._event_bus = EventBusClient()
+ self._omci_cc_subscriptions = { # RxEvent.enum -> Subscription Object
+ RxEvent.Get_ALARM_Get: None,
+ RxEvent.Get_ALARM_Get_Next: None
+ }
+ self._omci_cc_sub_mapping = {
+ RxEvent.Get_ALARM_Get: self.on_alarm_update_response,
+ RxEvent.Get_ALARM_Get_Next: self.on_alarm_update_next_response
+ }
+
+ # Statistics and attributes
+ # TODO: add any others if it will support problem diagnosis
+
+ # Set up state machine to manage states
+ self.machine = Machine(model=self, states=states,
+ transitions=transitions,
+ initial=initial_state,
+ queued=True,
+ name='{}-{}'.format(self.__class__.__name__,
+ device_id))
+
+ def _cancel_deferred(self):
+ d1, self._deferred = self._deferred, None
+ d2, self._task_deferred = self._task_deferred, None
+
+ for d in [d1, d1]:
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def __str__(self):
+ return 'Alarm Synchronizer: Device ID: {}, State:{}'.format(self._device_id, self.state)
+
+ def delete(self):
+ """
+ Cleanup any state information
+ """
+ self.stop()
+ db, self._database = self._database, None
+
+ if db is not None:
+ db.remove(self._device_id)
+
+ @property
+ def device_id(self):
+ return self._device_id
+
+ @property
+ def last_alarm_sequence(self):
+ return self._last_alarm_sequence_value
+
+ @last_alarm_sequence.setter
+ def last_alarm_seuence(self, value):
+ self._last_alarm_sequence_value = value
+ if self._database is not None:
+ self._database.save_alarm_last_sync(self.device_id, value)
+
+ @property
+ def last_alarm_sync_time(self):
+ return self._last_alarm_sync_time
+
+ @last_alarm_sync_time.setter
+ def last_alarm_sync_time(self, value):
+ self._last_alarm_sync_time = value
+ if self._database is not None:
+ self._database.save_last_sync_time(self.device_id, value)
+
+ @property
+ def is_updated_alarm(self):
+ """
+ Is this a new ONU (has never completed Alarm synchronization)
+ :return: (bool) True if this ONU should be considered new
+ """
+ return self.last_alarm_sequence is None
+
+ @property
+ def advertise_events(self):
+ return self._advertise_events
+
+ @advertise_events.setter
+ def advertise_events(self, value):
+ if not isinstance(value, bool):
+ raise TypeError('Advertise event is a boolean')
+ self._advertise_events = value
+
+ def advertise(self, event, info):
+ """Advertise an event on the OpenOMCI event bus"""
+ if self._advertise_events:
+ self._agent.advertise(event,
+ {
+ 'state-machine': self.machine.name,
+ 'info': info,
+ 'time': str(datetime.utcnow())
+ })
+
+ def on_enter_disabled(self):
+ """
+ State machine is being stopped
+ """
+ self.advertise(AlarmOpenOmciEventType.state_change, self.state)
+
+ self._cancel_deferred()
+
+ task, self._current_task = self._current_task, None
+ if task is not None:
+ task.stop()
+
+ # Drop Response and Autonomous notification subscriptions
+ for event, sub in self._omci_cc_subscriptions.iteritems():
+ if sub is not None:
+ self._omci_cc_subscriptions[event] = None
+ self._device.omci_cc.event_bus.unsubscribe(sub)
+
+ # TODO: Stop and remove any currently running or scheduled tasks
+ # TODO: Anything else?
+
+ def _seed_database(self):
+ if not self._device_in_db:
+ try:
+ try:
+ self._database.start()
+ self._database.add(self._device_id)
+ self.log.debug('seed-db-does-not-exist', device_id=self._device_id)
+
+ except KeyError:
+ # Device already is in database
+ self.log.debug('seed-db-exist', device_id=self._device_id)
+ self.last_alarm_sequence = \
+ self._database.get_alarm_last_sync(self._device_id)
+
+ self._device_in_db = True
+
+ except Exception as e:
+ self.log.exception('seed-database-failure', e=e)
+
+ def on_enter_starting(self):
+ """
+ Determine ONU status and start Alarm Synchronization tasks
+ """
+ self._device = self._agent.get_device(self._device_id)
+ self.advertise(AlarmOpenOmciEventType.state_change, self.state)
+
+ # Make sure root of external Alarm Database exists
+ self._seed_database()
+
+ # Set up Response and Autonomous notification subscriptions
+ try:
+ for event, sub in self._omci_cc_sub_mapping.iteritems():
+ if self._omci_cc_subscriptions[event] is None:
+ self._omci_cc_subscriptions[event] = \
+ self._device.omci_cc.event_bus.subscribe(
+ topic=OMCI_CC.event_bus_topic(self._device_id, event),
+ callback=sub)
+
+ except Exception as e:
+ self.log.exception('omci-cc-subscription-setup', e=e)
+
+ except Exception as e:
+ self.log.exception('dev-subscription-setup', e=e)
+
+ if self.is_updated_alarm:
+ self._deferred = reactor.callLater(0, self.update_alarm)
+ # Determine if this ONU has ever synchronized
+ else:
+ self._deferred = reactor.callLater(0, self.sync_alarm)
+
+ def on_enter_updating(self):
+ """
+ Begin full Alarm data sync, starting with a Alarm RESET
+ """
+ self.advertise(AlarmOpenOmciEventType.state_change, self.state)
+
+ def success(results):
+ self.log.debug('alarm-update-success', results='the sequence_number is {}'.
+ format(results))
+ self._current_task = None
+ # The new ONU is up, save the first updated alarm sequence number
+ self.last_alarm_sequence = results
+ self._deferred = reactor.callLater(0, self.success)
+
+ def failure(reason):
+ self.log.info('alarm-update-failure', reason=reason)
+ self._current_task = None
+ self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
+
+ self._current_task = self._update_task(self._agent, self._device_id)
+
+ self._task_deferred = self._device.task_runner.queue_task(self._current_task)
+ self._task_deferred.addCallbacks(success, failure)
+
+ def on_enter_syncing_alarm(self):
+ """
+ Create a simple task to fetch the Alarm value
+ """
+ self.advertise(AlarmOpenOmciEventType.state_change, self.state)
+
+ self.last_alarm_sequence = self._database.get_alarm_last_sync(self._device_id) or 0
+
+ def success(sequence):
+ self.log.debug('sync-alarm-success', sequence_value=sequence)
+ self._current_task = None
+
+ # Examine Alarm value
+ if self.last_alarm_sequence == sequence:
+ self._deferred = reactor.callLater(0, self.success)
+ else:
+ self._deferred = reactor.callLater(0, self.mismatch)
+
+ def failure(reason):
+ self.log.info('sync-alarm-failure', reason=reason)
+ self._current_task = None
+ self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
+
+ self._current_task = self._check_task(self._agent, self._device_id)
+
+ self._task_deferred = self._device.task_runner.queue_task(self._current_task)
+ self._task_deferred.addCallbacks(success, failure)
+
+ def on_enter_in_sync(self):
+ """
+ Schedule a tick to occur to in the future to request an audit
+ """
+ self.advertise(AlarmOpenOmciEventType.state_change, self.state)
+ self.last_alarm_sync_time = datetime.utcnow()
+ self._device.alarm_db_in_sync = True
+
+ if self._audit_delay > 0:
+ self._deferred = reactor.callLater(self._audit_delay, self.audit_alarm)
+
+ def on_enter_out_of_sync(self):
+ """
+ The Alarm re-synchronization state does not match alarm status currently.
+ The condition would happen the following as below:
+ 1. The alarm sequence is not equal the last alarm status.
+ 2. The ONU alarm does not happen right now.
+
+ Condition 1: Something happen on the alarm table does not match the sequence.
+ Opening display of message to examine the alarm table.
+ Condition 2: In this state, why happen this situation?
+ Has ONU recover the alarm table in this meanwhile?
+
+ Schedule a tick to occur to in the future to request an audit
+ """
+ self.advertise(AlarmOpenOmciEventType.state_change, self.state)
+ self._device.alarm_db_in_sync = False
+
+ step = 'Nothing'
+ class_id = 0
+ entity_id = 0
+ attribute = self._alarm_table
+
+ try:
+ if self._commands_retrieved is not self.last_alarm_sequence :
+ # The alarm sequence does not match last saving value. Has happen Alarm?
+ step = 'alarm-table'
+ for sequence in xrange(self._commands_retrieved):
+ self.log.info(step, class_id=self._alarm_class_id[sequence],
+ entity_id=self._alarm_entity_id[sequence],
+ alarm_sequence=self._alarm_table[sequence])
+ pass
+ elif self._commands_retrieved is None:
+ # The alarm sequence does not get alarm at present.
+ # TODO: need to update the database here ?
+ step = 'None_of_alarm'
+ self._alarm_table = None
+ pass
+
+ self._deferred = reactor.callLater(1, self.audit_alarm)
+
+ except Exception as e:
+ self.log.exception('alarm-out-of-update', e=e, step=step, class_id=class_id,
+ entity_id=entity_id, attribute=attribute)
+ # Retry the Audit process
+ self._deferred = reactor.callLater(1, self.audit_alarm)
+
+ def on_enter_auditing(self):
+ """
+ Perform a Alarm Audit. If our last Alarm resync was too long in the
+ past, perform a resynchronization anyway
+ """
+ next_resync = self.last_alarm_sync_time + timedelta(seconds=self._resync_delay)\
+ if self.last_alarm_sync_time is not None else datetime.utcnow()
+
+ self.advertise(AlarmOpenOmciEventType.state_change, self.state)
+
+ if datetime.utcnow() >= next_resync:
+ self._deferred = reactor.callLater(0, self.force_resync)
+ else:
+ def success(sequence):
+ self.log.debug('get-alarm-success', alarm_sequence=sequence)
+ self._current_task = None
+
+ # Examine alarm sequence value
+ if self.last_alarm_sequence == sequence:
+ self._deferred = reactor.callLater(0, self.success)
+ else:
+ self._device.alarm_db_in_sync = False
+ self._deferred = reactor.callLater(0, self.mismatch)
+
+ def failure(reason):
+ self.log.info('get-alarm-failure', reason=reason)
+ self._current_task = None
+ self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
+
+ self._current_task = self._audit_task(self._agent, self._device_id)
+ self._task_deferred = self._device.task_runner.queue_task(self._current_task)
+ self._task_deferred.addCallbacks(success, failure)
+
+ def on_enter_resynchronizing(self):
+ """
+ Perform a resynchronization of the Alarm database
+
+ First calculate any differences
+ """
+ self.advertise(AlarmOpenOmciEventType.state_change, self.state)
+
+ def success(results):
+ self.log.debug('resync-success', results=results)
+
+ self._alarm_class_id = results.get('alarm_class_id')
+ self._alarm_entity_id = results.get('alarm_entity_id')
+ self._commands_retrieved = results.get('commands_retrieved')
+ self._alarm_table = results.get('alarm_table')
+
+ if self._commands_retrieved is not None and all(self._alarm_table):
+ self._deferred = reactor.callLater(0, self.success)
+ else:
+ self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
+
+ def failure(reason):
+ self.log.info('resync-failure', reason=reason)
+ self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
+
+ self._current_task = self._resync_task(self._agent, self._device_id)
+ self._task_deferred = self._device.task_runner.queue_task(self._current_task)
+ self._task_deferred.addCallbacks(success, failure)
+
+ def on_alarm_update_next_response(self, _topic, msg):
+ """
+ Process a Alarm update Next response
+
+ :param _topic: (str) OMCI-RX topic
+ :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
+ """
+ self.log.debug('on-alarm-upload-next-response', state=self.state)
+
+ if self._omci_cc_subscriptions[RxEvent.Get_ALARM_Get_Next]:
+ try:
+ # Check if expected in current alarm_sync state
+ if self.state == 'disabled':
+ self.log.error('rx-in-invalid-state', state=self.state)
+
+ else:
+ response = msg[RX_RESPONSE_KEY]
+
+ # Extract entity instance information
+ omci_msg = response.fields['omci_message'].fields
+
+ class_id = omci_msg['entity_class']
+ entity_id = omci_msg['entity_id']
+ alarm_entity_class = omci_msg['alarmed_entity_class']
+ alarm_entity_id = omci_msg['alarmed_entity_id']
+ alarm_bit_map = omci_msg['alarm_bit_map']
+
+ self.log.info('set-response-failure',
+ class_id=class_id, entity_id=entity_id,
+ alarm_entity_class=alarm_entity_class,
+ alarm_entity_id=alarm_entity_id,
+ alarm_bit_map=alarm_bit_map)
+
+ if class_id == OntData.class_id:
+ return
+
+ # Save to the database
+ self._database.set(self._device_id, class_id, entity_id, alarm_bit_map)
+
+ except KeyError:
+ pass # NOP
+ except Exception as e:
+ self.log.exception('upload-next', e=e)
+
+ def on_alarm_update_response(self, _topic, msg):
+ """
+ Process a Set response
+
+ :param _topic: (str) OMCI-RX topic
+ :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
+ """
+ self.log.debug('on-alarm-update-response', state=self.state)
+
+ if self._omci_cc_subscriptions[RxEvent.Get_ALARM_Get]:
+ if self.state == 'disabled':
+ self.log.error('rx-in-invalid-state', state=self.state)
+ return
+ try:
+ response = msg[RX_RESPONSE_KEY]
+ omci_msg = response.fields['omci_message'].fields
+ class_id = omci_msg['entity_class']
+ entity_id = omci_msg['entity_id']
+ number_of_commands = omci_msg.fields['number_of_commands']
+
+
+ self.last_alarm_sequence = number_of_commands
+
+ self.log.info('received alarm response',
+ class_id=class_id,
+ instance_id=entity_id,
+ number_of_commands=number_of_commands)
+
+ if class_id == OntData.class_id:
+ return
+
+ # Save to the database
+ self._database.set(self._device_id, class_id, entity_id, number_of_commands)
+
+
+ except KeyError:
+ pass # NOP
\ No newline at end of file
diff --git a/voltha/extensions/omci/tasks/alarm_check_task.py b/voltha/extensions/omci/tasks/alarm_check_task.py
new file mode 100644
index 0000000..9539996
--- /dev/null
+++ b/voltha/extensions/omci/tasks/alarm_check_task.py
@@ -0,0 +1,118 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, TimeoutError, failure
+from voltha.extensions.omci.omci_defs import ReasonCodes
+
+
+class AlarmDataTaskFailure(Exception):
+ pass
+
+
+class AlarmDataTask(Task):
+ """
+ OpenOMCI Alarm Data Get Request
+ """
+ task_priority = Task.DEFAULT_PRIORITY
+ name = "Alarm Data Task"
+ max_payload = 29
+
+ def __init__(self, omci_agent, device_id, class_id, entity_id):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) ME Class ID
+ :param entity_id: (int) ME entity ID
+ """
+ super(AlarmDataTask, self).__init__(AlarmDataTask.name,
+ omci_agent,
+ device_id,
+ priority=AlarmDataTask.task_priority,
+ exclusive=False)
+ self._local_deferred = None
+ self._class_id = class_id
+ self._entity_id = entity_id
+ self._last_number_of_commands = None
+
+ def cancel_deferred(self):
+ super(AlarmDataTask, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """
+ Start the tasks
+ """
+ super(AlarmDataTask, self).start()
+ self._local_deferred = reactor.callLater(0, self.check_alarm_data)
+
+ def stop(self):
+ """
+ Shutdown the tasks
+ """
+ self.log.debug('stopping')
+
+ self.cancel_deferred()
+ super(AlarmDataTask, self).stop()
+
+ @inlineCallbacks
+ def check_alarm_data(self):
+ """
+ Sync the current alarm sequence number
+ """
+ self.log.info('perform-get-interval', class_id=self._class_id,
+ entity_id=self._entity_id)
+
+ try:
+ device = self.omci_agent.get_device(self.device_id)
+
+ results = yield device.omci_cc.send_get_all_alarm()
+ omci_msg = results.fields['omci_message'].fields
+ status = omci_msg['success_code']
+ alarm_sequence_number = omci_msg['number_of_commands']
+ self.log.debug('alarm-data', alarm_sequence_number=alarm_sequence_number)
+
+ if status != ReasonCodes.Success:
+ raise AlarmDataTaskFailure('Unexpected Response Status: {}'.
+ format(status))
+
+ if self._last_number_of_commands is None:
+ self._last_number_of_commands = alarm_sequence_number
+
+ elif alarm_sequence_number != self._last_number_of_commands:
+ msg = 'The last number of sequence does not match {} to {}' \
+ .format(self._last_number_of_commands, alarm_sequence_number)
+ self.log.info('interval-roll-over', msg=msg)
+ raise AlarmDataTaskFailure(msg)
+
+ # Successful if here
+ self.deferred.callback(alarm_sequence_number)
+
+ except TimeoutError as e:
+ self.log.warn('alarm_retrieval_mode', e=e)
+ self.deferred.errback(failure.Failure(e))
+
+ except Exception as e:
+ self.log.exception('alarm-get-failure', e=e)
+ self.deferred.errback(failure.Failure(e))
diff --git a/voltha/extensions/omci/tasks/alarm_resync_task.py b/voltha/extensions/omci/tasks/alarm_resync_task.py
new file mode 100644
index 0000000..b8b8e69
--- /dev/null
+++ b/voltha/extensions/omci/tasks/alarm_resync_task.py
@@ -0,0 +1,260 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from twisted.internet.defer import inlineCallbacks, TimeoutError, failure, returnValue
+from twisted.internet import reactor
+from common.utils.asleep import asleep
+from voltha.extensions.omci.database.mib_db_dict import *
+from voltha.extensions.omci.omci_entities import OntData
+from voltha.extensions.omci.omci_defs import AttributeAccess
+AA = AttributeAccess
+
+
+class AlarmCopyException(Exception):
+ pass
+
+
+class AlarmDownloadException(Exception):
+ pass
+
+
+class AlarmResyncException(Exception):
+ pass
+
+
+class AlarmResyncTask(Task):
+ """
+ OpenOMCI ALARM resynchronization Task
+
+ This task should get a copy of the ALARM and compare compare it to a
+ copy of the database. When the ALARM Upload command is sent to the ONU,
+ it should make a copy and source the data requested from this database.
+ The ONU can still source AVC's and the the OLT can still send config
+ commands to the actual.
+ """
+ task_priority = 240
+ name = "ALARM Resynchronization Task"
+
+ max_retries = 3
+ retry_delay = 7
+
+ max_alarm_upload_next_retries = 3
+ alarm_upload_next_delay = 10 # Max * delay < 60 seconds
+
+ def __init__(self, omci_agent, device_id):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ """
+ super(AlarmResyncTask, self).__init__(AlarmResyncTask.name,
+ omci_agent,
+ device_id,
+ priority=AlarmResyncTask.task_priority,
+ exclusive=False)
+ self._local_deferred = None
+ self._device = omci_agent.get_device(device_id)
+ self._db_active = MibDbVolatileDict(omci_agent)
+ self._db_active.start()
+
+ def cancel_deferred(self):
+ super(AlarmResyncTask, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """
+ Start ALARM Re-Synchronization task
+ """
+ super(AlarmResyncTask, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_alarm_resync)
+ self._db_active.start()
+ self._db_active.add(self.device_id)
+
+ def stop(self):
+ """
+ Shutdown ALARM Re-Synchronization task
+ """
+ self.log.debug('stopping')
+
+ self.cancel_deferred()
+ self._device = None
+ self._db_active.stop()
+ self._db_active = None
+ super(AlarmResyncTask, self).stop()
+
+ def stop_if_not_running(self):
+ if not self.running:
+ raise AlarmResyncException('Resync Task was cancelled')
+
+ @inlineCallbacks
+ def perform_alarm_resync(self):
+ """
+ Perform the ALARM Resynchronization sequence
+
+ The sequence to be performed is:
+ - get a copy of the current ALARM database
+
+ - perform ALARM upload commands to get ONU's database and save this
+ to a local DB.
+ During the alarm upload process, the maximum time between alarm upload next
+ requests is 1 minute.
+ """
+ self.log.info('perform-alarm-resync')
+
+ try:
+ command_sequence_number = yield self.snapshot_alarm()
+
+ # Start the ALARM upload sequence, save alarms to the table
+ commands_retrieved, alarm_table = yield self.upload_alarm(command_sequence_number)
+
+ if commands_retrieved < command_sequence_number:
+ e = AlarmDownloadException('Only retrieved {} of {} instances'.
+ format(commands_retrieved, command_sequence_number))
+ self.deferred.errback(failure.Failure(e))
+
+ self.deferred.callback(
+ {
+ 'commands_retrieved': commands_retrieved,
+ 'alarm_table': alarm_table
+ })
+
+ except Exception as e:
+ self.log.exception('resync', e=e)
+ self.deferred.errback(failure.Failure(e))
+
+ @inlineCallbacks
+ def snapshot_alarm(self):
+ """
+ Snapshot the ALARM on the ONU and create a copy of our local ALARM database
+
+ :return: (pair) (command_sequence_number)
+ """
+ command_sequence_number = None
+
+ try:
+ max_tries = AlarmResyncTask.max_retries - 1
+
+ for retries in xrange(0, max_tries + 1):
+ # Send ALARM Upload so ONU snapshots its ALARM
+ try:
+ command_sequence_number = yield self.send_alarm_upload()
+ self.stop_if_not_running()
+
+ if command_sequence_number is None:
+ if retries >= max_tries:
+ break
+
+ except TimeoutError as e:
+ self.log.warn('timeout', e=e)
+ if retries >= max_tries:
+ raise
+
+ yield asleep(AlarmResyncTask.retry_delay)
+ self.stop_if_not_running()
+ continue
+
+ except Exception as e:
+ self.log.exception('alarm-resync', e=e)
+ raise
+
+ # Handle initial failures
+
+ if command_sequence_number is None:
+ raise AlarmCopyException('Failed to snapshot ALARM copy after {} retries'.
+ format(AlarmResyncTask.max_retries))
+
+ returnValue(command_sequence_number)
+
+ @inlineCallbacks
+ def send_alarm_upload(self):
+ """
+ Perform ALARM upload command and get the number of entries to retrieve
+
+ :return: (int) Number of commands to execute or None on error
+ """
+ ########################################
+ # Begin ALARM Upload
+ try:
+ results = yield self._device.omci_cc.send_get_all_alarm()
+ self.stop_if_not_running()
+ command_sequence_number = results.fields['omci_message'].fields['number_of_commands']
+
+ if command_sequence_number is None or command_sequence_number <= 0:
+ raise ValueError('Number of commands was {}'.format(command_sequence_number))
+
+ returnValue(command_sequence_number)
+
+ except TimeoutError as e:
+ self.log.warn('alarm-resync-get-timeout', e=e)
+ raise
+
+ @inlineCallbacks
+ def upload_alarm(self, command_sequence_number):
+ ########################################
+ # Begin ALARM Upload
+ seq_no = None
+
+ for seq_no in xrange(command_sequence_number):
+ max_tries = AlarmResyncTask.max_alarm_upload_next_retries
+ alarm_class_id = {}
+ alarm_entity_id = {}
+ attributes = {}
+
+ for retries in xrange(0, max_tries):
+ try:
+ response = yield self._device.omci_cc.get_all_alarm_next(seq_no)
+ self.stop_if_not_running()
+
+ omci_msg = response.fields['omci_message'].fields
+ alarm_class_id[seq_no] = omci_msg['alarmed_entity_class']
+ alarm_entity_id[seq_no] = omci_msg['alarmed_entity_id']
+
+ # Filter out the 'alarm_data_sync' from the database. We save that at
+ # the device level and do not want it showing up during a re-sync
+ # during data comparison
+
+ if alarm_class_id[seq_no] == OntData.class_id:
+ break
+
+ attributes[seq_no] = omci_msg['alarm_bit_map']
+
+ # Save to the database
+ self._db_active.set(self.device_id, alarm_class_id[seq_no],
+ alarm_entity_id[seq_no], attributes[seq_no])
+ break
+
+ except TimeoutError:
+ self.log.warn('alarm-resync-timeout', seq_no=seq_no,
+ command_sequence_number=command_sequence_number)
+
+ if retries < max_tries - 1:
+ yield asleep(AlarmResyncTask.alarm_upload_next_delay)
+ else:
+ raise
+
+ except Exception as e:
+ self.log.exception('resync', e=e, seq_no=seq_no,
+ command_sequence_number=command_sequence_number)
+
+ returnValue((seq_no + 1, alarm_class_id, alarm_entity_id, attributes)) # seq_no is zero based and alarm table.
+
diff --git a/voltha/extensions/omci/tasks/alarm_sync_data.py b/voltha/extensions/omci/tasks/alarm_sync_data.py
new file mode 100644
index 0000000..ed079dd
--- /dev/null
+++ b/voltha/extensions/omci/tasks/alarm_sync_data.py
@@ -0,0 +1,131 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, TimeoutError, failure
+from voltha.extensions.omci.omci_defs import ReasonCodes as RC
+
+
+class AlarmSyncDataFailure(Exception):
+ """
+ This error is raised by default when the upload fails
+ """
+
+
+class AlarmSyncDataTask(Task):
+ """
+ OpenOMCI - Synchronize the ONU data
+ """
+ task_priority = Task.DEFAULT_PRIORITY + 10
+ name = "Alarm Sync Time Task"
+
+ def __init__(self, omci_agent, device_id):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ """
+ super(AlarmSyncDataTask, self).__init__(AlarmSyncDataTask.name,
+ omci_agent,
+ device_id,
+ priority=AlarmSyncDataTask.task_priority,
+ exclusive=False)
+ self._local_deferred = None
+
+ def cancel_deferred(self):
+ super(AlarmSyncDataTask, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """
+ Start the tasks
+ """
+ super(AlarmSyncDataTask, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_alarm_sync_data)
+
+ def stop(self):
+ """
+ Shutdown the tasks
+ """
+ self.log.debug('stopping')
+
+ self.cancel_deferred()
+ super(AlarmSyncDataTask, self).stop()
+
+ def stop_if_not_running(self):
+ if not self.running:
+ raise AlarmSyncDataFailure('Update Task was cancelled')
+
+ @inlineCallbacks
+ def perform_alarm_sync_data(self):
+ """
+ Sync the time
+ """
+ self.log.info('perform-alarm-sync-data')
+
+ try:
+ device = self.omci_agent.get_device(self.device_id)
+
+ #########################################
+ # ONU Data (ME #2)
+ # alarm_retrival_mode=1, time=DEFAULT_OMCI_TIMEOUT
+ results = yield device.omci_cc.send_get_all_alarm(alarm_retrival_mode=1)
+ self.stop_if_not_running()
+ command_sequence_number = results.fields['omci_message'].fields['number_of_commands']
+
+ for seq_no in xrange(command_sequence_number):
+ if not device.active or not device.omci_cc.enabled:
+ raise AlarmSyncDataFailure('OMCI and/or ONU is not active')
+
+ for retry in range(0, 3):
+ try:
+ self.log.debug('alarm-data-next-request', seq_no=seq_no,
+ retry=retry,
+ command_sequence_number=command_sequence_number)
+ yield device.omci_cc.send_get_all_alarm_next(seq_no)
+ self.stop_if_not_running()
+ self.log.debug('alarm-data-next-success', seq_no=seq_no,
+ command_sequence_number=command_sequence_number)
+ break
+
+ except TimeoutError as e:
+ from common.utils.asleep import asleep
+ self.log.warn('alarm-data-timeout', e=e, seq_no=seq_no,
+ command_sequence_number=command_sequence_number)
+ if retry >= 2:
+ raise AlarmSyncDataFailure('Alarm timeout failure on req {} of {}'.
+ format(seq_no + 1, command_sequence_number))
+ yield asleep(0.3)
+ self.stop_if_not_running()
+
+ # Successful if here
+ self.log.info('alarm-synchronized')
+ self.deferred.callback(command_sequence_number)
+
+ except TimeoutError as e:
+ self.log.warn('alarm-sync-time-timeout', e=e)
+ self.deferred.errback(failure.Failure(e))
+
+ except Exception as e:
+ self.log.exception('alarm-sync-time', e=e)
+ self.deferred.errback(failure.Failure(e))
diff --git a/voltha/protos/omci_alarm_db.proto b/voltha/protos/omci_alarm_db.proto
new file mode 100644
index 0000000..d4971d2
--- /dev/null
+++ b/voltha/protos/omci_alarm_db.proto
@@ -0,0 +1,80 @@
+//
+// Copyright 2018 - present the original author or authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+syntax = "proto3";
+
+option go_package = "github.com/opencord/voltha/protos/go/omci";
+
+package alarm;
+
+import "meta.proto";
+
+
+message AlarmAttributeData {
+ string name = 1 [(voltha.access) = READ_ONLY];
+ string value = 2;
+}
+
+message AlarmInstanceData {
+ uint32 instance_id = 1 [(voltha.access) = READ_ONLY];
+ string created = 2;
+ string modified = 3;
+
+ repeated AlarmAttributeData attributes = 4
+ [(voltha.child_node) = {key: "name"}];
+}
+
+message AlarmClassData {
+ uint32 class_id = 1 [(voltha.access) = READ_ONLY];
+
+ repeated AlarmInstanceData instances= 2
+ [(voltha.child_node) = {key: "instance_id"}];
+}
+
+message AlarmManagedEntity {
+ uint32 class_id = 1 [(voltha.access) = READ_ONLY];
+ string name = 2 [(voltha.access) = READ_ONLY];
+}
+
+message AlarmMessageType {
+ uint32 message_type = 1 [(voltha.access) = READ_ONLY];
+}
+
+message AlarmDeviceData {
+ string device_id = 1 [(voltha.access) = READ_ONLY];
+ string created = 2;
+ uint32 last_alarm_sequence = 3;
+ string last_sync_time = 4;
+ uint32 version = 5;
+
+
+ repeated AlarmClassData classes = 6
+ [(voltha.child_node) = {key: "class_id"}];
+
+ repeated AlarmManagedEntity managed_entities = 7;
+ repeated AlarmMessageType message_types = 8;
+}
+
+message AlarmOpenOmciEventType {
+ enum OpenOmciEventType {
+ state_change = 0; // A state machine has transitioned to a new state
+ }
+}
+
+message AlarmOpenOmciEvent {
+ AlarmOpenOmciEventType.OpenOmciEventType type = 1;
+
+ string data = 2; // associated data, in json format
+}
diff --git a/voltha/protos/voltha.proto b/voltha/protos/voltha.proto
index b106289..3d62878 100644
--- a/voltha/protos/voltha.proto
+++ b/voltha/protos/voltha.proto
@@ -31,6 +31,7 @@
import "bbf_fiber_tcont_body.proto";
import "bbf_fiber_traffic_descriptor_profile_body.proto";
import "omci_mib_db.proto";
+import "omci_alarm_db.proto";
option java_package = "org.opencord.voltha";
@@ -140,6 +141,10 @@
repeated
omci.MibDeviceData omci_mibs = 28
[(child_node) = {key: "device_id"}];
+
+ repeated
+ alarm.AlarmDeviceData omci_alarms = 29
+ [(child_node) = {key: "device_id"}];
}
message VolthaInstances {
@@ -203,6 +208,10 @@
repeated
omci.MibDeviceData omci_mib_database = 28
[(child_node) = {key: "device_id"}];
+
+ repeated
+ alarm.AlarmDeviceData omci_alarm_database = 29
+ [(child_node) = {key: "device_id"}];
}
// Device Self Test Response
@@ -956,6 +965,13 @@
get: "/api/v1/openomci/{id}/mib"
};
}
+
+ // OpenOMCI ALARM information
+ rpc GetAlarmDeviceData(ID) returns(alarm.AlarmDeviceData) {
+ option (google.api.http) = {
+ get: "/api/v1/openomci/{id}/alarm"
+ };
+ }
}
/*
@@ -1691,4 +1707,11 @@
get: "/api/v1/openomci/{id}/mib"
};
}
+
+ // OpenOMCI ALARM information
+ rpc GetAlarmDeviceData(ID) returns(alarm.AlarmDeviceData) {
+ option (google.api.http) = {
+ get: "/api/v1/openomci/{id}/alarm"
+ };
+ }
}