blob: 3f405d8b4189c2145f97bd2f41036736b547c110 [file] [log] [blame]
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Cyan, Inc.
import logging
from argparse import ArgumentParser
from afkak.client import KafkaClient
from afkak.common import (
KafkaUnavailableError,
OFFSET_LATEST)
from afkak.consumer import Consumer
from twisted.internet import reactor
from twisted.internet.defer import DeferredList, inlineCallbacks
from twisted.python.failure import Failure
from common.utils.consulhelpers import get_endpoint_from_consul
log = logging.getLogger(__name__)
class ConsumerExample(object):
    """Example Kafka consumer that logs every message received on one topic.

    The Kafka endpoint is resolved through ``get_endpoint_from_consul``. One
    afkak ``Consumer`` is started per partition of ``topic``; after
    ``runtime`` seconds the consumers, the client and finally the reactor
    are stopped.
    """

    def __init__(self, consul_endpoint, topic="heartbeat.voltha", runtime=60):
        """Resolve the Kafka endpoint and create the Kafka client.

        :param consul_endpoint: value handed to ``get_endpoint_from_consul``
            to look up the 'kafka' service.
        :param topic: name of the topic to consume.
        :param runtime: delay, as passed to ``reactor.callLater``, after
            which ``stop()`` is invoked.
        """
        self.topic = topic
        self.runtime = runtime
        self.kafka_endpoint = get_endpoint_from_consul(consul_endpoint,
                                                       'kafka')
        self._client = KafkaClient(self.kafka_endpoint)
        self._consumer_list = []  # List of consumers
        # List of deferred returned from consumers' start() methods
        self._consumer_d_list = []

    @inlineCallbacks
    def start(self):
        """Start one consumer per partition and schedule the shutdown.

        Metadata for the topic is reloaded until partitions are reported.
        If the brokers cannot be reached, the example is stopped right away
        and no consumers are created.
        """
        partitions = []
        try:
            # NOTE: the metadata request is retried immediately, with no
            # back-off, for as long as the topic reports a metadata error.
            while not partitions:
                yield self._client.load_metadata_for_topics(self.topic)
                e = self._client.metadata_error_for_topic(self.topic)
                if e:
                    # The stdlib logger rejects arbitrary keyword arguments,
                    # so the context is passed as %-style arguments.
                    log.warning('no-metadata-for-topic: topic=%s error=%r',
                                self.topic, e)
                else:
                    partitions = self._client.topic_partitions[self.topic]
        except KafkaUnavailableError:
            log.error("unable-to-communicate-with-Kafka-brokers")
            self.stop()
            # stop() already shuts the reactor down; returning here avoids
            # scheduling a second stop() below.
            return

        def _note_consumer_stopped(result, consumer):
            # Runs for success and failure alike; returning None means a
            # failure is treated as handled once it has been logged.
            log.info('consumer-stopped: consumer=%r result=%r',
                     consumer, result)

        for partition in partitions:
            c = Consumer(self._client, self.topic, partition,
                         self.msg_processor)
            self._consumer_list.append(c)
            d = c.start(OFFSET_LATEST)
            d.addBoth(_note_consumer_stopped, c)
            self._consumer_d_list.append(d)

        # Stop ourselves after we've run the allotted time
        reactor.callLater(self.runtime, self.stop)

    def stop(self):
        """Stop all consumers, then close the client, then stop the reactor."""
        log.info("\n")
        log.info('end-of-execution-stopping-consumers')
        # Ask each of our consumers to stop. When a consumer fully stops, it
        # fires the deferred returned from its start() method. We saved all
        # those deferreds away (above, in start()) in self._consumer_d_list,
        # so now we'll use a DeferredList to wait for all of them...
        for consumer in self._consumer_list:
            consumer.stop()
        dl = DeferredList(self._consumer_d_list)

        # Once the consumers are all stopped, then close our client
        def _stop_client(result):
            if isinstance(result, Failure):
                log.error('error: result=%r', result)
            else:
                log.info('all-consumers-stopped: client=%r', self._client)
            # Close the client whichever branch was taken above.
            self._client.close()
            return result

        dl.addBoth(_stop_client)

        # And once the client is shutdown, stop the reactor
        def _stop_reactor(result):
            reactor.stop()
            return result

        dl.addBoth(_stop_reactor)

    def msg_processor(self, consumer, msglist):
        """Log each message in ``msglist``; ``consumer`` is not used."""
        for msg in msglist:
            log.info("%s", msg)
def parse_options(argv=None):
    """Parse the command-line options of the consumer example.

    :param argv: optional list of argument strings; when ``None`` argparse
        reads ``sys.argv[1:]``, which is the original behaviour.
    :return: ``argparse.Namespace`` with ``consul`` (str), ``topic`` (str)
        and ``runtime`` (int).
    """
    # The text is a description; passed positionally it would be taken as
    # ``prog`` and displayed as the program name in the usage line.
    parser = ArgumentParser(description="Consume kafka messages")
    parser.add_argument("-c", "--consul",
                        help="consul ip and port",
                        default='10.100.198.220:8500')
    parser.add_argument("-t", "--topic",
                        help="topic to listen from",
                        default="heartbeat.voltha")
    # type=int makes the value an int whether defaulted or supplied, and
    # lets argparse report a non-numeric value as a usage error.
    parser.add_argument("-r", "--runtime",
                        help="total runtime",
                        type=int,
                        default=1000)
    return parser.parse_args(argv)
def main():
    """Set up logging, build the consumer example and run the reactor.

    The consumer's ``start`` is queued to run once the reactor is running;
    ``reactor.run()`` blocks until the reactor is stopped, after which a
    completion message is logged.
    """
    log_format = ('%(asctime)s:%(name)s:'
                  '%(levelname)s:%(process)d:%(message)s')
    logging.basicConfig(format=log_format, level=logging.INFO)

    options = parse_options()
    example = ConsumerExample(
        options.consul,
        options.topic,
        int(options.runtime),
    )

    reactor.callWhenRunning(example.start)
    reactor.run()
    log.info("completed!")
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()