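Fix an exception in the Kafka client: access to the group-consumer map
is now guarded by a lock, and a topic's group consumer is closed and
removed on unsubscribe. Update .gitignore to ignore only proto files
generated by Voltha.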

Change-Id: I54b1b5e428a918ab20e564b55176942d0bc0facd
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index e669940..35ede44 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -52,6 +52,7 @@
 	producer                      sarama.AsyncProducer
 	consumer                      sarama.Consumer
 	groupConsumers                map[string]*scc.Consumer
+	lockOfGroupConsumers            sync.RWMutex
 	consumerGroupPrefix           string
 	consumerType                  int
 	consumerGroupName             string
@@ -206,6 +207,7 @@
 	client.lockTopicToConsumerChannelMap = sync.RWMutex{}
 	client.topicLockMap = make(map[string]*sync.RWMutex)
 	client.lockOfTopicLockMap = sync.RWMutex{}
+	client.lockOfGroupConsumers = sync.RWMutex{}
 	return client
 }
 
@@ -411,7 +413,13 @@
 	defer sc.unLockTopic(topic)
 
 	log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
-	err := sc.removeChannelFromConsumerChannelMap(*topic, ch)
+	var err error
+	if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
+		log.Errorw("failed-removing-channel", log.Fields{"error": err})
+	}
+	if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
+		log.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
+	}
 	return err
 }
 
@@ -698,9 +706,9 @@
 		return nil, err
 	}
 	log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
-	//time.Sleep(10*time.Second)
-	//sc.groupConsumer = consumer
-	sc.groupConsumers[topic.Name] = consumer
+
+	//sc.groupConsumers[topic.Name] = consumer
+	sc.addToGroupConsumers(topic.Name, consumer)
 	return consumer, nil
 }
 
@@ -814,7 +822,7 @@
 	var pConsumers []sarama.PartitionConsumer
 	var err error
 
-	if pConsumers, err = sc.createPartionConsumers(topic, initialOffset); err != nil {
+	if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
 		log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
 		return nil, err
 	}
@@ -868,7 +876,7 @@
 	return consumerListeningChannel, nil
 }
 
-func (sc *SaramaClient) createPartionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
+func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
 	log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
 	partitionList, err := sc.consumer.Partitions(topic.Name)
 	if err != nil {
@@ -901,3 +909,26 @@
 	}
 	return channels
 }
+
+// addToGroupConsumers stores consumer under topic while holding the groupConsumers write lock; an existing entry for topic is left untouched.
+func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
+	sc.lockOfGroupConsumers.Lock()
+	defer sc.lockOfGroupConsumers.Unlock()
+	if _, exist := sc.groupConsumers[topic]; !exist {
+		sc.groupConsumers[topic] = consumer
+	}
+}
+
+// deleteFromGroupConsumers removes topic's group consumer from the map and closes it under the write lock; it returns the Close error, or nil when nothing is registered.
+func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
+	sc.lockOfGroupConsumers.Lock()
+	defer sc.lockOfGroupConsumers.Unlock()
+	if consumer, exist := sc.groupConsumers[topic]; exist {
+		delete(sc.groupConsumers, topic)
+		if err := consumer.Close(); err != nil {
+			log.Errorw("failure-closing-consumer", log.Fields{"error": err})
+			return err
+		}
+	}
+	return nil
+}