Wei-Yu Chen | ad55cb8 | 2022-02-15 20:07:01 +0800 | [diff] [blame] | 1 | # SPDX-FileCopyrightText: 2020 The Magma Authors. |
| 2 | # SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org> |
| 3 | # |
| 4 | # SPDX-License-Identifier: BSD-3-Clause |
Wei-Yu Chen | 49950b9 | 2021-11-08 19:19:18 +0800 | [diff] [blame] | 5 | |
| 6 | import asyncio |
| 7 | import faulthandler |
| 8 | import functools |
| 9 | import logging |
| 10 | import os |
| 11 | import signal |
| 12 | import time |
| 13 | from concurrent import futures |
| 14 | from typing import Any, Dict, List, Optional |
| 15 | |
| 16 | import grpc |
| 17 | import pkg_resources |
| 18 | from common.log_count_handler import MsgCounterHandler |
| 19 | from common.log_counter import ServiceLogErrorReporter |
| 20 | from common.metrics_export import get_metrics |
| 21 | from common.service_registry import ServiceRegistry |
| 22 | from configuration.exceptions import LoadConfigError |
| 23 | from configuration.mconfig_managers import get_mconfig_manager |
| 24 | from configuration.service_configs import load_service_config |
| 25 | from orc8r.protos.common_pb2 import LogLevel, Void |
| 26 | from orc8r.protos.metricsd_pb2 import MetricsContainer |
| 27 | from orc8r.protos.service303_pb2 import ( |
| 28 | GetOperationalStatesResponse, |
| 29 | ReloadConfigResponse, |
| 30 | ServiceInfo, |
| 31 | State, |
| 32 | ) |
| 33 | from orc8r.protos.service303_pb2_grpc import ( |
| 34 | Service303Servicer, |
| 35 | Service303Stub, |
| 36 | add_Service303Servicer_to_server, |
| 37 | ) |
| 38 | |
# Thread-pool size for the service's gRPC server, used when the service
# config does not provide a 'grpc_workers' value.
MAX_DEFAULT_WORKER = 10
| 40 | |
| 41 | |
async def loop_exit():
    """
    Halt the event loop from inside a coroutine.

    The loop is looked up with ``asyncio.get_event_loop()``; when awaited by
    a running loop this resolves to that loop, so ``run_forever()`` returns
    once the current iteration completes.
    """
    asyncio.get_event_loop().stop()
