VOL-2101 readiness and liveliness check added to message-bus and etcd
updated vendor folder
Change-Id: I0c1b983bf47e2740e0a24ea0c3a05a803caff15c
updated testcases
diff --git a/main.go b/main.go
index 921c3df..2ffa534 100644
--- a/main.go
+++ b/main.go
@@ -61,6 +61,13 @@
 	_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
 }
 
+const (
+	// DefaultLivelinessCheck to check the liveliness
+	DefaultLivelinessCheck = 30 * time.Second
+	// DefaultTimeout to close the connection
+	DefaultTimeout = 10
+)
+
 func newAdapter(cf *config.AdapterFlags) *adapter {
 	var a adapter
 	a.instanceID = cf.InstanceID
@@ -138,6 +145,37 @@
 	if err = a.registerWithCore(ctx, -1); err != nil {
 		log.Fatal("error-registering-with-core")
 	}
+
+	// go routine to check the readiness and liveliness and update the probe status
+	go a.checKServicesReadiness(ctx)
+}
+
+/**
+This function checks the liveliness and readiness of the kakfa and kv-client services
+and update the status in the probe.
+*/
+func (a *adapter) checKServicesReadiness(ctx context.Context) {
+	for {
+		// checks the connectivity of the kv-client
+		if a.kvClient.IsConnectionUp(DefaultTimeout) {
+			probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
+			log.Debugf("kv- store is reachable")
+		} else {
+			probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
+			log.Debugf("kv-store is  not reachable")
+		}
+		//checks the connectivity of the kafka
+		a.kafkaClient.SendLiveness()
+		isKafkaReady := <-a.kafkaClient.EnableLivenessChannel(true)
+		if isKafkaReady {
+			probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
+			log.Debugf("kafka is reachable")
+		} else {
+			probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
+			log.Debugf("kafka is  not reachable")
+		}
+		time.Sleep(DefaultLivelinessCheck)
+	}
 }
 
 func (a *adapter) stop() {