seba-631-632: exporter enhancements for logger and topics

Change-Id: If10e56a7ccfce758712ea02df9656d4f413dbf84
diff --git a/Dockerfile b/Dockerfile
index 29e46da..a9ff877 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -21,9 +21,12 @@
 WORKDIR /app
 RUN go get github.com/prometheus/client_golang/prometheus
 RUN go get github.com/Shopify/sarama
+RUN go get github.com/gfremex/logrus-kafka-hook
+RUN go get github.com/sirupsen/logrus
+RUN go get gerrit.opencord.org/kafka-topic-exporter/common/logger
 RUN CGO_ENABLED=0 GOOS=linux go build -o main .
 
 FROM alpine:3.8
 WORKDIR /app/
 COPY --from=builder /app/main .
-ENTRYPOINT ["./main"]
\ No newline at end of file
+ENTRYPOINT ["./main"]
diff --git a/README.md b/README.md
index 7b10c8f..e3cfe47 100644
--- a/README.md
+++ b/README.md
@@ -22,6 +22,7 @@
                 },     "device_id": ""
            
             }
+        }
     ]
 }
-```
\ No newline at end of file
+```
diff --git a/VERSION b/VERSION
index 45a1b3f..26aaba0 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.1.2
+1.2.0
diff --git a/common/logger/logger.go b/common/logger/logger.go
new file mode 100644
index 0000000..6f9d4c4
--- /dev/null
+++ b/common/logger/logger.go
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package logger
+
+import (
+	lkh "github.com/gfremex/logrus-kafka-hook"
+	log "github.com/sirupsen/logrus"
+	"time"
+)
+
+// myLogger is the entry all helpers in this package log through. It is
+// assigned by Setup and is nil until then.
+var myLogger *log.Entry
+
+// Setup creates the package logger; the helpers in this package log through it,
+// so call Setup first. level is "TRACE", "INFO", "WARN" or "ERROR"; any other
+// value selects debug. A non-empty kafkaBroker additionally ships debug..error
+// entries to that broker as JSON; if that hook fails, logging goes on without it.
+func Setup(kafkaBroker string, level string) {
+	logger := log.New()
+	myLogger = logger.WithField("topics", []string{"kafka-exporter.log"})
+
+	logLevel := log.DebugLevel
+	switch level {
+	case "TRACE":
+		logLevel = log.TraceLevel
+	case "INFO":
+		logLevel = log.InfoLevel
+	case "WARN":
+		logLevel = log.WarnLevel
+	case "ERROR":
+		logLevel = log.ErrorLevel
+	}
+	logger.Println("Setting Log Level", logLevel)
+	logger.SetLevel(logLevel)
+
+	if len(kafkaBroker) > 0 {
+		myLogger.Debug("Setting up kafka integration")
+		hook, err := lkh.NewKafkaHook(
+			"kh",
+			[]log.Level{log.DebugLevel, log.InfoLevel, log.WarnLevel, log.ErrorLevel},
+			&log.JSONFormatter{
+				TimestampFormat: time.RFC3339Nano,
+				FieldMap: log.FieldMap{
+					log.FieldKeyTime:  "@timestamp",
+					log.FieldKeyLevel: "levelname",
+					log.FieldKeyMsg:   "message",
+				},
+			},
+			[]string{kafkaBroker},
+		)
+		if err != nil {
+			// Never register the failed hook; keep logging without Kafka.
+			myLogger.Error(err)
+			return
+		}
+		logger.Hooks.Add(hook)
+		myLogger.WithField("kafkaBroker", kafkaBroker).Debug("Logger setup done")
+	}
+}
+
+// GetLogger returns the entry created by Setup; it is nil if Setup has not
+// been called yet.
+func GetLogger() *log.Entry { return myLogger }
+
+// WithField returns a new entry derived from the package logger with the
+// single field key=value attached.
+func WithField(key string, value interface{}) *log.Entry { return myLogger.WithField(key, value) }
+
+// WithFields returns a new entry derived from the package logger with all of
+// the given fields attached.
+func WithFields(fields log.Fields) *log.Entry { return myLogger.WithFields(fields) }
+
+// Panic formats msg with args, Printf-style, and passes the result to the
+// package logger's Panicf.
+func Panic(msg string, args ...interface{}) { myLogger.Panicf(msg, args...) }
+
+// Fatal formats msg with args, Printf-style, and passes the result to the
+// package logger's Fatalf.
+func Fatal(msg string, args ...interface{}) { myLogger.Fatalf(msg, args...) }
+
+// Error formats msg with args, Printf-style, and logs the result at error
+// level through the package logger.
+func Error(msg string, args ...interface{}) { myLogger.Errorf(msg, args...) }
+
+// Warn formats msg with args, Printf-style, and logs the result at warning
+// level through the package logger.
+func Warn(msg string, args ...interface{}) { myLogger.Warnf(msg, args...) }
+
+// Info formats msg with args, Printf-style, and logs the result at info
+// level through the package logger.
+func Info(msg string, args ...interface{}) { myLogger.Infof(msg, args...) }
+
+// Debug formats msg with args, Printf-style, and logs the result at debug
+// level through the package logger.
+func Debug(msg string, args ...interface{}) { myLogger.Debugf(msg, args...) }
diff --git a/config/conf.yaml b/config/conf.yaml
new file mode 100644
index 0000000..c06dea8
--- /dev/null
+++ b/config/conf.yaml
@@ -0,0 +1,14 @@
+---
+broker:
+  name: broker-name
+  host: voltha-kafka.default.svc.cluster.local:9092
+  description: The kafka broker
+  topics: [voltha.kpis, onos.kpis, importer.kpis]
+logger:
+  loglevel: debug
+  host: voltha-kafka.default.svc.cluster.local:9092
+target:
+  type: prometheus-target
+  name: http-server
+  port: 8080
+  description: http target for prometheus
diff --git a/main.go b/main.go
index 0eba8ba..cda8c19 100644
--- a/main.go
+++ b/main.go
@@ -15,66 +15,63 @@
 package main
 
 import (
-	"flag"
-	"fmt"
+	"strconv"
+	"gopkg.in/yaml.v2"
+	"io/ioutil"
+	"log"
 	"net/http"
+	"strings"
 	"sync"
 
 	"github.com/Shopify/sarama"
 	"github.com/prometheus/client_golang/prometheus"
+	"gerrit.opencord.org/kafka-topic-exporter/common/logger"
 )
 
