This update consists of the following changes:
    1) Add GroupConsumer to the Go sarama_client and modify the Core
    code to use a groupConsumer instead of a partition consumer. This
    change will ensure that multiple consumers (with different group Ids)
    can consume kafka messages from the same topic.
    2) Remove afkak kafka client and replace it with confluent kafka,
    a change done in voltha 1.x. Modify the code accordingly.
    3) Add a Group Consumer to the Python kafka client such that
    several instances of an Adapter can consume the same messages from
    the same topic.
    4) Set the datapath_id for the logical device in the Core.

Change-Id: I5d7ced27c9aeca4f6211baa3dc8cb3db861545e4
diff --git a/python/adapters/kafka/kafka_proxy.py b/python/adapters/kafka/kafka_proxy.py
index d596334..cefc590 100644
--- a/python/adapters/kafka/kafka_proxy.py
+++ b/python/adapters/kafka/kafka_proxy.py
@@ -14,18 +14,19 @@
 # limitations under the License.
 #
 
-from afkak.client import KafkaClient as _KafkaClient
-from afkak.common import (
-    PRODUCER_ACK_NOT_REQUIRED
-)
-from afkak.producer import Producer as _kafkaProducer
+
+from confluent_kafka import Producer as _kafkaProducer
 from structlog import get_logger
+from twisted.internet import reactor
 from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.threads import deferToThread
 from zope.interface import implementer
 
 from python.common.utils.consulhelpers import get_endpoint_from_consul
-from python.common.utils.registry import IComponent
 from event_bus_publisher import EventBusPublisher
+from python.common.utils.registry import IComponent
+from confluent_kafka import Consumer, KafkaError
+import threading
 
 log = get_logger()
 
@@ -33,7 +34,11 @@
 @implementer(IComponent)
 class KafkaProxy(object):
     """
-    This is a singleton proxy kafka class to hide the kafka client details.
+    This is a singleton proxy kafka class to hide the kafka client details. This
+    proxy uses confluent-kafka-python as the kafka client. Since that client is
+    not a Twisted client then requests to that client are wrapped with
+    twisted.internet.threads.deferToThread to avoid any potential blocking of
+    the Twisted loop.
     """
     _kafka_instance = None
 
@@ -42,6 +47,7 @@
                  kafka_endpoint='localhost:9092',
                  ack_timeout=1000,
                  max_req_attempts=10,
+                 consumer_poll_timeout=10,
                  config={}):
 
         # return an exception if the object already exist
@@ -59,6 +65,10 @@
         self.event_bus_publisher = None
         self.stopping = False
         self.faulty = False
+        self.consumer_poll_timeout = consumer_poll_timeout
+        self.topic_consumer_map = {}
+        self.topic_callbacks_map = {}
+        self.topic_any_map_lock = threading.Lock()
         log.debug('initialized', endpoint=kafka_endpoint)
 
     @inlineCallbacks
@@ -77,6 +87,7 @@
     def stop(self):
         try:
             log.debug('stopping-kafka-proxy')
+            self.stopping = True
             try:
                 if self.kclient:
                     yield self.kclient.close()
@@ -84,16 +95,38 @@
                     log.debug('stopped-kclient-kafka-proxy')
             except Exception, e:
                 log.exception('failed-stopped-kclient-kafka-proxy', e=e)
-                pass
 
             try:
                 if self.kproducer:
-                    yield self.kproducer.stop()
+                    yield self.kproducer.flush()
                     self.kproducer = None
                     log.debug('stopped-kproducer-kafka-proxy')
             except Exception, e:
                 log.exception('failed-stopped-kproducer-kafka-proxy', e=e)
