OpenoltKafkaProxy
Change-Id: I10f46ffd417f5c6814d30c56b1656a492b265528
diff --git a/voltha/adapters/openolt/openolt_grpc.py b/voltha/adapters/openolt/openolt_grpc.py
index 0db15a0..2dcc4c5 100644
--- a/voltha/adapters/openolt/openolt_grpc.py
+++ b/voltha/adapters/openolt/openolt_grpc.py
@@ -19,8 +19,10 @@
import grpc
import threading
+from voltha.registry import registry
from voltha.adapters.openolt.openolt_kafka_producer import kafka_send_pb
from voltha.adapters.openolt.protos import openolt_pb2_grpc, openolt_pb2
+from voltha.adapters.openolt.openolt_kafka_proxy import OpenoltKafkaProxy
log = structlog.get_logger()
@@ -79,4 +81,9 @@
broker = sys.argv[1]
host = sys.argv[2]
- process_indications(broker, host)
+ kafka_proxy = registry.register(
+ 'openolt_kafka_proxy',
+ OpenoltKafkaProxy(broker)
+ ).start()
+
+ process_indications(host)
diff --git a/voltha/adapters/openolt/openolt_kafka_producer.py b/voltha/adapters/openolt/openolt_kafka_producer.py
index 58b3613..567933b 100644
--- a/voltha/adapters/openolt/openolt_kafka_producer.py
+++ b/voltha/adapters/openolt/openolt_kafka_producer.py
@@ -25,7 +25,7 @@
def kafka_send_pb(topic, msg):
try:
log.debug('send protobuf to kafka', topic=topic, msg=msg)
- kafka_proxy = registry('kafka_proxy')
+ kafka_proxy = registry('openolt_kafka_proxy')
if kafka_proxy and not kafka_proxy.is_faulty():
log.debug('kafka-proxy-available')
kafka_proxy.send_message(
diff --git a/voltha/main.py b/voltha/main.py
index 253b643..243bffa 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -42,6 +42,7 @@
from voltha.northbound.diagnostics import Diagnostics
from voltha.northbound.grpc.grpc_server import VolthaGrpcServer
from voltha.northbound.kafka.kafka_proxy import KafkaProxy
+from voltha.adapters.openolt.openolt_kafka_proxy import OpenoltKafkaProxy
from voltha.northbound.rest.health_check import init_rest_service
from voltha.protos.common_pb2 import LogLevel
from voltha.registry import registry, IComponent
@@ -438,6 +439,11 @@
).start()
yield registry.register(
+ 'openolt_kafka_proxy',
+ OpenoltKafkaProxy(self.args.kafka)
+ ).start()
+
+ yield registry.register(
'frameio',
FrameIOManager()
).start()