| #!/usr/bin/env python3 |
| |
| # SPDX-FileCopyrightText: © 2021 Open Networking Foundation <support@opennetworking.org> |
| # SPDX-License-Identifier: Apache-2.0 |
| |
| # tenant_validator.py |
# Fetch the data of a tenant from NetBox and check whether any of it is invalid
| |
| from __future__ import absolute_import |
| |
| import re |
| import sys |
| import yaml |
| import logging |
| import argparse |
| import pynetbox |
| import requests |
| import netaddr |
| |
| |
logging.basicConfig()
logger = logging.getLogger("TenentValidator")

# Module-level state shared by the NetBox API helper functions in this file.
# netboxapi and netbox_config start out as None; misconfs is the list that
# the validate_* functions append their Configuration findings to.
netboxapi = None
netbox_config = None
misconfs = list()

# Values that should be the same for every device and prefix of a deployment
site_name = None
deployment_name = None

# Regex used to decide whether a name is a valid domain name: dot-separated
# labels made of letters, digits and "-", where no label starts or ends
# with "-"
fqdn_regex = re.compile(
    r"^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*"
    + r"([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$"
)
| |
| |
class Configuration(object):
    """One finding (misconfiguration) reported against a NetBox object.

    Attributes:
        level: logging level of the finding (``logging.ERROR`` by default).
        obj: the object the finding is about; ``__repr__`` reads its ``url``
            attribute and formats the object itself with ``%s``.
        message: human-readable description of the problem.
    """

    # Logger method for each supported level.  The table is currently not
    # used by the class itself: findings are not logged on construction.
    mapping_dict = {
        logging.ERROR: logger.error,
        # Logger.warn / logging.WARN are deprecated aliases of these names.
        logging.WARNING: logger.warning,
        logging.INFO: logger.info,
    }

    def __init__(self, obj, message, level=logging.ERROR):
        """Store the object, the message and the severity of the finding."""
        self.level = level
        self.obj = obj
        self.message = message

    def __repr__(self):
        """Return "[<type>] object <obj>: <message>" followed by a "Ref." URL line."""
        url = self.obj.url

        # The 4th- and 3rd-last "/"-separated URL parts name the object type;
        # assumes URLs shaped like ".../<app>/<endpoint>/<id>/" - TODO confirm.
        object_type = "/".join(url.split("/")[-4:-2])

        # Replace the literal address with the host name and remove every
        # "api/" substring; presumably this turns the API URL into the web
        # UI URL of the same object - confirm against the deployment.
        reference = url.replace("10.76.28.11", "netbox.infra.onlab.us").replace(
            "api/", ""
        )

        return "[%s] object %s: %s\nRef. %s" % (
            object_type,
            self.obj,
            self.message,
            reference,
        )
| |
| |
def get_object_type(obj):
    """Return the "<app>/<endpoint>" part of an object's URL as a string.

    Args:
        obj: any object with a ``url`` string attribute.

    Returns:
        The 4th- and 3rd-last "/"-separated parts of ``obj.url`` joined by
        "/", e.g. "dcim/interfaces" for ".../api/dcim/interfaces/12/".

    The value is a plain ``str`` (not a 1-tuple) so that it can be compared
    directly with literals such as "dcim/interfaces".
    """
    return "/".join(obj.url.split("/")[-4:-2])
| |
| |
def validate_vlans(vlans=None):
    """Record a finding for every VLAN whose group, site or tenant is unset.

    Args:
        vlans: iterable of VLAN objects exposing ``group``, ``site`` and
            ``tenant`` attributes; ``None`` is treated as "no VLANs".

    Findings are appended to the module-level ``misconfs`` list.
    """
    if vlans is None:
        vlans = []

    # (attribute, finding message) pairs, checked in this order per VLAN.
    required = (
        ("group", "VLAN group isn't set"),
        ("site", "VLAN site isn't set"),
        ("tenant", "VLAN tenant isn't set"),
    )

    for vlan in vlans:
        for attribute, message in required:
            if not getattr(vlan, attribute):
                misconfs.append(Configuration(vlan, message))
