#!/usr/bin/env python3
# Copyright 2018-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# kafkaloghandler - logging handler that sends log records to Kafka

import json
import logging
import sys

import confluent_kafka


class KafkaLogHandler(logging.Handler):
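    """Logging handler that publishes log records to a Kafka topic.

    Each record's attributes are serialized to JSON; nested dict values
    are flattened into dotted keys, and a "key" attribute on the record,
    if present, is used as the Kafka message key.
    """
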
def __init__(self,
bootstrap_servers=["localhost:9092"],
key="klh", # kafka default key
topic="kafkaloghandler", # kafka default topic
timeout=10.0, # kafka connection timeout
flatten=5, # maximum depth of dict flattening
separator=".", # separator used when flattening
blacklist=["_logger"], # keys excluded from messages
):
        super().__init__()
self.bootstrap_servers = bootstrap_servers
self.topic = topic
self.key = key
self.flatten = flatten
self.separator = separator
self.blacklist = blacklist
self.timeout = timeout
self.producer = None

    def _connect(self):
        # Create the producer lazily on first emit. Config keys follow
        # librdkafka naming, so the server list is joined into a single
        # comma-separated "bootstrap.servers" string.
        try:
            producer_config = {
                'bootstrap.servers': ','.join(self.bootstrap_servers),
            }
            self.producer = confluent_kafka.Producer(producer_config)
        except confluent_kafka.KafkaException as e:
            print("Kafka Error: %s" % e)
            # die if there's an error
            sys.exit(1)

    def _flatten(self, ns, toflatten, maxdepth):
        """Flatten nested dicts into a key.subkey.subsubkey... hierarchy."""
        # if max depth reached, return the remaining dict under a single key
        if maxdepth < 1:
            return {ns: toflatten}
        flattened = {}
        for k, v in toflatten.items():
prefix = "%s%s%s" % (ns, self.separator, k)
if isinstance(v, dict):
flattened.update(self._flatten(prefix, v, maxdepth-1))
else:
flattened[prefix] = v
return flattened
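
    # Illustrative example of the flattening (a sketch, not executed code):
    # with the default "." separator,
    #   self._flatten("args", {"a": {"b": 1}, "c": 2}, 5)
    # returns {"args.a.b": 1, "args.c": 2}; once maxdepth is exhausted,
    # the remaining sub-dict is kept whole under the accumulated key.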

    def emit(self, record):
        recvars = {}
        message_key = self.key
        # fix up any structured arguments
        for k, v in vars(record).items():
# remove any items with keys in blacklist
if k in self.blacklist:
continue
# if a "key" is found, use as the kafka key and remove
            # if a "key" attribute is found, use it as the Kafka key and drop it
            if k == 'key':
                message_key = v
                continue
# flatten any sub-dicts down, if enabled
if self.flatten and isinstance(v, dict):
recvars.update(self._flatten(k, v, self.flatten))
continue
recvars[k] = v
        # Replace unserializable values with their repr() version, so the
        # whole log message isn't discarded just because one field can't
        # be JSON-encoded.
        json_recvars = json.dumps(
            recvars,
            separators=(',', ':'),
            default=repr,
        )
if self.producer is None:
self._connect()
        try:
            self.producer.produce(self.topic, json_recvars, message_key)
        except (confluent_kafka.KafkaException, BufferError) as e:
            # produce() raises BufferError when the local queue is full;
            # currently don't do anything else on failure...
            print("Kafka Error: %s" % e)

    def flush(self):
        # Deliver any outstanding messages, waiting up to the configured
        # timeout (in seconds).
        if self.producer:
            self.producer.flush(self.timeout)
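

if __name__ == "__main__":
    # Minimal usage sketch, assuming a Kafka broker is reachable at
    # localhost:9092 and the topic exists (or broker-side auto-creation
    # is enabled). The logger name and "context" field are illustrative,
    # not part of this module's API.
    logger = logging.getLogger("kafkaloghandler-demo")
    logger.setLevel(logging.INFO)

    handler = KafkaLogHandler(
        bootstrap_servers=["localhost:9092"],
        topic="kafkaloghandler",
    )
    logger.addHandler(handler)

    # Dict values passed via "extra" become attributes on the LogRecord
    # and are flattened into dotted keys by emit().
    logger.info("plain message")
    logger.info("structured message", extra={"context": {"user": "alice"}})

    # Block until queued messages are delivered (up to the timeout).
    handler.flush()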