[VOL-1499] Update pyvoltha to use pre-created kafka topic

Change-Id: I4e34c5c79390b936e8a1061224afcf059363c570
diff --git a/pyvoltha/adapters/kafka/adapter_request_facade.py b/pyvoltha/adapters/kafka/adapter_request_facade.py
index fe3e3b9..ef13342 100644
--- a/pyvoltha/adapters/kafka/adapter_request_facade.py
+++ b/pyvoltha/adapters/kafka/adapter_request_facade.py
@@ -30,7 +30,7 @@
 from pyvoltha.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
     FlowGroupChanges, ofp_packet_out
 from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
-    get_messaging_proxy, KAFKA_OFFSET_LATEST, KAFKA_OFFSET_EARLIEST
+    get_messaging_proxy, KAFKA_OFFSET_LATEST, KAFKA_OFFSET_EARLIEST, ARG_FROM_TOPIC
 
 log = structlog.get_logger()
 
@@ -56,8 +56,9 @@
     defined in
     """
 
-    def __init__(self, adapter):
+    def __init__(self, adapter, core_proxy):
         self.adapter = adapter
+        self.core_proxy = core_proxy
 
     @inlineCallbacks
     def start(self):
@@ -67,24 +68,32 @@
     def stop(self):
         log.debug('stopping')
 
-    @inlineCallbacks
-    def createKafkaDeviceTopic(self, deviceId):
-        log.debug("subscribing-to-topic", device_id=deviceId)
-        kafka_proxy = get_messaging_proxy()
-        device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
-        # yield kafka_proxy.create_topic(topic=device_topic)
-        yield kafka_proxy.subscribe(topic=device_topic, group_id=device_topic, target_cls=self, offset=KAFKA_OFFSET_EARLIEST)
-        log.debug("subscribed-to-topic", topic=device_topic)
+    # @inlineCallbacks
+    # def createKafkaDeviceTopic(self, deviceId):
+    #     log.debug("subscribing-to-topic", device_id=deviceId)
+    #     kafka_proxy = get_messaging_proxy()
+    #     device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
+    #     # yield kafka_proxy.create_topic(topic=device_topic)
+    #     yield kafka_proxy.subscribe(topic=device_topic, group_id=device_topic, target_cls=self, offset=KAFKA_OFFSET_EARLIEST)
+    #     log.debug("subscribed-to-topic", topic=device_topic)
 
-    def adopt_device(self, device):
+    def adopt_device(self, device, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
 
-            # Start the creation of a device specific topic to handle all
-            # subsequent requests from the Core. This adapter instance will
-            # handle all requests for that device.
-            reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
+            # Update the core reference for that device as it will be used
+            # by the adapter to send async messages to the Core.
+            if ARG_FROM_TOPIC in kwargs:
+                t = StrType()
+                kwargs[ARG_FROM_TOPIC].Unpack(t)
+                # Update the core reference for that device
+                self.core_proxy.update_device_core_reference(d.id, t.val)
+
+            # # Start the creation of a device specific topic to handle all
+            # # subsequent requests from the Core. This adapter instance will
+            # # handle all requests for that device.
+            # reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
 
             result = self.adapter.adopt_device(d)
             # return True, self.adapter.adopt_device(d)
@@ -94,7 +103,7 @@
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
 
-    def get_ofp_device_info(self, device):
+    def get_ofp_device_info(self, device, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -103,7 +112,7 @@
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
 
-    def get_ofp_port_info(self, device, port_no):
+    def get_ofp_port_info(self, device, port_no, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -119,13 +128,13 @@
 
         return True, self.adapter.get_ofp_port_info(d, p.val)
 
-    def reconcile_device(self, device):
+    def reconcile_device(self, device, **kwargs):
         return self.adapter.reconcile_device(device)
 
-    def abandon_device(self, device):
+    def abandon_device(self, device, **kwargs):
         return self.adapter.abandon_device(device)
 
-    def disable_device(self, device):
+    def disable_device(self, device, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -134,7 +143,7 @@
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
 
-    def reenable_device(self, device):
+    def reenable_device(self, device, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -143,7 +152,7 @@
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
 
-    def reboot_device(self, device):
+    def reboot_device(self, device, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -152,7 +161,7 @@
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
 
-    def download_image(self, device, request):
+    def download_image(self, device, request, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -168,7 +177,7 @@
 
         return True, self.adapter.download_image(device, request)
 
-    def get_image_download_status(self, device, request):
+    def get_image_download_status(self, device, request, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -184,7 +193,7 @@
 
         return True, self.adapter.get_image_download_status(device, request)
 
-    def cancel_image_download(self, device, request):
+    def cancel_image_download(self, device, request, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -200,7 +209,7 @@
 
         return True, self.adapter.cancel_image_download(device, request)
 
-    def activate_image_update(self, device, request):
+    def activate_image_update(self, device, request, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -216,7 +225,7 @@
 
         return True, self.adapter.activate_image_update(device, request)
 
-    def revert_image_update(self, device, request):
+    def revert_image_update(self, device, request, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -233,10 +242,10 @@
         return True, self.adapter.revert_image_update(device, request)
 
 
-    def self_test(self, device):
+    def self_test(self, device, **kwargs):
         return self.adapter.self_test_device(device)
 
-    def delete_device(self, device):
+    def delete_device(self, device, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -254,10 +263,10 @@
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
 
-    def get_device_details(self, device):
+    def get_device_details(self, device, **kwargs):
         return self.adapter.get_device_details(device)
 
-    def update_flows_bulk(self, device, flows, groups):
+    def update_flows_bulk(self, device, flows, groups, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -274,7 +283,7 @@
 
         return (True, self.adapter.update_flows_bulk(d, f, g))
 
-    def update_flows_incrementally(self, device, flow_changes, group_changes):
+    def update_flows_incrementally(self, device, flow_changes, group_changes, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -291,13 +300,13 @@
 
         return (True, self.adapter.update_flows_incrementally(d, f, g))
 
-    def suppress_alarm(self, filter):
+    def suppress_alarm(self, filter, **kwargs):
         return self.adapter.suppress_alarm(filter)
 
-    def unsuppress_alarm(self, filter):
+    def unsuppress_alarm(self, filter, **kwargs):
         return self.adapter.unsuppress_alarm(filter)
 
-    def process_inter_adapter_message(self, msg):
+    def process_inter_adapter_message(self, msg, **kwargs):
         m = InterAdapterMessage()
         if msg:
             msg.Unpack(m)
@@ -308,7 +317,7 @@
         return (True, self.adapter.process_inter_adapter_message(m))
 
 
-    def receive_packet_out(self, deviceId, outPort, packet):
+    def receive_packet_out(self, deviceId, outPort, packet, **kwargs):
         try:
             d_id = StrType()
             if deviceId: