blob: e0a4baf677ac4499f6d09681024f30363b4651a2 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import structlog
import io
from lxml import etree
from lxml.builder import E
import netconf.nc_common.error as ncerror
from netconf import NSMAP, qmap
from utils import elm
from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
from capabilities import Capabilities
from netconf.nc_rpc.rpc_factory import get_rpc_factory_instance
from netconf.constants import Constants as C
# Module-level structlog logger used by the protocol handler in this file.
log = structlog.get_logger()
class NetconfProtocolError(Exception):
    """Exception subclass named for NETCONF protocol failures.

    It adds nothing to ``Exception``; it exists only as a distinct type.
    """
class NetconfProtocolHandler(object):
    """Run one NETCONF session on top of an established connection.

    The handler sends the server <hello>, reads the client's <hello> to
    record its capabilities and pick the framing version, then loops
    reading messages and dispatching every <rpc> found in them to a handler
    obtained from the rpc factory.  ``stop()`` / ``close()`` tear the
    session down and fire the ``connected`` deferred, whose callback is
    ``nc_server.client_disconnected``.
    """

    def __init__(self, nc_server, nc_conn, session, grpc_stub):
        """Bind the handler to its server, connection and session.

        :param nc_server: owning server; its ``client_disconnected`` and
            ``session_mgr`` attributes are used by this class.
        :param nc_conn: connection object; ``send_msg``,
            ``receive_msg_any`` and ``close_connection`` are called on it.
        :param session: session object; ``session_id`` and
            ``session_opened`` are read/written here.
        :param grpc_stub: stored as ``self.grpc_stub``; not otherwise used
            inside this class.
        """
        self.started = True
        self.conn = nc_conn
        self.nc_server = nc_server
        self.grpc_stub = grpc_stub
        # False -> NETCONF 1.0 framing, True -> NETCONF 1.1 framing.
        self.new_framing = False
        self.capabilities = Capabilities()
        self.session = session
        # Set once by stop()/close() so teardown only runs a single time.
        self.exiting = False
        # Fired with None on teardown; the callback then receives
        # (None, self, None).
        self.connected = Deferred()
        self.connected.addCallback(self.nc_server.client_disconnected,
                                   self, None)

    @staticmethod
    def _parse_xml(text):
        """Parse client-supplied XML text into an lxml element tree.

        The text comes from the remote peer, so entity resolution is
        switched off rather than relying on the parser defaults.

        :raises etree.XMLSyntaxError: if ``text`` is not well-formed XML.
        """
        parser = etree.XMLParser(resolve_entities=False)
        return etree.parse(io.BytesIO(text.encode('utf-8')), parser)

    def send_message(self, msg):
        """Send ``msg`` prefixed with the XML header, using current framing."""
        self.conn.send_msg(C.XML_HEADER + msg, self.new_framing)

    def receive_message(self):
        """Return whatever the connection yields for the next message."""
        return self.conn.receive_msg_any(self.new_framing)

    def send_hello(self, caplist, session=None):
        """Build and send a <hello> advertising ``caplist``.

        :param caplist: iterable of capability strings.
        :param session: when given, its ``session_id`` is included in the
            message.
        """
        hello = elm(C.HELLO, attrib={C.XMLNS: NSMAP[C.NC]})
        hello.append(
            E.capabilities(*[E.capability(cap) for cap in caplist]))
        if session is not None:
            hello.append(E(C.SESSION_ID, str(session.session_id)))
        msg = etree.tostring(hello)
        log.info("Sending HELLO", msg=msg)
        self.send_message(msg.decode('utf-8'))

    def send_rpc_reply(self, rpc_reply, origmsg):
        """Wrap ``rpc_reply`` in an <rpc-reply> and send it.

        The reply element copies the attributes and namespace map of
        ``origmsg`` (the request element).

        :param rpc_reply: either a single element, which is appended, or an
            iterable of elements, which are all added as children.
        :param origmsg: the <rpc> element being answered.
        """
        reply = etree.Element(qmap(C.NC) + C.RPC_REPLY,
                              attrib=origmsg.attrib,
                              nsmap=origmsg.nsmap)
        # Explicit element test instead of probing the deprecated
        # ``getchildren`` attribute; it also keeps an AttributeError raised
        # by append() from being mistaken for "not an element".
        if etree.iselement(rpc_reply):
            reply.append(rpc_reply)
        else:
            reply.extend(rpc_reply)
        ucode = etree.tounicode(reply, pretty_print=True)
        log.info("RPC-Reply", reply=ucode)
        self.send_message(ucode)

    def set_framing_version(self):
        """Choose framing from the client capabilities recorded so far.

        1.1 framing is selected when the client advertises base 1.1;
        otherwise the client must advertise base 1.0.

        :raises ncerror.SessionError: if neither base version is advertised.
        """
        client_caps = self.capabilities.client_caps
        if C.NETCONF_BASE_11 in client_caps:
            self.new_framing = True
        elif C.NETCONF_BASE_10 not in client_caps:
            # NOTE(review): assumes ncerror.SessionError accepts a single
            # message argument (the previous bare ``SessionError`` name was
            # undefined and raised NameError instead).
            raise ncerror.SessionError(
                "Client doesn't implement 1.0 or 1.1 of netconf")

    @inlineCallbacks
    def open_session(self):
        """Exchange <hello> messages and mark the session as opened.

        On any failure the error is logged, ``stop()`` is called and the
        exception is re-raised to the caller.
        """
        # The transport should be connected at this point.
        try:
            # Send hello message.
            yield self.send_hello(self.capabilities.server_caps,
                                  self.session)
            # Get reply
            reply = yield self.receive_message()
            log.info("reply-received", reply=reply)

            # Parse reply and store the client's capabilities
            root = self._parse_xml(reply).getroot()
            for cap in root.xpath(C.CAPABILITY_XPATH, namespaces=NSMAP):
                self.capabilities.add_client_capability(cap.text)

            self.set_framing_version()
            self.session.session_opened = True
            log.info('session-opened',
                     session_id=self.session.session_id,
                     framing="1.1" if self.new_framing else "1.0")
        except Exception as e:
            log.error('hello-failure', exception=repr(e))
            self.stop(repr(e))
            raise

    @inlineCallbacks
    def start(self):
        """Open the session, then serve requests until it is closed.

        Exceptions are logged and turned into ``stop()``; the deferred
        always resolves to this handler.
        """
        log.info('starting')
        try:
            yield self.open_session()
            while self.session.session_opened:
                msg = yield self.receive_message()
                yield self.handle_request(msg)
        except Exception as e:
            log.exception('exception', exception=repr(e))
            self.stop(repr(e))

        log.info('shutdown')
        returnValue(self)

    @inlineCallbacks
    def handle_request(self, msg):
        """Parse one client message and execute every <rpc> it contains.

        Malformed XML closes the session.  Errors raised while handling an
        individual rpc are converted into error replies.

        :param msg: raw message text received from the client.
        :raises ncerror.SessionError: if the message holds no rpc element.
        """
        if not self.session.session_opened:
            return

        # Any error with XML encoding here is going to cause a session close
        try:
            tree = self._parse_xml(msg)
            if not tree:
                raise ncerror.SessionError(msg, "Invalid XML from client.")
        except etree.XMLSyntaxError:
            log.error("malformed-message", msg=msg)
            # Same rule as the BadMsg handler further down: a 1.0 session
            # is simply closed, the error reply is only sent on 1.1.
            if self.new_framing:
                try:
                    error = ncerror.BadMsg(msg)
                    self.send_message(error.get_reply_msg())
                except AttributeError:
                    log.error("attribute-error", msg=msg)
            # close session
            self.close()
            return

        rpcs = tree.xpath(C.RPC_XPATH, namespaces=NSMAP)
        if not rpcs:
            raise ncerror.SessionError(msg, "No rpc found")

        # A message can have multiple rpc requests
        rpc_factory = get_rpc_factory_instance()
        for rpc in rpcs:
            if not self.session.session_opened:
                # An earlier rpc in this message closed the session; do not
                # execute or answer anything on the closed connection.
                break
            try:
                # Validate message id is received.  Element.get() returns
                # None for a missing attribute rather than raising.
                msg_id = rpc.get(C.MESSAGE_ID)
                if msg_id is None:
                    log.error('no-message-id', rpc=rpc)
                    raise ncerror.MissingElement(msg, C.MESSAGE_ID)
                log.info("Received-rpc-message-id", msg_id=msg_id)

                # Get a rpc handler
                rpc_handler = rpc_factory.get_rpc_handler(rpc,
                                                          msg,
                                                          self.session)
                if not rpc_handler:
                    log.error('no-rpc-handler',
                              request=msg,
                              session_id=self.session.session_id)
                    raise ncerror.NotImpl(msg)

                response = yield rpc_handler.execute()
                log.info('handler',
                         rpc_handler=rpc_handler,
                         is_error=response.is_error,
                         response=response)
                self.send_rpc_reply(response.node, rpc)
                if response.close_session:
                    log.info('response-closing-session',
                             response=response)
                    self.close()

            except ncerror.BadMsg as err:
                log.info('ncerror.BadMsg')
                if self.new_framing:
                    self.send_message(err.get_reply_msg())
                else:
                    # If we are 1.0 we have to simply close the connection
                    # as we are not allowed to send this error
                    log.error("Closing-1-0-session--malformed-message")
                    self.close()
            except (ncerror.NotImpl, ncerror.MissingElement) as e:
                # Details go in a keyword: the logger is called with a
                # single event name plus key/value pairs everywhere else.
                log.info('error', exception=repr(e))
                self.send_message(e.get_reply_msg())
            except Exception as ex:
                log.info('Exception', exception=repr(ex))
                error = ncerror.ServerException(rpc, ex)
                self.send_message(error.get_reply_msg())

    def _teardown(self):
        """Mark the handler as exiting and release an opened session.

        If the session is open: close the connection, remove the session
        from the server's session manager, flag it closed and fire the
        ``connected`` deferred.
        """
        self.exiting = True
        if self.session.session_opened:
            # TODO: send a closing message to the far end
            self.conn.close_connection()
            self.nc_server.session_mgr.remove_session(self.session)
            self.session.session_opened = False
            self.connected.callback(None)

    def stop(self, reason):
        """Tear the session down after a failure; repeated calls are no-ops.

        :param reason: description of why the handler is stopping; it is
            only logged.
        """
        if not self.exiting:
            log.debug('stopping', reason=reason)
            self._teardown()
        log.info('stopped')

    def close(self):
        """Tear the session down on request; repeated calls are no-ops."""
        if not self.exiting:
            log.debug('closing-client')
            self._teardown()
        log.info('closing-client')