This commit changes how messages are consumed over Kafka, mainly
which initial offset is used when subscribing to a topic.

Change-Id: I6104ef710d9c595034cd4cedc0d58ae774cec719
diff --git a/python/adapters/kafka/adapter_request_facade.py b/python/adapters/kafka/adapter_request_facade.py
index 978f57d..cf7afd8 100644
--- a/python/adapters/kafka/adapter_request_facade.py
+++ b/python/adapters/kafka/adapter_request_facade.py
@@ -18,11 +18,11 @@
 This facade handles kafka-formatted messages from the Core, extracts the kafka
 formatting and forwards the request to the concrete handler.
 """
-
+import structlog
 from twisted.internet.defer import inlineCallbacks
 from zope.interface import implementer
 from twisted.internet import reactor
-
+from afkak.consumer import OFFSET_LATEST, OFFSET_EARLIEST
 from python.adapters.interface import IAdapterInterface
 from python.protos.core_adapter_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
 from python.protos.device_pb2 import Device
@@ -31,6 +31,7 @@
 from python.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
     get_messaging_proxy
 
+log = structlog.get_logger()
 
 class MacAddressError(BaseException):
     def __init__(self, error):
@@ -59,31 +60,34 @@
 
     @inlineCallbacks
     def start(self):
-        self.log.debug('starting')
+        log.debug('starting')
 
     @inlineCallbacks
     def stop(self):
-        self.log.debug('stopping')
+        log.debug('stopping')
 
-
+    @inlineCallbacks
     def createKafkaDeviceTopic(self, deviceId):
+        log.debug("subscribing-to-topic", device_id=deviceId)
         kafka_proxy = get_messaging_proxy()
         device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
-        kafka_proxy.subscribe(topic=device_topic, target_cls=self)
+        # yield kafka_proxy.create_topic(topic=device_topic)
+        yield kafka_proxy.subscribe(topic=device_topic, target_cls=self, offset=OFFSET_EARLIEST)
+        log.debug("subscribed-to-topic", topic=device_topic)
 
     def adopt_device(self, device):
         d = Device()
         if device:
             device.Unpack(d)
 
+            # Start the creation of a device specific topic to handle all
+            # subsequent requests from the Core. This adapter instance will
+            # handle all requests for that device.
+            reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
+
             result = self.adapter.adopt_device(d)
             # return True, self.adapter.adopt_device(d)
 
-            # Before we return, create a device specific topic to handle all
-            # subsequent requests from the Core. This adapter instance will
-            # handle all requests for that device
-            reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
-
             return True, result
         else:
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,