VOL-1395: Common shared libraries needed for Python based device adapters.
This is an initial check-in of code from the master branch. Additional work
is expected on a few items to work with the new go-core and will be covered
by separate JIRAs and commits.
Change-Id: I0856ec6b79b8d3e49082c609eb9c7eedd75b1708
diff --git a/python/adapters/extensions/omci/tasks/__init__.py b/python/adapters/extensions/omci/tasks/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/python/adapters/extensions/omci/tasks/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/adapters/extensions/omci/tasks/alarm_resync_task.py b/python/adapters/extensions/omci/tasks/alarm_resync_task.py
new file mode 100644
index 0000000..a16f3a2
--- /dev/null
+++ b/python/adapters/extensions/omci/tasks/alarm_resync_task.py
@@ -0,0 +1,393 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from twisted.internet.defer import inlineCallbacks, TimeoutError, failure, returnValue
+from twisted.internet import reactor
+from common.utils.asleep import asleep
+from voltha.extensions.omci.database.mib_db_dict import *
+from voltha.extensions.omci.omci_defs import AttributeAccess
+from voltha.extensions.omci.database.alarm_db_ext import AlarmDbExternal
+
+AA = AttributeAccess
+
+
+class AlarmCopyException(Exception):
+ pass
+
+
+class AlarmDownloadException(Exception):
+ pass
+
+
+class AlarmResyncException(Exception):
+ pass
+
+
+class AlarmResyncTask(Task):
+ """
+ OpenOMCI ALARM resynchronization Task
+
+ This task should get a copy of the ALARM and compare compare it to a
+ copy of the database. When the ALARM Upload command is sent to the ONU,
+ it should make a copy and source the data requested from this database.
+ The ONU can still source AVC's and the the OLT can still send config
+ commands to the actual.
+ """
+ task_priority = Task.DEFAULT_PRIORITY
+ name = "ALARM Resynchronization Task"
+
+ max_retries = 3
+ retry_delay = 7
+
+ max_alarm_upload_next_retries = 3
+ alarm_upload_next_delay = 10 # Max * delay < 60 seconds
+
+ def __init__(self, omci_agent, device_id):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ """
+ super(AlarmResyncTask, self).__init__(AlarmResyncTask.name,
+ omci_agent,
+ device_id,
+ priority=AlarmResyncTask.task_priority,
+ exclusive=False)
+ self._local_deferred = None
+ self._device = omci_agent.get_device(device_id)
+ self._db_active = MibDbVolatileDict(omci_agent)
+ self._db_active.start()
+
+ def cancel_deferred(self):
+ super(AlarmResyncTask, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """
+ Start ALARM Re-Synchronization task
+ """
+ super(AlarmResyncTask, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_alarm_resync)
+ self._db_active.start()
+ self._db_active.add(self.device_id)
+
+ def stop(self):
+ """
+ Shutdown ALARM Re-Synchronization task
+ """
+ self.log.debug('stopping')
+
+ self.cancel_deferred()
+ self._device = None
+ self._db_active.stop()
+ self._db_active = None
+ super(AlarmResyncTask, self).stop()
+
+ @inlineCallbacks
+ def perform_alarm_resync(self):
+ """
+ Perform the ALARM Resynchronization sequence
+
+ The sequence to be performed is:
+ - get a copy of the current ALARM database
+
+ - perform ALARM upload commands to get ONU's database and save this
+ to a local DB.
+ During the alarm upload process, the maximum time between alarm upload next
+ requests is 1 minute.
+ """
+ self.log.debug('perform-alarm-resync')
+
+ try:
+ self.strobe_watchdog()
+ results = yield self.snapshot_alarm()
+ olt_db_copy = results[0]
+ number_of_commands = results[1]
+
+ if olt_db_copy is None:
+ e = AlarmCopyException('Failed to get local database copy')
+ self.deferred.errback(failure.Failure(e))
+ else:
+ # Start the ALARM upload sequence, save alarms to the table
+ self.strobe_watchdog()
+
+ if number_of_commands > 0:
+ commands_retrieved = yield self.upload_alarm(number_of_commands)
+ else:
+ commands_retrieved = 0
+
+ if commands_retrieved != number_of_commands:
+ e = AlarmDownloadException('Only retrieved {} of {} instances'.
+ format(commands_retrieved, number_of_commands))
+ self.deferred.errback(failure.Failure(e))
+ else:
+ # Compare the databases
+ onu_db_copy = self._db_active.query(self.device_id)
+
+ on_olt_only, on_onu_only, attr_diffs = \
+ self.compare_mibs(olt_db_copy, onu_db_copy)
+
+ on_olt_only = on_olt_only if len(on_olt_only) else None
+ on_onu_only = on_onu_only if len(on_onu_only) else None
+ attr_diffs = attr_diffs if len(attr_diffs) else None
+
+ on_olt_only_diffs = on_olt_only if on_olt_only and len(on_olt_only) else None
+ on_onu_only_diffs = on_onu_only if on_onu_only and len(on_onu_only) else None
+ attr_diffs = attr_diffs if attr_diffs and len(attr_diffs) else None
+
+ if all(diff is None for diff in [on_olt_only_diffs, on_onu_only_diffs, attr_diffs]):
+ results = None
+ else:
+ results = {
+ 'onu-only': on_onu_only_diffs,
+ 'olt-only': on_olt_only_diffs,
+ 'attr-diffs': attr_diffs,
+ 'onu-db': onu_db_copy,
+ 'olt-db': olt_db_copy
+ }
+ self.deferred.callback(results)
+
+ except Exception as e:
+ self.log.exception('resync', e=e)
+ self.deferred.errback(failure.Failure(e))
+
+ @inlineCallbacks
+ def snapshot_alarm(self):
+ """
+ Snapshot the ALARM on the ONU and create a copy of our local ALARM database
+
+ :return: (pair) (command_sequence_number)
+ """
+ olt_db_copy = None
+ command_sequence_number = None
+
+ try:
+ max_tries = AlarmResyncTask.max_retries - 1
+
+ for retries in xrange(0, max_tries + 1):
+ # Send ALARM Upload so ONU snapshots its ALARM
+ try:
+ command_sequence_number = yield self.send_alarm_upload()
+ self.strobe_watchdog()
+
+ if command_sequence_number is None:
+ if retries >= max_tries:
+ olt_db_copy = None
+ break
+
+ except TimeoutError as e:
+ self.log.warn('timeout', e=e)
+ if retries >= max_tries:
+ raise
+
+ self.strobe_watchdog()
+ yield asleep(AlarmResyncTask.retry_delay)
+ continue
+
+ # Get a snapshot of the local MIB database
+ olt_db_copy = self._device.query_alarm_table()
+ # if we made it this far, no need to keep trying
+ break
+
+ except Exception as e:
+ self.log.exception('alarm-resync', e=e)
+ raise
+
+ # Handle initial failures
+
+ if olt_db_copy is None or command_sequence_number is None:
+ raise AlarmCopyException('Failed to snapshot ALARM copy after {} retries'.
+ format(AlarmResyncTask.max_retries))
+
+ returnValue((olt_db_copy, command_sequence_number))
+
+ @inlineCallbacks
+ def send_alarm_upload(self):
+ """
+ Perform ALARM upload command and get the number of entries to retrieve
+
+ :return: (int) Number of commands to execute or None on error
+ """
+ ########################################
+ # Begin ALARM Upload
+ try:
+ results = yield self._device.omci_cc.send_get_all_alarm()
+ self.strobe_watchdog()
+ command_sequence_number = results.fields['omci_message'].fields['number_of_commands']
+
+ if command_sequence_number < 0:
+ raise ValueError('Number of commands was {}'.format(command_sequence_number))
+
+ returnValue(command_sequence_number)
+
+ except TimeoutError as e:
+ self.log.warn('alarm-resync-get-timeout', e=e)
+ raise
+
+ @inlineCallbacks
+ def upload_alarm(self, command_sequence_number):
+ ########################################
+ # Begin ALARM Upload
+ seq_no = None
+
+ for seq_no in xrange(command_sequence_number):
+ max_tries = AlarmResyncTask.max_alarm_upload_next_retries
+
+ for retries in xrange(0, max_tries):
+ try:
+ response = yield self._device.omci_cc.send_get_all_alarm_next(seq_no)
+ self.strobe_watchdog()
+
+ omci_msg = response.fields['omci_message'].fields
+ alarm_class_id = omci_msg['alarmed_entity_class']
+ alarm_entity_id = omci_msg['alarmed_entity_id']
+
+ alarm_bit_map = omci_msg['alarm_bit_map']
+ attributes = {AlarmDbExternal.ALARM_BITMAP_KEY: alarm_bit_map}
+
+ # Save to the database
+ self._db_active.set(self.device_id, alarm_class_id,
+ alarm_entity_id, attributes)
+ break
+
+ except TimeoutError:
+ self.log.warn('alarm-resync-timeout', seq_no=seq_no,
+ command_sequence_number=command_sequence_number)
+
+ if retries < max_tries - 1:
+ yield asleep(AlarmResyncTask.alarm_upload_next_delay)
+ self.strobe_watchdog()
+ else:
+ raise
+
+ except Exception as e:
+ self.log.exception('resync', e=e, seq_no=seq_no,
+ command_sequence_number=command_sequence_number)
+
+ returnValue(seq_no + 1) # seq_no is zero based and alarm table.
+
+ def compare_mibs(self, db_copy, db_active):
+ """
+ Compare the our db_copy with the ONU's active copy
+
+ :param db_copy: (dict) OpenOMCI's copy of the database
+ :param db_active: (dict) ONU's database snapshot
+ :return: (dict), (dict), dict() Differences
+ """
+ self.strobe_watchdog()
+
+ # Class & Entities only in local copy (OpenOMCI)
+ on_olt_only = self.get_lsh_only_dict(db_copy, db_active)
+
+ # Class & Entities only on remote (ONU)
+ on_onu_only = self.get_lsh_only_dict(db_active, db_copy)
+
+ # Class & Entities on both local & remote, but one or more attributes
+ # are different on the ONU. This is the value that the local (OpenOMCI)
+ # thinks should be on the remote (ONU)
+
+ me_map = self.omci_agent.get_device(self.device_id).me_map
+ attr_diffs = self.get_attribute_diffs(db_copy, db_active, me_map)
+
+ return on_olt_only, on_onu_only, attr_diffs
+
+ def get_lsh_only_dict(self, lhs, rhs):
+ """
+ Compare two MIB database dictionaries and return the ME Class ID and
+ instances that are unique to the lhs dictionary. Both parameters
+ should be in the common MIB Database output dictionary format that
+ is returned by the mib 'query' command.
+
+ :param lhs: (dict) Left-hand-side argument.
+ :param rhs: (dict) Right-hand-side argument
+
+ return: (list(int,int)) List of tuples where (class_id, inst_id)
+ """
+ results = list()
+
+ for cls_id, cls_data in lhs.items():
+ # Get unique classes
+ #
+ # Skip keys that are not class IDs
+ if not isinstance(cls_id, int):
+ continue
+
+ if cls_id not in rhs:
+ results.extend([(cls_id, inst_id) for inst_id in cls_data.keys()
+ if isinstance(inst_id, int)])
+ else:
+ # Get unique instances of a class
+ lhs_cls = cls_data
+ rhs_cls = rhs[cls_id]
+
+ for inst_id, _ in lhs_cls.items():
+ # Skip keys that are not instance IDs
+ if isinstance(cls_id, int) and inst_id not in rhs_cls:
+ results.extend([(cls_id, inst_id)])
+
+ return results
+
+ def get_attribute_diffs(self, omci_copy, onu_copy, me_map):
+ """
+ Compare two OMCI MIBs and return the ME class and instance IDs that exists
+ on both the local copy and the remote ONU that have different attribute
+ values. Both parameters should be in the common MIB Database output
+ dictionary format that is returned by the mib 'query' command.
+
+ :param omci_copy: (dict) OpenOMCI copy (OLT-side) of the MIB Database
+ :param onu_copy: (dict) active ONU latest copy its database
+ :param me_map: (dict) ME Class ID MAP for this ONU
+
+ return: (list(int,int,str)) List of tuples where (class_id, inst_id, attribute)
+ points to the specific ME instance where attributes
+ are different
+ """
+ results = list()
+
+ # Get class ID's that are in both
+ class_ids = {cls_id for cls_id, _ in omci_copy.items()
+ if isinstance(cls_id, int) and cls_id in onu_copy}
+
+ for cls_id in class_ids:
+ # Get unique instances of a class
+ olt_cls = omci_copy[cls_id]
+ onu_cls = onu_copy[cls_id]
+
+ # Get set of common instance IDs
+ inst_ids = {inst_id for inst_id, _ in olt_cls.items()
+ if isinstance(inst_id, int) and inst_id in onu_cls}
+
+ for inst_id in inst_ids:
+ omci_attributes = {k for k in olt_cls[inst_id][ATTRIBUTES_KEY].iterkeys()}
+ onu_attributes = {k for k in onu_cls[inst_id][ATTRIBUTES_KEY].iterkeys()}
+
+ # Get attributes that exist in one database, but not the other
+ sym_diffs = (omci_attributes ^ onu_attributes)
+ results.extend([(cls_id, inst_id, attr) for attr in sym_diffs])
+
+ # Get common attributes with different values
+ common_attributes = (omci_attributes & onu_attributes)
+ results.extend([(cls_id, inst_id, attr) for attr in common_attributes
+ if olt_cls[inst_id][ATTRIBUTES_KEY][attr] !=
+ onu_cls[inst_id][ATTRIBUTES_KEY][attr]])
+ return results
diff --git a/python/adapters/extensions/omci/tasks/file_download_task.py b/python/adapters/extensions/omci/tasks/file_download_task.py
new file mode 100755
index 0000000..63da427
--- /dev/null
+++ b/python/adapters/extensions/omci/tasks/file_download_task.py
@@ -0,0 +1,108 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from twisted.internet.defer import inlineCallbacks, TimeoutError, failure, AlreadyCalledError
+from twisted.internet import reactor
+from voltha.extensions.omci.omci_defs import ReasonCodes
+import requests
+import os
+import time
+
+class FileDownloadTask(Task):
+ name = "Image File Download Task"
+ CHUNK_SIZE = 1024
+
+ def __init__(self, omci_agent, img_dnld, clock= None): #device_id, url, local_path)
+ super(FileDownloadTask, self).__init__(FileDownloadTask.name, omci_agent, img_dnld.id,
+ exclusive=False,
+ watchdog_timeout=45)
+ # self.url = url
+ # self.local_path = local_path
+ self._image_download = img_dnld
+ self.reactor = clock if clock is not None else reactor
+ self._local_deferred = None
+ # self._request = None
+ # self._file = None
+ # self.log.debug('{} running'.format(FileDownloadTask.name))
+
+ # def __save_data(self):
+ # chunk = self._request.iter_content(chunk_size=FileDownloadTask.CHUNK_SIZE)
+ # if len(chunk) == 0:
+ # self._file.close()
+ # self.deferred.callback(self._image_download)
+ # else:
+ # self._file.write(chunk)
+ # self._image_download.downloaded_bytes += len(chunk)
+ # self.reactor.callLater(0, self.__save_data)
+
+ @inlineCallbacks
+ def perform_download_data(self):
+ try:
+ r = requests.get(self._image_download.url, stream=True)
+ with open(self._image_download.local_dir + '/' + self._image_download.name, 'wb') as f:
+ for chunk in r.iter_content(chunk_size=FileDownloadTask.CHUNK_SIZE):
+ self.strobe_watchdog()
+ if chunk: # filter out keep-alive new chunks
+ yield f.write(chunk)
+ self._image_download.file_size += len(chunk)
+ # yield time.sleep(1)
+ self.deferred.callback(self._image_download)
+ except Exception as e:
+ self.deferred.errback(failure.Failure(e))
+
+ def start(self):
+ super(FileDownloadTask, self).start()
+ if not os.path.exists(self._image_download.local_dir):
+ os.makedirs(self._image_download.local_dir)
+
+ self.strobe_watchdog()
+ self._image_download.file_size = 0
+ self._local_deferred = self.reactor.callLater(0, self.perform_download_data)
+ # try:
+ # if not os.path.exists(self._image_download.local_dir):
+ # os.makedirs(self._image_download.local_dir)
+
+ # self.strobe_watchdog()
+ # self._image_download.downloaded_bytes = 0
+ # self.reactor.callLater(0, self.perform_download_data)
+
+ # self._request = requests.get(self._image_download.url, stream=True)
+ # with open(self._image_download.local_dir + '/' + self._image_download.name, 'wb') as f:
+ # for chunk in r.iter_content(chunk_size=FileDownloadTask.CHUNK_SIZE):
+ # self.strobe_watchdog()
+ # if chunk: # filter out keep-alive new chunks
+ # f.write(chunk)
+ # self._image_download.downloaded_bytes += len(chunk)
+
+ # self.deferred.callback(self._image_download)
+ # except Exception as e:
+ # self.deferred.errback(failure.Failure(e))
+
+ # def stop(self):
+ # # self.cancel_deferred()
+ # super(FileDownloadTask, self).stop()
+
+ def cancel_deferred(self):
+ self.log.debug('FileDownloadTask cancel_deferred')
+ super(FileDownloadTask, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
diff --git a/python/adapters/extensions/omci/tasks/get_mds_task.py b/python/adapters/extensions/omci/tasks/get_mds_task.py
new file mode 100644
index 0000000..1560c83
--- /dev/null
+++ b/python/adapters/extensions/omci/tasks/get_mds_task.py
@@ -0,0 +1,112 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, TimeoutError, failure
+from voltha.extensions.omci.omci_me import OntDataFrame
+from voltha.extensions.omci.omci_defs import ReasonCodes as RC
+
+
+class GetMdsTask(Task):
+ """
+ OpenOMCI Get MIB Data Sync value task
+
+ On successful completion, this task will call the 'callback' method of the
+ deferred returned by the start method and return the value of the MIB
+ Data Sync attribute of the ONT Data ME
+ """
+ task_priority = Task.DEFAULT_PRIORITY
+ name = "Get MDS Task"
+
+ def __init__(self, omci_agent, device_id):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ """
+ super(GetMdsTask, self).__init__(GetMdsTask.name,
+ omci_agent,
+ device_id,
+ priority=GetMdsTask.task_priority)
+ self._local_deferred = None
+
+ def cancel_deferred(self):
+ super(GetMdsTask, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """
+ Start MIB Synchronization tasks
+ """
+ super(GetMdsTask, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_get_mds)
+
+ def stop(self):
+ """
+ Shutdown MIB Synchronization tasks
+ """
+ self.log.debug('stopping')
+
+ self.cancel_deferred()
+ super(GetMdsTask, self).stop()
+
+ @inlineCallbacks
+ def perform_get_mds(self):
+ """
+ Get the 'mib_data_sync' attribute of the ONU
+ """
+ self.log.debug('perform-get-mds')
+
+ try:
+ device = self.omci_agent.get_device(self.device_id)
+
+ #########################################
+ # Request (MDS supplied value does not matter for a 'get' request)
+
+ self.strobe_watchdog()
+ results = yield device.omci_cc.send(OntDataFrame().get())
+
+ omci_msg = results.fields['omci_message'].fields
+ status = omci_msg['success_code']
+
+ # Note: Currently the data reported by the Scapy decode is 16-bits since we need
+ # the data field that large in order to support MIB and Alarm Upload Next
+ # commands. Select only the first 8-bits since that is the size of the MIB
+ # Data Sync attribute
+ mds = (omci_msg['data']['mib_data_sync'] >> 8) & 0xFF \
+ if 'data' in omci_msg and 'mib_data_sync' in omci_msg['data'] else -1
+
+ self.log.debug('ont-data-mds', status=status, mib_data_sync=mds)
+
+ assert status == RC.Success, 'Unexpected Response Status: {}'.format(status)
+
+ # Successful if here
+ self.deferred.callback(mds)
+
+ except TimeoutError as e:
+ self.log.warn('get-mds-timeout', e=e)
+ self.deferred.errback(failure.Failure(e))
+
+ except Exception as e:
+ self.log.exception('get-mds', e=e)
+ self.deferred.errback(failure.Failure(e))
diff --git a/python/adapters/extensions/omci/tasks/interval_data_task.py b/python/adapters/extensions/omci/tasks/interval_data_task.py
new file mode 100644
index 0000000..d41c1d0
--- /dev/null
+++ b/python/adapters/extensions/omci/tasks/interval_data_task.py
@@ -0,0 +1,198 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from datetime import datetime
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, TimeoutError, failure
+from voltha.extensions.omci.omci_defs import ReasonCodes
+from voltha.extensions.omci.omci_frame import OmciFrame, OmciGet
+
+
+class IntervalDataTaskFailure(Exception):
+ pass
+
+
+class IntervalDataTask(Task):
+ """
+ OpenOMCI Performance Interval Get Request
+ """
+ task_priority = Task.DEFAULT_PRIORITY
+ name = "Interval Data Task"
+ max_payload = 29
+
+ def __init__(self, omci_agent, device_id, class_id, entity_id,
+ max_get_response_payload=max_payload,
+ parent_class_id=None,
+ parent_entity_id=None,
+ upstream=None):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) ME Class ID
+ :param entity_id: (int) ME entity ID
+ :param max_get_response_payload: (int) Maximum number of octets in a
+ single GET response frame
+ """
+ super(IntervalDataTask, self).__init__(IntervalDataTask.name,
+ omci_agent,
+ device_id,
+ priority=IntervalDataTask.task_priority,
+ exclusive=False)
+ self._local_deferred = None
+ self._class_id = class_id
+ self._entity_id = entity_id
+
+ self._parent_class_id = parent_class_id
+ self._parent_entity_id = parent_entity_id
+ self._upstream = upstream
+
+ me_map = self.omci_agent.get_device(self.device_id).me_map
+ if self._class_id not in me_map:
+ msg = "The requested ME Class () does not exist in the ONU's ME Map".format(self._class_id)
+ self.log.warn('unknown-pm-me', msg=msg)
+ raise IntervalDataTaskFailure(msg)
+
+ self._entity = me_map[self._class_id]
+ self._counter_attributes = self.get_counter_attributes_names_and_size()
+ self._max_payload = max_get_response_payload
+
+ def cancel_deferred(self):
+ super(IntervalDataTask, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """
+ Start the tasks
+ """
+ super(IntervalDataTask, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_get_interval)
+
+ def stop(self):
+ """
+ Shutdown the tasks
+ """
+ self.log.debug('stopping')
+
+ self.cancel_deferred()
+ super(IntervalDataTask, self).stop()
+
+ def get_counter_attributes_names_and_size(self):
+ """
+ Get all of the counter attributes names and the amount of storage they take
+
+ :return: (dict) Attribute name -> length
+ """
+ return {name: self._entity.attributes[attr_index].field.sz
+ for name, attr_index in self._entity.attribute_name_to_index_map.items()
+ if self._entity.attributes[attr_index].is_counter}
+
+ @inlineCallbacks
+ def perform_get_interval(self):
+ """
+ Sync the time
+ """
+ self.log.info('perform-get-interval', class_id=self._class_id,
+ entity_id=self._entity_id)
+
+ device = self.omci_agent.get_device(self.device_id)
+ attr_names = self._counter_attributes.keys()
+
+ final_results = {
+ 'class_id': self._class_id,
+ 'entity_id': self._entity_id,
+ 'me_name': self._entity.__name__, # Mostly for debugging...
+ 'interval_utc_time': None,
+ 'parent_class_id': self._parent_class_id,
+ 'parent_entity_id': self._parent_entity_id,
+ 'upstream': self._upstream
+ # Counters added here as they are retrieved
+ }
+ last_end_time = None
+
+ while len(attr_names) > 0:
+ # Get as many attributes that will fit. Always include the 1 octet
+ # Interval End Time Attribute and 2 octets for the Entity ID
+
+ remaining_payload = self._max_payload - 3
+ attributes = list()
+ for name in attr_names:
+ if self._counter_attributes[name] > remaining_payload:
+ break
+
+ attributes.append(name)
+ remaining_payload -= self._counter_attributes[name]
+
+ attr_names = attr_names[len(attributes):]
+ attributes.append('interval_end_time')
+
+ frame = OmciFrame(
+ transaction_id=None,
+ message_type=OmciGet.message_id,
+ omci_message=OmciGet(
+ entity_class=self._class_id,
+ entity_id=self._entity_id,
+ attributes_mask=self._entity.mask_for(*attributes)
+ )
+ )
+ self.log.debug('interval-get-request', class_id=self._class_id,
+ entity_id=self._entity_id)
+ try:
+ self.strobe_watchdog()
+ results = yield device.omci_cc.send(frame)
+
+ omci_msg = results.fields['omci_message'].fields
+ status = omci_msg['success_code']
+ end_time = omci_msg['data'].get('interval_end_time')
+
+ self.log.debug('interval-get-results', class_id=self._class_id,
+ entity_id=self._entity_id, status=status,
+ end_time=end_time)
+
+ if status != ReasonCodes.Success:
+ raise IntervalDataTaskFailure('Unexpected Response Status: {}, Class ID: {}'.
+ format(status, self._class_id))
+ if last_end_time is None:
+ last_end_time = end_time
+
+ elif end_time != last_end_time:
+ msg = 'Interval End Time Changed during retrieval from {} to {}'\
+ .format(last_end_time, end_time)
+ self.log.info('interval-roll-over', msg=msg, class_id=self._class_id)
+ raise IntervalDataTaskFailure(msg)
+
+ final_results['interval_utc_time'] = datetime.utcnow()
+ for attribute in attributes:
+ final_results[attribute] = omci_msg['data'].get(attribute)
+
+ except TimeoutError as e:
+ self.log.warn('interval-get-timeout', e=e, class_id=self._class_id,
+ entity_id=self._entity_id, attributes=attributes)
+ self.deferred.errback(failure.Failure(e))
+
+ except Exception as e:
+ self.log.exception('interval-get-failure', e=e, class_id=self._class_id)
+ self.deferred.errback(failure.Failure(e))
+
+ # Successful if here
+ self.deferred.callback(final_results)
diff --git a/python/adapters/extensions/omci/tasks/mib_reconcile_task.py b/python/adapters/extensions/omci/tasks/mib_reconcile_task.py
new file mode 100644
index 0000000..38e29dc
--- /dev/null
+++ b/python/adapters/extensions/omci/tasks/mib_reconcile_task.py
@@ -0,0 +1,693 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from common.utils.asleep import asleep
+from voltha.extensions.omci.tasks.task import Task
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, failure, returnValue, TimeoutError
+from voltha.extensions.omci.omci_defs import *
+from voltha.extensions.omci.omci_me import OntDataFrame
+from voltha.extensions.omci.omci_frame import OmciFrame, OmciDelete, OmciCreate, OmciSet
+from voltha.extensions.omci.database.mib_db_api import ATTRIBUTES_KEY
+
+OP = EntityOperations
+RC = ReasonCodes
+AA = AttributeAccess
+
+
+class MibReconcileException(Exception):
+ pass
+
+
+class MibPartialSuccessException(Exception):
+ pass
+
+
+class MibReconcileTask(Task):
+ """
+ OpenOMCI MIB Reconcile Task
+
+ This task attempts to resynchronize the MIB. Note that it runs in exclusive
+ OMCI-CC mode so that it can query the current database/ONU to verify the
+ differences still exist before correcting them.
+ """
+ task_priority = 240
+ name = "MIB Reconcile Task"
+ max_sequential_db_updates = 5 # Be kind, rewind
+ db_update_pause = 0.05 # 50mS
+
+ def __init__(self, omci_agent, device_id, diffs):
+ """
+ Class initialization
+
+ :param omci_agent: (OpenOMCIAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ :param diffs: (dict) Dictionary of what was found to be invalid
+ """
+ super(MibReconcileTask, self).__init__(MibReconcileTask.name,
+ omci_agent,
+ device_id,
+ priority=MibReconcileTask.task_priority,
+ exclusive=False)
+ self._local_deferred = None
+ self._diffs = diffs
+ self._device = None
+ self._sync_sm = None
+ self._db_updates = 0 # For tracking sequential blocking consul/etcd updates
+
+ def cancel_deferred(self):
+ super(MibReconcileTask, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """
+ Start MIB Reconcile task
+ """
+ super(MibReconcileTask, self).start()
+
+ self._device = self.omci_agent.get_device(self.device_id)
+
+ if self._device is None:
+ e = MibReconcileException('Device {} no longer exists'.format(self.device_id))
+ self.deferred.errback(failure.Failure(e))
+ return
+
+ self._sync_sm = self._device.mib_synchronizer
+
+ if self._device is None:
+ e = MibReconcileException('Device {} MIB State machine no longer exists'.format(self.device_id))
+ self.deferred.errback(failure.Failure(e))
+ return
+
+ self._local_deferred = reactor.callLater(0, self.perform_mib_reconcile)
+
+ def stop(self):
+ """
+ Shutdown MIB Reconcile task
+ """
+ self.log.debug('stopping')
+
+ self.cancel_deferred()
+ self._device = None
+ super(MibReconcileTask, self).stop()
+
+ @inlineCallbacks
+ def perform_mib_reconcile(self):
+ """
+ Perform the MIB Reconciliation sequence.
+
+ The sequence to reconcile will be to clean up ONU only MEs, followed by
+ OLT/OpenOMCI-only MEs, and then finally correct common MEs with differing
+ attributes.
+ """
+ self.log.debug('perform-mib-reconcile')
+
+ try:
+ successes = 0
+ failures = 0
+
+ if self._diffs['onu-only'] is not None and len(self._diffs['onu-only']):
+ results = yield self.fix_onu_only(self._diffs['onu-only'],
+ self._diffs['onu-db'])
+ self.log.debug('onu-only-results', good=results[0], bad=results[1])
+ successes += results[0]
+ failures += results[1]
+
+ if self._diffs['olt-only'] is not None and len(self._diffs['olt-only']):
+ results = yield self.fix_olt_only(self._diffs['olt-only'],
+ self._diffs['onu-db'],
+ self._diffs['olt-db'])
+ self.log.debug('olt-only-results', good=results[0], bad=results[1])
+ successes += results[0]
+ failures += results[1]
+
+ if self._diffs['attributes'] is not None and len(self._diffs['attributes']):
+ results = yield self.fix_attributes_only(self._diffs['attributes'],
+ self._diffs['onu-db'],
+ self._diffs['olt-db'])
+ self.log.debug('attributes-results', good=results[0], bad=results[1])
+ successes += results[0]
+ failures += results[1]
+
+ # Success? Update MIB-data-sync
+ if failures == 0:
+ results = yield self.update_mib_data_sync()
+ successes += results[0]
+ failures += results[1]
+
+ # Send back final status
+ if failures > 0:
+ msg = '{} Successful updates, {} failures'.format(successes, failure)
+ error = MibPartialSuccessException(msg) if successes \
+ else MibReconcileException(msg)
+ self.deferred.errback(failure.Failure(error))
+ else:
+ self.deferred.callback('{} Successful updates'.format(successes))
+
+ except Exception as e:
+ if not self.deferred.called:
+ self.log.exception('reconcile', e=e)
+ self.deferred.errback(failure.Failure(e))
+
+ @inlineCallbacks
+ def fix_onu_only(self, onu, onu_db):
+ """
+ Fix ME's that were only found on the ONU. For ONU only MEs there are
+ the following things that will be checked.
+
+ o ME's that do not have an OpenOMCI class decoder. These are stored
+ as binary blobs in the MIB database. Since we do not ever set them
+ (since no encoder as well), just store them in the OLT/OpenOMCI MIB
+ Database.
+
+ o For ME's that are created by the ONU (no create/delete access), the
+ MEs 'may' be due to a firmware upgrade and reboot or in response to
+ an OLT creating another ME entity and then creating this ME. Place
+ these 'new' into the database.
+
+ o For ME's that are created by the OLT/OpenOMCI, delete them from the
+ ONU
+
+ :param onu: (list(int,int)) List of tuples where (class_id, inst_id)
+ :param onu_db: (dict) ONU Database snapshot at time of audit
+
+ :return: (int, int) successes, failures
+ """
+ successes = 0
+ failures = 0
+ me_map = self._device.me_map
+
+ ####################################################################
+ # First the undecodables and onu-created (treated the same)
+ undecodable = self._undecodable(onu, me_map)
+ onu_created = self._onu_created(onu, me_map)
+
+ if len(undecodable) or len(onu_created):
+ results = yield self.fix_onu_only_save_to_db(undecodable, onu_created, onu_db)
+ successes += results[0]
+ failures += results[1]
+
+ ####################################################################
+ # Last the OLT created values, resend these to the ONU
+
+ olt_created = self._olt_created(onu, me_map)
+ if len(olt_created):
+ results = yield self.fix_onu_only_remove_from_onu(olt_created)
+ successes += results[0]
+ failures += results[1]
+
+ returnValue((successes, failures))
+
+ @inlineCallbacks
+ def fix_onu_only_save_to_db(self, undecodable, onu_created, onu_db):
+ """
+ In ONU database and needs to be saved to OLT/OpenOMCI database.
+
+ Note that some, perhaps all, of these instances could be ONU create
+ in response to the OLT creating some other ME instance. So treat
+ the Database operation as a create.
+ """
+ successes = 0
+ failures = 0
+
+ for cid, eid in undecodable + onu_created:
+ if self.deferred.called: # Check if task canceled
+ break
+ try:
+ # If in current MIB, had an audit issue or other MIB operation
+ # put it into the database, declare it a failure so we audit again
+ try:
+ olt_entry = self._sync_sm.query_mib(class_id=cid, instance_id=eid)
+
+ except KeyError: # Common for ONU created MEs during audit
+ olt_entry = None
+
+ if olt_entry is not None and len(olt_entry):
+ self.log.debug('onu-only-in-current', cid=cid, eid=eid)
+ failures += 1 # Mark as failure so we audit again
+
+ elif cid not in onu_db:
+ self.log.warn('onu-only-not-in-audit', cid=cid, eid=eid)
+ failures += 1
+
+ else:
+ entry = onu_db[cid][eid]
+ self.strobe_watchdog()
+ self._sync_sm.mib_set(cid, eid, entry[ATTRIBUTES_KEY])
+ successes += 1
+
+ # If we do nothing but DB updates for ALOT of MEs, we are
+ # blocking other async twisted tasks, be kind and pause
+ self._db_updates += 1
+
+ if self._db_updates >= MibReconcileTask.max_sequential_db_updates:
+ self._db_updates = 0
+ self._local_deferred = yield asleep(MibReconcileTask.db_update_pause)
+
+ except Exception as e:
+ self.log.warn('onu-only-error', e=e)
+ failures += 1
+
+ returnValue((successes, failures))
+
+ @inlineCallbacks
+ def fix_onu_only_remove_from_onu(self, olt_created,):
+ """ On ONU, but no longer on OLT/OpenOMCI, delete it """
+ successes = 0
+ failures = 0
+
+ for cid, eid in olt_created:
+ if self.deferred.called: # Check if task canceled
+ break
+ try:
+ # If in current MIB, had an audit issue, declare it an error
+ # and next audit should clear it up
+ try:
+ current_entry = self._sync_sm.query_mib(class_id=cid, instance_id=eid)
+
+ except KeyError:
+ # Expected if no other entities with same class present in MIB
+ current_entry = None
+
+ if current_entry is not None and len(current_entry):
+ self.log.debug('onu-only-in-current', cid=cid, eid=eid)
+ failures += 1
+
+ else:
+ # Delete it from the ONU. Assume success
+ frame = OmciFrame(transaction_id=None,
+ message_type=OmciDelete.message_id,
+ omci_message=OmciDelete(entity_class=cid, entity_id=eid))
+
+ self._local_deferred = yield self._device.omci_cc.send(frame)
+ self.check_status_and_state(self._local_deferred, 'onu-attribute-update')
+ successes += 1
+ self._db_updates = 0
+
+ except Exception as e:
+ self.log.warn('olt-only-error', e=e)
+ failures += 1
+ self.strobe_watchdog()
+
+ returnValue((successes, failures))
+
+ @inlineCallbacks
+ def fix_olt_only(self, olt, onu_db, olt_db):
+ """
+ Fix ME's that were only found on the OLT. For OLT only MEs there are
+ the following things that will be checked.
+
+ o ME's that do not have an OpenOMCI class decoder. These are stored
+ as binary blobs in the MIB database. Since the OLT will never
+ create these (all are learned from ONU), it is assumed the ONU
+ has removed them for some purpose. So delete them from the OLT
+ database.
+
+ o For ME's that are created by the ONU (no create/delete access), the
+ MEs 'may' not be on the ONU because of a reboot or an OLT created
+ ME was deleted and the ONU gratuitously removes it. So delete them
+ from the OLT database.
+
+ o For ME's that are created by the OLT/OpenOMCI, delete them from the
+ ONU
+
+ :param olt: (list(int,int)) List of tuples where (class_id, inst_id)
+ :param onu_db: (dict) ONU Database snapshot at time of audit
+ :param olt_db: (dict) OLT Database snapshot at time of audit
+
+ :return: (int, int) successes, failures
+ """
+ successes = 0
+ failures = 0
+ me_map = self._device.me_map
+
+ ####################################################################
+ # First the undecodables and onu-created (treated the same) remove
+ # from OpenOMCI database
+ undecodable = self._undecodable(olt, me_map)
+ onu_created = self._onu_created(olt, me_map)
+
+ if len(undecodable) or len(onu_created):
+ good, bad = self.fix_olt_only_remove_from_db(undecodable, onu_created)
+ successes += good
+ failures += bad
+
+ ####################################################################
+ # Last the OLT created
+
+ olt_created = self._olt_created(olt, me_map)
+ if len(olt_created):
+ results = yield self.fix_olt_only_create_on_onu(olt_created, me_map)
+ successes += results[0]
+ failures += results[1]
+
+ returnValue((successes, failures))
+
+ def fix_olt_only_remove_from_db(self, undecodable, onu_created):
+ """ On OLT, but not on ONU and are ONU created, delete from OLT/OpenOMCI DB """
+ successes = 0
+ failures = 0
+
+ for cid, eid in undecodable + onu_created:
+ if self.deferred.called: # Check if task canceled
+ break
+ try:
+ # Delete it. If already deleted (KeyError), then that is okay
+ self._sync_sm.mib_delete(cid, eid)
+ self.strobe_watchdog()
+
+ except KeyError:
+ successes += 1 # Not found in DB anymore, assume success
+
+ except Exception as e:
+ self.log.warn('olt-only-db-error', cid=cid, eid=eid, e=e)
+ failures += 1
+
+ return successes, failures
+
+ @inlineCallbacks
+ def fix_olt_only_create_on_onu(self, olt_created, me_map):
+ """ Found on OLT and created by OLT, so create on ONU"""
+ successes = 0
+ failures = 0
+
+ for cid, eid in olt_created:
+ if self.deferred.called: # Check if task canceled
+ break
+
+ try:
+ # Get current entry, use it if found
+ olt_entry = self._sync_sm.query_mib(class_id=cid, instance_id=eid)
+ me_entry = me_map[cid]
+
+ if olt_entry is None or len(olt_entry) == 0:
+ successes += 1 # Deleted before task got to run
+ else:
+ # Create it in the ONU. Only set-by-create attributes allowed
+ sbc_data = {k: v for k, v in olt_entry[ATTRIBUTES_KEY].items()
+ if AA.SetByCreate in
+ next((attr.access for attr in me_entry.attributes
+ if attr.field.name == k), set())}
+
+ frame = OmciFrame(transaction_id=None,
+ message_type=OmciCreate.message_id,
+ omci_message=OmciCreate(entity_class=cid,
+ entity_id=eid,
+ data=sbc_data))
+
+ self._local_deferred = yield self._device.omci_cc.send(frame)
+ self.check_status_and_state(self._local_deferred, 'olt-create-sbc')
+ successes += 1
+ self._db_updates = 0
+
+ # Try any writeable attributes now (but not set-by-create)
+ writeable_data = {k: v for k, v in olt_entry[ATTRIBUTES_KEY].items()
+ if AA.Writable in
+ next((attr.access for attr in me_entry.attributes
+ if attr.field.name == k), set())
+ and AA.SetByCreate not in
+ next((attr.access for attr in me_entry.attributes
+ if attr.field.name == k), set())}
+
+ if len(writeable_data):
+ attributes_mask = me_entry.mask_for(*writeable_data.keys())
+ frame = OmciFrame(transaction_id=None,
+ message_type=OmciSet.message_id,
+ omci_message=OmciSet(entity_class=cid,
+ entity_id=eid,
+ attributes_mask=attributes_mask,
+ data=writeable_data))
+
+ self._local_deferred = yield self._device.omci_cc.send(frame)
+ self.check_status_and_state(self._local_deferred, 'olt-set-writeable')
+ successes += 1
+
+ except Exception as e:
+ self.log.exception('olt-only-fix', e=e, cid=cid, eid=eid)
+ failures += 1
+ self.strobe_watchdog()
+
+ returnValue((successes, failures))
+
+ @inlineCallbacks
+ def fix_attributes_only(self, attrs, onu_db, olt_db):
+ """
+ Fix ME's that were found on both the ONU and OLT, but had differing
+ attribute values. There are several cases to handle here
+
+ o For ME's created on the ONU that have write attributes that
+ only exist in the ONU's database, copy these to the OLT/OpenOMCI
+ database
+
+ o For all other writeable attributes, the OLT value takes precedence
+
+ :param attrs: (list(int,int,str)) List of tuples where (class_id, inst_id, attribute)
+ points to the specific ME instance where attributes
+ are different
+ :param onu_db: (dict) ONU Database snapshot at time of audit
+ :param olt_db: (dict) OLT Database snapshot at time of audit
+
+ :return: (int, int) successes, failures
+ """
+ successes = 0
+ failures = 0
+ me_map = self._device.me_map
+
+ # Collect up attributes on a per CID/EID basis. This will result in
+ # the minimal number of operations to either the database of over
+ # the OMCI-CC to the ONU
+
+ attr_map = dict()
+ for cid, eid, attribute in attrs:
+ if (cid, eid) not in attr_map:
+ attr_map[(cid, eid)] = {attribute}
+ else:
+ attr_map[(cid, eid)].add(attribute)
+
+ for entity_pair, attributes in attr_map.items():
+ cid = entity_pair[0]
+ eid = entity_pair[1]
+
+ # Skip MEs we cannot encode/decode
+ if cid not in me_map:
+ self.log.warn('no-me-map-decoder', class_id=cid)
+ failures += 1
+ continue
+
+ if self.deferred.called: # Check if task canceled
+ break
+
+ # Build up MIB set commands and ONU Set (via OMCI) commands
+ # based of the attributes
+ me_entry = me_map[cid]
+ mib_data_to_save = dict()
+ onu_data_to_set = dict()
+ olt_attributes = olt_db[cid][eid][ATTRIBUTES_KEY]
+ onu_attributes = onu_db[cid][eid][ATTRIBUTES_KEY]
+
+ for attribute in attributes:
+ map_access = next((attr.access for attr in me_entry.attributes
+ if attr.field.name == attribute), set())
+ writeable = AA.Writable in map_access or AA.SetByCreate in map_access
+
+ # If only in ONU database snapshot, save it to OLT
+ if attribute in onu_attributes and attribute not in olt_attributes:
+ # On onu only
+ mib_data_to_save[attribute] = onu_attributes[attribute]
+
+ elif writeable:
+ # On olt only or in both. Either way OLT wins
+ onu_data_to_set[attribute] = olt_attributes[attribute]
+
+ # Now do the bulk operations For both, check to see if the target
+ # is still the same as when the audit was performed. If it is, do
+ # the commit. If not, mark as a failure so an expedited audit will
+ # occur and check again.
+
+ if len(mib_data_to_save):
+ results = yield self.fix_attributes_only_in_mib(cid, eid, mib_data_to_save)
+ successes += results[0]
+ failures += results[1]
+
+ if len(onu_data_to_set):
+ results = yield self.fix_attributes_only_on_olt(cid, eid, onu_data_to_set, olt_db, me_entry)
+ successes += results[0]
+ failures += results[1]
+
+ returnValue((successes, failures))
+
+ @inlineCallbacks
+ def fix_attributes_only_in_mib(self, cid, eid, mib_data):
+ successes = 0
+ failures = 0
+ try:
+ # Get current and verify same as during audit it is missing from our DB
+ attributes = mib_data.keys()
+ current_entry = self._device.query_mib(cid, eid, attributes)
+
+ if current_entry is not None and len(current_entry):
+ clashes = {k: v for k, v in current_entry.items()
+ if k in attributes and v != mib_data[k]}
+
+ if len(clashes):
+ raise ValueError('Existing DB entry for {}/{} attributes clash with audit data. Clash: {}'.
+ format(cid, eid, clashes))
+
+ self._sync_sm.mib_set(cid, eid, mib_data)
+ successes += len(mib_data)
+ self.strobe_watchdog()
+
+ # If we do nothing but DB updates for ALOT of MEs, we are
+ # blocking other async twisted tasks, be kind and yield
+ self._db_updates += 1
+ if self._db_updates >= MibReconcileTask.max_sequential_db_updates:
+ self._db_updates = 0
+ self._local_deferred = yield asleep(MibReconcileTask.db_update_pause)
+
+ except ValueError as e:
+ self.log.debug('attribute-changed', e)
+ failures += len(mib_data)
+
+ except Exception as e:
+ self.log.exception('attribute-only-fix-mib', e=e, cid=cid, eid=eid)
+ failures += len(mib_data)
+
+ returnValue((successes, failures))
+
+ @inlineCallbacks
+ def fix_attributes_only_on_olt(self, cid, eid, onu_data, olt_db, me_entry):
+ successes = 0
+ failures = 0
+
+ try:
+ # On olt only or in both. Either way OLT wins, first verify that
+ # the OLT version is still the same data that we want to
+ # update on the ONU. Verify the data for the OLT is the same as
+ # at time of audit
+ olt_db_entries = {k: v for k, v in olt_db[cid][eid][ATTRIBUTES_KEY].items()
+ if k in onu_data.keys()}
+ current_entries = self._sync_sm.query_mib(class_id=cid, instance_id=eid,
+ attributes=onu_data.keys())
+
+ still_the_same = all(current_entries.get(k) == v for k, v in olt_db_entries.items())
+ if not still_the_same:
+ returnValue((0, len(onu_data))) # Wait for it to stabilize
+
+ # OLT data still matches, do the set operations now
+ # while len(onu_data):
+ attributes_mask = me_entry.mask_for(*onu_data.keys())
+ frame = OmciFrame(transaction_id=None,
+ message_type=OmciSet.message_id,
+ omci_message=OmciSet(entity_class=cid,
+ entity_id=eid,
+ attributes_mask=attributes_mask,
+ data=onu_data))
+
+ results = yield self._device.omci_cc.send(frame)
+ self.check_status_and_state(results, 'onu-attribute-update')
+ successes += len(onu_data)
+ self._db_updates = 0
+
+ except Exception as e:
+ self.log.exception('attribute-only-fix-onu', e=e, cid=cid, eid=eid)
+ failures += len(onu_data)
+ self.strobe_watchdog()
+
+ returnValue((successes, failures))
+
+ @inlineCallbacks
+ def update_mib_data_sync(self):
+ """
+ As the final step of MIB resynchronization, the OLT sets the MIB data sync
+ attribute of the ONU data ME to some suitable value of its own choice. It
+ then sets its own record of the same attribute to the same value,
+ incremented by 1, as explained in clause
+
+ :return: (int, int) success, failure counts
+ """
+ # Get MDS to set, do not user zero
+
+ new_mds_value = self._sync_sm.mib_data_sync
+ if new_mds_value == 0:
+ self._sync_sm.increment_mib_data_sync()
+ new_mds_value = self._sync_sm.mib_data_sync
+
+ # Update it. The set response will be sent on the OMCI-CC pub/sub bus
+ # and the MIB Synchronizer will update this MDS value in the database
+ # if successful.
+ try:
+ frame = OntDataFrame(mib_data_sync=new_mds_value).set()
+
+ results = yield self._device.omci_cc.send(frame)
+ self.check_status_and_state(results, 'ont-data-mbs-update')
+ returnValue((1, 0))
+
+ except TimeoutError as e:
+ self.log.debug('ont-data-send-timeout', e=e)
+ returnValue((0, 1))
+
+ except Exception as e:
+ self.log.exception('ont-data-send', e=e, mds=new_mds_value)
+ returnValue((0, 1))
+
+ def check_status_and_state(self, results, operation=''):
+ """
+ Check the results of an OMCI response. An exception is thrown
+ if the task was cancelled or an error was detected.
+
+ :param results: (OmciFrame) OMCI Response frame
+ :param operation: (str) what operation was being performed
+ :return: True if successful, False if the entity existed (already created)
+ """
+ omci_msg = results.fields['omci_message'].fields
+ status = omci_msg['success_code']
+ error_mask = omci_msg.get('parameter_error_attributes_mask', 'n/a')
+ failed_mask = omci_msg.get('failed_attributes_mask', 'n/a')
+ unsupported_mask = omci_msg.get('unsupported_attributes_mask', 'n/a')
+ self.strobe_watchdog()
+
+ self.log.debug(operation, status=status, error_mask=error_mask,
+ failed_mask=failed_mask, unsupported_mask=unsupported_mask)
+
+ if status == RC.Success:
+ return True
+
+ elif status == RC.InstanceExists:
+ return False
+
+ msg = '{} failed with a status of {}, error_mask: {}, failed_mask: {}, unsupported_mask: {}'.\
+ format(operation, status, error_mask, failed_mask, unsupported_mask)
+
+ raise MibReconcileException(msg)
+
+ def _undecodable(self, cid_eid_list, me_map):
+ return [(cid, eid) for cid, eid in cid_eid_list if cid not in me_map]
+
+ def _onu_created(self, cid_eid_list, me_map):
+ return [(cid, eid) for cid, eid in cid_eid_list if cid in me_map and
+ (OP.Create not in me_map[cid].mandatory_operations and
+ OP.Create not in me_map[cid].optional_operations)]
+
+ def _olt_created(self, cid_eid_list, me_map):
+ return [(cid, eid) for cid, eid in cid_eid_list if cid in me_map and
+ (OP.Create in me_map[cid].mandatory_operations or
+ OP.Create in me_map[cid].optional_operations)]
diff --git a/python/adapters/extensions/omci/tasks/mib_resync_task.py b/python/adapters/extensions/omci/tasks/mib_resync_task.py
new file mode 100644
index 0000000..ef9c531
--- /dev/null
+++ b/python/adapters/extensions/omci/tasks/mib_resync_task.py
@@ -0,0 +1,427 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from twisted.internet.defer import inlineCallbacks, TimeoutError, failure, returnValue
+from twisted.internet import reactor
+from common.utils.asleep import asleep
+from voltha.extensions.omci.database.mib_db_dict import *
+from voltha.extensions.omci.omci_entities import OntData
+from voltha.extensions.omci.omci_defs import AttributeAccess, EntityOperations
+
+AA = AttributeAccess
+OP = EntityOperations
+
+class MibCopyException(Exception):
+ pass
+
+
+class MibDownloadException(Exception):
+ pass
+
+
+class MibResyncException(Exception):
+ pass
+
+
+class MibResyncTask(Task):
+ """
+ OpenOMCI MIB resynchronization Task
+
+ This task should get a copy of the MIB and compare compare it to a
+ copy of the database. When the MIB Upload command is sent to the ONU,
+ it should make a copy and source the data requested from this database.
+ The ONU can still source AVC's and the the OLT can still send config
+ commands to the actual.
+ """
+ task_priority = 240
+ name = "MIB Resynchronization Task"
+
+ max_db_copy_retries = 3
+ db_copy_retry_delay = 7
+
+ max_mib_upload_next_retries = 3
+ mib_upload_next_delay = 10 # Max * delay < 60 seconds
+ watchdog_timeout = 15 # Should be > max delay
+
+ def __init__(self, omci_agent, device_id):
+ """
+ Class initialization
+
+ :param omci_agent: (OpenOMCIAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ """
+ super(MibResyncTask, self).__init__(MibResyncTask.name,
+ omci_agent,
+ device_id,
+ priority=MibResyncTask.task_priority,
+ exclusive=False)
+ self._local_deferred = None
+ self._device = omci_agent.get_device(device_id)
+ self._db_active = MibDbVolatileDict(omci_agent)
+ self._db_active.start()
+
+ def cancel_deferred(self):
+ super(MibResyncTask, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """
+ Start MIB Re-Synchronization task
+ """
+ super(MibResyncTask, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_mib_resync)
+ self._db_active.start()
+ self._db_active.add(self.device_id)
+
+ def stop(self):
+ """
+ Shutdown MIB Re-Synchronization task
+ """
+ self.log.debug('stopping')
+
+ self.cancel_deferred()
+ self._device = None
+ self._db_active.stop()
+ self._db_active = None
+ super(MibResyncTask, self).stop()
+
+ @inlineCallbacks
+ def perform_mib_resync(self):
+ """
+ Perform the MIB Resynchronization sequence
+
+ The sequence to be performed is:
+ - get a copy of the current MIB database (db_copy)
+
+ - perform MIB upload commands to get ONU's database and save this
+ to a local DB (db_active). Note that the ONU can still receive
+ create/delete/set/get operations from the operator and source
+ AVC notifications as well during this period.
+
+ - Compare the information in the db_copy to the db_active
+
+ During the mib upload process, the maximum time between mib upload next
+ requests is 1 minute.
+ """
+ self.log.debug('perform-mib-resync')
+
+ try:
+ results = yield self.snapshot_mib()
+ db_copy = results[0]
+
+ if db_copy is None:
+ e = MibCopyException('Failed to get local database copy')
+ self.deferred.errback(failure.Failure(e))
+
+ else:
+ number_of_commands = results[1]
+
+ # Start the MIB upload sequence
+ self.strobe_watchdog()
+ commands_retrieved = yield self.upload_mib(number_of_commands)
+
+ if commands_retrieved < number_of_commands:
+ e = MibDownloadException('Only retrieved {} of {} instances'.
+ format(commands_retrieved, number_of_commands))
+ self.deferred.errback(failure.Failure(e))
+ else:
+ # Compare the databases
+ active_copy = self._db_active.query(self.device_id)
+ on_olt_only, on_onu_only, attr_diffs = \
+ self.compare_mibs(db_copy, active_copy)
+
+ self.deferred.callback(
+ {
+ 'on-olt-only': on_olt_only if len(on_olt_only) else None,
+ 'on-onu-only': on_onu_only if len(on_onu_only) else None,
+ 'attr-diffs': attr_diffs if len(attr_diffs) else None,
+ 'olt-db': db_copy,
+ 'onu-db': active_copy
+ })
+
+ except Exception as e:
+ self.log.exception('resync', e=e)
+ self.deferred.errback(failure.Failure(e))
+
+ @inlineCallbacks
+ def snapshot_mib(self):
+ """
+ Snapshot the MIB on the ONU and create a copy of our local MIB database
+
+ :return: (pair) (db_copy, number_of_commands)
+ """
+ db_copy = None
+ number_of_commands = None
+
+ try:
+ max_tries = MibResyncTask.max_db_copy_retries - 1
+
+ for retries in xrange(0, max_tries + 1):
+ # Send MIB Upload so ONU snapshots its MIB
+ try:
+ self.strobe_watchdog()
+ number_of_commands = yield self.send_mib_upload()
+
+ if number_of_commands is None:
+ if retries >= max_tries:
+ db_copy = None
+ break
+
+ except (TimeoutError, ValueError) as e:
+ self.log.warn('timeout-or-value-error', e=e)
+ if retries >= max_tries:
+ raise
+
+ self.strobe_watchdog()
+ yield asleep(MibResyncTask.db_copy_retry_delay)
+ continue
+
+ # Get a snapshot of the local MIB database
+ db_copy = self._device.query_mib()
+ # if we made it this far, no need to keep trying
+ break
+
+ except Exception as e:
+ self.log.exception('mib-resync', e=e)
+ raise
+
+ # Handle initial failures
+
+ if db_copy is None or number_of_commands is None:
+ raise MibCopyException('Failed to snapshot MIB copy after {} retries'.
+ format(MibResyncTask.max_db_copy_retries))
+
+ returnValue((db_copy, number_of_commands))
+
+ @inlineCallbacks
+ def send_mib_upload(self):
+ """
+ Perform MIB upload command and get the number of entries to retrieve
+
+ :return: (int) Number of commands to execute or None on error
+ """
+ ########################################
+ # Begin MIB Upload
+ try:
+ self.strobe_watchdog()
+ results = yield self._device.omci_cc.send_mib_upload()
+
+ number_of_commands = results.fields['omci_message'].fields['number_of_commands']
+
+ if number_of_commands is None or number_of_commands <= 0:
+ raise ValueError('Number of commands was {}'.format(number_of_commands))
+
+ returnValue(number_of_commands)
+
+ except TimeoutError as e:
+ self.log.warn('mib-resync-get-timeout', e=e)
+ raise
+
+ @inlineCallbacks
+ def upload_mib(self, number_of_commands):
+ ########################################
+ # Begin MIB Upload
+ seq_no = None
+
+ for seq_no in xrange(number_of_commands):
+ max_tries = MibResyncTask.max_mib_upload_next_retries
+
+ for retries in xrange(0, max_tries):
+ try:
+ self.strobe_watchdog()
+ response = yield self._device.omci_cc.send_mib_upload_next(seq_no)
+
+ omci_msg = response.fields['omci_message'].fields
+ class_id = omci_msg['object_entity_class']
+ entity_id = omci_msg['object_entity_id']
+
+ # Filter out the 'mib_data_sync' from the database. We save that at
+ # the device level and do not want it showing up during a re-sync
+ # during data comparison
+ from binascii import hexlify
+ if class_id == OntData.class_id:
+ break
+
+ # The T&W ONU reports an ME with class ID 0 but only on audit. Perhaps others do as well.
+ if class_id == 0 or class_id > 0xFFFF:
+ self.log.warn('invalid-class-id', class_id=class_id)
+ break
+
+ attributes = {k: v for k, v in omci_msg['object_data'].items()}
+
+ # Save to the database
+ self._db_active.set(self.device_id, class_id, entity_id, attributes)
+ break
+
+ except TimeoutError:
+ self.log.warn('mib-resync-timeout', seq_no=seq_no,
+ number_of_commands=number_of_commands)
+
+ if retries < max_tries - 1:
+ self.strobe_watchdog()
+ yield asleep(MibResyncTask.mib_upload_next_delay)
+ else:
+ raise
+
+ except Exception as e:
+ self.log.exception('resync', e=e, seq_no=seq_no,
+ number_of_commands=number_of_commands)
+
+ returnValue(seq_no + 1) # seq_no is zero based.
+
+ def compare_mibs(self, db_copy, db_active):
+ """
+ Compare the our db_copy with the ONU's active copy
+
+ :param db_copy: (dict) OpenOMCI's copy of the database
+ :param db_active: (dict) ONU's database snapshot
+ :return: (dict), (dict), (list) Differences
+ """
+ self.strobe_watchdog()
+ me_map = self.omci_agent.get_device(self.device_id).me_map
+
+ # Class & Entities only in local copy (OpenOMCI)
+ on_olt_temp = self.get_lhs_only_dict(db_copy, db_active)
+
+ # Remove any entries that are not reported during an upload (but could
+ # be in our database copy. Retain undecodable class IDs.
+ on_olt_only = [(cid, eid) for cid, eid in on_olt_temp
+ if cid not in me_map or not me_map[cid].hidden]
+
+ # Further reduce the on_olt_only MEs reported in an audit to not
+ # include missed MEs that are ONU created. Not all ONUs report MEs
+ # that are ONU created unless we are doing the initial MIB upload.
+ # Adtran does report them, T&W may not as well as a few others
+ on_olt_only = [(cid, eid) for cid, eid in on_olt_only if cid in me_map and
+ (OP.Create in me_map[cid].mandatory_operations or
+ OP.Create in me_map[cid].optional_operations)]
+
+ # Class & Entities only on remote (ONU)
+ on_onu_only = self.get_lhs_only_dict(db_active, db_copy)
+
+ # Class & Entities on both local & remote, but one or more attributes
+ # are different on the ONU. This is the value that the local (OpenOMCI)
+ # thinks should be on the remote (ONU)
+
+ attr_diffs = self.get_attribute_diffs(db_copy, db_active, me_map)
+
+ # TODO: Note that certain MEs are excluded from the MIB upload. In particular,
+ # instances of some general purpose MEs, such as the Managed Entity ME and
+ # and the Attribute ME are not included in the MIB upload. Also all table
+ # attributes are not included in the MIB upload (but we do not yet support
+ # tables in this OpenOMCI implementation (VOLTHA v1.3.0)
+
+ return on_olt_only, on_onu_only, attr_diffs
+
+ def get_lhs_only_dict(self, lhs, rhs):
+ """
+ Compare two MIB database dictionaries and return the ME Class ID and
+ instances that are unique to the lhs dictionary. Both parameters
+ should be in the common MIB Database output dictionary format that
+ is returned by the mib 'query' command.
+
+ :param lhs: (dict) Left-hand-side argument.
+ :param rhs: (dict) Right-hand-side argument
+
+ return: (list(int,int)) List of tuples where (class_id, inst_id)
+ """
+ results = list()
+
+ for cls_id, cls_data in lhs.items():
+ # Get unique classes
+ #
+ # Skip keys that are not class IDs
+ if not isinstance(cls_id, int):
+ continue
+
+ if cls_id not in rhs:
+ results.extend([(cls_id, inst_id) for inst_id in cls_data.keys()
+ if isinstance(inst_id, int)])
+ else:
+ # Get unique instances of a class
+ lhs_cls = cls_data
+ rhs_cls = rhs[cls_id]
+
+ for inst_id, _ in lhs_cls.items():
+ # Skip keys that are not instance IDs
+ if isinstance(cls_id, int) and inst_id not in rhs_cls:
+ results.extend([(cls_id, inst_id)])
+
+ return results
+
+ def get_attribute_diffs(self, omci_copy, onu_copy, me_map):
+ """
+ Compare two OMCI MIBs and return the ME class and instance IDs that exists
+ on both the local copy and the remote ONU that have different attribute
+ values. Both parameters should be in the common MIB Database output
+ dictionary format that is returned by the mib 'query' command.
+
+ :param omci_copy: (dict) OpenOMCI copy (OLT-side) of the MIB Database
+ :param onu_copy: (dict) active ONU latest copy its database
+ :param me_map: (dict) ME Class ID MAP for this ONU
+
+ return: (list(int,int,str)) List of tuples where (class_id, inst_id, attribute)
+ points to the specific ME instance where attributes
+ are different
+ """
+ results = list()
+ ro_set = {AA.R}
+
+ # Get class ID's that are in both
+ class_ids = {cls_id for cls_id, _ in omci_copy.items()
+ if isinstance(cls_id, int) and cls_id in onu_copy}
+
+ for cls_id in class_ids:
+ # Get unique instances of a class
+ olt_cls = omci_copy[cls_id]
+ onu_cls = onu_copy[cls_id]
+
+ # Weed out read-only attributes. Attributes on onu may be read-only. These
+ # will only show up it the OpenOMCI (OLT-side) database if it changed and
+ # an AVC Notification was sourced by the ONU
+ # TODO: These class IDs could be calculated once at ONU startup (at device add)
+ if cls_id in me_map:
+ ro_attrs = {attr.field.name for attr in me_map[cls_id].attributes
+ if attr.access == ro_set}
+ else:
+ # Here if partially defined ME (not defined in ME Map)
+ from voltha.extensions.omci.omci_cc import UNKNOWN_CLASS_ATTRIBUTE_KEY
+ ro_attrs = {UNKNOWN_CLASS_ATTRIBUTE_KEY}
+
+ # Get set of common instance IDs
+ inst_ids = {inst_id for inst_id, _ in olt_cls.items()
+ if isinstance(inst_id, int) and inst_id in onu_cls}
+
+ for inst_id in inst_ids:
+ omci_attributes = {k for k in olt_cls[inst_id][ATTRIBUTES_KEY].iterkeys()}
+ onu_attributes = {k for k in onu_cls[inst_id][ATTRIBUTES_KEY].iterkeys()}
+
+ # Get attributes that exist in one database, but not the other
+ sym_diffs = (omci_attributes ^ onu_attributes) - ro_attrs
+ results.extend([(cls_id, inst_id, attr) for attr in sym_diffs])
+
+ # Get common attributes with different values
+ common_attributes = (omci_attributes & onu_attributes) - ro_attrs
+ results.extend([(cls_id, inst_id, attr) for attr in common_attributes
+ if olt_cls[inst_id][ATTRIBUTES_KEY][attr] !=
+ onu_cls[inst_id][ATTRIBUTES_KEY][attr]])
+ return results
diff --git a/python/adapters/extensions/omci/tasks/mib_upload.py b/python/adapters/extensions/omci/tasks/mib_upload.py
new file mode 100644
index 0000000..4afd234
--- /dev/null
+++ b/python/adapters/extensions/omci/tasks/mib_upload.py
@@ -0,0 +1,158 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from twisted.internet.defer import inlineCallbacks, TimeoutError, failure, AlreadyCalledError
+from twisted.internet import reactor
+from voltha.extensions.omci.omci_defs import ReasonCodes
+
+
+class MibUploadFailure(Exception):
+ """
+ This error is raised by default when the upload fails
+ """
+
+
+class MibUploadTask(Task):
+ """
+ OpenOMCI MIB upload task
+
+ On successful completion, this task will call the 'callback' method of the
+ deferred returned by the start method. Only a textual message is provided as
+ the successful results and it lists the number of ME entities successfully
+ retrieved.
+
+ Note that the MIB Synchronization State Machine will get event subscription
+ information for the MIB Reset and MIB Upload Next requests and it is the
+ MIB Synchronization State Machine that actually populates the MIB Database.
+ """
+ task_priority = 250
+ name = "MIB Upload Task"
+
+ def __init__(self, omci_agent, device_id):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ """
+ super(MibUploadTask, self).__init__(MibUploadTask.name,
+ omci_agent,
+ device_id,
+ priority=MibUploadTask.task_priority)
+ self._local_deferred = None
+
+ def cancel_deferred(self):
+ super(MibUploadTask, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """
+ Start MIB Synchronization tasks
+ """
+ super(MibUploadTask, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_mib_upload)
+
+ def stop(self):
+ """
+ Shutdown MIB Synchronization tasks
+ """
+ self.log.debug('stopping')
+
+ self.cancel_deferred()
+ super(MibUploadTask, self).stop()
+
+ @inlineCallbacks
+ def perform_mib_upload(self):
+ """
+ Perform the MIB Upload sequence
+ """
+ self.log.debug('perform-mib-upload')
+
+ seq_no = 0
+ number_of_commands = 0
+
+ try:
+ device = self.omci_agent.get_device(self.device_id)
+
+ #########################################
+ # MIB Reset
+ self.strobe_watchdog()
+ results = yield device.omci_cc.send_mib_reset()
+
+ status = results.fields['omci_message'].fields['success_code']
+ if status != ReasonCodes.Success.value:
+ raise MibUploadFailure('MIB Reset request failed with status code: {}'.
+ format(status))
+
+ ########################################
+ # Begin MIB Upload
+ self.strobe_watchdog()
+ results = yield device.omci_cc.send_mib_upload()
+
+ number_of_commands = results.fields['omci_message'].fields['number_of_commands']
+
+ for seq_no in xrange(number_of_commands):
+ if not device.active or not device.omci_cc.enabled:
+ raise MibUploadFailure('OMCI and/or ONU is not active')
+
+ for retry in range(0, 3):
+ try:
+ self.log.debug('mib-upload-next-request', seq_no=seq_no,
+ retry=retry,
+ number_of_commands=number_of_commands)
+ self.strobe_watchdog()
+ yield device.omci_cc.send_mib_upload_next(seq_no)
+
+ self.log.debug('mib-upload-next-success', seq_no=seq_no,
+ number_of_commands=number_of_commands)
+ break
+
+ except TimeoutError as e:
+ from common.utils.asleep import asleep
+ self.log.warn('mib-upload-timeout', e=e, seq_no=seq_no,
+ number_of_commands=number_of_commands)
+ if retry >= 2:
+ raise MibUploadFailure('Upload timeout failure on req {} of {}'.
+ format(seq_no + 1, number_of_commands))
+ self.strobe_watchdog()
+ yield asleep(0.3)
+
+ # Successful if here
+ self.log.info('mib-synchronized')
+ self.deferred.callback('success, loaded {} ME Instances'.
+ format(number_of_commands))
+
+ except TimeoutError as e:
+ self.log.warn('mib-upload-timeout-on-reset', e=e, seq_no=seq_no,
+ number_of_commands=number_of_commands)
+ self.deferred.errback(failure.Failure(e))
+
+ except AlreadyCalledError:
+ # Can occur if task canceled due to MIB Sync state change
+ self.log.debug('already-called-exception', seq_no=seq_no,
+ number_of_commands=number_of_commands)
+ assert self.deferred.called, \
+ 'Unexpected AlreadyCalledError exception: seq: {} of {}'.format(seq_no,
+ number_of_commands)
+ except Exception as e:
+ self.log.exception('mib-upload', e=e)
+ self.deferred.errback(failure.Failure(e))
diff --git a/python/adapters/extensions/omci/tasks/omci_create_pm_task.py b/python/adapters/extensions/omci/tasks/omci_create_pm_task.py
new file mode 100644
index 0000000..355e26a
--- /dev/null
+++ b/python/adapters/extensions/omci/tasks/omci_create_pm_task.py
@@ -0,0 +1,150 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, failure, TimeoutError
+from voltha.extensions.omci.omci_defs import ReasonCodes, EntityOperations
+from voltha.extensions.omci.omci_frame import OmciFrame
+from voltha.extensions.omci.omci_messages import OmciCreate
+
+RC = ReasonCodes
+OP = EntityOperations
+
+
+class CreatePMException(Exception):
+ pass
+
+
+class OmciCreatePMRequest(Task):
+ """
+ OpenOMCI routine to create the requested PM Interval MEs
+
+ TODO: Support of thresholding crossing alarms will be in a future VOLTHA release
+ """
+ task_priority = Task.DEFAULT_PRIORITY
+ name = "ONU OMCI Create PM ME Task"
+
+ def __init__(self, omci_agent, device_id, me_dict, exclusive=False):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param me_dict: (dict) (pm cid, pm eid) -> (me cid, me eid, upstream)
+ :param exclusive: (bool) True if this Create request Task exclusively own the
+ OMCI-CC while running. Default: False
+ """
+ super(OmciCreatePMRequest, self).__init__(OmciCreatePMRequest.name,
+ omci_agent,
+ device_id,
+ priority=OmciCreatePMRequest.task_priority,
+ exclusive=exclusive)
+ self._device = omci_agent.get_device(device_id)
+ self._me_dict = me_dict
+ self._local_deferred = None
+
+ def cancel_deferred(self):
+ super(OmciCreatePMRequest, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """ Start task """
+ super(OmciCreatePMRequest, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_create)
+
+ @inlineCallbacks
+ def perform_create(self):
+ """ Perform the create requests """
+
+ try:
+ for pm, me in self._me_dict.items():
+ pm_class_id = pm[0]
+ pm_entity_id = pm[1]
+ me_class_id = me[0]
+ me_entity_id = me[1]
+ upstream = me[2]
+ self.log.debug('create-pm-me', class_id=pm_class_id, entity_id=pm_entity_id)
+
+ if me_class_id == 0:
+ # Typical/common PM interval format
+ frame = OmciFrame(
+ transaction_id=None, # OMCI-CC will set
+ message_type=OmciCreate.message_id,
+ omci_message=OmciCreate(
+ entity_class=pm_class_id,
+ entity_id=pm_entity_id,
+ data=dict()
+ )
+ )
+ else:
+ # Extended PM interval format. See ITU-T G.988 Section 9.3.32.
+ # Bit 1 - continuous accumulation if set, 15-minute interval if unset
+ # Bit 2 - directionality (0=upstream, 1=downstream)
+ # Bit 3..14 - Reserved
+ # Bit 15 - Use P bits of TCI field to filter
+ # Bit 16 - Use VID bits of TCI field to filter
+ bitmap = 0 if upstream else 1 << 1
+
+ data = {'control_block': [
+ 0, # Threshold data 1/2 ID
+ me_class_id, # Parent ME Class
+ me_entity_id, # Parent ME Instance
+ 0, # Accumulation disable
+ 0, # TCA Disable
+ bitmap, # Control fields bitmap
+ 0, # TCI
+ 0 # Reserved
+ ]}
+ frame = OmciFrame(
+ transaction_id=None, # OMCI-CC will set
+ message_type=OmciCreate.message_id,
+ omci_message=OmciCreate(
+ entity_class=pm_class_id,
+ entity_id=pm_entity_id,
+ data=data
+ )
+ )
+ self.strobe_watchdog()
+ try:
+ results = yield self._device.omci_cc.send(frame)
+ except TimeoutError:
+ self.log.warning('perform-create-timeout', me_class_id=me_class_id, me_entity_id=me_entity_id,
+ pm_class_id=pm_class_id, pm_entity_id=pm_entity_id)
+ raise
+
+ status = results.fields['omci_message'].fields['success_code']
+ self.log.debug('perform-create-status', status=status)
+
+ # Did it fail
+ if status != RC.Success.value and status != RC.InstanceExists.value:
+ msg = 'ME: {}, entity: {} failed with status {}'.format(pm_class_id,
+ pm_entity_id,
+ status)
+ raise CreatePMException(msg)
+
+ self.log.debug('create-pm-success', class_id=pm_class_id,
+ entity_id=pm_entity_id)
+
+ self.deferred.callback(self)
+
+ except Exception as e:
+ self.log.exception('perform-create', e=e)
+ self.deferred.errback(failure.Failure(e))
diff --git a/python/adapters/extensions/omci/tasks/omci_delete_pm_task.py b/python/adapters/extensions/omci/tasks/omci_delete_pm_task.py
new file mode 100644
index 0000000..adf1ce2
--- /dev/null
+++ b/python/adapters/extensions/omci/tasks/omci_delete_pm_task.py
@@ -0,0 +1,108 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, failure
+from voltha.extensions.omci.omci_defs import ReasonCodes, EntityOperations
+from voltha.extensions.omci.omci_frame import OmciFrame
+from voltha.extensions.omci.omci_messages import OmciDelete
+
+RC = ReasonCodes
+OP = EntityOperations
+
+
+class DeletePMException(Exception):
+ pass
+
+
+class OmciDeletePMRequest(Task):
+ """
+ OpenOMCI routine to delete the requested PM Interval MEs
+ """
+ task_priority = Task.DEFAULT_PRIORITY
+ name = "ONU OMCI Delete PM ME Task"
+
+ def __init__(self, omci_agent, device_id, me_set, exclusive=False):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param me_set: (set) Tuples of class_id / entity_id to create
+ :param exclusive: (bool) True if this Create request Task exclusively own the
+ OMCI-CC while running. Default: False
+ """
+ super(OmciDeletePMRequest, self).__init__(OmciDeletePMRequest.name,
+ omci_agent,
+ device_id,
+ priority=OmciDeletePMRequest.task_priority,
+ exclusive=exclusive)
+ self._device = omci_agent.get_device(device_id)
+ self._me_tuples = me_set
+ self._local_deferred = None
+
+ def cancel_deferred(self):
+ super(OmciDeletePMRequest, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """ Start task """
+ super(OmciDeletePMRequest, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_delete)
+
+ @inlineCallbacks
+ def perform_delete(self):
+ """ Perform the delete requests """
+ self.log.debug('perform-delete')
+
+ try:
+ for me in self._me_tuples:
+ class_id = me[0]
+ entity_id = me[1]
+
+ frame = OmciFrame(
+ transaction_id=None,
+ message_type=OmciDelete.message_id,
+ omci_message=OmciDelete(
+ entity_class=class_id,
+ entity_id=entity_id
+ )
+ )
+ self.strobe_watchdog()
+ results = yield self._device.omci_cc.send(frame)
+
+ status = results.fields['omci_message'].fields['success_code']
+ self.log.debug('perform-delete-status', status=status)
+
+ # Did it fail, it instance does not exist, not an error
+ if status != RC.Success.value and status != RC.UnknownInstance.value:
+ msg = 'ME: {}, entity: {} failed with status {}'.format(class_id,
+ entity_id,
+ status)
+ raise DeletePMException(msg)
+
+ self.log.debug('delete-pm-success', class_id=class_id,
+ entity_id=entity_id)
+ self.deferred.callback(self)
+
+ except Exception as e:
+ self.log.exception('perform-create', e=e)
+ self.deferred.errback(failure.Failure(e))
diff --git a/python/adapters/extensions/omci/tasks/omci_get_request.py b/python/adapters/extensions/omci/tasks/omci_get_request.py
new file mode 100644
index 0000000..c325278
--- /dev/null
+++ b/python/adapters/extensions/omci/tasks/omci_get_request.py
@@ -0,0 +1,356 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from twisted.internet import reactor
+from twisted.internet.defer import failure, inlineCallbacks, TimeoutError, returnValue
+from voltha.extensions.omci.omci_defs import ReasonCodes, EntityOperations
+from voltha.extensions.omci.omci_me import MEFrame
+from voltha.extensions.omci.omci_frame import OmciFrame
+from voltha.extensions.omci.omci_cc import DEFAULT_OMCI_TIMEOUT
+from voltha.extensions.omci.omci_messages import OmciGet
+from voltha.extensions.omci.omci_fields import OmciTableField
+
+RC = ReasonCodes
+OP = EntityOperations
+
+
+class GetException(Exception):
+ pass
+
+
+class OmciGetRequest(Task):
+ """
+ OpenOMCI Get an OMCI ME Instance Attributes
+
+ Upon completion, the Task deferred callback is invoked with a reference of
+ this Task object.
+
+ The Task has an initializer option (allow_failure) that will retry all
+ requested attributes if the original request fails with a status code of
+ 9 (Attributes failed or unknown). This result means that an attribute
+ is not supported by the ONU or that a mandatory/optional attribute could
+ not be executed by the ONU, even if it is supported, for example,
+ because of a range or type violation.
+ """
+ task_priority = 128
+ name = "ONU OMCI Get Task"
+
+ def __init__(self, omci_agent, device_id, entity_class, entity_id, attributes,
+ exclusive=False, allow_failure=False):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ :param entity_class: (EntityClass) ME Class to retrieve
+ :param entity_id: (int) ME Class instance ID to retrieve
+ :param attributes: (list or set) Name of attributes to retrieve
+ :param exclusive: (bool) True if this GET request Task exclusively own the
+ OMCI-CC while running. Default: False
+ :param allow_failure: (bool) If true, attempt to get all valid attributes
+ if the original request receives an error
+ code of 9 (Attributes failed or unknown).
+ """
+ super(OmciGetRequest, self).__init__(OmciGetRequest.name,
+ omci_agent,
+ device_id,
+ priority=OmciGetRequest.task_priority,
+ exclusive=exclusive)
+ self._device = omci_agent.get_device(device_id)
+ self._entity_class = entity_class
+ self._entity_id = entity_id
+ self._attributes = attributes
+ self._allow_failure = allow_failure
+ self._failed_or_unknown_attributes = set()
+ self._results = None
+ self._local_deferred = None
+
+ def cancel_deferred(self):
+ super(OmciGetRequest, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ @property
+ def me_class(self):
+ """The OMCI Managed Entity Class associated with this request"""
+ return self._entity_class
+
+ @property
+ def entity_id(self):
+ """The ME Entity ID associated with this request"""
+ return self._entity_id
+
+ @property
+ def attributes(self):
+ """
+ Return a dictionary of attributes for the request if the Get was
+ successfully completed. None otherwise
+ """
+ if self._results is None:
+ return None
+
+ omci_msg = self._results.fields['omci_message'].fields
+ return omci_msg['data'] if 'data' in omci_msg else None
+
+ @property
+ def success_code(self):
+ """
+ Return the OMCI success/reason code for the Get Response.
+ """
+ if self._results is None:
+ return None
+
+ return self._results.fields['omci_message'].fields['success_code']
+
+ @property
+ def raw_results(self):
+ """
+ Return the raw Get Response OMCIFrame
+ """
+ return self._results
+
+ def start(self):
+ """
+ Start MIB Capabilities task
+ """
+ super(OmciGetRequest, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_get_omci)
+
+ @property
+ def failed_or_unknown_attributes(self):
+ """
+ Returns a set attributes that failed or unknown in the original get
+ request that resulted in an initial status code of 9 (Attributes
+ failed or unknown).
+
+ :return: (set of str) attributes
+ """
+ return self._failed_or_unknown_attributes
+
+ @inlineCallbacks
+ def perform_get_omci(self):
+ """
+ Perform the initial get request
+ """
+ self.log.info('perform-get', entity_class=self._entity_class,
+ entity_id=self._entity_id, attributes=self._attributes)
+ try:
+ # If one or more attributes is a table attribute, get it separately
+ def is_table_attr(attr):
+ index = self._entity_class.attribute_name_to_index_map[attr]
+ attr_def = self._entity_class.attributes[index]
+ return isinstance(attr_def.field, OmciTableField)
+
+ first_attributes = {attr for attr in self._attributes if not is_table_attr(attr)}
+ table_attributes = {attr for attr in self._attributes if is_table_attr(attr)}
+
+ frame = MEFrame(self._entity_class, self._entity_id, first_attributes).get()
+ self.strobe_watchdog()
+ results = yield self._device.omci_cc.send(frame)
+
+ status = results.fields['omci_message'].fields['success_code']
+ self.log.debug('perform-get-status', status=status)
+
+ # Success?
+ if status == RC.Success.value:
+ self._results = results
+ results_omci = results.fields['omci_message'].fields
+
+ # Were all attributes fetched?
+ missing_attr = frame.fields['omci_message'].fields['attributes_mask'] ^ \
+ results_omci['attributes_mask']
+
+ if missing_attr > 0 or len(table_attributes) > 0:
+ self.log.info('perform-get-missing', num_missing=missing_attr,
+ table_attr=table_attributes)
+ self.strobe_watchdog()
+ self._local_deferred = reactor.callLater(0,
+ self.perform_get_missing_attributes,
+ missing_attr,
+ table_attributes)
+ returnValue(self._local_deferred)
+
+ elif status == RC.AttributeFailure.value:
+ # What failed? Note if only one attribute was attempted, then
+ # that is an overall failure
+
+ if not self._allow_failure or len(self._attributes) <= 1:
+ raise GetException('Get failed with status code: {}'.
+ format(RC.AttributeFailure.value))
+
+ self.strobe_watchdog()
+ self._local_deferred = reactor.callLater(0,
+ self.perform_get_failed_attributes,
+ results,
+ self._attributes)
+ returnValue(self._local_deferred)
+
+ else:
+ raise GetException('Get failed with status code: {}'.format(status))
+
+ self.log.debug('get-completed')
+ self.deferred.callback(self)
+
+ except TimeoutError as e:
+ self.deferred.errback(failure.Failure(e))
+
+ except Exception as e:
+ self.log.exception('perform-get', e=e, class_id=self._entity_class,
+ entity_id=self._entity_id, attributes=self._attributes)
+ self.deferred.errback(failure.Failure(e))
+
+ @inlineCallbacks
+ def perform_get_missing_attributes(self, missing_attr, table_attributes):
+ """
+ This method is called when the original Get requests completes with success
+ but not all attributes were returned. This can happen if one or more of the
+ attributes would have exceeded the space available in the OMCI frame.
+
+ This routine iterates through the missing attributes and attempts to retrieve
+ the ones that were missing.
+
+ :param missing_attr: (int) Missing attributes bitmask
+ :param table_attributes: (set) Attributes that need table get/get-next support
+ """
+ self.log.debug('perform-get-missing', attrs=missing_attr, tbl=table_attributes)
+
+ # Retrieve missing attributes first (if any)
+ results_omci = self._results.fields['omci_message'].fields
+
+ for index in xrange(16):
+ attr_mask = 1 << index
+
+ if attr_mask & missing_attr:
+ # Get this attribute
+ frame = OmciFrame(
+ transaction_id=None, # OMCI-CC will set
+ message_type=OmciGet.message_id,
+ omci_message=OmciGet(
+ entity_class=self._entity_class.class_id,
+ entity_id=self._entity_id,
+ attributes_mask=attr_mask
+ )
+ )
+ try:
+ self.strobe_watchdog()
+ get_results = yield self._device.omci_cc.send(frame)
+
+ get_omci = get_results.fields['omci_message'].fields
+ if get_omci['success_code'] != RC.Success.value:
+ continue
+
+ assert attr_mask == get_omci['attributes_mask'], 'wrong attribute'
+ results_omci['attributes_mask'] |= attr_mask
+
+ if results_omci.get('data') is None:
+ results_omci['data'] = dict()
+
+ results_omci['data'].update(get_omci['data'])
+
+ except TimeoutError:
+ self.log.debug('missing-timeout')
+
+ except Exception as e:
+ self.log.exception('missing-failure', e=e)
+
+ # Now any table attributes. OMCI_CC handles background get/get-next sequencing
+ for tbl_attr in table_attributes:
+ attr_mask = self._entity_class.mask_for(tbl_attr)
+ frame = OmciFrame(
+ transaction_id=None, # OMCI-CC will set
+ message_type=OmciGet.message_id,
+ omci_message=OmciGet(
+ entity_class=self._entity_class.class_id,
+ entity_id=self._entity_id,
+ attributes_mask=attr_mask
+ )
+ )
+ try:
+ timeout = 2 * DEFAULT_OMCI_TIMEOUT # Multiple frames expected
+ self.strobe_watchdog()
+ get_results = yield self._device.omci_cc.send(frame,
+ timeout=timeout)
+ self.strobe_watchdog()
+ get_omci = get_results.fields['omci_message'].fields
+ if get_omci['success_code'] != RC.Success.value:
+ continue
+
+ if results_omci.get('data') is None:
+ results_omci['data'] = dict()
+
+ results_omci['data'].update(get_omci['data'])
+
+ except TimeoutError:
+ self.log.debug('tbl-attr-timeout')
+
+ except Exception as e:
+ self.log.exception('tbl-attr-timeout', e=e)
+
+ self.deferred.callback(self)
+
+ @inlineCallbacks
+ def perform_get_failed_attributes(self, tmp_results, attributes):
+ """
+
+ :param tmp_results:
+ :param attributes:
+ :return:
+ """
+ self.log.debug('perform-get-failed', attrs=attributes)
+
+ for attr in attributes:
+ try:
+ frame = MEFrame(self._entity_class, self._entity_id, {attr}).get()
+
+ self.strobe_watchdog()
+ results = yield self._device.omci_cc.send(frame)
+
+ status = results.fields['omci_message'].fields['success_code']
+
+ if status == RC.AttributeFailure.value:
+ self.log.debug('unknown-or-invalid-attribute', attr=attr, status=status)
+ self._failed_or_unknown_attributes.add(attr)
+
+ elif status != RC.Success.value:
+ self.log.warn('invalid-get', class_id=self._entity_class,
+ attribute=attr, status=status)
+ self._failed_or_unknown_attributes.add(attr)
+
+ else:
+ # Add to partial results and correct the status
+ tmp_results.fields['omci_message'].fields['success_code'] = status
+ tmp_results.fields['omci_message'].fields['attributes_mask'] |= \
+ results.fields['omci_message'].fields['attributes_mask']
+
+ if tmp_results.fields['omci_message'].fields.get('data') is None:
+ tmp_results.fields['omci_message'].fields['data'] = dict()
+
+ tmp_results.fields['omci_message'].fields['data'][attr] = \
+ results.fields['omci_message'].fields['data'][attr]
+
+ except TimeoutError as e:
+ self.log.debug('attr-timeout')
+
+ except Exception as e:
+ self.log.exception('attr-failure', e=e)
+
+ self._results = tmp_results
+ self.deferred.callback(self)
diff --git a/python/adapters/extensions/omci/tasks/omci_modify_request.py b/python/adapters/extensions/omci/tasks/omci_modify_request.py
new file mode 100644
index 0000000..da7bff5
--- /dev/null
+++ b/python/adapters/extensions/omci/tasks/omci_modify_request.py
@@ -0,0 +1,171 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, failure, returnValue
+from voltha.extensions.omci.omci_defs import ReasonCodes, EntityOperations
+from voltha.extensions.omci.omci_me import MEFrame
+from voltha.extensions.omci.omci_frame import OmciFrame
+from voltha.extensions.omci.omci_messages import OmciCreate, OmciSet, OmciDelete
+from voltha.extensions.omci.omci_entities import EntityClass
+
+RC = ReasonCodes
+OP = EntityOperations
+
+
+class ModifyException(Exception):
+ pass
+
+
+class OmciModifyRequest(Task):
+ """
+ OpenOMCI Generic Create, Set, or Delete Frame support Task.
+
+ This task allows an ONU to send a Create, Set, or Delete request from any point in their
+ code while properly using the OMCI-CC channel. Direct access to the OMCI-CC object
+ to send requests by an ONU is highly discouraged.
+ """
+ task_priority = 128
+ name = "ONU OMCI Modify Task"
+
+ def __init__(self, omci_agent, device_id, frame, priority=task_priority, exclusive=False):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ :param frame: (OmciFrame) Frame to send
+ :param priority: (int) OpenOMCI Task priority (0..255) 255 is the highest
+ :param exclusive: (bool) True if this GET request Task exclusively own the
+ OMCI-CC while running. Default: False
+ """
+ super(OmciModifyRequest, self).__init__(OmciModifyRequest.name,
+ omci_agent,
+ device_id,
+ priority=priority,
+ exclusive=exclusive)
+ self._device = omci_agent.get_device(device_id)
+ self._frame = frame
+ self._results = None
+ self._local_deferred = None
+
+ # Validate message type
+ self._msg_type = frame.fields['message_type']
+ if self._msg_type not in (OmciCreate.message_id, OmciSet.message_id, OmciDelete.message_id):
+ raise TypeError('Invalid Message type: {}, must be Create, Set, or Delete'.
+ format(self._msg_type))
+
+ def cancel_deferred(self):
+ super(OmciModifyRequest, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ @property
+ def success_code(self):
+ """
+ Return the OMCI success/reason code for the Get Response.
+ """
+ if self._results is None:
+ return None
+
+ return self._results.fields['omci_message'].fields['success_code']
+
+ @property
+ def illegal_attributes_mask(self):
+ """
+ For Create & Set requests, a failure may indicate that one or more
+ attributes have an illegal value. This property returns any illegal
+ attributes
+
+ :return: None if not a create/set request, otherwise the attribute mask
+ of illegal attributes
+ """
+ if self._results is None:
+ return None
+
+ omci_msg = self._results.fields['omci_message'].fields
+
+ if self._msg_type == OmciCreate.message_id:
+ if self.success_code != RC.ParameterError:
+ return 0
+ return omci_msg['parameter_error_attributes_mask']
+
+ elif self._msg_type == OmciSet.message_id:
+ if self.success_code != RC.AttributeFailure:
+ return 0
+ return omci_msg['failed_attributes_mask']
+
+ return None
+
+ @property
+ def unsupported_attributes_mask(self):
+ """
+ For Set requests, a failure may indicate that one or more attributes
+ are not supported by this ONU. This property returns any those unsupported attributes
+
+ :return: None if not a set request, otherwise the attribute mask of any illegal
+ parameters
+ """
+ if self._msg_type != OmciSet.message_id or self._results is None:
+ return None
+
+ if self.success_code != RC.AttributeFailure:
+ return 0
+
+ return self._results.fields['omci_message'].fields['unsupported_attributes_mask']
+
+ @property
+ def raw_results(self):
+ """
+ Return the raw Response OMCIFrame
+ """
+ return self._results
+
+ def start(self):
+ """
+ Start MIB Capabilities task
+ """
+ super(OmciModifyRequest, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_omci)
+
+ @inlineCallbacks
+ def perform_omci(self):
+ """
+ Perform the request
+ """
+ self.log.debug('perform-request')
+
+ try:
+ self.strobe_watchdog()
+ self._results = yield self._device.omci_cc.send(self._frame)
+
+ status = self._results.fields['omci_message'].fields['success_code']
+ self.log.debug('response-status', status=status)
+
+ # Success?
+ if status in (RC.Success.value, RC.InstanceExists):
+ self.deferred.callback(self)
+ else:
+ raise ModifyException('Failed with status {}'.format(status))
+
+ except Exception as e:
+ self.log.exception('perform-modify', e=e)
+ self.deferred.errback(failure.Failure(e))
diff --git a/python/adapters/extensions/omci/tasks/omci_sw_image_upgrade_task.py b/python/adapters/extensions/omci/tasks/omci_sw_image_upgrade_task.py
new file mode 100644
index 0000000..5eaa87c
--- /dev/null
+++ b/python/adapters/extensions/omci/tasks/omci_sw_image_upgrade_task.py
@@ -0,0 +1,64 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from task import Task
+from twisted.internet import reactor
+from voltha.protos.voltha_pb2 import ImageDownload
+
+class OmciSwImageUpgradeTask(Task):
+ name = "OMCI Software Image Upgrade Task"
+
+
+ def __init__(self, img_id, omci_upgrade_sm_cls, omci_agent, image_download, clock=None):
+ super(OmciSwImageUpgradeTask, self).__init__(OmciSwImageUpgradeTask.name, omci_agent, image_download.id,
+ exclusive=False,
+ watchdog_timeout=45)
+ self.log.debug("OmciSwImageUpgradeTask create ", image_id=img_id)
+ self._image_id = img_id
+ self._omci_upgrade_sm_cls = omci_upgrade_sm_cls
+ # self._omci_agent = omci_agent
+ self._image_download = image_download
+ self.reactor = clock if clock is not None else reactor
+ self._omci_upgrade_sm = None
+ self.log.debug("OmciSwImageUpgradeTask create end", image_id=img_id)
+
+ @property
+ def status(self):
+ return self._image_download
+
+ def start(self):
+ self.log.debug("OmciSwImageUpgradeTask start")
+ super(OmciSwImageUpgradeTask, self).start()
+ if self._omci_upgrade_sm is None:
+ self._omci_upgrade_sm = self._omci_upgrade_sm_cls(self._image_id, self.omci_agent, self._image_download, clock=self.reactor)
+ d = self._omci_upgrade_sm.start()
+ d.chainDeferred(self.deferred)
+ #else:
+ # if restart:
+ # self._omci_upgrade_sm.reset_image()
+
+ def stop(self):
+ self.log.debug("OmciSwImageUpgradeTask stop")
+ if self._omci_upgrade_sm is not None:
+ self._omci_upgrade_sm.stop()
+ self._omci_upgrade_sm = None
+
+ def onu_bootup(self):
+ self.log.debug("onu_bootup", state=self._omci_upgrade_sm.status.image_state);
+ if self._omci_upgrade_sm is not None \
+ and self._omci_upgrade_sm.status.image_state == ImageDownload.IMAGE_ACTIVATE:
+ self._omci_upgrade_sm.do_commit()
+
diff --git a/python/adapters/extensions/omci/tasks/onu_capabilities_task.py b/python/adapters/extensions/omci/tasks/onu_capabilities_task.py
new file mode 100644
index 0000000..048382c
--- /dev/null
+++ b/python/adapters/extensions/omci/tasks/onu_capabilities_task.py
@@ -0,0 +1,282 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from binascii import hexlify
+from twisted.internet.defer import inlineCallbacks, failure, returnValue
+from twisted.internet import reactor
+from voltha.extensions.omci.omci_defs import ReasonCodes
+from voltha.extensions.omci.omci_me import OmciFrame
+from voltha.extensions.omci.omci import EntityOperations
+
+
+class GetNextException(Exception):
+ pass
+
+
+class GetCapabilitiesFailure(Exception):
+ pass
+
+
+class OnuCapabilitiesTask(Task):
+ """
+ OpenOMCI MIB Capabilities Task
+
+ This task requests information on supported MEs via the OMCI (ME#287)
+ Managed entity.
+
+ This task should be ran after MIB Synchronization and before any MIB
+ Downloads to the ONU.
+
+ Upon completion, the Task deferred callback is invoked with dictionary
+ containing the supported managed entities and message types.
+
+ results = {
+ 'supported-managed-entities': {set of supported managed entities},
+ 'supported-message-types': {set of supported message types}
+ }
+ """
+ task_priority = 240
+ name = "ONU Capabilities Task"
+
+ max_mib_get_next_retries = 3
+ mib_get_next_delay = 5
+ DEFAULT_OCTETS_PER_MESSAGE = 29
+
+ def __init__(self, omci_agent, device_id, omci_pdu_size=DEFAULT_OCTETS_PER_MESSAGE):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ :param omci_pdu_size: (int) OMCI Data payload size (not counting any trailers)
+ """
+ super(OnuCapabilitiesTask, self).__init__(OnuCapabilitiesTask.name,
+ omci_agent,
+ device_id,
+ priority=OnuCapabilitiesTask.task_priority)
+ self._local_deferred = None
+ self._device = omci_agent.get_device(device_id)
+ self._pdu_size = omci_pdu_size
+ self._supported_entities = set()
+ self._supported_msg_types = set()
+
+ def cancel_deferred(self):
+ super(OnuCapabilitiesTask, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ @property
+ def supported_managed_entities(self):
+ """
+ Return a set of the Managed Entity class IDs supported on this ONU
+
+ None is returned if no MEs have been discovered
+
+ :return: (set of ints)
+ """
+ return frozenset(self._supported_entities) if len(self._supported_entities) else None
+
+ @property
+ def supported_message_types(self):
+ """
+ Return a set of the Message Types supported on this ONU
+
+ None is returned if no message types have been discovered
+
+ :return: (set of EntityOperations)
+ """
+ return frozenset(self._supported_msg_types) if len(self._supported_msg_types) else None
+
+ def start(self):
+ """
+ Start MIB Capabilities task
+ """
+ super(OnuCapabilitiesTask, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_get_capabilities)
+
+ def stop(self):
+ """
+ Shutdown MIB Capabilities task
+ """
+ self.log.debug('stopping')
+
+ self.cancel_deferred()
+ self._device = None
+ super(OnuCapabilitiesTask, self).stop()
+
+ @inlineCallbacks
+ def perform_get_capabilities(self):
+ """
+ Perform the MIB Capabilities sequence.
+
+ The sequence is to perform a Get request with the attribute mask equal
+ to 'me_type_table'. The response to this request will carry the size
+ of (number of get-next sequences).
+
+ Then a loop is entered and get-next commands are sent for each sequence
+ requested.
+ """
+ self.log.debug('perform-get')
+
+ try:
+ self.strobe_watchdog()
+ self._supported_entities = yield self.get_supported_entities()
+
+ self.strobe_watchdog()
+ self._supported_msg_types = yield self.get_supported_message_types()
+
+ self.log.debug('get-success',
+ supported_entities=self.supported_managed_entities,
+ supported_msg_types=self.supported_message_types)
+ results = {
+ 'supported-managed-entities': self.supported_managed_entities,
+ 'supported-message-types': self.supported_message_types
+ }
+ self.deferred.callback(results)
+
+ except Exception as e:
+ self.log.exception('perform-get', e=e)
+ self.deferred.errback(failure.Failure(e))
+
+ def get_count_from_data_buffer(self, data):
+ """
+ Extract the 4 octet buffer length from the OMCI PDU contents
+ """
+ self.log.debug('get-count-buffer', data=hexlify(data))
+ return int(hexlify(data[:4]), 16)
+
+ @inlineCallbacks
+ def get_supported_entities(self):
+ """
+ Get the supported ME Types for this ONU.
+ """
+ try:
+ # Get the number of requests needed
+ frame = OmciFrame(me_type_table=True).get()
+ self.strobe_watchdog()
+ results = yield self._device.omci_cc.send(frame)
+
+ omci_msg = results.fields['omci_message']
+ status = omci_msg.fields['success_code']
+
+ if status != ReasonCodes.Success.value:
+ raise GetCapabilitiesFailure('Get count of supported entities failed with status code: {}'.
+ format(status))
+ data = omci_msg.fields['data']['me_type_table']
+ count = self.get_count_from_data_buffer(bytearray(data))
+
+ seq_no = 0
+ data_buffer = bytearray(0)
+ self.log.debug('me-type-count', octets=count, data=hexlify(data))
+
+ # Start the loop
+ for offset in xrange(0, count, self._pdu_size):
+ frame = OmciFrame(me_type_table=seq_no).get_next()
+ seq_no += 1
+ self.strobe_watchdog()
+ results = yield self._device.omci_cc.send(frame)
+
+ omci_msg = results.fields['omci_message']
+ status = omci_msg.fields['success_code']
+
+ if status != ReasonCodes.Success.value:
+ raise GetCapabilitiesFailure(
+ 'Get supported entities request at offset {} of {} failed with status code: {}'.
+ format(offset + 1, count, status))
+
+ # Extract the data
+ num_octets = count - offset
+ if num_octets > self._pdu_size:
+ num_octets = self._pdu_size
+
+ data = omci_msg.fields['data']['me_type_table']
+ data_buffer += bytearray(data[:num_octets])
+
+ me_types = {(data_buffer[x] << 8) + data_buffer[x + 1]
+ for x in xrange(0, len(data_buffer), 2)}
+ returnValue(me_types)
+
+ except Exception as e:
+ self.log.exception('get-entities', e=e)
+ self.deferred.errback(failure.Failure(e))
+
+ @inlineCallbacks
+ def get_supported_message_types(self):
+ """
+ Get the supported Message Types (actions) for this ONU.
+ """
+ try:
+ # Get the number of requests needed
+ frame = OmciFrame(message_type_table=True).get()
+ self.strobe_watchdog()
+ results = yield self._device.omci_cc.send(frame)
+
+ omci_msg = results.fields['omci_message']
+ status = omci_msg.fields['success_code']
+
+ if status != ReasonCodes.Success.value:
+ raise GetCapabilitiesFailure('Get count of supported msg types failed with status code: {}'.
+ format(status))
+
+ data = omci_msg.fields['data']['message_type_table']
+ count = self.get_count_from_data_buffer(bytearray(data))
+
+ seq_no = 0
+ data_buffer = list()
+ self.log.debug('me-type-count', octets=count, data=hexlify(data))
+
+ # Start the loop
+ for offset in xrange(0, count, self._pdu_size):
+ frame = OmciFrame(message_type_table=seq_no).get_next()
+ seq_no += 1
+ self.strobe_watchdog()
+ results = yield self._device.omci_cc.send(frame)
+
+ omci_msg = results.fields['omci_message']
+ status = omci_msg.fields['success_code']
+
+ if status != ReasonCodes.Success.value:
+ raise GetCapabilitiesFailure(
+ 'Get supported msg types request at offset {} of {} failed with status code: {}'.
+ format(offset + 1, count, status))
+
+ # Extract the data
+ num_octets = count - offset
+ if num_octets > self._pdu_size:
+ num_octets = self._pdu_size
+
+ data = omci_msg.fields['data']['message_type_table']
+ data_buffer += data[:num_octets]
+
+ def buffer_to_message_type(value):
+ """
+ Convert an integer value to the appropriate EntityOperations enumeration
+ :param value: (int) Message type value (4..29)
+ :return: (EntityOperations) Enumeration, None on failure
+ """
+ next((v for k, v in EntityOperations.__members__.items() if v.value == value), None)
+
+ msg_types = {buffer_to_message_type(v) for v in data_buffer if v is not None}
+ returnValue({msg_type for msg_type in msg_types if msg_type is not None})
+
+ except Exception as e:
+ self.log.exception('get-msg-types', e=e)
+ self.deferred.errback(failure.Failure(e))
diff --git a/python/adapters/extensions/omci/tasks/reboot_task.py b/python/adapters/extensions/omci/tasks/reboot_task.py
new file mode 100644
index 0000000..316e23b
--- /dev/null
+++ b/python/adapters/extensions/omci/tasks/reboot_task.py
@@ -0,0 +1,125 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from enum import IntEnum
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, failure, TimeoutError
+from voltha.extensions.omci.omci_defs import ReasonCodes, EntityOperations
+from voltha.extensions.omci.omci_me import OntGFrame
+from voltha.extensions.omci.omci_cc import DEFAULT_OMCI_TIMEOUT
+
+RC = ReasonCodes
+OP = EntityOperations
+
+
+class RebootException(Exception):
+ pass
+
+
+class DeviceBusy(Exception):
+ pass
+
+
+class RebootFlags(IntEnum):
+ Reboot_Unconditionally = 0,
+ Reboot_If_No_POTS_VoIP_In_Progress = 1,
+ Reboot_If_No_Emergency_Call_In_Progress = 2
+
+
+class OmciRebootRequest(Task):
+ """
+ OpenOMCI routine to request reboot of an ONU
+ """
+ task_priority = Task.MAX_PRIORITY
+ name = "ONU OMCI Reboot Task"
+ # adopt the global default
+ DEFAULT_REBOOT_TIMEOUT = DEFAULT_OMCI_TIMEOUT
+
+ def __init__(self, omci_agent, device_id,
+ flags=RebootFlags.Reboot_Unconditionally,
+ timeout=DEFAULT_REBOOT_TIMEOUT):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param flags: (RebootFlags) Reboot condition
+ """
+ super(OmciRebootRequest, self).__init__(OmciRebootRequest.name,
+ omci_agent,
+ device_id,
+ priority=OmciRebootRequest.task_priority,
+ exclusive=True)
+ self._device = omci_agent.get_device(device_id)
+ self._flags = flags
+ self._timeout = timeout
+ self._local_deferred = None
+
+ def cancel_deferred(self):
+ super(OmciRebootRequest, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """ Start task """
+ super(OmciRebootRequest, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_reboot)
+
+ @inlineCallbacks
+ def perform_reboot(self):
+ """
+ Perform the reboot requests
+
+ Depending on the ONU implementation, a response may not be returned. For this
+ reason, a timeout is considered successful.
+ """
+ self.log.info('perform-reboot')
+
+ try:
+ frame = OntGFrame().reboot(reboot_code=self._flags)
+ self.strobe_watchdog()
+ results = yield self._device.omci_cc.send(frame, timeout=self._timeout)
+
+ status = results.fields['omci_message'].fields['success_code']
+ self.log.debug('reboot-status', status=status)
+
+ # Did it fail
+ if status != RC.Success.value:
+ if self._flags != RebootFlags.Reboot_Unconditionally and\
+ status == RC.DeviceBusy.value:
+ raise DeviceBusy('ONU is busy, try again later')
+ else:
+ msg = 'Reboot request failed with status {}'.format(status)
+ raise RebootException(msg)
+
+ self.log.info('reboot-success')
+ self.deferred.callback(self)
+
+ except TimeoutError:
+ self.log.info('timeout', msg='Request timeout is not considered an error')
+ self.deferred.callback(None)
+
+ except DeviceBusy as e:
+ self.log.warn('perform-reboot', msg=e)
+ self.deferred.errback(failure.Failure(e))
+
+ except Exception as e:
+ self.log.exception('perform-reboot', e=e)
+ self.deferred.errback(failure.Failure(e))
diff --git a/python/adapters/extensions/omci/tasks/sync_time_task.py b/python/adapters/extensions/omci/tasks/sync_time_task.py
new file mode 100644
index 0000000..b5b1dc9
--- /dev/null
+++ b/python/adapters/extensions/omci/tasks/sync_time_task.py
@@ -0,0 +1,107 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, TimeoutError, failure
+from voltha.extensions.omci.omci_me import OntGFrame
+from voltha.extensions.omci.omci_defs import ReasonCodes as RC
+from datetime import datetime
+
+
+class SyncTimeTask(Task):
+ """
+ OpenOMCI - Synchronize the ONU time with server
+ """
+ task_priority = Task.DEFAULT_PRIORITY + 10
+ name = "Sync Time Task"
+
+ def __init__(self, omci_agent, device_id, use_utc=True):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ :param use_utc: (bool) Use UTC time if True, otherwise local time
+ """
+ super(SyncTimeTask, self).__init__(SyncTimeTask.name,
+ omci_agent,
+ device_id,
+ priority=SyncTimeTask.task_priority,
+ exclusive=False)
+ self._local_deferred = None
+ self._use_utc = use_utc
+
+ def cancel_deferred(self):
+ super(SyncTimeTask, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """
+ Start the tasks
+ """
+ super(SyncTimeTask, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_sync_time)
+
+ def stop(self):
+ """
+ Shutdown the tasks
+ """
+ self.log.debug('stopping')
+
+ self.cancel_deferred()
+ super(SyncTimeTask, self).stop()
+
+ @inlineCallbacks
+ def perform_sync_time(self):
+ """
+ Sync the time
+ """
+ self.log.debug('perform-sync-time')
+
+ try:
+ device = self.omci_agent.get_device(self.device_id)
+
+ #########################################
+ # ONT-G (ME #256)
+ dt = datetime.utcnow() if self._use_utc else datetime.now()
+
+ results = yield device.omci_cc.send(OntGFrame().synchronize_time(dt))
+
+ omci_msg = results.fields['omci_message'].fields
+ status = omci_msg['success_code']
+ self.log.debug('sync-time', status=status)
+
+ if status == RC.Success:
+ self.log.info('sync-time', success_info=omci_msg['success_info'] & 0x0f)
+
+ assert status == RC.Success, 'Unexpected Response Status: {}'.format(status)
+
+ # Successful if here
+ self.deferred.callback(results)
+
+ except TimeoutError as e:
+ self.log.warn('sync-time-timeout', e=e)
+ self.deferred.errback(failure.Failure(e))
+
+ except Exception as e:
+ self.log.exception('sync-time', e=e)
+ self.deferred.errback(failure.Failure(e))
diff --git a/python/adapters/extensions/omci/tasks/task.py b/python/adapters/extensions/omci/tasks/task.py
new file mode 100644
index 0000000..36020c0
--- /dev/null
+++ b/python/adapters/extensions/omci/tasks/task.py
@@ -0,0 +1,188 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from twisted.internet import defer, reactor
+from twisted.internet.defer import failure
+
+
+class WatchdogTimeoutFailure(Exception):
+ """Task callback/errback not called properly before watchdog expiration"""
+ pass
+
+
+class Task(object):
+ """
+ OpenOMCI Base Task implementation
+
+ An OMCI task can be one or more OMCI requests, comparisons, or whatever
+ is needed to do a specific unit of work that needs to be ran to completion
+ successfully.
+
+ On successful completion, the task should called the 'callback' method of
+ the deferred and pass back whatever is meaningful to the user/state-machine
+ that launched it.
+
+ On failure, the 'errback' routine should be called with an appropriate
+ Failure object.
+ """
+ DEFAULT_PRIORITY = 128
+ MIN_PRIORITY = 0
+ MAX_PRIORITY = 255
+ DEFAULT_WATCHDOG_SECS = 10 # 10 seconds
+ MIN_WATCHDOG_SECS = 3 # 3 seconds
+ MAX_WATCHDOG_SECS = 60 # 60 seconds
+
+ _next_task_id = 0
+
+ def __init__(self, name, omci_agent, device_id, priority=DEFAULT_PRIORITY,
+ exclusive=True, watchdog_timeout=DEFAULT_WATCHDOG_SECS):
+ """
+ Class initialization
+
+ :param name: (str) Task Name
+ :param device_id: (str) ONU Device ID
+ :param priority: (int) Task priority (0..255) 255 Highest
+ :param exclusive: (bool) If True, this task needs exclusive access to the
+ OMCI Communications channel when it runs
+ :param watchdog_timeout (int or float) Watchdog timeout (seconds) after task start, to
+ run longer, periodically call 'strobe_watchdog()' to reschedule.
+ """
+ assert Task.MIN_PRIORITY <= priority <= Task.MAX_PRIORITY, \
+ 'Priority should be {}..{}'.format(Task.MIN_PRIORITY, Task.MAX_PRIORITY)
+
+ assert Task.MIN_WATCHDOG_SECS <= watchdog_timeout <= Task.MAX_WATCHDOG_SECS, \
+ 'Watchdog timeout should be {}..{} seconds'
+
+ Task._next_task_id += 1
+ self._task_id = Task._next_task_id
+ self.log = structlog.get_logger(device_id=device_id, name=name,
+ task_id=self._task_id)
+ self.name = name
+ self.device_id = device_id
+ self.omci_agent = omci_agent
+ self._running = False
+ self._exclusive = exclusive
+ self._deferred = defer.Deferred() # Fires upon completion
+ self._watchdog = None
+ self._watchdog_timeout = watchdog_timeout
+ self._priority = priority
+
+ def __str__(self):
+ return 'Task: {}, ID:{}, Priority: {}, Exclusive: {}, Watchdog: {}'.format(
+ self.name, self.task_id, self.priority, self.exclusive, self.watchdog_timeout)
+
+ @property
+ def priority(self):
+ return self._priority
+
+ @property
+ def task_id(self):
+ return self._task_id
+
+ @property
+ def exclusive(self):
+ return self._exclusive
+
+ @property
+ def watchdog_timeout(self):
+ return self._watchdog_timeout
+
+ @property
+ def deferred(self):
+ return self._deferred
+
+ @property
+ def running(self):
+ # Is the Task running?
+ #
+ # Can be useful for tasks that use inline callbacks to detect
+ # if the task has been canceled.
+ #
+ return self._running
+
+ def cancel_deferred(self):
+ d1, self._deferred = self._deferred, None
+ d2, self._watchdog = self._watchdog, None
+
+ for d in [d1, d2]:
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """
+ Start task operations
+ """
+ self.log.debug('starting')
+ assert self._deferred is not None and not self._deferred.called, \
+ 'Cannot re-use the same task'
+ self._running = True
+ self.strobe_watchdog()
+
+ def stop(self):
+ """
+ Stop task synchronization
+ """
+ self.log.debug('stopping')
+ self._running = False
+ self.cancel_deferred()
+ self.omci_agent = None # Should only start/stop once
+
+ def task_cleanup(self):
+ """
+ This method should only be called from the TaskRunner's callback/errback
+ that is added when the task is initially queued. It is responsible for
+ clearing of the 'running' flag and canceling of the watchdog time
+ """
+ self._running = False
+ d, self._watchdog = self._watchdog, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def strobe_watchdog(self):
+ """
+ Signal that we have not hung/deadlocked
+ """
+ # Create if first time (called at Task start)
+
+ def watchdog_timeout():
+ # Task may have hung (blocked) or failed to call proper success/error
+ # completion callback/errback
+ if not self.deferred.called:
+ err_msg = 'Task {}:{} watchdog timeout'.format(self.name, self.task_id)
+ self.log.error("task-watchdog-timeout", running=self.running,
+ timeout=self.watchdog_timeout, error=err_msg)
+
+ self.deferred.errback(failure.Failure(WatchdogTimeoutFailure(err_msg)))
+ self.deferred.cancel()
+
+ if self._watchdog is not None:
+ if self._watchdog.called:
+ # Too late, timeout failure in progress
+ self.log.warn('task-watchdog-tripped', running=self.running,
+ timeout=self.watchdog_timeout)
+ return
+
+ d, self._watchdog = self._watchdog, None
+ d.cancel()
+
+ # Schedule/re-schedule the watchdog timer
+ self._watchdog = reactor.callLater(self.watchdog_timeout, watchdog_timeout)
diff --git a/python/adapters/extensions/omci/tasks/task_runner.py b/python/adapters/extensions/omci/tasks/task_runner.py
new file mode 100644
index 0000000..eb7a252
--- /dev/null
+++ b/python/adapters/extensions/omci/tasks/task_runner.py
@@ -0,0 +1,285 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from twisted.internet import reactor
+
+
+class TaskRunner(object):
+ """
+ Control the number of running tasks utilizing the OMCI Communications
+ channel (OMCI_CC
+ """
+ def __init__(self, device_id, clock=None):
+ self.log = structlog.get_logger(device_id=device_id)
+ self._pending_queue = dict() # task-priority -> [tasks]
+ self._running_queue = dict() # task-id -> task
+ self._active = False
+
+ self._successful_tasks = 0
+ self._failed_tasks = 0
+ self._watchdog_timeouts = 0
+ self._last_watchdog_failure_task = ''
+ self.reactor = clock if clock is not None else reactor
+
+ def __str__(self):
+ return 'TaskRunner: Pending: {}, Running:{}'.format(self.pending_tasks,
+ self.running_tasks)
+
+ @property
+ def active(self):
+ return self._active
+
+ @property
+ def pending_tasks(self):
+ """
+ Get the number of tasks pending to run
+ """
+ count = 0
+ for tasks in self._pending_queue.itervalues():
+ count += len(tasks)
+ return count
+
+ @property
+ def running_tasks(self):
+ """
+ Get the number of tasks currently running
+ """
+ return len(self._running_queue)
+
+ @property
+ def successful_tasks_completed(self):
+ return self._successful_tasks
+
+ @property
+ def failed_tasks(self):
+ return self._failed_tasks
+
+ @property
+ def watchdog_timeouts(self):
+ return self._watchdog_timeouts
+
+ @property
+ def last_watchdog_failure_task(self):
+ """ Task name of last tasks to fail due to watchdog"""
+ return self._last_watchdog_failure_task
+
+ # TODO: add properties for various stats as needed
+
+ def start(self):
+ """
+ Start the Task runner
+ """
+ self.log.debug('starting', active=self._active)
+
+ if not self._active:
+ assert len(self._running_queue) == 0, 'Running task queue not empty'
+ self._active = True
+ self._run_next_task()
+
+ def stop(self):
+ """
+ Stop the Task runner, first stopping any tasks and flushing the queue
+ """
+ self.log.debug('stopping', active=self._active)
+
+ if self._active:
+ self._active = False
+
+ pq, self._pending_queue = self._pending_queue, dict()
+ rq, self._running_queue = self._running_queue, dict()
+
+ # Stop running tasks
+ for task in rq.itervalues():
+ try:
+ task.stop()
+ except:
+ pass
+
+ # Kill pending tasks
+ for d in pq.iterkeys():
+ try:
+ d.cancel()
+ except:
+ pass
+
+ def _run_next_task(self):
+ """
+ Search for next task to run, if one can
+ :return:
+ """
+ self.log.debug('run-next', active=self._active,
+ num_running=len(self._running_queue),
+ num_pending=len(self._pending_queue))
+
+ if self._active and len(self._pending_queue) > 0:
+ # Cannot run a new task if a running one needs the OMCI_CC exclusively
+
+ if any(task.exclusive for task in self._running_queue.itervalues()):
+ self.log.debug('exclusive-running')
+ return # An exclusive task is already running
+
+ try:
+ priorities = [k for k in self._pending_queue.iterkeys()]
+ priorities.sort(reverse=True)
+ highest_priority = priorities[0] if len(priorities) else None
+
+ if highest_priority is not None:
+ queue = self._pending_queue[highest_priority]
+ next_task = queue[0] if len(queue) else None
+
+ if next_task is not None:
+ if next_task.exclusive and len(self._running_queue) > 0:
+ self.log.debug('next-is-exclusive', task=str(next_task))
+ return # Next task to run needs exclusive access
+
+ queue.pop(0)
+ if len(queue) == 0:
+ del self._pending_queue[highest_priority]
+
+ self.log.debug('starting-task', task=str(next_task),
+ running=len(self._running_queue),
+ pending=len(self._pending_queue))
+
+ self._running_queue[next_task.task_id] = next_task
+ self.reactor.callLater(0, next_task.start)
+
+ # Run again if others are waiting
+ if len(self._pending_queue):
+ self._run_next_task()
+
+ except Exception as e:
+ self.log.exception('run-next', e=e)
+
+ def _on_task_success(self, results, task):
+ """
+ A task completed successfully callback
+ :param results: deferred results
+ :param task: (Task) The task that succeeded
+ :return: deferred results
+ """
+ self.log.debug('task-success', task_id=str(task),
+ running=len(self._running_queue),
+ pending=len(self._pending_queue))
+ try:
+ assert task is not None and task.task_id in self._running_queue,\
+ 'Task not found in running queue'
+
+ task.task_cleanup()
+ self._successful_tasks += 1
+ del self._running_queue[task.task_id]
+
+ except Exception as e:
+ self.log.exception('task-error', task=str(task), e=e)
+
+ finally:
+ reactor.callLater(0, self._run_next_task)
+
+ return results
+
+ def _on_task_failure(self, failure, task):
+ """
+ A task completed with failure callback
+ :param failure: (Failure) Failure results
+ :param task: (Task) The task that failed
+ :return: (Failure) Failure results
+ """
+ from voltha.extensions.omci.tasks.task import WatchdogTimeoutFailure
+
+ self.log.debug('task-failure', task_id=str(task),
+ running=len(self._running_queue),
+ pending=len(self._pending_queue))
+ try:
+ assert task is not None and task.task_id in self._running_queue,\
+ 'Task not found in running queue'
+
+ task.task_cleanup()
+ self._failed_tasks += 1
+ del self._running_queue[task.task_id]
+
+ if isinstance(failure.value, WatchdogTimeoutFailure):
+ self._watchdog_timeouts += 1
+ self._last_watchdog_failure_task = task.name
+
+ except Exception as e:
+ # Check the pending queue
+
+ for priority, tasks in self._pending_queue.iteritems():
+ found = next((t for t in tasks if t.task_id == task.task_id), None)
+
+ if found is not None:
+ self._pending_queue[task.priority].remove(task)
+ if len(self._pending_queue[task.priority]) == 0:
+ del self._pending_queue[task.priority]
+ return failure
+
+ self.log.exception('task-error', task=str(task), e=e)
+ raise
+
+ finally:
+ reactor.callLater(0, self._run_next_task)
+
+ return failure
+
+ def queue_task(self, task):
+ """
+ Place a task on the queue to run
+
+ :param task: (Task) task to run
+ :return: (deferred) Deferred that will fire on task completion
+ """
+ self.log.debug('queue-task', active=self._active, task=str(task),
+ running=len(self._running_queue),
+ pending=len(self._pending_queue))
+
+ if task.priority not in self._pending_queue:
+ self._pending_queue[task.priority] = []
+
+ task.deferred.addCallbacks(self._on_task_success, self._on_task_failure,
+ callbackArgs=[task], errbackArgs=[task])
+
+ self._pending_queue[task.priority].append(task)
+ self._run_next_task()
+
+ return task.deferred
+
+ def cancel_task(self, task_id):
+ """
+ Cancel a pending or running task. The cancel method will be called
+ for the task's deferred
+
+ :param task_id: (int) Task identifier
+ """
+ task = self._running_queue.get(task_id, None)
+
+ if task is not None:
+ try:
+ task.stop()
+ except Exception as e:
+ self.log.exception('stop-error', task=str(task), e=e)
+
+ reactor.callLater(0, self._run_next_task)
+
+ else:
+ for priority, tasks in self._pending_queue.iteritems():
+ task = next((t for t in tasks if t.task_id == task_id), None)
+
+ if task is not None:
+ try:
+ task.deferred.cancel()
+ except Exception as e:
+ self.log.exception('cancel-error', task=str(task), e=e)
+ return
+