This update addresses the following:
1.  Decouple the kafka messaging proxy from the kafka client.  This
will allow us to try out different kafka clients as well as test
the client separately.
2. Create unique device topics for the core, olt adapter and onu
adapters.  This will ensure that only the cores and adapters handling
these devices listen to the device messages.
3. Update the core with the latest device model APIs and changes.
While most of the model issues have been fixed, there is still an
issue with updating a child branch.  This will be dealt with in a
separate update.

Change-Id: I622ef5c636d7466bb3adefaa4ac4c85d7c450bea
diff --git a/python/adapters/kafka/kafka_inter_container_library.py b/python/adapters/kafka/kafka_inter_container_library.py
index 1d2b05c..aaa0c3c 100644
--- a/python/adapters/kafka/kafka_inter_container_library.py
+++ b/python/adapters/kafka/kafka_inter_container_library.py
@@ -152,6 +152,12 @@
             log.info('reconnected-to-consul', after_retries=self.retries)
             self.retries = 0
 
+    def get_target_cls(self):
+        return self.target_cls
+
+    def get_default_topic(self):
+        return self.default_topic
+
     @inlineCallbacks
     def _subscribe(self, topic, callback=None, target_cls=None):
         try:
@@ -332,12 +338,13 @@
             request_body = InterContainerRequestBody()
             request.header.id = transaction_id
             request.header.type = MessageType.Value("REQUEST")
-            request.header.from_topic = self.default_topic
+            request.header.from_topic = reply_topic
             request.header.to_topic = to_topic
 
             response_required = False
             if reply_topic:
                 request_body.reply_to_topic = reply_topic
+                request_body.response_required = True
                 response_required = True
 
             request.header.timestamp = int(round(time.time() * 1000))
@@ -350,8 +357,6 @@
                     request_body.args.extend([arg])
                 except Exception as e:
                     log.exception("Failed-parsing-value", e=e)
-            request_body.reply_to_topic = self.default_topic
-            request_body.response_required = response_required
             request.body.Pack(request_body)
             return request, transaction_id, response_required
         except Exception as e:
@@ -479,7 +484,7 @@
                                 response.header.to_topic)
                             self._send_kafka_message(res_topic, response)
 
-                        log.debug("Response-sent", response=response.body)
+                        log.debug("Response-sent", response=response.body, to_topic=res_topic)
             elif message.header.type == MessageType.Value("RESPONSE"):
                 trns_id = self._to_string(message.header.id)
                 if trns_id in self.transaction_id_deferred_map: