/*
 * Copyright 2019-present Ciena Corporation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package commands

import (
	"encoding/json"
	"errors"
	"fmt"
	"github.com/Shopify/sarama"
	"github.com/fullstorydev/grpcurl"
	flags "github.com/jessevdk/go-flags"
	"github.com/jhump/protoreflect/desc"
	"github.com/jhump/protoreflect/dynamic"
	"github.com/opencord/voltctl/pkg/filter"
	"github.com/opencord/voltctl/pkg/format"
	"github.com/opencord/voltctl/pkg/model"
	"log"
	"os"
	"os/signal"
	"strings"
	"time"
)

const (
	// DEFAULT_EVENT_FORMAT is the output template used by the "event listen"
	// command when neither the --format option nor the command-option lookup
	// performed in Execute supplies one. It lists the EventHeader fields
	// shown as columns, separated by tabs.
	DEFAULT_EVENT_FORMAT = "table{{.Category}}\t{{.SubCategory}}\t{{.Type}}\t{{.Timestamp}}\t{{.Device_ids}}\t{{.Titles}}"
)

// EventListenOpts holds the command-line options of the "event listen"
// command. The struct tags declare each option's flag names, default and help
// text; the methods on this type (StartOutput, FinishOutput, Execute) read
// the parsed values.
type EventListenOpts struct {
	Format   string `long:"format" value-name:"FORMAT" default:"" description:"Format to use to output structured data"`
	OutputAs string `short:"o" long:"outputas" default:"table" choice:"table" choice:"json" choice:"yaml" description:"Type of output to generate"`
	Filter   string `short:"f" long:"filter" default:"" value-name:"FILTER" description:"Only display results that match filter"`
	Follow   bool   `short:"F" long:"follow" description:"Continue to consume until CTRL-C is pressed"`
	ShowBody bool   `short:"b" long:"show-body" description:"Show body of events rather than only a header summary"`
	// Count is only enforced by Execute when it is greater than zero, so the
	// default of -1 means "no limit".
	Count    int    `short:"c" long:"count" default:"-1" value-name:"LIMIT" description:"Limit the count of messages that will be printed"`
	Now      bool   `short:"n" long:"now" description:"Stop printing events when current time is reached"`
	// Timeout is the idle timeout in seconds; note the flag is spelled --idle.
	Timeout  int    `short:"t" long:"idle" default:"900" value-name:"SECONDS" description:"Timeout if no message received within specified seconds"`
}

// EventOpts groups the sub-commands of the "event" command. The only
// sub-command declared here is "listen".
type EventOpts struct {
	EventListen EventListenOpts `command:"listen"`
}

// eventOpts is the option structure that RegisterEventCommands hands to the
// command parser for the "event" command.
var eventOpts = EventOpts{}

// EventHeader is the per-event summary produced by DecodeHeader. It carries
// the fields of the event's "header" sub-message, with the enum fields
// rendered as strings, plus a few values collected from the event payload and
// the timestamp supplied by the caller.
type EventHeader struct {
	Category    string   `json:"category"`
	SubCategory string   `json:"sub_category"`
	Type        string   `json:"type"`
	Raised_ts   float32  `json:"raised_ts"`
	Reported_ts float32  `json:"reported_ts"`
	Device_ids  []string `json:"device_ids"` // Opportunistically collected list of device_ids
	Titles      []string `json:"titles"`     // Opportunistically collected list of titles
	Timestamp   int64    `json:"timestamp"`  // Timestamp from Kafka
}

// EventHeaderWidths holds one integer per EventHeader field, using the same
// field names. A value of this type (DefaultWidths) is passed to
// ExecuteFixedWidth when the header table is rendered; presumably each value
// is the column width in characters — confirm against the format package.
type EventHeaderWidths struct {
	Category    int
	SubCategory int
	Type        int
	Raised_ts   int
	Reported_ts int
	Device_ids  int
	Titles      int
	Timestamp   int
}

// DefaultWidths is the set of per-field widths handed to ExecuteFixedWidth by
// StartOutput (for the heading row) and PrintEventHeader (for each event).
var DefaultWidths = EventHeaderWidths{
	// Fields taken from the event header.
	Category:    13,
	SubCategory: 3,
	Type:        12,
	Raised_ts:   10,
	Reported_ts: 10,

	// Fields collected from the event payload and the Kafka message.
	Device_ids: 40,
	Titles:     40,
	Timestamp:  10,
}

// RegisterEventCommands adds the "event" command, backed by eventOpts, to the
// given parser. A registration failure is reported through Error.Fatalf.
func RegisterEventCommands(parent *flags.Parser) {
	if _, err := parent.AddCommand("event", "event commands", "Commands for observing events", &eventOpts); err != nil {
		Error.Fatalf("Unable to register event commands with voltctl command parser: %s", err.Error())
	}
}

// DecodeHeader unmarshals b as a message described by md and extracts a
// summary of it: the fields of its "header" sub-message plus, where present,
// the device id and title found in a "kpi_event2" or "device_event" payload.
//
// ts is recorded in the result as Unix seconds (Execute passes the Kafka
// message timestamp).
//
// An error is returned if b cannot be unmarshalled, or if the header or any of
// its category, sub_category, type, raised_ts or reported_ts fields is
// missing or does not have the expected Go type. The device id and title
// lookups are best effort and never cause an error.
func DecodeHeader(md *desc.MessageDescriptor, b []byte, ts time.Time) (*EventHeader, error) {
	m := dynamic.NewMessage(md)
	if err := m.Unmarshal(b); err != nil {
		return nil, err
	}

	headerIntf, err := m.TryGetFieldByName("header")
	if err != nil {
		return nil, err
	}
	// Checked assertion plus nil test: a message without a usable header must
	// produce an error rather than a panic further down.
	header, ok := headerIntf.(*dynamic.Message)
	if !ok || header == nil {
		return nil, fmt.Errorf("event header is missing or has unexpected type %T", headerIntf)
	}

	cat, err := eventFieldAsInt32(header, "category")
	if err != nil {
		return nil, err
	}
	subCat, err := eventFieldAsInt32(header, "sub_category")
	if err != nil {
		return nil, err
	}
	evType, err := eventFieldAsInt32(header, "type")
	if err != nil {
		return nil, err
	}
	raised, err := eventFieldAsFloat32(header, "raised_ts")
	if err != nil {
		return nil, err
	}
	reported, err := eventFieldAsFloat32(header, "reported_ts")
	if err != nil {
		return nil, err
	}

	// Sets of the device ids and titles found; maps are used so that a value
	// seen more than once is only reported once.
	deviceIDs := make(map[string]struct{})
	titles := make(map[string]struct{})

	// Opportunistically try to extract the device_id and title from a
	// kpi_event2. Only the metadata of the first slice_data entry is
	// inspected, even if the event carries several slices.
	if kpi := eventFieldAsMessage(m, "kpi_event2"); kpi != nil {
		if metadata := eventFirstSliceMetadata(kpi); metadata != nil {
			if id, ok := eventFieldAsString(metadata, "device_id"); ok {
				deviceIDs[id] = struct{}{}
			}
			if title, ok := eventFieldAsString(metadata, "title"); ok {
				titles[title] = struct{}{}
			}
		}
	}

	// Opportunistically try to pull a resource_id and title from a
	// device_event. There is a single resource_id and device_event_name per
	// device_event, so no list handling is needed here.
	if deviceEvent := eventFieldAsMessage(m, "device_event"); deviceEvent != nil {
		if name, ok := eventFieldAsString(deviceEvent, "device_event_name"); ok {
			titles[name] = struct{}{}
		}
		if id, ok := eventFieldAsString(deviceEvent, "resource_id"); ok {
			deviceIDs[id] = struct{}{}
		}
	}

	evHeader := EventHeader{
		Category:    model.GetEnumString(header, "category", cat),
		SubCategory: model.GetEnumString(header, "sub_category", subCat),
		Type:        model.GetEnumString(header, "type", evType),
		Raised_ts:   raised,
		Reported_ts: reported,
		Device_ids:  eventSetKeys(deviceIDs),
		Timestamp:   ts.Unix(),
		Titles:      eventSetKeys(titles),
	}

	return &evHeader, nil
}

// eventFieldAsInt32 reads the named field of msg as an int32; DecodeHeader
// uses it for the header's category, sub_category and type values.
func eventFieldAsInt32(msg *dynamic.Message, name string) (int32, error) {
	raw, err := msg.TryGetFieldByName(name)
	if err != nil {
		return 0, err
	}
	val, ok := raw.(int32)
	if !ok {
		return 0, fmt.Errorf("event header field %q has unexpected type %T", name, raw)
	}
	return val, nil
}

// eventFieldAsFloat32 reads the named field of msg as a float32; DecodeHeader
// uses it for the header's raised_ts and reported_ts values.
func eventFieldAsFloat32(msg *dynamic.Message, name string) (float32, error) {
	raw, err := msg.TryGetFieldByName(name)
	if err != nil {
		return 0, err
	}
	val, ok := raw.(float32)
	if !ok {
		return 0, fmt.Errorf("event header field %q has unexpected type %T", name, raw)
	}
	return val, nil
}

// eventFieldAsMessage returns the named field of msg as a non-nil dynamic
// message, or nil if the field is unknown, unset or of another type.
func eventFieldAsMessage(msg *dynamic.Message, name string) *dynamic.Message {
	raw, err := msg.TryGetFieldByName(name)
	if err != nil {
		return nil
	}
	sub, ok := raw.(*dynamic.Message)
	if !ok {
		return nil
	}
	return sub
}

// eventFieldAsString returns the named field of msg as a string; the boolean
// is false if the field is unknown or is not a string.
func eventFieldAsString(msg *dynamic.Message, name string) (string, bool) {
	raw, err := msg.TryGetFieldByName(name)
	if err != nil {
		return "", false
	}
	val, ok := raw.(string)
	return val, ok
}

// eventFirstSliceMetadata returns the "metadata" sub-message of the first
// entry of kpi's "slice_data" list, or nil if any step of that lookup fails.
func eventFirstSliceMetadata(kpi *dynamic.Message) *dynamic.Message {
	sliceListIntf, err := kpi.TryGetFieldByName("slice_data")
	if err != nil {
		return nil
	}
	sliceList, ok := sliceListIntf.([]interface{})
	if !ok || len(sliceList) == 0 {
		return nil
	}
	first, ok := sliceList[0].(*dynamic.Message)
	if !ok || first == nil {
		return nil
	}
	return eventFieldAsMessage(first, "metadata")
}

// eventSetKeys returns the keys of set as a non-nil slice, in map iteration
// order.
func eventSetKeys(set map[string]struct{}) []string {
	keys := make([]string, 0, len(set))
	for k := range set {
		keys = append(keys, k)
	}
	return keys
}

// PrintMessage decodes b as a message described by md, renders it with the
// supplied grpcurl formatter and writes the result to standard output. The
// formatter decides the representation (Execute passes either a JSON or a
// text formatter). Decoding and formatting errors are returned unchanged.
func PrintMessage(f grpcurl.Formatter, md *desc.MessageDescriptor, b []byte) error {
	decoded := dynamic.NewMessage(md)
	if err := decoded.Unmarshal(b); err != nil {
		return err
	}

	rendered, err := f(decoded)
	if err != nil {
		return err
	}

	fmt.Println(rendered)
	return nil
}

// PrintEventHeader writes a single EventHeader to standard output. When
// outputAs is "json" the header is marshalled as JSON; for any other value it
// is rendered as one fixed-width row using outputFormat and DefaultWidths.
// It returns the marshalling or template error, if any.
func PrintEventHeader(outputAs string, outputFormat string, hdr *EventHeader) error {
	if outputAs != "json" {
		tmpl := format.Format(outputFormat)
		row, err := tmpl.ExecuteFixedWidth(DefaultWidths, false, *hdr)
		if err != nil {
			return err
		}
		fmt.Printf("%s\n", row)
		return nil
	}

	encoded, err := json.Marshal(hdr)
	if err != nil {
		return fmt.Errorf("Error marshalling JSON: %v", err)
	}
	fmt.Printf("%s\n", encoded)
	return nil
}

// GetEventMessageDesc returns the message descriptor for "voltha.Event".
//
// This is a long-winded way to get a message descriptor: a descriptor source
// is obtained through GetMethod (any method will do), the "voltha.Event"
// symbol is located in it, and the message is then looked up by name in the
// file that declares that symbol.
//
// An error is returned if the method or symbol lookup fails, or if the file
// does not contain a message with that name.
func GetEventMessageDesc() (*desc.MessageDescriptor, error) {
	// any descriptor on the file will do
	descriptor, _, err := GetMethod("update-log-level")
	if err != nil {
		return nil, err
	}

	// get the symbol for voltha.Event
	eventSymbol, err := descriptor.FindSymbol("voltha.Event")
	if err != nil {
		return nil, err
	}

	// The symbol is handled here as a plain descriptor, so go back to its
	// file and look the message up by name to get a MessageDescriptor whose
	// fields can be inspected.
	eventMessage := eventSymbol.GetFile().FindMessage("voltha.Event")
	if eventMessage == nil {
		// Without this check callers would receive (nil, nil) and fail later
		// when building a dynamic message from a nil descriptor.
		return nil, errors.New("voltha.Event was not found as a message in its descriptor file")
	}

	return eventMessage, nil
}

// StartOutput writes whatever has to precede the first event: the opening
// bracket of the JSON array for "json" output, or the column heading row for
// "table" output when only header summaries are shown. Other combinations
// print nothing. A template error from rendering the heading is returned.
func (options *EventListenOpts) StartOutput(outputFormat string) error {
	switch {
	case options.OutputAs == "json":
		fmt.Println("[")

	case options.OutputAs == "table" && !options.ShowBody:
		tmpl := format.Format(outputFormat)
		heading, err := tmpl.ExecuteFixedWidth(DefaultWidths, true, nil)
		if err != nil {
			return err
		}
		fmt.Println(heading)
	}
	return nil
}

// FinishOutput writes whatever has to follow the last event. Only "json"
// output needs a trailer: the bracket closing the array opened by StartOutput.
func (options *EventListenOpts) FinishOutput() {
	if options.OutputAs != "json" {
		return
	}
	fmt.Println("]")
}

// Execute runs the "event listen" command. It connects to the Kafka broker at
// GlobalConfig.Kafka, consumes the "voltha.events" topic through
// startConsumer and prints every event that passes the optional filter,
// either as a header summary or, with ShowBody, as the full message.
//
// Consumption stops when the first of the following happens:
//   - Count messages have been printed (only when Count > 0);
//   - with Now set, a printed message's timestamp reaches the current time;
//   - with neither Follow nor Now set, a message at or beyond the recorded
//     high-water offset has been consumed;
//   - a consumer error is received;
//   - an interrupt signal is received;
//   - no message arrives within Timeout seconds.
//
// args is not used. An error is returned for missing configuration, failures
// while setting up the descriptors, Kafka client or consumer, an unparsable
// filter, or a failure to print the output heading. Errors while decoding or
// printing individual messages are logged and do not stop consumption.
func (options *EventListenOpts) Execute(args []string) error {
	ProcessGlobalOptions()
	if GlobalConfig.Kafka == "" {
		return errors.New("Kafka address is not specified")
	}

	eventMessage, err := GetEventMessageDesc()
	if err != nil {
		return err
	}

	config := sarama.NewConfig()
	config.ClientID = "go-kafka-consumer"
	config.Consumer.Return.Errors = true
	config.Version = sarama.V1_0_0_0
	brokers := []string{GlobalConfig.Kafka}

	client, err := sarama.NewClient(brokers, config)
	if err != nil {
		return err
	}

	defer func() {
		if err := client.Close(); err != nil {
			panic(err)
		}
	}()

	consumer, consumerErrors, highwaterMarks, err := startConsumer([]string{"voltha.events"}, client)
	if err != nil {
		return err
	}

	highwater := highwaterMarks["voltha.events"]

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	defer signal.Stop(signals)

	// Count how many messages were processed
	consumeCount := 0

	// Count how many messages were printed
	count := 0

	var grpcurlFormatter grpcurl.Formatter

	if options.ShowBody {
		if options.OutputAs == "json" {
			// need a descriptor source, any method will do
			descriptor, _, err := GetMethod("device-list")
			if err != nil {
				return err
			}
			grpcurlFormatter = grpcurl.NewJSONFormatter(false, grpcurl.AnyResolverFromDescriptorSource(descriptor))
		} else {
			grpcurlFormatter = grpcurl.NewTextFormatter(false)
		}
	}

	var headerFilter *filter.Filter
	if options.Filter != "" {
		headerFilterVal, err := filter.Parse(options.Filter)
		if err != nil {
			return fmt.Errorf("Failed to parse filter: %v", err)
		}
		headerFilter = &headerFilterVal
	}

	outputFormat := CharReplacer.Replace(options.Format)
	if outputFormat == "" {
		outputFormat = GetCommandOptionWithDefault("events-listen", "format", DEFAULT_EVENT_FORMAT)
	}

	err = options.StartOutput(outputFormat)
	if err != nil {
		return err
	}

	// doneCh is closed by the consuming goroutine when it stops, whatever the
	// reason. Closing it from a defer means completion is signalled exactly
	// once and the goroutine never keeps consuming after having signalled.
	doneCh := make(chan struct{})
	go func() {
		defer close(doneCh)

		idle := time.Duration(options.Timeout) * time.Second
		for {
			// A fresh idle timer per wait; it is stopped as soon as anything
			// other than the timeout itself wakes the select.
			timeoutTimer := time.NewTimer(idle)
			select {
			case msg := <-consumer:
				timeoutTimer.Stop()
				consumeCount++
				hdr, err := DecodeHeader(eventMessage, msg.Value, msg.Timestamp)
				if err != nil {
					log.Printf("Error decoding header %v\n", err)
					continue
				}

				// Messages rejected by the filter are consumed but not printed.
				if headerFilter == nil || headerFilter.Evaluate(*hdr) {
					// comma separated between this message and predecessor
					if count > 0 && options.OutputAs == "json" {
						fmt.Println(",")
					}

					// Print it
					if options.ShowBody {
						if err := PrintMessage(grpcurlFormatter, eventMessage, msg.Value); err != nil {
							log.Printf("%v\n", err)
						}
					} else if err := PrintEventHeader(options.OutputAs, outputFormat, hdr); err != nil {
						log.Printf("%v\n", err)
					}

					// Check to see if we've hit the "count" threshold the user specified
					count++
					if options.Count > 0 && count >= options.Count {
						log.Println("Count reached")
						return
					}

					// Check to see if we've hit the "now" threshold the user specified
					if options.Now && hdr.Timestamp >= time.Now().Unix() {
						log.Println("Now timestamp reached")
						return
					}
				}

				// If we're not in follow mode, see if we hit the highwater mark.
				// NOTE(review): highwater comes from GetOffset(OffsetNewest),
				// which sarama documents as the offset of the NEXT message to
				// be produced; if so, this only triggers once a new message
				// arrives after start-up — confirm whether highwater-1 is
				// the intended bound.
				if !options.Follow && !options.Now && msg.Offset >= highwater {
					log.Println("High water reached")
					return
				}
			case consumerError := <-consumerErrors:
				timeoutTimer.Stop()
				// Pass the partition as a number; converting an int32 with
				// string() would yield a rune, not its decimal form.
				log.Printf("Received consumerError topic=%v, partition=%v, err=%v\n", consumerError.Topic, consumerError.Partition, consumerError.Err)
				return
			case <-signals:
				timeoutTimer.Stop()
				return
			case <-timeoutTimer.C:
				log.Printf("Idle timeout\n")
				return
			}
		}
	}()

	<-doneCh

	options.FinishOutput()

	log.Printf("Consumed %d messages. Printed %d messages", consumeCount, count)

	return nil
}

// startConsumer begins consuming each of the given topics from the oldest
// available offset and forwards everything it receives onto shared channels.
// Supports multiple topics; adapted from the Sarama example consumer.
//
// Only the first partition of each topic is consumed (a warning is logged
// when a topic has more), and topics whose name contains "__consumer_offsets"
// are skipped.
//
// It returns the channel of consumed messages, the channel of consumer
// errors, and a map from topic name to the offset reported by the client for
// sarama.OffsetNewest at the time of the call. An error is returned if the
// consumer cannot be created, a topic's partitions cannot be listed or are
// empty, or the offset lookup or partition consumer start fails.
//
// The forwarding goroutines started here run until the process exits; there
// is no way to stop them.
func startConsumer(topics []string, client sarama.Client) (chan *sarama.ConsumerMessage, chan *sarama.ConsumerError, map[string]int64, error) {
	master, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		return nil, nil, nil, err
	}

	messages := make(chan *sarama.ConsumerMessage)
	// Not named "errors" so that the errors package is not shadowed.
	consumerErrors := make(chan *sarama.ConsumerError)
	highwater := make(map[string]int64)
	for _, topic := range topics {
		if strings.Contains(topic, "__consumer_offsets") {
			continue
		}

		partitions, err := master.Partitions(topic)
		if err != nil {
			return nil, nil, nil, fmt.Errorf("Error in consume() getting partitions: Topic %v: %v", topic, err)
		}
		// Guard the partitions[0] accesses below.
		if len(partitions) == 0 {
			return nil, nil, nil, fmt.Errorf("Error in consume(): Topic %v has no partitions", topic)
		}

		// TODO: Add support for multiple partitions
		if len(partitions) > 1 {
			log.Printf("WARNING: %d partitions on topic %s but we only listen to the first one\n", len(partitions), topic)
		}

		// Query the topic being set up, not a fixed topic name, so the
		// recorded mark belongs to the map key it is stored under.
		hw, err := client.GetOffset(topic, partitions[0], sarama.OffsetNewest)
		if err != nil {
			return nil, nil, nil, fmt.Errorf("Error in consume() getting highwater: Topic %v Partitions: %v: %v", topic, partitions, err)
		}
		highwater[topic] = hw

		partitionConsumer, err := master.ConsumePartition(topic, partitions[0], sarama.OffsetOldest)
		if err != nil {
			return nil, nil, nil, fmt.Errorf("Error in consume(): Topic %v Partitions: %v: %v", topic, partitions, err)
		}
		log.Println(" Start consuming topic ", topic)

		// Fan this partition's messages and errors into the shared channels.
		// The consumer is passed as an argument so each goroutine works on
		// its own partition consumer.
		go func(pc sarama.PartitionConsumer) {
			for {
				select {
				case consumerError := <-pc.Errors():
					consumerErrors <- consumerError

				case msg := <-pc.Messages():
					messages <- msg
				}
			}
		}(partitionConsumer)
	}

	return messages, consumerErrors, highwater, nil
}
