blob: 338affdbdf350e7969f6ded90290667bcba76da2 [file] [log] [blame]
Scott Bakera1e53fa2020-04-01 11:13:34 -07001/*
2 * Copyright 2019-present Ciena Corporation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package commands
17
18import (
19 "encoding/json"
20 "errors"
21 "fmt"
22 "github.com/Shopify/sarama"
Scott Baker9173ed82020-05-19 08:30:12 -070023 "github.com/golang/protobuf/jsonpb"
24 "github.com/golang/protobuf/proto"
25 "github.com/golang/protobuf/ptypes"
Scott Bakera1e53fa2020-04-01 11:13:34 -070026 flags "github.com/jessevdk/go-flags"
Scott Bakera1e53fa2020-04-01 11:13:34 -070027 "github.com/opencord/voltctl/pkg/filter"
28 "github.com/opencord/voltctl/pkg/format"
kesavand8ec4fc02021-01-27 09:10:22 -050029 "github.com/opencord/voltha-protos/v4/go/inter_container"
Scott Bakera1e53fa2020-04-01 11:13:34 -070030 "log"
31 "os"
32 "os/signal"
33 "strings"
34 "time"
35)
36
/*
 * The "message listen" command supports two types of output:
 *    1) A summary output where a row is displayed for each message received. For the summary
 *       format, DEFAULT_MESSAGE_FORMAT contains the default list of columns that will be
 *       displayed and can be overridden at runtime.
 *    2) A body output where the full grpcurl or json body is output for each message received.
 *
 * These two modes are switched by using the "-b" / "--show-body" flag.
 *
 * The summary mode has the potential to aggregate data together from multiple parts of the
 * message. For example, it currently aggregates the InterAdapterHeader contents together with
 * the InterContainerHeader contents.
 *
 * Similar to "event listen", the "message listen" command operates in a streaming mode, rather
 * than collecting a list of results and then emitting them at program exit. This is done to
 * facilitate options such as "-F" / "--follow" where the program is intended to operate
 * continuously. This means that automatically calculating column widths is not practical, and
 * a set of fixed widths (DefaultMessageWidths) is predefined.
 *
 * As there are multiple kafka topics that can be listened to, specifying a topic is a
 * mandatory positional argument for the `message listen` command. Common topics include:
 *   * openolt
 *   * brcm_openonu_adapter
 *   * rwcore
 *   * core-pair-1
 */
