blob: cc149d6417c91e17c147402a4c741a08738bfac4 [file] [log] [blame]
/*
 * Copyright 2019-present Ciena Corporation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package commands
17
18import (
19 "encoding/json"
20 "errors"
21 "fmt"
David K. Bainbridge9189c632021-03-26 21:52:21 +000022 "log"
23 "os"
24 "os/signal"
25 "strings"
26 "time"
27
Scott Bakered4efab2020-01-13 19:12:25 -080028 "github.com/Shopify/sarama"
Scott Baker9173ed82020-05-19 08:30:12 -070029 "github.com/golang/protobuf/jsonpb"
30 "github.com/golang/protobuf/proto"
Scott Baker2fe436a2020-02-10 17:21:47 -080031 "github.com/golang/protobuf/ptypes/timestamp"
Scott Bakered4efab2020-01-13 19:12:25 -080032 flags "github.com/jessevdk/go-flags"
Scott Bakered4efab2020-01-13 19:12:25 -080033 "github.com/opencord/voltctl/pkg/filter"
34 "github.com/opencord/voltctl/pkg/format"
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000035 "github.com/opencord/voltha-protos/v5/go/voltha"
Scott Bakered4efab2020-01-13 19:12:25 -080036)
37
// DEFAULT_EVENT_FORMAT is the table template used by "event listen" when no
// format is given on the command line and none is found through
// GetCommandOptionWithDefault.
const DEFAULT_EVENT_FORMAT = "table{{.Category}}\t{{.SubCategory}}\t{{.Type}}\t{{.Timestamp}}\t{{.Device_ids}}\t{{.Titles}}"
41
// EventListenOpts holds the command-line options of the "event listen"
// command. The struct tags are read by the go-flags parser.
type EventListenOpts struct {
	// Format is the output template; when empty a default is looked up.
	Format string `long:"format" value-name:"FORMAT" default:"" description:"Format to use to output structured data"`
	// OutputAs selects the output encoding. The repeated "choice" tag keys
	// are intentional, hence the linter suppression below.
	// nolint: staticcheck
	OutputAs string `short:"o" long:"outputas" default:"table" choice:"table" choice:"json" choice:"yaml" description:"Type of output to generate"`
	// Filter is an optional expression evaluated against each event header.
	Filter string `short:"f" long:"filter" default:"" value-name:"FILTER" description:"Only display results that match filter"`
	// Follow keeps consuming past the high-water mark.
	Follow bool `short:"F" long:"follow" description:"Continue to consume until CTRL-C is pressed"`
	// ShowBody prints the whole event instead of the header summary.
	ShowBody bool `short:"b" long:"show-body" description:"Show body of events rather than only a header summary"`
	// Count limits the number of printed messages; values <= 0 disable the limit.
	Count int `short:"c" long:"count" default:"-1" value-name:"LIMIT" description:"Limit the count of messages that will be printed"`
	// Now stops once a printed event is not older than the start of listening.
	Now bool `short:"n" long:"now" description:"Stop printing events when current time is reached"`
	// Timeout is the idle timeout, in seconds.
	Timeout int `short:"t" long:"idle" default:"900" value-name:"SECONDS" description:"Timeout if no message received within specified seconds"`
	// Since suppresses events older than the given time specification
	// (see ParseSince for the accepted forms).
	Since string `short:"s" long:"since" default:"" value-name:"TIMESTAMP" description:"Do not show entries before timestamp"`
}
54
55type EventOpts struct {
56 EventListen EventListenOpts `command:"listen"`
57}
58
59var eventOpts = EventOpts{}
60
// EventHeader is the enriched summary of an event that is printed (or
// filtered on) when the event body is not shown. It is filled in by
// DecodeHeader.
type EventHeader struct {
	// Category, SubCategory and Type are the enum names from the event header.
	Category    string `json:"category"`
	SubCategory string `json:"sub_category"`
	Type        string `json:"type"`
	// Raised_ts and Reported_ts are the decoded header timestamps.
	Raised_ts   time.Time `json:"raised_ts"`
	Reported_ts time.Time `json:"reported_ts"`
	// Device_ids is an opportunistically collected list of device ids.
	Device_ids []string `json:"device_ids"`
	// Titles is an opportunistically collected list of titles.
	Titles []string `json:"titles"`
	// Timestamp is the timestamp of the Kafka message carrying the event.
	Timestamp time.Time `json:"timestamp"`
}
71
// EventHeaderWidths mirrors the fields of EventHeader with one integer per
// field. A value of this type is handed to ExecuteFixedWidth when rendering
// table output (presumably as per-column widths — confirm against the format
// package).
type EventHeaderWidths struct {
	Category    int
	SubCategory int
	Type        int
	Raised_ts   int
	Reported_ts int
	Device_ids  int
	Titles      int
	Timestamp   int
}
82
83var DefaultWidths EventHeaderWidths = EventHeaderWidths{
84 Category: 13,
85 SubCategory: 3,
86 Type: 12,
87 Raised_ts: 10,
88 Reported_ts: 10,
89 Device_ids: 40,
90 Titles: 40,
91 Timestamp: 10,
92}
93
94func RegisterEventCommands(parent *flags.Parser) {
95 _, err := parent.AddCommand("event", "event commands", "Commands for observing events", &eventOpts)
96 if err != nil {
97 Error.Fatalf("Unable to register event commands with voltctl command parser: %s", err.Error())
98 }
99}
100
// ParseSince turns a user-supplied time specification into an absolute time.
//
// Three forms are tried in order:
//   - the word "now" (case-insensitive), which yields the current time;
//   - an RFC3339 timestamp, which is returned as parsed;
//   - a duration understood by time.ParseDuration, which is subtracted from
//     the current time.
//
// If none of them applies, a nil time and an error are returned.
func ParseSince(s string) (*time.Time, error) {
	if strings.EqualFold(s, "now") {
		current := time.Now()
		return &current, nil
	}

	if absolute, err := time.Parse(time.RFC3339, s); err == nil {
		return &absolute, nil
	}

	if age, err := time.ParseDuration(s); err == nil {
		cutoff := time.Now().Add(-age)
		return &cutoff, nil
	}

	return nil, fmt.Errorf("Unable to parse time specification `%s`. Please use either `now`, a duration, or an RFC3339-compliant string", s)
}
120
Scott Baker2fe436a2020-02-10 17:21:47 -0800121// Convert a timestamp field in an event to a time.Time
122func DecodeTimestamp(tsIntf interface{}) (time.Time, error) {
123 ts, okay := tsIntf.(*timestamp.Timestamp)
124 if okay {
125 // Voltha-Protos 3.2.3 and above
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000126 return ts.AsTime(), nil
Scott Baker2fe436a2020-02-10 17:21:47 -0800127 }
128 tsFloat, okay := tsIntf.(float32)
129 if okay {
130 // Voltha-Protos 3.2.2 and below
131 return time.Unix(int64(tsFloat), 0), nil
132 }
Scott Bakera1e53fa2020-04-01 11:13:34 -0700133 tsInt64, okay := tsIntf.(int64)
134 if okay {
135 if tsInt64 > 10000000000000 {
136 // sometimes it's in nanoseconds
137 return time.Unix(tsInt64/1000000000, tsInt64%1000000000), nil
138 } else {
139 // sometimes it's in seconds
140 return time.Unix(tsInt64/1000, 0), nil
141 }
142 }
Scott Baker2fe436a2020-02-10 17:21:47 -0800143 return time.Time{}, errors.New("Failed to decode timestamp")
144}
145
Scott Bakered4efab2020-01-13 19:12:25 -0800146// Extract the header, as well as a few other items that might be of interest
Scott Baker9173ed82020-05-19 08:30:12 -0700147func DecodeHeader(b []byte, ts time.Time) (*EventHeader, error) {
148 m := &voltha.Event{}
149 if err := proto.Unmarshal(b, m); err != nil {
Scott Bakered4efab2020-01-13 19:12:25 -0800150 return nil, err
151 }
152
Scott Baker9173ed82020-05-19 08:30:12 -0700153 header := m.Header
154 cat := voltha.EventCategory_Types_name[int32(header.Category)]
155 subCat := voltha.EventSubCategory_Types_name[int32(header.SubCategory)]
156 evType := voltha.EventType_Types_name[int32(header.Type)]
157 raised, err := DecodeTimestamp(header.RaisedTs)
Scott Bakered4efab2020-01-13 19:12:25 -0800158 if err != nil {
159 return nil, err
160 }
Scott Baker9173ed82020-05-19 08:30:12 -0700161 reported, err := DecodeTimestamp(header.ReportedTs)
Scott Baker2fe436a2020-02-10 17:21:47 -0800162 if err != nil {
163 return nil, err
164 }
Scott Bakered4efab2020-01-13 19:12:25 -0800165
166 // Opportunistically try to extract the device_id and title from a kpi_event2
167 // note that there might actually be multiple_slice data, so there could
168 // be multiple device_id, multiple title, etc.
169 device_ids := make(map[string]interface{})
170 titles := make(map[string]interface{})
171
Scott Baker9173ed82020-05-19 08:30:12 -0700172 kpi := m.GetKpiEvent2()
173 if kpi != nil {
174 sliceList := kpi.SliceData
175 if len(sliceList) > 0 {
176 slice := sliceList[0]
177 if slice != nil {
178 metadata := slice.Metadata
179 deviceId := metadata.DeviceId
180 device_ids[deviceId] = slice
181
182 title := metadata.Title
183 titles[title] = slice
Scott Bakered4efab2020-01-13 19:12:25 -0800184 }
185 }
186 }
187
188 // Opportunistically try to pull a resource_id and title from a DeviceEvent
189 // There can only be one resource_id and title from a DeviceEvent, so it's easier
190 // than dealing with KPI_EVENT2.
Scott Baker9173ed82020-05-19 08:30:12 -0700191 deviceEvent := m.GetDeviceEvent()
192 if deviceEvent != nil {
193 titles[deviceEvent.DeviceEventName] = deviceEvent
194 device_ids[deviceEvent.ResourceId] = deviceEvent
Scott Bakered4efab2020-01-13 19:12:25 -0800195 }
196
197 device_id_keys := make([]string, len(device_ids))
198 i := 0
David K. Bainbridge2b627612020-02-18 14:50:13 -0800199 for k := range device_ids {
Scott Bakered4efab2020-01-13 19:12:25 -0800200 device_id_keys[i] = k
201 i++
202 }
203
204 title_keys := make([]string, len(titles))
205 i = 0
David K. Bainbridge2b627612020-02-18 14:50:13 -0800206 for k := range titles {
Scott Bakered4efab2020-01-13 19:12:25 -0800207 title_keys[i] = k
208 i++
209 }
210
Scott Baker9173ed82020-05-19 08:30:12 -0700211 evHeader := EventHeader{Category: cat,
212 SubCategory: subCat,
213 Type: evType,
Scott Bakered4efab2020-01-13 19:12:25 -0800214 Raised_ts: raised,
215 Reported_ts: reported,
216 Device_ids: device_id_keys,
Scott Baker2fe436a2020-02-10 17:21:47 -0800217 Timestamp: ts,
Scott Bakered4efab2020-01-13 19:12:25 -0800218 Titles: title_keys}
219
220 return &evHeader, nil
221}
222
223// Print the full message, either in JSON or in GRPCURL-human-readable format,
224// depending on which grpcurl formatter is passed in.
Scott Baker9173ed82020-05-19 08:30:12 -0700225func PrintMessage(outputAs string, b []byte) error {
226 ms := &voltha.Event{}
227 if err := proto.Unmarshal(b, ms); err != nil {
Scott Bakered4efab2020-01-13 19:12:25 -0800228 return err
229 }
Scott Baker9173ed82020-05-19 08:30:12 -0700230
231 if outputAs == "json" {
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000232 marshaler := jsonpb.Marshaler{EmitDefaults: true}
Scott Baker9173ed82020-05-19 08:30:12 -0700233 asJson, err := marshaler.MarshalToString(ms)
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000234
Scott Baker9173ed82020-05-19 08:30:12 -0700235 if err != nil {
236 return fmt.Errorf("Failed to marshal the json: %s", err)
237 }
238 fmt.Println(asJson)
239 } else {
240 // print in golang native format
241 fmt.Printf("%v\n", ms)
Scott Bakered4efab2020-01-13 19:12:25 -0800242 }
Scott Baker9173ed82020-05-19 08:30:12 -0700243
Scott Bakered4efab2020-01-13 19:12:25 -0800244 return nil
245}
246
247// Print just the enriched EventHeader. This is either in JSON format, or in
248// table format.
249func PrintEventHeader(outputAs string, outputFormat string, hdr *EventHeader) error {
250 if outputAs == "json" {
251 asJson, err := json.Marshal(hdr)
252 if err != nil {
253 return fmt.Errorf("Error marshalling JSON: %v", err)
254 } else {
255 fmt.Printf("%s\n", asJson)
256 }
257 } else {
258 f := format.Format(outputFormat)
259 output, err := f.ExecuteFixedWidth(DefaultWidths, false, *hdr)
260 if err != nil {
261 return err
262 }
263 fmt.Printf("%s\n", output)
264 }
265 return nil
266}
267
Scott Bakered4efab2020-01-13 19:12:25 -0800268// Start output, print any column headers or other start characters
269func (options *EventListenOpts) StartOutput(outputFormat string) error {
270 if options.OutputAs == "json" {
271 fmt.Println("[")
272 } else if (options.OutputAs == "table") && !options.ShowBody {
273 f := format.Format(outputFormat)
274 output, err := f.ExecuteFixedWidth(DefaultWidths, true, nil)
275 if err != nil {
276 return err
277 }
278 fmt.Println(output)
279 }
280 return nil
281}
282
283// Finish output, print any column footers or other end characters
284func (options *EventListenOpts) FinishOutput() {
285 if options.OutputAs == "json" {
286 fmt.Println("]")
287 }
288}
289
290func (options *EventListenOpts) Execute(args []string) error {
291 ProcessGlobalOptions()
David K. Bainbridge9189c632021-03-26 21:52:21 +0000292 if GlobalConfig.Current().Kafka == "" {
Scott Bakered4efab2020-01-13 19:12:25 -0800293 return errors.New("Kafka address is not specified")
294 }
295
Scott Bakered4efab2020-01-13 19:12:25 -0800296 config := sarama.NewConfig()
297 config.ClientID = "go-kafka-consumer"
298 config.Consumer.Return.Errors = true
299 config.Version = sarama.V1_0_0_0
David K. Bainbridge9189c632021-03-26 21:52:21 +0000300 brokers := []string{GlobalConfig.Current().Kafka}
Scott Bakered4efab2020-01-13 19:12:25 -0800301
302 client, err := sarama.NewClient(brokers, config)
303 if err != nil {
304 return err
305 }
306
307 defer func() {
308 if err := client.Close(); err != nil {
309 panic(err)
310 }
311 }()
312
313 consumer, consumerErrors, highwaterMarks, err := startConsumer([]string{"voltha.events"}, client)
314 if err != nil {
315 return err
316 }
317
318 highwater := highwaterMarks["voltha.events"]
319
320 signals := make(chan os.Signal, 1)
321 signal.Notify(signals, os.Interrupt)
322
323 // Count how many message processed
324 consumeCount := 0
325
326 // Count how many messages were printed
327 count := 0
328
Scott Bakered4efab2020-01-13 19:12:25 -0800329 var headerFilter *filter.Filter
330 if options.Filter != "" {
331 headerFilterVal, err := filter.Parse(options.Filter)
332 if err != nil {
333 return fmt.Errorf("Failed to parse filter: %v", err)
334 }
335 headerFilter = &headerFilterVal
336 }
337
338 outputFormat := CharReplacer.Replace(options.Format)
339 if outputFormat == "" {
340 outputFormat = GetCommandOptionWithDefault("events-listen", "format", DEFAULT_EVENT_FORMAT)
341 }
342
343 err = options.StartOutput(outputFormat)
344 if err != nil {
345 return err
346 }
347
Scott Bakerf05e60a2020-02-02 21:53:57 -0800348 var since *time.Time
349 if options.Since != "" {
350 since, err = ParseSince(options.Since)
351 if err != nil {
352 return err
353 }
354 }
355
Scott Bakered4efab2020-01-13 19:12:25 -0800356 // Get signnal for finish
357 doneCh := make(chan struct{})
358 go func() {
Scott Baker2fe436a2020-02-10 17:21:47 -0800359 tStart := time.Now()
Scott Bakered4efab2020-01-13 19:12:25 -0800360 Loop:
361 for {
362 // Initialize the idle timeout timer
363 timeoutTimer := time.NewTimer(time.Duration(options.Timeout) * time.Second)
364 select {
365 case msg := <-consumer:
366 consumeCount++
Scott Baker9173ed82020-05-19 08:30:12 -0700367 hdr, err := DecodeHeader(msg.Value, msg.Timestamp)
Scott Bakered4efab2020-01-13 19:12:25 -0800368 if err != nil {
369 log.Printf("Error decoding header %v\n", err)
370 continue
371 }
Scott Baker9a2d9a42020-06-09 18:11:26 -0700372
373 match := false
374 if headerFilter != nil {
375 var err error
376 if match, err = headerFilter.Evaluate(*hdr); err != nil {
377 log.Printf("%v\n", err)
378 }
379 } else {
380 match = true
381 }
382
383 if !match {
Scott Bakered4efab2020-01-13 19:12:25 -0800384 // skip printing message
Scott Baker2fe436a2020-02-10 17:21:47 -0800385 } else if since != nil && hdr.Timestamp.Before(*since) {
Scott Bakerf05e60a2020-02-02 21:53:57 -0800386 // it's too old
Scott Bakered4efab2020-01-13 19:12:25 -0800387 } else {
388 // comma separated between this message and predecessor
389 if count > 0 {
390 if options.OutputAs == "json" {
391 fmt.Println(",")
392 }
393 }
394
395 // Print it
396 if options.ShowBody {
Scott Baker9173ed82020-05-19 08:30:12 -0700397 if err := PrintMessage(options.OutputAs, msg.Value); err != nil {
David K. Bainbridge2b627612020-02-18 14:50:13 -0800398 log.Printf("%v\n", err)
399 }
Scott Bakered4efab2020-01-13 19:12:25 -0800400 } else {
David K. Bainbridge2b627612020-02-18 14:50:13 -0800401 if err := PrintEventHeader(options.OutputAs, outputFormat, hdr); err != nil {
Scott Bakered4efab2020-01-13 19:12:25 -0800402 log.Printf("%v\n", err)
403 }
404 }
405
406 // Check to see if we've hit the "count" threshold the user specified
407 count++
408 if (options.Count > 0) && (count >= options.Count) {
409 log.Println("Count reached")
410 doneCh <- struct{}{}
411 break Loop
412 }
413
414 // Check to see if we've hit the "now" threshold the user specified
Scott Baker2fe436a2020-02-10 17:21:47 -0800415 if (options.Now) && (!hdr.Timestamp.Before(tStart)) {
Scott Bakered4efab2020-01-13 19:12:25 -0800416 log.Println("Now timestamp reached")
417 doneCh <- struct{}{}
418 break Loop
419 }
420 }
421
422 // If we're not in follow mode, see if we hit the highwater mark
423 if !options.Follow && !options.Now && (msg.Offset >= highwater) {
424 log.Println("High water reached")
425 doneCh <- struct{}{}
426 break Loop
427 }
428
429 // Reset the timeout timer
430 if !timeoutTimer.Stop() {
431 <-timeoutTimer.C
432 }
433 case consumerError := <-consumerErrors:
434 log.Printf("Received consumerError topic=%v, partition=%v, err=%v\n", string(consumerError.Topic), string(consumerError.Partition), consumerError.Err)
435 doneCh <- struct{}{}
436 case <-signals:
437 doneCh <- struct{}{}
438 case <-timeoutTimer.C:
439 log.Printf("Idle timeout\n")
440 doneCh <- struct{}{}
441 }
442 }
443 }()
444
445 <-doneCh
446
447 options.FinishOutput()
448
449 log.Printf("Consumed %d messages. Printed %d messages", consumeCount, count)
450
451 return nil
452}
453
454// Consume message from Sarama and send them out on a channel.
455// Supports multiple topics.
456// Taken from Sarama example consumer.
457func startConsumer(topics []string, client sarama.Client) (chan *sarama.ConsumerMessage, chan *sarama.ConsumerError, map[string]int64, error) {
458 master, err := sarama.NewConsumerFromClient(client)
459 if err != nil {
460 return nil, nil, nil, err
461 }
462
463 consumers := make(chan *sarama.ConsumerMessage)
464 errors := make(chan *sarama.ConsumerError)
465 highwater := make(map[string]int64)
466 for _, topic := range topics {
467 if strings.Contains(topic, "__consumer_offsets") {
468 continue
469 }
470 partitions, _ := master.Partitions(topic)
471
472 // TODO: Add support for multiple partitions
473 if len(partitions) > 1 {
474 log.Printf("WARNING: %d partitions on topic %s but we only listen to the first one\n", len(partitions), topic)
475 }
476
477 hw, err := client.GetOffset("voltha.events", partitions[0], sarama.OffsetNewest)
478 if err != nil {
Matteo Scandolo0f3959b2021-11-30 15:59:37 -0800479 return nil, nil, nil, fmt.Errorf("Error in consume() getting highwater: Topic %v Partitions: %v (%s)", topic, partitions, err)
Scott Bakered4efab2020-01-13 19:12:25 -0800480 }
481 highwater[topic] = hw
482
483 consumer, err := master.ConsumePartition(topic, partitions[0], sarama.OffsetOldest)
484 if nil != err {
485 return nil, nil, nil, fmt.Errorf("Error in consume(): Topic %v Partitions: %v", topic, partitions)
486 }
487 log.Println(" Start consuming topic ", topic)
488 go func(topic string, consumer sarama.PartitionConsumer) {
489 for {
490 select {
491 case consumerError := <-consumer.Errors():
492 errors <- consumerError
493
494 case msg := <-consumer.Messages():
495 consumers <- msg
496 }
497 }
498 }(topic, consumer)
499 }
500
501 return consumers, errors, highwater, nil
502}