Revert "[VOL-3069]Pass Context in methods which are performing logging and need the context"
This reverts commit 3c425fbeabed17ec8dad437678b4d105deaf2fbe.
Reason for revert: Merging higher-priority patches first.
Change-Id: Iaa03a5977357dcd86de358d76e90cc54cd6b1fa5
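
For reviewers, a minimal sketch of the call-site shape this revert restores (the import path assumes the v3 module and the wiring is illustrative; the pre-revert variants took a context.Context as the first argument):

    package main

    import (
        "fmt"

        "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
    )

    // startAndSubscribe is illustrative only: with this revert applied,
    // Start, Stop, Subscribe and UnSubscribe are called without a context.
    func startAndSubscribe(client kafka.Client, topic *kafka.Topic) error {
        if err := client.Start(); err != nil {
            return fmt.Errorf("starting kafka client: %w", err)
        }
        defer client.Stop()

        ch, err := client.Subscribe(topic)
        if err != nil {
            return err
        }
        defer func() { _ = client.UnSubscribe(topic, ch) }()

        // ... receive InterContainerMessages from ch ...
        return nil
    }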
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index 87c7ce4..581cf49 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -231,8 +231,8 @@
return client
}
-func (sc *SaramaClient) Start(ctx context.Context) error {
- logger.Info(ctx, "Starting-kafka-sarama-client")
+func (sc *SaramaClient) Start() error {
+ logger.Info("Starting-kafka-sarama-client")
// Create the Done channel
sc.doneCh = make(chan int, 1)
@@ -242,26 +242,26 @@
// Add a cleanup in case of failure to startup
defer func() {
if err != nil {
- sc.Stop(ctx)
+ sc.Stop()
}
}()
// Create the Cluster Admin
- if err = sc.createClusterAdmin(ctx); err != nil {
- logger.Errorw(ctx, "Cannot-create-cluster-admin", log.Fields{"error": err})
+ if err = sc.createClusterAdmin(); err != nil {
+ logger.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
return err
}
// Create the Publisher
- if err := sc.createPublisher(ctx); err != nil {
- logger.Errorw(ctx, "Cannot-create-kafka-publisher", log.Fields{"error": err})
+ if err := sc.createPublisher(); err != nil {
+ logger.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
return err
}
if sc.consumerType == DefaultConsumerType {
// Create the master consumers
- if err := sc.createConsumer(ctx); err != nil {
- logger.Errorw(ctx, "Cannot-create-kafka-consumers", log.Fields{"error": err})
+ if err := sc.createConsumer(); err != nil {
+ logger.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
return err
}
}
@@ -269,15 +269,15 @@
// Create the topic to consumers/channel map
sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
- logger.Info(ctx, "kafka-sarama-client-started")
+ logger.Info("kafka-sarama-client-started")
sc.started = true
return nil
}
-func (sc *SaramaClient) Stop(ctx context.Context) {
- logger.Info(ctx, "stopping-sarama-client")
+func (sc *SaramaClient) Stop() {
+ logger.Info("stopping-sarama-client")
sc.started = false
@@ -286,38 +286,38 @@
if sc.producer != nil {
if err := sc.producer.Close(); err != nil {
- logger.Errorw(ctx, "closing-producer-failed", log.Fields{"error": err})
+ logger.Errorw("closing-producer-failed", log.Fields{"error": err})
}
}
if sc.consumer != nil {
if err := sc.consumer.Close(); err != nil {
- logger.Errorw(ctx, "closing-partition-consumer-failed", log.Fields{"error": err})
+ logger.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
}
}
for key, val := range sc.groupConsumers {
- logger.Debugw(ctx, "closing-group-consumer", log.Fields{"topic": key})
+ logger.Debugw("closing-group-consumer", log.Fields{"topic": key})
if err := val.Close(); err != nil {
- logger.Errorw(ctx, "closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
+ logger.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
}
}
if sc.cAdmin != nil {
if err := sc.cAdmin.Close(); err != nil {
- logger.Errorw(ctx, "closing-cluster-admin-failed", log.Fields{"error": err})
+ logger.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
}
}
//TODO: Clear the consumers map
//sc.clearConsumerChannelMap()
- logger.Info(ctx, "sarama-client-stopped")
+ logger.Info("sarama-client-stopped")
}
//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
// the invoking function must hold the lock
-func (sc *SaramaClient) createTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
+func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
// Set the topic details
topicDetail := &sarama.TopicDetail{}
topicDetail.NumPartitions = int32(numPartition)
@@ -329,29 +329,29 @@
if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
if err == sarama.ErrTopicAlreadyExists {
// Not an error
- logger.Debugw(ctx, "topic-already-exist", log.Fields{"topic": topic.Name})
+ logger.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
return nil
}
- logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err})
+ logger.Errorw("create-topic-failure", log.Fields{"error": err})
return err
}
// TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
// do so.
- logger.Debugw(ctx, "topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
+ logger.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
return nil
}
//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
// ensure no two goroutines are performing operations on the same topic
-func (sc *SaramaClient) CreateTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
+func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
sc.lockTopic(topic)
defer sc.unLockTopic(topic)
- return sc.createTopic(ctx, topic, numPartition, repFactor)
+ return sc.createTopic(topic, numPartition, repFactor)
}
//DeleteTopic removes a topic from the Kafka Broker
-func (sc *SaramaClient) DeleteTopic(ctx context.Context, topic *Topic) error {
+func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
sc.lockTopic(topic)
defer sc.unLockTopic(topic)
@@ -359,16 +359,16 @@
if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
if err == sarama.ErrUnknownTopicOrPartition {
// Not an error as the topic does not exist
- logger.Debugw(ctx, "topic-not-exist", log.Fields{"topic": topic.Name})
+ logger.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
return nil
}
- logger.Errorw(ctx, "delete-topic-failed", log.Fields{"topic": topic, "error": err})
+ logger.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
return err
}
// Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
- if err := sc.clearTopicFromConsumerChannelMap(ctx, *topic); err != nil {
- logger.Errorw(ctx, "failure-clearing-channels", log.Fields{"topic": topic, "error": err})
+ if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
+ logger.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
return err
}
return nil
@@ -376,18 +376,18 @@
// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
// messages from that topic
-func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
sc.lockTopic(topic)
defer sc.unLockTopic(topic)
- logger.Debugw(ctx, "subscribe", log.Fields{"topic": topic.Name})
+ logger.Debugw("subscribe", log.Fields{"topic": topic.Name})
// If a consumer already exists for that topic then reuse it
if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
- logger.Debugw(ctx, "topic-already-subscribed", log.Fields{"topic": topic.Name})
+ logger.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
// Create a channel specific to that consumer and add it to the consumer channel map
ch := make(chan *ic.InterContainerMessage)
- sc.addChannelToConsumerChannelMap(ctx, topic, ch)
+ sc.addChannelToConsumerChannelMap(topic, ch)
return ch, nil
}
@@ -398,13 +398,13 @@
// Use the consumerType option to figure out the type of consumer to launch
if sc.consumerType == PartitionConsumer {
if sc.autoCreateTopic {
- if err = sc.createTopic(ctx, topic, sc.numPartitions, sc.numReplicas); err != nil {
- logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
+ if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
+ logger.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
}
- if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(ctx, topic, getOffset(kvArgs...)); err != nil {
- logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
+ if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
+ logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
} else if sc.consumerType == GroupCustomer {
@@ -412,7 +412,7 @@
// does not consume from a precreated topic in some scenarios
//if sc.autoCreateTopic {
// if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
- // logger.Errorw(ctx, "create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
+ // logger.Errorw("create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
// return nil, err
// }
//}
@@ -425,13 +425,13 @@
// Need to use a unique group Id per topic
groupId = sc.consumerGroupPrefix + topic.Name
}
- if consumerListeningChannel, err = sc.setupGroupConsumerChannel(ctx, topic, groupId, getOffset(kvArgs...)); err != nil {
- logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+ if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
+ logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
return nil, err
}
} else {
- logger.Warnw(ctx, "unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
+ logger.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
return nil, errors.New("unknown-consumer-type")
}
@@ -439,37 +439,37 @@
}
//UnSubscribe unsubscribes a consumer from a given topic
-func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan *ic.InterContainerMessage) error {
+func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
sc.lockTopic(topic)
defer sc.unLockTopic(topic)
- logger.Debugw(ctx, "unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
+ logger.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
var err error
- if err = sc.removeChannelFromConsumerChannelMap(ctx, *topic, ch); err != nil {
- logger.Errorw(ctx, "failed-removing-channel", log.Fields{"error": err})
+ if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
+ logger.Errorw("failed-removing-channel", log.Fields{"error": err})
}
- if err = sc.deleteFromGroupConsumers(ctx, topic.Name); err != nil {
- logger.Errorw(ctx, "failed-deleting-group-consumer", log.Fields{"error": err})
+ if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
+ logger.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
}
return err
}
-func (sc *SaramaClient) SubscribeForMetadata(ctx context.Context, callback func(fromTopic string, timestamp time.Time)) {
+func (sc *SaramaClient) SubscribeForMetadata(callback func(fromTopic string, timestamp time.Time)) {
sc.metadataCallback = callback
}
-func (sc *SaramaClient) updateLiveness(ctx context.Context, alive bool) {
+func (sc *SaramaClient) updateLiveness(alive bool) {
// Post a consistent stream of liveness data to the channel,
// so that in a live state, the core does not timeout and
// send a forced liveness message. Production of liveness
// events to the channel is rate-limited by livenessChannelInterval.
if sc.liveness != nil {
if sc.alive != alive {
- logger.Info(ctx, "update-liveness-channel-because-change")
+ logger.Info("update-liveness-channel-because-change")
sc.liveness <- alive
sc.lastLivenessTime = time.Now()
} else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
- logger.Info(ctx, "update-liveness-channel-because-interval")
+ logger.Info("update-liveness-channel-because-interval")
sc.liveness <- alive
sc.lastLivenessTime = time.Now()
}
@@ -477,21 +477,21 @@
// Only emit a log message when the state changes
if sc.alive != alive {
- logger.Info(ctx, "set-client-alive", log.Fields{"alive": alive})
+ logger.Info("set-client-alive", log.Fields{"alive": alive})
sc.alive = alive
}
}
// Once unhealthy, we never go back
-func (sc *SaramaClient) setUnhealthy(ctx context.Context) {
+func (sc *SaramaClient) setUnhealthy() {
sc.healthy = false
if sc.healthiness != nil {
- logger.Infow(ctx, "set-client-unhealthy", log.Fields{"healthy": sc.healthy})
+ logger.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
sc.healthiness <- sc.healthy
}
}
-func (sc *SaramaClient) isLivenessError(ctx context.Context, err error) bool {
+func (sc *SaramaClient) isLivenessError(err error) bool {
// Sarama producers and consumers encapsulate the error inside
// a ProducerError or ConsumerError struct.
if prodError, ok := err.(*sarama.ProducerError); ok {
@@ -506,48 +506,48 @@
switch err.Error() {
case context.DeadlineExceeded.Error():
- logger.Info(ctx, "is-liveness-error-timeout")
+ logger.Info("is-liveness-error-timeout")
return true
case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
- logger.Info(ctx, "is-liveness-error-no-brokers")
+ logger.Info("is-liveness-error-no-brokers")
return true
case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
- logger.Info(ctx, "is-liveness-error-shutting-down")
+ logger.Info("is-liveness-error-shutting-down")
return true
case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
- logger.Info(ctx, "is-liveness-error-not-available")
+ logger.Info("is-liveness-error-not-available")
return true
case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
- logger.Info(ctx, "is-liveness-error-circuit-breaker-open")
+ logger.Info("is-liveness-error-circuit-breaker-open")
return true
}
if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
- logger.Info(ctx, "is-liveness-error-connection-refused")
+ logger.Info("is-liveness-error-connection-refused")
return true
}
if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
- logger.Info(ctx, "is-liveness-error-io-timeout")
+ logger.Info("is-liveness-error-io-timeout")
return true
}
// Other errors shouldn't trigger a loss of liveness
- logger.Infow(ctx, "is-liveness-error-ignored", log.Fields{"err": err})
+ logger.Infow("is-liveness-error-ignored", log.Fields{"err": err})
return false
}
// Send formats and sends the request onto the kafka messaging bus.
-func (sc *SaramaClient) Send(ctx context.Context, msg interface{}, topic *Topic, keys ...string) error {
+func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
// Assert message is a proto message
var protoMsg proto.Message
var ok bool
// ascertain the value interface type is a proto.Message
if protoMsg, ok = msg.(proto.Message); !ok {
- logger.Warnw(ctx, "message-not-proto-message", log.Fields{"msg": msg})
+ logger.Warnw("message-not-proto-message", log.Fields{"msg": msg})
return fmt.Errorf("not-a-proto-msg-%s", msg)
}
@@ -555,7 +555,7 @@
var err error
// Create the Sarama producer message
if marshalled, err = proto.Marshal(protoMsg); err != nil {
- logger.Errorw(ctx, "marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
+ logger.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
return err
}
key := ""
@@ -574,12 +574,12 @@
// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
select {
case ok := <-sc.producer.Successes():
- logger.Debugw(ctx, "message-sent", log.Fields{"status": ok.Topic})
- sc.updateLiveness(ctx, true)
+ logger.Debugw("message-sent", log.Fields{"status": ok.Topic})
+ sc.updateLiveness(true)
case notOk := <-sc.producer.Errors():
- logger.Debugw(ctx, "error-sending", log.Fields{"status": notOk})
- if sc.isLivenessError(ctx, notOk) {
- sc.updateLiveness(ctx, false)
+ logger.Debugw("error-sending", log.Fields{"status": notOk})
+ if sc.isLivenessError(notOk) {
+ sc.updateLiveness(false)
}
return notOk
}
@@ -591,11 +591,11 @@
// or not the channel is still live. This channel is then picked up
// by the service (i.e. rw_core / ro_core) to update readiness status
// and/or take other actions.
-func (sc *SaramaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
- logger.Infow(ctx, "kafka-enable-liveness-channel", log.Fields{"enable": enable})
+func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
+ logger.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
if enable {
if sc.liveness == nil {
- logger.Info(ctx, "kafka-create-liveness-channel")
+ logger.Info("kafka-create-liveness-channel")
// At least 1, so we can immediately post to it without blocking
// Setting a bigger number (10) allows the monitor to fall behind
// without blocking others. The monitor shouldn't really fall
@@ -615,11 +615,11 @@
// Enable the Healthiness monitor channel. This channel will report "false"
// if the kafka consumers die, or some other catastrophic problem occurs
// that would require re-creating the client.
-func (sc *SaramaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
- logger.Infow(ctx, "kafka-enable-healthiness-channel", log.Fields{"enable": enable})
+func (sc *SaramaClient) EnableHealthinessChannel(enable bool) chan bool {
+ logger.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
if enable {
if sc.healthiness == nil {
- logger.Info(ctx, "kafka-create-healthiness-channel")
+ logger.Info("kafka-create-healthiness-channel")
// At least 1, so we can immediately post to it without blocking
// Setting a bigger number (10) allows the monitor to fall behind
// without blocking others. The monitor shouldn't really fall
@@ -638,7 +638,7 @@
// send an empty message on the liveness channel to check whether connectivity has
// been restored.
-func (sc *SaramaClient) SendLiveness(ctx context.Context) error {
+func (sc *SaramaClient) SendLiveness() error {
if !sc.started {
return fmt.Errorf("SendLiveness() called while not started")
}
@@ -654,12 +654,12 @@
// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
select {
case ok := <-sc.producer.Successes():
- logger.Debugw(ctx, "liveness-message-sent", log.Fields{"status": ok.Topic})
- sc.updateLiveness(ctx, true)
+ logger.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
+ sc.updateLiveness(true)
case notOk := <-sc.producer.Errors():
- logger.Debugw(ctx, "liveness-error-sending", log.Fields{"status": notOk})
- if sc.isLivenessError(ctx, notOk) {
- sc.updateLiveness(ctx, false)
+ logger.Debugw("liveness-error-sending", log.Fields{"status": notOk})
+ if sc.isLivenessError(notOk) {
+ sc.updateLiveness(false)
}
return notOk
}
@@ -686,7 +686,7 @@
return sarama.OffsetNewest
}
-func (sc *SaramaClient) createClusterAdmin(ctx context.Context) error {
+func (sc *SaramaClient) createClusterAdmin() error {
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
@@ -694,7 +694,7 @@
var cAdmin sarama.ClusterAdmin
var err error
if cAdmin, err = sarama.NewClusterAdmin([]string{sc.KafkaAddress}, config); err != nil {
- logger.Errorw(ctx, "cluster-admin-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
+ logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
return err
}
sc.cAdmin = cAdmin
@@ -739,24 +739,24 @@
return nil
}
-func (sc *SaramaClient) addChannelToConsumerChannelMap(ctx context.Context, topic *Topic, ch chan *ic.InterContainerMessage) {
+func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
sc.lockTopicToConsumerChannelMap.Lock()
defer sc.lockTopicToConsumerChannelMap.Unlock()
if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
consumerCh.channels = append(consumerCh.channels, ch)
return
}
- logger.Warnw(ctx, "consumers-channel-not-exist", log.Fields{"topic": topic.Name})
+ logger.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
}
//closeConsumers closes a list of sarama consumers. The consumers can either be partition consumers or group consumers
-func closeConsumers(ctx context.Context, consumers []interface{}) error {
+func closeConsumers(consumers []interface{}) error {
var err error
for _, consumer := range consumers {
// Is it a partition consumer?
if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
if errTemp := partionConsumer.Close(); errTemp != nil {
- logger.Debugw(ctx, "partition!!!", log.Fields{"err": errTemp})
+ logger.Debugw("partition!!!", log.Fields{"err": errTemp})
if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
// This can occur on a race condition
err = nil
@@ -778,35 +778,35 @@
return err
}
-func (sc *SaramaClient) removeChannelFromConsumerChannelMap(ctx context.Context, topic Topic, ch <-chan *ic.InterContainerMessage) error {
+func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
sc.lockTopicToConsumerChannelMap.Lock()
defer sc.lockTopicToConsumerChannelMap.Unlock()
if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
// Channel will be closed in the removeChannel method
- consumerCh.channels = removeChannel(ctx, consumerCh.channels, ch)
+ consumerCh.channels = removeChannel(consumerCh.channels, ch)
// If there are no more channels then we can close the consumer itself
if len(consumerCh.channels) == 0 {
- logger.Debugw(ctx, "closing-consumers", log.Fields{"topic": topic})
- err := closeConsumers(ctx, consumerCh.consumers)
+ logger.Debugw("closing-consumers", log.Fields{"topic": topic})
+ err := closeConsumers(consumerCh.consumers)
//err := consumerCh.consumers.Close()
delete(sc.topicToConsumerChannelMap, topic.Name)
return err
}
return nil
}
- logger.Warnw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
+ logger.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
return errors.New("topic-does-not-exist")
}
-func (sc *SaramaClient) clearTopicFromConsumerChannelMap(ctx context.Context, topic Topic) error {
+func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
sc.lockTopicToConsumerChannelMap.Lock()
defer sc.lockTopicToConsumerChannelMap.Unlock()
if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
for _, ch := range consumerCh.channels {
// Channel will be closed in the removeChannel method
- removeChannel(ctx, consumerCh.channels, ch)
+ removeChannel(consumerCh.channels, ch)
}
- err := closeConsumers(ctx, consumerCh.consumers)
+ err := closeConsumers(consumerCh.consumers)
//if err == sarama.ErrUnknownTopicOrPartition {
// // Not an error
// err = nil
@@ -815,12 +815,12 @@
delete(sc.topicToConsumerChannelMap, topic.Name)
return err
}
- logger.Debugw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
+ logger.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
return nil
}
//createPublisher creates the publisher which is used to send a message onto kafka
-func (sc *SaramaClient) createPublisher(ctx context.Context) error {
+func (sc *SaramaClient) createPublisher() error {
// This creates the publisher
config := sarama.NewConfig()
config.Producer.Partitioner = sarama.NewRandomPartitioner
@@ -835,16 +835,16 @@
brokers := []string{sc.KafkaAddress}
if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
- logger.Errorw(ctx, "error-starting-publisher", log.Fields{"error": err})
+ logger.Errorw("error-starting-publisher", log.Fields{"error": err})
return err
} else {
sc.producer = producer
}
- logger.Info(ctx, "Kafka-publisher-created")
+ logger.Info("Kafka-publisher-created")
return nil
}
-func (sc *SaramaClient) createConsumer(ctx context.Context) error {
+func (sc *SaramaClient) createConsumer() error {
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Fetch.Min = 1
@@ -855,17 +855,17 @@
brokers := []string{sc.KafkaAddress}
if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
- logger.Errorw(ctx, "error-starting-consumers", log.Fields{"error": err})
+ logger.Errorw("error-starting-consumers", log.Fields{"error": err})
return err
} else {
sc.consumer = consumer
}
- logger.Info(ctx, "Kafka-consumers-created")
+ logger.Info("Kafka-consumers-created")
return nil
}
// createGroupConsumer creates a consumer group
-func (sc *SaramaClient) createGroupConsumer(ctx context.Context, topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
+func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
config := scc.NewConfig()
config.ClientID = uuid.New().String()
config.Group.Mode = scc.ConsumerModeMultiplex
@@ -883,10 +883,10 @@
var err error
if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
- logger.Errorw(ctx, "create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+ logger.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
return nil, err
}
- logger.Debugw(ctx, "create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
+ logger.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
//sc.groupConsumers[topic.Name] = consumer
sc.addToGroupConsumers(topic.Name, consumer)
@@ -911,104 +911,104 @@
}
}
-func (sc *SaramaClient) consumeFromAPartition(ctx context.Context, topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
- logger.Debugw(ctx, "starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
+func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
+ logger.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
startloop:
for {
select {
case err, ok := <-consumer.Errors():
if ok {
- if sc.isLivenessError(ctx, err) {
- sc.updateLiveness(ctx, false)
- logger.Warnw(ctx, "partition-consumers-error", log.Fields{"error": err})
+ if sc.isLivenessError(err) {
+ sc.updateLiveness(false)
+ logger.Warnw("partition-consumers-error", log.Fields{"error": err})
}
} else {
// Channel is closed
break startloop
}
case msg, ok := <-consumer.Messages():
- //logger.Debugw(ctx, "message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
+ //logger.Debugw("message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
if !ok {
// channel is closed
break startloop
}
msgBody := msg.Value
- sc.updateLiveness(ctx, true)
- logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
+ sc.updateLiveness(true)
+ logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
icm := &ic.InterContainerMessage{}
if err := proto.Unmarshal(msgBody, icm); err != nil {
- logger.Warnw(ctx, "partition-invalid-message", log.Fields{"error": err})
+ logger.Warnw("partition-invalid-message", log.Fields{"error": err})
continue
}
go sc.dispatchToConsumers(consumerChnls, icm)
case <-sc.doneCh:
- logger.Infow(ctx, "partition-received-exit-signal", log.Fields{"topic": topic.Name})
+ logger.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
break startloop
}
}
- logger.Infow(ctx, "partition-consumer-stopped", log.Fields{"topic": topic.Name})
- sc.setUnhealthy(ctx)
+ logger.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
+ sc.setUnhealthy()
}
-func (sc *SaramaClient) consumeGroupMessages(ctx context.Context, topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
- logger.Debugw(ctx, "starting-group-consumption-loop", log.Fields{"topic": topic.Name})
+func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
+ logger.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
startloop:
for {
select {
case err, ok := <-consumer.Errors():
if ok {
- if sc.isLivenessError(ctx, err) {
- sc.updateLiveness(ctx, false)
+ if sc.isLivenessError(err) {
+ sc.updateLiveness(false)
}
- logger.Warnw(ctx, "group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
+ logger.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
} else {
- logger.Warnw(ctx, "group-consumers-closed-err", log.Fields{"topic": topic.Name})
+ logger.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
// channel is closed
break startloop
}
case msg, ok := <-consumer.Messages():
if !ok {
- logger.Warnw(ctx, "group-consumers-closed-msg", log.Fields{"topic": topic.Name})
+ logger.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
// Channel closed
break startloop
}
- sc.updateLiveness(ctx, true)
- logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
+ sc.updateLiveness(true)
+ logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
msgBody := msg.Value
icm := &ic.InterContainerMessage{}
if err := proto.Unmarshal(msgBody, icm); err != nil {
- logger.Warnw(ctx, "invalid-message", log.Fields{"error": err})
+ logger.Warnw("invalid-message", log.Fields{"error": err})
continue
}
go sc.dispatchToConsumers(consumerChnls, icm)
consumer.MarkOffset(msg, "")
case ntf := <-consumer.Notifications():
- logger.Debugw(ctx, "group-received-notification", log.Fields{"notification": ntf})
+ logger.Debugw("group-received-notification", log.Fields{"notification": ntf})
case <-sc.doneCh:
- logger.Infow(ctx, "group-received-exit-signal", log.Fields{"topic": topic.Name})
+ logger.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
break startloop
}
}
- logger.Infow(ctx, "group-consumer-stopped", log.Fields{"topic": topic.Name})
- sc.setUnhealthy(ctx)
+ logger.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
+ sc.setUnhealthy()
}
-func (sc *SaramaClient) startConsumers(ctx context.Context, topic *Topic) error {
- logger.Debugw(ctx, "starting-consumers", log.Fields{"topic": topic.Name})
+func (sc *SaramaClient) startConsumers(topic *Topic) error {
+ logger.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
var consumerCh *consumerChannels
if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
- logger.Errorw(ctx, "consumers-not-exist", log.Fields{"topic": topic.Name})
+ logger.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
return errors.New("consumers-not-exist")
}
// For each consumer listening for that topic, start a consumption loop
for _, consumer := range consumerCh.consumers {
if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
- go sc.consumeFromAPartition(ctx, topic, pConsumer, consumerCh)
+ go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
} else if gConsumer, ok := consumer.(*scc.Consumer); ok {
- go sc.consumeGroupMessages(ctx, topic, gConsumer, consumerCh)
+ go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
} else {
- logger.Errorw(ctx, "invalid-consumer", log.Fields{"topic": topic})
+ logger.Errorw("invalid-consumer", log.Fields{"topic": topic})
return errors.New("invalid-consumer")
}
}
@@ -1017,12 +1017,12 @@
//// setupPartitionConsumerChannel creates a consumerChannels object for that topic and adds it to the consumerChannels map
//// for that topic. It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupPartitionConsumerChannel(ctx context.Context, topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
var pConsumers []sarama.PartitionConsumer
var err error
- if pConsumers, err = sc.createPartitionConsumers(ctx, topic, initialOffset); err != nil {
- logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
+ if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
+ logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
@@ -1044,8 +1044,8 @@
//Start a consumer to listen on that specific topic
go func() {
- if err := sc.startConsumers(ctx, topic); err != nil {
- logger.Errorw(ctx, "start-consumers-failed", log.Fields{
+ if err := sc.startConsumers(topic); err != nil {
+ logger.Errorw("start-consumers-failed", log.Fields{
"topic": topic,
"error": err})
}
@@ -1056,12 +1056,12 @@
// setupGroupConsumerChannel creates a consumerChannels object for that topic and adds it to the consumerChannels map
// for that topic. It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
// TODO: Replace this development partition consumer with a group consumer
var pConsumer *scc.Consumer
var err error
- if pConsumer, err = sc.createGroupConsumer(ctx, topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
- logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
+ if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
+ logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
@@ -1077,8 +1077,8 @@
//Start a consumer to listen on that specific topic
go func() {
- if err := sc.startConsumers(ctx, topic); err != nil {
- logger.Errorw(ctx, "start-consumers-failed", log.Fields{
+ if err := sc.startConsumers(topic); err != nil {
+ logger.Errorw("start-consumers-failed", log.Fields{
"topic": topic,
"error": err})
}
@@ -1087,11 +1087,11 @@
return consumerListeningChannel, nil
}
-func (sc *SaramaClient) createPartitionConsumers(ctx context.Context, topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
- logger.Debugw(ctx, "creating-partition-consumers", log.Fields{"topic": topic.Name})
+func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
+ logger.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
partitionList, err := sc.consumer.Partitions(topic.Name)
if err != nil {
- logger.Warnw(ctx, "get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
+ logger.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
@@ -1099,7 +1099,7 @@
for _, partition := range partitionList {
var pConsumer sarama.PartitionConsumer
if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
- logger.Warnw(ctx, "consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
+ logger.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
pConsumers = append(pConsumers, pConsumer)
@@ -1107,14 +1107,14 @@
return pConsumers, nil
}
-func removeChannel(ctx context.Context, channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
+func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
var i int
var channel chan *ic.InterContainerMessage
for i, channel = range channels {
if channel == ch {
channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
close(channel)
- logger.Debug(ctx, "channel-closed")
+ logger.Debug("channel-closed")
return channels[:len(channels)-1]
}
}
@@ -1129,14 +1129,14 @@
}
}
-func (sc *SaramaClient) deleteFromGroupConsumers(ctx context.Context, topic string) error {
+func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
sc.lockOfGroupConsumers.Lock()
defer sc.lockOfGroupConsumers.Unlock()
if _, exist := sc.groupConsumers[topic]; exist {
consumer := sc.groupConsumers[topic]
delete(sc.groupConsumers, topic)
if err := consumer.Close(); err != nil {
- logger.Errorw(ctx, "failure-closing-consumer", log.Fields{"error": err})
+ logger.Errorw("failure-closing-consumer", log.Fields{"error": err})
return err
}
}
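
For context on the liveness plumbing reverted above, a minimal sketch of a caller draining the channel returned by EnableLivenessChannel (post-revert signature, no ctx argument; the monitor loop and log text are illustrative):

    package main

    import (
        "log"

        "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
    )

    // monitorLiveness consumes the rate-limited liveness events that
    // updateLiveness posts to the channel. With this revert applied,
    // EnableLivenessChannel takes only the enable flag.
    func monitorLiveness(client kafka.Client) {
        liveness := client.EnableLivenessChannel(true)
        for alive := range liveness {
            if alive {
                log.Println("kafka connectivity ok")
                continue
            }
            // A service would typically mark itself not-ready here and later
            // call SendLiveness() to probe whether connectivity is restored.
            log.Println("kafka connectivity lost")
        }
    }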