VOL-2496 Add "event listen" command to voltctl

Change-Id: I8f1fb34b55f56c8125142ac289e2f19fc170d804
diff --git a/internal/pkg/commands/events.go b/internal/pkg/commands/events.go
new file mode 100644
index 0000000..b390987
--- /dev/null
+++ b/internal/pkg/commands/events.go
@@ -0,0 +1,526 @@
+/*
+ * Copyright 2019-present Ciena Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package commands
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"github.com/Shopify/sarama"
+	"github.com/fullstorydev/grpcurl"
+	flags "github.com/jessevdk/go-flags"
+	"github.com/jhump/protoreflect/desc"
+	"github.com/jhump/protoreflect/dynamic"
+	"github.com/opencord/voltctl/pkg/filter"
+	"github.com/opencord/voltctl/pkg/format"
+	"github.com/opencord/voltctl/pkg/model"
+	"log"
+	"os"
+	"os/signal"
+	"strings"
+	"time"
+)
+
+const (
+	DEFAULT_EVENT_FORMAT = "table{{.Category}}\t{{.SubCategory}}\t{{.Type}}\t{{.Timestamp}}\t{{.Device_ids}}\t{{.Titles}}" // fallback passed to GetCommandOptionWithDefault by Execute when --format is empty
+)
+
+type EventListenOpts struct { // command-line options of the "event listen" subcommand, declared through go-flags struct tags
+	Format   string `long:"format" value-name:"FORMAT" default:"" description:"Format to use to output structured data"`
+	OutputAs string `short:"o" long:"outputas" default:"table" choice:"table" choice:"json" choice:"yaml" description:"Type of output to generate"`
+	Filter   string `short:"f" long:"filter" default:"" value-name:"FILTER" description:"Only display results that match filter"`
+	Follow   bool   `short:"F" long:"follow" description:"Continue to consume until CTRL-C is pressed"`
+	ShowBody bool   `short:"b" long:"show-body" description:"Show body of events rather than only a header summary"`
+	Count    int    `short:"c" long:"count" default:"-1" value-name:"LIMIT" description:"Limit the count of messages that will be printed"`
+	Now      bool   `short:"n" long:"now" description:"Stop printing events when current time is reached"`
+	Timeout  int    `short:"t" long:"idle" default:"900" value-name:"SECONDS" description:"Timeout if no message received within specified seconds"`
+}
+
+type EventOpts struct { // groups the subcommands of the "event" command
+	EventListen EventListenOpts `command:"listen"`
+}
+
+var eventOpts = EventOpts{} // option storage handed to the parser by RegisterEventCommands
+
+type EventHeader struct { // summary of one event, as assembled by DecodeHeader
+	Category    string   `json:"category"`     // header "category" enum, converted with model.GetEnumString
+	SubCategory string   `json:"sub_category"` // header "sub_category" enum, converted with model.GetEnumString
+	Type        string   `json:"type"`         // header "type" enum, converted with model.GetEnumString
+	Raised_ts   float32  `json:"raised_ts"`    // header "raised_ts" value, copied unchanged
+	Reported_ts float32  `json:"reported_ts"`  // header "reported_ts" value, copied unchanged
+	Device_ids  []string `json:"device_ids"`   // Opportunistically collected list of device_ids
+	Titles      []string `json:"titles"`       // Opportunistically collected list of titles
+	Timestamp   int64    `json:"timestamp"`    // Timestamp from Kafka, in Unix seconds
+}
+
+type EventHeaderWidths struct { // one integer per EventHeader field; see DefaultWidths for the values used for fixed-width output
+	Category    int
+	SubCategory int
+	Type        int
+	Raised_ts   int
+	Reported_ts int
+	Device_ids  int
+	Titles      int
+	Timestamp   int
+}
+
+var DefaultWidths EventHeaderWidths = EventHeaderWidths{ // passed to ExecuteFixedWidth by StartOutput and PrintEventHeader
+	Category:    13,
+	SubCategory: 3,
+	Type:        12,
+	Raised_ts:   10,
+	Reported_ts: 10,
+	Device_ids:  40,
+	Titles:      40,
+	Timestamp:   10,
+}
+
+// RegisterEventCommands adds the "event" command group to parent; a failure is reported through Error.Fatalf.
+func RegisterEventCommands(parent *flags.Parser) {
+	if _, err := parent.AddCommand("event", "event commands", "Commands for observing events", &eventOpts); err != nil {
+		Error.Fatalf("Unable to register event commands with voltctl command parser: %s", err.Error())
+	}
+}
+
+// Extract the header, as well as a few other items that might be of interest
+func DecodeHeader(md *desc.MessageDescriptor, b []byte, ts time.Time) (*EventHeader, error) {
+	m := dynamic.NewMessage(md)
+	err := m.Unmarshal(b)
+	if err != nil {
+		return nil, err
+	}
+
+	headerIntf, err := m.TryGetFieldByName("header")
+	if err != nil {
+		return nil, err
+	}
+
+	header := headerIntf.(*dynamic.Message)
+
+	catIntf, err := header.TryGetFieldByName("category")
+	if err != nil {
+		return nil, err
+	}
+	cat := catIntf.(int32)
+
+	subCatIntf, err := header.TryGetFieldByName("sub_category")
+	if err != nil {
+		return nil, err
+	}
+	subCat := subCatIntf.(int32)
+
+	typeIntf, err := header.TryGetFieldByName("type")
+	if err != nil {
+		return nil, err
+	}
+	evType := typeIntf.(int32)
+
+	raisedIntf, err := header.TryGetFieldByName("raised_ts")
+	if err != nil {
+		return nil, err
+	}
+	raised := raisedIntf.(float32)
+
+	reportedIntf, err := header.TryGetFieldByName("reported_ts")
+	if err != nil {
+		return nil, err
+	}
+	reported := reportedIntf.(float32)
+
+	// Opportunistically try to extract the device_id and title from a kpi_event2
+	// note that there might actually be multiple_slice data, so there could
+	// be multiple device_id, multiple title, etc.
+	device_ids := make(map[string]interface{})
+	titles := make(map[string]interface{})
+
+	kpiIntf, err := m.TryGetFieldByName("kpi_event2")
+	if err == nil {
+		kpi, ok := kpiIntf.(*dynamic.Message)
+		if ok == true && kpi != nil {
+			sliceListIntf, err := kpi.TryGetFieldByName("slice_data")
+			if err == nil {
+				sliceIntf, ok := sliceListIntf.([]interface{})
+				if ok == true && len(sliceIntf) > 0 {
+					slice, ok := sliceIntf[0].(*dynamic.Message)
+					if ok == true && slice != nil {
+						metadataIntf, err := slice.TryGetFieldByName("metadata")
+						if err == nil {
+							metadata, ok := metadataIntf.(*dynamic.Message)
+							if ok == true && metadata != nil {
+								deviceIdIntf, err := metadataIntf.(*dynamic.Message).TryGetFieldByName("device_id")
+								if err == nil {
+									device_ids[deviceIdIntf.(string)] = slice
+								}
+								titleIntf, err := metadataIntf.(*dynamic.Message).TryGetFieldByName("title")
+								if err == nil {
+									titles[titleIntf.(string)] = slice
+								}
+							}
+						}
+					}
+				}
+			}
+		}
+	}
+
+	// Opportunistically try to pull a resource_id and title from a DeviceEvent
+	// There can only be one resource_id and title from a DeviceEvent, so it's easier
+	// than dealing with KPI_EVENT2.
+	deviceEventIntf, err := m.TryGetFieldByName("device_event")
+	if err == nil {
+		deviceEvent, ok := deviceEventIntf.(*dynamic.Message)
+		if ok == true && deviceEvent != nil {
+			deviceEventNameIntf, err := deviceEvent.TryGetFieldByName("device_event_name")
+			if err == nil {
+				deviceEventName, ok := deviceEventNameIntf.(string)
+				if ok {
+					titles[deviceEventName] = deviceEvent
+				}
+			}
+			resourceIdIntf, err := deviceEvent.TryGetFieldByName("resource_id")
+			if err == nil {
+				resourceId, ok := resourceIdIntf.(string)
+				if ok {
+					device_ids[resourceId] = deviceEvent
+				}
+			}
+		}
+	}
+
+	device_id_keys := make([]string, len(device_ids))
+	i := 0
+	for k, _ := range device_ids {
+		device_id_keys[i] = k
+		i++
+	}
+
+	title_keys := make([]string, len(titles))
+	i = 0
+	for k, _ := range titles {
+		title_keys[i] = k
+		i++
+	}
+
+	evHeader := EventHeader{Category: model.GetEnumString(header, "category", cat),
+		SubCategory: model.GetEnumString(header, "sub_category", subCat),
+		Type:        model.GetEnumString(header, "type", evType),
+		Raised_ts:   raised,
+		Reported_ts: reported,
+		Device_ids:  device_id_keys,
+		Timestamp:   ts.Unix(),
+		Titles:      title_keys}
+
+	return &evHeader, nil
+}
+
+// PrintMessage decodes b as a message of type md, renders it with the given
+// grpcurl formatter (JSON or human-readable text, whichever was passed in)
+// and writes the result to stdout. Decoding and formatting errors are returned.
+func PrintMessage(f grpcurl.Formatter, md *desc.MessageDescriptor, b []byte) error {
+	decoded := dynamic.NewMessage(md)
+	if unmarshalErr := decoded.Unmarshal(b); unmarshalErr != nil {
+		return unmarshalErr
+	}
+	rendered, formatErr := f(decoded)
+	if formatErr != nil {
+		return formatErr
+	}
+	fmt.Println(rendered)
+	return nil
+}
+
+// PrintEventHeader writes one enriched EventHeader to stdout: as a JSON object when
+// outputAs is "json", otherwise as a fixed-width row built from outputFormat.
+func PrintEventHeader(outputAs string, outputFormat string, hdr *EventHeader) error {
+	if outputAs != "json" {
+		rowTemplate := format.Format(outputFormat)
+		row, rowErr := rowTemplate.ExecuteFixedWidth(DefaultWidths, false, *hdr)
+		if rowErr != nil {
+			return rowErr
+		}
+		fmt.Printf("%s\n", row)
+		return nil
+	}
+
+	encoded, jsonErr := json.Marshal(hdr)
+	if jsonErr != nil {
+		return fmt.Errorf("Error marshalling JSON: %v", jsonErr)
+	}
+	fmt.Printf("%s\n", encoded)
+	return nil
+}
+
+// GetEventMessageDesc returns the message descriptor for "voltha.Event".
+// This is a very long-winded way to get a message descriptor: a descriptor
+// source is obtained through GetMethod (any method on the file will do),
+// the "voltha.Event" symbol is looked up in it, and the message is then
+// fetched from the file that declares that symbol. An error is returned if
+// any of these steps fails, including when the file has no such message, so
+// callers never receive a nil descriptor together with a nil error.
+func GetEventMessageDesc() (*desc.MessageDescriptor, error) {
+	descriptor, _, err := GetMethod("update-log-level")
+	if err != nil {
+		return nil, err
+	}
+
+	eventSymbol, err := descriptor.FindSymbol("voltha.Event")
+	if err != nil {
+		return nil, err
+	}
+
+	// eventSymbol is a Descriptor, but not a MessageDescriptor, so we can't
+	// look at its fields yet. Go back to the file and call FindMessage.
+	eventMessage := eventSymbol.GetFile().FindMessage("voltha.Event")
+	if eventMessage == nil {
+		return nil, errors.New("voltha.Event is not a message in its descriptor file")
+	}
+	return eventMessage, nil
+}
+
+// StartOutput prints what must precede the first event: column titles for a body-less table, "[" for JSON.
+func (options *EventListenOpts) StartOutput(outputFormat string) error {
+	if options.OutputAs == "table" && !options.ShowBody {
+		headings := format.Format(outputFormat)
+		titleRow, titleErr := headings.ExecuteFixedWidth(DefaultWidths, true, nil)
+		if titleErr != nil {
+			return titleErr
+		}
+		fmt.Println(titleRow)
+	} else if options.OutputAs == "json" {
+		fmt.Println("[")
+	}
+	return nil
+}
+
+// FinishOutput prints what must follow the last event: the closing "]" of JSON output, nothing otherwise.
+func (options *EventListenOpts) FinishOutput() {
+	if closeArray := options.OutputAs == "json"; closeArray {
+		fmt.Println("]")
+	}
+}
+
+// Execute implements "event listen": it consumes the "voltha.events" Kafka
+// topic from the oldest available offset and prints each event that passes
+// the optional filter, as a header summary or (with --show-body) in full.
+//
+// Consumption stops, and nil is returned, when --count events have been
+// printed; when, with --now, an event stamped at or after the current time
+// has been printed; when, without --follow and --now, the last message that
+// was on the topic at startup has been processed; or when the consumer
+// reports an error, CTRL-C is pressed or the idle timeout expires. An error
+// is returned only if setup fails before any event is consumed.
+func (options *EventListenOpts) Execute(args []string) error {
+	ProcessGlobalOptions()
+	if GlobalConfig.Kafka == "" {
+		return errors.New("Kafka address is not specified")
+	}
+
+	eventMessage, err := GetEventMessageDesc()
+	if err != nil {
+		return err
+	}
+
+	config := sarama.NewConfig()
+	config.ClientID = "go-kafka-consumer"
+	config.Consumer.Return.Errors = true
+	config.Version = sarama.V1_0_0_0
+	brokers := []string{GlobalConfig.Kafka}
+
+	client, err := sarama.NewClient(brokers, config)
+	if err != nil {
+		return err
+	}
+	// A close failure is reported but is no reason to crash after the output was produced.
+	defer func() {
+		if err := client.Close(); err != nil {
+			log.Printf("Error closing Kafka client: %v\n", err)
+		}
+	}()
+
+	consumer, consumerErrors, highwaterMarks, err := startConsumer([]string{"voltha.events"}, client)
+	if err != nil {
+		return err
+	}
+
+	// The highwater mark is the offset the *next* produced message will get,
+	// so the last message present at startup has offset highwater-1.
+	highwater := highwaterMarks["voltha.events"]
+
+	signals := make(chan os.Signal, 1)
+	signal.Notify(signals, os.Interrupt)
+	defer signal.Stop(signals)
+
+	var grpcurlFormatter grpcurl.Formatter
+	if options.ShowBody {
+		if options.OutputAs == "json" {
+			// need a descriptor source, any method will do
+			descriptor, _, err := GetMethod("device-list")
+			if err != nil {
+				return err
+			}
+			grpcurlFormatter = grpcurl.NewJSONFormatter(false, grpcurl.AnyResolverFromDescriptorSource(descriptor))
+		} else {
+			grpcurlFormatter = grpcurl.NewTextFormatter(false)
+		}
+	}
+
+	var headerFilter *filter.Filter
+	if options.Filter != "" {
+		headerFilterVal, err := filter.Parse(options.Filter)
+		if err != nil {
+			return fmt.Errorf("Failed to parse filter: %v", err)
+		}
+		headerFilter = &headerFilterVal
+	}
+
+	outputFormat := CharReplacer.Replace(options.Format)
+	if outputFormat == "" {
+		outputFormat = GetCommandOptionWithDefault("events-listen", "format", DEFAULT_EVENT_FORMAT)
+	}
+
+	err = options.StartOutput(outputFormat)
+	if err != nil {
+		return err
+	}
+
+	// Count how many messages were processed and how many were printed
+	consumeCount := 0
+	count := 0
+
+	// One idle timer serves the whole loop; it is re-armed after every message.
+	idleTimeout := time.Duration(options.Timeout) * time.Second
+	timeoutTimer := time.NewTimer(idleTimeout)
+	defer timeoutTimer.Stop()
+
+	// Every exit path uses "break Loop", so nothing is consumed or printed after the loop.
+Loop:
+	for {
+		select {
+		case msg := <-consumer:
+			consumeCount++
+			hdr, err := DecodeHeader(eventMessage, msg.Value, msg.Timestamp)
+			if err != nil {
+				// Not printed, but still subject to the highwater check below
+				log.Printf("Error decoding header %v\n", err)
+			} else if headerFilter == nil || headerFilter.Evaluate(*hdr) {
+				// comma separated between this message and predecessor
+				if count > 0 && options.OutputAs == "json" {
+					fmt.Println(",")
+				}
+
+				// Print it
+				if options.ShowBody {
+					err = PrintMessage(grpcurlFormatter, eventMessage, msg.Value)
+				} else {
+					err = PrintEventHeader(options.OutputAs, outputFormat, hdr)
+				}
+				if err != nil {
+					log.Printf("%v\n", err)
+				}
+
+				// Check to see if we've hit the "count" threshold the user specified
+				count++
+				if (options.Count > 0) && (count >= options.Count) {
+					log.Println("Count reached")
+					break Loop
+				}
+
+				// Check to see if we've hit the "now" threshold the user specified
+				if options.Now && (hdr.Timestamp >= time.Now().Unix()) {
+					log.Println("Now timestamp reached")
+					break Loop
+				}
+			}
+
+			// If we're not in follow mode, see if we hit the highwater mark
+			if !options.Follow && !options.Now && (msg.Offset >= highwater-1) {
+				log.Println("High water reached")
+				break Loop
+			}
+
+			// Re-arm the idle timer, draining a tick that fired in the meantime
+			if !timeoutTimer.Stop() {
+				select {
+				case <-timeoutTimer.C:
+				default:
+				}
+			}
+			timeoutTimer.Reset(idleTimeout)
+		case consumerError := <-consumerErrors:
+			log.Printf("Received consumerError topic=%v, partition=%v, err=%v\n", consumerError.Topic, consumerError.Partition, consumerError.Err)
+			break Loop
+		case <-signals:
+			break Loop
+		case <-timeoutTimer.C:
+			log.Printf("Idle timeout\n")
+			break Loop
+		}
+	}
+
+	options.FinishOutput()
+	log.Printf("Consumed %d messages. Printed %d messages", consumeCount, count)
+	return nil
+}
+
+// startConsumer consumes messages from Sarama and sends them out on a channel.
+// Supports multiple topics (first partition only); taken from the Sarama example
+// consumer. The returned map holds each topic's highwater (next) offset.
+func startConsumer(topics []string, client sarama.Client) (chan *sarama.ConsumerMessage, chan *sarama.ConsumerError, map[string]int64, error) {
+	master, err := sarama.NewConsumerFromClient(client)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+
+	consumers := make(chan *sarama.ConsumerMessage)
+	errs := make(chan *sarama.ConsumerError) // not named "errors", which would shadow the package
+	highwater := make(map[string]int64)
+	for _, topic := range topics {
+		if strings.Contains(topic, "__consumer_offsets") {
+			continue
+		}
+		partitions, err := master.Partitions(topic)
+		if err != nil || len(partitions) == 0 {
+			return nil, nil, nil, fmt.Errorf("Error in consume() getting partitions: Topic %v Partitions: %v Err: %v", topic, partitions, err)
+		}
+		// TODO: Add support for multiple partitions
+		if len(partitions) > 1 {
+			log.Printf("WARNING: %d partitions on topic %s but we only listen to the first one\n", len(partitions), topic)
+		}
+
+		hw, err := client.GetOffset(topic, partitions[0], sarama.OffsetNewest)
+		if err != nil {
+			return nil, nil, nil, fmt.Errorf("Error in consume() getting highwater: Topic %v Partitions: %v Err: %v", topic, partitions, err)
+		}
+		highwater[topic] = hw
+
+		consumer, err := master.ConsumePartition(topic, partitions[0], sarama.OffsetOldest)
+		if err != nil {
+			return nil, nil, nil, fmt.Errorf("Error in consume(): Topic %v Partitions: %v Err: %v", topic, partitions, err)
+		}
+		log.Println(" Start consuming topic ", topic)
+		go func(topic string, consumer sarama.PartitionConsumer) {
+			for {
+				select {
+				case consumerError := <-consumer.Errors():
+					errs <- consumerError
+				case msg := <-consumer.Messages():
+					consumers <- msg
+				}
+			}
+		}(topic, consumer)
+	}
+	return consumers, errs, highwater, nil
+}