blob: 427395a451111bf92ce2327768c1f3d252670c86 [file] [log] [blame]
#!/usr/bin/env python3
# SPDX-FileCopyrightText: © 2020 Open Networking Foundation <support@opennetworking.org>
# SPDX-License-Identifier: Apache-2.0
from __future__ import absolute_import
import argparse
import json
import logging
import netaddr
import re
import ssl
import urllib.parse
import urllib.request
from ruamel import yaml
# create shared logger
logging.basicConfig()
logger = logging.getLogger("nbht")
# headers to pass, set globally
headers = []
# settings
settings = {}
def parse_nb_args():
"""
parse CLI arguments
"""
parser = argparse.ArgumentParser(description="NetBox Host Descriptions")
# Positional args
parser.add_argument(
"settings",
type=argparse.FileType("r"),
help="YAML ansible inventory file w/netbox info",
)
parser.add_argument(
"--debug", action="store_true", help="Print additional debugging information"
)
return parser.parse_args()
def json_api_get(
url,
headers,
data=None,
trim_prefix=False,
allow_failure=False,
validate_certs=False,
):
"""
Call JSON API endpoint, return data as a dict
"""
logger.debug("json_api_get url: %s", url)
# if data included, encode it as JSON
if data:
data_enc = str(json.dumps(data)).encode("utf-8")
request = urllib.request.Request(url, data=data_enc, method="POST")
request.add_header("Content-Type", "application/json; charset=UTF-8")
else:
request = urllib.request.Request(url)
# add headers tuples
for header in headers:
request.add_header(*header)
try:
if validate_certs:
response = urllib.request.urlopen(request)
else:
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
response = urllib.request.urlopen(request, context=ctx)
except urllib.error.HTTPError:
# asking for data that doesn't exist results in a 404, just return nothing
if allow_failure:
return None
logger.exception("Server encountered an HTTPError at URL: '%s'", url)
except urllib.error.URLError:
logger.exception("An URLError occurred at URL: '%s'", url)
else:
# docs: https://docs.python.org/3/library/json.html
jsondata = response.read()
logger.debug("API response: %s", jsondata)
try:
data = json.loads(jsondata)
except json.decoder.JSONDecodeError:
# allow return of no data
if allow_failure:
return None
logger.exception("Unable to decode JSON")
else:
logger.debug("JSON decoded: %s", data)
return data
def get_pxe_devices(tenant_group, filters=""):
# get all devices in a prefix
url = "%s%s" % (
settings["api_endpoint"],
"api/dcim/devices/?tenant_group=%s%s" % (tenant_group, filters),
)
print(url)
raw_devs = json_api_get(url, headers, validate_certs=settings["validate_certs"])
logger.debug("raw_devs: %s", raw_devs)
devs = []
for item in raw_devs["results"]:
dev = {}
dev["serial"] = item["serial"]
dev["hostname"] = item["name"]
dev["domain"] = "aetherproject.net"
devs.append(dev)
return devs
# main function that calls other functions
if __name__ == "__main__":
args = parse_nb_args()
# only print log messages if debugging
if args.debug:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
# load settings from yaml file
settings = yaml.safe_load(args.settings.read())
logger.info("settings: %s" % settings)
# global, so this isn't run multiple times
headers = [
("Authorization", "Token %s" % settings["token"]),
]
# create structure from extracted data
pxe_devices = get_pxe_devices("aether", "&role_id=1")
print(yaml.safe_dump({"pxeboot_hosts": pxe_devices}, indent=2))