Misc fixes to kafka messaging

- Comment out alarm kafka handling
- Serialize indications as json strings

Change-Id: I15f041e8ba6f149a3eb450ea57755b88714cfb31
diff --git a/voltha/adapters/openolt/openolt_alarms.py b/voltha/adapters/openolt/openolt_alarms.py
index 7b7b7ec..b01645a 100644
--- a/voltha/adapters/openolt/openolt_alarms.py
+++ b/voltha/adapters/openolt/openolt_alarms.py
@@ -39,7 +39,6 @@
 from voltha.adapters.openolt.openolt_kafka_consumer import KConsumer
 
 
-
 class OpenOltAlarmMgr(object):
     def __init__(self, log, platform, data_model):
         self.log = log
@@ -63,10 +62,12 @@
                                errmsg=initerr.message)
             raise Exception(initerr)
 
+        '''
         self.alarms_thread_handle = threading.Thread(
             target=self.alarms_thread)
         self.alarms_thread_handle.setDaemon(True)
         self.alarms_thread_handle.start()
+        '''
 
     def process_alarms(self, alarm_ind):
         try:
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index 49a6645..154f614 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -169,8 +169,9 @@
             "openolt.ind.pkt",
             "openolt.ind.olt")
         self.log.debug('openolt indications thread processing')
+        # block reading kafka
         self.kafka_consumer.read(self.indications_process)
-        self.log.debug('alarm indications thread exited')
+        self.log.debug('openolt indications thread stopped')
 
     def indications_process(self, msg):
         ind = loads(msg)
diff --git a/voltha/adapters/openolt/openolt_grpc.py b/voltha/adapters/openolt/openolt_grpc.py
index 436503b..93addd7 100644
--- a/voltha/adapters/openolt/openolt_grpc.py
+++ b/voltha/adapters/openolt/openolt_grpc.py
@@ -19,7 +19,7 @@
 import threading
 from simplejson import dumps
 from twisted.internet import reactor
-from google.protobuf.json_format import MessageToDict
+from google.protobuf.json_format import MessageToJson
 from google.protobuf.message import Message
 from voltha.northbound.kafka.kafka_proxy import get_kafka_proxy
 from voltha.adapters.openolt.protos import openolt_pb2_grpc, openolt_pb2
@@ -59,18 +59,14 @@
                 kafka_proxy = get_kafka_proxy()
                 if kafka_proxy and not kafka_proxy.is_faulty():
                     self.log.debug('kafka-proxy-available')
-                    self.log.debug('shad 1', topic=topic, msg=msg)
-                    # convert to JSON string if msg is a protobuf msg
-                    if isinstance(msg, Message):
-                        self.log.debug('shad 2', topic=topic, msg=msg)
-                        msg = dumps(MessageToDict(msg, True, True))
-                    self.log.debug('shad 3', topic=topic, msg=msg)
-                    kafka_proxy.send_message(topic, dumps(msg))
+                    kafka_proxy.send_message(
+                        topic,
+                        dumps(MessageToJson(msg,
+                                            preserving_proto_field_name=True)))
                 else:
                     self.log.error('kafka-proxy-unavailable')
             except Exception, e:
                 self.log.exception('failed-sending-message', e=e)
-            self.log.debug('shad 4', topic=topic, msg=msg)
 
         self.log.debug('openolt grpc connecting to olt')
 
@@ -162,7 +158,7 @@
                         self.device.stats_mgr.flow_statistics_indication,
                         ind.flow_stats)
                 elif ind.HasField('alarm_ind'):
-                    forward_indication("openolt.ind.alarm", ind.alarm_ind)
+                    # forward_indication("openolt.ind.alarm", ind.alarm_ind)
                     reactor.callFromThread(
                         self.device.alarm_mgr.process_alarms, ind.alarm_ind)
                 else:
diff --git a/voltha/adapters/openolt/openolt_kafka_consumer.py b/voltha/adapters/openolt/openolt_kafka_consumer.py
index e304488..adbe1d5 100644
--- a/voltha/adapters/openolt/openolt_kafka_consumer.py
+++ b/voltha/adapters/openolt/openolt_kafka_consumer.py
@@ -29,14 +29,13 @@
     def __init__(self, *topics):
         kafka_proxy = get_kafka_proxy()
         if kafka_proxy and not kafka_proxy.is_faulty():
-            log.debug('kafka-proxy-available')
             self.kafka_endpoint = kafka_proxy.kafka_endpoint
