# blob: f3ba0d29d1ccc7ffb65997bf1500d5e77a5661ca [file] [log] [blame]
# Wei-Yu Chen 49950b9 2021-11-08 19:19:18 +0800
"""
Copyright 2020 The Magma Authors.

This source code is licensed under the BSD-style license found in the
LICENSE file in the root directory of this source tree.

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import abc
import asyncio
import logging
import sys
import time
from contextlib import suppress
from typing import Optional, cast


class Job(abc.ABC):
    """
    Base class for a task that is executed repeatedly on an asyncio loop.

    A subclass must implement the co-routine _run(). After start() is
    called, _periodic() records a heartbeat, awaits _run(), and then waits
    until either self._interval seconds have passed or wake_up() is called
    before starting the next iteration.
    """

    def __init__(
        self,
        interval: int,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        """
        Args:
            interval: number of seconds to wait between two runs of _run()
            loop: event loop the job's tasks are created on; when None the
                loop returned by asyncio.get_event_loop() is used
        """
        if loop is None:
            self._loop = asyncio.get_event_loop()
        else:
            self._loop = loop
        # Task in charge of periodically running the task
        self._periodic_task = cast(Optional[asyncio.Task], None)
        # Task in charge of deciding how long to wait until next run
        self._interval_wait_task = cast(Optional[asyncio.Task], None)
        self._interval = interval  # in seconds
        # time.time() of the most recent heartbeat, None before the first
        self._last_run = cast(Optional[float], None)
        self._timeout = cast(Optional[float], None)
        # Condition variable used to control how long the job waits until
        # executing its task again.
        # The ``loop`` argument of asyncio.Condition was removed in
        # Python 3.10 (passing it raises TypeError); from that version on
        # the condition binds to the running loop on first use. Older
        # versions still need the explicit loop so that the condition is
        # tied to self._loop rather than to the default loop.
        if sys.version_info >= (3, 10):
            self._cond = asyncio.Condition()
        else:
            self._cond = asyncio.Condition(loop=self._loop)

    @abc.abstractmethod
    async def _run(self):
        """
        Once implemented by a subclass, this function will contain the actual
        work of this Job.
        """
        pass

    def start(self) -> None:
        """
        kicks off the _periodic while loop

        Does nothing if the loop has already been started.
        """
        if self._periodic_task is None:
            self._periodic_task = self._loop.create_task(self._periodic())

    def stop(self) -> None:
        """
        cancels the _periodic while loop and the pending interval sleep

        When the event loop is not running, the loop is driven until the
        cancelled tasks have finished. When it is already running (stop()
        was called from a co-routine or callback), the tasks are only
        cancelled and the running loop finishes them on its next
        iterations, because run_until_complete() cannot be used on a
        running loop.

        Does nothing if the job has not been started.
        """
        if self._periodic_task is None:
            return

        tasks = [self._periodic_task]
        # The sleeper would otherwise stay pending for up to self._interval
        # seconds and could notify a _periodic loop started later on.
        if self._interval_wait_task is not None:
            tasks.append(self._interval_wait_task)

        for task in tasks:
            task.cancel()

        if not self._loop.is_running():
            for task in tasks:
                with suppress(asyncio.CancelledError):
                    # Await task to execute its cancellation
                    self._loop.run_until_complete(task)

        self._periodic_task = None
        self._interval_wait_task = None

    def set_timeout(self, timeout: float) -> None:
        """
        sets the age in seconds after which not_completed() treats the
        last heartbeat as stale
        """
        self._timeout = timeout

    def set_interval(self, interval: int) -> None:
        """
        sets the interval used in _periodic to decide how long to sleep
        """
        self._interval = interval

    def heartbeat(self) -> None:
        """
        records the current time as the start of an iteration
        """
        # record time to keep track of iteration length
        self._last_run = time.time()

    def not_completed(self, current_time: float) -> bool:
        """
        reports whether the job lacks a recent heartbeat

        Args:
            current_time: timestamp to compare the last heartbeat against,
                on the same clock as time.time()

        Returns:
            True if no heartbeat has been recorded yet, or if the last one
            is more than the timeout older than current_time. The timeout
            is the value given to set_timeout(); 120 seconds are used when
            it is unset or zero.
        """
        last_time = self._last_run

        if last_time is None:
            return True
        if last_time < current_time - (self._timeout or 120):
            return True
        return False

    async def _sleep_for_interval(self):
        """
        sleeps for self._interval seconds and then notifies the cond var
        so that the _periodic loop can continue
        """
        await asyncio.sleep(self._interval)
        async with self._cond:
            self._cond.notify()

    async def wake_up(self):
        """
        Cancels the _sleep_for_interval task if it exists, and notifies the
        cond var so that the _periodic loop can continue.
        """
        if self._interval_wait_task is not None:
            self._interval_wait_task.cancel()

        async with self._cond:
            self._cond.notify()

    async def _periodic(self) -> None:
        """
        runs _run() forever, waiting on the cond var between iterations

        Exceptions raised by _run() are logged and do not end the loop;
        the loop only ends when its task is cancelled.
        """
        while True:
            self.heartbeat()

            try:
                await self._run()
            except asyncio.CancelledError:
                # On Python 3.7 CancelledError derives from Exception; it
                # must not be swallowed below or stop() could never end
                # this loop.
                raise
            except Exception as exp:  # pylint: disable=broad-except
                logging.exception("Exception from _run: %s", exp)

            # Wait for self._interval seconds or wake_up is explicitly called
            self._interval_wait_task = \
                self._loop.create_task(self._sleep_for_interval())
            async with self._cond:
                await self._cond.wait()