blob: 649fcb6d39c7962f560ef28a304a39a4a4c59aae [file] [log] [blame]
Wei-Yu Chen49950b92021-11-08 19:19:18 +08001"""
2Copyright 2020 The Magma Authors.
3
4This source code is licensed under the BSD-style license found in the
5LICENSE file in the root directory of this source tree.
6
7Unless required by applicable law or agreed to in writing, software
8distributed under the License is distributed on an "AS IS" BASIS,
9WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10See the License for the specific language governing permissions and
11limitations under the License.
12"""
13from datetime import datetime, timedelta, timezone
14from unittest import TestCase, mock
15from unittest.mock import Mock, patch
16
17import lxml.etree as ET
18from tests.test_utils.enb_acs_builder import (
19 EnodebAcsStateMachineBuilder,
20)
21from tr069 import models
22from tr069.rpc_methods import AutoConfigServer
23from tr069.spyne_mods import Tr069Application, Tr069Soap11
24from spyne import MethodContext
25from spyne.server import ServerBase
26
27
28class Tr069Test(TestCase):
29 """ Tests for the TR-069 server """
30 acs_to_cpe_queue = None
31 cpe_to_acs_queue = None
32
33 def setUp(self):
34 # Set up the ACS
35 self.enb_acs_manager = EnodebAcsStateMachineBuilder.build_acs_manager()
36 self.handler = EnodebAcsStateMachineBuilder.build_acs_state_machine()
37 AutoConfigServer.set_state_machine_manager(self.enb_acs_manager)
38
39 def side_effect(*args, **_kwargs):
40 msg = args[1]
41 return msg
42
43 self.p = patch.object(
44 AutoConfigServer, '_handle_tr069_message',
45 Mock(side_effect=side_effect),
46 )
47 self.p.start()
48
49 self.app = Tr069Application(
50 [AutoConfigServer],
51 models.CWMP_NS,
52 in_protocol=Tr069Soap11(validator='soft'),
53 out_protocol=Tr069Soap11(),
54 )
55
56 def tearDown(self):
57 self.p.stop()
58 self.handler = None
59
60 def _get_mconfig(self):
61 return {
62 "@type": "type.googleapis.com/magma.mconfig.EnodebD",
63 "bandwidthMhz": 20,
64 "specialSubframePattern": 7,
65 "earfcndl": 44490,
66 "logLevel": "INFO",
67 "plmnidList": "00101",
68 "pci": 260,
69 "allowEnodebTransmit": False,
70 "subframeAssignment": 2,
71 "tac": 1,
72 },
73
74 def _get_service_config(self):
75 return {
76 "tr069": {
77 "interface": "eth1",
78 "port": 48080,
79 "perf_mgmt_port": 8081,
80 "public_ip": "192.88.99.142",
81 },
82 "reboot_enodeb_on_mme_disconnected": True,
83 "s1_interface": "eth1",
84 }
85
86 def test_acs_manager_exception(self):
87 """
88 Test that an unexpected exception from the ACS SM manager will result
89 in an empty response.
90 """
91 self.enb_acs_manager.handle_tr069_message = mock.MagicMock(
92 side_effect=Exception('mock exception'),
93 )
94 # stop the patcher because we want to use the above MagicMock
95 self.p.stop()
96 server = ServerBase(self.app)
97
98 ctx = MethodContext(server, MethodContext.SERVER)
99 ctx.in_string = [b'']
100 ctx, = server.generate_contexts(ctx)
101
102 server.get_in_object(ctx)
103 self.assertIsNone(ctx.in_error)
104
105 server.get_out_object(ctx)
106 self.assertIsNone(ctx.out_error)
107
108 server.get_out_string(ctx)
109 self.assertEqual(b''.join(ctx.out_string), b'')
110
111 # start the patcher otherwise the p.stop() in tearDown will complain
112 self.p.start()
113
114 def test_parse_inform(self):
115 """
116 Test that example Inform RPC call can be parsed correctly
117 """
118 # Example TR-069 CPE->ACS RPC call. Copied from:
119 # http://djuro82.blogspot.com/2011/05/tr-069-cpe-provisioning.html
120 cpe_string = b'''
121 <soapenv:Envelope soapenv:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:soap="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cwmp="urn:dslforum-org:cwmp-1-0">
122 <soapenv:Header>
123 <cwmp:ID soapenv:mustUnderstand="1">0_THOM_TR69_ID</cwmp:ID>
124 </soapenv:Header>
125 <soapenv:Body>
126 <cwmp:Inform>
127 <DeviceId>
128 <Manufacturer>THOMSON</Manufacturer>
129 <OUI>00147F</OUI>
130 <ProductClass>SpeedTouch 780</ProductClass>
131 <SerialNumber>CP0611JTLNW</SerialNumber>
132 </DeviceId>
133 <Event soap:arrayType="cwmp:EventStruct[04]">
134 <EventStruct>
135 <EventCode>0 BOOTSTRAP</EventCode>
136 <CommandKey></CommandKey>
137 </EventStruct>
138 <EventStruct>
139 <EventCode>1 BOOT</EventCode>
140 <CommandKey></CommandKey>
141 </EventStruct>
142 <EventStruct>
143 <EventCode>2 PERIODIC</EventCode>
144 <CommandKey></CommandKey>
145 </EventStruct>
146 <EventStruct>
147 <EventCode>4 VALUE CHANGE</EventCode>
148 <CommandKey></CommandKey>
149 </EventStruct>
150 </Event>
151 <MaxEnvelopes>2</MaxEnvelopes>
152 <CurrentTime>1970-01-01T00:01:09Z</CurrentTime>
153 <RetryCount>05</RetryCount>
154 <ParameterList soap:arrayType="cwmp:ParameterValueStruct[12]">
155 <ParameterValueStruct>
156 <Name>InternetGatewayDevice.DeviceSummary</Name>
157 <Value xsi:type="xsd:string">
158 InternetGatewayDevice:1.1[] (Baseline:1, EthernetLAN:1, ADSLWAN:1, Bridging:1, Time:1, WiFiLAN:1)</Value>
159 </ParameterValueStruct>
160 <ParameterValueStruct>
161 <Name>InternetGatewayDevice.DeviceInfo.SpecVersion</Name>
162 <Value xsi:type="xsd:string">1.1</Value>
163 </ParameterValueStruct>
164 <ParameterValueStruct>
165 <Name>InternetGatewayDevice.DeviceInfo.HardwareVersion</Name>
166 <Value xsi:type="xsd:string">BANT-R</Value>
167 </ParameterValueStruct>
168 <ParameterValueStruct>
169 <Name>InternetGatewayDevice.DeviceInfo.SoftwareVersion</Name>
170 <Value xsi:type="xsd:string">6.2.35.0</Value>
171 </ParameterValueStruct>
172 <ParameterValueStruct>
173 <Name>InternetGatewayDevice.DeviceInfo.ProvisioningCode</Name>
174 <Value xsi:type="xsd:string"></Value>
175 </ParameterValueStruct>
176 <ParameterValueStruct>
177 <Name>InternetGatewayDevice.DeviceInfo.VendorConfigFile.1.Name</Name>
178 <Value xsi:type="xsd:string">MyCompanyName</Value>
179 </ParameterValueStruct>
180 <ParameterValueStruct>
181 <Name>InternetGatewayDevice.DeviceInfo.VendorConfigFile.1.Version</Name>
182 <Value xsi:type="xsd:string"></Value>
183 </ParameterValueStruct>
184 <ParameterValueStruct>
185 <Name>InternetGatewayDevice.DeviceInfo.VendorConfigFile.1.Date</Name>
186 <Value xsi:type="xsd:dateTime">0001-01-01T00:00:00</Value>
187 </ParameterValueStruct>
188 <ParameterValueStruct>
189 <Name>InternetGatewayDevice.DeviceInfo.VendorConfigFile.1.Description</Name>
190 <Value xsi:type="xsd:string">MyCompanyName</Value>
191 </ParameterValueStruct>
192 <ParameterValueStruct>
193 <Name>InternetGatewayDevice.ManagementServer.ConnectionRequestURL</Name>
194 <Value xsi:type="xsd:string">http://10.127.129.205:51005/</Value>
195 </ParameterValueStruct>
196 <ParameterValueStruct>
197 <Name>InternetGatewayDevice.ManagementServer.ParameterKey</Name>
198 <Value xsi:type="xsd:string"></Value>
199 </ParameterValueStruct>
200 <ParameterValueStruct>
201 <Name>InternetGatewayDevice.WANDevice.1.WANConnectionDevice.1.WANIPConnection.1.ExternalIPAddress</Name>
202 <Value xsi:type="xsd:string">10.127.129.205</Value>
203 </ParameterValueStruct>
204 </ParameterList>
205 </cwmp:Inform>
206 </soapenv:Body>
207 </soapenv:Envelope>
208 '''
209
210 server = ServerBase(self.app)
211
212 ctx = MethodContext(server, MethodContext.SERVER)
213 ctx.in_string = [cpe_string]
214 ctx, = server.generate_contexts(ctx)
215
216 if ctx.in_error is not None:
217 print('In error: %s' % ctx.in_error)
218 self.assertEqual(ctx.in_error, None)
219
220 server.get_in_object(ctx)
221
222 self.assertEqual(ctx.in_object.DeviceId.OUI, '00147F')
223 self.assertEqual(
224 ctx.in_object.Event.EventStruct[0].EventCode, '0 BOOTSTRAP',
225 )
226 self.assertEqual(
227 ctx.in_object.Event.EventStruct[2].EventCode, '2 PERIODIC',
228 )
229 self.assertEqual(ctx.in_object.MaxEnvelopes, 2)
230 self.assertEqual(
231 ctx.in_object.ParameterList.ParameterValueStruct[1].Name,
232 'InternetGatewayDevice.DeviceInfo.SpecVersion',
233 )
234 self.assertEqual(
235 str(ctx.in_object.ParameterList.ParameterValueStruct[1].Value), '1.1',
236 )
237
238 def test_parse_inform_cavium(self):
239 """
240 Test that example Inform RPC call can be parsed correctly from OC-LTE
241 """
242 cpe_string = b'''<?xml version="1.0" encoding="UTF-8"?>
243 <SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/" xmlns:SOAP-ENC="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0">
244 <SOAP-ENV:Header>
245 <cwmp:ID SOAP-ENV:mustUnderstand="1">CPE_1002</cwmp:ID>
246 </SOAP-ENV:Header>
247 <SOAP-ENV:Body SOAP-ENV:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
248 <cwmp:Inform>
249 <DeviceId>
250 <Manufacturer>Cavium, Inc.</Manufacturer>
251 <OUI>000FB7</OUI>
252 <ProductClass>Cavium eNB</ProductClass>
253 <SerialNumber>10.18.104.79</SerialNumber>
254 </DeviceId>
255 <Event xsi:type="SOAP-ENC:Array" SOAP-ENC:arrayType="cwmp:EventStruct[1]">
256 <EventStruct>
257 <EventCode>0 BOOTSTRAP</EventCode>
258 <CommandKey></CommandKey>
259 </EventStruct>
260 </Event>
261 <MaxEnvelopes>1</MaxEnvelopes>
262 <CurrentTime>1970-01-02T00:01:05.021239+00:00</CurrentTime>
263 <RetryCount>2</RetryCount>
264 <ParameterList xsi:type="SOAP-ENC:Array" SOAP-ENC:arrayType="cwmp:ParameterValueStruct[15]">
265 <ParameterValueStruct>
266 <Name>Device.DeviceInfo.HardwareVersion</Name>
267 <Value xsi:type="xsd:string">1.0</Value>
268 </ParameterValueStruct>
269 <ParameterValueStruct>
270 <Name>Device.DeviceInfo.SoftwareVersion</Name>
271 <Value xsi:type="xsd:string">1.0</Value>
272 </ParameterValueStruct>
273 <ParameterValueStruct>
274 <Name>Device.DeviceInfo.AdditionalHardwareVersion</Name>
275 <Value xsi:type="xsd:string">1.0</Value>
276 </ParameterValueStruct>
277 <ParameterValueStruct>
278 <Name>Device.DeviceInfo.AdditionalSoftwareVersion</Name>
279 <Value xsi:type="xsd:string">1.0</Value>
280 </ParameterValueStruct>
281 <ParameterValueStruct>
282 <Name>Device.DeviceInfo.ProvisioningCode</Name>
283 <Value xsi:type="xsd:string">Cavium</Value>
284 </ParameterValueStruct>
285 <ParameterValueStruct>
286 <Name>Device.ManagementServer.ParameterKey</Name>
287 <Value xsi:type="xsd:string"></Value>
288 </ParameterValueStruct>
289 <ParameterValueStruct>
290 <Name>Device.ManagementServer.ConnectionRequestURL</Name>
291 <Value xsi:type="xsd:string">http://192.88.99.253:8084/bucrhzjd</Value>
292 </ParameterValueStruct>
293 <ParameterValueStruct>
294 <Name>Device.ManagementServer.UDPConnectionRequestAddress</Name>
295 <Value xsi:type="xsd:string"></Value>
296 </ParameterValueStruct>
297 <ParameterValueStruct>
298 <Name>Device.ManagementServer.NATDetected</Name>
299 <Value xsi:type="xsd:boolean">0</Value>
300 </ParameterValueStruct>
301 <ParameterValueStruct>
302 <Name>Device.IP.Diagnostics.UDPEchoConfig.PacketsReceived</Name>
303 <Value xsi:type="xsd:unsignedInt">0</Value>
304 </ParameterValueStruct>
305 <ParameterValueStruct>
306 <Name>Device.IP.Diagnostics.UDPEchoConfig.PacketsResponded</Name>
307 <Value xsi:type="xsd:unsignedInt">0</Value>
308 </ParameterValueStruct>
309 <ParameterValueStruct>
310 <Name>Device.IP.Diagnostics.UDPEchoConfig.BytesReceived</Name>
311 <Value xsi:type="xsd:unsignedInt">0</Value>
312 </ParameterValueStruct>
313 <ParameterValueStruct>
314 <Name>Device.IP.Diagnostics.UDPEchoConfig.BytesResponded</Name>
315 <Value xsi:type="xsd:unsignedInt">0</Value>
316 </ParameterValueStruct>
317 <ParameterValueStruct>
318 <Name>Device.IP.Diagnostics.UDPEchoConfig.TimeFirstPacketReceived</Name>
319 <Value xsi:type="xsd:dateTime">1969-12-31T16:00:00.000000+00:00</Value>
320 </ParameterValueStruct>
321 <ParameterValueStruct>
322 <Name>Device.IP.Diagnostics.UDPEchoConfig.TimeLastPacketReceived</Name>
323 <Value xsi:type="xsd:dateTime">1969-12-31T16:00:00.000000+00:00</Value>
324 </ParameterValueStruct>
325 </ParameterList>
326 </cwmp:Inform>
327 </SOAP-ENV:Body>
328 </SOAP-ENV:Envelope>
329 '''
330
331 server = ServerBase(self.app)
332
333 ctx = MethodContext(server, MethodContext.SERVER)
334 ctx.in_string = [cpe_string]
335 ctx, = server.generate_contexts(ctx)
336
337 if ctx.in_error is not None:
338 print('In error: %s' % ctx.in_error)
339 self.assertEqual(ctx.in_error, None)
340
341 server.get_in_object(ctx)
342
343 self.assertEqual(ctx.in_object.DeviceId.OUI, '000FB7')
344 self.assertEqual(
345 ctx.in_object.Event.EventStruct[0].EventCode, '0 BOOTSTRAP',
346 )
347 self.assertEqual(ctx.in_object.MaxEnvelopes, 1)
348 self.assertEqual(
349 ctx.in_object.ParameterList.ParameterValueStruct[1].Name,
350 'Device.DeviceInfo.SoftwareVersion',
351 )
352 self.assertEqual(
353 str(ctx.in_object.ParameterList.ParameterValueStruct[1].Value), '1.0',
354 )
355
356 def test_handle_transfer_complete(self):
357 """
358 Test that example TransferComplete RPC call can be parsed correctly, and
359 response is correctly generated.
360 """
361 # Example TransferComplete CPE->ACS RPC request/response.
362 # Manually created.
363 cpe_string = b'''
364 <soapenv:Envelope soapenv:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:soap="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cwmp="urn:dslforum-org:cwmp-1-0">
365 <soapenv:Header>
366 <cwmp:ID soapenv:mustUnderstand="1">1234</cwmp:ID>
367 </soapenv:Header>
368 <soapenv:Body>
369 <cwmp:TransferComplete>
370 <CommandKey>Downloading stuff</CommandKey>
371 <FaultStruct>
372 <FaultCode>0</FaultCode>
373 <FaultString></FaultString>
374 </FaultStruct>
375 <StartTime>2016-11-30T10:16:29Z</StartTime>
376 <CompleteTime>2016-11-30T10:17:05Z</CompleteTime>
377 </cwmp:TransferComplete>
378 </soapenv:Body>
379 </soapenv:Envelope>
380 '''
381 expected_acs_string = b'''
382 <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:soap="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cwmp="urn:dslforum-org:cwmp-1-0">
383 <soapenv:Header>
384 <cwmp:ID soapenv:mustUnderstand="1">1234</cwmp:ID>
385 </soapenv:Header>
386 <soapenv:Body>
387 <cwmp:TransferCompleteResponse>
388 </cwmp:TransferCompleteResponse>
389 </soapenv:Body>
390 </soapenv:Envelope>
391 '''
392
393 self.p.stop()
394 self.p.start()
395
396 server = ServerBase(self.app)
397
398 ctx = MethodContext(server, MethodContext.SERVER)
399 ctx.in_string = [cpe_string]
400 ctx, = server.generate_contexts(ctx)
401
402 if ctx.in_error is not None:
403 print('In error: %s' % ctx.in_error)
404 self.assertEqual(ctx.in_error, None)
405
406 server.get_in_object(ctx)
407 self.assertEqual(ctx.in_error, None)
408
409 server.get_out_object(ctx)
410 self.assertEqual(ctx.out_error, None)
411
412 output_msg = ctx.out_object[0]
413 self.assertEqual(type(output_msg), models.TransferComplete)
414 self.assertEqual(output_msg.CommandKey, 'Downloading stuff')
415 self.assertEqual(output_msg.FaultStruct.FaultCode, 0)
416 self.assertEqual(output_msg.FaultStruct.FaultString, '')
417 self.assertEqual(
418 output_msg.StartTime,
419 datetime(
420 2016, 11, 30, 10, 16, 29,
421 tzinfo=timezone(timedelta(0)),
422 ),
423 )
424 self.assertEqual(
425 output_msg.CompleteTime,
426 datetime(
427 2016, 11, 30, 10, 17, 5,
428 tzinfo=timezone(timedelta(0)),
429 ),
430 )
431
432 server.get_out_string(ctx)
433 self.assertEqual(ctx.out_error, None)
434
435 xml_tree = XmlTree()
436 match = xml_tree.xml_compare(
437 xml_tree.convert_string_to_tree(b''.join(ctx.out_string)),
438 xml_tree.convert_string_to_tree(expected_acs_string),
439 )
440 self.assertTrue(match)
441
442 def test_parse_empty_http(self):
443 """
444 Test that empty HTTP message gets correctly mapped to 'EmptyHttp'
445 function call
446 """
447 cpe_string = b''
448
449 server = ServerBase(self.app)
450
451 ctx = MethodContext(server, MethodContext.SERVER)
452 ctx.in_string = [cpe_string]
453 ctx, = server.generate_contexts(ctx)
454
455 if ctx.in_error is not None:
456 print('In error: %s' % ctx.in_error)
457
458 self.assertEqual(ctx.in_error, None)
459 self.assertEqual(ctx.function, AutoConfigServer.empty_http)
460
461 def test_generate_empty_http(self):
462 """
463 Test that empty HTTP message is generated when setting output message
464 name to 'EmptyHttp'
465 """
466 cpe_string = b''
467
468 server = ServerBase(self.app)
469
470 ctx = MethodContext(server, MethodContext.SERVER)
471 ctx.in_string = [cpe_string]
472 ctx, = server.generate_contexts(ctx)
473
474 server.get_in_object(ctx)
475 if ctx.in_error is not None:
476 raise ctx.in_error
477
478 server.get_out_object(ctx)
479 if ctx.out_error is not None:
480 raise ctx.out_error
481
482 ctx.descriptor.out_message.Attributes.sub_name = 'EmptyHttp'
483 ctx.out_object = [models.AcsToCpeRequests()]
484
485 server.get_out_string(ctx)
486
487 self.assertEqual(b''.join(ctx.out_string), b'')
488
489 def test_generate_get_parameter_values_string(self):
490 """
491 Test that correct string is generated for SetParameterValues ACS->CPE
492 request
493 """
494 # Example ACS->CPE RPC call. Copied from:
495 # http://djuro82.blogspot.com/2011/05/tr-069-cpe-provisioning.html
496 # Following edits made:
497 # - Change header ID value from 'null0' to 'null', to match magma
498 # default ID
499 expected_acs_string = b'''
500 <soapenv:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
501 <soapenv:Header>
502 <cwmp:ID soapenv:mustUnderstand="1">null</cwmp:ID>
503 </soapenv:Header>
504 <soapenv:Body>
505 <cwmp:GetParameterValues>
506 <ParameterNames soap:arrayType="xsd:string[1]">
507 <string>foo</string>
508 </ParameterNames>
509 </cwmp:GetParameterValues>
510 </soapenv:Body>
511 </soapenv:Envelope>
512 '''
513
514 names = ['foo']
515 request = models.GetParameterValues()
516 request.ParameterNames = models.ParameterNames()
517 request.ParameterNames.arrayType = 'xsd:string[%d]' \
518 % len(names)
519 request.ParameterNames.string = []
520 for name in names:
521 request.ParameterNames.string.append(name)
522
523 request.ParameterKey = 'null'
524
525 def side_effect(*args, **_kwargs):
526 ctx = args[0]
527 ctx.out_header = models.ID(mustUnderstand='1')
528 ctx.out_header.Data = 'null'
529 ctx.descriptor.out_message.Attributes.sub_name = \
530 request.__class__.__name__
531 return AutoConfigServer._generate_acs_to_cpe_request_copy(request)
532
533 self.p.stop()
534 self.p = patch.object(
535 AutoConfigServer, '_handle_tr069_message',
536 side_effect=side_effect,
537 )
538 self.p.start()
539
540 server = ServerBase(self.app)
541
542 ctx = MethodContext(server, MethodContext.SERVER)
543 ctx.in_string = [b'']
544 ctx, = server.generate_contexts(ctx)
545
546 server.get_in_object(ctx)
547 if ctx.in_error is not None:
548 raise ctx.in_error
549
550 server.get_out_object(ctx)
551 if ctx.out_error is not None:
552 raise ctx.out_error
553
554 server.get_out_string(ctx)
555
556 xml_tree = XmlTree()
557 match = xml_tree.xml_compare(
558 xml_tree.convert_string_to_tree(b''.join(ctx.out_string)),
559 xml_tree.convert_string_to_tree(expected_acs_string),
560 )
561 self.assertTrue(match)
562
563 def test_generate_set_parameter_values_string(self):
564 """
565 Test that correct string is generated for SetParameterValues ACS->CPE
566 request
567 """
568 # Example ACS->CPE RPC call. Copied from:
569 # http://djuro82.blogspot.com/2011/05/tr-069-cpe-provisioning.html
570 # Following edits made:
571 # - Change header ID value from 'null0' to 'null', to match magma
572 # default ID
573 expected_acs_string = b'''
574 <soapenv:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
575 <soapenv:Header>
576 <cwmp:ID soapenv:mustUnderstand="1">null</cwmp:ID>
577 </soapenv:Header>
578 <soapenv:Body>
579 <cwmp:SetParameterValues>
580 <ParameterList soap:arrayType="cwmp:ParameterValueStruct[4]">
581 <ParameterValueStruct>
582 <Name>InternetGatewayDevice.ManagementServer.PeriodicInformEnable</Name>
583 <Value xsi:type="xsd:boolean">1</Value>
584 </ParameterValueStruct>
585 <ParameterValueStruct>
586 <Name>InternetGatewayDevice.ManagementServer.ConnectionRequestUsername</Name>
587 <Value xsi:type="xsd:string">00147F-SpeedTouch780-CP0611JTLNW</Value>
588 </ParameterValueStruct>
589 <ParameterValueStruct>
590 <Name>InternetGatewayDevice.ManagementServer.ConnectionRequestPassword</Name>
591 <Value xsi:type="xsd:string">98ff55fb377bf724c625f60dec448646</Value>
592 </ParameterValueStruct>
593 <ParameterValueStruct>
594 <Name>InternetGatewayDevice.ManagementServer.PeriodicInformInterval</Name>
595 <Value xsi:type="xsd:unsignedInt">60</Value>
596 </ParameterValueStruct>
597 </ParameterList>
598 <ParameterKey xsi:type="xsd:string">SetParameter1</ParameterKey>
599 </cwmp:SetParameterValues>
600 </soapenv:Body>
601 </soapenv:Envelope>
602 '''
603
604 request = models.SetParameterValues()
605
606 request.ParameterList = \
607 models.ParameterValueList(arrayType='cwmp:ParameterValueStruct[4]')
608 request.ParameterList.ParameterValueStruct = []
609
610 param = models.ParameterValueStruct()
611 param.Name = 'InternetGatewayDevice.ManagementServer.PeriodicInformEnable'
612 param.Value = models.anySimpleType(type='xsd:boolean')
613 param.Value.Data = '1'
614 request.ParameterList.ParameterValueStruct.append(param)
615
616 param = models.ParameterValueStruct()
617 param.Name = 'InternetGatewayDevice.ManagementServer.ConnectionRequestUsername'
618 param.Value = models.anySimpleType(type='xsd:string')
619 param.Value.Data = '00147F-SpeedTouch780-CP0611JTLNW'
620 request.ParameterList.ParameterValueStruct.append(param)
621
622 param = models.ParameterValueStruct()
623 param.Name = 'InternetGatewayDevice.ManagementServer.ConnectionRequestPassword'
624 param.Value = models.anySimpleType(type='xsd:string')
625 param.Value.Data = '98ff55fb377bf724c625f60dec448646'
626 request.ParameterList.ParameterValueStruct.append(param)
627
628 param = models.ParameterValueStruct()
629 param.Name = 'InternetGatewayDevice.ManagementServer.PeriodicInformInterval'
630 param.Value = models.anySimpleType(type='xsd:unsignedInt')
631 param.Value.Data = '60'
632 request.ParameterList.ParameterValueStruct.append(param)
633
634 request.ParameterKey = models.ParameterKeyType()
635 request.ParameterKey.type = 'xsd:string'
636 request.ParameterKey.Data = 'SetParameter1'
637
638 def side_effect(*args, **_kwargs):
639 ctx = args[0]
640 ctx.out_header = models.ID(mustUnderstand='1')
641 ctx.out_header.Data = 'null'
642 ctx.descriptor.out_message.Attributes.sub_name = request.__class__.__name__
643 return request
644
645 self.p.stop()
646 self.p = patch.object(
647 AutoConfigServer, '_handle_tr069_message',
648 Mock(side_effect=side_effect),
649 )
650 self.p.start()
651
652 server = ServerBase(self.app)
653
654 ctx = MethodContext(server, MethodContext.SERVER)
655 ctx.in_string = [b'']
656 ctx, = server.generate_contexts(ctx)
657
658 server.get_in_object(ctx)
659 if ctx.in_error is not None:
660 raise ctx.in_error
661
662 server.get_out_object(ctx)
663 if ctx.out_error is not None:
664 raise ctx.out_error
665
666 server.get_out_string(ctx)
667
668 xml_tree = XmlTree()
669 NS_SOAP11_ENC = 'soap11enc'
670 NS_SOAP11_ENV = 'soap11env'
671 xml_str = b''.join(ctx.out_string)
672 # Get the namespaces and validate the soap enc and env prefix are right
673 nsmap = xml_tree.get_ns(xml_str)
674 self.assertTrue(NS_SOAP11_ENC in nsmap.keys())
675 self.assertTrue(NS_SOAP11_ENV in nsmap.keys())
676
677 match = xml_tree.xml_compare(
678 xml_tree.convert_string_to_tree(xml_str),
679 xml_tree.convert_string_to_tree(expected_acs_string),
680 )
681 self.assertTrue(match)
682
683 def test_parse_fault_response(self):
684 """ Tests that a fault response from CPE is correctly parsed. """
685 # Example CPE->ACS fault response. Copied from:
686 # http://djuro82.blogspot.com/2011/05/tr-069-cpe-provisioning.html
687 cpe_string = b'''
688 <soapenv:Envelope soapenv:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:soap="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cwmp="urn:dslforum-org:cwmp-1-0">
689 <soapenv:Header>
690 <cwmp:ID soapenv:mustUnderstand="1">1031422463</cwmp:ID>
691 </soapenv:Header>
692 <soapenv:Body>
693 <soapenv:Fault>
694 <faultcode>Client</faultcode>
695 <faultstring>CWMP fault</faultstring>
696 <detail>
697 <cwmp:Fault>
698 <FaultCode>9003</FaultCode>
699 <FaultString>Invalid arguments</FaultString>
700 <SetParameterValuesFault>
701 <ParameterName>InternetGatewayDevice.WANDevice.1.WANConnectionDevice.3.WANPPPConnection.1.Password</ParameterName>
702 <FaultCode>9003</FaultCode>
703 <FaultString>Invalid arguments</FaultString>
704 </SetParameterValuesFault>
705 <SetParameterValuesFault>
706 <ParameterName>InternetGatewayDevice.WANDevice.1.WANConnectionDevice.3.WANPPPConnection.1.Username</ParameterName>
707 <FaultCode>9003</FaultCode>
708 <FaultString>Invalid arguments</FaultString>
709 </SetParameterValuesFault>
710 </cwmp:Fault>
711 </detail>
712 </soapenv:Fault>
713 </soapenv:Body>
714 </soapenv:Envelope>
715 '''
716 server = ServerBase(self.app)
717
718 ctx = MethodContext(server, MethodContext.SERVER)
719 ctx.in_string = [cpe_string]
720 ctx, = server.generate_contexts(ctx)
721 server.get_in_object(ctx)
722 if ctx.in_error is not None:
723 raise ctx.in_error
724
725 # Calls function to receive and process message
726 server.get_out_object(ctx)
727
728 output_msg = ctx.out_object[0]
729 self.assertEqual(type(output_msg), models.Fault)
730 self.assertEqual(output_msg.FaultCode, 9003)
731 self.assertEqual(output_msg.FaultString, 'Invalid arguments')
732 self.assertEqual(
733 output_msg.SetParameterValuesFault[1].ParameterName,
734 'InternetGatewayDevice.WANDevice.1.WANConnectionDevice.3.WANPPPConnection.1.Username',
735 )
736 self.assertEqual(output_msg.SetParameterValuesFault[1].FaultCode, 9003)
737 self.assertEqual(
738 output_msg.SetParameterValuesFault[1].FaultString,
739 'Invalid arguments',
740 )
741
742 def test_parse_hex_values(self):
743 """
744 Test that non-utf-8 hex values can be parsed without error
745 """
746 # Example TR-069 CPE->ACS RPC call. Copied from:
747 # http://djuro82.blogspot.com/2011/05/tr-069-cpe-provisioning.html
748 cpe_string = b'''
749 <soapenv:Envelope soapenv:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:soap="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cwmp="urn:dslforum-org:cwmp-1-0">
750 <soapenv:Header>
751 <cwmp:ID soapenv:mustUnderstand="1">0_THOM_TR69_ID</cwmp:ID>
752 </soapenv:Header>
753 <soapenv:Body>
754 <cwmp:Inform>
755 <DeviceId>
756 <Manufacturer>THOMSON</Manufacturer>
757 <OUI>00147F</OUI>
758 <ProductClass>SpeedTouch 780</ProductClass>
759 <SerialNumber>CP0611JTLNW</SerialNumber>
760 </DeviceId>
761 <Event soap:arrayType="cwmp:EventStruct[04]">
762 <EventStruct>
763 <EventCode>0 BOOTSTRAP</EventCode>
764 <CommandKey></CommandKey>
765 </EventStruct>
766 <EventStruct>
767 <EventCode>1 BOOT</EventCode>
768 <CommandKey></CommandKey>
769 </EventStruct>
770 <EventStruct>
771 <EventCode>2 PERIODIC</EventCode>
772 <CommandKey></CommandKey>
773 </EventStruct>
774 <EventStruct>
775 <EventCode>4 VALUE CHANGE</EventCode>
776 <CommandKey></CommandKey>
777 </EventStruct>
778 </Event>
779 <MaxEnvelopes>2</MaxEnvelopes>
780 <CurrentTime>1970-01-01T00:01:09Z</CurrentTime>
781 <RetryCount>05</RetryCount>
782 <ParameterList soap:arrayType="cwmp:ParameterValueStruct[12]">
783 <ParameterValueStruct>
784 <Name>InternetGatewayDevice.DeviceSummary</Name>
785 <Value xsi:type="xsd:string">
786 \xff\xff\xff\xff\xff</Value>
787 </ParameterValueStruct>
788 </ParameterList>
789 </cwmp:Inform>
790 </soapenv:Body>
791 </soapenv:Envelope>
792 '''
793
794 server = ServerBase(self.app)
795
796 ctx = MethodContext(server, MethodContext.SERVER)
797 ctx.in_string = [cpe_string]
798 ctx, = server.generate_contexts(ctx)
799
800 if ctx.in_error is not None:
801 print('In error: %s' % ctx.in_error)
802 self.assertEqual(ctx.in_error, None)
803
804 server.get_in_object(ctx)
805
806
807class XmlTree():
808
809 @staticmethod
810 def convert_string_to_tree(xmlString):
811
812 return ET.fromstring(xmlString)
813
814 @staticmethod
815 def get_ns(xmlString):
816 return ET.fromstring(xmlString).nsmap
817
818 def xml_compare(self, x1, x2, excludes=None):
819 """
820 Compares two xml etrees
821 :param x1: the first tree
822 :param x2: the second tree
823 :param excludes: list of string of attributes to exclude from comparison
824 :return:
825 True if both files match
826 """
827 excludes = [] if excludes is None else excludes
828
829 if x1.tag != x2.tag:
830 print('Tags do not match: %s and %s' % (x1.tag, x2.tag))
831 return False
832 for name, value in x1.attrib.items():
833 if name not in excludes:
834 if x2.attrib.get(name) != value:
835 print(
836 'Attributes do not match: %s=%r, %s=%r'
837 % (name, value, name, x2.attrib.get(name)),
838 )
839 return False
840 for name in x2.attrib.keys():
841 if name not in excludes:
842 if name not in x1.attrib:
843 print(
844 'x2 has an attribute x1 is missing: %s'
845 % name,
846 )
847 return False
848 if not self.text_compare(x1.text, x2.text):
849 print('text: %r != %r' % (x1.text, x2.text))
850 return False
851 if not self.text_compare(x1.tail, x2.tail):
852 print('tail: %r != %r' % (x1.tail, x2.tail))
853 return False
854 cl1 = x1.getchildren()
855 cl2 = x2.getchildren()
856 if len(cl1) != len(cl2):
857 print(
858 'children length differs, %i != %i'
859 % (len(cl1), len(cl2)),
860 )
861 return False
862 i = 0
863 for c1, c2 in zip(cl1, cl2):
864 i += 1
865 if c1.tag not in excludes:
866 if not self.xml_compare(c1, c2, excludes):
867 print(
868 'children %i do not match: %s'
869 % (i, c1.tag),
870 )
871 return False
872 return True
873
874 def text_compare(self, t1, t2):
875 """
876 Compare two text strings
877 :param t1: text one
878 :param t2: text two
879 :return:
880 True if a match
881 """
882 if not t1 and not t2:
883 return True
884 if t1 == '*' or t2 == '*':
885 return True
886 return (t1 or '').strip() == (t2 or '').strip()