This commit adds a complete partition consumer as well as a
group consumer to the sarama client library.  It also upgrades
the kafka running version.

Change-Id: Idca3eb1aa31d668afa86d12b39d6a1b0ab1965bc
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 25fc1b7..e2210c4 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -70,12 +70,12 @@
 	lockTopicRequestHandlerChannelMap sync.RWMutex
 
 	// This map is used to map a channel to a response topic.   This channel handles all responses on that
-	// channel for that topic and forward them to the appropriate consumer channel, using the
+	// channel for that topic and forward them to the appropriate consumers channel, using the
 	// transactionIdToChannelMap.
 	topicToResponseChannelMap   map[string]<-chan *ca.InterContainerMessage
 	lockTopicResponseChannelMap sync.RWMutex
 
-	// This map is used to map a transaction to a consumer channel.  This is used whenever a request has been
+	// This map is used to map a transaction to a consumers channel.  This is used whenever a request has been
 	// sent out and we are waiting for a response.
 	transactionIdToChannelMap     map[string]*transactionChannel
 	lockTransactionIdToChannelMap sync.RWMutex
@@ -143,7 +143,7 @@
 	kp.doneCh = make(chan int, 1)
 
 	// Start the kafka client
-	if err := kp.kafkaClient.Start(DefaultMaxRetries); err != nil {
+	if err := kp.kafkaClient.Start(); err != nil {
 		log.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
 		return err
 	}
@@ -260,7 +260,7 @@
 	// Subscribe to receive messages for that topic
 	var ch <-chan *ca.InterContainerMessage
 	var err error
-	if ch, err = kp.kafkaClient.Subscribe(&topic, 1); err != nil {
+	if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
 		//if ch, err = kp.Subscribe(topic); err != nil {
 		log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
 	}
@@ -279,7 +279,7 @@
 	// Subscribe to receive messages for that topic
 	var ch <-chan *ca.InterContainerMessage
 	var err error
-	if ch, err = kp.kafkaClient.Subscribe(&topic, 1); err != nil {
+	if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
 		log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
 	}
 	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
@@ -294,7 +294,7 @@
 	return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
 }
 
-// setupTopicResponseChannelMap sets up single consumer channel that will act as a broadcast channel for all
+// setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
 // responses from that topic.
 func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ca.InterContainerMessage) {
 	kp.lockTopicResponseChannelMap.Lock()
@@ -616,7 +616,7 @@
 }
 
 // waitForResponse listens for messages on the subscribedCh, ensure we get a response with the transaction ID,
-// and then dispatches to the consumer
+// and then dispatches to the consumers
 func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ca.InterContainerMessage, topic *Topic) {
 	log.Debugw("starting-response-loop-for-topic", log.Fields{"topic": topic.Name})
 startloop:
@@ -649,7 +649,7 @@
 	if !kp.isTopicSubscribedForResponse(topic.Name) {
 		var subscribedCh <-chan *ca.InterContainerMessage
 		var err error
-		if subscribedCh, err = kp.kafkaClient.Subscribe(&topic, 1); err != nil {
+		if subscribedCh, err = kp.kafkaClient.Subscribe(&topic); err != nil {
 			log.Debugw("subscribe-failure", log.Fields{"topic": topic.Name})
 			return nil, err
 		}
@@ -657,7 +657,7 @@
 		go kp.waitForResponseLoop(subscribedCh, &topic)
 	}
 
-	// Create a specific channel for this consumer.  We cannot use the channel from the kafkaclient as it will
+	// Create a specific channel for this consumers.  We cannot use the channel from the kafkaclient as it will
 	// broadcast any message for this topic to all channels waiting on it.
 	ch := make(chan *ca.InterContainerMessage)
 	kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)