[VOL-2062]Kubernetes probes for openonu adapter

Change-Id: I5a58599e3f6452ab2b340d1436f5dd9c99f5c41c
diff --git a/python/adapters/brcm_openomci_onu/main.py b/python/adapters/brcm_openomci_onu/main.py
index 54f171c..902ed0e 100755
--- a/python/adapters/brcm_openomci_onu/main.py
+++ b/python/adapters/brcm_openomci_onu/main.py
@@ -23,6 +23,8 @@
 
 import arrow
 import yaml
+import SocketServer
+
 from packaging.version import Version
 from simplejson import dumps
 from twisted.internet.defer import inlineCallbacks, returnValue
@@ -45,6 +47,8 @@
 from voltha_protos.adapter_pb2 import AdapterConfig
 
 from brcm_openomci_onu import BrcmOpenomciOnuAdapter
+from probe import Probe
+
 
 defs = dict(
     version_file='./VERSION',
@@ -67,6 +71,7 @@
     backend=os.environ.get('BACKEND', 'none'),
     retry_interval=os.environ.get('RETRY_INTERVAL', 2),
     heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
+    probe=os.environ.get('PROBE', ':8080')
 )
 
 
@@ -231,6 +236,13 @@
                         default=defs['event_topic'],
                         help=_help)
 
+
+    _help = '<hostname>:<port> for liveness and readiness probes (default: %s)' % defs['probe']
+    parser.add_argument(
+        '-P', '--probe', dest='probe', action='store',
+        default=defs['probe'],
+        help=_help)
+
     args = parser.parse_args()
 
     # post-processing
@@ -352,6 +364,7 @@
                     config=self.config.get('kafka-cluster-proxy', {})
                 )
             ).start()
+            Probe.kafka_cluster_proxy_running = True
 
             config = self._get_adapter_config()
 
@@ -386,12 +399,14 @@
                     target_cls=openonu_request_handler
                 )
             ).start()
+            Probe.kafka_adapter_proxy_running = True
 
             self.core_proxy.kafka_proxy = get_messaging_proxy()
             self.adapter_proxy.kafka_proxy = get_messaging_proxy()
 
             # retry for ever
             res = yield self._register_with_core(-1)
+            Probe.register_adapter_with_core = True
 
             self.log.info('started-internal-services')
 
@@ -423,8 +438,16 @@
             lambda: self.log.info('twisted-reactor-started'))
         reactor.addSystemEventTrigger('before', 'shutdown',
                                       self.shutdown_components)
+        reactor.callInThread(self.start_probe)
         reactor.run()
 
+    def start_probe(self):
+        args = self.args
+        host = args.probe.split(':')[0]
+        port = args.probe.split(':')[1]
+        server = SocketServer.TCPServer((host, int(port)), Probe)
+        server.serve_forever()
+
     @inlineCallbacks
     def _register_with_core(self, retries):
         while 1:
@@ -484,6 +507,7 @@
                     # self.log.debug('start-kafka-heartbeat')
                     kafka_cluster_proxy.send_message(topic, dumps(message))
                 else:
+                    Probe.kafka_cluster_proxy_running = False
                     self.log.error('kafka-proxy-unavailable')
             except Exception, e:
                 self.log.exception('failed-sending-message-heartbeat', e=e)
diff --git a/python/adapters/brcm_openomci_onu/probe.py b/python/adapters/brcm_openomci_onu/probe.py
new file mode 100644
index 0000000..6b4d39d
--- /dev/null
+++ b/python/adapters/brcm_openomci_onu/probe.py
@@ -0,0 +1,52 @@
+#
+# Copyright 2019 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from SimpleHTTPServer import SimpleHTTPRequestHandler
+from structlog import get_logger
+
+log = get_logger()
+
+class Probe(SimpleHTTPRequestHandler):
+    kafka_adapter_proxy_running = False
+    kafka_cluster_proxy_running = False
+    register_adapter_with_core = False
+
+    def readiness_probe(self):
+        return Probe.kafka_adapter_proxy_running and Probe.kafka_cluster_proxy_running and Probe.register_adapter_with_core
+
+    def liveness_probe(self):
+        return Probe.kafka_adapter_proxy_running and Probe.kafka_cluster_proxy_running and Probe.register_adapter_with_core
+
+    def do_GET(self):
+        if self.path == '/readz':
+            self.ready_probe()
+        elif self.path == '/healthz':
+            self.health_probe()
+
+    def ready_probe(self):
+        if self.readiness_probe():
+            self.send_response(200)
+            self.end_headers()
+        else :
+            self.send_response(418)
+            self.end_headers()
+
+    def health_probe(self):
+        if self.liveness_probe():
+            self.send_response(200)
+            self.end_headers()
+        else :
+            self.send_response(418)
+            self.end_headers()