#!/usr/bin/env python
#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
17import structlog
18import io
19from lxml import etree
20from lxml.builder import E
21import netconf.nc_common.error as ncerror
22from netconf import NSMAP, qmap
23from utils import elm
24from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
25from capabilities import Capabilities
26from netconf.nc_rpc.rpc_factory import get_rpc_factory_instance
27from netconf.constants import Constants as C
28
29log = structlog.get_logger()
30
31
class NetconfProtocolError(Exception):
    """Exception type declared by this module for NETCONF protocol errors.

    It adds nothing to ``Exception``; it only provides a distinct type.
    """
33
34
class NetconfProtocolHandler:
    """Run one NETCONF session on top of an already-connected transport.

    The handler sends the server <hello>, records the capabilities the
    client answers with, and then loops reading messages and dispatching
    every <rpc> they contain to a handler obtained from the rpc factory.
    """

    def __init__(self, nc_server, nc_conn, session, grpc_stub):
        """Store the collaborators and arm the disconnect notification.

        :param nc_server: owning server; its ``client_disconnected`` and
            ``session_mgr`` are used when the session ends
        :param nc_conn: connection used to send/receive framed messages
        :param session: session object (``session_id``, ``session_opened``)
        :param grpc_stub: stored on the instance; not used by this class
        """
        self.started = True
        self.conn = nc_conn
        self.nc_server = nc_server
        self.grpc_stub = grpc_stub
        # Passed to the connection on every send/receive; switched to True
        # by set_framing_version() when the client advertises base 1.1.
        self.new_framing = False
        self.capabilities = Capabilities()
        self.session = session
        # Guards stop()/close() so the shutdown sequence runs only once.
        self.exiting = False
        # Fired (with None) when the session is shut down; the server is
        # then told which handler went away.
        self.connected = Deferred()
        self.connected.addCallback(self.nc_server.client_disconnected,
                                   self, None)

    def send_message(self, msg):
        """Send ``msg`` prefixed with the XML header using current framing."""
        self.conn.send_msg(C.XML_HEADER + msg, self.new_framing)

    def receive_message(self):
        """Return whatever the connection yields for the next message."""
        return self.conn.receive_msg_any(self.new_framing)

    def send_hello(self, caplist, session=None):
        """Build and send the <hello> message.

        :param caplist: iterable of capability strings to advertise
        :param session: when given, its ``session_id`` is included
        """
        hello = elm(C.HELLO, attrib={C.XMLNS: NSMAP[C.NC]})
        hello.append(E.capabilities(*[E.capability(x) for x in caplist]))

        if session is not None:
            hello.append(E(C.SESSION_ID, str(session.session_id)))

        msg = etree.tostring(hello)
        log.info("Sending HELLO", msg=msg)
        self.send_message(msg.decode('utf-8'))

    def send_rpc_reply(self, rpc_reply, origmsg):
        """Wrap ``rpc_reply`` in an <rpc-reply> element and send it.

        The reply element copies the attributes and namespace map of the
        originating rpc element.

        :param rpc_reply: a single element (appended) or an iterable of
            elements (all added as children)
        :param origmsg: the rpc element being answered
        """
        reply = etree.Element(qmap(C.NC) + C.RPC_REPLY,
                              attrib=origmsg.attrib,
                              nsmap=origmsg.nsmap)
        # An element exposes ``getchildren``; anything else is treated as a
        # collection of elements.
        if hasattr(rpc_reply, 'getchildren'):
            reply.append(rpc_reply)
        else:
            reply.extend(rpc_reply)
        ucode = etree.tounicode(reply, pretty_print=True)
        log.info("RPC-Reply", reply=ucode)
        self.send_message(ucode)

    def set_framing_version(self):
        """Select the framing mode from the recorded client capabilities.

        :raises ncerror.SessionError: if the client advertised neither
            base 1.0 nor base 1.1
        """
        client_caps = self.capabilities.client_caps
        if C.NETCONF_BASE_11 in client_caps:
            self.new_framing = True
        elif C.NETCONF_BASE_10 not in client_caps:
            # The unqualified name ``SessionError`` is not defined in this
            # module; use the class from the error module like the rest of
            # the file does.
            raise ncerror.SessionError(
                "Client doesn't implement 1.0 or 1.1 of netconf")

    @inlineCallbacks
    def open_session(self):
        """Exchange hellos with the client and mark the session opened.

        Any failure is logged, the handler is stopped, and the exception
        is re-raised to the caller.
        """
        # The transport should be connected at this point.
        try:
            # Send hello message.
            yield self.send_hello(self.capabilities.server_caps,
                                  self.session)
            # Get reply
            reply = yield self.receive_message()
            log.info("reply-received", reply=reply)

            # Parse reply
            tree = etree.parse(io.BytesIO(reply.encode('utf-8')))
            root = tree.getroot()
            caps = root.xpath(C.CAPABILITY_XPATH, namespaces=NSMAP)

            # Store capabilities
            for cap in caps:
                self.capabilities.add_client_capability(cap.text)

            self.set_framing_version()
            self.session.session_opened = True

            log.info('session-opened',
                     session_id=self.session.session_id,
                     framing="1.1" if self.new_framing else "1.0")
        except Exception as e:
            log.error('hello-failure', exception=repr(e))
            self.stop(repr(e))
            raise

    @inlineCallbacks
    def start(self):
        """Open the session and serve requests until it is closed.

        Exceptions are logged and stop the handler; they are not
        propagated. The deferred result is this handler instance.
        """
        log.info('starting')

        try:
            yield self.open_session()
            while self.session.session_opened:
                msg = yield self.receive_message()
                yield self.handle_request(msg)

        except Exception as e:
            log.exception('exception', exception=repr(e))
            self.stop(repr(e))

        log.info('shutdown')
        returnValue(self)

    @inlineCallbacks
    def handle_request(self, msg):
        """Parse one incoming message and execute each rpc it contains.

        Malformed XML gets a bad-message reply (best effort) and closes
        the session. Per-rpc failures are answered with an error reply.

        :param msg: the raw message text received from the client
        :raises ncerror.SessionError: if the message has no parsed root or
            contains no rpc element
        """
        if not self.session.session_opened:
            return

        # Any error with XML encoding here is going to cause a session close
        try:
            tree = etree.parse(io.BytesIO(msg.encode('utf-8')))
            # Check the parsed root explicitly instead of relying on the
            # truthiness of the tree object.
            if tree.getroot() is None:
                raise ncerror.SessionError(msg, "Invalid XML from client.")
        except etree.XMLSyntaxError:
            log.error("malformed-message", msg=msg)
            try:
                error = ncerror.BadMsg(msg)
                self.send_message(error.get_reply_msg())
            except AttributeError:
                log.error("attribute-error", msg=msg)
            # close session
            self.close()
            return

        rpcs = tree.xpath(C.RPC_XPATH, namespaces=NSMAP)
        if not rpcs:
            raise ncerror.SessionError(msg, "No rpc found")

        # A message can have multiple rpc requests
        rpc_factory = get_rpc_factory_instance()
        for rpc in rpcs:
            try:
                # Validate message id is received. ``get`` returns None for
                # an absent attribute, so test for that explicitly.
                try:
                    msg_id = rpc.get(C.MESSAGE_ID)
                except (TypeError, ValueError):
                    msg_id = None
                if msg_id is None:
                    log.error('no-message-id', rpc=rpc)
                    raise ncerror.MissingElement(msg, C.MESSAGE_ID)
                log.info("Received-rpc-message-id", msg_id=msg_id)

                # Get a rpc handler
                rpc_handler = rpc_factory.get_rpc_handler(rpc,
                                                          msg,
                                                          self.session)
                if not rpc_handler:
                    log.error('no-rpc-handler',
                              request=msg,
                              session_id=self.session.session_id)
                    raise ncerror.NotImpl(msg)

                response = yield rpc_handler.execute()
                log.info('handler',
                         rpc_handler=rpc_handler,
                         is_error=response.is_error,
                         response=response)
                self.send_rpc_reply(response.node, rpc)
                if response.close_session:
                    log.info('response-closing-session', response=response)
                    self.close()

            except ncerror.BadMsg as err:
                log.info('ncerror.BadMsg')
                if self.new_framing:
                    self.send_message(err.get_reply_msg())
                else:
                    # If we are 1.0 we have to simply close the connection
                    # as we are not allowed to send this error
                    log.error("Closing-1-0-session--malformed-message")
                    self.close()
            except (ncerror.NotImpl, ncerror.MissingElement) as e:
                # Log the exception as a keyword field, like every other
                # log call in this file, so logging cannot break the reply.
                log.info('error', exception=repr(e))
                self.send_message(e.get_reply_msg())
            except Exception as ex:
                log.info('Exception', exception=repr(ex))
                error = ncerror.ServerException(rpc, ex)
                self.send_message(error.get_reply_msg())

            # Once the session has been closed there is nobody left to
            # answer; do not execute the remaining rpcs of this message.
            if not self.session.session_opened:
                break

    # @inlineCallbacks
    # def invoke_method(self, rpcname, rpc, params):
    #     try:
    #         # Handle any namespaces or prefixes in the tag, other than
    #         # "nc" which was removed above. Of course, this does not handle
    #         # namespace collisions, but that seems reasonable for now.
    #         rpcname = rpcname.rpartition("}")[-1]
    #         method_name = "rpc_" + rpcname.replace('-', '_')
    #         method = getattr(self.methods, method_name,
    #                          self._rpc_not_implemented)
    #         log.info("invoking-method", method=method_name)
    #         reply = yield method(self, rpc, *params)
    #         returnValue(reply)
    #     except NotImplementedError:
    #         raise ncerror.NotImpl(rpc)

    def _shutdown(self, begin_event, end_event, **log_fields):
        """Run the one-time teardown shared by stop() and close().

        Does nothing if a teardown has already started. Otherwise closes
        the connection when the session is open, removes the session from
        the server's session manager, marks it closed and fires
        ``self.connected``.

        :param begin_event: event name logged (debug) before teardown
        :param end_event: event name logged (info) after teardown
        :param log_fields: extra fields added to the first log entry
        """
        if self.exiting:
            return
        log.debug(begin_event, **log_fields)
        self.exiting = True
        if self.session.session_opened:
            # TODO: send a closing message to the far end
            self.conn.close_connection()
        self.nc_server.session_mgr.remove_session(self.session)
        self.session.session_opened = False
        self.connected.callback(None)
        log.info(end_event)

    def stop(self, reason):
        """Tear the session down after a failure.

        :param reason: description of why the handler is stopping; it is
            only recorded in the log
        """
        self._shutdown('stopping', 'stopped', reason=reason)

    def close(self):
        """Tear the session down as part of normal request handling."""
        self._shutdown('closing-client', 'closing-client')