#!/usr/bin/env python

# Copyright 2018-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# kafkaloghandler - logging handler that sends to Kafka

import json
import logging
import sys

import confluent_kafka
24
class KafkaLogHandler(logging.Handler):
    """Publish python logging records to a Kafka topic as JSON messages.

    Each emitted record's attribute dict is serialized to JSON.  Nested
    dict values may be flattened into dotted "key.subkey" entries, and
    attributes named in the blacklist are dropped.  The Kafka producer is
    created lazily on the first emit(), not at handler construction.
    """

    def __init__(self,
                 bootstrap_servers=None,   # Kafka brokers, "host:port" strings
                 key="klh",                # kafka default key
                 topic="kafkaloghandler",  # kafka default topic
                 timeout=10.0,             # kafka connection timeout
                 flatten=5,                # maximum depth of dict flattening
                 separator=".",            # separator used when flattening
                 blacklist=None,           # keys excluded from messages
                 ):

        logging.Handler.__init__(self)

        # Default the list arguments here rather than in the signature so a
        # mutable default is not shared across handler instances.
        if bootstrap_servers is None:
            bootstrap_servers = ["localhost:9092"]
        if blacklist is None:
            blacklist = ["_logger"]

        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.key = key
        self.flatten = flatten
        self.separator = separator
        self.blacklist = blacklist
        self.timeout = timeout
        # created lazily by _connect() on the first emit()
        self.producer = None

    def _connect(self):
        """Create the Kafka producer, exiting the process on failure."""

        try:
            producer_config = {
                'bootstrap.servers': ','.join(self.bootstrap_servers),
            }

            self.producer = confluent_kafka.Producer(**producer_config)

        except confluent_kafka.KafkaError as e:
            # report on stderr, not stdout
            sys.stderr.write("Kafka Error: %s\n" % e)
            # die if there's an error
            sys.exit(1)

    def _flatten(self, ns, toflatten, maxdepth):
        """Flatten dicts creating a key.subkey.subsubkey... hierarchy.

        ns: namespace prefix applied to every flattened key
        toflatten: dict to flatten
        maxdepth: remaining recursion depth; once exhausted, the value is
                  stored unflattened under the accumulated prefix
        """

        # if max depth reached, return k:v dict
        if maxdepth < 1:
            return {ns: toflatten}

        flattened = {}

        for k, v in toflatten.items():

            prefix = "%s%s%s" % (ns, self.separator, k)

            if isinstance(v, dict):
                flattened.update(self._flatten(prefix, v, maxdepth - 1))
            else:
                flattened[prefix] = v

        return flattened

    def emit(self, record):
        """Serialize the log record's attributes to JSON and produce it."""

        recvars = {}

        message_key = self.key

        # fixup any structured arguments
        for k, v in vars(record).items():
            # remove any items with keys in blacklist
            if k in self.blacklist:
                continue

            # if a "key" is found, use as the kafka key and remove.
            # NOTE: must be an equality test - 'is' compares identity and
            # only matched string literals by CPython interning accident
            if k == 'key':
                message_key = v
                continue

            # flatten any sub-dicts down, if enabled
            if self.flatten and isinstance(v, dict):
                recvars.update(self._flatten(k, v, self.flatten))
                continue

            recvars[k] = v

        # Replace unserializable items with repr version.
        # Otherwise, the log message may be discarded if it contains any
        # unserializable fields
        json_recvars = json.dumps(
            recvars,
            separators=(',', ':'),
            default=lambda o: repr(o),
        )

        if self.producer is None:
            self._connect()

        try:
            self.producer.produce(self.topic, json_recvars, message_key)

        except confluent_kafka.KafkaError as e:
            # currently don't do anything on failure, beyond reporting it
            sys.stderr.write("Kafka Error: %s\n" % e)

    def flush(self):
        """Wait up to self.timeout for queued messages to be delivered."""

        if self.producer:
            self.producer.flush(self.timeout)