| #!/usr/bin/env python3 |
| |
| # SPDX-FileCopyrightText: © 2021 Open Networking Foundation <support@opennetworking.org> |
| # SPDX-License-Identifier: Apache-2.0 |
| |
| # utils.py |
| # The utility functions shared among nbhelper objects |
| |
import argparse
import logging
import re
import sys

import pynetbox
import requests

from ruamel import yaml
| |
| # Initialize module-level variables |
| netboxapi = None |
| netbox_config = None |
| netbox_version = None |
| |
| # create shared logger |
| logging.basicConfig() |
| logger = logging.getLogger("nbh") |
| |
| |
def initialize(extra_args):
    """
    Initialize the NBHelper module from the CLI arguments and settings file.

    Parses the command line, loads the YAML settings file passed as the
    "settings" positional argument, and creates the shared pynetbox API
    client. The results are stored in the module-level variables netboxapi,
    netbox_config and netbox_version.

    Args:
        extra_args: dict of additional CLI options, forwarded unchanged to
            parse_cli_args().

    Returns:
        The argparse namespace produced by parse_cli_args().

    Exits the process with status 1 when the settings file does not hold a
    YAML mapping, or when any of "api_endpoint", "token" or "tenant_name" is
    missing or empty in it.
    """
    global netboxapi, netbox_config, netbox_version

    args = parse_cli_args(extra_args)

    # An empty settings file loads as None and a scalar/list document has no
    # .get(); reject both up front instead of crashing with AttributeError.
    settings = yaml.safe_load(args.settings.read())
    if not isinstance(settings, dict):
        logger.error("The settings file does not contain a YAML mapping. Stop.")
        sys.exit(1)
    netbox_config = settings

    for required_key in ("api_endpoint", "token", "tenant_name"):
        if not netbox_config.get(required_key):
            logger.error("The require argument: %s was not set. Stop.", required_key)
            sys.exit(1)

    netboxapi = pynetbox.api(
        netbox_config["api_endpoint"],
        token=netbox_config["token"],
        threading=True,
    )

    # Certificate verification stays on unless "validate_certs" is explicitly
    # set to a false value in the settings file.
    if not netbox_config.get("validate_certs", True):
        session = requests.Session()
        session.verify = False
        netboxapi.http_session = session

    netbox_version = netboxapi.version

    return args
| |
| |
def parse_cli_args(extra_args=None):
    """
    Parse CLI arguments. Extra arguments can be added with an option:kwargs
    dict.

    Args:
        extra_args: optional dict mapping an argparse option name (e.g.
            "--foo") to the dict of keyword arguments passed to
            parser.add_argument() for that option. None (the default) adds
            no extra options.

    Returns:
        The argparse namespace. It always has "settings" (an open, readable
        file object) and "debug" (bool), plus any extra options.

    Side effect: sets the shared logger level to DEBUG when --debug is given,
    INFO otherwise.
    """
    # A None sentinel avoids sharing one mutable default dict across calls.
    extra_options = {} if extra_args is None else extra_args

    parser = argparse.ArgumentParser(description="Netbox")

    # Positional args
    parser.add_argument(
        "settings",
        type=argparse.FileType("r"),
        help="YAML Ansible inventory file w/NetBox API token",
    )

    parser.add_argument(
        "--debug", action="store_true", help="Print additional debugging information"
    )

    for option_name, option_kwargs in extra_options.items():
        parser.add_argument(option_name, **option_kwargs)

    args = parser.parse_args()
    logger.setLevel(logging.DEBUG if args.debug else logging.INFO)

    return args
| |
| |
def check_name_dns(name):
    """
    Validate that a name only uses characters allowed in a DNS name.

    The check is case-insensitive: the name is lowercased first and must then
    consist solely of a-z, 0-9, "." and "-".

    Args:
        name: the candidate DNS name.

    Returns:
        The lowercased name when it is valid.

    Exits the process with status 1, after logging the first offending
    character, when the name contains an invalid character.
    """
    lowered = name.lower()
    badchars = re.search(r"[^a-z0-9.-]", lowered, re.ASCII)

    if badchars:
        logger.error(
            "DNS name '%s' has one or more invalid characters: '%s'",
            name,
            badchars.group(0),
        )
        sys.exit(1)

    return lowered
| |
| |
def clean_name_dns(name):
    """
    Return a DNS-safe version of a name.

    The name is lowercased and every character other than a-z, 0-9, "." and
    "-" is replaced with "-".

    Args:
        name: the name to sanitize.

    Returns:
        The sanitized, lowercased name.
    """
    # count/flags are passed by keyword: passing them positionally to
    # re.sub() is deprecated since Python 3.13.
    return re.sub(r"[^a-z0-9.-]", "-", name.lower(), count=0, flags=re.ASCII)
| |
| |
def apply_as_router(output_yaml):
    """
    Fill in the router-specific defaults of an output structure.

    Keys already present in output_yaml keep their value; missing keys are
    added. The mapping is modified in place and also returned.
    """
    # Built on every call so each output structure gets its own list objects.
    router_defaults = {
        "netprep_router": True,
        "tftpd_files": ["undionly.kpxe"],
        "vhosts": [{"name": "default", "default_server": True, "autoindex": True}],
        "acme_username": "www-data",
    }

    for key, default in router_defaults.items():
        output_yaml.setdefault(key, default)

    return output_yaml
| |
| |
class AttrDict(dict):
    """A dict whose items can also be read and written as attributes."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Use the dict itself as the attribute namespace, so obj.key and
        # obj["key"] always refer to the same entry.
        self.__dict__ = self