Exporting PPPoE stats from ONOS

Change-Id: I031b9e747e77ecf4220238c0f27d7618851da7e0
diff --git a/config/conf.yaml b/config/conf.yaml
index 0333fe6..57b13ed 100644
--- a/config/conf.yaml
+++ b/config/conf.yaml
@@ -7,6 +7,7 @@
     - voltha.kpis
     - onos.kpis
     - onos.aaa.stats.kpis
+    - pppoe.stats
 logger:
   loglevel: debug
   host: cord-kafka.default.svc.cluster.local:9092
diff --git a/main.go b/main.go
index 81b1635..46f6df0 100644
--- a/main.go
+++ b/main.go
@@ -108,6 +108,16 @@
 	prometheus.MustRegister(onosaaaRxUnknownserver)
 	prometheus.MustRegister(onosaaaRequestRttMillis)
 	prometheus.MustRegister(onosaaaRequestReTx)
+
+	prometheus.MustRegister(onosPppoeUpTermBytes)
+	prometheus.MustRegister(onosPppoeUpTermPackets)
+	prometheus.MustRegister(onosPppoeUpDropBytes)
+	prometheus.MustRegister(onosPppoeUpDropPackets)
+	prometheus.MustRegister(onosPppoeUpControlPackets)
+	prometheus.MustRegister(onosPppoeDownRxBytes)
+	prometheus.MustRegister(onosPppoeDownRxPackets)
+	prometheus.MustRegister(onosPppoeDownTxBytes)
+	prometheus.MustRegister(onosPppoeDownTxPackets)
 }
 
 func loadConfigFile() Config {
diff --git a/topic-exporter.go b/topic-exporter.go
index c306975..5d33458 100644
--- a/topic-exporter.go
+++ b/topic-exporter.go
@@ -18,6 +18,8 @@
 	"encoding/json"
 	"gerrit.opencord.org/kafka-topic-exporter/common/logger"
 	"github.com/prometheus/client_golang/prometheus"
+	log "github.com/sirupsen/logrus"
+	"strconv"
 )
 
 var (
@@ -215,6 +217,72 @@
 			Name: "onosaaa_request_re_tx",
 			Help: "Number of access request packets retransmitted to the server",
 		})
+
+	// ONOS PPPoE kpis
+
+	onosPppoeUpTermBytes = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onosPppoeupTermBytes",
+			Help: "onosPppoeupTermBytes",
+		},
+		[]string{"mac_address", "ip", "session_id", "s_tag", "c_tag", "onu_serial"},
+	)
+	onosPppoeUpTermPackets = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onosPppoeupTermPackets",
+			Help: "onosPppoeupTermPackets",
+		},
+		[]string{"mac_address", "ip", "session_id", "s_tag", "c_tag", "onu_serial"},
+	)
+	onosPppoeUpDropBytes = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onosPppoeupDropBytes",
+			Help: "onosPppoeupDropBytes",
+		},
+		[]string{"mac_address", "ip", "session_id", "s_tag", "c_tag", "onu_serial"},
+	)
+	onosPppoeUpDropPackets = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onosPppoeupDropPackets",
+			Help: "onosPppoeupDropPackets",
+		},
+		[]string{"mac_address", "ip", "session_id", "s_tag", "c_tag", "onu_serial"},
+	)
+	onosPppoeUpControlPackets = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onosPppoeupControlPackets",
+			Help: "onosPppoeupControlPackets",
+		},
+		[]string{"mac_address", "ip", "session_id", "s_tag", "c_tag", "onu_serial"},
+	)
+	onosPppoeDownRxBytes = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onosPppoedownRxBytes",
+			Help: "onosPppoedownRxBytes",
+		},
+		[]string{"mac_address", "ip", "session_id", "s_tag", "c_tag", "onu_serial"},
+	)
+	onosPppoeDownRxPackets = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onosPppoedownRxPackets",
+			Help: "onosPppoedownRxPackets",
+		},
+		[]string{"mac_address", "ip", "session_id", "s_tag", "c_tag", "onu_serial"},
+	)
+	onosPppoeDownTxBytes = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onosPppoedownTxBytes",
+			Help: "onosPppoedownTxBytes",
+		},
+		[]string{"mac_address", "ip", "session_id", "s_tag", "c_tag", "onu_serial"},
+	)
+	onosPppoeDownTxPackets = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onosPppoedownTxPackets",
+			Help: "onosPppoedownTxPackets",
+		},
+		[]string{"mac_address", "ip", "session_id", "s_tag", "c_tag", "onu_serial"},
+	)
 )
 
 func exportVolthaKPI(kpi VolthaKPI) {
@@ -385,7 +453,6 @@
 				data.Metadata.Title,
 			).Set(data.Metrics.ReceivedOpticalPower)
 
