blob: 59f06ca5e2a07d735a0d679a6e9cffb48e46cdc9 [file] [log] [blame]
Wei-Yu Chen49950b92021-11-08 19:19:18 +08001"""
2Copyright 2020 The Magma Authors.
3
4This source code is licensed under the BSD-style license found in the
5LICENSE file in the root directory of this source tree.
6
7Unless required by applicable law or agreed to in writing, software
8distributed under the License is distributed on an "AS IS" BASIS,
9WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10See the License for the specific language governing permissions and
11limitations under the License.
12"""
13
14import asyncio
15import faulthandler
16import functools
17import logging
18import os
19import signal
20import time
21from concurrent import futures
22from typing import Any, Dict, List, Optional
23
24import grpc
25import pkg_resources
26from common.log_count_handler import MsgCounterHandler
27from common.log_counter import ServiceLogErrorReporter
28from common.metrics_export import get_metrics
29from common.service_registry import ServiceRegistry
30from configuration.exceptions import LoadConfigError
31from configuration.mconfig_managers import get_mconfig_manager
32from configuration.service_configs import load_service_config
33from orc8r.protos.common_pb2 import LogLevel, Void
34from orc8r.protos.metricsd_pb2 import MetricsContainer
35from orc8r.protos.service303_pb2 import (
36 GetOperationalStatesResponse,
37 ReloadConfigResponse,
38 ServiceInfo,
39 State,
40)
41from orc8r.protos.service303_pb2_grpc import (
42 Service303Servicer,
43 Service303Stub,
44 add_Service303Servicer_to_server,
45)
46
# Default size of the gRPC server's thread pool; used by MagmaService when the
# service config does not provide a 'grpc_workers' entry.
MAX_DEFAULT_WORKER = 10
48
49
async def loop_exit():
    """
    Coroutine that stops the event loop returned by
    asyncio.get_event_loop() at the moment it runs.

    Scheduling this as a task makes the loop's run_forever() call return
    once the current iteration has finished.
    """
    asyncio.get_event_loop().stop()