| |
| |
def _has_child_prefix(prefix, netmask, prefixes_by_tenant):
    """Return True if the tenant owns a longer prefix located inside *prefix*."""
    for other_netmask, candidates in prefixes_by_tenant.items():
        if int(other_netmask) <= int(netmask):
            continue
        for child in candidates:
            # Re-mask the child's network address with the parent's mask
            # length; the child lies inside the parent when the resulting
            # network equals the parent prefix.
            remasked = netaddr.IPNetwork(
                child.prefix.split("/")[0] + "/" + netmask
            ).cidr
            if str(remasked) == prefix.prefix:
                return True
    return False


def _validate_dhcp_range(prefix):
    """Check that *prefix* holds exactly one DHCP range containing no other IP."""
    dhcp_addr = [
        ip
        for ip in netboxapi.ipam.ip_addresses.filter(parent=prefix.prefix)
        if ip.status.value == "dhcp"
    ]

    if not dhcp_addr:
        misconfs.append(Configuration(prefix, "DHCP subnet not found in prefix"))
        return

    if len(dhcp_addr) > 1:
        misconfs.append(
            Configuration(prefix, "Prefix should have exact 1 DHCP subnet")
        )
        return

    # Every address inside the DHCP range other than the range entry itself
    # is reported.
    dhcp_range = dhcp_addr[0].address
    ip_addrs = [
        ip
        for ip in netboxapi.ipam.ip_addresses.filter(parent=dhcp_range)
        if ip != dhcp_addr[0]
    ]
    if ip_addrs:
        misconfs.append(
            Configuration(
                prefix,
                "DHCP range %s contains other IP addresses: %s"
                % (dhcp_range, ip_addrs),
            )
        )


def validate_prefixes(prefixes=None):
    """Validate description, tenant and DHCP range of the given prefixes.

    For every prefix:
      * the description must match ``fqdn_regex``;
      * if it does and a tenant is set, the last dot-separated segment of the
        name of the tenant's first device must occur in the description;
      * a tenant must be set.

    Afterwards every prefix with a mask shorter than /26 that has no longer
    prefix of the same tenant inside it must contain exactly one IP address
    with status "dhcp", and that DHCP range must not contain other addresses.

    Args:
        prefixes: iterable of prefix objects; ``None`` is treated as empty.

    Findings are appended to the module-level ``misconfs`` list.
    """
    if prefixes is None:
        prefixes = []

    # tenant -> netmask length (as str) -> list of prefixes
    tenant_dict = dict()
    # tenant id -> first device of the tenant (or None), so the device is
    # looked up once per tenant rather than once per prefix
    first_device_by_tenant = dict()

    for prefix in prefixes:
        netmask = prefix.prefix.split("/")[-1]

        if not re.search(fqdn_regex, prefix.description or ""):
            misconfs.append(Configuration(prefix, "Description (FQDN) is invalid"))
        elif prefix.tenant:
            # Only reachable with a tenant: a prefix without one is reported
            # below instead of failing on ``prefix.tenant.id``.
            tenant_id = prefix.tenant.id
            if tenant_id not in first_device_by_tenant:
                # next(iter(...)) works for list and iterator results alike
                # and yields None when the tenant has no device.
                first_device_by_tenant[tenant_id] = next(
                    iter(netboxapi.dcim.devices.filter(tenant_id=tenant_id)), None
                )
            device = first_device_by_tenant[tenant_id]
            if device is not None and device.name:
                sitename = device.name.split(".")[-1]
                if sitename not in prefix.description:
                    misconfs.append(
                        Configuration(prefix, "Site name should in the prefix FQDN")
                    )

        if not prefix.tenant:
            misconfs.append(Configuration(prefix, "Tenant isn't set"))
        else:
            tenant_dict.setdefault(prefix.tenant, dict())
            tenant_dict[prefix.tenant].setdefault(netmask, list())
            tenant_dict[prefix.tenant][netmask].append(prefix)

    # Assume prefixes in different tenants don't intersect, so a parent
    # prefix is only looked for among the prefixes of the same tenant.
    for prefixes_by_tenant in tenant_dict.values():
        for netmask, prefixes_by_netmask in prefixes_by_tenant.items():
            # Only prefixes shorter than /26 are subject to the DHCP check.
            if int(netmask) >= 26:
                continue
            for prefix in prefixes_by_netmask:
                # A prefix that has child prefixes is skipped.
                if _has_child_prefix(prefix, netmask, prefixes_by_tenant):
                    continue
                _validate_dhcp_range(prefix)
