Wei-Yu Chen | 49950b9 | 2021-11-08 19:19:18 +0800 | [diff] [blame^] | 1 | """ |
| 2 | Copyright 2020 The Magma Authors. |
| 3 | |
| 4 | This source code is licensed under the BSD-style license found in the |
| 5 | LICENSE file in the root directory of this source tree. |
| 6 | |
| 7 | Unless required by applicable law or agreed to in writing, software |
| 8 | distributed under the License is distributed on an "AS IS" BASIS, |
| 9 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 10 | See the License for the specific language governing permissions and |
| 11 | limitations under the License. |
| 12 | """ |
| 13 | |
| 14 | import asyncio |
| 15 | import faulthandler |
| 16 | import functools |
| 17 | import logging |
| 18 | import os |
| 19 | import signal |
| 20 | import time |
| 21 | from concurrent import futures |
| 22 | from typing import Any, Dict, List, Optional |
| 23 | |
| 24 | import grpc |
| 25 | import pkg_resources |
| 26 | from common.log_count_handler import MsgCounterHandler |
| 27 | from common.log_counter import ServiceLogErrorReporter |
| 28 | from common.metrics_export import get_metrics |
| 29 | from common.service_registry import ServiceRegistry |
| 30 | from configuration.exceptions import LoadConfigError |
| 31 | from configuration.mconfig_managers import get_mconfig_manager |
| 32 | from configuration.service_configs import load_service_config |
| 33 | from orc8r.protos.common_pb2 import LogLevel, Void |
| 34 | from orc8r.protos.metricsd_pb2 import MetricsContainer |
| 35 | from orc8r.protos.service303_pb2 import ( |
| 36 | GetOperationalStatesResponse, |
| 37 | ReloadConfigResponse, |
| 38 | ServiceInfo, |
| 39 | State, |
| 40 | ) |
| 41 | from orc8r.protos.service303_pb2_grpc import ( |
| 42 | Service303Servicer, |
| 43 | Service303Stub, |
| 44 | add_Service303Servicer_to_server, |
| 45 | ) |
| 46 | |
# Fallback size of the gRPC server thread pool, used when the service config
# is missing or has no 'grpc_workers' entry.
MAX_DEFAULT_WORKER = 10
| 48 | |
| 49 | |
async def loop_exit():
    """
    Coroutine that stops the event loop returned by asyncio.get_event_loop().

    Scheduling this as a task lets the loop be stopped from an async context.
    """
    asyncio.get_event_loop().stop()
