[VOL-3069] Pass Context down the execution call hierarchy across voltha codebase
Change-Id: I97a2630d9a4fe5dc3161113539edda476534f186
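
The pattern applied throughout this change is to accept a context.Context as the
first argument of every call on the execution path and thread it onward, so
cancellation, deadlines, and per-request metadata (e.g. log correlation) propagate
end to end instead of each layer creating its own background context. Below is a
minimal, self-contained Go sketch of that pattern; the proxy type and its Start
method are illustrative stand-ins, not the actual voltha-lib-go
kafka.InterContainerProxy API.

    package main

    import (
    	"context"
    	"fmt"
    	"time"
    )

    // proxy is a hypothetical stand-in for a component such as the kafka
    // inter-container proxy; it exists only to illustrate the signature change.
    type proxy struct{}

    // Before this change: Start() error
    // After: Start(ctx context.Context) error, so startup can honour
    // cancellation and deadlines supplied by the caller.
    func (p *proxy) Start(ctx context.Context) error {
    	select {
    	case <-ctx.Done():
    		// The caller cancelled or the deadline expired; stop early.
    		return ctx.Err()
    	case <-time.After(10 * time.Millisecond):
    		// Pretend the connection to the message bus succeeded.
    		return nil
    	}
    }

    func main() {
    	// The top-level caller owns the context and passes it down the
    	// call hierarchy, exactly as rw_core does with the kafka proxy below.
    	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    	defer cancel()

    	p := &proxy{}
    	if err := p.Start(ctx); err != nil {
    		fmt.Println("start failed:", err)
    		return
    	}
    	fmt.Println("proxy started")
    }
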
diff --git a/rw_core/core/kafka.go b/rw_core/core/kafka.go
index fcdf340..d50274b 100644
--- a/rw_core/core/kafka.go
+++ b/rw_core/core/kafka.go
@@ -36,11 +36,12 @@
// create the kafka RPC proxy
kmp := kafka.NewInterContainerProxy(
- kafka.InterContainerHost(host),
- kafka.InterContainerPort(port),
- kafka.MsgClient(kafkaClient),
- kafka.DefaultTopic(&kafka.Topic{Name: coreTopic}),
- kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: affinityRouterTopic}))
+ ctx,
+ kafka.InterContainerHost(ctx, host),
+ kafka.InterContainerPort(ctx, port),
+ kafka.MsgClient(ctx, kafkaClient),
+ kafka.DefaultTopic(ctx, &kafka.Topic{Name: coreTopic}),
+ kafka.DeviceDiscoveryTopic(ctx, &kafka.Topic{Name: affinityRouterTopic}))
probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPrepared)
@@ -51,7 +52,7 @@
for {
// If we haven't started yet, then try to start
logger.Infow("starting-kafka-proxy", log.Fields{})
- if err := kmp.Start(); err != nil {
+ if err := kmp.Start(ctx); err != nil {
// We failed to start. Delay and then try again later.
// Don't worry about liveness, as we can't be live until we've started.
probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
@@ -103,7 +104,7 @@
func monitorKafkaLiveness(ctx context.Context, kmp kafka.InterContainerProxy, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration) {
logger.Info("started-kafka-message-proxy")
- livenessChannel := kmp.EnableLivenessChannel(true)
+ livenessChannel := kmp.EnableLivenessChannel(ctx, true)
logger.Info("enabled-kafka-liveness-channel")
@@ -135,7 +136,7 @@
// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
// the liveness probe may wait (and block) writing to our channel.
go func() {
- err := kmp.SendLiveness()
+ err := kmp.SendLiveness(ctx)
if err != nil {
// Catch possible error case if sending liveness after Sarama has been stopped.
logger.Warnw("error-kafka-send-liveness", log.Fields{"error": err})
@@ -147,16 +148,16 @@
}
}
-func registerAdapterRequestHandlers(kmp kafka.InterContainerProxy, dMgr *device.Manager, aMgr *adapter.Manager, coreTopic, corePairTopic string) {
- requestProxy := api.NewAdapterRequestHandlerProxy(dMgr, aMgr)
+func registerAdapterRequestHandlers(ctx context.Context, kmp kafka.InterContainerProxy, dMgr *device.Manager, aMgr *adapter.Manager, coreTopic, corePairTopic string) {
+ requestProxy := api.NewAdapterRequestHandlerProxy(ctx, dMgr, aMgr)
// Register the broadcast topic to handle any core-bound broadcast requests
- if err := kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: coreTopic}, requestProxy); err != nil {
+ if err := kmp.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: coreTopic}, requestProxy); err != nil {
logger.Fatalw("Failed-registering-broadcast-handler", log.Fields{"topic": coreTopic})
}
// Register the core-pair topic to handle core-bound requests destined to the core pair
- if err := kmp.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: corePairTopic}, kafka.OffsetNewest); err != nil {
+ if err := kmp.SubscribeWithDefaultRequestHandler(ctx, kafka.Topic{Name: corePairTopic}, kafka.OffsetNewest); err != nil {
logger.Fatalw("Failed-registering-pair-handler", log.Fields{"topic": corePairTopic})
}