blob: 5a3ca98c18fb0f75b820c0495d55e4c0fab2030e [file] [log] [blame]
"""
Copyright 2020 The Magma Authors.

This source code is licensed under the BSD-style license found in the
LICENSE file in the root directory of this source tree.

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
13
14import asyncio
15import faulthandler
16import functools
17import logging
18import os
19import signal
20import time
21from concurrent import futures
22from typing import Any, Dict, List, Optional
23
24import grpc
25import pkg_resources
26from common.log_count_handler import MsgCounterHandler
27from common.log_counter import ServiceLogErrorReporter
28from common.metrics_export import get_metrics
29from common.service_registry import ServiceRegistry
30from configuration.exceptions import LoadConfigError
31from configuration.mconfig_managers import get_mconfig_manager
32from configuration.service_configs import load_service_config
33from orc8r.protos.common_pb2 import LogLevel, Void
34from orc8r.protos.metricsd_pb2 import MetricsContainer
35from orc8r.protos.service303_pb2 import (
36 GetOperationalStatesResponse,
37 ReloadConfigResponse,
38 ServiceInfo,
39 State,
40)
41from orc8r.protos.service303_pb2_grpc import (
42 Service303Servicer,
43 Service303Stub,
44 add_Service303Servicer_to_server,
45)
46
# Size of the gRPC server's thread pool when the service config does not
# provide a 'grpc_workers' entry (see MagmaService.__init__).
MAX_DEFAULT_WORKER = 10
48
49
async def loop_exit():
    """
    Coroutine that stops the event loop returned by
    asyncio.get_event_loop(), so that a stop can be scheduled as a task.
    """
    asyncio.get_event_loop().stop()
56
57
class MagmaService(Service303Servicer):
    """
    MagmaService provides the framework for all Magma services.
    This class also implements the Service303 interface for external
    entities to interact with the service.

    On construction it configures logging, loads the managed config
    (mconfig) and the local service config, registers signal handlers on
    the event loop and creates (but does not start) the gRPC server.
    Call run() to start serving and close() to clean up afterwards.
    """

    def __init__(self, name, empty_mconfig, loop=None):
        """Initialise the service.

        Args:
            name: service name; used to look up the service config, the
                mconfig and the service address.
            empty_mconfig: initial mconfig value, passed to the mconfig
                manager when loading the service mconfig.
            loop: asyncio event loop to use; when None the loop returned
                by asyncio.get_event_loop() is used.
        """
        self._name = name
        self._port = 0
        self._get_status_callback = None
        self._get_operational_states_cb = None
        self._log_count_handler = MsgCounterHandler()

        # Init logging before doing anything
        logging.basicConfig(
            level=logging.INFO,
            format='[%(asctime)s %(levelname)s %(name)s] %(message)s',
        )
        # Add a handler to count errors
        logging.root.addHandler(self._log_count_handler)

        # Set gRPC polling strategy
        self._set_grpc_poll_strategy()

        # Load the managed config if present
        self._mconfig = empty_mconfig
        self._mconfig_metadata = None
        self._mconfig_manager = get_mconfig_manager()
        self.reload_mconfig()

        self._state = ServiceInfo.STARTING
        self._health = ServiceInfo.APP_UNHEALTHY
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._start_time = int(time.time())
        self._register_signal_handlers()

        # Load the service config if present
        self._config = None
        self.reload_config()

        # Count errors
        self.log_counter = ServiceLogErrorReporter(
            loop=self._loop,
            service_config=self._config,
            handler=self._log_count_handler,
        )
        self.log_counter.start()

        # Operational States
        self._operational_states = []

        self._version = '0.0.0'
        # Load the service version if available
        try:
            # Check if service on docker
            if self._config and self._config.get('init_system') == 'docker':
                # image comes in form of "feg_gateway_python:<IMAGE_TAG>\n"
                # Skip the "feg_gateway_python:" part.
                # The pipe is used as a context manager so that it is
                # always closed once the tag has been read.
                with os.popen(
                    'docker ps --filter name=magmad --format "{{.Image}}" | '
                    'cut -d ":" -f 2',
                ) as image:
                    image_tag = image.read().strip('\n')
                self._version = image_tag
            else:
                self._version = pkg_resources.get_distribution('orc8r').version
        except pkg_resources.ResolutionError as e:
            logging.info(e)

        # Size the gRPC thread pool from the config, else use the default
        max_workers = MAX_DEFAULT_WORKER
        if self._config and 'grpc_workers' in self._config:
            max_workers = self._config['grpc_workers']
        self._server = grpc.server(
            futures.ThreadPoolExecutor(max_workers=max_workers),
        )
        add_Service303Servicer_to_server(self, self._server)

    @property
    def version(self):
        """Return the current running version of the Magma service"""
        return self._version

    @property
    def name(self):
        """Return the name of service

        Returns:
            str: name of service
        """
        return self._name

    @property
    def rpc_server(self):
        """Return the RPC server used by the service"""
        return self._server

    @property
    def port(self):
        """Return the listening port of the service (0 until run())"""
        return self._port

    @property
    def loop(self):
        """Return the asyncio event loop used by the service"""
        return self._loop

    @property
    def state(self):
        """Return the state of the service"""
        return self._state

    @property
    def config(self) -> Dict[str, Any]:
        """Return the service config

        Note: this is None when the service config could not be loaded.
        """
        return self._config

    @property
    def mconfig(self):
        """Return the managed config"""
        return self._mconfig

    @property
    def mconfig_metadata(self):
        """Return the metadata of the managed config"""
        return self._mconfig_metadata

    @property
    def mconfig_manager(self):
        """Return the mconfig manager for this service"""
        return self._mconfig_manager

    def reload_config(self):
        """Reload the local config for the service

        On success the log level is re-applied from the new config. A
        LoadConfigError is logged as a warning and the previous config is
        kept.
        """
        try:
            self._config = load_service_config(self._name)
            self._setup_logging()
        except LoadConfigError as e:
            logging.warning(e)

    def reload_mconfig(self):
        """Reload the managed config for the service

        A LoadConfigError is logged as a warning and not propagated.
        """
        try:
            # reload mconfig manager in case feature flag for streaming changed
            self._mconfig_manager = get_mconfig_manager()
            self._mconfig = self._mconfig_manager.load_service_mconfig(
                self._name,
                self._mconfig,
            )
            self._mconfig_metadata = \
                self._mconfig_manager.load_mconfig_metadata()
        except LoadConfigError as e:
            logging.warning(e)

    def add_operational_states(self, states: List[State]):
        """Add a list of states into the service

        Args:
            states (List[State]): states appended to the service's
                internal list of operational states
        """
        self._operational_states.extend(states)

    def run(self):
        """
        Start the service and runs the event loop until a term signal
        is received or a StopService rpc call is made on the Service303
        interface.
        """
        logging.info("Starting %s...", self._name)
        (host, port) = ServiceRegistry.get_service_address(self._name)
        self._port = self._server.add_insecure_port('{}:{}'.format(host, port))
        logging.info("Listening on address %s:%d", host, self._port)
        self._state = ServiceInfo.ALIVE
        # Python services are healthy immediately when run
        self._health = ServiceInfo.APP_HEALTHY
        self._server.start()
        # Blocks until the term signal or StopService rpc call
        self._loop.run_forever()

    def close(self):
        """
        Clean up the service before termination. This needs to be
        called at least once after the service has been inited.
        """
        self._loop.close()
        self._server.stop(0).wait()

    def register_get_status_callback(self, get_status_callback):
        """Register function for getting status

        Must return a map(string, string)
        """
        self._get_status_callback = get_status_callback

    def register_operational_states_callback(self, get_operational_states_cb):
        """Register the callback function that gets called on GetOperationalStates rpc

        Args:
            get_operational_states_cb: callable taking no arguments whose
                return value is added to the GetOperationalStates response
        """
        self._get_operational_states_cb = get_operational_states_cb

    def _stop(self, reason):
        """Stop the service gracefully

        Stops the gRPC server, cancels every task on the service loop and
        schedules the loop to stop.

        Args:
            reason: value logged as the reason for stopping (a signal
                name or 'RPC' for the callers in this class)
        """
        logging.info("Stopping %s with reason %s...", self._name, reason)
        self._state = ServiceInfo.STOPPING
        self._server.stop(0)

        # asyncio.Task.all_tasks was deprecated in Python 3.7 and removed
        # in 3.9; prefer asyncio.all_tasks and only fall back to the old
        # spelling on interpreters that do not have it.
        all_tasks = getattr(asyncio, 'all_tasks', None)
        if all_tasks is None:
            all_tasks = asyncio.Task.all_tasks
        for pending_task in all_tasks(self._loop):
            pending_task.cancel()

        self._state = ServiceInfo.STOPPED
        self._health = ServiceInfo.APP_UNHEALTHY
        # Scheduled after the cancellations above, so it is not cancelled
        asyncio.ensure_future(loop_exit())

    def _set_grpc_poll_strategy(self):
        """
        The new default 'epollex' poll strategy is causing fd leaks, leading to
        service crashes after 1024 open fds.
        See https://github.com/grpc/grpc/issues/15759
        """
        os.environ['GRPC_POLL_STRATEGY'] = 'epoll1,poll'

    def _get_log_level_from_config(self) -> Optional[int]:
        """Return the LogLevel enum value named by 'log_level' in the
        service config, or None when there is no config, no such key, or
        the name is not a known LogLevel.
        """
        if self._config is None:
            return None
        log_level = self._config.get('log_level', None)
        if log_level is None:
            return None
        # convert from log level string to LogLevel enum value
        try:
            proto_level = LogLevel.Value(log_level)
        except ValueError:
            logging.error(
                'Unknown logging level in config: %s, ignoring',
                log_level,
            )
            return None
        return proto_level

    def _get_log_level_from_mconfig(self) -> Optional[int]:
        """Return the mconfig's log_level, or None when there is no mconfig"""
        if self._mconfig is None:
            return None
        return self._mconfig.log_level

    def _setup_logging(self):
        """Set up log level from config values

        The config file on the AGW takes precedence over the mconfig
        If neither config file nor mconfig has the log level config, default to INFO
        """
        log_level_from_config = self._get_log_level_from_config()
        log_level_from_mconfig = self._get_log_level_from_mconfig()

        if log_level_from_config is not None:
            log_level = log_level_from_config
        elif log_level_from_mconfig is not None:
            log_level = log_level_from_mconfig
        else:
            logging.warning(
                'logging level is not specified in either yml config '
                'or mconfig, defaulting to INFO',
            )
            log_level = LogLevel.Value('INFO')

        self._set_log_level(log_level)

    @staticmethod
    def _set_log_level(proto_level: int):
        """Set log level based on proto-enum level

        Unknown values are logged as an error and treated as INFO.

        Args:
            proto_level (int): proto enum defined in common.proto
        """
        # LogLevel enum value -> python logging level
        levels_by_proto = {
            LogLevel.Value('DEBUG'): logging.DEBUG,
            LogLevel.Value('INFO'): logging.INFO,
            LogLevel.Value('WARNING'): logging.WARNING,
            LogLevel.Value('ERROR'): logging.ERROR,
            LogLevel.Value('FATAL'): logging.FATAL,
        }
        level = levels_by_proto.get(proto_level)
        if level is None:
            logging.error(
                'Unknown logging level: %d, defaulting to INFO',
                proto_level,
            )
            level = logging.INFO

        logging.info(
            "Setting logging level to %s",
            logging.getLevelName(level),
        )
        logger = logging.getLogger('')
        logger.setLevel(level)

    def _register_signal_handlers(self):
        """Register signal handlers

        SIGINT/SIGTERM/SIGQUIT stop the service; SIGHUP dumps the
        traceback of the current threads via faulthandler.
        """
        for signame in ['SIGINT', 'SIGTERM', 'SIGQUIT']:
            self._loop.add_signal_handler(
                getattr(signal, signame),
                functools.partial(self._stop, signame),
            )

        def _signal_handler():
            logging.info('Handling SIGHUP...')
            faulthandler.dump_traceback()

        self._loop.add_signal_handler(signal.SIGHUP, _signal_handler)

    def GetServiceInfo(self, request, context):
        """Return the service info (name, version, state, meta, etc.)

        When a status callback is registered its result is merged into
        the status meta map; a TypeError/ValueError from that merge is
        logged and the info is returned without it.
        """
        service_info = ServiceInfo(
            name=self._name,
            version=self._version,
            state=self._state,
            health=self._health,
            start_time_secs=self._start_time,
        )
        if self._get_status_callback is not None:
            status = self._get_status_callback()
            try:
                service_info.status.meta.update(status)
            except (TypeError, ValueError) as exp:
                logging.error("Error getting service status: %s", exp)
        return service_info

    def StopService(self, request, context):
        """Handle request to stop the service"""
        logging.info("Request to stop service.")
        # Hand the stop over to the event loop instead of running it in
        # the RPC handler's own thread.
        self._loop.call_soon_threadsafe(self._stop, 'RPC')
        return Void()

    def GetMetrics(self, request, context):
        """
        Collects timeseries samples from prometheus python client on this
        process
        """
        metrics = MetricsContainer()
        metrics.family.extend(get_metrics())
        return metrics

    def SetLogLevel(self, request, context):
        """Handle request to set the log level"""
        self._set_log_level(request.level)
        return Void()

    def SetLogVerbosity(self, request, context):
        """Handle request to set the log verbosity (not implemented)"""
        # NOTE(review): returns None rather than a response message;
        # confirm against the Service303 proto whether Void is expected.
        pass  # Not Implemented

    def ReloadServiceConfig(self, request, context):
        """Handle request to reload the service config file

        Always answers RELOAD_SUCCESS; load failures are only logged by
        reload_config().
        """
        self.reload_config()
        return ReloadConfigResponse(result=ReloadConfigResponse.RELOAD_SUCCESS)

    def GetOperationalStates(self, request, context):
        """Return the operational states of devices managed by this service."""
        res = GetOperationalStatesResponse()
        if self._get_operational_states_cb is not None:
            states = self._get_operational_states_cb()
            res.states.extend(states)
        return res
433
434
def get_service303_client(
    service_name: str,
    location: str,
) -> Optional[Service303Stub]:
    """
    Build a Service303 grpc stub for the service identified by
    service_name at the given location.

    Returns None (after logging an error) when a ValueError is raised
    while obtaining the channel or creating the stub.
    Example Use: client = get_service303_client("state", ServiceRegistry.LOCAL)
    """
    try:
        channel = ServiceRegistry.get_rpc_channel(service_name, location)
        client = Service303Stub(channel)
    except ValueError:
        # Service can't be contacted
        logging.error('Failed to get RPC channel to %s', service_name)
        client = None
    return client