blob: 0e728a1facabc89ff616511c299b3827294fc386 [file] [log] [blame]
import netaddr
from .utils import logger
class Singleton(type):
    """Metaclass that gives each class using it exactly one shared instance.

    The first call to such a class constructs the instance; every later call
    returns that same object, and the arguments of later calls are not used.
    """

    # Maps each class built with this metaclass to its single instance.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """Return the cached instance of ``cls``, constructing it on first use."""
        registry = cls._instances
        if cls in registry:
            return registry[cls]
        created = super().__call__(*args, **kwargs)
        registry[cls] = created
        return created
class Container:
    """Keyed registry of instances; base class of the concrete containers."""

    def __init__(self):
        # instance_id -> instance
        self.instances = dict()

    def add(self, instance_id, instance):
        """Register ``instance`` under ``instance_id``.

        Raises:
            Exception: if ``instance_id`` is already registered. The existing
                entry is left untouched.
        """
        if instance_id in self.instances:
            raise Exception(
                "instance id {!r} is already registered in {}".format(
                    instance_id, type(self).__name__
                )
            )
        self.instances[instance_id] = instance

    def get(self, instance_id):
        """Return the instance stored under ``instance_id``.

        When the id is unknown, ``instance_id`` itself is returned unchanged
        (not ``None``), so callers always get a value back.
        """
        return self.instances.get(instance_id, instance_id)
class AssignedObjectContainer(Container):
    """Parent class holding the functions shared by the Device and VM containers.

    The stored instances are expected to expose ``services``, ``role`` and
    ``interfaces`` attributes, which the lookups below read.
    """

    def all(self):
        """Return a view of every stored instance."""
        return self.instances.values()

    def _getServerByService(self, serviceName):
        """Return the first instance offering ``serviceName``, or None.

        Each entry of ``instance.services`` is compared by its ``str()`` form.
        """
        for instance in self.instances.values():
            if serviceName in map(str, instance.services):
                return instance
        return None

    def getDNSServer(self):
        """Return the first instance with a "dns" service, or None."""
        return self._getServerByService("dns")

    def getDHCPServer(self):
        """Return the first instance with a "tftp" service, or None."""
        # NOTE(review): the DHCP host is located through its "tftp" service
        # entry, not a "dhcp" one - presumably intentional; confirm.
        return self._getServerByService("tftp")

    def getNTPServer(self):
        """Return the first instance with an "ntp" service, or None."""
        return self._getServerByService("ntp")

    def getRouters(self):
        """ Get a list of Devices/VMs which type is Router """
        return [
            instance
            for instance in self.instances.values()
            if instance.role == "router"
        ]

    def getRouterIPforPrefix(self, prefix):
        """ Get the first found IP address of exist routers as string

        Only addresses that fall inside ``prefix.subnet`` are considered.
        Returns None when no router has a matching address.
        """
        # The prefix network is the same for every address: parse it once.
        subnet = netaddr.IPNetwork(prefix.subnet)
        for router in self.getRouters():
            for interface in router.interfaces.values():
                # The mgmt-only interface will not act as gateway
                if interface["mgmtOnly"]:
                    continue
                for address in interface["addresses"]:
                    ip = netaddr.IPNetwork(address).ip
                    if ip in subnet:
                        return str(ip)
        return None
class DeviceContainer(AssignedObjectContainer, metaclass=Singleton):
    # DeviceContainer holds all devices fetched from Netbox, device_id as key.
    # Because of the Singleton metaclass, every DeviceContainer() call returns
    # the same shared instance.
    pass
class VirtualMachineContainer(AssignedObjectContainer, metaclass=Singleton):
    # VirtualMachineContainer holds all VMs fetched from Netbox, vm_id as key.
    # Because of the Singleton metaclass, every VirtualMachineContainer() call
    # returns the same shared instance.
    pass
