blob: 0f0e918b3bab567e904a2492481065ad80a9b6b7 [file] [log] [blame]
Wei-Yu Chen49950b92021-11-08 19:19:18 +08001"""
2Copyright 2020 The Magma Authors.
3
4This source code is licensed under the BSD-style license found in the
5LICENSE file in the root directory of this source tree.
6
7Unless required by applicable law or agreed to in writing, software
8distributed under the License is distributed on an "AS IS" BASIS,
9WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10See the License for the specific language governing permissions and
11limitations under the License.
12
13This file contains modifications of the core spyne functionality. This is done
14using child classes and function override to avoid modifying spyne code itself.
15Each function below is a modified version of the parent function. These
16modifications are required because:
171) Spyne is not fully python3-compliant
182) Not all parts of the TR-069 spec are possible through spyne APIs (e.g RPC
19 calls from server to client in HTTP responses)
203) Minor enhancements for debug-ability
21"""
22
23from lxml import etree
24from logger import EnodebdLogger as logger
25from spyne.application import Application
26from spyne.interface._base import Interface
27from spyne.protocol.soap import Soap11
28from spyne.protocol.xml import XmlDocument
29
30
31class Tr069Interface(Interface):
32 """ Modified base interface class. """
33
34 def reset_interface(self):
35 super(Tr069Interface, self).reset_interface()
36 # Replace default namespace prefix (may not strictly be
37 # required, but makes it easier to debug)
38 del self.nsmap['tns']
39 self.nsmap['cwmp'] = self.get_tns()
40 self.prefmap[self.get_tns()] = 'cwmp'
41 # To validate against the xsd:<types>, the namespace
42 # prefix is expected to be the same
43 del self.nsmap['xs']
44 self.nsmap['xsd'] = 'http://www.w3.org/2001/XMLSchema'
45 self.prefmap['http://www.w3.org/2001/XMLSchema'] = 'xsd'
46
47
48class Tr069Application(Application):
49 """ Modified spyne application. """
50
51 def __init__(
52 self, services, tns, name=None, in_protocol=None,
53 out_protocol=None, config=None,
54 ):
55 super(Tr069Application, self).__init__(
56 services, tns, name, in_protocol, out_protocol, config,
57 )
58 # Use modified interface class
59 self.interface = Tr069Interface(self)
60
61
62class Tr069Soap11(Soap11):
63 """ Modified SOAP protocol. """
64
65 def __init__(self, *args, **kwargs):
66 super(Tr069Soap11, self).__init__(*args, **kwargs)
67 # Disabling type resolution as a workaround for
68 # https://github.com/arskom/spyne/issues/567
69 self.parse_xsi_type = False
70 # Bug in spyne is cleaning up the default XSD namespace
71 # and causes validation issues on TR-069 clients
72 self.cleanup_namespaces = False
73
74 def create_in_document(self, ctx, charset=None):
75 """
76 In TR-069, the ACS (e.g Magma) is an HTTP server, but acts as a client
77 for SOAP messages. This is done by the CPE (e.g ENodeB) sending an
78 empty HTTP request, and the ACS responding with a SOAP request in the
79 HTTP response. This code replaces an empty HTTP request with a string
80 that gets decoded to a call to the 'EmptyHttp' RPC .
81 """
82
83 # Try cp437 as default to ensure that we dont get any decoding errors,
84 # since it uses 1-byte encoding and has a 'full' char map
85 if not charset:
86 charset = 'cp437'
87
88 # Convert from generator to bytes before doing comparison
89 # Re-encode to chosen charset to remove invalid characters
90 in_string = b''.join(ctx.in_string).decode(charset, 'ignore')
91 ctx.in_string = [in_string.encode(charset, 'ignore')]
92 if ctx.in_string == [b'']:
93 ctx.in_string = [
94 b'<soap11env:Envelope xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap11env="http://schemas.xmlsoap.org/soap/envelope/">/n'
95 b' <soap11env:Body>/n'
96 b' <cwmp:EmptyHttp/>/n'
97 b' </soap11env:Body>/n'
98 b'</soap11env:Envelope>',
99 ]
100
101 super(Tr069Soap11, self).create_in_document(ctx, charset)
102
103 def decompose_incoming_envelope(self, ctx, message=XmlDocument.REQUEST):
104 """
105 For TR-069, the SOAP fault message (CPE->ACS) contains useful
106 information, and should not result in another fault response (ACS->CPE).
107 Strip the outer SOAP fault structure, so that the CWMP fault structure
108 is treated as a normal RPC call (to the 'Fault' function).
109 """
110 super(Tr069Soap11, self).decompose_incoming_envelope(ctx, message)
111
112 if ctx.in_body_doc.tag == '{%s}Fault' % self.ns_soap_env:
113 faultstring = ctx.in_body_doc.findtext('faultstring')
114 if not faultstring or 'CWMP fault' not in faultstring:
115 # Not a CWMP fault
116 return
117
118 # Strip SOAP fault structure, leaving inner CWMP fault structure
119 detail_elem = ctx.in_body_doc.find('detail')
120 if detail_elem is not None:
121 detail_children = list(detail_elem)
122 if len(detail_children):
123 if len(detail_children) > 1:
124 logger.warning(
125 "Multiple detail elements found in SOAP"
126 " fault - using first one",
127 )
128 ctx.in_body_doc = detail_children[0]
129 ctx.method_request_string = ctx.in_body_doc.tag
130 self.validate_body(ctx, message)
131
132 def get_call_handles(self, ctx):
133 """
134 Modified function to fix bug in receiving SOAP fault. In this case,
135 ctx.method_request_string is None, so 'startswith' errors out.
136 """
137 if ctx.method_request_string is None:
138 return []
139
140 return super(Tr069Soap11, self).get_call_handles(ctx)
141
142 def serialize(self, ctx, message):
143 # Workaround for issue https://github.com/magma/magma/issues/7869
144 # Updates to ctx.descriptor.out_message.Attributes.sub_name are taking
145 # effect on the descriptor. But when puled from _attrcache dictionary,
146 # it still has a stale value.
147 # Force repopulation of dictionary by deleting entry
148 # TODO Remove this code once we have a better fix
149 if (ctx.descriptor and ctx.descriptor.out_message in self._attrcache):
150 del self._attrcache[ctx.descriptor.out_message] # noqa: WPS529
151
152 super(Tr069Soap11, self).serialize(ctx, message)
153
154 # Keep XSD namespace
155 etree.cleanup_namespaces(ctx.out_document, keep_ns_prefixes=['xsd'])