#!/usr/bin/env python

from zope.interface import Interface, implementer
from adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, \
    DeferredQueue, gatherResults
from afkak.client import KafkaClient
from afkak.consumer import OFFSET_LATEST, Consumer
import structlog
from adapters.common.utils import asleep
from adapters.protos.core_adapter_pb2 import MessageType, Argument, \
    InterContainerRequestBody, InterContainerMessage, Header, \
    InterContainerResponseBody
import time
from uuid import uuid4
from adapters.common.utils.registry import IComponent


log = structlog.get_logger()

class KafkaMessagingError(BaseException):
    def __init__(self, error):
        self.error = error

@implementer(IComponent)
class IKafkaMessagingProxy(object):
    _kafka_messaging_instance = None

    def __init__(self,
                 kafka_host_port,
                 kv_store,
                 default_topic,
                 target_cls):
        """
        Initialize the kafka proxy.  This is a singleton (may change to
        non-singleton if performance is better)
        :param kafka_host_port: Kafka host and port
        :param kv_store: Key-Value store
        :param default_topic: Default topic to subscribe to
        :param target_cls: target class - a method of that class is invoked
        when a message is received on the default_topic
        """
        # Raise an exception if an instance already exists
        if IKafkaMessagingProxy._kafka_messaging_instance:
            raise Exception(
                'Singleton-exist', cls=IKafkaMessagingProxy)

        log.debug("Initializing-KafkaProxy")
        self.kafka_host_port = kafka_host_port
        self.kv_store = kv_store
        self.default_topic = default_topic
        self.target_cls = target_cls
        self.topic_target_cls_map = {}
        self.topic_consumer_map = {}
        self.topic_callback_map = {}
        self.subscribers = {}
        self.kafka_client = None
        self.kafka_proxy = None
        self.transaction_id_deferred_map = {}
        self.received_msg_queue = DeferredQueue()

        self.init_time = 0
        self.init_received_time = 0

        self.init_resp_time = 0
        self.init_received_resp_time = 0

        self.num_messages = 0
        self.total_time = 0
        self.num_responses = 0
        self.total_time_responses = 0
        log.debug("KafkaProxy-initialized")

    def start(self):
        try:
            # Create the kafka client
            # assert self.kafka_host is not None
            # assert self.kafka_port is not None
            # kafka_host_port = ":".join((self.kafka_host, self.kafka_port))
            self.kafka_client = KafkaClient(self.kafka_host_port)

            # Get the kafka proxy instance.  If it does not exist then
            # create it
            self.kafka_proxy = get_kafka_proxy()
            if self.kafka_proxy is None:
                KafkaProxy(kafka_endpoint=self.kafka_host_port).start()
                self.kafka_proxy = get_kafka_proxy()

            # Subscribe the default topic and target_cls
            self.topic_target_cls_map[self.default_topic] = self.target_cls

            # Start the queue to handle incoming messages
            reactor.callLater(0, self._received_message_processing_loop)

            # Start listening for incoming messages
            reactor.callLater(0, self.subscribe, self.default_topic,
                              target_cls=self.target_cls)

            # Setup the singleton instance
            IKafkaMessagingProxy._kafka_messaging_instance = self
        except Exception as e:
            log.exception("Failed-to-start-proxy", e=e)

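    # Usage sketch (illustrative only, not part of this module's API): a
    # containing component would typically construct the singleton once and
    # start it.  The endpoint, topic name and 'request_handler' object below
    # are placeholders; the handler just needs to expose the rpc methods
    # named in incoming requests.
    #
    #     proxy = IKafkaMessagingProxy(
    #         kafka_host_port='kafka:9092',
    #         kv_store=None,
    #         default_topic='my-adapter',
    #         target_cls=request_handler)
    #     proxy.start()
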
    def stop(self):
        """
        Invoked to stop the kafka proxy
        :return: None on success, Exception on failure
        """
        log.debug("Stopping-messaging-proxy ...")
        try:
            # Stop all the consumers; once they have all stopped, close the
            # kafka client
            deferred_list = []
            for key, values in self.topic_consumer_map.iteritems():
                deferred_list.extend([c.stop() for c in values])

            if deferred_list:
                d = gatherResults(deferred_list)
                d.addCallback(lambda result: self.kafka_client.close())
            log.debug("Messaging-proxy-stopped.")
        except Exception as e:
            log.exception("Exception-when-stopping-messaging-proxy:", e=e)

    @inlineCallbacks
    def _wait_until_topic_is_ready(self, client, topic):
        e = True
        while e:
            yield client.load_metadata_for_topics(topic)
            e = client.metadata_error_for_topic(topic)
            if e:
                log.debug("Topic-not-ready-retrying...", topic=topic)

    def _clear_backoff(self):
        if self.retries:
            log.info('reconnected-to-consul', after_retries=self.retries)
            self.retries = 0

    @inlineCallbacks
    def _subscribe(self, topic, callback=None, target_cls=None):
        try:
            yield self._wait_until_topic_is_ready(self.kafka_client, topic)
            partitions = self.kafka_client.topic_partitions[topic]
            consumers = []

            # First setup the generic callback - all received messages will
            # go through that queue
            if topic not in self.topic_consumer_map:
                consumers = [Consumer(self.kafka_client, topic, partition,
                                      self._enqueue_received_message)
                             for partition in partitions]
                self.topic_consumer_map[topic] = consumers

            log.debug("_subscribe", topic=topic,
                      consumermap=self.topic_consumer_map)

            if target_cls is not None and callback is None:
                # Scenario #1
                if topic not in self.topic_target_cls_map:
                    self.topic_target_cls_map[topic] = target_cls
            elif target_cls is None and callback is not None:
                # Scenario #2
                log.debug("custom-callback", topic=topic,
                          callback_map=self.topic_callback_map)
                if topic not in self.topic_callback_map:
                    self.topic_callback_map[topic] = [callback]
                else:
                    self.topic_callback_map[topic].extend([callback])
            else:
                log.warn("invalid-parameters")

            def cb_closed(result):
                """
                Called when a consumer cleanly stops.
                """
                log.debug("Consumers-cleanly-stopped")

            def eb_failed(failure):
                """
                Called when a consumer fails due to an uncaught exception in
                the processing callback or a network error on shutdown. In
                this case we simply log the error.
                """
                log.warn("Consumers-failed", failure=failure)

            for c in consumers:
                c.start(OFFSET_LATEST).addCallbacks(cb_closed, eb_failed)

            returnValue(True)
        except Exception as e:
            log.exception("Exception-during-subscription", e=e)
            returnValue(False)

    @inlineCallbacks
    def subscribe(self, topic, callback=None, target_cls=None,
                  max_retry=3):
        """
        Scenario 1: invoked to subscribe to a specific topic with a
        target_cls to invoke when a message is received on that topic. This
        handles the case of request/response where this library performs the
        heavy lifting. In this case the callback must be None

        Scenario 2: invoked to subscribe to a specific topic with a
        specific callback to invoke when a message is received on that topic.
        This handles the case where the caller wants to process the message
        received itself. In this case the target_cls must be None

        :param topic: topic to subscribe to
        :param callback: Callback to invoke when a message is received on
        the topic. Only one of callback or target_cls can be provided; the
        other must be None
        :param target_cls: Target class to use when a message is
        received on the topic. There can only be 1 target_cls per topic.
        Only one of callback or target_cls can be provided; the other must
        be None
        :param max_retry: the number of retries before reporting failure
        to subscribe. This caters for the scenario where the kafka topic is
        not yet ready.
        :return: True on success, False on failure
        """
        RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]

        def _backoff(msg, retries):
            wait_time = RETRY_BACKOFF[min(retries,
                                          len(RETRY_BACKOFF) - 1)]
            log.info(msg, retry_in=wait_time)
            return asleep(wait_time)

        retry = 0
        subscribed = yield self._subscribe(topic, callback=callback,
                                           target_cls=target_cls)
        while not subscribed:
            if retry > max_retry:
                returnValue(False)
            yield _backoff("subscription-not-complete", retry)
            retry += 1
            subscribed = yield self._subscribe(topic, callback=callback,
                                               target_cls=target_cls)
        returnValue(True)

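    # Usage sketch (illustrative): scenario 2 subscribes with a custom
    # callback that receives the raw kafka message value, while scenario 1
    # hands a topic to a target class whose methods are invoked by
    # _process_message.  'my_raw_handler', 'request_handler' and the topic
    # names are placeholders; subscribe() returns a Deferred that fires with
    # True or False.
    #
    #     def my_raw_handler(msg_value):
    #         log.debug("raw-msg", msg=msg_value)
    #
    #     kafka_proxy = get_messaging_proxy()
    #     kafka_proxy.subscribe(topic='alarms', callback=my_raw_handler)
    #     kafka_proxy.subscribe(topic='requests', target_cls=request_handler)
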
    def unsubscribe(self, topic):
        """
        Invoked when unsubscribing from a topic
        :param topic: topic to unsubscribe from
        :return: None on success or Exception on failure
        """
        log.debug("Unsubscribing-to-topic", topic=topic)

        def remove_topic(topic):
            if topic in self.topic_consumer_map:
                del self.topic_consumer_map[topic]

        try:
            if topic in self.topic_consumer_map:
                consumers = self.topic_consumer_map[topic]
                d = gatherResults([c.stop() for c in consumers])
                # Once all consumers have stopped, drop the topic from the map
                d.addCallback(lambda _: remove_topic(topic))
                log.debug("Unsubscribed-to-topic.", topic=topic)
            else:
                log.debug("Topic-does-not-exist.", topic=topic)
        except Exception as e:
            log.exception("Exception-when-stopping-messaging-proxy:", e=e)

    @inlineCallbacks
    def _enqueue_received_message(self, reactor, message_list):
        """
        Internal method to continuously queue all received messages,
        irrespective of topic
        :param reactor: A requirement of the Twisted Python kafka library
        :param message_list: Received list of messages
        :return: None on success, Exception on failure
        """
        try:
            for m in message_list:
                log.debug("received-msg", msg=m)
                yield self.received_msg_queue.put(m)
        except Exception as e:
            log.exception("Failed-enqueueing-received-message", e=e)

    @inlineCallbacks
    def _received_message_processing_loop(self):
        """
        Internal method to continuously process all received messages one
        at a time
        :return: None on success, Exception on failure
        """
        while True:
            try:
                message = yield self.received_msg_queue.get()
                yield self._process_message(message)
            except Exception as e:
                log.exception("Failed-dequeueing-received-message", e=e)

    def _to_string(self, unicode_str):
        if unicode_str is not None:
            if type(unicode_str) == unicode:
                return unicode_str.encode('ascii', 'ignore')
            else:
                return unicode_str
        else:
            return None

    def _format_request(self,
                        rpc,
                        to_topic,
                        reply_topic,
                        **kwargs):
        """
        Format a request to send over kafka
        :param rpc: Requested remote API
        :param to_topic: Topic to send the request
        :param reply_topic: Topic to receive the resulting response, if any
        :param kwargs: Dictionary of key-value pairs to pass as arguments to
        the remote rpc API.
        :return: An InterContainerMessage message type on success or None on
        failure
        """
        try:
            transaction_id = uuid4().hex
            request = InterContainerMessage()
            request_body = InterContainerRequestBody()
            request.header.id = transaction_id
            request.header.type = MessageType.Value("REQUEST")
            request.header.from_topic = self.default_topic
            request.header.to_topic = to_topic

            response_required = False
            if reply_topic:
                request_body.reply_to_topic = reply_topic
                response_required = True

            request.header.timestamp = int(round(time.time() * 1000))
            request_body.rpc = rpc
            for a, b in kwargs.iteritems():
                arg = Argument()
                arg.key = a
                try:
                    arg.value.Pack(b)
                    request_body.args.extend([arg])
                except Exception as e:
                    log.exception("Failed-parsing-value", e=e)
            request_body.reply_to_topic = self.default_topic
            request_body.response_required = response_required
            request.body.Pack(request_body)
            return request, transaction_id, response_required
        except Exception as e:
            log.exception("formatting-request-failed",
                          rpc=rpc,
                          to_topic=to_topic,
                          reply_topic=reply_topic,
                          args=kwargs)
            return None, None, None

    def _format_response(self, msg_header, msg_body, status):
        """
        Format a response
        :param msg_header: The header portion of a received request
        :param msg_body: The response body
        :param status: True if this represents a successful response
        :return: an InterContainerMessage message type
        """
        try:
            assert isinstance(msg_header, Header)
            response = InterContainerMessage()
            response_body = InterContainerResponseBody()
            response.header.id = msg_header.id
            response.header.timestamp = int(
                round(time.time() * 1000))
            response.header.type = MessageType.Value("RESPONSE")
            response.header.from_topic = msg_header.to_topic
            response.header.to_topic = msg_header.from_topic
            if msg_body is not None:
                response_body.result.Pack(msg_body)
            response_body.success = status
            response.body.Pack(response_body)
            return response
        except Exception as e:
            log.exception("formatting-response-failed", header=msg_header,
                          body=msg_body, status=status, e=e)
            return None

    def _parse_response(self, msg):
        try:
            message = InterContainerMessage()
            message.ParseFromString(msg)
            resp = InterContainerResponseBody()
            if message.body.Is(InterContainerResponseBody.DESCRIPTOR):
                message.body.Unpack(resp)
            else:
                log.debug("unsupported-msg", msg_type=type(message.body))
                return None
            log.debug("parsed-response", input=message, output=resp)
            return resp
        except Exception as e:
            log.exception("parsing-response-failed", msg=msg, e=e)
            return None

    @inlineCallbacks
    def _process_message(self, m):
        """
        Default internal method invoked for every message received from
        Kafka.
        """
        def _toDict(args):
            """
            Convert a repeatable Argument type into a python dictionary
            :param args: Repeatable core_adapter.Argument type
            :return: a python dictionary
            """
            if args is None:
                return None
            result = {}
            for arg in args:
                assert isinstance(arg, Argument)
                result[arg.key] = arg.value
            return result

        current_time = int(round(time.time() * 1000))
        # log.debug("Got Message", message=m)
        try:
            val = m.message.value
            # print m.topic

            # Go over customized callbacks first
            if m.topic in self.topic_callback_map:
                for c in self.topic_callback_map[m.topic]:
                    yield c(val)

            # Check whether we need to process request/response scenario
            if m.topic not in self.topic_target_cls_map:
                return

            # Process request/response scenario
            message = InterContainerMessage()
            message.ParseFromString(val)

            if message.header.type == MessageType.Value("REQUEST"):
                # if self.num_messages == 0:
                #     self.init_time = int(round(time.time() * 1000))
                #     self.init_received_time = message.header.timestamp
                #     log.debug("INIT_TIME", time=self.init_time,
                #               time_sent=message.header.timestamp)
                # self.num_messages = self.num_messages + 1
                #
                # self.total_time = self.total_time + current_time - message.header.timestamp
                #
                # if self.num_messages % 10 == 0:
                #     log.debug("TOTAL_TIME ...",
                #               num=self.num_messages,
                #               total=self.total_time,
                #               duration=current_time - self.init_time,
                #               time_since_first_msg=current_time - self.init_received_time,
                #               average=self.total_time / 10)
                #     self.total_time = 0

                # Get the target class for that specific topic
                targetted_topic = self._to_string(message.header.to_topic)
                msg_body = InterContainerRequestBody()
                if message.body.Is(InterContainerRequestBody.DESCRIPTOR):
                    message.body.Unpack(msg_body)
                else:
                    log.debug("unsupported-msg", msg_type=type(message.body))
                    return
                if targetted_topic in self.topic_target_cls_map:
                    if msg_body.args:
                        log.debug("message-body-args-present", body=msg_body)
                        (status, res) = yield getattr(
                            self.topic_target_cls_map[targetted_topic],
                            self._to_string(msg_body.rpc))(
                            **_toDict(msg_body.args))
                    else:
                        log.debug("message-body-args-absent", body=msg_body,
                                  rpc=msg_body.rpc)
                        (status, res) = yield getattr(
                            self.topic_target_cls_map[targetted_topic],
                            self._to_string(msg_body.rpc))()
                    if msg_body.response_required:
                        response = self._format_response(
                            msg_header=message.header,
                            msg_body=res,
                            status=status,
                        )
                        if response is not None:
                            res_topic = self._to_string(
                                response.header.to_topic)
                            self._send_kafka_message(res_topic, response)

                            log.debug("Response-sent",
                                      response=response.body)
            elif message.header.type == MessageType.Value("RESPONSE"):
                trns_id = self._to_string(message.header.id)
                if trns_id in self.transaction_id_deferred_map:
                    # self.num_responses = self.num_responses + 1
                    # self.total_time_responses = self.total_time_responses + current_time - \
                    #                             message.header.timestamp
                    # if self.num_responses % 10 == 0:
                    #     log.debug("TOTAL RESPONSES ...",
                    #               num=self.num_responses,
                    #               total=self.total_time_responses,
                    #               average=self.total_time_responses / 10)
                    #     self.total_time_responses = 0

                    resp = self._parse_response(val)

                    self.transaction_id_deferred_map[trns_id].callback(resp)
            else:
                log.error("!!INVALID-TRANSACTION-TYPE!!")

        except Exception as e:
            log.exception("Failed-to-process-message", message=m, e=e)

    @inlineCallbacks
    def _send_kafka_message(self, topic, msg):
        try:
            yield self.kafka_proxy.send_message(topic, msg.SerializeToString())
        except Exception as e:
            log.exception("Failed-sending-message", message=msg, e=e)

    @inlineCallbacks
    def send_request(self,
                     rpc,
                     to_topic,
                     reply_topic=None,
                     callback=None,
                     **kwargs):
        """
        Invoked to send a message to a remote container and receive a
        response if required.
        :param rpc: The remote API to invoke
        :param to_topic: Send the message to this kafka topic
        :param reply_topic: If not None then a response is expected on this
        topic.  If set to None then no response is required.
        :param callback: Callback to invoke when a response is received.
        :param kwargs: Key-value pairs representing arguments to pass to the
        rpc remote API.
        :return: None if no response is required; otherwise the response is
        either delivered via the callback, if one is provided, or returned
        as a tuple of (status, return_cls)
        """
        try:
            # Ensure all strings are not unicode encoded
            rpc = self._to_string(rpc)
            to_topic = self._to_string(to_topic)
            reply_topic = self._to_string(reply_topic)

            request, transaction_id, response_required = \
                self._format_request(
                    rpc=rpc,
                    to_topic=to_topic,
                    reply_topic=reply_topic,
                    **kwargs)

            if request is None:
                return

            # Add the transaction to the transaction map before sending the
            # request.  This will guarantee the eventual response will be
            # processed.
            wait_for_result = None
            if response_required:
                wait_for_result = Deferred()
                self.transaction_id_deferred_map[
                    self._to_string(request.header.id)] = wait_for_result

            log.debug("BEFORE-SENDING", to_topic=to_topic,
                      from_topic=reply_topic)
            yield self._send_kafka_message(to_topic, request)
            log.debug("AFTER-SENDING", to_topic=to_topic,
                      from_topic=reply_topic)

            if response_required:
                res = yield wait_for_result

                if res is None or not res.success:
                    raise KafkaMessagingError(
                        error="Failed-response:{}".format(res))

                # Remove the transaction from the transaction map
                del self.transaction_id_deferred_map[transaction_id]

                log.debug("send-message-response", rpc=rpc, result=res)

                if callback:
                    callback((res.success, res.result))
                else:
                    returnValue((res.success, res.result))
        except Exception as e:
            log.exception("Exception-sending-request", e=e)
            raise KafkaMessagingError(error=e)

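# Usage sketch (illustrative): a request/response exchange with a remote
# container.  The rpc name, topics and 'device_id_msg' are placeholders, and
# keyword arguments are expected to be protobuf messages because they are
# packed into an Any field by _format_request.
#
#     @inlineCallbacks
#     def get_remote_device(device_id_msg):
#         kafka_proxy = get_messaging_proxy()
#         (success, result) = yield kafka_proxy.send_request(
#             rpc='get_device',
#             to_topic='core',
#             reply_topic=kafka_proxy.default_topic,
#             device_id=device_id_msg)
#         returnValue(result if success else None)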

# Common method to get the singleton instance of the kafka proxy class
def get_messaging_proxy():
    return IKafkaMessagingProxy._kafka_messaging_instance
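
# Usage sketch (illustrative): other modules in the same process retrieve
# the singleton after it has been created and started elsewhere, e.g. to
# stop it cleanly at shutdown:
#
#     kafka_proxy = get_messaging_proxy()
#     if kafka_proxy is not None:
#         kafka_proxy.stop()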