VOL-945: Added support for OpenOMCI Task watchdog

Change-Id: Ie81031183db159514f00bcf764ab484d19335ddb
diff --git a/tests/utests/voltha/extensions/omci/mock/mock_task.py b/tests/utests/voltha/extensions/omci/mock/mock_task.py
index 499a945..088ec92 100644
--- a/tests/utests/voltha/extensions/omci/mock/mock_task.py
+++ b/tests/utests/voltha/extensions/omci/mock/mock_task.py
@@ -25,7 +25,8 @@
                  success=True,
                  delay=0,
                  value=None,
-                 priority=Task.DEFAULT_PRIORITY):
+                 priority=Task.DEFAULT_PRIORITY,
+                 watchdog_timeout=Task.DEFAULT_WATCHDOG_SECS):
         """
         Class initialization
 
@@ -35,6 +36,7 @@
         :param success: (bool) True if the task should complete successfully
         :param delay: (int/float) Time it takes the task to complete
         :param priority (int) Priority of the task
+        :param watchdog_timeout: (int or float) Watchdog timeout after task start
         :param value: (various) The value (string, int, ...) to return if successful
                                 or an Exception to send to the errBack if 'success'
                                 is False
@@ -43,7 +45,8 @@
                                          omci_agent,
                                          device_id,
                                          exclusive=exclusive,
-                                         priority=priority)
+                                         priority=priority,
+                                         watchdog_timeout=watchdog_timeout)
         self._delay = delay
         self._success = success
         self._value = value
diff --git a/tests/utests/voltha/extensions/omci/test_task_runner.py b/tests/utests/voltha/extensions/omci/test_task_runner.py
index e15541b..cd0e043 100644
--- a/tests/utests/voltha/extensions/omci/test_task_runner.py
+++ b/tests/utests/voltha/extensions/omci/test_task_runner.py
@@ -54,6 +54,12 @@
         self.runner.stop()
         self.assertFalse(self.runner.active)
 
+    def unexpected_error(self, _failure):
+        self.assertEqual('Should not be here, expected success', _failure)
+
+    def unexpected_success(self, _results):
+        self.assertEqual('Should not be here, expected a failure', _results)
+
     def test_simple_task_init(self):
         t = SimpleTask(None, DEVICE_ID,
                        exclusive=True, priority=0,
@@ -64,19 +70,35 @@
         self.assertTrue(t.exclusive)
         self.assertFalse(t.deferred.called)
 
-    @raises(AssertionError)
-    def test_simple_negative_priority(self):
-        SimpleTask(None, DEVICE_ID, priority=-1)
+    def test_task_defaults_and_bound(self):
+        # Make sure no one has changed some declared defaults/max/min values
+        from voltha.extensions.omci.tasks.task import Task
+        self.assertEqual(128, Task.DEFAULT_PRIORITY)
+        self.assertEqual(0, Task.MIN_PRIORITY)
+        self.assertEqual(255, Task.MAX_PRIORITY)
+        self.assertEqual(10, Task.DEFAULT_WATCHDOG_SECS)
+        self.assertEqual(3, Task.MIN_WATCHDOG_SECS)
+        self.assertEqual(60, Task.MAX_WATCHDOG_SECS)
 
     @raises(AssertionError)
-    def test_simple_big_priority(self):
-        SimpleTask(None, DEVICE_ID, priority=256)
+    def test_task_priority_min(self):
+        from voltha.extensions.omci.tasks.task import Task
+        _ = SimpleTask(None, DEVICE_ID, priority=Task.MIN_PRIORITY - 1)
 
-    def unexpected_error(self, _failure):
-        self.assertEqual('Should not be here, expected success', _failure)
+    @raises(AssertionError)
+    def test_task_priority_max(self):
+        from voltha.extensions.omci.tasks.task import Task
+        _ = SimpleTask(None, DEVICE_ID, priority=Task.MAX_PRIORITY + 1)
 
-    def unexpected_success(self, _results):
-        self.assertEqual('Should not be here, expected a failure', _results)
+    @raises(AssertionError)
+    def test_task_watchdog_min(self):
+        from voltha.extensions.omci.tasks.task import Task
+        _ = SimpleTask(None, DEVICE_ID, watchdog_timeout=Task.MIN_WATCHDOG_SECS - 0.000001)
+
+    @raises(AssertionError)
+    def test_task_watchdog_max(self):
+        from voltha.extensions.omci.tasks.task import Task
+        _ = SimpleTask(None, DEVICE_ID, watchdog_timeout=Task.MAX_WATCHDOG_SECS + 0.000001)
 
     @deferred(timeout=5)
     def test_simple_success(self):
@@ -236,8 +258,38 @@
         self.assertEqual(self.runner.running_tasks, 0)
         return d
 
-    @deferred(timeout=200)
-    def test_cancel_running(self):
+    @raises(CancelledError)
+    @deferred(timeout=2)
+    def test_task_stop_queued(self):
+        t = SimpleTask(None, DEVICE_ID,
+                       exclusive=True, priority=9,
+                       success=True, value=1, delay=0)
+
+        d = self.runner.queue_task(t)
+        self.assertEqual(self.runner.pending_tasks, 1)
+        self.assertEqual(self.runner.running_tasks, 0)
+
+        t.stop()
+        self.assertEqual(self.runner.pending_tasks, 0)
+        self.assertEqual(self.runner.running_tasks, 0)
+        return d
+
+    def test_task_stop_not_queued(self):
+        t = SimpleTask(None, DEVICE_ID,
+                       exclusive=True, priority=9,
+                       success=True, value=1, delay=0)
+
+        self.assertEqual(self.runner.pending_tasks, 0)
+        self.assertEqual(self.runner.running_tasks, 0)
+
+        t.stop()
+        self.assertFalse(t.running)
+
+    @deferred(timeout=3)
+    def test_task_runner_cancel_running(self):
+        # Both tasks run in parallel but t1 will finish first and
+        # will request t2 to terminate by calling the TaskRunner's
+        # cancel task method
         t1 = SimpleTask(None, DEVICE_ID,
                         exclusive=False, priority=9,
                         success=True, value=1, delay=0.5)
@@ -251,11 +303,14 @@
         self.assertEqual(self.runner.pending_tasks, 2)
         self.assertEqual(self.runner.running_tasks, 0)
 
-        def kill_task_t2(_, task_id):
+        def kill_task_t2(_, task_2_id):
+            # Called on successful completion of task t1
+            self.assertIsInstance(task_2_id, int)
             self.assertEqual(self.runner.pending_tasks, 0)
             self.assertEqual(self.runner.running_tasks, 1)
 
-            self.runner.cancel_task(task_id)
+            # Cancel t2 through the task runner using its task ID
+            self.runner.cancel_task(task_2_id)
             self.assertEqual(self.runner.running_tasks, 0)
 
         d1.addCallbacks(kill_task_t2, self.unexpected_error,
@@ -268,11 +323,162 @@
             self.assertEqual(self.runner.successful_tasks_completed, 1)
             self.assertEqual(self.runner.failed_tasks, 1)
 
+        # T2 should not finish successfully, should get a cancel error
         d2.addCallbacks(self.unexpected_success, expected_error)
 
+        # Run it
         self.runner.start()
         return defer.gatherResults([d1, d2], consumeErrors=True)
 
+    @deferred(timeout=3)
+    def test_task_stop_running(self):
+        # Run two tasks where T1 completes first and requests T2 to be
+        # canceled by calling T2's stop method
+
+        t1 = SimpleTask(None, DEVICE_ID,
+                        exclusive=False, priority=9,
+                        success=True, value=1, delay=0.5)
+        t2 = SimpleTask(None, DEVICE_ID,
+                        exclusive=False, priority=9,
+                        success=True, value=1, delay=200)
+
+        d1 = self.runner.queue_task(t1)
+        d2 = self.runner.queue_task(t2)
+
+        self.assertEqual(self.runner.pending_tasks, 2)
+        self.assertEqual(self.runner.running_tasks, 0)
+
+        def kill_task_t2(_, task_2):
+            # Called on successful completion of task t1
+            self.assertIsInstance(task_2, SimpleTask)
+            self.assertEqual(self.runner.pending_tasks, 0)
+            self.assertEqual(self.runner.running_tasks, 1)
+
+            # Cancel by telling the task to stop itself
+            task_2.stop()
+            self.assertEqual(self.runner.running_tasks, 0)
+
+        d1.addCallbacks(kill_task_t2, self.unexpected_error,
+                        callbackArgs=[t2])
+
+        def expected_error(failure):
+            self.assertTrue(isinstance(failure.value, CancelledError))
+            self.assertEqual(self.runner.pending_tasks, 0)
+            self.assertEqual(self.runner.running_tasks, 0)
+            self.assertEqual(self.runner.successful_tasks_completed, 1)
+            self.assertEqual(self.runner.failed_tasks, 1)
+
+        # T2 should not finish successfully, should get a cancel error
+        d2.addCallbacks(self.unexpected_success, expected_error)
+
+        # Run it
+        self.runner.start()
+        return defer.gatherResults([d1, d2], consumeErrors=True)
+
+    @deferred(timeout=3)
+    def test_task_cancel_not_queued(self):
+        t = SimpleTask(None, DEVICE_ID,
+                       exclusive=True, priority=9,
+                       success=True, value=1, delay=0)
+
+        self.assertEqual(self.runner.pending_tasks, 0)
+        self.assertEqual(self.runner.running_tasks, 0)
+
+        def expected_error(failure):
+            self.assertTrue(isinstance(failure.value, CancelledError))
+            self.assertEqual(self.runner.pending_tasks, 0)
+            self.assertEqual(self.runner.running_tasks, 0)
+            self.assertEqual(self.runner.successful_tasks_completed, 0)
+            self.assertEqual(self.runner.failed_tasks, 0)
+            # self.fail(msg='made it here')    # Uncomment to verify called
+
+        t.deferred.addCallbacks(self.unexpected_success, expected_error)
+
+        self.runner.start()
+        t.deferred.cancel()
+        self.assertFalse(t.running)
+        return t.deferred
+
+    @deferred(timeout=3)
+    def test_task_deferred_cancel_running(self):
+        # Run two tasks where T1 completes first and requests T2 to be
+        # canceled by doing a 'cancel' on T2's deferred
+
+        t1 = SimpleTask(None, DEVICE_ID,
+                        exclusive=False, priority=9,
+                        success=True, value=1, delay=0.5)
+        t2 = SimpleTask(None, DEVICE_ID,
+                        exclusive=False, priority=9,
+                        success=True, value=1, delay=200)
+
+        d1 = self.runner.queue_task(t1)
+        d2 = self.runner.queue_task(t2)
+
+        self.assertEqual(self.runner.pending_tasks, 2)
+        self.assertEqual(self.runner.running_tasks, 0)
+
+        def kill_task_t2(_, deferred_2):
+            # Called on successful completion of task t1
+            self.assertIsInstance(deferred_2, defer.Deferred)
+            self.assertEqual(self.runner.pending_tasks, 0)
+            self.assertEqual(self.runner.running_tasks, 1)
+
+            # Cancel the deferred for T2
+            deferred_2.cancel()
+            self.assertEqual(self.runner.running_tasks, 0)
+
+        d1.addCallbacks(kill_task_t2, self.unexpected_error,
+                        callbackArgs=[t2.deferred])
+
+        def expected_error(failure):
+            self.assertTrue(isinstance(failure.value, CancelledError))
+            self.assertEqual(self.runner.pending_tasks, 0)
+            self.assertEqual(self.runner.running_tasks, 0)
+            self.assertEqual(self.runner.successful_tasks_completed, 1)
+            self.assertEqual(self.runner.failed_tasks, 1)
+            # self.fail(msg='made it here')    # Uncomment to verify called
+
+        # T2 should not finish successfully, should get a cancel error
+        d2.addCallbacks(self.unexpected_success, expected_error)
+
+        # Run it
+        self.runner.start()
+        return defer.gatherResults([d1, d2], consumeErrors=True)
+
+    @deferred(timeout=3)
+    def test_watchdog_timeout(self):
+        t = SimpleTask(None, DEVICE_ID, delay=2)
+
+        self.assertEqual(self.runner.pending_tasks, 0)
+        self.assertEqual(self.runner.running_tasks, 0)
+        self.assertEqual(self.runner.watchdog_timeouts, 0)
+
+        # Actual watchdog minimum is probably too long for an automated test, reach
+        # around and force it to something smaller (kids, don't try this at home)
+
+        t._watchdog_timeout = 0.1
+        self.runner.queue_task(t)
+
+        self.assertEqual(self.runner.pending_tasks, 1)
+        self.assertEqual(self.runner.running_tasks, 0)
+
+        def expected_error(failure):
+            from voltha.extensions.omci.tasks.task import WatchdogTimeoutFailure
+            self.assertTrue(isinstance(failure.value, WatchdogTimeoutFailure))
+            self.assertEqual(self.runner.pending_tasks, 0)
+            self.assertEqual(self.runner.running_tasks, 0)
+            self.assertEqual(self.runner.successful_tasks_completed, 0)
+            self.assertEqual(self.runner.failed_tasks, 1)
+            self.assertEqual(self.runner.watchdog_timeouts, 1)
+            self.assertEqual(self.runner.last_watchdog_failure_task, t.name)
+            # self.fail(msg='made it here')    # Uncomment to verify called
+
+        t.deferred.addCallbacks(self.unexpected_success, expected_error)
+
+        # Run it
+        self.runner.start()
+        return t.deferred
+
 
 if __name__ == '__main__':
     main()
diff --git a/voltha/adapters/adtran_onu/omci/adtn_mib_download_task.py b/voltha/adapters/adtran_onu/omci/adtn_mib_download_task.py
index a85372c..e782913 100644
--- a/voltha/adapters/adtran_onu/omci/adtn_mib_download_task.py
+++ b/voltha/adapters/adtran_onu/omci/adtn_mib_download_task.py
@@ -133,9 +133,6 @@
         :param operation: (str) what operation was being performed
         :return: True if successful, False if the entity existed (already created)
         """
