Create Kubernetes Probes for API Server[VOL-1731]

Change-Id: Ie289ffce77af284f2b8f62603f8570d80a598e50
diff --git a/internal/pkg/afrouter/api_test.go b/internal/pkg/afrouter/api_test.go
index 0bcb1d4..fc00a1b 100644
--- a/internal/pkg/afrouter/api_test.go
+++ b/internal/pkg/afrouter/api_test.go
@@ -18,6 +18,7 @@
 
 import (
 	"fmt"
+	"github.com/opencord/voltha-lib-go/v2/pkg/probe"
 	"github.com/phayes/freeport"
 	"github.com/stretchr/testify/assert"
 	"testing"
@@ -98,9 +99,9 @@
 }
 
 func makeProxy(numBackends int, numConnections int) (*ArouterProxy, error) {
-
+	p := &probe.Probe{}
 	conf := makeConfig(3, 2)
-	arouter, err := NewArouterProxy(&conf)
+	arouter, err := NewArouterProxy(&conf, p)
 	return arouter, err
 }
 
diff --git a/internal/pkg/afrouter/arproxy.go b/internal/pkg/afrouter/arproxy.go
index 80c8611..6d1c747 100644
--- a/internal/pkg/afrouter/arproxy.go
+++ b/internal/pkg/afrouter/arproxy.go
@@ -22,6 +22,7 @@
 
 import (
 	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	"github.com/opencord/voltha-lib-go/v2/pkg/probe"
 )
 
 // String names for display in error messages.
@@ -33,7 +34,7 @@
 }
 
 // Create the routing proxy
-func NewArouterProxy(conf *Configuration) (*ArouterProxy, error) {
+func NewArouterProxy(conf *Configuration, p *probe.Probe) (*ArouterProxy, error) {
 	arProxy = &ArouterProxy{servers: make(map[string]*server)}
 	// Create all the servers listed in the configuration
 	for _, s := range conf.Servers {
@@ -53,6 +54,7 @@
 		arProxy.api = api
 	}
 
+	p.UpdateStatus("affinity-router-proxy", probe.ServiceStatusRunning)
 	return arProxy, nil
 }
 
diff --git a/internal/pkg/afrouter/config.go b/internal/pkg/afrouter/config.go
index 8fdfd41..3a61e63 100644
--- a/internal/pkg/afrouter/config.go
+++ b/internal/pkg/afrouter/config.go
@@ -44,11 +44,8 @@
 
 	err := cmdParse.Parse(os.Args[1:])
 	if err != nil {
-		//return err
 		return nil, errors.New("Error parsing the command line")
 	}
-	//if(!cmdParse.Parsed()) {
-	//}
 
 	if val, have := os.LookupEnv("HOSTNAME"); have {
 		config.InstanceID = val
@@ -179,8 +176,10 @@
 
 // Api configuration
 type ApiConfig struct {
-	Addr string `json:"address"`
-	Port uint   `json:"port"`
+	Addr      string `json:"address"`
+	Port      uint   `json:"port"`
+	ProbeHost string `json:"probeHost"`
+	ProbePort int    `json:"probePort"`
 }
 
 func (conf *Configuration) LoadConfig() error {
diff --git a/internal/pkg/afrouterd/discoveryMonitor.go b/internal/pkg/afrouterd/discoveryMonitor.go
index 679d3f7..2e3e831 100644
--- a/internal/pkg/afrouterd/discoveryMonitor.go
+++ b/internal/pkg/afrouterd/discoveryMonitor.go
@@ -21,6 +21,7 @@
 	"github.com/golang/protobuf/ptypes"
 	"github.com/opencord/voltha-lib-go/v2/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	"github.com/opencord/voltha-lib-go/v2/pkg/probe"
 	pb "github.com/opencord/voltha-protos/v2/go/afrouter"
 	ic "github.com/opencord/voltha-protos/v2/go/inter_container"
 	"golang.org/x/net/context"
@@ -52,6 +53,7 @@
 func monitorDiscovery(kc kafka.Client, ctx context.Context, client pb.ConfigurationClient, ch <-chan *ic.InterContainerMessage, doneCh chan<- struct{}) {
 	defer close(doneCh)
 	defer kc.Stop()
+	defer probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusStopped)
 
 monitorLoop:
 	for {
@@ -85,15 +87,18 @@
 		panic(err)
 	}
 
+	probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPreparing)
 	for {
 		if err := kc.Start(); err != nil {
 			log.Error("Could not connect to kafka")
 		} else {
+			probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
 			break
 		}
 		select {
 		case <-ctx.Done():
 			close(doneCh)
+			probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusStopped)
 			return doneCh, errors.New("GRPC context done")
 
 		case <-time.After(5 * time.Second):
@@ -104,6 +109,7 @@
 		log.Errorf("Could not subscribe to the '%s' channel, discovery disabled", kafkaTopic)
 		close(doneCh)
 		kc.Stop()
+		probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusStopped)
 		return doneCh, err
 	}
 
diff --git a/internal/pkg/afrouterd/misc.go b/internal/pkg/afrouterd/misc.go
index 7946205..1f0b379 100644
--- a/internal/pkg/afrouterd/misc.go
+++ b/internal/pkg/afrouterd/misc.go
@@ -19,6 +19,7 @@
 import (
 	"fmt"
 	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	"github.com/opencord/voltha-lib-go/v2/pkg/probe"
 	pb "github.com/opencord/voltha-protos/v2/go/afrouter"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
@@ -110,7 +111,7 @@
 }
 
 // endOnClose cancels the context when the connection closes
-func ConnectionActiveContext(conn *grpc.ClientConn) context.Context {
+func ConnectionActiveContext(conn *grpc.ClientConn, p *probe.Probe) context.Context {
 	ctx, disconnected := context.WithCancel(context.Background())
 	go func() {
 		for state := conn.GetState(); state != connectivity.TransientFailure && state != connectivity.Shutdown; state = conn.GetState() {
@@ -119,6 +120,7 @@
 			}
 		}
 		log.Infof("Connection to afrouter lost")
+		p.UpdateStatus("affinity-router", probe.ServiceStatusStopped)
 		disconnected()
 	}()
 	return ctx