#!/usr/bin/env python

# Copyright 2018-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# kafkaloghandler - logging handler that sends to Kafka

import confluent_kafka
import json
import logging
import sys
from datetime import datetime

class KafkaLogHandler(logging.Handler):
    """
    Logging handler that publishes LogRecords to a Kafka topic as JSON.

    Records are converted to a flat dict (nested dicts/lists are optionally
    flattened into dotted keys), serialized to compact JSON, and produced to
    Kafka.  The producer is created lazily on the first emit().
    """

    def __init__(self,
                 bootstrap_servers=None,  # default: ["localhost:9092"]
                 key="klh",  # kafka default key
                 topic="kafkaloghandler",  # kafka default topic
                 extra_config=None,  # extra producer configuration
                 timeout=10.0,  # kafka connection timeout (used by flush)
                 flatten=5,  # maximum depth of dict flattening
                 separator=".",  # separator used when flattening
                 blacklist=None,  # keys excluded; default: ["_logger", "_name"]
                 ):

        logging.Handler.__init__(self)

        # BUGFIX: the previous defaults were mutable ({} / [...]) and
        # producer_config aliased extra_config, so update() mutated the
        # shared default dict (and any caller-supplied dict) across
        # instances.  Use None sentinels and copy instead.
        if bootstrap_servers is None:
            bootstrap_servers = ["localhost:9092"]
        if blacklist is None:
            blacklist = ["_logger", "_name"]

        # Build the configuration for the kafka producer (copy, don't alias)
        self.producer_config = dict(extra_config) if extra_config else {}
        self.producer_config.update({
            'bootstrap.servers': ','.join(bootstrap_servers),
        })

        self.topic = topic
        self.key = key
        self.flatten = flatten
        self.separator = separator
        self.blacklist = blacklist
        self.timeout = timeout
        self.producer = None  # created lazily by _connect()

    def _connect(self):
        """Create the Kafka producer; exits the process on failure."""
        try:
            self.producer = confluent_kafka.Producer(**self.producer_config)

        except confluent_kafka.KafkaError as e:
            print("Kafka Error: %s" % e)
            # die if there's an error
            sys.exit(1)

    def _flatten(self, ns, toflatten, maxdepth):
        """
        flatten dicts creating a key.subkey.subsubkey... hierarchy
        flatten lists creating a <index>.subkey... hierarchy
        """

        # if max depth reached, return k:v dict
        if maxdepth < 1:
            return {ns: toflatten}

        flattened = {}

        # turn dict into tuples, enumerate lists
        if isinstance(toflatten, list):
            tf = enumerate(toflatten)
        else:
            tf = toflatten.items()

        for k, v in tf:

            prefix = "%s%s%s" % (ns, self.separator, k)

            if isinstance(v, (dict, list)):
                flattened.update(self._flatten(prefix, v, maxdepth - 1))
            else:
                flattened[prefix] = v

        return flattened

    def emit(self, record):
        """Serialize a LogRecord to JSON and produce it to the Kafka topic."""

        # make a dict from LogRecord
        rec = vars(record)

        recvars = {}

        message_key = self.key

        # structlog puts all arguments under a 'msg' dict, whereas
        # with normal logging 'msg' is a string. If 'msg' is a dict,
        # merge it with 'rec', and remove it.
        if 'msg' in rec and isinstance(rec['msg'], dict):
            rec.update(rec['msg'])
            del rec['msg']

        # fixup any structured arguments
        for k, v in rec.items():

            # remove any items with keys in blacklist
            if k in self.blacklist:
                continue

            # conform vars to be closer to logstash format

            # BUGFIX: compare strings with '==', not 'is' - identity of
            # equal strings is an interning implementation detail.

            # 'created' is naive (no timezone) time, per:
            # https://github.com/python/cpython/blob/2.7/Lib/logging/__init__.py#L242
            if k == 'created':
                recvars['@timestamp'] = \
                    datetime.utcfromtimestamp(v).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
                continue

            # thread is an int in Python, but a string in others (Java), so rename
            if k == 'thread':
                recvars['threadId'] = v
                continue

            # 'message' is used more than 'msg' (standard) or 'event' (structlog)
            if k in ['msg', 'event']:
                recvars['message'] = v
                continue

            # if a 'key' is found, use as the kafka key and remove
            if k == 'key':
                message_key = v
                continue

            # flatten any sub-dicts/lists down, if enabled
            # BUGFIX: parenthesized - the old '(a and b) or c' grouping
            # applied _flatten to lists even when flattening was disabled
            if self.flatten and isinstance(v, (dict, list)):
                recvars.update(self._flatten(k, v, self.flatten))
                continue

            # pass remaining variables unchanged
            recvars[k] = v

        # Replace unserializable items with repr version.
        # Otherwise, the log message may be discarded if it contains any
        # unserializable fields
        json_recvars = json.dumps(
            recvars,
            separators=(',', ':'),
            default=lambda o: repr(o),
        )

        if self.producer is None:
            self._connect()

        try:
            self.producer.produce(self.topic, json_recvars, message_key)

            # recommended by https://github.com/confluentinc/confluent-kafka-python/issues/16
            self.producer.poll(0)

        except confluent_kafka.KafkaError as e:
            # currently don't do anything on failure...
            print("Kafka Error: %s" % e)

    def flush(self):
        """Wait up to self.timeout seconds for queued messages to deliver."""

        if self.producer:
            self.producer.flush(self.timeout)