blob: baa4a79b31931df73a2054c61c1ff8b8bcd46dc1 [file] [log] [blame]
#!/usr/bin/env python3
# SPDX-FileCopyrightText: © 2021 Open Networking Foundation <support@opennetworking.org>
# SPDX-License-Identifier: Apache-2.0
# tenant_validator.py
# Grab the data from Tenant and check if the information is invalidate
import re
import yaml
import logging
import argparse
import nbhelper
import pynetbox
import requests
import netaddr
logging.basicConfig()
logger = logging.getLogger("TenentValidator")
# The Global variable shared with different netbox api caller function
netboxapi = None
netbox_config = None
misconfs = list()
# The consistent variable should be same in every devices, prefixes of the deployment
site_name = None
deployment_name = None
# A Regex rule to identify if device name is a valid domain
fqdn_regex = re.compile(
"^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$"
)
class Configuration(object):
mapping_dict = {
logging.ERROR: logger.error,
logging.WARN: logger.warn,
logging.INFO: logger.info,
}
def __init__(self, obj, message, level=logging.ERROR):
self.level = level
self.obj = obj
self.message = message
# self.mapping_dict[level](self)
def __repr__(self):
return "[%s] object %s: %s\nRef. %s" % (
"/".join(self.obj.url.split("/")[-4:-2]),
self.obj,
self.message,
self.obj.url.replace("10.76.28.11", "netbox.infra.onlab.us").replace(
"api/", ""
),
)
def get_object_type(obj):
return ("/".join(obj.url.split("/")[-4:-2]),)
def validate_vlans(vlans=list()):
global misconfs
for vlan in vlans:
if not vlan.group:
misconfs.append(Configuration(vlan, "VLAN group isn't set"))
if not vlan.site:
misconfs.append(Configuration(vlan, "VLAN site isn't set"))
if not vlan.tenant:
misconfs.append(Configuration(vlan, "VLAN tenant isn't set"))
def validate_prefixes(prefixes=list()):
global misconfs
tenant_dict = dict()
for prefix in prefixes:
netmask = prefix.prefix.split("/")[-1]
if not re.search(fqdn_regex, prefix.description):
misconfs.append(Configuration(prefix, "Description (FQDN) is invalid"))
else:
device = netboxapi.dcim.devices.filter(tenant_id=prefix.tenant.id)[0]
if device:
sitename = device.name.split(".")[-1]
if sitename not in prefix.description:
misconfs.append(
Configuration(prefix, "Site name should in the prefix FQDN")
)
if not prefix.tenant:
misconfs.append(Configuration(prefix, "Tenant isn't set"))
else:
tenant_dict.setdefault(prefix.tenant, dict())
tenant_dict[prefix.tenant].setdefault(netmask, list())
tenant_dict[prefix.tenant][netmask].append(prefix)
# Assume prefixes in different tenants don't have the intersection
# So parent prefix only check if it has a DHCP sub-prefix in same tenant
for prefixes_by_tenant in tenant_dict.values():
for netmask, prefixes_by_netmask in prefixes_by_tenant.items():
for prefix in prefixes_by_netmask:
if int(netmask) < 26:
children = [
child
for key, value in prefixes_by_tenant.items()
if int(key) > int(netmask)
for child in value
]
children = [
child
for child in children
if str(
netaddr.IPNetwork(
child.prefix.split("/")[0] + "/" + netmask
).cidr
)
== prefix.prefix
]
if children:
continue
dhcp_addr = list(
filter(
lambda ip: ip.status.value == "dhcp",
netboxapi.ipam.ip_addresses.filter(parent=prefix.prefix),
)
)
if not dhcp_addr:
misconfs.append(
Configuration(prefix, "DHCP subnet not found in prefix")
)
continue
if len(dhcp_addr) > 1:
misconfs.append(
Configuration(
prefix, "Prefix should have exact 1 DHCP subnet"
)
)
continue
dhcp_range = dhcp_addr[0].address
ip_addrs = netboxapi.ipam.ip_addresses.filter(parent=dhcp_range)
ip_addrs = list(filter(lambda ip: ip != dhcp_addr[0], ip_addrs))
if ip_addrs:
misconfs.append(
Configuration(
prefix,
"DHCP range %s contains other IP addresses: %s"
% (dhcp_range, ip_addrs),
)
)
def validate_ip_addresses(ip_addresses=list()):
global misconfs
prefix_dict = dict()
for ip_address in ip_addresses:
if netaddr.IPNetwork(ip_address.address).network.is_private():
if not ip_address.vrf:
misconfs.append(
Configuration(ip_address, "VRF isn't set for this IP address")
)
else:
prefix = str(netaddr.IPNetwork(ip_address.address).cidr)
if prefix not in prefix_dict:
prefix_dict[prefix] = netboxapi.ipam.prefixes.get(prefix=prefix)
if prefix_dict[prefix] and prefix_dict[prefix].vrf != ip_address.vrf:
misconfs.append(
Configuration(ip_address, "VRF isn't match with Prefix")
)
def validate_interfaces(interfaces=list()):
global misconfs
for interface in interfaces:
count_ips = 0
if get_object_type(interface) == "virtualization/interfaces":
count_ips = len(
netboxapi.ipam.ip_addresses.filter(vminterface_id=interface.id)
)
elif get_object_type(interface) == "dcim/interfaces":
count_ips = interface.count_ipaddresses
if count_ips > 0 and not interface.mac_address:
misconfs.append(
Configuration(
interface,
"VM Interface has IP address assigned but mac address isn't set",
)
)
if (
get_object_type(interface) == "dcim/interfaces"
and interface.type.value == "virtaul"
and (not interface.mgmt_only or not interface.tagged_vlans)
):
misconfs.append(
Configuration(
interface,
"Virtual interface should be management only or VLAN interface",
)
)
if interface.tagged_vlans:
if len(interface.tagged_vlans) > 1:
misconfs.append(
Configuration(interface, "Virtual Interface has multiple VLANs set")
)
elif str(interface.tagged_vlans[0].vid) not in interface.name:
misconfs.append(
Configuration(
interface, "Virtual Interface name not match to VLAN ID"
)
)
def validate_vrfs(vrfs=list()):
global misconfs
for vrf in vrfs:
if not vrf.enforce_unique:
misconfs.append(
Configuration(vrf, "VRF doesn't have enforce_unique set as True")
)
if not vrf.tenant:
misconfs.append(Configuration(vrf, "VRF doesn't have tenant set"))
def validate_machines(machines=list()):
global misconfs
tenant_info = dict()
for machine in machines:
if not re.search(fqdn_regex, machine.name):
misconfs.append(
Configuration(
machine, "Device/VM FQDN name %s is invalid" % machine.name
)
)
if not machine.tenant:
misconfs.append(Configuration(machine, "Device/VM doesn't have tenant set"))
segments = machine.name.split(".")
if len(segments) != 3:
misconfs.append(
Configuration(
machine,
"Device/VM FQDN should have 3 segments, found %d" % len(segments),
)
)
elif machine.tenant:
if machine.tenant not in tenant_info:
tenant_info.setdefault(
machine.tenant, {"deployment": segments[1], "site": segments[2]}
)
else:
deployment = tenant_info[machine.tenant]["deployment"]
site = tenant_info[machine.tenant]["site"]
if deployment != segments[1] or site != segments[2]:
misconfs.append(
Configuration(
machine,
"Deployment or Site name is not consistent with other Device/VM",
)
)
if (
(
machine.__class__.__name__ == "Devices"
and machine.device_role.name in ["Router", "Switch", "Server"]
)
or machine.__class__.__name__ == "VirtualMachines"
) and not machine.primary_ip:
misconfs.append(
Configuration(machine, "Primary IP must be set for this Device/VM")
)
def validate_tenants(tenants=list()):
global misconfs
for tenant in tenants:
if tenant.device_count == 0:
misconfs.append(Configuration(tenant, "0 device was found for this tenant"))
if tenant.prefix_count == 0:
misconfs.append(Configuration(tenant, "0 prefix was found for this tenant"))
if tenant.vrf_count == 0:
misconfs.append(Configuration(tenant, "0 vrf was found for this tenant"))
devices = list(netboxapi.dcim.devices.filter(tenant_id=tenant.id))
mgmtserver = list(filter(lambda d: d.device_role.name == "Router", devices))
if len(mgmtserver) != 1:
misconfs.append(
Configuration(tenant, "Tenant must have exact 1 Router (mgmtserver)")
)
def get_objects(tenant_name=""):
return_dict = dict()
tenants = list(netboxapi.tenancy.tenants.filter(name=tenant_name))
if len(tenants) == 0:
logger.critical("Tenant name %s wasn't found in Netbox" % tenant_name)
sys.exit(1)
tenant_id = None if len(tenants) != 1 else tenants[0].id
# If the tenant_id is None, then Netbox API will return all objects by default
devices = list(netboxapi.dcim.devices.filter(tenant_id=tenant_id))
virtual_machines = list(
netboxapi.virtualization.virtual_machines.filter(tenant_id=tenant_id)
)
physical_interfaces = list()
for device in devices:
physical_interfaces.extend(
list(netboxapi.dcim.interfaces.filter(device_id=device.id))
)
virtual_interfaces = list()
for virtual_machine in virtual_machines:
virtual_interfaces.extend(
list(
netboxapi.virtualization.interfaces.filter(
virtual_machine_id=virtual_machine.id
)
)
)
vrfs = list(netboxapi.ipam.vrfs.filter(tenant_id=tenant_id))
vlans = list(netboxapi.ipam.vlans.filter(tenant_id=tenant_id))
ip_addresses = list(netboxapi.ipam.ip_addresses.filter(tenant_id=tenant_id))
prefixes = list()
for vrf in vrfs:
prefixes.extend(list(netboxapi.ipam.prefixes.filter(vrf_id=vrf.id)))
return_dict = {
"tenants": tenants,
"devices": devices,
"physical_interfaces": physical_interfaces,
"virtual_machines": virtual_machines,
"virtual_interfaces": virtual_interfaces,
"vrfs": vrfs,
"prefixes": prefixes,
"vlans": vlans,
"ip_addresses": ip_addresses,
}
return return_dict
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Netbox Tenant Validator")
parser.add_argument(
"settings",
type=argparse.FileType("r"),
help="YAML Ansible inventory file w/NetBox API token",
)
args = parser.parse_args()
netbox_config = yaml.safe_load(args.settings.read())
netboxapi = pynetbox.api(
netbox_config["api_endpoint"], token=netbox_config["token"], threading=True
)
if not netbox_config.get("validate_certs", False):
session = requests.Session()
session.verify = False
netboxapi.http_session = session
mapping_func = {
"tenants": validate_tenants,
"devices": validate_machines,
"physical_interfaces": validate_interfaces,
"virtual_machines": validate_machines,
"virtual_interfaces": validate_interfaces,
"vrfs": validate_vrfs,
"vlans": validate_vlans,
"prefixes": validate_prefixes,
"ip_addresses": validate_ip_addresses,
}
netbox_data = get_objects(netbox_config.get("tenant_name", ""))
for key, validate_func in mapping_func.items():
if netbox_data[key]:
validate_func(netbox_data[key])
if not misconfs:
print("All checks passed.")
else:
for misconf in misconfs:
print(misconf)