-        if not self.running:
-            raise MibDownloadFailure('Download Task was cancelled')
-
         omci_msg = results.fields['omci_message'].fields
         status = omci_msg['success_code']
         error_mask = omci_msg.get('parameter_error_attributes_mask', 'n/a')
@@ -146,6 +143,7 @@
                        failed_mask=failed_mask, unsupported_mask=unsupported_mask)
 
         if status == RC.Success:
+            self.strobe_watchdog()
             return True
 
         elif status == RC.InstanceExists:
@@ -178,6 +176,7 @@
             try:
                 # Lock the UNI ports to prevent any alarms during initial configuration
                 # of the ONU
+                self.strobe_watchdog()
                 yield self.enable_unis(self._handler.uni_ports, True)
 
                 # Provision the initial bridge configuration
diff --git a/voltha/adapters/brcm_openomci_onu/omci/brcm_mib_download_task.py b/voltha/adapters/brcm_openomci_onu/omci/brcm_mib_download_task.py
index 2c9c961..b325ca7 100644
--- a/voltha/adapters/brcm_openomci_onu/omci/brcm_mib_download_task.py
+++ b/voltha/adapters/brcm_openomci_onu/omci/brcm_mib_download_task.py
@@ -146,9 +146,6 @@
         """
         self.log.debug('function-entry')
 
-        if not self.running:
-            raise MibDownloadFailure('Download Task was cancelled')
-
         omci_msg = results.fields['omci_message'].fields
         status = omci_msg['success_code']
         error_mask = omci_msg.get('parameter_error_attributes_mask', 'n/a')
@@ -159,6 +156,7 @@
                        failed_mask=failed_mask, unsupported_mask=unsupported_mask)
 
         if status == RC.Success:
+            self.strobe_watchdog()
             return True
 
         elif status == RC.InstanceExists:
@@ -191,6 +189,7 @@
             try:
                 # Lock the UNI ports to prevent any alarms during initial configuration
                 # of the ONU
+                self.strobe_watchdog()
                 yield self.enable_uni(self._uni_port, True)
 
                 # Provision the initial bridge configuration
diff --git a/voltha/adapters/brcm_openomci_onu/omci/brcm_uni_lock_task.py b/voltha/adapters/brcm_openomci_onu/omci/brcm_uni_lock_task.py
index 2a12eda..b998b0e 100644
--- a/voltha/adapters/brcm_openomci_onu/omci/brcm_uni_lock_task.py
+++ b/voltha/adapters/brcm_openomci_onu/omci/brcm_uni_lock_task.py
@@ -65,10 +65,6 @@
         except:
             pass
 
-    def stop_if_not_running(self):
-        if not self.running:
-            raise BrcmUniLockException('UNI Lock Task was cancelled')
-
     def start(self):
         """
         Start UNI/PPTP Lock/Unlock Task
