blob: cbde11910e1b2e429ca18a4f16f1533a25e29b8c [file] [log] [blame]
/*
* Copyright 2019-present Ciena Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package commands
import (
"encoding/json"
"errors"
"fmt"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
flags "github.com/jessevdk/go-flags"
"github.com/opencord/voltctl/pkg/filter"
"github.com/opencord/voltctl/pkg/format"
"github.com/opencord/voltha-protos/v3/go/inter_container"
"log"
"os"
"os/signal"
"strings"
"time"
)
/*
* The "message listen" command supports two types of output:
* 1) A summary output where a row is displayed for each message received. For the summary
* format, DEFAULT_MESSAGE_FORMAT contains the default list of columns that will be
* display and can be overridden at runtime.
* 2) A body output where the full grpcurl or json body is output for each message received.
*
* These two modes are switched by using the "-b" / "--body" flag.
*
* The summary mode has the potential to aggregate data together from multiple parts of the
* message. For example, it currently aggregates the InterAdapterHeader contents together with
* the InterContainerHeader contents.
*
* Similar to "event listen", the "message listen" command operates in a streaming mode, rather
* than collecting a list of results and then emitting them at program exit. This is done to
* facilitate options such as "-F" / "--follow" where the program is intended to operate
* continuously. This means that automatically calculating column widths is not practical, and
* a set of Fixed widths (MessageHeaderDefaultWidths) are predefined.
*
* As there are multiple kafka topics that can be listened to, specifying a topic is a
* mandatory positional argument for the `message listen` command. Common topics include:
* * openolt
* * brcm_openonu_adapter
* * rwcore
* * core-pair-1
*/
const (
DEFAULT_MESSAGE_FORMAT = "table{{.Id}}\t{{.Type}}\t{{.FromTopic}}\t{{.ToTopic}}\t{{.KeyTopic}}\t{{.InterAdapterType}}"
)
type MessageListenOpts struct {
Format string `long:"format" value-name:"FORMAT" default:"" description:"Format to use to output structured data"`
// nolint: staticcheck
OutputAs string `short:"o" long:"outputas" default:"table" choice:"table" choice:"json" choice:"yaml" description:"Type of output to generate"`
Filter string `short:"f" long:"filter" default:"" value-name:"FILTER" description:"Only display results that match filter"`
Follow bool `short:"F" long:"follow" description:"Continue to consume until CTRL-C is pressed"`
ShowBody bool `short:"b" long:"show-body" description:"Show body of messages rather than only a header summary"`
Count int `short:"c" long:"count" default:"-1" value-name:"LIMIT" description:"Limit the count of messages that will be printed"`
Now bool `short:"n" long:"now" description:"Stop printing messages when current time is reached"`
Timeout int `short:"t" long:"idle" default:"900" value-name:"SECONDS" description:"Timeout if no message received within specified seconds"`
Since string `short:"s" long:"since" default:"" value-name:"TIMESTAMP" description:"Do not show entries before timestamp"`
Args struct {
Topic string
} `positional-args:"yes" required:"yes"`
}
type MessageOpts struct {
MessageListen MessageListenOpts `command:"listen"`
}
var interAdapterOpts = MessageOpts{}
/* MessageHeader is a set of fields extracted
* from voltha.MessageHeader as well as useful other
* places such as InterAdapterHeader. These are fields that
* will be summarized in list mode and/or can be filtered
* on.
*/
type MessageHeader struct {
Id string `json:"id"`
Type string `json:"type"`
FromTopic string `json:"from_topic"`
ToTopic string `json:"to_topic"`
KeyTopic string `json:"key_topic"`
Timestamp time.Time `json:"timestamp"`
InterAdapterType string `json:"inter_adapter_type"` // interadapter header
ToDeviceId string `json:"to_device_id"` // interadapter header
ProxyDeviceId string `json:"proxy_device_id"` //interadapter header
}
/* Fixed widths because we output in a continuous streaming
* mode rather than a table-based dump at the end.
*/
type MessageHeaderWidths struct {
Id int
Type int
FromTopic int
ToTopic int
KeyTopic int
InterAdapterType int
ToDeviceId int
ProxyDeviceId int
Timestamp int
}
var DefaultMessageWidths MessageHeaderWidths = MessageHeaderWidths{
Id: 32,
Type: 10,
FromTopic: 16,
ToTopic: 16,
KeyTopic: 10,
Timestamp: 10,
InterAdapterType: 14,
ToDeviceId: 10,
ProxyDeviceId: 10,
}
// jsonpb requires a resolver to resolve Any.Any into proto.Message.
type VolthaAnyResolver struct{}
func (r *VolthaAnyResolver) Resolve(typeURL string) (proto.Message, error) {
// TODO: We should be able to get this automatically via reflection using
// the following commented-out code, but it requires upgrading voltctl to
// use newer versions of protobuf libraries.
/*
msgType, err := protoregistry.GlobalTypes.FindMessageByURL(typeURL)
if err != nil {
return err
}
return msgType.New(), nil*/
// The intercontianer message bus is where we need to map from Any.Any
// to the appropriate protos when generating json output.
typeURL = strings.TrimPrefix(typeURL, "type.googleapis.com/")
switch typeURL {
case "voltha.StrType":
return &inter_container.StrType{}, nil
case "voltha.IntType":
return &inter_container.IntType{}, nil
case "voltha.BoolType":
return &inter_container.BoolType{}, nil
case "voltha.Packet":
return &inter_container.Packet{}, nil
case "voltha.ErrorCode":
return &inter_container.ErrorCode{}, nil
case "voltha.Error":
return &inter_container.Error{}, nil
case "voltha.Header":
return &inter_container.Header{}, nil
case "voltha.Argument":
return &inter_container.Argument{}, nil
case "voltha.InterContainerMessage":
return &inter_container.InterContainerMessage{}, nil
case "voltha.InterContainerRequestBody":
return &inter_container.InterContainerRequestBody{}, nil
case "voltha.InterContainerResponseBody":
return &inter_container.InterContainerResponseBody{}, nil
case "voltha.SwitchCapability":
return &inter_container.SwitchCapability{}, nil
case "voltha.PortCapability":
return &inter_container.PortCapability{}, nil
case "voltha.DeviceDiscovered":
return &inter_container.DeviceDiscovered{}, nil
case "voltha.InterAdapterMessageType":
return &inter_container.InterAdapterMessageType{}, nil
case "voltha.InterAdapterOmciMessage":
return &inter_container.InterAdapterOmciMessage{}, nil
case "voltha.InterAdapterTechProfileDownloadMessage":
return &inter_container.InterAdapterTechProfileDownloadMessage{}, nil
case "voltha.InterAdapterDeleteGemPortMessage":
return &inter_container.InterAdapterDeleteGemPortMessage{}, nil
case "voltha.InterAdapterDeleteTcontMessage":
return &inter_container.InterAdapterDeleteTcontMessage{}, nil
case "voltha.InterAdapterResponseBody":
return &inter_container.InterAdapterResponseBody{}, nil
case "voltha.InterAdapterMessage":
return &inter_container.InterAdapterMessage{}, nil
}
return nil, fmt.Errorf("Unknown any type: %s", typeURL)
}
func RegisterMessageCommands(parent *flags.Parser) {
if _, err := parent.AddCommand("message", "message commands", "Commands for observing messages between components", &interAdapterOpts); err != nil {
Error.Fatalf("Unable to register message commands with voltctl command parser: %s", err.Error())
}
}
// Extract the header, as well as a few other items that might be of interest
func DecodeInterContainerHeader(b []byte, ts time.Time) (*MessageHeader, error) {
m := &inter_container.InterContainerMessage{}
if err := proto.Unmarshal(b, m); err != nil {
return nil, err
}
header := m.Header
id := header.Id
msgType := header.Type
fromTopic := header.FromTopic
toTopic := header.ToTopic
keyTopic := header.KeyTopic
timestamp, err := DecodeTimestamp(header.Timestamp)
if err != nil {
return nil, err
}
// Pull some additional data out of the InterAdapterHeader, if it
// is embedded inside the InterContainerMessage
var iaMessageTypeStr string
var toDeviceId string
var proxyDeviceId string
bodyKind, err := ptypes.AnyMessageName(m.Body)
if err != nil {
return nil, err
}
switch bodyKind {
case "voltha.InterContainerRequestBody":
icRequest := &inter_container.InterContainerRequestBody{}
err := ptypes.UnmarshalAny(m.Body, icRequest)
if err != nil {
return nil, err
}
argList := icRequest.Args
for _, arg := range argList {
key := arg.Key
if key == "msg" {
argBodyKind, err := ptypes.AnyMessageName(m.Body)
if err != nil {
return nil, err
}
switch argBodyKind {
case "voltha.InterAdapterMessage":
iaMsg := &inter_container.InterAdapterMessage{}
err := ptypes.UnmarshalAny(arg.Value, iaMsg)
if err != nil {
return nil, err
}
iaHeader := iaMsg.Header
iaMessageType := iaHeader.Type
iaMessageTypeStr = inter_container.InterAdapterMessageType_Types_name[int32(iaMessageType)]
toDeviceId = iaHeader.ToDeviceId
proxyDeviceId = iaHeader.ProxyDeviceId
}
}
}
}
messageHeaderType := inter_container.MessageType_name[int32(msgType)]
icHeader := MessageHeader{Id: id,
Type: messageHeaderType,
FromTopic: fromTopic,
ToTopic: toTopic,
KeyTopic: keyTopic,
Timestamp: timestamp,
InterAdapterType: iaMessageTypeStr,
ProxyDeviceId: proxyDeviceId,
ToDeviceId: toDeviceId,
}
return &icHeader, nil
}
// Print the full message, either in JSON or in GRPCURL-human-readable format,
// depending on which grpcurl formatter is passed in.
func PrintInterContainerMessage(outputAs string, b []byte) error {
ms := &inter_container.InterContainerMessage{}
if err := proto.Unmarshal(b, ms); err != nil {
return err
}
if outputAs == "json" {
marshaler := jsonpb.Marshaler{EmitDefaults: true, AnyResolver: &VolthaAnyResolver{}}
asJson, err := marshaler.MarshalToString(ms)
if err != nil {
return fmt.Errorf("Failed to marshal the json: %s", err)
}
fmt.Println(asJson)
} else {
// print in golang native format
fmt.Printf("%v\n", ms)
}
return nil
}
// Print just the enriched InterContainerHeader. This is either in JSON format, or in
// table format.
func PrintInterContainerHeader(outputAs string, outputFormat string, hdr *MessageHeader) error {
if outputAs == "json" {
asJson, err := json.Marshal(hdr)
if err != nil {
return fmt.Errorf("Error marshalling JSON: %v", err)
} else {
fmt.Printf("%s\n", asJson)
}
} else {
f := format.Format(outputFormat)
output, err := f.ExecuteFixedWidth(DefaultMessageWidths, false, *hdr)
if err != nil {
return err
}
fmt.Printf("%s\n", output)
}
return nil
}
// Start output, print any column headers or other start characters
func (options *MessageListenOpts) StartOutput(outputFormat string) error {
if options.OutputAs == "json" {
fmt.Println("[")
} else if (options.OutputAs == "table") && !options.ShowBody {
f := format.Format(outputFormat)
output, err := f.ExecuteFixedWidth(DefaultMessageWidths, true, nil)
if err != nil {
return err
}
fmt.Println(output)
}
return nil
}
// Finish output, print any column footers or other end characters
func (options *MessageListenOpts) FinishOutput() {
if options.OutputAs == "json" {
fmt.Println("]")
}
}
func (options *MessageListenOpts) Execute(args []string) error {
ProcessGlobalOptions()
if GlobalConfig.Kafka == "" {
return errors.New("Kafka address is not specified")
}
config := sarama.NewConfig()
config.ClientID = "go-kafka-consumer"
config.Consumer.Return.Errors = true
config.Version = sarama.V1_0_0_0
brokers := []string{GlobalConfig.Kafka}
client, err := sarama.NewClient(brokers, config)
if err != nil {
return err
}
defer func() {
if err := client.Close(); err != nil {
panic(err)
}
}()
consumer, consumerErrors, highwaterMarks, err := startInterContainerConsumer([]string{options.Args.Topic}, client)
if err != nil {
return err
}
highwater := highwaterMarks[options.Args.Topic]
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
// Count how many message processed
consumeCount := 0
// Count how many messages were printed
count := 0
var headerFilter *filter.Filter
if options.Filter != "" {
headerFilterVal, err := filter.Parse(options.Filter)
if err != nil {
return fmt.Errorf("Failed to parse filter: %v", err)
}
headerFilter = &headerFilterVal
}
outputFormat := CharReplacer.Replace(options.Format)
if outputFormat == "" {
outputFormat = GetCommandOptionWithDefault("intercontainer-listen", "format", DEFAULT_MESSAGE_FORMAT)
}
err = options.StartOutput(outputFormat)
if err != nil {
return err
}
var since *time.Time
if options.Since != "" {
since, err = ParseSince(options.Since)
if err != nil {
return err
}
}
// Get signnal for finish
doneCh := make(chan struct{})
go func() {
tStart := time.Now()
Loop:
for {
// Initialize the idle timeout timer
timeoutTimer := time.NewTimer(time.Duration(options.Timeout) * time.Second)
select {
case msg := <-consumer:
consumeCount++
hdr, err := DecodeInterContainerHeader(msg.Value, msg.Timestamp)
if err != nil {
log.Printf("Error decoding header %v\n", err)
continue
}
match := false
if headerFilter != nil {
var err error
if match, err = headerFilter.Evaluate(*hdr); err != nil {
log.Printf("%v\n", err)
}
} else {
match = true
}
if !match {
// skip printing message
} else if since != nil && hdr.Timestamp.Before(*since) {
// it's too old
} else {
// comma separated between this message and predecessor
if count > 0 {
if options.OutputAs == "json" {
fmt.Println(",")
}
}
// Print it
if options.ShowBody {
if err := PrintInterContainerMessage(options.OutputAs, msg.Value); err != nil {
log.Printf("%v\n", err)
}
} else {
if err := PrintInterContainerHeader(options.OutputAs, outputFormat, hdr); err != nil {
log.Printf("%v\n", err)
}
}
// Check to see if we've hit the "count" threshold the user specified
count++
if (options.Count > 0) && (count >= options.Count) {
log.Println("Count reached")
doneCh <- struct{}{}
break Loop
}
// Check to see if we've hit the "now" threshold the user specified
if (options.Now) && (!hdr.Timestamp.Before(tStart)) {
log.Println("Now timestamp reached")
doneCh <- struct{}{}
break Loop
}
}
// If we're not in follow mode, see if we hit the highwater mark
if !options.Follow && !options.Now && (msg.Offset >= highwater) {
log.Println("High water reached")
doneCh <- struct{}{}
break Loop
}
// Reset the timeout timer
if !timeoutTimer.Stop() {
<-timeoutTimer.C
}
case consumerError := <-consumerErrors:
log.Printf("Received consumerError topic=%v, partition=%v, err=%v\n", string(consumerError.Topic), string(consumerError.Partition), consumerError.Err)
doneCh <- struct{}{}
case <-signals:
doneCh <- struct{}{}
case <-timeoutTimer.C:
log.Printf("Idle timeout\n")
doneCh <- struct{}{}
}
}
}()
<-doneCh
options.FinishOutput()
log.Printf("Consumed %d messages. Printed %d messages", consumeCount, count)
return nil
}
// Consume message from Sarama and send them out on a channel.
// Supports multiple topics.
// Taken from Sarama example consumer.
func startInterContainerConsumer(topics []string, client sarama.Client) (chan *sarama.ConsumerMessage, chan *sarama.ConsumerError, map[string]int64, error) {
master, err := sarama.NewConsumerFromClient(client)
if err != nil {
return nil, nil, nil, err
}
consumers := make(chan *sarama.ConsumerMessage)
errors := make(chan *sarama.ConsumerError)
highwater := make(map[string]int64)
for _, topic := range topics {
if strings.Contains(topic, "__consumer_offsets") {
continue
}
partitions, _ := master.Partitions(topic)
// TODO: Add support for multiple partitions
if len(partitions) > 1 {
log.Printf("WARNING: %d partitions on topic %s but we only listen to the first one\n", len(partitions), topic)
}
hw, err := client.GetOffset("openolt", partitions[0], sarama.OffsetNewest)
if err != nil {
return nil, nil, nil, fmt.Errorf("Error in consume() getting highwater: Topic %v Partitions: %v", topic, partitions)
}
highwater[topic] = hw
consumer, err := master.ConsumePartition(topic, partitions[0], sarama.OffsetOldest)
if nil != err {
return nil, nil, nil, fmt.Errorf("Error in consume(): Topic %v Partitions: %v", topic, partitions)
}
log.Println(" Start consuming topic ", topic)
go func(topic string, consumer sarama.PartitionConsumer) {
for {
select {
case consumerError := <-consumer.Errors():
errors <- consumerError
case msg := <-consumer.Messages():
consumers <- msg
}
}
}(topic, consumer)
}
return consumers, errors, highwater, nil
}