blob: 8ce67dc7f0271609755631212d1d8cbf88078ea3 [file] [log] [blame]
Wei-Yu Chen49950b92021-11-08 19:19:18 +08001"""
2Copyright 2020 The Magma Authors.
3
4This source code is licensed under the BSD-style license found in the
5LICENSE file in the root directory of this source tree.
6
7Unless required by applicable law or agreed to in writing, software
8distributed under the License is distributed on an "AS IS" BASIS,
9WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10See the License for the specific language governing permissions and
11limitations under the License.
12"""
13
14import asyncio
15import errno
16import ssl
17from unittest import TestCase
18from unittest.mock import MagicMock, patch
19
20import magma.common.cert_validity as cv
21
22
23# https://stackoverflow.com/questions/32480108/mocking-async-call-in-python-3-5
24def AsyncMock():
25 coro = MagicMock(name="CoroutineResult")
26 corofunc = MagicMock(
27 name="CoroutineFunction",
28 side_effect=asyncio.coroutine(coro),
29 )
30 corofunc.coro = coro
31 return corofunc
32
33
34class CertValidityTests(TestCase):
35 def setUp(self):
36 self.host = 'localhost'
37 self.port = 8080
38 self.certfile = 'certfile'
39 self.keyfile = 'keyfile'
40
41 asyncio.set_event_loop(None)
42 self.loop = asyncio.new_event_loop()
43
44 def test_tcp_connection(self):
45 """
46 Test that loop.create_connection called with the correct TCP args.
47 """
48 self.loop.create_connection = MagicMock()
49
50 @asyncio.coroutine
51 def go():
52 yield from cv.create_tcp_connection(
53 self.host,
54 self.port,
55 self.loop,
56 )
57 self.loop.run_until_complete(go())
58
59 self.loop.create_connection.assert_called_once_with(
60 cv.TCPClientProtocol,
61 self.host,
62 self.port,
63 )
64
65 @patch('magma.common.cert_validity.ssl.SSLContext')
66 def test_ssl_connection(self, mock_ssl):
67 """
68 Test that ssl.SSLContext and loop.create_connection are called with the
69 correct SSL args.
70 """
71 self.loop.create_connection = MagicMock()
72
73 @asyncio.coroutine
74 def go():
75 yield from cv.create_ssl_connection(
76 self.host,
77 self.port,
78 self.certfile,
79 self.keyfile,
80 self.loop,
81 )
82 self.loop.run_until_complete(go())
83
84 mock_context = mock_ssl.return_value
85
86 mock_ssl.assert_called_once_with(ssl.PROTOCOL_SSLv23)
87 mock_context.load_cert_chain.assert_called_once_with(
88 self.certfile,
89 keyfile=self.keyfile,
90 )
91
92 self.loop.create_connection.assert_called_once_with(
93 cv.TCPClientProtocol,
94 self.host,
95 self.port,
96 ssl=mock_context,
97 )
98
99 @patch(
100 'magma.common.cert_validity.create_ssl_connection',
101 new_callable=AsyncMock,
102 )
103 @patch(
104 'magma.common.cert_validity.create_tcp_connection',
105 new_callable=AsyncMock,
106 )
107 def test_cert_is_invalid_both_ok(self, mock_create_tcp, mock_create_ssl):
108 """
109 Test the appropriate calls and return value for cert_is_invalid()
110 cert_is_invalid() == False when TCP and SSL succeed
111 """
112
113 @asyncio.coroutine
114 def go():
115 return (
116 yield from cv.cert_is_invalid(
117 self.host,
118 self.port,
119 self.certfile,
120 self.keyfile,
121 self.loop,
122 )
123 )
124 ret_val = self.loop.run_until_complete(go())
125
126 mock_create_tcp.assert_called_once_with(
127 self.host,
128 self.port,
129 self.loop,
130 )
131 mock_create_ssl.assert_called_once_with(
132 self.host,
133 self.port,
134 self.certfile,
135 self.keyfile,
136 self.loop,
137 )
138 self.assertEqual(ret_val, False)
139
140 @patch(
141 'magma.common.cert_validity.create_ssl_connection',
142 new_callable=AsyncMock,
143 )
144 @patch('magma.common.cert_validity.create_tcp_connection', AsyncMock())
145 def test_cert_is_invalid_ssl_fail(self, mock_create_ssl):
146 """
147 Test cert_is_invalid() == True when TCP succeeds and SSL fails
148 """
149
150 mock_err = TimeoutError()
151 mock_err.errno = errno.ETIMEDOUT
152 mock_create_ssl.coro.side_effect = mock_err
153
154 @asyncio.coroutine
155 def go():
156 return (
157 yield from cv.cert_is_invalid(
158 self.host,
159 self.port,
160 self.certfile,
161 self.keyfile,
162 self.loop,
163 )
164 )
165 ret_val = self.loop.run_until_complete(go())
166 self.assertEqual(ret_val, True)
167
168 @patch(
169 'magma.common.cert_validity.create_ssl_connection',
170 new_callable=AsyncMock,
171 )
172 @patch('magma.common.cert_validity.create_tcp_connection', AsyncMock())
173 def test_cert_is_invalid_ssl_fail_none_errno(self, mock_create_ssl):
174 """
175 Test cert_is_invalid() == True when TCP succeeds and SSL fails w/o error number
176 """
177
178 mock_err = TimeoutError()
179 mock_err.errno = None
180 mock_create_ssl.coro.side_effect = mock_err
181
182 @asyncio.coroutine
183 def go():
184 return (
185 yield from cv.cert_is_invalid(
186 self.host,
187 self.port,
188 self.certfile,
189 self.keyfile,
190 self.loop,
191 )
192 )
193 ret_val = self.loop.run_until_complete(go())
194 self.assertEqual(ret_val, True)
195
196 @patch('magma.common.cert_validity.create_ssl_connection', AsyncMock())
197 @patch(
198 'magma.common.cert_validity.create_tcp_connection',
199 new_callable=AsyncMock,
200 )
201 def test_cert_is_invalid_tcp_fail_none_errno(self, mock_create_tcp):
202 """
203 Test cert_is_invalid() == False when TCP fails w/o errno and SSL succeeds
204 """
205
206 mock_err = TimeoutError()
207 mock_err.errno = None
208 mock_create_tcp.coro.side_effect = mock_err
209
210 @asyncio.coroutine
211 def go():
212 return (
213 yield from cv.cert_is_invalid(
214 self.host,
215 self.port,
216 self.certfile,
217 self.keyfile,
218 self.loop,
219 )
220 )
221 ret_val = self.loop.run_until_complete(go())
222 self.assertEqual(ret_val, False)
223
224 @patch('magma.common.cert_validity.create_ssl_connection', AsyncMock())
225 @patch(
226 'magma.common.cert_validity.create_tcp_connection',
227 new_callable=AsyncMock,
228 )
229 def test_cert_is_invalid_tcp_fail(self, mock_create_tcp):
230 """
231 Test cert_is_invalid() == False when TCP fails and SSL succeeds
232 """
233
234 mock_err = TimeoutError()
235 mock_err.errno = errno.ETIMEDOUT
236 mock_create_tcp.coro.side_effect = mock_err
237
238 @asyncio.coroutine
239 def go():
240 return (
241 yield from cv.cert_is_invalid(
242 self.host,
243 self.port,
244 self.certfile,
245 self.keyfile,
246 self.loop,
247 )
248 )
249 ret_val = self.loop.run_until_complete(go())
250 self.assertEqual(ret_val, False)
251
252 @patch(
253 'magma.common.cert_validity.create_ssl_connection',
254 new_callable=AsyncMock,
255 )
256 @patch(
257 'magma.common.cert_validity.create_tcp_connection',
258 new_callable=AsyncMock,
259 )
260 def test_cert_is_invalid_both_fail(self, mock_create_tcp, mock_create_ssl):
261 """
262 Test cert_is_invalid() == False when TCP and SSL fail
263 """
264
265 mock_tcp_err = TimeoutError()
266 mock_tcp_err.errno = errno.ETIMEDOUT
267 mock_create_tcp.coro.side_effect = mock_tcp_err
268
269 mock_ssl_err = TimeoutError()
270 mock_ssl_err.errno = errno.ETIMEDOUT
271 mock_create_ssl.coro.side_effect = mock_ssl_err
272
273 @asyncio.coroutine
274 def go():
275 return (
276 yield from cv.cert_is_invalid(
277 self.host,
278 self.port,
279 self.certfile,
280 self.keyfile,
281 self.loop,
282 )
283 )
284 ret_val = self.loop.run_until_complete(go())
285 self.assertEqual(ret_val, False)