blob: 2684dda242e022e283a1d002bdb785ab32e35526 [file] [log] [blame]
package logrus_kafka_hook
import (
"errors"
"github.com/Shopify/sarama"
"github.com/sirupsen/logrus"
"log"
"time"
)
// KafkaHook bundles what is needed to forward logrus entries to Kafka: an
// identifier, the set of levels the hook reports, the formatter used to
// serialize entries, and the sarama async producer the messages are sent on.
type KafkaHook struct {
// id is the identifier of the hook; it is returned by Id().
id string
// levels lists the log levels the hook reports; it is returned by Levels().
levels []logrus.Level
// formatter serializes a log entry into the bytes used as the message value.
formatter logrus.Formatter
// producer is the sarama.AsyncProducer that formatted entries are queued on.
producer sarama.AsyncProducer
}
// NewKafkaHook creates a KafkaHook whose messages are sent through a new
// sarama async producer connected to brokers.
//
// The producer waits only for the leader's acknowledgement, compresses
// messages with Snappy and flushes batches every 500ms. If the producer cannot
// be created, the error from sarama is returned and the hook is nil.
func NewKafkaHook(id string, levels []logrus.Level, formatter logrus.Formatter, brokers []string) (*KafkaHook, error) {
	cfg := sarama.NewConfig()
	cfg.Producer.RequiredAcks = sarama.WaitForLocal        // leader ack only
	cfg.Producer.Compression = sarama.CompressionSnappy    // compress messages
	cfg.Producer.Flush.Frequency = 500 * time.Millisecond  // flush batches every 500ms

	asyncProducer, err := sarama.NewAsyncProducer(brokers, cfg)
	if err != nil {
		return nil, err
	}

	// Delivery failures are only reported through the standard logger.
	// Note: a message shows up here only after all retry attempts are exhausted.
	// The goroutine runs until the producer's error channel is closed.
	go func(failures <-chan *sarama.ProducerError) {
		for failure := range failures {
			log.Printf("Failed to send log entry to kafka: %v\n", failure)
		}
	}(asyncProducer.Errors())

	return &KafkaHook{
		id:        id,
		levels:    levels,
		formatter: formatter,
		producer:  asyncProducer,
	}, nil
}
// Id returns the identifier the hook was created with.
func (h *KafkaHook) Id() string {
	identifier := h.id
	return identifier
}
// Levels returns the log levels the hook was created with. The returned slice
// is the hook's own slice, not a copy.
func (h *KafkaHook) Levels() []logrus.Level {
	enabled := h.levels
	return enabled
}
// Fire formats entry with the hook's formatter and queues one message per
// topic on the async producer.
//
// The entry must carry a "topics" data field of type []string naming the
// destination topics. The message key is the binary encoding of a timestamp:
// the "time" data field when it holds a time.Time, otherwise the entry's own
// Time. Falling back to entry.Time avoids keying every entry that lacks a
// "time" field with the encoding of the zero time.
//
// An error is returned, and nothing is queued, when the timestamp cannot be
// marshaled, when "topics" is missing or has the wrong type, or when the
// formatter fails. Each send on the producer's input channel blocks until the
// producer accepts the message.
func (h *KafkaHook) Fire(entry *logrus.Entry) error {
	// Pick the timestamp for the partition key.
	stamp, ok := entry.Data["time"].(time.Time)
	if !ok {
		stamp = entry.Time
	}
	keyBytes, err := stamp.MarshalBinary()
	if err != nil {
		return err
	}
	partitionKey := sarama.ByteEncoder(keyBytes)

	// Resolve the destination topics.
	rawTopics, found := entry.Data["topics"]
	if !found {
		return errors.New("Field topics not found")
	}
	topics, ok := rawTopics.([]string)
	if !ok {
		return errors.New("Field topics must be []string")
	}

	// Format once; the same payload is sent to every topic.
	payload, err := h.formatter.Format(entry)
	if err != nil {
		return err
	}
	value := sarama.ByteEncoder(payload)

	for _, topic := range topics {
		h.producer.Input() <- &sarama.ProducerMessage{
			Key:   partitionKey,
			Topic: topic,
			Value: value,
		}
	}
	return nil
}