Suffix kafka topics with logical dev id or host_and_port
Change-Id: Iaaaa7989ae32bff87723953297bd97dc4b6f4cd6
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index a316e82..0672037 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -107,14 +107,13 @@
self.device_info = None
self._kadmin = KAdmin()
- self._kadmin.delete_topics(['openolt.ind', 'voltha.pktout'])
+ self._kadmin.delete_topics([
+ 'openolt.ind-{}'.format(self.host_and_port.split(':')[0])])
self._grpc = None
self.go_state_init()
def do_state_init(self, event):
self.log.debug('init')
- self._packet = OpenoltPacket(self)
- self._packet.start()
self._indications = OpenoltIndications(self)
self._indications.start()
@@ -136,6 +135,12 @@
self.data_model.olt_create(self.device_info)
+ self._kadmin.delete_topics([
+ 'voltha.pktout-{}'.format(self.data_model.logical_device_id)])
+
+ self._packet = OpenoltPacket(self)
+ self._packet.start()
+
self.resource_mgr = self.resource_mgr_class(self.device_id,
self.host_and_port,
self.extra_args,
diff --git a/voltha/adapters/openolt/openolt_grpc.py b/voltha/adapters/openolt/openolt_grpc.py
index a7cfd07..726270b 100644
--- a/voltha/adapters/openolt/openolt_grpc.py
+++ b/voltha/adapters/openolt/openolt_grpc.py
@@ -18,7 +18,6 @@
import grpc
import threading
import time
-from simplejson import dumps
from twisted.internet import reactor
from voltha.northbound.kafka.kafka_proxy import kafka_send_pb
from voltha.adapters.openolt.protos import openolt_pb2_grpc, openolt_pb2
@@ -109,4 +108,6 @@
indications=ind)
continue
- kafka_send_pb("openolt.ind", ind)
+ topic = 'openolt.ind-{}'.format(
+ self.device.host_and_port.split(':')[0])
+ kafka_send_pb(topic, ind)
diff --git a/voltha/adapters/openolt/openolt_indications.py b/voltha/adapters/openolt/openolt_indications.py
index 54415ba..4873984 100644
--- a/voltha/adapters/openolt/openolt_indications.py
+++ b/voltha/adapters/openolt/openolt_indications.py
@@ -42,7 +42,10 @@
def indications_thread(self):
self.log.debug('openolt indications thread starting')
- KConsumer(self.indications_process, "openolt.ind")
+
+ KConsumer(self.indications_process,
+ 'openolt.ind-{}'.format(
+ self.device.host_and_port.split(':')[0]))
def indications_process(self, topic, msg):
diff --git a/voltha/adapters/openolt/openolt_packet.py b/voltha/adapters/openolt/openolt_packet.py
index cfd838c..a3ac84b 100644
--- a/voltha/adapters/openolt/openolt_packet.py
+++ b/voltha/adapters/openolt/openolt_packet.py
@@ -45,7 +45,9 @@
def packet_thread(self):
self.log.debug('openolt packet-out thread starting')
- KConsumer(self.packet_process, 'voltha.pktout')
+ KConsumer(self.packet_process,
+ 'voltha.pktout-{}'.format(
+ self.device.data_model.logical_device_id))
def packet_process(self, topic, msg):
diff --git a/voltha/core/local_handler.py b/voltha/core/local_handler.py
index 6daef6e..8f9999c 100644
--- a/voltha/core/local_handler.py
+++ b/voltha/core/local_handler.py
@@ -1123,9 +1123,7 @@
if adapter_name == 'openolt':
log.debug('fast path pkt-out to kafka')
- # topic = 'openolt.pktout:{}'.format(req.id)
- topic = 'voltha.pktout'
- kafka_send_pb(topic, req)
+ kafka_send_pb('voltha.pktout-{}'.format(req.id), req)
else:
forward_packet_out(packet_out=req)