blob: 54aaca76506c968a950fa2dd0bacdd23426fa7e4 [file] [log] [blame]
# SPDX-FileCopyrightText: 2020 The Magma Authors.
# SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org>
#
# SPDX-License-Identifier: BSD-3-Clause

6import abc
7import asyncio
8import logging
9import time
10from contextlib import suppress
11from typing import Optional, cast
12
13
class Job(abc.ABC):
    """
    Base class for a task that must be executed repeatedly on an asyncio
    event loop.

    A subclass implements the co-routine _run(). Once start() is called,
    _periodic() invokes _run() and then waits until either self._interval
    seconds have elapsed or wake_up() is called, and repeats until stop()
    cancels it.

    heartbeat() / not_completed() let an external watchdog detect that an
    iteration has not started for longer than the configured timeout.
    """

    # Fallback (in seconds) used by not_completed() when no timeout was set.
    _DEFAULT_TIMEOUT_SECS = 120

    def __init__(
        self,
        interval: int,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        """
        Args:
            interval: seconds to wait between two executions of _run().
            loop: event loop the job is scheduled on; when omitted,
                asyncio.get_event_loop() is used.
        """
        if loop is None:
            self._loop = asyncio.get_event_loop()
        else:
            self._loop = loop
        # Task in charge of periodically running the task
        self._periodic_task = cast(Optional[asyncio.Task], None)
        # Task in charge of deciding how long to wait until next run
        self._interval_wait_task = cast(Optional[asyncio.Task], None)
        self._interval = interval  # in seconds
        # Wall-clock time (time.time()) at which the last iteration started
        self._last_run = cast(Optional[float], None)
        self._timeout = cast(Optional[float], None)
        # Condition variable used to control how long the job waits until
        # executing its task again.
        try:
            # Interpreters that still accept an explicit loop need it so the
            # condition is bound to self._loop rather than the current loop.
            self._cond = asyncio.Condition(loop=self._loop)
        except TypeError:
            # The 'loop' keyword was removed in Python 3.10; there the
            # condition binds to the running loop on first use.
            self._cond = asyncio.Condition()

    @abc.abstractmethod
    async def _run(self) -> None:
        """
        Once implemented by a subclass, this function will contain the actual
        work of this Job.
        """

    def start(self) -> None:
        """
        kicks off the _periodic while loop; does nothing if already started
        """
        if self._periodic_task is None:
            self._periodic_task = self._loop.create_task(self._periodic())

    def stop(self) -> None:
        """
        cancels the _periodic while loop; does nothing if not started

        When the loop is not running, the loop is driven until the
        cancellation has been processed. When called while the loop is
        running, the cancellation is only requested and the loop delivers it
        on a following iteration.
        """
        task = self._periodic_task
        if task is None:
            return

        # Drop the handle first so that start() can be used again even if
        # waiting for the cancellation below raises.
        self._periodic_task = None
        task.cancel()

        if self._loop.is_running():
            # run_until_complete() cannot be nested in a running loop (it
            # raises RuntimeError); the running loop processes the
            # cancellation by itself.
            return

        with suppress(asyncio.CancelledError):
            # Await task to execute its cancellation
            self._loop.run_until_complete(task)

    def set_timeout(self, timeout: float) -> None:
        """
        sets the number of seconds after which not_completed() reports a
        stalled job
        """
        self._timeout = timeout

    def set_interval(self, interval: int) -> None:
        """
        sets the interval used in _periodic to decide how long to sleep
        """
        self._interval = interval

    def heartbeat(self) -> None:
        """
        records the current time to keep track of iteration length
        """
        self._last_run = time.time()

    def not_completed(self, current_time: float) -> bool:
        """
        Returns True when no iteration has started yet, or when the last
        iteration started more than the timeout before current_time.

        A timeout that is unset (or falsy, e.g. 0) falls back to
        _DEFAULT_TIMEOUT_SECS.
        """
        last_time = self._last_run
        if last_time is None:
            return True

        timeout = self._timeout or self._DEFAULT_TIMEOUT_SECS
        return last_time < current_time - timeout

    async def _sleep_for_interval(self) -> None:
        """
        Sleeps for self._interval seconds, then notifies the cond var so
        that the _periodic loop can continue.
        """
        await asyncio.sleep(self._interval)
        async with self._cond:
            self._cond.notify()

    async def wake_up(self) -> None:
        """
        Cancels the _sleep_for_interval task if it exists, and notifies the
        cond var so that the _periodic loop can continue.
        """
        if self._interval_wait_task is not None:
            self._interval_wait_task.cancel()

        async with self._cond:
            self._cond.notify()

    async def _periodic(self) -> None:
        """
        Runs _run() forever, waiting between iterations until the interval
        elapses or wake_up() is called. Exceptions raised by _run() are
        logged and do not end the loop; cancellation does.
        """
        interval_waiter = cast(Optional[asyncio.Task], None)
        try:
            while True:
                self.heartbeat()

                try:
                    await self._run()
                except Exception as exp:  # pylint: disable=broad-except
                    logging.exception("Exception from _run: %s", exp)

                # Wait for self._interval seconds or wake_up is explicitly
                # called
                interval_waiter = self._loop.create_task(
                    self._sleep_for_interval(),
                )
                self._interval_wait_task = interval_waiter
                async with self._cond:
                    await self._cond.wait()
        finally:
            # Do not leave the sleeper pending once this loop is cancelled;
            # otherwise it lingers for up to self._interval seconds.
            if interval_waiter is not None:
                interval_waiter.cancel()
                if self._interval_wait_task is interval_waiter:
                    self._interval_wait_task = None