[VOL-1499] Use precreated topic
This commit migrates from dynamically created Kafka topics to
pre-created topics. The changes are made in the rw_core, the simulated
ONU and OLT adapters, and the ponsim OLT and ONU adapters.
TODO: Move the Python shared-library changes into the pyvoltha
repo.
Change-Id: Ia92287ec74009872e694aa22eb896d8a6487d231
diff --git a/python/adapters/kafka/core_proxy.py b/python/adapters/kafka/core_proxy.py
index b897188..8d252a3 100644
--- a/python/adapters/kafka/core_proxy.py
+++ b/python/adapters/kafka/core_proxy.py
@@ -35,9 +35,27 @@
class CoreProxy(ContainerProxy):
- def __init__(self, kafka_proxy, core_topic, my_listening_topic):
- super(CoreProxy, self).__init__(kafka_proxy, core_topic,
+ def __init__(self, kafka_proxy, default_core_topic, my_listening_topic):
+ super(CoreProxy, self).__init__(kafka_proxy, default_core_topic,
my_listening_topic)
+ self.core_default_topic = default_core_topic
+ self.deviceId_to_core_map = dict()
+
+ def update_device_core_reference(self, device_id, core_topic):
+ log.debug("update_device_core_reference")
+ self.deviceId_to_core_map[device_id] = core_topic
+
+ def delete_device_core_reference(self, device_id, core_topic):
+ log.debug("delete_device_core_reference")
+ del self.deviceId_to_core_map[device_id]
+
+ def get_adapter_topic(self, **kwargs):
+ return self.listening_topic
+
+ def get_core_topic(self, device_id):
+ if device_id in self.deviceId_to_core_map:
+ return self.deviceId_to_core_map[device_id]
+ return self.core_default_topic
@ContainerProxy.wrap_request(CoreInstance)
@inlineCallbacks
@@ -62,8 +80,11 @@
# Once we have a device being managed, all communications between the
# the adapter and the core occurs over a topic associated with that
# device
- to_topic = createSubTopic(self.core_topic, device_id)
- reply_topic = createSubTopic(self.listening_topic, device_id)
+ to_topic = self.get_core_topic(device_id)
+ reply_topic = self.get_adapter_topic()
+
+ # to_topic = createSubTopic(self.core_topic, device_id)
+ # reply_topic = createSubTopic(self.listening_topic, device_id)
res = yield self.invoke(rpc="GetDevice",
to_topic=to_topic,
reply_topic=reply_topic,
@@ -82,8 +103,11 @@
id.id = device_id
p_type = IntType()
p_type.val = port_type
- to_topic = createSubTopic(self.core_topic, device_id)
- reply_topic = createSubTopic(self.listening_topic, device_id)
+ to_topic = self.get_core_topic(device_id)
+ reply_topic = self.get_adapter_topic()
+
+ # to_topic = createSubTopic(self.core_topic, device_id)
+ # reply_topic = createSubTopic(self.listening_topic, device_id)
res = yield self.invoke(rpc="GetPorts",
to_topic=to_topic,
reply_topic=reply_topic,
@@ -132,8 +156,11 @@
cdt.val = child_device_type
channel = IntType()
channel.val = channel_id
- to_topic = createSubTopic(self.core_topic, parent_device_id)
- reply_topic = createSubTopic(self.listening_topic, parent_device_id)
+ to_topic = self.get_core_topic(parent_device_id)
+ reply_topic = self.get_adapter_topic()
+
+ # to_topic = createSubTopic(self.core_topic, parent_device_id)
+ # reply_topic = createSubTopic(self.listening_topic, parent_device_id)
args = self._to_proto(**kw)
res = yield self.invoke(rpc="ChildDeviceDetected",
to_topic=to_topic,
@@ -149,8 +176,11 @@
@inlineCallbacks
def device_update(self, device):
log.debug("device_update")
- to_topic = createSubTopic(self.core_topic, device.id)
- reply_topic = createSubTopic(self.listening_topic, device.id)
+ to_topic = self.get_core_topic(device.id)
+ reply_topic = self.get_adapter_topic()
+
+ # to_topic = createSubTopic(self.core_topic, device.id)
+ # reply_topic = createSubTopic(self.listening_topic, device.id)
res = yield self.invoke(rpc="DeviceUpdate",
to_topic=to_topic,
reply_topic=reply_topic,
@@ -178,8 +208,11 @@
else:
c_status.val = -1
- to_topic = createSubTopic(self.core_topic, device_id)
- reply_topic = createSubTopic(self.listening_topic, device_id)
+ to_topic = self.get_core_topic(device_id)
+ reply_topic = self.get_adapter_topic()
+
+ # to_topic = createSubTopic(self.core_topic, device_id)
+ # reply_topic = createSubTopic(self.listening_topic, device_id)
res = yield self.invoke(rpc="DeviceStateUpdate",
to_topic=to_topic,
reply_topic=reply_topic,
@@ -206,8 +239,11 @@
else:
c_status.val = -1
- to_topic = createSubTopic(self.core_topic, device_id)
- reply_topic = createSubTopic(self.listening_topic, device_id)
+ to_topic = self.get_core_topic(device_id)
+ reply_topic = self.get_adapter_topic()
+
+ # to_topic = createSubTopic(self.core_topic, device_id)
+ # reply_topic = createSubTopic(self.listening_topic, device_id)
res = yield self.invoke(rpc="ChildrenStateUpdate",
to_topic=to_topic,
reply_topic=reply_topic,
@@ -232,8 +268,11 @@
o_status = IntType()
o_status.val = oper_status
- to_topic = createSubTopic(self.core_topic, device_id)
- reply_topic = createSubTopic(self.listening_topic, device_id)
+ to_topic = self.get_core_topic(device_id)
+ reply_topic = self.get_adapter_topic()
+
+ # to_topic = createSubTopic(self.core_topic, device_id)
+ # reply_topic = createSubTopic(self.listening_topic, device_id)
res = yield self.invoke(rpc="PortStateUpdate",
to_topic=to_topic,
reply_topic=reply_topic,
@@ -262,8 +301,11 @@
else:
c_status.val = -1
- to_topic = createSubTopic(self.core_topic, parent_device_id)
- reply_topic = createSubTopic(self.listening_topic, parent_device_id)
+ to_topic = self.get_core_topic(parent_device_id)
+ reply_topic = self.get_adapter_topic()
+
+ # to_topic = createSubTopic(self.core_topic, parent_device_id)
+ # reply_topic = createSubTopic(self.listening_topic, parent_device_id)
res = yield self.invoke(rpc="child_devices_state_update",
to_topic=to_topic,
reply_topic=reply_topic,
@@ -281,8 +323,11 @@
log.debug("device_pm_config_update")
b = BoolType()
b.val = init
- to_topic = createSubTopic(self.core_topic, device_pm_config.id)
- reply_topic = createSubTopic(self.listening_topic, device_pm_config.id)
+ to_topic = self.get_core_topic(device_pm_config.id)
+ reply_topic = self.get_adapter_topic()
+
+ # to_topic = createSubTopic(self.core_topic, device_pm_config.id)
+ # reply_topic = createSubTopic(self.listening_topic, device_pm_config.id)
res = yield self.invoke(rpc="DevicePMConfigUpdate",
to_topic=to_topic,
reply_topic=reply_topic,
@@ -296,8 +341,11 @@
log.debug("port_created")
proto_id = ID()
proto_id.id = device_id
- to_topic = createSubTopic(self.core_topic, device_id)
- reply_topic = createSubTopic(self.listening_topic, device_id)
+ to_topic = self.get_core_topic(device_id)
+ reply_topic = self.get_adapter_topic()
+
+ # to_topic = createSubTopic(self.core_topic, device_id)
+ # reply_topic = createSubTopic(self.listening_topic, device_id)
res = yield self.invoke(rpc="PortCreated",
to_topic=to_topic,
reply_topic=reply_topic,
@@ -333,8 +381,10 @@
p.val = port
pac = Packet()
pac.payload = packet
- to_topic = createSubTopic(self.core_topic, device_id)
- reply_topic = createSubTopic(self.listening_topic, device_id)
+ to_topic = self.get_core_topic(device_id)
+ reply_topic = self.get_adapter_topic()
+ # to_topic = createSubTopic(self.core_topic, device_id)
+ # reply_topic = createSubTopic(self.listening_topic, device_id)
res = yield self.invoke(rpc="PacketIn",
to_topic=to_topic,
reply_topic=reply_topic,