blob: 6b5583573e5d0fcb4b977bd9ff318ce6281d576a [file] [log] [blame]
Khen Nursimuluda11dd72016-10-05 17:42:36 -07001#!/usr/bin/env python
Zack Williams41513bf2018-07-07 20:08:35 -07002# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
Khen Nursimuluda11dd72016-10-05 17:42:36 -070015# -*- coding: utf-8 -*-
16# Copyright (C) 2015 Cyan, Inc.
17
18import logging
19from argparse import ArgumentParser
20
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070021from afkak.client import KafkaClient
22from afkak.common import (
23 KafkaUnavailableError,
24 OFFSET_LATEST)
25from afkak.consumer import Consumer
Khen Nursimuluda11dd72016-10-05 17:42:36 -070026from twisted.internet import reactor
27from twisted.internet.defer import DeferredList, inlineCallbacks
28from twisted.python.failure import Failure
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070029
30from common.utils.consulhelpers import get_endpoint_from_consul
Khen Nursimuluda11dd72016-10-05 17:42:36 -070031
32log = logging.getLogger(__name__)
33
34
class ConsumerExample(object):
    """Example Kafka consumer that logs every message received on a topic.

    One afkak ``Consumer`` is started per partition of ``topic``.  After
    ``runtime`` seconds the consumers, the Kafka client and the Twisted
    reactor are shut down.
    """

    def __init__(self, consul_endpoint, topic="voltha.heartbeat", runtime=60):
        """Create the Kafka client for the endpoint registered in consul.

        :param consul_endpoint: consul address handed to
            ``get_endpoint_from_consul`` to resolve the 'kafka' service.
        :param topic: name of the Kafka topic to consume.
        :param runtime: delay, in seconds, between the consumers being
            started and ``stop`` being invoked automatically.
        """
        self.topic = topic
        self.runtime = runtime
        self.kafka_endpoint = get_endpoint_from_consul(consul_endpoint,
                                                       'kafka')

        self._client = KafkaClient(self.kafka_endpoint)
        self._consumer_list = []  # List of consumers
        # List of deferred returned from consumers' start() methods
        self._consumer_d_list = []

    @inlineCallbacks
    def start(self):
        """Load the topic metadata and start one consumer per partition.

        Metadata is reloaded until the client reports at least one
        partition for the topic.  If the brokers cannot be reached the
        example shuts itself down instead of starting any consumer.
        """
        partitions = []
        try:
            while not partitions:
                yield self._client.load_metadata_for_topics(self.topic)
                e = self._client.metadata_error_for_topic(self.topic)
                if e:
                    # ``log`` is a stdlib logger: values must be passed as
                    # %-style arguments, arbitrary keywords raise TypeError.
                    log.warning('no-metadata-for-topic: topic=%s error=%s',
                                self.topic, e)
                else:
                    partitions = self._client.topic_partitions[self.topic]
        except KafkaUnavailableError:
            log.error("unable-to-communicate-with-Kafka-brokers")
            self.stop()
            # Shutdown is already under way: do not fall through and
            # schedule a second stop below.
            return

        def _note_consumer_stopped(result, consumer):
            log.info('consumer-stopped: consumer=%s result=%s',
                     consumer, result)
            # Pass the result (or Failure) through so that stop() can see
            # how each consumer ended.
            return result

        for partition in partitions:
            c = Consumer(self._client, self.topic, partition,
                         self.msg_processor)
            self._consumer_list.append(c)
            d = c.start(OFFSET_LATEST)
            d.addBoth(_note_consumer_stopped, c)
            self._consumer_d_list.append(d)

        # Stop ourselves after we've run the allotted time
        reactor.callLater(self.runtime, self.stop)

    def stop(self):
        """Stop all consumers, then close the client and stop the reactor."""
        log.info("\n")
        log.info('end-of-execution-stopping-consumers')
        # Ask each of our consumers to stop. When a consumer fully stops, it
        # fires the deferred returned from its start() method. We saved all
        # those deferreds away (above, in start()) in self._consumer_d_list,
        # so now we'll use a DeferredList to wait for all of them...
        for consumer in self._consumer_list:
            consumer.stop()
        # consumeErrors=True: failures are reported by _stop_client below
        # rather than being left unhandled on the individual deferreds.
        dl = DeferredList(self._consumer_d_list, consumeErrors=True)

        # Once the consumers are all stopped, then close our client
        def _stop_client(result):
            if isinstance(result, Failure):
                log.error('error: result=%s', result)
            else:
                # A DeferredList fires with a list of (success, value)
                # pairs; report every consumer that ended with a failure.
                failures = [value for success, value in result
                            if not success]
                for failure in failures:
                    log.error('error: result=%s', failure)
                if not failures:
                    log.info('all-consumers-stopped: client=%s',
                             self._client)
            self._client.close()
            return result

        dl.addBoth(_stop_client)

        # And once the client is shutdown, stop the reactor
        def _stop_reactor(result):
            reactor.stop()
            return result

        dl.addBoth(_stop_reactor)

    def msg_processor(self, consumer, msglist):
        """Consumer callback: log each message of the delivered batch.

        :param consumer: the consumer delivering the messages (unused).
        :param msglist: iterable of received messages.
        """
        for msg in msglist:
            log.info(msg)
111
Khen Nursimuluda11dd72016-10-05 17:42:36 -0700112
def parse_options(argv=None):
    """Parse the command-line options of the consumer example.

    :param argv: list of argument strings to parse.  ``None`` (the
        default) lets argparse read the arguments from ``sys.argv``.
    :return: the argparse namespace with ``consul`` (str), ``topic`` (str)
        and ``runtime`` (int) attributes.
    """
    # The text is a description of the program; passed positionally it
    # would be taken as ``prog`` and shown as the program name in usage.
    parser = ArgumentParser(description="Consume kafka messages")
    parser.add_argument("-c", "--consul",
                        help="consul ip and port",
                        default='10.100.198.220:8500')

    parser.add_argument("-t", "--topic",
                        help="topic to listen from",
                        default="voltha.heartbeat")

    # type=int so a value given on the command line has the same type as
    # the default and invalid input is rejected by argparse itself.
    parser.add_argument("-r", "--runtime",
                        help="total runtime",
                        type=int,
                        default=1000)

    return parser.parse_args(argv)
128
def main():
    """Command-line entry point of the consumer example.

    Configures INFO-level logging, parses the options, builds the
    ``ConsumerExample`` and runs the reactor until the example stops it.
    """
    log_format = '%(asctime)s:%(name)s:%(levelname)s:%(process)d:%(message)s'
    logging.basicConfig(format=log_format, level=logging.INFO)

    options = parse_options()
    runtime = int(options.runtime)
    example = ConsumerExample(options.consul, options.topic, runtime)

    # start() is only invoked once the reactor is actually running.
    reactor.callWhenRunning(example.start)
    reactor.run()
    log.info("completed!")
Khen Nursimuluda11dd72016-10-05 17:42:36 -0700143
144
# Run the example only when this file is executed as a script.
if __name__ == '__main__':
    main()