Use the Kafka producer properly
Calling producer.poll() is required, or the producer will eventually fail with:
BufferError: Local: Queue full, instance_id: ...
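A minimal sketch of the produce-then-poll pattern this change adopts
(broker address, topic, and payload are illustrative):

    import confluent_kafka

    producer = confluent_kafka.Producer({"bootstrap.servers": "localhost:9092"})

    producer.produce("kafkaloghandler", '{"message": "hello"}', "klh")
    # serve delivery callbacks; without this, the producer's internal
    # queue eventually fills up and produce() raises BufferError
    producer.poll(0)

    producer.flush(10.0)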
Add an 'extra_config' parameter to extend the configuration of the Kafka producer
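A sketch of how a caller might pass librdkafka options through the new
parameter (import path as used in the tests; option values are illustrative):

    import logging
    from kafkaloghandler import KafkaLogHandler

    handler = KafkaLogHandler(
        bootstrap_servers=["kafka1:9092", "kafka2:9092"],
        topic="testtopic",
        # passed through to librdkafka; bootstrap_servers above takes
        # precedence over any 'bootstrap.servers' given here
        extra_config={
            "queue.buffering.max.messages": 1000,
            "queue.buffering.max.kbytes": 512,
        },
    )

    logger = logging.getLogger(__name__)
    logger.addHandler(handler)
    logger.info("hello", extra={"foo": "value1"})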
Update tests/docs
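This change also extends flattening to lists, not just dicts. A standalone
sketch of the flattening rule, mirroring _flatten in kafkaloghandler.py
(defaults: separator ".", depth 5):

    def flatten(ns, toflatten, maxdepth, separator="."):
        # once max depth is reached, return the remainder under one key
        if maxdepth < 1:
            return {ns: toflatten}
        flattened = {}
        # iterate dicts as (key, value) pairs, lists as (index, value) pairs
        tf = enumerate(toflatten) if isinstance(toflatten, list) else toflatten.items()
        for k, v in tf:
            prefix = "%s%s%s" % (ns, separator, k)
            if isinstance(v, (dict, list)):
                flattened.update(flatten(prefix, v, maxdepth - 1, separator))
            else:
                flattened[prefix] = v
        return flattened

    flatten("a", {"b": "c"}, 5)    # {'a.b': 'c'}
    flatten("a", ["b", "c"], 5)    # {'a.0': 'b', 'a.1': 'c'}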
Change-Id: I8f17437c0c24d61a6f3be5de3d4004c21fcad75a
diff --git a/README.rst b/README.rst
index d5470bd..c76886b 100644
--- a/README.rst
+++ b/README.rst
@@ -35,12 +35,23 @@
**Parameters that can be provided to KafkaLogHandler:**
*bootstrap_servers*
- List of Kafka bootstrap servers to connect to. See confluent_kafka docs.
+ List of Kafka bootstrap servers to connect to.
**default:** ``["localhost:9092"]``
+*extra_config*
+ Dictionary of extra `producer configuration
+ <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md>`_
+ passed to librdkafka.
+
+ NOTE: The ``bootstrap_servers`` parameter will overwrite any
+ ``bootstrap.servers`` value set in ``extra_config``.
+
+ **default:** ``{}``
+
+
*timeout*
- Timeout in seconds for flushing producer queue. See confluent_kafka docs.
+ Timeout in seconds for flushing producer queue. See librdkafka docs.
**default:** ``10.0``
@@ -58,15 +69,17 @@
**default:** ``"klh"``
*flatten*
- Flattens nested dictionary keys passed as structured logging into the parent
+ Flattens nested dictionaries and lists passed as structured log data into the parent
dictionary layer, up to a certain depth.
This is useful when logging to external systems that don't have good support
for hierarchical data.
- Example: ``{'a': {'b': 'c'}}`` would be flattened to ``{'a.b': 'c'}``
+ Example dictionary: ``{'a': {'b': 'c'}}`` would be flattened to ``{'a.b': 'c'}``
- If the depth is exceeded, any remaining deeper dict will be added to the
+ Example list: ``{'a': ['b', 'c']}`` would be flattened to ``{'a.0': 'b', 'a.1': 'c'}``
+
+ If the depth is exceeded, any remaining deeper items will be added to the
output under the flattened key.
Set to ``0`` to turn off flattening.
@@ -74,7 +87,7 @@
**default:** ``5``
*separator*
- Separator used between keys when flattening.
+ Separator used between key components when flattening.
**default:** ``.``
@@ -84,9 +97,7 @@
**default:** ``["_logger", "_name"]``
-Tests
-=====
+Testing
+=======
-Unit tests can be run with:
-
- nose2 --verbose --coverage-report term
+Unit tests can be run with ``tox``.
diff --git a/VERSION b/VERSION
index a3df0a6..ac39a10 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.8.0
+0.9.0
diff --git a/kafkaloghandler/kafkaloghandler.py b/kafkaloghandler/kafkaloghandler.py
index 38a9b9c..a57b5f1 100644
--- a/kafkaloghandler/kafkaloghandler.py
+++ b/kafkaloghandler/kafkaloghandler.py
@@ -29,6 +29,7 @@
bootstrap_servers=["localhost:9092"],
key="klh", # kafka default key
topic="kafkaloghandler", # kafka default topic
+ extra_config={}, # extra producer configuration
timeout=10.0, # kafka connection timeout
flatten=5, # maximum depth of dict flattening
separator=".", # separator used when flattening
@@ -37,7 +38,12 @@
logging.Handler.__init__(self)
- self.bootstrap_servers = bootstrap_servers
+ # Build the configuration for the kafka producer
+ self.producer_config = dict(extra_config)  # copy, so the caller's dict (and the shared {} default) is never mutated
+ self.producer_config.update({
+ 'bootstrap.servers': ','.join(bootstrap_servers),
+ })
+
self.topic = topic
self.key = key
self.flatten = flatten
@@ -49,11 +55,7 @@
def _connect(self):
try:
- producer_config = {
- 'bootstrap.servers': ','.join(self.bootstrap_servers),
- }
-
- self.producer = confluent_kafka.Producer(**producer_config)
+ self.producer = confluent_kafka.Producer(**self.producer_config)
except confluent_kafka.KafkaError as e:
print("Kafka Error: %s" % e)
@@ -61,7 +63,10 @@
sys.exit(1)
def _flatten(self, ns, toflatten, maxdepth):
- """ flatten dicts creating a key.subkey.subsubkey... hierarchy """
+ """
+ flatten dicts creating a key.subkey.subsubkey... hierarchy
+ flatten lists creating a key.<index>.subkey... hierarchy
+ """
# if max depth reached, return k:v dict
if maxdepth < 1:
@@ -69,11 +74,17 @@
flattened = {}
- for k, v in toflatten.items():
+ # iterate dicts as (key, value) pairs and lists as (index, value) pairs
+ if isinstance(toflatten, list):
+ tf = enumerate(toflatten)
+ else:
+ tf = toflatten.items()
+
+ for k, v in tf:
prefix = "%s%s%s" % (ns, self.separator, k)
- if isinstance(v, dict):
+ if isinstance(v, (dict, list)):
flattened.update(self._flatten(prefix, v, maxdepth-1))
else:
flattened[prefix] = v
@@ -127,8 +138,8 @@
message_key = v
continue
- # flatten any sub-dicts down, if enabled
- if self.flatten and isinstance(v, dict):
+ # flatten any sub-dicts/lists down, if enabled
+ if self.flatten and isinstance(v, (dict, list)):
recvars.update(self._flatten(k, v, self.flatten))
continue
@@ -150,6 +161,9 @@
try:
self.producer.produce(self.topic, json_recvars, message_key)
+ # serve delivery callbacks so the internal queue doesn't fill up;
+ # recommended by https://github.com/confluentinc/confluent-kafka-python/issues/16
+ self.producer.poll(0)
+
except confluent_kafka.KafkaError as e:
print("Kafka Error: %s" % e)
# currently don't do anything on failure...
diff --git a/tests/test_kafkaloghandler.py b/tests/test_kafkaloghandler.py
index 420d1b2..4e16adc 100644
--- a/tests/test_kafkaloghandler.py
+++ b/tests/test_kafkaloghandler.py
@@ -39,6 +39,9 @@
self.value = value
self.key = key
+ def poll(self, timeout=1):
+ self.poll_timeout = timeout
+
def flush(self, timeout=1):
self.flush_timeout = timeout
@@ -72,6 +75,25 @@
assert emit.call_count == 1
+ def test_config(self):
+ '''
+ tests that producer_config dictionary is built correctly
+ '''
+
+ ec = {"queue.buffering.max.messages": 1000,
+ "queue.buffering.max.kbytes": 512,
+ "bootstrap.servers": 'foo'}
+
+ klh = KafkaLogHandler(
+ bootstrap_servers=["tk1:9092", "tk2:9093"],
+ extra_config=ec,
+ topic="testtopic")
+
+ self.assertEqual(klh.producer_config,
+ {"bootstrap.servers": "tk1:9092,tk2:9093",
+ "queue.buffering.max.messages": 1000,
+ "queue.buffering.max.kbytes": 512})
+
def test_with_structure(self):
'''
tests structured serialization of log to JSON
@@ -159,6 +181,35 @@
self.assertEqual(decoded_message['foo'], 'value1')
self.assertEqual(decoded_message['l1_l2'], {'l3': 'nested'})
+ def test_flatten_list(self):
+ '''
+ Tests flattening of lists
+ '''
+
+ with patch.object(KafkaLogHandler, '_connect'):
+
+ klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
+ topic="testtopic")
+
+ klh.producer = FakeKafkaProducer()
+
+ self.logger.addHandler(klh)
+
+ extra_data = {
+ "foo": "value1",
+ "list": ['item0', 'item1', {'dict': 'in_list'}],
+ }
+
+ self.logger.info('listflatten', extra=extra_data)
+
+ decoded_message = json.loads(klh.producer.value)
+
+ self.assertEqual(decoded_message['message'], 'listflatten')
+ self.assertEqual(decoded_message['foo'], 'value1')
+ self.assertEqual(decoded_message['list.0'], 'item0')
+ self.assertEqual(decoded_message['list.1'], 'item1')
+ self.assertEqual(decoded_message['list.2.dict'], 'in_list')
+
def test_override_key(self):
'''
Test setting the key argument to override the default