VOL-2098 Monitor Kafka service readiness
Change-Id: Ifb9658c8ea4f03374fe2921846149b1e55237327
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 87f26c3..bb1ace1 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -54,6 +54,8 @@
default_CorePairTopic = "rwcore_1"
default_MaxConnectionRetries = -1 // retries forever
default_ConnectionRetryInterval = 2 // in seconds
+ default_LiveProbeInterval = 60 // in seconds
+ default_NotLiveProbeInterval = 60 // in seconds
default_ProbeHost = ""
default_ProbePort = 8080
)
@@ -90,6 +92,8 @@
CorePairTopic string
MaxConnectionRetries int
ConnectionRetryInterval int
+ LiveProbeInterval int
+ NotLiveProbeInterval int
ProbeHost string
ProbePort int
}
@@ -130,6 +134,8 @@
CorePairTopic: default_CorePairTopic,
MaxConnectionRetries: default_MaxConnectionRetries,
ConnectionRetryInterval: default_ConnectionRetryInterval,
+ LiveProbeInterval: default_LiveProbeInterval,
+ NotLiveProbeInterval: default_NotLiveProbeInterval,
ProbeHost: default_ProbeHost,
ProbePort: default_ProbePort,
}
@@ -216,9 +222,15 @@
help = fmt.Sprintf("The number of retries to connect to a dependent component")
flag.IntVar(&(cf.MaxConnectionRetries), "max_connection_retries", default_MaxConnectionRetries, help)
- help = fmt.Sprintf("The number of seconds between each connection retry attempt ")
+ help = fmt.Sprintf("The number of seconds between each connection retry attempt")
flag.IntVar(&(cf.ConnectionRetryInterval), "connection_retry_interval", default_ConnectionRetryInterval, help)
+ help = fmt.Sprintf("The number of seconds between liveness probes while in a live state")
+ flag.IntVar(&(cf.LiveProbeInterval), "live_probe_interval", default_LiveProbeInterval, help)
+
+ help = fmt.Sprintf("The number of seconds between liveness probes while in a not live state")
+ flag.IntVar(&(cf.NotLiveProbeInterval), "not_live_probe_interval", default_NotLiveProbeInterval, help)
+
help = fmt.Sprintf("The host on which to listen to answer liveness and readiness probe queries over HTTP.")
flag.StringVar(&(cf.ProbeHost), "probe_host", default_ProbeHost, help)
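Usage note (hypothetical values, not part of this change): the two intervals are independent flags, so a deployment could keep the default 60 second recheck while the message bus is live but probe far more aggressively once it goes down, e.g. by starting the core with -live_probe_interval=60 -not_live_probe_interval=5.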
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index e340a29..b74de4a 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -106,11 +106,10 @@
p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
}
- if err := core.waitUntilKafkaMessagingProxyIsUpOrMaxTries(ctx, core.config.MaxConnectionRetries, core.config.ConnectionRetryInterval); err != nil {
- log.Fatal("Failure-starting-kafkaMessagingProxy")
- }
- if p != nil {
- p.UpdateStatus("message-bus", probe.ServiceStatusRunning)
+ // core.kmp must be created before deviceMgr and adapterMgr, as they will make
+ // private copies of the pointer to core.kmp.
+ if err := core.initKafkaManager(ctx); err != nil {
+ log.Fatal("Failed-to-init-kafka-manager")
}
log.Debugw("values", log.Fields{"kmp": core.kmp})
@@ -119,9 +118,14 @@
core.deviceMgr.adapterMgr = core.adapterMgr
core.logicalDeviceMgr = newLogicalDeviceManager(core, core.deviceMgr, core.kmp, core.clusterDataProxy, core.config.DefaultCoreTimeout)
- if err := core.registerAdapterRequestHandlers(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
- log.Fatal("Failure-registering-adapterRequestHandler")
- }
+ // Start the KafkaManager. This must be done after the deviceMgr, adapterMgr, and
+ // logicalDeviceMgr have been created, because once the kmp is started it registers
+ // request handlers that reference those managers.
+
+ go core.startKafkaManager(ctx,
+ core.config.ConnectionRetryInterval,
+ core.config.LiveProbeInterval,
+ core.config.NotLiveProbeInterval)
go core.startDeviceManager(ctx)
go core.startLogicalDeviceManager(ctx)
@@ -159,7 +163,7 @@
//startGRPCService creates the grpc service handlers, registers it to the grpc server and starts the server
func (core *Core) startGRPCService(ctx context.Context) {
// create an insecure gserver server
- core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false)
+ core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false, probe.GetProbeFromContext(ctx))
log.Info("grpc-server-created")
core.grpcNBIAPIHandler = NewAPIHandler(core)
@@ -193,9 +197,14 @@
probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusStopped)
}
-func (core *Core) waitUntilKafkaMessagingProxyIsUpOrMaxTries(ctx context.Context, maxRetries int, retryInterval int) error {
- log.Infow("starting-kafka-messaging-proxy", log.Fields{"host": core.config.KafkaAdapterHost,
+// initKafkaManager creates the Kafka InterContainerProxy; it is started later by startKafkaManager
+func (core *Core) initKafkaManager(ctx context.Context) error {
+ log.Infow("initialize-kafka-manager", log.Fields{"host": core.config.KafkaAdapterHost,
"port": core.config.KafkaAdapterPort, "topic": core.config.CoreTopic})
+
+ probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPreparing)
+
+ // create the proxy
var err error
if core.kmp, err = kafka.NewInterContainerProxy(
kafka.InterContainerHost(core.config.KafkaAdapterHost),
@@ -206,25 +215,121 @@
log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
return err
}
- count := 0
- for {
- if err = core.kmp.Start(); err != nil {
+
+ probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPrepared)
+
+ return nil
+}
+
+/*
+ * KafkaMonitorThread
+ *
+ * Responsible for starting the Kafka InterContainerProxy and monitoring its liveness
+ * state.
+ *
+ * Any producer that fails to send will cause KafkaInterContainerProxy to
+ * post a false event on its liveness channel. Any producer that succeeds in sending
+ * will cause KafkaInterContainerProxy to post a true event on its liveness
+ * channel. Group receivers also update liveness state, and a receiver will typically
+ * indicate a loss of liveness within 3-5 seconds of Kafka going down. Receivers
+ * only indicate restoration of liveness if a message is received. During normal
+ * operation, messages will be routinely produced and received, automatically
+ * indicating liveness state. These routine liveness indications are rate-limited
+ * inside sarama_client.
+ *
+ * This thread monitors the status of KafkaInterContainerProxy's liveness and pushes
+ * that state to the core's readiness probes. If no liveness event has been seen
+ * within a timeout, then the thread will make an attempt to produce a "liveness"
+ * message, which will in turn trigger a liveness event on the liveness channel, true
+ * or false depending on whether the attempt succeeded.
+ *
+ * The gRPC server in turn monitors the state of the readiness probe and will
+ * start issuing UNAVAILABLE responses while the probe is not ready.
+ *
+ * startupRetryInterval -- interval between attempts to start
+ * liveProbeInterval -- interval between liveness checks when in a live state
+ * notLiveProbeInterval -- interval between liveness checks when in a notLive state
+ *
+ * liveProbeInterval and notLiveProbeInterval can be configured separately,
+ * though the current default is that both are set to 60 seconds.
+ */
+
+func (core *Core) startKafkaManager(ctx context.Context, startupRetryInterval int, liveProbeInterval int, notLiveProbeInterval int) {
+ log.Infow("starting-kafka-manager-thread", log.Fields{"host": core.config.KafkaAdapterHost,
+ "port": core.config.KafkaAdapterPort, "topic": core.config.CoreTopic})
+
+ started := false
+ for !started {
+ // If we haven't started yet, then try to start
+ log.Infow("starting-kafka-proxy", log.Fields{})
+ if err := core.kmp.Start(); err != nil {
+ // We failed to start. Delay and then try again later.
+ // Don't worry about liveness, as we can't be live until we've started.
+ probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
log.Infow("error-starting-kafka-messaging-proxy", log.Fields{"error": err})
- if maxRetries != -1 {
- if count >= maxRetries {
- return err
- }
- }
- count += 1
- log.Infow("retry-starting-kafka-messaging-proxy", log.Fields{"retryCount": count, "maxRetries": maxRetries, "retryInterval": retryInterval})
- // Take a nap before retrying
- time.Sleep(time.Duration(retryInterval) * time.Second)
+ time.Sleep(time.Duration(startupRetryInterval) * time.Second)
} else {
- break
+ // We started. We only need to do this once.
+ // Next we'll fall through and start checking liveness.
+ log.Infow("started-kafka-proxy", log.Fields{})
+
+ // cannot do this until after the kmp is started
+ if err := core.registerAdapterRequestHandlers(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
+ log.Fatal("Failure-registering-adapterRequestHandler")
+ }
+
+ started = true
}
}
- log.Info("kafka-messaging-proxy-created")
- return nil
+
+ log.Info("started-kafka-message-proxy")
+
+ livenessChannel := core.kmp.EnableLivenessChannel(true)
+
+ log.Info("enabled-kafka-liveness-channel")
+
+ timeout := time.Duration(liveProbeInterval) * time.Second
+ for {
+ timeoutTimer := time.NewTimer(timeout)
+ select {
+ case liveness := <-livenessChannel:
+ log.Infow("kafka-manager-thread-liveness-event", log.Fields{"liveness": liveness})
+ // there was a state change in Kafka liveness
+ if !liveness {
+ probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
+
+ if core.grpcServer != nil {
+ log.Info("kafka-manager-thread-set-server-notready")
+ }
+
+ // retry frequently while life is bad
+ timeout = time.Duration(notLiveProbeInterval) * time.Second
+ } else {
+ probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
+
+ if core.grpcServer != nil {
+ log.Info("kafka-manager-thread-set-server-ready")
+ }
+
+ // retry infrequently while life is good
+ timeout = time.Duration(liveProbeInterval) * time.Second
+ }
+ if !timeoutTimer.Stop() {
+ <-timeoutTimer.C
+ }
+ case <-timeoutTimer.C:
+ log.Info("kafka-proxy-liveness-recheck")
+ // send the liveness probe in a goroutine; we don't want to deadlock ourselves as
+ // the liveness probe may wait (and block) writing to our channel.
+ go func() {
+ err := core.kmp.SendLiveness()
+ if err != nil {
+ // Catch the error case where liveness is sent after Sarama has already been stopped.
+ log.Warnw("error-kafka-send-liveness", log.Fields{"error": err})
+ }
+ }()
+ }
+ }
}
// waitUntilKVStoreReachableOrMaxTries will wait until it can connect to a KV store or until maxtries has been reached
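The per-iteration NewTimer followed by the Stop-and-drain step in startKafkaManager above is the usual Go pattern for combining a timeout with other channel receives in a select loop: if Stop returns false the timer already fired, and the value buffered in timer.C must be drained so it is not read as a spurious timeout later. A minimal standalone sketch of the pattern (names and durations are illustrative, not taken from the patch):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        events := make(chan bool)

        // Pretend something reports liveness once, shortly after startup.
        go func() {
            time.Sleep(50 * time.Millisecond)
            events <- true
        }()

        timeout := 100 * time.Millisecond
        for i := 0; i < 2; i++ {
            timer := time.NewTimer(timeout)
            select {
            case live := <-events:
                fmt.Println("liveness event:", live)
                // Stop returns false if the timer already fired; its value is then
                // still buffered in timer.C and is drained here so it cannot be
                // mistaken for a fresh timeout on a later iteration.
                if !timer.Stop() {
                    <-timer.C
                }
            case <-timer.C:
                fmt.Println("timeout, would re-probe liveness here")
            }
        }
    }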
diff --git a/rw_core/main.go b/rw_core/main.go
index 847f386..faddd57 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -65,7 +65,7 @@
return nil, errors.New("unsupported-kv-store")
}
-func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {
+func newKafkaClient(clientType string, host string, port int, instanceID string, livenessChannelInterval time.Duration) (kafka.Client, error) {
log.Infow("kafka-client-type", log.Fields{"client": clientType})
switch clientType {
@@ -82,7 +82,9 @@
kafka.ConsumerGroupPrefix(instanceID),
kafka.AutoCreateTopic(true),
kafka.ProducerFlushFrequency(5),
- kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
+ kafka.ProducerRetryBackoff(time.Millisecond*30),
+ kafka.LivenessChannelInterval(livenessChannelInterval),
+ ), nil
}
return nil, errors.New("unsupported-client-type")
}
@@ -137,7 +139,11 @@
}
// Setup Kafka Client
- if rw.kafkaClient, err = newKafkaClient("sarama", rw.config.KafkaAdapterHost, rw.config.KafkaAdapterPort, instanceId); err != nil {
+ if rw.kafkaClient, err = newKafkaClient("sarama",
+ rw.config.KafkaAdapterHost,
+ rw.config.KafkaAdapterPort,
+ instanceId,
+ time.Duration(rw.config.LiveProbeInterval)*time.Second/2); err != nil {
log.Fatal("Unsupported-kafka-client")
}
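One apparent design choice in the main.go hunk above: the Sarama client's liveness channel interval is derived as half of LiveProbeInterval, presumably so that the client's rate-limited routine liveness events normally arrive before the monitor thread's recheck timer expires and explicit SendLiveness probes remain the exception. A standalone illustration of the arithmetic (values hypothetical, default taken from config.go):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        liveProbeInterval := 60 // seconds, default_LiveProbeInterval in config.go
        livenessChannelInterval := time.Duration(liveProbeInterval) * time.Second / 2
        fmt.Println(livenessChannelInterval) // 30s: routine liveness cadence is half the recheck timeout
    }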