blob: cbde11910e1b2e429ca18a4f16f1533a25e29b8c [file] [log] [blame]
Scott Bakera1e53fa2020-04-01 11:13:34 -07001/*
2 * Copyright 2019-present Ciena Corporation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package commands
17
18import (
19 "encoding/json"
20 "errors"
21 "fmt"
22 "github.com/Shopify/sarama"
Scott Baker9173ed82020-05-19 08:30:12 -070023 "github.com/golang/protobuf/jsonpb"
24 "github.com/golang/protobuf/proto"
25 "github.com/golang/protobuf/ptypes"
Scott Bakera1e53fa2020-04-01 11:13:34 -070026 flags "github.com/jessevdk/go-flags"
Scott Bakera1e53fa2020-04-01 11:13:34 -070027 "github.com/opencord/voltctl/pkg/filter"
28 "github.com/opencord/voltctl/pkg/format"
Scott Baker9173ed82020-05-19 08:30:12 -070029 "github.com/opencord/voltha-protos/v3/go/inter_container"
Scott Bakera1e53fa2020-04-01 11:13:34 -070030 "log"
31 "os"
32 "os/signal"
33 "strings"
34 "time"
35)
36
37/*
38 * The "message listen" command supports two types of output:
39 * 1) A summary output where a row is displayed for each message received. For the summary
40 * format, DEFAULT_MESSAGE_FORMAT contains the default list of columns that will be
41 * display and can be overridden at runtime.
42 * 2) A body output where the full grpcurl or json body is output for each message received.
43 *
44 * These two modes are switched by using the "-b" / "--body" flag.
45 *
46 * The summary mode has the potential to aggregate data together from multiple parts of the
47 * message. For example, it currently aggregates the InterAdapterHeader contents together with
48 * the InterContainerHeader contents.
49 *
50 * Similar to "event listen", the "message listen" command operates in a streaming mode, rather
51 * than collecting a list of results and then emitting them at program exit. This is done to
52 * facilitate options such as "-F" / "--follow" where the program is intended to operate
53 * continuously. This means that automatically calculating column widths is not practical, and
54 * a set of Fixed widths (MessageHeaderDefaultWidths) are predefined.
55 *
56 * As there are multiple kafka topics that can be listened to, specifying a topic is a
57 * mandatory positional argument for the `message listen` command. Common topics include:
58 * * openolt
59 * * brcm_openonu_adapter
60 * * rwcore
61 * * core-pair-1
62 */
63
64const (
65 DEFAULT_MESSAGE_FORMAT = "table{{.Id}}\t{{.Type}}\t{{.FromTopic}}\t{{.ToTopic}}\t{{.KeyTopic}}\t{{.InterAdapterType}}"
66)
67
68type MessageListenOpts struct {
69 Format string `long:"format" value-name:"FORMAT" default:"" description:"Format to use to output structured data"`
70 // nolint: staticcheck
71 OutputAs string `short:"o" long:"outputas" default:"table" choice:"table" choice:"json" choice:"yaml" description:"Type of output to generate"`
72 Filter string `short:"f" long:"filter" default:"" value-name:"FILTER" description:"Only display results that match filter"`
73 Follow bool `short:"F" long:"follow" description:"Continue to consume until CTRL-C is pressed"`
74 ShowBody bool `short:"b" long:"show-body" description:"Show body of messages rather than only a header summary"`
75 Count int `short:"c" long:"count" default:"-1" value-name:"LIMIT" description:"Limit the count of messages that will be printed"`
76 Now bool `short:"n" long:"now" description:"Stop printing messages when current time is reached"`
77 Timeout int `short:"t" long:"idle" default:"900" value-name:"SECONDS" description:"Timeout if no message received within specified seconds"`
78 Since string `short:"s" long:"since" default:"" value-name:"TIMESTAMP" description:"Do not show entries before timestamp"`
79
80 Args struct {
81 Topic string
82 } `positional-args:"yes" required:"yes"`
83}
84
85type MessageOpts struct {
86 MessageListen MessageListenOpts `command:"listen"`
87}
88
89var interAdapterOpts = MessageOpts{}
90
91/* MessageHeader is a set of fields extracted
92 * from voltha.MessageHeader as well as useful other
93 * places such as InterAdapterHeader. These are fields that
94 * will be summarized in list mode and/or can be filtered
95 * on.
96 */
97type MessageHeader struct {
98 Id string `json:"id"`
99 Type string `json:"type"`
100 FromTopic string `json:"from_topic"`
101 ToTopic string `json:"to_topic"`
102 KeyTopic string `json:"key_topic"`
103 Timestamp time.Time `json:"timestamp"`
104 InterAdapterType string `json:"inter_adapter_type"` // interadapter header
105 ToDeviceId string `json:"to_device_id"` // interadapter header
106 ProxyDeviceId string `json:"proxy_device_id"` //interadapter header
107}
108
109/* Fixed widths because we output in a continuous streaming
110 * mode rather than a table-based dump at the end.
111 */
112type MessageHeaderWidths struct {
113 Id int
114 Type int
115 FromTopic int
116 ToTopic int
117 KeyTopic int
118 InterAdapterType int
119 ToDeviceId int
120 ProxyDeviceId int
121 Timestamp int
122}
123
124var DefaultMessageWidths MessageHeaderWidths = MessageHeaderWidths{
125 Id: 32,
126 Type: 10,
127 FromTopic: 16,
128 ToTopic: 16,
129 KeyTopic: 10,
130 Timestamp: 10,
131 InterAdapterType: 14,
132 ToDeviceId: 10,
133 ProxyDeviceId: 10,
134}
135
Scott Baker9173ed82020-05-19 08:30:12 -0700136// jsonpb requires a resolver to resolve Any.Any into proto.Message.
137type VolthaAnyResolver struct{}
138
139func (r *VolthaAnyResolver) Resolve(typeURL string) (proto.Message, error) {
140 // TODO: We should be able to get this automatically via reflection using
141 // the following commented-out code, but it requires upgrading voltctl to
142 // use newer versions of protobuf libraries.
143
144 /*
145 msgType, err := protoregistry.GlobalTypes.FindMessageByURL(typeURL)
146 if err != nil {
147 return err
148 }
149 return msgType.New(), nil*/
150
151 // The intercontianer message bus is where we need to map from Any.Any
152 // to the appropriate protos when generating json output.
153
154 typeURL = strings.TrimPrefix(typeURL, "type.googleapis.com/")
155
156 switch typeURL {
157 case "voltha.StrType":
158 return &inter_container.StrType{}, nil
159 case "voltha.IntType":
160 return &inter_container.IntType{}, nil
161 case "voltha.BoolType":
162 return &inter_container.BoolType{}, nil
163 case "voltha.Packet":
164 return &inter_container.Packet{}, nil
165 case "voltha.ErrorCode":
166 return &inter_container.ErrorCode{}, nil
167 case "voltha.Error":
168 return &inter_container.Error{}, nil
169 case "voltha.Header":
170 return &inter_container.Header{}, nil
171 case "voltha.Argument":
172 return &inter_container.Argument{}, nil
173 case "voltha.InterContainerMessage":
174 return &inter_container.InterContainerMessage{}, nil
175 case "voltha.InterContainerRequestBody":
176 return &inter_container.InterContainerRequestBody{}, nil
177 case "voltha.InterContainerResponseBody":
178 return &inter_container.InterContainerResponseBody{}, nil
179 case "voltha.SwitchCapability":
180 return &inter_container.SwitchCapability{}, nil
181 case "voltha.PortCapability":
182 return &inter_container.PortCapability{}, nil
183 case "voltha.DeviceDiscovered":
184 return &inter_container.DeviceDiscovered{}, nil
185 case "voltha.InterAdapterMessageType":
186 return &inter_container.InterAdapterMessageType{}, nil
187 case "voltha.InterAdapterOmciMessage":
188 return &inter_container.InterAdapterOmciMessage{}, nil
189 case "voltha.InterAdapterTechProfileDownloadMessage":
190 return &inter_container.InterAdapterTechProfileDownloadMessage{}, nil
191 case "voltha.InterAdapterDeleteGemPortMessage":
192 return &inter_container.InterAdapterDeleteGemPortMessage{}, nil
193 case "voltha.InterAdapterDeleteTcontMessage":
194 return &inter_container.InterAdapterDeleteTcontMessage{}, nil
195 case "voltha.InterAdapterResponseBody":
196 return &inter_container.InterAdapterResponseBody{}, nil
197 case "voltha.InterAdapterMessage":
198 return &inter_container.InterAdapterMessage{}, nil
199 }
200
201 return nil, fmt.Errorf("Unknown any type: %s", typeURL)
202}
203
Scott Bakera1e53fa2020-04-01 11:13:34 -0700204func RegisterMessageCommands(parent *flags.Parser) {
205 if _, err := parent.AddCommand("message", "message commands", "Commands for observing messages between components", &interAdapterOpts); err != nil {
206 Error.Fatalf("Unable to register message commands with voltctl command parser: %s", err.Error())
207 }
208}
209
Scott Bakera1e53fa2020-04-01 11:13:34 -0700210// Extract the header, as well as a few other items that might be of interest
Scott Baker9173ed82020-05-19 08:30:12 -0700211func DecodeInterContainerHeader(b []byte, ts time.Time) (*MessageHeader, error) {
212 m := &inter_container.InterContainerMessage{}
213 if err := proto.Unmarshal(b, m); err != nil {
Scott Bakera1e53fa2020-04-01 11:13:34 -0700214 return nil, err
215 }
216
Scott Baker9173ed82020-05-19 08:30:12 -0700217 header := m.Header
218 id := header.Id
219 msgType := header.Type
220 fromTopic := header.FromTopic
221 toTopic := header.ToTopic
222 keyTopic := header.KeyTopic
223 timestamp, err := DecodeTimestamp(header.Timestamp)
Scott Bakera1e53fa2020-04-01 11:13:34 -0700224 if err != nil {
225 return nil, err
226 }
227
228 // Pull some additional data out of the InterAdapterHeader, if it
229 // is embedded inside the InterContainerMessage
230
231 var iaMessageTypeStr string
232 var toDeviceId string
233 var proxyDeviceId string
Scott Baker9173ed82020-05-19 08:30:12 -0700234
235 bodyKind, err := ptypes.AnyMessageName(m.Body)
Scott Bakera1e53fa2020-04-01 11:13:34 -0700236 if err != nil {
237 return nil, err
238 }
Scott Baker9173ed82020-05-19 08:30:12 -0700239
Scott Bakera1e53fa2020-04-01 11:13:34 -0700240 switch bodyKind {
241 case "voltha.InterContainerRequestBody":
Scott Baker9173ed82020-05-19 08:30:12 -0700242 icRequest := &inter_container.InterContainerRequestBody{}
243 err := ptypes.UnmarshalAny(m.Body, icRequest)
Scott Bakera1e53fa2020-04-01 11:13:34 -0700244 if err != nil {
245 return nil, err
246 }
Scott Baker9173ed82020-05-19 08:30:12 -0700247
248 argList := icRequest.Args
249 for _, arg := range argList {
250 key := arg.Key
Scott Bakera1e53fa2020-04-01 11:13:34 -0700251 if key == "msg" {
Scott Baker9173ed82020-05-19 08:30:12 -0700252 argBodyKind, err := ptypes.AnyMessageName(m.Body)
Scott Bakera1e53fa2020-04-01 11:13:34 -0700253 if err != nil {
254 return nil, err
255 }
256 switch argBodyKind {
257 case "voltha.InterAdapterMessage":
Scott Baker9173ed82020-05-19 08:30:12 -0700258 iaMsg := &inter_container.InterAdapterMessage{}
259 err := ptypes.UnmarshalAny(arg.Value, iaMsg)
Scott Bakera1e53fa2020-04-01 11:13:34 -0700260 if err != nil {
261 return nil, err
262 }
Scott Baker9173ed82020-05-19 08:30:12 -0700263 iaHeader := iaMsg.Header
264 iaMessageType := iaHeader.Type
265 iaMessageTypeStr = inter_container.InterAdapterMessageType_Types_name[int32(iaMessageType)]
Scott Bakera1e53fa2020-04-01 11:13:34 -0700266
Scott Baker9173ed82020-05-19 08:30:12 -0700267 toDeviceId = iaHeader.ToDeviceId
268 proxyDeviceId = iaHeader.ProxyDeviceId
Scott Bakera1e53fa2020-04-01 11:13:34 -0700269 }
270 }
271 }
272 }
Scott Baker9173ed82020-05-19 08:30:12 -0700273
274 messageHeaderType := inter_container.MessageType_name[int32(msgType)]
Scott Bakera1e53fa2020-04-01 11:13:34 -0700275
276 icHeader := MessageHeader{Id: id,
Neha Sharma19ca2bf2020-05-11 15:34:17 +0000277 Type: messageHeaderType,
Scott Bakera1e53fa2020-04-01 11:13:34 -0700278 FromTopic: fromTopic,
279 ToTopic: toTopic,
280 KeyTopic: keyTopic,
281 Timestamp: timestamp,
282 InterAdapterType: iaMessageTypeStr,
283 ProxyDeviceId: proxyDeviceId,
284 ToDeviceId: toDeviceId,
285 }
286
287 return &icHeader, nil
288}
289
290// Print the full message, either in JSON or in GRPCURL-human-readable format,
291// depending on which grpcurl formatter is passed in.
Scott Baker9173ed82020-05-19 08:30:12 -0700292func PrintInterContainerMessage(outputAs string, b []byte) error {
293 ms := &inter_container.InterContainerMessage{}
294 if err := proto.Unmarshal(b, ms); err != nil {
Scott Bakera1e53fa2020-04-01 11:13:34 -0700295 return err
296 }
Scott Baker9173ed82020-05-19 08:30:12 -0700297
298 if outputAs == "json" {
299 marshaler := jsonpb.Marshaler{EmitDefaults: true, AnyResolver: &VolthaAnyResolver{}}
300 asJson, err := marshaler.MarshalToString(ms)
301 if err != nil {
302 return fmt.Errorf("Failed to marshal the json: %s", err)
303 }
304 fmt.Println(asJson)
305 } else {
306 // print in golang native format
307 fmt.Printf("%v\n", ms)
Scott Bakera1e53fa2020-04-01 11:13:34 -0700308 }
Scott Baker9173ed82020-05-19 08:30:12 -0700309
Scott Bakera1e53fa2020-04-01 11:13:34 -0700310 return nil
311}
312
313// Print just the enriched InterContainerHeader. This is either in JSON format, or in
314// table format.
315func PrintInterContainerHeader(outputAs string, outputFormat string, hdr *MessageHeader) error {
316 if outputAs == "json" {
317 asJson, err := json.Marshal(hdr)
318 if err != nil {
319 return fmt.Errorf("Error marshalling JSON: %v", err)
320 } else {
321 fmt.Printf("%s\n", asJson)
322 }
323 } else {
324 f := format.Format(outputFormat)
325 output, err := f.ExecuteFixedWidth(DefaultMessageWidths, false, *hdr)
326 if err != nil {
327 return err
328 }
329 fmt.Printf("%s\n", output)
330 }
331 return nil
332}
333
Scott Bakera1e53fa2020-04-01 11:13:34 -0700334// Start output, print any column headers or other start characters
335func (options *MessageListenOpts) StartOutput(outputFormat string) error {
336 if options.OutputAs == "json" {
337 fmt.Println("[")
338 } else if (options.OutputAs == "table") && !options.ShowBody {
339 f := format.Format(outputFormat)
340 output, err := f.ExecuteFixedWidth(DefaultMessageWidths, true, nil)
341 if err != nil {
342 return err
343 }
344 fmt.Println(output)
345 }
346 return nil
347}
348
349// Finish output, print any column footers or other end characters
350func (options *MessageListenOpts) FinishOutput() {
351 if options.OutputAs == "json" {
352 fmt.Println("]")
353 }
354}
355
356func (options *MessageListenOpts) Execute(args []string) error {
357 ProcessGlobalOptions()
358 if GlobalConfig.Kafka == "" {
359 return errors.New("Kafka address is not specified")
360 }
361
Scott Bakera1e53fa2020-04-01 11:13:34 -0700362 config := sarama.NewConfig()
363 config.ClientID = "go-kafka-consumer"
364 config.Consumer.Return.Errors = true
365 config.Version = sarama.V1_0_0_0
366 brokers := []string{GlobalConfig.Kafka}
367
368 client, err := sarama.NewClient(brokers, config)
369 if err != nil {
370 return err
371 }
372
373 defer func() {
374 if err := client.Close(); err != nil {
375 panic(err)
376 }
377 }()
378
379 consumer, consumerErrors, highwaterMarks, err := startInterContainerConsumer([]string{options.Args.Topic}, client)
380 if err != nil {
381 return err
382 }
383
384 highwater := highwaterMarks[options.Args.Topic]
385
386 signals := make(chan os.Signal, 1)
387 signal.Notify(signals, os.Interrupt)
388
389 // Count how many message processed
390 consumeCount := 0
391
392 // Count how many messages were printed
393 count := 0
394
Scott Bakera1e53fa2020-04-01 11:13:34 -0700395 var headerFilter *filter.Filter
396 if options.Filter != "" {
397 headerFilterVal, err := filter.Parse(options.Filter)
398 if err != nil {
399 return fmt.Errorf("Failed to parse filter: %v", err)
400 }
401 headerFilter = &headerFilterVal
402 }
403
404 outputFormat := CharReplacer.Replace(options.Format)
405 if outputFormat == "" {
406 outputFormat = GetCommandOptionWithDefault("intercontainer-listen", "format", DEFAULT_MESSAGE_FORMAT)
407 }
408
409 err = options.StartOutput(outputFormat)
410 if err != nil {
411 return err
412 }
413
414 var since *time.Time
415 if options.Since != "" {
416 since, err = ParseSince(options.Since)
417 if err != nil {
418 return err
419 }
420 }
421
422 // Get signnal for finish
423 doneCh := make(chan struct{})
424 go func() {
425 tStart := time.Now()
426 Loop:
427 for {
428 // Initialize the idle timeout timer
429 timeoutTimer := time.NewTimer(time.Duration(options.Timeout) * time.Second)
430 select {
431 case msg := <-consumer:
432 consumeCount++
Scott Baker9173ed82020-05-19 08:30:12 -0700433 hdr, err := DecodeInterContainerHeader(msg.Value, msg.Timestamp)
Scott Bakera1e53fa2020-04-01 11:13:34 -0700434 if err != nil {
435 log.Printf("Error decoding header %v\n", err)
436 continue
437 }
Scott Baker9a2d9a42020-06-09 18:11:26 -0700438
439 match := false
440 if headerFilter != nil {
441 var err error
442 if match, err = headerFilter.Evaluate(*hdr); err != nil {
443 log.Printf("%v\n", err)
444 }
445 } else {
446 match = true
447 }
448
449 if !match {
Scott Bakera1e53fa2020-04-01 11:13:34 -0700450 // skip printing message
451 } else if since != nil && hdr.Timestamp.Before(*since) {
452 // it's too old
453 } else {
454 // comma separated between this message and predecessor
455 if count > 0 {
456 if options.OutputAs == "json" {
457 fmt.Println(",")
458 }
459 }
460
461 // Print it
462 if options.ShowBody {
Scott Baker9173ed82020-05-19 08:30:12 -0700463 if err := PrintInterContainerMessage(options.OutputAs, msg.Value); err != nil {
Scott Bakera1e53fa2020-04-01 11:13:34 -0700464 log.Printf("%v\n", err)
465 }
466 } else {
467 if err := PrintInterContainerHeader(options.OutputAs, outputFormat, hdr); err != nil {
468 log.Printf("%v\n", err)
469 }
470 }
471
472 // Check to see if we've hit the "count" threshold the user specified
473 count++
474 if (options.Count > 0) && (count >= options.Count) {
475 log.Println("Count reached")
476 doneCh <- struct{}{}
477 break Loop
478 }
479
480 // Check to see if we've hit the "now" threshold the user specified
481 if (options.Now) && (!hdr.Timestamp.Before(tStart)) {
482 log.Println("Now timestamp reached")
483 doneCh <- struct{}{}
484 break Loop
485 }
486 }
487
488 // If we're not in follow mode, see if we hit the highwater mark
489 if !options.Follow && !options.Now && (msg.Offset >= highwater) {
490 log.Println("High water reached")
491 doneCh <- struct{}{}
492 break Loop
493 }
494
495 // Reset the timeout timer
496 if !timeoutTimer.Stop() {
497 <-timeoutTimer.C
498 }
499 case consumerError := <-consumerErrors:
500 log.Printf("Received consumerError topic=%v, partition=%v, err=%v\n", string(consumerError.Topic), string(consumerError.Partition), consumerError.Err)
501 doneCh <- struct{}{}
502 case <-signals:
503 doneCh <- struct{}{}
504 case <-timeoutTimer.C:
505 log.Printf("Idle timeout\n")
506 doneCh <- struct{}{}
507 }
508 }
509 }()
510
511 <-doneCh
512
513 options.FinishOutput()
514
515 log.Printf("Consumed %d messages. Printed %d messages", consumeCount, count)
516
517 return nil
518}
519
520// Consume message from Sarama and send them out on a channel.
521// Supports multiple topics.
522// Taken from Sarama example consumer.
523func startInterContainerConsumer(topics []string, client sarama.Client) (chan *sarama.ConsumerMessage, chan *sarama.ConsumerError, map[string]int64, error) {
524 master, err := sarama.NewConsumerFromClient(client)
525 if err != nil {
526 return nil, nil, nil, err
527 }
528
529 consumers := make(chan *sarama.ConsumerMessage)
530 errors := make(chan *sarama.ConsumerError)
531 highwater := make(map[string]int64)
532 for _, topic := range topics {
533 if strings.Contains(topic, "__consumer_offsets") {
534 continue
535 }
536 partitions, _ := master.Partitions(topic)
537
538 // TODO: Add support for multiple partitions
539 if len(partitions) > 1 {
540 log.Printf("WARNING: %d partitions on topic %s but we only listen to the first one\n", len(partitions), topic)
541 }
542
543 hw, err := client.GetOffset("openolt", partitions[0], sarama.OffsetNewest)
544 if err != nil {
545 return nil, nil, nil, fmt.Errorf("Error in consume() getting highwater: Topic %v Partitions: %v", topic, partitions)
546 }
547 highwater[topic] = hw
548
549 consumer, err := master.ConsumePartition(topic, partitions[0], sarama.OffsetOldest)
550 if nil != err {
551 return nil, nil, nil, fmt.Errorf("Error in consume(): Topic %v Partitions: %v", topic, partitions)
552 }
553 log.Println(" Start consuming topic ", topic)
554 go func(topic string, consumer sarama.PartitionConsumer) {
555 for {
556 select {
557 case consumerError := <-consumer.Errors():
558 errors <- consumerError
559
560 case msg := <-consumer.Messages():
561 consumers <- msg
562 }
563 }
564 }(topic, consumer)
565 }
566
567 return consumers, errors, highwater, nil
568}