blob: aced3f9476b409ae916436dc69c110bf676c3d98 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2017 the original author or authors.
#
# Code adapted from https://github.com/choppsv1/netconf
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import structlog
from lxml import etree
import netconf.nc_common.error as ncerror
log = structlog.get_logger()
class RpcResponse():
def __init__(self, capabilities):
self.is_error = False
# if there is an error then the reply_node will contains an Error
# object
self.reply_node = None
self.close_session = False
self.capabilities = capabilities
self.custom_rpc = False
def build_xml_response(self, request, voltha_response, custom_rpc=False):
if request is None:
return
voltha_xml_string = etree.tostring(voltha_response)
# Remove the leading and trailing <yang> tags
if voltha_xml_string.startswith('<yang>'):
voltha_xml_string = voltha_xml_string[len('<yang>'):]
if voltha_xml_string.endswith('</yang>'):
voltha_xml_string = voltha_xml_string[:-len('</yang>')]
# Empty response
elif voltha_xml_string.startswith('<yang/>'):
voltha_xml_string = ''
# Replace any True/False data to true/false
voltha_xml_string = voltha_xml_string.replace('>False<', '>false<')
voltha_xml_string = voltha_xml_string.replace('>True<', '>true<')
if not custom_rpc:
# Create the xml body as
if request.has_key('subclass'):
body = ''.join([
'<data>',
'<',
request['class'],
' xmlns="',
request['namespace'],
'">',
'<',
request['subclass'],
'>',
voltha_xml_string,
'</',
request['subclass'],
'>',
'</',
request['class'],
'>',
'</data>'
])
else:
body = ''.join([
'<data>',
'<',
request['class'],
' xmlns="urn:opencord:params:xml:ns:voltha:voltha">',
voltha_xml_string,
'</',
request['class'],
'>',
'</data>'
])
else: # custom_rpc
body = ''.join([
'<rpc-reply',
' xmlns="urn:opencord:params:xml:ns:voltha:voltha">',
voltha_xml_string,
'</rpc-reply>',
])
return etree.fromstring(body)
def add_node(self, new_node, tree):
if new_node.tag == 'ignore':
# We want only sub-elements
for elem in list(new_node):
tree.append(elem)
else:
tree.append(new_node)
def copy_basic_element(self, elm):
new_elem = etree.Element(elm.tag)
new_elem.text = elm.text
return new_elem
def process_inline_option(self, elem):
inline_option = False
inline_node_name = None
for elm in list(elem):
if elm.tag == 'yang_field_option':
inline_option = True
if elm.tag == 'name':
inline_node_name = elm.text
if not inline_option:
new_elem = etree.Element(elem.tag)
return new_elem, elem
# look for the node with the inline_node_name
for elm in list(elem):
if elm.tag == inline_node_name:
new_elem = etree.Element('ignore')
return new_elem, elm
def process_element(self, elem):
attrib = elem.get('type')
if (attrib == 'list'):
if list(elem) is None:
return self.copy_basic_element(elem)
new_elem = etree.Element('ignore')
for elm in list(elem):
elm.tag = elem.tag
if elm.get('type') in ['list', 'dict']:
self.add_node(self.process_element(elm), new_elem)
else:
new_elem.append(self.copy_basic_element(elm))
return new_elem
elif (attrib == 'dict'):
# Empty case
if list(elem) is None:
return self.copy_basic_element(elem)
# Process field option.
new_elem, elem = self.process_inline_option(elem)
for elm in list(elem):
if elm.get('type') in ['list', 'dict']:
self.add_node(self.process_element(elm), new_elem)
else:
new_elem.append(self.copy_basic_element(elm))
return new_elem
else:
return self.copy_basic_element(elem)
def to_yang_xml(self, from_xml, request, yang_options=None,
custom_rpc=False):
# Parse from_xml as follows:
# 1. Any element having a list attribute shoud have each item move 1 level
# up and retag using the parent tag
# 2. Any element having a dict attribute and has a <yang_field_option>
# sub-element should have all it's items move to teh parent level
top = etree.Element('yang')
elms = list(from_xml)
xml_tag = yang_options[0]
list_items_name = yang_options[1]
# Handle the special case when the the xml contains 1 element which
# is a list type
if len(elms) == 1:
item = elms[0]
if item.get('type') == 'list':
if custom_rpc: # custom rpc request
if list_items_name == 'items': # no pre-processing required
self.add_node(self.process_element(item), top)
return top
if xml_tag:
item.tag = xml_tag
else:
item.tag = 'ignore'
else:
# Default netconf operations - may end up needing
# specific parsing per request type
if request.has_key('subclass'):
item.tag = request['subclass']
# remove the subclass element in request to avoid duplicate tag
del request['subclass']
elif list_items_name == 'items':
item.tag = xml_tag
else:
item.tag = 'ignore'
self.add_node(self.process_element(item), top)
return top
# Process normally for all other cases
for elm in elms:
self.add_node(self.process_element(elm), top)
return top
# Helper method to sort the xml message based on the xml tags
def sort_xml_response(self, xml):
for parent in xml.xpath('//*[./*]'): # Search for parent elements
parent[:] = sorted(parent, key=lambda x: x.tag)
return xml
# custom_rpc refers to custom RPCs different from Netconf default RPCs
# like get, get-config, edit-config, etc
def build_yang_response(self, root, request, yang_options=None,
custom_rpc=False):
try:
self.custom_rpc = custom_rpc
yang_xml = self.to_yang_xml(root, request, yang_options,
custom_rpc)
log.info('yang-xml', yang_xml=etree.tounicode(yang_xml,
pretty_print=True))
return self.build_xml_response(request, yang_xml, custom_rpc)
except Exception as e:
log.exception('error-building-yang-response', request=request,
xml=etree.tostring(root))
self.rpc_response.is_error = True
self.rpc_response.node = ncerror.BadMsg(request)
return