This commit consists of:
1) Provide a kafka client to publish events to kafka brokers
2) Provide zookeeper and kafka docker containers for local testing,
will not be present in production
3) Sends a regular heartbeat to the kafka broker from voltha to
exercise all the components
4) Provides a basic kafka consumeri (requires kafka-python to be
installed) to read the messages off the local kafka broker -
this time it is only heartbeat messages
diff --git a/compose/docker-compose-system-test.yml b/compose/docker-compose-system-test.yml
index 01c03ea..4fec883 100644
--- a/compose/docker-compose-system-test.yml
+++ b/compose/docker-compose-system-test.yml
@@ -1,6 +1,31 @@
version: '2'
services:
#
+ # Single-node zookeeper service
+ #
+ zookeeper:
+ image: wurstmeister/zookeeper
+ ports:
+ - "2181:2181"
+ environment:
+ SERVICE_2181_NAME: "zookeeper"
+ #
+ # Single-node kafka service
+ #
+ kafka:
+ build: ../kafka
+ ports:
+ - "9092:9092"
+ environment:
+ KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_HOST_IP}
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+ SERVICE_9092_NAME: "kafka"
+ depends_on:
+ - consul
+ volumes:
+ - /var/run/docker.sock:/var/run/docker.sock
+ #
# Single-node consul agent
#
consul:
@@ -50,10 +75,11 @@
command: [
"/voltha/main.py",
"-v",
- "--consul=consul:8500",
+ "--consul=${DOCKER_HOST_IP}:8500",
"--fluentd=fluentd:24224",
"--rest-port=8880",
"--grpc-port=50555",
+ "--kafka=@kafka",
"--instance-id-is-container-name",
"-v"
]
diff --git a/kafka/Dockerfile b/kafka/Dockerfile
new file mode 100644
index 0000000..7f297d3
--- /dev/null
+++ b/kafka/Dockerfile
@@ -0,0 +1,19 @@
+FROM anapsix/alpine-java
+
+MAINTAINER Wurstmeister
+
+RUN apk add --update unzip wget curl docker jq coreutils
+
+ENV KAFKA_VERSION="0.10.0.1" SCALA_VERSION="2.11"
+ADD download-kafka.sh /tmp/download-kafka.sh
+RUN /tmp/download-kafka.sh && tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt && rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz
+
+VOLUME ["/kafka"]
+
+ENV KAFKA_HOME /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION}
+ADD start-kafka.sh /usr/bin/start-kafka.sh
+ADD broker-list.sh /usr/bin/broker-list.sh
+ADD create-topics.sh /usr/bin/create-topics.sh
+
+# Use "exec" form so that it runs as PID 1 (useful for graceful shutdown)
+CMD ["start-kafka.sh"]
diff --git a/kafka/broker-list.sh b/kafka/broker-list.sh
new file mode 100755
index 0000000..7f04639
--- /dev/null
+++ b/kafka/broker-list.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+
+CONTAINERS=$(docker ps | grep 9092 | awk '{print $1}')
+BROKERS=$(for CONTAINER in $CONTAINERS; do docker port $CONTAINER 9092 | sed -e "s/0.0.0.0:/$HOST_IP:/g"; done)
+echo $BROKERS | sed -e 's/ /,/g'
diff --git a/kafka/create-topics.sh b/kafka/create-topics.sh
new file mode 100755
index 0000000..e07bf06
--- /dev/null
+++ b/kafka/create-topics.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+
+if [[ -z "$START_TIMEOUT" ]]; then
+ START_TIMEOUT=600
+fi
+
+start_timeout_exceeded=false
+count=0
+step=10
+while netstat -lnt | awk '$4 ~ /:'$KAFKA_PORT'$/ {exit 1}'; do
+ echo "waiting for kafka to be ready"
+ sleep $step;
+ count=$(expr $count + $step)
+ if [ $count -gt $START_TIMEOUT ]; then
+ start_timeout_exceeded=true
+ break
+ fi
+done
+
+if $start_timeout_exceeded; then
+ echo "Not able to auto-create topic (waited for $START_TIMEOUT sec)"
+ exit 1
+fi
+
+if [[ -n $KAFKA_CREATE_TOPICS ]]; then
+ IFS=','; for topicToCreate in $KAFKA_CREATE_TOPICS; do
+ echo "creating topics: $topicToCreate"
+ IFS=':' read -a topicConfig <<< "$topicToCreate"
+ JMX_PORT='' $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper $KAFKA_ZOOKEEPER_CONNECT --replication-factor ${topicConfig[2]} --partition ${topicConfig[1]} --topic "${topicConfig[0]}"
+ done
+fi
diff --git a/kafka/download-kafka.sh b/kafka/download-kafka.sh
new file mode 100755
index 0000000..2ddc911
--- /dev/null
+++ b/kafka/download-kafka.sh
@@ -0,0 +1,5 @@
+#!/bin/sh
+
+mirror=$(curl --stderr /dev/null https://www.apache.org/dyn/closer.cgi\?as_json\=1 | jq -r '.preferred')
+url="${mirror}kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
+wget -q "${url}" -O "/tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
diff --git a/kafka/kafkaConsumer.py b/kafka/kafkaConsumer.py
new file mode 100644
index 0000000..2fd61e8
--- /dev/null
+++ b/kafka/kafkaConsumer.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python
+import threading, logging, time
+
+from kafka import KafkaConsumer
+
+
+class Consumer(threading.Thread):
+ daemon = True
+
+ def run(self):
+ #consumer = KafkaConsumer(bootstrap_servers='10.100.198.220:9092',
+ consumer = KafkaConsumer(bootstrap_servers='10.0.2.15:9092',
+ auto_offset_reset='earliest')
+ consumer.subscribe(['voltha-heartbeat'])
+
+ for message in consumer:
+ print (message)
+
+
+def main():
+ threads = [
+ Consumer()
+ ]
+
+ for t in threads:
+ t.start()
+
+ time.sleep(3000)
+
+if __name__ == "__main__":
+ logging.basicConfig(
+ format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
+ level=logging.INFO
+ )
+ main()
+
diff --git a/kafka/start-kafka-shell.sh b/kafka/start-kafka-shell.sh
new file mode 100755
index 0000000..62663e4
--- /dev/null
+++ b/kafka/start-kafka-shell.sh
@@ -0,0 +1,2 @@
+#!/bin/bash
+docker run --rm -v /var/run/docker.sock:/var/run/docker.sock -e HOST_IP=$1 -e ZK=$2 -i -t wurstmeister/kafka /bin/bash
diff --git a/kafka/start-kafka.sh b/kafka/start-kafka.sh
new file mode 100755
index 0000000..cd437fb
--- /dev/null
+++ b/kafka/start-kafka.sh
@@ -0,0 +1,67 @@
+#!/bin/bash
+
+if [[ -z "$KAFKA_PORT" ]]; then
+ export KAFKA_PORT=9092
+fi
+if [[ -z "$KAFKA_ADVERTISED_PORT" ]]; then
+ export KAFKA_ADVERTISED_PORT=$(docker port `hostname` $KAFKA_PORT | sed -r "s/.*:(.*)/\1/g")
+fi
+if [[ -z "$KAFKA_BROKER_ID" ]]; then
+ # By default auto allocate broker ID
+ export KAFKA_BROKER_ID=-1
+fi
+if [[ -z "$KAFKA_LOG_DIRS" ]]; then
+ export KAFKA_LOG_DIRS="/kafka/kafka-logs-$HOSTNAME"
+fi
+if [[ -z "$KAFKA_ZOOKEEPER_CONNECT" ]]; then
+ export KAFKA_ZOOKEEPER_CONNECT=$(env | grep ZK.*PORT_2181_TCP= | sed -e 's|.*tcp://||' | paste -sd ,)
+fi
+
+if [[ -n "$KAFKA_HEAP_OPTS" ]]; then
+ sed -r -i "s/(export KAFKA_HEAP_OPTS)=\"(.*)\"/\1=\"$KAFKA_HEAP_OPTS\"/g" $KAFKA_HOME/bin/kafka-server-start.sh
+ unset KAFKA_HEAP_OPTS
+fi
+
+if [[ -z "$KAFKA_ADVERTISED_HOST_NAME" && -n "$HOSTNAME_COMMAND" ]]; then
+ export KAFKA_ADVERTISED_HOST_NAME=$(eval $HOSTNAME_COMMAND)
+fi
+
+for VAR in `env`
+do
+ if [[ $VAR =~ ^KAFKA_ && ! $VAR =~ ^KAFKA_HOME ]]; then
+ kafka_name=`echo "$VAR" | sed -r "s/KAFKA_(.*)=.*/\1/g" | tr '[:upper:]' '[:lower:]' | tr _ .`
+ env_var=`echo "$VAR" | sed -r "s/(.*)=.*/\1/g"`
+ if egrep -q "(^|^#)$kafka_name=" $KAFKA_HOME/config/server.properties; then
+ sed -r -i "s@(^|^#)($kafka_name)=(.*)@\2=${!env_var}@g" $KAFKA_HOME/config/server.properties #note that no config values may contain an '@' char
+ else
+ echo "$kafka_name=${!env_var}" >> $KAFKA_HOME/config/server.properties
+ fi
+ fi
+done
+
+if [[ -n "$CUSTOM_INIT_SCRIPT" ]] ; then
+ eval $CUSTOM_INIT_SCRIPT
+fi
+
+
+KAFKA_PID=0
+
+# see https://medium.com/@gchudnov/trapping-signals-in-docker-containers-7a57fdda7d86#.bh35ir4u5
+term_handler() {
+ echo 'Stopping Kafka....'
+ if [ $KAFKA_PID -ne 0 ]; then
+ kill -s TERM "$KAFKA_PID"
+ wait "$KAFKA_PID"
+ fi
+ echo 'Kafka stopped.'
+ exit
+}
+
+
+# Capture kill requests to stop properly
+trap "term_handler" SIGHUP SIGINT SIGTERM
+create-topics.sh &
+$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties &
+KAFKA_PID=$!
+
+wait
diff --git a/requirements.txt b/requirements.txt
index 37787b3..eab3c02 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -35,3 +35,7 @@
# python-consul>=0.6.1 we need the pre-released version for now, because 0.6.1 does not
# yet support Twisted. Once this is released, it will be the 0.6.2 version
git+git://github.com/cablehead/python-consul.git
+
+# Twisted Python kafka client
+git+git://github.com/ciena/afkak.git
+
diff --git a/voltha/consulhelpers.py b/voltha/consulhelpers.py
new file mode 100644
index 0000000..c424e7f
--- /dev/null
+++ b/voltha/consulhelpers.py
@@ -0,0 +1,55 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Some consul related convenience functions
+"""
+
+import os
+from structlog import get_logger
+import sys
+from consul import Consul
+from random import randint
+
+log = get_logger()
+
+def get_endpoint_from_consul(consul_endpoint, service_name):
+ """Look up, from consul, the service name specified by service-name
+ """
+ log.debug('Retrieving endpoint {} from consul {}'.format(service_name,
+ consul_endpoint))
+ host = consul_endpoint.split(':')[0].strip()
+ port = int(consul_endpoint.split(':')[1].strip())
+
+ consul = Consul(host=host, port=port)
+ _, services = consul.catalog.service(service_name)
+
+ if len(services) == 0:
+ raise Exception('Cannot find service {} in consul'.format(service_name))
+
+ # pick a random entry
+ # TODO should we prefer local IP addresses? Probably.
+
+ service = services[randint(0, len(services) - 1)]
+ endpoint = '{}:{}'.format(service['ServiceAddress'],
+ service['ServicePort'])
+
+ print endpoint
+ return endpoint
+
+
+if __name__ == '__main__':
+ get_endpoint_from_consul('10.100.198.220:8500', 'kafka')
diff --git a/voltha/main.py b/voltha/main.py
index ef03797..3f63a9c 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -34,6 +34,8 @@
from voltha.northbound.grpc.grpc_server import VolthaGrpcServer
from voltha.northbound.rest.health_check import init_rest_service
from voltha.structlog_setup import setup_logging
+from voltha.northbound.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
+
defs = dict(
config=os.environ.get('CONFIG', './voltha.yml'),
@@ -47,6 +49,7 @@
get_my_primary_local_ipv4()),
interface=os.environ.get('INTERFACE', get_my_primary_interface()),
rest_port=os.environ.get('REST_PORT', 8880),
+ kafka=os.environ.get('KAFKA', 'localhost:9092'),
)
@@ -161,6 +164,15 @@
default=False,
help=_help)
+ _help = ('<hostname>:<port> of the kafka broker (default: %s). (If not '
+ 'specified (None), the address from the config file is used'
+ % defs['kafka'])
+ parser.add_argument('-K', '--kafka',
+ dest='kafka',
+ action='store',
+ default=defs['kafka'],
+ help=_help)
+
args = parser.parse_args()
# post-processing
@@ -211,10 +223,11 @@
if not args.no_banner:
print_banner(self.log)
+ self.startup_components()
+
if not args.no_heartbeat:
self.start_heartbeat()
-
- self.startup_components()
+ self.start_kafka_heartbeat()
def start(self):
self.start_reactor() # will not return except Keyboard interrupt
@@ -231,6 +244,9 @@
self.grpc_server = VolthaGrpcServer(self.args.grpc_port).run()
+ #initialize kafka proxy singleton
+ self.kafka_proxy = KafkaProxy(self.args.consul, self.args.kafka)
+
self.log.info('started-internal-services')
@inlineCallbacks
@@ -262,6 +278,18 @@
lc = LoopingCall(heartbeat)
lc.start(10)
+ # Temporary function to send a heartbeat message to the external kafka
+ # broker
+ def start_kafka_heartbeat(self):
+ # For heartbeat we will send a message to a specific "voltha-heartbeat"
+ # topic. The message is a protocol buf
+ # message
+ message = 'Heartbeat message:{}'.format(get_my_primary_local_ipv4())
+ topic = "voltha-heartbeat"
+
+ from twisted.internet.task import LoopingCall
+ lc = LoopingCall(get_kafka_proxy().send_message, topic, message)
+ lc.start(10)
if __name__ == '__main__':
Main().start()
diff --git a/voltha/northbound/kafka/__init__.py b/voltha/northbound/kafka/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/voltha/northbound/kafka/__init__.py
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
new file mode 100644
index 0000000..801701a
--- /dev/null
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -0,0 +1,86 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+from afkak.client import KafkaClient as _KafkaClient
+from afkak.producer import Producer as _kafkaProducer
+from structlog import get_logger
+from voltha.consulhelpers import get_endpoint_from_consul
+from twisted.internet.defer import inlineCallbacks
+from afkak.common import (
+ PRODUCER_ACK_LOCAL_WRITE,
+)
+
+
+class KafkaProxy(object):
+ """
+ This is a singleton proxy kafka class to hide the kafka client details.
+ """
+ _kafka_instance = None
+
+ def __init__(self, consul_endpoint='localhost:8500',
+ kafka_endpoint='localhost:9092' ,
+ ack_timeout = 1000, max_req_attempts = 10):
+
+ # return an exception if the object already exist
+ if KafkaProxy._kafka_instance:
+ raise Exception('Singleton exist for :{}'.format(KafkaProxy))
+
+ self.log = get_logger()
+
+ self.consul_endpoint = consul_endpoint
+ self.kafka_endpoint = kafka_endpoint
+
+ # PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written
+ # to a local log before sending response
+ if self.kafka_endpoint.startswith('@'):
+ _k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
+ self.kafka_endpoint[1:])
+ else:
+ _k_endpoint = self.kafka_endpoint
+
+ self.log.info('Creating kafka endpoint', endpoint=_k_endpoint)
+
+ self.kclient = _KafkaClient(_k_endpoint)
+ self.kproducer = _kafkaProducer(self.kclient,
+ req_acks=PRODUCER_ACK_LOCAL_WRITE,
+ ack_timeout=ack_timeout,
+ max_req_attempts=max_req_attempts)
+
+ self.log.info('initializing-KafkaProxy:{}'.format(_k_endpoint))
+ KafkaProxy._kafka_instance = self
+
+
+ @inlineCallbacks
+ def send_message(self, topic, msg):
+ assert topic is not None
+ assert msg is not None
+ self.log.info('Sending message {} to kafka topic {}'.format(msg,
+ topic))
+ try:
+ msg_list = []
+ msg_list.append(msg)
+ yield self.kproducer.send_messages(topic, msgs=msg_list)
+ self.log.debug('Successfully sent message {} to kafka topic '
+ '{}'.format(msg, topic))
+ except Exception as e:
+ self.log.info('Failure to send message {} to kafka topic {}: '
+ '{}'.format(msg, topic, repr(e)))
+
+
+# Common method to get the singleton instance of the kafka proxy class
+def get_kafka_proxy():
+ return KafkaProxy._kafka_instance