#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import threading

from confluent_kafka import Consumer, KafkaError
from confluent_kafka import Producer as _kafkaProducer
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.threads import deferToThread
from zope.interface import implementer

from event_bus_publisher import EventBusPublisher
from python.common.utils.consulhelpers import get_endpoint_from_consul
from python.common.utils.registry import IComponent

log = get_logger()


@implementer(IComponent)
class KafkaProxy(object):
36 """
37 This is a singleton proxy kafka class to hide the kafka client details. This
38 proxy uses confluent-kafka-python as the kafka client. Since that client is
39 not a Twisted client then requests to that client are wrapped with
40 twisted.internet.threads.deferToThread to avoid any potential blocking of
41 the Twisted loop.
42 """
    _kafka_instance = None

    def __init__(self,
                 consul_endpoint='localhost:8500',
                 kafka_endpoint='localhost:9092',
                 ack_timeout=1000,
                 max_req_attempts=10,
                 consumer_poll_timeout=10,
                 config=None):

        # Raise an exception if an instance already exists
        if KafkaProxy._kafka_instance:
            raise Exception('A singleton already exists for: {}'.format(
                KafkaProxy))

        log.debug('initializing', endpoint=kafka_endpoint)
        self.ack_timeout = ack_timeout
        self.max_req_attempts = max_req_attempts
        self.consul_endpoint = consul_endpoint
        self.kafka_endpoint = kafka_endpoint
        self.config = config or {}
        self.kclient = None
        self.kproducer = None
        self.event_bus_publisher = None
        self.stopping = False
        self.faulty = False
        self.consumer_poll_timeout = consumer_poll_timeout
        self.topic_consumer_map = {}
        self.topic_callbacks_map = {}
        self.topic_any_map_lock = threading.Lock()
        log.debug('initialized', endpoint=kafka_endpoint)

    @inlineCallbacks
    def start(self):
        log.debug('starting')
        self._get_kafka_producer()
        KafkaProxy._kafka_instance = self
        self.event_bus_publisher = yield EventBusPublisher(
            self, self.config.get('event_bus_publisher', {})).start()
        log.info('started')
        self.faulty = False
        self.stopping = False
        returnValue(self)

    @inlineCallbacks
    def stop(self):
        try:
            log.debug('stopping-kafka-proxy')
            self.stopping = True
            try:
                if self.kclient:
                    yield self.kclient.close()
                    self.kclient = None
                    log.debug('stopped-kclient-kafka-proxy')
            except Exception as e:
                log.exception('failed-stopped-kclient-kafka-proxy', e=e)

            try:
                if self.kproducer:
                    yield self.kproducer.flush()
                    self.kproducer = None
                    log.debug('stopped-kproducer-kafka-proxy')
            except Exception as e:
                log.exception('failed-stopped-kproducer-kafka-proxy', e=e)

            # Stop all consumers
            try:
                self.topic_any_map_lock.acquire()
                log.debug('stopping-consumers-kafka-proxy')
                for _, c in self.topic_consumer_map.items():
                    yield deferToThread(c.close)
                self.topic_consumer_map.clear()
                self.topic_callbacks_map.clear()
                log.debug('stopped-consumers-kafka-proxy')
            except Exception as e:
                log.exception('failed-stopped-consumers-kafka-proxy', e=e)
            finally:
                self.topic_any_map_lock.release()
                log.debug('stopping-consumers-kafka-proxy-released-lock')

            # try:
            #     if self.event_bus_publisher:
            #         yield self.event_bus_publisher.stop()
            #         self.event_bus_publisher = None
            #         log.debug('stopped-event-bus-publisher-kafka-proxy')
            # except Exception as e:
            #     log.debug('failed-stopped-event-bus-publisher-kafka-proxy')

            log.debug('stopped-kafka-proxy')

        except Exception as e:
            self.kclient = None
            self.kproducer = None
            # self.event_bus_publisher = None
            log.exception('failed-stopped-kafka-proxy', e=e)

    def _get_kafka_producer(self):

        try:
            if self.kafka_endpoint.startswith('@'):
                try:
                    _k_endpoint = get_endpoint_from_consul(
                        self.consul_endpoint, self.kafka_endpoint[1:])
                    log.debug('found-kafka-service', endpoint=_k_endpoint)

                except Exception as e:
                    log.exception('no-kafka-service-in-consul', e=e)

                    self.kproducer = None
                    self.kclient = None
                    return
            else:
                _k_endpoint = self.kafka_endpoint
            self.kproducer = _kafkaProducer(
                {'bootstrap.servers': _k_endpoint}
            )
        except Exception as e:
            log.exception('failed-get-kafka-producer', e=e)
            return

    @inlineCallbacks
    def _wait_for_messages(self, consumer, topic):
        while True:
            try:
                msg = yield deferToThread(consumer.poll,
                                          self.consumer_poll_timeout)

                if self.stopping:
                    log.debug("stop-request-received", topic=topic)
                    break

                if msg is None:
                    continue
                if msg.error():
                    # This is typically received when there are no more
                    # messages to read from kafka. Ignore.
                    continue

                # Invoke callbacks
                for cb in self.topic_callbacks_map[topic]:
                    yield cb(msg)
            except Exception as e:
                log.debug("exception-receiving-msg", topic=topic, e=e)

    @inlineCallbacks
    def subscribe(self, topic, callback, groupId, offset='latest'):
        """
        subscribe allows a caller to subscribe to a given kafka topic. This
        API always creates a group consumer.
        :param topic - the topic to subscribe to
        :param callback - the callback to invoke whenever a message is
        received on that topic
        :param groupId - the groupId for this consumer. In the current
        implementation there is a one-to-one mapping between a topic and a
        groupId. In other words, once a groupId is used for a given topic
        then we won't be able to create another groupId for the same topic.
        :param offset: the kafka offset from where the consumer will start
        consuming messages
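
        Usage (a minimal sketch; the topic name, group id and on_msg
        callback below are illustrative, not part of this API):

            def on_msg(msg):
                log.info('got-message', value=msg.value())

            yield get_kafka_proxy().subscribe('my.topic', on_msg,
                                              groupId='my-group')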
206 """
        try:
            self.topic_any_map_lock.acquire()
            if topic in self.topic_consumer_map:
                # A consumer already exists for this topic; just add the
                # callback
                if topic in self.topic_callbacks_map:
                    self.topic_callbacks_map[topic].append(callback)
                else:
                    self.topic_callbacks_map[topic] = [callback]
                return

            # Create a consumer for that topic
            c = Consumer({
                'bootstrap.servers': self.kafka_endpoint,
                'group.id': groupId,
                'auto.offset.reset': offset
            })
            yield deferToThread(c.subscribe, [topic])
            self.topic_consumer_map[topic] = c
            self.topic_callbacks_map[topic] = [callback]
            # Start the consumer loop
            reactor.callLater(0, self._wait_for_messages, c, topic)
        except Exception as e:
            log.exception("topic-subscription-error", e=e)
        finally:
            self.topic_any_map_lock.release()

    @inlineCallbacks
    def unsubscribe(self, topic, callback):
        """
        Unsubscribe from a given topic. Since multiple callers may be
        consuming from the same topic, the callback is used as a
        differentiator to ensure that only the relevant caller is
        unsubscribed. The kafka consumer is closed once no callbacks remain
        for the topic.
        :param topic: topic to unsubscribe from
        :param callback: the callback the caller used when subscribing to
        the topic. If multiple callers have subscribed to a topic using the
        same callback then the first callback on the list will be removed.
        :return: None
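
        Usage (illustrative sketch; on_msg is the same callback object that
        was passed to subscribe()):

            yield get_kafka_proxy().unsubscribe('my.topic', on_msg)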
246 """
        try:
            self.topic_any_map_lock.acquire()
            log.debug("unsubscribing-from-topic", topic=topic)
            if topic in self.topic_callbacks_map:
                index = 0
                for cb in self.topic_callbacks_map[topic]:
                    if cb == callback:
                        break
                    index += 1
                if index < len(self.topic_callbacks_map[topic]):
                    self.topic_callbacks_map[topic].pop(index)

                if len(self.topic_callbacks_map[topic]) == 0:
                    # Stop the consumer
                    if topic in self.topic_consumer_map:
                        yield deferToThread(
                            self.topic_consumer_map[topic].close)
                        del self.topic_consumer_map[topic]
                    del self.topic_callbacks_map[topic]
                    log.debug("unsubscribed-from-topic", topic=topic)
                else:
                    log.debug("callbacks-for-topic-still-exist", topic=topic,
                              num=len(self.topic_callbacks_map[topic]))
        except Exception as e:
            log.exception("topic-unsubscription-error", e=e)
        finally:
            self.topic_any_map_lock.release()
            log.debug("unsubscribe-from-topic-released-lock", topic=topic)

    @inlineCallbacks
    def send_message(self, topic, msg, key=None):
        assert topic is not None
        assert msg is not None

        # First check whether we have a kafka producer. If there is none
        # then try to get one - this happens only when we need to look up
        # the kafka service from consul.
        try:
            if self.faulty is False:

                if self.kproducer is None:
                    self._get_kafka_producer()
                    # Let the next message request do the retry if this is
                    # still a failure
                    if self.kproducer is None:
                        log.error('no-kafka-producer',
                                  endpoint=self.kafka_endpoint)
                        return

                log.debug('sending-kafka-msg', topic=topic, kafka_msg=msg)

                if self.kproducer is not None and self.event_bus_publisher \
                        and self.faulty is False:
                    yield deferToThread(self.kproducer.produce, topic, msg,
                                        key)
                    log.debug('sent-kafka-msg', topic=topic, kafka_msg=msg)
                    # Send a lightweight poll so that delivery callbacks are
                    # serviced; this avoids an exception after ~100k
                    # messages.
                    yield deferToThread(self.kproducer.poll, 0)
                else:
                    return

        except Exception as e:
            self.faulty = True
            log.error('failed-to-send-kafka-msg', topic=topic, kafka_msg=msg,
                      e=e)

            # Set the kafka producer to None. This is needed if the
            # kafka docker went down and comes back up with a different
            # port number.
            if self.stopping is False:
                log.debug('stopping-kafka-proxy')
                try:
                    self.stopping = True
                    yield self.stop()
                    self.stopping = False
                    self.faulty = False
                    log.debug('stopped-kafka-proxy')
                except Exception as e:
                    log.exception('failed-stopping-kafka-proxy', e=e)
            else:
                log.info('already-stopping-kafka-proxy')

            return
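
    # Note: send_message() above is fire-and-forget: produce() only enqueues
    # the message and poll(0) services delivery callbacks in the background.
    # A sketch of how per-message delivery confirmation could be requested
    # (the _delivery_report helper is illustrative, not part of this class;
    # on_delivery is a standard confluent-kafka produce() keyword):
    #
    #     def _delivery_report(err, kafka_msg):
    #         if err is not None:
    #             log.error('kafka-delivery-failed', e=err)
    #
    #     yield deferToThread(self.kproducer.produce, topic, msg, key,
    #                         on_delivery=_delivery_report)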

    def is_faulty(self):
        return self.faulty


# Common method to get the singleton instance of the kafka proxy class
def get_kafka_proxy():
    return KafkaProxy._kafka_instance
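

# A minimal usage sketch (the topic name and payload below are
# illustrative, not part of this module):
#
#     from twisted.internet.defer import inlineCallbacks
#
#     @inlineCallbacks
#     def main():
#         proxy = yield KafkaProxy(kafka_endpoint='localhost:9092').start()
#         yield proxy.send_message('my.topic', 'hello')
#         yield proxy.stop()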