VOL-2204 Report health status of Kafka service
Change-Id: I993b89f500bcbaa26da77f193575f5f20c3fb7c9
diff --git a/VERSION b/VERSION
index 255642e..2f09892 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.2.22
+2.2.23
diff --git a/pkg/kafka/client.go b/pkg/kafka/client.go
index 488bf9f..bda7ed9 100755
--- a/pkg/kafka/client.go
+++ b/pkg/kafka/client.go
@@ -69,4 +69,5 @@
Send(msg interface{}, topic *Topic, keys ...string) error
SendLiveness() error
EnableLivenessChannel(enable bool) chan bool
+ EnableHealthinessChannel(enable bool) chan bool
}
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index 3326191..4e04b30 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -768,6 +768,10 @@
return kp.kafkaClient.EnableLivenessChannel(enable)
}
+func (kp *InterContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
+ return kp.kafkaClient.EnableHealthinessChannel(enable)
+}
+
func (kp *InterContainerProxy) SendLiveness() error {
return kp.kafkaClient.SendLiveness()
}
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index ff521a7..ca88dfd 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -81,6 +81,8 @@
livenessChannelInterval time.Duration
lastLivenessTime time.Time
started bool
+ healthy bool
+ healthiness chan bool
}
type SaramaClientOption func(*SaramaClient)
@@ -231,8 +233,9 @@
client.lockOfTopicLockMap = sync.RWMutex{}
client.lockOfGroupConsumers = sync.RWMutex{}
- // alive until proven otherwise
+ // healthy and alive until proven otherwise
client.alive = true
+ client.healthy = true
return client
}
@@ -484,6 +487,15 @@
}
}
+// Once unhealthy, we never go back
+func (sc *SaramaClient) setUnhealthy() {
+ sc.healthy = false
+ if sc.healthiness != nil {
+ log.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
+ sc.healthiness <- sc.healthy
+ }
+}
+
func (sc *SaramaClient) isLivenessError(err error) bool {
// Sarama producers and consumers encapsulate the error inside
// a ProducerError or ConsumerError struct.
@@ -600,6 +612,30 @@
return sc.liveness
}
+// Enable the Healthiness monitor channel. This channel will report "false"
+// if the kafka consumers die, or some other problem occurs which is
+// catastrophic that would require re-creating the client.
+func (sc *SaramaClient) EnableHealthinessChannel(enable bool) chan bool {
+ log.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
+ if enable {
+ if sc.healthiness == nil {
+ log.Info("kafka-create-healthiness-channel")
+ // At least 1, so we can immediately post to it without blocking
+ // Setting a bigger number (10) allows the monitor to fall behind
+ // without blocking others. The monitor shouldn't really fall
+ // behind...
+ sc.healthiness = make(chan bool, 10)
+ // post intial state to the channel
+ sc.healthiness <- sc.healthy
+ }
+ } else {
+ // TODO: Think about whether we need the ability to turn off
+ // liveness monitoring
+ panic("Turning off healthiness reporting is not supported")
+ }
+ return sc.healthiness
+}
+
// send an empty message on the liveness channel to check whether connectivity has
// been restored.
func (sc *SaramaClient) SendLiveness() error {
@@ -936,6 +972,7 @@
}
}
log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
+ sc.setUnhealthy()
}
func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
@@ -979,6 +1016,7 @@
}
}
log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
+ sc.setUnhealthy()
}
func (sc *SaramaClient) startConsumers(topic *Topic) error {
diff --git a/pkg/mocks/kafka_client.go b/pkg/mocks/kafka_client.go
index 79cdfbc..87dd9e8 100644
--- a/pkg/mocks/kafka_client.go
+++ b/pkg/mocks/kafka_client.go
@@ -131,3 +131,7 @@
func (kc *KafkaClient) EnableLivenessChannel(enable bool) chan bool {
return nil
}
+
+func (kc *KafkaClient) EnableHealthinessChannel(enable bool) chan bool {
+ return nil
+}