blob: deacb79b9d1540cb784f590f8a51e9b1e0af30d6 [file] [log] [blame]
Scott Bakered4efab2020-01-13 19:12:25 -08001/*
2 * Copyright 2019-present Ciena Corporation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package commands
17
18import (
19 "encoding/json"
20 "errors"
21 "fmt"
22 "github.com/Shopify/sarama"
23 "github.com/fullstorydev/grpcurl"
Scott Baker2fe436a2020-02-10 17:21:47 -080024 "github.com/golang/protobuf/ptypes"
25 "github.com/golang/protobuf/ptypes/timestamp"
Scott Bakered4efab2020-01-13 19:12:25 -080026 flags "github.com/jessevdk/go-flags"
27 "github.com/jhump/protoreflect/desc"
28 "github.com/jhump/protoreflect/dynamic"
29 "github.com/opencord/voltctl/pkg/filter"
30 "github.com/opencord/voltctl/pkg/format"
31 "github.com/opencord/voltctl/pkg/model"
32 "log"
33 "os"
34 "os/signal"
35 "strings"
36 "time"
37)
38
39const (
40 DEFAULT_EVENT_FORMAT = "table{{.Category}}\t{{.SubCategory}}\t{{.Type}}\t{{.Timestamp}}\t{{.Device_ids}}\t{{.Titles}}"
41)
42
43type EventListenOpts struct {
44 Format string `long:"format" value-name:"FORMAT" default:"" description:"Format to use to output structured data"`
45 OutputAs string `short:"o" long:"outputas" default:"table" choice:"table" choice:"json" choice:"yaml" description:"Type of output to generate"`
46 Filter string `short:"f" long:"filter" default:"" value-name:"FILTER" description:"Only display results that match filter"`
47 Follow bool `short:"F" long:"follow" description:"Continue to consume until CTRL-C is pressed"`
48 ShowBody bool `short:"b" long:"show-body" description:"Show body of events rather than only a header summary"`
49 Count int `short:"c" long:"count" default:"-1" value-name:"LIMIT" description:"Limit the count of messages that will be printed"`
50 Now bool `short:"n" long:"now" description:"Stop printing events when current time is reached"`
51 Timeout int `short:"t" long:"idle" default:"900" value-name:"SECONDS" description:"Timeout if no message received within specified seconds"`
Scott Bakerf05e60a2020-02-02 21:53:57 -080052 Since string `short:"s" long:"since" default:"" value-name:"TIMESTAMP" description:"Do not show entries before timestamp"`
Scott Bakered4efab2020-01-13 19:12:25 -080053}
54
55type EventOpts struct {
56 EventListen EventListenOpts `command:"listen"`
57}
58
59var eventOpts = EventOpts{}
60
61type EventHeader struct {
Scott Baker2fe436a2020-02-10 17:21:47 -080062 Category string `json:"category"`
63 SubCategory string `json:"sub_category"`
64 Type string `json:"type"`
65 Raised_ts time.Time `json:"raised_ts"`
66 Reported_ts time.Time `json:"reported_ts"`
67 Device_ids []string `json:"device_ids"` // Opportunistically collected list of device_ids
68 Titles []string `json:"titles"` // Opportunistically collected list of titles
69 Timestamp time.Time `json:"timestamp"` // Timestamp from Kafka
Scott Bakered4efab2020-01-13 19:12:25 -080070}
71
72type EventHeaderWidths struct {
73 Category int
74 SubCategory int
75 Type int
76 Raised_ts int
77 Reported_ts int
78 Device_ids int
79 Titles int
80 Timestamp int
81}
82
83var DefaultWidths EventHeaderWidths = EventHeaderWidths{
84 Category: 13,
85 SubCategory: 3,
86 Type: 12,
87 Raised_ts: 10,
88 Reported_ts: 10,
89 Device_ids: 40,
90 Titles: 40,
91 Timestamp: 10,
92}
93
94func RegisterEventCommands(parent *flags.Parser) {
95 _, err := parent.AddCommand("event", "event commands", "Commands for observing events", &eventOpts)
96 if err != nil {
97 Error.Fatalf("Unable to register event commands with voltctl command parser: %s", err.Error())
98 }
99}
100
Scott Bakerf05e60a2020-02-02 21:53:57 -0800101func ParseSince(s string) (*time.Time, error) {
102 if strings.EqualFold(s, "now") {
103 since := time.Now()
104 return &since, nil
105 }
106
107 rfc3339Time, err := time.Parse(time.RFC3339, s)
108 if err == nil {
109 return &rfc3339Time, nil
110 }
111
112 duration, err := time.ParseDuration(s)
113 if err == nil {
114 since := time.Now().Add(-duration)
115 return &since, nil
116 }
117
118 return nil, fmt.Errorf("Unable to parse time specification `%s`. Please use either `now`, a duration, or an RFC3339-compliant string", s)
119}
120
Scott Baker2fe436a2020-02-10 17:21:47 -0800121// Convert a timestamp field in an event to a time.Time
122func DecodeTimestamp(tsIntf interface{}) (time.Time, error) {
123 ts, okay := tsIntf.(*timestamp.Timestamp)
124 if okay {
125 // Voltha-Protos 3.2.3 and above
126 result, err := ptypes.Timestamp(ts)
127 return result, err
128 }
129 tsFloat, okay := tsIntf.(float32)
130 if okay {
131 // Voltha-Protos 3.2.2 and below
132 return time.Unix(int64(tsFloat), 0), nil
133 }
134 return time.Time{}, errors.New("Failed to decode timestamp")
135}
136
Scott Bakered4efab2020-01-13 19:12:25 -0800137// Extract the header, as well as a few other items that might be of interest
138func DecodeHeader(md *desc.MessageDescriptor, b []byte, ts time.Time) (*EventHeader, error) {
139 m := dynamic.NewMessage(md)
140 err := m.Unmarshal(b)
141 if err != nil {
142 return nil, err
143 }
144
145 headerIntf, err := m.TryGetFieldByName("header")
146 if err != nil {
147 return nil, err
148 }
149
150 header := headerIntf.(*dynamic.Message)
151
152 catIntf, err := header.TryGetFieldByName("category")
153 if err != nil {
154 return nil, err
155 }
156 cat := catIntf.(int32)
157
158 subCatIntf, err := header.TryGetFieldByName("sub_category")
159 if err != nil {
160 return nil, err
161 }
162 subCat := subCatIntf.(int32)
163
164 typeIntf, err := header.TryGetFieldByName("type")
165 if err != nil {
166 return nil, err
167 }
168 evType := typeIntf.(int32)
169
170 raisedIntf, err := header.TryGetFieldByName("raised_ts")
171 if err != nil {
172 return nil, err
173 }
Scott Baker2fe436a2020-02-10 17:21:47 -0800174 raised, err := DecodeTimestamp(raisedIntf)
175 if err != nil {
176 return nil, err
177 }
Scott Bakered4efab2020-01-13 19:12:25 -0800178
179 reportedIntf, err := header.TryGetFieldByName("reported_ts")
180 if err != nil {
181 return nil, err
182 }
Scott Baker2fe436a2020-02-10 17:21:47 -0800183 reported, err := DecodeTimestamp(reportedIntf)
184 if err != nil {
185 return nil, err
186 }
Scott Bakered4efab2020-01-13 19:12:25 -0800187
188 // Opportunistically try to extract the device_id and title from a kpi_event2
189 // note that there might actually be multiple_slice data, so there could
190 // be multiple device_id, multiple title, etc.
191 device_ids := make(map[string]interface{})
192 titles := make(map[string]interface{})
193
194 kpiIntf, err := m.TryGetFieldByName("kpi_event2")
195 if err == nil {
196 kpi, ok := kpiIntf.(*dynamic.Message)
197 if ok == true && kpi != nil {
198 sliceListIntf, err := kpi.TryGetFieldByName("slice_data")
199 if err == nil {
200 sliceIntf, ok := sliceListIntf.([]interface{})
201 if ok == true && len(sliceIntf) > 0 {
202 slice, ok := sliceIntf[0].(*dynamic.Message)
203 if ok == true && slice != nil {
204 metadataIntf, err := slice.TryGetFieldByName("metadata")
205 if err == nil {
206 metadata, ok := metadataIntf.(*dynamic.Message)
207 if ok == true && metadata != nil {
208 deviceIdIntf, err := metadataIntf.(*dynamic.Message).TryGetFieldByName("device_id")
209 if err == nil {
210 device_ids[deviceIdIntf.(string)] = slice
211 }
212 titleIntf, err := metadataIntf.(*dynamic.Message).TryGetFieldByName("title")
213 if err == nil {
214 titles[titleIntf.(string)] = slice
215 }
216 }
217 }
218 }
219 }
220 }
221 }
222 }
223
224 // Opportunistically try to pull a resource_id and title from a DeviceEvent
225 // There can only be one resource_id and title from a DeviceEvent, so it's easier
226 // than dealing with KPI_EVENT2.
227 deviceEventIntf, err := m.TryGetFieldByName("device_event")
228 if err == nil {
229 deviceEvent, ok := deviceEventIntf.(*dynamic.Message)
230 if ok == true && deviceEvent != nil {
231 deviceEventNameIntf, err := deviceEvent.TryGetFieldByName("device_event_name")
232 if err == nil {
233 deviceEventName, ok := deviceEventNameIntf.(string)
234 if ok {
235 titles[deviceEventName] = deviceEvent
236 }
237 }
238 resourceIdIntf, err := deviceEvent.TryGetFieldByName("resource_id")
239 if err == nil {
240 resourceId, ok := resourceIdIntf.(string)
241 if ok {
242 device_ids[resourceId] = deviceEvent
243 }
244 }
245 }
246 }
247
248 device_id_keys := make([]string, len(device_ids))
249 i := 0
250 for k, _ := range device_ids {
251 device_id_keys[i] = k
252 i++
253 }
254
255 title_keys := make([]string, len(titles))
256 i = 0
257 for k, _ := range titles {
258 title_keys[i] = k
259 i++
260 }
261
262 evHeader := EventHeader{Category: model.GetEnumString(header, "category", cat),
263 SubCategory: model.GetEnumString(header, "sub_category", subCat),
264 Type: model.GetEnumString(header, "type", evType),
265 Raised_ts: raised,
266 Reported_ts: reported,
267 Device_ids: device_id_keys,
Scott Baker2fe436a2020-02-10 17:21:47 -0800268 Timestamp: ts,
Scott Bakered4efab2020-01-13 19:12:25 -0800269 Titles: title_keys}
270
271 return &evHeader, nil
272}
273
274// Print the full message, either in JSON or in GRPCURL-human-readable format,
275// depending on which grpcurl formatter is passed in.
276func PrintMessage(f grpcurl.Formatter, md *desc.MessageDescriptor, b []byte) error {
277 m := dynamic.NewMessage(md)
278 err := m.Unmarshal(b)
279 if err != nil {
280 return err
281 }
282 s, err := f(m)
283 if err != nil {
284 return err
285 }
286 fmt.Println(s)
287 return nil
288}
289
290// Print just the enriched EventHeader. This is either in JSON format, or in
291// table format.
292func PrintEventHeader(outputAs string, outputFormat string, hdr *EventHeader) error {
293 if outputAs == "json" {
294 asJson, err := json.Marshal(hdr)
295 if err != nil {
296 return fmt.Errorf("Error marshalling JSON: %v", err)
297 } else {
298 fmt.Printf("%s\n", asJson)
299 }
300 } else {
301 f := format.Format(outputFormat)
302 output, err := f.ExecuteFixedWidth(DefaultWidths, false, *hdr)
303 if err != nil {
304 return err
305 }
306 fmt.Printf("%s\n", output)
307 }
308 return nil
309}
310
311func GetEventMessageDesc() (*desc.MessageDescriptor, error) {
312 // This is a very long-winded way to get a message descriptor
313
Scott Baker2fe436a2020-02-10 17:21:47 -0800314 descriptor, err := GetDescriptorSource()
Scott Bakered4efab2020-01-13 19:12:25 -0800315
316 // get the symbol for voltha.events
317 eventSymbol, err := descriptor.FindSymbol("voltha.Event")
318 if err != nil {
319 return nil, err
320 }
321
322 /*
323 * EventSymbol is a Descriptor, but not a MessageDescrptior,
324 * so we can't look at it's fields yet. Go back to the file,
325 * call FindMessage to get the Message, then ...
326 */
327
328 eventFile := eventSymbol.GetFile()
329 eventMessage := eventFile.FindMessage("voltha.Event")
330
331 return eventMessage, nil
332}
333
334// Start output, print any column headers or other start characters
335func (options *EventListenOpts) StartOutput(outputFormat string) error {
336 if options.OutputAs == "json" {
337 fmt.Println("[")
338 } else if (options.OutputAs == "table") && !options.ShowBody {
339 f := format.Format(outputFormat)
340 output, err := f.ExecuteFixedWidth(DefaultWidths, true, nil)
341 if err != nil {
342 return err
343 }
344 fmt.Println(output)
345 }
346 return nil
347}
348
349// Finish output, print any column footers or other end characters
350func (options *EventListenOpts) FinishOutput() {
351 if options.OutputAs == "json" {
352 fmt.Println("]")
353 }
354}
355
356func (options *EventListenOpts) Execute(args []string) error {
357 ProcessGlobalOptions()
358 if GlobalConfig.Kafka == "" {
359 return errors.New("Kafka address is not specified")
360 }
361
362 eventMessage, err := GetEventMessageDesc()
363 if err != nil {
364 return err
365 }
366
367 config := sarama.NewConfig()
368 config.ClientID = "go-kafka-consumer"
369 config.Consumer.Return.Errors = true
370 config.Version = sarama.V1_0_0_0
371 brokers := []string{GlobalConfig.Kafka}
372
373 client, err := sarama.NewClient(brokers, config)
374 if err != nil {
375 return err
376 }
377
378 defer func() {
379 if err := client.Close(); err != nil {
380 panic(err)
381 }
382 }()
383
384 consumer, consumerErrors, highwaterMarks, err := startConsumer([]string{"voltha.events"}, client)
385 if err != nil {
386 return err
387 }
388
389 highwater := highwaterMarks["voltha.events"]
390
391 signals := make(chan os.Signal, 1)
392 signal.Notify(signals, os.Interrupt)
393
394 // Count how many message processed
395 consumeCount := 0
396
397 // Count how many messages were printed
398 count := 0
399
400 var grpcurlFormatter grpcurl.Formatter
401
402 if options.ShowBody {
403 if options.OutputAs == "json" {
404 // need a descriptor source, any method will do
405 descriptor, _, _ := GetMethod("device-list")
406 if err != nil {
407 return err
408 }
409 grpcurlFormatter = grpcurl.NewJSONFormatter(false, grpcurl.AnyResolverFromDescriptorSource(descriptor))
410 } else {
411 grpcurlFormatter = grpcurl.NewTextFormatter(false)
412 }
413 }
414
415 var headerFilter *filter.Filter
416 if options.Filter != "" {
417 headerFilterVal, err := filter.Parse(options.Filter)
418 if err != nil {
419 return fmt.Errorf("Failed to parse filter: %v", err)
420 }
421 headerFilter = &headerFilterVal
422 }
423
424 outputFormat := CharReplacer.Replace(options.Format)
425 if outputFormat == "" {
426 outputFormat = GetCommandOptionWithDefault("events-listen", "format", DEFAULT_EVENT_FORMAT)
427 }
428
429 err = options.StartOutput(outputFormat)
430 if err != nil {
431 return err
432 }
433
Scott Bakerf05e60a2020-02-02 21:53:57 -0800434 var since *time.Time
435 if options.Since != "" {
436 since, err = ParseSince(options.Since)
437 if err != nil {
438 return err
439 }
440 }
441
Scott Bakered4efab2020-01-13 19:12:25 -0800442 // Get signnal for finish
443 doneCh := make(chan struct{})
444 go func() {
Scott Baker2fe436a2020-02-10 17:21:47 -0800445 tStart := time.Now()
Scott Bakered4efab2020-01-13 19:12:25 -0800446 Loop:
447 for {
448 // Initialize the idle timeout timer
449 timeoutTimer := time.NewTimer(time.Duration(options.Timeout) * time.Second)
450 select {
451 case msg := <-consumer:
452 consumeCount++
453 hdr, err := DecodeHeader(eventMessage, msg.Value, msg.Timestamp)
454 if err != nil {
455 log.Printf("Error decoding header %v\n", err)
456 continue
457 }
458 if headerFilter != nil && !headerFilter.Evaluate(*hdr) {
459 // skip printing message
Scott Baker2fe436a2020-02-10 17:21:47 -0800460 } else if since != nil && hdr.Timestamp.Before(*since) {
Scott Bakerf05e60a2020-02-02 21:53:57 -0800461 // it's too old
Scott Bakered4efab2020-01-13 19:12:25 -0800462 } else {
463 // comma separated between this message and predecessor
464 if count > 0 {
465 if options.OutputAs == "json" {
466 fmt.Println(",")
467 }
468 }
469
470 // Print it
471 if options.ShowBody {
472 PrintMessage(grpcurlFormatter, eventMessage, msg.Value)
473 } else {
474 err := PrintEventHeader(options.OutputAs, outputFormat, hdr)
475 if err != nil {
476 log.Printf("%v\n", err)
477 }
478 }
479
480 // Check to see if we've hit the "count" threshold the user specified
481 count++
482 if (options.Count > 0) && (count >= options.Count) {
483 log.Println("Count reached")
484 doneCh <- struct{}{}
485 break Loop
486 }
487
488 // Check to see if we've hit the "now" threshold the user specified
Scott Baker2fe436a2020-02-10 17:21:47 -0800489 if (options.Now) && (!hdr.Timestamp.Before(tStart)) {
Scott Bakered4efab2020-01-13 19:12:25 -0800490 log.Println("Now timestamp reached")
491 doneCh <- struct{}{}
492 break Loop
493 }
494 }
495
496 // If we're not in follow mode, see if we hit the highwater mark
497 if !options.Follow && !options.Now && (msg.Offset >= highwater) {
498 log.Println("High water reached")
499 doneCh <- struct{}{}
500 break Loop
501 }
502
503 // Reset the timeout timer
504 if !timeoutTimer.Stop() {
505 <-timeoutTimer.C
506 }
507 case consumerError := <-consumerErrors:
508 log.Printf("Received consumerError topic=%v, partition=%v, err=%v\n", string(consumerError.Topic), string(consumerError.Partition), consumerError.Err)
509 doneCh <- struct{}{}
510 case <-signals:
511 doneCh <- struct{}{}
512 case <-timeoutTimer.C:
513 log.Printf("Idle timeout\n")
514 doneCh <- struct{}{}
515 }
516 }
517 }()
518
519 <-doneCh
520
521 options.FinishOutput()
522
523 log.Printf("Consumed %d messages. Printed %d messages", consumeCount, count)
524
525 return nil
526}
527
528// Consume message from Sarama and send them out on a channel.
529// Supports multiple topics.
530// Taken from Sarama example consumer.
531func startConsumer(topics []string, client sarama.Client) (chan *sarama.ConsumerMessage, chan *sarama.ConsumerError, map[string]int64, error) {
532 master, err := sarama.NewConsumerFromClient(client)
533 if err != nil {
534 return nil, nil, nil, err
535 }
536
537 consumers := make(chan *sarama.ConsumerMessage)
538 errors := make(chan *sarama.ConsumerError)
539 highwater := make(map[string]int64)
540 for _, topic := range topics {
541 if strings.Contains(topic, "__consumer_offsets") {
542 continue
543 }
544 partitions, _ := master.Partitions(topic)
545
546 // TODO: Add support for multiple partitions
547 if len(partitions) > 1 {
548 log.Printf("WARNING: %d partitions on topic %s but we only listen to the first one\n", len(partitions), topic)
549 }
550
551 hw, err := client.GetOffset("voltha.events", partitions[0], sarama.OffsetNewest)
552 if err != nil {
553 return nil, nil, nil, fmt.Errorf("Error in consume() getting highwater: Topic %v Partitions: %v", topic, partitions)
554 }
555 highwater[topic] = hw
556
557 consumer, err := master.ConsumePartition(topic, partitions[0], sarama.OffsetOldest)
558 if nil != err {
559 return nil, nil, nil, fmt.Errorf("Error in consume(): Topic %v Partitions: %v", topic, partitions)
560 }
561 log.Println(" Start consuming topic ", topic)
562 go func(topic string, consumer sarama.PartitionConsumer) {
563 for {
564 select {
565 case consumerError := <-consumer.Errors():
566 errors <- consumerError
567
568 case msg := <-consumer.Messages():
569 consumers <- msg
570 }
571 }
572 }(topic, consumer)
573 }
574
575 return consumers, errors, highwater, nil
576}