@@ -91,7 +87,7 @@
             frame = msg.set()
             self.log.debug('openomci-msg', msg=msg)
             results = yield self._device.omci_cc.send(frame)
-            self.stop_if_not_running()
+            self.strobe_watchdog()
 
             status = results.fields['omci_message'].fields['success_code']
             self.log.info('response-status', status=status)
@@ -106,11 +102,11 @@
 
             for key, value in pptp.iteritems():
                 msg = PptpEthernetUniFrame(key,
-                                       attributes=dict(administrative_state=state))
+                                           attributes=dict(administrative_state=state))
                 frame = msg.set()
                 self.log.debug('openomci-msg', msg=msg)
                 results = yield self._device.omci_cc.send(frame)
-                self.stop_if_not_running()
+                self.strobe_watchdog()
 
                 status = results.fields['omci_message'].fields['success_code']
                 self.log.info('response-status', status=status)
diff --git a/voltha/core/global_handler.py b/voltha/core/global_handler.py
index aed018e..54c2d28 100644
--- a/voltha/core/global_handler.py
+++ b/voltha/core/global_handler.py
@@ -46,6 +46,7 @@
     MulticastGemportsConfigData
 from voltha.protos.bbf_fiber_multicast_distribution_set_body_pb2 import \
     MulticastDistributionSetData
+from voltha.protos.omci_mib_db_pb2 import MibDeviceData
 from voltha.protos.omci_alarm_db_pb2 import AlarmDeviceData
 
 log = structlog.get_logger()
diff --git a/voltha/extensions/omci/state_machines/mib_sync.py b/voltha/extensions/omci/state_machines/mib_sync.py
index c725cce..ebb5124 100644
--- a/voltha/extensions/omci/state_machines/mib_sync.py
+++ b/voltha/extensions/omci/state_machines/mib_sync.py
@@ -902,3 +902,20 @@
         return self._database.query(self._device_id, class_id=class_id,
                                     instance_id=instance_id,
                                     attributes=attributes)
+
+    def mib_set(self, class_id, entity_id, attributes):
+        """
+        Set attributes of an existing ME Class instance
+
+        This method is primarily used by other state machines to save ME specific
+        information to the persistent database. Access by objects external to the
+        OpenOMCI library is discouraged.
+
+        :param class_id: (int) ME Class ID
+        :param entity_id: (int) ME Class entity ID
+        :param attributes: (dict) attribute -> value pairs to set
+        """
+        # It must exist first (but attributes can be new)
+        if isinstance(attributes, dict) and len(attributes) and\
+                self.query_mib(class_id, entity_id) is not None:
+            self._database.set(self._device_id, class_id, entity_id, attributes)
diff --git a/voltha/extensions/omci/tasks/alarm_resync_task.py b/voltha/extensions/omci/tasks/alarm_resync_task.py
index b8b8e69..2278296 100644
--- a/voltha/extensions/omci/tasks/alarm_resync_task.py
+++ b/voltha/extensions/omci/tasks/alarm_resync_task.py
@@ -53,6 +53,7 @@
 
     max_alarm_upload_next_retries = 3
     alarm_upload_next_delay = 10          # Max * delay < 60 seconds
