#!/usr/bin/env python
# Copyright 2018-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# kafkaloghandler - logging handler that sends to Kafka
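# Usage: attach a KafkaLogHandler instance to a standard Python logger with
# logger.addHandler(); see the __main__ block at the bottom for an example.
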
import json
import logging
import sys
import time

import confluent_kafka


class KafkaLogHandler(logging.Handler):
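    """Logging handler that publishes log records to a Kafka topic as JSON messages."""
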
    def __init__(self,
                 bootstrap_servers=["localhost:9092"],
                 key="kh",  # kafka key
                 topic="kafkaloghandler",  # kafka topic
                 timeout=10.0,  # kafka connection timeout
                 flatten=3,  # maximum depth of dict flattening
                 blacklist=["_logger"],  # keys excluded from messages
                 ):
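        """
        bootstrap_servers: list of Kafka "host:port" strings to connect to
        key:       Kafka message key used for every published record
        topic:     Kafka topic the log messages are published to
        timeout:   timeout (seconds) used when flushing the producer
        flatten:   maximum depth of dict flattening (0 disables flattening)
        blacklist: log record attributes excluded from messages
        """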
        logging.Handler.__init__(self)
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.key = key
        self.flatten = flatten
        self.blacklist = blacklist
        self.timeout = timeout
        self.producer = None

    def _connect(self):
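        """Create the Kafka producer; exits the process if it cannot be created."""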
        try:
            producer_config = {
                'bootstrap.servers': ','.join(self.bootstrap_servers),
            }
            self.producer = confluent_kafka.Producer(**producer_config)
        except confluent_kafka.KafkaException as e:
            print("Kafka Error: %s" % e)
            # die if there's an error
            sys.exit(1)

    def _flatten(self, ns, toflatten, maxdepth):
        """ flatten dicts creating a key.subkey.subsubkey... hierarchy """
        # avoid recursively flattening forever
        if maxdepth < 1:
            return toflatten

        flattened = {}
        for k, v in toflatten.items():
            prefix = "%s.%s" % (ns, k)
            if isinstance(v, dict):
                flattened.update(self._flatten(prefix, v, maxdepth - 1))
            else:
                flattened[prefix] = v

        return flattened

    def emit(self, record):
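        """Serialize the log record to JSON and publish it to the Kafka topic."""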
        recvars = {}
        for k, v in vars(record).items():
            # skip items in blacklist
            if k in self.blacklist:
                continue

            # flatten any sub-dicts down
            if self.flatten and isinstance(v, dict):
                recvars.update(self._flatten(k, v, self.flatten))
                continue

            recvars[k] = v

        # Replace unserializable items with their repr() version; otherwise
        # the whole log message may be discarded if it contains any
        # unserializable fields.
        json_recvars = json.dumps(
            recvars,
            separators=(',', ':'),
            default=lambda o: repr(o),
        )

        if self.producer is None:
            self._connect()

        try:
            self.producer.produce(self.topic, json_recvars, self.key)
        except confluent_kafka.KafkaException as e:
            print("Kafka Error: %s" % e)
            # currently don't do anything on failure...
            pass

    def flush(self):
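        """Flush buffered messages to Kafka, waiting at most self.timeout seconds."""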
        if self.producer:
            self.producer.flush(self.timeout)


if __name__ == '__main__':
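    # Simple manual smoke test: publishes a few log messages to a broker at
    # test-kafka:9092, then keeps logging every 10 seconds.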
    logger = logging.getLogger(__name__)
    logger.handlers = []
    logger.setLevel(logging.INFO)

    kh = KafkaLogHandler(
        bootstrap_servers=["test-kafka:9092"],
        topic="testtopic",
    )

    logger.addHandler(kh)

    logger.error('Error message')

    extra_data = {
        "key1": "value1",
        "key2": "value2",
    }

    logger.info('Info message with extra data', extra=extra_data)

    index = 0
    while True:
        logger.info('Info message - loop count: %s' % index)
        index += 1
        time.sleep(10)