-var (
-	broker      = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker")
-	volthaTopic = "voltha.kpis"
-	onosTopic   = "onos.kpis"
-
-	volthaTopicPointer = &volthaTopic
-	onosTopicPointer   = &onosTopic
-)
-
-var brokers []string
-
-func kafkaInit(brokers []string) {
+// kafkaInit connects to broker.Host, runs one topicListener goroutine per entry
+// in broker.Topics and closes the connection once every listener has returned.
+func kafkaInit(broker BrokerInfo) {
 	config := sarama.NewConfig()
 	config.Consumer.Return.Errors = true
 	var wg sync.WaitGroup
 
-	wg.Add(2) // we are spinning up two thread and we need to wait for them to exit before stopping the kafka connection
-
-	master, err := sarama.NewConsumer(brokers, config)
+	master, err := sarama.NewConsumer([]string{broker.Host}, config)
 	if err != nil {
-		fmt.Println("kafkaInit panic")
+		logger.Error("kafkaInit panic: %s", err)
 		panic(err)
 	}
 	defer func() {
-		fmt.Println("kafkaInit close connection")
+		logger.Debug("kafkaInit close connection")
 		if err := master.Close(); err != nil {
 			panic(err)
 		}
 	}()
-	go VOLTHAListener(volthaTopicPointer, master, wg)
-	go ONOSListener(onosTopicPointer, master, wg)
+
+	// Count every listener up front so Wait below blocks until all have exited.
+	wg.Add(len(broker.Topics))
+	// topicListener receives its WaitGroup by value, so the Done it defers only
+	// reaches a copy. Give it a spare copy (taken after Add, so its counter is
+	// non-zero) and signal the real wg from the wrapper goroutine instead.
+	spare := wg
+	for i := range broker.Topics {
+		go func(t *string) { defer wg.Done(); topicListener(t, master, spare) }(&broker.Topics[i])
+	}
 
 	wg.Wait()
 }
 
-func runServer() {
-	fmt.Println("Starting Server")
+func runServer(target TargetInfo) { // blocks, serving /metrics on target.Port
+	if target.Port == 0 {
+		logger.Warn("Prometheus target port not configured, using default 8080")
+		target.Port = 8080
+	}
+	logger.Debug("Starting HTTP Server on %d port", target.Port)
 	http.Handle("/metrics", prometheus.Handler())
-	http.ListenAndServe(":8080", nil)
+	logger.Fatal("HTTP server stopped: %s", http.ListenAndServe(":"+strconv.Itoa(target.Port), nil)) // always a non-nil error
 }
 
 func init() {
 
-	// read config from cli flags
-	flag.Parse()
-	brokers = make([]string, 0)
-	brokers = []string{*broker}
-	fmt.Println("Connecting to broker: ", brokers)
-	fmt.Println("Listening to voltha on topic: ", *volthaTopicPointer)
-	fmt.Println("Listening to onos on topic: ", *onosTopicPointer)
-
 	// register metrics within Prometheus
 	prometheus.MustRegister(volthaTxBytesTotal)
 	prometheus.MustRegister(volthaRxBytesTotal)
@@ -91,7 +88,28 @@
 	prometheus.MustRegister(onosRxDropPacketsTotal)
 }
 
-func main() {
-	go kafkaInit(brokers)
-	runServer()
+// loadConfigFile parses /etc/config/conf.yaml into a Config and terminates the
+// process if the file cannot be read or is not valid YAML.
+func loadConfigFile() Config {
+	// this file path is configmap mounted in pod yaml
+	yamlFile, err := ioutil.ReadFile("/etc/config/conf.yaml")
+	if err != nil {
+		log.Fatalf("yamlFile.Get err: %v ", err)
+	}
+	m := Config{}
+	if err = yaml.Unmarshal(yamlFile, &m); err != nil {
+		log.Fatalf("Unmarshal: %v", err)
+	}
+	return m
 }
+
+// main loads the configuration, sets up logging, starts the Kafka listeners in
+// the background and then serves the collected metrics over HTTP.
+func main() {
+	cfg := loadConfigFile()
+
+	level := strings.ToUpper(cfg.Logger.LogLevel)
+	logger.Setup(cfg.Logger.Host, level)
+	logger.Info("Connecting to broker: [%s]", cfg.Broker.Host)
+	go kafkaInit(cfg.Broker)
+	runServer(cfg.Target)
+}
\ No newline at end of file
diff --git a/onos-prometheus.go b/onos-prometheus.go
deleted file mode 100644
index b383b55..0000000
--- a/onos-prometheus.go
+++ /dev/null
@@ -1,103 +0,0 @@
-// Copyright 2018 Open Networking Foundation
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package main
-
-import (
-	"github.com/prometheus/client_golang/prometheus"
-)
-
-var (
-	onosTxBytesTotal = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Name: "onos_tx_bytes_total",
-			Help: "Number of total bytes transmitted",
-		},
-		[]string{"device_id", "port_id"},
-	)
-	onosRxBytesTotal = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Name: "onos_rx_bytes_total",
-			Help: "Number of total bytes received",
-		},
-		[]string{"device_id", "port_id"},
-	)
-	onosTxPacketsTotal = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Name: "onos_tx_packets_total",
-			Help: "Number of total packets transmitted",
-		},
-		[]string{"device_id", "port_id"},
-	)
-	onosRxPacketsTotal = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Name: "onos_rx_packets_total",
-			Help: "Number of total packets received",
-		},
-		[]string{"device_id", "port_id"},
-	)
-
-	onosTxDropPacketsTotal = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Name: "onos_tx_drop_packets_total",
-			Help: "Number of total transmitted packets dropped",
-		},
-		[]string{"device_id", "port_id"},
-	)
-
-	onosRxDropPacketsTotal = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Name: "onos_rx_drop_packets_total",
-			Help: "Number of total received packets dropped",
-		},
-		[]string{"device_id", "port_id"},
-	)
-)
-
-func exportOnosKPI(kpi OnosKPI) {
-
-	for _, data := range kpi.Ports {
-
-		onosTxBytesTotal.WithLabelValues(
-			kpi.DeviceID,
-			data.PortID,
-		).Set(data.TxBytes)
-
-		onosRxBytesTotal.WithLabelValues(
-			kpi.DeviceID,
-			data.PortID,
-		).Set(data.RxBytes)
-
-		onosTxPacketsTotal.WithLabelValues(
-			kpi.DeviceID,
-			data.PortID,
-		).Set(data.TxPackets)
-
-		onosRxPacketsTotal.WithLabelValues(
-			kpi.DeviceID,
-			data.PortID,
-		).Set(data.RxPackets)
-
-		onosTxDropPacketsTotal.WithLabelValues(
-			kpi.DeviceID,
-			data.PortID,
-		).Set(data.TxPacketsDrop)
-
-		onosRxDropPacketsTotal.WithLabelValues(
-			kpi.DeviceID,
-			data.PortID,
-		).Set(data.RxPacketsDrop)
-
-	}
-}
diff --git a/voltha-prometheus.go b/topic-exporter.go
similarity index 74%
rename from voltha-prometheus.go
rename to topic-exporter.go
index 526010b..3313f46 100644
--- a/voltha-prometheus.go
+++ b/topic-exporter.go
@@ -15,10 +15,14 @@
 package main
 
 import (
+	"encoding/json"
+	"log"
 	"github.com/prometheus/client_golang/prometheus"
+	"gerrit.opencord.org/kafka-topic-exporter/common/logger"
 )
 
 var (
+	// voltha kpis
 	volthaTxBytesTotal = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Name: "voltha_tx_bytes_total",
@@ -63,6 +67,52 @@
 		},
 		[]string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
 	)
