move the library to ONF
Change-Id: I633991669051555f195b51021fb2ef697c51be39
diff --git a/kafka_robot/kafka_client.py b/kafka_robot/kafka_client.py
new file mode 100644
index 0000000..6427730
--- /dev/null
+++ b/kafka_robot/kafka_client.py
@@ -0,0 +1,545 @@
+# Copyright 2020-present Open Networking Foundation
+# Original copyright 2020-present ADTRAN, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import os
+import ssl
+import six
+import json
+import time
+import uuid
+import kafka
+import getpass
+import logging
+import logging.config
+import tempfile
+import threading
+import pkg_resources
+
+from datetime import datetime
+import robot
+from robot.libraries.BuiltIn import BuiltIn, RobotNotRunningError
+from robot.utils import robottime
+from robot.api.deco import keyword
+
+
+def _package_version_get(package_name, source=None):
+ """
+ Returns the installed version number for the given pip package with name _package_name_.
+ """
+
+ if source:
+ head, tail = os.path.split(os.path.dirname(os.path.abspath(source)))
+
+ while tail:
+ try:
+ with open(os.path.join(head, 'VERSION')) as version_file:
+ return version_file.read().strip()
+ except Exception:
+ head, tail = os.path.split(head)
+
+ try:
+ return pkg_resources.get_distribution(package_name).version
+ except pkg_resources.DistributionNotFound:
+ raise NameError("Package '%s' is not installed!" % package_name)
+
+
class FatalError(RuntimeError):
    """Error that makes Robot Framework abort the entire test execution.

    Robot honours the ROBOT_EXIT_ON_FAILURE class attribute: when a keyword
    raises this exception, all remaining tests are skipped.
    """
    ROBOT_EXIT_ON_FAILURE = True
+
+
class KafkaRecord(object):
    """
    Read-only wrapper around a record returned by kafka.KafkaConsumer that
    exposes only the fields this library needs.
    """

    def __init__(self, customer_record):
        # original consumer record; all properties just delegate to it
        self._org_record = customer_record

    @property
    def topic(self):
        """Name of the topic the record was received on."""
        return self._org_record.topic

    @property
    def value(self):
        """Raw message payload of the record."""
        return self._org_record.value

    @property
    def key(self):
        """Partitioning key of the record (may be None)."""
        return self._org_record.key

    @property
    def offset(self):
        """Offset of the record within its partition."""
        return self._org_record.offset

    @property
    def timestamp(self):
        """Record timestamp as delivered by the broker (milliseconds)."""
        return self._org_record.timestamp

    def __repr__(self):
        # debug aid: identify the record without dumping the payload
        return '%s(topic=%r, offset=%r, timestamp=%r)' % (
            type(self).__name__, self.topic, self.offset, self.timestamp)
+
+
class KafkaListener(threading.Thread):
    """
    Internal helper: daemon thread that subscribes a Kafka consumer to one
    topic and forwards every received record to a callback.
    """

    def __init__(self, topic_name, callback, consumer, **kwargs):
        """
        - topic_name: The Kafka topic to be listened to
        - callback: The function to be executed when a Kafka message arrives
        - consumer: The Kafka consumer object of type kafka.KafkaConsumer()
        - **kwargs: The keyword arguments:
            - timestamp_from: To be used in the callback function, description in KafkaClient.subscribe()
        """
        super(KafkaListener, self).__init__()

        # daemon thread: must not keep the interpreter alive on shutdown
        # (setDaemon() is deprecated, assign the attribute directly)
        self.daemon = True

        self._topic = topic_name
        self._cb_func = callback
        self._consumer = consumer
        self._kwargs = kwargs
        # unique id used by KafkaClient to address this subscription
        self.id = str(uuid.uuid4()).replace('-', '')

        # collected error messages, raised later by KafkaClient.unsubscribe()
        self.error = []

    def run(self):
        try:
            self._consumer.subscribe([self._topic])
            for record in self._consumer:
                try:
                    self._cb_func(KafkaRecord(record), **self._kwargs)
                except Exception as e:
                    logging.error('Error while delivering message: %s' % e)
                    self.stop()
        except Exception as e:
            # iteration typically ends with an exception when the consumer is
            # closed from another thread -- expected, keep it out of stderr
            logging.debug('Kafka listener for topic %s stopped: %s' % (self._topic, e))

    def stop(self):
        """Close the underlying consumer, which also ends the run() loop."""
        if self._consumer is not None:
            self._close_consumer()

    def _close_consumer(self):
        try:
            self._consumer.close(autocommit=False)
        except Exception as e:
            self.error.append(str(e))
