Address review comments for the Kafka NBI

Use the stock wurstmeister/kafka and wurstmeister/zookeeper images
instead of building a local kafka image, add a standalone
zookeeper/kafka compose file, and make KafkaProxy lazily (re)create its
producer so it survives a kafka restart or endpoint change.

Change-Id: I4bffdd92bb05d3f4d4b1b077f2ccf0f332204ed2
diff --git a/Makefile b/Makefile
index 5520aa7..45dea97 100644
--- a/Makefile
+++ b/Makefile
@@ -101,6 +101,8 @@
 	docker pull fluent/fluentd:latest
 	docker pull gliderlabs/registrator:latest
 	docker pull ubuntu:xenial
+	docker pull wurstmeister/kafka
+	docker pull wurstmeister/zookeeper
 
 purge-venv:
 	rm -fr ${VENVDIR}
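
A note on the two new pulls above: they fetch the stock wurstmeister
images so the compose files below can reference them directly instead of
building a local kafka image. No tag is pinned, so the pulls resolve to
:latest; confirming the images landed is plain docker CLI, with no
project-specific assumptions:

    docker pull wurstmeister/kafka
    docker pull wurstmeister/zookeeper
    docker images | grep wurstmeister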
diff --git a/compose/docker-compose-system-test.yml b/compose/docker-compose-system-test.yml
index aadeebc..5c261f8 100644
--- a/compose/docker-compose-system-test.yml
+++ b/compose/docker-compose-system-test.yml
@@ -13,7 +13,7 @@
   # Single-node kafka service
   #
   kafka:
-    build: ../kafka
+    image: wurstmeister/kafka
     ports:
      - 9092
     environment:
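
With the stock image in place the kafka service comes up without a local
build step. A quick sanity check follows; the environment block above is
truncated in this hunk, so the DOCKER_HOST_IP export is an assumption
carried over from the new compose file below:

    # pick an address reachable from inside the containers
    export DOCKER_HOST_IP=$(hostname -I | awk '{print $1}')
    docker-compose -f compose/docker-compose-system-test.yml up -d kafka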
diff --git a/compose/docker-compose-zk-kafka-test.yml b/compose/docker-compose-zk-kafka-test.yml
new file mode 100644
index 0000000..09c562b
--- /dev/null
+++ b/compose/docker-compose-zk-kafka-test.yml
@@ -0,0 +1,25 @@
+version: '2'
+services:
+  #
+  # Single-node zookeeper service
+  #
+  zookeeper:
+    image: wurstmeister/zookeeper
+    ports:
+    - 2181:2181
+    environment:
+      SERVICE_2181_NAME: "zookeeper"
+  #
+  # Single-node kafka service
+  #
+  kafka:
+    image: wurstmeister/kafka
+    ports:
+     - 9092:9092
+    environment:
+      KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_HOST_IP}
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+      SERVICE_9092_NAME: "kafka"
+    volumes:
+      - /var/run/docker.sock:/var/run/docker.sock
\ No newline at end of file
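
Usage sketch for the new compose file. kafkacat is not part of this
change and any kafka client would do; test.topic is a made-up name that
works because KAFKA_AUTO_CREATE_TOPICS_ENABLE is set to true above:

    export DOCKER_HOST_IP=$(hostname -I | awk '{print $1}')
    docker-compose -f compose/docker-compose-zk-kafka-test.yml up -d
    kafkacat -b ${DOCKER_HOST_IP}:9092 -L                  # broker/topic metadata
    echo hello | kafkacat -b ${DOCKER_HOST_IP}:9092 -t test.topic -P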
diff --git a/kafka/Dockerfile b/kafka/Dockerfile
deleted file mode 100644
index 7f297d3..0000000
--- a/kafka/Dockerfile
+++ /dev/null
@@ -1,19 +0,0 @@
-FROM anapsix/alpine-java
-
-MAINTAINER Wurstmeister
-
-RUN apk add --update unzip wget curl docker jq coreutils
-
-ENV KAFKA_VERSION="0.10.0.1" SCALA_VERSION="2.11"
-ADD download-kafka.sh /tmp/download-kafka.sh
-RUN /tmp/download-kafka.sh && tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt && rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz
-
-VOLUME ["/kafka"]
-
-ENV KAFKA_HOME /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION}
-ADD start-kafka.sh /usr/bin/start-kafka.sh
-ADD broker-list.sh /usr/bin/broker-list.sh
-ADD create-topics.sh /usr/bin/create-topics.sh
-
-# Use "exec" form so that it runs as PID 1 (useful for graceful shutdown)
-CMD ["start-kafka.sh"]
diff --git a/kafka/broker-list.sh b/kafka/broker-list.sh
deleted file mode 100755
index 7f04639..0000000
--- a/kafka/broker-list.sh
+++ /dev/null
@@ -1,5 +0,0 @@
-#!/bin/bash
-
-CONTAINERS=$(docker ps | grep 9092 | awk '{print $1}')
-BROKERS=$(for CONTAINER in $CONTAINERS; do docker port $CONTAINER 9092 | sed -e "s/0.0.0.0:/$HOST_IP:/g"; done)
-echo $BROKERS | sed -e 's/ /,/g'
diff --git a/kafka/create-topics.sh b/kafka/create-topics.sh
deleted file mode 100755
index e07bf06..0000000
--- a/kafka/create-topics.sh
+++ /dev/null
@@ -1,32 +0,0 @@
-#!/bin/bash
-
-
-if [[ -z "$START_TIMEOUT" ]]; then
-    START_TIMEOUT=600
-fi
-
-start_timeout_exceeded=false
-count=0
-step=10
-while netstat -lnt | awk '$4 ~ /:'$KAFKA_PORT'$/ {exit 1}'; do
-    echo "waiting for kafka to be ready"
-    sleep $step;
-    count=$(expr $count + $step)
-    if [ $count -gt $START_TIMEOUT ]; then
-        start_timeout_exceeded=true
-        break
-    fi
-done
-
-if $start_timeout_exceeded; then
-    echo "Not able to auto-create topic (waited for $START_TIMEOUT sec)"
-    exit 1
-fi
-
-if [[ -n $KAFKA_CREATE_TOPICS ]]; then
-    IFS=','; for topicToCreate in $KAFKA_CREATE_TOPICS; do
-        echo "creating topics: $topicToCreate" 
-        IFS=':' read -a topicConfig <<< "$topicToCreate"
-        JMX_PORT='' $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper $KAFKA_ZOOKEEPER_CONNECT --replication-factor ${topicConfig[2]} --partition ${topicConfig[1]} --topic "${topicConfig[0]}"
-    done
-fi
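
Nothing is lost by deleting this script: the name:partitions:replicas
spec it parses is the same convention the stock wurstmeister/kafka image
honours through its KAFKA_CREATE_TOPICS environment variable. For
example (illustrative topic name), a compose service could pre-create a
topic with:

    environment:
      KAFKA_CREATE_TOPICS: "voltha.heartbeat:1:1"   # topic:partitions:replicas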
diff --git a/kafka/download-kafka.sh b/kafka/download-kafka.sh
deleted file mode 100755
index 2ddc911..0000000
--- a/kafka/download-kafka.sh
+++ /dev/null
@@ -1,5 +0,0 @@
-#!/bin/sh
-
-mirror=$(curl --stderr /dev/null https://www.apache.org/dyn/closer.cgi\?as_json\=1 | jq -r '.preferred')
-url="${mirror}kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
-wget -q "${url}" -O "/tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
diff --git a/kafka/start-kafka-shell.sh b/kafka/start-kafka-shell.sh
deleted file mode 100755
index 62663e4..0000000
--- a/kafka/start-kafka-shell.sh
+++ /dev/null
@@ -1,2 +0,0 @@
-#!/bin/bash
-docker run --rm -v /var/run/docker.sock:/var/run/docker.sock -e HOST_IP=$1 -e ZK=$2 -i -t wurstmeister/kafka /bin/bash
diff --git a/kafka/start-kafka.sh b/kafka/start-kafka.sh
deleted file mode 100755
index cd437fb..0000000
--- a/kafka/start-kafka.sh
+++ /dev/null
@@ -1,67 +0,0 @@
-#!/bin/bash
-
-if [[ -z "$KAFKA_PORT" ]]; then
-    export KAFKA_PORT=9092
-fi
-if [[ -z "$KAFKA_ADVERTISED_PORT" ]]; then
-    export KAFKA_ADVERTISED_PORT=$(docker port `hostname` $KAFKA_PORT | sed -r "s/.*:(.*)/\1/g")
-fi
-if [[ -z "$KAFKA_BROKER_ID" ]]; then
-    # By default auto allocate broker ID
-    export KAFKA_BROKER_ID=-1
-fi
-if [[ -z "$KAFKA_LOG_DIRS" ]]; then
-    export KAFKA_LOG_DIRS="/kafka/kafka-logs-$HOSTNAME"
-fi
-if [[ -z "$KAFKA_ZOOKEEPER_CONNECT" ]]; then
-    export KAFKA_ZOOKEEPER_CONNECT=$(env | grep ZK.*PORT_2181_TCP= | sed -e 's|.*tcp://||' | paste -sd ,)
-fi
-
-if [[ -n "$KAFKA_HEAP_OPTS" ]]; then
-    sed -r -i "s/(export KAFKA_HEAP_OPTS)=\"(.*)\"/\1=\"$KAFKA_HEAP_OPTS\"/g" $KAFKA_HOME/bin/kafka-server-start.sh
-    unset KAFKA_HEAP_OPTS
-fi
-
-if [[ -z "$KAFKA_ADVERTISED_HOST_NAME" && -n "$HOSTNAME_COMMAND" ]]; then
-    export KAFKA_ADVERTISED_HOST_NAME=$(eval $HOSTNAME_COMMAND)
-fi
-
-for VAR in `env`
-do
-  if [[ $VAR =~ ^KAFKA_ && ! $VAR =~ ^KAFKA_HOME ]]; then
-    kafka_name=`echo "$VAR" | sed -r "s/KAFKA_(.*)=.*/\1/g" | tr '[:upper:]' '[:lower:]' | tr _ .`
-    env_var=`echo "$VAR" | sed -r "s/(.*)=.*/\1/g"`
-    if egrep -q "(^|^#)$kafka_name=" $KAFKA_HOME/config/server.properties; then
-        sed -r -i "s@(^|^#)($kafka_name)=(.*)@\2=${!env_var}@g" $KAFKA_HOME/config/server.properties #note that no config values may contain an '@' char
-    else
-        echo "$kafka_name=${!env_var}" >> $KAFKA_HOME/config/server.properties
-    fi
-  fi
-done
-
-if [[ -n "$CUSTOM_INIT_SCRIPT" ]] ; then
-  eval $CUSTOM_INIT_SCRIPT
-fi
-
-
-KAFKA_PID=0
-
-# see https://medium.com/@gchudnov/trapping-signals-in-docker-containers-7a57fdda7d86#.bh35ir4u5
-term_handler() {
-  echo 'Stopping Kafka....'
-  if [ $KAFKA_PID -ne 0 ]; then
-    kill -s TERM "$KAFKA_PID"
-    wait "$KAFKA_PID"
-  fi
-  echo 'Kafka stopped.'
-  exit
-}
-
-
-# Capture kill requests to stop properly
-trap "term_handler" SIGHUP SIGINT SIGTERM
-create-topics.sh & 
-$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties &
-KAFKA_PID=$!
-
-wait
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
index 072a7cd..cc8e80a 100644
--- a/voltha/northbound/kafka/kafka_proxy.py
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -40,42 +40,77 @@
 
         self.log = get_logger()
 
+        self.log.info('KafkaProxy init with kafka endpoint: {}'.format(
+            kafka_endpoint))
+
+        self.ack_timeout = ack_timeout
+        self.max_req_attempts = max_req_attempts
         self.consul_endpoint = consul_endpoint
         self.kafka_endpoint = kafka_endpoint
+        self.kclient = None
+        self.kproducer = None
 
+        self._get_kafka_producer()
+
+        KafkaProxy._kafka_instance = self
+
+    def _get_kafka_producer(self):
         # PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written
         #  to a local log before sending response
+
         if self.kafka_endpoint.startswith('@'):
-            _k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
-                                                   self.kafka_endpoint[1:])
+            try:
+                _k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
+                                                       self.kafka_endpoint[1:])
+                self.log.info(
+                    'Found kafka service at {}'.format(_k_endpoint))
+
+            except Exception as e:
+                self.log.error('Failure to locate a kafka service from '
+                               'consul: {}'.format(repr(e)))
+                self.kproducer = None
+                self.kclient = None
+                return
         else:
             _k_endpoint = self.kafka_endpoint
 
-        self.log.info('Creating kafka endpoint', endpoint=_k_endpoint)
-
         self.kclient = _KafkaClient(_k_endpoint)
         self.kproducer = _kafkaProducer(self.kclient,
                                         req_acks=PRODUCER_ACK_LOCAL_WRITE,
-                                        ack_timeout=ack_timeout,
-                                        max_req_attempts=max_req_attempts)
-
-        self.log.info('initializing-KafkaProxy:{}'.format(_k_endpoint))
-        KafkaProxy._kafka_instance = self
+                                        ack_timeout=self.ack_timeout,
+                                        max_req_attempts=self.max_req_attempts)
 
     @inlineCallbacks
     def send_message(self, topic, msg):
         assert topic is not None
         assert msg is not None
+
+        # First check whether we have a kafka producer.  If there is none,
+        # try to create one: the producer is missing either when the initial
+        # consul lookup failed or after a previous send failure reset it.
+        if self.kproducer is None:
+            self._get_kafka_producer()
+            # If still unavailable, drop this message; the next send retries
+            if self.kproducer is None:
+                self.log.error('No kafka producer available at {}'.format(
+                    self.kafka_endpoint))
+                return
+
         self.log.info('Sending message {} to kafka topic {}'.format(msg,
                                                                     topic))
         try:
             msg_list = [msg]
             yield self.kproducer.send_messages(topic, msgs=msg_list)
-            self.log.debug('Successfully sent message {} to kafka topic '
-                           '{}'.format(msg, topic))
+            self.log.info('Successfully sent message {} to kafka topic '
+                          '{}'.format(msg, topic))
         except Exception as e:
-            self.log.info('Failure to send message {} to kafka topic {}: '
-                          '{}'.format(msg, topic, repr(e)))
+            self.log.error('Failure to send message {} to kafka topic {}: '
+                           '{}'.format(msg, topic, repr(e)))
+            # Reset the producer and client so the next send reconnects.
+            # This is needed if the kafka container went down and came back
+            # up on a different port.
+            self.kproducer = None
+            self.kclient = None
 
 
 # Common method to get the singleton instance of the kafka proxy class
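
Caller-side sketch of the new retry behaviour. This assumes the
module-level singleton accessor hinted at by the comment above, named
get_kafka_proxy() here for illustration; the topic and payload are made
up, and the constructor arguments mirror the attributes set in __init__:

    from twisted.internet import reactor
    from twisted.internet.defer import inlineCallbacks

    from voltha.northbound.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy

    @inlineCallbacks
    def publish():
        proxy = get_kafka_proxy()
        # If the initial consul lookup failed or a previous send reset
        # kproducer, send_message() recreates the producer before sending;
        # on failure it resets kproducer again so the next call reconnects.
        yield proxy.send_message('voltha.heartbeat', 'alive')

    KafkaProxy(consul_endpoint='localhost:8500',
               kafka_endpoint='localhost:9092')   # builds the singleton
    reactor.callWhenRunning(publish)
    reactor.run()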