[VOL-3187] Pass Context down the execution call hierarchy across the voltha-go codebase
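
This change threads the request-scoped context.Context through the call
hierarchy: each function now accepts ctx as its first parameter and passes
it to logging and to every downstream call, so cancellation and log
correlation propagate end to end. As a minimal, self-contained sketch of
that pattern (the doWork and fetch names below are illustrative only and do
not exist in voltha-go):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // fetch stands in for any downstream call that now takes a context
    // and honours cancellation.
    func fetch(ctx context.Context, key string) (string, error) {
        select {
        case <-time.After(10 * time.Millisecond):
            return "value-for-" + key, nil
        case <-ctx.Done():
            return "", ctx.Err()
        }
    }

    // doWork accepts the caller's context and passes it straight down.
    func doWork(ctx context.Context, key string) error {
        v, err := fetch(ctx, key)
        if err != nil {
            return err
        }
        fmt.Println("fetched", v)
        return nil
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()
        if err := doWork(ctx, "device-1"); err != nil {
            fmt.Println("error:", err)
        }
    }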

Change-Id: I6bc2a0f7226c1beed4ae01a15d7b5c4dc04358d8
diff --git a/rw_core/core/kafka.go b/rw_core/core/kafka.go
index 0f28d66..79f323b 100644
--- a/rw_core/core/kafka.go
+++ b/rw_core/core/kafka.go
@@ -30,7 +30,7 @@
 
 // startKafkInterContainerProxy is responsible for starting the Kafka Interadapter Proxy
 func startKafkInterContainerProxy(ctx context.Context, kafkaClient kafka.Client, address string, coreTopic string, connectionRetryInterval time.Duration) (kafka.InterContainerProxy, error) {
-	logger.Infow("initialize-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
+	logger.Infow(ctx, "initialize-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
 
 	probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPreparing)
 
@@ -43,17 +43,16 @@
 	probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPrepared)
 
 	// wait for connectivity
-	logger.Infow("starting-kafka-manager", log.Fields{"address": address,
-		"topic": coreTopic})
+	logger.Infow(ctx, "starting-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
 
 	for {
 		// If we haven't started yet, then try to start
-		logger.Infow("starting-kafka-proxy", log.Fields{})
-		if err := kmp.Start(); err != nil {
+		logger.Infow(ctx, "starting-kafka-proxy", log.Fields{})
+		if err := kmp.Start(ctx); err != nil {
 			// We failed to start. Delay and then try again later.
 			// Don't worry about liveness, as we can't be live until we've started.
 			probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
-			logger.Infow("error-starting-kafka-messaging-proxy", log.Fields{"error": err})
+			logger.Infow(ctx, "error-starting-kafka-messaging-proxy", log.Fields{"error": err})
 			select {
 			case <-time.After(connectionRetryInterval):
 			case <-ctx.Done():
@@ -63,7 +62,7 @@
 		}
 		// We started. We only need to do this once.
 		// Next we'll fall through and start checking liveness.
-		logger.Infow("started-kafka-proxy", log.Fields{})
+		logger.Infow(ctx, "started-kafka-proxy", log.Fields{})
 		break
 	}
 	return kmp, nil
@@ -99,28 +98,28 @@
  * though the current default is that both are set to 60 seconds.
  */
 func monitorKafkaLiveness(ctx context.Context, kmp kafka.InterContainerProxy, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration) {
-	logger.Info("started-kafka-message-proxy")
+	logger.Info(ctx, "started-kafka-message-proxy")
 
-	livenessChannel := kmp.EnableLivenessChannel(true)
+	livenessChannel := kmp.EnableLivenessChannel(ctx, true)
 
-	logger.Info("enabled-kafka-liveness-channel")
+	logger.Info(ctx, "enabled-kafka-liveness-channel")
 
 	timeout := liveProbeInterval
 	for {
 		timeoutTimer := time.NewTimer(timeout)
 		select {
 		case liveness := <-livenessChannel:
-			logger.Infow("kafka-manager-thread-liveness-event", log.Fields{"liveness": liveness})
+			logger.Infow(ctx, "kafka-manager-thread-liveness-event", log.Fields{"liveness": liveness})
 			// there was a state change in Kafka liveness
 			if !liveness {
 				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
-				logger.Info("kafka-manager-thread-set-server-notready")
+				logger.Info(ctx, "kafka-manager-thread-set-server-notready")
 
 				// retry frequently while life is bad
 				timeout = notLiveProbeInterval
 			} else {
 				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
-				logger.Info("kafka-manager-thread-set-server-ready")
+				logger.Info(ctx, "kafka-manager-thread-set-server-ready")
 
 				// retry infrequently while life is good
 				timeout = liveProbeInterval
@@ -129,14 +128,14 @@
 				<-timeoutTimer.C
 			}
 		case <-timeoutTimer.C:
-			logger.Info("kafka-proxy-liveness-recheck")
+			logger.Info(ctx, "kafka-proxy-liveness-recheck")
 			// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
 			// the liveness probe may wait (and block) writing to our channel.
 			go func() {
-				err := kmp.SendLiveness()
+				err := kmp.SendLiveness(ctx)
 				if err != nil {
 					// Catch possible error case if sending liveness after Sarama has been stopped.
-					logger.Warnw("error-kafka-send-liveness", log.Fields{"error": err})
+					logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err})
 				}
 			}()
 		case <-ctx.Done():
@@ -145,12 +144,13 @@
 	}
 }
 
-func registerAdapterRequestHandlers(kmp kafka.InterContainerProxy, dMgr *device.Manager, aMgr *adapter.Manager, coreTopic string) {
+func registerAdapterRequestHandlers(ctx context.Context, kmp kafka.InterContainerProxy, dMgr *device.Manager, aMgr *adapter.Manager, coreTopic string) {
 	requestProxy := api.NewAdapterRequestHandlerProxy(dMgr, aMgr)
 
 	// Register the broadcast topic to handle any core-bound broadcast requests
-	if err := kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: coreTopic}, requestProxy); err != nil {
-		logger.Fatalw("Failed-registering-broadcast-handler", log.Fields{"topic": coreTopic})
+	if err := kmp.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: coreTopic}, requestProxy); err != nil {
+		logger.Fatalw(ctx, "Failed-registering-broadcast-handler", log.Fields{"topic": coreTopic})
 	}
-	logger.Info("request-handler-registered")
+
+	logger.Info(ctx, "request-handler-registered")
 }