blob: 1d18569f78b1c6eda3416e97eff1289ac071fa2d [file] [log] [blame]
# SPDX-FileCopyrightText: 2020 The Magma Authors.
# SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org>
#
# SPDX-License-Identifier: BSD-3-Clause

6import asyncio
7import errno
8import ssl
9from unittest import TestCase
10from unittest.mock import MagicMock, patch
11
12import magma.common.cert_validity as cv
13
14
# Adapted from https://stackoverflow.com/questions/32480108/mocking-async-call-in-python-3-5
def AsyncMock():
    """Build a mock that behaves like a coroutine function.

    Calling the returned mock records the call and hands back a coroutine.
    Awaiting that coroutine invokes the inner ``coro`` mock with the same
    arguments and yields whatever ``coro`` returns (or raises whatever
    ``coro.side_effect`` raises).

    Returns:
        MagicMock: the coroutine-function mock. Its ``coro`` attribute is the
        inner ``MagicMock`` whose ``return_value`` / ``side_effect`` control
        the awaited result.
    """
    coro = MagicMock(name="CoroutineResult")

    # A native coroutine is used instead of ``asyncio.coroutine(coro)``:
    # generator-based coroutines were deprecated in Python 3.8 and removed
    # in 3.11.
    async def _invoke(*args, **kwargs):
        result = coro(*args, **kwargs)
        # Like the legacy wrapper, wait on the result when ``coro`` was
        # configured to return a future or coroutine.
        if asyncio.isfuture(result) or asyncio.iscoroutine(result):
            result = await result
        return result

    corofunc = MagicMock(
        name="CoroutineFunction",
        side_effect=_invoke,
    )
    corofunc.coro = coro
    return corofunc
