Replace get_kafka_proxy() with registry
Change-Id: I0d529180597ab42467436bee4251c687e210e094
diff --git a/voltha/adapters/openolt/openolt_kafka_admin.py b/voltha/adapters/openolt/openolt_kafka_admin.py
index a61415e..d92237c 100644
--- a/voltha/adapters/openolt/openolt_kafka_admin.py
+++ b/voltha/adapters/openolt/openolt_kafka_admin.py
@@ -19,22 +19,24 @@
# Example Admin clients.
#
-from confluent_kafka.admin import AdminClient, NewTopic, NewPartitions, ConfigResource, ConfigSource
+from confluent_kafka.admin import AdminClient, NewTopic, NewPartitions, \
+ ConfigResource, ConfigSource
from confluent_kafka import KafkaException
import sys
import threading
import logging
from structlog import get_logger
-from voltha.northbound.kafka.kafka_proxy import get_kafka_proxy
+from voltha.registry import registry
logging.basicConfig()
log = get_logger()
+
class KAdmin(object):
def __init__(self):
- kafka_proxy = get_kafka_proxy()
+ kafka_proxy = registry('kafka_proxy')
if kafka_proxy and not kafka_proxy.is_faulty():
kafka_endpoint = kafka_proxy.kafka_endpoint
log.debug('kafka-proxy-available', endpoint=kafka_endpoint)
diff --git a/voltha/adapters/openolt/openolt_kafka_consumer.py b/voltha/adapters/openolt/openolt_kafka_consumer.py
index aa53db7..1cb5400 100644
--- a/voltha/adapters/openolt/openolt_kafka_consumer.py
+++ b/voltha/adapters/openolt/openolt_kafka_consumer.py
@@ -20,14 +20,14 @@
import logging
from structlog import get_logger
from confluent_kafka import Consumer, KafkaError
-from voltha.northbound.kafka.kafka_proxy import get_kafka_proxy
+from voltha.registry import registry
log = get_logger()
class KConsumer(object):
def __init__(self, callback, *topics):
- kafka_proxy = get_kafka_proxy()
+ kafka_proxy = registry('kafka_proxy')
if kafka_proxy and not kafka_proxy.is_faulty():
self.kafka_endpoint = kafka_proxy.kafka_endpoint
log.debug('kafka-proxy-available', endpoint=self.kafka_endpoint)
diff --git a/voltha/adapters/openolt/openolt_kafka_producer.py b/voltha/adapters/openolt/openolt_kafka_producer.py
index 360ad63..58b3613 100644
--- a/voltha/adapters/openolt/openolt_kafka_producer.py
+++ b/voltha/adapters/openolt/openolt_kafka_producer.py
@@ -17,7 +17,7 @@
from structlog import get_logger
from simplejson import dumps
from google.protobuf.json_format import MessageToJson
-from voltha.northbound.kafka.kafka_proxy import get_kafka_proxy
+from voltha.registry import registry
log = get_logger()
@@ -25,7 +25,7 @@
def kafka_send_pb(topic, msg):
try:
log.debug('send protobuf to kafka', topic=topic, msg=msg)
- kafka_proxy = get_kafka_proxy()
+ kafka_proxy = registry('kafka_proxy')
if kafka_proxy and not kafka_proxy.is_faulty():
log.debug('kafka-proxy-available')
kafka_proxy.send_message(
diff --git a/voltha/main.py b/voltha/main.py
index 19b8dfc..253b643 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -41,7 +41,7 @@
from voltha.core.config.config_backend import load_backend
from voltha.northbound.diagnostics import Diagnostics
from voltha.northbound.grpc.grpc_server import VolthaGrpcServer
-from voltha.northbound.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
+from voltha.northbound.kafka.kafka_proxy import KafkaProxy
from voltha.northbound.rest.health_check import init_rest_service
from voltha.protos.common_pb2 import LogLevel
from voltha.registry import registry, IComponent
@@ -534,16 +534,21 @@
def send_msg():
try:
- kafka_proxy = get_kafka_proxy()
+ kafka_proxy = registry('kafka_proxy')
+ except KeyError as e:
+ self.log.warn('kafka-proxy-unavailable')
+ else:
if kafka_proxy and not kafka_proxy.is_faulty():
self.log.debug('kafka-proxy-available')
message['ts'] = arrow.utcnow().timestamp
self.log.debug('start-kafka-heartbeat')
- kafka_proxy.send_message(topic, dumps(message))
+ try:
+ kafka_proxy.send_message(topic, dumps(message))
+ except Exception, e:
+ self.log.exception('failed-sending-message-heartbeat',
+ e=e)
else:
self.log.warn('kafka-proxy-unavailable')
- except Exception, e:
- self.log.exception('failed-sending-message-heartbeat', e=e)
try:
lc = LoopingCall(send_msg)
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
index aae67f9..e0ba989 100644
--- a/voltha/northbound/kafka/kafka_proxy.py
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -331,8 +331,3 @@
def is_faulty(self):
return self.faulty
-
-
-# Common method to get the singleton instance of the kafka proxy class
-def get_kafka_proxy():
- return KafkaProxy._kafka_instance