[VOL-2102] The OpenONU adapter should update its K8s Ready state to false when it loses connectivity to its required services
Change-Id: Ibc849d07de2c0e524f2cc9df1b81ca4660e44c8f
diff --git a/VERSION b/VERSION
index a2a044c..e75da3e 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.3.6-dev
+2.3.6
diff --git a/python/adapters/brcm_openomci_onu/main.py b/python/adapters/brcm_openomci_onu/main.py
index bcdc1aa..5a23832 100755
--- a/python/adapters/brcm_openomci_onu/main.py
+++ b/python/adapters/brcm_openomci_onu/main.py
@@ -30,7 +30,7 @@
import configparser
from simplejson import dumps
-from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
from twisted.internet.task import LoopingCall
from zope.interface import implementer
@@ -336,7 +336,6 @@
self.startup_components()
if not args.no_heartbeat:
- self.start_heartbeat()
self.start_kafka_cluster_heartbeat(self.instance_id)
def start(self):
@@ -388,6 +387,7 @@
)
).start()
Probe.kafka_cluster_proxy_running = True
+ Probe.kafka_proxy_faulty = False
config = self._get_adapter_config()
@@ -430,7 +430,7 @@
# retry for ever
res = yield self._register_with_core(-1)
- Probe.register_adapter_with_core = True
+ Probe.adapter_registered_with_core = True
self.log.info('started-internal-services')
@@ -444,6 +444,8 @@
for component in reversed(registry.iterate()):
yield component.stop()
+ self.server.shutdown()
+
import threading
self.log.info('THREADS:')
main_thread = threading.current_thread()
@@ -469,8 +471,9 @@
args = self.args
host = args.probe.split(':')[0]
port = args.probe.split(':')[1]
- server = socketserver.TCPServer((host, int(port)), Probe)
- server.serve_forever()
+ socketserver.TCPServer.allow_reuse_address = True
+ self.server = socketserver.TCPServer((host, int(port)), Probe)
+ self.server.serve_forever()
@inlineCallbacks
def _register_with_core(self, retries):
@@ -496,17 +499,6 @@
self.log.exception("failed-registration", e=e)
raise
- def start_heartbeat(self):
-
- t0 = time.time()
- t0s = time.ctime(t0)
-
- def heartbeat():
- self.log.debug(status='up', since=t0s, uptime=time.time() - t0)
-
- lc = LoopingCall(heartbeat)
- lc.start(10)
-
# Temporary function to send a heartbeat message to the external kafka
# broker
def start_kafka_cluster_heartbeat(self, instance_id):
@@ -521,28 +513,55 @@
)
topic = defs['heartbeat_topic']
- def send_msg(start_time):
+ def send_heartbeat_msg():
try:
kafka_cluster_proxy = get_kafka_proxy()
- if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
- # self.log.debug('kafka-proxy-available')
+ if kafka_cluster_proxy:
message['ts'] = arrow.utcnow().timestamp
- message['uptime'] = time.time() - start_time
- # self.log.debug('start-kafka-heartbeat')
- kafka_cluster_proxy.send_message(topic, dumps(message))
+ self.log.debug('sending-kafka-heartbeat-message')
+
+ # Creating a handler to receive the message callbacks
+ df = Deferred()
+ df.addCallback(self.process_kafka_alive_state_update)
+ kafka_cluster_proxy.register_alive_state_update(df)
+ kafka_cluster_proxy.send_heartbeat_message(topic, dumps(message))
else:
Probe.kafka_cluster_proxy_running = False
self.log.error('kafka-proxy-unavailable')
except Exception as e:
self.log.exception('failed-sending-message-heartbeat', e=e)
- try:
- t0 = time.time()
- lc = LoopingCall(send_msg, t0)
- lc.start(10)
- except Exception as e:
- self.log.exception('failed-kafka-heartbeat', e=e)
+ def check_heartbeat_delivery():
+ try:
+ kafka_cluster_proxy = get_kafka_proxy()
+ if kafka_cluster_proxy:
+ kafka_cluster_proxy.check_heartbeat_delivery()
+ except Exception as e:
+ self.log.exception('failed-checking-heartbeat-delivery', e=e)
+ def schedule_periodic_heartbeat():
+ try:
+ # Sending the heartbeat message in a loop
+ lc_heartbeat = LoopingCall(send_heartbeat_msg)
+ lc_heartbeat.start(10)
+ # Polling the delivery status more frequently to get early notification
+ lc_poll = LoopingCall(check_heartbeat_delivery)
+ lc_poll.start(2)
+ except Exception as e:
+ self.log.exception('failed-kafka-heartbeat-startup', e=e)
+
+ from twisted.internet import reactor
+ # Delay the first heartbeat so that the Kafka connection has time to be established
+ reactor.callLater(5, schedule_periodic_heartbeat)
+
+ # Receiving the callback and updating the probe accordingly
+ def process_kafka_alive_state_update(self, alive_state):
+ self.log.debug('process-kafka-alive-state-update', alive_state=alive_state)
+ Probe.kafka_cluster_proxy_running = alive_state
+
+ kafka_cluster_proxy = get_kafka_proxy()
+ if kafka_cluster_proxy:
+ Probe.kafka_proxy_faulty = kafka_cluster_proxy.is_faulty()
if __name__ == '__main__':
Main().start()
diff --git a/python/adapters/brcm_openomci_onu/probe.py b/python/adapters/brcm_openomci_onu/probe.py
index 32646fd..ca58652 100644
--- a/python/adapters/brcm_openomci_onu/probe.py
+++ b/python/adapters/brcm_openomci_onu/probe.py
@@ -19,15 +19,19 @@
log = get_logger()
class Probe(SimpleHTTPRequestHandler):
- kafka_adapter_proxy_running = False
+ # Checks for ONU adapter readiness; all of them must be True
kafka_cluster_proxy_running = False
- register_adapter_with_core = False
+ kafka_adapter_proxy_running = False
+ adapter_registered_with_core = False
+
+ # Liveness is defined by the Kafka connectivity check alone
+ kafka_proxy_faulty = True
def readiness_probe(self):
- return Probe.kafka_adapter_proxy_running and Probe.kafka_cluster_proxy_running and Probe.register_adapter_with_core
+ return Probe.kafka_adapter_proxy_running and Probe.kafka_cluster_proxy_running and Probe.adapter_registered_with_core
def liveness_probe(self):
- return Probe.kafka_adapter_proxy_running and Probe.kafka_cluster_proxy_running and Probe.register_adapter_with_core
+ return not Probe.kafka_proxy_faulty
def do_GET(self):
if self.path == '/readz':
diff --git a/python/requirements.txt b/python/requirements.txt
index c81d4d7..e35a619 100644
--- a/python/requirements.txt
+++ b/python/requirements.txt
@@ -1,2 +1,2 @@
voltha-protos==3.2.8
-pyvoltha==2.3.25
+pyvoltha==2.3.26