VOL-2098 Monitor Kafka service readiness

Change-Id: Ifb9658c8ea4f03374fe2921846149b1e55237327
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index e340a29..b74de4a 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -106,11 +106,10 @@
 		p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
 	}
 
-	if err := core.waitUntilKafkaMessagingProxyIsUpOrMaxTries(ctx, core.config.MaxConnectionRetries, core.config.ConnectionRetryInterval); err != nil {
-		log.Fatal("Failure-starting-kafkaMessagingProxy")
-	}
-	if p != nil {
-		p.UpdateStatus("message-bus", probe.ServiceStatusRunning)
+	// core.kmp must be created before deviceMgr and adapterMgr, as they will make
+	// private copies of the pointer to core.kmp.
+	if err := core.initKafkaManager(ctx); err != nil {
+		log.Fatal("Failed-to-init-kafka-manager")
 	}
 
 	log.Debugw("values", log.Fields{"kmp": core.kmp})
@@ -119,9 +118,14 @@
 	core.deviceMgr.adapterMgr = core.adapterMgr
 	core.logicalDeviceMgr = newLogicalDeviceManager(core, core.deviceMgr, core.kmp, core.clusterDataProxy, core.config.DefaultCoreTimeout)
 
-	if err := core.registerAdapterRequestHandlers(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
-		log.Fatal("Failure-registering-adapterRequestHandler")
-	}
+	// Start the KafkaManager. This must be done after the deviceMgr, adapterMgr, and
+	// logicalDeviceMgr have been created, as once the kmp is started, it will register
+	// the above with the kmp.
+
+	go core.startKafkaManager(ctx,
+		core.config.ConnectionRetryInterval,
+		core.config.LiveProbeInterval,
+		core.config.NotLiveProbeInterval)
 
 	go core.startDeviceManager(ctx)
 	go core.startLogicalDeviceManager(ctx)
@@ -159,7 +163,7 @@
 //startGRPCService creates the grpc service handlers, registers it to the grpc server and starts the server
 func (core *Core) startGRPCService(ctx context.Context) {
 	//	create an insecure gserver server
-	core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false)
+	core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false, probe.GetProbeFromContext(ctx))
 	log.Info("grpc-server-created")
 
 	core.grpcNBIAPIHandler = NewAPIHandler(core)
@@ -193,9 +197,14 @@
 	probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusStopped)
 }
 
