[SEBA-37] Kafka topic exporter changes.
Change-Id: Idaf14ef433bc5fad870dc5a498e10bc5678e3b26
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..723ef36
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.idea
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
index 641f75e..d8da92d 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -13,17 +13,21 @@
# limitations under the License.
# docker build -t opencord/kafka-topic-exporter:latest .
-# docker build -t 10.90.0.101:30500/opencord/kafka-topic-exporter:latest .
+# docker build -t 10.128.22.1:30500/opencord/kafka-topic-exporter:latest .
FROM golang:1.10-stretch as builder
RUN mkdir -p /go/src/gerrit.opencord.org/kafka-topic-exporter
-ADD . /go/src/gerrit.opencord.org/kafka-topic-exporter
-WORKDIR /go/src/gerrit.opencord.org/kafka-topic-exporter
RUN go get -u github.com/golang/dep/cmd/dep
+ADD Gopkg.lock /go/src/gerrit.opencord.org/kafka-topic-exporter
+ADD Gopkg.toml /go/src/gerrit.opencord.org/kafka-topic-exporter
+WORKDIR /go/src/gerrit.opencord.org/kafka-topic-exporter
RUN dep ensure --vendor-only
+ADD . /go/src/gerrit.opencord.org/kafka-topic-exporter
RUN CGO_ENABLED=0 GOOS=linux go build -o main .
FROM alpine:3.8
WORKDIR /go/src/gerrit.opencord.org/kafka-topic-exporter/
COPY --from=builder /go/src/gerrit.opencord.org/kafka-topic-exporter/main .
+# FIXME this should be mounted by the helm charts
+ADD config/conf.yaml /etc/config/conf.yaml
ENTRYPOINT ["./main"]
diff --git a/VERSION b/VERSION
index 6085e94..c9a5712 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.2.1
+1.2.2-dev
diff --git a/config/conf.yaml b/config/conf.yaml
index 745b169..0333fe6 100644
--- a/config/conf.yaml
+++ b/config/conf.yaml
@@ -1,12 +1,15 @@
---
broker:
name: broker-name
- host: voltha-kafka.default.svc.cluster.local:9092
+ host: cord-kafka.default.svc.cluster.local:9092
description: The kafka broker
- topics: [voltha.kpis, onos.kpis]
+ topics:
+ - voltha.kpis
+ - onos.kpis
+ - onos.aaa.stats.kpis
logger:
loglevel: debug
- host: voltha-kafka.default.svc.cluster.local:9092
+ host: cord-kafka.default.svc.cluster.local:9092
target:
type: prometheus-target
name: http-server
diff --git a/main.go b/main.go
index cda8c19..f57547f 100644
--- a/main.go
+++ b/main.go
@@ -15,17 +15,16 @@
package main
import (
- "strconv"
+ "gerrit.opencord.org/kafka-topic-exporter/common/logger"
+ "github.com/Shopify/sarama"
+ "github.com/prometheus/client_golang/prometheus"
"gopkg.in/yaml.v2"
"io/ioutil"
"log"
"net/http"
+ "strconv"
"strings"
"sync"
-
- "github.com/Shopify/sarama"
- "github.com/prometheus/client_golang/prometheus"
- "gerrit.opencord.org/kafka-topic-exporter/common/logger"
)
func kafkaInit(broker BrokerInfo) {
@@ -34,6 +33,7 @@
var wg sync.WaitGroup
master, err := sarama.NewConsumer([]string{broker.Host}, config)
+
if err != nil {
logger.Panic("kafkaInit panic")
panic(err)
@@ -71,7 +71,6 @@
}
func init() {
-
// register metrics within Prometheus
prometheus.MustRegister(volthaTxBytesTotal)
prometheus.MustRegister(volthaRxBytesTotal)
@@ -86,6 +85,19 @@
prometheus.MustRegister(onosRxPacketsTotal)
prometheus.MustRegister(onosTxDropPacketsTotal)
prometheus.MustRegister(onosRxDropPacketsTotal)
+
+ prometheus.MustRegister(onosaaaRxAcceptResponses)
+ prometheus.MustRegister(onosaaaRxRejectResponses)
+ prometheus.MustRegister(onosaaaRxChallengeResponses)
+ prometheus.MustRegister(onosaaaTxAccessRequests)
+ prometheus.MustRegister(onosaaaRxInvalidValidators)
+ prometheus.MustRegister(onosaaaRxUnknownType)
+ prometheus.MustRegister(onosaaaPendingRequests)
+ prometheus.MustRegister(onosaaaRxDroppedResponses)
+ prometheus.MustRegister(onosaaaRxMalformedResponses)
+ prometheus.MustRegister(onosaaaRxUnknownserver)
+ prometheus.MustRegister(onosaaaRequestRttMillis)
+ prometheus.MustRegister(onosaaaRequestReTx)
}
func loadConfigFile() Config {
diff --git a/topic-exporter.go b/topic-exporter.go
index 3313f46..7e604b5 100644
--- a/topic-exporter.go
+++ b/topic-exporter.go
@@ -113,6 +113,68 @@
},
[]string{"device_id", "port_id"},
)
+
+ // onos.aaa kpis
+ onosaaaRxAcceptResponses = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Name: "onosaaa_rx_accept_responses",
+ Help: "Number of access accept packets received from the server",
+ })
+ onosaaaRxRejectResponses = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Name: "onosaaa_rx_reject_responses",
+ Help: "Number of access reject packets received from the server",
+ })
+ onosaaaRxChallengeResponses = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Name: "onosaaa_rx_challenge_response",
+ Help: "Number of access challenge packets received from the server",
+ })
+ onosaaaTxAccessRequests = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Name: "onosaaa_tx_access_requests",
+ Help: "Number of access request packets sent to the server",
+ })
+ onosaaaRxInvalidValidators = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Name: "onosaaa_rx_invalid_validators",
+ Help: "Number of access response packets received from the server with an invalid validator",
+ })
+ onosaaaRxUnknownType = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Name: "onosaaa_rx_unknown_type",
+ Help: "Number of packets of an unknown RADIUS type received from the accounting server",
+ })
+ onosaaaPendingRequests = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Name: "onosaaa_pending_responses",
+ Help: "Number of access request packets pending a response from the server",
+ })
+ onosaaaRxDroppedResponses = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Name: "onosaaa_rx_dropped_responses",
+ Help: "Number of dropped packets received from the accounting server",
+ })
+ onosaaaRxMalformedResponses = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Name: "onosaaa_rx_malformed_responses",
+ Help: "Number of malformed access response packets received from the server",
+ })
+ onosaaaRxUnknownserver = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Name: "onosaaa_rx_from_unknown_server",
+ Help: "Number of packets received from an unknown server",
+ })
+ onosaaaRequestRttMillis = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Name: "onosaaa_request_rttmillis",
+ Help: "Roundtrip packet time to the accounting server in Miliseconds",
+ })
+ onosaaaRequestReTx = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Name: "onosaaa_request_re_tx",
+ Help: "Number of access request packets retransmitted to the server",
+ })
)
func exportVolthaKPI(kpi VolthaKPI) {
@@ -352,6 +414,33 @@
logger.Info("To be implemented")
}
+func exportOnosAaaKPI(kpi OnosAaaKPI) {
+
+ onosaaaRxAcceptResponses.Set(kpi.RxAcceptResponses)
+
+ onosaaaRxRejectResponses.Set(kpi.RxRejectResponses)
+
+ onosaaaRxChallengeResponses.Set(kpi.RxChallengeResponses)
+
+ onosaaaTxAccessRequests.Set(kpi.TxAccessRequests)
+
+ onosaaaRxInvalidValidators.Set(kpi.RxInvalidValidators)
+
+ onosaaaRxUnknownType.Set(kpi.RxUnknownType)
+
+ onosaaaPendingRequests.Set(kpi.PendingRequests)
+
+ onosaaaRxDroppedResponses.Set(kpi.RxDroppedResponses)
+
+ onosaaaRxMalformedResponses.Set(kpi.RxMalformedResponses)
+
+ onosaaaRxUnknownserver.Set(kpi.RxUnknownserver)
+
+ onosaaaRequestRttMillis.Set(kpi.RequestRttMillis)
+
+ onosaaaRequestReTx.Set(kpi.RequestReTx)
+}
+
func export(topic *string, data []byte) {
switch *topic {
case "voltha.kpis":
@@ -375,6 +464,13 @@
log.Fatal(err)
}
exportImporterKPI(kpi)
+ case "onos.aaa.stats.kpis":
+ kpi := OnosAaaKPI{}
+ err := json.Unmarshal(data, &kpi)
+ if err != nil {
+ log.Fatal(err)
+ }
+ exportOnosAaaKPI(kpi)
default:
logger.Warn("Unexpected export. Should not come here")
}
diff --git a/types.go b/types.go
index 8cb41ff..684518e 100644
--- a/types.go
+++ b/types.go
@@ -108,4 +108,19 @@
type ImporterKPI struct {
DeviceID string `json: "deviceId"`
// TODO: add metrics data
-}
\ No newline at end of file
+}
+
+type OnosAaaKPI struct {
+ RxAcceptResponses float64 `json:"acceptResponsesRx"`
+ RxRejectResponses float64 `json:"rejectResponsesRx"`
+ RxChallengeResponses float64 `json:"challengeResponsesRx"`
+ TxAccessRequests float64 `json:"accessRequestsTx"`
+ RxInvalidValidators float64 `json:"invalidValidatorsRx"`
+ RxUnknownType float64 `json:"unknownTypeRx"`
+ PendingRequests float64 `json:"pendingRequests"`
+ RxDroppedResponses float64 `json:"droppedResponsesRx"`
+ RxMalformedResponses float64 `json:"malformedResponsesRx"`
+ RxUnknownserver float64 `json:"unknownServerRx"`
+ RequestRttMillis float64 `json:"requestRttMillis"`
+ RequestReTx float64 `json:"requestReTx"`
+}