blob: 5614ba92f42f05fa841a47aa26331f0dbc8d1ba5 [file] [log] [blame]
#!/usr/bin/env python3
# SPDX-FileCopyrightText: © 2021 Open Networking Foundation <support@opennetworking.org>
# SPDX-License-Identifier: Apache-2.0
# utils.py
# The utility functions shared among nbhelper objects
import re
import logging
import argparse
import pynetbox
import requests
from ruamel import yaml
# Initialize module-level variables
netboxapi = None
netbox_config = None
netbox_version = None
# create shared logger
logging.basicConfig()
logger = logging.getLogger("nbh")
def initialize(extra_args):
"""
Initialize the NBHelper module, the extra_args is required, it contains the
NetBox API url, API token, and the name of site
"""
global netboxapi, netbox_config, netbox_version
args = parse_cli_args(extra_args)
netbox_config = yaml.safe_load(args.settings.read())
for require_args in ["api_endpoint", "token", "tenant_name"]:
if not netbox_config.get(require_args):
logger.error("The require argument: %s was not set. Stop." % require_args)
sys.exit(1)
netboxapi = pynetbox.api(
netbox_config["api_endpoint"], token=netbox_config["token"], threading=True,
)
if not netbox_config.get("validate_certs", True):
session = requests.Session()
session.verify = False
netboxapi.http_session = session
netbox_version = netboxapi.version
return args
def parse_cli_args(extra_args={}):
"""
parse CLI arguments. Can add extra arguments with a option:kwargs dict
"""
parser = argparse.ArgumentParser(description="Netbox")
# Positional args
parser.add_argument(
"settings",
type=argparse.FileType("r"),
help="YAML Ansible inventory file w/NetBox API token",
)
parser.add_argument(
"--debug", action="store_true", help="Print additional debugging information"
)
for ename, ekwargs in extra_args.items():
parser.add_argument(ename, **ekwargs)
args = parser.parse_args()
log_level = logging.DEBUG if args.debug else logging.INFO
logger.setLevel(log_level)
return args
def check_name_dns(name):
badchars = re.search("[^a-z0-9.-]", name.lower(), re.ASCII)
if badchars:
logger.error(
"DNS name '%s' has one or more invalid characters: '%s'",
name,
badchars.group(0),
)
sys.exit(1)
return name.lower()
def clean_name_dns(name):
return re.sub("[^a-z0-9.-]", "-", name.lower(), 0, re.ASCII)
def apply_as_router(output_yaml):
""" Add the router-specific value for output structure """
output_yaml.setdefault("netprep_router", True)
output_yaml.setdefault("tftpd_files", ["undionly.kpxe"])
output_yaml.setdefault(
"vhosts", [{"name": "default", "default_server": True, "autoindex": True}]
)
output_yaml.setdefault("acme_username", "www-data")
return output_yaml
class AttrDict(dict):
def __init__(self, *args, **kwargs):
super(AttrDict, self).__init__(*args, **kwargs)
self.__dict__ = self