blob: 455a3950593ecb3196c17d88f0fa5c5f0b84b65b [file] [log] [blame]
Khen Nursimulu3869d8d2016-11-28 20:44:28 -05001# -*- coding: utf-8 -*-#
2#
3# March 31 2015, Christian Hopps <chopps@gmail.com>
4#
5# Copyright (c) 2015, Deutsche Telekom AG
6#
7# Licensed under the Apache License, Version 2.0 (the "License");
8# you may not use this file except in compliance with the License.
9# You may obtain a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS,
15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18#
19from __future__ import absolute_import, division, unicode_literals, \
20 print_function, nested_scopes
21from netconf import NSMAP
22import copy
23from lxml import etree
24
25
26# Tries to somewhat implement RFC6241 filtering
27
28
29def qname(tag):
30 try:
31 return etree.QName(tag)
32 except ValueError:
33 prefix, base = tag.split(":")
34 return etree.QName(NSMAP[prefix], base)
35
36
37def elm(tag, attrib=None, **extra):
38 if attrib is None:
39 attrib = dict()
40 return etree.Element(qname(tag), attrib, **extra)
41
42
43def leaf_elm(tag, value, attrib=None, **extra):
44 e = elm(tag, attrib, **extra)
45 e.text = str(value)
46 return e
47
48
49leaf = leaf_elm
50
51
52def subelm(pelm, tag, attrib=None, **extra):
53 if attrib is None:
54 attrib = dict()
55 return etree.SubElement(pelm, qname(tag), attrib, **extra)
56
57
58def is_selection_node(felm):
59 ftext = felm.text
60 return ftext is None or not ftext.strip()
61
62
63def filter_tag_match(filter_tag, elm_tag):
64 fqname = etree.QName(filter_tag)
65 eqname = qname(elm_tag)
66 if not fqname.namespace:
67 return fqname.localname == eqname.localname
68 return fqname == eqname
69
70
71def filter_node_match_no_value(filter_node, match_elm):
72 # First check to see if tag matches.
73 if not filter_tag_match(filter_node.tag, match_elm.tag):
74 return False
75
76 # Next check for attribute matches.
77 # XXX does this need to filter out namespace attributes?
78 if filter_node.attrib and filter_node.attrib != match_elm.attrib:
79 return False
80
81 return True
82
83
84def filter_node_match(filter_node, match_elm):
85 """Given a filter node element and a nodename and attribute dictionary
86 return true if the filter element matches the elmname, attributes and value
87 (if not None).
88
89 The filter element can use a wildcard namespace or a specific namespace
90 the attributes can be missing from the filter node but otherwise must match
91 and the value is only checked for a match if it is not None.
92 """
93 if not filter_node_match_no_value(filter_node, match_elm):
94 return False
95
96 # Finally check for matching value.
97 ftext = filter_node.text
98 if ftext is None:
99 return True
100
101 ftext = ftext.strip()
102 if not ftext:
103 return True
104
105 return ftext == match_elm.text
106
107
108def filter_leaf_values(fcontain_elm, dest_node, leaf_elms, append_to):
109 """Given a containment element (or None) verify that all leaf elements
110 in leaf_elms either match, have corresponding selection nodes (empty)
111 or are not present.
112
113 Additionally the correct leaf data will be added to dest_node, and dest_node
114 will be appended to append_to if append_to is not None.
115
116 The return value with be True, False, or a possibly empty set of selection/containment nodes
117 The only failing value is False, if True is returned then the caller should include all
118 containment sibling nodes, otherwise the caller should process the list of containment/selection
119 nodes.
120 """
121 children = fcontain_elm.getchildren() if fcontain_elm is not None else []
122 selected_elms = []
123 if not children:
124 selected_elms = leaf_elms
125
126 # Now look at all the leaf filter selector or match nodes
127 include_all_leaves = True
128 othernodes = []
129 for felm in children:
130 fchildren = felm.getchildren()
131 for lelm in leaf_elms:
132 if fchildren:
133 # Verify that this doesn't match a leaf node.
134 if filter_node_match_no_value(felm, lelm):
135 # XXX this is an error we should raise some exception.
136 return False
137 continue
138 elif filter_node_match(felm, lelm):
139 if not felm.text:
140 # This was a selection node.
141 include_all_leaves = False
142
143 selected_elms.append(lelm)
144 break
145 else:
146 if fchildren:
147 # This is OK we verified a containment filter didn't match leaf by getting here.
148 if felm.text:
149 # XXX verify that there is no text on this node, report violation?
150 return False
151
152 # Track selection/filter nodes
153 include_all_leaves = False
154 othernodes.append(felm)
155 elif not felm.text:
156 # This is OK as it means this is a selection node include it in othernodes
157 include_all_leaves = False
158 othernodes.append(felm)
159 else:
160 # We've exhausted all leaf elements to match this leaf filter so we failed.
161 return False
162
163 # Everything matched so add in the leaf data.
164 if append_to is not None:
165 append_to.append(dest_node)
166
167 if include_all_leaves:
168 dest_node.extend(leaf_elms)
169 else:
170 dest_node.extend(selected_elms)
171
172 if include_all_leaves:
173 return True
174 return othernodes
175
176
177def filter_containment_iter(fcontain_elm, dest_node, containment_nodes,
178 leaf_elms, append_to):
179 """Given a containment filter node (or None) verify that all leaf elements
180 either match, have corresponding selection nodes (empty) or are not present.
181
182 If all leaf criteria are met then the iterator will return a triple of
183 (new_filter_node, new_dest_node, new_data). new_filter_node corresponds to the
184 matched containment node which is returned in new_dest_node, and new_data will be
185 an element corresponding to the passed in dest_node.
186
187 These should be processed by calling filter_containment_iter again.
188
189 Additionally the correct leaf data will be added to dest_node, and dest_node
190 will be appended to append_to if append_to is not None.
191
192 This implements RFC6241 section 6.2.5
193 """
194 # No containment node so add everything.
195 if fcontain_elm is None:
196 # Add in the leaf data
197 for e in leaf_elms:
198 dest_node.append(e)
199
200 # Append the match_node to the data
201 if append_to is not None:
202 append_to.append(dest_node)
203
204 for node in containment_nodes:
205 yield None, copy.copy(node), dest_node
206
207 else:
208 othernodes = filter_leaf_values(fcontain_elm, dest_node, leaf_elms,
209 append_to)
210 if othernodes is False:
211 # No match
212 pass
213 elif othernodes is True:
214 # All leaf values have matched and have been added and we should include all containers
215 for node in containment_nodes:
216 yield None, copy.copy(node), dest_node
217 else:
218 for felm in othernodes:
219 for node in containment_nodes:
220 if filter_node_match_no_value(felm, node):
221 yield felm, copy.copy(node), dest_node
222
223
224def filter_leaf_allows_add(filter_elm, tag, data, value):
225 if filter_leaf_allows(filter_elm, tag, value):
226 data.append(leaf_elm(tag, value))
227 return True
228 return False
229
230
231def filter_leaf_allows(filter_elm, xpath, value):
232 """Check the value at the xpath specified leaf matches the value.
233
234 If filter_elm is None then allow.
235 If there is no xpath element then allow if there are no other children.
236 XXX what about xpath that has embedded predicates!
237 perhaps what we want to call this is a normal path not an xpath.
238 """
239 if filter_elm is None:
240 return True
241
242 # If there are no children then allow everything.
243 if not filter_elm.getchildren():
244 return True
245
246 # No match or multiple matches not allowed for leaf.
247 flist = filter_elm.xpath(xpath, namespaces=NSMAP)
248 if not flist or len(flist) > 1:
249 return False
250 felm = flist[0]
251
252 # No children for leaf allowed (leaf)
253 if felm.getchildren():
254 return False
255
256 # Allowed if empty or if value matches.
257 if not felm.text or felm.text == str(value):
258 return True
259
260 return False
261
262
263def filter_list_iter(filter_list, key_xpath, keys):
264 """Return key, elm pairs that are allowed by keys using the values found using the given key_xpath"""
265 # If we have no filter elm then return all keys.
266 if filter_list is None:
267 for key in keys:
268 yield key, None
269
270 try:
271 # If this an element then make it a list of elements
272 filter_list.xpath # pylint: disable=W0104
273 filter_list = [filter_list]
274 except AttributeError:
275 pass
276
277 for filter_elm in filter_list:
278 filter_elms = [x for x in
279 filter_elm.xpath(key_xpath, namespaces=NSMAP)]
280 filter_keys = [x.text for x in filter_elms]
281 if not filter_keys:
282 for key in keys:
283 yield key, filter_elm
284 else:
285 # Now walk our keys returning any that are in the filter list.
286 for key in keys:
287 if key in filter_keys:
288 yield key, filter_elm
289 # try:
290 # idx = filter_keys.index(str(key))
291 # yield key, filter_elm
292 # except ValueError:
293 # pass
294
295
296__author__ = 'Christian Hopps'
297__date__ = 'March 31 2015'
298__version__ = '1.0'
299__docformat__ = "restructuredtext en"