blob: b3909872d8616f930fe9d2d89b96e22d19799f59 [file] [log] [blame]
Scott Bakered4efab2020-01-13 19:12:25 -08001/*
2 * Copyright 2019-present Ciena Corporation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package commands
17
18import (
19 "encoding/json"
20 "errors"
21 "fmt"
22 "github.com/Shopify/sarama"
23 "github.com/fullstorydev/grpcurl"
24 flags "github.com/jessevdk/go-flags"
25 "github.com/jhump/protoreflect/desc"
26 "github.com/jhump/protoreflect/dynamic"
27 "github.com/opencord/voltctl/pkg/filter"
28 "github.com/opencord/voltctl/pkg/format"
29 "github.com/opencord/voltctl/pkg/model"
30 "log"
31 "os"
32 "os/signal"
33 "strings"
34 "time"
35)
36
// DEFAULT_EVENT_FORMAT is the table format used for event headers when the
// user supplies no explicit format: one tab-separated column per header field.
const DEFAULT_EVENT_FORMAT = "table{{.Category}}\t{{.SubCategory}}\t{{.Type}}\t{{.Timestamp}}\t{{.Device_ids}}\t{{.Titles}}"
40
// EventListenOpts holds the command-line options of the "event listen"
// command. The struct tags are consumed by the go-flags parser; the
// description tags carry the user-facing help text for each option.
type EventListenOpts struct {
	// Format is the output template; when empty, Execute falls back to the
	// configured or built-in default format.
	Format string `long:"format" value-name:"FORMAT" default:"" description:"Format to use to output structured data"`
	// OutputAs selects the kind of output (table, json or yaml).
	OutputAs string `short:"o" long:"outputas" default:"table" choice:"table" choice:"json" choice:"yaml" description:"Type of output to generate"`
	// Filter is an optional filter expression evaluated against each decoded
	// event header; events that do not match are not printed.
	Filter string `short:"f" long:"filter" default:"" value-name:"FILTER" description:"Only display results that match filter"`
	// Follow keeps consuming past the high-water mark.
	Follow bool `short:"F" long:"follow" description:"Continue to consume until CTRL-C is pressed"`
	// ShowBody prints the full event message instead of the header summary.
	ShowBody bool `short:"b" long:"show-body" description:"Show body of events rather than only a header summary"`
	// Count limits the number of printed messages; Execute only enforces the
	// limit when it is greater than zero, so the default of -1 means no limit.
	Count int `short:"c" long:"count" default:"-1" value-name:"LIMIT" description:"Limit the count of messages that will be printed"`
	// Now stops the command once a printed event's timestamp has reached the
	// current time.
	Now bool `short:"n" long:"now" description:"Stop printing events when current time is reached"`
	// Timeout is the idle timeout, in seconds.
	Timeout int `short:"t" long:"idle" default:"900" value-name:"SECONDS" description:"Timeout if no message received within specified seconds"`
}
51
// EventOpts groups the subcommands available under the "event" command.
type EventOpts struct {
	// EventListen holds the options of the "listen" subcommand.
	EventListen EventListenOpts `command:"listen"`
}

// eventOpts is the option set handed to the command parser by
// RegisterEventCommands.
var eventOpts = EventOpts{}
57
// EventHeader is the enriched summary of a single event as produced by
// DecodeHeader: the fields of the event's own header plus a few values
// collected from the event body and from Kafka.
type EventHeader struct {
	// Category, SubCategory and Type are the string forms of the
	// corresponding header enum values.
	Category    string `json:"category"`
	SubCategory string `json:"sub_category"`
	Type        string `json:"type"`
	// Raised_ts and Reported_ts are copied verbatim from the event header.
	Raised_ts   float32 `json:"raised_ts"`
	Reported_ts float32 `json:"reported_ts"`
	// Device_ids is the opportunistically collected list of device_ids.
	Device_ids []string `json:"device_ids"`
	// Titles is the opportunistically collected list of titles.
	Titles []string `json:"titles"`
	// Timestamp is the timestamp from Kafka, in Unix seconds.
	Timestamp int64 `json:"timestamp"`
}
68
// EventHeaderWidths mirrors the fields of EventHeader with one integer per
// field. A value of this type (DefaultWidths) is passed to the fixed-width
// table formatter; presumably each integer is the column width used for the
// field of the same name.
type EventHeaderWidths struct {
	Category    int
	SubCategory int
	Type        int
	Raised_ts   int
	Reported_ts int
	Device_ids  int
	Titles      int
	Timestamp   int
}
79
80var DefaultWidths EventHeaderWidths = EventHeaderWidths{
81 Category: 13,
82 SubCategory: 3,
83 Type: 12,
84 Raised_ts: 10,
85 Reported_ts: 10,
86 Device_ids: 40,
87 Titles: 40,
88 Timestamp: 10,
89}
90
91func RegisterEventCommands(parent *flags.Parser) {
92 _, err := parent.AddCommand("event", "event commands", "Commands for observing events", &eventOpts)
93 if err != nil {
94 Error.Fatalf("Unable to register event commands with voltctl command parser: %s", err.Error())
95 }
96}
97
98// Extract the header, as well as a few other items that might be of interest
99func DecodeHeader(md *desc.MessageDescriptor, b []byte, ts time.Time) (*EventHeader, error) {
100 m := dynamic.NewMessage(md)
101 err := m.Unmarshal(b)
102 if err != nil {
103 return nil, err
104 }
105
106 headerIntf, err := m.TryGetFieldByName("header")
107 if err != nil {
108 return nil, err
109 }
110
111 header := headerIntf.(*dynamic.Message)
112
113 catIntf, err := header.TryGetFieldByName("category")
114 if err != nil {
115 return nil, err
116 }
117 cat := catIntf.(int32)
118
119 subCatIntf, err := header.TryGetFieldByName("sub_category")
120 if err != nil {
121 return nil, err
122 }
123 subCat := subCatIntf.(int32)
124
125 typeIntf, err := header.TryGetFieldByName("type")
126 if err != nil {
127 return nil, err
128 }
129 evType := typeIntf.(int32)
130
131 raisedIntf, err := header.TryGetFieldByName("raised_ts")
132 if err != nil {
133 return nil, err
134 }
135 raised := raisedIntf.(float32)
136
137 reportedIntf, err := header.TryGetFieldByName("reported_ts")
138 if err != nil {
139 return nil, err
140 }
141 reported := reportedIntf.(float32)
142
143 // Opportunistically try to extract the device_id and title from a kpi_event2
144 // note that there might actually be multiple_slice data, so there could
145 // be multiple device_id, multiple title, etc.
146 device_ids := make(map[string]interface{})
147 titles := make(map[string]interface{})
148
149 kpiIntf, err := m.TryGetFieldByName("kpi_event2")
150 if err == nil {
151 kpi, ok := kpiIntf.(*dynamic.Message)
152 if ok == true && kpi != nil {
153 sliceListIntf, err := kpi.TryGetFieldByName("slice_data")
154 if err == nil {
155 sliceIntf, ok := sliceListIntf.([]interface{})
156 if ok == true && len(sliceIntf) > 0 {
157 slice, ok := sliceIntf[0].(*dynamic.Message)
158 if ok == true && slice != nil {
159 metadataIntf, err := slice.TryGetFieldByName("metadata")
160 if err == nil {
161 metadata, ok := metadataIntf.(*dynamic.Message)
162 if ok == true && metadata != nil {
163 deviceIdIntf, err := metadataIntf.(*dynamic.Message).TryGetFieldByName("device_id")
164 if err == nil {
165 device_ids[deviceIdIntf.(string)] = slice
166 }
167 titleIntf, err := metadataIntf.(*dynamic.Message).TryGetFieldByName("title")
168 if err == nil {
169 titles[titleIntf.(string)] = slice
170 }
171 }
172 }
173 }
174 }
175 }
176 }
177 }
178
179 // Opportunistically try to pull a resource_id and title from a DeviceEvent
180 // There can only be one resource_id and title from a DeviceEvent, so it's easier
181 // than dealing with KPI_EVENT2.
182 deviceEventIntf, err := m.TryGetFieldByName("device_event")
183 if err == nil {
184 deviceEvent, ok := deviceEventIntf.(*dynamic.Message)
185 if ok == true && deviceEvent != nil {
186 deviceEventNameIntf, err := deviceEvent.TryGetFieldByName("device_event_name")
187 if err == nil {
188 deviceEventName, ok := deviceEventNameIntf.(string)
189 if ok {
190 titles[deviceEventName] = deviceEvent
191 }
192 }
193 resourceIdIntf, err := deviceEvent.TryGetFieldByName("resource_id")
194 if err == nil {
195 resourceId, ok := resourceIdIntf.(string)
196 if ok {
197 device_ids[resourceId] = deviceEvent
198 }
199 }
200 }
201 }
202
203 device_id_keys := make([]string, len(device_ids))
204 i := 0
205 for k, _ := range device_ids {
206 device_id_keys[i] = k
207 i++
208 }
209
210 title_keys := make([]string, len(titles))
211 i = 0
212 for k, _ := range titles {
213 title_keys[i] = k
214 i++
215 }
216
217 evHeader := EventHeader{Category: model.GetEnumString(header, "category", cat),
218 SubCategory: model.GetEnumString(header, "sub_category", subCat),
219 Type: model.GetEnumString(header, "type", evType),
220 Raised_ts: raised,
221 Reported_ts: reported,
222 Device_ids: device_id_keys,
223 Timestamp: ts.Unix(),
224 Titles: title_keys}
225
226 return &evHeader, nil
227}
228
229// Print the full message, either in JSON or in GRPCURL-human-readable format,
230// depending on which grpcurl formatter is passed in.
231func PrintMessage(f grpcurl.Formatter, md *desc.MessageDescriptor, b []byte) error {
232 m := dynamic.NewMessage(md)
233 err := m.Unmarshal(b)
234 if err != nil {
235 return err
236 }
237 s, err := f(m)
238 if err != nil {
239 return err
240 }
241 fmt.Println(s)
242 return nil
243}
244
245// Print just the enriched EventHeader. This is either in JSON format, or in
246// table format.
247func PrintEventHeader(outputAs string, outputFormat string, hdr *EventHeader) error {
248 if outputAs == "json" {
249 asJson, err := json.Marshal(hdr)
250 if err != nil {
251 return fmt.Errorf("Error marshalling JSON: %v", err)
252 } else {
253 fmt.Printf("%s\n", asJson)
254 }
255 } else {
256 f := format.Format(outputFormat)
257 output, err := f.ExecuteFixedWidth(DefaultWidths, false, *hdr)
258 if err != nil {
259 return err
260 }
261 fmt.Printf("%s\n", output)
262 }
263 return nil
264}
265
266func GetEventMessageDesc() (*desc.MessageDescriptor, error) {
267 // This is a very long-winded way to get a message descriptor
268
269 // any descriptor on the file will do
270 descriptor, _, err := GetMethod("update-log-level")
271 if err != nil {
272 return nil, err
273 }
274
275 // get the symbol for voltha.events
276 eventSymbol, err := descriptor.FindSymbol("voltha.Event")
277 if err != nil {
278 return nil, err
279 }
280
281 /*
282 * EventSymbol is a Descriptor, but not a MessageDescrptior,
283 * so we can't look at it's fields yet. Go back to the file,
284 * call FindMessage to get the Message, then ...
285 */
286
287 eventFile := eventSymbol.GetFile()
288 eventMessage := eventFile.FindMessage("voltha.Event")
289
290 return eventMessage, nil
291}
292
293// Start output, print any column headers or other start characters
294func (options *EventListenOpts) StartOutput(outputFormat string) error {
295 if options.OutputAs == "json" {
296 fmt.Println("[")
297 } else if (options.OutputAs == "table") && !options.ShowBody {
298 f := format.Format(outputFormat)
299 output, err := f.ExecuteFixedWidth(DefaultWidths, true, nil)
300 if err != nil {
301 return err
302 }
303 fmt.Println(output)
304 }
305 return nil
306}
307
308// Finish output, print any column footers or other end characters
309func (options *EventListenOpts) FinishOutput() {
310 if options.OutputAs == "json" {
311 fmt.Println("]")
312 }
313}
314
315func (options *EventListenOpts) Execute(args []string) error {
316 ProcessGlobalOptions()
317 if GlobalConfig.Kafka == "" {
318 return errors.New("Kafka address is not specified")
319 }
320
321 eventMessage, err := GetEventMessageDesc()
322 if err != nil {
323 return err
324 }
325
326 config := sarama.NewConfig()
327 config.ClientID = "go-kafka-consumer"
328 config.Consumer.Return.Errors = true
329 config.Version = sarama.V1_0_0_0
330 brokers := []string{GlobalConfig.Kafka}
331
332 client, err := sarama.NewClient(brokers, config)
333 if err != nil {
334 return err
335 }
336
337 defer func() {
338 if err := client.Close(); err != nil {
339 panic(err)
340 }
341 }()
342
343 consumer, consumerErrors, highwaterMarks, err := startConsumer([]string{"voltha.events"}, client)
344 if err != nil {
345 return err
346 }
347
348 highwater := highwaterMarks["voltha.events"]
349
350 signals := make(chan os.Signal, 1)
351 signal.Notify(signals, os.Interrupt)
352
353 // Count how many message processed
354 consumeCount := 0
355
356 // Count how many messages were printed
357 count := 0
358
359 var grpcurlFormatter grpcurl.Formatter
360
361 if options.ShowBody {
362 if options.OutputAs == "json" {
363 // need a descriptor source, any method will do
364 descriptor, _, _ := GetMethod("device-list")
365 if err != nil {
366 return err
367 }
368 grpcurlFormatter = grpcurl.NewJSONFormatter(false, grpcurl.AnyResolverFromDescriptorSource(descriptor))
369 } else {
370 grpcurlFormatter = grpcurl.NewTextFormatter(false)
371 }
372 }
373
374 var headerFilter *filter.Filter
375 if options.Filter != "" {
376 headerFilterVal, err := filter.Parse(options.Filter)
377 if err != nil {
378 return fmt.Errorf("Failed to parse filter: %v", err)
379 }
380 headerFilter = &headerFilterVal
381 }
382
383 outputFormat := CharReplacer.Replace(options.Format)
384 if outputFormat == "" {
385 outputFormat = GetCommandOptionWithDefault("events-listen", "format", DEFAULT_EVENT_FORMAT)
386 }
387
388 err = options.StartOutput(outputFormat)
389 if err != nil {
390 return err
391 }
392
393 // Get signnal for finish
394 doneCh := make(chan struct{})
395 go func() {
396 // Count how many messages printed
397 // count := 0
398 Loop:
399 for {
400 // Initialize the idle timeout timer
401 timeoutTimer := time.NewTimer(time.Duration(options.Timeout) * time.Second)
402 select {
403 case msg := <-consumer:
404 consumeCount++
405 hdr, err := DecodeHeader(eventMessage, msg.Value, msg.Timestamp)
406 if err != nil {
407 log.Printf("Error decoding header %v\n", err)
408 continue
409 }
410 if headerFilter != nil && !headerFilter.Evaluate(*hdr) {
411 // skip printing message
412 } else {
413 // comma separated between this message and predecessor
414 if count > 0 {
415 if options.OutputAs == "json" {
416 fmt.Println(",")
417 }
418 }
419
420 // Print it
421 if options.ShowBody {
422 PrintMessage(grpcurlFormatter, eventMessage, msg.Value)
423 } else {
424 err := PrintEventHeader(options.OutputAs, outputFormat, hdr)
425 if err != nil {
426 log.Printf("%v\n", err)
427 }
428 }
429
430 // Check to see if we've hit the "count" threshold the user specified
431 count++
432 if (options.Count > 0) && (count >= options.Count) {
433 log.Println("Count reached")
434 doneCh <- struct{}{}
435 break Loop
436 }
437
438 // Check to see if we've hit the "now" threshold the user specified
439 if (options.Now) && (hdr.Timestamp >= time.Now().Unix()) {
440 log.Println("Now timestamp reached")
441 doneCh <- struct{}{}
442 break Loop
443 }
444 }
445
446 // If we're not in follow mode, see if we hit the highwater mark
447 if !options.Follow && !options.Now && (msg.Offset >= highwater) {
448 log.Println("High water reached")
449 doneCh <- struct{}{}
450 break Loop
451 }
452
453 // Reset the timeout timer
454 if !timeoutTimer.Stop() {
455 <-timeoutTimer.C
456 }
457 case consumerError := <-consumerErrors:
458 log.Printf("Received consumerError topic=%v, partition=%v, err=%v\n", string(consumerError.Topic), string(consumerError.Partition), consumerError.Err)
459 doneCh <- struct{}{}
460 case <-signals:
461 doneCh <- struct{}{}
462 case <-timeoutTimer.C:
463 log.Printf("Idle timeout\n")
464 doneCh <- struct{}{}
465 }
466 }
467 }()
468
469 <-doneCh
470
471 options.FinishOutput()
472
473 log.Printf("Consumed %d messages. Printed %d messages", consumeCount, count)
474
475 return nil
476}
477
478// Consume message from Sarama and send them out on a channel.
479// Supports multiple topics.
480// Taken from Sarama example consumer.
481func startConsumer(topics []string, client sarama.Client) (chan *sarama.ConsumerMessage, chan *sarama.ConsumerError, map[string]int64, error) {
482 master, err := sarama.NewConsumerFromClient(client)
483 if err != nil {
484 return nil, nil, nil, err
485 }
486
487 consumers := make(chan *sarama.ConsumerMessage)
488 errors := make(chan *sarama.ConsumerError)
489 highwater := make(map[string]int64)
490 for _, topic := range topics {
491 if strings.Contains(topic, "__consumer_offsets") {
492 continue
493 }
494 partitions, _ := master.Partitions(topic)
495
496 // TODO: Add support for multiple partitions
497 if len(partitions) > 1 {
498 log.Printf("WARNING: %d partitions on topic %s but we only listen to the first one\n", len(partitions), topic)
499 }
500
501 hw, err := client.GetOffset("voltha.events", partitions[0], sarama.OffsetNewest)
502 if err != nil {
503 return nil, nil, nil, fmt.Errorf("Error in consume() getting highwater: Topic %v Partitions: %v", topic, partitions)
504 }
505 highwater[topic] = hw
506
507 consumer, err := master.ConsumePartition(topic, partitions[0], sarama.OffsetOldest)
508 if nil != err {
509 return nil, nil, nil, fmt.Errorf("Error in consume(): Topic %v Partitions: %v", topic, partitions)
510 }
511 log.Println(" Start consuming topic ", topic)
512 go func(topic string, consumer sarama.PartitionConsumer) {
513 for {
514 select {
515 case consumerError := <-consumer.Errors():
516 errors <- consumerError
517
518 case msg := <-consumer.Messages():
519 consumers <- msg
520 }
521 }
522 }(topic, consumer)
523 }
524
525 return consumers, errors, highwater, nil
526}