VOL-2101 Kubernetes liveness probe changes
Help text modified
Change-Id: Ia824436d6c9c06710930dfbca1528c6366e1ef64
diff --git a/main.go b/main.go
index 2ffa534..b9bff5f 100644
--- a/main.go
+++ b/main.go
@@ -1,17 +1,17 @@
/*
- * Copyright 2018-present Open Networking Foundation
+* Copyright 2018-present Open Networking Foundation
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
+* http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
*/
//Package main invokes the application
@@ -61,13 +61,6 @@
_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
}
-const (
- // DefaultLivelinessCheck to check the liveliness
- DefaultLivelinessCheck = 30 * time.Second
- // DefaultTimeout to close the connection
- DefaultTimeout = 10
-)
-
func newAdapter(cf *config.AdapterFlags) *adapter {
var a adapter
a.instanceID = cf.InstanceID
@@ -146,35 +139,97 @@
log.Fatal("error-registering-with-core")
}
- // go routine to check the readiness and liveliness and update the probe status
- go a.checKServicesReadiness(ctx)
+ // check the readiness and liveliness and update the probe status
+ a.checkServicesReadiness(ctx)
}
-/**
-This function checks the liveliness and readiness of the kakfa and kv-client services
-and update the status in the probe.
-*/
-func (a *adapter) checKServicesReadiness(ctx context.Context) {
+// checkServicesReadiness starts one background goroutine per service (kafka
+// and kv-store) that checks its liveness and readiness and updates its status
+// in the probe. It returns immediately and does not wait for the goroutines.
+// Each goroutine receives ctx and runs its own monitoring loop.
+func (a *adapter) checkServicesReadiness(ctx context.Context) {
+ // checks the kafka readiness
+ go a.checkKafkaReadiness(ctx)
+
+ // checks the kv-store readiness
+ go a.checkKvStoreReadiness(ctx)
+}
+
+// checkKvStoreReadiness periodically checks kv-store connectivity and updates
+// the "kv-store" probe status until ctx is cancelled.
+func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
+ // while reachable, re-check at half the live probe interval
+ timeout := a.config.LiveProbeInterval / 2
+ kvStoreChannel := make(chan bool, 1)
+
+ // Seed with false: the first iteration reports not-ready and schedules a check after NotLiveProbeInterval.
+ kvStoreChannel <- false
for {
- // checks the connectivity of the kv-client
- if a.kvClient.IsConnectionUp(DefaultTimeout) {
- probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
- log.Debugf("kv- store is reachable")
- } else {
- probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
- log.Debugf("kv-store is not reachable")
+ timeoutTimer := time.NewTimer(timeout)
+ select {
+ case liveliness := <-kvStoreChannel:
+ if !liveliness {
+ // kv-store not reachable or down, updating the status to not ready state
+ probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
+ timeout = a.config.NotLiveProbeInterval
+ } else {
+ // kv-store is reachable, updating the status to running state
+ probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
+ timeout = a.config.LiveProbeInterval / 2
+ }
+ // Stop the timer, draining its channel if it already fired.
+ if !timeoutTimer.Stop() {
+ <-timeoutTimer.C
+ }
+ case <-timeoutTimer.C:
+ // Check the status of the kv-store
+ log.Info("kv-store liveliness-recheck")
+ if a.kvClient.IsConnectionUp(a.config.KVStoreTimeout) {
+ kvStoreChannel <- true
+ } else {
+ kvStoreChannel <- false
+ }
+ case <-ctx.Done():
+ return
}
- //checks the connectivity of the kafka
- a.kafkaClient.SendLiveness()
- isKafkaReady := <-a.kafkaClient.EnableLivenessChannel(true)
- if isKafkaReady {
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
- log.Debugf("kafka is reachable")
- } else {
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
- log.Debugf("kafka is not reachable")
+ }
+}
+
+// checkKafkaReadiness updates the "message-bus" probe status from the kafka
+// liveness channel, re-probing kafka on timeout, until ctx is cancelled.
+func (a *adapter) checkKafkaReadiness(ctx context.Context) {
+ livelinessChannel := a.kafkaClient.EnableLivenessChannel(true)
+ timeout := a.config.LiveProbeInterval
+ for {
+ timeoutTimer := time.NewTimer(timeout)
+ select {
+ case liveliness := <-livelinessChannel:
+ if !liveliness {
+ // kafka not reachable or down, updating the status to not ready state
+ probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
+ timeout = a.config.NotLiveProbeInterval
+ } else {
+ // kafka is reachable, updating the status to running state
+ probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
+ timeout = a.config.LiveProbeInterval
+ }
+ // Stop the timer, draining its channel if it already fired.
+ if !timeoutTimer.Stop() {
+ <-timeoutTimer.C
+ }
+ case <-timeoutTimer.C:
+ log.Info("kafka-proxy-liveness-recheck")
+ // send the liveness probe in a goroutine; we don't want to deadlock ourselves as
+ // the liveness probe may wait (and block) writing to our channel.
+ go func() {
+ if err := a.kafkaClient.SendLiveness(); err != nil {
+ // Catch possible error case if sending liveness after Sarama has been stopped.
+ log.Warnw("error-kafka-send-liveness", log.Fields{"error": err})
+ }
+ }()
+ case <-ctx.Done():
+ return
}
- time.Sleep(DefaultLivelinessCheck)
}
}