VOL-2098 Monitor Kafka service readiness
Change-Id: Ifb9658c8ea4f03374fe2921846149b1e55237327
diff --git a/rw_core/main.go b/rw_core/main.go
index 847f386..faddd57 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -65,7 +65,7 @@
return nil, errors.New("unsupported-kv-store")
}
-func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {
+func newKafkaClient(clientType string, host string, port int, instanceID string, livenessChannelInterval time.Duration) (kafka.Client, error) {
log.Infow("kafka-client-type", log.Fields{"client": clientType})
switch clientType {
@@ -82,7 +82,9 @@
kafka.ConsumerGroupPrefix(instanceID),
kafka.AutoCreateTopic(true),
kafka.ProducerFlushFrequency(5),
- kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
+ kafka.ProducerRetryBackoff(time.Millisecond*30),
+ kafka.LivenessChannelInterval(livenessChannelInterval),
+ ), nil
}
return nil, errors.New("unsupported-client-type")
}
@@ -137,7 +139,11 @@
}
// Setup Kafka Client
- if rw.kafkaClient, err = newKafkaClient("sarama", rw.config.KafkaAdapterHost, rw.config.KafkaAdapterPort, instanceId); err != nil {
+ if rw.kafkaClient, err = newKafkaClient("sarama",
+ rw.config.KafkaAdapterHost,
+ rw.config.KafkaAdapterPort,
+ instanceId,
+ time.Duration(rw.config.LiveProbeInterval)*time.Second/2); err != nil {
log.Fatal("Unsupported-kafka-client")
}