blob: a9d20c8e2627f5cf82fe2754a9e8a06ea80d1988 [file] [log] [blame]
Scott Bakered4efab2020-01-13 19:12:25 -08001/*
2 * Copyright 2019-present Ciena Corporation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package commands
17
18import (
19 "encoding/json"
20 "errors"
21 "fmt"
22 "github.com/Shopify/sarama"
23 "github.com/fullstorydev/grpcurl"
Scott Baker2fe436a2020-02-10 17:21:47 -080024 "github.com/golang/protobuf/ptypes"
25 "github.com/golang/protobuf/ptypes/timestamp"
Scott Bakered4efab2020-01-13 19:12:25 -080026 flags "github.com/jessevdk/go-flags"
27 "github.com/jhump/protoreflect/desc"
28 "github.com/jhump/protoreflect/dynamic"
29 "github.com/opencord/voltctl/pkg/filter"
30 "github.com/opencord/voltctl/pkg/format"
31 "github.com/opencord/voltctl/pkg/model"
32 "log"
33 "os"
34 "os/signal"
35 "strings"
36 "time"
37)
38
39const (
40 DEFAULT_EVENT_FORMAT = "table{{.Category}}\t{{.SubCategory}}\t{{.Type}}\t{{.Timestamp}}\t{{.Device_ids}}\t{{.Titles}}"
41)
42
43type EventListenOpts struct {
David K. Bainbridge2b627612020-02-18 14:50:13 -080044 Format string `long:"format" value-name:"FORMAT" default:"" description:"Format to use to output structured data"`
45 // nolint: staticcheck
Scott Bakered4efab2020-01-13 19:12:25 -080046 OutputAs string `short:"o" long:"outputas" default:"table" choice:"table" choice:"json" choice:"yaml" description:"Type of output to generate"`
47 Filter string `short:"f" long:"filter" default:"" value-name:"FILTER" description:"Only display results that match filter"`
48 Follow bool `short:"F" long:"follow" description:"Continue to consume until CTRL-C is pressed"`
49 ShowBody bool `short:"b" long:"show-body" description:"Show body of events rather than only a header summary"`
50 Count int `short:"c" long:"count" default:"-1" value-name:"LIMIT" description:"Limit the count of messages that will be printed"`
51 Now bool `short:"n" long:"now" description:"Stop printing events when current time is reached"`
52 Timeout int `short:"t" long:"idle" default:"900" value-name:"SECONDS" description:"Timeout if no message received within specified seconds"`
Scott Bakerf05e60a2020-02-02 21:53:57 -080053 Since string `short:"s" long:"since" default:"" value-name:"TIMESTAMP" description:"Do not show entries before timestamp"`
Scott Bakered4efab2020-01-13 19:12:25 -080054}
55
56type EventOpts struct {
57 EventListen EventListenOpts `command:"listen"`
58}
59
60var eventOpts = EventOpts{}
61
62type EventHeader struct {
Scott Baker2fe436a2020-02-10 17:21:47 -080063 Category string `json:"category"`
64 SubCategory string `json:"sub_category"`
65 Type string `json:"type"`
66 Raised_ts time.Time `json:"raised_ts"`
67 Reported_ts time.Time `json:"reported_ts"`
68 Device_ids []string `json:"device_ids"` // Opportunistically collected list of device_ids
69 Titles []string `json:"titles"` // Opportunistically collected list of titles
70 Timestamp time.Time `json:"timestamp"` // Timestamp from Kafka
Scott Bakered4efab2020-01-13 19:12:25 -080071}
72
73type EventHeaderWidths struct {
74 Category int
75 SubCategory int
76 Type int
77 Raised_ts int
78 Reported_ts int
79 Device_ids int
80 Titles int
81 Timestamp int
82}
83
84var DefaultWidths EventHeaderWidths = EventHeaderWidths{
85 Category: 13,
86 SubCategory: 3,
87 Type: 12,
88 Raised_ts: 10,
89 Reported_ts: 10,
90 Device_ids: 40,
91 Titles: 40,
92 Timestamp: 10,
93}
94
95func RegisterEventCommands(parent *flags.Parser) {
96 _, err := parent.AddCommand("event", "event commands", "Commands for observing events", &eventOpts)
97 if err != nil {
98 Error.Fatalf("Unable to register event commands with voltctl command parser: %s", err.Error())
99 }
100}
101
Scott Bakerf05e60a2020-02-02 21:53:57 -0800102func ParseSince(s string) (*time.Time, error) {
103 if strings.EqualFold(s, "now") {
104 since := time.Now()
105 return &since, nil
106 }
107
108 rfc3339Time, err := time.Parse(time.RFC3339, s)
109 if err == nil {
110 return &rfc3339Time, nil
111 }
112
113 duration, err := time.ParseDuration(s)
114 if err == nil {
115 since := time.Now().Add(-duration)
116 return &since, nil
117 }
118
119 return nil, fmt.Errorf("Unable to parse time specification `%s`. Please use either `now`, a duration, or an RFC3339-compliant string", s)
120}
121
Scott Baker2fe436a2020-02-10 17:21:47 -0800122// Convert a timestamp field in an event to a time.Time
123func DecodeTimestamp(tsIntf interface{}) (time.Time, error) {
124 ts, okay := tsIntf.(*timestamp.Timestamp)
125 if okay {
126 // Voltha-Protos 3.2.3 and above
127 result, err := ptypes.Timestamp(ts)
128 return result, err
129 }
130 tsFloat, okay := tsIntf.(float32)
131 if okay {
132 // Voltha-Protos 3.2.2 and below
133 return time.Unix(int64(tsFloat), 0), nil
134 }
135 return time.Time{}, errors.New("Failed to decode timestamp")
136}
137
Scott Bakered4efab2020-01-13 19:12:25 -0800138// Extract the header, as well as a few other items that might be of interest
139func DecodeHeader(md *desc.MessageDescriptor, b []byte, ts time.Time) (*EventHeader, error) {
140 m := dynamic.NewMessage(md)
141 err := m.Unmarshal(b)
142 if err != nil {
143 return nil, err
144 }
145
146 headerIntf, err := m.TryGetFieldByName("header")
147 if err != nil {
148 return nil, err
149 }
150
151 header := headerIntf.(*dynamic.Message)
152
153 catIntf, err := header.TryGetFieldByName("category")
154 if err != nil {
155 return nil, err
156 }
157 cat := catIntf.(int32)
158
159 subCatIntf, err := header.TryGetFieldByName("sub_category")
160 if err != nil {
161 return nil, err
162 }
163 subCat := subCatIntf.(int32)
164
165 typeIntf, err := header.TryGetFieldByName("type")
166 if err != nil {
167 return nil, err
168 }
169 evType := typeIntf.(int32)
170
171 raisedIntf, err := header.TryGetFieldByName("raised_ts")
172 if err != nil {
173 return nil, err
174 }
Scott Baker2fe436a2020-02-10 17:21:47 -0800175 raised, err := DecodeTimestamp(raisedIntf)
176 if err != nil {
177 return nil, err
178 }
Scott Bakered4efab2020-01-13 19:12:25 -0800179
180 reportedIntf, err := header.TryGetFieldByName("reported_ts")
181 if err != nil {
182 return nil, err
183 }
Scott Baker2fe436a2020-02-10 17:21:47 -0800184 reported, err := DecodeTimestamp(reportedIntf)
185 if err != nil {
186 return nil, err
187 }
Scott Bakered4efab2020-01-13 19:12:25 -0800188
189 // Opportunistically try to extract the device_id and title from a kpi_event2
190 // note that there might actually be multiple_slice data, so there could
191 // be multiple device_id, multiple title, etc.
192 device_ids := make(map[string]interface{})
193 titles := make(map[string]interface{})
194
195 kpiIntf, err := m.TryGetFieldByName("kpi_event2")
196 if err == nil {
197 kpi, ok := kpiIntf.(*dynamic.Message)
David K. Bainbridge2b627612020-02-18 14:50:13 -0800198 if ok && kpi != nil {
Scott Bakered4efab2020-01-13 19:12:25 -0800199 sliceListIntf, err := kpi.TryGetFieldByName("slice_data")
200 if err == nil {
201 sliceIntf, ok := sliceListIntf.([]interface{})
David K. Bainbridge2b627612020-02-18 14:50:13 -0800202 if ok && len(sliceIntf) > 0 {
Scott Bakered4efab2020-01-13 19:12:25 -0800203 slice, ok := sliceIntf[0].(*dynamic.Message)
David K. Bainbridge2b627612020-02-18 14:50:13 -0800204 if ok && slice != nil {
Scott Bakered4efab2020-01-13 19:12:25 -0800205 metadataIntf, err := slice.TryGetFieldByName("metadata")
206 if err == nil {
207 metadata, ok := metadataIntf.(*dynamic.Message)
David K. Bainbridge2b627612020-02-18 14:50:13 -0800208 if ok && metadata != nil {
Scott Bakered4efab2020-01-13 19:12:25 -0800209 deviceIdIntf, err := metadataIntf.(*dynamic.Message).TryGetFieldByName("device_id")
210 if err == nil {
211 device_ids[deviceIdIntf.(string)] = slice
212 }
213 titleIntf, err := metadataIntf.(*dynamic.Message).TryGetFieldByName("title")
214 if err == nil {
215 titles[titleIntf.(string)] = slice
216 }
217 }
218 }
219 }
220 }
221 }
222 }
223 }
224
225 // Opportunistically try to pull a resource_id and title from a DeviceEvent
226 // There can only be one resource_id and title from a DeviceEvent, so it's easier
227 // than dealing with KPI_EVENT2.
228 deviceEventIntf, err := m.TryGetFieldByName("device_event")
229 if err == nil {
230 deviceEvent, ok := deviceEventIntf.(*dynamic.Message)
David K. Bainbridge2b627612020-02-18 14:50:13 -0800231 if ok && deviceEvent != nil {
Scott Bakered4efab2020-01-13 19:12:25 -0800232 deviceEventNameIntf, err := deviceEvent.TryGetFieldByName("device_event_name")
233 if err == nil {
234 deviceEventName, ok := deviceEventNameIntf.(string)
235 if ok {
236 titles[deviceEventName] = deviceEvent
237 }
238 }
239 resourceIdIntf, err := deviceEvent.TryGetFieldByName("resource_id")
240 if err == nil {
241 resourceId, ok := resourceIdIntf.(string)
242 if ok {
243 device_ids[resourceId] = deviceEvent
244 }
245 }
246 }
247 }
248
249 device_id_keys := make([]string, len(device_ids))
250 i := 0
David K. Bainbridge2b627612020-02-18 14:50:13 -0800251 for k := range device_ids {
Scott Bakered4efab2020-01-13 19:12:25 -0800252 device_id_keys[i] = k
253 i++
254 }
255
256 title_keys := make([]string, len(titles))
257 i = 0
David K. Bainbridge2b627612020-02-18 14:50:13 -0800258 for k := range titles {
Scott Bakered4efab2020-01-13 19:12:25 -0800259 title_keys[i] = k
260 i++
261 }
262
263 evHeader := EventHeader{Category: model.GetEnumString(header, "category", cat),
264 SubCategory: model.GetEnumString(header, "sub_category", subCat),
265 Type: model.GetEnumString(header, "type", evType),
266 Raised_ts: raised,
267 Reported_ts: reported,
268 Device_ids: device_id_keys,
Scott Baker2fe436a2020-02-10 17:21:47 -0800269 Timestamp: ts,
Scott Bakered4efab2020-01-13 19:12:25 -0800270 Titles: title_keys}
271
272 return &evHeader, nil
273}
274
275// Print the full message, either in JSON or in GRPCURL-human-readable format,
276// depending on which grpcurl formatter is passed in.
277func PrintMessage(f grpcurl.Formatter, md *desc.MessageDescriptor, b []byte) error {
278 m := dynamic.NewMessage(md)
279 err := m.Unmarshal(b)
280 if err != nil {
281 return err
282 }
283 s, err := f(m)
284 if err != nil {
285 return err
286 }
287 fmt.Println(s)
288 return nil
289}
290
291// Print just the enriched EventHeader. This is either in JSON format, or in
292// table format.
293func PrintEventHeader(outputAs string, outputFormat string, hdr *EventHeader) error {
294 if outputAs == "json" {
295 asJson, err := json.Marshal(hdr)
296 if err != nil {
297 return fmt.Errorf("Error marshalling JSON: %v", err)
298 } else {
299 fmt.Printf("%s\n", asJson)
300 }
301 } else {
302 f := format.Format(outputFormat)
303 output, err := f.ExecuteFixedWidth(DefaultWidths, false, *hdr)
304 if err != nil {
305 return err
306 }
307 fmt.Printf("%s\n", output)
308 }
309 return nil
310}
311
312func GetEventMessageDesc() (*desc.MessageDescriptor, error) {
313 // This is a very long-winded way to get a message descriptor
314
Scott Baker2fe436a2020-02-10 17:21:47 -0800315 descriptor, err := GetDescriptorSource()
David K. Bainbridge2b627612020-02-18 14:50:13 -0800316 if err != nil {
317 return nil, err
318 }
Scott Bakered4efab2020-01-13 19:12:25 -0800319
320 // get the symbol for voltha.events
321 eventSymbol, err := descriptor.FindSymbol("voltha.Event")
322 if err != nil {
323 return nil, err
324 }
325
326 /*
327 * EventSymbol is a Descriptor, but not a MessageDescrptior,
328 * so we can't look at it's fields yet. Go back to the file,
329 * call FindMessage to get the Message, then ...
330 */
331
332 eventFile := eventSymbol.GetFile()
333 eventMessage := eventFile.FindMessage("voltha.Event")
334
335 return eventMessage, nil
336}
337
338// Start output, print any column headers or other start characters
339func (options *EventListenOpts) StartOutput(outputFormat string) error {
340 if options.OutputAs == "json" {
341 fmt.Println("[")
342 } else if (options.OutputAs == "table") && !options.ShowBody {
343 f := format.Format(outputFormat)
344 output, err := f.ExecuteFixedWidth(DefaultWidths, true, nil)
345 if err != nil {
346 return err
347 }
348 fmt.Println(output)
349 }
350 return nil
351}
352
353// Finish output, print any column footers or other end characters
354func (options *EventListenOpts) FinishOutput() {
355 if options.OutputAs == "json" {
356 fmt.Println("]")
357 }
358}
359
360func (options *EventListenOpts) Execute(args []string) error {
361 ProcessGlobalOptions()
362 if GlobalConfig.Kafka == "" {
363 return errors.New("Kafka address is not specified")
364 }
365
366 eventMessage, err := GetEventMessageDesc()
367 if err != nil {
368 return err
369 }
370
371 config := sarama.NewConfig()
372 config.ClientID = "go-kafka-consumer"
373 config.Consumer.Return.Errors = true
374 config.Version = sarama.V1_0_0_0
375 brokers := []string{GlobalConfig.Kafka}
376
377 client, err := sarama.NewClient(brokers, config)
378 if err != nil {
379 return err
380 }
381
382 defer func() {
383 if err := client.Close(); err != nil {
384 panic(err)
385 }
386 }()
387
388 consumer, consumerErrors, highwaterMarks, err := startConsumer([]string{"voltha.events"}, client)
389 if err != nil {
390 return err
391 }
392
393 highwater := highwaterMarks["voltha.events"]
394
395 signals := make(chan os.Signal, 1)
396 signal.Notify(signals, os.Interrupt)
397
398 // Count how many message processed
399 consumeCount := 0
400
401 // Count how many messages were printed
402 count := 0
403
404 var grpcurlFormatter grpcurl.Formatter
405
406 if options.ShowBody {
407 if options.OutputAs == "json" {
408 // need a descriptor source, any method will do
409 descriptor, _, _ := GetMethod("device-list")
410 if err != nil {
411 return err
412 }
413 grpcurlFormatter = grpcurl.NewJSONFormatter(false, grpcurl.AnyResolverFromDescriptorSource(descriptor))
414 } else {
415 grpcurlFormatter = grpcurl.NewTextFormatter(false)
416 }
417 }
418
419 var headerFilter *filter.Filter
420 if options.Filter != "" {
421 headerFilterVal, err := filter.Parse(options.Filter)
422 if err != nil {
423 return fmt.Errorf("Failed to parse filter: %v", err)
424 }
425 headerFilter = &headerFilterVal
426 }
427
428 outputFormat := CharReplacer.Replace(options.Format)
429 if outputFormat == "" {
430 outputFormat = GetCommandOptionWithDefault("events-listen", "format", DEFAULT_EVENT_FORMAT)
431 }
432
433 err = options.StartOutput(outputFormat)
434 if err != nil {
435 return err
436 }
437
Scott Bakerf05e60a2020-02-02 21:53:57 -0800438 var since *time.Time
439 if options.Since != "" {
440 since, err = ParseSince(options.Since)
441 if err != nil {
442 return err
443 }
444 }
445
Scott Bakered4efab2020-01-13 19:12:25 -0800446 // Get signnal for finish
447 doneCh := make(chan struct{})
448 go func() {
Scott Baker2fe436a2020-02-10 17:21:47 -0800449 tStart := time.Now()
Scott Bakered4efab2020-01-13 19:12:25 -0800450 Loop:
451 for {
452 // Initialize the idle timeout timer
453 timeoutTimer := time.NewTimer(time.Duration(options.Timeout) * time.Second)
454 select {
455 case msg := <-consumer:
456 consumeCount++
457 hdr, err := DecodeHeader(eventMessage, msg.Value, msg.Timestamp)
458 if err != nil {
459 log.Printf("Error decoding header %v\n", err)
460 continue
461 }
462 if headerFilter != nil && !headerFilter.Evaluate(*hdr) {
463 // skip printing message
Scott Baker2fe436a2020-02-10 17:21:47 -0800464 } else if since != nil && hdr.Timestamp.Before(*since) {
Scott Bakerf05e60a2020-02-02 21:53:57 -0800465 // it's too old
Scott Bakered4efab2020-01-13 19:12:25 -0800466 } else {
467 // comma separated between this message and predecessor
468 if count > 0 {
469 if options.OutputAs == "json" {
470 fmt.Println(",")
471 }
472 }
473
474 // Print it
475 if options.ShowBody {
David K. Bainbridge2b627612020-02-18 14:50:13 -0800476 if err := PrintMessage(grpcurlFormatter, eventMessage, msg.Value); err != nil {
477 log.Printf("%v\n", err)
478 }
Scott Bakered4efab2020-01-13 19:12:25 -0800479 } else {
David K. Bainbridge2b627612020-02-18 14:50:13 -0800480 if err := PrintEventHeader(options.OutputAs, outputFormat, hdr); err != nil {
Scott Bakered4efab2020-01-13 19:12:25 -0800481 log.Printf("%v\n", err)
482 }
483 }
484
485 // Check to see if we've hit the "count" threshold the user specified
486 count++
487 if (options.Count > 0) && (count >= options.Count) {
488 log.Println("Count reached")
489 doneCh <- struct{}{}
490 break Loop
491 }
492
493 // Check to see if we've hit the "now" threshold the user specified
Scott Baker2fe436a2020-02-10 17:21:47 -0800494 if (options.Now) && (!hdr.Timestamp.Before(tStart)) {
Scott Bakered4efab2020-01-13 19:12:25 -0800495 log.Println("Now timestamp reached")
496 doneCh <- struct{}{}
497 break Loop
498 }
499 }
500
501 // If we're not in follow mode, see if we hit the highwater mark
502 if !options.Follow && !options.Now && (msg.Offset >= highwater) {
503 log.Println("High water reached")
504 doneCh <- struct{}{}
505 break Loop
506 }
507
508 // Reset the timeout timer
509 if !timeoutTimer.Stop() {
510 <-timeoutTimer.C
511 }
512 case consumerError := <-consumerErrors:
513 log.Printf("Received consumerError topic=%v, partition=%v, err=%v\n", string(consumerError.Topic), string(consumerError.Partition), consumerError.Err)
514 doneCh <- struct{}{}
515 case <-signals:
516 doneCh <- struct{}{}
517 case <-timeoutTimer.C:
518 log.Printf("Idle timeout\n")
519 doneCh <- struct{}{}
520 }
521 }
522 }()
523
524 <-doneCh
525
526 options.FinishOutput()
527
528 log.Printf("Consumed %d messages. Printed %d messages", consumeCount, count)
529
530 return nil
531}
532
533// Consume message from Sarama and send them out on a channel.
534// Supports multiple topics.
535// Taken from Sarama example consumer.
536func startConsumer(topics []string, client sarama.Client) (chan *sarama.ConsumerMessage, chan *sarama.ConsumerError, map[string]int64, error) {
537 master, err := sarama.NewConsumerFromClient(client)
538 if err != nil {
539 return nil, nil, nil, err
540 }
541
542 consumers := make(chan *sarama.ConsumerMessage)
543 errors := make(chan *sarama.ConsumerError)
544 highwater := make(map[string]int64)
545 for _, topic := range topics {
546 if strings.Contains(topic, "__consumer_offsets") {
547 continue
548 }
549 partitions, _ := master.Partitions(topic)
550
551 // TODO: Add support for multiple partitions
552 if len(partitions) > 1 {
553 log.Printf("WARNING: %d partitions on topic %s but we only listen to the first one\n", len(partitions), topic)
554 }
555
556 hw, err := client.GetOffset("voltha.events", partitions[0], sarama.OffsetNewest)
557 if err != nil {
558 return nil, nil, nil, fmt.Errorf("Error in consume() getting highwater: Topic %v Partitions: %v", topic, partitions)
559 }
560 highwater[topic] = hw
561
562 consumer, err := master.ConsumePartition(topic, partitions[0], sarama.OffsetOldest)
563 if nil != err {
564 return nil, nil, nil, fmt.Errorf("Error in consume(): Topic %v Partitions: %v", topic, partitions)
565 }
566 log.Println(" Start consuming topic ", topic)
567 go func(topic string, consumer sarama.PartitionConsumer) {
568 for {
569 select {
570 case consumerError := <-consumer.Errors():
571 errors <- consumerError
572
573 case msg := <-consumer.Messages():
574 consumers <- msg
575 }
576 }
577 }(topic, consumer)
578 }
579
580 return consumers, errors, highwater, nil
581}