Adding better Kafka error handling
Change-Id: I37b90dbf19adb975cd207ed8b359ee02a499e6c2
diff --git a/voltha/main.py b/voltha/main.py
index f28cdbe..a19c1ba 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -234,26 +234,31 @@
@inlineCallbacks
def startup_components(self):
- self.log.info('starting-internal-components')
- self.coordinator = yield Coordinator(
- internal_host_address=self.args.internal_host_address,
- external_host_address=self.args.external_host_address,
- rest_port=self.args.rest_port,
- instance_id=self.args.instance_id,
- config=self.config,
- consul=self.args.consul).start()
- init_rest_service(self.args.rest_port)
+ try:
+ self.log.info('starting-internal-components')
+ self.coordinator = yield Coordinator(
+ internal_host_address=self.args.internal_host_address,
+ external_host_address=self.args.external_host_address,
+ rest_port=self.args.rest_port,
+ instance_id=self.args.instance_id,
+ config=self.config,
+ consul=self.args.consul).start()
+ init_rest_service(self.args.rest_port)
- self.grpc_server = yield VolthaGrpcServer(self.args.grpc_port).start()
+ self.grpc_server = yield VolthaGrpcServer(self.args.grpc_port).start()
- # initialize kafka proxy singleton
- self.kafka_proxy = yield KafkaProxy(self.args.consul, self.args.kafka)
+ # initialize kafka proxy singleton
+ self.kafka_proxy = yield KafkaProxy(self.args.consul, self.args.kafka)
- # adapter loader
- self.adapter_loader = yield AdapterLoader(
- config=self.config.get('adapter_loader', {})).start()
+ # adapter loader
+ self.adapter_loader = yield AdapterLoader(
+ config=self.config.get('adapter_loader', {})).start()
- self.log.info('started-internal-services')
+ self.log.info('started-internal-services')
+
+ except Exception as e:
+ self.log.exception('Failure to start all components {}'.format(e))
+
@inlineCallbacks
def shutdown_components(self):
@@ -296,9 +301,12 @@
topic = "voltha-heartbeat"
from twisted.internet.task import LoopingCall
- lc = LoopingCall(get_kafka_proxy().send_message, topic, message)
- lc.start(10)
-
+ kafka_proxy = get_kafka_proxy()
+ if kafka_proxy:
+ lc = LoopingCall(kafka_proxy.send_message, topic, message)
+ lc.start(10)
+ else:
+ self.log.error('Kafka proxy has not been created!')
if __name__ == '__main__':
Main().start()