| #!/usr/bin/env python3 |
| |
| # SPDX-FileCopyrightText: © 2021 Open Networking Foundation <support@opennetworking.org> |
| # SPDX-License-Identifier: Apache-2.0 |
| |
| # nbhelper.py |
| # Helper functions for building YAML output from Netbox API calls |
| |
| from __future__ import absolute_import |
| |
| import re |
| import sys |
| import argparse |
| import logging |
| import netaddr |
| import pynetbox |
| import requests |
| |
| from ruamel import yaml |
| |
| # create shared logger |
| logging.basicConfig() |
| logger = logging.getLogger("nbh") |
| |
| # to dump YAML properly, using internal representers |
| # see also: |
| # https://stackoverflow.com/questions/54378220/declare-data-type-to-ruamel-yaml-so-that-it-can-represen-serialize-it |
| # https://sourceforge.net/p/ruamel-yaml/code/ci/default/tree/representer.py |
| |
| ydump = yaml.YAML(typ="safe") |
| ydump.representer.add_representer( |
| pynetbox.models.dcim.Devices, yaml.SafeRepresenter.represent_dict |
| ) |
| ydump.representer.add_representer( |
| pynetbox.models.dcim.Interfaces, yaml.SafeRepresenter.represent_dict |
| ) |
| ydump.representer.add_representer( |
| pynetbox.models.ipam.Prefixes, yaml.SafeRepresenter.represent_dict |
| ) |
| ydump.representer.add_representer( |
| pynetbox.core.response.Record, yaml.SafeRepresenter.represent_dict |
| ) |
| ydump.representer.add_representer( |
| pynetbox.models.ipam.IpAddresses, yaml.SafeRepresenter.represent_dict |
| ) |
| ydump.representer.add_representer( |
| pynetbox.core.api.Api, yaml.SafeRepresenter.represent_none |
| ) |
| |
| netboxapi = None |
| netbox_config = None |
| netbox_version = None |
| |
| |
| def initialize(extra_args): |
| global netboxapi, netbox_config, netbox_version |
| |
| args = parse_cli_args(extra_args) |
| netbox_config = yaml.safe_load(args.settings.read()) |
| |
| for require_args in ["api_endpoint", "token", "tenant_name"]: |
| if not netbox_config.get(require_args): |
| logger.error("The require argument: %s was not set. Stop." % require_args) |
| sys.exit(1) |
| |
| netboxapi = pynetbox.api( |
| netbox_config["api_endpoint"], token=netbox_config["token"], threading=True, |
| ) |
| |
| if not netbox_config.get("validate_certs", True): |
| session = requests.Session() |
| session.verify = False |
| netboxapi.http_session = session |
| |
| netbox_version = netboxapi.version |
| |
| return args |
| |
| |
| def parse_cli_args(extra_args={}): |
| """ |
| parse CLI arguments. Can add extra arguments with a option:kwargs dict |
| """ |
| |
| parser = argparse.ArgumentParser(description="Netbox") |
| |
| # Positional args |
| parser.add_argument( |
| "settings", |
| type=argparse.FileType("r"), |
| help="YAML Ansible inventory file w/NetBox API token", |
| ) |
| |
| parser.add_argument( |
| "--debug", action="store_true", help="Print additional debugging information" |
| ) |
| |
| for ename, ekwargs in extra_args.items(): |
| parser.add_argument(ename, **ekwargs) |
| |
| args = parser.parse_args() |
| log_level = logging.DEBUG if args.debug else logging.INFO |
| logger.setLevel(log_level) |
| |
| return args |
| |
| |
| def check_name_dns(name): |
| |
| badchars = re.search("[^a-z0-9.-]", name.lower(), re.ASCII) |
| |
| if badchars: |
| logger.error( |
| "DNS name '%s' has one or more invalid characters: '%s'", |
| name, |
| badchars.group(0), |
| ) |
| sys.exit(1) |
| |
| return name.lower() |
| |
| |
| def clean_name_dns(name): |
| return re.sub("[^a-z0-9.-]", "-", name.lower(), 0, re.ASCII) |
| |
| |
| class AttrDict(dict): |
| def __init__(self, *args, **kwargs): |
| super(AttrDict, self).__init__(*args, **kwargs) |
| self.__dict__ = self |
| |
| |
| class NBTenant: |
| def __init__(self): |
| self.name = netbox_config["tenant_name"] |
| self.name_segments = netbox_config.get("prefix_segments", 1) |
| self.tenant = netboxapi.tenancy.tenants.get(name=self.name) |
| |
| # NBTenant only keep the resources which owns by it |
| self.devices = list() |
| self.vms = list() |
| self.prefixes = dict() |
| |
| # Get the Device and Virtual Machines from Netbox API |
| for device_data in netboxapi.dcim.devices.filter(tenant=self.tenant.slug): |
| self.devices.append(NBDevice(device_data)) |
| |
| for vm_data in netboxapi.virtualization.virtual_machines.filter( |
| tenant=self.tenant.slug |
| ): |
| self.vms.append(NBVirtualMachine(vm_data)) |
| |
| def get_prefixes(self): |
| """Get the IP Prefixes owns by current tenant""" |
| |
| if self.prefixes: |
| return self.prefixes |
| |
| vrf = netboxapi.ipam.vrfs.get(tenant=self.tenant.slug) |
| for prefix_data in netboxapi.ipam.prefixes.filter(vrf_id=vrf.id): |
| if prefix_data.description: |
| self.prefixes[prefix_data.description] = NBPrefix( |
| prefix_data, self.name_segments |
| ) |
| |
| return self.prefixes |
| |
| def get_device_by_name(self, name): |
| """ |
| Find the device or VM which belongs to this Tenant, |
| If the name wasn't specified, return the management server |
| """ |
| |
| for machine in self.devices + self.vms: |
| if name and machine.name == name: |
| return machine |
| elif machine.data["device_role"]["name"] == "Router": |
| return machine |
| |
| ret_msg = ( |
| "The name '%s' wasn't found in this tenant, " |
| + "or can't found any Router in this tenant" |
| ) |
| |
| logger.error(ret_msg, name) |
| sys.exit(1) |
| |
| def get_devices(self, device_types=["server", "router"]): |
| """ |
| Get all devices (Router + Server) belong to this Tenant |
| """ |
| |
| if not device_types: |
| return self.devices + self.vms |
| |
| ret = [] |
| |
| for machine in self.devices: |
| if machine.data.device_role.slug in device_types: |
| ret.append(machine) |
| |
| for vm in self.vms: |
| if vm.data.role.slug in device_types: |
| ret.append(vm) |
| |
| return ret |
| |
| |
| @yaml.yaml_object(ydump) |
| class NBPrefix: |
| |
| prefixes = {} |
| |
| def __init__(self, data, name_segments): |
| self.data = data |
| self.name_segments = name_segments |
| self.domain_extension = check_name_dns(self.data.description) |
| |
| logger.debug( |
| "Preix %s: domain_extension %s, data: %s", |
| self.data.prefix, |
| self.domain_extension, |
| dict(self.data), |
| ) |
| |
| # ip centric info |
| self.dhcp_range = None |
| self.reserved_ips = {} |
| self.aos = {} |
| |
| # build item lists |
| self.build_prefix() |
| self.prefixes[self.data.prefix] = self |
| |
| @classmethod |
| def all_prefixes(cls): |
| return cls.prefixes |
| |
| @classmethod |
| def get_prefix(cls, prefix, name_segments=1): |
| if prefix in cls.prefixes: |
| return cls.prefixes[prefix] |
| |
| data = netboxapi.ipam.prefixes.get(prefix=prefix) |
| if data: |
| return NBPrefix(data, name_segments) |
| else: |
| raise Exception("The prefix %s wasn't found in Netbox" % prefix) |
| |
| def __repr__(self): |
| return str(self.data.prefix) |
| |
| @classmethod |
| def to_yaml(cls, representer, node): |
| return representer.represent_dict( |
| { |
| "dhcp_range": node.dhcp_range, |
| "reserved_ips": node.reserved_ips, |
| "aos": node.aos, |
| "prefix_data": dict(node.prefix_data), |
| } |
| ) |
| |
| @classmethod |
| def all_reserved_by_ip(cls, ip_addr=""): |
| """ |
| all_reserved_by_ip will return all reserved IP found in prefixes |
| |
| We have the IP address marked as type 'Reserved' in Prefix, |
| This type of IP address is using to define a DHCP range |
| """ |
| |
| ret = list() |
| |
| for prefix in cls.prefixes.values(): |
| if ip_addr and ip_addr in prefix.aos.keys(): |
| if prefix.reserved_ips: |
| return list(prefix.reserved_ips.values()) |
| else: |
| if prefix.reserved_ips: |
| ret.extend(list(prefix.reserved_ips.values())) |
| |
| return ret |
| |
| def get_reserved_ips(self): |
| """ |
| Get the reserved IP range (DHCP) in prefix |
| |
| We have the IP address marked as type 'Reserved' in Prefix, |
| This type of IP address is using to define a DHCP range |
| """ |
| if prefix.reserved_ips: |
| return list(prefix.reserved_ips.values()) |
| |
| def parent(self): |
| """ |
| Get the parent prefix to this prefix |
| |
| FIXME: Doesn't handle multiple layers of prefixes, returns first found |
| """ |
| |
| # get all parents of this prefix (include self) |
| possible_parents = netboxapi.ipam.prefixes.filter(contains=self.data.prefix) |
| |
| logger.debug( |
| "Prefix %s: possible parents %s", self.data.prefix, possible_parents |
| ) |
| |
| # filter out self, return first found |
| for pparent in possible_parents: |
| if pparent.prefix != self.data.prefix: |
| return NBPrefix.get_prefix(pparent.prefix, self.name_segments) |
| |
| return None |
| |
| def build_prefix(self): |
| """ |
| find ip information for items (devices/vms, reserved_ips, dhcp_range) in prefix |
| """ |
| |
| ips = netboxapi.ipam.ip_addresses.filter(parent=self.data.prefix) |
| |
| for ip in sorted(ips, key=lambda k: k["address"]): |
| |
| logger.debug("prefix_item ip: %s, data: %s", ip, dict(ip)) |
| |
| # if it's a DHCP range, add that range to the dev list as prefix_dhcp |
| if ip.status.value == "dhcp": |
| self.dhcp_range = str(ip.address) |
| continue |
| |
| # reserved IPs |
| if ip.status.value == "reserved": |
| |
| res = {} |
| res["name"] = ip.description.lower().split(" ")[0] |
| res["description"] = ip.description |
| res["ip4"] = str(netaddr.IPNetwork(ip.address)) |
| res["custom_fields"] = ip.custom_fields |
| |
| self.reserved_ips[str(ip)] = res |
| continue |
| |
| # devices and VMs |
| if ip.assigned_object: # can be null if not assigned to a device/vm |
| aotype = ip.assigned_object_type |
| if aotype == "dcim.interface": |
| self.aos[str(ip)] = NBDevice.get_by_id( |
| ip.assigned_object.device.id, |
| ) |
| elif aotype == "virtualization.vminterface": |
| self.aos[str(ip)] = NBVirtualMachine.get_by_id( |
| ip.assigned_object.virtual_machine.id, |
| ) |
| else: |
| logger.error("IP %s has unknown device type: %s", ip, aotype) |
| sys.exit(1) |
| else: |
| logger.warning("Unknown IP type %s, with attributes: %s", ip, dict(ip)) |
| |
| |
| @yaml.yaml_object(ydump) |
| class NBAssignedObject: |
| """ |
| Assigned Object is either a Device or Virtual Machine, which function |
| nearly identically in the NetBox data model. |
| |
| This parent class holds common functions for those two child classes |
| """ |
| |
| objects = dict() |
| |
| def __init__(self, data): |
| self.data = data |
| |
| # The AssignedObject attributes |
| self.id = data.id |
| self.name = data.name |
| self.ips = dict() |
| |
| # The NetBox objects related with this AssignedObject |
| self.services = None |
| self.interfaces = list() |
| self.mgmt_interfaces = list() |
| self.interfaces_by_ip = dict() |
| |
| if self.__class__ == NBDevice: |
| self.interfaces = netboxapi.dcim.interfaces.filter(device_id=self.id) |
| self.services = netboxapi.ipam.services.filter(device_id=self.id) |
| ip_addresses = netboxapi.ipam.ip_addresses.filter(device_id=self.id) |
| elif self.__class__ == NBVirtualMachine: |
| self.interfaces = netboxapi.virtualization.interfaces.filter( |
| virtual_machine_id=self.id |
| ) |
| self.services = netboxapi.ipam.services.filter(virtual_machine_id=self.id) |
| ip_addresses = netboxapi.ipam.ip_addresses.filter( |
| virtual_machine_id=self.id |
| ) |
| |
| for ip in ip_addresses: |
| self.ips[ip.address] = ip |
| if ip.assigned_object and self.__class__ == NBDevice: |
| self.interfaces_by_ip[ip.address] = netboxapi.dcim.interfaces.get( |
| ip.assigned_object_id |
| ) |
| elif ip.assigned_object and self.__class__ == NBVirtualMachine: |
| self.interfaces_by_ip[ |
| ip.address |
| ] = netboxapi.virtualization.interfaces.get(ip.assigned_object_id) |
| self.interfaces_by_ip[ip.address].mgmt_only = False |
| |
| logger.debug( |
| "%s id: %d, data: %s, ips: %s" |
| % (self.type, self.id, dict(self.data), self.ips) |
| ) |
| |
| self.netplan_config = dict() |
| self.extra_config = dict() |
| |
| def __repr__(self): |
| return str(dict(self.data)) |
| |
| def dns_name(self, ip, prefix): |
| """ |
| Returns the DNS name for the device at this IP in the prefix |
| """ |
| |
| def first_segment_suffix(split_name, suffixes, segments): |
| first_seg = "-".join([split_name[0], *suffixes]) |
| |
| if segments > 1: |
| name = ".".join([first_seg, *split_name[1:segments]]) |
| else: |
| name = first_seg |
| |
| return name |
| |
| # clean/split the device name |
| name_split = clean_name_dns(self.data.name).split(".") |
| |
| # always add interface suffix to mgmt interfaces |
| if self.interfaces_by_ip[ip].mgmt_only: |
| return first_segment_suffix( |
| name_split, [self.interfaces_by_ip[ip].name], prefix.name_segments |
| ) |
| |
| # find all IP's for this device in the prefix that aren't mgmt interfaces |
| prefix_ips = [] |
| for s_ip in self.ips: |
| if s_ip in prefix.aos and not self.interfaces_by_ip[s_ip].mgmt_only: |
| prefix_ips.append(s_ip) |
| |
| # name to use when only one IP address for device in a prefix |
| simple_name = ".".join(name_split[0 : prefix.name_segments]) |
| |
| # if more than one non-mgmt IP in prefix |
| if len(prefix_ips) > 1: |
| |
| # use bare name if primary IP address |
| try: # skip if no primary_ip.address |
| if ip == self.data.primary_ip.address: |
| return simple_name |
| except AttributeError: |
| pass |
| |
| # else, suffix with the interface name, and the last octet of IP address |
| return first_segment_suffix( |
| name_split, |
| [ |
| self.interfaces_by_ip[ip].name, |
| str(netaddr.IPNetwork(ip).ip.words[3]), |
| ], |
| prefix.name_segments, |
| ) |
| |
| # simplest case - only one IP in prefix, return simple_name |
| return simple_name |
| |
| def dns_cnames(self, ip): |
| """ |
| returns a list of cnames for this object, based on IP matches |
| """ |
| |
| cnames = [] |
| |
| for service in self.services: |
| |
| # if not assigned to any IP's, service is on all IPs |
| if not service.ipaddresses: |
| cnames.append(service.name) |
| continue |
| |
| # If assigned to an IP, only create a CNAME on that IP |
| for service_ip in service.ipaddresses: |
| if ip == service_ip.address: |
| cnames.append(service.name) |
| |
| return cnames |
| |
| def has_service(self, cidr_ip, port, protocol): |
| """ |
| Return True if this AO has a service using specific port and protocol combination |
| """ |
| |
| if ( |
| cidr_ip in self.interfaces_by_ip |
| and not self.interfaces_by_ip[cidr_ip].mgmt_only |
| ): |
| for service in self.services: |
| if service.port == port and service.protocol.value == protocol: |
| return True |
| |
| return False |
| |
| def primary_iface(self): |
| """ |
| Returns the interface data for the device that has the primary_ip |
| """ |
| |
| if self.data.primary_ip: |
| return self.interfaces_by_ip[self.data.primary_ip.address] |
| |
| return None |
| |
| @property |
| def type(self): |
| return "AssignedObject" |
| |
| @classmethod |
| def get_by_id(cls, obj_id): |
| raise Exception("not implemented") |
| |
| @classmethod |
| def all_objects(cls): |
| return cls.objects |
| |
| @classmethod |
| def to_yaml(cls, representer, node): |
| return representer.represent_dict( |
| { |
| "data": node.data, |
| "services": node.services, |
| "ips": node.ips, |
| "interfaces_by_ip": node.interfaces_by_ip, |
| } |
| ) |
| |
| def generate_netplan(self): |
| """ |
| Get the interface config of specific server belongs to this tenant |
| """ |
| |
| if self.netplan_config: |
| return self.netplan_config |
| |
| if not self.data: |
| logger.error( |
| "{type} {name} doesn't have data yet.".format( |
| type=self.type, name=self.name |
| ) |
| ) |
| sys.exit(1) |
| |
| primary_ip = self.data.primary_ip.address if self.data.primary_ip else None |
| primary_if = self.interfaces_by_ip[primary_ip] if primary_ip else None |
| |
| self.netplan_config["ethernets"] = dict() |
| |
| if (isinstance(self, NBDevice) and self.data.device_role.name == "Router") or ( |
| isinstance(self, NBVirtualMachine) and self.data.role.name == "Router" |
| ): |
| for address, interface in self.interfaces_by_ip.items(): |
| if interface.mgmt_only is True or str(interface.type) == "Virtual": |
| continue |
| |
| self.netplan_config["ethernets"].setdefault(interface.name, {}) |
| self.netplan_config["ethernets"][interface.name].setdefault( |
| "addresses", [] |
| ).append(address) |
| |
| elif isinstance(self, NBDevice) and self.data.device_role.name == "Server": |
| if primary_if: |
| self.netplan_config["ethernets"][primary_if.name] = { |
| "dhcp4": "yes", |
| "dhcp4-overrides": {"route-metric": 100}, |
| } |
| |
| for physical_if in filter( |
| lambda i: str(i.type) != "Virtual" |
| and i != primary_if |
| and i.mgmt_only is False, |
| self.interfaces, |
| ): |
| self.netplan_config["ethernets"][physical_if.name] = { |
| "dhcp4": "yes", |
| "dhcp4-overrides": {"route-metric": 200}, |
| } |
| else: |
| # Exclude the device type which is not Router and Server |
| return None |
| |
| # Get interfaces own by AssignedObject and is virtual (VLAN interface) |
| for virtual_if in filter(lambda i: str(i.type) == "Virtual", self.interfaces): |
| if "vlans" not in self.netplan_config: |
| self.netplan_config["vlans"] = dict() |
| |
| if not virtual_if.tagged_vlans: |
| # If a virtual interface doesn't have tagged VLAN, skip |
| continue |
| |
| # vlan_object_id is the "id" on netbox, it's different from known VLAN ID |
| vlan_object_id = virtual_if.tagged_vlans[0].id |
| vlan_object = netboxapi.ipam.vlans.get(vlan_object_id) |
| virtual_if_ips = netboxapi.ipam.ip_addresses.filter( |
| interface_id=virtual_if.id |
| ) |
| |
| routes = [] |
| for ip in virtual_if_ips: |
| reserved_ips = NBPrefix.all_reserved_by_ip(str(ip)) |
| for reserved_ip in reserved_ips: |
| destination = reserved_ip["custom_fields"].get("rfc3442routes", "") |
| if destination: |
| for dest_ip in destination.split(): |
| new_route = { |
| "to": dest_ip, |
| "via": str(netaddr.IPNetwork(reserved_ip["ip4"]).ip), |
| "metric": 100, |
| } |
| if new_route not in routes: |
| routes.append(new_route) |
| |
| self.netplan_config["vlans"][virtual_if.name] = { |
| "id": vlan_object.vid, |
| "link": virtual_if.label, |
| "addresses": [ip.address for ip in virtual_if_ips], |
| } |
| |
| if routes: |
| self.netplan_config["vlans"][virtual_if.name]["routes"] = routes |
| |
| return self.netplan_config |
| |
| def generate_nftables(self): |
| |
| ret = dict() |
| |
| primary_ip = self.data.primary_ip.address if self.data.primary_ip else None |
| external_if = self.interfaces_by_ip[primary_ip] if primary_ip else None |
| internal_if = None |
| |
| if external_if is None: |
| logger.error("The primary interface wasn't set for device %s", self.name) |
| sys.exit(1) |
| |
| for intf in filter( |
| lambda i: str(i.type) != "Virtual" and i.mgmt_only is False, |
| self.interfaces_by_ip.values(), |
| ): |
| if intf.id != external_if.id: |
| internal_if = intf |
| break |
| |
| ret["external_if"] = external_if.name |
| ret["internal_if"] = internal_if.name |
| |
| if self.services: |
| ret["services"] = list() |
| |
| for service in self.services: |
| ret["services"].append( |
| { |
| "name": service.name, |
| "protocol": service.protocol.value, |
| "port": service.port, |
| } |
| ) |
| |
| # Only management server needs to be configured the whitelist netrange of internal interface |
| if self.data.device_role.name == "Router": |
| ret["allow_subnets"] = list() |
| ret["ue_routing"] = dict() |
| ret["ue_routing"]["ue_subnets"] = self.data.config_context["ue_subnets"] |
| for prefix in NBPrefix.all_prefixes().values(): |
| if prefix.data.description: |
| ret["allow_subnets"].append(prefix.data.prefix) |
| |
| if "fab" in prefix.data.description: |
| ret["ue_routing"].setdefault("src_subnets", []) |
| ret["ue_routing"]["src_subnets"].append(prefix.data.prefix) |
| |
| if ( |
| not ret["ue_routing"].get("snat_addr") |
| and "fab" in prefix.data.description |
| ): |
| for ip, device in prefix.aos.items(): |
| if device.name == self.name: |
| ret["ue_routing"]["snat_addr"] = ip |
| break |
| |
| return ret |
| |
| def generate_extra_config(self): |
| """ |
| Generate the extra configs which need in management server configuration |
| This function should only be called when the device role is "Router" |
| """ |
| |
| if self.extra_config: |
| return self.extra_config |
| |
| primary_ip = self.data.primary_ip.address if self.data.primary_ip else None |
| |
| service_names = list(map(lambda x: x.name, self.services)) |
| |
| if "dns" in service_names: |
| unbound_listen_ips = [] |
| unbound_allow_ips = [] |
| |
| for ip, intf in self.interfaces_by_ip.items(): |
| if ip != primary_ip and intf.mgmt_only == False: |
| unbound_listen_ips.append(ip) |
| |
| for prefix in NBPrefix.all_prefixes().values(): |
| if prefix.data.description: |
| unbound_allow_ips.append(prefix.data.prefix) |
| |
| if unbound_listen_ips: |
| self.extra_config["unbound_listen_ips"] = unbound_listen_ips |
| |
| if unbound_allow_ips: |
| self.extra_config["unbound_allow_ips"] = unbound_allow_ips |
| |
| if "ntp" in service_names: |
| ntp_client_allow = [] |
| |
| for prefix in NBPrefix.all_prefixes().values(): |
| if prefix.data.description: |
| ntp_client_allow.append(prefix.data.prefix) |
| |
| if ntp_client_allow: |
| self.extra_config["ntp_client_allow"] = ntp_client_allow |
| |
| return self.extra_config |
| |
| |
| @yaml.yaml_object(ydump) |
| class NBDevice(NBAssignedObject): |
| """ |
| Wraps a single Netbox device |
| Also caches all known devices in a class variable (devs) |
| """ |
| |
| objects = dict() |
| |
| def __init__(self, data): |
| |
| super().__init__(data) |
| self.objects[self.id] = self |
| |
| @property |
| def type(self): |
| return "NBDevice" |
| |
| def get_interfaces(self): |
| if not self.interfaces: |
| self.interfaces = netboxapi.dcim.interfaces.filter(device_id=self.id) |
| |
| return self.interfaces |
| |
| @classmethod |
| def get_by_id(cls, obj_id): |
| obj = cls.objects.get(obj_id, None) |
| obj = obj or NBDevice(netboxapi.dcim.devices.get(obj_id)) |
| |
| return obj |
| |
| |
| @yaml.yaml_object(ydump) |
| class NBVirtualMachine(NBAssignedObject): |
| """ |
| VM equivalent of NBDevice |
| """ |
| |
| objects = dict() |
| |
| def __init__(self, data): |
| |
| super().__init__(data) |
| self.objects[self.id] = self |
| |
| @property |
| def type(self): |
| return "NBVirtualMachine" |
| |
| def get_interfaces(self): |
| if not self.interfaces: |
| self.interfaces = netboxapi.virtualization.interfaces.filter( |
| virtual_machine_id=self.id |
| ) |
| |
| return self.interfaces |
| |
| @classmethod |
| def get_by_id(cls, obj_id): |
| obj = cls.objects.get(obj_id, None) |
| obj = obj or NBVirtualMachine( |
| netboxapi.virtualization.virtual_machines.get(obj_id) |
| ) |
| |
| return obj |
| |
| |
| @yaml.yaml_object(ydump) |
| class NBDNSForwardZone: |
| |
| fwd_zones = {} |
| |
| def __init__(self, prefix): |
| |
| self.domain_extension = prefix.domain_extension |
| |
| self.a_recs = {} |
| self.cname_recs = {} |
| self.srv_recs = {} |
| self.ns_recs = [] |
| self.txt_recs = {} |
| |
| if prefix.dhcp_range: |
| self.create_dhcp_fwd(prefix.dhcp_range) |
| |
| for ip, ao in prefix.aos.items(): |
| self.add_ao_records(prefix, ip, ao) |
| |
| for ip, res in prefix.reserved_ips.items(): |
| self.add_reserved(ip, res) |
| |
| # reqquired for the add_fwd_cname function below |
| if callable(getattr(prefix, "parent")): |
| parent_prefix = prefix.parent() |
| |
| if parent_prefix: |
| self.merge_parent_prefix(parent_prefix, prefix) |
| |
| self.fwd_zones[self.domain_extension] = self |
| |
| def __repr__(self): |
| return str( |
| { |
| "a": self.a_recs, |
| "cname": self.cname_recs, |
| "ns": self.ns_recs, |
| "srv": self.srv_recs, |
| "txt": self.txt_recs, |
| } |
| ) |
| |
| @classmethod |
| def add_fwd_cname(cls, cname, fqdn_dest): |
| """ |
| Add an arbitrary CNAME (and possibly create the fwd zone if needed) pointing |
| at a FQDN destination name. It's used to support the per-IP "DNS name" field in NetBox |
| Note that the NS record |
| """ |
| |
| try: |
| fqdn_split = re.compile(r"([a-z]+)\.([a-z.]+)\.") |
| (short_name, extension) = fqdn_split.match(cname).groups() |
| |
| except AttributeError: |
| logger.warning( |
| "Invalid DNS CNAME: '%s', must be in FQDN format: 'host.example.com.', ignored", |
| cname, |
| ) |
| return |
| |
| fake_prefix = AttrDict( |
| { |
| "domain_extension": extension, |
| "dhcp_range": None, |
| "aos": {}, |
| "reserved_ips": {}, |
| "parent": None, |
| } |
| ) |
| |
| fwd_zone = cls.get_fwd_zone(fake_prefix) |
| |
| fwd_zone.cname_recs[short_name] = fqdn_dest |
| |
| @classmethod |
| def get_fwd_zone(cls, prefix): |
| if prefix.domain_extension in cls.fwd_zones: |
| return cls.fwd_zones[prefix.domain_extension] |
| |
| return NBDNSForwardZone(prefix) |
| |
| @classmethod |
| def all_fwd_zones(cls): |
| return cls.fwd_zones |
| |
| @classmethod |
| def to_yaml(cls, representer, node): |
| return representer.represent_dict( |
| { |
| "a": node.a_recs, |
| "cname": node.cname_recs, |
| "ns": node.ns_recs, |
| "srv": node.srv_recs, |
| "txt": node.txt_recs, |
| } |
| ) |
| |
| def fqdn(self, name): |
| return "%s.%s." % (name, self.domain_extension) |
| |
| def create_dhcp_fwd(self, dhcp_range): |
| |
| for ip in netaddr.IPNetwork(dhcp_range).iter_hosts(): |
| self.a_recs["dhcp%03d" % (ip.words[3])] = str(ip) |
| |
| def name_is_duplicate(self, name, target, record_type): |
| """ |
| Returns True if name already exists in the zone as an A or CNAME |
| record, False otherwise |
| """ |
| |
| if name in self.a_recs: |
| logger.warning( |
| "Duplicate DNS record for name %s - A record to '%s', %s record to '%s'", |
| name, |
| self.a_recs[name], |
| record_type, |
| target, |
| ) |
| return True |
| |
| if name in self.cname_recs: |
| logger.warning( |
| "Duplicate DNS record for name %s - CNAME record to '%s', %s record to '%s'", |
| name, |
| self.cname_recs[name], |
| record_type, |
| target, |
| ) |
| return True |
| |
| return False |
| |
| def add_ao_records(self, prefix, ip, ao): |
| |
| name = ao.dns_name(ip, prefix) |
| target_ip = str(netaddr.IPNetwork(ip).ip) # make bare IP, not CIDR format |
| |
| # add A records |
| if not self.name_is_duplicate(name, target_ip, "A"): |
| self.a_recs[name] = target_ip |
| |
| # add CNAME records that alias to this name |
| for cname in ao.dns_cnames(ip): |
| # check that it isn't a dupe |
| if not self.name_is_duplicate(cname, target_ip, "CNAME"): |
| self.cname_recs[cname] = self.fqdn(name) |
| |
| # add NS records if this is a DNS server |
| if ao.has_service(ip, 53, "udp"): |
| self.ns_recs.append(self.fqdn(name)) |
| |
| # if a DNS name is set, add it as a CNAME |
| if ao.ips[ip]["dns_name"]: # and ip == aos.data.primary_ip.address: |
| self.add_fwd_cname(ao.ips[ip]["dns_name"], self.fqdn(name)) |
| |
| def add_reserved(self, ip, res): |
| |
| target_ip = str(netaddr.IPNetwork(ip).ip) # make bare IP, not CIDR format |
| |
| if not self.name_is_duplicate(res["name"], target_ip, "A"): |
| self.a_recs[res["name"]] = target_ip |
| |
| def merge_parent_prefix(self, pprefix, prefix): |
| |
| # only if no NS records exist already |
| if not self.ns_recs: |
| # scan parent prefix for services |
| for ip, ao in pprefix.aos.items(): |
| |
| # Create a DNS within this prefix pointing to out-of-prefix IP |
| # where DNS server is |
| name = ao.dns_name(ip, prefix) |
| target_ip = str( |
| netaddr.IPNetwork(ip).ip |
| ) # make bare IP, not CIDR format |
| |
| # add NS records if this is a DNS server |
| if ao.has_service(ip, 53, "udp"): |
| self.a_recs[name] = target_ip |
| self.ns_recs.append(self.fqdn(name)) |
| |
| |
| @yaml.yaml_object(ydump) |
| class NBDNSReverseZones: |
| def __init__(self): |
| |
| self.reverse_zones = {} |
| |
| @classmethod |
| def to_yaml(cls, representer, node): |
| return representer.represent_dict(node.reverse_zones) |
| |
| @classmethod |
| def canonicalize_rfc1918_prefix(cls, prefix): |
| """ |
| RFC1918 prefixes need to be expanded to their widest canonical range to |
| group all reverse lookup domains together for reverse DNS with NSD/Unbound. |
| """ |
| |
| pnet = netaddr.IPNetwork(str(prefix)) |
| (o1, o2, o3, o4) = pnet.network.words # Split ipv4 octets |
| cidr_plen = pnet.prefixlen |
| |
| if o1 == 10: |
| o2 = o3 = o4 = 0 |
| cidr_plen = 8 |
| elif (o1 == 172 and o2 >= 16 and o2 <= 31) or (o1 == 192 and o2 == 168): |
| o3 = o4 = 0 |
| cidr_plen = 16 |
| |
| return "%s/%d" % (".".join(map(str, [o1, o2, o3, o4])), cidr_plen) |
| |
| def add_prefix(self, prefix): |
| |
| canonical_prefix = self.canonicalize_rfc1918_prefix(prefix) |
| |
| if canonical_prefix in self.reverse_zones: |
| rzone = self.reverse_zones[canonical_prefix] |
| else: |
| rzone = { |
| "ns": [], |
| "ptr": {}, |
| } |
| |
| if prefix.dhcp_range: |
| # FIXME: doesn't check for duplicate entries |
| rzone["ptr"].update(self.create_dhcp_rev(prefix)) |
| |
| for ip, ao in prefix.aos.items(): |
| target_ip = str(netaddr.IPNetwork(ip).ip) # make bare IP, not CIDR format |
| ao_name = self.get_ao_name(ip, ao, prefix,) |
| rzone["ptr"][target_ip] = ao_name |
| |
| # add NS records if this is a DNS server |
| if ao.has_service(ip, 53, "udp"): |
| rzone["ns"].append(ao_name) |
| |
| parent_prefix = prefix.parent() |
| |
| if parent_prefix: |
| self.merge_parent_prefix(rzone, parent_prefix) |
| |
| self.reverse_zones[canonical_prefix] = rzone |
| |
| def merge_parent_prefix(self, rzone, pprefix): |
| |
| # parent items |
| p_ns = [] |
| |
| # scan parent prefix for services |
| for ip, ao in pprefix.aos.items(): |
| |
| ao_name = self.get_ao_name(ip, ao, pprefix,) |
| |
| # add NS records if this is a DNS server |
| if ao.has_service(ip, 53, "udp"): |
| p_ns.append(ao_name) |
| |
| # set DNS servers if none in rzone |
| if not rzone["ns"]: |
| rzone["ns"] = p_ns |
| |
| def create_dhcp_rev(self, prefix): |
| |
| dhcp_rzone = {} |
| |
| for ip in netaddr.IPNetwork(prefix.dhcp_range).iter_hosts(): |
| dhcp_rzone[str(ip)] = "dhcp%03d.%s." % ( |
| ip.words[3], |
| prefix.domain_extension, |
| ) |
| |
| return dhcp_rzone |
| |
| def get_ao_name(self, ip, ao, prefix): |
| short_name = ao.dns_name(ip, prefix) |
| return "%s.%s." % (short_name, prefix.domain_extension) |
| |
| |
| @yaml.yaml_object(ydump) |
| class NBDHCPSubnet: |
| def __init__(self, prefix): |
| |
| self.domain_extension = prefix.domain_extension |
| |
| self.subnet = None |
| self.range = None |
| self.first_ip = None |
| self.hosts = [] |
| self.routers = [] |
| self.dns_servers = [] |
| self.dns_search = [] |
| self.tftpd_server = None |
| self.ntp_servers = [] |
| self.dhcpd_interface = None |
| |
| self.add_prefix(prefix) |
| |
| for ip, ao in prefix.aos.items(): |
| self.add_ao(str(ip), ao, prefix) |
| |
| parent_prefix = prefix.parent() |
| |
| if parent_prefix: |
| self.merge_parent_prefix(parent_prefix) |
| |
| def add_prefix(self, prefix): |
| |
| self.subnet = str(prefix) |
| |
| self.first_ip = str(netaddr.IPAddress(netaddr.IPNetwork(str(prefix)).first + 1)) |
| |
| self.dns_search = [prefix.domain_extension] |
| |
| if prefix.dhcp_range: |
| self.range = prefix.dhcp_range |
| |
| for ip, res in prefix.reserved_ips.items(): |
| # routers are reserved IP's that start with 'router" in the IP description |
| if re.match("router", res["description"]): |
| router = {"ip": str(netaddr.IPNetwork(ip).ip)} |
| |
| if ( |
| "rfc3442routes" in res["custom_fields"] |
| and res["custom_fields"]["rfc3442routes"] |
| ): |
| # split on whitespace |
| router["rfc3442routes"] = re.split( |
| r"\s+", res["custom_fields"]["rfc3442routes"] |
| ) |
| |
| self.routers.append(router) |
| |
| # set first IP to router if not set otherwise. |
| if not self.routers: |
| router = {"ip": self.first_ip} |
| |
| self.routers.append(router) |
| |
| def add_ao(self, ip, ao, prefix): |
| |
| target_ip = str(netaddr.IPNetwork(ip).ip) # make bare IP, not CIDR format |
| |
| # find the DHCP interface if it's this IP |
| if target_ip == self.first_ip: |
| self.dhcpd_interface = ao.interfaces_by_ip[ip].name |
| |
| name = ao.dns_name(ip, prefix) |
| |
| # add only devices that have a macaddr for this IP |
| if ip in ao.interfaces_by_ip: |
| |
| mac_addr = dict(ao.interfaces_by_ip[ip]).get("mac_address") |
| |
| if mac_addr and mac_addr.strip(): # if exists and not blank |
| self.hosts.append( |
| {"name": name, "ip_addr": target_ip, "mac_addr": mac_addr.lower()} |
| ) |
| |
| # add dns servers |
| if ao.has_service(ip, 53, "udp"): |
| self.dns_servers.append(target_ip) |
| |
| # add tftp server |
| if ao.has_service(ip, 69, "udp"): |
| if not self.tftpd_server: |
| self.tftpd_server = target_ip |
| else: |
| logger.warning( |
| "Duplicate TFTP servers in prefix, using first of %s and %s", |
| self.tftpd_server, |
| target_ip, |
| ) |
| |
| # add NTP servers |
| if ao.has_service(ip, 123, "udp"): |
| self.ntp_servers.append(target_ip) |
| |
| def merge_parent_prefix(self, pprefix): |
| |
| # parent items |
| p_dns_servers = [] |
| p_tftpd_server = None |
| p_ntp_servers = [] |
| |
| # scan parent prefix for services |
| for ip, ao in pprefix.aos.items(): |
| |
| target_ip = str(netaddr.IPNetwork(ip).ip) |
| |
| # add dns servers |
| if ao.has_service(ip, 53, "udp"): |
| p_dns_servers.append(target_ip) |
| |
| # add tftp server |
| if ao.has_service(ip, 69, "udp"): |
| if not p_tftpd_server: |
| p_tftpd_server = target_ip |
| else: |
| logger.warning( |
| "Duplicate TFTP servers in parent prefix, using first of %s and %s", |
| p_tftpd_server, |
| target_ip, |
| ) |
| |
| # add NTP servers |
| if ao.has_service(ip, 123, "udp"): |
| p_ntp_servers.append(target_ip) |
| |
| # merge if doesn't exist in prefix |
| if not self.dns_servers: |
| self.dns_servers = p_dns_servers |
| |
| if not self.tftpd_server: |
| self.tftpd_server = p_tftpd_server |
| |
| if not self.ntp_servers: |
| self.ntp_servers = p_ntp_servers |
| |
| @classmethod |
| def to_yaml(cls, representer, node): |
| return representer.represent_dict( |
| { |
| "subnet": node.subnet, |
| "range": node.range, |
| "routers": node.routers, |
| "hosts": node.hosts, |
| "dns_servers": node.dns_servers, |
| "dns_search": node.dns_search, |
| "tftpd_server": node.tftpd_server, |
| "ntp_servers": node.ntp_servers, |
| } |
| ) |