Merge "SEBA-384 release kafka-topic-exporter"
diff --git a/main.go b/main.go
index 106421f..0eba8ba 100644
--- a/main.go
+++ b/main.go
@@ -15,21 +15,22 @@
 package main
 
 import (
-	"encoding/json"
 	"flag"
 	"fmt"
-	"log"
 	"net/http"
-	"os"
-	"os/signal"
+	"sync"
 
 	"github.com/Shopify/sarama"
 	"github.com/prometheus/client_golang/prometheus"
 )
 
 var (
-	broker = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker")
-	topic  = flag.String("topic", "voltha.kpis", "The Kafka topic")
+	broker      = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker")
+	volthaTopic = "voltha.kpis"
+	onosTopic   = "onos.kpis"
+
+	volthaTopicPointer = &volthaTopic
+	onosTopicPointer   = &onosTopic
 )
 
 var brokers []string
@@ -37,48 +38,25 @@
 func kafkaInit(brokers []string) {
 	config := sarama.NewConfig()
 	config.Consumer.Return.Errors = true
+	var wg sync.WaitGroup
+	// NOTE(review): wg is passed to both listeners by value below, so their wg.Done() calls act on copies and wg.Wait() never returns.
+	wg.Add(2) // two listener goroutines are started below; wait for both to exit before closing the Kafka connection
 
 	master, err := sarama.NewConsumer(brokers, config)
 	if err != nil {
+		fmt.Println("kafkaInit panic")
 		panic(err)
 	}
 	defer func() {
+		fmt.Println("kafkaInit close connection")
 		if err := master.Close(); err != nil {
 			panic(err)
 		}
 	}()
-	consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
-	if err != nil {
-		panic(err)
-	}
-	signals := make(chan os.Signal, 1)
-	signal.Notify(signals, os.Interrupt)
-	doneCh := make(chan struct{})
-	go func() {
-		for {
-			select {
-			case err := <-consumer.Errors():
-				fmt.Println(err)
-			case msg := <-consumer.Messages():
-				// fmt.Println(string(msg.Value))
+	go VOLTHAListener(volthaTopicPointer, master, wg)
+	go ONOSListener(onosTopicPointer, master, wg)
 
-				kpi := KPI{}
-
-				err := json.Unmarshal(msg.Value, &kpi)
-
-				if err != nil {
-					log.Fatal(err)
-				}
-
-				export(kpi)
-
-			case <-signals:
-				fmt.Println("Interrupt is detected")
-				doneCh <- struct{}{}
-			}
-		}
-	}()
-	<-doneCh
+	wg.Wait()
 }
 
 func runServer() {
@@ -94,13 +72,23 @@
 	brokers = make([]string, 0)
 	brokers = []string{*broker}
 	fmt.Println("Connecting to broker: ", brokers)
-	fmt.Println("Listening to topic: ", *topic)
+	fmt.Println("Listening to voltha on topic: ", *volthaTopicPointer)
+	fmt.Println("Listening to onos on topic: ", *onosTopicPointer)
 
 	// register metrics within Prometheus
-	prometheus.MustRegister(txBytesTotal)
-	prometheus.MustRegister(rxBytesTotal)
-	prometheus.MustRegister(txPacketsTotal)
-	prometheus.MustRegister(rxPacketsTotal)
+	prometheus.MustRegister(volthaTxBytesTotal)
+	prometheus.MustRegister(volthaRxBytesTotal)
+	prometheus.MustRegister(volthaTxPacketsTotal)
+	prometheus.MustRegister(volthaRxPacketsTotal)
+	prometheus.MustRegister(volthaTxErrorPacketsTotal)
+	prometheus.MustRegister(volthaRxErrorPacketsTotal)
+
+	prometheus.MustRegister(onosTxBytesTotal)
+	prometheus.MustRegister(onosRxBytesTotal)
+	prometheus.MustRegister(onosTxPacketsTotal)
+	prometheus.MustRegister(onosRxPacketsTotal)
+	prometheus.MustRegister(onosTxDropPacketsTotal)
+	prometheus.MustRegister(onosRxDropPacketsTotal)
 }
 
 func main() {
diff --git a/onos-kpi.go b/onos-kpi.go
new file mode 100644
index 0000000..da7b120
--- /dev/null
+++ b/onos-kpi.go
@@ -0,0 +1,63 @@
+// Copyright 2018 Open Networking Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"log"
+	"os"
+	"os/signal"
+	"sync"
+
+	"github.com/Shopify/sarama"
+)
+
+// ONOSListener consumes partition 0 of *topic from the oldest offset and exports
+// each decoded OnosKPI until interrupted; undecodable messages are logged and skipped.
+// NOTE(review): wg is taken by value, so wg.Done() only touches a copy and the
+// caller's Wait is never released; fix the signature and call site together.
+func ONOSListener(topic *string, master sarama.Consumer, wg sync.WaitGroup) {
+	fmt.Println("Starting ONOSListener")
+	defer wg.Done()
+	consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
+	if err != nil {
+		fmt.Println("ONOSListener panic")
+		panic(err)
+	}
+	defer func() {
+		if err := consumer.Close(); err != nil {
+			fmt.Println(err)
+		}
+	}()
+	signals := make(chan os.Signal, 1)
+	signal.Notify(signals, os.Interrupt)
+	for {
+		select {
+		case err := <-consumer.Errors():
+			fmt.Println(err)
+		case msg := <-consumer.Messages():
+			kpi := OnosKPI{}
+			if err := json.Unmarshal(msg.Value, &kpi); err != nil {
+				log.Println(err)
+				continue
+			}
+			exportOnosKPI(kpi)
+		case <-signals:
+			fmt.Println("Interrupt is detected")
+			return
+		}
+	}
+}
diff --git a/onos-prometheus.go b/onos-prometheus.go
new file mode 100644
index 0000000..b383b55
--- /dev/null
+++ b/onos-prometheus.go
@@ -0,0 +1,103 @@
+// Copyright 2018 Open Networking Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+var (
+	onosTxBytesTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onos_tx_bytes_total",
+			Help: "Number of total bytes transmitted",
+		},
+		[]string{"device_id", "port_id"},
+	)
+	onosRxBytesTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onos_rx_bytes_total",
+			Help: "Number of total bytes received",
+		},
+		[]string{"device_id", "port_id"},
+	)
+	onosTxPacketsTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onos_tx_packets_total",
+			Help: "Number of total packets transmitted",
+		},
+		[]string{"device_id", "port_id"},
+	)
+	onosRxPacketsTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onos_rx_packets_total",
+			Help: "Number of total packets received",
+		},
+		[]string{"device_id", "port_id"},
+	)
+	// The two gauges below report dropped packets; like the four above they are labelled by device_id and port_id.
+	onosTxDropPacketsTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onos_tx_drop_packets_total",
+			Help: "Number of total transmitted packets dropped",
+		},
+		[]string{"device_id", "port_id"},
+	)
+
+	onosRxDropPacketsTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onos_rx_drop_packets_total",
+			Help: "Number of total received packets dropped",
+		},
+		[]string{"device_id", "port_id"},
+	)
+)
+
+// exportOnosKPI copies the counters of every port in kpi into the ONOS gauges,
+// labelled with the device ID and the port ID. Nil port entries are skipped.
+func exportOnosKPI(kpi OnosKPI) {
+	for _, port := range kpi.Ports {
+		if port == nil {
+			continue // a JSON null inside "ports" decodes to a nil *OnosPort
+		}
+
+		onosTxBytesTotal.WithLabelValues(
+			kpi.DeviceID,
+			port.PortID,
+		).Set(port.TxBytes)
+		onosRxBytesTotal.WithLabelValues(
+			kpi.DeviceID,
+			port.PortID,
+		).Set(port.RxBytes)
+
+		onosTxPacketsTotal.WithLabelValues(
+			kpi.DeviceID,
+			port.PortID,
+		).Set(port.TxPackets)
+		onosRxPacketsTotal.WithLabelValues(
+			kpi.DeviceID,
+			port.PortID,
+		).Set(port.RxPackets)
+
+		onosTxDropPacketsTotal.WithLabelValues(
+			kpi.DeviceID,
+			port.PortID,
+		).Set(port.TxPacketsDrop)
+		onosRxDropPacketsTotal.WithLabelValues(
+			kpi.DeviceID,
+			port.PortID,
+		).Set(port.RxPacketsDrop)
+	}
+}
diff --git a/prometheus.go b/prometheus.go
deleted file mode 100644
index a1bc327..0000000
--- a/prometheus.go
+++ /dev/null
@@ -1,195 +0,0 @@
-// Copyright 2018 Open Networking Foundation
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package main
-
-import (
-       "github.com/prometheus/client_golang/prometheus"
-)
-
-var (
-	txBytesTotal = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Name: "tx_bytes_total",
-			Help: "Number of total bytes transmitted",
-		},
-		[]string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
-	)
-	rxBytesTotal = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Name: "rx_bytes_total",
-			Help: "Number of total bytes received",
-		},
-		[]string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
-	)
-	txPacketsTotal = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Name: "tx_packets_total",
-			Help: "Number of total packets transmitted",
-		},
-		[]string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
-	)
-	rxPacketsTotal = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Name: "rx_packets_total",
-			Help: "Number of total packets received",
-		},
-		[]string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
-	)
-
-	txErrorPacketsTotal = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Name: "tx_error_packets_total",
-			Help: "Number of total transmitted packets error",
-		},
-		[]string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
-	)
-
-	rxErrorPacketsTotal = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Name: "rx_error_packets_total",
-			Help: "Number of total received packets error",
-		},
-		[]string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
-	)
-)
-
-func export(kpi KPI) {
-
-	for _, data := range kpi.SliceDatas {
-	        switch title := data.Metadata.Title; title {
-                        case "Ethernet", "PON":
-                                txBytesTotal.WithLabelValues(
-                                        data.Metadata.LogicalDeviceID,
-                                        data.Metadata.SerialNumber,
-                                        data.Metadata.DeviceID,
-                                        data.Metadata.Context.InterfaceID,
-                                        data.Metadata.Context.PonID,
-                                        data.Metadata.Context.PortNumber,
-                                        data.Metadata.Title,
-                                ).Set(data.Metrics.TxBytes)
-
-                                rxBytesTotal.WithLabelValues(
-                                        data.Metadata.LogicalDeviceID,
-                                        data.Metadata.SerialNumber,
-                                        data.Metadata.DeviceID,
-                                        data.Metadata.Context.InterfaceID,
-                                        data.Metadata.Context.PonID,
-                                        data.Metadata.Context.PortNumber,
-                                        data.Metadata.Title,
-                                ).Set(data.Metrics.RxBytes)
-
-                                txPacketsTotal.WithLabelValues(
-                                        data.Metadata.LogicalDeviceID,
-                                        data.Metadata.SerialNumber,
-                                        data.Metadata.DeviceID,
-                                        data.Metadata.Context.InterfaceID,
-                                        data.Metadata.Context.PonID,
-                                        data.Metadata.Context.PortNumber,
-                                        data.Metadata.Title,
-                                ).Set(data.Metrics.TxPackets)
-
-                                rxPacketsTotal.WithLabelValues(
-                                        data.Metadata.LogicalDeviceID,
-                                        data.Metadata.SerialNumber,
-                                        data.Metadata.DeviceID,
-                                        data.Metadata.Context.InterfaceID,
-                                        data.Metadata.Context.PonID,
-                                        data.Metadata.Context.PortNumber,
-                                        data.Metadata.Title,
-                                ).Set(data.Metrics.RxPackets)
-
-                                txErrorPacketsTotal.WithLabelValues(
-                                        data.Metadata.LogicalDeviceID,
-                                        data.Metadata.SerialNumber,
-                                        data.Metadata.DeviceID,
-                                        data.Metadata.Context.InterfaceID,
-                                        data.Metadata.Context.PonID,
-                                        data.Metadata.Context.PortNumber,
-                                        data.Metadata.Title,
-                                ).Set(data.Metrics.TxErrorPackets)
-
-                                rxErrorPacketsTotal.WithLabelValues(
-                                        data.Metadata.LogicalDeviceID,
-                                        data.Metadata.SerialNumber,
-                                        data.Metadata.DeviceID,
-                                        data.Metadata.Context.InterfaceID,
-                                        data.Metadata.Context.PonID,
-                                        data.Metadata.Context.PortNumber,
-                                        data.Metadata.Title,
-                                ).Set(data.Metrics.RxErrorPackets)
-
-                                // TODO add metrics for:
-                                // TxBcastPackets
-                                // TxUnicastPackets
-                                // TxMulticastPackets
-                                // RxBcastPackets
-                                // RxMulticastPackets
-
-                        case "Ethernet_Bridge_Port_History":
-                                if data.Metadata.Context.Upstream == "True" {
-                                        // ONU. Extended Ethernet statistics.
-                                        txPacketsTotal.WithLabelValues(
-                                                data.Metadata.LogicalDeviceID,
-                                                data.Metadata.SerialNumber,
-                                                data.Metadata.DeviceID,
-                                                "NA", // InterfaceID
-                                                "NA", // PonID
-                                                "NA", // PortNumber
-                                                data.Metadata.Title,
-                                        ).Add(data.Metrics.Packets)
-
-                                        txBytesTotal.WithLabelValues(
-                                                data.Metadata.LogicalDeviceID,
-                                                data.Metadata.SerialNumber,
-                                                data.Metadata.DeviceID,
-                                                "NA", // InterfaceID
-                                                "NA", // PonID
-                                                "NA", // PortNumber
-                                                data.Metadata.Title,
-                                        ).Add(data.Metrics.Octets)
-                                } else {
-                                         // ONU. Extended Ethernet statistics.
-                                        rxPacketsTotal.WithLabelValues(
-                                                data.Metadata.LogicalDeviceID,
-                                                data.Metadata.SerialNumber,
-                                                data.Metadata.DeviceID,
-                                                "NA", // InterfaceID
-                                                "NA", // PonID
-                                                "NA", // PortNumber
-                                                data.Metadata.Title,
-                                        ).Add(data.Metrics.Packets)
-
-                                        rxBytesTotal.WithLabelValues(
-                                                data.Metadata.LogicalDeviceID,
-                                                data.Metadata.SerialNumber,
-                                                data.Metadata.DeviceID,
-                                                "NA", // InterfaceID
-                                                "NA", // PonID
-                                                "NA", // PortNumber
-                                                data.Metadata.Title,
-                                        ).Add(data.Metrics.Octets)
-                                }
-
-                        case "Ethernet_UNI_History":
-                                // ONU. Do nothing.
-
-                        case "FEC_History":
-                                // ONU. Do Nothing.
-
-                        case "voltha.internal":
-                                // Voltha Internal. Do nothing.
-                }
-	}
-}
diff --git a/types.go b/types.go
index 1e29fbe..b3ecd59 100644
--- a/types.go
+++ b/types.go
@@ -59,8 +59,23 @@
 	Metadata *Metadata `json:"metadata"`
 }
 
