Create Kubernetes Probes for API Server[VOL-1731]
Change-Id: Ie289ffce77af284f2b8f62603f8570d80a598e50
diff --git a/internal/pkg/afrouterd/discoveryMonitor.go b/internal/pkg/afrouterd/discoveryMonitor.go
index 679d3f7..2e3e831 100644
--- a/internal/pkg/afrouterd/discoveryMonitor.go
+++ b/internal/pkg/afrouterd/discoveryMonitor.go
@@ -21,6 +21,7 @@
"github.com/golang/protobuf/ptypes"
"github.com/opencord/voltha-lib-go/v2/pkg/kafka"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-lib-go/v2/pkg/probe"
pb "github.com/opencord/voltha-protos/v2/go/afrouter"
ic "github.com/opencord/voltha-protos/v2/go/inter_container"
"golang.org/x/net/context"
@@ -52,6 +53,7 @@
func monitorDiscovery(kc kafka.Client, ctx context.Context, client pb.ConfigurationClient, ch <-chan *ic.InterContainerMessage, doneCh chan<- struct{}) {
defer close(doneCh)
defer kc.Stop()
+ defer probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusStopped)
monitorLoop:
for {
@@ -85,15 +87,18 @@
panic(err)
}
+ probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPreparing)
for {
if err := kc.Start(); err != nil {
log.Error("Could not connect to kafka")
} else {
+ probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
break
}
select {
case <-ctx.Done():
close(doneCh)
+ probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusStopped)
return doneCh, errors.New("GRPC context done")
case <-time.After(5 * time.Second):
@@ -104,6 +109,7 @@
log.Errorf("Could not subscribe to the '%s' channel, discovery disabled", kafkaTopic)
close(doneCh)
kc.Stop()
+ probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusStopped)
return doneCh, err
}
diff --git a/internal/pkg/afrouterd/misc.go b/internal/pkg/afrouterd/misc.go
index 7946205..1f0b379 100644
--- a/internal/pkg/afrouterd/misc.go
+++ b/internal/pkg/afrouterd/misc.go
@@ -19,6 +19,7 @@
import (
"fmt"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-lib-go/v2/pkg/probe"
pb "github.com/opencord/voltha-protos/v2/go/afrouter"
"golang.org/x/net/context"
"google.golang.org/grpc"
@@ -110,7 +111,7 @@
}
// endOnClose cancels the context when the connection closes
-func ConnectionActiveContext(conn *grpc.ClientConn) context.Context {
+func ConnectionActiveContext(conn *grpc.ClientConn, p *probe.Probe) context.Context {
ctx, disconnected := context.WithCancel(context.Background())
go func() {
for state := conn.GetState(); state != connectivity.TransientFailure && state != connectivity.Shutdown; state = conn.GetState() {
@@ -119,6 +120,7 @@
}
}
log.Infof("Connection to afrouter lost")
+ p.UpdateStatus("affinity-router", probe.ServiceStatusStopped)
disconnected()
}()
return ctx