blob: 5614ba92f42f05fa841a47aa26331f0dbc8d1ba5 [file] [log] [blame]
Wei-Yu Chenbd495ba2021-08-31 19:46:35 +08001#!/usr/bin/env python3
2
3# SPDX-FileCopyrightText: © 2021 Open Networking Foundation <support@opennetworking.org>
4# SPDX-License-Identifier: Apache-2.0
5
6# utils.py
7# The utility functions shared among nbhelper objects
8
9import re
10import logging
11import argparse
12import pynetbox
13import requests
14
15from ruamel import yaml
16
# Module-level state shared by the helpers; populated by initialize()
netboxapi = netbox_config = netbox_version = None

# Shared logger used across the nbhelper objects
logging.basicConfig()
logger = logging.getLogger("nbh")
25
26
def initialize(extra_args):
    """
    Initialize the NBHelper module from the command line and settings file.

    Parses the CLI arguments, loads the YAML settings file passed as the
    positional "settings" argument, checks that the required settings
    ("api_endpoint", "token", "tenant_name") are present, and creates the
    pynetbox API object. The results are stored in the module-level
    variables netboxapi, netbox_config and netbox_version.

    Arguments:
        extra_args: dict mapping an option name to the keyword arguments of
            parser.add_argument(); forwarded to parse_cli_args() so callers
            can register additional CLI options.

    Returns:
        The argparse namespace returned by parse_cli_args().

    Exits the process with status 1 when a required setting is missing or
    empty.
    """
    global netboxapi, netbox_config, netbox_version

    # sys is not imported at the top of this module; import it locally so the
    # error path below really exits instead of raising NameError.
    import sys

    args = parse_cli_args(extra_args)

    # An empty settings file loads as None; fall back to an empty dict so the
    # required-setting check below reports the problem instead of crashing.
    # NOTE(review): the module-level yaml.safe_load() function is deprecated
    # in recent ruamel.yaml releases -- confirm the pinned version.
    netbox_config = yaml.safe_load(args.settings.read()) or {}

    for require_args in ["api_endpoint", "token", "tenant_name"]:
        if not netbox_config.get(require_args):
            logger.error(
                "The require argument: %s was not set. Stop.", require_args
            )
            sys.exit(1)

    netboxapi = pynetbox.api(
        netbox_config["api_endpoint"],
        token=netbox_config["token"],
        threading=True,
    )

    # TLS certificate validation is on unless the settings explicitly
    # disable it with "validate_certs: false".
    if not netbox_config.get("validate_certs", True):
        session = requests.Session()
        session.verify = False
        netboxapi.http_session = session

    netbox_version = netboxapi.version

    return args
54
55
def parse_cli_args(extra_args=None):
    """
    Parse the CLI arguments and set the level of the shared logger.

    The parser always accepts the positional "settings" file (opened for
    reading by argparse) and the "--debug" flag. The logger level is set to
    DEBUG when "--debug" is given, otherwise to INFO.

    Arguments:
        extra_args: optional dict mapping an option name to the keyword
            arguments passed to parser.add_argument() for that option.
            None (the default) or an empty dict adds no extra options.

    Returns:
        The argparse namespace with the parsed arguments.
    """

    parser = argparse.ArgumentParser(description="Netbox")

    # Positional args
    parser.add_argument(
        "settings",
        type=argparse.FileType("r"),
        help="YAML Ansible inventory file w/NetBox API token",
    )

    parser.add_argument(
        "--debug", action="store_true", help="Print additional debugging information"
    )

    # A None default replaces the former mutable {} default argument.
    for ename, ekwargs in (extra_args or {}).items():
        parser.add_argument(ename, **ekwargs)

    args = parser.parse_args()
    logger.setLevel(logging.DEBUG if args.debug else logging.INFO)

    return args
82
83
def check_name_dns(name):
    """
    Validate that name only contains characters allowed in a DNS name.

    The check is done on the lower-cased name; allowed characters are
    a-z, 0-9, "." and "-".

    Arguments:
        name: the candidate DNS name.

    Returns:
        The lower-cased name when it is valid.

    Logs an error showing the first invalid character and exits the process
    with status 1 when the name is not valid.
    """
    # sys is not imported at the top of this module; import it locally so the
    # error path below really exits instead of raising NameError.
    import sys

    lowered = name.lower()
    badchars = re.search("[^a-z0-9.-]", lowered, re.ASCII)

    if badchars:
        logger.error(
            "DNS name '%s' has one or more invalid characters: '%s'",
            name,
            badchars.group(0),
        )
        sys.exit(1)

    return lowered
97
98
def clean_name_dns(name):
    """
    Return name lower-cased, with every character other than a-z, 0-9,
    "." and "-" replaced by "-".
    """
    # count and flags are passed by keyword: passing them positionally to
    # re.sub() is deprecated in newer Python versions.
    return re.sub("[^a-z0-9.-]", "-", name.lower(), count=0, flags=re.ASCII)
101
102
def apply_as_router(output_yaml):
    """
    Fill in the router-specific defaults of the output structure.

    Keys that are already present in output_yaml keep their value; the same
    object is returned after being updated in place.
    """
    router_defaults = (
        ("netprep_router", True),
        ("tftpd_files", ["undionly.kpxe"]),
        ("vhosts", [{"name": "default", "default_server": True, "autoindex": True}]),
        ("acme_username", "www-data"),
    )

    for key, default in router_defaults:
        output_yaml.setdefault(key, default)

    return output_yaml
114
115
class AttrDict(dict):
    """A dict whose items can also be read and written as attributes."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Use the dict itself as the instance namespace, so that d.key and
        # d["key"] share the same storage.
        self.__dict__ = self