[VOL-3187]Pass Context down the execution call hierarchy across voltha-go codebase
Change-Id: I6bc2a0f7226c1beed4ae01a15d7b5c4dc04358d8
diff --git a/rw_core/core/kafka.go b/rw_core/core/kafka.go
index 0f28d66..79f323b 100644
--- a/rw_core/core/kafka.go
+++ b/rw_core/core/kafka.go
@@ -30,7 +30,7 @@
// startKafkInterContainerProxy is responsible for starting the Kafka Interadapter Proxy
func startKafkInterContainerProxy(ctx context.Context, kafkaClient kafka.Client, address string, coreTopic string, connectionRetryInterval time.Duration) (kafka.InterContainerProxy, error) {
- logger.Infow("initialize-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
+ logger.Infow(ctx, "initialize-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPreparing)
@@ -43,17 +43,16 @@
probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPrepared)
// wait for connectivity
- logger.Infow("starting-kafka-manager", log.Fields{"address": address,
- "topic": coreTopic})
+ logger.Infow(ctx, "starting-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
for {
// If we haven't started yet, then try to start
- logger.Infow("starting-kafka-proxy", log.Fields{})
- if err := kmp.Start(); err != nil {
+ logger.Infow(ctx, "starting-kafka-proxy", log.Fields{})
+ if err := kmp.Start(ctx); err != nil {
// We failed to start. Delay and then try again later.
// Don't worry about liveness, as we can't be live until we've started.
probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
- logger.Infow("error-starting-kafka-messaging-proxy", log.Fields{"error": err})
+ logger.Infow(ctx, "error-starting-kafka-messaging-proxy", log.Fields{"error": err})
select {
case <-time.After(connectionRetryInterval):
case <-ctx.Done():
@@ -63,7 +62,7 @@
}
// We started. We only need to do this once.
// Next we'll fall through and start checking liveness.
- logger.Infow("started-kafka-proxy", log.Fields{})
+ logger.Infow(ctx, "started-kafka-proxy", log.Fields{})
break
}
return kmp, nil
@@ -99,28 +98,28 @@
* though the current default is that both are set to 60 seconds.
*/
func monitorKafkaLiveness(ctx context.Context, kmp kafka.InterContainerProxy, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration) {
- logger.Info("started-kafka-message-proxy")
+ logger.Info(ctx, "started-kafka-message-proxy")
- livenessChannel := kmp.EnableLivenessChannel(true)
+ livenessChannel := kmp.EnableLivenessChannel(ctx, true)
- logger.Info("enabled-kafka-liveness-channel")
+ logger.Info(ctx, "enabled-kafka-liveness-channel")
timeout := liveProbeInterval
for {
timeoutTimer := time.NewTimer(timeout)
select {
case liveness := <-livenessChannel:
- logger.Infow("kafka-manager-thread-liveness-event", log.Fields{"liveness": liveness})
+ logger.Infow(ctx, "kafka-manager-thread-liveness-event", log.Fields{"liveness": liveness})
// there was a state change in Kafka liveness
if !liveness {
probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
- logger.Info("kafka-manager-thread-set-server-notready")
+ logger.Info(ctx, "kafka-manager-thread-set-server-notready")
// retry frequently while life is bad
timeout = notLiveProbeInterval
} else {
probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
- logger.Info("kafka-manager-thread-set-server-ready")
+ logger.Info(ctx, "kafka-manager-thread-set-server-ready")
// retry infrequently while life is good
timeout = liveProbeInterval
@@ -129,14 +128,14 @@
<-timeoutTimer.C
}
case <-timeoutTimer.C:
- logger.Info("kafka-proxy-liveness-recheck")
+ logger.Info(ctx, "kafka-proxy-liveness-recheck")
// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
// the liveness probe may wait (and block) writing to our channel.
go func() {
- err := kmp.SendLiveness()
+ err := kmp.SendLiveness(ctx)
if err != nil {
// Catch possible error case if sending liveness after Sarama has been stopped.
- logger.Warnw("error-kafka-send-liveness", log.Fields{"error": err})
+ logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err})
}
}()
case <-ctx.Done():
@@ -145,12 +144,13 @@
}
}
-func registerAdapterRequestHandlers(kmp kafka.InterContainerProxy, dMgr *device.Manager, aMgr *adapter.Manager, coreTopic string) {
+func registerAdapterRequestHandlers(ctx context.Context, kmp kafka.InterContainerProxy, dMgr *device.Manager, aMgr *adapter.Manager, coreTopic string) {
requestProxy := api.NewAdapterRequestHandlerProxy(dMgr, aMgr)
// Register the broadcast topic to handle any core-bound broadcast requests
- if err := kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: coreTopic}, requestProxy); err != nil {
- logger.Fatalw("Failed-registering-broadcast-handler", log.Fields{"topic": coreTopic})
+ if err := kmp.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: coreTopic}, requestProxy); err != nil {
+ logger.Fatalw(ctx, "Failed-registering-broadcast-handler", log.Fields{"topic": coreTopic})
}
- logger.Info("request-handler-registered")
+
+ logger.Info(ctx, "request-handler-registered")
}