This update consists of the following changes:
    1) Add GroupConsumer to the Go sarama_client and modify the Core
    code to use a groupConsumer instead of a partition consumer. This
    change will ensure that multiple consumers (with different group Ids)
    can consume kafka messages from the same topic.
    2) Remove afkak kafka client and replace it with confluent kakfa,
    a change done in voltha 1.x. Modify the code accordingly.
    3) Add a Group Consumer to the Python kakfa client such that
    several instances of an Adapter can consume the same messages from
    the same topic.
    4) Set the datapath_id for the logical device in the Core.

Change-Id: I5d7ced27c9aeca4f6211baa3dc8cb3db861545e4
diff --git a/python/adapters/kafka/adapter_request_facade.py b/python/adapters/kafka/adapter_request_facade.py
index 0efb811..de8ae0b 100644
--- a/python/adapters/kafka/adapter_request_facade.py
+++ b/python/adapters/kafka/adapter_request_facade.py
@@ -22,6 +22,7 @@
 from twisted.internet.defer import inlineCallbacks
 from zope.interface import implementer
 from twisted.internet import reactor
+
 from afkak.consumer import OFFSET_LATEST, OFFSET_EARLIEST
 from python.adapters.interface import IAdapterInterface
 from python.protos.inter_container_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
@@ -29,7 +30,7 @@
 from python.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
     FlowGroupChanges, ofp_packet_out
 from python.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
-    get_messaging_proxy
+    get_messaging_proxy, KAFKA_OFFSET_LATEST, KAFKA_OFFSET_EARLIEST
 
 log = structlog.get_logger()
 
@@ -72,7 +73,7 @@
         kafka_proxy = get_messaging_proxy()
         device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
         # yield kafka_proxy.create_topic(topic=device_topic)
-        yield kafka_proxy.subscribe(topic=device_topic, target_cls=self, offset=OFFSET_EARLIEST)
+        yield kafka_proxy.subscribe(topic=device_topic, group_id=device_topic, target_cls=self, offset=KAFKA_OFFSET_EARLIEST)
         log.debug("subscribed-to-topic", topic=device_topic)
 
     def adopt_device(self, device):
diff --git a/python/adapters/kafka/kafka_inter_container_library.py b/python/adapters/kafka/kafka_inter_container_library.py
index fbb0834..5cad2e8 100644
--- a/python/adapters/kafka/kafka_inter_container_library.py
+++ b/python/adapters/kafka/kafka_inter_container_library.py
@@ -18,8 +18,6 @@
 from uuid import uuid4
 
 import structlog
-from afkak.client import KafkaClient
-from afkak.consumer import OFFSET_LATEST, Consumer
 from twisted.internet import reactor
 from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, \
     DeferredQueue, gatherResults
@@ -34,6 +32,9 @@
 
 log = structlog.get_logger()
 
+KAFKA_OFFSET_LATEST = 'latest'
+KAFKA_OFFSET_EARLIEST = 'earliest'
+
 
 class KafkaMessagingError(BaseException):
     def __init__(self, error):
@@ -48,6 +49,7 @@
                  kafka_host_port,
                  kv_store,
                  default_topic,
