Python 3 compatibility, testing with tox

Clean up message keys to align with Logstash

Change-Id: Iddd09deb479ee6f402a915de0267a0e93497778b
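
For illustration, a rough sketch (not part of this change) of the shape a
record takes after the key normalization added to emit() below; the values
are made up and the field set is abridged:

    normalized = {
        "@timestamp": "2018-06-03T04:26:40.500000Z",  # from 'created' (epoch float, rendered as UTC)
        "message": "something happened",              # from 'msg' (stdlib) or 'event' (structlog)
        "threadId": 139855623456512,                  # from 'thread' (an int in CPython)
        "levelname": "WARNING",                       # remaining LogRecord vars pass through unchanged
    }
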
diff --git a/.gitignore b/.gitignore
index 2780493..cdb0195 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,6 @@
 .coverage
 coverage.xml
 .git*
+.tox
+nose2-junit.xml
+
diff --git a/README.rst b/README.rst
index a52a120..d5470bd 100644
--- a/README.rst
+++ b/README.rst
@@ -4,10 +4,11 @@
 Provides a python ``logging`` compatible handler for producing messages to a
 Kafka message bus.
 
-Depends on the confluent_kafka module to connect to Kafka
+Depends on the confluent_kafka module to connect to Kafka.
 
-Designed to support structured logging, and serializes log data as JSON when
-published as a Kafka message.
+Designed to support both standard ``logging`` and structlog message formats,
+and serializes log data as JSON when published as a Kafka message. Messages
+are normalized for closer compatibility with Logstash/Filebeat formats.
 
 Usage
 =====
@@ -80,7 +81,7 @@
 *blacklist*
   List of top-level keys to discard from structured logs when outputting JSON.
 
-  **default:** ``['_logger']``
+  **default:** ``["_logger", "_name"]``
 
 
 Tests
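
A minimal usage sketch matching the options documented above. The
``bootstrap_servers`` and ``topic`` parameter names are assumptions (their
definitions fall outside this diff); the blacklist shown is the new default:

    import logging
    from kafkaloghandler import KafkaLogHandler

    logger = logging.getLogger("mylogger")
    handler = KafkaLogHandler(
        bootstrap_servers=["localhost:9092"],  # assumed parameter name
        topic="testtopic",
        blacklist=["_logger", "_name"],  # new default from the hunk above
    )
    logger.addHandler(handler)
    # structured data passed via 'extra' is serialized into the JSON message
    logger.warning("structured message", extra={"foo": "value1"})
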
diff --git a/VERSION b/VERSION
index 728d00f..a3df0a6 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.8.0-dev0
+0.8.0
diff --git a/kafkaloghandler/kafkaloghandler.py b/kafkaloghandler/kafkaloghandler.py
index 0a09f4f..38a9b9c 100644
--- a/kafkaloghandler/kafkaloghandler.py
+++ b/kafkaloghandler/kafkaloghandler.py
@@ -16,10 +16,11 @@
 
 # kafkaloghandler - logging handler that sends to Kafka
 
-import json
 import confluent_kafka
+import json
 import logging
 import sys
+from datetime import datetime
 
 
 class KafkaLogHandler(logging.Handler):
@@ -31,7 +32,7 @@
                  timeout=10.0,  # kafka connection timeout
                  flatten=5,  # maximum depth of dict flattening
                  separator=".",  # separator used when flattening
-                 blacklist=["_logger"],  # keys excluded from messages
+                 blacklist=["_logger", "_name"],  # keys excluded from messages
                  ):
 
         logging.Handler.__init__(self)
@@ -54,8 +55,8 @@
 
             self.producer = confluent_kafka.Producer(**producer_config)
 
-        except confluent_kafka.KafkaError, e:
-            print "Kafka Error: %s" % e
+        except confluent_kafka.KafkaError as e:
+            print("Kafka Error: %s" % e)
             # die if there's an error
             sys.exit(1)
 
@@ -68,7 +69,7 @@
 
         flattened = {}
 
-        for k, v in toflatten.iteritems():
+        for k, v in toflatten.items():
 
             prefix = "%s%s%s" % (ns, self.separator, k)
 
@@ -81,17 +82,47 @@
 
     def emit(self, record):
 
+        # copy the LogRecord vars, so mutations don't leak to other handlers
+        rec = vars(record).copy()
+
         recvars = {}
 
         message_key = self.key
 
+        # structlog puts all arguments under a 'msg' dict, whereas
+        # with normal logging 'msg' is a string. If 'msg' is a dict,
+        # merge it with 'rec', and remove it.
+        if 'msg' in rec and isinstance(rec['msg'], dict):
+            rec.update(rec['msg'])
+            del rec['msg']
+
         # fixup any structured arguments
-        for k, v in vars(record).iteritems():
+        for k, v in rec.items():
+
             # remove any items with keys in blacklist
             if k in self.blacklist:
                 continue
 
-            # if a "key" is found, use as the kafka key and remove
+            # conform vars to be closer to logstash format
+
+            # 'created' is an epoch timestamp with no timezone attached, per:
+            # https://github.com/python/cpython/blob/2.7/Lib/logging/__init__.py#L242
+            if k == 'created':
+                recvars['@timestamp'] = \
+                    datetime.utcfromtimestamp(v).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
+                continue
+
+            # 'thread' is an int in Python but a string elsewhere (e.g. Java), so rename
+            if k == 'thread':
+                recvars['threadId'] = v
+                continue
+
+            # 'message' is more widely used than 'msg' (stdlib) or 'event' (structlog)
+            if k in ['msg', 'event']:
+                recvars['message'] = v
+                continue
+
+            # if a 'key' is found, use as the kafka key and remove
             if k == 'key':
                 message_key = v
                 continue
@@ -101,6 +132,7 @@
                 recvars.update(self._flatten(k, v, self.flatten))
                 continue
 
+            # pass remaining variables unchanged
             recvars[k] = v
 
         # Replace unserializable items with repr version.
@@ -118,8 +150,8 @@
         try:
             self.producer.produce(self.topic, json_recvars, message_key)
 
-        except confluent_kafka.KafkaError, e:
-            print "Kafka Error: %s" % e
+        except confluent_kafka.KafkaError as e:
+            print("Kafka Error: %s" % e)
             # currently don't do anything on failure...
             pass
 
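For reference, a standalone sketch of the '@timestamp' conversion used in
emit() above (the epoch value is arbitrary):

    from datetime import datetime

    # record.created is an epoch float from time.time(); render it as a
    # Logstash-style UTC timestamp, exactly as emit() does
    created = 1528000000.5
    print(datetime.utcfromtimestamp(created).strftime('%Y-%m-%dT%H:%M:%S.%fZ'))
    # -> 2018-06-03T04:26:40.500000Z
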
diff --git a/setup.py b/setup.py
index 3b98a2a..c38e578 100644
--- a/setup.py
+++ b/setup.py
@@ -38,17 +38,19 @@
     description='Kafka Logging Handler',
     long_description=readme(),
     classifiers=[
+        'Development Status :: 4 - Beta',
         'Topic :: System :: Logging',
         'Topic :: Internet :: Log Analysis',
         'License :: OSI Approved :: Apache Software License',
+        'Programming Language :: Python :: 2.7',
+        'Programming Language :: Python :: 3',
     ],
-    keywords='kafka logging',
+    keywords=['kafka', 'logging', 'log handler', 'message bus'],
     url='https://gerrit.opencord.org/gitweb?p=kafkaloghandler.git',
     author='Zack Williams',
     author_email='zdw@opennetworking.org',
     packages=['kafkaloghandler'],
     license='Apache v2',
     install_requires=parse_requirements('requirements.txt'),
-    include_package_data=True,
-    zip_safe=False,
+    data_files=[('', ['VERSION', 'requirements.txt', 'README.rst'])],
 )
diff --git a/tests/test_kafkaloghandler.py b/tests/test_kafkaloghandler.py
index 52f9feb..420d1b2 100644
--- a/tests/test_kafkaloghandler.py
+++ b/tests/test_kafkaloghandler.py
@@ -14,12 +14,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from kafkaloghandler import KafkaLogHandler
 import json
 import logging
 import unittest
 
-from mock import patch
-from kafkaloghandler import KafkaLogHandler
+# mock is part of unittest in Python 3; prefer the external package if present
+try:
+    from mock import patch
+except ImportError:
+    from unittest.mock import patch
 
 
 class FakeKafkaProducer():
@@ -66,7 +70,7 @@
 
             self.logger.warn('Warning')
 
-            emit.assert_called_once()
+            assert emit.call_count == 1
 
     def test_with_structure(self):
         '''
@@ -93,7 +97,7 @@
             decoded_message = json.loads(klh.producer.value)
 
             self.assertEqual(klh.producer.topic, 'testtopic')
-            self.assertEqual(decoded_message['msg'], 'structured')
+            self.assertEqual(decoded_message['message'], 'structured')
             self.assertEqual(decoded_message['foo'], 'value1')
             self.assertEqual(decoded_message['bar'], 'value2')
             self.assertEqual(decoded_message['l1.l2.l3'], 'nested')
@@ -122,7 +126,7 @@
 
             decoded_message = json.loads(klh.producer.value)
 
-            self.assertEqual(decoded_message['msg'], 'noflatten')
+            self.assertEqual(decoded_message['message'], 'noflatten')
             self.assertEqual(decoded_message['foo'], 'value1')
             self.assertEqual(decoded_message['l1'], {'l2': {'l3': "nested"}})
 
@@ -151,7 +155,7 @@
 
             decoded_message = json.loads(klh.producer.value)
 
-            self.assertEqual(decoded_message['msg'], 'oneflatten')
+            self.assertEqual(decoded_message['message'], 'oneflatten')
             self.assertEqual(decoded_message['foo'], 'value1')
             self.assertEqual(decoded_message['l1_l2'], {'l3': 'nested'})
 
@@ -181,7 +185,7 @@
 
             self.assertEqual(klh.producer.key, 'klh')
             self.assertEqual(decoded_message1['foo'], 'value1')
-            self.assertEqual(decoded_message1['msg'], 'defaultkey')
+            self.assertEqual(decoded_message1['message'], 'defaultkey')
             self.assertEqual(decoded_message1['l1.l2.l3'], 'nested')
 
             # log with key overridden
@@ -191,7 +195,7 @@
             decoded_message2 = json.loads(klh.producer.value)
 
             self.assertEqual(klh.producer.key, 'override')
-            self.assertEqual(decoded_message2['msg'], 'keyoverride')
+            self.assertEqual(decoded_message2['message'], 'keyoverride')
             self.assertEqual(decoded_message2['foo'], 'value1')
             self.assertEqual(decoded_message2['l1.l2.l3'], 'nested')
 
@@ -221,7 +225,7 @@
             decoded_message = json.loads(klh.producer.value)
 
             self.assertEqual(klh.producer.topic, 'testtopic')
-            self.assertEqual(decoded_message['msg'], 'blacklist')
+            self.assertEqual(decoded_message['message'], 'blacklist')
             self.assertEqual(decoded_message['foo'], 'value1')
             with self.assertRaises(KeyError):
                 decoded_message['bar']
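
A quick sketch of the structlog-style path the 'message' assertions above
exercise: when a record's 'msg' is a dict, emit() merges it into the record
before the key renames (standalone, with a hand-built record dict):

    rec = {"msg": {"event": "structured", "foo": "value1"}, "levelname": "INFO"}
    if isinstance(rec.get("msg"), dict):
        rec.update(rec["msg"])
        del rec["msg"]
    # rec == {"levelname": "INFO", "event": "structured", "foo": "value1"};
    # 'event' is then renamed to 'message' during normalization
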
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 0000000..ea8d61d
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,29 @@
+; Copyright 2018-present Open Networking Foundation
+;
+; Licensed under the Apache License, Version 2.0 (the "License");
+; you may not use this file except in compliance with the License.
+; You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing, software
+; distributed under the License is distributed on an "AS IS" BASIS,
+; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+; See the License for the specific language governing permissions and
+; limitations under the License.
+
+[tox]
+envlist = py27,py35,py36
+skip_missing_interpreters = true
+
+[testenv]
+deps =
+  nose2
+  confluent-kafka
+  flake8
+commands =
+  nose2 --verbose --junit-xml
+  flake8
+
+[flake8]
+max-line-length = 119
diff --git a/unittest.cfg b/unittest.cfg
index 67b89de..63d3975 100644
--- a/unittest.cfg
+++ b/unittest.cfg
@@ -5,5 +5,4 @@
 [coverage]
 always-on = True
 coverage = kafkaloghandler
-coverage-report = term
 coverage-report = xml