| # SPDX-FileCopyrightText: 2020 The Magma Authors. |
| # SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org> |
| # |
| # SPDX-License-Identifier: BSD-3-Clause |
| |
| import base64 |
| |
| from cryptography import x509 |
| from cryptography.hazmat.backends import default_backend |
| from cryptography.hazmat.primitives import hashes, serialization |
| from cryptography.x509.oid import NameOID |
| from common.serialization_utils import write_to_file_atomically |
| |
| |
| def load_key_bytes(key_file): |
| """Load a private key encoded in PEM format |
| |
| Args: |
| key_file: path to the key file |
| |
| Returns: |
| Bytes |
| |
| Raises: |
| IOError: If file cannot be opened |
| ValueError: If the file content cannot be decoded successfully |
| TypeError: If the key_file is encrypted |
| """ |
| with open(key_file, 'rb') as f: |
| key_bytes = f.read() |
| |
| return key_bytes |
| |
| def load_key(key_file): |
| """Load a private key encoded in PEM format |
| |
| Args: |
| key_file: path to the key file |
| |
| Returns: |
| RSAPrivateKey or EllipticCurvePrivateKey depending on the contents of key_file |
| |
| Raises: |
| IOError: If file cannot be opened |
| ValueError: If the file content cannot be decoded successfully |
| TypeError: If the key_file is encrypted |
| """ |
| with open(key_file, 'rb') as f: |
| key_bytes = f.read() |
| return serialization.load_pem_private_key( |
| key_bytes, None, default_backend(), |
| ) |
| |
| |
| def write_key(key, key_file): |
| """Write key object to file in PEM format atomically |
| |
| Args: |
| key: RSAPrivateKey or EllipticCurvePrivateKey object |
| key_file: path to the key file |
| """ |
| key_pem = key.private_bytes( |
| serialization.Encoding.PEM, |
| serialization.PrivateFormat.TraditionalOpenSSL, |
| serialization.NoEncryption(), |
| ) |
| write_to_file_atomically(key_file, key_pem.decode("utf-8")) |
| |
| |
| def load_public_key_to_base64der(key_file): |
| """Load the public key of a private key and convert to base64 encoded DER |
| The return value can be used directly for device registration. |
| |
| Args: |
| key_file: path to the private key file, pem encoded |
| |
| Returns: |
| base64 encoded public key in DER format |
| |
| Raises: |
| IOError: If file cannot be opened |
| ValueError: If the file content cannot be decoded successfully |
| TypeError: If the key_file is encrypted |
| """ |
| |
| key = load_key(key_file) |
| pub_key = key.public_key() |
| pub_bytes = pub_key.public_bytes( |
| encoding=serialization.Encoding.DER, |
| format=serialization.PublicFormat.SubjectPublicKeyInfo, |
| ) |
| encoded = base64.b64encode(pub_bytes) |
| return encoded |
| |
| |
| def create_csr( |
| key, common_name, |
| country=None, state=None, city=None, org=None, |
| org_unit=None, email_address=None, |
| ): |
| """Create csr and sign it with key. |
| |
| Args: |
| key: RSAPrivateKey or EllipticCurvePrivateKey object |
| common_name: common name |
| country: country |
| state: state or province |
| city: city |
| org: organization |
| org_unit: organizational unit |
| email_address: email address |
| |
| Returns: |
| csr: x509.CertificateSigningRequest |
| """ |
| name_attrs = [x509.NameAttribute(NameOID.COMMON_NAME, common_name)] |
| if country: |
| name_attrs.append(x509.NameAttribute(NameOID.COUNTRY_NAME, country)) |
| if state: |
| name_attrs.append( |
| x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, state), |
| ) |
| if city: |
| name_attrs.append(x509.NameAttribute(NameOID.LOCALITY_NAME, city)) |
| if org: |
| name_attrs.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, org)) |
| if org_unit: |
| name_attrs.append( |
| x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, org_unit), |
| ) |
| if email_address: |
| name_attrs.append( |
| x509.NameAttribute(NameOID.EMAIL_ADDRESS, email_address), |
| ) |
| |
| csr = x509.CertificateSigningRequestBuilder().subject_name( |
| x509.Name(name_attrs), |
| ).sign(key, hashes.SHA256(), default_backend()) |
| |
| return csr |
| |
| def load_cert_bytes(cert_file): |
| """Load certificate from a file |
| |
| Args: |
| cert_file: path to file storing the cert in PEM format |
| |
| Returns: |
| cert: an instance of x509.Certificate |
| |
| Raises: |
| IOError: If file cannot be opened |
| ValueError: If the file content cannot be decoded successfully |
| """ |
| with open(cert_file, 'rb') as f: |
| cert_pem = f.read() |
| |
| return cert_pem |
| |
| def load_cert(cert_file): |
| """Load certificate from a file |
| |
| Args: |
| cert_file: path to file storing the cert in PEM format |
| |
| Returns: |
| cert: an instance of x509.Certificate |
| |
| Raises: |
| IOError: If file cannot be opened |
| ValueError: If the file content cannot be decoded successfully |
| """ |
| with open(cert_file, 'rb') as f: |
| cert_pem = f.read() |
| cert = x509.load_pem_x509_certificate(cert_pem, default_backend()) |
| return cert |
| |
| |
| def write_cert(cert_der, cert_file): |
| """Write DER encoded cert to file in PEM format |
| |
| Args: |
| cert_der: certificate encoded in DER format |
| cert_file: path to certificate |
| """ |
| cert = x509.load_der_x509_certificate(cert_der, default_backend()) |
| cert_pem = cert.public_bytes(serialization.Encoding.PEM) |
| write_to_file_atomically(cert_file, cert_pem.decode("utf-8")) |