[VOL-2099] Modified RO Core PoD Readiness state to reflect KVStore connection status
Change-Id: I38998fe75a99769ea24b72e2e0bb19d8c08e0529
diff --git a/ro_core/core/core.go b/ro_core/core/core.go
index 4baa198..d022266 100644
--- a/ro_core/core/core.go
+++ b/ro_core/core/core.go
@@ -26,6 +26,7 @@
"github.com/opencord/voltha-lib-go/v2/pkg/probe"
"github.com/opencord/voltha-protos/v2/go/voltha"
"google.golang.org/grpc"
+ "time"
)
type Core struct {
@@ -42,6 +43,7 @@
localDataProxy *model.Proxy
exitChannel chan int
kvClient kvstore.Client
+ backend db.Backend
}
func init() {
@@ -55,19 +57,24 @@
core.config = cf
core.kvClient = kvClient
+ // Configure backend to push Liveness Status at least every cf.LiveProbeInterval / 2 seconds
+ // so as to avoid trigger of Liveness check (due to Liveness timeout) when backend is alive
+ livenessChannelInterval := cf.LiveProbeInterval / 2
+
// Setup the KV store
// Do not call NewBackend constructor; it creates its own KV client
// Commented the backend for now until the issue between the model and the KV store
// is resolved.
- backend := db.Backend{
- Client: kvClient,
- StoreType: cf.KVStoreType,
- Host: cf.KVStoreHost,
- Port: cf.KVStorePort,
- Timeout: cf.KVStoreTimeout,
- PathPrefix: "service/voltha"}
- core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &backend)
- core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
+ core.backend = db.Backend{
+ Client: kvClient,
+ StoreType: cf.KVStoreType,
+ Host: cf.KVStoreHost,
+ Port: cf.KVStorePort,
+ Timeout: cf.KVStoreTimeout,
+ LivenessChannelInterval: livenessChannelInterval,
+ PathPrefix: "service/voltha"}
+ core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &core.backend)
+ core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &core.backend)
core.clusterDataProxy = core.clusterDataRoot.CreateProxy(context.Background(), "/", false)
core.localDataProxy = core.localDataRoot.CreateProxy(context.Background(), "/", false)
return &core
@@ -81,6 +88,7 @@
go core.startDeviceManager(ctx)
go core.startLogicalDeviceManager(ctx)
go core.startGRPCService(ctx)
+ go core.monitorKvstoreLiveness(ctx)
log.Info("adaptercore-started")
}
@@ -107,7 +115,7 @@
// and starts the server
func (core *Core) startGRPCService(ctx context.Context) {
// create an insecure gserver server
- core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false, nil)
+ core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false, probe.GetProbeFromContext(ctx))
log.Info("grpc-server-created")
core.grpcNBIAPIHandler = NewAPIHandler(core.genericMgr, core.deviceMgr, core.logicalDeviceMgr)
@@ -157,3 +165,68 @@
core.logicalDeviceMgr.start(ctx)
log.Info("started-Logical-DeviceManager")
}
+
+// monitorKvstoreLiveness tracks the kvstore connection status, as reported
+// periodically by the backend on its liveness channel, and mirrors it into the
+// "kv-store" service status registered with the ro_core probe.
+//
+// If no liveness event has been seen within a timeout, a liveness check is
+// triggered, which in turn pushes a liveness event onto the liveness channel:
+// true or false depending on whether the attempt succeeded. The timeout is
+// LiveProbeInterval after a live report and NotLiveProbeInterval otherwise.
+//
+// The gRPC server in turn monitors the state of the readiness probe and will
+// start issuing UNAVAILABLE responses while the probe is not ready.
+//
+// It is meant to run in its own goroutine and returns once ctx is cancelled.
+func (core *Core) monitorKvstoreLiveness(ctx context.Context) {
+	log.Info("start-monitoring-kvstore-liveness")
+
+	// Instruct backend to create Liveness channel for transporting state updates
+	livenessChannel := core.backend.EnableLivenessChannel()
+
+	log.Debug("enabled-kvstore-liveness-channel")
+
+	// Default state for kvstore is not alive
+	timeout := core.config.NotLiveProbeInterval
+	for {
+		timeoutTimer := time.NewTimer(timeout)
+		select {
+		case <-ctx.Done():
+			// ctx cancelled: release the pending timer and end this goroutine
+			timeoutTimer.Stop()
+			return
+
+		case liveness := <-livenessChannel:
+			log.Debugw("received-liveness-change-notification", log.Fields{"liveness": liveness})
+
+			if !liveness {
+				probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
+				if core.grpcServer != nil {
+					log.Info("kvstore-set-server-notready")
+				}
+				timeout = core.config.NotLiveProbeInterval
+			} else {
+				probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
+				if core.grpcServer != nil {
+					log.Info("kvstore-set-server-ready")
+				}
+				timeout = core.config.LiveProbeInterval
+			}
+
+			// Discard the pending timer; drain its channel if it already fired
+			if !timeoutTimer.Stop() {
+				<-timeoutTimer.C
+			}
+
+		case <-timeoutTimer.C:
+			log.Info("kvstore-perform-liveness-check-on-timeout")
+			// Trigger Liveness check if no liveness update received within the timeout period.
+			// The Liveness check will push Live state to same channel which this routine is
+			// reading and processing. Thus, do it asynchronously to avoid blocking for
+			// backend response and avoid any possibility of deadlock
+			go core.backend.PerformLivenessCheck(core.config.KVStoreTimeout)
+		}
+	}
+}