Replace get_kafka_proxy() with registry

Change-Id: I0d529180597ab42467436bee4251c687e210e094
diff --git a/voltha/adapters/openolt/openolt_kafka_admin.py b/voltha/adapters/openolt/openolt_kafka_admin.py
index a61415e..d92237c 100644
--- a/voltha/adapters/openolt/openolt_kafka_admin.py
+++ b/voltha/adapters/openolt/openolt_kafka_admin.py
@@ -19,22 +19,24 @@
 # Example Admin clients.
 #
 
-from confluent_kafka.admin import AdminClient, NewTopic, NewPartitions, ConfigResource, ConfigSource
+from confluent_kafka.admin import AdminClient, NewTopic, NewPartitions, \
+    ConfigResource, ConfigSource
 from confluent_kafka import KafkaException
 import sys
 import threading
 import logging
 from structlog import get_logger
 
-from voltha.northbound.kafka.kafka_proxy import get_kafka_proxy
+from voltha.registry import registry
 
 logging.basicConfig()
 
 log = get_logger()
 
+
 class KAdmin(object):
     def __init__(self):
-        kafka_proxy = get_kafka_proxy()
+        kafka_proxy = registry('kafka_proxy')
         if kafka_proxy and not kafka_proxy.is_faulty():
             kafka_endpoint = kafka_proxy.kafka_endpoint
             log.debug('kafka-proxy-available', endpoint=kafka_endpoint)
diff --git a/voltha/adapters/openolt/openolt_kafka_consumer.py b/voltha/adapters/openolt/openolt_kafka_consumer.py
index aa53db7..1cb5400 100644
--- a/voltha/adapters/openolt/openolt_kafka_consumer.py
+++ b/voltha/adapters/openolt/openolt_kafka_consumer.py
@@ -20,14 +20,14 @@
 import logging
 from structlog import get_logger
 from confluent_kafka import Consumer, KafkaError
-from voltha.northbound.kafka.kafka_proxy import get_kafka_proxy
+from voltha.registry import registry
 
 log = get_logger()
 
 
 class KConsumer(object):
     def __init__(self, callback, *topics):
-        kafka_proxy = get_kafka_proxy()
+        kafka_proxy = registry('kafka_proxy')
         if kafka_proxy and not kafka_proxy.is_faulty():
             self.kafka_endpoint = kafka_proxy.kafka_endpoint
             log.debug('kafka-proxy-available', endpoint=self.kafka_endpoint)
diff --git a/voltha/adapters/openolt/openolt_kafka_producer.py b/voltha/adapters/openolt/openolt_kafka_producer.py
index 360ad63..58b3613 100644
--- a/voltha/adapters/openolt/openolt_kafka_producer.py
+++ b/voltha/adapters/openolt/openolt_kafka_producer.py
@@ -17,7 +17,7 @@
 from structlog import get_logger
 from simplejson import dumps
 from google.protobuf.json_format import MessageToJson
-from voltha.northbound.kafka.kafka_proxy import get_kafka_proxy
+from voltha.registry import registry
 
 log = get_logger()
 
@@ -25,7 +25,7 @@
 def kafka_send_pb(topic, msg):
     try:
         log.debug('send protobuf to kafka', topic=topic, msg=msg)
-        kafka_proxy = get_kafka_proxy()
+        kafka_proxy = registry('kafka_proxy')
         if kafka_proxy and not kafka_proxy.is_faulty():
             log.debug('kafka-proxy-available')
             kafka_proxy.send_message(
diff --git a/voltha/main.py b/voltha/main.py
index 19b8dfc..253b643 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -41,7 +41,7 @@
 from voltha.core.config.config_backend import load_backend
 from voltha.northbound.diagnostics import Diagnostics
 from voltha.northbound.grpc.grpc_server import VolthaGrpcServer
-from voltha.northbound.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
+from voltha.northbound.kafka.kafka_proxy import KafkaProxy
 from voltha.northbound.rest.health_check import init_rest_service
 from voltha.protos.common_pb2 import LogLevel
 from voltha.registry import registry, IComponent
@@ -534,16 +534,21 @@
 
         def send_msg():
             try:
-                kafka_proxy = get_kafka_proxy()
+                kafka_proxy = registry('kafka_proxy')
+            except KeyError as e:
+                self.log.warn('kafka-proxy-unavailable')
+            else:
                 if kafka_proxy and not kafka_proxy.is_faulty():
                     self.log.debug('kafka-proxy-available')
                     message['ts'] = arrow.utcnow().timestamp
                     self.log.debug('start-kafka-heartbeat')
-                    kafka_proxy.send_message(topic, dumps(message))
+                    try:
+                        kafka_proxy.send_message(topic, dumps(message))
+                    except Exception, e:
+                        self.log.exception('failed-sending-message-heartbeat',
+                                           e=e)
                 else:
                     self.log.warn('kafka-proxy-unavailable')
-            except Exception, e:
-                self.log.exception('failed-sending-message-heartbeat', e=e)
 
         try:
             lc = LoopingCall(send_msg)
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
index aae67f9..e0ba989 100644
--- a/voltha/northbound/kafka/kafka_proxy.py
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -331,8 +331,3 @@
 
     def is_faulty(self):
         return self.faulty
-
-
-# Common method to get the singleton instance of the kafka proxy class
-def get_kafka_proxy():
-    return KafkaProxy._kafka_instance