Revert "Initial commit for handling openolt alarms in a separate thread"

This reverts commit 6636cc2737a54f29f4c91e51987946cc5bff5b4a.

Change-Id: I6df408408f45163b43f3ea8c10fedee9878991ee
diff --git a/voltha/adapters/openolt/openolt_alarms.py b/voltha/adapters/openolt/openolt_alarms.py
index 3095a5b..e55d003 100644
--- a/voltha/adapters/openolt/openolt_alarms.py
+++ b/voltha/adapters/openolt/openolt_alarms.py
@@ -14,7 +14,6 @@
 # limitations under the License.
 #
 
-import threading
 from voltha.extensions.alarms.device_alarms import DeviceAlarms
 from voltha.extensions.alarms.simulator.simulate_alarms \
     import AdapterAlarmSimulator
@@ -35,7 +34,6 @@
     import OnuWindowDriftAlarm
 from voltha.extensions.alarms.onu.onu_activation_fail_alarm \
     import OnuActivationFailAlarm
-from voltha.adapters.openolt.openolt_kafka_consumer import KConsumer
 
 
 class OpenOltAlarmMgr(object):
@@ -61,11 +59,6 @@
                                errmsg=initerr.message)
             raise Exception(initerr)
 
-        self.indications_thread_handle = threading.Thread(
-            target=self.indications_thread)
-        self.indications_thread_handle.setDaemon(True)
-        self.indications_thread_handle.start()
-
     def process_alarms(self, alarm_ind):
         try:
             self.log.debug('alarm-indication', alarm=alarm_ind)
@@ -444,13 +437,3 @@
 
     def onu_processing_error_indication(self, onu_processing_error_ind):
         self.log.info('not implemented yet')
-
-    def indications_thread(self):
-        self.log.debug('alarm indications thread starting')
-        self.kafka_consumer = KConsumer("openolt.ind.alarm")
-        self.log.debug('alarm indications thread processing')
-        self.kafka_consumer.read(self.process)
-        self.log.debug('alarm indications thread exited')
-
-    def process(self, msg):
-        self.log.debug("openolt alarm ind", msg=msg)
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index 8ebad2c..81231d2 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -170,21 +170,6 @@
         self.flow_mgr.reset_flows()
 
     def indications_thread(self):
-
-        def forward_indication(topic, msg):
-            try:
-                kafka_proxy = get_kafka_proxy()
-                if kafka_proxy and not kafka_proxy.is_faulty():
-                    self.log.debug('kafka-proxy-available')
-                    # convert to JSON string if msg is a protobuf msg
-                    if isinstance(msg, Message):
-                        msg = dumps(MessageToDict(msg, True, True))
-                    kafka_proxy.send_message(topic, dumps(msg))
-                else:
-                    self.log.error('kafka-proxy-unavailable')
-            except Exception, e:
-                self.log.exception('failed-sending-message', e=e)
-
         self.log.debug('starting-indications-thread')
         self.log.debug('connecting to olt')
 
@@ -268,7 +253,6 @@
                 elif ind.HasField('alarm_ind'):
                     reactor.callFromThread(self.alarm_mgr.process_alarms,
                                            ind.alarm_ind)
-                    forward_indication("openolt.ind.alarm", ind)
                 else:
                     self.log.warn('unknown indication type')
 
