VOL-1246 - Change to confluent-kafka-python from afkak library
to fix KafakUnavailalble Exception from afkak
Change-Id: Id9bf2d65f2972829f1c085a9b57b35a62ab73a40
diff --git a/requirements.txt b/requirements.txt
index a0641b2..56cb356 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,6 +3,7 @@
bitstring==3.1.5
cmd2==0.7.0
colorama==0.3.9
+confluent-kafka==0.11.5
cython==0.24.1
decorator==4.1.2
docker-py==1.10.6
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
index ad1aad5..6e80252 100644
--- a/voltha/northbound/kafka/kafka_proxy.py
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -14,13 +14,12 @@
# limitations under the License.
#
-from afkak.client import KafkaClient as _KafkaClient
-from afkak.common import (
- PRODUCER_ACK_LOCAL_WRITE,
-)
-from afkak.producer import Producer as _kafkaProducer
+
+
+from confluent_kafka import Producer as _kafkaProducer
from structlog import get_logger
from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.threads import deferToThread
from zope.interface import implementer
from common.utils.consulhelpers import get_endpoint_from_consul
@@ -34,6 +33,10 @@
class KafkaProxy(object):
"""
This is a singleton proxy kafka class to hide the kafka client details.
+
+ Removed the references to the afkak libraries and added confluent-kafka-python as producer.
+ The required the adding in of twisted.internet.threads.deferToThread call to avoid any potential
+ blocking the the producer code
"""
_kafka_instance = None
@@ -114,8 +117,7 @@
pass
def _get_kafka_producer(self):
- # PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written
- # to a local log before sending response
+
try:
if self.kafka_endpoint.startswith('@'):
@@ -132,12 +134,10 @@
return
else:
_k_endpoint = self.kafka_endpoint
-
- self.kclient = _KafkaClient(_k_endpoint)
- self.kproducer = _kafkaProducer(self.kclient,
- req_acks=PRODUCER_ACK_LOCAL_WRITE,
- ack_timeout=self.ack_timeout,
- max_req_attempts=self.max_req_attempts)
+ self.kproducer = _kafkaProducer(
+ {'bootstrap.servers' :_k_endpoint}
+ )
+ pass
except Exception, e:
log.exception('failed-get-kafka-producer', e=e)
return
@@ -163,9 +163,9 @@
log.debug('sending-kafka-msg', topic=topic, msg=msg)
msgs = [msg]
- if self.kproducer and self.kclient and \
- self.event_bus_publisher and self.faulty is False:
- yield self.kproducer.send_messages(topic, msgs=msgs)
+ if self.kproducer is not None and self.event_bus_publisher and self.faulty is False:
+ d = deferToThread(self.kproducer.produce, topic, msg)
+ yield d
log.debug('sent-kafka-msg', topic=topic, msg=msg)
else:
return