#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

17from unittest import TestCase, main
18from nose.tools import raises
19from twisted.internet import defer
20from twisted.internet.defer import inlineCallbacks, returnValue, CancelledError
21from mock.mock_task import SimpleTask
22from nose.twistedtools import deferred
23from pyvoltha.adapters.extensions.omci.tasks.task_runner import TaskRunner
24
# Device identifier passed to the TaskRunner and to every SimpleTask
# constructed by the tests in this module.
DEVICE_ID = 'omci-unit-tests'
26
27
class TestTaskRunner(TestCase):
    """
    Test the Task Runner Object

    Every test begins with a fresh TaskRunner that has not been started
    (see setUp) and drives it with SimpleTask mock tasks.  Tests decorated
    with nose's @deferred return a Twisted Deferred; the test finishes when
    that Deferred fires, and fails if it ends in an error or the decorator's
    timeout expires first.
    """

    def setUp(self):
        """Create the TaskRunner under test (created, but not started)."""
        # defer.setDebugging(True)
        self.runner = TaskRunner(DEVICE_ID)

    def tearDown(self):
        """Stop the runner created by setUp."""
        # Clear the attribute before stopping so the reference is dropped
        # even if stop() raises.
        runner, self.runner = self.runner, None
        runner.stop()

    def _assert_queue_state(self, pending, running):
        """
        Assert the runner's current queue counters.

        :param pending: expected value of runner.pending_tasks
        :param running: expected value of runner.running_tasks
        """
        self.assertEqual(self.runner.pending_tasks, pending)
        self.assertEqual(self.runner.running_tasks, running)

    def test_default_init(self):
        """A newly created runner is inactive and all of its counters are zero."""
        self.assertFalse(self.runner.active)
        self._assert_queue_state(pending=0, running=0)
        self.assertEqual(self.runner.successful_tasks_completed, 0)
        self.assertEqual(self.runner.failed_tasks, 0)

    def test_start_stop(self):
        """start() makes the runner active and stop() makes it inactive again."""
        self.assertFalse(self.runner.active)

        self.runner.start()
        self.assertTrue(self.runner.active)

        self.runner.stop()
        self.assertFalse(self.runner.active)

    def unexpected_error(self, _failure):
        """
        Errback for code paths that must succeed; always fails the test.

        :param _failure: the failure delivered to the errback, included in
                         the assertion message for diagnosis
        """
        self.fail('Should not be here, expected success: {!r}'.format(_failure))

    def unexpected_success(self, _results):
        """
        Callback for code paths that must fail; always fails the test.

        :param _results: the result delivered to the callback, included in
                         the assertion message for diagnosis
        """
        self.fail('Should not be here, expected a failure: {!r}'.format(_results))

    def test_simple_task_init(self):
        """A SimpleTask reports its constructor arguments and an unfired deferred."""
        t = SimpleTask(None, DEVICE_ID,
                       exclusive=True, priority=0,
                       success=True, value=0, delay=0)

        self.assertEqual(t.priority, 0)
        self.assertGreater(t.task_id, 0)
        self.assertTrue(t.exclusive)
        self.assertFalse(t.deferred.called)

    def test_task_defaults_and_bound(self):
        """Pin the declared Task priority and watchdog default/min/max values."""
        # Make sure no one has changed some declared defaults/max/min values
        from pyvoltha.adapters.extensions.omci.tasks.task import Task
        self.assertEqual(128, Task.DEFAULT_PRIORITY)
        self.assertEqual(0, Task.MIN_PRIORITY)
        self.assertEqual(255, Task.MAX_PRIORITY)
        self.assertEqual(10, Task.DEFAULT_WATCHDOG_SECS)
        self.assertEqual(3, Task.MIN_WATCHDOG_SECS)
        self.assertEqual(60, Task.MAX_WATCHDOG_SECS)

    @raises(AssertionError)
    def test_task_priority_min(self):
        """A priority just below Task.MIN_PRIORITY must raise AssertionError."""
        from pyvoltha.adapters.extensions.omci.tasks.task import Task
        SimpleTask(None, DEVICE_ID, priority=Task.MIN_PRIORITY - 1)

    @raises(AssertionError)
    def test_task_priority_max(self):
        """A priority just above Task.MAX_PRIORITY must raise AssertionError."""
        from pyvoltha.adapters.extensions.omci.tasks.task import Task
        SimpleTask(None, DEVICE_ID, priority=Task.MAX_PRIORITY + 1)

    @raises(AssertionError)
    def test_task_watchdog_min(self):
        """A watchdog timeout just below Task.MIN_WATCHDOG_SECS must raise AssertionError."""
        from pyvoltha.adapters.extensions.omci.tasks.task import Task
        SimpleTask(None, DEVICE_ID,
                   watchdog_timeout=Task.MIN_WATCHDOG_SECS - 0.000001)

    @raises(AssertionError)
    def test_task_watchdog_max(self):
        """A watchdog timeout just above Task.MAX_WATCHDOG_SECS must raise AssertionError."""
        from pyvoltha.adapters.extensions.omci.tasks.task import Task
        SimpleTask(None, DEVICE_ID,
                   watchdog_timeout=Task.MAX_WATCHDOG_SECS + 0.000001)

    @deferred(timeout=5)
    def test_simple_success(self):
        """A single successful task delivers its value and bumps the success counter."""
        expected_result = 123

        t = SimpleTask(None, DEVICE_ID,
                       exclusive=True, priority=0,
                       success=True, value=expected_result, delay=0)

        d = self.runner.queue_task(t)
        self._assert_queue_state(pending=1, running=0)
        self.runner.start()

        def check_results(results):
            self.assertEqual(results, expected_result)
            self._assert_queue_state(pending=0, running=0)
            self.assertEqual(self.runner.successful_tasks_completed, 1)
            self.assertEqual(self.runner.failed_tasks, 0)
            self.assertTrue(self.runner.active)
            return results

        d.addCallbacks(check_results, self.unexpected_error)
        return d

    # NOTE(review): @raises(Exception) is also satisfied by an AssertionError
    # raised inside the errback below, so a failed check there cannot fail
    # this test.  Consider raising/expecting a dedicated exception type once
    # the counter checks are confirmed to hold at errback time.
    @raises(Exception)
    @deferred(timeout=5)
    def test_simple_failure(self):
        """A single failing task delivers its exception and bumps the failure counter."""
        expected_exception = Exception('Testing a task failure')

        t = SimpleTask(None, DEVICE_ID,
                       exclusive=True, priority=0,
                       success=False, value=expected_exception,
                       delay=0)

        d = self.runner.queue_task(t)
        self._assert_queue_state(pending=1, running=0)
        self.runner.start()

        def expected_failure(failure):
            # Errbacks receive a twisted Failure wrapper; the exception the
            # task failed with is carried in failure.value.
            self.assertEqual(failure.value, expected_exception)
            self._assert_queue_state(pending=0, running=0)
            self.assertEqual(self.runner.successful_tasks_completed, 0)
            self.assertEqual(self.runner.failed_tasks, 1)
            self.assertTrue(self.runner.active)
            # Propagate the failure so @raises(Exception) sees it
            return failure

        d.addCallbacks(self.unexpected_success, expected_failure)
        return d

    @deferred(timeout=5)
    def test_priority(self):
        """With two queued exclusive tasks, the priority=2 task completes before priority=1."""
        completion_order = []

        t1 = SimpleTask(None, DEVICE_ID,
                        exclusive=True, priority=1,
                        success=True, value=1, delay=0)

        t2 = SimpleTask(None, DEVICE_ID,
                        exclusive=True, priority=2,  # Should finish first
                        success=True, value=2, delay=0)

        d1 = self.runner.queue_task(t1)
        d2 = self.runner.queue_task(t2)

        def record_completion(results):
            # Each task's value doubles as its identity (t1 -> 1, t2 -> 2)
            completion_order.append(results)

        d1.addCallbacks(record_completion, self.unexpected_error)
        d2.addCallbacks(record_completion, self.unexpected_error)

        self._assert_queue_state(pending=2, running=0)

        d = defer.gatherResults([d1, d2], consumeErrors=True)

        def check_results(_):
            # t2 (value 2) must have completed first and t1 (value 1) last
            self.assertEqual(completion_order, [2, 1])
            self._assert_queue_state(pending=0, running=0)
            self.assertEqual(self.runner.successful_tasks_completed, 2)

        d.addCallbacks(check_results, self.unexpected_error)

        self.runner.start()
        return d

    @inlineCallbacks
    def check_that_t1_t2_running_and_last_is_not(self, results):
        """
        Callback used by test_concurrent when the 'blocker' task completes.

        After a 0.1 second pause it asserts that exactly one task is still
        pending, two are running and one has completed successfully.

        :param results: the blocker task's result, passed through unchanged
        """
        from pyvoltha.common.utils.asleep import asleep
        yield asleep(0.1)

        self._assert_queue_state(pending=1, running=2)
        self.assertEqual(self.runner.successful_tasks_completed, 1)

        returnValue(results)

    @deferred(timeout=10)
    def test_concurrent(self):
        """
        Queue an exclusive blocker, two non-exclusive tasks and an exclusive
        last task; verify the two non-exclusive tasks run together while the
        last one is still pending, and that all four eventually succeed.
        """
        blocker = SimpleTask(None, DEVICE_ID,
                             exclusive=True, priority=10,
                             success=True, value=1, delay=0.5)

        t1 = SimpleTask(None, DEVICE_ID,
                        exclusive=False, priority=9,
                        success=True, value=1, delay=2)

        t2 = SimpleTask(None, DEVICE_ID,
                        exclusive=False, priority=9,
                        success=True, value=1, delay=2)

        last = SimpleTask(None, DEVICE_ID,
                          exclusive=True, priority=8,
                          success=True, value=1, delay=0)

        d0 = self.runner.queue_task(blocker)
        d0.addCallbacks(self.check_that_t1_t2_running_and_last_is_not,
                        self.unexpected_error)

        d1 = self.runner.queue_task(t1)
        d2 = self.runner.queue_task(t2)
        d3 = self.runner.queue_task(last)

        self._assert_queue_state(pending=4, running=0)

        d = defer.gatherResults([d0, d1, d2, d3], consumeErrors=True)

        def check_final_results(_):
            self._assert_queue_state(pending=0, running=0)
            self.assertEqual(self.runner.successful_tasks_completed, 4)
            self.assertEqual(self.runner.failed_tasks, 0)

        d.addCallbacks(check_final_results, self.unexpected_error)

        self.runner.start()
        return d

    @raises(CancelledError)
    @deferred(timeout=2)
    def test_cancel_queued(self):
        """Cancelling a queued task through the runner removes it and its deferred fails with CancelledError."""
        t = SimpleTask(None, DEVICE_ID,
                       exclusive=True, priority=9,
                       success=True, value=1, delay=0)

        d = self.runner.queue_task(t)
        self._assert_queue_state(pending=1, running=0)

        self.runner.cancel_task(t.task_id)
        self._assert_queue_state(pending=0, running=0)
        return d

    @raises(CancelledError)
    @deferred(timeout=2)
    def test_task_stop_queued(self):
        """Stopping a queued task via its own stop() removes it and its deferred fails with CancelledError."""
        t = SimpleTask(None, DEVICE_ID,
                       exclusive=True, priority=9,
                       success=True, value=1, delay=0)

        d = self.runner.queue_task(t)
        self._assert_queue_state(pending=1, running=0)

        t.stop()
        self._assert_queue_state(pending=0, running=0)
        return d

    def test_task_stop_not_queued(self):
        """Calling stop() on a task that was never queued leaves it not running."""
        t = SimpleTask(None, DEVICE_ID,
                       exclusive=True, priority=9,
                       success=True, value=1, delay=0)

        self._assert_queue_state(pending=0, running=0)

        t.stop()
        self.assertFalse(t.running)

    @deferred(timeout=3)
    def test_task_runner_cancel_running(self):
        """A running task cancelled via TaskRunner.cancel_task() fails with CancelledError."""
        # Both task run in parallel but t1 will finish first and
        # will request t2 to terminate by calling the TaskRunner's
        # cancel task method
        t1 = SimpleTask(None, DEVICE_ID,
                        exclusive=False, priority=9,
                        success=True, value=1, delay=0.5)
        t2 = SimpleTask(None, DEVICE_ID,
                        exclusive=False, priority=9,
                        success=True, value=1, delay=200)

        d1 = self.runner.queue_task(t1)
        d2 = self.runner.queue_task(t2)

        self._assert_queue_state(pending=2, running=0)

        def kill_task_t2(_, task_2_id):
            # Called on successful completion of task t1
            self.assertIsInstance(task_2_id, int)
            self._assert_queue_state(pending=0, running=1)

            # Cancel task runner and t2 task ID
            self.runner.cancel_task(task_2_id)
            self.assertEqual(self.runner.running_tasks, 0)

        d1.addCallbacks(kill_task_t2, self.unexpected_error,
                        callbackArgs=[t2.task_id])

        def expected_error(failure):
            # Returning None here swallows the failure, so d2 ends in success
            self.assertIsInstance(failure.value, CancelledError)
            self._assert_queue_state(pending=0, running=0)
            self.assertEqual(self.runner.successful_tasks_completed, 1)
            self.assertEqual(self.runner.failed_tasks, 1)

        # T2 should not finish successfully, should get a cancel error
        d2.addCallbacks(self.unexpected_success, expected_error)

        # Run it
        self.runner.start()
        return defer.gatherResults([d1, d2], consumeErrors=True)

    @deferred(timeout=3)
    def test_task_stop_running(self):
        """A running task cancelled via its own stop() fails with CancelledError."""
        # Run two tasks where T1 completes first and requests T2 to be
        # canceled by calling T2's stop method

        t1 = SimpleTask(None, DEVICE_ID,
                        exclusive=False, priority=9,
                        success=True, value=1, delay=0.5)
        t2 = SimpleTask(None, DEVICE_ID,
                        exclusive=False, priority=9,
                        success=True, value=1, delay=200)

        d1 = self.runner.queue_task(t1)
        d2 = self.runner.queue_task(t2)

        self._assert_queue_state(pending=2, running=0)

        def kill_task_t2(_, task_2):
            # Called on successful completion of task t1
            self.assertIsInstance(task_2, SimpleTask)
            self._assert_queue_state(pending=0, running=1)

            # Cancel by telling the task to stop itself
            task_2.stop()
            self.assertEqual(self.runner.running_tasks, 0)

        d1.addCallbacks(kill_task_t2, self.unexpected_error,
                        callbackArgs=[t2])

        def expected_error(failure):
            # Returning None here swallows the failure, so d2 ends in success
            self.assertIsInstance(failure.value, CancelledError)
            self._assert_queue_state(pending=0, running=0)
            self.assertEqual(self.runner.successful_tasks_completed, 1)
            self.assertEqual(self.runner.failed_tasks, 1)

        # T2 should not finish successfully, should get a cancel error
        d2.addCallbacks(self.unexpected_success, expected_error)

        # Run it
        self.runner.start()
        return defer.gatherResults([d1, d2], consumeErrors=True)

    @deferred(timeout=3)
    def test_task_cancel_not_queued(self):
        """Cancelling the deferred of a never-queued task yields CancelledError and leaves the runner's counters at zero."""
        t = SimpleTask(None, DEVICE_ID,
                       exclusive=True, priority=9,
                       success=True, value=1, delay=0)

        self._assert_queue_state(pending=0, running=0)

        def expected_error(failure):
            # Returning None here swallows the failure, so t.deferred ends in success
            self.assertIsInstance(failure.value, CancelledError)
            self._assert_queue_state(pending=0, running=0)
            self.assertEqual(self.runner.successful_tasks_completed, 0)
            self.assertEqual(self.runner.failed_tasks, 0)
            # self.fail(msg='made it here')    # Uncomment to verify called

        t.deferred.addCallbacks(self.unexpected_success, expected_error)

        self.runner.start()
        t.deferred.cancel()
        self.assertFalse(t.running)
        return t.deferred

    @deferred(timeout=3)
    def test_task_deferred_cancel_running(self):
        """A running task cancelled via cancel() on its deferred fails with CancelledError."""
        # Run two tasks where T1 completes first and requests T2 to be
        # canceled by doing a 'cancel' on T2's deferred

        t1 = SimpleTask(None, DEVICE_ID,
                        exclusive=False, priority=9,
                        success=True, value=1, delay=0.5)
        t2 = SimpleTask(None, DEVICE_ID,
                        exclusive=False, priority=9,
                        success=True, value=1, delay=200)

        d1 = self.runner.queue_task(t1)
        d2 = self.runner.queue_task(t2)

        self._assert_queue_state(pending=2, running=0)

        def kill_task_t2(_, deferred_2):
            # Called on successful completion of task t1
            self.assertIsInstance(deferred_2, defer.Deferred)
            self._assert_queue_state(pending=0, running=1)

            # Cancel the deferred for T2
            deferred_2.cancel()
            self.assertEqual(self.runner.running_tasks, 0)

        d1.addCallbacks(kill_task_t2, self.unexpected_error,
                        callbackArgs=[t2.deferred])

        def expected_error(failure):
            # Returning None here swallows the failure, so d2 ends in success
            self.assertIsInstance(failure.value, CancelledError)
            self._assert_queue_state(pending=0, running=0)
            self.assertEqual(self.runner.successful_tasks_completed, 1)
            self.assertEqual(self.runner.failed_tasks, 1)
            # self.fail(msg='made it here')    # Uncomment to verify called

        # T2 should not finish successfully, should get a cancel error
        d2.addCallbacks(self.unexpected_success, expected_error)

        # Run it
        self.runner.start()
        return defer.gatherResults([d1, d2], consumeErrors=True)

    @deferred(timeout=3)
    def test_watchdog_timeout(self):
        """A task that outlives its watchdog timeout fails with WatchdogTimeoutFailure and is recorded by the runner."""
        t = SimpleTask(None, DEVICE_ID, delay=2)

        self._assert_queue_state(pending=0, running=0)
        self.assertEqual(self.runner.watchdog_timeouts, 0)

        # Actual watchdog minimum is probably too long for an automated test,
        # reach around and force it to something smaller (kids, don't try
        # this at home)

        t._watchdog_timeout = 0.1
        self.runner.queue_task(t)

        self._assert_queue_state(pending=1, running=0)

        def expected_error(failure):
            # Returning None here swallows the failure, so t.deferred ends in success
            from pyvoltha.adapters.extensions.omci.tasks.task import WatchdogTimeoutFailure
            self.assertIsInstance(failure.value, WatchdogTimeoutFailure)
            self._assert_queue_state(pending=0, running=0)
            self.assertEqual(self.runner.successful_tasks_completed, 0)
            self.assertEqual(self.runner.failed_tasks, 1)
            self.assertEqual(self.runner.watchdog_timeouts, 1)
            self.assertEqual(self.runner.last_watchdog_failure_task, t.name)
            # self.fail(msg='made it here')    # Uncomment to verify called

        t.deferred.addCallbacks(self.unexpected_success, expected_error)

        # Run it
        self.runner.start()
        return t.deferred
481
482
# Running this module as a script hands control to unittest's main(),
# which runs the test cases defined in this module.
if __name__ == '__main__':
    main()