[SEBA-232]

Publish XOS internal events on Kafka as well as Redis

Switch to confluent_kafka, create XOSKafkaProducer wrapper lib

Remove nonfunctional test for connection failure

Change-Id: I4d3057fcc0b5b56022ef3f853dbe0323ef071af7
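
For reference, wiring the handler into stdlib logging looks roughly
like this (the class name and keyword arguments are assumptions based
on the attributes set in __init__ in the diff below):

    import logging
    from kafkaloghandler import KafkaLogHandler

    # bootstrap_servers is a list; the handler joins it into the
    # comma-separated string confluent_kafka expects
    handler = KafkaLogHandler(
        bootstrap_servers=["localhost:9092"],
        topic="xos.log",  # hypothetical topic name
    )

    logger = logging.getLogger("xos")
    logger.addHandler(handler)
    logger.info("connected to Kafka")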
diff --git a/lib/kafkaloghandler/kafkaloghandler/kafkaloghandler.py b/lib/kafkaloghandler/kafkaloghandler/kafkaloghandler.py
index d7e5352..b5a2a06 100644
--- a/lib/kafkaloghandler/kafkaloghandler/kafkaloghandler.py
+++ b/lib/kafkaloghandler/kafkaloghandler/kafkaloghandler.py
@@ -17,7 +17,7 @@
 # kafkaloghandler - logging handler that sends to Kafka
 
 import json
-import kafka
+import confluent_kafka
 import logging
 import sys
 import time
@@ -36,33 +36,30 @@
 
         logging.Handler.__init__(self)
 
-        try:
-            self.producer = kafka.KafkaProducer(
-                bootstrap_servers=bootstrap_servers,
-
-                # Replace unserializable items with repr version.
-                # Otherwise, the entire log message is discarded if
-                # it contains any unserializable fields
-                value_serializer=lambda val: json.dumps(
-                    val,
-                    separators=(',', ':'),
-                    default=lambda o: repr(o),
-                    )
-                )
-
-        except kafka.errors.KafkaError, e:
-            print "Kafka Error: %s" % e
-            # die if there's an error
-            sys.exit(1)
-
+        self.bootstrap_servers = bootstrap_servers
         self.topic = topic
         self.key = key
         self.flatten = flatten
         self.blacklist = blacklist
         self.timeout = timeout
+        self.producer = None
+
+    def _connect(self):
+
+        try:
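+            # confluent_kafka is configured with a librdkafka-style dict
+            # of dotted keys; bootstrap_servers may be a list, so it is
+            # joined into the comma-separated string librdkafka expects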
+            producer_config = {
+                'bootstrap.servers': ','.join(self.bootstrap_servers),
+            }
+
+            self.producer = confluent_kafka.Producer(**producer_config)
+
+        except confluent_kafka.KafkaException as e:
+            print "Kafka Error: %s" % e
+            # die if there's an error
+            sys.exit(1)
 
     def _flatten(self, ns, toflatten, maxdepth):
-        """ flatten dicts creating a key_subkey_subsubkey_... hierarchy """
+        """ flatten dicts creating a key.subkey.subsubkey... hierarchy """
 
         # avoid recursively flattening forever
         if maxdepth < 1:
@@ -72,7 +69,7 @@
 
         for k, v in toflatten.iteritems():
 
-            prefix = "%s_%s" % (ns, k)
+            prefix = "%s.%s" % (ns, k)
 
             if isinstance(v, dict):
                 flattened.update(self._flatten(prefix, v, maxdepth-1))
@@ -97,13 +94,30 @@
 
             recvars[k] = v
 
-        self.producer.send(self.topic, key=self.key, value=recvars)
+        # Replace unserializable items with their repr() versions.
+        # Otherwise, the entire log message may be discarded if it
+        # contains any unserializable fields
+        json_recvars = json.dumps(
+            recvars,
+            separators=(',', ':'),
+            default=lambda o: repr(o),
+            )
+
+        if self.producer is None:
+            self._connect()
+
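+        # produce() only enqueues the message in a local buffer; actual
+        # delivery happens asynchronously and is forced by flush()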
+        try:
+            self.producer.produce(self.topic, json_recvars, self.key)
+
+        except (confluent_kafka.KafkaException, BufferError) as e:
+            print "Kafka Error: %s" % e
+            # currently don't do anything else on failure
 
     def flush(self):
-        self.producer.flush(self.timeout)
 
-    def close(self):
-        self.producer.close(self.timeout)
+        if self.producer:
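+            # flush() blocks until queued messages are delivered or the
+            # timeout (in seconds) expires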
+            self.producer.flush(self.timeout)
 
 
 if __name__ == '__main__':
diff --git a/lib/kafkaloghandler/setup.py b/lib/kafkaloghandler/setup.py
index 9dbcbb2..cba89ff 100644
--- a/lib/kafkaloghandler/setup.py
+++ b/lib/kafkaloghandler/setup.py
@@ -38,7 +38,7 @@
     packages=['kafkaloghandler'],
     license='Apache v2',
     install_requires=[
-        'kafka>=1.3.5',
+        'confluent-kafka>=0.11.5',
         ],
     include_package_data=True,
     zip_safe=False,