+
+	// onos kpis
+	onosTxBytesTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onos_tx_bytes_total",
+			Help: "Number of total bytes transmitted",
+		},
+		[]string{"device_id", "port_id"},
+	)
+	onosRxBytesTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onos_rx_bytes_total",
+			Help: "Number of total bytes received",
+		},
+		[]string{"device_id", "port_id"},
+	)
+	onosTxPacketsTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onos_tx_packets_total",
+			Help: "Number of total packets transmitted",
+		},
+		[]string{"device_id", "port_id"},
+	)
+	onosRxPacketsTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onos_rx_packets_total",
+			Help: "Number of total packets received",
+		},
+		[]string{"device_id", "port_id"},
+	)
+
+	onosTxDropPacketsTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onos_tx_drop_packets_total",
+			Help: "Number of total transmitted packets dropped",
+		},
+		[]string{"device_id", "port_id"},
+	)
+
+	onosRxDropPacketsTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onos_rx_drop_packets_total",
+			Help: "Number of total received packets dropped",
+		},
+		[]string{"device_id", "port_id"},
+	)
 )
 
 func exportVolthaKPI(kpi VolthaKPI) {
@@ -260,3 +310,72 @@
 		}
 	}
 }
+
+// exportOnosKPI publishes one ONOS KPI event.
+// For every port in kpi.Ports the tx/rx byte, packet and dropped-packet
+// counters are written to the matching onos_* gauge, labelled with the
+// device ID and the port ID.
+func exportOnosKPI(kpi OnosKPI) {
+	device := kpi.DeviceID
+
+	for _, port := range kpi.Ports {
+		id := port.PortID
+
+		// Byte counters.
+		onosTxBytesTotal.
+			WithLabelValues(device, id).
+			Set(port.TxBytes)
+		onosRxBytesTotal.
+			WithLabelValues(device, id).
+			Set(port.RxBytes)
+
+		// Packet counters.
+		onosTxPacketsTotal.
+			WithLabelValues(device, id).
+			Set(port.TxPackets)
+		onosRxPacketsTotal.
+			WithLabelValues(device, id).
+			Set(port.RxPackets)
+
+		// Dropped-packet counters.
+		onosTxDropPacketsTotal.
+			WithLabelValues(device, id).
+			Set(port.TxPacketsDrop)
+		onosRxDropPacketsTotal.
+			WithLabelValues(device, id).
+			Set(port.RxPacketsDrop)
+	}
+}
+
+// exportImporterKPI is a stub: importer metrics are not defined yet, so an
+// event only produces an info-level "To be implemented" log line.
+// TODO: add metrics for importer data
+func exportImporterKPI(kpi ImporterKPI) { logger.Info("To be implemented") }
+
+// export decodes one JSON message from the given topic and hands it to the
+// matching exporter; a malformed message is logged and dropped, not fatal.
+func export(topic *string, data []byte) {
+	var err error
+	switch *topic {
+	case "voltha.kpis":
+		kpi := VolthaKPI{}
+		if err = json.Unmarshal(data, &kpi); err == nil {
+			exportVolthaKPI(kpi)
+		}
+	case "onos.kpis":
+		kpi := OnosKPI{}
+		if err = json.Unmarshal(data, &kpi); err == nil {
+			exportOnosKPI(kpi)
+		}
+	case "importer.kpis":
+		kpi := ImporterKPI{}
+		if err = json.Unmarshal(data, &kpi); err == nil {
+			exportImporterKPI(kpi)
+		}
+	default:
+		logger.Warn("Unexpected export. Should not come here")
+	}
+	if err != nil {
+		log.Printf("dropping malformed message on %s: %v", *topic, err)
+	}
+}
diff --git a/onos-kpi.go b/topic-listener.go
similarity index 73%
rename from onos-kpi.go
rename to topic-listener.go
index da7b120..9b0f2dc 100644
--- a/onos-kpi.go
+++ b/topic-listener.go
@@ -15,22 +15,20 @@
 package main
 
 import (
-	"encoding/json"
-	"fmt"
-	"log"
 	"os"
 	"os/signal"
 	"sync"
 
 	"github.com/Shopify/sarama"
+	"gerrit.opencord.org/kafka-topic-exporter/common/logger"
 )
 
