[VOL-4663] create voltha event topic (voltha.events) with configurable number of partitions and replication factor
Change-Id: Ibaf8681ccdbffcc8a3c68612c49d7822a20e1b14
diff --git a/topic-listener.go b/topic-listener.go
index 687e229..24d1ee0 100644
--- a/topic-listener.go
+++ b/topic-listener.go
@@ -15,6 +15,9 @@
package main
import (
+ "context"
+ "fmt"
+ "log"
"os"
"os/signal"
"sync"
@@ -23,30 +26,78 @@
"github.com/Shopify/sarama"
)
-func topicListener(topic *string, master sarama.Consumer, wg sync.WaitGroup) {
- logger.Info("Starting topicListener for [%s]", *topic)
+// Consumer represents a Sarama consumer group consumer
+type Consumer struct {
+ HandleFunc func(topic *string, data []byte)
+}
+
+
+func topicListener(ctx context.Context, topics []string, consGrp sarama.ConsumerGroup, wg sync.WaitGroup) {
+ logger.Info("Starting topicListener for [%s]", topics)
defer wg.Done()
- consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
- if err != nil {
- logger.Error("topicListener panic, topic=[%s]: %s", *topic, err.Error())
- return
+
+ /**
+ * Set up the handler that is passed to the Sarama consumer group
+ */
+ consumer := Consumer{
+ HandleFunc: export,
}
- signals := make(chan os.Signal, 1)
- signal.Notify(signals, os.Interrupt)
- doneCh := make(chan struct{})
+
go func() {
for {
- select {
- case err := <-consumer.Errors():
- logger.Error("Consumer error: %s", err.Err)
- case msg := <-consumer.Messages():
- logger.Debug("Got message on topic=[%s]: %s", *topic, string(msg.Value))
- export(topic, msg.Value)
- case <-signals:
- logger.Warn("Interrupt is detected")
- doneCh <- struct{}{}
+ // `Consume` should be called inside an infinite loop: when a
+ // server-side rebalance happens, the consumer session will need to be
+ // recreated to get the new claims.
+ if err := consGrp.Consume(ctx, topics, &consumer); err != nil {
+ log.Panicf("Error from consumer: %v", err)
}
+
}
}()
- <-doneCh
+
+ signals := make(chan os.Signal, 1)
+ signal.Notify(signals, os.Interrupt)
+
+ for {
+ select {
+ case <-ctx.Done():
+ logger.Warn("terminating: context cancelled")
+ return
+ case <-signals:
+ logger.Warn("Interrupt is detected")
+ return
+ }
+ }
+
+}
+
+// Setup is run at the beginning of a new session, before ConsumeClaim
+func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
+ return nil
+}
+
+// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
+func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
+ return nil
+}
+
+// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
+func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+ // NOTE:
+ // Do not move the code below to a goroutine.
+ // The `ConsumeClaim` itself is called within a goroutine, see:
+ // https://github.com/Shopify/sarama/blob/main/consumer_group.go#L27-L29
+
+ if consumer.HandleFunc == nil {
+ logger.Error("No handler for consumer ")
+ return fmt.Errorf("no handler for consumer")
+ }
+
+ for message := range claim.Messages() {
+ topic := string(message.Topic)
+ consumer.HandleFunc(&topic, message.Value)
+ session.MarkMessage(message, "")
+ }
+
+ return nil
}