blob: eb7a252548690e373adc1ac6a55831071530b326 [file] [log] [blame]
William Kurkian6f436d02019-02-06 16:25:01 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import structlog
17from twisted.internet import reactor
18
19
20class TaskRunner(object):
21 """
22 Control the number of running tasks utilizing the OMCI Communications
23 channel (OMCI_CC
24 """
25 def __init__(self, device_id, clock=None):
26 self.log = structlog.get_logger(device_id=device_id)
27 self._pending_queue = dict() # task-priority -> [tasks]
28 self._running_queue = dict() # task-id -> task
29 self._active = False
30
31 self._successful_tasks = 0
32 self._failed_tasks = 0
33 self._watchdog_timeouts = 0
34 self._last_watchdog_failure_task = ''
35 self.reactor = clock if clock is not None else reactor
36
37 def __str__(self):
38 return 'TaskRunner: Pending: {}, Running:{}'.format(self.pending_tasks,
39 self.running_tasks)
40
41 @property
42 def active(self):
43 return self._active
44
45 @property
46 def pending_tasks(self):
47 """
48 Get the number of tasks pending to run
49 """
50 count = 0
51 for tasks in self._pending_queue.itervalues():
52 count += len(tasks)
53 return count
54
55 @property
56 def running_tasks(self):
57 """
58 Get the number of tasks currently running
59 """
60 return len(self._running_queue)
61
62 @property
63 def successful_tasks_completed(self):
64 return self._successful_tasks
65
66 @property
67 def failed_tasks(self):
68 return self._failed_tasks
69
70 @property
71 def watchdog_timeouts(self):
72 return self._watchdog_timeouts
73
74 @property
75 def last_watchdog_failure_task(self):
76 """ Task name of last tasks to fail due to watchdog"""
77 return self._last_watchdog_failure_task
78
79 # TODO: add properties for various stats as needed
80
81 def start(self):
82 """
83 Start the Task runner
84 """
85 self.log.debug('starting', active=self._active)
86
87 if not self._active:
88 assert len(self._running_queue) == 0, 'Running task queue not empty'
89 self._active = True
90 self._run_next_task()
91
92 def stop(self):
93 """
94 Stop the Task runner, first stopping any tasks and flushing the queue
95 """
96 self.log.debug('stopping', active=self._active)
97
98 if self._active:
99 self._active = False
100
101 pq, self._pending_queue = self._pending_queue, dict()
102 rq, self._running_queue = self._running_queue, dict()
103
104 # Stop running tasks
105 for task in rq.itervalues():
106 try:
107 task.stop()
108 except:
109 pass
110
111 # Kill pending tasks
112 for d in pq.iterkeys():
113 try:
114 d.cancel()
115 except:
116 pass
117
118 def _run_next_task(self):
119 """
120 Search for next task to run, if one can
121 :return:
122 """
123 self.log.debug('run-next', active=self._active,
124 num_running=len(self._running_queue),
125 num_pending=len(self._pending_queue))
126
127 if self._active and len(self._pending_queue) > 0:
128 # Cannot run a new task if a running one needs the OMCI_CC exclusively
129
130 if any(task.exclusive for task in self._running_queue.itervalues()):
131 self.log.debug('exclusive-running')
132 return # An exclusive task is already running
133
134 try:
135 priorities = [k for k in self._pending_queue.iterkeys()]
136 priorities.sort(reverse=True)
137 highest_priority = priorities[0] if len(priorities) else None
138
139 if highest_priority is not None:
140 queue = self._pending_queue[highest_priority]
141 next_task = queue[0] if len(queue) else None
142
143 if next_task is not None:
144 if next_task.exclusive and len(self._running_queue) > 0:
145 self.log.debug('next-is-exclusive', task=str(next_task))
146 return # Next task to run needs exclusive access
147
148 queue.pop(0)
149 if len(queue) == 0:
150 del self._pending_queue[highest_priority]
151
152 self.log.debug('starting-task', task=str(next_task),
153 running=len(self._running_queue),
154 pending=len(self._pending_queue))
155
156 self._running_queue[next_task.task_id] = next_task
157 self.reactor.callLater(0, next_task.start)
158
159 # Run again if others are waiting
160 if len(self._pending_queue):
161 self._run_next_task()
162
163 except Exception as e:
164 self.log.exception('run-next', e=e)
165
166 def _on_task_success(self, results, task):
167 """
168 A task completed successfully callback
169 :param results: deferred results
170 :param task: (Task) The task that succeeded
171 :return: deferred results
172 """
173 self.log.debug('task-success', task_id=str(task),
174 running=len(self._running_queue),
175 pending=len(self._pending_queue))
176 try:
177 assert task is not None and task.task_id in self._running_queue,\
178 'Task not found in running queue'
179
180 task.task_cleanup()
181 self._successful_tasks += 1
182 del self._running_queue[task.task_id]
183
184 except Exception as e:
185 self.log.exception('task-error', task=str(task), e=e)
186
187 finally:
188 reactor.callLater(0, self._run_next_task)
189
190 return results
191
192 def _on_task_failure(self, failure, task):
193 """
194 A task completed with failure callback
195 :param failure: (Failure) Failure results
196 :param task: (Task) The task that failed
197 :return: (Failure) Failure results
198 """
199 from voltha.extensions.omci.tasks.task import WatchdogTimeoutFailure
200
201 self.log.debug('task-failure', task_id=str(task),
202 running=len(self._running_queue),
203 pending=len(self._pending_queue))
204 try:
205 assert task is not None and task.task_id in self._running_queue,\
206 'Task not found in running queue'
207
208 task.task_cleanup()
209 self._failed_tasks += 1
210 del self._running_queue[task.task_id]
211
212 if isinstance(failure.value, WatchdogTimeoutFailure):
213 self._watchdog_timeouts += 1
214 self._last_watchdog_failure_task = task.name
215
216 except Exception as e:
217 # Check the pending queue
218
219 for priority, tasks in self._pending_queue.iteritems():
220 found = next((t for t in tasks if t.task_id == task.task_id), None)
221
222 if found is not None:
223 self._pending_queue[task.priority].remove(task)
224 if len(self._pending_queue[task.priority]) == 0:
225 del self._pending_queue[task.priority]
226 return failure
227
228 self.log.exception('task-error', task=str(task), e=e)
229 raise
230
231 finally:
232 reactor.callLater(0, self._run_next_task)
233
234 return failure
235
236 def queue_task(self, task):
237 """
238 Place a task on the queue to run
239
240 :param task: (Task) task to run
241 :return: (deferred) Deferred that will fire on task completion
242 """
243 self.log.debug('queue-task', active=self._active, task=str(task),
244 running=len(self._running_queue),
245 pending=len(self._pending_queue))
246
247 if task.priority not in self._pending_queue:
248 self._pending_queue[task.priority] = []
249
250 task.deferred.addCallbacks(self._on_task_success, self._on_task_failure,
251 callbackArgs=[task], errbackArgs=[task])
252
253 self._pending_queue[task.priority].append(task)
254 self._run_next_task()
255
256 return task.deferred
257
258 def cancel_task(self, task_id):
259 """
260 Cancel a pending or running task. The cancel method will be called
261 for the task's deferred
262
263 :param task_id: (int) Task identifier
264 """
265 task = self._running_queue.get(task_id, None)
266
267 if task is not None:
268 try:
269 task.stop()
270 except Exception as e:
271 self.log.exception('stop-error', task=str(task), e=e)
272
273 reactor.callLater(0, self._run_next_task)
274
275 else:
276 for priority, tasks in self._pending_queue.iteritems():
277 task = next((t for t in tasks if t.task_id == task_id), None)
278
279 if task is not None:
280 try:
281 task.deferred.cancel()
282 except Exception as e:
283 self.log.exception('cancel-error', task=str(task), e=e)
284 return
285