[VOL-2471] Update library to use package logger
This commit consists of the following:
1) Add a GetLogLevel() API to make it easier to use a specific
logger. The existing V() API provides somewhat similar
functionality.
2) Add a common.go file to some heavily used packages in order
to dynamically set their log level and also to set a specific
logger per package (see the sketch after this list).
3) Use a per-package logger for some of the heavily used packages
for improved performance.
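As an illustration, here is a minimal sketch of what such a
per-package common.go could look like. This is an assumption, not
the literal file added by this commit; the import path, default
level, and field names are illustrative:

    // common.go - hypothetical sketch of the per-package logger setup
    package kafka

    import "github.com/opencord/voltha-lib-go/v3/pkg/log"

    var logger log.Logger

    func init() {
        // Register this package with the log framework so that its log
        // level can be changed at run time, and keep the returned logger
        // for call sites such as logger.Infow(...) used in this diff.
        var err error
        logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "kafka"})
        if err != nil {
            panic(err)
        }
    }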
Change-Id: If22a2c82d87d808f305677a2e793f8064f33291e
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index c05df69..6bc2a49 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -31,10 +31,6 @@
"time"
)
-func init() {
- log.AddPackage(log.JSON, log.DebugLevel, nil)
-}
-
type returnErrorFunction func() error
// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
@@ -241,7 +237,7 @@
}
func (sc *SaramaClient) Start() error {
- log.Info("Starting-kafka-sarama-client")
+ logger.Info("Starting-kafka-sarama-client")
// Create the Done channel
sc.doneCh = make(chan int, 1)
@@ -257,20 +253,20 @@
// Create the Cluster Admin
if err = sc.createClusterAdmin(); err != nil {
- log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
+ logger.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
return err
}
// Create the Publisher
if err := sc.createPublisher(); err != nil {
- log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
+ logger.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
return err
}
if sc.consumerType == DefaultConsumerType {
// Create the master consumers
if err := sc.createConsumer(); err != nil {
- log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
+ logger.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
return err
}
}
@@ -278,7 +274,7 @@
// Create the topic to consumers/channel map
sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
- log.Info("kafka-sarama-client-started")
+ logger.Info("kafka-sarama-client-started")
sc.started = true
@@ -286,7 +282,7 @@
}
func (sc *SaramaClient) Stop() {
- log.Info("stopping-sarama-client")
+ logger.Info("stopping-sarama-client")
sc.started = false
@@ -295,33 +291,33 @@
if sc.producer != nil {
if err := sc.producer.Close(); err != nil {
- log.Errorw("closing-producer-failed", log.Fields{"error": err})
+ logger.Errorw("closing-producer-failed", log.Fields{"error": err})
}
}
if sc.consumer != nil {
if err := sc.consumer.Close(); err != nil {
- log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
+ logger.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
}
}
for key, val := range sc.groupConsumers {
- log.Debugw("closing-group-consumer", log.Fields{"topic": key})
+ logger.Debugw("closing-group-consumer", log.Fields{"topic": key})
if err := val.Close(); err != nil {
- log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
+ logger.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
}
}
if sc.cAdmin != nil {
if err := sc.cAdmin.Close(); err != nil {
- log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
+ logger.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
}
}
//TODO: Clear the consumers map
//sc.clearConsumerChannelMap()
- log.Info("sarama-client-stopped")
+ logger.Info("sarama-client-stopped")
}
//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
@@ -338,15 +334,15 @@
if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
if err == sarama.ErrTopicAlreadyExists {
// Not an error
- log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
+ logger.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
return nil
}
- log.Errorw("create-topic-failure", log.Fields{"error": err})
+ logger.Errorw("create-topic-failure", log.Fields{"error": err})
return err
}
// TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
// do so.
- log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
+ logger.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
return nil
}
@@ -368,16 +364,16 @@
if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
if err == sarama.ErrUnknownTopicOrPartition {
// Not an error as the topic does not exist
- log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
+ logger.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
return nil
}
- log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
+ logger.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
return err
}
// Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
- log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
+ logger.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
return err
}
return nil
@@ -389,11 +385,11 @@
sc.lockTopic(topic)
defer sc.unLockTopic(topic)
- log.Debugw("subscribe", log.Fields{"topic": topic.Name})
+ logger.Debugw("subscribe", log.Fields{"topic": topic.Name})
// If a consumer already exists for that topic then reuse it
if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
- log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
+ logger.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
// Create a channel specific to that consumer and add it to the consumers channel map
ch := make(chan *ic.InterContainerMessage)
sc.addChannelToConsumerChannelMap(topic, ch)
@@ -408,12 +404,12 @@
if sc.consumerType == PartitionConsumer {
if sc.autoCreateTopic {
if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
- log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
+ logger.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
}
if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
- log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
+ logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
} else if sc.consumerType == GroupCustomer {
@@ -421,7 +417,7 @@
// does not consume from a precreated topic in some scenarios
//if sc.autoCreateTopic {
// if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
- // log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
+ // logger.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
// return nil, err
// }
//}
@@ -435,12 +431,12 @@
groupId = sc.consumerGroupPrefix + topic.Name
}
if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
- log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+ logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
return nil, err
}
} else {
- log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
+ logger.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
return nil, errors.New("unknown-consumer-type")
}
@@ -452,13 +448,13 @@
sc.lockTopic(topic)
defer sc.unLockTopic(topic)
- log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
+ logger.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
var err error
if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
- log.Errorw("failed-removing-channel", log.Fields{"error": err})
+ logger.Errorw("failed-removing-channel", log.Fields{"error": err})
}
if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
- log.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
+ logger.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
}
return err
}
@@ -470,11 +466,11 @@
// events to the channel is rate-limited by livenessChannelInterval.
if sc.liveness != nil {
if sc.alive != alive {
- log.Info("update-liveness-channel-because-change")
+ logger.Info("update-liveness-channel-because-change")
sc.liveness <- alive
sc.lastLivenessTime = time.Now()
} else if time.Now().Sub(sc.lastLivenessTime) > sc.livenessChannelInterval {
- log.Info("update-liveness-channel-because-interval")
+ logger.Info("update-liveness-channel-because-interval")
sc.liveness <- alive
sc.lastLivenessTime = time.Now()
}
@@ -482,7 +478,7 @@
// Only emit a log message when the state changes
if sc.alive != alive {
- log.Info("set-client-alive", log.Fields{"alive": alive})
+ logger.Info("set-client-alive", log.Fields{"alive": alive})
sc.alive = alive
}
}
@@ -491,7 +487,7 @@
func (sc *SaramaClient) setUnhealthy() {
sc.healthy = false
if sc.healthiness != nil {
- log.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
+ logger.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
sc.healthiness <- sc.healthy
}
}
@@ -511,35 +507,35 @@
switch err.Error() {
case context.DeadlineExceeded.Error():
- log.Info("is-liveness-error-timeout")
+ logger.Info("is-liveness-error-timeout")
return true
case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
- log.Info("is-liveness-error-no-brokers")
+ logger.Info("is-liveness-error-no-brokers")
return true
case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
- log.Info("is-liveness-error-shutting-down")
+ logger.Info("is-liveness-error-shutting-down")
return true
case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
- log.Info("is-liveness-error-not-available")
+ logger.Info("is-liveness-error-not-available")
return true
case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
- log.Info("is-liveness-error-circuit-breaker-open")
+ logger.Info("is-liveness-error-circuit-breaker-open")
return true
}
if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
- log.Info("is-liveness-error-connection-refused")
+ logger.Info("is-liveness-error-connection-refused")
return true
}
if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
- log.Info("is-liveness-error-io-timeout")
+ logger.Info("is-liveness-error-io-timeout")
return true
}
// Other errors shouldn't trigger a loss of liveness
- log.Infow("is-liveness-error-ignored", log.Fields{"err": err})
+ logger.Infow("is-liveness-error-ignored", log.Fields{"err": err})
return false
}
@@ -552,7 +548,7 @@
var ok bool
// ascertain the value interface type is a proto.Message
if protoMsg, ok = msg.(proto.Message); !ok {
- log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
+ logger.Warnw("message-not-proto-message", log.Fields{"msg": msg})
return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
}
@@ -560,7 +556,7 @@
var err error
// Create the Sarama producer message
if marshalled, err = proto.Marshal(protoMsg); err != nil {
- log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
+ logger.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
return err
}
key := ""
@@ -579,10 +575,10 @@
// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
select {
case ok := <-sc.producer.Successes():
- log.Debugw("message-sent", log.Fields{"status": ok.Topic})
+ logger.Debugw("message-sent", log.Fields{"status": ok.Topic})
sc.updateLiveness(true)
case notOk := <-sc.producer.Errors():
- log.Debugw("error-sending", log.Fields{"status": notOk})
+ logger.Debugw("error-sending", log.Fields{"status": notOk})
if sc.isLivenessError(notOk) {
sc.updateLiveness(false)
}
@@ -597,10 +593,10 @@
// by the service (i.e. rw_core / ro_core) to update readiness status
// and/or take other actions.
func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
- log.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
+ logger.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
if enable {
if sc.liveness == nil {
- log.Info("kafka-create-liveness-channel")
+ logger.Info("kafka-create-liveness-channel")
// At least 1, so we can immediately post to it without blocking
// Setting a bigger number (10) allows the monitor to fall behind
// without blocking others. The monitor shouldn't really fall
@@ -621,10 +617,10 @@
// if the kafka consumers die, or some other problem occurs which is
// catastrophic that would require re-creating the client.
func (sc *SaramaClient) EnableHealthinessChannel(enable bool) chan bool {
- log.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
+ logger.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
if enable {
if sc.healthiness == nil {
- log.Info("kafka-create-healthiness-channel")
+ logger.Info("kafka-create-healthiness-channel")
// At least 1, so we can immediately post to it without blocking
// Setting a bigger number (10) allows the monitor to fall behind
// without blocking others. The monitor shouldn't really fall
@@ -659,10 +655,10 @@
// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
select {
case ok := <-sc.producer.Successes():
- log.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
+ logger.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
sc.updateLiveness(true)
case notOk := <-sc.producer.Errors():
- log.Debugw("liveness-error-sending", log.Fields{"status": notOk})
+ logger.Debugw("liveness-error-sending", log.Fields{"status": notOk})
if sc.isLivenessError(notOk) {
sc.updateLiveness(false)
}
@@ -700,7 +696,7 @@
var cAdmin sarama.ClusterAdmin
var err error
if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
- log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
+ logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
return err
}
sc.cAdmin = cAdmin
@@ -760,7 +756,7 @@
consumerCh.channels = append(consumerCh.channels, ch)
return
}
- log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
+ logger.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
}
//closeConsumers closes a list of sarama consumers. Each consumer can either be a partition consumer or a group consumer
@@ -770,7 +766,7 @@
// Is it a partition consumer?
if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
if errTemp := partionConsumer.Close(); errTemp != nil {
- log.Debugw("partition!!!", log.Fields{"err": errTemp})
+ logger.Debugw("partition!!!", log.Fields{"err": errTemp})
if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
// This can occur due to a race condition
err = nil
@@ -800,7 +796,7 @@
consumerCh.channels = removeChannel(consumerCh.channels, ch)
// If there are no more channels then we can close the consumer itself
if len(consumerCh.channels) == 0 {
- log.Debugw("closing-consumers", log.Fields{"topic": topic})
+ logger.Debugw("closing-consumers", log.Fields{"topic": topic})
err := closeConsumers(consumerCh.consumers)
//err := consumerCh.consumers.Close()
delete(sc.topicToConsumerChannelMap, topic.Name)
@@ -808,7 +804,7 @@
}
return nil
}
- log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
+ logger.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
return errors.New("topic-does-not-exist")
}
@@ -829,7 +825,7 @@
delete(sc.topicToConsumerChannelMap, topic.Name)
return err
}
- log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
+ logger.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
return nil
}
@@ -868,12 +864,12 @@
brokers := []string{kafkaFullAddr}
if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
- log.Errorw("error-starting-publisher", log.Fields{"error": err})
+ logger.Errorw("error-starting-publisher", log.Fields{"error": err})
return err
} else {
sc.producer = producer
}
- log.Info("Kafka-publisher-created")
+ logger.Info("Kafka-publisher-created")
return nil
}
@@ -889,12 +885,12 @@
brokers := []string{kafkaFullAddr}
if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
- log.Errorw("error-starting-consumers", log.Fields{"error": err})
+ logger.Errorw("error-starting-consumers", log.Fields{"error": err})
return err
} else {
sc.consumer = consumer
}
- log.Info("Kafka-consumers-created")
+ logger.Info("Kafka-consumers-created")
return nil
}
@@ -918,10 +914,10 @@
var err error
if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
- log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+ logger.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
return nil, err
}
- log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
+ logger.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
//sc.groupConsumers[topic.Name] = consumer
sc.addToGroupConsumers(topic.Name, consumer)
@@ -942,7 +938,7 @@
}
func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
- log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
+ logger.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
startloop:
for {
select {
@@ -950,38 +946,38 @@
if ok {
if sc.isLivenessError(err) {
sc.updateLiveness(false)
- log.Warnw("partition-consumers-error", log.Fields{"error": err})
+ logger.Warnw("partition-consumers-error", log.Fields{"error": err})
}
} else {
// Channel is closed
break startloop
}
case msg, ok := <-consumer.Messages():
- //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
+ //logger.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
if !ok {
// channel is closed
break startloop
}
msgBody := msg.Value
sc.updateLiveness(true)
- log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
+ logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
icm := &ic.InterContainerMessage{}
if err := proto.Unmarshal(msgBody, icm); err != nil {
- log.Warnw("partition-invalid-message", log.Fields{"error": err})
+ logger.Warnw("partition-invalid-message", log.Fields{"error": err})
continue
}
go sc.dispatchToConsumers(consumerChnls, icm)
case <-sc.doneCh:
- log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
+ logger.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
break startloop
}
}
- log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
+ logger.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
sc.setUnhealthy()
}
func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
- log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
+ logger.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
startloop:
for {
@@ -991,44 +987,44 @@
if sc.isLivenessError(err) {
sc.updateLiveness(false)
}
- log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
+ logger.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
} else {
- log.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
+ logger.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
// channel is closed
break startloop
}
case msg, ok := <-consumer.Messages():
if !ok {
- log.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
+ logger.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
// Channel closed
break startloop
}
sc.updateLiveness(true)
- log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
+ logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
msgBody := msg.Value
icm := &ic.InterContainerMessage{}
if err := proto.Unmarshal(msgBody, icm); err != nil {
- log.Warnw("invalid-message", log.Fields{"error": err})
+ logger.Warnw("invalid-message", log.Fields{"error": err})
continue
}
go sc.dispatchToConsumers(consumerChnls, icm)
consumer.MarkOffset(msg, "")
case ntf := <-consumer.Notifications():
- log.Debugw("group-received-notification", log.Fields{"notification": ntf})
+ logger.Debugw("group-received-notification", log.Fields{"notification": ntf})
case <-sc.doneCh:
- log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
+ logger.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
break startloop
}
}
- log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
+ logger.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
sc.setUnhealthy()
}
func (sc *SaramaClient) startConsumers(topic *Topic) error {
- log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
+ logger.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
var consumerCh *consumerChannels
if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
- log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
+ logger.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
return errors.New("consumers-not-exist")
}
// For each consumer listening for that topic, start a consumption loop
@@ -1038,7 +1034,7 @@
} else if gConsumer, ok := consumer.(*scc.Consumer); ok {
go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
} else {
- log.Errorw("invalid-consumer", log.Fields{"topic": topic})
+ logger.Errorw("invalid-consumer", log.Fields{"topic": topic})
return errors.New("invalid-consumer")
}
}
@@ -1052,7 +1048,7 @@
var err error
if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
- log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
+ logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
@@ -1085,7 +1081,7 @@
var pConsumer *scc.Consumer
var err error
if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
- log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
+ logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
@@ -1106,10 +1102,10 @@
}
func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
- log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
+ logger.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
partitionList, err := sc.consumer.Partitions(topic.Name)
if err != nil {
- log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
+ logger.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
@@ -1117,7 +1113,7 @@
for _, partition := range partitionList {
var pConsumer sarama.PartitionConsumer
if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
- log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
+ logger.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
pConsumers = append(pConsumers, pConsumer)
@@ -1132,7 +1128,7 @@
if channel == ch {
channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
close(channel)
- log.Debug("channel-closed")
+ logger.Debug("channel-closed")
return channels[:len(channels)-1]
}
}
@@ -1154,7 +1150,7 @@
consumer := sc.groupConsumers[topic]
delete(sc.groupConsumers, topic)
if err := consumer.Close(); err != nil {
- log.Errorw("failure-closing-consumer", log.Fields{"error": err})
+ logger.Errorw("failure-closing-consumer", log.Fields{"error": err})
return err
}
}