#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from afkak.client import KafkaClient as _KafkaClient
from afkak.common import (
    PRODUCER_ACK_NOT_REQUIRED
)
from afkak.producer import Producer as _kafkaProducer
from structlog import get_logger
from twisted.internet.defer import inlineCallbacks, returnValue
from zope.interface import implementer

from python.common.utils.consulhelpers import get_endpoint_from_consul
from python.common.utils.registry import IComponent
from event_bus_publisher import EventBusPublisher

log = get_logger()


@implementer(IComponent)
class KafkaProxy(object):
    """
    Singleton proxy class that hides the Kafka client details.
    """
    _kafka_instance = None

    def __init__(self,
                 consul_endpoint='localhost:8500',
                 kafka_endpoint='localhost:9092',
                 ack_timeout=1000,
                 max_req_attempts=10,
                 config={}):

        # Raise an exception if a singleton instance already exists
        if KafkaProxy._kafka_instance:
            raise Exception('Singleton exists for: {}'.format(KafkaProxy))

        log.debug('initializing', endpoint=kafka_endpoint)
        self.ack_timeout = ack_timeout
        self.max_req_attempts = max_req_attempts
        self.consul_endpoint = consul_endpoint
        self.kafka_endpoint = kafka_endpoint
        self.config = config
        self.kclient = None
        self.kproducer = None
        self.event_bus_publisher = None
        self.stopping = False
        self.faulty = False
        log.debug('initialized', endpoint=kafka_endpoint)

    @inlineCallbacks
    def start(self):
        log.debug('starting')
        self._get_kafka_producer()
        KafkaProxy._kafka_instance = self
        self.event_bus_publisher = yield EventBusPublisher(
            self, self.config.get('event_bus_publisher', {})).start()
        log.info('started')
        self.faulty = False
        self.stopping = False
        returnValue(self)

    @inlineCallbacks
    def stop(self):
        try:
            log.debug('stopping-kafka-proxy')
            try:
                if self.kclient:
                    yield self.kclient.close()
                    self.kclient = None
                    log.debug('stopped-kclient-kafka-proxy')
            except Exception as e:
                log.exception('failed-stopped-kclient-kafka-proxy', e=e)

            try:
                if self.kproducer:
                    yield self.kproducer.stop()
                    self.kproducer = None
                    log.debug('stopped-kproducer-kafka-proxy')
            except Exception as e:
                log.exception('failed-stopped-kproducer-kafka-proxy', e=e)

            log.debug('stopped-kafka-proxy')

        except Exception as e:
            self.kclient = None
            self.kproducer = None
            # self.event_bus_publisher = None
            log.exception('failed-stopped-kafka-proxy', e=e)

    def _get_kafka_producer(self):
        # PRODUCER_ACK_LOCAL_WRITE: the server will wait until the data is
        # written to a local log before sending a response
        try:

            if self.kafka_endpoint.startswith('@'):
                try:
                    _k_endpoint = get_endpoint_from_consul(
                        self.consul_endpoint, self.kafka_endpoint[1:])
                    log.debug('found-kafka-service', endpoint=_k_endpoint)

                except Exception as e:
                    log.exception('no-kafka-service-in-consul', e=e)

                    self.kproducer = None
                    self.kclient = None
                    return
            else:
                _k_endpoint = self.kafka_endpoint

            self.kclient = _KafkaClient(_k_endpoint)
            self.kproducer = _kafkaProducer(self.kclient,
                                            req_acks=PRODUCER_ACK_NOT_REQUIRED,
                                            # req_acks=PRODUCER_ACK_LOCAL_WRITE,
                                            # ack_timeout=self.ack_timeout,
                                            # max_req_attempts=self.max_req_attempts
                                            )
        except Exception as e:
            log.exception('failed-get-kafka-producer', e=e)
            return


    @inlineCallbacks
    def wait_until_topic_is_ready(self, topic):
        # Assumes "auto.create.topics.enable" is set in the broker
        # configuration
        e = True
        while e:
            yield self.kclient.load_metadata_for_topics(topic)
            e = self.kclient.metadata_error_for_topic(topic)
            if e:
                log.debug("Topic-not-ready-retrying...", topic=topic)

    @inlineCallbacks
    def create_topic(self, topic):
        # Assume auto create is enabled on the broker configuration
        yield self.wait_until_topic_is_ready(topic)

    @inlineCallbacks
    def send_message(self, topic, msg):
        assert topic is not None
        assert msg is not None

        # First check whether we have a kafka producer.  If there is none,
        # then try to get one - this happens only when we try to look up
        # the kafka service from consul.
        try:
            if self.faulty is False:

                if self.kproducer is None:
                    self._get_kafka_producer()
                    # Let the next message request do the retry if still a failure
                    if self.kproducer is None:
                        log.error('no-kafka-producer',
                                  endpoint=self.kafka_endpoint)
                        return

                # log.debug('sending-kafka-msg', topic=topic, msg=msg)
                msgs = [msg]

                if self.kproducer and self.kclient and \
                        self.event_bus_publisher and self.faulty is False:
                    # log.debug('sending-kafka-msg-I-am-here0', time=int(round(time.time() * 1000)))

                    yield self.kproducer.send_messages(topic, msgs=msgs)
                    # self.kproducer.send_messages(topic, msgs=msgs)
                    # log.debug('sent-kafka-msg', topic=topic, msg=msg)
                else:
                    return

        except Exception as e:
            self.faulty = True
            log.error('failed-to-send-kafka-msg', topic=topic, msg=msg, e=e)

            # Set the kafka producer to None.  This is needed if the
            # kafka docker went down and comes back up with a different
            # port number.
            if self.stopping is False:
                log.debug('stopping-kafka-proxy')
                try:
                    self.stopping = True
                    self.stop()
                    self.stopping = False
                    self.faulty = False
                    log.debug('stopped-kafka-proxy')
                except Exception as e:
                    log.exception('failed-stopping-kafka-proxy', e=e)
            else:
                log.info('already-stopping-kafka-proxy')

            return

    def is_faulty(self):
        return self.faulty


# Common method to get the singleton instance of the kafka proxy class
def get_kafka_proxy():
    return KafkaProxy._kafka_instance
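
# ---------------------------------------------------------------------------
# Example usage (illustrative sketch only; not executed by this module).
# It assumes a Kafka broker reachable at localhost:9092 with topic
# auto-creation enabled; the topic name 'example.topic' is hypothetical.
#
#   from twisted.internet import reactor
#   from twisted.internet.defer import inlineCallbacks
#
#   @inlineCallbacks
#   def _demo():
#       # Start the singleton proxy, publish one message, then shut down.
#       proxy = yield KafkaProxy(kafka_endpoint='localhost:9092').start()
#       yield get_kafka_proxy().send_message('example.topic', 'hello')
#       yield proxy.stop()
#       reactor.stop()
#
#   reactor.callWhenRunning(_demo)
#   reactor.run()
# ---------------------------------------------------------------------------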