[VOL-2230] Update RW Core k8s Ready state based on backend kvstore connection status
Change-Id: I51e8fb4ed97facad5f74780fe70c51e6783958ab
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index bb1ace1..68dca2c 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -19,6 +19,7 @@
"flag"
"fmt"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "time"
)
// RW Core service default constants
@@ -53,9 +54,9 @@
default_CoreBindingKey = "voltha_backend_name"
default_CorePairTopic = "rwcore_1"
default_MaxConnectionRetries = -1 // retries forever
- default_ConnectionRetryInterval = 2 // in seconds
- default_LiveProbeInterval = 60 // in seconds
- default_NotLiveProbeInterval = 60 // in seconds
+ default_ConnectionRetryInterval = 2 * time.Second
+ default_LiveProbeInterval = 60 * time.Second
+ default_NotLiveProbeInterval = 5 * time.Second // Probe more frequently when not alive
default_ProbeHost = ""
default_ProbePort = 8080
)
@@ -91,9 +92,9 @@
CoreBindingKey string
CorePairTopic string
MaxConnectionRetries int
- ConnectionRetryInterval int
- LiveProbeInterval int
- NotLiveProbeInterval int
+ ConnectionRetryInterval time.Duration
+ LiveProbeInterval time.Duration
+ NotLiveProbeInterval time.Duration
ProbeHost string
ProbePort int
}
@@ -223,13 +224,13 @@
flag.IntVar(&(cf.MaxConnectionRetries), "max_connection_retries", default_MaxConnectionRetries, help)
help = fmt.Sprintf("The number of seconds between each connection retry attempt")
- flag.IntVar(&(cf.ConnectionRetryInterval), "connection_retry_interval", default_ConnectionRetryInterval, help)
+ flag.DurationVar(&(cf.ConnectionRetryInterval), "connection_retry_interval", default_ConnectionRetryInterval, help)
help = fmt.Sprintf("The number of seconds between liveness probes while in a live state")
- flag.IntVar(&(cf.LiveProbeInterval), "live_probe_interval", default_LiveProbeInterval, help)
+ flag.DurationVar(&(cf.LiveProbeInterval), "live_probe_interval", default_LiveProbeInterval, help)
help = fmt.Sprintf("The number of seconds between liveness probes while in a not live state")
- flag.IntVar(&(cf.NotLiveProbeInterval), "not_live_probe_interval", default_NotLiveProbeInterval, help)
+ flag.DurationVar(&(cf.NotLiveProbeInterval), "not_live_probe_interval", default_NotLiveProbeInterval, help)
help = fmt.Sprintf("The host on which to listen to answer liveness and readiness probe queries over HTTP.")
flag.StringVar(&(cf.ProbeHost), "probe_host", default_ProbeHost, help)
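
Note that because these options are now registered with flag.DurationVar, their values are parsed as Go durations and need a unit suffix ("2s", "60s", "500ms"); a bare integer such as "60" no longer parses. A minimal standalone sketch of the new parsing behaviour (not part of this change; only the live_probe_interval flag name and its default are taken from the patch above):

    package main

    import (
        "flag"
        "fmt"
        "time"
    )

    func main() {
        var liveProbeInterval time.Duration
        fs := flag.NewFlagSet("rw_core", flag.ContinueOnError)
        fs.DurationVar(&liveProbeInterval, "live_probe_interval", 60*time.Second,
            "The duration between liveness probes while in a live state")

        // "60s" parses into a time.Duration; a bare "60" would fail with
        // "time: missing unit in duration".
        if err := fs.Parse([]string{"-live_probe_interval", "60s"}); err != nil {
            fmt.Println("parse error:", err)
            return
        }
        fmt.Println(liveProbeInterval) // prints 1m0s
    }
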
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index b74de4a..047ef4a 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -47,6 +47,7 @@
localDataProxy *model.Proxy
exitChannel chan int
kvClient kvstore.Client
+ backend db.Backend
kafkaClient kafka.Client
deviceOwnership *DeviceOwnership
}
@@ -63,16 +64,21 @@
core.kvClient = kvClient
core.kafkaClient = kafkaClient
+ // Configure the backend to push a liveness status update at least every (cf.LiveProbeInterval / 2),
+ // so that a liveness check is not triggered (by the liveness timeout) while the backend is alive.
+ livenessChannelInterval := cf.LiveProbeInterval / 2
+
// Setup the KV store
- backend := db.Backend{
- Client: kvClient,
- StoreType: cf.KVStoreType,
- Host: cf.KVStoreHost,
- Port: cf.KVStorePort,
- Timeout: cf.KVStoreTimeout,
- PathPrefix: cf.KVStoreDataPrefix}
- core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &backend)
- core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
+ core.backend = db.Backend{
+ Client: kvClient,
+ StoreType: cf.KVStoreType,
+ Host: cf.KVStoreHost,
+ Port: cf.KVStorePort,
+ Timeout: cf.KVStoreTimeout,
+ LivenessChannelInterval: livenessChannelInterval,
+ PathPrefix: cf.KVStoreDataPrefix}
+ core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &core.backend)
+ core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &core.backend)
core.clusterDataProxy = core.clusterDataRoot.CreateProxy(context.Background(), "/", false)
core.localDataProxy = core.localDataRoot.CreateProxy(context.Background(), "/", false)
return &core
@@ -131,6 +137,7 @@
go core.startLogicalDeviceManager(ctx)
go core.startGRPCService(ctx)
go core.startAdapterManager(ctx)
+ go core.monitorKvstoreLiveness(ctx)
// Setup device ownership context
core.deviceOwnership = NewDeviceOwnership(core.instanceId, core.kvClient, core.deviceMgr, core.logicalDeviceMgr,
@@ -254,7 +261,7 @@
- * though the current default is that both are set to 60 seconds.
+ * though the current defaults are 60 seconds while live and 5 seconds while not live.
*/
-func (core *Core) startKafkaManager(ctx context.Context, startupRetryInterval int, liveProbeInterval int, notLiveProbeInterval int) {
+func (core *Core) startKafkaManager(ctx context.Context, startupRetryInterval time.Duration, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration) {
log.Infow("starting-kafka-manager-thread", log.Fields{"host": core.config.KafkaAdapterHost,
"port": core.config.KafkaAdapterPort, "topic": core.config.CoreTopic})
@@ -267,7 +274,7 @@
// Don't worry about liveness, as we can't be live until we've started.
probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
log.Infow("error-starting-kafka-messaging-proxy", log.Fields{"error": err})
- time.Sleep(time.Duration(startupRetryInterval) * time.Second)
+ time.Sleep(startupRetryInterval)
} else {
// We started. We only need to do this once.
// Next we'll fall through and start checking liveness.
@@ -288,7 +295,7 @@
log.Info("enabled-kafka-liveness-channel")
- timeout := time.Duration(liveProbeInterval) * time.Second
+ timeout := liveProbeInterval
for {
timeoutTimer := time.NewTimer(timeout)
select {
@@ -303,7 +310,7 @@
}
// retry frequently while life is bad
- timeout = time.Duration(notLiveProbeInterval) * time.Second
+ timeout = notLiveProbeInterval
} else {
probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
@@ -312,7 +319,7 @@
}
// retry infrequently while life is good
- timeout = time.Duration(liveProbeInterval) * time.Second
+ timeout = liveProbeInterval
}
if !timeoutTimer.Stop() {
<-timeoutTimer.C
@@ -333,7 +340,7 @@
}
// waitUntilKVStoreReachableOrMaxTries will wait until it can connect to a KV store or until maxtries has been reached
-func (core *Core) waitUntilKVStoreReachableOrMaxTries(ctx context.Context, maxRetries int, retryInterval int) error {
+func (core *Core) waitUntilKVStoreReachableOrMaxTries(ctx context.Context, maxRetries int, retryInterval time.Duration) error {
log.Infow("verifying-KV-store-connectivity", log.Fields{"host": core.config.KVStoreHost,
"port": core.config.KVStorePort, "retries": maxRetries, "retryInterval": retryInterval})
// Get timeout in seconds with 1 second set as minimum
@@ -352,7 +359,7 @@
}
count += 1
// Take a nap before retrying
- time.Sleep(time.Duration(retryInterval) * time.Second)
+ time.Sleep(retryInterval)
log.Infow("retry-KV-store-connectivity", log.Fields{"retryCount": count, "maxRetries": maxRetries, "retryInterval": retryInterval})
} else {
@@ -402,3 +409,69 @@
core.adapterMgr.start(ctx)
log.Info("Adapter-Manager-Started")
}
+
+/*
+* Thread to monitor kvstore Liveness (connection status)
+*
+* This function continuously monitors the liveness state of the kvstore, as reported
+* periodically by the backend, and updates the status of the kv-store service
+* registered with the rw_core probe.
+*
+* If no liveness event has been seen within a timeout, the thread performs a
+* "liveness" check attempt, which in turn triggers a liveness event on the
+* liveness channel, true or false depending on whether the attempt succeeded.
+*
+* The gRPC server in turn monitors the state of the readiness probe and starts
+* issuing UNAVAILABLE responses while the probe is not ready.
+ */
+func (core *Core) monitorKvstoreLiveness(ctx context.Context) {
+ log.Info("start-monitoring-kvstore-liveness")
+
+ // Instruct backend to create Liveness channel for transporting state updates
+ livenessChannel := core.backend.EnableLivenessChannel()
+
+ log.Debug("enabled-kvstore-liveness-channel")
+
+ // The kvstore is assumed alive initially, so start with the live probe interval as the timeout
+ timeout := core.config.LiveProbeInterval
+ for {
+ timeoutTimer := time.NewTimer(timeout)
+ select {
+
+ case liveness := <-livenessChannel:
+ log.Debugw("received-liveness-change-notification", log.Fields{"liveness": liveness})
+
+ if !liveness {
+ probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
+
+ if core.grpcServer != nil {
+ log.Info("kvstore-set-server-notready")
+ }
+
+ timeout = core.config.NotLiveProbeInterval
+
+ } else {
+ probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
+
+ if core.grpcServer != nil {
+ log.Info("kvstore-set-server-ready")
+ }
+
+ timeout = core.config.LiveProbeInterval
+ }
+
+ if !timeoutTimer.Stop() {
+ <-timeoutTimer.C
+ }
+
+ case <-timeoutTimer.C:
+ log.Info("kvstore-perform-liveness-check-on-timeout")
+
+ // Trigger a liveness check if no liveness update was received within the timeout period.
+ // The liveness check pushes the resulting state onto the same channel that this
+ // routine reads and processes. Thus, run it asynchronously to avoid blocking on the
+ // backend response and to rule out any possibility of deadlock.
+ go core.backend.PerformLivenessCheck(core.config.KVStoreTimeout)
+ }
+ }
+}
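
The loop above stops and drains the timer at the end of the liveness-event branch; without that, a timer that fired while the event was being handled would leave a stale tick behind and distort the next wait. A standalone sketch of the same select/timer pattern against a mocked liveness channel (no real db.Backend involved; names are illustrative only):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        liveness := make(chan bool)

        // Mock backend: report alive twice, then go silent so the timeout path fires.
        go func() {
            liveness <- true
            liveness <- true
        }()

        timeout := 100 * time.Millisecond
        for i := 0; i < 4; i++ {
            timeoutTimer := time.NewTimer(timeout)
            select {
            case alive := <-liveness:
                fmt.Println("liveness event:", alive)
                // The event won the race; stop the timer and, if it already fired,
                // drain its channel so the next iteration starts clean.
                if !timeoutTimer.Stop() {
                    <-timeoutTimer.C
                }
            case <-timeoutTimer.C:
                // In rw_core this is where PerformLivenessCheck would be kicked off
                // in its own goroutine, feeding its result back through the channel.
                fmt.Println("no event within", timeout)
            }
        }
    }

Running the real check in its own goroutine matters because its result arrives on the very channel this loop reads, so a synchronous call could deadlock.
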
diff --git a/rw_core/main.go b/rw_core/main.go
index faddd57..8ad3833 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -143,7 +143,7 @@
rw.config.KafkaAdapterHost,
rw.config.KafkaAdapterPort,
instanceId,
- time.Duration(rw.config.LiveProbeInterval)*time.Second/2); err != nil {
+ rw.config.LiveProbeInterval/2); err != nil {
log.Fatal("Unsupported-kafka-client")
}
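
For reference, dividing a time.Duration by an untyped integer constant stays a time.Duration, so rw.config.LiveProbeInterval/2 yields exactly the value the previous time.Duration(rw.config.LiveProbeInterval)*time.Second/2 expression produced when the config held the same number of seconds. A trivial standalone check (standard library only, values illustrative):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        liveProbeInterval := 60 * time.Second
        fmt.Println(liveProbeInterval / 2)               // 30s (new expression)
        fmt.Println(time.Duration(60) * time.Second / 2) // 30s (old expression, config value 60)
    }
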