[VOL-1499] Update pyvoltha to use pre-created kafka topic
Change-Id: I4e34c5c79390b936e8a1061224afcf059363c570
diff --git a/pyvoltha/adapters/kafka/adapter_request_facade.py b/pyvoltha/adapters/kafka/adapter_request_facade.py
index fe3e3b9..ef13342 100644
--- a/pyvoltha/adapters/kafka/adapter_request_facade.py
+++ b/pyvoltha/adapters/kafka/adapter_request_facade.py
@@ -30,7 +30,7 @@
from pyvoltha.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
FlowGroupChanges, ofp_packet_out
from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
- get_messaging_proxy, KAFKA_OFFSET_LATEST, KAFKA_OFFSET_EARLIEST
+ get_messaging_proxy, KAFKA_OFFSET_LATEST, KAFKA_OFFSET_EARLIEST, ARG_FROM_TOPIC
log = structlog.get_logger()
@@ -56,8 +56,9 @@
defined in
"""
- def __init__(self, adapter):
+ def __init__(self, adapter, core_proxy):
self.adapter = adapter
+ self.core_proxy = core_proxy
@inlineCallbacks
def start(self):
@@ -67,24 +68,32 @@
def stop(self):
log.debug('stopping')
- @inlineCallbacks
- def createKafkaDeviceTopic(self, deviceId):
- log.debug("subscribing-to-topic", device_id=deviceId)
- kafka_proxy = get_messaging_proxy()
- device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
- # yield kafka_proxy.create_topic(topic=device_topic)
- yield kafka_proxy.subscribe(topic=device_topic, group_id=device_topic, target_cls=self, offset=KAFKA_OFFSET_EARLIEST)
- log.debug("subscribed-to-topic", topic=device_topic)
+ # @inlineCallbacks
+ # def createKafkaDeviceTopic(self, deviceId):
+ # log.debug("subscribing-to-topic", device_id=deviceId)
+ # kafka_proxy = get_messaging_proxy()
+ # device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
+ # # yield kafka_proxy.create_topic(topic=device_topic)
+ # yield kafka_proxy.subscribe(topic=device_topic, group_id=device_topic, target_cls=self, offset=KAFKA_OFFSET_EARLIEST)
+ # log.debug("subscribed-to-topic", topic=device_topic)
- def adopt_device(self, device):
+ def adopt_device(self, device, **kwargs):
d = Device()
if device:
device.Unpack(d)
- # Start the creation of a device specific topic to handle all
- # subsequent requests from the Core. This adapter instance will
- # handle all requests for that device.
- reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
+ # Update the core reference for that device as it will be used
+ # by the adapter to send async messages to the Core.
+ if ARG_FROM_TOPIC in kwargs:
+ t = StrType()
+ kwargs[ARG_FROM_TOPIC].Unpack(t)
+ # Update the core reference for that device
+ self.core_proxy.update_device_core_reference(d.id, t.val)
+
+ # # Start the creation of a device specific topic to handle all
+ # # subsequent requests from the Core. This adapter instance will
+ # # handle all requests for that device.
+ # reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
result = self.adapter.adopt_device(d)
# return True, self.adapter.adopt_device(d)
@@ -94,7 +103,7 @@
return False, Error(code=ErrorCode.INVALID_PARAMETERS,
reason="device-invalid")
- def get_ofp_device_info(self, device):
+ def get_ofp_device_info(self, device, **kwargs):
d = Device()
if device:
device.Unpack(d)
@@ -103,7 +112,7 @@
return False, Error(code=ErrorCode.INVALID_PARAMETERS,
reason="device-invalid")
- def get_ofp_port_info(self, device, port_no):
+ def get_ofp_port_info(self, device, port_no, **kwargs):
d = Device()
if device:
device.Unpack(d)
@@ -119,13 +128,13 @@
return True, self.adapter.get_ofp_port_info(d, p.val)
- def reconcile_device(self, device):
+ def reconcile_device(self, device, **kwargs):
return self.adapter.reconcile_device(device)
- def abandon_device(self, device):
+ def abandon_device(self, device, **kwargs):
return self.adapter.abandon_device(device)
- def disable_device(self, device):
+ def disable_device(self, device, **kwargs):
d = Device()
if device:
device.Unpack(d)
@@ -134,7 +143,7 @@
return False, Error(code=ErrorCode.INVALID_PARAMETERS,
reason="device-invalid")
- def reenable_device(self, device):
+ def reenable_device(self, device, **kwargs):
d = Device()
if device:
device.Unpack(d)
@@ -143,7 +152,7 @@
return False, Error(code=ErrorCode.INVALID_PARAMETERS,
reason="device-invalid")
- def reboot_device(self, device):
+ def reboot_device(self, device, **kwargs):
d = Device()
if device:
device.Unpack(d)
@@ -152,7 +161,7 @@
return False, Error(code=ErrorCode.INVALID_PARAMETERS,
reason="device-invalid")
- def download_image(self, device, request):
+ def download_image(self, device, request, **kwargs):
d = Device()
if device:
device.Unpack(d)
@@ -168,7 +177,7 @@
return True, self.adapter.download_image(device, request)
- def get_image_download_status(self, device, request):
+ def get_image_download_status(self, device, request, **kwargs):
d = Device()
if device:
device.Unpack(d)
@@ -184,7 +193,7 @@
return True, self.adapter.get_image_download_status(device, request)
- def cancel_image_download(self, device, request):
+ def cancel_image_download(self, device, request, **kwargs):
d = Device()
if device:
device.Unpack(d)
@@ -200,7 +209,7 @@
return True, self.adapter.cancel_image_download(device, request)
- def activate_image_update(self, device, request):
+ def activate_image_update(self, device, request, **kwargs):
d = Device()
if device:
device.Unpack(d)
@@ -216,7 +225,7 @@
return True, self.adapter.activate_image_update(device, request)
- def revert_image_update(self, device, request):
+ def revert_image_update(self, device, request, **kwargs):
d = Device()
if device:
device.Unpack(d)
@@ -233,10 +242,10 @@
return True, self.adapter.revert_image_update(device, request)
- def self_test(self, device):
+ def self_test(self, device, **kwargs):
return self.adapter.self_test_device(device)
- def delete_device(self, device):
+ def delete_device(self, device, **kwargs):
d = Device()
if device:
device.Unpack(d)
@@ -254,10 +263,10 @@
return False, Error(code=ErrorCode.INVALID_PARAMETERS,
reason="device-invalid")
- def get_device_details(self, device):
+ def get_device_details(self, device, **kwargs):
return self.adapter.get_device_details(device)
- def update_flows_bulk(self, device, flows, groups):
+ def update_flows_bulk(self, device, flows, groups, **kwargs):
d = Device()
if device:
device.Unpack(d)
@@ -274,7 +283,7 @@
return (True, self.adapter.update_flows_bulk(d, f, g))
- def update_flows_incrementally(self, device, flow_changes, group_changes):
+ def update_flows_incrementally(self, device, flow_changes, group_changes, **kwargs):
d = Device()
if device:
device.Unpack(d)
@@ -291,13 +300,13 @@
return (True, self.adapter.update_flows_incrementally(d, f, g))
- def suppress_alarm(self, filter):
+ def suppress_alarm(self, filter, **kwargs):
return self.adapter.suppress_alarm(filter)
- def unsuppress_alarm(self, filter):
+ def unsuppress_alarm(self, filter, **kwargs):
return self.adapter.unsuppress_alarm(filter)
- def process_inter_adapter_message(self, msg):
+ def process_inter_adapter_message(self, msg, **kwargs):
m = InterAdapterMessage()
if msg:
msg.Unpack(m)
@@ -308,7 +317,7 @@
return (True, self.adapter.process_inter_adapter_message(m))
- def receive_packet_out(self, deviceId, outPort, packet):
+ def receive_packet_out(self, deviceId, outPort, packet, **kwargs):
try:
d_id = StrType()
if deviceId:
diff --git a/pyvoltha/adapters/kafka/container_proxy.py b/pyvoltha/adapters/kafka/container_proxy.py
index cfc064d..2a71027 100644
--- a/pyvoltha/adapters/kafka/container_proxy.py
+++ b/pyvoltha/adapters/kafka/container_proxy.py
@@ -38,10 +38,10 @@
@implementer(IComponent)
class ContainerProxy(object):
- def __init__(self, kafka_proxy, core_topic, my_listening_topic):
+ def __init__(self, kafka_proxy, remote_topic, my_listening_topic):
self.kafka_proxy = kafka_proxy
self.listening_topic = my_listening_topic
- self.core_topic = core_topic
+ self.remote_topic = remote_topic
self.default_timeout = 3
def start(self):
@@ -92,7 +92,7 @@
to_topic=to_topic,
reply_topic=reply_topic)
if to_topic is None:
- to_topic = self.core_topic
+ to_topic = self.remote_topic
if reply_topic is None:
reply_topic = self.listening_topic
result = yield self.kafka_proxy.send_request(rpc=rpc,
@@ -130,4 +130,4 @@
raise e
retry += 1
if retry == max_retry:
- to_topic = self.core_topic
+ to_topic = self.remote_topic
diff --git a/pyvoltha/adapters/kafka/core_proxy.py b/pyvoltha/adapters/kafka/core_proxy.py
index 8412163..800a453 100644
--- a/pyvoltha/adapters/kafka/core_proxy.py
+++ b/pyvoltha/adapters/kafka/core_proxy.py
@@ -35,9 +35,27 @@
class CoreProxy(ContainerProxy):
- def __init__(self, kafka_proxy, core_topic, my_listening_topic):
- super(CoreProxy, self).__init__(kafka_proxy, core_topic,
+ def __init__(self, kafka_proxy, default_core_topic, my_listening_topic):
+ super(CoreProxy, self).__init__(kafka_proxy, default_core_topic,
my_listening_topic)
+ self.core_default_topic = default_core_topic
+ self.deviceId_to_core_map = dict()
+
+ def update_device_core_reference(self, device_id, core_topic):
+ log.debug("update_device_core_reference")
+ self.deviceId_to_core_map[device_id] = core_topic
+
+ def delete_device_core_reference(self, device_id, core_topic):
+ log.debug("delete_device_core_reference")
+ del self.deviceId_to_core_map[device_id]
+
+ def get_adapter_topic(self, **kwargs):
+ return self.listening_topic
+
+ def get_core_topic(self, device_id):
+ if device_id in self.deviceId_to_core_map:
+ return self.deviceId_to_core_map[device_id]
+ return self.core_default_topic
@ContainerProxy.wrap_request(CoreInstance)
@inlineCallbacks
@@ -62,8 +80,11 @@
# Once we have a device being managed, all communications between the
# the adapter and the core occurs over a topic associated with that
# device
- to_topic = createSubTopic(self.core_topic, device_id)
- reply_topic = createSubTopic(self.listening_topic, device_id)
+ to_topic = self.get_core_topic(device_id)
+ reply_topic = self.get_adapter_topic()
+
+ # to_topic = createSubTopic(self.core_topic, device_id)
+ # reply_topic = createSubTopic(self.listening_topic, device_id)
res = yield self.invoke(rpc="GetDevice",
to_topic=to_topic,
reply_topic=reply_topic,
@@ -82,8 +103,11 @@
id.id = device_id
p_type = IntType()
p_type.val = port_type
- to_topic = createSubTopic(self.core_topic, device_id)
- reply_topic = createSubTopic(self.listening_topic, device_id)
+ to_topic = self.get_core_topic(device_id)
+ reply_topic = self.get_adapter_topic()
+
+ # to_topic = createSubTopic(self.core_topic, device_id)
+ # reply_topic = createSubTopic(self.listening_topic, device_id)
res = yield self.invoke(rpc="GetPorts",
to_topic=to_topic,
reply_topic=reply_topic,
@@ -132,8 +156,11 @@
cdt.val = child_device_type
channel = IntType()
channel.val = channel_id
- to_topic = createSubTopic(self.core_topic, parent_device_id)
- reply_topic = createSubTopic(self.listening_topic, parent_device_id)
+ to_topic = self.get_core_topic(parent_device_id)
+ reply_topic = self.get_adapter_topic()
+
+ # to_topic = createSubTopic(self.core_topic, parent_device_id)
+ # reply_topic = createSubTopic(self.listening_topic, parent_device_id)
args = self._to_proto(**kw)
res = yield self.invoke(rpc="ChildDeviceDetected",
to_topic=to_topic,
@@ -149,8 +176,11 @@
@inlineCallbacks
def device_update(self, device):
log.debug("device_update")
- to_topic = createSubTopic(self.core_topic, device.id)
- reply_topic = createSubTopic(self.listening_topic, device.id)
+ to_topic = self.get_core_topic(device.id)
+ reply_topic = self.get_adapter_topic()
+
+ # to_topic = createSubTopic(self.core_topic, device.id)
+ # reply_topic = createSubTopic(self.listening_topic, device.id)
res = yield self.invoke(rpc="DeviceUpdate",
to_topic=to_topic,
reply_topic=reply_topic,
@@ -178,8 +208,11 @@
else:
c_status.val = -1
- to_topic = createSubTopic(self.core_topic, device_id)
- reply_topic = createSubTopic(self.listening_topic, device_id)
+ to_topic = self.get_core_topic(device_id)
+ reply_topic = self.get_adapter_topic()
+
+ # to_topic = createSubTopic(self.core_topic, device_id)
+ # reply_topic = createSubTopic(self.listening_topic, device_id)
res = yield self.invoke(rpc="DeviceStateUpdate",
to_topic=to_topic,
reply_topic=reply_topic,
@@ -206,8 +239,11 @@
else:
c_status.val = -1
- to_topic = createSubTopic(self.core_topic, device_id)
- reply_topic = createSubTopic(self.listening_topic, device_id)
+ to_topic = self.get_core_topic(device_id)
+ reply_topic = self.get_adapter_topic()
+
+ # to_topic = createSubTopic(self.core_topic, device_id)
+ # reply_topic = createSubTopic(self.listening_topic, device_id)
res = yield self.invoke(rpc="ChildrenStateUpdate",
to_topic=to_topic,
reply_topic=reply_topic,
@@ -232,8 +268,11 @@
o_status = IntType()
o_status.val = oper_status
- to_topic = createSubTopic(self.core_topic, device_id)
- reply_topic = createSubTopic(self.listening_topic, device_id)
+ to_topic = self.get_core_topic(device_id)
+ reply_topic = self.get_adapter_topic()
+
+ # to_topic = createSubTopic(self.core_topic, device_id)
+ # reply_topic = createSubTopic(self.listening_topic, device_id)
res = yield self.invoke(rpc="PortStateUpdate",
to_topic=to_topic,
reply_topic=reply_topic,
@@ -262,8 +301,11 @@
else:
c_status.val = -1
- to_topic = createSubTopic(self.core_topic, parent_device_id)
- reply_topic = createSubTopic(self.listening_topic, parent_device_id)
+ to_topic = self.get_core_topic(parent_device_id)
+ reply_topic = self.get_adapter_topic()
+
+ # to_topic = createSubTopic(self.core_topic, parent_device_id)
+ # reply_topic = createSubTopic(self.listening_topic, parent_device_id)
res = yield self.invoke(rpc="child_devices_state_update",
to_topic=to_topic,
reply_topic=reply_topic,
@@ -281,8 +323,11 @@
log.debug("device_pm_config_update")
b = BoolType()
b.val = init
- to_topic = createSubTopic(self.core_topic, device_pm_config.id)
- reply_topic = createSubTopic(self.listening_topic, device_pm_config.id)
+ to_topic = self.get_core_topic(device_pm_config.id)
+ reply_topic = self.get_adapter_topic()
+
+ # to_topic = createSubTopic(self.core_topic, device_pm_config.id)
+ # reply_topic = createSubTopic(self.listening_topic, device_pm_config.id)
res = yield self.invoke(rpc="DevicePMConfigUpdate",
to_topic=to_topic,
reply_topic=reply_topic,
@@ -296,8 +341,11 @@
log.debug("port_created")
proto_id = ID()
proto_id.id = device_id
- to_topic = createSubTopic(self.core_topic, device_id)
- reply_topic = createSubTopic(self.listening_topic, device_id)
+ to_topic = self.get_core_topic(device_id)
+ reply_topic = self.get_adapter_topic()
+
+ # to_topic = createSubTopic(self.core_topic, device_id)
+ # reply_topic = createSubTopic(self.listening_topic, device_id)
res = yield self.invoke(rpc="PortCreated",
to_topic=to_topic,
reply_topic=reply_topic,
@@ -333,8 +381,10 @@
p.val = port
pac = Packet()
pac.payload = packet
- to_topic = createSubTopic(self.core_topic, device_id)
- reply_topic = createSubTopic(self.listening_topic, device_id)
+ to_topic = self.get_core_topic(device_id)
+ reply_topic = self.get_adapter_topic()
+ # to_topic = createSubTopic(self.core_topic, device_id)
+ # reply_topic = createSubTopic(self.listening_topic, device_id)
res = yield self.invoke(rpc="PacketIn",
to_topic=to_topic,
reply_topic=reply_topic,
diff --git a/pyvoltha/adapters/kafka/kafka_inter_container_library.py b/pyvoltha/adapters/kafka/kafka_inter_container_library.py
index 696e582..ad3b806 100644
--- a/pyvoltha/adapters/kafka/kafka_inter_container_library.py
+++ b/pyvoltha/adapters/kafka/kafka_inter_container_library.py
@@ -28,12 +28,13 @@
from kafka_proxy import KafkaProxy, get_kafka_proxy
from pyvoltha.protos.inter_container_pb2 import MessageType, Argument, \
InterContainerRequestBody, InterContainerMessage, Header, \
- InterContainerResponseBody
+ InterContainerResponseBody, StrType
log = structlog.get_logger()
KAFKA_OFFSET_LATEST = 'latest'
KAFKA_OFFSET_EARLIEST = 'earliest'
+ARG_FROM_TOPIC = 'fromTopic'
class KafkaMessagingError(BaseException):
@@ -406,6 +407,13 @@
from Kafka.
"""
+ def _augment_args_with_FromTopic(args, from_topic):
+ arg = Argument(key=ARG_FROM_TOPIC)
+ t = StrType(val=from_topic)
+ arg.value.Pack(t)
+ args.extend([arg])
+ return args
+
def _toDict(args):
"""
Convert a repeatable Argument type into a python dictionary
@@ -451,12 +459,15 @@
log.debug("unsupported-msg", msg_type=type(message.body))
return
if targetted_topic in self.topic_target_cls_map:
- if msg_body.args:
+ # Augment the request arguments with the from_topic
+ augmented_args = _augment_args_with_FromTopic(msg_body.args,
+ msg_body.reply_to_topic)
+ if augmented_args:
log.debug("message-body-args-present", body=msg_body)
(status, res) = yield getattr(
self.topic_target_cls_map[targetted_topic],
self._to_string(msg_body.rpc))(
- **_toDict(msg_body.args))
+ **_toDict(augmented_args))
else:
log.debug("message-body-args-absent", body=msg_body,
rpc=msg_body.rpc)