[VOL-2099] Modified RO Core PoD Readiness state to reflect KVStore connection status
Change-Id: I38998fe75a99769ea24b72e2e0bb19d8c08e0529
diff --git a/ro_core/config/config.go b/ro_core/config/config.go
index d320b13..eea6080 100644
--- a/ro_core/config/config.go
+++ b/ro_core/config/config.go
@@ -20,6 +20,7 @@
"fmt"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
"os"
+ "time"
)
// RO Core service default constants
@@ -45,30 +46,34 @@
default_Affinity_Router_Topic = "affinityRouter"
default_ProbeHost = ""
default_ProbePort = 8080
+ default_LiveProbeInterval = 60 * time.Second
+ default_NotLiveProbeInterval = 5 * time.Second // Probe more frequently to detect Recovery early
)
// ROCoreFlags represents the set of configurations used by the read-only core service
type ROCoreFlags struct {
// Command line parameters
- InstanceID string
- ROCoreEndpoint string
- GrpcHost string
- GrpcPort int
- KVStoreType string
- KVStoreTimeout int // in seconds
- KVStoreHost string
- KVStorePort int
- KVTxnKeyDelTime int
- CoreTopic string
- LogLevel int
- Banner bool
- DisplayVersionOnly bool
- ROCoreKey string
- ROCoreCert string
- ROCoreCA string
- AffinityRouterTopic string
- ProbeHost string
- ProbePort int
+ InstanceID string
+ ROCoreEndpoint string
+ GrpcHost string
+ GrpcPort int
+ KVStoreType string
+ KVStoreTimeout int // in seconds
+ KVStoreHost string
+ KVStorePort int
+ KVTxnKeyDelTime int
+ CoreTopic string
+ LogLevel int
+ Banner bool
+ DisplayVersionOnly bool
+ ROCoreKey string
+ ROCoreCert string
+ ROCoreCA string
+ AffinityRouterTopic string
+ ProbeHost string
+ ProbePort int
+ LiveProbeInterval time.Duration
+ NotLiveProbeInterval time.Duration
}
func init() {
@@ -78,25 +83,27 @@
// NewROCoreFlags returns a new ROCore config
func NewROCoreFlags() *ROCoreFlags {
var roCoreFlag = ROCoreFlags{ // Default values
- InstanceID: default_InstanceID,
- ROCoreEndpoint: default_ROCoreEndpoint,
- GrpcHost: default_GrpcHost,
- GrpcPort: default_GrpcPort,
- KVStoreType: default_KVStoreType,
- KVStoreTimeout: default_KVStoreTimeout,
- KVStoreHost: default_KVStoreHost,
- KVStorePort: default_KVStorePort,
- KVTxnKeyDelTime: default_KVTxnKeyDelTime,
- CoreTopic: default_CoreTopic,
- LogLevel: default_LogLevel,
- Banner: default_Banner,
- DisplayVersionOnly: default_DisplayVersionOnly,
- ROCoreKey: default_ROCoreKey,
- ROCoreCert: default_ROCoreCert,
- ROCoreCA: default_ROCoreCA,
- AffinityRouterTopic: default_Affinity_Router_Topic,
- ProbeHost: default_ProbeHost,
- ProbePort: default_ProbePort,
+ InstanceID: default_InstanceID,
+ ROCoreEndpoint: default_ROCoreEndpoint,
+ GrpcHost: default_GrpcHost,
+ GrpcPort: default_GrpcPort,
+ KVStoreType: default_KVStoreType,
+ KVStoreTimeout: default_KVStoreTimeout,
+ KVStoreHost: default_KVStoreHost,
+ KVStorePort: default_KVStorePort,
+ KVTxnKeyDelTime: default_KVTxnKeyDelTime,
+ CoreTopic: default_CoreTopic,
+ LogLevel: default_LogLevel,
+ Banner: default_Banner,
+ DisplayVersionOnly: default_DisplayVersionOnly,
+ ROCoreKey: default_ROCoreKey,
+ ROCoreCert: default_ROCoreCert,
+ ROCoreCA: default_ROCoreCA,
+ AffinityRouterTopic: default_Affinity_Router_Topic,
+ ProbeHost: default_ProbeHost,
+ ProbePort: default_ProbePort,
+ LiveProbeInterval: default_LiveProbeInterval,
+ NotLiveProbeInterval: default_NotLiveProbeInterval,
}
return &roCoreFlag
}
@@ -151,6 +158,12 @@
help = fmt.Sprintf("The port on which to listen to answer liveness and readiness probe queries over HTTP.")
flag.IntVar(&(cf.ProbePort), "probe_port", default_ProbePort, help)
+ help = fmt.Sprintf("Time interval between liveness probes while in a live state")
+ flag.DurationVar(&(cf.LiveProbeInterval), "live_probe_interval", default_LiveProbeInterval, help)
+
+ help = fmt.Sprintf("Time interval between liveness probes while in a not live state")
+ flag.DurationVar(&(cf.NotLiveProbeInterval), "not_live_probe_interval", default_NotLiveProbeInterval, help)
+
flag.Parse()
containerName := getContainerInfo()
diff --git a/ro_core/core/core.go b/ro_core/core/core.go
index 4baa198..d022266 100644
--- a/ro_core/core/core.go
+++ b/ro_core/core/core.go
@@ -26,6 +26,7 @@
"github.com/opencord/voltha-lib-go/v2/pkg/probe"
"github.com/opencord/voltha-protos/v2/go/voltha"
"google.golang.org/grpc"
+ "time"
)
type Core struct {
@@ -42,6 +43,7 @@
localDataProxy *model.Proxy
exitChannel chan int
kvClient kvstore.Client
+ backend db.Backend
}
func init() {
@@ -55,19 +57,24 @@
core.config = cf
core.kvClient = kvClient
+ // Configure backend to push Liveness Status at least every (cf.LiveProbeInterval / 2)
+ // so as to avoid triggering a Liveness check (due to Liveness timeout) when the backend is alive
+ livenessChannelInterval := cf.LiveProbeInterval / 2
+
// Setup the KV store
// Do not call NewBackend constructor; it creates its own KV client
// Commented the backend for now until the issue between the model and the KV store
// is resolved.
- backend := db.Backend{
- Client: kvClient,
- StoreType: cf.KVStoreType,
- Host: cf.KVStoreHost,
- Port: cf.KVStorePort,
- Timeout: cf.KVStoreTimeout,
- PathPrefix: "service/voltha"}
- core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &backend)
- core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
+ core.backend = db.Backend{
+ Client: kvClient,
+ StoreType: cf.KVStoreType,
+ Host: cf.KVStoreHost,
+ Port: cf.KVStorePort,
+ Timeout: cf.KVStoreTimeout,
+ LivenessChannelInterval: livenessChannelInterval,
+ PathPrefix: "service/voltha"}
+ core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &core.backend)
+ core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &core.backend)
core.clusterDataProxy = core.clusterDataRoot.CreateProxy(context.Background(), "/", false)
core.localDataProxy = core.localDataRoot.CreateProxy(context.Background(), "/", false)
return &core
@@ -81,6 +88,7 @@
go core.startDeviceManager(ctx)
go core.startLogicalDeviceManager(ctx)
go core.startGRPCService(ctx)
+ go core.monitorKvstoreLiveness(ctx)
log.Info("adaptercore-started")
}
@@ -107,7 +115,7 @@
// and starts the server
func (core *Core) startGRPCService(ctx context.Context) {
// create an insecure gserver server
- core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false, nil)
+ core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false, probe.GetProbeFromContext(ctx))
log.Info("grpc-server-created")
core.grpcNBIAPIHandler = NewAPIHandler(core.genericMgr, core.deviceMgr, core.logicalDeviceMgr)
@@ -157,3 +165,68 @@
core.logicalDeviceMgr.start(ctx)
log.Info("started-Logical-DeviceManager")
}
+
+/*
+* Thread to monitor kvstore Liveness (connection status)
+*
+* This function constantly monitors Liveness State of kvstore as reported
+* periodically by backend and updates the Status of kv-store service registered
+* with ro_core probe.
+*
+* If no liveness event has been seen within a timeout, then the thread will
+* trigger a "liveness" check, which will in turn trigger a liveness event on
+* the liveness channel, true or false depending on whether the attempt succeeded.
+*
+* The gRPC server in turn monitors the state of the readiness probe and will
+* start issuing UNAVAILABLE responses while the probe is not ready.
+ */
+func (core *Core) monitorKvstoreLiveness(ctx context.Context) {
+ log.Info("start-monitoring-kvstore-liveness")
+
+ // Instruct backend to create Liveness channel for transporting state updates
+ livenessChannel := core.backend.EnableLivenessChannel()
+
+ log.Debug("enabled-kvstore-liveness-channel")
+
+ // Default state for kvstore is not alive
+ timeout := core.config.NotLiveProbeInterval
+ for {
+ timeoutTimer := time.NewTimer(timeout)
+ select {
+
+ case liveness := <-livenessChannel:
+ log.Debugw("received-liveness-change-notification", log.Fields{"liveness": liveness})
+
+ if !liveness {
+ probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
+
+ if core.grpcServer != nil {
+ log.Info("kvstore-set-server-notready")
+ }
+
+ timeout = core.config.NotLiveProbeInterval
+ } else {
+ probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
+
+ if core.grpcServer != nil {
+ log.Info("kvstore-set-server-ready")
+ }
+
+ timeout = core.config.LiveProbeInterval
+ }
+
+ if !timeoutTimer.Stop() {
+ <-timeoutTimer.C
+ }
+
+ case <-timeoutTimer.C:
+ log.Info("kvstore-perform-liveness-check-on-timeout")
+
+ // Trigger Liveness check if no liveness update received within the timeout period.
+ // The Liveness check will push Live state to same channel which this routine is
+ // reading and processing. Thus, do it asynchronously to avoid blocking for
+ // backend response and avoid any possibility of deadlock
+ go core.backend.PerformLivenessCheck(core.config.KVStoreTimeout)
+ }
+ }
+}
diff --git a/ro_core/main.go b/ro_core/main.go
index ddcf936..f1e00d2 100644
--- a/ro_core/main.go
+++ b/ro_core/main.go
@@ -122,7 +122,7 @@
ro.core = c.NewCore(ro.config.InstanceID, ro.config, ro.kvClient)
if p != nil {
- p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
+ p.UpdateStatus("kv-store", probe.ServiceStatusPrepared)
}
// start the core