blob: 2684dda242e022e283a1d002bdb785ab32e35526 [file] [log] [blame]
Matteo Scandolo9a2772a2018-11-19 14:56:26 -08001package logrus_kafka_hook
2
3import (
4 "errors"
5 "github.com/Shopify/sarama"
6 "github.com/sirupsen/logrus"
7 "log"
8 "time"
9)
10
11type KafkaHook struct {
12 // Id of the hook
13 id string
14
15 // Log levels allowed
16 levels []logrus.Level
17
18 // Log entry formatter
19 formatter logrus.Formatter
20
21 // sarama.AsyncProducer
22 producer sarama.AsyncProducer
23}
24
25// Create a new KafkaHook.
26func NewKafkaHook(id string, levels []logrus.Level, formatter logrus.Formatter, brokers []string) (*KafkaHook, error) {
27 kafkaConfig := sarama.NewConfig()
28 kafkaConfig.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
29 kafkaConfig.Producer.Compression = sarama.CompressionSnappy // Compress messages
30 kafkaConfig.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
31
32 producer, err := sarama.NewAsyncProducer(brokers, kafkaConfig)
33
34 if err != nil {
35 return nil, err
36 }
37
38 // We will just log to STDOUT if we're not able to produce messages.
39 // Note: messages will only be returned here after all retry attempts are exhausted.
40 go func() {
41 for err := range producer.Errors() {
42 log.Printf("Failed to send log entry to kafka: %v\n", err)
43 }
44 }()
45
46 hook := &KafkaHook{
47 id,
48 levels,
49 formatter,
50 producer,
51 }
52
53 return hook, nil
54}
55
56func (hook *KafkaHook) Id() string {
57 return hook.id
58}
59
60func (hook *KafkaHook) Levels() []logrus.Level {
61 return hook.levels
62}
63
64func (hook *KafkaHook) Fire(entry *logrus.Entry) error {
65 // Check time for partition key
66 var partitionKey sarama.ByteEncoder
67
68 // Get field time
69 t, _ := entry.Data["time"].(time.Time)
70
71 // Convert it to bytes
72 b, err := t.MarshalBinary()
73
74 if err != nil {
75 return err
76 }
77
78 partitionKey = sarama.ByteEncoder(b)
79
80 // Check topics
81 var topics []string
82
83 if ts, ok := entry.Data["topics"]; ok {
84 if topics, ok = ts.([]string); !ok {
85 return errors.New("Field topics must be []string")
86 }
87 } else {
88 return errors.New("Field topics not found")
89 }
90
91 // Format before writing
92 b, err = hook.formatter.Format(entry)
93
94 if err != nil {
95 return err
96 }
97
98 value := sarama.ByteEncoder(b)
99
100 for _, topic := range topics {
101 hook.producer.Input() <- &sarama.ProducerMessage{
102 Key: partitionKey,
103 Topic: topic,
104 Value: value,
105 }
106 }
107
108 return nil
109}