Automate validation of edge configurations in Netbox
This commit is working for AETHER-1965
Change-Id: I5164e8ca77f77f198b38865367f95ea602d6d68a
diff --git a/scripts/tenant_validator.py b/scripts/tenant_validator.py
new file mode 100644
index 0000000..baa4a79
--- /dev/null
+++ b/scripts/tenant_validator.py
@@ -0,0 +1,413 @@
+#!/usr/bin/env python3
+
+# SPDX-FileCopyrightText: © 2021 Open Networking Foundation <support@opennetworking.org>
+# SPDX-License-Identifier: Apache-2.0
+
+# tenant_validator.py
+# Grab the data from Tenant and check if the information is invalidate
+
+import re
+import yaml
+import logging
+import argparse
+import nbhelper
+import pynetbox
+import requests
+import netaddr
+
+
+logging.basicConfig()
+logger = logging.getLogger("TenentValidator")
+
+# The Global variable shared with different netbox api caller function
+netboxapi = None
+netbox_config = None
+misconfs = list()
+
+# The consistent variable should be same in every devices, prefixes of the deployment
+site_name = None
+deployment_name = None
+
+# A Regex rule to identify if device name is a valid domain
+fqdn_regex = re.compile(
+ "^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$"
+)
+
+
+class Configuration(object):
+
+ mapping_dict = {
+ logging.ERROR: logger.error,
+ logging.WARN: logger.warn,
+ logging.INFO: logger.info,
+ }
+
+ def __init__(self, obj, message, level=logging.ERROR):
+ self.level = level
+ self.obj = obj
+ self.message = message
+
+ # self.mapping_dict[level](self)
+
+ def __repr__(self):
+ return "[%s] object %s: %s\nRef. %s" % (
+ "/".join(self.obj.url.split("/")[-4:-2]),
+ self.obj,
+ self.message,
+ self.obj.url.replace("10.76.28.11", "netbox.infra.onlab.us").replace(
+ "api/", ""
+ ),
+ )
+
+
+def get_object_type(obj):
+ return ("/".join(obj.url.split("/")[-4:-2]),)
+
+
+def validate_vlans(vlans=list()):
+ global misconfs
+
+ for vlan in vlans:
+ if not vlan.group:
+ misconfs.append(Configuration(vlan, "VLAN group isn't set"))
+ if not vlan.site:
+ misconfs.append(Configuration(vlan, "VLAN site isn't set"))
+ if not vlan.tenant:
+ misconfs.append(Configuration(vlan, "VLAN tenant isn't set"))
+
+
+def validate_prefixes(prefixes=list()):
+ global misconfs
+
+ tenant_dict = dict()
+
+ for prefix in prefixes:
+ netmask = prefix.prefix.split("/")[-1]
+
+ if not re.search(fqdn_regex, prefix.description):
+ misconfs.append(Configuration(prefix, "Description (FQDN) is invalid"))
+ else:
+ device = netboxapi.dcim.devices.filter(tenant_id=prefix.tenant.id)[0]
+ if device:
+ sitename = device.name.split(".")[-1]
+ if sitename not in prefix.description:
+ misconfs.append(
+ Configuration(prefix, "Site name should in the prefix FQDN")
+ )
+
+ if not prefix.tenant:
+ misconfs.append(Configuration(prefix, "Tenant isn't set"))
+ else:
+ tenant_dict.setdefault(prefix.tenant, dict())
+ tenant_dict[prefix.tenant].setdefault(netmask, list())
+ tenant_dict[prefix.tenant][netmask].append(prefix)
+
+ # Assume prefixes in different tenants don't have the intersection
+ # So parent prefix only check if it has a DHCP sub-prefix in same tenant
+ for prefixes_by_tenant in tenant_dict.values():
+ for netmask, prefixes_by_netmask in prefixes_by_tenant.items():
+ for prefix in prefixes_by_netmask:
+ if int(netmask) < 26:
+ children = [
+ child
+ for key, value in prefixes_by_tenant.items()
+ if int(key) > int(netmask)
+ for child in value
+ ]
+
+ children = [
+ child
+ for child in children
+ if str(
+ netaddr.IPNetwork(
+ child.prefix.split("/")[0] + "/" + netmask
+ ).cidr
+ )
+ == prefix.prefix
+ ]
+ if children:
+ continue
+
+ dhcp_addr = list(
+ filter(
+ lambda ip: ip.status.value == "dhcp",
+ netboxapi.ipam.ip_addresses.filter(parent=prefix.prefix),
+ )
+ )
+
+ if not dhcp_addr:
+ misconfs.append(
+ Configuration(prefix, "DHCP subnet not found in prefix")
+ )
+ continue
+
+ if len(dhcp_addr) > 1:
+ misconfs.append(
+ Configuration(
+ prefix, "Prefix should have exact 1 DHCP subnet"
+ )
+ )
+ continue
+
+ dhcp_range = dhcp_addr[0].address
+ ip_addrs = netboxapi.ipam.ip_addresses.filter(parent=dhcp_range)
+ ip_addrs = list(filter(lambda ip: ip != dhcp_addr[0], ip_addrs))
+ if ip_addrs:
+ misconfs.append(
+ Configuration(
+ prefix,
+ "DHCP range %s contains other IP addresses: %s"
+ % (dhcp_range, ip_addrs),
+ )
+ )
+
+
+def validate_ip_addresses(ip_addresses=list()):
+ global misconfs
+ prefix_dict = dict()
+
+ for ip_address in ip_addresses:
+
+ if netaddr.IPNetwork(ip_address.address).network.is_private():
+ if not ip_address.vrf:
+ misconfs.append(
+ Configuration(ip_address, "VRF isn't set for this IP address")
+ )
+ else:
+ prefix = str(netaddr.IPNetwork(ip_address.address).cidr)
+ if prefix not in prefix_dict:
+ prefix_dict[prefix] = netboxapi.ipam.prefixes.get(prefix=prefix)
+
+ if prefix_dict[prefix] and prefix_dict[prefix].vrf != ip_address.vrf:
+ misconfs.append(
+ Configuration(ip_address, "VRF isn't match with Prefix")
+ )
+
+
+def validate_interfaces(interfaces=list()):
+ global misconfs
+
+ for interface in interfaces:
+ count_ips = 0
+
+ if get_object_type(interface) == "virtualization/interfaces":
+ count_ips = len(
+ netboxapi.ipam.ip_addresses.filter(vminterface_id=interface.id)
+ )
+ elif get_object_type(interface) == "dcim/interfaces":
+ count_ips = interface.count_ipaddresses
+
+ if count_ips > 0 and not interface.mac_address:
+ misconfs.append(
+ Configuration(
+ interface,
+ "VM Interface has IP address assigned but mac address isn't set",
+ )
+ )
+
+ if (
+ get_object_type(interface) == "dcim/interfaces"
+ and interface.type.value == "virtaul"
+ and (not interface.mgmt_only or not interface.tagged_vlans)
+ ):
+ misconfs.append(
+ Configuration(
+ interface,
+ "Virtual interface should be management only or VLAN interface",
+ )
+ )
+
+ if interface.tagged_vlans:
+ if len(interface.tagged_vlans) > 1:
+ misconfs.append(
+ Configuration(interface, "Virtual Interface has multiple VLANs set")
+ )
+ elif str(interface.tagged_vlans[0].vid) not in interface.name:
+ misconfs.append(
+ Configuration(
+ interface, "Virtual Interface name not match to VLAN ID"
+ )
+ )
+
+
+def validate_vrfs(vrfs=list()):
+ global misconfs
+
+ for vrf in vrfs:
+ if not vrf.enforce_unique:
+ misconfs.append(
+ Configuration(vrf, "VRF doesn't have enforce_unique set as True")
+ )
+
+ if not vrf.tenant:
+ misconfs.append(Configuration(vrf, "VRF doesn't have tenant set"))
+
+
+def validate_machines(machines=list()):
+ global misconfs
+
+ tenant_info = dict()
+
+ for machine in machines:
+ if not re.search(fqdn_regex, machine.name):
+ misconfs.append(
+ Configuration(
+ machine, "Device/VM FQDN name %s is invalid" % machine.name
+ )
+ )
+
+ if not machine.tenant:
+ misconfs.append(Configuration(machine, "Device/VM doesn't have tenant set"))
+
+ segments = machine.name.split(".")
+ if len(segments) != 3:
+ misconfs.append(
+ Configuration(
+ machine,
+ "Device/VM FQDN should have 3 segments, found %d" % len(segments),
+ )
+ )
+ elif machine.tenant:
+ if machine.tenant not in tenant_info:
+ tenant_info.setdefault(
+ machine.tenant, {"deployment": segments[1], "site": segments[2]}
+ )
+ else:
+ deployment = tenant_info[machine.tenant]["deployment"]
+ site = tenant_info[machine.tenant]["site"]
+ if deployment != segments[1] or site != segments[2]:
+ misconfs.append(
+ Configuration(
+ machine,
+ "Deployment or Site name is not consistent with other Device/VM",
+ )
+ )
+
+ if (
+ (
+ machine.__class__.__name__ == "Devices"
+ and machine.device_role.name in ["Router", "Switch", "Server"]
+ )
+ or machine.__class__.__name__ == "VirtualMachines"
+ ) and not machine.primary_ip:
+ misconfs.append(
+ Configuration(machine, "Primary IP must be set for this Device/VM")
+ )
+
+
+def validate_tenants(tenants=list()):
+ global misconfs
+
+ for tenant in tenants:
+ if tenant.device_count == 0:
+ misconfs.append(Configuration(tenant, "0 device was found for this tenant"))
+ if tenant.prefix_count == 0:
+ misconfs.append(Configuration(tenant, "0 prefix was found for this tenant"))
+ if tenant.vrf_count == 0:
+ misconfs.append(Configuration(tenant, "0 vrf was found for this tenant"))
+
+ devices = list(netboxapi.dcim.devices.filter(tenant_id=tenant.id))
+ mgmtserver = list(filter(lambda d: d.device_role.name == "Router", devices))
+ if len(mgmtserver) != 1:
+ misconfs.append(
+ Configuration(tenant, "Tenant must have exact 1 Router (mgmtserver)")
+ )
+
+
+def get_objects(tenant_name=""):
+ return_dict = dict()
+
+ tenants = list(netboxapi.tenancy.tenants.filter(name=tenant_name))
+ if len(tenants) == 0:
+ logger.critical("Tenant name %s wasn't found in Netbox" % tenant_name)
+ sys.exit(1)
+ tenant_id = None if len(tenants) != 1 else tenants[0].id
+
+ # If the tenant_id is None, then Netbox API will return all objects by default
+ devices = list(netboxapi.dcim.devices.filter(tenant_id=tenant_id))
+ virtual_machines = list(
+ netboxapi.virtualization.virtual_machines.filter(tenant_id=tenant_id)
+ )
+
+ physical_interfaces = list()
+ for device in devices:
+ physical_interfaces.extend(
+ list(netboxapi.dcim.interfaces.filter(device_id=device.id))
+ )
+
+ virtual_interfaces = list()
+ for virtual_machine in virtual_machines:
+ virtual_interfaces.extend(
+ list(
+ netboxapi.virtualization.interfaces.filter(
+ virtual_machine_id=virtual_machine.id
+ )
+ )
+ )
+
+ vrfs = list(netboxapi.ipam.vrfs.filter(tenant_id=tenant_id))
+ vlans = list(netboxapi.ipam.vlans.filter(tenant_id=tenant_id))
+ ip_addresses = list(netboxapi.ipam.ip_addresses.filter(tenant_id=tenant_id))
+
+ prefixes = list()
+ for vrf in vrfs:
+ prefixes.extend(list(netboxapi.ipam.prefixes.filter(vrf_id=vrf.id)))
+
+ return_dict = {
+ "tenants": tenants,
+ "devices": devices,
+ "physical_interfaces": physical_interfaces,
+ "virtual_machines": virtual_machines,
+ "virtual_interfaces": virtual_interfaces,
+ "vrfs": vrfs,
+ "prefixes": prefixes,
+ "vlans": vlans,
+ "ip_addresses": ip_addresses,
+ }
+
+ return return_dict
+
+
+if __name__ == "__main__":
+
+ parser = argparse.ArgumentParser(description="Netbox Tenant Validator")
+ parser.add_argument(
+ "settings",
+ type=argparse.FileType("r"),
+ help="YAML Ansible inventory file w/NetBox API token",
+ )
+
+ args = parser.parse_args()
+ netbox_config = yaml.safe_load(args.settings.read())
+ netboxapi = pynetbox.api(
+ netbox_config["api_endpoint"], token=netbox_config["token"], threading=True
+ )
+
+ if not netbox_config.get("validate_certs", False):
+ session = requests.Session()
+ session.verify = False
+ netboxapi.http_session = session
+
+ mapping_func = {
+ "tenants": validate_tenants,
+ "devices": validate_machines,
+ "physical_interfaces": validate_interfaces,
+ "virtual_machines": validate_machines,
+ "virtual_interfaces": validate_interfaces,
+ "vrfs": validate_vrfs,
+ "vlans": validate_vlans,
+ "prefixes": validate_prefixes,
+ "ip_addresses": validate_ip_addresses,
+ }
+
+ netbox_data = get_objects(netbox_config.get("tenant_name", ""))
+
+ for key, validate_func in mapping_func.items():
+ if netbox_data[key]:
+ validate_func(netbox_data[key])
+
+ if not misconfs:
+ print("All checks passed.")
+ else:
+ for misconf in misconfs:
+ print(misconf)