blob: 5cad2e855995aeea2b5b084a0a505109c9d6c753 [file] [log] [blame]
William Kurkian6f436d02019-02-06 16:25:01 -05001#!/usr/bin/env python
2
3# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import time
18from uuid import uuid4
19
20import structlog
21from twisted.internet import reactor
22from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, \
23 DeferredQueue, gatherResults
24from zope.interface import implementer
25
26from python.common.utils import asleep
27from python.common.utils.registry import IComponent
28from kafka_proxy import KafkaProxy, get_kafka_proxy
29from python.protos.inter_container_pb2 import MessageType, Argument, \
30 InterContainerRequestBody, InterContainerMessage, Header, \
31 InterContainerResponseBody
32
33log = structlog.get_logger()
34
35KAFKA_OFFSET_LATEST = 'latest'
36KAFKA_OFFSET_EARLIEST = 'earliest'
37
38
39class KafkaMessagingError(BaseException):
40 def __init__(self, error):
41 self.error = error
42
43
44@implementer(IComponent)
45class IKafkaMessagingProxy(object):
46 _kafka_messaging_instance = None
47
48 def __init__(self,
49 kafka_host_port,
50 kv_store,
51 default_topic,
52 group_id_prefix,
53 target_cls):
54 """
55 Initialize the kafka proxy. This is a singleton (may change to
56 non-singleton if performance is better)
57 :param kafka_host_port: Kafka host and port
58 :param kv_store: Key-Value store
59 :param default_topic: Default topic to subscribe to
60 :param target_cls: target class - method of that class is invoked
61 when a message is received on the default_topic
62 """
63 # return an exception if the object already exist
64 if IKafkaMessagingProxy._kafka_messaging_instance:
65 raise Exception(
66 'Singleton-exist', cls=IKafkaMessagingProxy)
67
68 log.debug("Initializing-KafkaProxy")
69 self.kafka_host_port = kafka_host_port
70 self.kv_store = kv_store
71 self.default_topic = default_topic
72 self.default_group_id = "_".join((group_id_prefix, default_topic))
73 self.target_cls = target_cls
74 self.topic_target_cls_map = {}
75 self.topic_callback_map = {}
76 self.subscribers = {}
77 self.kafka_proxy = None
78 self.transaction_id_deferred_map = {}
79 self.received_msg_queue = DeferredQueue()
80 self.stopped = False
81
82 self.init_time = 0
83 self.init_received_time = 0
84
85 self.init_resp_time = 0
86 self.init_received_resp_time = 0
87
88 self.num_messages = 0
89 self.total_time = 0
90 self.num_responses = 0
91 self.total_time_responses = 0
92 log.debug("KafkaProxy-initialized")
93
94 def start(self):
95 try:
96 log.debug("KafkaProxy-starting")
97
98 # Get the kafka proxy instance. If it does not exist then
99 # create it
100 self.kafka_proxy = get_kafka_proxy()
101 if self.kafka_proxy == None:
102 KafkaProxy(kafka_endpoint=self.kafka_host_port).start()
103 self.kafka_proxy = get_kafka_proxy()
104
105 # Subscribe the default topic and target_cls
106 self.topic_target_cls_map[self.default_topic] = self.target_cls
107
108 # Start the queue to handle incoming messages
109 reactor.callLater(0, self._received_message_processing_loop)
110
111 # Subscribe using the default topic and default group id. Whenever
112 # a message is received on that topic then teh target_cls will be
113 # invoked.
114 reactor.callLater(0, self.subscribe,
115 topic=self.default_topic,
116 target_cls=self.target_cls,
117 group_id=self.default_group_id)
118
119 # Setup the singleton instance
120 IKafkaMessagingProxy._kafka_messaging_instance = self
121 log.debug("KafkaProxy-started")
122 except Exception as e:
123 log.exception("Failed-to-start-proxy", e=e)
124
125 def stop(self):
126 """
127 Invoked to stop the kafka proxy
128 :return: None on success, Exception on failure
129 """
130 log.debug("Stopping-messaging-proxy ...")
131 try:
132 # Stop the kafka proxy. This will stop all the consumers
133 # and producers
134 self.stopped = True
135 self.kafka_proxy.stop()
136 log.debug("Messaging-proxy-stopped.")
137 except Exception as e:
138 log.exception("Exception-when-stopping-messaging-proxy:", e=e)
139
140 def get_target_cls(self):
141 return self.target_cls
142
143 def get_default_topic(self):
144 return self.default_topic
145
146 @inlineCallbacks
147 def _subscribe_group_consumer(self, group_id, topic, offset, callback=None,
148 target_cls=None):
149 try:
150 log.debug("subscribing-to-topic-start", topic=topic)
151 yield self.kafka_proxy.subscribe(topic,
152 self._enqueue_received_group_message,
153 group_id, offset)
154
155 if target_cls is not None and callback is None:
156 # Scenario #1
157 if topic not in self.topic_target_cls_map:
158 self.topic_target_cls_map[topic] = target_cls
159 elif target_cls is None and callback is not None:
160 # Scenario #2
161 log.debug("custom-callback", topic=topic,
162 callback_map=self.topic_callback_map)
163 if topic not in self.topic_callback_map:
164 self.topic_callback_map[topic] = [callback]
165 else:
166 self.topic_callback_map[topic].extend([callback])
167 else:
168 log.warn("invalid-parameters")
169
170 returnValue(True)
171 except Exception as e:
172 log.exception("Exception-during-subscription", e=e)
173 returnValue(False)
174
175 @inlineCallbacks
176 def subscribe(self, topic, callback=None, target_cls=None,
177 max_retry=3, group_id=None, offset=KAFKA_OFFSET_LATEST):
178 """
179 Scenario 1: invoked to subscribe to a specific topic with a
180 target_cls to invoke when a message is received on that topic. This
181 handles the case of request/response where this library performs the
182 heavy lifting. In this case the m_callback must to be None
183
184 Scenario 2: invoked to subscribe to a specific topic with a
185 specific callback to invoke when a message is received on that topic.
186 This handles the case where the caller wants to process the message
187 received itself. In this case the target_cls must to be None
188
189 :param topic: topic to subscribe to
190 :param callback: Callback to invoke when a message is received on
191 the topic. Either one of callback or target_cls needs can be none
192 :param target_cls: Target class to use when a message is
193 received on the topic. There can only be 1 target_cls per topic.
194 Either one of callback or target_cls needs can be none
195 :param max_retry: the number of retries before reporting failure
196 to subscribe. This caters for scenario where the kafka topic is not
197 ready.
198 :param group_id: The ID of the group the consumer is subscribing to
199 :param offset: The topic offset on the kafka bus from where message consumption will start
200 :return: True on success, False on failure
201 """
202 RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
203
204 def _backoff(msg, retries):
205 wait_time = RETRY_BACKOFF[min(retries,
206 len(RETRY_BACKOFF) - 1)]
207 log.info(msg, retry_in=wait_time)
208 return asleep.asleep(wait_time)
209
210 log.debug("subscribing", topic=topic, group_id=group_id,
211 callback=callback, target=target_cls)
212
213 retry = 0
214 subscribed = False
215 if group_id is None:
216 group_id = self.default_group_id
217 while not subscribed:
218 subscribed = yield self._subscribe_group_consumer(group_id, topic,
219 callback=callback,
220 target_cls=target_cls,
221 offset=offset)
222 if subscribed:
223 returnValue(True)
224 elif retry > max_retry:
225 returnValue(False)
226 else:
227 _backoff("subscription-not-complete", retry)
228 retry += 1
229
230 def unsubscribe(self, topic, callback=None, target_cls=None):
231 """
232 Invoked when unsubscribing to a topic
233 :param topic: topic to unsubscribe from
234 :param callback: the callback used when subscribing to the topic, if any
235 :param target_cls: the targert class used when subscribing to the topic, if any
236 :return: None on success or Exception on failure
237 """
238 log.debug("Unsubscribing-to-topic", topic=topic)
239
240 try:
241 self.kafka_proxy.unsubscribe(topic,
242 self._enqueue_received_group_message)
243
244 if callback is None and target_cls is None:
245 log.error("both-call-and-target-cls-cannot-be-none",
246 topic=topic)
247 raise KafkaMessagingError(
248 error="both-call-and-target-cls-cannot-be-none")
249
250 if target_cls is not None and topic in self.topic_target_cls_map:
251 del self.topic_target_cls_map[topic]
252
253 if callback is not None and topic in self.topic_callback_map:
254 index = 0
255 for cb in self.topic_callback_map[topic]:
256 if cb == callback:
257 break
258 index += 1
259 if index < len(self.topic_callback_map[topic]):
260 self.topic_callback_map[topic].pop(index)
261
262 if len(self.topic_callback_map[topic]) == 0:
263 del self.topic_callback_map[topic]
264 except Exception as e:
265 log.exception("Exception-when-unsubscribing-to-topic", topic=topic,
266 e=e)
267 return e
268
269 @inlineCallbacks
270 def _enqueue_received_group_message(self, msg):
271 """
272 Internal method to continuously queue all received messaged
273 irrespective of topic
274 :param msg: Received message
275 :return: None on success, Exception on failure
276 """
277 try:
278 log.debug("received-msg", msg=msg)
279 yield self.received_msg_queue.put(msg)
280 except Exception as e:
281 log.exception("Failed-enqueueing-received-message", e=e)
282
283 @inlineCallbacks
284 def _received_message_processing_loop(self):
285 """
286 Internal method to continuously process all received messages one
287 at a time
288 :return: None on success, Exception on failure
289 """
290 while True:
291 try:
292 message = yield self.received_msg_queue.get()
293 yield self._process_message(message)
294 if self.stopped:
295 break
296 except Exception as e:
297 log.exception("Failed-dequeueing-received-message", e=e)
298
299 def _to_string(self, unicode_str):
300 if unicode_str is not None:
301 if type(unicode_str) == unicode:
302 return unicode_str.encode('ascii', 'ignore')
303 else:
304 return unicode_str
305 else:
306 return None
307
308 def _format_request(self,
309 rpc,
310 to_topic,
311 reply_topic,
312 **kwargs):
313 """
314 Format a request to send over kafka
315 :param rpc: Requested remote API
316 :param to_topic: Topic to send the request
317 :param reply_topic: Topic to receive the resulting response, if any
318 :param kwargs: Dictionary of key-value pairs to pass as arguments to
319 the remote rpc API.
320 :return: A InterContainerMessage message type on success or None on
321 failure
322 """
323 try:
324 transaction_id = uuid4().hex
325 request = InterContainerMessage()
326 request_body = InterContainerRequestBody()
327 request.header.id = transaction_id
328 request.header.type = MessageType.Value("REQUEST")
329 request.header.from_topic = reply_topic
330 request.header.to_topic = to_topic
331
332 response_required = False
333 if reply_topic:
334 request_body.reply_to_topic = reply_topic
335 request_body.response_required = True
336 response_required = True
337
338 request.header.timestamp = int(round(time.time() * 1000))
339 request_body.rpc = rpc
340 for a, b in kwargs.iteritems():
341 arg = Argument()
342 arg.key = a
343 try:
344 arg.value.Pack(b)
345 request_body.args.extend([arg])
346 except Exception as e:
347 log.exception("Failed-parsing-value", e=e)
348 request.body.Pack(request_body)
349 return request, transaction_id, response_required
350 except Exception as e:
351 log.exception("formatting-request-failed",
352 rpc=rpc,
353 to_topic=to_topic,
354 reply_topic=reply_topic,
355 args=kwargs)
356 return None, None, None
357
358 def _format_response(self, msg_header, msg_body, status):
359 """
360 Format a response
361 :param msg_header: The header portion of a received request
362 :param msg_body: The response body
363 :param status: True is this represents a successful response
364 :return: a InterContainerMessage message type
365 """
366 try:
367 assert isinstance(msg_header, Header)
368 response = InterContainerMessage()
369 response_body = InterContainerResponseBody()
370 response.header.id = msg_header.id
371 response.header.timestamp = int(
372 round(time.time() * 1000))
373 response.header.type = MessageType.Value("RESPONSE")
374 response.header.from_topic = msg_header.to_topic
375 response.header.to_topic = msg_header.from_topic
376 if msg_body is not None:
377 response_body.result.Pack(msg_body)
378 response_body.success = status
379 response.body.Pack(response_body)
380 return response
381 except Exception as e:
382 log.exception("formatting-response-failed", header=msg_header,
383 body=msg_body, status=status, e=e)
384 return None
385
386 def _parse_response(self, msg):
387 try:
388 message = InterContainerMessage()
389 message.ParseFromString(msg)
390 resp = InterContainerResponseBody()
391 if message.body.Is(InterContainerResponseBody.DESCRIPTOR):
392 message.body.Unpack(resp)
393 else:
394 log.debug("unsupported-msg", msg_type=type(message.body))
395 return None
396 log.debug("parsed-response", input=message, output=resp)
397 return resp
398 except Exception as e:
399 log.exception("parsing-response-failed", msg=msg, e=e)
400 return None
401
402 @inlineCallbacks
403 def _process_message(self, m):
404 """
405 Default internal method invoked for every batch of messages received
406 from Kafka.
407 """
408
409 def _toDict(args):
410 """
411 Convert a repeatable Argument type into a python dictionary
412 :param args: Repeatable core_adapter.Argument type
413 :return: a python dictionary
414 """
415 if args is None:
416 return None
417 result = {}
418 for arg in args:
419 assert isinstance(arg, Argument)
420 result[arg.key] = arg.value
421 return result
422
423 current_time = int(round(time.time() * 1000))
424 # log.debug("Got Message", message=m)
425 try:
426 val = m.value()
427 # val = m.message.value
428 # print m.topic
429
430 # Go over customized callbacks first
431 m_topic = m.topic()
432 if m_topic in self.topic_callback_map:
433 for c in self.topic_callback_map[m_topic]:
434 yield c(val)
435
436 # Check whether we need to process request/response scenario
437 if m_topic not in self.topic_target_cls_map:
438 return
439
440 # Process request/response scenario
441 message = InterContainerMessage()
442 message.ParseFromString(val)
443
444 if message.header.type == MessageType.Value("REQUEST"):
445 # Get the target class for that specific topic
446 targetted_topic = self._to_string(message.header.to_topic)
447 msg_body = InterContainerRequestBody()
448 if message.body.Is(InterContainerRequestBody.DESCRIPTOR):
449 message.body.Unpack(msg_body)
450 else:
451 log.debug("unsupported-msg", msg_type=type(message.body))
452 return
453 if targetted_topic in self.topic_target_cls_map:
454 if msg_body.args:
455 log.debug("message-body-args-present", body=msg_body)
456 (status, res) = yield getattr(
457 self.topic_target_cls_map[targetted_topic],
458 self._to_string(msg_body.rpc))(
459 **_toDict(msg_body.args))
460 else:
461 log.debug("message-body-args-absent", body=msg_body,
462 rpc=msg_body.rpc)
463 (status, res) = yield getattr(
464 self.topic_target_cls_map[targetted_topic],
465 self._to_string(msg_body.rpc))()
466 if msg_body.response_required:
467 response = self._format_response(
468 msg_header=message.header,
469 msg_body=res,
470 status=status,
471 )
472 if response is not None:
473 res_topic = self._to_string(
474 response.header.to_topic)
475 self._send_kafka_message(res_topic, response)
476
477 log.debug("Response-sent", response=response.body,
478 to_topic=res_topic)
479 elif message.header.type == MessageType.Value("RESPONSE"):
480 trns_id = self._to_string(message.header.id)
481 if trns_id in self.transaction_id_deferred_map:
482 resp = self._parse_response(val)
483
484 self.transaction_id_deferred_map[trns_id].callback(resp)
485 else:
486 log.error("!!INVALID-TRANSACTION-TYPE!!")
487
488 except Exception as e:
489 log.exception("Failed-to-process-message", message=m, e=e)
490
491 @inlineCallbacks
492 def _send_kafka_message(self, topic, msg):
493 try:
494 yield self.kafka_proxy.send_message(topic, msg.SerializeToString())
495 except Exception, e:
496 log.exception("Failed-sending-message", message=msg, e=e)
497
498 @inlineCallbacks
499 def send_request(self,
500 rpc,
501 to_topic,
502 reply_topic=None,
503 callback=None,
504 **kwargs):
505 """
506 Invoked to send a message to a remote container and receive a
507 response if required.
508 :param rpc: The remote API to invoke
509 :param to_topic: Send the message to this kafka topic
510 :param reply_topic: If not None then a response is expected on this
511 topic. If set to None then no response is required.
512 :param callback: Callback to invoke when a response is received.
513 :param kwargs: Key-value pairs representing arguments to pass to the
514 rpc remote API.
515 :return: Either no response is required, or a response is returned
516 via the callback or the response is a tuple of (status, return_cls)
517 """
518 try:
519 # Ensure all strings are not unicode encoded
520 rpc = self._to_string(rpc)
521 to_topic = self._to_string(to_topic)
522 reply_topic = self._to_string(reply_topic)
523
524 request, transaction_id, response_required = \
525 self._format_request(
526 rpc=rpc,
527 to_topic=to_topic,
528 reply_topic=reply_topic,
529 **kwargs)
530
531 if request is None:
532 return
533
534 # Add the transaction to the transaction map before sending the
535 # request. This will guarantee the eventual response will be
536 # processed.
537 wait_for_result = None
538 if response_required:
539 wait_for_result = Deferred()
540 self.transaction_id_deferred_map[
541 self._to_string(request.header.id)] = wait_for_result
542
543 yield self._send_kafka_message(to_topic, request)
544 log.debug("message-sent", to_topic=to_topic,
545 from_topic=reply_topic)
546
547 if response_required:
548 res = yield wait_for_result
549
550 if res is None or not res.success:
551 raise KafkaMessagingError(error="Failed-response:{"
552 "}".format(res))
553
554 # Remove the transaction from the transaction map
555 del self.transaction_id_deferred_map[transaction_id]
556
557 log.debug("send-message-response", rpc=rpc, result=res)
558
559 if callback:
560 callback((res.success, res.result))
561 else:
562 returnValue((res.success, res.result))
563 except Exception as e:
564 log.exception("Exception-sending-request", e=e)
565 raise KafkaMessagingError(error=e)
566
567
568# Common method to get the singleton instance of the kafka proxy class
569def get_messaging_proxy():
570 return IKafkaMessagingProxy._kafka_messaging_instance