| from core.admin import ReadOnlyAwareAdmin, SliceInline, TenantPrivilegeInline |
| from core.middleware import get_request |
| from core.models import TenantPrivilege, User |
| from django import forms |
| from django.contrib import admin |
| from django.core import serializers |
| from services.vpn.models import VPN_KIND, VPNService, VPNTenant |
| from subprocess import Popen, PIPE |
| from xos.exceptions import XOSValidationError |
| |
| |
| class VPNServiceForm(forms.ModelForm): |
| |
| exposed_ports = forms.CharField(required=True) |
| |
| def save(self, commit=True): |
| self.instance.exposed_ports = self.cleaned_data['exposed_ports'] |
| return super(VPNServiceForm, self).save(commit=commit) |
| |
| def clean_exposed_ports(self): |
| exposed_ports = self.cleaned_data['exposed_ports'] |
| port_mapping = {"udp": [], "tcp": []} |
| parts = exposed_ports.split(",") |
| for part in parts: |
| part = part.strip() |
| if "/" in part: |
| (protocol, ports) = part.split("/", 1) |
| elif " " in part: |
| (protocol, ports) = part.split(None, 1) |
| else: |
| raise XOSValidationError('malformed port specifier %s, format example: "tcp 123, tcp 201:206, udp 333"' % part) |
| |
| protocol = protocol.strip() |
| ports = ports.strip() |
| |
| if not (protocol in ["udp", "tcp"]): |
| raise XOSValidationError('unknown protocol %s' % protocol) |
| |
| if "-" in ports: |
| port_mapping[protocol].extend(self.parse_port_range(ports, "-")) |
| elif ":" in ports: |
| port_mapping[protocol].extend(self.parse_port_range(ports, ":")) |
| else: |
| port_mapping[protocol].append(int(ports)) |
| |
| return port_mapping |
| |
| def parse_port_range(self, port_str, split_str): |
| (first, last) = port_str.split(split_str) |
| first = int(first.strip()) |
| last = int(last.strip()) |
| return list(range(first, last)) |
| |
| class Meta: |
| model = VPNService |
| |
| |
| class VPNServiceAdmin(ReadOnlyAwareAdmin): |
| """Defines the admin for the VPNService.""" |
| model = VPNService |
| form = VPNServiceForm |
| verbose_name = "VPN Service" |
| |
| list_display = ("backend_status_icon", "name", "enabled") |
| |
| list_display_links = ('backend_status_icon', 'name', ) |
| |
| fieldsets = [(None, {'fields': ['backend_status_text', 'name', 'enabled', |
| 'versionNumber', 'description', "view_url", 'exposed_ports'], |
| 'classes':['suit-tab suit-tab-general']})] |
| |
| readonly_fields = ('backend_status_text', ) |
| |
| inlines = [SliceInline] |
| |
| extracontext_registered_admins = True |
| |
| user_readonly_fields = ["name", "enabled", "versionNumber", "description"] |
| |
| suit_form_tabs = (('general', 'VPN Service Details'), |
| ('administration', 'Tenants'), |
| ('slices', 'Slices'),) |
| |
| suit_form_includes = (('vpnserviceadmin.html', |
| 'top', |
| 'administration'),) |
| |
| def queryset(self, request): |
| return VPNService.get_service_objects_by_user(request.user) |
| |
| |
| class VPNTenantForm(forms.ModelForm): |
| """The form used to create and edit a VPNTenant. |
| |
| Attributes: |
| creator (forms.ModelChoiceField): The XOS user that created this tenant. |
| client_conf (forms.CharField): The readonly configuration used on the client to connect to this Tenant. |
| server_address (forms.GenericIPAddressField): The ip address on the VPN of this Tenant. |
| client_address (forms.GenericIPAddressField): The ip address on the VPN of the client. |
| is_persistent (forms.BooleanField): Determines if this Tenant keeps this connection alive through failures. |
| """ |
| creator = forms.ModelChoiceField(queryset=User.objects.all()) |
| server_network = forms.GenericIPAddressField( |
| protocol="IPv4", required=True) |
| vpn_subnet = forms.GenericIPAddressField(protocol="IPv4", required=True) |
| is_persistent = forms.BooleanField(required=False) |
| clients_can_see_each_other = forms.BooleanField(required=False) |
| failover_servers = forms.ModelMultipleChoiceField(queryset=VPNTenant.get_tenant_objects(), required=False) |
| protocol = forms.ChoiceField(required=True, choices=[("udp", "udp"), ("tcp", "tcp")]) |
| |
| def __init__(self, *args, **kwargs): |
| super(VPNTenantForm, self).__init__(*args, **kwargs) |
| self.fields['kind'].widget.attrs['readonly'] = True |
| # self.fields['script_name'].widget.attrs['readonly'] = True |
| self.fields[ |
| 'provider_service'].queryset = VPNService.get_service_objects().all() |
| |
| self.fields['kind'].initial = VPN_KIND |
| |
| if self.instance: |
| self.fields['creator'].initial = self.instance.creator |
| self.fields['vpn_subnet'].initial = self.instance.vpn_subnet |
| self.fields[ |
| 'server_network'].initial = self.instance.server_network |
| self.fields[ |
| 'clients_can_see_each_other'].initial = self.instance.clients_can_see_each_other |
| self.fields['is_persistent'].initial = self.instance.is_persistent |
| self.fields['protocol'].initial = self.instance.protocol |
| if (self.instance.failover_servers): |
| self.initial['failover_servers'] = [model.pk for model in list(serializers.deserialize('json', self.instance.failover_servers))] |
| |
| if (not self.instance) or (not self.instance.pk): |
| self.fields['creator'].initial = get_request().user |
| self.fields['vpn_subnet'].initial = "255.255.255.0" |
| self.fields['server_network'].initial = "10.66.77.0" |
| self.fields['clients_can_see_each_other'].initial = True |
| self.fields['is_persistent'].initial = True |
| if VPNService.get_service_objects().exists(): |
| self.fields["provider_service"].initial = VPNService.get_service_objects().all()[ |
| 0] |
| |
| def save(self, commit=True): |
| self.instance.creator = self.cleaned_data.get("creator") |
| self.instance.is_persistent = self.cleaned_data.get('is_persistent') |
| self.instance.vpn_subnet = self.cleaned_data.get("vpn_subnet") |
| self.instance.server_network = self.cleaned_data.get('server_network') |
| self.instance.clients_can_see_each_other = self.cleaned_data.get( |
| 'clients_can_see_each_other') |
| self.instance.failover_servers = serializers.serialize("json", self.cleaned_data.get('failover_servers')) |
| |
| self.instance.port_number = self.instance.provider_service.get_next_available_port(self.instance.protocol) |
| self.instance.protocol = self.cleaned_data.get("protocol") |
| |
| if (not self.instance.ca_crt): |
| self.instance.ca_crt = self.generate_ca_crt() |
| |
| return super(VPNTenantForm, self).save(commit=commit) |
| |
| def generate_ca_crt(self): |
| """str: Generates the ca cert by reading from the ca file""" |
| with open("/opt/openvpn/easyrsa3/pki/ca.crt") as crt: |
| return crt.readlines() |
| |
| class Meta: |
| model = VPNTenant |
| |
| |
| class VPNTenantAdmin(ReadOnlyAwareAdmin): |
| verbose_name = "VPN Tenant Admin" |
| list_display = ('id', 'backend_status_icon', 'instance', |
| 'server_network', 'vpn_subnet') |
| list_display_links = ('id', 'backend_status_icon', |
| 'instance', 'server_network', 'vpn_subnet') |
| fieldsets = [(None, {'fields': ['backend_status_text', 'kind', |
| 'provider_service', 'instance', 'creator', |
| 'server_network', 'vpn_subnet', 'is_persistent', |
| 'clients_can_see_each_other', 'failover_servers', "protocol"], |
| 'classes': ['suit-tab suit-tab-general']})] |
| readonly_fields = ('backend_status_text', 'instance') |
| form = VPNTenantForm |
| inlines = [TenantPrivilegeInline] |
| |
| suit_form_tabs = (('general', 'Details'), ('tenantprivileges', 'Privileges')) |
| |
| def queryset(self, request): |
| return VPNTenant.get_tenant_objects_by_user(request.user) |
| |
| def certificate_name(self, tenant_privilege): |
| return str(tenant_privilege.user.email) + "-" + str(tenant_privilege.tenant.id) |
| |
| def save_formset(self, request, form, formset, change): |
| super(VPNTenantAdmin, self).save_formset(request, form, formset, change) |
| for obj in formset.deleted_objects: |
| # If anything deleated was a TenantPrivilege then revoke the certificate |
| if type(obj) is TenantPrivilege: |
| certificate = self.certificate_name(obj) |
| Popen("/opt/openvpn/easyrsa3/easyrsa --batch revoke " + certificate, shell=True, stdout=PIPE).communicate() |
| # TODO(jermowery): determine if this is necessary. |
| # if type(obj) is VPNTenant: |
| # if the tenant was deleted revoke all certs assoicated |
| # pass |
| |
| for obj in formset.new_objects: |
| # If there were any new TenantPrivlege objects then create certs |
| if type(obj) is TenantPrivilege: |
| certificate = self.certificate_name(obj) |
| Popen("/opt/openvpn/easyrsa3/easyrsa --batch build-client-full " + certificate + " nopass", shell=True, stdout=PIPE).communicate() |
| |
| |
| # Associate the admin forms with the models. |
| admin.site.register(VPNService, VPNServiceAdmin) |
| admin.site.register(VPNTenant, VPNTenantAdmin) |