Use the Kafka producer properly

Calling producer.poll() is required, otherwise delivery reports are never
served, the producer's internal queue fills up, and produce() eventually
fails with:

    BufferError: Local: Queue full, instance_id: ...
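
The fix, applied below, is to serve the delivery report queue right
after each produce() call:

    producer.produce(topic, value, key)
    producer.poll(0)  # serve delivery callbacks without blocking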

Add an 'extra_config' parameter to extend the configuration of the Kafka
producer

Update tests/docs

Change-Id: I8f17437c0c24d61a6f3be5de3d4004c21fcad75a
diff --git a/README.rst b/README.rst
index d5470bd..c76886b 100644
--- a/README.rst
+++ b/README.rst
@@ -35,12 +35,29 @@
 **Parameters that can be provided to KafkaLogHandler:**
 
 *bootstrap_servers*
-  List of Kafka bootstrap servers to connect to. See confluent_kafka docs.
+  List of Kafka bootstrap servers to connect to.
 
   **default:** ``["localhost:9092"]``
 
+*extra_config*
+  Dictionary of extra `producer configuration
+  <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md>`_
+  passed to librdkafka.
+
+  NOTE: The ``bootstrap_servers`` parameter overwrites any
+  ``bootstrap.servers`` value set here.
+
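+  Example, passing a librdkafka queue tuning option (the value shown
+  is illustrative only)::
+
+    KafkaLogHandler(bootstrap_servers=["kafka:9092"],
+                    extra_config={"queue.buffering.max.messages": 1000})
+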
+  **default:** ``{}``
+
+
 *timeout*
-  Timeout in seconds for flushing producer queue. See confluent_kafka docs.
+  Timeout in seconds for flushing producer queue. See librdkafka docs.
 
   **default:** ``10.0``
 
@@ -58,15 +75,20 @@
   **default:** ``"klh"``
 
 *flatten*
-  Flattens nested dictionary keys passed as structured logging into the parent
+  Flattens nested dictionaries and lists passed as structured logging into the parent
   dictionary layer, up to a certain depth.
 
   This is useful when logging to external systems that don't have good support
   for hierarchical data.
 
-  Example: ``{'a': {'b': 'c'}}`` would be flattened to ``{'a.b': 'c'}``
+  Example dictionary: ``{'a': {'b': 'c'}}`` would be flattened to ``{'a.b': 'c'}``
 
-  If the depth is exceeded, any remaining deeper dict will be added to the
+  Example list: ``{'a': ['b', 'c']}`` would be flattened to ``{'a.0': 'b', 'a.1': 'c'}``
+
+  If the depth is exceeded, any remaining deeper items will be added to the
   output under the flattened key.
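+
+  Example with ``flatten=1``: ``{'a': {'b': {'c': 'd'}}}`` would be
+  flattened to ``{'a.b': {'c': 'd'}}``, keeping the deeper dict intact.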
 
   Set to ``0`` to turn off flattening.
@@ -74,7 +96,7 @@
   **default:** ``5``
 
 *separator*
-  Separator used between keys when flattening.
+  Separator used between keys and list indexes when flattening.
 
   **default:** ``.``
 
@@ -84,9 +106,7 @@
   **default:** ``["_logger", "_name"]``
 
 
-Tests
-=====
+Testing
+=======
 
-Unit tests can be run with:
-
-   nose2 --verbose --coverage-report term
+Unit tests can be run with ``tox``.
diff --git a/VERSION b/VERSION
index a3df0a6..ac39a10 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.8.0
+0.9.0
diff --git a/kafkaloghandler/kafkaloghandler.py b/kafkaloghandler/kafkaloghandler.py
index 38a9b9c..a57b5f1 100644
--- a/kafkaloghandler/kafkaloghandler.py
+++ b/kafkaloghandler/kafkaloghandler.py
@@ -29,6 +29,7 @@
                  bootstrap_servers=["localhost:9092"],
                  key="klh",  # kafka default key
                  topic="kafkaloghandler",  # kafka default topic
+                 extra_config={},  # extra producer configuration
                  timeout=10.0,  # kafka connection timeout
                  flatten=5,  # maximum depth of dict flattening
                  separator=".",  # separator used when flattening
@@ -37,7 +38,12 @@
 
         logging.Handler.__init__(self)
 
-        self.bootstrap_servers = bootstrap_servers
+        # Build the kafka producer config; copy so extra_config isn't mutated
+        self.producer_config = extra_config.copy()
+        self.producer_config.update({
+                'bootstrap.servers': ','.join(bootstrap_servers),
+            })
+
         self.topic = topic
         self.key = key
         self.flatten = flatten
@@ -49,11 +55,7 @@
     def _connect(self):
 
         try:
-            producer_config = {
-                'bootstrap.servers': ','.join(self.bootstrap_servers),
-            }
-
-            self.producer = confluent_kafka.Producer(**producer_config)
+            self.producer = confluent_kafka.Producer(**self.producer_config)
 
         except confluent_kafka.KafkaError as e:
             print("Kafka Error: %s" % e)
@@ -61,7 +63,10 @@
             sys.exit(1)
 
     def _flatten(self, ns, toflatten, maxdepth):
-        """ flatten dicts creating a key.subkey.subsubkey... hierarchy """
+        """
+        flatten dicts creating a key.subkey.subsubkey... hierarchy
+        flatten lists creating a key.<index>.subkey... hierarchy
+        """
 
         # if max depth reached, return k:v dict
         if maxdepth < 1:
@@ -69,11 +74,17 @@
 
         flattened = {}
 
-        for k, v in toflatten.items():
+        # dicts iterate as (key, value) pairs; lists as (index, item) pairs
+        if isinstance(toflatten, list):
+            tf = enumerate(toflatten)
+        else:
+            tf = toflatten.items()
+
+        for k, v in tf:
 
             prefix = "%s%s%s" % (ns, self.separator, k)
 
-            if isinstance(v, dict):
+            if isinstance(v, (dict, list)):
                 flattened.update(self._flatten(prefix, v, maxdepth-1))
             else:
                 flattened[prefix] = v
@@ -127,8 +138,8 @@
                 message_key = v
                 continue
 
-            # flatten any sub-dicts down, if enabled
-            if self.flatten and isinstance(v, dict):
+            # flatten any sub-dicts/lists down, if enabled
+            if self.flatten and isinstance(v, (dict, list)):
                 recvars.update(self._flatten(k, v, self.flatten))
                 continue
 
@@ -150,6 +161,9 @@
         try:
             self.producer.produce(self.topic, json_recvars, message_key)
 
+            # recommended by https://github.com/confluentinc/confluent-kafka-python/issues/16
+            self.producer.poll(0)
+
         except confluent_kafka.KafkaError as e:
             print("Kafka Error: %s" % e)
             # currently don't do anything on failure...
diff --git a/tests/test_kafkaloghandler.py b/tests/test_kafkaloghandler.py
index 420d1b2..4e16adc 100644
--- a/tests/test_kafkaloghandler.py
+++ b/tests/test_kafkaloghandler.py
@@ -39,6 +39,9 @@
         self.value = value
         self.key = key
 
+    def poll(self, timeout=1):
+        self.poll_timeout = timeout
+
     def flush(self, timeout=1):
         self.flush_timeout = timeout
 
@@ -72,6 +75,25 @@
 
             assert emit.call_count == 1
 
+    def test_config(self):
+        '''
+        tests that producer_config dictionary is built correctly
+        '''
+
+        ec = {"queue.buffering.max.messages": 1000,
+              "queue.buffering.max.kbytes": 512,
+              "bootstrap.servers": 'foo'}
+
+        klh = KafkaLogHandler(
+                bootstrap_servers=["tk1:9092", "tk2:9093"],
+                extra_config=ec,
+                topic="testtopic")
+
+        self.assertEqual(klh.producer_config,
+                         {"bootstrap.servers": "tk1:9092,tk2:9093",
+                          "queue.buffering.max.messages": 1000,
+                          "queue.buffering.max.kbytes": 512})
+
     def test_with_structure(self):
         '''
         tests structured serialization of log to JSON
@@ -159,6 +181,35 @@
             self.assertEqual(decoded_message['foo'], 'value1')
             self.assertEqual(decoded_message['l1_l2'], {'l3': 'nested'})
 
+    def test_flatten_list(self):
+        '''
+        Tests flattening of lists
+        '''
+
+        with patch.object(KafkaLogHandler, '_connect'):
+
+            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
+                                  topic="testtopic")
+
+            klh.producer = FakeKafkaProducer()
+
+            self.logger.addHandler(klh)
+
+            extra_data = {
+                "foo": "value1",
+                "list": ['item0', 'item1', {'dict': 'in_list'}],
+            }
+
+            self.logger.info('listflatten', extra=extra_data)
+
+            decoded_message = json.loads(klh.producer.value)
+
+            self.assertEqual(decoded_message['message'], 'listflatten')
+            self.assertEqual(decoded_message['foo'], 'value1')
+            self.assertEqual(decoded_message['list.0'], 'item0')
+            self.assertEqual(decoded_message['list.1'], 'item1')
+            self.assertEqual(decoded_message['list.2.dict'], 'in_list')
+
     def test_override_key(self):
         '''
         Test setting the key argument to override the default