diff --git a/python/adapters/iadapter.py b/python/adapters/iadapter.py
index 04cb303..78cf42a 100644
--- a/python/adapters/iadapter.py
+++ b/python/adapters/iadapter.py
@@ -30,6 +30,7 @@
 from python.protos.device_pb2 import DeviceType, DeviceTypes
 from python.protos.health_pb2 import HealthStatus
 
+
 log = structlog.get_logger()
 
 
diff --git a/python/adapters/kafka/adapter_request_facade.py b/python/adapters/kafka/adapter_request_facade.py
index cbae56d..978f57d 100644
--- a/python/adapters/kafka/adapter_request_facade.py
+++ b/python/adapters/kafka/adapter_request_facade.py
@@ -21,12 +21,15 @@
 
 from twisted.internet.defer import inlineCallbacks
 from zope.interface import implementer
+from twisted.internet import reactor
 
 from python.adapters.interface import IAdapterInterface
 from python.protos.core_adapter_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
 from python.protos.device_pb2 import Device
 from python.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
     FlowGroupChanges, ofp_packet_out
+from python.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
+    get_messaging_proxy
 
 
 class MacAddressError(BaseException):
@@ -62,11 +65,26 @@
     def stop(self):
         self.log.debug('stopping')
 
+
+    def createKafkaDeviceTopic(self, deviceId):
+        kafka_proxy = get_messaging_proxy()
+        device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
+        kafka_proxy.subscribe(topic=device_topic, target_cls=self)
+
     def adopt_device(self, device):
         d = Device()
         if device:
             device.Unpack(d)
-            return True, self.adapter.adopt_device(d)
+
+            result = self.adapter.adopt_device(d)
+            # return True, self.adapter.adopt_device(d)
+
+            # Before we return, create a device specific topic to handle all
+            # subsequent requests from the Core. This adapter instance will
+            # handle all requests for that device
+            reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
+
+            return True, result
         else:
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
@@ -151,7 +169,16 @@
         d = Device()
         if device:
             device.Unpack(d)
-            return (True, self.adapter.delete_device(d))
+            result = self.adapter.delete_device(d)
+            # return (True, self.adapter.delete_device(d))
+
+            # Before we return, delete the device specific topic as we will no
+            # longer receive requests from the Core for that device
+            kafka_proxy = get_messaging_proxy()
+            device_topic = kafka_proxy.get_default_topic() + "/" + d.id
+            kafka_proxy.unsubscribe(topic=device_topic)
+
+            return (True, result)
         else:
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
diff --git a/python/adapters/kafka/container_proxy.py b/python/adapters/kafka/container_proxy.py
index 8c4e828..efcf558 100644
--- a/python/adapters/kafka/container_proxy.py
+++ b/python/adapters/kafka/container_proxy.py
@@ -83,16 +83,21 @@
         return real_wrapper
 
     @inlineCallbacks
-    def invoke(self, rpc, to_topic=None, **kwargs):
+    def invoke(self, rpc, to_topic=None, reply_topic=None, **kwargs):
         @inlineCallbacks
-        def _send_request(rpc, m_callback, to_topic, **kwargs):
+        def _send_request(rpc, m_callback, to_topic, reply_topic, **kwargs):
             try:
-                log.debug("sending-request", rpc=rpc)
+                log.debug("sending-request",
+                          rpc=rpc,
+                          to_topic=to_topic,
+                          reply_topic=reply_topic)
                 if to_topic is None:
                     to_topic = self.core_topic
+                if reply_topic is None:
+                    reply_topic = self.listening_topic
                 result = yield self.kafka_proxy.send_request(rpc=rpc,
                                                              to_topic=to_topic,
-                                                             reply_topic=self.listening_topic,
+                                                             reply_topic=reply_topic,
                                                              callback=None,
                                                              **kwargs)
                 if not m_callback.called:
@@ -104,11 +109,25 @@
                 if not m_callback.called:
                     m_callback.errback(failure.Failure())
 
-        cb = DeferredWithTimeout(timeout=self.default_timeout)
-        _send_request(rpc, cb, to_topic, **kwargs)
-        try:
-            res = yield cb
-            returnValue(res)
-        except TimeOutError as e:
-            log.warn('invoke-timeout', e=e)
-            raise e
+        # We are going to resend the request on the to_topic if there is a
+        # timeout error. This time the timeout will be longer.  If the second
+        # request times out then we will send the request to the default
+        # core_topic.
+        timeouts = [self.default_timeout,
+                    self.default_timeout*2,
+                    self.default_timeout]
+        retry = 0
+        max_retry = 2
+        for timeout in timeouts:
+            cb = DeferredWithTimeout(timeout=timeout)
+            _send_request(rpc, cb, to_topic, reply_topic, **kwargs)
+            try:
+                res = yield cb
+                returnValue(res)
+            except TimeOutError as e:
+                log.warn('invoke-timeout', e=e)
+                if retry == max_retry:
+                    raise e
+                retry += 1
+                if retry == max_retry:
+                    to_topic = self.core_topic
diff --git a/python/adapters/kafka/core_proxy.py b/python/adapters/kafka/core_proxy.py
index 4bab30d..47f6a61 100644
--- a/python/adapters/kafka/core_proxy.py
+++ b/python/adapters/kafka/core_proxy.py
@@ -30,6 +30,9 @@
 log = structlog.get_logger()
 
 