+    watchdog_timeout = 15                 # Should be > any retry delay
 
     def __init__(self, omci_agent, device_id):
         """
@@ -62,10 +63,11 @@
         :param device_id: (str) ONU Device ID
         """
         super(AlarmResyncTask, self).__init__(AlarmResyncTask.name,
-                                            omci_agent,
-                                            device_id,
-                                            priority=AlarmResyncTask.task_priority,
-                                            exclusive=False)
+                                              omci_agent,
+                                              device_id,
+                                              priority=AlarmResyncTask.task_priority,
+                                              exclusive=False,
+                                              watchdog_timeout=AlarmResyncTask.watchdog_timeout)
         self._local_deferred = None
         self._device = omci_agent.get_device(device_id)
         self._db_active = MibDbVolatileDict(omci_agent)
@@ -102,10 +104,6 @@
         self._db_active = None
         super(AlarmResyncTask, self).stop()
 
-    def stop_if_not_running(self):
-        if not self.running:
-            raise AlarmResyncException('Resync Task was cancelled')
-
     @inlineCallbacks
     def perform_alarm_resync(self):
         """
@@ -122,9 +120,11 @@
         self.log.info('perform-alarm-resync')
 
         try:
+            self.strobe_watchdog()
             command_sequence_number = yield self.snapshot_alarm()
 
             # Start the ALARM upload sequence, save alarms to the table
+            self.strobe_watchdog()
             commands_retrieved, alarm_table = yield self.upload_alarm(command_sequence_number)
 
             if commands_retrieved < command_sequence_number:
@@ -158,7 +158,7 @@
                 # Send ALARM Upload so ONU snapshots its ALARM
                 try:
                     command_sequence_number = yield self.send_alarm_upload()
-                    self.stop_if_not_running()
+                    self.strobe_watchdog()
 
                     if command_sequence_number is None:
                         if retries >= max_tries:
@@ -169,8 +169,8 @@
                     if retries >= max_tries:
                         raise
 
+                    self.strobe_watchdog()
                     yield asleep(AlarmResyncTask.retry_delay)
-                    self.stop_if_not_running()
                     continue
 
         except Exception as e:
@@ -181,7 +181,7 @@
 
         if command_sequence_number is None:
             raise AlarmCopyException('Failed to snapshot ALARM copy after {} retries'.
-                                   format(AlarmResyncTask.max_retries))
+                                     format(AlarmResyncTask.max_retries))
 
         returnValue(command_sequence_number)
 
@@ -195,8 +195,9 @@
         ########################################
         # Begin ALARM Upload
         try:
+            self.strobe_watchdog()
             results = yield self._device.omci_cc.send_get_all_alarm()
-            self.stop_if_not_running()
+
             command_sequence_number = results.fields['omci_message'].fields['number_of_commands']
 
             if command_sequence_number is None or command_sequence_number <= 0:
@@ -222,8 +223,8 @@
 
             for retries in xrange(0, max_tries):
                 try:
+                    self.strobe_watchdog()
                     response = yield self._device.omci_cc.get_all_alarm_next(seq_no)
-                    self.stop_if_not_running()
 
                     omci_msg = response.fields['omci_message'].fields
                     alarm_class_id[seq_no] = omci_msg['alarmed_entity_class']
@@ -248,6 +249,7 @@
                                   command_sequence_number=command_sequence_number)
 
                     if retries < max_tries - 1:
+                        self.strobe_watchdog()
                         yield asleep(AlarmResyncTask.alarm_upload_next_delay)
                     else:
                         raise
@@ -256,5 +258,6 @@
                     self.log.exception('resync', e=e, seq_no=seq_no,
                                        command_sequence_number=command_sequence_number)
 
+        self.strobe_watchdog()
         returnValue((seq_no + 1, alarm_class_id, alarm_entity_id, attributes))     # seq_no is zero based and alarm table.
 
diff --git a/voltha/extensions/omci/tasks/alarm_sync_data.py b/voltha/extensions/omci/tasks/alarm_sync_data.py
index 115ef9e..331fe1b 100644
--- a/voltha/extensions/omci/tasks/alarm_sync_data.py
+++ b/voltha/extensions/omci/tasks/alarm_sync_data.py
@@ -72,10 +72,6 @@
         self.cancel_deferred()
         super(AlarmSyncDataTask, self).stop()
 
-    def stop_if_not_running(self):
-        if not self.running:
-            raise AlarmSyncDataFailure('Update Task was cancelled')
-
     @inlineCallbacks
     def perform_alarm_sync_data(self):
         """
@@ -89,8 +85,9 @@
             #########################################
             # ONU Data (ME #2)
             # alarm_retrieval_mode=1, time=DEFAULT_OMCI_TIMEOUT
+            self.strobe_watchdog()
             results = yield device.omci_cc.send_get_all_alarm(alarm_retrieval_mode=1)
-            self.stop_if_not_running()
+
             command_sequence_number = results.fields['omci_message'].fields['number_of_commands']
 
             for seq_no in xrange(command_sequence_number):
@@ -102,8 +99,9 @@
                         self.log.debug('alarm-data-next-request', seq_no=seq_no,
                                        retry=retry,
                                        command_sequence_number=command_sequence_number)
+                        self.strobe_watchdog()
                         yield device.omci_cc.send_get_all_alarm_next(seq_no)
-                        self.stop_if_not_running()
+
                         self.log.debug('alarm-data-next-success', seq_no=seq_no,
                                        command_sequence_number=command_sequence_number)
                         break
@@ -115,8 +113,9 @@
                         if retry >= 2:
                             raise AlarmSyncDataFailure('Alarm timeout failure on req {} of {}'.
                                                        format(seq_no + 1, command_sequence_number))
+
+                        self.strobe_watchdog()
                         yield asleep(0.3)
-                        self.stop_if_not_running()
 
             # Successful if here
             self.log.info('alarm-synchronized')