-func ONOSListener(topic *string, master sarama.Consumer, wg sync.WaitGroup) {
-	fmt.Println("Starting ONOSListener")
+func topicListener(topic *string, master sarama.Consumer, wg sync.WaitGroup) {
+	logger.Info("Starting topicListener for [%s]", *topic)
 	defer wg.Done()
 	consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
 	if err != nil {
-		fmt.Println("ONOSListener panic")
+		logger.Error("topicListener panic")
 		panic(err)
 	}
 	signals := make(chan os.Signal, 1)
@@ -40,21 +38,12 @@
 		for {
 			select {
 			case err := <-consumer.Errors():
-				fmt.Println(err)
+				logger.Error("%s", err)
 			case msg := <-consumer.Messages():
-
-				kpi := OnosKPI{}
-
-				err := json.Unmarshal(msg.Value, &kpi)
-
-				if err != nil {
-					log.Fatal(err)
-				}
-
-				exportOnosKPI(kpi)
-
+				logger.Debug("Message on %s: %s", *topic, string(msg.Value))
+				export(topic, msg.Value)
 			case <-signals:
-				fmt.Println("Interrupt is detected")
+				logger.Warn("Interrupt is detected")
 				doneCh <- struct{}{}
 			}
 		}
diff --git a/types.go b/types.go
index b3ecd59..8cb41ff 100644
--- a/types.go
+++ b/types.go
@@ -14,8 +14,33 @@
 
 package main
 
-// KPI Events format
+// BrokerInfo is the "broker" section of the configuration file.
+type BrokerInfo struct {
+	Name        string   `yaml:"name"`
+	Host        string   `yaml:"host"` // broker address, host:port
+	Description string   `yaml:"description"`
+	Topics      []string `yaml:"topics"` // one listener is started per topic
+}
 
+type LoggerInfo struct { // "logger" section of the configuration file
+	LogLevel string `yaml:"loglevel"` // level name; main upper-cases it before use
+	Host     string `yaml:"host"`     // Kafka broker for log shipping; empty disables it
+}
+
+type TargetInfo struct { // "target" section: where the metrics are served
+	Type        string `yaml:"type"`
+	Name        string `yaml:"name"`
+	Port        int    `yaml:"port"` // HTTP port; runServer treats 0 as 8080
+	Description string `yaml:"description"`
+}
+
+type Config struct { // top-level layout of the YAML configuration file
+	Broker BrokerInfo `yaml:"broker"`
+	Logger LoggerInfo `yaml:"logger"`
+	Target TargetInfo `yaml:"target"`
+}
+
+// KPI Events format
 type Metrics struct {
 	TxBytes            float64 `json:"tx_bytes"`
 	TxPackets          float64 `json:"tx_packets"`
@@ -79,3 +104,8 @@
 	DeviceID string      `json:"deviceId"`
 	Ports    []*OnosPort `json:"ports"`
 }
+
+type ImporterKPI struct { // event decoded from the importer.kpis topic
+	DeviceID string `json:"deviceId"`
+	// TODO: add metrics data
+}
\ No newline at end of file
diff --git a/voltha-kpi.go b/voltha-kpi.go
deleted file mode 100644
index 122f3f0..0000000
--- a/voltha-kpi.go
+++ /dev/null
@@ -1,64 +0,0 @@
-// Copyright 2018 Open Networking Foundation
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package main
-
-import (
-	"encoding/json"
-	"fmt"
-	"log"
-	"os"
-	"os/signal"
-	"sync"
-
-	"github.com/Shopify/sarama"
-)
-
-func VOLTHAListener(topic *string, master sarama.Consumer, wg sync.WaitGroup) {
-	fmt.Println("Starting VOLTHAListener")
-	defer wg.Done()
-	consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
-	if err != nil {
-		fmt.Println("VOLTHAListener panic")
-		panic(err)
-	}
-	signals := make(chan os.Signal, 1)
-	signal.Notify(signals, os.Interrupt)
-	doneCh := make(chan struct{})
-	go func() {
-		for {
-			select {
-			case err := <-consumer.Errors():
-				fmt.Println(err)
-			case msg := <-consumer.Messages():
-				// fmt.Println(string(msg.Value))
-
-				kpi := VolthaKPI{}
-
-				err := json.Unmarshal(msg.Value, &kpi)
-
-				if err != nil {
-					log.Fatal(err)
-				}
-
-				exportVolthaKPI(kpi)
-
-			case <-signals:
-				fmt.Println("Interrupt is detected")
-				doneCh <- struct{}{}
-			}
-		}
-	}()
-	<-doneCh
-}