+                 group_id_prefix,
                  target_cls):
         """
         Initialize the kafka proxy.  This is a singleton (may change to
@@ -67,15 +69,15 @@
         self.kafka_host_port = kafka_host_port
         self.kv_store = kv_store
         self.default_topic = default_topic
+        self.default_group_id = "_".join((group_id_prefix, default_topic))
         self.target_cls = target_cls
         self.topic_target_cls_map = {}
-        self.topic_consumer_map = {}
         self.topic_callback_map = {}
         self.subscribers = {}
-        self.kafka_client = None
         self.kafka_proxy = None
         self.transaction_id_deferred_map = {}
         self.received_msg_queue = DeferredQueue()
+        self.stopped = False
 
         self.init_time = 0
         self.init_received_time = 0
@@ -91,11 +93,7 @@
 
     def start(self):
         try:
-            # Create the kafka client
-            # assert self.kafka_host is not None
-            # assert self.kafka_port is not None
-            # kafka_host_port = ":".join((self.kafka_host, self.kafka_port))
-            self.kafka_client = KafkaClient(self.kafka_host_port)
+            log.debug("KafkaProxy-starting")
 
             # Get the kafka proxy instance.  If it does not exist then
             # create it
@@ -110,12 +108,17 @@
             # Start the queue to handle incoming messages
             reactor.callLater(0, self._received_message_processing_loop)
 
-            # Start listening for incoming messages
-            reactor.callLater(0, self.subscribe, self.default_topic,
-                              target_cls=self.target_cls)
+            # Subscribe using the default topic and default group id.  Whenever
+            # a message is received on that topic then teh target_cls will be
+            # invoked.
+            reactor.callLater(0, self.subscribe,
+                              topic=self.default_topic,
+                              target_cls=self.target_cls,
+                              group_id=self.default_group_id)
 
             # Setup the singleton instance
             IKafkaMessagingProxy._kafka_messaging_instance = self
+            log.debug("KafkaProxy-started")
         except Exception as e:
             log.exception("Failed-to-start-proxy", e=e)
 
@@ -126,37 +129,14 @@
         """
         log.debug("Stopping-messaging-proxy ...")
         try:
-            # Stop all the consumers
-            deferred_list = []
-            for key, values in self.topic_consumer_map.iteritems():
-                deferred_list.extend([c.stop() for c in values])
-
-            if not deferred_list:
-                d = gatherResults(deferred_list)
-                d.addCallback(lambda result: self.kafka_client.close())
+            # Stop the kafka proxy.  This will stop all the consumers
+            # and producers
+            self.stopped = True
+            self.kafka_proxy.stop()
             log.debug("Messaging-proxy-stopped.")
         except Exception as e:
             log.exception("Exception-when-stopping-messaging-proxy:", e=e)
 
-
-    @inlineCallbacks
-    def create_topic(self, topic):
-        yield self._wait_until_topic_is_ready(self.kafka_client, topic)
-
-    @inlineCallbacks
-    def _wait_until_topic_is_ready(self, client, topic):
-        e = True
-        while e:
-            yield client.load_metadata_for_topics(topic)
-            e = client.metadata_error_for_topic(topic)
-            if e:
-                log.debug("Topic-not-ready-retrying...", topic=topic)
-
-    def _clear_backoff(self):
-        if self.retries:
-            log.info('reconnected-to-consul', after_retries=self.retries)
-            self.retries = 0
-
     def get_target_cls(self):
         return self.target_cls
 
@@ -164,24 +144,13 @@
         return self.default_topic
 
     @inlineCallbacks
-    def _subscribe(self, topic, offset, callback=None, target_cls=None):
+    def _subscribe_group_consumer(self, group_id, topic, offset, callback=None,
+                                  target_cls=None):
         try:
             log.debug("subscribing-to-topic-start", topic=topic)
-            yield self._wait_until_topic_is_ready(self.kafka_client, topic)
-            partitions = self.kafka_client.topic_partitions[topic]
-            consumers = []
-
-            # First setup the generic callback - all received messages will
-            # go through that queue
-            if topic not in self.topic_consumer_map:
-                log.debug("topic-not-in-consumer-map", topic=topic)
-                consumers = [Consumer(self.kafka_client, topic, partition,
-                                      self._enqueue_received_message)
-                             for partition in partitions]
-                self.topic_consumer_map[topic] = consumers
-
-            log.debug("_subscribe", topic=topic,
-                      consumermap=self.topic_consumer_map)
+            yield self.kafka_proxy.subscribe(topic,
+                                             self._enqueue_received_group_message,
+                                             group_id, offset)
 
             if target_cls is not None and callback is None:
                 # Scenario #1
@@ -198,25 +167,6 @@
             else:
                 log.warn("invalid-parameters")
 
