VOL-945: Added support for OpenOMCI Task watchdog
Change-Id: Ie81031183db159514f00bcf764ab484d19335ddb
diff --git a/tests/utests/voltha/extensions/omci/mock/mock_task.py b/tests/utests/voltha/extensions/omci/mock/mock_task.py
index 499a945..088ec92 100644
--- a/tests/utests/voltha/extensions/omci/mock/mock_task.py
+++ b/tests/utests/voltha/extensions/omci/mock/mock_task.py
@@ -25,7 +25,8 @@
success=True,
delay=0,
value=None,
- priority=Task.DEFAULT_PRIORITY):
+ priority=Task.DEFAULT_PRIORITY,
+ watchdog_timeout=Task.DEFAULT_WATCHDOG_SECS):
"""
Class initialization
@@ -35,6 +36,7 @@
:param success: (bool) True if the task should complete successfully
:param delay: (int/float) Time it takes the task to complete
:param priority (int) Priority of the task
+ :param watchdog_timeout (int or float) Watchdog timeout after task start
:param value: (various) The value (string, int, ...) to return if successful
or an Exception to send to the errBack if 'success'
is False
@@ -43,7 +45,8 @@
omci_agent,
device_id,
exclusive=exclusive,
- priority=priority)
+ priority=priority,
+ watchdog_timeout=watchdog_timeout)
self._delay = delay
self._success = success
self._value = value
diff --git a/tests/utests/voltha/extensions/omci/test_task_runner.py b/tests/utests/voltha/extensions/omci/test_task_runner.py
index e15541b..cd0e043 100644
--- a/tests/utests/voltha/extensions/omci/test_task_runner.py
+++ b/tests/utests/voltha/extensions/omci/test_task_runner.py
@@ -54,6 +54,12 @@
self.runner.stop()
self.assertFalse(self.runner.active)
+ def unexpected_error(self, _failure):
+ self.assertEqual('Should not be here, expected success', _failure)
+
+ def unexpected_success(self, _results):
+ self.assertEqual('Should not be here, expected a failure', _results)
+
def test_simple_task_init(self):
t = SimpleTask(None, DEVICE_ID,
exclusive=True, priority=0,
@@ -64,19 +70,35 @@
self.assertTrue(t.exclusive)
self.assertFalse(t.deferred.called)
- @raises(AssertionError)
- def test_simple_negative_priority(self):
- SimpleTask(None, DEVICE_ID, priority=-1)
+ def test_task_defaults_and_bound(self):
+ # Make sure no one has changed some declared defaults/max/min values
+ from voltha.extensions.omci.tasks.task import Task
+ self.assertEqual(128, Task.DEFAULT_PRIORITY)
+ self.assertEqual(0, Task.MIN_PRIORITY)
+ self.assertEqual(255, Task.MAX_PRIORITY)
+ self.assertEqual(10, Task.DEFAULT_WATCHDOG_SECS)
+ self.assertEqual(3, Task.MIN_WATCHDOG_SECS)
+ self.assertEqual(60, Task.MAX_WATCHDOG_SECS)
@raises(AssertionError)
- def test_simple_big_priority(self):
- SimpleTask(None, DEVICE_ID, priority=256)
+ def test_task_priority_min(self):
+ from voltha.extensions.omci.tasks.task import Task
+ _ = SimpleTask(None, DEVICE_ID, priority=Task.MIN_PRIORITY - 1)
- def unexpected_error(self, _failure):
- self.assertEqual('Should not be here, expected success', _failure)
+ @raises(AssertionError)
+ def test_task_priority_max(self):
+ from voltha.extensions.omci.tasks.task import Task
+ _ = SimpleTask(None, DEVICE_ID, priority=Task.MAX_PRIORITY + 1)
- def unexpected_success(self, _results):
- self.assertEqual('Should not be here, expected a failure', _results)
+ @raises(AssertionError)
+ def test_task_watchdog_min(self):
+ from voltha.extensions.omci.tasks.task import Task
+ _ = SimpleTask(None, DEVICE_ID, watchdog_timeout=Task.MIN_WATCHDOG_SECS - 0.000001)
+
+ @raises(AssertionError)
+ def test_task_watchdog_max(self):
+ from voltha.extensions.omci.tasks.task import Task
+ _ = SimpleTask(None, DEVICE_ID, watchdog_timeout=Task.MAX_WATCHDOG_SECS + 0.000001)
@deferred(timeout=5)
def test_simple_success(self):
@@ -236,8 +258,38 @@
self.assertEqual(self.runner.running_tasks, 0)
return d
- @deferred(timeout=200)
- def test_cancel_running(self):
+ @raises(CancelledError)
+ @deferred(timeout=2)
+ def test_task_stop_queued(self):
+ t = SimpleTask(None, DEVICE_ID,
+ exclusive=True, priority=9,
+ success=True, value=1, delay=0)
+
+ d = self.runner.queue_task(t)
+ self.assertEqual(self.runner.pending_tasks, 1)
+ self.assertEqual(self.runner.running_tasks, 0)
+
+ t.stop()
+ self.assertEqual(self.runner.pending_tasks, 0)
+ self.assertEqual(self.runner.running_tasks, 0)
+ return d
+
+ def test_task_stop_not_queued(self):
+ t = SimpleTask(None, DEVICE_ID,
+ exclusive=True, priority=9,
+ success=True, value=1, delay=0)
+
+ self.assertEqual(self.runner.pending_tasks, 0)
+ self.assertEqual(self.runner.running_tasks, 0)
+
+ t.stop()
+ self.assertFalse(t.running)
+
+ @deferred(timeout=3)
+ def test_task_runner_cancel_running(self):
+ # Both tasks run in parallel but t1 will finish first and
+ # will request t2 to terminate by calling the TaskRunner's
+ # cancel task method
t1 = SimpleTask(None, DEVICE_ID,
exclusive=False, priority=9,
success=True, value=1, delay=0.5)
@@ -251,11 +303,14 @@
self.assertEqual(self.runner.pending_tasks, 2)
self.assertEqual(self.runner.running_tasks, 0)
- def kill_task_t2(_, task_id):
+ def kill_task_t2(_, task_2_id):
+ # Called on successful completion of task t1
+ self.assertIsInstance(task_2_id, int)
self.assertEqual(self.runner.pending_tasks, 0)
self.assertEqual(self.runner.running_tasks, 1)
- self.runner.cancel_task(task_id)
+ # Cancel t2 via the task runner, using t2's task ID
+ self.runner.cancel_task(task_2_id)
self.assertEqual(self.runner.running_tasks, 0)
d1.addCallbacks(kill_task_t2, self.unexpected_error,
@@ -268,11 +323,162 @@
self.assertEqual(self.runner.successful_tasks_completed, 1)
self.assertEqual(self.runner.failed_tasks, 1)
+ # T2 should not finish successfully, should get a cancel error
d2.addCallbacks(self.unexpected_success, expected_error)
+ # Run it
self.runner.start()
return defer.gatherResults([d1, d2], consumeErrors=True)
+ @deferred(timeout=3)
+ def test_task_stop_running(self):
+ # Run two tasks where T1 completes first and requests T2 to be
+ # canceled by calling T2's stop method
+
+ t1 = SimpleTask(None, DEVICE_ID,
+ exclusive=False, priority=9,
+ success=True, value=1, delay=0.5)
+ t2 = SimpleTask(None, DEVICE_ID,
+ exclusive=False, priority=9,
+ success=True, value=1, delay=200)
+
+ d1 = self.runner.queue_task(t1)
+ d2 = self.runner.queue_task(t2)
+
+ self.assertEqual(self.runner.pending_tasks, 2)
+ self.assertEqual(self.runner.running_tasks, 0)
+
+ def kill_task_t2(_, task_2):
+ # Called on successful completion of task t1
+ self.assertIsInstance(task_2, SimpleTask)
+ self.assertEqual(self.runner.pending_tasks, 0)
+ self.assertEqual(self.runner.running_tasks, 1)
+
+ # Cancel by telling the task to stop itself
+ task_2.stop()
+ self.assertEqual(self.runner.running_tasks, 0)
+
+ d1.addCallbacks(kill_task_t2, self.unexpected_error,
+ callbackArgs=[t2])
+
+ def expected_error(failure):
+ self.assertTrue(isinstance(failure.value, CancelledError))
+ self.assertEqual(self.runner.pending_tasks, 0)
+ self.assertEqual(self.runner.running_tasks, 0)
+ self.assertEqual(self.runner.successful_tasks_completed, 1)
+ self.assertEqual(self.runner.failed_tasks, 1)
+
+ # T2 should not finish successfully, should get a cancel error
+ d2.addCallbacks(self.unexpected_success, expected_error)
+
+ # Run it
+ self.runner.start()
+ return defer.gatherResults([d1, d2], consumeErrors=True)
+
+ @deferred(timeout=3)
+ def test_task_cancel_not_queued(self):
+ t = SimpleTask(None, DEVICE_ID,
+ exclusive=True, priority=9,
+ success=True, value=1, delay=0)
+
+ self.assertEqual(self.runner.pending_tasks, 0)
+ self.assertEqual(self.runner.running_tasks, 0)
+
+ def expected_error(failure):
+ self.assertTrue(isinstance(failure.value, CancelledError))
+ self.assertEqual(self.runner.pending_tasks, 0)
+ self.assertEqual(self.runner.running_tasks, 0)
+ self.assertEqual(self.runner.successful_tasks_completed, 0)
+ self.assertEqual(self.runner.failed_tasks, 0)
+ # self.fail(msg='made it here') # Uncomment to verify called
+
+ t.deferred.addCallbacks(self.unexpected_success, expected_error)
+
+ self.runner.start()
+ t.deferred.cancel()
+ self.assertFalse(t.running)
+ return t.deferred
+
+ @deferred(timeout=3)
+ def test_task_deferred_cancel_running(self):
+ # Run two tasks where T1 completes first and requests T2 to be
+ # canceled by doing a 'cancel' on T2's deferred
+
+ t1 = SimpleTask(None, DEVICE_ID,
+ exclusive=False, priority=9,
+ success=True, value=1, delay=0.5)
+ t2 = SimpleTask(None, DEVICE_ID,
+ exclusive=False, priority=9,
+ success=True, value=1, delay=200)
+
+ d1 = self.runner.queue_task(t1)
+ d2 = self.runner.queue_task(t2)
+
+ self.assertEqual(self.runner.pending_tasks, 2)
+ self.assertEqual(self.runner.running_tasks, 0)
+
+ def kill_task_t2(_, deferred_2):
+ # Called on successful completion of task t1
+ self.assertIsInstance(deferred_2, defer.Deferred)
+ self.assertEqual(self.runner.pending_tasks, 0)
+ self.assertEqual(self.runner.running_tasks, 1)
+
+ # Cancel the deferred for T2
+ deferred_2.cancel()
+ self.assertEqual(self.runner.running_tasks, 0)
+
+ d1.addCallbacks(kill_task_t2, self.unexpected_error,
+ callbackArgs=[t2.deferred])
+
+ def expected_error(failure):
+ self.assertTrue(isinstance(failure.value, CancelledError))
+ self.assertEqual(self.runner.pending_tasks, 0)
+ self.assertEqual(self.runner.running_tasks, 0)
+ self.assertEqual(self.runner.successful_tasks_completed, 1)
+ self.assertEqual(self.runner.failed_tasks, 1)
+ # self.fail(msg='made it here') # Uncomment to verify called
+
+ # T2 should not finish successfully, should get a cancel error
+ d2.addCallbacks(self.unexpected_success, expected_error)
+
+ # Run it
+ self.runner.start()
+ return defer.gatherResults([d1, d2], consumeErrors=True)
+
+ @deferred(timeout=3)
+ def test_watchdog_timeout(self):
+ t = SimpleTask(None, DEVICE_ID, delay=2)
+
+ self.assertEqual(self.runner.pending_tasks, 0)
+ self.assertEqual(self.runner.running_tasks, 0)
+ self.assertEqual(self.runner.watchdog_timeouts, 0)
+
+ # Actual watchdog minimum is probably too long for an automated test; reach
+ # around and force it to something smaller (kids, don't try this at home)
+
+ t._watchdog_timeout = 0.1
+ self.runner.queue_task(t)
+
+ self.assertEqual(self.runner.pending_tasks, 1)
+ self.assertEqual(self.runner.running_tasks, 0)
+
+ def expected_error(failure):
+ from voltha.extensions.omci.tasks.task import WatchdogTimeoutFailure
+ self.assertTrue(isinstance(failure.value, WatchdogTimeoutFailure))
+ self.assertEqual(self.runner.pending_tasks, 0)
+ self.assertEqual(self.runner.running_tasks, 0)
+ self.assertEqual(self.runner.successful_tasks_completed, 0)
+ self.assertEqual(self.runner.failed_tasks, 1)
+ self.assertEqual(self.runner.watchdog_timeouts, 1)
+ self.assertEqual(self.runner.last_watchdog_failure_task, t.name)
+ # self.fail(msg='made it here') # Uncomment to verify called
+
+ t.deferred.addCallbacks(self.unexpected_success, expected_error)
+
+ # Run it
+ self.runner.start()
+ return t.deferred
+
if __name__ == '__main__':
main()
diff --git a/voltha/adapters/adtran_onu/omci/adtn_mib_download_task.py b/voltha/adapters/adtran_onu/omci/adtn_mib_download_task.py
index a85372c..e782913 100644
--- a/voltha/adapters/adtran_onu/omci/adtn_mib_download_task.py
+++ b/voltha/adapters/adtran_onu/omci/adtn_mib_download_task.py
@@ -133,9 +133,6 @@
:param operation: (str) what operation was being performed
:return: True if successful, False if the entity existed (already created)
"""
- if not self.running:
- raise MibDownloadFailure('Download Task was cancelled')
-
omci_msg = results.fields['omci_message'].fields
status = omci_msg['success_code']
error_mask = omci_msg.get('parameter_error_attributes_mask', 'n/a')
@@ -146,6 +143,7 @@
failed_mask=failed_mask, unsupported_mask=unsupported_mask)
if status == RC.Success:
+ self.strobe_watchdog()
return True
elif status == RC.InstanceExists:
@@ -178,6 +176,7 @@
try:
# Lock the UNI ports to prevent any alarms during initial configuration
# of the ONU
+ self.strobe_watchdog()
yield self.enable_unis(self._handler.uni_ports, True)
# Provision the initial bridge configuration
diff --git a/voltha/adapters/brcm_openomci_onu/omci/brcm_mib_download_task.py b/voltha/adapters/brcm_openomci_onu/omci/brcm_mib_download_task.py
index 2c9c961..b325ca7 100644
--- a/voltha/adapters/brcm_openomci_onu/omci/brcm_mib_download_task.py
+++ b/voltha/adapters/brcm_openomci_onu/omci/brcm_mib_download_task.py
@@ -146,9 +146,6 @@
"""
self.log.debug('function-entry')
- if not self.running:
- raise MibDownloadFailure('Download Task was cancelled')
-
omci_msg = results.fields['omci_message'].fields
status = omci_msg['success_code']
error_mask = omci_msg.get('parameter_error_attributes_mask', 'n/a')
@@ -159,6 +156,7 @@
failed_mask=failed_mask, unsupported_mask=unsupported_mask)
if status == RC.Success:
+ self.strobe_watchdog()
return True
elif status == RC.InstanceExists:
@@ -191,6 +189,7 @@
try:
# Lock the UNI ports to prevent any alarms during initial configuration
# of the ONU
+ self.strobe_watchdog()
yield self.enable_uni(self._uni_port, True)
# Provision the initial bridge configuration
diff --git a/voltha/adapters/brcm_openomci_onu/omci/brcm_uni_lock_task.py b/voltha/adapters/brcm_openomci_onu/omci/brcm_uni_lock_task.py
index 2a12eda..b998b0e 100644
--- a/voltha/adapters/brcm_openomci_onu/omci/brcm_uni_lock_task.py
+++ b/voltha/adapters/brcm_openomci_onu/omci/brcm_uni_lock_task.py
@@ -65,10 +65,6 @@
except:
pass
- def stop_if_not_running(self):
- if not self.running:
- raise BrcmUniLockException('UNI Lock Task was cancelled')
-
def start(self):
"""
Start UNI/PPTP Lock/Unlock Task
@@ -91,7 +87,7 @@
frame = msg.set()
self.log.debug('openomci-msg', msg=msg)
results = yield self._device.omci_cc.send(frame)
- self.stop_if_not_running()
+ self.strobe_watchdog()
status = results.fields['omci_message'].fields['success_code']
self.log.info('response-status', status=status)
@@ -106,11 +102,11 @@
for key, value in pptp.iteritems():
msg = PptpEthernetUniFrame(key,
- attributes=dict(administrative_state=state))
+ attributes=dict(administrative_state=state))
frame = msg.set()
self.log.debug('openomci-msg', msg=msg)
results = yield self._device.omci_cc.send(frame)
- self.stop_if_not_running()
+ self.strobe_watchdog()
status = results.fields['omci_message'].fields['success_code']
self.log.info('response-status', status=status)
diff --git a/voltha/core/global_handler.py b/voltha/core/global_handler.py
index aed018e..54c2d28 100644
--- a/voltha/core/global_handler.py
+++ b/voltha/core/global_handler.py
@@ -46,6 +46,7 @@
MulticastGemportsConfigData
from voltha.protos.bbf_fiber_multicast_distribution_set_body_pb2 import \
MulticastDistributionSetData
+from voltha.protos.omci_mib_db_pb2 import MibDeviceData
from voltha.protos.omci_alarm_db_pb2 import AlarmDeviceData
log = structlog.get_logger()
diff --git a/voltha/extensions/omci/state_machines/mib_sync.py b/voltha/extensions/omci/state_machines/mib_sync.py
index c725cce..ebb5124 100644
--- a/voltha/extensions/omci/state_machines/mib_sync.py
+++ b/voltha/extensions/omci/state_machines/mib_sync.py
@@ -902,3 +902,20 @@
return self._database.query(self._device_id, class_id=class_id,
instance_id=instance_id,
attributes=attributes)
+
+ def mib_set(self, class_id, entity_id, attributes):
+ """
+ Set attributes of an existing ME Class instance
+
+ This method is primarily used by other state machines to save ME specific
+ information to the persistent database. Access by objects external to the
+ OpenOMCI library is discouraged.
+
+ :param class_id: (int) ME Class ID
+ :param entity_id: (int) ME Class entity ID
+ :param attributes: (dict) attribute -> value pairs to set
+ """
+ # It must exist first (but attributes can be new)
+ if isinstance(attributes, dict) and len(attributes) and\
+ self.query_mib(class_id, entity_id) is not None:
+ self._database.set(self._device_id, class_id, entity_id, attributes)
diff --git a/voltha/extensions/omci/tasks/alarm_resync_task.py b/voltha/extensions/omci/tasks/alarm_resync_task.py
index b8b8e69..2278296 100644
--- a/voltha/extensions/omci/tasks/alarm_resync_task.py
+++ b/voltha/extensions/omci/tasks/alarm_resync_task.py
@@ -53,6 +53,7 @@
max_alarm_upload_next_retries = 3
alarm_upload_next_delay = 10 # Max * delay < 60 seconds
+ watchdog_timeout = 15 # Should be > any retry delay
def __init__(self, omci_agent, device_id):
"""
@@ -62,10 +63,11 @@
:param device_id: (str) ONU Device ID
"""
super(AlarmResyncTask, self).__init__(AlarmResyncTask.name,
- omci_agent,
- device_id,
- priority=AlarmResyncTask.task_priority,
- exclusive=False)
+ omci_agent,
+ device_id,
+ priority=AlarmResyncTask.task_priority,
+ exclusive=False,
+ watchdog_timeout=AlarmResyncTask.watchdog_timeout)
self._local_deferred = None
self._device = omci_agent.get_device(device_id)
self._db_active = MibDbVolatileDict(omci_agent)
@@ -102,10 +104,6 @@
self._db_active = None
super(AlarmResyncTask, self).stop()
- def stop_if_not_running(self):
- if not self.running:
- raise AlarmResyncException('Resync Task was cancelled')
-
@inlineCallbacks
def perform_alarm_resync(self):
"""
@@ -122,9 +120,11 @@
self.log.info('perform-alarm-resync')
try:
+ self.strobe_watchdog()
command_sequence_number = yield self.snapshot_alarm()
# Start the ALARM upload sequence, save alarms to the table
+ self.strobe_watchdog()
commands_retrieved, alarm_table = yield self.upload_alarm(command_sequence_number)
if commands_retrieved < command_sequence_number:
@@ -158,7 +158,7 @@
# Send ALARM Upload so ONU snapshots its ALARM
try:
command_sequence_number = yield self.send_alarm_upload()
- self.stop_if_not_running()
+ self.strobe_watchdog()
if command_sequence_number is None:
if retries >= max_tries:
@@ -169,8 +169,8 @@
if retries >= max_tries:
raise
+ self.strobe_watchdog()
yield asleep(AlarmResyncTask.retry_delay)
- self.stop_if_not_running()
continue
except Exception as e:
@@ -181,7 +181,7 @@
if command_sequence_number is None:
raise AlarmCopyException('Failed to snapshot ALARM copy after {} retries'.
- format(AlarmResyncTask.max_retries))
+ format(AlarmResyncTask.max_retries))
returnValue(command_sequence_number)
@@ -195,8 +195,9 @@
########################################
# Begin ALARM Upload
try:
+ self.strobe_watchdog()
results = yield self._device.omci_cc.send_get_all_alarm()
- self.stop_if_not_running()
+
command_sequence_number = results.fields['omci_message'].fields['number_of_commands']
if command_sequence_number is None or command_sequence_number <= 0:
@@ -222,8 +223,8 @@
for retries in xrange(0, max_tries):
try:
+ self.strobe_watchdog()
response = yield self._device.omci_cc.get_all_alarm_next(seq_no)
- self.stop_if_not_running()
omci_msg = response.fields['omci_message'].fields
alarm_class_id[seq_no] = omci_msg['alarmed_entity_class']
@@ -248,6 +249,7 @@
command_sequence_number=command_sequence_number)
if retries < max_tries - 1:
+ self.strobe_watchdog()
yield asleep(AlarmResyncTask.alarm_upload_next_delay)
else:
raise
@@ -256,5 +258,6 @@
self.log.exception('resync', e=e, seq_no=seq_no,
command_sequence_number=command_sequence_number)
+ self.strobe_watchdog()
returnValue((seq_no + 1, alarm_class_id, alarm_entity_id, attributes)) # seq_no is zero based and alarm table.
diff --git a/voltha/extensions/omci/tasks/alarm_sync_data.py b/voltha/extensions/omci/tasks/alarm_sync_data.py
index 115ef9e..331fe1b 100644
--- a/voltha/extensions/omci/tasks/alarm_sync_data.py
+++ b/voltha/extensions/omci/tasks/alarm_sync_data.py
@@ -72,10 +72,6 @@
self.cancel_deferred()
super(AlarmSyncDataTask, self).stop()
- def stop_if_not_running(self):
- if not self.running:
- raise AlarmSyncDataFailure('Update Task was cancelled')
-
@inlineCallbacks
def perform_alarm_sync_data(self):
"""
@@ -89,8 +85,9 @@
#########################################
# ONU Data (ME #2)
# alarm_retrieval_mode=1, time=DEFAULT_OMCI_TIMEOUT
+ self.strobe_watchdog()
results = yield device.omci_cc.send_get_all_alarm(alarm_retrieval_mode=1)
- self.stop_if_not_running()
+
command_sequence_number = results.fields['omci_message'].fields['number_of_commands']
for seq_no in xrange(command_sequence_number):
@@ -102,8 +99,9 @@
self.log.debug('alarm-data-next-request', seq_no=seq_no,
retry=retry,
command_sequence_number=command_sequence_number)
+ self.strobe_watchdog()
yield device.omci_cc.send_get_all_alarm_next(seq_no)
- self.stop_if_not_running()
+
self.log.debug('alarm-data-next-success', seq_no=seq_no,
command_sequence_number=command_sequence_number)
break
@@ -115,8 +113,9 @@
if retry >= 2:
raise AlarmSyncDataFailure('Alarm timeout failure on req {} of {}'.
format(seq_no + 1, command_sequence_number))
+
+ self.strobe_watchdog()
yield asleep(0.3)
- self.stop_if_not_running()
# Successful if here
self.log.info('alarm-synchronized')
diff --git a/voltha/extensions/omci/tasks/file_download_task.py b/voltha/extensions/omci/tasks/file_download_task.py
index 7d8719d..5234c41 100755
--- a/voltha/extensions/omci/tasks/file_download_task.py
+++ b/voltha/extensions/omci/tasks/file_download_task.py
@@ -20,12 +20,14 @@
import requests
import os
+
class FileDownloadTask(Task):
- task_priority = 250
name = "Image File Download Task"
def __init__(self, omci_agent, device_id, url, local_path):
- super(FileDownloadTask, self).__init__(FileDownloadTask.name, omci_agent, device_id)
+ super(FileDownloadTask, self).__init__(FileDownloadTask.name, omci_agent, device_id,
+ exclusive=False,
+ watchdog_timeout=45)
self.url = url
self.local_path = local_path
# self.log.debug('{} running'.format(FileDownloadTask.name))
@@ -38,10 +40,13 @@
dir_name = os.path.dirname(self.local_path)
if not os.path.exists(dir_name):
os.makedirs(dir_name)
-
+
+ self.strobe_watchdog()
r = requests.get(self.url, stream=True)
+
with open(self.local_path, 'wb') as f:
- for chunk in r.iter_content(chunk_size=1024):
+ for chunk in r.iter_content(chunk_size=1024):
+ self.strobe_watchdog()
if chunk: # filter out keep-alive new chunks
f.write(chunk)
self.deferred.callback('device {} success downloaded {} '.format(self.device_id, self.url))
@@ -52,4 +57,4 @@
def stop(self):
self.cancel_deferred()
super(FileDownloadTask, self).stop()
-
+
diff --git a/voltha/extensions/omci/tasks/get_mds_task.py b/voltha/extensions/omci/tasks/get_mds_task.py
index e2f11fd..3807f7d 100644
--- a/voltha/extensions/omci/tasks/get_mds_task.py
+++ b/voltha/extensions/omci/tasks/get_mds_task.py
@@ -83,6 +83,7 @@
#########################################
# Request (MDS supplied value does not matter for a 'get' request)
+ self.strobe_watchdog()
results = yield device.omci_cc.send(OntDataFrame().get())
omci_msg = results.fields['omci_message'].fields
diff --git a/voltha/extensions/omci/tasks/interval_data_task.py b/voltha/extensions/omci/tasks/interval_data_task.py
index 04cc8eb..9475cd8 100644
--- a/voltha/extensions/omci/tasks/interval_data_task.py
+++ b/voltha/extensions/omci/tasks/interval_data_task.py
@@ -148,6 +148,7 @@
self.log.debug('interval-get-request', class_id=self._class_id,
entity_id=self._entity_id)
try:
+ self.strobe_watchdog()
results = yield device.omci_cc.send(frame)
omci_msg = results.fields['omci_message'].fields
diff --git a/voltha/extensions/omci/tasks/mib_resync_task.py b/voltha/extensions/omci/tasks/mib_resync_task.py
index 30a888f..a6b6892 100644
--- a/voltha/extensions/omci/tasks/mib_resync_task.py
+++ b/voltha/extensions/omci/tasks/mib_resync_task.py
@@ -53,6 +53,7 @@
max_mib_upload_next_retries = 3
mib_upload_next_delay = 10 # Max * delay < 60 seconds
+ watchdog_timeout = 15 # Should be > max delay
def __init__(self, omci_agent, device_id):
"""
@@ -102,10 +103,6 @@
self._db_active = None
super(MibResyncTask, self).stop()
- def stop_if_not_running(self):
- if not self.running:
- raise MibResyncException('Resync Task was cancelled')
-
@inlineCallbacks
def perform_mib_resync(self):
"""
@@ -138,6 +135,7 @@
number_of_commands = results[1]
# Start the MIB upload sequence
+ self.strobe_watchdog()
commands_retrieved = yield self.upload_mib(number_of_commands)
if commands_retrieved < number_of_commands:
@@ -176,8 +174,8 @@
for retries in xrange(0, max_tries + 1):
# Send MIB Upload so ONU snapshots its MIB
try:
+ self.strobe_watchdog()
number_of_commands = yield self.send_mib_upload()
- self.stop_if_not_running()
if number_of_commands is None:
if retries >= max_tries:
@@ -189,8 +187,8 @@
if retries >= max_tries:
raise
+ self.strobe_watchdog()
yield asleep(MibResyncTask.db_copy_retry_delay)
- self.stop_if_not_running()
continue
# Get a snapshot of the local MIB database
@@ -220,8 +218,9 @@
########################################
# Begin MIB Upload
try:
+ self.strobe_watchdog()
results = yield self._device.omci_cc.send_mib_upload()
- self.stop_if_not_running()
+
number_of_commands = results.fields['omci_message'].fields['number_of_commands']
if number_of_commands is None or number_of_commands <= 0:
@@ -244,8 +243,8 @@
for retries in xrange(0, max_tries):
try:
+ self.strobe_watchdog()
response = yield self._device.omci_cc.send_mib_upload_next(seq_no)
- self.stop_if_not_running()
omci_msg = response.fields['omci_message'].fields
class_id = omci_msg['object_entity_class']
@@ -269,6 +268,7 @@
number_of_commands=number_of_commands)
if retries < max_tries - 1:
+ self.strobe_watchdog()
yield asleep(MibResyncTask.mib_upload_next_delay)
else:
raise
@@ -287,6 +287,8 @@
:param db_active: (dict) ONU's database snapshot
:return: (dict), (dict), dict() Differences
"""
+ self.strobe_watchdog()
+
# Class & Entities only in local copy (OpenOMCI)
on_olt_only = self.get_lsh_only_dict(db_copy, db_active)
diff --git a/voltha/extensions/omci/tasks/mib_upload.py b/voltha/extensions/omci/tasks/mib_upload.py
index eb0fa6b..639f8fd 100644
--- a/voltha/extensions/omci/tasks/mib_upload.py
+++ b/voltha/extensions/omci/tasks/mib_upload.py
@@ -80,10 +80,6 @@
self.cancel_deferred()
super(MibUploadTask, self).stop()
- def stop_if_not_running(self):
- if not self.running:
- raise MibUploadFailure('Upload Task was cancelled')
-
@inlineCallbacks
def perform_mib_upload(self):
"""
@@ -99,8 +95,8 @@
#########################################
# MIB Reset
+ self.strobe_watchdog()
results = yield device.omci_cc.send_mib_reset()
- self.stop_if_not_running()
status = results.fields['omci_message'].fields['success_code']
if status != ReasonCodes.Success.value:
@@ -109,8 +105,9 @@
########################################
# Begin MIB Upload
+ self.strobe_watchdog()
results = yield device.omci_cc.send_mib_upload()
- self.stop_if_not_running()
+
number_of_commands = results.fields['omci_message'].fields['number_of_commands']
for seq_no in xrange(number_of_commands):
@@ -122,8 +119,9 @@
self.log.debug('mib-upload-next-request', seq_no=seq_no,
retry=retry,
number_of_commands=number_of_commands)
+ self.strobe_watchdog()
yield device.omci_cc.send_mib_upload_next(seq_no)
- self.stop_if_not_running()
+
self.log.debug('mib-upload-next-success', seq_no=seq_no,
number_of_commands=number_of_commands)
break
@@ -135,8 +133,8 @@
if retry >= 2:
raise MibUploadFailure('Upload timeout failure on req {} of {}'.
format(seq_no + 1, number_of_commands))
+ self.strobe_watchdog()
yield asleep(0.3)
- self.stop_if_not_running()
# Successful if here
self.log.info('mib-synchronized')
@@ -158,4 +156,3 @@
except Exception as e:
self.log.exception('mib-upload', e=e)
self.deferred.errback(failure.Failure(e))
-
diff --git a/voltha/extensions/omci/tasks/omci_create_pm_task.py b/voltha/extensions/omci/tasks/omci_create_pm_task.py
index e818d87..e0e92c3 100644
--- a/voltha/extensions/omci/tasks/omci_create_pm_task.py
+++ b/voltha/extensions/omci/tasks/omci_create_pm_task.py
@@ -65,10 +65,6 @@
except:
pass
- def stop_if_not_running(self):
- if not self.running:
- raise CreatePMException('Create PM ME Task was cancelled')
-
def start(self):
""" Start task """
super(OmciCreatePMRequest, self).start()
@@ -121,8 +117,8 @@
data=data
)
)
+ self.strobe_watchdog()
results = yield self._device.omci_cc.send(frame)
- self.stop_if_not_running()
status = results.fields['omci_message'].fields['success_code']
self.log.debug('perform-create-status', status=status)
diff --git a/voltha/extensions/omci/tasks/omci_delete_pm_task.py b/voltha/extensions/omci/tasks/omci_delete_pm_task.py
index c80a575..7906a83 100644
--- a/voltha/extensions/omci/tasks/omci_delete_pm_task.py
+++ b/voltha/extensions/omci/tasks/omci_delete_pm_task.py
@@ -63,10 +63,6 @@
except:
pass
- def stop_if_not_running(self):
- if not self.running:
- raise DeletePMException('Delete PM ME Task was cancelled')
-
def start(self):
""" Start task """
super(OmciDeletePMRequest, self).start()
@@ -90,8 +86,8 @@
entity_id=entity_id
)
)
+ self.strobe_watchdog()
results = yield self._device.omci_cc.send(frame)
- self.stop_if_not_running()
status = results.fields['omci_message'].fields['success_code']
self.log.info('perform-delete-status', status=status)
diff --git a/voltha/extensions/omci/tasks/omci_get_request.py b/voltha/extensions/omci/tasks/omci_get_request.py
index 3bfdf06..b199f8f 100644
--- a/voltha/extensions/omci/tasks/omci_get_request.py
+++ b/voltha/extensions/omci/tasks/omci_get_request.py
@@ -86,10 +86,6 @@
except:
pass
- def stop_if_not_running(self):
- if not self.running:
- raise GetException('Get Request Task was cancelled')
-
@property
def me_class(self):
"""The OMCI Managed Entity Class associated with this request"""
@@ -156,8 +152,8 @@
try:
frame = MEFrame(self._entity_class, self._entity_id, self._attributes).get()
+ self.strobe_watchdog()
results = yield self._device.omci_cc.send(frame)
- self.stop_if_not_running()
status = results.fields['omci_message'].fields['success_code']
self.log.info('perform-get-status', status=status)
@@ -172,6 +168,7 @@
results_omci['attributes_mask']
if missing_attr > 0:
+ self.strobe_watchdog()
self._local_deferred = reactor.callLater(0,
self.perform_get_missing_attributes,
missing_attr)
@@ -185,6 +182,7 @@
raise GetException('Get failed with status code: {}'.
format(RC.AttributeFailure.value))
+ self.strobe_watchdog()
self._local_deferred = reactor.callLater(0,
self.perform_get_failed_attributes,
results,
@@ -232,8 +230,8 @@
)
)
try:
+ self.strobe_watchdog()
get_results = yield self._device.omci_cc.send(frame)
- self.stop_if_not_running()
get_omci = get_results.fields['omci_message'].fields
if get_omci['success_code'] != RC.Success.value:
@@ -266,8 +264,8 @@
try:
frame = MEFrame(self._entity_class, self._entity_id, {attr}).get()
+ self.strobe_watchdog()
results = yield self._device.omci_cc.send(frame)
- self.stop_if_not_running()
status = results.fields['omci_message'].fields['success_code']
diff --git a/voltha/extensions/omci/tasks/omci_modify_request.py b/voltha/extensions/omci/tasks/omci_modify_request.py
index de15011..27dfb3d 100644
--- a/voltha/extensions/omci/tasks/omci_modify_request.py
+++ b/voltha/extensions/omci/tasks/omci_modify_request.py
@@ -78,10 +78,6 @@
except:
pass
- def stop_if_not_running(self):
- if not self.running:
- raise ModifyException('Modify Request Task was cancelled')
-
@property
def success_code(self):
"""
@@ -124,7 +120,6 @@
"""
For Set requests, a failure may indicate that one or more attributes
are not supported by this ONU. This property returns any those unsupported attributes
- attributes
:return: None if not a set request, otherwise the attribute mask of any illegal
parameters
@@ -159,8 +154,8 @@
self.log.info('perform-request')
try:
+ self.strobe_watchdog()
self._results = yield self._device.omci_cc.send(self._frame)
- self.stop_if_not_running()
status = self._results.fields['omci_message'].fields['success_code']
self.log.info('response-status', status=status)
diff --git a/voltha/extensions/omci/tasks/onu_capabilities_task.py b/voltha/extensions/omci/tasks/onu_capabilities_task.py
index 4a8c1d3..e93d0ed 100644
--- a/voltha/extensions/omci/tasks/onu_capabilities_task.py
+++ b/voltha/extensions/omci/tasks/onu_capabilities_task.py
@@ -122,10 +122,6 @@
self._device = None
super(OnuCapabilitiesTask, self).stop()
- def stop_if_not_running(self):
- if not self.running:
- raise GetCapabilitiesFailure('Get Capabilities Task was cancelled')
-
@inlineCallbacks
def perform_get_capabilities(self):
"""
@@ -141,11 +137,11 @@
self.log.info('perform-get')
try:
+ self.strobe_watchdog()
self._supported_entities = yield self.get_supported_entities()
- self.stop_if_not_running()
+ self.strobe_watchdog()
self._supported_msg_types = yield self.get_supported_message_types()
- self.stop_if_not_running()
self.log.debug('get-success',
supported_entities=self.supported_managed_entities,
@@ -164,7 +160,7 @@
"""
Extract the 4 octet buffer length from the OMCI PDU contents
"""
- self.log.debug('get-count-buffer', data=data)
+ self.log.debug('get-count-buffer', data=hexlify(data))
return int(hexlify(data[:4]), 16)
@inlineCallbacks
@@ -175,8 +171,8 @@
try:
# Get the number of requests needed
frame = OmciFrame(me_type_table=True).get()
+ self.strobe_watchdog()
results = yield self._device.omci_cc.send(frame)
- self.stop_if_not_running()
omci_msg = results.fields['omci_message']
status = omci_msg.fields['success_code']
@@ -189,14 +185,14 @@
seq_no = 0
data_buffer = bytearray(0)
- self.log.debug('me-type-count', octets=count, data=data)
+ self.log.debug('me-type-count', octets=count, data=hexlify(data))
# Start the loop
for offset in xrange(0, count, self._pdu_size):
frame = OmciFrame(me_type_table=seq_no).get_next()
seq_no += 1
+ self.strobe_watchdog()
results = yield self._device.omci_cc.send(frame)
- self.stop_if_not_running()
omci_msg = results.fields['omci_message']
status = omci_msg.fields['success_code']
@@ -230,8 +226,8 @@
try:
# Get the number of requests needed
frame = OmciFrame(message_type_table=True).get()
+ self.strobe_watchdog()
results = yield self._device.omci_cc.send(frame)
- self.stop_if_not_running()
omci_msg = results.fields['omci_message']
status = omci_msg.fields['success_code']
@@ -245,14 +241,14 @@
seq_no = 0
data_buffer = list()
- self.log.debug('me-type-count', octets=count, data=data)
+ self.log.debug('me-type-count', octets=count, data=hexlify(data))
# Start the loop
for offset in xrange(0, count, self._pdu_size):
frame = OmciFrame(message_type_table=seq_no).get_next()
seq_no += 1
+ self.strobe_watchdog()
results = yield self._device.omci_cc.send(frame)
- self.stop_if_not_running()
omci_msg = results.fields['omci_message']
status = omci_msg.fields['success_code']
diff --git a/voltha/extensions/omci/tasks/reboot_task.py b/voltha/extensions/omci/tasks/reboot_task.py
index 8cbf808..316e23b 100644
--- a/voltha/extensions/omci/tasks/reboot_task.py
+++ b/voltha/extensions/omci/tasks/reboot_task.py
@@ -94,6 +94,7 @@
try:
frame = OntGFrame().reboot(reboot_code=self._flags)
+ self.strobe_watchdog()
results = yield self._device.omci_cc.send(frame, timeout=self._timeout)
status = results.fields['omci_message'].fields['success_code']
@@ -109,7 +110,7 @@
raise RebootException(msg)
self.log.info('reboot-success')
- self.deferred.callback(None)
+ self.deferred.callback(self)
except TimeoutError:
self.log.info('timeout', msg='Request timeout is not considered an error')
diff --git a/voltha/extensions/omci/tasks/task.py b/voltha/extensions/omci/tasks/task.py
index 9e10c65..36020c0 100644
--- a/voltha/extensions/omci/tasks/task.py
+++ b/voltha/extensions/omci/tasks/task.py
@@ -14,7 +14,13 @@
# limitations under the License.
#
import structlog
-from twisted.internet import defer
+from twisted.internet import defer, reactor
+from twisted.python import failure
+
+
+class WatchdogTimeoutFailure(Exception):
+ """Task callback/errback not called properly before watchdog expiration"""
+ pass
class Task(object):
@@ -35,10 +41,14 @@
DEFAULT_PRIORITY = 128
MIN_PRIORITY = 0
MAX_PRIORITY = 255
+ DEFAULT_WATCHDOG_SECS = 10 # 10 seconds
+ MIN_WATCHDOG_SECS = 3 # 3 seconds
+ MAX_WATCHDOG_SECS = 60 # 60 seconds
+
_next_task_id = 0
def __init__(self, name, omci_agent, device_id, priority=DEFAULT_PRIORITY,
- exclusive=True):
+ exclusive=True, watchdog_timeout=DEFAULT_WATCHDOG_SECS):
"""
Class initialization
@@ -47,10 +57,15 @@
:param priority: (int) Task priority (0..255) 255 Highest
:param exclusive: (bool) If True, this task needs exclusive access to the
OMCI Communications channel when it runs
+        :param watchdog_timeout: (int or float) Watchdog timeout (seconds) after task start; to
+                                 run longer, periodically call 'strobe_watchdog()' to reschedule.
"""
assert Task.MIN_PRIORITY <= priority <= Task.MAX_PRIORITY, \
'Priority should be {}..{}'.format(Task.MIN_PRIORITY, Task.MAX_PRIORITY)
+ assert Task.MIN_WATCHDOG_SECS <= watchdog_timeout <= Task.MAX_WATCHDOG_SECS, \
+            'Watchdog timeout should be {}..{} seconds'.format(Task.MIN_WATCHDOG_SECS, Task.MAX_WATCHDOG_SECS)
+
Task._next_task_id += 1
self._task_id = Task._next_task_id
self.log = structlog.get_logger(device_id=device_id, name=name,
@@ -60,13 +75,14 @@
self.omci_agent = omci_agent
self._running = False
self._exclusive = exclusive
- # TODO: Should we watch for a cancel on the task's deferred as well?
self._deferred = defer.Deferred() # Fires upon completion
+ self._watchdog = None
+ self._watchdog_timeout = watchdog_timeout
self._priority = priority
def __str__(self):
- return 'Task: {}, ID:{}, Priority: {}, Exclusive: {}'.format(
- self.name, self.task_id, self.priority, self.exclusive)
+ return 'Task: {}, ID:{}, Priority: {}, Exclusive: {}, Watchdog: {}'.format(
+ self.name, self.task_id, self.priority, self.exclusive, self.watchdog_timeout)
@property
def priority(self):
@@ -81,6 +97,10 @@
return self._exclusive
@property
+ def watchdog_timeout(self):
+ return self._watchdog_timeout
+
+ @property
def deferred(self):
return self._deferred
@@ -94,12 +114,15 @@
return self._running
def cancel_deferred(self):
- d, self._deferred = self._deferred, None
- try:
- if d is not None and not d.called:
- d.cancel()
- except:
- pass
+ d1, self._deferred = self._deferred, None
+ d2, self._watchdog = self._watchdog, None
+
+ for d in [d1, d2]:
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
def start(self):
"""
@@ -109,6 +132,7 @@
assert self._deferred is not None and not self._deferred.called, \
'Cannot re-use the same task'
self._running = True
+ self.strobe_watchdog()
def stop(self):
"""
@@ -118,3 +142,47 @@
self._running = False
self.cancel_deferred()
self.omci_agent = None # Should only start/stop once
+
+ def task_cleanup(self):
+ """
+ This method should only be called from the TaskRunner's callback/errback
+ that is added when the task is initially queued. It is responsible for
+    clearing of the 'running' flag and canceling of the watchdog timer
+ """
+ self._running = False
+ d, self._watchdog = self._watchdog, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def strobe_watchdog(self):
+ """
+ Signal that we have not hung/deadlocked
+ """
+ # Create if first time (called at Task start)
+
+ def watchdog_timeout():
+ # Task may have hung (blocked) or failed to call proper success/error
+ # completion callback/errback
+ if not self.deferred.called:
+ err_msg = 'Task {}:{} watchdog timeout'.format(self.name, self.task_id)
+ self.log.error("task-watchdog-timeout", running=self.running,
+ timeout=self.watchdog_timeout, error=err_msg)
+
+ self.deferred.errback(failure.Failure(WatchdogTimeoutFailure(err_msg)))
+ self.deferred.cancel()
+
+ if self._watchdog is not None:
+ if self._watchdog.called:
+ # Too late, timeout failure in progress
+ self.log.warn('task-watchdog-tripped', running=self.running,
+ timeout=self.watchdog_timeout)
+ return
+
+ d, self._watchdog = self._watchdog, None
+ d.cancel()
+
+ # Schedule/re-schedule the watchdog timer
+ self._watchdog = reactor.callLater(self.watchdog_timeout, watchdog_timeout)
diff --git a/voltha/extensions/omci/tasks/task_runner.py b/voltha/extensions/omci/tasks/task_runner.py
index e06ae1c..364e0b3 100644
--- a/voltha/extensions/omci/tasks/task_runner.py
+++ b/voltha/extensions/omci/tasks/task_runner.py
@@ -30,6 +30,8 @@
self._successful_tasks = 0
self._failed_tasks = 0
+ self._watchdog_timeouts = 0
+ self._last_watchdog_failure_task = ''
def __str__(self):
return 'TaskRunner: Pending: {}, Running:{}'.format(self.pending_tasks,
@@ -64,6 +66,15 @@
def failed_tasks(self):
return self._failed_tasks
+ @property
+ def watchdog_timeouts(self):
+ return self._watchdog_timeouts
+
+ @property
+ def last_watchdog_failure_task(self):
+        """ Task name of the last task to fail due to watchdog"""
+ return self._last_watchdog_failure_task
+
# TODO: add properties for various stats as needed
def start(self):
@@ -165,6 +176,7 @@
assert task is not None and task.task_id in self._running_queue,\
'Task not found in running queue'
+ task.task_cleanup()
self._successful_tasks += 1
del self._running_queue[task.task_id]
@@ -183,6 +195,8 @@
:param task: (Task) The task that failed
:return: (Failure) Failure results
"""
+ from voltha.extensions.omci.tasks.task import WatchdogTimeoutFailure
+
self.log.debug('task-failure', task_id=str(task),
running=len(self._running_queue),
pending=len(self._pending_queue))
@@ -190,9 +204,14 @@
assert task is not None and task.task_id in self._running_queue,\
'Task not found in running queue'
+ task.task_cleanup()
self._failed_tasks += 1
del self._running_queue[task.task_id]
+ if isinstance(failure.value, WatchdogTimeoutFailure):
+ self._watchdog_timeouts += 1
+ self._last_watchdog_failure_task = task.name
+
except Exception as e:
# Check the pending queue