56
57
class MagmaService(Service303Servicer):
    """
    MagmaService provides the framework for all Magma services.
    This class also implements the Service303 interface for external
    entities to interact with the service.
    """

    def __init__(self, name, empty_mconfig, loop=None):
        """Initialise logging, configs, signal handlers and the gRPC server.

        Args:
            name: service name; used to look up the service config, the
                managed config and the service address in the registry.
            empty_mconfig: initial managed config value, also handed to the
                mconfig manager when the managed config is (re)loaded.
            loop: asyncio event loop to run the service on. When None, the
                loop returned by asyncio.get_event_loop() is used.
        """
        self._name = name
        self._port = 0
        self._get_status_callback = None
        self._get_operational_states_cb = None
        self._log_count_handler = MsgCounterHandler()

        # Init logging before doing anything
        logging.basicConfig(
            level=logging.INFO,
            format='[%(asctime)s %(levelname)s %(name)s] %(message)s',
        )
        # Add a handler to count errors
        logging.root.addHandler(self._log_count_handler)

        # Set gRPC polling strategy
        self._set_grpc_poll_strategy()

        # Load the managed config if present
        self._mconfig = empty_mconfig
        self._mconfig_metadata = None
        self._mconfig_manager = get_mconfig_manager()
        self.reload_mconfig()

        self._state = ServiceInfo.STARTING
        self._health = ServiceInfo.APP_UNHEALTHY
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._start_time = int(time.time())
        self._register_signal_handlers()

        # Load the service config if present
        self._config = None
        self.reload_config()

        # Count errors
        self.log_counter = ServiceLogErrorReporter(
            loop=self._loop,
            service_config=self._config,
            handler=self._log_count_handler,
        )
        self.log_counter.start()

        # Operational States
        self._operational_states = []

        self._version = '0.0.0'
        # Load the service version if available
        try:
            # Check if service on docker
            if self._config and 'init_system' in self._config \
                    and self._config['init_system'] == 'docker':
                # image comes in form of "feg_gateway_python:<IMAGE_TAG>\n"
                # Skip the "feg_gateway_python:" part
                # The context manager closes the pipe once it has been read,
                # so the child process handle is not leaked.
                with os.popen(
                    'docker ps --filter name=magmad --format "{{.Image}}" | '
                    'cut -d ":" -f 2',
                ) as image:
                    image_tag = image.read().strip('\n')
                self._version = image_tag
            else:
                self._version = pkg_resources.get_distribution('orc8r').version
        except pkg_resources.ResolutionError as e:
            # Version lookup is best effort; keep the '0.0.0' default.
            logging.info(e)

        # Size the gRPC thread pool from the service config when provided
        max_workers = MAX_DEFAULT_WORKER
        if self._config and 'grpc_workers' in self._config:
            max_workers = self._config['grpc_workers']
        self._server = grpc.server(
            futures.ThreadPoolExecutor(max_workers=max_workers),
        )
        add_Service303Servicer_to_server(self, self._server)

    @property
    def version(self):
        """Return the current running version of the Magma service"""
        return self._version

    @property
    def name(self):
        """Return the name of service

        Returns:
            str: name of service
        """
        return self._name

    @property
    def rpc_server(self):
        """Return the RPC server used by the service"""
        return self._server

    @property
    def port(self):
        """Return the listening port of the service"""
        return self._port

    @property
    def loop(self):
        """Return the asyncio event loop used by the service"""
        return self._loop

    @property
    def state(self):
        """Return the state of the service"""
        return self._state

    @property
    def config(self) -> Optional[Dict[str, Any]]:
        """Return the service config (None if it could not be loaded)"""
        return self._config

    @property
    def mconfig(self):
        """Return the managed config"""
        return self._mconfig

    @property
    def mconfig_metadata(self):
        """Return the metadata of the managed config"""
        return self._mconfig_metadata

    @property
    def mconfig_manager(self):
        """Return the mconfig manager for this service"""
        return self._mconfig_manager

    def reload_config(self):
        """Reload the local config for the service

        On success the log level is re-applied from the new config. A
        LoadConfigError is logged as a warning and the previously held
        config is kept.
        """
        try:
            self._config = load_service_config(self._name)
            self._setup_logging()
        except LoadConfigError as e:
            logging.warning(e)

    def reload_mconfig(self):
        """Reload the managed config for the service

        A LoadConfigError is logged as a warning instead of being raised.
        """
        try:
            # reload mconfig manager in case feature flag for streaming changed
            self._mconfig_manager = get_mconfig_manager()
            self._mconfig = self._mconfig_manager.load_service_mconfig(
                self._name,
                self._mconfig,
            )
            self._mconfig_metadata = \
                self._mconfig_manager.load_mconfig_metadata()
        except LoadConfigError as e:
            logging.warning(e)

    def add_operational_states(self, states: List[State]):
        """Add a list of states into the service

        Args:
            states (List[State]): states appended to the service's list of
                operational states
        """
        self._operational_states.extend(states)

    def run(self):
        """
        Start the service and runs the event loop until a term signal
        is received or a StopService rpc call is made on the Service303
        interface.
        """

        logging.info("Starting %s...", self._name)
        (host, port) = ServiceRegistry.get_service_address(self._name)
        # add_insecure_port returns the port that was actually bound
        self._port = self._server.add_insecure_port('{}:{}'.format(host, port))
        logging.info("Listening on address %s:%d", host, self._port)
        self._state = ServiceInfo.ALIVE
        # Python services are healthy immediately when run
        self._health = ServiceInfo.APP_HEALTHY
        self._server.start()
        # Waiting for the term signal or StopService rpc call
        self._loop.run_forever()

    def close(self):
        """
        Clean up the service before termination. This needs to be
        called at least once after the service has been inited.
        """
        self._loop.close()
        self._server.stop(0).wait()

    def register_get_status_callback(self, get_status_callback):
        """Register function for getting status

        Must return a map(string, string)
        """
        self._get_status_callback = get_status_callback

    def register_operational_states_callback(self, get_operational_states_cb):
        """Register the callback function that gets called on GetOperationalStates rpc

        Args:
            get_operational_states_cb: callable taking no arguments whose
                result is added to the GetOperationalStates response
        """
        self._get_operational_states_cb = get_operational_states_cb

    def _stop(self, reason):
        """Stop the service gracefully

        Args:
            reason: short description of why the service stops (signal name
                or 'RPC'); only used for logging
        """
        logging.info("Stopping %s with reason %s...", self._name, reason)
        self._state = ServiceInfo.STOPPING
        self._server.stop(0)

        # asyncio.Task.all_tasks() was removed in Python 3.9; the
        # module-level asyncio.all_tasks() is its replacement.
        for pending_task in asyncio.all_tasks(self._loop):
            pending_task.cancel()

        self._state = ServiceInfo.STOPPED
        self._health = ServiceInfo.APP_UNHEALTHY
        # Schedule the loop shutdown explicitly on the service's own loop
        # (created after the cancellation above, so it is not cancelled).
        self._loop.create_task(loop_exit())

    def _set_grpc_poll_strategy(self):
        """
        The new default 'epollex' poll strategy is causing fd leaks, leading to
        service crashes after 1024 open fds.
        See https://github.com/grpc/grpc/issues/15759
        """
        os.environ['GRPC_POLL_STRATEGY'] = 'epoll1,poll'

    def _get_log_level_from_config(self) -> Optional[int]:
        """Return the LogLevel enum value named by the service config

        Returns:
            The proto enum value for the config's 'log_level' entry, or None
            when there is no config, no such entry, or the name is unknown.
        """
        if self._config is None:
            return None
        log_level = self._config.get('log_level', None)
        if log_level is None:
            return None
        # convert from log level string to LogLevel enum value
        try:
            proto_level = LogLevel.Value(log_level)
        except ValueError:
            logging.error(
                'Unknown logging level in config: %s, ignoring',
                log_level,
            )
            return None
        return proto_level

    def _get_log_level_from_mconfig(self) -> Optional[int]:
        """Return the log level stored in the mconfig, or None without one"""
        if self._mconfig is None:
            return None
        return self._mconfig.log_level

    def _setup_logging(self):
        """Set up log level from config values

        The config file on the AGW takes precedence over the mconfig
        If neither config file nor mconfig has the log level config, default to INFO
        """
        log_level_from_config = self._get_log_level_from_config()
        log_level_from_mconfig = self._get_log_level_from_mconfig()

        if log_level_from_config is not None:
            log_level = log_level_from_config
        elif log_level_from_mconfig is not None:
            log_level = log_level_from_mconfig
        else:
            logging.warning(
                'logging level is not specified in either yml config '
                'or mconfig, defaulting to INFO',
            )
            log_level = LogLevel.Value('INFO')

        self._set_log_level(log_level)

    @staticmethod
    def _set_log_level(proto_level: int):
        """Set log level based on proto-enum level

        Unknown values are logged as an error and treated as INFO.

        Args:
            proto_level (int): proto enum defined in common.proto
        """
        # LogLevel enum name -> python logging level, checked in this order
        known_levels = (
            ('DEBUG', logging.DEBUG),
            ('INFO', logging.INFO),
            ('WARNING', logging.WARNING),
            ('ERROR', logging.ERROR),
            ('FATAL', logging.FATAL),
        )
        for proto_name, python_level in known_levels:
            if proto_level == LogLevel.Value(proto_name):
                level = python_level
                break
        else:
            logging.error(
                'Unknown logging level: %d, defaulting to INFO',
                proto_level,
            )
            level = logging.INFO

        logging.info(
            "Setting logging level to %s",
            logging.getLevelName(level),
        )
        # The root logger carries the level for the whole process
        logger = logging.getLogger('')
        logger.setLevel(level)

    def _register_signal_handlers(self):
        """Register signal handlers

        SIGINT/SIGTERM/SIGQUIT stop the service; SIGHUP dumps the current
        tracebacks via faulthandler without stopping it.
        """
        for signame in ['SIGINT', 'SIGTERM', 'SIGQUIT']:
            self._loop.add_signal_handler(
                getattr(signal, signame),
                functools.partial(self._stop, signame),
            )

        def _signal_handler():
            logging.info('Handling SIGHUP...')
            faulthandler.dump_traceback()

        self._loop.add_signal_handler(signal.SIGHUP, _signal_handler)

    def GetServiceInfo(self, request, context):
        """Return the service info (name, version, state, meta, etc.)"""
        service_info = ServiceInfo(
            name=self._name,
            version=self._version,
            state=self._state,
            health=self._health,
            start_time_secs=self._start_time,
        )
        if self._get_status_callback is not None:
            status = self._get_status_callback()
            try:
                service_info.status.meta.update(status)
            except (TypeError, ValueError) as exp:
                # A malformed status must not fail the whole RPC
                logging.error("Error getting service status: %s", exp)
        return service_info

    def StopService(self, request, context):
        """Handle request to stop the service"""
        logging.info("Request to stop service.")
        # RPC handlers run on the gRPC thread pool, so hand the stop over
        # to the event loop thread.
        self._loop.call_soon_threadsafe(self._stop, 'RPC')
        return Void()

    def GetMetrics(self, request, context):
        """
        Collects timeseries samples from prometheus python client on this
        process
        """
        metrics = MetricsContainer()
        metrics.family.extend(get_metrics())
        return metrics

    def SetLogLevel(self, request, context):
        """Handle request to set the log level"""
        self._set_log_level(request.level)
        return Void()

    def SetLogVerbosity(self, request, context):
        """Handle request to set the log verbosity (not implemented)"""
        # NOTE(review): this handler returns None rather than a response
        # message - confirm how callers of this RPC expect it to behave.
        pass  # Not Implemented

    def ReloadServiceConfig(self, request, context):
        """Handle request to reload the service config file"""
        # NOTE(review): reload_config() only logs load failures, so
        # RELOAD_SUCCESS is reported even when the reload failed.
        self.reload_config()
        return ReloadConfigResponse(result=ReloadConfigResponse.RELOAD_SUCCESS)

    def GetOperationalStates(self, request, context):
        """Return the operational states of devices managed by this service."""
        res = GetOperationalStatesResponse()
        if self._get_operational_states_cb is not None:
            states = self._get_operational_states_cb()
            res.states.extend(states)
        return res
434
435
def get_service303_client(service_name: str, location: str) \
        -> Optional[Service303Stub]:
    """
    Build a Service303 grpc client for the service with the given
    name at the given location.

    Returns None (after logging an error) when a ValueError is raised
    while setting up the client.

    Example Use: client = get_service303_client("state", ServiceRegistry.LOCAL)
    """
    client = None
    try:
        client = Service303Stub(
            ServiceRegistry.get_rpc_channel(service_name, location),
        )
    except ValueError:
        # Service can't be contacted
        logging.error('Failed to get RPC channel to %s', service_name)
    return client