Matteo Scandolo | aab36db | 2018-10-09 19:54:11 -0700 | [diff] [blame] | 1 | // Copyright 2018 Open Networking Foundation |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package main |
| 16 | |
| 17 | import ( |
kesavand | c71914f | 2022-03-25 11:19:03 +0530 | [diff] [blame] | 18 | "context" |
| 19 | "fmt" |
| 20 | "log" |
Matteo Scandolo | aab36db | 2018-10-09 19:54:11 -0700 | [diff] [blame] | 21 | "os" |
| 22 | "os/signal" |
| 23 | "sync" |
| 24 | |
Ganesh Bhure | 8d0c994 | 2019-05-24 11:42:09 +0530 | [diff] [blame] | 25 | "gerrit.opencord.org/kafka-topic-exporter/common/logger" |
kesavand | 2cde658 | 2020-06-22 04:56:23 -0400 | [diff] [blame] | 26 | "github.com/Shopify/sarama" |
Matteo Scandolo | aab36db | 2018-10-09 19:54:11 -0700 | [diff] [blame] | 27 | ) |
| 28 | |
kesavand | c71914f | 2022-03-25 11:19:03 +0530 | [diff] [blame] | 29 | // Consumer represents a Sarama consumer group consumer |
| 30 | type Consumer struct { |
| 31 | HandleFunc func(topic *string, data []byte) |
| 32 | } |
| 33 | |
| 34 | |
| 35 | func topicListener(ctx context.Context, topics []string, consGrp sarama.ConsumerGroup, wg sync.WaitGroup) { |
| 36 | logger.Info("Starting topicListener for [%s]", topics) |
Matteo Scandolo | aab36db | 2018-10-09 19:54:11 -0700 | [diff] [blame] | 37 | defer wg.Done() |
kesavand | c71914f | 2022-03-25 11:19:03 +0530 | [diff] [blame] | 38 | |
| 39 | /** |
| 40 | * Setup a new Sarama consumer group |
| 41 | */ |
| 42 | consumer := Consumer{ |
| 43 | HandleFunc: export, |
Matteo Scandolo | aab36db | 2018-10-09 19:54:11 -0700 | [diff] [blame] | 44 | } |
kesavand | c71914f | 2022-03-25 11:19:03 +0530 | [diff] [blame] | 45 | |
Matteo Scandolo | aab36db | 2018-10-09 19:54:11 -0700 | [diff] [blame] | 46 | go func() { |
| 47 | for { |
kesavand | c71914f | 2022-03-25 11:19:03 +0530 | [diff] [blame] | 48 | // `Consume` should be called inside an infinite loop, when a |
| 49 | // server-side rebalance happens, the consumer session will need to be |
| 50 | // recreated to get the new claims |
| 51 | if err := consGrp.Consume(ctx, topics, &consumer); err != nil { |
| 52 | log.Panicf("Error from consumer: %v", err) |
Matteo Scandolo | aab36db | 2018-10-09 19:54:11 -0700 | [diff] [blame] | 53 | } |
kesavand | c71914f | 2022-03-25 11:19:03 +0530 | [diff] [blame] | 54 | |
Matteo Scandolo | aab36db | 2018-10-09 19:54:11 -0700 | [diff] [blame] | 55 | } |
| 56 | }() |
kesavand | c71914f | 2022-03-25 11:19:03 +0530 | [diff] [blame] | 57 | |
| 58 | signals := make(chan os.Signal, 1) |
| 59 | signal.Notify(signals, os.Interrupt) |
| 60 | |
| 61 | for { |
| 62 | select { |
| 63 | case <-ctx.Done(): |
| 64 | logger.Warn("terminating: context cancelled") |
| 65 | return |
| 66 | case <-signals: |
| 67 | logger.Warn("Interrupt is detected") |
| 68 | return |
| 69 | } |
| 70 | } |
| 71 | |
| 72 | } |
| 73 | |
| 74 | // Setup is run at the beginning of a new session, before ConsumeClaim |
| 75 | func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error { |
| 76 | return nil |
| 77 | } |
| 78 | |
| 79 | // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited |
| 80 | func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error { |
| 81 | return nil |
| 82 | } |
| 83 | |
| 84 | // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). |
| 85 | func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { |
| 86 | // NOTE: |
| 87 | // Do not move the code below to a goroutine. |
| 88 | // The `ConsumeClaim` itself is called within a goroutine, see: |
| 89 | // https://github.com/Shopify/sarama/blob/main/consumer_group.go#L27-L29 |
| 90 | |
| 91 | if consumer.HandleFunc == nil { |
| 92 | logger.Error("No handler for consumer ") |
| 93 | return fmt.Errorf("no handler for consumer") |
| 94 | } |
| 95 | |
| 96 | for message := range claim.Messages() { |
| 97 | topic := string(message.Topic) |
| 98 | consumer.HandleFunc(&topic, message.Value) |
| 99 | session.MarkMessage(message, "") |
| 100 | } |
| 101 | |
| 102 | return nil |
Matteo Scandolo | aab36db | 2018-10-09 19:54:11 -0700 | [diff] [blame] | 103 | } |