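This commit adds a complete partition consumer as well as a
group consumer to the sarama client library, and upgrades the
version of Kafka being run. Callers in the inter-container
library are updated to the new Start() and Subscribe(&topic)
signatures.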
Change-Id: Idca3eb1aa31d668afa86d12b39d6a1b0ab1965bc
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 25fc1b7..e2210c4 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -70,12 +70,12 @@
lockTopicRequestHandlerChannelMap sync.RWMutex
// This map is used to map a channel to a response topic. This channel handles all responses on that
- // channel for that topic and forward them to the appropriate consumer channel, using the
+ // channel for that topic and forward them to the appropriate consumer channel, using the
// transactionIdToChannelMap.
topicToResponseChannelMap map[string]<-chan *ca.InterContainerMessage
lockTopicResponseChannelMap sync.RWMutex
- // This map is used to map a transaction to a consumer channel. This is used whenever a request has been
+ // This map is used to map a transaction to a consumer channel. This is used whenever a request has been
// sent out and we are waiting for a response.
transactionIdToChannelMap map[string]*transactionChannel
lockTransactionIdToChannelMap sync.RWMutex
@@ -143,7 +143,7 @@
kp.doneCh = make(chan int, 1)
// Start the kafka client
- if err := kp.kafkaClient.Start(DefaultMaxRetries); err != nil {
+ if err := kp.kafkaClient.Start(); err != nil {
log.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
return err
}
@@ -260,7 +260,7 @@
// Subscribe to receive messages for that topic
var ch <-chan *ca.InterContainerMessage
var err error
- if ch, err = kp.kafkaClient.Subscribe(&topic, 1); err != nil {
+ if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
//if ch, err = kp.Subscribe(topic); err != nil {
log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
}
@@ -279,7 +279,7 @@
// Subscribe to receive messages for that topic
var ch <-chan *ca.InterContainerMessage
var err error
- if ch, err = kp.kafkaClient.Subscribe(&topic, 1); err != nil {
+ if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
}
kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
@@ -294,7 +294,7 @@
return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
}
-// setupTopicResponseChannelMap sets up single consumer channel that will act as a broadcast channel for all
+// setupTopicResponseChannelMap sets up a single consumer channel that will act as a broadcast channel for all
// responses from that topic.
func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ca.InterContainerMessage) {
kp.lockTopicResponseChannelMap.Lock()
@@ -616,7 +616,7 @@
}
// waitForResponse listens for messages on the subscribedCh, ensure we get a response with the transaction ID,
-// and then dispatches to the consumer
+// and then dispatches to the consumers
func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ca.InterContainerMessage, topic *Topic) {
log.Debugw("starting-response-loop-for-topic", log.Fields{"topic": topic.Name})
startloop:
@@ -649,7 +649,7 @@
if !kp.isTopicSubscribedForResponse(topic.Name) {
var subscribedCh <-chan *ca.InterContainerMessage
var err error
- if subscribedCh, err = kp.kafkaClient.Subscribe(&topic, 1); err != nil {
+ if subscribedCh, err = kp.kafkaClient.Subscribe(&topic); err != nil {
log.Debugw("subscribe-failure", log.Fields{"topic": topic.Name})
return nil, err
}
@@ -657,7 +657,7 @@
go kp.waitForResponseLoop(subscribedCh, &topic)
}
- // Create a specific channel for this consumer. We cannot use the channel from the kafkaclient as it will
+ // Create a specific channel for this consumer. We cannot use the channel from the kafkaclient as it will
// broadcast any message for this topic to all channels waiting on it.
ch := make(chan *ca.InterContainerMessage)
kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)