#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
gRPC client meant to connect to a gRPC server endpoint and query the
endpoint's schema by calling SchemaService.GetSchema(Empty); all of its
semantics are derived from the recovered schema.
"""

import os
import sys
from zlib import decompress

import grpc
from grpc._channel import _Rendezvous
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
from werkzeug.exceptions import ServiceUnavailable

from common.utils.asleep import asleep
from netconf.protos import third_party
from netconf.protos.schema_pb2 import SchemaServiceStub
from google.protobuf.empty_pb2 import Empty
from common.utils.consulhelpers import get_endpoint_from_consul
# from netconf.protos.voltha_pb2 import VolthaLocalServiceStub, \
#     VolthaGlobalServiceStub
# from google.protobuf import empty_pb2
# from google.protobuf.json_format import MessageToDict, ParseDict
from nc_rpc_mapper import get_nc_rpc_mapper_instance
from google.protobuf import descriptor
import base64
import math
import collections

_INT64_TYPES = frozenset([descriptor.FieldDescriptor.CPPTYPE_INT64,
                          descriptor.FieldDescriptor.CPPTYPE_UINT64])
_FLOAT_TYPES = frozenset([descriptor.FieldDescriptor.CPPTYPE_FLOAT,
                          descriptor.FieldDescriptor.CPPTYPE_DOUBLE])
_INFINITY = 'Infinity'
_NEG_INFINITY = '-Infinity'
_NAN = 'NaN'

log = get_logger()


class GrpcClient(object):
    """
    Connect to a gRPC server, fetch its schema, and process the downloaded
    proto schema files. The goal is to convert the proto schemas into yang
    schemas which are then exposed to the Netconf client.
    """
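    # A minimal usage sketch (illustrative only; the endpoint values and the
    # callback below are placeholders, and a running Twisted reactor is
    # assumed):
    #
    #   client = GrpcClient(consul_endpoint='localhost:8500',
    #                       work_dir='/tmp/netconf/work',
    #                       grpc_endpoint='localhost:50055',
    #                       on_start_callback=lambda: log.info('voltha-ready'))
    #   client.start()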
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]

    def __init__(self, consul_endpoint, work_dir,
                 grpc_endpoint='localhost:50055',
                 reconnect_callback=None,
                 on_start_callback=None):
        self.consul_endpoint = consul_endpoint
        self.grpc_endpoint = grpc_endpoint
        self.work_dir = work_dir
        self.reconnect_callback = reconnect_callback
        self.on_start_callback = on_start_callback

        self.plugin_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../protoc_plugins'))

        self.yang_schemas = set()

        self.channel = None
        self.local_stub = None
        self.schema = None
        self.retries = 0
        self.shutting_down = False
        self.connected = False

    def start(self):
        log.debug('starting')
        if not self.connected:
            reactor.callLater(0, self.connect)
        log.info('started')
        return self

    def stop(self):
        log.debug('stopping')
        if self.shutting_down:
            return
        self.shutting_down = True
        log.info('stopped')

    def set_on_start_callback(self, on_start_callback):
        self.on_start_callback = on_start_callback
        return self

    def set_reconnect_callback(self, reconnect_callback):
        self.reconnect_callback = reconnect_callback
        return self

    def resolve_endpoint(self, endpoint):
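        """
        Resolve an endpoint to a (host, port) tuple.  An endpoint of the form
        '@service-name' is looked up in Consul via get_endpoint_from_consul;
        anything else is assumed to already be in 'host:port' form.  Returns
        (None, None) if the Consul lookup fails.
        """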
        ip_port_endpoint = endpoint
        if endpoint.startswith('@'):
            try:
                ip_port_endpoint = get_endpoint_from_consul(
                    self.consul_endpoint, endpoint[1:])
                log.info('endpoint-found',
                         endpoint=endpoint, ip_port=ip_port_endpoint)
            except Exception as e:
                log.error('service-not-found-in-consul', endpoint=endpoint,
                          exception=repr(e))
                return None, None
        if ip_port_endpoint:
            host, port = ip_port_endpoint.split(':', 2)
            return host, int(port)

    @inlineCallbacks
    def connect(self):
        """
        (Re-)Connect to end-point
        """
        if self.shutting_down or self.connected:
            return

        try:
            host, port = self.resolve_endpoint(self.grpc_endpoint)

            # If host and port are not set then we will retry
            if host and port:
                log.info('grpc-endpoint-connecting', host=host, port=port)
                self.channel = grpc.insecure_channel(
                    '{}:{}'.format(host, port))

                yang_from = self._retrieve_schema()
                log.info('proto-to-yang-schema', file=yang_from)
                self._compile_proto_files(yang_from)
                self._set_yang_schemas()

                self._clear_backoff()

                if self.on_start_callback is not None:
                    reactor.callLater(0, self.on_start_callback)

                self.connected = True
                if self.reconnect_callback is not None:
                    reactor.callLater(0, self.reconnect_callback)

                return

        except _Rendezvous, e:
            if e.code() == grpc.StatusCode.UNAVAILABLE:
                log.info('grpc-endpoint-not-available')
            else:
                log.exception(e)
            yield self._backoff('not-available')

        except Exception, e:
            if not self.shutting_down:
                log.exception('cannot-connect', endpoint=self.grpc_endpoint)
                yield self._backoff('unknown-error')

        reactor.callLater(1, self.connect)

    def _backoff(self, msg):
        wait_time = self.RETRY_BACKOFF[min(self.retries,
                                           len(self.RETRY_BACKOFF) - 1)]
        self.retries += 1
        log.error(msg, retry_in=wait_time)
        return asleep(wait_time)

    def _clear_backoff(self):
        if self.retries:
            log.info('reconnected', after_retries=self.retries)
            self.retries = 0

    def _retrieve_schema(self):
        """
        Retrieve schema from gRPC end-point, and save all *.proto files in
        the work directory.
        """
        assert isinstance(self.channel, grpc.Channel)
        stub = SchemaServiceStub(self.channel)
        # try:
        schemas = stub.GetSchema(Empty())
        # except _Rendezvous, e:
        #     if e.code == grpc.StatusCode.UNAVAILABLE:
        #
        #     else:
        #         raise e

        os.system('mkdir -p %s' % self.work_dir)
        os.system('rm -fr /tmp/%s/*' %
                  self.work_dir.replace('/tmp/', ''))  # safer

        for proto_file in schemas.protos:
            proto_fname = proto_file.file_name
            # TODO: Do we need to process a set of files using a prefix
            # instead of just one?
            proto_content = proto_file.proto
            log.info('saving-proto', fname=proto_fname, dir=self.work_dir,
                     length=len(proto_content))
            with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
                f.write(proto_content)

            desc_content = decompress(proto_file.descriptor)
            desc_fname = proto_fname.replace('.proto', '.desc')
            log.info('saving-descriptor', fname=desc_fname, dir=self.work_dir,
                     length=len(desc_content))
            with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
                f.write(desc_content)
        return schemas.yang_from

    def _compile_proto_files(self, yang_from):
        """
        For each *.proto file in the work directory, compile the proto
        file into the respective *_pb2.py file as well as generate the
        corresponding yang schema.
        :return: None
        """
        log.info('start')
        google_api_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../protos/third_party'
        ))

        log.info('google-api', api_dir=google_api_dir)

        netconf_base_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../..'
        ))
        log.info('netconf-dir', dir=netconf_base_dir)

        for fname in [f for f in os.listdir(self.work_dir)
                      if f.endswith('.proto')]:
            log.info('filename', file=fname)

            need_yang = fname == yang_from
            log.debug('compiling',
                      file=fname,
                      yang_schema_required=need_yang)
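            # The command below invokes grpc.tools.protoc with the standard
            # Python and gRPC generators plus two local plugins from
            # self.plugin_dir: rpc_gw_gen.py (--gw_out) and proto2yang.py
            # (--custom_out, added only for the proto file named by yang_from).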
            cmd = (
                'cd %s && '
                'env PATH=%s PYTHONPATH=%s '
                'python -m grpc.tools.protoc '
                '-I. '
                '-I%s '
                '--python_out=. '
                '--grpc_python_out=. '
                '--plugin=protoc-gen-gw=%s/rpc_gw_gen.py '
                '--gw_out=. '
                '--plugin=protoc-gen-custom=%s/proto2yang.py '
                '%s'
                '%s' % (
                    self.work_dir,
                    ':'.join([os.environ['PATH'], self.plugin_dir]),
                    ':'.join([google_api_dir, netconf_base_dir]),
                    google_api_dir,
                    self.plugin_dir,
                    self.plugin_dir,
                    '--custom_out=. ' if need_yang else '',
                    fname)
            )
            log.debug('executing', cmd=cmd, file=fname)
            os.system(cmd)
            log.info('compiled', file=fname)

        # Load the generated modules
        mapper = get_nc_rpc_mapper_instance(self.work_dir, self)
        mapper.load_modules()

    def _set_yang_schemas(self):
        if self.work_dir not in sys.path:
            sys.path.insert(0, self.work_dir)

        for fname in [f for f in os.listdir(self.work_dir)
                      if f.endswith('.yang')]:
            # Special case: ietf-http, ietf-yang_options and ietf-descriptor
            # are not used as yang schemas, so do not add them to the set
            if fname not in ['ietf-http.yang', 'ietf-yang_options.yang',
                             'ietf-descriptor.yang']:
                self.yang_schemas.add(fname[:-len('.yang')])
        log.info('yang-schemas', schemas=self.yang_schemas)

    @inlineCallbacks
    def invoke_voltha_rpc(self, service, method, params, metadata=None):
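        """
        Invoke a voltha gRPC call by name.  The (service, method) pair is
        resolved to a generated function through the rpc mapper, the response
        is rearranged to follow the YANG field order, and the result is
        returned as (rearranged_response, (xml_tag, list_item_name)).
        """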
        try:
            mapper = get_nc_rpc_mapper_instance()

            # Get the mapping function using the service and method name
            func = mapper.get_function(service, method)
            if func is None:
                log.info('unsupported-rpc', service=service, method=method)
                return

            response = yield func(self, params, metadata)

            # Get the XML tag to use in the response
            xml_tag = mapper.get_xml_tag(service, method)

            # Get the XML list item name used in the response
            list_item_name = mapper.get_list_items_name(service, method)

            # Get the YANG defined fields (and their order) for that service
            # and method
            fields = mapper.get_fields_from_yang_defs(service, method)

            # TODO: This needs to be investigated further since the Netconf
            # client shows a formatting error if the code below is uncommented.
            # Check if this represents a List and whether the field name is
            # items. In the response (a dictionary), if a list named 'items'
            # is returned then 'items' can either:
            # 1) represent a list of items being returned where 'items' is just
            # a name to represent a list. In this case, this name will be
            # discarded
            # 2) represent the actual field name as defined in the proto
            # definitions. If this is the case then we need to preserve the
            # name
            # list_item_name = ''
            # if len(fields) == 1:
            #     if fields[0]['name'] == 'items':
            #         list_item_name = 'items'

            # Rearrange the dictionary response as specified by the YANG
            # definitions
            rearranged_response = self.rearrange_dict(mapper, response, fields)

            log.info('rpc-result', service=service, method=method,
                     response=response,
                     rearranged_response=rearranged_response, xml_tag=xml_tag,
                     list_item_name=list_item_name, fields=fields)

            returnValue((rearranged_response, (xml_tag, list_item_name)))

        except Exception, e:
            log.exception('rpc-failure', service=service, method=method,
                          params=params, e=e)

    def rearrange_dict(self, mapper, orig_dict, fields):
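        """
        Rebuild orig_dict as an OrderedDict whose keys follow the YANG field
        order given in 'fields', recursing into referenced types and repeated
        fields.  Fields absent from orig_dict are skipped.
        """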
        log.debug('rearranging-dict', fields=fields)
        result = collections.OrderedDict()
        if len(orig_dict) == 0 or not fields:
            return result
        for f in fields:
            if orig_dict.has_key(f['name']):
                if f['type_ref']:
                    # Get the fields for that type
                    sub_fields = mapper.get_fields_from_type_name(f['module'],
                                                                  f['type'])
                    if f['repeated']:
                        result[f['name']] = []
                        for d in orig_dict[f['name']]:
                            result[f['name']].append(self.rearrange_dict(
                                mapper, d, sub_fields))
                    else:
                        result[f['name']] = self.rearrange_dict(
                            mapper, orig_dict[f['name']], sub_fields)
                else:
                    result[f['name']] = orig_dict[f['name']]
        return result

    @inlineCallbacks
    def invoke(self, stub, method_name, request, metadata, retry=1):
        """
        Invoke a gRPC call to the remote server and return the response.
        :param stub: Reference to the *_pb2 service stub
        :param method_name: The method name inside the service stub
        :param request: The request protobuf message
        :param metadata: [(str, str), (str, str), ...]
        :return: The response protobuf message and returned trailing metadata
        """

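        # A minimal usage sketch (illustrative only; assumes the caller is
        # itself an @inlineCallbacks method and that the client is connected),
        # using the SchemaService stub and Empty message imported above:
        #
        #   response, trailing = yield self.invoke(
        #       SchemaServiceStub, 'GetSchema', Empty(), metadata=[])
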
        if not self.connected:
            raise ServiceUnavailable()

        try:
            method = getattr(stub(self.channel), method_name)
            response, rendezvous = method.with_call(request, metadata=metadata)
            returnValue((response, rendezvous.trailing_metadata()))

        except grpc._channel._Rendezvous, e:
            code = e.code()
            if code == grpc.StatusCode.UNAVAILABLE:
                e = ServiceUnavailable()

                if self.connected:
                    self.connected = False
                    yield self.connect()
                    if retry > 0:
                        response = yield self.invoke(stub, method_name,
                                                     request, metadata,
                                                     retry=retry - 1)
                        returnValue(response)

            elif code in (
                    grpc.StatusCode.NOT_FOUND,
                    grpc.StatusCode.INVALID_ARGUMENT,
                    grpc.StatusCode.ALREADY_EXISTS):

                pass  # don't log error, these occur naturally

            else:
                log.exception(e)

            raise e

    # Below is an adaptation of Google's MessageToDict() which includes
    # protobuf options extensions

    class Error(Exception):
        """Top-level module error for json_format."""

    class SerializeToJsonError(Error):
        """Thrown if serialization to JSON fails."""

    def _IsMapEntry(self, field):
        return (field.type == descriptor.FieldDescriptor.TYPE_MESSAGE and
                field.message_type.has_options and
                field.message_type.GetOptions().map_entry)

    def convertToDict(self, message):
        """Converts message to an object according to Proto3 JSON Specification."""

        js = {}
        return self._RegularMessageToJsonObject(message, js)

    def get_yang_option(self, field):
        opt = field.GetOptions()
        yang_opt = {}
        for fd, val in opt.ListFields():
            if fd.full_name == 'voltha.yang_inline_node':
                yang_opt['id'] = val.id
                yang_opt['type'] = val.type
        # For now, at most one yang option is set per field
        return yang_opt

    def _RegularMessageToJsonObject(self, message, js):
        """Converts normal message according to Proto3 JSON Specification."""
        fields = message.ListFields()

        try:
            for field, value in fields:
                # Check for options
                yang_opt = self.get_yang_option(field)

                name = field.name
                if self._IsMapEntry(field):
                    # Convert a map field.
                    v_field = field.message_type.fields_by_name['value']
                    js_map = {}
                    for key in value:
                        if isinstance(key, bool):
                            if key:
                                recorded_key = 'true'
                            else:
                                recorded_key = 'false'
                        else:
                            recorded_key = key
                        js_map[recorded_key] = self._FieldToJsonObject(
                            v_field, value[key])
                    js[name] = js_map
                elif field.label == descriptor.FieldDescriptor.LABEL_REPEATED:
                    # Convert a repeated field.
                    js[name] = [self._FieldToJsonObject(field, k)
                                for k in value]
                else:
                    # This specific yang option applies only to non-repeated
                    # fields
                    if yang_opt:  # Create a map
                        js_map = {}
                        js_map['yang_field_option'] = True
                        js_map['yang_field_option_id'] = yang_opt['id']
                        js_map['yang_field_option_type'] = yang_opt['type']
                        js_map['name'] = name
                        js_map[name] = self._FieldToJsonObject(field, value)
                        js[name] = js_map
                    else:
                        js[name] = self._FieldToJsonObject(field, value)

            # Serialize default values for fields that were not explicitly set.
            message_descriptor = message.DESCRIPTOR
            for field in message_descriptor.fields:
                # Singular message fields and oneof fields will not be affected.
                if ((
                        field.label != descriptor.FieldDescriptor.LABEL_REPEATED and
                        field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_MESSAGE) or
                        field.containing_oneof):
                    continue
                name = field.name
                if name in js:
                    # Skip the field which has been serialized already.
                    continue
                if self._IsMapEntry(field):
                    js[name] = {}
                elif field.label == descriptor.FieldDescriptor.LABEL_REPEATED:
                    js[name] = []
                else:
                    js[name] = self._FieldToJsonObject(field,
                                                       field.default_value)

        except ValueError as e:
            raise self.SerializeToJsonError(
                'Failed to serialize {0} field: {1}.'.format(field.name, e))

        return js

    def _FieldToJsonObject(self, field, value):
        """Converts field value according to Proto3 JSON Specification."""
        if field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_MESSAGE:
            return self.convertToDict(value)
        elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_ENUM:
            enum_value = field.enum_type.values_by_number.get(value, None)
            if enum_value is not None:
                return enum_value.name
            else:
                raise self.SerializeToJsonError(
                    'Enum field contains an integer value '
                    'which cannot be mapped to an enum value.')
        elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_STRING:
            if field.type == descriptor.FieldDescriptor.TYPE_BYTES:
                # Use base64 Data encoding for bytes
                return base64.b64encode(value).decode('utf-8')
            else:
                return value
        elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_BOOL:
            return bool(value)
        elif field.cpp_type in _INT64_TYPES:
            return str(value)
        elif field.cpp_type in _FLOAT_TYPES:
            if math.isinf(value):
                if value < 0.0:
                    return _NEG_INFINITY
                else:
                    return _INFINITY
            if math.isnan(value):
                return _NAN
            return value