blob: 9dd7f870395e337bcd0d48f526578f92e6a314e3 [file] [log] [blame]
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -05001#
Khen Nursimuluc9ef7c12017-01-04 20:40:53 -05002# Copyright 2017 the original author or authors.
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -05003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18gRPC client meant to connect to a gRPC server endpoint, and query the
19end-point's schema by calling SchemaService.Schema(Empty) and all of its
20semantics are derived from the recovered schema.
21"""
22
23import os
24import sys
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -050025from zlib import decompress
26
27import grpc
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -050028from grpc._channel import _Rendezvous
29from structlog import get_logger
30from twisted.internet import reactor
31from twisted.internet.defer import inlineCallbacks, returnValue
32from werkzeug.exceptions import ServiceUnavailable
33
34from common.utils.asleep import asleep
35from netconf.protos import third_party
36from netconf.protos.schema_pb2 import SchemaServiceStub
37from google.protobuf.empty_pb2 import Empty
38from common.utils.consulhelpers import get_endpoint_from_consul
Khen Nursimulu8ffb8932017-01-26 13:40:49 -050039# from netconf.protos.voltha_pb2 import VolthaLocalServiceStub, \
40# VolthaGlobalServiceStub
41# from google.protobuf import empty_pb2
42# from google.protobuf.json_format import MessageToDict, ParseDict
43from nc_rpc_mapper import get_nc_rpc_mapper_instance
Khen Nursimulua4972742016-12-23 17:15:20 -050044from google.protobuf import descriptor
45import base64
46import math
47
48_INT64_TYPES = frozenset([descriptor.FieldDescriptor.CPPTYPE_INT64,
49 descriptor.FieldDescriptor.CPPTYPE_UINT64])
50_FLOAT_TYPES = frozenset([descriptor.FieldDescriptor.CPPTYPE_FLOAT,
51 descriptor.FieldDescriptor.CPPTYPE_DOUBLE])
52_INFINITY = 'Infinity'
53_NEG_INFINITY = '-Infinity'
54_NAN = 'NaN'
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -050055
56log = get_logger()
57
58
59class GrpcClient(object):
60 """
61 Connect to a gRPC server, fetch its schema, and process the downloaded
62 proto schema files. The goal is to convert the proto schemas into yang
63 schemas which would be exposed to the Netconf client.
64 """
65 RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
66
67 def __init__(self, consul_endpoint, work_dir,
68 grpc_endpoint='localhost:50055',
69 reconnect_callback=None,
70 on_start_callback=None):
71 self.consul_endpoint = consul_endpoint
72 self.grpc_endpoint = grpc_endpoint
73 self.work_dir = work_dir
74 self.reconnect_callback = reconnect_callback
75 self.on_start_callback = on_start_callback
76
77 self.plugin_dir = os.path.abspath(os.path.join(
78 os.path.dirname(__file__), '../protoc_plugins'))
79
Khen Nursimuluc9ef7c12017-01-04 20:40:53 -050080 self.yang_schemas = set()
81
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -050082 self.channel = None
83 self.local_stub = None
84 self.schema = None
85 self.retries = 0
86 self.shutting_down = False
87 self.connected = False
88
89 def start(self):
90 log.debug('starting')
91 if not self.connected:
92 reactor.callLater(0, self.connect)
93 log.info('started')
94 return self
95
96 def stop(self):
97 log.debug('stopping')
98 if self.shutting_down:
99 return
100 self.shutting_down = True
101 log.info('stopped')
102
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500103 def set_on_start_callback(self, on_start_callback):
104 self.on_start_callback = on_start_callback
105 return self
106
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500107 def set_reconnect_callback(self, reconnect_callback):
108 self.reconnect_callback = reconnect_callback
109 return self
110
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500111 def resolve_endpoint(self, endpoint):
112 ip_port_endpoint = endpoint
113 if endpoint.startswith('@'):
114 try:
115 ip_port_endpoint = get_endpoint_from_consul(
116 self.consul_endpoint, endpoint[1:])
117 log.info('endpoint-found',
118 endpoint=endpoint, ip_port=ip_port_endpoint)
119 except Exception as e:
120 log.error('service-not-found-in-consul', endpoint=endpoint,
121 exception=repr(e))
122 return None, None
123 if ip_port_endpoint:
124 host, port = ip_port_endpoint.split(':', 2)
125 return host, int(port)
126
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500127 @inlineCallbacks
128 def connect(self):
129 """
130 (Re-)Connect to end-point
131 """
132 if self.shutting_down or self.connected:
133 return
134
135 try:
136 host, port = self.resolve_endpoint(self.grpc_endpoint)
137
138 # If host and port is not set then we will retry
139 if host and port:
140 log.info('grpc-endpoint-connecting', host=host, port=port)
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500141 self.channel = grpc.insecure_channel(
142 '{}:{}'.format(host, port))
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500143
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500144 yang_from = self._retrieve_schema()
145 log.info('proto-to-yang-schema', file=yang_from)
146 self._compile_proto_files(yang_from)
Khen Nursimuluc9ef7c12017-01-04 20:40:53 -0500147 self._set_yang_schemas()
148
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500149 self._clear_backoff()
150
151 if self.on_start_callback is not None:
152 reactor.callLater(0, self.on_start_callback)
153
154 self.connected = True
155 if self.reconnect_callback is not None:
156 reactor.callLater(0, self.reconnect_callback)
157
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500158 return
159
160 except _Rendezvous, e:
161 if e.code() == grpc.StatusCode.UNAVAILABLE:
162 log.info('grpc-endpoint-not-available')
163 else:
164 log.exception(e)
165 yield self._backoff('not-available')
166
167 except Exception, e:
168 if not self.shutting_down:
169 log.exception('cannot-connect', endpoint=_endpoint)
170 yield self._backoff('unknown-error')
171
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500172 reactor.callLater(1, self.connect)
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500173
174 def _backoff(self, msg):
175 wait_time = self.RETRY_BACKOFF[min(self.retries,
176 len(self.RETRY_BACKOFF) - 1)]
177 self.retries += 1
178 log.error(msg, retry_in=wait_time)
179 return asleep(wait_time)
180
181 def _clear_backoff(self):
182 if self.retries:
183 log.info('reconnected', after_retries=self.retries)
184 self.retries = 0
185
186 def _retrieve_schema(self):
187 """
188 Retrieve schema from gRPC end-point, and save all *.proto files in
189 the work directory.
190 """
191 assert isinstance(self.channel, grpc.Channel)
192 stub = SchemaServiceStub(self.channel)
193 # try:
194 schemas = stub.GetSchema(Empty())
195 # except _Rendezvous, e:
196 # if e.code == grpc.StatusCode.UNAVAILABLE:
197 #
198 # else:
199 # raise e
200
201 os.system('mkdir -p %s' % self.work_dir)
202 os.system('rm -fr /tmp/%s/*' %
203 self.work_dir.replace('/tmp/', '')) # safer
204
205 for proto_file in schemas.protos:
206 proto_fname = proto_file.file_name
207 # TODO: Do we need to process a set of files using a prefix
208 # instead of just one?
209 proto_content = proto_file.proto
210 log.info('saving-proto', fname=proto_fname, dir=self.work_dir,
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500211 length=len(proto_content))
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500212 with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
213 f.write(proto_content)
214
215 desc_content = decompress(proto_file.descriptor)
216 desc_fname = proto_fname.replace('.proto', '.desc')
217 log.info('saving-descriptor', fname=desc_fname, dir=self.work_dir,
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500218 length=len(desc_content))
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500219 with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
220 f.write(desc_content)
221 return schemas.yang_from
222
223 def _compile_proto_files(self, yang_from):
224 """
225 For each *.proto file in the work directory, compile the proto
226 file into the respective *_pb2.py file as well as generate the
227 corresponding yang schema.
228 :return: None
229 """
230 log.info('start')
231 google_api_dir = os.path.abspath(os.path.join(
232 os.path.dirname(__file__), '../protos/third_party'
233 ))
234
235 log.info('google-api', api_dir=google_api_dir)
236
237 netconf_base_dir = os.path.abspath(os.path.join(
238 os.path.dirname(__file__), '../..'
239 ))
240 log.info('netconf-dir', dir=netconf_base_dir)
241
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500242 for fname in [f for f in os.listdir(self.work_dir)
243 if f.endswith('.proto')]:
244 log.info('filename', file=fname)
245
246 need_yang = fname == yang_from
247 log.debug('compiling',
248 file=fname,
249 yang_schema_required=need_yang)
250 cmd = (
251 'cd %s && '
252 'env PATH=%s PYTHONPATH=%s '
253 'python -m grpc.tools.protoc '
254 '-I. '
255 '-I%s '
256 '--python_out=. '
257 '--grpc_python_out=. '
Khen Nursimulu8ffb8932017-01-26 13:40:49 -0500258 '--plugin=protoc-gen-gw=%s/rpc_gw_gen.py '
259 '--gw_out=. '
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500260 '--plugin=protoc-gen-custom=%s/proto2yang.py '
261 '%s'
262 '%s' % (
263 self.work_dir,
264 ':'.join([os.environ['PATH'], self.plugin_dir]),
265 ':'.join([google_api_dir, netconf_base_dir]),
266 google_api_dir,
267 self.plugin_dir,
Khen Nursimulu8ffb8932017-01-26 13:40:49 -0500268 self.plugin_dir,
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500269 '--custom_out=. ' if need_yang else '',
270 fname)
271 )
272 log.debug('executing', cmd=cmd, file=fname)
273 os.system(cmd)
274 log.info('compiled', file=fname)
275
Khen Nursimulu8ffb8932017-01-26 13:40:49 -0500276 # Load the generated modules
277 mapper = get_nc_rpc_mapper_instance(self.work_dir, self)
278 mapper.load_modules()
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500279
Khen Nursimuluc9ef7c12017-01-04 20:40:53 -0500280 def _set_yang_schemas(self):
281 if self.work_dir not in sys.path:
282 sys.path.insert(0, self.work_dir)
283
284 for fname in [f for f in os.listdir(self.work_dir)
285 if f.endswith('.yang')]:
286 # Special case : since ietf-http, ietf-annotations,
287 # ietf-yang_options are not used for yang schema then do not add
288 # them to the set
289 if fname not in ['ietf-http.yang', 'ietf-yang_options.yang',
290 'ietf-descriptor.yang']:
291 self.yang_schemas.add(fname[:-len('.yang')])
292 log.info('yang-schemas', schemas=self.yang_schemas)
293
Khen Nursimulue0d53f82016-12-14 11:05:44 -0800294 @inlineCallbacks
Khen Nursimulu8ffb8932017-01-26 13:40:49 -0500295 def invoke_voltha_rpc(self, service, method, params, metadata=None):
Khen Nursimulue0d53f82016-12-14 11:05:44 -0800296 try:
Khen Nursimulu8ffb8932017-01-26 13:40:49 -0500297 mapper = get_nc_rpc_mapper_instance()
Khen Nursimulua4972742016-12-23 17:15:20 -0500298
Khen Nursimulu8ffb8932017-01-26 13:40:49 -0500299 # Get the mapping function using the service and method name
300 func = mapper.get_function(service, method)
301 if func is None:
302 log.info('unsupported-rpc', service=service, method=method)
303 return
Khen Nursimulue0d53f82016-12-14 11:05:44 -0800304
Khen Nursimulu8ffb8932017-01-26 13:40:49 -0500305 response = yield func(self, params, metadata)
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500306
Khen Nursimulu8ffb8932017-01-26 13:40:49 -0500307 log.info('rpc-result', service=service, method=method,
308 response=response)
309
310 returnValue(response)
Khen Nursimulua4972742016-12-23 17:15:20 -0500311
Khen Nursimulue0d53f82016-12-14 11:05:44 -0800312 except Exception, e:
Khen Nursimulu8ffb8932017-01-26 13:40:49 -0500313 log.exception('rpc-failure', service=service, method=method,
314 params=params, e=e)
Khen Nursimulue0d53f82016-12-14 11:05:44 -0800315
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500316 @inlineCallbacks
317 def invoke(self, stub, method_name, request, metadata, retry=1):
318 """
319 Invoke a gRPC call to the remote server and return the response.
320 :param stub: Reference to the *_pb2 service stub
321 :param method_name: The method name inside the service stub
322 :param request: The request protobuf message
323 :param metadata: [(str, str), (str, str), ...]
324 :return: The response protobuf message and returned trailing metadata
325 """
Khen Nursimulue0d53f82016-12-14 11:05:44 -0800326
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500327 if not self.connected:
328 raise ServiceUnavailable()
329
330 try:
331 method = getattr(stub(self.channel), method_name)
332 response, rendezvous = method.with_call(request, metadata=metadata)
333 returnValue((response, rendezvous.trailing_metadata()))
334
335 except grpc._channel._Rendezvous, e:
336 code = e.code()
337 if code == grpc.StatusCode.UNAVAILABLE:
338 e = ServiceUnavailable()
339
340 if self.connected:
341 self.connected = False
342 yield self.connect()
343 if retry > 0:
344 response = yield self.invoke(stub, method_name,
345 request, metadata,
346 retry=retry - 1)
347 returnValue(response)
348
349 elif code in (
350 grpc.StatusCode.NOT_FOUND,
351 grpc.StatusCode.INVALID_ARGUMENT,
352 grpc.StatusCode.ALREADY_EXISTS):
353
354 pass # don't log error, these occur naturally
355
356 else:
357 log.exception(e)
358
359 raise e
Khen Nursimulua4972742016-12-23 17:15:20 -0500360
361 # Below is an adaptation of Google's MessageToDict() which includes
362 # protobuf options extensions
363
364 class Error(Exception):
365 """Top-level module error for json_format."""
366
367 class SerializeToJsonError(Error):
368 """Thrown if serialization to JSON fails."""
369
370 def _IsMapEntry(self, field):
371 return (field.type == descriptor.FieldDescriptor.TYPE_MESSAGE and
372 field.message_type.has_options and
373 field.message_type.GetOptions().map_entry)
374
375 def convertToDict(self, message):
376 """Converts message to an object according to Proto3 JSON Specification."""
377
378 js = {}
379 return self._RegularMessageToJsonObject(message, js)
380
381 def get_yang_option(self, field):
382 opt = field.GetOptions()
383 yang_opt = {}
384 for fd, val in opt.ListFields():
385 if fd.full_name == 'voltha.yang_inline_node':
386 yang_opt['id'] = val.id
387 yang_opt['type'] = val.type
388 # Fow now, a max of 1 yang option is set per field
389 return yang_opt
390
391 def _RegularMessageToJsonObject(self, message, js):
392 """Converts normal message according to Proto3 JSON Specification."""
393 fields = message.ListFields()
394
395 try:
396 for field, value in fields:
397 # Check for options
398 yang_opt = self.get_yang_option(field)
399
400 name = field.name
401 if self._IsMapEntry(field):
402 # Convert a map field.
403 v_field = field.message_type.fields_by_name['value']
404 js_map = {}
405 for key in value:
406 if isinstance(key, bool):
407 if key:
408 recorded_key = 'true'
409 else:
410 recorded_key = 'false'
411 else:
412 recorded_key = key
413 js_map[recorded_key] = self._FieldToJsonObject(
414 v_field, value[key])
415 js[name] = js_map
416 elif field.label == descriptor.FieldDescriptor.LABEL_REPEATED:
417 # Convert a repeated field.
418 js[name] = [self._FieldToJsonObject(field, k)
419 for k in value]
420 else:
421 # This specific yang option applies only to non-repeated
422 # fields
423 if yang_opt: # Create a map
424 js_map = {}
425 js_map['yang_field_option'] = True
426 js_map['yang_field_option_id'] = yang_opt['id']
427 js_map['yang_field_option_type'] = yang_opt['type']
428 js_map['name'] = name
429 js_map[name] = self._FieldToJsonObject(field, value)
430 js[name] = js_map
431 else:
432 js[name] = self._FieldToJsonObject(field, value)
433
434 # Serialize default value if including_default_value_fields is True.
435 message_descriptor = message.DESCRIPTOR
436 for field in message_descriptor.fields:
437 # Singular message fields and oneof fields will not be affected.
438 if ((
439 field.label != descriptor.FieldDescriptor.LABEL_REPEATED and
440 field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_MESSAGE) or
441 field.containing_oneof):
442 continue
443 name = field.name
444 if name in js:
445 # Skip the field which has been serailized already.
446 continue
447 if self._IsMapEntry(field):
448 js[name] = {}
449 elif field.label == descriptor.FieldDescriptor.LABEL_REPEATED:
450 js[name] = []
451 else:
452 js[name] = self._FieldToJsonObject(field,
453 field.default_value)
454
455 except ValueError as e:
456 raise self.SerializeToJsonError(
457 'Failed to serialize {0} field: {1}.'.format(field.name, e))
458
459 return js
460
461 def _FieldToJsonObject(self, field, value):
462 """Converts field value according to Proto3 JSON Specification."""
463 if field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_MESSAGE:
464 return self.convertToDict(value)
465 elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_ENUM:
466 enum_value = field.enum_type.values_by_number.get(value, None)
467 if enum_value is not None:
468 return enum_value.name
469 else:
470 raise self.SerializeToJsonError('Enum field contains an '
471 'integer value '
472 'which can not mapped to an enum value.')
473 elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_STRING:
474 if field.type == descriptor.FieldDescriptor.TYPE_BYTES:
475 # Use base64 Data encoding for bytes
476 return base64.b64encode(value).decode('utf-8')
477 else:
478 return value
479 elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_BOOL:
480 return bool(value)
481 elif field.cpp_type in _INT64_TYPES:
482 return str(value)
483 elif field.cpp_type in _FLOAT_TYPES:
484 if math.isinf(value):
485 if value < 0.0:
486 return _NEG_INFINITY
487 else:
488 return _INFINITY
489 if math.isnan(value):
490 return _NAN
491 return value