blob: efcf558f7f9a0f78a7b60f5f12bfb6f35c09cd0a [file] [log] [blame]
khenaidoo6fdf0ba2018-11-02 14:38:33 -04001#
2# Copyright 2018 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18The superclass for all kafka proxy subclasses.
19"""
20
21import structlog
22from twisted.internet.defer import inlineCallbacks, returnValue
23from twisted.python import failure
24from zope.interface import implementer
25
khenaidoofdbad6e2018-11-06 22:26:38 -050026from python.common.utils.deferred_utils import DeferredWithTimeout, \
khenaidoo6fdf0ba2018-11-02 14:38:33 -040027 TimeOutError
khenaidoofdbad6e2018-11-06 22:26:38 -050028from python.common.utils.registry import IComponent
khenaidoo6fdf0ba2018-11-02 14:38:33 -040029
30log = structlog.get_logger()
31
32
class KafkaMessagingError(Exception):
    """Exception carrying the details of a Kafka messaging failure.

    Derives from ``Exception`` (not ``BaseException``) so that ordinary
    ``except Exception`` handlers can catch and log it; ``BaseException`` is
    reserved for interpreter-exit style signals such as ``SystemExit`` and
    ``KeyboardInterrupt``.

    Attributes:
        error: the error detail supplied when the exception was created.
    """

    def __init__(self, error):
        # Forward the detail to the base class so str(exc) and exc.args
        # carry it; keep the ``error`` attribute for existing callers.
        super(KafkaMessagingError, self).__init__(error)
        self.error = error
36
37
@implementer(IComponent)
class ContainerProxy(object):
    """Base class for proxies that send RPC requests through a Kafka proxy.

    Requests are delegated to ``kafka_proxy.send_request``. Each attempt is
    guarded by a ``DeferredWithTimeout`` and retried on ``TimeOutError``
    (see ``invoke``).
    """

    def __init__(self, kafka_proxy, core_topic, my_listening_topic,
                 default_timeout=3):
        """Store the messaging proxy and the topics used by ``invoke``.

        :param kafka_proxy: object exposing ``send_request(rpc=, to_topic=,
            reply_topic=, callback=, **kwargs)``.
        :param core_topic: topic used when ``invoke`` is given no
            ``to_topic``, and for the final retry.
        :param my_listening_topic: topic used when ``invoke`` is given no
            ``reply_topic``.
        :param default_timeout: base timeout handed to
            ``DeferredWithTimeout`` (presumably seconds -- confirm against
            ``deferred_utils``). Defaults to the previously hard-coded 3.
        """
        self.kafka_proxy = kafka_proxy
        self.listening_topic = my_listening_topic
        self.core_topic = core_topic
        self.default_timeout = default_timeout

    def start(self):
        """Log that the component started and return ``self``."""
        log.info('started')

        return self

    def stop(self):
        """Log that the component stopped."""
        log.info('stopped')

    @classmethod
    def wrap_request(cls, return_cls):
        """Build a decorator that post-processes a request's response.

        The decorated callable must produce (directly or via a Deferred) a
        ``(success, d)`` pair. The resulting wrapper returns a Deferred
        that fires with:

        * ``success`` true and ``return_cls`` given: a new ``return_cls()``
          instance, populated through ``d.Unpack(rc)`` when ``d`` is not
          ``None`` (presumably ``d`` is a protobuf ``Any`` -- confirm
          against callers);
        * ``success`` true and ``return_cls`` is ``None``: ``None``;
        * ``success`` false: ``d`` unchanged.

        Any ``Exception`` raised along the way is logged and re-raised.

        :param return_cls: zero-argument callable creating the response
            object, or ``None`` when no response body is expected.
        :return: the decorator.
        """
        # Local import keeps this block self-contained.
        from functools import wraps

        def real_wrapper(func):
            # wraps() preserves the decorated function's name and docstring
            # on the wrapper, which keeps logs and introspection meaningful.
            @wraps(func)
            @inlineCallbacks
            def wrapper(*args, **kw):
                try:
                    (success, d) = yield func(*args, **kw)
                    if success:
                        log.debug("successful-response", func=func, val=d)
                        if return_cls is not None:
                            rc = return_cls()
                            if d is not None:
                                d.Unpack(rc)
                            returnValue(rc)
                        else:
                            log.debug("successful-response-none", func=func,
                                      val=None)
                            returnValue(None)
                    else:
                        log.warn("unsuccessful-request", func=func, args=args,
                                 kw=kw)
                        returnValue(d)
                except Exception as e:
                    log.exception("request-wrapper-exception", func=func, e=e)
                    raise

            return wrapper

        return real_wrapper

    @inlineCallbacks
    def invoke(self, rpc, to_topic=None, reply_topic=None, **kwargs):
        """Send ``rpc`` through the Kafka proxy, retrying on timeout.

        Up to three attempts are made with timeouts of ``default_timeout``,
        ``2 * default_timeout`` and ``default_timeout``. The first two
        attempts use ``to_topic``; the last one is always sent to
        ``self.core_topic``.

        :param rpc: request identifier passed to
            ``kafka_proxy.send_request``.
        :param to_topic: destination topic; ``None`` selects
            ``self.core_topic``.
        :param reply_topic: topic for the response; ``None`` selects
            ``self.listening_topic``.
        :param kwargs: forwarded unchanged to ``kafka_proxy.send_request``.
        :return: Deferred firing with whatever ``send_request`` produced.
        :raises TimeOutError: when every attempt timed out.
        """
        @inlineCallbacks
        def _send_request(rpc, m_callback, to_topic, reply_topic, **kwargs):
            # Resolve the default topics before logging so the debug entry
            # shows the topics actually used instead of None.
            if to_topic is None:
                to_topic = self.core_topic
            if reply_topic is None:
                reply_topic = self.listening_topic
            try:
                log.debug("sending-request",
                          rpc=rpc,
                          to_topic=to_topic,
                          reply_topic=reply_topic)
                result = yield self.kafka_proxy.send_request(
                    rpc=rpc,
                    to_topic=to_topic,
                    reply_topic=reply_topic,
                    callback=None,
                    **kwargs)
                # m_callback may already have fired (e.g. on timeout); a
                # Deferred must not be fired twice.
                if not m_callback.called:
                    m_callback.callback(result)
                else:
                    log.debug('timeout-already-occurred', rpc=rpc)
            except Exception:
                log.exception("Failure-sending-request", rpc=rpc, kw=kwargs)
                if not m_callback.called:
                    # Failure() with no arguments captures the exception
                    # currently being handled.
                    m_callback.errback(failure.Failure())

        # We are going to resend the request on the to_topic if there is a
        # timeout error. This time the timeout will be longer. If the second
        # request times out then we will send the request to the default
        # core_topic.
        timeouts = [self.default_timeout,
                    self.default_timeout * 2,
                    self.default_timeout]
        last_attempt = len(timeouts) - 1
        for attempt, timeout in enumerate(timeouts):
            cb = DeferredWithTimeout(timeout=timeout)
            # Fire and forget: the outcome is delivered through cb.
            _send_request(rpc, cb, to_topic, reply_topic, **kwargs)
            try:
                res = yield cb
                returnValue(res)
            except TimeOutError as e:
                log.warn('invoke-timeout', e=e)
                if attempt == last_attempt:
                    # Out of retries: propagate the timeout to the caller.
                    raise
                if attempt + 1 == last_attempt:
                    # The final attempt always targets the core topic.
                    to_topic = self.core_topic
133 to_topic = self.core_topic