-type KPI struct {
+type VolthaKPI struct {
 	Type       string       `json:"type"`
 	Timestamp  float64      `json:"ts"`
 	SliceDatas []*SliceData `json:"slice_data"`
 }
+// OnosPort is one element of OnosKPI.Ports: the counters decoded for a single port, identified by the JSON "portId" field.
+type OnosPort struct {
+	PortID        string  `json:"portId"`
+	RxPackets     float64 `json:"pktRx"`
+	TxPackets     float64 `json:"pktTx"`
+	RxBytes       float64 `json:"bytesRx"`
+	TxBytes       float64 `json:"bytesTx"`
+	RxPacketsDrop float64 `json:"pktRxDrp"`
+	TxPacketsDrop float64 `json:"pktTxDrp"`
+}
+// OnosKPI is the JSON payload ONOSListener decodes from each message: a device ID plus one OnosPort per element of "ports".
+type OnosKPI struct {
+	DeviceID string      `json:"deviceId"`
+	Ports    []*OnosPort `json:"ports"`
+}
diff --git a/voltha-kpi.go b/voltha-kpi.go
new file mode 100644
index 0000000..122f3f0
--- /dev/null
+++ b/voltha-kpi.go
@@ -0,0 +1,64 @@
+// Copyright 2018 Open Networking Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"log"
+	"os"
+	"os/signal"
+	"sync"
+
+	"github.com/Shopify/sarama"
+)
+
+// VOLTHAListener consumes partition 0 of *topic from the oldest offset and exports
+// each decoded VolthaKPI until interrupted; undecodable messages are logged and skipped.
+// NOTE(review): wg is taken by value, so wg.Done() only touches a copy and the
+// caller's Wait is never released; fix the signature and call site together.
+func VOLTHAListener(topic *string, master sarama.Consumer, wg sync.WaitGroup) {
+	fmt.Println("Starting VOLTHAListener")
+	defer wg.Done()
+	consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
+	if err != nil {
+		fmt.Println("VOLTHAListener panic")
+		panic(err)
+	}
+	defer func() {
+		if err := consumer.Close(); err != nil {
+			fmt.Println(err)
+		}
+	}()
+	signals := make(chan os.Signal, 1)
+	signal.Notify(signals, os.Interrupt)
+	for {
+		select {
+		case err := <-consumer.Errors():
+			fmt.Println(err)
+		case msg := <-consumer.Messages():
+			kpi := VolthaKPI{}
+			if err := json.Unmarshal(msg.Value, &kpi); err != nil {
+				// log.Fatal here would let one bad payload stop the whole exporter.
+				log.Println(err)
+				continue
+			}
+			exportVolthaKPI(kpi)
+		case <-signals:
+			fmt.Println("Interrupt is detected")
+			return
+		}
+	}
+}
diff --git a/voltha-prometheus.go b/voltha-prometheus.go
new file mode 100644
index 0000000..526010b
--- /dev/null
+++ b/voltha-prometheus.go
@@ -0,0 +1,262 @@
+// Copyright 2018 Open Networking Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+var (
+	volthaTxBytesTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "voltha_tx_bytes_total",
+			Help: "Number of total bytes transmitted",
+		},
+		[]string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+	)
+	volthaRxBytesTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "voltha_rx_bytes_total",
+			Help: "Number of total bytes received",
+		},
+		[]string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+	)
+	volthaTxPacketsTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "voltha_tx_packets_total",
+			Help: "Number of total packets transmitted",
+		},
+		[]string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+	)
+	volthaRxPacketsTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "voltha_rx_packets_total",
+			Help: "Number of total packets received",
+		},
+		[]string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+	)
+	// The two gauges below report packets in error; they carry the same seven labels as the gauges above.
+	volthaTxErrorPacketsTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "voltha_tx_error_packets_total",
+			Help: "Number of total transmitted packets error",
+		},
+		[]string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+	)
+
+	volthaRxErrorPacketsTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "voltha_rx_error_packets_total",
+			Help: "Number of total received packets error",
+		},
+		[]string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+	)
+)
+
+// exportVolthaKPI walks kpi.SliceDatas and, depending on each slice's metadata title, sets or
+// adds its metrics on the voltha_* gauges; titles without a matching case are ignored.
+// NOTE(review): data, data.Metadata and data.Metrics are dereferenced unchecked — confirm the decoder never leaves them nil.
+func exportVolthaKPI(kpi VolthaKPI) {
+
+	for _, data := range kpi.SliceDatas {
+		switch title := data.Metadata.Title; title {
+		case "Ethernet", "PON":
+			volthaTxBytesTotal.WithLabelValues(
+				data.Metadata.LogicalDeviceID,
+				data.Metadata.SerialNumber,
+				data.Metadata.DeviceID,
+				data.Metadata.Context.InterfaceID,
+				data.Metadata.Context.PonID,
+				data.Metadata.Context.PortNumber,
+				data.Metadata.Title,
+			).Set(data.Metrics.TxBytes)
+
+			volthaRxBytesTotal.WithLabelValues(
+				data.Metadata.LogicalDeviceID,
+				data.Metadata.SerialNumber,
+				data.Metadata.DeviceID,
+				data.Metadata.Context.InterfaceID,
+				data.Metadata.Context.PonID,
+				data.Metadata.Context.PortNumber,
+				data.Metadata.Title,
+			).Set(data.Metrics.RxBytes)
+
+			volthaTxPacketsTotal.WithLabelValues(
+				data.Metadata.LogicalDeviceID,
+				data.Metadata.SerialNumber,
+				data.Metadata.DeviceID,
+				data.Metadata.Context.InterfaceID,
+				data.Metadata.Context.PonID,
+				data.Metadata.Context.PortNumber,
+				data.Metadata.Title,
+			).Set(data.Metrics.TxPackets)
+
+			volthaRxPacketsTotal.WithLabelValues(
+				data.Metadata.LogicalDeviceID,
+				data.Metadata.SerialNumber,
+				data.Metadata.DeviceID,
+				data.Metadata.Context.InterfaceID,
+				data.Metadata.Context.PonID,
+				data.Metadata.Context.PortNumber,
+				data.Metadata.Title,
+			).Set(data.Metrics.RxPackets)
+
+			volthaTxErrorPacketsTotal.WithLabelValues(
+				data.Metadata.LogicalDeviceID,
+				data.Metadata.SerialNumber,
+				data.Metadata.DeviceID,
+				data.Metadata.Context.InterfaceID,
+				data.Metadata.Context.PonID,
+				data.Metadata.Context.PortNumber,
+				data.Metadata.Title,
+			).Set(data.Metrics.TxErrorPackets)
+
+			volthaRxErrorPacketsTotal.WithLabelValues(
+				data.Metadata.LogicalDeviceID,
+				data.Metadata.SerialNumber,
+				data.Metadata.DeviceID,
+				data.Metadata.Context.InterfaceID,
+				data.Metadata.Context.PonID,
+				data.Metadata.Context.PortNumber,
+				data.Metadata.Title,
+			).Set(data.Metrics.RxErrorPackets)
+
+			// TODO add metrics for:
+			// TxBcastPackets
+			// TxUnicastPackets
+			// TxMulticastPackets
+			// RxBcastPackets
+			// RxMulticastPackets
+
+		case "Ethernet_Bridge_Port_History":
+			if data.Metadata.Context.Upstream == "True" {
+				// ONU. Extended Ethernet statistics.
+				volthaTxPacketsTotal.WithLabelValues(
+					data.Metadata.LogicalDeviceID,
+					data.Metadata.SerialNumber,
+					data.Metadata.DeviceID,
+					"NA", // InterfaceID
+					"NA", // PonID
+					"NA", // PortNumber
+					data.Metadata.Title,
+				).Add(data.Metrics.Packets)
+
+				volthaTxBytesTotal.WithLabelValues(
+					data.Metadata.LogicalDeviceID,
+					data.Metadata.SerialNumber,
+					data.Metadata.DeviceID,
+					"NA", // InterfaceID
+					"NA", // PonID
+					"NA", // PortNumber
+					data.Metadata.Title,
+				).Add(data.Metrics.Octets)
+			} else {
+				// ONU. Extended Ethernet statistics.
+				volthaRxPacketsTotal.WithLabelValues(
+					data.Metadata.LogicalDeviceID,
+					data.Metadata.SerialNumber,
+					data.Metadata.DeviceID,
+					"NA", // InterfaceID
+					"NA", // PonID
+					"NA", // PortNumber
+					data.Metadata.Title,
+				).Add(data.Metrics.Packets)
+
+				volthaRxBytesTotal.WithLabelValues(
+					data.Metadata.LogicalDeviceID,
+					data.Metadata.SerialNumber,
+					data.Metadata.DeviceID,
+					"NA", // InterfaceID
+					"NA", // PonID
+					"NA", // PortNumber
+					data.Metadata.Title,
+				).Add(data.Metrics.Octets)
+			}
+
+		case "Ethernet_UNI_History":
+			// ONU. Do nothing.
+
+		case "FEC_History":
+			// NOTE(review): the original comment here read "ONU. Do Nothing.", but the
+			// statements below export the same six gauges as the "Ethernet"/"PON" case.
+			// Confirm whether FEC_History slices are really meant to be exported.
+
+			volthaTxBytesTotal.WithLabelValues(
+				data.Metadata.LogicalDeviceID,
+				data.Metadata.SerialNumber,
+				data.Metadata.DeviceID,
+				data.Metadata.Context.InterfaceID,
+				data.Metadata.Context.PonID,
+				data.Metadata.Context.PortNumber,
+				data.Metadata.Title,
+			).Set(data.Metrics.TxBytes)
+
+			volthaRxBytesTotal.WithLabelValues(
+				data.Metadata.LogicalDeviceID,
+				data.Metadata.SerialNumber,
+				data.Metadata.DeviceID,
+				data.Metadata.Context.InterfaceID,
+				data.Metadata.Context.PonID,
+				data.Metadata.Context.PortNumber,
+				data.Metadata.Title,
+			).Set(data.Metrics.RxBytes)
+
+			volthaTxPacketsTotal.WithLabelValues(
+				data.Metadata.LogicalDeviceID,
+				data.Metadata.SerialNumber,
+				data.Metadata.DeviceID,
+				data.Metadata.Context.InterfaceID,
+				data.Metadata.Context.PonID,
+				data.Metadata.Context.PortNumber,
+				data.Metadata.Title,
+			).Set(data.Metrics.TxPackets)
+
+			volthaRxPacketsTotal.WithLabelValues(
+				data.Metadata.LogicalDeviceID,
+				data.Metadata.SerialNumber,
+				data.Metadata.DeviceID,
+				data.Metadata.Context.InterfaceID,
+				data.Metadata.Context.PonID,
+				data.Metadata.Context.PortNumber,
+				data.Metadata.Title,
+			).Set(data.Metrics.RxPackets)
+
+			volthaTxErrorPacketsTotal.WithLabelValues(
+				data.Metadata.LogicalDeviceID,
+				data.Metadata.SerialNumber,
+				data.Metadata.DeviceID,
+				data.Metadata.Context.InterfaceID,
+				data.Metadata.Context.PonID,
+				data.Metadata.Context.PortNumber,
+				data.Metadata.Title,
+			).Set(data.Metrics.TxErrorPackets)
+
+			volthaRxErrorPacketsTotal.WithLabelValues(
+				data.Metadata.LogicalDeviceID,
+				data.Metadata.SerialNumber,
+				data.Metadata.DeviceID,
+				data.Metadata.Context.InterfaceID,
+				data.Metadata.Context.PonID,
+				data.Metadata.Context.PortNumber,
+				data.Metadata.Title,
+			).Set(data.Metrics.RxErrorPackets)
+
+			// TODO add metrics for: TxBcastPackets, TxUnicastPackets, TxMulticastPackets, RxBcastPackets, RxMulticastPackets
+
+		case "voltha.internal":
+			// Voltha Internal. Do nothing.
+		}
+	}
+}