+            log.debug('kafka-proxy-available', endpoint=self.kafka_endpoint)
         else:
             self.log.error('kafka-proxy-unavailable')
 
         conf = {'bootstrap.servers': self.kafka_endpoint,
-                'group.id': "mygroup",
-                'session.timeout.ms': 60000}
+                'group.id': "mygroup"}
 
         logger = logging.getLogger('openolt-kafka-consumer')
         logger.setLevel(logging.DEBUG)
@@ -53,18 +52,19 @@
 
         # Subscribe to topics
         log.debug('subscribe to topics', topics=topics)
-        self._c.subscribe(list(topics))
+        self.topics = list(topics)
+        self._c.subscribe(self.topics)
 
     def read(self, callback):
         # Read messages from Kafka and hand it to to callback
         try:
             while True:
-                log.debug('polling kafka for alarms')
+                log.debug('polling kafka for messages', topics=self.topics)
                 msg = self._c.poll(timeout=1.0)
                 if msg is None:
                     continue
                 elif not msg.error():
-                    print(msg.value())
+                    log.debug('got a kafka message', topic=msg.topic())
                     callback(msg.value())
                 elif msg.error().code() == KafkaError._PARTITION_EOF:
                     pass
@@ -90,7 +90,8 @@
     """
     Usage:
         python openolt_kafka_consumer.py $(kubectl get pod -o wide \
-            | grep cord-kafka-0 | awk '{print $6}'):9092 foo voltha.heartbeat
+        | grep cord-kafka-0 | awk '{print $6}'):9092 \
+        mygroup openolt.ind.olt openolt.ind.pkt
     """
     optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
     if len(argv) < 3:
@@ -102,8 +103,7 @@
     # Consumer configuration
     # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
     conf = {'bootstrap.servers': broker,
-            'group.id': group,
-            'session.timeout.ms': 60000}
+            'group.id': group}
 
     logger = logging.getLogger('openolt-kafka-consumer')
     logger.setLevel(logging.DEBUG)
@@ -117,11 +117,8 @@
     # c = Consumer(conf, logger=logger, debug='fetch')
     c = Consumer(conf, logger=logger)
 
-    def print_assignment(consumer, partitions):
-        print('Assignment:', partitions)
-
     # Subscribe to topics
-    c.subscribe(topics, on_assign=print_assignment)
+    c.subscribe(topics)
 
     # Read messages from Kafka, print to stdout
     try:
@@ -130,6 +127,7 @@
             if msg is None:
                 continue
             elif not msg.error():
+                print('got a kafka message, topic: {0}'.format(msg.topic()))
                 print(msg.value())
             elif msg.error().code() == KafkaError._PARTITION_EOF:
                 # print('End of partition reached {0}/{1}'
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
index b615b19..0fba534 100644
--- a/voltha/northbound/kafka/kafka_proxy.py
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -15,9 +15,9 @@
 #
 
 
-
 from confluent_kafka import Producer as _kafkaProducer
 from structlog import get_logger
+from twisted.internet import reactor
 from twisted.internet.defer import inlineCallbacks, returnValue
 from twisted.internet.threads import deferToThread
 from zope.interface import implementer
@@ -25,6 +25,8 @@
 from common.utils.consulhelpers import get_endpoint_from_consul
 from voltha.northbound.kafka.event_bus_publisher import EventBusPublisher
 from voltha.registry import IComponent
+from confluent_kafka import Consumer, KafkaError
+import threading
 
 log = get_logger()
 
@@ -32,11 +34,11 @@
 @implementer(IComponent)
 class KafkaProxy(object):
     """
-    This is a singleton proxy kafka class to hide the kafka client details.
-
-    Removed the references to the afkak libraries and added confluent-kafka-python as producer.
-    The required the adding in of twisted.internet.threads.deferToThread call to avoid any potential
-    blocking the the producer code
+    This is a singleton proxy kafka class to hide the kafka client details. This
+    proxy uses confluent-kafka-python as the kafka client. Since that client is
+    not a Twisted client then requests to that client are wrapped with
+    twisted.internet.threads.deferToThread to avoid any potential blocking of
+    the Twisted loop.
     """
     _kafka_instance = None
 
@@ -45,6 +47,7 @@
                  kafka_endpoint='localhost:9092',
                  ack_timeout=1000,
                  max_req_attempts=10,
+                 consumer_poll_timeout=10,
                  config={}):
 
         # return an exception if the object already exist
