khenaidoo | 6fdf0ba | 2018-11-02 14:38:33 -0400 | [diff] [blame] | 1 | # |
| 2 | # Copyright 2018 the original author or authors. |
| 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | # |
| 16 | |
| 17 | """ |
| 18 | The superclass for all kafka proxy subclasses. |
| 19 | """ |
| 20 | |
| 21 | import structlog |
| 22 | from twisted.internet.defer import inlineCallbacks, returnValue |
| 23 | from twisted.python import failure |
| 24 | from zope.interface import implementer |
| 25 | |
| 26 | from adapters.common.utils.deferred_utils import DeferredWithTimeout, \ |
| 27 | TimeOutError |
| 28 | from adapters.common.utils.registry import IComponent |
| 29 | |
| 30 | log = structlog.get_logger() |
| 31 | |
| 32 | |
| 33 | class KafkaMessagingError(BaseException): |
| 34 | def __init__(self, error): |
| 35 | self.error = error |
| 36 | |
| 37 | |
| 38 | @implementer(IComponent) |
| 39 | class ContainerProxy(object): |
| 40 | |
| 41 | def __init__(self, kafka_proxy, core_topic, my_listening_topic): |
| 42 | self.kafka_proxy = kafka_proxy |
| 43 | self.listening_topic = my_listening_topic |
| 44 | self.core_topic = core_topic |
| 45 | self.default_timeout = 3 |
| 46 | |
| 47 | def start(self): |
| 48 | log.info('started') |
| 49 | |
| 50 | return self |
| 51 | |
| 52 | def stop(self): |
| 53 | log.info('stopped') |
| 54 | |
| 55 | @classmethod |
| 56 | def wrap_request(cls, return_cls): |
| 57 | def real_wrapper(func): |
| 58 | @inlineCallbacks |
| 59 | def wrapper(*args, **kw): |
| 60 | try: |
| 61 | (success, d) = yield func(*args, **kw) |
| 62 | if success: |
| 63 | log.debug("successful-response", func=func, val=d) |
| 64 | if return_cls is not None: |
| 65 | rc = return_cls() |
| 66 | if d is not None: |
| 67 | d.Unpack(rc) |
| 68 | returnValue(rc) |
| 69 | else: |
| 70 | log.debug("successful-response-none", func=func, |
| 71 | val=None) |
| 72 | returnValue(None) |
| 73 | else: |
| 74 | log.warn("unsuccessful-request", func=func, args=args, |
| 75 | kw=kw) |
| 76 | returnValue(d) |
| 77 | except Exception as e: |
| 78 | log.exception("request-wrapper-exception", func=func, e=e) |
| 79 | raise |
| 80 | |
| 81 | return wrapper |
| 82 | |
| 83 | return real_wrapper |
| 84 | |
| 85 | @inlineCallbacks |
| 86 | def invoke(self, rpc, to_topic=None, **kwargs): |
| 87 | @inlineCallbacks |
| 88 | def _send_request(rpc, m_callback, to_topic, **kwargs): |
| 89 | try: |
| 90 | log.debug("sending-request", rpc=rpc) |
| 91 | if to_topic is None: |
| 92 | to_topic = self.core_topic |
| 93 | result = yield self.kafka_proxy.send_request(rpc=rpc, |
| 94 | to_topic=to_topic, |
| 95 | reply_topic=self.listening_topic, |
| 96 | callback=None, |
| 97 | **kwargs) |
| 98 | if not m_callback.called: |
| 99 | m_callback.callback(result) |
| 100 | else: |
| 101 | log.debug('timeout-already-occurred', rpc=rpc) |
| 102 | except Exception as e: |
| 103 | log.exception("Failure-sending-request", rpc=rpc, kw=kwargs) |
| 104 | if not m_callback.called: |
| 105 | m_callback.errback(failure.Failure()) |
| 106 | |
| 107 | cb = DeferredWithTimeout(timeout=self.default_timeout) |
| 108 | _send_request(rpc, cb, to_topic, **kwargs) |
| 109 | try: |
| 110 | res = yield cb |
| 111 | returnValue(res) |
| 112 | except TimeOutError as e: |
| 113 | log.warn('invoke-timeout', e=e) |
| 114 | raise e |