[VOL-2062]Kubernetes probes for openonu adapter
Change-Id: I5a58599e3f6452ab2b340d1436f5dd9c99f5c41c
diff --git a/python/adapters/brcm_openomci_onu/main.py b/python/adapters/brcm_openomci_onu/main.py
index 54f171c..902ed0e 100755
--- a/python/adapters/brcm_openomci_onu/main.py
+++ b/python/adapters/brcm_openomci_onu/main.py
@@ -23,6 +23,8 @@
import arrow
import yaml
+import SocketServer
+
from packaging.version import Version
from simplejson import dumps
from twisted.internet.defer import inlineCallbacks, returnValue
@@ -45,6 +47,8 @@
from voltha_protos.adapter_pb2 import AdapterConfig
from brcm_openomci_onu import BrcmOpenomciOnuAdapter
+from probe import Probe
+
defs = dict(
version_file='./VERSION',
@@ -67,6 +71,7 @@
backend=os.environ.get('BACKEND', 'none'),
retry_interval=os.environ.get('RETRY_INTERVAL', 2),
heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
+ probe=os.environ.get('PROBE', ':8080')
)
@@ -231,6 +236,13 @@
default=defs['event_topic'],
help=_help)
+
+ _help = '<hostname>:<port> for liveness and readiness probes (default: %s)' % defs['probe']
+ parser.add_argument(
+ '-P', '--probe', dest='probe', action='store',
+ default=defs['probe'],
+ help=_help)
+
args = parser.parse_args()
# post-processing
@@ -352,6 +364,7 @@
config=self.config.get('kafka-cluster-proxy', {})
)
).start()
+ Probe.kafka_cluster_proxy_running = True
config = self._get_adapter_config()
@@ -386,12 +399,14 @@
target_cls=openonu_request_handler
)
).start()
+ Probe.kafka_adapter_proxy_running = True
self.core_proxy.kafka_proxy = get_messaging_proxy()
self.adapter_proxy.kafka_proxy = get_messaging_proxy()
# retry for ever
res = yield self._register_with_core(-1)
+ Probe.register_adapter_with_core = True
self.log.info('started-internal-services')
@@ -423,8 +438,16 @@
lambda: self.log.info('twisted-reactor-started'))
reactor.addSystemEventTrigger('before', 'shutdown',
self.shutdown_components)
+ reactor.callInThread(self.start_probe)
reactor.run()
+ def start_probe(self):
+ args = self.args
+ host = args.probe.split(':')[0]
+ port = args.probe.split(':')[1]
+ server = SocketServer.TCPServer((host, int(port)), Probe)
+ server.serve_forever()
+
@inlineCallbacks
def _register_with_core(self, retries):
while 1:
@@ -484,6 +507,7 @@
# self.log.debug('start-kafka-heartbeat')
kafka_cluster_proxy.send_message(topic, dumps(message))
else:
+ Probe.kafka_cluster_proxy_running = False
self.log.error('kafka-proxy-unavailable')
except Exception, e:
self.log.exception('failed-sending-message-heartbeat', e=e)
diff --git a/python/adapters/brcm_openomci_onu/probe.py b/python/adapters/brcm_openomci_onu/probe.py
new file mode 100644
index 0000000..6b4d39d
--- /dev/null
+++ b/python/adapters/brcm_openomci_onu/probe.py
@@ -0,0 +1,52 @@
+#
+# Copyright 2019 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from SimpleHTTPServer import SimpleHTTPRequestHandler
+from structlog import get_logger
+
+log = get_logger()
+
+class Probe(SimpleHTTPRequestHandler):
+ kafka_adapter_proxy_running = False
+ kafka_cluster_proxy_running = False
+ register_adapter_with_core = False
+
+ def readiness_probe(self):
+ return Probe.kafka_adapter_proxy_running and Probe.kafka_cluster_proxy_running and Probe.register_adapter_with_core
+
+ def liveness_probe(self):
+ return Probe.kafka_adapter_proxy_running and Probe.kafka_cluster_proxy_running and Probe.register_adapter_with_core
+
+ def do_GET(self):
+ if self.path == '/readz':
+ self.ready_probe()
+ elif self.path == '/healthz':
+ self.health_probe()
+
+ def ready_probe(self):
+ if self.readiness_probe():
+ self.send_response(200)
+ self.end_headers()
+ else :
+ self.send_response(418)
+ self.end_headers()
+
+ def health_probe(self):
+ if self.liveness_probe():
+ self.send_response(200)
+ self.end_headers()
+ else :
+ self.send_response(418)
+ self.end_headers()