+gRPC client meant to connect to a gRPC server endpoint, and query the
+end-point's schema by calling SchemaService.Schema(Empty) and all of its
+semantics are derived from the recovered schema.
+import os
+import sys
+import time
+from random import randint
+from zlib import decompress
+import functools
+import grpc
+from consul import Consul
+from grpc._channel import _Rendezvous
+from structlog import get_logger
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, returnValue
+from werkzeug.exceptions import ServiceUnavailable
+from protos.schema_pb2_grpc import SchemaServiceStub
+from google.protobuf.empty_pb2 import Empty
+from asleep import asleep
+log = get_logger()
+class GrpcClient(object):
+    """
+    Connect to a gRPC server, fetch its schema, and process the downloaded
+    schema files to drive the customization of the north-bound interface(s)
+    of Chameleon.
+    """
+    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
+    def __init__(self, consul_endpoint, work_dir, endpoint='localhost:50055',
+                 reconnect_callback=None, credentials=None, restart_on_disconnect=False):
+        self.consul_endpoint = consul_endpoint
+        self.endpoint = endpoint
+        self.work_dir = work_dir
+        self.reconnect_callback = reconnect_callback
+        self.credentials = credentials
+        self.restart_on_disconnect = restart_on_disconnect
+        self.plugin_dir = os.path.abspath(os.path.join(
+            os.path.dirname(__file__), 'protoc_plugins'))
+ = None
+        self.schema = None
+        self.retries = 0
+        self.shutting_down = False
+        self.connected = False
+        self.was_connected = False
+    def start(self):
+        log.debug('starting')
+        if not self.connected:
+            reactor.callLater(0, self.connect)
+        return self
+    def stop(self):
+        log.debug('stopping')
+        if self.shutting_down:
+            return
+        self.shutting_down = True
+    def set_reconnect_callback(self, reconnect_callback):
+        self.reconnect_callback = reconnect_callback
+        return self
+    def connectivity_callback(self, client, connectivity):
+        if (self.was_connected) and (connectivity in [connectivity.TRANSIENT_FAILURE, connectivity.SHUTDOWN]):
+  "connectivity lost -- restarting")
+            os.execv(sys.executable, ['python'] + sys.argv)
+        if (connectivity == connectivity.READY):
+            self.was_connected = True
+        # Sometimes gRPC transitions from READY to IDLE, skipping TRANSIENT_FAILURE even though a socket is
+        # disconnected. So on idle, force a connectivity check.
+        if (connectivity == connectivity.IDLE) and (self.was_connected):
+            connectivity =
+            # The result will probably show IDLE, but passing in True has the side effect of reconnecting if the
+            # connection has been lost, which will trigger the TRANSIENT_FALURE we were looking for.
+    @inlineCallbacks
+    def connect(self):
+        """
+        (Re-)Connect to end-point
+        """
+        if self.shutting_down or self.connected:
+            return
+        try:
+            if self.endpoint.startswith('@'):
+                _endpoint = yield self._get_endpoint_from_consul(
+                    self.endpoint[1:])
+            else:
+                _endpoint = self.endpoint
+            if self.credentials:
+      'securely connecting', endpoint=_endpoint)
+       = grpc.secure_channel(_endpoint, self.credentials)
+            else:
+      'insecurely connecting', endpoint=_endpoint)
+       = grpc.insecure_channel(_endpoint)
+            if self.restart_on_disconnect:
+                connectivity_callback = functools.partial(self.connectivity_callback, self)
+            # Delay between initiating connection and executing first gRPC. See CORD-3012.
+            time.sleep(0.5)
+            swagger_from = self._retrieve_schema()
+            self._compile_proto_files(swagger_from)
+            self._clear_backoff()
+            self.connected = True
+            if self.reconnect_callback is not None:
+                reactor.callLater(0, self.reconnect_callback)
+            return
+        except _Rendezvous, e:
+            if e.code() == grpc.StatusCode.UNAVAILABLE:
+      'grpc-endpoint-not-available')
+            else:
+                log.exception('rendezvous error', e=e)
+            yield self._backoff('not-available')
+        except Exception, e:
+            if not self.shutting_down:
+                log.exception('cannot-connect', endpoint=_endpoint)
+            yield self._backoff('unknown-error')
+        reactor.callLater(0, self.connect)
+    def _backoff(self, msg):
+        wait_time = self.RETRY_BACKOFF[min(self.retries,
+                                           len(self.RETRY_BACKOFF) - 1)]
+        self.retries += 1
+        log.error(msg, retry_in=wait_time)
+        return asleep(wait_time)
+    def _clear_backoff(self):
+        if self.retries:
+  'reconnected', after_retries=self.retries)
+            self.retries = 0
+    @inlineCallbacks
+    def _get_endpoint_from_consul(self, service_name):
+        """
+        Look up an appropriate grpc endpoint (host, port) from
+        consul, under the service name specified by service-name
+        """
+        host = self.consul_endpoint.split(':')[0].strip()
+        port = int(self.consul_endpoint.split(':')[1].strip())
+        while True:
+            log.debug('consul-lookup', host=host, port=port)
+            consul = Consul(host=host, port=port)
+            _, services = consul.catalog.service(service_name)
+            log.debug('consul-response', services=services)
+            if services:
+                break
+            log.warning('no-service', consul_host=host, consul_port=port,
+                        service_name=service_name)
+            yield asleep(1.0)
+        # pick local addresses when resolving a service via consul
+        # see CORD-815 (
+        service = services[randint(0, len(services) - 1)]
+        endpoint = '{}:{}'.format(service['ServiceAddress'],
+                                  service['ServicePort'])
+        returnValue(endpoint)
+    def _retrieve_schema(self):
+        """
+        Retrieve schema from gRPC end-point, and save all *.proto files in
+        the work directory.
+        """
+        assert isinstance(, grpc.Channel)
+        stub = SchemaServiceStub(
+        # try:
+        schemas = stub.GetSchema(Empty(), timeout=120)
+        # except _Rendezvous, e:
+        #     if e.code == grpc.StatusCode.UNAVAILABLE:
+        #
+        #     else:
+        #         raise e
+        os.system('mkdir -p %s' % self.work_dir)
+        os.system('rm -fr /tmp/%s/*' %
+                  self.work_dir.replace('/tmp/', ''))  # safer
+        for proto_file in schemas.protos:
+            proto_fname = proto_file.file_name
+            proto_content = proto_file.proto
+            log.debug('saving-proto', fname=proto_fname, dir=self.work_dir,
+                      length=len(proto_content))
+            with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
+                f.write(proto_content)
+            desc_content = decompress(proto_file.descriptor)
+            desc_fname = proto_fname.replace('.proto', '.desc')
+            log.debug('saving-descriptor', fname=desc_fname, dir=self.work_dir,
+                      length=len(desc_content))
+            with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
+                f.write(desc_content)
+        return schemas.swagger_from
+    def _compile_proto_files(self, swagger_from):
+        """
+        For each *.proto file in the work directory, compile the proto
+        file into the respective * file as well as generate the
+        web server gateway python file *
+        :return: None
+        """
+        chameleon_base_dir = os.path.abspath(os.path.join(
+            os.path.dirname(__file__), '.'
+        ))
+        for fname in [f for f in os.listdir(self.work_dir)
+                      if f.endswith('.proto')]:
+            need_swagger = fname == swagger_from
+            log.debug('compiling', file=fname, need_swagger=need_swagger)
+            cmd = (
+                'cd %s && '
+                'env PATH=%s PYTHONPATH=%s '
+                'python -m '
+                '-I. '
+                '--python_out=. '
+                '--grpc_python_out=. '
+                '--plugin=protoc-gen-gw=%s/ '
+                '--gw_out=. '
+                '%s' % (
+                    self.work_dir,
+                    ':'.join([os.environ['PATH'], self.plugin_dir]),
+                    chameleon_base_dir,
+                    self.plugin_dir,
+                    fname)
+            )
+            log.debug('executing', cmd=cmd, file=fname)
+            os.system(cmd)
+  'compiled', file=fname)
+        # test-load each _pb2 file to see all is right
+        if self.work_dir not in sys.path:
+            sys.path.insert(0, self.work_dir)
+        for fname in [f for f in os.listdir(self.work_dir)
+                      if f.endswith('')]:
+            modname = fname[:-len('.py')]
+            log.debug('test-import', modname=modname)
+            _ = __import__(modname)
+    @inlineCallbacks
+    def invoke(self, stub, method_name, request, metadata, retry=1):
+        """
+        Invoke a gRPC call to the remote server and return the response.
+        :param stub: Reference to the *_pb2 service stub
+        :param method_name: The method name inside the service stub
+        :param request: The request protobuf message
+        :param metadata: [(str, str), (str, str), ...]
+        :return: The response protobuf message and returned trailing metadata
+        """
+        if not self.connected:
+            raise ServiceUnavailable()
+        try:
+            method = getattr(stub(, method_name)
+            response, rendezvous = method.with_call(request, metadata=metadata)
+            returnValue((response, rendezvous.trailing_metadata()))
+        except grpc._channel._Rendezvous, e:
+            code = e.code()
+            if code == grpc.StatusCode.UNAVAILABLE:
+                e = ServiceUnavailable()
+                if self.connected:
+                    self.connected = False
+                    yield self.connect()
+                    if retry > 0:
+                        response = yield self.invoke(stub, method_name,
+                                                     request, metadata,
+                                                     retry=retry - 1)
+                        returnValue(response)
+            elif code in (
+                    grpc.StatusCode.NOT_FOUND,
+                    grpc.StatusCode.INVALID_ARGUMENT,
+                    grpc.StatusCode.ALREADY_EXISTS,
+                    grpc.StatusCode.UNAUTHENTICATED,
+                    grpc.StatusCode.PERMISSION_DENIED):
+                pass  # don't log error, these occur naturally
+            else:
+                log.exception(e)
+            raise e