VOL-1246 - Change to confluent-kafka-python from afkak library
to fix KafakUnavailalble Exception from afkak

Change-Id: Id9bf2d65f2972829f1c085a9b57b35a62ab73a40
diff --git a/requirements.txt b/requirements.txt
index a0641b2..56cb356 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,6 +3,7 @@
 bitstring==3.1.5
 cmd2==0.7.0
 colorama==0.3.9
+confluent-kafka==0.11.5
 cython==0.24.1
 decorator==4.1.2
 docker-py==1.10.6
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
index ad1aad5..6e80252 100644
--- a/voltha/northbound/kafka/kafka_proxy.py
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -14,13 +14,12 @@
 # limitations under the License.
 #
 
-from afkak.client import KafkaClient as _KafkaClient
-from afkak.common import (
-    PRODUCER_ACK_LOCAL_WRITE,
-)
-from afkak.producer import Producer as _kafkaProducer
+
+
+from confluent_kafka import Producer as _kafkaProducer
 from structlog import get_logger
 from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.threads import deferToThread
 from zope.interface import implementer
 
 from common.utils.consulhelpers import get_endpoint_from_consul
@@ -34,6 +33,10 @@
 class KafkaProxy(object):
     """
     This is a singleton proxy kafka class to hide the kafka client details.
+
+    Removed the references to the afkak libraries and added confluent-kafka-python as producer.
+    The required the adding in of twisted.internet.threads.deferToThread call to avoid any potential
+    blocking the the producer code
     """
     _kafka_instance = None
 
@@ -114,8 +117,7 @@
             pass
 
     def _get_kafka_producer(self):
-        # PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written
-        #  to a local log before sending response
+
         try:
 
             if self.kafka_endpoint.startswith('@'):
@@ -132,12 +134,10 @@
                     return
             else:
                 _k_endpoint = self.kafka_endpoint
-
-            self.kclient = _KafkaClient(_k_endpoint)
-            self.kproducer = _kafkaProducer(self.kclient,
-                                            req_acks=PRODUCER_ACK_LOCAL_WRITE,
-                                            ack_timeout=self.ack_timeout,
-                                            max_req_attempts=self.max_req_attempts)
+            self.kproducer = _kafkaProducer(
+                {'bootstrap.servers' :_k_endpoint}
+                )
+            pass
         except Exception, e:
             log.exception('failed-get-kafka-producer', e=e)
             return
@@ -163,9 +163,9 @@
                 log.debug('sending-kafka-msg', topic=topic, msg=msg)
                 msgs = [msg]
 
-                if self.kproducer and self.kclient and \
-                        self.event_bus_publisher and self.faulty is False:
-                    yield self.kproducer.send_messages(topic, msgs=msgs)
+                if self.kproducer is not None and self.event_bus_publisher and self.faulty is False:
+                    d =  deferToThread(self.kproducer.produce, topic, msg)
+                    yield d
                     log.debug('sent-kafka-msg', topic=topic, msg=msg)
                 else:
                     return