| #!/usr/bin/env python3 |
| |
| # SPDX-FileCopyrightText: © 2021 Open Networking Foundation <support@opennetworking.org> |
| # SPDX-License-Identifier: Apache-2.0 |
| |
| # device.py |
| |
| import sys |
| import netaddr |
| |
| from .utils import logger |
| from .container import DeviceContainer, VirtualMachineContainer, PrefixContainer |
| |
| |
| class AssignedObject: |
| """ |
| Assigned Object is either a Device or Virtual Machine, which function |
| nearly identically in the NetBox data model. |
| |
| This parent class holds common functions for those two child classes |
| |
| An assignedObject (device or VM) should have following attributes: |
| - self.data: contains the original copy of data from NetBox |
| - self.id: Device ID or VM ID |
| - self.interfaces: A dictionary contains interfaces belong to this AO |
| the interface dictionary looks like: |
| |
| { |
| "eno1": { |
| "address": ["192.168.0.1/24", "192.168.0.2/24"], |
| "instance": <interface_instance>, |
| "isPrimary": True, |
| "mgmtOnly": False, |
| "isVirtual": False |
| } |
| } |
| """ |
| |
| objects = dict() |
| |
| def __init__(self, data): |
| from .utils import netboxapi |
| |
| self.data = data |
| self.nbapi = netboxapi |
| |
| # The AssignedObject attributes |
| self.id = self.data.id |
| self.tenant = None |
| self.primary_ip = None |
| self.primary_iface = None |
| |
| # In Netbox, we use FQDN as the Device name, but in the script, |
| # we use the first segment to be the name of device. |
| # For example, if the device named "mgmtserver1.stage1.menlo" on Netbox, |
| # then we will have "mgmtserver1" as name. |
| self.fullname = self.data.name |
| self.name = self.fullname.split(".")[0] |
| |
| # The device role which can be ["server", "router", "switch", ...] |
| self.role = None |
| |
| # The NetBox objects related with this AssignedObject |
| self.interfaces = dict() |
| self.services = None |
| |
| # Generated configuration for ansible playbooks |
| self.netplan_config = dict() |
| self.extra_config = dict() |
| |
| if self.__class__ == Device: |
| self.role = self.data.device_role.slug |
| self.services = self.nbapi.ipam.services.filter(device_id=self.id) |
| interfaces = self.nbapi.dcim.interfaces.filter(device_id=self.id) |
| ip_addresses = self.nbapi.ipam.ip_addresses.filter(device_id=self.id) |
| elif self.__class__ == VirtualMachine: |
| self.role = self.data.role.slug |
| self.services = self.nbapi.ipam.services.filter(virtual_machine_id=self.id) |
| interfaces = self.nbapi.virtualization.interfaces.filter( |
| virtual_machine_id=self.id |
| ) |
| ip_addresses = self.nbapi.ipam.ip_addresses.filter( |
| virtual_machine_id=self.id |
| ) |
| |
| self.primary_ip = self.data.primary_ip |
| |
| for interface in interfaces: |
| # The Device's interface structure is different from VM's interface |
| # VM interface doesn't have mgmt_only and type, Therefore, |
| # the default value of mgmtOnly is False, isVirtual is True |
| |
| self.interfaces[interface.name] = { |
| "addresses": list(), |
| "mac_address": interface.mac_address, |
| "instance": interface, |
| "isPrimary": False, |
| "mgmtOnly": getattr(interface, "mgmt_only", False), |
| "isVirtual": interface.type.value == "virtual" |
| if hasattr(interface, "type") |
| else True, |
| } |
| |
| for address in ip_addresses: |
| interface = self.interfaces[address.assigned_object.name] |
| interface["addresses"].append(address.address) |
| |
| # ipam.ip_addresses doesn't have primary tag, |
| # the primary tag is only available is only in the Device. |
| # So we need to compare address to check which one is primary ip |
| try: |
| if address.address == self.primary_ip.address: |
| interface["isPrimary"] = True |
| self.primary_iface = interface |
| except AttributeError: |
| logger.error("Error with primary address for device %s", self.fullname) |
| |
| # mgmt_only = False is a hack for VirtualMachine type |
| if self.__class__ == VirtualMachine: |
| interface["instance"].mgmt_only = False |
| |
| def __repr__(self): |
| return str(dict(self.data)) |
| |
| @property |
| def type(self): |
| return "AssignedObject" |
| |
| @property |
| def internal_interfaces(self): |
| """ |
| The function internal_interfaces |
| """ |
| |
| ret = dict() |
| for intfName, interface in self.interfaces.items(): |
| if ( |
| not interface["isPrimary"] |
| and not interface["mgmtOnly"] |
| and interface["addresses"] |
| ): |
| ret[intfName] = interface |
| |
| return ret |
| |
| def generate_netplan(self): |
| """ |
| Get the interface config of specific server belongs to this tenant |
| """ |
| |
| if self.netplan_config: |
| return self.netplan_config |
| |
| primary_if = None |
| for interface in self.interfaces.values(): |
| if interface["isPrimary"] is True: |
| primary_if = interface["instance"] |
| |
| if primary_if is None: |
| logger.error("The primary interface wasn't set for device %s", self.name) |
| return dict() |
| |
| # Initialize the part of "ethernets" configuration |
| self.netplan_config["ethernets"] = dict() |
| |
| # If the current selected device is a Router |
| if (isinstance(self, Device) and self.data.device_role.name == "Router") or ( |
| isinstance(self, VirtualMachine) and self.data.role.name == "Router" |
| ): |
| for intfName, interface in self.interfaces.items(): |
| if interface["mgmtOnly"] or interface["isVirtual"]: |
| continue |
| |
| # Check if this address is public IP address (e.g. "8.8.8.8" on eth0) |
| isExternalAddress = True |
| for prefix in PrefixContainer().all(): |
| for address in interface["addresses"]: |
| if address in netaddr.IPSet([prefix.subnet]): |
| isExternalAddress = False |
| |
| # If this interface has the public IP address, netplan shouldn't include it |
| if isExternalAddress: |
| continue |
| |
| self.netplan_config["ethernets"].setdefault(intfName, {}) |
| self.netplan_config["ethernets"][intfName].setdefault( |
| "addresses", [] |
| ).extend(interface["addresses"]) |
| |
| # If the current selected device is a Server |
| elif isinstance(self, Device) and self.data.device_role.name == "Server": |
| if primary_if: |
| self.netplan_config["ethernets"][primary_if.name] = { |
| "dhcp4": "yes", |
| "dhcp4-overrides": {"route-metric": 100}, |
| } |
| |
| for intfName, interface in self.interfaces.items(): |
| if ( |
| not interface["isVirtual"] |
| and intfName != primary_if.name |
| and not interface["mgmtOnly"] |
| and interface["addresses"] |
| ): |
| self.netplan_config["ethernets"][intfName] = { |
| "dhcp4": "yes", |
| "dhcp4-overrides": {"route-metric": 200}, |
| } |
| |
| else: |
| # Exclude the device type which is not Router and Server |
| return None |
| |
| # Get interfaces own by AssignedObject and is virtual (VLAN interface) |
| for intfName, interface in self.interfaces.items(): |
| |
| # If the interface is not a virtual interface or |
| # the interface doesn't have VLAN tagged, skip this interface |
| if not interface["isVirtual"] or not interface["instance"].tagged_vlans: |
| continue |
| |
| if "vlans" not in self.netplan_config: |
| self.netplan_config["vlans"] = dict() |
| |
| vlan_object_id = interface["instance"].tagged_vlans[0].id |
| vlan_object = self.nbapi.ipam.vlans.get(vlan_object_id) |
| |
| routes = list() |
| for address in interface["addresses"]: |
| |
| for reserved_ip in PrefixContainer().all_reserved_ips(address): |
| |
| destination = reserved_ip["custom_fields"].get("rfc3442routes", "") |
| if not destination: |
| continue |
| |
| for dest_addr in destination.split(","): |
| |
| # If interface address is in destination subnet, we don't need this route |
| if netaddr.IPNetwork(address).ip in netaddr.IPNetwork( |
| dest_addr |
| ): |
| continue |
| |
| new_route = { |
| "to": dest_addr, |
| "via": str(netaddr.IPNetwork(reserved_ip["ip4"]).ip), |
| "metric": 100, |
| } |
| |
| if new_route not in routes: |
| routes.append(new_route) |
| |
| self.netplan_config["vlans"][intfName] = { |
| "id": vlan_object.vid, |
| "link": interface["instance"].label, |
| "addresses": interface["addresses"], |
| } |
| |
| # Only the fabric virtual interface will need to route to other network segments |
| if routes and "fab" in intfName: |
| self.netplan_config["vlans"][intfName]["routes"] = routes |
| |
| return self.netplan_config |
| |
| def generate_nftables(self): |
| |
| ret = dict() |
| |
| internal_if = None |
| external_if = None |
| |
| # Use isPrimary == True as the identifier to select external interface |
| for interface in self.interfaces.values(): |
| if interface["isPrimary"] is True: |
| external_if = interface["instance"] |
| |
| if external_if is None: |
| logger.error("The primary interface wasn't set for device %s", self.name) |
| sys.exit(1) |
| |
| for interface in self.interfaces.values(): |
| # If "isVirtual" set to False and "mgmtOnly" set to False |
| if ( |
| not interface["isVirtual"] |
| and not interface["mgmtOnly"] |
| and interface["instance"] is not external_if |
| ): |
| internal_if = interface["instance"] |
| break |
| |
| ret["external_if"] = external_if.name |
| ret["internal_if"] = internal_if.name |
| |
| if self.services: |
| ret["services"] = list() |
| |
| for service in self.services: |
| ret["services"].append( |
| { |
| "name": service.name, |
| "protocol": service.protocol.value, |
| "port": service.port, |
| } |
| ) |
| |
| # Only management server needs to be configured the whitelist netrange of |
| # internal interface, this code will config the nftables parameters |
| # the nftables will do the SNAT for both UE ranges and Aether Central ranges |
| if self.data.device_role.name == "Router": |
| |
| ret["interface_subnets"] = dict() |
| |
| ret["acc_routing"] = dict() |
| ret["acc_routing"]["acc_subnets"] = self.data.config_context.pop( |
| "acc_subnets" |
| ) |
| |
| ret["ue_routing"] = dict() |
| ret["ue_routing"]["ue_subnets"] = self.data.config_context.pop("ue_subnets") |
| |
| # Create the interface_subnets in the configuration |
| # It's using the interface as the key to list IP addresses |
| for intfName, interface in self.interfaces.items(): |
| if interface["mgmtOnly"]: |
| continue |
| |
| for address in interface["addresses"]: |
| for prefix in PrefixContainer().all(): |
| intfAddr = netaddr.IPNetwork(address).ip |
| |
| # If interface IP doesn't belong to this prefix, skip |
| if intfAddr not in netaddr.IPNetwork(prefix.subnet): |
| continue |
| |
| # If prefix is a parent prefix (parent prefix won't config domain name) |
| # skip to add in interface_subnets |
| if not prefix.data.description: |
| continue |
| |
| ret["interface_subnets"].setdefault(intfName, list()) |
| |
| if prefix.subnet not in ret["interface_subnets"][intfName]: |
| ret["interface_subnets"][intfName].append(prefix.subnet) |
| for neighbor in prefix.neighbor: |
| if ( |
| neighbor.subnet |
| not in ret["interface_subnets"][intfName] |
| ): |
| ret["interface_subnets"][intfName].append( |
| neighbor.subnet |
| ) |
| |
| # Build data which needs by nftables, the UE subnets and ACC subnets |
| for prefix in PrefixContainer().all(): |
| |
| # The subnet in this site which needs the redirecting |
| if "fab" in prefix.data.description: |
| ret["ue_routing"].setdefault("src_subnets", []) |
| ret["ue_routing"]["src_subnets"].append(prefix.data.prefix) |
| ret["acc_routing"].setdefault("src_subnets", []) |
| ret["acc_routing"]["src_subnets"].append(prefix.data.prefix) |
| |
| # mgmtserver do the SNAT for fabric network on FAB interface |
| if ( |
| not ret["ue_routing"].get("snat_addr") |
| and "fab" in prefix.data.description |
| ): |
| for interface in self.interfaces.values(): |
| for address in interface["addresses"]: |
| if address in netaddr.IPSet([prefix.subnet]): |
| ret["ue_routing"]["snat_addr"] = str( |
| netaddr.IPNetwork(address).ip |
| ) |
| break |
| |
| # mgmtserver do the SNAT for mgmt network on mgmt interface |
| if ( |
| not ret["acc_routing"].get("snat_addr") |
| and "mgmt" in prefix.data.description |
| ): |
| for interface in self.interfaces.values(): |
| for address in interface["addresses"]: |
| if address in netaddr.IPSet([prefix.subnet]): |
| ret["acc_routing"]["snat_addr"] = str( |
| netaddr.IPNetwork(address).ip |
| ) |
| break |
| |
| return ret |
| |
| def generate_extra_config(self): |
| """ |
| Generate the extra configs which need in management server configuration |
| This function should only be called when the device role is "Router" |
| |
| Extra config includes: service configuring parameters, additional config context |
| """ |
| |
| if self.extra_config: |
| return self.extra_config |
| |
| service_names = list(map(lambda x: x.name, self.services)) |
| |
| if "dns" in service_names: |
| unbound_listen_ips = [] |
| unbound_allow_ips = [] |
| |
| for interface in self.interfaces.values(): |
| if not interface["isPrimary"] and not interface["mgmtOnly"]: |
| for address in interface["addresses"]: |
| unbound_listen_ips.append(address) |
| |
| for prefix in PrefixContainer().all(): |
| if prefix.data.description: |
| unbound_allow_ips.append(prefix.data.prefix) |
| |
| if unbound_listen_ips: |
| self.extra_config["unbound_listen_ips"] = unbound_listen_ips |
| |
| if unbound_allow_ips: |
| self.extra_config["unbound_allow_ips"] = unbound_allow_ips |
| |
| if "ntp" in service_names: |
| ntp_client_allow = [] |
| |
| for prefix in PrefixContainer().all(): |
| if prefix.data.description: |
| ntp_client_allow.append(prefix.data.prefix) |
| |
| if ntp_client_allow: |
| self.extra_config["ntp_client_allow"] = ntp_client_allow |
| |
| # If the key exists in generated config, warning with the key name |
| for key in self.data.config_context.keys(): |
| if key in self.extra_config: |
| logger.warning("Extra config Key %s was overwritten", key) |
| |
| self.extra_config.update(self.data.config_context) |
| |
| return self.extra_config |
| |
| |
| class Device(AssignedObject): |
| """ |
| Wraps a single Netbox device |
| Also caches all known devices in a class variable (devs) |
| """ |
| |
| def __init__(self, data): |
| |
| super().__init__(data) |
| DeviceContainer().add(self.id, self) |
| |
| @property |
| def type(self): |
| return "Device" |
| |
| def get_interfaces(self): |
| if not self.interfaces: |
| self.interfaces = self.nbapi.dcim.interfaces.filter(device_id=self.id) |
| |
| return self.interfaces |
| |
| |
| class VirtualMachine(AssignedObject): |
| """ |
| VM equivalent of Device |
| """ |
| |
| def __init__(self, data): |
| |
| super().__init__(data) |
| VirtualMachineContainer().add(self.id, self) |
| |
| @property |
| def type(self): |
| return "VirtualMachine" |
| |
| def get_interfaces(self): |
| if not self.interfaces: |
| self.interfaces = self.nbapi.virtualization.interfaces.filter( |
| virtual_machine_id=self.id |
| ) |
| |
| return self.interfaces |