[VOL-2102] The OpenONU adapter should update its K8s Ready state to false when it loses connectivity to its required services

Change-Id: I2fc78ad6b9dbc25257826eff1d454fa1719533d7
diff --git a/VERSION b/VERSION
index 3d55c7a..91a5166 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.3.25
+2.3.26
diff --git a/pyvoltha/adapters/kafka/kafka_proxy.py b/pyvoltha/adapters/kafka/kafka_proxy.py
index a6591cf..e606bbb 100644
--- a/pyvoltha/adapters/kafka/kafka_proxy.py
+++ b/pyvoltha/adapters/kafka/kafka_proxy.py
@@ -64,9 +64,12 @@
         self.config = config
         self.kclient = None
         self.kproducer = None
+        self.kproducer_heartbeat = None
+        self.alive_state_handler = None
         self.event_bus_publisher = None
         self.stopping = False
         self.faulty = False
+        self.alive = False
         self.consumer_poll_timeout = consumer_poll_timeout
         self.topic_consumer_map = {}
         self.topic_callbacks_map = {}
@@ -83,6 +86,7 @@
         log.info('started')
         KafkaProxy.faulty = False
         self.stopping = False
+        self.alive = True
         returnValue(self)
 
     @inlineCallbacks
@@ -90,6 +94,7 @@
         try:
             log.debug('stopping-kafka-proxy')
             self.stopping = True
+            self.alive = False
             try:
                 if self.kclient:
                     yield self.kclient.close()
@@ -309,6 +314,7 @@
 
         except Exception as e:
             self.faulty = True
+            self.alive_state_handler.callback(self.alive)
             log.error('failed-to-send-kafka-msg', topic=topic,
                       e=e)
 
@@ -317,20 +323,74 @@
             # port number.
             if self.stopping is False:
                 log.debug('stopping-kafka-proxy')
+                self.alive = False
                 try:
                     self.stopping = True
                     self.stop()
                     self.stopping = False
-                    self.faulty = False
                     log.debug('stopped-kafka-proxy')
                 except Exception as e:
                     log.exception('failed-stopping-kafka-proxy', e=e)
                     pass
             else:
                 log.info('already-stopping-kafka-proxy')
-
             return
 
+    # sending heartbeat message to check the readiness
+    def send_heartbeat_message(self, topic, msg):
+        assert topic is not None
+        assert msg is not None
+
+        try:
+            if self.kproducer_heartbeat is None:
+                if self.kafka_endpoint.startswith('@'):
+                    _k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
+                                                           self.kafka_endpoint[
+                                                           1:])
+                else:
+                    _k_endpoint = self.kafka_endpoint
+
+                # Using 2 seconds timeout for heartbeat producer; default of 5 minutes is too long
+                self.kproducer_heartbeat = _kafkaProducer(
+                    {'bootstrap.servers': _k_endpoint,
+                     'default.topic.config' : {'message.timeout.ms': 2000},
+                    }
+                )
+
+            log.debug('sending-kafka-heartbeat-message', topic=topic)
+            msgs = [msg]
+
+            self.kproducer_heartbeat.produce(topic, msg, callback=self.handle_kafka_delivery_report)
+
+        except Exception as e:
+            self.faulty = True
+            self.alive_state_handler.callback(self.alive)
+            log.error('failed-to-send-kafka-heartbeat-msg', e=e)
+
+    def check_heartbeat_delivery(self):
+        try:
+            if self.kproducer_heartbeat is not None:
+                msg = self.kproducer_heartbeat.poll(0)
+        except Exception as e:
+            log.error('failed-to-check-heartbeat-msg-delivery', e=e)
+            self.faulty = True
+
+    def handle_kafka_delivery_report(self, err, msg):
+        if err is not None :
+            # Log and notify only in event of alive status change
+            if self.alive is True:
+                log.info('failed-to-deliver-message', msg=msg.value(), err=err.str())
+                self.alive_state_handler.callback(False)
+            self.alive = False
+        else :
+            if self.alive is not True:
+                log.info('message-delivered-successfully', msg=msg.value())
+                self.alive_state_handler.callback(True)
+            self.alive = True
+
+    def register_alive_state_update(self, defer_handler):
+        self.alive_state_handler = defer_handler
+
     def is_faulty(self):
         return self.faulty