# blob: 455a3950593ecb3196c17d88f0fa5c5f0b84b65b [file] [log] [blame]
# -*- coding: utf-8 -*-#
#
# March 31 2015, Christian Hopps <chopps@gmail.com>
#
# Copyright (c) 2015, Deutsche Telekom AG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import, division, unicode_literals, \
print_function, nested_scopes
from netconf import NSMAP
import copy
from lxml import etree
# Tries to somewhat implement RFC6241 filtering
def qname(tag):
    """Return an ``etree.QName`` for `tag`.

    `tag` is first handed to ``etree.QName`` unchanged. If that raises
    ``ValueError`` the tag is split on ``":"`` into ``prefix`` and ``base``
    and the prefix is looked up in ``NSMAP`` to obtain the namespace.
    """
    try:
        resolved = etree.QName(tag)
    except ValueError:
        # Not directly usable as a QName: treat it as "prefix:base".
        prefix, base = tag.split(":")
        resolved = etree.QName(NSMAP[prefix], base)
    return resolved
def elm(tag, attrib=None, **extra):
    """Create a new element named `tag`.

    `tag` is resolved through ``qname``. `attrib` (a fresh empty dict when
    omitted) and the `extra` keyword arguments are passed straight to
    ``etree.Element``.
    """
    attributes = {} if attrib is None else attrib
    return etree.Element(qname(tag), attributes, **extra)
def leaf_elm(tag, value, attrib=None, **extra):
    """Create an element via ``elm`` whose text is ``str(value)``."""
    node = elm(tag, attrib, **extra)
    node.text = str(value)
    return node


# Short alias for leaf_elm.
leaf = leaf_elm
def subelm(pelm, tag, attrib=None, **extra):
    """Create a child element named `tag` under `pelm`.

    `tag` is resolved through ``qname``. `attrib` (a fresh empty dict when
    omitted) and the `extra` keyword arguments are passed straight to
    ``etree.SubElement``.
    """
    attributes = {} if attrib is None else attrib
    return etree.SubElement(pelm, qname(tag), attributes, **extra)
def is_selection_node(felm):
    """Return True when `felm` has no text or only whitespace text."""
    text = felm.text
    if text is None:
        return True
    return not text.strip()
def filter_tag_match(filter_tag, elm_tag):
    """Return True when the filter tag matches the element tag.

    A filter tag without a namespace acts as a namespace wildcard and is
    compared on local name only; otherwise the full qualified names must be
    equal.
    """
    wanted = etree.QName(filter_tag)
    actual = qname(elm_tag)
    if wanted.namespace:
        return wanted == actual
    # No namespace on the filter side: match on the local name alone.
    return wanted.localname == actual.localname
def filter_node_match_no_value(filter_node, match_elm):
    """Return True when `filter_node` matches `match_elm` by tag and attributes.

    The element text is not examined. Attributes are only compared when the
    filter node has some; in that case they must equal those of `match_elm`.
    """
    # The tag has to match before anything else is considered.
    if not filter_tag_match(filter_node.tag, match_elm.tag):
        return False

    # Attributes are optional on the filter side, but must agree if given.
    # XXX does this need to filter out namespace attributes?
    wanted_attrs = filter_node.attrib
    return not (wanted_attrs and wanted_attrs != match_elm.attrib)
def filter_node_match(filter_node, match_elm):
    """Return True when `filter_node` matches `match_elm` by tag, attributes and value.

    Tag and attribute matching is delegated to ``filter_node_match_no_value``
    (wildcard namespace allowed, attributes optional on the filter side).
    The value is only compared when the filter node carries non-whitespace
    text; that text is stripped and compared with ``match_elm.text`` as-is.
    """
    if not filter_node_match_no_value(filter_node, match_elm):
        return False

    # A missing or whitespace-only filter value matches any element value.
    wanted = filter_node.text
    if wanted is not None:
        wanted = wanted.strip()
    if not wanted:
        return True
    return wanted == match_elm.text
def _significant_text(node):
    """Return the stripped text of `node`, or "" when it has no text."""
    text = node.text
    return text.strip() if text else ""


def filter_leaf_values(fcontain_elm, dest_node, leaf_elms, append_to):
    """Check the leaf elements against the children of a containment filter.

    Given a containment element (or None) verify that all leaf elements in
    `leaf_elms` either match, have corresponding selection nodes (empty) or
    are not present.

    Whitespace-only text on a filter node is treated as no text, the same
    way ``filter_node_match`` treats it, so pretty-printed filters behave
    like compact ones.

    On success the selected leaf data is added to `dest_node`, and
    `dest_node` is appended to `append_to` if `append_to` is not None.

    :param fcontain_elm: containment filter element, or None for no filter.
    :param dest_node: node that receives the selected leaf elements.
    :param leaf_elms: the leaf elements available at this level.
    :param append_to: optional parent that `dest_node` is appended to.
    :return: False when the filter does not match (the only failing value);
        True when the caller should include all sibling containment nodes;
        otherwise a possibly empty list of selection/containment filter
        nodes the caller should process.
    """
    children = fcontain_elm.getchildren() if fcontain_elm is not None else []
    selected_elms = []

    # Now look at all the leaf filter selector or match nodes.
    include_all_leaves = True
    othernodes = []
    for felm in children:
        fchildren = felm.getchildren()
        has_value = bool(_significant_text(felm))
        for lelm in leaf_elms:
            if fchildren:
                # Verify that this doesn't match a leaf node.
                if filter_node_match_no_value(felm, lelm):
                    # XXX this is an error we should raise some exception.
                    return False
                continue
            elif filter_node_match(felm, lelm):
                if not has_value:
                    # This was a selection node.
                    include_all_leaves = False
                selected_elms.append(lelm)
                break
        else:
            # No leaf element consumed this filter node.
            if fchildren:
                # This is OK: getting here verified that a containment
                # filter did not match a leaf.
                if has_value:
                    # XXX a containment node must not carry a value,
                    # report violation?
                    return False
                # Track selection/filter nodes.
                include_all_leaves = False
                othernodes.append(felm)
            elif not has_value:
                # A selection node without a matching leaf; hand it back to
                # the caller in othernodes.
                include_all_leaves = False
                othernodes.append(felm)
            else:
                # We've exhausted all leaf elements to match this leaf
                # filter so we failed.
                return False

    # Everything matched so add in the leaf data.
    if append_to is not None:
        append_to.append(dest_node)

    if include_all_leaves:
        dest_node.extend(leaf_elms)
        return True

    dest_node.extend(selected_elms)
    return othernodes
def filter_containment_iter(fcontain_elm, dest_node, containment_nodes,
                            leaf_elms, append_to):
    """Iterate over the containment nodes selected by a containment filter.

    Given a containment filter node (or None) verify that all leaf elements
    either match, have corresponding selection nodes (empty) or are not
    present. The leaf check is done by ``filter_leaf_values``, which also
    adds the leaf data to `dest_node` and appends `dest_node` to `append_to`
    when `append_to` is not None. With no filter (None) all leaves are added
    directly.

    Yields triples ``(filter_node, node_copy, dest_node)`` where
    `filter_node` is the filter element that selected the containment node
    (None when everything is included), `node_copy` is ``copy.copy`` of the
    containment node and `dest_node` is the node passed in. These should be
    processed by calling filter_containment_iter again.

    This implements RFC6241 section 6.2.5
    """
    if fcontain_elm is None:
        # No containment filter: take every leaf and every container.
        for leaf_node in leaf_elms:
            dest_node.append(leaf_node)
        if append_to is not None:
            append_to.append(dest_node)
        remaining = True
    else:
        remaining = filter_leaf_values(fcontain_elm, dest_node, leaf_elms,
                                       append_to)

    if remaining is False:
        # The leaf criteria were not met, nothing to yield.
        return

    if remaining is True:
        # All leaf values are in place and every container is wanted.
        for node in containment_nodes:
            yield None, copy.copy(node), dest_node
        return

    # Only containers picked out by a selection/containment filter node.
    for felm in remaining:
        for node in containment_nodes:
            if filter_node_match_no_value(felm, node):
                yield felm, copy.copy(node), dest_node
def filter_leaf_allows_add(filter_elm, tag, data, value):
    """Append a leaf for `tag`/`value` to `data` if the filter allows it.

    Returns True when ``filter_leaf_allows`` accepted the leaf and it was
    appended, False otherwise.
    """
    allowed = filter_leaf_allows(filter_elm, tag, value)
    if allowed:
        data.append(leaf_elm(tag, value))
    return allowed
def filter_leaf_allows(filter_elm, xpath, value):
    """Check that the filter leaf found at `xpath` allows `value`.

    Allowed when `filter_elm` is None or has no children. Otherwise `xpath`
    must select exactly one node under `filter_elm`, that node must have no
    children, and its text must be empty or equal to ``str(value)``.

    XXX what about xpath that has embedded predicates!
    perhaps what we want to call this is a normal path not an xpath.
    """
    # No filter, or a filter without children, lets everything through.
    if filter_elm is None or not filter_elm.getchildren():
        return True

    # A leaf must be selected by exactly one filter node.
    matches = filter_elm.xpath(xpath, namespaces=NSMAP)
    if not matches or len(matches) > 1:
        return False

    candidate = matches[0]
    # A node with children is not a leaf filter.
    if candidate.getchildren():
        return False

    # Empty means "select", otherwise the value has to agree.
    return not candidate.text or candidate.text == str(value)
def filter_list_iter(filter_list, key_xpath, keys):
    """Yield ``(key, filter_elm)`` pairs for the keys allowed by the filter.

    :param filter_list: None (no filter), a single filter element, or an
        iterable of filter elements.
    :param key_xpath: xpath evaluated on each filter element to find the
        key nodes; their text values are the allowed keys.
    :param keys: the candidate keys.

    With no filter every key is yielded paired with None. For each filter
    element, every key is yielded when the element names no keys; otherwise
    only the keys whose value appears among the element's key texts.
    """
    # If we have no filter elm then return all keys, and stop: there is
    # nothing to iterate over below.
    if filter_list is None:
        for key in keys:
            yield key, None
        return

    try:
        # If this is an element then make it a list of elements.
        filter_list.xpath  # pylint: disable=W0104
        filter_list = [filter_list]
    except AttributeError:
        pass

    for filter_elm in filter_list:
        filter_keys = [x.text for x in
                       filter_elm.xpath(key_xpath, namespaces=NSMAP)]
        if not filter_keys:
            # This filter element does not restrict the keys.
            for key in keys:
                yield key, filter_elm
        else:
            # Now walk our keys returning any that are in the filter list.
            for key in keys:
                if key in filter_keys:
                    yield key, filter_elm
# Module metadata.
__author__ = 'Christian Hopps'
__date__ = 'March 31 2015'
__version__ = '1.0'
__docformat__ = "restructuredtext en"