VOL-1723 - add readiness probe capability to rw-core
Change-Id: I1cf42e88712586f140a2dfa9d0b638b48261caac
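The pattern used throughout this change: main creates a single probe.Probe,
registers the names of the services it expects to come up, and each
component reports its own status as it starts and stops. For orientation,
here is a minimal sketch of the kind of state common/probe is assumed to
keep. The exported names are taken from the call sites below; the
implementation and the context-key type are illustrative, not the actual
common/probe code:

    package probe

    import "sync"

    type ServiceStatus int

    const (
        ServiceStatusUnknown ServiceStatus = iota
        ServiceStatusRunning
        ServiceStatusStopped
    )

    // ProbeContextKey is the key under which a *Probe is carried in a
    // context.Context. The concrete key type here is an assumption.
    type contextKey string

    const ProbeContextKey = contextKey("probe")

    // Probe tracks the status of a set of named services and answers
    // liveness and readiness queries over HTTP (see ListenAndServe,
    // sketched at the end of this change).
    type Probe struct {
        mutex  sync.RWMutex
        status map[string]ServiceStatus
    }

    // RegisterService adds services in an initial "unknown" state.
    func (p *Probe) RegisterService(names ...string) {
        p.mutex.Lock()
        defer p.mutex.Unlock()
        if p.status == nil {
            p.status = make(map[string]ServiceStatus)
        }
        for _, name := range names {
            if _, ok := p.status[name]; !ok {
                p.status[name] = ServiceStatusUnknown
            }
        }
    }

    // UpdateStatus records the current status of a single service.
    func (p *Probe) UpdateStatus(name string, status ServiceStatus) {
        p.mutex.Lock()
        defer p.mutex.Unlock()
        if p.status == nil {
            p.status = make(map[string]ServiceStatus)
        }
        p.status[name] = status
    }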
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 133b1a4..f9f1d3e 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -54,6 +54,7 @@
default_CorePairTopic = "rwcore_1"
default_MaxConnectionRetries = -1 // retries forever
default_ConnectionRetryInterval = 2 // in seconds
+ default_ProbePort = 8080
)
// RWCoreFlags represents the set of configurations used by the read-write core service
@@ -88,6 +89,7 @@
CorePairTopic string
MaxConnectionRetries int
ConnectionRetryInterval int
+ ProbePort int
}
func init() {
@@ -126,6 +128,7 @@
CorePairTopic: default_CorePairTopic,
MaxConnectionRetries: default_MaxConnectionRetries,
ConnectionRetryInterval: default_ConnectionRetryInterval,
+ ProbePort: default_ProbePort,
}
return &rwCoreFlag
}
@@ -213,5 +216,8 @@
help = fmt.Sprintf("The number of seconds between each connection retry attempt ")
flag.IntVar(&(cf.ConnectionRetryInterval), "connection_retry_interval", default_ConnectionRetryInterval, help)
+ help = fmt.Sprintf("The port on which to listen to answer liveness and readiness probe queries over HTTP.")
+ flag.IntVar(&(cf.ProbePort), "probe_port", default_ProbePort, help)
+
flag.Parse()
}
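With the flag in place, a deployment (or a developer) can point an HTTP
client at the configured port to check the core's health. A quick sketch
of such a check; the /readz path is an assumption about the handler
naming in common/probe and may differ:

    package main

    import (
        "fmt"
        "net/http"
    )

    func main() {
        // Query the port configured by --probe_port (default 8080).
        // The /readz path is assumed, not confirmed by this change.
        resp, err := http.Get("http://localhost:8080/readz")
        if err != nil {
            fmt.Println("probe not reachable:", err)
            return
        }
        defer resp.Body.Close()
        // By convention a 2xx answer means ready; anything else means at
        // least one registered service is not yet running.
        fmt.Println("readiness:", resp.Status)
    }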
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index 0ce1828..188ae3d 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -21,6 +21,7 @@
"fmt"
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/common/probe"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-protos/go/voltha"
"reflect"
@@ -127,13 +128,14 @@
// Register the callbacks
aMgr.adapterProxy.RegisterCallback(model.POST_UPDATE, aMgr.adapterUpdated)
aMgr.deviceTypeProxy.RegisterCallback(model.POST_UPDATE, aMgr.deviceTypesUpdated)
-
+ probe.UpdateStatusFromContext(ctx, "adapter-manager", probe.ServiceStatusRunning)
log.Info("adapter-manager-started")
}
func (aMgr *AdapterManager) stop(ctx context.Context) {
log.Info("stopping-device-manager")
aMgr.exitChannel <- 1
+ probe.UpdateStatusFromContext(ctx, "adapter-manager", probe.ServiceStatusStopped)
log.Info("device-manager-stopped")
}
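The managers report status through probe.UpdateStatusFromContext rather
than holding a *Probe directly, so they work unchanged whether or not a
probe is attached. A plausible shape for that helper, continuing the
sketch from the commit message and mirroring the context lookup that
core.go performs explicitly below (add import "context"); again a sketch,
not the actual common/probe code:

    // UpdateStatusFromContext looks up the *Probe stored in ctx under
    // ProbeContextKey and updates the named service. If the context
    // carries no probe it does nothing, so callers never need to know
    // whether probing is enabled.
    func UpdateStatusFromContext(ctx context.Context, name string, status ServiceStatus) {
        if value := ctx.Value(ProbeContextKey); value != nil {
            if p, ok := value.(*Probe); ok {
                p.UpdateStatus(name, status)
            }
        }
    }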
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 4ddea05..e74e869 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -19,6 +19,7 @@
"context"
grpcserver "github.com/opencord/voltha-go/common/grpc"
"github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/common/probe"
"github.com/opencord/voltha-go/db/kvstore"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/kafka"
@@ -77,16 +78,39 @@
}
func (core *Core) Start(ctx context.Context) {
+
+ // If the context has a probe then fetch it and register our services
+ var p *probe.Probe
+ if value := ctx.Value(probe.ProbeContextKey); value != nil {
+ if pr, ok := value.(*probe.Probe); ok {
+ p = pr
+ p.RegisterService(
+ "message-bus",
+ "kv-store",
+ "device-manager",
+ "logical-device-manager",
+ "adapter-manager",
+ "grpc-service",
+ )
+ }
+ }
+
log.Info("starting-core-services", log.Fields{"coreId": core.instanceId})
// Wait until connection to KV Store is up
if err := core.waitUntilKVStoreReachableOrMaxTries(ctx, core.config.MaxConnectionRetries, core.config.ConnectionRetryInterval); err != nil {
log.Fatal("Unable-to-connect-to-KV-store")
}
+ if p != nil {
+ p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
+ }
if err := core.waitUntilKafkaMessagingProxyIsUpOrMaxTries(ctx, core.config.MaxConnectionRetries, core.config.ConnectionRetryInterval); err != nil {
log.Fatal("Failure-starting-kafkaMessagingProxy")
}
+ if p != nil {
+ p.UpdateStatus("message-bus", probe.ServiceStatusRunning)
+ }
log.Debugw("values", log.Fields{"kmp": core.kmp})
core.deviceMgr = newDeviceManager(core)
@@ -141,9 +165,21 @@
core.grpcServer.AddService(f)
log.Info("grpc-service-added")
- // Start the server
- core.grpcServer.Start(context.Background())
+ /*
+ * Start the GRPC server
+ *
+ * This is a bit sub-optimal: grpcServer.Start blocks until the server
+ * exits (typically on failure), yet we want to publish a "running"
+ * status update, so the update has to be sent before the call. As
+ * written, the status can therefore read "running" before the server
+ * has actually started, leaving a small window in which the core could
+ * report itself as ready when it is not yet serving requests. (One way
+ * to narrow this window is sketched after this file's diff.)
+ */
+ probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusRunning)
log.Info("grpc-server-started")
+ core.grpcServer.Start(context.Background())
+ probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusStopped)
}
func (core *Core) waitUntilKafkaMessagingProxyIsUpOrMaxTries(ctx context.Context, maxRetries int, retryInterval int) error {
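The comment above concedes a window in which "grpc-service" reads as
running before the server actually is. One conventional way to narrow
that window, if the grpc wrapper exposed its listener, is to open the
listener first, report "running" once the port is accepting connections,
and only then hand off to the blocking Serve call. A sketch against
google.golang.org/grpc directly; the address and the wrapper change are
assumptions, not part of this change:

    package main

    import (
        "log"
        "net"

        "google.golang.org/grpc"
    )

    func main() {
        // Opening the listener before reporting "running" means the port
        // is already accepting connections when the status flips.
        lis, err := net.Listen("tcp", ":50057")
        if err != nil {
            log.Fatal(err)
        }
        s := grpc.NewServer()
        // ... register services, then update the probe:
        // probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusRunning)
        if err := s.Serve(lis); err != nil { // blocks until the server stops
            log.Fatal(err)
        }
        // probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusStopped)
    }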
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index f5b017c..91c359a 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -19,6 +19,7 @@
"context"
"errors"
"github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/common/probe"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/kafka"
"github.com/opencord/voltha-go/rw_core/utils"
@@ -71,12 +72,14 @@
log.Info("starting-device-manager")
dMgr.logicalDeviceMgr = logicalDeviceMgr
dMgr.stateTransitions = NewTransitionMap(dMgr)
+ probe.UpdateStatusFromContext(ctx, "device-manager", probe.ServiceStatusRunning)
log.Info("device-manager-started")
}
func (dMgr *DeviceManager) stop(ctx context.Context) {
log.Info("stopping-device-manager")
dMgr.exitChannel <- 1
+ probe.UpdateStatusFromContext(ctx, "device-manager", probe.ServiceStatusStopped)
log.Info("device-manager-stopped")
}
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 235aca5..fa9713f 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -19,6 +19,7 @@
"context"
"errors"
"github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/common/probe"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/kafka"
"github.com/opencord/voltha-protos/go/openflow_13"
@@ -62,12 +63,14 @@
func (ldMgr *LogicalDeviceManager) start(ctx context.Context) {
log.Info("starting-logical-device-manager")
+ probe.UpdateStatusFromContext(ctx, "logical-device-manager", probe.ServiceStatusRunning)
log.Info("logical-device-manager-started")
}
func (ldMgr *LogicalDeviceManager) stop(ctx context.Context) {
log.Info("stopping-logical-device-manager")
ldMgr.exitChannel <- 1
+ probe.UpdateStatusFromContext(ctx, "logical-device-manager", probe.ServiceStatusStopped)
log.Info("logical-device-manager-stopped")
}
diff --git a/rw_core/main.go b/rw_core/main.go
index 9ace2fb..6f06576 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -21,6 +21,7 @@
"fmt"
grpcserver "github.com/opencord/voltha-go/common/grpc"
"github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/common/probe"
"github.com/opencord/voltha-go/common/version"
"github.com/opencord/voltha-go/db/kvstore"
"github.com/opencord/voltha-go/kafka"
@@ -149,7 +150,7 @@
rw.core.Start(ctx)
}
-func (rw *rwCore) stop() {
+func (rw *rwCore) stop(ctx context.Context) {
// Stop leadership tracking
rw.halted = true
@@ -166,7 +167,7 @@
rw.kvClient.Close()
}
- rw.core.Stop(nil)
+ rw.core.Stop(ctx)
//if rw.kafkaClient != nil {
// rw.kafkaClient.Stop()
@@ -262,17 +263,32 @@
log.Infow("rw-core-config", log.Fields{"config": *cf})
+ // Create the core
+ rw := newRWCore(cf)
+
+ // Create a cancellable base context; the probe is attached to it below
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- rw := newRWCore(cf)
- go rw.start(ctx, instanceId)
+ /*
+ * Create and start the liveness and readiness probe server. This is done
+ * in the main function so that, even if main starts multiple other
+ * objects, there is a single probe endpoint for the process.
+ */
+ p := &probe.Probe{}
+ go p.ListenAndServe(rw.config.ProbePort)
+
+ // Add the probe to the context so it is passed to all the services started
+ probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
+
+ // Start the core
+ go rw.start(probeCtx, instanceId)
code := waitForExit()
log.Infow("received-a-closing-signal", log.Fields{"code": code})
// Cleanup before leaving
- rw.stop()
+ rw.stop(probeCtx)
elapsed := time.Since(start)
log.Infow("rw-core-run-time", log.Fields{"core": instanceId, "time": elapsed / time.Second})