Make kafkaloghandler standalone, add unit tests

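Example of attaching the handler to a logger (a minimal sketch based on
the test usage below; the broker address, topic name, and logger name
are placeholders, not defaults):

    import logging
    from kafkaloghandler import KafkaLogHandler

    log = logging.getLogger("example")  # placeholder logger name
    log.setLevel(logging.INFO)
    log.addHandler(KafkaLogHandler(bootstrap_servers=["kafka:9092"],
                                   topic="example.topic"))
    log.info("message with structure", extra={"foo": "bar"})
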
Change-Id: I36e8c5cc33459e8243d3aaac3fefc2928b663d36
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..4a82628
--- /dev/null
+++ b/tests/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/tests/test_kafkaloghandler.py b/tests/test_kafkaloghandler.py
new file mode 100644
index 0000000..52f9feb
--- /dev/null
+++ b/tests/test_kafkaloghandler.py
@@ -0,0 +1,226 @@
+#!/usr/bin/env python
+
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import logging
+import unittest
+
+from mock import patch
+from kafkaloghandler import KafkaLogHandler
+
+
+class FakeKafkaProducer():
+    '''
+    Fake of the Producer in confluent_kafka that records calls, ref:
+    https://docs.confluent.io/current/clients/confluent-kafka-python/#producer
+    '''
+    def __init__(self, config=None):
+        self.config = config or {}  # avoid a mutable default argument
+
+    def produce(self, topic, value='', key=''):
+        self.topic = topic
+        self.value = value
+        self.key = key
+
+    def flush(self, timeout=1):
+        self.flush_timeout = timeout
+
+
+class TestKafkaLogHandler(unittest.TestCase):
+
+    def setUp(self):
+        '''
+        Common setup for KafkaLogHandler tests: a logger with no handlers
+        '''
+        self.logger = logging.getLogger(__name__)
+        self.logger.handlers = []
+        self.logger.setLevel(logging.INFO)
+
+    def tearDown(self):
+        logging.shutdown()
+
+    def test_single_message(self):
+        '''
+        Tests that emit() is called once when a single message is logged
+        '''
+
+        with patch.object(KafkaLogHandler, 'emit') as emit:
+
+            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
+                                  topic="testtopic")
+
+            self.logger.addHandler(klh)
+
+            self.logger.warning('Warning')
+
+            emit.assert_called_once()
+
+    def test_with_structure(self):
+        '''
+        Tests structured serialization of a log record to JSON
+        '''
+
+        with patch.object(KafkaLogHandler, '_connect'):
+
+            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
+                                  topic="testtopic")
+
+            klh.producer = FakeKafkaProducer()
+
+            self.logger.addHandler(klh)
+
+            extra_data = {
+                "foo": "value1",
+                "bar": "value2",
+                "l1": {"l2": {'l3': "nested"}},
+            }
+
+            self.logger.info('structured', extra=extra_data)
+
+            decoded_message = json.loads(klh.producer.value)
+
+            self.assertEqual(klh.producer.topic, 'testtopic')
+            self.assertEqual(decoded_message['msg'], 'structured')
+            self.assertEqual(decoded_message['foo'], 'value1')
+            self.assertEqual(decoded_message['bar'], 'value2')
+            self.assertEqual(decoded_message['l1.l2.l3'], 'nested')
+
+    def test_without_flatten(self):
+        '''
+        Tests with flattening of nested objects disabled
+        '''
+
+        with patch.object(KafkaLogHandler, '_connect'):
+
+            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
+                                  topic="testtopic",
+                                  flatten=0)
+
+            klh.producer = FakeKafkaProducer()
+
+            self.logger.addHandler(klh)
+
+            extra_data = {
+                "foo": "value1",
+                "l1": {"l2": {'l3': "nested"}},
+            }
+
+            self.logger.info('noflatten', extra=extra_data)
+
+            decoded_message = json.loads(klh.producer.value)
+
+            self.assertEqual(decoded_message['msg'], 'noflatten')
+            self.assertEqual(decoded_message['foo'], 'value1')
+            self.assertEqual(decoded_message['l1'], {'l2': {'l3': "nested"}})
+
+    def test_with_shallow_flatten(self):
+        '''
+        Tests shallow flattening of nested objects and a custom separator
+        '''
+
+        with patch.object(KafkaLogHandler, '_connect'):
+
+            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
+                                  topic="testtopic",
+                                  flatten=1,
+                                  separator='_')
+
+            klh.producer = FakeKafkaProducer()
+
+            self.logger.addHandler(klh)
+
+            extra_data = {
+                "foo": "value1",
+                "l1": {"l2": {'l3': "nested"}},
+            }
+
+            self.logger.info('oneflatten', extra=extra_data)
+
+            decoded_message = json.loads(klh.producer.value)
+
+            self.assertEqual(decoded_message['msg'], 'oneflatten')
+            self.assertEqual(decoded_message['foo'], 'value1')
+            self.assertEqual(decoded_message['l1_l2'], {'l3': 'nested'})
+
+    def test_override_key(self):
+        '''
+        Tests setting the key argument to override the default
+        '''
+
+        with patch.object(KafkaLogHandler, '_connect'):
+
+            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
+                                  topic="testtopic")
+
+            klh.producer = FakeKafkaProducer()
+
+            self.logger.addHandler(klh)
+
+            extra_data = {
+                "foo": "value1",
+                "l1": {"l2": {'l3': "nested"}},
+            }
+
+            # log with default 'klh' key
+            self.logger.info('defaultkey', extra=extra_data)
+
+            decoded_message1 = json.loads(klh.producer.value)
+
+            self.assertEqual(klh.producer.key, 'klh')
+            self.assertEqual(decoded_message1['foo'], 'value1')
+            self.assertEqual(decoded_message1['msg'], 'defaultkey')
+            self.assertEqual(decoded_message1['l1.l2.l3'], 'nested')
+
+            # log with key overridden
+            extra_data.update({'key': 'override'})
+            self.logger.info('keyoverride', extra=extra_data)
+
+            decoded_message2 = json.loads(klh.producer.value)
+
+            self.assertEqual(klh.producer.key, 'override')
+            self.assertEqual(decoded_message2['msg'], 'keyoverride')
+            self.assertEqual(decoded_message2['foo'], 'value1')
+            self.assertEqual(decoded_message2['l1.l2.l3'], 'nested')
+
+    def test_blacklist(self):
+        '''
+        Tests that blacklisted fields are excluded from the message
+        '''
+
+        with patch.object(KafkaLogHandler, '_connect'):
+
+            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
+                                  topic="testtopic",
+                                  blacklist=["bar"])
+
+            klh.producer = FakeKafkaProducer()
+
+            self.logger.addHandler(klh)
+
+            extra_data = {
+                "foo": "value1",
+                "bar": "value2",
+                "l1": {"l2": {'l3': "nested"}},
+            }
+
+            self.logger.info('blacklist', extra=extra_data)
+
+            decoded_message = json.loads(klh.producer.value)
+
+            self.assertEqual(klh.producer.topic, 'testtopic')
+            self.assertEqual(decoded_message['msg'], 'blacklist')
+            self.assertEqual(decoded_message['foo'], 'value1')
+            self.assertNotIn('bar', decoded_message)