This commit made some modifications in the way messages over
kafka are consumed, mostly around the initial offset to use.

Change-Id: I6104ef710d9c595034cd4cedc0d58ae774cec719
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index f2de01a..8468e42 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -109,13 +109,25 @@
 	}
 }
 
-func ReturnOnErrors(opt bool) SaramaClientOption {
+func ProducerMaxRetries(num int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.producerRetryMax = num
+	}
+}
+
+func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.producerRetryBackOff = duration
+	}
+}
+
+func ProducerReturnOnErrors(opt bool) SaramaClientOption {
 	return func(args *SaramaClient) {
 		args.producerReturnErrors = opt
 	}
 }
 
-func ReturnOnSuccess(opt bool) SaramaClientOption {
+func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
 	return func(args *SaramaClient) {
 		args.producerReturnSuccess = opt
 	}
@@ -376,6 +388,16 @@
 
 	// Send message to kafka
 	sc.producer.Input() <- kafkaMsg
+
+	// Wait for result
+	// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
+	select {
+	case ok := <-sc.producer.Successes():
+		log.Debugw("message-sent", log.Fields{"status":ok})
+	case notOk := <-sc.producer.Errors():
+		log.Debugw("error-sending", log.Fields{"status":notOk})
+		return notOk
+	}
 	return nil
 }