seba-631-632: exporter enhancements for logger and topics
Change-Id: If10e56a7ccfce758712ea02df9656d4f413dbf84
diff --git a/Dockerfile b/Dockerfile
index 29e46da..a9ff877 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -21,9 +21,12 @@
WORKDIR /app
RUN go get github.com/prometheus/client_golang/prometheus
RUN go get github.com/Shopify/sarama
+RUN go get github.com/gfremex/logrus-kafka-hook
+RUN go get github.com/sirupsen/logrus
+RUN go get gerrit.opencord.org/kafka-topic-exporter/common/logger
RUN CGO_ENABLED=0 GOOS=linux go build -o main .
FROM alpine:3.8
WORKDIR /app/
COPY --from=builder /app/main .
-ENTRYPOINT ["./main"]
\ No newline at end of file
+ENTRYPOINT ["./main"]
diff --git a/README.md b/README.md
index 7b10c8f..e3cfe47 100644
--- a/README.md
+++ b/README.md
@@ -22,6 +22,7 @@
}, "device_id": ""
}
+ }
]
}
-```
\ No newline at end of file
+```
diff --git a/VERSION b/VERSION
index 45a1b3f..26aaba0 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.1.2
+1.2.0
diff --git a/common/logger/logger.go b/common/logger/logger.go
new file mode 100644
index 0000000..6f9d4c4
--- /dev/null
+++ b/common/logger/logger.go
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package logger
+
+import (
+ lkh "github.com/gfremex/logrus-kafka-hook"
+ log "github.com/sirupsen/logrus"
+ "time"
+)
+
+var (
+ myLogger *log.Entry
+)
+
+func Setup(kafkaBroker string, level string) {
+
+ logger := log.New()
+ //logger.SetReportCaller(true)
+ myLogger = logger.WithField("topics", []string{"kafka-exporter.log"})
+
+ var logLevel log.Level = log.DebugLevel
+ switch level {
+ case "TRACE":
+ logLevel = log.TraceLevel
+ case "INFO":
+ logLevel = log.InfoLevel
+ case "WARN":
+ logLevel = log.WarnLevel
+ case "ERROR":
+ logLevel = log.ErrorLevel
+ default:
+ logLevel = log.DebugLevel
+ }
+ logger.Println("Setting Log Level", logLevel)
+ logger.SetLevel(logLevel)
+
+ if len(kafkaBroker) > 0 {
+ myLogger.Debug("Setting up kafka integration")
+ hook, err := lkh.NewKafkaHook(
+ "kh",
+ []log.Level{log.DebugLevel, log.InfoLevel, log.WarnLevel, log.ErrorLevel},
+ &log.JSONFormatter{
+ TimestampFormat: time.RFC3339Nano,
+ FieldMap: log.FieldMap{
+ log.FieldKeyTime: "@timestamp",
+ log.FieldKeyLevel: "levelname",
+ log.FieldKeyMsg: "message",
+ },
+ },
+ []string{kafkaBroker},
+ )
+
+ if err != nil {
+ myLogger.Error(err)
+ }
+
+ logger.Hooks.Add(hook)
+ myLogger.WithField("kafkaBroker", kafkaBroker).Debug("Logger setup done")
+ }
+}
+
+func GetLogger() *log.Entry {
+ return myLogger
+}
+
+func WithField(key string, value interface{}) *log.Entry {
+ return myLogger.WithField(key, value)
+}
+
+func WithFields(fields log.Fields) *log.Entry {
+ return myLogger.WithFields(fields)
+}
+
+func Panic(msg string, args ...interface{}) {
+ myLogger.Panicf(msg, args...)
+}
+
+func Fatal(msg string, args ...interface{}) {
+ myLogger.Fatalf(msg, args...)
+}
+
+func Error(msg string, args ...interface{}) {
+ myLogger.Errorf(msg, args...)
+}
+
+func Warn(msg string, args ...interface{}) {
+ myLogger.Warnf(msg, args...)
+}
+
+func Info(msg string, args ...interface{}) {
+ myLogger.Infof(msg, args...)
+}
+
+func Debug(msg string, args ...interface{}) {
+ myLogger.Debugf(msg, args...)
+}
diff --git a/config/conf.yaml b/config/conf.yaml
new file mode 100644
index 0000000..c06dea8
--- /dev/null
+++ b/config/conf.yaml
@@ -0,0 +1,14 @@
+---
+broker:
+ name: broker-name
+ host: voltha-kafka.default.svc.cluster.local:9092
+ description: The kafka broker
+ topics: [voltha.kpis, onos.kpis, importer.kpis]
+logger:
+ loglevel: debug
+ host: voltha-kafka.default.svc.cluster.local:9092
+target:
+ type: prometheus-target
+ name: http-server
+ port: 8080
+ description: http target for prometheus
diff --git a/main.go b/main.go
index 0eba8ba..cda8c19 100644
--- a/main.go
+++ b/main.go
@@ -15,66 +15,63 @@
package main
import (
- "flag"
- "fmt"
+ "strconv"
+ "gopkg.in/yaml.v2"
+ "io/ioutil"
+ "log"
"net/http"
+ "strings"
"sync"
"github.com/Shopify/sarama"
"github.com/prometheus/client_golang/prometheus"
+ "gerrit.opencord.org/kafka-topic-exporter/common/logger"
)
-var (
- broker = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker")
- volthaTopic = "voltha.kpis"
- onosTopic = "onos.kpis"
-
- volthaTopicPointer = &volthaTopic
- onosTopicPointer = &onosTopic
-)
-
-var brokers []string
-
-func kafkaInit(brokers []string) {
+func kafkaInit(broker BrokerInfo) {
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
var wg sync.WaitGroup
- wg.Add(2) // we are spinning up two thread and we need to wait for them to exit before stopping the kafka connection
-
- master, err := sarama.NewConsumer(brokers, config)
+ master, err := sarama.NewConsumer([]string{broker.Host}, config)
if err != nil {
- fmt.Println("kafkaInit panic")
+ logger.Panic("kafkaInit panic")
panic(err)
}
defer func() {
- fmt.Println("kafkaInit close connection")
+ logger.Debug("kafkaInit close connection")
if err := master.Close(); err != nil {
panic(err)
}
}()
- go VOLTHAListener(volthaTopicPointer, master, wg)
- go ONOSListener(onosTopicPointer, master, wg)
+
+ // read topics from config
+ topics := broker.Topics
+
+ // we are spinning threads for each topic, we need to wait for
+ // them to exit before stopping the kafka connection
+ wg.Add(len(topics))
+
+ for _, topic := range topics {
+ t := topic
+ go topicListener(&t, master, wg)
+ }
wg.Wait()
}
-func runServer() {
- fmt.Println("Starting Server")
+func runServer(target TargetInfo) {
+ if target.Port == 0 {
+ logger.Warn("Prometheus target port not configured, using default 8080")
+ target.Port = 8080
+ }
+ logger.Debug("Starting HTTP Server on %d port", target.Port)
http.Handle("/metrics", prometheus.Handler())
- http.ListenAndServe(":8080", nil)
+ http.ListenAndServe(":"+strconv.Itoa(target.Port), nil)
}
func init() {
- // read config from cli flags
- flag.Parse()
- brokers = make([]string, 0)
- brokers = []string{*broker}
- fmt.Println("Connecting to broker: ", brokers)
- fmt.Println("Listening to voltha on topic: ", *volthaTopicPointer)
- fmt.Println("Listening to onos on topic: ", *onosTopicPointer)
-
// register metrics within Prometheus
prometheus.MustRegister(volthaTxBytesTotal)
prometheus.MustRegister(volthaRxBytesTotal)
@@ -91,7 +88,28 @@
prometheus.MustRegister(onosRxDropPacketsTotal)
}
-func main() {
- go kafkaInit(brokers)
- runServer()
+func loadConfigFile() Config {
+ m := Config{}
+ // this file path is configmap mounted in pod yaml
+ yamlFile, err := ioutil.ReadFile("/etc/config/conf.yaml")
+ if err != nil {
+ log.Printf("yamlFile.Get err: %v ", err)
+ }
+ err = yaml.Unmarshal(yamlFile, &m)
+ if err != nil {
+ log.Fatalf("Unmarshal: %v", err)
+ }
+ return m
}
+
+func main() {
+ // load configuration
+ conf := loadConfigFile()
+
+ // logger setup
+ logger.Setup(conf.Logger.Host, strings.ToUpper(conf.Logger.LogLevel))
+ logger.Info("Connecting to broker: [%s]", conf.Broker.Host)
+
+ go kafkaInit(conf.Broker)
+ runServer(conf.Target)
+}
\ No newline at end of file
diff --git a/onos-prometheus.go b/onos-prometheus.go
deleted file mode 100644
index b383b55..0000000
--- a/onos-prometheus.go
+++ /dev/null
@@ -1,103 +0,0 @@
-// Copyright 2018 Open Networking Foundation
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package main
-
-import (
- "github.com/prometheus/client_golang/prometheus"
-)
-
-var (
- onosTxBytesTotal = prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Name: "onos_tx_bytes_total",
- Help: "Number of total bytes transmitted",
- },
- []string{"device_id", "port_id"},
- )
- onosRxBytesTotal = prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Name: "onos_rx_bytes_total",
- Help: "Number of total bytes received",
- },
- []string{"device_id", "port_id"},
- )
- onosTxPacketsTotal = prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Name: "onos_tx_packets_total",
- Help: "Number of total packets transmitted",
- },
- []string{"device_id", "port_id"},
- )
- onosRxPacketsTotal = prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Name: "onos_rx_packets_total",
- Help: "Number of total packets received",
- },
- []string{"device_id", "port_id"},
- )
-
- onosTxDropPacketsTotal = prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Name: "onos_tx_drop_packets_total",
- Help: "Number of total transmitted packets dropped",
- },
- []string{"device_id", "port_id"},
- )
-
- onosRxDropPacketsTotal = prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Name: "onos_rx_drop_packets_total",
- Help: "Number of total received packets dropped",
- },
- []string{"device_id", "port_id"},
- )
-)
-
-func exportOnosKPI(kpi OnosKPI) {
-
- for _, data := range kpi.Ports {
-
- onosTxBytesTotal.WithLabelValues(
- kpi.DeviceID,
- data.PortID,
- ).Set(data.TxBytes)
-
- onosRxBytesTotal.WithLabelValues(
- kpi.DeviceID,
- data.PortID,
- ).Set(data.RxBytes)
-
- onosTxPacketsTotal.WithLabelValues(
- kpi.DeviceID,
- data.PortID,
- ).Set(data.TxPackets)
-
- onosRxPacketsTotal.WithLabelValues(
- kpi.DeviceID,
- data.PortID,
- ).Set(data.RxPackets)
-
- onosTxDropPacketsTotal.WithLabelValues(
- kpi.DeviceID,
- data.PortID,
- ).Set(data.TxPacketsDrop)
-
- onosRxDropPacketsTotal.WithLabelValues(
- kpi.DeviceID,
- data.PortID,
- ).Set(data.RxPacketsDrop)
-
- }
-}
diff --git a/voltha-prometheus.go b/topic-exporter.go
similarity index 74%
rename from voltha-prometheus.go
rename to topic-exporter.go
index 526010b..3313f46 100644
--- a/voltha-prometheus.go
+++ b/topic-exporter.go
@@ -15,10 +15,14 @@
package main
import (
+ "encoding/json"
+ "log"
"github.com/prometheus/client_golang/prometheus"
+ "gerrit.opencord.org/kafka-topic-exporter/common/logger"
)
var (
+ // voltha kpis
volthaTxBytesTotal = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "voltha_tx_bytes_total",
@@ -63,6 +67,52 @@
},
[]string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
)
+
+ // onos kpis
+ onosTxBytesTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "onos_tx_bytes_total",
+ Help: "Number of total bytes transmitted",
+ },
+ []string{"device_id", "port_id"},
+ )
+ onosRxBytesTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "onos_rx_bytes_total",
+ Help: "Number of total bytes received",
+ },
+ []string{"device_id", "port_id"},
+ )
+ onosTxPacketsTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "onos_tx_packets_total",
+ Help: "Number of total packets transmitted",
+ },
+ []string{"device_id", "port_id"},
+ )
+ onosRxPacketsTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "onos_rx_packets_total",
+ Help: "Number of total packets received",
+ },
+ []string{"device_id", "port_id"},
+ )
+
+ onosTxDropPacketsTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "onos_tx_drop_packets_total",
+ Help: "Number of total transmitted packets dropped",
+ },
+ []string{"device_id", "port_id"},
+ )
+
+ onosRxDropPacketsTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "onos_rx_drop_packets_total",
+ Help: "Number of total received packets dropped",
+ },
+ []string{"device_id", "port_id"},
+ )
)
func exportVolthaKPI(kpi VolthaKPI) {
@@ -260,3 +310,72 @@
}
}
}
+
+func exportOnosKPI(kpi OnosKPI) {
+
+ for _, data := range kpi.Ports {
+
+ onosTxBytesTotal.WithLabelValues(
+ kpi.DeviceID,
+ data.PortID,
+ ).Set(data.TxBytes)
+
+ onosRxBytesTotal.WithLabelValues(
+ kpi.DeviceID,
+ data.PortID,
+ ).Set(data.RxBytes)
+
+ onosTxPacketsTotal.WithLabelValues(
+ kpi.DeviceID,
+ data.PortID,
+ ).Set(data.TxPackets)
+
+ onosRxPacketsTotal.WithLabelValues(
+ kpi.DeviceID,
+ data.PortID,
+ ).Set(data.RxPackets)
+
+ onosTxDropPacketsTotal.WithLabelValues(
+ kpi.DeviceID,
+ data.PortID,
+ ).Set(data.TxPacketsDrop)
+
+ onosRxDropPacketsTotal.WithLabelValues(
+ kpi.DeviceID,
+ data.PortID,
+ ).Set(data.RxPacketsDrop)
+ }
+}
+
+func exportImporterKPI(kpi ImporterKPI) {
+ // TODO: add metrics for importer data
+ logger.Info("To be implemented")
+}
+
+func export(topic *string, data []byte) {
+ switch *topic {
+ case "voltha.kpis":
+ kpi := VolthaKPI{}
+ err := json.Unmarshal(data, &kpi)
+ if err != nil {
+ log.Fatal(err)
+ }
+ exportVolthaKPI(kpi)
+ case "onos.kpis":
+ kpi := OnosKPI{}
+ err := json.Unmarshal(data, &kpi)
+ if err != nil {
+ log.Fatal(err)
+ }
+ exportOnosKPI(kpi)
+ case "importer.kpis":
+ kpi := ImporterKPI{}
+ err := json.Unmarshal(data, &kpi)
+ if err != nil {
+ log.Fatal(err)
+ }
+ exportImporterKPI(kpi)
+ default:
+ logger.Warn("Unexpected export. Should not come here")
+ }
+}
diff --git a/onos-kpi.go b/topic-listener.go
similarity index 73%
rename from onos-kpi.go
rename to topic-listener.go
index da7b120..9b0f2dc 100644
--- a/onos-kpi.go
+++ b/topic-listener.go
@@ -15,22 +15,20 @@
package main
import (
- "encoding/json"
- "fmt"
- "log"
"os"
"os/signal"
"sync"
"github.com/Shopify/sarama"
+ "gerrit.opencord.org/kafka-topic-exporter/common/logger"
)
-func ONOSListener(topic *string, master sarama.Consumer, wg sync.WaitGroup) {
- fmt.Println("Starting ONOSListener")
+func topicListener(topic *string, master sarama.Consumer, wg sync.WaitGroup) {
+ logger.Info("Starting topicListener for [%s]", *topic)
defer wg.Done()
consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
if err != nil {
- fmt.Println("ONOSListener panic")
+ logger.Error("topicListener panic")
panic(err)
}
signals := make(chan os.Signal, 1)
@@ -40,21 +38,12 @@
for {
select {
case err := <-consumer.Errors():
- fmt.Println(err)
+ logger.Error("%s", err)
case msg := <-consumer.Messages():
-
- kpi := OnosKPI{}
-
- err := json.Unmarshal(msg.Value, &kpi)
-
- if err != nil {
- log.Fatal(err)
- }
-
- exportOnosKPI(kpi)
-
+ logger.Debug("Message on %s: %s", *topic, string(msg.Value))
+ export(topic, msg.Value)
case <-signals:
- fmt.Println("Interrupt is detected")
+ logger.Warn("Interrupt is detected")
doneCh <- struct{}{}
}
}
diff --git a/types.go b/types.go
index b3ecd59..8cb41ff 100644
--- a/types.go
+++ b/types.go
@@ -14,8 +14,33 @@
package main
-// KPI Events format
+// configuration
+type BrokerInfo struct {
+ Name string `yaml: name`
+ Host string `yaml: host`
+ Description string `yaml: description`
+ Topics []string `yaml: topics`
+}
+type LoggerInfo struct {
+ LogLevel string `yaml: loglevel`
+ Host string `yaml: host`
+}
+
+type TargetInfo struct {
+ Type string `yaml: type`
+ Name string `yaml: name`
+ Port int `yaml: port`
+ Description string `yaml: description`
+}
+
+type Config struct {
+ Broker BrokerInfo `yaml: broker`
+ Logger LoggerInfo `yaml: logger`
+ Target TargetInfo `yaml: "target"`
+}
+
+// KPI Events format
type Metrics struct {
TxBytes float64 `json:"tx_bytes"`
TxPackets float64 `json:"tx_packets"`
@@ -79,3 +104,8 @@
DeviceID string `json:"deviceId"`
Ports []*OnosPort `json:"ports"`
}
+
+type ImporterKPI struct {
+ DeviceID string `json: "deviceId"`
+ // TODO: add metrics data
+}
\ No newline at end of file
diff --git a/voltha-kpi.go b/voltha-kpi.go
deleted file mode 100644
index 122f3f0..0000000
--- a/voltha-kpi.go
+++ /dev/null
@@ -1,64 +0,0 @@
-// Copyright 2018 Open Networking Foundation
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package main
-
-import (
- "encoding/json"
- "fmt"
- "log"
- "os"
- "os/signal"
- "sync"
-
- "github.com/Shopify/sarama"
-)
-
-func VOLTHAListener(topic *string, master sarama.Consumer, wg sync.WaitGroup) {
- fmt.Println("Starting VOLTHAListener")
- defer wg.Done()
- consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
- if err != nil {
- fmt.Println("VOLTHAListener panic")
- panic(err)
- }
- signals := make(chan os.Signal, 1)
- signal.Notify(signals, os.Interrupt)
- doneCh := make(chan struct{})
- go func() {
- for {
- select {
- case err := <-consumer.Errors():
- fmt.Println(err)
- case msg := <-consumer.Messages():
- // fmt.Println(string(msg.Value))
-
- kpi := VolthaKPI{}
-
- err := json.Unmarshal(msg.Value, &kpi)
-
- if err != nil {
- log.Fatal(err)
- }
-
- exportVolthaKPI(kpi)
-
- case <-signals:
- fmt.Println("Interrupt is detected")
- doneCh <- struct{}{}
- }
- }
- }()
- <-doneCh
-}