[VOL-1499] Use precreated topic
This commit migrate from dynamically created kafka topic to
pre-created topic. The changes are made in the rw_core, simulated
onu and olt adapters, and ponsim olt and onu adapters.
TODO: move the python shared library changes into the pyvoltha
repo.
Change-Id: Ia92287ec74009872e694aa22eb896d8a6487d231
diff --git a/python/adapters/kafka/kafka_inter_container_library.py b/python/adapters/kafka/kafka_inter_container_library.py
index 5cad2e8..12a5dbe 100644
--- a/python/adapters/kafka/kafka_inter_container_library.py
+++ b/python/adapters/kafka/kafka_inter_container_library.py
@@ -28,12 +28,13 @@
from kafka_proxy import KafkaProxy, get_kafka_proxy
from python.protos.inter_container_pb2 import MessageType, Argument, \
InterContainerRequestBody, InterContainerMessage, Header, \
- InterContainerResponseBody
+ InterContainerResponseBody, StrType
log = structlog.get_logger()
KAFKA_OFFSET_LATEST = 'latest'
KAFKA_OFFSET_EARLIEST = 'earliest'
+ARG_FROM_TOPIC = 'fromTopic'
class KafkaMessagingError(BaseException):
@@ -406,6 +407,13 @@
from Kafka.
"""
+ def _augment_args_with_FromTopic(args, from_topic):
+ arg = Argument(key=ARG_FROM_TOPIC)
+ t = StrType(val=from_topic)
+ arg.value.Pack(t)
+ args.extend([arg])
+ return args
+
def _toDict(args):
"""
Convert a repeatable Argument type into a python dictionary
@@ -451,12 +459,15 @@
log.debug("unsupported-msg", msg_type=type(message.body))
return
if targetted_topic in self.topic_target_cls_map:
- if msg_body.args:
+ # Augment the request arguments with the from_topic
+ augmented_args = _augment_args_with_FromTopic(msg_body.args,
+ msg_body.reply_to_topic)
+ if augmented_args:
log.debug("message-body-args-present", body=msg_body)
(status, res) = yield getattr(
self.topic_target_cls_map[targetted_topic],
self._to_string(msg_body.rpc))(
- **_toDict(msg_body.args))
+ **_toDict(augmented_args))
else:
log.debug("message-body-args-absent", body=msg_body,
rpc=msg_body.rpc)