VOL-2098 Monitor Kafka service readiness

Change-Id: Ifb9658c8ea4f03374fe2921846149b1e55237327
diff --git a/rw_core/main.go b/rw_core/main.go
index 847f386..faddd57 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -65,7 +65,7 @@
 	return nil, errors.New("unsupported-kv-store")
 }
 
-func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {
+func newKafkaClient(clientType string, host string, port int, instanceID string, livenessChannelInterval time.Duration) (kafka.Client, error) {
 
 	log.Infow("kafka-client-type", log.Fields{"client": clientType})
 	switch clientType {
@@ -82,7 +82,9 @@
 			kafka.ConsumerGroupPrefix(instanceID),
 			kafka.AutoCreateTopic(true),
 			kafka.ProducerFlushFrequency(5),
-			kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
+			kafka.ProducerRetryBackoff(time.Millisecond*30),
+			kafka.LivenessChannelInterval(livenessChannelInterval),
+		), nil
 	}
 	return nil, errors.New("unsupported-client-type")
 }
@@ -137,7 +139,11 @@
 	}
 
 	// Setup Kafka Client
-	if rw.kafkaClient, err = newKafkaClient("sarama", rw.config.KafkaAdapterHost, rw.config.KafkaAdapterPort, instanceId); err != nil {
+	if rw.kafkaClient, err = newKafkaClient("sarama",
+		rw.config.KafkaAdapterHost,
+		rw.config.KafkaAdapterPort,
+		instanceId,
+		time.Duration(rw.config.LiveProbeInterval)*time.Second/2); err != nil {
 		log.Fatal("Unsupported-kafka-client")
 	}