-            def cb_closed(result):
-                """
-                Called when a consumer cleanly stops.
-                """
-                log.debug("Consumers-cleanly-stopped")
-
-            def eb_failed(failure):
-                """
-                Called when a consumer fails due to an uncaught exception in the
-                processing callback or a network error on shutdown. In this case we
-                simply log the error.
-                """
-                log.warn("Consumers-failed", failure=failure)
-
-            for c in consumers:
-                c.start(offset).addCallbacks(cb_closed, eb_failed)
-
-            log.debug("subscribed-to-topic", topic=topic)
-
             returnValue(True)
         except Exception as e:
             log.exception("Exception-during-subscription", e=e)
@@ -224,7 +174,7 @@
 
     @inlineCallbacks
     def subscribe(self, topic, callback=None, target_cls=None,
-                  max_retry=3, offset=OFFSET_LATEST):
+                  max_retry=3, group_id=None, offset=KAFKA_OFFSET_LATEST):
         """
         Scenario 1:  invoked to subscribe to a specific topic with a
         target_cls to invoke when a message is received on that topic.  This
@@ -245,6 +195,8 @@
         :param max_retry:  the number of retries before reporting failure
         to subscribe.  This caters for scenario where the kafka topic is not
         ready.
+        :param group_id:  The ID of the group the consumer is subscribing to
+        :param offset: The topic offset on the kafka bus from where message consumption will start
         :return: True on success, False on failure
         """
         RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
@@ -253,13 +205,20 @@
             wait_time = RETRY_BACKOFF[min(retries,
                                           len(RETRY_BACKOFF) - 1)]
             log.info(msg, retry_in=wait_time)
-            return asleep(wait_time)
+            return asleep.asleep(wait_time)
+
+        log.debug("subscribing", topic=topic, group_id=group_id,
+                  callback=callback, target=target_cls)
 
         retry = 0
         subscribed = False
+        if group_id is None:
+            group_id = self.default_group_id
         while not subscribed:
-            subscribed = yield self._subscribe(topic, callback=callback,
-                                               target_cls=target_cls, offset=offset)
+            subscribed = yield self._subscribe_group_consumer(group_id, topic,
+                                                              callback=callback,
+                                                              target_cls=target_cls,
+                                                              offset=offset)
             if subscribed:
                 returnValue(True)
             elif retry > max_retry:
@@ -268,51 +227,56 @@
                 _backoff("subscription-not-complete", retry)
                 retry += 1
 
-        # while not self._subscribe(topic, callback=callback,
-        #                           target_cls=target_cls):
-        #     if retry > max_retry:
-        #         return False
-        #     else:
-        #         _backoff("subscription-not-complete", retry)
-        #         retry += 1
-        # return True
-
-    def unsubscribe(self, topic):
+    def unsubscribe(self, topic, callback=None, target_cls=None):
         """
         Invoked when unsubscribing to a topic
-        :param topic: topic to unsubscibe from
+        :param topic: topic to unsubscribe from
+        :param callback:  the callback used when subscribing to the topic, if any
+        :param target_cls: the targert class used when subscribing to the topic, if any
         :return: None on success or Exception on failure
         """
         log.debug("Unsubscribing-to-topic", topic=topic)
 
-        def remove_topic(topic):
-            if topic in self.topic_consumer_map:
-                del self.topic_consumer_map[topic]
-
         try:
-            if topic in self.topic_consumer_map:
-                consumers = self.topic_consumer_map[topic]
-                d = gatherResults([c.stop() for c in consumers])
-                d.addCallback(remove_topic, topic)
-                log.debug("Unsubscribed-to-topic.", topic=topic)
-            else:
-                log.debug("Topic-does-not-exist.", topic=topic)
+            self.kafka_proxy.unsubscribe(topic,
+                                         self._enqueue_received_group_message)
+
+            if callback is None and target_cls is None:
+                log.error("both-call-and-target-cls-cannot-be-none",
+                          topic=topic)
+                raise KafkaMessagingError(
+                    error="both-call-and-target-cls-cannot-be-none")
+
+            if target_cls is not None and topic in self.topic_target_cls_map:
+                del self.topic_target_cls_map[topic]
+
+            if callback is not None and topic in self.topic_callback_map:
+                index = 0
+                for cb in self.topic_callback_map[topic]:
+                    if cb == callback:
+                        break
+                    index += 1
+                if index < len(self.topic_callback_map[topic]):
+                    self.topic_callback_map[topic].pop(index)
+
+                if len(self.topic_callback_map[topic]) == 0:
+                    del self.topic_callback_map[topic]
         except Exception as e:
-            log.exception("Exception-when-stopping-messaging-proxy:", e=e)
+            log.exception("Exception-when-unsubscribing-to-topic", topic=topic,
+                          e=e)
+            return e
 
     @inlineCallbacks
-    def _enqueue_received_message(self, reactor, message_list):
+    def _enqueue_received_group_message(self, msg):
         """
         Internal method to continuously queue all received messaged
         irrespective of topic
-        :param reactor: A requirement by the Twisted Python kafka library
-        :param message_list: Received list of messages
+        :param msg: Received message
         :return: None on success, Exception on failure
         """
         try:
-            for m in message_list:
-                log.debug("received-msg", msg=m)
-                yield self.received_msg_queue.put(m)
+            log.debug("received-msg", msg=msg)
+            yield self.received_msg_queue.put(msg)
         except Exception as e:
             log.exception("Failed-enqueueing-received-message", e=e)
 
@@ -327,6 +291,8 @@
             try:
                 message = yield self.received_msg_queue.get()
                 yield self._process_message(message)
+                if self.stopped:
+                    break
             except Exception as e:
                 log.exception("Failed-dequeueing-received-message", e=e)
 
@@ -457,16 +423,18 @@
         current_time = int(round(time.time() * 1000))
         # log.debug("Got Message", message=m)
         try:
-            val = m.message.value
+            val = m.value()
+            # val = m.message.value
             # print m.topic
 
             # Go over customized callbacks first
-            if m.topic in self.topic_callback_map:
-                for c in self.topic_callback_map[m.topic]:
+            m_topic = m.topic()
+            if m_topic in self.topic_callback_map:
+                for c in self.topic_callback_map[m_topic]:
                     yield c(val)
 
             #  Check whether we need to process request/response scenario
-            if m.topic not in self.topic_target_cls_map:
+            if m_topic not in self.topic_target_cls_map:
                 return
 
             # Process request/response scenario
@@ -506,7 +474,8 @@
                                 response.header.to_topic)
                             self._send_kafka_message(res_topic, response)
 
-                        log.debug("Response-sent", response=response.body, to_topic=res_topic)
+                        log.debug("Response-sent", response=response.body,
+                                  to_topic=res_topic)
             elif message.header.type == MessageType.Value("RESPONSE"):
                 trns_id = self._to_string(message.header.id)
                 if trns_id in self.transaction_id_deferred_map:
diff --git a/python/adapters/kafka/kafka_proxy.py b/python/adapters/kafka/kafka_proxy.py
index d596334..cefc590 100644
--- a/python/adapters/kafka/kafka_proxy.py
+++ b/python/adapters/kafka/kafka_proxy.py
@@ -14,18 +14,19 @@
 # limitations under the License.
 #
 
-from afkak.client import KafkaClient as _KafkaClient
-from afkak.common import (
-    PRODUCER_ACK_NOT_REQUIRED
-)
-from afkak.producer import Producer as _kafkaProducer
+
+from confluent_kafka import Producer as _kafkaProducer
 from structlog import get_logger
+from twisted.internet import reactor
 from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.threads import deferToThread
 from zope.interface import implementer
 
 from python.common.utils.consulhelpers import get_endpoint_from_consul
-from python.common.utils.registry import IComponent
 from event_bus_publisher import EventBusPublisher
+from python.common.utils.registry import IComponent
+from confluent_kafka import Consumer, KafkaError
+import threading
 
 log = get_logger()
 
