blob: 2bcce61acf246ea0716b41b9f2e85eda83c61cbe [file] [log] [blame]
Wei-Yu Chen49950b92021-11-08 19:19:18 +08001"""
2Copyright 2020 The Magma Authors.
3
4This source code is licensed under the BSD-style license found in the
5LICENSE file in the root directory of this source tree.
6
7Unless required by applicable law or agreed to in writing, software
8distributed under the License is distributed on an "AS IS" BASIS,
9WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10See the License for the specific language governing permissions and
11limitations under the License.
12"""
13
14import asyncio
15import faulthandler
16import functools
17import logging
18import os
19import signal
20import time
21from concurrent import futures
22from typing import Any, Dict, List, Optional
23
24import grpc
25import pkg_resources
26from common.log_count_handler import MsgCounterHandler
27from common.log_counter import ServiceLogErrorReporter
28from common.metrics_export import get_metrics
29from common.service_registry import ServiceRegistry
30from configuration.exceptions import LoadConfigError
31from configuration.mconfig_managers import get_mconfig_manager
32from configuration.service_configs import load_service_config
33from orc8r.protos.common_pb2 import LogLevel, Void
34from orc8r.protos.metricsd_pb2 import MetricsContainer
35from orc8r.protos.service303_pb2 import (
36 GetOperationalStatesResponse,
37 ReloadConfigResponse,
38 ServiceInfo,
39 State,
40)
41from orc8r.protos.service303_pb2_grpc import (
42 Service303Servicer,
43 Service303Stub,
44 add_Service303Servicer_to_server,
45)
46
# Thread-pool size for the gRPC server when the service config does not
# provide a `grpc_workers` value.
MAX_DEFAULT_WORKER: int = 10
48
49
async def loop_exit():
    """
    Stop the event loop that is currently running this coroutine.

    Running it as a task lets code executing inside the loop request that
    the loop stop once the current iteration finishes.
    """
    running_loop = asyncio.get_running_loop()
    running_loop.stop()
