This commit changes how messages are consumed over Kafka,
mostly around which initial offset to use.
Change-Id: I6104ef710d9c595034cd4cedc0d58ae774cec719
diff --git a/python/adapters/kafka/kafka_inter_container_library.py b/python/adapters/kafka/kafka_inter_container_library.py
index aaa0c3c..a3de4f1 100644
--- a/python/adapters/kafka/kafka_inter_container_library.py
+++ b/python/adapters/kafka/kafka_inter_container_library.py
@@ -138,6 +138,11 @@
except Exception as e:
log.exception("Exception-when-stopping-messaging-proxy:", e=e)
+
+ @inlineCallbacks
+ def create_topic(self, topic):
+ yield self._wait_until_topic_is_ready(self.kafka_client, topic)
+
@inlineCallbacks
def _wait_until_topic_is_ready(self, client, topic):
e = True
@@ -159,8 +164,9 @@
return self.default_topic
@inlineCallbacks
- def _subscribe(self, topic, callback=None, target_cls=None):
+ def _subscribe(self, topic, offset, callback=None, target_cls=None):
try:
+ log.debug("subscribing-to-topic-start", topic=topic)
yield self._wait_until_topic_is_ready(self.kafka_client, topic)
partitions = self.kafka_client.topic_partitions[topic]
consumers = []
@@ -168,6 +174,7 @@
# First setup the generic callback - all received messages will
# go through that queue
if topic not in self.topic_consumer_map:
+ log.debug("topic-not-in-consumer-map", topic=topic)
consumers = [Consumer(self.kafka_client, topic, partition,
self._enqueue_received_message)
for partition in partitions]
@@ -206,15 +213,18 @@
log.warn("Consumers-failed", failure=failure)
for c in consumers:
- c.start(OFFSET_LATEST).addCallbacks(cb_closed, eb_failed)
+ c.start(offset).addCallbacks(cb_closed, eb_failed)
+
+ log.debug("subscribed-to-topic", topic=topic)
returnValue(True)
except Exception as e:
log.exception("Exception-during-subscription", e=e)
returnValue(False)
+ @inlineCallbacks
def subscribe(self, topic, callback=None, target_cls=None,
- max_retry=3):
+ max_retry=3, offset=OFFSET_LATEST):
"""
Scenario 1: invoked to subscribe to a specific topic with a
target_cls to invoke when a message is received on that topic. This
@@ -246,14 +256,26 @@
return asleep(wait_time)
retry = 0
- while not self._subscribe(topic, callback=callback,
- target_cls=target_cls):
- if retry > max_retry:
- return False
+ subscribed = False
+ while not subscribed:
+ subscribed = yield self._subscribe(topic, callback=callback,
+ target_cls=target_cls, offset=offset)
+ if subscribed:
+ returnValue(True)
+ elif retry > max_retry:
+ returnValue(False)
else:
_backoff("subscription-not-complete", retry)
retry += 1
- return True
+
+ # while not self._subscribe(topic, callback=callback,
+ # target_cls=target_cls):
+ # if retry > max_retry:
+ # return False
+ # else:
+ # _backoff("subscription-not-complete", retry)
+ # retry += 1
+ # return True
def unsubscribe(self, topic):
"""