This commit consists of:
1) Change kafka/zookeeper ports mapping to allow for scaling via
docker-compose
2) Use the afkak consumer example when subscribing to kafka messages
3) Remove dependency on kafka-python
4) Update the Build.md file with instructions on how to listen to kafka
messages
diff --git a/kafka/kafka-consumer.py b/kafka/kafka-consumer.py
new file mode 100644
index 0000000..c437258
--- /dev/null
+++ b/kafka/kafka-consumer.py
@@ -0,0 +1,124 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright (C) 2015 Cyan, Inc.
+
+import logging
+from argparse import ArgumentParser
+
+from twisted.internet import reactor
+from twisted.internet.defer import DeferredList, inlineCallbacks
+from twisted.python.failure import Failure
+from afkak.client import KafkaClient
+from afkak.consumer import Consumer
+from afkak.common import KafkaUnavailableError
+from voltha.consulhelpers import get_endpoint_from_consul
+
+log = logging.getLogger(__name__)
+
+
+class ConsumerExample(object):
+ def __init__(self, consul_endpoint, topic='voltha-heartbeat', runtime=60):
+ self.topic = topic
+ self.runtime = runtime
+ self.kafka_endpoint = get_endpoint_from_consul(consul_endpoint,
+ 'kafka')
+
+ self._client = KafkaClient(self.kafka_endpoint)
+ self._consumer_list = [] # List of consumers
+ # List of deferred returned from consumers' start() methods
+ self._consumer_d_list = []
+
+ @inlineCallbacks
+ def start(self):
+ partitions = []
+ try:
+ while not partitions:
+ yield self._client.load_metadata_for_topics(self.topic)
+ e = self._client.metadata_error_for_topic(self.topic)
+ if e:
+ log.warning("Error: %r getting metadata for topic: %s",
+ e, self.topic)
+ else:
+ partitions = self._client.topic_partitions[self.topic]
+ except KafkaUnavailableError:
+ log.error("Unable to communicate with any Kafka brokers")
+ self.stop()
+
+ def _note_consumer_stopped(result, consumer):
+ log.info("Consumer: %r stopped with result: %r", consumer, result)
+
+ for partition in partitions:
+ c = Consumer(self._client, self.topic, partition,
+ self.msg_processor)
+ self._consumer_list.append(c)
+ d = c.start(0)
+ d.addBoth(_note_consumer_stopped, c)
+ self._consumer_d_list.append(d)
+
+ # Stop ourselves after we've run the allotted time
+ reactor.callLater(self.runtime, self.stop)
+
+ def stop(self):
+ log.info("\n")
+ log.info("Time is up, stopping consumers...")
+ # Ask each of our consumers to stop. When a consumer fully stops, it
+ # fires the deferred returned from its start() method. We saved all
+ # those deferreds away (above, in start()) in self._consumer_d_list,
+ # so now we'll use a DeferredList to wait for all of them...
+ for consumer in self._consumer_list:
+ consumer.stop()
+ dl = DeferredList(self._consumer_d_list)
+
+ # Once the consumers are all stopped, then close our client
+ def _stop_client(result):
+ if isinstance(result, Failure):
+ log.error("Error stopping consumers: %r", result)
+ else:
+ log.info("All consumers stopped. Stopping client: %r",
+ self._client)
+ self._client.close()
+ return result
+
+ dl.addBoth(_stop_client)
+
+ # And once the client is shutdown, stop the reactor
+ def _stop_reactor(result):
+ reactor.stop()
+ return result
+
+ dl.addBoth(_stop_reactor)
+
+ def msg_processor(self, consumer, msglist):
+ for msg in msglist:
+ log.info("proc: msg: %r", msg)
+
+def parse_options():
+ parser = ArgumentParser("Consume kafka messages")
+ parser.add_argument("-c", "--consul",
+ help="consul ip and port",
+ default='10.100.198.220:8500')
+
+ parser.add_argument("-t", "--topic",
+ help="topic to listen from",
+ default='voltha-heartbeat')
+
+ return parser.parse_args()
+
+def main():
+ logging.basicConfig(
+ format='%(asctime)s:%(name)s:' +
+ '%(levelname)s:%(process)d:%(message)s',
+ level=logging.INFO
+ )
+ args = parse_options()
+
+ consul_endpoint = args.consul
+ topic = args.topic
+ consumer_example = ConsumerExample(consul_endpoint, topic, runtime=1000)
+ reactor.callWhenRunning(consumer_example.start)
+ reactor.run()
+ log.info("All Done!")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/kafka/kafkaConsumer.py b/kafka/kafkaConsumer.py
deleted file mode 100644
index bd49c09..0000000
--- a/kafka/kafkaConsumer.py
+++ /dev/null
@@ -1,30 +0,0 @@
-#!/usr/bin/env python
-import threading, logging, time
-
-from kafka import KafkaConsumer
-
-class Consumer(threading.Thread):
- daemon = True
-
- def run(self):
- consumer = KafkaConsumer(bootstrap_servers='10.100.198.220:9092',
- auto_offset_reset='earliest')
- consumer.subscribe(['voltha-heartbeat'])
-
- for message in consumer:
- print (message)
-
-
-def main():
- threads = [
- Consumer()
- ]
-
- for t in threads:
- t.start()
-
- time.sleep(3000)
-
-if __name__ == "__main__":
- main()
-