[VOL-2102] The OpenONU adapter should update its K8s Ready state to false when it loses connectivity to its required services

Change-Id: Ibc849d07de2c0e524f2cc9df1b81ca4660e44c8f
diff --git a/VERSION b/VERSION
index a2a044c..e75da3e 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.3.6-dev
+2.3.6
diff --git a/python/adapters/brcm_openomci_onu/main.py b/python/adapters/brcm_openomci_onu/main.py
index bcdc1aa..5a23832 100755
--- a/python/adapters/brcm_openomci_onu/main.py
+++ b/python/adapters/brcm_openomci_onu/main.py
@@ -30,7 +30,7 @@
 import configparser
 
 from simplejson import dumps
-from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
 from twisted.internet.task import LoopingCall
 from zope.interface import implementer
 
@@ -336,7 +336,6 @@
         self.startup_components()
 
         if not args.no_heartbeat:
-            self.start_heartbeat()
             self.start_kafka_cluster_heartbeat(self.instance_id)
 
     def start(self):
@@ -388,6 +387,7 @@
                 )
             ).start()
             Probe.kafka_cluster_proxy_running = True
+            Probe.kafka_proxy_faulty = False
 
             config = self._get_adapter_config()
 
@@ -430,7 +430,7 @@
 
             # retry for ever
             res = yield self._register_with_core(-1)
-            Probe.register_adapter_with_core = True
+            Probe.adapter_registered_with_core = True
 
             self.log.info('started-internal-services')
 
@@ -444,6 +444,8 @@
         for component in reversed(registry.iterate()):
             yield component.stop()
 
+        self.server.shutdown()
+
         import threading
         self.log.info('THREADS:')
         main_thread = threading.current_thread()
@@ -469,8 +471,9 @@
         args = self.args
         host = args.probe.split(':')[0]
         port = args.probe.split(':')[1]
-        server = socketserver.TCPServer((host, int(port)), Probe)
-        server.serve_forever()
+        socketserver.TCPServer.allow_reuse_address = True
+        self.server = socketserver.TCPServer((host, int(port)), Probe)
+        self.server.serve_forever()
 
     @inlineCallbacks
     def _register_with_core(self, retries):
@@ -496,17 +499,6 @@
                 self.log.exception("failed-registration", e=e)
                 raise
 
-    def start_heartbeat(self):
-
-        t0 = time.time()
-        t0s = time.ctime(t0)
-
-        def heartbeat():
-            self.log.debug(status='up', since=t0s, uptime=time.time() - t0)
-
-        lc = LoopingCall(heartbeat)
-        lc.start(10)
-
     # Temporary function to send a heartbeat message to the external kafka
     # broker
     def start_kafka_cluster_heartbeat(self, instance_id):
@@ -521,28 +513,55 @@
         )
         topic = defs['heartbeat_topic']
 
-        def send_msg(start_time):
+        def send_heartbeat_msg():
             try:
                 kafka_cluster_proxy = get_kafka_proxy()
-                if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
-                    # self.log.debug('kafka-proxy-available')
+                if kafka_cluster_proxy:
                     message['ts'] = arrow.utcnow().timestamp
-                    message['uptime'] = time.time() - start_time
-                    # self.log.debug('start-kafka-heartbeat')
-                    kafka_cluster_proxy.send_message(topic, dumps(message))
+                    self.log.debug('sending-kafka-heartbeat-message')
+
+                    # Creating a handler to receive the message callbacks
+                    df = Deferred()
+                    df.addCallback(self.process_kafka_alive_state_update)
+                    kafka_cluster_proxy.register_alive_state_update(df)
+                    kafka_cluster_proxy.send_heartbeat_message(topic, dumps(message))
                 else:
                     Probe.kafka_cluster_proxy_running = False
                     self.log.error('kafka-proxy-unavailable')
             except Exception as e:
                 self.log.exception('failed-sending-message-heartbeat', e=e)
 
-        try:
-            t0 = time.time()
-            lc = LoopingCall(send_msg, t0)
-            lc.start(10)
-        except Exception as e:
-            self.log.exception('failed-kafka-heartbeat', e=e)
+        def check_heartbeat_delivery():
+            try:
+                kafka_cluster_proxy = get_kafka_proxy()
+                if kafka_cluster_proxy:
+                    kafka_cluster_proxy.check_heartbeat_delivery()
+            except Exception as e:
+                self.log.exception('failed-checking-heartbeat-delivery', e=e)
 
+        def schedule_periodic_heartbeat():
+            try:
+                # Sending the heartbeat message in a loop
+                lc_heartbeat = LoopingCall(send_heartbeat_msg)
+                lc_heartbeat.start(10)
+                # Polling the delivery status more frequently to get early notification
+                lc_poll = LoopingCall(check_heartbeat_delivery)
+                lc_poll.start(2)
+            except Exception as e:
+                self.log.exception('failed-kafka-heartbeat-startup', e=e)
+
+        from twisted.internet import reactor
+        # Delaying heartbeat initially to let kafka connection be established
+        reactor.callLater(5, schedule_periodic_heartbeat)
+
+    # Receiving the callback and updating the probe accordingly
+    def process_kafka_alive_state_update(self, alive_state):
+        self.log.debug('process-kafka-alive-state-update', alive_state=alive_state)
+        Probe.kafka_cluster_proxy_running = alive_state
+
+        kafka_cluster_proxy = get_kafka_proxy()
+        if kafka_cluster_proxy:
+            Probe.kafka_proxy_faulty = kafka_cluster_proxy.is_faulty()
 
 if __name__ == '__main__':
     Main().start()
diff --git a/python/adapters/brcm_openomci_onu/probe.py b/python/adapters/brcm_openomci_onu/probe.py
index 32646fd..ca58652 100644
--- a/python/adapters/brcm_openomci_onu/probe.py
+++ b/python/adapters/brcm_openomci_onu/probe.py
@@ -19,15 +19,19 @@
 log = get_logger()
 
 class Probe(SimpleHTTPRequestHandler):
-    kafka_adapter_proxy_running = False
+    # Checks for Onu Adapter Readiness; all should be true
     kafka_cluster_proxy_running = False
-    register_adapter_with_core = False
+    kafka_adapter_proxy_running = False
+    adapter_registered_with_core = False
+
+    # Only Kafka connectivity check defines Liveness
+    kafka_proxy_faulty = True
 
     def readiness_probe(self):
-        return Probe.kafka_adapter_proxy_running and Probe.kafka_cluster_proxy_running and Probe.register_adapter_with_core
+        return Probe.kafka_adapter_proxy_running and Probe.kafka_cluster_proxy_running and Probe.adapter_registered_with_core
 
     def liveness_probe(self):
-        return Probe.kafka_adapter_proxy_running and Probe.kafka_cluster_proxy_running and Probe.register_adapter_with_core
+        return not Probe.kafka_proxy_faulty
 
     def do_GET(self):
         if self.path == '/readz':
diff --git a/python/requirements.txt b/python/requirements.txt
index c81d4d7..e35a619 100644
--- a/python/requirements.txt
+++ b/python/requirements.txt
@@ -1,2 +1,2 @@
 voltha-protos==3.2.8
-pyvoltha==2.3.25
+pyvoltha==2.3.26