blob: 1e9848331f84aa977548fa488fbc8d17e4bcec0b [file] [log] [blame]
# SPDX-FileCopyrightText: 2020 The Magma Authors.
# SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org>
#
# SPDX-License-Identifier: BSD-3-Clause
Wei-Yu Chen49950b92021-11-08 19:19:18 +08005
6import asyncio
7import faulthandler
8import functools
9import logging
10import os
11import signal
12import time
13from concurrent import futures
14from typing import Any, Dict, List, Optional
15
16import grpc
17import pkg_resources
18from common.log_count_handler import MsgCounterHandler
19from common.log_counter import ServiceLogErrorReporter
20from common.metrics_export import get_metrics
21from common.service_registry import ServiceRegistry
22from configuration.exceptions import LoadConfigError
23from configuration.mconfig_managers import get_mconfig_manager
24from configuration.service_configs import load_service_config
25from orc8r.protos.common_pb2 import LogLevel, Void
26from orc8r.protos.metricsd_pb2 import MetricsContainer
27from orc8r.protos.service303_pb2 import (
28 GetOperationalStatesResponse,
29 ReloadConfigResponse,
30 ServiceInfo,
31 State,
32)
33from orc8r.protos.service303_pb2_grpc import (
34 Service303Servicer,
35 Service303Stub,
36 add_Service303Servicer_to_server,
37)
38
# Thread-pool size used for the gRPC server when the service config does not
# provide a 'grpc_workers' entry (see MagmaService.__init__).
MAX_DEFAULT_WORKER = 10
40
41
async def loop_exit():
    """
    Stop the event loop from inside a coroutine.

    The loop is looked up with asyncio.get_event_loop() and its stop()
    method is called; nothing is returned.
    """
    asyncio.get_event_loop().stop()
