This update addresses the following:
1.  Decouple the kafka messaging proxy from the kafka client.  This
will allow us to try out different kafka clients as well as test
the client separately.
2. Create unique device topics for the core, olt adapter and onu
adapters.  This will ensure only cores and adapters handling these
devices will listen to the device messages.
3. Update the core with the latest device model APIs and changes.
While most of the model issues have been fixed, there is still an
issue with updating a child branch.   This will be dealt with in a
separate update.

Change-Id: I622ef5c636d7466bb3adefaa4ac4c85d7c450bea
diff --git a/python/adapters/kafka/core_proxy.py b/python/adapters/kafka/core_proxy.py
index 4bab30d..47f6a61 100644
--- a/python/adapters/kafka/core_proxy.py
+++ b/python/adapters/kafka/core_proxy.py
@@ -30,6 +30,9 @@
 log = structlog.get_logger()
 
 
+def createSubTopic(*args):
+    return '_'.join(args)
+
 class CoreProxy(ContainerProxy):
 
     def __init__(self, kafka_proxy, core_topic, my_listening_topic):
@@ -56,7 +59,15 @@
         log.debug("get-device")
         id = ID()
         id.id = device_id
-        res = yield self.invoke(rpc="GetDevice", device_id=id)
+        # Once we have a device being managed, all communication between the
+        # adapter and the core occurs over a topic associated with that
+        # device
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
+        res = yield self.invoke(rpc="GetDevice",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
+                                device_id=id)
         returnValue(res)
 
     @ContainerProxy.wrap_request(Device)
@@ -71,7 +82,11 @@
         id.id = device_id
         p_type = IntType()
         p_type.val = port_type
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="GetPorts",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=id,
                                 port_type=p_type)
         returnValue(res)
@@ -117,9 +132,12 @@
         cdt.val = child_device_type
         channel = IntType()
         channel.val = channel_id
-
+        to_topic = createSubTopic(self.core_topic, parent_device_id)
+        reply_topic = createSubTopic(self.listening_topic, parent_device_id)
         args = self._to_proto(**kw)
         res = yield self.invoke(rpc="ChildDeviceDetected",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 parent_device_id=id,
                                 parent_port_no=ppn,
                                 child_device_type=cdt,
@@ -131,7 +149,12 @@
     @inlineCallbacks
     def device_update(self, device):
         log.debug("device_update")
-        res = yield self.invoke(rpc="DeviceUpdate", device=device)
+        to_topic = createSubTopic(self.core_topic, device.id)
+        reply_topic = createSubTopic(self.listening_topic, device.id)
+        res = yield self.invoke(rpc="DeviceUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
+                                device=device)
         returnValue(res)
 
     def child_device_removed(parent_device_id, child_device_id):
@@ -155,7 +178,11 @@
         else:
             c_status.val = -1
 
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="DeviceStateUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=id,
                                 oper_status=o_status,
                                 connect_status=c_status)
@@ -179,7 +206,11 @@
         else:
             c_status.val = -1
 
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="ChildrenStateUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=id,
                                 oper_status=o_status,
                                 connect_status=c_status)
@@ -201,7 +232,11 @@
         o_status = IntType()
         o_status.val = oper_status
 
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="PortStateUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=id,
                                 port_type=pt,
                                 port_no=pNo,
@@ -227,7 +262,11 @@
         else:
             c_status.val = -1
 
+        to_topic = createSubTopic(self.core_topic, parent_device_id)
+        reply_topic = createSubTopic(self.listening_topic, parent_device_id)
         res = yield self.invoke(rpc="child_devices_state_update",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 parent_device_id=id,
                                 oper_status=o_status,
                                 connect_status=c_status)
@@ -242,7 +281,11 @@
         log.debug("device_pm_config_update")
         b = BoolType()
         b.val = init
+        to_topic = createSubTopic(self.core_topic, device_pm_config.id)
+        reply_topic = createSubTopic(self.listening_topic, device_pm_config.id)
         res = yield self.invoke(rpc="DevicePMConfigUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_pm_config=device_pm_config,
                                 init=b)
         returnValue(res)
@@ -253,7 +296,11 @@
         log.debug("port_created")
         proto_id = ID()
         proto_id.id = device_id
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="PortCreated",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=proto_id,
                                 port=port)
         returnValue(res)
@@ -284,7 +331,11 @@
         p.val = port
         pac = Packet()
         pac.payload = packet
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="PacketIn",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=proto_id,
                                 port=p,
                                 packet=pac)