VOL-1727: K8S Liveness and Readiness Probes Added

Change-Id: I18748ebd35286d54d0bb9f0b083f3c69b97abdb5
diff --git a/python/ofagent/connection_mgr.py b/python/ofagent/connection_mgr.py
index 7e17613..a141946 100755
--- a/python/ofagent/connection_mgr.py
+++ b/python/ofagent/connection_mgr.py
@@ -37,6 +37,11 @@
 # _ = third_party
 
 class ConnectionManager(object):
+    running = False
+    channel = None
+    subscription = None
+    grpc_client = None
+
     def __init__(self, consul_endpoint,
                  vcore_endpoint, vcore_grpc_timeout, vcore_binding_key,
                  vcore_transaction_key, controller_endpoints, instance_id,
@@ -78,6 +83,7 @@
         log.debug('starting')
 
         self.running = True
+        ConnectionManager.running = True
 
         # Get a subscription to vcore
         reactor.callInThread(self.get_vcore_subscription)
@@ -89,6 +95,16 @@
 
         return self
 
+    @classmethod
+    def liveness_probe(cls):
+        # Pod restarts when liveness condition fails
+        return ConnectionManager.running
+
+    @classmethod
+    def readiness_probe(cls):
+        # Pod is isolated when readiness condition fails
+        return bool(ConnectionManager.channel and ConnectionManager.subscription and ConnectionManager.grpc_client)
+
     def stop(self):
         log.debug('stopping')
         # clean up all controller connections
@@ -126,11 +142,14 @@
         if self.channel is not None:
             del self.channel
 
-        self.is_alive = False
         self.channel = None
         self.subscription = None
         self.grpc_client = None
 
+        ConnectionManager.channel = None
+        ConnectionManager.subscription = None
+        ConnectionManager.grpc_client = None
+
         log.debug('stop-reset-grpc-attributes')
 
     def _assign_grpc_attributes(self):
@@ -144,7 +163,9 @@
 
         # Establish a connection to the vcore GRPC server
         self.channel = grpc.insecure_channel('{}:{}'.format(host, port))
-        self.is_alive = True
+
+        # For Readiness probe
+        ConnectionManager.channel = self.channel
 
         log.debug('stop-assign-grpc-attributes')
 
@@ -167,6 +188,9 @@
                 subscription = yield self.grpc_client.subscribe(
                     OfAgentSubscriber(ofagent_id=container_name))
 
+                #For Readiness probes
+                ConnectionManager.subscription =  subscription
+                ConnectionManager.grpc_client = self.grpc_client
                 # If the subscriber id matches the current instance
                 # ... then the subscription has succeeded
                 if subscription is not None and subscription.ofagent_id == container_name:
diff --git a/python/ofagent/main.py b/python/ofagent/main.py
index fa2b5c0..58afe15 100755
--- a/python/ofagent/main.py
+++ b/python/ofagent/main.py
@@ -18,6 +18,9 @@
 import os
 
 import yaml
+import SocketServer
+
+from probe import Probe
 from twisted.internet import reactor
 from twisted.internet.defer import inlineCallbacks
 
@@ -30,6 +33,7 @@
     config=os.environ.get('CONFIG', './ofagent.yml'),
     consul=os.environ.get('CONSUL', 'localhost:8500'),
     controller=os.environ.get('CONTROLLER', 'localhost:6653'),
+    probe=os.environ.get('PROBE', ':8080'),
     external_host_address=os.environ.get('EXTERNAL_HOST_ADDRESS',
                                          get_my_primary_local_ipv4()),
     grpc_endpoint=os.environ.get('GRPC_ENDPOINT', 'localhost:50055'),
@@ -71,6 +75,12 @@
         default=defs['controller'],
         help=_help)
 
+    _help = '<hostname>:<port> for liveness and readiness probes (default: %s)' % defs['probe']
+    parser.add_argument(
+        '-P', '--probe', dest='probe', action='store',
+        default=defs['probe'],
+        help=_help)
+
     _help = ('<hostname> or <ip> at which ofagent is reachable from outside '
              'the cluster (default: %s)' % defs['external_host_address'])
     parser.add_argument('-E', '--external-host-address',
@@ -282,8 +292,15 @@
 
         reactor.addSystemEventTrigger('before', 'shutdown',
                                       self.shutdown_components)
+        reactor.callInThread(self.start_probe)
         reactor.run()
 
+    def start_probe(self):
+        args = self.args
+        host = args.probe.split(':')[0]
+        port = args.probe.split(':')[1]
+        server = SocketServer.TCPServer((host, int(port)), Probe)
+        server.serve_forever()
 
 if __name__ == '__main__':
     Main().start()
diff --git a/python/ofagent/probe.py b/python/ofagent/probe.py
new file mode 100644
index 0000000..612d661
--- /dev/null
+++ b/python/ofagent/probe.py
@@ -0,0 +1,47 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from SimpleHTTPServer import SimpleHTTPRequestHandler
+from structlog import get_logger
+from connection_mgr import ConnectionManager
+log = get_logger()
+
+class Probe(SimpleHTTPRequestHandler):
+
+    def do_GET(self):
+
+        if self.path == '/healthz':
+            self.health_probe()
+
+        elif self.path == '/readz':
+            self.ready_probe()
+
+    def health_probe(self):
+
+        if ConnectionManager.liveness_probe():
+            self.send_response(200)
+            self.end_headers()
+        else :
+            self.send_response(418)
+            self.end_headers()
+
+    def ready_probe(self):
+
+        if ConnectionManager.readiness_probe():
+            self.send_response(200)
+            self.end_headers()
+        else :
+            self.send_response(418)
+            self.end_headers()