blob: 9c860083cffb00c283ef638139cd8756f75c4a65 [file] [log] [blame]
Matteo Scandoloa8bd93e2018-09-13 13:36:50 -07001// Copyright 2018 Open Networking Foundation
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
Matteo Scandolo189526a2018-07-13 09:10:23 -070015package main
16
17import (
kartikey dubey72ef3b82019-05-27 06:50:04 +000018 "gerrit.opencord.org/kafka-topic-exporter/common/logger"
19 "github.com/Shopify/sarama"
20 "github.com/prometheus/client_golang/prometheus"
Ganesh Bhure0f2449c2019-07-15 10:31:38 +053021 "github.com/prometheus/client_golang/prometheus/promhttp"
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053022 "gopkg.in/yaml.v2"
23 "io/ioutil"
24 "log"
Matteo Scandolo189526a2018-07-13 09:10:23 -070025 "net/http"
kartikey dubey72ef3b82019-05-27 06:50:04 +000026 "strconv"
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053027 "strings"
Matteo Scandoloaab36db2018-10-09 19:54:11 -070028 "sync"
Matteo Scandolo189526a2018-07-13 09:10:23 -070029)
30
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053031func kafkaInit(broker BrokerInfo) {
Matteo Scandolo189526a2018-07-13 09:10:23 -070032 config := sarama.NewConfig()
33 config.Consumer.Return.Errors = true
Matteo Scandoloaab36db2018-10-09 19:54:11 -070034 var wg sync.WaitGroup
35
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053036 master, err := sarama.NewConsumer([]string{broker.Host}, config)
kartikey dubey72ef3b82019-05-27 06:50:04 +000037
Matteo Scandolo189526a2018-07-13 09:10:23 -070038 if err != nil {
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053039 logger.Panic("kafkaInit panic")
Matteo Scandolo189526a2018-07-13 09:10:23 -070040 panic(err)
41 }
42 defer func() {
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053043 logger.Debug("kafkaInit close connection")
Matteo Scandolo189526a2018-07-13 09:10:23 -070044 if err := master.Close(); err != nil {
45 panic(err)
46 }
47 }()
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053048
49 // read topics from config
50 topics := broker.Topics
51
52 // we are spinning threads for each topic, we need to wait for
53 // them to exit before stopping the kafka connection
54 wg.Add(len(topics))
55
56 for _, topic := range topics {
57 t := topic
58 go topicListener(&t, master, wg)
59 }
Matteo Scandolo189526a2018-07-13 09:10:23 -070060
Matteo Scandoloaab36db2018-10-09 19:54:11 -070061 wg.Wait()
Matteo Scandolo189526a2018-07-13 09:10:23 -070062}
63
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053064func runServer(target TargetInfo) {
65 if target.Port == 0 {
66 logger.Warn("Prometheus target port not configured, using default 8080")
67 target.Port = 8080
68 }
69 logger.Debug("Starting HTTP Server on %d port", target.Port)
Ganesh Bhure0f2449c2019-07-15 10:31:38 +053070 http.Handle("/metrics", promhttp.Handler())
71 err := http.ListenAndServe(":"+strconv.Itoa(target.Port), nil)
72 if err != nil {
73 logger.Error("HTTP Server Error: %s", err.Error())
74 }
Matteo Scandolo189526a2018-07-13 09:10:23 -070075}
76
77func init() {
Matteo Scandolo189526a2018-07-13 09:10:23 -070078 // register metrics within Prometheus
Matteo Scandoloaab36db2018-10-09 19:54:11 -070079 prometheus.MustRegister(volthaTxBytesTotal)
80 prometheus.MustRegister(volthaRxBytesTotal)
81 prometheus.MustRegister(volthaTxPacketsTotal)
82 prometheus.MustRegister(volthaRxPacketsTotal)
83 prometheus.MustRegister(volthaTxErrorPacketsTotal)
84 prometheus.MustRegister(volthaRxErrorPacketsTotal)
85
Ganesh Bhure967018e2019-07-29 14:48:32 +053086 prometheus.MustRegister(VolthaOnuLaserBiasCurrent)
87 prometheus.MustRegister(volthaOnuTemperature)
88 prometheus.MustRegister(VolthaOnuPowerFeedVoltage)
89 prometheus.MustRegister(VolthaOnuMeanOpticalLaunchPower)
90 prometheus.MustRegister(VolthaOnuReceivedOpticalPower)
91
Matteo Scandoloaab36db2018-10-09 19:54:11 -070092 prometheus.MustRegister(onosTxBytesTotal)
93 prometheus.MustRegister(onosRxBytesTotal)
94 prometheus.MustRegister(onosTxPacketsTotal)
95 prometheus.MustRegister(onosRxPacketsTotal)
96 prometheus.MustRegister(onosTxDropPacketsTotal)
97 prometheus.MustRegister(onosRxDropPacketsTotal)
kartikey dubey72ef3b82019-05-27 06:50:04 +000098
99 prometheus.MustRegister(onosaaaRxAcceptResponses)
100 prometheus.MustRegister(onosaaaRxRejectResponses)
101 prometheus.MustRegister(onosaaaRxChallengeResponses)
102 prometheus.MustRegister(onosaaaTxAccessRequests)
103 prometheus.MustRegister(onosaaaRxInvalidValidators)
104 prometheus.MustRegister(onosaaaRxUnknownType)
105 prometheus.MustRegister(onosaaaPendingRequests)
106 prometheus.MustRegister(onosaaaRxDroppedResponses)
107 prometheus.MustRegister(onosaaaRxMalformedResponses)
108 prometheus.MustRegister(onosaaaRxUnknownserver)
109 prometheus.MustRegister(onosaaaRequestRttMillis)
110 prometheus.MustRegister(onosaaaRequestReTx)
Daniele Morobe242582019-10-01 14:02:46 -0700111
112 prometheus.MustRegister(onosPppoeUpTermBytes)
113 prometheus.MustRegister(onosPppoeUpTermPackets)
114 prometheus.MustRegister(onosPppoeUpDropBytes)
115 prometheus.MustRegister(onosPppoeUpDropPackets)
116 prometheus.MustRegister(onosPppoeUpControlPackets)
117 prometheus.MustRegister(onosPppoeDownRxBytes)
118 prometheus.MustRegister(onosPppoeDownRxPackets)
119 prometheus.MustRegister(onosPppoeDownTxBytes)
120 prometheus.MustRegister(onosPppoeDownTxPackets)
Ganesh Bhuref15383e2019-11-11 15:15:14 +0530121
122 prometheus.MustRegister(deviceLaserBiasCurrent)
123 prometheus.MustRegister(deviceTemperature)
124 prometheus.MustRegister(deviceTxPower)
125 prometheus.MustRegister(deviceVoltage)
Matteo Scandolo189526a2018-07-13 09:10:23 -0700126}
127
Ganesh Bhure8d0c9942019-05-24 11:42:09 +0530128func loadConfigFile() Config {
129 m := Config{}
130 // this file path is configmap mounted in pod yaml
131 yamlFile, err := ioutil.ReadFile("/etc/config/conf.yaml")
132 if err != nil {
133 log.Printf("yamlFile.Get err: %v ", err)
134 }
135 err = yaml.Unmarshal(yamlFile, &m)
136 if err != nil {
137 log.Fatalf("Unmarshal: %v", err)
138 }
139 return m
Matteo Scandolo189526a2018-07-13 09:10:23 -0700140}
Ganesh Bhure8d0c9942019-05-24 11:42:09 +0530141
142func main() {
143 // load configuration
144 conf := loadConfigFile()
145
146 // logger setup
147 logger.Setup(conf.Logger.Host, strings.ToUpper(conf.Logger.LogLevel))
148 logger.Info("Connecting to broker: [%s]", conf.Broker.Host)
149
150 go kafkaInit(conf.Broker)
151 runServer(conf.Target)
Ganesh Bhure0f2449c2019-07-15 10:31:38 +0530152}