#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

17"""
18gRPC client meant to connect to a gRPC server endpoint, and query the
19end-point's schema by calling SchemaService.Schema(Empty) and all of its
20semantics are derived from the recovered schema.
21"""
22
import os
import sys
from zlib import decompress

import grpc
from grpc._channel import _Rendezvous
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
from werkzeug.exceptions import ServiceUnavailable

from common.utils.asleep import asleep
from netconf.protos import third_party
from netconf.protos.schema_pb2 import SchemaServiceStub
from google.protobuf.empty_pb2 import Empty
from common.utils.consulhelpers import get_endpoint_from_consul
# from netconf.protos.voltha_pb2 import VolthaLocalServiceStub, \
#     VolthaGlobalServiceStub
# from google.protobuf import empty_pb2
# from google.protobuf.json_format import MessageToDict, ParseDict
from nc_rpc_mapper import get_nc_rpc_mapper_instance
from google.protobuf import descriptor
import base64
import math
import collections
from netconf.constants import Constants as C

_INT64_TYPES = frozenset([descriptor.FieldDescriptor.CPPTYPE_INT64,
                          descriptor.FieldDescriptor.CPPTYPE_UINT64])
_FLOAT_TYPES = frozenset([descriptor.FieldDescriptor.CPPTYPE_FLOAT,
                          descriptor.FieldDescriptor.CPPTYPE_DOUBLE])
_INFINITY = 'Infinity'
_NEG_INFINITY = '-Infinity'
_NAN = 'NaN'

log = get_logger()


class GrpcClient(object):
    """
    Connect to a gRPC server, fetch its schema, and process the downloaded
    proto schema files. The goal is to convert the proto schemas into yang
    schemas which would be exposed to the Netconf client.
    """
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
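
    # A rough usage sketch (illustrative values only; 'localhost:8500' and
    # '/tmp/netconf/work' are assumptions, and on_voltha_ready is a
    # hypothetical callback, not something defined in this module):
    #
    #   client = GrpcClient(consul_endpoint='localhost:8500',
    #                       work_dir='/tmp/netconf/work',
    #                       grpc_endpoint='localhost:50055',
    #                       on_start_callback=on_voltha_ready)
    #   client.start()   # schedules connect() on the Twisted reactor
    #   ...
    #   client.stop()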

    def __init__(self, consul_endpoint, work_dir,
                 grpc_endpoint='localhost:50055',
                 reconnect_callback=None,
                 on_start_callback=None):
        self.consul_endpoint = consul_endpoint
        self.grpc_endpoint = grpc_endpoint
        self.work_dir = work_dir
        self.reconnect_callback = reconnect_callback
        self.on_start_callback = on_start_callback

        self.plugin_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../protoc_plugins'))

        self.yang_schemas = set()

        self.channel = None
        self.local_stub = None
        self.schema = None
        self.retries = 0
        self.shutting_down = False
        self.connected = False

    def start(self):
        log.debug('starting')
        if not self.connected:
            reactor.callLater(0, self.connect)
        log.info('started')
        return self

    def stop(self):
        log.debug('stopping')
        if self.shutting_down:
            return
        self.shutting_down = True
        log.info('stopped')

    def set_on_start_callback(self, on_start_callback):
        self.on_start_callback = on_start_callback
        return self

    def set_reconnect_callback(self, reconnect_callback):
        self.reconnect_callback = reconnect_callback
        return self

    def resolve_endpoint(self, endpoint):
        ip_port_endpoint = endpoint
        if endpoint.startswith('@'):
            try:
                ip_port_endpoint = get_endpoint_from_consul(
                    self.consul_endpoint, endpoint[1:])
                log.info('endpoint-found',
                         endpoint=endpoint, ip_port=ip_port_endpoint)
            except Exception as e:
                log.error('service-not-found-in-consul', endpoint=endpoint,
                          exception=repr(e))
                return None, None
        if ip_port_endpoint:
            host, port = ip_port_endpoint.split(':', 2)
            return host, int(port)
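
    # For illustration: resolve_endpoint('localhost:50055') simply returns
    # ('localhost', 50055), while a name prefixed with '@' (e.g. '@voltha-grpc',
    # an assumed service name) is looked up in Consul via
    # get_endpoint_from_consul() and the returned 'host:port' is split the
    # same way. On a Consul lookup failure it returns (None, None).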

    @inlineCallbacks
    def connect(self):
        """
        (Re-)Connect to end-point
        """
        if self.shutting_down or self.connected:
            return

        try:
            host, port = self.resolve_endpoint(self.grpc_endpoint)

            # If host and port are not set then we will retry
            if host and port:
                log.info('grpc-endpoint-connecting', host=host, port=port)
                self.channel = grpc.insecure_channel(
                    '{}:{}'.format(host, port))

                yang_from = self._retrieve_schema()
                log.info('proto-to-yang-schema', file=yang_from)
                self._compile_proto_files(yang_from)
                self._set_yang_schemas()

                self._clear_backoff()

                if self.on_start_callback is not None:
                    reactor.callLater(0, self.on_start_callback)

                self.connected = True
                if self.reconnect_callback is not None:
                    reactor.callLater(0, self.reconnect_callback)

                return

        except _Rendezvous as e:
            if e.code() == grpc.StatusCode.UNAVAILABLE:
                log.info('grpc-endpoint-not-available')
            else:
                log.exception(e)
            yield self._backoff('not-available')

        except Exception as e:
            if not self.shutting_down:
                log.exception('cannot-connect', endpoint=self.grpc_endpoint)
            yield self._backoff('unknown-error')

        reactor.callLater(1, self.connect)

    def _backoff(self, msg):
        wait_time = self.RETRY_BACKOFF[min(self.retries,
                                           len(self.RETRY_BACKOFF) - 1)]
        self.retries += 1
        log.error(msg, retry_in=wait_time)
        return asleep(wait_time)

    def _clear_backoff(self):
        if self.retries:
            log.info('reconnected', after_retries=self.retries)
        self.retries = 0

    def _retrieve_schema(self):
        """
        Retrieve schema from gRPC end-point, and save all *.proto files in
        the work directory.
        """
        assert isinstance(self.channel, grpc.Channel)
        stub = SchemaServiceStub(self.channel)
        # try:
        schemas = stub.GetSchema(Empty())
        # except _Rendezvous, e:
        #     if e.code == grpc.StatusCode.UNAVAILABLE:
        #
        #     else:
        #         raise e

        os.system('mkdir -p %s' % self.work_dir)
        os.system('rm -fr /tmp/%s/*' %
                  self.work_dir.replace('/tmp/', ''))  # safer

        for proto_file in schemas.protos:
            proto_fname = proto_file.file_name
            # TODO: Do we need to process a set of files using a prefix
            # instead of just one?
            proto_content = proto_file.proto
            log.info('saving-proto', fname=proto_fname, dir=self.work_dir,
                     length=len(proto_content))
            with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
                f.write(proto_content)

            desc_content = decompress(proto_file.descriptor)
            desc_fname = proto_fname.replace('.proto', '.desc')
            log.info('saving-descriptor', fname=desc_fname, dir=self.work_dir,
                     length=len(desc_content))
            with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
                f.write(desc_content)
        return schemas.yang_from
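
    # For illustration, the Schemas message returned by GetSchema() is used
    # as follows (field names as accessed above; concrete file names such as
    # 'voltha.proto' are assumptions):
    #
    #   schemas.protos[i].file_name   -> e.g. 'voltha.proto'
    #   schemas.protos[i].proto       -> proto source text, written as-is
    #   schemas.protos[i].descriptor  -> zlib-compressed descriptor, written
    #                                    to e.g. 'voltha.desc' after decompress()
    #   schemas.yang_from             -> the proto file that seeds the yang
    #                                    generation in _compile_proto_files()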

    def _compile_proto_files(self, yang_from):
        """
        For each *.proto file in the work directory, compile the proto
        file into the respective *_pb2.py file as well as generate the
        corresponding yang schema.
        :return: None
        """
        log.info('start')
        google_api_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../protos/third_party'
        ))

        log.info('google-api', api_dir=google_api_dir)

        netconf_base_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../..'
        ))
        log.info('netconf-dir', dir=netconf_base_dir)

        for fname in [f for f in os.listdir(self.work_dir)
                      if f.endswith('.proto')]:
            log.info('filename', file=fname)

            need_yang = fname == yang_from
            log.debug('compiling',
                      file=fname,
                      yang_schema_required=need_yang)
            cmd = (
                'cd %s && '
                'env PATH=%s PYTHONPATH=%s '
                'python -m grpc.tools.protoc '
                '-I. '
                '-I%s '
                '--python_out=. '
                '--grpc_python_out=. '
                '--plugin=protoc-gen-gw=%s/rpc_gw_gen.py '
                '--gw_out=. '
                '--plugin=protoc-gen-custom=%s/proto2yang.py '
                '%s'
                '%s' % (
                    self.work_dir,
                    ':'.join([os.environ['PATH'], self.plugin_dir]),
                    ':'.join([google_api_dir, netconf_base_dir]),
                    google_api_dir,
                    self.plugin_dir,
                    self.plugin_dir,
                    '--custom_out=. ' if need_yang else '',
                    fname)
            )
            log.debug('executing', cmd=cmd, file=fname)
            os.system(cmd)
            log.info('compiled', file=fname)

        # Load the generated modules
        mapper = get_nc_rpc_mapper_instance(self.work_dir, self)
        mapper.load_modules()
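
    # For illustration, the command assembled above expands to something like
    # the following for the proto that seeds the yang schema (need_yang True);
    # the angle-bracket placeholders and 'voltha.proto' are illustrative, not
    # fixed values:
    #
    #   cd <work_dir> && \
    #   env PATH=$PATH:<plugin_dir> \
    #       PYTHONPATH=<google_api_dir>:<netconf_base_dir> \
    #   python -m grpc.tools.protoc -I. -I<google_api_dir> \
    #       --python_out=. --grpc_python_out=. \
    #       --plugin=protoc-gen-gw=<plugin_dir>/rpc_gw_gen.py --gw_out=. \
    #       --plugin=protoc-gen-custom=<plugin_dir>/proto2yang.py \
    #       --custom_out=. voltha.proto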

    def _set_yang_schemas(self):
        if self.work_dir not in sys.path:
            sys.path.insert(0, self.work_dir)

        for fname in [f for f in os.listdir(self.work_dir)
                      if f.endswith('.yang')]:
            # Filter out schemas which are not required
            if fname not in C.SCHEMAS_TO_IGNORE:
                self.yang_schemas.add(fname[:-len('.yang')])
        log.info('yang-schemas', schemas=self.yang_schemas)

    @inlineCallbacks
    def invoke_voltha_rpc(self, service, method, params, metadata=None):
        try:
            mapper = get_nc_rpc_mapper_instance()

            # Get the mapping function using the service and method name
            func = mapper.get_function(service, method)
            if func is None:
                log.info('unsupported-rpc', service=service, method=method)
                return

            response = yield func(self, params, metadata)

            # Get the XML tag to use in the response
            xml_tag = mapper.get_xml_tag(service, method)

            # Get the XML list item name used in the response
            list_item_name = mapper.get_list_items_name(service, method)

            # Get the YANG-defined fields (and their order) for that service
            # and method
            fields = mapper.get_fields_from_yang_defs(service, method)

            # TODO: This needs to be investigated further since the Netconf
            # client shows a formatting error if the code below is uncommented.
            # Check if this represents a List and whether the field name is
            # 'items'. In the response (a dictionary), if a list named 'items'
            # is returned then 'items' can either:
            # 1) represent a list of items being returned where 'items' is just
            #    a name to represent a list. In this case, this name will be
            #    discarded
            # 2) represent the actual field name as defined in the proto
            #    definitions. If this is the case then we need to preserve the
            #    name
            # list_item_name = ''
            # if len(fields) == 1:
            #     if fields[0]['name'] == 'items':
            #         list_item_name = 'items'

            # Rearrange the dictionary response as specified by the YANG
            # definitions
            rearranged_response = self.rearrange_dict(mapper, response, fields)

            log.info('rpc-result', service=service, method=method,
                     response=response,
                     rearranged_response=rearranged_response, xml_tag=xml_tag,
                     list_item_name=list_item_name, fields=fields)

            returnValue((rearranged_response, (xml_tag, list_item_name)))

        except Exception as e:
            log.exception('rpc-failure', service=service, method=method,
                          params=params, e=e)

    def rearrange_dict(self, mapper, orig_dict, fields):
        log.debug('rearranging-dict', fields=fields)
        result = collections.OrderedDict()
        if len(orig_dict) == 0 or not fields:
            return result
        for f in fields:
            if f['name'] in orig_dict:
                if f['type_ref']:
                    # Get the fields for that type
                    sub_fields = mapper.get_fields_from_type_name(f['module'],
                                                                  f['type'])
                    if f['repeated']:
                        result[f['name']] = []
                        for d in orig_dict[f['name']]:
                            result[f['name']].append(self.rearrange_dict(
                                mapper, d, sub_fields))
                    else:
                        result[f['name']] = self.rearrange_dict(
                            mapper, orig_dict[f['name']], sub_fields)
                else:
                    result[f['name']] = orig_dict[f['name']]
        return result
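
    # A hedged worked example (field and key names are made up for
    # illustration): given yang-ordered fields
    #   [{'name': 'id', 'type_ref': False, ...},
    #    {'name': 'ports', 'type_ref': True, 'repeated': True,
    #     'module': 'voltha', 'type': 'Port', ...}]
    # and a response dict {'ports': [...], 'id': '42', 'extra': 1},
    # rearrange_dict() returns an OrderedDict with 'id' first, then 'ports'
    # (each port itself rearranged recursively); keys absent from the yang
    # definition, such as 'extra', are dropped.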

    @inlineCallbacks
    def invoke(self, stub, method_name, request, metadata, retry=1):
        """
        Invoke a gRPC call to the remote server and return the response.
        :param stub: Reference to the *_pb2 service stub
        :param method_name: The method name inside the service stub
        :param request: The request protobuf message
        :param metadata: [(str, str), (str, str), ...]
        :return: The response protobuf message and returned trailing metadata
        """

        if not self.connected:
            raise ServiceUnavailable()

        try:
            method = getattr(stub(self.channel), method_name)
            response, rendezvous = method.with_call(request, metadata=metadata)
            returnValue((response, rendezvous.trailing_metadata()))

        except grpc._channel._Rendezvous as e:
            code = e.code()
            if code == grpc.StatusCode.UNAVAILABLE:
                e = ServiceUnavailable()

                if self.connected:
                    self.connected = False
                    yield self.connect()
                    if retry > 0:
                        response = yield self.invoke(stub, method_name,
                                                     request, metadata,
                                                     retry=retry - 1)
                        returnValue(response)

            elif code in (
                    grpc.StatusCode.NOT_FOUND,
                    grpc.StatusCode.INVALID_ARGUMENT,
                    grpc.StatusCode.ALREADY_EXISTS):

                pass  # don't log error, these occur naturally

            else:
                log.exception(e)

            raise e
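
    # A hedged usage sketch, as it might appear inside a generated rpc-mapper
    # function (the stub, request message and metadata key are illustrative
    # assumptions, not names guaranteed by this module):
    #
    #   # from voltha_pb2 import VolthaLocalServiceStub, ID
    #   response, trailing_md = yield self.invoke(
    #       VolthaLocalServiceStub, 'GetDevice', ID(id=device_id),
    #       metadata=[('some-key', 'some-value')])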

    # Below is an adaptation of Google's MessageToDict() which includes
    # protobuf options extensions

    class Error(Exception):
        """Top-level module error for json_format."""

    class SerializeToJsonError(Error):
        """Thrown if serialization to JSON fails."""

    def _IsMapEntry(self, field):
        return (field.type == descriptor.FieldDescriptor.TYPE_MESSAGE and
                field.message_type.has_options and
                field.message_type.GetOptions().map_entry)

    def convertToDict(self, message):
        """Converts message to an object according to Proto3 JSON Specification."""

        js = {}
        return self._RegularMessageToJsonObject(message, js)

    def get_yang_option(self, field):
        opt = field.GetOptions()
        yang_opt = {}
        for fd, val in opt.ListFields():
            if fd.full_name == 'voltha.yang_inline_node':
                yang_opt['id'] = val.id
                yang_opt['type'] = val.type
        # For now, a max of 1 yang option is set per field
        return yang_opt
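
    # For illustration only (the exact .proto declaration is an assumption,
    # not verified here): a field annotated with the custom option, e.g.
    #   WifiConfig wifi = 3 [(voltha.yang_inline_node).id = "wifi",
    #                        (voltha.yang_inline_node).type = "WifiConfig"];
    # would make get_yang_option() return {'id': 'wifi', 'type': 'WifiConfig'},
    # which _RegularMessageToJsonObject() below turns into a map carrying the
    # 'yang_field_option*' keys.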

    def _RegularMessageToJsonObject(self, message, js):
        """Converts normal message according to Proto3 JSON Specification."""
        fields = message.ListFields()

        try:
            for field, value in fields:
                # Check for options
                yang_opt = self.get_yang_option(field)

                name = field.name
                if self._IsMapEntry(field):
                    # Convert a map field.
                    v_field = field.message_type.fields_by_name['value']
                    js_map = {}
                    for key in value:
                        if isinstance(key, bool):
                            if key:
                                recorded_key = 'true'
                            else:
                                recorded_key = 'false'
                        else:
                            recorded_key = key
                        js_map[recorded_key] = self._FieldToJsonObject(
                            v_field, value[key])
                    js[name] = js_map
                elif field.label == descriptor.FieldDescriptor.LABEL_REPEATED:
                    # Convert a repeated field.
                    js[name] = [self._FieldToJsonObject(field, k)
                                for k in value]
                else:
                    # This specific yang option applies only to non-repeated
                    # fields
                    if yang_opt:  # Create a map
                        js_map = {}
                        js_map['yang_field_option'] = True
                        js_map['yang_field_option_id'] = yang_opt['id']
                        js_map['yang_field_option_type'] = yang_opt['type']
                        js_map['name'] = name
                        js_map[name] = self._FieldToJsonObject(field, value)
                        js[name] = js_map
                    else:
                        js[name] = self._FieldToJsonObject(field, value)

            # Always serialize default values for the remaining fields
            # (adapted from json_format's including_default_value_fields
            # handling).
            message_descriptor = message.DESCRIPTOR
            for field in message_descriptor.fields:
                # Singular message fields and oneof fields will not be affected.
                if ((
                        field.label != descriptor.FieldDescriptor.LABEL_REPEATED and
                        field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_MESSAGE) or
                        field.containing_oneof):
                    continue
                name = field.name
                if name in js:
                    # Skip the field which has been serialized already.
                    continue
                if self._IsMapEntry(field):
                    js[name] = {}
                elif field.label == descriptor.FieldDescriptor.LABEL_REPEATED:
                    js[name] = []
                else:
                    js[name] = self._FieldToJsonObject(field,
                                                       field.default_value)

        except ValueError as e:
            raise self.SerializeToJsonError(
                'Failed to serialize {0} field: {1}.'.format(field.name, e))

        return js

    def _FieldToJsonObject(self, field, value):
        """Converts field value according to Proto3 JSON Specification."""
        if field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_MESSAGE:
            return self.convertToDict(value)
        elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_ENUM:
            enum_value = field.enum_type.values_by_number.get(value, None)
            if enum_value is not None:
                return enum_value.name
            else:
                raise self.SerializeToJsonError('Enum field contains an '
                                                'integer value which cannot '
                                                'be mapped to an enum value.')
        elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_STRING:
            if field.type == descriptor.FieldDescriptor.TYPE_BYTES:
                # Use base64 Data encoding for bytes
                return base64.b64encode(value).decode('utf-8')
            else:
                return value
        elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_BOOL:
            return bool(value)
        elif field.cpp_type in _INT64_TYPES:
            return str(value)
        elif field.cpp_type in _FLOAT_TYPES:
            if math.isinf(value):
                if value < 0.0:
                    return _NEG_INFINITY
                else:
                    return _INFINITY
            if math.isnan(value):
                return _NAN
        return value
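
    # A hedged end-to-end illustration (message and field names are invented):
    # for a message with fields
    #   string id = 1; uint64 flow_id = 2; repeated Port ports = 3;
    # convertToDict() would yield something like
    #   {'id': 'olt-1', 'flow_id': '12345', 'ports': [{...}, {...}]}
    # with 64-bit integers rendered as strings, bytes base64-encoded, enums
    # rendered by name, and unset scalar fields filled with their defaults.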