VOL-1739 Create Kubernetes Probes for Simulated ONU Adapter
Change-Id: I481d29df47a1ed7ede60cd3f8840221aef861e6b
diff --git a/cmd/simulated_onu/main.go b/cmd/simulated_onu/main.go
index 3d48781..cdd12df 100644
--- a/cmd/simulated_onu/main.go
+++ b/cmd/simulated_onu/main.go
@@ -22,6 +22,7 @@
"github.com/opencord/voltha-go/adapters"
com "github.com/opencord/voltha-go/adapters/common"
"github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/common/probe"
"github.com/opencord/voltha-go/common/version"
"github.com/opencord/voltha-go/db/kvstore"
"github.com/opencord/voltha-go/kafka"
@@ -67,19 +68,42 @@
log.Info("Starting Core Adapter components")
var err error
+ // If the context has a probe then fetch it and register our services
+ var p *probe.Probe
+ if value := ctx.Value(probe.ProbeContextKey); value != nil {
+ if _, ok := value.(*probe.Probe); ok {
+ p = value.(*probe.Probe)
+ p.RegisterService(
+ "message-bus",
+ "kv-store",
+ "container-proxy",
+ "core-request-handler",
+ "register-with-core",
+ )
+ }
+ }
+
// Setup KV Client
log.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
if err := a.setKVClient(); err != nil {
log.Fatal("error-setting-kv-client")
}
+ if p != nil {
+ p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
+ }
+
// Setup Kafka Client
if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
log.Fatal("Unsupported-common-client")
}
+ if p != nil {
+ p.UpdateStatus("message-bus", probe.ServiceStatusRunning)
+ }
+
// Start the common InterContainer Proxy - retry indefinitely
- if a.kip, err = a.startInterContainerProxy(-1); err != nil {
+ if a.kip, err = a.startInterContainerProxy(ctx, -1); err != nil {
log.Fatal("error-starting-inter-container-proxy")
}
@@ -92,12 +116,12 @@
}
// Register the core request handler
- if err = a.setupRequestHandler(a.instanceId, a.iAdapter, a.coreProxy); err != nil {
+ if err = a.setupRequestHandler(ctx, a.instanceId, a.iAdapter, a.coreProxy); err != nil {
log.Fatal("error-setting-core-request-handler")
}
// Register this adapter to the Core - retry indefinitely
- if err = a.registerWithCore(-1); err != nil {
+ if err = a.registerWithCore(ctx, -1); err != nil {
log.Fatal("error-registering-with-core")
}
}
@@ -173,7 +197,7 @@
}
}
-func (a *adapter) startInterContainerProxy(retries int) (*kafka.InterContainerProxy, error) {
+func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (*kafka.InterContainerProxy, error) {
log.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
"port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
var err error
@@ -201,6 +225,7 @@
}
}
+ probe.UpdateStatusFromContext(ctx, "container-proxy", probe.ServiceStatusRunning)
log.Info("common-messaging-proxy-created")
return kip, nil
}
@@ -219,7 +244,7 @@
return sOLT, nil
}
-func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter, coreProxy *com.CoreProxy) error {
+func (a *adapter) setupRequestHandler(ctx context.Context, coreInstanceId string, iadapter adapters.IAdapter, coreProxy *com.CoreProxy) error {
log.Info("setting-request-handler")
requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter, coreProxy)
if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
@@ -227,10 +252,11 @@
return err
}
+ probe.UpdateStatusFromContext(ctx, "core-request-handler", probe.ServiceStatusRunning)
log.Info("request-handler-setup-done")
return nil
}
-func (a *adapter) registerWithCore(retries int) error {
+func (a *adapter) registerWithCore(ctx context.Context, retries int) error {
log.Info("registering-with-core")
adapterDescription := &voltha.Adapter{
Id: "simulated_onu",
@@ -253,6 +279,7 @@
break
}
}
+ probe.UpdateStatusFromContext(ctx, "register-with-core", probe.ServiceStatusRunning)
log.Info("registered-with-core")
return nil
}
@@ -329,7 +356,11 @@
defer cancel()
ad := newAdapter(cf)
- go ad.start(ctx)
+
+ p := &probe.Probe{}
+ go p.ListenAndServe(fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
+ probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
+ go ad.start(probeCtx)
code := waitForExit()
log.Infow("received-a-closing-signal", log.Fields{"code": code})