VOL-2101 Add readiness and liveness checks for the message bus and etcd
updated vendor folder
Change-Id: I0c1b983bf47e2740e0a24ea0c3a05a803caff15c
updated testcases
diff --git a/main.go b/main.go
index 921c3df..2ffa534 100644
--- a/main.go
+++ b/main.go
@@ -61,6 +61,13 @@
_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
}
+const (
+ // DefaultLivelinessCheck is the pause between two successive rounds of readiness/liveness probing.
+ DefaultLivelinessCheck = 30 * time.Second
+ // DefaultTimeout is the value handed to kvClient.IsConnectionUp; untyped, presumably seconds — TODO confirm.
+ DefaultTimeout = 10
+)
+
func newAdapter(cf *config.AdapterFlags) *adapter {
var a adapter
a.instanceID = cf.InstanceID
@@ -138,6 +145,37 @@
if err = a.registerWithCore(ctx, -1); err != nil {
log.Fatal("error-registering-with-core")
}
+
+ // go routine to check the readiness and liveliness and update the probe status
+ go a.checKServicesReadiness(ctx)
+}
+
+// checKServicesReadiness probes the kv-store and kafka once per round and records
+// each result in the probe carried by ctx. Rounds are DefaultLivelinessCheck apart.
+// It is started as a goroutine and returns once ctx is cancelled between rounds.
+func (a *adapter) checKServicesReadiness(ctx context.Context) {
+	for {
+		if a.kvClient.IsConnectionUp(DefaultTimeout) { // kv-store connectivity
+			probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
+			log.Debugf("kv- store is reachable")
+		} else {
+			probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
+			log.Debugf("kv-store is not reachable")
+		}
+		a.kafkaClient.SendLiveness() // presumably triggers the report read from the liveness channel below — confirm
+		if isKafkaReady := <-a.kafkaClient.EnableLivenessChannel(true); isKafkaReady { // NOTE: this receive still ignores ctx
+			probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
+			log.Debugf("kafka is reachable")
+		} else {
+			probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
+			log.Debugf("kafka is not reachable")
+		}
+		select { // wait for the next round, but do not outlive the adapter context
+		case <-ctx.Done():
+			return
+		case <-time.After(DefaultLivelinessCheck):
+		}
+	}
}
func (a *adapter) stop() {