// Copyright 2018 Open Networking Foundation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"gerrit.opencord.org/kafka-topic-exporter/common/logger"
	"github.com/Shopify/sarama"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"gopkg.in/yaml.v2"
	"io/ioutil"
	"log"
	"net/http"
	"strconv"
	"strings"
	"sync"
)

// kafkaInit creates a sarama consumer for broker.Host, launches one
// topicListener goroutine per entry in broker.Topics and then blocks on a
// WaitGroup sized to the number of topics. The consumer is closed by a
// deferred call when the function returns.
//
// It panics when the consumer cannot be created or cannot be closed.
func kafkaInit(broker BrokerInfo) {
	cfg := sarama.NewConfig()
	cfg.Consumer.Return.Errors = true

	consumer, err := sarama.NewConsumer([]string{broker.Host}, cfg)
	if err != nil {
		logger.Panic("kafkaInit panic")
		panic(err)
	}
	defer func() {
		logger.Debug("kafkaInit close connection")
		if closeErr := consumer.Close(); closeErr != nil {
			panic(closeErr)
		}
	}()

	// One listener goroutine is started per configured topic; the kafka
	// connection must stay open until all of them have finished, so the
	// counter is set before any goroutine is launched.
	var wg sync.WaitGroup
	wg.Add(len(broker.Topics))

	for _, topic := range broker.Topics {
		// Take a per-iteration copy so each goroutine gets its own pointer.
		current := topic
		// NOTE(review): wg is handed over by value. If topicListener declares
		// its parameter as sync.WaitGroup (not *sync.WaitGroup), its Done
		// calls act on a copy and Wait below never observes them — confirm
		// against topicListener's definition.
		go topicListener(&current, consumer, wg)
	}

	wg.Wait()
}

// runServer registers the Prometheus handler under /metrics on the default
// HTTP mux and serves it on target.Port, falling back to 8080 when the port
// is zero. The call blocks for as long as the server runs; an error returned
// by ListenAndServe is logged.
func runServer(target TargetInfo) {
	if target.Port == 0 {
		logger.Warn("Prometheus target port not configured, using default 8080")
		target.Port = 8080
	}
	logger.Debug("Starting HTTP Server on %d port", target.Port)

	http.Handle("/metrics", promhttp.Handler())

	// Listen on every interface at the configured port.
	addr := ":" + strconv.Itoa(target.Port)
	if err := http.ListenAndServe(addr, nil); err != nil {
		logger.Error("HTTP Server Error: %s", err.Error())
	}
}

// init registers every exporter collector with the default Prometheus
// registry. MustRegister panics if any registration fails. Collectors are
// registered in the order listed, one variadic call per group.
func init() {
	// voltha*Total collectors
	prometheus.MustRegister(
		volthaTxBytesTotal,
		volthaRxBytesTotal,
		volthaTxPacketsTotal,
		volthaRxPacketsTotal,
		volthaTxErrorPacketsTotal,
		volthaRxErrorPacketsTotal,
	)

	// voltha ONU collectors
	prometheus.MustRegister(
		VolthaOnuLaserBiasCurrent,
		volthaOnuTemperature,
		VolthaOnuPowerFeedVoltage,
		VolthaOnuMeanOpticalLaunchPower,
		VolthaOnuReceivedOpticalPower,
	)

	// onos*Total collectors
	prometheus.MustRegister(
		onosTxBytesTotal,
		onosRxBytesTotal,
		onosTxPacketsTotal,
		onosRxPacketsTotal,
		onosTxDropPacketsTotal,
		onosRxDropPacketsTotal,
	)

	// onosaaa collectors (first group)
	prometheus.MustRegister(
		onosaaaRxAcceptResponses,
		onosaaaRxRejectResponses,
		onosaaaRxChallengeResponses,
		onosaaaTxAccessRequests,
		onosaaaRxInvalidValidators,
		onosaaaRxUnknownType,
		onosaaaPendingRequests,
		onosaaaRxDroppedResponses,
		onosaaaRxMalformedResponses,
		onosaaaRxUnknownserver,
		onosaaaRequestRttMillis,
		onosaaaRequestReTx,
	)

	// onosBng collectors
	prometheus.MustRegister(
		onosBngUpTxBytes,
		onosBngUpTxPackets,
		onosBngUpDropBytes,
		onosBngUpDropPackets,
		onosBngControlPackets,
		onosBngDownRxBytes,
		onosBngDownRxPackets,
		onosBngDownTxBytes,
		onosBngDownTxPackets,
		onosBngDownDropPackets,
		onosBngDownDropBytes,
	)

	// device* collectors
	prometheus.MustRegister(
		deviceLaserBiasCurrent,
		deviceTemperature,
		deviceTxPower,
		deviceVoltage,
	)

	// onosaaa collectors (second group)
	prometheus.MustRegister(
		onosaaaRxEapolLogoff,
		onosaaaTxEapolResIdentityMsg,
		onosaaaTxAuthSuccess,
		onosaaaTxAuthFailure,
		onosaaaTxStartReq,
		onosaaaEapPktTxAuthChooseEap,
		onosaaaTxRespnotNak,
	)

	// onosaaa collectors (third group)
	prometheus.MustRegister(
		onosaaaEapolFramesTx,
		onosaaaAuthStateIdle,
		onosaaaRequestIdFramesTx,
		onosaaaRequestEapFramesTx,
		onosaaaInvalidPktType,
		onosaaaInvalidBodyLength,
		onosaaaValidEapolFramesRx,
		onosaaaPendingResSupplicant,
		onosaaaRxResIdEapFrames,
	)
}

// loadConfigFile reads /etc/config/conf.yaml and decodes it as YAML into a
// Config, which it returns.
//
// A failure to read the file is only logged; decoding still runs on whatever
// bytes ReadFile returned. A YAML decoding failure is fatal (log.Fatalf).
func loadConfigFile() Config {
	// this file path is configmap mounted in pod yaml
	raw, readErr := ioutil.ReadFile("/etc/config/conf.yaml")
	if readErr != nil {
		log.Printf("yamlFile.Get err: %v ", readErr)
	}

	cfg := Config{}
	if decodeErr := yaml.Unmarshal(raw, &cfg); decodeErr != nil {
		log.Fatalf("Unmarshal: %v", decodeErr)
	}
	return cfg
}

// main loads the configuration, sets up the logger, starts the Kafka
// consumer in a background goroutine and then runs the HTTP server in the
// foreground.
func main() {
	conf := loadConfigFile()

	// The configured log level is upper-cased before being handed to the logger.
	level := strings.ToUpper(conf.Logger.LogLevel)
	logger.Setup(conf.Logger.Host, level)
	logger.Info("Connecting to broker: [%s]", conf.Broker.Host)

	// Kafka consumption runs concurrently; runServer blocks the main goroutine.
	go kafkaInit(conf.Broker)
	runServer(conf.Target)
}