@@ -33,7 +34,11 @@
 @implementer(IComponent)
 class KafkaProxy(object):
     """
-    This is a singleton proxy kafka class to hide the kafka client details.
+    This is a singleton proxy kafka class to hide the kafka client details. This
+    proxy uses confluent-kafka-python as the kafka client. Since that client is
+    not a Twisted client then requests to that client are wrapped with
+    twisted.internet.threads.deferToThread to avoid any potential blocking of
+    the Twisted loop.
     """
     _kafka_instance = None
 
@@ -42,6 +47,7 @@
                  kafka_endpoint='localhost:9092',
                  ack_timeout=1000,
                  max_req_attempts=10,
+                 consumer_poll_timeout=10,
                  config={}):
 
         # return an exception if the object already exist
@@ -59,6 +65,10 @@
         self.event_bus_publisher = None
         self.stopping = False
         self.faulty = False
+        self.consumer_poll_timeout = consumer_poll_timeout
+        self.topic_consumer_map = {}
+        self.topic_callbacks_map = {}
+        self.topic_any_map_lock = threading.Lock()
         log.debug('initialized', endpoint=kafka_endpoint)
 
     @inlineCallbacks
@@ -77,6 +87,7 @@
     def stop(self):
         try:
             log.debug('stopping-kafka-proxy')
+            self.stopping = True
             try:
                 if self.kclient:
                     yield self.kclient.close()
@@ -84,16 +95,38 @@
                     log.debug('stopped-kclient-kafka-proxy')
             except Exception, e:
                 log.exception('failed-stopped-kclient-kafka-proxy', e=e)
-                pass
 
             try:
                 if self.kproducer:
-                    yield self.kproducer.stop()
+                    yield self.kproducer.flush()
                     self.kproducer = None
                     log.debug('stopped-kproducer-kafka-proxy')
             except Exception, e:
                 log.exception('failed-stopped-kproducer-kafka-proxy', e=e)
-                pass
+
+            # Stop all consumers
+            try:
+                self.topic_any_map_lock.acquire()
+                log.debug('stopping-consumers-kafka-proxy')
+                for _, c in self.topic_consumer_map.iteritems():
+                    yield deferToThread(c.close)
+                self.topic_consumer_map.clear()
+                self.topic_callbacks_map.clear()
+                log.debug('stopped-consumers-kafka-proxy')
+            except Exception, e:
+                log.exception('failed-stopped-consumers-kafka-proxy', e=e)
+            finally:
+                self.topic_any_map_lock.release()
+                log.debug('stopping-consumers-kafka-proxy-released-lock')
+
+            # try:
+            #    if self.event_bus_publisher:
+            #        yield self.event_bus_publisher.stop()
+            #        self.event_bus_publisher = None
+            #        log.debug('stopped-event-bus-publisher-kafka-proxy')
+            # except Exception, e:
+            #    log.debug('failed-stopped-event-bus-publisher-kafka-proxy')
+            #    pass
 
             log.debug('stopped-kafka-proxy')
 
@@ -105,8 +138,7 @@
             pass
 
     def _get_kafka_producer(self):
-        # PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written
-        #  to a local log before sending response
+
         try:
 
             if self.kafka_endpoint.startswith('@'):
@@ -124,39 +156,125 @@
                     return
             else:
                 _k_endpoint = self.kafka_endpoint
-
-            self.kclient = _KafkaClient(_k_endpoint)
-            self.kproducer = _kafkaProducer(self.kclient,
-                                            req_acks=PRODUCER_ACK_NOT_REQUIRED,
-                                            # req_acks=PRODUCER_ACK_LOCAL_WRITE,
-                                            # ack_timeout=self.ack_timeout,
-                                            # max_req_attempts=self.max_req_attempts)
-                                            )
+            self.kproducer = _kafkaProducer(
+                {'bootstrap.servers': _k_endpoint,
+                 }
+            )
+            pass
         except Exception, e:
             log.exception('failed-get-kafka-producer', e=e)
             return
 
