#!/usr/bin/env python

# Copyright 2018 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from zope.interface import Interface, implementer
from adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, \
    DeferredQueue, gatherResults
from afkak.client import KafkaClient
from afkak.consumer import OFFSET_LATEST, Consumer
import structlog
from adapters.common.utils import asleep
from adapters.protos.core_adapter_pb2 import MessageType, Argument, \
    InterContainerRequestBody, InterContainerMessage, Header, \
    InterContainerResponseBody
import time
from uuid import uuid4
from adapters.common.utils.registry import IComponent


log = structlog.get_logger()

class KafkaMessagingError(BaseException):
    def __init__(self, error):
        self.error = error

@implementer(IComponent)
class IKafkaMessagingProxy(object):
    _kafka_messaging_instance = None

    def __init__(self,
                 kafka_host_port,
                 kv_store,
                 default_topic,
                 target_cls):
        """
        Initialize the kafka proxy. This is currently a singleton (it may
        become non-singleton if that turns out to perform better).
        :param kafka_host_port: Kafka host and port
        :param kv_store: Key-Value store
        :param default_topic: Default topic to subscribe to
        :param target_cls: Target class whose method is invoked when a
        message is received on the default_topic
        """
        # Raise an exception if the singleton already exists
        if IKafkaMessagingProxy._kafka_messaging_instance:
            raise Exception(
                'singleton-exists: {}'.format(IKafkaMessagingProxy))
        log.debug("Initializing-KafkaProxy")
        self.kafka_host_port = kafka_host_port
        self.kv_store = kv_store
        self.default_topic = default_topic
        self.target_cls = target_cls
        self.topic_target_cls_map = {}
        self.topic_consumer_map = {}
        self.topic_callback_map = {}
        self.subscribers = {}
        self.kafka_client = None
        self.kafka_proxy = None
        self.transaction_id_deferred_map = {}
        self.received_msg_queue = DeferredQueue()

        self.init_time = 0
        self.init_received_time = 0

        self.init_resp_time = 0
        self.init_received_resp_time = 0

        self.num_messages = 0
        self.total_time = 0
        self.num_responses = 0
        self.total_time_responses = 0
        log.debug("KafkaProxy-initialized")

    def start(self):
        try:
            # Create the kafka client
            # assert self.kafka_host is not None
            # assert self.kafka_port is not None
            # kafka_host_port = ":".join((self.kafka_host, self.kafka_port))
            self.kafka_client = KafkaClient(self.kafka_host_port)

            # Get the kafka proxy instance. If it does not exist then
            # create it
            self.kafka_proxy = get_kafka_proxy()
            if self.kafka_proxy is None:
                KafkaProxy(kafka_endpoint=self.kafka_host_port).start()
                self.kafka_proxy = get_kafka_proxy()

            # Subscribe the default topic and target_cls
            self.topic_target_cls_map[self.default_topic] = self.target_cls

            # Start the queue to handle incoming messages
            reactor.callLater(0, self._received_message_processing_loop)

            # Start listening for incoming messages
            reactor.callLater(0, self.subscribe, self.default_topic,
                              target_cls=self.target_cls)

            # Setup the singleton instance
            IKafkaMessagingProxy._kafka_messaging_instance = self
        except Exception as e:
            log.exception("Failed-to-start-proxy", e=e)

    def stop(self):
        """
        Invoked to stop the kafka proxy
        :return: None on success, Exception on failure
        """
        log.debug("Stopping-messaging-proxy ...")
        try:
            # Stop all the consumers
            deferred_list = []
            for key, values in self.topic_consumer_map.iteritems():
                deferred_list.extend([c.stop() for c in values])

            if deferred_list:
                d = gatherResults(deferred_list)
                d.addCallback(lambda result: self.kafka_client.close())
            else:
                self.kafka_client.close()
            log.debug("Messaging-proxy-stopped.")
        except Exception as e:
            log.exception("Exception-when-stopping-messaging-proxy:", e=e)

    @inlineCallbacks
    def _wait_until_topic_is_ready(self, client, topic):
        e = True
        while e:
            yield client.load_metadata_for_topics(topic)
            e = client.metadata_error_for_topic(topic)
            if e:
                log.debug("Topic-not-ready-retrying...", topic=topic)

    def _clear_backoff(self):
        if self.retries:
            log.info('reconnected-to-kafka', after_retries=self.retries)
            self.retries = 0

    @inlineCallbacks
    def _subscribe(self, topic, callback=None, target_cls=None):
        try:
            yield self._wait_until_topic_is_ready(self.kafka_client, topic)
            partitions = self.kafka_client.topic_partitions[topic]
            consumers = []

            # First setup the generic callback - all received messages will
            # go through that queue
            if topic not in self.topic_consumer_map:
                consumers = [Consumer(self.kafka_client, topic, partition,
                                      self._enqueue_received_message)
                             for partition in partitions]
                self.topic_consumer_map[topic] = consumers

            log.debug("_subscribe", topic=topic,
                      consumermap=self.topic_consumer_map)

            if target_cls is not None and callback is None:
                # Scenario #1
                if topic not in self.topic_target_cls_map:
                    self.topic_target_cls_map[topic] = target_cls
            elif target_cls is None and callback is not None:
                # Scenario #2
                log.debug("custom-callback", topic=topic,
                          callback_map=self.topic_callback_map)
                if topic not in self.topic_callback_map:
                    self.topic_callback_map[topic] = [callback]
                else:
                    self.topic_callback_map[topic].extend([callback])
            else:
                log.warn("invalid-parameters")

            def cb_closed(result):
                """
                Called when a consumer cleanly stops.
                """
                log.debug("Consumers-cleanly-stopped")

            def eb_failed(failure):
                """
                Called when a consumer fails due to an uncaught exception in
                the processing callback or a network error on shutdown. In
                this case we simply log the error.
                """
                log.warn("Consumers-failed", failure=failure)

            for c in consumers:
                c.start(OFFSET_LATEST).addCallbacks(cb_closed, eb_failed)

            returnValue(True)
        except Exception as e:
            log.exception("Exception-during-subscription", e=e)
            returnValue(False)

    @inlineCallbacks
    def subscribe(self, topic, callback=None, target_cls=None,
                  max_retry=3):
        """
        Scenario 1: invoked to subscribe to a specific topic with a
        target_cls to invoke when a message is received on that topic. This
        handles the case of request/response where this library performs the
        heavy lifting. In this case the callback must be None

        Scenario 2: invoked to subscribe to a specific topic with a
        specific callback to invoke when a message is received on that topic.
        This handles the case where the caller wants to process the message
        received itself. In this case the target_cls must be None

        :param topic: topic to subscribe to
        :param callback: Callback to invoke when a message is received on
        the topic. Exactly one of callback and target_cls must be provided;
        the other must be None
        :param target_cls: Target class to use when a message is
        received on the topic. There can only be 1 target_cls per topic.
        Exactly one of callback and target_cls must be provided; the other
        must be None
        :param max_retry: the number of retries before reporting failure
        to subscribe. This caters for the scenario where the kafka topic is
        not yet ready.
        :return: a Deferred that fires True on success, False on failure
        """
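        # Illustrative usage only -- the topic name and handlers below are
        # hypothetical, not defined in this module:
        #
        #   # Scenario 1: request/response handled by this library
        #   yield proxy.subscribe(topic='openolt', target_cls=adapter_handler)
        #
        #   # Scenario 2: caller processes the raw kafka messages itself
        #   yield proxy.subscribe(topic='openolt', callback=my_msg_callback)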
        RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]

        def _backoff(msg, retries):
            wait_time = RETRY_BACKOFF[min(retries,
                                          len(RETRY_BACKOFF) - 1)]
            log.info(msg, retry_in=wait_time)
            return asleep(wait_time)

        retry = 0
        subscribed = yield self._subscribe(topic, callback=callback,
                                           target_cls=target_cls)
        while not subscribed:
            if retry > max_retry:
                returnValue(False)
            yield _backoff("subscription-not-complete", retry)
            retry += 1
            subscribed = yield self._subscribe(topic, callback=callback,
                                               target_cls=target_cls)
        returnValue(True)

    def unsubscribe(self, topic):
        """
        Invoked to unsubscribe from a topic
        :param topic: topic to unsubscribe from
        :return: None on success or Exception on failure
        """
        log.debug("Unsubscribing-from-topic", topic=topic)

        def remove_topic(topic):
            if topic in self.topic_consumer_map:
                del self.topic_consumer_map[topic]

        try:
            if topic in self.topic_consumer_map:
                consumers = self.topic_consumer_map[topic]
                d = gatherResults([c.stop() for c in consumers])
                d.addCallback(lambda _: remove_topic(topic))
                log.debug("Unsubscribed-from-topic.", topic=topic)
            else:
                log.debug("Topic-does-not-exist.", topic=topic)
        except Exception as e:
            log.exception("Exception-when-unsubscribing-from-topic", e=e)

    @inlineCallbacks
    def _enqueue_received_message(self, reactor, message_list):
        """
        Internal method to continuously queue all received messages,
        irrespective of topic
        :param reactor: A requirement of the Twisted Python kafka library
        :param message_list: Received list of messages
        :return: None on success, Exception on failure
        """
        try:
            for m in message_list:
                log.debug("received-msg", msg=m)
                yield self.received_msg_queue.put(m)
        except Exception as e:
            log.exception("Failed-enqueueing-received-message", e=e)

    @inlineCallbacks
    def _received_message_processing_loop(self):
        """
        Internal method to continuously process all received messages one
        at a time
        :return: None on success, Exception on failure
        """
        while True:
            try:
                message = yield self.received_msg_queue.get()
                yield self._process_message(message)
            except Exception as e:
                log.exception("Failed-dequeueing-received-message", e=e)

    def _to_string(self, unicode_str):
        if unicode_str is not None:
            if type(unicode_str) == unicode:
                return unicode_str.encode('ascii', 'ignore')
            else:
                return unicode_str
        else:
            return None

    def _format_request(self,
                        rpc,
                        to_topic,
                        reply_topic,
                        **kwargs):
        """
        Format a request to send over kafka
        :param rpc: Requested remote API
        :param to_topic: Topic to send the request to
        :param reply_topic: Topic to receive the resulting response, if any
        :param kwargs: Dictionary of key-value pairs to pass as arguments to
        the remote rpc API.
        :return: An InterContainerMessage message type on success or None on
        failure
        """
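        # Summary of what is built below (per the protobuf types imported
        # above): an InterContainerMessage whose header carries a uuid4
        # transaction id, the REQUEST type, the source/destination topics and
        # a millisecond timestamp, and whose body carries the rpc name, each
        # keyword argument packed into a protobuf Any, the reply topic and
        # the response_required flag.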
        try:
            transaction_id = uuid4().hex
            request = InterContainerMessage()
            request_body = InterContainerRequestBody()
            request.header.id = transaction_id
            request.header.type = MessageType.Value("REQUEST")
            request.header.from_topic = self.default_topic
            request.header.to_topic = to_topic

            response_required = False
            if reply_topic:
                request_body.reply_to_topic = reply_topic
                response_required = True

            request.header.timestamp = int(round(time.time() * 1000))
            request_body.rpc = rpc
            for a, b in kwargs.iteritems():
                arg = Argument()
                arg.key = a
                try:
                    arg.value.Pack(b)
                    request_body.args.extend([arg])
                except Exception as e:
                    log.exception("Failed-parsing-value", e=e)
            request_body.reply_to_topic = self.default_topic
            request_body.response_required = response_required
            request.body.Pack(request_body)
            return request, transaction_id, response_required
        except Exception as e:
            log.exception("formatting-request-failed",
                          rpc=rpc,
                          to_topic=to_topic,
                          reply_topic=reply_topic,
                          args=kwargs)
            return None, None, None

    def _format_response(self, msg_header, msg_body, status):
        """
        Format a response
        :param msg_header: The header portion of a received request
        :param msg_body: The response body
        :param status: True if this represents a successful response
        :return: an InterContainerMessage message type
        """
        try:
            assert isinstance(msg_header, Header)
            response = InterContainerMessage()
            response_body = InterContainerResponseBody()
            response.header.id = msg_header.id
            response.header.timestamp = int(
                round(time.time() * 1000))
            response.header.type = MessageType.Value("RESPONSE")
            response.header.from_topic = msg_header.to_topic
            response.header.to_topic = msg_header.from_topic
            if msg_body is not None:
                response_body.result.Pack(msg_body)
            response_body.success = status
            response.body.Pack(response_body)
            return response
        except Exception as e:
            log.exception("formatting-response-failed", header=msg_header,
                          body=msg_body, status=status, e=e)
            return None

    def _parse_response(self, msg):
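        """
        Parse a raw kafka payload into an InterContainerResponseBody.
        :param msg: serialized InterContainerMessage received off the wire
        :return: the unpacked InterContainerResponseBody, or None if the
        message body is not a response or cannot be parsed
        """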
        try:
            message = InterContainerMessage()
            message.ParseFromString(msg)
            resp = InterContainerResponseBody()
            if message.body.Is(InterContainerResponseBody.DESCRIPTOR):
                message.body.Unpack(resp)
            else:
                log.debug("unsupported-msg", msg_type=type(message.body))
                return None
            log.debug("parsed-response", input=message, output=resp)
            return resp
        except Exception as e:
            log.exception("parsing-response-failed", msg=msg, e=e)
            return None

    @inlineCallbacks
    def _process_message(self, m):
        """
        Default internal method invoked for every batch of messages received
        from Kafka.
        """

        def _toDict(args):
            """
            Convert a repeated Argument field into a python dictionary
            :param args: Repeated core_adapter.Argument field
            :return: a python dictionary
            """
            if args is None:
                return None
            result = {}
            for arg in args:
                assert isinstance(arg, Argument)
                result[arg.key] = arg.value
            return result

        current_time = int(round(time.time() * 1000))
        # log.debug("Got Message", message=m)
        try:
            val = m.message.value
            # print m.topic

            # Go over customized callbacks first
            if m.topic in self.topic_callback_map:
                for c in self.topic_callback_map[m.topic]:
                    yield c(val)

            # Check whether we need to process request/response scenario
            if m.topic not in self.topic_target_cls_map:
                return

            # Process request/response scenario
            message = InterContainerMessage()
            message.ParseFromString(val)

            if message.header.type == MessageType.Value("REQUEST"):
                # if self.num_messages == 0:
                #     self.init_time = int(round(time.time() * 1000))
                #     self.init_received_time = message.header.timestamp
                #     log.debug("INIT_TIME", time=self.init_time,
                #               time_sent=message.header.timestamp)
                # self.num_messages = self.num_messages + 1
                #
                # self.total_time = self.total_time + current_time - message.header.timestamp
                #
                # if self.num_messages % 10 == 0:
                #     log.debug("TOTAL_TIME ...",
                #               num=self.num_messages,
                #               total=self.total_time,
                #               duration=current_time - self.init_time,
                #               time_since_first_msg=current_time - self.init_received_time,
                #               average=self.total_time / 10)
                #     self.total_time = 0

                # Get the target class for that specific topic
                targetted_topic = self._to_string(message.header.to_topic)
                msg_body = InterContainerRequestBody()
                if message.body.Is(InterContainerRequestBody.DESCRIPTOR):
                    message.body.Unpack(msg_body)
                else:
                    log.debug("unsupported-msg", msg_type=type(message.body))
                    return
                if targetted_topic in self.topic_target_cls_map:
                    if msg_body.args:
                        log.debug("message-body-args-present", body=msg_body)
                        (status, res) = yield getattr(
                            self.topic_target_cls_map[targetted_topic],
                            self._to_string(msg_body.rpc))(
                            **_toDict(msg_body.args))
                    else:
                        log.debug("message-body-args-absent", body=msg_body,
                                  rpc=msg_body.rpc)
                        (status, res) = yield getattr(
                            self.topic_target_cls_map[targetted_topic],
                            self._to_string(msg_body.rpc))()
                    if msg_body.response_required:
                        response = self._format_response(
                            msg_header=message.header,
                            msg_body=res,
                            status=status,
                        )
                        if response is not None:
                            res_topic = self._to_string(
                                response.header.to_topic)
                            self._send_kafka_message(res_topic, response)

                            log.debug("Response-sent", response=response.body)
            elif message.header.type == MessageType.Value("RESPONSE"):
                trns_id = self._to_string(message.header.id)
                if trns_id in self.transaction_id_deferred_map:
                    # self.num_responses = self.num_responses + 1
                    # self.total_time_responses = self.total_time_responses + current_time - \
                    #                             message.header.timestamp
                    # if self.num_responses % 10 == 0:
                    #     log.debug("TOTAL RESPONSES ...",
                    #               num=self.num_responses,
                    #               total=self.total_time_responses,
                    #               average=self.total_time_responses / 10)
                    #     self.total_time_responses = 0

                    resp = self._parse_response(val)

                    self.transaction_id_deferred_map[trns_id].callback(resp)
            else:
                log.error("!!INVALID-TRANSACTION-TYPE!!")

        except Exception as e:
            log.exception("Failed-to-process-message", message=m, e=e)

    @inlineCallbacks
    def _send_kafka_message(self, topic, msg):
        try:
            yield self.kafka_proxy.send_message(topic, msg.SerializeToString())
        except Exception as e:
            log.exception("Failed-sending-message", message=msg, e=e)

    @inlineCallbacks
    def send_request(self,
                     rpc,
                     to_topic,
                     reply_topic=None,
                     callback=None,
                     **kwargs):
        """
        Invoked to send a message to a remote container and receive a
        response, if required.
        :param rpc: The remote API to invoke
        :param to_topic: Send the message to this kafka topic
        :param reply_topic: If not None then a response is expected on this
        topic. If set to None then no response is required.
        :param callback: Callback to invoke when a response is received.
        :param kwargs: Key-value pairs representing arguments to pass to the
        remote rpc API.
        :return: None if no response is required; otherwise the response is
        either delivered via the callback (when one is provided) or returned
        as a tuple of (status, result)
        """
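        # Illustrative call only -- the rpc name, topic and protobuf argument
        # below are hypothetical, not defined in this module. Keyword
        # argument values must be protobuf messages since each one is packed
        # into a protobuf Any:
        #
        #   (success, result) = yield proxy.send_request(
        #       rpc='get_device',
        #       to_topic='core',
        #       reply_topic=proxy.default_topic,
        #       device_id=device_id_proto)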
        try:
            # Ensure all strings are not unicode encoded
            rpc = self._to_string(rpc)
            to_topic = self._to_string(to_topic)
            reply_topic = self._to_string(reply_topic)

            request, transaction_id, response_required = \
                self._format_request(
                    rpc=rpc,
                    to_topic=to_topic,
                    reply_topic=reply_topic,
                    **kwargs)

            if request is None:
                return

            # Add the transaction to the transaction map before sending the
            # request. This will guarantee the eventual response will be
            # processed.
            wait_for_result = None
            if response_required:
                wait_for_result = Deferred()
                self.transaction_id_deferred_map[
                    self._to_string(request.header.id)] = wait_for_result

            log.debug("BEFORE-SENDING", to_topic=to_topic,
                      from_topic=reply_topic)
            yield self._send_kafka_message(to_topic, request)
            log.debug("AFTER-SENDING", to_topic=to_topic,
                      from_topic=reply_topic)

            if response_required:
                res = yield wait_for_result

                if res is None or not res.success:
                    raise KafkaMessagingError(
                        error="Failed-response: {}".format(res))

                # Remove the transaction from the transaction map
                del self.transaction_id_deferred_map[transaction_id]

                log.debug("send-message-response", rpc=rpc, result=res)

                if callback:
                    callback((res.success, res.result))
                else:
                    returnValue((res.success, res.result))
        except Exception as e:
            log.exception("Exception-sending-request", e=e)
            raise KafkaMessagingError(error=e)


# Common method to get the singleton instance of the kafka proxy class
def get_messaging_proxy():
    return IKafkaMessagingProxy._kafka_messaging_instance
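
# Minimal usage sketch (illustrative only -- the request handler class and
# the broker/topic values below are hypothetical, not part of this module):
#
#   handler = MyRequestHandler()   # exposes the rpc methods by name
#   proxy = IKafkaMessagingProxy(
#       kafka_host_port='kafka:9092',
#       kv_store=None,
#       default_topic='my-adapter',
#       target_cls=handler)
#   proxy.start()
#   ...
#   # After start(), the singleton can be retrieved anywhere with
#   # get_messaging_proxy() and used to send requests or subscribe to
#   # additional topics.
#   proxy.stop()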