| |
| |
def validate_ip_addresses(ip_addresses=None):
    """Check the VRF assignment of every private IP address.

    For each address whose network address is private:
      * a VRF must be set;
      * if prefixes with exactly the address' network CIDR exist in NetBox,
        at least one of them must be in the same VRF as the address.

    Args:
        ip_addresses: iterable of IP address objects exposing ``address``
            ("a.b.c.d/len") and ``vrf``; ``None`` is treated as empty.

    Findings are appended to the module-level ``misconfs`` list.
    """
    if ip_addresses is None:
        ip_addresses = []

    # network CIDR (str) -> list of NetBox prefixes with exactly that CIDR
    prefix_dict = dict()

    for ip_address in ip_addresses:
        network = netaddr.IPNetwork(ip_address.address)

        # NOTE(review): newer netaddr releases deprecate is_private() -
        # confirm against the netaddr version this script is run with.
        if not network.network.is_private():
            continue

        if not ip_address.vrf:
            misconfs.append(
                Configuration(ip_address, "VRF isn't set for this IP address")
            )
            continue

        prefix = str(network.cidr)
        if prefix not in prefix_dict:
            # filter() rather than get(): the same CIDR may exist in several
            # VRFs, and get() raises ValueError on more than one match.
            prefix_dict[prefix] = list(netboxapi.ipam.prefixes.filter(prefix=prefix))

        candidates = prefix_dict[prefix]
        if candidates and all(
            candidate.vrf != ip_address.vrf for candidate in candidates
        ):
            misconfs.append(Configuration(ip_address, "VRF isn't match with Prefix"))
| |
| |
def validate_interfaces(interfaces=None):
    """Validate MAC address, virtual-interface and VLAN settings of interfaces.

    For every interface:
      * if it has at least one IP address, a MAC address must be set;
      * a "dcim/interfaces" object of type "virtual" must be management-only
        or carry tagged VLANs;
      * if tagged VLANs are set there must be exactly one, and its VLAN ID
        must occur in the interface name.

    Args:
        interfaces: iterable of device or VM interface objects; ``None`` is
            treated as empty.

    Findings are appended to the module-level ``misconfs`` list.
    """
    if interfaces is None:
        interfaces = []

    for interface in interfaces:
        object_type = get_object_type(interface)
        is_device_interface = object_type == "dcim/interfaces"

        # VM interfaces need a query to count their addresses; device
        # interfaces expose the count as an attribute.
        count_ips = 0
        if object_type == "virtualization/interfaces":
            count_ips = len(
                netboxapi.ipam.ip_addresses.filter(vminterface_id=interface.id)
            )
        elif is_device_interface:
            count_ips = interface.count_ipaddresses

        if count_ips > 0 and not interface.mac_address:
            misconfs.append(
                Configuration(
                    interface,
                    "VM Interface has IP address assigned but mac address isn't set",
                )
            )

        # The finding applies when the interface is NEITHER management-only
        # NOR a VLAN interface, hence "and" between the two negations.
        if (
            is_device_interface
            and interface.type.value == "virtual"
            and not interface.mgmt_only
            and not interface.tagged_vlans
        ):
            misconfs.append(
                Configuration(
                    interface,
                    "Virtual interface should be management only or VLAN interface",
                )
            )

        if interface.tagged_vlans:
            if len(interface.tagged_vlans) > 1:
                misconfs.append(
                    Configuration(interface, "Virtual Interface has multiple VLANs set")
                )
            elif str(interface.tagged_vlans[0].vid) not in interface.name:
                misconfs.append(
                    Configuration(
                        interface, "Virtual Interface name not match to VLAN ID"
                    )
                )
