| # |
| # Copyright 2017 the original author or authors. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """ |
| gRPC client meant to connect to a gRPC server endpoint, and query the |
| end-point's schema by calling SchemaService.Schema(Empty) and all of its |
| semantics are derived from the recovered schema. |
| """ |
| |
| from __future__ import absolute_import |
| import os |
| import sys |
| import time |
| from random import randint |
| from zlib import decompress |
| |
| import functools |
| import grpc |
| from consul import Consul |
| from grpc._channel import _Rendezvous |
| from structlog import get_logger |
| from twisted.internet import reactor |
| from twisted.internet.defer import inlineCallbacks, returnValue |
| from twisted.internet.error import ConnectError |
| |
| from .protos.schema_pb2_grpc import SchemaServiceStub |
| from google.protobuf.empty_pb2 import Empty |
| |
| from .asleep import asleep |
| |
| log = get_logger() |
| |
| |
| class GrpcClient(object): |
| """ |
| Connect to a gRPC server, fetch its schema, and process the downloaded |
| schema files to drive the customization of the north-bound interface(s) |
| of Chameleon. |
| """ |
| RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5] |
| |
| def __init__(self, consul_endpoint, work_dir, endpoint='localhost:50055', |
| reconnect_callback=None, credentials=None, restart_on_disconnect=False): |
| self.consul_endpoint = consul_endpoint |
| self.endpoint = endpoint |
| self.work_dir = work_dir |
| self.reconnect_callback = reconnect_callback |
| self.credentials = credentials |
| self.restart_on_disconnect = restart_on_disconnect |
| |
| self.plugin_dir = os.path.abspath(os.path.join( |
| os.path.dirname(__file__), 'protoc_plugins')) |
| |
| self.channel = None |
| self.schema = None |
| self.retries = 0 |
| self.shutting_down = False |
| self.connected = False |
| self.was_connected = False |
| |
| def start(self): |
| log.debug('starting') |
| if not self.connected: |
| reactor.callLater(0, self.connect) |
| log.info('started') |
| return self |
| |
| def stop(self): |
| log.debug('stopping') |
| if self.shutting_down: |
| return |
| self.shutting_down = True |
| log.info('stopped') |
| |
| def set_reconnect_callback(self, reconnect_callback): |
| self.reconnect_callback = reconnect_callback |
| return self |
| |
| def connectivity_callback(self, client, connectivity): |
| if (self.was_connected) and (connectivity in [connectivity.TRANSIENT_FAILURE, connectivity.SHUTDOWN]): |
| log.info("connectivity lost -- restarting") |
| os.execv(sys.executable, ['python'] + sys.argv) |
| |
| if (connectivity == connectivity.READY): |
| self.was_connected = True |
| |
| # Sometimes gRPC transitions from READY to IDLE, skipping TRANSIENT_FAILURE even though a socket is |
| # disconnected. So on idle, force a connectivity check. |
| if (connectivity == connectivity.IDLE) and (self.was_connected): |
| connectivity = client.channel._channel.check_connectivity_state(True) |
| # The result will probably show IDLE, but passing in True has the side effect of reconnecting if the |
| # connection has been lost, which will trigger the TRANSIENT_FALURE we were looking for. |
| |
| @inlineCallbacks |
| def connect(self): |
| """ |
| (Re-)Connect to end-point |
| """ |
| |
| if self.shutting_down or self.connected: |
| return |
| |
| try: |
| if self.endpoint.startswith('@'): |
| _endpoint = yield self._get_endpoint_from_consul( |
| self.endpoint[1:]) |
| else: |
| _endpoint = self.endpoint |
| |
| if self.credentials: |
| log.info('securely connecting', endpoint=_endpoint) |
| self.channel = grpc.secure_channel(_endpoint, self.credentials) |
| else: |
| log.info('insecurely connecting', endpoint=_endpoint) |
| self.channel = grpc.insecure_channel(_endpoint) |
| |
| if self.restart_on_disconnect: |
| connectivity_callback = functools.partial(self.connectivity_callback, self) |
| self.channel.subscribe(connectivity_callback) |
| |
| # Delay between initiating connection and executing first gRPC. See CORD-3012. |
| time.sleep(0.5) |
| |
| swagger_from = self._retrieve_schema() |
| self._compile_proto_files(swagger_from) |
| self._clear_backoff() |
| |
| self.connected = True |
| if self.reconnect_callback is not None: |
| reactor.callLater(0, self.reconnect_callback) |
| |
| return |
| |
| except _Rendezvous as e: |
| if e.code() == grpc.StatusCode.UNAVAILABLE: |
| log.info('grpc-endpoint-not-available') |
| else: |
| log.exception('rendezvous error', e=e) |
| yield self._backoff('not-available') |
| |
| except Exception: |
| if not self.shutting_down: |
| log.exception('cannot-connect', endpoint=_endpoint) |
| yield self._backoff('unknown-error') |
| |
| reactor.callLater(0, self.connect) |
| |
| def _backoff(self, msg): |
| wait_time = self.RETRY_BACKOFF[min(self.retries, |
| len(self.RETRY_BACKOFF) - 1)] |
| self.retries += 1 |
| log.error(msg, retry_in=wait_time) |
| return asleep(wait_time) |
| |
| def _clear_backoff(self): |
| if self.retries: |
| log.info('reconnected', after_retries=self.retries) |
| self.retries = 0 |
| |
| @inlineCallbacks |
| def _get_endpoint_from_consul(self, service_name): |
| """ |
| Look up an appropriate grpc endpoint (host, port) from |
| consul, under the service name specified by service-name |
| """ |
| host = self.consul_endpoint.split(':')[0].strip() |
| port = int(self.consul_endpoint.split(':')[1].strip()) |
| |
| while True: |
| log.debug('consul-lookup', host=host, port=port) |
| consul = Consul(host=host, port=port) |
| _, services = consul.catalog.service(service_name) |
| log.debug('consul-response', services=services) |
| if services: |
| break |
| log.warning('no-service', consul_host=host, consul_port=port, |
| service_name=service_name) |
| yield asleep(1.0) |
| |
| # pick local addresses when resolving a service via consul |
| # see CORD-815 (https://jira.opencord.org/browse/CORD-815) |
| |
| service = services[randint(0, len(services) - 1)] |
| endpoint = '{}:{}'.format(service['ServiceAddress'], |
| service['ServicePort']) |
| returnValue(endpoint) |
| |
| def _retrieve_schema(self): |
| """ |
| Retrieve schema from gRPC end-point, and save all *.proto files in |
| the work directory. |
| """ |
| assert isinstance(self.channel, grpc.Channel) |
| stub = SchemaServiceStub(self.channel) |
| # try: |
| schemas = stub.GetSchema(Empty(), timeout=120) |
| # except _Rendezvous, e: |
| # if e.code == grpc.StatusCode.UNAVAILABLE: |
| # |
| # else: |
| # raise e |
| |
| os.system('mkdir -p %s' % self.work_dir) |
| os.system('rm -fr /tmp/%s/*' % |
| self.work_dir.replace('/tmp/', '')) # safer |
| |
| for proto_file in schemas.protos: |
| proto_fname = proto_file.file_name |
| proto_content = proto_file.proto |
| log.debug('saving-proto', fname=proto_fname, dir=self.work_dir, |
| length=len(proto_content)) |
| with open(os.path.join(self.work_dir, proto_fname), 'w') as f: |
| f.write(proto_content) |
| |
| desc_content = decompress(proto_file.descriptor) |
| desc_fname = proto_fname.replace('.proto', '.desc') |
| log.debug('saving-descriptor', fname=desc_fname, dir=self.work_dir, |
| length=len(desc_content)) |
| with open(os.path.join(self.work_dir, desc_fname), 'wb') as f: |
| f.write(desc_content) |
| return schemas.swagger_from |
| |
| def _compile_proto_files(self, swagger_from): |
| """ |
| For each *.proto file in the work directory, compile the proto |
| file into the respective *_pb2.py file as well as generate the |
| web server gateway python file *_gw.py. |
| :return: None |
| """ |
| |
| chameleon_base_dir = os.path.abspath(os.path.join( |
| os.path.dirname(__file__), '.' |
| )) |
| |
| for fname in [f for f in os.listdir(self.work_dir) |
| if f.endswith('.proto')]: |
| |
| need_swagger = fname == swagger_from |
| log.debug('compiling', file=fname, need_swagger=need_swagger) |
| cmd = ( |
| 'cd %s && ' |
| 'env PATH=%s PYTHONPATH=%s ' |
| 'python -m grpc.tools.protoc ' |
| '-I. ' |
| '--python_out=. ' |
| '--grpc_python_out=. ' |
| '--plugin=protoc-gen-gw=%s/gw_gen.py ' |
| '--gw_out=. ' |
| '%s' % ( |
| self.work_dir, |
| ':'.join([os.environ['PATH'], self.plugin_dir]), |
| chameleon_base_dir, |
| self.plugin_dir, |
| fname) |
| ) |
| log.debug('executing', cmd=cmd, file=fname) |
| os.system(cmd) |
| log.info('compiled', file=fname) |
| |
| # test-load each _pb2 file to see all is right |
| if self.work_dir not in sys.path: |
| sys.path.insert(0, self.work_dir) |
| |
| for fname in [f for f in os.listdir(self.work_dir) |
| if f.endswith('_pb2.py')]: |
| modname = fname[:-len('.py')] |
| log.debug('test-import', modname=modname) |
| _ = __import__(modname) # noqa: F841 |
| |
| @inlineCallbacks |
| def invoke(self, stub, method_name, request, metadata, retry=1): |
| """ |
| Invoke a gRPC call to the remote server and return the response. |
| :param stub: Reference to the *_pb2 service stub |
| :param method_name: The method name inside the service stub |
| :param request: The request protobuf message |
| :param metadata: [(str, str), (str, str), ...] |
| :return: The response protobuf message and returned trailing metadata |
| """ |
| |
| if not self.connected: |
| raise ConnectError() |
| |
| try: |
| method = getattr(stub(self.channel), method_name) |
| response, rendezvous = method.with_call(request, metadata=metadata) |
| returnValue((response, rendezvous.trailing_metadata())) |
| |
| except grpc._channel._Rendezvous as e: |
| code = e.code() |
| if code == grpc.StatusCode.UNAVAILABLE: |
| e = ConnectError() |
| |
| if self.connected: |
| self.connected = False |
| yield self.connect() |
| if retry > 0: |
| response = yield self.invoke(stub, method_name, |
| request, metadata, |
| retry=retry - 1) |
| returnValue(response) |
| |
| elif code in ( |
| grpc.StatusCode.NOT_FOUND, |
| grpc.StatusCode.INVALID_ARGUMENT, |
| grpc.StatusCode.ALREADY_EXISTS, |
| grpc.StatusCode.UNAUTHENTICATED, |
| grpc.StatusCode.PERMISSION_DENIED): |
| |
| pass # don't log error, these occur naturally |
| |
| else: |
| log.exception(e) |
| |
| raise e |