diff --git a/voltha/extensions/omci/tasks/file_download_task.py b/voltha/extensions/omci/tasks/file_download_task.py
index 7d8719d..5234c41 100755
--- a/voltha/extensions/omci/tasks/file_download_task.py
+++ b/voltha/extensions/omci/tasks/file_download_task.py
@@ -20,12 +20,14 @@
 import requests
 import os
 
+
 class FileDownloadTask(Task):
-    task_priority = 250
     name = "Image File Download Task"
 
     def __init__(self, omci_agent, device_id, url, local_path):
-        super(FileDownloadTask, self).__init__(FileDownloadTask.name, omci_agent, device_id)
+        super(FileDownloadTask, self).__init__(FileDownloadTask.name, omci_agent, device_id,
+                                               exclusive=False,
+                                               watchdog_timeout=45)
         self.url = url
         self.local_path = local_path
         # self.log.debug('{} running'.format(FileDownloadTask.name))
@@ -38,10 +40,13 @@
             dir_name = os.path.dirname(self.local_path)
             if not os.path.exists(dir_name):
                 os.makedirs(dir_name)
-                
+
+            self.strobe_watchdog()
             r = requests.get(self.url, stream=True)
+
             with open(self.local_path, 'wb') as f:
-                for chunk in r.iter_content(chunk_size=1024): 
+                for chunk in r.iter_content(chunk_size=1024):
+                    self.strobe_watchdog()
                     if chunk: # filter out keep-alive new chunks
                         f.write(chunk)
             self.deferred.callback('device {} success downloaded {} '.format(self.device_id, self.url))
@@ -52,4 +57,4 @@
     def stop(self):
         self.cancel_deferred()
         super(FileDownloadTask, self).stop()
-        
+
diff --git a/voltha/extensions/omci/tasks/get_mds_task.py b/voltha/extensions/omci/tasks/get_mds_task.py
index e2f11fd..3807f7d 100644
--- a/voltha/extensions/omci/tasks/get_mds_task.py
+++ b/voltha/extensions/omci/tasks/get_mds_task.py
@@ -83,6 +83,7 @@
             #########################################
             # Request (MDS supplied value does not matter for a 'get' request)
 
+            self.strobe_watchdog()
             results = yield device.omci_cc.send(OntDataFrame().get())
 
             omci_msg = results.fields['omci_message'].fields
diff --git a/voltha/extensions/omci/tasks/interval_data_task.py b/voltha/extensions/omci/tasks/interval_data_task.py
index 04cc8eb..9475cd8 100644
--- a/voltha/extensions/omci/tasks/interval_data_task.py
+++ b/voltha/extensions/omci/tasks/interval_data_task.py
@@ -148,6 +148,7 @@
             self.log.debug('interval-get-request', class_id=self._class_id,
                            entity_id=self._entity_id)
             try:
+                self.strobe_watchdog()
                 results = yield device.omci_cc.send(frame)
 
                 omci_msg = results.fields['omci_message'].fields
diff --git a/voltha/extensions/omci/tasks/mib_resync_task.py b/voltha/extensions/omci/tasks/mib_resync_task.py
index 30a888f..a6b6892 100644
--- a/voltha/extensions/omci/tasks/mib_resync_task.py
+++ b/voltha/extensions/omci/tasks/mib_resync_task.py
@@ -53,6 +53,7 @@
 
     max_mib_upload_next_retries = 3
     mib_upload_next_delay = 10          # Max * delay < 60 seconds
+    watchdog_timeout = 15               # Should be > max delay
 
     def __init__(self, omci_agent, device_id):
         """
@@ -102,10 +103,6 @@
         self._db_active = None
         super(MibResyncTask, self).stop()
 
-    def stop_if_not_running(self):
-        if not self.running:
-            raise MibResyncException('Resync Task was cancelled')
-
     @inlineCallbacks
     def perform_mib_resync(self):
         """
@@ -138,6 +135,7 @@
                 number_of_commands = results[1]
 
                 # Start the MIB upload sequence
+                self.strobe_watchdog()
                 commands_retrieved = yield self.upload_mib(number_of_commands)
 
                 if commands_retrieved < number_of_commands:
@@ -176,8 +174,8 @@
             for retries in xrange(0, max_tries + 1):
                 # Send MIB Upload so ONU snapshots its MIB
                 try:
+                    self.strobe_watchdog()
                     number_of_commands = yield self.send_mib_upload()
-                    self.stop_if_not_running()
 
                     if number_of_commands is None:
                         if retries >= max_tries:
@@ -189,8 +187,8 @@
                     if retries >= max_tries:
                         raise
 
+                    self.strobe_watchdog()
                     yield asleep(MibResyncTask.db_copy_retry_delay)
-                    self.stop_if_not_running()
                     continue
 
                 # Get a snapshot of the local MIB database
@@ -220,8 +218,9 @@
         ########################################
         # Begin MIB Upload
         try:
+            self.strobe_watchdog()
             results = yield self._device.omci_cc.send_mib_upload()
-            self.stop_if_not_running()
+
             number_of_commands = results.fields['omci_message'].fields['number_of_commands']
 
             if number_of_commands is None or number_of_commands <= 0:
@@ -244,8 +243,8 @@
 
             for retries in xrange(0, max_tries):
                 try:
+                    self.strobe_watchdog()
                     response = yield self._device.omci_cc.send_mib_upload_next(seq_no)
-                    self.stop_if_not_running()
 
                     omci_msg = response.fields['omci_message'].fields
                     class_id = omci_msg['object_entity_class']
@@ -269,6 +268,7 @@
                                   number_of_commands=number_of_commands)
 
                     if retries < max_tries - 1:
+                        self.strobe_watchdog()
                         yield asleep(MibResyncTask.mib_upload_next_delay)
                     else:
                         raise
@@ -287,6 +287,8 @@
         :param db_active: (dict) ONU's database snapshot
         :return: (dict), (dict), dict()  Differences
         """
+        self.strobe_watchdog()
+
         # Class & Entities only in local copy (OpenOMCI)
         on_olt_only = self.get_lsh_only_dict(db_copy, db_active)
 
diff --git a/voltha/extensions/omci/tasks/mib_upload.py b/voltha/extensions/omci/tasks/mib_upload.py
index eb0fa6b..639f8fd 100644
--- a/voltha/extensions/omci/tasks/mib_upload.py
+++ b/voltha/extensions/omci/tasks/mib_upload.py
@@ -80,10 +80,6 @@
         self.cancel_deferred()
         super(MibUploadTask, self).stop()
 
-    def stop_if_not_running(self):
-        if not self.running:
-            raise MibUploadFailure('Upload Task was cancelled')
-
     @inlineCallbacks
     def perform_mib_upload(self):
         """
