| #!/usr/bin/env python3 |
| |
| # SPDX-FileCopyrightText: © 2021 Open Networking Foundation <support@opennetworking.org> |
| # SPDX-License-Identifier: Apache-2.0 |
| |
| # nbhelper.py |
| # Helper functions for building YAML output from Netbox API calls |
| |
| from __future__ import absolute_import |
| |
| import re |
| import sys |
| import argparse |
| import logging |
| import netaddr |
| import pynetbox |
| import requests |
| |
| from ruamel import yaml |
| |
| # create shared logger |
| logging.basicConfig() |
| logger = logging.getLogger("nbh") |
| |
| # to dump YAML properly, using internal representers |
| # see also: |
| # https://stackoverflow.com/questions/54378220/declare-data-type-to-ruamel-yaml-so-that-it-can-represen-serialize-it |
| # https://sourceforge.net/p/ruamel-yaml/code/ci/default/tree/representer.py |
| |
| ydump = yaml.YAML(typ="safe") |
| ydump.representer.add_representer( |
| pynetbox.models.dcim.Devices, yaml.SafeRepresenter.represent_dict |
| ) |
| ydump.representer.add_representer( |
| pynetbox.models.dcim.Interfaces, yaml.SafeRepresenter.represent_dict |
| ) |
| ydump.representer.add_representer( |
| pynetbox.models.ipam.Prefixes, yaml.SafeRepresenter.represent_dict |
| ) |
| ydump.representer.add_representer( |
| pynetbox.core.response.Record, yaml.SafeRepresenter.represent_dict |
| ) |
| ydump.representer.add_representer( |
| pynetbox.models.ipam.IpAddresses, yaml.SafeRepresenter.represent_dict |
| ) |
| ydump.representer.add_representer( |
| pynetbox.core.api.Api, yaml.SafeRepresenter.represent_none |
| ) |
| |
| |
| def parse_cli_args(extra_args={}): |
| """ |
| parse CLI arguments. Can add extra arguments with a option:kwargs dict |
| """ |
| |
| parser = argparse.ArgumentParser(description="Netbox") |
| |
| # Positional args |
| parser.add_argument( |
| "settings", |
| type=argparse.FileType("r"), |
| help="YAML Ansible inventory file w/NetBox API token", |
| ) |
| |
| parser.add_argument( |
| "--debug", action="store_true", help="Print additional debugging information" |
| ) |
| |
| if extra_args: |
| for ename, ekwargs in extra_args.items(): |
| parser.add_argument(ename, **ekwargs) |
| |
| args = parser.parse_args() |
| |
| # only print log messages if debugging |
| if args.debug: |
| logger.setLevel(logging.DEBUG) |
| else: |
| logger.setLevel(logging.INFO) |
| |
| return args |
| |
| |
| class AttrDict(dict): |
| def __init__(self, *args, **kwargs): |
| super(AttrDict, self).__init__(*args, **kwargs) |
| self.__dict__ = self |
| |
| |
| class NBHelper: |
| def __init__(self, args): |
| |
| self.settings = yaml.safe_load(args.settings.read()) |
| |
| self.nbapi = pynetbox.api( |
| self.settings["api_endpoint"], token=self.settings["token"], threading=True, |
| ) |
| |
| if not self.settings["validate_certs"]: |
| |
| session = requests.Session() |
| session.verify = False |
| self.nbapi.http_session = session |
| |
| self.nb_version = self.nbapi.version |
| |
| def all_prefixes(self): |
| """ |
| Return a list of prefix objects |
| """ |
| |
| p_items = [] |
| |
| segments = 1 |
| |
| if "prefix_segments" in self.settings: |
| segments = self.settings["prefix_segments"] |
| |
| for prefix in self.settings["ip_prefixes"]: |
| p_items.append(NBPrefix.get_prefix(self.nbapi, prefix, segments)) |
| |
| return p_items |
| |
| @classmethod |
| def check_name_dns(cls, name): |
| |
| badchars = re.search("[^a-z0-9.-]", name.lower(), re.ASCII) |
| |
| if badchars: |
| logger.error( |
| "DNS name '%s' has one or more invalid characters: '%s'", |
| name, |
| badchars.group(0), |
| ) |
| sys.exit(1) |
| |
| return name.lower() |
| |
| @classmethod |
| def clean_name_dns(cls, name): |
| return re.sub("[^a-z0-9.-]", "-", name.lower(), 0, re.ASCII) |
| |
| |
| @yaml.yaml_object(ydump) |
| class NBPrefix: |
| |
| prefixes = {} |
| |
| def __init__(self, api, prefix, name_segments): |
| |
| self.nbapi = api |
| self.prefix = prefix |
| self.name_segments = name_segments |
| |
| # get prefix information |
| self.prefix_data = self.nbapi.ipam.prefixes.get(prefix=self.prefix) |
| self.domain_extension = NBHelper.check_name_dns(self.prefix_data.description) |
| |
| logger.debug( |
| "prefix %s, domain_extension %s, data: %s", |
| self.prefix, |
| self.domain_extension, |
| dict(self.prefix_data), |
| ) |
| |
| # ip centric info |
| self.dhcp_range = None |
| self.reserved_ips = {} |
| self.aos = {} |
| |
| # build item lists |
| self.build_prefix() |
| |
| @classmethod |
| def all_prefixes(cls): |
| return cls.prefixes |
| |
| @classmethod |
| def get_prefix(cls, api, prefix, name_segments=1): |
| if prefix in cls.prefixes: |
| return cls.prefixes[prefix] |
| |
| return NBPrefix(api, prefix, name_segments) |
| |
| def __repr__(self): |
| return str(self.prefix) |
| |
| @classmethod |
| def to_yaml(cls, representer, node): |
| return representer.represent_dict( |
| { |
| "dhcp_range": node.dhcp_range, |
| "reserved_ips": node.reserved_ips, |
| "aos": node.aos, |
| "prefix_data": dict(node.prefix_data), |
| } |
| ) |
| |
| def parent(self): |
| """ |
| Get the parent prefix to this prefix |
| |
| FIXME: Doesn't handle multiple layers of prefixes, returns first found |
| """ |
| |
| # get all parents of this prefix (include self) |
| possible_parents = self.nbapi.ipam.prefixes.filter(contains=self.prefix) |
| |
| logger.debug("Prefix %s: possible parents %s", self.prefix, possible_parents) |
| |
| # filter out self, return first found |
| for pparent in possible_parents: |
| if pparent.prefix != self.prefix: |
| return NBPrefix.get_prefix( |
| self.nbapi, pparent.prefix, self.name_segments |
| ) |
| |
| return None |
| |
| def build_prefix(self): |
| """ |
| find ip information for items (devices/vms, reserved_ips, dhcp_range) in prefix |
| """ |
| |
| ips = self.nbapi.ipam.ip_addresses.filter(parent=self.prefix) |
| |
| for ip in sorted(ips, key=lambda k: k["address"]): |
| |
| logger.debug("prefix_item ip: %s, data: %s", ip, dict(ip)) |
| |
| # if it's a DHCP range, add that range to the dev list as prefix_dhcp |
| if ip.status.value == "dhcp": |
| self.dhcp_range = str(ip.address) |
| continue |
| |
| # reserved IPs |
| if ip.status.value == "reserved": |
| |
| res = {} |
| res["name"] = ip.description.lower().split(" ")[0] |
| res["description"] = ip.description |
| res["ip4"] = str(netaddr.IPNetwork(ip.address)) |
| res["custom_fields"] = ip.custom_fields |
| |
| self.reserved_ips[str(ip)] = res |
| continue |
| |
| # devices and VMs |
| if ip.assigned_object: # can be null if not assigned to a device/vm |
| aotype = ip.assigned_object_type |
| |
| if aotype == "dcim.interface": |
| |
| self.aos[str(ip)] = NBDevice.get_dev( |
| self.nbapi, ip.assigned_object.device.id, |
| ) |
| |
| elif aotype == "virtualization.vminterface": |
| self.aos[str(ip)] = NBVirtualMachine.get_vm( |
| self.nbapi, ip.assigned_object.virtual_machine.id, |
| ) |
| |
| else: |
| logger.error("IP %s has unknown device type: %s", ip, aotype) |
| sys.exit(1) |
| |
| else: |
| logger.warning("Unknown IP type %s, with attributes: %s", ip, dict(ip)) |
| |
| |
| @yaml.yaml_object(ydump) |
| class NBAssignedObject: |
| """ |
| Assigned Object is either a Device or Virtual Machine, which function |
| nearly identically in the NetBox data model. |
| |
| This parent class holds common functions for those two child classes |
| """ |
| |
| def __init__(self, api): |
| self.nbapi = api |
| |
| def dns_name(self, ip, prefix): |
| """ |
| Returns the DNS name for the device at this IP in the prefix |
| """ |
| |
| def first_segment_suffix(split_name, suffixes, segments): |
| first_seg = "-".join([split_name[0], *suffixes]) |
| |
| if segments > 1: |
| name = ".".join([first_seg, *split_name[1:segments]]) |
| else: |
| name = first_seg |
| |
| return name |
| |
| # clean/split the device name |
| name_split = NBHelper.clean_name_dns(self.data.name).split(".") |
| |
| # always add interface suffix to mgmt interfaces |
| if self.interfaces_by_ip[ip].mgmt_only: |
| return first_segment_suffix( |
| name_split, [self.interfaces_by_ip[ip].name], prefix.name_segments |
| ) |
| |
| # find all IP's for this device in the prefix that aren't mgmt interfaces |
| prefix_ips = [] |
| for s_ip in self.ips: |
| if s_ip in prefix.aos and not self.interfaces_by_ip[s_ip].mgmt_only: |
| prefix_ips.append(s_ip) |
| |
| # name to use when only one IP address for device in a prefix |
| simple_name = ".".join(name_split[0 : prefix.name_segments]) |
| |
| # if more than one non-mgmt IP in prefix |
| if len(prefix_ips) > 1: |
| |
| # use bare name if primary IP address |
| try: # skip if no primary_ip.address |
| if ip == self.data.primary_ip.address: |
| return simple_name |
| except AttributeError: |
| pass |
| |
| # else, suffix with the interface name, and the last octet of IP address |
| return first_segment_suffix( |
| name_split, |
| [ |
| self.interfaces_by_ip[ip].name, |
| str(netaddr.IPNetwork(ip).ip.words[3]), |
| ], |
| prefix.name_segments, |
| ) |
| |
| # simplest case - only one IP in prefix, return simple_name |
| return simple_name |
| |
| def dns_cnames(self, ip): |
| """ |
| returns a list of cnames for this object, based on IP matches |
| """ |
| |
| cnames = [] |
| |
| for service in self.services: |
| |
| # if not assigned to any IP's, service is on all IPs |
| if not service.ipaddresses: |
| cnames.append(service.name) |
| continue |
| |
| # If assigned to an IP, only create a CNAME on that IP |
| for service_ip in service.ipaddresses: |
| if ip == service_ip.address: |
| cnames.append(service.name) |
| |
| return cnames |
| |
| def has_service(self, cidr_ip, port, protocol): |
| """ |
| Return True if this AO has a service using specific port and protocol combination |
| """ |
| |
| if ( |
| cidr_ip in self.interfaces_by_ip |
| and not self.interfaces_by_ip[cidr_ip].mgmt_only |
| ): |
| for service in self.services: |
| if service.port == port and service.protocol.value == protocol: |
| return True |
| |
| return False |
| |
| def primary_iface(self): |
| """ |
| Returns the interface data for the device that has the primary_ip |
| """ |
| |
| if self.data["primary_ip"]: |
| return self.interfaces_by_ip[self.data["primary_ip"]["address"]] |
| |
| return None |
| |
| |
| @yaml.yaml_object(ydump) |
| class NBDevice(NBAssignedObject): |
| """ |
| Wraps a single Netbox device |
| Also caches all known devices in a class variable (devs) |
| """ |
| |
| devs = {} |
| |
| def __init__(self, api, dev_id): |
| |
| super().__init__(api) |
| |
| self.id = dev_id |
| self.data = self.nbapi.dcim.devices.get(dev_id) |
| self.services = self.nbapi.ipam.services.filter(device_id=dev_id) |
| |
| # not filled in unless specifically asked for (expensive for a 48 port switch) |
| self.interfaces = [] |
| self.mgmt_interfaces = [] |
| |
| # look up all IP's for this device |
| self.ips = { |
| str(ip): ip for ip in self.nbapi.ipam.ip_addresses.filter(device_id=dev_id) |
| } |
| |
| # look up interfaces by IP |
| self.interfaces_by_ip = {} |
| for ip, ip_data in self.ips.items(): |
| if ip_data.assigned_object: |
| self.interfaces_by_ip[ip] = self.nbapi.dcim.interfaces.get( |
| ip_data.assigned_object_id |
| ) |
| |
| logger.debug( |
| "NBDevice id: %d, data: %s, ips: %s", self.id, dict(self.data), self.ips, |
| ) |
| |
| self.devs[dev_id] = self |
| |
| def __repr__(self): |
| return str(dict(self.data)) |
| |
| def get_interfaces(self): |
| if not self.interfaces: |
| self.interfaces = self.nbapi.dcim.interfaces.filter(device_id=self.id) |
| |
| return self.interfaces |
| |
| @classmethod |
| def get_dev(cls, api, dev_id): |
| if dev_id in cls.devs: |
| return cls.devs[dev_id] |
| |
| return NBDevice(api, dev_id) |
| |
| @classmethod |
| def all_devs(cls): |
| return cls.devs |
| |
| @classmethod |
| def to_yaml(cls, representer, node): |
| return representer.represent_dict( |
| { |
| "data": node.data, |
| "services": node.services, |
| "ips": node.ips, |
| "interfaces_by_ip": node.interfaces_by_ip, |
| } |
| ) |
| |
| |
| @yaml.yaml_object(ydump) |
| class NBVirtualMachine(NBAssignedObject): |
| """ |
| VM equivalent of NBDevice |
| """ |
| |
| vms = {} |
| |
| def __init__(self, api, vm_id): |
| |
| super().__init__(api) |
| |
| self.id = vm_id |
| self.data = self.nbapi.virtualization.virtual_machines.get(vm_id) |
| self.services = self.nbapi.ipam.services.filter(virtual_machine_id=vm_id) |
| |
| # not filled in unless specifically asked for |
| self.interfaces = [] |
| |
| # look up all IP's for this device |
| self.ips = { |
| str(ip): ip |
| for ip in self.nbapi.ipam.ip_addresses.filter(virtual_machine_id=vm_id) |
| } |
| |
| # look up interfaces by IP |
| self.interfaces_by_ip = {} |
| for ip, ip_data in self.ips.items(): |
| if ip_data.assigned_object: |
| self.interfaces_by_ip[ip] = self.nbapi.virtualization.interfaces.get( |
| ip_data.assigned_object_id |
| ) |
| # hack as VM interfaces lack this key, and needed for services |
| self.interfaces_by_ip[ip].mgmt_only = False |
| |
| logger.debug( |
| "NBVirtualMachine id: %d, data: %s, ips: %s", |
| self.id, |
| dict(self.data), |
| self.ips, |
| ) |
| |
| self.vms[vm_id] = self |
| |
| def __repr__(self): |
| return str(dict(self.data)) |
| |
| def get_interfaces(self): |
| if not self.interfaces: |
| self.interfaces = self.nbapi.virtualization.interfaces.filter( |
| virtual_machine_id=self.id |
| ) |
| |
| return self.interfaces |
| |
| @classmethod |
| def get_vm(cls, api, vm_id): |
| if vm_id in cls.vms: |
| return cls.vms[vm_id] |
| |
| return NBVirtualMachine(api, vm_id) |
| |
| @classmethod |
| def all_vms(cls): |
| return cls.vms |
| |
| @classmethod |
| def to_yaml(cls, representer, node): |
| return representer.represent_dict( |
| { |
| "data": node.data, |
| "services": node.services, |
| "ips": node.ips, |
| "interfaces_by_ip": node.interfaces_by_ip, |
| } |
| ) |
| |
| |
| @yaml.yaml_object(ydump) |
| class NBDNSForwardZone: |
| |
| fwd_zones = {} |
| |
| def __init__(self, prefix): |
| |
| self.domain_extension = prefix.domain_extension |
| |
| self.a_recs = {} |
| self.cname_recs = {} |
| self.srv_recs = {} |
| self.ns_recs = [] |
| self.txt_recs = {} |
| |
| if prefix.dhcp_range: |
| self.create_dhcp_fwd(prefix.dhcp_range) |
| |
| for ip, ao in prefix.aos.items(): |
| self.add_ao_records(prefix, ip, ao) |
| |
| for ip, res in prefix.reserved_ips.items(): |
| self.add_reserved(ip, res) |
| |
| # reqquired for the add_fwd_cname function below |
| if callable(getattr(prefix, "parent")): |
| parent_prefix = prefix.parent() |
| |
| if parent_prefix: |
| self.merge_parent_prefix(parent_prefix, prefix) |
| |
| self.fwd_zones[self.domain_extension] = self |
| |
| def __repr__(self): |
| return str( |
| { |
| "a": self.a_recs, |
| "cname": self.cname_recs, |
| "ns": self.ns_recs, |
| "srv": self.srv_recs, |
| "txt": self.txt_recs, |
| } |
| ) |
| |
| @classmethod |
| def add_fwd_cname(cls, cname, fqdn_dest): |
| """ |
| Add an arbitrary CNAME (and possibly create the fwd zone if needed) pointing |
| at a FQDN destination name. It's used to support the per-IP "DNS name" field in NetBox |
| Note that the NS record |
| """ |
| |
| try: |
| fqdn_split = re.compile(r"([a-z]+)\.([a-z.]+)\.") |
| (short_name, extension) = fqdn_split.match(cname).groups() |
| |
| except AttributeError: |
| logger.warning( |
| "Invalid DNS CNAME: '%s', must be in FQDN format: 'host.example.com.', ignored", |
| cname, |
| ) |
| return |
| |
| fake_prefix = AttrDict( |
| { |
| "domain_extension": extension, |
| "dhcp_range": None, |
| "aos": {}, |
| "reserved_ips": {}, |
| "parent": None, |
| } |
| ) |
| |
| fwd_zone = cls.get_fwd_zone(fake_prefix) |
| |
| fwd_zone.cname_recs[short_name] = fqdn_dest |
| |
| @classmethod |
| def get_fwd_zone(cls, prefix): |
| if prefix.domain_extension in cls.fwd_zones: |
| return cls.fwd_zones[prefix.domain_extension] |
| |
| return NBDNSForwardZone(prefix) |
| |
| @classmethod |
| def all_fwd_zones(cls): |
| return cls.fwd_zones |
| |
| @classmethod |
| def to_yaml(cls, representer, node): |
| return representer.represent_dict( |
| { |
| "a": node.a_recs, |
| "cname": node.cname_recs, |
| "ns": node.ns_recs, |
| "srv": node.srv_recs, |
| "txt": node.txt_recs, |
| } |
| ) |
| |
| def fqdn(self, name): |
| return "%s.%s." % (name, self.domain_extension) |
| |
| def create_dhcp_fwd(self, dhcp_range): |
| |
| for ip in netaddr.IPNetwork(dhcp_range).iter_hosts(): |
| self.a_recs["dhcp%03d" % (ip.words[3])] = str(ip) |
| |
| def name_is_duplicate(self, name, target, record_type): |
| """ |
| Returns True if name already exists in the zone as an A or CNAME |
| record, False otherwise |
| """ |
| |
| if name in self.a_recs: |
| logger.warning( |
| "Duplicate DNS record for name %s - A record to '%s', %s record to '%s'", |
| name, |
| self.a_recs[name], |
| record_type, |
| target, |
| ) |
| return True |
| |
| if name in self.cname_recs: |
| logger.warning( |
| "Duplicate DNS record for name %s - CNAME record to '%s', %s record to '%s'", |
| name, |
| self.cname_recs[name], |
| record_type, |
| target, |
| ) |
| return True |
| |
| return False |
| |
| def add_ao_records(self, prefix, ip, ao): |
| |
| name = ao.dns_name(ip, prefix) |
| target_ip = str(netaddr.IPNetwork(ip).ip) # make bare IP, not CIDR format |
| |
| # add A records |
| if not self.name_is_duplicate(name, target_ip, "A"): |
| self.a_recs[name] = target_ip |
| |
| # add CNAME records that alias to this name |
| for cname in ao.dns_cnames(ip): |
| # check that it isn't a dupe |
| if not self.name_is_duplicate(cname, target_ip, "CNAME"): |
| self.cname_recs[cname] = self.fqdn(name) |
| |
| # add NS records if this is a DNS server |
| if ao.has_service(ip, 53, "udp"): |
| self.ns_recs.append(self.fqdn(name)) |
| |
| # if a DNS name is set, add it as a CNAME |
| if ao.ips[ip]["dns_name"]: # and ip == aos.data.primary_ip.address: |
| self.add_fwd_cname(ao.ips[ip]["dns_name"], self.fqdn(name)) |
| |
| def add_reserved(self, ip, res): |
| |
| target_ip = str(netaddr.IPNetwork(ip).ip) # make bare IP, not CIDR format |
| |
| if not self.name_is_duplicate(res["name"], target_ip, "A"): |
| self.a_recs[res["name"]] = target_ip |
| |
| def merge_parent_prefix(self, pprefix, prefix): |
| |
| # only if no NS records exist already |
| if not self.ns_recs: |
| # scan parent prefix for services |
| for ip, ao in pprefix.aos.items(): |
| |
| # Create a DNS within this prefix pointing to out-of-prefix IP |
| # where DNS server is |
| name = ao.dns_name(ip, prefix) |
| target_ip = str( |
| netaddr.IPNetwork(ip).ip |
| ) # make bare IP, not CIDR format |
| |
| # add NS records if this is a DNS server |
| if ao.has_service(ip, 53, "udp"): |
| self.a_recs[name] = target_ip |
| self.ns_recs.append(self.fqdn(name)) |
| |
| |
| @yaml.yaml_object(ydump) |
| class NBDNSReverseZones: |
| def __init__(self): |
| |
| self.reverse_zones = {} |
| |
| @classmethod |
| def to_yaml(cls, representer, node): |
| return representer.represent_dict(node.reverse_zones) |
| |
| @classmethod |
| def canonicalize_rfc1918_prefix(cls, prefix): |
| """ |
| RFC1918 prefixes need to be expanded to their widest canonical range to |
| group all reverse lookup domains together for reverse DNS with NSD/Unbound. |
| """ |
| |
| pnet = netaddr.IPNetwork(str(prefix)) |
| (o1, o2, o3, o4) = pnet.network.words # Split ipv4 octets |
| cidr_plen = pnet.prefixlen |
| |
| if o1 == 10: |
| o2 = o3 = o4 = 0 |
| cidr_plen = 8 |
| elif (o1 == 172 and o2 >= 16 and o2 <= 31) or (o1 == 192 and o2 == 168): |
| o3 = o4 = 0 |
| cidr_plen = 16 |
| |
| return "%s/%d" % (".".join(map(str, [o1, o2, o3, o4])), cidr_plen) |
| |
| def add_prefix(self, prefix): |
| |
| canonical_prefix = self.canonicalize_rfc1918_prefix(prefix) |
| |
| if canonical_prefix in self.reverse_zones: |
| rzone = self.reverse_zones[canonical_prefix] |
| else: |
| rzone = { |
| "ns": [], |
| "ptr": {}, |
| } |
| |
| if prefix.dhcp_range: |
| # FIXME: doesn't check for duplicate entries |
| rzone["ptr"].update(self.create_dhcp_rev(prefix)) |
| |
| for ip, ao in prefix.aos.items(): |
| target_ip = str(netaddr.IPNetwork(ip).ip) # make bare IP, not CIDR format |
| ao_name = self.get_ao_name(ip, ao, prefix,) |
| rzone["ptr"][target_ip] = ao_name |
| |
| # add NS records if this is a DNS server |
| if ao.has_service(ip, 53, "udp"): |
| rzone["ns"].append(ao_name) |
| |
| parent_prefix = prefix.parent() |
| |
| if parent_prefix: |
| self.merge_parent_prefix(rzone, parent_prefix) |
| |
| self.reverse_zones[canonical_prefix] = rzone |
| |
| def merge_parent_prefix(self, rzone, pprefix): |
| |
| # parent items |
| p_ns = [] |
| |
| # scan parent prefix for services |
| for ip, ao in pprefix.aos.items(): |
| |
| ao_name = self.get_ao_name(ip, ao, pprefix,) |
| |
| # add NS records if this is a DNS server |
| if ao.has_service(ip, 53, "udp"): |
| p_ns.append(ao_name) |
| |
| # set DNS servers if none in rzone |
| if not rzone["ns"]: |
| rzone["ns"] = p_ns |
| |
| def create_dhcp_rev(self, prefix): |
| |
| dhcp_rzone = {} |
| |
| for ip in netaddr.IPNetwork(prefix.dhcp_range).iter_hosts(): |
| dhcp_rzone[str(ip)] = "dhcp%03d.%s." % ( |
| ip.words[3], |
| prefix.domain_extension, |
| ) |
| |
| return dhcp_rzone |
| |
| def get_ao_name(self, ip, ao, prefix): |
| short_name = ao.dns_name(ip, prefix) |
| return "%s.%s." % (short_name, prefix.domain_extension) |
| |
| |
| @yaml.yaml_object(ydump) |
| class NBDHCPSubnet: |
| def __init__(self, prefix): |
| |
| self.domain_extension = prefix.domain_extension |
| |
| self.subnet = None |
| self.range = None |
| self.first_ip = None |
| self.hosts = [] |
| self.routers = [] |
| self.dns_servers = [] |
| self.dns_search = [] |
| self.tftpd_server = None |
| self.ntp_servers = [] |
| self.dhcpd_interface = None |
| |
| self.add_prefix(prefix) |
| |
| for ip, ao in prefix.aos.items(): |
| self.add_ao(str(ip), ao, prefix) |
| |
| parent_prefix = prefix.parent() |
| |
| if parent_prefix: |
| self.merge_parent_prefix(parent_prefix) |
| |
| def add_prefix(self, prefix): |
| |
| self.subnet = str(prefix) |
| |
| self.first_ip = str(netaddr.IPAddress(netaddr.IPNetwork(str(prefix)).first + 1)) |
| |
| self.dns_search = [prefix.domain_extension] |
| |
| if prefix.dhcp_range: |
| self.range = prefix.dhcp_range |
| |
| for ip, res in prefix.reserved_ips.items(): |
| # routers are reserved IP's that start with 'router" in the IP description |
| if re.match("router", res["description"]): |
| router = {"ip": str(netaddr.IPNetwork(ip).ip)} |
| |
| if ( |
| "rfc3442routes" in res["custom_fields"] |
| and res["custom_fields"]["rfc3442routes"] |
| ): |
| # split on whitespace |
| router["rfc3442routes"] = re.split( |
| r"\s+", res["custom_fields"]["rfc3442routes"] |
| ) |
| |
| self.routers.append(router) |
| |
| # set first IP to router if not set otherwise. |
| if not self.routers: |
| router = {"ip": self.first_ip} |
| |
| self.routers.append(router) |
| |
| def add_ao(self, ip, ao, prefix): |
| |
| target_ip = str(netaddr.IPNetwork(ip).ip) # make bare IP, not CIDR format |
| |
| # find the DHCP interface if it's this IP |
| if target_ip == self.first_ip: |
| self.dhcpd_interface = ao.interfaces_by_ip[ip].name |
| |
| name = ao.dns_name(ip, prefix) |
| |
| # add only devices that have a macaddr for this IP |
| if ip in ao.interfaces_by_ip: |
| |
| mac_addr = dict(ao.interfaces_by_ip[ip]).get("mac_address") |
| |
| if mac_addr and mac_addr.strip(): # if exists and not blank |
| self.hosts.append( |
| {"name": name, "ip_addr": target_ip, "mac_addr": mac_addr.lower(),} |
| ) |
| |
| # add dns servers |
| if ao.has_service(ip, 53, "udp"): |
| self.dns_servers.append(target_ip) |
| |
| # add tftp server |
| if ao.has_service(ip, 69, "udp"): |
| if not self.tftpd_server: |
| self.tftpd_server = target_ip |
| else: |
| logger.warning( |
| "Duplicate TFTP servers in prefix, using first of %s and %s", |
| self.tftpd_server, |
| target_ip, |
| ) |
| |
| # add NTP servers |
| if ao.has_service(ip, 123, "udp"): |
| self.ntp_servers.append(target_ip) |
| |
| def merge_parent_prefix(self, pprefix): |
| |
| # parent items |
| p_dns_servers = [] |
| p_tftpd_server = None |
| p_ntp_servers = [] |
| |
| # scan parent prefix for services |
| for ip, ao in pprefix.aos.items(): |
| |
| target_ip = str(netaddr.IPNetwork(ip).ip) |
| |
| # add dns servers |
| if ao.has_service(ip, 53, "udp"): |
| p_dns_servers.append(target_ip) |
| |
| # add tftp server |
| if ao.has_service(ip, 69, "udp"): |
| if not p_tftpd_server: |
| p_tftpd_server = target_ip |
| else: |
| logger.warning( |
| "Duplicate TFTP servers in parent prefix, using first of %s and %s", |
| p_tftpd_server, |
| target_ip, |
| ) |
| |
| # add NTP servers |
| if ao.has_service(ip, 123, "udp"): |
| p_ntp_servers.append(target_ip) |
| |
| # merge if doesn't exist in prefix |
| if not self.dns_servers: |
| self.dns_servers = p_dns_servers |
| |
| if not self.tftpd_server: |
| self.tftpd_server = p_tftpd_server |
| |
| if not self.ntp_servers: |
| self.ntp_servers = p_ntp_servers |
| |
| @classmethod |
| def to_yaml(cls, representer, node): |
| return representer.represent_dict( |
| { |
| "subnet": node.subnet, |
| "range": node.range, |
| "routers": node.routers, |
| "hosts": node.hosts, |
| "dns_servers": node.dns_servers, |
| "dns_search": node.dns_search, |
| "tftpd_server": node.tftpd_server, |
| "ntp_servers": node.ntp_servers, |
| } |
| ) |