Add nftable configuration generate code
The generated configuration includes:
- Service port
- Whitelist subnet for private network
- SNAT rule related variables
- UE routing information (from config contexts on Netbox)
ref: INF-138
Change-Id: Ibd37e0dbbe5920c82d0fbf1246d7d41b924c0def
diff --git a/scripts/edgeconfig.py b/scripts/edgeconfig.py
index c8a31c2..fc49606 100644
--- a/scripts/edgeconfig.py
+++ b/scripts/edgeconfig.py
@@ -35,7 +35,7 @@
tenant = nbhelper.NBTenant()
# use base_config for additional items
- yaml_out = yaml.safe_load(args.base_config.read())
+ base_yaml = yaml.safe_load(args.base_config.read())
dhcpd_subnets = []
dhcpd_interfaces = []
@@ -54,14 +54,24 @@
dhcpd_subnets.append(dhcpd_subnet)
- # yaml_out["devices"] = nbhelper.NBDevice.all_devs()
- yaml_out["netprep_netplan"] = tenant.generate_netplan()
- yaml_out["dns_forward_zones"] = nbhelper.NBDNSForwardZone.all_fwd_zones()
- yaml_out["dns_reverse_zones"] = dns_reverse_zones
- yaml_out["dhcpd_subnets"] = dhcpd_subnets
- yaml_out["dhcpd_interfaces"] = dhcpd_interfaces
+ for device in tenant.get_devices():
+ output_yaml = base_yaml.copy()
- if tenant.generate_extra_config():
- yaml_out.update(tenant.generate_extra_config())
+ if (
+ isinstance(device, nbhelper.NBDevice)
+ and device.data.device_role.slug == "router"
+ ) or (
+ isinstance(device, nbhelper.NBVirtualMachine)
+ and device.data.role.slug == "router"
+ ):
+ output_yaml["dns_forward_zones"] = nbhelper.NBDNSForwardZone.all_fwd_zones()
+ output_yaml["dns_reverse_zones"] = dns_reverse_zones
+ output_yaml["dhcpd_subnets"] = dhcpd_subnets
+ output_yaml["dhcpd_interfaces"] = dhcpd_interfaces
+ output_yaml["netprep_nftables"] = device.generate_nftables()
+ output_yaml.update(device.generate_extra_config())
- print(yaml.safe_dump(yaml_out, indent=2))
+ output_yaml["netprep_netplan"] = device.generate_netplan()
+
+ with open("inventory/host_vars/%s.yaml" % device.name, "w") as f:
+ f.write(yaml.safe_dump(output_yaml, indent=2))
diff --git a/scripts/nbhelper.py b/scripts/nbhelper.py
index 2138889..ecdfc66 100644
--- a/scripts/nbhelper.py
+++ b/scripts/nbhelper.py
@@ -165,47 +165,45 @@
return self.prefixes
- def generate_netplan(self, name=""):
+ def get_device_by_name(self, name):
"""
- Get the interface config of specific server belongs to this tenant,
- If the name wasn't specified, return the management device config by default
+ Find the device or VM which belongs to this Tenant,
+ If the name wasn't specified, return the management server
"""
- target = None
+ for machine in self.devices + self.vms:
+ if name and machine.name == name:
+ return machine
+ elif machine.data["device_role"]["name"] == "Router":
+ return machine
- if not name:
- for machine in self.devices + self.vms:
- if machine.data["device_role"]["name"] == "Router":
- target = machine
- break
- else:
- for machine in self.devices + self.vms:
- if machine.name == name:
- target = machine
- break
+ ret_msg = (
+ "The name '%s' wasn't found in this tenant, "
+ + "or can't found any Router in this tenant"
+ )
- return target.generate_netplan()
+ logger.error(ret_msg, name)
+ sys.exit(1)
- def generate_extra_config(self, name=""):
+ def get_devices(self, device_types=["server", "router"]):
"""
- Get the extra config of specific server belongs to this tenant,
- If the name wasn't specified, return the management device config by default
+ Get all devices (Router + Server) belong to this Tenant
"""
- target = None
+ if not device_types:
+ return self.devices + self.vms
- if not name:
- for machine in self.devices + self.vms:
- if machine.data["device_role"]["name"] == "Router":
- target = machine
- break
- else:
- for machine in self.devices + self.vms:
- if machine.name == name:
- target = machine
- break
+ ret = []
- return target.generate_extra_config()
+ for machine in self.devices:
+ if machine.data.device_role.slug in device_types:
+ ret.append(machine)
+
+ for vm in self.vms:
+ if vm.data.role.slug in device_types:
+ ret.append(vm)
+
+ return ret
@yaml.yaml_object(ydump)
@@ -568,7 +566,9 @@
self.netplan_config["ethernets"] = dict()
- if self.data.device_role.name == "Router":
+ if (isinstance(self, NBDevice) and self.data.device_role.name == "Router") or (
+ isinstance(self, NBVirtualMachine) and self.data.role.name == "Router"
+ ):
for address, interface in self.interfaces_by_ip.items():
if interface.mgmt_only is True or str(interface.type) == "Virtual":
continue
@@ -578,7 +578,7 @@
"addresses", []
).append(address)
- elif self.data.device_role.name == "Server":
+ elif isinstance(self, NBDevice) and self.data.device_role.name == "Server":
if primary_if:
self.netplan_config["ethernets"][primary_if.name] = {
"dhcp4": "yes",
@@ -604,6 +604,10 @@
if "vlans" not in self.netplan_config:
self.netplan_config["vlans"] = dict()
+ if not virtual_if.tagged_vlans:
+ # If a virtual interface doesn't have tagged VLAN, skip
+ continue
+
# vlan_object_id is the "id" on netbox, it's different from known VLAN ID
vlan_object_id = virtual_if.tagged_vlans[0].id
vlan_object = netboxapi.ipam.vlans.get(vlan_object_id)
@@ -637,45 +641,105 @@
return self.netplan_config
+ def generate_nftables(self):
+
+ ret = dict()
+
+ primary_ip = self.data.primary_ip.address if self.data.primary_ip else None
+ external_if = self.interfaces_by_ip[primary_ip] if primary_ip else None
+ internal_if = None
+
+ if external_if is None:
+ logger.error("The primary interface wasn't set for device %s", self.name)
+ sys.exit(1)
+
+ for intf in filter(
+ lambda i: str(i.type) != "Virtual" and i.mgmt_only is False,
+ self.interfaces_by_ip.values(),
+ ):
+ if intf.id != external_if.id:
+ internal_if = intf
+ break
+
+ ret["external_if"] = external_if.name
+ ret["internal_if"] = internal_if.name
+
+ if self.services:
+ ret["services"] = list()
+
+ for service in self.services:
+ ret["services"].append(
+ {
+ "name": service.name,
+ "protocol": service.protocol.value,
+ "port": service.port,
+ }
+ )
+
+ # Only management server needs to be configured the whitelist netrange of internal interface
+ if self.data.device_role.name == "Router":
+ ret["allow_subnets"] = list()
+ ret["ue_routing"] = dict()
+ ret["ue_routing"]["ue_subnets"] = self.data.config_context["ue_subnets"]
+ for prefix in NBPrefix.all_prefixes().values():
+ if prefix.data.description:
+ ret["allow_subnets"].append(prefix.data.prefix)
+
+ if "fab" in prefix.data.description:
+ ret["ue_routing"].setdefault("src_subnets", [])
+ ret["ue_routing"]["src_subnets"].append(prefix.data.prefix)
+
+ if (
+ not ret["ue_routing"].get("snat_addr")
+ and "fab" in prefix.data.description
+ ):
+ for ip, device in prefix.aos.items():
+ if device.name == self.name:
+ ret["ue_routing"]["snat_addr"] = ip
+ break
+
+ return ret
+
def generate_extra_config(self):
+ """
+ Generate the extra configs which need in management server configuration
+ This function should only be called when the device role is "Router"
+ """
if self.extra_config:
return self.extra_config
primary_ip = self.data.primary_ip.address if self.data.primary_ip else None
- # If the object is mgmtserver, it needs to have DNS/NTP server configs
- if self.data["device_role"]["name"] == "Router":
- services = list(netboxapi.ipam.services.filter(device_id=self.id))
- service_names = list(map(lambda x: x.name, services))
+ service_names = list(map(lambda x: x.name, self.services))
- if "dns" in service_names:
- unbound_listen_ips = []
- unbound_allow_ips = []
+ if "dns" in service_names:
+ unbound_listen_ips = []
+ unbound_allow_ips = []
- for ip, intf in self.interfaces_by_ip.items():
- if ip != primary_ip and intf.mgmt_only == False:
- unbound_listen_ips.append(ip)
+ for ip, intf in self.interfaces_by_ip.items():
+ if ip != primary_ip and intf.mgmt_only == False:
+ unbound_listen_ips.append(ip)
- for prefix in NBPrefix.all_prefixes().values():
- if prefix.data.description:
- unbound_allow_ips.append(prefix.data.prefix)
+ for prefix in NBPrefix.all_prefixes().values():
+ if prefix.data.description:
+ unbound_allow_ips.append(prefix.data.prefix)
- if unbound_listen_ips:
- self.extra_config["unbound_listen_ips"] = unbound_listen_ips
+ if unbound_listen_ips:
+ self.extra_config["unbound_listen_ips"] = unbound_listen_ips
- if unbound_allow_ips:
- self.extra_config["unbound_allow_ips"] = unbound_allow_ips
+ if unbound_allow_ips:
+ self.extra_config["unbound_allow_ips"] = unbound_allow_ips
- if "ntp" in service_names:
- ntp_client_allow = []
+ if "ntp" in service_names:
+ ntp_client_allow = []
- for prefix in NBPrefix.all_prefixes().values():
- if prefix.data.description:
- ntp_client_allow.append(prefix.data.prefix)
+ for prefix in NBPrefix.all_prefixes().values():
+ if prefix.data.description:
+ ntp_client_allow.append(prefix.data.prefix)
- if ntp_client_allow:
- self.extra_config["ntp_client_allow"] = ntp_client_allow
+ if ntp_client_allow:
+ self.extra_config["ntp_client_allow"] = ntp_client_allow
return self.extra_config