This update addresses the following:
1.  Decouple the kafka messaging proxy from the kafka client.  This
will allow us to try out different kafka clients as well as test
the client separately.
2. Create unique device topics for the core, olt adapter and onu
adapters.  This will ensure only cores and adapters handling these
devices will listens to the device messages.
3. Update the core with the latest device model APIs and changes.
While most of the model issues have been fixed, there is still an
issue with updating a child branch.   This will be dealt in a separate
update.

Change-Id: I622ef5c636d7466bb3adefaa4ac4c85d7c450bea
diff --git a/python/adapters/iadapter.py b/python/adapters/iadapter.py
index 04cb303..78cf42a 100644
--- a/python/adapters/iadapter.py
+++ b/python/adapters/iadapter.py
@@ -30,6 +30,7 @@
 from python.protos.device_pb2 import DeviceType, DeviceTypes
 from python.protos.health_pb2 import HealthStatus
 
+
 log = structlog.get_logger()
 
 
diff --git a/python/adapters/kafka/adapter_request_facade.py b/python/adapters/kafka/adapter_request_facade.py
index cbae56d..978f57d 100644
--- a/python/adapters/kafka/adapter_request_facade.py
+++ b/python/adapters/kafka/adapter_request_facade.py
@@ -21,12 +21,15 @@
 
 from twisted.internet.defer import inlineCallbacks
 from zope.interface import implementer
+from twisted.internet import reactor
 
 from python.adapters.interface import IAdapterInterface
 from python.protos.core_adapter_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
 from python.protos.device_pb2 import Device
 from python.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
     FlowGroupChanges, ofp_packet_out
+from python.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
+    get_messaging_proxy
 
 
 class MacAddressError(BaseException):
@@ -62,11 +65,26 @@
     def stop(self):
         self.log.debug('stopping')
 
+
+    def createKafkaDeviceTopic(self, deviceId):
+        kafka_proxy = get_messaging_proxy()
+        device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
+        kafka_proxy.subscribe(topic=device_topic, target_cls=self)
+
     def adopt_device(self, device):
         d = Device()
         if device:
             device.Unpack(d)
-            return True, self.adapter.adopt_device(d)
+
+            result = self.adapter.adopt_device(d)
+            # return True, self.adapter.adopt_device(d)
+
+            # Before we return, create a device specific topic to handle all
+            # subsequent requests from the Core. This adapter instance will
+            # handle all requests for that device
+            reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
+
+            return True, result
         else:
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
@@ -151,7 +169,16 @@
         d = Device()
         if device:
             device.Unpack(d)
-            return (True, self.adapter.delete_device(d))
+            result = self.adapter.delete_device(d)
+            # return (True, self.adapter.delete_device(d))
+
+            # Before we return, delete the device specific topic as we will no
+            # longer receive requests from the Core for that device
+            kafka_proxy = get_messaging_proxy()
+            device_topic = kafka_proxy.get_default_topic() + "/" + d.id
+            kafka_proxy.unsubscribe(topic=device_topic)
+
+            return (True, result)
         else:
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
diff --git a/python/adapters/kafka/container_proxy.py b/python/adapters/kafka/container_proxy.py
index 8c4e828..efcf558 100644
--- a/python/adapters/kafka/container_proxy.py
+++ b/python/adapters/kafka/container_proxy.py
@@ -83,16 +83,21 @@
         return real_wrapper
 
     @inlineCallbacks
-    def invoke(self, rpc, to_topic=None, **kwargs):
+    def invoke(self, rpc, to_topic=None, reply_topic=None, **kwargs):
         @inlineCallbacks
-        def _send_request(rpc, m_callback, to_topic, **kwargs):
+        def _send_request(rpc, m_callback, to_topic, reply_topic, **kwargs):
             try:
-                log.debug("sending-request", rpc=rpc)
+                log.debug("sending-request",
+                          rpc=rpc,
+                          to_topic=to_topic,
+                          reply_topic=reply_topic)
                 if to_topic is None:
                     to_topic = self.core_topic
+                if reply_topic is None:
+                    reply_topic = self.listening_topic
                 result = yield self.kafka_proxy.send_request(rpc=rpc,
                                                              to_topic=to_topic,
-                                                             reply_topic=self.listening_topic,
+                                                             reply_topic=reply_topic,
                                                              callback=None,
                                                              **kwargs)
                 if not m_callback.called:
@@ -104,11 +109,25 @@
                 if not m_callback.called:
                     m_callback.errback(failure.Failure())
 
-        cb = DeferredWithTimeout(timeout=self.default_timeout)
-        _send_request(rpc, cb, to_topic, **kwargs)
-        try:
-            res = yield cb
-            returnValue(res)
-        except TimeOutError as e:
-            log.warn('invoke-timeout', e=e)
-            raise e
+        # We are going to resend the request on the to_topic if there is a
+        # timeout error. This time the timeout will be longer.  If the second
+        # request times out then we will send the request to the default
+        # core_topic.
+        timeouts = [self.default_timeout,
+                    self.default_timeout*2,
+                    self.default_timeout]
+        retry = 0
+        max_retry = 2
+        for timeout in timeouts:
+            cb = DeferredWithTimeout(timeout=timeout)
+            _send_request(rpc, cb, to_topic, reply_topic, **kwargs)
+            try:
+                res = yield cb
+                returnValue(res)
+            except TimeOutError as e:
+                log.warn('invoke-timeout', e=e)
+                if retry == max_retry:
+                    raise e
+                retry += 1
+                if retry == max_retry:
+                    to_topic = self.core_topic
diff --git a/python/adapters/kafka/core_proxy.py b/python/adapters/kafka/core_proxy.py
index 4bab30d..47f6a61 100644
--- a/python/adapters/kafka/core_proxy.py
+++ b/python/adapters/kafka/core_proxy.py
@@ -30,6 +30,9 @@
 log = structlog.get_logger()
 
 
