| # SPDX-FileCopyrightText: 2020 The Magma Authors. |
| # SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org> |
| # |
| # SPDX-License-Identifier: BSD-3-Clause |
| |
| from datetime import datetime, timedelta, timezone |
| from unittest import TestCase, mock |
| from unittest.mock import Mock, patch |
| |
| import lxml.etree as ET |
| from tests.test_utils.enb_acs_builder import ( |
| EnodebAcsStateMachineBuilder, |
| ) |
| from tr069 import models |
| from tr069.rpc_methods import AutoConfigServer |
| from tr069.spyne_mods import Tr069Application, Tr069Soap11 |
| from spyne import MethodContext |
| from spyne.server import ServerBase |
| |
| |
| class Tr069Test(TestCase): |
| """ Tests for the TR-069 server """ |
| acs_to_cpe_queue = None |
| cpe_to_acs_queue = None |
| |
| def setUp(self): |
| # Set up the ACS |
| self.enb_acs_manager = EnodebAcsStateMachineBuilder.build_acs_manager() |
| self.handler = EnodebAcsStateMachineBuilder.build_acs_state_machine() |
| AutoConfigServer.set_state_machine_manager(self.enb_acs_manager) |
| |
| def side_effect(*args, **_kwargs): |
| msg = args[1] |
| return msg |
| |
| self.p = patch.object( |
| AutoConfigServer, '_handle_tr069_message', |
| Mock(side_effect=side_effect), |
| ) |
| self.p.start() |
| |
| self.app = Tr069Application( |
| [AutoConfigServer], |
| models.CWMP_NS, |
| in_protocol=Tr069Soap11(validator='soft'), |
| out_protocol=Tr069Soap11(), |
| ) |
| |
| def tearDown(self): |
| self.p.stop() |
| self.handler = None |
| |
| def _get_mconfig(self): |
| return { |
| "@type": "type.googleapis.com/magma.mconfig.EnodebD", |
| "bandwidthMhz": 20, |
| "specialSubframePattern": 7, |
| "earfcndl": 44490, |
| "logLevel": "INFO", |
| "plmnidList": "00101", |
| "pci": 260, |
| "allowEnodebTransmit": False, |
| "subframeAssignment": 2, |
| "tac": 1, |
| }, |
| |
| def _get_service_config(self): |
| return { |
| "tr069": { |
| "interface": "eth1", |
| "port": 48080, |
| "perf_mgmt_port": 8081, |
| "public_ip": "192.88.99.142", |
| }, |
| "reboot_enodeb_on_mme_disconnected": True, |
| "s1_interface": "eth1", |
| } |
| |
| def test_acs_manager_exception(self): |
| """ |
| Test that an unexpected exception from the ACS SM manager will result |
| in an empty response. |
| """ |
| self.enb_acs_manager.handle_tr069_message = mock.MagicMock( |
| side_effect=Exception('mock exception'), |
| ) |
| # stop the patcher because we want to use the above MagicMock |
| self.p.stop() |
| server = ServerBase(self.app) |
| |
| ctx = MethodContext(server, MethodContext.SERVER) |
| ctx.in_string = [b''] |
| ctx, = server.generate_contexts(ctx) |
| |
| server.get_in_object(ctx) |
| self.assertIsNone(ctx.in_error) |
| |
| server.get_out_object(ctx) |
| self.assertIsNone(ctx.out_error) |
| |
| server.get_out_string(ctx) |
| self.assertEqual(b''.join(ctx.out_string), b'') |
| |
| # start the patcher otherwise the p.stop() in tearDown will complain |
| self.p.start() |
| |
| def test_parse_inform(self): |
| """ |
| Test that example Inform RPC call can be parsed correctly |
| """ |
| # Example TR-069 CPE->ACS RPC call. Copied from: |
| # http://djuro82.blogspot.com/2011/05/tr-069-cpe-provisioning.html |
| cpe_string = b''' |
| <soapenv:Envelope soapenv:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:soap="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cwmp="urn:dslforum-org:cwmp-1-0"> |
| <soapenv:Header> |
| <cwmp:ID soapenv:mustUnderstand="1">0_THOM_TR69_ID</cwmp:ID> |
| </soapenv:Header> |
| <soapenv:Body> |
| <cwmp:Inform> |
| <DeviceId> |
| <Manufacturer>THOMSON</Manufacturer> |
| <OUI>00147F</OUI> |
| <ProductClass>SpeedTouch 780</ProductClass> |
| <SerialNumber>CP0611JTLNW</SerialNumber> |
| </DeviceId> |
| <Event soap:arrayType="cwmp:EventStruct[04]"> |
| <EventStruct> |
| <EventCode>0 BOOTSTRAP</EventCode> |
| <CommandKey></CommandKey> |
| </EventStruct> |
| <EventStruct> |
| <EventCode>1 BOOT</EventCode> |
| <CommandKey></CommandKey> |
| </EventStruct> |
| <EventStruct> |
| <EventCode>2 PERIODIC</EventCode> |
| <CommandKey></CommandKey> |
| </EventStruct> |
| <EventStruct> |
| <EventCode>4 VALUE CHANGE</EventCode> |
| <CommandKey></CommandKey> |
| </EventStruct> |
| </Event> |
| <MaxEnvelopes>2</MaxEnvelopes> |
| <CurrentTime>1970-01-01T00:01:09Z</CurrentTime> |
| <RetryCount>05</RetryCount> |
| <ParameterList soap:arrayType="cwmp:ParameterValueStruct[12]"> |
| <ParameterValueStruct> |
| <Name>InternetGatewayDevice.DeviceSummary</Name> |
| <Value xsi:type="xsd:string"> |
| InternetGatewayDevice:1.1[] (Baseline:1, EthernetLAN:1, ADSLWAN:1, Bridging:1, Time:1, WiFiLAN:1)</Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>InternetGatewayDevice.DeviceInfo.SpecVersion</Name> |
| <Value xsi:type="xsd:string">1.1</Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>InternetGatewayDevice.DeviceInfo.HardwareVersion</Name> |
| <Value xsi:type="xsd:string">BANT-R</Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>InternetGatewayDevice.DeviceInfo.SoftwareVersion</Name> |
| <Value xsi:type="xsd:string">6.2.35.0</Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>InternetGatewayDevice.DeviceInfo.ProvisioningCode</Name> |
| <Value xsi:type="xsd:string"></Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>InternetGatewayDevice.DeviceInfo.VendorConfigFile.1.Name</Name> |
| <Value xsi:type="xsd:string">MyCompanyName</Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>InternetGatewayDevice.DeviceInfo.VendorConfigFile.1.Version</Name> |
| <Value xsi:type="xsd:string"></Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>InternetGatewayDevice.DeviceInfo.VendorConfigFile.1.Date</Name> |
| <Value xsi:type="xsd:dateTime">0001-01-01T00:00:00</Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>InternetGatewayDevice.DeviceInfo.VendorConfigFile.1.Description</Name> |
| <Value xsi:type="xsd:string">MyCompanyName</Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>InternetGatewayDevice.ManagementServer.ConnectionRequestURL</Name> |
| <Value xsi:type="xsd:string">http://10.127.129.205:51005/</Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>InternetGatewayDevice.ManagementServer.ParameterKey</Name> |
| <Value xsi:type="xsd:string"></Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>InternetGatewayDevice.WANDevice.1.WANConnectionDevice.1.WANIPConnection.1.ExternalIPAddress</Name> |
| <Value xsi:type="xsd:string">10.127.129.205</Value> |
| </ParameterValueStruct> |
| </ParameterList> |
| </cwmp:Inform> |
| </soapenv:Body> |
| </soapenv:Envelope> |
| ''' |
| |
| server = ServerBase(self.app) |
| |
| ctx = MethodContext(server, MethodContext.SERVER) |
| ctx.in_string = [cpe_string] |
| ctx, = server.generate_contexts(ctx) |
| |
| if ctx.in_error is not None: |
| print('In error: %s' % ctx.in_error) |
| self.assertEqual(ctx.in_error, None) |
| |
| server.get_in_object(ctx) |
| |
| self.assertEqual(ctx.in_object.DeviceId.OUI, '00147F') |
| self.assertEqual( |
| ctx.in_object.Event.EventStruct[0].EventCode, '0 BOOTSTRAP', |
| ) |
| self.assertEqual( |
| ctx.in_object.Event.EventStruct[2].EventCode, '2 PERIODIC', |
| ) |
| self.assertEqual(ctx.in_object.MaxEnvelopes, 2) |
| self.assertEqual( |
| ctx.in_object.ParameterList.ParameterValueStruct[1].Name, |
| 'InternetGatewayDevice.DeviceInfo.SpecVersion', |
| ) |
| self.assertEqual( |
| str(ctx.in_object.ParameterList.ParameterValueStruct[1].Value), '1.1', |
| ) |
| |
| def test_parse_inform_cavium(self): |
| """ |
| Test that example Inform RPC call can be parsed correctly from OC-LTE |
| """ |
| cpe_string = b'''<?xml version="1.0" encoding="UTF-8"?> |
| <SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/" xmlns:SOAP-ENC="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0"> |
| <SOAP-ENV:Header> |
| <cwmp:ID SOAP-ENV:mustUnderstand="1">CPE_1002</cwmp:ID> |
| </SOAP-ENV:Header> |
| <SOAP-ENV:Body SOAP-ENV:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"> |
| <cwmp:Inform> |
| <DeviceId> |
| <Manufacturer>Cavium, Inc.</Manufacturer> |
| <OUI>000FB7</OUI> |
| <ProductClass>Cavium eNB</ProductClass> |
| <SerialNumber>10.18.104.79</SerialNumber> |
| </DeviceId> |
| <Event xsi:type="SOAP-ENC:Array" SOAP-ENC:arrayType="cwmp:EventStruct[1]"> |
| <EventStruct> |
| <EventCode>0 BOOTSTRAP</EventCode> |
| <CommandKey></CommandKey> |
| </EventStruct> |
| </Event> |
| <MaxEnvelopes>1</MaxEnvelopes> |
| <CurrentTime>1970-01-02T00:01:05.021239+00:00</CurrentTime> |
| <RetryCount>2</RetryCount> |
| <ParameterList xsi:type="SOAP-ENC:Array" SOAP-ENC:arrayType="cwmp:ParameterValueStruct[15]"> |
| <ParameterValueStruct> |
| <Name>Device.DeviceInfo.HardwareVersion</Name> |
| <Value xsi:type="xsd:string">1.0</Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>Device.DeviceInfo.SoftwareVersion</Name> |
| <Value xsi:type="xsd:string">1.0</Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>Device.DeviceInfo.AdditionalHardwareVersion</Name> |
| <Value xsi:type="xsd:string">1.0</Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>Device.DeviceInfo.AdditionalSoftwareVersion</Name> |
| <Value xsi:type="xsd:string">1.0</Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>Device.DeviceInfo.ProvisioningCode</Name> |
| <Value xsi:type="xsd:string">Cavium</Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>Device.ManagementServer.ParameterKey</Name> |
| <Value xsi:type="xsd:string"></Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>Device.ManagementServer.ConnectionRequestURL</Name> |
| <Value xsi:type="xsd:string">http://192.88.99.253:8084/bucrhzjd</Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>Device.ManagementServer.UDPConnectionRequestAddress</Name> |
| <Value xsi:type="xsd:string"></Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>Device.ManagementServer.NATDetected</Name> |
| <Value xsi:type="xsd:boolean">0</Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>Device.IP.Diagnostics.UDPEchoConfig.PacketsReceived</Name> |
| <Value xsi:type="xsd:unsignedInt">0</Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>Device.IP.Diagnostics.UDPEchoConfig.PacketsResponded</Name> |
| <Value xsi:type="xsd:unsignedInt">0</Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>Device.IP.Diagnostics.UDPEchoConfig.BytesReceived</Name> |
| <Value xsi:type="xsd:unsignedInt">0</Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>Device.IP.Diagnostics.UDPEchoConfig.BytesResponded</Name> |
| <Value xsi:type="xsd:unsignedInt">0</Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>Device.IP.Diagnostics.UDPEchoConfig.TimeFirstPacketReceived</Name> |
| <Value xsi:type="xsd:dateTime">1969-12-31T16:00:00.000000+00:00</Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>Device.IP.Diagnostics.UDPEchoConfig.TimeLastPacketReceived</Name> |
| <Value xsi:type="xsd:dateTime">1969-12-31T16:00:00.000000+00:00</Value> |
| </ParameterValueStruct> |
| </ParameterList> |
| </cwmp:Inform> |
| </SOAP-ENV:Body> |
| </SOAP-ENV:Envelope> |
| ''' |
| |
| server = ServerBase(self.app) |
| |
| ctx = MethodContext(server, MethodContext.SERVER) |
| ctx.in_string = [cpe_string] |
| ctx, = server.generate_contexts(ctx) |
| |
| if ctx.in_error is not None: |
| print('In error: %s' % ctx.in_error) |
| self.assertEqual(ctx.in_error, None) |
| |
| server.get_in_object(ctx) |
| |
| self.assertEqual(ctx.in_object.DeviceId.OUI, '000FB7') |
| self.assertEqual( |
| ctx.in_object.Event.EventStruct[0].EventCode, '0 BOOTSTRAP', |
| ) |
| self.assertEqual(ctx.in_object.MaxEnvelopes, 1) |
| self.assertEqual( |
| ctx.in_object.ParameterList.ParameterValueStruct[1].Name, |
| 'Device.DeviceInfo.SoftwareVersion', |
| ) |
| self.assertEqual( |
| str(ctx.in_object.ParameterList.ParameterValueStruct[1].Value), '1.0', |
| ) |
| |
| def test_handle_transfer_complete(self): |
| """ |
| Test that example TransferComplete RPC call can be parsed correctly, and |
| response is correctly generated. |
| """ |
| # Example TransferComplete CPE->ACS RPC request/response. |
| # Manually created. |
| cpe_string = b''' |
| <soapenv:Envelope soapenv:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:soap="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cwmp="urn:dslforum-org:cwmp-1-0"> |
| <soapenv:Header> |
| <cwmp:ID soapenv:mustUnderstand="1">1234</cwmp:ID> |
| </soapenv:Header> |
| <soapenv:Body> |
| <cwmp:TransferComplete> |
| <CommandKey>Downloading stuff</CommandKey> |
| <FaultStruct> |
| <FaultCode>0</FaultCode> |
| <FaultString></FaultString> |
| </FaultStruct> |
| <StartTime>2016-11-30T10:16:29Z</StartTime> |
| <CompleteTime>2016-11-30T10:17:05Z</CompleteTime> |
| </cwmp:TransferComplete> |
| </soapenv:Body> |
| </soapenv:Envelope> |
| ''' |
| expected_acs_string = b''' |
| <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:soap="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cwmp="urn:dslforum-org:cwmp-1-0"> |
| <soapenv:Header> |
| <cwmp:ID soapenv:mustUnderstand="1">1234</cwmp:ID> |
| </soapenv:Header> |
| <soapenv:Body> |
| <cwmp:TransferCompleteResponse> |
| </cwmp:TransferCompleteResponse> |
| </soapenv:Body> |
| </soapenv:Envelope> |
| ''' |
| |
| self.p.stop() |
| self.p.start() |
| |
| server = ServerBase(self.app) |
| |
| ctx = MethodContext(server, MethodContext.SERVER) |
| ctx.in_string = [cpe_string] |
| ctx, = server.generate_contexts(ctx) |
| |
| if ctx.in_error is not None: |
| print('In error: %s' % ctx.in_error) |
| self.assertEqual(ctx.in_error, None) |
| |
| server.get_in_object(ctx) |
| self.assertEqual(ctx.in_error, None) |
| |
| server.get_out_object(ctx) |
| self.assertEqual(ctx.out_error, None) |
| |
| output_msg = ctx.out_object[0] |
| self.assertEqual(type(output_msg), models.TransferComplete) |
| self.assertEqual(output_msg.CommandKey, 'Downloading stuff') |
| self.assertEqual(output_msg.FaultStruct.FaultCode, 0) |
| self.assertEqual(output_msg.FaultStruct.FaultString, '') |
| self.assertEqual( |
| output_msg.StartTime, |
| datetime( |
| 2016, 11, 30, 10, 16, 29, |
| tzinfo=timezone(timedelta(0)), |
| ), |
| ) |
| self.assertEqual( |
| output_msg.CompleteTime, |
| datetime( |
| 2016, 11, 30, 10, 17, 5, |
| tzinfo=timezone(timedelta(0)), |
| ), |
| ) |
| |
| server.get_out_string(ctx) |
| self.assertEqual(ctx.out_error, None) |
| |
| xml_tree = XmlTree() |
| match = xml_tree.xml_compare( |
| xml_tree.convert_string_to_tree(b''.join(ctx.out_string)), |
| xml_tree.convert_string_to_tree(expected_acs_string), |
| ) |
| self.assertTrue(match) |
| |
| def test_parse_empty_http(self): |
| """ |
| Test that empty HTTP message gets correctly mapped to 'EmptyHttp' |
| function call |
| """ |
| cpe_string = b'' |
| |
| server = ServerBase(self.app) |
| |
| ctx = MethodContext(server, MethodContext.SERVER) |
| ctx.in_string = [cpe_string] |
| ctx, = server.generate_contexts(ctx) |
| |
| if ctx.in_error is not None: |
| print('In error: %s' % ctx.in_error) |
| |
| self.assertEqual(ctx.in_error, None) |
| self.assertEqual(ctx.function, AutoConfigServer.empty_http) |
| |
| def test_generate_empty_http(self): |
| """ |
| Test that empty HTTP message is generated when setting output message |
| name to 'EmptyHttp' |
| """ |
| cpe_string = b'' |
| |
| server = ServerBase(self.app) |
| |
| ctx = MethodContext(server, MethodContext.SERVER) |
| ctx.in_string = [cpe_string] |
| ctx, = server.generate_contexts(ctx) |
| |
| server.get_in_object(ctx) |
| if ctx.in_error is not None: |
| raise ctx.in_error |
| |
| server.get_out_object(ctx) |
| if ctx.out_error is not None: |
| raise ctx.out_error |
| |
| ctx.descriptor.out_message.Attributes.sub_name = 'EmptyHttp' |
| ctx.out_object = [models.AcsToCpeRequests()] |
| |
| server.get_out_string(ctx) |
| |
| self.assertEqual(b''.join(ctx.out_string), b'') |
| |
| def test_generate_get_parameter_values_string(self): |
| """ |
| Test that correct string is generated for SetParameterValues ACS->CPE |
| request |
| """ |
| # Example ACS->CPE RPC call. Copied from: |
| # http://djuro82.blogspot.com/2011/05/tr-069-cpe-provisioning.html |
| # Following edits made: |
| # - Change header ID value from 'null0' to 'null', to match magma |
| # default ID |
| expected_acs_string = b''' |
| <soapenv:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> |
| <soapenv:Header> |
| <cwmp:ID soapenv:mustUnderstand="1">null</cwmp:ID> |
| </soapenv:Header> |
| <soapenv:Body> |
| <cwmp:GetParameterValues> |
| <ParameterNames soap:arrayType="xsd:string[1]"> |
| <string>foo</string> |
| </ParameterNames> |
| </cwmp:GetParameterValues> |
| </soapenv:Body> |
| </soapenv:Envelope> |
| ''' |
| |
| names = ['foo'] |
| request = models.GetParameterValues() |
| request.ParameterNames = models.ParameterNames() |
| request.ParameterNames.arrayType = 'xsd:string[%d]' \ |
| % len(names) |
| request.ParameterNames.string = [] |
| for name in names: |
| request.ParameterNames.string.append(name) |
| |
| request.ParameterKey = 'null' |
| |
| def side_effect(*args, **_kwargs): |
| ctx = args[0] |
| ctx.out_header = models.ID(mustUnderstand='1') |
| ctx.out_header.Data = 'null' |
| ctx.descriptor.out_message.Attributes.sub_name = \ |
| request.__class__.__name__ |
| return AutoConfigServer._generate_acs_to_cpe_request_copy(request) |
| |
| self.p.stop() |
| self.p = patch.object( |
| AutoConfigServer, '_handle_tr069_message', |
| side_effect=side_effect, |
| ) |
| self.p.start() |
| |
| server = ServerBase(self.app) |
| |
| ctx = MethodContext(server, MethodContext.SERVER) |
| ctx.in_string = [b''] |
| ctx, = server.generate_contexts(ctx) |
| |
| server.get_in_object(ctx) |
| if ctx.in_error is not None: |
| raise ctx.in_error |
| |
| server.get_out_object(ctx) |
| if ctx.out_error is not None: |
| raise ctx.out_error |
| |
| server.get_out_string(ctx) |
| |
| xml_tree = XmlTree() |
| match = xml_tree.xml_compare( |
| xml_tree.convert_string_to_tree(b''.join(ctx.out_string)), |
| xml_tree.convert_string_to_tree(expected_acs_string), |
| ) |
| self.assertTrue(match) |
| |
| def test_generate_set_parameter_values_string(self): |
| """ |
| Test that correct string is generated for SetParameterValues ACS->CPE |
| request |
| """ |
| # Example ACS->CPE RPC call. Copied from: |
| # http://djuro82.blogspot.com/2011/05/tr-069-cpe-provisioning.html |
| # Following edits made: |
| # - Change header ID value from 'null0' to 'null', to match magma |
| # default ID |
| expected_acs_string = b''' |
| <soapenv:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> |
| <soapenv:Header> |
| <cwmp:ID soapenv:mustUnderstand="1">null</cwmp:ID> |
| </soapenv:Header> |
| <soapenv:Body> |
| <cwmp:SetParameterValues> |
| <ParameterList soap:arrayType="cwmp:ParameterValueStruct[4]"> |
| <ParameterValueStruct> |
| <Name>InternetGatewayDevice.ManagementServer.PeriodicInformEnable</Name> |
| <Value xsi:type="xsd:boolean">1</Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>InternetGatewayDevice.ManagementServer.ConnectionRequestUsername</Name> |
| <Value xsi:type="xsd:string">00147F-SpeedTouch780-CP0611JTLNW</Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>InternetGatewayDevice.ManagementServer.ConnectionRequestPassword</Name> |
| <Value xsi:type="xsd:string">98ff55fb377bf724c625f60dec448646</Value> |
| </ParameterValueStruct> |
| <ParameterValueStruct> |
| <Name>InternetGatewayDevice.ManagementServer.PeriodicInformInterval</Name> |
| <Value xsi:type="xsd:unsignedInt">60</Value> |
| </ParameterValueStruct> |
| </ParameterList> |
| <ParameterKey xsi:type="xsd:string">SetParameter1</ParameterKey> |
| </cwmp:SetParameterValues> |
| </soapenv:Body> |
| </soapenv:Envelope> |
| ''' |
| |
| request = models.SetParameterValues() |
| |
| request.ParameterList = \ |
| models.ParameterValueList(arrayType='cwmp:ParameterValueStruct[4]') |
| request.ParameterList.ParameterValueStruct = [] |
| |
| param = models.ParameterValueStruct() |
| param.Name = 'InternetGatewayDevice.ManagementServer.PeriodicInformEnable' |
| param.Value = models.anySimpleType(type='xsd:boolean') |
| param.Value.Data = '1' |
| request.ParameterList.ParameterValueStruct.append(param) |
| |
| param = models.ParameterValueStruct() |
| param.Name = 'InternetGatewayDevice.ManagementServer.ConnectionRequestUsername' |
| param.Value = models.anySimpleType(type='xsd:string') |
| param.Value.Data = '00147F-SpeedTouch780-CP0611JTLNW' |
| request.ParameterList.ParameterValueStruct.append(param) |
| |
| param = models.ParameterValueStruct() |
| param.Name = 'InternetGatewayDevice.ManagementServer.ConnectionRequestPassword' |
| param.Value = models.anySimpleType(type='xsd:string') |
| param.Value.Data = '98ff55fb377bf724c625f60dec448646' |
| request.ParameterList.ParameterValueStruct.append(param) |
| |
| param = models.ParameterValueStruct() |
| param.Name = 'InternetGatewayDevice.ManagementServer.PeriodicInformInterval' |
| param.Value = models.anySimpleType(type='xsd:unsignedInt') |
| param.Value.Data = '60' |
| request.ParameterList.ParameterValueStruct.append(param) |
| |
| request.ParameterKey = models.ParameterKeyType() |
| request.ParameterKey.type = 'xsd:string' |
| request.ParameterKey.Data = 'SetParameter1' |
| |
| def side_effect(*args, **_kwargs): |
| ctx = args[0] |
| ctx.out_header = models.ID(mustUnderstand='1') |
| ctx.out_header.Data = 'null' |
| ctx.descriptor.out_message.Attributes.sub_name = request.__class__.__name__ |
| return request |
| |
| self.p.stop() |
| self.p = patch.object( |
| AutoConfigServer, '_handle_tr069_message', |
| Mock(side_effect=side_effect), |
| ) |
| self.p.start() |
| |
| server = ServerBase(self.app) |
| |
| ctx = MethodContext(server, MethodContext.SERVER) |
| ctx.in_string = [b''] |
| ctx, = server.generate_contexts(ctx) |
| |
| server.get_in_object(ctx) |
| if ctx.in_error is not None: |
| raise ctx.in_error |
| |
| server.get_out_object(ctx) |
| if ctx.out_error is not None: |
| raise ctx.out_error |
| |
| server.get_out_string(ctx) |
| |
| xml_tree = XmlTree() |
| NS_SOAP11_ENC = 'soap11enc' |
| NS_SOAP11_ENV = 'soap11env' |
| xml_str = b''.join(ctx.out_string) |
| # Get the namespaces and validate the soap enc and env prefix are right |
| nsmap = xml_tree.get_ns(xml_str) |
| self.assertTrue(NS_SOAP11_ENC in nsmap.keys()) |
| self.assertTrue(NS_SOAP11_ENV in nsmap.keys()) |
| |
| match = xml_tree.xml_compare( |
| xml_tree.convert_string_to_tree(xml_str), |
| xml_tree.convert_string_to_tree(expected_acs_string), |
| ) |
| self.assertTrue(match) |
| |
| def test_parse_fault_response(self): |
| """ Tests that a fault response from CPE is correctly parsed. """ |
| # Example CPE->ACS fault response. Copied from: |
| # http://djuro82.blogspot.com/2011/05/tr-069-cpe-provisioning.html |
| cpe_string = b''' |
| <soapenv:Envelope soapenv:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:soap="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cwmp="urn:dslforum-org:cwmp-1-0"> |
| <soapenv:Header> |
| <cwmp:ID soapenv:mustUnderstand="1">1031422463</cwmp:ID> |
| </soapenv:Header> |
| <soapenv:Body> |
| <soapenv:Fault> |
| <faultcode>Client</faultcode> |
| <faultstring>CWMP fault</faultstring> |
| <detail> |
| <cwmp:Fault> |
| <FaultCode>9003</FaultCode> |
| <FaultString>Invalid arguments</FaultString> |
| <SetParameterValuesFault> |
| <ParameterName>InternetGatewayDevice.WANDevice.1.WANConnectionDevice.3.WANPPPConnection.1.Password</ParameterName> |
| <FaultCode>9003</FaultCode> |
| <FaultString>Invalid arguments</FaultString> |
| </SetParameterValuesFault> |
| <SetParameterValuesFault> |
| <ParameterName>InternetGatewayDevice.WANDevice.1.WANConnectionDevice.3.WANPPPConnection.1.Username</ParameterName> |
| <FaultCode>9003</FaultCode> |
| <FaultString>Invalid arguments</FaultString> |
| </SetParameterValuesFault> |
| </cwmp:Fault> |
| </detail> |
| </soapenv:Fault> |
| </soapenv:Body> |
| </soapenv:Envelope> |
| ''' |
| server = ServerBase(self.app) |
| |
| ctx = MethodContext(server, MethodContext.SERVER) |
| ctx.in_string = [cpe_string] |
| ctx, = server.generate_contexts(ctx) |
| server.get_in_object(ctx) |
| if ctx.in_error is not None: |
| raise ctx.in_error |
| |
| # Calls function to receive and process message |
| server.get_out_object(ctx) |
| |
| output_msg = ctx.out_object[0] |
| self.assertEqual(type(output_msg), models.Fault) |
| self.assertEqual(output_msg.FaultCode, 9003) |
| self.assertEqual(output_msg.FaultString, 'Invalid arguments') |
| self.assertEqual( |
| output_msg.SetParameterValuesFault[1].ParameterName, |
| 'InternetGatewayDevice.WANDevice.1.WANConnectionDevice.3.WANPPPConnection.1.Username', |
| ) |
| self.assertEqual(output_msg.SetParameterValuesFault[1].FaultCode, 9003) |
| self.assertEqual( |
| output_msg.SetParameterValuesFault[1].FaultString, |
| 'Invalid arguments', |
| ) |
| |
| def test_parse_hex_values(self): |
| """ |
| Test that non-utf-8 hex values can be parsed without error |
| """ |
| # Example TR-069 CPE->ACS RPC call. Copied from: |
| # http://djuro82.blogspot.com/2011/05/tr-069-cpe-provisioning.html |
| cpe_string = b''' |
| <soapenv:Envelope soapenv:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:soap="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cwmp="urn:dslforum-org:cwmp-1-0"> |
| <soapenv:Header> |
| <cwmp:ID soapenv:mustUnderstand="1">0_THOM_TR69_ID</cwmp:ID> |
| </soapenv:Header> |
| <soapenv:Body> |
| <cwmp:Inform> |
| <DeviceId> |
| <Manufacturer>THOMSON</Manufacturer> |
| <OUI>00147F</OUI> |
| <ProductClass>SpeedTouch 780</ProductClass> |
| <SerialNumber>CP0611JTLNW</SerialNumber> |
| </DeviceId> |
| <Event soap:arrayType="cwmp:EventStruct[04]"> |
| <EventStruct> |
| <EventCode>0 BOOTSTRAP</EventCode> |
| <CommandKey></CommandKey> |
| </EventStruct> |
| <EventStruct> |
| <EventCode>1 BOOT</EventCode> |
| <CommandKey></CommandKey> |
| </EventStruct> |
| <EventStruct> |
| <EventCode>2 PERIODIC</EventCode> |
| <CommandKey></CommandKey> |
| </EventStruct> |
| <EventStruct> |
| <EventCode>4 VALUE CHANGE</EventCode> |
| <CommandKey></CommandKey> |
| </EventStruct> |
| </Event> |
| <MaxEnvelopes>2</MaxEnvelopes> |
| <CurrentTime>1970-01-01T00:01:09Z</CurrentTime> |
| <RetryCount>05</RetryCount> |
| <ParameterList soap:arrayType="cwmp:ParameterValueStruct[12]"> |
| <ParameterValueStruct> |
| <Name>InternetGatewayDevice.DeviceSummary</Name> |
| <Value xsi:type="xsd:string"> |
| \xff\xff\xff\xff\xff</Value> |
| </ParameterValueStruct> |
| </ParameterList> |
| </cwmp:Inform> |
| </soapenv:Body> |
| </soapenv:Envelope> |
| ''' |
| |
| server = ServerBase(self.app) |
| |
| ctx = MethodContext(server, MethodContext.SERVER) |
| ctx.in_string = [cpe_string] |
| ctx, = server.generate_contexts(ctx) |
| |
| if ctx.in_error is not None: |
| print('In error: %s' % ctx.in_error) |
| self.assertEqual(ctx.in_error, None) |
| |
| server.get_in_object(ctx) |
| |
| |
| class XmlTree(): |
| |
| @staticmethod |
| def convert_string_to_tree(xmlString): |
| |
| return ET.fromstring(xmlString) |
| |
| @staticmethod |
| def get_ns(xmlString): |
| return ET.fromstring(xmlString).nsmap |
| |
| def xml_compare(self, x1, x2, excludes=None): |
| """ |
| Compares two xml etrees |
| :param x1: the first tree |
| :param x2: the second tree |
| :param excludes: list of string of attributes to exclude from comparison |
| :return: |
| True if both files match |
| """ |
| excludes = [] if excludes is None else excludes |
| |
| if x1.tag != x2.tag: |
| print('Tags do not match: %s and %s' % (x1.tag, x2.tag)) |
| return False |
| for name, value in x1.attrib.items(): |
| if name not in excludes: |
| if x2.attrib.get(name) != value: |
| print( |
| 'Attributes do not match: %s=%r, %s=%r' |
| % (name, value, name, x2.attrib.get(name)), |
| ) |
| return False |
| for name in x2.attrib.keys(): |
| if name not in excludes: |
| if name not in x1.attrib: |
| print( |
| 'x2 has an attribute x1 is missing: %s' |
| % name, |
| ) |
| return False |
| if not self.text_compare(x1.text, x2.text): |
| print('text: %r != %r' % (x1.text, x2.text)) |
| return False |
| if not self.text_compare(x1.tail, x2.tail): |
| print('tail: %r != %r' % (x1.tail, x2.tail)) |
| return False |
| cl1 = x1.getchildren() |
| cl2 = x2.getchildren() |
| if len(cl1) != len(cl2): |
| print( |
| 'children length differs, %i != %i' |
| % (len(cl1), len(cl2)), |
| ) |
| return False |
| i = 0 |
| for c1, c2 in zip(cl1, cl2): |
| i += 1 |
| if c1.tag not in excludes: |
| if not self.xml_compare(c1, c2, excludes): |
| print( |
| 'children %i do not match: %s' |
| % (i, c1.tag), |
| ) |
| return False |
| return True |
| |
| def text_compare(self, t1, t2): |
| """ |
| Compare two text strings |
| :param t1: text one |
| :param t2: text two |
| :return: |
| True if a match |
| """ |
| if not t1 and not t2: |
| return True |
| if t1 == '*' or t2 == '*': |
| return True |
| return (t1 or '').strip() == (t2 or '').strip() |