@@ -99,8 +95,8 @@
 
             #########################################
             # MIB Reset
+            self.strobe_watchdog()
             results = yield device.omci_cc.send_mib_reset()
-            self.stop_if_not_running()
 
             status = results.fields['omci_message'].fields['success_code']
             if status != ReasonCodes.Success.value:
@@ -109,8 +105,9 @@
 
             ########################################
             # Begin MIB Upload
+            self.strobe_watchdog()
             results = yield device.omci_cc.send_mib_upload()
-            self.stop_if_not_running()
+
             number_of_commands = results.fields['omci_message'].fields['number_of_commands']
 
             for seq_no in xrange(number_of_commands):
@@ -122,8 +119,9 @@
                         self.log.debug('mib-upload-next-request', seq_no=seq_no,
                                        retry=retry,
                                        number_of_commands=number_of_commands)
+                        self.strobe_watchdog()
                         yield device.omci_cc.send_mib_upload_next(seq_no)
-                        self.stop_if_not_running()
+
                         self.log.debug('mib-upload-next-success', seq_no=seq_no,
                                        number_of_commands=number_of_commands)
                         break
@@ -135,8 +133,8 @@
                         if retry >= 2:
                             raise MibUploadFailure('Upload timeout failure on req {} of {}'.
                                                    format(seq_no + 1, number_of_commands))
+                        self.strobe_watchdog()
                         yield asleep(0.3)
-                        self.stop_if_not_running()
 
             # Successful if here
             self.log.info('mib-synchronized')
@@ -158,4 +156,3 @@
         except Exception as e:
             self.log.exception('mib-upload', e=e)
             self.deferred.errback(failure.Failure(e))
-
diff --git a/voltha/extensions/omci/tasks/omci_create_pm_task.py b/voltha/extensions/omci/tasks/omci_create_pm_task.py
index e818d87..e0e92c3 100644
--- a/voltha/extensions/omci/tasks/omci_create_pm_task.py
+++ b/voltha/extensions/omci/tasks/omci_create_pm_task.py
@@ -65,10 +65,6 @@
         except:
             pass
 
-    def stop_if_not_running(self):
-        if not self.running:
-            raise CreatePMException('Create PM ME Task was cancelled')
-
     def start(self):
         """ Start task """
         super(OmciCreatePMRequest, self).start()
@@ -121,8 +117,8 @@
                             data=data
                         )
                     )
+                self.strobe_watchdog()
                 results = yield self._device.omci_cc.send(frame)
-                self.stop_if_not_running()
 
                 status = results.fields['omci_message'].fields['success_code']
                 self.log.debug('perform-create-status', status=status)
diff --git a/voltha/extensions/omci/tasks/omci_delete_pm_task.py b/voltha/extensions/omci/tasks/omci_delete_pm_task.py
index c80a575..7906a83 100644
--- a/voltha/extensions/omci/tasks/omci_delete_pm_task.py
+++ b/voltha/extensions/omci/tasks/omci_delete_pm_task.py
@@ -63,10 +63,6 @@
         except:
             pass
 
-    def stop_if_not_running(self):
-        if not self.running:
-            raise DeletePMException('Delete PM ME Task was cancelled')
-
     def start(self):
         """ Start task """
         super(OmciDeletePMRequest, self).start()
@@ -90,8 +86,8 @@
                         entity_id=entity_id
                     )
                 )
+                self.strobe_watchdog()
                 results = yield self._device.omci_cc.send(frame)
-                self.stop_if_not_running()
 
                 status = results.fields['omci_message'].fields['success_code']
                 self.log.info('perform-delete-status', status=status)
diff --git a/voltha/extensions/omci/tasks/omci_get_request.py b/voltha/extensions/omci/tasks/omci_get_request.py
index 3bfdf06..b199f8f 100644
--- a/voltha/extensions/omci/tasks/omci_get_request.py
+++ b/voltha/extensions/omci/tasks/omci_get_request.py
@@ -86,10 +86,6 @@
         except:
             pass
 
-    def stop_if_not_running(self):
-        if not self.running:
-            raise GetException('Get Request Task was cancelled')
-
     @property
     def me_class(self):
         """The OMCI Managed Entity Class associated with this request"""
@@ -156,8 +152,8 @@
 
         try:
             frame = MEFrame(self._entity_class, self._entity_id, self._attributes).get()
+            self.strobe_watchdog()
             results = yield self._device.omci_cc.send(frame)
-            self.stop_if_not_running()
 
             status = results.fields['omci_message'].fields['success_code']
             self.log.info('perform-get-status', status=status)
@@ -172,6 +168,7 @@
                     results_omci['attributes_mask']
 
                 if missing_attr > 0:
+                    self.strobe_watchdog()
                     self._local_deferred = reactor.callLater(0,
                                                              self.perform_get_missing_attributes,
                                                              missing_attr)
@@ -185,6 +182,7 @@
                     raise GetException('Get failed with status code: {}'.
                                        format(RC.AttributeFailure.value))
 
+                self.strobe_watchdog()
                 self._local_deferred = reactor.callLater(0,
                                                          self.perform_get_failed_attributes,
                                                          results,
@@ -232,8 +230,8 @@
                     )
                 )
                 try:
+                    self.strobe_watchdog()
                     get_results = yield self._device.omci_cc.send(frame)
-                    self.stop_if_not_running()
 
                     get_omci = get_results.fields['omci_message'].fields
                     if get_omci['success_code'] != RC.Success.value:
@@ -266,8 +264,8 @@
             try:
                 frame = MEFrame(self._entity_class, self._entity_id, {attr}).get()
 
+                self.strobe_watchdog()
                 results = yield self._device.omci_cc.send(frame)
-                self.stop_if_not_running()
 
                 status = results.fields['omci_message'].fields['success_code']
 
diff --git a/voltha/extensions/omci/tasks/omci_modify_request.py b/voltha/extensions/omci/tasks/omci_modify_request.py
index de15011..27dfb3d 100644
--- a/voltha/extensions/omci/tasks/omci_modify_request.py
+++ b/voltha/extensions/omci/tasks/omci_modify_request.py
@@ -78,10 +78,6 @@
         except:
             pass
 
-    def stop_if_not_running(self):
-        if not self.running:
-            raise ModifyException('Modify Request Task was cancelled')
-
     @property
     def success_code(self):
         """
@@ -124,7 +120,6 @@
         """
         For Set requests, a failure may indicate that one or more attributes
         are not supported by this ONU. This property returns any those unsupported attributes
