blob: 06f6aee85248cdea2ff97f3a3964f10631ef76d8 [file] [log] [blame]
Scott Bakered4efab2020-01-13 19:12:25 -08001/*
2 * Copyright 2019-present Ciena Corporation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package commands
17
18import (
19 "encoding/json"
20 "errors"
21 "fmt"
David K. Bainbridge9189c632021-03-26 21:52:21 +000022 "log"
23 "os"
24 "os/signal"
25 "strings"
26 "time"
27
Scott Bakered4efab2020-01-13 19:12:25 -080028 "github.com/Shopify/sarama"
Scott Baker9173ed82020-05-19 08:30:12 -070029 "github.com/golang/protobuf/jsonpb"
30 "github.com/golang/protobuf/proto"
Scott Baker2fe436a2020-02-10 17:21:47 -080031 "github.com/golang/protobuf/ptypes"
32 "github.com/golang/protobuf/ptypes/timestamp"
Scott Bakered4efab2020-01-13 19:12:25 -080033 flags "github.com/jessevdk/go-flags"
Scott Bakered4efab2020-01-13 19:12:25 -080034 "github.com/opencord/voltctl/pkg/filter"
35 "github.com/opencord/voltctl/pkg/format"
kesavand8ec4fc02021-01-27 09:10:22 -050036 "github.com/opencord/voltha-protos/v4/go/voltha"
Scott Bakered4efab2020-01-13 19:12:25 -080037)
38
39const (
40 DEFAULT_EVENT_FORMAT = "table{{.Category}}\t{{.SubCategory}}\t{{.Type}}\t{{.Timestamp}}\t{{.Device_ids}}\t{{.Titles}}"
41)
42
43type EventListenOpts struct {
David K. Bainbridge2b627612020-02-18 14:50:13 -080044 Format string `long:"format" value-name:"FORMAT" default:"" description:"Format to use to output structured data"`
45 // nolint: staticcheck
Scott Bakered4efab2020-01-13 19:12:25 -080046 OutputAs string `short:"o" long:"outputas" default:"table" choice:"table" choice:"json" choice:"yaml" description:"Type of output to generate"`
47 Filter string `short:"f" long:"filter" default:"" value-name:"FILTER" description:"Only display results that match filter"`
48 Follow bool `short:"F" long:"follow" description:"Continue to consume until CTRL-C is pressed"`
49 ShowBody bool `short:"b" long:"show-body" description:"Show body of events rather than only a header summary"`
50 Count int `short:"c" long:"count" default:"-1" value-name:"LIMIT" description:"Limit the count of messages that will be printed"`
51 Now bool `short:"n" long:"now" description:"Stop printing events when current time is reached"`
52 Timeout int `short:"t" long:"idle" default:"900" value-name:"SECONDS" description:"Timeout if no message received within specified seconds"`
Scott Bakerf05e60a2020-02-02 21:53:57 -080053 Since string `short:"s" long:"since" default:"" value-name:"TIMESTAMP" description:"Do not show entries before timestamp"`
Scott Bakered4efab2020-01-13 19:12:25 -080054}
55
56type EventOpts struct {
57 EventListen EventListenOpts `command:"listen"`
58}
59
60var eventOpts = EventOpts{}
61
62type EventHeader struct {
Scott Baker2fe436a2020-02-10 17:21:47 -080063 Category string `json:"category"`
64 SubCategory string `json:"sub_category"`
65 Type string `json:"type"`
66 Raised_ts time.Time `json:"raised_ts"`
67 Reported_ts time.Time `json:"reported_ts"`
68 Device_ids []string `json:"device_ids"` // Opportunistically collected list of device_ids
69 Titles []string `json:"titles"` // Opportunistically collected list of titles
70 Timestamp time.Time `json:"timestamp"` // Timestamp from Kafka
Scott Bakered4efab2020-01-13 19:12:25 -080071}
72
73type EventHeaderWidths struct {
74 Category int
75 SubCategory int
76 Type int
77 Raised_ts int
78 Reported_ts int
79 Device_ids int
80 Titles int
81 Timestamp int
82}
83
84var DefaultWidths EventHeaderWidths = EventHeaderWidths{
85 Category: 13,
86 SubCategory: 3,
87 Type: 12,
88 Raised_ts: 10,
89 Reported_ts: 10,
90 Device_ids: 40,
91 Titles: 40,
92 Timestamp: 10,
93}
94
95func RegisterEventCommands(parent *flags.Parser) {
96 _, err := parent.AddCommand("event", "event commands", "Commands for observing events", &eventOpts)
97 if err != nil {
98 Error.Fatalf("Unable to register event commands with voltctl command parser: %s", err.Error())
99 }
100}
101
Scott Bakerf05e60a2020-02-02 21:53:57 -0800102func ParseSince(s string) (*time.Time, error) {
103 if strings.EqualFold(s, "now") {
104 since := time.Now()
105 return &since, nil
106 }
107
108 rfc3339Time, err := time.Parse(time.RFC3339, s)
109 if err == nil {
110 return &rfc3339Time, nil
111 }
112
113 duration, err := time.ParseDuration(s)
114 if err == nil {
115 since := time.Now().Add(-duration)
116 return &since, nil
117 }
118
119 return nil, fmt.Errorf("Unable to parse time specification `%s`. Please use either `now`, a duration, or an RFC3339-compliant string", s)
120}
121
Scott Baker2fe436a2020-02-10 17:21:47 -0800122// Convert a timestamp field in an event to a time.Time
123func DecodeTimestamp(tsIntf interface{}) (time.Time, error) {
124 ts, okay := tsIntf.(*timestamp.Timestamp)
125 if okay {
126 // Voltha-Protos 3.2.3 and above
127 result, err := ptypes.Timestamp(ts)
128 return result, err
129 }
130 tsFloat, okay := tsIntf.(float32)
131 if okay {
132 // Voltha-Protos 3.2.2 and below
133 return time.Unix(int64(tsFloat), 0), nil
134 }
Scott Bakera1e53fa2020-04-01 11:13:34 -0700135 tsInt64, okay := tsIntf.(int64)
136 if okay {
137 if tsInt64 > 10000000000000 {
138 // sometimes it's in nanoseconds
139 return time.Unix(tsInt64/1000000000, tsInt64%1000000000), nil
140 } else {
141 // sometimes it's in seconds
142 return time.Unix(tsInt64/1000, 0), nil
143 }
144 }
Scott Baker2fe436a2020-02-10 17:21:47 -0800145 return time.Time{}, errors.New("Failed to decode timestamp")
146}
147
Scott Bakered4efab2020-01-13 19:12:25 -0800148// Extract the header, as well as a few other items that might be of interest
Scott Baker9173ed82020-05-19 08:30:12 -0700149func DecodeHeader(b []byte, ts time.Time) (*EventHeader, error) {
150 m := &voltha.Event{}
151 if err := proto.Unmarshal(b, m); err != nil {
Scott Bakered4efab2020-01-13 19:12:25 -0800152 return nil, err
153 }
154
Scott Baker9173ed82020-05-19 08:30:12 -0700155 header := m.Header
156 cat := voltha.EventCategory_Types_name[int32(header.Category)]
157 subCat := voltha.EventSubCategory_Types_name[int32(header.SubCategory)]
158 evType := voltha.EventType_Types_name[int32(header.Type)]
159 raised, err := DecodeTimestamp(header.RaisedTs)
Scott Bakered4efab2020-01-13 19:12:25 -0800160 if err != nil {
161 return nil, err
162 }
Scott Baker9173ed82020-05-19 08:30:12 -0700163 reported, err := DecodeTimestamp(header.ReportedTs)
Scott Baker2fe436a2020-02-10 17:21:47 -0800164 if err != nil {
165 return nil, err
166 }
Scott Bakered4efab2020-01-13 19:12:25 -0800167
168 // Opportunistically try to extract the device_id and title from a kpi_event2
169 // note that there might actually be multiple_slice data, so there could
170 // be multiple device_id, multiple title, etc.
171 device_ids := make(map[string]interface{})
172 titles := make(map[string]interface{})
173
Scott Baker9173ed82020-05-19 08:30:12 -0700174 kpi := m.GetKpiEvent2()
175 if kpi != nil {
176 sliceList := kpi.SliceData
177 if len(sliceList) > 0 {
178 slice := sliceList[0]
179 if slice != nil {
180 metadata := slice.Metadata
181 deviceId := metadata.DeviceId
182 device_ids[deviceId] = slice
183
184 title := metadata.Title
185 titles[title] = slice
Scott Bakered4efab2020-01-13 19:12:25 -0800186 }
187 }
188 }
189
190 // Opportunistically try to pull a resource_id and title from a DeviceEvent
191 // There can only be one resource_id and title from a DeviceEvent, so it's easier
192 // than dealing with KPI_EVENT2.
Scott Baker9173ed82020-05-19 08:30:12 -0700193 deviceEvent := m.GetDeviceEvent()
194 if deviceEvent != nil {
195 titles[deviceEvent.DeviceEventName] = deviceEvent
196 device_ids[deviceEvent.ResourceId] = deviceEvent
Scott Bakered4efab2020-01-13 19:12:25 -0800197 }
198
199 device_id_keys := make([]string, len(device_ids))
200 i := 0
David K. Bainbridge2b627612020-02-18 14:50:13 -0800201 for k := range device_ids {
Scott Bakered4efab2020-01-13 19:12:25 -0800202 device_id_keys[i] = k
203 i++
204 }
205
206 title_keys := make([]string, len(titles))
207 i = 0
David K. Bainbridge2b627612020-02-18 14:50:13 -0800208 for k := range titles {
Scott Bakered4efab2020-01-13 19:12:25 -0800209 title_keys[i] = k
210 i++
211 }
212
Scott Baker9173ed82020-05-19 08:30:12 -0700213 evHeader := EventHeader{Category: cat,
214 SubCategory: subCat,
215 Type: evType,
Scott Bakered4efab2020-01-13 19:12:25 -0800216 Raised_ts: raised,
217 Reported_ts: reported,
218 Device_ids: device_id_keys,
Scott Baker2fe436a2020-02-10 17:21:47 -0800219 Timestamp: ts,
Scott Bakered4efab2020-01-13 19:12:25 -0800220 Titles: title_keys}
221
222 return &evHeader, nil
223}
224
225// Print the full message, either in JSON or in GRPCURL-human-readable format,
226// depending on which grpcurl formatter is passed in.
Scott Baker9173ed82020-05-19 08:30:12 -0700227func PrintMessage(outputAs string, b []byte) error {
228 ms := &voltha.Event{}
229 if err := proto.Unmarshal(b, ms); err != nil {
Scott Bakered4efab2020-01-13 19:12:25 -0800230 return err
231 }
Scott Baker9173ed82020-05-19 08:30:12 -0700232
233 if outputAs == "json" {
234 marshaler := jsonpb.Marshaler{EmitDefaults: true, AnyResolver: &VolthaAnyResolver{}}
235 asJson, err := marshaler.MarshalToString(ms)
236 if err != nil {
237 return fmt.Errorf("Failed to marshal the json: %s", err)
238 }
239 fmt.Println(asJson)
240 } else {
241 // print in golang native format
242 fmt.Printf("%v\n", ms)
Scott Bakered4efab2020-01-13 19:12:25 -0800243 }
Scott Baker9173ed82020-05-19 08:30:12 -0700244
Scott Bakered4efab2020-01-13 19:12:25 -0800245 return nil
246}
247
248// Print just the enriched EventHeader. This is either in JSON format, or in
249// table format.
250func PrintEventHeader(outputAs string, outputFormat string, hdr *EventHeader) error {
251 if outputAs == "json" {
252 asJson, err := json.Marshal(hdr)
253 if err != nil {
254 return fmt.Errorf("Error marshalling JSON: %v", err)
255 } else {
256 fmt.Printf("%s\n", asJson)
257 }
258 } else {
259 f := format.Format(outputFormat)
260 output, err := f.ExecuteFixedWidth(DefaultWidths, false, *hdr)
261 if err != nil {
262 return err
263 }
264 fmt.Printf("%s\n", output)
265 }
266 return nil
267}
268
Scott Bakered4efab2020-01-13 19:12:25 -0800269// Start output, print any column headers or other start characters
270func (options *EventListenOpts) StartOutput(outputFormat string) error {
271 if options.OutputAs == "json" {
272 fmt.Println("[")
273 } else if (options.OutputAs == "table") && !options.ShowBody {
274 f := format.Format(outputFormat)
275 output, err := f.ExecuteFixedWidth(DefaultWidths, true, nil)
276 if err != nil {
277 return err
278 }
279 fmt.Println(output)
280 }
281 return nil
282}
283
284// Finish output, print any column footers or other end characters
285func (options *EventListenOpts) FinishOutput() {
286 if options.OutputAs == "json" {
287 fmt.Println("]")
288 }
289}
290
291func (options *EventListenOpts) Execute(args []string) error {
292 ProcessGlobalOptions()
David K. Bainbridge9189c632021-03-26 21:52:21 +0000293 if GlobalConfig.Current().Kafka == "" {
Scott Bakered4efab2020-01-13 19:12:25 -0800294 return errors.New("Kafka address is not specified")
295 }
296
Scott Bakered4efab2020-01-13 19:12:25 -0800297 config := sarama.NewConfig()
298 config.ClientID = "go-kafka-consumer"
299 config.Consumer.Return.Errors = true
300 config.Version = sarama.V1_0_0_0
David K. Bainbridge9189c632021-03-26 21:52:21 +0000301 brokers := []string{GlobalConfig.Current().Kafka}
Scott Bakered4efab2020-01-13 19:12:25 -0800302
303 client, err := sarama.NewClient(brokers, config)
304 if err != nil {
305 return err
306 }
307
308 defer func() {
309 if err := client.Close(); err != nil {
310 panic(err)
311 }
312 }()
313
314 consumer, consumerErrors, highwaterMarks, err := startConsumer([]string{"voltha.events"}, client)
315 if err != nil {
316 return err
317 }
318
319 highwater := highwaterMarks["voltha.events"]
320
321 signals := make(chan os.Signal, 1)
322 signal.Notify(signals, os.Interrupt)
323
324 // Count how many message processed
325 consumeCount := 0
326
327 // Count how many messages were printed
328 count := 0
329
Scott Bakered4efab2020-01-13 19:12:25 -0800330 var headerFilter *filter.Filter
331 if options.Filter != "" {
332 headerFilterVal, err := filter.Parse(options.Filter)
333 if err != nil {
334 return fmt.Errorf("Failed to parse filter: %v", err)
335 }
336 headerFilter = &headerFilterVal
337 }
338
339 outputFormat := CharReplacer.Replace(options.Format)
340 if outputFormat == "" {
341 outputFormat = GetCommandOptionWithDefault("events-listen", "format", DEFAULT_EVENT_FORMAT)
342 }
343
344 err = options.StartOutput(outputFormat)
345 if err != nil {
346 return err
347 }
348
Scott Bakerf05e60a2020-02-02 21:53:57 -0800349 var since *time.Time
350 if options.Since != "" {
351 since, err = ParseSince(options.Since)
352 if err != nil {
353 return err
354 }
355 }
356
Scott Bakered4efab2020-01-13 19:12:25 -0800357 // Get signnal for finish
358 doneCh := make(chan struct{})
359 go func() {
Scott Baker2fe436a2020-02-10 17:21:47 -0800360 tStart := time.Now()
Scott Bakered4efab2020-01-13 19:12:25 -0800361 Loop:
362 for {
363 // Initialize the idle timeout timer
364 timeoutTimer := time.NewTimer(time.Duration(options.Timeout) * time.Second)
365 select {
366 case msg := <-consumer:
367 consumeCount++
Scott Baker9173ed82020-05-19 08:30:12 -0700368 hdr, err := DecodeHeader(msg.Value, msg.Timestamp)
Scott Bakered4efab2020-01-13 19:12:25 -0800369 if err != nil {
370 log.Printf("Error decoding header %v\n", err)
371 continue
372 }
Scott Baker9a2d9a42020-06-09 18:11:26 -0700373
374 match := false
375 if headerFilter != nil {
376 var err error
377 if match, err = headerFilter.Evaluate(*hdr); err != nil {
378 log.Printf("%v\n", err)
379 }
380 } else {
381 match = true
382 }
383
384 if !match {
Scott Bakered4efab2020-01-13 19:12:25 -0800385 // skip printing message
Scott Baker2fe436a2020-02-10 17:21:47 -0800386 } else if since != nil && hdr.Timestamp.Before(*since) {
Scott Bakerf05e60a2020-02-02 21:53:57 -0800387 // it's too old
Scott Bakered4efab2020-01-13 19:12:25 -0800388 } else {
389 // comma separated between this message and predecessor
390 if count > 0 {
391 if options.OutputAs == "json" {
392 fmt.Println(",")
393 }
394 }
395
396 // Print it
397 if options.ShowBody {
Scott Baker9173ed82020-05-19 08:30:12 -0700398 if err := PrintMessage(options.OutputAs, msg.Value); err != nil {
David K. Bainbridge2b627612020-02-18 14:50:13 -0800399 log.Printf("%v\n", err)
400 }
Scott Bakered4efab2020-01-13 19:12:25 -0800401 } else {
David K. Bainbridge2b627612020-02-18 14:50:13 -0800402 if err := PrintEventHeader(options.OutputAs, outputFormat, hdr); err != nil {
Scott Bakered4efab2020-01-13 19:12:25 -0800403 log.Printf("%v\n", err)
404 }
405 }
406
407 // Check to see if we've hit the "count" threshold the user specified
408 count++
409 if (options.Count > 0) && (count >= options.Count) {
410 log.Println("Count reached")
411 doneCh <- struct{}{}
412 break Loop
413 }
414
415 // Check to see if we've hit the "now" threshold the user specified
Scott Baker2fe436a2020-02-10 17:21:47 -0800416 if (options.Now) && (!hdr.Timestamp.Before(tStart)) {
Scott Bakered4efab2020-01-13 19:12:25 -0800417 log.Println("Now timestamp reached")
418 doneCh <- struct{}{}
419 break Loop
420 }
421 }
422
423 // If we're not in follow mode, see if we hit the highwater mark
424 if !options.Follow && !options.Now && (msg.Offset >= highwater) {
425 log.Println("High water reached")
426 doneCh <- struct{}{}
427 break Loop
428 }
429
430 // Reset the timeout timer
431 if !timeoutTimer.Stop() {
432 <-timeoutTimer.C
433 }
434 case consumerError := <-consumerErrors:
435 log.Printf("Received consumerError topic=%v, partition=%v, err=%v\n", string(consumerError.Topic), string(consumerError.Partition), consumerError.Err)
436 doneCh <- struct{}{}
437 case <-signals:
438 doneCh <- struct{}{}
439 case <-timeoutTimer.C:
440 log.Printf("Idle timeout\n")
441 doneCh <- struct{}{}
442 }
443 }
444 }()
445
446 <-doneCh
447
448 options.FinishOutput()
449
450 log.Printf("Consumed %d messages. Printed %d messages", consumeCount, count)
451
452 return nil
453}
454
455// Consume message from Sarama and send them out on a channel.
456// Supports multiple topics.
457// Taken from Sarama example consumer.
458func startConsumer(topics []string, client sarama.Client) (chan *sarama.ConsumerMessage, chan *sarama.ConsumerError, map[string]int64, error) {
459 master, err := sarama.NewConsumerFromClient(client)
460 if err != nil {
461 return nil, nil, nil, err
462 }
463
464 consumers := make(chan *sarama.ConsumerMessage)
465 errors := make(chan *sarama.ConsumerError)
466 highwater := make(map[string]int64)
467 for _, topic := range topics {
468 if strings.Contains(topic, "__consumer_offsets") {
469 continue
470 }
471 partitions, _ := master.Partitions(topic)
472
473 // TODO: Add support for multiple partitions
474 if len(partitions) > 1 {
475 log.Printf("WARNING: %d partitions on topic %s but we only listen to the first one\n", len(partitions), topic)
476 }
477
478 hw, err := client.GetOffset("voltha.events", partitions[0], sarama.OffsetNewest)
479 if err != nil {
480 return nil, nil, nil, fmt.Errorf("Error in consume() getting highwater: Topic %v Partitions: %v", topic, partitions)
481 }
482 highwater[topic] = hw
483
484 consumer, err := master.ConsumePartition(topic, partitions[0], sarama.OffsetOldest)
485 if nil != err {
486 return nil, nil, nil, fmt.Errorf("Error in consume(): Topic %v Partitions: %v", topic, partitions)
487 }
488 log.Println(" Start consuming topic ", topic)
489 go func(topic string, consumer sarama.PartitionConsumer) {
490 for {
491 select {
492 case consumerError := <-consumer.Errors():
493 errors <- consumerError
494
495 case msg := <-consumer.Messages():
496 consumers <- msg
497 }
498 }
499 }(topic, consumer)
500 }
501
502 return consumers, errors, highwater, nil
503}