blob: 66a71e393b2d431d59b966edc1a315159a57150a [file] [log] [blame]
Chip Boling67b674a2019-02-08 11:42:18 -06001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
Zack Williams84a71e92019-11-15 09:00:19 -070017from __future__ import absolute_import
Chip Boling67b674a2019-02-08 11:42:18 -060018from unittest import TestCase, main
19from nose.tools import raises
20from twisted.internet import defer
21from twisted.internet.defer import inlineCallbacks, returnValue, CancelledError
Zack Williams84a71e92019-11-15 09:00:19 -070022from .mock.mock_task import SimpleTask
Chip Boling67b674a2019-02-08 11:42:18 -060023from nose.twistedtools import deferred
24from pyvoltha.adapters.extensions.omci.tasks.task_runner import TaskRunner
25
26DEVICE_ID = 'omci-unit-tests'
27
28
29class TestTaskRunner(TestCase):
30 """
31 Test the Task Runner Object
32 """
33
34 def setUp(self):
35 # defer.setDebugging(True)
36 self.runner = TaskRunner(DEVICE_ID)
37
38 def tearDown(self):
39 r, self.runner = self.runner, None
40 r.stop()
41
42 def test_default_init(self):
43 self.assertFalse(self.runner.active)
44 self.assertEqual(self.runner.pending_tasks, 0)
45 self.assertEqual(self.runner.running_tasks, 0)
46 self.assertEqual(self.runner.successful_tasks_completed, 0)
47 self.assertEqual(self.runner.failed_tasks, 0)
48
49 def test_start_stop(self):
50 self.assertFalse(self.runner.active)
51
52 self.runner.start()
53 self.assertTrue(self.runner.active)
54
55 self.runner.stop()
56 self.assertFalse(self.runner.active)
57
58 def unexpected_error(self, _failure):
59 self.assertEqual('Should not be here, expected success', _failure)
60
61 def unexpected_success(self, _results):
62 self.assertEqual('Should not be here, expected a failure', _results)
63
64 def test_simple_task_init(self):
65 t = SimpleTask(None, DEVICE_ID,
66 exclusive=True, priority=0,
67 success=True, value=0, delay=0)
68
69 self.assertEqual(t.priority, 0)
70 self.assertGreater(t.task_id, 0)
71 self.assertTrue(t.exclusive)
72 self.assertFalse(t.deferred.called)
73
74 def test_task_defaults_and_bound(self):
75 # Make sure no one has changed some declared defaults/max/min values
76 from pyvoltha.adapters.extensions.omci.tasks.task import Task
77 self.assertEqual(128, Task.DEFAULT_PRIORITY)
78 self.assertEqual(0, Task.MIN_PRIORITY)
79 self.assertEqual(255, Task.MAX_PRIORITY)
80 self.assertEqual(10, Task.DEFAULT_WATCHDOG_SECS)
81 self.assertEqual(3, Task.MIN_WATCHDOG_SECS)
82 self.assertEqual(60, Task.MAX_WATCHDOG_SECS)
83
84 @raises(AssertionError)
85 def test_task_priority_min(self):
86 from pyvoltha.adapters.extensions.omci.tasks.task import Task
87 _ = SimpleTask(None, DEVICE_ID, priority=Task.MIN_PRIORITY - 1)
88
89 @raises(AssertionError)
90 def test_task_priority_max(self):
91 from pyvoltha.adapters.extensions.omci.tasks.task import Task
92 _ = SimpleTask(None, DEVICE_ID, priority=Task.MAX_PRIORITY + 1)
93
94 @raises(AssertionError)
95 def test_task_watchdog_min(self):
96 from pyvoltha.adapters.extensions.omci.tasks.task import Task
97 _ = SimpleTask(None, DEVICE_ID, watchdog_timeout=Task.MIN_WATCHDOG_SECS - 0.000001)
98
99 @raises(AssertionError)
100 def test_task_watchdog_max(self):
101 from pyvoltha.adapters.extensions.omci.tasks.task import Task
102 _ = SimpleTask(None, DEVICE_ID, watchdog_timeout=Task.MAX_WATCHDOG_SECS + 0.000001)
103
104 @deferred(timeout=5)
105 def test_simple_success(self):
106 expected_result = 123
107
108 t = SimpleTask(None, DEVICE_ID,
109 exclusive=True, priority=0,
110 success=True, value=expected_result, delay=0)
111
112 d = self.runner.queue_task(t)
113 self.assertEqual(self.runner.pending_tasks, 1)
114 self.assertEqual(self.runner.running_tasks, 0)
115 self.runner.start()
116
117 def check_results(results):
118 self.assertEqual(results, expected_result)
119 self.assertEqual(self.runner.pending_tasks, 0)
120 self.assertEqual(self.runner.running_tasks, 0)
121 self.assertEqual(self.runner.successful_tasks_completed, 1)
122 self.assertEqual(self.runner.failed_tasks, 0)
123 self.assertTrue(self.runner.active)
124 return results
125
126 d.addCallbacks(check_results, self.unexpected_error)
127 return d
128
129 @raises(Exception)
130 @deferred(timeout=5)
131 def test_simple_failure(self):
132 self.expected_failure = Exception('Testing a task failure')
133
134 t = SimpleTask(None, DEVICE_ID,
135 exclusive=True, priority=0,
136 success=False, value=self.expected_failure,
137 delay=0)
138
139 d = self.runner.queue_task(t)
140 self.assertEqual(self.runner.pending_tasks, 1)
141 self.assertEqual(self.runner.running_tasks, 0)
142 self.runner.start()
143
144 def expected_failure(failure):
145 self.assertEqual(failure, self.expected_failure)
146 self.assertEqual(self.runner.pending_tasks, 0)
147 self.assertEqual(self.runner.running_tasks, 0)
148 self.assertEqual(self.runner.successful_tasks_completed, 0)
149 self.assertEqual(self.runner.failed_tasks, 1)
150 self.assertTrue(self.runner.active)
151 return failure
152
153 d.addCallbacks(self.unexpected_success, expected_failure)
154 return d
155
156 @deferred(timeout=5)
157 def test_priority(self):
158 self.last_value_set = 0
159
160 t1 = SimpleTask(None, DEVICE_ID,
161 exclusive=True, priority=1,
162 success=True, value=1, delay=0)
163
164 t2 = SimpleTask(None, DEVICE_ID,
165 exclusive=True, priority=2, # Should finish first
166 success=True, value=2, delay=0)
167
168 d1 = self.runner.queue_task(t1)
169 d2 = self.runner.queue_task(t2)
170
171 def set_last_value(results):
172 self.last_value_set = results
173
174 d1.addCallbacks(set_last_value, self.unexpected_error)
175 d2.addCallbacks(set_last_value, self.unexpected_error)
176
177 self.assertEqual(self.runner.pending_tasks, 2)
178 self.assertEqual(self.runner.running_tasks, 0)
179
180 d = defer.gatherResults([d1, d2], consumeErrors=True)
181
182 def check_results(_):
183 self.assertEqual(self.last_value_set, 1)
184 self.assertEqual(self.runner.pending_tasks, 0)
185 self.assertEqual(self.runner.running_tasks, 0)
186 self.assertEqual(self.runner.successful_tasks_completed, 2)
187
188 d.addCallbacks(check_results, self.unexpected_error)
189
190 self.runner.start()
191 return d
192
193 @inlineCallbacks
194 def check_that_t1_t2_running_and_last_is_not(self, results):
195 from pyvoltha.common.utils.asleep import asleep
196 yield asleep(0.1)
197
198 self.assertEqual(self.runner.pending_tasks, 1)
199 self.assertEqual(self.runner.running_tasks, 2)
200 self.assertEqual(self.runner.successful_tasks_completed, 1)
201
202 returnValue(results)
203
204 @deferred(timeout=10)
205 def test_concurrent(self):
206 blocker = SimpleTask(None, DEVICE_ID,
207 exclusive=True, priority=10,
208 success=True, value=1, delay=0.5)
209
210 t1 = SimpleTask(None, DEVICE_ID,
211 exclusive=False, priority=9,
212 success=True, value=1, delay=2)
213
214 t2 = SimpleTask(None, DEVICE_ID,
215 exclusive=False, priority=9,
216 success=True, value=1, delay=2)
217
218 last = SimpleTask(None, DEVICE_ID,
219 exclusive=True, priority=8,
220 success=True, value=1, delay=0)
221
222 d0 = self.runner.queue_task(blocker)
223 d0.addCallbacks(self.check_that_t1_t2_running_and_last_is_not,
224 self.unexpected_error)
225
226 d1 = self.runner.queue_task(t1)
227 d2 = self.runner.queue_task(t2)
228 d3 = self.runner.queue_task(last)
229
230 self.assertEqual(self.runner.pending_tasks, 4)
231 self.assertEqual(self.runner.running_tasks, 0)
232
233 d = defer.gatherResults([d0, d1, d2, d3], consumeErrors=True)
234
235 def check_final_results(_):
236 self.assertEqual(self.runner.pending_tasks, 0)
237 self.assertEqual(self.runner.running_tasks, 0)
238 self.assertEqual(self.runner.successful_tasks_completed, 4)
239 self.assertEqual(self.runner.failed_tasks, 0)
240
241 d.addCallbacks(check_final_results, self.unexpected_error)
242
243 self.runner.start()
244 return d
245
246 @raises(CancelledError)
247 @deferred(timeout=2)
248 def test_cancel_queued(self):
249 t = SimpleTask(None, DEVICE_ID,
250 exclusive=True, priority=9,
251 success=True, value=1, delay=0)
252
253 d = self.runner.queue_task(t)
254 self.assertEqual(self.runner.pending_tasks, 1)
255 self.assertEqual(self.runner.running_tasks, 0)
256
257 self.runner.cancel_task(t.task_id)
258 self.assertEqual(self.runner.pending_tasks, 0)
259 self.assertEqual(self.runner.running_tasks, 0)
260 return d
261
262 @raises(CancelledError)
263 @deferred(timeout=2)
264 def test_task_stop_queued(self):
265 t = SimpleTask(None, DEVICE_ID,
266 exclusive=True, priority=9,
267 success=True, value=1, delay=0)
268
269 d = self.runner.queue_task(t)
270 self.assertEqual(self.runner.pending_tasks, 1)
271 self.assertEqual(self.runner.running_tasks, 0)
272
273 t.stop()
274 self.assertEqual(self.runner.pending_tasks, 0)
275 self.assertEqual(self.runner.running_tasks, 0)
276 return d
277
278 def test_task_stop_not_queued(self):
279 t = SimpleTask(None, DEVICE_ID,
280 exclusive=True, priority=9,
281 success=True, value=1, delay=0)
282
283 self.assertEqual(self.runner.pending_tasks, 0)
284 self.assertEqual(self.runner.running_tasks, 0)
285
286 t.stop()
287 self.assertFalse(t.running)
288
289 @deferred(timeout=3)
290 def test_task_runner_cancel_running(self):
291 # Both task run in parallel but t1 will finish first and
292 # will request t2 to terminate by calling the TaskRunner's
293 # cancel task method
294 t1 = SimpleTask(None, DEVICE_ID,
295 exclusive=False, priority=9,
296 success=True, value=1, delay=0.5)
297 t2 = SimpleTask(None, DEVICE_ID,
298 exclusive=False, priority=9,
299 success=True, value=1, delay=200)
300
301 d1 = self.runner.queue_task(t1)
302 d2 = self.runner.queue_task(t2)
303
304 self.assertEqual(self.runner.pending_tasks, 2)
305 self.assertEqual(self.runner.running_tasks, 0)
306
307 def kill_task_t2(_, task_2_id):
308 # Called on successful completion of task t1
309 self.assertIsInstance(task_2_id, int)
310 self.assertEqual(self.runner.pending_tasks, 0)
311 self.assertEqual(self.runner.running_tasks, 1)
312
313 # Cancel task runner and t2 task ID
314 self.runner.cancel_task(task_2_id)
315 self.assertEqual(self.runner.running_tasks, 0)
316
317 d1.addCallbacks(kill_task_t2, self.unexpected_error,
318 callbackArgs=[t2.task_id])
319
320 def expected_error(failure):
321 self.assertTrue(isinstance(failure.value, CancelledError))
322 self.assertEqual(self.runner.pending_tasks, 0)
323 self.assertEqual(self.runner.running_tasks, 0)
324 self.assertEqual(self.runner.successful_tasks_completed, 1)
325 self.assertEqual(self.runner.failed_tasks, 1)
326
327 # T2 should not finish successfully, should get a cancel error
328 d2.addCallbacks(self.unexpected_success, expected_error)
329
330 # Run it
331 self.runner.start()
332 return defer.gatherResults([d1, d2], consumeErrors=True)
333
334 @deferred(timeout=3)
335 def test_task_stop_running(self):
336 # Run two tasks where T1 completes first and requests T2 to be
337 # canceled by calling T2's stop method
338
339 t1 = SimpleTask(None, DEVICE_ID,
340 exclusive=False, priority=9,
341 success=True, value=1, delay=0.5)
342 t2 = SimpleTask(None, DEVICE_ID,
343 exclusive=False, priority=9,
344 success=True, value=1, delay=200)
345
346 d1 = self.runner.queue_task(t1)
347 d2 = self.runner.queue_task(t2)
348
349 self.assertEqual(self.runner.pending_tasks, 2)
350 self.assertEqual(self.runner.running_tasks, 0)
351
352 def kill_task_t2(_, task_2):
353 # Called on successful completion of task t1
354 self.assertIsInstance(task_2, SimpleTask)
355 self.assertEqual(self.runner.pending_tasks, 0)
356 self.assertEqual(self.runner.running_tasks, 1)
357
358 # Cancel by telling the task to stop itself
359 task_2.stop()
360 self.assertEqual(self.runner.running_tasks, 0)
361
362 d1.addCallbacks(kill_task_t2, self.unexpected_error,
363 callbackArgs=[t2])
364
365 def expected_error(failure):
366 self.assertTrue(isinstance(failure.value, CancelledError))
367 self.assertEqual(self.runner.pending_tasks, 0)
368 self.assertEqual(self.runner.running_tasks, 0)
369 self.assertEqual(self.runner.successful_tasks_completed, 1)
370 self.assertEqual(self.runner.failed_tasks, 1)
371
372 # T2 should not finish successfully, should get a cancel error
373 d2.addCallbacks(self.unexpected_success, expected_error)
374
375 # Run it
376 self.runner.start()
377 return defer.gatherResults([d1, d2], consumeErrors=True)
378
379 @deferred(timeout=3)
380 def test_task_cancel_not_queued(self):
381 t = SimpleTask(None, DEVICE_ID,
382 exclusive=True, priority=9,
383 success=True, value=1, delay=0)
384
385 self.assertEqual(self.runner.pending_tasks, 0)
386 self.assertEqual(self.runner.running_tasks, 0)
387
388 def expected_error(failure):
389 self.assertTrue(isinstance(failure.value, CancelledError))
390 self.assertEqual(self.runner.pending_tasks, 0)
391 self.assertEqual(self.runner.running_tasks, 0)
392 self.assertEqual(self.runner.successful_tasks_completed, 0)
393 self.assertEqual(self.runner.failed_tasks, 0)
394 # self.fail(msg='made it here') # Uncomment to verify called
395
396 t.deferred.addCallbacks(self.unexpected_success, expected_error)
397
398 self.runner.start()
399 t.deferred.cancel()
400 self.assertFalse(t.running)
401 return t.deferred
402
403 @deferred(timeout=3)
404 def test_task_deferred_cancel_running(self):
405 # Run two tasks where T1 completes first and requests T2 to be
406 # canceled by doing a 'cancel' on T2's deferred
407
408 t1 = SimpleTask(None, DEVICE_ID,
409 exclusive=False, priority=9,
410 success=True, value=1, delay=0.5)
411 t2 = SimpleTask(None, DEVICE_ID,
412 exclusive=False, priority=9,
413 success=True, value=1, delay=200)
414
415 d1 = self.runner.queue_task(t1)
416 d2 = self.runner.queue_task(t2)
417
418 self.assertEqual(self.runner.pending_tasks, 2)
419 self.assertEqual(self.runner.running_tasks, 0)
420
421 def kill_task_t2(_, deferred_2):
422 # Called on successful completion of task t1
423 self.assertIsInstance(deferred_2, defer.Deferred)
424 self.assertEqual(self.runner.pending_tasks, 0)
425 self.assertEqual(self.runner.running_tasks, 1)
426
427 # Cancel the deferred for T2
428 deferred_2.cancel()
429 self.assertEqual(self.runner.running_tasks, 0)
430
431 d1.addCallbacks(kill_task_t2, self.unexpected_error,
432 callbackArgs=[t2.deferred])
433
434 def expected_error(failure):
435 self.assertTrue(isinstance(failure.value, CancelledError))
436 self.assertEqual(self.runner.pending_tasks, 0)
437 self.assertEqual(self.runner.running_tasks, 0)
438 self.assertEqual(self.runner.successful_tasks_completed, 1)
439 self.assertEqual(self.runner.failed_tasks, 1)
440 # self.fail(msg='made it here') # Uncomment to verify called
441
442 # T2 should not finish successfully, should get a cancel error
443 d2.addCallbacks(self.unexpected_success, expected_error)
444
445 # Run it
446 self.runner.start()
447 return defer.gatherResults([d1, d2], consumeErrors=True)
448
449 @deferred(timeout=3)
450 def test_watchdog_timeout(self):
451 t = SimpleTask(None, DEVICE_ID, delay=2)
452
453 self.assertEqual(self.runner.pending_tasks, 0)
454 self.assertEqual(self.runner.running_tasks, 0)
455 self.assertEqual(self.runner.watchdog_timeouts, 0)
456
457 # Actual watchdog minimum is probably to long for an automated test, reach
458 # around and force ti to something smaller (kids, don't try this at home)
459
460 t._watchdog_timeout = 0.1
461 self.runner.queue_task(t)
462
463 self.assertEqual(self.runner.pending_tasks, 1)
464 self.assertEqual(self.runner.running_tasks, 0)
465
466 def expected_error(failure):
467 from pyvoltha.adapters.extensions.omci.tasks.task import WatchdogTimeoutFailure
468 self.assertTrue(isinstance(failure.value, WatchdogTimeoutFailure))
469 self.assertEqual(self.runner.pending_tasks, 0)
470 self.assertEqual(self.runner.running_tasks, 0)
471 self.assertEqual(self.runner.successful_tasks_completed, 0)
472 self.assertEqual(self.runner.failed_tasks, 1)
473 self.assertEqual(self.runner.watchdog_timeouts, 1)
474 self.assertEqual(self.runner.last_watchdog_failure_task, t.name)
475 # self.fail(msg='made it here') # Uncomment to verify called
476
477 t.deferred.addCallbacks(self.unexpected_success, expected_error)
478
479 # Run it
480 self.runner.start()
481 return t.deferred
482
483
484if __name__ == '__main__':
485 main()