Chameleon fault tolerance
Change-Id: Id7060f121f85a444005dfeff6279daef51a59295
diff --git a/chameleon/grpc_client/grpc_client.py b/chameleon/grpc_client/grpc_client.py
index c2d66db..72c8e47 100644
--- a/chameleon/grpc_client/grpc_client.py
+++ b/chameleon/grpc_client/grpc_client.py
@@ -15,11 +15,11 @@
#
"""
-gRPC client meant to connect to a gRPC server endpoint,
-and query the end-point's schema by calling
-SchemaService.Schema(NullMessage) and all of its
+gRPC client meant to connect to a gRPC server endpoint and query the
+end-point's schema by calling SchemaService.GetSchema(NullMessage); all of its
semantics are derived from the recovered schema.
"""
+
import os
import sys
from random import randint
@@ -27,14 +27,25 @@
import grpc
from consul import Consul
+from grpc._channel import _Rendezvous
from structlog import get_logger
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks
+from werkzeug.exceptions import ServiceUnavailable
+from chameleon.asleep import asleep
from chameleon.protos.schema_pb2 import NullMessage, SchemaServiceStub
log = get_logger()
class GrpcClient(object):
+ """
+ Connect to a gRPC server, fetch its schema, and process the downloaded
+ schema files to drive the customization of the north-bound interface(s)
+ of Chameleon.
+ """
+ RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
def __init__(self, consul_endpoint, work_dir, endpoint='localhost:50055'):
self.consul_endpoint = consul_endpoint
@@ -45,11 +56,13 @@
self.channel = None
self.schema = None
-
+ self.retries = 0
+ self.on_reconnect = None
self.shutting_down = False
- def run(self):
- self.connect()
+ def run(self, on_reconnect=None):
+ self.on_reconnect = on_reconnect
+ reactor.callLater(0, self.connect)
return self
def shutdown(self):
@@ -58,14 +71,18 @@
self.shutting_down = True
pass
+ @inlineCallbacks
def connect(self):
- """(re-)connect to end-point"""
+ """
+ (Re-)connect to end-point, retrying with backoff until connected or shut down
+ """
+
if self.shutting_down:
return
try:
if self.endpoint.startswith('@'):
- _endpoint = self.get_endpoint_from_consul(self.endpoint[1:])
+ _endpoint = self._get_endpoint_from_consul(self.endpoint[1:])
else:
_endpoint = self.endpoint
@@ -74,13 +91,44 @@
self._retrieve_schema()
self._compile_proto_files()
+ self._clear_backoff()
+
+ if self.on_reconnect is not None:
+ reactor.callLater(0, self.on_reconnect)
+
+ return
+
+ except _Rendezvous, e:
+ if e.code() == grpc.StatusCode.UNAVAILABLE:
+ log.info('grpc-endpoint-not-available')
+ else:
+ log.exception(e)
+ yield self._backoff('not-available')
except Exception, e:
- log.exception('cannot-connect', endpoint=_endpoint)
+ if not self.shutting_down:
+ # _endpoint is unbound if the consul lookup raised; log the configured one
+ log.exception('cannot-connect', endpoint=self.endpoint)
+ yield self._backoff('unknown-error')
- def get_endpoint_from_consul(self, service_name):
- """Look up an appropriate grpc endpoint (host, port) from
- consul, under the service name specified by service-name
+ reactor.callLater(0, self.connect)
+
+ def _backoff(self, msg):
+ wait_time = self.RETRY_BACKOFF[min(self.retries,
+ len(self.RETRY_BACKOFF) - 1)]
+ self.retries += 1
+ log.error(msg, retry_in=wait_time)
+ return asleep(wait_time)
+
+ def _clear_backoff(self):
+ if self.retries:
+ log.info('reconnected', after_retries=self.retries)
+ self.retries = 0
+
+ def _get_endpoint_from_consul(self, service_name):
+ """
+ Look up an appropriate grpc endpoint (host, port) from
+ consul, under the service name given by service_name
"""
host = self.consul_endpoint.split(':')[0].strip()
port = int(self.consul_endpoint.split(':')[1].strip())
@@ -100,10 +147,13 @@
return endpoint
def _retrieve_schema(self):
- """Retrieve schema from gRPC end-point"""
+ """
+ Retrieve schema from gRPC end-point, and save all *.proto files in
+ the work directory.
+ """
assert isinstance(self.channel, grpc.Channel)
stub = SchemaServiceStub(self.channel)
schema = stub.GetSchema(NullMessage())
os.system('mkdir -p %s' % self.work_dir)
os.system('rm -fr /tmp/%s/*' %
@@ -124,7 +180,12 @@
f.write(content)
def _compile_proto_files(self):
-
+ """
+ For each *.proto file in the work directory, compile the proto
+ file into the respective *_pb2.py file as well as generate the
+ web server gateway python file *_gw.py.
+ :return: None
+ """
google_api_dir = os.path.abspath(os.path.join(
os.path.dirname(__file__),
'../protos/third_party'
@@ -132,8 +193,8 @@
for fname in [f for f in os.listdir(self.work_dir)
if f.endswith('.proto')]:
- log.info('compiling', file=fname)
+ log.debug('compiling', file=fname)
cmd = (
'cd %s && '
'env PATH=%s PYTHONPATH=%s '
@@ -152,8 +213,9 @@
self.plugin_dir,
fname)
)
- log.debug('executing', cmd=cmd)
+ log.debug('executing', cmd=cmd, file=fname)
os.system(cmd)
+ log.info('compiled', file=fname)
# test-load each _pb2 file to see all is right
if self.work_dir not in sys.path:
@@ -166,5 +228,32 @@
_ = __import__(modname)
def invoke(self, stub, method_name, request):
- response = getattr(stub(self.channel), method_name)(request)
- return response
+ """
+ Invoke a gRPC call to the remote server and return the response.
+ :param stub: Reference to the *_pb2 service stub
+ :param method_name: The method name inside the service stub
+ :param request: The request protobuf message
+ :return: The response protobuf message
+ :raises ServiceUnavailable: if there is no channel, or if the call
+ fails with gRPC status UNAVAILABLE
+ :raises _Rendezvous: for any other gRPC call failure
+ """
+
+ if self.channel is None:
+ raise ServiceUnavailable()
+
+ try:
+ response = getattr(stub(self.channel), method_name)(request)
+ return response
+
+ except _Rendezvous, e:
+ if e.code() == grpc.StatusCode.UNAVAILABLE:
+ e = ServiceUnavailable()
+ else:
+ log.exception(e)
+
+ # Either way, drop the broken channel and let connect() rebuild it.
+ self.channel = None
+ reactor.callLater(0, self.connect)
+
+ raise e