blob: 3880e989a5696c1fce607c097f7d6924f5b1554d [file] [log] [blame]
Matteo Scandoloa8bd93e2018-09-13 13:36:50 -07001// Copyright 2018 Open Networking Foundation
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
Matteo Scandolo189526a2018-07-13 09:10:23 -070015package main
16
17import (
kartikey dubey72ef3b82019-05-27 06:50:04 +000018 "gerrit.opencord.org/kafka-topic-exporter/common/logger"
19 "github.com/Shopify/sarama"
20 "github.com/prometheus/client_golang/prometheus"
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053021 "gopkg.in/yaml.v2"
22 "io/ioutil"
23 "log"
Matteo Scandolo189526a2018-07-13 09:10:23 -070024 "net/http"
kartikey dubey72ef3b82019-05-27 06:50:04 +000025 "strconv"
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053026 "strings"
Matteo Scandoloaab36db2018-10-09 19:54:11 -070027 "sync"
Matteo Scandolo189526a2018-07-13 09:10:23 -070028)
29
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053030func kafkaInit(broker BrokerInfo) {
Matteo Scandolo189526a2018-07-13 09:10:23 -070031 config := sarama.NewConfig()
32 config.Consumer.Return.Errors = true
Matteo Scandoloaab36db2018-10-09 19:54:11 -070033 var wg sync.WaitGroup
34
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053035 master, err := sarama.NewConsumer([]string{broker.Host}, config)
kartikey dubey72ef3b82019-05-27 06:50:04 +000036
Matteo Scandolo189526a2018-07-13 09:10:23 -070037 if err != nil {
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053038 logger.Panic("kafkaInit panic")
Matteo Scandolo189526a2018-07-13 09:10:23 -070039 panic(err)
40 }
41 defer func() {
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053042 logger.Debug("kafkaInit close connection")
Matteo Scandolo189526a2018-07-13 09:10:23 -070043 if err := master.Close(); err != nil {
44 panic(err)
45 }
46 }()
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053047
48 // read topics from config
49 topics := broker.Topics
50
51 // we are spinning threads for each topic, we need to wait for
52 // them to exit before stopping the kafka connection
53 wg.Add(len(topics))
54
55 for _, topic := range topics {
56 t := topic
57 go topicListener(&t, master, wg)
58 }
Matteo Scandolo189526a2018-07-13 09:10:23 -070059
Matteo Scandoloaab36db2018-10-09 19:54:11 -070060 wg.Wait()
Matteo Scandolo189526a2018-07-13 09:10:23 -070061}
62
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053063func runServer(target TargetInfo) {
64 if target.Port == 0 {
65 logger.Warn("Prometheus target port not configured, using default 8080")
66 target.Port = 8080
67 }
68 logger.Debug("Starting HTTP Server on %d port", target.Port)
Matteo Scandolo189526a2018-07-13 09:10:23 -070069 http.Handle("/metrics", prometheus.Handler())
Ganesh Bhure8d0c9942019-05-24 11:42:09 +053070 http.ListenAndServe(":"+strconv.Itoa(target.Port), nil)
Matteo Scandolo189526a2018-07-13 09:10:23 -070071}
72
73func init() {
Matteo Scandolo189526a2018-07-13 09:10:23 -070074 // register metrics within Prometheus
Matteo Scandoloaab36db2018-10-09 19:54:11 -070075 prometheus.MustRegister(volthaTxBytesTotal)
76 prometheus.MustRegister(volthaRxBytesTotal)
77 prometheus.MustRegister(volthaTxPacketsTotal)
78 prometheus.MustRegister(volthaRxPacketsTotal)
79 prometheus.MustRegister(volthaTxErrorPacketsTotal)
80 prometheus.MustRegister(volthaRxErrorPacketsTotal)
81
Ganesh Bhure967018e2019-07-29 14:48:32 +053082 prometheus.MustRegister(VolthaOnuLaserBiasCurrent)
83 prometheus.MustRegister(volthaOnuTemperature)
84 prometheus.MustRegister(VolthaOnuPowerFeedVoltage)
85 prometheus.MustRegister(VolthaOnuMeanOpticalLaunchPower)
86 prometheus.MustRegister(VolthaOnuReceivedOpticalPower)
87
Matteo Scandoloaab36db2018-10-09 19:54:11 -070088 prometheus.MustRegister(onosTxBytesTotal)
89 prometheus.MustRegister(onosRxBytesTotal)
90 prometheus.MustRegister(onosTxPacketsTotal)
91 prometheus.MustRegister(onosRxPacketsTotal)
92 prometheus.MustRegister(onosTxDropPacketsTotal)
93 prometheus.MustRegister(onosRxDropPacketsTotal)
kartikey dubey72ef3b82019-05-27 06:50:04 +000094
95 prometheus.MustRegister(onosaaaRxAcceptResponses)
96 prometheus.MustRegister(onosaaaRxRejectResponses)
97 prometheus.MustRegister(onosaaaRxChallengeResponses)
98 prometheus.MustRegister(onosaaaTxAccessRequests)
99 prometheus.MustRegister(onosaaaRxInvalidValidators)
100 prometheus.MustRegister(onosaaaRxUnknownType)
101 prometheus.MustRegister(onosaaaPendingRequests)
102 prometheus.MustRegister(onosaaaRxDroppedResponses)
103 prometheus.MustRegister(onosaaaRxMalformedResponses)
104 prometheus.MustRegister(onosaaaRxUnknownserver)
105 prometheus.MustRegister(onosaaaRequestRttMillis)
106 prometheus.MustRegister(onosaaaRequestReTx)
Matteo Scandolo189526a2018-07-13 09:10:23 -0700107}
108
Ganesh Bhure8d0c9942019-05-24 11:42:09 +0530109func loadConfigFile() Config {
110 m := Config{}
111 // this file path is configmap mounted in pod yaml
112 yamlFile, err := ioutil.ReadFile("/etc/config/conf.yaml")
113 if err != nil {
114 log.Printf("yamlFile.Get err: %v ", err)
115 }
116 err = yaml.Unmarshal(yamlFile, &m)
117 if err != nil {
118 log.Fatalf("Unmarshal: %v", err)
119 }
120 return m
Matteo Scandolo189526a2018-07-13 09:10:23 -0700121}
Ganesh Bhure8d0c9942019-05-24 11:42:09 +0530122
123func main() {
124 // load configuration
125 conf := loadConfigFile()
126
127 // logger setup
128 logger.Setup(conf.Logger.Host, strings.ToUpper(conf.Logger.LogLevel))
129 logger.Info("Connecting to broker: [%s]", conf.Broker.Host)
130
131 go kafkaInit(conf.Broker)
132 runServer(conf.Target)
133}