| #!/usr/bin/env python3 |
| |
| # SPDX-FileCopyrightText: © 2020 Open Networking Foundation <support@opennetworking.org> |
| # SPDX-License-Identifier: Apache-2.0 |
| |
| # netbox_edgeconfig.py |
# given a YAML settings file with a NetBox API endpoint/token and a list of IP
# prefixes, build DNS zone and DHCP subnet configuration and print it as YAML
| |
| from __future__ import absolute_import |
| |
| import argparse |
| import json |
| import logging |
| import netaddr |
| import os |
| import re |
| import ssl |
| import urllib.parse |
| import urllib.request |
| from ruamel import yaml |
| |
| # create shared logger |
| logging.basicConfig() |
| logger = logging.getLogger("nbec") |
| |
| # global dict of jsonpath expressions -> compiled jsonpath parsers, as |
| # reparsing expressions in each loop results in 100x longer execution time |
| jpathexpr = {} |
| |
| # headers to pass, set globally |
| headers = [] |
| |
| # settings |
| settings = {} |
| |
| # cached data from API |
| device_interface_cache = {} |
| device_services_cache = {} |
| interface_mac_cache = {} |
| |
| # parent prefixes |
| parent_prefixes = {} |
| |
| |
| def parse_nb_args(): |
| """ |
| parse CLI arguments |
| """ |
| |
| parser = argparse.ArgumentParser(description="NetBox Edge Config") |
| |
| # Positional args |
| parser.add_argument( |
| "settings", |
| type=argparse.FileType("r"), |
| help="YAML ansible inventory file w/NetBox API token", |
| ) |
| |
| parser.add_argument( |
| "--debug", action="store_true", help="Print additional debugging information" |
| ) |
| |
| return parser.parse_args() |
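
# The settings file is expected to be an Ansible-style YAML inventory providing
# at least the keys this script reads: api_endpoint, token, validate_certs and
# ip_prefixes. A minimal sketch (values are hypothetical); the script is run as
# "./netbox_edgeconfig.py <settings.yaml>" and prints the result to stdout:
#
#   api_endpoint: "https://netbox.example.com/"
#   token: "0123456789abcdef0123456789abcdef01234567"
#   validate_certs: true
#   ip_prefixes:
#     - "10.0.0.0/24"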
| |
| |
| def json_api_get( |
| url, |
| headers, |
| data=None, |
| trim_prefix=False, |
| allow_failure=False, |
| validate_certs=False, |
| ): |
| """ |
| Call JSON API endpoint, return data as a dict |
| """ |
| |
| logger.debug("json_api_get url: %s", url) |
| |
| # if data included, encode it as JSON |
| if data: |
| data_enc = str(json.dumps(data)).encode("utf-8") |
| |
| request = urllib.request.Request(url, data=data_enc, method="POST") |
| request.add_header("Content-Type", "application/json; charset=UTF-8") |
| else: |
| request = urllib.request.Request(url) |
| |
| # add headers tuples |
| for header in headers: |
| request.add_header(*header) |
| |
| try: |
| |
| if validate_certs: |
| response = urllib.request.urlopen(request) |
| |
| else: |
| ctx = ssl.create_default_context() |
| ctx.check_hostname = False |
| ctx.verify_mode = ssl.CERT_NONE |
| |
| response = urllib.request.urlopen(request, context=ctx) |
| |
| except urllib.error.HTTPError: |
| # asking for data that doesn't exist results in a 404, just return nothing |
| if allow_failure: |
| return None |
| logger.exception("Server encountered an HTTPError at URL: '%s'", url) |
| except urllib.error.URLError: |
| logger.exception("An URLError occurred at URL: '%s'", url) |
| else: |
| # docs: https://docs.python.org/3/library/json.html |
| jsondata = response.read() |
| logger.debug("API response: %s", jsondata) |
| |
| try: |
| data = json.loads(jsondata) |
| except json.decoder.JSONDecodeError: |
| # allow return of no data |
| if allow_failure: |
| return None |
| logger.exception("Unable to decode JSON") |
| else: |
| logger.debug("JSON decoded: %s", data) |
| |
| return data |
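
# Example call (hypothetical URL and token; headers is a list of (name, value)
# tuples, built in the main block below):
#
#   devices = json_api_get(
#       "https://netbox.example.com/api/dcim/devices/",
#       [("Authorization", "Token 0123456789abcdef")],
#   )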
| |
| |
def create_dns_zone(extension, devs, parent_devs=None):
    # build DNS records (A/PTR, CNAME, NS, SRV, TXT) for the devices in a prefix
    if parent_devs is None:
        parent_devs = {}
| |
| a_recs = {} # PTR records created by inverting this |
| cname_recs = {} |
| srv_recs = {} |
| ns_recs = [] |
| txt_recs = {} |
| |
    # scan through devs: build an A record from each device name and extension,
    # and treat any assigned dns_name as a CNAME to it
| for name, value in devs.items(): |
| |
| # add DNS entries for every DHCP host if there's a DHCP range |
| # DHCP addresses are of the form dhcp###.extension |
| if name == "prefix_dhcp": |
| for ip in netaddr.IPNetwork(value["dhcp_range"]).iter_hosts(): |
| a_recs["dhcp%03d" % (ip.words[3])] = str(ip) |
| |
| continue |
| |
| # require DNS names to only use ASCII characters (alphanumeric, lowercase, with dash/period) |
| # _'s are used in SRV/TXT records, but in general use aren't recommended |
| dns_name = re.sub("[^a-z0-9.-]", "-", name, 0, re.ASCII) |
| |
| # Add as an A record (and inverse, PTR record), only if it's a new name |
| if dns_name not in a_recs: |
| a_recs[dns_name] = value["ip4"] |
| else: |
| # most likely a data entry error |
| logger.warning( |
| "Duplicate DNS name '%s' for devices at IP: '%s' and '%s', ignoring", |
| dns_name, |
| a_recs[dns_name], |
| value["ip4"], |
| ) |
| continue |
| |
        # if the IP address record carries a DNS name, treat it as a CNAME to the device name
| if value["dns_name"]: |
| |
| if re.search("%s$" % extension, value["dns_name"]): |
| |
| # strip off the extension, and add as a CNAME |
| dns_cname = value["dns_name"].split(".%s" % extension)[0] |
| |
| elif "." in value["dns_name"]: |
| logger.warning( |
| "Device '%s' has a IP assigned DNS name '%s' outside the prefix extension: '%s', ignoring", |
| name, |
| value["dns_name"], |
| extension, |
| ) |
| continue |
| |
| else: |
| dns_cname = value["dns_name"] |
| |
| if dns_cname == dns_name: |
| logger.warning( |
| "DNS Name field '%s' is identical to device name '%s', ignoring", |
| value["dns_name"], |
| dns_name, |
| ) |
| else: |
| cname_recs[dns_cname] = "%s.%s." % (dns_name, extension) |
| |
| # Add services as cnames, and possibly ns records |
| for svc in value["services"]: |
| |
| # only add service if it uses the IP of the host |
| if value["ip4"] in svc["ip4s"]: |
| cname_recs[svc["name"]] = "%s.%s." % (dns_name, extension) |
| |
| if svc["port"] == 53 and svc["protocol"] == "udp": |
| ns_recs.append("%s.%s." % (dns_name, extension)) |
| |
| # iterate over the parent devs to add additional nameservers |
| for pname, pval in parent_devs.items(): |
| if "services" in pval: |
| for svc in pval["services"]: |
| # look for DNS servers |
| if svc["port"] == 53 and svc["protocol"] == "udp": |
| # make name |
| dns_name = re.sub("[^a-z0-9.-]", "-", pname, 0, re.ASCII) |
| |
                    # add an A record for this nameserver (its IP may be outside this prefix)
| a_recs[dns_name] = pval["ip4"] |
| |
| # add a NS record if it doesn't already exist |
| ns_name = "%s.%s." % (dns_name, extension) |
| if ns_name not in ns_recs: |
| ns_recs.append(ns_name) |
| |
| return { |
| "a": a_recs, |
| "cname": cname_recs, |
| "ns": ns_recs, |
| "srv": srv_recs, |
| "txt": txt_recs, |
| } |
| |
| |
def create_dhcp_subnet(prefix, prefix_search, devs, parent_devs=None):
    # build DHCP subnet configuration (hosts, range, routers, DNS/TFTP servers)
    if parent_devs is None:
        parent_devs = {}
| |
| subnet = {} |
| |
| subnet["subnet"] = prefix |
| subnet["dns_search"] = [prefix_search] |
| |
| def dhcp_iterate(devs): |
| # inner function to iterate over a dev list |
| ihosts = [] |
| idyn_range = None |
| irouter = [] |
| idns_servers = [] |
| itftpd_server = None |
| |
| for name, value in devs.items(): |
| |
| # handle a DHCP range |
| if name == "prefix_dhcp": |
| idyn_range = value["dhcp_range"] |
| continue |
| |
| # handle a router reservation |
| if name == "router": |
| ir = {"ip": value["ip4"]} |
| if ( |
| "rfc3442routes" in value["custom_fields"] |
| and value["custom_fields"]["rfc3442routes"] |
| ): |
| ir["rfc3442routes"] = value["custom_fields"]["rfc3442routes"].split( |
| "," |
| ) |
| |
| irouter.append(ir) |
| continue |
| |
| # has a MAC address, and it's not null |
| if "macaddr" in value and value["macaddr"]: |
| |
| ihosts.append( |
| { |
| "name": name, |
| "ip_addr": value["ip4"], |
| "mac_addr": value["macaddr"].lower(), |
| } |
| ) |
| |
| # Add dns based on service entries |
| if "services" in value: |
| for svc in value["services"]: |
| |
| # add DNS server |
| if svc["port"] == 53 and svc["protocol"] == "udp": |
| idns_servers.append(value["ip4"]) |
| |
| # add tftp server |
| if svc["port"] == 69 and svc["protocol"] == "udp": |
| itftpd_server = value["ip4"] |
| |
| return (ihosts, idyn_range, irouter, idns_servers, itftpd_server) |
| |
| # run inner function and build |
| hosts, dyn_range, router, dns_servers, tftpd_server = dhcp_iterate(devs) |
| |
    # hosts and the dynamic range always come from this prefix's own devices
| subnet["hosts"] = hosts |
| subnet["range"] = dyn_range |
| |
| # only assign router if specified |
| if router: |
| subnet["routers"] = router |
| |
| # find parent prefix devices, to fill in where needed |
| phosts, pdyn_range, prouter, pdns_servers, ptftpd_server = dhcp_iterate(parent_devs) |
| |
    # fall back to parent prefix devices when DNS/TFTP services aren't found within this prefix
| if dns_servers: |
| subnet["dns_servers"] = dns_servers |
| else: |
| subnet["dns_servers"] = pdns_servers |
| |
| if tftpd_server: |
| subnet["tftpd_server"] = tftpd_server |
| else: |
| subnet["tftpd_server"] = ptftpd_server |
| |
| return subnet |
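
# The returned subnet dict is appended to dhcpd_subnets in the output YAML.
# A rough sketch of its shape (values are hypothetical):
#
#   {
#       "subnet": "10.0.0.0/24",
#       "dns_search": ["example.org"],
#       "hosts": [{"name": "node1", "ip_addr": "10.0.0.3",
#                  "mac_addr": "aa:bb:cc:dd:ee:ff"}],
#       "range": "10.0.0.129/25",
#       "routers": [{"ip": "10.0.0.1"}],
#       "dns_servers": ["10.0.0.2"],
#       "tftpd_server": "10.0.0.2",
#   }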
| |
| |
| def find_dhcpd_interface(prefix, devs): |
    # the DHCPd interface is the one holding the first usable IP in the prefix
| |
| first_ip = str(netaddr.IPAddress(netaddr.IPNetwork(prefix).first + 1)) |
| |
| # look for interface corresponding to first IP address in range |
| for name, value in devs.items(): |
| if "ip4" in value: |
| if value["ip4"] == first_ip: |
| return value["iface"] |
| |
| # if interface not found, return None and ignore |
| return None |
| |
| |
| def get_device_services(device_id, filters=""): |
| |
| if device_id in device_services_cache: |
| return device_services_cache[device_id] |
| |
| # get services info |
| url = "%s%s" % ( |
| settings["api_endpoint"], |
| "api/ipam/services/?device_id=%s%s" % (device_id, filters), |
| ) |
| |
| raw_svcs = json_api_get(url, headers, validate_certs=settings["validate_certs"]) |
| |
| services = [] |
| |
| for rsvc in raw_svcs["results"]: |
| |
| svc = {} |
| |
| svc["name"] = rsvc["name"] |
| svc["description"] = rsvc["description"] |
| svc["port"] = rsvc["port"] |
| svc["protocol"] = rsvc["protocol"]["value"] |
| svc["ip4s"] = [] |
| |
| for ip in rsvc["ipaddresses"]: |
| svc["ip4s"].append(str(netaddr.IPNetwork(ip["address"]).ip)) |
| |
| services.append(svc) |
| |
| device_services_cache[device_id] = services |
| return services |
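
# Each entry in the returned list is a flattened NetBox service record, e.g.
# (hypothetical values):
#
#   {"name": "dns", "description": "", "port": 53, "protocol": "udp",
#    "ip4s": ["10.0.0.2"]}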
| |
| |
| def get_interface_mac_addr(interface_id): |
    # return the interface's MAC address, or None if undefined
| if interface_id in interface_mac_cache: |
| return interface_mac_cache[interface_id] |
| |
| # get the interface info |
| url = "%s%s" % (settings["api_endpoint"], "api/dcim/interfaces/%s/" % interface_id) |
| |
| iface = json_api_get(url, headers, validate_certs=settings["validate_certs"]) |
| |
| if iface["mac_address"]: |
| interface_mac_cache[interface_id] = iface["mac_address"] |
| return iface["mac_address"] |
| |
| interface_mac_cache[interface_id] = None |
| return None |
| |
| |
| def get_device_interfaces(device_id, filters=""): |
| |
| if device_id in device_interface_cache: |
| return device_interface_cache[device_id] |
| |
| url = "%s%s" % ( |
| settings["api_endpoint"], |
| "api/dcim/interfaces/?device_id=%s%s&mgmt_only=true" % (device_id, filters), |
| ) |
| |
| logger.debug("raw_ifaces_url: %s", url) |
| |
| raw_ifaces = json_api_get(url, headers, validate_certs=settings["validate_certs"]) |
| |
| logger.debug("raw_ifaces: %s", raw_ifaces) |
| |
| ifaces = [] |
| |
| for raw_iface in raw_ifaces["results"]: |
| |
| iface = {} |
| |
| iface["name"] = raw_iface["name"] |
| iface["macaddr"] = raw_iface["mac_address"] |
| iface["mgmt_only"] = raw_iface["mgmt_only"] |
| iface["description"] = raw_iface["description"] |
| |
| if raw_iface["count_ipaddresses"]: |
| url = "%s%s" % ( |
| settings["api_endpoint"], |
| "api/ipam/ip-addresses/?interface_id=%s" % raw_iface["id"], |
| ) |
| |
| raw_ip = json_api_get( |
| url, headers, validate_certs=settings["validate_certs"] |
| ) |
| |
| iface["ip4"] = str(netaddr.IPNetwork(raw_ip["results"][0]["address"]).ip) |
| |
| ifaces.append(iface) |
| |
| device_interface_cache[device_id] = ifaces |
| return ifaces |
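
# Each entry in the returned list describes one mgmt-only interface; "ip4" is
# only present when the interface has an IP address assigned. A hypothetical
# example:
#
#   {"name": "eth0", "macaddr": "aa:bb:cc:dd:ee:ff", "mgmt_only": True,
#    "description": "", "ip4": "10.0.0.5"}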
| |
| |
| def get_prefix_devices(prefix, filters=""): |
| |
    # get all IP addresses (and their assigned devices) within the prefix
| url = "%s%s" % ( |
| settings["api_endpoint"], |
| "api/ipam/ip-addresses/?parent=%s%s" % (prefix, filters), |
| ) |
| |
| raw_ips = json_api_get(url, headers, validate_certs=settings["validate_certs"]) |
| |
| logger.debug("raw_ips: %s", raw_ips) |
| |
| devs = {} |
| |
| # iterate by IP, sorted |
| for ip in sorted(raw_ips["results"], key=lambda k: k["address"]): |
| |
| logger.debug("ip: %s", ip) |
| |
| # if it's a DHCP range, add that range to the dev list as prefix_dhcp |
| if ip["status"]["value"] == "dhcp": |
| devs["prefix_dhcp"] = {"dhcp_range": ip["address"]} |
| continue |
| |
| # if it's a reserved IP |
| if ip["status"]["value"] == "reserved": |
| res = {} |
| |
| res["type"] = "reserved" |
| res["description"] = ip["description"] |
| res["ip4"] = str(netaddr.IPNetwork(ip["address"]).ip) |
| res["dns_name"] = ip["dns_name"] if "dns_name" in ip else "None" |
| res["services"] = {} |
| res["custom_fields"] = ip["custom_fields"] |
| |
| resname = res["description"].lower().split(" ")[0] |
| |
| devs[resname] = res |
| continue |
| |
| aotype = ip["assigned_object_type"] |
| |
| # don't handle VM's yet |
| if aotype == "virtualization.vminterface": |
| continue |
| |
| dev = {} |
| |
| dev["type"] = "device" |
| dev["ip4"] = str(netaddr.IPNetwork(ip["address"]).ip) |
| dev["macaddr"] = get_interface_mac_addr(ip["assigned_object"]["id"]) |
| |
| ifaces = get_device_interfaces(ip["assigned_object"]["device"]["id"]) |
| |
| if ifaces and dev["ip4"] == ifaces[0]["ip4"]: # this is a mgmt IP |
| devname = "%s-%s" % ( |
| ip["assigned_object"]["device"]["name"].lower().split(".")[0], |
| ifaces[0]["name"], |
| ) |
| dev["iface"] = ip["assigned_object"]["name"] |
| dev["dns_name"] = "" |
| dev["services"] = [] |
| |
| else: # this is a primary IP |
| |
| name = ip["assigned_object"]["device"]["name"] |
| devname = name.lower().split(".")[0] |
| |
| dev["iface"] = ip["assigned_object"]["name"] |
| dev["dns_name"] = ip["dns_name"] if "dns_name" in ip else "None" |
| dev["services"] = get_device_services(ip["assigned_object"]["device"]["id"]) |
| |
| # fix multihomed devices in same IP range |
| # FIXME: Does not handle > 2 connections properly |
| if devname in devs: |
| devs["%s-1" % devname] = devs.pop(devname) |
| devs["%s-2" % devname] = dev |
| else: |
| devs[devname] = dev |
| |
| return devs |
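
# The returned devs dict is keyed by derived device/reservation names, and the
# values carry the fields consumed by create_dns_zone() and
# create_dhcp_subnet(). A rough sketch (hypothetical values):
#
#   {
#       "prefix_dhcp": {"dhcp_range": "10.0.0.129/25"},
#       "router": {"type": "reserved", "ip4": "10.0.0.1", "dns_name": "",
#                  "services": {}, "custom_fields": {}},
#       "node1": {"type": "device", "ip4": "10.0.0.3",
#                 "macaddr": "aa:bb:cc:dd:ee:ff", "iface": "eth0",
#                 "dns_name": "", "services": []},
#   }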
| |
| |
| def get_parent_prefix(child_prefix): |
| # returns a parent prefix given a child prefix |
| # FIXME: only returns the first found prefix, so doesn't handle more than 2 layers of hierarchy |
| |
    # find all prefixes that contain the child prefix
| url = "%s%s" % ( |
| settings["api_endpoint"], |
| "api/ipam/prefixes/?contains=%s" % child_prefix, |
| ) |
| |
| raw_prefixes = json_api_get(url, headers, validate_certs=settings["validate_certs"]) |
| |
| logger.debug(raw_prefixes) |
| |
| for prefix in raw_prefixes["results"]: |
| if prefix["prefix"] != child_prefix: |
| return prefix["prefix"] |
| |
| return None |
| |
| |
| def get_prefix_data(prefix): |
| |
    # get data for this specific prefix
| url = "%s%s" % (settings["api_endpoint"], "api/ipam/prefixes/?prefix=%s" % prefix) |
| |
| raw_prefix = json_api_get(url, headers, validate_certs=settings["validate_certs"]) |
| |
| logger.debug("raw_prefix: %s", raw_prefix) |
| |
| return raw_prefix["results"][0] |
| |
| |
# script entry point
| if __name__ == "__main__": |
| |
| args = parse_nb_args() |
| |
| # only print log messages if debugging |
| if args.debug: |
| logger.setLevel(logging.DEBUG) |
| else: |
| logger.setLevel(logging.INFO) |
| |
| # load settings from yaml file |
| settings = yaml.safe_load(args.settings.read()) |
| |
| yaml_out = {} |
| |
| # load default config |
| with open( |
| os.path.join( |
| os.path.dirname(os.path.realpath(__file__)), "base_edgeconfig.yaml" |
| ) |
| ) as defconfig: |
| yaml_out = yaml.safe_load(defconfig) |
| |
| logger.debug("settings: %s" % settings) |
| |
| # global, so this isn't run multiple times |
| headers = [ |
| ("Authorization", "Token %s" % settings["token"]), |
| ] |
| |
| # create structure from extracted data |
| dns_zones = {} |
| dns_rev_zones = {} |
| dhcpd_subnets = [] |
| dhcpd_interfaces = [] |
| devs_per_prefix = {} |
| prefixes = {} |
| parent_prefixes = {} |
| |
| for prefix in settings["ip_prefixes"]: |
| |
| prefix_data = get_prefix_data(prefix) |
| |
| parent_prefix = get_parent_prefix(prefix) |
| prefix_data["parent"] = parent_prefix |
| |
| pdevs = {} |
| if parent_prefix: |
| if parent_prefix in parent_prefixes: |
| pdevs = devs_per_prefix[parent_prefix] |
| else: |
| pdevs = get_prefix_devices(parent_prefix) |
| devs_per_prefix[parent_prefix] = pdevs |
| |
| prefix_data["parent_devs"] = pdevs |
| |
| prefixes[prefix] = prefix_data |
| |
| prefix_domain_extension = prefix_data["description"] |
| |
| devs = get_prefix_devices(prefix) |
| |
| devs_per_prefix[prefix] = devs |
| |
| dns_zones[prefix_domain_extension] = create_dns_zone( |
| prefix_domain_extension, devs, pdevs |
| ) |
| |
| dns_zones[prefix_domain_extension]["ip_range"] = prefix |
| |
| dhcpd_subnets.append( |
| create_dhcp_subnet(prefix, prefix_domain_extension, devs, pdevs) |
| ) |
| |
| dhcpd_if = find_dhcpd_interface(prefix, devs) |
| |
| if dhcpd_if and dhcpd_if not in dhcpd_interfaces: |
| dhcpd_interfaces.append(dhcpd_if) |
| |
| yaml_out.update( |
| { |
| "dns_zones": dns_zones, |
| "dns_rev_zones": dns_rev_zones, |
| "dhcpd_subnets": dhcpd_subnets, |
| "dhcpd_interfaces": dhcpd_interfaces, |
| # the below are useful when debugging |
| # "devs_per_prefix": devs_per_prefix, |
| # "prefixes": prefixes, |
| } |
| ) |
| |
| print(yaml.safe_dump(yaml_out, indent=2)) |