Adding better kafka error handling

Change-Id: I37b90dbf19adb975cd207ed8b359ee02a499e6c2
diff --git a/voltha/main.py b/voltha/main.py
index f28cdbe..a19c1ba 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -234,26 +234,31 @@
 
     @inlineCallbacks
     def startup_components(self):
-        self.log.info('starting-internal-components')
-        self.coordinator = yield Coordinator(
-            internal_host_address=self.args.internal_host_address,
-            external_host_address=self.args.external_host_address,
-            rest_port=self.args.rest_port,
-            instance_id=self.args.instance_id,
-            config=self.config,
-            consul=self.args.consul).start()
-        init_rest_service(self.args.rest_port)
+        try:
+            self.log.info('starting-internal-components')
+            self.coordinator = yield Coordinator(
+                internal_host_address=self.args.internal_host_address,
+                external_host_address=self.args.external_host_address,
+                rest_port=self.args.rest_port,
+                instance_id=self.args.instance_id,
+                config=self.config,
+                consul=self.args.consul).start()
+            init_rest_service(self.args.rest_port)
 
-        self.grpc_server = yield VolthaGrpcServer(self.args.grpc_port).start()
+            self.grpc_server = yield VolthaGrpcServer(self.args.grpc_port).start()
 
-        # initialize kafka proxy singleton
-        self.kafka_proxy = yield KafkaProxy(self.args.consul, self.args.kafka)
+            # initialize kafka proxy singleton
+            self.kafka_proxy = yield KafkaProxy(self.args.consul, self.args.kafka)
 
-        # adapter loader
-        self.adapter_loader = yield AdapterLoader(
-            config=self.config.get('adapter_loader', {})).start()
+            # adapter loader
+            self.adapter_loader = yield AdapterLoader(
+                config=self.config.get('adapter_loader', {})).start()
 
-        self.log.info('started-internal-services')
+            self.log.info('started-internal-services')
+
+        except Exception as e:
+            self.log.exception('Failure to start all components {}'.format(e))
+
 
     @inlineCallbacks
     def shutdown_components(self):
@@ -296,9 +301,12 @@
         topic = "voltha-heartbeat"
 
         from twisted.internet.task import LoopingCall
-        lc = LoopingCall(get_kafka_proxy().send_message, topic, message)
-        lc.start(10)
-
+        kafka_proxy = get_kafka_proxy()
+        if kafka_proxy:
+            lc = LoopingCall(kafka_proxy.send_message, topic, message)
+            lc.start(10)
+        else:
+            self.log.error('Kafka proxy has not been created!')
 
 if __name__ == '__main__':
     Main().start()
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
index 1e95ff5..0ce66e5 100644
--- a/voltha/northbound/kafka/kafka_proxy.py
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -24,6 +24,7 @@
 
 from common.utils.consulhelpers import get_endpoint_from_consul
 
+log = get_logger()
 
 class KafkaProxy(object):
     """
@@ -39,9 +40,7 @@
         if KafkaProxy._kafka_instance:
             raise Exception('Singleton exist for :{}'.format(KafkaProxy))
 
-        self.log = get_logger()
-
-        self.log.info('KafkaProxy init with kafka endpoint:{}'.format(
+        log.info('KafkaProxy init with kafka endpoint:{}'.format(
             kafka_endpoint))
 
         self.ack_timeout = ack_timeout
@@ -63,11 +62,11 @@
             try:
                 _k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
                                                        self.kafka_endpoint[1:])
-                self.log.info(
+                log.info(
                     'Found kafka service at {}'.format(_k_endpoint))
 
             except Exception as e:
-                self.log.error('Failure to locate a kafka service from '
+                log.error('Failure to locate a kafka service from '
                                'consul {}:'.format(repr(e)))
                 self.kproducer = None
                 self.kclient = None
@@ -93,19 +92,19 @@
             self._get_kafka_producer()
             # Lets the next message request do the retry if still a failure
             if self.kproducer is None:
-                self.log.error('No kafka producer available at {}'.format(
+                log.error('No kafka producer available at {}'.format(
                     self.kafka_endpoint))
                 return
 
-        self.log.info('Sending message {} to kafka topic {}'.format(msg,
+        log.info('Sending message {} to kafka topic {}'.format(msg,
                                                                     topic))
         try:
             msg_list = [msg]
             yield self.kproducer.send_messages(topic, msgs=msg_list)
-            self.log.info('Successfully sent message {} to kafka topic '
+            log.info('Successfully sent message {} to kafka topic '
                           '{}'.format(msg, topic))
         except Exception as e:
-            self.log.error('Failure to send message {} to kafka topic {}: '
+            log.error('Failure to send message {} to kafka topic {}: '
                            '{}'.format(msg, topic, repr(e)))
             # set the kafka producer to None.  This is needed if the
             # kafka docker went down and comes back up with a different
@@ -117,3 +116,4 @@
 # Common method to get the singleton instance of the kafka proxy class
 def get_kafka_proxy():
     return KafkaProxy._kafka_instance
+