This commit consists of:
1) Change kafka/zookeeper port mappings to allow for scaling via
   docker-compose
2) Use the afkak consumer example when subscribing to kafka messages
3) Remove dependency on kafka-python
4) Update the BUILD.md file with instructions on how to listen to kafka
   messages
diff --git a/BUILD.md b/BUILD.md
index 98e74a4..06eb438 100644
--- a/BUILD.md
+++ b/BUILD.md
@@ -228,6 +228,12 @@
    ```
   
    Voltha sends a periodic log message (heartbeat) to the log stream, in addition to logging major events.  Voltha also sends a periodic heartbeat message to the kafka broker.
+   
+   To subscribe to the kafka messages and display them:
+   
+   ```
+   python kafka/kafka-consumer.py
+   ```
 
 
 4. In addition to the docker log stream, Voltha is explicitly hooked up to the fluentd log collector infrastructure. We are not using fluentd to its full potential yet, but establising the connection to fluentd and funelling structured logs to fluentd is already in place. To see the fluentd log stream, you can run:
diff --git a/compose/docker-compose-system-test.yml b/compose/docker-compose-system-test.yml
index 4fec883..aadeebc 100644
--- a/compose/docker-compose-system-test.yml
+++ b/compose/docker-compose-system-test.yml
@@ -6,7 +6,7 @@
   zookeeper:
     image: wurstmeister/zookeeper
     ports:
-      - "2181:2181"
+    - 2181
     environment:
       SERVICE_2181_NAME: "zookeeper"
   #
@@ -15,7 +15,7 @@
   kafka:
     build: ../kafka
     ports:
-      - "9092:9092"
+     - 9092
     environment:
       KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_HOST_IP}
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
diff --git a/kafka/kafka-consumer.py b/kafka/kafka-consumer.py
new file mode 100644
index 0000000..c437258
--- /dev/null
+++ b/kafka/kafka-consumer.py
@@ -0,0 +1,124 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright (C) 2015 Cyan, Inc.
+
+import logging
+from argparse import ArgumentParser
+
+from twisted.internet import reactor
+from twisted.internet.defer import DeferredList, inlineCallbacks
+from twisted.python.failure import Failure
+from afkak.client import KafkaClient
+from afkak.consumer import Consumer
+from afkak.common import KafkaUnavailableError
+from voltha.consulhelpers import get_endpoint_from_consul
+
+log = logging.getLogger(__name__)
+
+
+class ConsumerExample(object):
+    def __init__(self, consul_endpoint, topic='voltha-heartbeat', runtime=60):
+        self.topic = topic
+        self.runtime = runtime
+        self.kafka_endpoint = get_endpoint_from_consul(consul_endpoint,
+                                                       'kafka')
+
+        self._client = KafkaClient(self.kafka_endpoint)
+        self._consumer_list = []  # List of consumers
+        # List of deferred returned from consumers' start() methods
+        self._consumer_d_list = []
+
+    @inlineCallbacks
+    def start(self):
+        partitions = []
+        try:
+            while not partitions:
+                yield self._client.load_metadata_for_topics(self.topic)
+                e = self._client.metadata_error_for_topic(self.topic)
+                if e:
+                    log.warning("Error: %r getting metadata for topic: %s",
+                                e, self.topic)
+                else:
+                    partitions = self._client.topic_partitions[self.topic]
+        except KafkaUnavailableError:
+            log.error("Unable to communicate with any Kafka brokers")
+            self.stop()
+
+        def _note_consumer_stopped(result, consumer):
+            log.info("Consumer: %r stopped with result: %r", consumer, result)
+
+        for partition in partitions:
+            c = Consumer(self._client, self.topic, partition,
+                         self.msg_processor)
+            self._consumer_list.append(c)
+            d = c.start(0)
+            d.addBoth(_note_consumer_stopped, c)
+            self._consumer_d_list.append(d)
+
+        # Stop ourselves after we've run the allotted time
+        reactor.callLater(self.runtime, self.stop)
+
+    def stop(self):
+        log.info("\n")
+        log.info("Time is up, stopping consumers...")
+        # Ask each of our consumers to stop. When a consumer fully stops, it
+        # fires the deferred returned from its start() method. We saved all
+        # those deferreds away (above, in start()) in self._consumer_d_list,
+        # so now we'll use a DeferredList to wait for all of them...
+        for consumer in self._consumer_list:
+            consumer.stop()
+        dl = DeferredList(self._consumer_d_list)
+
+        # Once the consumers are all stopped, then close our client
+        def _stop_client(result):
+            if isinstance(result, Failure):
+                log.error("Error stopping consumers: %r", result)
+            else:
+                log.info("All consumers stopped. Stopping client: %r",
+                         self._client)
+            self._client.close()
+            return result
+
+        dl.addBoth(_stop_client)
+
+        # And once the client is shutdown, stop the reactor
+        def _stop_reactor(result):
+            reactor.stop()
+            return result
+
+        dl.addBoth(_stop_reactor)
+
+    def msg_processor(self, consumer, msglist):
+        for msg in msglist:
+            log.info("proc: msg: %r", msg)
+
+def parse_options():
+    parser = ArgumentParser("Consume kafka messages")
+    parser.add_argument("-c", "--consul",
+                        help="consul ip and port",
+                        default='10.100.198.220:8500')
+
+    parser.add_argument("-t", "--topic",
+                        help="topic to listen from",
+                        default='voltha-heartbeat')
+
+    return parser.parse_args()
+
+def main():
+    logging.basicConfig(
+        format='%(asctime)s:%(name)s:' +
+               '%(levelname)s:%(process)d:%(message)s',
+        level=logging.INFO
+    )
+    args = parse_options()
+
+    consul_endpoint = args.consul
+    topic = args.topic
+    consumer_example = ConsumerExample(consul_endpoint, topic, runtime=1000)
+    reactor.callWhenRunning(consumer_example.start)
+    reactor.run()
+    log.info("All Done!")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/kafka/kafkaConsumer.py b/kafka/kafkaConsumer.py
deleted file mode 100644
index bd49c09..0000000
--- a/kafka/kafkaConsumer.py
+++ /dev/null
@@ -1,30 +0,0 @@
-#!/usr/bin/env python
-import threading, logging, time
-
-from kafka import KafkaConsumer
-
-class Consumer(threading.Thread):
-    daemon = True
-
-    def run(self):
-        consumer = KafkaConsumer(bootstrap_servers='10.100.198.220:9092',
-                                 auto_offset_reset='earliest')
-        consumer.subscribe(['voltha-heartbeat'])
-
-        for message in consumer:
-            print (message)
-
-
-def main():
-    threads = [
-        Consumer()
-    ]
-
-    for t in threads:
-        t.start()
-
-    time.sleep(3000)
-
-if __name__ == "__main__":
-    main()
-
diff --git a/requirements.txt b/requirements.txt
index f4323fc..eab3c02 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -10,7 +10,6 @@
 hash_ring>=1.3.1
 hexdump>=3.3
 jinja2>=2.8
-kafka-python>=1.2.0
 klein>=15.3.1
 nose>=1.3.7
 mock>=1.3.0
diff --git a/voltha/consulhelpers.py b/voltha/consulhelpers.py
index a5f0204..91acfc6 100644
--- a/voltha/consulhelpers.py
+++ b/voltha/consulhelpers.py
@@ -47,7 +47,6 @@
     endpoint = '{}:{}'.format(service['ServiceAddress'],
                               service['ServicePort'])
 
-    print endpoint
     return endpoint