SEBA-495 eliminate chameleon dependency
Change-Id: Ia359d751c3ac84bf8f7038f611d1c5f1a126d1df
diff --git a/lib/xos-api/xosapi/chameleon_client/grpc_client.py b/lib/xos-api/xosapi/chameleon_client/grpc_client.py
new file mode 100644
index 0000000..a8ad69d
--- /dev/null
+++ b/lib/xos-api/xosapi/chameleon_client/grpc_client.py
@@ -0,0 +1,327 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+gRPC client meant to connect to a gRPC server endpoint, and query the
+end-point's schema by calling SchemaService.Schema(Empty) and all of its
+semantics are derived from the recovered schema.
+"""
+
+import os
+import sys
+import time
+from random import randint
+from zlib import decompress
+
+import functools
+import grpc
+from consul import Consul
+from grpc._channel import _Rendezvous
+from structlog import get_logger
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, returnValue
+from werkzeug.exceptions import ServiceUnavailable
+
+from protos.schema_pb2_grpc import SchemaServiceStub
+from google.protobuf.empty_pb2 import Empty
+
+from asleep import asleep
+
+log = get_logger()
+
+
+class GrpcClient(object):
+ """
+ Connect to a gRPC server, fetch its schema, and process the downloaded
+ schema files to drive the customization of the north-bound interface(s)
+ of Chameleon.
+ """
+ RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
+
+ def __init__(self, consul_endpoint, work_dir, endpoint='localhost:50055',
+ reconnect_callback=None, credentials=None, restart_on_disconnect=False):
+ self.consul_endpoint = consul_endpoint
+ self.endpoint = endpoint
+ self.work_dir = work_dir
+ self.reconnect_callback = reconnect_callback
+ self.credentials = credentials
+ self.restart_on_disconnect = restart_on_disconnect
+
+ self.plugin_dir = os.path.abspath(os.path.join(
+ os.path.dirname(__file__), 'protoc_plugins'))
+
+ self.channel = None
+ self.schema = None
+ self.retries = 0
+ self.shutting_down = False
+ self.connected = False
+ self.was_connected = False
+
+ def start(self):
+ log.debug('starting')
+ if not self.connected:
+ reactor.callLater(0, self.connect)
+ log.info('started')
+ return self
+
+ def stop(self):
+ log.debug('stopping')
+ if self.shutting_down:
+ return
+ self.shutting_down = True
+ log.info('stopped')
+
+ def set_reconnect_callback(self, reconnect_callback):
+ self.reconnect_callback = reconnect_callback
+ return self
+
+ def connectivity_callback(self, client, connectivity):
+ if (self.was_connected) and (connectivity in [connectivity.TRANSIENT_FAILURE, connectivity.SHUTDOWN]):
+ log.info("connectivity lost -- restarting")
+ os.execv(sys.executable, ['python'] + sys.argv)
+
+ if (connectivity == connectivity.READY):
+ self.was_connected = True
+
+ # Sometimes gRPC transitions from READY to IDLE, skipping TRANSIENT_FAILURE even though a socket is
+ # disconnected. So on idle, force a connectivity check.
+ if (connectivity == connectivity.IDLE) and (self.was_connected):
+ connectivity = client.channel._channel.check_connectivity_state(True)
+ # The result will probably show IDLE, but passing in True has the side effect of reconnecting if the
+ # connection has been lost, which will trigger the TRANSIENT_FALURE we were looking for.
+
+ @inlineCallbacks
+ def connect(self):
+ """
+ (Re-)Connect to end-point
+ """
+
+ if self.shutting_down or self.connected:
+ return
+
+ try:
+ if self.endpoint.startswith('@'):
+ _endpoint = yield self._get_endpoint_from_consul(
+ self.endpoint[1:])
+ else:
+ _endpoint = self.endpoint
+
+ if self.credentials:
+ log.info('securely connecting', endpoint=_endpoint)
+ self.channel = grpc.secure_channel(_endpoint, self.credentials)
+ else:
+ log.info('insecurely connecting', endpoint=_endpoint)
+ self.channel = grpc.insecure_channel(_endpoint)
+
+ if self.restart_on_disconnect:
+ connectivity_callback = functools.partial(self.connectivity_callback, self)
+ self.channel.subscribe(connectivity_callback)
+
+ # Delay between initiating connection and executing first gRPC. See CORD-3012.
+ time.sleep(0.5)
+
+ swagger_from = self._retrieve_schema()
+ self._compile_proto_files(swagger_from)
+ self._clear_backoff()
+
+ self.connected = True
+ if self.reconnect_callback is not None:
+ reactor.callLater(0, self.reconnect_callback)
+
+ return
+
+ except _Rendezvous, e:
+ if e.code() == grpc.StatusCode.UNAVAILABLE:
+ log.info('grpc-endpoint-not-available')
+ else:
+ log.exception('rendezvous error', e=e)
+ yield self._backoff('not-available')
+
+ except Exception, e:
+ if not self.shutting_down:
+ log.exception('cannot-connect', endpoint=_endpoint)
+ yield self._backoff('unknown-error')
+
+ reactor.callLater(0, self.connect)
+
+ def _backoff(self, msg):
+ wait_time = self.RETRY_BACKOFF[min(self.retries,
+ len(self.RETRY_BACKOFF) - 1)]
+ self.retries += 1
+ log.error(msg, retry_in=wait_time)
+ return asleep(wait_time)
+
+ def _clear_backoff(self):
+ if self.retries:
+ log.info('reconnected', after_retries=self.retries)
+ self.retries = 0
+
+ @inlineCallbacks
+ def _get_endpoint_from_consul(self, service_name):
+ """
+ Look up an appropriate grpc endpoint (host, port) from
+ consul, under the service name specified by service-name
+ """
+ host = self.consul_endpoint.split(':')[0].strip()
+ port = int(self.consul_endpoint.split(':')[1].strip())
+
+ while True:
+ log.debug('consul-lookup', host=host, port=port)
+ consul = Consul(host=host, port=port)
+ _, services = consul.catalog.service(service_name)
+ log.debug('consul-response', services=services)
+ if services:
+ break
+ log.warning('no-service', consul_host=host, consul_port=port,
+ service_name=service_name)
+ yield asleep(1.0)
+
+ # pick local addresses when resolving a service via consul
+ # see CORD-815 (https://jira.opencord.org/browse/CORD-815)
+
+ service = services[randint(0, len(services) - 1)]
+ endpoint = '{}:{}'.format(service['ServiceAddress'],
+ service['ServicePort'])
+ returnValue(endpoint)
+
+ def _retrieve_schema(self):
+ """
+ Retrieve schema from gRPC end-point, and save all *.proto files in
+ the work directory.
+ """
+ assert isinstance(self.channel, grpc.Channel)
+ stub = SchemaServiceStub(self.channel)
+ # try:
+ schemas = stub.GetSchema(Empty(), timeout=120)
+ # except _Rendezvous, e:
+ # if e.code == grpc.StatusCode.UNAVAILABLE:
+ #
+ # else:
+ # raise e
+
+ os.system('mkdir -p %s' % self.work_dir)
+ os.system('rm -fr /tmp/%s/*' %
+ self.work_dir.replace('/tmp/', '')) # safer
+
+ for proto_file in schemas.protos:
+ proto_fname = proto_file.file_name
+ proto_content = proto_file.proto
+ log.debug('saving-proto', fname=proto_fname, dir=self.work_dir,
+ length=len(proto_content))
+ with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
+ f.write(proto_content)
+
+ desc_content = decompress(proto_file.descriptor)
+ desc_fname = proto_fname.replace('.proto', '.desc')
+ log.debug('saving-descriptor', fname=desc_fname, dir=self.work_dir,
+ length=len(desc_content))
+ with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
+ f.write(desc_content)
+ return schemas.swagger_from
+
+ def _compile_proto_files(self, swagger_from):
+ """
+ For each *.proto file in the work directory, compile the proto
+ file into the respective *_pb2.py file as well as generate the
+ web server gateway python file *_gw.py.
+ :return: None
+ """
+
+ chameleon_base_dir = os.path.abspath(os.path.join(
+ os.path.dirname(__file__), '.'
+ ))
+
+ for fname in [f for f in os.listdir(self.work_dir)
+ if f.endswith('.proto')]:
+
+ need_swagger = fname == swagger_from
+ log.debug('compiling', file=fname, need_swagger=need_swagger)
+ cmd = (
+ 'cd %s && '
+ 'env PATH=%s PYTHONPATH=%s '
+ 'python -m grpc.tools.protoc '
+ '-I. '
+ '--python_out=. '
+ '--grpc_python_out=. '
+ '--plugin=protoc-gen-gw=%s/gw_gen.py '
+ '--gw_out=. '
+ '%s' % (
+ self.work_dir,
+ ':'.join([os.environ['PATH'], self.plugin_dir]),
+ chameleon_base_dir,
+ self.plugin_dir,
+ fname)
+ )
+ log.debug('executing', cmd=cmd, file=fname)
+ os.system(cmd)
+ log.info('compiled', file=fname)
+
+ # test-load each _pb2 file to see all is right
+ if self.work_dir not in sys.path:
+ sys.path.insert(0, self.work_dir)
+
+ for fname in [f for f in os.listdir(self.work_dir)
+ if f.endswith('_pb2.py')]:
+ modname = fname[:-len('.py')]
+ log.debug('test-import', modname=modname)
+ _ = __import__(modname)
+
+ @inlineCallbacks
+ def invoke(self, stub, method_name, request, metadata, retry=1):
+ """
+ Invoke a gRPC call to the remote server and return the response.
+ :param stub: Reference to the *_pb2 service stub
+ :param method_name: The method name inside the service stub
+ :param request: The request protobuf message
+ :param metadata: [(str, str), (str, str), ...]
+ :return: The response protobuf message and returned trailing metadata
+ """
+
+ if not self.connected:
+ raise ServiceUnavailable()
+
+ try:
+ method = getattr(stub(self.channel), method_name)
+ response, rendezvous = method.with_call(request, metadata=metadata)
+ returnValue((response, rendezvous.trailing_metadata()))
+
+ except grpc._channel._Rendezvous, e:
+ code = e.code()
+ if code == grpc.StatusCode.UNAVAILABLE:
+ e = ServiceUnavailable()
+
+ if self.connected:
+ self.connected = False
+ yield self.connect()
+ if retry > 0:
+ response = yield self.invoke(stub, method_name,
+ request, metadata,
+ retry=retry - 1)
+ returnValue(response)
+
+ elif code in (
+ grpc.StatusCode.NOT_FOUND,
+ grpc.StatusCode.INVALID_ARGUMENT,
+ grpc.StatusCode.ALREADY_EXISTS,
+ grpc.StatusCode.UNAUTHENTICATED,
+ grpc.StatusCode.PERMISSION_DENIED):
+
+ pass # don't log error, these occur naturally
+
+ else:
+ log.exception(e)
+
+ raise e