blob: c8309ab4d0f7fa1f80e7fb8aa8edefe23ad0f38f [file] [log] [blame]
#!/usr/bin/env python3
# SPDX-FileCopyrightText: © 2021 Open Networking Foundation <support@opennetworking.org>
# SPDX-License-Identifier: Apache-2.0
# nbhelper.py
# Helper functions for building YAML output from Netbox API calls
from __future__ import absolute_import
import re
import sys
import argparse
import logging
import netaddr
import pynetbox
import requests
from ruamel import yaml
# create shared logger
logging.basicConfig()
logger = logging.getLogger("nbh")
# to dump YAML properly, using internal representers
# see also:
# https://stackoverflow.com/questions/54378220/declare-data-type-to-ruamel-yaml-so-that-it-can-represen-serialize-it
# https://sourceforge.net/p/ruamel-yaml/code/ci/default/tree/representer.py
ydump = yaml.YAML(typ="safe")
ydump.representer.add_representer(
pynetbox.models.dcim.Devices, yaml.SafeRepresenter.represent_dict
)
ydump.representer.add_representer(
pynetbox.models.dcim.Interfaces, yaml.SafeRepresenter.represent_dict
)
ydump.representer.add_representer(
pynetbox.models.ipam.Prefixes, yaml.SafeRepresenter.represent_dict
)
ydump.representer.add_representer(
pynetbox.core.response.Record, yaml.SafeRepresenter.represent_dict
)
ydump.representer.add_representer(
pynetbox.models.ipam.IpAddresses, yaml.SafeRepresenter.represent_dict
)
ydump.representer.add_representer(
pynetbox.core.api.Api, yaml.SafeRepresenter.represent_none
)
netboxapi = None
netbox_config = None
netbox_version = None
def initialize(extra_args):
global netboxapi, netbox_config, netbox_version
args = parse_cli_args(extra_args)
netbox_config = yaml.safe_load(args.settings.read())
for require_args in ["api_endpoint", "token", "tenant_name"]:
if not netbox_config.get(require_args):
logger.error("The require argument: %s was not set. Stop." % require_args)
sys.exit(1)
netboxapi = pynetbox.api(
netbox_config["api_endpoint"], token=netbox_config["token"], threading=True,
)
if not netbox_config.get("validate_certs", True):
session = requests.Session()
session.verify = False
netboxapi.http_session = session
netbox_version = netboxapi.version
return args
def parse_cli_args(extra_args={}):
"""
parse CLI arguments. Can add extra arguments with a option:kwargs dict
"""
parser = argparse.ArgumentParser(description="Netbox")
# Positional args
parser.add_argument(
"settings",
type=argparse.FileType("r"),
help="YAML Ansible inventory file w/NetBox API token",
)
parser.add_argument(
"--debug", action="store_true", help="Print additional debugging information"
)
for ename, ekwargs in extra_args.items():
parser.add_argument(ename, **ekwargs)
args = parser.parse_args()
log_level = logging.DEBUG if args.debug else logging.INFO
logger.setLevel(log_level)
return args
def check_name_dns(name):
badchars = re.search("[^a-z0-9.-]", name.lower(), re.ASCII)
if badchars:
logger.error(
"DNS name '%s' has one or more invalid characters: '%s'",
name,
badchars.group(0),
)
sys.exit(1)
return name.lower()
def clean_name_dns(name):
return re.sub("[^a-z0-9.-]", "-", name.lower(), 0, re.ASCII)
class AttrDict(dict):
def __init__(self, *args, **kwargs):
super(AttrDict, self).__init__(*args, **kwargs)
self.__dict__ = self
class NBTenant:
def __init__(self):
self.name = netbox_config["tenant_name"]
self.name_segments = netbox_config.get("prefix_segments", 1)
self.tenant = netboxapi.tenancy.tenants.get(name=self.name)
# NBTenant only keep the resources which owns by it
self.devices = list()
self.vms = list()
self.prefixes = dict()
# Get the Device and Virtual Machines from Netbox API
for device_data in netboxapi.dcim.devices.filter(tenant=self.tenant.slug):
self.devices.append(NBDevice(device_data))
for vm_data in netboxapi.virtualization.virtual_machines.filter(
tenant=self.tenant.slug
):
self.vms.append(NBVirtualMachine(vm_data))
def get_prefixes(self):
"""Get the IP Prefixes owns by current tenant"""
if self.prefixes:
return self.prefixes
vrf = netboxapi.ipam.vrfs.get(tenant=self.tenant.slug)
for prefix_data in netboxapi.ipam.prefixes.filter(vrf_id=vrf.id):
if prefix_data.description:
self.prefixes[prefix_data.description] = NBPrefix(
prefix_data, self.name_segments
)
return self.prefixes
def generate_netplan(self, name=""):
"""
Get the interface config of specific server belongs to this tenant,
If the name wasn't specified, return the management device config by default
"""
target = None
if not name:
for machine in self.devices + self.vms:
if machine.data["device_role"]["name"] == "Router":
target = machine
break
else:
for machine in self.devices + self.vms:
if machine.name == name:
target = machine
break
return target.generate_netplan()
@yaml.yaml_object(ydump)
class NBPrefix:
prefixes = {}
def __init__(self, data, name_segments):
self.data = data
self.name_segments = name_segments
self.domain_extension = check_name_dns(self.data.description)
logger.debug(
"Preix %s: domain_extension %s, data: %s",
self.data.prefix,
self.domain_extension,
dict(self.data),
)
# ip centric info
self.dhcp_range = None
self.reserved_ips = {}
self.aos = {}
# build item lists
self.build_prefix()
self.prefixes[self.data.prefix] = self
self.prefixes[self.data.prefix] = self
@classmethod
def all_prefixes(cls):
return cls.prefixes
@classmethod
def get_prefix(cls, prefix, name_segments=1):
if prefix in cls.prefixes:
return cls.prefixes[prefix]
data = netboxapi.ipam.prefixes.get(prefix=prefix)
if data:
return NBPrefix(data, name_segments)
else:
raise Exception("The prefix %s wasn't found in Netbox" % prefix)
def __repr__(self):
return str(self.data.prefix)
@classmethod
def to_yaml(cls, representer, node):
return representer.represent_dict(
{
"dhcp_range": node.dhcp_range,
"reserved_ips": node.reserved_ips,
"aos": node.aos,
"prefix_data": dict(node.prefix_data),
}
)
@classmethod
def all_reserved_by_ip(cls, ip_addr=""):
"""
all_reserved_by_ip will return all reserved IP found in prefixes
We have the IP address marked as type 'Reserved' in Prefix,
This type of IP address is using to define a DHCP range
"""
ret = list()
for prefix in cls.prefixes.values():
if ip_addr and ip_addr in prefix.aos.keys():
if prefix.reserved_ips:
return list(prefix.reserved_ips.values())
else:
if prefix.reserved_ips:
ret.extend(list(prefix.reserved_ips.values()))
return ret
def get_reserved_ips(self):
"""
Get the reserved IP range (DHCP) in prefix
We have the IP address marked as type 'Reserved' in Prefix,
This type of IP address is using to define a DHCP range
"""
if prefix.reserved_ips:
return list(prefix.reserved_ips.values())
def parent(self):
"""
Get the parent prefix to this prefix
FIXME: Doesn't handle multiple layers of prefixes, returns first found
"""
# get all parents of this prefix (include self)
possible_parents = netboxapi.ipam.prefixes.filter(contains=self.data.prefix)
logger.debug(
"Prefix %s: possible parents %s", self.data.prefix, possible_parents
)
# filter out self, return first found
for pparent in possible_parents:
if pparent.prefix != self.data.prefix:
return NBPrefix.get_prefix(pparent.prefix, self.name_segments)
return None
def build_prefix(self):
"""
find ip information for items (devices/vms, reserved_ips, dhcp_range) in prefix
"""
ips = netboxapi.ipam.ip_addresses.filter(parent=self.data.prefix)
for ip in sorted(ips, key=lambda k: k["address"]):
logger.debug("prefix_item ip: %s, data: %s", ip, dict(ip))
# if it's a DHCP range, add that range to the dev list as prefix_dhcp
if ip.status.value == "dhcp":
self.dhcp_range = str(ip.address)
continue
# reserved IPs
if ip.status.value == "reserved":
res = {}
res["name"] = ip.description.lower().split(" ")[0]
res["description"] = ip.description
res["ip4"] = str(netaddr.IPNetwork(ip.address))
res["custom_fields"] = ip.custom_fields
self.reserved_ips[str(ip)] = res
continue
# devices and VMs
if ip.assigned_object: # can be null if not assigned to a device/vm
aotype = ip.assigned_object_type
if aotype == "dcim.interface":
self.aos[str(ip)] = NBDevice.get_by_id(
ip.assigned_object.device.id,
)
elif aotype == "virtualization.vminterface":
self.aos[str(ip)] = NBVirtualMachine.get_by_id(
ip.assigned_object.virtual_machine.id,
)
else:
logger.error("IP %s has unknown device type: %s", ip, aotype)
sys.exit(1)
else:
logger.warning("Unknown IP type %s, with attributes: %s", ip, dict(ip))
@yaml.yaml_object(ydump)
class NBAssignedObject:
"""
Assigned Object is either a Device or Virtual Machine, which function
nearly identically in the NetBox data model.
This parent class holds common functions for those two child classes
"""
objects = dict()
def __init__(self, data):
self.data = data
# The AssignedObject attributes
self.id = data.id
self.name = data.name
self.ips = dict()
# The NetBox objects related with this AssignedObject
self.services = None
self.interfaces = list()
self.mgmt_interfaces = list()
self.interfaces_by_ip = dict()
if self.__class__ == NBDevice:
self.interfaces = netboxapi.dcim.interfaces.filter(device_id=self.id)
self.services = netboxapi.ipam.services.filter(device_id=self.id)
ip_addresses = netboxapi.ipam.ip_addresses.filter(device_id=self.id)
elif self.__class__ == NBVirtualMachine:
self.interfaces = netboxapi.virtualization.interfaces.filter(
virtual_machine_id=self.id
)
self.services = netboxapi.ipam.services.filter(virtual_machine_id=self.id)
ip_addresses = netboxapi.ipam.ip_addresses.filter(
virtual_machine_id=self.id
)
for ip in ip_addresses:
self.ips[ip.address] = ip
if ip.assigned_object and self.__class__ == NBDevice:
self.interfaces_by_ip[ip.address] = netboxapi.dcim.interfaces.get(
ip.assigned_object_id
)
elif ip.assigned_object and self.__class__ == NBVirtualMachine:
self.interfaces_by_ip[
ip.address
] = netboxapi.virtualization.interfaces.get(ip.assigned_object_id)
self.interfaces_by_ip[ip.address].mgmt_only = False
logger.debug(
"%s id: %d, data: %s, ips: %s"
% (self.type, self.id, dict(self.data), self.ips)
)
self.netplan_config = dict()
def __repr__(self):
return str(dict(self.data))
def dns_name(self, ip, prefix):
"""
Returns the DNS name for the device at this IP in the prefix
"""
def first_segment_suffix(split_name, suffixes, segments):
first_seg = "-".join([split_name[0], *suffixes])
if segments > 1:
name = ".".join([first_seg, *split_name[1:segments]])
else:
name = first_seg
return name
# clean/split the device name
name_split = clean_name_dns(self.data.name).split(".")
# always add interface suffix to mgmt interfaces
if self.interfaces_by_ip[ip].mgmt_only:
return first_segment_suffix(
name_split, [self.interfaces_by_ip[ip].name], prefix.name_segments
)
# find all IP's for this device in the prefix that aren't mgmt interfaces
prefix_ips = []
for s_ip in self.ips:
if s_ip in prefix.aos and not self.interfaces_by_ip[s_ip].mgmt_only:
prefix_ips.append(s_ip)
# name to use when only one IP address for device in a prefix
simple_name = ".".join(name_split[0 : prefix.name_segments])
# if more than one non-mgmt IP in prefix
if len(prefix_ips) > 1:
# use bare name if primary IP address
try: # skip if no primary_ip.address
if ip == self.data.primary_ip.address:
return simple_name
except AttributeError:
pass
# else, suffix with the interface name, and the last octet of IP address
return first_segment_suffix(
name_split,
[
self.interfaces_by_ip[ip].name,
str(netaddr.IPNetwork(ip).ip.words[3]),
],
prefix.name_segments,
)
# simplest case - only one IP in prefix, return simple_name
return simple_name
def dns_cnames(self, ip):
"""
returns a list of cnames for this object, based on IP matches
"""
cnames = []
for service in self.services:
# if not assigned to any IP's, service is on all IPs
if not service.ipaddresses:
cnames.append(service.name)
continue
# If assigned to an IP, only create a CNAME on that IP
for service_ip in service.ipaddresses:
if ip == service_ip.address:
cnames.append(service.name)
return cnames
def has_service(self, cidr_ip, port, protocol):
"""
Return True if this AO has a service using specific port and protocol combination
"""
if (
cidr_ip in self.interfaces_by_ip
and not self.interfaces_by_ip[cidr_ip].mgmt_only
):
for service in self.services:
if service.port == port and service.protocol.value == protocol:
return True
return False
def primary_iface(self):
"""
Returns the interface data for the device that has the primary_ip
"""
if self.data.primary_ip:
return self.interfaces_by_ip[self.data.primary_ip.address]
return None
@property
def type(self):
return "AssignedObject"
@classmethod
def get_by_id(cls, obj_id):
raise Exception("not implemented")
@classmethod
def all_objects(cls):
return cls.objects
@classmethod
def to_yaml(cls, representer, node):
return representer.represent_dict(
{
"data": node.data,
"services": node.services,
"ips": node.ips,
"interfaces_by_ip": node.interfaces_by_ip,
}
)
def generate_netplan(self):
"""
Get the interface config of specific server belongs to this tenant
"""
if self.netplan_config:
return self.netplan_config
if not self.data:
logger.error(
"{type} {name} doesn't have data yet.".format(
type=self.type, name=self.name
)
)
sys.exit(1)
primary_ip = self.data.primary_ip.address if self.data.primary_ip else None
primary_if = self.interfaces_by_ip[primary_ip] if primary_ip else None
self.netplan_config["ethernets"] = dict()
if self.data.device_role.name == "Router":
for address, interface in self.interfaces_by_ip.items():
if interface.mgmt_only is True or str(interface.type) == "Virtual":
continue
self.netplan_config["ethernets"].setdefault(interface.name, {})
self.netplan_config["ethernets"][interface.name].setdefault(
"addresses", []
).append(address)
elif self.data.device_role.name == "Server":
if primary_if:
self.netplan_config["ethernets"][primary_if.name] = {
"dhcp4": "yes",
"dhcp4-overrides": {"route-metric": 100},
}
for physical_if in filter(
lambda i: str(i.type) != "Virtual"
and i != primary_if
and i.mgmt_only is False,
self.interfaces,
):
self.netplan_config["ethernets"][physical_if.name] = {
"dhcp4": "yes",
"dhcp4-overrides": {"route-metric": 200},
}
else:
# Exclude the device type which is not Router and Server
return None
# Get interfaces own by AssignedObject and is virtual (VLAN interface)
for virtual_if in filter(lambda i: str(i.type) == "Virtual", self.interfaces):
if "vlans" not in self.netplan_config:
self.netplan_config["vlans"] = dict()
# vlan_object_id is the "id" on netbox, it's different from known VLAN ID
vlan_object_id = virtual_if.tagged_vlans[0].id
vlan_object = netboxapi.ipam.vlans.get(vlan_object_id)
virtual_if_ips = netboxapi.ipam.ip_addresses.filter(
interface_id=virtual_if.id
)
routes = []
for ip in virtual_if_ips:
reserved_ips = NBPrefix.all_reserved_by_ip(str(ip))
for reserved_ip in reserved_ips:
destination = reserved_ip["custom_fields"].get("rfc3442routes", "")
if destination:
for dest_ip in destination.split():
new_route = {
"to": dest_ip,
"via": str(netaddr.IPNetwork(reserved_ip["ip4"]).ip),
"metric": 100,
}
if new_route not in routes:
routes.append(new_route)
self.netplan_config["vlans"][virtual_if.name] = {
"id": vlan_object.vid,
"link": virtual_if.label,
"addresses": [ip.address for ip in virtual_if_ips],
}
if routes:
self.netplan_config["vlans"][virtual_if.name]["routes"] = routes
# If the object is mgmtserver, it needs to have DNS/NTP server configs
if self.data["device_role"]["name"] == "Router":
services = list(netboxapi.ipam.services.filter(device_id=self.id))
service_names = list(map(lambda x: x.name, services))
if "dns" in service_names:
unbound_listen_ips = []
unbound_allow_ips = []
for ip, intf in self.interfaces_by_ip.items():
if ip != primary_ip and intf.mgmt_only == False:
unbound_listen_ips.append(ip)
for prefix in NBPrefix.all_prefixes().values():
if prefix.data.description:
unbound_allow_ips.append(prefix.data.prefix)
ntp_client_allow.append(prefix.data.prefix)
if unbound_listen_ips:
self.netplan_config["unbound_listen_ips"] = unbound_listen_ips
if unbound_allow_ips:
self.netplan_config["unbound_allow_ips"] = unbound_allow_ips
if "ntp" in service_names:
ntp_client_allow = []
for prefix in NBPrefix.all_prefixes().values():
if prefix.data.description:
ntp_client_allow.append(prefix.data.prefix)
if ntp_client_allow:
self.netplan_config["ntp_client_allow"] = ntp_client_allow
return self.netplan_config
@yaml.yaml_object(ydump)
class NBDevice(NBAssignedObject):
"""
Wraps a single Netbox device
Also caches all known devices in a class variable (devs)
"""
objects = dict()
def __init__(self, data):
super().__init__(data)
self.objects[self.id] = self
@property
def type(self):
return "NBDevice"
def get_interfaces(self):
if not self.interfaces:
self.interfaces = netboxapi.dcim.interfaces.filter(device_id=self.id)
return self.interfaces
@classmethod
def get_by_id(cls, obj_id):
obj = cls.objects.get(obj_id, None)
obj = obj or NBDevice(netboxapi.dcim.devices.get(obj_id))
return obj
@yaml.yaml_object(ydump)
class NBVirtualMachine(NBAssignedObject):
"""
VM equivalent of NBDevice
"""
objects = dict()
def __init__(self, data):
super().__init__(data)
self.objects[self.id] = self
@property
def type(self):
return "NBVirtualMachine"
def get_interfaces(self):
if not self.interfaces:
self.interfaces = netboxapi.virtualization.interfaces.filter(
virtual_machine_id=self.id
)
return self.interfaces
@classmethod
def get_by_id(cls, obj_id):
obj = cls.objects.get(obj_id, None)
obj = obj or NBVirtualMachine(
netboxapi.virtualization.virtual_machines.get(obj_id)
)
return obj
@yaml.yaml_object(ydump)
class NBDNSForwardZone:
fwd_zones = {}
def __init__(self, prefix):
self.domain_extension = prefix.domain_extension
self.a_recs = {}
self.cname_recs = {}
self.srv_recs = {}
self.ns_recs = []
self.txt_recs = {}
if prefix.dhcp_range:
self.create_dhcp_fwd(prefix.dhcp_range)
for ip, ao in prefix.aos.items():
self.add_ao_records(prefix, ip, ao)
for ip, res in prefix.reserved_ips.items():
self.add_reserved(ip, res)
# reqquired for the add_fwd_cname function below
if callable(getattr(prefix, "parent")):
parent_prefix = prefix.parent()
if parent_prefix:
self.merge_parent_prefix(parent_prefix, prefix)
self.fwd_zones[self.domain_extension] = self
def __repr__(self):
return str(
{
"a": self.a_recs,
"cname": self.cname_recs,
"ns": self.ns_recs,
"srv": self.srv_recs,
"txt": self.txt_recs,
}
)
@classmethod
def add_fwd_cname(cls, cname, fqdn_dest):
"""
Add an arbitrary CNAME (and possibly create the fwd zone if needed) pointing
at a FQDN destination name. It's used to support the per-IP "DNS name" field in NetBox
Note that the NS record
"""
try:
fqdn_split = re.compile(r"([a-z]+)\.([a-z.]+)\.")
(short_name, extension) = fqdn_split.match(cname).groups()
except AttributeError:
logger.warning(
"Invalid DNS CNAME: '%s', must be in FQDN format: 'host.example.com.', ignored",
cname,
)
return
fake_prefix = AttrDict(
{
"domain_extension": extension,
"dhcp_range": None,
"aos": {},
"reserved_ips": {},
"parent": None,
}
)
fwd_zone = cls.get_fwd_zone(fake_prefix)
fwd_zone.cname_recs[short_name] = fqdn_dest
@classmethod
def get_fwd_zone(cls, prefix):
if prefix.domain_extension in cls.fwd_zones:
return cls.fwd_zones[prefix.domain_extension]
return NBDNSForwardZone(prefix)
@classmethod
def all_fwd_zones(cls):
return cls.fwd_zones
@classmethod
def to_yaml(cls, representer, node):
return representer.represent_dict(
{
"a": node.a_recs,
"cname": node.cname_recs,
"ns": node.ns_recs,
"srv": node.srv_recs,
"txt": node.txt_recs,
}
)
def fqdn(self, name):
return "%s.%s." % (name, self.domain_extension)
def create_dhcp_fwd(self, dhcp_range):
for ip in netaddr.IPNetwork(dhcp_range).iter_hosts():
self.a_recs["dhcp%03d" % (ip.words[3])] = str(ip)
def name_is_duplicate(self, name, target, record_type):
"""
Returns True if name already exists in the zone as an A or CNAME
record, False otherwise
"""
if name in self.a_recs:
logger.warning(
"Duplicate DNS record for name %s - A record to '%s', %s record to '%s'",
name,
self.a_recs[name],
record_type,
target,
)
return True
if name in self.cname_recs:
logger.warning(
"Duplicate DNS record for name %s - CNAME record to '%s', %s record to '%s'",
name,
self.cname_recs[name],
record_type,
target,
)
return True
return False
def add_ao_records(self, prefix, ip, ao):
name = ao.dns_name(ip, prefix)
target_ip = str(netaddr.IPNetwork(ip).ip) # make bare IP, not CIDR format
# add A records
if not self.name_is_duplicate(name, target_ip, "A"):
self.a_recs[name] = target_ip
# add CNAME records that alias to this name
for cname in ao.dns_cnames(ip):
# check that it isn't a dupe
if not self.name_is_duplicate(cname, target_ip, "CNAME"):
self.cname_recs[cname] = self.fqdn(name)
# add NS records if this is a DNS server
if ao.has_service(ip, 53, "udp"):
self.ns_recs.append(self.fqdn(name))
# if a DNS name is set, add it as a CNAME
if ao.ips[ip]["dns_name"]: # and ip == aos.data.primary_ip.address:
self.add_fwd_cname(ao.ips[ip]["dns_name"], self.fqdn(name))
def add_reserved(self, ip, res):
target_ip = str(netaddr.IPNetwork(ip).ip) # make bare IP, not CIDR format
if not self.name_is_duplicate(res["name"], target_ip, "A"):
self.a_recs[res["name"]] = target_ip
def merge_parent_prefix(self, pprefix, prefix):
# only if no NS records exist already
if not self.ns_recs:
# scan parent prefix for services
for ip, ao in pprefix.aos.items():
# Create a DNS within this prefix pointing to out-of-prefix IP
# where DNS server is
name = ao.dns_name(ip, prefix)
target_ip = str(
netaddr.IPNetwork(ip).ip
) # make bare IP, not CIDR format
# add NS records if this is a DNS server
if ao.has_service(ip, 53, "udp"):
self.a_recs[name] = target_ip
self.ns_recs.append(self.fqdn(name))
@yaml.yaml_object(ydump)
class NBDNSReverseZones:
def __init__(self):
self.reverse_zones = {}
@classmethod
def to_yaml(cls, representer, node):
return representer.represent_dict(node.reverse_zones)
@classmethod
def canonicalize_rfc1918_prefix(cls, prefix):
"""
RFC1918 prefixes need to be expanded to their widest canonical range to
group all reverse lookup domains together for reverse DNS with NSD/Unbound.
"""
pnet = netaddr.IPNetwork(str(prefix))
(o1, o2, o3, o4) = pnet.network.words # Split ipv4 octets
cidr_plen = pnet.prefixlen
if o1 == 10:
o2 = o3 = o4 = 0
cidr_plen = 8
elif (o1 == 172 and o2 >= 16 and o2 <= 31) or (o1 == 192 and o2 == 168):
o3 = o4 = 0
cidr_plen = 16
return "%s/%d" % (".".join(map(str, [o1, o2, o3, o4])), cidr_plen)
def add_prefix(self, prefix):
canonical_prefix = self.canonicalize_rfc1918_prefix(prefix)
if canonical_prefix in self.reverse_zones:
rzone = self.reverse_zones[canonical_prefix]
else:
rzone = {
"ns": [],
"ptr": {},
}
if prefix.dhcp_range:
# FIXME: doesn't check for duplicate entries
rzone["ptr"].update(self.create_dhcp_rev(prefix))
for ip, ao in prefix.aos.items():
target_ip = str(netaddr.IPNetwork(ip).ip) # make bare IP, not CIDR format
ao_name = self.get_ao_name(ip, ao, prefix,)
rzone["ptr"][target_ip] = ao_name
# add NS records if this is a DNS server
if ao.has_service(ip, 53, "udp"):
rzone["ns"].append(ao_name)
parent_prefix = prefix.parent()
if parent_prefix:
self.merge_parent_prefix(rzone, parent_prefix)
self.reverse_zones[canonical_prefix] = rzone
def merge_parent_prefix(self, rzone, pprefix):
# parent items
p_ns = []
# scan parent prefix for services
for ip, ao in pprefix.aos.items():
ao_name = self.get_ao_name(ip, ao, pprefix,)
# add NS records if this is a DNS server
if ao.has_service(ip, 53, "udp"):
p_ns.append(ao_name)
# set DNS servers if none in rzone
if not rzone["ns"]:
rzone["ns"] = p_ns
def create_dhcp_rev(self, prefix):
dhcp_rzone = {}
for ip in netaddr.IPNetwork(prefix.dhcp_range).iter_hosts():
dhcp_rzone[str(ip)] = "dhcp%03d.%s." % (
ip.words[3],
prefix.domain_extension,
)
return dhcp_rzone
def get_ao_name(self, ip, ao, prefix):
short_name = ao.dns_name(ip, prefix)
return "%s.%s." % (short_name, prefix.domain_extension)
@yaml.yaml_object(ydump)
class NBDHCPSubnet:
def __init__(self, prefix):
self.domain_extension = prefix.domain_extension
self.subnet = None
self.range = None
self.first_ip = None
self.hosts = []
self.routers = []
self.dns_servers = []
self.dns_search = []
self.tftpd_server = None
self.ntp_servers = []
self.dhcpd_interface = None
self.add_prefix(prefix)
for ip, ao in prefix.aos.items():
self.add_ao(str(ip), ao, prefix)
parent_prefix = prefix.parent()
if parent_prefix:
self.merge_parent_prefix(parent_prefix)
def add_prefix(self, prefix):
self.subnet = str(prefix)
self.first_ip = str(netaddr.IPAddress(netaddr.IPNetwork(str(prefix)).first + 1))
self.dns_search = [prefix.domain_extension]
if prefix.dhcp_range:
self.range = prefix.dhcp_range
for ip, res in prefix.reserved_ips.items():
# routers are reserved IP's that start with 'router" in the IP description
if re.match("router", res["description"]):
router = {"ip": str(netaddr.IPNetwork(ip).ip)}
if (
"rfc3442routes" in res["custom_fields"]
and res["custom_fields"]["rfc3442routes"]
):
# split on whitespace
router["rfc3442routes"] = re.split(
r"\s+", res["custom_fields"]["rfc3442routes"]
)
self.routers.append(router)
# set first IP to router if not set otherwise.
if not self.routers:
router = {"ip": self.first_ip}
self.routers.append(router)
def add_ao(self, ip, ao, prefix):
target_ip = str(netaddr.IPNetwork(ip).ip) # make bare IP, not CIDR format
# find the DHCP interface if it's this IP
if target_ip == self.first_ip:
self.dhcpd_interface = ao.interfaces_by_ip[ip].name
name = ao.dns_name(ip, prefix)
# add only devices that have a macaddr for this IP
if ip in ao.interfaces_by_ip:
mac_addr = dict(ao.interfaces_by_ip[ip]).get("mac_address")
if mac_addr and mac_addr.strip(): # if exists and not blank
self.hosts.append(
{"name": name, "ip_addr": target_ip, "mac_addr": mac_addr.lower()}
)
# add dns servers
if ao.has_service(ip, 53, "udp"):
self.dns_servers.append(target_ip)
# add tftp server
if ao.has_service(ip, 69, "udp"):
if not self.tftpd_server:
self.tftpd_server = target_ip
else:
logger.warning(
"Duplicate TFTP servers in prefix, using first of %s and %s",
self.tftpd_server,
target_ip,
)
# add NTP servers
if ao.has_service(ip, 123, "udp"):
self.ntp_servers.append(target_ip)
def merge_parent_prefix(self, pprefix):
# parent items
p_dns_servers = []
p_tftpd_server = None
p_ntp_servers = []
# scan parent prefix for services
for ip, ao in pprefix.aos.items():
target_ip = str(netaddr.IPNetwork(ip).ip)
# add dns servers
if ao.has_service(ip, 53, "udp"):
p_dns_servers.append(target_ip)
# add tftp server
if ao.has_service(ip, 69, "udp"):
if not p_tftpd_server:
p_tftpd_server = target_ip
else:
logger.warning(
"Duplicate TFTP servers in parent prefix, using first of %s and %s",
p_tftpd_server,
target_ip,
)
# add NTP servers
if ao.has_service(ip, 123, "udp"):
p_ntp_servers.append(target_ip)
# merge if doesn't exist in prefix
if not self.dns_servers:
self.dns_servers = p_dns_servers
if not self.tftpd_server:
self.tftpd_server = p_tftpd_server
if not self.ntp_servers:
self.ntp_servers = p_ntp_servers
@classmethod
def to_yaml(cls, representer, node):
return representer.represent_dict(
{
"subnet": node.subnet,
"range": node.range,
"routers": node.routers,
"hosts": node.hosts,
"dns_servers": node.dns_servers,
"dns_search": node.dns_search,
"tftpd_server": node.tftpd_server,
"ntp_servers": node.ntp_servers,
}
)