VOL-2631 Clean up Kafka Interadapter Proxy on shutdown;
Exit gracefully rather than Fatal() on kafka shutdown
Change-Id: Ic4506a8f4e5127d10a0db130a81584c0bf8c8428
diff --git a/main.go b/main.go
index 536e917..05479c9 100644
--- a/main.go
+++ b/main.go
@@ -202,17 +202,28 @@
livelinessChannel := a.kafkaClient.EnableLivenessChannel(true)
healthinessChannel := a.kafkaClient.EnableHealthinessChannel(true)
timeout := a.config.LiveProbeInterval
+ failed := false
for {
timeoutTimer := time.NewTimer(timeout)
select {
case healthiness := <-healthinessChannel:
if !healthiness {
- // log.Fatal will call os.Exit(1) to terminate
- log.Fatal("Kafka service has become unhealthy")
+ // This will eventually cause K8s to restart the container, and will do
+ // so in a way that allows cleanup to continue, rather than an immediate
+ // panic and exit here.
+ probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusFailed)
+ failed = true
+ }
+ // Check if the timer has expired or not
+ if !timeoutTimer.Stop() {
+ <-timeoutTimer.C
}
case liveliness := <-livelinessChannel:
- if !liveliness {
+ if failed {
+ // Failures of the message bus are permanent and can't ever be recovered from,
+ // so make sure we never inadvertently reset a failed state back to unready.
+ } else if !liveliness {
// kafka not reachable or down, updating the status to not ready state
probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
timeout = a.config.NotLiveProbeInterval
@@ -255,6 +266,10 @@
a.kvClient.Close()
}
+ if a.kip != nil {
+ a.kip.Stop()
+ }
+
// TODO: More cleanup
}