class PrefixContainer(Container, metaclass=Singleton):
    """Singleton registry of the prefixes fetched from Netbox.

    Entries are keyed by the prefix string.
    """

    def all(self):
        """Return a view of every stored prefix."""
        return self.instances.values()

    def all_reserved_ips(self, ip_addr=""):
        """Return reserved IP entries as a list.

        Without ``ip_addr`` the reserved IPs of every prefix are concatenated.
        With ``ip_addr`` only the first prefix that both contains the address
        and has reserved IPs is used; if there is none, the list is empty.
        """
        if not ip_addr:
            collected = []
            for entry in self.instances.values():
                if entry.reserved_ips:
                    collected.extend(entry.reserved_ips.values())
            return collected

        for entry in self.instances.values():
            covers = netaddr.IPNetwork(ip_addr).ip in netaddr.IPNetwork(entry.subnet)
            if covers and entry.reserved_ips:
                return list(entry.reserved_ips.values())
        return []
class ServiceInfoContainer(Container, metaclass=Singleton):
    """Singleton holding hosts DNS/DHCP information in Tenant networks.

    ``instances`` maps a domain (the prefix description) to a dict with the
    keys ``domain``, ``subnet``, ``router``, ``dhcprange``, ``hosts``,
    ``dnsServer``, ``ntpServer`` and ``dhcpServer``.
    """

    def __init__(self):
        super().__init__()
        # Flipped to True once initialize() has populated the container.
        self.initialized = False

    def all(self):
        """Return the (domain, info) pairs collected by initialize()."""
        return self.instances.items()

    def initialize(self):
        """Populate the container from the Device, VM and Prefix containers.

        Runs only once; later calls return immediately. Prefixes without a
        description are skipped, because the description is used as the domain
        and as the key of the stored entry.
        """
        if self.initialized:
            return

        deviceContainer = DeviceContainer()
        vmContainer = VirtualMachineContainer()
        prefixContainer = PrefixContainer()

        # The service hosts and the machine list do not depend on the prefix,
        # so they are looked up once instead of once per prefix.
        serviceMap = {
            "dnsServer": deviceContainer.getDNSServer()
            or vmContainer.getDNSServer(),
            "ntpServer": deviceContainer.getNTPServer()
            or vmContainer.getNTPServer(),
            "dhcpServer": deviceContainer.getDHCPServer()
            or vmContainer.getDHCPServer(),
        }
        machines = list(deviceContainer.all()) + list(vmContainer.all())

        for prefix in prefixContainer.all():
            subnet = netaddr.IPNetwork(prefix.subnet)
            domain = prefix.data.description or ""
            if not domain:
                continue

            # If prefix has set the Router IP, use this value as default,
            # otherwise find router in this subnet and get its IP address
            routes = (
                prefix.routes
                or deviceContainer.getRouterIPforPrefix(prefix)
                or vmContainer.getRouterIPforPrefix(prefix)
                or ""
            )
            # When prefix.routes is not set we end up with a plain IP string
            # (possibly empty); wrap it so the output is always a list.
            if isinstance(routes, str):
                routes = [{"ip": routes}]

            info = {
                "domain": domain,
                "subnet": prefix.subnet,
                "router": routes,
                "dhcprange": prefix.dhcp_range or "",
                "hosts": dict(),
                "dnsServer": None,
                "ntpServer": None,
                "dhcpServer": None,
            }
            self.instances[domain] = info

            self._assignServiceAddresses(info, serviceMap, subnet, prefix)
            deviceInDomain = self._collectDevicesInSubnet(machines, subnet)
            self._registerHosts(info["hosts"], deviceInDomain)

        self.initialized = True

    @staticmethod
    def _assignServiceAddresses(info, serviceMap, subnet, prefix):
        """Fill the dnsServer/ntpServer/dhcpServer entries of ``info``.

        An address qualifies when it lies in ``subnet`` or in the subnet of
        one of ``prefix.neighbor``. When several addresses qualify, the last
        one visited is the one kept.
        """
        for service, device in serviceMap.items():
            # No host offers this service: keep the entry as None rather than
            # failing on the attribute access below.
            if device is None:
                continue
            for interface in device.interfaces.values():
                for address in interface["addresses"]:
                    address = netaddr.IPNetwork(address).ip
                    if address in subnet or any(
                        address in netaddr.IPNetwork(neighbor.subnet)
                        for neighbor in prefix.neighbor
                    ):
                        info[service] = {
                            "name": device.name,
                            "address": str(address),
                        }

    @staticmethod
    def _collectDevicesInSubnet(machines, subnet):
        """Group the in-subnet addresses of ``machines`` by device and interface.

        The returned dict has this shape:
        {"mgmtserver1": {
            "non-mgmt-counter": 1,
            "interfaces": {
                "bmc": {
                    "mgmtOnly": True,
                    "mac_address": "ca:fe:ba:be:00:00",
                    "ip_addresses": [IPAddress("10.32.4.1")]
                },
                "eno1": {
                    "mgmtOnly": False,
                    "mac_address": "ca:fe:ba:be:11:11",
                    "ip_addresses": [IPAddress("10.32.4.129"),
                                     IPAddress("10.32.4.130")]
                }
            }
         },
         "mgmtswitch1": ...
        }
        "mac_address" is absent when it could not be resolved.
        """
        deviceInDomain = dict()
        for device in machines:
            for intfName, interface in device.interfaces.items():
                for address in interface["addresses"]:
                    # Extract the IP address from interface's IP (w/o netmask)
                    address = netaddr.IPNetwork(address).ip
                    # Only record the IP address in current subnet
                    if address not in subnet:
                        continue

                    deviceData = deviceInDomain.setdefault(device.name, dict())
                    deviceData.setdefault("non-mgmt-counter", 0)
                    deviceData.setdefault("interfaces", dict())
                    interfaceDict = deviceData["interfaces"].setdefault(
                        intfName, dict()
                    )
                    interfaceDict.setdefault("mgmtOnly", False)

                    # Use interface["mac_address"] as the default value, but if
                    # it is None we are dealing with a virtual interface, so
                    # take the linked interface's mac_address instead
                    try:
                        interfaceDict.setdefault(
                            "mac_address",
                            interface["mac_address"]
                            or device.interfaces[interface["instance"].label][
                                "mac_address"
                            ],
                        )
                    except KeyError:
                        logger.error(
                            "Problem with MAC address on interface %s",
                            interface,
                        )

                    interfaceDict.setdefault("ip_addresses", list())
                    interfaceDict["ip_addresses"].append(address)

                    # A mgmtOnly interface is flagged as such; otherwise the
                    # counter is increased once per in-subnet address found on
                    # a non-management interface
                    if interface["mgmtOnly"]:
                        interfaceDict["mgmtOnly"] = True
                    else:
                        deviceData["non-mgmt-counter"] += 1
        return deviceInDomain

    @staticmethod
    def _registerHosts(hosts, deviceInDomain):
        """Write one hostname/MAC record per collected address into ``hosts``."""
        for deviceName, data in deviceInDomain.items():
            nonMgmtCounter = data["non-mgmt-counter"]
            for intfName, interface in data["interfaces"].items():
                # Skip interfaces without a MAC address. The key is missing
                # altogether when the MAC lookup failed, hence .get().
                macAddress = interface.get("mac_address")
                if not macAddress:
                    continue

                # In the default situation, hostname is deviceName
                hostnameParts = [deviceName]
                # If the condition is -
                #   1. the non-mgmt counter of this device is above one
                #   2. the interface is a management-only interface
                # then add interface name into hostname
                if nonMgmtCounter > 1 or interface["mgmtOnly"]:
                    hostnameParts.append(intfName)

                # If the interface has multiple IP addresses, add the last
                # word of the address to the hostname for identifiability
                addresses = interface["ip_addresses"]
                for address in addresses:
                    hostname = hostnameParts.copy()
                    if len(addresses) > 1:
                        hostname.append(str(address.words[-1]))
                    hosts[str(address)] = {
                        "hostname": "-".join(hostname),
                        "macaddr": macAddress.lower(),
                    }