Python 3 compatibility, testing using tox
Cleanup of keys to align with logstash
Change-Id: Iddd09deb479ee6f402a915de0267a0e93497778b
diff --git a/.gitignore b/.gitignore
index 2780493..cdb0195 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,6 @@
.coverage
coverage.xml
.git*
+.tox
+nose2-junit.xml
+
diff --git a/README.rst b/README.rst
index a52a120..d5470bd 100644
--- a/README.rst
+++ b/README.rst
@@ -4,10 +4,11 @@
Provides a python ``logging`` compatible handler for producing messages to a
Kafka message bus.
-Depends on the confluent_kafka module to connect to Kafka
+Depends on the confluent_kafka module to connect to Kafka.
-Designed to support structured logging, and serializes log data as JSON when
-published as a Kafka message.
+Designed to support both standard and structlog formats, and serializes log
+data as JSON when published as a Kafka message. Messages are normalized to be
+more compatible with Logstash/Filebeat formats.
Usage
=====
@@ -80,7 +81,7 @@
*blacklist*
List of top-level keys to discard from structured logs when outputting JSON.
- **default:** ``['_logger']``
+ **default:** ``["_logger", "_name"]``
Tests
diff --git a/VERSION b/VERSION
index 728d00f..a3df0a6 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.8.0-dev0
+0.8.0
diff --git a/kafkaloghandler/kafkaloghandler.py b/kafkaloghandler/kafkaloghandler.py
index 0a09f4f..38a9b9c 100644
--- a/kafkaloghandler/kafkaloghandler.py
+++ b/kafkaloghandler/kafkaloghandler.py
@@ -16,10 +16,11 @@
# kafkaloghandler - logging handler that sends to Kafka
-import json
import confluent_kafka
+import json
import logging
import sys
+from datetime import datetime
class KafkaLogHandler(logging.Handler):
@@ -31,7 +32,7 @@
timeout=10.0, # kafka connection timeout
flatten=5, # maximum depth of dict flattening
separator=".", # separator used when flattening
- blacklist=["_logger"], # keys excluded from messages
+ blacklist=["_logger", "_name"], # keys excluded from messages
):
logging.Handler.__init__(self)
@@ -54,8 +55,8 @@
self.producer = confluent_kafka.Producer(**producer_config)
- except confluent_kafka.KafkaError, e:
- print "Kafka Error: %s" % e
+ except confluent_kafka.KafkaError as e:
+ print("Kafka Error: %s" % e)
# die if there's an error
sys.exit(1)
@@ -68,7 +69,7 @@
flattened = {}
- for k, v in toflatten.iteritems():
+ for k, v in toflatten.items():
prefix = "%s%s%s" % (ns, self.separator, k)
@@ -81,17 +82,47 @@
def emit(self, record):
+ # make a dict from LogRecord
+ rec = vars(record)
+
recvars = {}
message_key = self.key
+ # structlog puts all arguments under a 'msg' dict, whereas
+ # with normal logging 'msg' is a string. If 'msg' is a dict,
+ # merge it with 'rec', and remove it.
+ if 'msg' in rec and isinstance(rec['msg'], dict):
+ rec.update(rec['msg'])
+ del rec['msg']
+
# fixup any structured arguments
- for k, v in vars(record).iteritems():
+ for k, v in rec.items():
+
# remove any items with keys in blacklist
if k in self.blacklist:
continue
- # if a "key" is found, use as the kafka key and remove
+ # conform vars to be closer to logstash format
+
+ # 'created' is naive (no timezone) time, per:
+ # https://github.com/python/cpython/blob/2.7/Lib/logging/__init__.py#L242
+        if k == 'created':
+ recvars['@timestamp'] = \
+ datetime.utcfromtimestamp(v).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
+ continue
+
+ # thread is an int in Python, but a string in others (Java), so rename
+        if k == 'thread':
+ recvars['threadId'] = v
+ continue
+
+ # 'message' is used more than 'msg' (standard) or 'event' (structlog)
+ if k in ['msg', 'event']:
+ recvars['message'] = v
+ continue
+
+ # if a 'key' is found, use as the kafka key and remove
if k is 'key':
message_key = v
continue
@@ -101,6 +132,7 @@
recvars.update(self._flatten(k, v, self.flatten))
continue
+ # pass remaining variables unchanged
recvars[k] = v
# Replace unserializable items with repr version.
@@ -118,8 +150,8 @@
try:
self.producer.produce(self.topic, json_recvars, message_key)
- except confluent_kafka.KafkaError, e:
- print "Kafka Error: %s" % e
+ except confluent_kafka.KafkaError as e:
+ print("Kafka Error: %s" % e)
# currently don't do anything on failure...
pass
diff --git a/setup.py b/setup.py
index 3b98a2a..c38e578 100644
--- a/setup.py
+++ b/setup.py
@@ -38,17 +38,19 @@
description='Kafka Logging Handler',
long_description=readme(),
classifiers=[
+ 'Development Status :: 4 - Beta',
'Topic :: System :: Logging',
'Topic :: Internet :: Log Analysis',
'License :: OSI Approved :: Apache Software License',
+ 'Programming Language :: Python :: 2.7',
+ 'Programming Language :: Python :: 3',
],
- keywords='kafka logging',
+ keywords=['kafka', 'logging', 'log handler', 'message bus'],
url='https://gerrit.opencord.org/gitweb?p=kafkaloghandler.git',
author='Zack Williams',
author_email='zdw@opennetworking.org',
packages=['kafkaloghandler'],
license='Apache v2',
install_requires=parse_requirements('requirements.txt'),
- include_package_data=True,
- zip_safe=False,
+ data_files=[('', ['VERSION', 'requirements.txt', 'README.rst'])],
)
diff --git a/tests/test_kafkaloghandler.py b/tests/test_kafkaloghandler.py
index 52f9feb..420d1b2 100644
--- a/tests/test_kafkaloghandler.py
+++ b/tests/test_kafkaloghandler.py
@@ -14,12 +14,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from kafkaloghandler import KafkaLogHandler
import json
import logging
import unittest
-from mock import patch
-from kafkaloghandler import KafkaLogHandler
+# mock is a part of unittest in python 3
+try:
+ from mock import patch
+except ImportError:
+ from unittest.mock import patch
class FakeKafkaProducer():
@@ -66,7 +70,7 @@
self.logger.warn('Warning')
- emit.assert_called_once()
+ assert emit.call_count == 1
def test_with_structure(self):
'''
@@ -93,7 +97,7 @@
decoded_message = json.loads(klh.producer.value)
self.assertEqual(klh.producer.topic, 'testtopic')
- self.assertEqual(decoded_message['msg'], 'structured')
+ self.assertEqual(decoded_message['message'], 'structured')
self.assertEqual(decoded_message['foo'], 'value1')
self.assertEqual(decoded_message['bar'], 'value2')
self.assertEqual(decoded_message['l1.l2.l3'], 'nested')
@@ -122,7 +126,7 @@
decoded_message = json.loads(klh.producer.value)
- self.assertEqual(decoded_message['msg'], 'noflatten')
+ self.assertEqual(decoded_message['message'], 'noflatten')
self.assertEqual(decoded_message['foo'], 'value1')
self.assertEqual(decoded_message['l1'], {'l2': {'l3': "nested"}})
@@ -151,7 +155,7 @@
decoded_message = json.loads(klh.producer.value)
- self.assertEqual(decoded_message['msg'], 'oneflatten')
+ self.assertEqual(decoded_message['message'], 'oneflatten')
self.assertEqual(decoded_message['foo'], 'value1')
self.assertEqual(decoded_message['l1_l2'], {'l3': 'nested'})
@@ -181,7 +185,7 @@
self.assertEqual(klh.producer.key, 'klh')
self.assertEqual(decoded_message1['foo'], 'value1')
- self.assertEqual(decoded_message1['msg'], 'defaultkey')
+ self.assertEqual(decoded_message1['message'], 'defaultkey')
self.assertEqual(decoded_message1['l1.l2.l3'], 'nested')
# log with key overridden
@@ -191,7 +195,7 @@
decoded_message2 = json.loads(klh.producer.value)
self.assertEqual(klh.producer.key, 'override')
- self.assertEqual(decoded_message2['msg'], 'keyoverride')
+ self.assertEqual(decoded_message2['message'], 'keyoverride')
self.assertEqual(decoded_message2['foo'], 'value1')
self.assertEqual(decoded_message2['l1.l2.l3'], 'nested')
@@ -221,7 +225,7 @@
decoded_message = json.loads(klh.producer.value)
self.assertEqual(klh.producer.topic, 'testtopic')
- self.assertEqual(decoded_message['msg'], 'blacklist')
+ self.assertEqual(decoded_message['message'], 'blacklist')
self.assertEqual(decoded_message['foo'], 'value1')
with self.assertRaises(KeyError):
decoded_message['bar']
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 0000000..ea8d61d
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,29 @@
+; Copyright 2018-present Open Networking Foundation
+;
+; Licensed under the Apache License, Version 2.0 (the "License");
+; you may not use this file except in compliance with the License.
+; You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing, software
+; distributed under the License is distributed on an "AS IS" BASIS,
+; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+; See the License for the specific language governing permissions and
+; limitations under the License.
+
+[tox]
+envlist = py27,py35,py36
+skip_missing_interpreters = true
+
+[testenv]
+deps =
+ nose2
+ confluent-kafka
+ flake8
+commands =
+ nose2 --verbose --junit-xml
+ flake8
+
+[flake8]
+max-line-length = 119
diff --git a/unittest.cfg b/unittest.cfg
index 67b89de..63d3975 100644
--- a/unittest.cfg
+++ b/unittest.cfg
@@ -5,5 +5,4 @@
[coverage]
always-on = True
coverage = kafkaloghandler
-coverage-report = term
coverage-report = xml