# Copyright 2020 ADTRAN, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
13import os
14import ssl
15import six
16import json
17import uuid
18import kafka
19import getpass
20import logging
21import logging.config
22import tempfile
23import threading
24import pkg_resources
25
26from datetime import datetime
27import robot
28from robot.libraries.BuiltIn import BuiltIn, RobotNotRunningError
29from robot.utils import robottime
30from robot.api.deco import keyword
31
32
33def _package_version_get(package_name, source=None):
34 """
35 Returns the installed version number for the given pip package with name _package_name_.
36 """
37
38 if source:
39 head, tail = os.path.split(os.path.dirname(os.path.abspath(source)))
40
41 while tail:
42 try:
43 with open(os.path.join(head, 'VERSION')) as version_file:
44 return version_file.read().strip()
45 except Exception:
46 head, tail = os.path.split(head)
47
48 try:
49 return pkg_resources.get_distribution(package_name).version
50 except pkg_resources.DistributionNotFound:
51 raise NameError("Package '%s' is not installed!" % package_name)
52
53
class FatalError(RuntimeError):
    """Error that makes Robot Framework abort the complete test execution."""

    # picked up by Robot Framework: stop the whole run on this failure
    ROBOT_EXIT_ON_FAILURE = True
56
57
class KafkaRecord(object):
    """
    Read-only view onto a single Kafka consumer record.

    Exposes only the attributes of the wrapped record that this library
    needs: topic, value, key, offset and timestamp.
    """

    def __init__(self, customer_record):
        # keep a reference to the wrapped consumer record; all properties
        # simply delegate to it
        self._wrapped = customer_record

    @property
    def topic(self):
        """Name of the topic the record was read from."""
        return self._wrapped.topic

    @property
    def value(self):
        """Payload of the record."""
        return self._wrapped.value

    @property
    def key(self):
        """Key of the record."""
        return self._wrapped.key

    @property
    def offset(self):
        """Offset of the record within its partition."""
        return self._wrapped.offset

    @property
    def timestamp(self):
        """Timestamp of the record as provided by the consumer."""
        return self._wrapped.timestamp
85
86
class KafkaListener(threading.Thread):
    """
    Internal helper class used by this client.

    Background thread that subscribes the given consumer to a single topic and
    forwards every received record (wrapped in a KafkaRecord) to a callback
    until it is stopped.
    """

    def __init__(self, topic_name, callback, consumer):
        """
        :param topic_name: name of the Kafka topic to subscribe to
        :param callback: callable invoked with a KafkaRecord for each message
        :param consumer: kafka.KafkaConsumer instance owned by this listener
        """
        super(KafkaListener, self).__init__()

        # daemon thread, so a blocked consumer never prevents interpreter exit
        # (setDaemon() is deprecated; assign the attribute instead)
        self.daemon = True

        self._topic = topic_name
        self._cb_func = callback
        self._consumer = consumer
        # unique subscription id handed back to the caller for unsubscribing
        self.id = str(uuid.uuid4()).replace('-', '')

        # error messages collected while closing the consumer; the client
        # raises them on unsubscribe
        self.error = []

    def run(self):
        try:
            self._consumer.subscribe([self._topic])
            for record in self._consumer:
                try:
                    self._cb_func(KafkaRecord(record))
                except Exception as e:
                    logging.error('Error while deliver message: %s' % e)
                    self.stop()
        except Exception:
            # the consumer raises when it is closed from another thread; this
            # is the regular shutdown path, so the exception is ignored
            pass

    def stop(self):
        """Stop listening by closing the consumer (terminates the run() loop)."""
        if self._consumer is not None:
            self._close_consumer()

    def _close_consumer(self):
        # close without committing offsets so a later subscription sees the
        # records again; collect close errors for the client to report
        try:
            self._consumer.close(autocommit=False)
        except Exception as e:
            self.error.append(str(e))
