blob: bcfb972e3c5f3a8224976c2ee392e4cb9e8c8ef6 [file] [log] [blame]
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -05001#
Khen Nursimuluc9ef7c12017-01-04 20:40:53 -05002# Copyright 2017 the original author or authors.
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -05003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
gRPC client meant to connect to a gRPC server endpoint and query the
endpoint's schema by calling SchemaService.GetSchema(Empty). All of the
client's semantics are derived from the retrieved schema.
21"""
22
23import os
24import sys
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -050025from zlib import decompress
26
27import grpc
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -050028from grpc._channel import _Rendezvous
29from structlog import get_logger
30from twisted.internet import reactor
31from twisted.internet.defer import inlineCallbacks, returnValue
32from werkzeug.exceptions import ServiceUnavailable
33
34from common.utils.asleep import asleep
35from netconf.protos import third_party
36from netconf.protos.schema_pb2 import SchemaServiceStub
37from google.protobuf.empty_pb2 import Empty
38from common.utils.consulhelpers import get_endpoint_from_consul
Khen Nursimulu8ffb8932017-01-26 13:40:49 -050039# from netconf.protos.voltha_pb2 import VolthaLocalServiceStub, \
40# VolthaGlobalServiceStub
41# from google.protobuf import empty_pb2
42# from google.protobuf.json_format import MessageToDict, ParseDict
43from nc_rpc_mapper import get_nc_rpc_mapper_instance
Khen Nursimulua4972742016-12-23 17:15:20 -050044from google.protobuf import descriptor
45import base64
46import math
Khen Nursimulu3676b7c2017-01-31 13:48:38 -050047import collections
Khen Nursimulu0b9aed12017-02-06 15:33:46 -050048from netconf.constants import Constants as C
Khen Nursimulua4972742016-12-23 17:15:20 -050049
# C++ field types that _FieldToJsonObject renders as strings (str(value)).
_INT64_TYPES = frozenset([descriptor.FieldDescriptor.CPPTYPE_INT64,
                          descriptor.FieldDescriptor.CPPTYPE_UINT64])
# C++ field types whose infinite / NaN values _FieldToJsonObject replaces
# with the string constants below.
_FLOAT_TYPES = frozenset([descriptor.FieldDescriptor.CPPTYPE_FLOAT,
                          descriptor.FieldDescriptor.CPPTYPE_DOUBLE])
# String stand-ins for non-finite float values.
_INFINITY = 'Infinity'
_NEG_INFINITY = '-Infinity'
_NAN = 'NaN'

# Module-level structlog logger used by every method in this file.
log = get_logger()
59
60
61class GrpcClient(object):
62 """
63 Connect to a gRPC server, fetch its schema, and process the downloaded
64 proto schema files. The goal is to convert the proto schemas into yang
65 schemas which would be exposed to the Netconf client.
66 """
67 RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
68
69 def __init__(self, consul_endpoint, work_dir,
70 grpc_endpoint='localhost:50055',
71 reconnect_callback=None,
72 on_start_callback=None):
73 self.consul_endpoint = consul_endpoint
74 self.grpc_endpoint = grpc_endpoint
75 self.work_dir = work_dir
76 self.reconnect_callback = reconnect_callback
77 self.on_start_callback = on_start_callback
78
79 self.plugin_dir = os.path.abspath(os.path.join(
80 os.path.dirname(__file__), '../protoc_plugins'))
81
Khen Nursimuluc9ef7c12017-01-04 20:40:53 -050082 self.yang_schemas = set()
83
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -050084 self.channel = None
85 self.local_stub = None
86 self.schema = None
87 self.retries = 0
88 self.shutting_down = False
89 self.connected = False
90
91 def start(self):
92 log.debug('starting')
93 if not self.connected:
94 reactor.callLater(0, self.connect)
95 log.info('started')
96 return self
97
98 def stop(self):
99 log.debug('stopping')
100 if self.shutting_down:
101 return
102 self.shutting_down = True
103 log.info('stopped')
104
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500105 def set_on_start_callback(self, on_start_callback):
106 self.on_start_callback = on_start_callback
107 return self
108
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500109 def set_reconnect_callback(self, reconnect_callback):
110 self.reconnect_callback = reconnect_callback
111 return self
112
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500113 def resolve_endpoint(self, endpoint):
114 ip_port_endpoint = endpoint
115 if endpoint.startswith('@'):
116 try:
117 ip_port_endpoint = get_endpoint_from_consul(
118 self.consul_endpoint, endpoint[1:])
119 log.info('endpoint-found',
120 endpoint=endpoint, ip_port=ip_port_endpoint)
121 except Exception as e:
122 log.error('service-not-found-in-consul', endpoint=endpoint,
123 exception=repr(e))
124 return None, None
125 if ip_port_endpoint:
126 host, port = ip_port_endpoint.split(':', 2)
127 return host, int(port)
128
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500129 @inlineCallbacks
130 def connect(self):
131 """
132 (Re-)Connect to end-point
133 """
134 if self.shutting_down or self.connected:
135 return
136
137 try:
138 host, port = self.resolve_endpoint(self.grpc_endpoint)
139
140 # If host and port is not set then we will retry
141 if host and port:
142 log.info('grpc-endpoint-connecting', host=host, port=port)
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500143 self.channel = grpc.insecure_channel(
144 '{}:{}'.format(host, port))
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500145
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500146 yang_from = self._retrieve_schema()
147 log.info('proto-to-yang-schema', file=yang_from)
148 self._compile_proto_files(yang_from)
Khen Nursimuluc9ef7c12017-01-04 20:40:53 -0500149 self._set_yang_schemas()
150
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500151 self._clear_backoff()
152
153 if self.on_start_callback is not None:
154 reactor.callLater(0, self.on_start_callback)
155
156 self.connected = True
157 if self.reconnect_callback is not None:
158 reactor.callLater(0, self.reconnect_callback)
159
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500160 return
161
162 except _Rendezvous, e:
163 if e.code() == grpc.StatusCode.UNAVAILABLE:
164 log.info('grpc-endpoint-not-available')
165 else:
166 log.exception(e)
167 yield self._backoff('not-available')
168
169 except Exception, e:
170 if not self.shutting_down:
171 log.exception('cannot-connect', endpoint=_endpoint)
172 yield self._backoff('unknown-error')
173
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500174 reactor.callLater(1, self.connect)
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500175
176 def _backoff(self, msg):
177 wait_time = self.RETRY_BACKOFF[min(self.retries,
178 len(self.RETRY_BACKOFF) - 1)]
179 self.retries += 1
180 log.error(msg, retry_in=wait_time)
181 return asleep(wait_time)
182
183 def _clear_backoff(self):
184 if self.retries:
185 log.info('reconnected', after_retries=self.retries)
186 self.retries = 0
187
188 def _retrieve_schema(self):
189 """
190 Retrieve schema from gRPC end-point, and save all *.proto files in
191 the work directory.
192 """
193 assert isinstance(self.channel, grpc.Channel)
194 stub = SchemaServiceStub(self.channel)
195 # try:
196 schemas = stub.GetSchema(Empty())
197 # except _Rendezvous, e:
198 # if e.code == grpc.StatusCode.UNAVAILABLE:
199 #
200 # else:
201 # raise e
202
203 os.system('mkdir -p %s' % self.work_dir)
204 os.system('rm -fr /tmp/%s/*' %
205 self.work_dir.replace('/tmp/', '')) # safer
206
207 for proto_file in schemas.protos:
208 proto_fname = proto_file.file_name
209 # TODO: Do we need to process a set of files using a prefix
210 # instead of just one?
211 proto_content = proto_file.proto
212 log.info('saving-proto', fname=proto_fname, dir=self.work_dir,
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500213 length=len(proto_content))
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500214 with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
215 f.write(proto_content)
216
217 desc_content = decompress(proto_file.descriptor)
218 desc_fname = proto_fname.replace('.proto', '.desc')
219 log.info('saving-descriptor', fname=desc_fname, dir=self.work_dir,
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500220 length=len(desc_content))
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500221 with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
222 f.write(desc_content)
223 return schemas.yang_from
224
225 def _compile_proto_files(self, yang_from):
226 """
227 For each *.proto file in the work directory, compile the proto
228 file into the respective *_pb2.py file as well as generate the
229 corresponding yang schema.
230 :return: None
231 """
232 log.info('start')
233 google_api_dir = os.path.abspath(os.path.join(
234 os.path.dirname(__file__), '../protos/third_party'
235 ))
236
237 log.info('google-api', api_dir=google_api_dir)
238
239 netconf_base_dir = os.path.abspath(os.path.join(
240 os.path.dirname(__file__), '../..'
241 ))
242 log.info('netconf-dir', dir=netconf_base_dir)
243
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500244 for fname in [f for f in os.listdir(self.work_dir)
245 if f.endswith('.proto')]:
246 log.info('filename', file=fname)
247
248 need_yang = fname == yang_from
249 log.debug('compiling',
250 file=fname,
251 yang_schema_required=need_yang)
252 cmd = (
253 'cd %s && '
254 'env PATH=%s PYTHONPATH=%s '
255 'python -m grpc.tools.protoc '
256 '-I. '
257 '-I%s '
258 '--python_out=. '
259 '--grpc_python_out=. '
Khen Nursimulu8ffb8932017-01-26 13:40:49 -0500260 '--plugin=protoc-gen-gw=%s/rpc_gw_gen.py '
261 '--gw_out=. '
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500262 '--plugin=protoc-gen-custom=%s/proto2yang.py '
263 '%s'
264 '%s' % (
265 self.work_dir,
266 ':'.join([os.environ['PATH'], self.plugin_dir]),
267 ':'.join([google_api_dir, netconf_base_dir]),
268 google_api_dir,
269 self.plugin_dir,
Khen Nursimulu8ffb8932017-01-26 13:40:49 -0500270 self.plugin_dir,
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500271 '--custom_out=. ' if need_yang else '',
272 fname)
273 )
274 log.debug('executing', cmd=cmd, file=fname)
275 os.system(cmd)
276 log.info('compiled', file=fname)
277
Khen Nursimulu8ffb8932017-01-26 13:40:49 -0500278 # Load the generated modules
279 mapper = get_nc_rpc_mapper_instance(self.work_dir, self)
280 mapper.load_modules()
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500281
Khen Nursimuluc9ef7c12017-01-04 20:40:53 -0500282 def _set_yang_schemas(self):
283 if self.work_dir not in sys.path:
284 sys.path.insert(0, self.work_dir)
285
286 for fname in [f for f in os.listdir(self.work_dir)
287 if f.endswith('.yang')]:
Khen Nursimulu0b9aed12017-02-06 15:33:46 -0500288 # Filter out schemas which are not required
289 if fname not in C.SCHEMAS_TO_IGNORE:
Khen Nursimuluc9ef7c12017-01-04 20:40:53 -0500290 self.yang_schemas.add(fname[:-len('.yang')])
291 log.info('yang-schemas', schemas=self.yang_schemas)
292
Khen Nursimulue0d53f82016-12-14 11:05:44 -0800293 @inlineCallbacks
Khen Nursimulu8ffb8932017-01-26 13:40:49 -0500294 def invoke_voltha_rpc(self, service, method, params, metadata=None):
Khen Nursimulue0d53f82016-12-14 11:05:44 -0800295 try:
Khen Nursimulu8ffb8932017-01-26 13:40:49 -0500296 mapper = get_nc_rpc_mapper_instance()
Khen Nursimulua4972742016-12-23 17:15:20 -0500297
Khen Nursimulu8ffb8932017-01-26 13:40:49 -0500298 # Get the mapping function using the service and method name
299 func = mapper.get_function(service, method)
300 if func is None:
301 log.info('unsupported-rpc', service=service, method=method)
302 return
Khen Nursimulue0d53f82016-12-14 11:05:44 -0800303
Khen Nursimulu8ffb8932017-01-26 13:40:49 -0500304 response = yield func(self, params, metadata)
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500305
Khen Nursimulu3676b7c2017-01-31 13:48:38 -0500306 # Get the XML tag to use in the response
307 xml_tag = mapper.get_xml_tag(service, method)
Khen Nursimulu8ffb8932017-01-26 13:40:49 -0500308
Khen Nursimulub21bd642017-02-20 21:32:27 -0500309 # # Get the XML list item name used in the response
310 # list_item_name = mapper.get_list_items_name(service, method)
Khen Nursimulu3676b7c2017-01-31 13:48:38 -0500311
312 # Get the YANG defined fields (and their order) for that service
313 # and method
314 fields = mapper.get_fields_from_yang_defs(service, method)
315
Khen Nursimulu3676b7c2017-01-31 13:48:38 -0500316 # Check if this represents a List and whether the field name is
317 # items. In the response (a dictionary), if a list named 'items'
318 # is returned then 'items' can either:
319 # 1) represent a list of items being returned where 'items' is just
320 # a name to represent a list. In this case, this name will be
321 # discarded
322 # 2) represent the actual field name as defined in the proto
323 # definitions. If this is the case then we need to preserve the
324 # name
Khen Nursimulub21bd642017-02-20 21:32:27 -0500325 list_item_name = ''
326 if fields: # if the return type is empty then fields will be None
327 if len(fields) == 1:
328 if fields[0]['name'] == 'items':
329 list_item_name = 'items'
Khen Nursimulu3676b7c2017-01-31 13:48:38 -0500330
331 # Rearrange the dictionary response as specified by the YANG
332 # definitions
333 rearranged_response = self.rearrange_dict(mapper, response, fields)
334
335 log.info('rpc-result', service=service, method=method,
336 response=response,
337 rearranged_response=rearranged_response, xml_tag=xml_tag,
338 list_item_name=list_item_name, fields=fields)
339
340 returnValue((rearranged_response, (xml_tag, list_item_name)))
Khen Nursimulua4972742016-12-23 17:15:20 -0500341
Khen Nursimulue0d53f82016-12-14 11:05:44 -0800342 except Exception, e:
Khen Nursimulu8ffb8932017-01-26 13:40:49 -0500343 log.exception('rpc-failure', service=service, method=method,
344 params=params, e=e)
Khen Nursimulue0d53f82016-12-14 11:05:44 -0800345
Khen Nursimulu3676b7c2017-01-31 13:48:38 -0500346 def rearrange_dict(self, mapper, orig_dict, fields):
347 log.debug('rearranging-dict', fields=fields)
348 result = collections.OrderedDict()
349 if len(orig_dict) == 0 or not fields:
350 return result
351 for f in fields:
352 if orig_dict.has_key(f['name']):
353 if f['type_ref']:
354 # Get the fields for that type
355 sub_fields = mapper.get_fields_from_type_name(f['module'],
356 f['type'])
357 if f['repeated']:
358 result[f['name']] = []
359 for d in orig_dict[f['name']]:
360 result[f['name']].append(self.rearrange_dict(
361 mapper, d, sub_fields))
362 else:
363 result[f['name']] = self.rearrange_dict(mapper,
364 orig_dict[
365 f['name']],
366 sub_fields)
367 else:
368 result[f['name']] = orig_dict[f['name']]
369 return result
370
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500371 @inlineCallbacks
372 def invoke(self, stub, method_name, request, metadata, retry=1):
373 """
374 Invoke a gRPC call to the remote server and return the response.
375 :param stub: Reference to the *_pb2 service stub
376 :param method_name: The method name inside the service stub
377 :param request: The request protobuf message
378 :param metadata: [(str, str), (str, str), ...]
379 :return: The response protobuf message and returned trailing metadata
380 """
Khen Nursimulue0d53f82016-12-14 11:05:44 -0800381
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500382 if not self.connected:
383 raise ServiceUnavailable()
384
385 try:
386 method = getattr(stub(self.channel), method_name)
387 response, rendezvous = method.with_call(request, metadata=metadata)
388 returnValue((response, rendezvous.trailing_metadata()))
389
390 except grpc._channel._Rendezvous, e:
391 code = e.code()
392 if code == grpc.StatusCode.UNAVAILABLE:
393 e = ServiceUnavailable()
394
395 if self.connected:
396 self.connected = False
397 yield self.connect()
398 if retry > 0:
399 response = yield self.invoke(stub, method_name,
400 request, metadata,
401 retry=retry - 1)
402 returnValue(response)
403
404 elif code in (
405 grpc.StatusCode.NOT_FOUND,
406 grpc.StatusCode.INVALID_ARGUMENT,
407 grpc.StatusCode.ALREADY_EXISTS):
408
409 pass # don't log error, these occur naturally
410
411 else:
412 log.exception(e)
413
414 raise e
Khen Nursimulua4972742016-12-23 17:15:20 -0500415
416 # Below is an adaptation of Google's MessageToDict() which includes
417 # protobuf options extensions
418
419 class Error(Exception):
420 """Top-level module error for json_format."""
421
422 class SerializeToJsonError(Error):
423 """Thrown if serialization to JSON fails."""
424
425 def _IsMapEntry(self, field):
426 return (field.type == descriptor.FieldDescriptor.TYPE_MESSAGE and
427 field.message_type.has_options and
428 field.message_type.GetOptions().map_entry)
429
430 def convertToDict(self, message):
431 """Converts message to an object according to Proto3 JSON Specification."""
432
433 js = {}
434 return self._RegularMessageToJsonObject(message, js)
435
436 def get_yang_option(self, field):
437 opt = field.GetOptions()
438 yang_opt = {}
439 for fd, val in opt.ListFields():
440 if fd.full_name == 'voltha.yang_inline_node':
441 yang_opt['id'] = val.id
442 yang_opt['type'] = val.type
443 # Fow now, a max of 1 yang option is set per field
444 return yang_opt
445
446 def _RegularMessageToJsonObject(self, message, js):
447 """Converts normal message according to Proto3 JSON Specification."""
448 fields = message.ListFields()
449
450 try:
451 for field, value in fields:
452 # Check for options
453 yang_opt = self.get_yang_option(field)
454
455 name = field.name
456 if self._IsMapEntry(field):
457 # Convert a map field.
458 v_field = field.message_type.fields_by_name['value']
459 js_map = {}
460 for key in value:
461 if isinstance(key, bool):
462 if key:
463 recorded_key = 'true'
464 else:
465 recorded_key = 'false'
466 else:
467 recorded_key = key
468 js_map[recorded_key] = self._FieldToJsonObject(
469 v_field, value[key])
470 js[name] = js_map
471 elif field.label == descriptor.FieldDescriptor.LABEL_REPEATED:
472 # Convert a repeated field.
473 js[name] = [self._FieldToJsonObject(field, k)
474 for k in value]
475 else:
476 # This specific yang option applies only to non-repeated
477 # fields
478 if yang_opt: # Create a map
479 js_map = {}
480 js_map['yang_field_option'] = True
481 js_map['yang_field_option_id'] = yang_opt['id']
482 js_map['yang_field_option_type'] = yang_opt['type']
483 js_map['name'] = name
484 js_map[name] = self._FieldToJsonObject(field, value)
485 js[name] = js_map
486 else:
487 js[name] = self._FieldToJsonObject(field, value)
488
489 # Serialize default value if including_default_value_fields is True.
490 message_descriptor = message.DESCRIPTOR
491 for field in message_descriptor.fields:
492 # Singular message fields and oneof fields will not be affected.
493 if ((
494 field.label != descriptor.FieldDescriptor.LABEL_REPEATED and
495 field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_MESSAGE) or
496 field.containing_oneof):
497 continue
498 name = field.name
499 if name in js:
500 # Skip the field which has been serailized already.
501 continue
502 if self._IsMapEntry(field):
503 js[name] = {}
504 elif field.label == descriptor.FieldDescriptor.LABEL_REPEATED:
505 js[name] = []
506 else:
507 js[name] = self._FieldToJsonObject(field,
508 field.default_value)
509
510 except ValueError as e:
511 raise self.SerializeToJsonError(
512 'Failed to serialize {0} field: {1}.'.format(field.name, e))
513
514 return js
515
516 def _FieldToJsonObject(self, field, value):
517 """Converts field value according to Proto3 JSON Specification."""
518 if field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_MESSAGE:
519 return self.convertToDict(value)
520 elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_ENUM:
521 enum_value = field.enum_type.values_by_number.get(value, None)
522 if enum_value is not None:
523 return enum_value.name
524 else:
525 raise self.SerializeToJsonError('Enum field contains an '
526 'integer value '
527 'which can not mapped to an enum value.')
528 elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_STRING:
529 if field.type == descriptor.FieldDescriptor.TYPE_BYTES:
530 # Use base64 Data encoding for bytes
531 return base64.b64encode(value).decode('utf-8')
532 else:
533 return value
534 elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_BOOL:
535 return bool(value)
536 elif field.cpp_type in _INT64_TYPES:
537 return str(value)
538 elif field.cpp_type in _FLOAT_TYPES:
539 if math.isinf(value):
540 if value < 0.0:
541 return _NEG_INFINITY
542 else:
543 return _INFINITY
544 if math.isnan(value):
545 return _NAN
546 return value