| |
| |
def validate_vrfs(vrfs=None):
    """Record a finding for every VRF without ``enforce_unique`` or a tenant.

    Args:
        vrfs: iterable of VRF objects exposing ``enforce_unique`` and
            ``tenant``; ``None`` is treated as empty.

    Findings are appended to the module-level ``misconfs`` list.
    """
    if vrfs is None:
        vrfs = []

    for vrf in vrfs:
        if not vrf.enforce_unique:
            misconfs.append(
                Configuration(vrf, "VRF doesn't have enforce_unique set as True")
            )

        if not vrf.tenant:
            misconfs.append(Configuration(vrf, "VRF doesn't have tenant set"))
| |
| |
def validate_machines(machines=None):
    """Validate name, tenant and primary IP of devices / virtual machines.

    For every machine:
      * the name must match ``fqdn_regex``;
      * a tenant must be set;
      * the name must have exactly 3 dot-separated segments; the 2nd and 3rd
        segment (deployment and site) must equal those of the first
        3-segment machine seen for the same tenant in this call;
      * objects of class ``VirtualMachines``, and objects of class
        ``Devices`` with role "Router", "Switch" or "Server", must have a
        primary IP.

    Args:
        machines: iterable of device or VM objects; ``None`` is treated as
            empty.

    Findings are appended to the module-level ``misconfs`` list.
    """
    if machines is None:
        machines = []

    # tenant -> {"deployment": ..., "site": ...} from the first valid name
    tenant_info = dict()

    for machine in machines:
        # A machine without a name is checked as an empty name, so it is
        # reported as invalid instead of raising in re.search()/split().
        name = machine.name or ""

        if not re.search(fqdn_regex, name):
            misconfs.append(
                Configuration(
                    machine, "Device/VM FQDN name %s is invalid" % machine.name
                )
            )

        if not machine.tenant:
            misconfs.append(Configuration(machine, "Device/VM doesn't have tenant set"))

        segments = name.split(".")
        if len(segments) != 3:
            misconfs.append(
                Configuration(
                    machine,
                    "Device/VM FQDN should have 3 segments, found %d" % len(segments),
                )
            )
        elif machine.tenant:
            if machine.tenant not in tenant_info:
                tenant_info[machine.tenant] = {
                    "deployment": segments[1],
                    "site": segments[2],
                }
            else:
                expected = tenant_info[machine.tenant]
                if (
                    expected["deployment"] != segments[1]
                    or expected["site"] != segments[2]
                ):
                    misconfs.append(
                        Configuration(
                            machine,
                            "Deployment or Site name is not consistent with other Device/VM",
                        )
                    )

        # The object kind is told apart by the class name of the object.
        kind = type(machine).__name__
        needs_primary_ip = kind == "VirtualMachines" or (
            kind == "Devices"
            and machine.device_role.name in ["Router", "Switch", "Server"]
        )
        if needs_primary_ip and not machine.primary_ip:
            misconfs.append(
                Configuration(machine, "Primary IP must be set for this Device/VM")
            )
| |
| |
def validate_tenants(tenants=None):
    """Check that each tenant has devices, prefixes, VRFs and one Router.

    For every tenant a finding is recorded when ``device_count``,
    ``prefix_count`` or ``vrf_count`` is 0, and when the number of the
    tenant's devices whose role is named "Router" is not exactly 1.

    Args:
        tenants: iterable of tenant objects; ``None`` is treated as empty.

    Findings are appended to the module-level ``misconfs`` list.
    """
    if tenants is None:
        tenants = []

    for tenant in tenants:
        if tenant.device_count == 0:
            misconfs.append(Configuration(tenant, "0 device was found for this tenant"))
        if tenant.prefix_count == 0:
            misconfs.append(Configuration(tenant, "0 prefix was found for this tenant"))
        if tenant.vrf_count == 0:
            misconfs.append(Configuration(tenant, "0 vrf was found for this tenant"))

        # Count the Router devices (the mgmtserver) of this tenant.
        router_count = sum(
            1
            for device in netboxapi.dcim.devices.filter(tenant_id=tenant.id)
            if device.device_role.name == "Router"
        )
        if router_count != 1:
            misconfs.append(
                Configuration(tenant, "Tenant must have exact 1 Router (mgmtserver)")
            )
