[VOL-3069]Pass Context in methods which are performing logging and need the context

Change-Id: Ie84f9e240aa4f47d0046acaac0d82d21b17252e5
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
index cbde834..368391e 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
@@ -64,17 +64,17 @@
 }
 
 type InterContainerProxy interface {
-	Start() error
-	Stop()
+	Start(ctx context.Context) error
+	Stop(ctx context.Context)
 	GetDefaultTopic() *Topic
 	InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
 	InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
-	SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error
-	SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error
-	UnSubscribeFromRequestHandler(topic Topic) error
-	DeleteTopic(topic Topic) error
-	EnableLivenessChannel(enable bool) chan bool
-	SendLiveness() error
+	SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error
+	SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error
+	UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error
+	DeleteTopic(ctx context.Context, topic Topic) error
+	EnableLivenessChannel(ctx context.Context, enable bool) chan bool
+	SendLiveness(ctx context.Context) error
 }
 
 // interContainerProxy represents the messaging proxy
@@ -146,17 +146,17 @@
 	return newInterContainerProxy(opts...)
 }
 
-func (kp *interContainerProxy) Start() error {
-	logger.Info("Starting-Proxy")
+func (kp *interContainerProxy) Start(ctx context.Context) error {
+	logger.Info(ctx, "Starting-Proxy")
 
 	// Kafka MsgClient should already have been created.  If not, output fatal error
 	if kp.kafkaClient == nil {
-		logger.Fatal("kafka-client-not-set")
+		logger.Fatal(ctx, "kafka-client-not-set")
 	}
 
 	// Start the kafka client
-	if err := kp.kafkaClient.Start(); err != nil {
-		logger.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
+	if err := kp.kafkaClient.Start(ctx); err != nil {
+		logger.Errorw(ctx, "Cannot-create-kafka-proxy", log.Fields{"error": err})
 		return err
 	}
 
@@ -172,20 +172,20 @@
 	return nil
 }
 
-func (kp *interContainerProxy) Stop() {
-	logger.Info("stopping-intercontainer-proxy")
+func (kp *interContainerProxy) Stop(ctx context.Context) {
+	logger.Info(ctx, "stopping-intercontainer-proxy")
 	kp.doneOnce.Do(func() { close(kp.doneCh) })
 	// TODO : Perform cleanup
-	kp.kafkaClient.Stop()
-	err := kp.deleteAllTopicRequestHandlerChannelMap()
+	kp.kafkaClient.Stop(ctx)
+	err := kp.deleteAllTopicRequestHandlerChannelMap(ctx)
 	if err != nil {
-		logger.Errorw("failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
+		logger.Errorw(ctx, "failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
 	}
-	err = kp.deleteAllTopicResponseChannelMap()
+	err = kp.deleteAllTopicResponseChannelMap(ctx)
 	if err != nil {
-		logger.Errorw("failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
+		logger.Errorw(ctx, "failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
 	}
-	kp.deleteAllTransactionIdToChannelMap()
+	kp.deleteAllTransactionIdToChannelMap(ctx)
 }
 
 func (kp *interContainerProxy) GetDefaultTopic() *Topic {
@@ -196,7 +196,7 @@
 func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
 	waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
 
-	logger.Debugw("InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key})
+	logger.Debugw(ctx, "InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key})
 	//	If a replyToTopic is provided then we use it, otherwise just use the  default toTopic.  The replyToTopic is
 	// typically the device ID.
 	responseTopic := replyToTopic
@@ -216,17 +216,17 @@
 		var protoRequest *ic.InterContainerMessage
 
 		// Encode the request
-		protoRequest, err = encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
+		protoRequest, err = encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
 		if err != nil {
-			logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
+			logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
 			chnl <- NewResponse(RpcFormattingError, err, nil)
 			return
 		}
 
 		// Subscribe for response, if needed, before sending request
 		var ch <-chan *ic.InterContainerMessage
-		if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
-			logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
+		if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
+			logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
 			chnl <- NewResponse(RpcTransportError, err, nil)
 			return
 		}
@@ -234,10 +234,10 @@
 		// Send request - if the topic is formatted with a device Id then we will send the request using a
 		// specific key, hence ensuring a single partition is used to publish the request.  This ensures that the
 		// subscriber on that topic will receive the request in the order it was sent.  The key used is the deviceId.
-		logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
+		logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
 
 		// if the message is not sent on kafka publish an event an close the channel
-		if err = kp.kafkaClient.Send(protoRequest, toTopic, key); err != nil {
+		if err = kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
 			chnl <- NewResponse(RpcTransportError, err, nil)
 			return
 		}
@@ -250,8 +250,8 @@
 
 		defer func() {
 			// Remove the subscription for a response on return
-			if err := kp.unSubscribeForResponse(protoRequest.Header.Id); err != nil {
-				logger.Warnw("invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
+			if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
+				logger.Warnw(ctx, "invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
 			}
 		}()
 
@@ -259,11 +259,11 @@
 		select {
 		case msg, ok := <-ch:
 			if !ok {
-				logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
+				logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
 				chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
 			}
-			logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
-			if responseBody, err := decodeResponse(msg); err != nil {
+			logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
+			if responseBody, err := decodeResponse(ctx, msg); err != nil {
 				chnl <- NewResponse(RpcReply, err, nil)
 			} else {
 				if responseBody.Success {
@@ -279,12 +279,12 @@
 				}
 			}
 		case <-ctx.Done():
-			logger.Errorw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
+			logger.Errorw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
 			err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
 			chnl <- NewResponse(RpcTimeout, err, nil)
 		case <-kp.doneCh:
 			chnl <- NewResponse(RpcSystemClosing, nil, nil)
-			logger.Warnw("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
+			logger.Warnw(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
 		}
 	}()
 	return chnl
@@ -302,9 +302,9 @@
 	}
 
 	// Encode the request
-	protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
+	protoRequest, err := encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
 	if err != nil {
-		logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
+		logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
 		return false, nil
 	}
 
@@ -312,8 +312,8 @@
 	var ch <-chan *ic.InterContainerMessage
 	if waitForResponse {
 		var err error
-		if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
-			logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
+		if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
+			logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
 		}
 	}
 
@@ -321,10 +321,10 @@
 	// specific key, hence ensuring a single partition is used to publish the request.  This ensures that the
 	// subscriber on that topic will receive the request in the order it was sent.  The key used is the deviceId.
 	//key := GetDeviceIdFromTopic(*toTopic)
-	logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
+	logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
 	go func() {
-		if err := kp.kafkaClient.Send(protoRequest, toTopic, key); err != nil {
-			logger.Errorw("send-failed", log.Fields{
+		if err := kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
+			logger.Errorw(ctx, "send-failed", log.Fields{
 				"topic": toTopic,
 				"key":   key,
 				"error": err})
@@ -345,8 +345,8 @@
 		// Wait for response as well as timeout or cancellation
 		// Remove the subscription for a response on return
 		defer func() {
-			if err := kp.unSubscribeForResponse(protoRequest.Header.Id); err != nil {
-				logger.Errorw("response-unsubscribe-failed", log.Fields{
+			if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
+				logger.Errorw(ctx, "response-unsubscribe-failed", log.Fields{
 					"id":    protoRequest.Header.Id,
 					"error": err})
 			}
@@ -354,7 +354,7 @@
 		select {
 		case msg, ok := <-ch:
 			if !ok {
-				logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
+				logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
 				protoError := &ic.Error{Reason: "channel-closed"}
 				var marshalledArg *any.Any
 				if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
@@ -362,16 +362,16 @@
 				}
 				return false, marshalledArg
 			}
-			logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
+			logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
 			var responseBody *ic.InterContainerResponseBody
 			var err error
-			if responseBody, err = decodeResponse(msg); err != nil {
-				logger.Errorw("decode-response-error", log.Fields{"error": err})
+			if responseBody, err = decodeResponse(ctx, msg); err != nil {
+				logger.Errorw(ctx, "decode-response-error", log.Fields{"error": err})
 				// FIXME we should return something
 			}
 			return responseBody.Success, responseBody.Result
 		case <-ctx.Done():
-			logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
+			logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
 			//	 pack the error as proto any type
 			protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
 
@@ -381,7 +381,7 @@
 			}
 			return false, marshalledArg
 		case <-childCtx.Done():
-			logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
+			logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
 			//	 pack the error as proto any type
 			protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
 
@@ -391,7 +391,7 @@
 			}
 			return false, marshalledArg
 		case <-kp.doneCh:
-			logger.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
+			logger.Infow(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
 			return true, nil
 		}
 	}
@@ -400,55 +400,55 @@
 
 // SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
 // when a message is received on a given topic
-func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
+func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error {
 
 	// Subscribe to receive messages for that topic
 	var ch <-chan *ic.InterContainerMessage
 	var err error
-	if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
+	if ch, err = kp.kafkaClient.Subscribe(ctx, &topic); err != nil {
 		//if ch, err = kp.Subscribe(topic); err != nil {
-		logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
+		logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
 		return err
 	}
 
 	kp.defaultRequestHandlerInterface = handler
 	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
 	// Launch a go routine to receive and process kafka messages
-	go kp.waitForMessages(ch, topic, handler)
+	go kp.waitForMessages(ctx, ch, topic, handler)
 
 	return nil
 }
 
 // SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
 // when a message is received on a given topic.  So far there is only 1 target registered per microservice
-func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
+func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error {
 	// Subscribe to receive messages for that topic
 	var ch <-chan *ic.InterContainerMessage
 	var err error
-	if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
-		logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
+	if ch, err = kp.kafkaClient.Subscribe(ctx, &topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
+		logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
 		return err
 	}
 	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
 
 	// Launch a go routine to receive and process kafka messages
-	go kp.waitForMessages(ch, topic, kp.defaultRequestHandlerInterface)
+	go kp.waitForMessages(ctx, ch, topic, kp.defaultRequestHandlerInterface)
 
 	return nil
 }
 
-func (kp *interContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
-	return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
+func (kp *interContainerProxy) UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error {
+	return kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name)
 }
 
-func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
+func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(ctx context.Context, topic string) error {
 	kp.lockTopicResponseChannelMap.Lock()
 	defer kp.lockTopicResponseChannelMap.Unlock()
 	if _, exist := kp.topicToResponseChannelMap[topic]; exist {
 		// Unsubscribe to this topic first - this will close the subscribed channel
 		var err error
-		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
-			logger.Errorw("unsubscribing-error", log.Fields{"topic": topic})
+		if err = kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+			logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic})
 		}
 		delete(kp.topicToResponseChannelMap, topic)
 		return err
@@ -458,16 +458,16 @@
 }
 
 // nolint: unused
-func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
-	logger.Debug("delete-all-topic-response-channel")
+func (kp *interContainerProxy) deleteAllTopicResponseChannelMap(ctx context.Context) error {
+	logger.Debug(ctx, "delete-all-topic-response-channel")
 	kp.lockTopicResponseChannelMap.Lock()
 	defer kp.lockTopicResponseChannelMap.Unlock()
 	var unsubscribeFailTopics []string
 	for topic := range kp.topicToResponseChannelMap {
 		// Unsubscribe to this topic first - this will close the subscribed channel
-		if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+		if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
 			unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
-			logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+			logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
 			// Do not return. Continue to try to unsubscribe to other topics.
 		} else {
 			// Only delete from channel map if successfully unsubscribed.
@@ -488,12 +488,12 @@
 	}
 }
 
-func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
+func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(ctx context.Context, topic string) error {
 	kp.lockTopicRequestHandlerChannelMap.Lock()
 	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
 	if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
 		// Close the kafka client client first by unsubscribing to this topic
-		if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+		if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
 			return err
 		}
 		delete(kp.topicToRequestHandlerChannelMap, topic)
@@ -504,16 +504,16 @@
 }
 
 // nolint: unused
-func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
-	logger.Debug("delete-all-topic-request-channel")
+func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap(ctx context.Context) error {
+	logger.Debug(ctx, "delete-all-topic-request-channel")
 	kp.lockTopicRequestHandlerChannelMap.Lock()
 	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
 	var unsubscribeFailTopics []string
 	for topic := range kp.topicToRequestHandlerChannelMap {
 		// Close the kafka client client first by unsubscribing to this topic
-		if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+		if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
 			unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
-			logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+			logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
 			// Do not return. Continue to try to unsubscribe to other topics.
 		} else {
 			// Only delete from channel map if successfully unsubscribed.
@@ -556,8 +556,8 @@
 }
 
 // nolint: unused
-func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
-	logger.Debug("delete-all-transaction-id-channel-map")
+func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap(ctx context.Context) {
+	logger.Debug(ctx, "delete-all-transaction-id-channel-map")
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	for key, value := range kp.transactionIdToChannelMap {
@@ -566,27 +566,27 @@
 	}
 }
 
-func (kp *interContainerProxy) DeleteTopic(topic Topic) error {
+func (kp *interContainerProxy) DeleteTopic(ctx context.Context, topic Topic) error {
 	// If we have any consumers on that topic we need to close them
-	if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
-		logger.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
+	if err := kp.deleteFromTopicResponseChannelMap(ctx, topic.Name); err != nil {
+		logger.Errorw(ctx, "delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
 	}
-	if err := kp.deleteFromTopicRequestHandlerChannelMap(topic.Name); err != nil {
-		logger.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
+	if err := kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name); err != nil {
+		logger.Errorw(ctx, "delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
 	}
 	kp.deleteTopicTransactionIdToChannelMap(topic.Name)
 
-	return kp.kafkaClient.DeleteTopic(&topic)
+	return kp.kafkaClient.DeleteTopic(ctx, &topic)
 }
 
-func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
+func encodeReturnedValue(ctx context.Context, returnedVal interface{}) (*any.Any, error) {
 	// Encode the response argument - needs to be a proto message
 	if returnedVal == nil {
 		return nil, nil
 	}
 	protoValue, ok := returnedVal.(proto.Message)
 	if !ok {
-		logger.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
+		logger.Warnw(ctx, "response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
 		err := errors.New("response-value-not-proto-message")
 		return nil, err
 	}
@@ -595,13 +595,13 @@
 	var marshalledReturnedVal *any.Any
 	var err error
 	if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
-		logger.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
+		logger.Warnw(ctx, "cannot-marshal-returned-val", log.Fields{"error": err})
 		return nil, err
 	}
 	return marshalledReturnedVal, nil
 }
 
-func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
+func encodeDefaultFailedResponse(ctx context.Context, request *ic.InterContainerMessage) *ic.InterContainerMessage {
 	responseHeader := &ic.Header{
 		Id:        request.Header.Id,
 		Type:      ic.MessageType_RESPONSE,
@@ -617,7 +617,7 @@
 	var err error
 	// Error should never happen here
 	if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
-		logger.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
+		logger.Warnw(ctx, "cannot-marshal-failed-response-body", log.Fields{"error": err})
 	}
 
 	return &ic.InterContainerMessage{
@@ -629,8 +629,8 @@
 
 //formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
 //or an error on failure
-func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
-	//logger.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
+func encodeResponse(ctx context.Context, request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
+	//logger.Debugw(ctx, "encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
 	responseHeader := &ic.Header{
 		Id:        request.Header.Id,
 		Type:      ic.MessageType_RESPONSE,
@@ -646,8 +646,8 @@
 
 	// for now we support only 1 returned value - (excluding the error)
 	if len(returnedValues) > 0 {
-		if marshalledReturnedVal, err = encodeReturnedValue(returnedValues[0]); err != nil {
-			logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
+		if marshalledReturnedVal, err = encodeReturnedValue(ctx, returnedValues[0]); err != nil {
+			logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
 		}
 	}
 
@@ -659,7 +659,7 @@
 	// Marshal the response body
 	var marshalledResponseBody *any.Any
 	if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
-		logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
+		logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
 		return nil, err
 	}
 
@@ -669,7 +669,7 @@
 	}, nil
 }
 
-func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
+func CallFuncByName(ctx context.Context, myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
 	myClassValue := reflect.ValueOf(myClass)
 	// Capitalize the first letter in the funcName to workaround the first capital letters required to
 	// invoke a function from a different package
@@ -678,15 +678,16 @@
 	if !m.IsValid() {
 		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
 	}
-	in := make([]reflect.Value, len(params))
+	in := make([]reflect.Value, len(params)+1)
+	in[0] = reflect.ValueOf(ctx)
 	for i, param := range params {
-		in[i] = reflect.ValueOf(param)
+		in[i+1] = reflect.ValueOf(param)
 	}
 	out = m.Call(in)
 	return
 }
 
-func (kp *interContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
+func (kp *interContainerProxy) addTransactionId(ctx context.Context, transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
 	arg := &KVArg{
 		Key:   TransactionKey,
 		Value: &ic.StrType{Val: transactionId},
@@ -695,7 +696,7 @@
 	var marshalledArg *any.Any
 	var err error
 	if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
-		logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
+		logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
 		return currentArgs
 	}
 	protoArg := &ic.Argument{
@@ -705,11 +706,11 @@
 	return append(currentArgs, protoArg)
 }
 
-func (kp *interContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
+func (kp *interContainerProxy) addFromTopic(ctx context.Context, fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
 	var marshalledArg *any.Any
 	var err error
 	if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
-		logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
+		logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
 		return currentArgs
 	}
 	protoArg := &ic.Argument{
@@ -719,7 +720,7 @@
 	return append(currentArgs, protoArg)
 }
 
-func (kp *interContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
+func (kp *interContainerProxy) handleMessage(ctx context.Context, msg *ic.InterContainerMessage, targetInterface interface{}) {
 
 	// First extract the header to know whether this is a request - responses are handled by a different handler
 	if msg.Header.Type == ic.MessageType_REQUEST {
@@ -729,21 +730,21 @@
 		// Get the request body
 		requestBody := &ic.InterContainerRequestBody{}
 		if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
-			logger.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
+			logger.Warnw(ctx, "cannot-unmarshal-request", log.Fields{"error": err})
 		} else {
-			logger.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
+			logger.Debugw(ctx, "received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
 			// let the callee unpack the arguments as its the only one that knows the real proto type
 			// Augment the requestBody with the message Id as it will be used in scenarios where cores
 			// are set in pairs and competing
-			requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
+			requestBody.Args = kp.addTransactionId(ctx, msg.Header.Id, requestBody.Args)
 
 			// Augment the requestBody with the From topic name as it will be used in scenarios where a container
 			// needs to send an unsollicited message to the currently requested container
-			requestBody.Args = kp.addFromTopic(msg.Header.FromTopic, requestBody.Args)
+			requestBody.Args = kp.addFromTopic(ctx, msg.Header.FromTopic, requestBody.Args)
 
-			out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
+			out, err = CallFuncByName(ctx, targetInterface, requestBody.Rpc, requestBody.Args)
 			if err != nil {
-				logger.Warn(err)
+				logger.Warn(ctx, err)
 			}
 		}
 		// Response required?
@@ -763,7 +764,7 @@
 				if out[lastIndex].Interface() != nil { // Error
 					if retError, ok := out[lastIndex].Interface().(error); ok {
 						if retError.Error() == ErrorTransactionNotAcquired.Error() {
-							logger.Debugw("Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
+							logger.Debugw(ctx, "Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
 							return // Ignore - process is in competing mode and ignored transaction
 						}
 						returnError = &ic.Error{Reason: retError.Error()}
@@ -773,12 +774,12 @@
 						returnedValues = append(returnedValues, returnError)
 					}
 				} else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
-					logger.Warnw("Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
+					logger.Warnw(ctx, "Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
 					return // Ignore - should not happen
 				} else { // Non-error case
 					success = true
 					for idx, val := range out {
-						//logger.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
+						//logger.Debugw(ctx, "returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
 						if idx != lastIndex {
 							returnedValues = append(returnedValues, val.Interface())
 						}
@@ -787,9 +788,9 @@
 			}
 
 			var icm *ic.InterContainerMessage
-			if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
-				logger.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
-				icm = encodeDefaultFailedResponse(msg)
+			if icm, err = encodeResponse(ctx, msg, success, returnedValues...); err != nil {
+				logger.Warnw(ctx, "error-encoding-response-returning-failure-result", log.Fields{"error": err})
+				icm = encodeDefaultFailedResponse(ctx, msg)
 			}
 			// To preserve ordering of messages, all messages to a given topic are sent to the same partition
 			// by providing a message key.   The key is encoded in the topic name.  If the deviceId is not
@@ -797,11 +798,11 @@
 			// partitions.
 			replyTopic := &Topic{Name: msg.Header.FromTopic}
 			key := msg.Header.KeyTopic
-			logger.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
+			logger.Debugw(ctx, "sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
 			// TODO: handle error response.
 			go func() {
-				if err := kp.kafkaClient.Send(icm, replyTopic, key); err != nil {
-					logger.Errorw("send-reply-failed", log.Fields{
+				if err := kp.kafkaClient.Send(ctx, icm, replyTopic, key); err != nil {
+					logger.Errorw(ctx, "send-reply-failed", log.Fields{
 						"topic": replyTopic,
 						"key":   key,
 						"error": err})
@@ -809,26 +810,26 @@
 			}()
 		}
 	} else if msg.Header.Type == ic.MessageType_RESPONSE {
-		logger.Debugw("response-received", log.Fields{"msg-header": msg.Header})
-		go kp.dispatchResponse(msg)
+		logger.Debugw(ctx, "response-received", log.Fields{"msg-header": msg.Header})
+		go kp.dispatchResponse(ctx, msg)
 	} else {
-		logger.Warnw("unsupported-message-received", log.Fields{"msg-header": msg.Header})
+		logger.Warnw(ctx, "unsupported-message-received", log.Fields{"msg-header": msg.Header})
 	}
 }
 
-func (kp *interContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
+func (kp *interContainerProxy) waitForMessages(ctx context.Context, ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
 	//	Wait for messages
 	for msg := range ch {
-		//logger.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
-		go kp.handleMessage(msg, targetInterface)
+		//logger.Debugw(ctx, "request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
+		go kp.handleMessage(context.Background(), msg, targetInterface)
 	}
 }
 
-func (kp *interContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
+func (kp *interContainerProxy) dispatchResponse(ctx context.Context, msg *ic.InterContainerMessage) {
 	kp.lockTransactionIdToChannelMap.RLock()
 	defer kp.lockTransactionIdToChannelMap.RUnlock()
 	if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
-		logger.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
+		logger.Debugw(ctx, "no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
 		return
 	}
 	kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
@@ -838,8 +839,8 @@
 // This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
 // API. There is one response channel waiting for kafka messages before dispatching the message to the
 // corresponding waiting channel
-func (kp *interContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
-	logger.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
+func (kp *interContainerProxy) subscribeForResponse(ctx context.Context, topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
+	logger.Debugw(ctx, "subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
 
 	// Create a specific channel for this consumers.  We cannot use the channel from the kafkaclient as it will
 	// broadcast any message for this topic to all channels waiting on it.
@@ -850,27 +851,27 @@
 	return ch, nil
 }
 
-func (kp *interContainerProxy) unSubscribeForResponse(trnsId string) error {
-	logger.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
+func (kp *interContainerProxy) unSubscribeForResponse(ctx context.Context, trnsId string) error {
+	logger.Debugw(ctx, "unsubscribe-for-response", log.Fields{"trnsId": trnsId})
 	kp.deleteFromTransactionIdToChannelMap(trnsId)
 	return nil
 }
 
-func (kp *interContainerProxy) EnableLivenessChannel(enable bool) chan bool {
-	return kp.kafkaClient.EnableLivenessChannel(enable)
+func (kp *interContainerProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
+	return kp.kafkaClient.EnableLivenessChannel(ctx, enable)
 }
 
-func (kp *interContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
-	return kp.kafkaClient.EnableHealthinessChannel(enable)
+func (kp *interContainerProxy) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
+	return kp.kafkaClient.EnableHealthinessChannel(ctx, enable)
 }
 
-func (kp *interContainerProxy) SendLiveness() error {
-	return kp.kafkaClient.SendLiveness()
+func (kp *interContainerProxy) SendLiveness(ctx context.Context) error {
+	return kp.kafkaClient.SendLiveness(ctx)
 }
 
 //formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
 //or an error on failure
-func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
+func encodeRequest(ctx context.Context, rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
 	requestHeader := &ic.Header{
 		Id:        uuid.New().String(),
 		Type:      ic.MessageType_REQUEST,
@@ -895,12 +896,12 @@
 		// ascertain the value interface type is a proto.Message
 		protoValue, ok := arg.Value.(proto.Message)
 		if !ok {
-			logger.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
+			logger.Warnw(ctx, "argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
 			err := errors.New("argument-value-not-proto-message")
 			return nil, err
 		}
 		if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
-			logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
+			logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
 			return nil, err
 		}
 		protoArg := &ic.Argument{
@@ -913,7 +914,7 @@
 	var marshalledData *any.Any
 	var err error
 	if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
-		logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
+		logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
 		return nil, err
 	}
 	request := &ic.InterContainerMessage{
@@ -923,14 +924,14 @@
 	return request, nil
 }
 
-func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
+func decodeResponse(ctx context.Context, response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
 	//	Extract the message body
 	responseBody := ic.InterContainerResponseBody{}
 	if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
-		logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+		logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
 		return nil, err
 	}
-	//logger.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
+	//logger.Debugw(ctx, "response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
 
 	return &responseBody, nil