[VOL-2471] Update library to use package logger

This commit consists of the following:
1) Add a GetLogLevel() API to make it easier to work with a
specific logger.  The existing V() API serves a similar purpose.
2) Add a common.go file to some heavily used packages so that
their log level can be set dynamically and a specific logger can
be assigned per package.
3) Use a per-package logger in some of the heavily used packages
for improved performance; a usage sketch follows below.

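For reference, a minimal sketch (illustrative, not part of this
patch) of the per-package logger pattern.  It assumes the
AddPackage() call used in this change and a SetPackageLogLevel()
helper in pkg/log; the package name "mypkg" and the key passed to
SetPackageLogLevel() are hypothetical:

    package mypkg

    import "github.com/opencord/voltha-lib-go/v2/pkg/log"

    var logger log.Logger

    func init() {
        // Register this package with its own JSON logger; the level
        // defaults to Error until it is changed at run time.
        var err error
        logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "mypkg"})
        if err != nil {
            panic(err)
        }
    }

    func demo() {
        // Raise only this package's verbosity at run time, leaving
        // every other package's level untouched.
        log.SetPackageLogLevel("mypkg", log.DebugLevel)
        logger.Debugw("now-visible", log.Fields{"pkg": "mypkg"})
    }
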
Change-Id: If22a2c82d87d808f305677a2e793f8064f33291e
diff --git a/pkg/kafka/common.go b/pkg/kafka/common.go
new file mode 100644
index 0000000..84a4e07
--- /dev/null
+++ b/pkg/kafka/common.go
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+)
+
+const (
+	logLevel = log.ErrorLevel
+)
+
+var logger log.Logger
+
+func init() {
+	// Set up this package so that its log level can be modified at run time
+	var err error
+	logger, err = log.AddPackage(log.JSON, logLevel, log.Fields{"pkg": "kafka"})
+	if err != nil {
+		panic(err)
+	}
+}
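
Note (editorial sketch, not part of the patch): one payoff of a
per-package logger plus the new GetLogLevel() API is that hot paths
can skip building expensive log fields when the level is too low,
much like the existing V() check.  Assuming GetLogLevel() returns
the logger's current numeric level, a guard might look like this;
buildExpensiveDump() is a hypothetical costly helper:

    func traceState() {
        // Only pay for the dump when debug logging is enabled.
        if logger.GetLogLevel() <= log.DebugLevel {
            logger.Debugw("state-dump", log.Fields{"state": buildExpensiveDump()})
        }
    }
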
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index 4e04b30..652bdfa 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -31,11 +31,6 @@
 	"time"
 )
 
-// Initialize the logger - gets the default until the main function setup the logger
-func init() {
-	log.AddPackage(log.JSON, log.DebugLevel, nil)
-}
-
 const (
 	DefaultMaxRetries     = 3
 	DefaultRequestTimeout = 10000 // 10000 milliseconds - to handle a wider latency range
@@ -148,11 +143,11 @@
 }
 
 func (kp *InterContainerProxy) Start() error {
-	log.Info("Starting-Proxy")
+	logger.Info("Starting-Proxy")
 
 	// Kafka MsgClient should already have been created.  If not, output fatal error
 	if kp.kafkaClient == nil {
-		log.Fatal("kafka-client-not-set")
+		logger.Fatal("kafka-client-not-set")
 	}
 
 	// Create the Done channel
@@ -160,7 +155,7 @@
 
 	// Start the kafka client
 	if err := kp.kafkaClient.Start(); err != nil {
-		log.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
+		logger.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
 		return err
 	}
 
@@ -177,7 +172,7 @@
 }
 
 func (kp *InterContainerProxy) Stop() {
-	log.Info("stopping-intercontainer-proxy")
+	logger.Info("stopping-intercontainer-proxy")
 	kp.doneCh <- 1
 	// TODO : Perform cleanup
 	kp.kafkaClient.Stop()
@@ -188,10 +183,10 @@
 
 // DeviceDiscovered publish the discovered device onto the kafka messaging bus
 func (kp *InterContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
-	log.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
+	logger.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
 	//	Simple validation
 	if deviceId == "" || deviceType == "" {
-		log.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
+		logger.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
 		return errors.New("invalid-parameters")
 	}
 	//	Create the device discovery message
@@ -212,7 +207,7 @@
 	var marshalledData *any.Any
 	var err error
 	if marshalledData, err = ptypes.MarshalAny(body); err != nil {
-		log.Errorw("cannot-marshal-request", log.Fields{"error": err})
+		logger.Errorw("cannot-marshal-request", log.Fields{"error": err})
 		return err
 	}
 	msg := &ic.InterContainerMessage{
@@ -222,7 +217,7 @@
 
 	// Send the message
 	if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
-		log.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
+		logger.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
 		return err
 	}
 	return nil
@@ -242,7 +237,7 @@
 	// Encode the request
 	protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
 	if err != nil {
-		log.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
+		logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
 		return false, nil
 	}
 
@@ -251,7 +246,7 @@
 	if waitForResponse {
 		var err error
 		if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
-			log.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
+			logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
 		}
 	}
 
@@ -259,7 +254,7 @@
 	// specific key, hence ensuring a single partition is used to publish the request.  This ensures that the
 	// subscriber on that topic will receive the request in the order it was sent.  The key used is the deviceId.
 	//key := GetDeviceIdFromTopic(*toTopic)
-	log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
+	logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
 	go kp.kafkaClient.Send(protoRequest, toTopic, key)
 
 	if waitForResponse {
@@ -279,7 +274,7 @@
 		select {
 		case msg, ok := <-ch:
 			if !ok {
-				log.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
+				logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
 				protoError := &ic.Error{Reason: "channel-closed"}
 				var marshalledArg *any.Any
 				if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
@@ -287,15 +282,15 @@
 				}
 				return false, marshalledArg
 			}
-			log.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
+			logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
 			var responseBody *ic.InterContainerResponseBody
 			var err error
 			if responseBody, err = decodeResponse(msg); err != nil {
-				log.Errorw("decode-response-error", log.Fields{"error": err})
+				logger.Errorw("decode-response-error", log.Fields{"error": err})
 			}
 			return responseBody.Success, responseBody.Result
 		case <-ctx.Done():
-			log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
+			logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
 			//	 pack the error as proto any type
 			protoError := &ic.Error{Reason: ctx.Err().Error()}
 			var marshalledArg *any.Any
@@ -304,7 +299,7 @@
 			}
 			return false, marshalledArg
 		case <-childCtx.Done():
-			log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
+			logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
 			//	 pack the error as proto any type
 			protoError := &ic.Error{Reason: childCtx.Err().Error()}
 			var marshalledArg *any.Any
@@ -313,7 +308,7 @@
 			}
 			return false, marshalledArg
 		case <-kp.doneCh:
-			log.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
+			logger.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
 			return true, nil
 		}
 	}
@@ -329,7 +324,7 @@
 	var err error
 	if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
 		//if ch, err = kp.Subscribe(topic); err != nil {
-		log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
+		logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
 		return err
 	}
 
@@ -348,7 +343,7 @@
 	var ch <-chan *ic.InterContainerMessage
 	var err error
 	if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
-		log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
+		logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
 		return err
 	}
 	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
@@ -387,7 +382,7 @@
 		// Unsubscribe to this topic first - this will close the subscribed channel
 		var err error
 		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
-			log.Errorw("unsubscribing-error", log.Fields{"topic": topic})
+			logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
 		}
 		delete(kp.topicToResponseChannelMap, topic)
 		return err
@@ -403,7 +398,7 @@
 	for topic, _ := range kp.topicToResponseChannelMap {
 		// Unsubscribe to this topic first - this will close the subscribed channel
 		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
-			log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+			logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
 		}
 		delete(kp.topicToResponseChannelMap, topic)
 	}
@@ -438,7 +433,7 @@
 	for topic, _ := range kp.topicToRequestHandlerChannelMap {
 		// Close the kafka client client first by unsubscribing to this topic
 		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
-			log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+			logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
 		}
 		delete(kp.topicToRequestHandlerChannelMap, topic)
 	}
@@ -486,10 +481,10 @@
 func (kp *InterContainerProxy) DeleteTopic(topic Topic) error {
 	// If we have any consumers on that topic we need to close them
 	if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
-		log.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
+		logger.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
 	}
 	if err := kp.deleteFromTopicRequestHandlerChannelMap(topic.Name); err != nil {
-		log.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
+		logger.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
 	}
 	kp.deleteTopicTransactionIdToChannelMap(topic.Name)
 
@@ -503,7 +498,7 @@
 	}
 	protoValue, ok := returnedVal.(proto.Message)
 	if !ok {
-		log.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
+		logger.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
 		err := errors.New("response-value-not-proto-message")
 		return nil, err
 	}
@@ -512,7 +507,7 @@
 	var marshalledReturnedVal *any.Any
 	var err error
 	if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
-		log.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
+		logger.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
 		return nil, err
 	}
 	return marshalledReturnedVal, nil
@@ -534,7 +529,7 @@
 	var err error
 	// Error should never happen here
 	if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
-		log.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
+		logger.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
 	}
 
 	return &ic.InterContainerMessage{
@@ -547,7 +542,7 @@
 //formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
 //or an error on failure
 func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
-	//log.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
+	//logger.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
 	responseHeader := &ic.Header{
 		Id:        request.Header.Id,
 		Type:      ic.MessageType_RESPONSE,
@@ -562,7 +557,7 @@
 	var err error
 	for _, returnVal := range returnedValues {
 		if marshalledReturnedVal, err = encodeReturnedValue(returnVal); err != nil {
-			log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
+			logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
 		}
 		break // for now we support only 1 returned value - (excluding the error)
 	}
@@ -575,7 +570,7 @@
 	// Marshal the response body
 	var marshalledResponseBody *any.Any
 	if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
-		log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
+		logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
 		return nil, err
 	}
 
@@ -611,7 +606,7 @@
 	var marshalledArg *any.Any
 	var err error
 	if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
-		log.Warnw("cannot-add-transactionId", log.Fields{"error": err})
+		logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
 		return currentArgs
 	}
 	protoArg := &ic.Argument{
@@ -625,7 +620,7 @@
 	var marshalledArg *any.Any
 	var err error
 	if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
-		log.Warnw("cannot-add-transactionId", log.Fields{"error": err})
+		logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
 		return currentArgs
 	}
 	protoArg := &ic.Argument{
@@ -645,9 +640,9 @@
 		// Get the request body
 		requestBody := &ic.InterContainerRequestBody{}
 		if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
-			log.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
+			logger.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
 		} else {
-			log.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
+			logger.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
 			// let the callee unpack the arguments as its the only one that knows the real proto type
 			// Augment the requestBody with the message Id as it will be used in scenarios where cores
 			// are set in pairs and competing
@@ -659,7 +654,7 @@
 
 			out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
 			if err != nil {
-				log.Warn(err)
+				logger.Warn(err)
 			}
 		}
 		// Response required?
@@ -679,7 +674,7 @@
 				if out[lastIndex].Interface() != nil { // Error
 					if retError, ok := out[lastIndex].Interface().(error); ok {
 						if retError.Error() == ErrorTransactionNotAcquired.Error() {
-							log.Debugw("Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
+							logger.Debugw("Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
 							return // Ignore - process is in competing mode and ignored transaction
 						}
 						returnError = &ic.Error{Reason: retError.Error()}
@@ -689,12 +684,12 @@
 						returnedValues = append(returnedValues, returnError)
 					}
 				} else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
-					log.Warnw("Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
+					logger.Warnw("Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
 					return // Ignore - should not happen
 				} else { // Non-error case
 					success = true
 					for idx, val := range out {
-						//log.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
+						//logger.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
 						if idx != lastIndex {
 							returnedValues = append(returnedValues, val.Interface())
 						}
@@ -704,7 +699,7 @@
 
 			var icm *ic.InterContainerMessage
 			if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
-				log.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
+				logger.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
 				icm = encodeDefaultFailedResponse(msg)
 			}
 			// To preserve ordering of messages, all messages to a given topic are sent to the same partition
@@ -713,22 +708,22 @@
 			// partitions.
 			replyTopic := &Topic{Name: msg.Header.FromTopic}
 			key := msg.Header.KeyTopic
-			log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
+			logger.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
 			// TODO: handle error response.
 			go kp.kafkaClient.Send(icm, replyTopic, key)
 		}
 	} else if msg.Header.Type == ic.MessageType_RESPONSE {
-		log.Debugw("response-received", log.Fields{"msg-header": msg.Header})
+		logger.Debugw("response-received", log.Fields{"msg-header": msg.Header})
 		go kp.dispatchResponse(msg)
 	} else {
-		log.Warnw("unsupported-message-received", log.Fields{"msg-header": msg.Header})
+		logger.Warnw("unsupported-message-received", log.Fields{"msg-header": msg.Header})
 	}
 }
 
 func (kp *InterContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
 	//	Wait for messages
 	for msg := range ch {
-		//log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
+		//logger.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
 		go kp.handleMessage(msg, targetInterface)
 	}
 }
@@ -737,7 +732,7 @@
 	kp.lockTransactionIdToChannelMap.RLock()
 	defer kp.lockTransactionIdToChannelMap.RUnlock()
 	if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
-		log.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
+		logger.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
 		return
 	}
 	kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
@@ -748,7 +743,7 @@
 // API. There is one response channel waiting for kafka messages before dispatching the message to the
 // corresponding waiting channel
 func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
-	log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
+	logger.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
 
 	// Create a specific channel for this consumers.  We cannot use the channel from the kafkaclient as it will
 	// broadcast any message for this topic to all channels waiting on it.
@@ -759,7 +754,7 @@
 }
 
 func (kp *InterContainerProxy) unSubscribeForResponse(trnsId string) error {
-	log.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
+	logger.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
 	kp.deleteFromTransactionIdToChannelMap(trnsId)
 	return nil
 }
@@ -803,12 +798,12 @@
 		// ascertain the value interface type is a proto.Message
 		protoValue, ok := arg.Value.(proto.Message)
 		if !ok {
-			log.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
+			logger.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
 			err := errors.New("argument-value-not-proto-message")
 			return nil, err
 		}
 		if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
-			log.Warnw("cannot-marshal-request", log.Fields{"error": err})
+			logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
 			return nil, err
 		}
 		protoArg := &ic.Argument{
@@ -821,7 +816,7 @@
 	var marshalledData *any.Any
 	var err error
 	if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
-		log.Warnw("cannot-marshal-request", log.Fields{"error": err})
+		logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
 		return nil, err
 	}
 	request := &ic.InterContainerMessage{
@@ -835,10 +830,10 @@
 	//	Extract the message body
 	responseBody := ic.InterContainerResponseBody{}
 	if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
-		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+		logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
 		return nil, err
 	}
-	//log.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
+	//logger.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
 
 	return &responseBody, nil
 
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index c05df69..6bc2a49 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -31,10 +31,6 @@
 	"time"
 )
 
-func init() {
-	log.AddPackage(log.JSON, log.DebugLevel, nil)
-}
-
 type returnErrorFunction func() error
 
 // consumerChannels represents one or more consumers listening on a kafka topic.  Once a message is received on that
@@ -241,7 +237,7 @@
 }
 
 func (sc *SaramaClient) Start() error {
-	log.Info("Starting-kafka-sarama-client")
+	logger.Info("Starting-kafka-sarama-client")
 
 	// Create the Done channel
 	sc.doneCh = make(chan int, 1)
@@ -257,20 +253,20 @@
 
 	// Create the Cluster Admin
 	if err = sc.createClusterAdmin(); err != nil {
-		log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
+		logger.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
 		return err
 	}
 
 	// Create the Publisher
 	if err := sc.createPublisher(); err != nil {
-		log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
+		logger.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
 		return err
 	}
 
 	if sc.consumerType == DefaultConsumerType {
 		// Create the master consumers
 		if err := sc.createConsumer(); err != nil {
-			log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
+			logger.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
 			return err
 		}
 	}
@@ -278,7 +274,7 @@
 	// Create the topic to consumers/channel map
 	sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
 
-	log.Info("kafka-sarama-client-started")
+	logger.Info("kafka-sarama-client-started")
 
 	sc.started = true
 
@@ -286,7 +282,7 @@
 }
 
 func (sc *SaramaClient) Stop() {
-	log.Info("stopping-sarama-client")
+	logger.Info("stopping-sarama-client")
 
 	sc.started = false
 
@@ -295,33 +291,33 @@
 
 	if sc.producer != nil {
 		if err := sc.producer.Close(); err != nil {
-			log.Errorw("closing-producer-failed", log.Fields{"error": err})
+			logger.Errorw("closing-producer-failed", log.Fields{"error": err})
 		}
 	}
 
 	if sc.consumer != nil {
 		if err := sc.consumer.Close(); err != nil {
-			log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
+			logger.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
 		}
 	}
 
 	for key, val := range sc.groupConsumers {
-		log.Debugw("closing-group-consumer", log.Fields{"topic": key})
+		logger.Debugw("closing-group-consumer", log.Fields{"topic": key})
 		if err := val.Close(); err != nil {
-			log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
+			logger.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
 		}
 	}
 
 	if sc.cAdmin != nil {
 		if err := sc.cAdmin.Close(); err != nil {
-			log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
+			logger.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
 		}
 	}
 
 	//TODO: Clear the consumers map
 	//sc.clearConsumerChannelMap()
 
-	log.Info("sarama-client-stopped")
+	logger.Info("sarama-client-stopped")
 }
 
 //createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
@@ -338,15 +334,15 @@
 	if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
 		if err == sarama.ErrTopicAlreadyExists {
 			//	Not an error
-			log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
+			logger.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
 			return nil
 		}
-		log.Errorw("create-topic-failure", log.Fields{"error": err})
+		logger.Errorw("create-topic-failure", log.Fields{"error": err})
 		return err
 	}
 	// TODO: Wait until the topic has been created.  No API is available in the Sarama clusterAdmin to
 	// do so.
-	log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
+	logger.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
 	return nil
 }
 
@@ -368,16 +364,16 @@
 	if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
 		if err == sarama.ErrUnknownTopicOrPartition {
 			//	Not an error as does not exist
-			log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
+			logger.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
 			return nil
 		}
-		log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
+		logger.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
 		return err
 	}
 
 	// Clear the topic from the consumer channel.  This will also close any consumers listening on that topic.
 	if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
-		log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
+		logger.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
 		return err
 	}
 	return nil
@@ -389,11 +385,11 @@
 	sc.lockTopic(topic)
 	defer sc.unLockTopic(topic)
 
-	log.Debugw("subscribe", log.Fields{"topic": topic.Name})
+	logger.Debugw("subscribe", log.Fields{"topic": topic.Name})
 
 	// If a consumers already exist for that topic then resuse it
 	if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
-		log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
+		logger.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
 		// Create a channel specific for that consumers and add it to the consumers channel map
 		ch := make(chan *ic.InterContainerMessage)
 		sc.addChannelToConsumerChannelMap(topic, ch)
@@ -408,12 +404,12 @@
 	if sc.consumerType == PartitionConsumer {
 		if sc.autoCreateTopic {
 			if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
-				log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
+				logger.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
 				return nil, err
 			}
 		}
 		if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
-			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
+			logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
 			return nil, err
 		}
 	} else if sc.consumerType == GroupCustomer {
@@ -421,7 +417,7 @@
 		// does not consume from a precreated topic in some scenarios
 		//if sc.autoCreateTopic {
 		//	if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
-		//		log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
+		//		logger.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
 		//		return nil, err
 		//	}
 		//}
@@ -435,12 +431,12 @@
 			groupId = sc.consumerGroupPrefix + topic.Name
 		}
 		if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
-			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+			logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
 			return nil, err
 		}
 
 	} else {
-		log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
+		logger.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
 		return nil, errors.New("unknown-consumer-type")
 	}
 
@@ -452,13 +448,13 @@
 	sc.lockTopic(topic)
 	defer sc.unLockTopic(topic)
 
-	log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
+	logger.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
 	var err error
 	if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
-		log.Errorw("failed-removing-channel", log.Fields{"error": err})
+		logger.Errorw("failed-removing-channel", log.Fields{"error": err})
 	}
 	if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
-		log.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
+		logger.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
 	}
 	return err
 }
@@ -470,11 +466,11 @@
 	// events to the channel is rate-limited by livenessChannelInterval.
 	if sc.liveness != nil {
 		if sc.alive != alive {
-			log.Info("update-liveness-channel-because-change")
+			logger.Info("update-liveness-channel-because-change")
 			sc.liveness <- alive
 			sc.lastLivenessTime = time.Now()
 		} else if time.Now().Sub(sc.lastLivenessTime) > sc.livenessChannelInterval {
-			log.Info("update-liveness-channel-because-interval")
+			logger.Info("update-liveness-channel-because-interval")
 			sc.liveness <- alive
 			sc.lastLivenessTime = time.Now()
 		}
@@ -482,7 +478,7 @@
 
 	// Only emit a log message when the state changes
 	if sc.alive != alive {
-		log.Info("set-client-alive", log.Fields{"alive": alive})
+		logger.Infow("set-client-alive", log.Fields{"alive": alive})
 		sc.alive = alive
 	}
 }
@@ -491,7 +487,7 @@
 func (sc *SaramaClient) setUnhealthy() {
 	sc.healthy = false
 	if sc.healthiness != nil {
-		log.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
+		logger.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
 		sc.healthiness <- sc.healthy
 	}
 }
@@ -511,35 +507,35 @@
 
 	switch err.Error() {
 	case context.DeadlineExceeded.Error():
-		log.Info("is-liveness-error-timeout")
+		logger.Info("is-liveness-error-timeout")
 		return true
 	case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
-		log.Info("is-liveness-error-no-brokers")
+		logger.Info("is-liveness-error-no-brokers")
 		return true
 	case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
-		log.Info("is-liveness-error-shutting-down")
+		logger.Info("is-liveness-error-shutting-down")
 		return true
 	case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
-		log.Info("is-liveness-error-not-available")
+		logger.Info("is-liveness-error-not-available")
 		return true
 	case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
-		log.Info("is-liveness-error-circuit-breaker-open")
+		logger.Info("is-liveness-error-circuit-breaker-open")
 		return true
 	}
 
 	if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
-		log.Info("is-liveness-error-connection-refused")
+		logger.Info("is-liveness-error-connection-refused")
 		return true
 	}
 
 	if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
-		log.Info("is-liveness-error-io-timeout")
+		logger.Info("is-liveness-error-io-timeout")
 		return true
 	}
 
 	// Other errors shouldn't trigger a loss of liveness
 
-	log.Infow("is-liveness-error-ignored", log.Fields{"err": err})
+	logger.Infow("is-liveness-error-ignored", log.Fields{"err": err})
 
 	return false
 }
@@ -552,7 +548,7 @@
 	var ok bool
 	// ascertain the value interface type is a proto.Message
 	if protoMsg, ok = msg.(proto.Message); !ok {
-		log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
+		logger.Warnw("message-not-proto-message", log.Fields{"msg": msg})
 		return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
 	}
 
@@ -560,7 +556,7 @@
 	var err error
 	//	Create the Sarama producer message
 	if marshalled, err = proto.Marshal(protoMsg); err != nil {
-		log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
+		logger.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
 		return err
 	}
 	key := ""
@@ -579,10 +575,10 @@
 	// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
 	select {
 	case ok := <-sc.producer.Successes():
-		log.Debugw("message-sent", log.Fields{"status": ok.Topic})
+		logger.Debugw("message-sent", log.Fields{"status": ok.Topic})
 		sc.updateLiveness(true)
 	case notOk := <-sc.producer.Errors():
-		log.Debugw("error-sending", log.Fields{"status": notOk})
+		logger.Debugw("error-sending", log.Fields{"status": notOk})
 		if sc.isLivenessError(notOk) {
 			sc.updateLiveness(false)
 		}
@@ -597,10 +593,10 @@
 // by the service (i.e. rw_core / ro_core) to update readiness status
 // and/or take other actions.
 func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
-	log.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
+	logger.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
 	if enable {
 		if sc.liveness == nil {
-			log.Info("kafka-create-liveness-channel")
+			logger.Info("kafka-create-liveness-channel")
 			// At least 1, so we can immediately post to it without blocking
 			// Setting a bigger number (10) allows the monitor to fall behind
 			// without blocking others. The monitor shouldn't really fall
@@ -621,10 +617,10 @@
 // if the kafka consumers die, or some other problem occurs which is
 // catastrophic that would require re-creating the client.
 func (sc *SaramaClient) EnableHealthinessChannel(enable bool) chan bool {
-	log.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
+	logger.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
 	if enable {
 		if sc.healthiness == nil {
-			log.Info("kafka-create-healthiness-channel")
+			logger.Info("kafka-create-healthiness-channel")
 			// At least 1, so we can immediately post to it without blocking
 			// Setting a bigger number (10) allows the monitor to fall behind
 			// without blocking others. The monitor shouldn't really fall
@@ -659,10 +655,10 @@
 	// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
 	select {
 	case ok := <-sc.producer.Successes():
-		log.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
+		logger.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
 		sc.updateLiveness(true)
 	case notOk := <-sc.producer.Errors():
-		log.Debugw("liveness-error-sending", log.Fields{"status": notOk})
+		logger.Debugw("liveness-error-sending", log.Fields{"status": notOk})
 		if sc.isLivenessError(notOk) {
 			sc.updateLiveness(false)
 		}
@@ -700,7 +696,7 @@
 	var cAdmin sarama.ClusterAdmin
 	var err error
 	if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
-		log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
+		logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
 		return err
 	}
 	sc.cAdmin = cAdmin
@@ -760,7 +756,7 @@
 		consumerCh.channels = append(consumerCh.channels, ch)
 		return
 	}
-	log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
+	logger.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
 }
 
 //closeConsumers closes a list of sarama consumers.  The consumers can either be a partition consumers or a group consumers
@@ -770,7 +766,7 @@
 		//	Is it a partition consumers?
 		if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
 			if errTemp := partionConsumer.Close(); errTemp != nil {
-				log.Debugw("partition!!!", log.Fields{"err": errTemp})
+				logger.Debugw("partition!!!", log.Fields{"err": errTemp})
 				if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
 					// This can occur on race condition
 					err = nil
@@ -800,7 +796,7 @@
 		consumerCh.channels = removeChannel(consumerCh.channels, ch)
 		// If there are no more channels then we can close the consumers itself
 		if len(consumerCh.channels) == 0 {
-			log.Debugw("closing-consumers", log.Fields{"topic": topic})
+			logger.Debugw("closing-consumers", log.Fields{"topic": topic})
 			err := closeConsumers(consumerCh.consumers)
 			//err := consumerCh.consumers.Close()
 			delete(sc.topicToConsumerChannelMap, topic.Name)
@@ -808,7 +804,7 @@
 		}
 		return nil
 	}
-	log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
+	logger.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
 	return errors.New("topic-does-not-exist")
 }
 
@@ -829,7 +825,7 @@
 		delete(sc.topicToConsumerChannelMap, topic.Name)
 		return err
 	}
-	log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
+	logger.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
 	return nil
 }
 
@@ -868,12 +864,12 @@
 	brokers := []string{kafkaFullAddr}
 
 	if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
-		log.Errorw("error-starting-publisher", log.Fields{"error": err})
+		logger.Errorw("error-starting-publisher", log.Fields{"error": err})
 		return err
 	} else {
 		sc.producer = producer
 	}
-	log.Info("Kafka-publisher-created")
+	logger.Info("Kafka-publisher-created")
 	return nil
 }
 
@@ -889,12 +885,12 @@
 	brokers := []string{kafkaFullAddr}
 
 	if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
-		log.Errorw("error-starting-consumers", log.Fields{"error": err})
+		logger.Errorw("error-starting-consumers", log.Fields{"error": err})
 		return err
 	} else {
 		sc.consumer = consumer
 	}
-	log.Info("Kafka-consumers-created")
+	logger.Info("Kafka-consumers-created")
 	return nil
 }
 
@@ -918,10 +914,10 @@
 	var err error
 
 	if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
-		log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+		logger.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
 		return nil, err
 	}
-	log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
+	logger.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
 
 	//sc.groupConsumers[topic.Name] = consumer
 	sc.addToGroupConsumers(topic.Name, consumer)
@@ -942,7 +938,7 @@
 }
 
 func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
-	log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
+	logger.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
 startloop:
 	for {
 		select {
@@ -950,38 +946,38 @@
 			if ok {
 				if sc.isLivenessError(err) {
 					sc.updateLiveness(false)
-					log.Warnw("partition-consumers-error", log.Fields{"error": err})
+					logger.Warnw("partition-consumers-error", log.Fields{"error": err})
 				}
 			} else {
 				// Channel is closed
 				break startloop
 			}
 		case msg, ok := <-consumer.Messages():
-			//log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
+			//logger.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
 			if !ok {
 				// channel is closed
 				break startloop
 			}
 			msgBody := msg.Value
 			sc.updateLiveness(true)
-			log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
+			logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
 			icm := &ic.InterContainerMessage{}
 			if err := proto.Unmarshal(msgBody, icm); err != nil {
-				log.Warnw("partition-invalid-message", log.Fields{"error": err})
+				logger.Warnw("partition-invalid-message", log.Fields{"error": err})
 				continue
 			}
 			go sc.dispatchToConsumers(consumerChnls, icm)
 		case <-sc.doneCh:
-			log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
+			logger.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
 			break startloop
 		}
 	}
-	log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
+	logger.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
 	sc.setUnhealthy()
 }
 
 func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
-	log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
+	logger.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
 
 startloop:
 	for {
@@ -991,44 +987,44 @@
 				if sc.isLivenessError(err) {
 					sc.updateLiveness(false)
 				}
-				log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
+				logger.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
 			} else {
-				log.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
+				logger.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
 				// channel is closed
 				break startloop
 			}
 		case msg, ok := <-consumer.Messages():
 			if !ok {
-				log.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
+				logger.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
 				// Channel closed
 				break startloop
 			}
 			sc.updateLiveness(true)
-			log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
+			logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
 			msgBody := msg.Value
 			icm := &ic.InterContainerMessage{}
 			if err := proto.Unmarshal(msgBody, icm); err != nil {
-				log.Warnw("invalid-message", log.Fields{"error": err})
+				logger.Warnw("invalid-message", log.Fields{"error": err})
 				continue
 			}
 			go sc.dispatchToConsumers(consumerChnls, icm)
 			consumer.MarkOffset(msg, "")
 		case ntf := <-consumer.Notifications():
-			log.Debugw("group-received-notification", log.Fields{"notification": ntf})
+			logger.Debugw("group-received-notification", log.Fields{"notification": ntf})
 		case <-sc.doneCh:
-			log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
+			logger.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
 			break startloop
 		}
 	}
-	log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
+	logger.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
 	sc.setUnhealthy()
 }
 
 func (sc *SaramaClient) startConsumers(topic *Topic) error {
-	log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
+	logger.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
 	var consumerCh *consumerChannels
 	if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
-		log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
+		logger.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
 		return errors.New("consumers-not-exist")
 	}
 	// For each consumer listening for that topic, start a consumption loop
@@ -1038,7 +1034,7 @@
 		} else if gConsumer, ok := consumer.(*scc.Consumer); ok {
 			go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
 		} else {
-			log.Errorw("invalid-consumer", log.Fields{"topic": topic})
+			logger.Errorw("invalid-consumer", log.Fields{"topic": topic})
 			return errors.New("invalid-consumer")
 		}
 	}
@@ -1052,7 +1048,7 @@
 	var err error
 
 	if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
-		log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
+		logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
 		return nil, err
 	}
 
@@ -1085,7 +1081,7 @@
 	var pConsumer *scc.Consumer
 	var err error
 	if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
-		log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
+		logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
 		return nil, err
 	}
 	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
@@ -1106,10 +1102,10 @@
 }
 
 func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
-	log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
+	logger.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
 	partitionList, err := sc.consumer.Partitions(topic.Name)
 	if err != nil {
-		log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
+		logger.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
 		return nil, err
 	}
 
@@ -1117,7 +1113,7 @@
 	for _, partition := range partitionList {
 		var pConsumer sarama.PartitionConsumer
 		if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
-			log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
+			logger.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
 			return nil, err
 		}
 		pConsumers = append(pConsumers, pConsumer)
@@ -1132,7 +1128,7 @@
 		if channel == ch {
 			channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
 			close(channel)
-			log.Debug("channel-closed")
+			logger.Debug("channel-closed")
 			return channels[:len(channels)-1]
 		}
 	}
@@ -1154,7 +1150,7 @@
 		consumer := sc.groupConsumers[topic]
 		delete(sc.groupConsumers, topic)
 		if err := consumer.Close(); err != nil {
-			log.Errorw("failure-closing-consumer", log.Fields{"error": err})
+			logger.Errorw("failure-closing-consumer", log.Fields{"error": err})
 			return err
 		}
 	}