+    @inlineCallbacks
+    def _wait_for_messages(self, consumer, topic):
+        while True:
+            try:
+                msg = yield deferToThread(consumer.poll,
+                                          self.consumer_poll_timeout)
 
-    # @inlineCallbacks
-    # def wait_until_topic_is_ready(self, topic):
-    #     #  Assumes "auto.create.topics.enable" is set in the broker
-    #     #  configuration
-    #     e = True
-    #     while e:
-    #         yield self.kclient.load_metadata_for_topics(topic)
-    #         e = self.kclient.metadata_error_for_topic(topic)
-    #         if e:
-    #             log.debug("Topic-not-ready-retrying...", topic=topic)
+                if self.stopping:
+                    log.debug("stop-request-recieved", topic=topic)
+                    break
 
+                if msg is None:
+                    continue
+                if msg.error():
+                    # This typically is received when there are no more messages
+                    # to read from kafka. Ignore.
+                    continue
+
+                # Invoke callbacks
+                for cb in self.topic_callbacks_map[topic]:
+                    yield cb(msg)
+            except Exception as e:
+                log.debug("exception-receiving-msg", topic=topic, e=e)
 
     @inlineCallbacks
-    def create_topic(self, topic):
-        # Assume auto create is enabled on the broker configuration
-        yield self.wait_until_topic_is_ready(topic)
+    def subscribe(self, topic, callback, groupId, offset='latest'):
+        """
+        subscribe allows a caller to subscribe to a given kafka topic.  This API
+        always create a group consumer.
+        :param topic - the topic to subscribe to
+        :param callback - the callback to invoke whenever a message is received
+        on that topic
+        :param groupId - the groupId for this consumer.  In the current
+        implementation there is a one-to-one mapping between a topic and a
+        groupId.  In other words, once a groupId is used for a given topic then
+        we won't be able to create another groupId for the same topic.
+        :param offset:  the kafka offset from where the consumer will start
+        consuming messages
+        """
+        try:
+            self.topic_any_map_lock.acquire()
+            if topic in self.topic_consumer_map:
+                # Just add the callback
+                if topic in self.topic_callbacks_map:
+                    self.topic_callbacks_map[topic].append(callback)
+                else:
+                    self.topic_callbacks_map[topic] = [callback]
+                return
 
+            # Create consumer for that topic
+            c = Consumer({
+                'bootstrap.servers': self.kafka_endpoint,
+                'group.id': groupId,
+                'auto.offset.reset': offset
+            })
+            yield deferToThread(c.subscribe, [topic])
+            # c.subscribe([topic])
+            self.topic_consumer_map[topic] = c
+            self.topic_callbacks_map[topic] = [callback]
+            # Start the consumer
+            reactor.callLater(0, self._wait_for_messages, c, topic)
+        except Exception, e:
+            log.exception("topic-subscription-error", e=e)
+        finally:
+            self.topic_any_map_lock.release()
 
     @inlineCallbacks
-    def send_message(self, topic, msg):
+    def unsubscribe(self, topic, callback):
+        """
+        Unsubscribe to a given topic.  Since there they be multiple callers
+        consuming from the same topic then to ensure only the relevant caller
+        gets unsubscribe then the callback is used as a differentiator.   The
+        kafka consumer will be closed when there are no callbacks required.
+        :param topic: topic to unsubscribe
+        :param callback: callback the caller used when subscribing to the topic.
+        If multiple callers have subscribed to a topic using the same callback
+        then the first callback on the list will be removed.
+        :return:None
+        """
+        try:
+            self.topic_any_map_lock.acquire()
+            log.debug("unsubscribing-to-topic", topic=topic)
+            if topic in self.topic_callbacks_map:
+                index = 0
+                for cb in self.topic_callbacks_map[topic]:
+                    if cb == callback:
+                        break
+                    index += 1
+                if index < len(self.topic_callbacks_map[topic]):
+                    self.topic_callbacks_map[topic].pop(index)
+
+                if len(self.topic_callbacks_map[topic]) == 0:
+                    # Stop the consumer
+                    if topic in self.topic_consumer_map:
+                        yield deferToThread(
+                            self.topic_consumer_map[topic].close)
+                        del self.topic_consumer_map[topic]
+                    del self.topic_callbacks_map[topic]
+                    log.debug("unsubscribed-to-topic", topic=topic)
+                else:
+                    log.debug("consumers-for-topic-still-exist", topic=topic,
+                              num=len(self.topic_callbacks_map[topic]))
+        except Exception, e:
+            log.exception("topic-unsubscription-error", e=e)
+        finally:
+            self.topic_any_map_lock.release()
+            log.debug("unsubscribing-to-topic-release-lock", topic=topic)
+
+    @inlineCallbacks
+    def send_message(self, topic, msg, key=None):
         assert topic is not None
         assert msg is not None
 
