This commit modifies the way messages are consumed over Kafka,
mainly the choice of the initial offset to use.

Change-Id: I6104ef710d9c595034cd4cedc0d58ae774cec719
diff --git a/kafka/client.go b/kafka/client.go
index ad8f01a..b93ad86 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -33,7 +33,7 @@
 	DefaultProducerFlushFrequency   = 5
 	DefaultProducerFlushMessages    = 1
 	DefaultProducerFlushMaxmessages = 5
-	DefaultProducerReturnSuccess    = false
+	DefaultProducerReturnSuccess    = true
 	DefaultProducerReturnErrors     = true
 	DefaultProducerRetryMax         = 3
 	DefaultProducerRetryBackoff     = time.Millisecond * 100
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index e2210c4..ff3584f 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -589,7 +589,7 @@
 			// partitions.
 			replyTopic := &Topic{Name: msg.Header.FromTopic}
 			key := GetDeviceIdFromTopic(*replyTopic)
-			log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header, "key": key})
+			log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
 			// TODO: handle error response.
 			kp.kafkaClient.Send(icm, replyTopic, key)
 		}
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index f2de01a..8468e42 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -109,13 +109,25 @@
 	}
 }
 
-func ReturnOnErrors(opt bool) SaramaClientOption {
+func ProducerMaxRetries(num int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.producerRetryMax = num
+	}
+}
+
+func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.producerRetryBackOff = duration
+	}
+}
+
+func ProducerReturnOnErrors(opt bool) SaramaClientOption {
 	return func(args *SaramaClient) {
 		args.producerReturnErrors = opt
 	}
 }
 
-func ReturnOnSuccess(opt bool) SaramaClientOption {
+func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
 	return func(args *SaramaClient) {
 		args.producerReturnSuccess = opt
 	}
@@ -376,6 +388,16 @@
 
 	// Send message to kafka
 	sc.producer.Input() <- kafkaMsg
+
+	// Wait for result
+	// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
+	select {
+	case ok := <-sc.producer.Successes():
+		log.Debugw("message-sent", log.Fields{"status":ok})
+	case notOk := <-sc.producer.Errors():
+		log.Debugw("error-sending", log.Fields{"status":notOk})
+		return notOk
+	}
 	return nil
 }