+def createSubTopic(*args):
+    return '_'.join(args)
+
 class CoreProxy(ContainerProxy):
 
     def __init__(self, kafka_proxy, core_topic, my_listening_topic):
@@ -56,7 +59,15 @@
         log.debug("get-device")
         id = ID()
         id.id = device_id
-        res = yield self.invoke(rpc="GetDevice", device_id=id)
+        # Once we have a device being managed, all communications between the
+        # the adapter and the core occurs over a topic associated with that
+        # device
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
+        res = yield self.invoke(rpc="GetDevice",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
+                                device_id=id)
         returnValue(res)
 
     @ContainerProxy.wrap_request(Device)
@@ -71,7 +82,11 @@
         id.id = device_id
         p_type = IntType()
         p_type.val = port_type
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="GetPorts",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=id,
                                 port_type=p_type)
         returnValue(res)
@@ -117,9 +132,12 @@
         cdt.val = child_device_type
         channel = IntType()
         channel.val = channel_id
-
+        to_topic = createSubTopic(self.core_topic, parent_device_id)
+        reply_topic = createSubTopic(self.listening_topic, parent_device_id)
         args = self._to_proto(**kw)
         res = yield self.invoke(rpc="ChildDeviceDetected",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 parent_device_id=id,
                                 parent_port_no=ppn,
                                 child_device_type=cdt,
@@ -131,7 +149,12 @@
     @inlineCallbacks
     def device_update(self, device):
         log.debug("device_update")
-        res = yield self.invoke(rpc="DeviceUpdate", device=device)
+        to_topic = createSubTopic(self.core_topic, device.id)
+        reply_topic = createSubTopic(self.listening_topic, device.id)
+        res = yield self.invoke(rpc="DeviceUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
+                                device=device)
         returnValue(res)
 
     def child_device_removed(parent_device_id, child_device_id):
@@ -155,7 +178,11 @@
         else:
             c_status.val = -1
 
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="DeviceStateUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=id,
                                 oper_status=o_status,
                                 connect_status=c_status)
@@ -179,7 +206,11 @@
         else:
             c_status.val = -1
 
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="ChildrenStateUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=id,
                                 oper_status=o_status,
                                 connect_status=c_status)
@@ -201,7 +232,11 @@
         o_status = IntType()
         o_status.val = oper_status
 
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="PortStateUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=id,
                                 port_type=pt,
                                 port_no=pNo,
@@ -227,7 +262,11 @@
         else:
             c_status.val = -1
 
+        to_topic = createSubTopic(self.core_topic, parent_device_id)
+        reply_topic = createSubTopic(self.listening_topic, parent_device_id)
         res = yield self.invoke(rpc="child_devices_state_update",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 parent_device_id=id,
                                 oper_status=o_status,
                                 connect_status=c_status)
@@ -242,7 +281,11 @@
         log.debug("device_pm_config_update")
         b = BoolType()
         b.val = init
+        to_topic = createSubTopic(self.core_topic, device_pm_config.id)
+        reply_topic = createSubTopic(self.listening_topic, device_pm_config.id)
         res = yield self.invoke(rpc="DevicePMConfigUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_pm_config=device_pm_config,
                                 init=b)
         returnValue(res)
@@ -253,7 +296,11 @@
         log.debug("port_created")
         proto_id = ID()
         proto_id.id = device_id
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="PortCreated",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=proto_id,
                                 port=port)
         returnValue(res)
@@ -284,7 +331,11 @@
         p.val = port
         pac = Packet()
         pac.payload = packet
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="PacketIn",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=proto_id,
                                 port=p,
                                 packet=pac)
diff --git a/python/adapters/kafka/kafka_inter_container_library.py b/python/adapters/kafka/kafka_inter_container_library.py
index 1d2b05c..aaa0c3c 100644
--- a/python/adapters/kafka/kafka_inter_container_library.py
+++ b/python/adapters/kafka/kafka_inter_container_library.py
@@ -152,6 +152,12 @@
             log.info('reconnected-to-consul', after_retries=self.retries)
             self.retries = 0
 
+    def get_target_cls(self):
+        return self.target_cls
+
+    def get_default_topic(self):
+        return self.default_topic
+
     @inlineCallbacks
     def _subscribe(self, topic, callback=None, target_cls=None):
         try:
@@ -332,12 +338,13 @@
             request_body = InterContainerRequestBody()
             request.header.id = transaction_id
             request.header.type = MessageType.Value("REQUEST")
-            request.header.from_topic = self.default_topic
+            request.header.from_topic = reply_topic
             request.header.to_topic = to_topic
 
             response_required = False
             if reply_topic:
                 request_body.reply_to_topic = reply_topic
+                request_body.response_required = True
                 response_required = True
 
             request.header.timestamp = int(round(time.time() * 1000))
@@ -350,8 +357,6 @@
                     request_body.args.extend([arg])
                 except Exception as e:
                     log.exception("Failed-parsing-value", e=e)
-            request_body.reply_to_topic = self.default_topic
-            request_body.response_required = response_required
             request.body.Pack(request_body)
             return request, transaction_id, response_required
         except Exception as e:
@@ -479,7 +484,7 @@
                                 response.header.to_topic)
                             self._send_kafka_message(res_topic, response)
 
-                        log.debug("Response-sent", response=response.body)
+                        log.debug("Response-sent", response=response.body, to_topic=res_topic)
             elif message.header.type == MessageType.Value("RESPONSE"):
                 trns_id = self._to_string(message.header.id)
                 if trns_id in self.transaction_id_deferred_map:
