[VOL-1719] Kubernetes Probes for R/O Core
Change-Id: I5a9e8963f312aa75cd7ca7c38440850f4cfae53f
diff --git a/ro_core/config/config.go b/ro_core/config/config.go
index 030134b..be78228 100644
--- a/ro_core/config/config.go
+++ b/ro_core/config/config.go
@@ -43,6 +43,7 @@
default_ROCoreCert = "pki/voltha.crt"
default_ROCoreCA = "pki/voltha-CA.pem"
default_Affinity_Router_Topic = "affinityRouter"
+ default_ProbePort = 8080
)
// ROCoreFlags represents the set of configurations used by the read-only core service
@@ -65,6 +66,7 @@
ROCoreCert string
ROCoreCA string
AffinityRouterTopic string
+ ProbePort int
}
func init() {
@@ -91,6 +93,7 @@
ROCoreCert: default_ROCoreCert,
ROCoreCA: default_ROCoreCA,
AffinityRouterTopic: default_Affinity_Router_Topic,
+ ProbePort: default_ProbePort,
}
return &roCoreFlag
}
@@ -139,6 +142,9 @@
help = fmt.Sprintf("Show version information and exit")
flag.BoolVar(&cf.DisplayVersionOnly, "version", default_DisplayVersionOnly, help)
+ help = fmt.Sprintf("The port on which to listen to answer liveness and readiness probe queries over HTTP.")
+ flag.IntVar(&(cf.ProbePort), "probe_port", default_ProbePort, help)
+
flag.Parse()
containerName := getContainerInfo()
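
With the new probe_port flag in place, the R/O core answers liveness and readiness queries over plain HTTP on that port (8080 by default). Below is a minimal sketch of how one could poke the endpoint by hand, e.g. from inside the container; the /healthz and /readz paths are assumptions here, since the actual routes are defined by the common/probe package and are not part of this change.

package main

import (
	"fmt"
	"net/http"
)

// Manually query the probe server started by the R/O core. The paths below
// are assumptions; the real routes come from common/probe, not this change.
func main() {
	for _, path := range []string{"/healthz", "/readz"} {
		resp, err := http.Get("http://localhost:8080" + path)
		if err != nil {
			fmt.Println(path, "error:", err)
			continue
		}
		fmt.Println(path, resp.Status)
		resp.Body.Close()
	}
}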
diff --git a/ro_core/core/core.go b/ro_core/core/core.go
index 54cd455..cd27a42 100644
--- a/ro_core/core/core.go
+++ b/ro_core/core/core.go
@@ -19,6 +19,7 @@
"context"
grpcserver "github.com/opencord/voltha-go/common/grpc"
"github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/common/probe"
"github.com/opencord/voltha-go/db/kvstore"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/ro_core/config"
@@ -113,9 +114,24 @@
core.grpcServer.AddService(f)
log.Info("grpc-service-added")
+ /*
+ * Start the GRPC server
+ *
+ * This is somewhat sub-optimal: grpcServer.Start blocks and does not return
+ * until something fails, yet we still want to send a "start" status update.
+ * As written, the update is therefore sent just before the server actually
+ * starts, so there is a small window in which the core could report itself
+ * as ready when it really isn't.
+ */
+ probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusRunning)
+
// Start the server
- core.grpcServer.Start(context.Background())
log.Info("grpc-server-started")
+ core.grpcServer.Start(context.Background())
+
+ probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusStopped)
}
func (core *Core) startDeviceManager(ctx context.Context) {
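
The Running/Stopped updates above (and the matching ones in the device and logical device managers below) rely on the probe having been attached to the context in main.go. A rough, self-contained sketch of how a context-based helper like probe.UpdateStatusFromContext could behave follows; the stand-in types and the context key value are assumptions included only to keep the sketch compilable, the real definitions live in common/probe.

package probe

import "context"

// Minimal stand-ins for the real package types, included only to make the
// sketch self-contained; these are assumptions, not the common/probe code.
type ServiceStatus int

const (
	ServiceStatusRunning ServiceStatus = iota + 1
	ServiceStatusStopped
)

type contextKey string

// The actual key value used by common/probe is an assumption here.
const ProbeContextKey = contextKey("status-update-probe")

type Probe struct {
	status map[string]ServiceStatus
}

// UpdateStatus records the latest status reported for a named service.
func (p *Probe) UpdateStatus(name string, status ServiceStatus) {
	if p.status == nil {
		p.status = make(map[string]ServiceStatus)
	}
	p.status[name] = status
}

// UpdateStatusFromContext looks for a probe stored under ProbeContextKey and,
// if one is present, forwards the update; otherwise it is a no-op.
func UpdateStatusFromContext(ctx context.Context, name string, status ServiceStatus) {
	if value := ctx.Value(ProbeContextKey); value != nil {
		if p, ok := value.(*Probe); ok {
			p.UpdateStatus(name, status)
		}
	}
}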
diff --git a/ro_core/core/device_manager.go b/ro_core/core/device_manager.go
index 90c7822..c42eee3 100644
--- a/ro_core/core/device_manager.go
+++ b/ro_core/core/device_manager.go
@@ -18,6 +18,7 @@
import (
"context"
"github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/common/probe"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-protos/go/voltha"
"google.golang.org/grpc/codes"
@@ -44,12 +45,14 @@
func (dMgr *DeviceManager) start(ctx context.Context, logicalDeviceMgr *LogicalDeviceManager) {
log.Info("starting-device-manager")
dMgr.logicalDeviceMgr = logicalDeviceMgr
+ probe.UpdateStatusFromContext(ctx, "device-manager", probe.ServiceStatusRunning)
log.Info("device-manager-started")
}
func (dMgr *DeviceManager) stop(ctx context.Context) {
log.Info("stopping-device-manager")
dMgr.exitChannel <- 1
+ probe.UpdateStatusFromContext(ctx, "device-manager", probe.ServiceStatusStopped)
log.Info("device-manager-stopped")
}
diff --git a/ro_core/core/logical_device_manager.go b/ro_core/core/logical_device_manager.go
index db220d5..215a406 100644
--- a/ro_core/core/logical_device_manager.go
+++ b/ro_core/core/logical_device_manager.go
@@ -18,6 +18,7 @@
import (
"context"
"github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/common/probe"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-protos/go/voltha"
"google.golang.org/grpc/codes"
@@ -49,12 +50,14 @@
func (ldMgr *LogicalDeviceManager) start(ctx context.Context) {
log.Info("starting-logical-device-manager")
+ probe.UpdateStatusFromContext(ctx, "logical-device-manager", probe.ServiceStatusRunning)
log.Info("logical-device-manager-started")
}
func (ldMgr *LogicalDeviceManager) stop(ctx context.Context) {
log.Info("stopping-logical-device-manager")
ldMgr.exitChannel <- 1
+ probe.UpdateStatusFromContext(ctx, "logical-device-manager", probe.ServiceStatusStopped)
log.Info("logical-device-manager-stopped")
}
diff --git a/ro_core/main.go b/ro_core/main.go
index 2313fd3..27ada02 100644
--- a/ro_core/main.go
+++ b/ro_core/main.go
@@ -21,6 +21,7 @@
"fmt"
grpcserver "github.com/opencord/voltha-go/common/grpc"
"github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/common/probe"
"github.com/opencord/voltha-go/common/version"
"github.com/opencord/voltha-go/db/kvstore"
"github.com/opencord/voltha-go/ro_core/config"
@@ -95,6 +96,20 @@
func (ro *roCore) start(ctx context.Context) {
log.Info("Starting RW Core components")
+ // If the context has a probe then fetch it and register our services
+ var p *probe.Probe
+ if value := ctx.Value(probe.ProbeContextKey); value != nil {
+ if _, ok := value.(*probe.Probe); ok {
+ p = value.(*probe.Probe)
+ p.RegisterService(
+ "kv-store",
+ "device-manager",
+ "logical-device-manager",
+ "grpc-service",
+ )
+ }
+ }
+
// Setup KV Client
log.Debugw("create-kv-client", log.Fields{"kvstore": ro.config.KVStoreType})
@@ -106,11 +121,15 @@
// Create the core service
ro.core = c.NewCore(ro.config.InstanceID, ro.config, ro.kvClient)
+ if p != nil {
+ p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
+ }
+
// start the core
ro.core.Start(ctx)
}
-func (ro *roCore) stop() {
+func (ro *roCore) stop(ctx context.Context) {
// Stop leadership tracking
ro.halted = true
@@ -127,7 +146,7 @@
ro.kvClient.Close()
}
- ro.core.Stop(nil)
+ ro.core.Stop(ctx)
}
func waitForExit() int {
@@ -210,17 +229,31 @@
log.Infow("ro-core-config", log.Fields{"config": *cf})
+ // Create the RO Core
+ ro := newROCore(cf)
+
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- ro := newROCore(cf)
- go ro.start(ctx)
+ /*
+ * Create and start the liveness and readiness container management probes. This
+ * is done in the main function so that, even if main starts multiple other
+ * objects, there is a single probe end point for the process.
+ */
+ p := &probe.Probe{}
+ go p.ListenAndServe(ro.config.ProbePort)
+
+ // Add the probe to the context to pass to all the services started
+ probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
+
+ // Start the RO core
+ go ro.start(probeCtx)
code := waitForExit()
log.Infow("received-a-closing-signal", log.Fields{"code": code})
// Cleanup before leaving
- ro.stop()
+ ro.stop(probeCtx)
elapsed := time.Since(start)
log.Infow("ro-core-run-time", log.Fields{"core": ro.config.InstanceID, "time": elapsed / time.Second})