+
+
class KafkaClient(object):
    """
    Robot Framework library for connecting to a Kafka host, subscribing to
    topics and collecting/retrieving the records received on them.
    """

    ROBOT_LIBRARY_VERSION = '0.0'
    ROBOT_LIBRARY_SCOPE = 'TEST SUITE'

    global_init = 0
    global_timeout = 120
    # minimum supported Robot Framework version, encoded as 3.2.2 -> 30202
    min_robot_version = 30202

    try:
        ROBOT_LIBRARY_VERSION = _package_version_get('kafka_robot')
    except NameError:
        ROBOT_LIBRARY_VERSION = 'unknown'

    def __init__(self, **kwargs):
        """
        Constructor.

        *Named parameters*:
        - root_logging: <bool>; also attach the root logger to the library log file. Default: False.
        - log_level: <string>; log level used for the library log file. Default: 'INFO'.
        """
        super(KafkaClient, self).__init__()
        self._record_list = []
        self._listeners = {}

        self._host = None
        self._port = None
        self._cert_path = None
        self._consumer = None
        # Take a COPY of the defaults: assigning DEFAULT_CONFIG directly would
        # alias the class-level dict of kafka.KafkaConsumer, and every call to
        # configuration_set() would then silently change the defaults for all
        # consumers created in this process.
        self._consumer_config = dict(kafka.KafkaConsumer.DEFAULT_CONFIG)
        self._topic_name = None

        self._enable_logging(kwargs.get('root_logging', False), kwargs.get('log_level', 'INFO'))
        logging.getLogger('kafka').propagate = False
        self._log = logging.getLogger('kafka.conn')

        # set default ssl context to accept any (self signed) certificate
        ssl_ctx = ssl.create_default_context()
        ssl_ctx.check_hostname = False
        ssl_ctx.verify_mode = ssl.CERT_NONE

        self._consumer_config['ssl_context'] = ssl_ctx

    def _check_robot_version(self, min_robot_version):
        """
        Verifies that the running Robot Framework is at least *min_robot_version*.

        *Parameter*:
        - min_robot_version: <string|number>; minimum version encoded as e.g. 30202 for 3.2.2.

        *Raises*: FatalError (aborts the whole test run) if the running version is older.
        """
        if self._get_robot_version() < int(min_robot_version):
            raise FatalError('wrong robot version: %s' % robot.get_version())

    @staticmethod
    def _get_robot_version():
        """
        Returns the running Robot Framework version encoded as a single
        integer (major * 10000 + minor * 100 + patch), or 0 when the version
        string has an unexpected format.
        """
        version = robot.get_version().split('.')
        if len(version) == 2:
            return int(version[0]) * 10000 + int(version[1]) * 100
        elif len(version) == 3:
            return int(version[0]) * 10000 + \
                   int(version[1]) * 100 + int(version[2])
        else:
            return 0

    @staticmethod
    def _robot_bool_convert(robot_bool):
        """
        Converts a Robot Framework string to boolean, or returns the input
        unchanged if it is boolean already.  Only the case-insensitive string
        "true" maps to True; everything else maps to False.

        :param robot_bool: value to be converted
        :return: input param converted to boolean
        """
        real_bool = robot_bool
        if not isinstance(robot_bool, bool):
            robot_bool = str(robot_bool)
            if robot_bool.lower() == "true":
                real_bool = True
            else:
                real_bool = False
        return real_bool

    @keyword
    def library_version_get(self):
        """
        Retrieve the version of the currently running library instance.

        *Returns*: version string consisting of three dot-separated numbers (x.y.z)
        """
        return self.ROBOT_LIBRARY_VERSION

    def __del__(self):
        # best effort: stop all listener threads when the library is dropped
        if self._host is not None:
            self.connection_close()

    @staticmethod
    def _enable_logging(root_logging, log_level):
        """
        Configures a file handler for the 'kafka' logger (and optionally the
        root logger) writing to kafka_robot_<user>.log in Robot's output
        directory, or the system temp directory outside of a Robot run.
        """
        try:
            log_dir = BuiltIn().replace_variables('${OUTPUT_DIR}')
        except RobotNotRunningError:
            log_dir = tempfile.gettempdir()

        try:
            logfile_name = os.path.join(log_dir, 'kafka_robot_%s.log' % getpass.getuser())
        except KeyError:
            # getpass.getuser() may fail without a resolvable user id
            logfile_name = os.path.join(log_dir, 'kafka_robot.log')

        logging_dict = {
            'version': 1,
            'disable_existing_loggers': False,

            'formatters': {
                'standard': {
                    'format': '%(asctime)s %(name)s [%(levelname)s] : %(message)s'
                }
            },
            'handlers': {
                'kafka_file': {
                    'level': log_level.upper(),
                    'class': 'logging.FileHandler',
                    'mode': 'w',
                    'filename': logfile_name,
                    'formatter': 'standard'
                }
            },
            'loggers': {
                'kafka': {
                    'handlers': ['kafka_file'],
                    'level': log_level.upper(),
                    'propagate': False
                }
            }
        }
        if not root_logging:
            logging_dict['loggers'].update({'': {'level': 'NOTSET'}})
        logging.config.dictConfig(logging_dict)

    def connection_open(self, host, port='9093', topic_name='', **kwargs):
        """
        Opens a connection to the Kafka host.

        *Parameters*:
        - host: <string>|<IP address>; Name or IP address of the Kafka host.
        - port: <number>; TCP port of the Kafka host. Default: 9093.
        - topic_name: <string>; The name of the topic to listen on; optional. If not set, the keyword _subscribe_ can be used to set the subscription.

        *Named parameters*:
        - timestamp_from: <time string>, 0: Timestamp string format YYYY-MM-DD hh:mm:ss
          (e.g. 2021-01-31 23:34:45) to define the time from which kafka records timestamps are collected in the
          record list. If 0 then the filtering is switched off and received Kafka records are collected. If
          _timestamp_from_ is not set then the library will use the current time for the filter.

        To set connection parameters such as certificates or authentication methods use _configuration_set_, e.g. for configuring ssl use

        | Configuration Set | ssl_cafile=nms_cachain.crt | ssl_certfile=nms_suite.crt | ssl_keyfile=nms_suite.key | ssl_check_hostname=${False} |

        *Return*: The ID of the subscription, if the parameter _topic_name_ is set. This ID can be used to unsubscribe.
        """
        self._host = host
        self._port = port

        self.configuration_set(bootstrap_servers='%s:%s' % (self._host, int(self._port)))

        if topic_name:
            return self.subscribe(topic_name=topic_name, **kwargs)

    def connection_close(self):
        """
        Closes the connection to the Kafka host. Stops all running listeners
        on Kafka subscriptions.
        """
        for listener in self._listeners.values():
            listener.stop()
        # forget the stopped listeners: a later unsubscribe() must not try to
        # close an already-closed consumer
        self._listeners = {}

        self._host = None
        self._port = None
        self._cert_path = None

    @staticmethod
    def _get_timestamp(timestamp):
        """
        Normalizes *timestamp* to milliseconds since the epoch (int).

        Accepts a 'YYYY-MM-DD hh:mm:ss' string, a datetime object, a numeric
        string, an int/float, or None/'' (both meaning "now").

        *Raises*: ValueError if a string cannot be interpreted.
        """
        if timestamp is None or timestamp == '':
            timestamp = datetime.now()

        try:
            timestamp = int(time.mktime(datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S').timetuple()) * 1000)
        except TypeError:
            # not a string: either a datetime-like object or already numeric
            try:
                timestamp = int(time.mktime(timestamp.timetuple()) * 1000)
            except AttributeError:
                timestamp = int(timestamp)
        except ValueError:
            # a string, but not in date format: try a plain number
            try:
                timestamp = int(timestamp)
            except ValueError:
                raise ValueError('"%s" is not a valid input: should be of format "YYYY-MM-DD hh:mm:ss"' % timestamp)

        return timestamp

    def subscribe(self, topic_name, **kwargs):  # type: (str, ...) -> str
        """
        Subscribe for an external topic and starts a listener on this topic. All records received in this listener are
        stored in a record list, which can be retrieved with keyword _records_get_. Records are recorded until it is
        unsubscribed from the topic.

        *Parameters*:
        - topic_name: <string>; The name of the topic to listen on.

        *Named parameters*:
        - timestamp_from: <time string>, 0: Timestamp string format YYYY-MM-DD hh:mm:ss
          (e.g. 2021-01-31 23:34:45) to define the time from which kafka records timestamps are collected in the
          record list. If 0 then the filtering is switched off and received Kafka records are collected. If
          _timestamp_from_ is not set then the library will use the current time for the filter.

        *Return*: The ID of the subscription. This ID can be used to unsubscribe.
        """
        kwargs['timestamp_from'] = self._get_timestamp(kwargs.get('timestamp_from'))

        consumer = self._create_consumer()
        listener = KafkaListener(topic_name=topic_name, callback=self._add_record, consumer=consumer, **kwargs)  # type: KafkaListener
        self._listeners[listener.id] = listener
        listener.start()
        return listener.id

    def unsubscribe(self, subscription_id):
        """
        Stops the listener and unsubscribes from an external topic.

        *Parameters*:
        - subscription_id: <string>; Subscription ID got from keyword _subscribe_.

        *Raises*: Exception with all errors collected while closing the consumer.
        """
        listener = self._listeners.get(subscription_id)  # type: KafkaListener
        if listener is not None:
            listener.stop()
            del self._listeners[subscription_id]

            if listener.error:
                raise Exception(', '.join(listener.error))

    def _records_wait_for(self, topic_name, number_of_records, wait_time='0', require_number_of_records='true'):
        """
        A lock function that waits for the specified _number_of_records_ on a topic or until the _wait_time_
        is expired. The keyword subscribes for the Kafka topic. So it is not necessary to subscribe with the
        _Subscribe_ keyword before.

        *Parameter*:
        - topic_name: <string>; The name of the topic to listen on.
        - number_of_records: <number>; Number of records to be waited for.
        - wait_time: <string>; Time to wait for the records. Default: _0_. If _0_ then the keyword waits forever for
          the number of records, if not _0_ then it waits at maximum _wait_time_.
        - require_number_of_records: _false_, _true_; Whether or not to check if list to return contains number of
          records. If not an AssertionError is raised.

        *Return*: A list of dictionaries. Each dictionary represents a record and consists of keys:
        - topic: Name of the topic
        - timestamp: Number of seconds since Jan 01 1970 (UTC)
        - message: Message content as dictionary
        """
        return_list = []

        consumer = self._create_consumer()
        consumer.subscribe(topics=[topic_name])
        # NOTE(review): poll(timeout_ms=0) returns immediately, it does not
        # wait forever as the docstring suggests -- confirm intended behavior
        records = consumer.poll(timeout_ms=1000 * robottime.timestr_to_secs(wait_time), max_records=int(number_of_records))
        consumer.unsubscribe()
        consumer.close(autocommit=False)

        for partition_records in records.values():
            return_list.extend(partition_records)

        # use the common Robot bool conversion ('True'/'true'/True all count)
        # and compare numbers, not a number against a Robot string
        if self._robot_bool_convert(require_number_of_records):
            if len(return_list) != int(number_of_records):
                raise AssertionError('returned list does not contain expected number of records')

        return [json.loads(r.value) for r in return_list]

    def records_get(self, topic_name='all'):
        """
        Retrieves the list of records for subscribed topics.

        *Parameters*:
        - topic_name: <string>, _all_; The name of the topic to retrieve from record list. Default: _all_.

        *Return*: A list of dictionaries. Each dictionary represents a record and consists of keys:
        - topic: Name of the topic
        - timestamp: Number of milliseconds since Jan 01 1970, midnight (UTC) as string format "yyyy-mm-dd hh:MM:ss.ffffff"
        - message: Message content as dictionary
        """
        if not topic_name or topic_name == 'all':
            return self._record_list
        else:
            return [r for r in self._record_list if r.get('topic') == topic_name]

    def records_clear(self, topic_name='all'):
        """
        Clears the list of records for subscribed topics.

        *Parameters*:
        - topic_name: <string>, _all_; The name of the topic to remove from record list. Default: _all_.
        """
        if not topic_name or topic_name == 'all':
            self._record_list = []
        else:
            self._record_list = [r for r in self._record_list if r.get('topic') != topic_name]

    def configuration_set(self, **kwargs):
        """
        Available setting with example values. To get actual used setting use _Configuration Get_.
        Values must be supplied using there correct type (e.g. int, str, bool)

        - 'bootstrap_servers': 'localhost',
        - 'client_id': 'kafka-python-' + __version__,
        - 'group_id': None,
        - 'key_deserializer': None,
        - 'value_deserializer': None,
        - 'fetch_max_wait_ms': 500,
        - 'fetch_min_bytes': 1,
        - 'fetch_max_bytes': 52428800,
        - 'max_partition_fetch_bytes': 1 * 1024 * 1024,
        - 'request_timeout_ms': 305000, # chosen to be higher than the default of max_poll_interval_ms
        - 'retry_backoff_ms': 100,
        - 'reconnect_backoff_ms': 50,
        - 'reconnect_backoff_max_ms': 1000,
        - 'max_in_flight_requests_per_connection': 5,
        - 'auto_offset_reset': 'latest',
        - 'enable_auto_commit': True,
        - 'auto_commit_interval_ms': 5000,
        - 'default_offset_commit_callback': lambda offsets, response: True,
        - 'check_crcs': True,
        - 'metadata_max_age_ms': 5 * 60 * 1000,
        - 'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
        - 'max_poll_records': 500,
        - 'max_poll_interval_ms': 300000,
        - 'session_timeout_ms': 10000,
        - 'heartbeat_interval_ms': 3000,
        - 'receive_buffer_bytes': None,
        - 'send_buffer_bytes': None,
        - 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
        - 'consumer_timeout_ms': float('inf'),
        - 'skip_double_compressed_messages': False,
        - 'security_protocol': 'PLAINTEXT',
        - 'ssl_context': None,
        - 'ssl_check_hostname': True,
        - 'ssl_cafile': None,
        - 'ssl_certfile': None,
        - 'ssl_keyfile': None,
        - 'ssl_crlfile': None,
        - 'ssl_password': None,
        - 'api_version': None,
        - 'api_version_auto_timeout_ms': 2000,
        - 'connections_max_idle_ms': 9 * 60 * 1000,
        - 'metric_reporters': [],
        - 'metrics_num_samples': 2,
        - 'metrics_sample_window_ms': 30000,
        - 'metric_group_prefix': 'consumer',
        - 'selector': selectors.DefaultSelector,
        - 'exclude_internal_topics': True,
        - 'sasl_mechanism': None,
        - 'sasl_plain_username': None,
        - 'sasl_plain_password': None,
        - 'sasl_kerberos_service_name': 'kafka'
        """
        for key, value in kwargs.items():
            self._consumer_config[key] = value

            # disable default ssl context if user configures a ssl setting
            if key.startswith('ssl') and key != 'ssl_context':
                self._consumer_config['ssl_context'] = None

    def configuration_get(self):
        """
        Shows the current setting of the Kafka interface. For available keys check keyword _Configuration Set_
        """
        return self._consumer_config

    def _add_record(self, record, **kwargs):
        """
        Listener callback: appends *record* to the internal record list if its
        timestamp passes the 'timestamp_from' filter (0 disables filtering).
        """
        try:
            if kwargs.get('timestamp_from') == 0 or kwargs.get('timestamp_from') <= record.timestamp:

                entry = {
                    'topic': record.topic,
                    'timestamp': datetime.fromtimestamp(int(record.timestamp) / 1e3).strftime('%Y-%m-%d %H:%M:%S.%f'),
                    'timestamp_sec': record.timestamp,  # NOTE(review): value is milliseconds despite the key name
                    'message': record.value
                }

                # avoid duplicate entries when several listeners deliver the
                # same record
                if entry not in self._record_list:
                    self._record_list.append(entry)

        except Exception as e:
            self._log.error(e)

    def _create_consumer(self):
        # each listener gets its own consumer built from the current config
        return kafka.KafkaConsumer(**self._consumer_config)