blob: fbb0834ed68985de337f8643958c45ef9c4a4989 [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001#!/usr/bin/env python
2
Zack Williams998f4422018-09-19 10:38:57 -07003# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
khenaidoo6fdf0ba2018-11-02 14:38:33 -040017import time
18from uuid import uuid4
19
20import structlog
21from afkak.client import KafkaClient
22from afkak.consumer import OFFSET_LATEST, Consumer
khenaidoob9203542018-09-17 22:56:37 -040023from twisted.internet import reactor
24from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, \
25 DeferredQueue, gatherResults
khenaidoo6fdf0ba2018-11-02 14:38:33 -040026from zope.interface import implementer
khenaidoob9203542018-09-17 22:56:37 -040027
khenaidoofdbad6e2018-11-06 22:26:38 -050028from python.common.utils import asleep
29from python.common.utils.registry import IComponent
30from kafka_proxy import KafkaProxy, get_kafka_proxy
khenaidoo79232702018-12-04 11:00:41 -050031from python.protos.inter_container_pb2 import MessageType, Argument, \
khenaidoo6fdf0ba2018-11-02 14:38:33 -040032 InterContainerRequestBody, InterContainerMessage, Header, \
33 InterContainerResponseBody
khenaidoob9203542018-09-17 22:56:37 -040034
35log = structlog.get_logger()
36
khenaidoo6fdf0ba2018-11-02 14:38:33 -040037
khenaidoob9203542018-09-17 22:56:37 -040038class KafkaMessagingError(BaseException):
39 def __init__(self, error):
40 self.error = error
41
khenaidoo6fdf0ba2018-11-02 14:38:33 -040042
khenaidoob9203542018-09-17 22:56:37 -040043@implementer(IComponent)
44class IKafkaMessagingProxy(object):
45 _kafka_messaging_instance = None
46
47 def __init__(self,
48 kafka_host_port,
49 kv_store,
50 default_topic,
51 target_cls):
52 """
53 Initialize the kafka proxy. This is a singleton (may change to
54 non-singleton if performance is better)
55 :param kafka_host_port: Kafka host and port
56 :param kv_store: Key-Value store
57 :param default_topic: Default topic to subscribe to
58 :param target_cls: target class - method of that class is invoked
59 when a message is received on the default_topic
60 """
61 # return an exception if the object already exist
62 if IKafkaMessagingProxy._kafka_messaging_instance:
63 raise Exception(
64 'Singleton-exist', cls=IKafkaMessagingProxy)
65
66 log.debug("Initializing-KafkaProxy")
67 self.kafka_host_port = kafka_host_port
68 self.kv_store = kv_store
69 self.default_topic = default_topic
70 self.target_cls = target_cls
71 self.topic_target_cls_map = {}
72 self.topic_consumer_map = {}
73 self.topic_callback_map = {}
74 self.subscribers = {}
75 self.kafka_client = None
76 self.kafka_proxy = None
77 self.transaction_id_deferred_map = {}
78 self.received_msg_queue = DeferredQueue()
79
80 self.init_time = 0
81 self.init_received_time = 0
82
83 self.init_resp_time = 0
84 self.init_received_resp_time = 0
85
86 self.num_messages = 0
87 self.total_time = 0
88 self.num_responses = 0
89 self.total_time_responses = 0
90 log.debug("KafkaProxy-initialized")
91
92 def start(self):
93 try:
94 # Create the kafka client
95 # assert self.kafka_host is not None
96 # assert self.kafka_port is not None
97 # kafka_host_port = ":".join((self.kafka_host, self.kafka_port))
98 self.kafka_client = KafkaClient(self.kafka_host_port)
99
100 # Get the kafka proxy instance. If it does not exist then
101 # create it
102 self.kafka_proxy = get_kafka_proxy()
103 if self.kafka_proxy == None:
104 KafkaProxy(kafka_endpoint=self.kafka_host_port).start()
105 self.kafka_proxy = get_kafka_proxy()
106
107 # Subscribe the default topic and target_cls
108 self.topic_target_cls_map[self.default_topic] = self.target_cls
109
110 # Start the queue to handle incoming messages
111 reactor.callLater(0, self._received_message_processing_loop)
112
113 # Start listening for incoming messages
114 reactor.callLater(0, self.subscribe, self.default_topic,
115 target_cls=self.target_cls)
116
117 # Setup the singleton instance
118 IKafkaMessagingProxy._kafka_messaging_instance = self
119 except Exception as e:
120 log.exception("Failed-to-start-proxy", e=e)
121
khenaidoob9203542018-09-17 22:56:37 -0400122 def stop(self):
123 """
124 Invoked to stop the kafka proxy
125 :return: None on success, Exception on failure
126 """
127 log.debug("Stopping-messaging-proxy ...")
128 try:
129 # Stop all the consumers
130 deferred_list = []
131 for key, values in self.topic_consumer_map.iteritems():
132 deferred_list.extend([c.stop() for c in values])
133
134 if not deferred_list:
135 d = gatherResults(deferred_list)
136 d.addCallback(lambda result: self.kafka_client.close())
137 log.debug("Messaging-proxy-stopped.")
138 except Exception as e:
139 log.exception("Exception-when-stopping-messaging-proxy:", e=e)
140
khenaidoo90847922018-12-03 14:47:51 -0500141
142 @inlineCallbacks
143 def create_topic(self, topic):
144 yield self._wait_until_topic_is_ready(self.kafka_client, topic)
145
khenaidoob9203542018-09-17 22:56:37 -0400146 @inlineCallbacks
147 def _wait_until_topic_is_ready(self, client, topic):
148 e = True
149 while e:
150 yield client.load_metadata_for_topics(topic)
151 e = client.metadata_error_for_topic(topic)
152 if e:
153 log.debug("Topic-not-ready-retrying...", topic=topic)
154
155 def _clear_backoff(self):
156 if self.retries:
157 log.info('reconnected-to-consul', after_retries=self.retries)
158 self.retries = 0
159
khenaidoo43c82122018-11-22 18:38:28 -0500160 def get_target_cls(self):
161 return self.target_cls
162
163 def get_default_topic(self):
164 return self.default_topic
165
khenaidoob9203542018-09-17 22:56:37 -0400166 @inlineCallbacks
khenaidoo90847922018-12-03 14:47:51 -0500167 def _subscribe(self, topic, offset, callback=None, target_cls=None):
khenaidoob9203542018-09-17 22:56:37 -0400168 try:
khenaidoo90847922018-12-03 14:47:51 -0500169 log.debug("subscribing-to-topic-start", topic=topic)
khenaidoob9203542018-09-17 22:56:37 -0400170 yield self._wait_until_topic_is_ready(self.kafka_client, topic)
171 partitions = self.kafka_client.topic_partitions[topic]
172 consumers = []
173
174 # First setup the generic callback - all received messages will
175 # go through that queue
176 if topic not in self.topic_consumer_map:
khenaidoo90847922018-12-03 14:47:51 -0500177 log.debug("topic-not-in-consumer-map", topic=topic)
khenaidoob9203542018-09-17 22:56:37 -0400178 consumers = [Consumer(self.kafka_client, topic, partition,
179 self._enqueue_received_message)
180 for partition in partitions]
181 self.topic_consumer_map[topic] = consumers
182
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400183 log.debug("_subscribe", topic=topic,
184 consumermap=self.topic_consumer_map)
khenaidoob9203542018-09-17 22:56:37 -0400185
186 if target_cls is not None and callback is None:
187 # Scenario #1
188 if topic not in self.topic_target_cls_map:
189 self.topic_target_cls_map[topic] = target_cls
190 elif target_cls is None and callback is not None:
191 # Scenario #2
192 log.debug("custom-callback", topic=topic,
193 callback_map=self.topic_callback_map)
194 if topic not in self.topic_callback_map:
195 self.topic_callback_map[topic] = [callback]
196 else:
197 self.topic_callback_map[topic].extend([callback])
198 else:
199 log.warn("invalid-parameters")
200
201 def cb_closed(result):
202 """
203 Called when a consumer cleanly stops.
204 """
205 log.debug("Consumers-cleanly-stopped")
206
207 def eb_failed(failure):
208 """
209 Called when a consumer fails due to an uncaught exception in the
210 processing callback or a network error on shutdown. In this case we
211 simply log the error.
212 """
213 log.warn("Consumers-failed", failure=failure)
214
215 for c in consumers:
khenaidoo90847922018-12-03 14:47:51 -0500216 c.start(offset).addCallbacks(cb_closed, eb_failed)
217
218 log.debug("subscribed-to-topic", topic=topic)
khenaidoob9203542018-09-17 22:56:37 -0400219
220 returnValue(True)
221 except Exception as e:
222 log.exception("Exception-during-subscription", e=e)
223 returnValue(False)
224
khenaidoo90847922018-12-03 14:47:51 -0500225 @inlineCallbacks
khenaidoob9203542018-09-17 22:56:37 -0400226 def subscribe(self, topic, callback=None, target_cls=None,
khenaidoo90847922018-12-03 14:47:51 -0500227 max_retry=3, offset=OFFSET_LATEST):
khenaidoob9203542018-09-17 22:56:37 -0400228 """
229 Scenario 1: invoked to subscribe to a specific topic with a
230 target_cls to invoke when a message is received on that topic. This
231 handles the case of request/response where this library performs the
232 heavy lifting. In this case the m_callback must to be None
233
234 Scenario 2: invoked to subscribe to a specific topic with a
235 specific callback to invoke when a message is received on that topic.
236 This handles the case where the caller wants to process the message
237 received itself. In this case the target_cls must to be None
238
239 :param topic: topic to subscribe to
240 :param callback: Callback to invoke when a message is received on
241 the topic. Either one of callback or target_cls needs can be none
242 :param target_cls: Target class to use when a message is
243 received on the topic. There can only be 1 target_cls per topic.
244 Either one of callback or target_cls needs can be none
245 :param max_retry: the number of retries before reporting failure
246 to subscribe. This caters for scenario where the kafka topic is not
247 ready.
248 :return: True on success, False on failure
249 """
250 RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
251
252 def _backoff(msg, retries):
253 wait_time = RETRY_BACKOFF[min(retries,
254 len(RETRY_BACKOFF) - 1)]
255 log.info(msg, retry_in=wait_time)
256 return asleep(wait_time)
257
258 retry = 0
khenaidoo90847922018-12-03 14:47:51 -0500259 subscribed = False
260 while not subscribed:
261 subscribed = yield self._subscribe(topic, callback=callback,
262 target_cls=target_cls, offset=offset)
263 if subscribed:
264 returnValue(True)
265 elif retry > max_retry:
266 returnValue(False)
khenaidoob9203542018-09-17 22:56:37 -0400267 else:
268 _backoff("subscription-not-complete", retry)
269 retry += 1
khenaidoo90847922018-12-03 14:47:51 -0500270
271 # while not self._subscribe(topic, callback=callback,
272 # target_cls=target_cls):
273 # if retry > max_retry:
274 # return False
275 # else:
276 # _backoff("subscription-not-complete", retry)
277 # retry += 1
278 # return True
khenaidoob9203542018-09-17 22:56:37 -0400279
280 def unsubscribe(self, topic):
281 """
282 Invoked when unsubscribing to a topic
283 :param topic: topic to unsubscibe from
284 :return: None on success or Exception on failure
285 """
286 log.debug("Unsubscribing-to-topic", topic=topic)
287
288 def remove_topic(topic):
289 if topic in self.topic_consumer_map:
290 del self.topic_consumer_map[topic]
291
292 try:
293 if topic in self.topic_consumer_map:
294 consumers = self.topic_consumer_map[topic]
295 d = gatherResults([c.stop() for c in consumers])
296 d.addCallback(remove_topic, topic)
297 log.debug("Unsubscribed-to-topic.", topic=topic)
298 else:
299 log.debug("Topic-does-not-exist.", topic=topic)
300 except Exception as e:
301 log.exception("Exception-when-stopping-messaging-proxy:", e=e)
302
303 @inlineCallbacks
304 def _enqueue_received_message(self, reactor, message_list):
305 """
306 Internal method to continuously queue all received messaged
307 irrespective of topic
308 :param reactor: A requirement by the Twisted Python kafka library
309 :param message_list: Received list of messages
310 :return: None on success, Exception on failure
311 """
312 try:
313 for m in message_list:
314 log.debug("received-msg", msg=m)
315 yield self.received_msg_queue.put(m)
316 except Exception as e:
317 log.exception("Failed-enqueueing-received-message", e=e)
318
319 @inlineCallbacks
320 def _received_message_processing_loop(self):
321 """
322 Internal method to continuously process all received messages one
323 at a time
324 :return: None on success, Exception on failure
325 """
326 while True:
327 try:
328 message = yield self.received_msg_queue.get()
329 yield self._process_message(message)
330 except Exception as e:
331 log.exception("Failed-dequeueing-received-message", e=e)
332
333 def _to_string(self, unicode_str):
334 if unicode_str is not None:
335 if type(unicode_str) == unicode:
336 return unicode_str.encode('ascii', 'ignore')
337 else:
338 return unicode_str
339 else:
340 return None
341
342 def _format_request(self,
343 rpc,
344 to_topic,
345 reply_topic,
346 **kwargs):
347 """
348 Format a request to send over kafka
349 :param rpc: Requested remote API
350 :param to_topic: Topic to send the request
351 :param reply_topic: Topic to receive the resulting response, if any
352 :param kwargs: Dictionary of key-value pairs to pass as arguments to
353 the remote rpc API.
354 :return: A InterContainerMessage message type on success or None on
355 failure
356 """
357 try:
358 transaction_id = uuid4().hex
359 request = InterContainerMessage()
360 request_body = InterContainerRequestBody()
361 request.header.id = transaction_id
362 request.header.type = MessageType.Value("REQUEST")
khenaidoo43c82122018-11-22 18:38:28 -0500363 request.header.from_topic = reply_topic
khenaidoob9203542018-09-17 22:56:37 -0400364 request.header.to_topic = to_topic
365
366 response_required = False
367 if reply_topic:
368 request_body.reply_to_topic = reply_topic
khenaidoo43c82122018-11-22 18:38:28 -0500369 request_body.response_required = True
khenaidoob9203542018-09-17 22:56:37 -0400370 response_required = True
371
372 request.header.timestamp = int(round(time.time() * 1000))
373 request_body.rpc = rpc
374 for a, b in kwargs.iteritems():
375 arg = Argument()
376 arg.key = a
377 try:
378 arg.value.Pack(b)
379 request_body.args.extend([arg])
380 except Exception as e:
381 log.exception("Failed-parsing-value", e=e)
khenaidoob9203542018-09-17 22:56:37 -0400382 request.body.Pack(request_body)
383 return request, transaction_id, response_required
384 except Exception as e:
385 log.exception("formatting-request-failed",
386 rpc=rpc,
387 to_topic=to_topic,
388 reply_topic=reply_topic,
389 args=kwargs)
390 return None, None, None
391
392 def _format_response(self, msg_header, msg_body, status):
393 """
394 Format a response
395 :param msg_header: The header portion of a received request
396 :param msg_body: The response body
397 :param status: True is this represents a successful response
398 :return: a InterContainerMessage message type
399 """
400 try:
401 assert isinstance(msg_header, Header)
402 response = InterContainerMessage()
403 response_body = InterContainerResponseBody()
404 response.header.id = msg_header.id
405 response.header.timestamp = int(
406 round(time.time() * 1000))
407 response.header.type = MessageType.Value("RESPONSE")
408 response.header.from_topic = msg_header.to_topic
409 response.header.to_topic = msg_header.from_topic
410 if msg_body is not None:
411 response_body.result.Pack(msg_body)
412 response_body.success = status
413 response.body.Pack(response_body)
414 return response
415 except Exception as e:
416 log.exception("formatting-response-failed", header=msg_header,
417 body=msg_body, status=status, e=e)
418 return None
419
420 def _parse_response(self, msg):
421 try:
422 message = InterContainerMessage()
423 message.ParseFromString(msg)
424 resp = InterContainerResponseBody()
425 if message.body.Is(InterContainerResponseBody.DESCRIPTOR):
426 message.body.Unpack(resp)
427 else:
428 log.debug("unsupported-msg", msg_type=type(message.body))
429 return None
430 log.debug("parsed-response", input=message, output=resp)
431 return resp
432 except Exception as e:
433 log.exception("parsing-response-failed", msg=msg, e=e)
434 return None
435
436 @inlineCallbacks
437 def _process_message(self, m):
438 """
439 Default internal method invoked for every batch of messages received
440 from Kafka.
441 """
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400442
khenaidoob9203542018-09-17 22:56:37 -0400443 def _toDict(args):
444 """
445 Convert a repeatable Argument type into a python dictionary
446 :param args: Repeatable core_adapter.Argument type
447 :return: a python dictionary
448 """
449 if args is None:
450 return None
451 result = {}
452 for arg in args:
453 assert isinstance(arg, Argument)
454 result[arg.key] = arg.value
455 return result
456
457 current_time = int(round(time.time() * 1000))
458 # log.debug("Got Message", message=m)
459 try:
460 val = m.message.value
461 # print m.topic
462
463 # Go over customized callbacks first
464 if m.topic in self.topic_callback_map:
465 for c in self.topic_callback_map[m.topic]:
466 yield c(val)
467
468 # Check whether we need to process request/response scenario
469 if m.topic not in self.topic_target_cls_map:
470 return
471
472 # Process request/response scenario
473 message = InterContainerMessage()
474 message.ParseFromString(val)
475
476 if message.header.type == MessageType.Value("REQUEST"):
khenaidoob9203542018-09-17 22:56:37 -0400477 # Get the target class for that specific topic
478 targetted_topic = self._to_string(message.header.to_topic)
479 msg_body = InterContainerRequestBody()
480 if message.body.Is(InterContainerRequestBody.DESCRIPTOR):
481 message.body.Unpack(msg_body)
482 else:
483 log.debug("unsupported-msg", msg_type=type(message.body))
484 return
485 if targetted_topic in self.topic_target_cls_map:
486 if msg_body.args:
487 log.debug("message-body-args-present", body=msg_body)
488 (status, res) = yield getattr(
489 self.topic_target_cls_map[targetted_topic],
490 self._to_string(msg_body.rpc))(
491 **_toDict(msg_body.args))
492 else:
493 log.debug("message-body-args-absent", body=msg_body,
494 rpc=msg_body.rpc)
495 (status, res) = yield getattr(
496 self.topic_target_cls_map[targetted_topic],
497 self._to_string(msg_body.rpc))()
498 if msg_body.response_required:
499 response = self._format_response(
500 msg_header=message.header,
501 msg_body=res,
502 status=status,
503 )
504 if response is not None:
505 res_topic = self._to_string(
506 response.header.to_topic)
507 self._send_kafka_message(res_topic, response)
508
khenaidoo43c82122018-11-22 18:38:28 -0500509 log.debug("Response-sent", response=response.body, to_topic=res_topic)
khenaidoob9203542018-09-17 22:56:37 -0400510 elif message.header.type == MessageType.Value("RESPONSE"):
511 trns_id = self._to_string(message.header.id)
512 if trns_id in self.transaction_id_deferred_map:
khenaidoob9203542018-09-17 22:56:37 -0400513 resp = self._parse_response(val)
514
515 self.transaction_id_deferred_map[trns_id].callback(resp)
516 else:
517 log.error("!!INVALID-TRANSACTION-TYPE!!")
518
519 except Exception as e:
520 log.exception("Failed-to-process-message", message=m, e=e)
521
522 @inlineCallbacks
523 def _send_kafka_message(self, topic, msg):
524 try:
525 yield self.kafka_proxy.send_message(topic, msg.SerializeToString())
526 except Exception, e:
527 log.exception("Failed-sending-message", message=msg, e=e)
528
529 @inlineCallbacks
530 def send_request(self,
531 rpc,
532 to_topic,
533 reply_topic=None,
534 callback=None,
535 **kwargs):
536 """
537 Invoked to send a message to a remote container and receive a
538 response if required.
539 :param rpc: The remote API to invoke
540 :param to_topic: Send the message to this kafka topic
541 :param reply_topic: If not None then a response is expected on this
542 topic. If set to None then no response is required.
543 :param callback: Callback to invoke when a response is received.
544 :param kwargs: Key-value pairs representing arguments to pass to the
545 rpc remote API.
546 :return: Either no response is required, or a response is returned
547 via the callback or the response is a tuple of (status, return_cls)
548 """
549 try:
550 # Ensure all strings are not unicode encoded
551 rpc = self._to_string(rpc)
552 to_topic = self._to_string(to_topic)
553 reply_topic = self._to_string(reply_topic)
554
555 request, transaction_id, response_required = \
556 self._format_request(
557 rpc=rpc,
558 to_topic=to_topic,
559 reply_topic=reply_topic,
560 **kwargs)
561
562 if request is None:
563 return
564
565 # Add the transaction to the transaction map before sending the
566 # request. This will guarantee the eventual response will be
567 # processed.
568 wait_for_result = None
569 if response_required:
570 wait_for_result = Deferred()
571 self.transaction_id_deferred_map[
572 self._to_string(request.header.id)] = wait_for_result
573
khenaidoob9203542018-09-17 22:56:37 -0400574 yield self._send_kafka_message(to_topic, request)
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400575 log.debug("message-sent", to_topic=to_topic,
576 from_topic=reply_topic)
khenaidoob9203542018-09-17 22:56:37 -0400577
578 if response_required:
579 res = yield wait_for_result
580
581 if res is None or not res.success:
582 raise KafkaMessagingError(error="Failed-response:{"
583 "}".format(res))
584
585 # Remove the transaction from the transaction map
586 del self.transaction_id_deferred_map[transaction_id]
587
588 log.debug("send-message-response", rpc=rpc, result=res)
589
590 if callback:
591 callback((res.success, res.result))
592 else:
593 returnValue((res.success, res.result))
594 except Exception as e:
595 log.exception("Exception-sending-request", e=e)
596 raise KafkaMessagingError(error=e)
597
598
599# Common method to get the singleton instance of the kafka proxy class
600def get_messaging_proxy():
601 return IKafkaMessagingProxy._kafka_messaging_instance