@@ -174,22 +292,23 @@
                                   endpoint=self.kafka_endpoint)
                         return
 
-                # log.debug('sending-kafka-msg', topic=topic, msg=msg)
+                log.debug('sending-kafka-msg', topic=topic, kafka_msg=msg)
                 msgs = [msg]
 
-                if self.kproducer and self.kclient and \
-                        self.event_bus_publisher and self.faulty is False:
-                    # log.debug('sending-kafka-msg-I-am-here0', time=int(round(time.time() * 1000)))
-
-                    yield self.kproducer.send_messages(topic, msgs=msgs)
-                    # self.kproducer.send_messages(topic, msgs=msgs)
-                    # log.debug('sent-kafka-msg', topic=topic, msg=msg)
+                if self.kproducer is not None and self.event_bus_publisher and self.faulty is False:
+                    d = deferToThread(self.kproducer.produce, topic, msg, key)
+                    yield d
+                    log.debug('sent-kafka-msg', topic=topic, kafka_msg=msg)
+                    # send a lightweight poll to avoid an exception after 100k messages.
+                    d1 = deferToThread(self.kproducer.poll, 0)
+                    yield d1
                 else:
                     return
 
         except Exception, e:
             self.faulty = True
-            log.error('failed-to-send-kafka-msg', topic=topic, msg=msg, e=e)
+            log.error('failed-to-send-kafka-msg', topic=topic, kafka_msg=msg,
+                      e=e)
 
             # set the kafka producer to None.  This is needed if the
             # kafka docker went down and comes back up with a different
diff --git a/python/adapters/ponsim_olt/main.py b/python/adapters/ponsim_olt/main.py
index 09b78fc..86b6be0 100755
--- a/python/adapters/ponsim_olt/main.py
+++ b/python/adapters/ponsim_olt/main.py
@@ -63,8 +63,8 @@
     core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
     interface=os.environ.get('INTERFACE', get_my_primary_interface()),
     instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
-    kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
-    kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
+    kafka_adapter=os.environ.get('KAFKA_ADAPTER', '172.20.10.3:9092'),
+    kafka_cluster=os.environ.get('KAFKA_CLUSTER', '172.20.10.3:9092'),
     backend=os.environ.get('BACKEND', 'none'),
     retry_interval=os.environ.get('RETRY_INTERVAL', 2),
     heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
@@ -380,7 +380,7 @@
                     # TODO: Add KV Store object reference
                     kv_store=self.args.backend,
                     default_topic=self.args.name,
-                    # Needs to assign a real class
+                    group_id_prefix=self.args.instance_id,
                     target_cls=ponsim_request_handler
                 )
             ).start()
diff --git a/python/adapters/ponsim_onu/main.py b/python/adapters/ponsim_onu/main.py
index d6418e9..c4c12db 100755
--- a/python/adapters/ponsim_onu/main.py
+++ b/python/adapters/ponsim_onu/main.py
@@ -379,6 +379,7 @@
                     # TODO: Add KV Store object reference
                     kv_store=self.args.backend,
                     default_topic=self.args.name,
+                    group_id_prefix=self.args.instance_id,
                     target_cls=ponsim_request_handler
                 )
             ).start()