-        attributes
 
         :return: None if not a set request, otherwise the attribute mask of any illegal
                  parameters
@@ -159,8 +154,8 @@
         self.log.info('perform-request')
 
         try:
+            self.strobe_watchdog()
             self._results = yield self._device.omci_cc.send(self._frame)
-            self.stop_if_not_running()
 
             status = self._results.fields['omci_message'].fields['success_code']
             self.log.info('response-status', status=status)
diff --git a/voltha/extensions/omci/tasks/onu_capabilities_task.py b/voltha/extensions/omci/tasks/onu_capabilities_task.py
index 4a8c1d3..e93d0ed 100644
--- a/voltha/extensions/omci/tasks/onu_capabilities_task.py
+++ b/voltha/extensions/omci/tasks/onu_capabilities_task.py
@@ -122,10 +122,6 @@
         self._device = None
         super(OnuCapabilitiesTask, self).stop()
 
-    def stop_if_not_running(self):
-        if not self.running:
-            raise GetCapabilitiesFailure('Get Capabilities Task was cancelled')
-
     @inlineCallbacks
     def perform_get_capabilities(self):
         """
@@ -141,11 +137,11 @@
         self.log.info('perform-get')
 
         try:
+            self.strobe_watchdog()
             self._supported_entities = yield self.get_supported_entities()
-            self.stop_if_not_running()
 
+            self.strobe_watchdog()
             self._supported_msg_types = yield self.get_supported_message_types()
-            self.stop_if_not_running()
 
             self.log.debug('get-success',
                            supported_entities=self.supported_managed_entities,
@@ -164,7 +160,7 @@
         """
         Extract the 4 octet buffer length from the OMCI PDU contents
         """
-        self.log.debug('get-count-buffer', data=data)
+        self.log.debug('get-count-buffer', data=hexlify(data))
         return int(hexlify(data[:4]), 16)
 
     @inlineCallbacks
@@ -175,8 +171,8 @@
         try:
             # Get the number of requests needed
             frame = OmciFrame(me_type_table=True).get()
+            self.strobe_watchdog()
             results = yield self._device.omci_cc.send(frame)
-            self.stop_if_not_running()
 
             omci_msg = results.fields['omci_message']
             status = omci_msg.fields['success_code']
@@ -189,14 +185,14 @@
 
             seq_no = 0
             data_buffer = bytearray(0)
-            self.log.debug('me-type-count', octets=count, data=data)
+            self.log.debug('me-type-count', octets=count, data=hexlify(data))
 
             # Start the loop
             for offset in xrange(0, count, self._pdu_size):
                 frame = OmciFrame(me_type_table=seq_no).get_next()
                 seq_no += 1
+                self.strobe_watchdog()
                 results = yield self._device.omci_cc.send(frame)
-                self.stop_if_not_running()
 
                 omci_msg = results.fields['omci_message']
                 status = omci_msg.fields['success_code']
@@ -230,8 +226,8 @@
         try:
             # Get the number of requests needed
             frame = OmciFrame(message_type_table=True).get()
+            self.strobe_watchdog()
             results = yield self._device.omci_cc.send(frame)
-            self.stop_if_not_running()
 
             omci_msg = results.fields['omci_message']
             status = omci_msg.fields['success_code']
@@ -245,14 +241,14 @@
 
             seq_no = 0
             data_buffer = list()
-            self.log.debug('me-type-count', octets=count, data=data)
+            self.log.debug('me-type-count', octets=count, data=hexlify(data))
 
             # Start the loop
             for offset in xrange(0, count, self._pdu_size):
                 frame = OmciFrame(message_type_table=seq_no).get_next()
                 seq_no += 1
+                self.strobe_watchdog()
                 results = yield self._device.omci_cc.send(frame)
-                self.stop_if_not_running()
 
                 omci_msg = results.fields['omci_message']
                 status = omci_msg.fields['success_code']
diff --git a/voltha/extensions/omci/tasks/reboot_task.py b/voltha/extensions/omci/tasks/reboot_task.py
index 8cbf808..316e23b 100644
--- a/voltha/extensions/omci/tasks/reboot_task.py
+++ b/voltha/extensions/omci/tasks/reboot_task.py
@@ -94,6 +94,7 @@
 
         try:
             frame = OntGFrame().reboot(reboot_code=self._flags)
+            self.strobe_watchdog()
             results = yield self._device.omci_cc.send(frame, timeout=self._timeout)
 
             status = results.fields['omci_message'].fields['success_code']
@@ -109,7 +110,7 @@
                     raise RebootException(msg)
 
             self.log.info('reboot-success')
-            self.deferred.callback(None)
+            self.deferred.callback(self)
 
         except TimeoutError:
             self.log.info('timeout', msg='Request timeout is not considered an error')
diff --git a/voltha/extensions/omci/tasks/task.py b/voltha/extensions/omci/tasks/task.py
index 9e10c65..36020c0 100644
--- a/voltha/extensions/omci/tasks/task.py
+++ b/voltha/extensions/omci/tasks/task.py
@@ -14,7 +14,13 @@
 # limitations under the License.
 #
 import structlog
-from twisted.internet import defer
+from twisted.internet import defer, reactor
+from twisted.internet.defer import failure
+
+
+class WatchdogTimeoutFailure(Exception):
+    """Task callback/errback not called properly before watchdog expiration"""
+    pass
 
 
 class Task(object):
@@ -35,10 +41,14 @@
     DEFAULT_PRIORITY = 128
     MIN_PRIORITY = 0
     MAX_PRIORITY = 255
+    DEFAULT_WATCHDOG_SECS = 10          # 10 seconds
+    MIN_WATCHDOG_SECS = 3               # 3 seconds
+    MAX_WATCHDOG_SECS = 60              # 60 seconds
+
     _next_task_id = 0
 
     def __init__(self, name, omci_agent, device_id, priority=DEFAULT_PRIORITY,
-                 exclusive=True):
+                 exclusive=True, watchdog_timeout=DEFAULT_WATCHDOG_SECS):
         """
         Class initialization
 
@@ -47,10 +57,15 @@
         :param priority: (int) Task priority (0..255) 255 Highest
         :param exclusive: (bool) If True, this task needs exclusive access to the
                                  OMCI Communications channel when it runs
