[SEBA-37] Kafka topic exporter changes.

Change-Id: Idaf14ef433bc5fad870dc5a498e10bc5678e3b26
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..723ef36
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.idea
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
index 641f75e..d8da92d 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -13,17 +13,21 @@
 # limitations under the License.
 
 # docker build -t opencord/kafka-topic-exporter:latest .
-# docker build -t 10.90.0.101:30500/opencord/kafka-topic-exporter:latest .
+# docker build -t 10.128.22.1:30500/opencord/kafka-topic-exporter:latest .
 
 FROM golang:1.10-stretch as builder
 RUN mkdir -p /go/src/gerrit.opencord.org/kafka-topic-exporter
-ADD . /go/src/gerrit.opencord.org/kafka-topic-exporter
-WORKDIR /go/src/gerrit.opencord.org/kafka-topic-exporter
 RUN go get -u github.com/golang/dep/cmd/dep
+ADD Gopkg.lock /go/src/gerrit.opencord.org/kafka-topic-exporter
+ADD Gopkg.toml /go/src/gerrit.opencord.org/kafka-topic-exporter
+WORKDIR /go/src/gerrit.opencord.org/kafka-topic-exporter
 RUN dep ensure --vendor-only
+ADD . /go/src/gerrit.opencord.org/kafka-topic-exporter
 RUN CGO_ENABLED=0 GOOS=linux go build -o main .
 
 FROM alpine:3.8
 WORKDIR /go/src/gerrit.opencord.org/kafka-topic-exporter/
 COPY --from=builder /go/src/gerrit.opencord.org/kafka-topic-exporter/main .
+# FIXME this should be mounted by the helm charts
+ADD config/conf.yaml /etc/config/conf.yaml
 ENTRYPOINT ["./main"]
diff --git a/VERSION b/VERSION
index 6085e94..c9a5712 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.2.1
+1.2.2-dev
diff --git a/config/conf.yaml b/config/conf.yaml
index 745b169..0333fe6 100644
--- a/config/conf.yaml
+++ b/config/conf.yaml
@@ -1,12 +1,15 @@
 ---
 broker:
   name: broker-name
-  host: voltha-kafka.default.svc.cluster.local:9092
+  host: cord-kafka.default.svc.cluster.local:9092
   description: The kafka broker
-  topics: [voltha.kpis, onos.kpis]
+  topics:
+    - voltha.kpis
+    - onos.kpis
+    - onos.aaa.stats.kpis
 logger:
   loglevel: debug
-  host: voltha-kafka.default.svc.cluster.local:9092
+  host: cord-kafka.default.svc.cluster.local:9092
 target:
   type: prometheus-target
   name: http-server
diff --git a/main.go b/main.go
index cda8c19..f57547f 100644
--- a/main.go
+++ b/main.go
@@ -15,17 +15,16 @@
 package main
 
 import (
-	"strconv"
+	"gerrit.opencord.org/kafka-topic-exporter/common/logger"
+	"github.com/Shopify/sarama"
+	"github.com/prometheus/client_golang/prometheus"
 	"gopkg.in/yaml.v2"
 	"io/ioutil"
 	"log"
 	"net/http"
+	"strconv"
 	"strings"
 	"sync"
-
-	"github.com/Shopify/sarama"
-	"github.com/prometheus/client_golang/prometheus"
-	"gerrit.opencord.org/kafka-topic-exporter/common/logger"
 )
 
 func kafkaInit(broker BrokerInfo) {
@@ -34,6 +33,7 @@
 	var wg sync.WaitGroup
 
 	master, err := sarama.NewConsumer([]string{broker.Host}, config)
+
 	if err != nil {
 		logger.Panic("kafkaInit panic")
 		panic(err)
@@ -71,7 +71,6 @@
 }
 
 func init() {
-
 	// register metrics within Prometheus
 	prometheus.MustRegister(volthaTxBytesTotal)
 	prometheus.MustRegister(volthaRxBytesTotal)
@@ -86,6 +85,19 @@
 	prometheus.MustRegister(onosRxPacketsTotal)
 	prometheus.MustRegister(onosTxDropPacketsTotal)
 	prometheus.MustRegister(onosRxDropPacketsTotal)
+
+	prometheus.MustRegister(onosaaaRxAcceptResponses)
+	prometheus.MustRegister(onosaaaRxRejectResponses)
+	prometheus.MustRegister(onosaaaRxChallengeResponses)
+	prometheus.MustRegister(onosaaaTxAccessRequests)
+	prometheus.MustRegister(onosaaaRxInvalidValidators)
+	prometheus.MustRegister(onosaaaRxUnknownType)
+	prometheus.MustRegister(onosaaaPendingRequests)
+	prometheus.MustRegister(onosaaaRxDroppedResponses)
+	prometheus.MustRegister(onosaaaRxMalformedResponses)
+	prometheus.MustRegister(onosaaaRxUnknownserver)
+	prometheus.MustRegister(onosaaaRequestRttMillis)
+	prometheus.MustRegister(onosaaaRequestReTx)
 }
 
 func loadConfigFile() Config {
diff --git a/topic-exporter.go b/topic-exporter.go
index 3313f46..7e604b5 100644
--- a/topic-exporter.go
+++ b/topic-exporter.go
@@ -113,6 +113,68 @@
 		},
 		[]string{"device_id", "port_id"},
 	)
