Openolt alarms thread
Change-Id: I68e511b5a613dca60fda4d37f3001be7d36fa604
diff --git a/voltha/adapters/openolt/openolt_alarms.py b/voltha/adapters/openolt/openolt_alarms.py
index e55d003..7b7b7ec 100644
--- a/voltha/adapters/openolt/openolt_alarms.py
+++ b/voltha/adapters/openolt/openolt_alarms.py
@@ -14,6 +14,8 @@
# limitations under the License.
#
+import threading
+from simplejson import loads
from voltha.extensions.alarms.device_alarms import DeviceAlarms
from voltha.extensions.alarms.simulator.simulate_alarms \
import AdapterAlarmSimulator
@@ -34,6 +36,8 @@
import OnuWindowDriftAlarm
from voltha.extensions.alarms.onu.onu_activation_fail_alarm \
import OnuActivationFailAlarm
+from voltha.adapters.openolt.openolt_kafka_consumer import KConsumer
+
class OpenOltAlarmMgr(object):
@@ -59,6 +63,11 @@
errmsg=initerr.message)
raise Exception(initerr)
+ self.alarms_thread_handle = threading.Thread(
+ target=self.alarms_thread)
+ self.alarms_thread_handle.setDaemon(True)
+ self.alarms_thread_handle.start()
+
def process_alarms(self, alarm_ind):
try:
self.log.debug('alarm-indication', alarm=alarm_ind)
@@ -437,3 +446,14 @@
def onu_processing_error_indication(self, onu_processing_error_ind):
self.log.info('not implemented yet')
+
+ def alarms_thread(self):
+ self.log.debug('alarm indications thread starting')
+ self.kafka_consumer = KConsumer("openolt.ind.alarm")
+ self.log.debug('alarm indications thread processing')
+ self.kafka_consumer.read(self.alarms_process)
+ self.log.debug('alarm indications thread exited')
+
+ def alarms_process(self, msg):
+ alarm = loads(msg)
+ self.log.debug("openolt alarm", alarm=alarm, type=type(alarm))
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index ac3cba5..99e7612 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -175,8 +175,7 @@
def indications_thread(self):
- def send_indication(msg):
- topic = "openolt.ind"
+ def forward_indication(topic, msg):
try:
kafka_proxy = get_kafka_proxy()
if kafka_proxy and not kafka_proxy.is_faulty():
@@ -245,7 +244,6 @@
indications=ind)
continue
- send_indication(ind)
# indication handlers run in the main event loop
if ind.HasField('olt_ind'):
@@ -273,6 +271,7 @@
self.stats_mgr.flow_statistics_indication,
ind.flow_stats)
elif ind.HasField('alarm_ind'):
+ forward_indication("openolt.ind.alarm", ind)
reactor.callFromThread(self.alarm_mgr.process_alarms,
ind.alarm_ind)
else:
diff --git a/voltha/adapters/openolt/openolt_kafka_consumer.py b/voltha/adapters/openolt/openolt_kafka_consumer.py
index 73ff0c2..e304488 100644
--- a/voltha/adapters/openolt/openolt_kafka_consumer.py
+++ b/voltha/adapters/openolt/openolt_kafka_consumer.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2016 Confluent Inc.
+# Copyright 2019 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,27 +15,83 @@
# limitations under the License.
#
-#
-# Example high-level Kafka 0.9 balanced Consumer
-#
-from confluent_kafka import Consumer, KafkaError
import sys
import getopt
-import json
import logging
-from pprint import pformat
+from structlog import get_logger
+from confluent_kafka import Consumer, KafkaError
+from voltha.northbound.kafka.kafka_proxy import get_kafka_proxy
+
+log = get_logger()
+
+
+class KConsumer(object):
+ def __init__(self, *topics):
+ kafka_proxy = get_kafka_proxy()
+ if kafka_proxy and not kafka_proxy.is_faulty():
+ log.debug('kafka-proxy-available')
+ self.kafka_endpoint = kafka_proxy.kafka_endpoint
+ else:
+            log.error('kafka-proxy-unavailable')
+
+ conf = {'bootstrap.servers': self.kafka_endpoint,
+ 'group.id': "mygroup",
+ 'session.timeout.ms': 60000}
+
+ logger = logging.getLogger('openolt-kafka-consumer')
+ logger.setLevel(logging.DEBUG)
+ handler = logging.StreamHandler()
+ handler.setFormatter(logging.Formatter(
+ '%(asctime)-15s %(levelname)-8s %(message)s'))
+ logger.addHandler(handler)
+
+ # Create Consumer instance
+ # Hint: try debug='fetch' to generate some log messages
+ # self._c = Consumer(conf, logger=logger, debug='fetch')
+ log.debug('creating kafka consumer', conf=conf)
+ self._c = Consumer(conf, logger=logger)
+
+ # Subscribe to topics
+ log.debug('subscribe to topics', topics=topics)
+ self._c.subscribe(list(topics))
+
+ def read(self, callback):
+ # Read messages from Kafka and hand it to to callback
+ try:
+ while True:
+ log.debug('polling kafka for alarms')
+ msg = self._c.poll(timeout=1.0)
+ if msg is None:
+ continue
+ elif not msg.error():
+ print(msg.value())
+ callback(msg.value())
+ elif msg.error().code() == KafkaError._PARTITION_EOF:
+ pass
+ else:
+                    log.error('Error occurred: {0}'.format(msg.error().str()))
+
+ except KeyboardInterrupt:
+ pass
+
+ finally:
+ # Close down consumer to commit final offsets.
+ self._c.close()
+
def print_usage_and_exit(program_name):
- sys.stderr.write('Usage: %s [options..] <bootstrap-brokers> <group> <topic1> <topic2> ..\n' % program_name)
- options = '''
- Options:
- -T <intvl> Enable client statistics at specified interval (ms)
-'''
- sys.stderr.write(options)
+ sys.stderr.write(
+ 'Usage: %s <bootstrap-brokers> <group> <topic1> <topic2> ..\n'
+ % program_name)
sys.exit(1)
if __name__ == '__main__':
+ """
+ Usage:
+ python openolt_kafka_consumer.py $(kubectl get pod -o wide \
+ | grep cord-kafka-0 | awk '{print $6}'):9092 foo voltha.heartbeat
+ """
optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
if len(argv) < 3:
print_usage_and_exit(sys.argv[0])
@@ -45,10 +101,6 @@
topics = argv[2:]
# Consumer configuration
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
- '''
- conf = {'bootstrap.servers': broker,'group.id': group, 'session.timeout.ms': 60000,
- 'auto.offset.reset': 'earliest'}
- '''
conf = {'bootstrap.servers': broker,
'group.id': group,
'session.timeout.ms': 60000}
@@ -56,7 +108,8 @@
logger = logging.getLogger('openolt-kafka-consumer')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
- handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
+ handler.setFormatter(logging.Formatter(
+ '%(asctime)-15s %(levelname)-8s %(message)s'))
logger.addHandler(handler)
# Create Consumer instance