24
25
class CertValidityTests(TestCase):
    """Tests for the connection helpers in ``magma.common.cert_validity``.

    Every test drives the coroutine under test on a private event loop
    created in ``setUp``. Network access is replaced by mocks: either
    ``loop.create_connection`` is stubbed directly, or the module-level
    ``create_tcp_connection`` / ``create_ssl_connection`` helpers are patched.
    """

    def setUp(self):
        """Create the connection parameters and a fresh event loop."""
        self.host = 'localhost'
        self.port = 8080
        self.certfile = 'certfile'
        self.keyfile = 'keyfile'

        # Unset the current loop; the tests hand ``self.loop`` to the code
        # under test explicitly.
        asyncio.set_event_loop(None)
        self.loop = asyncio.new_event_loop()
        # Close the per-test loop so it is not leaked between tests.
        self.addCleanup(self.loop.close)

    def _cert_is_invalid(self):
        """Run ``cv.cert_is_invalid`` with the fixture arguments.

        Returns:
            Whatever ``cv.cert_is_invalid`` returns once the loop has run it
            to completion.
        """
        async def go():
            return await cv.cert_is_invalid(
                self.host,
                self.port,
                self.certfile,
                self.keyfile,
                self.loop,
            )
        return self.loop.run_until_complete(go())

    @staticmethod
    def _timeout_error(err_no):
        """Return a ``TimeoutError`` whose ``errno`` is set to ``err_no``."""
        err = TimeoutError()
        err.errno = err_no
        return err

    def test_tcp_connection(self):
        """
        Test that loop.create_connection called with the correct TCP args.
        """
        self.loop.create_connection = MagicMock()

        async def go():
            await cv.create_tcp_connection(
                self.host,
                self.port,
                self.loop,
            )
        self.loop.run_until_complete(go())

        self.loop.create_connection.assert_called_once_with(
            cv.TCPClientProtocol,
            self.host,
            self.port,
        )

    @patch('magma.common.cert_validity.ssl.SSLContext')
    def test_ssl_connection(self, mock_ssl):
        """
        Test that ssl.SSLContext and loop.create_connection are called with the
        correct SSL args.
        """
        self.loop.create_connection = MagicMock()

        async def go():
            await cv.create_ssl_connection(
                self.host,
                self.port,
                self.certfile,
                self.keyfile,
                self.loop,
            )
        self.loop.run_until_complete(go())

        mock_context = mock_ssl.return_value

        mock_ssl.assert_called_once_with(ssl.PROTOCOL_SSLv23)
        mock_context.load_cert_chain.assert_called_once_with(
            self.certfile,
            keyfile=self.keyfile,
        )

        self.loop.create_connection.assert_called_once_with(
            cv.TCPClientProtocol,
            self.host,
            self.port,
            ssl=mock_context,
        )

    @patch(
        'magma.common.cert_validity.create_ssl_connection',
        new_callable=AsyncMock,
    )
    @patch(
        'magma.common.cert_validity.create_tcp_connection',
        new_callable=AsyncMock,
    )
    def test_cert_is_invalid_both_ok(self, mock_create_tcp, mock_create_ssl):
        """
        Test the appropriate calls and return value for cert_is_invalid()
        cert_is_invalid() == False when TCP and SSL succeed
        """
        ret_val = self._cert_is_invalid()

        mock_create_tcp.assert_called_once_with(
            self.host,
            self.port,
            self.loop,
        )
        mock_create_ssl.assert_called_once_with(
            self.host,
            self.port,
            self.certfile,
            self.keyfile,
            self.loop,
        )
        self.assertEqual(ret_val, False)

    @patch(
        'magma.common.cert_validity.create_ssl_connection',
        new_callable=AsyncMock,
    )
    @patch('magma.common.cert_validity.create_tcp_connection', AsyncMock())
    def test_cert_is_invalid_ssl_fail(self, mock_create_ssl):
        """
        Test cert_is_invalid() == True when TCP succeeds and SSL fails
        """
        mock_create_ssl.coro.side_effect = self._timeout_error(
            errno.ETIMEDOUT,
        )

        ret_val = self._cert_is_invalid()
        self.assertEqual(ret_val, True)

    @patch(
        'magma.common.cert_validity.create_ssl_connection',
        new_callable=AsyncMock,
    )
    @patch('magma.common.cert_validity.create_tcp_connection', AsyncMock())
    def test_cert_is_invalid_ssl_fail_none_errno(self, mock_create_ssl):
        """
        Test cert_is_invalid() == True when TCP succeeds and SSL fails w/o error number
        """
        mock_create_ssl.coro.side_effect = self._timeout_error(None)

        ret_val = self._cert_is_invalid()
        self.assertEqual(ret_val, True)

    @patch('magma.common.cert_validity.create_ssl_connection', AsyncMock())
    @patch(
        'magma.common.cert_validity.create_tcp_connection',
        new_callable=AsyncMock,
    )
    def test_cert_is_invalid_tcp_fail_none_errno(self, mock_create_tcp):
        """
        Test cert_is_invalid() == False when TCP fails w/o errno and SSL succeeds
        """
        mock_create_tcp.coro.side_effect = self._timeout_error(None)

        ret_val = self._cert_is_invalid()
        self.assertEqual(ret_val, False)

    @patch('magma.common.cert_validity.create_ssl_connection', AsyncMock())
    @patch(
        'magma.common.cert_validity.create_tcp_connection',
        new_callable=AsyncMock,
    )
    def test_cert_is_invalid_tcp_fail(self, mock_create_tcp):
        """
        Test cert_is_invalid() == False when TCP fails and SSL succeeds
        """
        mock_create_tcp.coro.side_effect = self._timeout_error(
            errno.ETIMEDOUT,
        )

        ret_val = self._cert_is_invalid()
        self.assertEqual(ret_val, False)

    @patch(
        'magma.common.cert_validity.create_ssl_connection',
        new_callable=AsyncMock,
    )
    @patch(
        'magma.common.cert_validity.create_tcp_connection',
        new_callable=AsyncMock,
    )
    def test_cert_is_invalid_both_fail(self, mock_create_tcp, mock_create_ssl):
        """
        Test cert_is_invalid() == False when TCP and SSL fail
        """
        mock_create_tcp.coro.side_effect = self._timeout_error(
            errno.ETIMEDOUT,
        )
        mock_create_ssl.coro.side_effect = self._timeout_error(
            errno.ETIMEDOUT,
        )

        ret_val = self._cert_is_invalid()
        self.assertEqual(ret_val, False)