VOL-2204 Throw a fatal error after a consumer stops

Change-Id: I6e87a88ece2f7dc496d263ff0fc501bd8685cc72
diff --git a/main.go b/main.go
index b9bff5f..0e2e186 100644
--- a/main.go
+++ b/main.go
@@ -201,11 +201,17 @@
 */
 func (a *adapter) checkKafkaReadiness(ctx context.Context) {
 	livelinessChannel := a.kafkaClient.EnableLivenessChannel(true)
+	healthinessChannel := a.kafkaClient.EnableHealthinessChannel(true)
 	timeout := a.config.LiveProbeInterval
 	for {
 		timeoutTimer := time.NewTimer(timeout)
 
 		select {
+		case healthiness := <-healthinessChannel:
+			if !healthiness {
+				// log.Fatal will call os.Exit(1) to terminate
+				log.Fatal("Kafka service has become unhealthy")
+			}
 		case liveliness := <-livelinessChannel:
 			if !liveliness {
 				// kafka not reachable or down, updating the status to not ready state