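Fix an exception in the Kafka client: access to the group-consumer map
is now guarded by a lock, and a topic's group consumer is removed and
closed when a channel is unsubscribed from that topic. Update .gitignore
to ignore only proto files generated by Voltha.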
Change-Id: I54b1b5e428a918ab20e564b55176942d0bc0facd
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index e669940..35ede44 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -52,6 +52,7 @@
producer sarama.AsyncProducer
consumer sarama.Consumer
groupConsumers map[string]*scc.Consumer
+ lockOfGroupConsumers sync.RWMutex
consumerGroupPrefix string
consumerType int
consumerGroupName string
@@ -206,6 +207,7 @@
client.lockTopicToConsumerChannelMap = sync.RWMutex{}
client.topicLockMap = make(map[string]*sync.RWMutex)
client.lockOfTopicLockMap = sync.RWMutex{}
+ client.lockOfGroupConsumers = sync.RWMutex{}
return client
}
@@ -411,7 +413,13 @@
defer sc.unLockTopic(topic)
log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
- err := sc.removeChannelFromConsumerChannelMap(*topic, ch)
+ var err error
+ if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
+ log.Errorw("failed-removing-channel", log.Fields{"error": err})
+ }
+ if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
+ log.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
+ }
return err
}
@@ -698,9 +706,9 @@
return nil, err
}
log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
- //time.Sleep(10*time.Second)
- //sc.groupConsumer = consumer
- sc.groupConsumers[topic.Name] = consumer
+
+ //sc.groupConsumers[topic.Name] = consumer
+ sc.addToGroupConsumers(topic.Name, consumer)
return consumer, nil
}
@@ -814,7 +822,7 @@
var pConsumers []sarama.PartitionConsumer
var err error
- if pConsumers, err = sc.createPartionConsumers(topic, initialOffset); err != nil {
+ if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
@@ -868,7 +876,7 @@
return consumerListeningChannel, nil
}
-func (sc *SaramaClient) createPartionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
+func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
partitionList, err := sc.consumer.Partitions(topic.Name)
if err != nil {
@@ -901,3 +909,26 @@
}
return channels
}
+
+// addToGroupConsumers stores consumer under topic in sc.groupConsumers while holding lockOfGroupConsumers.
+func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
+ sc.lockOfGroupConsumers.Lock()
+ defer sc.lockOfGroupConsumers.Unlock()
+ if _, exist := sc.groupConsumers[topic]; !exist { // an existing entry is kept; the new consumer is then not stored
+ sc.groupConsumers[topic] = consumer
+ }
+}
+
+// deleteFromGroupConsumers removes topic's entry from sc.groupConsumers, if present, and closes that consumer.
+func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
+	sc.lockOfGroupConsumers.Lock()
+	defer sc.lockOfGroupConsumers.Unlock()
+	if consumer, exist := sc.groupConsumers[topic]; exist { // single comma-ok lookup yields the consumer
+		delete(sc.groupConsumers, topic) // the entry is dropped before Close, so it is gone even if Close fails
+		if err := consumer.Close(); err != nil {
+			log.Errorw("failure-closing-consumer", log.Fields{"error": err})
+			return err
+		}
+	}
+	return nil // nothing registered for topic, or the consumer closed cleanly
+}