This commit consists of:
1) Provide a kafka client to publish events to kafka brokers
2) Provide zookeeper and kafka docker containers for local testing;
   they will not be present in production
3) Send a regular heartbeat from voltha to the kafka broker to
   exercise all the components
4) Provide a basic kafka consumer (requires kafka-python to be
   installed) to read the messages off the local kafka broker -
   at this time these are only heartbeat messages
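
For reference, below is a minimal sketch of the publish path introduced
here (a hypothetical standalone snippet, assuming a running broker on
localhost:9092; it mirrors what voltha/main.py now does at startup):

    from twisted.internet import reactor
    from twisted.internet.task import LoopingCall

    from voltha.northbound.kafka.kafka_proxy import KafkaProxy, \
        get_kafka_proxy

    # Create the singleton once at startup. A kafka_endpoint of '@kafka'
    # would instead resolve the broker address from consul.
    KafkaProxy(consul_endpoint='localhost:8500',
               kafka_endpoint='localhost:9092')

    # Publish a heartbeat to the "voltha-heartbeat" topic every 10 seconds
    lc = LoopingCall(get_kafka_proxy().send_message,
                     'voltha-heartbeat', 'Heartbeat message')
    lc.start(10)
    reactor.run()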
diff --git a/compose/docker-compose-system-test.yml b/compose/docker-compose-system-test.yml
index 01c03ea..4fec883 100644
--- a/compose/docker-compose-system-test.yml
+++ b/compose/docker-compose-system-test.yml
@@ -1,6 +1,31 @@
 version: '2'
 services:
   #
+  # Single-node zookeeper service
+  #
+  zookeeper:
+    image: wurstmeister/zookeeper
+    ports:
+      - "2181:2181"
+    environment:
+      SERVICE_2181_NAME: "zookeeper"
+  #
+  # Single-node kafka service
+  #
+  kafka:
+    build: ../kafka
+    ports:
+      - "9092:9092"
+    environment:
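+      # DOCKER_HOST_IP must be exported in the shell that runs
+      # docker-compose; it is the address advertised to kafka clients
+      # running outside the container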
+      KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_HOST_IP}
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+      SERVICE_9092_NAME: "kafka"
+    depends_on:
+      - consul
+    volumes:
+      - /var/run/docker.sock:/var/run/docker.sock
+  #
   # Single-node consul agent
   #
   consul:
@@ -50,10 +75,11 @@
     command: [
       "/voltha/main.py",
       "-v",
-      "--consul=consul:8500",
+      "--consul=${DOCKER_HOST_IP}:8500",
       "--fluentd=fluentd:24224",
       "--rest-port=8880",
       "--grpc-port=50555",
+      "--kafka=@kafka",
       "--instance-id-is-container-name",
       "-v"
     ]
diff --git a/kafka/Dockerfile b/kafka/Dockerfile
new file mode 100644
index 0000000..7f297d3
--- /dev/null
+++ b/kafka/Dockerfile
@@ -0,0 +1,19 @@
+FROM anapsix/alpine-java
+
+MAINTAINER Wurstmeister
+
+RUN apk add --update unzip wget curl docker jq coreutils
+
+ENV KAFKA_VERSION="0.10.0.1" SCALA_VERSION="2.11"
+ADD download-kafka.sh /tmp/download-kafka.sh
+RUN /tmp/download-kafka.sh && tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt && rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz
+
+VOLUME ["/kafka"]
+
+ENV KAFKA_HOME /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION}
+ADD start-kafka.sh /usr/bin/start-kafka.sh
+ADD broker-list.sh /usr/bin/broker-list.sh
+ADD create-topics.sh /usr/bin/create-topics.sh
+
+# Use "exec" form so that it runs as PID 1 (useful for graceful shutdown)
+CMD ["start-kafka.sh"]
diff --git a/kafka/broker-list.sh b/kafka/broker-list.sh
new file mode 100755
index 0000000..7f04639
--- /dev/null
+++ b/kafka/broker-list.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+
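+# Print a comma-separated list of <HOST_IP>:<port> pairs, one for every
+# local docker container that exposes kafka's 9092 port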
+CONTAINERS=$(docker ps | grep 9092 | awk '{print $1}')
+BROKERS=$(for CONTAINER in $CONTAINERS; do docker port $CONTAINER 9092 | sed -e "s/0.0.0.0:/$HOST_IP:/g"; done)
+echo $BROKERS | sed -e 's/ /,/g'
diff --git a/kafka/create-topics.sh b/kafka/create-topics.sh
new file mode 100755
index 0000000..e07bf06
--- /dev/null
+++ b/kafka/create-topics.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+
+if [[ -z "$START_TIMEOUT" ]]; then
+    START_TIMEOUT=600
+fi
+
+start_timeout_exceeded=false
+count=0
+step=10
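+# Wait (up to $START_TIMEOUT seconds) for kafka to listen on $KAFKA_PORT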
+while netstat -lnt | awk '$4 ~ /:'$KAFKA_PORT'$/ {exit 1}'; do
+    echo "waiting for kafka to be ready"
+    sleep $step;
+    count=$(expr $count + $step)
+    if [ $count -gt $START_TIMEOUT ]; then
+        start_timeout_exceeded=true
+        break
+    fi
+done
+
+if $start_timeout_exceeded; then
+    echo "Not able to auto-create topic (waited for $START_TIMEOUT sec)"
+    exit 1
+fi
+
+if [[ -n $KAFKA_CREATE_TOPICS ]]; then
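+    # KAFKA_CREATE_TOPICS is a comma-separated list of topic specs, each
+    # of the form <name>:<partitions>:<replication-factor>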
+    IFS=','; for topicToCreate in $KAFKA_CREATE_TOPICS; do
+        echo "creating topic: $topicToCreate"
+        IFS=':' read -a topicConfig <<< "$topicToCreate"
+        JMX_PORT='' $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper $KAFKA_ZOOKEEPER_CONNECT --replication-factor ${topicConfig[2]} --partitions ${topicConfig[1]} --topic "${topicConfig[0]}"
+    done
+fi
diff --git a/kafka/download-kafka.sh b/kafka/download-kafka.sh
new file mode 100755
index 0000000..2ddc911
--- /dev/null
+++ b/kafka/download-kafka.sh
@@ -0,0 +1,5 @@
+#!/bin/sh
+
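+# Ask apache's mirror service for the preferred mirror, then download the
+# kafka release tarball into /tmp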
+mirror=$(curl --stderr /dev/null https://www.apache.org/dyn/closer.cgi\?as_json\=1 | jq -r '.preferred')
+url="${mirror}kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
+wget -q "${url}" -O "/tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
diff --git a/kafka/kafkaConsumer.py b/kafka/kafkaConsumer.py
new file mode 100644
index 0000000..2fd61e8
--- /dev/null
+++ b/kafka/kafkaConsumer.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python
+import os, threading, logging, time
+
+from kafka import KafkaConsumer
+
+
+class Consumer(threading.Thread):
+    daemon = True
+
+    def run(self):
+        # Broker address for the local test setup; override via the
+        # KAFKA_BOOTSTRAP environment variable if the broker runs elsewhere
+        bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP',
+                                           '10.0.2.15:9092')
+        consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
+                                 auto_offset_reset='earliest')
+        consumer.subscribe(['voltha-heartbeat'])
+
+        for message in consumer:
+            print (message)
+
+
+def main():
+    threads = [
+        Consumer()
+    ]
+
+    for t in threads:
+        t.start()
+
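+    # keep the main thread alive; the consumer runs in a daemon thread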
+    time.sleep(3000)
+
+if __name__ == "__main__":
+    logging.basicConfig(
+        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
+        level=logging.INFO
+        )
+    main()
+
diff --git a/kafka/start-kafka-shell.sh b/kafka/start-kafka-shell.sh
new file mode 100755
index 0000000..62663e4
--- /dev/null
+++ b/kafka/start-kafka-shell.sh
@@ -0,0 +1,2 @@
+#!/bin/bash
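+# Start an interactive shell in a throwaway kafka container, e.g.:
+#   start-kafka-shell.sh <docker-host-ip> <zookeeper-host:port>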
+docker run --rm -v /var/run/docker.sock:/var/run/docker.sock -e HOST_IP=$1 -e ZK=$2 -i -t wurstmeister/kafka /bin/bash
diff --git a/kafka/start-kafka.sh b/kafka/start-kafka.sh
new file mode 100755
index 0000000..cd437fb
--- /dev/null
+++ b/kafka/start-kafka.sh
@@ -0,0 +1,67 @@
+#!/bin/bash
+
+if [[ -z "$KAFKA_PORT" ]]; then
+    export KAFKA_PORT=9092
+fi
+if [[ -z "$KAFKA_ADVERTISED_PORT" ]]; then
+    export KAFKA_ADVERTISED_PORT=$(docker port `hostname` $KAFKA_PORT | sed -r "s/.*:(.*)/\1/g")
+fi
+if [[ -z "$KAFKA_BROKER_ID" ]]; then
+    # By default auto allocate broker ID
+    export KAFKA_BROKER_ID=-1
+fi
+if [[ -z "$KAFKA_LOG_DIRS" ]]; then
+    export KAFKA_LOG_DIRS="/kafka/kafka-logs-$HOSTNAME"
+fi
+if [[ -z "$KAFKA_ZOOKEEPER_CONNECT" ]]; then
+    export KAFKA_ZOOKEEPER_CONNECT=$(env | grep ZK.*PORT_2181_TCP= | sed -e 's|.*tcp://||' | paste -sd ,)
+fi
+
+if [[ -n "$KAFKA_HEAP_OPTS" ]]; then
+    sed -r -i "s/(export KAFKA_HEAP_OPTS)=\"(.*)\"/\1=\"$KAFKA_HEAP_OPTS\"/g" $KAFKA_HOME/bin/kafka-server-start.sh
+    unset KAFKA_HEAP_OPTS
+fi
+
+if [[ -z "$KAFKA_ADVERTISED_HOST_NAME" && -n "$HOSTNAME_COMMAND" ]]; then
+    export KAFKA_ADVERTISED_HOST_NAME=$(eval $HOSTNAME_COMMAND)
+fi
+
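+# Map every KAFKA_* environment variable (except KAFKA_HOME) to its
+# server.properties equivalent, e.g. KAFKA_ADVERTISED_HOST_NAME becomes
+# advertised.host.name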
+for VAR in `env`
+do
+  if [[ $VAR =~ ^KAFKA_ && ! $VAR =~ ^KAFKA_HOME ]]; then
+    kafka_name=`echo "$VAR" | sed -r "s/KAFKA_(.*)=.*/\1/g" | tr '[:upper:]' '[:lower:]' | tr _ .`
+    env_var=`echo "$VAR" | sed -r "s/(.*)=.*/\1/g"`
+    if egrep -q "(^|^#)$kafka_name=" $KAFKA_HOME/config/server.properties; then
+        sed -r -i "s@(^|^#)($kafka_name)=(.*)@\2=${!env_var}@g" $KAFKA_HOME/config/server.properties #note that no config values may contain an '@' char
+    else
+        echo "$kafka_name=${!env_var}" >> $KAFKA_HOME/config/server.properties
+    fi
+  fi
+done
+
+if [[ -n "$CUSTOM_INIT_SCRIPT" ]] ; then
+  eval $CUSTOM_INIT_SCRIPT
+fi
+
+
+KAFKA_PID=0
+
+# see https://medium.com/@gchudnov/trapping-signals-in-docker-containers-7a57fdda7d86#.bh35ir4u5
+term_handler() {
+  echo 'Stopping Kafka....'
+  if [ $KAFKA_PID -ne 0 ]; then
+    kill -s TERM "$KAFKA_PID"
+    wait "$KAFKA_PID"
+  fi
+  echo 'Kafka stopped.'
+  exit
+}
+
+
+# Capture kill requests to stop properly
+trap "term_handler" SIGHUP SIGINT SIGTERM
+create-topics.sh &
+$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties &
+KAFKA_PID=$!
+
+wait
diff --git a/requirements.txt b/requirements.txt
index 37787b3..eab3c02 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -35,3 +35,7 @@
 # python-consul>=0.6.1  we need the pre-released version for now, because 0.6.1 does not
 # yet support Twisted. Once this is released, it will be the 0.6.2 version
 git+git://github.com/cablehead/python-consul.git
+
+# Twisted Python kafka client
+git+git://github.com/ciena/afkak.git
+
diff --git a/voltha/consulhelpers.py b/voltha/consulhelpers.py
new file mode 100644
index 0000000..c424e7f
--- /dev/null
+++ b/voltha/consulhelpers.py
@@ -0,0 +1,55 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Some consul related convenience functions
+"""
+
+import os
+from structlog import get_logger
+import sys
+from consul import Consul
+from random import randint
+
+log = get_logger()
+
+def get_endpoint_from_consul(consul_endpoint, service_name):
+    """Look up, from consul, the host:port endpoint of the service
+    registered under service_name
+    """
+    log.debug('Retrieving endpoint {} from consul {}'.format(service_name,
+                                                             consul_endpoint))
+    host = consul_endpoint.split(':')[0].strip()
+    port = int(consul_endpoint.split(':')[1].strip())
+
+    consul = Consul(host=host, port=port)
+    _, services = consul.catalog.service(service_name)
+
+    if len(services) == 0:
+        raise Exception('Cannot find service {} in consul'.format(service_name))
+
+    # pick a random entry
+    # TODO should we prefer local IP addresses? Probably.
+
+    service = services[randint(0, len(services) - 1)]
+    endpoint = '{}:{}'.format(service['ServiceAddress'],
+                              service['ServicePort'])
+
+    log.debug('Returning endpoint {} for service {}'.format(endpoint,
+                                                            service_name))
+    return endpoint
+
+
+if __name__ == '__main__':
+    print get_endpoint_from_consul('10.100.198.220:8500', 'kafka')
diff --git a/voltha/main.py b/voltha/main.py
index ef03797..3f63a9c 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -34,6 +34,8 @@
 from voltha.northbound.grpc.grpc_server import VolthaGrpcServer
 from voltha.northbound.rest.health_check import init_rest_service
 from voltha.structlog_setup import setup_logging
+from voltha.northbound.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
+
 
 defs = dict(
     config=os.environ.get('CONFIG', './voltha.yml'),
@@ -47,6 +49,7 @@
                                          get_my_primary_local_ipv4()),
     interface=os.environ.get('INTERFACE', get_my_primary_interface()),
     rest_port=os.environ.get('REST_PORT', 8880),
+    kafka=os.environ.get('KAFKA', 'localhost:9092'),
 )
 
 
@@ -161,6 +164,15 @@
                         default=False,
                         help=_help)
 
+    _help = ('<hostname>:<port> of the kafka broker (default: %s). If not '
+             'specified, the address from the config file is used.'
+             % defs['kafka'])
+    parser.add_argument('-K', '--kafka',
+                        dest='kafka',
+                        action='store',
+                        default=defs['kafka'],
+                        help=_help)
+
     args = parser.parse_args()
 
     # post-processing
@@ -211,10 +223,11 @@
         if not args.no_banner:
             print_banner(self.log)
 
+        self.startup_components()
+
         if not args.no_heartbeat:
             self.start_heartbeat()
-
-        self.startup_components()
+            self.start_kafka_heartbeat()
 
     def start(self):
         self.start_reactor()  # will not return except Keyboard interrupt
@@ -231,6 +244,9 @@
 
         self.grpc_server = VolthaGrpcServer(self.args.grpc_port).run()
 
+        #initialize kafka proxy singleton
+        self.kafka_proxy = KafkaProxy(self.args.consul, self.args.kafka)
+
         self.log.info('started-internal-services')
 
     @inlineCallbacks
@@ -262,6 +278,18 @@
         lc = LoopingCall(heartbeat)
         lc.start(10)
 
+    # Temporary function to send a heartbeat message to the external kafka
+    # broker
+    def start_kafka_heartbeat(self):
+        # The heartbeat is published to a dedicated "voltha-heartbeat"
+        # topic. For now the payload is a plain string; eventually it will
+        # be a protocol buffer message.
+        message = 'Heartbeat message:{}'.format(get_my_primary_local_ipv4())
+        topic = "voltha-heartbeat"
+
+        from twisted.internet.task import LoopingCall
+        lc = LoopingCall(get_kafka_proxy().send_message, topic, message)
+        lc.start(10)
 
 if __name__ == '__main__':
     Main().start()
diff --git a/voltha/northbound/kafka/__init__.py b/voltha/northbound/kafka/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/voltha/northbound/kafka/__init__.py
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
new file mode 100644
index 0000000..801701a
--- /dev/null
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -0,0 +1,86 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+from afkak.client import KafkaClient as _KafkaClient
+from afkak.producer import Producer as _kafkaProducer
+from structlog import get_logger
+from voltha.consulhelpers import get_endpoint_from_consul
+from twisted.internet.defer import inlineCallbacks
+from afkak.common import (
+    PRODUCER_ACK_LOCAL_WRITE,
+)
+
+
+class KafkaProxy(object):
+    """
+    A singleton kafka proxy class that hides the kafka client details.
+    """
+    _kafka_instance = None
+
+    def __init__(self, consul_endpoint='localhost:8500',
+                 kafka_endpoint='localhost:9092',
+                 ack_timeout=1000, max_req_attempts=10):
+
+        # Raise an exception if a singleton instance already exists
+        if KafkaProxy._kafka_instance:
+            raise Exception('Singleton already exists for {}'.format(
+                KafkaProxy))
+
+        self.log = get_logger()
+
+        self.consul_endpoint = consul_endpoint
+        self.kafka_endpoint = kafka_endpoint
+
+        # A kafka endpoint prefixed with '@' names a consul service that is
+        # resolved into an actual host:port address
+        if self.kafka_endpoint.startswith('@'):
+            _k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
+                                                   self.kafka_endpoint[1:])
+        else:
+            _k_endpoint = self.kafka_endpoint
+
+        self.log.info('Creating kafka endpoint', endpoint=_k_endpoint)
+
+        self.kclient = _KafkaClient(_k_endpoint)
+        # PRODUCER_ACK_LOCAL_WRITE: the broker waits until the data is
+        # written to its local log before sending a response
+        self.kproducer = _kafkaProducer(self.kclient,
+                                        req_acks=PRODUCER_ACK_LOCAL_WRITE,
+                                        ack_timeout=ack_timeout,
+                                        max_req_attempts=max_req_attempts)
+
+        self.log.info('initialized-KafkaProxy', endpoint=_k_endpoint)
+        KafkaProxy._kafka_instance = self
+
+
+    @inlineCallbacks
+    def send_message(self, topic, msg):
+        assert topic is not None
+        assert msg is not None
+        self.log.info('Sending message {} to kafka topic {}'.format(msg,
+                                                                    topic))
+        try:
+            msg_list = [msg]
+            yield self.kproducer.send_messages(topic, msgs=msg_list)
+            self.log.debug('Successfully sent message {} to kafka topic '
+                           '{}'.format(msg, topic))
+        except Exception as e:
+            self.log.error('Failed to send message {} to kafka topic {}: '
+                           '{}'.format(msg, topic, repr(e)))
+
+
+# Common method to get the singleton instance of the kafka proxy class
+def get_kafka_proxy():
+    return KafkaProxy._kafka_instance
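
For reference, a short usage sketch of the proxy from another component
(hypothetical snippet; it assumes the KafkaProxy singleton was already
created at startup, as voltha/main.py now does):

    from twisted.internet.defer import inlineCallbacks
    from voltha.northbound.kafka.kafka_proxy import get_kafka_proxy

    @inlineCallbacks
    def publish_event():
        # send_message is decorated with @inlineCallbacks, so it returns
        # a Deferred that fires once the broker acks the local write
        yield get_kafka_proxy().send_message('voltha-heartbeat',
                                             'test message')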