blob: 0eba8baf4c66e55a9659376b369bc177783ddaaf [file] [log] [blame]
// Copyright 2018 Open Networking Foundation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
Matteo Scandolo189526a2018-07-13 09:10:23 -070015package main
16
17import (
Matteo Scandolo189526a2018-07-13 09:10:23 -070018 "flag"
19 "fmt"
Matteo Scandolo189526a2018-07-13 09:10:23 -070020 "net/http"
Matteo Scandoloaab36db2018-10-09 19:54:11 -070021 "sync"
Matteo Scandolo189526a2018-07-13 09:10:23 -070022
23 "github.com/Shopify/sarama"
24 "github.com/prometheus/client_golang/prometheus"
25)
26
// Package-level configuration: the Kafka broker flag, the topic names and
// the broker list handed to the consumer.
var (
	// broker is the value of the -broker command-line flag.
	broker = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker")

	// Names of the Kafka topics carrying VOLTHA and ONOS KPIs.
	volthaTopic = "voltha.kpis"
	onosTopic   = "onos.kpis"

	// Pointers to the topic names above; the listeners receive these.
	volthaTopicPointer = &volthaTopic
	onosTopicPointer   = &onosTopic

	// brokers is the broker address list; it is filled in by init from the
	// broker flag.
	brokers []string
)
Matteo Scandolo189526a2018-07-13 09:10:23 -070037
Matteo Scandolo8bc8ba62018-07-27 12:16:20 -070038func kafkaInit(brokers []string) {
Matteo Scandolo189526a2018-07-13 09:10:23 -070039 config := sarama.NewConfig()
40 config.Consumer.Return.Errors = true
Matteo Scandoloaab36db2018-10-09 19:54:11 -070041 var wg sync.WaitGroup
42
43 wg.Add(2) // we are spinning up two thread and we need to wait for them to exit before stopping the kafka connection
Matteo Scandolo189526a2018-07-13 09:10:23 -070044
45 master, err := sarama.NewConsumer(brokers, config)
46 if err != nil {
Matteo Scandoloaab36db2018-10-09 19:54:11 -070047 fmt.Println("kafkaInit panic")
Matteo Scandolo189526a2018-07-13 09:10:23 -070048 panic(err)
49 }
50 defer func() {
Matteo Scandoloaab36db2018-10-09 19:54:11 -070051 fmt.Println("kafkaInit close connection")
Matteo Scandolo189526a2018-07-13 09:10:23 -070052 if err := master.Close(); err != nil {
53 panic(err)
54 }
55 }()
Matteo Scandoloaab36db2018-10-09 19:54:11 -070056 go VOLTHAListener(volthaTopicPointer, master, wg)
57 go ONOSListener(onosTopicPointer, master, wg)
Matteo Scandolo189526a2018-07-13 09:10:23 -070058
Matteo Scandoloaab36db2018-10-09 19:54:11 -070059 wg.Wait()
Matteo Scandolo189526a2018-07-13 09:10:23 -070060}
61
62func runServer() {
63 fmt.Println("Starting Server")
64 http.Handle("/metrics", prometheus.Handler())
65 http.ListenAndServe(":8080", nil)
66}
67
68func init() {
69
70 // read config from cli flags
71 flag.Parse()
Matteo Scandolo8bc8ba62018-07-27 12:16:20 -070072 brokers = make([]string, 0)
73 brokers = []string{*broker}
74 fmt.Println("Connecting to broker: ", brokers)
Matteo Scandoloaab36db2018-10-09 19:54:11 -070075 fmt.Println("Listening to voltha on topic: ", *volthaTopicPointer)
76 fmt.Println("Listening to onos on topic: ", *onosTopicPointer)
Matteo Scandolo189526a2018-07-13 09:10:23 -070077
78 // register metrics within Prometheus
Matteo Scandoloaab36db2018-10-09 19:54:11 -070079 prometheus.MustRegister(volthaTxBytesTotal)
80 prometheus.MustRegister(volthaRxBytesTotal)
81 prometheus.MustRegister(volthaTxPacketsTotal)
82 prometheus.MustRegister(volthaRxPacketsTotal)
83 prometheus.MustRegister(volthaTxErrorPacketsTotal)
84 prometheus.MustRegister(volthaRxErrorPacketsTotal)
85
86 prometheus.MustRegister(onosTxBytesTotal)
87 prometheus.MustRegister(onosRxBytesTotal)
88 prometheus.MustRegister(onosTxPacketsTotal)
89 prometheus.MustRegister(onosRxPacketsTotal)
90 prometheus.MustRegister(onosTxDropPacketsTotal)
91 prometheus.MustRegister(onosRxDropPacketsTotal)
Matteo Scandolo189526a2018-07-13 09:10:23 -070092}
93
94func main() {
Matteo Scandolo8bc8ba62018-07-27 12:16:20 -070095 go kafkaInit(brokers)
Matteo Scandolo189526a2018-07-13 09:10:23 -070096 runServer()
97}