Chip Boling | 67b674a | 2019-02-08 11:42:18 -0600 | [diff] [blame] | 1 | # |
| 2 | # Copyright 2017 the original author or authors. |
| 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | # |
| 16 | |
| 17 | |
Zack Williams | 84a71e9 | 2019-11-15 09:00:19 -0700 | [diff] [blame] | 18 | from __future__ import absolute_import |
Chip Boling | 67b674a | 2019-02-08 11:42:18 -0600 | [diff] [blame] | 19 | from confluent_kafka import Producer as _kafkaProducer |
| 20 | from structlog import get_logger |
| 21 | from twisted.internet import reactor |
| 22 | from twisted.internet.defer import inlineCallbacks, returnValue |
| 23 | from twisted.internet.threads import deferToThread |
| 24 | from zope.interface import implementer |
| 25 | |
| 26 | from pyvoltha.common.utils.consulhelpers import get_endpoint_from_consul |
Zack Williams | 84a71e9 | 2019-11-15 09:00:19 -0700 | [diff] [blame] | 27 | from .event_bus_publisher import EventBusPublisher |
Chip Boling | 67b674a | 2019-02-08 11:42:18 -0600 | [diff] [blame] | 28 | from pyvoltha.common.utils.registry import IComponent |
| 29 | from confluent_kafka import Consumer, KafkaError |
| 30 | import threading |
Zack Williams | 84a71e9 | 2019-11-15 09:00:19 -0700 | [diff] [blame] | 31 | import six |
Chip Boling | 67b674a | 2019-02-08 11:42:18 -0600 | [diff] [blame] | 32 | |
| 33 | log = get_logger() |
| 34 | |
| 35 | |
@implementer(IComponent)
class KafkaProxy(object):
    """
    This is a singleton proxy kafka class to hide the kafka client details. This
    proxy uses confluent-kafka-python as the kafka client. Since that client is
    not a Twisted client then requests to that client are wrapped with
    twisted.internet.threads.deferToThread to avoid any potential blocking of
    the Twisted loop.
    """
    # Singleton instance, set by start()
    _kafka_instance = None

    def __init__(self,
                 consul_endpoint='localhost:8500',
                 kafka_endpoint='localhost:9092',
                 ack_timeout=1000,
                 max_req_attempts=10,
                 consumer_poll_timeout=10,
                 config=None):
        """
        :param consul_endpoint: host:port of the consul agent, used to
            resolve kafka when kafka_endpoint starts with '@'
        :param kafka_endpoint: host:port of the kafka broker, or
            '@<service-name>' to look the broker up in consul
        :param ack_timeout: produce acknowledgment timeout (ms)
        :param max_req_attempts: maximum request attempts
        :param consumer_poll_timeout: consumer poll timeout (seconds)
        :param config: optional config dict (e.g. 'event_bus_publisher'
            section); defaults to an empty dict
        :raises Exception: if a KafkaProxy singleton already exists
        """
        # return an exception if the object already exist
        if KafkaProxy._kafka_instance:
            raise Exception('Singleton exist for :{}'.format(KafkaProxy))

        log.debug('initializing', endpoint=kafka_endpoint)
        self.ack_timeout = ack_timeout
        self.max_req_attempts = max_req_attempts
        self.consul_endpoint = consul_endpoint
        self.kafka_endpoint = kafka_endpoint
        # Fix: default of None (not {}) avoids the shared mutable default
        # argument pitfall.
        self.config = config if config is not None else {}
        self.kclient = None
        self.kproducer = None
        self.kproducer_heartbeat = None
        self.alive_state_handler = None
        self.event_bus_publisher = None
        self.stopping = False
        self.faulty = False
        self.alive = False
        self.consumer_poll_timeout = consumer_poll_timeout
        self.topic_consumer_map = {}   # topic -> confluent kafka Consumer
        self.topic_callbacks_map = {}  # topic -> list of subscriber callbacks
        # Guards both maps above
        self.topic_any_map_lock = threading.Lock()
        log.debug('initialized', endpoint=kafka_endpoint)

    @inlineCallbacks
    def start(self):
        """Create the kafka producer, register this object as the singleton
        instance and start the event bus publisher."""
        log.debug('starting')
        self._get_kafka_producer()
        KafkaProxy._kafka_instance = self
        self.event_bus_publisher = yield EventBusPublisher(
            self, self.config.get('event_bus_publisher', {})).start()
        log.info('started')
        # Fix: the original assigned the class attribute
        # ('KafkaProxy.faulty = False'), which every instance attribute
        # shadows; reset the instance flag instead.
        self.faulty = False
        self.stopping = False
        self.alive = True
        returnValue(self)

    @inlineCallbacks
    def stop(self):
        """Shut down the client, producer and all consumers.  Shutdown is
        best-effort: every failure is logged and swallowed."""
        try:
            log.debug('stopping-kafka-proxy')
            self.stopping = True
            self.alive = False
            try:
                if self.kclient:
                    yield self.kclient.close()
                    self.kclient = None
                    log.debug('stopped-kclient-kafka-proxy')
            except Exception as e:
                log.exception('failed-stopped-kclient-kafka-proxy', e=e)

            try:
                if self.kproducer:
                    yield self.kproducer.flush()
                    self.kproducer = None
                    log.debug('stopped-kproducer-kafka-proxy')
            except Exception as e:
                log.exception('failed-stopped-kproducer-kafka-proxy', e=e)

            # Stop all consumers.  close() may block, so run it off the
            # reactor thread.  Snapshot the values because other reactor
            # callbacks may mutate the map while we yield.
            try:
                self.topic_any_map_lock.acquire()
                log.debug('stopping-consumers-kafka-proxy')
                for c in list(self.topic_consumer_map.values()):
                    yield deferToThread(c.close)
                self.topic_consumer_map.clear()
                self.topic_callbacks_map.clear()
                log.debug('stopped-consumers-kafka-proxy')
            except Exception as e:
                log.exception('failed-stopped-consumers-kafka-proxy', e=e)
            finally:
                self.topic_any_map_lock.release()
                log.debug('stopping-consumers-kafka-proxy-released-lock')

            log.debug('stopped-kafka-proxy')

        except Exception as e:
            self.kclient = None
            self.kproducer = None
            log.exception('failed-stopped-kafka-proxy', e=e)

    def _get_kafka_producer(self):
        """(Re)create self.kproducer, resolving the broker through consul
        when the endpoint starts with '@'.  On failure the producer is left
        as None so a later send can retry the lookup."""
        try:
            if self.kafka_endpoint.startswith('@'):
                try:
                    _k_endpoint = get_endpoint_from_consul(
                        self.consul_endpoint, self.kafka_endpoint[1:])
                    log.debug('found-kafka-service', endpoint=_k_endpoint)
                except Exception as e:
                    log.exception('no-kafka-service-in-consul', e=e)
                    self.kproducer = None
                    self.kclient = None
                    return
            else:
                _k_endpoint = self.kafka_endpoint
            self.kproducer = _kafkaProducer(
                {'bootstrap.servers': _k_endpoint,
                 }
            )
        except Exception as e:
            log.exception('failed-get-kafka-producer', e=e)
            return

    @inlineCallbacks
    def _wait_for_messages(self, consumer, topic):
        """Poll loop (one per subscribed topic) that dispatches each
        received message to every callback registered for the topic.
        Exits when stop() has been requested."""
        while True:
            try:
                # poll() blocks, so run it in a thread
                msg = yield deferToThread(consumer.poll,
                                          self.consumer_poll_timeout)

                if self.stopping:
                    log.debug("stop-request-recieved", topic=topic)
                    break

                if msg is None:
                    continue
                if msg.error():
                    # This typically is received when there are no more messages
                    # to read from kafka. Ignore.
                    continue

                # Invoke callbacks
                for cb in self.topic_callbacks_map[topic]:
                    yield cb(msg)
            except Exception as e:
                log.debug("exception-receiving-msg", topic=topic, e=e)

    @inlineCallbacks
    def subscribe(self, topic, callback, groupId, offset='latest'):
        """
        subscribe allows a caller to subscribe to a given kafka topic. This API
        always create a group consumer.
        :param topic - the topic to subscribe to
        :param callback - the callback to invoke whenever a message is received
        on that topic
        :param groupId - the groupId for this consumer. In the current
        implementation there is a one-to-one mapping between a topic and a
        groupId. In other words, once a groupId is used for a given topic then
        we won't be able to create another groupId for the same topic.
        :param offset: the kafka offset from where the consumer will start
        consuming messages
        """
        try:
            self.topic_any_map_lock.acquire()
            if topic in self.topic_consumer_map:
                # A consumer already exists; just add the callback
                if topic in self.topic_callbacks_map:
                    self.topic_callbacks_map[topic].append(callback)
                else:
                    self.topic_callbacks_map[topic] = [callback]
                return

            # Create consumer for that topic
            c = Consumer({
                'bootstrap.servers': self.kafka_endpoint,
                'group.id': groupId,
                'auto.offset.reset': offset
            })
            yield deferToThread(c.subscribe, [topic])
            self.topic_consumer_map[topic] = c
            self.topic_callbacks_map[topic] = [callback]
            # Start the consumer poll loop on the next reactor iteration
            reactor.callLater(0, self._wait_for_messages, c, topic)
        except Exception as e:
            log.exception("topic-subscription-error", e=e)
        finally:
            self.topic_any_map_lock.release()

    @inlineCallbacks
    def unsubscribe(self, topic, callback):
        """
        Unsubscribe to a given topic. Since there they be multiple callers
        consuming from the same topic then to ensure only the relevant caller
        gets unsubscribe then the callback is used as a differentiator. The
        kafka consumer will be closed when there are no callbacks required.
        :param topic: topic to unsubscribe
        :param callback: callback the caller used when subscribing to the topic.
        If multiple callers have subscribed to a topic using the same callback
        then the first callback on the list will be removed.
        :return:None
        """
        try:
            self.topic_any_map_lock.acquire()
            log.debug("unsubscribing-to-topic", topic=topic)
            if topic in self.topic_callbacks_map:
                # Remove only the first registration of this callback
                try:
                    self.topic_callbacks_map[topic].remove(callback)
                except ValueError:
                    # callback was not registered for this topic; nothing
                    # to remove (matches the original scan-by-index logic)
                    pass

                if len(self.topic_callbacks_map[topic]) == 0:
                    # No callbacks left - stop and discard the consumer
                    if topic in self.topic_consumer_map:
                        yield deferToThread(
                            self.topic_consumer_map[topic].close)
                        del self.topic_consumer_map[topic]
                    del self.topic_callbacks_map[topic]
                    log.debug("unsubscribed-to-topic", topic=topic)
                else:
                    log.debug("consumers-for-topic-still-exist", topic=topic,
                              num=len(self.topic_callbacks_map[topic]))
        except Exception as e:
            log.exception("topic-unsubscription-error", e=e)
        finally:
            self.topic_any_map_lock.release()
            log.debug("unsubscribing-to-topic-release-lock", topic=topic)

    @inlineCallbacks
    def send_message(self, topic, msg, key=None):
        """Publish msg on topic.  On failure the proxy is marked faulty,
        the registered liveness handler is fired, and the proxy is stopped
        so a later send re-creates the producer (needed if the kafka
        container came back on a different port)."""
        assert topic is not None
        assert msg is not None

        # first check whether we have a kafka producer. If there is none
        # then try to get one - this happens only when we try to lookup the
        # kafka service from consul
        try:
            if self.faulty is False:

                if self.kproducer is None:
                    self._get_kafka_producer()
                    # Lets the next message request do the retry if still a failure
                    if self.kproducer is None:
                        log.error('no-kafka-producer',
                                  endpoint=self.kafka_endpoint)
                        return

                log.debug('sending-kafka-msg', topic=topic)

                if self.kproducer is not None and self.event_bus_publisher and self.faulty is False:
                    yield deferToThread(self.kproducer.produce, topic, msg, key)
                    log.debug('sent-kafka-msg', topic=topic)
                    # send a lightweight poll to avoid an exception after 100k messages.
                    yield deferToThread(self.kproducer.poll, 0)
                else:
                    return

        except Exception as e:
            self.faulty = True
            # NOTE(review): assumes register_alive_state_update() was called
            # before the first send; otherwise this raises AttributeError.
            self.alive_state_handler.callback(self.alive)
            log.error('failed-to-send-kafka-msg', topic=topic,
                      e=e)

            # set the kafka producer to None. This is needed if the
            # kafka docker went down and comes back up with a different
            # port number.
            if self.stopping is False:
                log.debug('stopping-kafka-proxy')
                self.alive = False
                try:
                    self.stopping = True
                    # Fix: yield the Deferred returned by stop() so that
                    # shutdown completes (and its errors surface) before
                    # clearing the stopping flag.
                    yield self.stop()
                    self.stopping = False
                    log.debug('stopped-kafka-proxy')
                except Exception as e:
                    log.exception('failed-stopping-kafka-proxy', e=e)
            else:
                log.info('already-stopping-kafka-proxy')
            return

    # sending heartbeat message to check the readiness
    def send_heartbeat_message(self, topic, msg):
        """Produce msg on topic through a dedicated heartbeat producer with
        a short (2s) delivery timeout; the delivery outcome is reported via
        handle_kafka_delivery_report()."""
        assert topic is not None
        assert msg is not None

        try:
            if self.kproducer_heartbeat is None:
                if self.kafka_endpoint.startswith('@'):
                    _k_endpoint = get_endpoint_from_consul(
                        self.consul_endpoint, self.kafka_endpoint[1:])
                else:
                    _k_endpoint = self.kafka_endpoint

                # Using 2 seconds timeout for heartbeat producer; default of 5 minutes is too long
                self.kproducer_heartbeat = _kafkaProducer(
                    {'bootstrap.servers': _k_endpoint,
                     'default.topic.config': {'message.timeout.ms': 2000},
                     }
                )

            log.debug('sending-kafka-heartbeat-message', topic=topic)
            self.kproducer_heartbeat.produce(
                topic, msg, callback=self.handle_kafka_delivery_report)

        except Exception as e:
            self.faulty = True
            # NOTE(review): assumes the liveness handler is registered
            self.alive_state_handler.callback(self.alive)
            log.error('failed-to-send-kafka-heartbeat-msg', e=e)

    def check_heartbeat_delivery(self):
        """Poll the heartbeat producer so pending delivery callbacks
        (handle_kafka_delivery_report) get serviced."""
        try:
            if self.kproducer_heartbeat is not None:
                self.kproducer_heartbeat.poll(0)
        except Exception as e:
            log.error('failed-to-check-heartbeat-msg-delivery', e=e)
            self.faulty = True

    def handle_kafka_delivery_report(self, err, msg):
        """Delivery callback for heartbeat messages.  Fires the registered
        liveness handler only when the alive state actually changes."""
        if err is not None:
            # Log and notify only in event of alive status change
            if self.alive is True:
                log.info('failed-to-deliver-message', msg=msg.value(), err=err.str())
                self.alive_state_handler.callback(False)
                self.alive = False
        else:
            if self.alive is not True:
                log.info('message-delivered-successfully', msg=msg.value())
                self.alive_state_handler.callback(True)
                self.alive = True

    def register_alive_state_update(self, defer_handler):
        """Register the handler whose .callback() is fired on alive-state
        changes; must be called before sends/heartbeats report failures."""
        self.alive_state_handler = defer_handler

    def is_faulty(self):
        """Return True if a kafka operation failed since the last start."""
        return self.faulty
| 396 | |
| 397 | |
def get_kafka_proxy():
    """Return the singleton KafkaProxy instance (None if start() has not
    yet registered one)."""
    instance = KafkaProxy._kafka_instance
    return instance