Chameleon fault tolerance

Change-Id: Id7060f121f85a444005dfeff6279daef51a59295
diff --git a/chameleon/asleep.py b/chameleon/asleep.py
new file mode 100644
index 0000000..e1868ab
--- /dev/null
+++ b/chameleon/asleep.py
@@ -0,0 +1,27 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""" Async sleep (asleep) method and other twisted goodies """
+
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred
+
+
+def asleep(dt):
+    assert isinstance(dt, (int, float))
+    d = Deferred()
+    reactor.callLater(dt, lambda: d.callback(None))
+    return d
diff --git a/chameleon/grpc_client/grpc_client.py b/chameleon/grpc_client/grpc_client.py
index c2d66db..72c8e47 100644
--- a/chameleon/grpc_client/grpc_client.py
+++ b/chameleon/grpc_client/grpc_client.py
@@ -15,11 +15,11 @@
 #
 
 """
-gRPC client meant to connect to a gRPC server endpoint,
-and query the end-point's schema by calling
-SchemaService.Schema(NullMessage) and all of its
+gRPC client meant to connect to a gRPC server endpoint, and query the
+end-point's schema by calling SchemaService.Schema(NullMessage) and all of its
 semantics are derived from the recovered schema.
 """
+
 import os
 import sys
 from random import randint
@@ -27,14 +27,25 @@
 
 import grpc
 from consul import Consul
+from grpc._channel import _Rendezvous
 from structlog import get_logger
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks
+from werkzeug.exceptions import ServiceUnavailable
 
+from chameleon.asleep import asleep
 from chameleon.protos.schema_pb2 import NullMessage, SchemaServiceStub
 
 log = get_logger()
 
 
 class GrpcClient(object):
+    """
+    Connect to a gRPC server, fetch its schema, and process the downloaded
+    schema files to drive the customization of the north-bound interface(s)
+    of Chameleon.
+    """
+    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
 
     def __init__(self, consul_endpoint, work_dir, endpoint='localhost:50055'):
         self.consul_endpoint = consul_endpoint
@@ -45,11 +56,13 @@
 
         self.channel = None
         self.schema = None
-
+        self.retries = 0
+        self.on_reconnect = None
         self.shutting_down = False
 
-    def run(self):
-        self.connect()
+    def run(self, on_reconnect=None):
+        self.on_reconnect = on_reconnect
+        reactor.callLater(0, self.connect)
         return self
 
     def shutdown(self):
@@ -58,14 +71,18 @@
         self.shutting_down = True
         pass
 
+    @inlineCallbacks
     def connect(self):
-        """(re-)connect to end-point"""
+        """
+        (Re-)Connect to end-point
+        """
+
         if self.shutting_down:
             return
 
         try:
             if self.endpoint.startswith('@'):
-                _endpoint = self.get_endpoint_from_consul(self.endpoint[1:])
+                _endpoint = self._get_endpoint_from_consul(self.endpoint[1:])
             else:
                 _endpoint = self.endpoint
 
@@ -74,13 +91,43 @@
 
             self._retrieve_schema()
             self._compile_proto_files()
+            self._clear_backoff()
+
+            if self.on_reconnect is not None:
+                reactor.callLater(0, self.on_reconnect)
+
+            return
+
+        except _Rendezvous, e:
+            if e.code() == grpc.StatusCode.UNAVAILABLE:
+                log.info('grpc-endpoint-not-available')
+            else:
+                log.exception(e)
+            yield self._backoff('not-available')
 
         except Exception, e:
-            log.exception('cannot-connect', endpoint=_endpoint)
+            if not self.shutting_down:
+                log.exception('cannot-connect', endpoint=_endpoint)
+            yield self._backoff('unknown-error')
 