| 48 | |
| 49 | |
class MagmaService(Service303Servicer):
    """
    MagmaService provides the framework for all Magma services.
    This class also implements the Service303 interface for external
    entities to interact with the service.
    """

    def __init__(self, name, empty_mconfig, loop=None):
        """Set up logging, configs, the event loop and the gRPC server.

        Args:
            name: service name; used to load the service config and the
                managed config, and (in run()) to look up the listening
                address in ServiceRegistry.
            empty_mconfig: initial managed config, passed to the mconfig
                manager when loading this service's mconfig.
            loop: asyncio event loop to run on; defaults to
                ``asyncio.get_event_loop()``.
        """
        self._name = name
        self._port = 0
        self._get_status_callback = None
        self._get_operational_states_cb = None
        self._log_count_handler = MsgCounterHandler()

        # Init logging before doing anything
        logging.basicConfig(
            level=logging.INFO,
            format='[%(asctime)s %(levelname)s %(name)s] %(message)s',
        )
        # Add a handler to count errors
        logging.root.addHandler(self._log_count_handler)

        # Set gRPC polling strategy; must happen before grpc.server() below
        self._set_grpc_poll_strategy()

        # Load the managed config if present
        self._mconfig = empty_mconfig
        self._mconfig_metadata = None
        self._mconfig_manager = get_mconfig_manager()
        self.reload_mconfig()

        self._state = ServiceInfo.STARTING
        self._health = ServiceInfo.APP_UNHEALTHY
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._start_time = int(time.time())
        self._register_signal_handlers()

        # Load the service config if present
        self._config = None
        self.reload_config()

        # Count errors
        self.log_counter = ServiceLogErrorReporter(
            loop=self._loop,
            service_config=self._config,
            handler=self._log_count_handler,
        )
        self.log_counter.start()

        # Operational States
        self._operational_states = []

        # Load the service version if available ('0.0.0' otherwise)
        self._version = self._load_version()

        if self._config and 'grpc_workers' in self._config:
            max_workers = self._config['grpc_workers']
        else:
            max_workers = MAX_DEFAULT_WORKER
        self._server = grpc.server(
            futures.ThreadPoolExecutor(max_workers=max_workers),
        )
        add_Service303Servicer_to_server(self, self._server)

    def _load_version(self):
        """Return the running version of the service, or '0.0.0'.

        If the service config has ``init_system: docker``, the version is the
        image tag of the running ``magmad`` container; otherwise it is the
        version of the installed ``orc8r`` distribution. A
        ``pkg_resources.ResolutionError`` is logged and '0.0.0' is returned.
        """
        version = '0.0.0'
        try:
            # Check if service on docker
            if self._config and 'init_system' in self._config \
                    and self._config['init_system'] == 'docker':
                # image comes in form of "feg_gateway_python:<IMAGE_TAG>\n";
                # `cut` skips the "feg_gateway_python:" part.
                # The context manager closes the pipe and waits for the shell
                # child process so neither the fd nor the process leaks.
                with os.popen(
                    'docker ps --filter name=magmad --format "{{.Image}}" | '
                    'cut -d ":" -f 2',
                ) as image:
                    version = image.read().strip('\n')
            else:
                version = pkg_resources.get_distribution('orc8r').version
        except pkg_resources.ResolutionError as e:
            logging.info(e)
        return version

    @property
    def version(self):
        """Return the current running version of the Magma service"""
        return self._version

    @property
    def name(self):
        """Return the name of service

        Returns:
            str: name of service
        """
        return self._name

    @property
    def rpc_server(self):
        """Return the RPC server used by the service"""
        return self._server

    @property
    def port(self):
        """Return the listening port of the service (0 until run())"""
        return self._port

    @property
    def loop(self):
        """Return the asyncio event loop used by the service"""
        return self._loop

    @property
    def state(self):
        """Return the state of the service"""
        return self._state

    @property
    def config(self) -> Dict[str, Any]:
        """Return the service config (None if it could not be loaded)"""
        return self._config

    @property
    def mconfig(self):
        """Return the managed config"""
        return self._mconfig

    @property
    def mconfig_metadata(self):
        """Return the metadata of the managed config"""
        return self._mconfig_metadata

    @property
    def mconfig_manager(self):
        """Return the mconfig manager for this service"""
        return self._mconfig_manager

    def reload_config(self):
        """Reload the local config for the service and re-apply log level.

        A LoadConfigError is logged as a warning and the previous config is
        kept.
        """
        try:
            self._config = load_service_config(self._name)
            self._setup_logging()
        except LoadConfigError as e:
            logging.warning(e)

    def reload_mconfig(self):
        """Reload the managed config and its metadata for the service.

        A LoadConfigError is logged as a warning and the previous mconfig is
        kept.
        """
        try:
            # reload mconfig manager in case feature flag for streaming changed
            self._mconfig_manager = get_mconfig_manager()
            self._mconfig = self._mconfig_manager.load_service_mconfig(
                self._name,
                self._mconfig,
            )
            self._mconfig_metadata = \
                self._mconfig_manager.load_mconfig_metadata()
        except LoadConfigError as e:
            logging.warning(e)

    def add_operational_states(self, states: List[State]):
        """Add a list of states into the service

        Args:
            states (List[State]): operational states to append to the
                service's internal list.
        """
        # NOTE(review): self._operational_states is not read anywhere in this
        # class; GetOperationalStates only reports the registered callback's
        # states. Confirm whether these should be included in that response.
        self._operational_states.extend(states)

    def run(self):
        """
        Start the service and run the event loop until a term signal
        is received or a StopService rpc call is made on the Service303
        interface.
        """
        logging.info("Starting %s...", self._name)
        (host, port) = ServiceRegistry.get_service_address(self._name)
        self._port = self._server.add_insecure_port('{}:{}'.format(host, port))
        logging.info("Listening on address %s:%d", host, self._port)
        self._state = ServiceInfo.ALIVE
        # Python services are healthy immediately when run
        self._health = ServiceInfo.APP_HEALTHY
        self._server.start()
        # Blocks until the loop is stopped by _stop() (term signal or
        # StopService rpc call).
        self._loop.run_forever()

    def close(self):
        """
        Clean up the service before termination. This needs to be
        called at least once after the service has been initialized.
        """
        self._loop.close()
        self._server.stop(0).wait()

    def register_get_status_callback(self, get_status_callback):
        """Register function for getting status

        Must return a map(string, string); its items are copied into
        ServiceInfo.status.meta by GetServiceInfo.
        """
        self._get_status_callback = get_status_callback

    def register_operational_states_callback(self, get_operational_states_cb):
        """Register the callback function that gets called on GetOperationalStates rpc

        Args:
            get_operational_states_cb: callable returning an iterable of
                State messages.
        """
        self._get_operational_states_cb = get_operational_states_cb

    def _stop(self, reason):
        """Stop the service gracefully.

        Stops the gRPC server, cancels every task on the event loop, marks
        the service STOPPED/unhealthy and schedules loop_exit() so that
        run() returns.

        Args:
            reason: label included in the log message (signal name or 'RPC').
        """
        logging.info("Stopping %s with reason %s...", self._name, reason)
        self._state = ServiceInfo.STOPPING
        self._server.stop(0)

        # asyncio.all_tasks() exists from Python 3.7; older versions only
        # provide asyncio.Task.all_tasks().
        try:
            asyncio_all_tasks = asyncio.all_tasks
        except AttributeError:
            asyncio_all_tasks = asyncio.Task.all_tasks

        for pending_task in asyncio_all_tasks(self._loop):
            pending_task.cancel()

        self._state = ServiceInfo.STOPPED
        self._health = ServiceInfo.APP_UNHEALTHY
        asyncio.ensure_future(loop_exit())

    def _set_grpc_poll_strategy(self):
        """
        The new default 'epollex' poll strategy is causing fd leaks, leading to
        service crashes after 1024 open fds.
        See https://github.com/grpc/grpc/issues/15759
        """
        os.environ['GRPC_POLL_STRATEGY'] = 'epoll1,poll'

    def _get_log_level_from_config(self) -> Optional[int]:
        """Return the LogLevel enum value of 'log_level' in the service config.

        Returns None when there is no config, no 'log_level' key, or the
        value is not a known LogLevel name (the last case is logged).
        """
        if self._config is None:
            return None
        log_level = self._config.get('log_level', None)
        if log_level is None:
            return None
        # convert from log level string to LogLevel enum value
        try:
            proto_level = LogLevel.Value(log_level)
        except ValueError:
            logging.error(
                'Unknown logging level in config: %s, ignoring',
                log_level,
            )
            return None
        return proto_level

    def _get_log_level_from_mconfig(self) -> Optional[int]:
        """Return the mconfig's log_level, or None when there is no mconfig."""
        if self._mconfig is None:
            return None
        return self._mconfig.log_level

    def _setup_logging(self):
        """Set up log level from config values

        The config file on the AGW takes precedence over the mconfig
        If neither config file nor mconfig has the log level config, default to INFO
        """
        log_level_from_config = self._get_log_level_from_config()
        log_level_from_mconfig = self._get_log_level_from_mconfig()

        if log_level_from_config is not None:
            log_level = log_level_from_config
        elif log_level_from_mconfig is not None:
            log_level = log_level_from_mconfig
        else:
            logging.warning(
                'logging level is not specified in either yml config '
                'or mconfig, defaulting to INFO',
            )
            log_level = LogLevel.Value('INFO')

        self._set_log_level(log_level)

    @staticmethod
    def _set_log_level(proto_level: int):
        """Set the root logger's level based on a proto-enum level

        Args:
            proto_level (int): proto enum defined in common.proto; unknown
                values are logged and treated as INFO.
        """
        if proto_level == LogLevel.Value('DEBUG'):
            level = logging.DEBUG
        elif proto_level == LogLevel.Value('INFO'):
            level = logging.INFO
        elif proto_level == LogLevel.Value('WARNING'):
            level = logging.WARNING
        elif proto_level == LogLevel.Value('ERROR'):
            level = logging.ERROR
        elif proto_level == LogLevel.Value('FATAL'):
            level = logging.FATAL
        else:
            logging.error(
                'Unknown logging level: %d, defaulting to INFO',
                proto_level,
            )
            level = logging.INFO

        logging.info(
            "Setting logging level to %s",
            logging.getLevelName(level),
        )
        logger = logging.getLogger('')
        logger.setLevel(level)

    def _register_signal_handlers(self):
        """Register signal handlers on the event loop

        SIGINT/SIGTERM/SIGQUIT stop the service via _stop(); SIGHUP only
        dumps the tracebacks via faulthandler.
        """
        for signame in ['SIGINT', 'SIGTERM', 'SIGQUIT']:
            self._loop.add_signal_handler(
                getattr(signal, signame),
                functools.partial(self._stop, signame),
            )

        def _signal_handler():
            logging.info('Handling SIGHUP...')
            faulthandler.dump_traceback()
        self._loop.add_signal_handler(signal.SIGHUP, _signal_handler)

    def GetServiceInfo(self, request, context):
        """Return the service info (name, version, state, meta, etc.)"""
        service_info = ServiceInfo(
            name=self._name,
            version=self._version,
            state=self._state,
            health=self._health,
            start_time_secs=self._start_time,
        )
        if self._get_status_callback is not None:
            status = self._get_status_callback()
            try:
                service_info.status.meta.update(status)
            except (TypeError, ValueError) as exp:
                logging.error("Error getting service status: %s", exp)
        return service_info

    def StopService(self, request, context):
        """Handle request to stop the service"""
        logging.info("Request to stop service.")
        # The rpc runs on a gRPC worker thread; hand the stop to the loop.
        self._loop.call_soon_threadsafe(self._stop, 'RPC')
        return Void()

    def GetMetrics(self, request, context):
        """
        Collects timeseries samples from prometheus python client on this
        process
        """
        metrics = MetricsContainer()
        metrics.family.extend(get_metrics())
        return metrics

    def SetLogLevel(self, request, context):
        """Handle request to set the log level"""
        self._set_log_level(request.level)
        return Void()

    def SetLogVerbosity(self, request, context):
        """Not implemented; returns None."""
        # NOTE(review): returning None from a unary gRPC handler likely fails
        # response serialization; confirm whether this should return Void()
        # or set grpc.StatusCode.UNIMPLEMENTED on the context.
        pass  # Not Implemented

    def ReloadServiceConfig(self, request, context):
        """Handle request to reload the service config file"""
        # NOTE(review): reload_config() swallows LoadConfigError, so this
        # reports RELOAD_SUCCESS even when the reload failed.
        self.reload_config()
        return ReloadConfigResponse(result=ReloadConfigResponse.RELOAD_SUCCESS)

    def GetOperationalStates(self, request, context):
        """Return the operational states of devices managed by this service.

        States come only from the callback registered with
        register_operational_states_callback(); the response is empty when
        none is registered.
        """
        res = GetOperationalStatesResponse()
        if self._get_operational_states_cb is not None:
            states = self._get_operational_states_cb()
            res.states.extend(states)
        return res
| 431 | |
| 432 | |
def get_service303_client(
        service_name: str,
        location: str,
) -> Optional[Service303Stub]:
    """
    Build a Service303 gRPC stub for the named service.

    Example Use: client = get_service303_client("state", ServiceRegistry.LOCAL)

    Args:
        service_name: name of the service to connect to.
        location: location of the service as accepted by
            ServiceRegistry.get_rpc_channel (e.g. ServiceRegistry.LOCAL).

    Returns:
        A Service303Stub on the service's RPC channel, or None if a
        ValueError is raised while obtaining the channel or building the stub.
    """
    try:
        return Service303Stub(
            ServiceRegistry.get_rpc_channel(service_name, location),
        )
    except ValueError:
        # Service can't be contacted
        logging.error('Failed to get RPC channel to %s', service_name)
    return None