Matteo Scandolo | 189526a | 2018-07-13 09:10:23 -0700 | [diff] [blame^] | 1 | package main |
| 2 | |
| 3 | import ( |
| 4 | "encoding/json" |
| 5 | "flag" |
| 6 | "fmt" |
| 7 | "math" |
| 8 | "net/http" |
| 9 | "os" |
| 10 | "os/signal" |
| 11 | "reflect" |
| 12 | "strings" |
| 13 | |
| 14 | "github.com/Shopify/sarama" |
| 15 | "github.com/prometheus/client_golang/prometheus" |
| 16 | ) |
| 17 | |
| 18 | var ( |
| 19 | txBytesTotal = prometheus.NewGaugeVec( |
| 20 | prometheus.GaugeOpts{ |
| 21 | Name: "tx_bytes_total", |
| 22 | Help: "Number of total bytes transmitted, partitioned by device_id, port_type and port_id", |
| 23 | }, |
| 24 | []string{"device_id", "port_type", "port_id"}, |
| 25 | ) |
| 26 | rxBytesTotal = prometheus.NewGaugeVec( |
| 27 | prometheus.GaugeOpts{ |
| 28 | Name: "rx_bytes_total", |
| 29 | Help: "Number of total bytes received, partitioned by device_id, port_type and port_id", |
| 30 | }, |
| 31 | []string{"device_id", "port_type", "port_id"}, |
| 32 | ) |
| 33 | txPacketsTotal = prometheus.NewGaugeVec( |
| 34 | prometheus.GaugeOpts{ |
| 35 | Name: "tx_packets_total", |
| 36 | Help: "Number of total packets transmitted, partitioned by device_id, port_type and port_id", |
| 37 | }, |
| 38 | []string{"device_id", "port_type", "port_id"}, |
| 39 | ) |
| 40 | rxPacketsTotal = prometheus.NewGaugeVec( |
| 41 | prometheus.GaugeOpts{ |
| 42 | Name: "rx_packets_total", |
| 43 | Help: "Number of total packets received, partitioned by device_id, port_type and port_id", |
| 44 | }, |
| 45 | []string{"device_id", "port_type", "port_id"}, |
| 46 | ) |
| 47 | |
| 48 | // config |
| 49 | broker = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker") |
| 50 | brokers = []string{*broker} |
| 51 | topic = flag.String("topic", "voltha.kpis", "The Kafka topic") |
| 52 | ) |
| 53 | |
| 54 | var messageCountStart int |
| 55 | |
// prefixToLabels splits a dot-separated KPI prefix into the device ID, port
// type and port ID used as Prometheus label values.
//
// Two layouts are recognised:
//
//	voltha.openolt.000130af0b0b2c51.pon.0    (5 segments)
//	voltha.openolt.000130af0b0b2c51nni.129   (4 segments; the port type is
//	                                          the last 3 bytes of segment 3)
//
// Any other prefix, including a 4-segment prefix whose third segment is too
// short to carry a 3-byte port type, yields three empty strings.
func prefixToLabels(prefix string) (string, string, string) {
	// Length of the port type suffix ("nni", "pon", ...) glued onto the
	// device ID in the 4-segment layout.
	const portTypeLen = 3

	var deviceID, portType, portID string

	parts := strings.Split(prefix, ".")
	switch len(parts) {
	case 5:
		deviceID, portType, portID = parts[2], parts[3], parts[4]
	case 4:
		s := parts[2]
		// Guard the slice expressions below: without it a short segment
		// would panic with "slice bounds out of range".
		if len(s) >= portTypeLen {
			deviceID = s[:len(s)-portTypeLen]
			portType = s[len(s)-portTypeLen:]
			portID = parts[3]
		}
	}

	return deviceID, portType, portID
}
| 76 | |
// interfaceToFloat converts a dynamically typed numeric value to float64.
//
// All built-in float and integer types are accepted, as is json.Number (the
// type encoding/json produces when a decoder is configured with UseNumber).
// Any other value, including nil, a string, or a json.Number that does not
// parse as a float, yields NaN.
func interfaceToFloat(unk interface{}) float64 {
	switch i := unk.(type) {
	case float64:
		return i
	case float32:
		return float64(i)
	case int:
		return float64(i)
	case int8:
		return float64(i)
	case int16:
		return float64(i)
	case int32:
		return float64(i)
	case int64:
		return float64(i)
	case uint:
		return float64(i)
	case uint8:
		return float64(i)
	case uint16:
		return float64(i)
	case uint32:
		return float64(i)
	case uint64:
		return float64(i)
	case json.Number:
		f, err := i.Float64()
		if err != nil {
			return math.NaN()
		}
		return f
	default:
		return math.NaN()
	}
}
| 89 | |
| 90 | func kafkaInit() { |
| 91 | config := sarama.NewConfig() |
| 92 | config.Consumer.Return.Errors = true |
| 93 | |
| 94 | master, err := sarama.NewConsumer(brokers, config) |
| 95 | if err != nil { |
| 96 | panic(err) |
| 97 | } |
| 98 | defer func() { |
| 99 | if err := master.Close(); err != nil { |
| 100 | panic(err) |
| 101 | } |
| 102 | }() |
| 103 | consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest) |
| 104 | if err != nil { |
| 105 | panic(err) |
| 106 | } |
| 107 | signals := make(chan os.Signal, 1) |
| 108 | signal.Notify(signals, os.Interrupt) |
| 109 | doneCh := make(chan struct{}) |
| 110 | go func() { |
| 111 | for { |
| 112 | select { |
| 113 | case err := <-consumer.Errors(): |
| 114 | fmt.Println(err) |
| 115 | case msg := <-consumer.Messages(): |
| 116 | messageCountStart++ |
| 117 | |
| 118 | var label map[string]interface{} |
| 119 | json.Unmarshal(msg.Value, &label) |
| 120 | |
| 121 | var data map[string]map[string]map[string]interface{} |
| 122 | json.Unmarshal(msg.Value, &data) |
| 123 | |
| 124 | var tagString = reflect.ValueOf(label["prefixes"]).MapKeys()[0].String() |
| 125 | |
| 126 | fmt.Println("tagString: "+tagString, "\n") |
| 127 | fmt.Println("data: ", data["prefixes"][tagString]["metrics"], "\n") |
| 128 | |
| 129 | v, ok := data["prefixes"][tagString]["metrics"].(map[string]interface{}) |
| 130 | if !ok { |
| 131 | // Can't assert, handle error. |
| 132 | fmt.Println("Eroror") |
| 133 | } |
| 134 | for k, s := range v { |
| 135 | fmt.Println("Type k: ", reflect.TypeOf(k)) |
| 136 | fmt.Println("Type: ", reflect.TypeOf(s)) |
| 137 | fmt.Printf("Value: %v\n", s) |
| 138 | |
| 139 | d, pt, pi := prefixToLabels(tagString) |
| 140 | |
| 141 | if k == "tx_bytes" { |
| 142 | txBytesTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s)) |
| 143 | } |
| 144 | if k == "rx_bytes" { |
| 145 | rxBytesTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s)) |
| 146 | } |
| 147 | if k == "tx_packets" { |
| 148 | txPacketsTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s)) |
| 149 | } |
| 150 | if k == "rx_packets" { |
| 151 | rxPacketsTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s)) |
| 152 | } |
| 153 | } |
| 154 | |
| 155 | // fmt.Println("data: ", data["prefixes"][tagString]["metrics"].tx_bytes, "\n") |
| 156 | // var txBytesTotalValue = data["prefixes"][tagString]["metrics"]["tx_bytes"] |
| 157 | |
| 158 | // d, pt, pi := prefixToLabels(tagString) |
| 159 | |
| 160 | // txBytesTotal.WithLabelValues(d, pt, pi).Set(float64(txBytesTotalValue)) |
| 161 | |
| 162 | // fmt.Println("Adding txBytesTotal metric: ", d, pt, pi, txBytesTotalValue) |
| 163 | |
| 164 | case <-signals: |
| 165 | fmt.Println("Interrupt is detected") |
| 166 | doneCh <- struct{}{} |
| 167 | } |
| 168 | } |
| 169 | }() |
| 170 | <-doneCh |
| 171 | fmt.Println("Processed", messageCountStart, "messages") |
| 172 | } |
| 173 | |
| 174 | func runServer() { |
| 175 | fmt.Println("Starting Server") |
| 176 | http.Handle("/metrics", prometheus.Handler()) |
| 177 | http.ListenAndServe(":8080", nil) |
| 178 | } |
| 179 | |
| 180 | func init() { |
| 181 | |
| 182 | // read config from cli flags |
| 183 | flag.Parse() |
| 184 | fmt.Println("Connecting to broker: ", *broker) |
| 185 | fmt.Println("Listening to topic: ", *topic) |
| 186 | |
| 187 | // register metrics within Prometheus |
| 188 | prometheus.MustRegister(txBytesTotal) |
| 189 | prometheus.MustRegister(rxBytesTotal) |
| 190 | prometheus.MustRegister(txPacketsTotal) |
| 191 | prometheus.MustRegister(rxPacketsTotal) |
| 192 | } |
| 193 | |
| 194 | func main() { |
| 195 | go kafkaInit() |
| 196 | runServer() |
| 197 | } |