Khen Nursimulu | da11dd7 | 2016-10-05 17:42:36 -0700 | [diff] [blame] | 1 | #!/usr/bin/env python |
Zack Williams | 41513bf | 2018-07-07 20:08:35 -0700 | [diff] [blame] | 2 | # Copyright 2017-present Open Networking Foundation |
| 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
Khen Nursimulu | da11dd7 | 2016-10-05 17:42:36 -0700 | [diff] [blame] | 15 | # -*- coding: utf-8 -*- |
| 16 | # Copyright (C) 2015 Cyan, Inc. |
| 17 | |
| 18 | import logging |
| 19 | from argparse import ArgumentParser |
| 20 | |
Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 21 | from afkak.client import KafkaClient |
| 22 | from afkak.common import ( |
| 23 | KafkaUnavailableError, |
| 24 | OFFSET_LATEST) |
| 25 | from afkak.consumer import Consumer |
Khen Nursimulu | da11dd7 | 2016-10-05 17:42:36 -0700 | [diff] [blame] | 26 | from twisted.internet import reactor |
| 27 | from twisted.internet.defer import DeferredList, inlineCallbacks |
| 28 | from twisted.python.failure import Failure |
Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 29 | |
| 30 | from common.utils.consulhelpers import get_endpoint_from_consul |
Khen Nursimulu | da11dd7 | 2016-10-05 17:42:36 -0700 | [diff] [blame] | 31 | |
# Module-level logger named after this module. This is a standard-library
# logger, so values must be passed as %-style positional arguments rather
# than as arbitrary keyword fields.
log = logging.getLogger(__name__)
| 33 | |
| 34 | |
class ConsumerExample(object):
    """Example Kafka consumer that logs every message received on a topic.

    One afkak ``Consumer`` is created per partition of the topic. The
    example stops itself (consumers, Kafka client, then the Twisted
    reactor) after ``runtime`` has elapsed, or immediately if the Kafka
    brokers cannot be reached.
    """

    def __init__(self, consul_endpoint, topic="voltha.heartbeat", runtime=60):
        """Create the Kafka client for the endpoint registered in consul.

        :param consul_endpoint: consul address handed to
            ``get_endpoint_from_consul`` to resolve the 'kafka' service.
        :param topic: name of the Kafka topic to consume from.
        :param runtime: delay passed to ``reactor.callLater`` before the
            example stops itself.
        """
        self.topic = topic
        self.runtime = runtime
        self.kafka_endpoint = get_endpoint_from_consul(consul_endpoint,
                                                       'kafka')

        self._client = KafkaClient(self.kafka_endpoint)
        self._consumer_list = []  # List of consumers
        # List of deferred returned from consumers' start() methods
        self._consumer_d_list = []
        # Set once stop() has run so that a repeated call is a no-op
        self._stopping = False

    @inlineCallbacks
    def start(self):
        """Load the topic metadata and start one consumer per partition.

        Metadata is reloaded until the topic reports at least one
        partition. If the brokers are unreachable the example is stopped
        and no consumer is started.
        """
        partitions = []
        try:
            # NOTE(review): the retry is immediate (no back-off) and
            # unbounded; it only ends once partitions are reported or the
            # client raises KafkaUnavailableError.
            while not partitions:
                yield self._client.load_metadata_for_topics(self.topic)
                e = self._client.metadata_error_for_topic(self.topic)
                if e:
                    log.warning('no-metadata-for-topic: error=%s topic=%s',
                                e, self.topic)
                else:
                    partitions = self._client.topic_partitions[self.topic]
        except KafkaUnavailableError:
            log.error("unable-to-communicate-with-Kafka-brokers")
            self.stop()
            # Nothing to consume from and shutdown is already in progress:
            # do not fall through and schedule a second, timed stop().
            return

        def _note_consumer_stopped(result, consumer):
            log.info('consumer-stopped: consumer=%s result=%s',
                     consumer, result)

        for partition in partitions:
            c = Consumer(self._client, self.topic, partition,
                         self.msg_processor)
            self._consumer_list.append(c)
            d = c.start(OFFSET_LATEST)
            d.addBoth(_note_consumer_stopped, c)
            self._consumer_d_list.append(d)

        # Stop ourselves after we've run the allotted time
        reactor.callLater(self.runtime, self.stop)

    def stop(self):
        """Stop all consumers, then close the client and stop the reactor.

        Only the first call has any effect; later calls return at once.
        """
        if self._stopping:
            return
        self._stopping = True

        log.info("\n")
        log.info('end-of-execution-stopping-consumers')
        # Ask each of our consumers to stop. When a consumer fully stops, it
        # fires the deferred returned from its start() method. We saved all
        # those deferreds away (above, in start()) in self._consumer_d_list,
        # so now we'll use a DeferredList to wait for all of them...
        for consumer in self._consumer_list:
            consumer.stop()
        dl = DeferredList(self._consumer_d_list)

        # Once the consumers are all stopped, then close our client
        def _stop_client(result):
            if isinstance(result, Failure):
                log.error('error: result=%s', result)
            else:
                log.info('all-consumers-stopped: client=%s', self._client)
                self._client.close()
            return result

        dl.addBoth(_stop_client)

        # And once the client is shutdown, stop the reactor
        def _stop_reactor(result):
            reactor.stop()
            return result

        dl.addBoth(_stop_reactor)

    def msg_processor(self, consumer, msglist):
        """Consumer callback: log each message of the delivered batch.

        :param consumer: the consumer delivering the batch (unused).
        :param msglist: iterable of messages to log.
        """
        for msg in msglist:
            log.info(msg)
| 111 | |
Khen Nursimulu | da11dd7 | 2016-10-05 17:42:36 -0700 | [diff] [blame] | 112 | |
def parse_options(argv=None):
    """Parse the command-line options of the consumer example.

    :param argv: optional list of argument strings to parse. When ``None``
        (the default) argparse reads the process command line.
    :return: ``argparse.Namespace`` with ``consul`` (str), ``topic`` (str)
        and ``runtime`` (int).
    """
    # The text is the tool description; passed positionally it would be
    # taken as the program name shown in the usage line.
    parser = ArgumentParser(description="Consume kafka messages")
    parser.add_argument("-c", "--consul",
                        help="consul ip and port",
                        default='10.100.198.220:8500')

    parser.add_argument("-t", "--topic",
                        help="topic to listen from",
                        default="voltha.heartbeat")

    # type=int keeps the value an int whether it comes from the default or
    # from the command line, and rejects non-numeric input with a usage error.
    parser.add_argument("-r", "--runtime",
                        help="total runtime",
                        type=int,
                        default=1000)

    return parser.parse_args(argv)
| 128 | |
def main():
    """Script entry point: configure logging, then run the example consumer.

    Parses the command-line options, builds a ``ConsumerExample`` from them,
    schedules its ``start`` for when the reactor is running and runs the
    reactor. A final message is logged once ``reactor.run()`` returns.
    """
    log_format = ('%(asctime)s:%(name)s:'
                  '%(levelname)s:%(process)d:%(message)s')
    logging.basicConfig(format=log_format, level=logging.INFO)

    options = parse_options()
    run_for = int(options.runtime)
    example = ConsumerExample(options.consul, options.topic, run_for)

    reactor.callWhenRunning(example.start)
    reactor.run()
    log.info("completed!")
Khen Nursimulu | da11dd7 | 2016-10-05 17:42:36 -0700 | [diff] [blame] | 143 | |
| 144 | |
# Run the example only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()