VOL-1739 [Create Kubernetes Probes for Simulated OLT Adapter]

Change-Id: Ib0ef80daf7843b27f95d87894a829a4826190881
diff --git a/cmd/simulated_olt/main.go b/cmd/simulated_olt/main.go
index f324939..e67c6ca 100644
--- a/cmd/simulated_olt/main.go
+++ b/cmd/simulated_olt/main.go
@@ -22,6 +22,7 @@
 	"github.com/opencord/voltha-go/adapters"
 	com "github.com/opencord/voltha-go/adapters/common"
 	"github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/common/probe"
 	"github.com/opencord/voltha-go/common/version"
 	"github.com/opencord/voltha-go/db/kvstore"
 	"github.com/opencord/voltha-go/kafka"
@@ -67,19 +68,41 @@
 	log.Info("Starting Core Adapter components")
 	var err error
 
+	var p *probe.Probe
+	if value := ctx.Value(probe.ProbeContextKey); value != nil {
+		if _, ok := value.(*probe.Probe); ok {
+			p = value.(*probe.Probe)
+			p.RegisterService(
+				"message-bus",
+				"kv-store",
+				"container-proxy",
+				"core-request-handler",
+				"register-with-core",
+			)
+		}
+	}
+
 	// Setup KV Client
 	log.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
 	if err := a.setKVClient(); err != nil {
 		log.Fatal("error-setting-kv-client")
 	}
 
+	if p != nil {
+		p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
+	}
+
 	// Setup Kafka Client
 	if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
 		log.Fatal("Unsupported-common-client")
 	}
 
+	if p != nil {
+		p.UpdateStatus("message-bus", probe.ServiceStatusRunning)
+	}
+
 	// Start the common InterContainer Proxy - retries indefinitely
-	if a.kip, err = a.startInterContainerProxy(-1); err != nil {
+	if a.kip, err = a.startInterContainerProxy(ctx, -1); err != nil {
 		log.Fatal("error-starting-inter-container-proxy")
 	}
 
@@ -92,12 +115,12 @@
 	}
 
 	// Register the core request handler
-	if err = a.setupRequestHandler(a.instanceId, a.iAdapter, a.coreProxy); err != nil {
+	if err = a.setupRequestHandler(ctx, a.instanceId, a.iAdapter, a.coreProxy); err != nil {
 		log.Fatal("error-setting-core-request-handler")
 	}
 
 	//	Register this adapter to the Core - retries indefinitely
-	if err = a.registerWithCore(-1); err != nil {
+	if err = a.registerWithCore(ctx, -1); err != nil {
 		log.Fatal("error-registering-with-core")
 	}
 }
@@ -173,7 +196,7 @@
 	}
 }
 
-func (a *adapter) startInterContainerProxy(retries int) (*kafka.InterContainerProxy, error) {
+func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (*kafka.InterContainerProxy, error) {
 	log.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
 		"port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
 	var err error
@@ -201,6 +224,7 @@
 		}
 	}
 
+	probe.UpdateStatusFromContext(ctx, "container-proxy", probe.ServiceStatusRunning)
 	log.Info("common-messaging-proxy-created")
 	return kip, nil
 }
@@ -219,7 +243,7 @@
 	return sOLT, nil
 }
 
-func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter, coreProxy *com.CoreProxy) error {
+func (a *adapter) setupRequestHandler(ctx context.Context, coreInstanceId string, iadapter adapters.IAdapter, coreProxy *com.CoreProxy) error {
 	log.Info("setting-request-handler")
 	requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter, coreProxy)
 	if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
@@ -227,11 +251,12 @@
 		return err
 
 	}
+	probe.UpdateStatusFromContext(ctx, "core-request-handler", probe.ServiceStatusRunning)
 	log.Info("request-handler-setup-done")
 	return nil
 }
 
-func (a *adapter) registerWithCore(retries int) error {
+func (a *adapter) registerWithCore(ctx context.Context, retries int) error {
 	log.Info("registering-with-core")
 	adapterDescription := &voltha.Adapter{
 		Id:      "simulated_olt",
@@ -254,6 +279,7 @@
 			break
 		}
 	}
+	probe.UpdateStatusFromContext(ctx, "register-with-core", probe.ServiceStatusRunning)
 	log.Info("registered-with-core")
 	return nil
 }
@@ -329,7 +355,13 @@
 	defer cancel()
 
 	ad := newAdapter(cf)
-	go ad.start(ctx)
+
+	p := &probe.Probe{}
+	go p.ListenAndServe(fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
+
+	probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
+
+	go ad.start(probeCtx)
 
 	code := waitForExit()
 	log.Infow("received-a-closing-signal", log.Fields{"code": code})