Minor cleanups
diff --git a/kafka/kafkaConsumer.py b/kafka/kafkaConsumer.py
index f6ae9ad..bd49c09 100644
--- a/kafka/kafkaConsumer.py
+++ b/kafka/kafkaConsumer.py
@@ -3,13 +3,11 @@
from kafka import KafkaConsumer
-
class Consumer(threading.Thread):
daemon = True
def run(self):
consumer = KafkaConsumer(bootstrap_servers='10.100.198.220:9092',
- #consumer = KafkaConsumer(bootstrap_servers='10.0.2.15:9092',
auto_offset_reset='earliest')
consumer.subscribe(['voltha-heartbeat'])
@@ -28,9 +26,5 @@
time.sleep(3000)
if __name__ == "__main__":
- logging.basicConfig(
- format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
- level=logging.INFO
- )
main()
diff --git a/requirements.txt b/requirements.txt
index eab3c02..f4323fc 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -10,6 +10,7 @@
hash_ring>=1.3.1
hexdump>=3.3
jinja2>=2.8
+kafka-python>=1.2.0
klein>=15.3.1
nose>=1.3.7
mock>=1.3.0
diff --git a/voltha/consulhelpers.py b/voltha/consulhelpers.py
index c424e7f..a5f0204 100644
--- a/voltha/consulhelpers.py
+++ b/voltha/consulhelpers.py
@@ -18,19 +18,18 @@
Some consul related convenience functions
"""
-import os
from structlog import get_logger
-import sys
from consul import Consul
from random import randint
log = get_logger()
+
def get_endpoint_from_consul(consul_endpoint, service_name):
"""Look up, from consul, the service name specified by service-name
"""
log.debug('Retrieving endpoint {} from consul {}'.format(service_name,
- consul_endpoint))
+ consul_endpoint))
host = consul_endpoint.split(':')[0].strip()
port = int(consul_endpoint.split(':')[1].strip())
@@ -38,7 +37,8 @@
_, services = consul.catalog.service(service_name)
if len(services) == 0:
- raise Exception('Cannot find service {} in consul'.format(service_name))
+ raise Exception(
+ 'Cannot find service {} in consul'.format(service_name))
# pick a random entry
# TODO should we prefer local IP addresses? Probably.
diff --git a/voltha/main.py b/voltha/main.py
index 3f63a9c..e76fac3 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -30,13 +30,13 @@
from voltha.coordinator import Coordinator
from voltha.dockerhelpers import get_my_containers_name
-from voltha.nethelpers import get_my_primary_interface, get_my_primary_local_ipv4
+from voltha.nethelpers import get_my_primary_interface, \
+ get_my_primary_local_ipv4
from voltha.northbound.grpc.grpc_server import VolthaGrpcServer
from voltha.northbound.rest.health_check import init_rest_service
from voltha.structlog_setup import setup_logging
from voltha.northbound.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
-
defs = dict(
config=os.environ.get('CONFIG', './voltha.yml'),
consul=os.environ.get('CONSUL', 'localhost:8500'),
@@ -54,7 +54,6 @@
def parse_args():
-
parser = argparse.ArgumentParser()
_help = ('Path to voltha.yml config file (default: %s). '
@@ -87,7 +86,7 @@
action='store',
default=defs['grpc_port'],
help=_help)
-
+
_help = ('<hostname>:<port> to fluentd server (default: %s). (If not '
'specified (None), the address from the config file is used'
% defs['fluentd'])
@@ -204,7 +203,6 @@
class Main(object):
-
def __init__(self):
self.args = args = parse_args()
@@ -219,6 +217,7 @@
# components
self.coordinator = None
self.grpc_server = None
+ self.kafka_proxy = None
if not args.no_banner:
print_banner(self.log)
@@ -244,7 +243,7 @@
self.grpc_server = VolthaGrpcServer(self.args.grpc_port).run()
- #initialize kafka proxy singleton
+ # initialize kafka proxy singleton
self.kafka_proxy = KafkaProxy(self.args.consul, self.args.kafka)
self.log.info('started-internal-services')
@@ -291,5 +290,6 @@
lc = LoopingCall(get_kafka_proxy().send_message, topic, message)
lc.start(10)
+
if __name__ == '__main__':
Main().start()
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
index 801701a..072a7cd 100644
--- a/voltha/northbound/kafka/kafka_proxy.py
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -14,7 +14,6 @@
# limitations under the License.
#
-import time
from afkak.client import KafkaClient as _KafkaClient
from afkak.producer import Producer as _kafkaProducer
from structlog import get_logger
@@ -32,8 +31,8 @@
_kafka_instance = None
def __init__(self, consul_endpoint='localhost:8500',
- kafka_endpoint='localhost:9092' ,
- ack_timeout = 1000, max_req_attempts = 10):
+ kafka_endpoint='localhost:9092',
+ ack_timeout=1000, max_req_attempts=10):
# return an exception if the object already exist
if KafkaProxy._kafka_instance:
@@ -63,7 +62,6 @@
self.log.info('initializing-KafkaProxy:{}'.format(_k_endpoint))
KafkaProxy._kafka_instance = self
-
@inlineCallbacks
def send_message(self, topic, msg):
assert topic is not None
@@ -71,8 +69,7 @@
self.log.info('Sending message {} to kafka topic {}'.format(msg,
topic))
try:
- msg_list = []
- msg_list.append(msg)
+ msg_list = [msg]
yield self.kproducer.send_messages(topic, msgs=msg_list)
self.log.debug('Successfully sent message {} to kafka topic '
'{}'.format(msg, topic))