This commit changes how messages are consumed over Kafka, mainly
which initial offset is used when subscribing to a topic.
Change-Id: I6104ef710d9c595034cd4cedc0d58ae774cec719
diff --git a/python/adapters/kafka/adapter_request_facade.py b/python/adapters/kafka/adapter_request_facade.py
index 978f57d..cf7afd8 100644
--- a/python/adapters/kafka/adapter_request_facade.py
+++ b/python/adapters/kafka/adapter_request_facade.py
@@ -18,11 +18,11 @@
This facade handles kafka-formatted messages from the Core, extracts the kafka
formatting and forwards the request to the concrete handler.
"""
-
+import structlog
from twisted.internet.defer import inlineCallbacks
from zope.interface import implementer
from twisted.internet import reactor
-
+from afkak.consumer import OFFSET_LATEST, OFFSET_EARLIEST
from python.adapters.interface import IAdapterInterface
from python.protos.core_adapter_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
from python.protos.device_pb2 import Device
@@ -31,6 +31,7 @@
from python.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
get_messaging_proxy
+log = structlog.get_logger()
class MacAddressError(BaseException):
def __init__(self, error):
@@ -59,31 +60,34 @@
@inlineCallbacks
def start(self):
- self.log.debug('starting')
+ log.debug('starting')
@inlineCallbacks
def stop(self):
- self.log.debug('stopping')
+ log.debug('stopping')
-
+ @inlineCallbacks
def createKafkaDeviceTopic(self, deviceId):
+ log.debug("subscribing-to-topic", device_id=deviceId)
kafka_proxy = get_messaging_proxy()
device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
- kafka_proxy.subscribe(topic=device_topic, target_cls=self)
+ # yield kafka_proxy.create_topic(topic=device_topic)
+ yield kafka_proxy.subscribe(topic=device_topic, target_cls=self, offset=OFFSET_EARLIEST)
+ log.debug("subscribed-to-topic", topic=device_topic)
def adopt_device(self, device):
d = Device()
if device:
device.Unpack(d)
+ # Start the creation of a device specific topic to handle all
+ # subsequent requests from the Core. This adapter instance will
+ # handle all requests for that device.
+ reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
+
result = self.adapter.adopt_device(d)
# return True, self.adapter.adopt_device(d)
- # Before we return, create a device specific topic to handle all
- # subsequent requests from the Core. This adapter instance will
- # handle all requests for that device
- reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
-
return True, result
else:
return False, Error(code=ErrorCode.INVALID_PARAMETERS,