Address review comments on the Kafka NBI
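
Replace the locally built kafka image with the upstream
wurstmeister/kafka image (plus wurstmeister/zookeeper), removing the
vendored kafka/ Dockerfile and helper scripts, and add a standalone
docker-compose-zk-kafka-test.yml that brings up a single-node
zookeeper/kafka pair. KafkaProxy now creates its producer through
_get_kafka_producer() and rebuilds it on demand: when a send fails or
no producer exists (e.g. the kafka container restarted on a different
port), the stale client is dropped and the lookup is retried on the
next send_message call.
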
Change-Id: I4bffdd92bb05d3f4d4b1b077f2ccf0f332204ed2
diff --git a/Makefile b/Makefile
index 5520aa7..45dea97 100644
--- a/Makefile
+++ b/Makefile
@@ -101,6 +101,8 @@
docker pull fluent/fluentd:latest
docker pull gliderlabs/registrator:latest
docker pull ubuntu:xenial
+ docker pull wurstmeister/kafka
+ docker pull wurstmeister/zookeeper
purge-venv:
rm -fr ${VENVDIR}
diff --git a/compose/docker-compose-system-test.yml b/compose/docker-compose-system-test.yml
index aadeebc..5c261f8 100644
--- a/compose/docker-compose-system-test.yml
+++ b/compose/docker-compose-system-test.yml
@@ -13,7 +13,7 @@
# Single-node kafka service
#
kafka:
- build: ../kafka
+ image: wurstmeister/kafka
ports:
- 9092
environment:
diff --git a/compose/docker-compose-zk-kafka-test.yml b/compose/docker-compose-zk-kafka-test.yml
new file mode 100644
index 0000000..09c562b
--- /dev/null
+++ b/compose/docker-compose-zk-kafka-test.yml
@@ -0,0 +1,35 @@
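+#
+# Single-node zookeeper/kafka stack for testing. It expects
+# DOCKER_HOST_IP to be set to an address reachable by kafka clients;
+# an illustrative invocation (not wired into any Makefile target):
+#
+#   DOCKER_HOST_IP=<host-ip> docker-compose \
+#       -f compose/docker-compose-zk-kafka-test.yml up -d
+#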
+version: '2'
+services:
+ #
+ # Single-node zookeeper service
+ #
+ zookeeper:
+ image: wurstmeister/zookeeper
+ ports:
+ - 2181:2181
+ environment:
+ SERVICE_2181_NAME: "zookeeper"
+ #
+ # Single-node kafka service
+ #
+ kafka:
+ image: wurstmeister/kafka
+ ports:
+ - 9092:9092
+ environment:
+ KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_HOST_IP}
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+ SERVICE_9092_NAME: "kafka"
+ volumes:
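+      # the docker socket lets the wurstmeister image inspect its own
+      # published port mapping (see docker port in the removed start-kafka.sh)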
+ - /var/run/docker.sock:/var/run/docker.sock
diff --git a/kafka/Dockerfile b/kafka/Dockerfile
deleted file mode 100644
index 7f297d3..0000000
--- a/kafka/Dockerfile
+++ /dev/null
@@ -1,19 +0,0 @@
-FROM anapsix/alpine-java
-
-MAINTAINER Wurstmeister
-
-RUN apk add --update unzip wget curl docker jq coreutils
-
-ENV KAFKA_VERSION="0.10.0.1" SCALA_VERSION="2.11"
-ADD download-kafka.sh /tmp/download-kafka.sh
-RUN /tmp/download-kafka.sh && tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt && rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz
-
-VOLUME ["/kafka"]
-
-ENV KAFKA_HOME /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION}
-ADD start-kafka.sh /usr/bin/start-kafka.sh
-ADD broker-list.sh /usr/bin/broker-list.sh
-ADD create-topics.sh /usr/bin/create-topics.sh
-
-# Use "exec" form so that it runs as PID 1 (useful for graceful shutdown)
-CMD ["start-kafka.sh"]
diff --git a/kafka/broker-list.sh b/kafka/broker-list.sh
deleted file mode 100755
index 7f04639..0000000
--- a/kafka/broker-list.sh
+++ /dev/null
@@ -1,5 +0,0 @@
-#!/bin/bash
-
-CONTAINERS=$(docker ps | grep 9092 | awk '{print $1}')
-BROKERS=$(for CONTAINER in $CONTAINERS; do docker port $CONTAINER 9092 | sed -e "s/0.0.0.0:/$HOST_IP:/g"; done)
-echo $BROKERS | sed -e 's/ /,/g'
diff --git a/kafka/create-topics.sh b/kafka/create-topics.sh
deleted file mode 100755
index e07bf06..0000000
--- a/kafka/create-topics.sh
+++ /dev/null
@@ -1,32 +0,0 @@
-#!/bin/bash
-
-
-if [[ -z "$START_TIMEOUT" ]]; then
- START_TIMEOUT=600
-fi
-
-start_timeout_exceeded=false
-count=0
-step=10
-while netstat -lnt | awk '$4 ~ /:'$KAFKA_PORT'$/ {exit 1}'; do
- echo "waiting for kafka to be ready"
- sleep $step;
- count=$(expr $count + $step)
- if [ $count -gt $START_TIMEOUT ]; then
- start_timeout_exceeded=true
- break
- fi
-done
-
-if $start_timeout_exceeded; then
- echo "Not able to auto-create topic (waited for $START_TIMEOUT sec)"
- exit 1
-fi
-
-if [[ -n $KAFKA_CREATE_TOPICS ]]; then
- IFS=','; for topicToCreate in $KAFKA_CREATE_TOPICS; do
- echo "creating topics: $topicToCreate"
- IFS=':' read -a topicConfig <<< "$topicToCreate"
- JMX_PORT='' $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper $KAFKA_ZOOKEEPER_CONNECT --replication-factor ${topicConfig[2]} --partition ${topicConfig[1]} --topic "${topicConfig[0]}"
- done
-fi
diff --git a/kafka/download-kafka.sh b/kafka/download-kafka.sh
deleted file mode 100755
index 2ddc911..0000000
--- a/kafka/download-kafka.sh
+++ /dev/null
@@ -1,5 +0,0 @@
-#!/bin/sh
-
-mirror=$(curl --stderr /dev/null https://www.apache.org/dyn/closer.cgi\?as_json\=1 | jq -r '.preferred')
-url="${mirror}kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
-wget -q "${url}" -O "/tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
diff --git a/kafka/start-kafka-shell.sh b/kafka/start-kafka-shell.sh
deleted file mode 100755
index 62663e4..0000000
--- a/kafka/start-kafka-shell.sh
+++ /dev/null
@@ -1,2 +0,0 @@
-#!/bin/bash
-docker run --rm -v /var/run/docker.sock:/var/run/docker.sock -e HOST_IP=$1 -e ZK=$2 -i -t wurstmeister/kafka /bin/bash
diff --git a/kafka/start-kafka.sh b/kafka/start-kafka.sh
deleted file mode 100755
index cd437fb..0000000
--- a/kafka/start-kafka.sh
+++ /dev/null
@@ -1,67 +0,0 @@
-#!/bin/bash
-
-if [[ -z "$KAFKA_PORT" ]]; then
- export KAFKA_PORT=9092
-fi
-if [[ -z "$KAFKA_ADVERTISED_PORT" ]]; then
- export KAFKA_ADVERTISED_PORT=$(docker port `hostname` $KAFKA_PORT | sed -r "s/.*:(.*)/\1/g")
-fi
-if [[ -z "$KAFKA_BROKER_ID" ]]; then
- # By default auto allocate broker ID
- export KAFKA_BROKER_ID=-1
-fi
-if [[ -z "$KAFKA_LOG_DIRS" ]]; then
- export KAFKA_LOG_DIRS="/kafka/kafka-logs-$HOSTNAME"
-fi
-if [[ -z "$KAFKA_ZOOKEEPER_CONNECT" ]]; then
- export KAFKA_ZOOKEEPER_CONNECT=$(env | grep ZK.*PORT_2181_TCP= | sed -e 's|.*tcp://||' | paste -sd ,)
-fi
-
-if [[ -n "$KAFKA_HEAP_OPTS" ]]; then
- sed -r -i "s/(export KAFKA_HEAP_OPTS)=\"(.*)\"/\1=\"$KAFKA_HEAP_OPTS\"/g" $KAFKA_HOME/bin/kafka-server-start.sh
- unset KAFKA_HEAP_OPTS
-fi
-
-if [[ -z "$KAFKA_ADVERTISED_HOST_NAME" && -n "$HOSTNAME_COMMAND" ]]; then
- export KAFKA_ADVERTISED_HOST_NAME=$(eval $HOSTNAME_COMMAND)
-fi
-
-for VAR in `env`
-do
- if [[ $VAR =~ ^KAFKA_ && ! $VAR =~ ^KAFKA_HOME ]]; then
- kafka_name=`echo "$VAR" | sed -r "s/KAFKA_(.*)=.*/\1/g" | tr '[:upper:]' '[:lower:]' | tr _ .`
- env_var=`echo "$VAR" | sed -r "s/(.*)=.*/\1/g"`
- if egrep -q "(^|^#)$kafka_name=" $KAFKA_HOME/config/server.properties; then
- sed -r -i "s@(^|^#)($kafka_name)=(.*)@\2=${!env_var}@g" $KAFKA_HOME/config/server.properties #note that no config values may contain an '@' char
- else
- echo "$kafka_name=${!env_var}" >> $KAFKA_HOME/config/server.properties
- fi
- fi
-done
-
-if [[ -n "$CUSTOM_INIT_SCRIPT" ]] ; then
- eval $CUSTOM_INIT_SCRIPT
-fi
-
-
-KAFKA_PID=0
-
-# see https://medium.com/@gchudnov/trapping-signals-in-docker-containers-7a57fdda7d86#.bh35ir4u5
-term_handler() {
- echo 'Stopping Kafka....'
- if [ $KAFKA_PID -ne 0 ]; then
- kill -s TERM "$KAFKA_PID"
- wait "$KAFKA_PID"
- fi
- echo 'Kafka stopped.'
- exit
-}
-
-
-# Capture kill requests to stop properly
-trap "term_handler" SIGHUP SIGINT SIGTERM
-create-topics.sh &
-$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties &
-KAFKA_PID=$!
-
-wait
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
index 072a7cd..cc8e80a 100644
--- a/voltha/northbound/kafka/kafka_proxy.py
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -40,42 +40,77 @@
self.log = get_logger()
+        self.log.info('Initializing KafkaProxy with kafka endpoint: {}'.format(
+            kafka_endpoint))
+
+ self.ack_timeout = ack_timeout
+ self.max_req_attempts = max_req_attempts
self.consul_endpoint = consul_endpoint
self.kafka_endpoint = kafka_endpoint
+ self.kclient = None
+ self.kproducer = None
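+        # Create the producer eagerly at init; _get_kafka_producer() is
+        # also called from send_message() to rebuild the producer after
+        # a failure.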
+ self._get_kafka_producer()
+
+ KafkaProxy._kafka_instance = self
+
+ def _get_kafka_producer(self):
# PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written
# to a local log before sending response
+
if self.kafka_endpoint.startswith('@'):
- _k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
- self.kafka_endpoint[1:])
+ try:
+ _k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
+ self.kafka_endpoint[1:])
+ self.log.info(
+ 'Found kafka service at {}'.format(_k_endpoint))
+
+ except Exception as e:
+                self.log.error('Failed to locate a kafka service from '
+                               'consul: {}'.format(repr(e)))
+ self.kproducer = None
+ self.kclient = None
+ return
else:
_k_endpoint = self.kafka_endpoint
- self.log.info('Creating kafka endpoint', endpoint=_k_endpoint)
-
self.kclient = _KafkaClient(_k_endpoint)
self.kproducer = _kafkaProducer(self.kclient,
req_acks=PRODUCER_ACK_LOCAL_WRITE,
- ack_timeout=ack_timeout,
- max_req_attempts=max_req_attempts)
-
- self.log.info('initializing-KafkaProxy:{}'.format(_k_endpoint))
- KafkaProxy._kafka_instance = self
+ ack_timeout=self.ack_timeout,
+ max_req_attempts=self.max_req_attempts)
@inlineCallbacks
def send_message(self, topic, msg):
assert topic is not None
assert msg is not None
+
+        # First check whether we have a kafka producer. If there is none,
+        # try to create one; the producer is missing when a consul lookup
+        # failed at init time or a previous send failure dropped it.
+ if self.kproducer is None:
+ self._get_kafka_producer()
+            # Let the next send attempt retry if creation still fails
+ if self.kproducer is None:
+ self.log.error('No kafka producer available at {}'.format(
+ self.kafka_endpoint))
+ return
+
self.log.info('Sending message {} to kafka topic {}'.format(msg,
topic))
try:
msg_list = [msg]
yield self.kproducer.send_messages(topic, msgs=msg_list)
- self.log.debug('Successfully sent message {} to kafka topic '
- '{}'.format(msg, topic))
+ self.log.info('Successfully sent message {} to kafka topic '
+ '{}'.format(msg, topic))
except Exception as e:
- self.log.info('Failure to send message {} to kafka topic {}: '
- '{}'.format(msg, topic, repr(e)))
+            self.log.error('Failed to send message {} to kafka topic {}: '
+                           '{}'.format(msg, topic, repr(e)))
+            # Drop the producer and client so they are re-created on the
+            # next send; this is needed if the kafka container went down
+            # and came back up with a different port number.
+ self.kproducer = None
+ self.kclient = None
# Common method to get the singleton instance of the kafka proxy class