blob: 6dcb10fd619757e5d66e62584ecf37c1febde54e [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17from afkak.client import KafkaClient as _KafkaClient
18from afkak.common import (
khenaidoo6fdf0ba2018-11-02 14:38:33 -040019 PRODUCER_ACK_NOT_REQUIRED
khenaidoob9203542018-09-17 22:56:37 -040020)
21from afkak.producer import Producer as _kafkaProducer
22from structlog import get_logger
23from twisted.internet.defer import inlineCallbacks, returnValue
24from zope.interface import implementer
25
khenaidoofdbad6e2018-11-06 22:26:38 -050026from python.common.utils.consulhelpers import get_endpoint_from_consul
27from python.common.utils.registry import IComponent
28from event_bus_publisher import EventBusPublisher
khenaidoob9203542018-09-17 22:56:37 -040029
30log = get_logger()
31
32
@implementer(IComponent)
class KafkaProxy(object):
    """
    Singleton proxy class that hides the kafka client details.

    The instance registers itself as the singleton in start(); creating a
    second KafkaProxy after that raises an Exception.  The kafka endpoint
    may be given directly ('host:port') or, when prefixed with '@', as a
    service name that is looked up in consul.
    """
    _kafka_instance = None

    def __init__(self,
                 consul_endpoint='localhost:8500',
                 kafka_endpoint='localhost:9092',
                 ack_timeout=1000,
                 max_req_attempts=10,
                 config=None):
        """
        Record the connection settings; no connection is made here.

        :param consul_endpoint: consul address used when kafka_endpoint
            starts with '@'
        :param kafka_endpoint: kafka address, or '@<service-name>' to have
            it resolved through consul
        :param ack_timeout: stored on the instance (not currently passed
            to the producer)
        :param max_req_attempts: stored on the instance (not currently
            passed to the producer)
        :param config: optional dict; its 'event_bus_publisher' entry is
            handed to EventBusPublisher in start()
        :raises Exception: if a KafkaProxy singleton is already registered
        """
        # return an exception if the object already exist
        if KafkaProxy._kafka_instance:
            raise Exception('Singleton exist for :{}'.format(KafkaProxy))

        log.debug('initializing', endpoint=kafka_endpoint)
        self.ack_timeout = ack_timeout
        self.max_req_attempts = max_req_attempts
        self.consul_endpoint = consul_endpoint
        self.kafka_endpoint = kafka_endpoint
        # A fresh dict per instance; a literal {} default would be shared
        # by every instance created without an explicit config.
        self.config = {} if config is None else config
        self.kclient = None
        self.kproducer = None
        self.event_bus_publisher = None
        self.stopping = False
        self.faulty = False
        log.debug('initialized', endpoint=kafka_endpoint)

    @inlineCallbacks
    def start(self):
        """
        Create the producer, register the singleton and start the event
        bus publisher.

        :return: (via returnValue) this KafkaProxy instance
        """
        log.debug('starting')
        self._get_kafka_producer()
        KafkaProxy._kafka_instance = self
        self.event_bus_publisher = yield EventBusPublisher(
            self, self.config.get('event_bus_publisher', {})).start()
        log.info('started')
        # Reset the per-instance flags.  (Assigning on the class would be
        # shadowed by the instance attribute set in __init__.)
        self.faulty = False
        self.stopping = False
        returnValue(self)

    @inlineCallbacks
    def stop(self):
        """
        Close the kafka client and stop the producer.

        Failures are logged and swallowed.  Both references are always
        cleared so that the next send_message() builds a new producer
        rather than reusing a broken one.
        """
        try:
            log.debug('stopping-kafka-proxy')
            try:
                if self.kclient:
                    yield self.kclient.close()
                    log.debug('stopped-kclient-kafka-proxy')
            except Exception as e:
                log.exception('failed-stopped-kclient-kafka-proxy', e=e)
            finally:
                self.kclient = None

            try:
                if self.kproducer:
                    yield self.kproducer.stop()
                    log.debug('stopped-kproducer-kafka-proxy')
            except Exception as e:
                log.exception('failed-stopped-kproducer-kafka-proxy', e=e)
            finally:
                self.kproducer = None

            log.debug('stopped-kafka-proxy')

        except Exception as e:
            self.kclient = None
            self.kproducer = None
            log.exception('failed-stopped-kafka-proxy', e=e)

    def _get_kafka_producer(self):
        """
        Build self.kclient and self.kproducer for the configured endpoint.

        On any failure the error is logged and the method returns without
        raising; callers detect failure by self.kproducer being None.
        """
        # The producer is created with PRODUCER_ACK_NOT_REQUIRED.
        # NOTE(review): ack_timeout and max_req_attempts are stored on the
        # instance but are not passed to the producer here.
        try:
            if self.kafka_endpoint.startswith('@'):
                # '@name' means: resolve the service called 'name' in consul
                try:
                    _k_endpoint = get_endpoint_from_consul(
                        self.consul_endpoint, self.kafka_endpoint[1:])
                    log.debug('found-kafka-service', endpoint=_k_endpoint)

                except Exception as e:
                    log.exception('no-kafka-service-in-consul', e=e)

                    self.kproducer = None
                    self.kclient = None
                    return
            else:
                _k_endpoint = self.kafka_endpoint

            self.kclient = _KafkaClient(_k_endpoint)
            self.kproducer = _kafkaProducer(
                self.kclient,
                req_acks=PRODUCER_ACK_NOT_REQUIRED,
            )
        except Exception as e:
            log.exception('failed-get-kafka-producer', e=e)
            return

    @inlineCallbacks
    def send_message(self, topic, msg):
        """
        Send a single message to a kafka topic.

        The message is dropped when the proxy is marked faulty, when no
        producer can be obtained, or when the event bus publisher has not
        been set yet.  If sending fails the proxy is marked faulty, stopped,
        and then marked healthy again so that the next call builds a new
        producer.

        :param topic: kafka topic name; must not be None
        :param msg: message payload; must not be None
        """
        assert topic is not None
        assert msg is not None

        try:
            if self.faulty is not False:
                return

            # first check whether we have a kafka producer.  If there is
            # none then try to get one - this happens only when we try to
            # lookup the kafka service from consul
            if self.kproducer is None:
                self._get_kafka_producer()
                # Lets the next message request do the retry if still a
                # failure
                if self.kproducer is None:
                    log.error('no-kafka-producer',
                              endpoint=self.kafka_endpoint)
                    return

            msgs = [msg]

            if self.kproducer and self.kclient and \
                    self.event_bus_publisher and self.faulty is False:
                yield self.kproducer.send_messages(topic, msgs=msgs)
            else:
                return

        except Exception as e:
            self.faulty = True
            log.error('failed-to-send-kafka-msg', topic=topic, msg=msg, e=e)

            # set the kafka producer to None.  This is needed if the
            # kafka docker went down and comes back up with a different
            # port number.
            if self.stopping is False:
                log.debug('stopping-kafka-proxy')
                try:
                    self.stopping = True
                    # Wait for the shutdown to finish before clearing the
                    # flags; stop() is an inlineCallbacks coroutine.
                    yield self.stop()
                    self.stopping = False
                    self.faulty = False
                    log.debug('stopped-kafka-proxy')
                except Exception as e:
                    log.exception('failed-stopping-kafka-proxy', e=e)
            else:
                log.info('already-stopping-kafka-proxy')

            return

    def is_faulty(self):
        """Return the current value of the proxy's faulty flag."""
        return self.faulty
196
197
def get_kafka_proxy():
    """
    Common accessor for the singleton instance of the kafka proxy class.

    :return: the object stored in KafkaProxy._kafka_instance
    """
    proxy = KafkaProxy._kafka_instance
    return proxy