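[VOL-1743] Create Kubernetes Probes for Open OLT Adapter

Serve a probe endpoint on ProbeHost:ProbePort and hand the probe to the
adapter through the context passed to start(). The adapter registers the
message-bus, kv-store, container-proxy, core-request-handler and
register-with-core services with the probe and marks each one as running
once its start-up step has completed.

Change-Id: I93da6287f51cecab8b7145bd1786e9447617216b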
diff --git a/main.go b/main.go
index 760fbe4..5adb596 100644
--- a/main.go
+++ b/main.go
@@ -34,6 +34,7 @@
"github.com/opencord/voltha-lib-go/pkg/db/kvstore"
"github.com/opencord/voltha-lib-go/pkg/kafka"
"github.com/opencord/voltha-lib-go/pkg/log"
+ "github.com/opencord/voltha-lib-go/pkg/probe"
ac "github.com/opencord/voltha-openolt-adapter/adaptercore"
"github.com/opencord/voltha-openolt-adapter/config"
"github.com/opencord/voltha-openolt-adapter/config/version"
@@ -74,19 +75,41 @@
log.Info("Starting Core Adapter components")
var err error
+ var p *probe.Probe
+ if value := ctx.Value(probe.ProbeContextKey); value != nil {
+ if _, ok := value.(*probe.Probe); ok {
+ p = value.(*probe.Probe)
+ p.RegisterService(
+ "message-bus",
+ "kv-store",
+ "container-proxy",
+ "core-request-handler",
+ "register-with-core",
+ )
+ }
+ }
+
// Setup KV Client
log.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
if err = a.setKVClient(); err != nil {
log.Fatal("error-setting-kv-client")
}
+ if p != nil {
+ p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
+ }
+
// Setup Kafka Client
if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
log.Fatal("Unsupported-common-client")
}
+ if p != nil {
+ p.UpdateStatus("message-bus", probe.ServiceStatusRunning)
+ }
+
// Start the common InterContainer Proxy - retries indefinitely
- if a.kip, err = a.startInterContainerProxy(-1); err != nil {
+ if a.kip, err = a.startInterContainerProxy(ctx, -1); err != nil {
log.Fatal("error-starting-inter-container-proxy")
}
@@ -107,12 +130,12 @@
}
// Register the core request handler
- if err = a.setupRequestHandler(a.instanceID, a.iAdapter); err != nil {
+ if err = a.setupRequestHandler(ctx, a.instanceID, a.iAdapter); err != nil {
log.Fatal("error-setting-core-request-handler")
}
// Register this adapter to the Core - retries indefinitely
- if err = a.registerWithCore(-1); err != nil {
+ if err = a.registerWithCore(ctx, -1); err != nil {
log.Fatal("error-registering-with-core")
}
}
@@ -179,7 +202,7 @@
return nil
}
-func (a *adapter) startInterContainerProxy(retries int) (*kafka.InterContainerProxy, error) {
+func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (*kafka.InterContainerProxy, error) {
log.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
"port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
var err error
@@ -206,7 +229,7 @@
break
}
}
-
+ probe.UpdateStatusFromContext(ctx, "container-proxy", probe.ServiceStatusRunning)
log.Info("common-messaging-proxy-created")
return kip, nil
}
@@ -227,7 +250,7 @@
return sOLT, nil
}
-func (a *adapter) setupRequestHandler(coreInstanceID string, iadapter adapters.IAdapter) error {
+func (a *adapter) setupRequestHandler(ctx context.Context, coreInstanceID string, iadapter adapters.IAdapter) error {
log.Info("setting-request-handler")
requestProxy := com.NewRequestHandlerProxy(coreInstanceID, iadapter, a.coreProxy)
if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
@@ -235,11 +258,12 @@
return err
}
+ probe.UpdateStatusFromContext(ctx, "core-request-handler", probe.ServiceStatusRunning)
log.Info("request-handler-setup-done")
return nil
}
-func (a *adapter) registerWithCore(retries int) error {
+func (a *adapter) registerWithCore(ctx context.Context, retries int) error {
log.Info("registering-with-core")
adapterDescription := &voltha.Adapter{Id: "openolt", // Unique name for the device type
Vendor: "VOLTHA OpenOLT",
@@ -263,6 +287,7 @@
break
}
}
+ probe.UpdateStatusFromContext(ctx, "register-with-core", probe.ServiceStatusRunning)
log.Info("registered-with-core")
return nil
}
@@ -352,7 +377,13 @@
defer cancel()
ad := newAdapter(cf)
- go ad.start(ctx)
+
+ p := &probe.Probe{}
+ go p.ListenAndServe(fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
+
+ probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
+
+ go ad.start(probeCtx)
code := waitForExit()
log.Infow("received-a-closing-signal", log.Fields{"code": code})