Misc fixes to kafka messaging
- Comment out alarm kafka handling
- Serialize indications as json strings
Change-Id: I15f041e8ba6f149a3eb450ea57755b88714cfb31
diff --git a/voltha/adapters/openolt/openolt_alarms.py b/voltha/adapters/openolt/openolt_alarms.py
index 7b7b7ec..b01645a 100644
--- a/voltha/adapters/openolt/openolt_alarms.py
+++ b/voltha/adapters/openolt/openolt_alarms.py
@@ -39,7 +39,6 @@
from voltha.adapters.openolt.openolt_kafka_consumer import KConsumer
-
class OpenOltAlarmMgr(object):
def __init__(self, log, platform, data_model):
self.log = log
@@ -63,10 +62,12 @@
errmsg=initerr.message)
raise Exception(initerr)
+ '''
self.alarms_thread_handle = threading.Thread(
target=self.alarms_thread)
self.alarms_thread_handle.setDaemon(True)
self.alarms_thread_handle.start()
+ '''
def process_alarms(self, alarm_ind):
try:
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index 49a6645..154f614 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -169,8 +169,9 @@
"openolt.ind.pkt",
"openolt.ind.olt")
self.log.debug('openolt indications thread processing')
+ # block reading kafka
self.kafka_consumer.read(self.indications_process)
- self.log.debug('alarm indications thread exited')
+ self.log.debug('openolt indications thread stopped')
def indications_process(self, msg):
ind = loads(msg)
diff --git a/voltha/adapters/openolt/openolt_grpc.py b/voltha/adapters/openolt/openolt_grpc.py
index 436503b..93addd7 100644
--- a/voltha/adapters/openolt/openolt_grpc.py
+++ b/voltha/adapters/openolt/openolt_grpc.py
@@ -19,7 +19,7 @@
import threading
from simplejson import dumps
from twisted.internet import reactor
-from google.protobuf.json_format import MessageToDict
+from google.protobuf.json_format import MessageToJson
from google.protobuf.message import Message
from voltha.northbound.kafka.kafka_proxy import get_kafka_proxy
from voltha.adapters.openolt.protos import openolt_pb2_grpc, openolt_pb2
@@ -59,18 +59,14 @@
kafka_proxy = get_kafka_proxy()
if kafka_proxy and not kafka_proxy.is_faulty():
self.log.debug('kafka-proxy-available')
- self.log.debug('shad 1', topic=topic, msg=msg)
- # convert to JSON string if msg is a protobuf msg
- if isinstance(msg, Message):
- self.log.debug('shad 2', topic=topic, msg=msg)
- msg = dumps(MessageToDict(msg, True, True))
- self.log.debug('shad 3', topic=topic, msg=msg)
- kafka_proxy.send_message(topic, dumps(msg))
+ kafka_proxy.send_message(
+ topic,
+ dumps(MessageToJson(msg,
+ preserving_proto_field_name=True)))
else:
self.log.error('kafka-proxy-unavailable')
except Exception, e:
self.log.exception('failed-sending-message', e=e)
- self.log.debug('shad 4', topic=topic, msg=msg)
self.log.debug('openolt grpc connecting to olt')
@@ -162,7 +158,7 @@
self.device.stats_mgr.flow_statistics_indication,
ind.flow_stats)
elif ind.HasField('alarm_ind'):
- forward_indication("openolt.ind.alarm", ind.alarm_ind)
+ # forward_indication("openolt.ind.alarm", ind.alarm_ind)
reactor.callFromThread(
self.device.alarm_mgr.process_alarms, ind.alarm_ind)
else:
diff --git a/voltha/adapters/openolt/openolt_kafka_consumer.py b/voltha/adapters/openolt/openolt_kafka_consumer.py
index e304488..adbe1d5 100644
--- a/voltha/adapters/openolt/openolt_kafka_consumer.py
+++ b/voltha/adapters/openolt/openolt_kafka_consumer.py
@@ -29,14 +29,13 @@
def __init__(self, *topics):
kafka_proxy = get_kafka_proxy()
if kafka_proxy and not kafka_proxy.is_faulty():
- log.debug('kafka-proxy-available')
self.kafka_endpoint = kafka_proxy.kafka_endpoint
+ log.debug('kafka-proxy-available', endpoint=self.kafka_endpoint)
else:
self.log.error('kafka-proxy-unavailable')
conf = {'bootstrap.servers': self.kafka_endpoint,
- 'group.id': "mygroup",
- 'session.timeout.ms': 60000}
+ 'group.id': "mygroup"}
logger = logging.getLogger('openolt-kafka-consumer')
logger.setLevel(logging.DEBUG)
@@ -53,18 +52,19 @@
# Subscribe to topics
log.debug('subscribe to topics', topics=topics)
- self._c.subscribe(list(topics))
+ self.topics = list(topics)
+ self._c.subscribe(self.topics)
def read(self, callback):
# Read messages from Kafka and hand it to to callback
try:
while True:
- log.debug('polling kafka for alarms')
+ log.debug('polling kafka for messages', topics=self.topics)
msg = self._c.poll(timeout=1.0)
if msg is None:
continue
elif not msg.error():
- print(msg.value())
+ log.debug('got a kafka message', topic=msg.topic())
callback(msg.value())
elif msg.error().code() == KafkaError._PARTITION_EOF:
pass
@@ -90,7 +90,8 @@
"""
Usage:
python openolt_kafka_consumer.py $(kubectl get pod -o wide \
- | grep cord-kafka-0 | awk '{print $6}'):9092 foo voltha.heartbeat
+ | grep cord-kafka-0 | awk '{print $6}'):9092 \
+ mygroup openolt.ind.olt openolt.ind.pkt
"""
optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
if len(argv) < 3:
@@ -102,8 +103,7 @@
# Consumer configuration
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
conf = {'bootstrap.servers': broker,
- 'group.id': group,
- 'session.timeout.ms': 60000}
+ 'group.id': group}
logger = logging.getLogger('openolt-kafka-consumer')
logger.setLevel(logging.DEBUG)
@@ -117,11 +117,8 @@
# c = Consumer(conf, logger=logger, debug='fetch')
c = Consumer(conf, logger=logger)
- def print_assignment(consumer, partitions):
- print('Assignment:', partitions)
-
# Subscribe to topics
- c.subscribe(topics, on_assign=print_assignment)
+ c.subscribe(topics)
# Read messages from Kafka, print to stdout
try:
@@ -130,6 +127,7 @@
if msg is None:
continue
elif not msg.error():
+ print('got a kafka message, topic: {0}'.format(msg.topic()))
print(msg.value())
elif msg.error().code() == KafkaError._PARTITION_EOF:
# print('End of partition reached {0}/{1}'
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
index b615b19..0fba534 100644
--- a/voltha/northbound/kafka/kafka_proxy.py
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -15,9 +15,9 @@
#
-
from confluent_kafka import Producer as _kafkaProducer
from structlog import get_logger
+from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.threads import deferToThread
from zope.interface import implementer
@@ -25,6 +25,8 @@
from common.utils.consulhelpers import get_endpoint_from_consul
from voltha.northbound.kafka.event_bus_publisher import EventBusPublisher
from voltha.registry import IComponent
+from confluent_kafka import Consumer, KafkaError
+import threading
log = get_logger()
@@ -32,11 +34,11 @@
@implementer(IComponent)
class KafkaProxy(object):
"""
- This is a singleton proxy kafka class to hide the kafka client details.
-
- Removed the references to the afkak libraries and added confluent-kafka-python as producer.
- The required the adding in of twisted.internet.threads.deferToThread call to avoid any potential
- blocking the the producer code
+ This is a singleton proxy kafka class to hide the kafka client details. This
+ proxy uses confluent-kafka-python as the kafka client. Since that client is
+ not a Twisted client then requests to that client are wrapped with
+ twisted.internet.threads.deferToThread to avoid any potential blocking of
+ the Twisted loop.
"""
_kafka_instance = None
@@ -45,6 +47,7 @@
kafka_endpoint='localhost:9092',
ack_timeout=1000,
max_req_attempts=10,
+ consumer_poll_timeout=10,
config={}):
# return an exception if the object already exist
@@ -62,6 +65,10 @@
self.event_bus_publisher = None
self.stopping = False
self.faulty = False
+ self.consumer_poll_timeout = consumer_poll_timeout
+ self.topic_consumer_map = {}
+ self.topic_callbacks_map = {}
+ self.topic_any_map_lock = threading.Lock()
log.debug('initialized', endpoint=kafka_endpoint)
@inlineCallbacks
@@ -80,6 +87,7 @@
def stop(self):
try:
log.debug('stopping-kafka-proxy')
+ self.stopping = True
try:
if self.kclient:
yield self.kclient.close()
@@ -87,23 +95,36 @@
log.debug('stopped-kclient-kafka-proxy')
except Exception, e:
log.exception('failed-stopped-kclient-kafka-proxy', e=e)
- pass
try:
if self.kproducer:
- yield self.kproducer.stop()
+ yield self.kproducer.flush()
self.kproducer = None
log.debug('stopped-kproducer-kafka-proxy')
except Exception, e:
log.exception('failed-stopped-kproducer-kafka-proxy', e=e)
- pass
- #try:
+ # Stop all consumers
+ try:
+ self.topic_any_map_lock.acquire()
+ log.debug('stopping-consumers-kafka-proxy')
+ for _, c in self.topic_consumer_map.iteritems():
+ yield deferToThread(c.close)
+ self.topic_consumer_map.clear()
+ self.topic_callbacks_map.clear()
+ log.debug('stopped-consumers-kafka-proxy')
+ except Exception, e:
+ log.exception('failed-stopped-consumers-kafka-proxy', e=e)
+ finally:
+ self.topic_any_map_lock.release()
+ log.debug('stopping-consumers-kafka-proxy-released-lock')
+
+ # try:
# if self.event_bus_publisher:
# yield self.event_bus_publisher.stop()
# self.event_bus_publisher = None
# log.debug('stopped-event-bus-publisher-kafka-proxy')
- #except Exception, e:
+ # except Exception, e:
# log.debug('failed-stopped-event-bus-publisher-kafka-proxy')
# pass
@@ -112,7 +133,7 @@
except Exception, e:
self.kclient = None
self.kproducer = None
- #self.event_bus_publisher = None
+ # self.event_bus_publisher = None
log.exception('failed-stopped-kafka-proxy', e=e)
pass
@@ -123,7 +144,8 @@
if self.kafka_endpoint.startswith('@'):
try:
_k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
- self.kafka_endpoint[1:])
+ self.kafka_endpoint[
+ 1:])
log.debug('found-kafka-service', endpoint=_k_endpoint)
except Exception as e:
@@ -135,14 +157,123 @@
else:
_k_endpoint = self.kafka_endpoint
self.kproducer = _kafkaProducer(
- {'bootstrap.servers' :_k_endpoint}
- )
+ {'bootstrap.servers': _k_endpoint,
+ }
+ )
pass
except Exception, e:
log.exception('failed-get-kafka-producer', e=e)
return
@inlineCallbacks
+ def _wait_for_messages(self, consumer, topic):
+ while True:
+ try:
+ msg = yield deferToThread(consumer.poll,
+ self.consumer_poll_timeout)
+
+ if self.stopping:
+ log.debug("stop-request-recieved", topic=topic)
+ break
+
+ if msg is None:
+ continue
+ if msg.error():
+ # This typically is received when there are no more messages
+ # to read from kafka. Ignore.
+ continue
+
+ # Invoke callbacks
+ for cb in self.topic_callbacks_map[topic]:
+ yield cb(msg)
+ except Exception as e:
+ log.debug("exception-receiving-msg", topic=topic, e=e)
+
+ @inlineCallbacks
+ def subscribe(self, topic, callback, groupId, offset='latest'):
+ """
+ subscribe allows a caller to subscribe to a given kafka topic. This API
+ always create a group consumer.
+ :param topic - the topic to subscribe to
+ :param callback - the callback to invoke whenever a message is received
+ on that topic
+ :param groupId - the groupId for this consumer. In the current
+ implementation there is a one-to-one mapping between a topic and a
+ groupId. In other words, once a groupId is used for a given topic then
+ we won't be able to create another groupId for the same topic.
+ :param offset: the kafka offset from where the consumer will start
+ consuming messages
+ """
+ try:
+ self.topic_any_map_lock.acquire()
+ if topic in self.topic_consumer_map:
+ # Just add the callback
+ if topic in self.topic_callbacks_map:
+ self.topic_callbacks_map[topic].append(callback)
+ else:
+ self.topic_callbacks_map[topic] = [callback]
+ return
+
+ # Create consumer for that topic
+ c = Consumer({
+ 'bootstrap.servers': self.kafka_endpoint,
+ 'group.id': groupId,
+ 'auto.offset.reset': offset
+ })
+ yield deferToThread(c.subscribe, [topic])
+ # c.subscribe([topic])
+ self.topic_consumer_map[topic] = c
+ self.topic_callbacks_map[topic] = [callback]
+ # Start the consumer
+ reactor.callLater(0, self._wait_for_messages, c, topic)
+ except Exception, e:
+ log.exception("topic-subscription-error", e=e)
+ finally:
+ self.topic_any_map_lock.release()
+
+ @inlineCallbacks
+ def unsubscribe(self, topic, callback):
+ """
+ Unsubscribe to a given topic. Since there they be multiple callers
+ consuming from the same topic then to ensure only the relevant caller
+ gets unsubscribe then the callback is used as a differentiator. The
+ kafka consumer will be closed when there are no callbacks required.
+ :param topic: topic to unsubscribe
+ :param callback: callback the caller used when subscribing to the topic.
+ If multiple callers have subscribed to a topic using the same callback
+ then the first callback on the list will be removed.
+ :return:None
+ """
+ try:
+ self.topic_any_map_lock.acquire()
+ log.debug("unsubscribing-to-topic", topic=topic)
+ if topic in self.topic_callbacks_map:
+ index = 0
+ for cb in self.topic_callbacks_map[topic]:
+ if cb == callback:
+ break
+ index += 1
+ if index < len(self.topic_callbacks_map[topic]):
+ self.topic_callbacks_map[topic].pop(index)
+
+ if len(self.topic_callbacks_map[topic]) == 0:
+ # Stop the consumer
+ if topic in self.topic_consumer_map:
+ yield deferToThread(
+ self.topic_consumer_map[topic].close)
+ del self.topic_consumer_map[topic]
+ del self.topic_callbacks_map[topic]
+ log.debug("unsubscribed-to-topic", topic=topic)
+ else:
+ log.debug("consumers-for-topic-still-exist", topic=topic,
+ num=len(self.topic_callbacks_map[topic]))
+ except Exception, e:
+ log.exception("topic-unsubscription-error", e=e)
+ finally:
+ self.topic_any_map_lock.release()
+ log.debug("unsubscribing-to-topic-release-lock", topic=topic)
+
+ @inlineCallbacks
def send_message(self, topic, msg, key=None):
assert topic is not None
assert msg is not None
@@ -157,14 +288,15 @@
self._get_kafka_producer()
# Lets the next message request do the retry if still a failure
if self.kproducer is None:
- log.error('no-kafka-producer', endpoint=self.kafka_endpoint)
+ log.error('no-kafka-producer',
+ endpoint=self.kafka_endpoint)
return
log.debug('sending-kafka-msg', topic=topic, kafka_msg=msg)
msgs = [msg]
if self.kproducer is not None and self.event_bus_publisher and self.faulty is False:
- d = deferToThread(self.kproducer.produce, topic, msg, key)
+ d = deferToThread(self.kproducer.produce, topic, msg, key)
yield d
log.debug('sent-kafka-msg', topic=topic, kafka_msg=msg)
# send a lightweight poll to avoid an exception after 100k messages.
@@ -175,7 +307,8 @@
except Exception, e:
self.faulty = True
- log.error('failed-to-send-kafka-msg', topic=topic, kafka_msg=msg, e=e)
+ log.error('failed-to-send-kafka-msg', topic=topic, kafka_msg=msg,
+ e=e)
# set the kafka producer to None. This is needed if the
# kafka docker went down and comes back up with a different
@@ -203,4 +336,3 @@
# Common method to get the singleton instance of the kafka proxy class
def get_kafka_proxy():
return KafkaProxy._kafka_instance
-