VOL-2098 Monitor Kafka service readiness
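
Start the Kafka inter-container proxy from a dedicated manager goroutine
and monitor its liveness channel, reporting the observed state to the
"message-bus" readiness probe so that the gRPC server can return
UNAVAILABLE while the message bus is not live.

New command-line options (in seconds):
  -live_probe_interval       interval between liveness checks while live (default 60)
  -not_live_probe_interval   interval between liveness checks while not live (default 60)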

Change-Id: Ifb9658c8ea4f03374fe2921846149b1e55237327
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 87f26c3..bb1ace1 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -54,6 +54,8 @@
 	default_CorePairTopic             = "rwcore_1"
 	default_MaxConnectionRetries      = -1 // retries forever
 	default_ConnectionRetryInterval   = 2  // in seconds
+	default_LiveProbeInterval         = 60 // in seconds
+	default_NotLiveProbeInterval      = 60 // in seconds
 	default_ProbeHost                 = ""
 	default_ProbePort                 = 8080
 )
@@ -90,6 +92,8 @@
 	CorePairTopic             string
 	MaxConnectionRetries      int
 	ConnectionRetryInterval   int
+	LiveProbeInterval         int
+	NotLiveProbeInterval      int
 	ProbeHost                 string
 	ProbePort                 int
 }
@@ -130,6 +134,8 @@
 		CorePairTopic:             default_CorePairTopic,
 		MaxConnectionRetries:      default_MaxConnectionRetries,
 		ConnectionRetryInterval:   default_ConnectionRetryInterval,
+		LiveProbeInterval:         default_LiveProbeInterval,
+		NotLiveProbeInterval:      default_NotLiveProbeInterval,
 		ProbeHost:                 default_ProbeHost,
 		ProbePort:                 default_ProbePort,
 	}
@@ -216,9 +222,15 @@
 	help = fmt.Sprintf("The number of retries to connect to a dependent component")
 	flag.IntVar(&(cf.MaxConnectionRetries), "max_connection_retries", default_MaxConnectionRetries, help)
 
-	help = fmt.Sprintf("The number of seconds between each connection retry attempt ")
+	help = fmt.Sprintf("The number of seconds between each connection retry attempt")
 	flag.IntVar(&(cf.ConnectionRetryInterval), "connection_retry_interval", default_ConnectionRetryInterval, help)
 
+	help = fmt.Sprintf("The number of seconds between liveness probes while in a live state")
+	flag.IntVar(&(cf.LiveProbeInterval), "live_probe_interval", default_LiveProbeInterval, help)
+
+	help = fmt.Sprintf("The number of seconds between liveness probes while in a not live state")
+	flag.IntVar(&(cf.NotLiveProbeInterval), "not_live_probe_interval", default_NotLiveProbeInterval, help)
+
 	help = fmt.Sprintf("The host on which to listen to answer liveness and readiness probe queries over HTTP.")
 	flag.StringVar(&(cf.ProbeHost), "probe_host", default_ProbeHost, help)
 
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index e340a29..b74de4a 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -106,11 +106,10 @@
 		p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
 	}
 
-	if err := core.waitUntilKafkaMessagingProxyIsUpOrMaxTries(ctx, core.config.MaxConnectionRetries, core.config.ConnectionRetryInterval); err != nil {
-		log.Fatal("Failure-starting-kafkaMessagingProxy")
-	}
-	if p != nil {
-		p.UpdateStatus("message-bus", probe.ServiceStatusRunning)
+	// core.kmp must be created before deviceMgr and adapterMgr, as they will make
+	// private copies of the pointer to core.kmp.
+	if err := core.initKafkaManager(ctx); err != nil {
+		log.Fatal("Failed-to-init-kafka-manager")
 	}
 
 	log.Debugw("values", log.Fields{"kmp": core.kmp})
@@ -119,9 +118,14 @@
 	core.deviceMgr.adapterMgr = core.adapterMgr
 	core.logicalDeviceMgr = newLogicalDeviceManager(core, core.deviceMgr, core.kmp, core.clusterDataProxy, core.config.DefaultCoreTimeout)
 
-	if err := core.registerAdapterRequestHandlers(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
-		log.Fatal("Failure-registering-adapterRequestHandler")
-	}
+	// Start the KafkaManager. This must be done after the deviceMgr, adapterMgr, and
+	// logicalDeviceMgr have been created, since once the kmp is started it registers
+	// the adapter request handlers for those managers.
+
+	go core.startKafkaManager(ctx,
+		core.config.ConnectionRetryInterval,
+		core.config.LiveProbeInterval,
+		core.config.NotLiveProbeInterval)
 
 	go core.startDeviceManager(ctx)
 	go core.startLogicalDeviceManager(ctx)
@@ -159,7 +163,7 @@
 //startGRPCService creates the grpc service handlers, registers it to the grpc server and starts the server
 func (core *Core) startGRPCService(ctx context.Context) {
 	//	create an insecure gserver server
-	core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false)
+	core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false, probe.GetProbeFromContext(ctx))
 	log.Info("grpc-server-created")
 
 	core.grpcNBIAPIHandler = NewAPIHandler(core)
@@ -193,9 +197,14 @@
 	probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusStopped)
 }
 
-func (core *Core) waitUntilKafkaMessagingProxyIsUpOrMaxTries(ctx context.Context, maxRetries int, retryInterval int) error {
-	log.Infow("starting-kafka-messaging-proxy", log.Fields{"host": core.config.KafkaAdapterHost,
+// initKafkaManager initializes the Kafka manager; it is started later by startKafkaManager
+func (core *Core) initKafkaManager(ctx context.Context) error {
+	log.Infow("initialize-kafka-manager", log.Fields{"host": core.config.KafkaAdapterHost,
 		"port": core.config.KafkaAdapterPort, "topic": core.config.CoreTopic})
+
+	probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPreparing)
+
+	// create the proxy
 	var err error
 	if core.kmp, err = kafka.NewInterContainerProxy(
 		kafka.InterContainerHost(core.config.KafkaAdapterHost),
@@ -206,25 +215,121 @@
 		log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
 		return err
 	}
-	count := 0
-	for {
-		if err = core.kmp.Start(); err != nil {
+
+	probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPrepared)
+
+	return nil
+}
+
+/*
+ * KafkaMonitorThread
+ *
 + * Responsible for starting the Kafka InterContainerProxy and monitoring its liveness
+ * state.
+ *
+ * Any producer that fails to send will cause KafkaInterContainerProxy to
+ * post a false event on its liveness channel. Any producer that succeeds in sending
+ * will cause KafkaInterContainerProxy to post a true event on its liveness
 + * channel. Group receivers also update liveness state, and a receiver will typically
+ * indicate a loss of liveness within 3-5 seconds of Kafka going down. Receivers
+ * only indicate restoration of liveness if a message is received. During normal
+ * operation, messages will be routinely produced and received, automatically
+ * indicating liveness state. These routine liveness indications are rate-limited
+ * inside sarama_client.
+ *
+ * This thread monitors the status of KafkaInterContainerProxy's liveness and pushes
+ * that state to the core's readiness probes. If no liveness event has been seen
+ * within a timeout, then the thread will make an attempt to produce a "liveness"
+ * message, which will in turn trigger a liveness event on the liveness channel, true
+ * or false depending on whether the attempt succeeded.
+ *
+ * The gRPC server in turn monitors the state of the readiness probe and will
 + * start issuing UNAVAILABLE responses while the probe is not ready.
+ *
+ * startupRetryInterval -- interval between attempts to start
+ * liveProbeInterval -- interval between liveness checks when in a live state
+ * notLiveProbeInterval -- interval between liveness checks when in a notLive state
+ *
+ * liveProbeInterval and notLiveProbeInterval can be configured separately,
+ * though the current default is that both are set to 60 seconds.
+ */
+
+func (core *Core) startKafkaManager(ctx context.Context, startupRetryInterval int, liveProbeInterval int, notLiveProbeInterval int) {
+	log.Infow("starting-kafka-manager-thread", log.Fields{"host": core.config.KafkaAdapterHost,
+		"port": core.config.KafkaAdapterPort, "topic": core.config.CoreTopic})
+
+	started := false
+	for !started {
+		// If we haven't started yet, then try to start
+		log.Infow("starting-kafka-proxy", log.Fields{})
+		if err := core.kmp.Start(); err != nil {
+			// We failed to start. Delay and then try again later.
+			// Don't worry about liveness, as we can't be live until we've started.
+			probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
 			log.Infow("error-starting-kafka-messaging-proxy", log.Fields{"error": err})
-			if maxRetries != -1 {
-				if count >= maxRetries {
-					return err
-				}
-			}
-			count += 1
-			log.Infow("retry-starting-kafka-messaging-proxy", log.Fields{"retryCount": count, "maxRetries": maxRetries, "retryInterval": retryInterval})
-			//	Take a nap before retrying
-			time.Sleep(time.Duration(retryInterval) * time.Second)
+			time.Sleep(time.Duration(startupRetryInterval) * time.Second)
 		} else {
-			break
+			// We started. We only need to do this once.
+			// Next we'll fall through and start checking liveness.
+			log.Infow("started-kafka-proxy", log.Fields{})
+
+			// cannot do this until after the kmp is started
+			if err := core.registerAdapterRequestHandlers(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
+				log.Fatal("Failure-registering-adapterRequestHandler")
+			}
+
+			started = true
 		}
 	}
-	log.Info("kafka-messaging-proxy-created")
-	return nil
+
+	log.Info("started-kafka-message-proxy")
+
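+	// The liveness channel posts true when a Kafka send or receive succeeds and false when one fails.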
+	livenessChannel := core.kmp.EnableLivenessChannel(true)
+
+	log.Info("enabled-kafka-liveness-channel")
+
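+	// Start with the live-state probe interval; it switches to notLiveProbeInterval whenever liveness is lost.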
+	timeout := time.Duration(liveProbeInterval) * time.Second
+	for {
+		timeoutTimer := time.NewTimer(timeout)
+		select {
+		case liveness := <-livenessChannel:
+			log.Infow("kafka-manager-thread-liveness-event", log.Fields{"liveness": liveness})
+			// there was a state change in Kafka liveness
+			if !liveness {
+				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
+
+				if core.grpcServer != nil {
+					log.Info("kafka-manager-thread-set-server-notready")
+				}
+
+				// retry frequently while life is bad
+				timeout = time.Duration(notLiveProbeInterval) * time.Second
+			} else {
+				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
+
+				if core.grpcServer != nil {
+					log.Info("kafka-manager-thread-set-server-ready")
+				}
+
+				// retry infrequently while life is good
+				timeout = time.Duration(liveProbeInterval) * time.Second
+			}
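+			// A liveness event was handled before the timer fired; stop and drain the timer
+			// so a fresh one is created on the next loop iteration.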
+			if !timeoutTimer.Stop() {
+				<-timeoutTimer.C
+			}
+		case <-timeoutTimer.C:
+			log.Info("kafka-proxy-liveness-recheck")
+			// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
+			// the liveness probe may wait (and block) writing to our channel.
+			go func() {
+				err := core.kmp.SendLiveness()
+				if err != nil {
+					// Catch the case where liveness is sent after Sarama has been stopped.
+					log.Warnw("error-kafka-send-liveness", log.Fields{"error": err})
+				}
+			}()
+		}
+	}
 }
 
 // waitUntilKVStoreReachableOrMaxTries will wait until it can connect to a KV store or until maxtries has been reached
diff --git a/rw_core/main.go b/rw_core/main.go
index 847f386..faddd57 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -65,7 +65,7 @@
 	return nil, errors.New("unsupported-kv-store")
 }
 
-func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {
+func newKafkaClient(clientType string, host string, port int, instanceID string, livenessChannelInterval time.Duration) (kafka.Client, error) {
 
 	log.Infow("kafka-client-type", log.Fields{"client": clientType})
 	switch clientType {
@@ -82,7 +82,9 @@
 			kafka.ConsumerGroupPrefix(instanceID),
 			kafka.AutoCreateTopic(true),
 			kafka.ProducerFlushFrequency(5),
-			kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
+			kafka.ProducerRetryBackoff(time.Millisecond*30),
+			kafka.LivenessChannelInterval(livenessChannelInterval),
+		), nil
 	}
 	return nil, errors.New("unsupported-client-type")
 }
@@ -137,7 +139,11 @@
 	}
 
 	// Setup Kafka Client
-	if rw.kafkaClient, err = newKafkaClient("sarama", rw.config.KafkaAdapterHost, rw.config.KafkaAdapterPort, instanceId); err != nil {
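+	// The liveness channel interval passed to the sarama client is half the live probe
+	// interval; it rate-limits the routine liveness indications described in core.go.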
+	if rw.kafkaClient, err = newKafkaClient("sarama",
+		rw.config.KafkaAdapterHost,
+		rw.config.KafkaAdapterPort,
+		instanceId,
+		time.Duration(rw.config.LiveProbeInterval)*time.Second/2); err != nil {
 		log.Fatal("Unsupported-kafka-client")
 	}