blob: e19063f2e3e0f8f52453d530ed8667e11ea4183a [file] [log] [blame]
Wei-Yu Chen49950b92021-11-08 19:19:18 +08001"""
2Copyright 2020 The Magma Authors.
3
4This source code is licensed under the BSD-style license found in the
5LICENSE file in the root directory of this source tree.
6
7Unless required by applicable law or agreed to in writing, software
8distributed under the License is distributed on an "AS IS" BASIS,
9WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10See the License for the specific language governing permissions and
11limitations under the License.
12"""
13
14import base64
15
16from cryptography import x509
17from cryptography.hazmat.backends import default_backend
18from cryptography.hazmat.primitives import hashes, serialization
19from cryptography.x509.oid import NameOID
20from common.serialization_utils import write_to_file_atomically
21
22
def load_key_bytes(key_file):
    """Read the raw contents of a private key file without parsing them.

    The file is opened in binary mode and its bytes are returned verbatim.
    No PEM decoding, validation or decryption happens here, so malformed or
    encrypted key material is returned as-is rather than rejected.

    Args:
        key_file: path to the key file

    Returns:
        bytes: the unmodified contents of key_file

    Raises:
        IOError: If file cannot be opened
    """
    with open(key_file, 'rb') as f:
        return f.read()
41
def load_key(key_file):
    """Parse an unencrypted PEM private key stored on disk.

    Args:
        key_file: path to the PEM encoded key file

    Returns:
        RSAPrivateKey or EllipticCurvePrivateKey, depending on what
        key_file contains

    Raises:
        IOError: If file cannot be opened
        ValueError: If the file content cannot be decoded successfully
        TypeError: If the key_file is encrypted
    """
    with open(key_file, 'rb') as pem_stream:
        pem_data = pem_stream.read()

    # No password is supplied, so only unencrypted keys can be loaded.
    private_key = serialization.load_pem_private_key(
        pem_data, password=None, backend=default_backend(),
    )
    return private_key
61
62
def write_key(key, key_file):
    """Serialize a private key to PEM and store it at key_file.

    The key is written unencrypted in the traditional OpenSSL private key
    format; the actual write is delegated to write_to_file_atomically.

    Args:
        key: RSAPrivateKey or EllipticCurvePrivateKey object
        key_file: path to the key file
    """
    pem_encoding = serialization.Encoding.PEM
    legacy_format = serialization.PrivateFormat.TraditionalOpenSSL
    pem_text = key.private_bytes(
        pem_encoding, legacy_format, serialization.NoEncryption(),
    ).decode("utf-8")
    write_to_file_atomically(key_file, pem_text)
76
77
def load_public_key_to_base64der(key_file):
    """Derive the public key of a private key file as base64 encoded DER.

    The private key is loaded with load_key, its public half is serialized
    as a DER SubjectPublicKeyInfo structure, and that is base64 encoded.
    The return value can be used directly for device registration.

    Args:
        key_file: path to the private key file, pem encoded

    Returns:
        base64 encoded public key in DER format (as bytes)

    Raises:
        IOError: If file cannot be opened
        ValueError: If the file content cannot be decoded successfully
        TypeError: If the key_file is encrypted
    """
    der_bytes = load_key(key_file).public_key().public_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    return base64.b64encode(der_bytes)
102
103
def create_csr(
    key, common_name,
    country=None, state=None, city=None, org=None,
    org_unit=None, email_address=None,
):
    """Build a certificate signing request and sign it with key.

    The subject always carries the common name. Each optional field is
    added only when a truthy value is supplied, in the order listed below.
    The request is signed using SHA256.

    Args:
        key: RSAPrivateKey or EllipticCurvePrivateKey object
        common_name: common name
        country: country
        state: state or province
        city: city
        org: organization
        org_unit: organizational unit
        email_address: email address

    Returns:
        csr: x509.CertificateSigningRequest
    """
    subject_attrs = [x509.NameAttribute(NameOID.COMMON_NAME, common_name)]

    # Ordered (oid, value) pairs; order determines the subject layout.
    optional_fields = (
        (NameOID.COUNTRY_NAME, country),
        (NameOID.STATE_OR_PROVINCE_NAME, state),
        (NameOID.LOCALITY_NAME, city),
        (NameOID.ORGANIZATION_NAME, org),
        (NameOID.ORGANIZATIONAL_UNIT_NAME, org_unit),
        (NameOID.EMAIL_ADDRESS, email_address),
    )
    subject_attrs.extend(
        x509.NameAttribute(oid, value)
        for oid, value in optional_fields
        if value
    )

    builder = x509.CertificateSigningRequestBuilder()
    builder = builder.subject_name(x509.Name(subject_attrs))
    return builder.sign(key, hashes.SHA256(), default_backend())
149
def load_cert_bytes(cert_file):
    """Read the raw contents of a certificate file without parsing them.

    The file is opened in binary mode and its bytes are returned verbatim.
    The content is not decoded into an x509.Certificate or validated, so
    malformed data is returned as-is rather than rejected.

    Args:
        cert_file: path to file storing the cert in PEM format

    Returns:
        bytes: the unmodified contents of cert_file

    Raises:
        IOError: If file cannot be opened
    """
    with open(cert_file, 'rb') as f:
        return f.read()
Wei-Yu Chen49950b92021-11-08 19:19:18 +0800167
def load_cert(cert_file):
    """Parse a PEM encoded certificate stored on disk.

    Args:
        cert_file: path to file storing the cert in PEM format

    Returns:
        cert: an instance of x509.Certificate

    Raises:
        IOError: If file cannot be opened
        ValueError: If the file content cannot be decoded successfully
    """
    with open(cert_file, 'rb') as pem_stream:
        pem_data = pem_stream.read()
    return x509.load_pem_x509_certificate(pem_data, default_backend())
185
186
def write_cert(cert_der, cert_file):
    """Convert a DER encoded certificate to PEM and store it at cert_file.

    The DER input is parsed into a certificate object first, then
    re-serialized as PEM text and handed to write_to_file_atomically.

    Args:
        cert_der: certificate encoded in DER format
        cert_file: path to certificate
    """
    parsed = x509.load_der_x509_certificate(cert_der, default_backend())
    pem_text = parsed.public_bytes(serialization.Encoding.PEM).decode("utf-8")
    write_to_file_atomically(cert_file, pem_text)
196 write_to_file_atomically(cert_file, cert_pem.decode("utf-8"))