-                pass
+
+            # Stop all consumers
+            try:
+                self.topic_any_map_lock.acquire()
+                log.debug('stopping-consumers-kafka-proxy')
+                for _, c in self.topic_consumer_map.iteritems():
+                    yield deferToThread(c.close)
+                self.topic_consumer_map.clear()
+                self.topic_callbacks_map.clear()
+                log.debug('stopped-consumers-kafka-proxy')
+            except Exception, e:
+                log.exception('failed-stopped-consumers-kafka-proxy', e=e)
+            finally:
+                self.topic_any_map_lock.release()
+                log.debug('stopping-consumers-kafka-proxy-released-lock')
+
+            # try:
+            #    if self.event_bus_publisher:
+            #        yield self.event_bus_publisher.stop()
+            #        self.event_bus_publisher = None
+            #        log.debug('stopped-event-bus-publisher-kafka-proxy')
+            # except Exception, e:
+            #    log.debug('failed-stopped-event-bus-publisher-kafka-proxy')
+            #    pass
 
             log.debug('stopped-kafka-proxy')
 
@@ -105,8 +138,7 @@
             pass
 
     def _get_kafka_producer(self):
-        # PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written
-        #  to a local log before sending response
+
         try:
 
             if self.kafka_endpoint.startswith('@'):
@@ -124,39 +156,125 @@
                     return
             else:
                 _k_endpoint = self.kafka_endpoint
-
-            self.kclient = _KafkaClient(_k_endpoint)
-            self.kproducer = _kafkaProducer(self.kclient,
-                                            req_acks=PRODUCER_ACK_NOT_REQUIRED,
-                                            # req_acks=PRODUCER_ACK_LOCAL_WRITE,
-                                            # ack_timeout=self.ack_timeout,
-                                            # max_req_attempts=self.max_req_attempts)
-                                            )
+            self.kproducer = _kafkaProducer(
+                {'bootstrap.servers': _k_endpoint,
+                 }
+            )
+            pass
         except Exception, e:
             log.exception('failed-get-kafka-producer', e=e)
             return
 
+    @inlineCallbacks
+    def _wait_for_messages(self, consumer, topic):
+        while True:
+            try:
+                msg = yield deferToThread(consumer.poll,
+                                          self.consumer_poll_timeout)
 
-    # @inlineCallbacks
-    # def wait_until_topic_is_ready(self, topic):
-    #     #  Assumes "auto.create.topics.enable" is set in the broker
-    #     #  configuration
-    #     e = True
-    #     while e:
-    #         yield self.kclient.load_metadata_for_topics(topic)
-    #         e = self.kclient.metadata_error_for_topic(topic)
-    #         if e:
-    #             log.debug("Topic-not-ready-retrying...", topic=topic)
+                if self.stopping:
+                    log.debug("stop-request-recieved", topic=topic)
+                    break
 
+                if msg is None:
+                    continue
+                if msg.error():
+                    # This typically is received when there are no more messages
+                    # to read from kafka. Ignore.
+                    continue
+
+                # Invoke callbacks
+                for cb in self.topic_callbacks_map[topic]:
+                    yield cb(msg)
+            except Exception as e:
+                log.debug("exception-receiving-msg", topic=topic, e=e)
 
     @inlineCallbacks
-    def create_topic(self, topic):
-        # Assume auto create is enabled on the broker configuration
-        yield self.wait_until_topic_is_ready(topic)
+    def subscribe(self, topic, callback, groupId, offset='latest'):
+        """
+        subscribe allows a caller to subscribe to a given kafka topic.  This API
+        always creates a group consumer.
+        :param topic - the topic to subscribe to
+        :param callback - the callback to invoke whenever a message is received
+        on that topic
+        :param groupId - the groupId for this consumer.  In the current
+        implementation there is a one-to-one mapping between a topic and a
+        groupId.  In other words, once a groupId is used for a given topic then
+        we won't be able to create another groupId for the same topic.
+        :param offset:  the kafka offset from where the consumer will start
+        consuming messages
+        """
+        try:
+            self.topic_any_map_lock.acquire()
+            if topic in self.topic_consumer_map:
+                # Just add the callback
+                if topic in self.topic_callbacks_map:
+                    self.topic_callbacks_map[topic].append(callback)
+                else:
+                    self.topic_callbacks_map[topic] = [callback]
+                return
 
