Move pkt-in/out kafka topic deletion into openolt_packet
Change-Id: I97375e4f05dec7f6576e72861aded1cb8bc703f7
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index eec0f2c..1931c45 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -143,12 +143,6 @@
self.data_model.olt_create(self.device_info)
- # FIXME
- self._kadmin.delete_topics([
- 'voltha.pktout-{}'.format(self.data_model.logical_device_id)])
- self._kadmin.delete_topics(['openolt.pktin-{}'.format(
- self.host_and_port.split(':')[0])])
-
self._packet = OpenoltPacket(self)
self._packet.start()
diff --git a/voltha/adapters/openolt/openolt_packet.py b/voltha/adapters/openolt/openolt_packet.py
index 318456e..7c3c5f1 100644
--- a/voltha/adapters/openolt/openolt_packet.py
+++ b/voltha/adapters/openolt/openolt_packet.py
@@ -28,6 +28,7 @@
from voltha.core.flow_decomposer import OUTPUT
from voltha.protos.device_pb2 import Port
from voltha.adapters.openolt.protos import openolt_pb2
+from voltha.adapters.openolt.openolt_kafka_admin import KAdmin
class OpenoltPacket(object):
@@ -35,6 +36,8 @@
self.log = structlog.get_logger()
self.device = device
+ self._kadmin = KAdmin()
+
self.packet_out_thread_handle = threading.Thread(
target=self.packet_out_thread)
self.packet_out_thread_handle.setDaemon(True)
@@ -43,12 +46,24 @@
target=self.packet_in_thread)
self.packet_in_thread_handle.setDaemon(True)
+ self._kadmin.delete_topics([
+ 'voltha.pktout-{}'.format(
+ self.device.data_model.logical_device_id)])
+ self._kadmin.delete_topics(['openolt.pktin-{}'.format(
+ self.device.host_and_port.split(':')[0])])
+
def start(self):
self.packet_out_thread_handle.start()
self.packet_in_thread_handle.start()
def stop(self):
- pass
+ self._kadmin.delete_topics([
+ 'voltha.pktout-{}'.format(
+ self.device.data_model.logical_device_id)])
+ self._kadmin.delete_topics(['openolt.pktin-{}'.format(
+ self.device.host_and_port.split(':')[0])])
+
+ # FIXME - kill threads
def packet_out_thread(self):
self.log.debug('openolt packet-out thread starting')