-
 		case "Ethernet_UNI_History":
 			// ONU. Do nothing.
 
@@ -533,6 +600,93 @@
 	onosaaaRequestReTx.Set(kpi.RequestReTx)
 }
 
+func exportOnosPppoeKPI(kpi OnosPppoeKPI) {
+	for _, s := range kpi.Subscribers {
+
+		logger.WithFields(log.Fields{
+			"Mac":        s.Mac,
+			"Ip":         s.Ip,
+			"SessionId":  strconv.Itoa(s.SessionId),
+			"STag":       strconv.Itoa(s.STag),
+			"CTag":       strconv.Itoa(s.CTag),
+			"onuSerialNumber": s.SerialNumber,
+		}).Trace("Received OnosPppoeKPI message")
+
+		onosPppoeUpTermBytes.WithLabelValues(
+			s.Mac,
+			s.Ip,
+			strconv.Itoa(s.SessionId),
+			strconv.Itoa(s.STag),
+			strconv.Itoa(s.CTag),
+			s.SerialNumber,
+		).Set(s.UpTermBytes)
+		onosPppoeUpTermPackets.WithLabelValues(
+			s.Mac,
+			s.Ip,
+			strconv.Itoa(s.SessionId),
+			strconv.Itoa(s.STag),
+			strconv.Itoa(s.CTag),
+			s.SerialNumber,
+		).Set(s.UpTermPackets)
+		onosPppoeUpDropBytes.WithLabelValues(
+			s.Mac,
+			s.Ip,
+			strconv.Itoa(s.SessionId),
+			strconv.Itoa(s.STag),
+			strconv.Itoa(s.CTag),
+			s.SerialNumber,
+		).Set(s.UpDropBytes)
+		onosPppoeUpDropPackets.WithLabelValues(
+			s.Mac,
+			s.Ip,
+			strconv.Itoa(s.SessionId),
+			strconv.Itoa(s.STag),
+			strconv.Itoa(s.CTag),
+			s.SerialNumber,
+		).Set(s.UpDropPackets)
+		onosPppoeUpControlPackets.WithLabelValues(
+			s.Mac,
+			s.Ip,
+			strconv.Itoa(s.SessionId),
+			strconv.Itoa(s.STag),
+			strconv.Itoa(s.CTag),
+			s.SerialNumber,
+		).Set(s.UpControlPackets)
+		onosPppoeDownRxBytes.WithLabelValues(
+			s.Mac,
+			s.Ip,
+			strconv.Itoa(s.SessionId),
+			strconv.Itoa(s.STag),
+			strconv.Itoa(s.CTag),
+			s.SerialNumber,
+		).Set(s.DownRxBytes)
+		onosPppoeDownRxPackets.WithLabelValues(
+			s.Mac,
+			s.Ip,
+			strconv.Itoa(s.SessionId),
+			strconv.Itoa(s.STag),
+			strconv.Itoa(s.CTag),
+			s.SerialNumber,
+		).Set(s.DownRxPackets)
+		onosPppoeDownTxBytes.WithLabelValues(
+			s.Mac,
+			s.Ip,
+			strconv.Itoa(s.SessionId),
+			strconv.Itoa(s.STag),
+			strconv.Itoa(s.CTag),
+			s.SerialNumber,
+		).Set(s.DownTxBytes)
+		onosPppoeDownTxPackets.WithLabelValues(
+			s.Mac,
+			s.Ip,
+			strconv.Itoa(s.SessionId),
+			strconv.Itoa(s.STag),
+			strconv.Itoa(s.CTag),
+			s.SerialNumber,
+		).Set(s.DownTxPackets)
+	}
+}
+
 func export(topic *string, data []byte) {
 	switch *topic {
 	case "voltha.kpis":
@@ -567,6 +721,14 @@
 			break
 		}
 		exportOnosAaaKPI(kpi)
+	case "pppoe.stats":
+		kpi := OnosPppoeKPI{}
+		err := json.Unmarshal(data, &kpi)
+		if err != nil {
+			logger.Error("Invalid msg on pppoe.stats: %s, Unprocessed Msg: %s", err.Error(), string(data))
+			break
+		}
+		exportOnosPppoeKPI(kpi)
 	default:
 		logger.Warn("Unexpected export. Topic [%s] not supported. Should not come here", *topic)
 	}
diff --git a/types.go b/types.go
index 753b357..6185064 100644
--- a/types.go
+++ b/types.go
@@ -16,28 +16,28 @@
 
 // configuration
 type BrokerInfo struct {
-	Name			string `yaml: name`
-	Host			string `yaml: host`
-	Description		string `yaml: description`
-	Topics		  []string `yaml: topics`
+	Name        string   `yaml: name`
+	Host        string   `yaml: host`
+	Description string   `yaml: description`
+	Topics      []string `yaml: topics`
 }
 
 type LoggerInfo struct {
-	LogLevel		string `yaml: loglevel`
-	Host			string `yaml: host`
+	LogLevel string `yaml: loglevel`
+	Host     string `yaml: host`
 }
 
 type TargetInfo struct {
-	Type			string `yaml: type`
-	Name			string `yaml: name`
-	Port			int    `yaml: port`
-	Description		string `yaml: description`
+	Type        string `yaml: type`
+	Name        string `yaml: name`
+	Port        int    `yaml: port`
+	Description string `yaml: description`
 }
 
 type Config struct {
-	Broker		BrokerInfo `yaml: broker`
-	Logger		LoggerInfo `yaml: logger`
-	Target		TargetInfo `yaml: "target"`
+	Broker BrokerInfo `yaml: broker`
+	Logger LoggerInfo `yaml: logger`
+	Target TargetInfo `yaml: "target"`
 }
 
 // KPI Events format
@@ -54,15 +54,15 @@
 	RxBcastPackets     float64 `json:"rx_bcast_packets"`
 	RxMulticastPackets float64 `json:"rx_mcast_packets"`
 
-	LaserBiasCurrent   float64 `json:"laser_bias_current"`
-	Temperature        float64 `json:"temperature"`
-	PowerFeedVoltage   float64 `json:"power_feed_voltage"`
+	LaserBiasCurrent       float64 `json:"laser_bias_current"`
+	Temperature            float64 `json:"temperature"`
+	PowerFeedVoltage       float64 `json:"power_feed_voltage"`
 	MeanOpticalLaunchPower float64 `json:"mean_optical_launch_power"`
 	ReceivedOpticalPower   float64 `json:"received_optical_power"`
 
 	// ONU Ethernet_Bridge_Port_history
-	Packets            float64 `json:"packets"`
-	Octets             float64 `json:"octets"`
+	Packets float64 `json:"packets"`
+	Octets  float64 `json:"octets"`
 }
 
 type Context struct {
@@ -71,9 +71,9 @@
 	PortNumber  string `json:"port_no"`
 
 	// ONU Performance Metrics
-	ParentClassId string `json:"parent_class_id"`
+	ParentClassId  string `json:"parent_class_id"`
 	ParentEntityId string `json:"parent_entity_id"`
-	Upstream    string `json:"upstream"`
+	Upstream       string `json:"upstream"`
 }
 
 type Metadata struct {
@@ -112,7 +112,7 @@
 }
 
 type ImporterKPI struct {
-	DeviceID string 	`json: "deviceId"`
+	DeviceID string `json: "deviceId"`
 	// TODO: add metrics data
 }
 
@@ -130,3 +130,26 @@
 	RequestRttMillis     float64 `json:"requestRttMillis"`
 	RequestReTx          float64 `json:"requestReTx"`
 }
+
+type OnosPppoeSubscriberKPI struct {
+	Mac              string `json:"mac"`
+	Ip               string `json:"ip"`
+	SessionId        int `json:"sessionId"`
+	STag             int `json:"sTag"`
+	CTag             int `json:"cTag"`
+	SerialNumber 	 string `json:"serialNumber"`
+	UpTermBytes      float64 `json:"upTermBytes"`
+	UpTermPackets    float64 `json:"upTermPackets"`
+	UpDropBytes      float64 `json:"upDropBytes"`
+	UpDropPackets    float64 `json:"upDropPackets"`
+	DownRxBytes      float64 `json:"downRxBytes"`
+	DownRxPackets    float64 `json:"downRxPackets"`
+	DownTxBytes      float64 `json:"downTxBytes"`
+	DownTxPackets    float64 `json:"downTxPackets"`
+	UpControlPackets float64 `json:"upControlPackets"`
+}
+
+type OnosPppoeKPI struct {
+	Subscribers []OnosPppoeSubscriberKPI `json:"subscribers"`
+	Timestamp   string                   `json:"timestamp"`
+}