blob: 9b0f2dcc63641869812bf831ca087d6b49c29813 [file] [log] [blame]
Matteo Scandoloaab36db2018-10-09 19:54:11 -07001// Copyright 2018 Open Networking Foundation
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package main
16
17import (
Matteo Scandoloaab36db2018-10-09 19:54:11 -070018 "os"
19 "os/signal"
20 "sync"
21
22 "github.com/Shopify/sarama"
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053023 "gerrit.opencord.org/kafka-topic-exporter/common/logger"
Matteo Scandoloaab36db2018-10-09 19:54:11 -070024)
25
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053026func topicListener(topic *string, master sarama.Consumer, wg sync.WaitGroup) {
27 logger.Info("Starting topicListener for [%s]", *topic)
Matteo Scandoloaab36db2018-10-09 19:54:11 -070028 defer wg.Done()
29 consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
30 if err != nil {
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053031 logger.Error("topicListener panic")
Matteo Scandoloaab36db2018-10-09 19:54:11 -070032 panic(err)
33 }
34 signals := make(chan os.Signal, 1)
35 signal.Notify(signals, os.Interrupt)
36 doneCh := make(chan struct{})
37 go func() {
38 for {
39 select {
40 case err := <-consumer.Errors():
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053041 logger.Error("%s", err)
Matteo Scandoloaab36db2018-10-09 19:54:11 -070042 case msg := <-consumer.Messages():
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053043 logger.Debug("Message on %s: %s", *topic, string(msg.Value))
44 export(topic, msg.Value)
Matteo Scandoloaab36db2018-10-09 19:54:11 -070045 case <-signals:
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053046 logger.Warn("Interrupt is detected")
Matteo Scandoloaab36db2018-10-09 19:54:11 -070047 doneCh <- struct{}{}
48 }
49 }
50 }()
51 <-doneCh
52}