VOL-1723 - add readiness probe capability to rw-core

Change-Id: I1cf42e88712586f140a2dfa9d0b638b48261caac
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index 0ce1828..188ae3d 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -21,6 +21,7 @@
 	"fmt"
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/common/probe"
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-protos/go/voltha"
 	"reflect"
@@ -127,13 +128,14 @@
 	// Register the callbacks
 	aMgr.adapterProxy.RegisterCallback(model.POST_UPDATE, aMgr.adapterUpdated)
 	aMgr.deviceTypeProxy.RegisterCallback(model.POST_UPDATE, aMgr.deviceTypesUpdated)
-
+	probe.UpdateStatusFromContext(ctx, "adapter-manager", probe.ServiceStatusRunning)
 	log.Info("adapter-manager-started")
 }
 
 func (aMgr *AdapterManager) stop(ctx context.Context) {
 	log.Info("stopping-device-manager")
 	aMgr.exitChannel <- 1
+	probe.UpdateStatusFromContext(ctx, "adapter-manager", probe.ServiceStatusStopped)
 	log.Info("device-manager-stopped")
 }
 
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 4ddea05..e74e869 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -19,6 +19,7 @@
 	"context"
 	grpcserver "github.com/opencord/voltha-go/common/grpc"
 	"github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/common/probe"
 	"github.com/opencord/voltha-go/db/kvstore"
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/kafka"
@@ -77,16 +78,39 @@
 }
 
 func (core *Core) Start(ctx context.Context) {
+
+	// If the context has a probe then fetch it and register our services
+	var p *probe.Probe
+	if value := ctx.Value(probe.ProbeContextKey); value != nil {
+		if _, ok := value.(*probe.Probe); ok {
+			p = value.(*probe.Probe)
+			p.RegisterService(
+				"message-bus",
+				"kv-store",
+				"device-manager",
+				"logical-device-manager",
+				"adapter-manager",
+				"grpc-service",
+			)
+		}
+	}
+
 	log.Info("starting-core-services", log.Fields{"coreId": core.instanceId})
 
 	// Wait until connection to KV Store is up
 	if err := core.waitUntilKVStoreReachableOrMaxTries(ctx, core.config.MaxConnectionRetries, core.config.ConnectionRetryInterval); err != nil {
 		log.Fatal("Unable-to-connect-to-KV-store")
 	}
+	if p != nil {
+		p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
+	}
 
 	if err := core.waitUntilKafkaMessagingProxyIsUpOrMaxTries(ctx, core.config.MaxConnectionRetries, core.config.ConnectionRetryInterval); err != nil {
 		log.Fatal("Failure-starting-kafkaMessagingProxy")
 	}
+	if p != nil {
+		p.UpdateStatus("message-bus", probe.ServiceStatusRunning)
+	}
 
 	log.Debugw("values", log.Fields{"kmp": core.kmp})
 	core.deviceMgr = newDeviceManager(core)
@@ -141,9 +165,21 @@
 	core.grpcServer.AddService(f)
 	log.Info("grpc-service-added")
 
-	//	Start the server
-	core.grpcServer.Start(context.Background())
+	/*
+	 * Start the GRPC server
+	 *
+	 * This is a bit sub-optimal here as the grpcServer.Start call does not return (blocks)
+	 * until something fails, but we want to send a "start" status update. As written this
+	 * means that we are actually sending the "start" status update before the server is
+	 * started, which means it is possible that the status is "running" before it actually is.
+	 *
+	 * This means that there is a small window in which the core could return its status as
+	 * ready, when it really isn't.
+	 */
+	probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusRunning)
 	log.Info("grpc-server-started")
+	core.grpcServer.Start(context.Background())
+	probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusStopped)
 }
 
 func (core *Core) waitUntilKafkaMessagingProxyIsUpOrMaxTries(ctx context.Context, maxRetries int, retryInterval int) error {
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index f5b017c..91c359a 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -19,6 +19,7 @@
 	"context"
 	"errors"
 	"github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/common/probe"
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/kafka"
 	"github.com/opencord/voltha-go/rw_core/utils"
@@ -71,12 +72,14 @@
 	log.Info("starting-device-manager")
 	dMgr.logicalDeviceMgr = logicalDeviceMgr
 	dMgr.stateTransitions = NewTransitionMap(dMgr)
+	probe.UpdateStatusFromContext(ctx, "device-manager", probe.ServiceStatusRunning)
 	log.Info("device-manager-started")
 }
 
 func (dMgr *DeviceManager) stop(ctx context.Context) {
 	log.Info("stopping-device-manager")
 	dMgr.exitChannel <- 1
+	probe.UpdateStatusFromContext(ctx, "device-manager", probe.ServiceStatusStopped)
 	log.Info("device-manager-stopped")
 }
 
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 235aca5..fa9713f 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -19,6 +19,7 @@
 	"context"
 	"errors"
 	"github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/common/probe"
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/kafka"
 	"github.com/opencord/voltha-protos/go/openflow_13"
@@ -62,12 +63,14 @@
 
 func (ldMgr *LogicalDeviceManager) start(ctx context.Context) {
 	log.Info("starting-logical-device-manager")
+	probe.UpdateStatusFromContext(ctx, "logical-device-manager", probe.ServiceStatusRunning)
 	log.Info("logical-device-manager-started")
 }
 
 func (ldMgr *LogicalDeviceManager) stop(ctx context.Context) {
 	log.Info("stopping-logical-device-manager")
 	ldMgr.exitChannel <- 1
+	probe.UpdateStatusFromContext(ctx, "logical-device-manager", probe.ServiceStatusStopped)
 	log.Info("logical-device-manager-stopped")
 }