[VOL-2102] The OpenONU adapter should update its K8s Ready state to false when it loses connectivity to its required services
Change-Id: I2fc78ad6b9dbc25257826eff1d454fa1719533d7
diff --git a/VERSION b/VERSION
index 3d55c7a..91a5166 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.3.25
+2.3.26
diff --git a/pyvoltha/adapters/kafka/kafka_proxy.py b/pyvoltha/adapters/kafka/kafka_proxy.py
index a6591cf..e606bbb 100644
--- a/pyvoltha/adapters/kafka/kafka_proxy.py
+++ b/pyvoltha/adapters/kafka/kafka_proxy.py
@@ -64,9 +64,12 @@
self.config = config
self.kclient = None
self.kproducer = None
+ self.kproducer_heartbeat = None
+ self.alive_state_handler = None
self.event_bus_publisher = None
self.stopping = False
self.faulty = False
+ self.alive = False
self.consumer_poll_timeout = consumer_poll_timeout
self.topic_consumer_map = {}
self.topic_callbacks_map = {}
@@ -83,6 +86,7 @@
log.info('started')
KafkaProxy.faulty = False
self.stopping = False
+ self.alive = True
returnValue(self)
@inlineCallbacks
@@ -90,6 +94,7 @@
try:
log.debug('stopping-kafka-proxy')
self.stopping = True
+ self.alive = False
try:
if self.kclient:
yield self.kclient.close()
@@ -309,6 +314,7 @@
except Exception as e:
self.faulty = True
+ self.alive_state_handler.callback(self.alive)
log.error('failed-to-send-kafka-msg', topic=topic,
e=e)
@@ -317,20 +323,74 @@
# port number.
if self.stopping is False:
log.debug('stopping-kafka-proxy')
+ self.alive = False
try:
self.stopping = True
self.stop()
self.stopping = False
- self.faulty = False
log.debug('stopped-kafka-proxy')
except Exception as e:
log.exception('failed-stopping-kafka-proxy', e=e)
pass
else:
log.info('already-stopping-kafka-proxy')
-
return
+ # sending heartbeat message to check the readiness
+ def send_heartbeat_message(self, topic, msg):
+ assert topic is not None
+ assert msg is not None
+
+ try:
+ if self.kproducer_heartbeat is None:
+ if self.kafka_endpoint.startswith('@'):
+ _k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
+ self.kafka_endpoint[
+ 1:])
+ else:
+ _k_endpoint = self.kafka_endpoint
+
+ # Using 2 seconds timeout for heartbeat producer; default of 5 minutes is too long
+ self.kproducer_heartbeat = _kafkaProducer(
+ {'bootstrap.servers': _k_endpoint,
+ 'default.topic.config' : {'message.timeout.ms': 2000},
+ }
+ )
+
+ log.debug('sending-kafka-heartbeat-message', topic=topic)
+ msgs = [msg]
+
+ self.kproducer_heartbeat.produce(topic, msg, callback=self.handle_kafka_delivery_report)
+
+ except Exception as e:
+ self.faulty = True
+ self.alive_state_handler.callback(self.alive)
+ log.error('failed-to-send-kafka-heartbeat-msg', e=e)
+
+ def check_heartbeat_delivery(self):
+ try:
+ if self.kproducer_heartbeat is not None:
+ msg = self.kproducer_heartbeat.poll(0)
+ except Exception as e:
+ log.error('failed-to-check-heartbeat-msg-delivery', e=e)
+ self.faulty = True
+
+ def handle_kafka_delivery_report(self, err, msg):
+ if err is not None :
+ # Log and notify only in event of alive status change
+ if self.alive is True:
+ log.info('failed-to-deliver-message', msg=msg.value(), err=err.str())
+ self.alive_state_handler.callback(False)
+ self.alive = False
+ else :
+ if self.alive is not True:
+ log.info('message-delivered-successfully', msg=msg.value())
+ self.alive_state_handler.callback(True)
+ self.alive = True
+
+ def register_alive_state_update(self, defer_handler):
+ self.alive_state_handler = defer_handler
+
def is_faulty(self):
return self.faulty