Use kafka producer properly
Calling producer.poll() is required; otherwise the producer will fail with:
BufferError: Local: Queue full, instance_id: ...
Add an 'extra_config' parameter to extend the configuration of the Kafka producer ('bootstrap.servers' is still taken from the bootstrap_servers argument)
Update tests/docs
Change-Id: I8f17437c0c24d61a6f3be5de3d4004c21fcad75a
diff --git a/tests/test_kafkaloghandler.py b/tests/test_kafkaloghandler.py
index 420d1b2..4e16adc 100644
--- a/tests/test_kafkaloghandler.py
+++ b/tests/test_kafkaloghandler.py
@@ -39,6 +39,9 @@
self.value = value
self.key = key
+ def poll(self, timeout=1):
+ self.poll_timeout = timeout
+
def flush(self, timeout=1):
self.flush_timeout = timeout
@@ -72,6 +75,25 @@
assert emit.call_count == 1
+ def test_config(self):
+ '''
+ tests that producer_config dictionary is built correctly
+ '''
+
+ ec = {"queue.buffering.max.messages": 1000,
+ "queue.buffering.max.kbytes": 512,
+ "bootstrap.servers": 'foo'}
+
+ klh = KafkaLogHandler(
+ bootstrap_servers=["tk1:9092", "tk2:9093"],
+ extra_config=ec,
+ topic="testtopic")
+
+ self.assertEqual(klh.producer_config,
+ {"bootstrap.servers": "tk1:9092,tk2:9093",
+ "queue.buffering.max.messages": 1000,
+ "queue.buffering.max.kbytes": 512})
+
def test_with_structure(self):
'''
tests structured serialization of log to JSON
@@ -159,6 +181,35 @@
self.assertEqual(decoded_message['foo'], 'value1')
self.assertEqual(decoded_message['l1_l2'], {'l3': 'nested'})
+ def test_flatten_list(self):
+ '''
+ Tests flattening of lists
+ '''
+
+ with patch.object(KafkaLogHandler, '_connect'):
+
+ klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
+ topic="testtopic")
+
+ klh.producer = FakeKafkaProducer()
+
+ self.logger.addHandler(klh)
+
+ extra_data = {
+ "foo": "value1",
+ "list": ['item0', 'item1', {'dict': 'in_list'}],
+ }
+
+ self.logger.info('listflatten', extra=extra_data)
+
+ decoded_message = json.loads(klh.producer.value)
+
+ self.assertEqual(decoded_message['message'], 'listflatten')
+ self.assertEqual(decoded_message['foo'], 'value1')
+ self.assertEqual(decoded_message['list.0'], 'item0')
+ self.assertEqual(decoded_message['list.1'], 'item1')
+ self.assertEqual(decoded_message['list.2.dict'], 'in_list')
+
def test_override_key(self):
'''
Test setting the key argument to override the default