Make kafkaloghandler standalone, add unit tests

Change-Id: I36e8c5cc33459e8243d3aaac3fefc2928b663d36
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..2780493
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,7 @@
+*.pyc
+dist
+*.egg-info
+MANIFEST
+.coverage
+coverage.xml
+.git*
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..9561fb1
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1 @@
+include README.rst
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..a52a120
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,91 @@
+KafkaLogHandler
+===============
+
+Provides a python ``logging`` compatible handler for producing messages to a
+Kafka message bus.
+
+Depends on the ``confluent_kafka`` module to connect to Kafka.
+
+Designed to support structured logging, and serializes log data as JSON when
+published as a Kafka message.
+
+Usage
+=====
+
+**Example:**
+
+::
+
+  import logging
+
+  from kafkaloghandler import KafkaLogHandler
+
+  log = logging.getLogger()
+
+  klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"], topic="testtopic")
+
+  log.addHandler(klh)
+
+  data = {'example': 'structured data'}
+
+  log.info('message to send to kafka', extra=data)
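+
+  # flush() blocks until queued messages are delivered (or the timeout
+  # expires); logging.shutdown() at interpreter exit also flushes handlers
+  klh.flush()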
+
+
+**Parameters that can be provided to KafkaLogHandler:**
+
+*bootstrap_servers*
+  List of Kafka bootstrap servers to connect to. See confluent_kafka docs.
+
+  **default:** ``["localhost:9092"]``
+
+*timeout*
+  Timeout in seconds for flushing producer queue. See confluent_kafka docs.
+
+  **default:** ``10.0``
+
+*topic*
+  String that sets the topic in Kafka.
+
+  **default:** ``"kafkaloghandler"``
+
+*key*
+  String that sets the default message key in Kafka; the key can be used for
+  summarization within Kafka.
+
+  NOTE: This default key can be overridden on a per-message basis by
+  including ``{"key": "new_key_for_this_message"}`` in the dict passed to
+  the logger's ``extra`` parameter.
+
+  **default:** ``"klh"``
+
+*flatten*
+  Flattens nested dictionaries in structured log data into the parent
+  dictionary, up to the given number of levels.
+
+  This is useful when logging to external systems that don't have good support
+  for hierarchical data.
+
+  Example: ``{'a': {'b': 'c'}}`` would be flattened to ``{'a.b': 'c'}``. A
+  fuller example follows this parameter list.
+
+  If the depth is exceeded, any remaining deeper dict will be added to the
+  output under the flattened key.
+
+  Set to ``0`` to turn off flattening.
+
+  **default:** ``5``
+
+*separator*
+  Separator used between keys when flattening.
+
+  **default:** ``.``
+
+*blacklist*
+  List of top-level keys to discard from structured logs when outputting JSON.
+
+  **default:** ``['_logger']``
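+
+**Combined example:**
+
+A sketch of how these options interact, given a handler created as
+``KafkaLogHandler(flatten=1, separator='_', blacklist=['bar'])`` (the same
+behavior exercised by the unit tests in this repo)::
+
+  data = {
+      'foo': 'value1',
+      'bar': 'value2',                 # dropped: 'bar' is blacklisted
+      'l1': {'l2': {'l3': 'nested'}},
+      'key': 'custom_key',             # becomes the Kafka message key
+  }
+
+  log.info('example', extra=data)
+
+  # The JSON message then includes, among the standard LogRecord fields:
+  #   "msg": "example"
+  #   "foo": "value1"
+  #   "l1_l2": {"l3": "nested"}        # flattened one level only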
+
+
+Tests
+=====
+
+Unit tests can be run with::
+
+   nose2 --verbose --coverage-report term
diff --git a/VERSION b/VERSION
new file mode 100644
index 0000000..728d00f
--- /dev/null
+++ b/VERSION
@@ -0,0 +1 @@
+0.8.0-dev0
diff --git a/kafkaloghandler/__init__.py b/kafkaloghandler/__init__.py
new file mode 100644
index 0000000..258450c
--- /dev/null
+++ b/kafkaloghandler/__init__.py
@@ -0,0 +1,17 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .kafkaloghandler import KafkaLogHandler
+
+__all__ = ['KafkaLogHandler']
diff --git a/kafkaloghandler/kafkaloghandler.py b/kafkaloghandler/kafkaloghandler.py
new file mode 100644
index 0000000..0a09f4f
--- /dev/null
+++ b/kafkaloghandler/kafkaloghandler.py
@@ -0,0 +1,129 @@
+#!/usr/bin/env python
+
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# kafkaloghandler - logging handler that sends to Kafka
+
+import json
+import confluent_kafka
+import logging
+import sys
+
+
+class KafkaLogHandler(logging.Handler):
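+    """
+    Python logging Handler that publishes log records to Kafka as JSON.
+
+    Structured data passed via the logger's ``extra`` parameter is merged
+    into the record, optionally flattened, filtered against the blacklist,
+    and serialized with json.dumps before being produced to the topic.
+    """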
+
+    def __init__(self,
+                 bootstrap_servers=["localhost:9092"],
+                 key="klh",  # kafka default key
+                 topic="kafkaloghandler",  # kafka default topic
+                 timeout=10.0,  # kafka connection timeout
+                 flatten=5,  # maximum depth of dict flattening
+                 separator=".",  # separator used when flattening
+                 blacklist=["_logger"],  # keys excluded from messages
+                 ):
+
+        logging.Handler.__init__(self)
+
+        self.bootstrap_servers = bootstrap_servers
+        self.topic = topic
+        self.key = key
+        self.flatten = flatten
+        self.separator = separator
+        self.blacklist = blacklist
+        self.timeout = timeout
+        self.producer = None
+
+    def _connect(self):
+
+        try:
+            producer_config = {
+                'bootstrap.servers': ','.join(self.bootstrap_servers),
+            }
+
+            self.producer = confluent_kafka.Producer(**producer_config)
+
+        except confluent_kafka.KafkaException as e:
+            print("Kafka Error: %s" % e)
+            # die if there's an error
+            sys.exit(1)
+
+    def _flatten(self, ns, toflatten, maxdepth):
+        """ flatten dicts creating a key.subkey.subsubkey... hierarchy """
+
+        # if max depth reached, return k:v dict
+        if maxdepth < 1:
+            return {ns: toflatten}
+
+        flattened = {}
+
+        for k, v in toflatten.items():
+
+            prefix = "%s%s%s" % (ns, self.separator, k)
+
+            if isinstance(v, dict):
+                flattened.update(self._flatten(prefix, v, maxdepth-1))
+            else:
+                flattened[prefix] = v
+
+        return flattened
+
+    def emit(self, record):
+
+        recvars = {}
+
+        message_key = self.key
+
+        # fixup any structured arguments
+        for k, v in vars(record).items():
+            # remove any items with keys in blacklist
+            if k in self.blacklist:
+                continue
+
+            # if a "key" is found, use as the kafka key and remove
+            if k == 'key':
+                message_key = v
+                continue
+
+            # flatten any sub-dicts down, if enabled
+            if self.flatten and isinstance(v, dict):
+                recvars.update(self._flatten(k, v, self.flatten))
+                continue
+
+            recvars[k] = v
+
+        # Replace unserializable items with repr version.
+        # Otherwise, the log message may be discarded if it contains any
+        # unserializable fields
+        json_recvars = json.dumps(
+            recvars,
+            separators=(',', ':'),
+            default=lambda o: repr(o),
+            )
+
+        if self.producer is None:
+            self._connect()
+
+        try:
+            self.producer.produce(self.topic, json_recvars, message_key)
+
+        except confluent_kafka.KafkaException as e:
+            print("Kafka Error: %s" % e)
+            # currently don't do anything else on failure
+
+    def flush(self):
+
+        if self.producer:
+            self.producer.flush(self.timeout)
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..9a597b4
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1 @@
+confluent-kafka>=0.11.5
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..3b98a2a
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from setuptools import setup
+
+
+def readme():
+    with open('README.rst') as f:
+        return f.read()
+
+
+def version():
+    with open('VERSION') as f:
+        return f.read().rstrip()
+
+
+def parse_requirements(filename):
+    lineiter = (line.strip() for line in open(filename))
+    return [line for line in lineiter if line and not line.startswith("#")]
+
+
+setup(
+    name='kafkaloghandler',
+    version=version(),
+    description='Kafka Logging Handler',
+    long_description=readme(),
+    classifiers=[
+        'Topic :: System :: Logging',
+        'Topic :: Internet :: Log Analysis',
+        'License :: OSI Approved :: Apache Software License',
+    ],
+    keywords='kafka logging',
+    url='https://gerrit.opencord.org/gitweb?p=kafkaloghandler.git',
+    author='Zack Williams',
+    author_email='zdw@opennetworking.org',
+    packages=['kafkaloghandler'],
+    license='Apache v2',
+    install_requires=parse_requirements('requirements.txt'),
+    include_package_data=True,
+    zip_safe=False,
+)
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..4a82628
--- /dev/null
+++ b/tests/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/tests/test_kafkaloghandler.py b/tests/test_kafkaloghandler.py
new file mode 100644
index 0000000..52f9feb
--- /dev/null
+++ b/tests/test_kafkaloghandler.py
@@ -0,0 +1,227 @@
+#!/usr/bin/env python
+
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import logging
+import unittest
+
+from unittest.mock import patch
+from kafkaloghandler import KafkaLogHandler
+
+
+class FakeKafkaProducer(object):
+    '''
+    Works like Producer in confluent_kafka, ref:
+    https://docs.confluent.io/current/clients/confluent-kafka-python/#producer
+    '''
+    def __init__(self, config=None):
+        self.config = config or {}
+
+    def produce(self, topic, value='', key=''):
+        self.topic = topic
+        self.value = value
+        self.key = key
+
+    def flush(self, timeout=1):
+        self.flush_timeout = timeout
+
+
+class TestKafkaLogHandler(unittest.TestCase):
+
+    def setUp(self):
+        '''
+        Setup tests for KafkaLogHandler, mainly common init of logger
+        '''
+        self.logger = logging.getLogger(__name__)
+        self.logger.handlers = []
+        self.logger.setLevel(logging.INFO)
+
+    def tearDown(self):
+        logging.shutdown()
+
+    def test_single_message(self):
+        '''
+        tests that emit is called once when there is one message
+        '''
+
+        with patch.object(KafkaLogHandler, 'emit') as emit:
+
+            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
+                                  topic="testtopic")
+
+            self.logger.addHandler(klh)
+
+            self.logger.warning('Warning')
+
+            emit.assert_called_once()
+
+    def test_with_structure(self):
+        '''
+        tests structured serialization of log to JSON
+        '''
+
+        with patch.object(KafkaLogHandler, '_connect'):
+
+            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
+                                  topic="testtopic")
+
+            klh.producer = FakeKafkaProducer()
+
+            self.logger.addHandler(klh)
+
+            extra_data = {
+                "foo": "value1",
+                "bar": "value2",
+                "l1": {"l2": {'l3': "nested"}},
+            }
+
+            self.logger.info('structured', extra=extra_data)
+
+            decoded_message = json.loads(klh.producer.value)
+
+            self.assertEqual(klh.producer.topic, 'testtopic')
+            self.assertEqual(decoded_message['msg'], 'structured')
+            self.assertEqual(decoded_message['foo'], 'value1')
+            self.assertEqual(decoded_message['bar'], 'value2')
+            self.assertEqual(decoded_message['l1.l2.l3'], 'nested')
+
+    def test_without_flatten(self):
+        '''
+        tests with flattening of objects disabled
+        '''
+
+        with patch.object(KafkaLogHandler, '_connect'):
+
+            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
+                                  topic="testtopic",
+                                  flatten=0)
+
+            klh.producer = FakeKafkaProducer()
+
+            self.logger.addHandler(klh)
+
+            extra_data = {
+                "foo": "value1",
+                "l1": {"l2": {'l3': "nested"}},
+            }
+
+            self.logger.info('noflatten', extra=extra_data)
+
+            decoded_message = json.loads(klh.producer.value)
+
+            self.assertEqual(decoded_message['msg'], 'noflatten')
+            self.assertEqual(decoded_message['foo'], 'value1')
+            self.assertEqual(decoded_message['l1'], {'l2': {'l3': "nested"}})
+
+    def test_with_shallow_flatten(self):
+        '''
+        Tests with a shallow flattening of objects, and different separator
+        '''
+
+        with patch.object(KafkaLogHandler, '_connect'):
+
+            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
+                                  topic="testtopic",
+                                  flatten=1,
+                                  separator='_')
+
+            klh.producer = FakeKafkaProducer()
+
+            self.logger.addHandler(klh)
+
+            extra_data = {
+                "foo": "value1",
+                "l1": {"l2": {'l3': "nested"}},
+            }
+
+            self.logger.info('oneflatten', extra=extra_data)
+
+            decoded_message = json.loads(klh.producer.value)
+
+            self.assertEqual(decoded_message['msg'], 'oneflatten')
+            self.assertEqual(decoded_message['foo'], 'value1')
+            self.assertEqual(decoded_message['l1_l2'], {'l3': 'nested'})
+
+    def test_override_key(self):
+        '''
+        Test setting the key argument to override the default
+        '''
+
+        with patch.object(KafkaLogHandler, '_connect'):
+
+            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
+                                  topic="testtopic")
+
+            klh.producer = FakeKafkaProducer()
+
+            self.logger.addHandler(klh)
+
+            extra_data = {
+                "foo": "value1",
+                "l1": {"l2": {'l3': "nested"}},
+            }
+
+            # log with default 'klh' key
+            self.logger.info('defaultkey', extra=extra_data)
+
+            decoded_message1 = json.loads(klh.producer.value)
+
+            self.assertEqual(klh.producer.key, 'klh')
+            self.assertEqual(decoded_message1['foo'], 'value1')
+            self.assertEqual(decoded_message1['msg'], 'defaultkey')
+            self.assertEqual(decoded_message1['l1.l2.l3'], 'nested')
+
+            # log with key overridden
+            extra_data.update({'key': 'override'})
+            self.logger.info('keyoverride', extra=extra_data)
+
+            decoded_message2 = json.loads(klh.producer.value)
+
+            self.assertEqual(klh.producer.key, 'override')
+            self.assertEqual(decoded_message2['msg'], 'keyoverride')
+            self.assertEqual(decoded_message2['foo'], 'value1')
+            self.assertEqual(decoded_message2['l1.l2.l3'], 'nested')
+
+    def test_blacklist(self):
+        '''
+        tests adding items to blacklist
+        '''
+
+        with patch.object(KafkaLogHandler, '_connect'):
+
+            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
+                                  topic="testtopic",
+                                  blacklist=["bar"])
+
+            klh.producer = FakeKafkaProducer()
+
+            self.logger.addHandler(klh)
+
+            extra_data = {
+                "foo": "value1",
+                "bar": "value2",
+                "l1": {"l2": {'l3': "nested"}},
+            }
+
+            self.logger.info('blacklist', extra=extra_data)
+
+            decoded_message = json.loads(klh.producer.value)
+
+            self.assertEqual(klh.producer.topic, 'testtopic')
+            self.assertEqual(decoded_message['msg'], 'blacklist')
+            self.assertEqual(decoded_message['foo'], 'value1')
+            self.assertNotIn('bar', decoded_message)
diff --git a/unittest.cfg b/unittest.cfg
new file mode 100644
index 0000000..67b89de
--- /dev/null
+++ b/unittest.cfg
@@ -0,0 +1,9 @@
+[unittest]
+plugins=nose2.plugins.junitxml
+code-directories=kafkaloghandler
+
+[coverage]
+always-on = True
+coverage = kafkaloghandler
+coverage-report = term
+                  xml