[SEBA-356] Sadis-server logs to kafka
Change-Id: Ia1473621e5f8c12818fff6d4dffe708f3cc146cd
diff --git a/vendor/github.com/gfremex/logrus-kafka-hook/kafka-hook.go b/vendor/github.com/gfremex/logrus-kafka-hook/kafka-hook.go
new file mode 100644
index 0000000..2684dda
--- /dev/null
+++ b/vendor/github.com/gfremex/logrus-kafka-hook/kafka-hook.go
@@ -0,0 +1,109 @@
+package logrus_kafka_hook
+
+import (
+ "errors"
+ "github.com/Shopify/sarama"
+ "github.com/sirupsen/logrus"
+ "log"
+ "time"
+)
+
+type KafkaHook struct {
+ // Id of the hook
+ id string
+
+ // Log levels allowed
+ levels []logrus.Level
+
+ // Log entry formatter
+ formatter logrus.Formatter
+
+ // sarama.AsyncProducer
+ producer sarama.AsyncProducer
+}
+
+// Create a new KafkaHook.
+func NewKafkaHook(id string, levels []logrus.Level, formatter logrus.Formatter, brokers []string) (*KafkaHook, error) {
+ kafkaConfig := sarama.NewConfig()
+ kafkaConfig.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
+ kafkaConfig.Producer.Compression = sarama.CompressionSnappy // Compress messages
+ kafkaConfig.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
+
+ producer, err := sarama.NewAsyncProducer(brokers, kafkaConfig)
+
+ if err != nil {
+ return nil, err
+ }
+
+ // We will just log to STDOUT if we're not able to produce messages.
+ // Note: messages will only be returned here after all retry attempts are exhausted.
+ go func() {
+ for err := range producer.Errors() {
+ log.Printf("Failed to send log entry to kafka: %v\n", err)
+ }
+ }()
+
+ hook := &KafkaHook{
+ id,
+ levels,
+ formatter,
+ producer,
+ }
+
+ return hook, nil
+}
+
+func (hook *KafkaHook) Id() string {
+ return hook.id
+}
+
+func (hook *KafkaHook) Levels() []logrus.Level {
+ return hook.levels
+}
+
+func (hook *KafkaHook) Fire(entry *logrus.Entry) error {
+ // Check time for partition key
+ var partitionKey sarama.ByteEncoder
+
+ // Get field time
+ t, _ := entry.Data["time"].(time.Time)
+
+ // Convert it to bytes
+ b, err := t.MarshalBinary()
+
+ if err != nil {
+ return err
+ }
+
+ partitionKey = sarama.ByteEncoder(b)
+
+ // Check topics
+ var topics []string
+
+ if ts, ok := entry.Data["topics"]; ok {
+ if topics, ok = ts.([]string); !ok {
+ return errors.New("Field topics must be []string")
+ }
+ } else {
+ return errors.New("Field topics not found")
+ }
+
+ // Format before writing
+ b, err = hook.formatter.Format(entry)
+
+ if err != nil {
+ return err
+ }
+
+ value := sarama.ByteEncoder(b)
+
+ for _, topic := range topics {
+ hook.producer.Input() <- &sarama.ProducerMessage{
+ Key: partitionKey,
+ Topic: topic,
+ Value: value,
+ }
+ }
+
+ return nil
+}