blob: 6563ff9999fc479b188dc60f21553bd7563b4743 [file] [log] [blame]
Wei-Yu Chen49950b92021-11-08 19:19:18 +08001"""
2Copyright 2020 The Magma Authors.
3
4This source code is licensed under the BSD-style license found in the
5LICENSE file in the root directory of this source tree.
6
7Unless required by applicable law or agreed to in writing, software
8distributed under the License is distributed on an "AS IS" BASIS,
9WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10See the License for the specific language governing permissions and
11limitations under the License.
12"""
13
14import base64
15import datetime
16import os
17from tempfile import TemporaryDirectory
18from unittest import TestCase
19
20import magma.common.cert_utils as cu
21from cryptography import x509
22from cryptography.hazmat.backends import default_backend
23from cryptography.hazmat.primitives import hashes, serialization
24from cryptography.hazmat.primitives.asymmetric import ec
25
26
27class CertUtilsTest(TestCase):
28 def test_key(self):
29 with TemporaryDirectory(prefix='/tmp/test_cert_utils') as temp_dir:
30 key = ec.generate_private_key(ec.SECP384R1(), default_backend())
31 cu.write_key(key, os.path.join(temp_dir, 'test.key'))
32 key_load = cu.load_key(os.path.join(temp_dir, 'test.key'))
33
34 key_bytes = key.private_bytes(
35 serialization.Encoding.PEM,
36 serialization.PrivateFormat.TraditionalOpenSSL,
37 serialization.NoEncryption(),
38 )
39 key_load_bytes = key_load.private_bytes(
40 serialization.Encoding.PEM,
41 serialization.PrivateFormat.TraditionalOpenSSL,
42 serialization.NoEncryption(),
43 )
44 self.assertEqual(key_bytes, key_load_bytes)
45
46 def load_public_key_to_base64der(self):
47 with TemporaryDirectory(prefix='/tmp/test_cert_utils') as temp_dir:
48 key = ec.generate_private_key(ec.SECP384R1(), default_backend())
49 cu.write_key(key, os.path.join(temp_dir, 'test.key'))
50 base64der = cu.load_public_key_to_base64der(
51 os.path.join(temp_dir, 'test.key'),
52 )
53 der = base64.b64decode(base64der)
54 pub_key = serialization.load_der_public_key(der, default_backend())
55 self.assertEqual(pub_key, key.public_key())
56
57 def test_csr(self):
58 key = ec.generate_private_key(ec.SECP384R1(), default_backend())
59 csr = cu.create_csr(
60 key, 'i am dummy test',
61 'US', 'CA', 'MPK', 'FB', 'magma', 'magma@fb.com',
62 )
63 self.assertTrue(csr.is_signature_valid)
64 public_key_bytes = key.public_key().public_bytes(
65 serialization.Encoding.OpenSSH,
66 serialization.PublicFormat.OpenSSH,
67 )
68 csr_public_key_bytes = csr.public_key().public_bytes(
69 serialization.Encoding.OpenSSH,
70 serialization.PublicFormat.OpenSSH,
71 )
72 self.assertEqual(public_key_bytes, csr_public_key_bytes)
73
74 def test_cert(self):
75 with TemporaryDirectory(prefix='/tmp/test_cert_utils') as temp_dir:
76 cert = _create_dummy_cert()
77 cert_file = os.path.join(temp_dir, 'test.cert')
78 cu.write_cert(
79 cert.public_bytes(
80 serialization.Encoding.DER,
81 ), cert_file,
82 )
83 cert_load = cu.load_cert(cert_file)
84 self.assertEqual(cert, cert_load)
85
86
87def _create_dummy_cert():
88 key = ec.generate_private_key(ec.SECP384R1(), default_backend())
89 subject = issuer = x509.Name([
90 x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, u"US"),
91 x509.NameAttribute(x509.oid.NameOID.STATE_OR_PROVINCE_NAME, u"CA"),
92 x509.NameAttribute(x509.oid.NameOID.LOCALITY_NAME, u"San Francisco"),
93 x509.NameAttribute(x509.oid.NameOID.ORGANIZATION_NAME, u"My Company"),
94 x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, u"mysite.com"),
95 ])
96 cert = x509.CertificateBuilder().subject_name(
97 subject,
98 ).issuer_name(
99 issuer,
100 ).public_key(
101 key.public_key(),
102 ).serial_number(
103 x509.random_serial_number(),
104 ).not_valid_before(
105 datetime.datetime.utcnow(),
106 ).not_valid_after(
107 datetime.datetime.utcnow() + datetime.timedelta(days=10),
108 ).sign(key, hashes.SHA256(), default_backend())
109 return cert