blob: 26b1dc34e444df81919d3e8a85f3276c59611d59 [file] [log] [blame]
Matteo Scandolo189526a2018-07-13 09:10:23 -07001package main
2
3import (
4 "encoding/json"
5 "flag"
6 "fmt"
7 "math"
8 "net/http"
9 "os"
10 "os/signal"
11 "reflect"
12 "strings"
13
14 "github.com/Shopify/sarama"
15 "github.com/prometheus/client_golang/prometheus"
16)
17
18var (
19 txBytesTotal = prometheus.NewGaugeVec(
20 prometheus.GaugeOpts{
21 Name: "tx_bytes_total",
22 Help: "Number of total bytes transmitted, partitioned by device_id, port_type and port_id",
23 },
24 []string{"device_id", "port_type", "port_id"},
25 )
26 rxBytesTotal = prometheus.NewGaugeVec(
27 prometheus.GaugeOpts{
28 Name: "rx_bytes_total",
29 Help: "Number of total bytes received, partitioned by device_id, port_type and port_id",
30 },
31 []string{"device_id", "port_type", "port_id"},
32 )
33 txPacketsTotal = prometheus.NewGaugeVec(
34 prometheus.GaugeOpts{
35 Name: "tx_packets_total",
36 Help: "Number of total packets transmitted, partitioned by device_id, port_type and port_id",
37 },
38 []string{"device_id", "port_type", "port_id"},
39 )
40 rxPacketsTotal = prometheus.NewGaugeVec(
41 prometheus.GaugeOpts{
42 Name: "rx_packets_total",
43 Help: "Number of total packets received, partitioned by device_id, port_type and port_id",
44 },
45 []string{"device_id", "port_type", "port_id"},
46 )
47
48 // config
49 broker = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker")
50 brokers = []string{*broker}
51 topic = flag.String("topic", "voltha.kpis", "The Kafka topic")
52)
53
54var messageCountStart int
55
56func prefixToLabels(prefix string) (string, string, string) {
57 var p = strings.Split(prefix, ".")
58 var deviceId, portType, portId string = "", "", ""
59 if len(p) == 5 {
60 // format is voltha.openolt.000130af0b0b2c51.pon.0
61 deviceId = p[2]
62 portType = p[3]
63 portId = p[4]
64 }
65 if len(p) == 4 {
66 // fomrat is voltha.openolt.000130af0b0b2c51nni.129
67 s := p[2]
68 deviceId = string(s[0 : len(s)-3])
69 portType = string(s[len(s)-3:])
70 portId = p[3]
71 }
72
73 return deviceId, portType, portId
74
75}
76
77func interfaceToFloat(unk interface{}) float64 {
78 switch i := unk.(type) {
79 case float64:
80 return i
81 case float32:
82 return float64(i)
83 case int64:
84 return float64(i)
85 default:
86 return math.NaN()
87 }
88}
89
90func kafkaInit() {
91 config := sarama.NewConfig()
92 config.Consumer.Return.Errors = true
93
94 master, err := sarama.NewConsumer(brokers, config)
95 if err != nil {
96 panic(err)
97 }
98 defer func() {
99 if err := master.Close(); err != nil {
100 panic(err)
101 }
102 }()
103 consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
104 if err != nil {
105 panic(err)
106 }
107 signals := make(chan os.Signal, 1)
108 signal.Notify(signals, os.Interrupt)
109 doneCh := make(chan struct{})
110 go func() {
111 for {
112 select {
113 case err := <-consumer.Errors():
114 fmt.Println(err)
115 case msg := <-consumer.Messages():
116 messageCountStart++
117
118 var label map[string]interface{}
119 json.Unmarshal(msg.Value, &label)
120
121 var data map[string]map[string]map[string]interface{}
122 json.Unmarshal(msg.Value, &data)
123
124 var tagString = reflect.ValueOf(label["prefixes"]).MapKeys()[0].String()
125
126 fmt.Println("tagString: "+tagString, "\n")
127 fmt.Println("data: ", data["prefixes"][tagString]["metrics"], "\n")
128
129 v, ok := data["prefixes"][tagString]["metrics"].(map[string]interface{})
130 if !ok {
131 // Can't assert, handle error.
132 fmt.Println("Eroror")
133 }
134 for k, s := range v {
135 fmt.Println("Type k: ", reflect.TypeOf(k))
136 fmt.Println("Type: ", reflect.TypeOf(s))
137 fmt.Printf("Value: %v\n", s)
138
139 d, pt, pi := prefixToLabels(tagString)
140
141 if k == "tx_bytes" {
142 txBytesTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s))
143 }
144 if k == "rx_bytes" {
145 rxBytesTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s))
146 }
147 if k == "tx_packets" {
148 txPacketsTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s))
149 }
150 if k == "rx_packets" {
151 rxPacketsTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s))
152 }
153 }
154
155 // fmt.Println("data: ", data["prefixes"][tagString]["metrics"].tx_bytes, "\n")
156 // var txBytesTotalValue = data["prefixes"][tagString]["metrics"]["tx_bytes"]
157
158 // d, pt, pi := prefixToLabels(tagString)
159
160 // txBytesTotal.WithLabelValues(d, pt, pi).Set(float64(txBytesTotalValue))
161
162 // fmt.Println("Adding txBytesTotal metric: ", d, pt, pi, txBytesTotalValue)
163
164 case <-signals:
165 fmt.Println("Interrupt is detected")
166 doneCh <- struct{}{}
167 }
168 }
169 }()
170 <-doneCh
171 fmt.Println("Processed", messageCountStart, "messages")
172}
173
174func runServer() {
175 fmt.Println("Starting Server")
176 http.Handle("/metrics", prometheus.Handler())
177 http.ListenAndServe(":8080", nil)
178}
179
180func init() {
181
182 // read config from cli flags
183 flag.Parse()
184 fmt.Println("Connecting to broker: ", *broker)
185 fmt.Println("Listening to topic: ", *topic)
186
187 // register metrics within Prometheus
188 prometheus.MustRegister(txBytesTotal)
189 prometheus.MustRegister(rxBytesTotal)
190 prometheus.MustRegister(txPacketsTotal)
191 prometheus.MustRegister(rxPacketsTotal)
192}
193
194func main() {
195 go kafkaInit()
196 runServer()
197}