+        :param watchdog_timeout: (int or float) Watchdog timeout (seconds) after task start. To
+                                run longer, periodically call 'strobe_watchdog()' to reschedule.
         """
         assert Task.MIN_PRIORITY <= priority <= Task.MAX_PRIORITY, \
             'Priority should be {}..{}'.format(Task.MIN_PRIORITY, Task.MAX_PRIORITY)
 
+        assert Task.MIN_WATCHDOG_SECS <= watchdog_timeout <= Task.MAX_WATCHDOG_SECS, \
+            'Watchdog timeout should be {}..{} seconds'.format(Task.MIN_WATCHDOG_SECS, Task.MAX_WATCHDOG_SECS)
+
         Task._next_task_id += 1
         self._task_id = Task._next_task_id
         self.log = structlog.get_logger(device_id=device_id, name=name,
@@ -60,13 +75,14 @@
         self.omci_agent = omci_agent
         self._running = False
         self._exclusive = exclusive
-        # TODO: Should we watch for a cancel on the task's deferred as well?
         self._deferred = defer.Deferred()       # Fires upon completion
+        self._watchdog = None
+        self._watchdog_timeout = watchdog_timeout
         self._priority = priority
 
     def __str__(self):
-        return 'Task: {}, ID:{}, Priority: {}, Exclusive: {}'.format(
-            self.name, self.task_id, self.priority, self.exclusive)
+        return 'Task: {}, ID:{}, Priority: {}, Exclusive: {}, Watchdog: {}'.format(
+            self.name, self.task_id, self.priority, self.exclusive, self.watchdog_timeout)
 
     @property
     def priority(self):
@@ -81,6 +97,10 @@
         return self._exclusive
 
     @property
+    def watchdog_timeout(self):
+        return self._watchdog_timeout
+
+    @property
     def deferred(self):
         return self._deferred
 
@@ -94,12 +114,15 @@
         return self._running
 
     def cancel_deferred(self):
-        d, self._deferred = self._deferred, None
-        try:
-            if d is not None and not d.called:
-                d.cancel()
-        except:
-            pass
+        d1, self._deferred = self._deferred, None
+        d2, self._watchdog = self._watchdog, None
+
+        for d in [d1, d2]:
+            try:
+                if d is not None and not d.called:
+                    d.cancel()
+            except:
+                pass
 
     def start(self):
         """
@@ -109,6 +132,7 @@
         assert self._deferred is not None and not self._deferred.called, \
             'Cannot re-use the same task'
         self._running = True
+        self.strobe_watchdog()
 
     def stop(self):
         """
@@ -118,3 +142,60 @@
         self._running = False
         self.cancel_deferred()
         self.omci_agent = None      # Should only start/stop once
+
+    def task_cleanup(self):
+        """
+        This method should only be called from the TaskRunner's callback/errback
+        that is added when the task is initially queued. It is responsible for
+        clearing of the 'running' flag and canceling of the watchdog timer
+        """
+        self._running = False
+        d, self._watchdog = self._watchdog, None
+        try:
+            if d is not None and not d.called:
+                d.cancel()
+        except:
+            pass
+
+    def strobe_watchdog(self):
+        """
+        Signal that we have not hung/deadlocked
+
+        Cancels any pending watchdog timer and schedules a new one that
+        expires 'watchdog_timeout' seconds from now. It is called once from
+        start() and should be called periodically by long running tasks.
+
+        The timer is not (re)armed once the task's deferred has been released
+        by stop() or has already fired, since there is then nothing left for
+        the watchdog to fail.
+        """
+        def watchdog_timeout():
+            # Task may have hung (blocked) or failed to call proper success/error
+            # completion callback/errback. Work on a local reference since an
+            # errback handler may stop the task, which sets self._deferred to None.
+            d = self._deferred
+            if d is not None and not d.called:
+                err_msg = 'Task {}:{} watchdog timeout'.format(self.name, self.task_id)
+                self.log.error("task-watchdog-timeout", running=self.running,
+                               timeout=self.watchdog_timeout, error=err_msg)
+
+                d.errback(failure.Failure(WatchdogTimeoutFailure(err_msg)))
+                d.cancel()
+
+        if self._watchdog is not None:
+            if self._watchdog.called:
+                # Too late, timeout failure in progress
+                self.log.warn('task-watchdog-tripped', running=self.running,
+                              timeout=self.watchdog_timeout)
+                return
+
+            d, self._watchdog = self._watchdog, None
+            d.cancel()
+
+        if self._deferred is None or self._deferred.called:
+            # Task was stopped or has already completed. A late strobe from task
+            # code that is still unwinding must not re-arm the timer.
+            return
+
+        # Schedule/re-schedule the watchdog timer
+        self._watchdog = reactor.callLater(self.watchdog_timeout, watchdog_timeout)
diff --git a/voltha/extensions/omci/tasks/task_runner.py b/voltha/extensions/omci/tasks/task_runner.py
index e06ae1c..364e0b3 100644
--- a/voltha/extensions/omci/tasks/task_runner.py
+++ b/voltha/extensions/omci/tasks/task_runner.py
@@ -30,6 +30,8 @@
 
         self._successful_tasks = 0
         self._failed_tasks = 0
+        self._watchdog_timeouts = 0
+        self._last_watchdog_failure_task = ''
 
     def __str__(self):
         return 'TaskRunner: Pending: {}, Running:{}'.format(self.pending_tasks,
@@ -64,6 +66,15 @@
     def failed_tasks(self):
         return self._failed_tasks
 
+    @property
+    def watchdog_timeouts(self):
+        return self._watchdog_timeouts
+
+    @property
+    def last_watchdog_failure_task(self):
+        """ Task name of the last task to fail due to a watchdog timeout"""
+        return self._last_watchdog_failure_task
+
     # TODO: add properties for various stats as needed
 
     def start(self):
@@ -165,6 +176,7 @@
             assert task is not None and task.task_id in self._running_queue,\
                 'Task not found in running queue'
 
+            task.task_cleanup()
             self._successful_tasks += 1
             del self._running_queue[task.task_id]
 
@@ -183,6 +195,8 @@
         :param task: (Task) The task that failed
         :return: (Failure) Failure results
         """
+        from voltha.extensions.omci.tasks.task import WatchdogTimeoutFailure
+
         self.log.debug('task-failure', task_id=str(task),
                        running=len(self._running_queue),
                        pending=len(self._pending_queue))
@@ -190,9 +204,14 @@
             assert task is not None and task.task_id in self._running_queue,\
                 'Task not found in running queue'
 
+            task.task_cleanup()
             self._failed_tasks += 1
             del self._running_queue[task.task_id]
 
+            if isinstance(failure.value, WatchdogTimeoutFailure):
+                self._watchdog_timeouts += 1
+                self._last_watchdog_failure_task = task.name
+
         except Exception as e:
             # Check the pending queue