| import netaddr |
| from .utils import logger |
| |
| |
class Singleton(type):
    """Metaclass that creates at most one instance per class.

    The first call to a class using this metaclass constructs the instance
    with the given arguments; every later call returns that same object and
    ignores its arguments.
    """

    # Registry of already-created instances, keyed by class object.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """Return the cached instance of ``cls``, creating it on first use."""
        if cls in cls._instances:
            return cls._instances[cls]

        created = super().__call__(*args, **kwargs)
        cls._instances[cls] = created
        return created
| |
| |
class Container:
    """Simple registry mapping an instance id to an instance."""

    def __init__(self):
        # Registered instances, keyed by instance id.
        self.instances = dict()

    def add(self, instance_id, instance):
        """Register ``instance`` under ``instance_id``.

        Raises:
            ValueError: if ``instance_id`` is already registered.  The error
                names the offending id so the duplicate can be traced.
        """
        if instance_id in self.instances:
            raise ValueError(
                "instance id %r is already registered" % (instance_id,)
            )
        self.instances[instance_id] = instance

    def get(self, instance_id):
        """Return the instance registered under ``instance_id``.

        When the id is unknown, the id itself is returned unchanged rather
        than raising an error.
        """
        if instance_id not in self.instances:
            return instance_id
        return self.instances[instance_id]
| |
| |
class AssignedObjectContainer(Container):
    """Parent class with the lookups shared by the Device and VM containers."""

    def all(self):
        """Return a view of every stored instance."""
        return self.instances.values()

    def _firstWithService(self, service):
        """Return the first instance offering ``service``, or None.

        Each entry of ``instance.services`` is compared by its ``str()`` form.
        """
        for instance in self.instances.values():
            if any(str(item) == service for item in instance.services):
                return instance
        return None

    def getDNSServer(self):
        """Return the first instance that lists a "dns" service, or None."""
        return self._firstWithService("dns")

    def getDHCPServer(self):
        """Return the first instance that lists a "tftp" service, or None."""
        # NOTE(review): the DHCP server is identified by its "tftp" service,
        # not a "dhcp" one -- confirm this is intentional.
        return self._firstWithService("tftp")

    def getNTPServer(self):
        """Return the first instance that lists an "ntp" service, or None."""
        return self._firstWithService("ntp")

    def getRouters(self):
        """ Get a list of Devices/VMs whose role is "router" """
        return [
            instance
            for instance in self.instances.values()
            if instance.role == "router"
        ]

    def getRouterIPforPrefix(self, prefix):
        """ Get the first found router IP inside ``prefix.subnet`` as string

        Management-only interfaces are ignored.  Returns None when no router
        has an address in the subnet.
        """
        # The prefix network does not change while scanning, so parse it once.
        subnet = netaddr.IPNetwork(prefix.subnet)

        for router in self.getRouters():
            for interface in router.interfaces.values():

                # The mgmt-only interface will not act as gateway
                if interface["mgmtOnly"]:
                    continue

                for address in interface["addresses"]:
                    ip = netaddr.IPNetwork(address).ip
                    if ip in subnet:
                        return str(ip)
        return None
| |
| |
class DeviceContainer(AssignedObjectContainer, metaclass=Singleton):
    """Singleton container of all devices fetched from Netbox, keyed by device_id."""
| |
| |
class VirtualMachineContainer(AssignedObjectContainer, metaclass=Singleton):
    """Singleton container of all virtual machines fetched from Netbox, keyed by vm_id."""
| |
| |
class PrefixContainer(Container, metaclass=Singleton):
    """Singleton container of all prefixes fetched from Netbox, keyed by prefix (str)."""

    def all(self):
        """Return a view of every stored prefix."""
        return self.instances.values()

    def all_reserved_ips(self, ip_addr=""):
        """Return reserved IPs, optionally limited to the prefix holding ``ip_addr``.

        With an empty ``ip_addr`` the reserved IPs of every prefix are
        concatenated.  Otherwise the reserved IPs of the first prefix that
        contains ``ip_addr`` and has a non-empty ``reserved_ips`` are
        returned; an empty list is returned when there is no such prefix.
        """
        if not ip_addr:
            ret = list()
            for prefix in self.instances.values():
                if prefix.reserved_ips:
                    ret.extend(prefix.reserved_ips.values())
            return ret

        # Parse the lookup address once instead of once per prefix.
        target = netaddr.IPNetwork(ip_addr).ip
        for prefix in self.instances.values():
            if target in netaddr.IPNetwork(prefix.subnet) and prefix.reserved_ips:
                return list(prefix.reserved_ips.values())
        return list()
