blob: 8c4e8287e0ad3634f974734f711b5d9bc6f9b24d [file] [log] [blame]
khenaidoo6fdf0ba2018-11-02 14:38:33 -04001#
2# Copyright 2018 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18The superclass for all kafka proxy subclasses.
19"""
20
21import structlog
22from twisted.internet.defer import inlineCallbacks, returnValue
23from twisted.python import failure
24from zope.interface import implementer
25
khenaidoofdbad6e2018-11-06 22:26:38 -050026from python.common.utils.deferred_utils import DeferredWithTimeout, \
khenaidoo6fdf0ba2018-11-02 14:38:33 -040027 TimeOutError
khenaidoofdbad6e2018-11-06 22:26:38 -050028from python.common.utils.registry import IComponent
khenaidoo6fdf0ba2018-11-02 14:38:33 -040029
30log = structlog.get_logger()
31
32
33class KafkaMessagingError(BaseException):
34 def __init__(self, error):
35 self.error = error
36
37
38@implementer(IComponent)
39class ContainerProxy(object):
40
41 def __init__(self, kafka_proxy, core_topic, my_listening_topic):
42 self.kafka_proxy = kafka_proxy
43 self.listening_topic = my_listening_topic
44 self.core_topic = core_topic
45 self.default_timeout = 3
46
47 def start(self):
48 log.info('started')
49
50 return self
51
52 def stop(self):
53 log.info('stopped')
54
55 @classmethod
56 def wrap_request(cls, return_cls):
57 def real_wrapper(func):
58 @inlineCallbacks
59 def wrapper(*args, **kw):
60 try:
61 (success, d) = yield func(*args, **kw)
62 if success:
63 log.debug("successful-response", func=func, val=d)
64 if return_cls is not None:
65 rc = return_cls()
66 if d is not None:
67 d.Unpack(rc)
68 returnValue(rc)
69 else:
70 log.debug("successful-response-none", func=func,
71 val=None)
72 returnValue(None)
73 else:
74 log.warn("unsuccessful-request", func=func, args=args,
75 kw=kw)
76 returnValue(d)
77 except Exception as e:
78 log.exception("request-wrapper-exception", func=func, e=e)
79 raise
80
81 return wrapper
82
83 return real_wrapper
84
85 @inlineCallbacks
86 def invoke(self, rpc, to_topic=None, **kwargs):
87 @inlineCallbacks
88 def _send_request(rpc, m_callback, to_topic, **kwargs):
89 try:
90 log.debug("sending-request", rpc=rpc)
91 if to_topic is None:
92 to_topic = self.core_topic
93 result = yield self.kafka_proxy.send_request(rpc=rpc,
94 to_topic=to_topic,
95 reply_topic=self.listening_topic,
96 callback=None,
97 **kwargs)
98 if not m_callback.called:
99 m_callback.callback(result)
100 else:
101 log.debug('timeout-already-occurred', rpc=rpc)
102 except Exception as e:
103 log.exception("Failure-sending-request", rpc=rpc, kw=kwargs)
104 if not m_callback.called:
105 m_callback.errback(failure.Failure())
106
107 cb = DeferredWithTimeout(timeout=self.default_timeout)
108 _send_request(rpc, cb, to_topic, **kwargs)
109 try:
110 res = yield cb
111 returnValue(res)
112 except TimeOutError as e:
113 log.warn('invoke-timeout', e=e)
114 raise e