# Copyright 2020-present Open Networking Foundation
# Original copyright 2020-present ADTRAN, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14import os
15import ssl
16import six
17import json
18import time
19import uuid
20import kafka
21import getpass
22import logging
23import logging.config
24import tempfile
25import threading
26import pkg_resources
27
28from datetime import datetime
29import robot
30from robot.libraries.BuiltIn import BuiltIn, RobotNotRunningError
31from robot.utils import robottime
32from robot.api.deco import keyword
33
34
def _package_version_get(package_name, source=None):
    """
    Returns the installed version number for the given pip package with name _package_name_.
    """

    if source:
        # Walk upwards from the directory containing *source*, trying to read
        # a VERSION file from each ancestor directory in turn.
        directory, leaf = os.path.split(os.path.dirname(os.path.abspath(source)))

        while leaf:
            version_path = os.path.join(directory, 'VERSION')
            try:
                with open(version_path) as version_file:
                    return version_file.read().strip()
            except Exception:
                # no readable VERSION file here -- climb one level up
                directory, leaf = os.path.split(directory)

    # Fall back to the version pip recorded for the installed distribution.
    try:
        return pkg_resources.get_distribution(package_name).version
    except pkg_resources.DistributionNotFound:
        raise NameError("Package '%s' is not installed!" % package_name)
54
55
class FatalError(RuntimeError):
    """Error that aborts the complete Robot Framework test execution."""

    # Robot Framework inspects this flag and stops all remaining tests
    # when an exception carrying it is raised.
    ROBOT_EXIT_ON_FAILURE = True
58
59
class KafkaRecord(object):
    """
    Read-only view of a Kafka message: exposes selected attributes of the
    wrapped consumer record as properties.
    """

    def __init__(self, customer_record):
        # keep a reference to the original consumer record and delegate to it
        self._wrapped = customer_record

    @property
    def topic(self):
        """Name of the topic the record was read from."""
        return self._wrapped.topic

    @property
    def value(self):
        """Payload of the record."""
        return self._wrapped.value

    @property
    def key(self):
        """Key of the record."""
        return self._wrapped.key

    @property
    def offset(self):
        """Offset of the record as reported by the consumer."""
        return self._wrapped.offset

    @property
    def timestamp(self):
        """Timestamp of the record as reported by the consumer."""
        return self._wrapped.timestamp
87
88
class KafkaListener(threading.Thread):
    """
    Internal helper class used by this client: a daemon thread that consumes
    records from one Kafka topic and forwards each record to a callback.
    """

    def __init__(self, topic_name, callback, consumer, **kwargs):
        """
        - topic_name: The Kafka topic to be listened to
        - callback: The function to be executed when Kafka message arrives
        - consumer: The Kafka consumer object of type kafka.KafkaConsumer()
        - **kwargs: The keyword arguments:
            - timestamp_from: To be used in the callback function, description in KafkaClient.subscribe()
        """
        super(KafkaListener, self).__init__()

        # daemon thread so a forgotten listener never blocks interpreter exit;
        # the 'daemon' attribute replaces the deprecated setDaemon() call
        self.daemon = True

        self._topic = topic_name
        self._cb_func = callback
        self._consumer = consumer
        self._kwargs = kwargs
        # unique id used by KafkaClient to address this subscription
        self.id = str(uuid.uuid4()).replace('-', '')

        # errors collected while closing the consumer; inspected by
        # KafkaClient.unsubscribe()
        self.error = []

    def run(self):
        """Consume records and deliver them until the consumer is closed."""
        try:
            self._consumer.subscribe([self._topic])
            for record in self._consumer:
                try:
                    self._cb_func(KafkaRecord(record), **self._kwargs)
                except Exception as e:
                    logging.error('Error while deliver message: %s' % e)
                    self.stop()
        except Exception as e:
            # Iteration raises once stop() closes the consumer; this is the
            # regular shutdown path, so record it at debug level instead of
            # swallowing it silently.
            logging.debug('Kafka listener for topic "%s" terminated: %s', self._topic, e)

    def stop(self):
        """Stop listening by closing the underlying consumer (if any)."""
        if self._consumer is not None:
            self._close_consumer()

    def _close_consumer(self):
        # Close without committing offsets; remember failures for the caller.
        try:
            self._consumer.close(autocommit=False)
        except Exception as e:
            self.error.append(str(e))