+            # Create consumer for that topic
+            c = Consumer({
+                'bootstrap.servers': self.kafka_endpoint,
+                'group.id': groupId,
+                'auto.offset.reset': offset
+            })
+            yield deferToThread(c.subscribe, [topic])
+            # c.subscribe([topic])
+            self.topic_consumer_map[topic] = c
+            self.topic_callbacks_map[topic] = [callback]
+            # Start the consumer
+            reactor.callLater(0, self._wait_for_messages, c, topic)
+        except Exception, e:
+            log.exception("topic-subscription-error", e=e)
+        finally:
+            self.topic_any_map_lock.release()
 
     @inlineCallbacks
-    def send_message(self, topic, msg):
+    def unsubscribe(self, topic, callback):
+        """
+        Unsubscribe from a given topic.  Since there may be multiple callers
+        consuming from the same topic, the callback is used as a
+        differentiator to ensure only the relevant caller gets unsubscribed.
+        The kafka consumer will be closed when no callbacks remain.
+        :param topic: topic to unsubscribe
+        :param callback: callback the caller used when subscribing to the topic.
+        If multiple callers have subscribed to a topic using the same callback
+        then the first callback on the list will be removed.
+        :return:None
+        """
+        try:
+            self.topic_any_map_lock.acquire()
+            log.debug("unsubscribing-to-topic", topic=topic)
+            if topic in self.topic_callbacks_map:
+                index = 0
+                for cb in self.topic_callbacks_map[topic]:
+                    if cb == callback:
+                        break
+                    index += 1
+                if index < len(self.topic_callbacks_map[topic]):
+                    self.topic_callbacks_map[topic].pop(index)
+
+                if len(self.topic_callbacks_map[topic]) == 0:
+                    # Stop the consumer
+                    if topic in self.topic_consumer_map:
+                        yield deferToThread(
+                            self.topic_consumer_map[topic].close)
+                        del self.topic_consumer_map[topic]
+                    del self.topic_callbacks_map[topic]
+                    log.debug("unsubscribed-to-topic", topic=topic)
+                else:
+                    log.debug("consumers-for-topic-still-exist", topic=topic,
+                              num=len(self.topic_callbacks_map[topic]))
+        except Exception, e:
+            log.exception("topic-unsubscription-error", e=e)
+        finally:
+            self.topic_any_map_lock.release()
+            log.debug("unsubscribing-to-topic-release-lock", topic=topic)
+
+    @inlineCallbacks
+    def send_message(self, topic, msg, key=None):
         assert topic is not None
         assert msg is not None
 
@@ -174,22 +292,23 @@
                                   endpoint=self.kafka_endpoint)
                         return
 
-                # log.debug('sending-kafka-msg', topic=topic, msg=msg)
+                log.debug('sending-kafka-msg', topic=topic, kafka_msg=msg)
                 msgs = [msg]
 
-                if self.kproducer and self.kclient and \
-                        self.event_bus_publisher and self.faulty is False:
-                    # log.debug('sending-kafka-msg-I-am-here0', time=int(round(time.time() * 1000)))
-
-                    yield self.kproducer.send_messages(topic, msgs=msgs)
-                    # self.kproducer.send_messages(topic, msgs=msgs)
-                    # log.debug('sent-kafka-msg', topic=topic, msg=msg)
+                if self.kproducer is not None and self.event_bus_publisher and self.faulty is False:
+                    d = deferToThread(self.kproducer.produce, topic, msg, key)
+                    yield d
+                    log.debug('sent-kafka-msg', topic=topic, kafka_msg=msg)
+                    # send a lightweight poll to avoid an exception after 100k messages.
+                    d1 = deferToThread(self.kproducer.poll, 0)
+                    yield d1
                 else:
                     return
 
         except Exception, e:
             self.faulty = True
-            log.error('failed-to-send-kafka-msg', topic=topic, msg=msg, e=e)
+            log.error('failed-to-send-kafka-msg', topic=topic, kafka_msg=msg,
+                      e=e)
 
             # set the kafka producer to None.  This is needed if the
             # kafka docker went down and comes back up with a different