diff --git a/voltha/adapters/openolt/openolt_kafka_consumer.py b/voltha/adapters/openolt/openolt_kafka_consumer.py
deleted file mode 100644
index e304488..0000000
--- a/voltha/adapters/openolt/openolt_kafka_consumer.py
+++ /dev/null
@@ -1,146 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright 2019 the original author or authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import sys
-import getopt
-import logging
-from structlog import get_logger
-from confluent_kafka import Consumer, KafkaError
-from voltha.northbound.kafka.kafka_proxy import get_kafka_proxy
-
-log = get_logger()
-
-
-class KConsumer(object):
-    def __init__(self, *topics):
-        kafka_proxy = get_kafka_proxy()
-        if kafka_proxy and not kafka_proxy.is_faulty():
-            log.debug('kafka-proxy-available')
-            self.kafka_endpoint = kafka_proxy.kafka_endpoint
-        else:
-            self.log.error('kafka-proxy-unavailable')
-
-        conf = {'bootstrap.servers': self.kafka_endpoint,
-                'group.id': "mygroup",
-                'session.timeout.ms': 60000}
-
-        logger = logging.getLogger('openolt-kafka-consumer')
-        logger.setLevel(logging.DEBUG)
-        handler = logging.StreamHandler()
-        handler.setFormatter(logging.Formatter(
-            '%(asctime)-15s %(levelname)-8s %(message)s'))
-        logger.addHandler(handler)
-
-        # Create Consumer instance
-        # Hint: try debug='fetch' to generate some log messages
-        # self._c = Consumer(conf, logger=logger, debug='fetch')
-        log.debug('creating kafka consumer', conf=conf)
-        self._c = Consumer(conf, logger=logger)
-
-        # Subscribe to topics
-        log.debug('subscribe to topics', topics=topics)
-        self._c.subscribe(list(topics))
-
-    def read(self, callback):
-        # Read messages from Kafka and hand it to to callback
-        try:
-            while True:
-                log.debug('polling kafka for alarms')
-                msg = self._c.poll(timeout=1.0)
-                if msg is None:
-                    continue
-                elif not msg.error():
-                    print(msg.value())
-                    callback(msg.value())
-                elif msg.error().code() == KafkaError._PARTITION_EOF:
-                    pass
-                else:
-                    log.error('Error occured: {0}'.format(msg.error().str()))
-
-        except KeyboardInterrupt:
-            pass
-
-        finally:
-            # Close down consumer to commit final offsets.
-            self._c.close()
-
-
-def print_usage_and_exit(program_name):
-    sys.stderr.write(
-        'Usage: %s <bootstrap-brokers> <group> <topic1> <topic2> ..\n'
-        % program_name)
-    sys.exit(1)
-
-
-if __name__ == '__main__':
-    """
-    Usage:
-        python openolt_kafka_consumer.py $(kubectl get pod -o wide \
-            | grep cord-kafka-0 | awk '{print $6}'):9092 foo voltha.heartbeat
-    """
-    optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
-    if len(argv) < 3:
-        print_usage_and_exit(sys.argv[0])
-
-    broker = argv[0]
-    group = argv[1]
-    topics = argv[2:]
-    # Consumer configuration
-    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
-    conf = {'bootstrap.servers': broker,
-            'group.id': group,
-            'session.timeout.ms': 60000}
-
-    logger = logging.getLogger('openolt-kafka-consumer')
-    logger.setLevel(logging.DEBUG)
-    handler = logging.StreamHandler()
-    handler.setFormatter(logging.Formatter(
-        '%(asctime)-15s %(levelname)-8s %(message)s'))
-    logger.addHandler(handler)
-
-    # Create Consumer instance
-    # Hint: try debug='fetch' to generate some log messages
-    # c = Consumer(conf, logger=logger, debug='fetch')
-    c = Consumer(conf, logger=logger)
-
-    def print_assignment(consumer, partitions):
-        print('Assignment:', partitions)
-
-    # Subscribe to topics
-    c.subscribe(topics, on_assign=print_assignment)
-
-    # Read messages from Kafka, print to stdout
-    try:
-        while True:
-            msg = c.poll(timeout=1.0)
-            if msg is None:
-                continue
-            elif not msg.error():
-                print(msg.value())
-            elif msg.error().code() == KafkaError._PARTITION_EOF:
-                # print('End of partition reached {0}/{1}'
-                #       .format(msg.topic(), msg.partition()))
-                pass
-            else:
-                print('Error occured: {0}'.format(msg.error().str()))
-
-    except KeyboardInterrupt:
-        pass
-
-    finally:
-        # Close down consumer to commit final offsets.
-        c.close()