Openolt alarms thread

Change-Id: I68e511b5a613dca60fda4d37f3001be7d36fa604
diff --git a/voltha/adapters/openolt/openolt_alarms.py b/voltha/adapters/openolt/openolt_alarms.py
index e55d003..7b7b7ec 100644
--- a/voltha/adapters/openolt/openolt_alarms.py
+++ b/voltha/adapters/openolt/openolt_alarms.py
@@ -14,6 +14,8 @@
 # limitations under the License.
 #
 
+import threading
+from simplejson import loads
 from voltha.extensions.alarms.device_alarms import DeviceAlarms
 from voltha.extensions.alarms.simulator.simulate_alarms \
     import AdapterAlarmSimulator
@@ -34,6 +36,8 @@
     import OnuWindowDriftAlarm
 from voltha.extensions.alarms.onu.onu_activation_fail_alarm \
     import OnuActivationFailAlarm
+from voltha.adapters.openolt.openolt_kafka_consumer import KConsumer
+
 
 
 class OpenOltAlarmMgr(object):
@@ -59,6 +63,11 @@
                                errmsg=initerr.message)
             raise Exception(initerr)
 
+        self.alarms_thread_handle = threading.Thread(
+            target=self.alarms_thread)
+        self.alarms_thread_handle.setDaemon(True)
+        self.alarms_thread_handle.start()
+
     def process_alarms(self, alarm_ind):
         try:
             self.log.debug('alarm-indication', alarm=alarm_ind)
@@ -437,3 +446,14 @@
 
     def onu_processing_error_indication(self, onu_processing_error_ind):
         self.log.info('not implemented yet')
+
+    def alarms_thread(self):
+        self.log.debug('alarm indications thread starting')
+        self.kafka_consumer = KConsumer("openolt.ind.alarm")
+        self.log.debug('alarm indications thread processing')
+        self.kafka_consumer.read(self.alarms_process)
+        self.log.debug('alarm indications thread exited')
+
+    def alarms_process(self, msg):
+        alarm = loads(msg)
+        self.log.debug("openolt alarm", alarm=alarm, type=type(alarm))
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index ac3cba5..99e7612 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -175,8 +175,7 @@
 
     def indications_thread(self):
 
-        def send_indication(msg):
-            topic = "openolt.ind"
+        def forward_indication(topic, msg):
             try:
                 kafka_proxy = get_kafka_proxy()
                 if kafka_proxy and not kafka_proxy.is_faulty():
@@ -245,7 +244,6 @@
                                       indications=ind)
                         continue
 
-                send_indication(ind)
 
                 # indication handlers run in the main event loop
                 if ind.HasField('olt_ind'):
@@ -273,6 +271,7 @@
                         self.stats_mgr.flow_statistics_indication,
                         ind.flow_stats)
                 elif ind.HasField('alarm_ind'):
+                    forward_indication("openolt.ind.alarm", ind)
                     reactor.callFromThread(self.alarm_mgr.process_alarms,
                                            ind.alarm_ind)
                 else:
diff --git a/voltha/adapters/openolt/openolt_kafka_consumer.py b/voltha/adapters/openolt/openolt_kafka_consumer.py
index 73ff0c2..e304488 100644
--- a/voltha/adapters/openolt/openolt_kafka_consumer.py
+++ b/voltha/adapters/openolt/openolt_kafka_consumer.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 #
-# Copyright 2016 Confluent Inc.
+# Copyright 2019 the original author or authors.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -15,27 +15,83 @@
 # limitations under the License.
 #
 
-#
-# Example high-level Kafka 0.9 balanced Consumer
-#
-from confluent_kafka import Consumer, KafkaError
 import sys
 import getopt
-import json
 import logging
-from pprint import pformat
+from structlog import get_logger
+from confluent_kafka import Consumer, KafkaError
+from voltha.northbound.kafka.kafka_proxy import get_kafka_proxy
+
+log = get_logger()
+
+
+class KConsumer(object):
+    def __init__(self, *topics):
+        kafka_proxy = get_kafka_proxy()
+        if kafka_proxy and not kafka_proxy.is_faulty():
+            log.debug('kafka-proxy-available')
+            self.kafka_endpoint = kafka_proxy.kafka_endpoint
+        else:
+            self.log.error('kafka-proxy-unavailable')
+
+        conf = {'bootstrap.servers': self.kafka_endpoint,
+                'group.id': "mygroup",
+                'session.timeout.ms': 60000}
+
+        logger = logging.getLogger('openolt-kafka-consumer')
+        logger.setLevel(logging.DEBUG)
+        handler = logging.StreamHandler()
+        handler.setFormatter(logging.Formatter(
+            '%(asctime)-15s %(levelname)-8s %(message)s'))
+        logger.addHandler(handler)
+
+        # Create Consumer instance
+        # Hint: try debug='fetch' to generate some log messages
+        # self._c = Consumer(conf, logger=logger, debug='fetch')
+        log.debug('creating kafka consumer', conf=conf)
+        self._c = Consumer(conf, logger=logger)
+
+        # Subscribe to topics
+        log.debug('subscribe to topics', topics=topics)
+        self._c.subscribe(list(topics))
+
+    def read(self, callback):
+        # Read messages from Kafka and hand it to to callback
+        try:
+            while True:
+                log.debug('polling kafka for alarms')
+                msg = self._c.poll(timeout=1.0)
+                if msg is None:
+                    continue
+                elif not msg.error():
+                    print(msg.value())
+                    callback(msg.value())
+                elif msg.error().code() == KafkaError._PARTITION_EOF:
+                    pass
+                else:
+                    log.error('Error occured: {0}'.format(msg.error().str()))
+
+        except KeyboardInterrupt:
+            pass
+
+        finally:
+            # Close down consumer to commit final offsets.
+            self._c.close()
+
 
 def print_usage_and_exit(program_name):
-    sys.stderr.write('Usage: %s [options..] <bootstrap-brokers> <group> <topic1> <topic2> ..\n' % program_name)
-    options = '''
- Options:
-  -T <intvl>   Enable client statistics at specified interval (ms)
-'''
-    sys.stderr.write(options)
+    sys.stderr.write(
+        'Usage: %s <bootstrap-brokers> <group> <topic1> <topic2> ..\n'
+        % program_name)
     sys.exit(1)
 
 
 if __name__ == '__main__':
+    """
+    Usage:
+        python openolt_kafka_consumer.py $(kubectl get pod -o wide \
+            | grep cord-kafka-0 | awk '{print $6}'):9092 foo voltha.heartbeat
+    """
     optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
     if len(argv) < 3:
         print_usage_and_exit(sys.argv[0])
@@ -45,10 +101,6 @@
     topics = argv[2:]
     # Consumer configuration
     # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
-    '''
-    conf = {'bootstrap.servers': broker,'group.id': group, 'session.timeout.ms': 60000,
-            'auto.offset.reset': 'earliest'}
-    '''
     conf = {'bootstrap.servers': broker,
             'group.id': group,
             'session.timeout.ms': 60000}
@@ -56,7 +108,8 @@
     logger = logging.getLogger('openolt-kafka-consumer')
     logger.setLevel(logging.DEBUG)
     handler = logging.StreamHandler()
-    handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
+    handler.setFormatter(logging.Formatter(
+        '%(asctime)-15s %(levelname)-8s %(message)s'))
     logger.addHandler(handler)
 
     # Create Consumer instance