blob: bce6de3a351a878a247fff13f7c66215b5f0cb4d [file] [log] [blame]
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -05001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18gRPC client meant to connect to a gRPC server endpoint, and query the
19end-point's schema by calling SchemaService.Schema(Empty) and all of its
20semantics are derived from the recovered schema.
21"""
22
23import os
24import sys
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -050025from zlib import decompress
26
27import grpc
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -050028from grpc._channel import _Rendezvous
29from structlog import get_logger
30from twisted.internet import reactor
31from twisted.internet.defer import inlineCallbacks, returnValue
32from werkzeug.exceptions import ServiceUnavailable
33
34from common.utils.asleep import asleep
35from netconf.protos import third_party
36from netconf.protos.schema_pb2 import SchemaServiceStub
37from google.protobuf.empty_pb2 import Empty
38from common.utils.consulhelpers import get_endpoint_from_consul
Khen Nursimulu7626ce12016-12-21 11:51:46 -050039from netconf.protos.voltha_pb2 import VolthaLocalServiceStub, \
Khen Nursimulue0d53f82016-12-14 11:05:44 -080040 VolthaGlobalServiceStub
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -050041from google.protobuf import empty_pb2
42from google.protobuf.json_format import MessageToDict, ParseDict
Khen Nursimulua4972742016-12-23 17:15:20 -050043from google.protobuf import descriptor
44import base64
45import math
46
47_INT64_TYPES = frozenset([descriptor.FieldDescriptor.CPPTYPE_INT64,
48 descriptor.FieldDescriptor.CPPTYPE_UINT64])
49_FLOAT_TYPES = frozenset([descriptor.FieldDescriptor.CPPTYPE_FLOAT,
50 descriptor.FieldDescriptor.CPPTYPE_DOUBLE])
51_INFINITY = 'Infinity'
52_NEG_INFINITY = '-Infinity'
53_NAN = 'NaN'
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -050054
55log = get_logger()
56
57
58class GrpcClient(object):
59 """
60 Connect to a gRPC server, fetch its schema, and process the downloaded
61 proto schema files. The goal is to convert the proto schemas into yang
62 schemas which would be exposed to the Netconf client.
63 """
64 RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
65
66 def __init__(self, consul_endpoint, work_dir,
67 grpc_endpoint='localhost:50055',
68 reconnect_callback=None,
69 on_start_callback=None):
70 self.consul_endpoint = consul_endpoint
71 self.grpc_endpoint = grpc_endpoint
72 self.work_dir = work_dir
73 self.reconnect_callback = reconnect_callback
74 self.on_start_callback = on_start_callback
75
76 self.plugin_dir = os.path.abspath(os.path.join(
77 os.path.dirname(__file__), '../protoc_plugins'))
78
79 self.channel = None
80 self.local_stub = None
81 self.schema = None
82 self.retries = 0
83 self.shutting_down = False
84 self.connected = False
85
86 def start(self):
87 log.debug('starting')
88 if not self.connected:
89 reactor.callLater(0, self.connect)
90 log.info('started')
91 return self
92
93 def stop(self):
94 log.debug('stopping')
95 if self.shutting_down:
96 return
97 self.shutting_down = True
98 log.info('stopped')
99
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500100 def set_on_start_callback(self, on_start_callback):
101 self.on_start_callback = on_start_callback
102 return self
103
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500104 def set_reconnect_callback(self, reconnect_callback):
105 self.reconnect_callback = reconnect_callback
106 return self
107
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500108 def resolve_endpoint(self, endpoint):
109 ip_port_endpoint = endpoint
110 if endpoint.startswith('@'):
111 try:
112 ip_port_endpoint = get_endpoint_from_consul(
113 self.consul_endpoint, endpoint[1:])
114 log.info('endpoint-found',
115 endpoint=endpoint, ip_port=ip_port_endpoint)
116 except Exception as e:
117 log.error('service-not-found-in-consul', endpoint=endpoint,
118 exception=repr(e))
119 return None, None
120 if ip_port_endpoint:
121 host, port = ip_port_endpoint.split(':', 2)
122 return host, int(port)
123
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500124 @inlineCallbacks
125 def connect(self):
126 """
127 (Re-)Connect to end-point
128 """
129 if self.shutting_down or self.connected:
130 return
131
132 try:
133 host, port = self.resolve_endpoint(self.grpc_endpoint)
134
135 # If host and port is not set then we will retry
136 if host and port:
137 log.info('grpc-endpoint-connecting', host=host, port=port)
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500138 self.channel = grpc.insecure_channel(
139 '{}:{}'.format(host, port))
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500140
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500141 yang_from = self._retrieve_schema()
142 log.info('proto-to-yang-schema', file=yang_from)
143 self._compile_proto_files(yang_from)
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500144 self._clear_backoff()
145
146 if self.on_start_callback is not None:
147 reactor.callLater(0, self.on_start_callback)
148
149 self.connected = True
150 if self.reconnect_callback is not None:
151 reactor.callLater(0, self.reconnect_callback)
152
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500153 # self.local_stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
154 # self.global_stub = voltha_pb2.VolthaGlobalServiceStub(self.channel)
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500155
156 return
157
158 except _Rendezvous, e:
159 if e.code() == grpc.StatusCode.UNAVAILABLE:
160 log.info('grpc-endpoint-not-available')
161 else:
162 log.exception(e)
163 yield self._backoff('not-available')
164
165 except Exception, e:
166 if not self.shutting_down:
167 log.exception('cannot-connect', endpoint=_endpoint)
168 yield self._backoff('unknown-error')
169
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500170 reactor.callLater(1, self.connect)
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500171
172 def _backoff(self, msg):
173 wait_time = self.RETRY_BACKOFF[min(self.retries,
174 len(self.RETRY_BACKOFF) - 1)]
175 self.retries += 1
176 log.error(msg, retry_in=wait_time)
177 return asleep(wait_time)
178
179 def _clear_backoff(self):
180 if self.retries:
181 log.info('reconnected', after_retries=self.retries)
182 self.retries = 0
183
184 def _retrieve_schema(self):
185 """
186 Retrieve schema from gRPC end-point, and save all *.proto files in
187 the work directory.
188 """
189 assert isinstance(self.channel, grpc.Channel)
190 stub = SchemaServiceStub(self.channel)
191 # try:
192 schemas = stub.GetSchema(Empty())
193 # except _Rendezvous, e:
194 # if e.code == grpc.StatusCode.UNAVAILABLE:
195 #
196 # else:
197 # raise e
198
199 os.system('mkdir -p %s' % self.work_dir)
200 os.system('rm -fr /tmp/%s/*' %
201 self.work_dir.replace('/tmp/', '')) # safer
202
203 for proto_file in schemas.protos:
204 proto_fname = proto_file.file_name
205 # TODO: Do we need to process a set of files using a prefix
206 # instead of just one?
207 proto_content = proto_file.proto
208 log.info('saving-proto', fname=proto_fname, dir=self.work_dir,
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500209 length=len(proto_content))
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500210 with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
211 f.write(proto_content)
212
213 desc_content = decompress(proto_file.descriptor)
214 desc_fname = proto_fname.replace('.proto', '.desc')
215 log.info('saving-descriptor', fname=desc_fname, dir=self.work_dir,
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500216 length=len(desc_content))
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500217 with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
218 f.write(desc_content)
219 return schemas.yang_from
220
221 def _compile_proto_files(self, yang_from):
222 """
223 For each *.proto file in the work directory, compile the proto
224 file into the respective *_pb2.py file as well as generate the
225 corresponding yang schema.
226 :return: None
227 """
228 log.info('start')
229 google_api_dir = os.path.abspath(os.path.join(
230 os.path.dirname(__file__), '../protos/third_party'
231 ))
232
233 log.info('google-api', api_dir=google_api_dir)
234
235 netconf_base_dir = os.path.abspath(os.path.join(
236 os.path.dirname(__file__), '../..'
237 ))
238 log.info('netconf-dir', dir=netconf_base_dir)
239
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500240 for fname in [f for f in os.listdir(self.work_dir)
241 if f.endswith('.proto')]:
242 log.info('filename', file=fname)
243
244 need_yang = fname == yang_from
245 log.debug('compiling',
246 file=fname,
247 yang_schema_required=need_yang)
248 cmd = (
249 'cd %s && '
250 'env PATH=%s PYTHONPATH=%s '
251 'python -m grpc.tools.protoc '
252 '-I. '
253 '-I%s '
254 '--python_out=. '
255 '--grpc_python_out=. '
256 '--plugin=protoc-gen-custom=%s/proto2yang.py '
257 '%s'
258 '%s' % (
259 self.work_dir,
260 ':'.join([os.environ['PATH'], self.plugin_dir]),
261 ':'.join([google_api_dir, netconf_base_dir]),
262 google_api_dir,
263 self.plugin_dir,
264 '--custom_out=. ' if need_yang else '',
265 fname)
266 )
267 log.debug('executing', cmd=cmd, file=fname)
268 os.system(cmd)
269 log.info('compiled', file=fname)
270
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500271 # # test-load each _pb2 file to see all is right
272 # if self.work_dir not in sys.path:
273 # sys.path.insert(0, self.work_dir)
274 #
275 # for fname in [f for f in os.listdir(self.work_dir)
276 # if f.endswith('_pb2.py')]:
277 # modname = fname[:-len('.py')]
278 # log.debug('test-import', modname=modname)
279 # _ = __import__(modname)
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500280
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500281 # TODO: find a different way to test the generated yang files
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500282
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500283 # TODO: should be generated code
284 # Focus for now is issuing a GET request for VolthaGlobalService or VolthaLocalService
Khen Nursimulue0d53f82016-12-14 11:05:44 -0800285 @inlineCallbacks
286 def invoke_voltha_api(self, key):
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500287 # TODO: This should be part of a parameter request
288 depth = [('get-depth', '-1')]
Khen Nursimulue0d53f82016-12-14 11:05:44 -0800289 try:
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500290 data = {}
291 req = ParseDict(data, empty_pb2.Empty())
292 service_method = key.split('-')
293 service = service_method[0]
294 method = service_method[1]
Khen Nursimulua4972742016-12-23 17:15:20 -0500295 if service == 'VolthaGlobalService':
296 stub = VolthaGlobalServiceStub
297 elif service == 'VolthaLocalService':
298 stub = VolthaLocalServiceStub
299 else:
300 raise # Exception
301
302 log.info('voltha-rpc', service=service, method=method, req=req,
303 depth=depth)
Khen Nursimulue0d53f82016-12-14 11:05:44 -0800304
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500305 res, metadata = yield self.invoke(stub, method, req, depth)
306
Khen Nursimulua4972742016-12-23 17:15:20 -0500307 # returnValue(MessageToDict(res, True, True))
308 returnValue(self.convertToDict(res))
309
Khen Nursimulue0d53f82016-12-14 11:05:44 -0800310 except Exception, e:
311 log.error('failure', exception=repr(e))
312
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500313 @inlineCallbacks
314 def invoke(self, stub, method_name, request, metadata, retry=1):
315 """
316 Invoke a gRPC call to the remote server and return the response.
317 :param stub: Reference to the *_pb2 service stub
318 :param method_name: The method name inside the service stub
319 :param request: The request protobuf message
320 :param metadata: [(str, str), (str, str), ...]
321 :return: The response protobuf message and returned trailing metadata
322 """
Khen Nursimulue0d53f82016-12-14 11:05:44 -0800323
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500324 if not self.connected:
325 raise ServiceUnavailable()
326
327 try:
328 method = getattr(stub(self.channel), method_name)
329 response, rendezvous = method.with_call(request, metadata=metadata)
330 returnValue((response, rendezvous.trailing_metadata()))
331
332 except grpc._channel._Rendezvous, e:
333 code = e.code()
334 if code == grpc.StatusCode.UNAVAILABLE:
335 e = ServiceUnavailable()
336
337 if self.connected:
338 self.connected = False
339 yield self.connect()
340 if retry > 0:
341 response = yield self.invoke(stub, method_name,
342 request, metadata,
343 retry=retry - 1)
344 returnValue(response)
345
346 elif code in (
347 grpc.StatusCode.NOT_FOUND,
348 grpc.StatusCode.INVALID_ARGUMENT,
349 grpc.StatusCode.ALREADY_EXISTS):
350
351 pass # don't log error, these occur naturally
352
353 else:
354 log.exception(e)
355
356 raise e
Khen Nursimulua4972742016-12-23 17:15:20 -0500357
358 # Below is an adaptation of Google's MessageToDict() which includes
359 # protobuf options extensions
360
361 class Error(Exception):
362 """Top-level module error for json_format."""
363
364 class SerializeToJsonError(Error):
365 """Thrown if serialization to JSON fails."""
366
367 def _IsMapEntry(self, field):
368 return (field.type == descriptor.FieldDescriptor.TYPE_MESSAGE and
369 field.message_type.has_options and
370 field.message_type.GetOptions().map_entry)
371
372 def convertToDict(self, message):
373 """Converts message to an object according to Proto3 JSON Specification."""
374
375 js = {}
376 return self._RegularMessageToJsonObject(message, js)
377
378 def get_yang_option(self, field):
379 opt = field.GetOptions()
380 yang_opt = {}
381 for fd, val in opt.ListFields():
382 if fd.full_name == 'voltha.yang_inline_node':
383 yang_opt['id'] = val.id
384 yang_opt['type'] = val.type
385 # Fow now, a max of 1 yang option is set per field
386 return yang_opt
387
388 def _RegularMessageToJsonObject(self, message, js):
389 """Converts normal message according to Proto3 JSON Specification."""
390 fields = message.ListFields()
391
392 try:
393 for field, value in fields:
394 # Check for options
395 yang_opt = self.get_yang_option(field)
396
397 name = field.name
398 if self._IsMapEntry(field):
399 # Convert a map field.
400 v_field = field.message_type.fields_by_name['value']
401 js_map = {}
402 for key in value:
403 if isinstance(key, bool):
404 if key:
405 recorded_key = 'true'
406 else:
407 recorded_key = 'false'
408 else:
409 recorded_key = key
410 js_map[recorded_key] = self._FieldToJsonObject(
411 v_field, value[key])
412 js[name] = js_map
413 elif field.label == descriptor.FieldDescriptor.LABEL_REPEATED:
414 # Convert a repeated field.
415 js[name] = [self._FieldToJsonObject(field, k)
416 for k in value]
417 else:
418 # This specific yang option applies only to non-repeated
419 # fields
420 if yang_opt: # Create a map
421 js_map = {}
422 js_map['yang_field_option'] = True
423 js_map['yang_field_option_id'] = yang_opt['id']
424 js_map['yang_field_option_type'] = yang_opt['type']
425 js_map['name'] = name
426 js_map[name] = self._FieldToJsonObject(field, value)
427 js[name] = js_map
428 else:
429 js[name] = self._FieldToJsonObject(field, value)
430
431 # Serialize default value if including_default_value_fields is True.
432 message_descriptor = message.DESCRIPTOR
433 for field in message_descriptor.fields:
434 # Singular message fields and oneof fields will not be affected.
435 if ((
436 field.label != descriptor.FieldDescriptor.LABEL_REPEATED and
437 field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_MESSAGE) or
438 field.containing_oneof):
439 continue
440 name = field.name
441 if name in js:
442 # Skip the field which has been serailized already.
443 continue
444 if self._IsMapEntry(field):
445 js[name] = {}
446 elif field.label == descriptor.FieldDescriptor.LABEL_REPEATED:
447 js[name] = []
448 else:
449 js[name] = self._FieldToJsonObject(field,
450 field.default_value)
451
452 except ValueError as e:
453 raise self.SerializeToJsonError(
454 'Failed to serialize {0} field: {1}.'.format(field.name, e))
455
456 return js
457
458 def _FieldToJsonObject(self, field, value):
459 """Converts field value according to Proto3 JSON Specification."""
460 if field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_MESSAGE:
461 return self.convertToDict(value)
462 elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_ENUM:
463 enum_value = field.enum_type.values_by_number.get(value, None)
464 if enum_value is not None:
465 return enum_value.name
466 else:
467 raise self.SerializeToJsonError('Enum field contains an '
468 'integer value '
469 'which can not mapped to an enum value.')
470 elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_STRING:
471 if field.type == descriptor.FieldDescriptor.TYPE_BYTES:
472 # Use base64 Data encoding for bytes
473 return base64.b64encode(value).decode('utf-8')
474 else:
475 return value
476 elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_BOOL:
477 return bool(value)
478 elif field.cpp_type in _INT64_TYPES:
479 return str(value)
480 elif field.cpp_type in _FLOAT_TYPES:
481 if math.isinf(value):
482 if value < 0.0:
483 return _NEG_INFINITY
484 else:
485 return _INFINITY
486 if math.isnan(value):
487 return _NAN
488 return value