This commit made some modifications in the way messages over
kafka are consumed, mostly around the initial offset to use.

Change-Id: I6104ef710d9c595034cd4cedc0d58ae774cec719
diff --git a/python/adapters/kafka/adapter_request_facade.py b/python/adapters/kafka/adapter_request_facade.py
index 978f57d..cf7afd8 100644
--- a/python/adapters/kafka/adapter_request_facade.py
+++ b/python/adapters/kafka/adapter_request_facade.py
@@ -18,11 +18,11 @@
 This facade handles kafka-formatted messages from the Core, extracts the kafka
 formatting and forwards the request to the concrete handler.
 """
-
+import structlog
 from twisted.internet.defer import inlineCallbacks
 from zope.interface import implementer
 from twisted.internet import reactor
-
+from afkak.consumer import OFFSET_LATEST, OFFSET_EARLIEST
 from python.adapters.interface import IAdapterInterface
 from python.protos.core_adapter_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
 from python.protos.device_pb2 import Device
@@ -31,6 +31,7 @@
 from python.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
     get_messaging_proxy
 
+log = structlog.get_logger()
 
 class MacAddressError(BaseException):
     def __init__(self, error):
@@ -59,31 +60,34 @@
 
     @inlineCallbacks
     def start(self):
-        self.log.debug('starting')
+        log.debug('starting')
 
     @inlineCallbacks
     def stop(self):
-        self.log.debug('stopping')
+        log.debug('stopping')
 
-
+    @inlineCallbacks
     def createKafkaDeviceTopic(self, deviceId):
+        log.debug("subscribing-to-topic", device_id=deviceId)
         kafka_proxy = get_messaging_proxy()
         device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
-        kafka_proxy.subscribe(topic=device_topic, target_cls=self)
+        # yield kafka_proxy.create_topic(topic=device_topic)
+        yield kafka_proxy.subscribe(topic=device_topic, target_cls=self, offset=OFFSET_EARLIEST)
+        log.debug("subscribed-to-topic", topic=device_topic)
 
     def adopt_device(self, device):
         d = Device()
         if device:
             device.Unpack(d)
 
+            # Start the creation of a device specific topic to handle all
+            # subsequent requests from the Core. This adapter instance will
+            # handle all requests for that device.
+            reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
+
             result = self.adapter.adopt_device(d)
             # return True, self.adapter.adopt_device(d)
 
-            # Before we return, create a device specific topic to handle all
-            # subsequent requests from the Core. This adapter instance will
-            # handle all requests for that device
-            reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
-
             return True, result
         else:
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
diff --git a/python/adapters/kafka/kafka_inter_container_library.py b/python/adapters/kafka/kafka_inter_container_library.py
index aaa0c3c..a3de4f1 100644
--- a/python/adapters/kafka/kafka_inter_container_library.py
+++ b/python/adapters/kafka/kafka_inter_container_library.py
@@ -138,6 +138,11 @@
         except Exception as e:
             log.exception("Exception-when-stopping-messaging-proxy:", e=e)
 
+
+    @inlineCallbacks
+    def create_topic(self, topic):
+        yield self._wait_until_topic_is_ready(self.kafka_client, topic)
+
     @inlineCallbacks
     def _wait_until_topic_is_ready(self, client, topic):
         e = True
@@ -159,8 +164,9 @@
         return self.default_topic
 
     @inlineCallbacks
-    def _subscribe(self, topic, callback=None, target_cls=None):
+    def _subscribe(self, topic, offset, callback=None, target_cls=None):
         try:
+            log.debug("subscribing-to-topic-start", topic=topic)
             yield self._wait_until_topic_is_ready(self.kafka_client, topic)
             partitions = self.kafka_client.topic_partitions[topic]
             consumers = []
@@ -168,6 +174,7 @@
             # First setup the generic callback - all received messages will
             # go through that queue
             if topic not in self.topic_consumer_map:
+                log.debug("topic-not-in-consumer-map", topic=topic)
                 consumers = [Consumer(self.kafka_client, topic, partition,
                                       self._enqueue_received_message)
                              for partition in partitions]
@@ -206,15 +213,18 @@
                 log.warn("Consumers-failed", failure=failure)
 
             for c in consumers:
-                c.start(OFFSET_LATEST).addCallbacks(cb_closed, eb_failed)
+                c.start(offset).addCallbacks(cb_closed, eb_failed)
+
+            log.debug("subscribed-to-topic", topic=topic)
 
             returnValue(True)
         except Exception as e:
             log.exception("Exception-during-subscription", e=e)
             returnValue(False)
 
+    @inlineCallbacks
     def subscribe(self, topic, callback=None, target_cls=None,
-                  max_retry=3):
+                  max_retry=3, offset=OFFSET_LATEST):
         """
         Scenario 1:  invoked to subscribe to a specific topic with a
         target_cls to invoke when a message is received on that topic.  This
@@ -246,14 +256,26 @@
             return asleep(wait_time)
 
         retry = 0
-        while not self._subscribe(topic, callback=callback,
-                                  target_cls=target_cls):
-            if retry > max_retry:
-                return False
+        subscribed = False
+        while not subscribed:
+            subscribed = yield self._subscribe(topic, callback=callback,
+                                               target_cls=target_cls, offset=offset)
+            if subscribed:
+                returnValue(True)
+            elif retry > max_retry:
+                returnValue(False)
             else:
                 _backoff("subscription-not-complete", retry)
                 retry += 1
-        return True
+
+        # while not self._subscribe(topic, callback=callback,
+        #                           target_cls=target_cls):
+        #     if retry > max_retry:
+        #         return False
+        #     else:
+        #         _backoff("subscription-not-complete", retry)
+        #         retry += 1
+        # return True
 
     def unsubscribe(self, topic):
         """
diff --git a/python/adapters/kafka/kafka_proxy.py b/python/adapters/kafka/kafka_proxy.py
index 6dcb10f..d596334 100644
--- a/python/adapters/kafka/kafka_proxy.py
+++ b/python/adapters/kafka/kafka_proxy.py
@@ -136,6 +136,25 @@
             log.exception('failed-get-kafka-producer', e=e)
             return
 
+
+    # @inlineCallbacks
+    # def wait_until_topic_is_ready(self, topic):
+    #     #  Assumes "auto.create.topics.enable" is set in the broker
+    #     #  configuration
+    #     e = True
+    #     while e:
+    #         yield self.kclient.load_metadata_for_topics(topic)
+    #         e = self.kclient.metadata_error_for_topic(topic)
+    #         if e:
+    #             log.debug("Topic-not-ready-retrying...", topic=topic)
+
+
+    @inlineCallbacks
+    def create_topic(self, topic):
+        # Assume auto create is enabled on the broker configuration
+        yield self.wait_until_topic_is_ready(topic)
+
+
     @inlineCallbacks
     def send_message(self, topic, msg):
         assert topic is not None