blob: 2b2d1e0d8290f30e3641dc22e73bdbe1533c31d1 [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
gRPC client meant to connect to a gRPC server endpoint, and query the
end-point's schema by calling SchemaService.Schema(Empty) and all of its
semantics are derived from the recovered schema.
"""
from __future__ import absolute_import
import os
import sys
import time
from random import randint
from zlib import decompress
import functools
import grpc
from consul import Consul
from grpc._channel import _Rendezvous
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.error import ConnectError
from .protos.schema_pb2_grpc import SchemaServiceStub
from google.protobuf.empty_pb2 import Empty
from .asleep import asleep
log = get_logger()
class GrpcClient(object):
"""
Connect to a gRPC server, fetch its schema, and process the downloaded
schema files to drive the customization of the north-bound interface(s)
of Chameleon.
"""
RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
def __init__(self, consul_endpoint, work_dir, endpoint='localhost:50055',
reconnect_callback=None, credentials=None, restart_on_disconnect=False):
self.consul_endpoint = consul_endpoint
self.endpoint = endpoint
self.work_dir = work_dir
self.reconnect_callback = reconnect_callback
self.credentials = credentials
self.restart_on_disconnect = restart_on_disconnect
self.google_api_dir = os.path.abspath(os.path.join(
os.path.dirname(__file__), 'protos'))
self.plugin_dir = os.path.abspath(os.path.join(
os.path.dirname(__file__), 'protoc_plugins'))
self.channel = None
self.schema = None
self.retries = 0
self.shutting_down = False
self.connected = False
self.was_connected = False
def start(self):
log.debug('starting')
if not self.connected:
reactor.callLater(0, self.connect)
log.info('started')
return self
def stop(self):
log.debug('stopping')
if self.shutting_down:
return
self.shutting_down = True
log.info('stopped')
def set_reconnect_callback(self, reconnect_callback):
self.reconnect_callback = reconnect_callback
return self
def connectivity_callback(self, client, connectivity):
if (self.was_connected) and (connectivity in [connectivity.TRANSIENT_FAILURE, connectivity.SHUTDOWN]):
log.info("connectivity lost -- restarting")
os.execv(sys.executable, ['python'] + sys.argv)
if (connectivity == connectivity.READY):
self.was_connected = True
# Sometimes gRPC transitions from READY to IDLE, skipping TRANSIENT_FAILURE even though a socket is
# disconnected. So on idle, force a connectivity check.
if (connectivity == connectivity.IDLE) and (self.was_connected):
connectivity = client.channel._channel.check_connectivity_state(True)
# The result will probably show IDLE, but passing in True has the side effect of reconnecting if the
# connection has been lost, which will trigger the TRANSIENT_FALURE we were looking for.
@inlineCallbacks
def connect(self):
"""
(Re-)Connect to end-point
"""
if self.shutting_down or self.connected:
return
try:
if self.endpoint.startswith('@'):
_endpoint = yield self._get_endpoint_from_consul(
self.endpoint[1:])
else:
_endpoint = self.endpoint
if self.credentials:
log.info('securely connecting', endpoint=_endpoint)
self.channel = grpc.secure_channel(_endpoint, self.credentials)
else:
log.info('insecurely connecting', endpoint=_endpoint)
self.channel = grpc.insecure_channel(_endpoint)
if self.restart_on_disconnect:
connectivity_callback = functools.partial(self.connectivity_callback, self)
self.channel.subscribe(connectivity_callback)
# Delay between initiating connection and executing first gRPC. See CORD-3012.
time.sleep(0.5)
swagger_from = self._retrieve_schema()
self._compile_proto_files(swagger_from)
self._clear_backoff()
self.connected = True
if self.reconnect_callback is not None:
reactor.callLater(0, self.reconnect_callback)
return
except _Rendezvous as e:
if e.code() == grpc.StatusCode.UNAVAILABLE:
log.info('grpc-endpoint-not-available')
else:
log.exception('rendezvous error', e=e)
yield self._backoff('not-available')
except Exception:
if not self.shutting_down:
log.exception('cannot-connect', endpoint=_endpoint)
yield self._backoff('unknown-error')
reactor.callLater(0, self.connect)
def _backoff(self, msg):
wait_time = self.RETRY_BACKOFF[min(self.retries,
len(self.RETRY_BACKOFF) - 1)]
self.retries += 1
log.error(msg, retry_in=wait_time)
return asleep(wait_time)
def _clear_backoff(self):
if self.retries:
log.info('reconnected', after_retries=self.retries)
self.retries = 0
@inlineCallbacks
def _get_endpoint_from_consul(self, service_name):
"""
Look up an appropriate grpc endpoint (host, port) from
consul, under the service name specified by service-name
"""
host = self.consul_endpoint.split(':')[0].strip()
port = int(self.consul_endpoint.split(':')[1].strip())
while True:
log.debug('consul-lookup', host=host, port=port)
consul = Consul(host=host, port=port)
_, services = consul.catalog.service(service_name)
log.debug('consul-response', services=services)
if services:
break
log.warning('no-service', consul_host=host, consul_port=port,
service_name=service_name)
yield asleep(1.0)
# pick local addresses when resolving a service via consul
# see CORD-815 (https://jira.opencord.org/browse/CORD-815)
service = services[randint(0, len(services) - 1)]
endpoint = '{}:{}'.format(service['ServiceAddress'],
service['ServicePort'])
returnValue(endpoint)
def _retrieve_schema(self):
"""
Retrieve schema from gRPC end-point, and save all *.proto files in
the work directory.
"""
assert isinstance(self.channel, grpc.Channel)
stub = SchemaServiceStub(self.channel)
# try:
schemas = stub.GetSchema(Empty(), timeout=120)
# except _Rendezvous, e:
# if e.code == grpc.StatusCode.UNAVAILABLE:
#
# else:
# raise e
os.system('mkdir -p %s' % self.work_dir)
os.system('rm -fr /tmp/%s/*' %
self.work_dir.replace('/tmp/', '')) # safer
for proto_file in schemas.protos:
proto_fname = proto_file.file_name
proto_content = proto_file.proto
log.debug('saving-proto', fname=proto_fname, dir=self.work_dir,
length=len(proto_content))
with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
f.write(proto_content)
desc_content = decompress(proto_file.descriptor)
desc_fname = proto_fname.replace('.proto', '.desc')
log.debug('saving-descriptor', fname=desc_fname, dir=self.work_dir,
length=len(desc_content))
with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
f.write(desc_content)
return schemas.swagger_from
def _compile_proto_files(self, swagger_from):
"""
For each *.proto file in the work directory, compile the proto
file into the respective *_pb2.py file as well as generate the
web server gateway python file *_gw.py.
:return: None
"""
chameleon_base_dir = os.path.abspath(os.path.join(
os.path.dirname(__file__), '.'
))
for fname in [f for f in os.listdir(self.work_dir)
if f.endswith('.proto')]:
need_swagger = fname == swagger_from
log.debug('compiling', file=fname, need_swagger=need_swagger)
cmd = (
'cd %s && '
'env PATH=%s PYTHONPATH=%s '
'python -m grpc.tools.protoc '
'-I. -I %s '
'--python_out=. '
'--grpc_python_out=. '
'--plugin=protoc-gen-gw=%s/gw_gen.py '
'--gw_out=. '
'%s' % (
self.work_dir,
':'.join([os.environ['PATH'], self.plugin_dir]),
chameleon_base_dir,
self.google_api_dir,
self.plugin_dir,
fname)
)
log.debug('executing', cmd=cmd, file=fname)
os.system(cmd)
log.info('compiled', file=fname)
# test-load each _pb2 file to see all is right
if self.work_dir not in sys.path:
sys.path.insert(0, self.work_dir)
for fname in [f for f in os.listdir(self.work_dir)
if f.endswith('_pb2.py')]:
modname = fname[:-len('.py')]
log.debug('test-import', modname=modname)
_ = __import__(modname) # noqa: F841
@inlineCallbacks
def invoke(self, stub, method_name, request, metadata, retry=1):
"""
Invoke a gRPC call to the remote server and return the response.
:param stub: Reference to the *_pb2 service stub
:param method_name: The method name inside the service stub
:param request: The request protobuf message
:param metadata: [(str, str), (str, str), ...]
:return: The response protobuf message and returned trailing metadata
"""
if not self.connected:
raise ConnectError()
try:
method = getattr(stub(self.channel), method_name)
response, rendezvous = method.with_call(request, metadata=metadata)
returnValue((response, rendezvous.trailing_metadata()))
except grpc._channel._Rendezvous as e:
code = e.code()
if code == grpc.StatusCode.UNAVAILABLE:
e = ConnectError()
if self.connected:
self.connected = False
yield self.connect()
if retry > 0:
response = yield self.invoke(stub, method_name,
request, metadata,
retry=retry - 1)
returnValue(response)
elif code in (
grpc.StatusCode.NOT_FOUND,
grpc.StatusCode.INVALID_ARGUMENT,
grpc.StatusCode.ALREADY_EXISTS,
grpc.StatusCode.UNAUTHENTICATED,
grpc.StatusCode.PERMISSION_DENIED):
pass # don't log error, these occur naturally
else:
log.exception(e)
raise e