#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
Zack Williams84a71e92019-11-15 09:00:19 -070016from __future__ import absolute_import
Chip Boling67b674a2019-02-08 11:42:18 -060017import structlog
18from twisted.internet import reactor
Zack Williams84a71e92019-11-15 09:00:19 -070019import six
Chip Boling67b674a2019-02-08 11:42:18 -060020
21
class TaskRunner(object):
    """
    Control the number of running tasks utilizing the OMCI Communications
    channel (OMCI_CC).

    Queued tasks wait in per-priority FIFO lists and are started highest
    priority first. A task whose ``exclusive`` attribute is true is only
    started when nothing else is running, and no other task is started
    while an exclusive task is running.
    """
    def __init__(self, device_id, clock=None):
        """
        :param device_id: Device identifier bound to this runner's logger
        :param clock: Object providing ``callLater`` used for all scheduling.
                      Defaults to the Twisted reactor when None.
        """
        self.log = structlog.get_logger(device_id=device_id)
        self._pending_queue = dict()   # task-priority -> [tasks]
        self._running_queue = dict()   # task-id -> task
        self._active = False

        self._successful_tasks = 0
        self._failed_tasks = 0
        self._watchdog_timeouts = 0
        self._last_watchdog_failure_task = ''
        self.reactor = clock if clock is not None else reactor

    def __str__(self):
        return 'TaskRunner: Pending: {}, Running:{}'.format(self.pending_tasks,
                                                            self.running_tasks)

    @property
    def active(self):
        """ True between a call to start() and the next call to stop() """
        return self._active

    @property
    def pending_tasks(self):
        """
        Get the number of tasks pending to run
        """
        return sum(len(tasks) for tasks in self._pending_queue.values())

    @property
    def running_tasks(self):
        """
        Get the number of tasks currently running
        """
        return len(self._running_queue)

    @property
    def successful_tasks_completed(self):
        """ Number of tasks that completed through the success callback """
        return self._successful_tasks

    @property
    def failed_tasks(self):
        """ Number of running tasks that completed through the failure callback """
        return self._failed_tasks

    @property
    def watchdog_timeouts(self):
        """ Number of task failures caused by a WatchdogTimeoutFailure """
        return self._watchdog_timeouts

    @property
    def last_watchdog_failure_task(self):
        """ Task name of the last task to fail due to watchdog """
        return self._last_watchdog_failure_task

    # TODO: add properties for various stats as needed

    def start(self):
        """
        Start the Task runner
        """
        self.log.debug('starting', active=self._active)

        if not self._active:
            assert len(self._running_queue) == 0, 'Running task queue not empty'
            self._active = True
            self._run_next_task()

    def stop(self):
        """
        Stop the Task runner, first stopping any tasks and flushing the queue
        """
        self.log.debug('stopping', active=self._active)

        if not self._active:
            return

        self._active = False

        # Detach the running queue and snapshot the pending tasks up front so
        # that callbacks fired while stopping/cancelling cannot disturb the
        # iteration below. Tasks queued during the shutdown are not touched.
        running, self._running_queue = self._running_queue, dict()
        pending = [task
                   for tasks in list(self._pending_queue.values())
                   for task in tasks]

        # Stop running tasks
        for task in running.values():
            try:
                task.stop()
            except Exception as e:
                # Best effort, one bad task must not prevent stopping the rest
                self.log.exception('stop-error', task=str(task), e=e)

        # Kill pending tasks. They are still listed in the pending queue while
        # their deferred is cancelled so the failure callback can recognize
        # them as pending tasks.
        for task in pending:
            try:
                task.deferred.cancel()
            except Exception as e:
                self.log.exception('cancel-error', task=str(task), e=e)

            # No-op when the failure callback already removed the task
            self._remove_pending(task.task_id)

    def _remove_pending(self, task_id):
        """
        Remove a task from the pending queue

        :param task_id: Task identifier
        :return: (bool) True if the task was pending and has been removed
        """
        for priority, tasks in list(self._pending_queue.items()):
            for index, candidate in enumerate(tasks):
                if candidate.task_id == task_id:
                    del tasks[index]
                    if not tasks:
                        # Never leave an empty priority bucket behind
                        del self._pending_queue[priority]
                    return True
        return False

    def _run_next_task(self):
        """
        Start as many pending tasks as currently allowed, highest priority
        first and in FIFO order within a priority.
        :return: None
        """
        try:
            while True:
                self.log.debug('run-next', active=self._active,
                               num_running=len(self._running_queue),
                               num_pending=len(self._pending_queue))

                if not self._active or not self._pending_queue:
                    return

                # Cannot run a new task if a running one needs the OMCI_CC
                # exclusively
                if any(task.exclusive
                       for task in self._running_queue.values()):
                    self.log.debug('exclusive-running')
                    return      # An exclusive task is already running

                highest_priority = max(self._pending_queue)
                queue = self._pending_queue[highest_priority]

                if not queue:
                    # Defensive: drop an empty bucket and look again
                    del self._pending_queue[highest_priority]
                    continue

                next_task = queue[0]

                if next_task.exclusive and self._running_queue:
                    self.log.debug('next-is-exclusive', task=str(next_task))
                    return      # Next task to run needs exclusive access

                queue.pop(0)
                if not queue:
                    del self._pending_queue[highest_priority]

                self.log.debug('starting-task', task=str(next_task),
                               running=len(self._running_queue),
                               pending=len(self._pending_queue))

                self._running_queue[next_task.task_id] = next_task
                self.reactor.callLater(0, next_task.start)

                # Loop to run again if others are waiting

        except Exception as e:
            self.log.exception('run-next', e=e)

    def _on_task_success(self, results, task):
        """
        A task completed successfully callback
        :param results: deferred results
        :param task: (Task) The task that succeeded
        :return: deferred results
        """
        self.log.debug('task-success', task_id=str(task),
                       running=len(self._running_queue),
                       pending=len(self._pending_queue))
        try:
            # Explicit check (not an assert statement) so it survives 'python -O'
            if task is None or task.task_id not in self._running_queue:
                raise AssertionError('Task not found in running queue')

            # Drop the task first so a failing cleanup cannot leave a stale
            # entry that keeps the runner blocked
            del self._running_queue[task.task_id]
            task.task_cleanup()
            self._successful_tasks += 1

        except Exception as e:
            self.log.exception('task-error', task=str(task), e=e)

        finally:
            # Use the injected clock, not the global reactor
            self.reactor.callLater(0, self._run_next_task)

        return results

    def _on_task_failure(self, failure, task):
        """
        A task completed with failure callback
        :param failure: (Failure) Failure results
        :param task: (Task) The task that failed
        :return: (Failure) Failure results
        :raises: the underlying error when the task is neither running
                 nor pending
        """
        from pyvoltha.adapters.extensions.omci.tasks.task import WatchdogTimeoutFailure

        self.log.debug('task-failure', task_id=str(task),
                       running=len(self._running_queue),
                       pending=len(self._pending_queue))
        try:
            # Explicit check (not an assert statement) so it survives 'python -O'
            if task is None or task.task_id not in self._running_queue:
                raise AssertionError('Task not found in running queue')

            # Drop the task first so a failing cleanup cannot leave a stale
            # entry that keeps the runner blocked
            del self._running_queue[task.task_id]
            task.task_cleanup()
            self._failed_tasks += 1

            if isinstance(failure.value, WatchdogTimeoutFailure):
                self._watchdog_timeouts += 1
                self._last_watchdog_failure_task = task.name

        except Exception as e:
            # Check the pending queue, a pending task whose deferred was
            # cancelled ends up here as well
            if task is not None and self._remove_pending(task.task_id):
                return failure

            self.log.exception('task-error', task=str(task), e=e)
            raise

        finally:
            # Use the injected clock, not the global reactor
            self.reactor.callLater(0, self._run_next_task)

        return failure

    def queue_task(self, task):
        """
        Place a task on the queue to run

        :param task: (Task) task to run
        :return: (deferred) Deferred that will fire on task completion
        """
        self.log.debug('queue-task', active=self._active, task=str(task),
                       running=len(self._running_queue),
                       pending=len(self._pending_queue))

        task.deferred.addCallbacks(self._on_task_success, self._on_task_failure,
                                   callbackArgs=[task], errbackArgs=[task])

        # Create the priority bucket only once the task is really queued
        self._pending_queue.setdefault(task.priority, []).append(task)
        self._run_next_task()

        return task.deferred

    def cancel_task(self, task_id):
        """
        Cancel a pending or running task. A running task is stopped, for a
        pending task the cancel method will be called on the task's deferred

        :param task_id: (int) Task identifier
        """
        task = self._running_queue.get(task_id, None)

        if task is not None:
            try:
                task.stop()
            except Exception as e:
                self.log.exception('stop-error', task=str(task), e=e)

            # Use the injected clock, not the global reactor
            self.reactor.callLater(0, self._run_next_task)
            return

        task = next((candidate
                     for tasks in list(self._pending_queue.values())
                     for candidate in tasks
                     if candidate.task_id == task_id), None)

        if task is None:
            return

        try:
            task.deferred.cancel()
        except Exception as e:
            self.log.exception('cancel-error', task=str(task), e=e)

        # No-op when the failure callback already removed the task; otherwise
        # make sure a cancelled task can never be started later
        self._remove_pending(task_id)