131
132
class KafkaClient(object):
    """
    Robot Framework keyword library for subscribing to Kafka topics and for
    retrieving the records received on them.
    """

    # constants
    ROBOT_LIBRARY_VERSION = '0.0'
    ROBOT_LIBRARY_SCOPE = 'TEST SUITE'

    global_init = 0
    global_timeout = 120
    min_robot_version = 30202

    try:
        ROBOT_LIBRARY_VERSION = _package_version_get('kafka_robot')
    except NameError:
        ROBOT_LIBRARY_VERSION = 'unknown'

    def __init__(self, **kwargs):
        """
        Constructor.

        *Parameters*:
        - root_logging: <bool>; optional; if _False_ the root logger level is
          set to NOTSET in the logging configuration. Default: _True_.
        """
        super(KafkaClient, self).__init__()
        self._record_list = []
        self._listeners = {}

        self._host = None
        self._port = None
        self._cert_path = None
        self._consumer = None
        # take a copy of the defaults: assigning DEFAULT_CONFIG directly would
        # alias the kafka package's shared class-level dict, and every
        # configuration_set call would mutate it for all consumers
        self._consumer_config = dict(kafka.KafkaConsumer.DEFAULT_CONFIG)
        self._topic_name = None

        self._enable_logging(kwargs.get('root_logging', True))
        logging.getLogger('kafka').propagate = False
        self._log = logging.getLogger('kafka.conn')

        # set default ssl context to accept any (self signed) certificate
        ssl_ctx = ssl.create_default_context()
        ssl_ctx.check_hostname = False
        ssl_ctx.verify_mode = ssl.CERT_NONE

        self._consumer_config['ssl_context'] = ssl_ctx

    def _check_robot_version(self, min_robot_version):
        """
        This method verifies the Min Robot version required to run

        *Parameter* :
        - min_robot_version : <string> ; Minimum robot version is: 20801

        *Return* : None if no errors else error message
        """

        if self._get_robot_version() < int(min_robot_version):
            raise FatalError('wrong robot version: %s' % robot.get_version())

    @staticmethod
    def _get_robot_version():
        """
        Returns the running Robot Framework version as a single integer
        (major * 10000 + minor * 100 + patch) or 0 if the version string
        cannot be parsed.
        """
        version = robot.get_version().split('.')
        if len(version) == 2:
            return int(version[0]) * 10000 + int(version[1]) * 100
        elif len(version) == 3:
            return int(version[0]) * 10000 + \
                   int(version[1]) * 100 + int(version[2])
        else:
            return 0

    @staticmethod
    def _robot_bool_convert(robot_bool):
        """
        Converts unicode to boolean or returns unchanged input variable if it is
        boolean already.

        :param robot_bool: value to be converted
        :return: Input param converted to boolean
        """
        real_bool = robot_bool
        if not isinstance(robot_bool, bool):
            # anything but the (case-insensitive) string "true" maps to False
            robot_bool = str(robot_bool)
            if robot_bool.lower() == "true":
                real_bool = True
            else:
                real_bool = False
        return real_bool

    @keyword
    def library_version_get(self):
        """
        Retrieve the version of the currently running library instance.

        *Returns*: version string consisting of three dot-separated numbers (x.y.z)
        """
        return self.ROBOT_LIBRARY_VERSION

    def __del__(self):
        # best effort: stop all listeners when the library instance goes away
        if self._host is not None:
            self.connection_close()

    @staticmethod
    def _enable_logging(root_logging=True):
        """
        Configures a DEBUG file logger for the 'kafka' logger hierarchy. The
        log file is written to the Robot Framework output directory when
        running under Robot, otherwise to the system temp directory.

        :param root_logging: if False the root logger level is set to NOTSET
        """
        try:
            log_dir = BuiltIn().replace_variables('${OUTPUT_DIR}')
        except RobotNotRunningError:
            log_dir = tempfile.gettempdir()

        try:
            logfile_name = os.path.join(log_dir, 'kafka_robot_%s.log' % getpass.getuser())
        except KeyError:
            # getpass.getuser can fail when no user database entry exists
            logfile_name = os.path.join(log_dir, 'kafka_robot.log')

        logging_dict = {
            'version': 1,
            'disable_existing_loggers': False,

            'formatters': {
                'standard': {
                    'format': '%(asctime)s %(name)s [%(levelname)s] : %(message)s'
                }
            },
            'handlers': {
                'kafka_file': {
                    'level': 'DEBUG',
                    'class': 'logging.FileHandler',
                    'mode': 'w',
                    'filename': logfile_name,
                    'formatter': 'standard'
                }
            },
            'loggers': {
                'kafka': {
                    'handlers': ['kafka_file'],
                    'level': 'DEBUG',
                    'propagate': False
                }
            }
        }
        if not root_logging:
            logging_dict['loggers'].update({'': {'level': 'NOTSET'}})
        logging.config.dictConfig(logging_dict)

    def connection_open(self, host, port='9093', topic_name=''):
        """
        Opens a connection to the Kafka host.


        *Parameters*:
        - host: <string>|<IP address>; Name or IP address of the Kafka host.
        - port: <number>; TCP port of the Kafka host. Default: 9093.
        - topic_name: <string>; The name of the topic to listen on; optional. If not set, the keyword _subscribe_ can be used to set the subscription.

        To set connection parameters such as certificates or authentication methods use _configuration_set_, e.g. for configuring ssl use

        | Configuration Set | ssl_cafile=nms_cachain.crt | ssl_certfile=nms_suite.crt | ssl_keyfile=nms_suite.key | ssl_check_hostname=${False} |

        *Return*: The ID of the subscription, if the parameter _topic_name_ is set. This ID can be used to unsubscribe.
        """
        self._host = host
        self._port = port

        self.configuration_set(bootstrap_servers='%s:%s' % (self._host, int(self._port)))

        if topic_name:
            return self.subscribe(topic_name=topic_name)

    def connection_close(self):
        """
        Closes the connection to the Kafka host. Stops all running listener to Kafka subscriptions.
        """
        for listener in self._listeners.values():
            listener.stop()
        # drop the stopped listeners so a later unsubscribe cannot close an
        # already closed consumer again
        self._listeners = {}

        self._host = None
        self._port = None
        self._cert_path = None

    def subscribe(self, topic_name):  # type: (str) -> str
        """
        Subscribe for an external topic and starts a listener on this topic. All records received in this listener are
        stored in a record list, which can be retrieved with keyword _records_get_. Records are recorded until it is
        unsubscribed from the topic.

        *Parameters*:
        - topic_name: <string>; The name of the topic to listen on.

        *Return*: The ID of the subscription. This ID can be used to unsubscribe.
        """
        consumer = self._create_consumer()
        listener = KafkaListener(topic_name=topic_name, callback=self._add_record, consumer=consumer)  # type: KafkaListener
        self._listeners[listener.id] = listener
        listener.start()
        return listener.id

    def unsubscribe(self, subscription_id):
        """
        Stops the listener and unsubscribes from an external topic.

        *Parameters*:
        - subscription_id: <string>; Subscription ID got from keyword _subscribe_.
        """
        listener = self._listeners.get(subscription_id)  # type: KafkaListener
        if listener is not None:
            listener.stop()
            del self._listeners[subscription_id]

            # report errors collected while closing the consumer
            if listener.error:
                raise Exception(', '.join(listener.error))

    def _records_wait_for(self, topic_name, number_of_records, wait_time='0', require_number_of_records='true'):
        """
        A lock function that waits for the specified _number_of_records_ on a topic or until the _wait_time_
        is expired. The keyword subscribes for the Kafka topic. So it is not necessary to subscribe with the
        _Subscribe_ keyword before.

        *Parameter*:
        - topic_name: <string>; The name of the topic to listen on.
        - number_of_records: <number>; Number of records to be waited for.
        - wait_time: <string>; Time to wait for the records. Default: _0_. If _0_ then the keyword waits forever for
          the number of records, if not _0_ then it waits at maximum _wait_time_.
        - require_number_of_records: _false_, _true_; Whether or not to check if list to return contains number of
          records. If not an AssertionError is raised.

        *Return*: A list of dictionaries. Each dictionary represents a record and consists of keys:
        - topic: Name of the topic
        - timestamp: Number of seconds since Jan 01 1970 (UTC)
        - message: Message content as dictionary
        """
        return_list = []
        # number_of_records typically arrives as a string from Robot; convert
        # once and reuse for both poll() and the length check below
        num_records = int(number_of_records)

        consumer = self._create_consumer()
        consumer.subscribe(topics=[topic_name])
        # NOTE(review): with wait_time '0' poll() returns immediately; the
        # documented "waits forever" behavior is not implemented — confirm
        # the intended semantics
        records = consumer.poll(timeout_ms=1000 * robottime.timestr_to_secs(wait_time), max_records=num_records)
        consumer.unsubscribe()
        consumer.close(autocommit=False)

        for _, v in records.items():
            return_list.extend(v)

        if self._robot_bool_convert(require_number_of_records):
            if len(return_list) != num_records:
                raise AssertionError('returned list does not contain expected number of records')

        return [json.loads(r.value) for r in return_list]

    def records_get(self, topic_name='all'):
        """
        Retrieves the list of records for subscribed topics.

        *Parameters*:
        - topic_name: <string>, _all_; The name of the topic to retrieve from record list. Default: _all_.

        *Return*: A list of dictionaries. Each dictionary represents a record and consists of keys:
        - topic: Name of the topic
        - timestamp: Number of milliseconds since Jan 01 1970, midnight (UTC) as string format "yyyy-mm-dd hh:MM:ss.ffffff"
        - message: Message content as dictionary
        """
        if not topic_name or topic_name == 'all':
            return self._record_list
        else:
            return [r for r in self._record_list if r.get('topic') == topic_name]

    def records_clear(self, topic_name='all'):
        """
        Clears the list of records for subscribed topics.

        *Parameters*:
        - topic_name: <string>, _all_; The name of the topic to remove from record list. Default: _all_.
        """
        if not topic_name or topic_name == 'all':
            self._record_list = []
        else:
            self._record_list = [r for r in self._record_list if r.get('topic') != topic_name]

    def configuration_set(self, **kwargs):
        """
        Available setting with example values. To get actual used setting use _Configuration Get_.
        Values must be supplied using there correct type (e.g. int, str, bool)

        - 'bootstrap_servers': 'localhost',
        - 'client_id': 'kafka-python-' + __version__,
        - 'group_id': None,
        - 'key_deserializer': None,
        - 'value_deserializer': None,
        - 'fetch_max_wait_ms': 500,
        - 'fetch_min_bytes': 1,
        - 'fetch_max_bytes': 52428800,
        - 'max_partition_fetch_bytes': 1 * 1024 * 1024,
        - 'request_timeout_ms': 305000, # chosen to be higher than the default of max_poll_interval_ms
        - 'retry_backoff_ms': 100,
        - 'reconnect_backoff_ms': 50,
        - 'reconnect_backoff_max_ms': 1000,
        - 'max_in_flight_requests_per_connection': 5,
        - 'auto_offset_reset': 'latest',
        - 'enable_auto_commit': True,
        - 'auto_commit_interval_ms': 5000,
        - 'default_offset_commit_callback': lambda offsets, response: True,
        - 'check_crcs': True,
        - 'metadata_max_age_ms': 5 * 60 * 1000,
        - 'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
        - 'max_poll_records': 500,
        - 'max_poll_interval_ms': 300000,
        - 'session_timeout_ms': 10000,
        - 'heartbeat_interval_ms': 3000,
        - 'receive_buffer_bytes': None,
        - 'send_buffer_bytes': None,
        - 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
        - 'consumer_timeout_ms': float('inf'),
        - 'skip_double_compressed_messages': False,
        - 'security_protocol': 'PLAINTEXT',
        - 'ssl_context': None,
        - 'ssl_check_hostname': True,
        - 'ssl_cafile': None,
        - 'ssl_certfile': None,
        - 'ssl_keyfile': None,
        - 'ssl_crlfile': None,
        - 'ssl_password': None,
        - 'api_version': None,
        - 'api_version_auto_timeout_ms': 2000,
        - 'connections_max_idle_ms': 9 * 60 * 1000,
        - 'metric_reporters': [],
        - 'metrics_num_samples': 2,
        - 'metrics_sample_window_ms': 30000,
        - 'metric_group_prefix': 'consumer',
        - 'selector': selectors.DefaultSelector,
        - 'exclude_internal_topics': True,
        - 'sasl_mechanism': None,
        - 'sasl_plain_username': None,
        - 'sasl_plain_password': None,
        - 'sasl_kerberos_service_name': 'kafka'
        """

        for key, value in kwargs.items():
            self._consumer_config[key] = value

            # disable default ssl context if user configures a ssl setting
            if key.startswith('ssl') and key != 'ssl_context':
                self._consumer_config['ssl_context'] = None

    def configuration_get(self):
        """
        Shows the current setting of the Kafka interface. For available keys check keyword _Configuration Set_
        """
        return self._consumer_config

    def _add_record(self, record):
        """
        Listener callback: converts a KafkaRecord into the dictionary format
        exposed by _records_get_ and appends it to the record list.
        """
        try:
            record = {
                'topic': record.topic,
                'timestamp': datetime.fromtimestamp(int(record.timestamp) / 1e3).strftime('%Y-%m-%d %H:%M:%S.%f'),
                'message': record.value
            }
        except Exception as e:
            self._log.error(e)
            # do not store records that could not be converted
            return

        if record not in self._record_list:
            self._record_list.append(record)

    def _create_consumer(self):
        # each listener gets its own consumer built from the current config
        return kafka.KafkaConsumer(**self._consumer_config)