blob: 36020c0eff94cee889c0b1b4788933da15551877 [file] [log] [blame]
Chip Boling32aab302019-01-23 10:50:18 -06001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import structlog
17from twisted.internet import defer, reactor
18from twisted.internet.defer import failure
19
20
class WatchdogTimeoutFailure(Exception):
    """
    Error delivered (wrapped in a Twisted Failure) to a Task's deferred
    errback when the task's watchdog expires before the task has called
    its callback/errback.
    """
24
25
class Task(object):
    """
    OpenOMCI Base Task implementation

    An OMCI task can be one or more OMCI requests, comparisons, or whatever
    is needed to do a specific unit of work that needs to be run to completion
    successfully.

    On successful completion, the task should call the 'callback' method of
    the deferred and pass back whatever is meaningful to the user/state-machine
    that launched it.

    On failure, the 'errback' routine should be called with an appropriate
    Failure object.

    A watchdog timer is armed when the task starts. If the deferred has not
    fired before the watchdog expires, the deferred is failed with a
    WatchdogTimeoutFailure. Tasks that legitimately run longer should call
    'strobe_watchdog()' periodically to re-arm it.
    """
    DEFAULT_PRIORITY = 128
    MIN_PRIORITY = 0
    MAX_PRIORITY = 255
    DEFAULT_WATCHDOG_SECS = 10  # 10 seconds
    MIN_WATCHDOG_SECS = 3       # 3 seconds
    MAX_WATCHDOG_SECS = 60      # 60 seconds

    # Class-wide counter used to give every Task instance a unique ID
    _next_task_id = 0

    def __init__(self, name, omci_agent, device_id, priority=DEFAULT_PRIORITY,
                 exclusive=True, watchdog_timeout=DEFAULT_WATCHDOG_SECS):
        """
        Class initialization

        :param name: (str) Task Name
        :param omci_agent: OpenOMCI agent reference; stored as-is and released
                           by 'stop()'
        :param device_id: (str) ONU Device ID
        :param priority: (int) Task priority (0..255) 255 Highest
        :param exclusive: (bool) If True, this task needs exclusive access to the
                          OMCI Communications channel when it runs
        :param watchdog_timeout: (int or float) Watchdog timeout (seconds) after
                                 task start. To run longer, periodically call
                                 'strobe_watchdog()' to reschedule.
        :raises AssertionError: if priority or watchdog_timeout is out of range
        """
        assert Task.MIN_PRIORITY <= priority <= Task.MAX_PRIORITY, \
            'Priority should be {}..{}'.format(Task.MIN_PRIORITY, Task.MAX_PRIORITY)

        assert Task.MIN_WATCHDOG_SECS <= watchdog_timeout <= Task.MAX_WATCHDOG_SECS, \
            'Watchdog timeout should be {}..{} seconds'.format(Task.MIN_WATCHDOG_SECS,
                                                               Task.MAX_WATCHDOG_SECS)

        Task._next_task_id += 1
        self._task_id = Task._next_task_id
        self.log = structlog.get_logger(device_id=device_id, name=name,
                                        task_id=self._task_id)
        self.name = name
        self.device_id = device_id
        self.omci_agent = omci_agent
        self._running = False
        self._exclusive = exclusive
        self._deferred = defer.Deferred()   # Fires upon completion
        self._watchdog = None               # reactor.callLater() handle while armed
        self._watchdog_timeout = watchdog_timeout
        self._priority = priority

    def __str__(self):
        return 'Task: {}, ID:{}, Priority: {}, Exclusive: {}, Watchdog: {}'.format(
            self.name, self.task_id, self.priority, self.exclusive, self.watchdog_timeout)

    @property
    def priority(self):
        """(int) Task priority, 0..255 with 255 highest"""
        return self._priority

    @property
    def task_id(self):
        """(int) Unique ID assigned to this task at construction"""
        return self._task_id

    @property
    def exclusive(self):
        """(bool) True if the task needs exclusive OMCI channel access"""
        return self._exclusive

    @property
    def watchdog_timeout(self):
        """(int or float) Watchdog timeout in seconds"""
        return self._watchdog_timeout

    @property
    def deferred(self):
        """Completion deferred, or None once 'cancel_deferred()' has run"""
        return self._deferred

    @property
    def running(self):
        # Is the Task running?
        #
        # Can be useful for tasks that use inline callbacks to detect
        # if the task has been canceled.
        #
        return self._running

    def _cancel_quietly(self, d):
        """
        Best-effort cancel of a Deferred or reactor DelayedCall.

        None and objects that have already fired are ignored. Any error raised
        while cancelling is logged at debug level instead of propagating, since
        this is only used on cleanup paths.

        :param d: (Deferred, DelayedCall or None) object to cancel
        """
        if d is None:
            return
        try:
            if not d.called:
                d.cancel()
        except Exception as e:
            self.log.debug('task-cancel-failed', error=str(e))

    def cancel_deferred(self):
        """
        Cancel the task's completion deferred and its watchdog timer.

        Both references are cleared before cancelling. After this call
        'deferred' returns None and the task cannot be started again.
        """
        d1, self._deferred = self._deferred, None
        d2, self._watchdog = self._watchdog, None

        for d in (d1, d2):
            self._cancel_quietly(d)

    def start(self):
        """
        Start task operations

        Marks the task as running and arms the watchdog timer.

        :raises AssertionError: if the task was already completed or cancelled
        """
        self.log.debug('starting')
        assert self._deferred is not None and not self._deferred.called, \
            'Cannot re-use the same task'
        self._running = True
        self.strobe_watchdog()

    def stop(self):
        """
        Stop the task: clear the running flag, cancel the completion deferred
        and watchdog timer, and release the OMCI agent reference.
        """
        self.log.debug('stopping')
        self._running = False
        self.cancel_deferred()
        self.omci_agent = None  # Should only start/stop once

    def task_cleanup(self):
        """
        This method should only be called from the TaskRunner's callback/errback
        that is added when the task is initially queued. It is responsible for
        clearing the 'running' flag and canceling the watchdog timer.
        """
        self._running = False
        d, self._watchdog = self._watchdog, None
        self._cancel_quietly(d)

    def strobe_watchdog(self):
        """
        Signal that we have not hung/deadlocked

        The first call (made from 'start()') arms the watchdog. Each later call
        re-arms it for another 'watchdog_timeout' seconds. If the watchdog has
        already expired, nothing is rescheduled.
        """
        def on_watchdog_expired():
            # Task may have hung (blocked) or failed to call proper success/error
            # completion callback/errback.
            #
            # Read the deferred exactly once. It may already be None if
            # 'stop()'/'cancel_deferred()' ran, and the errback chain below may
            # itself call 'stop()', which clears 'self._deferred'.
            d = self.deferred
            if d is None or d.called:
                return

            err_msg = 'Task {}:{} watchdog timeout'.format(self.name, self.task_id)
            self.log.error("task-watchdog-timeout", running=self.running,
                           timeout=self.watchdog_timeout, error=err_msg)

            d.errback(failure.Failure(WatchdogTimeoutFailure(err_msg)))
            # 'd' has already fired here. Per Twisted's Deferred.cancel()
            # semantics, this only has an effect if the callback chain is
            # paused waiting on another Deferred, which then gets cancelled.
            d.cancel()

        if self._watchdog is not None:
            if self._watchdog.called:
                # Too late, timeout failure in progress
                self.log.warn('task-watchdog-tripped', running=self.running,
                              timeout=self.watchdog_timeout)
                return

            d, self._watchdog = self._watchdog, None
            d.cancel()

        # Schedule/re-schedule the watchdog timer
        self._watchdog = reactor.callLater(self.watchdog_timeout, on_watchdog_expired)
188 self._watchdog = reactor.callLater(self.watchdog_timeout, watchdog_timeout)