khenaidoo | 6fdf0ba | 2018-11-02 14:38:33 -0400 | [diff] [blame] | 1 | # |
| 2 | # Copyright 2018 the original author or authors. |
| 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | # |
| 16 | |
| 17 | """ |
| 18 | The superclass for all kafka proxy subclasses. |
| 19 | """ |
| 20 | |
| 21 | import structlog |
| 22 | from twisted.internet.defer import inlineCallbacks, returnValue |
| 23 | from twisted.python import failure |
| 24 | from zope.interface import implementer |
| 25 | |
khenaidoo | fdbad6e | 2018-11-06 22:26:38 -0500 | [diff] [blame] | 26 | from python.common.utils.deferred_utils import DeferredWithTimeout, \ |
khenaidoo | 6fdf0ba | 2018-11-02 14:38:33 -0400 | [diff] [blame] | 27 | TimeOutError |
khenaidoo | fdbad6e | 2018-11-06 22:26:38 -0500 | [diff] [blame] | 28 | from python.common.utils.registry import IComponent |
khenaidoo | 6fdf0ba | 2018-11-02 14:38:33 -0400 | [diff] [blame] | 29 | |
| 30 | log = structlog.get_logger() |
| 31 | |
| 32 | |
| 33 | class KafkaMessagingError(BaseException): |
| 34 | def __init__(self, error): |
| 35 | self.error = error |
| 36 | |
| 37 | |
| 38 | @implementer(IComponent) |
| 39 | class ContainerProxy(object): |
| 40 | |
khenaidoo | 54e0ddf | 2019-02-27 16:21:33 -0500 | [diff] [blame] | 41 | def __init__(self, kafka_proxy, remote_topic, my_listening_topic): |
khenaidoo | 6fdf0ba | 2018-11-02 14:38:33 -0400 | [diff] [blame] | 42 | self.kafka_proxy = kafka_proxy |
| 43 | self.listening_topic = my_listening_topic |
khenaidoo | 54e0ddf | 2019-02-27 16:21:33 -0500 | [diff] [blame] | 44 | self.remote_topic = remote_topic |
khenaidoo | 6fdf0ba | 2018-11-02 14:38:33 -0400 | [diff] [blame] | 45 | self.default_timeout = 3 |
| 46 | |
| 47 | def start(self): |
| 48 | log.info('started') |
| 49 | |
| 50 | return self |
| 51 | |
| 52 | def stop(self): |
| 53 | log.info('stopped') |
| 54 | |
| 55 | @classmethod |
| 56 | def wrap_request(cls, return_cls): |
| 57 | def real_wrapper(func): |
| 58 | @inlineCallbacks |
| 59 | def wrapper(*args, **kw): |
| 60 | try: |
| 61 | (success, d) = yield func(*args, **kw) |
| 62 | if success: |
| 63 | log.debug("successful-response", func=func, val=d) |
| 64 | if return_cls is not None: |
| 65 | rc = return_cls() |
| 66 | if d is not None: |
| 67 | d.Unpack(rc) |
| 68 | returnValue(rc) |
| 69 | else: |
| 70 | log.debug("successful-response-none", func=func, |
| 71 | val=None) |
| 72 | returnValue(None) |
| 73 | else: |
| 74 | log.warn("unsuccessful-request", func=func, args=args, |
| 75 | kw=kw) |
| 76 | returnValue(d) |
| 77 | except Exception as e: |
| 78 | log.exception("request-wrapper-exception", func=func, e=e) |
| 79 | raise |
| 80 | |
| 81 | return wrapper |
| 82 | |
| 83 | return real_wrapper |
| 84 | |
| 85 | @inlineCallbacks |
khenaidoo | 43c8212 | 2018-11-22 18:38:28 -0500 | [diff] [blame] | 86 | def invoke(self, rpc, to_topic=None, reply_topic=None, **kwargs): |
khenaidoo | 6fdf0ba | 2018-11-02 14:38:33 -0400 | [diff] [blame] | 87 | @inlineCallbacks |
khenaidoo | 43c8212 | 2018-11-22 18:38:28 -0500 | [diff] [blame] | 88 | def _send_request(rpc, m_callback, to_topic, reply_topic, **kwargs): |
khenaidoo | 6fdf0ba | 2018-11-02 14:38:33 -0400 | [diff] [blame] | 89 | try: |
khenaidoo | 43c8212 | 2018-11-22 18:38:28 -0500 | [diff] [blame] | 90 | log.debug("sending-request", |
| 91 | rpc=rpc, |
| 92 | to_topic=to_topic, |
| 93 | reply_topic=reply_topic) |
khenaidoo | 6fdf0ba | 2018-11-02 14:38:33 -0400 | [diff] [blame] | 94 | if to_topic is None: |
khenaidoo | 54e0ddf | 2019-02-27 16:21:33 -0500 | [diff] [blame] | 95 | to_topic = self.remote_topic |
khenaidoo | 43c8212 | 2018-11-22 18:38:28 -0500 | [diff] [blame] | 96 | if reply_topic is None: |
| 97 | reply_topic = self.listening_topic |
khenaidoo | 6fdf0ba | 2018-11-02 14:38:33 -0400 | [diff] [blame] | 98 | result = yield self.kafka_proxy.send_request(rpc=rpc, |
| 99 | to_topic=to_topic, |
khenaidoo | 43c8212 | 2018-11-22 18:38:28 -0500 | [diff] [blame] | 100 | reply_topic=reply_topic, |
khenaidoo | 6fdf0ba | 2018-11-02 14:38:33 -0400 | [diff] [blame] | 101 | callback=None, |
| 102 | **kwargs) |
| 103 | if not m_callback.called: |
| 104 | m_callback.callback(result) |
| 105 | else: |
| 106 | log.debug('timeout-already-occurred', rpc=rpc) |
| 107 | except Exception as e: |
| 108 | log.exception("Failure-sending-request", rpc=rpc, kw=kwargs) |
| 109 | if not m_callback.called: |
| 110 | m_callback.errback(failure.Failure()) |
| 111 | |
khenaidoo | 43c8212 | 2018-11-22 18:38:28 -0500 | [diff] [blame] | 112 | # We are going to resend the request on the to_topic if there is a |
| 113 | # timeout error. This time the timeout will be longer. If the second |
| 114 | # request times out then we will send the request to the default |
| 115 | # core_topic. |
| 116 | timeouts = [self.default_timeout, |
| 117 | self.default_timeout*2, |
| 118 | self.default_timeout] |
| 119 | retry = 0 |
| 120 | max_retry = 2 |
| 121 | for timeout in timeouts: |
| 122 | cb = DeferredWithTimeout(timeout=timeout) |
| 123 | _send_request(rpc, cb, to_topic, reply_topic, **kwargs) |
| 124 | try: |
| 125 | res = yield cb |
| 126 | returnValue(res) |
| 127 | except TimeOutError as e: |
| 128 | log.warn('invoke-timeout', e=e) |
| 129 | if retry == max_retry: |
| 130 | raise e |
| 131 | retry += 1 |
| 132 | if retry == max_retry: |
khenaidoo | 54e0ddf | 2019-02-27 16:21:33 -0500 | [diff] [blame] | 133 | to_topic = self.remote_topic |