[SEBA-128] Using new KPI format

Change-Id: I25aee363f35dd1380af990bdfd9eca65d30ffc54
diff --git a/Dockerfile b/Dockerfile
index cf15f38..ef68715 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,3 +1,20 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# docker build -t opencord/kafka-topic-exporter:latest .
+# docker build -t 10.90.0.101:30500/opencord/kafka-topic-exporter:latest .
+
 FROM golang:latest as builder
 RUN mkdir /app 
 ADD . /app/ 
@@ -9,5 +26,4 @@
 FROM alpine:latest  
 WORKDIR /app/
 COPY --from=builder /app/main .
-ENTRYPOINT ["./main"]
-# CMD ["-broker=voltha-kafka.default.svc.cluster.local:9092", "-topic=voltha.kpis"]
\ No newline at end of file
+ENTRYPOINT ["./main"]
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..7b10c8f
--- /dev/null
+++ b/README.md
@@ -0,0 +1,27 @@
+# Kafka topic exported
+
+## Expected format
+
+```json
+{
+    "type": "slice",
+    "ts": 1536617075.762331,
+    "slice_data": [
+        {
+            "metrics": {
+                "deferreds": 119.0,
+                "rss-mb": 106.0
+            },
+            "metadata": {
+                "logical_device_id": "",
+                "title": "voltha.internal",
+                "serial_no": "",
+                "ts": 1536617075.762331,
+                "context": {
+                    "instance_id": "vcore-0"
+                },     "device_id": ""
+           
+            }
+    ]
+}
+```
\ No newline at end of file
diff --git a/kb8s-deployment.yaml b/kb8s-deployment.yaml
deleted file mode 100644
index ebadddd..0000000
--- a/kb8s-deployment.yaml
+++ /dev/null
@@ -1,29 +0,0 @@
-# kubectl create -f kb8s-deployment.yaml
-# kubectl delete pod prometheus-go-exporter
-
-apiVersion: v1
-kind: Pod
-metadata:
-  name: prometheus-go-exporter
-  labels:
-    app: prometheus-go-exporter
-spec:
-  containers:
-    - name: prometheus-go-exporter
-      image: prometheus-go-exporter:latest
-      imagePullPolicy: IfNotPresent
-      args: ["-broker=voltha-kafka.default.svc.cluster.local:9092", "-topic=voltha.kpis"]
-  restartPolicy: Never
-
----
-apiVersion: v1
-kind: Service
-metadata:
-  name: prometheus-go-exporter
-spec:
-  selector:
-    app: prometheus-go-exporter
-  type: NodePort
-  ports:
-    - port: 8080
-      nodePort: 30080
\ No newline at end of file
diff --git a/main.go b/main.go
index 512dba3..106421f 100644
--- a/main.go
+++ b/main.go
@@ -1,91 +1,38 @@
+// Copyright 2018 Open Networking Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package main
 
 import (
 	"encoding/json"
 	"flag"
 	"fmt"
-	"math"
+	"log"
 	"net/http"
 	"os"
 	"os/signal"
-	"reflect"
-	"strings"
 
 	"github.com/Shopify/sarama"
 	"github.com/prometheus/client_golang/prometheus"
 )
 
 var (
-	txBytesTotal = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Name: "tx_bytes_total",
-			Help: "Number of total bytes transmitted, partitioned by device_id, port_type and port_id",
-		},
-		[]string{"device_id", "port_type", "port_id"},
-	)
-	rxBytesTotal = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Name: "rx_bytes_total",
-			Help: "Number of total bytes received, partitioned by device_id, port_type and port_id",
-		},
-		[]string{"device_id", "port_type", "port_id"},
-	)
-	txPacketsTotal = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Name: "tx_packets_total",
-			Help: "Number of total packets transmitted, partitioned by device_id, port_type and port_id",
-		},
-		[]string{"device_id", "port_type", "port_id"},
-	)
-	rxPacketsTotal = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Name: "rx_packets_total",
-			Help: "Number of total packets received, partitioned by device_id, port_type and port_id",
-		},
-		[]string{"device_id", "port_type", "port_id"},
-	)
-
-	// config
-	broker  = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker")
-	topic   = flag.String("topic", "voltha.kpis", "The Kafka topic")
+	broker = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker")
+	topic  = flag.String("topic", "voltha.kpis", "The Kafka topic")
 )
 
 var brokers []string
-var messageCountStart int
-
-func prefixToLabels(prefix string) (string, string, string) {
-	var p = strings.Split(prefix, ".")
-	var deviceId, portType, portId string = "", "", ""
-	if len(p) == 5 {
-		// format is voltha.openolt.000130af0b0b2c51.pon.0
-		deviceId = p[2]
-		portType = p[3]
-		portId = p[4]
-	}
-	if len(p) == 4 {
-		// fomrat is voltha.openolt.000130af0b0b2c51nni.129
-		s := p[2]
-		deviceId = string(s[0 : len(s)-3])
-		portType = string(s[len(s)-3:])
-		portId = p[3]
-	}
-
-	return deviceId, portType, portId
-
-}
-
-func interfaceToFloat(unk interface{}) float64 {
-	switch i := unk.(type) {
-	case float64:
-		return i
-	case float32:
-		return float64(i)
-	case int64:
-		return float64(i)
-	default:
-		return math.NaN()
-	}
-}
 
 func kafkaInit(brokers []string) {
 	config := sarama.NewConfig()
@@ -113,53 +60,17 @@
 			case err := <-consumer.Errors():
 				fmt.Println(err)
 			case msg := <-consumer.Messages():
-				messageCountStart++
+				// fmt.Println(string(msg.Value))
 
-				var label map[string]interface{}
-				json.Unmarshal(msg.Value, &label)
+				kpi := KPI{}
 
-				var data map[string]map[string]map[string]interface{}
-				json.Unmarshal(msg.Value, &data)
+				err := json.Unmarshal(msg.Value, &kpi)
 
-				var tagString = reflect.ValueOf(label["prefixes"]).MapKeys()[0].String()
-
-				fmt.Println("tagString: "+tagString, "\n")
-				fmt.Println("data: ", data["prefixes"][tagString]["metrics"], "\n")
-
-				v, ok := data["prefixes"][tagString]["metrics"].(map[string]interface{})
-				if !ok {
-					// Can't assert, handle error.
-					fmt.Println("Eroror")
-				}
-				for k, s := range v {
-					fmt.Println("Type k: ", reflect.TypeOf(k))
-					fmt.Println("Type: ", reflect.TypeOf(s))
-					fmt.Printf("Value: %v\n", s)
-
-					d, pt, pi := prefixToLabels(tagString)
-
-					if k == "tx_bytes" {
-						txBytesTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s))
-					}
-					if k == "rx_bytes" {
-						rxBytesTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s))
-					}
-					if k == "tx_packets" {
-						txPacketsTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s))
-					}
-					if k == "rx_packets" {
-						rxPacketsTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s))
-					}
+				if err != nil {
+					log.Fatal(err)
 				}
 
-				// fmt.Println("data: ", data["prefixes"][tagString]["metrics"].tx_bytes, "\n")
-				// var txBytesTotalValue = data["prefixes"][tagString]["metrics"]["tx_bytes"]
-
-				// d, pt, pi := prefixToLabels(tagString)
-
-				// txBytesTotal.WithLabelValues(d, pt, pi).Set(float64(txBytesTotalValue))
-
-				// fmt.Println("Adding txBytesTotal metric: ", d, pt, pi, txBytesTotalValue)
+				export(kpi)
 
 			case <-signals:
 				fmt.Println("Interrupt is detected")
@@ -168,7 +79,6 @@
 		}
 	}()
 	<-doneCh
-	fmt.Println("Processed", messageCountStart, "messages")
 }
 
 func runServer() {
diff --git a/prometheus.go b/prometheus.go
new file mode 100644
index 0000000..15ce599
--- /dev/null
+++ b/prometheus.go
@@ -0,0 +1,137 @@
+// Copyright 2018 Open Networking Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import "github.com/prometheus/client_golang/prometheus"
+
+var (
+	txBytesTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "tx_bytes_total",
+			Help: "Number of total bytes transmitted",
+		},
+		[]string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+	)
+	rxBytesTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "rx_bytes_total",
+			Help: "Number of total bytes received",
+		},
+		[]string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+	)
+	txPacketsTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "tx_packets_total",
+			Help: "Number of total packets transmitted",
+		},
+		[]string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+	)
+	rxPacketsTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "rx_packets_total",
+			Help: "Number of total packets received",
+		},
+		[]string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+	)
+
+	txErrorPacketsTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "tx_error_packets_total",
+			Help: "Number of total transmitted packets error",
+		},
+		[]string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+	)
+
+	rxErrorPacketsTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "rx_error_packets_total",
+			Help: "Number of total received packets error",
+		},
+		[]string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+	)
+)
+
+func export(kpi KPI) {
+
+	for _, data := range kpi.SliceDatas {
+		txBytesTotal.WithLabelValues(
+			data.Metadata.LogicalDeviceID,
+			data.Metadata.SerialNumber,
+			data.Metadata.DeviceID,
+			data.Metadata.Context.InterfaceID,
+			data.Metadata.Context.PonID,
+			data.Metadata.Context.PortNumber,
+			data.Metadata.Title,
+		).Set(data.Metrics.TxBytes)
+
+		rxBytesTotal.WithLabelValues(
+			data.Metadata.LogicalDeviceID,
+			data.Metadata.SerialNumber,
+			data.Metadata.DeviceID,
+			data.Metadata.Context.InterfaceID,
+			data.Metadata.Context.PonID,
+			data.Metadata.Context.PortNumber,
+			data.Metadata.Title,
+		).Set(data.Metrics.RxBytes)
+
+		txPacketsTotal.WithLabelValues(
+			data.Metadata.LogicalDeviceID,
+			data.Metadata.SerialNumber,
+			data.Metadata.DeviceID,
+			data.Metadata.Context.InterfaceID,
+			data.Metadata.Context.PonID,
+			data.Metadata.Context.PortNumber,
+			data.Metadata.Title,
+		).Set(data.Metrics.TxPackets)
+
+		rxPacketsTotal.WithLabelValues(
+			data.Metadata.LogicalDeviceID,
+			data.Metadata.SerialNumber,
+			data.Metadata.DeviceID,
+			data.Metadata.Context.InterfaceID,
+			data.Metadata.Context.PonID,
+			data.Metadata.Context.PortNumber,
+			data.Metadata.Title,
+		).Set(data.Metrics.RxPackets)
+
+		txErrorPacketsTotal.WithLabelValues(
+			data.Metadata.LogicalDeviceID,
+			data.Metadata.SerialNumber,
+			data.Metadata.DeviceID,
+			data.Metadata.Context.InterfaceID,
+			data.Metadata.Context.PonID,
+			data.Metadata.Context.PortNumber,
+			data.Metadata.Title,
+		).Set(data.Metrics.TxErrorPackets)
+
+		rxErrorPacketsTotal.WithLabelValues(
+			data.Metadata.LogicalDeviceID,
+			data.Metadata.SerialNumber,
+			data.Metadata.DeviceID,
+			data.Metadata.Context.InterfaceID,
+			data.Metadata.Context.PonID,
+			data.Metadata.Context.PortNumber,
+			data.Metadata.Title,
+		).Set(data.Metrics.RxErrorPackets)
+
+		// TODO add metrics for:
+		// TxBcastPackets
+		// TxUnicastPackets
+		// TxMulticastPackets
+		// RxBcastPackets
+		// RxMulticastPackets
+
+	}
+}
diff --git a/types.go b/types.go
new file mode 100644
index 0000000..25ed429
--- /dev/null
+++ b/types.go
@@ -0,0 +1,57 @@
+// Copyright 2018 Open Networking Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+// KPI Events format
+
+type Metrics struct {
+	TxBytes            float64 `json:"tx_bytes"`
+	TxPackets          float64 `json:"tx_packets"`
+	TxErrorPackets     float64 `json:"tx_error_packets"`
+	TxBcastPackets     float64 `json:"tx_bcast_packets"`
+	TxUnicastPackets   float64 `json:"tx_ucast_packets"`
+	TxMulticastPackets float64 `json:"tx_mcast_packets"`
+	RxBytes            float64 `json:"rx_bytes"`
+	RxPackets          float64 `json:"rx_packets"`
+	RxErrorPackets     float64 `json:"rx_error_packets"`
+	RxBcastPackets     float64 `json:"rx_bcast_packets"`
+	RxMulticastPackets float64 `json:"rx_mcast_packets"`
+}
+
+type Context struct {
+	InterfaceID string `json:"intf_id"`
+	PonID       string `json:"pon_id"`
+	PortNumber  string `json:"port_no"`
+}
+
+type Metadata struct {
+	LogicalDeviceID string   `json:"logical_device_id"`
+	Title           string   `json:"title"`
+	SerialNumber    string   `json:"serial_no"`
+	Timestamp       float64  `json:"ts"`
+	DeviceID        string   `json:"device_id"`
+	Context         *Context `json:"context"`
+}
+
+type SliceData struct {
+	Metrics  *Metrics  `json:"metrics"`
+	Metadata *Metadata `json:"metadata"`
+}
+
+type KPI struct {
+	Type       string       `json:"type"`
+	Timestamp  float64      `json:"ts"`
+	SliceDatas []*SliceData `json:"slice_data"`
+}