First working solution
diff --git a/main.go b/main.go
new file mode 100644
index 0000000..26b1dc3
--- /dev/null
+++ b/main.go
@@ -0,0 +1,197 @@
+package main
+
+import (
+	"encoding/json"
+	"flag"
+	"fmt"
+	"math"
+	"net/http"
+	"os"
+	"os/signal"
+	"reflect"
+	"strings"
+
+	"github.com/Shopify/sarama"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// Per-port traffic gauges exported to Prometheus. Every vector is partitioned
+// by the labels device_id, port_type and port_id.
+var (
+	txBytesTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "tx_bytes_total",
+		Help: "Number of total bytes transmitted, partitioned by device_id, port_type and port_id",
+	}, []string{"device_id", "port_type", "port_id"})
+
+	rxBytesTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "rx_bytes_total",
+		Help: "Number of total bytes received, partitioned by device_id, port_type and port_id",
+	}, []string{"device_id", "port_type", "port_id"})
+
+	txPacketsTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "tx_packets_total",
+		Help: "Number of total packets transmitted, partitioned by device_id, port_type and port_id",
+	}, []string{"device_id", "port_type", "port_id"})
+
+	rxPacketsTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "rx_packets_total",
+		Help: "Number of total packets received, partitioned by device_id, port_type and port_id",
+	}, []string{"device_id", "port_type", "port_id"})
+)
+
+// Command-line configuration; init calls flag.Parse to fill in the values.
+var (
+	broker = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker")
+	topic  = flag.String("topic", "voltha.kpis", "The Kafka topic")
+
+	// brokers is built while package variables are initialised, which happens
+	// before init calls flag.Parse, so it holds the default broker address
+	// rather than the value given on the command line.
+	brokers = []string{*broker}
+)
+
+// messageCountStart counts the Kafka messages received by kafkaInit.
+var messageCountStart int
+
+// prefixToLabels splits a KPI prefix into device ID, port type and port ID.
+// Two layouts are understood: five dot-separated parts with the labels in the
+// last three, and four parts where the third part is the device ID followed by
+// a three-character port type. Anything else yields three empty strings.
+func prefixToLabels(prefix string) (string, string, string) {
+	p := strings.Split(prefix, ".")
+	switch len(p) {
+	case 5:
+		// e.g. voltha.openolt.000130af0b0b2c51.pon.0
+		return p[2], p[3], p[4]
+	case 4:
+		// e.g. voltha.openolt.000130af0b0b2c51nni.129: the port type is the
+		// last three characters of the third part. Shorter parts are rejected
+		// here because slicing them would panic.
+		if s := p[2]; len(s) >= 3 {
+			return s[:len(s)-3], s[len(s)-3:], p[3]
+		}
+	}
+	return "", "", ""
+}
+
+// interfaceToFloat converts any float or integer value to float64 and returns NaN for every other type.
+func interfaceToFloat(unk interface{}) float64 {
+	switch v := reflect.ValueOf(unk); v.Kind() {
+	case reflect.Float32, reflect.Float64:
+		return v.Float()
+	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
+		return float64(v.Int())
+	case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
+		return float64(v.Uint())
+	}
+	return math.NaN()
+}
+
+// kafkaInit consumes partition 0 of the configured topic from the oldest offset
+// and mirrors each KPI message into the Prometheus gauges. It blocks until an
+// interrupt arrives and panics if the broker or the partition cannot be opened.
+func kafkaInit() {
+	config := sarama.NewConfig()
+	config.Consumer.Return.Errors = true
+
+	// Read the flag here: the package-level brokers slice is initialised before
+	// flag.Parse runs, so it always holds the default address.
+	master, err := sarama.NewConsumer([]string{*broker}, config)
+	if err != nil {
+		panic(err)
+	}
+	defer func() {
+		if err := master.Close(); err != nil {
+			panic(err)
+		}
+	}()
+	consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
+	if err != nil {
+		panic(err)
+	}
+	// Runs before master.Close: partition consumers must be closed first.
+	defer func() {
+		if err := consumer.Close(); err != nil {
+			fmt.Println(err)
+		}
+	}()
+
+	signals := make(chan os.Signal, 1)
+	signal.Notify(signals, os.Interrupt)
+	defer signal.Stop(signals)
+
+	// Consume on the calling goroutine so that nothing outlives kafkaInit.
+	for {
+		select {
+		case err := <-consumer.Errors():
+			fmt.Println(err)
+		case msg := <-consumer.Messages():
+			messageCountStart++
+			if err := exportKPI(msg.Value); err != nil {
+				fmt.Println("Skipping message:", err)
+			}
+		case <-signals:
+			fmt.Println("Interrupt is detected")
+			fmt.Println("Processed", messageCountStart, "messages")
+			return
+		}
+	}
+}
+
+// exportKPI decodes one KPI message and sets the gauges for every prefix in it.
+// Undecodable payloads and payloads without prefixes are reported as errors.
+func exportKPI(payload []byte) error {
+	var kpi struct {
+		Prefixes map[string]struct {
+			Metrics map[string]interface{} `json:"metrics"`
+		} `json:"prefixes"`
+	}
+	if err := json.Unmarshal(payload, &kpi); err != nil {
+		return fmt.Errorf("decoding KPI message: %v", err)
+	}
+	if len(kpi.Prefixes) == 0 {
+		return fmt.Errorf("KPI message has no prefixes")
+	}
+	for prefix, entry := range kpi.Prefixes {
+		d, pt, pi := prefixToLabels(prefix)
+		for k, s := range entry.Metrics {
+			fmt.Printf("%s %s: %v (%v)\n", prefix, k, s, reflect.TypeOf(s))
+			switch k {
+			case "tx_bytes":
+				txBytesTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s))
+			case "rx_bytes":
+				rxBytesTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s))
+			case "tx_packets":
+				txPacketsTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s))
+			case "rx_packets":
+				rxPacketsTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s))
+			}
+		}
+	}
+	return nil
+}
+
+func runServer() {
+	fmt.Println("Starting Server")
+	http.Handle("/metrics", prometheus.Handler())
+	fmt.Println("Server stopped:", http.ListenAndServe(":8080", nil)) // blocks; returns only with an error
+}
+
+// init parses the command-line flags, echoes the resulting broker and topic,
+// and registers the four traffic gauges with Prometheus.
+func init() {
+	flag.Parse()
+	fmt.Println("Connecting to broker: ", *broker)
+	fmt.Println("Listening to topic: ", *topic)
+
+	// Same registration order as four separate MustRegister calls.
+	prometheus.MustRegister(
+		txBytesTotal, rxBytesTotal,
+		txPacketsTotal, rxPacketsTotal,
+	)
+}
+
+func main() {
+	go func() { kafkaInit(); os.Exit(0) }() // kafkaInit traps SIGINT, so exit once it returns
+	runServer()
+}