blob: f57547f336e4fba7e0ecd8252907b4cda095c4d3 [file] [log] [blame]
Matteo Scandoloa8bd93e2018-09-13 13:36:50 -07001// Copyright 2018 Open Networking Foundation
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
Matteo Scandolo189526a2018-07-13 09:10:23 -070015package main
16
17import (
kartikey dubey72ef3b82019-05-27 06:50:04 +000018 "gerrit.opencord.org/kafka-topic-exporter/common/logger"
19 "github.com/Shopify/sarama"
20 "github.com/prometheus/client_golang/prometheus"
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053021 "gopkg.in/yaml.v2"
22 "io/ioutil"
23 "log"
Matteo Scandolo189526a2018-07-13 09:10:23 -070024 "net/http"
kartikey dubey72ef3b82019-05-27 06:50:04 +000025 "strconv"
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053026 "strings"
Matteo Scandoloaab36db2018-10-09 19:54:11 -070027 "sync"
Matteo Scandolo189526a2018-07-13 09:10:23 -070028)
29
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053030func kafkaInit(broker BrokerInfo) {
Matteo Scandolo189526a2018-07-13 09:10:23 -070031 config := sarama.NewConfig()
32 config.Consumer.Return.Errors = true
Matteo Scandoloaab36db2018-10-09 19:54:11 -070033 var wg sync.WaitGroup
34
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053035 master, err := sarama.NewConsumer([]string{broker.Host}, config)
kartikey dubey72ef3b82019-05-27 06:50:04 +000036
Matteo Scandolo189526a2018-07-13 09:10:23 -070037 if err != nil {
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053038 logger.Panic("kafkaInit panic")
Matteo Scandolo189526a2018-07-13 09:10:23 -070039 panic(err)
40 }
41 defer func() {
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053042 logger.Debug("kafkaInit close connection")
Matteo Scandolo189526a2018-07-13 09:10:23 -070043 if err := master.Close(); err != nil {
44 panic(err)
45 }
46 }()
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053047
48 // read topics from config
49 topics := broker.Topics
50
51 // we are spinning threads for each topic, we need to wait for
52 // them to exit before stopping the kafka connection
53 wg.Add(len(topics))
54
55 for _, topic := range topics {
56 t := topic
57 go topicListener(&t, master, wg)
58 }
Matteo Scandolo189526a2018-07-13 09:10:23 -070059
Matteo Scandoloaab36db2018-10-09 19:54:11 -070060 wg.Wait()
Matteo Scandolo189526a2018-07-13 09:10:23 -070061}
62
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053063func runServer(target TargetInfo) {
64 if target.Port == 0 {
65 logger.Warn("Prometheus target port not configured, using default 8080")
66 target.Port = 8080
67 }
68 logger.Debug("Starting HTTP Server on %d port", target.Port)
Matteo Scandolo189526a2018-07-13 09:10:23 -070069 http.Handle("/metrics", prometheus.Handler())
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053070 http.ListenAndServe(":"+strconv.Itoa(target.Port), nil)
Matteo Scandolo189526a2018-07-13 09:10:23 -070071}
72
73func init() {
Matteo Scandolo189526a2018-07-13 09:10:23 -070074 // register metrics within Prometheus
Matteo Scandoloaab36db2018-10-09 19:54:11 -070075 prometheus.MustRegister(volthaTxBytesTotal)
76 prometheus.MustRegister(volthaRxBytesTotal)
77 prometheus.MustRegister(volthaTxPacketsTotal)
78 prometheus.MustRegister(volthaRxPacketsTotal)
79 prometheus.MustRegister(volthaTxErrorPacketsTotal)
80 prometheus.MustRegister(volthaRxErrorPacketsTotal)
81
82 prometheus.MustRegister(onosTxBytesTotal)
83 prometheus.MustRegister(onosRxBytesTotal)
84 prometheus.MustRegister(onosTxPacketsTotal)
85 prometheus.MustRegister(onosRxPacketsTotal)
86 prometheus.MustRegister(onosTxDropPacketsTotal)
87 prometheus.MustRegister(onosRxDropPacketsTotal)
kartikey dubey72ef3b82019-05-27 06:50:04 +000088
89 prometheus.MustRegister(onosaaaRxAcceptResponses)
90 prometheus.MustRegister(onosaaaRxRejectResponses)
91 prometheus.MustRegister(onosaaaRxChallengeResponses)
92 prometheus.MustRegister(onosaaaTxAccessRequests)
93 prometheus.MustRegister(onosaaaRxInvalidValidators)
94 prometheus.MustRegister(onosaaaRxUnknownType)
95 prometheus.MustRegister(onosaaaPendingRequests)
96 prometheus.MustRegister(onosaaaRxDroppedResponses)
97 prometheus.MustRegister(onosaaaRxMalformedResponses)
98 prometheus.MustRegister(onosaaaRxUnknownserver)
99 prometheus.MustRegister(onosaaaRequestRttMillis)
100 prometheus.MustRegister(onosaaaRequestReTx)
Matteo Scandolo189526a2018-07-13 09:10:23 -0700101}
102
Ganesh Bhure8d0c9942019-05-24 11:42:09 +0530103func loadConfigFile() Config {
104 m := Config{}
105 // this file path is configmap mounted in pod yaml
106 yamlFile, err := ioutil.ReadFile("/etc/config/conf.yaml")
107 if err != nil {
108 log.Printf("yamlFile.Get err: %v ", err)
109 }
110 err = yaml.Unmarshal(yamlFile, &m)
111 if err != nil {
112 log.Fatalf("Unmarshal: %v", err)
113 }
114 return m
Matteo Scandolo189526a2018-07-13 09:10:23 -0700115}
Ganesh Bhure8d0c9942019-05-24 11:42:09 +0530116
117func main() {
118 // load configuration
119 conf := loadConfigFile()
120
121 // logger setup
122 logger.Setup(conf.Logger.Host, strings.ToUpper(conf.Logger.LogLevel))
123 logger.Info("Connecting to broker: [%s]", conf.Broker.Host)
124
125 go kafkaInit(conf.Broker)
126 runServer(conf.Target)
127}