+def createSubTopic(*args):
+    return '_'.join(args)
+
 class CoreProxy(ContainerProxy):
 
     def __init__(self, kafka_proxy, core_topic, my_listening_topic):
@@ -56,7 +59,15 @@
         log.debug("get-device")
         id = ID()
         id.id = device_id
-        res = yield self.invoke(rpc="GetDevice", device_id=id)
+        # Once we have a device being managed, all communications between the
+        # the adapter and the core occurs over a topic associated with that
+        # device
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
+        res = yield self.invoke(rpc="GetDevice",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
+                                device_id=id)
         returnValue(res)
 
     @ContainerProxy.wrap_request(Device)
@@ -71,7 +82,11 @@
         id.id = device_id
         p_type = IntType()
         p_type.val = port_type
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="GetPorts",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=id,
                                 port_type=p_type)
         returnValue(res)
@@ -117,9 +132,12 @@
         cdt.val = child_device_type
         channel = IntType()
         channel.val = channel_id
-
+        to_topic = createSubTopic(self.core_topic, parent_device_id)
+        reply_topic = createSubTopic(self.listening_topic, parent_device_id)
         args = self._to_proto(**kw)
         res = yield self.invoke(rpc="ChildDeviceDetected",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 parent_device_id=id,
                                 parent_port_no=ppn,
                                 child_device_type=cdt,
@@ -131,7 +149,12 @@
     @inlineCallbacks
     def device_update(self, device):
         log.debug("device_update")
-        res = yield self.invoke(rpc="DeviceUpdate", device=device)
+        to_topic = createSubTopic(self.core_topic, device.id)
+        reply_topic = createSubTopic(self.listening_topic, device.id)
+        res = yield self.invoke(rpc="DeviceUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
+                                device=device)
         returnValue(res)
 
     def child_device_removed(parent_device_id, child_device_id):
@@ -155,7 +178,11 @@
         else:
             c_status.val = -1
 
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="DeviceStateUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=id,
                                 oper_status=o_status,
                                 connect_status=c_status)
@@ -179,7 +206,11 @@
         else:
             c_status.val = -1
 
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="ChildrenStateUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=id,
                                 oper_status=o_status,
                                 connect_status=c_status)
@@ -201,7 +232,11 @@
         o_status = IntType()
         o_status.val = oper_status
 
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="PortStateUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=id,
                                 port_type=pt,
                                 port_no=pNo,
@@ -227,7 +262,11 @@
         else:
             c_status.val = -1
 
+        to_topic = createSubTopic(self.core_topic, parent_device_id)
+        reply_topic = createSubTopic(self.listening_topic, parent_device_id)
         res = yield self.invoke(rpc="child_devices_state_update",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 parent_device_id=id,
                                 oper_status=o_status,
                                 connect_status=c_status)
@@ -242,7 +281,11 @@
         log.debug("device_pm_config_update")
         b = BoolType()
         b.val = init
+        to_topic = createSubTopic(self.core_topic, device_pm_config.id)
+        reply_topic = createSubTopic(self.listening_topic, device_pm_config.id)
         res = yield self.invoke(rpc="DevicePMConfigUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_pm_config=device_pm_config,
                                 init=b)
         returnValue(res)
@@ -253,7 +296,11 @@
         log.debug("port_created")
         proto_id = ID()
         proto_id.id = device_id
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="PortCreated",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=proto_id,
                                 port=port)
         returnValue(res)
@@ -284,7 +331,11 @@
         p.val = port
         pac = Packet()
         pac.payload = packet
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="PacketIn",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=proto_id,
                                 port=p,
                                 packet=pac)
diff --git a/python/adapters/kafka/kafka_inter_container_library.py b/python/adapters/kafka/kafka_inter_container_library.py
index 1d2b05c..aaa0c3c 100644
--- a/python/adapters/kafka/kafka_inter_container_library.py
+++ b/python/adapters/kafka/kafka_inter_container_library.py
@@ -152,6 +152,12 @@
             log.info('reconnected-to-consul', after_retries=self.retries)
             self.retries = 0
 
+    def get_target_cls(self):
+        return self.target_cls
+
+    def get_default_topic(self):
+        return self.default_topic
+
     @inlineCallbacks
     def _subscribe(self, topic, callback=None, target_cls=None):
         try:
@@ -332,12 +338,13 @@
             request_body = InterContainerRequestBody()
             request.header.id = transaction_id
             request.header.type = MessageType.Value("REQUEST")
-            request.header.from_topic = self.default_topic
+            request.header.from_topic = reply_topic
             request.header.to_topic = to_topic
 
             response_required = False
             if reply_topic:
                 request_body.reply_to_topic = reply_topic
+                request_body.response_required = True
                 response_required = True
 
             request.header.timestamp = int(round(time.time() * 1000))
@@ -350,8 +357,6 @@
                     request_body.args.extend([arg])
                 except Exception as e:
                     log.exception("Failed-parsing-value", e=e)
-            request_body.reply_to_topic = self.default_topic
-            request_body.response_required = response_required
             request.body.Pack(request_body)
             return request, transaction_id, response_required
         except Exception as e:
@@ -479,7 +484,7 @@
                                 response.header.to_topic)
                             self._send_kafka_message(res_topic, response)
 
-                        log.debug("Response-sent", response=response.body)
+                        log.debug("Response-sent", response=response.body, to_topic=res_topic)
             elif message.header.type == MessageType.Value("RESPONSE"):
                 trns_id = self._to_string(message.header.id)
                 if trns_id in self.transaction_id_deferred_map: