[VOL-1743] Create Kubernetes Probes for Open OLT Adapter

Change-Id: I93da6287f51cecab8b7145bd1786e9447617216b
diff --git a/main.go b/main.go
index 760fbe4..5adb596 100644
--- a/main.go
+++ b/main.go
@@ -34,6 +34,7 @@
 	"github.com/opencord/voltha-lib-go/pkg/db/kvstore"
 	"github.com/opencord/voltha-lib-go/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/pkg/log"
+	"github.com/opencord/voltha-lib-go/pkg/probe"
 	ac "github.com/opencord/voltha-openolt-adapter/adaptercore"
 	"github.com/opencord/voltha-openolt-adapter/config"
 	"github.com/opencord/voltha-openolt-adapter/config/version"
@@ -74,19 +75,41 @@
 	log.Info("Starting Core Adapter components")
 	var err error
 
+	var p *probe.Probe
+	if value := ctx.Value(probe.ProbeContextKey); value != nil {
+		if _, ok := value.(*probe.Probe); ok {
+			p = value.(*probe.Probe)
+			p.RegisterService(
+				"message-bus",
+				"kv-store",
+				"container-proxy",
+				"core-request-handler",
+				"register-with-core",
+			)
+		}
+	}
+
 	// Setup KV Client
 	log.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
 	if err = a.setKVClient(); err != nil {
 		log.Fatal("error-setting-kv-client")
 	}
 
+	if p != nil {
+		p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
+	}
+
 	// Setup Kafka Client
 	if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
 		log.Fatal("Unsupported-common-client")
 	}
 
+	if p != nil {
+		p.UpdateStatus("message-bus", probe.ServiceStatusRunning)
+	}
+
 	// Start the common InterContainer Proxy - retries indefinitely
-	if a.kip, err = a.startInterContainerProxy(-1); err != nil {
+	if a.kip, err = a.startInterContainerProxy(ctx, -1); err != nil {
 		log.Fatal("error-starting-inter-container-proxy")
 	}
 
@@ -107,12 +130,12 @@
 	}
 
 	// Register the core request handler
-	if err = a.setupRequestHandler(a.instanceID, a.iAdapter); err != nil {
+	if err = a.setupRequestHandler(ctx, a.instanceID, a.iAdapter); err != nil {
 		log.Fatal("error-setting-core-request-handler")
 	}
 
 	// Register this adapter to the Core - retries indefinitely
-	if err = a.registerWithCore(-1); err != nil {
+	if err = a.registerWithCore(ctx, -1); err != nil {
 		log.Fatal("error-registering-with-core")
 	}
 }
@@ -179,7 +202,7 @@
 	return nil
 }
 
-func (a *adapter) startInterContainerProxy(retries int) (*kafka.InterContainerProxy, error) {
+func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (*kafka.InterContainerProxy, error) {
 	log.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
 		"port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
 	var err error
@@ -206,7 +229,7 @@
 			break
 		}
 	}
-
+	probe.UpdateStatusFromContext(ctx, "container-proxy", probe.ServiceStatusRunning)
 	log.Info("common-messaging-proxy-created")
 	return kip, nil
 }
@@ -227,7 +250,7 @@
 	return sOLT, nil
 }
 
-func (a *adapter) setupRequestHandler(coreInstanceID string, iadapter adapters.IAdapter) error {
+func (a *adapter) setupRequestHandler(ctx context.Context, coreInstanceID string, iadapter adapters.IAdapter) error {
 	log.Info("setting-request-handler")
 	requestProxy := com.NewRequestHandlerProxy(coreInstanceID, iadapter, a.coreProxy)
 	if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
@@ -235,11 +258,12 @@
 		return err
 
 	}
+	probe.UpdateStatusFromContext(ctx, "core-request-handler", probe.ServiceStatusRunning)
 	log.Info("request-handler-setup-done")
 	return nil
 }
 
-func (a *adapter) registerWithCore(retries int) error {
+func (a *adapter) registerWithCore(ctx context.Context, retries int) error {
 	log.Info("registering-with-core")
 	adapterDescription := &voltha.Adapter{Id: "openolt", // Unique name for the device type
 		Vendor:  "VOLTHA OpenOLT",
@@ -263,6 +287,7 @@
 			break
 		}
 	}
+	probe.UpdateStatusFromContext(ctx, "register-with-core", probe.ServiceStatusRunning)
 	log.Info("registered-with-core")
 	return nil
 }
@@ -352,7 +377,13 @@
 	defer cancel()
 
 	ad := newAdapter(cf)
-	go ad.start(ctx)
+
+	p := &probe.Probe{}
+	go p.ListenAndServe(fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
+
+	probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
+
+	go ad.start(probeCtx)
 
 	code := waitForExit()
 	log.Infow("received-a-closing-signal", log.Fields{"code": code})