Merge "Chameleon fault tolerance"
diff --git a/ b/
index f994f3b..06eb438 100644
--- a/
+++ b/
@@ -138,7 +138,7 @@
-To start the four-conatiner ensamble:
+To start the eight-container ensemble:
 docker-compose -f compose/docker-compose-system-test.yml up -d
@@ -167,17 +167,19 @@
    This shall list something like this:
-   {
-     "voltha-health": [],
-     "voltha-grpc": [],
-     "fluentd-intake": [],
-     "consul-rest": [],
-     "consul-8600": [
-       "udp"
-     ],
-     "consul": [],
-     "chameleon-rest": []
-   }
+  {
+      "zookeeper": [],
+      "chameleon-rest": [],
+      "consul": [],
+      "consul-8600": [
+        "udp"
+      ],
+      "consul-rest": [],
+      "fluentd-intake": [],
+      "kafka": [],
+      "voltha-grpc": [],
+      "voltha-health": []
+  }  
    You don't see registrator istelf, and you see multiple entries for consul. More importantly you see voltha as a service called "voltha-health" (referring to the REST health check service of voltha). You can query additional info on this endpoint from consul:
@@ -217,7 +219,7 @@
    docker-compose -f compose/docker-compose-system-test.yml logs
-   Once important thing you can see is that voltha uses structured logging, which will come handy once we utilize macihine parsing and filtring of the logs.
+   One important thing you can see is that voltha uses structured logging, which will come in handy once we utilize machine parsing and filtering of the logs.
    Alternatively, you can see the individual docker log stream of voltha by:
@@ -225,7 +227,13 @@
    docker logs -f compose_voltha_1
-   Voltha sends a periodic log message (heartbeat) to the log stream, in addition to logging major events.
+   Voltha sends a periodic log message (heartbeat) to the log stream, in addition to logging major events.  Voltha also sends a periodic heartbeat message to the kafka broker.
+   To subscribe to and display the kafka messages:
+   ```
+   python kafka/
+   ```
 4. In addition to the docker log stream, Voltha is explicitly hooked up to the fluentd log collector infrastructure. We are not using fluentd to its full potential yet, but establising the connection to fluentd and funelling structured logs to fluentd is already in place. To see the fluentd log stream, you can run:
diff --git a/compose/docker-compose-system-test.yml b/compose/docker-compose-system-test.yml
index 01c03ea..aadeebc 100644
--- a/compose/docker-compose-system-test.yml
+++ b/compose/docker-compose-system-test.yml
@@ -1,6 +1,31 @@
 version: '2'
+  # Single-node zookeeper service
+  #
+  zookeeper:
+    image: wurstmeister/zookeeper
+    ports:
+    - 2181
+    environment:
+      SERVICE_2181_NAME: "zookeeper"
+  #
+  # Single-node kafka service
+  #
+  # The broker connects to zookeeper (KAFKA_ZOOKEEPER_CONNECT), so zookeeper
+  # is listed in depends_on to make compose start it first.
+  #
+  kafka:
+    build: ../kafka
+    ports:
+    - 9092
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      SERVICE_9092_NAME: "kafka"
+    depends_on:
+    - consul
+    - zookeeper
+    volumes:
+      # the container's start script calls `docker port` on itself
+      - /var/run/docker.sock:/var/run/docker.sock
+  #
   # Single-node consul agent
