new file openolt_kafka_prodcuer.py

Change-Id: Iceac47f2b283c14d1dda2b3e432f8744fb704902
diff --git a/KNOWN_ISSSUES.md b/KNOWN_ISSSUES.md
index be0c54a..0ec17b0 100644
--- a/KNOWN_ISSSUES.md
+++ b/KNOWN_ISSSUES.md
@@ -1,5 +1,6 @@
 
-### `matching-onu-port-label-invalid` error
+### matching-onu-port-label-invalid
+
 This happens after olt is rebooted.
 
 ```
@@ -26,7 +27,8 @@
 }
 ```
 
-### `rx-in-invalid-state` error message in mib_sync.on_set_response
+### rx-in-invalid-state
+
 This happens after olt is rebooted.
 
 ```
@@ -36,3 +38,17 @@
 ### Eapol flow not added after olt reboot
 
 20190419T234544.680 DEBUG    MainThread openolt_flow_mgr.add_eapol_flow {'instance_id': 'vcore-0_1555716854', 'ip': '10.90.0.122:9191', 'event': 'flow-exists--not-re-adding', 'vcore_id': '0001'}
+
+### coordinator_etcd._retry
+
+Happens at startup
+```
+20190420T012854.182 ERROR    MainThread coordinator_etcd._retry {'instance_id': 'vcore-0', 'exception': 'Traceback (most recent call last):\n  File "/voltha/voltha/coordinator_etcd.py", line 564, in _retry\n    result = yield operation(*args, **kw)\nConnectionRefusedError: Connection was refused by other side: 111: Connection refused.', 'event': ConnectionRefusedError('Connection refused',)}
+```
+
+### coordinator_etcd._get
+
+Happens at startup
+```
+20190420T012915.020 ERROR    MainThread coordinator_etcd._get {'instance_id': 'vcore-0', 'exception': 'Traceback (most recent call last):\n  File "/voltha/voltha/coordinator_etcd.py", line 605, in _get\n    (index, result) = yield self._retry(\'GET\', key, **kw)\n  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 1386, in _inlineCallbacks\n    result = g.send(result)\n  File "/voltha/voltha/coordinator_etcd.py", line 577, in _retry\n    returnValue(result)\nUnboundLocalError: local variable \'result\' referenced before assignment', 'e': UnboundLocalError("local variable 'result' referenced before assignment",), 'event': 'got-exception'}
+```
diff --git a/voltha/adapters/openolt/openolt_grpc.py b/voltha/adapters/openolt/openolt_grpc.py
index 26b9547..0db15a0 100644
--- a/voltha/adapters/openolt/openolt_grpc.py
+++ b/voltha/adapters/openolt/openolt_grpc.py
@@ -18,18 +18,16 @@
 import structlog
 import grpc
 import threading
-from structlog import get_logger
 
-from voltha.northbound.kafka.kafka_proxy import kafka_send_pb
+from voltha.adapters.openolt.openolt_kafka_producer import kafka_send_pb
 from voltha.adapters.openolt.protos import openolt_pb2_grpc, openolt_pb2
 
-log = get_logger()
+log = structlog.get_logger()
 
 
 class OpenoltGrpc(object):
     def __init__(self, host_and_port, device):
         super(OpenoltGrpc, self).__init__()
-        self.log = structlog.get_logger()
         log.debug('openolt grpc init')
         self.device = device
         self.host_and_port = host_and_port
diff --git a/voltha/adapters/openolt/openolt_kafka_producer.py b/voltha/adapters/openolt/openolt_kafka_producer.py
new file mode 100644
index 0000000..360ad63
--- /dev/null
+++ b/voltha/adapters/openolt/openolt_kafka_producer.py
@@ -0,0 +1,39 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from structlog import get_logger
+from simplejson import dumps
+from google.protobuf.json_format import MessageToJson
+from voltha.northbound.kafka.kafka_proxy import get_kafka_proxy
+
+log = get_logger()
+
+
+def kafka_send_pb(topic, msg):
+    try:
+        log.debug('send protobuf to kafka', topic=topic, msg=msg)
+        kafka_proxy = get_kafka_proxy()
+        if kafka_proxy and not kafka_proxy.is_faulty():
+            log.debug('kafka-proxy-available')
+            kafka_proxy.send_message(
+                topic,
+                dumps(MessageToJson(
+                    msg,
+                    including_default_value_fields=True)))
+        else:
+            log.error('kafka-proxy-unavailable')
+    except Exception, e:
+        log.exception('failed-sending-protobuf', e=e)
diff --git a/voltha/core/local_handler.py b/voltha/core/local_handler.py
index 8f9999c..3aea9f1 100644
--- a/voltha/core/local_handler.py
+++ b/voltha/core/local_handler.py
@@ -41,7 +41,7 @@
 from voltha.protos.omci_alarm_db_pb2 import AlarmDeviceData
 from requests.api import request
 from common.utils.asleep import asleep
-from voltha.northbound.kafka.kafka_proxy import kafka_send_pb
+from voltha.adapters.openolt.openolt_kafka_producer import kafka_send_pb
 
 log = structlog.get_logger()
 
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
index 08d42f1..aae67f9 100644
--- a/voltha/northbound/kafka/kafka_proxy.py
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -21,10 +21,8 @@
 from twisted.internet.defer import inlineCallbacks, returnValue
 from twisted.internet.threads import deferToThread
 from zope.interface import implementer
-from google.protobuf.json_format import MessageToJson
-from simplejson import dumps
 from confluent_kafka import Producer as _kafkaProducer
-from confluent_kafka import Consumer, KafkaError
+from confluent_kafka import Consumer
 
 from common.utils.consulhelpers import get_endpoint_from_consul
 from voltha.northbound.kafka.event_bus_publisher import EventBusPublisher
@@ -338,19 +336,3 @@
 # Common method to get the singleton instance of the kafka proxy class
 def get_kafka_proxy():
     return KafkaProxy._kafka_instance
-
-def kafka_send_pb(topic, msg):
-    try:
-        log.debug('send protobuf to kafka', topic=topic, msg=msg)
-        kafka_proxy = get_kafka_proxy()
-        if kafka_proxy and not kafka_proxy.is_faulty():
-            log.debug('kafka-proxy-available')
-            kafka_proxy.send_message(
-                topic,
-                dumps(MessageToJson(
-                    msg,
-                    including_default_value_fields=True)))
-        else:
-            log.error('kafka-proxy-unavailable')
-    except Exception, e:
-        log.exception('failed-sending-protobuf', e=e)