Refactor nbhelper
Change-Id: I69d10d164fac3eb319e072447a520905880c31dd
diff --git a/scripts/base_edgeconfig.yaml b/scripts/base_edgeconfig.yaml
index 3cdcbf6..b462973 100644
--- a/scripts/base_edgeconfig.yaml
+++ b/scripts/base_edgeconfig.yaml
@@ -1,19 +1,8 @@
---
# this is copied into every edgeconfig
-netprep_router: true
netprep_netplan_file: "02-pronto"
-tftpd_files:
- - "undionly.kpxe"
-
-vhosts:
- - name: "default"
- default_server: true
- autoindex: true
-
-acme_username: "www-data" # make independent of the acme role
-
userlist:
- username: terraform
comment: "ONF Terraform User"
diff --git a/scripts/edgeconfig.py b/scripts/edgeconfig.py
index fc49606..7034f6c 100644
--- a/scripts/edgeconfig.py
+++ b/scripts/edgeconfig.py
@@ -14,7 +14,7 @@
import nbhelper
import os
-from ruamel import yaml
+from ruamel.yaml import YAML
# main function that calls other functions
if __name__ == "__main__":
@@ -32,46 +32,40 @@
}
args = nbhelper.initialize(extra_args)
- tenant = nbhelper.NBTenant()
+ tenant = nbhelper.Tenant()
# use base_config for additional items
- base_yaml = yaml.safe_load(args.base_config.read())
+ yaml = YAML(typ="rt")
+ base_yaml = yaml.load(args.base_config.read())
- dhcpd_subnets = []
dhcpd_interfaces = []
- # reverse zones aggregate across RFC1918 IP prefix
- dns_reverse_zones = nbhelper.NBDNSReverseZones()
- for prefix in tenant.get_prefixes().values():
+ # TODO
+ # dhcpd_if = dhcpd_subnet.dhcpd_interface
+ dns_forward_zones = nbhelper.service.dnsFowardZoneConfigGenerator()
+ dns_reverse_zones = nbhelper.service.dnsReverseZoneConfigGenerator()
+ dhcpd_subnets = nbhelper.service.dhcpSubnetConfigGenerator()
- nbhelper.NBDNSForwardZone.get_fwd_zone(prefix)
- dns_reverse_zones.add_prefix(prefix)
- dhcpd_subnet = nbhelper.NBDHCPSubnet(prefix)
- dhcpd_if = dhcpd_subnet.dhcpd_interface
-
- if dhcpd_if and dhcpd_if not in dhcpd_interfaces:
- dhcpd_interfaces.append(dhcpd_if)
-
- dhcpd_subnets.append(dhcpd_subnet)
-
- for device in tenant.get_devices():
+ for device in tenant.get_devices(device_types=["server", "router", "switch"]):
output_yaml = base_yaml.copy()
if (
- isinstance(device, nbhelper.NBDevice)
+ isinstance(device, nbhelper.Device)
and device.data.device_role.slug == "router"
) or (
- isinstance(device, nbhelper.NBVirtualMachine)
+ isinstance(device, nbhelper.VirtualMachine)
and device.data.role.slug == "router"
):
- output_yaml["dns_forward_zones"] = nbhelper.NBDNSForwardZone.all_fwd_zones()
+ output_yaml["dns_forward_zones"] = dns_forward_zones
output_yaml["dns_reverse_zones"] = dns_reverse_zones
output_yaml["dhcpd_subnets"] = dhcpd_subnets
- output_yaml["dhcpd_interfaces"] = dhcpd_interfaces
+ output_yaml["dhcpd_interfaces"] = list(device.internal_interfaces.keys())
output_yaml["netprep_nftables"] = device.generate_nftables()
output_yaml.update(device.generate_extra_config())
+ output_yaml = nbhelper.utils.apply_as_router(output_yaml)
output_yaml["netprep_netplan"] = device.generate_netplan()
- with open("inventory/host_vars/%s.yaml" % device.name, "w") as f:
- f.write(yaml.safe_dump(output_yaml, indent=2))
+ with open("inventory/host_vars/%s.yaml" % device.fullname, "w") as f:
+ # yaml.compact(seq_seq=False, seq_map=False)
+ yaml.dump(output_yaml, f)
diff --git a/scripts/nbhelper.py b/scripts/nbhelper.py
deleted file mode 100644
index d91108f..0000000
--- a/scripts/nbhelper.py
+++ /dev/null
@@ -1,1272 +0,0 @@
-#!/usr/bin/env python3
-
-# SPDX-FileCopyrightText: © 2021 Open Networking Foundation <support@opennetworking.org>
-# SPDX-License-Identifier: Apache-2.0
-
-# nbhelper.py
-# Helper functions for building YAML output from Netbox API calls
-
-from __future__ import absolute_import
-
-import re
-import sys
-import argparse
-import logging
-import netaddr
-import pynetbox
-import requests
-
-from ruamel import yaml
-
-# create shared logger
-logging.basicConfig()
-logger = logging.getLogger("nbh")
-
-# to dump YAML properly, using internal representers
-# see also:
-# https://stackoverflow.com/questions/54378220/declare-data-type-to-ruamel-yaml-so-that-it-can-represen-serialize-it
-# https://sourceforge.net/p/ruamel-yaml/code/ci/default/tree/representer.py
-
-ydump = yaml.YAML(typ="safe")
-ydump.representer.add_representer(
- pynetbox.models.dcim.Devices, yaml.SafeRepresenter.represent_dict
-)
-ydump.representer.add_representer(
- pynetbox.models.dcim.Interfaces, yaml.SafeRepresenter.represent_dict
-)
-ydump.representer.add_representer(
- pynetbox.models.ipam.Prefixes, yaml.SafeRepresenter.represent_dict
-)
-ydump.representer.add_representer(
- pynetbox.core.response.Record, yaml.SafeRepresenter.represent_dict
-)
-ydump.representer.add_representer(
- pynetbox.models.ipam.IpAddresses, yaml.SafeRepresenter.represent_dict
-)
-ydump.representer.add_representer(
- pynetbox.core.api.Api, yaml.SafeRepresenter.represent_none
-)
-
-netboxapi = None
-netbox_config = None
-netbox_version = None
-
-
-def initialize(extra_args):
- global netboxapi, netbox_config, netbox_version
-
- args = parse_cli_args(extra_args)
- netbox_config = yaml.safe_load(args.settings.read())
-
- for require_args in ["api_endpoint", "token", "tenant_name"]:
- if not netbox_config.get(require_args):
- logger.error("The require argument: %s was not set. Stop." % require_args)
- sys.exit(1)
-
- netboxapi = pynetbox.api(
- netbox_config["api_endpoint"], token=netbox_config["token"], threading=True,
- )
-
- if not netbox_config.get("validate_certs", True):
- session = requests.Session()
- session.verify = False
- netboxapi.http_session = session
-
- netbox_version = netboxapi.version
-
- return args
-
-
-def parse_cli_args(extra_args={}):
- """
- parse CLI arguments. Can add extra arguments with a option:kwargs dict
- """
-
- parser = argparse.ArgumentParser(description="Netbox")
-
- # Positional args
- parser.add_argument(
- "settings",
- type=argparse.FileType("r"),
- help="YAML Ansible inventory file w/NetBox API token",
- )
-
- parser.add_argument(
- "--debug", action="store_true", help="Print additional debugging information"
- )
-
- for ename, ekwargs in extra_args.items():
- parser.add_argument(ename, **ekwargs)
-
- args = parser.parse_args()
- log_level = logging.DEBUG if args.debug else logging.INFO
- logger.setLevel(log_level)
-
- return args
-
-
-def check_name_dns(name):
-
- badchars = re.search("[^a-z0-9.-]", name.lower(), re.ASCII)
-
- if badchars:
- logger.error(
- "DNS name '%s' has one or more invalid characters: '%s'",
- name,
- badchars.group(0),
- )
- sys.exit(1)
-
- return name.lower()
-
-
-def clean_name_dns(name):
- return re.sub("[^a-z0-9.-]", "-", name.lower(), 0, re.ASCII)
-
-
-class AttrDict(dict):
- def __init__(self, *args, **kwargs):
- super(AttrDict, self).__init__(*args, **kwargs)
- self.__dict__ = self
-
-
-class NBTenant:
- def __init__(self):
- self.name = netbox_config["tenant_name"]
- self.name_segments = netbox_config.get("prefix_segments", 1)
- self.tenant = netboxapi.tenancy.tenants.get(name=self.name)
-
- # NBTenant only keep the resources which owns by it
- self.devices = list()
- self.vms = list()
- self.prefixes = dict()
-
- # Get the Device and Virtual Machines from Netbox API
- for device_data in netboxapi.dcim.devices.filter(tenant=self.tenant.slug):
- device = NBDevice(device_data)
- device.tenant = self
- self.devices.append(device)
-
- for vm_data in netboxapi.virtualization.virtual_machines.filter(
- tenant=self.tenant.slug
- ):
- vm = NBVirtualMachine(vm_data)
- vm.tenant = self
- self.vms.append(vm)
-
- def get_prefixes(self):
- """Get the IP Prefixes owns by current tenant"""
-
- if self.prefixes:
- return self.prefixes
-
- vrf = netboxapi.ipam.vrfs.get(tenant=self.tenant.slug)
- for prefix_data in netboxapi.ipam.prefixes.filter(vrf_id=vrf.id):
- if prefix_data.description:
- self.prefixes[prefix_data.description] = NBPrefix(
- prefix_data, self.name_segments
- )
-
- return self.prefixes
-
- def get_device_by_name(self, name):
- """
- Find the device or VM which belongs to this Tenant,
- If the name wasn't specified, return the management server
- """
-
- for machine in self.devices + self.vms:
- if name and machine.name == name:
- return machine
- elif machine.data["device_role"]["name"] == "Router":
- return machine
-
- ret_msg = (
- "The name '%s' wasn't found in this tenant, "
- + "or can't found any Router in this tenant"
- )
-
- logger.error(ret_msg, name)
- sys.exit(1)
-
- def get_devices(self, device_types=["server", "router"]):
- """
- Get all devices (Router + Server) belong to this Tenant
- """
-
- if not device_types:
- return self.devices + self.vms
-
- ret = []
-
- for machine in self.devices:
- if machine.data.device_role.slug in device_types:
- ret.append(machine)
-
- for vm in self.vms:
- if vm.data.role.slug in device_types:
- ret.append(vm)
-
- return ret
-
-
-@yaml.yaml_object(ydump)
-class NBPrefix:
-
- prefixes = {}
-
- def __init__(self, data, name_segments):
- self.data = data
- self.name_segments = name_segments
- self.domain_extension = check_name_dns(self.data.description)
-
- logger.debug(
- "Preix %s: domain_extension %s, data: %s",
- self.data.prefix,
- self.domain_extension,
- dict(self.data),
- )
-
- # ip centric info
- self.dhcp_range = None
- self.reserved_ips = {}
- self.aos = {}
-
- # build item lists
- self.build_prefix()
- self.prefixes[self.data.prefix] = self
-
- @classmethod
- def all_prefixes(cls):
- return cls.prefixes
-
- @classmethod
- def get_prefix(cls, prefix, name_segments=1):
- if prefix in cls.prefixes:
- return cls.prefixes[prefix]
-
- data = netboxapi.ipam.prefixes.get(prefix=prefix)
- if data:
- return NBPrefix(data, name_segments)
- else:
- raise Exception("The prefix %s wasn't found in Netbox" % prefix)
-
- def __repr__(self):
- return str(self.data.prefix)
-
- @classmethod
- def to_yaml(cls, representer, node):
- return representer.represent_dict(
- {
- "dhcp_range": node.dhcp_range,
- "reserved_ips": node.reserved_ips,
- "aos": node.aos,
- "prefix_data": dict(node.prefix_data),
- }
- )
-
- @classmethod
- def all_reserved_by_ip(cls, ip_addr=""):
- """
- all_reserved_by_ip will return all reserved IP found in prefixes
-
- We have the IP address marked as type 'Reserved' in Prefix,
- This type of IP address is using to define a DHCP range
- """
-
- ret = list()
-
- for prefix in cls.prefixes.values():
- if ip_addr and ip_addr in prefix.aos.keys():
- if prefix.reserved_ips:
- return list(prefix.reserved_ips.values())
- else:
- if prefix.reserved_ips:
- ret.extend(list(prefix.reserved_ips.values()))
-
- return ret
-
- def get_reserved_ips(self):
- """
- Get the reserved IP range (DHCP) in prefix
-
- We have the IP address marked as type 'Reserved' in Prefix,
- This type of IP address is using to define a DHCP range
- """
- if prefix.reserved_ips:
- return list(prefix.reserved_ips.values())
-
- def check_ip_belonging(self, ip):
- """
- Check if an IP address is belonging to this prefix
- """
- return ip in netaddr.IPSet([self.data.prefix])
-
- def parent(self):
- """
- Get the parent prefix to this prefix
-
- FIXME: Doesn't handle multiple layers of prefixes, returns first found
- """
-
- # get all parents of this prefix (include self)
- possible_parents = netboxapi.ipam.prefixes.filter(contains=self.data.prefix)
-
- logger.debug(
- "Prefix %s: possible parents %s", self.data.prefix, possible_parents
- )
-
- # filter out self, return first found
- for pparent in possible_parents:
- if pparent.prefix != self.data.prefix:
- return NBPrefix.get_prefix(pparent.prefix, self.name_segments)
-
- return None
-
- def build_prefix(self):
- """
- find ip information for items (devices/vms, reserved_ips, dhcp_range) in prefix
- """
-
- ips = netboxapi.ipam.ip_addresses.filter(parent=self.data.prefix)
-
- for ip in sorted(ips, key=lambda k: k["address"]):
-
- logger.debug("prefix_item ip: %s, data: %s", ip, dict(ip))
-
- # if it's a DHCP range, add that range to the dev list as prefix_dhcp
- if ip.status.value == "dhcp":
- self.dhcp_range = str(ip.address)
- continue
-
- # reserved IPs
- if ip.status.value == "reserved":
-
- res = {}
- res["name"] = ip.description.lower().split(" ")[0]
- res["description"] = ip.description
- res["ip4"] = str(netaddr.IPNetwork(ip.address))
- res["custom_fields"] = ip.custom_fields
-
- self.reserved_ips[str(ip)] = res
- continue
-
- # devices and VMs
- if ip.assigned_object: # can be null if not assigned to a device/vm
- aotype = ip.assigned_object_type
- if aotype == "dcim.interface":
- self.aos[str(ip)] = NBDevice.get_by_id(
- ip.assigned_object.device.id,
- )
- elif aotype == "virtualization.vminterface":
- self.aos[str(ip)] = NBVirtualMachine.get_by_id(
- ip.assigned_object.virtual_machine.id,
- )
- else:
- logger.error("IP %s has unknown device type: %s", ip, aotype)
- sys.exit(1)
- else:
- logger.warning("Unknown IP type %s, with attributes: %s", ip, dict(ip))
-
-
-@yaml.yaml_object(ydump)
-class NBAssignedObject:
- """
- Assigned Object is either a Device or Virtual Machine, which function
- nearly identically in the NetBox data model.
-
- This parent class holds common functions for those two child classes
- """
-
- objects = dict()
-
- def __init__(self, data):
- self.data = data
-
- # The AssignedObject attributes
- self.tenant = None
- self.id = data.id
- self.name = data.name
- self.ips = dict()
-
- # The NetBox objects related with this AssignedObject
- self.services = None
- self.interfaces = list()
- self.mgmt_interfaces = list()
- self.interfaces_by_ip = dict()
-
- if self.__class__ == NBDevice:
- self.interfaces = netboxapi.dcim.interfaces.filter(device_id=self.id)
- self.services = netboxapi.ipam.services.filter(device_id=self.id)
- ip_addresses = netboxapi.ipam.ip_addresses.filter(device_id=self.id)
- elif self.__class__ == NBVirtualMachine:
- self.interfaces = netboxapi.virtualization.interfaces.filter(
- virtual_machine_id=self.id
- )
- self.services = netboxapi.ipam.services.filter(virtual_machine_id=self.id)
- ip_addresses = netboxapi.ipam.ip_addresses.filter(
- virtual_machine_id=self.id
- )
-
- for ip in ip_addresses:
- self.ips[ip.address] = ip
- if ip.assigned_object and self.__class__ == NBDevice:
- self.interfaces_by_ip[ip.address] = netboxapi.dcim.interfaces.get(
- ip.assigned_object_id
- )
- elif ip.assigned_object and self.__class__ == NBVirtualMachine:
- self.interfaces_by_ip[
- ip.address
- ] = netboxapi.virtualization.interfaces.get(ip.assigned_object_id)
- self.interfaces_by_ip[ip.address].mgmt_only = False
-
- logger.debug(
- "%s id: %d, data: %s, ips: %s"
- % (self.type, self.id, dict(self.data), self.ips)
- )
-
- self.netplan_config = dict()
- self.extra_config = dict()
-
- def __repr__(self):
- return str(dict(self.data))
-
- def dns_name(self, ip, prefix):
- """
- Returns the DNS name for the device at this IP in the prefix
- """
-
- def first_segment_suffix(split_name, suffixes, segments):
- first_seg = "-".join([split_name[0], *suffixes])
-
- if segments > 1:
- name = ".".join([first_seg, *split_name[1:segments]])
- else:
- name = first_seg
-
- return name
-
- # clean/split the device name
- name_split = clean_name_dns(self.data.name).split(".")
-
- # always add interface suffix to mgmt interfaces
- if self.interfaces_by_ip[ip].mgmt_only:
- return first_segment_suffix(
- name_split, [self.interfaces_by_ip[ip].name], prefix.name_segments
- )
-
- # find all IP's for this device in the prefix that aren't mgmt interfaces
- prefix_ips = []
- for s_ip in self.ips:
- if s_ip in prefix.aos and not self.interfaces_by_ip[s_ip].mgmt_only:
- prefix_ips.append(s_ip)
-
- # name to use when only one IP address for device in a prefix
- simple_name = ".".join(name_split[0 : prefix.name_segments])
-
- # if more than one non-mgmt IP in prefix
- if len(prefix_ips) > 1:
-
- # use bare name if primary IP address
- try: # skip if no primary_ip.address
- if ip == self.data.primary_ip.address:
- return simple_name
- except AttributeError:
- pass
-
- # else, suffix with the interface name, and the last octet of IP address
- return first_segment_suffix(
- name_split,
- [
- self.interfaces_by_ip[ip].name,
- str(netaddr.IPNetwork(ip).ip.words[3]),
- ],
- prefix.name_segments,
- )
-
- # simplest case - only one IP in prefix, return simple_name
- return simple_name
-
- def dns_cnames(self, ip):
- """
- returns a list of cnames for this object, based on IP matches
- """
-
- cnames = []
-
- for service in self.services:
-
- # if not assigned to any IP's, service is on all IPs
- if not service.ipaddresses:
- cnames.append(service.name)
- continue
-
- # If assigned to an IP, only create a CNAME on that IP
- for service_ip in service.ipaddresses:
- if ip == service_ip.address:
- cnames.append(service.name)
-
- return cnames
-
- def has_service(self, cidr_ip, port, protocol):
- """
- Return True if this AO has a service using specific port and protocol combination
- """
-
- if (
- cidr_ip in self.interfaces_by_ip
- and not self.interfaces_by_ip[cidr_ip].mgmt_only
- ):
- for service in self.services:
- if service.port == port and service.protocol.value == protocol:
- return True
-
- return False
-
- def primary_iface(self):
- """
- Returns the interface data for the device that has the primary_ip
- """
-
- if self.data.primary_ip:
- return self.interfaces_by_ip[self.data.primary_ip.address]
-
- return None
-
- @property
- def type(self):
- return "AssignedObject"
-
- @classmethod
- def get_by_id(cls, obj_id):
- raise Exception("not implemented")
-
- @classmethod
- def all_objects(cls):
- return cls.objects
-
- @classmethod
- def to_yaml(cls, representer, node):
- return representer.represent_dict(
- {
- "data": node.data,
- "services": node.services,
- "ips": node.ips,
- "interfaces_by_ip": node.interfaces_by_ip,
- }
- )
-
- def generate_netplan(self):
- """
- Get the interface config of specific server belongs to this tenant
- """
-
- if self.netplan_config:
- return self.netplan_config
-
- if not self.data:
- logger.error(
- "{type} {name} doesn't have data yet.".format(
- type=self.type, name=self.name
- )
- )
- sys.exit(1)
-
- primary_ip = self.data.primary_ip.address if self.data.primary_ip else None
- primary_if = self.interfaces_by_ip[primary_ip] if primary_ip else None
-
- self.netplan_config["ethernets"] = dict()
-
- if (isinstance(self, NBDevice) and self.data.device_role.name == "Router") or (
- isinstance(self, NBVirtualMachine) and self.data.role.name == "Router"
- ):
- for address, interface in self.interfaces_by_ip.items():
- if interface.mgmt_only is True or str(interface.type) == "Virtual":
- continue
-
- if not any(
- [
- p.check_ip_belonging(address)
- for p in self.tenant.prefixes.values()
- ]
- ):
- continue
-
- self.netplan_config["ethernets"].setdefault(interface.name, {})
- self.netplan_config["ethernets"][interface.name].setdefault(
- "addresses", []
- ).append(address)
-
- elif isinstance(self, NBDevice) and self.data.device_role.name == "Server":
- if primary_if:
- self.netplan_config["ethernets"][primary_if.name] = {
- "dhcp4": "yes",
- "dhcp4-overrides": {"route-metric": 100},
- }
-
- for physical_if in filter(
- lambda i: str(i.type) != "Virtual"
- and i != primary_if
- and i.mgmt_only is False,
- self.interfaces,
- ):
- self.netplan_config["ethernets"][physical_if.name] = {
- "dhcp4": "yes",
- "dhcp4-overrides": {"route-metric": 200},
- }
- else:
- # Exclude the device type which is not Router and Server
- return None
-
- # Get interfaces own by AssignedObject and is virtual (VLAN interface)
- for virtual_if in filter(lambda i: str(i.type) == "Virtual", self.interfaces):
- if "vlans" not in self.netplan_config:
- self.netplan_config["vlans"] = dict()
-
- if not virtual_if.tagged_vlans:
- # If a virtual interface doesn't have tagged VLAN, skip
- continue
-
- # vlan_object_id is the "id" on netbox, it's different from known VLAN ID
- vlan_object_id = virtual_if.tagged_vlans[0].id
- vlan_object = netboxapi.ipam.vlans.get(vlan_object_id)
- virtual_if_ips = netboxapi.ipam.ip_addresses.filter(
- interface_id=virtual_if.id
- )
-
- routes = []
- for ip in virtual_if_ips:
- reserved_ips = NBPrefix.all_reserved_by_ip(str(ip))
- for reserved_ip in reserved_ips:
- destination = reserved_ip["custom_fields"].get("rfc3442routes", "")
- if destination:
- for dest_ip in destination.split():
- new_route = {
- "to": dest_ip,
- "via": str(netaddr.IPNetwork(reserved_ip["ip4"]).ip),
- "metric": 100,
- }
- if new_route not in routes:
- routes.append(new_route)
-
- self.netplan_config["vlans"][virtual_if.name] = {
- "id": vlan_object.vid,
- "link": virtual_if.label,
- "addresses": [ip.address for ip in virtual_if_ips],
- }
-
- # Only the fabric virtual interface will need to route to other network segments
- if routes and "fab" in virtual_if.name:
- self.netplan_config["vlans"][virtual_if.name]["routes"] = routes
-
- return self.netplan_config
-
- def generate_nftables(self):
-
- ret = dict()
-
- primary_ip = self.data.primary_ip.address if self.data.primary_ip else None
- external_if = self.interfaces_by_ip[primary_ip] if primary_ip else None
- internal_if = None
-
- if external_if is None:
- logger.error("The primary interface wasn't set for device %s", self.name)
- sys.exit(1)
-
- for intf in filter(
- lambda i: str(i.type) != "Virtual" and i.mgmt_only is False,
- self.interfaces_by_ip.values(),
- ):
- if intf.id != external_if.id:
- internal_if = intf
- break
-
- ret["external_if"] = external_if.name
- ret["internal_if"] = internal_if.name
-
- if self.services:
- ret["services"] = list()
-
- for service in self.services:
- ret["services"].append(
- {
- "name": service.name,
- "protocol": service.protocol.value,
- "port": service.port,
- }
- )
-
- # Only management server needs to be configured the whitelist netrange of internal interface
- if self.data.device_role.name == "Router":
- ret["allow_subnets"] = list()
- ret["ue_routing"] = dict()
- ret["ue_routing"]["ue_subnets"] = self.data.config_context["ue_subnets"]
- for prefix in NBPrefix.all_prefixes().values():
- if prefix.data.description:
- ret["allow_subnets"].append(prefix.data.prefix)
-
- if "fab" in prefix.data.description:
- ret["ue_routing"].setdefault("src_subnets", [])
- ret["ue_routing"]["src_subnets"].append(prefix.data.prefix)
-
- if (
- not ret["ue_routing"].get("snat_addr")
- and "fab" in prefix.data.description
- ):
- for ip, device in prefix.aos.items():
- if device.name == self.name:
- ret["ue_routing"]["snat_addr"] = str(
- netaddr.IPNetwork(ip).ip
- )
- break
-
- return ret
-
- def generate_extra_config(self):
- """
- Generate the extra configs which need in management server configuration
- This function should only be called when the device role is "Router"
- """
-
- if self.extra_config:
- return self.extra_config
-
- primary_ip = self.data.primary_ip.address if self.data.primary_ip else None
-
- service_names = list(map(lambda x: x.name, self.services))
-
- if "dns" in service_names:
- unbound_listen_ips = []
- unbound_allow_ips = []
-
- for ip, intf in self.interfaces_by_ip.items():
- if ip != primary_ip and intf.mgmt_only == False:
- unbound_listen_ips.append(ip)
-
- for prefix in NBPrefix.all_prefixes().values():
- if prefix.data.description:
- unbound_allow_ips.append(prefix.data.prefix)
-
- if unbound_listen_ips:
- self.extra_config["unbound_listen_ips"] = unbound_listen_ips
-
- if unbound_allow_ips:
- self.extra_config["unbound_allow_ips"] = unbound_allow_ips
-
- if "ntp" in service_names:
- ntp_client_allow = []
-
- for prefix in NBPrefix.all_prefixes().values():
- if prefix.data.description:
- ntp_client_allow.append(prefix.data.prefix)
-
- if ntp_client_allow:
- self.extra_config["ntp_client_allow"] = ntp_client_allow
-
- return self.extra_config
-
-
-@yaml.yaml_object(ydump)
-class NBDevice(NBAssignedObject):
- """
- Wraps a single Netbox device
- Also caches all known devices in a class variable (devs)
- """
-
- objects = dict()
-
- def __init__(self, data):
-
- super().__init__(data)
- self.objects[self.id] = self
-
- @property
- def type(self):
- return "NBDevice"
-
- def get_interfaces(self):
- if not self.interfaces:
- self.interfaces = netboxapi.dcim.interfaces.filter(device_id=self.id)
-
- return self.interfaces
-
- @classmethod
- def get_by_id(cls, obj_id):
- obj = cls.objects.get(obj_id, None)
- obj = obj or NBDevice(netboxapi.dcim.devices.get(obj_id))
-
- return obj
-
-
-@yaml.yaml_object(ydump)
-class NBVirtualMachine(NBAssignedObject):
- """
- VM equivalent of NBDevice
- """
-
- objects = dict()
-
- def __init__(self, data):
-
- super().__init__(data)
- self.objects[self.id] = self
-
- @property
- def type(self):
- return "NBVirtualMachine"
-
- def get_interfaces(self):
- if not self.interfaces:
- self.interfaces = netboxapi.virtualization.interfaces.filter(
- virtual_machine_id=self.id
- )
-
- return self.interfaces
-
- @classmethod
- def get_by_id(cls, obj_id):
- obj = cls.objects.get(obj_id, None)
- obj = obj or NBVirtualMachine(
- netboxapi.virtualization.virtual_machines.get(obj_id)
- )
-
- return obj
-
-
-@yaml.yaml_object(ydump)
-class NBDNSForwardZone:
-
- fwd_zones = {}
-
- def __init__(self, prefix):
-
- self.domain_extension = prefix.domain_extension
-
- self.a_recs = {}
- self.cname_recs = {}
- self.srv_recs = {}
- self.ns_recs = []
- self.txt_recs = {}
-
- if prefix.dhcp_range:
- self.create_dhcp_fwd(prefix.dhcp_range)
-
- for ip, ao in prefix.aos.items():
- self.add_ao_records(prefix, ip, ao)
-
- for ip, res in prefix.reserved_ips.items():
- self.add_reserved(ip, res)
-
- # reqquired for the add_fwd_cname function below
- if callable(getattr(prefix, "parent")):
- parent_prefix = prefix.parent()
-
- if parent_prefix:
- self.merge_parent_prefix(parent_prefix, prefix)
-
- self.fwd_zones[self.domain_extension] = self
-
- def __repr__(self):
- return str(
- {
- "a": self.a_recs,
- "cname": self.cname_recs,
- "ns": self.ns_recs,
- "srv": self.srv_recs,
- "txt": self.txt_recs,
- }
- )
-
- @classmethod
- def add_fwd_cname(cls, cname, fqdn_dest):
- """
- Add an arbitrary CNAME (and possibly create the fwd zone if needed) pointing
- at a FQDN destination name. It's used to support the per-IP "DNS name" field in NetBox
- Note that the NS record
- """
-
- try:
- fqdn_split = re.compile(r"([a-z]+)\.([a-z.]+)\.")
- (short_name, extension) = fqdn_split.match(cname).groups()
-
- except AttributeError:
- logger.warning(
- "Invalid DNS CNAME: '%s', must be in FQDN format: 'host.example.com.', ignored",
- cname,
- )
- return
-
- fake_prefix = AttrDict(
- {
- "domain_extension": extension,
- "dhcp_range": None,
- "aos": {},
- "reserved_ips": {},
- "parent": None,
- }
- )
-
- fwd_zone = cls.get_fwd_zone(fake_prefix)
-
- fwd_zone.cname_recs[short_name] = fqdn_dest
-
- @classmethod
- def get_fwd_zone(cls, prefix):
- if prefix.domain_extension in cls.fwd_zones:
- return cls.fwd_zones[prefix.domain_extension]
-
- return NBDNSForwardZone(prefix)
-
- @classmethod
- def all_fwd_zones(cls):
- return cls.fwd_zones
-
- @classmethod
- def to_yaml(cls, representer, node):
- return representer.represent_dict(
- {
- "a": node.a_recs,
- "cname": node.cname_recs,
- "ns": node.ns_recs,
- "srv": node.srv_recs,
- "txt": node.txt_recs,
- }
- )
-
- def fqdn(self, name):
- return "%s.%s." % (name, self.domain_extension)
-
- def create_dhcp_fwd(self, dhcp_range):
-
- for ip in netaddr.IPNetwork(dhcp_range).iter_hosts():
- self.a_recs["dhcp%03d" % (ip.words[3])] = str(ip)
-
- def name_is_duplicate(self, name, target, record_type):
- """
- Returns True if name already exists in the zone as an A or CNAME
- record, False otherwise
- """
-
- if name in self.a_recs:
- logger.warning(
- "Duplicate DNS record for name %s - A record to '%s', %s record to '%s'",
- name,
- self.a_recs[name],
- record_type,
- target,
- )
- return True
-
- if name in self.cname_recs:
- logger.warning(
- "Duplicate DNS record for name %s - CNAME record to '%s', %s record to '%s'",
- name,
- self.cname_recs[name],
- record_type,
- target,
- )
- return True
-
- return False
-
- def add_ao_records(self, prefix, ip, ao):
-
- name = ao.dns_name(ip, prefix)
- target_ip = str(netaddr.IPNetwork(ip).ip) # make bare IP, not CIDR format
-
- # add A records
- if not self.name_is_duplicate(name, target_ip, "A"):
- self.a_recs[name] = target_ip
-
- # add CNAME records that alias to this name
- for cname in ao.dns_cnames(ip):
- # check that it isn't a dupe
- if not self.name_is_duplicate(cname, target_ip, "CNAME"):
- self.cname_recs[cname] = self.fqdn(name)
-
- # add NS records if this is a DNS server
- if ao.has_service(ip, 53, "udp"):
- self.ns_recs.append(self.fqdn(name))
-
- # if a DNS name is set, add it as a CNAME
- if ao.ips[ip]["dns_name"]: # and ip == aos.data.primary_ip.address:
- self.add_fwd_cname(ao.ips[ip]["dns_name"], self.fqdn(name))
-
- def add_reserved(self, ip, res):
-
- target_ip = str(netaddr.IPNetwork(ip).ip) # make bare IP, not CIDR format
-
- if not self.name_is_duplicate(res["name"], target_ip, "A"):
- self.a_recs[res["name"]] = target_ip
-
- def merge_parent_prefix(self, pprefix, prefix):
-
- # only if no NS records exist already
- if not self.ns_recs:
- # scan parent prefix for services
- for ip, ao in pprefix.aos.items():
-
- # Create a DNS within this prefix pointing to out-of-prefix IP
- # where DNS server is
- name = ao.dns_name(ip, prefix)
- target_ip = str(
- netaddr.IPNetwork(ip).ip
- ) # make bare IP, not CIDR format
-
- # add NS records if this is a DNS server
- if ao.has_service(ip, 53, "udp"):
- self.a_recs[name] = target_ip
- self.ns_recs.append(self.fqdn(name))
-
-
-@yaml.yaml_object(ydump)
-class NBDNSReverseZones:
- def __init__(self):
-
- self.reverse_zones = {}
-
- @classmethod
- def to_yaml(cls, representer, node):
- return representer.represent_dict(node.reverse_zones)
-
- @classmethod
- def canonicalize_rfc1918_prefix(cls, prefix):
- """
- RFC1918 prefixes need to be expanded to their widest canonical range to
- group all reverse lookup domains together for reverse DNS with NSD/Unbound.
- """
-
- pnet = netaddr.IPNetwork(str(prefix))
- (o1, o2, o3, o4) = pnet.network.words # Split ipv4 octets
- cidr_plen = pnet.prefixlen
-
- if o1 == 10:
- o2 = o3 = o4 = 0
- cidr_plen = 8
- elif (o1 == 172 and o2 >= 16 and o2 <= 31) or (o1 == 192 and o2 == 168):
- o3 = o4 = 0
- cidr_plen = 16
-
- return "%s/%d" % (".".join(map(str, [o1, o2, o3, o4])), cidr_plen)
-
- def add_prefix(self, prefix):
-
- canonical_prefix = self.canonicalize_rfc1918_prefix(prefix)
-
- if canonical_prefix in self.reverse_zones:
- rzone = self.reverse_zones[canonical_prefix]
- else:
- rzone = {
- "ns": [],
- "ptr": {},
- }
-
- if prefix.dhcp_range:
- # FIXME: doesn't check for duplicate entries
- rzone["ptr"].update(self.create_dhcp_rev(prefix))
-
- for ip, ao in prefix.aos.items():
- target_ip = str(netaddr.IPNetwork(ip).ip) # make bare IP, not CIDR format
- ao_name = self.get_ao_name(ip, ao, prefix,)
- rzone["ptr"][target_ip] = ao_name
-
- # add NS records if this is a DNS server
- if ao.has_service(ip, 53, "udp"):
- rzone["ns"].append(ao_name)
-
- parent_prefix = prefix.parent()
-
- if parent_prefix:
- self.merge_parent_prefix(rzone, parent_prefix)
-
- self.reverse_zones[canonical_prefix] = rzone
-
- def merge_parent_prefix(self, rzone, pprefix):
-
- # parent items
- p_ns = []
-
- # scan parent prefix for services
- for ip, ao in pprefix.aos.items():
-
- ao_name = self.get_ao_name(ip, ao, pprefix,)
-
- # add NS records if this is a DNS server
- if ao.has_service(ip, 53, "udp"):
- p_ns.append(ao_name)
-
- # set DNS servers if none in rzone
- if not rzone["ns"]:
- rzone["ns"] = p_ns
-
- def create_dhcp_rev(self, prefix):
-
- dhcp_rzone = {}
-
- for ip in netaddr.IPNetwork(prefix.dhcp_range).iter_hosts():
- dhcp_rzone[str(ip)] = "dhcp%03d.%s." % (
- ip.words[3],
- prefix.domain_extension,
- )
-
- return dhcp_rzone
-
- def get_ao_name(self, ip, ao, prefix):
- short_name = ao.dns_name(ip, prefix)
- return "%s.%s." % (short_name, prefix.domain_extension)
-
-
-@yaml.yaml_object(ydump)
-class NBDHCPSubnet:
- def __init__(self, prefix):
-
- self.domain_extension = prefix.domain_extension
-
- self.subnet = None
- self.range = None
- self.first_ip = None
- self.hosts = []
- self.routers = []
- self.dns_servers = []
- self.dns_search = []
- self.tftpd_server = None
- self.ntp_servers = []
- self.dhcpd_interface = None
-
- self.add_prefix(prefix)
-
- for ip, ao in prefix.aos.items():
- self.add_ao(str(ip), ao, prefix)
-
- parent_prefix = prefix.parent()
-
- if parent_prefix:
- self.merge_parent_prefix(parent_prefix)
-
- def add_prefix(self, prefix):
-
- self.subnet = str(prefix)
-
- self.first_ip = str(netaddr.IPAddress(netaddr.IPNetwork(str(prefix)).first + 1))
-
- self.dns_search = [prefix.domain_extension]
-
- if prefix.dhcp_range:
- self.range = prefix.dhcp_range
-
- for ip, res in prefix.reserved_ips.items():
- # routers are reserved IP's that start with 'router" in the IP description
- if re.match("router", res["description"]):
- router = {"ip": str(netaddr.IPNetwork(ip).ip)}
-
- if (
- "rfc3442routes" in res["custom_fields"]
- and res["custom_fields"]["rfc3442routes"]
- ):
- # split on whitespace
- router["rfc3442routes"] = re.split(
- r"\s+", res["custom_fields"]["rfc3442routes"]
- )
-
- self.routers.append(router)
-
- # set first IP to router if not set otherwise.
- if not self.routers:
- router = {"ip": self.first_ip}
-
- self.routers.append(router)
-
- def add_ao(self, ip, ao, prefix):
-
- target_ip = str(netaddr.IPNetwork(ip).ip) # make bare IP, not CIDR format
-
- # find the DHCP interface if it's this IP
- if target_ip == self.first_ip:
- self.dhcpd_interface = ao.interfaces_by_ip[ip].name
-
- name = ao.dns_name(ip, prefix)
-
- # add only devices that have a macaddr for this IP
- if ip in ao.interfaces_by_ip:
-
- mac_addr = dict(ao.interfaces_by_ip[ip]).get("mac_address")
-
- if mac_addr and mac_addr.strip(): # if exists and not blank
- self.hosts.append(
- {"name": name, "ip_addr": target_ip, "mac_addr": mac_addr.lower()}
- )
-
- # add dns servers
- if ao.has_service(ip, 53, "udp"):
- self.dns_servers.append(target_ip)
-
- # add tftp server
- if ao.has_service(ip, 69, "udp"):
- if not self.tftpd_server:
- self.tftpd_server = target_ip
- else:
- logger.warning(
- "Duplicate TFTP servers in prefix, using first of %s and %s",
- self.tftpd_server,
- target_ip,
- )
-
- # add NTP servers
- if ao.has_service(ip, 123, "udp"):
- self.ntp_servers.append(target_ip)
-
- def merge_parent_prefix(self, pprefix):
-
- # parent items
- p_dns_servers = []
- p_tftpd_server = None
- p_ntp_servers = []
-
- # scan parent prefix for services
- for ip, ao in pprefix.aos.items():
-
- target_ip = str(netaddr.IPNetwork(ip).ip)
-
- # add dns servers
- if ao.has_service(ip, 53, "udp"):
- p_dns_servers.append(target_ip)
-
- # add tftp server
- if ao.has_service(ip, 69, "udp"):
- if not p_tftpd_server:
- p_tftpd_server = target_ip
- else:
- logger.warning(
- "Duplicate TFTP servers in parent prefix, using first of %s and %s",
- p_tftpd_server,
- target_ip,
- )
-
- # add NTP servers
- if ao.has_service(ip, 123, "udp"):
- p_ntp_servers.append(target_ip)
-
- # merge if doesn't exist in prefix
- if not self.dns_servers:
- self.dns_servers = p_dns_servers
-
- if not self.tftpd_server:
- self.tftpd_server = p_tftpd_server
-
- if not self.ntp_servers:
- self.ntp_servers = p_ntp_servers
-
- @classmethod
- def to_yaml(cls, representer, node):
- return representer.represent_dict(
- {
- "subnet": node.subnet,
- "range": node.range,
- "routers": node.routers,
- "hosts": node.hosts,
- "dns_servers": node.dns_servers,
- "dns_search": node.dns_search,
- "tftpd_server": node.tftpd_server,
- "ntp_servers": node.ntp_servers,
- }
- )
diff --git a/scripts/nbhelper/__init__.py b/scripts/nbhelper/__init__.py
new file mode 100644
index 0000000..7755623
--- /dev/null
+++ b/scripts/nbhelper/__init__.py
@@ -0,0 +1,8 @@
+from .utils import initialize
+from .tenant import Tenant
+from .device import Device, VirtualMachine
+from .service import (
+ dhcpSubnetConfigGenerator,
+ dnsFowardZoneConfigGenerator,
+ dnsReverseZoneConfigGenerator,
+)
diff --git a/scripts/nbhelper/container.py b/scripts/nbhelper/container.py
new file mode 100644
index 0000000..6e0cfcf
--- /dev/null
+++ b/scripts/nbhelper/container.py
@@ -0,0 +1,291 @@
+import netaddr
+
+
+class Singleton(type):
+ _instances = {}
+
+ def __call__(cls, *args, **kwargs):
+ if cls not in cls._instances:
+ cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
+ return cls._instances[cls]
+
+
+class Container(object):
+ def __init__(self):
+ self.instances = dict()
+
+ def add(self, instance_id, instance):
+ if instance_id in self.instances:
+ raise Exception()
+ self.instances[instance_id] = instance
+
+ def get(self, instance_id):
+ if instance_id not in self.instances:
+ return instance_id
+ return self.instances[instance_id]
+
+
+class AssignedObjectContainer(Container):
+ # AssignedObjectContainer is the parent class
+ # which is with the shared function of Devices/VMs Container
+
+ def all(self):
+ return self.instances.values()
+
+ def getDNSServer(self):
+ for instance in self.instances.values():
+ if "dns" in list(map(str, instance.services)):
+ return instance
+
+ def getDHCPServer(self):
+ for instance in self.instances.values():
+ if "tftp" in list(map(str, instance.services)):
+ return instance
+
+ def getNTPServer(self):
+ for instance in self.instances.values():
+ if "ntp" in list(map(str, instance.services)):
+ return instance
+
+ def getRouters(self):
+ """ Get a list of Devices/VMs which type is Router """
+
+ ret = list()
+ for instance in self.instances.values():
+ if instance.role == "router":
+ ret.append(instance)
+
+ return ret
+
+ def getRouterIPforPrefix(self, prefix):
+ """ Get the first found IP address of exist routers as string """
+
+ for router in self.getRouters():
+ for interface in router.interfaces.values():
+
+ # The mgmt-only interface will not act as gateway
+ if interface["mgmtOnly"]:
+ continue
+
+ for address in interface["addresses"]:
+ if netaddr.IPNetwork(address).ip in netaddr.IPNetwork(
+ prefix.subnet
+ ):
+ return str(netaddr.IPNetwork(address).ip)
+
+
+class DeviceContainer(AssignedObjectContainer, metaclass=Singleton):
+ # DeviceContainer holds all devices fetch from Netbox, device_id as key
+ pass
+
+
+class VirtualMachineContainer(AssignedObjectContainer, metaclass=Singleton):
+ # DeviceContainer holds all devices fetch from Netbox, vm_id as key
+ pass
+
+
+class PrefixContainer(Container, metaclass=Singleton):
+ # PrefixContainer holds all prefixes fetch from Netbox, prefix(str) as key
+
+ def get(self, instance_id, name_segments=1):
+ return super().get(instance_id)
+
+ def all(self):
+ return self.instances.values()
+
+ def all_reserved_ips(self, ip_addr=""):
+ ret = list()
+
+ for prefix in self.instances.values():
+ if ip_addr and netaddr.IPNetwork(ip_addr).ip in netaddr.IPNetwork(
+ prefix.subnet
+ ):
+ if prefix.reserved_ips:
+ return list(prefix.reserved_ips.values())
+ else:
+ if prefix.reserved_ips:
+ ret.extend(list(prefix.reserved_ips.values()))
+
+ return ret
+
+
+class ServiceInfoContainer(Container, metaclass=Singleton):
+ # ServiceInfoContainer holds hosts DNS/DHCP information in Tenant networks
+
+ def __init__(self):
+ super().__init__()
+ self.initialized = False
+
+ def all(self):
+ return self.instances.items()
+
+ def initialize(self):
+ if self.initialized:
+ return
+
+ deviceContainer = DeviceContainer()
+ vmContainer = VirtualMachineContainer()
+ prefixContainer = PrefixContainer()
+
+ for prefix in prefixContainer.all():
+
+ subnet = netaddr.IPNetwork(prefix.subnet)
+ domain = prefix.data.description or ""
+
+ if not domain:
+ continue
+
+ # If prefix has set the Router IP, use this value as defualt,
+ # otherwise find router in this subnet and get its IP address
+ routes = (
+ prefix.routes
+ or deviceContainer.getRouterIPforPrefix(prefix)
+ or vmContainer.getRouterIPforPrefix(prefix)
+ or ""
+ )
+
+ self.instances[prefix.data.description] = {
+ "domain": domain,
+ "subnet": prefix.subnet,
+ "router": routes,
+ "dhcprange": prefix.dhcp_range or "",
+ "hosts": dict(),
+ "dnsServer": None,
+ "ntpServer": None,
+ "dhcpServer": None,
+ }
+
+ # Find the service IP address for this network
+ serviceMap = {
+ "dnsServer": deviceContainer.getDNSServer()
+ or vmContainer.getDNSServer(),
+ "ntpServer": deviceContainer.getNTPServer()
+ or vmContainer.getNTPServer(),
+ "dhcpServer": deviceContainer.getDHCPServer()
+ or vmContainer.getDHCPServer(),
+ }
+
+ # Loop through the device's IP, and set IP to dataset
+ for service, device in serviceMap.items():
+ for interface in device.interfaces.values():
+ for address in interface["addresses"]:
+ address = netaddr.IPNetwork(address).ip
+ if address in subnet:
+ self.instances[domain][service] = {
+ "name": device.name,
+ "address": str(address),
+ }
+ else:
+ for neighbor in prefix.neighbor:
+ neighborSubnet = netaddr.IPNetwork(neighbor.subnet)
+ if address in neighborSubnet:
+ self.instances[domain][service] = {
+ "name": device.name,
+ "address": str(address),
+ }
+ break
+
+ # A dict to check if the device exists in this domain (prefix)
+ deviceInDomain = dict()
+
+ # Gather all Devices/VMs in this tenant, build IP/Hostname map
+ for device in list(deviceContainer.all()) + list(vmContainer.all()):
+
+ # Iterate the interface owned by device
+ for intfName, interface in device.interfaces.items():
+
+ # Iterate the address with each interface
+ for address in interface["addresses"]:
+
+ # Extract the IP address from interface's IP (w/o netmask)
+ address = netaddr.IPNetwork(address).ip
+
+ # Only record the IP address in current subnet,
+ # Skip if the mac_address is blank
+ if address in subnet:
+
+ # deviceInDomain store the interface information like:
+ # {"mgmtserver1": {
+ # "non-mgmt-counter": 1,
+ # "interface": {
+ # "bmc": {
+ # "mgmtOnly": True,
+ # "macaddr": "ca:fe:ba:be:00:00",
+ # "ipaddr": [IPAddress("10.32.4.1")]
+ # }
+ # "eno1": {
+ # "mgmtOnly": False,
+ # "macaddr": "ca:fe:ba:be:11:11",
+ # "ipaddr": [IPAddress("10.32.4.129"), IPAddress("10.32.4.130")]
+ # }
+ # }
+ # "mgmtswitch1": ...
+ # }
+
+ deviceInDomain.setdefault(device.name, dict())
+ deviceInDomain[device.name].setdefault(
+ "non-mgmt-counter", 0
+ )
+ deviceInDomain[device.name].setdefault("interfaces", dict())
+
+ # Set up a interface structure in deviceInDomain[device.name]
+ deviceInDomain[device.name]["interfaces"].setdefault(
+ intfName, dict()
+ )
+ interfaceDict = deviceInDomain[device.name]["interfaces"][
+ intfName
+ ]
+ interfaceDict.setdefault("mgmtOnly", False)
+
+ # Use interface["mac_address"] as the default value, but if the mac_address
+ # is None, that means we are dealing with a virtual interfaces
+ # so we can get the linked interface's mac_address instead
+
+ interfaceDict.setdefault(
+ "mac_address", interface["mac_address"] or
+ device.interfaces[interface["instance"].label]["mac_address"]
+ )
+ interfaceDict.setdefault("ip_addresses", list())
+ interfaceDict["ip_addresses"].append(address)
+
+ # If the interface is mgmtOnly, set the attribute to True
+ # Otherwise, increase the non-mgmt-counter, the counter uses to
+ # find out how many interfaces of this device has IPs on subnet
+ if interface["mgmtOnly"]:
+ interfaceDict["mgmtOnly"] = True
+ else:
+ deviceInDomain[device.name]["non-mgmt-counter"] += 1
+
+ for deviceName, data in deviceInDomain.items():
+
+ nonMgmtCounter = data["non-mgmt-counter"]
+ for intfName, interface in data["interfaces"].items():
+
+ # If current interface doesn't have mac address set, skip
+ if not interface["mac_address"]:
+ continue
+
+ # In the default situation, hostname is deviceName
+ hostname_list = [deviceName]
+
+ # If the condition is -
+ # 1. multiple interfaces show on this subnet
+ # 2. the interface is a management-only interface
+ # then add interface name into hostname
+ if nonMgmtCounter > 1 or interface["mgmtOnly"]:
+ hostname_list.append(intfName)
+
+ # Iterate the IP address owns by current interface,
+ # if the interface has multiple IP addresses,
+ # add last digit to hostname for identifiability
+ for address in interface["ip_addresses"]:
+ hostname = hostname_list.copy()
+ if len(interface["ip_addresses"]) > 1:
+ hostname.append(str(address.words[-1]))
+
+ self.instances[domain]["hosts"][str(address)] = {
+ "hostname": "-".join(hostname),
+ "macaddr": interface["mac_address"].lower(),
+ }
+
+ self.initialized = True
diff --git a/scripts/nbhelper/device.py b/scripts/nbhelper/device.py
new file mode 100644
index 0000000..a88740b
--- /dev/null
+++ b/scripts/nbhelper/device.py
@@ -0,0 +1,441 @@
+#!/usr/bin/env python3
+
+# SPDX-FileCopyrightText: © 2021 Open Networking Foundation <support@opennetworking.org>
+# SPDX-License-Identifier: Apache-2.0
+
+# device.py
+#
+
+import netaddr
+
+from .utils import logger, clean_name_dns
+from .network import Prefix
+from .container import DeviceContainer, VirtualMachineContainer, PrefixContainer
+
+
+class AssignedObject:
+ """
+ Assigned Object is either a Device or Virtual Machine, which function
+ nearly identically in the NetBox data model.
+
+ This parent class holds common functions for those two child classes
+
+ An assignedObject (device or VM) should have following attributes:
+ - self.data: contains the original copy of data from NetBox
+ - self.id: Device ID or VM ID
+ - self.interfaces: A dictionary contains interfaces belong to this AO
+ the interface dictionary looks like:
+
+ {
+ "eno1": {
+ "address": ["192.168.0.1/24", "192.168.0.2/24"],
+ "instance": <interface_instance>,
+ "isPrimary": True,
+ "mgmtOnly": False,
+ "isVirtual": False
+ }
+ }
+ """
+
+ objects = dict()
+
+ def __init__(self, data):
+ from .utils import netboxapi, netbox_config
+
+ self.data = data
+ self.nbapi = netboxapi
+
+ # The AssignedObject attributes
+ self.id = self.data.id
+ self.tenant = None
+ self.primary_ip = None
+
+ # In Netbox, we use FQDN as the Device name, but in the script,
+ # we use the first segment to be the name of device.
+ # For example, if the device named "mgmtserver1.stage1.menlo" on Netbox,
+ # then we will have "mgmtserver1" as name.
+ self.fullname = self.data.name
+ self.name = self.fullname.split(".")[0]
+
+ # The device role which can be ["server", "router", "switch", ...]
+ self.role = None
+
+ # The NetBox objects related with this AssignedObject
+ self.interfaces = dict()
+ self.services = None
+
+ # Generated configuration for ansible playbooks
+ self.netplan_config = dict()
+ self.extra_config = dict()
+
+ if self.__class__ == Device:
+ self.role = self.data.device_role.slug
+ self.services = self.nbapi.ipam.services.filter(device_id=self.id)
+ interfaces = self.nbapi.dcim.interfaces.filter(device_id=self.id)
+ ip_addresses = self.nbapi.ipam.ip_addresses.filter(device_id=self.id)
+ elif self.__class__ == VirtualMachine:
+ self.role = self.data.role.slug
+ self.services = self.nbapi.ipam.services.filter(virtual_machine_id=self.id)
+ interfaces = self.nbapi.virtualization.interfaces.filter(
+ virtual_machine_id=self.id
+ )
+ ip_addresses = self.nbapi.ipam.ip_addresses.filter(
+ virtual_machine_id=self.id
+ )
+
+ self.primary_ip = self.data.primary_ip
+
+ for interface in interfaces:
+ # The Device's interface structure is different from VM's interface
+ # VM interface doesn't have mgmt_only and type, Therefore,
+ # the default value of mgmtOnly is False, isVirtual is True
+
+ self.interfaces[interface.name] = {
+ "addresses": list(),
+ "mac_address": interface.mac_address,
+ "instance": interface,
+ "isPrimary": False,
+ "mgmtOnly": getattr(interface, "mgmt_only", False),
+ "isVirtual": interface.type.value == "virtual"
+ if hasattr(interface, "type")
+ else True,
+ }
+
+ for address in ip_addresses:
+ interface = self.interfaces[address.assigned_object.name]
+ interface["addresses"].append(address.address)
+
+ # ipam.ip_addresses doesn't have primary tag,
+ # the primary tag is only available is only in the Device.
+ # So we need to compare address to check which one is primary ip
+ if address.address == self.primary_ip.address:
+ interface["isPrimary"] = True
+
+ # mgmt_only = False is a hack for VirtualMachine type
+ if self.__class__ == VirtualMachine:
+ interface["instance"].mgmt_only = False
+
+ def __repr__(self):
+ return str(dict(self.data))
+
+ @property
+ def type(self):
+ return "AssignedObject"
+
+ @property
+ def internal_interfaces(self):
+ """
+ The function internal_interfaces
+ """
+
+ ret = dict()
+ for intfName, interface in self.interfaces.items():
+ if (
+ not interface["isPrimary"]
+ and not interface["mgmtOnly"]
+ and interface["addresses"]
+ ):
+ ret[intfName] = interface
+
+ return ret
+
+ def generate_netplan(self):
+ """
+ Get the interface config of specific server belongs to this tenant
+ """
+
+ if self.netplan_config:
+ return self.netplan_config
+
+ primary_if = None
+ for interface in self.interfaces.values():
+ if interface["isPrimary"] is True:
+ primary_if = interface["instance"]
+
+ if primary_if is None:
+ logger.error("The primary interface wasn't set for device %s", self.name)
+ return dict()
+
+ # Initialize the part of "ethernets" configuration
+ self.netplan_config["ethernets"] = dict()
+
+ # If the current selected device is a Router
+ if (isinstance(self, Device) and self.data.device_role.name == "Router") or (
+ isinstance(self, VirtualMachine) and self.data.role.name == "Router"
+ ):
+ for intfName, interface in self.interfaces.items():
+ if interface["mgmtOnly"] or interface["isVirtual"]:
+ continue
+
+ # Check if this address is public IP address (e.g. "8.8.8.8" on eth0)
+ isExternalAddress = True
+ for prefix in PrefixContainer().all():
+ for address in interface["addresses"]:
+ if address in netaddr.IPSet([prefix.subnet]):
+ isExternalAddress = False
+
+ # If this interface has the public IP address, netplan shouldn't include it
+ if isExternalAddress:
+ continue
+
+ self.netplan_config["ethernets"].setdefault(intfName, {})
+ self.netplan_config["ethernets"][intfName].setdefault(
+ "addresses", []
+ ).append(address)
+
+ # If the current selected device is a Server
+ elif isinstance(self, Device) and self.data.device_role.name == "Server":
+ if primary_if:
+ self.netplan_config["ethernets"][primary_if.name] = {
+ "dhcp4": "yes",
+ "dhcp4-overrides": {"route-metric": 100},
+ }
+
+ for intfName, interface in self.interfaces.items():
+ if (
+ not interface["isVirtual"]
+ and intfName != primary_if.name
+ and not interface["mgmtOnly"]
+ and interface["addresses"]
+ ):
+ self.netplan_config["ethernets"][intfName] = {
+ "dhcp4": "yes",
+ "dhcp4-overrides": {"route-metric": 200},
+ }
+
+ else:
+ # Exclude the device type which is not Router and Server
+ return None
+
+ # Get interfaces own by AssignedObject and is virtual (VLAN interface)
+ for intfName, interface in self.interfaces.items():
+
+ # If the interface is not a virtual interface or
+ # the interface doesn't have VLAN tagged, skip this interface
+ if not interface["isVirtual"] or not interface["instance"].tagged_vlans:
+ continue
+
+ if "vlans" not in self.netplan_config:
+ self.netplan_config["vlans"] = dict()
+
+ vlan_object_id = interface["instance"].tagged_vlans[0].id
+ vlan_object = self.nbapi.ipam.vlans.get(vlan_object_id)
+
+ routes = list()
+ for address in interface["addresses"]:
+
+ for reserved_ip in PrefixContainer().all_reserved_ips(address):
+
+ destination = reserved_ip["custom_fields"].get("rfc3442routes", "")
+ if not destination:
+ continue
+
+ # If interface address is in destination subnet, we don't need this route
+ if netaddr.IPNetwork(address).ip in netaddr.IPNetwork(destination):
+ continue
+
+ for dest_addr in destination.split():
+ new_route = {
+ "to": dest_addr,
+ "via": str(netaddr.IPNetwork(reserved_ip["ip4"]).ip),
+ "metric": 100,
+ }
+
+ if new_route not in routes:
+ routes.append(new_route)
+
+ self.netplan_config["vlans"][intfName] = {
+ "id": vlan_object.vid,
+ "link": interface["instance"].label,
+ "addresses": interface["addresses"],
+ }
+
+ # Only the fabric virtual interface will need to route to other network segments
+ if routes and "fab" in intfName:
+ self.netplan_config["vlans"][intfName]["routes"] = routes
+
+ return self.netplan_config
+
+ def generate_nftables(self):
+
+ ret = dict()
+
+ internal_if = None
+ external_if = None
+
+ # Use isPrimary == True as the identifier to select external interface
+ for interface in self.interfaces.values():
+ if interface["isPrimary"] is True:
+ external_if = interface["instance"]
+
+ if external_if is None:
+ logger.error("The primary interface wasn't set for device %s", self.name)
+ sys.exit(1)
+
+ for interface in self.interfaces.values():
+ # If "isVirtual" set to False and "mgmtOnly" set to False
+ if (
+ not interface["isVirtual"]
+ and not interface["mgmtOnly"]
+ and interface["instance"] is not external_if
+ ):
+ internal_if = interface["instance"]
+ break
+
+ ret["external_if"] = external_if.name
+ ret["internal_if"] = internal_if.name
+
+ if self.services:
+ ret["services"] = list()
+
+ for service in self.services:
+ ret["services"].append(
+ {
+ "name": service.name,
+ "protocol": service.protocol.value,
+ "port": service.port,
+ }
+ )
+
+ # Only management server needs to be configured the whitelist netrange of internal interface
+ if self.data.device_role.name == "Router":
+
+ ret["interface_subnets"] = dict()
+ ret["ue_routing"] = dict()
+ ret["ue_routing"]["ue_subnets"] = self.data.config_context["ue_subnets"]
+
+ # Create the interface_subnets in the configuration
+ # It's using the interface as the key to list IP addresses
+ for intfName, interface in self.interfaces.items():
+ if interface["mgmtOnly"]:
+ continue
+
+ for address in interface["addresses"]:
+ for prefix in PrefixContainer().all():
+ intfAddr = netaddr.IPNetwork(address).ip
+
+ # If interface IP doesn't belong to this prefix, skip
+ if intfAddr not in netaddr.IPNetwork(prefix.subnet):
+ continue
+
+ # If prefix is a parent prefix (parent prefix won't config domain name)
+ # skip to add in interface_subnets
+ if not prefix.data.description:
+ continue
+
+ ret["interface_subnets"].setdefault(intfName, list())
+
+ if prefix.subnet not in ret["interface_subnets"][intfName]:
+ ret["interface_subnets"][intfName].append(prefix.subnet)
+ for neighbor in prefix.neighbor:
+ if neighbor.subnet not in ret["interface_subnets"][intfName]:
+ ret["interface_subnets"][intfName].append(neighbor.subnet)
+
+ for prefix in PrefixContainer().all():
+
+ if "fab" in prefix.data.description:
+ ret["ue_routing"].setdefault("src_subnets", [])
+ ret["ue_routing"]["src_subnets"].append(prefix.data.prefix)
+
+ if (
+ not ret["ue_routing"].get("snat_addr")
+ and "fab" in prefix.data.description
+ ):
+ for interface in self.interfaces.values():
+ for address in interface["addresses"]:
+ if address in netaddr.IPSet([prefix.subnet]):
+ ret["ue_routing"]["snat_addr"] = str(
+ netaddr.IPNetwork(address).ip
+ )
+ break
+
+ return ret
+
+ def generate_extra_config(self):
+ """
+ Generate the extra configs which need in management server configuration
+ This function should only be called when the device role is "Router"
+ """
+
+ if self.extra_config:
+ return self.extra_config
+
+ primary_ip = self.data.primary_ip.address if self.data.primary_ip else None
+
+ service_names = list(map(lambda x: x.name, self.services))
+
+ if "dns" in service_names:
+ unbound_listen_ips = []
+ unbound_allow_ips = []
+
+ for interface in self.interfaces.values():
+ if not interface["isPrimary"] and not interface["mgmtOnly"]:
+ for address in interface["addresses"]:
+ unbound_listen_ips.append(address)
+
+ for prefix in PrefixContainer().all():
+ if prefix.data.description:
+ unbound_allow_ips.append(prefix.data.prefix)
+
+ if unbound_listen_ips:
+ self.extra_config["unbound_listen_ips"] = unbound_listen_ips
+
+ if unbound_allow_ips:
+ self.extra_config["unbound_allow_ips"] = unbound_allow_ips
+
+ if "ntp" in service_names:
+ ntp_client_allow = []
+
+ for prefix in PrefixContainer().all():
+ if prefix.data.description:
+ ntp_client_allow.append(prefix.data.prefix)
+
+ if ntp_client_allow:
+ self.extra_config["ntp_client_allow"] = ntp_client_allow
+
+ return self.extra_config
+
+
+class Device(AssignedObject):
+ """
+ Wraps a single Netbox device
+ Also caches all known devices in a class variable (devs)
+ """
+
+ def __init__(self, data):
+
+ super().__init__(data)
+ DeviceContainer().add(self.id, self)
+
+ @property
+ def type(self):
+ return "Device"
+
+ def get_interfaces(self):
+ if not self.interfaces:
+ self.interfaces = self.nbapi.dcim.interfaces.filter(device_id=self.id)
+
+ return self.interfaces
+
+
+class VirtualMachine(AssignedObject):
+ """
+ VM equivalent of Device
+ """
+
+ def __init__(self, data):
+
+ super().__init__(data)
+ VirtualMachineContainer().add(self.id, self)
+
+ @property
+ def type(self):
+ return "VirtualMachine"
+
+ def get_interfaces(self):
+ if not self.interfaces:
+ self.interfaces = self.nbapi.virtualization.interfaces.filter(
+ virtual_machine_id=self.id
+ )
+
+ return self.interfaces
diff --git a/scripts/nbhelper/network.py b/scripts/nbhelper/network.py
new file mode 100644
index 0000000..5520d48
--- /dev/null
+++ b/scripts/nbhelper/network.py
@@ -0,0 +1,102 @@
+#!/usr/bin/env python3
+
+# SPDX-FileCopyrightText: © 2021 Open Networking Foundation <support@opennetworking.org>
+# SPDX-License-Identifier: Apache-2.0
+
+# nbhelper.py
+# Helper functions for building YAML output from Netbox API calls
+
+import netaddr
+
+from .utils import logger, check_name_dns
+from .container import PrefixContainer
+from .container import DeviceContainer, VirtualMachineContainer
+
+
+class Prefix:
+ def __init__(self, data, name_segments):
+ from .utils import netboxapi
+
+ self.data = data
+ self.name_segments = name_segments
+ self.nbapi = netboxapi
+
+ if self.data.description:
+ self.domain_extension = check_name_dns(self.data.description)
+
+ # ip centric info
+ self.subnet = self.data.prefix
+ self.dhcp_range = list()
+ self.routes = list()
+ self.reserved_ips = {}
+
+ self.neighbor = list()
+
+ # build item lists
+ self.build_prefix()
+
+ # Find the neighbor relationship in prefix level
+ self.find_neighbor()
+
+ PrefixContainer().add(self.data.prefix, self)
+
+ def __repr__(self):
+ return str(self.data.prefix)
+
+ def check_ip_belonging(self, ip):
+ """
+ Check if an IP address is belonging to this prefix
+ """
+ return ip in netaddr.IPSet([self.data.prefix])
+
+ def find_neighbor(self):
+
+ parent_prefixes = list()
+ for prefix in PrefixContainer().all():
+ if not prefix.data.description:
+ parent_prefixes.append(prefix)
+
+ for prefix in PrefixContainer().all():
+ if not prefix.data.description:
+ continue
+
+ targetSubnet = netaddr.IPNetwork(prefix.subnet)
+ mySubnet = netaddr.IPNetwork(self.subnet)
+
+ for parent in parent_prefixes:
+ parentSubnet = netaddr.IPNetwork(parent.subnet)
+ if mySubnet in parentSubnet and targetSubnet in parentSubnet:
+ if prefix not in self.neighbor:
+ self.neighbor.append(prefix)
+ if self not in prefix.neighbor:
+ prefix.neighbor.append(self)
+
+
+ def build_prefix(self):
+ """
+ find ip information for items (devices/vms, reserved_ips, dhcp_range) in prefix
+ """
+ ips = self.nbapi.ipam.ip_addresses.filter(parent=self.data.prefix)
+
+ for ip in sorted(ips, key=lambda k: k["address"]):
+
+ logger.debug("prefix_item ip: %s, data: %s", ip, dict(ip))
+
+ # if it's a DHCP range, add that range to the dev list as prefix_dhcp
+ if ip.status.value == "dhcp":
+ self.dhcp_range.append(str(ip.address))
+
+ # reserved IPs
+ if ip.status.value == "reserved":
+ self.reserved_ips[str(ip)] = {
+ "name": ip.description.lower().split(" ")[0],
+ "description": ip.description,
+ "ip4": str(ip.address),
+ "custom_fields": ip.custom_fields,
+ }
+ self.routes.append(
+ {
+ "ip": str(ip.address),
+ "rfc3442routes": [ip.custom_fields.get("rfc3442routes")],
+ }
+ )
diff --git a/scripts/nbhelper/service.py b/scripts/nbhelper/service.py
new file mode 100644
index 0000000..d84f0cb
--- /dev/null
+++ b/scripts/nbhelper/service.py
@@ -0,0 +1,182 @@
+#!/usr/bin/env python3
+
+# SPDX-FileCopyrightText: © 2021 Open Networking Foundation <support@opennetworking.org>
+# SPDX-License-Identifier: Apache-2.0
+
+# service.py
+#
+
+import re
+import netaddr
+
+from .utils import logger, AttrDict
+from .container import ServiceInfoContainer
+
+
+def getIPaddress(addressWithMask):
+ return str(netaddr.IPNetwork(addressWithMask).ip)
+
+
+def dhcpSubnetConfigGenerator():
+ dhcpSubnetConfigs = list()
+
+ serviceInfoContainer = ServiceInfoContainer()
+ serviceInfoContainer.initialize()
+
+ for domainName, domain in serviceInfoContainer.all():
+ subnetConfig = {
+ "dns_search": [domainName],
+ "dns_servers": [domain["dnsServer"]["address"]],
+ "ntp_servers": [domain["ntpServer"]["address"]],
+ "tftp_servers": domain["dhcpServer"]["address"],
+ "range": domain["dhcprange"],
+ "subnet": domain["subnet"],
+ "routers": [{"ip": domain["router"]}],
+ "hosts": list(),
+ }
+
+ for address, host in domain["hosts"].items():
+ subnetConfig["hosts"].append(
+ {
+ "ip_addr": getIPaddress(address),
+ "mac_addr": host["macaddr"],
+ "name": host["hostname"],
+ }
+ )
+
+ subnetConfig["hosts"] = sorted(
+ subnetConfig["hosts"], key=lambda x: int(x["ip_addr"].split(".")[-1])
+ )
+ dhcpSubnetConfigs.append(subnetConfig)
+
+ return dhcpSubnetConfigs
+
+
+def dnsFowardZoneConfigGenerator():
+ def getDomainNameByIP(ip_address):
+ """
+ getDomainNameByIP will return the corresponding domain name of an IP address
+ In the ntpServer, dhcpServer, dnsServer we only have the IP addresses of them,
+ But we can use the dhcp subnet configuration to find the FQDN
+ """
+ dhcpSubnetConfigs = dhcpSubnetConfigGenerator()
+
+ for domain in dhcpSubnetConfigs:
+ domainName = domain["dns_search"][0]
+ for host in domain["hosts"]:
+ if ip_address == host["ip_addr"]:
+ return f"{host['name']}.{domainName}."
+
+
+ dnsForwardZoneConfigs = dict()
+
+ serviceInfoContainer = ServiceInfoContainer()
+ serviceInfoContainer.initialize()
+
+ for domainName, domain in serviceInfoContainer.all():
+ forwardZoneConfig = {
+ "cname": dict(),
+ "a": dict(),
+ "ns": list(),
+ "srv": dict(),
+ "txt": dict(),
+ }
+
+ # Get the services set to this Tenant network
+ ntpServer = domain["ntpServer"] or None
+ dhcpServer = domain["dhcpServer"] or None
+ dnsServer = domain["dnsServer"] or None
+
+ # If service exists, set the FQDN to CNAME records
+ if ntpServer:
+ forwardZoneConfig["cname"]["dns"] = getDomainNameByIP(ntpServer["address"])
+ if dhcpServer:
+ forwardZoneConfig["cname"]["tftp"] = getDomainNameByIP(dhcpServer["address"])
+ if dnsServer:
+ forwardZoneConfig["cname"]["dns"] = getDomainNameByIP(dnsServer["address"])
+ forwardZoneConfig["ns"].append(getDomainNameByIP(dnsServer["address"]))
+
+ for address, host in domain["hosts"].items():
+ # Add exist IP address into dnsReverseZoneConfigs,
+ hostname = host["hostname"]
+ forwardZoneConfig["a"][hostname] = address
+
+ for address in netaddr.IPSet(domain["dhcprange"]):
+ # If address exists in ServiceInfoContainer's host dictionary,
+ # Use the pre-generated hostname as A record's name
+ hostname = "dhcp%03d" % address.words[-1]
+ forwardZoneConfig["a"][hostname] = str(address)
+
+ dnsForwardZoneConfigs[domainName] = forwardZoneConfig
+
+ return dnsForwardZoneConfigs
+
+
+def dnsReverseZoneConfigGenerator():
+ def canonicalize_rfc1918_prefix(prefix):
+ """
+ RFC1918 prefixes need to be expanded to their widest canonical range to
+ group all reverse lookup domains together for reverse DNS with NSD/Unbound.
+ """
+
+ pnet = netaddr.IPNetwork(prefix)
+ (o1, o2, o3, o4) = pnet.network.words # Split ipv4 octets
+ cidr_plen = pnet.prefixlen
+
+ if o1 == 10:
+ o2 = o3 = o4 = 0
+ cidr_plen = 8
+ elif (o1 == 172 and o2 >= 16 and o2 <= 31) or (o1 == 192 and o2 == 168):
+ o3 = o4 = 0
+ cidr_plen = 16
+
+ return "%s/%d" % (".".join(map(str, [o1, o2, o3, o4])), cidr_plen)
+
+ serviceInfoContainer = ServiceInfoContainer()
+ serviceInfoContainer.initialize()
+
+ # dnsReverseZoneConfigs contains all reverse zone records.
+ dnsReverseZoneConfigs = dict()
+ widedomain = None
+
+ for domainName, domain in serviceInfoContainer.all():
+
+ # Expand network range to group all tenant domains
+ widedomain = widedomain or canonicalize_rfc1918_prefix(domain["subnet"])
+
+ # Create the basic structure of reverse zone config
+ # {"10.0.0.0/8": {"ns": list(), "ptr": dict()}}
+ dnsReverseZoneConfigs.setdefault(widedomain, dict())
+ dnsReverseZoneConfigs[widedomain].setdefault("ns", list())
+ dnsReverseZoneConfigs[widedomain].setdefault("ptr", dict())
+
+ # Get the DNS services set to this Tenant network
+ dnsServer = domain["dnsServer"]["name"] if domain["dnsServer"] else None
+
+ # If service exists, set the FQDN to CNAME records
+ if dnsServer:
+ dnsReverseZoneConfigs[widedomain]["ns"].append(f"{domainName}.{dnsServer}.")
+
+ for address, host in domain["hosts"].items():
+ # Add exist IP address into dnsReverseZoneConfigs,
+ hostname = host["hostname"]
+ dnsReverseZoneConfigs[widedomain]["ptr"][
+ address
+ ] = f"{hostname}.{domainName}."
+
+ for address in netaddr.IPSet(domain["dhcprange"]):
+ # Add DHCP range IP address into dnsReverseZoneConfigs,
+ # Use the pre-generated hostname as A record's name
+ hostname = "dhcp%03d" % address.words[3]
+ dnsReverseZoneConfigs[widedomain]["ptr"][
+ str(address)
+ ] = f"{hostname}.{domainName}."
+
+ dnsReverseZoneConfigs[widedomain]["ptr"] = dict(
+ sorted(
+ dnsReverseZoneConfigs[widedomain]["ptr"].items(),
+ key=lambda x: (int(x[0].split(".")[-2]), int(x[0].split(".")[-1])),
+ )
+ )
+
+ return dnsReverseZoneConfigs
diff --git a/scripts/nbhelper/tenant.py b/scripts/nbhelper/tenant.py
new file mode 100644
index 0000000..4d529a7
--- /dev/null
+++ b/scripts/nbhelper/tenant.py
@@ -0,0 +1,92 @@
+#!/usr/bin/env python3
+
+# SPDX-FileCopyrightText: © 2021 Open Networking Foundation <support@opennetworking.org>
+# SPDX-License-Identifier: Apache-2.0
+
+# tenant.py
+# The tenant abstract object of Netbox Object - Tenant
+
+from .utils import logger, netboxapi
+from .device import Device, VirtualMachine
+from .network import Prefix
+
+
+class Tenant:
+ def __init__(self):
+
+ from .utils import netboxapi, netbox_config
+
+ self.nbapi = netboxapi
+ self.name = netbox_config["tenant_name"]
+ self.name_segments = netbox_config.get("prefix_segments", 1)
+
+ self.tenant = self.nbapi.tenancy.tenants.get(name=self.name)
+
+ # Tenant only keep the resources which owns by it
+ self.devices = list()
+ self.vms = list()
+ self.prefixes = dict()
+
+ # Get the Device and Virtual Machines from Netbox API
+ for device_data in self.nbapi.dcim.devices.filter(tenant=self.tenant.slug):
+ device = Device(device_data)
+ device.tenant = self
+ self.devices.append(device)
+
+ for vm_data in self.nbapi.virtualization.virtual_machines.filter(
+ tenant=self.tenant.slug
+ ):
+ vm = VirtualMachine(vm_data)
+ vm.tenant = self
+ self.vms.append(vm)
+
+ vrf = self.nbapi.ipam.vrfs.get(tenant=self.tenant.slug)
+ for prefix_data in self.nbapi.ipam.prefixes.filter(vrf_id=vrf.id):
+ prefix = Prefix(prefix_data, self.name_segments)
+ if prefix_data.description:
+ self.prefixes[prefix_data.prefix] = prefix
+
+ def get_prefixes(self):
+ """Get the IP Prefixes owns by current tenant"""
+
+ return self.prefixes
+
+ def get_device_by_name(self, name):
+ """
+ Find the device or VM which belongs to this Tenant,
+ If the name wasn't specified, return the management server
+ """
+
+ for machine in self.devices + self.vms:
+ if name and machine.name == name:
+ return machine
+ elif machine.data["device_role"]["name"] == "Router":
+ return machine
+
+ ret_msg = (
+ "The name '%s' wasn't found in this tenant, "
+ + "or can't found any Router in this tenant"
+ )
+
+ logger.error(ret_msg, name)
+ sys.exit(1)
+
+ def get_devices(self, device_types=["server", "router"]):
+ """
+ Get all devices (Router + Server) belong to this Tenant
+ """
+
+ if not device_types:
+ return self.devices + self.vms
+
+ ret = []
+
+ for machine in self.devices:
+ if machine.data.device_role.slug in device_types:
+ ret.append(machine)
+
+ for vm in self.vms:
+ if vm.data.role.slug in device_types:
+ ret.append(vm)
+
+ return ret
diff --git a/scripts/nbhelper/utils.py b/scripts/nbhelper/utils.py
new file mode 100644
index 0000000..5614ba9
--- /dev/null
+++ b/scripts/nbhelper/utils.py
@@ -0,0 +1,119 @@
+#!/usr/bin/env python3
+
+# SPDX-FileCopyrightText: © 2021 Open Networking Foundation <support@opennetworking.org>
+# SPDX-License-Identifier: Apache-2.0
+
+# utils.py
+# The utility functions shared among nbhelper objects
+
+import re
+import logging
+import argparse
+import pynetbox
+import requests
+
+from ruamel import yaml
+
+# Initialize module-level variables
+netboxapi = None
+netbox_config = None
+netbox_version = None
+
+# create shared logger
+logging.basicConfig()
+logger = logging.getLogger("nbh")
+
+
+def initialize(extra_args):
+ """
+ Initialize the NBHelper module, the extra_args is required, it contains the
+ NetBox API url, API token, and the name of site
+ """
+ global netboxapi, netbox_config, netbox_version
+
+ args = parse_cli_args(extra_args)
+ netbox_config = yaml.safe_load(args.settings.read())
+
+ for require_args in ["api_endpoint", "token", "tenant_name"]:
+ if not netbox_config.get(require_args):
+ logger.error("The require argument: %s was not set. Stop." % require_args)
+ sys.exit(1)
+
+ netboxapi = pynetbox.api(
+ netbox_config["api_endpoint"], token=netbox_config["token"], threading=True,
+ )
+
+ if not netbox_config.get("validate_certs", True):
+ session = requests.Session()
+ session.verify = False
+ netboxapi.http_session = session
+
+ netbox_version = netboxapi.version
+
+ return args
+
+
+def parse_cli_args(extra_args={}):
+ """
+ parse CLI arguments. Can add extra arguments with a option:kwargs dict
+ """
+
+ parser = argparse.ArgumentParser(description="Netbox")
+
+ # Positional args
+ parser.add_argument(
+ "settings",
+ type=argparse.FileType("r"),
+ help="YAML Ansible inventory file w/NetBox API token",
+ )
+
+ parser.add_argument(
+ "--debug", action="store_true", help="Print additional debugging information"
+ )
+
+ for ename, ekwargs in extra_args.items():
+ parser.add_argument(ename, **ekwargs)
+
+ args = parser.parse_args()
+ log_level = logging.DEBUG if args.debug else logging.INFO
+ logger.setLevel(log_level)
+
+ return args
+
+
+def check_name_dns(name):
+
+ badchars = re.search("[^a-z0-9.-]", name.lower(), re.ASCII)
+
+ if badchars:
+ logger.error(
+ "DNS name '%s' has one or more invalid characters: '%s'",
+ name,
+ badchars.group(0),
+ )
+ sys.exit(1)
+
+ return name.lower()
+
+
+def clean_name_dns(name):
+ return re.sub("[^a-z0-9.-]", "-", name.lower(), 0, re.ASCII)
+
+
+def apply_as_router(output_yaml):
+ """ Add the router-specific value for output structure """
+
+ output_yaml.setdefault("netprep_router", True)
+ output_yaml.setdefault("tftpd_files", ["undionly.kpxe"])
+ output_yaml.setdefault(
+ "vhosts", [{"name": "default", "default_server": True, "autoindex": True}]
+ )
+ output_yaml.setdefault("acme_username", "www-data")
+
+ return output_yaml
+
+
+class AttrDict(dict):
+ def __init__(self, *args, **kwargs):
+ super(AttrDict, self).__init__(*args, **kwargs)
+ self.__dict__ = self