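VOL-2101 Kubernetes liveness and readiness probe changes
Split the kafka and kv-store readiness checks into separate goroutines driven by the configured probe intervals; help text modified.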
Change-Id: Ia824436d6c9c06710930dfbca1528c6366e1ef64
diff --git a/main.go b/main.go
index 2ffa534..b9bff5f 100644
--- a/main.go
+++ b/main.go
@@ -1,17 +1,17 @@
 /*
- * Copyright 2018-present Open Networking Foundation
+* Copyright 2018-present Open Networking Foundation
 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
 
- * http://www.apache.org/licenses/LICENSE-2.0
+* http://www.apache.org/licenses/LICENSE-2.0
 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
  */
 
 //Package main invokes the application
@@ -61,13 +61,6 @@
 	_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
 }
 
-const (
-	// DefaultLivelinessCheck to check the liveliness
-	DefaultLivelinessCheck = 30 * time.Second
-	// DefaultTimeout to close the connection
-	DefaultTimeout = 10
-)
-
 func newAdapter(cf *config.AdapterFlags) *adapter {
 	var a adapter
 	a.instanceID = cf.InstanceID
@@ -146,35 +139,95 @@
 		log.Fatal("error-registering-with-core")
 	}
 
-	// go routine to check the readiness and liveliness and update the probe status
-	go a.checKServicesReadiness(ctx)
+	// check the readiness and liveliness and update the probe status
+	a.checkServicesReadiness(ctx)
 }
 
 /**
-This function checks the liveliness and readiness of the kakfa and kv-client services
-and update the status in the probe.
+This function starts the background readiness checks of the kafka and kv-store
+services; each check updates its own status in the probe until ctx is done.
 */
-func (a *adapter) checKServicesReadiness(ctx context.Context) {
+func (a *adapter) checkServicesReadiness(ctx context.Context) {
+	// checks the kafka readiness
+	go a.checkKafkaReadiness(ctx)
+
+	// checks the kv-store readiness
+	go a.checkKvStoreReadiness(ctx)
+}
+
+/**
+This function periodically checks the connectivity of the kv-store service
+and updates the "kv-store" status in the probe. It returns when ctx is done.
+*/
+func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
+	// Report not ready until the first successful connectivity check.
+	probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
+	timeout := a.config.NotLiveProbeInterval
 	for {
-		// checks the connectivity of the kv-client
-		if a.kvClient.IsConnectionUp(DefaultTimeout) {
-			probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
-			log.Debugf("kv- store is reachable")
-		} else {
-			probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
-			log.Debugf("kv-store is  not reachable")
+		timeoutTimer := time.NewTimer(timeout)
+		select {
+		case <-ctx.Done():
+			// Stop the pending timer and end this goroutine with the context.
+			timeoutTimer.Stop()
+			return
+		case <-timeoutTimer.C:
 		}
-		//checks the connectivity of the kafka
-		a.kafkaClient.SendLiveness()
-		isKafkaReady := <-a.kafkaClient.EnableLivenessChannel(true)
-		if isKafkaReady {
-			probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
-			log.Debugf("kafka is reachable")
-		} else {
-			probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
-			log.Debugf("kafka is  not reachable")
+
+		// Check the status of the kv-store
+		log.Info("kv-store liveliness-recheck")
+		if a.kvClient.IsConnectionUp(a.config.KVStoreTimeout) {
+			// kv-store is reachable, updating the status to running state.
+			// While healthy, recheck at half of LiveProbeInterval so the
+			// status is refreshed twice per live probe interval.
+			probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
+			timeout = a.config.LiveProbeInterval / 2
+		} else {
+			// kv-store not reachable or down, updating the status to not ready state
+			probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
+			timeout = a.config.NotLiveProbeInterval
+		}
+	}
+}
+
+/**
+This function checks the liveness and readiness of the kafka service
+and updates the "message-bus" status in the probe. It returns when ctx is done.
+*/
+func (a *adapter) checkKafkaReadiness(ctx context.Context) {
+	livelinessChannel := a.kafkaClient.EnableLivenessChannel(true)
+	timeout := a.config.LiveProbeInterval
+	for {
+		timeoutTimer := time.NewTimer(timeout)
+
+		select {
+		case <-ctx.Done():
+			// Stop the pending timer and end this goroutine with the context.
+			timeoutTimer.Stop()
+			return
+		case liveliness := <-livelinessChannel:
+			if !liveliness {
+				// kafka not reachable or down, updating the status to not ready state
+				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
+				timeout = a.config.NotLiveProbeInterval
+			} else {
+				// kafka is reachable, updating the status to running state
+				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
+				timeout = a.config.LiveProbeInterval
+			}
+			// A new timer is created on every iteration, so this one only
+			// needs to be stopped, not drained, before it is discarded.
+			timeoutTimer.Stop()
+		case <-timeoutTimer.C:
+			log.Info("kafka-proxy-liveness-recheck")
+			// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
+			// the liveness probe may wait (and block) writing to our channel.
+			go func() {
+				if err := a.kafkaClient.SendLiveness(); err != nil {
+					// Catch possible error case if sending liveness after Sarama has been stopped.
+					log.Warnw("error-kafka-send-liveness", log.Fields{"error": err})
+				}
+			}()
 		}
-		time.Sleep(DefaultLivelinessCheck)
 	}
 }