blob: b278f8d15a5d4bda7b2b1c2256ca16a3cb5fe9b0 [file] [log] [blame]
Scott Bakered4efab2020-01-13 19:12:25 -08001/*
2 * Copyright 2019-present Ciena Corporation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package commands
17
18import (
19 "encoding/json"
20 "errors"
21 "fmt"
22 "github.com/Shopify/sarama"
23 "github.com/fullstorydev/grpcurl"
24 flags "github.com/jessevdk/go-flags"
25 "github.com/jhump/protoreflect/desc"
26 "github.com/jhump/protoreflect/dynamic"
27 "github.com/opencord/voltctl/pkg/filter"
28 "github.com/opencord/voltctl/pkg/format"
29 "github.com/opencord/voltctl/pkg/model"
30 "log"
31 "os"
32 "os/signal"
33 "strings"
34 "time"
35)
36
37const (
38 DEFAULT_EVENT_FORMAT = "table{{.Category}}\t{{.SubCategory}}\t{{.Type}}\t{{.Timestamp}}\t{{.Device_ids}}\t{{.Titles}}"
39)
40
41type EventListenOpts struct {
42 Format string `long:"format" value-name:"FORMAT" default:"" description:"Format to use to output structured data"`
43 OutputAs string `short:"o" long:"outputas" default:"table" choice:"table" choice:"json" choice:"yaml" description:"Type of output to generate"`
44 Filter string `short:"f" long:"filter" default:"" value-name:"FILTER" description:"Only display results that match filter"`
45 Follow bool `short:"F" long:"follow" description:"Continue to consume until CTRL-C is pressed"`
46 ShowBody bool `short:"b" long:"show-body" description:"Show body of events rather than only a header summary"`
47 Count int `short:"c" long:"count" default:"-1" value-name:"LIMIT" description:"Limit the count of messages that will be printed"`
48 Now bool `short:"n" long:"now" description:"Stop printing events when current time is reached"`
49 Timeout int `short:"t" long:"idle" default:"900" value-name:"SECONDS" description:"Timeout if no message received within specified seconds"`
Scott Bakerf05e60a2020-02-02 21:53:57 -080050 Since string `short:"s" long:"since" default:"" value-name:"TIMESTAMP" description:"Do not show entries before timestamp"`
Scott Bakered4efab2020-01-13 19:12:25 -080051}
52
53type EventOpts struct {
54 EventListen EventListenOpts `command:"listen"`
55}
56
57var eventOpts = EventOpts{}
58
59type EventHeader struct {
60 Category string `json:"category"`
61 SubCategory string `json:"sub_category"`
62 Type string `json:"type"`
63 Raised_ts float32 `json:"raised_ts"`
64 Reported_ts float32 `json:"reported_ts"`
65 Device_ids []string `json:"device_ids"` // Opportunistically collected list of device_ids
66 Titles []string `json:"titles"` // Opportunistically collected list of titles
67 Timestamp int64 `json:"timestamp"` // Timestamp from Kafka
68}
69
70type EventHeaderWidths struct {
71 Category int
72 SubCategory int
73 Type int
74 Raised_ts int
75 Reported_ts int
76 Device_ids int
77 Titles int
78 Timestamp int
79}
80
81var DefaultWidths EventHeaderWidths = EventHeaderWidths{
82 Category: 13,
83 SubCategory: 3,
84 Type: 12,
85 Raised_ts: 10,
86 Reported_ts: 10,
87 Device_ids: 40,
88 Titles: 40,
89 Timestamp: 10,
90}
91
92func RegisterEventCommands(parent *flags.Parser) {
93 _, err := parent.AddCommand("event", "event commands", "Commands for observing events", &eventOpts)
94 if err != nil {
95 Error.Fatalf("Unable to register event commands with voltctl command parser: %s", err.Error())
96 }
97}
98
Scott Bakerf05e60a2020-02-02 21:53:57 -080099func ParseSince(s string) (*time.Time, error) {
100 if strings.EqualFold(s, "now") {
101 since := time.Now()
102 return &since, nil
103 }
104
105 rfc3339Time, err := time.Parse(time.RFC3339, s)
106 if err == nil {
107 return &rfc3339Time, nil
108 }
109
110 duration, err := time.ParseDuration(s)
111 if err == nil {
112 since := time.Now().Add(-duration)
113 return &since, nil
114 }
115
116 return nil, fmt.Errorf("Unable to parse time specification `%s`. Please use either `now`, a duration, or an RFC3339-compliant string", s)
117}
118
Scott Bakered4efab2020-01-13 19:12:25 -0800119// Extract the header, as well as a few other items that might be of interest
120func DecodeHeader(md *desc.MessageDescriptor, b []byte, ts time.Time) (*EventHeader, error) {
121 m := dynamic.NewMessage(md)
122 err := m.Unmarshal(b)
123 if err != nil {
124 return nil, err
125 }
126
127 headerIntf, err := m.TryGetFieldByName("header")
128 if err != nil {
129 return nil, err
130 }
131
132 header := headerIntf.(*dynamic.Message)
133
134 catIntf, err := header.TryGetFieldByName("category")
135 if err != nil {
136 return nil, err
137 }
138 cat := catIntf.(int32)
139
140 subCatIntf, err := header.TryGetFieldByName("sub_category")
141 if err != nil {
142 return nil, err
143 }
144 subCat := subCatIntf.(int32)
145
146 typeIntf, err := header.TryGetFieldByName("type")
147 if err != nil {
148 return nil, err
149 }
150 evType := typeIntf.(int32)
151
152 raisedIntf, err := header.TryGetFieldByName("raised_ts")
153 if err != nil {
154 return nil, err
155 }
156 raised := raisedIntf.(float32)
157
158 reportedIntf, err := header.TryGetFieldByName("reported_ts")
159 if err != nil {
160 return nil, err
161 }
162 reported := reportedIntf.(float32)
163
164 // Opportunistically try to extract the device_id and title from a kpi_event2
165 // note that there might actually be multiple_slice data, so there could
166 // be multiple device_id, multiple title, etc.
167 device_ids := make(map[string]interface{})
168 titles := make(map[string]interface{})
169
170 kpiIntf, err := m.TryGetFieldByName("kpi_event2")
171 if err == nil {
172 kpi, ok := kpiIntf.(*dynamic.Message)
173 if ok == true && kpi != nil {
174 sliceListIntf, err := kpi.TryGetFieldByName("slice_data")
175 if err == nil {
176 sliceIntf, ok := sliceListIntf.([]interface{})
177 if ok == true && len(sliceIntf) > 0 {
178 slice, ok := sliceIntf[0].(*dynamic.Message)
179 if ok == true && slice != nil {
180 metadataIntf, err := slice.TryGetFieldByName("metadata")
181 if err == nil {
182 metadata, ok := metadataIntf.(*dynamic.Message)
183 if ok == true && metadata != nil {
184 deviceIdIntf, err := metadataIntf.(*dynamic.Message).TryGetFieldByName("device_id")
185 if err == nil {
186 device_ids[deviceIdIntf.(string)] = slice
187 }
188 titleIntf, err := metadataIntf.(*dynamic.Message).TryGetFieldByName("title")
189 if err == nil {
190 titles[titleIntf.(string)] = slice
191 }
192 }
193 }
194 }
195 }
196 }
197 }
198 }
199
200 // Opportunistically try to pull a resource_id and title from a DeviceEvent
201 // There can only be one resource_id and title from a DeviceEvent, so it's easier
202 // than dealing with KPI_EVENT2.
203 deviceEventIntf, err := m.TryGetFieldByName("device_event")
204 if err == nil {
205 deviceEvent, ok := deviceEventIntf.(*dynamic.Message)
206 if ok == true && deviceEvent != nil {
207 deviceEventNameIntf, err := deviceEvent.TryGetFieldByName("device_event_name")
208 if err == nil {
209 deviceEventName, ok := deviceEventNameIntf.(string)
210 if ok {
211 titles[deviceEventName] = deviceEvent
212 }
213 }
214 resourceIdIntf, err := deviceEvent.TryGetFieldByName("resource_id")
215 if err == nil {
216 resourceId, ok := resourceIdIntf.(string)
217 if ok {
218 device_ids[resourceId] = deviceEvent
219 }
220 }
221 }
222 }
223
224 device_id_keys := make([]string, len(device_ids))
225 i := 0
226 for k, _ := range device_ids {
227 device_id_keys[i] = k
228 i++
229 }
230
231 title_keys := make([]string, len(titles))
232 i = 0
233 for k, _ := range titles {
234 title_keys[i] = k
235 i++
236 }
237
238 evHeader := EventHeader{Category: model.GetEnumString(header, "category", cat),
239 SubCategory: model.GetEnumString(header, "sub_category", subCat),
240 Type: model.GetEnumString(header, "type", evType),
241 Raised_ts: raised,
242 Reported_ts: reported,
243 Device_ids: device_id_keys,
244 Timestamp: ts.Unix(),
245 Titles: title_keys}
246
247 return &evHeader, nil
248}
249
250// Print the full message, either in JSON or in GRPCURL-human-readable format,
251// depending on which grpcurl formatter is passed in.
252func PrintMessage(f grpcurl.Formatter, md *desc.MessageDescriptor, b []byte) error {
253 m := dynamic.NewMessage(md)
254 err := m.Unmarshal(b)
255 if err != nil {
256 return err
257 }
258 s, err := f(m)
259 if err != nil {
260 return err
261 }
262 fmt.Println(s)
263 return nil
264}
265
266// Print just the enriched EventHeader. This is either in JSON format, or in
267// table format.
268func PrintEventHeader(outputAs string, outputFormat string, hdr *EventHeader) error {
269 if outputAs == "json" {
270 asJson, err := json.Marshal(hdr)
271 if err != nil {
272 return fmt.Errorf("Error marshalling JSON: %v", err)
273 } else {
274 fmt.Printf("%s\n", asJson)
275 }
276 } else {
277 f := format.Format(outputFormat)
278 output, err := f.ExecuteFixedWidth(DefaultWidths, false, *hdr)
279 if err != nil {
280 return err
281 }
282 fmt.Printf("%s\n", output)
283 }
284 return nil
285}
286
287func GetEventMessageDesc() (*desc.MessageDescriptor, error) {
288 // This is a very long-winded way to get a message descriptor
289
290 // any descriptor on the file will do
291 descriptor, _, err := GetMethod("update-log-level")
292 if err != nil {
293 return nil, err
294 }
295
296 // get the symbol for voltha.events
297 eventSymbol, err := descriptor.FindSymbol("voltha.Event")
298 if err != nil {
299 return nil, err
300 }
301
302 /*
303 * EventSymbol is a Descriptor, but not a MessageDescrptior,
304 * so we can't look at it's fields yet. Go back to the file,
305 * call FindMessage to get the Message, then ...
306 */
307
308 eventFile := eventSymbol.GetFile()
309 eventMessage := eventFile.FindMessage("voltha.Event")
310
311 return eventMessage, nil
312}
313
314// Start output, print any column headers or other start characters
315func (options *EventListenOpts) StartOutput(outputFormat string) error {
316 if options.OutputAs == "json" {
317 fmt.Println("[")
318 } else if (options.OutputAs == "table") && !options.ShowBody {
319 f := format.Format(outputFormat)
320 output, err := f.ExecuteFixedWidth(DefaultWidths, true, nil)
321 if err != nil {
322 return err
323 }
324 fmt.Println(output)
325 }
326 return nil
327}
328
329// Finish output, print any column footers or other end characters
330func (options *EventListenOpts) FinishOutput() {
331 if options.OutputAs == "json" {
332 fmt.Println("]")
333 }
334}
335
336func (options *EventListenOpts) Execute(args []string) error {
337 ProcessGlobalOptions()
338 if GlobalConfig.Kafka == "" {
339 return errors.New("Kafka address is not specified")
340 }
341
342 eventMessage, err := GetEventMessageDesc()
343 if err != nil {
344 return err
345 }
346
347 config := sarama.NewConfig()
348 config.ClientID = "go-kafka-consumer"
349 config.Consumer.Return.Errors = true
350 config.Version = sarama.V1_0_0_0
351 brokers := []string{GlobalConfig.Kafka}
352
353 client, err := sarama.NewClient(brokers, config)
354 if err != nil {
355 return err
356 }
357
358 defer func() {
359 if err := client.Close(); err != nil {
360 panic(err)
361 }
362 }()
363
364 consumer, consumerErrors, highwaterMarks, err := startConsumer([]string{"voltha.events"}, client)
365 if err != nil {
366 return err
367 }
368
369 highwater := highwaterMarks["voltha.events"]
370
371 signals := make(chan os.Signal, 1)
372 signal.Notify(signals, os.Interrupt)
373
374 // Count how many message processed
375 consumeCount := 0
376
377 // Count how many messages were printed
378 count := 0
379
380 var grpcurlFormatter grpcurl.Formatter
381
382 if options.ShowBody {
383 if options.OutputAs == "json" {
384 // need a descriptor source, any method will do
385 descriptor, _, _ := GetMethod("device-list")
386 if err != nil {
387 return err
388 }
389 grpcurlFormatter = grpcurl.NewJSONFormatter(false, grpcurl.AnyResolverFromDescriptorSource(descriptor))
390 } else {
391 grpcurlFormatter = grpcurl.NewTextFormatter(false)
392 }
393 }
394
395 var headerFilter *filter.Filter
396 if options.Filter != "" {
397 headerFilterVal, err := filter.Parse(options.Filter)
398 if err != nil {
399 return fmt.Errorf("Failed to parse filter: %v", err)
400 }
401 headerFilter = &headerFilterVal
402 }
403
404 outputFormat := CharReplacer.Replace(options.Format)
405 if outputFormat == "" {
406 outputFormat = GetCommandOptionWithDefault("events-listen", "format", DEFAULT_EVENT_FORMAT)
407 }
408
409 err = options.StartOutput(outputFormat)
410 if err != nil {
411 return err
412 }
413
Scott Bakerf05e60a2020-02-02 21:53:57 -0800414 var since *time.Time
415 if options.Since != "" {
416 since, err = ParseSince(options.Since)
417 if err != nil {
418 return err
419 }
420 }
421
Scott Bakered4efab2020-01-13 19:12:25 -0800422 // Get signnal for finish
423 doneCh := make(chan struct{})
424 go func() {
425 // Count how many messages printed
426 // count := 0
427 Loop:
428 for {
429 // Initialize the idle timeout timer
430 timeoutTimer := time.NewTimer(time.Duration(options.Timeout) * time.Second)
431 select {
432 case msg := <-consumer:
433 consumeCount++
434 hdr, err := DecodeHeader(eventMessage, msg.Value, msg.Timestamp)
435 if err != nil {
436 log.Printf("Error decoding header %v\n", err)
437 continue
438 }
439 if headerFilter != nil && !headerFilter.Evaluate(*hdr) {
440 // skip printing message
Scott Bakerf05e60a2020-02-02 21:53:57 -0800441 } else if since != nil && since.Unix() > hdr.Timestamp {
442 // it's too old
Scott Bakered4efab2020-01-13 19:12:25 -0800443 } else {
444 // comma separated between this message and predecessor
445 if count > 0 {
446 if options.OutputAs == "json" {
447 fmt.Println(",")
448 }
449 }
450
451 // Print it
452 if options.ShowBody {
453 PrintMessage(grpcurlFormatter, eventMessage, msg.Value)
454 } else {
455 err := PrintEventHeader(options.OutputAs, outputFormat, hdr)
456 if err != nil {
457 log.Printf("%v\n", err)
458 }
459 }
460
461 // Check to see if we've hit the "count" threshold the user specified
462 count++
463 if (options.Count > 0) && (count >= options.Count) {
464 log.Println("Count reached")
465 doneCh <- struct{}{}
466 break Loop
467 }
468
469 // Check to see if we've hit the "now" threshold the user specified
470 if (options.Now) && (hdr.Timestamp >= time.Now().Unix()) {
471 log.Println("Now timestamp reached")
472 doneCh <- struct{}{}
473 break Loop
474 }
475 }
476
477 // If we're not in follow mode, see if we hit the highwater mark
478 if !options.Follow && !options.Now && (msg.Offset >= highwater) {
479 log.Println("High water reached")
480 doneCh <- struct{}{}
481 break Loop
482 }
483
484 // Reset the timeout timer
485 if !timeoutTimer.Stop() {
486 <-timeoutTimer.C
487 }
488 case consumerError := <-consumerErrors:
489 log.Printf("Received consumerError topic=%v, partition=%v, err=%v\n", string(consumerError.Topic), string(consumerError.Partition), consumerError.Err)
490 doneCh <- struct{}{}
491 case <-signals:
492 doneCh <- struct{}{}
493 case <-timeoutTimer.C:
494 log.Printf("Idle timeout\n")
495 doneCh <- struct{}{}
496 }
497 }
498 }()
499
500 <-doneCh
501
502 options.FinishOutput()
503
504 log.Printf("Consumed %d messages. Printed %d messages", consumeCount, count)
505
506 return nil
507}
508
509// Consume message from Sarama and send them out on a channel.
510// Supports multiple topics.
511// Taken from Sarama example consumer.
512func startConsumer(topics []string, client sarama.Client) (chan *sarama.ConsumerMessage, chan *sarama.ConsumerError, map[string]int64, error) {
513 master, err := sarama.NewConsumerFromClient(client)
514 if err != nil {
515 return nil, nil, nil, err
516 }
517
518 consumers := make(chan *sarama.ConsumerMessage)
519 errors := make(chan *sarama.ConsumerError)
520 highwater := make(map[string]int64)
521 for _, topic := range topics {
522 if strings.Contains(topic, "__consumer_offsets") {
523 continue
524 }
525 partitions, _ := master.Partitions(topic)
526
527 // TODO: Add support for multiple partitions
528 if len(partitions) > 1 {
529 log.Printf("WARNING: %d partitions on topic %s but we only listen to the first one\n", len(partitions), topic)
530 }
531
532 hw, err := client.GetOffset("voltha.events", partitions[0], sarama.OffsetNewest)
533 if err != nil {
534 return nil, nil, nil, fmt.Errorf("Error in consume() getting highwater: Topic %v Partitions: %v", topic, partitions)
535 }
536 highwater[topic] = hw
537
538 consumer, err := master.ConsumePartition(topic, partitions[0], sarama.OffsetOldest)
539 if nil != err {
540 return nil, nil, nil, fmt.Errorf("Error in consume(): Topic %v Partitions: %v", topic, partitions)
541 }
542 log.Println(" Start consuming topic ", topic)
543 go func(topic string, consumer sarama.PartitionConsumer) {
544 for {
545 select {
546 case consumerError := <-consumer.Errors():
547 errors <- consumerError
548
549 case msg := <-consumer.Messages():
550 consumers <- msg
551 }
552 }
553 }(topic, consumer)
554 }
555
556 return consumers, errors, highwater, nil
557}