| 56 | |
| 57 | |
class MagmaService(Service303Servicer):
    """
    MagmaService provides the framework for all Magma services.
    This class also implements the Service303 interface for external
    entities to interact with the service.
    """

    def __init__(self, name, empty_mconfig, loop=None):
        """Initialize logging, configs, signal handlers and the gRPC server

        Args:
            name: name of the service; used to look up its config, mconfig
                and listening address
            empty_mconfig: initial managed config, passed to the mconfig
                manager when loading the service mconfig
            loop: asyncio event loop to use; defaults to
                asyncio.get_event_loop() when None
        """
        self._name = name
        self._port = 0
        self._get_status_callback = None
        self._get_operational_states_cb = None
        self._log_count_handler = MsgCounterHandler()

        # Init logging before doing anything
        logging.basicConfig(
            level=logging.INFO,
            format='[%(asctime)s %(levelname)s %(name)s] %(message)s',
        )
        # Add a handler to count errors
        logging.root.addHandler(self._log_count_handler)

        # Set gRPC polling strategy
        self._set_grpc_poll_strategy()

        # Load the managed config if present
        self._mconfig = empty_mconfig
        self._mconfig_metadata = None
        self._mconfig_manager = get_mconfig_manager()
        self.reload_mconfig()

        self._state = ServiceInfo.STARTING
        self._health = ServiceInfo.APP_UNHEALTHY
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._start_time = int(time.time())
        self._register_signal_handlers()

        # Load the service config if present
        self._config = None
        self.reload_config()

        # Count errors
        self.log_counter = ServiceLogErrorReporter(
            loop=self._loop,
            service_config=self._config,
            handler=self._log_count_handler,
        )
        self.log_counter.start()

        # Operational States
        self._operational_states = []

        self._version = '0.0.0'
        # Load the service version if available
        try:
            # Check if service on docker
            if self._config and self._config.get('init_system') == 'docker':
                # image comes in form of "feg_gateway_python:<IMAGE_TAG>\n"
                # Skip the "feg_gateway_python:" part
                # The context manager closes the pipe and reaps the child
                # process instead of leaking the file descriptor.
                with os.popen(
                    'docker ps --filter name=magmad --format "{{.Image}}" | '
                    'cut -d ":" -f 2',
                ) as image:
                    image_tag = image.read().strip('\n')
                self._version = image_tag
            else:
                self._version = pkg_resources.get_distribution('orc8r').version
        except pkg_resources.ResolutionError as e:
            logging.info(e)

        # Size the gRPC thread pool from the service config when provided
        if self._config and 'grpc_workers' in self._config:
            max_workers = self._config['grpc_workers']
        else:
            max_workers = MAX_DEFAULT_WORKER
        self._server = grpc.server(
            futures.ThreadPoolExecutor(max_workers=max_workers),
        )
        add_Service303Servicer_to_server(self, self._server)

    @property
    def version(self):
        """Return the current running version of the Magma service"""
        return self._version

    @property
    def name(self):
        """Return the name of service

        Returns:
            str: name of service
        """
        return self._name

    @property
    def rpc_server(self):
        """Return the RPC server used by the service"""
        return self._server

    @property
    def port(self):
        """Return the listening port of the service"""
        return self._port

    @property
    def loop(self):
        """Return the asyncio event loop used by the service"""
        return self._loop

    @property
    def state(self):
        """Return the state of the service"""
        return self._state

    @property
    def config(self) -> Optional[Dict[str, Any]]:
        """Return the service config (None if it was never loaded)"""
        return self._config

    @property
    def mconfig(self):
        """Return the managed config"""
        return self._mconfig

    @property
    def mconfig_metadata(self):
        """Return the metadata of the managed config"""
        return self._mconfig_metadata

    @property
    def mconfig_manager(self):
        """Return the mconfig manager for this service"""
        return self._mconfig_manager

    def reload_config(self):
        """Reload the local config for the service

        A LoadConfigError is logged as a warning and the previously loaded
        config (if any) is kept.
        """
        try:
            self._config = load_service_config(self._name)
            self._setup_logging()
        except LoadConfigError as e:
            logging.warning(e)

    def reload_mconfig(self):
        """Reload the managed config for the service

        A LoadConfigError is logged as a warning rather than raised.
        """
        try:
            # reload mconfig manager in case feature flag for streaming changed
            self._mconfig_manager = get_mconfig_manager()
            self._mconfig = self._mconfig_manager.load_service_mconfig(
                self._name,
                self._mconfig,
            )
            self._mconfig_metadata = \
                self._mconfig_manager.load_mconfig_metadata()
        except LoadConfigError as e:
            logging.warning(e)

    def add_operational_states(self, states: List[State]):
        """Add a list of states into the service

        Args:
            states (List[State]): states appended to the service's list of
                operational states
        """
        self._operational_states.extend(states)

    def run(self):
        """
        Start the service and runs the event loop until a term signal
        is received or a StopService rpc call is made on the Service303
        interface.
        """
        logging.info("Starting %s...", self._name)
        (host, port) = ServiceRegistry.get_service_address(self._name)
        self._port = self._server.add_insecure_port('{}:{}'.format(host, port))
        logging.info("Listening on address %s:%d", host, self._port)
        self._state = ServiceInfo.ALIVE
        # Python services are healthy immediately when run
        self._health = ServiceInfo.APP_HEALTHY
        self._server.start()
        # Blocks until the term signal or StopService rpc call stops the loop
        self._loop.run_forever()

    def close(self):
        """
        Clean up the service before termination. This needs to be
        called at least once after the service has been inited.
        """
        self._loop.close()
        self._server.stop(0).wait()

    def register_get_status_callback(self, get_status_callback):
        """Register function for getting status

        Must return a map(string, string)
        """
        self._get_status_callback = get_status_callback

    def register_operational_states_callback(self, get_operational_states_cb):
        """Register the callback function that gets called on GetOperationalStates rpc

        Args:
            get_operational_states_cb: callable taking no arguments; its
                return value is added to the GetOperationalStates response
        """
        self._get_operational_states_cb = get_operational_states_cb

    def _stop(self, reason):
        """Stop the service gracefully

        Stops the gRPC server, cancels the tasks on the service loop and
        schedules the loop to stop.

        Args:
            reason: short description of why the service is stopping
                (a signal name or 'RPC'); only used for logging
        """
        logging.info("Stopping %s with reason %s...", self._name, reason)
        self._state = ServiceInfo.STOPPING
        self._server.stop(0)

        # asyncio.Task.all_tasks() was deprecated in Python 3.7 and removed
        # in 3.9; prefer asyncio.all_tasks() and only fall back on old
        # interpreters that lack it.
        all_tasks = getattr(asyncio, 'all_tasks', None) \
            or asyncio.Task.all_tasks
        for pending_task in all_tasks(self._loop):
            pending_task.cancel()

        self._state = ServiceInfo.STOPPED
        self._health = ServiceInfo.APP_UNHEALTHY
        asyncio.ensure_future(loop_exit())

    def _set_grpc_poll_strategy(self):
        """
        The new default 'epollex' poll strategy is causing fd leaks, leading to
        service crashes after 1024 open fds.
        See https://github.com/grpc/grpc/issues/15759
        """
        os.environ['GRPC_POLL_STRATEGY'] = 'epoll1,poll'

    def _get_log_level_from_config(self) -> Optional[int]:
        """Return the LogLevel enum value named by 'log_level' in the service config

        Returns:
            Optional[int]: the enum value, or None when there is no config,
            no 'log_level' entry, or the entry is not a known level name
        """
        if self._config is None:
            return None
        log_level = self._config.get('log_level', None)
        if log_level is None:
            return None
        # convert from log level string to LogLevel enum value
        try:
            proto_level = LogLevel.Value(log_level)
        except ValueError:
            logging.error(
                'Unknown logging level in config: %s, ignoring',
                log_level,
            )
            return None
        return proto_level

    def _get_log_level_from_mconfig(self) -> Optional[int]:
        """Return the log level stored on the mconfig, or None without mconfig"""
        if self._mconfig is None:
            return None
        return self._mconfig.log_level

    def _setup_logging(self):
        """Set up log level from config values

        The config file on the AGW takes precedence over the mconfig
        If neither config file nor mconfig has the log level config, default to INFO
        """
        log_level_from_config = self._get_log_level_from_config()
        log_level_from_mconfig = self._get_log_level_from_mconfig()

        if log_level_from_config is not None:
            log_level = log_level_from_config
        elif log_level_from_mconfig is not None:
            log_level = log_level_from_mconfig
        else:
            logging.warning(
                'logging level is not specified in either yml config '
                'or mconfig, defaulting to INFO',
            )
            log_level = LogLevel.Value('INFO')
        self._set_log_level(log_level)

    @staticmethod
    def _set_log_level(proto_level: int):
        """Set log level based on proto-enum level

        Unknown values are logged as an error and treated as INFO.

        Args:
            proto_level (int): proto enum defined in common.proto
        """
        if proto_level == LogLevel.Value('DEBUG'):
            level = logging.DEBUG
        elif proto_level == LogLevel.Value('INFO'):
            level = logging.INFO
        elif proto_level == LogLevel.Value('WARNING'):
            level = logging.WARNING
        elif proto_level == LogLevel.Value('ERROR'):
            level = logging.ERROR
        elif proto_level == LogLevel.Value('FATAL'):
            level = logging.FATAL
        else:
            logging.error(
                'Unknown logging level: %d, defaulting to INFO',
                proto_level,
            )
            level = logging.INFO

        logging.info(
            "Setting logging level to %s",
            logging.getLevelName(level),
        )
        # The empty name selects the root logger
        logger = logging.getLogger('')
        logger.setLevel(level)

    def _register_signal_handlers(self):
        """Register signal handlers

        SIGINT/SIGTERM/SIGQUIT stop the service; SIGHUP dumps the current
        tracebacks via faulthandler without stopping it.
        """
        for signame in ['SIGINT', 'SIGTERM', 'SIGQUIT']:
            self._loop.add_signal_handler(
                getattr(signal, signame),
                functools.partial(self._stop, signame),
            )

        def _signal_handler():
            logging.info('Handling SIGHUP...')
            faulthandler.dump_traceback()

        self._loop.add_signal_handler(signal.SIGHUP, _signal_handler)

    def GetServiceInfo(self, request, context):
        """Return the service info (name, version, state, meta, etc.)"""
        service_info = ServiceInfo(
            name=self._name,
            version=self._version,
            state=self._state,
            health=self._health,
            start_time_secs=self._start_time,
        )
        if self._get_status_callback is not None:
            status = self._get_status_callback()
            try:
                service_info.status.meta.update(status)
            except (TypeError, ValueError) as exp:
                logging.error("Error getting service status: %s", exp)
        return service_info

    def StopService(self, request, context):
        """Handle request to stop the service"""
        logging.info("Request to stop service.")
        # Hand the stop over to the service loop via its thread-safe
        # scheduling call instead of running it from the RPC handler.
        self._loop.call_soon_threadsafe(self._stop, 'RPC')
        return Void()

    def GetMetrics(self, request, context):
        """
        Collects timeseries samples from prometheus python client on this
        process
        """
        metrics = MetricsContainer()
        metrics.family.extend(get_metrics())
        return metrics

    def SetLogLevel(self, request, context):
        """Handle request to set the log level"""
        self._set_log_level(request.level)
        return Void()

    def SetLogVerbosity(self, request, context):
        """Handle request to set the log verbosity (not implemented)"""
        # NOTE(review): this returns None, unlike SetLogLevel which returns
        # Void() -- confirm whether callers expect a response message here.
        pass  # Not Implemented

    def ReloadServiceConfig(self, request, context):
        """Handle request to reload the service config file"""
        self.reload_config()
        return ReloadConfigResponse(result=ReloadConfigResponse.RELOAD_SUCCESS)

    def GetOperationalStates(self, request, context):
        """Return the operational states of devices managed by this service."""
        res = GetOperationalStatesResponse()
        if self._get_operational_states_cb is not None:
            states = self._get_operational_states_cb()
            res.states.extend(states)
        return res
| 432 | |
| 433 | |
def get_service303_client(
    service_name: str, location: str,
) -> Optional[Service303Stub]:
    """
    Build a Service303 grpc stub for the service identified by the
    given name and location.

    Returns None (after logging an error) when a ValueError is raised
    while obtaining the channel or creating the stub.
    Example Use: client = get_service303_client("state", ServiceRegistry.LOCAL)
    """
    try:
        return Service303Stub(
            ServiceRegistry.get_rpc_channel(service_name, location),
        )
    except ValueError:
        # Service can't be contacted
        logging.error('Failed to get RPC channel to %s', service_name)
    return None