[SEBA-128] Using new KPI format
Change-Id: I25aee363f35dd1380af990bdfd9eca65d30ffc54
diff --git a/Dockerfile b/Dockerfile
index cf15f38..ef68715 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,3 +1,20 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# docker build -t opencord/kafka-topic-exporter:latest .
+# docker build -t 10.90.0.101:30500/opencord/kafka-topic-exporter:latest .
+
FROM golang:latest as builder
RUN mkdir /app
ADD . /app/
@@ -9,5 +26,4 @@
FROM alpine:latest
WORKDIR /app/
COPY --from=builder /app/main .
-ENTRYPOINT ["./main"]
-# CMD ["-broker=voltha-kafka.default.svc.cluster.local:9092", "-topic=voltha.kpis"]
\ No newline at end of file
+ENTRYPOINT ["./main"]
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..7b10c8f
--- /dev/null
+++ b/README.md
@@ -0,0 +1,27 @@
+# Kafka topic exported
+
+## Expected format
+
+```json
+{
+ "type": "slice",
+ "ts": 1536617075.762331,
+ "slice_data": [
+ {
+ "metrics": {
+ "deferreds": 119.0,
+ "rss-mb": 106.0
+ },
+ "metadata": {
+ "logical_device_id": "",
+ "title": "voltha.internal",
+ "serial_no": "",
+ "ts": 1536617075.762331,
+ "context": {
+ "instance_id": "vcore-0"
+ }, "device_id": ""
+
+ }
+ ]
+}
+```
\ No newline at end of file
diff --git a/kb8s-deployment.yaml b/kb8s-deployment.yaml
deleted file mode 100644
index ebadddd..0000000
--- a/kb8s-deployment.yaml
+++ /dev/null
@@ -1,29 +0,0 @@
-# kubectl create -f kb8s-deployment.yaml
-# kubectl delete pod prometheus-go-exporter
-
-apiVersion: v1
-kind: Pod
-metadata:
- name: prometheus-go-exporter
- labels:
- app: prometheus-go-exporter
-spec:
- containers:
- - name: prometheus-go-exporter
- image: prometheus-go-exporter:latest
- imagePullPolicy: IfNotPresent
- args: ["-broker=voltha-kafka.default.svc.cluster.local:9092", "-topic=voltha.kpis"]
- restartPolicy: Never
-
----
-apiVersion: v1
-kind: Service
-metadata:
- name: prometheus-go-exporter
-spec:
- selector:
- app: prometheus-go-exporter
- type: NodePort
- ports:
- - port: 8080
- nodePort: 30080
\ No newline at end of file
diff --git a/main.go b/main.go
index 512dba3..106421f 100644
--- a/main.go
+++ b/main.go
@@ -1,91 +1,38 @@
+// Copyright 2018 Open Networking Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
package main
import (
"encoding/json"
"flag"
"fmt"
- "math"
+ "log"
"net/http"
"os"
"os/signal"
- "reflect"
- "strings"
"github.com/Shopify/sarama"
"github.com/prometheus/client_golang/prometheus"
)
var (
- txBytesTotal = prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Name: "tx_bytes_total",
- Help: "Number of total bytes transmitted, partitioned by device_id, port_type and port_id",
- },
- []string{"device_id", "port_type", "port_id"},
- )
- rxBytesTotal = prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Name: "rx_bytes_total",
- Help: "Number of total bytes received, partitioned by device_id, port_type and port_id",
- },
- []string{"device_id", "port_type", "port_id"},
- )
- txPacketsTotal = prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Name: "tx_packets_total",
- Help: "Number of total packets transmitted, partitioned by device_id, port_type and port_id",
- },
- []string{"device_id", "port_type", "port_id"},
- )
- rxPacketsTotal = prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Name: "rx_packets_total",
- Help: "Number of total packets received, partitioned by device_id, port_type and port_id",
- },
- []string{"device_id", "port_type", "port_id"},
- )
-
- // config
- broker = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker")
- topic = flag.String("topic", "voltha.kpis", "The Kafka topic")
+ broker = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker")
+ topic = flag.String("topic", "voltha.kpis", "The Kafka topic")
)
var brokers []string
-var messageCountStart int
-
-func prefixToLabels(prefix string) (string, string, string) {
- var p = strings.Split(prefix, ".")
- var deviceId, portType, portId string = "", "", ""
- if len(p) == 5 {
- // format is voltha.openolt.000130af0b0b2c51.pon.0
- deviceId = p[2]
- portType = p[3]
- portId = p[4]
- }
- if len(p) == 4 {
- // fomrat is voltha.openolt.000130af0b0b2c51nni.129
- s := p[2]
- deviceId = string(s[0 : len(s)-3])
- portType = string(s[len(s)-3:])
- portId = p[3]
- }
-
- return deviceId, portType, portId
-
-}
-
-func interfaceToFloat(unk interface{}) float64 {
- switch i := unk.(type) {
- case float64:
- return i
- case float32:
- return float64(i)
- case int64:
- return float64(i)
- default:
- return math.NaN()
- }
-}
func kafkaInit(brokers []string) {
config := sarama.NewConfig()
@@ -113,53 +60,17 @@
case err := <-consumer.Errors():
fmt.Println(err)
case msg := <-consumer.Messages():
- messageCountStart++
+ // fmt.Println(string(msg.Value))
- var label map[string]interface{}
- json.Unmarshal(msg.Value, &label)
+ kpi := KPI{}
- var data map[string]map[string]map[string]interface{}
- json.Unmarshal(msg.Value, &data)
+ err := json.Unmarshal(msg.Value, &kpi)
- var tagString = reflect.ValueOf(label["prefixes"]).MapKeys()[0].String()
-
- fmt.Println("tagString: "+tagString, "\n")
- fmt.Println("data: ", data["prefixes"][tagString]["metrics"], "\n")
-
- v, ok := data["prefixes"][tagString]["metrics"].(map[string]interface{})
- if !ok {
- // Can't assert, handle error.
- fmt.Println("Eroror")
- }
- for k, s := range v {
- fmt.Println("Type k: ", reflect.TypeOf(k))
- fmt.Println("Type: ", reflect.TypeOf(s))
- fmt.Printf("Value: %v\n", s)
-
- d, pt, pi := prefixToLabels(tagString)
-
- if k == "tx_bytes" {
- txBytesTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s))
- }
- if k == "rx_bytes" {
- rxBytesTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s))
- }
- if k == "tx_packets" {
- txPacketsTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s))
- }
- if k == "rx_packets" {
- rxPacketsTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s))
- }
+ if err != nil {
+ log.Fatal(err)
}
- // fmt.Println("data: ", data["prefixes"][tagString]["metrics"].tx_bytes, "\n")
- // var txBytesTotalValue = data["prefixes"][tagString]["metrics"]["tx_bytes"]
-
- // d, pt, pi := prefixToLabels(tagString)
-
- // txBytesTotal.WithLabelValues(d, pt, pi).Set(float64(txBytesTotalValue))
-
- // fmt.Println("Adding txBytesTotal metric: ", d, pt, pi, txBytesTotalValue)
+ export(kpi)
case <-signals:
fmt.Println("Interrupt is detected")
@@ -168,7 +79,6 @@
}
}()
<-doneCh
- fmt.Println("Processed", messageCountStart, "messages")
}
func runServer() {
diff --git a/prometheus.go b/prometheus.go
new file mode 100644
index 0000000..15ce599
--- /dev/null
+++ b/prometheus.go
@@ -0,0 +1,137 @@
+// Copyright 2018 Open Networking Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import "github.com/prometheus/client_golang/prometheus"
+
+var (
+ txBytesTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "tx_bytes_total",
+ Help: "Number of total bytes transmitted",
+ },
+ []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+ )
+ rxBytesTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "rx_bytes_total",
+ Help: "Number of total bytes received",
+ },
+ []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+ )
+ txPacketsTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "tx_packets_total",
+ Help: "Number of total packets transmitted",
+ },
+ []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+ )
+ rxPacketsTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "rx_packets_total",
+ Help: "Number of total packets received",
+ },
+ []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+ )
+
+ txErrorPacketsTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "tx_error_packets_total",
+ Help: "Number of total transmitted packets error",
+ },
+ []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+ )
+
+ rxErrorPacketsTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "rx_error_packets_total",
+ Help: "Number of total received packets error",
+ },
+ []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+ )
+)
+
+func export(kpi KPI) {
+
+ for _, data := range kpi.SliceDatas {
+ txBytesTotal.WithLabelValues(
+ data.Metadata.LogicalDeviceID,
+ data.Metadata.SerialNumber,
+ data.Metadata.DeviceID,
+ data.Metadata.Context.InterfaceID,
+ data.Metadata.Context.PonID,
+ data.Metadata.Context.PortNumber,
+ data.Metadata.Title,
+ ).Set(data.Metrics.TxBytes)
+
+ rxBytesTotal.WithLabelValues(
+ data.Metadata.LogicalDeviceID,
+ data.Metadata.SerialNumber,
+ data.Metadata.DeviceID,
+ data.Metadata.Context.InterfaceID,
+ data.Metadata.Context.PonID,
+ data.Metadata.Context.PortNumber,
+ data.Metadata.Title,
+ ).Set(data.Metrics.RxBytes)
+
+ txPacketsTotal.WithLabelValues(
+ data.Metadata.LogicalDeviceID,
+ data.Metadata.SerialNumber,
+ data.Metadata.DeviceID,
+ data.Metadata.Context.InterfaceID,
+ data.Metadata.Context.PonID,
+ data.Metadata.Context.PortNumber,
+ data.Metadata.Title,
+ ).Set(data.Metrics.TxPackets)
+
+ rxPacketsTotal.WithLabelValues(
+ data.Metadata.LogicalDeviceID,
+ data.Metadata.SerialNumber,
+ data.Metadata.DeviceID,
+ data.Metadata.Context.InterfaceID,
+ data.Metadata.Context.PonID,
+ data.Metadata.Context.PortNumber,
+ data.Metadata.Title,
+ ).Set(data.Metrics.RxPackets)
+
+ txErrorPacketsTotal.WithLabelValues(
+ data.Metadata.LogicalDeviceID,
+ data.Metadata.SerialNumber,
+ data.Metadata.DeviceID,
+ data.Metadata.Context.InterfaceID,
+ data.Metadata.Context.PonID,
+ data.Metadata.Context.PortNumber,
+ data.Metadata.Title,
+ ).Set(data.Metrics.TxErrorPackets)
+
+ rxErrorPacketsTotal.WithLabelValues(
+ data.Metadata.LogicalDeviceID,
+ data.Metadata.SerialNumber,
+ data.Metadata.DeviceID,
+ data.Metadata.Context.InterfaceID,
+ data.Metadata.Context.PonID,
+ data.Metadata.Context.PortNumber,
+ data.Metadata.Title,
+ ).Set(data.Metrics.RxErrorPackets)
+
+ // TODO add metrics for:
+ // TxBcastPackets
+ // TxUnicastPackets
+ // TxMulticastPackets
+ // RxBcastPackets
+ // RxMulticastPackets
+
+ }
+}
diff --git a/types.go b/types.go
new file mode 100644
index 0000000..25ed429
--- /dev/null
+++ b/types.go
@@ -0,0 +1,57 @@
+// Copyright 2018 Open Networking Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+// KPI Events format
+
+type Metrics struct {
+ TxBytes float64 `json:"tx_bytes"`
+ TxPackets float64 `json:"tx_packets"`
+ TxErrorPackets float64 `json:"tx_error_packets"`
+ TxBcastPackets float64 `json:"tx_bcast_packets"`
+ TxUnicastPackets float64 `json:"tx_ucast_packets"`
+ TxMulticastPackets float64 `json:"tx_mcast_packets"`
+ RxBytes float64 `json:"rx_bytes"`
+ RxPackets float64 `json:"rx_packets"`
+ RxErrorPackets float64 `json:"rx_error_packets"`
+ RxBcastPackets float64 `json:"rx_bcast_packets"`
+ RxMulticastPackets float64 `json:"rx_mcast_packets"`
+}
+
+type Context struct {
+ InterfaceID string `json:"intf_id"`
+ PonID string `json:"pon_id"`
+ PortNumber string `json:"port_no"`
+}
+
+type Metadata struct {
+ LogicalDeviceID string `json:"logical_device_id"`
+ Title string `json:"title"`
+ SerialNumber string `json:"serial_no"`
+ Timestamp float64 `json:"ts"`
+ DeviceID string `json:"device_id"`
+ Context *Context `json:"context"`
+}
+
+type SliceData struct {
+ Metrics *Metrics `json:"metrics"`
+ Metadata *Metadata `json:"metadata"`
+}
+
+type KPI struct {
+ Type string `json:"type"`
+ Timestamp float64 `json:"ts"`
+ SliceDatas []*SliceData `json:"slice_data"`
+}