63
// DEFAULT_MESSAGE_FORMAT is the column template used for the summary (table)
// output of "message listen" when no format is supplied on the command line
// or through the command options.
const DEFAULT_MESSAGE_FORMAT = "table{{.Id}}\t{{.Type}}\t{{.FromTopic}}\t{{.ToTopic}}\t{{.KeyTopic}}\t{{.InterAdapterType}}"
67
// MessageListenOpts holds the command-line options of the "message listen"
// command. The struct tags are interpreted by the go-flags parser.
type MessageListenOpts struct {
	// Format is the output template; when empty a default is looked up at run time.
	Format string `long:"format" value-name:"FORMAT" default:"" description:"Format to use to output structured data"`
	// OutputAs selects the kind of output that is generated.
	// nolint: staticcheck
	OutputAs string `short:"o" long:"outputas" default:"table" choice:"table" choice:"json" choice:"yaml" description:"Type of output to generate"`
	// Filter restricts the output to messages whose header matches the expression.
	Filter string `short:"f" long:"filter" default:"" value-name:"FILTER" description:"Only display results that match filter"`
	// Follow keeps consuming until interrupted.
	Follow bool `short:"F" long:"follow" description:"Continue to consume until CTRL-C is pressed"`
	// ShowBody prints the whole message instead of the header summary.
	ShowBody bool `short:"b" long:"show-body" description:"Show body of messages rather than only a header summary"`
	// Count limits how many messages are printed; values <= 0 mean no limit.
	Count int `short:"c" long:"count" default:"-1" value-name:"LIMIT" description:"Limit the count of messages that will be printed"`
	// Now stops the command once a message stamped at or after start-up is printed.
	Now bool `short:"n" long:"now" description:"Stop printing messages when current time is reached"`
	// Timeout is the idle timeout, in seconds.
	Timeout int `short:"t" long:"idle" default:"900" value-name:"SECONDS" description:"Timeout if no message received within specified seconds"`
	// Since suppresses messages older than the given timestamp.
	Since string `short:"s" long:"since" default:"" value-name:"TIMESTAMP" description:"Do not show entries before timestamp"`

	// Args carries the mandatory positional arguments.
	Args struct {
		// Topic is the kafka topic to listen on.
		Topic string
	} `positional-args:"yes" required:"yes"`
}
84
85type MessageOpts struct {
86 MessageListen MessageListenOpts `command:"listen"`
87}
88
89var interAdapterOpts = MessageOpts{}
90
// MessageHeader is the flattened summary of one inter-container message. It
// combines fields of the inter-container header with fields of an embedded
// inter-adapter header. These are the values shown in list mode and the
// values a filter can be evaluated against.
type MessageHeader struct {
	// Fields taken from the inter-container header.
	Id        string    `json:"id"`
	Type      string    `json:"type"`
	FromTopic string    `json:"from_topic"`
	ToTopic   string    `json:"to_topic"`
	KeyTopic  string    `json:"key_topic"`
	Timestamp time.Time `json:"timestamp"`

	// Fields taken from the inter-adapter header.
	InterAdapterType string `json:"inter_adapter_type"`
	ToDeviceId       string `json:"to_device_id"`
	ProxyDeviceId    string `json:"proxy_device_id"`
}
108
// MessageHeaderWidths holds one column width per MessageHeader field. The
// widths are fixed because rows are emitted while messages stream in, rather
// than as a table dumped at the end.
type MessageHeaderWidths struct {
	Id               int
	Type             int
	FromTopic        int
	ToTopic          int
	KeyTopic         int
	InterAdapterType int
	ToDeviceId       int
	ProxyDeviceId    int
	Timestamp        int
}

