blob: 12a5dbe3c8a98c93b1627de212aa3d9dc817e5ea [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001#!/usr/bin/env python
2
Zack Williams998f4422018-09-19 10:38:57 -07003# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
khenaidoo6fdf0ba2018-11-02 14:38:33 -040017import time
18from uuid import uuid4
19
20import structlog
khenaidoob9203542018-09-17 22:56:37 -040021from twisted.internet import reactor
22from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, \
23 DeferredQueue, gatherResults
khenaidoo6fdf0ba2018-11-02 14:38:33 -040024from zope.interface import implementer
khenaidoob9203542018-09-17 22:56:37 -040025
khenaidoofdbad6e2018-11-06 22:26:38 -050026from python.common.utils import asleep
27from python.common.utils.registry import IComponent
28from kafka_proxy import KafkaProxy, get_kafka_proxy
khenaidoo79232702018-12-04 11:00:41 -050029from python.protos.inter_container_pb2 import MessageType, Argument, \
khenaidoo6fdf0ba2018-11-02 14:38:33 -040030 InterContainerRequestBody, InterContainerMessage, Header, \
khenaidoo54e0ddf2019-02-27 16:21:33 -050031 InterContainerResponseBody, StrType
khenaidoob9203542018-09-17 22:56:37 -040032
33log = structlog.get_logger()
34
# Offset values accepted by subscribe() to select where consumption of a
# topic starts
KAFKA_OFFSET_LATEST = 'latest'
KAFKA_OFFSET_EARLIEST = 'earliest'

# Key of the extra argument, carrying the requester's reply topic, that is
# appended to the arguments of every incoming request
ARG_FROM_TOPIC = 'fromTopic'
khenaidooca301322019-01-09 23:06:32 -050038
khenaidoo6fdf0ba2018-11-02 14:38:33 -040039
class KafkaMessagingError(BaseException):
    """
    Error raised by the kafka messaging proxy.

    NOTE(review): this derives from BaseException, so ``except Exception``
    handlers do not intercept it. The base class is kept as-is because code
    in this module relies on that.

    :param error: description of the failure, or the underlying exception
    """

    def __init__(self, error):
        # Forward the error to the base class so that str(), repr(), args
        # and tracebacks carry it even when it is passed by keyword
        super(KafkaMessagingError, self).__init__(error)
        self.error = error
43
khenaidoo6fdf0ba2018-11-02 14:38:33 -040044
khenaidoob9203542018-09-17 22:56:37 -040045@implementer(IComponent)
46class IKafkaMessagingProxy(object):
47 _kafka_messaging_instance = None
48
49 def __init__(self,
50 kafka_host_port,
51 kv_store,
52 default_topic,
khenaidooca301322019-01-09 23:06:32 -050053 group_id_prefix,
khenaidoob9203542018-09-17 22:56:37 -040054 target_cls):
55 """
56 Initialize the kafka proxy. This is a singleton (may change to
57 non-singleton if performance is better)
58 :param kafka_host_port: Kafka host and port
59 :param kv_store: Key-Value store
60 :param default_topic: Default topic to subscribe to
61 :param target_cls: target class - method of that class is invoked
62 when a message is received on the default_topic
63 """
64 # return an exception if the object already exist
65 if IKafkaMessagingProxy._kafka_messaging_instance:
66 raise Exception(
67 'Singleton-exist', cls=IKafkaMessagingProxy)
68
69 log.debug("Initializing-KafkaProxy")
70 self.kafka_host_port = kafka_host_port
71 self.kv_store = kv_store
72 self.default_topic = default_topic
khenaidooca301322019-01-09 23:06:32 -050073 self.default_group_id = "_".join((group_id_prefix, default_topic))
khenaidoob9203542018-09-17 22:56:37 -040074 self.target_cls = target_cls
75 self.topic_target_cls_map = {}
khenaidoob9203542018-09-17 22:56:37 -040076 self.topic_callback_map = {}
77 self.subscribers = {}
khenaidoob9203542018-09-17 22:56:37 -040078 self.kafka_proxy = None
79 self.transaction_id_deferred_map = {}
80 self.received_msg_queue = DeferredQueue()
khenaidooca301322019-01-09 23:06:32 -050081 self.stopped = False
khenaidoob9203542018-09-17 22:56:37 -040082
83 self.init_time = 0
84 self.init_received_time = 0
85
86 self.init_resp_time = 0
87 self.init_received_resp_time = 0
88
89 self.num_messages = 0
90 self.total_time = 0
91 self.num_responses = 0
92 self.total_time_responses = 0
93 log.debug("KafkaProxy-initialized")
94
95 def start(self):
96 try:
khenaidooca301322019-01-09 23:06:32 -050097 log.debug("KafkaProxy-starting")
khenaidoob9203542018-09-17 22:56:37 -040098
99 # Get the kafka proxy instance. If it does not exist then
100 # create it
101 self.kafka_proxy = get_kafka_proxy()
102 if self.kafka_proxy == None:
103 KafkaProxy(kafka_endpoint=self.kafka_host_port).start()
104 self.kafka_proxy = get_kafka_proxy()
105
106 # Subscribe the default topic and target_cls
107 self.topic_target_cls_map[self.default_topic] = self.target_cls
108
109 # Start the queue to handle incoming messages
110 reactor.callLater(0, self._received_message_processing_loop)
111
khenaidooca301322019-01-09 23:06:32 -0500112 # Subscribe using the default topic and default group id. Whenever
113 # a message is received on that topic then teh target_cls will be
114 # invoked.
115 reactor.callLater(0, self.subscribe,
116 topic=self.default_topic,
117 target_cls=self.target_cls,
118 group_id=self.default_group_id)
khenaidoob9203542018-09-17 22:56:37 -0400119
120 # Setup the singleton instance
121 IKafkaMessagingProxy._kafka_messaging_instance = self
khenaidooca301322019-01-09 23:06:32 -0500122 log.debug("KafkaProxy-started")
khenaidoob9203542018-09-17 22:56:37 -0400123 except Exception as e:
124 log.exception("Failed-to-start-proxy", e=e)
125
khenaidoob9203542018-09-17 22:56:37 -0400126 def stop(self):
127 """
128 Invoked to stop the kafka proxy
129 :return: None on success, Exception on failure
130 """
131 log.debug("Stopping-messaging-proxy ...")
132 try:
khenaidooca301322019-01-09 23:06:32 -0500133 # Stop the kafka proxy. This will stop all the consumers
134 # and producers
135 self.stopped = True
136 self.kafka_proxy.stop()
khenaidoob9203542018-09-17 22:56:37 -0400137 log.debug("Messaging-proxy-stopped.")
138 except Exception as e:
139 log.exception("Exception-when-stopping-messaging-proxy:", e=e)
140
khenaidoo43c82122018-11-22 18:38:28 -0500141 def get_target_cls(self):
142 return self.target_cls
143
144 def get_default_topic(self):
145 return self.default_topic
146
khenaidoob9203542018-09-17 22:56:37 -0400147 @inlineCallbacks
khenaidooca301322019-01-09 23:06:32 -0500148 def _subscribe_group_consumer(self, group_id, topic, offset, callback=None,
149 target_cls=None):
khenaidoob9203542018-09-17 22:56:37 -0400150 try:
khenaidoo90847922018-12-03 14:47:51 -0500151 log.debug("subscribing-to-topic-start", topic=topic)
khenaidooca301322019-01-09 23:06:32 -0500152 yield self.kafka_proxy.subscribe(topic,
153 self._enqueue_received_group_message,
154 group_id, offset)
khenaidoob9203542018-09-17 22:56:37 -0400155
156 if target_cls is not None and callback is None:
157 # Scenario #1
158 if topic not in self.topic_target_cls_map:
159 self.topic_target_cls_map[topic] = target_cls
160 elif target_cls is None and callback is not None:
161 # Scenario #2
162 log.debug("custom-callback", topic=topic,
163 callback_map=self.topic_callback_map)
164 if topic not in self.topic_callback_map:
165 self.topic_callback_map[topic] = [callback]
166 else:
167 self.topic_callback_map[topic].extend([callback])
168 else:
169 log.warn("invalid-parameters")
170
khenaidoob9203542018-09-17 22:56:37 -0400171 returnValue(True)
172 except Exception as e:
173 log.exception("Exception-during-subscription", e=e)
174 returnValue(False)
175
khenaidoo90847922018-12-03 14:47:51 -0500176 @inlineCallbacks
khenaidoob9203542018-09-17 22:56:37 -0400177 def subscribe(self, topic, callback=None, target_cls=None,
khenaidooca301322019-01-09 23:06:32 -0500178 max_retry=3, group_id=None, offset=KAFKA_OFFSET_LATEST):
khenaidoob9203542018-09-17 22:56:37 -0400179 """
180 Scenario 1: invoked to subscribe to a specific topic with a
181 target_cls to invoke when a message is received on that topic. This
182 handles the case of request/response where this library performs the
183 heavy lifting. In this case the m_callback must to be None
184
185 Scenario 2: invoked to subscribe to a specific topic with a
186 specific callback to invoke when a message is received on that topic.
187 This handles the case where the caller wants to process the message
188 received itself. In this case the target_cls must to be None
189
190 :param topic: topic to subscribe to
191 :param callback: Callback to invoke when a message is received on
192 the topic. Either one of callback or target_cls needs can be none
193 :param target_cls: Target class to use when a message is
194 received on the topic. There can only be 1 target_cls per topic.
195 Either one of callback or target_cls needs can be none
196 :param max_retry: the number of retries before reporting failure
197 to subscribe. This caters for scenario where the kafka topic is not
198 ready.
khenaidooca301322019-01-09 23:06:32 -0500199 :param group_id: The ID of the group the consumer is subscribing to
200 :param offset: The topic offset on the kafka bus from where message consumption will start
khenaidoob9203542018-09-17 22:56:37 -0400201 :return: True on success, False on failure
202 """
203 RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
204
205 def _backoff(msg, retries):
206 wait_time = RETRY_BACKOFF[min(retries,
207 len(RETRY_BACKOFF) - 1)]
208 log.info(msg, retry_in=wait_time)
khenaidooca301322019-01-09 23:06:32 -0500209 return asleep.asleep(wait_time)
210
211 log.debug("subscribing", topic=topic, group_id=group_id,
212 callback=callback, target=target_cls)
khenaidoob9203542018-09-17 22:56:37 -0400213
214 retry = 0
khenaidoo90847922018-12-03 14:47:51 -0500215 subscribed = False
khenaidooca301322019-01-09 23:06:32 -0500216 if group_id is None:
217 group_id = self.default_group_id
khenaidoo90847922018-12-03 14:47:51 -0500218 while not subscribed:
khenaidooca301322019-01-09 23:06:32 -0500219 subscribed = yield self._subscribe_group_consumer(group_id, topic,
220 callback=callback,
221 target_cls=target_cls,
222 offset=offset)
khenaidoo90847922018-12-03 14:47:51 -0500223 if subscribed:
224 returnValue(True)
225 elif retry > max_retry:
226 returnValue(False)
khenaidoob9203542018-09-17 22:56:37 -0400227 else:
228 _backoff("subscription-not-complete", retry)
229 retry += 1
khenaidoo90847922018-12-03 14:47:51 -0500230
khenaidooca301322019-01-09 23:06:32 -0500231 def unsubscribe(self, topic, callback=None, target_cls=None):
khenaidoob9203542018-09-17 22:56:37 -0400232 """
233 Invoked when unsubscribing to a topic
khenaidooca301322019-01-09 23:06:32 -0500234 :param topic: topic to unsubscribe from
235 :param callback: the callback used when subscribing to the topic, if any
236 :param target_cls: the targert class used when subscribing to the topic, if any
khenaidoob9203542018-09-17 22:56:37 -0400237 :return: None on success or Exception on failure
238 """
239 log.debug("Unsubscribing-to-topic", topic=topic)
240
khenaidoob9203542018-09-17 22:56:37 -0400241 try:
khenaidooca301322019-01-09 23:06:32 -0500242 self.kafka_proxy.unsubscribe(topic,
243 self._enqueue_received_group_message)
244
245 if callback is None and target_cls is None:
246 log.error("both-call-and-target-cls-cannot-be-none",
247 topic=topic)
248 raise KafkaMessagingError(
249 error="both-call-and-target-cls-cannot-be-none")
250
251 if target_cls is not None and topic in self.topic_target_cls_map:
252 del self.topic_target_cls_map[topic]
253
254 if callback is not None and topic in self.topic_callback_map:
255 index = 0
256 for cb in self.topic_callback_map[topic]:
257 if cb == callback:
258 break
259 index += 1
260 if index < len(self.topic_callback_map[topic]):
261 self.topic_callback_map[topic].pop(index)
262
263 if len(self.topic_callback_map[topic]) == 0:
264 del self.topic_callback_map[topic]
khenaidoob9203542018-09-17 22:56:37 -0400265 except Exception as e:
khenaidooca301322019-01-09 23:06:32 -0500266 log.exception("Exception-when-unsubscribing-to-topic", topic=topic,
267 e=e)
268 return e
khenaidoob9203542018-09-17 22:56:37 -0400269
270 @inlineCallbacks
khenaidooca301322019-01-09 23:06:32 -0500271 def _enqueue_received_group_message(self, msg):
khenaidoob9203542018-09-17 22:56:37 -0400272 """
273 Internal method to continuously queue all received messaged
274 irrespective of topic
khenaidooca301322019-01-09 23:06:32 -0500275 :param msg: Received message
khenaidoob9203542018-09-17 22:56:37 -0400276 :return: None on success, Exception on failure
277 """
278 try:
khenaidooca301322019-01-09 23:06:32 -0500279 log.debug("received-msg", msg=msg)
280 yield self.received_msg_queue.put(msg)
khenaidoob9203542018-09-17 22:56:37 -0400281 except Exception as e:
282 log.exception("Failed-enqueueing-received-message", e=e)
283
284 @inlineCallbacks
285 def _received_message_processing_loop(self):
286 """
287 Internal method to continuously process all received messages one
288 at a time
289 :return: None on success, Exception on failure
290 """
291 while True:
292 try:
293 message = yield self.received_msg_queue.get()
294 yield self._process_message(message)
khenaidooca301322019-01-09 23:06:32 -0500295 if self.stopped:
296 break
khenaidoob9203542018-09-17 22:56:37 -0400297 except Exception as e:
298 log.exception("Failed-dequeueing-received-message", e=e)
299
300 def _to_string(self, unicode_str):
301 if unicode_str is not None:
302 if type(unicode_str) == unicode:
303 return unicode_str.encode('ascii', 'ignore')
304 else:
305 return unicode_str
306 else:
307 return None
308
309 def _format_request(self,
310 rpc,
311 to_topic,
312 reply_topic,
313 **kwargs):
314 """
315 Format a request to send over kafka
316 :param rpc: Requested remote API
317 :param to_topic: Topic to send the request
318 :param reply_topic: Topic to receive the resulting response, if any
319 :param kwargs: Dictionary of key-value pairs to pass as arguments to
320 the remote rpc API.
321 :return: A InterContainerMessage message type on success or None on
322 failure
323 """
324 try:
325 transaction_id = uuid4().hex
326 request = InterContainerMessage()
327 request_body = InterContainerRequestBody()
328 request.header.id = transaction_id
329 request.header.type = MessageType.Value("REQUEST")
khenaidoo43c82122018-11-22 18:38:28 -0500330 request.header.from_topic = reply_topic
khenaidoob9203542018-09-17 22:56:37 -0400331 request.header.to_topic = to_topic
332
333 response_required = False
334 if reply_topic:
335 request_body.reply_to_topic = reply_topic
khenaidoo43c82122018-11-22 18:38:28 -0500336 request_body.response_required = True
khenaidoob9203542018-09-17 22:56:37 -0400337 response_required = True
338
339 request.header.timestamp = int(round(time.time() * 1000))
340 request_body.rpc = rpc
341 for a, b in kwargs.iteritems():
342 arg = Argument()
343 arg.key = a
344 try:
345 arg.value.Pack(b)
346 request_body.args.extend([arg])
347 except Exception as e:
348 log.exception("Failed-parsing-value", e=e)
khenaidoob9203542018-09-17 22:56:37 -0400349 request.body.Pack(request_body)
350 return request, transaction_id, response_required
351 except Exception as e:
352 log.exception("formatting-request-failed",
353 rpc=rpc,
354 to_topic=to_topic,
355 reply_topic=reply_topic,
356 args=kwargs)
357 return None, None, None
358
359 def _format_response(self, msg_header, msg_body, status):
360 """
361 Format a response
362 :param msg_header: The header portion of a received request
363 :param msg_body: The response body
364 :param status: True is this represents a successful response
365 :return: a InterContainerMessage message type
366 """
367 try:
368 assert isinstance(msg_header, Header)
369 response = InterContainerMessage()
370 response_body = InterContainerResponseBody()
371 response.header.id = msg_header.id
372 response.header.timestamp = int(
373 round(time.time() * 1000))
374 response.header.type = MessageType.Value("RESPONSE")
375 response.header.from_topic = msg_header.to_topic
376 response.header.to_topic = msg_header.from_topic
377 if msg_body is not None:
378 response_body.result.Pack(msg_body)
379 response_body.success = status
380 response.body.Pack(response_body)
381 return response
382 except Exception as e:
383 log.exception("formatting-response-failed", header=msg_header,
384 body=msg_body, status=status, e=e)
385 return None
386
387 def _parse_response(self, msg):
388 try:
389 message = InterContainerMessage()
390 message.ParseFromString(msg)
391 resp = InterContainerResponseBody()
392 if message.body.Is(InterContainerResponseBody.DESCRIPTOR):
393 message.body.Unpack(resp)
394 else:
395 log.debug("unsupported-msg", msg_type=type(message.body))
396 return None
397 log.debug("parsed-response", input=message, output=resp)
398 return resp
399 except Exception as e:
400 log.exception("parsing-response-failed", msg=msg, e=e)
401 return None
402
403 @inlineCallbacks
404 def _process_message(self, m):
405 """
406 Default internal method invoked for every batch of messages received
407 from Kafka.
408 """
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400409
khenaidoo54e0ddf2019-02-27 16:21:33 -0500410 def _augment_args_with_FromTopic(args, from_topic):
411 arg = Argument(key=ARG_FROM_TOPIC)
412 t = StrType(val=from_topic)
413 arg.value.Pack(t)
414 args.extend([arg])
415 return args
416
khenaidoob9203542018-09-17 22:56:37 -0400417 def _toDict(args):
418 """
419 Convert a repeatable Argument type into a python dictionary
420 :param args: Repeatable core_adapter.Argument type
421 :return: a python dictionary
422 """
423 if args is None:
424 return None
425 result = {}
426 for arg in args:
427 assert isinstance(arg, Argument)
428 result[arg.key] = arg.value
429 return result
430
431 current_time = int(round(time.time() * 1000))
432 # log.debug("Got Message", message=m)
433 try:
khenaidooca301322019-01-09 23:06:32 -0500434 val = m.value()
435 # val = m.message.value
khenaidoob9203542018-09-17 22:56:37 -0400436 # print m.topic
437
438 # Go over customized callbacks first
khenaidooca301322019-01-09 23:06:32 -0500439 m_topic = m.topic()
440 if m_topic in self.topic_callback_map:
441 for c in self.topic_callback_map[m_topic]:
khenaidoob9203542018-09-17 22:56:37 -0400442 yield c(val)
443
444 # Check whether we need to process request/response scenario
khenaidooca301322019-01-09 23:06:32 -0500445 if m_topic not in self.topic_target_cls_map:
khenaidoob9203542018-09-17 22:56:37 -0400446 return
447
448 # Process request/response scenario
449 message = InterContainerMessage()
450 message.ParseFromString(val)
451
452 if message.header.type == MessageType.Value("REQUEST"):
khenaidoob9203542018-09-17 22:56:37 -0400453 # Get the target class for that specific topic
454 targetted_topic = self._to_string(message.header.to_topic)
455 msg_body = InterContainerRequestBody()
456 if message.body.Is(InterContainerRequestBody.DESCRIPTOR):
457 message.body.Unpack(msg_body)
458 else:
459 log.debug("unsupported-msg", msg_type=type(message.body))
460 return
461 if targetted_topic in self.topic_target_cls_map:
khenaidoo54e0ddf2019-02-27 16:21:33 -0500462 # Augment the request arguments with the from_topic
463 augmented_args = _augment_args_with_FromTopic(msg_body.args,
464 msg_body.reply_to_topic)
465 if augmented_args:
khenaidoob9203542018-09-17 22:56:37 -0400466 log.debug("message-body-args-present", body=msg_body)
467 (status, res) = yield getattr(
468 self.topic_target_cls_map[targetted_topic],
469 self._to_string(msg_body.rpc))(
khenaidoo54e0ddf2019-02-27 16:21:33 -0500470 **_toDict(augmented_args))
khenaidoob9203542018-09-17 22:56:37 -0400471 else:
472 log.debug("message-body-args-absent", body=msg_body,
473 rpc=msg_body.rpc)
474 (status, res) = yield getattr(
475 self.topic_target_cls_map[targetted_topic],
476 self._to_string(msg_body.rpc))()
477 if msg_body.response_required:
478 response = self._format_response(
479 msg_header=message.header,
480 msg_body=res,
481 status=status,
482 )
483 if response is not None:
484 res_topic = self._to_string(
485 response.header.to_topic)
486 self._send_kafka_message(res_topic, response)
487
khenaidooca301322019-01-09 23:06:32 -0500488 log.debug("Response-sent", response=response.body,
489 to_topic=res_topic)
khenaidoob9203542018-09-17 22:56:37 -0400490 elif message.header.type == MessageType.Value("RESPONSE"):
491 trns_id = self._to_string(message.header.id)
492 if trns_id in self.transaction_id_deferred_map:
khenaidoob9203542018-09-17 22:56:37 -0400493 resp = self._parse_response(val)
494
495 self.transaction_id_deferred_map[trns_id].callback(resp)
496 else:
497 log.error("!!INVALID-TRANSACTION-TYPE!!")
498
499 except Exception as e:
500 log.exception("Failed-to-process-message", message=m, e=e)
501
502 @inlineCallbacks
503 def _send_kafka_message(self, topic, msg):
504 try:
505 yield self.kafka_proxy.send_message(topic, msg.SerializeToString())
506 except Exception, e:
507 log.exception("Failed-sending-message", message=msg, e=e)
508
509 @inlineCallbacks
510 def send_request(self,
511 rpc,
512 to_topic,
513 reply_topic=None,
514 callback=None,
515 **kwargs):
516 """
517 Invoked to send a message to a remote container and receive a
518 response if required.
519 :param rpc: The remote API to invoke
520 :param to_topic: Send the message to this kafka topic
521 :param reply_topic: If not None then a response is expected on this
522 topic. If set to None then no response is required.
523 :param callback: Callback to invoke when a response is received.
524 :param kwargs: Key-value pairs representing arguments to pass to the
525 rpc remote API.
526 :return: Either no response is required, or a response is returned
527 via the callback or the response is a tuple of (status, return_cls)
528 """
529 try:
530 # Ensure all strings are not unicode encoded
531 rpc = self._to_string(rpc)
532 to_topic = self._to_string(to_topic)
533 reply_topic = self._to_string(reply_topic)
534
535 request, transaction_id, response_required = \
536 self._format_request(
537 rpc=rpc,
538 to_topic=to_topic,
539 reply_topic=reply_topic,
540 **kwargs)
541
542 if request is None:
543 return
544
545 # Add the transaction to the transaction map before sending the
546 # request. This will guarantee the eventual response will be
547 # processed.
548 wait_for_result = None
549 if response_required:
550 wait_for_result = Deferred()
551 self.transaction_id_deferred_map[
552 self._to_string(request.header.id)] = wait_for_result
553
khenaidoob9203542018-09-17 22:56:37 -0400554 yield self._send_kafka_message(to_topic, request)
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400555 log.debug("message-sent", to_topic=to_topic,
556 from_topic=reply_topic)
khenaidoob9203542018-09-17 22:56:37 -0400557
558 if response_required:
559 res = yield wait_for_result
560
561 if res is None or not res.success:
562 raise KafkaMessagingError(error="Failed-response:{"
563 "}".format(res))
564
565 # Remove the transaction from the transaction map
566 del self.transaction_id_deferred_map[transaction_id]
567
568 log.debug("send-message-response", rpc=rpc, result=res)
569
570 if callback:
571 callback((res.success, res.result))
572 else:
573 returnValue((res.success, res.result))
574 except Exception as e:
575 log.exception("Exception-sending-request", e=e)
576 raise KafkaMessagingError(error=e)
577
578
def get_messaging_proxy():
    """
    Common method to get the singleton instance of the kafka proxy class
    :return: the registered IKafkaMessagingProxy instance, or None when
    none has been registered yet
    """
    return IKafkaMessagingProxy._kafka_messaging_instance