+
+	// onos.aaa kpis
+	onosaaaRxAcceptResponses = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Name: "onosaaa_rx_accept_responses",
+			Help: "Number of access accept packets received from the server",
+		})
+	onosaaaRxRejectResponses = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Name: "onosaaa_rx_reject_responses",
+			Help: "Number of access reject packets received from the server",
+		})
+	onosaaaRxChallengeResponses = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Name: "onosaaa_rx_challenge_response",
+			Help: "Number of access challenge packets received from the server",
+		})
+	onosaaaTxAccessRequests = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Name: "onosaaa_tx_access_requests",
+			Help: "Number of access request packets sent to the server",
+		})
+	onosaaaRxInvalidValidators = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Name: "onosaaa_rx_invalid_validators",
+			Help: "Number of access response packets received from the server with an invalid validator",
+		})
+	onosaaaRxUnknownType = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Name: "onosaaa_rx_unknown_type",
+			Help: "Number of packets of an unknown RADIUS type received from the accounting server",
+		})
+	onosaaaPendingRequests = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Name: "onosaaa_pending_responses",
+			Help: "Number of access request packets pending a response from the server",
+		})
+	onosaaaRxDroppedResponses = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Name: "onosaaa_rx_dropped_responses",
+			Help: "Number of dropped packets received from the accounting server",
+		})
+	onosaaaRxMalformedResponses = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Name: "onosaaa_rx_malformed_responses",
+			Help: "Number of malformed access response packets received from the server",
+		})
+	onosaaaRxUnknownserver = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Name: "onosaaa_rx_from_unknown_server",
+			Help: "Number of packets received from an unknown server",
+		})
+	onosaaaRequestRttMillis = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Name: "onosaaa_request_rttmillis",
+			Help: "Roundtrip packet time to the accounting server in Miliseconds",
+		})
+	onosaaaRequestReTx = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Name: "onosaaa_request_re_tx",
+			Help: "Number of access request packets retransmitted to the server",
+		})
 )
 
 func exportVolthaKPI(kpi VolthaKPI) {
@@ -352,6 +414,33 @@
 	logger.Info("To be implemented")
 }
 
+func exportOnosAaaKPI(kpi OnosAaaKPI) {
+
+	onosaaaRxAcceptResponses.Set(kpi.RxAcceptResponses)
+
+	onosaaaRxRejectResponses.Set(kpi.RxRejectResponses)
+
+	onosaaaRxChallengeResponses.Set(kpi.RxChallengeResponses)
+
+	onosaaaTxAccessRequests.Set(kpi.TxAccessRequests)
+
+	onosaaaRxInvalidValidators.Set(kpi.RxInvalidValidators)
+
+	onosaaaRxUnknownType.Set(kpi.RxUnknownType)
+
+	onosaaaPendingRequests.Set(kpi.PendingRequests)
+
+	onosaaaRxDroppedResponses.Set(kpi.RxDroppedResponses)
+
+	onosaaaRxMalformedResponses.Set(kpi.RxMalformedResponses)
+
+	onosaaaRxUnknownserver.Set(kpi.RxUnknownserver)
+
+	onosaaaRequestRttMillis.Set(kpi.RequestRttMillis)
+
+	onosaaaRequestReTx.Set(kpi.RequestReTx)
+}
+
 func export(topic *string, data []byte) {
 	switch *topic {
 	case "voltha.kpis":
@@ -375,6 +464,13 @@
 			log.Fatal(err)
 		}
 		exportImporterKPI(kpi)
+	case "onos.aaa.stats.kpis":
+		kpi := OnosAaaKPI{}
+		err := json.Unmarshal(data, &kpi)
+		if err != nil {
+			log.Fatal(err)
+		}
+		exportOnosAaaKPI(kpi)
 	default:
 		logger.Warn("Unexpected export. Should not come here")
 	}
diff --git a/types.go b/types.go
index 8cb41ff..684518e 100644
--- a/types.go
+++ b/types.go
@@ -108,4 +108,19 @@
 type ImporterKPI struct {
 	DeviceID string 	`json: "deviceId"`
 	// TODO: add metrics data
-}
\ No newline at end of file
+}
+
+type OnosAaaKPI struct {
+	RxAcceptResponses    float64 `json:"acceptResponsesRx"`
+	RxRejectResponses    float64 `json:"rejectResponsesRx"`
+	RxChallengeResponses float64 `json:"challengeResponsesRx"`
+	TxAccessRequests     float64 `json:"accessRequestsTx"`
+	RxInvalidValidators  float64 `json:"invalidValidatorsRx"`
+	RxUnknownType        float64 `json:"unknownTypeRx"`
+	PendingRequests      float64 `json:"pendingRequests"`
+	RxDroppedResponses   float64 `json:"droppedResponsesRx"`
+	RxMalformedResponses float64 `json:"malformedResponsesRx"`
+	RxUnknownserver      float64 `json:"unknownServerRx"`
+	RequestRttMillis     float64 `json:"requestRttMillis"`
+	RequestReTx          float64 `json:"requestReTx"`
+}