VOL-2631 Cleanup Kafka Interadapter Proxy on shutdown;
Exit gracefully rather than Fatal() on Kafka shutdown

Change-Id: Ic4506a8f4e5127d10a0db130a81584c0bf8c8428
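
For context on the commit message above: log.Fatal() ends the process via
os.Exit(1), which skips deferred cleanup such as closing the KV client and
stopping the interadapter proxy. Below is a minimal, self-contained sketch of
that difference, using hypothetical helper names rather than the adapter's
real ones:

    package main

    import (
        "log"
        "time"
    )

    // stop stands in for the adapter's cleanup path (kvClient.Close, kip.Stop, ...).
    func stop() {
        log.Println("running cleanup")
    }

    // waitForVerdict stands in for the liveness/healthiness monitoring loop.
    func waitForVerdict() bool {
        time.Sleep(10 * time.Millisecond)
        return false // pretend Kafka was reported unhealthy
    }

    func main() {
        defer stop()

        if healthy := waitForVerdict(); !healthy {
            // log.Fatal("Kafka service has become unhealthy") would call
            // os.Exit(1) here, and the deferred stop() above would never run.
            // Recording the failure (e.g. via a probe status) and returning
            // lets the normal shutdown path perform its cleanup.
            log.Println("Kafka unhealthy; shutting down gracefully")
            return
        }
    }
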
diff --git a/main.go b/main.go
index 536e917..05479c9 100644
--- a/main.go
+++ b/main.go
@@ -202,17 +202,28 @@
 	livelinessChannel := a.kafkaClient.EnableLivenessChannel(true)
 	healthinessChannel := a.kafkaClient.EnableHealthinessChannel(true)
 	timeout := a.config.LiveProbeInterval
+	failed := false
 	for {
 		timeoutTimer := time.NewTimer(timeout)
 
 		select {
 		case healthiness := <-healthinessChannel:
 			if !healthiness {
-				// log.Fatal will call os.Exit(1) to terminate
-				log.Fatal("Kafka service has become unhealthy")
+				// This will eventually cause K8s to restart the container, and will do
+				// so in a way that allows cleanup to continue, rather than an immediate
+				// panic and exit here.
+				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusFailed)
+				failed = true
+			}
+			// Check if the timer has expired or not
+			if !timeoutTimer.Stop() {
+				<-timeoutTimer.C
 			}
 		case liveliness := <-livelinessChannel:
-			if !liveliness {
+			if failed {
+				// Failures of the message bus are permanent and can't ever be recovered from,
+				// so make sure we never inadvertently reset a failed state back to unready.
+			} else if !liveliness {
 				// kafka not reachable or down, updating the status to not ready state
 				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
 				timeout = a.config.NotLiveProbeInterval
@@ -255,6 +266,10 @@
 		a.kvClient.Close()
 	}
 
+	if a.kip != nil {
+		a.kip.Stop()
+	}
+
 	// TODO:  More cleanup
 }
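
A note on the timer handling added in the first hunk: the
!timeoutTimer.Stop() / <-timeoutTimer.C sequence is the standard Go idiom for
discarding a time.Timer that may already have fired; without draining the
channel, a stale tick could be consumed by a later select and look like a
spurious timeout. A standalone sketch of the idiom (illustrative names only):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        events := make(chan string, 1)
        events <- "healthiness update"

        for i := 0; i < 2; i++ {
            timer := time.NewTimer(50 * time.Millisecond)

            select {
            case ev := <-events:
                fmt.Println("got:", ev)
                // Stop the timer; if it already fired, its tick is sitting in
                // timer.C, so drain it to keep the next select from seeing a
                // stale timeout.
                if !timer.Stop() {
                    <-timer.C
                }
            case <-timer.C:
                fmt.Println("timeout")
            }
        }
    }
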