This update addresses the following:
1.  Decouple the kafka messaging proxy from the kafka client.  This
will allow us to try out different kafka clients as well as test
the client separately.
2. Create unique device topics for the core, olt adapter and onu
adapters.  This will ensure only cores and adapters handling these
devices will listen to the device messages.
3. Update the core with the latest device model APIs and changes.
While most of the model issues have been fixed, there is still an
issue with updating a child branch.  This will be dealt with in a
separate update.

Change-Id: I622ef5c636d7466bb3adefaa4ac4c85d7c450bea
diff --git a/python/adapters/kafka/container_proxy.py b/python/adapters/kafka/container_proxy.py
index 8c4e828..efcf558 100644
--- a/python/adapters/kafka/container_proxy.py
+++ b/python/adapters/kafka/container_proxy.py
@@ -83,16 +83,21 @@
         return real_wrapper
 
     @inlineCallbacks
-    def invoke(self, rpc, to_topic=None, **kwargs):
+    def invoke(self, rpc, to_topic=None, reply_topic=None, **kwargs):
         @inlineCallbacks
-        def _send_request(rpc, m_callback, to_topic, **kwargs):
+        def _send_request(rpc, m_callback, to_topic, reply_topic, **kwargs):
             try:
-                log.debug("sending-request", rpc=rpc)
+                log.debug("sending-request",
+                          rpc=rpc,
+                          to_topic=to_topic,
+                          reply_topic=reply_topic)
                 if to_topic is None:
                     to_topic = self.core_topic
+                if reply_topic is None:
+                    reply_topic = self.listening_topic
                 result = yield self.kafka_proxy.send_request(rpc=rpc,
                                                              to_topic=to_topic,
-                                                             reply_topic=self.listening_topic,
+                                                             reply_topic=reply_topic,
                                                              callback=None,
                                                              **kwargs)
                 if not m_callback.called:
@@ -104,11 +109,25 @@
                 if not m_callback.called:
                     m_callback.errback(failure.Failure())
 
-        cb = DeferredWithTimeout(timeout=self.default_timeout)
-        _send_request(rpc, cb, to_topic, **kwargs)
-        try:
-            res = yield cb
-            returnValue(res)
-        except TimeOutError as e:
-            log.warn('invoke-timeout', e=e)
-            raise e
+        # We are going to resend the request on the to_topic if there is a
+        # timeout error. This time the timeout will be longer.  If the second
+        # request times out then we will send the request to the default
+        # core_topic.
+        timeouts = [self.default_timeout,
+                    self.default_timeout*2,
+                    self.default_timeout]
+        retry = 0
+        max_retry = 2
+        for timeout in timeouts:
+            cb = DeferredWithTimeout(timeout=timeout)
+            _send_request(rpc, cb, to_topic, reply_topic, **kwargs)
+            try:
+                res = yield cb
+                returnValue(res)
+            except TimeOutError as e:
+                log.warn('invoke-timeout', e=e)
+                if retry == max_retry:
+                    raise e
+                retry += 1
+                if retry == max_retry:
+                    to_topic = self.core_topic