blob: 33dc2dd76756675562ee272e9435e047591e76e8 [file] [log] [blame]
Wei-Yu Chenad55cb82022-02-15 20:07:01 +08001# SPDX-FileCopyrightText: 2020 The Magma Authors.
2# SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org>
3#
4# SPDX-License-Identifier: BSD-3-Clause
Wei-Yu Chen49950b92021-11-08 19:19:18 +08005
6import asyncio
7import faulthandler
8import functools
9import logging
10import os
11import signal
12import time
13from concurrent import futures
14from typing import Any, Dict, List, Optional
15
16import grpc
17import pkg_resources
18from common.log_count_handler import MsgCounterHandler
19from common.log_counter import ServiceLogErrorReporter
20from common.metrics_export import get_metrics
21from common.service_registry import ServiceRegistry
22from configuration.exceptions import LoadConfigError
23from configuration.mconfig_managers import get_mconfig_manager
24from configuration.service_configs import load_service_config
25from orc8r.protos.common_pb2 import LogLevel, Void
26from orc8r.protos.metricsd_pb2 import MetricsContainer
27from orc8r.protos.service303_pb2 import (
28 GetOperationalStatesResponse,
29 ReloadConfigResponse,
30 ServiceInfo,
31 State,
32)
33from orc8r.protos.service303_pb2_grpc import (
34 Service303Servicer,
35 Service303Stub,
36 add_Service303Servicer_to_server,
37)
38
# Default size of the gRPC server's thread pool; MagmaService falls back to
# this value when the service config has no 'grpc_workers' entry.
MAX_DEFAULT_WORKER = 10
40
41
async def loop_exit():
    """
    Ask the event loop returned by asyncio.get_event_loop() to stop.

    Written as a coroutine so that it can be scheduled on the loop like
    any other task.
    """
    asyncio.get_event_loop().stop()
48
49
class MagmaService(Service303Servicer):
    """
    MagmaService provides the framework for all Magma services.
    This class also implements the Service303 interface for external
    entities to interact with the service.
    """

    def __init__(self, name, empty_mconfig, loop=None):
        """Set up logging, configs, signal handlers and the gRPC server.

        Args:
            name: name of the service; used to load the service config and
                the managed config, and to look up the listening address
            empty_mconfig: initial managed config value, passed to the
                mconfig manager when the service mconfig is loaded
            loop: asyncio event loop to use; when None the loop returned
                by asyncio.get_event_loop() is used
        """
        self._name = name
        self._port = 0
        self._get_status_callback = None
        self._get_operational_states_cb = None
        self._log_count_handler = MsgCounterHandler()

        # Init logging before doing anything
        logging.basicConfig(
            level=logging.INFO,
            format='[%(asctime)s %(levelname)s %(name)s] %(message)s',
        )
        # Add a handler to count errors
        logging.root.addHandler(self._log_count_handler)

        # Set gRPC polling strategy
        self._set_grpc_poll_strategy()

        # Load the managed config if present
        self._mconfig = empty_mconfig
        self._mconfig_metadata = None
        self._mconfig_manager = get_mconfig_manager()
        self.reload_mconfig()

        self._state = ServiceInfo.STARTING
        self._health = ServiceInfo.APP_UNHEALTHY
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._start_time = int(time.time())
        self._register_signal_handlers()

        # Load the service config if present
        self._config = None
        self.reload_config()

        # Count errors
        self.log_counter = ServiceLogErrorReporter(
            loop=self._loop,
            service_config=self._config,
            handler=self._log_count_handler,
        )
        self.log_counter.start()

        # Operational States
        self._operational_states = []

        self._version = '0.0.0'
        # Load the service version if available
        try:
            # Check if service on docker
            if self._config and self._config.get('init_system') == 'docker':
                # image comes in form of "feg_gateway_python:<IMAGE_TAG>\n"
                # Skip the "feg_gateway_python:" part.
                # The pipe is opened in a `with` block so that it is closed
                # (and the child process reaped) once the tag has been read.
                with os.popen(
                    'docker ps --filter name=magmad --format "{{.Image}}" | '
                    'cut -d ":" -f 2',
                ) as image:
                    self._version = image.read().strip('\n')
            else:
                self._version = pkg_resources.get_distribution('orc8r').version
        except pkg_resources.ResolutionError as e:
            logging.info(e)

        # Size the gRPC thread pool from the service config when it provides
        # 'grpc_workers', otherwise use the module default.
        max_workers = MAX_DEFAULT_WORKER
        if self._config and 'grpc_workers' in self._config:
            max_workers = self._config['grpc_workers']
        self._server = grpc.server(
            futures.ThreadPoolExecutor(max_workers=max_workers),
        )
        add_Service303Servicer_to_server(self, self._server)

    @property
    def version(self):
        """Return the current running version of the Magma service"""
        return self._version

    @property
    def name(self):
        """Return the name of service

        Returns:
            str: name of service
        """
        return self._name

    @property
    def rpc_server(self):
        """Return the RPC server used by the service"""
        return self._server

    @property
    def port(self):
        """Return the listening port of the service"""
        return self._port

    @property
    def loop(self):
        """Return the asyncio event loop used by the service"""
        return self._loop

    @property
    def state(self):
        """Return the state of the service"""
        return self._state

    @property
    def config(self) -> Dict[str, Any]:
        """Return the service config

        The value is None until a service config has been loaded
        successfully by reload_config().
        """
        return self._config

    @property
    def mconfig(self):
        """Return the managed config"""
        return self._mconfig

    @property
    def mconfig_metadata(self):
        """Return the metadata of the managed config"""
        return self._mconfig_metadata

    @property
    def mconfig_manager(self):
        """Return the mconfig manager for this service"""
        return self._mconfig_manager

    def reload_config(self):
        """Reload the local config for the service

        On success the log level is re-applied from the new config. A
        LoadConfigError is logged as a warning and the previous config
        is kept.
        """
        try:
            self._config = load_service_config(self._name)
            self._setup_logging()
        except LoadConfigError as e:
            logging.warning(e)

    def reload_mconfig(self):
        """Reload the managed config for the service

        A LoadConfigError is logged as a warning and not re-raised.
        """
        try:
            # reload mconfig manager in case feature flag for streaming changed
            self._mconfig_manager = get_mconfig_manager()
            self._mconfig = self._mconfig_manager.load_service_mconfig(
                self._name,
                self._mconfig,
            )
            self._mconfig_metadata = \
                self._mconfig_manager.load_mconfig_metadata()
        except LoadConfigError as e:
            logging.warning(e)

    def add_operational_states(self, states: List[State]):
        """Add a list of states into the service

        Args:
            states (List[State]): states appended to the service's list of
                operational states
        """
        self._operational_states.extend(states)

    def run(self):
        """
        Start the service and runs the event loop until a term signal
        is received or a StopService rpc call is made on the Service303
        interface.
        """
        logging.info("Starting %s...", self._name)
        (host, port) = ServiceRegistry.get_service_address(self._name)
        self._port = self._server.add_insecure_port('{}:{}'.format(host, port))
        logging.info("Listening on address %s:%d", host, self._port)
        self._state = ServiceInfo.ALIVE
        # Python services are healthy immediately when run
        self._health = ServiceInfo.APP_HEALTHY
        self._server.start()
        # Waiting for the term signal or StopService rpc call
        self._loop.run_forever()

    def close(self):
        """
        Clean up the service before termination. This needs to be
        called at least once after the service has been inited.
        """
        # Stop the gRPC server first so that no RPC handler can still post
        # work to the loop, and close the loop in `finally` so that one
        # teardown step failing does not skip the other.
        try:
            self._server.stop(0).wait()
        finally:
            self._loop.close()

    def register_get_status_callback(self, get_status_callback):
        """Register function for getting status

        Must return a map(string, string)
        """
        self._get_status_callback = get_status_callback

    def register_operational_states_callback(self, get_operational_states_cb):
        """Register the callback function that gets called on GetOperationalStates rpc

        Args:
            get_operational_states_cb: callable taking no arguments; its
                return value is appended to the states of the
                GetOperationalStates response
        """
        self._get_operational_states_cb = get_operational_states_cb

    def _stop(self, reason):
        """Stop the service gracefully

        Stops the gRPC server, cancels every task of the service's event
        loop and schedules the loop to stop.

        Args:
            reason: short description of why the service stops (signal
                name or 'RPC'); only used for logging
        """
        logging.info("Stopping %s with reason %s...", self._name, reason)
        self._state = ServiceInfo.STOPPING
        self._server.stop(0)

        # asyncio.all_tasks is missing on older Python versions, where the
        # same functionality lives on asyncio.Task
        try:
            asyncio_all_tasks = asyncio.all_tasks
        except AttributeError:
            asyncio_all_tasks = asyncio.Task.all_tasks

        for pending_task in asyncio_all_tasks(self._loop):
            pending_task.cancel()

        self._state = ServiceInfo.STOPPED
        self._health = ServiceInfo.APP_UNHEALTHY
        # Schedule the stop explicitly on the service's own loop instead of
        # relying on whichever loop is current for the calling context.
        asyncio.ensure_future(loop_exit(), loop=self._loop)

    def _set_grpc_poll_strategy(self):
        """
        The new default 'epollex' poll strategy is causing fd leaks, leading to
        service crashes after 1024 open fds.
        See https://github.com/grpc/grpc/issues/15759
        """
        os.environ['GRPC_POLL_STRATEGY'] = 'epoll1,poll'

    def _get_log_level_from_config(self) -> Optional[int]:
        """Return the log level configured in the service config

        Returns:
            The LogLevel proto enum value named by the 'log_level' config
            key, or None when no config is loaded, the key is absent, or
            the name is not a known LogLevel.
        """
        if self._config is None:
            return None
        log_level = self._config.get('log_level', None)
        if log_level is None:
            return None
        # convert from log level string to LogLevel enum value
        try:
            proto_level = LogLevel.Value(log_level)
        except ValueError:
            logging.error(
                'Unknown logging level in config: %s, ignoring',
                log_level,
            )
            return None
        return proto_level

    def _get_log_level_from_mconfig(self) -> Optional[int]:
        """Return the log_level field of the mconfig, or None without mconfig"""
        if self._mconfig is None:
            return None
        return self._mconfig.log_level

    def _setup_logging(self):
        """Set up log level from config values

        The config file on the AGW takes precedence over the mconfig
        If neither config file nor mconfig has the log level config, default to INFO
        """
        log_level_from_config = self._get_log_level_from_config()
        log_level_from_mconfig = self._get_log_level_from_mconfig()

        if log_level_from_config is not None:
            log_level = log_level_from_config
        elif log_level_from_mconfig is not None:
            log_level = log_level_from_mconfig
        else:
            logging.warning(
                'logging level is not specified in either yml config '
                'or mconfig, defaulting to INFO',
            )
            log_level = LogLevel.Value('INFO')

        self._set_log_level(log_level)

    @staticmethod
    def _set_log_level(proto_level: int):
        """Set log level based on proto-enum level

        Unknown values are reported as an error and treated as INFO.

        Args:
            proto_level (int): proto enum defined in common.proto
        """
        if proto_level == LogLevel.Value('DEBUG'):
            level = logging.DEBUG
        elif proto_level == LogLevel.Value('INFO'):
            level = logging.INFO
        elif proto_level == LogLevel.Value('WARNING'):
            level = logging.WARNING
        elif proto_level == LogLevel.Value('ERROR'):
            level = logging.ERROR
        elif proto_level == LogLevel.Value('FATAL'):
            level = logging.FATAL
        else:
            logging.error(
                'Unknown logging level: %d, defaulting to INFO',
                proto_level,
            )
            level = logging.INFO

        logging.info(
            "Setting logging level to %s",
            logging.getLevelName(level),
        )

        # Pin the spyne logger to INFO to avoid the debugging messages
        # from the enodebd service
        logging.getLogger('spyne').setLevel(logging.INFO)

        logger = logging.getLogger('')
        logger.setLevel(level)

    def _register_signal_handlers(self):
        """Register signal handlers

        SIGINT/SIGTERM/SIGQUIT stop the service; SIGHUP dumps the current
        tracebacks through faulthandler.
        """
        for signame in ['SIGINT', 'SIGTERM', 'SIGQUIT']:
            self._loop.add_signal_handler(
                getattr(signal, signame),
                functools.partial(self._stop, signame),
            )

        def _signal_handler():
            logging.info('Handling SIGHUP...')
            faulthandler.dump_traceback()

        self._loop.add_signal_handler(signal.SIGHUP, _signal_handler)

    def GetServiceInfo(self, request, context):
        """Return the service info (name, version, state, meta, etc.)"""
        service_info = ServiceInfo(
            name=self._name,
            version=self._version,
            state=self._state,
            health=self._health,
            start_time_secs=self._start_time,
        )
        if self._get_status_callback is not None:
            status = self._get_status_callback()
            try:
                service_info.status.meta.update(status)
            except (TypeError, ValueError) as exp:
                logging.error("Error getting service status: %s", exp)
        return service_info

    def StopService(self, request, context):
        """Handle request to stop the service"""
        logging.info("Request to stop service.")
        # Hand the stop over to the event loop instead of running it in
        # the calling RPC context
        self._loop.call_soon_threadsafe(self._stop, 'RPC')
        return Void()

    def GetMetrics(self, request, context):
        """
        Collects timeseries samples from prometheus python client on this
        process
        """
        metrics = MetricsContainer()
        metrics.family.extend(get_metrics())
        return metrics

    def SetLogLevel(self, request, context):
        """Handle request to set the log level"""
        self._set_log_level(request.level)
        return Void()

    def SetLogVerbosity(self, request, context):
        """Handle request to set the log verbosity (not implemented)

        Does nothing and returns None.
        """
        pass  # Not Implemented

    def ReloadServiceConfig(self, request, context):
        """Handle request to reload the service config file

        NOTE: RELOAD_SUCCESS is returned unconditionally; a load failure
        is only logged by reload_config().
        """
        self.reload_config()
        return ReloadConfigResponse(result=ReloadConfigResponse.RELOAD_SUCCESS)

    def GetOperationalStates(self, request, context):
        """Return the operational states of devices managed by this service."""
        res = GetOperationalStatesResponse()
        if self._get_operational_states_cb is not None:
            states = self._get_operational_states_cb()
            res.states.extend(states)
        return res
435
436
def get_service303_client(
    service_name: str, location: str,
) -> Optional[Service303Stub]:
    """
    Build a Service303 grpc stub for the service with the given
    name at the given location.

    Returns None (after logging an error) when a ValueError is raised
    while the channel or the stub is being created.
    Example Use: client = get_service303_client("state", ServiceRegistry.LOCAL)
    """
    try:
        return Service303Stub(
            ServiceRegistry.get_rpc_channel(service_name, location),
        )
    except ValueError:
        # Service can't be contacted
        logging.error('Failed to get RPC channel to %s', service_name)
    return None