blob: 5be6c3983f97b4312e918d3d465584ce10ea0929 [file] [log] [blame]
Khen Nursimuluda11dd72016-10-05 17:42:36 -07001#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3# Copyright (C) 2015 Cyan, Inc.
4
5import logging
6from argparse import ArgumentParser
7
8from twisted.internet import reactor
9from twisted.internet.defer import DeferredList, inlineCallbacks
10from twisted.python.failure import Failure
11from afkak.client import KafkaClient
12from afkak.consumer import Consumer
Khen Nursimuluda11dd72016-10-05 17:42:36 -070013from voltha.consulhelpers import get_endpoint_from_consul
Khen Nursimulu37a9bf82016-10-16 20:11:31 -040014from afkak.common import (
15 KafkaUnavailableError,
16 OFFSET_EARLIEST,
17 OFFSET_LATEST)
Khen Nursimuluda11dd72016-10-05 17:42:36 -070018
# Module-level logger, named after this module; used by every function below.
log = logging.getLogger(__name__)
20
21
class ConsumerExample(object):
    """Consume one Kafka topic for a limited time, logging each message.

    The Kafka endpoint is looked up through consul.  start() creates one
    afkak Consumer per partition of the topic and schedules stop(), which
    stops the consumers, closes the Kafka client and stops the reactor.
    """

    def __init__(self, consul_endpoint, topic='voltha-heartbeat', runtime=60):
        """Look up the Kafka endpoint and create the Kafka client.

        :param consul_endpoint: consul address handed to
            get_endpoint_from_consul() to resolve the 'kafka' service.
        :param topic: name of the Kafka topic to consume.
        :param runtime: delay handed to reactor.callLater() before stop()
            is invoked (callLater takes its delay in seconds).
        """
        self.topic = topic
        self.runtime = runtime
        self.kafka_endpoint = get_endpoint_from_consul(consul_endpoint,
                                                       'kafka')

        self._client = KafkaClient(self.kafka_endpoint)
        self._consumer_list = []  # List of consumers
        # List of deferred returned from consumers' start() methods
        self._consumer_d_list = []

    @inlineCallbacks
    def start(self):
        """Start one consumer per partition and schedule the shutdown.

        Topic metadata is reloaded until partitions are available for the
        topic.  If no Kafka broker can be reached, stop() is called and
        nothing else is started or scheduled.
        """
        partitions = []
        try:
            while not partitions:
                yield self._client.load_metadata_for_topics(self.topic)
                e = self._client.metadata_error_for_topic(self.topic)
                if e:
                    log.warning("Error: %r getting metadata for topic: %s",
                                e, self.topic)
                else:
                    partitions = self._client.topic_partitions[self.topic]
        except KafkaUnavailableError:
            log.error("Unable to communicate with any Kafka brokers")
            self.stop()
            # Shutdown is already under way: do not fall through and
            # schedule a second stop() below.
            return

        def _note_consumer_stopped(result, consumer):
            log.info("Consumer: %r stopped with result: %r", consumer, result)

        for partition in partitions:
            c = Consumer(self._client, self.topic, partition,
                         self.msg_processor)
            self._consumer_list.append(c)
            # Only consume messages produced from now on
            d = c.start(OFFSET_LATEST)
            d.addBoth(_note_consumer_stopped, c)
            self._consumer_d_list.append(d)

        # Stop ourselves after we've run the allotted time
        reactor.callLater(self.runtime, self.stop)

    def stop(self):
        """Stop all consumers, then the Kafka client, then the reactor."""
        log.info("\n")
        log.info("Time is up, stopping consumers...")
        # Ask each of our consumers to stop. When a consumer fully stops, it
        # fires the deferred returned from its start() method. We saved all
        # those deferreds away (above, in start()) in self._consumer_d_list,
        # so now we'll use a DeferredList to wait for all of them...
        for consumer in self._consumer_list:
            consumer.stop()
        dl = DeferredList(self._consumer_d_list)

        # Once the consumers are all stopped, then close our client
        def _stop_client(result):
            if isinstance(result, Failure):
                log.error("Error stopping consumers: %r", result)
            else:
                log.info("All consumers stopped. Stopping client: %r",
                         self._client)
            # Close the client whether or not the consumers stopped cleanly
            self._client.close()
            return result

        dl.addBoth(_stop_client)

        # And once the client is shutdown, stop the reactor
        def _stop_reactor(result):
            reactor.stop()
            return result

        dl.addBoth(_stop_reactor)

    def msg_processor(self, consumer, msglist):
        """Consumer callback: log every message in ``msglist``.

        :param consumer: the Consumer delivering the messages (unused).
        :param msglist: iterable of messages to log.
        """
        for msg in msglist:
            log.info("proc: msg: %r", msg)
97
def parse_options():
    """Parse the command-line options of the consumer example.

    Options:
        -c / --consul   consul ip and port (default '10.100.198.220:8500')
        -t / --topic    topic to listen from (default 'voltha-heartbeat')
        -r / --runtime  total runtime, parsed as an int (default 1000)

    :return: argparse.Namespace with ``consul``, ``topic`` and ``runtime``.
    """
    # Pass the text as ``description``: given positionally it is taken as
    # ``prog`` and would be printed as the program name in usage output.
    parser = ArgumentParser(description="Consume kafka messages")
    parser.add_argument("-c", "--consul",
                        help="consul ip and port",
                        default='10.100.198.220:8500')

    parser.add_argument("-t", "--topic",
                        help="topic to listen from",
                        default='voltha-heartbeat')

    # type=int makes a value given on the command line the same type as the
    # default, and lets argparse reject non-numeric input with a usage error.
    parser.add_argument("-r", "--runtime",
                        help="total runtime",
                        type=int,
                        default=1000)

    return parser.parse_args()
113
def main():
    """Command-line entry point of the consumer example.

    Sets up INFO-level logging, parses the command-line options, builds a
    ConsumerExample from them and runs the reactor until it is stopped.
    """
    log_fields = ('%(asctime)s', '%(name)s', '%(levelname)s',
                  '%(process)d', '%(message)s')
    logging.basicConfig(format=':'.join(log_fields), level=logging.INFO)

    options = parse_options()
    runtime = int(options.runtime)
    example = ConsumerExample(options.consul, options.topic, runtime)

    # Queue start() so that it runs once the reactor is up
    reactor.callWhenRunning(example.start)
    reactor.run()
    log.info("All Done!")
127
128
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()