blob: bce6de3a351a878a247fff13f7c66215b5f0cb4d [file] [log] [blame]
#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
gRPC client meant to connect to a gRPC server endpoint, and query the
end-point's schema by calling SchemaService.Schema(Empty) and all of its
semantics are derived from the recovered schema.
"""
import os
import sys
from zlib import decompress
import grpc
from grpc._channel import _Rendezvous
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
from werkzeug.exceptions import ServiceUnavailable
from common.utils.asleep import asleep
from netconf.protos import third_party
from netconf.protos.schema_pb2 import SchemaServiceStub
from google.protobuf.empty_pb2 import Empty
from common.utils.consulhelpers import get_endpoint_from_consul
from netconf.protos.voltha_pb2 import VolthaLocalServiceStub, \
VolthaGlobalServiceStub
from google.protobuf import empty_pb2
from google.protobuf.json_format import MessageToDict, ParseDict
from google.protobuf import descriptor
import base64
import math
_INT64_TYPES = frozenset([descriptor.FieldDescriptor.CPPTYPE_INT64,
descriptor.FieldDescriptor.CPPTYPE_UINT64])
_FLOAT_TYPES = frozenset([descriptor.FieldDescriptor.CPPTYPE_FLOAT,
descriptor.FieldDescriptor.CPPTYPE_DOUBLE])
_INFINITY = 'Infinity'
_NEG_INFINITY = '-Infinity'
_NAN = 'NaN'
log = get_logger()
class GrpcClient(object):
"""
Connect to a gRPC server, fetch its schema, and process the downloaded
proto schema files. The goal is to convert the proto schemas into yang
schemas which would be exposed to the Netconf client.
"""
RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
def __init__(self, consul_endpoint, work_dir,
grpc_endpoint='localhost:50055',
reconnect_callback=None,
on_start_callback=None):
self.consul_endpoint = consul_endpoint
self.grpc_endpoint = grpc_endpoint
self.work_dir = work_dir
self.reconnect_callback = reconnect_callback
self.on_start_callback = on_start_callback
self.plugin_dir = os.path.abspath(os.path.join(
os.path.dirname(__file__), '../protoc_plugins'))
self.channel = None
self.local_stub = None
self.schema = None
self.retries = 0
self.shutting_down = False
self.connected = False
def start(self):
log.debug('starting')
if not self.connected:
reactor.callLater(0, self.connect)
log.info('started')
return self
def stop(self):
log.debug('stopping')
if self.shutting_down:
return
self.shutting_down = True
log.info('stopped')
def set_on_start_callback(self, on_start_callback):
self.on_start_callback = on_start_callback
return self
def set_reconnect_callback(self, reconnect_callback):
self.reconnect_callback = reconnect_callback
return self
def resolve_endpoint(self, endpoint):
ip_port_endpoint = endpoint
if endpoint.startswith('@'):
try:
ip_port_endpoint = get_endpoint_from_consul(
self.consul_endpoint, endpoint[1:])
log.info('endpoint-found',
endpoint=endpoint, ip_port=ip_port_endpoint)
except Exception as e:
log.error('service-not-found-in-consul', endpoint=endpoint,
exception=repr(e))
return None, None
if ip_port_endpoint:
host, port = ip_port_endpoint.split(':', 2)
return host, int(port)
@inlineCallbacks
def connect(self):
"""
(Re-)Connect to end-point
"""
if self.shutting_down or self.connected:
return
try:
host, port = self.resolve_endpoint(self.grpc_endpoint)
# If host and port is not set then we will retry
if host and port:
log.info('grpc-endpoint-connecting', host=host, port=port)
self.channel = grpc.insecure_channel(
'{}:{}'.format(host, port))
yang_from = self._retrieve_schema()
log.info('proto-to-yang-schema', file=yang_from)
self._compile_proto_files(yang_from)
self._clear_backoff()
if self.on_start_callback is not None:
reactor.callLater(0, self.on_start_callback)
self.connected = True
if self.reconnect_callback is not None:
reactor.callLater(0, self.reconnect_callback)
# self.local_stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
# self.global_stub = voltha_pb2.VolthaGlobalServiceStub(self.channel)
return
except _Rendezvous, e:
if e.code() == grpc.StatusCode.UNAVAILABLE:
log.info('grpc-endpoint-not-available')
else:
log.exception(e)
yield self._backoff('not-available')
except Exception, e:
if not self.shutting_down:
log.exception('cannot-connect', endpoint=_endpoint)
yield self._backoff('unknown-error')
reactor.callLater(1, self.connect)
def _backoff(self, msg):
wait_time = self.RETRY_BACKOFF[min(self.retries,
len(self.RETRY_BACKOFF) - 1)]
self.retries += 1
log.error(msg, retry_in=wait_time)
return asleep(wait_time)
def _clear_backoff(self):
if self.retries:
log.info('reconnected', after_retries=self.retries)
self.retries = 0
def _retrieve_schema(self):
"""
Retrieve schema from gRPC end-point, and save all *.proto files in
the work directory.
"""
assert isinstance(self.channel, grpc.Channel)
stub = SchemaServiceStub(self.channel)
# try:
schemas = stub.GetSchema(Empty())
# except _Rendezvous, e:
# if e.code == grpc.StatusCode.UNAVAILABLE:
#
# else:
# raise e
os.system('mkdir -p %s' % self.work_dir)
os.system('rm -fr /tmp/%s/*' %
self.work_dir.replace('/tmp/', '')) # safer
for proto_file in schemas.protos:
proto_fname = proto_file.file_name
# TODO: Do we need to process a set of files using a prefix
# instead of just one?
proto_content = proto_file.proto
log.info('saving-proto', fname=proto_fname, dir=self.work_dir,
length=len(proto_content))
with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
f.write(proto_content)
desc_content = decompress(proto_file.descriptor)
desc_fname = proto_fname.replace('.proto', '.desc')
log.info('saving-descriptor', fname=desc_fname, dir=self.work_dir,
length=len(desc_content))
with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
f.write(desc_content)
return schemas.yang_from
def _compile_proto_files(self, yang_from):
"""
For each *.proto file in the work directory, compile the proto
file into the respective *_pb2.py file as well as generate the
corresponding yang schema.
:return: None
"""
log.info('start')
google_api_dir = os.path.abspath(os.path.join(
os.path.dirname(__file__), '../protos/third_party'
))
log.info('google-api', api_dir=google_api_dir)
netconf_base_dir = os.path.abspath(os.path.join(
os.path.dirname(__file__), '../..'
))
log.info('netconf-dir', dir=netconf_base_dir)
for fname in [f for f in os.listdir(self.work_dir)
if f.endswith('.proto')]:
log.info('filename', file=fname)
need_yang = fname == yang_from
log.debug('compiling',
file=fname,
yang_schema_required=need_yang)
cmd = (
'cd %s && '
'env PATH=%s PYTHONPATH=%s '
'python -m grpc.tools.protoc '
'-I. '
'-I%s '
'--python_out=. '
'--grpc_python_out=. '
'--plugin=protoc-gen-custom=%s/proto2yang.py '
'%s'
'%s' % (
self.work_dir,
':'.join([os.environ['PATH'], self.plugin_dir]),
':'.join([google_api_dir, netconf_base_dir]),
google_api_dir,
self.plugin_dir,
'--custom_out=. ' if need_yang else '',
fname)
)
log.debug('executing', cmd=cmd, file=fname)
os.system(cmd)
log.info('compiled', file=fname)
# # test-load each _pb2 file to see all is right
# if self.work_dir not in sys.path:
# sys.path.insert(0, self.work_dir)
#
# for fname in [f for f in os.listdir(self.work_dir)
# if f.endswith('_pb2.py')]:
# modname = fname[:-len('.py')]
# log.debug('test-import', modname=modname)
# _ = __import__(modname)
# TODO: find a different way to test the generated yang files
# TODO: should be generated code
# Focus for now is issuing a GET request for VolthaGlobalService or VolthaLocalService
@inlineCallbacks
def invoke_voltha_api(self, key):
# TODO: This should be part of a parameter request
depth = [('get-depth', '-1')]
try:
data = {}
req = ParseDict(data, empty_pb2.Empty())
service_method = key.split('-')
service = service_method[0]
method = service_method[1]
if service == 'VolthaGlobalService':
stub = VolthaGlobalServiceStub
elif service == 'VolthaLocalService':
stub = VolthaLocalServiceStub
else:
raise # Exception
log.info('voltha-rpc', service=service, method=method, req=req,
depth=depth)
res, metadata = yield self.invoke(stub, method, req, depth)
# returnValue(MessageToDict(res, True, True))
returnValue(self.convertToDict(res))
except Exception, e:
log.error('failure', exception=repr(e))
@inlineCallbacks
def invoke(self, stub, method_name, request, metadata, retry=1):
"""
Invoke a gRPC call to the remote server and return the response.
:param stub: Reference to the *_pb2 service stub
:param method_name: The method name inside the service stub
:param request: The request protobuf message
:param metadata: [(str, str), (str, str), ...]
:return: The response protobuf message and returned trailing metadata
"""
if not self.connected:
raise ServiceUnavailable()
try:
method = getattr(stub(self.channel), method_name)
response, rendezvous = method.with_call(request, metadata=metadata)
returnValue((response, rendezvous.trailing_metadata()))
except grpc._channel._Rendezvous, e:
code = e.code()
if code == grpc.StatusCode.UNAVAILABLE:
e = ServiceUnavailable()
if self.connected:
self.connected = False
yield self.connect()
if retry > 0:
response = yield self.invoke(stub, method_name,
request, metadata,
retry=retry - 1)
returnValue(response)
elif code in (
grpc.StatusCode.NOT_FOUND,
grpc.StatusCode.INVALID_ARGUMENT,
grpc.StatusCode.ALREADY_EXISTS):
pass # don't log error, these occur naturally
else:
log.exception(e)
raise e
# Below is an adaptation of Google's MessageToDict() which includes
# protobuf options extensions
class Error(Exception):
"""Top-level module error for json_format."""
class SerializeToJsonError(Error):
"""Thrown if serialization to JSON fails."""
def _IsMapEntry(self, field):
return (field.type == descriptor.FieldDescriptor.TYPE_MESSAGE and
field.message_type.has_options and
field.message_type.GetOptions().map_entry)
def convertToDict(self, message):
"""Converts message to an object according to Proto3 JSON Specification."""
js = {}
return self._RegularMessageToJsonObject(message, js)
def get_yang_option(self, field):
opt = field.GetOptions()
yang_opt = {}
for fd, val in opt.ListFields():
if fd.full_name == 'voltha.yang_inline_node':
yang_opt['id'] = val.id
yang_opt['type'] = val.type
# Fow now, a max of 1 yang option is set per field
return yang_opt
def _RegularMessageToJsonObject(self, message, js):
"""Converts normal message according to Proto3 JSON Specification."""
fields = message.ListFields()
try:
for field, value in fields:
# Check for options
yang_opt = self.get_yang_option(field)
name = field.name
if self._IsMapEntry(field):
# Convert a map field.
v_field = field.message_type.fields_by_name['value']
js_map = {}
for key in value:
if isinstance(key, bool):
if key:
recorded_key = 'true'
else:
recorded_key = 'false'
else:
recorded_key = key
js_map[recorded_key] = self._FieldToJsonObject(
v_field, value[key])
js[name] = js_map
elif field.label == descriptor.FieldDescriptor.LABEL_REPEATED:
# Convert a repeated field.
js[name] = [self._FieldToJsonObject(field, k)
for k in value]
else:
# This specific yang option applies only to non-repeated
# fields
if yang_opt: # Create a map
js_map = {}
js_map['yang_field_option'] = True
js_map['yang_field_option_id'] = yang_opt['id']
js_map['yang_field_option_type'] = yang_opt['type']
js_map['name'] = name
js_map[name] = self._FieldToJsonObject(field, value)
js[name] = js_map
else:
js[name] = self._FieldToJsonObject(field, value)
# Serialize default value if including_default_value_fields is True.
message_descriptor = message.DESCRIPTOR
for field in message_descriptor.fields:
# Singular message fields and oneof fields will not be affected.
if ((
field.label != descriptor.FieldDescriptor.LABEL_REPEATED and
field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_MESSAGE) or
field.containing_oneof):
continue
name = field.name
if name in js:
# Skip the field which has been serailized already.
continue
if self._IsMapEntry(field):
js[name] = {}
elif field.label == descriptor.FieldDescriptor.LABEL_REPEATED:
js[name] = []
else:
js[name] = self._FieldToJsonObject(field,
field.default_value)
except ValueError as e:
raise self.SerializeToJsonError(
'Failed to serialize {0} field: {1}.'.format(field.name, e))
return js
def _FieldToJsonObject(self, field, value):
"""Converts field value according to Proto3 JSON Specification."""
if field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_MESSAGE:
return self.convertToDict(value)
elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_ENUM:
enum_value = field.enum_type.values_by_number.get(value, None)
if enum_value is not None:
return enum_value.name
else:
raise self.SerializeToJsonError('Enum field contains an '
'integer value '
'which can not mapped to an enum value.')
elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_STRING:
if field.type == descriptor.FieldDescriptor.TYPE_BYTES:
# Use base64 Data encoding for bytes
return base64.b64encode(value).decode('utf-8')
else:
return value
elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_BOOL:
return bool(value)
elif field.cpp_type in _INT64_TYPES:
return str(value)
elif field.cpp_type in _FLOAT_TYPES:
if math.isinf(value):
if value < 0.0:
return _NEG_INFINITY
else:
return _INFINITY
if math.isnan(value):
return _NAN
return value