@@ -50,10 +75,11 @@
     command: [
-      "--consul=consul:8500",
+      "--consul=${DOCKER_HOST_IP}:8500",
+      "--kafka=@kafka",
diff --git a/kafka/Dockerfile b/kafka/Dockerfile
new file mode 100644
index 0000000..7f297d3
--- /dev/null
+++ b/kafka/Dockerfile
@@ -0,0 +1,19 @@
+FROM anapsix/alpine-java
+MAINTAINER Wurstmeister
+RUN apk add --update unzip wget curl docker jq coreutils
+ADD /tmp/
+RUN /tmp/ && tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt && rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz
+VOLUME ["/kafka"]
+ADD /usr/bin/
+ADD /usr/bin/
+ADD /usr/bin/
+# Use "exec" form so that it runs as PID 1 (useful for graceful shutdown)
+CMD [""]
diff --git a/kafka/ b/kafka/
new file mode 100755
index 0000000..7f04639
--- /dev/null
+++ b/kafka/
@@ -0,0 +1,5 @@
+CONTAINERS=$(docker ps | grep 9092 | awk '{print $1}')
+BROKERS=$(for CONTAINER in $CONTAINERS; do docker port $CONTAINER 9092 | sed -e "s/$HOST_IP:/g"; done)
+echo $BROKERS | sed -e 's/ /,/g'
diff --git a/kafka/ b/kafka/
new file mode 100755
index 0000000..e07bf06
--- /dev/null
+++ b/kafka/
@@ -0,0 +1,32 @@
+if [[ -z "$START_TIMEOUT" ]]; then
+while netstat -lnt | awk '$4 ~ /:'$KAFKA_PORT'$/ {exit 1}'; do
+    echo "waiting for kafka to be ready"
+    sleep $step;
+    count=$(expr $count + $step)
+    if [ $count -gt $START_TIMEOUT ]; then
+        start_timeout_exceeded=true
+        break
+    fi
+if $start_timeout_exceeded; then
+    echo "Not able to auto-create topic (waited for $START_TIMEOUT sec)"
+    exit 1
+if [[ -n $KAFKA_CREATE_TOPICS ]]; then
+    IFS=','; for topicToCreate in $KAFKA_CREATE_TOPICS; do
+        echo "creating topics: $topicToCreate" 
+        IFS=':' read -a topicConfig <<< "$topicToCreate"
+        JMX_PORT='' $KAFKA_HOME/bin/ --create --zookeeper $KAFKA_ZOOKEEPER_CONNECT --replication-factor ${topicConfig[2]} --partition ${topicConfig[1]} --topic "${topicConfig[0]}"
+    done
diff --git a/kafka/ b/kafka/
new file mode 100755
index 0000000..2ddc911
--- /dev/null
+++ b/kafka/
@@ -0,0 +1,5 @@
+mirror=$(curl --stderr /dev/null\?as_json\=1 | jq -r '.preferred')
+wget -q "${url}" -O "/tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
diff --git a/kafka/ b/kafka/
new file mode 100644
index 0000000..c437258
--- /dev/null
+++ b/kafka/
@@ -0,0 +1,124 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright (C) 2015 Cyan, Inc.
+import logging
+from argparse import ArgumentParser
+from twisted.internet import reactor
+from twisted.internet.defer import DeferredList, inlineCallbacks
+from twisted.python.failure import Failure
+from afkak.client import KafkaClient
+from afkak.consumer import Consumer
+from afkak.common import KafkaUnavailableError
+from voltha.consulhelpers import get_endpoint_from_consul
+log = logging.getLogger(__name__)
+class ConsumerExample(object):
+    def __init__(self, consul_endpoint, topic='voltha-heartbeat', runtime=60):
+        self.topic = topic
+        self.runtime = runtime
+        self.kafka_endpoint = get_endpoint_from_consul(consul_endpoint,
+                                                       'kafka')
+        self._client = KafkaClient(self.kafka_endpoint)
+        self._consumer_list = []  # List of consumers
+        # List of deferred returned from consumers' start() methods
+        self._consumer_d_list = []
+    @inlineCallbacks
+    def start(self):
+        partitions = []
+        try:
+            while not partitions:
+                yield self._client.load_metadata_for_topics(self.topic)
+                e = self._client.metadata_error_for_topic(self.topic)
+                if e:
+                    log.warning("Error: %r getting metadata for topic: %s",
+                                e, self.topic)
+                else:
+                    partitions = self._client.topic_partitions[self.topic]
+        except KafkaUnavailableError:
+            log.error("Unable to communicate with any Kafka brokers")
+            self.stop()
+        def _note_consumer_stopped(result, consumer):
+  "Consumer: %r stopped with result: %r", consumer, result)
+        for partition in partitions:
+            c = Consumer(self._client, self.topic, partition,
+                         self.msg_processor)
+            self._consumer_list.append(c)
+            d = c.start(0)
+            d.addBoth(_note_consumer_stopped, c)
+            self._consumer_d_list.append(d)
+        # Stop ourselves after we've run the allotted time
+        reactor.callLater(self.runtime, self.stop)
+    def stop(self):
+"Time is up, stopping consumers...")
+        # Ask each of our consumers to stop. When a consumer fully stops, it
+        # fires the deferred returned from its start() method. We saved all
+        # those deferreds away (above, in start()) in self._consumer_d_list,
+        # so now we'll use a DeferredList to wait for all of them...
+        for consumer in self._consumer_list:
+            consumer.stop()
+        dl = DeferredList(self._consumer_d_list)
+        # Once the consumers are all stopped, then close our client
+        def _stop_client(result):
+            if isinstance(result, Failure):
+                log.error("Error stopping consumers: %r", result)
+            else:
+      "All consumers stopped. Stopping client: %r",
+                         self._client)
+            self._client.close()
+            return result
+        dl.addBoth(_stop_client)
+        # And once the client is shutdown, stop the reactor
+        def _stop_reactor(result):
+            reactor.stop()
+            return result
+        dl.addBoth(_stop_reactor)
+    def msg_processor(self, consumer, msglist):
+        for msg in msglist:
+  "proc: msg: %r", msg)
+def parse_options():
+    parser = ArgumentParser("Consume kafka messages")
+    parser.add_argument("-c", "--consul",
+                        help="consul ip and port",
+                        default='')
+    parser.add_argument("-t", "--topic",
+                        help="topic to listen from",
+                        default='voltha-heartbeat')
+    return parser.parse_args()
+def main():
+    logging.basicConfig(
+        format='%(asctime)s:%(name)s:' +
+               '%(levelname)s:%(process)d:%(message)s',
+        level=logging.INFO
+    )
+    args = parse_options()
+    consul_endpoint = args.consul
+    topic = args.topic
+    consumer_example = ConsumerExample(consul_endpoint, topic, runtime=1000)
+    reactor.callWhenRunning(consumer_example.start)
+"All Done!")
+if __name__ == "__main__":
+    main()
diff --git a/kafka/ b/kafka/
new file mode 100755
index 0000000..62663e4
--- /dev/null
+++ b/kafka/
@@ -0,0 +1,2 @@
+docker run --rm -v /var/run/docker.sock:/var/run/docker.sock -e HOST_IP=$1 -e ZK=$2 -i -t wurstmeister/kafka /bin/bash
diff --git a/kafka/ b/kafka/
new file mode 100755
index 0000000..cd437fb
--- /dev/null
+++ b/kafka/
@@ -0,0 +1,67 @@
+if [[ -z "$KAFKA_PORT" ]]; then
+    export KAFKA_PORT=9092
+if [[ -z "$KAFKA_ADVERTISED_PORT" ]]; then
+    export KAFKA_ADVERTISED_PORT=$(docker port `hostname` $KAFKA_PORT | sed -r "s/.*:(.*)/\1/g")
+if [[ -z "$KAFKA_BROKER_ID" ]]; then
+    # By default auto allocate broker ID
+    export KAFKA_BROKER_ID=-1
+if [[ -z "$KAFKA_LOG_DIRS" ]]; then
+    export KAFKA_LOG_DIRS="/kafka/kafka-logs-$HOSTNAME"
+if [[ -z "$KAFKA_ZOOKEEPER_CONNECT" ]]; then
+    export KAFKA_ZOOKEEPER_CONNECT=$(env | grep ZK.*PORT_2181_TCP= | sed -e 's|.*tcp://||' | paste -sd ,)
+if [[ -n "$KAFKA_HEAP_OPTS" ]]; then
+    sed -r -i "s/(export KAFKA_HEAP_OPTS)=\"(.*)\"/\1=\"$KAFKA_HEAP_OPTS\"/g" $KAFKA_HOME/bin/
+    unset KAFKA_HEAP_OPTS
+for VAR in `env`
+  if [[ $VAR =~ ^KAFKA_ && ! $VAR =~ ^KAFKA_HOME ]]; then
+    kafka_name=`echo "$VAR" | sed -r "s/KAFKA_(.*)=.*/\1/g" | tr '[:upper:]' '[:lower:]' | tr _ .`
+    env_var=`echo "$VAR" | sed -r "s/(.*)=.*/\1/g"`
+    if egrep -q "(^|^#)$kafka_name=" $KAFKA_HOME/config/; then
+        sed -r -i "s@(^|^#)($kafka_name)=(.*)@\2=${!env_var}@g" $KAFKA_HOME/config/ #note that no config values may contain an '@' char
+    else
+        echo "$kafka_name=${!env_var}" >> $KAFKA_HOME/config/
+    fi
+  fi
+if [[ -n "$CUSTOM_INIT_SCRIPT" ]] ; then
+# see
+term_handler() {
+  echo 'Stopping Kafka....'
+  if [ $KAFKA_PID -ne 0 ]; then
+    kill -s TERM "$KAFKA_PID"
+    wait "$KAFKA_PID"
+  fi
+  echo 'Kafka stopped.'
+  exit
+# Capture kill requests to stop properly
+trap "term_handler" SIGHUP SIGINT SIGTERM & 
+$KAFKA_HOME/bin/ $KAFKA_HOME/config/ &
diff --git a/requirements.txt b/requirements.txt
index 37787b3..eab3c02 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -35,3 +35,7 @@
 # python-consul>=0.6.1  we need the pre-released version for now, because 0.6.1 does not
 # yet support Twisted. Once this is released, it will be the 0.6.2 version
+# Twisted Python kafka client
diff --git a/voltha/ b/voltha/
new file mode 100644
index 0000000..91acfc6
--- /dev/null
+++ b/voltha/
@@ -0,0 +1,54 @@
+# Copyright 2016 the original author or authors.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+Some consul related convenience functions
+from structlog import get_logger
+from consul import Consul
+from random import randint
+log = get_logger()
+def get_endpoint_from_consul(consul_endpoint, service_name):
+    """Look up, from consul, the service name specified by service-name
+    """
+    log.debug('Retrieving endpoint {} from consul {}'.format(service_name,
+                                                             consul_endpoint))
+    host = consul_endpoint.split(':')[0].strip()
+    port = int(consul_endpoint.split(':')[1].strip())
+    consul = Consul(host=host, port=port)
+    _, services = consul.catalog.service(service_name)
+    if len(services) == 0:
+        raise Exception(
+            'Cannot find service {} in consul'.format(service_name))
+    # pick a random entry
+    # TODO should we prefer local IP addresses? Probably.
+    service = services[randint(0, len(services) - 1)]
+    endpoint = '{}:{}'.format(service['ServiceAddress'],
+                              service['ServicePort'])
+    return endpoint
+if __name__ == '__main__':
+    get_endpoint_from_consul('', 'kafka')
diff --git a/voltha/ b/voltha/
index ef03797..e76fac3 100755
--- a/voltha/
+++ b/voltha/
@@ -30,10 +30,12 @@
 from voltha.coordinator import Coordinator
 from voltha.dockerhelpers import get_my_containers_name
-from voltha.nethelpers import get_my_primary_interface, get_my_primary_local_ipv4
+from voltha.nethelpers import get_my_primary_interface, \
+    get_my_primary_local_ipv4
 from voltha.northbound.grpc.grpc_server import VolthaGrpcServer
 from import init_rest_service
 from voltha.structlog_setup import setup_logging
+from voltha.northbound.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
 defs = dict(
     config=os.environ.get('CONFIG', './voltha.yml'),
@@ -47,11 +49,11 @@
     interface=os.environ.get('INTERFACE', get_my_primary_interface()),
     rest_port=os.environ.get('REST_PORT', 8880),
+    kafka=os.environ.get('KAFKA', 'localhost:9092'),
 def parse_args():
     parser = argparse.ArgumentParser()
     _help = ('Path to voltha.yml config file (default: %s). '
@@ -84,7 +86,7 @@
     _help = ('<hostname>:<port> to fluentd server (default: %s). (If not '
              'specified (None), the address from the config file is used'
              % defs['fluentd'])
@@ -161,6 +163,15 @@
+    _help = ('<hostname>:<port> of the kafka broker (default: %s). (If not '
+             'specified (None), the address from the config file is used'
+             % defs['kafka'])
+    parser.add_argument('-K', '--kafka',
+                        dest='kafka',
+                        action='store',
+                        default=defs['kafka'],
+                        help=_help)
     args = parser.parse_args()
     # post-processing
@@ -192,7 +203,6 @@
 class Main(object):
     def __init__(self):
         self.args = args = parse_args()
@@ -207,14 +217,16 @@
         # components
         self.coordinator = None
         self.grpc_server = None
+        self.kafka_proxy = None
         if not args.no_banner:
+        self.startup_components()
         if not args.no_heartbeat:
-        self.startup_components()
+            self.start_kafka_heartbeat()
     def start(self):
         self.start_reactor()  # will not return except Keyboard interrupt
@@ -231,6 +243,9 @@
         self.grpc_server = VolthaGrpcServer(self.args.grpc_port).run()
+        # initialize kafka proxy singleton
+        self.kafka_proxy = KafkaProxy(self.args.consul, self.args.kafka)
@@ -262,6 +277,19 @@
         lc = LoopingCall(heartbeat)
+    # Temporary function to send a heartbeat message to the external kafka
+    # broker
+    def start_kafka_heartbeat(self):
+        # For heartbeat we will send a message to a specific "voltha-heartbeat"
+        #  topic.  The message is a protocol buf
+        # message
+        message = 'Heartbeat message:{}'.format(get_my_primary_local_ipv4())
+        topic = "voltha-heartbeat"
+        from twisted.internet.task import LoopingCall
+        lc = LoopingCall(get_kafka_proxy().send_message, topic, message)
+        lc.start(10)
 if __name__ == '__main__':
diff --git a/voltha/northbound/kafka/ b/voltha/northbound/kafka/
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/voltha/northbound/kafka/
diff --git a/voltha/northbound/kafka/ b/voltha/northbound/kafka/
new file mode 100644
index 0000000..072a7cd
--- /dev/null
+++ b/voltha/northbound/kafka/
@@ -0,0 +1,83 @@
+# Copyright 2016 the original author or authors.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from afkak.client import KafkaClient as _KafkaClient
+from afkak.producer import Producer as _kafkaProducer
+from structlog import get_logger
+from voltha.consulhelpers import get_endpoint_from_consul
+from twisted.internet.defer import inlineCallbacks
+from afkak.common import (
+class KafkaProxy(object):
+    """
+    This is a singleton proxy kafka class to hide the kafka client details.
+    """
+    _kafka_instance = None
+    def __init__(self, consul_endpoint='localhost:8500',
+                 kafka_endpoint='localhost:9092',
+                 ack_timeout=1000, max_req_attempts=10):
+        # return an exception if the object already exist
+        if KafkaProxy._kafka_instance:
+            raise Exception('Singleton exist for :{}'.format(KafkaProxy))
+        self.log = get_logger()
+        self.consul_endpoint = consul_endpoint
+        self.kafka_endpoint = kafka_endpoint
+        # PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written
+        #  to a local log before sending response
+        if self.kafka_endpoint.startswith('@'):
+            _k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
+                                                   self.kafka_endpoint[1:])
+        else:
+            _k_endpoint = self.kafka_endpoint
+'Creating kafka endpoint', endpoint=_k_endpoint)
+        self.kclient = _KafkaClient(_k_endpoint)
+        self.kproducer = _kafkaProducer(self.kclient,
+                                        req_acks=PRODUCER_ACK_LOCAL_WRITE,
+                                        ack_timeout=ack_timeout,
+                                        max_req_attempts=max_req_attempts)
+        KafkaProxy._kafka_instance = self
+    @inlineCallbacks
+    def send_message(self, topic, msg):
+        assert topic is not None
+        assert msg is not None
+'Sending message {} to kafka topic {}'.format(msg,
+                                                                    topic))
+        try:
+            msg_list = [msg]
+            yield self.kproducer.send_messages(topic, msgs=msg_list)
+            self.log.debug('Successfully sent message {} to kafka topic '
+                           '{}'.format(msg, topic))
+        except Exception as e:
+  'Failure to send message {} to kafka topic {}: '
+                          '{}'.format(msg, topic, repr(e)))
+# Common method to get the singleton instance of the kafka proxy class
+def get_kafka_proxy():
+    return KafkaProxy._kafka_instance