| |
| |
class ServiceInfoContainer(Container, metaclass=Singleton):
    """Singleton container of per-domain DNS/DHCP host data for tenant networks.

    Records live in ``self.instances`` keyed by the prefix description, which
    is used as the domain name.  The data is built by :meth:`initialize`.
    """

    def __init__(self):
        super().__init__()
        # Becomes True once initialize() has run to completion.
        self.initialized = False

    def all(self):
        """Return a view of ``(domain, record)`` pairs."""
        return self.instances.items()

    def initialize(self):
        """Build one record per described prefix; later calls are no-ops.

        For every prefix with a non-empty description a record is stored with
        the keys ``domain``, ``subnet``, ``router``, ``dhcprange``, ``hosts``,
        ``dnsServer``, ``ntpServer`` and ``dhcpServer``.  A service key stays
        None when no service host exists or when that host has no address in
        the prefix subnet or one of its neighbor subnets.
        """
        if self.initialized:
            return

        deviceContainer = DeviceContainer()
        vmContainer = VirtualMachineContainer()
        prefixContainer = PrefixContainer()

        # Neither the service hosts nor the machine list depend on the prefix
        # being processed, so resolve them once instead of once per prefix.
        serviceMap = {
            "dnsServer": deviceContainer.getDNSServer()
            or vmContainer.getDNSServer(),
            "ntpServer": deviceContainer.getNTPServer()
            or vmContainer.getNTPServer(),
            "dhcpServer": deviceContainer.getDHCPServer()
            or vmContainer.getDHCPServer(),
        }
        machines = list(deviceContainer.all()) + list(vmContainer.all())

        for prefix in prefixContainer.all():

            subnet = netaddr.IPNetwork(prefix.subnet)
            domain = prefix.data.description or ""

            # Prefixes without a description have no domain name to file under
            if not domain:
                continue

            # If prefix has set the Router IP, use this value as default,
            # otherwise find router in this subnet and get its IP address
            routes = (
                prefix.routes
                or deviceContainer.getRouterIPforPrefix(prefix)
                or vmContainer.getRouterIPforPrefix(prefix)
                or ""
            )

            # A router IP found via the containers (or the "" fallback) is a
            # plain string; wrap it so the output always has the list format.
            if isinstance(routes, str):
                routes = [{"ip": routes}]

            record = {
                "domain": domain,
                "subnet": prefix.subnet,
                "router": routes,
                "dhcprange": prefix.dhcp_range or "",
                "hosts": dict(),
                "dnsServer": None,
                "ntpServer": None,
                "dhcpServer": None,
            }
            self.instances[domain] = record

            # Find the service IP address for this network
            for service, device in serviceMap.items():
                # No host offers this service: leave the entry as None
                if device is None:
                    continue
                address = self._findServiceAddress(device, subnet, prefix)
                if address is not None:
                    record[service] = {
                        "name": device.name,
                        "address": str(address),
                    }

            # Gather all Devices/VMs in this subnet, build IP/Hostname map
            deviceInDomain = self._collectDevicesInSubnet(machines, subnet)
            record["hosts"].update(self._buildHosts(deviceInDomain))

        self.initialized = True

    @staticmethod
    def _findServiceAddress(device, subnet, prefix):
        """Return the last address of ``device`` usable from ``prefix``, or None.

        An address qualifies when it lies in ``subnet`` or in the subnet of
        any entry of ``prefix.neighbor``.  When several qualify, the last one
        in iteration order is returned.
        """
        found = None
        for interface in device.interfaces.values():
            for raw in interface["addresses"]:
                address = netaddr.IPNetwork(raw).ip
                if address in subnet or any(
                    address in netaddr.IPNetwork(neighbor.subnet)
                    for neighbor in prefix.neighbor
                ):
                    found = address
        return found

    @staticmethod
    def _collectDevicesInSubnet(machines, subnet):
        """Group the addresses inside ``subnet`` by device name and interface.

        The returned dict stores the interface information like:
        {"mgmtserver1": {
            "non-mgmt-counter": 1,
            "interfaces": {
                "bmc": {
                    "mgmtOnly": True,
                    "mac_address": "ca:fe:ba:be:00:00",
                    "ip_addresses": [IPAddress("10.32.4.1")]
                },
                "eno1": {
                    "mgmtOnly": False,
                    "mac_address": "ca:fe:ba:be:11:11",
                    "ip_addresses": [IPAddress("10.32.4.129"),
                                     IPAddress("10.32.4.130")]
                }
            }
         },
         "mgmtswitch1": ...
        }
        """
        deviceInDomain = dict()

        for device in machines:

            # Iterate the interface owned by device
            for intfName, interface in device.interfaces.items():

                # Iterate the address with each interface
                for raw in interface["addresses"]:

                    # Extract the IP address from interface's IP (w/o netmask)
                    address = netaddr.IPNetwork(raw).ip

                    # Only record the IP address in current subnet
                    if address not in subnet:
                        continue

                    deviceData = deviceInDomain.setdefault(
                        device.name,
                        {"non-mgmt-counter": 0, "interfaces": dict()},
                    )
                    interfaceDict = deviceData["interfaces"].setdefault(
                        intfName, {"mgmtOnly": False}
                    )

                    # Use interface["mac_address"] as the default value, but if
                    # it is empty we are dealing with a virtual interface, so
                    # take the MAC of the interface named by its instance label.
                    if "mac_address" not in interfaceDict:
                        try:
                            interfaceDict["mac_address"] = (
                                interface["mac_address"]
                                or device.interfaces[
                                    interface["instance"].label
                                ]["mac_address"]
                            )
                        except KeyError:
                            logger.error(
                                "Problem with MAC address on interface %s",
                                interface,
                            )
                            # Record the failure explicitly so the interface
                            # is skipped later instead of raising KeyError.
                            interfaceDict["mac_address"] = None

                    interfaceDict.setdefault("ip_addresses", list()).append(
                        address
                    )

                    # If the interface is mgmtOnly, set the attribute to True,
                    # otherwise increase the non-mgmt-counter.
                    # NOTE(review): the counter grows once per address, not once
                    # per interface -- confirm this is the intended meaning.
                    if interface["mgmtOnly"]:
                        interfaceDict["mgmtOnly"] = True
                    else:
                        deviceData["non-mgmt-counter"] += 1

        return deviceInDomain

    @staticmethod
    def _buildHosts(deviceInDomain):
        """Turn the grouped interface data into an ``{ip: host entry}`` dict."""
        hosts = dict()

        for deviceName, data in deviceInDomain.items():

            nonMgmtCounter = data["non-mgmt-counter"]
            for intfName, interface in data["interfaces"].items():

                # If current interface doesn't have mac address set, skip
                macAddress = interface["mac_address"]
                if not macAddress:
                    continue

                # In the default situation, hostname is deviceName
                hostname_list = [deviceName]

                # If the condition is -
                #   1. the non-mgmt-counter of this device is above one
                #   2. the interface is a management-only interface
                # then add interface name into hostname
                if nonMgmtCounter > 1 or interface["mgmtOnly"]:
                    hostname_list.append(intfName)

                # If the interface has multiple IP addresses, add the last
                # word of the address to the hostname for identifiability
                addresses = interface["ip_addresses"]
                for address in addresses:
                    hostname = hostname_list.copy()
                    if len(addresses) > 1:
                        hostname.append(str(address.words[-1]))

                    hosts[str(address)] = {
                        "hostname": "-".join(hostname),
                        "macaddr": macAddress.lower(),
                    }

        return hosts