Suffix kafka topics with logical dev id or host_and_port

Change-Id: Iaaaa7989ae32bff87723953297bd97dc4b6f4cd6
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index a316e82..0672037 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -107,14 +107,13 @@
         self.device_info = None
 
         self._kadmin = KAdmin()
-        self._kadmin.delete_topics(['openolt.ind', 'voltha.pktout'])
+        self._kadmin.delete_topics([
+            'openolt.ind-{}'.format(self.host_and_port.split(':')[0])])
         self._grpc = None
         self.go_state_init()
 
     def do_state_init(self, event):
         self.log.debug('init')
-        self._packet = OpenoltPacket(self)
-        self._packet.start()
         self._indications = OpenoltIndications(self)
         self._indications.start()
 
@@ -136,6 +135,12 @@
 
         self.data_model.olt_create(self.device_info)
 
+        self._kadmin.delete_topics([
+            'voltha.pktout-{}'.format(self.data_model.logical_device_id)])
+
+        self._packet = OpenoltPacket(self)
+        self._packet.start()
+
         self.resource_mgr = self.resource_mgr_class(self.device_id,
                                                     self.host_and_port,
                                                     self.extra_args,
diff --git a/voltha/adapters/openolt/openolt_grpc.py b/voltha/adapters/openolt/openolt_grpc.py
index a7cfd07..726270b 100644
--- a/voltha/adapters/openolt/openolt_grpc.py
+++ b/voltha/adapters/openolt/openolt_grpc.py
@@ -18,7 +18,6 @@
 import grpc
 import threading
 import time
-from simplejson import dumps
 from twisted.internet import reactor
 from voltha.northbound.kafka.kafka_proxy import kafka_send_pb
 from voltha.adapters.openolt.protos import openolt_pb2_grpc, openolt_pb2
@@ -109,4 +108,6 @@
                                       indications=ind)
                         continue
 
-                kafka_send_pb("openolt.ind", ind)
+                topic = 'openolt.ind-{}'.format(
+                    self.device.host_and_port.split(':')[0])
+                kafka_send_pb(topic, ind)
diff --git a/voltha/adapters/openolt/openolt_indications.py b/voltha/adapters/openolt/openolt_indications.py
index 54415ba..4873984 100644
--- a/voltha/adapters/openolt/openolt_indications.py
+++ b/voltha/adapters/openolt/openolt_indications.py
@@ -42,7 +42,10 @@
 
     def indications_thread(self):
         self.log.debug('openolt indications thread starting')
-        KConsumer(self.indications_process, "openolt.ind")
+
+        KConsumer(self.indications_process,
+                  'openolt.ind-{}'.format(
+                      self.device.host_and_port.split(':')[0]))
 
     def indications_process(self, topic, msg):
 
diff --git a/voltha/adapters/openolt/openolt_packet.py b/voltha/adapters/openolt/openolt_packet.py
index cfd838c..a3ac84b 100644
--- a/voltha/adapters/openolt/openolt_packet.py
+++ b/voltha/adapters/openolt/openolt_packet.py
@@ -45,7 +45,9 @@
 
     def packet_thread(self):
         self.log.debug('openolt packet-out thread starting')
-        KConsumer(self.packet_process, 'voltha.pktout')
+        KConsumer(self.packet_process,
+                  'voltha.pktout-{}'.format(
+                      self.device.data_model.logical_device_id))
 
     def packet_process(self, topic, msg):