blob: 3a512193ec6e9b4afa8ca7c4fbdf34fa79f66b48 [file] [log] [blame]
#!/usr/bin/env python3
# SPDX-FileCopyrightText: © 2021 Open Networking Foundation <support@opennetworking.org>
# SPDX-License-Identifier: Apache-2.0
# nbhelper.py
# Helper functions for building YAML output from Netbox API calls
import netaddr
from .utils import logger, check_name_dns
from .container import PrefixContainer
from .container import DeviceContainer, VirtualMachineContainer
class Prefix:
def __init__(self, data, name_segments):
from .utils import netboxapi
self.data = data
self.name_segments = name_segments
self.nbapi = netboxapi
if self.data.description:
self.domain_extension = check_name_dns(self.data.description)
# ip centric info
self.subnet = self.data.prefix
self.dhcp_range = list()
self.routes = list()
self.reserved_ips = {}
self.neighbor = list()
# build item lists
self.build_prefix()
# Find the neighbor relationship in prefix level
self.find_neighbor()
PrefixContainer().add(self.data.prefix, self)
def __repr__(self):
return str(self.data.prefix)
def check_ip_belonging(self, ip):
"""
Check if an IP address is belonging to this prefix
"""
return ip in netaddr.IPSet([self.data.prefix])
def find_neighbor(self):
parent_prefixes = list()
for prefix in PrefixContainer().all():
if not prefix.data.description:
parent_prefixes.append(prefix)
for prefix in PrefixContainer().all():
if not prefix.data.description:
continue
targetSubnet = netaddr.IPNetwork(prefix.subnet)
mySubnet = netaddr.IPNetwork(self.subnet)
for parent in parent_prefixes:
parentSubnet = netaddr.IPNetwork(parent.subnet)
if mySubnet in parentSubnet and targetSubnet in parentSubnet:
if prefix not in self.neighbor:
self.neighbor.append(prefix)
if self not in prefix.neighbor:
prefix.neighbor.append(self)
def build_prefix(self):
"""
find ip information for items (devices/vms, reserved_ips, dhcp_range) in prefix
"""
ips = self.nbapi.ipam.ip_addresses.filter(parent=self.data.prefix)
for ip in sorted(ips, key=lambda k: k["address"]):
logger.debug("prefix_item ip: %s, data: %s", ip, dict(ip))
# if it's a DHCP range, add that range to the dev list as prefix_dhcp
if ip.status.value == "dhcp":
self.dhcp_range.append(str(ip.address))
# reserved IPs
if ip.status.value == "reserved":
self.reserved_ips[str(ip)] = {
"name": ip.description.lower().split(" ")[0],
"description": ip.description,
"ip4": str(ip.address),
"custom_fields": ip.custom_fields,
}
rfc3442routes = ip.custom_fields.get("rfc3442routes")
rfc3442routes = rfc3442routes.split(",") if rfc3442routes else list()
self.routes.append(
{
"ip": str(netaddr.IPNetwork(ip.address).ip),
"rfc3442routes": rfc3442routes,
}
)