Init commit for standalone enodebd
Change-Id: I88eeef5135dd7ba8551ddd9fb6a0695f5325337b
diff --git a/common/service.py b/common/service.py
new file mode 100644
index 0000000..2bcce61
--- /dev/null
+++ b/common/service.py
@@ -0,0 +1,450 @@
+"""
+Copyright 2020 The Magma Authors.
+
+This source code is licensed under the BSD-style license found in the
+LICENSE file in the root directory of this source tree.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import asyncio
+import faulthandler
+import functools
+import logging
+import os
+import signal
+import time
+from concurrent import futures
+from typing import Any, Dict, List, Optional
+
+import grpc
+import pkg_resources
+from common.log_count_handler import MsgCounterHandler
+from common.log_counter import ServiceLogErrorReporter
+from common.metrics_export import get_metrics
+from common.service_registry import ServiceRegistry
+from configuration.exceptions import LoadConfigError
+from configuration.mconfig_managers import get_mconfig_manager
+from configuration.service_configs import load_service_config
+from orc8r.protos.common_pb2 import LogLevel, Void
+from orc8r.protos.metricsd_pb2 import MetricsContainer
+from orc8r.protos.service303_pb2 import (
+ GetOperationalStatesResponse,
+ ReloadConfigResponse,
+ ServiceInfo,
+ State,
+)
+from orc8r.protos.service303_pb2_grpc import (
+ Service303Servicer,
+ Service303Stub,
+ add_Service303Servicer_to_server,
+)
+
+MAX_DEFAULT_WORKER = 10
+
+
+async def loop_exit():
+ """
+ Stop the loop in an async context
+ """
+ loop = asyncio.get_event_loop()
+ loop.stop()
+
+
+class MagmaService(Service303Servicer):
+ """
+ MagmaService provides the framework for all Magma services.
+ This class also implements the Service303 interface for external
+ entities to interact with the service.
+ """
+
+ def __init__(self, name, empty_mconfig, loop=None):
+ self._name = name
+ self._port = 0
+ self._get_status_callback = None
+ self._get_operational_states_cb = None
+ self._log_count_handler = MsgCounterHandler()
+
+ # Init logging before doing anything
+ logging.basicConfig(
+ level=logging.INFO,
+ format='[%(asctime)s %(levelname)s %(name)s] %(message)s',
+ )
+ # Add a handler to count errors
+ logging.root.addHandler(self._log_count_handler)
+
+ # Set gRPC polling strategy
+ self._set_grpc_poll_strategy()
+
+ # Load the managed config if present
+ self._mconfig = empty_mconfig
+ self._mconfig_metadata = None
+ self._mconfig_manager = get_mconfig_manager()
+ self.reload_mconfig()
+
+ self._state = ServiceInfo.STARTING
+ self._health = ServiceInfo.APP_UNHEALTHY
+ if loop is None:
+ loop = asyncio.get_event_loop()
+ self._loop = loop
+ self._start_time = int(time.time())
+ self._register_signal_handlers()
+
+ # Load the service config if present
+ self._config = None
+ self.reload_config()
+
+ # Count errors
+ self.log_counter = ServiceLogErrorReporter(
+ loop=self._loop,
+ service_config=self._config,
+ handler=self._log_count_handler,
+ )
+ self.log_counter.start()
+
+ # Operational States
+ self._operational_states = []
+
+ self._version = '0.0.0'
+ # Load the service version if available
+ try:
+ # Check if service on docker
+ if self._config and 'init_system' in self._config \
+ and self._config['init_system'] == 'docker':
+ # image comes in form of "feg_gateway_python:<IMAGE_TAG>\n"
+ # Skip the "feg_gateway_python:" part
+ image = os.popen(
+ 'docker ps --filter name=magmad --format "{{.Image}}" | '
+ 'cut -d ":" -f 2',
+ )
+ image_tag = image.read().strip('\n')
+ self._version = image_tag
+ else:
+ self._version = pkg_resources.get_distribution('orc8r').version
+ except pkg_resources.ResolutionError as e:
+ logging.info(e)
+
+ if self._config and 'grpc_workers' in self._config:
+ self._server = grpc.server(
+ futures.ThreadPoolExecutor(
+ max_workers=self._config['grpc_workers'],
+ ),
+ )
+ else:
+ self._server = grpc.server(
+ futures.ThreadPoolExecutor(max_workers=MAX_DEFAULT_WORKER),
+ )
+ add_Service303Servicer_to_server(self, self._server)
+
+ @property
+ def version(self):
+ """Return the current running version of the Magma service"""
+ return self._version
+
+ @property
+ def name(self):
+ """Return the name of service
+
+ Returns:
+ tr: name of service
+ """
+ return self._name
+
+ @property
+ def rpc_server(self):
+ """Return the RPC server used by the service"""
+ return self._server
+
+ @property
+ def port(self):
+ """Return the listening port of the service"""
+ return self._port
+
+ @property
+ def loop(self):
+ """Return the asyncio event loop used by the service"""
+ return self._loop
+
+ @property
+ def state(self):
+ """Return the state of the service"""
+ return self._state
+
+ @property
+ def config(self) -> Dict[str, Any]:
+ """Return the service config"""
+ return self._config
+
+ @property
+ def mconfig(self):
+ """Return the managed config"""
+ return self._mconfig
+
+ @property
+ def mconfig_metadata(self):
+ """Return the metadata of the managed config"""
+ return self._mconfig_metadata
+
+ @property
+ def mconfig_manager(self):
+ """Return the mconfig manager for this service"""
+ return self._mconfig_manager
+
+ def reload_config(self):
+ """Reload the local config for the service"""
+ try:
+ self._config = load_service_config(self._name)
+ self._setup_logging()
+ except LoadConfigError as e:
+ logging.warning(e)
+
+ def reload_mconfig(self):
+ """Reload the managed config for the service"""
+ try:
+ # reload mconfig manager in case feature flag for streaming changed
+ self._mconfig_manager = get_mconfig_manager()
+ self._mconfig = self._mconfig_manager.load_service_mconfig(
+ self._name,
+ self._mconfig,
+ )
+ self._mconfig_metadata = \
+ self._mconfig_manager.load_mconfig_metadata()
+ except LoadConfigError as e:
+ logging.warning(e)
+
+ def add_operational_states(self, states: List[State]):
+ """Add a list of states into the service
+
+ Args:
+ states (List[State]): [description]
+ """
+ self._operational_states.extend(states)
+
+ def run(self):
+ """
+ Start the service and runs the event loop until a term signal
+ is received or a StopService rpc call is made on the Service303
+ interface.
+ """
+ logging.info("Starting %s...", self._name)
+ (host, port) = ServiceRegistry.get_service_address(self._name)
+ self._port = self._server.add_insecure_port('{}:{}'.format(host, port))
+ logging.info("Listening on address %s:%d", host, self._port)
+ self._state = ServiceInfo.ALIVE
+ # Python services are healthy immediately when run
+ self._health = ServiceInfo.APP_HEALTHY
+ self._server.start()
+ self._loop.run_forever()
+ # Waiting for the term signal or StopService rpc call
+
+ def close(self):
+ """
+ Clean up the service before termination. This needs to be
+ called atleast once after the service has been inited.
+ """
+ self._loop.close()
+ self._server.stop(0).wait()
+
+ def register_get_status_callback(self, get_status_callback):
+ """Register function for getting status
+
+ Must return a map(string, string)
+ """
+ self._get_status_callback = get_status_callback
+
+ def register_operational_states_callback(self, get_operational_states_cb):
+ """Register the callback function that gets called on GetOperationalStates rpc
+
+ Args:
+ get_operational_states_cb ([type]): callback function
+ """
+ self._get_operational_states_cb = get_operational_states_cb
+
+ def _stop(self, reason):
+ """Stop the service gracefully"""
+ logging.info("Stopping %s with reason %s...", self._name, reason)
+ self._state = ServiceInfo.STOPPING
+ self._server.stop(0)
+
+ for pending_task in asyncio.Task.all_tasks(self._loop):
+ pending_task.cancel()
+
+ self._state = ServiceInfo.STOPPED
+ self._health = ServiceInfo.APP_UNHEALTHY
+ asyncio.ensure_future(loop_exit())
+
+ def _set_grpc_poll_strategy(self):
+ """
+ The new default 'epollex' poll strategy is causing fd leaks, leading to
+ service crashes after 1024 open fds.
+ See https://github.com/grpc/grpc/issues/15759
+ """
+ os.environ['GRPC_POLL_STRATEGY'] = 'epoll1,poll'
+
+ def _get_log_level_from_config(self) -> Optional[int]:
+ if self._config is None:
+ return None
+ log_level = self._config.get('log_level', None)
+ if log_level is None:
+ return None
+ # convert from log level string to LogLevel enum value
+ try:
+ proto_level = LogLevel.Value(log_level)
+ except ValueError:
+ logging.error(
+ 'Unknown logging level in config: %s, ignoring',
+ log_level,
+ )
+ return None
+ return proto_level
+
+ def _get_log_level_from_mconfig(self) -> Optional[int]:
+ if self._mconfig is None:
+ return None
+ return self._mconfig.log_level
+
+ def _setup_logging(self):
+ """Set up log level from config values
+
+ The config file on the AGW takes precedence over the mconfig
+ If neither config file nor mconfig has the log level config, default to INFO
+ """
+ log_level_from_config = self._get_log_level_from_config()
+ log_level_from_mconfig = self._get_log_level_from_mconfig()
+
+ if log_level_from_config is not None:
+ log_level = log_level_from_config
+ elif log_level_from_mconfig is not None:
+ log_level = log_level_from_mconfig
+ else:
+ logging.warning(
+ 'logging level is not specified in either yml config '
+ 'or mconfig, defaulting to INFO',
+ )
+ log_level = LogLevel.Value('INFO')
+ self._set_log_level(log_level)
+
+ @staticmethod
+ def _set_log_level(proto_level: int):
+ """Set log level based on proto-enum level
+
+ Args:
+ proto_level (int): proto enum defined in common.proto
+ """
+ if proto_level == LogLevel.Value('DEBUG'):
+ level = logging.DEBUG
+ elif proto_level == LogLevel.Value('INFO'):
+ level = logging.INFO
+ elif proto_level == LogLevel.Value('WARNING'):
+ level = logging.WARNING
+ elif proto_level == LogLevel.Value('ERROR'):
+ level = logging.ERROR
+ elif proto_level == LogLevel.Value('FATAL'):
+ level = logging.FATAL
+ else:
+ logging.error(
+ 'Unknown logging level: %d, defaulting to INFO',
+ proto_level,
+ )
+ level = logging.INFO
+
+ logging.info(
+ "Setting logging level to %s",
+ logging.getLevelName(level),
+ )
+ logger = logging.getLogger('')
+ logger.setLevel(level)
+
+ def _register_signal_handlers(self):
+ """Register signal handlers
+
+ Right now we just exit on SIGINT/SIGTERM/SIGQUIT
+ """
+ for signame in ['SIGINT', 'SIGTERM', 'SIGQUIT']:
+ self._loop.add_signal_handler(
+ getattr(signal, signame),
+ functools.partial(self._stop, signame),
+ )
+
+ def _signal_handler():
+ logging.info('Handling SIGHUP...')
+ faulthandler.dump_traceback()
+ self._loop.add_signal_handler(
+ signal.SIGHUP, functools.partial(_signal_handler),
+ )
+
+ def GetServiceInfo(self, request, context):
+ """Return the service info (name, version, state, meta, etc.)"""
+ service_info = ServiceInfo(
+ name=self._name,
+ version=self._version,
+ state=self._state,
+ health=self._health,
+ start_time_secs=self._start_time,
+ )
+ if self._get_status_callback is not None:
+ status = self._get_status_callback()
+ try:
+ service_info.status.meta.update(status)
+ except (TypeError, ValueError) as exp:
+ logging.error("Error getting service status: %s", exp)
+ return service_info
+
+ def StopService(self, request, context):
+ """Handle request to stop the service"""
+ logging.info("Request to stop service.")
+ self._loop.call_soon_threadsafe(self._stop, 'RPC')
+ return Void()
+
+ def GetMetrics(self, request, context):
+ """
+ Collects timeseries samples from prometheus python client on this
+ process
+ """
+ metrics = MetricsContainer()
+ metrics.family.extend(get_metrics())
+ return metrics
+
+ def SetLogLevel(self, request, context):
+ """Handle request to set the log level"""
+ self._set_log_level(request.level)
+ return Void()
+
+ def SetLogVerbosity(self, request, context):
+ pass # Not Implemented
+
+ def ReloadServiceConfig(self, request, context):
+ """Handle request to reload the service config file"""
+ self.reload_config()
+ return ReloadConfigResponse(result=ReloadConfigResponse.RELOAD_SUCCESS)
+
+ def GetOperationalStates(self, request, context):
+ """Return the operational states of devices managed by this service."""
+ res = GetOperationalStatesResponse()
+ if self._get_operational_states_cb is not None:
+ states = self._get_operational_states_cb()
+ res.states.extend(states)
+ return res
+
+
+def get_service303_client(service_name: str, location: str) \
+ -> Optional[Service303Stub]:
+ """
+ Return a grpc client attached to the given service
+ name and location.
+ Example Use: client = get_service303_client("state", ServiceRegistry.LOCAL)
+ """
+ try:
+ chan = ServiceRegistry.get_rpc_channel(
+ service_name,
+ location,
+ )
+ return Service303Stub(chan)
+ except ValueError:
+ # Service can't be contacted
+ logging.error('Failed to get RPC channel to %s', service_name)
+ return None