This commit modifies the way messages are consumed over Kafka,
mostly around which initial offset to use.
Change-Id: I6104ef710d9c595034cd4cedc0d58ae774cec719
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index f2de01a..8468e42 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -109,13 +109,25 @@
}
}
-func ReturnOnErrors(opt bool) SaramaClientOption {
+func ProducerMaxRetries(num int) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.producerRetryMax = num
+ }
+}
+
+func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.producerRetryBackOff = duration
+ }
+}
+
+func ProducerReturnOnErrors(opt bool) SaramaClientOption {
return func(args *SaramaClient) {
args.producerReturnErrors = opt
}
}
-func ReturnOnSuccess(opt bool) SaramaClientOption {
+func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
return func(args *SaramaClient) {
args.producerReturnSuccess = opt
}
@@ -376,6 +388,16 @@
// Send message to kafka
sc.producer.Input() <- kafkaMsg
+
+ // Wait for result
+ // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
+ select {
+ case ok := <-sc.producer.Successes():
+ log.Debugw("message-sent", log.Fields{"status":ok})
+ case notOk := <-sc.producer.Errors():
+ log.Debugw("error-sending", log.Fields{"status":notOk})
+ return notOk
+ }
return nil
}