SEBA-599 Kafka topic exporter port_id

This is a rebase and squash of patchsets 16737 and 16566

Change-Id: I772e3da35d6d2fd87d5b1d2b307cd08ddf69153e
diff --git a/config/conf.yaml b/config/conf.yaml
index 633273a..398e4f2 100644
--- a/config/conf.yaml
+++ b/config/conf.yaml
@@ -4,10 +4,11 @@
   host: cord-kafka.default.svc.cluster.local:9092
   description: The kafka broker
   topics:
-    - voltha.kpis
+    - importer
     - onos.kpis
     - onos.aaa.stats.kpis
     - bng.stats
+    - voltha.kpis
 logger:
   loglevel: debug
   host: cord-kafka.default.svc.cluster.local:9092
diff --git a/topic-exporter.go b/topic-exporter.go
index 5e49e9d..4532c28 100644
--- a/topic-exporter.go
+++ b/topic-exporter.go
@@ -322,27 +322,35 @@
 		[]string{"mac_address", "ip", "session_id", "s_tag", "c_tag", "onu_serial", "type"},
 	)
 
-
-	deviceLaserBiasCurrent = prometheus.NewGauge(
+	deviceLaserBiasCurrent = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Name: "device_laser_bias_current",
 			Help: "Device Laser Bias Current",
-		})
-	deviceTemperature = prometheus.NewGauge(
+		},
+		[]string{"port_id"},
+	)
+	deviceTemperature = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Name: "device_temperature",
 			Help: "Device Temperature",
-		})
-	deviceTxPower = prometheus.NewGauge(
+		},
+		[]string{"port_id"},
+	)
+	deviceTxPower = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Name: "device_tx_power",
 			Help: "Device Tx Power",
-		})
-	deviceVoltage = prometheus.NewGauge(
+		},
+		[]string{"port_id"},
+	)
+	deviceVoltage = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Name: "device_voltage",
 			Help: "Device Voltage",
-		})
+		},
+		[]string{"port_id"},
+	)
+
 	onosaaaRxEapolLogoff = prometheus.NewGauge(
 		prometheus.GaugeOpts{
 			Name: "onosaaa_rx_eapol_Logoff",
@@ -709,10 +717,18 @@
 }
 
 func exportImporterKPI(kpi ImporterKPI) {
-	deviceLaserBiasCurrent.Set(kpi.LaserBiasCurrent)
-	deviceTemperature.Set(kpi.Temperature)
-	deviceTxPower.Set(kpi.TxPower)
-	deviceVoltage.Set(kpi.Voltage)
+	deviceLaserBiasCurrent.WithLabelValues(
+		kpi.PortId,
+	).Set(kpi.LaserBiasCurrent)
+	deviceTemperature.WithLabelValues(
+		kpi.PortId,
+	).Set(kpi.Temperature)
+	deviceTxPower.WithLabelValues(
+		kpi.PortId,
+	).Set(kpi.TxPower)
+	deviceVoltage.WithLabelValues(
+		kpi.PortId,
+	).Set(kpi.Voltage)
 }
 
 func exportOnosAaaKPI(kpi OnosAaaKPI) {
@@ -785,7 +801,7 @@
 		"onuSerialNumber": kpi.OnuSerialNumber,
 	}).Trace("Received OnosBngKPI message")
 
-    if kpi.UpTxBytes != nil {
+	if kpi.UpTxBytes != nil {
 		onosBngUpTxBytes.WithLabelValues(
 			kpi.Mac,
 			kpi.Ip,
@@ -953,7 +969,7 @@
 			break
 		}
 		exportOnosKPI(kpi)
-	case "importer.kpis":
+	case "importer":
 		kpi := ImporterKPI{}
 		strData := string(data)
 		idx := strings.Index(strData, "{")
@@ -962,22 +978,22 @@
 		var m map[string]interface{}
 		err := json.Unmarshal([]byte(strData), &m)
 		if err != nil {
-			logger.Error("Invalid msg on importer.kpis: %s", err.Error())
+			logger.Error("Invalid msg on importer: %s", err.Error())
 			logger.Debug("Unprocessed Msg: %s", strData)
 			break
 		}
 		if val, ok := m["TransceiverStatistics"]; ok {
 			stats := val.(map[string]interface{})
-			//kpi.Timestamp = time.Now().UnixNano()
 			kpi.LaserBiasCurrent = stats["BiasCurrent"].(map[string]interface{})["Reading"].(float64)
 			kpi.Temperature = stats["Temperature"].(map[string]interface{})["Reading"].(float64)
 			kpi.TxPower = stats["TxPower"].(map[string]interface{})["Reading"].(float64)
 			kpi.Voltage = stats["Voltage"].(map[string]interface{})["Reading"].(float64)
 		} else {
-			logger.Error("Optical stats (TransceiverStatistics) information missing [topic=importer.kpis")
+			logger.Error("Optical stats (TransceiverStatistics) information missing [topic=importer")
 			logger.Debug("Unprocessed Msg: %s", strData)
 			break
 		}
+		kpi.PortId = m["Id"].(string)
 		exportImporterKPI(kpi)
 	case "onos.aaa.stats.kpis":
 		kpi := OnosAaaKPI{}
diff --git a/types.go b/types.go
index 657cf70..ab4360b 100644
--- a/types.go
+++ b/types.go
@@ -112,7 +112,7 @@
 }
 
 type ImporterKPI struct {
-	Timestamp        int64
+	PortId           string
 	LaserBiasCurrent float64
 	Temperature      float64
 	TxPower          float64
@@ -150,15 +150,14 @@
 	RxResIdEapFrames      float64 `json:"resIdEapFramesRx"`
 }
 
-
 type OnosBngKPI struct {
-	Mac             string  `json:"macAddress"`
-	Ip              string  `json:"ipAddress"`
-	PppoeSessionId  int     `json:"pppoeSessionId"`
-	AttachmentType  string  `json:"attachmentType"`
-	STag            int     `json:"sTag"`
-	CTag            int     `json:"cTag"`
-	OnuSerialNumber string  `json:"onuSerialNumber"`
+	Mac             string   `json:"macAddress"`
+	Ip              string   `json:"ipAddress"`
+	PppoeSessionId  int      `json:"pppoeSessionId"`
+	AttachmentType  string   `json:"attachmentType"`
+	STag            int      `json:"sTag"`
+	CTag            int      `json:"cTag"`
+	OnuSerialNumber string   `json:"onuSerialNumber"`
 	DeviceId        string   `json:"deviceId"`
 	PortNumber      string   `json:"portNumber"`
 	UpTxBytes       *float64 `json:"upTxBytes,omitempty"`
@@ -174,5 +173,5 @@
 	DownDropBytes   *float64 `json:"downDropBytes,omitempty"`
 	DownDropPackets *float64 `json:"downDropPackets,omitempty"`
 	ControlPackets  *float64 `json:"controlPackets,omitempty"`
-	Timestamp       string  `json:"timestamp"`
+	Timestamp       string   `json:"timestamp"`
 }