blob: 43bfe751455b81decf3216f836d30ba2366647d1 [file] [log] [blame]
Scott Bakered4efab2020-01-13 19:12:25 -08001/*
2 * Copyright 2019-present Ciena Corporation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package commands
17
18import (
19 "encoding/json"
20 "errors"
21 "fmt"
22 "github.com/Shopify/sarama"
23 "github.com/fullstorydev/grpcurl"
Scott Baker2fe436a2020-02-10 17:21:47 -080024 "github.com/golang/protobuf/ptypes"
25 "github.com/golang/protobuf/ptypes/timestamp"
Scott Bakered4efab2020-01-13 19:12:25 -080026 flags "github.com/jessevdk/go-flags"
27 "github.com/jhump/protoreflect/desc"
28 "github.com/jhump/protoreflect/dynamic"
29 "github.com/opencord/voltctl/pkg/filter"
30 "github.com/opencord/voltctl/pkg/format"
31 "github.com/opencord/voltctl/pkg/model"
32 "log"
33 "os"
34 "os/signal"
35 "strings"
36 "time"
37)
38
39const (
40 DEFAULT_EVENT_FORMAT = "table{{.Category}}\t{{.SubCategory}}\t{{.Type}}\t{{.Timestamp}}\t{{.Device_ids}}\t{{.Titles}}"
41)
42
43type EventListenOpts struct {
David K. Bainbridge2b627612020-02-18 14:50:13 -080044 Format string `long:"format" value-name:"FORMAT" default:"" description:"Format to use to output structured data"`
45 // nolint: staticcheck
Scott Bakered4efab2020-01-13 19:12:25 -080046 OutputAs string `short:"o" long:"outputas" default:"table" choice:"table" choice:"json" choice:"yaml" description:"Type of output to generate"`
47 Filter string `short:"f" long:"filter" default:"" value-name:"FILTER" description:"Only display results that match filter"`
48 Follow bool `short:"F" long:"follow" description:"Continue to consume until CTRL-C is pressed"`
49 ShowBody bool `short:"b" long:"show-body" description:"Show body of events rather than only a header summary"`
50 Count int `short:"c" long:"count" default:"-1" value-name:"LIMIT" description:"Limit the count of messages that will be printed"`
51 Now bool `short:"n" long:"now" description:"Stop printing events when current time is reached"`
52 Timeout int `short:"t" long:"idle" default:"900" value-name:"SECONDS" description:"Timeout if no message received within specified seconds"`
Scott Bakerf05e60a2020-02-02 21:53:57 -080053 Since string `short:"s" long:"since" default:"" value-name:"TIMESTAMP" description:"Do not show entries before timestamp"`
Scott Bakered4efab2020-01-13 19:12:25 -080054}
55
56type EventOpts struct {
57 EventListen EventListenOpts `command:"listen"`
58}
59
60var eventOpts = EventOpts{}
61
62type EventHeader struct {
Scott Baker2fe436a2020-02-10 17:21:47 -080063 Category string `json:"category"`
64 SubCategory string `json:"sub_category"`
65 Type string `json:"type"`
66 Raised_ts time.Time `json:"raised_ts"`
67 Reported_ts time.Time `json:"reported_ts"`
68 Device_ids []string `json:"device_ids"` // Opportunistically collected list of device_ids
69 Titles []string `json:"titles"` // Opportunistically collected list of titles
70 Timestamp time.Time `json:"timestamp"` // Timestamp from Kafka
Scott Bakered4efab2020-01-13 19:12:25 -080071}
72
73type EventHeaderWidths struct {
74 Category int
75 SubCategory int
76 Type int
77 Raised_ts int
78 Reported_ts int
79 Device_ids int
80 Titles int
81 Timestamp int
82}
83
84var DefaultWidths EventHeaderWidths = EventHeaderWidths{
85 Category: 13,
86 SubCategory: 3,
87 Type: 12,
88 Raised_ts: 10,
89 Reported_ts: 10,
90 Device_ids: 40,
91 Titles: 40,
92 Timestamp: 10,
93}
94
95func RegisterEventCommands(parent *flags.Parser) {
96 _, err := parent.AddCommand("event", "event commands", "Commands for observing events", &eventOpts)
97 if err != nil {
98 Error.Fatalf("Unable to register event commands with voltctl command parser: %s", err.Error())
99 }
100}
101
Scott Bakerf05e60a2020-02-02 21:53:57 -0800102func ParseSince(s string) (*time.Time, error) {
103 if strings.EqualFold(s, "now") {
104 since := time.Now()
105 return &since, nil
106 }
107
108 rfc3339Time, err := time.Parse(time.RFC3339, s)
109 if err == nil {
110 return &rfc3339Time, nil
111 }
112
113 duration, err := time.ParseDuration(s)
114 if err == nil {
115 since := time.Now().Add(-duration)
116 return &since, nil
117 }
118
119 return nil, fmt.Errorf("Unable to parse time specification `%s`. Please use either `now`, a duration, or an RFC3339-compliant string", s)
120}
121
Scott Baker2fe436a2020-02-10 17:21:47 -0800122// Convert a timestamp field in an event to a time.Time
123func DecodeTimestamp(tsIntf interface{}) (time.Time, error) {
124 ts, okay := tsIntf.(*timestamp.Timestamp)
125 if okay {
126 // Voltha-Protos 3.2.3 and above
127 result, err := ptypes.Timestamp(ts)
128 return result, err
129 }
130 tsFloat, okay := tsIntf.(float32)
131 if okay {
132 // Voltha-Protos 3.2.2 and below
133 return time.Unix(int64(tsFloat), 0), nil
134 }
Scott Bakera1e53fa2020-04-01 11:13:34 -0700135 tsInt64, okay := tsIntf.(int64)
136 if okay {
137 if tsInt64 > 10000000000000 {
138 // sometimes it's in nanoseconds
139 return time.Unix(tsInt64/1000000000, tsInt64%1000000000), nil
140 } else {
141 // sometimes it's in seconds
142 return time.Unix(tsInt64/1000, 0), nil
143 }
144 }
Scott Baker2fe436a2020-02-10 17:21:47 -0800145 return time.Time{}, errors.New("Failed to decode timestamp")
146}
147
Scott Bakered4efab2020-01-13 19:12:25 -0800148// Extract the header, as well as a few other items that might be of interest
149func DecodeHeader(md *desc.MessageDescriptor, b []byte, ts time.Time) (*EventHeader, error) {
150 m := dynamic.NewMessage(md)
151 err := m.Unmarshal(b)
152 if err != nil {
153 return nil, err
154 }
155
156 headerIntf, err := m.TryGetFieldByName("header")
157 if err != nil {
158 return nil, err
159 }
160
161 header := headerIntf.(*dynamic.Message)
162
163 catIntf, err := header.TryGetFieldByName("category")
164 if err != nil {
165 return nil, err
166 }
167 cat := catIntf.(int32)
168
169 subCatIntf, err := header.TryGetFieldByName("sub_category")
170 if err != nil {
171 return nil, err
172 }
173 subCat := subCatIntf.(int32)
174
175 typeIntf, err := header.TryGetFieldByName("type")
176 if err != nil {
177 return nil, err
178 }
179 evType := typeIntf.(int32)
180
181 raisedIntf, err := header.TryGetFieldByName("raised_ts")
182 if err != nil {
183 return nil, err
184 }
Scott Baker2fe436a2020-02-10 17:21:47 -0800185 raised, err := DecodeTimestamp(raisedIntf)
186 if err != nil {
187 return nil, err
188 }
Scott Bakered4efab2020-01-13 19:12:25 -0800189
190 reportedIntf, err := header.TryGetFieldByName("reported_ts")
191 if err != nil {
192 return nil, err
193 }
Scott Baker2fe436a2020-02-10 17:21:47 -0800194 reported, err := DecodeTimestamp(reportedIntf)
195 if err != nil {
196 return nil, err
197 }
Scott Bakered4efab2020-01-13 19:12:25 -0800198
199 // Opportunistically try to extract the device_id and title from a kpi_event2
200 // note that there might actually be multiple_slice data, so there could
201 // be multiple device_id, multiple title, etc.
202 device_ids := make(map[string]interface{})
203 titles := make(map[string]interface{})
204
205 kpiIntf, err := m.TryGetFieldByName("kpi_event2")
206 if err == nil {
207 kpi, ok := kpiIntf.(*dynamic.Message)
David K. Bainbridge2b627612020-02-18 14:50:13 -0800208 if ok && kpi != nil {
Scott Bakered4efab2020-01-13 19:12:25 -0800209 sliceListIntf, err := kpi.TryGetFieldByName("slice_data")
210 if err == nil {
211 sliceIntf, ok := sliceListIntf.([]interface{})
David K. Bainbridge2b627612020-02-18 14:50:13 -0800212 if ok && len(sliceIntf) > 0 {
Scott Bakered4efab2020-01-13 19:12:25 -0800213 slice, ok := sliceIntf[0].(*dynamic.Message)
David K. Bainbridge2b627612020-02-18 14:50:13 -0800214 if ok && slice != nil {
Scott Bakered4efab2020-01-13 19:12:25 -0800215 metadataIntf, err := slice.TryGetFieldByName("metadata")
216 if err == nil {
217 metadata, ok := metadataIntf.(*dynamic.Message)
David K. Bainbridge2b627612020-02-18 14:50:13 -0800218 if ok && metadata != nil {
Scott Bakered4efab2020-01-13 19:12:25 -0800219 deviceIdIntf, err := metadataIntf.(*dynamic.Message).TryGetFieldByName("device_id")
220 if err == nil {
221 device_ids[deviceIdIntf.(string)] = slice
222 }
223 titleIntf, err := metadataIntf.(*dynamic.Message).TryGetFieldByName("title")
224 if err == nil {
225 titles[titleIntf.(string)] = slice
226 }
227 }
228 }
229 }
230 }
231 }
232 }
233 }
234
235 // Opportunistically try to pull a resource_id and title from a DeviceEvent
236 // There can only be one resource_id and title from a DeviceEvent, so it's easier
237 // than dealing with KPI_EVENT2.
238 deviceEventIntf, err := m.TryGetFieldByName("device_event")
239 if err == nil {
240 deviceEvent, ok := deviceEventIntf.(*dynamic.Message)
David K. Bainbridge2b627612020-02-18 14:50:13 -0800241 if ok && deviceEvent != nil {
Scott Bakered4efab2020-01-13 19:12:25 -0800242 deviceEventNameIntf, err := deviceEvent.TryGetFieldByName("device_event_name")
243 if err == nil {
244 deviceEventName, ok := deviceEventNameIntf.(string)
245 if ok {
246 titles[deviceEventName] = deviceEvent
247 }
248 }
249 resourceIdIntf, err := deviceEvent.TryGetFieldByName("resource_id")
250 if err == nil {
251 resourceId, ok := resourceIdIntf.(string)
252 if ok {
253 device_ids[resourceId] = deviceEvent
254 }
255 }
256 }
257 }
258
259 device_id_keys := make([]string, len(device_ids))
260 i := 0
David K. Bainbridge2b627612020-02-18 14:50:13 -0800261 for k := range device_ids {
Scott Bakered4efab2020-01-13 19:12:25 -0800262 device_id_keys[i] = k
263 i++
264 }
265
266 title_keys := make([]string, len(titles))
267 i = 0
David K. Bainbridge2b627612020-02-18 14:50:13 -0800268 for k := range titles {
Scott Bakered4efab2020-01-13 19:12:25 -0800269 title_keys[i] = k
270 i++
271 }
272
Neha Sharma19ca2bf2020-05-11 15:34:17 +0000273 header_category, err := model.GetEnumString(header, "category", cat)
274 if err != nil {
275 return nil, err
276 }
277
278 header_subcategory, err := model.GetEnumString(header, "sub_category", subCat)
279 if err != nil {
280 return nil, err
281 }
282
283 header_type, err := model.GetEnumString(header, "type", evType)
284 if err != nil {
285 return nil, err
286 }
287
288 evHeader := EventHeader{Category: header_category,
289 SubCategory: header_subcategory,
290 Type: header_type,
Scott Bakered4efab2020-01-13 19:12:25 -0800291 Raised_ts: raised,
292 Reported_ts: reported,
293 Device_ids: device_id_keys,
Scott Baker2fe436a2020-02-10 17:21:47 -0800294 Timestamp: ts,
Scott Bakered4efab2020-01-13 19:12:25 -0800295 Titles: title_keys}
296
297 return &evHeader, nil
298}
299
300// Print the full message, either in JSON or in GRPCURL-human-readable format,
301// depending on which grpcurl formatter is passed in.
302func PrintMessage(f grpcurl.Formatter, md *desc.MessageDescriptor, b []byte) error {
303 m := dynamic.NewMessage(md)
304 err := m.Unmarshal(b)
305 if err != nil {
306 return err
307 }
308 s, err := f(m)
309 if err != nil {
310 return err
311 }
312 fmt.Println(s)
313 return nil
314}
315
316// Print just the enriched EventHeader. This is either in JSON format, or in
317// table format.
318func PrintEventHeader(outputAs string, outputFormat string, hdr *EventHeader) error {
319 if outputAs == "json" {
320 asJson, err := json.Marshal(hdr)
321 if err != nil {
322 return fmt.Errorf("Error marshalling JSON: %v", err)
323 } else {
324 fmt.Printf("%s\n", asJson)
325 }
326 } else {
327 f := format.Format(outputFormat)
328 output, err := f.ExecuteFixedWidth(DefaultWidths, false, *hdr)
329 if err != nil {
330 return err
331 }
332 fmt.Printf("%s\n", output)
333 }
334 return nil
335}
336
337func GetEventMessageDesc() (*desc.MessageDescriptor, error) {
338 // This is a very long-winded way to get a message descriptor
339
Scott Baker2fe436a2020-02-10 17:21:47 -0800340 descriptor, err := GetDescriptorSource()
David K. Bainbridge2b627612020-02-18 14:50:13 -0800341 if err != nil {
342 return nil, err
343 }
Scott Bakered4efab2020-01-13 19:12:25 -0800344
345 // get the symbol for voltha.events
346 eventSymbol, err := descriptor.FindSymbol("voltha.Event")
347 if err != nil {
348 return nil, err
349 }
350
351 /*
352 * EventSymbol is a Descriptor, but not a MessageDescrptior,
353 * so we can't look at it's fields yet. Go back to the file,
354 * call FindMessage to get the Message, then ...
355 */
356
357 eventFile := eventSymbol.GetFile()
358 eventMessage := eventFile.FindMessage("voltha.Event")
359
360 return eventMessage, nil
361}
362
363// Start output, print any column headers or other start characters
364func (options *EventListenOpts) StartOutput(outputFormat string) error {
365 if options.OutputAs == "json" {
366 fmt.Println("[")
367 } else if (options.OutputAs == "table") && !options.ShowBody {
368 f := format.Format(outputFormat)
369 output, err := f.ExecuteFixedWidth(DefaultWidths, true, nil)
370 if err != nil {
371 return err
372 }
373 fmt.Println(output)
374 }
375 return nil
376}
377
378// Finish output, print any column footers or other end characters
379func (options *EventListenOpts) FinishOutput() {
380 if options.OutputAs == "json" {
381 fmt.Println("]")
382 }
383}
384
385func (options *EventListenOpts) Execute(args []string) error {
386 ProcessGlobalOptions()
387 if GlobalConfig.Kafka == "" {
388 return errors.New("Kafka address is not specified")
389 }
390
391 eventMessage, err := GetEventMessageDesc()
392 if err != nil {
393 return err
394 }
395
396 config := sarama.NewConfig()
397 config.ClientID = "go-kafka-consumer"
398 config.Consumer.Return.Errors = true
399 config.Version = sarama.V1_0_0_0
400 brokers := []string{GlobalConfig.Kafka}
401
402 client, err := sarama.NewClient(brokers, config)
403 if err != nil {
404 return err
405 }
406
407 defer func() {
408 if err := client.Close(); err != nil {
409 panic(err)
410 }
411 }()
412
413 consumer, consumerErrors, highwaterMarks, err := startConsumer([]string{"voltha.events"}, client)
414 if err != nil {
415 return err
416 }
417
418 highwater := highwaterMarks["voltha.events"]
419
420 signals := make(chan os.Signal, 1)
421 signal.Notify(signals, os.Interrupt)
422
423 // Count how many message processed
424 consumeCount := 0
425
426 // Count how many messages were printed
427 count := 0
428
429 var grpcurlFormatter grpcurl.Formatter
430
431 if options.ShowBody {
432 if options.OutputAs == "json" {
433 // need a descriptor source, any method will do
David K. Bainbridge4bbad142020-03-11 11:55:39 -0700434 descriptor, _, err := GetMethod("device-list")
Scott Bakered4efab2020-01-13 19:12:25 -0800435 if err != nil {
436 return err
437 }
438 grpcurlFormatter = grpcurl.NewJSONFormatter(false, grpcurl.AnyResolverFromDescriptorSource(descriptor))
439 } else {
440 grpcurlFormatter = grpcurl.NewTextFormatter(false)
441 }
442 }
443
444 var headerFilter *filter.Filter
445 if options.Filter != "" {
446 headerFilterVal, err := filter.Parse(options.Filter)
447 if err != nil {
448 return fmt.Errorf("Failed to parse filter: %v", err)
449 }
450 headerFilter = &headerFilterVal
451 }
452
453 outputFormat := CharReplacer.Replace(options.Format)
454 if outputFormat == "" {
455 outputFormat = GetCommandOptionWithDefault("events-listen", "format", DEFAULT_EVENT_FORMAT)
456 }
457
458 err = options.StartOutput(outputFormat)
459 if err != nil {
460 return err
461 }
462
Scott Bakerf05e60a2020-02-02 21:53:57 -0800463 var since *time.Time
464 if options.Since != "" {
465 since, err = ParseSince(options.Since)
466 if err != nil {
467 return err
468 }
469 }
470
Scott Bakered4efab2020-01-13 19:12:25 -0800471 // Get signnal for finish
472 doneCh := make(chan struct{})
473 go func() {
Scott Baker2fe436a2020-02-10 17:21:47 -0800474 tStart := time.Now()
Scott Bakered4efab2020-01-13 19:12:25 -0800475 Loop:
476 for {
477 // Initialize the idle timeout timer
478 timeoutTimer := time.NewTimer(time.Duration(options.Timeout) * time.Second)
479 select {
480 case msg := <-consumer:
481 consumeCount++
482 hdr, err := DecodeHeader(eventMessage, msg.Value, msg.Timestamp)
483 if err != nil {
484 log.Printf("Error decoding header %v\n", err)
485 continue
486 }
487 if headerFilter != nil && !headerFilter.Evaluate(*hdr) {
488 // skip printing message
Scott Baker2fe436a2020-02-10 17:21:47 -0800489 } else if since != nil && hdr.Timestamp.Before(*since) {
Scott Bakerf05e60a2020-02-02 21:53:57 -0800490 // it's too old
Scott Bakered4efab2020-01-13 19:12:25 -0800491 } else {
492 // comma separated between this message and predecessor
493 if count > 0 {
494 if options.OutputAs == "json" {
495 fmt.Println(",")
496 }
497 }
498
499 // Print it
500 if options.ShowBody {
David K. Bainbridge2b627612020-02-18 14:50:13 -0800501 if err := PrintMessage(grpcurlFormatter, eventMessage, msg.Value); err != nil {
502 log.Printf("%v\n", err)
503 }
Scott Bakered4efab2020-01-13 19:12:25 -0800504 } else {
David K. Bainbridge2b627612020-02-18 14:50:13 -0800505 if err := PrintEventHeader(options.OutputAs, outputFormat, hdr); err != nil {
Scott Bakered4efab2020-01-13 19:12:25 -0800506 log.Printf("%v\n", err)
507 }
508 }
509
510 // Check to see if we've hit the "count" threshold the user specified
511 count++
512 if (options.Count > 0) && (count >= options.Count) {
513 log.Println("Count reached")
514 doneCh <- struct{}{}
515 break Loop
516 }
517
518 // Check to see if we've hit the "now" threshold the user specified
Scott Baker2fe436a2020-02-10 17:21:47 -0800519 if (options.Now) && (!hdr.Timestamp.Before(tStart)) {
Scott Bakered4efab2020-01-13 19:12:25 -0800520 log.Println("Now timestamp reached")
521 doneCh <- struct{}{}
522 break Loop
523 }
524 }
525
526 // If we're not in follow mode, see if we hit the highwater mark
527 if !options.Follow && !options.Now && (msg.Offset >= highwater) {
528 log.Println("High water reached")
529 doneCh <- struct{}{}
530 break Loop
531 }
532
533 // Reset the timeout timer
534 if !timeoutTimer.Stop() {
535 <-timeoutTimer.C
536 }
537 case consumerError := <-consumerErrors:
538 log.Printf("Received consumerError topic=%v, partition=%v, err=%v\n", string(consumerError.Topic), string(consumerError.Partition), consumerError.Err)
539 doneCh <- struct{}{}
540 case <-signals:
541 doneCh <- struct{}{}
542 case <-timeoutTimer.C:
543 log.Printf("Idle timeout\n")
544 doneCh <- struct{}{}
545 }
546 }
547 }()
548
549 <-doneCh
550
551 options.FinishOutput()
552
553 log.Printf("Consumed %d messages. Printed %d messages", consumeCount, count)
554
555 return nil
556}
557
558// Consume message from Sarama and send them out on a channel.
559// Supports multiple topics.
560// Taken from Sarama example consumer.
561func startConsumer(topics []string, client sarama.Client) (chan *sarama.ConsumerMessage, chan *sarama.ConsumerError, map[string]int64, error) {
562 master, err := sarama.NewConsumerFromClient(client)
563 if err != nil {
564 return nil, nil, nil, err
565 }
566
567 consumers := make(chan *sarama.ConsumerMessage)
568 errors := make(chan *sarama.ConsumerError)
569 highwater := make(map[string]int64)
570 for _, topic := range topics {
571 if strings.Contains(topic, "__consumer_offsets") {
572 continue
573 }
574 partitions, _ := master.Partitions(topic)
575
576 // TODO: Add support for multiple partitions
577 if len(partitions) > 1 {
578 log.Printf("WARNING: %d partitions on topic %s but we only listen to the first one\n", len(partitions), topic)
579 }
580
581 hw, err := client.GetOffset("voltha.events", partitions[0], sarama.OffsetNewest)
582 if err != nil {
583 return nil, nil, nil, fmt.Errorf("Error in consume() getting highwater: Topic %v Partitions: %v", topic, partitions)
584 }
585 highwater[topic] = hw
586
587 consumer, err := master.ConsumePartition(topic, partitions[0], sarama.OffsetOldest)
588 if nil != err {
589 return nil, nil, nil, fmt.Errorf("Error in consume(): Topic %v Partitions: %v", topic, partitions)
590 }
591 log.Println(" Start consuming topic ", topic)
592 go func(topic string, consumer sarama.PartitionConsumer) {
593 for {
594 select {
595 case consumerError := <-consumer.Errors():
596 errors <- consumerError
597
598 case msg := <-consumer.Messages():
599 consumers <- msg
600 }
601 }
602 }(topic, consumer)
603 }
604
605 return consumers, errors, highwater, nil
606}