| import os |
| import sys |
| import yaml |
| import requests |
| import default |
| from pykwalify.core import Core as PyKwalify |
| |
# Path of the configuration file loaded when no override is provided
DEFAULT_CONFIG_FILE = "/opt/xos/xos_config.yaml"
# True once Config.init() has successfully loaded a configuration
INITIALIZED = False
# The parsed configuration, replaced by Config.init()
CONFIG = {}


class Config:
    """
    XOS Configuration APIs

    All methods are static and operate on module level state: call
    ``Config.init()`` once to load the configuration file, then read values
    with ``Config.get()``.
    """

    @staticmethod
    def init(config_file=DEFAULT_CONFIG_FILE):
        """
        Load the configuration file and mark the module as initialized.

        The file to load is resolved with increasing precedence from: the
        ``config_file`` argument, the ``XOS-CONFIG`` environment variable and
        the ``-C`` command line parameter.

        :param config_file: path of the configuration file to load
        :raises Exception: if the module is already initialized, or if the
                           file is missing or does not match the schema
        """
        global INITIALIZED
        global CONFIG
        # the config module can be initialized only once
        if INITIALIZED:
            raise Exception('[XOS-Config] Module already initialized')

        # if XOS-CONFIG is defined override the config_file
        # NOTE(review): hyphenated names cannot be exported from most POSIX
        # shells; confirm this is the intended variable name
        env_config_file = os.environ.get('XOS-CONFIG')
        if env_config_file:
            config_file = env_config_file

        # if a -C parameter is set in the cli override the config_file
        # FIXME shouldn't this stay in whatever module calls this one,
        # which would then just pass the file to the init method?
        cli_config_file = Config.get_cli_param(sys.argv)
        if cli_config_file:
            config_file = cli_config_file

        CONFIG = Config.read_config(config_file)
        # flag the module only after a successful load, so that a failed
        # init() can be retried instead of leaving an empty configuration
        INITIALIZED = True

    @staticmethod
    def clear():
        """
        Mark the module as not initialized so that init() can be called again.
        The previously loaded CONFIG is left in place until the next init().
        """
        global INITIALIZED
        INITIALIZED = False

    @staticmethod
    def validate_config_format(config_file):
        """
        Validate the configuration file against the ``config-schema.yaml``
        schema stored next to this module.

        :param config_file: path of the configuration file to validate
        :raises Exception: whatever the validator raises on a mismatch
        """
        module_dir = os.path.dirname(os.path.realpath(__file__))
        schema = os.path.abspath(os.path.join(module_dir, 'config-schema.yaml'))
        c = PyKwalify(source_file=config_file, schema_files=[schema])
        c.validate(raise_exception=True)

    @staticmethod
    def get_cli_param(args):
        """
        Look for a ``-C <value>`` pair in a list of command line arguments.

        :param args: list of command line arguments (eg: sys.argv)
        :return: the argument following the first ``-C``, None if there is none
        """
        last = None
        for arg in args:
            if last == '-C':
                return arg
            last = arg
        return None

    @staticmethod
    def read_config(config_file):
        """
        Read the configuration file and return a dictionary
        :param config_file: string
        :return: dict
        :raises Exception: if the file does not exist or fails the validation
        """
        if not os.path.exists(config_file):
            raise Exception('[XOS-Config] Config file not found at: %s' % config_file)

        try:
            Config.validate_config_format(config_file)
        except Exception as e:
            # not every exception carries a `msg` attribute: fall back to the
            # exception itself so that the original cause is never hidden
            raise Exception('[XOS-Config] The config format is wrong: %s' % getattr(e, 'msg', e))

        with open(config_file, 'r') as stream:
            # safe_load returns None for an empty document, keep the dict contract
            return yaml.safe_load(stream) or {}

    @staticmethod
    def get(query):
        """
        Read a parameter from the config, falling back to the default values
        :param query: a dot separated selector for configuration options (eg: database.username)
        :return: the requested parameter in any format the parameter is specified
        :raises Exception: if the module is not initialized, or if the
                           parameter has neither a value nor a default
        """
        global INITIALIZED
        global CONFIG

        if not INITIALIZED:
            raise Exception('[XOS-Config] Module has not been initialized')

        # compare against None: False, 0 and '' are legitimate values
        val = Config.get_param(query, CONFIG)
        if val is None:
            val = Config.get_param(query, default.DEFAULT_VALUES)
        if val is None:
            raise Exception('[XOS-Config] Config does not have a value (or a default) parameter %s' % query)
        return val

    @staticmethod
    def get_param(query, config):
        """
        Search for a parameter in config, descending one level per dot separated key
        :param query: a dot separated selector for configuration options (eg: database.username)
        :param config: the config source to read from (can be the config file or the defaults)
        :return: the requested parameter in any format the parameter is specified, None if missing
        """
        # a first level key is just a selector with a single element
        return Config.get_nested_param(query.split('.'), config)

    @staticmethod
    def get_nested_param(keys, config):
        """
        Walk the config following a list of keys
        :param keys: a list of descending selector
        :param config: the config source to read from (can be the config file or the defaults)
        :return: the requested parameter in any format the parameter is specified, None if missing
        """
        param = config
        for k in keys:
            # the selector may go deeper than the config does (scalar or
            # empty node): treat that as a missing parameter
            if not isinstance(param, dict) or k not in param:
                return None
            param = param[k]
        return param

    @staticmethod
    def get_service_list():
        """
        Query registrator to get the list of services
        NOTE: we assume that consul is a valid URL
        :return: a list of service names
        :raises Exception: if registrator does not answer successfully
        """
        response = requests.get('http://consul:8500/v1/catalog/services')
        if not response.ok:
            raise Exception('[XOS-Config] Registrator is down')
        # the response is keyed by service name
        return list(response.json())

    @staticmethod
    def get_service_info(service_name):
        """
        Query registrator to get the details about a service
        NOTE: we assume that consul is a valid URL
        :param service_name: the name of the service, can be retrieved from get_service_list
        :return: a dict with the 'name', 'url' and 'port' of the first instance of the service
        :raises Exception: if registrator does not answer successfully or the service is unknown
        """
        response = requests.get('http://consul:8500/v1/catalog/service/%s' % service_name)
        if not response.ok:
            raise Exception('[XOS-Config] Registrator is down')
        service = response.json()
        if not service:
            raise Exception('[XOS-Config] The service %s looking for does not exist' % service_name)
        return {
            'name': service[0]['ServiceName'],
            'url': service[0]['ServiceAddress'],
            'port': service[0]['ServicePort']
        }

    @staticmethod
    def get_service_endpoint(service_name):
        """
        Query registrator to get the details about a service and return the endpoint in form of a string
        :param service_name: the name of the service, can be retrieved from get_service_list
        :return: the endpoint of the service (eg: http://<url>:<port>)
        """
        service = Config.get_service_info(service_name)
        return 'http://%s:%s' % (service['url'], service['port'])
| |
# NOTE: is this needed if this package is not meant to be executed from the CLI?
if __name__ == '__main__':
    # init() is a static method: no instance is required to call it
    Config.init()