blob: 24d1ee01308dd7f2d61a068a035892526d1a39ed [file] [log] [blame]
// Copyright 2018 Open Networking Foundation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package main
16
17import (
kesavandc71914f2022-03-25 11:19:03 +053018 "context"
19 "fmt"
20 "log"
Matteo Scandoloaab36db2018-10-09 19:54:11 -070021 "os"
22 "os/signal"
23 "sync"
24
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053025 "gerrit.opencord.org/kafka-topic-exporter/common/logger"
kesavand2cde6582020-06-22 04:56:23 -040026 "github.com/Shopify/sarama"
Matteo Scandoloaab36db2018-10-09 19:54:11 -070027)
28
// Consumer is the handler handed to a Sarama consumer group. Its Setup,
// Cleanup and ConsumeClaim methods are the callbacks the group invokes.
type Consumer struct {
	// HandleFunc is called by ConsumeClaim once per received message, with a
	// pointer to the message's topic name and the raw message value.
	// ConsumeClaim returns an error when it is nil.
	HandleFunc func(topic *string, data []byte)
}
33
34
35func topicListener(ctx context.Context, topics []string, consGrp sarama.ConsumerGroup, wg sync.WaitGroup) {
36 logger.Info("Starting topicListener for [%s]", topics)
Matteo Scandoloaab36db2018-10-09 19:54:11 -070037 defer wg.Done()
kesavandc71914f2022-03-25 11:19:03 +053038
39 /**
40 * Setup a new Sarama consumer group
41 */
42 consumer := Consumer{
43 HandleFunc: export,
Matteo Scandoloaab36db2018-10-09 19:54:11 -070044 }
kesavandc71914f2022-03-25 11:19:03 +053045
Matteo Scandoloaab36db2018-10-09 19:54:11 -070046 go func() {
47 for {
kesavandc71914f2022-03-25 11:19:03 +053048 // `Consume` should be called inside an infinite loop, when a
49 // server-side rebalance happens, the consumer session will need to be
50 // recreated to get the new claims
51 if err := consGrp.Consume(ctx, topics, &consumer); err != nil {
52 log.Panicf("Error from consumer: %v", err)
Matteo Scandoloaab36db2018-10-09 19:54:11 -070053 }
kesavandc71914f2022-03-25 11:19:03 +053054
Matteo Scandoloaab36db2018-10-09 19:54:11 -070055 }
56 }()
kesavandc71914f2022-03-25 11:19:03 +053057
58 signals := make(chan os.Signal, 1)
59 signal.Notify(signals, os.Interrupt)
60
61 for {
62 select {
63 case <-ctx.Done():
64 logger.Warn("terminating: context cancelled")
65 return
66 case <-signals:
67 logger.Warn("Interrupt is detected")
68 return
69 }
70 }
71
72}
73
74// Setup is run at the beginning of a new session, before ConsumeClaim
75func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
76 return nil
77}
78
79// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
80func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
81 return nil
82}
83
84// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
85func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
86 // NOTE:
87 // Do not move the code below to a goroutine.
88 // The `ConsumeClaim` itself is called within a goroutine, see:
89 // https://github.com/Shopify/sarama/blob/main/consumer_group.go#L27-L29
90
91 if consumer.HandleFunc == nil {
92 logger.Error("No handler for consumer ")
93 return fmt.Errorf("no handler for consumer")
94 }
95
96 for message := range claim.Messages() {
97 topic := string(message.Topic)
98 consumer.HandleFunc(&topic, message.Value)
99 session.MarkMessage(message, "")
100 }
101
102 return nil
Matteo Scandoloaab36db2018-10-09 19:54:11 -0700103}