new file openolt_kafka_prodcuer.py
Change-Id: Iceac47f2b283c14d1dda2b3e432f8744fb704902
diff --git a/KNOWN_ISSSUES.md b/KNOWN_ISSSUES.md
index be0c54a..0ec17b0 100644
--- a/KNOWN_ISSSUES.md
+++ b/KNOWN_ISSSUES.md
@@ -1,5 +1,6 @@
-### `matching-onu-port-label-invalid` error
+### matching-onu-port-label-invalid
+
This happens after olt is rebooted.
```
@@ -26,7 +27,8 @@
}
```
-### `rx-in-invalid-state` error message in mib_sync.on_set_response
+### rx-in-invalid-state
+
This happens after olt is rebooted.
```
@@ -36,3 +38,17 @@
### Eapol flow not added after olt reboot
20190419T234544.680 DEBUG MainThread openolt_flow_mgr.add_eapol_flow {'instance_id': 'vcore-0_1555716854', 'ip': '10.90.0.122:9191', 'event': 'flow-exists--not-re-adding', 'vcore_id': '0001'}
+
+### coordinator_etcd._retry
+
+Happens at startup
+```
+20190420T012854.182 ERROR MainThread coordinator_etcd._retry {'instance_id': 'vcore-0', 'exception': 'Traceback (most recent call last):\n File "/voltha/voltha/coordinator_etcd.py", line 564, in _retry\n result = yield operation(*args, **kw)\nConnectionRefusedError: Connection was refused by other side: 111: Connection refused.', 'event': ConnectionRefusedError('Connection refused',)}
+```
+
+### coordinator_etcd._get
+
+Happens at startup
+```
+20190420T012915.020 ERROR MainThread coordinator_etcd._get {'instance_id': 'vcore-0', 'exception': 'Traceback (most recent call last):\n File "/voltha/voltha/coordinator_etcd.py", line 605, in _get\n (index, result) = yield self._retry(\'GET\', key, **kw)\n File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 1386, in _inlineCallbacks\n result = g.send(result)\n File "/voltha/voltha/coordinator_etcd.py", line 577, in _retry\n returnValue(result)\nUnboundLocalError: local variable \'result\' referenced before assignment', 'e': UnboundLocalError("local variable 'result' referenced before assignment",), 'event': 'got-exception'}
+```
diff --git a/voltha/adapters/openolt/openolt_grpc.py b/voltha/adapters/openolt/openolt_grpc.py
index 26b9547..0db15a0 100644
--- a/voltha/adapters/openolt/openolt_grpc.py
+++ b/voltha/adapters/openolt/openolt_grpc.py
@@ -18,18 +18,16 @@
import structlog
import grpc
import threading
-from structlog import get_logger
-from voltha.northbound.kafka.kafka_proxy import kafka_send_pb
+from voltha.adapters.openolt.openolt_kafka_producer import kafka_send_pb
from voltha.adapters.openolt.protos import openolt_pb2_grpc, openolt_pb2
-log = get_logger()
+log = structlog.get_logger()
class OpenoltGrpc(object):
def __init__(self, host_and_port, device):
super(OpenoltGrpc, self).__init__()
- self.log = structlog.get_logger()
log.debug('openolt grpc init')
self.device = device
self.host_and_port = host_and_port
diff --git a/voltha/adapters/openolt/openolt_kafka_producer.py b/voltha/adapters/openolt/openolt_kafka_producer.py
new file mode 100644
index 0000000..360ad63
--- /dev/null
+++ b/voltha/adapters/openolt/openolt_kafka_producer.py
@@ -0,0 +1,39 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from structlog import get_logger
+from simplejson import dumps
+from google.protobuf.json_format import MessageToJson
+from voltha.northbound.kafka.kafka_proxy import get_kafka_proxy
+
+log = get_logger()
+
+
+def kafka_send_pb(topic, msg):
+ try:
+ log.debug('send protobuf to kafka', topic=topic, msg=msg)
+ kafka_proxy = get_kafka_proxy()
+ if kafka_proxy and not kafka_proxy.is_faulty():
+ log.debug('kafka-proxy-available')
+ kafka_proxy.send_message(
+ topic,
+ dumps(MessageToJson(
+ msg,
+ including_default_value_fields=True)))
+ else:
+ log.error('kafka-proxy-unavailable')
+ except Exception, e:
+ log.exception('failed-sending-protobuf', e=e)
diff --git a/voltha/core/local_handler.py b/voltha/core/local_handler.py
index 8f9999c..3aea9f1 100644
--- a/voltha/core/local_handler.py
+++ b/voltha/core/local_handler.py
@@ -41,7 +41,7 @@
from voltha.protos.omci_alarm_db_pb2 import AlarmDeviceData
from requests.api import request
from common.utils.asleep import asleep
-from voltha.northbound.kafka.kafka_proxy import kafka_send_pb
+from voltha.adapters.openolt.openolt_kafka_producer import kafka_send_pb
log = structlog.get_logger()
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
index 08d42f1..aae67f9 100644
--- a/voltha/northbound/kafka/kafka_proxy.py
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -21,10 +21,8 @@
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.threads import deferToThread
from zope.interface import implementer
-from google.protobuf.json_format import MessageToJson
-from simplejson import dumps
from confluent_kafka import Producer as _kafkaProducer
-from confluent_kafka import Consumer, KafkaError
+from confluent_kafka import Consumer
from common.utils.consulhelpers import get_endpoint_from_consul
from voltha.northbound.kafka.event_bus_publisher import EventBusPublisher
@@ -338,19 +336,3 @@
# Common method to get the singleton instance of the kafka proxy class
def get_kafka_proxy():
return KafkaProxy._kafka_instance
-
-def kafka_send_pb(topic, msg):
- try:
- log.debug('send protobuf to kafka', topic=topic, msg=msg)
- kafka_proxy = get_kafka_proxy()
- if kafka_proxy and not kafka_proxy.is_faulty():
- log.debug('kafka-proxy-available')
- kafka_proxy.send_message(
- topic,
- dumps(MessageToJson(
- msg,
- including_default_value_fields=True)))
- else:
- log.error('kafka-proxy-unavailable')
- except Exception, e:
- log.exception('failed-sending-protobuf', e=e)