This commit modifies the way messages are consumed over Kafka,
mostly around which initial offset a consumer starts from.

Change-Id: I6104ef710d9c595034cd4cedc0d58ae774cec719
diff --git a/python/adapters/kafka/kafka_inter_container_library.py b/python/adapters/kafka/kafka_inter_container_library.py
index aaa0c3c..a3de4f1 100644
--- a/python/adapters/kafka/kafka_inter_container_library.py
+++ b/python/adapters/kafka/kafka_inter_container_library.py
@@ -138,6 +138,11 @@
         except Exception as e:
             log.exception("Exception-when-stopping-messaging-proxy:", e=e)
 
+
+    @inlineCallbacks
+    def create_topic(self, topic):
+        yield self._wait_until_topic_is_ready(self.kafka_client, topic)
+
     @inlineCallbacks
     def _wait_until_topic_is_ready(self, client, topic):
         e = True
@@ -159,8 +164,9 @@
         return self.default_topic
 
     @inlineCallbacks
-    def _subscribe(self, topic, callback=None, target_cls=None):
+    def _subscribe(self, topic, offset, callback=None, target_cls=None):
         try:
+            log.debug("subscribing-to-topic-start", topic=topic)
             yield self._wait_until_topic_is_ready(self.kafka_client, topic)
             partitions = self.kafka_client.topic_partitions[topic]
             consumers = []
@@ -168,6 +174,7 @@
             # First setup the generic callback - all received messages will
             # go through that queue
             if topic not in self.topic_consumer_map:
+                log.debug("topic-not-in-consumer-map", topic=topic)
                 consumers = [Consumer(self.kafka_client, topic, partition,
                                       self._enqueue_received_message)
                              for partition in partitions]
@@ -206,15 +213,18 @@
                 log.warn("Consumers-failed", failure=failure)
 
             for c in consumers:
-                c.start(OFFSET_LATEST).addCallbacks(cb_closed, eb_failed)
+                c.start(offset).addCallbacks(cb_closed, eb_failed)
+
+            log.debug("subscribed-to-topic", topic=topic)
 
             returnValue(True)
         except Exception as e:
             log.exception("Exception-during-subscription", e=e)
             returnValue(False)
 
+    @inlineCallbacks
     def subscribe(self, topic, callback=None, target_cls=None,
-                  max_retry=3):
+                  max_retry=3, offset=OFFSET_LATEST):
         """
         Scenario 1:  invoked to subscribe to a specific topic with a
         target_cls to invoke when a message is received on that topic.  This
@@ -246,14 +256,26 @@
             return asleep(wait_time)
 
         retry = 0
-        while not self._subscribe(topic, callback=callback,
-                                  target_cls=target_cls):
-            if retry > max_retry:
-                return False
+        subscribed = False
+        while not subscribed:
+            subscribed = yield self._subscribe(topic, callback=callback,
+                                               target_cls=target_cls, offset=offset)
+            if subscribed:
+                returnValue(True)
+            elif retry > max_retry:
+                returnValue(False)
             else:
                 _backoff("subscription-not-complete", retry)
                 retry += 1
-        return True
+
+        # while not self._subscribe(topic, callback=callback,
+        #                           target_cls=target_cls):
+        #     if retry > max_retry:
+        #         return False
+        #     else:
+        #         _backoff("subscription-not-complete", retry)
+        #         retry += 1
+        # return True
 
     def unsubscribe(self, topic):
         """