[VOL-2099] Modified RO Core PoD Readiness state to reflect KVStore connection status

Change-Id: I38998fe75a99769ea24b72e2e0bb19d8c08e0529
diff --git a/ro_core/config/config.go b/ro_core/config/config.go
index d320b13..eea6080 100644
--- a/ro_core/config/config.go
+++ b/ro_core/config/config.go
@@ -20,6 +20,7 @@
 	"fmt"
 	"github.com/opencord/voltha-lib-go/v2/pkg/log"
 	"os"
+	"time"
 )
 
 // RO Core service default constants
@@ -45,30 +46,34 @@
 	default_Affinity_Router_Topic = "affinityRouter"
 	default_ProbeHost             = ""
 	default_ProbePort             = 8080
+	default_LiveProbeInterval     = 60 * time.Second
+	default_NotLiveProbeInterval  = 5 * time.Second // Probe more frequently to detect recovery early
 )
 
 // ROCoreFlags represents the set of configurations used by the read-only core service
 type ROCoreFlags struct {
 	// Command line parameters
-	InstanceID          string
-	ROCoreEndpoint      string
-	GrpcHost            string
-	GrpcPort            int
-	KVStoreType         string
-	KVStoreTimeout      int // in seconds
-	KVStoreHost         string
-	KVStorePort         int
-	KVTxnKeyDelTime     int
-	CoreTopic           string
-	LogLevel            int
-	Banner              bool
-	DisplayVersionOnly  bool
-	ROCoreKey           string
-	ROCoreCert          string
-	ROCoreCA            string
-	AffinityRouterTopic string
-	ProbeHost           string
-	ProbePort           int
+	InstanceID           string
+	ROCoreEndpoint       string
+	GrpcHost             string
+	GrpcPort             int
+	KVStoreType          string
+	KVStoreTimeout       int // in seconds
+	KVStoreHost          string
+	KVStorePort          int
+	KVTxnKeyDelTime      int
+	CoreTopic            string
+	LogLevel             int
+	Banner               bool
+	DisplayVersionOnly   bool
+	ROCoreKey            string
+	ROCoreCert           string
+	ROCoreCA             string
+	AffinityRouterTopic  string
+	ProbeHost            string
+	ProbePort            int
+	LiveProbeInterval    time.Duration
+	NotLiveProbeInterval time.Duration
 }
 
 func init() {
@@ -78,25 +83,27 @@
 // NewROCoreFlags returns a new ROCore config
 func NewROCoreFlags() *ROCoreFlags {
 	var roCoreFlag = ROCoreFlags{ // Default values
-		InstanceID:          default_InstanceID,
-		ROCoreEndpoint:      default_ROCoreEndpoint,
-		GrpcHost:            default_GrpcHost,
-		GrpcPort:            default_GrpcPort,
-		KVStoreType:         default_KVStoreType,
-		KVStoreTimeout:      default_KVStoreTimeout,
-		KVStoreHost:         default_KVStoreHost,
-		KVStorePort:         default_KVStorePort,
-		KVTxnKeyDelTime:     default_KVTxnKeyDelTime,
-		CoreTopic:           default_CoreTopic,
-		LogLevel:            default_LogLevel,
-		Banner:              default_Banner,
-		DisplayVersionOnly:  default_DisplayVersionOnly,
-		ROCoreKey:           default_ROCoreKey,
-		ROCoreCert:          default_ROCoreCert,
-		ROCoreCA:            default_ROCoreCA,
-		AffinityRouterTopic: default_Affinity_Router_Topic,
-		ProbeHost:           default_ProbeHost,
-		ProbePort:           default_ProbePort,
+		InstanceID:           default_InstanceID,
+		ROCoreEndpoint:       default_ROCoreEndpoint,
+		GrpcHost:             default_GrpcHost,
+		GrpcPort:             default_GrpcPort,
+		KVStoreType:          default_KVStoreType,
+		KVStoreTimeout:       default_KVStoreTimeout,
+		KVStoreHost:          default_KVStoreHost,
+		KVStorePort:          default_KVStorePort,
+		KVTxnKeyDelTime:      default_KVTxnKeyDelTime,
+		CoreTopic:            default_CoreTopic,
+		LogLevel:             default_LogLevel,
+		Banner:               default_Banner,
+		DisplayVersionOnly:   default_DisplayVersionOnly,
+		ROCoreKey:            default_ROCoreKey,
+		ROCoreCert:           default_ROCoreCert,
+		ROCoreCA:             default_ROCoreCA,
+		AffinityRouterTopic:  default_Affinity_Router_Topic,
+		ProbeHost:            default_ProbeHost,
+		ProbePort:            default_ProbePort,
+		LiveProbeInterval:    default_LiveProbeInterval,
+		NotLiveProbeInterval: default_NotLiveProbeInterval,
 	}
 	return &roCoreFlag
 }
@@ -151,6 +158,12 @@
 	help = fmt.Sprintf("The port on which to listen to answer liveness and readiness probe queries over HTTP.")
 	flag.IntVar(&(cf.ProbePort), "probe_port", default_ProbePort, help)
 
+	help = fmt.Sprintf("Time interval between liveness probes while in a live state")
+	flag.DurationVar(&(cf.LiveProbeInterval), "live_probe_interval", default_LiveProbeInterval, help)
+
+	help = fmt.Sprintf("Time interval between liveness probes while in a not live state")
+	flag.DurationVar(&(cf.NotLiveProbeInterval), "not_live_probe_interval", default_NotLiveProbeInterval, help)
+
 	flag.Parse()
 
 	containerName := getContainerInfo()
diff --git a/ro_core/core/core.go b/ro_core/core/core.go
index 4baa198..d022266 100644
--- a/ro_core/core/core.go
+++ b/ro_core/core/core.go
@@ -26,6 +26,7 @@
 	"github.com/opencord/voltha-lib-go/v2/pkg/probe"
 	"github.com/opencord/voltha-protos/v2/go/voltha"
 	"google.golang.org/grpc"
+	"time"
 )
 
 type Core struct {
@@ -42,6 +43,7 @@
 	localDataProxy    *model.Proxy
 	exitChannel       chan int
 	kvClient          kvstore.Client
+	backend           db.Backend
 }
 
 func init() {
@@ -55,19 +57,24 @@
 	core.config = cf
 	core.kvClient = kvClient
 
+	// Configure backend to push a liveness status update at least every cf.LiveProbeInterval / 2
+	// so as to avoid triggering a liveness check (due to liveness timeout) while the backend is alive
+	livenessChannelInterval := cf.LiveProbeInterval / 2
+
 	// Setup the KV store
 	// Do not call NewBackend constructor; it creates its own KV client
 	// Commented the backend for now until the issue between the model and the KV store
 	// is resolved.
-	backend := db.Backend{
-		Client:     kvClient,
-		StoreType:  cf.KVStoreType,
-		Host:       cf.KVStoreHost,
-		Port:       cf.KVStorePort,
-		Timeout:    cf.KVStoreTimeout,
-		PathPrefix: "service/voltha"}
-	core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &backend)
-	core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
+	core.backend = db.Backend{
+		Client:                  kvClient,
+		StoreType:               cf.KVStoreType,
+		Host:                    cf.KVStoreHost,
+		Port:                    cf.KVStorePort,
+		Timeout:                 cf.KVStoreTimeout,
+		LivenessChannelInterval: livenessChannelInterval,
+		PathPrefix:              "service/voltha"}
+	core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &core.backend)
+	core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &core.backend)
 	core.clusterDataProxy = core.clusterDataRoot.CreateProxy(context.Background(), "/", false)
 	core.localDataProxy = core.localDataRoot.CreateProxy(context.Background(), "/", false)
 	return &core