| |
| |
def get_objects(tenant_name=""):
    """Fetch the NetBox objects to validate and return them grouped by kind.

    Tenants are looked up by name; when none is found a critical message is
    logged and the process exits with status 1.  When exactly one tenant
    matches, devices, virtual machines, VRFs, VLANs and IP addresses are
    filtered by its id; otherwise ``None`` is passed as the tenant id.
    Interfaces are fetched per device / virtual machine and prefixes per VRF.

    Args:
        tenant_name: name passed to the tenant filter.

    Returns:
        dict with the keys "tenants", "devices", "physical_interfaces",
        "virtual_machines", "virtual_interfaces", "vrfs", "prefixes",
        "vlans" and "ip_addresses", each mapping to a list of objects.
    """
    tenants = list(netboxapi.tenancy.tenants.filter(name=tenant_name))
    if not tenants:
        logger.critical("Tenant name %s wasn't found in Netbox" % tenant_name)
        sys.exit(1)

    # With tenant_id set to None the Netbox API returns all objects by default
    tenant_id = tenants[0].id if len(tenants) == 1 else None

    devices = list(netboxapi.dcim.devices.filter(tenant_id=tenant_id))
    virtual_machines = list(
        netboxapi.virtualization.virtual_machines.filter(tenant_id=tenant_id)
    )

    # One interface query per device, results concatenated in device order
    physical_interfaces = [
        interface
        for device in devices
        for interface in netboxapi.dcim.interfaces.filter(device_id=device.id)
    ]

    # Same for the virtual machines
    virtual_interfaces = [
        interface
        for virtual_machine in virtual_machines
        for interface in netboxapi.virtualization.interfaces.filter(
            virtual_machine_id=virtual_machine.id
        )
    ]

    vrfs = list(netboxapi.ipam.vrfs.filter(tenant_id=tenant_id))
    vlans = list(netboxapi.ipam.vlans.filter(tenant_id=tenant_id))
    ip_addresses = list(netboxapi.ipam.ip_addresses.filter(tenant_id=tenant_id))

    # Prefixes are collected through the VRFs, not through the tenant id
    prefixes = [
        prefix
        for vrf in vrfs
        for prefix in netboxapi.ipam.prefixes.filter(vrf_id=vrf.id)
    ]

    return {
        "tenants": tenants,
        "devices": devices,
        "physical_interfaces": physical_interfaces,
        "virtual_machines": virtual_machines,
        "virtual_interfaces": virtual_interfaces,
        "vrfs": vrfs,
        "prefixes": prefixes,
        "vlans": vlans,
        "ip_addresses": ip_addresses,
    }
| |
| |
if __name__ == "__main__":
    # Script entry point: read the settings file, connect to NetBox, run
    # every validator on the fetched objects and print the findings.

    parser = argparse.ArgumentParser(description="Netbox Tenant Validator")
    parser.add_argument(
        "settings",
        type=argparse.FileType("r"),
        help="YAML Ansible inventory file w/NetBox API token",
    )

    args = parser.parse_args()

    # argparse opened the settings file; close it as soon as it is parsed.
    with args.settings as settings_file:
        netbox_config = yaml.safe_load(settings_file)

    netboxapi = pynetbox.api(
        netbox_config["api_endpoint"], token=netbox_config["token"], threading=True
    )

    # TLS certificate verification is switched off unless the settings
    # contain a truthy "validate_certs" entry.
    if not netbox_config.get("validate_certs", False):
        session = requests.Session()
        session.verify = False
        netboxapi.http_session = session

    # Validator to run for each key of the dict returned by get_objects()
    mapping_func = {
        "tenants": validate_tenants,
        "devices": validate_machines,
        "physical_interfaces": validate_interfaces,
        "virtual_machines": validate_machines,
        "virtual_interfaces": validate_interfaces,
        "vrfs": validate_vrfs,
        "vlans": validate_vlans,
        "prefixes": validate_prefixes,
        "ip_addresses": validate_ip_addresses,
    }

    netbox_data = get_objects(netbox_config.get("tenant_name", ""))

    # Validators are skipped for kinds of which no object was fetched.
    for key, validate_func in mapping_func.items():
        if netbox_data[key]:
            validate_func(netbox_data[key])

    if not misconfs:
        print("All checks passed.")
    else:
        for misconf in misconfs:
            print(misconf)