khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 1 | # |
| 2 | # Copyright 2017 the original author or authors. |
| 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | # |
| 16 | |
khenaidoo | ca30132 | 2019-01-09 23:06:32 -0500 | [diff] [blame] | 17 | |
| 18 | from confluent_kafka import Producer as _kafkaProducer |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 19 | from structlog import get_logger |
khenaidoo | ca30132 | 2019-01-09 23:06:32 -0500 | [diff] [blame] | 20 | from twisted.internet import reactor |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 21 | from twisted.internet.defer import inlineCallbacks, returnValue |
khenaidoo | ca30132 | 2019-01-09 23:06:32 -0500 | [diff] [blame] | 22 | from twisted.internet.threads import deferToThread |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 23 | from zope.interface import implementer |
| 24 | |
khenaidoo | fdbad6e | 2018-11-06 22:26:38 -0500 | [diff] [blame] | 25 | from python.common.utils.consulhelpers import get_endpoint_from_consul |
khenaidoo | fdbad6e | 2018-11-06 22:26:38 -0500 | [diff] [blame] | 26 | from event_bus_publisher import EventBusPublisher |
khenaidoo | ca30132 | 2019-01-09 23:06:32 -0500 | [diff] [blame] | 27 | from python.common.utils.registry import IComponent |
| 28 | from confluent_kafka import Consumer, KafkaError |
| 29 | import threading |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 30 | |
# Module-level structlog logger shared by all code in this module.
log = get_logger()
| 32 | |
| 33 | |
@implementer(IComponent)
class KafkaProxy(object):
    """
    This is a singleton proxy kafka class to hide the kafka client details. This
    proxy uses confluent-kafka-python as the kafka client. Since that client is
    not a Twisted client then requests to that client are wrapped with
    twisted.internet.threads.deferToThread to avoid any potential blocking of
    the Twisted loop.
    """
    # Singleton instance; set by start() and returned by get_kafka_proxy().
    _kafka_instance = None

    def __init__(self,
                 consul_endpoint='localhost:8500',
                 kafka_endpoint='localhost:9092',
                 ack_timeout=1000,
                 max_req_attempts=10,
                 consumer_poll_timeout=10,
                 config=None):
        """
        :param consul_endpoint: consul host:port; used only when
            kafka_endpoint starts with '@' (service lookup through consul).
        :param kafka_endpoint: kafka broker host:port, or '@<service-name>'
            to resolve the broker endpoint from consul.
        :param ack_timeout: stored for callers; not used directly here.
        :param max_req_attempts: stored for callers; not used directly here.
        :param consumer_poll_timeout: seconds a consumer poll may block
            before the receive loop re-checks for a stop request.
        :param config: optional configuration dict; the
            'event_bus_publisher' sub-dict is handed to EventBusPublisher
            on start().
        :raises Exception: if the singleton instance already exists.
        """
        # return an exception if the object already exist
        if KafkaProxy._kafka_instance:
            raise Exception('Singleton exist for :{}'.format(KafkaProxy))

        log.debug('initializing', endpoint=kafka_endpoint)
        self.ack_timeout = ack_timeout
        self.max_req_attempts = max_req_attempts
        self.consul_endpoint = consul_endpoint
        self.kafka_endpoint = kafka_endpoint
        # BUGFIX: the default used to be the shared mutable 'config={}';
        # default to None and build a fresh dict per instance instead.
        self.config = config if config is not None else {}
        self.kclient = None
        self.kproducer = None
        self.event_bus_publisher = None
        self.stopping = False
        # faulty is latched True by send_message() on a produce failure.
        self.faulty = False
        self.consumer_poll_timeout = consumer_poll_timeout
        self.topic_consumer_map = {}    # topic -> confluent kafka Consumer
        self.topic_callbacks_map = {}   # topic -> list of subscriber callbacks
        # Guards both topic maps against concurrent subscribe/unsubscribe/stop.
        self.topic_any_map_lock = threading.Lock()
        log.debug('initialized', endpoint=kafka_endpoint)

    @inlineCallbacks
    def start(self):
        """
        Create the kafka producer, register this instance as the singleton
        and start the event bus publisher.

        :return: (Deferred) this KafkaProxy instance
        """
        log.debug('starting')
        self._get_kafka_producer()
        KafkaProxy._kafka_instance = self
        self.event_bus_publisher = yield EventBusPublisher(
            self, self.config.get('event_bus_publisher', {})).start()
        log.info('started')
        # BUGFIX: this used to be 'KafkaProxy.faulty = False', which created
        # a class attribute and left a stale True on self.faulty after a
        # fault, so is_faulty() kept reporting True forever.
        self.faulty = False
        self.stopping = False
        returnValue(self)

    @inlineCallbacks
    def stop(self):
        """
        Close the kafka client, flush the producer and close every consumer.
        All errors are logged and swallowed so shutdown stays best-effort.
        """
        try:
            log.debug('stopping-kafka-proxy')
            self.stopping = True
            try:
                if self.kclient:
                    yield self.kclient.close()
                    self.kclient = None
                    log.debug('stopped-kclient-kafka-proxy')
            except Exception as e:
                log.exception('failed-stopped-kclient-kafka-proxy', e=e)

            try:
                if self.kproducer:
                    yield self.kproducer.flush()
                    self.kproducer = None
                    log.debug('stopped-kproducer-kafka-proxy')
            except Exception as e:
                log.exception('failed-stopped-kproducer-kafka-proxy', e=e)

            # Stop all consumers
            try:
                self.topic_any_map_lock.acquire()
                log.debug('stopping-consumers-kafka-proxy')
                # items() instead of py2-only iteritems() so this also runs
                # under python 3.
                for _, c in self.topic_consumer_map.items():
                    yield deferToThread(c.close)
                self.topic_consumer_map.clear()
                self.topic_callbacks_map.clear()
                log.debug('stopped-consumers-kafka-proxy')
            except Exception as e:
                log.exception('failed-stopped-consumers-kafka-proxy', e=e)
            finally:
                self.topic_any_map_lock.release()
                log.debug('stopping-consumers-kafka-proxy-released-lock')

            log.debug('stopped-kafka-proxy')

        except Exception as e:
            self.kclient = None
            self.kproducer = None
            log.exception('failed-stopped-kafka-proxy', e=e)

    def _get_kafka_producer(self):
        """
        Create the confluent kafka producer. When kafka_endpoint starts
        with '@' the broker endpoint is first resolved through consul. On
        any failure the producer (and client) are left as None so the next
        send_message() call retries the creation.
        """
        try:
            if self.kafka_endpoint.startswith('@'):
                try:
                    _k_endpoint = get_endpoint_from_consul(
                        self.consul_endpoint, self.kafka_endpoint[1:])
                    log.debug('found-kafka-service', endpoint=_k_endpoint)
                except Exception as e:
                    log.exception('no-kafka-service-in-consul', e=e)
                    self.kproducer = None
                    self.kclient = None
                    return
            else:
                _k_endpoint = self.kafka_endpoint
            self.kproducer = _kafkaProducer(
                {'bootstrap.servers': _k_endpoint,
                 }
            )
        except Exception as e:
            log.exception('failed-get-kafka-producer', e=e)
            return

    @inlineCallbacks
    def _wait_for_messages(self, consumer, topic):
        """
        Receive loop for one consumer: poll (in a thread, blocking up to
        consumer_poll_timeout seconds per poll), dispatch each message to
        every callback registered for the topic, and exit once a stop
        request has been seen.
        """
        while True:
            try:
                msg = yield deferToThread(consumer.poll,
                                          self.consumer_poll_timeout)

                if self.stopping:
                    log.debug("stop-request-recieved", topic=topic)
                    break

                if msg is None:
                    continue
                if msg.error():
                    # This typically is received when there are no more messages
                    # to read from kafka. Ignore.
                    continue

                # Invoke callbacks
                for cb in self.topic_callbacks_map[topic]:
                    yield cb(msg)
            except Exception as e:
                log.debug("exception-receiving-msg", topic=topic, e=e)

    @inlineCallbacks
    def subscribe(self, topic, callback, groupId, offset='latest'):
        """
        subscribe allows a caller to subscribe to a given kafka topic. This API
        always create a group consumer.
        :param topic - the topic to subscribe to
        :param callback - the callback to invoke whenever a message is received
        on that topic
        :param groupId - the groupId for this consumer. In the current
        implementation there is a one-to-one mapping between a topic and a
        groupId. In other words, once a groupId is used for a given topic then
        we won't be able to create another groupId for the same topic.
        :param offset: the kafka offset from where the consumer will start
        consuming messages
        """
        try:
            self.topic_any_map_lock.acquire()
            if topic in self.topic_consumer_map:
                # A consumer already exists for this topic; just register
                # the additional callback.
                if topic in self.topic_callbacks_map:
                    self.topic_callbacks_map[topic].append(callback)
                else:
                    self.topic_callbacks_map[topic] = [callback]
                return

            # Create consumer for that topic
            c = Consumer({
                'bootstrap.servers': self.kafka_endpoint,
                'group.id': groupId,
                'auto.offset.reset': offset
            })
            yield deferToThread(c.subscribe, [topic])
            self.topic_consumer_map[topic] = c
            self.topic_callbacks_map[topic] = [callback]
            # Start the consumer receive loop on the reactor.
            reactor.callLater(0, self._wait_for_messages, c, topic)
        except Exception as e:
            log.exception("topic-subscription-error", e=e)
        finally:
            self.topic_any_map_lock.release()

    @inlineCallbacks
    def unsubscribe(self, topic, callback):
        """
        Unsubscribe to a given topic. Since there they be multiple callers
        consuming from the same topic then to ensure only the relevant caller
        gets unsubscribe then the callback is used as a differentiator. The
        kafka consumer will be closed when there are no callbacks required.
        :param topic: topic to unsubscribe
        :param callback: callback the caller used when subscribing to the topic.
        If multiple callers have subscribed to a topic using the same callback
        then the first callback on the list will be removed.
        :return:None
        """
        try:
            self.topic_any_map_lock.acquire()
            log.debug("unsubscribing-to-topic", topic=topic)
            if topic in self.topic_callbacks_map:
                # Remove only the first matching callback.
                index = 0
                for cb in self.topic_callbacks_map[topic]:
                    if cb == callback:
                        break
                    index += 1
                if index < len(self.topic_callbacks_map[topic]):
                    self.topic_callbacks_map[topic].pop(index)

                if len(self.topic_callbacks_map[topic]) == 0:
                    # No callbacks left for the topic; stop the consumer.
                    if topic in self.topic_consumer_map:
                        yield deferToThread(
                            self.topic_consumer_map[topic].close)
                        del self.topic_consumer_map[topic]
                    del self.topic_callbacks_map[topic]
                    log.debug("unsubscribed-to-topic", topic=topic)
                else:
                    log.debug("consumers-for-topic-still-exist", topic=topic,
                              num=len(self.topic_callbacks_map[topic]))
        except Exception as e:
            log.exception("topic-unsubscription-error", e=e)
        finally:
            self.topic_any_map_lock.release()
            log.debug("unsubscribing-to-topic-release-lock", topic=topic)

    @inlineCallbacks
    def send_message(self, topic, msg, key=None):
        """
        Produce a message on a kafka topic. On failure the proxy is marked
        faulty and a stop() is initiated so that a later restart can pick
        up a (possibly relocated) kafka broker.
        :param topic: kafka topic to produce on (must not be None)
        :param msg: message payload (must not be None)
        :param key: optional partitioning key
        """
        assert topic is not None
        assert msg is not None

        # first check whether we have a kafka producer. If there is none
        # then try to get one - this happens only when we try to lookup the
        # kafka service from consul
        try:
            if self.faulty is False:

                if self.kproducer is None:
                    self._get_kafka_producer()
                    # Lets the next message request do the retry if still a failure
                    if self.kproducer is None:
                        log.error('no-kafka-producer',
                                  endpoint=self.kafka_endpoint)
                        return

                log.debug('sending-kafka-msg', topic=topic, kafka_msg=msg)

                if self.kproducer is not None and self.event_bus_publisher and self.faulty is False:
                    yield deferToThread(self.kproducer.produce, topic, msg,
                                        key)
                    log.debug('sent-kafka-msg', topic=topic, kafka_msg=msg)
                    # send a lightweight poll to avoid an exception after 100k messages.
                    yield deferToThread(self.kproducer.poll, 0)
                else:
                    return

        except Exception as e:
            self.faulty = True
            log.error('failed-to-send-kafka-msg', topic=topic, kafka_msg=msg,
                      e=e)

            # set the kafka producer to None. This is needed if the
            # kafka docker went down and comes back up with a different
            # port number.
            if self.stopping is False:
                log.debug('stopping-kafka-proxy')
                try:
                    self.stopping = True
                    self.stop()
                    self.stopping = False
                    self.faulty = False
                    log.debug('stopped-kafka-proxy')
                except Exception as e:
                    log.exception('failed-stopping-kafka-proxy', e=e)
            else:
                log.info('already-stopping-kafka-proxy')

            return

    def is_faulty(self):
        """Return True if the last produce attempt failed."""
        return self.faulty
| 334 | |
| 335 | |
| 336 | # Common method to get the singleton instance of the kafka proxy class |
def get_kafka_proxy():
    """Return the singleton KafkaProxy instance (None until start())."""
    instance = KafkaProxy._kafka_instance
    return instance