@@ -81,6 +88,7 @@
 	go core.startDeviceManager(ctx)
 	go core.startLogicalDeviceManager(ctx)
 	go core.startGRPCService(ctx)
+	go core.monitorKvstoreLiveness(ctx)
 
 	log.Info("adaptercore-started")
 }
@@ -107,7 +115,7 @@
 // and starts the server
 func (core *Core) startGRPCService(ctx context.Context) {
 	//	create an insecure gserver server
-	core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false, nil)
+	core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false, probe.GetProbeFromContext(ctx))
 	log.Info("grpc-server-created")
 
 	core.grpcNBIAPIHandler = NewAPIHandler(core.genericMgr, core.deviceMgr, core.logicalDeviceMgr)
@@ -157,3 +165,68 @@
 	core.logicalDeviceMgr.start(ctx)
 	log.Info("started-Logical-DeviceManager")
 }
+
+/*
+* Thread to monitor kvstore Liveness (connection status)
+*
+* This function constantly monitors Liveness State of kvstore as reported
+* periodically by backend and updates the Status of kv-store service registered
+* with ro_core probe.
+*
+* If no liveness event has been seen within a timeout, then the thread will
+* trigger a "liveness" check, which will in turn trigger a liveness event on
+* the liveness channel, true or false depending on whether the attempt succeeded.
+*
+* The gRPC server in turn monitors the state of the readiness probe and will
+* start issuing UNAVAILABLE response while the probe is not ready.
+ */
+func (core *Core) monitorKvstoreLiveness(ctx context.Context) {
+	log.Info("start-monitoring-kvstore-liveness")
+
+	// Instruct backend to create Liveness channel for transporting state updates
+	livenessChannel := core.backend.EnableLivenessChannel()
+
+	log.Debug("enabled-kvstore-liveness-channel")
+
+	// Default state for kvstore is not alive
+	timeout := core.config.NotLiveProbeInterval
+	for {
+		timeoutTimer := time.NewTimer(timeout)
+		select {
+
+		case liveness := <-livenessChannel:
+			log.Debugw("received-liveness-change-notification", log.Fields{"liveness": liveness})
+
+			if !liveness {
+				probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
+
+				if core.grpcServer != nil {
+					log.Info("kvstore-set-server-notready")
+				}
+
+				timeout = core.config.NotLiveProbeInterval
+			} else {
+				probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
+
+				if core.grpcServer != nil {
+					log.Info("kvstore-set-server-ready")
+				}
+
+				timeout = core.config.LiveProbeInterval
+			}
+
+			if !timeoutTimer.Stop() {
+				<-timeoutTimer.C
+			}
+
+		case <-timeoutTimer.C:
+			log.Info("kvstore-perform-liveness-check-on-timeout")
+
+			// Trigger a liveness check if no liveness update was received within the timeout period.
+			// The liveness check will push the liveness state to the same channel which this routine
+			// is reading and processing. Thus, do it asynchronously to avoid blocking for the
+			// backend response and to avoid any possibility of deadlock
+			go core.backend.PerformLivenessCheck(core.config.KVStoreTimeout)
+		}
+	}
+}
diff --git a/ro_core/main.go b/ro_core/main.go
index ddcf936..f1e00d2 100644
--- a/ro_core/main.go
+++ b/ro_core/main.go
@@ -122,7 +122,7 @@
 	ro.core = c.NewCore(ro.config.InstanceID, ro.config, ro.kvClient)
 
 	if p != nil {
-		p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
+		p.UpdateStatus("kv-store", probe.ServiceStatusPrepared)
 	}
 
 	// start the core