[VOL-3187]Pass Context down the execution call hierarchy across voltha-go codebase

Change-Id: I6bc2a0f7226c1beed4ae01a15d7b5c4dc04358d8
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/client.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/client.go
index 0d9e3a5..d977e38 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/client.go
@@ -16,9 +16,9 @@
 package kafka
 
 import (
-	"time"
-
+	"context"
 	ca "github.com/opencord/voltha-protos/v3/go/inter_container"
+	"time"
 )
 
 const (
@@ -61,15 +61,15 @@
 
 // MsgClient represents the set of APIs  a Kafka MsgClient must implement
 type Client interface {
-	Start() error
-	Stop()
-	CreateTopic(topic *Topic, numPartition int, repFactor int) error
-	DeleteTopic(topic *Topic) error
-	Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ca.InterContainerMessage, error)
-	UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error
-	SubscribeForMetadata(func(fromTopic string, timestamp time.Time))
-	Send(msg interface{}, topic *Topic, keys ...string) error
-	SendLiveness() error
-	EnableLivenessChannel(enable bool) chan bool
-	EnableHealthinessChannel(enable bool) chan bool
+	Start(ctx context.Context) error
+	Stop(ctx context.Context)
+	CreateTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error
+	DeleteTopic(ctx context.Context, topic *Topic) error
+	Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan *ca.InterContainerMessage, error)
+	UnSubscribe(ctx context.Context, topic *Topic, ch <-chan *ca.InterContainerMessage) error
+	SubscribeForMetadata(context.Context, func(fromTopic string, timestamp time.Time))
+	Send(ctx context.Context, msg interface{}, topic *Topic, keys ...string) error
+	SendLiveness(ctx context.Context) error
+	EnableLivenessChannel(ctx context.Context, enable bool) chan bool
+	EnableHealthinessChannel(ctx context.Context, enable bool) chan bool
 }
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/common.go
index 149c150..99b4cdf 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/common.go
@@ -19,12 +19,12 @@
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 )
 
-var logger log.Logger
+var logger log.CLogger
 
 func init() {
 	// Setup this package so that it's log level can be modified at run time
 	var err error
-	logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "kafka"})
+	logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "kafka"})
 	if err != nil {
 		panic(err)
 	}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go
index 1258382..a876c09 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go
@@ -50,15 +50,15 @@
 
 	// GetEndpoint is called to get the endpoint to communicate with for a specific device and service type.  For
 	// now this will return the topic name
-	GetEndpoint(deviceID string, serviceType string) (Endpoint, error)
+	GetEndpoint(ctx context.Context, deviceID string, serviceType string) (Endpoint, error)
 
 	// IsDeviceOwnedByService is invoked when a specific service (service type + replicaNumber) is restarted and
 	// devices owned by that service need to be reconciled
-	IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error)
+	IsDeviceOwnedByService(ctx context.Context, deviceID string, serviceType string, replicaNumber int32) (bool, error)
 
 	// GetReplicaAssignment returns the replica number of the service that owns the deviceID.  This is used by the
 	// test only
-	GetReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error)
+	GetReplicaAssignment(ctx context.Context, deviceID string, serviceType string) (ReplicaID, error)
 }
 
 type service struct {
@@ -119,9 +119,9 @@
 	return newEndpointManager(backend, opts...)
 }
 
