blob: 10fdbf875aea08c5199d7cc3803e558ac12aab2c [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17from afkak.client import KafkaClient as _KafkaClient
18from afkak.common import (
19 PRODUCER_ACK_LOCAL_WRITE, PRODUCER_ACK_NOT_REQUIRED
20)
21from afkak.producer import Producer as _kafkaProducer
22from structlog import get_logger
23from twisted.internet.defer import inlineCallbacks, returnValue
24from zope.interface import implementer
25
26from adapters.common.utils.consulhelpers import get_endpoint_from_consul
27from adapters.kafka.event_bus_publisher import EventBusPublisher
28from adapters.common.utils.registry import IComponent
29import time
30
31log = get_logger()
32
33
34@implementer(IComponent)
35class KafkaProxy(object):
36 """
37 This is a singleton proxy kafka class to hide the kafka client details.
38 """
39 _kafka_instance = None
40
41 def __init__(self,
42 consul_endpoint='localhost:8500',
43 kafka_endpoint='localhost:9092',
44 ack_timeout=1000,
45 max_req_attempts=10,
46 config={}):
47
48 # return an exception if the object already exist
49 if KafkaProxy._kafka_instance:
50 raise Exception('Singleton exist for :{}'.format(KafkaProxy))
51
52 log.debug('initializing', endpoint=kafka_endpoint)
53 self.ack_timeout = ack_timeout
54 self.max_req_attempts = max_req_attempts
55 self.consul_endpoint = consul_endpoint
56 self.kafka_endpoint = kafka_endpoint
57 self.config = config
58 self.kclient = None
59 self.kproducer = None
60 self.event_bus_publisher = None
61 self.stopping = False
62 self.faulty = False
63 log.debug('initialized', endpoint=kafka_endpoint)
64
65 @inlineCallbacks
66 def start(self):
67 log.debug('starting')
68 self._get_kafka_producer()
69 KafkaProxy._kafka_instance = self
70 self.event_bus_publisher = yield EventBusPublisher(
71 self, self.config.get('event_bus_publisher', {})).start()
72 log.info('started')
73 KafkaProxy.faulty = False
74 self.stopping = False
75 returnValue(self)
76
77 @inlineCallbacks
78 def stop(self):
79 try:
80 log.debug('stopping-kafka-proxy')
81 try:
82 if self.kclient:
83 yield self.kclient.close()
84 self.kclient = None
85 log.debug('stopped-kclient-kafka-proxy')
86 except Exception, e:
87 log.exception('failed-stopped-kclient-kafka-proxy', e=e)
88 pass
89
90 try:
91 if self.kproducer:
92 yield self.kproducer.stop()
93 self.kproducer = None
94 log.debug('stopped-kproducer-kafka-proxy')
95 except Exception, e:
96 log.exception('failed-stopped-kproducer-kafka-proxy', e=e)
97 pass
98
99 #try:
100 # if self.event_bus_publisher:
101 # yield self.event_bus_publisher.stop()
102 # self.event_bus_publisher = None
103 # log.debug('stopped-event-bus-publisher-kafka-proxy')
104 #except Exception, e:
105 # log.debug('failed-stopped-event-bus-publisher-kafka-proxy')
106 # pass
107
108 log.debug('stopped-kafka-proxy')
109
110 except Exception, e:
111 self.kclient = None
112 self.kproducer = None
113 #self.event_bus_publisher = None
114 log.exception('failed-stopped-kafka-proxy', e=e)
115 pass
116
117 def _get_kafka_producer(self):
118 # PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written
119 # to a local log before sending response
120 try:
121
122 if self.kafka_endpoint.startswith('@'):
123 try:
124 _k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
125 self.kafka_endpoint[1:])
126 log.debug('found-kafka-service', endpoint=_k_endpoint)
127
128 except Exception as e:
129 log.exception('no-kafka-service-in-consul', e=e)
130
131 self.kproducer = None
132 self.kclient = None
133 return
134 else:
135 _k_endpoint = self.kafka_endpoint
136
137 self.kclient = _KafkaClient(_k_endpoint)
138 self.kproducer = _kafkaProducer(self.kclient,
139 req_acks=PRODUCER_ACK_NOT_REQUIRED,
140 # req_acks=PRODUCER_ACK_LOCAL_WRITE,
141 # ack_timeout=self.ack_timeout,
142 # max_req_attempts=self.max_req_attempts)
143 )
144 except Exception, e:
145 log.exception('failed-get-kafka-producer', e=e)
146 return
147
148 @inlineCallbacks
149 def send_message(self, topic, msg):
150 assert topic is not None
151 assert msg is not None
152
153 # first check whether we have a kafka producer. If there is none
154 # then try to get one - this happens only when we try to lookup the
155 # kafka service from consul
156 try:
157 if self.faulty is False:
158
159 if self.kproducer is None:
160 self._get_kafka_producer()
161 # Lets the next message request do the retry if still a failure
162 if self.kproducer is None:
163 log.error('no-kafka-producer', endpoint=self.kafka_endpoint)
164 return
165
166 # log.debug('sending-kafka-msg', topic=topic, msg=msg)
167 msgs = [msg]
168
169 if self.kproducer and self.kclient and \
170 self.event_bus_publisher and self.faulty is False:
171 # log.debug('sending-kafka-msg-I-am-here0', time=int(round(time.time() * 1000)))
172
173 yield self.kproducer.send_messages(topic, msgs=msgs)
174 # self.kproducer.send_messages(topic, msgs=msgs)
175 # log.debug('sent-kafka-msg', topic=topic, msg=msg)
176 else:
177 return
178
179 except Exception, e:
180 self.faulty = True
181 log.error('failed-to-send-kafka-msg', topic=topic, msg=msg, e=e)
182
183 # set the kafka producer to None. This is needed if the
184 # kafka docker went down and comes back up with a different
185 # port number.
186 if self.stopping is False:
187 log.debug('stopping-kafka-proxy')
188 try:
189 self.stopping = True
190 self.stop()
191 self.stopping = False
192 self.faulty = False
193 log.debug('stopped-kafka-proxy')
194 except Exception, e:
195 log.exception('failed-stopping-kafka-proxy', e=e)
196 pass
197 else:
198 log.info('already-stopping-kafka-proxy')
199
200 return
201
202 def is_faulty(self):
203 return self.faulty
204
205
206# Common method to get the singleton instance of the kafka proxy class
207def get_kafka_proxy():
208 return KafkaProxy._kafka_instance
209