@@ -62,6 +65,10 @@
         self.event_bus_publisher = None
         self.stopping = False
         self.faulty = False
+        self.consumer_poll_timeout = consumer_poll_timeout
+        self.topic_consumer_map = {}
+        self.topic_callbacks_map = {}
+        self.topic_any_map_lock = threading.Lock()
         log.debug('initialized', endpoint=kafka_endpoint)
 
     @inlineCallbacks
@@ -80,6 +87,7 @@
     def stop(self):
         try:
             log.debug('stopping-kafka-proxy')
+            self.stopping = True
             try:
                 if self.kclient:
                     yield self.kclient.close()
@@ -87,23 +95,36 @@
                     log.debug('stopped-kclient-kafka-proxy')
             except Exception, e:
                 log.exception('failed-stopped-kclient-kafka-proxy', e=e)
-                pass
 
             try:
                 if self.kproducer:
-                    yield self.kproducer.stop()
+                    yield self.kproducer.flush()
                     self.kproducer = None
                     log.debug('stopped-kproducer-kafka-proxy')
             except Exception, e:
                 log.exception('failed-stopped-kproducer-kafka-proxy', e=e)
-                pass
 
-            #try:
+            # Stop all consumers
+            try:
+                self.topic_any_map_lock.acquire()
+                log.debug('stopping-consumers-kafka-proxy')
+                for _, c in self.topic_consumer_map.iteritems():
+                    yield deferToThread(c.close)
+                self.topic_consumer_map.clear()
+                self.topic_callbacks_map.clear()
+                log.debug('stopped-consumers-kafka-proxy')
+            except Exception, e:
+                log.exception('failed-stopped-consumers-kafka-proxy', e=e)
+            finally:
+                self.topic_any_map_lock.release()
+                log.debug('stopping-consumers-kafka-proxy-released-lock')
+
+            # try:
             #    if self.event_bus_publisher:
             #        yield self.event_bus_publisher.stop()
             #        self.event_bus_publisher = None
             #        log.debug('stopped-event-bus-publisher-kafka-proxy')
-            #except Exception, e:
+            # except Exception, e:
             #    log.debug('failed-stopped-event-bus-publisher-kafka-proxy')
             #    pass
 
@@ -112,7 +133,7 @@
         except Exception, e:
             self.kclient = None
             self.kproducer = None
-            #self.event_bus_publisher = None
+            # self.event_bus_publisher = None
             log.exception('failed-stopped-kafka-proxy', e=e)
             pass
 
@@ -123,7 +144,8 @@
             if self.kafka_endpoint.startswith('@'):
                 try:
                     _k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
-                                                           self.kafka_endpoint[1:])
+                                                           self.kafka_endpoint[
+                                                           1:])
                     log.debug('found-kafka-service', endpoint=_k_endpoint)
 
                 except Exception as e:
@@ -135,14 +157,123 @@
             else:
                 _k_endpoint = self.kafka_endpoint
             self.kproducer = _kafkaProducer(
-                {'bootstrap.servers' :_k_endpoint}
-                )
+                {'bootstrap.servers': _k_endpoint,
+                 }
+            )
             pass
         except Exception, e:
             log.exception('failed-get-kafka-producer', e=e)
             return
 
     @inlineCallbacks