-func (ep *endpointManager) GetEndpoint(deviceID string, serviceType string) (Endpoint, error) {
-	logger.Debugw("getting-endpoint", log.Fields{"device-id": deviceID, "service": serviceType})
-	owner, err := ep.getOwner(deviceID, serviceType)
+func (ep *endpointManager) GetEndpoint(ctx context.Context, deviceID string, serviceType string) (Endpoint, error) {
+	logger.Debugw(ctx, "getting-endpoint", log.Fields{"device-id": deviceID, "service": serviceType})
+	owner, err := ep.getOwner(ctx, deviceID, serviceType)
 	if err != nil {
 		return "", err
 	}
@@ -133,13 +133,13 @@
 	if endpoint == "" {
 		return "", status.Errorf(codes.Unavailable, "endpoint-not-set-%s", serviceType)
 	}
-	logger.Debugw("returning-endpoint", log.Fields{"device-id": deviceID, "service": serviceType, "endpoint": endpoint})
+	logger.Debugw(ctx, "returning-endpoint", log.Fields{"device-id": deviceID, "service": serviceType, "endpoint": endpoint})
 	return endpoint, nil
 }
 
-func (ep *endpointManager) IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error) {
-	logger.Debugw("device-ownership", log.Fields{"device-id": deviceID, "service": serviceType, "replica-number": replicaNumber})
-	owner, err := ep.getOwner(deviceID, serviceType)
+func (ep *endpointManager) IsDeviceOwnedByService(ctx context.Context, deviceID string, serviceType string, replicaNumber int32) (bool, error) {
+	logger.Debugw(ctx, "device-ownership", log.Fields{"device-id": deviceID, "service": serviceType, "replica-number": replicaNumber})
+	owner, err := ep.getOwner(ctx, deviceID, serviceType)
 	if err != nil {
 		return false, nil
 	}
@@ -150,8 +150,8 @@
 	return m.getReplica() == ReplicaID(replicaNumber), nil
 }
 
-func (ep *endpointManager) GetReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error) {
-	owner, err := ep.getOwner(deviceID, serviceType)
+func (ep *endpointManager) GetReplicaAssignment(ctx context.Context, deviceID string, serviceType string) (ReplicaID, error) {
+	owner, err := ep.getOwner(ctx, deviceID, serviceType)
 	if err != nil {
 		return 0, nil
 	}
@@ -162,8 +162,8 @@
 	return m.getReplica(), nil
 }
 
-func (ep *endpointManager) getOwner(deviceID string, serviceType string) (consistent.Member, error) {
-	serv, dType, err := ep.getServiceAndDeviceType(serviceType)
+func (ep *endpointManager) getOwner(ctx context.Context, deviceID string, serviceType string) (consistent.Member, error) {
+	serv, dType, err := ep.getServiceAndDeviceType(ctx, serviceType)
 	if err != nil {
 		return nil, err
 	}
@@ -171,7 +171,7 @@
 	return serv.consistentRing.LocateKey(key), nil
 }
 
-func (ep *endpointManager) getServiceAndDeviceType(serviceType string) (*service, string, error) {
+func (ep *endpointManager) getServiceAndDeviceType(ctx context.Context, serviceType string) (*service, string, error) {
 	// Check whether service exist
 	ep.servicesLock.RLock()
 	serv, serviceExist := ep.services[serviceType]
@@ -179,7 +179,7 @@
 
 	// Load the service and device types if needed
 	if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
-		if err := ep.loadServices(); err != nil {
+		if err := ep.loadServices(ctx); err != nil {
 			return nil, "", err
 		}
 
@@ -214,7 +214,7 @@
 // loadServices loads the services (adapters) and device types in memory. Because of the small size of the data and
 // the data format in the dB being binary protobuf then it is better to load all the data if inconsistency is detected,
 // instead of watching for updates in the dB and acting on it.
-func (ep *endpointManager) loadServices() error {
+func (ep *endpointManager) loadServices(ctx context.Context) error {
 	ep.servicesLock.Lock()
 	defer ep.servicesLock.Unlock()
 	ep.deviceTypeServiceMapLock.Lock()
@@ -276,13 +276,13 @@
 	if logger.V(log.DebugLevel) {
 		for key, val := range ep.services {
 			members := val.consistentRing.GetMembers()
-			logger.Debugw("service", log.Fields{"service": key, "expected-replica": val.totalReplicas, "replicas": len(val.consistentRing.GetMembers())})
+			logger.Debugw(ctx, "service", log.Fields{"service": key, "expected-replica": val.totalReplicas, "replicas": len(val.consistentRing.GetMembers())})
 			for _, m := range members {
 				n := m.(Member)
-				logger.Debugw("service-loaded", log.Fields{"serviceId": n.getID(), "serviceType": n.getServiceType(), "replica": n.getReplica(), "endpoint": n.getEndPoint()})
+				logger.Debugw(ctx, "service-loaded", log.Fields{"serviceId": n.getID(), "serviceType": n.getServiceType(), "replica": n.getReplica(), "endpoint": n.getEndPoint()})
 			}
 		}
-		logger.Debugw("device-types-loaded", log.Fields{"device-types": ep.deviceTypeServiceMap})
+		logger.Debugw(ctx, "device-types-loaded", log.Fields{"device-types": ep.deviceTypeServiceMap})
 	}
 	return nil
 }
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
index cbde834..368391e 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
@@ -64,17 +64,17 @@
 }
 
 type InterContainerProxy interface {
-	Start() error
-	Stop()
+	Start(ctx context.Context) error
+	Stop(ctx context.Context)
 	GetDefaultTopic() *Topic
 	InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
 	InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
-	SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error
-	SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error
-	UnSubscribeFromRequestHandler(topic Topic) error
-	DeleteTopic(topic Topic) error
-	EnableLivenessChannel(enable bool) chan bool
-	SendLiveness() error
+	SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error
+	SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error
+	UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error
+	DeleteTopic(ctx context.Context, topic Topic) error
+	EnableLivenessChannel(ctx context.Context, enable bool) chan bool
+	SendLiveness(ctx context.Context) error
 }
 
 // interContainerProxy represents the messaging proxy
@@ -146,17 +146,17 @@
 	return newInterContainerProxy(opts...)
 }
 
-func (kp *interContainerProxy) Start() error {
-	logger.Info("Starting-Proxy")
+func (kp *interContainerProxy) Start(ctx context.Context) error {
+	logger.Info(ctx, "Starting-Proxy")
 
 	// Kafka MsgClient should already have been created.  If not, output fatal error
 	if kp.kafkaClient == nil {
-		logger.Fatal("kafka-client-not-set")
+		logger.Fatal(ctx, "kafka-client-not-set")
 	}
 
 	// Start the kafka client
-	if err := kp.kafkaClient.Start(); err != nil {
-		logger.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
+	if err := kp.kafkaClient.Start(ctx); err != nil {
+		logger.Errorw(ctx, "Cannot-create-kafka-proxy", log.Fields{"error": err})
 		return err
 	}
 
@@ -172,20 +172,20 @@
 	return nil
 }
 
-func (kp *interContainerProxy) Stop() {
-	logger.Info("stopping-intercontainer-proxy")
+func (kp *interContainerProxy) Stop(ctx context.Context) {
+	logger.Info(ctx, "stopping-intercontainer-proxy")
 	kp.doneOnce.Do(func() { close(kp.doneCh) })
 	// TODO : Perform cleanup
-	kp.kafkaClient.Stop()
-	err := kp.deleteAllTopicRequestHandlerChannelMap()
+	kp.kafkaClient.Stop(ctx)
+	err := kp.deleteAllTopicRequestHandlerChannelMap(ctx)
 	if err != nil {
-		logger.Errorw("failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
+		logger.Errorw(ctx, "failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
 	}
-	err = kp.deleteAllTopicResponseChannelMap()
+	err = kp.deleteAllTopicResponseChannelMap(ctx)
 	if err != nil {
-		logger.Errorw("failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
+		logger.Errorw(ctx, "failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
 	}
-	kp.deleteAllTransactionIdToChannelMap()
+	kp.deleteAllTransactionIdToChannelMap(ctx)
 }
 
 func (kp *interContainerProxy) GetDefaultTopic() *Topic {
@@ -196,7 +196,7 @@
 func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
 	waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
 
-	logger.Debugw("InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key})
+	logger.Debugw(ctx, "InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key})
 	//	If a replyToTopic is provided then we use it, otherwise just use the  default toTopic.  The replyToTopic is
 	// typically the device ID.
 	responseTopic := replyToTopic
@@ -216,17 +216,17 @@
 		var protoRequest *ic.InterContainerMessage
 
 		// Encode the request
-		protoRequest, err = encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
+		protoRequest, err = encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
 		if err != nil {
-			logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
+			logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
 			chnl <- NewResponse(RpcFormattingError, err, nil)
 			return
 		}
 
 		// Subscribe for response, if needed, before sending request
 		var ch <-chan *ic.InterContainerMessage
-		if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
-			logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
+		if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
+			logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
 			chnl <- NewResponse(RpcTransportError, err, nil)
 			return
 		}
@@ -234,10 +234,10 @@
 		// Send request - if the topic is formatted with a device Id then we will send the request using a
 		// specific key, hence ensuring a single partition is used to publish the request.  This ensures that the
 		// subscriber on that topic will receive the request in the order it was sent.  The key used is the deviceId.
-		logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
+		logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
 
 		// if the message is not sent on kafka publish an event an close the channel
-		if err = kp.kafkaClient.Send(protoRequest, toTopic, key); err != nil {
+		if err = kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
 			chnl <- NewResponse(RpcTransportError, err, nil)
 			return
 		}
@@ -250,8 +250,8 @@
 
 		defer func() {
 			// Remove the subscription for a response on return
-			if err := kp.unSubscribeForResponse(protoRequest.Header.Id); err != nil {
-				logger.Warnw("invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
+			if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
+				logger.Warnw(ctx, "invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
 			}
 		}()
 
@@ -259,11 +259,11 @@
 		select {
 		case msg, ok := <-ch:
 			if !ok {
-				logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
+				logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
 				chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
 			}
-			logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
-			if responseBody, err := decodeResponse(msg); err != nil {
+			logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
+			if responseBody, err := decodeResponse(ctx, msg); err != nil {
 				chnl <- NewResponse(RpcReply, err, nil)
 			} else {
 				if responseBody.Success {
@@ -279,12 +279,12 @@
 				}
 			}
 		case <-ctx.Done():
-			logger.Errorw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
+			logger.Errorw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
 			err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
 			chnl <- NewResponse(RpcTimeout, err, nil)
 		case <-kp.doneCh:
 			chnl <- NewResponse(RpcSystemClosing, nil, nil)
-			logger.Warnw("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
+			logger.Warnw(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
 		}
 	}()
 	return chnl
@@ -302,9 +302,9 @@
 	}
 
 	// Encode the request
-	protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
+	protoRequest, err := encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
 	if err != nil {
-		logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
+		logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
 		return false, nil
 	}
 
@@ -312,8 +312,8 @@
 	var ch <-chan *ic.InterContainerMessage
 	if waitForResponse {
 		var err error
-		if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
-			logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
+		if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
+			logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
 		}
 	}
 
@@ -321,10 +321,10 @@
 	// specific key, hence ensuring a single partition is used to publish the request.  This ensures that the
 	// subscriber on that topic will receive the request in the order it was sent.  The key used is the deviceId.
 	//key := GetDeviceIdFromTopic(*toTopic)
-	logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
+	logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
 	go func() {
-		if err := kp.kafkaClient.Send(protoRequest, toTopic, key); err != nil {
-			logger.Errorw("send-failed", log.Fields{
+		if err := kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
+			logger.Errorw(ctx, "send-failed", log.Fields{
 				"topic": toTopic,
 				"key":   key,
 				"error": err})
@@ -345,8 +345,8 @@
 		// Wait for response as well as timeout or cancellation
 		// Remove the subscription for a response on return
 		defer func() {
-			if err := kp.unSubscribeForResponse(protoRequest.Header.Id); err != nil {
-				logger.Errorw("response-unsubscribe-failed", log.Fields{
+			if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
+				logger.Errorw(ctx, "response-unsubscribe-failed", log.Fields{
 					"id":    protoRequest.Header.Id,
 					"error": err})
 			}
@@ -354,7 +354,7 @@
 		select {
 		case msg, ok := <-ch:
 			if !ok {
-				logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
+				logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
 				protoError := &ic.Error{Reason: "channel-closed"}
 				var marshalledArg *any.Any
 				if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
@@ -362,16 +362,16 @@
 				}
 				return false, marshalledArg
 			}
-			logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
+			logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
 			var responseBody *ic.InterContainerResponseBody
 			var err error
-			if responseBody, err = decodeResponse(msg); err != nil {
-				logger.Errorw("decode-response-error", log.Fields{"error": err})
+			if responseBody, err = decodeResponse(ctx, msg); err != nil {
+				logger.Errorw(ctx, "decode-response-error", log.Fields{"error": err})
 				// FIXME we should return something
 			}
 			return responseBody.Success, responseBody.Result
 		case <-ctx.Done():
-			logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
+			logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
 			//	 pack the error as proto any type
 			protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
 
@@ -381,7 +381,7 @@
 			}
 			return false, marshalledArg
 		case <-childCtx.Done():
-			logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
+			logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
 			//	 pack the error as proto any type
 			protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
 
@@ -391,7 +391,7 @@
 			}
 			return false, marshalledArg
 		case <-kp.doneCh:
-			logger.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
+			logger.Infow(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
 			return true, nil
 		}
 	}
@@ -400,55 +400,55 @@
 
 // SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
 // when a message is received on a given topic
-func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
+func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error {
 
 	// Subscribe to receive messages for that topic
 	var ch <-chan *ic.InterContainerMessage
 	var err error
-	if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
+	if ch, err = kp.kafkaClient.Subscribe(ctx, &topic); err != nil {
 		//if ch, err = kp.Subscribe(topic); err != nil {
-		logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
+		logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
 		return err
 	}
 
 	kp.defaultRequestHandlerInterface = handler
 	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
 	// Launch a go routine to receive and process kafka messages
-	go kp.waitForMessages(ch, topic, handler)
+	go kp.waitForMessages(ctx, ch, topic, handler)
 
 	return nil
 }
 
 // SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
 // when a message is received on a given topic.  So far there is only 1 target registered per microservice
-func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
+func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error {
 	// Subscribe to receive messages for that topic
 	var ch <-chan *ic.InterContainerMessage
 	var err error
-	if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
-		logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
+	if ch, err = kp.kafkaClient.Subscribe(ctx, &topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
+		logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
 		return err
 	}
 	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
 
 	// Launch a go routine to receive and process kafka messages
-	go kp.waitForMessages(ch, topic, kp.defaultRequestHandlerInterface)
+	go kp.waitForMessages(ctx, ch, topic, kp.defaultRequestHandlerInterface)
 
 	return nil
 }
 
-func (kp *interContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
-	return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
+func (kp *interContainerProxy) UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error {
+	return kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name)
 }
 
-func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
+func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(ctx context.Context, topic string) error {
 	kp.lockTopicResponseChannelMap.Lock()
 	defer kp.lockTopicResponseChannelMap.Unlock()
 	if _, exist := kp.topicToResponseChannelMap[topic]; exist {
 		// Unsubscribe to this topic first - this will close the subscribed channel
 		var err error
-		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
-			logger.Errorw("unsubscribing-error", log.Fields{"topic": topic})
+		if err = kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+			logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic})
 		}
 		delete(kp.topicToResponseChannelMap, topic)
 		return err
@@ -458,16 +458,16 @@
 }
 
 // nolint: unused
-func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
-	logger.Debug("delete-all-topic-response-channel")
+func (kp *interContainerProxy) deleteAllTopicResponseChannelMap(ctx context.Context) error {
+	logger.Debug(ctx, "delete-all-topic-response-channel")
 	kp.lockTopicResponseChannelMap.Lock()
 	defer kp.lockTopicResponseChannelMap.Unlock()
 	var unsubscribeFailTopics []string
 	for topic := range kp.topicToResponseChannelMap {
 		// Unsubscribe to this topic first - this will close the subscribed channel
-		if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+		if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
 			unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
-			logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+			logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
 			// Do not return. Continue to try to unsubscribe to other topics.
 		} else {
 			// Only delete from channel map if successfully unsubscribed.
@@ -488,12 +488,12 @@
 	}
 }
 
-func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
+func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(ctx context.Context, topic string) error {
 	kp.lockTopicRequestHandlerChannelMap.Lock()
 	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
 	if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
 		// Close the kafka client client first by unsubscribing to this topic
-		if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+		if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
 			return err
 		}
 		delete(kp.topicToRequestHandlerChannelMap, topic)
@@ -504,16 +504,16 @@
 }
 
 // nolint: unused
-func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
-	logger.Debug("delete-all-topic-request-channel")
+func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap(ctx context.Context) error {
+	logger.Debug(ctx, "delete-all-topic-request-channel")
 	kp.lockTopicRequestHandlerChannelMap.Lock()
 	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
 	var unsubscribeFailTopics []string
 	for topic := range kp.topicToRequestHandlerChannelMap {
 		// Close the kafka client client first by unsubscribing to this topic
-		if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+		if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
 			unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
-			logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+			logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
 			// Do not return. Continue to try to unsubscribe to other topics.
 		} else {
 			// Only delete from channel map if successfully unsubscribed.
@@ -556,8 +556,8 @@
 }
 
 // nolint: unused
-func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
-	logger.Debug("delete-all-transaction-id-channel-map")
+func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap(ctx context.Context) {
+	logger.Debug(ctx, "delete-all-transaction-id-channel-map")
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	for key, value := range kp.transactionIdToChannelMap {
@@ -566,27 +566,27 @@
 	}
 }
 
-func (kp *interContainerProxy) DeleteTopic(topic Topic) error {
+func (kp *interContainerProxy) DeleteTopic(ctx context.Context, topic Topic) error {
 	// If we have any consumers on that topic we need to close them
-	if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
-		logger.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
+	if err := kp.deleteFromTopicResponseChannelMap(ctx, topic.Name); err != nil {
+		logger.Errorw(ctx, "delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
 	}
-	if err := kp.deleteFromTopicRequestHandlerChannelMap(topic.Name); err != nil {
-		logger.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
+	if err := kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name); err != nil {
+		logger.Errorw(ctx, "delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
 	}
 	kp.deleteTopicTransactionIdToChannelMap(topic.Name)
 
-	return kp.kafkaClient.DeleteTopic(&topic)
+	return kp.kafkaClient.DeleteTopic(ctx, &topic)
 }
 
-func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
+func encodeReturnedValue(ctx context.Context, returnedVal interface{}) (*any.Any, error) {
 	// Encode the response argument - needs to be a proto message
 	if returnedVal == nil {
 		return nil, nil
 	}
 	protoValue, ok := returnedVal.(proto.Message)
 	if !ok {
-		logger.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
+		logger.Warnw(ctx, "response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
 		err := errors.New("response-value-not-proto-message")
 		return nil, err
 	}
@@ -595,13 +595,13 @@
 	var marshalledReturnedVal *any.Any
 	var err error
 	if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
-		logger.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
+		logger.Warnw(ctx, "cannot-marshal-returned-val", log.Fields{"error": err})
 		return nil, err
 	}
 	return marshalledReturnedVal, nil
 }
 
-func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
+func encodeDefaultFailedResponse(ctx context.Context, request *ic.InterContainerMessage) *ic.InterContainerMessage {
 	responseHeader := &ic.Header{
 		Id:        request.Header.Id,
 		Type:      ic.MessageType_RESPONSE,
@@ -617,7 +617,7 @@
 	var err error
 	// Error should never happen here
 	if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
-		logger.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
+		logger.Warnw(ctx, "cannot-marshal-failed-response-body", log.Fields{"error": err})
 	}
 
 	return &ic.InterContainerMessage{
@@ -629,8 +629,8 @@
 
 //formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
 //or an error on failure
-func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
-	//logger.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
+func encodeResponse(ctx context.Context, request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
+	//logger.Debugw(ctx, "encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
 	responseHeader := &ic.Header{
 		Id:        request.Header.Id,
 		Type:      ic.MessageType_RESPONSE,
@@ -646,8 +646,8 @@
 
 	// for now we support only 1 returned value - (excluding the error)
 	if len(returnedValues) > 0 {
-		if marshalledReturnedVal, err = encodeReturnedValue(returnedValues[0]); err != nil {
-			logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
+		if marshalledReturnedVal, err = encodeReturnedValue(ctx, returnedValues[0]); err != nil {
+			logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
 		}
 	}
 
@@ -659,7 +659,7 @@
 	// Marshal the response body
 	var marshalledResponseBody *any.Any
 	if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
-		logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
+		logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
 		return nil, err
 	}
 
@@ -669,7 +669,7 @@
 	}, nil
 }
 
-func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
+func CallFuncByName(ctx context.Context, myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
 	myClassValue := reflect.ValueOf(myClass)
 	// Capitalize the first letter in the funcName to workaround the first capital letters required to
 	// invoke a function from a different package
@@ -678,15 +678,16 @@
 	if !m.IsValid() {
 		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
 	}
-	in := make([]reflect.Value, len(params))
+	in := make([]reflect.Value, len(params)+1)
+	in[0] = reflect.ValueOf(ctx)
 	for i, param := range params {
-		in[i] = reflect.ValueOf(param)
+		in[i+1] = reflect.ValueOf(param)
 	}
 	out = m.Call(in)
 	return
 }
 
-func (kp *interContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
+func (kp *interContainerProxy) addTransactionId(ctx context.Context, transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
 	arg := &KVArg{
 		Key:   TransactionKey,
 		Value: &ic.StrType{Val: transactionId},
@@ -695,7 +696,7 @@
 	var marshalledArg *any.Any
 	var err error
 	if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
-		logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
+		logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
 		return currentArgs
 	}
 	protoArg := &ic.Argument{
@@ -705,11 +706,11 @@
 	return append(currentArgs, protoArg)
 }
 
-func (kp *interContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
+func (kp *interContainerProxy) addFromTopic(ctx context.Context, fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
 	var marshalledArg *any.Any
 	var err error
 	if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
-		logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
+		logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
 		return currentArgs
 	}
 	protoArg := &ic.Argument{
@@ -719,7 +720,7 @@
 	return append(currentArgs, protoArg)
 }
 
-func (kp *interContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
+func (kp *interContainerProxy) handleMessage(ctx context.Context, msg *ic.InterContainerMessage, targetInterface interface{}) {
 
 	// First extract the header to know whether this is a request - responses are handled by a different handler
 	if msg.Header.Type == ic.MessageType_REQUEST {
@@ -729,21 +730,21 @@
 		// Get the request body
 		requestBody := &ic.InterContainerRequestBody{}
 		if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
-			logger.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
+			logger.Warnw(ctx, "cannot-unmarshal-request", log.Fields{"error": err})
 		} else {
-			logger.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
+			logger.Debugw(ctx, "received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
 			// let the callee unpack the arguments as its the only one that knows the real proto type
 			// Augment the requestBody with the message Id as it will be used in scenarios where cores
 			// are set in pairs and competing
-			requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
+			requestBody.Args = kp.addTransactionId(ctx, msg.Header.Id, requestBody.Args)
 
 			// Augment the requestBody with the From topic name as it will be used in scenarios where a container
 			// needs to send an unsollicited message to the currently requested container
-			requestBody.Args = kp.addFromTopic(msg.Header.FromTopic, requestBody.Args)
+			requestBody.Args = kp.addFromTopic(ctx, msg.Header.FromTopic, requestBody.Args)
 
-			out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
+			out, err = CallFuncByName(ctx, targetInterface, requestBody.Rpc, requestBody.Args)
 			if err != nil {
-				logger.Warn(err)
+				logger.Warn(ctx, err)
 			}
 		}
 		// Response required?
@@ -763,7 +764,7 @@
 				if out[lastIndex].Interface() != nil { // Error
 					if retError, ok := out[lastIndex].Interface().(error); ok {
 						if retError.Error() == ErrorTransactionNotAcquired.Error() {
-							logger.Debugw("Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
+							logger.Debugw(ctx, "Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
 							return // Ignore - process is in competing mode and ignored transaction
 						}
 						returnError = &ic.Error{Reason: retError.Error()}
@@ -773,12 +774,12 @@
 						returnedValues = append(returnedValues, returnError)
 					}
 				} else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
-					logger.Warnw("Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
+					logger.Warnw(ctx, "Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
 					return // Ignore - should not happen
 				} else { // Non-error case
 					success = true
 					for idx, val := range out {
-						//logger.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
+						//logger.Debugw(ctx, "returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
 						if idx != lastIndex {
 							returnedValues = append(returnedValues, val.Interface())
 						}
@@ -787,9 +788,9 @@
 			}
 
 			var icm *ic.InterContainerMessage
-			if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
-				logger.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
-				icm = encodeDefaultFailedResponse(msg)
+			if icm, err = encodeResponse(ctx, msg, success, returnedValues...); err != nil {
+				logger.Warnw(ctx, "error-encoding-response-returning-failure-result", log.Fields{"error": err})
+				icm = encodeDefaultFailedResponse(ctx, msg)
 			}
 			// To preserve ordering of messages, all messages to a given topic are sent to the same partition
 			// by providing a message key.   The key is encoded in the topic name.  If the deviceId is not
@@ -797,11 +798,11 @@
 			// partitions.
 			replyTopic := &Topic{Name: msg.Header.FromTopic}
 			key := msg.Header.KeyTopic
-			logger.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
+			logger.Debugw(ctx, "sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
 			// TODO: handle error response.
 			go func() {
-				if err := kp.kafkaClient.Send(icm, replyTopic, key); err != nil {
-					logger.Errorw("send-reply-failed", log.Fields{
+				if err := kp.kafkaClient.Send(ctx, icm, replyTopic, key); err != nil {
+					logger.Errorw(ctx, "send-reply-failed", log.Fields{
 						"topic": replyTopic,
 						"key":   key,
 						"error": err})
@@ -809,26 +810,26 @@
 			}()
 		}
 	} else if msg.Header.Type == ic.MessageType_RESPONSE {
-		logger.Debugw("response-received", log.Fields{"msg-header": msg.Header})
-		go kp.dispatchResponse(msg)
+		logger.Debugw(ctx, "response-received", log.Fields{"msg-header": msg.Header})
+		go kp.dispatchResponse(ctx, msg)
 	} else {
-		logger.Warnw("unsupported-message-received", log.Fields{"msg-header": msg.Header})
+		logger.Warnw(ctx, "unsupported-message-received", log.Fields{"msg-header": msg.Header})
 	}
 }
 
-func (kp *interContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
+func (kp *interContainerProxy) waitForMessages(ctx context.Context, ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
 	//	Wait for messages
 	for msg := range ch {
-		//logger.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
-		go kp.handleMessage(msg, targetInterface)
+		//logger.Debugw(ctx, "request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
+		go kp.handleMessage(context.Background(), msg, targetInterface)
 	}
 }
 
-func (kp *interContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
+func (kp *interContainerProxy) dispatchResponse(ctx context.Context, msg *ic.InterContainerMessage) {
 	kp.lockTransactionIdToChannelMap.RLock()
 	defer kp.lockTransactionIdToChannelMap.RUnlock()
 	if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
-		logger.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
+		logger.Debugw(ctx, "no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
 		return
 	}
 	kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
@@ -838,8 +839,8 @@
 // This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
 // API. There is one response channel waiting for kafka messages before dispatching the message to the
 // corresponding waiting channel
-func (kp *interContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
-	logger.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
+func (kp *interContainerProxy) subscribeForResponse(ctx context.Context, topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
+	logger.Debugw(ctx, "subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
 
 	// Create a specific channel for this consumers.  We cannot use the channel from the kafkaclient as it will
 	// broadcast any message for this topic to all channels waiting on it.
@@ -850,27 +851,27 @@
 	return ch, nil
 }
 
-func (kp *interContainerProxy) unSubscribeForResponse(trnsId string) error {
-	logger.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
+func (kp *interContainerProxy) unSubscribeForResponse(ctx context.Context, trnsId string) error {
+	logger.Debugw(ctx, "unsubscribe-for-response", log.Fields{"trnsId": trnsId})
 	kp.deleteFromTransactionIdToChannelMap(trnsId)
 	return nil
 }
 
-func (kp *interContainerProxy) EnableLivenessChannel(enable bool) chan bool {
-	return kp.kafkaClient.EnableLivenessChannel(enable)
+func (kp *interContainerProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
+	return kp.kafkaClient.EnableLivenessChannel(ctx, enable)
 }
 
-func (kp *interContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
-	return kp.kafkaClient.EnableHealthinessChannel(enable)
+func (kp *interContainerProxy) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
+	return kp.kafkaClient.EnableHealthinessChannel(ctx, enable)
 }
 
-func (kp *interContainerProxy) SendLiveness() error {
-	return kp.kafkaClient.SendLiveness()
+func (kp *interContainerProxy) SendLiveness(ctx context.Context) error {
+	return kp.kafkaClient.SendLiveness(ctx)
 }
 
 //formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
 //or an error on failure
-func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
+func encodeRequest(ctx context.Context, rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
 	requestHeader := &ic.Header{
 		Id:        uuid.New().String(),
 		Type:      ic.MessageType_REQUEST,
@@ -895,12 +896,12 @@
 		// ascertain the value interface type is a proto.Message
 		protoValue, ok := arg.Value.(proto.Message)
 		if !ok {
-			logger.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
+			logger.Warnw(ctx, "argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
 			err := errors.New("argument-value-not-proto-message")
 			return nil, err
 		}
 		if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
-			logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
+			logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
 			return nil, err
 		}
 		protoArg := &ic.Argument{
@@ -913,7 +914,7 @@
 	var marshalledData *any.Any
 	var err error
 	if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
-		logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
+		logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
 		return nil, err
 	}
 	request := &ic.InterContainerMessage{
@@ -923,14 +924,14 @@
 	return request, nil
 }
 
-func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
+func decodeResponse(ctx context.Context, response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
 	//	Extract the message body
 	responseBody := ic.InterContainerResponseBody{}
 	if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
-		logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+		logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
 		return nil, err
 	}
-	//logger.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
+	//logger.Debugw(ctx, "response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
 
 	return &responseBody, nil
 
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go
index 581cf49..87c7ce4 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go
@@ -231,8 +231,8 @@
 	return client
 }
 
-func (sc *SaramaClient) Start() error {
-	logger.Info("Starting-kafka-sarama-client")
+func (sc *SaramaClient) Start(ctx context.Context) error {
+	logger.Info(ctx, "Starting-kafka-sarama-client")
 
 	// Create the Done channel
 	sc.doneCh = make(chan int, 1)
@@ -242,26 +242,26 @@
 	// Add a cleanup in case of failure to startup
 	defer func() {
 		if err != nil {
-			sc.Stop()
+			sc.Stop(ctx)
 		}
 	}()
 
 	// Create the Cluster Admin
-	if err = sc.createClusterAdmin(); err != nil {
-		logger.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
+	if err = sc.createClusterAdmin(ctx); err != nil {
+		logger.Errorw(ctx, "Cannot-create-cluster-admin", log.Fields{"error": err})
 		return err
 	}
 
 	// Create the Publisher
-	if err := sc.createPublisher(); err != nil {
-		logger.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
+	if err := sc.createPublisher(ctx); err != nil {
+		logger.Errorw(ctx, "Cannot-create-kafka-publisher", log.Fields{"error": err})
 		return err
 	}
 
 	if sc.consumerType == DefaultConsumerType {
 		// Create the master consumers
-		if err := sc.createConsumer(); err != nil {
-			logger.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
+		if err := sc.createConsumer(ctx); err != nil {
+			logger.Errorw(ctx, "Cannot-create-kafka-consumers", log.Fields{"error": err})
 			return err
 		}
 	}
@@ -269,15 +269,15 @@
 	// Create the topic to consumers/channel map
 	sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
 
-	logger.Info("kafka-sarama-client-started")
+	logger.Info(ctx, "kafka-sarama-client-started")
 
 	sc.started = true
 
 	return nil
 }
 
-func (sc *SaramaClient) Stop() {
-	logger.Info("stopping-sarama-client")
+func (sc *SaramaClient) Stop(ctx context.Context) {
+	logger.Info(ctx, "stopping-sarama-client")
 
 	sc.started = false
 
@@ -286,38 +286,38 @@
 
 	if sc.producer != nil {
 		if err := sc.producer.Close(); err != nil {
-			logger.Errorw("closing-producer-failed", log.Fields{"error": err})
+			logger.Errorw(ctx, "closing-producer-failed", log.Fields{"error": err})
 		}
 	}
 
 	if sc.consumer != nil {
 		if err := sc.consumer.Close(); err != nil {
-			logger.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
+			logger.Errorw(ctx, "closing-partition-consumer-failed", log.Fields{"error": err})
 		}
 	}
 
 	for key, val := range sc.groupConsumers {
-		logger.Debugw("closing-group-consumer", log.Fields{"topic": key})
+		logger.Debugw(ctx, "closing-group-consumer", log.Fields{"topic": key})
 		if err := val.Close(); err != nil {
-			logger.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
+			logger.Errorw(ctx, "closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
 		}
 	}
 
 	if sc.cAdmin != nil {
 		if err := sc.cAdmin.Close(); err != nil {
-			logger.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
+			logger.Errorw(ctx, "closing-cluster-admin-failed", log.Fields{"error": err})
 		}
 	}
 
 	//TODO: Clear the consumers map
 	//sc.clearConsumerChannelMap()
 
-	logger.Info("sarama-client-stopped")
+	logger.Info(ctx, "sarama-client-stopped")
 }
 
 //createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
 // the invoking function must hold the lock
-func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
+func (sc *SaramaClient) createTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
 	// Set the topic details
 	topicDetail := &sarama.TopicDetail{}
 	topicDetail.NumPartitions = int32(numPartition)
@@ -329,29 +329,29 @@
 	if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
 		if err == sarama.ErrTopicAlreadyExists {
 			//	Not an error
-			logger.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
+			logger.Debugw(ctx, "topic-already-exist", log.Fields{"topic": topic.Name})
 			return nil
 		}
-		logger.Errorw("create-topic-failure", log.Fields{"error": err})
+		logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err})
 		return err
 	}
 	// TODO: Wait until the topic has been created.  No API is available in the Sarama clusterAdmin to
 	// do so.
-	logger.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
+	logger.Debugw(ctx, "topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
 	return nil
 }
 
 //CreateTopic is a public API to create a topic on the Kafka Broker.  It uses a lock on a specific topic to
 // ensure no two go routines are performing operations on the same topic
-func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
+func (sc *SaramaClient) CreateTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
 	sc.lockTopic(topic)
 	defer sc.unLockTopic(topic)
 
-	return sc.createTopic(topic, numPartition, repFactor)
+	return sc.createTopic(ctx, topic, numPartition, repFactor)
 }
 
 //DeleteTopic removes a topic from the kafka Broker
-func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
+func (sc *SaramaClient) DeleteTopic(ctx context.Context, topic *Topic) error {
 	sc.lockTopic(topic)
 	defer sc.unLockTopic(topic)
 
@@ -359,16 +359,16 @@
 	if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
 		if err == sarama.ErrUnknownTopicOrPartition {
 			//	Not an error as does not exist
-			logger.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
+			logger.Debugw(ctx, "topic-not-exist", log.Fields{"topic": topic.Name})
 			return nil
 		}
-		logger.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
+		logger.Errorw(ctx, "delete-topic-failed", log.Fields{"topic": topic, "error": err})
 		return err
 	}
 
 	// Clear the topic from the consumer channel.  This will also close any consumers listening on that topic.
-	if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
-		logger.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
+	if err := sc.clearTopicFromConsumerChannelMap(ctx, *topic); err != nil {
+		logger.Errorw(ctx, "failure-clearing-channels", log.Fields{"topic": topic, "error": err})
 		return err
 	}
 	return nil
@@ -376,18 +376,18 @@
 
 // Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
 // messages from that topic
-func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
 	sc.lockTopic(topic)
 	defer sc.unLockTopic(topic)
 
-	logger.Debugw("subscribe", log.Fields{"topic": topic.Name})
+	logger.Debugw(ctx, "subscribe", log.Fields{"topic": topic.Name})
 
 	// If a consumers already exist for that topic then resuse it
 	if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
-		logger.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
+		logger.Debugw(ctx, "topic-already-subscribed", log.Fields{"topic": topic.Name})
 		// Create a channel specific for that consumers and add it to the consumers channel map
 		ch := make(chan *ic.InterContainerMessage)
-		sc.addChannelToConsumerChannelMap(topic, ch)
+		sc.addChannelToConsumerChannelMap(ctx, topic, ch)
 		return ch, nil
 	}
 
@@ -398,13 +398,13 @@
 	// Use the consumerType option to figure out the type of consumer to launch
 	if sc.consumerType == PartitionConsumer {
 		if sc.autoCreateTopic {
-			if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
-				logger.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
+			if err = sc.createTopic(ctx, topic, sc.numPartitions, sc.numReplicas); err != nil {
+				logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
 				return nil, err
 			}
 		}
-		if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
-			logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
+		if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(ctx, topic, getOffset(kvArgs...)); err != nil {
+			logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
 			return nil, err
 		}
 	} else if sc.consumerType == GroupCustomer {
@@ -412,7 +412,7 @@
 		// does not consume from a precreated topic in some scenarios
 		//if sc.autoCreateTopic {
 		//	if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
-		//		logger.Errorw("create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
+		//		logger.Errorw(ctx, "create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
 		//		return nil, err
 		//	}
 		//}
@@ -425,13 +425,13 @@
 			// Need to use a unique group Id per topic
 			groupId = sc.consumerGroupPrefix + topic.Name
 		}
-		if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
-			logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+		if consumerListeningChannel, err = sc.setupGroupConsumerChannel(ctx, topic, groupId, getOffset(kvArgs...)); err != nil {
+			logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
 			return nil, err
 		}
 
 	} else {
-		logger.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
+		logger.Warnw(ctx, "unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
 		return nil, errors.New("unknown-consumer-type")
 	}
 
@@ -439,37 +439,37 @@
 }
 
 //UnSubscribe unsubscribe a consumer from a given topic
-func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
+func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan *ic.InterContainerMessage) error {
 	sc.lockTopic(topic)
 	defer sc.unLockTopic(topic)
 
-	logger.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
+	logger.Debugw(ctx, "unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
 	var err error
-	if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
-		logger.Errorw("failed-removing-channel", log.Fields{"error": err})
+	if err = sc.removeChannelFromConsumerChannelMap(ctx, *topic, ch); err != nil {
+		logger.Errorw(ctx, "failed-removing-channel", log.Fields{"error": err})
 	}
-	if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
-		logger.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
+	if err = sc.deleteFromGroupConsumers(ctx, topic.Name); err != nil {
+		logger.Errorw(ctx, "failed-deleting-group-consumer", log.Fields{"error": err})
 	}
 	return err
 }
 
-func (sc *SaramaClient) SubscribeForMetadata(callback func(fromTopic string, timestamp time.Time)) {
+func (sc *SaramaClient) SubscribeForMetadata(ctx context.Context, callback func(fromTopic string, timestamp time.Time)) {
 	sc.metadataCallback = callback
 }
 
-func (sc *SaramaClient) updateLiveness(alive bool) {
+func (sc *SaramaClient) updateLiveness(ctx context.Context, alive bool) {
 	// Post a consistent stream of liveness data to the channel,
 	// so that in a live state, the core does not timeout and
 	// send a forced liveness message. Production of liveness
 	// events to the channel is rate-limited by livenessChannelInterval.
 	if sc.liveness != nil {
 		if sc.alive != alive {
-			logger.Info("update-liveness-channel-because-change")
+			logger.Info(ctx, "update-liveness-channel-because-change")
 			sc.liveness <- alive
 			sc.lastLivenessTime = time.Now()
 		} else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
-			logger.Info("update-liveness-channel-because-interval")
+			logger.Info(ctx, "update-liveness-channel-because-interval")
 			sc.liveness <- alive
 			sc.lastLivenessTime = time.Now()
 		}
@@ -477,21 +477,21 @@
 
 	// Only emit a log message when the state changes
 	if sc.alive != alive {
-		logger.Info("set-client-alive", log.Fields{"alive": alive})
+		logger.Info(ctx, "set-client-alive", log.Fields{"alive": alive})
 		sc.alive = alive
 	}
 }
 
 // Once unhealthy, we never go back
-func (sc *SaramaClient) setUnhealthy() {
+func (sc *SaramaClient) setUnhealthy(ctx context.Context) {
 	sc.healthy = false
 	if sc.healthiness != nil {
-		logger.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
+		logger.Infow(ctx, "set-client-unhealthy", log.Fields{"healthy": sc.healthy})
 		sc.healthiness <- sc.healthy
 	}
 }
 
-func (sc *SaramaClient) isLivenessError(err error) bool {
+func (sc *SaramaClient) isLivenessError(ctx context.Context, err error) bool {
 	// Sarama producers and consumers encapsulate the error inside
 	// a ProducerError or ConsumerError struct.
 	if prodError, ok := err.(*sarama.ProducerError); ok {
@@ -506,48 +506,48 @@
 
 	switch err.Error() {
 	case context.DeadlineExceeded.Error():
-		logger.Info("is-liveness-error-timeout")
+		logger.Info(ctx, "is-liveness-error-timeout")
 		return true
 	case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
-		logger.Info("is-liveness-error-no-brokers")
+		logger.Info(ctx, "is-liveness-error-no-brokers")
 		return true
 	case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
-		logger.Info("is-liveness-error-shutting-down")
+		logger.Info(ctx, "is-liveness-error-shutting-down")
 		return true
 	case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
-		logger.Info("is-liveness-error-not-available")
+		logger.Info(ctx, "is-liveness-error-not-available")
 		return true
 	case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
-		logger.Info("is-liveness-error-circuit-breaker-open")
+		logger.Info(ctx, "is-liveness-error-circuit-breaker-open")
 		return true
 	}
 
 	if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
-		logger.Info("is-liveness-error-connection-refused")
+		logger.Info(ctx, "is-liveness-error-connection-refused")
 		return true
 	}
 
 	if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
-		logger.Info("is-liveness-error-io-timeout")
+		logger.Info(ctx, "is-liveness-error-io-timeout")
 		return true
 	}
 
 	// Other errors shouldn't trigger a loss of liveness
 
-	logger.Infow("is-liveness-error-ignored", log.Fields{"err": err})
+	logger.Infow(ctx, "is-liveness-error-ignored", log.Fields{"err": err})
 
 	return false
 }
 
 // send formats and sends the request onto the kafka messaging bus.
-func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
+func (sc *SaramaClient) Send(ctx context.Context, msg interface{}, topic *Topic, keys ...string) error {
 
 	// Assert message is a proto message
 	var protoMsg proto.Message
 	var ok bool
 	// ascertain the value interface type is a proto.Message
 	if protoMsg, ok = msg.(proto.Message); !ok {
-		logger.Warnw("message-not-proto-message", log.Fields{"msg": msg})
+		logger.Warnw(ctx, "message-not-proto-message", log.Fields{"msg": msg})
 		return fmt.Errorf("not-a-proto-msg-%s", msg)
 	}
 
@@ -555,7 +555,7 @@
 	var err error
 	//	Create the Sarama producer message
 	if marshalled, err = proto.Marshal(protoMsg); err != nil {
-		logger.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
+		logger.Errorw(ctx, "marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
 		return err
 	}
 	key := ""
@@ -574,12 +574,12 @@
 	// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
 	select {
 	case ok := <-sc.producer.Successes():
-		logger.Debugw("message-sent", log.Fields{"status": ok.Topic})
-		sc.updateLiveness(true)
+		logger.Debugw(ctx, "message-sent", log.Fields{"status": ok.Topic})
+		sc.updateLiveness(ctx, true)
 	case notOk := <-sc.producer.Errors():
-		logger.Debugw("error-sending", log.Fields{"status": notOk})
-		if sc.isLivenessError(notOk) {
-			sc.updateLiveness(false)
+		logger.Debugw(ctx, "error-sending", log.Fields{"status": notOk})
+		if sc.isLivenessError(ctx, notOk) {
+			sc.updateLiveness(ctx, false)
 		}
 		return notOk
 	}
@@ -591,11 +591,11 @@
 // or not the channel is still live. This channel is then picked up
 // by the service (i.e. rw_core / ro_core) to update readiness status
 // and/or take other actions.
-func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
-	logger.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
+func (sc *SaramaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
+	logger.Infow(ctx, "kafka-enable-liveness-channel", log.Fields{"enable": enable})
 	if enable {
 		if sc.liveness == nil {
-			logger.Info("kafka-create-liveness-channel")
+			logger.Info(ctx, "kafka-create-liveness-channel")
 			// At least 1, so we can immediately post to it without blocking
 			// Setting a bigger number (10) allows the monitor to fall behind
 			// without blocking others. The monitor shouldn't really fall
@@ -615,11 +615,11 @@
 // Enable the Healthiness monitor channel. This channel will report "false"
 // if the kafka consumers die, or some other problem occurs which is
 // catastrophic that would require re-creating the client.
-func (sc *SaramaClient) EnableHealthinessChannel(enable bool) chan bool {
-	logger.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
+func (sc *SaramaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
+	logger.Infow(ctx, "kafka-enable-healthiness-channel", log.Fields{"enable": enable})
 	if enable {
 		if sc.healthiness == nil {
-			logger.Info("kafka-create-healthiness-channel")
+			logger.Info(ctx, "kafka-create-healthiness-channel")
 			// At least 1, so we can immediately post to it without blocking
 			// Setting a bigger number (10) allows the monitor to fall behind
 			// without blocking others. The monitor shouldn't really fall
@@ -638,7 +638,7 @@
 
 // send an empty message on the liveness channel to check whether connectivity has
 // been restored.
-func (sc *SaramaClient) SendLiveness() error {
+func (sc *SaramaClient) SendLiveness(ctx context.Context) error {
 	if !sc.started {
 		return fmt.Errorf("SendLiveness() called while not started")
 	}
@@ -654,12 +654,12 @@
 	// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
 	select {
 	case ok := <-sc.producer.Successes():
-		logger.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
-		sc.updateLiveness(true)
+		logger.Debugw(ctx, "liveness-message-sent", log.Fields{"status": ok.Topic})
+		sc.updateLiveness(ctx, true)
 	case notOk := <-sc.producer.Errors():
-		logger.Debugw("liveness-error-sending", log.Fields{"status": notOk})
-		if sc.isLivenessError(notOk) {
-			sc.updateLiveness(false)
+		logger.Debugw(ctx, "liveness-error-sending", log.Fields{"status": notOk})
+		if sc.isLivenessError(ctx, notOk) {
+			sc.updateLiveness(ctx, false)
 		}
 		return notOk
 	}
@@ -686,7 +686,7 @@
 	return sarama.OffsetNewest
 }
 
-func (sc *SaramaClient) createClusterAdmin() error {
+func (sc *SaramaClient) createClusterAdmin(ctx context.Context) error {
 	config := sarama.NewConfig()
 	config.Version = sarama.V1_0_0_0
 
@@ -694,7 +694,7 @@
 	var cAdmin sarama.ClusterAdmin
 	var err error
 	if cAdmin, err = sarama.NewClusterAdmin([]string{sc.KafkaAddress}, config); err != nil {
-		logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
+		logger.Errorw(ctx, "cluster-admin-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
 		return err
 	}
 	sc.cAdmin = cAdmin
@@ -739,24 +739,24 @@
 	return nil
 }
 
-func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
+func (sc *SaramaClient) addChannelToConsumerChannelMap(ctx context.Context, topic *Topic, ch chan *ic.InterContainerMessage) {
 	sc.lockTopicToConsumerChannelMap.Lock()
 	defer sc.lockTopicToConsumerChannelMap.Unlock()
 	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
 		consumerCh.channels = append(consumerCh.channels, ch)
 		return
 	}
-	logger.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
+	logger.Warnw(ctx, "consumers-channel-not-exist", log.Fields{"topic": topic.Name})
 }
 
 //closeConsumers closes a list of sarama consumers.  The consumers can either be a partition consumers or a group consumers
-func closeConsumers(consumers []interface{}) error {
+func closeConsumers(ctx context.Context, consumers []interface{}) error {
 	var err error
 	for _, consumer := range consumers {
 		//	Is it a partition consumers?
 		if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
 			if errTemp := partionConsumer.Close(); errTemp != nil {
-				logger.Debugw("partition!!!", log.Fields{"err": errTemp})
+				logger.Debugw(ctx, "partition!!!", log.Fields{"err": errTemp})
 				if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
 					// This can occur on race condition
 					err = nil
@@ -778,35 +778,35 @@
 	return err
 }
 
-func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
+func (sc *SaramaClient) removeChannelFromConsumerChannelMap(ctx context.Context, topic Topic, ch <-chan *ic.InterContainerMessage) error {
 	sc.lockTopicToConsumerChannelMap.Lock()
 	defer sc.lockTopicToConsumerChannelMap.Unlock()
 	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
 		// Channel will be closed in the removeChannel method
-		consumerCh.channels = removeChannel(consumerCh.channels, ch)
+		consumerCh.channels = removeChannel(ctx, consumerCh.channels, ch)
 		// If there are no more channels then we can close the consumers itself
 		if len(consumerCh.channels) == 0 {
-			logger.Debugw("closing-consumers", log.Fields{"topic": topic})
-			err := closeConsumers(consumerCh.consumers)
+			logger.Debugw(ctx, "closing-consumers", log.Fields{"topic": topic})
+			err := closeConsumers(ctx, consumerCh.consumers)
 			//err := consumerCh.consumers.Close()
 			delete(sc.topicToConsumerChannelMap, topic.Name)
 			return err
 		}
 		return nil
 	}
-	logger.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
+	logger.Warnw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
 	return errors.New("topic-does-not-exist")
 }
 
-func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
+func (sc *SaramaClient) clearTopicFromConsumerChannelMap(ctx context.Context, topic Topic) error {
 	sc.lockTopicToConsumerChannelMap.Lock()
 	defer sc.lockTopicToConsumerChannelMap.Unlock()
 	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
 		for _, ch := range consumerCh.channels {
 			// Channel will be closed in the removeChannel method
-			removeChannel(consumerCh.channels, ch)
+			removeChannel(ctx, consumerCh.channels, ch)
 		}
-		err := closeConsumers(consumerCh.consumers)
+		err := closeConsumers(ctx, consumerCh.consumers)
 		//if err == sarama.ErrUnknownTopicOrPartition {
 		//	// Not an error
 		//	err = nil
@@ -815,12 +815,12 @@
 		delete(sc.topicToConsumerChannelMap, topic.Name)
 		return err
 	}
-	logger.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
+	logger.Debugw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
 	return nil
 }
 
 //createPublisher creates the publisher which is used to send a message onto kafka
-func (sc *SaramaClient) createPublisher() error {
+func (sc *SaramaClient) createPublisher(ctx context.Context) error {
 	// This Creates the publisher
 	config := sarama.NewConfig()
 	config.Producer.Partitioner = sarama.NewRandomPartitioner
@@ -835,16 +835,16 @@
 	brokers := []string{sc.KafkaAddress}
 
 	if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
-		logger.Errorw("error-starting-publisher", log.Fields{"error": err})
+		logger.Errorw(ctx, "error-starting-publisher", log.Fields{"error": err})
 		return err
 	} else {
 		sc.producer = producer
 	}
-	logger.Info("Kafka-publisher-created")
+	logger.Info(ctx, "Kafka-publisher-created")
 	return nil
 }
 
-func (sc *SaramaClient) createConsumer() error {
+func (sc *SaramaClient) createConsumer(ctx context.Context) error {
 	config := sarama.NewConfig()
 	config.Consumer.Return.Errors = true
 	config.Consumer.Fetch.Min = 1
@@ -855,17 +855,17 @@
 	brokers := []string{sc.KafkaAddress}
 
 	if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
-		logger.Errorw("error-starting-consumers", log.Fields{"error": err})
+		logger.Errorw(ctx, "error-starting-consumers", log.Fields{"error": err})
 		return err
 	} else {
 		sc.consumer = consumer
 	}
-	logger.Info("Kafka-consumers-created")
+	logger.Info(ctx, "Kafka-consumers-created")
 	return nil
 }
 
 // createGroupConsumer creates a consumers group
-func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
+func (sc *SaramaClient) createGroupConsumer(ctx context.Context, topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
 	config := scc.NewConfig()
 	config.ClientID = uuid.New().String()
 	config.Group.Mode = scc.ConsumerModeMultiplex
@@ -883,10 +883,10 @@
 	var err error
 
 	if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
-		logger.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+		logger.Errorw(ctx, "create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
 		return nil, err
 	}
-	logger.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
+	logger.Debugw(ctx, "create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
 
 	//sc.groupConsumers[topic.Name] = consumer
 	sc.addToGroupConsumers(topic.Name, consumer)
@@ -911,104 +911,104 @@
 	}
 }
 
-func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
-	logger.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
+func (sc *SaramaClient) consumeFromAPartition(ctx context.Context, topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
+	logger.Debugw(ctx, "starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
 startloop:
 	for {
 		select {
 		case err, ok := <-consumer.Errors():
 			if ok {
-				if sc.isLivenessError(err) {
-					sc.updateLiveness(false)
-					logger.Warnw("partition-consumers-error", log.Fields{"error": err})
+				if sc.isLivenessError(ctx, err) {
+					sc.updateLiveness(ctx, false)
+					logger.Warnw(ctx, "partition-consumers-error", log.Fields{"error": err})
 				}
 			} else {
 				// Channel is closed
 				break startloop
 			}
 		case msg, ok := <-consumer.Messages():
-			//logger.Debugw("message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
+			//logger.Debugw(ctx, "message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
 			if !ok {
 				// channel is closed
 				break startloop
 			}
 			msgBody := msg.Value
-			sc.updateLiveness(true)
-			logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
+			sc.updateLiveness(ctx, true)
+			logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
 			icm := &ic.InterContainerMessage{}
 			if err := proto.Unmarshal(msgBody, icm); err != nil {
-				logger.Warnw("partition-invalid-message", log.Fields{"error": err})
+				logger.Warnw(ctx, "partition-invalid-message", log.Fields{"error": err})
 				continue
 			}
 			go sc.dispatchToConsumers(consumerChnls, icm)
 		case <-sc.doneCh:
-			logger.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
+			logger.Infow(ctx, "partition-received-exit-signal", log.Fields{"topic": topic.Name})
 			break startloop
 		}
 	}
-	logger.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
-	sc.setUnhealthy()
+	logger.Infow(ctx, "partition-consumer-stopped", log.Fields{"topic": topic.Name})
+	sc.setUnhealthy(ctx)
 }
 
-func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
-	logger.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
+func (sc *SaramaClient) consumeGroupMessages(ctx context.Context, topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
+	logger.Debugw(ctx, "starting-group-consumption-loop", log.Fields{"topic": topic.Name})
 
 startloop:
 	for {
 		select {
 		case err, ok := <-consumer.Errors():
 			if ok {
-				if sc.isLivenessError(err) {
-					sc.updateLiveness(false)
+				if sc.isLivenessError(ctx, err) {
+					sc.updateLiveness(ctx, false)
 				}
-				logger.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
+				logger.Warnw(ctx, "group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
 			} else {
-				logger.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
+				logger.Warnw(ctx, "group-consumers-closed-err", log.Fields{"topic": topic.Name})
 				// channel is closed
 				break startloop
 			}
 		case msg, ok := <-consumer.Messages():
 			if !ok {
-				logger.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
+				logger.Warnw(ctx, "group-consumers-closed-msg", log.Fields{"topic": topic.Name})
 				// Channel closed
 				break startloop
 			}
-			sc.updateLiveness(true)
-			logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
+			sc.updateLiveness(ctx, true)
+			logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
 			msgBody := msg.Value
 			icm := &ic.InterContainerMessage{}
 			if err := proto.Unmarshal(msgBody, icm); err != nil {
-				logger.Warnw("invalid-message", log.Fields{"error": err})
+				logger.Warnw(ctx, "invalid-message", log.Fields{"error": err})
 				continue
 			}
 			go sc.dispatchToConsumers(consumerChnls, icm)
 			consumer.MarkOffset(msg, "")
 		case ntf := <-consumer.Notifications():
-			logger.Debugw("group-received-notification", log.Fields{"notification": ntf})
+			logger.Debugw(ctx, "group-received-notification", log.Fields{"notification": ntf})
 		case <-sc.doneCh:
-			logger.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
+			logger.Infow(ctx, "group-received-exit-signal", log.Fields{"topic": topic.Name})
 			break startloop
 		}
 	}
-	logger.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
-	sc.setUnhealthy()
+	logger.Infow(ctx, "group-consumer-stopped", log.Fields{"topic": topic.Name})
+	sc.setUnhealthy(ctx)
 }
 
-func (sc *SaramaClient) startConsumers(topic *Topic) error {
-	logger.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
+func (sc *SaramaClient) startConsumers(ctx context.Context, topic *Topic) error {
+	logger.Debugw(ctx, "starting-consumers", log.Fields{"topic": topic.Name})
 	var consumerCh *consumerChannels
 	if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
-		logger.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
+		logger.Errorw(ctx, "consumers-not-exist", log.Fields{"topic": topic.Name})
 		return errors.New("consumers-not-exist")
 	}
 	// For each consumer listening for that topic, start a consumption loop
 	for _, consumer := range consumerCh.consumers {
 		if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
-			go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
+			go sc.consumeFromAPartition(ctx, topic, pConsumer, consumerCh)
 		} else if gConsumer, ok := consumer.(*scc.Consumer); ok {
-			go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
+			go sc.consumeGroupMessages(ctx, topic, gConsumer, consumerCh)
 		} else {
-			logger.Errorw("invalid-consumer", log.Fields{"topic": topic})
+			logger.Errorw(ctx, "invalid-consumer", log.Fields{"topic": topic})
 			return errors.New("invalid-consumer")
 		}
 	}
@@ -1017,12 +1017,12 @@
 
 //// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
 //// for that topic.  It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) setupPartitionConsumerChannel(ctx context.Context, topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
 	var pConsumers []sarama.PartitionConsumer
 	var err error
 
-	if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
-		logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
+	if pConsumers, err = sc.createPartitionConsumers(ctx, topic, initialOffset); err != nil {
+		logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
 		return nil, err
 	}
 
@@ -1044,8 +1044,8 @@
 
 	//Start a consumers to listen on that specific topic
 	go func() {
-		if err := sc.startConsumers(topic); err != nil {
-			logger.Errorw("start-consumers-failed", log.Fields{
+		if err := sc.startConsumers(ctx, topic); err != nil {
+			logger.Errorw(ctx, "start-consumers-failed", log.Fields{
 				"topic": topic,
 				"error": err})
 		}
@@ -1056,12 +1056,12 @@
 
 // setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
 // for that topic.  It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
 	// TODO:  Replace this development partition consumers with a group consumers
 	var pConsumer *scc.Consumer
 	var err error
-	if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
-		logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
+	if pConsumer, err = sc.createGroupConsumer(ctx, topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
+		logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
 		return nil, err
 	}
 	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
@@ -1077,8 +1077,8 @@
 
 	//Start a consumers to listen on that specific topic
 	go func() {
-		if err := sc.startConsumers(topic); err != nil {
-			logger.Errorw("start-consumers-failed", log.Fields{
+		if err := sc.startConsumers(ctx, topic); err != nil {
+			logger.Errorw(ctx, "start-consumers-failed", log.Fields{
 				"topic": topic,
 				"error": err})
 		}
@@ -1087,11 +1087,11 @@
 	return consumerListeningChannel, nil
 }
 
-func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
-	logger.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
+func (sc *SaramaClient) createPartitionConsumers(ctx context.Context, topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
+	logger.Debugw(ctx, "creating-partition-consumers", log.Fields{"topic": topic.Name})
 	partitionList, err := sc.consumer.Partitions(topic.Name)
 	if err != nil {
-		logger.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
+		logger.Warnw(ctx, "get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
 		return nil, err
 	}
 
@@ -1099,7 +1099,7 @@
 	for _, partition := range partitionList {
 		var pConsumer sarama.PartitionConsumer
 		if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
-			logger.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
+			logger.Warnw(ctx, "consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
 			return nil, err
 		}
 		pConsumers = append(pConsumers, pConsumer)
@@ -1107,14 +1107,14 @@
 	return pConsumers, nil
 }
 
-func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
+func removeChannel(ctx context.Context, channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
 	var i int
 	var channel chan *ic.InterContainerMessage
 	for i, channel = range channels {
 		if channel == ch {
 			channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
 			close(channel)
-			logger.Debug("channel-closed")
+			logger.Debug(ctx, "channel-closed")
 			return channels[:len(channels)-1]
 		}
 	}
@@ -1129,14 +1129,14 @@
 	}
 }
 
-func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
+func (sc *SaramaClient) deleteFromGroupConsumers(ctx context.Context, topic string) error {
 	sc.lockOfGroupConsumers.Lock()
 	defer sc.lockOfGroupConsumers.Unlock()
 	if _, exist := sc.groupConsumers[topic]; exist {
 		consumer := sc.groupConsumers[topic]
 		delete(sc.groupConsumers, topic)
 		if err := consumer.Close(); err != nil {
-			logger.Errorw("failure-closing-consumer", log.Fields{"error": err})
+			logger.Errorw(ctx, "failure-closing-consumer", log.Fields{"error": err})
 			return err
 		}
 	}