blob: c4372586b7c34522435c33e9157823b681c5ceaa [file] [log] [blame]
Khen Nursimuluda11dd72016-10-05 17:42:36 -07001#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3# Copyright (C) 2015 Cyan, Inc.
4
5import logging
6from argparse import ArgumentParser
7
8from twisted.internet import reactor
9from twisted.internet.defer import DeferredList, inlineCallbacks
10from twisted.python.failure import Failure
11from afkak.client import KafkaClient
12from afkak.consumer import Consumer
13from afkak.common import KafkaUnavailableError
14from voltha.consulhelpers import get_endpoint_from_consul
15
16log = logging.getLogger(__name__)
17
18
class ConsumerExample(object):
    """Example Kafka consumer that logs every message seen on one topic.

    The Kafka endpoint is looked up via ``get_endpoint_from_consul``, one
    afkak ``Consumer`` is created per partition of the topic, and ``stop()``
    is scheduled ``runtime`` seconds after the consumers have been started.
    ``stop()`` stops the consumers, closes the client and stops the reactor.
    """

    def __init__(self, consul_endpoint, topic='voltha-heartbeat', runtime=60):
        """Resolve the Kafka endpoint and create the Kafka client.

        :param consul_endpoint: value handed to ``get_endpoint_from_consul``
            to look up the ``'kafka'`` service.
        :param topic: name of the Kafka topic to consume.
        :param runtime: delay, in seconds, passed to ``reactor.callLater``
            before ``stop()`` is invoked.
        """
        self.topic = topic
        self.runtime = runtime
        self.kafka_endpoint = get_endpoint_from_consul(consul_endpoint,
                                                       'kafka')

        self._client = KafkaClient(self.kafka_endpoint)
        self._consumer_list = []  # List of consumers
        # List of deferred returned from consumers' start() methods
        self._consumer_d_list = []

    @inlineCallbacks
    def start(self):
        """Start one consumer per partition and schedule the shutdown.

        Metadata for the topic is reloaded until a non-empty partition list
        is available. If no broker can be reached, shutdown is initiated
        immediately and no consumers or timers are created.
        """
        partitions = []
        try:
            # Keep asking until the client reports partitions for the topic.
            while not partitions:
                yield self._client.load_metadata_for_topics(self.topic)
                error = self._client.metadata_error_for_topic(self.topic)
                if error:
                    log.warning("Error: %r getting metadata for topic: %s",
                                error, self.topic)
                else:
                    partitions = self._client.topic_partitions[self.topic]
        except KafkaUnavailableError:
            log.error("Unable to communicate with any Kafka brokers")
            self.stop()
            # Shutdown is already under way: do not fall through and
            # schedule a second stop() for a consumer set that never started.
            return

        def _note_consumer_stopped(result, consumer):
            log.info("Consumer: %r stopped with result: %r", consumer, result)

        for partition in partitions:
            consumer = Consumer(self._client, self.topic, partition,
                                self.msg_processor)
            self._consumer_list.append(consumer)
            # NOTE(review): 0 is presumably the offset to start consuming
            # from -- confirm against the afkak Consumer.start() docs.
            d = consumer.start(0)
            d.addBoth(_note_consumer_stopped, consumer)
            self._consumer_d_list.append(d)

        # Stop ourselves after we've run the allotted time
        reactor.callLater(self.runtime, self.stop)

    def stop(self):
        """Stop all consumers, then close the client, then stop the reactor."""
        log.info("\n")
        log.info("Time is up, stopping consumers...")
        # Ask each of our consumers to stop. When a consumer fully stops, it
        # fires the deferred returned from its start() method. We saved all
        # those deferreds away (above, in start()) in self._consumer_d_list,
        # so now we'll use a DeferredList to wait for all of them...
        for consumer in self._consumer_list:
            consumer.stop()
        dl = DeferredList(self._consumer_d_list)

        # Once the consumers are all stopped, then close our client
        def _stop_client(result):
            # A DeferredList built with default arguments fires its callback
            # with a list of (success, result) pairs, so the Failure branch
            # is purely defensive.
            if isinstance(result, Failure):
                log.error("Error stopping consumers: %r", result)
            else:
                log.info("All consumers stopped. Stopping client: %r",
                         self._client)
            self._client.close()
            return result

        dl.addBoth(_stop_client)

        # And once the client has been asked to close, stop the reactor
        def _stop_reactor(result):
            reactor.stop()
            return result

        dl.addBoth(_stop_reactor)

    def msg_processor(self, consumer, msglist):
        """Log each message in ``msglist``; ``consumer`` is not used."""
        for msg in msglist:
            log.info("proc: msg: %r", msg)
94
def parse_options():
    """Parse the command-line options of the consumer example.

    :return: the ``argparse`` namespace, with ``consul`` (default
        ``'10.100.198.220:8500'``) and ``topic`` (default
        ``'voltha-heartbeat'``) attributes.
    """
    # The text is a description of the program. Passed positionally it would
    # bind to ``prog`` and replace the program name in the usage line.
    parser = ArgumentParser(description="Consume kafka messages")
    parser.add_argument("-c", "--consul",
                        help="consul ip and port",
                        default='10.100.198.220:8500')

    parser.add_argument("-t", "--topic",
                        help="topic to listen from",
                        default='voltha-heartbeat')

    return parser.parse_args()
106
def main():
    """Set up logging, build the example consumer and run the reactor.

    ``reactor.run()`` blocks until the reactor is stopped, after which a
    final "All Done!" line is logged.
    """
    log_format = ('%(asctime)s:%(name)s:'
                  '%(levelname)s:%(process)d:%(message)s')
    logging.basicConfig(format=log_format, level=logging.INFO)

    options = parse_options()
    example = ConsumerExample(options.consul, options.topic, runtime=1000)

    # Defer start() until the reactor is actually running.
    reactor.callWhenRunning(example.start)
    reactor.run()
    log.info("All Done!")


# Run the example only when executed as a script, not on import.
if __name__ == "__main__":
    main()