blob: 106421f63a004f56b012c82c205bf50c8b761d8d [file] [log] [blame]
// Copyright 2018 Open Networking Foundation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"encoding/json"
"flag"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"github.com/Shopify/sarama"
"github.com/prometheus/client_golang/prometheus"
)
var (
broker = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker")
topic = flag.String("topic", "voltha.kpis", "The Kafka topic")
)
var brokers []string
func kafkaInit(brokers []string) {
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
master, err := sarama.NewConsumer(brokers, config)
if err != nil {
panic(err)
}
defer func() {
if err := master.Close(); err != nil {
panic(err)
}
}()
consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
if err != nil {
panic(err)
}
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
doneCh := make(chan struct{})
go func() {
for {
select {
case err := <-consumer.Errors():
fmt.Println(err)
case msg := <-consumer.Messages():
// fmt.Println(string(msg.Value))
kpi := KPI{}
err := json.Unmarshal(msg.Value, &kpi)
if err != nil {
log.Fatal(err)
}
export(kpi)
case <-signals:
fmt.Println("Interrupt is detected")
doneCh <- struct{}{}
}
}
}()
<-doneCh
}
func runServer() {
fmt.Println("Starting Server")
http.Handle("/metrics", prometheus.Handler())
http.ListenAndServe(":8080", nil)
}
func init() {
// read config from cli flags
flag.Parse()
brokers = make([]string, 0)
brokers = []string{*broker}
fmt.Println("Connecting to broker: ", brokers)
fmt.Println("Listening to topic: ", *topic)
// register metrics within Prometheus
prometheus.MustRegister(txBytesTotal)
prometheus.MustRegister(rxBytesTotal)
prometheus.MustRegister(txPacketsTotal)
prometheus.MustRegister(rxPacketsTotal)
}
func main() {
go kafkaInit(brokers)
runServer()
}