# blob: ac339f4e215b03b6ff2a1e19ca4371181eea7291 [file] [log] [blame]
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""gRPC server endpoint"""
import os
import sys
import threading
from collections import OrderedDict
from os.path import abspath, basename, dirname, join
import grpc
from grpc_reflection.v1alpha import reflection
from concurrent import futures
import zlib
from protos import schema_pb2, schema_pb2_grpc
# initialize config and logger
from xosconfig import Config
from multistructlog import create_logger
# Resolve the parent of the directory that contains this file (symlinks
# resolved via realpath) and append it to the import search path.
xos_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/..")
sys.path.append(xos_path)
# Initialize the configuration first, then build the module-wide logger from
# its "logging" entry.
Config.init()
log = create_logger(Config().get("logging"))
# Must be imported after Config.init()
from xos_version import get_version_dict
# Files read by XOSGrpcServer.__init__ and handed to
# grpc.ssl_server_credentials: the private key and certificate form the
# key/cert pair, and the CA file is passed as the root certificates argument.
SERVER_KEY = "/opt/cord_profile/core_api_key.pem"
SERVER_CERT = "/opt/cord_profile/core_api_cert.pem"
SERVER_CA = "/usr/local/share/ca-certificates/local_certs.crt"
class SchemaService(schema_pb2_grpc.SchemaServiceServicer):
    """gRPC servicer that hands out the .proto sources and their descriptors.

    All schema files are read once at construction time and cached in
    ``self.schemas``; ``GetSchema`` simply returns that cached message.
    """

    def __init__(self, thread_pool):
        """Pre-load the schema files and build the cached response.

        :param thread_pool: executor supplied by the caller; it is only
            stored on the instance here.
        """
        self.thread_pool = thread_pool
        protos = self._load_schema()
        self.schemas = schema_pb2.Schemas(
            protos=protos, swagger_from="xos.proto", yang_from="xos.proto"
        )

    def stop(self):
        """No-op; XOSGrpcServer.stop() calls stop() on every registered service."""
        pass

    def _load_schema(self):
        """Pre-load the schema files so that we can serve them up (file
        sizes are small enough to do so).

        For every ``*.proto`` file in the ``protos`` directory next to this
        module (except ``schema.proto``), read the proto source and the
        sibling ``.desc`` file, zlib-compress the descriptor, and wrap both
        in a ``schema_pb2.ProtoFile``.

        :return: list of ``schema_pb2.ProtoFile`` messages, ordered by file
            name.
        """
        proto_dir = abspath(join(dirname(__file__), "./protos"))

        def find_files(directory, suffix):
            """Return the sorted full paths in ``directory`` ending in ``suffix``."""
            proto_blacklist = ["schema.proto"]
            # os.listdir() returns entries in arbitrary order; sort so that the
            # served schema list really is deterministic.
            return [
                join(directory, fname)
                for fname in sorted(os.listdir(directory))
                if fname.endswith(suffix) and fname not in proto_blacklist
            ]

        proto_map = OrderedDict()  # to have deterministic data
        for proto_file in find_files(proto_dir, ".proto"):
            with open(proto_file, "r") as f:
                proto_content = f.read()
            fname = basename(proto_file)
            # assure no two files have the same basename
            assert fname not in proto_map
            # Swap only the file extension; str.replace() would also rewrite
            # any ".proto" occurring earlier in the path.
            desc_file = os.path.splitext(proto_file)[0] + ".desc"
            # The descriptor is fed to zlib.compress(), which needs bytes, so
            # read it in binary mode.
            with open(desc_file, "rb") as f:
                descriptor_content = zlib.compress(f.read())
            proto_map[fname] = schema_pb2.ProtoFile(
                file_name=fname, proto=proto_content, descriptor=descriptor_content
            )
        # Materialize a real list rather than returning a dict view.
        return list(proto_map.values())

    def GetSchema(self, request, context):
        """Return current schema files and descriptor"""
        return self.schemas
class XOSGrpcServer(object):
    """Wraps a grpc server together with the servicers registered on it.

    The server listens on an insecure port and on a TLS port whose
    credentials are built from SERVER_KEY, SERVER_CERT and SERVER_CA.
    """

    def __init__(self, port=50055, model_status=0, model_output="", secure_port=50051):
        """Create the grpc server object and load the TLS credentials.

        :param port: port for the insecure listener.
        :param model_status: when non-zero, start() logs an error and exits.
        :param model_output: stored on the instance; not used elsewhere in
            this class.
        :param secure_port: port for the TLS listener (defaults to the
            previously hard-coded 50051).
        """
        self.port = port
        self.secure_port = secure_port
        self.model_status = model_status
        self.model_output = model_output
        log.info("Initializing GRPC Server", port=port)
        self.thread_pool = futures.ThreadPoolExecutor(max_workers=10)
        self.server = grpc.server(self.thread_pool)
        self.django_initialized = False
        self.django_apps = []
        server_key = self._read_file(SERVER_KEY)
        server_cert = self._read_file(SERVER_CERT)
        server_ca = self._read_file(SERVER_CA)
        self.credentials = grpc.ssl_server_credentials(
            [(server_key, server_cert)], server_ca, False
        )
        self.delayed_shutdown_timer = None
        self.exit_event = threading.Event()
        self.services = []
        # Created here as well as in start() so that the register_* helpers
        # do not raise AttributeError when called before start().
        self.service_names = []

    @staticmethod
    def _read_file(path):
        """Return the raw bytes of ``path``, closing the file afterwards."""
        with open(path, "rb") as f:
            return f.read()

    def init_django(self):
        """Set up Django and remember its app registry.

        On success ``self.django_apps`` holds ``django.apps.apps`` and
        ``self.django_initialized`` is True. Any failure is logged and
        leaves ``django_initialized`` False.
        """
        try:
            import django

            os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
            django.setup()
            from django.apps import apps

            self.django_apps = apps
            self.django_initialized = True
        # NOTE(review): BaseException also swallows SystemExit and
        # KeyboardInterrupt; start() exits when django_initialized is False.
        except BaseException:
            log.exception("Failed to initialize django")

    def register_dynamicload(self):
        """Register the dynamicload servicer and record its service name."""
        from xos_dynamicload_api import DynamicLoadService
        from protos import dynamicload_pb2_grpc, dynamicload_pb2

        dynamic_load_service = DynamicLoadService(self.thread_pool, self)
        self.register(
            "dynamicload",
            dynamicload_pb2_grpc.add_dynamicloadServicer_to_server,
            dynamic_load_service,
        )
        dynamic_load_service.set_django_apps(self.django_apps)
        self.service_names.append(
            dynamicload_pb2.DESCRIPTOR.services_by_name['dynamicload'].full_name
        )

    def register_core(self):
        """Register the xos servicer and record its service name."""
        from xos_grpc_api import XosService
        from protos import xos_pb2_grpc, xos_pb2

        self.register(
            "xos", xos_pb2_grpc.add_xosServicer_to_server, XosService(self.thread_pool)
        )
        self.service_names.append(
            xos_pb2.DESCRIPTOR.services_by_name['xos'].full_name
        )

    def register_utility(self):
        """Register the utility servicer and record its service name."""
        from xos_utility_api import UtilityService
        from protos import utility_pb2_grpc, utility_pb2

        self.register(
            "utility",
            utility_pb2_grpc.add_utilityServicer_to_server,
            UtilityService(self.thread_pool),
        )
        self.service_names.append(
            utility_pb2.DESCRIPTOR.services_by_name['utility'].full_name
        )

    def register_modeldefs(self):
        """Register the modeldefs servicer and record its service name."""
        from xos_modeldefs_api import ModelDefsService
        from protos import modeldefs_pb2_grpc, modeldefs_pb2

        self.register(
            "modeldefs",
            modeldefs_pb2_grpc.add_modeldefsServicer_to_server,
            ModelDefsService(self.thread_pool),
        )
        self.service_names.append(
            modeldefs_pb2.DESCRIPTOR.services_by_name['modeldefs'].full_name
        )

    def register_filetransfer(self):
        """Register the filetransfer servicer and record its service name."""
        from xos_filetransfer_api import FileTransferService
        from protos import filetransfer_pb2_grpc, filetransfer_pb2

        self.register(
            "filetransfer",
            filetransfer_pb2_grpc.add_filetransferServicer_to_server,
            FileTransferService(self.thread_pool),
        )
        self.service_names.append(
            filetransfer_pb2.DESCRIPTOR.services_by_name['filetransfer'].full_name
        )

    def start(self):
        """Register all servicers, open both ports and start the server.

        Exits the process with status -1 when ``model_status`` is non-zero
        or when Django fails to initialize.

        :return: self
        """
        log.info("Starting GRPC Server")
        log.info("XOS Core Build Details", **get_version_dict())
        # Reset so the reflection list only contains what this call registers.
        self.service_names = []
        self.register(
            "schema",
            schema_pb2_grpc.add_SchemaServiceServicer_to_server,
            SchemaService(self.thread_pool),
        )
        if self.model_status != 0:
            log.error("Models are broken. Please remove the synchronizer causing the problem and restart the core.")
            sys.exit(-1)
        self.init_django()
        if not self.django_initialized:
            log.error("Django is broken. Please remove the synchronizer causing the problem and restart the core.")
            sys.exit(-1)
        self.register_dynamicload()
        self.register_core()
        self.register_utility()
        self.register_modeldefs()
        self.register_filetransfer()
        self.service_names.append(reflection.SERVICE_NAME)
        reflection.enable_server_reflection(self.service_names, self.server)
        # open ports
        self.server.add_insecure_port("[::]:%s" % self.port)
        self.server.add_secure_port("[::]:%s" % self.secure_port, self.credentials)
        # start the server
        self.server.start()
        log.info("GRPC Server Started")
        return self

    def stop(self, grace=0):
        """Stop every registered service, then the grpc server.

        :param grace: passed through to ``self.server.stop``.
        """
        log.info("Stopping GRPC Server")
        for service in self.services:
            service.stop()
        self.server.stop(grace)
        log.info("stopped")

    def stop_and_exit(self):
        """Set ``exit_event``; the server itself is not stopped here."""
        log.info("Stop and Exit")
        self.exit_event.set()

    def register(self, name, activator_func, service):
        """
        Allow late registration of gRPC servicers
        :param name: label for the service; not used by this method.
        :param activator_func: The gRPC "add_XYZServicer_to_server" method
        autogenerated by protoc.
        :param service: The object implementing the service.
        :return: None
        """
        self.services.append(service)
        activator_func(service, self.server)

    def delayed_shutdown(self, seconds):
        """Schedule stop_and_exit() after ``seconds``, replacing any pending timer.

        :param seconds: delay passed to ``threading.Timer``.
        """
        log.info("Delayed shutdown", seconds=seconds)
        if self.delayed_shutdown_timer:
            self.delayed_shutdown_timer.cancel()
        self.delayed_shutdown_timer = threading.Timer(seconds, self.stop_and_exit)
        self.delayed_shutdown_timer.start()