// DefaultMessageWidths is the set of column widths used by the fixed-width
// output of this file.
var DefaultMessageWidths = MessageHeaderWidths{
	Id:               32,
	Type:             10,
	FromTopic:        16,
	ToTopic:          16,
	KeyTopic:         10,
	InterAdapterType: 14,
	ToDeviceId:       10,
	ProxyDeviceId:    10,
	Timestamp:        10,
}
135
Scott Baker9173ed82020-05-19 08:30:12 -0700136// jsonpb requires a resolver to resolve Any.Any into proto.Message.
137type VolthaAnyResolver struct{}
138
139func (r *VolthaAnyResolver) Resolve(typeURL string) (proto.Message, error) {
140 // TODO: We should be able to get this automatically via reflection using
141 // the following commented-out code, but it requires upgrading voltctl to
142 // use newer versions of protobuf libraries.
143
144 /*
145 msgType, err := protoregistry.GlobalTypes.FindMessageByURL(typeURL)
146 if err != nil {
147 return err
148 }
149 return msgType.New(), nil*/
150
151 // The intercontianer message bus is where we need to map from Any.Any
152 // to the appropriate protos when generating json output.
153
154 typeURL = strings.TrimPrefix(typeURL, "type.googleapis.com/")
155
156 switch typeURL {
157 case "voltha.StrType":
158 return &inter_container.StrType{}, nil
159 case "voltha.IntType":
160 return &inter_container.IntType{}, nil
161 case "voltha.BoolType":
162 return &inter_container.BoolType{}, nil
163 case "voltha.Packet":
164 return &inter_container.Packet{}, nil
165 case "voltha.ErrorCode":
166 return &inter_container.ErrorCode{}, nil
167 case "voltha.Error":
168 return &inter_container.Error{}, nil
169 case "voltha.Header":
170 return &inter_container.Header{}, nil
171 case "voltha.Argument":
172 return &inter_container.Argument{}, nil
173 case "voltha.InterContainerMessage":
174 return &inter_container.InterContainerMessage{}, nil
175 case "voltha.InterContainerRequestBody":
176 return &inter_container.InterContainerRequestBody{}, nil
177 case "voltha.InterContainerResponseBody":
178 return &inter_container.InterContainerResponseBody{}, nil
179 case "voltha.SwitchCapability":
180 return &inter_container.SwitchCapability{}, nil
Scott Baker9173ed82020-05-19 08:30:12 -0700181 case "voltha.DeviceDiscovered":
182 return &inter_container.DeviceDiscovered{}, nil
183 case "voltha.InterAdapterMessageType":
184 return &inter_container.InterAdapterMessageType{}, nil
185 case "voltha.InterAdapterOmciMessage":
186 return &inter_container.InterAdapterOmciMessage{}, nil
187 case "voltha.InterAdapterTechProfileDownloadMessage":
188 return &inter_container.InterAdapterTechProfileDownloadMessage{}, nil
189 case "voltha.InterAdapterDeleteGemPortMessage":
190 return &inter_container.InterAdapterDeleteGemPortMessage{}, nil
191 case "voltha.InterAdapterDeleteTcontMessage":
192 return &inter_container.InterAdapterDeleteTcontMessage{}, nil
193 case "voltha.InterAdapterResponseBody":
194 return &inter_container.InterAdapterResponseBody{}, nil
195 case "voltha.InterAdapterMessage":
196 return &inter_container.InterAdapterMessage{}, nil
197 }
198
199 return nil, fmt.Errorf("Unknown any type: %s", typeURL)
200}
201
Scott Bakera1e53fa2020-04-01 11:13:34 -0700202func RegisterMessageCommands(parent *flags.Parser) {
203 if _, err := parent.AddCommand("message", "message commands", "Commands for observing messages between components", &interAdapterOpts); err != nil {
204 Error.Fatalf("Unable to register message commands with voltctl command parser: %s", err.Error())
205 }
206}
207
Scott Bakera1e53fa2020-04-01 11:13:34 -0700208// Extract the header, as well as a few other items that might be of interest
Scott Baker9173ed82020-05-19 08:30:12 -0700209func DecodeInterContainerHeader(b []byte, ts time.Time) (*MessageHeader, error) {
210 m := &inter_container.InterContainerMessage{}
211 if err := proto.Unmarshal(b, m); err != nil {
Scott Bakera1e53fa2020-04-01 11:13:34 -0700212 return nil, err
213 }
214
Scott Baker9173ed82020-05-19 08:30:12 -0700215 header := m.Header
216 id := header.Id
217 msgType := header.Type
218 fromTopic := header.FromTopic
219 toTopic := header.ToTopic
220 keyTopic := header.KeyTopic
221 timestamp, err := DecodeTimestamp(header.Timestamp)
Scott Bakera1e53fa2020-04-01 11:13:34 -0700222 if err != nil {
223 return nil, err
224 }
225
226 // Pull some additional data out of the InterAdapterHeader, if it
227 // is embedded inside the InterContainerMessage
228
229 var iaMessageTypeStr string
230 var toDeviceId string
231 var proxyDeviceId string
Scott Baker9173ed82020-05-19 08:30:12 -0700232
233 bodyKind, err := ptypes.AnyMessageName(m.Body)
Scott Bakera1e53fa2020-04-01 11:13:34 -0700234 if err != nil {
235 return nil, err
236 }
Scott Baker9173ed82020-05-19 08:30:12 -0700237
Scott Bakera1e53fa2020-04-01 11:13:34 -0700238 switch bodyKind {
239 case "voltha.InterContainerRequestBody":
Scott Baker9173ed82020-05-19 08:30:12 -0700240 icRequest := &inter_container.InterContainerRequestBody{}
241 err := ptypes.UnmarshalAny(m.Body, icRequest)
Scott Bakera1e53fa2020-04-01 11:13:34 -0700242 if err != nil {
243 return nil, err
244 }
Scott Baker9173ed82020-05-19 08:30:12 -0700245
246 argList := icRequest.Args
247 for _, arg := range argList {
248 key := arg.Key
Scott Bakera1e53fa2020-04-01 11:13:34 -0700249 if key == "msg" {
Scott Baker9173ed82020-05-19 08:30:12 -0700250 argBodyKind, err := ptypes.AnyMessageName(m.Body)
Scott Bakera1e53fa2020-04-01 11:13:34 -0700251 if err != nil {
252 return nil, err
253 }
254 switch argBodyKind {
255 case "voltha.InterAdapterMessage":
Scott Baker9173ed82020-05-19 08:30:12 -0700256 iaMsg := &inter_container.InterAdapterMessage{}
257 err := ptypes.UnmarshalAny(arg.Value, iaMsg)
Scott Bakera1e53fa2020-04-01 11:13:34 -0700258 if err != nil {
259 return nil, err
260 }
Scott Baker9173ed82020-05-19 08:30:12 -0700261 iaHeader := iaMsg.Header
262 iaMessageType := iaHeader.Type
263 iaMessageTypeStr = inter_container.InterAdapterMessageType_Types_name[int32(iaMessageType)]
Scott Bakera1e53fa2020-04-01 11:13:34 -0700264
Scott Baker9173ed82020-05-19 08:30:12 -0700265 toDeviceId = iaHeader.ToDeviceId
266 proxyDeviceId = iaHeader.ProxyDeviceId
Scott Bakera1e53fa2020-04-01 11:13:34 -0700267 }
268 }
269 }
270 }
Scott Baker9173ed82020-05-19 08:30:12 -0700271
272 messageHeaderType := inter_container.MessageType_name[int32(msgType)]
Scott Bakera1e53fa2020-04-01 11:13:34 -0700273
274 icHeader := MessageHeader{Id: id,
Neha Sharma19ca2bf2020-05-11 15:34:17 +0000275 Type: messageHeaderType,
Scott Bakera1e53fa2020-04-01 11:13:34 -0700276 FromTopic: fromTopic,
277 ToTopic: toTopic,
278 KeyTopic: keyTopic,
279 Timestamp: timestamp,
280 InterAdapterType: iaMessageTypeStr,
281 ProxyDeviceId: proxyDeviceId,
282 ToDeviceId: toDeviceId,
283 }
284
285 return &icHeader, nil
286}
287
288// Print the full message, either in JSON or in GRPCURL-human-readable format,
289// depending on which grpcurl formatter is passed in.
Scott Baker9173ed82020-05-19 08:30:12 -0700290func PrintInterContainerMessage(outputAs string, b []byte) error {
291 ms := &inter_container.InterContainerMessage{}
292 if err := proto.Unmarshal(b, ms); err != nil {
Scott Bakera1e53fa2020-04-01 11:13:34 -0700293 return err
294 }
Scott Baker9173ed82020-05-19 08:30:12 -0700295
296 if outputAs == "json" {
297 marshaler := jsonpb.Marshaler{EmitDefaults: true, AnyResolver: &VolthaAnyResolver{}}
298 asJson, err := marshaler.MarshalToString(ms)
299 if err != nil {
300 return fmt.Errorf("Failed to marshal the json: %s", err)
301 }
302 fmt.Println(asJson)
303 } else {
304 // print in golang native format
305 fmt.Printf("%v\n", ms)
Scott Bakera1e53fa2020-04-01 11:13:34 -0700306 }
Scott Baker9173ed82020-05-19 08:30:12 -0700307
Scott Bakera1e53fa2020-04-01 11:13:34 -0700308 return nil
309}
310
311// Print just the enriched InterContainerHeader. This is either in JSON format, or in
312// table format.
313func PrintInterContainerHeader(outputAs string, outputFormat string, hdr *MessageHeader) error {
314 if outputAs == "json" {
315 asJson, err := json.Marshal(hdr)
316 if err != nil {
317 return fmt.Errorf("Error marshalling JSON: %v", err)
318 } else {
319 fmt.Printf("%s\n", asJson)
320 }
321 } else {
322 f := format.Format(outputFormat)
323 output, err := f.ExecuteFixedWidth(DefaultMessageWidths, false, *hdr)
324 if err != nil {
325 return err
326 }
327 fmt.Printf("%s\n", output)
328 }
329 return nil
330}
331
Scott Bakera1e53fa2020-04-01 11:13:34 -0700332// Start output, print any column headers or other start characters
333func (options *MessageListenOpts) StartOutput(outputFormat string) error {
334 if options.OutputAs == "json" {
335 fmt.Println("[")
336 } else if (options.OutputAs == "table") && !options.ShowBody {
337 f := format.Format(outputFormat)
338 output, err := f.ExecuteFixedWidth(DefaultMessageWidths, true, nil)
339 if err != nil {
340 return err
341 }
342 fmt.Println(output)
343 }
344 return nil
345}
346
347// Finish output, print any column footers or other end characters
348func (options *MessageListenOpts) FinishOutput() {
349 if options.OutputAs == "json" {
350 fmt.Println("]")
351 }
352}
353
354func (options *MessageListenOpts) Execute(args []string) error {
355 ProcessGlobalOptions()
David K. Bainbridge9189c632021-03-26 21:52:21 +0000356 if GlobalConfig.Current().Kafka == "" {
Scott Bakera1e53fa2020-04-01 11:13:34 -0700357 return errors.New("Kafka address is not specified")
358 }
359
Scott Bakera1e53fa2020-04-01 11:13:34 -0700360 config := sarama.NewConfig()
361 config.ClientID = "go-kafka-consumer"
362 config.Consumer.Return.Errors = true
363 config.Version = sarama.V1_0_0_0
David K. Bainbridge9189c632021-03-26 21:52:21 +0000364 brokers := []string{GlobalConfig.Current().Kafka}
Scott Bakera1e53fa2020-04-01 11:13:34 -0700365
366 client, err := sarama.NewClient(brokers, config)
367 if err != nil {
368 return err
369 }
370
371 defer func() {
372 if err := client.Close(); err != nil {
373 panic(err)
374 }
375 }()
376
377 consumer, consumerErrors, highwaterMarks, err := startInterContainerConsumer([]string{options.Args.Topic}, client)
378 if err != nil {
379 return err
380 }
381
382 highwater := highwaterMarks[options.Args.Topic]
383
384 signals := make(chan os.Signal, 1)
385 signal.Notify(signals, os.Interrupt)
386
387 // Count how many message processed
388 consumeCount := 0
389
390 // Count how many messages were printed
391 count := 0
392
Scott Bakera1e53fa2020-04-01 11:13:34 -0700393 var headerFilter *filter.Filter
394 if options.Filter != "" {
395 headerFilterVal, err := filter.Parse(options.Filter)
396 if err != nil {
397 return fmt.Errorf("Failed to parse filter: %v", err)
398 }
399 headerFilter = &headerFilterVal
400 }
401
402 outputFormat := CharReplacer.Replace(options.Format)
403 if outputFormat == "" {
404 outputFormat = GetCommandOptionWithDefault("intercontainer-listen", "format", DEFAULT_MESSAGE_FORMAT)
405 }
406
407 err = options.StartOutput(outputFormat)
408 if err != nil {
409 return err
410 }
411
412 var since *time.Time
413 if options.Since != "" {
414 since, err = ParseSince(options.Since)
415 if err != nil {
416 return err
417 }
418 }
419
420 // Get signnal for finish
421 doneCh := make(chan struct{})
422 go func() {
423 tStart := time.Now()
424 Loop:
425 for {
426 // Initialize the idle timeout timer
427 timeoutTimer := time.NewTimer(time.Duration(options.Timeout) * time.Second)
428 select {
429 case msg := <-consumer:
430 consumeCount++
Scott Baker9173ed82020-05-19 08:30:12 -0700431 hdr, err := DecodeInterContainerHeader(msg.Value, msg.Timestamp)
Scott Bakera1e53fa2020-04-01 11:13:34 -0700432 if err != nil {
433 log.Printf("Error decoding header %v\n", err)
434 continue
435 }
Scott Baker9a2d9a42020-06-09 18:11:26 -0700436
437 match := false
438 if headerFilter != nil {
439 var err error
440 if match, err = headerFilter.Evaluate(*hdr); err != nil {
441 log.Printf("%v\n", err)
442 }
443 } else {
444 match = true
445 }
446
447 if !match {
Scott Bakera1e53fa2020-04-01 11:13:34 -0700448 // skip printing message
449 } else if since != nil && hdr.Timestamp.Before(*since) {
450 // it's too old
451 } else {
452 // comma separated between this message and predecessor
453 if count > 0 {
454 if options.OutputAs == "json" {
455 fmt.Println(",")
456 }
457 }
458
459 // Print it
460 if options.ShowBody {
Scott Baker9173ed82020-05-19 08:30:12 -0700461 if err := PrintInterContainerMessage(options.OutputAs, msg.Value); err != nil {
Scott Bakera1e53fa2020-04-01 11:13:34 -0700462 log.Printf("%v\n", err)
463 }
464 } else {
465 if err := PrintInterContainerHeader(options.OutputAs, outputFormat, hdr); err != nil {
466 log.Printf("%v\n", err)
467 }
468 }
469
470 // Check to see if we've hit the "count" threshold the user specified
471 count++
472 if (options.Count > 0) && (count >= options.Count) {
473 log.Println("Count reached")
474 doneCh <- struct{}{}
475 break Loop
476 }
477
478 // Check to see if we've hit the "now" threshold the user specified
479 if (options.Now) && (!hdr.Timestamp.Before(tStart)) {
480 log.Println("Now timestamp reached")
481 doneCh <- struct{}{}
482 break Loop
483 }
484 }
485
486 // If we're not in follow mode, see if we hit the highwater mark
487 if !options.Follow && !options.Now && (msg.Offset >= highwater) {
488 log.Println("High water reached")
489 doneCh <- struct{}{}
490 break Loop
491 }
492
493 // Reset the timeout timer
494 if !timeoutTimer.Stop() {
495 <-timeoutTimer.C
496 }
497 case consumerError := <-consumerErrors:
498 log.Printf("Received consumerError topic=%v, partition=%v, err=%v\n", string(consumerError.Topic), string(consumerError.Partition), consumerError.Err)
499 doneCh <- struct{}{}
500 case <-signals:
501 doneCh <- struct{}{}
502 case <-timeoutTimer.C:
503 log.Printf("Idle timeout\n")
504 doneCh <- struct{}{}
505 }
506 }
507 }()
508
509 <-doneCh
510
511 options.FinishOutput()
512
513 log.Printf("Consumed %d messages. Printed %d messages", consumeCount, count)
514
515 return nil
516}
517
518// Consume message from Sarama and send them out on a channel.
519// Supports multiple topics.
520// Taken from Sarama example consumer.
521func startInterContainerConsumer(topics []string, client sarama.Client) (chan *sarama.ConsumerMessage, chan *sarama.ConsumerError, map[string]int64, error) {
522 master, err := sarama.NewConsumerFromClient(client)
523 if err != nil {
524 return nil, nil, nil, err
525 }
526
527 consumers := make(chan *sarama.ConsumerMessage)
528 errors := make(chan *sarama.ConsumerError)
529 highwater := make(map[string]int64)
530 for _, topic := range topics {
531 if strings.Contains(topic, "__consumer_offsets") {
532 continue
533 }
534 partitions, _ := master.Partitions(topic)
535
536 // TODO: Add support for multiple partitions
537 if len(partitions) > 1 {
538 log.Printf("WARNING: %d partitions on topic %s but we only listen to the first one\n", len(partitions), topic)
539 }
540
541 hw, err := client.GetOffset("openolt", partitions[0], sarama.OffsetNewest)
542 if err != nil {
543 return nil, nil, nil, fmt.Errorf("Error in consume() getting highwater: Topic %v Partitions: %v", topic, partitions)
544 }
545 highwater[topic] = hw
546
547 consumer, err := master.ConsumePartition(topic, partitions[0], sarama.OffsetOldest)
548 if nil != err {
549 return nil, nil, nil, fmt.Errorf("Error in consume(): Topic %v Partitions: %v", topic, partitions)
550 }
551 log.Println(" Start consuming topic ", topic)
552 go func(topic string, consumer sarama.PartitionConsumer) {
553 for {
554 select {
555 case consumerError := <-consumer.Errors():
556 errors <- consumerError
557
558 case msg := <-consumer.Messages():
559 consumers <- msg
560 }
561 }
562 }(topic, consumer)
563 }
564
565 return consumers, errors, highwater, nil
566}