// Copyright 2018 Open Networking Foundation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
Matteo Scandolo189526a2018-07-13 09:10:23 -070015package main
16
17import (
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053018 "strconv"
19 "gopkg.in/yaml.v2"
20 "io/ioutil"
21 "log"
Matteo Scandolo189526a2018-07-13 09:10:23 -070022 "net/http"
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053023 "strings"
Matteo Scandoloaab36db2018-10-09 19:54:11 -070024 "sync"
Matteo Scandolo189526a2018-07-13 09:10:23 -070025
26 "github.com/Shopify/sarama"
27 "github.com/prometheus/client_golang/prometheus"
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053028 "gerrit.opencord.org/kafka-topic-exporter/common/logger"
Matteo Scandolo189526a2018-07-13 09:10:23 -070029)
30
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053031func kafkaInit(broker BrokerInfo) {
Matteo Scandolo189526a2018-07-13 09:10:23 -070032 config := sarama.NewConfig()
33 config.Consumer.Return.Errors = true
Matteo Scandoloaab36db2018-10-09 19:54:11 -070034 var wg sync.WaitGroup
35
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053036 master, err := sarama.NewConsumer([]string{broker.Host}, config)
Matteo Scandolo189526a2018-07-13 09:10:23 -070037 if err != nil {
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053038 logger.Panic("kafkaInit panic")
Matteo Scandolo189526a2018-07-13 09:10:23 -070039 panic(err)
40 }
41 defer func() {
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053042 logger.Debug("kafkaInit close connection")
Matteo Scandolo189526a2018-07-13 09:10:23 -070043 if err := master.Close(); err != nil {
44 panic(err)
45 }
46 }()
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053047
48 // read topics from config
49 topics := broker.Topics
50
51 // we are spinning threads for each topic, we need to wait for
52 // them to exit before stopping the kafka connection
53 wg.Add(len(topics))
54
55 for _, topic := range topics {
56 t := topic
57 go topicListener(&t, master, wg)
58 }
Matteo Scandolo189526a2018-07-13 09:10:23 -070059
Matteo Scandoloaab36db2018-10-09 19:54:11 -070060 wg.Wait()
Matteo Scandolo189526a2018-07-13 09:10:23 -070061}
62
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053063func runServer(target TargetInfo) {
64 if target.Port == 0 {
65 logger.Warn("Prometheus target port not configured, using default 8080")
66 target.Port = 8080
67 }
68 logger.Debug("Starting HTTP Server on %d port", target.Port)
Matteo Scandolo189526a2018-07-13 09:10:23 -070069 http.Handle("/metrics", prometheus.Handler())
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053070 http.ListenAndServe(":"+strconv.Itoa(target.Port), nil)
Matteo Scandolo189526a2018-07-13 09:10:23 -070071}
72
73func init() {
74
Matteo Scandolo189526a2018-07-13 09:10:23 -070075 // register metrics within Prometheus
Matteo Scandoloaab36db2018-10-09 19:54:11 -070076 prometheus.MustRegister(volthaTxBytesTotal)
77 prometheus.MustRegister(volthaRxBytesTotal)
78 prometheus.MustRegister(volthaTxPacketsTotal)
79 prometheus.MustRegister(volthaRxPacketsTotal)
80 prometheus.MustRegister(volthaTxErrorPacketsTotal)
81 prometheus.MustRegister(volthaRxErrorPacketsTotal)
82
83 prometheus.MustRegister(onosTxBytesTotal)
84 prometheus.MustRegister(onosRxBytesTotal)
85 prometheus.MustRegister(onosTxPacketsTotal)
86 prometheus.MustRegister(onosRxPacketsTotal)
87 prometheus.MustRegister(onosTxDropPacketsTotal)
88 prometheus.MustRegister(onosRxDropPacketsTotal)
Matteo Scandolo189526a2018-07-13 09:10:23 -070089}
90
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053091func loadConfigFile() Config {
92 m := Config{}
93 // this file path is configmap mounted in pod yaml
94 yamlFile, err := ioutil.ReadFile("/etc/config/conf.yaml")
95 if err != nil {
96 log.Printf("yamlFile.Get err: %v ", err)
97 }
98 err = yaml.Unmarshal(yamlFile, &m)
99 if err != nil {
100 log.Fatalf("Unmarshal: %v", err)
101 }
102 return m
Matteo Scandolo189526a2018-07-13 09:10:23 -0700103}
Ganesh Bhure8d0c9942019-05-24 11:42:09 +0530104
105func main() {
106 // load configuration
107 conf := loadConfigFile()
108
109 // logger setup
110 logger.Setup(conf.Logger.Host, strings.ToUpper(conf.Logger.LogLevel))
111 logger.Info("Connecting to broker: [%s]", conf.Broker.Host)
112
113 go kafkaInit(conf.Broker)
114 runServer(conf.Target)
115}