141
142
class KafkaClient(object):
    """
    Robot Framework library that subscribes to Kafka topics and collects the
    records received on them for later retrieval by test cases.
    """

    # Robot Framework library constants
    ROBOT_LIBRARY_VERSION = '0.0'
    ROBOT_LIBRARY_SCOPE = 'TEST SUITE'

    global_init = 0
    global_timeout = 120
    min_robot_version = 30202

    try:
        ROBOT_LIBRARY_VERSION = _package_version_get('kafka_robot')
    except NameError:
        ROBOT_LIBRARY_VERSION = 'unknown'

    def __init__(self, **kwargs):
        """
        Constructor.

        *Named parameters*:
        - root_logging: <bool>; If False the root logger is reset to level NOTSET. Default: False.
        - log_level: <string>; Log level of the kafka file logger. Default: 'INFO'.
        """
        super(KafkaClient, self).__init__()
        self._record_list = []  # records collected by all running listeners
        self._listeners = {}    # subscription id -> KafkaListener

        self._host = None
        self._port = None
        self._cert_path = None
        self._consumer = None
        # Copy the defaults: assigning kafka.KafkaConsumer.DEFAULT_CONFIG
        # directly would make configuration_set() mutate the shared
        # class-level default configuration of every KafkaConsumer created
        # anywhere in this process.
        self._consumer_config = dict(kafka.KafkaConsumer.DEFAULT_CONFIG)
        self._topic_name = None

        self._enable_logging(kwargs.get('root_logging', False), kwargs.get('log_level', 'INFO'))
        logging.getLogger('kafka').propagate = False
        self._log = logging.getLogger('kafka.conn')

        # set default ssl context to accept any (self signed) certificate
        ssl_ctx = ssl.create_default_context()
        ssl_ctx.check_hostname = False
        ssl_ctx.verify_mode = ssl.CERT_NONE

        self._consumer_config['ssl_context'] = ssl_ctx

    def _check_robot_version(self, min_robot_version):
        """
        This method verifies the Min Robot version required to run

        *Parameter* :
        - min_robot_version : <string> ; Minimum robot version is: 20801

        *Return* : None if no errors else error message
        """

        if self._get_robot_version() < int(min_robot_version):
            raise FatalError('wrong robot version: %s' % robot.get_version())

    @staticmethod
    def _get_robot_version():
        """
        Returns the running Robot Framework version encoded as a single
        integer (major * 10000 + minor * 100 + patch), or 0 if the version
        string has an unexpected shape.
        """
        version = robot.get_version().split('.')
        if len(version) == 2:
            return int(version[0]) * 10000 + int(version[1]) * 100
        elif len(version) == 3:
            return int(version[0]) * 10000 + \
                   int(version[1]) * 100 + int(version[2])
        else:
            return 0

    @staticmethod
    def _robot_bool_convert(robot_bool):
        """
        Converts unicode to boolean or returns unchanged input variable if it is
        boolean already.

        :param robot_bool: value to be converted
        :return: Input param converted to boolean
        """
        real_bool = robot_bool
        if not isinstance(robot_bool, bool):
            # Robot Framework passes keyword arguments as strings; only the
            # (case-insensitive) literal "true" counts as True.
            real_bool = str(robot_bool).lower() == "true"
        return real_bool

    @keyword
    def library_version_get(self):
        """
        Retrieve the version of the currently running library instance.

        *Returns*: version string consisting of three dot-separated numbers (x.y.z)
        """
        return self.ROBOT_LIBRARY_VERSION

    def __del__(self):
        # best-effort cleanup: stop all listeners if a connection was opened
        if self._host is not None:
            self.connection_close()

    @staticmethod
    def _enable_logging(root_logging, log_level):
        """
        Configures a file handler for the 'kafka' logger. The log file is
        written to the Robot Framework output directory when running under
        Robot, otherwise to the system temp directory.
        """
        try:
            log_dir = BuiltIn().replace_variables('${OUTPUT_DIR}')
        except RobotNotRunningError:
            log_dir = tempfile.gettempdir()

        try:
            logfile_name = os.path.join(log_dir, 'kafka_robot_%s.log' % getpass.getuser())
        except KeyError:
            # getpass.getuser() may raise KeyError if no user name is resolvable
            logfile_name = os.path.join(log_dir, 'kafka_robot.log')

        logging_dict = {
            'version': 1,
            'disable_existing_loggers': False,

            'formatters': {
                'standard': {
                    'format': '%(asctime)s %(name)s [%(levelname)s] : %(message)s'
                }
            },
            'handlers': {
                'kafka_file': {
                    'level': log_level.upper(),
                    'class': 'logging.FileHandler',
                    'mode': 'w',
                    'filename': logfile_name,
                    'formatter': 'standard'
                }
            },
            'loggers': {
                'kafka': {
                    'handlers': ['kafka_file'],
                    'level': log_level.upper(),
                    'propagate': False
                }
            }
        }
        if not root_logging:
            # neutralize the root logger unless explicitly requested
            logging_dict['loggers'].update({'': {'level': 'NOTSET'}})
        logging.config.dictConfig(logging_dict)

    def connection_open(self, host, port='9093', topic_name='', **kwargs):
        """
        Opens a connection to the Kafka host.

        *Parameters*:
        - host: <string>|<IP address>; Name or IP address of the Kafka host.
        - port: <number>; TCP port of the Kafka host. Default: 9093.
        - topic_name: <string>; The name of the topic to listen on; optional. If not set, the keyword _subscribe_ can be used to set the subscription.

        *Named parameters*:
        - timestamp_from: <time string>, 0: Timestamp string format YYYY-MM-DD hh:mm:ss
          (e.g. 2021-01-31 23:34:45) to define the time from which kafka records timestamps are collected in the
          record list. If 0 then the filtering is switched off and received Kafka records are collected. If
          _timestamp_from_ is not set then the library will use the current time for the filter.

        To set connection parameters such as certificates or authentication methods use _configuration_set_, e.g. for configuring ssl use

        | Configuration Set | ssl_cafile=nms_cachain.crt | ssl_certfile=nms_suite.crt | ssl_keyfile=nms_suite.key | ssl_check_hostname=${False} |

        *Return*: The ID of the subscription, if the parameter _topic_name_ is set. This ID can be used to unsubscribe.
        """
        self._host = host
        self._port = port

        self.configuration_set(bootstrap_servers='%s:%s' % (self._host, int(self._port)))

        if topic_name:
            return self.subscribe(topic_name=topic_name, **kwargs)

    def connection_close(self):
        """
        Closes the connection to the Kafka host. Stops all running listener to Kafka subscriptions.
        """
        for _, listener in self._listeners.items():
            listener.stop()

        self._host = None
        self._port = None
        self._cert_path = None

    @staticmethod
    def _get_timestamp(timestamp):
        """
        Normalizes *timestamp* to milliseconds since epoch. Accepts a
        'YYYY-MM-DD hh:mm:ss' string, a datetime object, a numeric value, or
        None/'' (meaning "now"). Raises ValueError for unparsable strings.
        """
        if timestamp is None or timestamp == '':
            timestamp = datetime.now()

        try:
            timestamp = int(time.mktime(datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S').timetuple()) * 1000)
        except TypeError:
            # not a string: either a datetime (has timetuple) or a number
            try:
                timestamp = int(time.mktime(timestamp.timetuple()) * 1000)
            except AttributeError:
                timestamp = int(timestamp)
        except ValueError:
            # string, but not in the expected date format: maybe numeric
            try:
                timestamp = int(timestamp)
            except ValueError:
                raise ValueError('"%s" is not a valid input: should be of format "YYYY-MM-DD hh:mm:ss"' % timestamp)

        return timestamp

    def subscribe(self, topic_name, **kwargs):  # type: (str) -> str
        """
        Subscribe for an external topic and starts a listener on this topic. All records received in this listener are
        stored in a record list, which can be retrieved with keyword _records_get_. Records are recorded until it is
        unsubscribed from the topic.

        *Parameters*:
        - topic_name: <string>; The name of the topic to listen on.

        *Named parameters*:
        - timestamp_from: <time string>, 0: Timestamp string format YYYY-MM-DD hh:mm:ss
          (e.g. 2021-01-31 23:34:45) to define the time from which kafka records timestamps are collected in the
          record list. If 0 then the filtering is switched off and received Kafka records are collected. If
          _timestamp_from_ is not set then the library will use the current time for the filter.
        - human_readable_timestamps:

        *Return*: The ID of the subscription. This ID can be used to unsubscribe.
        """
        kwargs['timestamp_from'] = self._get_timestamp(kwargs.get('timestamp_from'))

        consumer = self._create_consumer()
        listener = KafkaListener(topic_name=topic_name, callback=self._add_record, consumer=consumer, **kwargs)  # type: KafkaListener
        self._listeners[listener.id] = listener
        listener.start()
        return listener.id

    def unsubscribe(self, subscription_id):
        """
        Stops the listener and unsubscribes from an external topic.

        *Parameters*:
        - subscription_id: <string>; Subscription ID got from keyword _subscribe_.

        Raises an Exception if the listener reported errors while closing.
        """
        listener = self._listeners.get(subscription_id)  # type: KafkaListener
        if listener is not None:
            listener.stop()
            del self._listeners[subscription_id]

            if listener.error:
                raise Exception(', '.join(listener.error))

    def _records_wait_for(self, topic_name, number_of_records, wait_time='0', require_number_of_records='true'):
        """
        A lock function that waits for the specified _number_of_records_ on a topic or until the _wait_time_
        is expired. The keyword subscribes for the Kafka topic. So it is not necessary to subscribe with the
        _Subscribe_ keyword before.

        *Parameter*:
        - topic_name: <string>; The name of the topic to listen on.
        - number_of_records: <number>; Number of records to be waited for.
        - wait_time: <string>; Time to wait for the records. Default: _0_. If _0_ then the keyword waits forever for
          the number of records, if not _0_ then it waits at maximum _wait_time_.
        - require_number_of_records: _false_, _true_; Whether or not to check if list to return contains number of
          records. If not an AssertionError is raised.

        *Return*: A list of dictionaries. Each dictionary represents a record and consists of keys:
        - topic: Name of the topic
        - timestamp: Number of seconds since Jan 01 1970 (UTC)
        - message: Message content as dictionary
        """
        return_list = []

        # Robot Framework passes numbers as strings; normalize once so the
        # poll limit and the final length check agree.
        number_of_records = int(number_of_records)

        consumer = self._create_consumer()
        consumer.subscribe(topics=[topic_name])
        records = consumer.poll(timeout_ms=1000 * robottime.timestr_to_secs(wait_time), max_records=number_of_records)
        consumer.unsubscribe()
        consumer.close(autocommit=False)

        # poll() returns a dict keyed by TopicPartition; flatten the values
        for _, partition_records in records.items():
            return_list.extend(partition_records)

        if self._robot_bool_convert(require_number_of_records):
            if len(return_list) != number_of_records:
                raise AssertionError('returned list does not contain expected number of records')

        return [json.loads(r.value) for r in return_list]

    def records_get(self, topic_name='all'):
        """
        Retrieves the list of records for subscribed topics.

        *Parameters*:
        - topic_name: <string>, _all_; The name of the topic to retrieve from record list. Default: _all_.

        *Return*: A list of dictionaries. Each dictionary represents a record and consists of keys:
        - topic: Name of the topic
        - timestamp: Number of milliseconds since Jan 01 1970, midnight (UTC) as string format "yyyy-mm-dd hh:MM:ss.ffffff"
        - message: Message content as dictionary
        """
        if not topic_name or topic_name == 'all':
            return self._record_list
        else:
            return [r for r in self._record_list if r.get('topic') == topic_name]

    def records_clear(self, topic_name='all'):
        """
        Clears the list of records for subscribed topics.

        *Parameters*:
        - topic_name: <string>, _all_; The name of the topic to remove from record list. Default: _all_.
        """
        if not topic_name or topic_name == 'all':
            self._record_list = []
        else:
            self._record_list = [r for r in self._record_list if r.get('topic') != topic_name]

    def configuration_set(self, **kwargs):
        """
        Available setting with example values. To get actual used setting use _Configuration Get_.
        Values must be supplied using there correct type (e.g. int, str, bool)

        - 'bootstrap_servers': 'localhost',
        - 'client_id': 'kafka-python-' + __version__,
        - 'group_id': None,
        - 'key_deserializer': None,
        - 'value_deserializer': None,
        - 'fetch_max_wait_ms': 500,
        - 'fetch_min_bytes': 1,
        - 'fetch_max_bytes': 52428800,
        - 'max_partition_fetch_bytes': 1 * 1024 * 1024,
        - 'request_timeout_ms': 305000, # chosen to be higher than the default of max_poll_interval_ms
        - 'retry_backoff_ms': 100,
        - 'reconnect_backoff_ms': 50,
        - 'reconnect_backoff_max_ms': 1000,
        - 'max_in_flight_requests_per_connection': 5,
        - 'auto_offset_reset': 'latest',
        - 'enable_auto_commit': True,
        - 'auto_commit_interval_ms': 5000,
        - 'default_offset_commit_callback': lambda offsets, response: True,
        - 'check_crcs': True,
        - 'metadata_max_age_ms': 5 * 60 * 1000,
        - 'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
        - 'max_poll_records': 500,
        - 'max_poll_interval_ms': 300000,
        - 'session_timeout_ms': 10000,
        - 'heartbeat_interval_ms': 3000,
        - 'receive_buffer_bytes': None,
        - 'send_buffer_bytes': None,
        - 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
        - 'consumer_timeout_ms': float('inf'),
        - 'skip_double_compressed_messages': False,
        - 'security_protocol': 'PLAINTEXT',
        - 'ssl_context': None,
        - 'ssl_check_hostname': True,
        - 'ssl_cafile': None,
        - 'ssl_certfile': None,
        - 'ssl_keyfile': None,
        - 'ssl_crlfile': None,
        - 'ssl_password': None,
        - 'api_version': None,
        - 'api_version_auto_timeout_ms': 2000,
        - 'connections_max_idle_ms': 9 * 60 * 1000,
        - 'metric_reporters': [],
        - 'metrics_num_samples': 2,
        - 'metrics_sample_window_ms': 30000,
        - 'metric_group_prefix': 'consumer',
        - 'selector': selectors.DefaultSelector,
        - 'exclude_internal_topics': True,
        - 'sasl_mechanism': None,
        - 'sasl_plain_username': None,
        - 'sasl_plain_password': None,
        - 'sasl_kerberos_service_name': 'kafka'
        """

        for key, value in six.iteritems(kwargs):
            self._consumer_config[key] = value

            # Disable the default (accept-anything) ssl context as soon as the
            # user configures any ssl setting. This check must run INSIDE the
            # loop: previously it only inspected the last-iterated key, so an
            # ssl_* setting passed together with other settings could be
            # silently overridden by the default context.
            if key.startswith('ssl') and key != 'ssl_context':
                self._consumer_config['ssl_context'] = None

    def configuration_get(self):
        """
        Shows the current setting of the Kafka interface. For available keys check keyword _Configuration Set_
        """
        return self._consumer_config

    def _add_record(self, record, **kwargs):
        """
        Listener callback: appends *record* to the record list if it passes
        the timestamp_from filter (0 disables filtering) and is not already
        present.
        """
        try:

            if kwargs.get('timestamp_from') == 0 or kwargs.get('timestamp_from') <= record.timestamp:

                record = {
                    'topic': record.topic,
                    # record.timestamp is milliseconds since epoch; render it
                    # as a human readable local time string
                    'timestamp': datetime.fromtimestamp(int(record.timestamp) / 1e3).strftime('%Y-%m-%d %H:%M:%S.%f'),
                    'timestamp_sec': record.timestamp,
                    'message': record.value
                }

                if record not in self._record_list:
                    self._record_list.append(record)

        except Exception as e:
            self._log.error(e)

    def _create_consumer(self):
        """Create a new KafkaConsumer from the current configuration."""
        return kafka.KafkaConsumer(**self._consumer_config)