[VOL-4663] create voltha event topic (voltha.events) with conifgurable no of partitions and replication factor

Change-Id: Ibaf8681ccdbffcc8a3c68612c49d7822a20e1b14
diff --git a/topic-listener.go b/topic-listener.go
index 687e229..24d1ee0 100644
--- a/topic-listener.go
+++ b/topic-listener.go
@@ -15,6 +15,9 @@
 package main
 
 import (
+	"context"
+	"fmt"
+	"log"
 	"os"
 	"os/signal"
 	"sync"
@@ -23,30 +26,78 @@
 	"github.com/Shopify/sarama"
 )
 
-func topicListener(topic *string, master sarama.Consumer, wg sync.WaitGroup) {
-	logger.Info("Starting topicListener for [%s]", *topic)
+// Consumer represents a Sarama consumer group consumer
+type Consumer struct {
+	HandleFunc func(topic *string, data []byte)
+}
+
+
+func topicListener(ctx context.Context, topics []string, consGrp sarama.ConsumerGroup, wg sync.WaitGroup) {
+	logger.Info("Starting topicListener for [%s]", topics)
 	defer wg.Done()
-	consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
-	if err != nil {
-		logger.Error("topicListener panic, topic=[%s]: %s", *topic, err.Error())
-		return
+
+	/**
+	 * Setup a new Sarama consumer group
+	 */
+	consumer := Consumer{
+		HandleFunc: export,
 	}
-	signals := make(chan os.Signal, 1)
-	signal.Notify(signals, os.Interrupt)
-	doneCh := make(chan struct{})
+
 	go func() {
 		for {
-			select {
-			case err := <-consumer.Errors():
-				logger.Error("Consumer error: %s", err.Err)
-			case msg := <-consumer.Messages():
-				logger.Debug("Got message on topic=[%s]: %s", *topic, string(msg.Value))
-				export(topic, msg.Value)
-			case <-signals:
-				logger.Warn("Interrupt is detected")
-				doneCh <- struct{}{}
+			// `Consume` should be called inside an infinite loop, when a
+			// server-side rebalance happens, the consumer session will need to be
+			// recreated to get the new claims
+			if err := consGrp.Consume(ctx, topics, &consumer); err != nil {
+				log.Panicf("Error from consumer: %v", err)
 			}
+
 		}
 	}()
-	<-doneCh
+
+	signals := make(chan os.Signal, 1)
+	signal.Notify(signals, os.Interrupt)
+
+	for {
+		select {
+		case <-ctx.Done():
+			logger.Warn("terminating: context cancelled")
+			return
+		case <-signals:
+			logger.Warn("Interrupt is detected")
+			return
+		}
+	}
+
+}
+
+// Setup is run at the beginning of a new session, before ConsumeClaim
+func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
+	return nil
+}
+
+// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
+func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
+	return nil
+}
+
+// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
+func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	// NOTE:
+	// Do not move the code below to a goroutine.
+	// The `ConsumeClaim` itself is called within a goroutine, see:
+	// https://github.com/Shopify/sarama/blob/main/consumer_group.go#L27-L29
+
+	if consumer.HandleFunc == nil {
+		logger.Error("No handler for consumer ")
+		return fmt.Errorf("no handler for consumer")
+	}
+
+	for message := range claim.Messages() {
+		topic := string(message.Topic)
+		consumer.HandleFunc(&topic, message.Value)
+		session.MarkMessage(message, "")
+	}
+
+	return nil
 }