This commit consists of:
1) Provide a kafka client to publish events to kafka brokers
2) Provide zookeeper and kafka docker containers for local testing,
will not be present in production
3) Sends a regular heartbeat to the kafka broker from voltha to
exercise all the components
4) Provides a basic kafka consumeri (requires kafka-python to be
installed) to read the messages off the local kafka broker -
this time it is only heartbeat messages
diff --git a/kafka/Dockerfile b/kafka/Dockerfile
new file mode 100644
index 0000000..7f297d3
--- /dev/null
+++ b/kafka/Dockerfile
@@ -0,0 +1,19 @@
+FROM anapsix/alpine-java
+
+MAINTAINER Wurstmeister
+
+RUN apk add --update unzip wget curl docker jq coreutils
+
+ENV KAFKA_VERSION="0.10.0.1" SCALA_VERSION="2.11"
+ADD download-kafka.sh /tmp/download-kafka.sh
+RUN /tmp/download-kafka.sh && tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt && rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz
+
+VOLUME ["/kafka"]
+
+ENV KAFKA_HOME /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION}
+ADD start-kafka.sh /usr/bin/start-kafka.sh
+ADD broker-list.sh /usr/bin/broker-list.sh
+ADD create-topics.sh /usr/bin/create-topics.sh
+
+# Use "exec" form so that it runs as PID 1 (useful for graceful shutdown)
+CMD ["start-kafka.sh"]
diff --git a/kafka/broker-list.sh b/kafka/broker-list.sh
new file mode 100755
index 0000000..7f04639
--- /dev/null
+++ b/kafka/broker-list.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+
+CONTAINERS=$(docker ps | grep 9092 | awk '{print $1}')
+BROKERS=$(for CONTAINER in $CONTAINERS; do docker port $CONTAINER 9092 | sed -e "s/0.0.0.0:/$HOST_IP:/g"; done)
+echo $BROKERS | sed -e 's/ /,/g'
diff --git a/kafka/create-topics.sh b/kafka/create-topics.sh
new file mode 100755
index 0000000..e07bf06
--- /dev/null
+++ b/kafka/create-topics.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+
+if [[ -z "$START_TIMEOUT" ]]; then
+ START_TIMEOUT=600
+fi
+
+start_timeout_exceeded=false
+count=0
+step=10
+while netstat -lnt | awk '$4 ~ /:'$KAFKA_PORT'$/ {exit 1}'; do
+ echo "waiting for kafka to be ready"
+ sleep $step;
+ count=$(expr $count + $step)
+ if [ $count -gt $START_TIMEOUT ]; then
+ start_timeout_exceeded=true
+ break
+ fi
+done
+
+if $start_timeout_exceeded; then
+ echo "Not able to auto-create topic (waited for $START_TIMEOUT sec)"
+ exit 1
+fi
+
+if [[ -n $KAFKA_CREATE_TOPICS ]]; then
+ IFS=','; for topicToCreate in $KAFKA_CREATE_TOPICS; do
+ echo "creating topics: $topicToCreate"
+ IFS=':' read -a topicConfig <<< "$topicToCreate"
+ JMX_PORT='' $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper $KAFKA_ZOOKEEPER_CONNECT --replication-factor ${topicConfig[2]} --partition ${topicConfig[1]} --topic "${topicConfig[0]}"
+ done
+fi
diff --git a/kafka/download-kafka.sh b/kafka/download-kafka.sh
new file mode 100755
index 0000000..2ddc911
--- /dev/null
+++ b/kafka/download-kafka.sh
@@ -0,0 +1,5 @@
+#!/bin/sh
+
+mirror=$(curl --stderr /dev/null https://www.apache.org/dyn/closer.cgi\?as_json\=1 | jq -r '.preferred')
+url="${mirror}kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
+wget -q "${url}" -O "/tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
diff --git a/kafka/kafkaConsumer.py b/kafka/kafkaConsumer.py
new file mode 100644
index 0000000..2fd61e8
--- /dev/null
+++ b/kafka/kafkaConsumer.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python
+import threading, logging, time
+
+from kafka import KafkaConsumer
+
+
+class Consumer(threading.Thread):
+ daemon = True
+
+ def run(self):
+ #consumer = KafkaConsumer(bootstrap_servers='10.100.198.220:9092',
+ consumer = KafkaConsumer(bootstrap_servers='10.0.2.15:9092',
+ auto_offset_reset='earliest')
+ consumer.subscribe(['voltha-heartbeat'])
+
+ for message in consumer:
+ print (message)
+
+
+def main():
+ threads = [
+ Consumer()
+ ]
+
+ for t in threads:
+ t.start()
+
+ time.sleep(3000)
+
+if __name__ == "__main__":
+ logging.basicConfig(
+ format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
+ level=logging.INFO
+ )
+ main()
+
diff --git a/kafka/start-kafka-shell.sh b/kafka/start-kafka-shell.sh
new file mode 100755
index 0000000..62663e4
--- /dev/null
+++ b/kafka/start-kafka-shell.sh
@@ -0,0 +1,2 @@
+#!/bin/bash
+docker run --rm -v /var/run/docker.sock:/var/run/docker.sock -e HOST_IP=$1 -e ZK=$2 -i -t wurstmeister/kafka /bin/bash
diff --git a/kafka/start-kafka.sh b/kafka/start-kafka.sh
new file mode 100755
index 0000000..cd437fb
--- /dev/null
+++ b/kafka/start-kafka.sh
@@ -0,0 +1,67 @@
+#!/bin/bash
+
+if [[ -z "$KAFKA_PORT" ]]; then
+ export KAFKA_PORT=9092
+fi
+if [[ -z "$KAFKA_ADVERTISED_PORT" ]]; then
+ export KAFKA_ADVERTISED_PORT=$(docker port `hostname` $KAFKA_PORT | sed -r "s/.*:(.*)/\1/g")
+fi
+if [[ -z "$KAFKA_BROKER_ID" ]]; then
+ # By default auto allocate broker ID
+ export KAFKA_BROKER_ID=-1
+fi
+if [[ -z "$KAFKA_LOG_DIRS" ]]; then
+ export KAFKA_LOG_DIRS="/kafka/kafka-logs-$HOSTNAME"
+fi
+if [[ -z "$KAFKA_ZOOKEEPER_CONNECT" ]]; then
+ export KAFKA_ZOOKEEPER_CONNECT=$(env | grep ZK.*PORT_2181_TCP= | sed -e 's|.*tcp://||' | paste -sd ,)
+fi
+
+if [[ -n "$KAFKA_HEAP_OPTS" ]]; then
+ sed -r -i "s/(export KAFKA_HEAP_OPTS)=\"(.*)\"/\1=\"$KAFKA_HEAP_OPTS\"/g" $KAFKA_HOME/bin/kafka-server-start.sh
+ unset KAFKA_HEAP_OPTS
+fi
+
+if [[ -z "$KAFKA_ADVERTISED_HOST_NAME" && -n "$HOSTNAME_COMMAND" ]]; then
+ export KAFKA_ADVERTISED_HOST_NAME=$(eval $HOSTNAME_COMMAND)
+fi
+
+for VAR in `env`
+do
+ if [[ $VAR =~ ^KAFKA_ && ! $VAR =~ ^KAFKA_HOME ]]; then
+ kafka_name=`echo "$VAR" | sed -r "s/KAFKA_(.*)=.*/\1/g" | tr '[:upper:]' '[:lower:]' | tr _ .`
+ env_var=`echo "$VAR" | sed -r "s/(.*)=.*/\1/g"`
+ if egrep -q "(^|^#)$kafka_name=" $KAFKA_HOME/config/server.properties; then
+ sed -r -i "s@(^|^#)($kafka_name)=(.*)@\2=${!env_var}@g" $KAFKA_HOME/config/server.properties #note that no config values may contain an '@' char
+ else
+ echo "$kafka_name=${!env_var}" >> $KAFKA_HOME/config/server.properties
+ fi
+ fi
+done
+
+if [[ -n "$CUSTOM_INIT_SCRIPT" ]] ; then
+ eval $CUSTOM_INIT_SCRIPT
+fi
+
+
+KAFKA_PID=0
+
+# see https://medium.com/@gchudnov/trapping-signals-in-docker-containers-7a57fdda7d86#.bh35ir4u5
+term_handler() {
+ echo 'Stopping Kafka....'
+ if [ $KAFKA_PID -ne 0 ]; then
+ kill -s TERM "$KAFKA_PID"
+ wait "$KAFKA_PID"
+ fi
+ echo 'Kafka stopped.'
+ exit
+}
+
+
+# Capture kill requests to stop properly
+trap "term_handler" SIGHUP SIGINT SIGTERM
+create-topics.sh &
+$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties &
+KAFKA_PID=$!
+
+wait