# Copyright 2020-present Open Networking Foundation
# Original copyright 2020-present ADTRAN, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import ssl
import six
import json
import time
import uuid
import kafka
import getpass
import logging
import logging.config
import tempfile
import threading
import pkg_resources
from datetime import datetime
import robot
from robot.libraries.BuiltIn import BuiltIn, RobotNotRunningError
from robot.utils import robottime
from robot.api.deco import keyword
def _package_version_get(package_name, source=None):
"""
Returns the installed version number for the given pip package with name _package_name_.
"""
if source:
head, tail = os.path.split(os.path.dirname(os.path.abspath(source)))
while tail:
try:
with open(os.path.join(head, 'VERSION')) as version_file:
return version_file.read().strip()
except Exception:
head, tail = os.path.split(head)
try:
return pkg_resources.get_distribution(package_name).version
except pkg_resources.DistributionNotFound:
raise NameError("Package '%s' is not installed!" % package_name)
class FatalError(RuntimeError):
ROBOT_EXIT_ON_FAILURE = True
class KafkaRecord(object):
"""
Wrapper for Kafka message
"""
    def __init__(self, consumer_record):
        self._org_record = consumer_record
@property
def topic(self):
return self._org_record.topic
@property
def value(self):
return self._org_record.value
@property
def key(self):
return self._org_record.key
@property
def offset(self):
return self._org_record.offset
@property
def timestamp(self):
return self._org_record.timestamp
class KafkaListener(threading.Thread):
"""
Internal helper class used by this client.
"""
def __init__(self, topic_name, callback, consumer, **kwargs):
"""
- topic_name: The Kafka topic to be listened to
- callback: The function to be executed when Kafka message arrives
- consumer: The Kafka consumer object of type kafka.KafkaConsumer()
- **kwargs: The keyword arguments:
- timestamp_from: To be used in the callback function, description in KafkaClient.subscribe()
"""
super(KafkaListener, self).__init__()
        self.daemon = True
self._topic = topic_name
self._cb_func = callback
self._consumer = consumer
self._kwargs = kwargs
self.id = str(uuid.uuid4()).replace('-', '')
self.error = []
def run(self):
try:
self._consumer.subscribe([self._topic])
for record in self._consumer:
try:
self._cb_func(KafkaRecord(record), **self._kwargs)
except Exception as e:
                    logging.error('Error while delivering message: %s' % e)
self.stop()
        except Exception:
            # the consumer iterator raises once the consumer is closed via stop(); treat it as normal shutdown
            pass
def stop(self):
if self._consumer is not None:
self._close_consumer()
def _close_consumer(self):
try:
self._consumer.close(autocommit=False)
except Exception as e:
self.error.append(str(e))
class KafkaClient(object):
""" Constants """
ROBOT_LIBRARY_VERSION = '0.0'
ROBOT_LIBRARY_SCOPE = 'TEST SUITE'
global_init = 0
global_timeout = 120
min_robot_version = 30202
try:
ROBOT_LIBRARY_VERSION = _package_version_get('kafka_robot')
except NameError:
ROBOT_LIBRARY_VERSION = 'unknown'
def __init__(self, **kwargs):
"""
Constructor
"""
super(KafkaClient, self).__init__()
self._record_list = []
self._listeners = {}
self._host = None
self._port = None
self._cert_path = None
self._consumer = None
self._consumer_config = kafka.KafkaConsumer.DEFAULT_CONFIG
self._topic_name = None
self._enable_logging(kwargs.get('root_logging', False), kwargs.get('log_level', 'INFO'))
logging.getLogger('kafka').propagate = False
self._log = logging.getLogger('kafka.conn')
# set default ssl context to accept any (self signed) certificate
ssl_ctx = ssl.create_default_context()
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
self._consumer_config['ssl_context'] = ssl_ctx
def _check_robot_version(self, min_robot_version):
"""
This method verifies the Min Robot version required to run
*Parameter* :
- min_robot_version : <string> ; Minimum robot version is: 20801
*Return* : None if no errors else error message
"""
if self._get_robot_version() < int(min_robot_version):
raise FatalError('wrong robot version: %s' % robot.get_version())
@staticmethod
def _get_robot_version():
"""
This method gets the Min Robot version required to run
*Parameter* : None
*Return* : None if no errors else error message
"""
version = robot.get_version().split('.')
if len(version) == 2:
return int(version[0]) * 10000 + int(version[1]) * 100
elif len(version) == 3:
return int(version[0]) * 10000 + \
int(version[1]) * 100 + int(version[2])
else:
return 0
@staticmethod
def _robot_bool_convert(robot_bool):
"""
Converts unicode to boolean or returns unchanged input variable if it is
boolean already.
:param robot_bool: value to be converted
:return: Input param converted to boolean
"""
real_bool = robot_bool
if not isinstance(robot_bool, bool):
robot_bool = str(robot_bool)
if robot_bool.lower() == "true":
real_bool = True
else:
real_bool = False
return real_bool
@keyword
def library_version_get(self):
"""
Retrieve the version of the currently running library instance.
*Returns*: version string consisting of three dot-separated numbers (x.y.z)
"""
return self.ROBOT_LIBRARY_VERSION
def __del__(self):
if self._host is not None:
self.connection_close()
@staticmethod
def _enable_logging(root_logging, log_level):
try:
log_dir = BuiltIn().replace_variables('${OUTPUT_DIR}')
except RobotNotRunningError:
log_dir = tempfile.gettempdir()
try:
logfile_name = os.path.join(log_dir, 'kafka_robot_%s.log' % getpass.getuser())
except KeyError:
logfile_name = os.path.join(log_dir, 'kafka_robot.log')
logging_dict = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'standard': {
'format': '%(asctime)s %(name)s [%(levelname)s] : %(message)s'
}
},
'handlers': {
'kafka_file': {
'level': log_level.upper(),
'class': 'logging.FileHandler',
'mode': 'w',
'filename': logfile_name,
'formatter': 'standard'
}
},
'loggers': {
'kafka': {
'handlers': ['kafka_file'],
'level': log_level.upper(),
'propagate': False
}
}
}
if not root_logging:
logging_dict['loggers'].update({'': {'level': 'NOTSET'}})
logging.config.dictConfig(logging_dict)
def connection_open(self, host, port='9093', topic_name='', **kwargs):
"""
Opens a connection to the Kafka host.
*Parameters*:
- host: <string>|<IP address>; Name or IP address of the Kafka host.
- port: <number>; TCP port of the Kafka host. Default: 9093.
- topic_name: <string>; The name of the topic to listen on; optional. If not set, the keyword _subscribe_ can be used to set the subscription.
*Named parameters*:
        - timestamp_from: <time string>, 0; Timestamp string in format YYYY-MM-DD hh:mm:ss
        (e.g. 2021-01-31 23:34:45) defining the time from which Kafka record timestamps are accepted into the
        record list. If 0, filtering is switched off and all received Kafka records are collected. If
        _timestamp_from_ is not set, the library uses the current time for the filter.
To set connection parameters such as certificates or authentication methods use _configuration_set_, e.g. for configuring ssl use
| Configuration Set | ssl_cafile=nms_cachain.crt | ssl_certfile=nms_suite.crt | ssl_keyfile=nms_suite.key | ssl_check_hostname=${False} |
*Return*: The ID of the subscription, if the parameter _topic_name_ is set. This ID can be used to unsubscribe.
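        Example (illustrative; host and topic names are placeholders):
        | ${id} = | Connection Open | kafka-host.example.com | 9093 | topic_name=my.topic | timestamp_from=0 |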
"""
self._host = host
self._port = port
self.configuration_set(bootstrap_servers='%s:%s' % (self._host, int(self._port)))
if topic_name:
return self.subscribe(topic_name=topic_name, **kwargs)
def connection_close(self):
"""
        Closes the connection to the Kafka host. Stops all running listeners on Kafka subscriptions.
"""
for _, l in self._listeners.items():
l.stop()
self._host = None
self._port = None
self._cert_path = None
@staticmethod
def _get_timestamp(timestamp):
        """
        Normalizes _timestamp_ to milliseconds since the epoch: accepts None or ''
        (meaning now), a datetime, a 'YYYY-MM-DD hh:mm:ss' string, or a numeric value
        (returned as int, so 0 stays 0).
        """
        if timestamp is None or timestamp == '':
timestamp = datetime.now()
try:
timestamp = int(time.mktime(datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S').timetuple()) * 1000)
except TypeError:
try:
timestamp = int(time.mktime(timestamp.timetuple()) * 1000)
except AttributeError:
timestamp = int(timestamp)
except ValueError:
try:
timestamp = int(timestamp)
except ValueError:
raise ValueError('"%s" is not a valid input: should be of format "YYYY-MM-DD hh:mm:ss"' % timestamp)
return timestamp
def subscribe(self, topic_name, **kwargs): # type: (str) -> str
"""
        Subscribes to a topic and starts a listener on this topic. All records received by this listener are
        stored in a record list, which can be retrieved with the keyword _records_get_. Records are collected
        until the topic is unsubscribed.
*Parameters*:
- topic_name: <string>; The name of the topic to listen on.
*Named parameters*:
        - timestamp_from: <time string>, 0; Timestamp string in format YYYY-MM-DD hh:mm:ss
        (e.g. 2021-01-31 23:34:45) defining the time from which Kafka record timestamps are accepted into the
        record list. If 0, filtering is switched off and all received Kafka records are collected. If
        _timestamp_from_ is not set, the library uses the current time for the filter.
- human_readable_timestamps:
*Return*: The ID of the subscription. This ID can be used to unsubscribe.
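        Example (illustrative; the topic name is a placeholder):
        | ${id} = | Subscribe | my.topic | timestamp_from=2021-01-31 23:34:45 |
        | Unsubscribe | ${id} |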
"""
kwargs['timestamp_from'] = self._get_timestamp(kwargs.get('timestamp_from'))
consumer = self._create_consumer()
listener = KafkaListener(topic_name=topic_name, callback=self._add_record, consumer=consumer, **kwargs) # type: KafkaListener
self._listeners[listener.id] = listener
listener.start()
return listener.id
def unsubscribe(self, subscription_id):
"""
Stops the listener and unsubscribes from an external topic.
*Parameters*:
- subscription_id: <string>; Subscription ID got from keyword _subscribe_.
"""
listener = self._listeners.get(subscription_id) # type: KafkaListener
if listener is not None:
listener.stop()
del self._listeners[subscription_id]
if listener.error:
raise Exception(', '.join(listener.error))
def _records_wait_for(self, topic_name, number_of_records, wait_time='0', require_number_of_records='true'):
"""
        A blocking function that waits until the specified _number_of_records_ arrived on a topic or until the
        _wait_time_ has expired. The keyword subscribes to the Kafka topic itself, so a preceding call of the
        _Subscribe_ keyword is not necessary.
        *Parameter*:
        - topic_name: <string>; The name of the topic to listen on.
        - number_of_records: <number>; Number of records to be waited for.
        - wait_time: <string>; Time to wait for the records. Default: _0_. If _0_, the keyword waits forever for
        the number of records; otherwise it waits at most _wait_time_.
        - require_number_of_records: _false_, _true_; Whether to check that the returned list contains
        _number_of_records_ records. If it does not, an AssertionError is raised.
        *Return*: A list of the received message contents, each decoded from JSON into a dictionary.
"""
return_list = []
consumer = self._create_consumer()
consumer.subscribe(topics=[topic_name])
records = consumer.poll(timeout_ms=1000 * robottime.timestr_to_secs(wait_time), max_records=int(number_of_records))
consumer.unsubscribe()
consumer.close(autocommit=False)
for _, v in records.items():
return_list.extend(v)
        if self._robot_bool_convert(require_number_of_records):
            if len(return_list) != int(number_of_records):
                raise AssertionError('returned list does not contain expected number of records')
return [json.loads(r.value) for r in return_list]
def records_get(self, topic_name='all'):
"""
Retrieves the list of records for subscribed topics.
*Parameters*:
- topic_name: <string>, _all_; The name of the topic to retrieve from record list. Default: _all_.
*Return*: A list of dictionaries. Each dictionary represents a record and consists of keys:
        - topic: Name of the topic
        - timestamp: Record time as string in format "YYYY-MM-DD hh:mm:ss.ffffff" (local time)
        - timestamp_sec: Raw Kafka record timestamp in milliseconds since Jan 01 1970, midnight (UTC)
        - message: Message content as dictionary
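        Example (illustrative; the topic name is a placeholder):
        | ${records} = | Records Get | my.topic |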
"""
if not topic_name or topic_name == 'all':
return self._record_list
else:
return [r for r in self._record_list if r.get('topic') == topic_name]
def records_clear(self, topic_name='all'):
"""
Clears the list of records for subscribed topics.
*Parameters*:
- topic_name: <string>, _all_; The name of the topic to remove from record list. Default: _all_.
"""
if not topic_name or topic_name == 'all':
self._record_list = []
else:
self._record_list = [r for r in self._record_list if r.get('topic') != topic_name]
def configuration_set(self, **kwargs):
"""
        Available settings with example values. To get the actually used settings use _Configuration Get_.
        Values must be supplied using their correct types (e.g. int, str, bool).
- 'bootstrap_servers': 'localhost',
- 'client_id': 'kafka-python-' + __version__,
- 'group_id': None,
- 'key_deserializer': None,
- 'value_deserializer': None,
- 'fetch_max_wait_ms': 500,
- 'fetch_min_bytes': 1,
- 'fetch_max_bytes': 52428800,
- 'max_partition_fetch_bytes': 1 * 1024 * 1024,
- 'request_timeout_ms': 305000, # chosen to be higher than the default of max_poll_interval_ms
- 'retry_backoff_ms': 100,
- 'reconnect_backoff_ms': 50,
- 'reconnect_backoff_max_ms': 1000,
- 'max_in_flight_requests_per_connection': 5,
- 'auto_offset_reset': 'latest',
- 'enable_auto_commit': True,
- 'auto_commit_interval_ms': 5000,
- 'default_offset_commit_callback': lambda offsets, response: True,
- 'check_crcs': True,
- 'metadata_max_age_ms': 5 * 60 * 1000,
- 'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
- 'max_poll_records': 500,
- 'max_poll_interval_ms': 300000,
- 'session_timeout_ms': 10000,
- 'heartbeat_interval_ms': 3000,
- 'receive_buffer_bytes': None,
- 'send_buffer_bytes': None,
- 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
- 'consumer_timeout_ms': float('inf'),
- 'skip_double_compressed_messages': False,
- 'security_protocol': 'PLAINTEXT',
- 'ssl_context': None,
- 'ssl_check_hostname': True,
- 'ssl_cafile': None,
- 'ssl_certfile': None,
- 'ssl_keyfile': None,
- 'ssl_crlfile': None,
- 'ssl_password': None,
- 'api_version': None,
- 'api_version_auto_timeout_ms': 2000,
- 'connections_max_idle_ms': 9 * 60 * 1000,
- 'metric_reporters': [],
- 'metrics_num_samples': 2,
- 'metrics_sample_window_ms': 30000,
- 'metric_group_prefix': 'consumer',
- 'selector': selectors.DefaultSelector,
- 'exclude_internal_topics': True,
- 'sasl_mechanism': None,
- 'sasl_plain_username': None,
- 'sasl_plain_password': None,
- 'sasl_kerberos_service_name': 'kafka'
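        Example for SASL over SSL (illustrative placeholder values):
        | Configuration Set | security_protocol=SASL_SSL | sasl_mechanism=PLAIN | sasl_plain_username=user | sasl_plain_password=secret |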
"""
for key, value in six.iteritems(kwargs):
self._consumer_config[key] = value
            # disable the default ssl context if the user configures an ssl setting
if key.startswith('ssl') and key != 'ssl_context':
self._consumer_config['ssl_context'] = None
def configuration_get(self):
"""
        Returns the current settings of the Kafka interface. For available keys, check the keyword _Configuration Set_.
"""
return self._consumer_config
def _add_record(self, record, **kwargs):
try:
if kwargs.get('timestamp_from') == 0 or kwargs.get('timestamp_from') <= record.timestamp:
record = {
'topic': record.topic,
'timestamp': datetime.fromtimestamp(int(record.timestamp) / 1e3).strftime('%Y-%m-%d %H:%M:%S.%f'),
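                    # note: despite its name, 'timestamp_sec' holds the raw Kafka timestamp in milliseconds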
'timestamp_sec': record.timestamp,
'message': record.value
}
if record not in self._record_list:
self._record_list.append(record)
except Exception as e:
self._log.error(e)
def _create_consumer(self):
return kafka.KafkaConsumer(**self._consumer_config)
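

if __name__ == '__main__':
    # Minimal usage sketch from plain Python (not part of the Robot keyword API).
    # Assumes a Kafka broker reachable at localhost:9093 and an existing topic
    # named 'test.topic'; both values are placeholders.
    client = KafkaClient()
    subscription_id = client.connection_open('localhost', port='9093', topic_name='test.topic', timestamp_from=0)
    time.sleep(5)  # give the background listener a moment to collect records
    for collected_record in client.records_get(topic_name='test.topic'):
        print(collected_record)
    client.unsubscribe(subscription_id)
    client.connection_close()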