VOL-1474: Support GetChildDevices and Exception results
Also handle kafka exceptions better, especially given
a grpc code.NotFound returns as an Exception. If return
results are expected return None so calling code
can handle
Change-Id: I0d8ac8c243a47c8e6144a59d1ae662303b741ce9
diff --git a/pyvoltha/adapters/kafka/core_proxy.py b/pyvoltha/adapters/kafka/core_proxy.py
index 800a453..e2bc7c1 100644
--- a/pyvoltha/adapters/kafka/core_proxy.py
+++ b/pyvoltha/adapters/kafka/core_proxy.py
@@ -94,7 +94,18 @@
@ContainerProxy.wrap_request(Device)
@inlineCallbacks
def get_child_device(self, parent_device_id, **kwargs):
- raise NotImplementedError()
+ log.debug("get-child-device")
+ id = ID()
+ id.id = parent_device_id
+ to_topic = self.get_core_topic(parent_device_id)
+ reply_topic = self.get_adapter_topic()
+ args = self._to_proto(**kwargs)
+ res = yield self.invoke(rpc="GetChildDevice",
+ to_topic=to_topic,
+ reply_topic=reply_topic,
+ device_id=id,
+ **args)
+ returnValue(res)
@ContainerProxy.wrap_request(Ports)
@inlineCallbacks
diff --git a/pyvoltha/adapters/kafka/kafka_inter_container_library.py b/pyvoltha/adapters/kafka/kafka_inter_container_library.py
index ad3b806..d8840a2 100644
--- a/pyvoltha/adapters/kafka/kafka_inter_container_library.py
+++ b/pyvoltha/adapters/kafka/kafka_inter_container_library.py
@@ -37,7 +37,7 @@
ARG_FROM_TOPIC = 'fromTopic'
-class KafkaMessagingError(BaseException):
+class KafkaMessagingError(Exception):
def __init__(self, error):
self.error = error
@@ -553,24 +553,32 @@
yield self._send_kafka_message(to_topic, request)
log.debug("message-sent", to_topic=to_topic,
- from_topic=reply_topic)
+ from_topic=reply_topic, kafka_request=request)
if response_required:
res = yield wait_for_result
- if res is None or not res.success:
- raise KafkaMessagingError(error="Failed-response:{"
- "}".format(res))
-
# Remove the transaction from the transaction map
del self.transaction_id_deferred_map[transaction_id]
- log.debug("send-message-response", rpc=rpc, result=res)
-
- if callback:
- callback((res.success, res.result))
+ if res is not None:
+ if res.success:
+ log.debug("send-message-response", rpc=rpc, result=res)
+ if callback:
+ callback((res.success, res.result))
+ else:
+ returnValue((res.success, res.result))
+ else:
+ # this is the case where the core API returns a grpc code.NotFound. Return or callback
+ # so the caller can act appropriately (i.e add whatever was not found)
+ log.warn("send-message-response-error-result", kafka_request=request, kafka_result=res)
+ if callback:
+ callback((res.success, None))
+ else:
+ returnValue((res.success, None))
else:
- returnValue((res.success, res.result))
+ raise KafkaMessagingError(error="failed-response-for-request:{}".format(request))
+
except Exception as e:
log.exception("Exception-sending-request", e=e)
raise KafkaMessagingError(error=e)