[3393] Bug fix for kafka service restart
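
The heartbeat sender in main.py now looks up the kafka proxy on every tick,
skips publishing while the proxy is missing or marked faulty, and logs
failures instead of letting the LoopingCall die. KafkaProxy gains stopping
and faulty flags, a stop() that closes the client and producer, and a
send_message() that marks the proxy faulty and tears it down after a send
error so the producer is rebuilt once kafka comes back. EventBusPublisher
records its subscriptions so stop() can unsubscribe them, and forward() no
longer raises out of the event bus callback.
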
Change-Id: I137cb3285ce59ef823e131f8b2d49dfe2e7aaf3e
diff --git a/voltha/main.py b/voltha/main.py
index 5f282cc..e889864 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -397,15 +397,23 @@
topic = "voltha.heartbeat"
def send_msg():
- message['ts'] = arrow.utcnow().timestamp
- kafka_proxy.send_message(topic, dumps(message))
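+            # Look up the proxy on every tick so a restarted kafka service is picked up.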
+            try:
+                kafka_proxy = get_kafka_proxy()
+                if kafka_proxy and not kafka_proxy.is_faulty():
+                    self.log.debug('kafka-proxy-available')
+                    message['ts'] = arrow.utcnow().timestamp
+                    self.log.debug('start-kafka-heartbeat')
+                    kafka_proxy.send_message(topic, dumps(message))
+                else:
+                    self.log.error('kafka-proxy-unavailable')
+            except Exception as e:
+                self.log.exception('failed-sending-message-heartbeat', e=e)
-        kafka_proxy = get_kafka_proxy()
-        if kafka_proxy:
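+        # Start the 10-second heartbeat loop; setup errors are logged, not fatal.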
+        try:
             lc = LoopingCall(send_msg)
             lc.start(10)
-        else:
-            self.log.error('Kafka proxy has not been created!')
+        except Exception as e:
+            self.log.exception('failed-kafka-heartbeat', e=e)
 if __name__ == '__main__':
diff --git a/voltha/northbound/kafka/event_bus_publisher.py b/voltha/northbound/kafka/event_bus_publisher.py
index 921a1de..8020bf5 100644
--- a/voltha/northbound/kafka/event_bus_publisher.py
+++ b/voltha/northbound/kafka/event_bus_publisher.py
@@ -37,16 +37,25 @@
         self.config = config
         self.topic_mappings = config.get('topic_mappings', {})
         self.event_bus = EventBusClient()
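+        # Subscriptions are recorded so stop() can unsubscribe them on shutdown.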
+        self.subscriptions = None
     def start(self):
         log.debug('starting')
+        self.subscriptions = list()
         self._setup_subscriptions(self.topic_mappings)
         log.info('started')
         return self
     def stop(self):
-        log.debug('stopping')
-        log.info('stopped')
+        try:
+            log.debug('stopping-event-bus')
+            if self.subscriptions:
+                for subscription in self.subscriptions:
+                    self.event_bus.unsubscribe(subscription)
+            log.info('stopped-event-bus')
+        except Exception as e:
+            log.exception('failed-stopping-event-bus', e=e)
+        return
     def _setup_subscriptions(self, mappings):
@@ -60,20 +69,22 @@
                           mapping=mapping)
                 continue
-            self.event_bus.subscribe(
+            self.subscriptions.append(self.event_bus.subscribe(
                 event_bus_topic,
                 # to avoid Python late-binding to the last registered
                 # kafka_topic, we force instant binding with the default arg
-                lambda _, m, k=kafka_topic: self.forward(k, m))
+                lambda _, m, k=kafka_topic: self.forward(k, m)))
             log.info('event-to-kafka', kafka_topic=kafka_topic,
                      event_bus_topic=event_bus_topic)
     def forward(self, kafka_topic, msg):
-
-        # convert to JSON string if msg is a protobuf msg
-        if isinstance(msg, Message):
-            msg = dumps(MessageToDict(msg, True, True))
-
-        self.kafka_proxy.send_message(kafka_topic, msg)
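+        # Log and swallow publish errors so a kafka outage cannot break event bus callbacks.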
+        try:
+            # convert to JSON string if msg is a protobuf msg
+            if isinstance(msg, Message):
+                msg = dumps(MessageToDict(msg, True, True))
+            log.debug('forward-event-bus-publisher')
+            self.kafka_proxy.send_message(kafka_topic, msg)
+        except Exception as e:
+            log.exception('failed-forward-event-bus-publisher', e=e)
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
index 4ee3e66..ad1aad5 100644
--- a/voltha/northbound/kafka/kafka_proxy.py
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -57,6 +57,8 @@
         self.kclient = None
         self.kproducer = None
         self.event_bus_publisher = None
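+        # 'stopping' guards against re-entrant teardown; 'faulty' marks the
+        # proxy unusable after a failed send until it has been torn down.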
+        self.stopping = False
+        self.faulty = False
         log.debug('initialized', endpoint=kafka_endpoint)
     @inlineCallbacks
@@ -67,37 +69,78 @@
         self.event_bus_publisher = yield EventBusPublisher(
             self, self.config.get('event_bus_publisher', {})).start()
         log.info('started')
+        self.faulty = False
+        self.stopping = False
         returnValue(self)
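+    # stop() yields on the client/producer teardown, hence the inlineCallbacks decoration.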
+    @inlineCallbacks
     def stop(self):
-        log.debug('stopping')
-        pass
-        log.info('stopped')
+        try:
+            log.debug('stopping-kafka-proxy')
+            try:
+                if self.kclient:
+                    yield self.kclient.close()
+                    self.kclient = None
+                    log.debug('stopped-kclient-kafka-proxy')
+            except Exception as e:
+                log.exception('failed-stopped-kclient-kafka-proxy', e=e)
+
+            try:
+                if self.kproducer:
+                    yield self.kproducer.stop()
+                    self.kproducer = None
+                    log.debug('stopped-kproducer-kafka-proxy')
+            except Exception as e:
+                log.exception('failed-stopped-kproducer-kafka-proxy', e=e)
+
+            #try:
+            #    if self.event_bus_publisher:
+            #        yield self.event_bus_publisher.stop()
+            #        self.event_bus_publisher = None
+            #        log.debug('stopped-event-bus-publisher-kafka-proxy')
+            #except Exception, e:
+            #    log.debug('failed-stopped-event-bus-publisher-kafka-proxy')
+            #    pass
+
+            log.debug('stopped-kafka-proxy')
+
+        except Exception as e:
+            self.kclient = None
+            self.kproducer = None
+            #self.event_bus_publisher = None
+            log.exception('failed-stopped-kafka-proxy', e=e)
     def _get_kafka_producer(self):
         # PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written
         # to a local log before sending response
+        try:
-        if self.kafka_endpoint.startswith('@'):
-            try:
-                _k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
-                                                       self.kafka_endpoint[1:])
-                log.debug('found-kafka-service', endpoint=_k_endpoint)
+            if self.kafka_endpoint.startswith('@'):
+                try:
+                    _k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
+                                                           self.kafka_endpoint[1:])
+                    log.debug('found-kafka-service', endpoint=_k_endpoint)
-            except Exception as e:
-                log.exception('no-kafka-service-in-consul', e=e)
+                except Exception as e:
+                    log.exception('no-kafka-service-in-consul', e=e)
-                self.kproducer = None
-                self.kclient = None
-                return
-        else:
-            _k_endpoint = self.kafka_endpoint
+                    self.kproducer = None
+                    self.kclient = None
+                    return
+            else:
+                _k_endpoint = self.kafka_endpoint
-        self.kclient = _KafkaClient(_k_endpoint)
-        self.kproducer = _kafkaProducer(self.kclient,
-                                        req_acks=PRODUCER_ACK_LOCAL_WRITE,
-                                        ack_timeout=self.ack_timeout,
-                                        max_req_attempts=self.max_req_attempts)
+            self.kclient = _KafkaClient(_k_endpoint)
+            self.kproducer = _kafkaProducer(self.kclient,
+                                            req_acks=PRODUCER_ACK_LOCAL_WRITE,
+                                            ack_timeout=self.ack_timeout,
+                                            max_req_attempts=self.max_req_attempts)
+        except Exception as e:
+            log.exception('failed-get-kafka-producer', e=e)
+            return
     @inlineCallbacks
     def send_message(self, topic, msg):
@@ -108,26 +151,50 @@
         # then try to get one - this happens only when we try to lookup the
         # kafka service from consul
         try:
-            if self.kproducer is None:
-                self._get_kafka_producer()
-                # Lets the next message request do the retry if still a failure
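+            # Skip the send entirely while the proxy is marked faulty (a restart is under way).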
+            if self.faulty is False:
+
                 if self.kproducer is None:
-                    log.error('no-kafka-producer', endpoint=self.kafka_endpoint)
+                    self._get_kafka_producer()
+                    # Lets the next message request do the retry if still a failure
+                    if self.kproducer is None:
+                        log.error('no-kafka-producer', endpoint=self.kafka_endpoint)
+                        return
+
+                log.debug('sending-kafka-msg', topic=topic, msg=msg)
+                msgs = [msg]
+
+                if self.kproducer and self.kclient and \
+                        self.event_bus_publisher and self.faulty is False:
+                    yield self.kproducer.send_messages(topic, msgs=msgs)
+                    log.debug('sent-kafka-msg', topic=topic, msg=msg)
+                else:
                     return
-            log.debug('sending-kafka-msg', topic=topic, msg=msg)
-            msgs = [msg]
-            yield self.kproducer.send_messages(topic, msgs=msgs)
-            log.debug('sent-kafka-msg', topic=topic, msg=msg)
-
         except Exception, e:
+            self.faulty = True
             log.error('failed-to-send-kafka-msg', topic=topic, msg=msg, e=e)
             # set the kafka producer to None. This is needed if the
             # kafka docker went down and comes back up with a different
             # port number.
-            self.kproducer = None
-            self.kclient = None
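+            # Tear the proxy down so the next send re-creates the producer; 'stopping' prevents re-entry.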
+            if self.stopping is False:
+                log.debug('stopping-kafka-proxy')
+                try:
+                    self.stopping = True
+                    self.stop()
+                    self.stopping = False
+                    self.faulty = False
+                    log.debug('stopped-kafka-proxy')
+                except Exception as e:
+                    log.exception('failed-stopping-kafka-proxy', e=e)
+            else:
+                log.info('already-stopping-kafka-proxy')
+
+            return
+
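+    # Lets callers (e.g. the heartbeat in main.py) check proxy health before sending.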
+    def is_faulty(self):
+        return self.faulty
 # Common method to get the singleton instance of the kafka proxy class