-func (core *Core) waitUntilKafkaMessagingProxyIsUpOrMaxTries(ctx context.Context, maxRetries int, retryInterval int) error {
-	log.Infow("starting-kafka-messaging-proxy", log.Fields{"host": core.config.KafkaAdapterHost,
+// Initialize the kafka manager, but we will start it later
+func (core *Core) initKafkaManager(ctx context.Context) error {
+	log.Infow("initialize-kafka-manager", log.Fields{"host": core.config.KafkaAdapterHost,
 		"port": core.config.KafkaAdapterPort, "topic": core.config.CoreTopic})
+
+	probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPreparing)
+
+	// create the proxy
 	var err error
 	if core.kmp, err = kafka.NewInterContainerProxy(
 		kafka.InterContainerHost(core.config.KafkaAdapterHost),
@@ -206,25 +215,121 @@
 		log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
 		return err
 	}
-	count := 0
-	for {
-		if err = core.kmp.Start(); err != nil {
+
+	probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPrepared)
+
+	return nil
+}
+
+// startKafkaManager starts the Kafka InterContainerProxy (core.kmp) and then
+// monitors its liveness, pushing that state to the "message-bus" readiness
+// probe. It is intended to run as a goroutine and returns once ctx is done.
+//
+// Startup: core.kmp.Start() is retried every startupRetryInterval seconds
+// until it succeeds or ctx is done. The adapter request handlers are
+// registered only after a successful start; a registration failure is fatal.
+//
+// Liveness: any producer that fails to send will cause the proxy to post a
+// false event on its liveness channel, and any producer that succeeds will
+// cause it to post a true event. Group receivers also update liveness state,
+// and a receiver will typically indicate a loss of liveness within 3-5
+// seconds of Kafka going down. Receivers only indicate restoration of
+// liveness if a message is received. During normal operation, messages are
+// routinely produced and received, automatically indicating liveness state;
+// these routine indications are rate-limited inside sarama_client. If no
+// liveness event is seen within the current timeout, a "liveness" message is
+// sent, which in turn triggers a true or false liveness event depending on
+// whether the attempt succeeded.
+//
+// The gRPC server in turn monitors the readiness probe and will start issuing
+// UNAVAILABLE responses while the probe is not ready.
+//
+// Parameters (all in seconds):
+//   startupRetryInterval -- interval between attempts to start the proxy
+//   liveProbeInterval -- interval between liveness checks when in a live state
+//   notLiveProbeInterval -- interval between liveness checks when not live
+// liveProbeInterval and notLiveProbeInterval can be configured separately,
+// though the current default is that both are set to 60 seconds.
+func (core *Core) startKafkaManager(ctx context.Context, startupRetryInterval int, liveProbeInterval int, notLiveProbeInterval int) {
+	log.Infow("starting-kafka-manager-thread", log.Fields{"host": core.config.KafkaAdapterHost,
+		"port": core.config.KafkaAdapterPort, "topic": core.config.CoreTopic})
+
+	started := false
+	for !started {
+		// If we haven't started yet, then try to start
+		log.Infow("starting-kafka-proxy", log.Fields{})
+		if err := core.kmp.Start(); err != nil {
+			// We failed to start. Delay and then try again later.
+			// Don't worry about liveness, as we can't be live until we've started.
+			probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
 			log.Infow("error-starting-kafka-messaging-proxy", log.Fields{"error": err})
-			if maxRetries != -1 {
-				if count >= maxRetries {
-					return err
-				}
-			}
-			count += 1
-			log.Infow("retry-starting-kafka-messaging-proxy", log.Fields{"retryCount": count, "maxRetries": maxRetries, "retryInterval": retryInterval})
-			//	Take a nap before retrying
-			time.Sleep(time.Duration(retryInterval) * time.Second)
+			// Wait before retrying, but give up if the core is shutting down.
+			select {
+			case <-ctx.Done():
+				log.Info("kafka-manager-thread-exiting")
+				return
+			case <-time.After(time.Duration(startupRetryInterval) * time.Second):
+			}
 		} else {
-			break
+			// We started. We only need to do this once.
+			// Next we'll fall through and start checking liveness.
+			log.Infow("started-kafka-proxy", log.Fields{})
+			// cannot do this until after the kmp is started
+			if err := core.registerAdapterRequestHandlers(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
+				log.Fatal("Failure-registering-adapterRequestHandler")
+			}
+			started = true
 		}
 	}
-	log.Info("kafka-messaging-proxy-created")
-	return nil
+	log.Info("started-kafka-message-proxy")
+	livenessChannel := core.kmp.EnableLivenessChannel(true)
+	log.Info("enabled-kafka-liveness-channel")
+
+	timeout := time.Duration(liveProbeInterval) * time.Second
+	for {
+		timeoutTimer := time.NewTimer(timeout)
+		select {
+		case <-ctx.Done():
+			// The core is shutting down; stop monitoring so this goroutine exits.
+			log.Info("kafka-manager-thread-exiting")
+			timeoutTimer.Stop()
+			return
+		case liveness := <-livenessChannel:
+			log.Infow("kafka-manager-thread-liveness-event", log.Fields{"liveness": liveness})
+			// there was a state change in Kafka liveness
+			if !liveness {
+				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
+				if core.grpcServer != nil {
+					log.Info("kafka-manager-thread-set-server-notready")
+				}
+				// retry frequently while life is bad
+				timeout = time.Duration(notLiveProbeInterval) * time.Second
+			} else {
+				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
+				if core.grpcServer != nil {
+					log.Info("kafka-manager-thread-set-server-ready")
+				}
+				// retry infrequently while life is good
+				timeout = time.Duration(liveProbeInterval) * time.Second
+			}
+			// Stop this iteration's timer before it is discarded; if it had
+			// already fired, drain the pending value from its channel.
+			if !timeoutTimer.Stop() {
+				<-timeoutTimer.C
+			}
+		case <-timeoutTimer.C:
+			log.Info("kafka-proxy-liveness-recheck")
+			// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
+			// the liveness probe may wait (and block) writing to our channel.
+			go func() {
+				err := core.kmp.SendLiveness()
+				if err != nil {
+					// Catch possible error case if sending liveness after Sarama has been stopped.
+					log.Warnw("error-kafka-send-liveness", log.Fields{"error": err})
+				}
+			}()
+		}
+	}
 }
 
 // waitUntilKVStoreReachableOrMaxTries will wait until it can connect to a KV store or until maxtries has been reached