48
49
class MagmaService(Service303Servicer):
    """
    MagmaService provides the framework for all Magma services.
    This class also implements the Service303 interface for external
    entities to interact with the service.
    """

    def __init__(self, name, empty_mconfig, loop=None):
        """Set up logging, configs, signal handlers and the gRPC server.

        Args:
            name: name of the service; used to load the service config and
                the managed config, and to look up the listening address.
            empty_mconfig: initial managed config; it is handed to the
                mconfig manager's load_service_mconfig() on (re)load.
            loop: asyncio event loop to use. Defaults to
                asyncio.get_event_loop() when None.
        """
        self._name = name
        self._port = 0
        self._get_status_callback = None
        self._get_operational_states_cb = None
        self._log_count_handler = MsgCounterHandler()

        # Init logging before doing anything
        logging.basicConfig(
            level=logging.INFO,
            format='[%(asctime)s %(levelname)s %(name)s] %(message)s',
        )
        # Add a handler to count errors
        logging.root.addHandler(self._log_count_handler)

        # Set gRPC polling strategy
        self._set_grpc_poll_strategy()

        # Load the managed config if present
        self._mconfig = empty_mconfig
        self._mconfig_metadata = None
        self._mconfig_manager = get_mconfig_manager()
        self.reload_mconfig()

        self._state = ServiceInfo.STARTING
        self._health = ServiceInfo.APP_UNHEALTHY
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._start_time = int(time.time())
        self._register_signal_handlers()

        # Load the service config if present
        self._config = None
        self.reload_config()

        # Count errors
        self.log_counter = ServiceLogErrorReporter(
            loop=self._loop,
            service_config=self._config,
            handler=self._log_count_handler,
        )
        self.log_counter.start()

        # Operational States
        self._operational_states = []

        # Fallback version, kept when the distribution cannot be resolved
        self._version = '0.0.0'
        # Load the service version if available
        try:
            # Check if service on docker
            if self._config and self._config.get('init_system') == 'docker':
                # image comes in form of "feg_gateway_python:<IMAGE_TAG>\n"
                # Skip the "feg_gateway_python:" part
                # The context manager closes the pipe once it has been read,
                # so the file object is not leaked.
                with os.popen(
                    'docker ps --filter name=magmad --format "{{.Image}}" | '
                    'cut -d ":" -f 2',
                ) as image:
                    image_tag = image.read().strip('\n')
                self._version = image_tag
            else:
                self._version = pkg_resources.get_distribution('orc8r').version
        except pkg_resources.ResolutionError as e:
            logging.info(e)

        # Size the gRPC thread pool from the config, else use the default
        if self._config and 'grpc_workers' in self._config:
            max_workers = self._config['grpc_workers']
        else:
            max_workers = MAX_DEFAULT_WORKER
        self._server = grpc.server(
            futures.ThreadPoolExecutor(max_workers=max_workers),
        )
        add_Service303Servicer_to_server(self, self._server)

    @property
    def version(self):
        """Return the current running version of the Magma service"""
        return self._version

    @property
    def name(self):
        """Return the name of service

        Returns:
            str: name of service
        """
        return self._name

    @property
    def rpc_server(self):
        """Return the RPC server used by the service"""
        return self._server

    @property
    def port(self):
        """Return the listening port of the service"""
        return self._port

    @property
    def loop(self):
        """Return the asyncio event loop used by the service"""
        return self._loop

    @property
    def state(self):
        """Return the state of the service"""
        return self._state

    @property
    def config(self) -> Optional[Dict[str, Any]]:
        """Return the service config (None if it has not been loaded)"""
        return self._config

    @property
    def mconfig(self):
        """Return the managed config"""
        return self._mconfig

    @property
    def mconfig_metadata(self):
        """Return the metadata of the managed config"""
        return self._mconfig_metadata

    @property
    def mconfig_manager(self):
        """Return the mconfig manager for this service"""
        return self._mconfig_manager

    def reload_config(self):
        """Reload the local config for the service

        On success the log level is re-applied from the new config. A
        LoadConfigError is logged as a warning and otherwise ignored, leaving
        the previous config in place.
        """
        try:
            self._config = load_service_config(self._name)
            self._setup_logging()
        except LoadConfigError as e:
            logging.warning(e)

    def reload_mconfig(self):
        """Reload the managed config for the service

        A LoadConfigError is logged as a warning and otherwise ignored.
        """
        try:
            # reload mconfig manager in case feature flag for streaming changed
            self._mconfig_manager = get_mconfig_manager()
            self._mconfig = self._mconfig_manager.load_service_mconfig(
                self._name,
                self._mconfig,
            )
            self._mconfig_metadata = \
                self._mconfig_manager.load_mconfig_metadata()
        except LoadConfigError as e:
            logging.warning(e)

    def add_operational_states(self, states: List[State]):
        """Add a list of states into the service

        Args:
            states (List[State]): states appended to the service's list of
                operational states
        """
        self._operational_states.extend(states)

    def run(self):
        """
        Start the service and runs the event loop until a term signal
        is received or a StopService rpc call is made on the Service303
        interface.
        """
        logging.info("Starting %s...", self._name)
        (host, port) = ServiceRegistry.get_service_address(self._name)
        # add_insecure_port returns the port that was actually bound
        self._port = self._server.add_insecure_port('{}:{}'.format(host, port))
        logging.info("Listening on address %s:%d", host, self._port)
        self._state = ServiceInfo.ALIVE
        # Python services are healthy immediately when run
        self._health = ServiceInfo.APP_HEALTHY
        self._server.start()
        # Waiting for the term signal or StopService rpc call
        self._loop.run_forever()

    def close(self):
        """
        Clean up the service before termination. This needs to be
        called at least once after the service has been inited.
        """
        self._loop.close()
        self._server.stop(0).wait()

    def register_get_status_callback(self, get_status_callback):
        """Register function for getting status

        Must return a map(string, string)
        """
        self._get_status_callback = get_status_callback

    def register_operational_states_callback(self, get_operational_states_cb):
        """Register the callback function that gets called on GetOperationalStates rpc

        Args:
            get_operational_states_cb: callable taking no arguments; its
                result is added to the GetOperationalStates response
        """
        self._get_operational_states_cb = get_operational_states_cb

    def _stop(self, reason):
        """Stop the service gracefully

        Stops the gRPC server, cancels every task known to the service's
        event loop and then schedules loop_exit() to stop the loop.

        Args:
            reason: text logged as the reason for stopping (signal name or
                'RPC')
        """
        logging.info("Stopping %s with reason %s...", self._name, reason)
        self._state = ServiceInfo.STOPPING
        self._server.stop(0)

        # asyncio.all_tasks is missing on older Python versions; fall back
        # to the Task classmethod there.
        try:
            asyncio_all_tasks = asyncio.all_tasks
        except AttributeError:
            asyncio_all_tasks = asyncio.Task.all_tasks

        for pending_task in asyncio_all_tasks(self._loop):
            pending_task.cancel()

        self._state = ServiceInfo.STOPPED
        self._health = ServiceInfo.APP_UNHEALTHY
        # Scheduled after the cancellations so it is not cancelled itself
        asyncio.ensure_future(loop_exit())

    def _set_grpc_poll_strategy(self):
        """
        The new default 'epollex' poll strategy is causing fd leaks, leading to
        service crashes after 1024 open fds.
        See https://github.com/grpc/grpc/issues/15759
        """
        os.environ['GRPC_POLL_STRATEGY'] = 'epoll1,poll'

    def _get_log_level_from_config(self) -> Optional[int]:
        """Return the LogLevel enum value named by the config's 'log_level'

        Returns:
            None when there is no config, no 'log_level' entry, or the entry
            is not a known LogLevel name (the latter is logged as an error).
        """
        if self._config is None:
            return None
        log_level = self._config.get('log_level', None)
        if log_level is None:
            return None
        # convert from log level string to LogLevel enum value
        try:
            proto_level = LogLevel.Value(log_level)
        except ValueError:
            logging.error(
                'Unknown logging level in config: %s, ignoring',
                log_level,
            )
            return None
        return proto_level

    def _get_log_level_from_mconfig(self) -> Optional[int]:
        """Return the mconfig's log_level, or None when there is no mconfig"""
        if self._mconfig is None:
            return None
        return self._mconfig.log_level

    def _setup_logging(self):
        """Set up log level from config values

        The config file on the AGW takes precedence over the mconfig
        If neither config file nor mconfig has the log level config, default to INFO
        """
        log_level_from_config = self._get_log_level_from_config()
        log_level_from_mconfig = self._get_log_level_from_mconfig()

        if log_level_from_config is not None:
            log_level = log_level_from_config
        elif log_level_from_mconfig is not None:
            log_level = log_level_from_mconfig
        else:
            logging.warning(
                'logging level is not specified in either yml config '
                'or mconfig, defaulting to INFO',
            )
            log_level = LogLevel.Value('INFO')

        self._set_log_level(log_level)

    @staticmethod
    def _set_log_level(proto_level: int):
        """Set log level based on proto-enum level

        Unknown values are logged as an error and treated as INFO.

        Args:
            proto_level (int): proto enum defined in common.proto
        """
        if proto_level == LogLevel.Value('DEBUG'):
            level = logging.DEBUG
        elif proto_level == LogLevel.Value('INFO'):
            level = logging.INFO
        elif proto_level == LogLevel.Value('WARNING'):
            level = logging.WARNING
        elif proto_level == LogLevel.Value('ERROR'):
            level = logging.ERROR
        elif proto_level == LogLevel.Value('FATAL'):
            level = logging.FATAL
        else:
            logging.error(
                'Unknown logging level: %d, defaulting to INFO',
                proto_level,
            )
            level = logging.INFO

        logging.info(
            "Setting logging level to %s",
            logging.getLevelName(level),
        )
        # The empty name selects the root logger
        logger = logging.getLogger('')
        logger.setLevel(level)

    def _register_signal_handlers(self):
        """Register signal handlers

        SIGINT/SIGTERM/SIGQUIT stop the service; SIGHUP dumps the current
        tracebacks via faulthandler.
        """
        for signame in ['SIGINT', 'SIGTERM', 'SIGQUIT']:
            self._loop.add_signal_handler(
                getattr(signal, signame),
                functools.partial(self._stop, signame),
            )

        def _signal_handler():
            logging.info('Handling SIGHUP...')
            faulthandler.dump_traceback()

        self._loop.add_signal_handler(signal.SIGHUP, _signal_handler)

    def GetServiceInfo(self, request, context):
        """Return the service info (name, version, state, meta, etc.)"""
        service_info = ServiceInfo(
            name=self._name,
            version=self._version,
            state=self._state,
            health=self._health,
            start_time_secs=self._start_time,
        )
        if self._get_status_callback is not None:
            status = self._get_status_callback()
            try:
                service_info.status.meta.update(status)
            except (TypeError, ValueError) as exp:
                # A malformed status is logged and left out of the response
                logging.error("Error getting service status: %s", exp)
        return service_info

    def StopService(self, request, context):
        """Handle request to stop the service"""
        logging.info("Request to stop service.")
        # Hand the stop over to the event loop instead of running it here
        self._loop.call_soon_threadsafe(self._stop, 'RPC')
        return Void()

    def GetMetrics(self, request, context):
        """
        Collects timeseries samples from prometheus python client on this
        process
        """
        metrics = MetricsContainer()
        metrics.family.extend(get_metrics())
        return metrics

    def SetLogLevel(self, request, context):
        """Handle request to set the log level"""
        self._set_log_level(request.level)
        return Void()

    def SetLogVerbosity(self, request, context):
        """Handle request to set the log verbosity (not implemented)"""
        # NOTE(review): this returns None, unlike SetLogLevel which returns
        # Void() -- confirm whether callers expect a response message here.
        pass  # Not Implemented

    def ReloadServiceConfig(self, request, context):
        """Handle request to reload the service config file"""
        # NOTE(review): reload_config() logs and swallows LoadConfigError, so
        # RELOAD_SUCCESS is reported even when loading failed.
        self.reload_config()
        return ReloadConfigResponse(result=ReloadConfigResponse.RELOAD_SUCCESS)

    def GetOperationalStates(self, request, context):
        """Return the operational states of devices managed by this service."""
        res = GetOperationalStatesResponse()
        if self._get_operational_states_cb is not None:
            states = self._get_operational_states_cb()
            res.states.extend(states)
        return res
431
432
def get_service303_client(
    service_name: str,
    location: str,
) -> Optional[Service303Stub]:
    """
    Build a Service303 grpc stub for the given service name and location.

    Returns None (after logging an error) when a ValueError is raised while
    obtaining the channel or creating the stub.
    Example Use: client = get_service303_client("state", ServiceRegistry.LOCAL)
    """
    try:
        channel = ServiceRegistry.get_rpc_channel(service_name, location)
        stub = Service303Stub(channel)
    except ValueError:
        # Service can't be contacted
        logging.error('Failed to get RPC channel to %s', service_name)
        stub = None
    return stub