[VOL-2099] Modified RO Core PoD Readiness state to reflect KVStore connection status

Change-Id: I38998fe75a99769ea24b72e2e0bb19d8c08e0529
diff --git a/ro_core/core/core.go b/ro_core/core/core.go
index 4baa198..d022266 100644
--- a/ro_core/core/core.go
+++ b/ro_core/core/core.go
@@ -26,6 +26,7 @@
 	"github.com/opencord/voltha-lib-go/v2/pkg/probe"
 	"github.com/opencord/voltha-protos/v2/go/voltha"
 	"google.golang.org/grpc"
+	"time"
 )
 
 type Core struct {
@@ -42,6 +43,7 @@
 	localDataProxy    *model.Proxy
 	exitChannel       chan int
 	kvClient          kvstore.Client
+	backend           db.Backend
 }
 
 func init() {
@@ -55,19 +57,24 @@
 	core.config = cf
 	core.kvClient = kvClient
 
+	// Configure the backend to push a Liveness status update at least every cf.LiveProbeInterval / 2,
+	// so that a Liveness check (triggered on Liveness timeout) does not fire while the backend is alive.
+	livenessChannelInterval := cf.LiveProbeInterval / 2
+
 	// Setup the KV store
 	// Do not call NewBackend constructor; it creates its own KV client
 	// Commented the backend for now until the issue between the model and the KV store
 	// is resolved.
-	backend := db.Backend{
-		Client:     kvClient,
-		StoreType:  cf.KVStoreType,
-		Host:       cf.KVStoreHost,
-		Port:       cf.KVStorePort,
-		Timeout:    cf.KVStoreTimeout,
-		PathPrefix: "service/voltha"}
-	core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &backend)
-	core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
+	core.backend = db.Backend{
+		Client:                  kvClient,
+		StoreType:               cf.KVStoreType,
+		Host:                    cf.KVStoreHost,
+		Port:                    cf.KVStorePort,
+		Timeout:                 cf.KVStoreTimeout,
+		LivenessChannelInterval: livenessChannelInterval,
+		PathPrefix:              "service/voltha"}
+	core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &core.backend)
+	core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &core.backend)
 	core.clusterDataProxy = core.clusterDataRoot.CreateProxy(context.Background(), "/", false)
 	core.localDataProxy = core.localDataRoot.CreateProxy(context.Background(), "/", false)
 	return &core
@@ -81,6 +88,7 @@
 	go core.startDeviceManager(ctx)
 	go core.startLogicalDeviceManager(ctx)
 	go core.startGRPCService(ctx)
+	go core.monitorKvstoreLiveness(ctx)
 
 	log.Info("adaptercore-started")
 }
@@ -107,7 +115,7 @@
 // and starts the server
 func (core *Core) startGRPCService(ctx context.Context) {
 	// create an insecure gRPC server
-	core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false, nil)
+	core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false, probe.GetProbeFromContext(ctx))
 	log.Info("grpc-server-created")
 
 	core.grpcNBIAPIHandler = NewAPIHandler(core.genericMgr, core.deviceMgr, core.logicalDeviceMgr)
@@ -157,3 +165,70 @@
 	core.logicalDeviceMgr.start(ctx)
 	log.Info("started-Logical-DeviceManager")
 }
+
+/*
+* Goroutine to monitor kvstore Liveness (connection status)
+*
+* This function continuously monitors the Liveness state of the kvstore as
+* reported periodically by the backend, and updates the status of the
+* kv-store service registered with the ro_core probe accordingly.
+*
+* If no liveness event is seen within the timeout period, this routine
+* triggers a Liveness check, which in turn pushes a liveness event onto the
+* liveness channel: true or false depending on whether the attempt succeeded.
+*
+* The gRPC server in turn monitors the state of the readiness probe and
+* issues UNAVAILABLE responses while the probe is not ready.
+ */
+func (core *Core) monitorKvstoreLiveness(ctx context.Context) {
+	log.Info("start-monitoring-kvstore-liveness")
+
+	// Instruct backend to create Liveness channel for transporting state updates
+	livenessChannel := core.backend.EnableLivenessChannel()
+
+	log.Debug("enabled-kvstore-liveness-channel")
+
+	// Assume the kvstore is not alive until reported otherwise, so start with the NotLive probe interval
+	timeout := core.config.NotLiveProbeInterval
+	for {
+		timeoutTimer := time.NewTimer(timeout)
+		select {
+
+		case liveness := <-livenessChannel:
+			log.Debugw("received-liveness-change-notification", log.Fields{"liveness": liveness})
+
+			if !liveness {
+				probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
+
+				if core.grpcServer != nil {
+					log.Info("kvstore-set-server-notready")
+				}
+
+				timeout = core.config.NotLiveProbeInterval
+			} else {
+				probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
+
+				if core.grpcServer != nil {
+					log.Info("kvstore-set-server-ready")
+				}
+
+				timeout = core.config.LiveProbeInterval
+			}
+
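+			// Stop the timer; if it already fired, drain its channel so a stale
+			// expiry is not consumed on the next iteration of the loop.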
+			if !timeoutTimer.Stop() {
+				<-timeoutTimer.C
+			}
+
+		case <-timeoutTimer.C:
+			log.Info("kvstore-perform-liveness-check-on-timeout")
+
+			// Trigger a Liveness check since no liveness update was received within the
+			// timeout period. The check pushes the Live state onto the same channel that
+			// this routine reads and processes, so run it asynchronously to avoid
+			// blocking on the backend response and any possibility of deadlock.
+			go core.backend.PerformLivenessCheck(core.config.KVStoreTimeout)
+		}
+	}
+}
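
The readiness gating mentioned in the comment above ("issues UNAVAILABLE responses
while the probe is not ready") happens inside voltha-lib-go's grpcserver once it is
handed the probe via NewGrpcServer; that wiring is not part of this diff. The sketch
below only illustrates the idea, using a hypothetical isReady helper rather than the
actual voltha-lib-go API: a unary interceptor rejects every call with UNAVAILABLE
until readiness is reported.

package sketch

import (
	"context"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// readinessInterceptor returns UNAVAILABLE for every unary call until
// isReady() reports true; once ready, calls pass through to the handler.
func readinessInterceptor(isReady func() bool) grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler) (interface{}, error) {
		if !isReady() {
			return nil, status.Error(codes.Unavailable, "kv-store connection lost")
		}
		return handler(ctx, req)
	}
}

// NewGatedServer builds a gRPC server whose calls are gated on readiness.
func NewGatedServer(isReady func() bool) *grpc.Server {
	return grpc.NewServer(grpc.UnaryInterceptor(readinessInterceptor(isReady)))
}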
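
On the producer side, monitorKvstoreLiveness relies on two db.Backend behaviours
from voltha-lib-go: EnableLivenessChannel returns a channel on which the backend
reports its connection state at least once per LivenessChannelInterval, and
PerformLivenessCheck probes the store on demand and reports the outcome on that
same channel. The stand-in below is only a sketch of that contract (the ping
field and its wiring are hypothetical, not the real implementation):

package sketch

import (
	"sync"
	"time"
)

// fakeBackend mimics the producer contract the monitor loop depends on.
type fakeBackend struct {
	mutex    sync.Mutex
	liveness chan bool
	interval time.Duration // plays the role of LivenessChannelInterval
	ping     func() bool   // true if the kvstore answered the probe
}

// EnableLivenessChannel lazily creates the liveness channel and starts a
// reporter that pushes the current state at least once per interval.
func (b *fakeBackend) EnableLivenessChannel() chan bool {
	b.mutex.Lock()
	defer b.mutex.Unlock()
	if b.liveness == nil {
		b.liveness = make(chan bool, 10)
		go func() {
			for {
				b.liveness <- b.ping()
				time.Sleep(b.interval)
			}
		}()
	}
	return b.liveness
}

// PerformLivenessCheck probes the store once and reports the result on the
// same channel the monitor loop is already reading (timeout unused here).
func (b *fakeBackend) PerformLivenessCheck(timeout int) {
	b.liveness <- b.ping()
}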