Make kafkaloghandler standalone, add unit tests
Change-Id: I36e8c5cc33459e8243d3aaac3fefc2928b663d36
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..2780493
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,7 @@
+*.pyc
+dist
+*.egg-info
+MANIFEST
+.coverage
+coverage.xml
+.git*
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..9561fb1
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1 @@
+include README.rst
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..a52a120
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,91 @@
+KafkaLogHandler
+===============
+
+Provides a Python ``logging``-compatible handler for producing messages to a
+Kafka message bus.
+
+Depends on the ``confluent_kafka`` module to connect to Kafka.
+
+Designed to support structured logging, this handler serializes log data as
+JSON when publishing it as a Kafka message.
+
+Usage
+=====
+
+**Example:**
+
+::
+
+ import logging
+
+ from kafkaloghandler import KafkaLogHandler
+
+ log = logging.getLogger()
+
+ klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"], topic="testtopic")
+
+ log.addHandler(klh)
+
+ data = {'example': 'structured data'}
+
+ log.info('message to send to kafka', extra=data)
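+
+ # flush the handler to deliver any queued messages before exiting
+ # (logging.shutdown() will also flush all registered handlers)
+ klh.flush()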
+
+
+**Parameters that can be provided to KafkaLogHandler:**
+
+*bootstrap_servers*
+ List of Kafka bootstrap servers to connect to. See confluent_kafka docs.
+
+ **default:** ``["localhost:9092"]``
+
+*timeout*
+ Timeout in seconds for flushing producer queue. See confluent_kafka docs.
+
+ **default:** ``10.0``
+
+*topic*
+ String that sets the topic in Kafka.
+
+ **default:** ``"kafkaloghandler"``
+
+*key*
+ String that sets the default message key in Kafka, which can be used for
+ summarization within Kafka.
+
+ NOTE: This default key can be overridden on a per-message basis by passing a
+ dict to the logger with ``{"key": "new_key_for_this_message"}`` in it.
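+
+ For example, mirroring the unit tests, a message-specific key can be set
+ through the ``extra`` dict::
+
+   log.info('this message uses its own key', extra={'key': 'override'})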
+
+ **default:** ``"klh"``
+
+*flatten*
+ Flattens nested dictionaries passed as structured log data into the parent
+ dictionary, up to the given depth.
+
+ This is useful when logging to external systems that don't have good support
+ for hierarchical data.
+
+ Example: ``{'a': {'b': 'c'}}`` would be flattened to ``{'a.b': 'c'}``
+
+ If the depth is exceeded, any remaining deeper dict will be added to the
+ output under the flattened key.
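+
+ For example, with ``flatten=1``, ``{'l1': {'l2': {'l3': 'nested'}}}``
+ flattens to ``{'l1.l2': {'l3': 'nested'}}``, as exercised by the unit
+ tests.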
+
+ Set to ``0`` to turn off flattening.
+
+ **default:** ``5``
+
+*separator*
+ Separator used between keys when flattening.
+
+ **default:** ``.``
+
+*blacklist*
+ List of top-level keys to discard from structured logs when outputting JSON.
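+
+ For example, as in the unit tests, passing ``blacklist=["bar"]`` drops the
+ top-level key ``bar`` from every message.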
+
+ **default:** ``['_logger']``
+
+
+Tests
+=====
+
+Unit tests can be run with::
+
+ nose2 --verbose --coverage-report term
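+
+(assumes the ``nose2`` and ``mock`` packages are installed in the test
+environment)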
diff --git a/VERSION b/VERSION
new file mode 100644
index 0000000..728d00f
--- /dev/null
+++ b/VERSION
@@ -0,0 +1 @@
+0.8.0-dev0
diff --git a/kafkaloghandler/__init__.py b/kafkaloghandler/__init__.py
new file mode 100644
index 0000000..258450c
--- /dev/null
+++ b/kafkaloghandler/__init__.py
@@ -0,0 +1,17 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .kafkaloghandler import KafkaLogHandler
+
+__all__ = ['KafkaLogHandler']
diff --git a/kafkaloghandler/kafkaloghandler.py b/kafkaloghandler/kafkaloghandler.py
new file mode 100644
index 0000000..0a09f4f
--- /dev/null
+++ b/kafkaloghandler/kafkaloghandler.py
@@ -0,0 +1,129 @@
+#!/usr/bin/env python
+
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# kafkaloghandler - logging handler that sends to Kafka
+
+import json
+import confluent_kafka
+import logging
+import sys
+
+
+class KafkaLogHandler(logging.Handler):
+
+ def __init__(self,
+ bootstrap_servers=["localhost:9092"],
+ key="klh", # kafka default key
+ topic="kafkaloghandler", # kafka default topic
+ timeout=10.0, # kafka connection timeout
+ flatten=5, # maximum depth of dict flattening
+ separator=".", # separator used when flattening
+ blacklist=["_logger"], # keys excluded from messages
+ ):
+
+ logging.Handler.__init__(self)
+
+ self.bootstrap_servers = bootstrap_servers
+ self.topic = topic
+ self.key = key
+ self.flatten = flatten
+ self.separator = separator
+ self.blacklist = blacklist
+ self.timeout = timeout
+ self.producer = None
+
+ def _connect(self):
+
+ try:
+ producer_config = {
+ 'bootstrap.servers': ','.join(self.bootstrap_servers),
+ }
+
+ self.producer = confluent_kafka.Producer(**producer_config)
+
+        except confluent_kafka.KafkaError as e:
+            print("Kafka Error: %s" % e)
+            # die if there's an error connecting to Kafka
+            sys.exit(1)
+
+ def _flatten(self, ns, toflatten, maxdepth):
+ """ flatten dicts creating a key.subkey.subsubkey... hierarchy """
+
+ # if max depth reached, return k:v dict
+ if maxdepth < 1:
+ return {ns: toflatten}
+
+ flattened = {}
+
+        for k, v in toflatten.items():
+
+ prefix = "%s%s%s" % (ns, self.separator, k)
+
+ if isinstance(v, dict):
+ flattened.update(self._flatten(prefix, v, maxdepth-1))
+ else:
+ flattened[prefix] = v
+
+ return flattened
+
+ def emit(self, record):
+
+ recvars = {}
+
+ message_key = self.key
+
+ # fixup any structured arguments
+        for k, v in vars(record).items():
+ # remove any items with keys in blacklist
+ if k in self.blacklist:
+ continue
+
+ # if a "key" is found, use as the kafka key and remove
+ if k is 'key':
+ message_key = v
+ continue
+
+ # flatten any sub-dicts down, if enabled
+ if self.flatten and isinstance(v, dict):
+ recvars.update(self._flatten(k, v, self.flatten))
+ continue
+
+ recvars[k] = v
+
+ # Replace unserializable items with repr version.
+ # Otherwise, the log message may be discarded if it contains any
+ # unserializable fields
+ json_recvars = json.dumps(
+ recvars,
+ separators=(',', ':'),
+            default=repr,
+ )
+
+ if self.producer is None:
+ self._connect()
+
+ try:
+ self.producer.produce(self.topic, json_recvars, message_key)
+
+        except confluent_kafka.KafkaError as e:
+            print("Kafka Error: %s" % e)
+            # currently don't do anything else on failure...
+
+ def flush(self):
+
+ if self.producer:
+ self.producer.flush(self.timeout)
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..9a597b4
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1 @@
+confluent-kafka>=0.11.5
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..3b98a2a
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from setuptools import setup
+
+
+def readme():
+ with open('README.rst') as f:
+ return f.read()
+
+
+def version():
+ with open('VERSION') as f:
+        return f.read().strip()
+
+
+def parse_requirements(filename):
+    with open(filename) as f:
+        lineiter = (line.strip() for line in f)
+        return [line for line in lineiter
+                if line and not line.startswith("#")]
+
+
+setup(
+ name='kafkaloghandler',
+ version=version(),
+ description='Kafka Logging Handler',
+ long_description=readme(),
+ classifiers=[
+ 'Topic :: System :: Logging',
+ 'Topic :: Internet :: Log Analysis',
+ 'License :: OSI Approved :: Apache Software License',
+ ],
+ keywords='kafka logging',
+ url='https://gerrit.opencord.org/gitweb?p=kafkaloghandler.git',
+ author='Zack Williams',
+ author_email='zdw@opennetworking.org',
+ packages=['kafkaloghandler'],
+ license='Apache v2',
+ install_requires=parse_requirements('requirements.txt'),
+ include_package_data=True,
+ zip_safe=False,
+)
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..4a82628
--- /dev/null
+++ b/tests/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/tests/test_kafkaloghandler.py b/tests/test_kafkaloghandler.py
new file mode 100644
index 0000000..52f9feb
--- /dev/null
+++ b/tests/test_kafkaloghandler.py
@@ -0,0 +1,227 @@
+#!/usr/bin/env python
+
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import logging
+import unittest
+
+from mock import patch
+from kafkaloghandler import KafkaLogHandler
+
+
+class FakeKafkaProducer():
+ '''
+ Works like Producer in confluent_kafka, ref:
+ https://docs.confluent.io/current/clients/confluent-kafka-python/#producer
+ '''
+    def __init__(self, config=None):
+        self.config = config or {}
+
+ def produce(self, topic, value='', key=''):
+ self.topic = topic
+ self.value = value
+ self.key = key
+
+ def flush(self, timeout=1):
+ self.flush_timeout = timeout
+
+
+class TestKafkaLogHandler(unittest.TestCase):
+
+ def setUp(self):
+ '''
+        Set up tests for KafkaLogHandler, mainly common logger initialization
+ '''
+ self.logger = logging.getLogger(__name__)
+ self.logger.handlers = []
+ self.logger.setLevel(logging.INFO)
+
+ def tearDown(self):
+ logging.shutdown()
+
+ def test_single_message(self):
+ '''
+        tests that emit is called once when there is one message
+ '''
+
+ with patch.object(KafkaLogHandler, 'emit') as emit:
+
+ klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
+ topic="testtopic")
+
+ self.logger.addHandler(klh)
+
+            self.logger.warning('Warning')
+
+            self.assertEqual(emit.call_count, 1)
+
+ def test_with_structure(self):
+ '''
+ tests structured serialization of log to JSON
+ '''
+
+ with patch.object(KafkaLogHandler, '_connect'):
+
+ klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
+ topic="testtopic")
+
+ klh.producer = FakeKafkaProducer()
+
+ self.logger.addHandler(klh)
+
+ extra_data = {
+ "foo": "value1",
+ "bar": "value2",
+ "l1": {"l2": {'l3': "nested"}},
+ }
+
+ self.logger.info('structured', extra=extra_data)
+
+ decoded_message = json.loads(klh.producer.value)
+
+ self.assertEqual(klh.producer.topic, 'testtopic')
+ self.assertEqual(decoded_message['msg'], 'structured')
+ self.assertEqual(decoded_message['foo'], 'value1')
+ self.assertEqual(decoded_message['bar'], 'value2')
+ self.assertEqual(decoded_message['l1.l2.l3'], 'nested')
+
+ def test_without_flatten(self):
+ '''
+ tests with flattening of objects disabled
+ '''
+
+ with patch.object(KafkaLogHandler, '_connect'):
+
+ klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
+ topic="testtopic",
+ flatten=0)
+
+ klh.producer = FakeKafkaProducer()
+
+ self.logger.addHandler(klh)
+
+ extra_data = {
+ "foo": "value1",
+ "l1": {"l2": {'l3': "nested"}},
+ }
+
+ self.logger.info('noflatten', extra=extra_data)
+
+ decoded_message = json.loads(klh.producer.value)
+
+ self.assertEqual(decoded_message['msg'], 'noflatten')
+ self.assertEqual(decoded_message['foo'], 'value1')
+ self.assertEqual(decoded_message['l1'], {'l2': {'l3': "nested"}})
+
+ def test_with_shallow_flatten(self):
+ '''
+ Tests with a shallow flattening of objects, and different separator
+ '''
+
+ with patch.object(KafkaLogHandler, '_connect'):
+
+ klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
+ topic="testtopic",
+ flatten=1,
+ separator='_')
+
+ klh.producer = FakeKafkaProducer()
+
+ self.logger.addHandler(klh)
+
+ extra_data = {
+ "foo": "value1",
+ "l1": {"l2": {'l3': "nested"}},
+ }
+
+ self.logger.info('oneflatten', extra=extra_data)
+
+ decoded_message = json.loads(klh.producer.value)
+
+ self.assertEqual(decoded_message['msg'], 'oneflatten')
+ self.assertEqual(decoded_message['foo'], 'value1')
+ self.assertEqual(decoded_message['l1_l2'], {'l3': 'nested'})
+
+ def test_override_key(self):
+ '''
+ Test setting the key argument to override the default
+ '''
+
+ with patch.object(KafkaLogHandler, '_connect'):
+
+ klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
+ topic="testtopic")
+
+ klh.producer = FakeKafkaProducer()
+
+ self.logger.addHandler(klh)
+
+ extra_data = {
+ "foo": "value1",
+ "l1": {"l2": {'l3': "nested"}},
+ }
+
+ # log with default 'klh' key
+ self.logger.info('defaultkey', extra=extra_data)
+
+ decoded_message1 = json.loads(klh.producer.value)
+
+ self.assertEqual(klh.producer.key, 'klh')
+ self.assertEqual(decoded_message1['foo'], 'value1')
+ self.assertEqual(decoded_message1['msg'], 'defaultkey')
+ self.assertEqual(decoded_message1['l1.l2.l3'], 'nested')
+
+ # log with key overridden
+ extra_data.update({'key': 'override'})
+ self.logger.info('keyoverride', extra=extra_data)
+
+ decoded_message2 = json.loads(klh.producer.value)
+
+ self.assertEqual(klh.producer.key, 'override')
+ self.assertEqual(decoded_message2['msg'], 'keyoverride')
+ self.assertEqual(decoded_message2['foo'], 'value1')
+ self.assertEqual(decoded_message2['l1.l2.l3'], 'nested')
+
+ def test_blacklist(self):
+ '''
+ tests adding items to blacklist
+ '''
+
+ with patch.object(KafkaLogHandler, '_connect'):
+
+ klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
+ topic="testtopic",
+ blacklist=["bar"])
+
+ klh.producer = FakeKafkaProducer()
+
+ self.logger.addHandler(klh)
+
+ extra_data = {
+ "foo": "value1",
+ "bar": "value2",
+ "l1": {"l2": {'l3': "nested"}},
+ }
+
+ self.logger.info('blacklist', extra=extra_data)
+
+ decoded_message = json.loads(klh.producer.value)
+
+ self.assertEqual(klh.producer.topic, 'testtopic')
+ self.assertEqual(decoded_message['msg'], 'blacklist')
+ self.assertEqual(decoded_message['foo'], 'value1')
+        self.assertNotIn('bar', decoded_message)
diff --git a/unittest.cfg b/unittest.cfg
new file mode 100644
index 0000000..67b89de
--- /dev/null
+++ b/unittest.cfg
@@ -0,0 +1,9 @@
+[unittest]
+plugins=nose2.plugins.junitxml
+code-directories=kafkaloghandler
+
+[coverage]
+always-on = True
+coverage = kafkaloghandler
+coverage-report = term
+                  xml