+    def _wait_for_messages(self, consumer, topic):
+        while True:
+            try:
+                msg = yield deferToThread(consumer.poll,
+                                          self.consumer_poll_timeout)
+
+                if self.stopping:
+                    log.debug("stop-request-recieved", topic=topic)
+                    break
+
+                if msg is None:
+                    continue
+                if msg.error():
+                    # This typically is received when there are no more messages
+                    # to read from kafka. Ignore.
+                    continue
+
+                # Invoke callbacks
+                for cb in self.topic_callbacks_map[topic]:
+                    yield cb(msg)
+            except Exception as e:
+                log.debug("exception-receiving-msg", topic=topic, e=e)
+
+    @inlineCallbacks
+    def subscribe(self, topic, callback, groupId, offset='latest'):
+        """
+        subscribe allows a caller to subscribe to a given kafka topic.  This API
+        always create a group consumer.
+        :param topic - the topic to subscribe to
+        :param callback - the callback to invoke whenever a message is received
+        on that topic
+        :param groupId - the groupId for this consumer.  In the current
+        implementation there is a one-to-one mapping between a topic and a
+        groupId.  In other words, once a groupId is used for a given topic then
+        we won't be able to create another groupId for the same topic.
+        :param offset:  the kafka offset from where the consumer will start
+        consuming messages
+        """
+        try:
+            self.topic_any_map_lock.acquire()
+            if topic in self.topic_consumer_map:
+                # Just add the callback
+                if topic in self.topic_callbacks_map:
+                    self.topic_callbacks_map[topic].append(callback)
+                else:
+                    self.topic_callbacks_map[topic] = [callback]
+                return
+
+            # Create consumer for that topic
+            c = Consumer({
+                'bootstrap.servers': self.kafka_endpoint,
+                'group.id': groupId,
+                'auto.offset.reset': offset
+            })
+            yield deferToThread(c.subscribe, [topic])
+            # c.subscribe([topic])
+            self.topic_consumer_map[topic] = c
+            self.topic_callbacks_map[topic] = [callback]
+            # Start the consumer
+            reactor.callLater(0, self._wait_for_messages, c, topic)
+        except Exception, e:
+            log.exception("topic-subscription-error", e=e)
+        finally:
+            self.topic_any_map_lock.release()
+
+    @inlineCallbacks
+    def unsubscribe(self, topic, callback):
+        """
+        Unsubscribe to a given topic.  Since there they be multiple callers
+        consuming from the same topic then to ensure only the relevant caller
+        gets unsubscribe then the callback is used as a differentiator.   The
+        kafka consumer will be closed when there are no callbacks required.
+        :param topic: topic to unsubscribe
+        :param callback: callback the caller used when subscribing to the topic.
+        If multiple callers have subscribed to a topic using the same callback
+        then the first callback on the list will be removed.
+        :return:None
+        """
+        try:
+            self.topic_any_map_lock.acquire()
+            log.debug("unsubscribing-to-topic", topic=topic)
+            if topic in self.topic_callbacks_map:
+                index = 0
+                for cb in self.topic_callbacks_map[topic]:
+                    if cb == callback:
+                        break
+                    index += 1
+                if index < len(self.topic_callbacks_map[topic]):
+                    self.topic_callbacks_map[topic].pop(index)
+
+                if len(self.topic_callbacks_map[topic]) == 0:
+                    # Stop the consumer
+                    if topic in self.topic_consumer_map:
+                        yield deferToThread(
+                            self.topic_consumer_map[topic].close)
+                        del self.topic_consumer_map[topic]
+                    del self.topic_callbacks_map[topic]
+                    log.debug("unsubscribed-to-topic", topic=topic)
+                else:
+                    log.debug("consumers-for-topic-still-exist", topic=topic,
+                              num=len(self.topic_callbacks_map[topic]))
+        except Exception, e:
+            log.exception("topic-unsubscription-error", e=e)
+        finally:
+            self.topic_any_map_lock.release()
+            log.debug("unsubscribing-to-topic-release-lock", topic=topic)
+
+    @inlineCallbacks
     def send_message(self, topic, msg, key=None):
         assert topic is not None
         assert msg is not None
@@ -157,14 +288,15 @@
                     self._get_kafka_producer()
                     # Lets the next message request do the retry if still a failure
                     if self.kproducer is None:
-                        log.error('no-kafka-producer', endpoint=self.kafka_endpoint)
+                        log.error('no-kafka-producer',
+                                  endpoint=self.kafka_endpoint)
                         return
 
                 log.debug('sending-kafka-msg', topic=topic, kafka_msg=msg)
                 msgs = [msg]
 
                 if self.kproducer is not None and self.event_bus_publisher and self.faulty is False:
-                    d =  deferToThread(self.kproducer.produce, topic, msg, key)
+                    d = deferToThread(self.kproducer.produce, topic, msg, key)
                     yield d
                     log.debug('sent-kafka-msg', topic=topic, kafka_msg=msg)
                     # send a lightweight poll to avoid an exception after 100k messages.
@@ -175,7 +307,8 @@
 
         except Exception, e:
             self.faulty = True
-            log.error('failed-to-send-kafka-msg', topic=topic, kafka_msg=msg, e=e)
+            log.error('failed-to-send-kafka-msg', topic=topic, kafka_msg=msg,
+                      e=e)
 
             # set the kafka producer to None.  This is needed if the
             # kafka docker went down and comes back up with a different
@@ -203,4 +336,3 @@
 # Common method to get the singleton instance of the kafka proxy class
 def get_kafka_proxy():
     return KafkaProxy._kafka_instance
-