VOL-1395: Common shared libraries needed for Python based device adapters.
This is an initial check-in of code from the master branch. Additional work
is expected on a few items to work with the new go-core and will be covered
by separate JIRAs and commits.
Change-Id: I0856ec6b79b8d3e49082c609eb9c7eedd75b1708
diff --git a/python/adapters/extensions/omci/tasks/task.py b/python/adapters/extensions/omci/tasks/task.py
new file mode 100644
index 0000000..36020c0
--- /dev/null
+++ b/python/adapters/extensions/omci/tasks/task.py
@@ -0,0 +1,188 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from twisted.internet import defer, reactor
+from twisted.internet.defer import failure
+
+
+class WatchdogTimeoutFailure(Exception):
+ """Task callback/errback not called properly before watchdog expiration"""
+ pass
+
+
+class Task(object):
+ """
+ OpenOMCI Base Task implementation
+
+ An OMCI task can be one or more OMCI requests, comparisons, or whatever
+ is needed to do a specific unit of work that needs to be ran to completion
+ successfully.
+
+ On successful completion, the task should called the 'callback' method of
+ the deferred and pass back whatever is meaningful to the user/state-machine
+ that launched it.
+
+ On failure, the 'errback' routine should be called with an appropriate
+ Failure object.
+ """
+ DEFAULT_PRIORITY = 128
+ MIN_PRIORITY = 0
+ MAX_PRIORITY = 255
+ DEFAULT_WATCHDOG_SECS = 10 # 10 seconds
+ MIN_WATCHDOG_SECS = 3 # 3 seconds
+ MAX_WATCHDOG_SECS = 60 # 60 seconds
+
+ _next_task_id = 0
+
+ def __init__(self, name, omci_agent, device_id, priority=DEFAULT_PRIORITY,
+ exclusive=True, watchdog_timeout=DEFAULT_WATCHDOG_SECS):
+ """
+ Class initialization
+
+ :param name: (str) Task Name
+ :param device_id: (str) ONU Device ID
+ :param priority: (int) Task priority (0..255) 255 Highest
+ :param exclusive: (bool) If True, this task needs exclusive access to the
+ OMCI Communications channel when it runs
+ :param watchdog_timeout (int or float) Watchdog timeout (seconds) after task start, to
+ run longer, periodically call 'strobe_watchdog()' to reschedule.
+ """
+ assert Task.MIN_PRIORITY <= priority <= Task.MAX_PRIORITY, \
+ 'Priority should be {}..{}'.format(Task.MIN_PRIORITY, Task.MAX_PRIORITY)
+
+ assert Task.MIN_WATCHDOG_SECS <= watchdog_timeout <= Task.MAX_WATCHDOG_SECS, \
+ 'Watchdog timeout should be {}..{} seconds'
+
+ Task._next_task_id += 1
+ self._task_id = Task._next_task_id
+ self.log = structlog.get_logger(device_id=device_id, name=name,
+ task_id=self._task_id)
+ self.name = name
+ self.device_id = device_id
+ self.omci_agent = omci_agent
+ self._running = False
+ self._exclusive = exclusive
+ self._deferred = defer.Deferred() # Fires upon completion
+ self._watchdog = None
+ self._watchdog_timeout = watchdog_timeout
+ self._priority = priority
+
+ def __str__(self):
+ return 'Task: {}, ID:{}, Priority: {}, Exclusive: {}, Watchdog: {}'.format(
+ self.name, self.task_id, self.priority, self.exclusive, self.watchdog_timeout)
+
+ @property
+ def priority(self):
+ return self._priority
+
+ @property
+ def task_id(self):
+ return self._task_id
+
+ @property
+ def exclusive(self):
+ return self._exclusive
+
+ @property
+ def watchdog_timeout(self):
+ return self._watchdog_timeout
+
+ @property
+ def deferred(self):
+ return self._deferred
+
+ @property
+ def running(self):
+ # Is the Task running?
+ #
+ # Can be useful for tasks that use inline callbacks to detect
+ # if the task has been canceled.
+ #
+ return self._running
+
+ def cancel_deferred(self):
+ d1, self._deferred = self._deferred, None
+ d2, self._watchdog = self._watchdog, None
+
+ for d in [d1, d2]:
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """
+ Start task operations
+ """
+ self.log.debug('starting')
+ assert self._deferred is not None and not self._deferred.called, \
+ 'Cannot re-use the same task'
+ self._running = True
+ self.strobe_watchdog()
+
+ def stop(self):
+ """
+ Stop task synchronization
+ """
+ self.log.debug('stopping')
+ self._running = False
+ self.cancel_deferred()
+ self.omci_agent = None # Should only start/stop once
+
+ def task_cleanup(self):
+ """
+ This method should only be called from the TaskRunner's callback/errback
+ that is added when the task is initially queued. It is responsible for
+ clearing of the 'running' flag and canceling of the watchdog time
+ """
+ self._running = False
+ d, self._watchdog = self._watchdog, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def strobe_watchdog(self):
+ """
+ Signal that we have not hung/deadlocked
+ """
+ # Create if first time (called at Task start)
+
+ def watchdog_timeout():
+ # Task may have hung (blocked) or failed to call proper success/error
+ # completion callback/errback
+ if not self.deferred.called:
+ err_msg = 'Task {}:{} watchdog timeout'.format(self.name, self.task_id)
+ self.log.error("task-watchdog-timeout", running=self.running,
+ timeout=self.watchdog_timeout, error=err_msg)
+
+ self.deferred.errback(failure.Failure(WatchdogTimeoutFailure(err_msg)))
+ self.deferred.cancel()
+
+ if self._watchdog is not None:
+ if self._watchdog.called:
+ # Too late, timeout failure in progress
+ self.log.warn('task-watchdog-tripped', running=self.running,
+ timeout=self.watchdog_timeout)
+ return
+
+ d, self._watchdog = self._watchdog, None
+ d.cancel()
+
+ # Schedule/re-schedule the watchdog timer
+ self._watchdog = reactor.callLater(self.watchdog_timeout, watchdog_timeout)