[SEBA-265]
Poll for Kafka callback, flush at exit
Change-Id: I56b41f86c3df0463590682f6e6230de880fe6a4f
diff --git a/lib/xos-kafka/xoskafka/xoskafkaproducer.py b/lib/xos-kafka/xoskafka/xoskafkaproducer.py
index 6781311..b4134d5 100644
--- a/lib/xos-kafka/xoskafka/xoskafkaproducer.py
+++ b/lib/xos-kafka/xoskafka/xoskafkaproducer.py
@@ -64,9 +64,16 @@
callback=cls._kafka_delivery_callback
)
+ # see https://github.com/confluentinc/confluent-kafka-python/issues/16
+ kafka_producer.poll(0)
+
except confluent_kafka.KafkaError, err:
log.exception("Kafka Error", err)
+ def __del__(self):
+ if kafka_producer is not None:
+ kafka_producer.flush()
+
@staticmethod
def _kafka_delivery_callback(err, msg):
if err: