#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

17"""
18gRPC client meant to connect to a gRPC server endpoint, and query the
19end-point's schema by calling SchemaService.Schema(Empty) and all of its
20semantics are derived from the recovered schema.
21"""

import os
import sys
from zlib import decompress

import grpc
from grpc._channel import _Rendezvous
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
from werkzeug.exceptions import ServiceUnavailable

from common.utils.asleep import asleep
from netconf.protos import third_party
from netconf.protos.schema_pb2 import SchemaServiceStub
from google.protobuf.empty_pb2 import Empty
from common.utils.consulhelpers import get_endpoint_from_consul
from netconf.protos.voltha_pb2 import VolthaLocalServiceStub, \
    VolthaGlobalServiceStub
from google.protobuf import empty_pb2
from google.protobuf.json_format import MessageToDict, ParseDict
from google.protobuf import descriptor
import base64
import math

_INT64_TYPES = frozenset([descriptor.FieldDescriptor.CPPTYPE_INT64,
                          descriptor.FieldDescriptor.CPPTYPE_UINT64])
_FLOAT_TYPES = frozenset([descriptor.FieldDescriptor.CPPTYPE_FLOAT,
                          descriptor.FieldDescriptor.CPPTYPE_DOUBLE])
_INFINITY = 'Infinity'
_NEG_INFINITY = '-Infinity'
_NAN = 'NaN'

log = get_logger()


class GrpcClient(object):
    """
    Connect to a gRPC server, fetch its schema, and process the downloaded
    proto schema files. The goal is to convert the proto schemas into yang
    schemas which are then exposed to the Netconf client.
    """
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]

    def __init__(self, consul_endpoint, work_dir,
                 grpc_endpoint='localhost:50055',
                 reconnect_callback=None,
                 on_start_callback=None):
        self.consul_endpoint = consul_endpoint
        self.grpc_endpoint = grpc_endpoint
        self.work_dir = work_dir
        self.reconnect_callback = reconnect_callback
        self.on_start_callback = on_start_callback

        self.plugin_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../protoc_plugins'))

        self.yang_schemas = set()

        self.channel = None
        self.local_stub = None
        self.schema = None
        self.retries = 0
        self.shutting_down = False
        self.connected = False

    def start(self):
        log.debug('starting')
        if not self.connected:
            reactor.callLater(0, self.connect)
        log.info('started')
        return self

    def stop(self):
        log.debug('stopping')
        if self.shutting_down:
            return
        self.shutting_down = True
        log.info('stopped')

    def set_on_start_callback(self, on_start_callback):
        self.on_start_callback = on_start_callback
        return self

    def set_reconnect_callback(self, reconnect_callback):
        self.reconnect_callback = reconnect_callback
        return self

    def resolve_endpoint(self, endpoint):
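        # The endpoint is either a literal 'host:port' string or, when
        # prefixed with '@', a consul service name that is resolved through
        # consul, e.g. (illustrative values only):
        #   resolve_endpoint('localhost:50055')  -> ('localhost', 50055)
        #   resolve_endpoint('@voltha-grpc')     -> address looked up in consul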
        ip_port_endpoint = endpoint
        if endpoint.startswith('@'):
            try:
                ip_port_endpoint = get_endpoint_from_consul(
                    self.consul_endpoint, endpoint[1:])
                log.info('endpoint-found',
                         endpoint=endpoint, ip_port=ip_port_endpoint)
            except Exception as e:
                log.error('service-not-found-in-consul', endpoint=endpoint,
                          exception=repr(e))
                return None, None
        if ip_port_endpoint:
            host, port = ip_port_endpoint.split(':', 2)
            return host, int(port)

    @inlineCallbacks
    def connect(self):
        """
        (Re-)Connect to end-point
        """
        if self.shutting_down or self.connected:
            return

        try:
            host, port = self.resolve_endpoint(self.grpc_endpoint)

            # If host and port are not set then we will retry
            if host and port:
                log.info('grpc-endpoint-connecting', host=host, port=port)
                self.channel = grpc.insecure_channel(
                    '{}:{}'.format(host, port))

                yang_from = self._retrieve_schema()
                log.info('proto-to-yang-schema', file=yang_from)
                self._compile_proto_files(yang_from)
                self._set_yang_schemas()

                self._clear_backoff()

                if self.on_start_callback is not None:
                    reactor.callLater(0, self.on_start_callback)

                self.connected = True
                if self.reconnect_callback is not None:
                    reactor.callLater(0, self.reconnect_callback)

                # self.local_stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
                # self.global_stub = voltha_pb2.VolthaGlobalServiceStub(self.channel)

                return

        except _Rendezvous as e:
            if e.code() == grpc.StatusCode.UNAVAILABLE:
                log.info('grpc-endpoint-not-available')
            else:
                log.exception(e)
            yield self._backoff('not-available')

        except Exception as e:
            if not self.shutting_down:
                log.exception('cannot-connect', endpoint=self.grpc_endpoint)
            yield self._backoff('unknown-error')

        reactor.callLater(1, self.connect)

    def _backoff(self, msg):
        wait_time = self.RETRY_BACKOFF[min(self.retries,
                                           len(self.RETRY_BACKOFF) - 1)]
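        # With the default RETRY_BACKOFF above, successive retries wait
        # 0.05s, 0.1s, 0.2s, 0.5s, 1s, 2s, 5s, and then stay at 5s.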
        self.retries += 1
        log.error(msg, retry_in=wait_time)
        return asleep(wait_time)

    def _clear_backoff(self):
        if self.retries:
            log.info('reconnected', after_retries=self.retries)
            self.retries = 0

    def _retrieve_schema(self):
        """
        Retrieve schema from gRPC end-point, and save all *.proto files in
        the work directory.
        """
        assert isinstance(self.channel, grpc.Channel)
        stub = SchemaServiceStub(self.channel)
        # try:
        schemas = stub.GetSchema(Empty())
        # except _Rendezvous, e:
        #     if e.code == grpc.StatusCode.UNAVAILABLE:
        #
        #     else:
        #         raise e
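        # As used below, the returned Schemas message is assumed to carry a
        # repeated 'protos' field (each entry exposing file_name, proto text
        # and a zlib-compressed descriptor) plus 'yang_from', the name of the
        # proto file that drives yang generation.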

        os.system('mkdir -p %s' % self.work_dir)
        os.system('rm -fr /tmp/%s/*' %
                  self.work_dir.replace('/tmp/', ''))  # safer

        for proto_file in schemas.protos:
            proto_fname = proto_file.file_name
            # TODO: Do we need to process a set of files using a prefix
            # instead of just one?
            proto_content = proto_file.proto
            log.info('saving-proto', fname=proto_fname, dir=self.work_dir,
                     length=len(proto_content))
            with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
                f.write(proto_content)

            desc_content = decompress(proto_file.descriptor)
            desc_fname = proto_fname.replace('.proto', '.desc')
            log.info('saving-descriptor', fname=desc_fname, dir=self.work_dir,
                     length=len(desc_content))
            with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
                f.write(desc_content)
        return schemas.yang_from

    def _compile_proto_files(self, yang_from):
        """
        For each *.proto file in the work directory, compile the proto
        file into the respective *_pb2.py file as well as generate the
        corresponding yang schema.
        :return: None
        """
        log.info('start')
        google_api_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../protos/third_party'
        ))

        log.info('google-api', api_dir=google_api_dir)

        netconf_base_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../..'
        ))
        log.info('netconf-dir', dir=netconf_base_dir)

        for fname in [f for f in os.listdir(self.work_dir)
                      if f.endswith('.proto')]:
            log.info('filename', file=fname)

            need_yang = fname == yang_from
            log.debug('compiling',
                      file=fname,
                      yang_schema_required=need_yang)
            cmd = (
                'cd %s && '
                'env PATH=%s PYTHONPATH=%s '
                'python -m grpc.tools.protoc '
                '-I. '
                '-I%s '
                '--python_out=. '
                '--grpc_python_out=. '
                '--plugin=protoc-gen-custom=%s/proto2yang.py '
                '%s'
                '%s' % (
                    self.work_dir,
                    ':'.join([os.environ['PATH'], self.plugin_dir]),
                    ':'.join([google_api_dir, netconf_base_dir]),
                    google_api_dir,
                    self.plugin_dir,
                    '--custom_out=. ' if need_yang else '',
                    fname)
            )
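            # For illustration only (hypothetical paths and file name), the
            # expanded command is roughly:
            #   cd <work_dir> && env PATH=$PATH:<plugin_dir> \
            #     PYTHONPATH=<google_api_dir>:<netconf_base_dir> \
            #     python -m grpc.tools.protoc -I. -I<google_api_dir> \
            #     --python_out=. --grpc_python_out=. \
            #     --plugin=protoc-gen-custom=<plugin_dir>/proto2yang.py \
            #     --custom_out=. <fname>
            # where '--custom_out=. ' is included only when need_yang is True.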
            log.debug('executing', cmd=cmd, file=fname)
            os.system(cmd)
            log.info('compiled', file=fname)

        # # test-load each _pb2 file to see all is right
        # if self.work_dir not in sys.path:
        #     sys.path.insert(0, self.work_dir)
        #
        # for fname in [f for f in os.listdir(self.work_dir)
        #               if f.endswith('_pb2.py')]:
        #     modname = fname[:-len('.py')]
        #     log.debug('test-import', modname=modname)
        #     _ = __import__(modname)

        # TODO: find a different way to test the generated yang files

    def _set_yang_schemas(self):
        if self.work_dir not in sys.path:
            sys.path.insert(0, self.work_dir)

        for fname in [f for f in os.listdir(self.work_dir)
                      if f.endswith('.yang')]:
            # Special case: ietf-http, ietf-yang_options and ietf-descriptor
            # are support modules that are not part of the exposed yang
            # schema, so do not add them to the set
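            # e.g. (hypothetical listing) 'voltha.yang' is added to the set
            # as 'voltha', while 'ietf-http.yang' is skipped.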
            if fname not in ['ietf-http.yang', 'ietf-yang_options.yang',
                             'ietf-descriptor.yang']:
                self.yang_schemas.add(fname[:-len('.yang')])
        log.info('yang-schemas', schemas=self.yang_schemas)

    # TODO: should be generated code
    # Focus for now is issuing a GET request for VolthaGlobalService or
    # VolthaLocalService
    @inlineCallbacks
    def invoke_voltha_api(self, key):
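        # The key is expected in the form '<Service>-<Method>', e.g. a
        # hypothetical 'VolthaGlobalService-GetVoltha'; the service part
        # selects the stub and the method part names the RPC to invoke.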
        # TODO: This should be part of a parameter request
        depth = [('get-depth', '-1')]
        try:
            data = {}
            req = ParseDict(data, empty_pb2.Empty())
            service_method = key.split('-')
            service = service_method[0]
            method = service_method[1]
            if service == 'VolthaGlobalService':
                stub = VolthaGlobalServiceStub
            elif service == 'VolthaLocalService':
                stub = VolthaLocalServiceStub
            else:
                raise Exception('unknown-service: {}'.format(service))

            log.info('voltha-rpc', service=service, method=method, req=req,
                     depth=depth)

            res, metadata = yield self.invoke(stub, method, req, depth)

            # returnValue(MessageToDict(res, True, True))
            returnValue(self.convertToDict(res))

        except Exception as e:
            log.error('failure', exception=repr(e))

    @inlineCallbacks
    def invoke(self, stub, method_name, request, metadata, retry=1):
        """
        Invoke a gRPC call to the remote server and return the response.
        :param stub: Reference to the *_pb2 service stub
        :param method_name: The method name inside the service stub
        :param request: The request protobuf message
        :param metadata: [(str, str), (str, str), ...]
        :return: The response protobuf message and returned trailing metadata
        """

        if not self.connected:
            raise ServiceUnavailable()

        try:
            method = getattr(stub(self.channel), method_name)
            response, rendezvous = method.with_call(request, metadata=metadata)
            returnValue((response, rendezvous.trailing_metadata()))

        except grpc._channel._Rendezvous as e:
            code = e.code()
            if code == grpc.StatusCode.UNAVAILABLE:
                e = ServiceUnavailable()

                if self.connected:
                    self.connected = False
                    yield self.connect()
                    if retry > 0:
                        response = yield self.invoke(stub, method_name,
                                                     request, metadata,
                                                     retry=retry - 1)
                        returnValue(response)

            elif code in (
                    grpc.StatusCode.NOT_FOUND,
                    grpc.StatusCode.INVALID_ARGUMENT,
                    grpc.StatusCode.ALREADY_EXISTS):

                pass  # don't log error, these occur naturally

            else:
                log.exception(e)

            raise e

    # Below is an adaptation of Google's MessageToDict() which includes
    # protobuf options extensions

    class Error(Exception):
        """Top-level module error for json_format."""

    class SerializeToJsonError(Error):
        """Thrown if serialization to JSON fails."""

    def _IsMapEntry(self, field):
        return (field.type == descriptor.FieldDescriptor.TYPE_MESSAGE and
                field.message_type.has_options and
                field.message_type.GetOptions().map_entry)

    def convertToDict(self, message):
        """Converts message to an object according to Proto3 JSON Specification."""

        js = {}
        return self._RegularMessageToJsonObject(message, js)

    def get_yang_option(self, field):
        opt = field.GetOptions()
        yang_opt = {}
        for fd, val in opt.ListFields():
            if fd.full_name == 'voltha.yang_inline_node':
                yang_opt['id'] = val.id
                yang_opt['type'] = val.type
        # For now, a max of 1 yang option is set per field
        return yang_opt

    def _RegularMessageToJsonObject(self, message, js):
        """Converts normal message according to Proto3 JSON Specification."""
        fields = message.ListFields()

        try:
            for field, value in fields:
                # Check for options
                yang_opt = self.get_yang_option(field)

                name = field.name
                if self._IsMapEntry(field):
                    # Convert a map field.
                    v_field = field.message_type.fields_by_name['value']
                    js_map = {}
                    for key in value:
                        if isinstance(key, bool):
                            if key:
                                recorded_key = 'true'
                            else:
                                recorded_key = 'false'
                        else:
                            recorded_key = key
                        js_map[recorded_key] = self._FieldToJsonObject(
                            v_field, value[key])
                    js[name] = js_map
                elif field.label == descriptor.FieldDescriptor.LABEL_REPEATED:
                    # Convert a repeated field.
                    js[name] = [self._FieldToJsonObject(field, k)
                                for k in value]
                else:
                    # This specific yang option applies only to non-repeated
                    # fields
                    if yang_opt:  # Create a map
                        js_map = {}
                        js_map['yang_field_option'] = True
                        js_map['yang_field_option_id'] = yang_opt['id']
                        js_map['yang_field_option_type'] = yang_opt['type']
                        js_map['name'] = name
                        js_map[name] = self._FieldToJsonObject(field, value)
                        js[name] = js_map
                    else:
                        js[name] = self._FieldToJsonObject(field, value)

            # Always serialize default values (equivalent to
            # including_default_value_fields=True in MessageToDict).
            message_descriptor = message.DESCRIPTOR
            for field in message_descriptor.fields:
                # Singular message fields and oneof fields will not be affected.
                if ((field.label != descriptor.FieldDescriptor.LABEL_REPEATED and
                     field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_MESSAGE) or
                        field.containing_oneof):
                    continue
                name = field.name
                if name in js:
                    # Skip the field which has been serialized already.
                    continue
                if self._IsMapEntry(field):
                    js[name] = {}
                elif field.label == descriptor.FieldDescriptor.LABEL_REPEATED:
                    js[name] = []
                else:
                    js[name] = self._FieldToJsonObject(field,
                                                       field.default_value)

        except ValueError as e:
            raise self.SerializeToJsonError(
                'Failed to serialize {0} field: {1}.'.format(field.name, e))

        return js

    def _FieldToJsonObject(self, field, value):
        """Converts field value according to Proto3 JSON Specification."""
        if field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_MESSAGE:
            return self.convertToDict(value)
        elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_ENUM:
            enum_value = field.enum_type.values_by_number.get(value, None)
            if enum_value is not None:
                return enum_value.name
            else:
                raise self.SerializeToJsonError('Enum field contains an '
                                                'integer value '
                                                'which cannot be mapped to '
                                                'an enum value.')
        elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_STRING:
            if field.type == descriptor.FieldDescriptor.TYPE_BYTES:
                # Use base64 Data encoding for bytes
                return base64.b64encode(value).decode('utf-8')
            else:
                return value
        elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_BOOL:
            return bool(value)
        elif field.cpp_type in _INT64_TYPES:
            return str(value)
        elif field.cpp_type in _FLOAT_TYPES:
            if math.isinf(value):
                if value < 0.0:
                    return _NEG_INFINITY
                else:
                    return _INFINITY
            if math.isnan(value):
                return _NAN
            return value