[VOL-3069] Pass Context down the execution call hierarchy across the voltha codebase

Change-Id: I97a2630d9a4fe5dc3161113539edda476534f186
diff --git a/rw_core/core/kafka.go b/rw_core/core/kafka.go
index fcdf340..d50274b 100644
--- a/rw_core/core/kafka.go
+++ b/rw_core/core/kafka.go
@@ -36,11 +36,12 @@
 
 	// create the kafka RPC proxy
 	kmp := kafka.NewInterContainerProxy(
-		kafka.InterContainerHost(host),
-		kafka.InterContainerPort(port),
-		kafka.MsgClient(kafkaClient),
-		kafka.DefaultTopic(&kafka.Topic{Name: coreTopic}),
-		kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: affinityRouterTopic}))
+		ctx,
+		kafka.InterContainerHost(ctx, host),
+		kafka.InterContainerPort(ctx, port),
+		kafka.MsgClient(ctx, kafkaClient),
+		kafka.DefaultTopic(ctx, &kafka.Topic{Name: coreTopic}),
+		kafka.DeviceDiscoveryTopic(ctx, &kafka.Topic{Name: affinityRouterTopic}))
 
 	probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPrepared)
 
@@ -51,7 +52,7 @@
 	for {
 		// If we haven't started yet, then try to start
 		logger.Infow("starting-kafka-proxy", log.Fields{})
-		if err := kmp.Start(); err != nil {
+		if err := kmp.Start(ctx); err != nil {
 			// We failed to start. Delay and then try again later.
 			// Don't worry about liveness, as we can't be live until we've started.
 			probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
@@ -103,7 +104,7 @@
 func monitorKafkaLiveness(ctx context.Context, kmp kafka.InterContainerProxy, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration) {
 	logger.Info("started-kafka-message-proxy")
 
-	livenessChannel := kmp.EnableLivenessChannel(true)
+	livenessChannel := kmp.EnableLivenessChannel(ctx, true)
 
 	logger.Info("enabled-kafka-liveness-channel")
 
@@ -135,7 +136,7 @@
 			// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
 			// the liveness probe may wait (and block) writing to our channel.
 			go func() {
-				err := kmp.SendLiveness()
+				err := kmp.SendLiveness(ctx)
 				if err != nil {
 					// Catch possible error case if sending liveness after Sarama has been stopped.
 					logger.Warnw("error-kafka-send-liveness", log.Fields{"error": err})
@@ -147,16 +148,16 @@
 	}
 }
 
-func registerAdapterRequestHandlers(kmp kafka.InterContainerProxy, dMgr *device.Manager, aMgr *adapter.Manager, coreTopic, corePairTopic string) {
-	requestProxy := api.NewAdapterRequestHandlerProxy(dMgr, aMgr)
+func registerAdapterRequestHandlers(ctx context.Context, kmp kafka.InterContainerProxy, dMgr *device.Manager, aMgr *adapter.Manager, coreTopic, corePairTopic string) {
+	requestProxy := api.NewAdapterRequestHandlerProxy(ctx, dMgr, aMgr)
 
 	// Register the broadcast topic to handle any core-bound broadcast requests
-	if err := kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: coreTopic}, requestProxy); err != nil {
+	if err := kmp.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: coreTopic}, requestProxy); err != nil {
 		logger.Fatalw("Failed-registering-broadcast-handler", log.Fields{"topic": coreTopic})
 	}
 
 	// Register the core-pair topic to handle core-bound requests destined to the core pair
-	if err := kmp.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: corePairTopic}, kafka.OffsetNewest); err != nil {
+	if err := kmp.SubscribeWithDefaultRequestHandler(ctx, kafka.Topic{Name: corePairTopic}, kafka.OffsetNewest); err != nil {
 		logger.Fatalw("Failed-registering-pair-handler", log.Fields{"topic": corePairTopic})
 	}