[VOL-1499] Update pyvoltha to use pre-created kafka topic

Change-Id: I4e34c5c79390b936e8a1061224afcf059363c570
diff --git a/pyvoltha/adapters/kafka/adapter_request_facade.py b/pyvoltha/adapters/kafka/adapter_request_facade.py
index fe3e3b9..ef13342 100644
--- a/pyvoltha/adapters/kafka/adapter_request_facade.py
+++ b/pyvoltha/adapters/kafka/adapter_request_facade.py
@@ -30,7 +30,7 @@
 from pyvoltha.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
     FlowGroupChanges, ofp_packet_out
 from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
-    get_messaging_proxy, KAFKA_OFFSET_LATEST, KAFKA_OFFSET_EARLIEST
+    get_messaging_proxy, KAFKA_OFFSET_LATEST, KAFKA_OFFSET_EARLIEST, ARG_FROM_TOPIC
 
 log = structlog.get_logger()
 
@@ -56,8 +56,9 @@
     defined in
     """
 
-    def __init__(self, adapter):
+    def __init__(self, adapter, core_proxy):
         self.adapter = adapter
+        self.core_proxy = core_proxy
 
     @inlineCallbacks
     def start(self):
@@ -67,24 +68,32 @@
     def stop(self):
         log.debug('stopping')
 
-    @inlineCallbacks
-    def createKafkaDeviceTopic(self, deviceId):
-        log.debug("subscribing-to-topic", device_id=deviceId)
-        kafka_proxy = get_messaging_proxy()
-        device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
-        # yield kafka_proxy.create_topic(topic=device_topic)
-        yield kafka_proxy.subscribe(topic=device_topic, group_id=device_topic, target_cls=self, offset=KAFKA_OFFSET_EARLIEST)
-        log.debug("subscribed-to-topic", topic=device_topic)
+    # @inlineCallbacks
+    # def createKafkaDeviceTopic(self, deviceId):
+    #     log.debug("subscribing-to-topic", device_id=deviceId)
+    #     kafka_proxy = get_messaging_proxy()
+    #     device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
+    #     # yield kafka_proxy.create_topic(topic=device_topic)
+    #     yield kafka_proxy.subscribe(topic=device_topic, group_id=device_topic, target_cls=self, offset=KAFKA_OFFSET_EARLIEST)
+    #     log.debug("subscribed-to-topic", topic=device_topic)
 
-    def adopt_device(self, device):
+    def adopt_device(self, device, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
 
-            # Start the creation of a device specific topic to handle all
-            # subsequent requests from the Core. This adapter instance will
-            # handle all requests for that device.
-            reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
+            # Update the core reference for that device as it will be used
+            # by the adapter to send async messages to the Core.
+            if ARG_FROM_TOPIC in kwargs:
+                t = StrType()
+                kwargs[ARG_FROM_TOPIC].Unpack(t)
+                # Update the core reference for that device
+                self.core_proxy.update_device_core_reference(d.id, t.val)
+
+            # # Start the creation of a device specific topic to handle all
+            # # subsequent requests from the Core. This adapter instance will
+            # # handle all requests for that device.
+            # reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
 
             result = self.adapter.adopt_device(d)
             # return True, self.adapter.adopt_device(d)
@@ -94,7 +103,7 @@
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
 
-    def get_ofp_device_info(self, device):
+    def get_ofp_device_info(self, device, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -103,7 +112,7 @@
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
 
-    def get_ofp_port_info(self, device, port_no):
+    def get_ofp_port_info(self, device, port_no, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -119,13 +128,13 @@
 
         return True, self.adapter.get_ofp_port_info(d, p.val)
 
-    def reconcile_device(self, device):
+    def reconcile_device(self, device, **kwargs):
         return self.adapter.reconcile_device(device)
 
-    def abandon_device(self, device):
+    def abandon_device(self, device, **kwargs):
         return self.adapter.abandon_device(device)
 
-    def disable_device(self, device):
+    def disable_device(self, device, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -134,7 +143,7 @@
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
 
-    def reenable_device(self, device):
+    def reenable_device(self, device, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -143,7 +152,7 @@
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
 
-    def reboot_device(self, device):
+    def reboot_device(self, device, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -152,7 +161,7 @@
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
 
-    def download_image(self, device, request):
+    def download_image(self, device, request, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -168,7 +177,7 @@
 
         return True, self.adapter.download_image(device, request)
 
-    def get_image_download_status(self, device, request):
+    def get_image_download_status(self, device, request, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -184,7 +193,7 @@
 
         return True, self.adapter.get_image_download_status(device, request)
 
-    def cancel_image_download(self, device, request):
+    def cancel_image_download(self, device, request, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -200,7 +209,7 @@
 
         return True, self.adapter.cancel_image_download(device, request)
 
-    def activate_image_update(self, device, request):
+    def activate_image_update(self, device, request, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -216,7 +225,7 @@
 
         return True, self.adapter.activate_image_update(device, request)
 
-    def revert_image_update(self, device, request):
+    def revert_image_update(self, device, request, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -233,10 +242,10 @@
         return True, self.adapter.revert_image_update(device, request)
 
 
-    def self_test(self, device):
+    def self_test(self, device, **kwargs):
         return self.adapter.self_test_device(device)
 
-    def delete_device(self, device):
+    def delete_device(self, device, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -254,10 +263,10 @@
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
 
-    def get_device_details(self, device):
+    def get_device_details(self, device, **kwargs):
         return self.adapter.get_device_details(device)
 
-    def update_flows_bulk(self, device, flows, groups):
+    def update_flows_bulk(self, device, flows, groups, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -274,7 +283,7 @@
 
         return (True, self.adapter.update_flows_bulk(d, f, g))
 
-    def update_flows_incrementally(self, device, flow_changes, group_changes):
+    def update_flows_incrementally(self, device, flow_changes, group_changes, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -291,13 +300,13 @@
 
         return (True, self.adapter.update_flows_incrementally(d, f, g))
 
-    def suppress_alarm(self, filter):
+    def suppress_alarm(self, filter, **kwargs):
         return self.adapter.suppress_alarm(filter)
 
-    def unsuppress_alarm(self, filter):
+    def unsuppress_alarm(self, filter, **kwargs):
         return self.adapter.unsuppress_alarm(filter)
 
-    def process_inter_adapter_message(self, msg):
+    def process_inter_adapter_message(self, msg, **kwargs):
         m = InterAdapterMessage()
         if msg:
             msg.Unpack(m)
@@ -308,7 +317,7 @@
         return (True, self.adapter.process_inter_adapter_message(m))
 
 
-    def receive_packet_out(self, deviceId, outPort, packet):
+    def receive_packet_out(self, deviceId, outPort, packet, **kwargs):
         try:
             d_id = StrType()
             if deviceId:
diff --git a/pyvoltha/adapters/kafka/container_proxy.py b/pyvoltha/adapters/kafka/container_proxy.py
index cfc064d..2a71027 100644
--- a/pyvoltha/adapters/kafka/container_proxy.py
+++ b/pyvoltha/adapters/kafka/container_proxy.py
@@ -38,10 +38,10 @@
 @implementer(IComponent)
 class ContainerProxy(object):
 
-    def __init__(self, kafka_proxy, core_topic, my_listening_topic):
+    def __init__(self, kafka_proxy, remote_topic, my_listening_topic):
         self.kafka_proxy = kafka_proxy
         self.listening_topic = my_listening_topic
-        self.core_topic = core_topic
+        self.remote_topic = remote_topic
         self.default_timeout = 3
 
     def start(self):
@@ -92,7 +92,7 @@
                           to_topic=to_topic,
                           reply_topic=reply_topic)
                 if to_topic is None:
-                    to_topic = self.core_topic
+                    to_topic = self.remote_topic
                 if reply_topic is None:
                     reply_topic = self.listening_topic
                 result = yield self.kafka_proxy.send_request(rpc=rpc,
@@ -130,4 +130,4 @@
                     raise e
                 retry += 1
                 if retry == max_retry:
-                    to_topic = self.core_topic
+                    to_topic = self.remote_topic
diff --git a/pyvoltha/adapters/kafka/core_proxy.py b/pyvoltha/adapters/kafka/core_proxy.py
index 8412163..800a453 100644
--- a/pyvoltha/adapters/kafka/core_proxy.py
+++ b/pyvoltha/adapters/kafka/core_proxy.py
@@ -35,9 +35,27 @@
 
 class CoreProxy(ContainerProxy):
 
-    def __init__(self, kafka_proxy, core_topic, my_listening_topic):
-        super(CoreProxy, self).__init__(kafka_proxy, core_topic,
+    def __init__(self, kafka_proxy, default_core_topic, my_listening_topic):
+        super(CoreProxy, self).__init__(kafka_proxy, default_core_topic,
                                         my_listening_topic)
+        self.core_default_topic = default_core_topic
+        self.deviceId_to_core_map = dict()
+
+    def update_device_core_reference(self, device_id, core_topic):
+        log.debug("update_device_core_reference")
+        self.deviceId_to_core_map[device_id] = core_topic
+
+    def delete_device_core_reference(self, device_id, core_topic):
+        log.debug("delete_device_core_reference")
+        del self.deviceId_to_core_map[device_id]
+
+    def get_adapter_topic(self, **kwargs):
+        return self.listening_topic
+
+    def get_core_topic(self, device_id):
+        if device_id in self.deviceId_to_core_map:
+            return self.deviceId_to_core_map[device_id]
+        return self.core_default_topic
 
     @ContainerProxy.wrap_request(CoreInstance)
     @inlineCallbacks
@@ -62,8 +80,11 @@
         # Once we have a device being managed, all communications between the
         # the adapter and the core occurs over a topic associated with that
         # device
-        to_topic = createSubTopic(self.core_topic, device_id)
-        reply_topic = createSubTopic(self.listening_topic, device_id)
+        to_topic = self.get_core_topic(device_id)
+        reply_topic = self.get_adapter_topic()
+
+        # to_topic = createSubTopic(self.core_topic, device_id)
+        # reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="GetDevice",
                                 to_topic=to_topic,
                                 reply_topic=reply_topic,
@@ -82,8 +103,11 @@
         id.id = device_id
         p_type = IntType()
         p_type.val = port_type
-        to_topic = createSubTopic(self.core_topic, device_id)
-        reply_topic = createSubTopic(self.listening_topic, device_id)
+        to_topic = self.get_core_topic(device_id)
+        reply_topic = self.get_adapter_topic()
+
+        # to_topic = createSubTopic(self.core_topic, device_id)
+        # reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="GetPorts",
                                 to_topic=to_topic,
                                 reply_topic=reply_topic,
@@ -132,8 +156,11 @@
         cdt.val = child_device_type
         channel = IntType()
         channel.val = channel_id
-        to_topic = createSubTopic(self.core_topic, parent_device_id)
-        reply_topic = createSubTopic(self.listening_topic, parent_device_id)
+        to_topic = self.get_core_topic(parent_device_id)
+        reply_topic = self.get_adapter_topic()
+
+        # to_topic = createSubTopic(self.core_topic, parent_device_id)
+        # reply_topic = createSubTopic(self.listening_topic, parent_device_id)
         args = self._to_proto(**kw)
         res = yield self.invoke(rpc="ChildDeviceDetected",
                                 to_topic=to_topic,
@@ -149,8 +176,11 @@
     @inlineCallbacks
     def device_update(self, device):
         log.debug("device_update")
-        to_topic = createSubTopic(self.core_topic, device.id)
-        reply_topic = createSubTopic(self.listening_topic, device.id)
+        to_topic = self.get_core_topic(device.id)
+        reply_topic = self.get_adapter_topic()
+
+        # to_topic = createSubTopic(self.core_topic, device.id)
+        # reply_topic = createSubTopic(self.listening_topic, device.id)
         res = yield self.invoke(rpc="DeviceUpdate",
                                 to_topic=to_topic,
                                 reply_topic=reply_topic,
@@ -178,8 +208,11 @@
         else:
             c_status.val = -1
 
-        to_topic = createSubTopic(self.core_topic, device_id)
-        reply_topic = createSubTopic(self.listening_topic, device_id)
+        to_topic = self.get_core_topic(device_id)
+        reply_topic = self.get_adapter_topic()
+
+        # to_topic = createSubTopic(self.core_topic, device_id)
+        #     reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="DeviceStateUpdate",
                                 to_topic=to_topic,
                                 reply_topic=reply_topic,
@@ -206,8 +239,11 @@
         else:
             c_status.val = -1
 
-        to_topic = createSubTopic(self.core_topic, device_id)
-        reply_topic = createSubTopic(self.listening_topic, device_id)
+        to_topic = self.get_core_topic(device_id)
+        reply_topic = self.get_adapter_topic()
+
+        # to_topic = createSubTopic(self.core_topic, device_id)
+        # reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="ChildrenStateUpdate",
                                 to_topic=to_topic,
                                 reply_topic=reply_topic,
@@ -232,8 +268,11 @@
         o_status = IntType()
         o_status.val = oper_status
 
-        to_topic = createSubTopic(self.core_topic, device_id)
-        reply_topic = createSubTopic(self.listening_topic, device_id)
+        to_topic = self.get_core_topic(device_id)
+        reply_topic = self.get_adapter_topic()
+
+        # to_topic = createSubTopic(self.core_topic, device_id)
+        # reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="PortStateUpdate",
                                 to_topic=to_topic,
                                 reply_topic=reply_topic,
@@ -262,8 +301,11 @@
         else:
             c_status.val = -1
 
-        to_topic = createSubTopic(self.core_topic, parent_device_id)
-        reply_topic = createSubTopic(self.listening_topic, parent_device_id)
+        to_topic = self.get_core_topic(parent_device_id)
+        reply_topic = self.get_adapter_topic()
+
+        # to_topic = createSubTopic(self.core_topic, parent_device_id)
+        # reply_topic = createSubTopic(self.listening_topic, parent_device_id)
         res = yield self.invoke(rpc="child_devices_state_update",
                                 to_topic=to_topic,
                                 reply_topic=reply_topic,
@@ -281,8 +323,11 @@
         log.debug("device_pm_config_update")
         b = BoolType()
         b.val = init
-        to_topic = createSubTopic(self.core_topic, device_pm_config.id)
-        reply_topic = createSubTopic(self.listening_topic, device_pm_config.id)
+        to_topic = self.get_core_topic(device_pm_config.id)
+        reply_topic = self.get_adapter_topic()
+
+        # to_topic = createSubTopic(self.core_topic, device_pm_config.id)
+        # reply_topic = createSubTopic(self.listening_topic, device_pm_config.id)
         res = yield self.invoke(rpc="DevicePMConfigUpdate",
                                 to_topic=to_topic,
                                 reply_topic=reply_topic,
@@ -296,8 +341,11 @@
         log.debug("port_created")
         proto_id = ID()
         proto_id.id = device_id
-        to_topic = createSubTopic(self.core_topic, device_id)
-        reply_topic = createSubTopic(self.listening_topic, device_id)
+        to_topic = self.get_core_topic(device_id)
+        reply_topic = self.get_adapter_topic()
+
+        # to_topic = createSubTopic(self.core_topic, device_id)
+        # reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="PortCreated",
                                 to_topic=to_topic,
                                 reply_topic=reply_topic,
@@ -333,8 +381,10 @@
         p.val = port
         pac = Packet()
         pac.payload = packet
-        to_topic = createSubTopic(self.core_topic, device_id)
-        reply_topic = createSubTopic(self.listening_topic, device_id)
+        to_topic = self.get_core_topic(device_id)
+        reply_topic = self.get_adapter_topic()
+        # to_topic = createSubTopic(self.core_topic, device_id)
+        # reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="PacketIn",
                                 to_topic=to_topic,
                                 reply_topic=reply_topic,
diff --git a/pyvoltha/adapters/kafka/kafka_inter_container_library.py b/pyvoltha/adapters/kafka/kafka_inter_container_library.py
index 696e582..ad3b806 100644
--- a/pyvoltha/adapters/kafka/kafka_inter_container_library.py
+++ b/pyvoltha/adapters/kafka/kafka_inter_container_library.py
@@ -28,12 +28,13 @@
 from kafka_proxy import KafkaProxy, get_kafka_proxy
 from pyvoltha.protos.inter_container_pb2 import MessageType, Argument, \
     InterContainerRequestBody, InterContainerMessage, Header, \
-    InterContainerResponseBody
+    InterContainerResponseBody, StrType
 
 log = structlog.get_logger()
 
 KAFKA_OFFSET_LATEST = 'latest'
 KAFKA_OFFSET_EARLIEST = 'earliest'
+ARG_FROM_TOPIC = 'fromTopic'
 
 
 class KafkaMessagingError(BaseException):
@@ -406,6 +407,13 @@
         from Kafka.
         """
 
+        def _augment_args_with_FromTopic(args, from_topic):
+            arg = Argument(key=ARG_FROM_TOPIC)
+            t = StrType(val=from_topic)
+            arg.value.Pack(t)
+            args.extend([arg])
+            return args
+
         def _toDict(args):
             """
             Convert a repeatable Argument type into a python dictionary
@@ -451,12 +459,15 @@
                     log.debug("unsupported-msg", msg_type=type(message.body))
                     return
                 if targetted_topic in self.topic_target_cls_map:
-                    if msg_body.args:
+                    # Augment the request arguments with the from_topic
+                    augmented_args = _augment_args_with_FromTopic(msg_body.args,
+                                                        msg_body.reply_to_topic)
+                    if augmented_args:
                         log.debug("message-body-args-present", body=msg_body)
                         (status, res) = yield getattr(
                             self.topic_target_cls_map[targetted_topic],
                             self._to_string(msg_body.rpc))(
-                            **_toDict(msg_body.args))
+                            **_toDict(augmented_args))
                     else:
                         log.debug("message-body-args-absent", body=msg_body,
                                   rpc=msg_body.rpc)