Khen Nursimulu | 3869d8d | 2016-11-28 20:44:28 -0500 | [diff] [blame] | 1 | # -*- coding: utf-8 -*-# |
| 2 | # |
| 3 | # March 31 2015, Christian Hopps <chopps@gmail.com> |
| 4 | # |
| 5 | # Copyright (c) 2015, Deutsche Telekom AG |
| 6 | # |
| 7 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 8 | # you may not use this file except in compliance with the License. |
| 9 | # You may obtain a copy of the License at |
| 10 | # |
| 11 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 12 | # |
| 13 | # Unless required by applicable law or agreed to in writing, software |
| 14 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 15 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 16 | # See the License for the specific language governing permissions and |
| 17 | # limitations under the License. |
| 18 | # |
| 19 | from __future__ import absolute_import, division, unicode_literals, \ |
| 20 | print_function, nested_scopes |
| 21 | from netconf import NSMAP |
| 22 | import copy |
| 23 | from lxml import etree |
| 24 | |
| 25 | |
| 26 | # Tries to somewhat implement RFC6241 filtering |
| 27 | |
| 28 | |
| 29 | def qname(tag): |
| 30 | try: |
| 31 | return etree.QName(tag) |
| 32 | except ValueError: |
| 33 | prefix, base = tag.split(":") |
| 34 | return etree.QName(NSMAP[prefix], base) |
| 35 | |
| 36 | |
| 37 | def elm(tag, attrib=None, **extra): |
| 38 | if attrib is None: |
| 39 | attrib = dict() |
| 40 | return etree.Element(qname(tag), attrib, **extra) |
| 41 | |
| 42 | |
| 43 | def leaf_elm(tag, value, attrib=None, **extra): |
| 44 | e = elm(tag, attrib, **extra) |
| 45 | e.text = str(value) |
| 46 | return e |
| 47 | |
| 48 | |
| 49 | leaf = leaf_elm |
| 50 | |
| 51 | |
| 52 | def subelm(pelm, tag, attrib=None, **extra): |
| 53 | if attrib is None: |
| 54 | attrib = dict() |
| 55 | return etree.SubElement(pelm, qname(tag), attrib, **extra) |
| 56 | |
| 57 | |
| 58 | def is_selection_node(felm): |
| 59 | ftext = felm.text |
| 60 | return ftext is None or not ftext.strip() |
| 61 | |
| 62 | |
| 63 | def filter_tag_match(filter_tag, elm_tag): |
| 64 | fqname = etree.QName(filter_tag) |
| 65 | eqname = qname(elm_tag) |
| 66 | if not fqname.namespace: |
| 67 | return fqname.localname == eqname.localname |
| 68 | return fqname == eqname |
| 69 | |
| 70 | |
| 71 | def filter_node_match_no_value(filter_node, match_elm): |
| 72 | # First check to see if tag matches. |
| 73 | if not filter_tag_match(filter_node.tag, match_elm.tag): |
| 74 | return False |
| 75 | |
| 76 | # Next check for attribute matches. |
| 77 | # XXX does this need to filter out namespace attributes? |
| 78 | if filter_node.attrib and filter_node.attrib != match_elm.attrib: |
| 79 | return False |
| 80 | |
| 81 | return True |
| 82 | |
| 83 | |
| 84 | def filter_node_match(filter_node, match_elm): |
| 85 | """Given a filter node element and a nodename and attribute dictionary |
| 86 | return true if the filter element matches the elmname, attributes and value |
| 87 | (if not None). |
| 88 | |
| 89 | The filter element can use a wildcard namespace or a specific namespace |
| 90 | the attributes can be missing from the filter node but otherwise must match |
| 91 | and the value is only checked for a match if it is not None. |
| 92 | """ |
| 93 | if not filter_node_match_no_value(filter_node, match_elm): |
| 94 | return False |
| 95 | |
| 96 | # Finally check for matching value. |
| 97 | ftext = filter_node.text |
| 98 | if ftext is None: |
| 99 | return True |
| 100 | |
| 101 | ftext = ftext.strip() |
| 102 | if not ftext: |
| 103 | return True |
| 104 | |
| 105 | return ftext == match_elm.text |
| 106 | |
| 107 | |
| 108 | def filter_leaf_values(fcontain_elm, dest_node, leaf_elms, append_to): |
| 109 | """Given a containment element (or None) verify that all leaf elements |
| 110 | in leaf_elms either match, have corresponding selection nodes (empty) |
| 111 | or are not present. |
| 112 | |
| 113 | Additionally the correct leaf data will be added to dest_node, and dest_node |
| 114 | will be appended to append_to if append_to is not None. |
| 115 | |
| 116 | The return value with be True, False, or a possibly empty set of selection/containment nodes |
| 117 | The only failing value is False, if True is returned then the caller should include all |
| 118 | containment sibling nodes, otherwise the caller should process the list of containment/selection |
| 119 | nodes. |
| 120 | """ |
| 121 | children = fcontain_elm.getchildren() if fcontain_elm is not None else [] |
| 122 | selected_elms = [] |
| 123 | if not children: |
| 124 | selected_elms = leaf_elms |
| 125 | |
| 126 | # Now look at all the leaf filter selector or match nodes |
| 127 | include_all_leaves = True |
| 128 | othernodes = [] |
| 129 | for felm in children: |
| 130 | fchildren = felm.getchildren() |
| 131 | for lelm in leaf_elms: |
| 132 | if fchildren: |
| 133 | # Verify that this doesn't match a leaf node. |
| 134 | if filter_node_match_no_value(felm, lelm): |
| 135 | # XXX this is an error we should raise some exception. |
| 136 | return False |
| 137 | continue |
| 138 | elif filter_node_match(felm, lelm): |
| 139 | if not felm.text: |
| 140 | # This was a selection node. |
| 141 | include_all_leaves = False |
| 142 | |
| 143 | selected_elms.append(lelm) |
| 144 | break |
| 145 | else: |
| 146 | if fchildren: |
| 147 | # This is OK we verified a containment filter didn't match leaf by getting here. |
| 148 | if felm.text: |
| 149 | # XXX verify that there is no text on this node, report violation? |
| 150 | return False |
| 151 | |
| 152 | # Track selection/filter nodes |
| 153 | include_all_leaves = False |
| 154 | othernodes.append(felm) |
| 155 | elif not felm.text: |
| 156 | # This is OK as it means this is a selection node include it in othernodes |
| 157 | include_all_leaves = False |
| 158 | othernodes.append(felm) |
| 159 | else: |
| 160 | # We've exhausted all leaf elements to match this leaf filter so we failed. |
| 161 | return False |
| 162 | |
| 163 | # Everything matched so add in the leaf data. |
| 164 | if append_to is not None: |
| 165 | append_to.append(dest_node) |
| 166 | |
| 167 | if include_all_leaves: |
| 168 | dest_node.extend(leaf_elms) |
| 169 | else: |
| 170 | dest_node.extend(selected_elms) |
| 171 | |
| 172 | if include_all_leaves: |
| 173 | return True |
| 174 | return othernodes |
| 175 | |
| 176 | |
| 177 | def filter_containment_iter(fcontain_elm, dest_node, containment_nodes, |
| 178 | leaf_elms, append_to): |
| 179 | """Given a containment filter node (or None) verify that all leaf elements |
| 180 | either match, have corresponding selection nodes (empty) or are not present. |
| 181 | |
| 182 | If all leaf criteria are met then the iterator will return a triple of |
| 183 | (new_filter_node, new_dest_node, new_data). new_filter_node corresponds to the |
| 184 | matched containment node which is returned in new_dest_node, and new_data will be |
| 185 | an element corresponding to the passed in dest_node. |
| 186 | |
| 187 | These should be processed by calling filter_containment_iter again. |
| 188 | |
| 189 | Additionally the correct leaf data will be added to dest_node, and dest_node |
| 190 | will be appended to append_to if append_to is not None. |
| 191 | |
| 192 | This implements RFC6241 section 6.2.5 |
| 193 | """ |
| 194 | # No containment node so add everything. |
| 195 | if fcontain_elm is None: |
| 196 | # Add in the leaf data |
| 197 | for e in leaf_elms: |
| 198 | dest_node.append(e) |
| 199 | |
| 200 | # Append the match_node to the data |
| 201 | if append_to is not None: |
| 202 | append_to.append(dest_node) |
| 203 | |
| 204 | for node in containment_nodes: |
| 205 | yield None, copy.copy(node), dest_node |
| 206 | |
| 207 | else: |
| 208 | othernodes = filter_leaf_values(fcontain_elm, dest_node, leaf_elms, |
| 209 | append_to) |
| 210 | if othernodes is False: |
| 211 | # No match |
| 212 | pass |
| 213 | elif othernodes is True: |
| 214 | # All leaf values have matched and have been added and we should include all containers |
| 215 | for node in containment_nodes: |
| 216 | yield None, copy.copy(node), dest_node |
| 217 | else: |
| 218 | for felm in othernodes: |
| 219 | for node in containment_nodes: |
| 220 | if filter_node_match_no_value(felm, node): |
| 221 | yield felm, copy.copy(node), dest_node |
| 222 | |
| 223 | |
| 224 | def filter_leaf_allows_add(filter_elm, tag, data, value): |
| 225 | if filter_leaf_allows(filter_elm, tag, value): |
| 226 | data.append(leaf_elm(tag, value)) |
| 227 | return True |
| 228 | return False |
| 229 | |
| 230 | |
| 231 | def filter_leaf_allows(filter_elm, xpath, value): |
| 232 | """Check the value at the xpath specified leaf matches the value. |
| 233 | |
| 234 | If filter_elm is None then allow. |
| 235 | If there is no xpath element then allow if there are no other children. |
| 236 | XXX what about xpath that has embedded predicates! |
| 237 | perhaps what we want to call this is a normal path not an xpath. |
| 238 | """ |
| 239 | if filter_elm is None: |
| 240 | return True |
| 241 | |
| 242 | # If there are no children then allow everything. |
| 243 | if not filter_elm.getchildren(): |
| 244 | return True |
| 245 | |
| 246 | # No match or multiple matches not allowed for leaf. |
| 247 | flist = filter_elm.xpath(xpath, namespaces=NSMAP) |
| 248 | if not flist or len(flist) > 1: |
| 249 | return False |
| 250 | felm = flist[0] |
| 251 | |
| 252 | # No children for leaf allowed (leaf) |
| 253 | if felm.getchildren(): |
| 254 | return False |
| 255 | |
| 256 | # Allowed if empty or if value matches. |
| 257 | if not felm.text or felm.text == str(value): |
| 258 | return True |
| 259 | |
| 260 | return False |
| 261 | |
| 262 | |
| 263 | def filter_list_iter(filter_list, key_xpath, keys): |
| 264 | """Return key, elm pairs that are allowed by keys using the values found using the given key_xpath""" |
| 265 | # If we have no filter elm then return all keys. |
| 266 | if filter_list is None: |
| 267 | for key in keys: |
| 268 | yield key, None |
| 269 | |
| 270 | try: |
| 271 | # If this an element then make it a list of elements |
| 272 | filter_list.xpath # pylint: disable=W0104 |
| 273 | filter_list = [filter_list] |
| 274 | except AttributeError: |
| 275 | pass |
| 276 | |
| 277 | for filter_elm in filter_list: |
| 278 | filter_elms = [x for x in |
| 279 | filter_elm.xpath(key_xpath, namespaces=NSMAP)] |
| 280 | filter_keys = [x.text for x in filter_elms] |
| 281 | if not filter_keys: |
| 282 | for key in keys: |
| 283 | yield key, filter_elm |
| 284 | else: |
| 285 | # Now walk our keys returning any that are in the filter list. |
| 286 | for key in keys: |
| 287 | if key in filter_keys: |
| 288 | yield key, filter_elm |
| 289 | # try: |
| 290 | # idx = filter_keys.index(str(key)) |
| 291 | # yield key, filter_elm |
| 292 | # except ValueError: |
| 293 | # pass |
| 294 | |
| 295 | |
| 296 | __author__ = 'Christian Hopps' |
| 297 | __date__ = 'March 31 2015' |
| 298 | __version__ = '1.0' |
| 299 | __docformat__ = "restructuredtext en" |