#!/usr/bin/env python3
# SPDX-FileCopyrightText: © 2020 Open Networking Foundation <support@opennetworking.org>
# SPDX-License-Identifier: Apache-2.0
# netbox_edgeconfig.py
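# given a settings file (NetBox API endpoint, token and IP prefixes), query
# NetBox and print DNS zone / DHCP subnet configuration as YAML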
from __future__ import absolute_import
import argparse
import json
import logging
import netaddr
import os
import re
import ssl
import urllib.parse
import urllib.request
from ruamel import yaml
# Shared logger for the whole script; basicConfig() sets up default logging
logging.basicConfig()
logger = logging.getLogger("nbec")

# Settings dict; replaced with the parsed YAML settings file in the
# __main__ block and read by the API helper functions
settings = {}

# Headers passed with every API request, set globally as a list of
# (name, value) tuples
headers = []

# Data cached from the API so repeated lookups don't issue another request
interface_mac_cache = {}
device_services_cache = {}
device_interface_cache = {}

# Parent prefixes
# NOTE(review): nothing in the visible code ever adds entries to this dict
parent_prefixes = {}

# Global dict of jsonpath expressions -> compiled jsonpath parsers, kept
# because reparsing expressions in each loop results in 100x longer
# execution time
# NOTE(review): not referenced anywhere else in the visible code
jpathexpr = {}
def parse_nb_args():
    """
    Build the command line parser and parse the process arguments.

    Returns the argparse namespace, holding:
      settings - the settings file, already opened for reading
      debug    - True when --debug was given, False otherwise
    """
    cli = argparse.ArgumentParser(description="NetBox Edge Config")

    # positional: the settings file
    cli.add_argument(
        "settings",
        help="YAML ansible inventory file w/NetBox API token",
        type=argparse.FileType("r"),
    )

    # optional: switch on verbose logging
    cli.add_argument(
        "--debug",
        help="Print additional debugging information",
        action="store_true",
    )

    parsed = cli.parse_args()
    return parsed
def json_api_get(
    url,
    headers,
    data=None,
    trim_prefix=False,
    allow_failure=False,
    validate_certs=False,
):
    """
    Call JSON API endpoint, return the decoded data

    Args:
        url: full URL to request.
        headers: iterable of (name, value) tuples added to the request.
        data: optional payload; when truthy it is JSON-encoded and sent as
            the body of a POST, otherwise a request without a body is made.
        trim_prefix: accepted for backward compatibility, not used.
        allow_failure: when True, an HTTP error or an undecodable body is
            treated as "no data" and is not logged.
        validate_certs: when False, TLS certificate and hostname
            verification are disabled.

    Returns:
        The decoded JSON document, or None when the request failed or the
        response body could not be decoded.
    """
    logger.debug("json_api_get url: %s", url)

    # if data included, encode it as JSON
    if data:
        data_enc = json.dumps(data).encode("utf-8")
        request = urllib.request.Request(url, data=data_enc, method="POST")
        request.add_header("Content-Type", "application/json; charset=UTF-8")
    else:
        request = urllib.request.Request(url)

    # add headers tuples
    for header in headers:
        request.add_header(*header)

    # context=None makes urlopen use its default (verifying) behaviour
    ctx = None
    if not validate_certs:
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE

    try:
        response = urllib.request.urlopen(request, context=ctx)
    except urllib.error.HTTPError:
        # asking for data that doesn't exist results in a 404, just return nothing
        if not allow_failure:
            logger.exception("Server encountered an HTTPError at URL: '%s'", url)
        return None
    except urllib.error.URLError:
        logger.exception("An URLError occurred at URL: '%s'", url)
        return None

    # close the response once the body is read, so the connection isn't leaked
    with response:
        jsondata = response.read()

    logger.debug("API response: %s", jsondata)

    try:
        decoded = json.loads(jsondata)
    except json.decoder.JSONDecodeError:
        # allow return of no data
        if not allow_failure:
            logger.exception("Unable to decode JSON")
        return None

    logger.debug("JSON decoded: %s", decoded)
    return decoded
# matches every character that is not lowercase alphanumeric, dash or period
_DNS_UNSAFE_CHARS = re.compile(r"[^a-z0-9.-]", flags=re.ASCII)


def _dns_safe_name(name):
    # replace characters not allowed in a DNS name with a dash
    return _DNS_UNSAFE_CHARS.sub("-", name)


def create_dns_zone(extension, devs, parent_devs=None):
    """
    Build the DNS records for one zone from the devices of a prefix

    Args:
        extension: domain extension of the zone; appended to device names
            to form fully qualified names.
        devs: dict of name -> device dict. Each device dict is read for
            "ip4", "dns_name" and "services"; the special "prefix_dhcp"
            entry is read for "dhcp_range".
        parent_devs: optional dict of name -> device dict for the parent
            prefix, scanned only for additional nameservers.

    Returns:
        dict with the keys "a" and "cname" (name -> target), "ns" (list of
        nameserver names), "srv" and "txt" (always empty here).
    """
    if parent_devs is None:
        parent_devs = {}

    # Checks for dns entries
    a_recs = {}  # PTR records created by inverting this
    cname_recs = {}
    srv_recs = {}
    ns_recs = []
    txt_recs = {}

    zone_suffix = ".%s" % extension

    # scan through devs and look for dns_name, if not, make from name and
    # extension
    for name, value in devs.items():

        # add DNS entries for every DHCP host if there's a DHCP range
        # DHCP addresses are of the form dhcp###.extension
        if name == "prefix_dhcp":
            for ip in netaddr.IPNetwork(value["dhcp_range"]).iter_hosts():
                a_recs["dhcp%03d" % (ip.words[3])] = str(ip)
            continue

        # require DNS names to only use ASCII characters (alphanumeric, lowercase, with dash/period)
        # _'s are used in SRV/TXT records, but in general use aren't recommended
        dns_name = _dns_safe_name(name)

        # Add as an A record (and inverse, PTR record), only if it's a new name
        if dns_name in a_recs:
            # most likely a data entry error
            logger.warning(
                "Duplicate DNS name '%s' for devices at IP: '%s' and '%s', ignoring",
                dns_name,
                a_recs[dns_name],
                value["ip4"],
            )
            continue

        a_recs[dns_name] = value["ip4"]
        fqdn = "%s.%s." % (dns_name, extension)

        # if a DNS name is given as a part of the IP address, it's viewed as a CNAME
        ip_dns_name = value["dns_name"]
        if ip_dns_name:
            # plain suffix comparison: the extension is literal text, not a
            # regex, and only the trailing occurrence may be stripped
            if ip_dns_name.endswith(zone_suffix):
                # strip off the extension, and add as a CNAME
                dns_cname = ip_dns_name[: -len(zone_suffix)]
            elif "." in ip_dns_name:
                logger.warning(
                    "Device '%s' has a IP assigned DNS name '%s' outside the prefix extension: '%s', ignoring",
                    name,
                    ip_dns_name,
                    extension,
                )
                continue
            else:
                dns_cname = ip_dns_name

            if dns_cname == dns_name:
                logger.warning(
                    "DNS Name field '%s' is identical to device name '%s', ignoring",
                    ip_dns_name,
                    dns_name,
                )
            else:
                cname_recs[dns_cname] = fqdn

        # Add services as cnames, and possibly ns records
        for svc in value["services"]:
            # only add service if it uses the IP of the host
            if value["ip4"] in svc["ip4s"]:
                cname_recs[svc["name"]] = fqdn
                if (
                    svc["port"] == 53
                    and svc["protocol"] == "udp"
                    and fqdn not in ns_recs
                ):
                    ns_recs.append(fqdn)

    # iterate over the parent devs to add additional nameservers
    for pname, pval in parent_devs.items():
        if "services" in pval:
            for svc in pval["services"]:
                # look for DNS servers
                if svc["port"] == 53 and svc["protocol"] == "udp":
                    # make name
                    dns_name = _dns_safe_name(pname)

                    # add an a record for this nameserver if IP is outside of subnet
                    a_recs[dns_name] = pval["ip4"]

                    # add a NS record if it doesn't already exist
                    ns_name = "%s.%s." % (dns_name, extension)
                    if ns_name not in ns_recs:
                        ns_recs.append(ns_name)

    return {
        "a": a_recs,
        "cname": cname_recs,
        "ns": ns_recs,
        "srv": srv_recs,
        "txt": txt_recs,
    }
def create_dhcp_subnet(prefix, prefix_search, devs, parent_devs=None):
    """
    Make the DHCP subnet information for one prefix

    Args:
        prefix: the subnet, stored as-is under "subnet".
        prefix_search: DNS search domain, stored as a one-item list.
        devs: dict of name -> device dict for the prefix. The entries named
            "prefix_dhcp" (read for "dhcp_range") and "router" (read for
            "ip4" and the optional "rfc3442routes" custom field) are
            handled specially.
        parent_devs: optional dict of name -> device dict for the parent
            prefix; used only to supply DNS/TFTP servers when the prefix
            itself has none.

    Returns:
        dict with "subnet", "dns_search", "hosts", "range", "dns_servers"
        and "tftpd_server", plus "routers" when a router entry was found.
    """
    if parent_devs is None:
        parent_devs = {}

    def dhcp_iterate(devs):
        # inner function to iterate over a dev list
        ihosts = []
        idyn_range = None
        irouter = []
        idns_servers = []
        itftpd_server = None

        for name, value in devs.items():

            # handle a DHCP range
            if name == "prefix_dhcp":
                idyn_range = value["dhcp_range"]
                continue

            # handle a router reservation
            if name == "router":
                ir = {"ip": value["ip4"]}

                # custom_fields may be absent on the entry, so don't index
                # it directly
                routes = (value.get("custom_fields") or {}).get("rfc3442routes")
                if routes:
                    ir["rfc3442routes"] = routes.split(",")

                irouter.append(ir)
                continue

            # has a MAC address, and it's not null
            if value.get("macaddr"):
                ihosts.append(
                    {
                        "name": name,
                        "ip_addr": value["ip4"],
                        "mac_addr": value["macaddr"].lower(),
                    }
                )

            # Add dns based on service entries
            if "services" in value:
                for svc in value["services"]:
                    if svc["protocol"] != "udp":
                        continue

                    # add DNS server
                    if svc["port"] == 53:
                        idns_servers.append(value["ip4"])

                    # add tftp server (the last one found wins)
                    if svc["port"] == 69:
                        itftpd_server = value["ip4"]

        return (ihosts, idyn_range, irouter, idns_servers, itftpd_server)

    # run inner function and build
    hosts, dyn_range, router, dns_servers, tftpd_server = dhcp_iterate(devs)

    # makes DHCP subnet information
    # assign only hosts, dynamic range, based on the prefix
    subnet = {
        "subnet": prefix,
        "dns_search": [prefix_search],
        "hosts": hosts,
        "range": dyn_range,
    }

    # only assign router if specified
    if router:
        subnet["routers"] = router

    # find parent prefix devices, to fill in where needed; only the service
    # addresses of the parent are of interest
    _, _, _, pdns_servers, ptftpd_server = dhcp_iterate(parent_devs)

    # use parent prefix devices if dns/tftp services needed aren't found within prefix
    subnet["dns_servers"] = dns_servers if dns_servers else pdns_servers
    subnet["tftpd_server"] = tftpd_server if tftpd_server else ptftpd_server

    return subnet
def find_dhcpd_interface(prefix, devs):
    """
    Find the interface the DHCP daemon should listen on for a prefix

    The DHCPd interface is taken to be the one holding the first usable IP
    in the range (network address + 1).

    Args:
        prefix: the IP prefix, parsed with netaddr.IPNetwork.
        devs: dict of name -> device dict; entries are matched on "ip4"
            and the "iface" value of the match is returned.

    Returns:
        The interface name, or None when no entry with an interface holds
        the first usable IP.
    """
    # DHCPd interface is first usable IP in range
    first_ip = str(netaddr.IPAddress(netaddr.IPNetwork(prefix).first + 1))

    # look for interface corresponding to first IP address in range
    for value in devs.values():
        # entries without an "iface" key can't name an interface, so they
        # are skipped instead of raising KeyError
        if value.get("ip4") == first_ip and "iface" in value:
            return value["iface"]

    # if interface not found, return None and ignore
    return None
def get_device_services(device_id, filters=""):
    """
    Get the services defined on a device, using the cache when possible

    Args:
        device_id: device id used in the services query.
        filters: extra query string text appended verbatim to the URL.

    Returns:
        list of dicts with "name", "description", "port", "protocol" and
        "ip4s" (list of IP address strings without the prefix length).
    """
    # the filters are part of the query, so they must be part of the cache
    # key too; unfiltered lookups stay keyed by the bare device id
    cache_key = (device_id, filters) if filters else device_id

    if cache_key in device_services_cache:
        return device_services_cache[cache_key]

    # get services info
    url = "%s%s" % (
        settings["api_endpoint"],
        "api/ipam/services/?device_id=%s%s" % (device_id, filters),
    )

    # json_api_get returns None on failure, which surfaces below as a
    # TypeError when the results are indexed
    raw_svcs = json_api_get(url, headers, validate_certs=settings["validate_certs"])

    services = [
        {
            "name": rsvc["name"],
            "description": rsvc["description"],
            "port": rsvc["port"],
            "protocol": rsvc["protocol"]["value"],
            "ip4s": [
                str(netaddr.IPNetwork(ip["address"]).ip)
                for ip in rsvc["ipaddresses"]
            ],
        }
        for rsvc in raw_svcs["results"]
    ]

    device_services_cache[cache_key] = services
    return services
def get_interface_mac_addr(interface_id):
    """
    Look up the MAC address of an interface.

    The answer (including "no MAC address") is remembered in
    interface_mac_cache, so each interface is requested from the API at
    most once.

    Returns the MAC address, or None if undefined.
    """
    try:
        return interface_mac_cache[interface_id]
    except KeyError:
        pass

    # get the interface info
    endpoint = "api/dcim/interfaces/%s/" % interface_id
    url = "%s%s" % (settings["api_endpoint"], endpoint)
    iface = json_api_get(url, headers, validate_certs=settings["validate_certs"])

    # any falsy value is stored and returned as None
    mac = iface["mac_address"] if iface["mac_address"] else None
    interface_mac_cache[interface_id] = mac
    return mac
def get_device_interfaces(device_id, filters=""):
    """
    Get the management-only interfaces of a device, using the cache when
    possible

    Args:
        device_id: device id used in the interfaces query.
        filters: extra query string text appended verbatim to the URL.

    Returns:
        list of dicts with "name", "macaddr", "mgmt_only", "description"
        and, only when the interface has IP addresses, "ip4" (the first
        address returned for the interface, without the prefix length).
    """
    # the filters are part of the query, so they must be part of the cache
    # key too; unfiltered lookups stay keyed by the bare device id
    cache_key = (device_id, filters) if filters else device_id

    if cache_key in device_interface_cache:
        return device_interface_cache[cache_key]

    url = "%s%s" % (
        settings["api_endpoint"],
        "api/dcim/interfaces/?device_id=%s%s&mgmt_only=true" % (device_id, filters),
    )
    logger.debug("raw_ifaces_url: %s", url)

    raw_ifaces = json_api_get(url, headers, validate_certs=settings["validate_certs"])
    logger.debug("raw_ifaces: %s", raw_ifaces)

    ifaces = []

    for raw_iface in raw_ifaces["results"]:
        iface = {
            "name": raw_iface["name"],
            "macaddr": raw_iface["mac_address"],
            "mgmt_only": raw_iface["mgmt_only"],
            "description": raw_iface["description"],
        }

        # only ask for addresses when the interface reports having some
        if raw_iface["count_ipaddresses"]:
            url = "%s%s" % (
                settings["api_endpoint"],
                "api/ipam/ip-addresses/?interface_id=%s" % raw_iface["id"],
            )
            raw_ip = json_api_get(
                url, headers, validate_certs=settings["validate_certs"]
            )
            iface["ip4"] = str(netaddr.IPNetwork(raw_ip["results"][0]["address"]).ip)

        ifaces.append(iface)

    device_interface_cache[cache_key] = ifaces
    return ifaces
def get_prefix_devices(prefix, filters=""):
    """
    Get all devices in a prefix

    Queries the IP addresses whose parent is the prefix and turns each one
    into an entry of the returned dict:
      - status "dhcp": stored under "prefix_dhcp" with its "dhcp_range"
      - status "reserved": stored under the first word of the lowercased
        description, with type "reserved"
      - VM interface addresses and addresses not assigned to any object
        are skipped
      - anything else: stored as type "device", named after the device
        (or device-interface for a management IP)

    Args:
        prefix: the IP prefix to list.
        filters: extra query string text appended verbatim to the URL.

    Returns:
        dict of name -> device dict.
    """
    # NOTE(review): only the "results" of this single response are read;
    # confirm the API returns every address in one response
    url = "%s%s" % (
        settings["api_endpoint"],
        "api/ipam/ip-addresses/?parent=%s%s" % (prefix, filters),
    )

    raw_ips = json_api_get(url, headers, validate_certs=settings["validate_certs"])
    logger.debug("raw_ips: %s", raw_ips)

    devs = {}

    # iterate by IP, sorted numerically; sorting the address strings would
    # put e.g. 10.0.0.10 before 10.0.0.2
    for ip in sorted(
        raw_ips["results"], key=lambda k: netaddr.IPNetwork(k["address"])
    ):
        logger.debug("ip: %s", ip)

        status = ip["status"]["value"]

        # if it's a DHCP range, add that range to the dev list as prefix_dhcp
        if status == "dhcp":
            devs["prefix_dhcp"] = {"dhcp_range": ip["address"]}
            continue

        # a missing/null DNS name is stored as an empty string, so it is
        # never mistaken for a real name later on
        ip_dns_name = ip.get("dns_name") or ""

        # if it's a reserved IP
        if status == "reserved":
            res = {
                "type": "reserved",
                "description": ip["description"],
                "ip4": str(netaddr.IPNetwork(ip["address"]).ip),
                "dns_name": ip_dns_name,
                "services": {},
                "custom_fields": ip["custom_fields"],
            }
            resname = res["description"].lower().split(" ")[0]
            devs[resname] = res
            continue

        # don't handle VM's yet
        if ip["assigned_object_type"] == "virtualization.vminterface":
            continue

        # an address that isn't assigned to anything has no interface or
        # device to describe
        assigned = ip["assigned_object"]
        if not assigned:
            logger.warning(
                "IP address '%s' is not assigned to an interface, ignoring",
                ip["address"],
            )
            continue

        dev = {
            "type": "device",
            "ip4": str(netaddr.IPNetwork(ip["address"]).ip),
            "macaddr": get_interface_mac_addr(assigned["id"]),
            "iface": assigned["name"],
        }

        device_id = assigned["device"]["id"]
        short_name = assigned["device"]["name"].lower().split(".")[0]

        ifaces = get_device_interfaces(device_id)

        # the first management interface may have no IP at all, in which
        # case it carries no "ip4" key
        if ifaces and dev["ip4"] == ifaces[0].get("ip4"):  # this is a mgmt IP
            devname = "%s-%s" % (short_name, ifaces[0]["name"])
            dev["dns_name"] = ""
            dev["services"] = []
        else:  # this is a primary IP
            devname = short_name
            dev["dns_name"] = ip_dns_name
            dev["services"] = get_device_services(device_id)

        # fix multihomed devices in same IP range
        # FIXME: Does not handle > 2 connections properly
        if devname in devs:
            devs["%s-1" % devname] = devs.pop(devname)
            devs["%s-2" % devname] = dev
        else:
            devs[devname] = dev

    return devs
def get_parent_prefix(child_prefix):
    """
    Return a parent prefix given a child prefix, or None if there is none.

    Asks the API for the prefixes containing child_prefix and picks the
    first result that isn't child_prefix itself.

    FIXME: only returns the first found prefix, so doesn't handle more than
    2 layers of hierarchy
    """
    query = "api/ipam/prefixes/?contains=%s" % child_prefix
    url = "%s%s" % (settings["api_endpoint"], query)

    raw_prefixes = json_api_get(url, headers, validate_certs=settings["validate_certs"])
    logger.debug(raw_prefixes)

    candidates = (
        entry["prefix"]
        for entry in raw_prefixes["results"]
        if entry["prefix"] != child_prefix
    )
    return next(candidates, None)
def get_prefix_data(prefix):
    """
    Fetch the API record of a prefix.

    Returns the first entry of the results for an exact prefix query.
    """
    query = "api/ipam/prefixes/?prefix=%s" % prefix
    url = "%s%s" % (settings["api_endpoint"], query)

    raw_prefix = json_api_get(url, headers, validate_certs=settings["validate_certs"])
    logger.debug("raw_prefix: %s", raw_prefix)

    first_match = raw_prefix["results"][0]
    return first_match
# main function that calls other functions: loads the settings and the base
# config, walks every configured IP prefix to build the DNS zones and DHCP
# subnets, then prints the merged result as YAML
if __name__ == "__main__":

    args = parse_nb_args()

    # only print debug log messages if debugging
    logger.setLevel(logging.DEBUG if args.debug else logging.INFO)

    # NOTE(review): yaml.safe_load/safe_dump are the module-level functions
    # of ruamel.yaml; confirm the installed ruamel.yaml release still
    # provides them

    # load settings from yaml file, closing the handle argparse opened
    with args.settings as settings_file:
        settings = yaml.safe_load(settings_file.read())

    # load default config, which lives next to this script
    base_config_path = os.path.join(
        os.path.dirname(os.path.realpath(__file__)), "base_edgeconfig.yaml"
    )
    with open(base_config_path) as defconfig:
        # an empty file loads as None, which can't be updated below
        yaml_out = yaml.safe_load(defconfig) or {}

    logger.debug("settings: %s", settings)

    # global, so this isn't run multiple times
    headers = [
        ("Authorization", "Token %s" % settings["token"]),
    ]

    # create structure from extracted data
    dns_zones = {}
    dns_rev_zones = {}
    dhcpd_subnets = []
    dhcpd_interfaces = []
    devs_per_prefix = {}
    prefixes = {}

    for prefix in settings["ip_prefixes"]:

        prefix_data = get_prefix_data(prefix)

        parent_prefix = get_parent_prefix(prefix)
        prefix_data["parent"] = parent_prefix

        # devices of a prefix are fetched once and remembered in
        # devs_per_prefix, whether it was first seen as a parent or as a
        # configured prefix
        pdevs = {}
        if parent_prefix:
            if parent_prefix not in devs_per_prefix:
                devs_per_prefix[parent_prefix] = get_prefix_devices(parent_prefix)
            pdevs = devs_per_prefix[parent_prefix]

        prefix_data["parent_devs"] = pdevs
        prefixes[prefix] = prefix_data

        # the prefix description is used as the domain extension
        prefix_domain_extension = prefix_data["description"]

        if prefix not in devs_per_prefix:
            devs_per_prefix[prefix] = get_prefix_devices(prefix)
        devs = devs_per_prefix[prefix]

        dns_zones[prefix_domain_extension] = create_dns_zone(
            prefix_domain_extension, devs, pdevs
        )
        dns_zones[prefix_domain_extension]["ip_range"] = prefix

        dhcpd_subnets.append(
            create_dhcp_subnet(prefix, prefix_domain_extension, devs, pdevs)
        )

        dhcpd_if = find_dhcpd_interface(prefix, devs)
        if dhcpd_if and dhcpd_if not in dhcpd_interfaces:
            dhcpd_interfaces.append(dhcpd_if)

    yaml_out.update(
        {
            "dns_zones": dns_zones,
            "dns_rev_zones": dns_rev_zones,
            "dhcpd_subnets": dhcpd_subnets,
            "dhcpd_interfaces": dhcpd_interfaces,
            # the below are useful when debugging
            # "devs_per_prefix": devs_per_prefix,
            # "prefixes": prefixes,
        }
    )

    print(yaml.safe_dump(yaml_out, indent=2))