[VOL-2230] Update RW Core k8s Ready state based on backend kvstore connection status
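
The core now keeps the status of the "kv-store" service, registered with
the rw_core probe, in sync with the backend KV store connection. A new
monitorKvstoreLiveness goroutine consumes liveness events from the db
backend's liveness channel, marks the service NotReady or Running
accordingly, and falls back to an explicit PerformLivenessCheck when no
event arrives within the probe interval. The backend is configured to
push liveness updates at least every LiveProbeInterval/2 so that a
healthy connection does not trigger timeout-driven checks.

ConnectionRetryInterval, LiveProbeInterval and NotLiveProbeInterval are
now time.Duration values parsed with flag.DurationVar, so command-line
values must use Go duration syntax (for example "2s" or "60s" rather
than a bare number of seconds). The default NotLiveProbeInterval drops
from 60s to 5s so the core probes more frequently while the KV store is
not alive.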

Change-Id: I51e8fb4ed97facad5f74780fe70c51e6783958ab
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index bb1ace1..68dca2c 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -19,6 +19,7 @@
 	"flag"
 	"fmt"
 	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	"time"
 )
 
 // RW Core service default constants
@@ -53,9 +54,9 @@
 	default_CoreBindingKey            = "voltha_backend_name"
 	default_CorePairTopic             = "rwcore_1"
 	default_MaxConnectionRetries      = -1 // retries forever
-	default_ConnectionRetryInterval   = 2  // in seconds
-	default_LiveProbeInterval         = 60 // in seconds
-	default_NotLiveProbeInterval      = 60 // in seconds
+	default_ConnectionRetryInterval   = 2 * time.Second
+	default_LiveProbeInterval         = 60 * time.Second
+	default_NotLiveProbeInterval      = 5 * time.Second // Probe more frequently when not alive
 	default_ProbeHost                 = ""
 	default_ProbePort                 = 8080
 )
@@ -91,9 +92,9 @@
 	CoreBindingKey            string
 	CorePairTopic             string
 	MaxConnectionRetries      int
-	ConnectionRetryInterval   int
-	LiveProbeInterval         int
-	NotLiveProbeInterval      int
+	ConnectionRetryInterval   time.Duration
+	LiveProbeInterval         time.Duration
+	NotLiveProbeInterval      time.Duration
 	ProbeHost                 string
 	ProbePort                 int
 }
@@ -223,13 +224,13 @@
 	flag.IntVar(&(cf.MaxConnectionRetries), "max_connection_retries", default_MaxConnectionRetries, help)
 
-	help = fmt.Sprintf("The number of seconds between each connection retry attempt")
-	flag.IntVar(&(cf.ConnectionRetryInterval), "connection_retry_interval", default_ConnectionRetryInterval, help)
+	help = fmt.Sprintf("The time between each connection retry attempt")
+	flag.DurationVar(&(cf.ConnectionRetryInterval), "connection_retry_interval", default_ConnectionRetryInterval, help)
 
-	help = fmt.Sprintf("The number of seconds between liveness probes while in a live state")
-	flag.IntVar(&(cf.LiveProbeInterval), "live_probe_interval", default_LiveProbeInterval, help)
+	help = fmt.Sprintf("The time between liveness probes while in a live state")
+	flag.DurationVar(&(cf.LiveProbeInterval), "live_probe_interval", default_LiveProbeInterval, help)
 
-	help = fmt.Sprintf("The number of seconds between liveness probes while in a not live state")
-	flag.IntVar(&(cf.NotLiveProbeInterval), "not_live_probe_interval", default_NotLiveProbeInterval, help)
+	help = fmt.Sprintf("The time between liveness probes while in a not live state")
+	flag.DurationVar(&(cf.NotLiveProbeInterval), "not_live_probe_interval", default_NotLiveProbeInterval, help)
 
 	help = fmt.Sprintf("The host on which to listen to answer liveness and readiness probe queries over HTTP.")
 	flag.StringVar(&(cf.ProbeHost), "probe_host", default_ProbeHost, help)
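
For reference: switching these flags from flag.IntVar to flag.DurationVar changes the accepted
value format. flag.DurationVar parses Go duration syntax, so values such as "2s", "60s" or "1m"
are expected instead of a bare number of seconds, and any deployment that passes plain integers
for these flags will need updating. A minimal sketch of the parsing behaviour (illustrative only,
not rw_core code; the flag name mirrors live_probe_interval above):

package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	var liveProbeInterval time.Duration
	// Mirrors the live_probe_interval flag defined in config.go.
	flag.DurationVar(&liveProbeInterval, "live_probe_interval", 60*time.Second,
		"The time between liveness probes while in a live state")
	// Accepts Go duration syntax, e.g. -live_probe_interval=30s or =1m;
	// a bare "30" fails to parse because it carries no unit.
	flag.Parse()
	fmt.Println("live probe interval:", liveProbeInterval)
}
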
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index b74de4a..047ef4a 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -47,6 +47,7 @@
 	localDataProxy    *model.Proxy
 	exitChannel       chan int
 	kvClient          kvstore.Client
+	backend           db.Backend
 	kafkaClient       kafka.Client
 	deviceOwnership   *DeviceOwnership
 }
@@ -63,16 +64,21 @@
 	core.kvClient = kvClient
 	core.kafkaClient = kafkaClient
 
+	// Configure the backend to push its liveness status at least every cf.LiveProbeInterval / 2,
+	// so that timeout-driven liveness checks are not triggered while the backend is alive.
+	livenessChannelInterval := cf.LiveProbeInterval / 2
+
 	// Setup the KV store
-	backend := db.Backend{
-		Client:     kvClient,
-		StoreType:  cf.KVStoreType,
-		Host:       cf.KVStoreHost,
-		Port:       cf.KVStorePort,
-		Timeout:    cf.KVStoreTimeout,
-		PathPrefix: cf.KVStoreDataPrefix}
-	core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &backend)
-	core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
+	core.backend = db.Backend{
+		Client:                  kvClient,
+		StoreType:               cf.KVStoreType,
+		Host:                    cf.KVStoreHost,
+		Port:                    cf.KVStorePort,
+		Timeout:                 cf.KVStoreTimeout,
+		LivenessChannelInterval: livenessChannelInterval,
+		PathPrefix:              cf.KVStoreDataPrefix}
+	core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &core.backend)
+	core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &core.backend)
 	core.clusterDataProxy = core.clusterDataRoot.CreateProxy(context.Background(), "/", false)
 	core.localDataProxy = core.localDataRoot.CreateProxy(context.Background(), "/", false)
 	return &core
@@ -131,6 +137,7 @@
 	go core.startLogicalDeviceManager(ctx)
 	go core.startGRPCService(ctx)
 	go core.startAdapterManager(ctx)
+	go core.monitorKvstoreLiveness(ctx)
 
 	// Setup device ownership context
 	core.deviceOwnership = NewDeviceOwnership(core.instanceId, core.kvClient, core.deviceMgr, core.logicalDeviceMgr,
@@ -254,7 +261,7 @@
- * though the current default is that both are set to 60 seconds.
+ * though the current defaults are 60 seconds while live and 5 seconds while not live.
  */
 
-func (core *Core) startKafkaManager(ctx context.Context, startupRetryInterval int, liveProbeInterval int, notLiveProbeInterval int) {
+func (core *Core) startKafkaManager(ctx context.Context, startupRetryInterval time.Duration, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration) {
 	log.Infow("starting-kafka-manager-thread", log.Fields{"host": core.config.KafkaAdapterHost,
 		"port": core.config.KafkaAdapterPort, "topic": core.config.CoreTopic})
 
@@ -267,7 +274,7 @@
 			// Don't worry about liveness, as we can't be live until we've started.
 			probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
 			log.Infow("error-starting-kafka-messaging-proxy", log.Fields{"error": err})
-			time.Sleep(time.Duration(startupRetryInterval) * time.Second)
+			time.Sleep(startupRetryInterval)
 		} else {
 			// We started. We only need to do this once.
 			// Next we'll fall through and start checking liveness.
@@ -288,7 +295,7 @@
 
 	log.Info("enabled-kafka-liveness-channel")
 
-	timeout := time.Duration(liveProbeInterval) * time.Second
+	timeout := liveProbeInterval
 	for {
 		timeoutTimer := time.NewTimer(timeout)
 		select {
@@ -303,7 +310,7 @@
 				}
 
 				// retry frequently while life is bad
-				timeout = time.Duration(notLiveProbeInterval) * time.Second
+				timeout = notLiveProbeInterval
 			} else {
 				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
 
@@ -312,7 +319,7 @@
 				}
 
 				// retry infrequently while life is good
-				timeout = time.Duration(liveProbeInterval) * time.Second
+				timeout = liveProbeInterval
 			}
 			if !timeoutTimer.Stop() {
 				<-timeoutTimer.C
@@ -333,7 +340,7 @@
 }
 
 // waitUntilKVStoreReachableOrMaxTries will wait until it can connect to a KV store or until maxtries has been reached
-func (core *Core) waitUntilKVStoreReachableOrMaxTries(ctx context.Context, maxRetries int, retryInterval int) error {
+func (core *Core) waitUntilKVStoreReachableOrMaxTries(ctx context.Context, maxRetries int, retryInterval time.Duration) error {
 	log.Infow("verifying-KV-store-connectivity", log.Fields{"host": core.config.KVStoreHost,
 		"port": core.config.KVStorePort, "retries": maxRetries, "retryInterval": retryInterval})
 	// Get timeout in seconds with 1 second set as minimum
@@ -352,7 +359,7 @@
 			}
 			count += 1
 			//	Take a nap before retrying
-			time.Sleep(time.Duration(retryInterval) * time.Second)
+			time.Sleep(retryInterval)
 			log.Infow("retry-KV-store-connectivity", log.Fields{"retryCount": count, "maxRetries": maxRetries, "retryInterval": retryInterval})
 
 		} else {
@@ -402,3 +409,69 @@
 	core.adapterMgr.start(ctx)
 	log.Info("Adapter-Manager-Started")
 }
+
+/*
+* Thread to monitor kvstore liveness (connection status)
+*
+* This function constantly monitors the liveness state of the kvstore, as reported
+* periodically by the backend, and updates the status of the kv-store service
+* registered with the rw_core probe.
+*
+* If no liveness event has been seen within a timeout, the thread performs a
+* liveness-check attempt, which in turn triggers a liveness event on the
+* liveness channel, true or false depending on whether the attempt succeeded.
+*
+* The gRPC server in turn monitors the state of the readiness probe and will
+* start issuing UNAVAILABLE responses while the probe is not ready.
+ */
+func (core *Core) monitorKvstoreLiveness(ctx context.Context) {
+	log.Info("start-monitoring-kvstore-liveness")
+
+	// Instruct backend to create Liveness channel for transporting state updates
+	livenessChannel := core.backend.EnableLivenessChannel()
+
+	log.Debug("enabled-kvstore-liveness-channel")
+
+	// rw_core initially assumes the kvstore is alive
+	timeout := core.config.LiveProbeInterval
+	for {
+		timeoutTimer := time.NewTimer(timeout)
+		select {
+
+		case liveness := <-livenessChannel:
+			log.Debugw("received-liveness-change-notification", log.Fields{"liveness": liveness})
+
+			if !liveness {
+				probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
+
+				if core.grpcServer != nil {
+					log.Info("kvstore-set-server-notready")
+				}
+
+				timeout = core.config.NotLiveProbeInterval
+
+			} else {
+				probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
+
+				if core.grpcServer != nil {
+					log.Info("kvstore-set-server-ready")
+				}
+
+				timeout = core.config.LiveProbeInterval
+			}
+
+			if !timeoutTimer.Stop() {
+				<-timeoutTimer.C
+			}
+
+		case <-timeoutTimer.C:
+			log.Info("kvstore-perform-liveness-check-on-timeout")
+
+			// Trigger a liveness check if no liveness update was received within the timeout period.
+			// The liveness check pushes its result onto the same channel that this routine is
+			// reading and processing, so run it asynchronously to avoid blocking on the
+			// backend response and to rule out any possibility of deadlock.
+			go core.backend.PerformLivenessCheck(core.config.KVStoreTimeout)
+		}
+	}
+}
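
A side note on the timer handling shared by monitorKvstoreLiveness and the existing Kafka
liveness loop: a fresh time.NewTimer is created on each iteration, and when a liveness event
wins the select the timer is stopped and, if it had already fired, its channel is drained so a
stale tick cannot be consumed by the next iteration. A self-contained sketch of that
stop-and-drain pattern, with a plain bool channel standing in for the backend liveness channel
(illustrative only, not rw_core code):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Stands in for the channel returned by backend.EnableLivenessChannel().
	events := make(chan bool)
	go func() { events <- true }() // one liveness event, then silence

	timeout := 2 * time.Second
	for i := 0; i < 3; i++ {
		timer := time.NewTimer(timeout)
		select {
		case alive := <-events:
			fmt.Println("liveness event:", alive)
			// Stop the timer; if it had already fired, drain its channel so the
			// stale tick is not picked up by the next iteration's select.
			if !timer.Stop() {
				<-timer.C
			}
		case <-timer.C:
			fmt.Println("timeout, would trigger an explicit liveness check here")
		}
	}
}
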
diff --git a/rw_core/main.go b/rw_core/main.go
index faddd57..8ad3833 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -143,7 +143,7 @@
 		rw.config.KafkaAdapterHost,
 		rw.config.KafkaAdapterPort,
 		instanceId,
-		time.Duration(rw.config.LiveProbeInterval)*time.Second/2); err != nil {
+		rw.config.LiveProbeInterval/2); err != nil {
 		log.Fatal("Unsupported-kafka-client")
 	}
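
The main.go change also relies on time.Duration supporting integer division directly, so
rw.config.LiveProbeInterval/2 yields half the configured interval without converting through
seconds first. A tiny illustrative check (not rw_core code):

package main

import (
	"fmt"
	"time"
)

func main() {
	liveProbeInterval := 60 * time.Second
	// Duration is an int64 nanosecond count, so dividing by an untyped
	// constant halves it directly.
	fmt.Println(liveProbeInterval / 2) // prints 30s
}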