Adding better kafka error handling
Change-Id: I37b90dbf19adb975cd207ed8b359ee02a499e6c2
diff --git a/voltha/main.py b/voltha/main.py
index f28cdbe..a19c1ba 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -234,26 +234,31 @@
@inlineCallbacks
def startup_components(self):
- self.log.info('starting-internal-components')
- self.coordinator = yield Coordinator(
- internal_host_address=self.args.internal_host_address,
- external_host_address=self.args.external_host_address,
- rest_port=self.args.rest_port,
- instance_id=self.args.instance_id,
- config=self.config,
- consul=self.args.consul).start()
- init_rest_service(self.args.rest_port)
+ try:
+ self.log.info('starting-internal-components')
+ self.coordinator = yield Coordinator(
+ internal_host_address=self.args.internal_host_address,
+ external_host_address=self.args.external_host_address,
+ rest_port=self.args.rest_port,
+ instance_id=self.args.instance_id,
+ config=self.config,
+ consul=self.args.consul).start()
+ init_rest_service(self.args.rest_port)
- self.grpc_server = yield VolthaGrpcServer(self.args.grpc_port).start()
+ self.grpc_server = yield VolthaGrpcServer(self.args.grpc_port).start()
- # initialize kafka proxy singleton
- self.kafka_proxy = yield KafkaProxy(self.args.consul, self.args.kafka)
+ # initialize kafka proxy singleton
+ self.kafka_proxy = yield KafkaProxy(self.args.consul, self.args.kafka)
- # adapter loader
- self.adapter_loader = yield AdapterLoader(
- config=self.config.get('adapter_loader', {})).start()
+ # adapter loader
+ self.adapter_loader = yield AdapterLoader(
+ config=self.config.get('adapter_loader', {})).start()
- self.log.info('started-internal-services')
+ self.log.info('started-internal-services')
+
+ except Exception as e:
+ self.log.exception('Failure to start all components {}'.format(e))
+
@inlineCallbacks
def shutdown_components(self):
@@ -296,9 +301,12 @@
topic = "voltha-heartbeat"
from twisted.internet.task import LoopingCall
- lc = LoopingCall(get_kafka_proxy().send_message, topic, message)
- lc.start(10)
-
+ kafka_proxy = get_kafka_proxy()
+ if kafka_proxy:
+ lc = LoopingCall(kafka_proxy.send_message, topic, message)
+ lc.start(10)
+ else:
+ self.log.error('Kafka proxy has not been created!')
if __name__ == '__main__':
Main().start()
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
index 1e95ff5..0ce66e5 100644
--- a/voltha/northbound/kafka/kafka_proxy.py
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -24,6 +24,7 @@
from common.utils.consulhelpers import get_endpoint_from_consul
+log = get_logger()
class KafkaProxy(object):
"""
@@ -39,9 +40,7 @@
if KafkaProxy._kafka_instance:
raise Exception('Singleton exist for :{}'.format(KafkaProxy))
- self.log = get_logger()
-
- self.log.info('KafkaProxy init with kafka endpoint:{}'.format(
+ log.info('KafkaProxy init with kafka endpoint:{}'.format(
kafka_endpoint))
self.ack_timeout = ack_timeout
@@ -63,11 +62,11 @@
try:
_k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
self.kafka_endpoint[1:])
- self.log.info(
+ log.info(
'Found kafka service at {}'.format(_k_endpoint))
except Exception as e:
- self.log.error('Failure to locate a kafka service from '
+ log.error('Failure to locate a kafka service from '
'consul {}:'.format(repr(e)))
self.kproducer = None
self.kclient = None
@@ -93,19 +92,19 @@
self._get_kafka_producer()
# Lets the next message request do the retry if still a failure
if self.kproducer is None:
- self.log.error('No kafka producer available at {}'.format(
+ log.error('No kafka producer available at {}'.format(
self.kafka_endpoint))
return
- self.log.info('Sending message {} to kafka topic {}'.format(msg,
+ log.info('Sending message {} to kafka topic {}'.format(msg,
topic))
try:
msg_list = [msg]
yield self.kproducer.send_messages(topic, msgs=msg_list)
- self.log.info('Successfully sent message {} to kafka topic '
+ log.info('Successfully sent message {} to kafka topic '
'{}'.format(msg, topic))
except Exception as e:
- self.log.error('Failure to send message {} to kafka topic {}: '
+ log.error('Failure to send message {} to kafka topic {}: '
'{}'.format(msg, topic, repr(e)))
# set the kafka producer to None. This is needed if the
# kafka docker went down and comes back up with a different
@@ -117,3 +116,4 @@
# Common method to get the singleton instance of the kafka proxy class
def get_kafka_proxy():
return KafkaProxy._kafka_instance
+