56
57
class MagmaService(Service303Servicer):
    """
    MagmaService provides the framework for all Magma services.
    This class also implements the Service303 interface for external
    entities to interact with the service.
    """

    def __init__(self, name, empty_mconfig, loop=None):
        """Set up logging, configs, signal handlers and the gRPC server.

        Args:
            name (str): service name; used to load the service config and
                mconfig, and to look up the listening address in run().
            empty_mconfig: initial managed config; passed to the mconfig
                manager when loading this service's mconfig and kept if
                loading fails.
            loop: asyncio event loop to run on; defaults to
                asyncio.get_event_loop().
        """
        self._name = name
        self._port = 0
        self._get_status_callback = None
        self._get_operational_states_cb = None
        self._log_count_handler = MsgCounterHandler()

        # Init logging before doing anything
        logging.basicConfig(
            level=logging.INFO,
            format='[%(asctime)s %(levelname)s %(name)s] %(message)s',
        )
        # Add a handler to count errors
        logging.root.addHandler(self._log_count_handler)

        # Set gRPC polling strategy
        self._set_grpc_poll_strategy()

        # Load the managed config if present
        self._mconfig = empty_mconfig
        self._mconfig_metadata = None
        self._mconfig_manager = get_mconfig_manager()
        self.reload_mconfig()

        self._state = ServiceInfo.STARTING
        self._health = ServiceInfo.APP_UNHEALTHY
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._start_time = int(time.time())
        self._register_signal_handlers()

        # Load the service config if present
        self._config = None
        self.reload_config()

        # Count errors
        self.log_counter = ServiceLogErrorReporter(
            loop=self._loop,
            service_config=self._config,
            handler=self._log_count_handler,
        )
        self.log_counter.start()

        # Operational States
        self._operational_states = []

        # Load the service version if available
        self._version = self._load_version()

        # Size the RPC thread pool from the config when it sets grpc_workers
        max_workers = MAX_DEFAULT_WORKER
        if self._config and 'grpc_workers' in self._config:
            max_workers = self._config['grpc_workers']
        self._server = grpc.server(
            futures.ThreadPoolExecutor(max_workers=max_workers),
        )
        add_Service303Servicer_to_server(self, self._server)

    def _load_version(self) -> str:
        """Return the running version of the service, or '0.0.0' if unknown.

        When the service config sets ``init_system: docker``, the version is
        the image tag reported by ``docker ps`` for containers matching
        ``name=magmad``. Otherwise it is the version of the installed
        ``orc8r`` distribution.
        """
        version = '0.0.0'
        try:
            if (self._config or {}).get('init_system') == 'docker':
                # image comes in form of "feg_gateway_python:<IMAGE_TAG>\n";
                # `cut` skips the "feg_gateway_python:" part.
                # The `with` closes the pipe (and reaps the shell), which the
                # bare os.popen() call never did.
                with os.popen(
                    'docker ps --filter name=magmad --format "{{.Image}}" | '
                    'cut -d ":" -f 2',
                ) as image:
                    image_tag = image.read().strip('\n')
                # Empty output means no matching container was reported;
                # keep the default rather than publishing an empty version.
                if image_tag:
                    version = image_tag
            else:
                version = pkg_resources.get_distribution('orc8r').version
        except pkg_resources.ResolutionError as e:
            logging.info(e)
        return version

    @property
    def version(self):
        """Return the current running version of the Magma service"""
        return self._version

    @property
    def name(self):
        """Return the name of service

        Returns:
            str: name of service
        """
        return self._name

    @property
    def rpc_server(self):
        """Return the RPC server used by the service"""
        return self._server

    @property
    def port(self):
        """Return the listening port of the service (0 until run())"""
        return self._port

    @property
    def loop(self):
        """Return the asyncio event loop used by the service"""
        return self._loop

    @property
    def state(self):
        """Return the state of the service"""
        return self._state

    @property
    def config(self) -> Optional[Dict[str, Any]]:
        """Return the service config, or None if it could not be loaded"""
        return self._config

    @property
    def mconfig(self):
        """Return the managed config"""
        return self._mconfig

    @property
    def mconfig_metadata(self):
        """Return the metadata of the managed config"""
        return self._mconfig_metadata

    @property
    def mconfig_manager(self):
        """Return the mconfig manager for this service"""
        return self._mconfig_manager

    def reload_config(self):
        """Reload the local config for the service and re-apply its log level.

        A LoadConfigError is logged as a warning and the previous config kept.
        """
        try:
            self._config = load_service_config(self._name)
            self._setup_logging()
        except LoadConfigError as e:
            logging.warning(e)

    def reload_mconfig(self):
        """Reload the managed config for the service.

        A LoadConfigError is logged as a warning and the previous mconfig kept.
        """
        try:
            # reload mconfig manager in case feature flag for streaming changed
            self._mconfig_manager = get_mconfig_manager()
            self._mconfig = self._mconfig_manager.load_service_mconfig(
                self._name,
                self._mconfig,
            )
            self._mconfig_metadata = \
                self._mconfig_manager.load_mconfig_metadata()
        except LoadConfigError as e:
            logging.warning(e)

    def add_operational_states(self, states: List[State]):
        """Add a list of states into the service

        Args:
            states (List[State]): states to append to the service's list.
        """
        # NOTE(review): these states are stored but GetOperationalStates only
        # returns the states produced by the registered callback — confirm
        # whether they should also be reported there.
        self._operational_states.extend(states)

    def run(self):
        """
        Start the service and run the event loop until a term signal
        is received or a StopService rpc call is made on the Service303
        interface.
        """
        logging.info("Starting %s...", self._name)
        (host, port) = ServiceRegistry.get_service_address(self._name)
        self._port = self._server.add_insecure_port('{}:{}'.format(host, port))
        logging.info("Listening on address %s:%d", host, self._port)
        self._state = ServiceInfo.ALIVE
        # Python services are healthy immediately when run
        self._health = ServiceInfo.APP_HEALTHY
        self._server.start()
        # Blocks until _stop() schedules loop_exit (term signal or
        # StopService rpc call)
        self._loop.run_forever()

    def close(self):
        """
        Clean up the service before termination. This needs to be
        called at least once after the service has been initialized.
        """
        # Stop the gRPC server before closing the loop: a StopService rpc
        # handled in between would call call_soon_threadsafe() on a closed
        # loop, which raises RuntimeError.
        self._server.stop(0).wait()
        self._loop.close()

    def register_get_status_callback(self, get_status_callback):
        """Register function for getting status

        Must return a map(string, string)
        """
        self._get_status_callback = get_status_callback

    def register_operational_states_callback(self, get_operational_states_cb):
        """Register the callback function that gets called on GetOperationalStates rpc

        Args:
            get_operational_states_cb (Callable[[], Iterable[State]]): callback
                whose returned states are added to the rpc response.
        """
        self._get_operational_states_cb = get_operational_states_cb

    def _stop(self, reason):
        """Stop the service gracefully

        Args:
            reason (str): why the service stops (a signal name or 'RPC');
                used only for logging.
        """
        logging.info("Stopping %s with reason %s...", self._name, reason)
        self._state = ServiceInfo.STOPPING
        self._server.stop(0)

        # asyncio.Task.all_tasks() was removed in Python 3.9; the module-level
        # asyncio.all_tasks() (3.7+) returns the loop's unfinished tasks.
        for pending_task in asyncio.all_tasks(self._loop):
            pending_task.cancel()

        self._state = ServiceInfo.STOPPED
        self._health = ServiceInfo.APP_UNHEALTHY
        # Stop the loop from inside it so run() can return.
        self._loop.create_task(loop_exit())

    def _set_grpc_poll_strategy(self):
        """
        The new default 'epollex' poll strategy is causing fd leaks, leading to
        service crashes after 1024 open fds.
        See https://github.com/grpc/grpc/issues/15759
        """
        os.environ['GRPC_POLL_STRATEGY'] = 'epoll1,poll'

    def _get_log_level_from_config(self) -> Optional[int]:
        """Return the LogLevel enum value named by the config's `log_level`.

        Returns None when there is no config, no `log_level` key, or the
        name is not a valid LogLevel (the latter is logged as an error).
        """
        if self._config is None:
            return None
        log_level = self._config.get('log_level', None)
        if log_level is None:
            return None
        # convert from log level string to LogLevel enum value
        try:
            proto_level = LogLevel.Value(log_level)
        except ValueError:
            logging.error(
                'Unknown logging level in config: %s, ignoring',
                log_level,
            )
            return None
        return proto_level

    def _get_log_level_from_mconfig(self) -> Optional[int]:
        """Return the mconfig's `log_level`, or None when there is no mconfig."""
        if self._mconfig is None:
            return None
        return self._mconfig.log_level

    def _setup_logging(self):
        """Set up log level from config values

        The config file on the AGW takes precedence over the mconfig
        If neither config file nor mconfig has the log level config, default to INFO
        """
        log_level_from_config = self._get_log_level_from_config()
        log_level_from_mconfig = self._get_log_level_from_mconfig()

        if log_level_from_config is not None:
            log_level = log_level_from_config
        elif log_level_from_mconfig is not None:
            log_level = log_level_from_mconfig
        else:
            logging.warning(
                'logging level is not specified in either yml config '
                'or mconfig, defaulting to INFO',
            )
            log_level = LogLevel.Value('INFO')
        self._set_log_level(log_level)

    @staticmethod
    def _set_log_level(proto_level: int):
        """Set the root logger's level from a proto-enum level

        Unknown values are logged as an error and mapped to INFO.

        Args:
            proto_level (int): proto enum defined in common.proto
        """
        proto_to_logging = {
            LogLevel.Value('DEBUG'): logging.DEBUG,
            LogLevel.Value('INFO'): logging.INFO,
            LogLevel.Value('WARNING'): logging.WARNING,
            LogLevel.Value('ERROR'): logging.ERROR,
            LogLevel.Value('FATAL'): logging.FATAL,
        }
        level = proto_to_logging.get(proto_level)
        if level is None:
            logging.error(
                'Unknown logging level: %d, defaulting to INFO',
                proto_level,
            )
            level = logging.INFO

        logging.info(
            "Setting logging level to %s",
            logging.getLevelName(level),
        )
        logger = logging.getLogger('')
        logger.setLevel(level)

    def _register_signal_handlers(self):
        """Register signal handlers

        SIGINT/SIGTERM/SIGQUIT stop the service via _stop(); SIGHUP dumps the
        current tracebacks (faulthandler) and keeps running.
        """
        for signame in ['SIGINT', 'SIGTERM', 'SIGQUIT']:
            self._loop.add_signal_handler(
                getattr(signal, signame),
                functools.partial(self._stop, signame),
            )

        def _signal_handler():
            logging.info('Handling SIGHUP...')
            faulthandler.dump_traceback()
        self._loop.add_signal_handler(signal.SIGHUP, _signal_handler)

    def GetServiceInfo(self, request, context):
        """Return the service info (name, version, state, meta, etc.)"""
        service_info = ServiceInfo(
            name=self._name,
            version=self._version,
            state=self._state,
            health=self._health,
            start_time_secs=self._start_time,
        )
        if self._get_status_callback is not None:
            status = self._get_status_callback()
            try:
                service_info.status.meta.update(status)
            except (TypeError, ValueError) as exp:
                logging.error("Error getting service status: %s", exp)
        return service_info

    def StopService(self, request, context):
        """Handle request to stop the service"""
        logging.info("Request to stop service.")
        # RPCs run on the gRPC thread pool; hand the stop to the loop thread.
        self._loop.call_soon_threadsafe(self._stop, 'RPC')
        return Void()

    def GetMetrics(self, request, context):
        """
        Collects timeseries samples from prometheus python client on this
        process
        """
        metrics = MetricsContainer()
        metrics.family.extend(get_metrics())
        return metrics

    def SetLogLevel(self, request, context):
        """Handle request to set the log level"""
        self._set_log_level(request.level)
        return Void()

    def SetLogVerbosity(self, request, context):
        """Handle request to set the log verbosity (not implemented).

        Reports UNIMPLEMENTED to the caller. A Void is returned rather than
        None, because gRPC cannot serialize None as a response.
        """
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('SetLogVerbosity is not implemented')
        return Void()

    def ReloadServiceConfig(self, request, context):
        """Handle request to reload the service config file"""
        self.reload_config()
        return ReloadConfigResponse(result=ReloadConfigResponse.RELOAD_SUCCESS)

    def GetOperationalStates(self, request, context):
        """Return the operational states of devices managed by this service.

        Only states produced by the registered operational-states callback
        are included; the response is empty when no callback is registered.
        """
        res = GetOperationalStatesResponse()
        if self._get_operational_states_cb is not None:
            states = self._get_operational_states_cb()
            res.states.extend(states)
        return res
432
433
def get_service303_client(service_name: str, location: str) \
        -> Optional[Service303Stub]:
    """
    Build a Service303 gRPC stub for a service at the given location.

    Example Use: client = get_service303_client("state", ServiceRegistry.LOCAL)

    Returns:
        A Service303Stub on the channel returned by
        ServiceRegistry.get_rpc_channel, or None (after logging an error)
        when that lookup raises ValueError.
    """
    try:
        return Service303Stub(
            ServiceRegistry.get_rpc_channel(service_name, location),
        )
    except ValueError:
        # Service can't be contacted
        logging.error('Failed to get RPC channel to %s', service_name)
        return None