-    def get_endpoint_from_consul(self, service_name):
-        """Look up an appropriate grpc endpoint (host, port) from
-           consul, under the service name specified by service-name
+        reactor.callLater(0, self.connect)
+
+    def _backoff(self, msg):
+        wait_time = self.RETRY_BACKOFF[min(self.retries,
+                                           len(self.RETRY_BACKOFF) - 1)]
+        self.retries += 1
+        log.error(msg, retry_in=wait_time)
+        return asleep(wait_time)
+
+    def _clear_backoff(self):
+        if self.retries:
+            log.info('reconnected', after_retries=self.retries)
+            self.retries = 0
+
+    def _get_endpoint_from_consul(self, service_name):
+        """
+        Look up an appropriate grpc endpoint (host, port) from
+        consul, under the service name specified by service-name
         """
         host = self.consul_endpoint.split(':')[0].strip()
         port = int(self.consul_endpoint.split(':')[1].strip())
@@ -100,10 +147,19 @@
         return endpoint
 
     def _retrieve_schema(self):
-        """Retrieve schema from gRPC end-point"""
+        """
+        Retrieve schema from gRPC end-point, and save all *.proto files in
+        the work directory.
+        """
         assert isinstance(self.channel, grpc.Channel)
         stub = SchemaServiceStub(self.channel)
+        # try:
         schema = stub.GetSchema(NullMessage())
+        # except _Rendezvous, e:
+        #     if e.code == grpc.StatusCode.UNAVAILABLE:
+        #
+        #     else:
+        #         raise e
 
         os.system('mkdir -p %s' % self.work_dir)
         os.system('rm -fr /tmp/%s/*' %
@@ -124,7 +180,12 @@
                 f.write(content)
 
     def _compile_proto_files(self):
-
+        """
+        For each *.proto file in the work directory, compile the proto
+        file into the respective *_pb2.py file as well as generate the
+        web server gateway python file *_gw.py.
+        :return: None
+        """
         google_api_dir = os.path.abspath(os.path.join(
             os.path.dirname(__file__),
             '../protos/third_party'
@@ -132,8 +193,8 @@
 
         for fname in [f for f in os.listdir(self.work_dir)
                       if f.endswith('.proto')]:
-            log.info('compiling', file=fname)
 
+            log.debug('compiling', file=fname)
             cmd = (
                 'cd %s && '
                 'env PATH=%s PYTHONPATH=%s '
@@ -152,8 +213,9 @@
                     self.plugin_dir,
                     fname)
             )
-            log.debug('executing', cmd=cmd)
+            log.debug('executing', cmd=cmd, file=fname)
             os.system(cmd)
+            log.info('compiled', file=fname)
 
         # test-load each _pb2 file to see all is right
         if self.work_dir not in sys.path:
@@ -166,5 +228,28 @@
             _ = __import__(modname)
 
     def invoke(self, stub, method_name, request):
-        response = getattr(stub(self.channel), method_name)(request)
-        return response
+        """
+        Invoke a gRPC call to the remote server and return the response.
+        :param stub: Reference to the *_pb2 service stub
+        :param method_name: The method name inside the service stub
+        :param request: The request protobuf message
+        :return: The response protobuf message
+        """
+
+        if self.channel is None:
+            raise ServiceUnavailable()
+
+        try:
+            response = getattr(stub(self.channel), method_name)(request)
+            return response
+
+        except grpc._channel._Rendezvous, e:
+            if e.code() == grpc.StatusCode.UNAVAILABLE:
+                e = ServiceUnavailable()
+            else:
+                log.exception(e)
+
+            self.channel = None
+            reactor.callLater(0, self.connect)
+
+            raise e
diff --git a/chameleon/main.py b/chameleon/main.py
index 470dc84..3892dac 100755
--- a/chameleon/main.py
+++ b/chameleon/main.py
@@ -222,9 +222,11 @@
         self.log.info('starting-internal-components')
         args = self.args
         self.grpc_client = yield \
-            GrpcClient(args.consul, args.work_dir, args.grpc_endpoint).run()
+            GrpcClient(args.consul, args.work_dir, args.grpc_endpoint)
         self.web_server = yield \
             WebServer(args.rest_port, args.work_dir, self.grpc_client).run()
+        self.grpc_client.run(
+            on_reconnect=self.web_server.reload_generated_routes)
         self.log.info('started-internal-services')
 
     @inlineCallbacks
diff --git a/chameleon/web_server/web_server.py b/chameleon/web_server/web_server.py
index af8c6c5..c46ad88 100644
--- a/chameleon/web_server/web_server.py
+++ b/chameleon/web_server/web_server.py
@@ -25,6 +25,7 @@
 from twisted.internet.tcp import Port
 from twisted.web.server import Site
 from twisted.web.static import File
+from werkzeug.exceptions import BadRequest
 
 log = get_logger()
 
@@ -43,23 +44,13 @@
             os.path.join(os.path.dirname(__file__), '../swagger_ui'))
 
         self.tcp_port = None
+        self.shutting_down = False
 
     @inlineCallbacks
     def run(self):
         yield self._open_endpoint()
-        yield self._load_generated_routes()
         returnValue(self)
 
-    def _load_generated_routes(self):
-        for fname in os.listdir(self.work_dir):
-            if fname.endswith('_gw.py'):
-                module_name = fname.replace('.py', '')
-                print 'module_name', module_name
-                m = __import__(module_name)
-                print dir(m)
-                assert hasattr(m, 'add_routes')
-                m.add_routes(self.app, self.grpc_client)
-
     @inlineCallbacks
     def _open_endpoint(self):
         endpoint = endpoints.TCP4ServerEndpoint(reactor, self.port)
@@ -70,10 +61,20 @@
 
     @inlineCallbacks
     def shutdown(self):
-        if self.tcp_porte is not None:
+        self.shutting_down = True
+        if self.tcp_port is not None:
             assert isinstance(self.tcp_port, Port)
             yield self.tcp_port.socket.close()
 
+    def reload_generated_routes(self):
+        for fname in os.listdir(self.work_dir):
+            if fname.endswith('_gw.py'):
+                module_name = fname.replace('.py', '')
+                m = __import__(module_name)
+                assert hasattr(m, 'add_routes')
+                m.add_routes(self.app, self.grpc_client)
+                log.info('routes-loaded', module=module_name)
+
     # static swagger_ui website as landing page (for now)
 
     @app.route('/', branch=True)
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index fe0cc52..d023d6b 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -322,6 +322,7 @@
         while 1:
             try:
                 result = yield func(*args, **kw)
+                self._clear_backoff()
                 break
             except ConsulException, e:
                 yield self._backoff('consul-not-up')