blob: e606bbb6b24d8f4ae05b9a9e5644ef819648a892 [file] [log] [blame]
Chip Boling67b674a2019-02-08 11:42:18 -06001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17
Zack Williams84a71e92019-11-15 09:00:19 -070018from __future__ import absolute_import
Chip Boling67b674a2019-02-08 11:42:18 -060019from confluent_kafka import Producer as _kafkaProducer
20from structlog import get_logger
21from twisted.internet import reactor
22from twisted.internet.defer import inlineCallbacks, returnValue
23from twisted.internet.threads import deferToThread
24from zope.interface import implementer
25
26from pyvoltha.common.utils.consulhelpers import get_endpoint_from_consul
Zack Williams84a71e92019-11-15 09:00:19 -070027from .event_bus_publisher import EventBusPublisher
Chip Boling67b674a2019-02-08 11:42:18 -060028from pyvoltha.common.utils.registry import IComponent
29from confluent_kafka import Consumer, KafkaError
30import threading
Zack Williams84a71e92019-11-15 09:00:19 -070031import six
Chip Boling67b674a2019-02-08 11:42:18 -060032
# Module-level structured logger (structlog) shared by this module.
log = get_logger()
34
35
@implementer(IComponent)
class KafkaProxy(object):
    """
    This is a singleton proxy kafka class to hide the kafka client details. This
    proxy uses confluent-kafka-python as the kafka client. Since that client is
    not a Twisted client then requests to that client are wrapped with
    twisted.internet.threads.deferToThread to avoid any potential blocking of
    the Twisted loop.
    """
    # Process-wide singleton, set by start() and read by get_kafka_proxy().
    _kafka_instance = None

    def __init__(self,
                 consul_endpoint='localhost:8500',
                 kafka_endpoint='localhost:9092',
                 ack_timeout=1000,
                 max_req_attempts=10,
                 consumer_poll_timeout=10,
                 config=None):
        """
        :param consul_endpoint: consul host:port, used only when
            kafka_endpoint starts with '@' (service lookup via consul)
        :param kafka_endpoint: kafka broker host:port, or '@<service-name>'
            to resolve the broker endpoint through consul
        :param ack_timeout: produce acknowledgement timeout (ms)
        :param max_req_attempts: maximum produce request attempts
        :param consumer_poll_timeout: seconds each consumer poll() blocks
        :param config: optional dict of sub-component configuration
            (key 'event_bus_publisher' is consumed by start())
        :raises Exception: if the singleton instance already exists
        """
        # return an exception if the object already exist
        if KafkaProxy._kafka_instance:
            raise Exception('Singleton exist for :{}'.format(KafkaProxy))

        log.debug('initializing', endpoint=kafka_endpoint)
        self.ack_timeout = ack_timeout
        self.max_req_attempts = max_req_attempts
        self.consul_endpoint = consul_endpoint
        self.kafka_endpoint = kafka_endpoint
        # Fix: the previous 'config={}' default was a shared mutable default
        # argument; allocate a fresh dict per instance instead.
        self.config = config if config is not None else {}
        self.kclient = None
        self.kproducer = None
        self.kproducer_heartbeat = None
        # Deferred registered via register_alive_state_update(); its
        # callback() is fired with the liveness state when it changes.
        self.alive_state_handler = None
        self.event_bus_publisher = None
        self.stopping = False
        self.faulty = False
        self.alive = False
        self.consumer_poll_timeout = consumer_poll_timeout
        self.topic_consumer_map = {}    # topic -> kafka Consumer
        self.topic_callbacks_map = {}   # topic -> list of callbacks
        # Guards the two maps above against concurrent mutation.
        self.topic_any_map_lock = threading.Lock()
        log.debug('initialized', endpoint=kafka_endpoint)

    @inlineCallbacks
    def start(self):
        """
        Create the kafka producer, register the singleton and start the
        event bus publisher.  Returns (via Deferred) this instance.
        """
        log.debug('starting')
        self._get_kafka_producer()
        KafkaProxy._kafka_instance = self
        self.event_bus_publisher = yield EventBusPublisher(
            self, self.config.get('event_bus_publisher', {})).start()
        log.info('started')
        # Fix: was 'KafkaProxy.faulty = False', which created a class
        # attribute instead of resetting this instance's flag.
        self.faulty = False
        self.stopping = False
        self.alive = True
        returnValue(self)

    @inlineCallbacks
    def stop(self):
        """
        Stop the proxy: close the client, flush the producer and close
        every topic consumer.  Best-effort - each step's failure is
        logged and shutdown continues; this method never raises.
        """
        try:
            log.debug('stopping-kafka-proxy')
            self.stopping = True
            self.alive = False
            try:
                if self.kclient:
                    yield self.kclient.close()
                    self.kclient = None
                    log.debug('stopped-kclient-kafka-proxy')
            except Exception as e:
                log.exception('failed-stopped-kclient-kafka-proxy', e=e)

            try:
                # Flush any queued messages before dropping the producer.
                if self.kproducer:
                    yield self.kproducer.flush()
                    self.kproducer = None
                    log.debug('stopped-kproducer-kafka-proxy')
            except Exception as e:
                log.exception('failed-stopped-kproducer-kafka-proxy', e=e)

            # Stop all consumers.  close() blocks, so run it off-reactor.
            try:
                self.topic_any_map_lock.acquire()
                log.debug('stopping-consumers-kafka-proxy')
                for _, c in six.iteritems(self.topic_consumer_map):
                    yield deferToThread(c.close)
                self.topic_consumer_map.clear()
                self.topic_callbacks_map.clear()
                log.debug('stopped-consumers-kafka-proxy')
            except Exception as e:
                log.exception('failed-stopped-consumers-kafka-proxy', e=e)
            finally:
                self.topic_any_map_lock.release()
                log.debug('stopping-consumers-kafka-proxy-released-lock')

            log.debug('stopped-kafka-proxy')

        except Exception as e:
            self.kclient = None
            self.kproducer = None
            log.exception('failed-stopped-kafka-proxy', e=e)

    def _get_kafka_producer(self):
        """
        (Re)create self.kproducer.  An endpoint of the form '@<service>'
        is first resolved through consul.  On failure the producer is
        left as None and the error is logged; the caller retries later.
        """
        try:
            if self.kafka_endpoint.startswith('@'):
                try:
                    _k_endpoint = get_endpoint_from_consul(
                        self.consul_endpoint, self.kafka_endpoint[1:])
                    log.debug('found-kafka-service', endpoint=_k_endpoint)

                except Exception as e:
                    log.exception('no-kafka-service-in-consul', e=e)

                    self.kproducer = None
                    self.kclient = None
                    return
            else:
                _k_endpoint = self.kafka_endpoint
            self.kproducer = _kafkaProducer(
                {'bootstrap.servers': _k_endpoint,
                 }
            )
        except Exception as e:
            log.exception('failed-get-kafka-producer', e=e)
            return

    @inlineCallbacks
    def _wait_for_messages(self, consumer, topic):
        """
        Per-topic consumer loop: poll off-reactor, dispatch each message
        to the registered callbacks, and exit once stopping is set.
        """
        while True:
            try:
                msg = yield deferToThread(consumer.poll,
                                          self.consumer_poll_timeout)

                if self.stopping:
                    # NOTE: message key spelling kept as-is; monitoring
                    # may already match on it.
                    log.debug("stop-request-recieved", topic=topic)
                    break

                if msg is None:
                    continue
                if msg.error():
                    # This typically is received when there are no more messages
                    # to read from kafka. Ignore.
                    continue

                # Invoke callbacks
                for cb in self.topic_callbacks_map[topic]:
                    yield cb(msg)
            except Exception as e:
                log.debug("exception-receiving-msg", topic=topic, e=e)

    @inlineCallbacks
    def subscribe(self, topic, callback, groupId, offset='latest'):
        """
        subscribe allows a caller to subscribe to a given kafka topic. This API
        always create a group consumer.
        :param topic - the topic to subscribe to
        :param callback - the callback to invoke whenever a message is received
        on that topic
        :param groupId - the groupId for this consumer. In the current
        implementation there is a one-to-one mapping between a topic and a
        groupId. In other words, once a groupId is used for a given topic then
        we won't be able to create another groupId for the same topic.
        :param offset: the kafka offset from where the consumer will start
        consuming messages
        """
        try:
            self.topic_any_map_lock.acquire()
            if topic in self.topic_consumer_map:
                # A consumer already exists; just register the callback.
                if topic in self.topic_callbacks_map:
                    self.topic_callbacks_map[topic].append(callback)
                else:
                    self.topic_callbacks_map[topic] = [callback]
                return

            # Create consumer for that topic
            c = Consumer({
                'bootstrap.servers': self.kafka_endpoint,
                'group.id': groupId,
                'auto.offset.reset': offset
            })
            # subscribe() may block, keep it off the reactor thread.
            yield deferToThread(c.subscribe, [topic])
            self.topic_consumer_map[topic] = c
            self.topic_callbacks_map[topic] = [callback]
            # Start the consumer loop on the next reactor iteration.
            reactor.callLater(0, self._wait_for_messages, c, topic)
        except Exception as e:
            log.exception("topic-subscription-error", e=e)
        finally:
            self.topic_any_map_lock.release()

    @inlineCallbacks
    def unsubscribe(self, topic, callback):
        """
        Unsubscribe to a given topic. Since there they be multiple callers
        consuming from the same topic then to ensure only the relevant caller
        gets unsubscribe then the callback is used as a differentiator. The
        kafka consumer will be closed when there are no callbacks required.
        :param topic: topic to unsubscribe
        :param callback: callback the caller used when subscribing to the topic.
        If multiple callers have subscribed to a topic using the same callback
        then the first callback on the list will be removed.
        :return:None
        """
        try:
            self.topic_any_map_lock.acquire()
            log.debug("unsubscribing-to-topic", topic=topic)
            if topic in self.topic_callbacks_map:
                # Remove only the first matching callback (idiomatic
                # replacement of the old manual index scan).
                try:
                    self.topic_callbacks_map[topic].remove(callback)
                except ValueError:
                    # Callback was never registered for this topic; the
                    # original code silently ignored this case too.
                    pass

                if len(self.topic_callbacks_map[topic]) == 0:
                    # Last subscriber gone - stop the consumer as well.
                    if topic in self.topic_consumer_map:
                        yield deferToThread(
                            self.topic_consumer_map[topic].close)
                        del self.topic_consumer_map[topic]
                    del self.topic_callbacks_map[topic]
                    log.debug("unsubscribed-to-topic", topic=topic)
                else:
                    log.debug("consumers-for-topic-still-exist", topic=topic,
                              num=len(self.topic_callbacks_map[topic]))
        except Exception as e:
            log.exception("topic-unsubscription-error", e=e)
        finally:
            self.topic_any_map_lock.release()
            log.debug("unsubscribing-to-topic-release-lock", topic=topic)

    @inlineCallbacks
    def send_message(self, topic, msg, key=None):
        """
        Produce msg on topic.  Lazily (re)creates the producer when
        missing.  On a produce failure the proxy is marked faulty,
        liveness watchers are notified and the proxy is stopped so a
        later restart can pick up a (possibly relocated) broker.
        """
        assert topic is not None
        assert msg is not None

        # first check whether we have a kafka producer.  If there is none
        # then try to get one - this happens only when we try to lookup the
        # kafka service from consul
        try:
            if self.faulty is False:

                if self.kproducer is None:
                    self._get_kafka_producer()
                    # Lets the next message request do the retry if still a failure
                    if self.kproducer is None:
                        log.error('no-kafka-producer',
                                  endpoint=self.kafka_endpoint)
                        return

                log.debug('sending-kafka-msg', topic=topic)

                if self.kproducer is not None and self.event_bus_publisher and self.faulty is False:
                    # produce() can block; keep it off the reactor thread.
                    yield deferToThread(self.kproducer.produce, topic, msg, key)
                    log.debug('sent-kafka-msg', topic=topic)
                    # send a lightweight poll to avoid an exception after 100k messages.
                    yield deferToThread(self.kproducer.poll, 0)
                else:
                    return

        except Exception as e:
            self.faulty = True
            # Fix: guard the handler (initialized to None) so an
            # unregistered handler cannot abort the recovery path below.
            if self.alive_state_handler is not None:
                self.alive_state_handler.callback(self.alive)
            log.error('failed-to-send-kafka-msg', topic=topic,
                      e=e)

            # set the kafka producer to None. This is needed if the
            # kafka docker went down and comes back up with a different
            # port number.
            if self.stopping is False:
                log.debug('stopping-kafka-proxy')
                self.alive = False
                try:
                    self.stopping = True
                    # NOTE(review): stop() is asynchronous but deliberately
                    # not yielded here (fire-and-forget), and stopping is
                    # reset immediately afterwards - confirm this ordering
                    # is intended before changing it.
                    self.stop()
                    self.stopping = False
                    log.debug('stopped-kafka-proxy')
                except Exception as e:
                    log.exception('failed-stopping-kafka-proxy', e=e)
            else:
                log.info('already-stopping-kafka-proxy')
            return

    # sending heartbeat message to check the readiness
    def send_heartbeat_message(self, topic, msg):
        """
        Enqueue a heartbeat msg on topic using a dedicated producer with
        a short delivery timeout; the delivery outcome arrives later via
        handle_kafka_delivery_report().
        """
        assert topic is not None
        assert msg is not None

        try:
            if self.kproducer_heartbeat is None:
                if self.kafka_endpoint.startswith('@'):
                    _k_endpoint = get_endpoint_from_consul(
                        self.consul_endpoint, self.kafka_endpoint[1:])
                else:
                    _k_endpoint = self.kafka_endpoint

                # Using 2 seconds timeout for heartbeat producer; default of 5 minutes is too long
                self.kproducer_heartbeat = _kafkaProducer(
                    {'bootstrap.servers': _k_endpoint,
                     'default.topic.config' : {'message.timeout.ms': 2000},
                     }
                )

            log.debug('sending-kafka-heartbeat-message', topic=topic)
            self.kproducer_heartbeat.produce(
                topic, msg, callback=self.handle_kafka_delivery_report)

        except Exception as e:
            self.faulty = True
            # Guarded for the same reason as in send_message().
            if self.alive_state_handler is not None:
                self.alive_state_handler.callback(self.alive)
            log.error('failed-to-send-kafka-heartbeat-msg', e=e)

    def check_heartbeat_delivery(self):
        """
        Poll the heartbeat producer so pending delivery reports get
        dispatched to handle_kafka_delivery_report().
        """
        try:
            if self.kproducer_heartbeat is not None:
                # Fix: the previous 'msg =' assignment was unused.
                self.kproducer_heartbeat.poll(0)
        except Exception as e:
            log.error('failed-to-check-heartbeat-msg-delivery', e=e)
            self.faulty = True

    def handle_kafka_delivery_report(self, err, msg):
        """
        Delivery-report callback for heartbeat messages.  Notifies the
        registered alive-state handler only when liveness changes.
        """
        if err is not None:
            # Log and notify only in event of alive status change
            if self.alive is True:
                log.info('failed-to-deliver-message', msg=msg.value(), err=err.str())
                if self.alive_state_handler is not None:
                    self.alive_state_handler.callback(False)
                self.alive = False
        else:
            if self.alive is not True:
                log.info('message-delivered-successfully', msg=msg.value())
                if self.alive_state_handler is not None:
                    self.alive_state_handler.callback(True)
                self.alive = True

    def register_alive_state_update(self, defer_handler):
        """Register the Deferred whose callback() receives liveness updates."""
        self.alive_state_handler = defer_handler

    def is_faulty(self):
        """Return True if a kafka operation has failed since the last reset."""
        return self.faulty
396
397
def get_kafka_proxy():
    """Return the process-wide KafkaProxy singleton (None before start())."""
    instance = KafkaProxy._kafka_instance
    return instance