Move the library to ONF

Change-Id: I633991669051555f195b51021fb2ef697c51be39
diff --git a/kafka_robot/kafka_client.py b/kafka_robot/kafka_client.py
new file mode 100644
index 0000000..6427730
--- /dev/null
+++ b/kafka_robot/kafka_client.py
@@ -0,0 +1,545 @@
+# Copyright 2020-present Open Networking Foundation
+# Original copyright 2020-present ADTRAN, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import ssl
+import six
+import json
+import time
+import uuid
+import kafka
+import getpass
+import logging
+import logging.config
+import tempfile
+import threading
+import pkg_resources
+
+from datetime import datetime
+import robot
+from robot.libraries.BuiltIn import BuiltIn, RobotNotRunningError
+from robot.utils import robottime
+from robot.api.deco import keyword
+
+
+def _package_version_get(package_name, source=None):
+    """
+    Returns the installed version number for the given pip package with name _package_name_.
+    """
+
+    if source:
+        head, tail = os.path.split(os.path.dirname(os.path.abspath(source)))
+
+        while tail:
+            try:
+                with open(os.path.join(head, 'VERSION')) as version_file:
+                    return version_file.read().strip()
+            except Exception:
+                head, tail = os.path.split(head)
+
+    try:
+        return pkg_resources.get_distribution(package_name).version
+    except pkg_resources.DistributionNotFound:
+        raise NameError("Package '%s' is not installed!" % package_name)
+
+
class FatalError(RuntimeError):
    # Robot Framework checks this attribute on raised exceptions: when True,
    # the whole test execution is aborted, not only the current test case.
    ROBOT_EXIT_ON_FAILURE = True
+
+
class KafkaRecord(object):
    """
    Read-only view onto a record returned by the Kafka consumer, exposing
    only the fields used by this library.
    """

    def __init__(self, customer_record):
        # keep a reference to the consumer record and delegate all reads to it
        self._record = customer_record

    @property
    def topic(self):
        """Topic the record was received on."""
        return self._record.topic

    @property
    def value(self):
        """Raw message payload of the record."""
        return self._record.value

    @property
    def key(self):
        """Key of the record."""
        return self._record.key

    @property
    def offset(self):
        """Offset of the record within its partition."""
        return self._record.offset

    @property
    def timestamp(self):
        """Timestamp carried by the record."""
        return self._record.timestamp
+
+
class KafkaListener(threading.Thread):
    """
    Internal helper class used by this client: a daemon thread that subscribes
    a Kafka consumer to one topic and forwards every received record to a
    callback function. Errors raised while closing the consumer are collected
    in the _error_ list instead of being raised.
    """

    def __init__(self, topic_name, callback, consumer, **kwargs):
        """
        - topic_name: The Kafka topic to be listened to
        - callback: The function to be executed when a Kafka message arrives
        - consumer: The Kafka consumer object of type kafka.KafkaConsumer()
        - **kwargs: The keyword arguments:
            - timestamp_from: To be used in the callback function, description in KafkaClient.subscribe()
        """
        super(KafkaListener, self).__init__()

        # Thread.setDaemon() is deprecated (removed in Python 3.13); assign the
        # attribute instead. Daemon thread: must not keep the process alive.
        self.daemon = True

        self._topic = topic_name
        self._cb_func = callback
        self._consumer = consumer
        self._kwargs = kwargs
        # unique id used by KafkaClient to address this subscription
        # (uuid4().hex == str(uuid4()) without the dashes)
        self.id = uuid.uuid4().hex

        # error messages collected while closing the consumer
        self.error = []

    def run(self):
        try:
            self._consumer.subscribe([self._topic])
            for record in self._consumer:
                try:
                    self._cb_func(KafkaRecord(record), **self._kwargs)
                except Exception as e:
                    logging.error('Error while delivering message: %s' % e)
                    self.stop()
        except Exception as e:
            # Iteration is expected to raise once stop() closes the consumer.
            # Keep the best-effort semantics but leave a trace instead of
            # swallowing the exception silently.
            logging.debug('Kafka listener for topic %s terminated: %s' % (self._topic, e))

    def stop(self):
        # Close the consumer; closing makes the iteration in run() terminate.
        if self._consumer is not None:
            self._close_consumer()

    def _close_consumer(self):
        try:
            # autocommit=False: do not commit offsets on shutdown
            self._consumer.close(autocommit=False)
        except Exception as e:
            self.error.append(str(e))
