This update consists of the following changes:
1) Add GroupConsumer to the Go sarama_client and modify the Core
code to use a groupConsumer instead of a partition consumer. This
change will ensure that multiple consumers (with different group IDs)
can consume kafka messages from the same topic.
2) Remove the afkak kafka client and replace it with Confluent Kafka,
a change already done in VOLTHA 1.x. Modify the code accordingly.
3) Add a Group Consumer to the Python kafka client such that
several instances of an Adapter can consume the same messages from
the same topic.
4) Set the datapath_id for the logical device in the Core.
Change-Id: I5d7ced27c9aeca4f6211baa3dc8cb3db861545e4
diff --git a/python/adapters/kafka/adapter_request_facade.py b/python/adapters/kafka/adapter_request_facade.py
index 0efb811..de8ae0b 100644
--- a/python/adapters/kafka/adapter_request_facade.py
+++ b/python/adapters/kafka/adapter_request_facade.py
@@ -22,6 +22,7 @@
from twisted.internet.defer import inlineCallbacks
from zope.interface import implementer
from twisted.internet import reactor
+
from afkak.consumer import OFFSET_LATEST, OFFSET_EARLIEST
from python.adapters.interface import IAdapterInterface
from python.protos.inter_container_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
@@ -29,7 +30,7 @@
from python.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
FlowGroupChanges, ofp_packet_out
from python.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
- get_messaging_proxy
+ get_messaging_proxy, KAFKA_OFFSET_LATEST, KAFKA_OFFSET_EARLIEST
log = structlog.get_logger()
@@ -72,7 +73,7 @@
kafka_proxy = get_messaging_proxy()
device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
# yield kafka_proxy.create_topic(topic=device_topic)
- yield kafka_proxy.subscribe(topic=device_topic, target_cls=self, offset=OFFSET_EARLIEST)
+ yield kafka_proxy.subscribe(topic=device_topic, group_id=device_topic, target_cls=self, offset=KAFKA_OFFSET_EARLIEST)
log.debug("subscribed-to-topic", topic=device_topic)
def adopt_device(self, device):