move the library to ONF
Change-Id: I383437e2934ce04cc1a7dc332134f7308991776f
diff --git a/grpc_robot/grpc_robot.py b/grpc_robot/grpc_robot.py
new file mode 100644
index 0000000..2de6192
--- /dev/null
+++ b/grpc_robot/grpc_robot.py
@@ -0,0 +1,349 @@
+# Copyright 2020-present Open Networking Foundation
+# Original copyright 2020-present ADTRAN, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+import os
+import grpc
+import json
+import getpass
+import inspect
+import logging
+import logging.config
+import tempfile
+import textwrap
+import importlib
+import pkg_resources
+
+from .tools.protop import ProtoBufParser
+from distutils.version import StrictVersion
+from robot.api.deco import keyword
+from robot.libraries.BuiltIn import BuiltIn, RobotNotRunningError
+
+
+def _package_version_get(package_name, source=None):
+ """
+ Returns the installed version number for the given pip package with name _package_name_.
+ """
+
+ if source:
+ head, tail = os.path.split(os.path.dirname(os.path.abspath(source)))
+
+ while tail:
+ try:
+ with open(os.path.join(head, 'VERSION')) as version_file:
+ return version_file.read().strip()
+ except Exception:
+ head, tail = os.path.split(head)
+
+ try:
+ return pkg_resources.get_distribution(package_name).version
+ except pkg_resources.DistributionNotFound:
+ raise NameError("Package '%s' is not installed!" % package_name)
+
+
+class GrpcRobot(object):
+
+ device = None
+ package_name = ''
+ installed_package = None
+
+ try:
+ ROBOT_LIBRARY_VERSION = _package_version_get('grpc_robot')
+ except NameError:
+ ROBOT_LIBRARY_VERSION = 'unknown'
+
+ ROBOT_LIBRARY_SCOPE = 'TEST_SUITE'
+ global_init = 0
+ global_timeout = 120
+ min_robot_version = 30202
+
+ connection_type = 'grpc'
+
+ def __init__(self, **kwargs):
+ super().__init__()
+
+ self._host = None
+ self._port = None
+
+ self.grpc_channel = None
+ self.timeout = 30
+ self.protobuf = None
+
+ self.keywords = {}
+
+ self.enable_logging()
+ self.logger = logging.getLogger('grpc')
+
+ self.pb_version = self.get_installed_version() or self.get_latest_pb_version()
+ self.load_services(self.pb_version)
+
+ @staticmethod
+ def enable_logging():
+
+ try:
+ log_dir = BuiltIn().replace_variables('${OUTPUT_DIR}')
+ except RobotNotRunningError:
+ log_dir = tempfile.gettempdir()
+
+ try:
+ logfile_name = os.path.join(log_dir, 'grpc_robot_%s.log' % getpass.getuser())
+ except KeyError:
+ logfile_name = os.path.join(log_dir, 'grpc_robot.log')
+
+ logging.config.dictConfig({
+ 'version': 1,
+ 'disable_existing_loggers': False,
+
+ 'formatters': {
+ 'standard': {
+ 'format': '%(asctime)s %(name)s [%(levelname)s] : %(message)s'
+ },
+ },
+ 'handlers': {
+ 'file': {
+ 'level': 'DEBUG',
+ 'class': 'logging.FileHandler',
+ 'mode': 'a',
+ 'filename': logfile_name,
+ 'formatter': 'standard'
+ },
+ },
+ 'loggers': {
+ 'grpc': {
+ 'handlers': ['file'],
+ 'level': 'DEBUG',
+ 'propagate': True
+ },
+ }
+ })
+
+ @staticmethod
+ def get_modules(*modules):
+ module_list = []
+ for module in modules:
+ for name, obj in inspect.getmembers(module, predicate=lambda o: inspect.isclass(o)):
+ module_list.append(obj)
+
+ return module_list
+
+ @staticmethod
+ def get_keywords_from_modules(*modules):
+ keywords = {}
+
+ for module in modules:
+ for name, obj in inspect.getmembers(module):
+ if hasattr(obj, 'robot_name'):
+ keywords[name] = module
+
+ return keywords
+
+ def get_installed_version(self):
+ dists = [str(d).split() for d in pkg_resources.working_set if str(d).split()[0] == self.package_name]
+
+ try:
+ pb_version = dists[0][-1]
+ self.logger.info('installed package %s==%s' % (self.package_name, pb_version))
+ except IndexError:
+ self.logger.error('package for %s not installed' % self.package_name)
+ return None
+
+ if pb_version not in self.get_supported_versions():
+ self.logger.warning('installed package %s==%s not supported by library, using version %s' % (
+ self.package_name, pb_version, self.get_latest_pb_version()))
+ pb_version = None
+
+ return pb_version
+
+ def get_supported_versions(self):
+
+ path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'services', self.device)
+
+ return sorted([
+ (name.split(self.device + '_')[1]).replace('_', '.')
+ for name in os.listdir(path)
+ if os.path.isdir(os.path.join(path, name)) and name.startswith(self.device)
+ ], key=StrictVersion)
+
+ def get_latest_pb_version(self):
+ return self.get_supported_versions()[-1]
+
+ def load_services(self, pb_version):
+ pb_version = pb_version.replace('.', '_')
+ path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'services', self.device, '%s_%s' % (self.device, pb_version))
+
+ modules = importlib.import_module('grpc_robot.services.%s.%s_%s' % (self.device, self.device, pb_version))
+
+ module_list = self.get_modules(modules)
+
+ self.keywords = self.get_keywords_from_modules(*module_list)
+
+ try:
+ self.protobuf = json.loads(open(os.path.join(path, '%s.json' % self.device)).read())
+ self.logger.debug('loaded services from %s' % os.path.join(path, '%s.json' % self.device))
+ except FileNotFoundError:
+ pip_dir = os.path.join(os.path.dirname(self.installed_package.__file__), 'protos')
+ self.protobuf = ProtoBufParser(self.device, self.pb_version, pip_dir).parse_files()
+
+ @keyword
+ def get_keyword_names(self):
+ """
+ Returns the list of keyword names
+ """
+ return sorted(list(self.keywords.keys()) + [name for name in dir(self) if hasattr(getattr(self, name), 'robot_name')])
+
+ def run_keyword(self, keyword_name, args, kwargs):
+ """
+ http://robotframework.org/robotframework/latest/RobotFrameworkUserGuide.html#running-keywords
+
+ :param keyword_name: name of method to run
+ :param args: arguments to this method
+ :param kwargs: kwargs
+ :return: whatever the method returns
+ """
+ if keyword_name in self.keywords:
+ c = self.keywords[keyword_name](self)
+ else:
+ c = self
+
+ return getattr(c, keyword_name)(*args, **kwargs)
+
+ def get_keyword_arguments(self, keyword_name):
+ """
+ http://robotframework.org/robotframework/latest/RobotFrameworkUserGuide.html#getting-keyword-arguments
+
+ :param keyword_name: name of method
+ :return: list of method arguments like in urls above
+ """
+
+ if keyword_name in self.keywords:
+ a = inspect.getargspec(getattr(self.keywords[keyword_name], keyword_name))
+ else:
+ a = inspect.getargspec(getattr(self, keyword_name))
+
+ # skip "self" as first parameter -> [1:]
+ args_without_defaults = a.args[1:-len(a.defaults)] if a.defaults is not None else a.args[1:]
+
+ args_with_defaults = []
+ if a.defaults is not None:
+ args_with_defaults = zip(a.args[-len(a.defaults):], a.defaults)
+ args_with_defaults = ['%s=%s' % (x, y) for x, y in args_with_defaults]
+
+ args = args_without_defaults + args_with_defaults
+
+ if a.varargs is not None:
+ args.append('*%s' % a.varargs)
+
+ if a.keywords is not None:
+ args.append('**%s' % a.keywords)
+
+ return args
+
+ def get_keyword_documentation(self, keyword_name):
+ """
+ http://robotframework.org/robotframework/latest/RobotFrameworkUserGuide.html#getting-keyword-documentation
+ http://robotframework.org/robotframework/latest/RobotFrameworkUserGuide.html#documentation-formatting
+
+ :param keyword_name: name of method to get documentation for
+ :return: string formatted according documentation
+ """
+
+ if keyword_name == '__intro__':
+ return self.__doc__
+
+ doc_string = ''
+
+ if keyword_name in self.keywords:
+ c = self.keywords[keyword_name]
+ doc_string += textwrap.dedent(getattr(c, keyword_name).__doc__ or '') + '\n'
+ doc_string += c(self).get_documentation(keyword_name) # instanciate class and call "get_documentation"
+ return doc_string
+
+ return textwrap.dedent(getattr(self, keyword_name).__doc__ or '')
+
+ @keyword
+ def connection_open(self, host, port, **kwargs):
+ """
+ Opens a connection to the gRPC host.
+
+ *Parameters*:
+ - host: <string>|<IP address>; Name or IP address of the gRPC host.
+ - port: <number>; TCP port of the gRPC host.
+
+ *Named Parameters*:
+ - timeout: <number>; Timeout in seconds for a gRPC response. Default: 30 s
+ """
+ self._host = host
+ self._port = port
+ self.timeout = int(kwargs.get('timeout', self.timeout))
+
+ channel_options = [
+ ('grpc.keepalive_time_ms', 10000),
+ ('grpc.keepalive_timeout_ms', 5000)
+ ]
+
+ if kwargs.get('insecure', True):
+ self.grpc_channel = grpc.insecure_channel('%s:%s' % (self._host, self._port), options=channel_options)
+ else:
+ raise NotImplementedError('other than "insecure channel" not implemented')
+
+ user_pb_version = kwargs.get('pb_version') or self.pb_version
+ pb_version = user_pb_version # ToDo: or device_pb_version # get the pb version from device when available
+
+ self.load_services(pb_version)
+
+ @keyword
+ def connection_close(self):
+ """
+ Closes the connection to the gRPC host.
+ """
+ del self.grpc_channel
+ self.grpc_channel = None
+
+ def _connection_parameters_get(self):
+ return {
+ 'timeout': self.timeout
+ }
+
+ @keyword
+ def connection_parameters_set(self, **kwargs):
+ """
+ Sets the gRPC channel connection parameters.
+
+ *Named Parameters*:
+ - timeout: <number>; Timeout in seconds for a gRPC response.
+
+ *Return*: Same dictionary as the keyword _Connection Parameter Get_ with the values before they got changed.
+ """
+ connection_parameters = self._connection_parameters_get()
+
+ self.timeout = int(kwargs.get('timeout', self.timeout))
+
+ return connection_parameters
+
+ @keyword
+ def connection_parameters_get(self):
+ """
+ Retrieves the connection parameters for the gRPC channel.
+
+ *Return*: A dictionary with the keys:
+ - timeout
+ """
+ return self._connection_parameters_get()
+
+ @keyword
+ def library_version_get(self):
+ """
+ Retrieve the version of the currently running library instance.
+
+ *Return*: version string consisting of three dot-separated numbers (x.y.z)
+ """
+ return self.ROBOT_LIBRARY_VERSION