blob: 0b45f6f795e525e89542f3511f965e8a58d0e0cf [file] [log] [blame]
Khen Nursimuluda11dd72016-10-05 17:42:36 -07001#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3# Copyright (C) 2015 Cyan, Inc.
4
5import logging
6from argparse import ArgumentParser
7
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07008from afkak.client import KafkaClient
9from afkak.common import (
10 KafkaUnavailableError,
11 OFFSET_LATEST)
12from afkak.consumer import Consumer
Khen Nursimuluda11dd72016-10-05 17:42:36 -070013from twisted.internet import reactor
14from twisted.internet.defer import DeferredList, inlineCallbacks
15from twisted.python.failure import Failure
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070016
17from common.utils.consulhelpers import get_endpoint_from_consul
Khen Nursimuluda11dd72016-10-05 17:42:36 -070018
19log = logging.getLogger(__name__)
20
21
class ConsumerExample(object):
    """Consume one Kafka topic for a fixed time, logging every message.

    The Kafka endpoint is looked up through ``get_endpoint_from_consul``;
    ``start()`` then creates one afkak ``Consumer`` per partition of the
    topic and schedules ``stop()`` after ``runtime`` seconds.
    """

    def __init__(self, consul_endpoint, topic="voltha.heartbeat", runtime=60):
        """Resolve the Kafka endpoint and create the Kafka client.

        :param consul_endpoint: value handed to ``get_endpoint_from_consul``
            to look up the ``'kafka'`` service.
        :param topic: name of the Kafka topic to consume.
        :param runtime: delay, as passed to ``reactor.callLater``, after
            which ``stop()`` is invoked.
        """
        self.topic = topic
        self.runtime = runtime
        self.kafka_endpoint = get_endpoint_from_consul(consul_endpoint,
                                                       'kafka')

        self._client = KafkaClient(self.kafka_endpoint)
        self._consumer_list = []  # List of consumers
        # List of deferred returned from consumers' start() methods
        self._consumer_d_list = []

    @inlineCallbacks
    def start(self):
        """Load topic metadata, start a consumer per partition, arm the timer.

        If the brokers cannot be reached, ``stop()`` is called and no
        consumer or shutdown timer is created.
        """
        partitions = []
        try:
            # NOTE(review): this retries immediately, with no delay, for as
            # long as the metadata lookup reports an error -- consider a
            # back-off if that turns out to be a problem in practice.
            while not partitions:
                yield self._client.load_metadata_for_topics(self.topic)
                e = self._client.metadata_error_for_topic(self.topic)
                if e:
                    # The stdlib logger rejects arbitrary keyword arguments,
                    # so the details are passed as lazy %-format args.
                    log.warning('no-metadata-for-topic: error=%s topic=%s',
                                e, self.topic)
                else:
                    partitions = self._client.topic_partitions[self.topic]
        except KafkaUnavailableError:
            log.error("unable-to-communicate-with-Kafka-brokers")
            self.stop()
            # stop() already shuts everything down; do not fall through and
            # schedule a second stop() below.
            return

        def _note_consumer_stopped(result, consumer):
            log.info('consumer-stopped: consumer=%s result=%s',
                     consumer, result)

        for partition in partitions:
            c = Consumer(self._client, self.topic, partition,
                         self.msg_processor)
            self._consumer_list.append(c)
            d = c.start(OFFSET_LATEST)
            d.addBoth(_note_consumer_stopped, c)
            self._consumer_d_list.append(d)

        # Stop ourselves after we've run the allotted time
        reactor.callLater(self.runtime, self.stop)

    def stop(self):
        """Stop all consumers, then close the client, then stop the reactor."""
        log.info("\n")
        log.info('end-of-execution-stopping-consumers')
        # Ask each of our consumers to stop. When a consumer fully stops, it
        # fires the deferred returned from its start() method. We saved all
        # those deferreds away (above, in start()) in self._consumer_d_list,
        # so now we'll use a DeferredList to wait for all of them...
        for consumer in self._consumer_list:
            consumer.stop()
        dl = DeferredList(self._consumer_d_list)

        # Once the consumers are all stopped, then close our client
        def _stop_client(result):
            if isinstance(result, Failure):
                log.error('error: result=%s', result)
            else:
                log.info('all-consumers-stopped: client=%s', self._client)
            self._client.close()
            return result

        dl.addBoth(_stop_client)

        # And once the client is shutdown, stop the reactor
        def _stop_reactor(result):
            reactor.stop()
            return result

        dl.addBoth(_stop_reactor)

    def msg_processor(self, consumer, msglist):
        """Callback handed to each Consumer: log every message in msglist.

        :param consumer: unused here.
        :param msglist: iterable of messages to log.
        """
        for msg in msglist:
            log.info(msg)
98
Khen Nursimuluda11dd72016-10-05 17:42:36 -070099
def parse_options(argv=None):
    """Parse the command-line options of the consumer example.

    :param argv: optional list of argument strings to parse; when None,
        argparse reads the process command line (``sys.argv[1:]``).
    :return: the argparse namespace with ``consul`` (str), ``topic`` (str)
        and ``runtime`` (int).
    """
    # The text is the program description; passed positionally it would be
    # taken as ``prog`` and end up in the usage line as the program name.
    parser = ArgumentParser(description="Consume kafka messages")
    parser.add_argument("-c", "--consul",
                        help="consul ip and port",
                        default='10.100.198.220:8500')

    parser.add_argument("-t", "--topic",
                        help="topic to listen from",
                        default="voltha.heartbeat")

    # type=int makes the value an int whether it comes from the default or
    # from the command line, and rejects non-numeric input with a usage error.
    parser.add_argument("-r", "--runtime",
                        help="total runtime",
                        type=int,
                        default=1000)

    return parser.parse_args(argv)
115
def main():
    """Configure logging, build the consumer from CLI options, run the reactor.

    Returns once the reactor has stopped.
    """
    log_format = '%(asctime)s:%(name)s:%(levelname)s:%(process)d:%(message)s'
    logging.basicConfig(format=log_format, level=logging.INFO)

    options = parse_options()
    runtime_seconds = int(options.runtime)
    example = ConsumerExample(options.consul, options.topic, runtime_seconds)

    # Kick off the consumer as soon as the reactor is up, then block in run()
    # until something stops the reactor.
    reactor.callWhenRunning(example.start)
    reactor.run()
    log.info("completed!")
Khen Nursimuluda11dd72016-10-05 17:42:36 -0700130
131
# Run the example only when this file is executed as a script.
if __name__ == '__main__':
    main()