Integration test for the build.md file
Change-Id: I93d25f1563636696a5f71471a06056eafee3e9e2
diff --git a/kafka/kafka-consumer.py b/kafka/kafka-consumer.py
index c437258..5be6c39 100644
--- a/kafka/kafka-consumer.py
+++ b/kafka/kafka-consumer.py
@@ -10,8 +10,11 @@
from twisted.python.failure import Failure
from afkak.client import KafkaClient
from afkak.consumer import Consumer
-from afkak.common import KafkaUnavailableError
from voltha.consulhelpers import get_endpoint_from_consul
+from afkak.common import (
+ KafkaUnavailableError,
+ OFFSET_EARLIEST,
+ OFFSET_LATEST)
log = logging.getLogger(__name__)
@@ -51,7 +54,7 @@
c = Consumer(self._client, self.topic, partition,
self.msg_processor)
self._consumer_list.append(c)
- d = c.start(0)
+ d = c.start(OFFSET_LATEST)
d.addBoth(_note_consumer_stopped, c)
self._consumer_d_list.append(d)
@@ -102,6 +105,10 @@
help="topic to listen from",
default='voltha-heartbeat')
+ parser.add_argument("-r", "--runtime",
+ help="total runtime",
+ default=1000)
+
return parser.parse_args()
def main():
@@ -112,9 +119,8 @@
)
args = parse_options()
- consul_endpoint = args.consul
- topic = args.topic
- consumer_example = ConsumerExample(consul_endpoint, topic, runtime=1000)
+ consumer_example = ConsumerExample(args.consul, args.topic,
+ int(args.runtime))
reactor.callWhenRunning(consumer_example.start)
reactor.run()
log.info("All Done!")