+
+
class KafkaClient(object):
    """
    Robot Framework keyword library that subscribes to Kafka topics and
    collects the received records until unsubscribed.
    """

    ROBOT_LIBRARY_VERSION = '0.0'
    ROBOT_LIBRARY_SCOPE = 'TEST SUITE'

    global_init = 0
    global_timeout = 120
    # minimum supported Robot Framework version, encoded as major*10000 + minor*100 + patch
    min_robot_version = 30202

    try:
        ROBOT_LIBRARY_VERSION = _package_version_get('kafka_robot')
    except NameError:
        ROBOT_LIBRARY_VERSION = 'unknown'

    def __init__(self, **kwargs):
        """
        Constructor

        *Named parameters*:
        - root_logging: <bool>; Keep the root logger configuration enabled. Default: False.
        - log_level: <string>; Log level used for the library log file. Default: 'INFO'.
        """
        super(KafkaClient, self).__init__()
        self._record_list = []  # records collected by the listeners, see _add_record()
        self._listeners = {}    # subscription id -> KafkaListener

        self._host = None
        self._port = None
        self._cert_path = None
        self._consumer = None
        # Take a copy: assigning kafka.KafkaConsumer.DEFAULT_CONFIG directly would
        # alias the shared class-level dict, so every configuration_set() call
        # would silently change the defaults of all consumers in this process.
        self._consumer_config = dict(kafka.KafkaConsumer.DEFAULT_CONFIG)
        self._topic_name = None

        self._enable_logging(kwargs.get('root_logging', False), kwargs.get('log_level', 'INFO'))
        logging.getLogger('kafka').propagate = False
        self._log = logging.getLogger('kafka.conn')

        # set default ssl context to accept any (self signed) certificate
        ssl_ctx = ssl.create_default_context()
        ssl_ctx.check_hostname = False
        ssl_ctx.verify_mode = ssl.CERT_NONE

        self._consumer_config['ssl_context'] = ssl_ctx

    def _check_robot_version(self, min_robot_version):
        """
        Verifies that the running Robot Framework is at least the required version.

        *Parameter* :
            - min_robot_version : <string>|<number> ; Encoded version, e.g. 30202 for 3.2.2

        *Raises* : FatalError if the running Robot Framework version is too old
        """

        if self._get_robot_version() < int(min_robot_version):
            raise FatalError('wrong robot version: %s' % robot.get_version())

    @staticmethod
    def _get_robot_version():
        """
        Returns the running Robot Framework version encoded as a number
        (major*10000 + minor*100 + patch), or 0 if the version string does
        not have the form x.y or x.y.z.
        """
        version = robot.get_version().split('.')
        if len(version) == 2:
            return int(version[0]) * 10000 + int(version[1]) * 100
        elif len(version) == 3:
            return int(version[0]) * 10000 + \
                   int(version[1]) * 100 + int(version[2])
        else:
            return 0

    @staticmethod
    def _robot_bool_convert(robot_bool):
        """
        Converts unicode to boolean or returns unchanged input variable if it is
        boolean already. Only the case-insensitive string "true" converts to
        True; every other value converts to False.

        :param robot_bool: value to be converted
        :return: Input param converted to boolean
        """
        real_bool = robot_bool
        if not isinstance(robot_bool, bool):
            robot_bool = str(robot_bool)
            if robot_bool.lower() == "true":
                real_bool = True
            else:
                real_bool = False
        return real_bool

    @keyword
    def library_version_get(self):
        """
        Retrieve the version of the currently running library instance.

        *Returns*: version string consisting of three dot-separated numbers (x.y.z)
        """
        return self.ROBOT_LIBRARY_VERSION

    def __del__(self):
        # best effort: stop all running listeners when the instance is collected
        if self._host is not None:
            self.connection_close()

    @staticmethod
    def _enable_logging(root_logging, log_level):
        # Configures a dedicated file handler for the 'kafka' logger. The file is
        # written to the Robot Framework output directory when running under
        # Robot, otherwise to the system temp directory.

        try:
            log_dir = BuiltIn().replace_variables('${OUTPUT_DIR}')
        except RobotNotRunningError:
            log_dir = tempfile.gettempdir()

        try:
            logfile_name = os.path.join(log_dir, 'kafka_robot_%s.log' % getpass.getuser())
        except KeyError:
            # getpass.getuser() raises KeyError when no login name can be determined
            logfile_name = os.path.join(log_dir, 'kafka_robot.log')

        logging_dict = {
            'version': 1,
            'disable_existing_loggers': False,

            'formatters': {
                'standard': {
                    'format': '%(asctime)s  %(name)s [%(levelname)s] : %(message)s'
                }
            },
            'handlers': {
                'kafka_file': {
                    'level': log_level.upper(),
                    'class': 'logging.FileHandler',
                    'mode': 'w',
                    'filename': logfile_name,
                    'formatter': 'standard'
                }
            },
            'loggers': {
                'kafka': {
                    'handlers': ['kafka_file'],
                    'level': log_level.upper(),
                    'propagate': False
                }
            }
        }
        if not root_logging:
            logging_dict['loggers'].update({'': {'level': 'NOTSET'}})
        logging.config.dictConfig(logging_dict)

    def connection_open(self, host, port='9093', topic_name='', **kwargs):
        """
        Opens a connection to the Kafka host.

        *Parameters*:
        - host: <string>|<IP address>; Name or IP address of the Kafka host.
        - port: <number>; TCP port of the Kafka host. Default: 9093.
        - topic_name: <string>; The name of the topic to listen on; optional. If not set, the keyword _subscribe_ can be used to set the subscription.

        *Named parameters*:
        - timestamp_from: <time string>, 0: Timestamp string format YYYY-MM-DD hh:mm:ss
            (e.g. 2021-01-31 23:34:45) to define the time from which kafka records timestamps are collected in the
            record list. If 0 then the filtering is switched off and received Kafka records are collected. If
            _timestamp_from_ is not set then the library will use the current time for the filter.

        To set connection parameters such as certificates or authentication methods use _configuration_set_, e.g. for configuring ssl use

        | Configuration Set | ssl_cafile=nms_cachain.crt | ssl_certfile=nms_suite.crt | ssl_keyfile=nms_suite.key | ssl_check_hostname=${False} |

        *Return*: The ID of the subscription, if the parameter _topic_name_ is set. This ID can be used to unsubscribe.
        """
        self._host = host
        self._port = port

        self.configuration_set(bootstrap_servers='%s:%s' % (self._host, int(self._port)))

        if topic_name:
            return self.subscribe(topic_name=topic_name, **kwargs)

    def connection_close(self):
        """
        Closes the connection to the Kafka host. Stops all running listeners of
        Kafka subscriptions and discards them.
        """
        for listener in self._listeners.values():
            listener.stop()
        # stopped listeners cannot be reused; drop them so a later unsubscribe
        # does not try to close an already closed consumer again
        self._listeners = {}

        self._host = None
        self._port = None
        self._cert_path = None

    @staticmethod
    def _get_timestamp(timestamp):
        # Normalizes the 'timestamp_from' input to milliseconds since epoch:
        # accepts a 'YYYY-MM-DD hh:mm:ss' string, a datetime object, a numeric
        # value (already in milliseconds) or None/'' (meaning "now").

        if timestamp is None or timestamp == '':
            timestamp = datetime.now()

        try:
            timestamp = int(time.mktime(datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S').timetuple()) * 1000)
        except TypeError:
            # not a string: try datetime-like objects first, then plain numbers
            try:
                timestamp = int(time.mktime(timestamp.timetuple()) * 1000)
            except AttributeError:
                timestamp = int(timestamp)
        except ValueError:
            # string, but not in the expected date format: accept numeric strings
            try:
                timestamp = int(timestamp)
            except ValueError:
                raise ValueError('"%s" is not a valid input: should be of format "YYYY-MM-DD hh:mm:ss"' % timestamp)

        return timestamp

    def subscribe(self, topic_name, **kwargs):
        """
        Subscribe for an external topic and starts a listener on this topic. All records received in this listener are
        stored in a record list, which can be retrieved with keyword _records_get_. Records are recorded until it is
        unsubscribed from the topic.

        *Parameters*:
        - topic_name: <string>; The name of the topic to listen on.

        *Named parameters*:
        - timestamp_from: <time string>, 0: Timestamp string format YYYY-MM-DD hh:mm:ss
            (e.g. 2021-01-31 23:34:45) to define the time from which kafka records timestamps are collected in the
            record list. If 0 then the filtering is switched off and received Kafka records are collected. If
            _timestamp_from_ is not set then the library will use the current time for the filter.

        *Return*: The ID of the subscription. This ID can be used to unsubscribe.
        """
        kwargs['timestamp_from'] = self._get_timestamp(kwargs.get('timestamp_from'))

        consumer = self._create_consumer()
        listener = KafkaListener(topic_name=topic_name, callback=self._add_record, consumer=consumer, **kwargs)  # type: KafkaListener
        self._listeners[listener.id] = listener
        listener.start()
        return listener.id

    def unsubscribe(self, subscription_id):
        """
        Stops the listener and unsubscribes from an external topic.

        *Parameters*:
         - subscription_id: <string>; Subscription ID got from keyword _subscribe_.

        *Raises*: Exception with all errors collected while closing the consumer.
        """
        listener = self._listeners.get(subscription_id)  # type: KafkaListener
        if listener is not None:
            listener.stop()
            del self._listeners[subscription_id]

            if listener.error:
                raise Exception(', '.join(listener.error))

    def _records_wait_for(self, topic_name, number_of_records, wait_time='0', require_number_of_records='true'):
        """
        A lock function that waits for the specified _number_of_records_ on a topic or until the _wait_time_
        is expired. The keyword subscribes for the Kafka topic. So it is not necessary to subscribe with the
        _Subscribe_ keyword before.

        *Parameter*:
        - topic_name: <string>; The name of the topic to listen on.
        - number_of_records: <number>; Number of records to be waited for.
        - wait_time: <string>; Time to wait for the records. Default: _0_. If _0_ then the keyword waits forever for
                the number of records, if not _0_ then it waits at maximum _wait_time_.
        - require_number_of_records: _false_, _true_; Whether or not to check if list to return contains number of
                records. If not an AssertionError is raised.

        *Return*: A list of dictionaries. Each dictionary represents a record and consists of keys:
        - topic: Name of the topic
        - timestamp: Number of seconds since Jan 01 1970 (UTC)
        - message: Message content as dictionary
        """
        return_list = []

        # number_of_records usually arrives from Robot Framework as a string;
        # convert once and use the numeric value throughout
        expected_records = int(number_of_records)

        consumer = self._create_consumer()
        consumer.subscribe(topics=[topic_name])
        records = consumer.poll(timeout_ms=1000 * robottime.timestr_to_secs(wait_time), max_records=expected_records)
        consumer.unsubscribe()
        consumer.close(autocommit=False)

        for _, v in records.items():
            return_list.extend(v)

        if self._robot_bool_convert(require_number_of_records):
            # compare against the numeric value: a string number_of_records would
            # never be equal to the integer list length
            if len(return_list) != expected_records:
                raise AssertionError('returned list does not contain expected number of records')

        return [json.loads(r.value) for r in return_list]

    def records_get(self, topic_name='all'):
        """
        Retrieves the list of records for subscribed topics.

        *Parameters*:
        - topic_name: <string>, _all_; The name of the topic to retrieve from record list. Default: _all_.

        *Return*: A list of dictionaries. Each dictionary represents a record and consists of keys:
        - topic: Name of the topic
        - timestamp: Number of milliseconds since Jan 01 1970, midnight (UTC) as string format "yyyy-mm-dd hh:MM:ss.ffffff"
        - message: Message content as dictionary
        """
        if not topic_name or topic_name == 'all':
            return self._record_list
        else:
            return [r for r in self._record_list if r.get('topic') == topic_name]

    def records_clear(self, topic_name='all'):
        """
        Clears the list of records for subscribed topics.

        *Parameters*:
        - topic_name: <string>, _all_; The name of the topic to remove from record list. Default: _all_.
        """
        if not topic_name or topic_name == 'all':
            self._record_list = []
        else:
            self._record_list = [r for r in self._record_list if r.get('topic') != topic_name]

    def configuration_set(self, **kwargs):
        """
        Available setting with example values. To get actual used setting use _Configuration Get_.
        Values must be supplied using there correct type (e.g. int, str, bool)

        - 'bootstrap_servers': 'localhost',
        - 'client_id': 'kafka-python-' + __version__,
        - 'group_id': None,
        - 'key_deserializer': None,
        - 'value_deserializer': None,
        - 'fetch_max_wait_ms': 500,
        - 'fetch_min_bytes': 1,
        - 'fetch_max_bytes': 52428800,
        - 'max_partition_fetch_bytes': 1 * 1024 * 1024,
        - 'request_timeout_ms': 305000, # chosen to be higher than the default of max_poll_interval_ms
        - 'retry_backoff_ms': 100,
        - 'reconnect_backoff_ms': 50,
        - 'reconnect_backoff_max_ms': 1000,
        - 'max_in_flight_requests_per_connection': 5,
        - 'auto_offset_reset': 'latest',
        - 'enable_auto_commit': True,
        - 'auto_commit_interval_ms': 5000,
        - 'default_offset_commit_callback': lambda offsets, response: True,
        - 'check_crcs': True,
        - 'metadata_max_age_ms': 5 * 60 * 1000,
        - 'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
        - 'max_poll_records': 500,
        - 'max_poll_interval_ms': 300000,
        - 'session_timeout_ms': 10000,
        - 'heartbeat_interval_ms': 3000,
        - 'receive_buffer_bytes': None,
        - 'send_buffer_bytes': None,
        - 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
        - 'consumer_timeout_ms': float('inf'),
        - 'skip_double_compressed_messages': False,
        - 'security_protocol': 'PLAINTEXT',
        - 'ssl_context': None,
        - 'ssl_check_hostname': True,
        - 'ssl_cafile': None,
        - 'ssl_certfile': None,
        - 'ssl_keyfile': None,
        - 'ssl_crlfile': None,
        - 'ssl_password': None,
        - 'api_version': None,
        - 'api_version_auto_timeout_ms': 2000,
        - 'connections_max_idle_ms': 9 * 60 * 1000,
        - 'metric_reporters': [],
        - 'metrics_num_samples': 2,
        - 'metrics_sample_window_ms': 30000,
        - 'metric_group_prefix': 'consumer',
        - 'selector': selectors.DefaultSelector,
        - 'exclude_internal_topics': True,
        - 'sasl_mechanism': None,
        - 'sasl_plain_username': None,
        - 'sasl_plain_password': None,
        - 'sasl_kerberos_service_name': 'kafka'
        """

        for key, value in kwargs.items():
            self._consumer_config[key] = value

            # disable default ssl context if user configures a ssl setting
            if key.startswith('ssl') and key != 'ssl_context':
                self._consumer_config['ssl_context'] = None

    def configuration_get(self):
        """
        Shows the current setting of the Kafka interface. For available keys check keyword _Configuration Set_
        """
        return self._consumer_config

    def _add_record(self, record, **kwargs):
        # Callback invoked by the KafkaListener thread for every received record.
        # Converts the record into a dictionary and appends it to the record
        # list, unless the record is older than the 'timestamp_from' filter
        # (a filter value of 0 disables the filtering).
        try:

            if kwargs.get('timestamp_from') == 0 or kwargs.get('timestamp_from') <= record.timestamp:

                entry = {
                    'topic': record.topic,
                    # record.timestamp is in milliseconds since epoch, hence / 1e3
                    'timestamp': datetime.fromtimestamp(int(record.timestamp) / 1e3).strftime('%Y-%m-%d %H:%M:%S.%f'),
                    # NOTE(review): despite the name, this holds the raw record
                    # timestamp (milliseconds) — confirm before renaming the key
                    'timestamp_sec': record.timestamp,
                    'message': record.value
                }

                # suppress exact duplicates
                if entry not in self._record_list:
                    self._record_list.append(entry)

        except Exception as e:
            self._log.error(e)

    def _create_consumer(self):
        # One consumer is created per subscription with the current configuration.
        return kafka.KafkaConsumer(**self._consumer_config)