blob: 3f6f5ebc522834efd61601b5aa992079e051a8a6 [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001#!/usr/bin/env python
2
Zack Williams998f4422018-09-19 10:38:57 -07003# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
khenaidoo6fdf0ba2018-11-02 14:38:33 -040017import time
18from uuid import uuid4
19
20import structlog
21from afkak.client import KafkaClient
22from afkak.consumer import OFFSET_LATEST, Consumer
khenaidoob9203542018-09-17 22:56:37 -040023from twisted.internet import reactor
24from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, \
25 DeferredQueue, gatherResults
khenaidoo6fdf0ba2018-11-02 14:38:33 -040026from zope.interface import implementer
khenaidoob9203542018-09-17 22:56:37 -040027
khenaidoo6fdf0ba2018-11-02 14:38:33 -040028from adapters.common.utils import asleep
29from adapters.common.utils.registry import IComponent
30from adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
31from adapters.protos.core_adapter_pb2 import MessageType, Argument, \
32 InterContainerRequestBody, InterContainerMessage, Header, \
33 InterContainerResponseBody
khenaidoob9203542018-09-17 22:56:37 -040034
35log = structlog.get_logger()
36
khenaidoo6fdf0ba2018-11-02 14:38:33 -040037
class KafkaMessagingError(BaseException):
    """
    Raised when a request sent over the kafka messaging proxy fails.

    NOTE(review): this derives from BaseException, so a plain
    ``except Exception`` will not catch it. The base class is left unchanged
    because callers may rely on that.

    :param error: description of the failure, or the underlying exception
    """

    def __init__(self, error):
        # Forward the error to the base class so that str()/repr() and
        # ``args`` carry it even when it is passed by keyword
        # (``KafkaMessagingError(error=...)``).
        super(KafkaMessagingError, self).__init__(error)
        self.error = error
41
khenaidoo6fdf0ba2018-11-02 14:38:33 -040042
@implementer(IComponent)
class IKafkaMessagingProxy(object):
    """
    Singleton providing request/response and per-topic callback messaging
    over Kafka.

    Every consumer created by this class delivers its messages to a single
    DeferredQueue; a processing loop then handles them one at a time,
    dispatching to custom callbacks and/or to the target class registered
    for the topic.
    """
    _kafka_messaging_instance = None

    def __init__(self,
                 kafka_host_port,
                 kv_store,
                 default_topic,
                 target_cls):
        """
        Initialize the kafka proxy.  This is a singleton (may change to
        non-singleton if performance is better)
        :param kafka_host_port: Kafka host and port
        :param kv_store: Key-Value store
        :param default_topic: Default topic to subscribe to
        :param target_cls: target class - method of that class is invoked
        when a message is received on the default_topic
        :raises Exception: if the singleton instance has already been set
        """
        # Refuse to build a second instance once one has been started
        if IKafkaMessagingProxy._kafka_messaging_instance:
            # Exception() does not accept keyword arguments, so the class is
            # passed positionally.
            raise Exception('Singleton-exist', IKafkaMessagingProxy)

        log.debug("Initializing-KafkaProxy")
        self.kafka_host_port = kafka_host_port
        self.kv_store = kv_store
        self.default_topic = default_topic
        self.target_cls = target_cls
        # topic -> object whose methods are invoked for incoming requests
        self.topic_target_cls_map = {}
        # topic -> list of consumers (one per partition)
        self.topic_consumer_map = {}
        # topic -> list of custom callbacks
        self.topic_callback_map = {}
        self.subscribers = {}
        self.kafka_client = None
        self.kafka_proxy = None
        # transaction id -> Deferred fired when the response arrives
        self.transaction_id_deferred_map = {}
        self.received_msg_queue = DeferredQueue()

        # Retry counter read by _clear_backoff()
        self.retries = 0

        # Timing/statistics counters; they are initialised here but not
        # updated anywhere in this class.
        self.init_time = 0
        self.init_received_time = 0

        self.init_resp_time = 0
        self.init_received_resp_time = 0

        self.num_messages = 0
        self.total_time = 0
        self.num_responses = 0
        self.total_time_responses = 0
        log.debug("KafkaProxy-initialized")

    def start(self):
        """
        Create the kafka client, make sure a KafkaProxy exists, schedule the
        message processing loop and the subscription to the default topic,
        then register this object as the singleton instance.

        Any exception is logged and swallowed.
        """
        try:
            # Create the kafka client
            self.kafka_client = KafkaClient(self.kafka_host_port)

            # Get the kafka proxy instance.  If it does not exist then
            # create it
            self.kafka_proxy = get_kafka_proxy()
            if self.kafka_proxy is None:
                KafkaProxy(kafka_endpoint=self.kafka_host_port).start()
                self.kafka_proxy = get_kafka_proxy()

            # Subscribe the default topic and target_cls
            self.topic_target_cls_map[self.default_topic] = self.target_cls

            # Start the queue to handle incoming messages
            reactor.callLater(0, self._received_message_processing_loop)

            # Start listening for incoming messages
            reactor.callLater(0, self.subscribe, self.default_topic,
                              target_cls=self.target_cls)

            # Setup the singleton instance
            IKafkaMessagingProxy._kafka_messaging_instance = self
        except Exception as e:
            log.exception("Failed-to-start-proxy", e=e)

    def stop(self):
        """
        Invoked to stop the kafka proxy.  Stops every consumer and then
        closes the kafka client.
        :return: None; exceptions are logged and swallowed
        """
        log.debug("Stopping-messaging-proxy ...")
        try:
            # Stop all the consumers
            deferred_list = []
            for consumers in self.topic_consumer_map.values():
                deferred_list.extend([c.stop() for c in consumers])

            def close_client(_results):
                # start() may never have been called
                if self.kafka_client is not None:
                    return self.kafka_client.close()

            # Close the client once all the consumers have stopped.
            # gatherResults() on an empty list fires immediately, so the
            # client is closed whether or not there are consumers.
            d = gatherResults(deferred_list)
            d.addCallback(close_client)
            log.debug("Messaging-proxy-stopped.")
        except Exception as e:
            log.exception("Exception-when-stopping-messaging-proxy:", e=e)

    @inlineCallbacks
    def _wait_until_topic_is_ready(self, client, topic):
        """
        Reload the topic metadata until the client reports no metadata
        error for the topic.
        :param client: kafka client used to load the metadata
        :param topic: topic to wait for
        """
        error = True
        while error:
            yield client.load_metadata_for_topics(topic)
            error = client.metadata_error_for_topic(topic)
            if error:
                log.debug("Topic-not-ready-retrying...", topic=topic)

    def _clear_backoff(self):
        """
        Reset the retry counter, logging if any retries had been made.

        NOTE(review): nothing in this class calls this method and the log
        message refers to consul - confirm whether it is still needed.
        """
        if self.retries:
            log.info('reconnected-to-consul', after_retries=self.retries)
            self.retries = 0

    @inlineCallbacks
    def _subscribe(self, topic, callback=None, target_cls=None):
        """
        Perform one subscription attempt: wait for the topic, create and
        start one consumer per partition if the topic has none yet, and
        register the target class or the callback for the topic.
        :param topic: topic to subscribe to
        :param callback: custom callback for the topic (scenario #2)
        :param target_cls: target class for the topic (scenario #1)
        :return: (via Deferred) True on success, False if an exception
        occurred
        """
        try:
            yield self._wait_until_topic_is_ready(self.kafka_client, topic)
            partitions = self.kafka_client.topic_partitions[topic]
            consumers = []

            # First setup the generic callback - all received messages will
            # go through that queue
            if topic not in self.topic_consumer_map:
                consumers = [Consumer(self.kafka_client, topic, partition,
                                      self._enqueue_received_message)
                             for partition in partitions]
                self.topic_consumer_map[topic] = consumers

            log.debug("_subscribe", topic=topic,
                      consumermap=self.topic_consumer_map)

            if target_cls is not None and callback is None:
                # Scenario #1 - only the first target_cls is kept per topic
                if topic not in self.topic_target_cls_map:
                    self.topic_target_cls_map[topic] = target_cls
            elif target_cls is None and callback is not None:
                # Scenario #2
                log.debug("custom-callback", topic=topic,
                          callback_map=self.topic_callback_map)
                if topic not in self.topic_callback_map:
                    self.topic_callback_map[topic] = [callback]
                else:
                    self.topic_callback_map[topic].extend([callback])
            else:
                log.warn("invalid-parameters")

            def cb_closed(result):
                """
                Called when a consumer cleanly stops.
                """
                log.debug("Consumers-cleanly-stopped")

            def eb_failed(failure):
                """
                Called when a consumer fails due to an uncaught exception
                in the processing callback or a network error on shutdown.
                In this case we simply log the error.
                """
                log.warn("Consumers-failed", failure=failure)

            # Only consumers created by this call are started
            for c in consumers:
                c.start(OFFSET_LATEST).addCallbacks(cb_closed, eb_failed)

            returnValue(True)
        except Exception as e:
            log.exception("Exception-during-subscription", e=e)
            returnValue(False)

    @inlineCallbacks
    def subscribe(self, topic, callback=None, target_cls=None,
                  max_retry=3):
        """
        Scenario 1:  invoked to subscribe to a specific topic with a
        target_cls to invoke when a message is received on that topic.  This
        handles the case of request/response where this library performs the
        heavy lifting. In this case the callback must be None

        Scenario 2:  invoked to subscribe to a specific topic with a
        specific callback to invoke when a message is received on that topic.
        This handles the case where the caller wants to process the message
        received itself. In this case the target_cls must be None

        :param topic: topic to subscribe to
        :param callback: Callback to invoke when a message is received on
        the topic. Exactly one of callback and target_cls must be None
        :param target_cls:  Target class to use when a message is
        received on the topic. There can only be 1 target_cls per topic.
        Exactly one of callback and target_cls must be None
        :param max_retry:  the number of retries before reporting failure
        to subscribe.  This caters for scenario where the kafka topic is not
        ready.
        :return: a Deferred firing True on success, False on failure
        """
        RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]

        def _backoff(msg, retries):
            wait_time = RETRY_BACKOFF[min(retries,
                                          len(RETRY_BACKOFF) - 1)]
            log.info(msg, retry_in=wait_time)
            # Schedule the delay directly on the reactor.
            # NOTE(review): the file-level import may bind `asleep` to a
            # module rather than a callable - confirm before using it here.
            delay = Deferred()
            reactor.callLater(wait_time, delay.callback, None)
            return delay

        retry = 0
        while True:
            # _subscribe returns a Deferred (always truthy), so its result
            # has to be yielded before it can be tested.
            subscribed = yield self._subscribe(topic, callback=callback,
                                               target_cls=target_cls)
            if subscribed:
                returnValue(True)
            if retry > max_retry:
                returnValue(False)
            yield _backoff("subscription-not-complete", retry)
            retry += 1

    def unsubscribe(self, topic):
        """
        Invoked when unsubscribing to a topic
        :param topic: topic to unsubscribe from
        :return: None; exceptions are logged and swallowed
        """
        log.debug("Unsubscribing-to-topic", topic=topic)

        def remove_topic(_results, topic):
            # addCallback passes the gathered results first, followed by
            # the extra arguments given at registration.
            self.topic_consumer_map.pop(topic, None)
            log.debug("Unsubscribed-to-topic.", topic=topic)

        try:
            if topic in self.topic_consumer_map:
                consumers = self.topic_consumer_map[topic]
                d = gatherResults([c.stop() for c in consumers])
                d.addCallback(remove_topic, topic)
            else:
                log.debug("Topic-does-not-exist.", topic=topic)
        except Exception as e:
            log.exception("Exception-when-stopping-messaging-proxy:", e=e)

    @inlineCallbacks
    def _enqueue_received_message(self, reactor, message_list):
        """
        Internal method to continuously queue all received messages
        irrespective of topic
        :param reactor: A requirement by the Twisted Python kafka library
        :param message_list: Received list of messages
        :return: None; exceptions are logged and swallowed
        """
        try:
            for m in message_list:
                log.debug("received-msg", msg=m)
                yield self.received_msg_queue.put(m)
        except Exception as e:
            log.exception("Failed-enqueueing-received-message", e=e)

    @inlineCallbacks
    def _received_message_processing_loop(self):
        """
        Internal method to continuously process all received messages one
        at a time.  Runs forever; a failure on one message is logged and
        the loop moves on to the next message.
        """
        while True:
            try:
                message = yield self.received_msg_queue.get()
                yield self._process_message(message)
            except Exception as e:
                log.exception("Failed-dequeueing-received-message", e=e)

    def _to_string(self, unicode_str):
        """
        Convert a unicode string to an ascii byte string, dropping the
        characters that cannot be encoded.
        :param unicode_str: value to convert
        :return: the encoded string; any non-unicode value (including None)
        is returned unchanged
        """
        if isinstance(unicode_str, unicode):
            return unicode_str.encode('ascii', 'ignore')
        return unicode_str

    def _format_request(self,
                        rpc,
                        to_topic,
                        reply_topic,
                        **kwargs):
        """
        Format a request to send over kafka
        :param rpc: Requested remote API
        :param to_topic: Topic to send the request
        :param reply_topic: Topic to receive the resulting response, if any
        :param kwargs: Dictionary of key-value pairs to pass as arguments to
        the remote rpc API.
        :return: A tuple (request, transaction_id, response_required) on
        success or (None, None, None) on failure
        """
        try:
            transaction_id = uuid4().hex
            request = InterContainerMessage()
            request_body = InterContainerRequestBody()
            request.header.id = transaction_id
            request.header.type = MessageType.Value("REQUEST")
            request.header.from_topic = self.default_topic
            request.header.to_topic = to_topic

            # A response is required whenever a reply topic is supplied
            response_required = bool(reply_topic)

            request.header.timestamp = int(round(time.time() * 1000))
            request_body.rpc = rpc
            for key, value in kwargs.items():
                arg = Argument()
                arg.key = key
                try:
                    arg.value.Pack(value)
                    request_body.args.extend([arg])
                except Exception as e:
                    # An argument that cannot be packed is left out of the
                    # request
                    log.exception("Failed-parsing-value", e=e)
            # NOTE(review): the reply topic placed in the body is always the
            # default topic, even when a different reply_topic is supplied.
            # This matches the existing behaviour - confirm it is intended.
            request_body.reply_to_topic = self.default_topic
            request_body.response_required = response_required
            request.body.Pack(request_body)
            return request, transaction_id, response_required
        except Exception:
            log.exception("formatting-request-failed",
                          rpc=rpc,
                          to_topic=to_topic,
                          reply_topic=reply_topic,
                          args=kwargs)
            return None, None, None

    def _format_response(self, msg_header, msg_body, status):
        """
        Format a response
        :param msg_header: The header portion of a received request
        :param msg_body: The response body
        :param status: True if this represents a successful response
        :return: an InterContainerMessage message type, or None on failure
        """
        try:
            # Explicit check rather than assert so that it is not stripped
            # when running with -O; the error is logged below.
            if not isinstance(msg_header, Header):
                raise TypeError("msg_header is not a Header")
            response = InterContainerMessage()
            response_body = InterContainerResponseBody()
            response.header.id = msg_header.id
            response.header.timestamp = int(
                round(time.time() * 1000))
            response.header.type = MessageType.Value("RESPONSE")
            # The response travels back the way the request came
            response.header.from_topic = msg_header.to_topic
            response.header.to_topic = msg_header.from_topic
            if msg_body is not None:
                response_body.result.Pack(msg_body)
            response_body.success = status
            response.body.Pack(response_body)
            return response
        except Exception as e:
            log.exception("formatting-response-failed", header=msg_header,
                          body=msg_body, status=status, e=e)
            return None

    def _parse_response(self, msg):
        """
        Extract the response body from a serialized InterContainerMessage
        :param msg: serialized InterContainerMessage
        :return: an InterContainerResponseBody, or None if the message does
        not carry one or cannot be parsed
        """
        try:
            message = InterContainerMessage()
            message.ParseFromString(msg)
            resp = InterContainerResponseBody()
            if message.body.Is(InterContainerResponseBody.DESCRIPTOR):
                message.body.Unpack(resp)
            else:
                log.debug("unsupported-msg", msg_type=type(message.body))
                return None
            log.debug("parsed-response", input=message, output=resp)
            return resp
        except Exception as e:
            log.exception("parsing-response-failed", msg=msg, e=e)
            return None

    @inlineCallbacks
    def _process_message(self, m):
        """
        Default internal method invoked for every message taken from the
        received-message queue.

        Custom callbacks registered for the topic are invoked first.  If the
        topic also has a target class, the message is then handled as a
        request (the rpc is invoked on the target class and a response is
        sent when required) or as a response (the Deferred waiting on the
        transaction id is fired).
        :param m: received message; exceptions are logged and swallowed
        """

        def _toDict(args):
            """
            Convert a repeatable Argument type into a python dictionary
            :param args: Repeatable core_adapter.Argument type
            :return: a python dictionary
            """
            if args is None:
                return None
            result = {}
            for arg in args:
                assert isinstance(arg, Argument)
                result[arg.key] = arg.value
            return result

        try:
            val = m.message.value

            # Go over customized callbacks first
            if m.topic in self.topic_callback_map:
                for c in self.topic_callback_map[m.topic]:
                    yield c(val)

            # Check whether we need to process request/response scenario
            if m.topic not in self.topic_target_cls_map:
                return

            # Process request/response scenario
            message = InterContainerMessage()
            message.ParseFromString(val)

            if message.header.type == MessageType.Value("REQUEST"):
                # Get the target class for that specific topic
                targetted_topic = self._to_string(message.header.to_topic)
                msg_body = InterContainerRequestBody()
                if message.body.Is(InterContainerRequestBody.DESCRIPTOR):
                    message.body.Unpack(msg_body)
                else:
                    log.debug("unsupported-msg", msg_type=type(message.body))
                    return
                if targetted_topic in self.topic_target_cls_map:
                    if msg_body.args:
                        log.debug("message-body-args-present", body=msg_body)
                        rpc_kwargs = _toDict(msg_body.args)
                    else:
                        log.debug("message-body-args-absent", body=msg_body,
                                  rpc=msg_body.rpc)
                        rpc_kwargs = {}
                    # The rpc name selects the method of the target class
                    rpc_method = getattr(
                        self.topic_target_cls_map[targetted_topic],
                        self._to_string(msg_body.rpc))
                    (status, res) = yield rpc_method(**rpc_kwargs)
                    if msg_body.response_required:
                        response = self._format_response(
                            msg_header=message.header,
                            msg_body=res,
                            status=status,
                        )
                        if response is not None:
                            res_topic = self._to_string(
                                response.header.to_topic)
                            self._send_kafka_message(res_topic, response)

                            log.debug("Response-sent",
                                      response=response.body)
            elif message.header.type == MessageType.Value("RESPONSE"):
                trns_id = self._to_string(message.header.id)
                # Responses for unknown transactions are ignored
                if trns_id in self.transaction_id_deferred_map:
                    resp = self._parse_response(val)

                    self.transaction_id_deferred_map[trns_id].callback(resp)
            else:
                log.error("!!INVALID-TRANSACTION-TYPE!!")

        except Exception as e:
            log.exception("Failed-to-process-message", message=m, e=e)

    @inlineCallbacks
    def _send_kafka_message(self, topic, msg):
        """
        Serialize a message and send it through the kafka proxy
        :param topic: topic to send the message to
        :param msg: protobuf message to serialize and send
        :return: None; exceptions are logged and swallowed
        """
        try:
            yield self.kafka_proxy.send_message(topic, msg.SerializeToString())
        except Exception as e:
            log.exception("Failed-sending-message", message=msg, e=e)

    @inlineCallbacks
    def send_request(self,
                     rpc,
                     to_topic,
                     reply_topic=None,
                     callback=None,
                     **kwargs):
        """
        Invoked to send a message to a remote container and receive a
        response if required.
        :param rpc: The remote API to invoke
        :param to_topic: Send the message to this kafka topic
        :param reply_topic: If not None then a response is expected on this
        topic.  If set to None then no response is required.
        :param callback: Callback to invoke when a response is received.
        :param kwargs: Key-value pairs representing arguments to pass to the
        rpc remote API.
        :return: Either no response is required, or a response is returned
        via the callback or the response is a tuple of (status, return_cls)
        :raises KafkaMessagingError: if the response is missing or
        unsuccessful, or if any other exception occurs
        """
        try:
            # Ensure all strings are not unicode encoded
            rpc = self._to_string(rpc)
            to_topic = self._to_string(to_topic)
            reply_topic = self._to_string(reply_topic)

            request, transaction_id, response_required = \
                self._format_request(
                    rpc=rpc,
                    to_topic=to_topic,
                    reply_topic=reply_topic,
                    **kwargs)

            if request is None:
                return

            # Add the transaction to the transaction map before sending the
            # request.  This will guarantee the eventual response will be
            # processed.
            wait_for_result = None
            trns_key = self._to_string(request.header.id)
            if response_required:
                wait_for_result = Deferred()
                self.transaction_id_deferred_map[trns_key] = wait_for_result

            yield self._send_kafka_message(to_topic, request)
            log.debug("message-sent", to_topic=to_topic,
                      from_topic=reply_topic)

            if response_required:
                try:
                    res = yield wait_for_result
                finally:
                    # Always remove the transaction from the transaction
                    # map, including when the response is a failure.
                    self.transaction_id_deferred_map.pop(trns_key, None)

                if res is None or not res.success:
                    raise KafkaMessagingError(error="Failed-response:{"
                                                    "}".format(res))

                log.debug("send-message-response", rpc=rpc, result=res)

                if callback:
                    callback((res.success, res.result))
                else:
                    returnValue((res.success, res.result))
        except KafkaMessagingError:
            # Already the error type reported to callers; do not re-wrap
            raise
        except Exception as e:
            log.exception("Exception-sending-request", e=e)
            raise KafkaMessagingError(error=e)
570
571
# Accessor for the singleton instance of the kafka messaging proxy class
def get_messaging_proxy():
    """
    Return the IKafkaMessagingProxy singleton.
    :return: the instance recorded on the class, or None if none has been
    recorded yet
    """
    instance = IKafkaMessagingProxy._kafka_messaging_instance
    return instance
574 return IKafkaMessagingProxy._kafka_messaging_instance