VOL-2204 Throw a fatal error after a consumer stops
Change-Id: I6e87a88ece2f7dc496d263ff0fc501bd8685cc72
diff --git a/main.go b/main.go
index b9bff5f..0e2e186 100644
--- a/main.go
+++ b/main.go
@@ -201,11 +201,17 @@
*/
func (a *adapter) checkKafkaReadiness(ctx context.Context) {
livelinessChannel := a.kafkaClient.EnableLivenessChannel(true)
+ healthinessChannel := a.kafkaClient.EnableHealthinessChannel(true)
timeout := a.config.LiveProbeInterval
for {
timeoutTimer := time.NewTimer(timeout)
select {
+ case healthiness := <-healthinessChannel:
+ if !healthiness {
+ // log.Fatal will call os.Exit(1) to terminate
+ log.Fatal("Kafka service has become unhealthy")
+ }
case liveliness := <-livelinessChannel:
if !liveliness {
// kafka not reachable or down, updating the status to not ready state