Chameleon swagger support
Change-Id: I63b8dc7b31d5e87aa0e5153da302537d90ff733e
diff --git a/protoc_plugins/swagger_template.py b/protoc_plugins/swagger_template.py
new file mode 100644
index 0000000..d93c8da
--- /dev/null
+++ b/protoc_plugins/swagger_template.py
@@ -0,0 +1,468 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import re
+from collections import OrderedDict
+from copy import copy
+
+from google.protobuf.descriptor import FieldDescriptor
+
# matches each '/{param}' segment of a path template, capturing the text
# inside the braces (which may still contain an '=<pattern>' suffix)
re_path_param = re.compile(r'/{([^{]+)}')
# splits a path template into literal ('absolute') segments and symbolic
# '{...}' placeholder segments
re_segment = re.compile(r'/(?P<absolute>[^{}/]+)|(?P<symbolic>{[^}]+})')
+
+
class DuplicateMethodAndPathError(Exception):
    """Raised when two methods map to the same (verb, path) pair."""
class ProtobufCompilationFailedError(Exception):
    """Raised when protobuf source could not be compiled."""
class InvalidPathArgumentError(Exception):
    """Raised when a path template refers to a non-existent input field."""
+
+
def native_descriptors_to_swagger(native_descriptors):
    """
    Generate a swagger data dict from the native descriptors extracted
    from protobuf file(s).
    :param native_descriptors:
        Dict as extracted from proto file descriptors.
        See DescriptorParser and its parse_file_descriptors() method.
    :return: dict ready to be serialized to JSON as swagger.json file.
    """

    # gather all top-level and nested message type definitions and build map
    message_types_dict = gather_all_message_types(native_descriptors)
    # set(dict) iterates the keys; dict.iterkeys() was Python 2 only
    message_type_names = set(message_types_dict)

    # create similar map for all top-level and nested enum definitions
    enum_types_dict = gather_all_enum_types(native_descriptors)
    enum_type_names = set(enum_types_dict)

    # make sure message and enum type names do not clash (sanity check)
    assert not message_type_names.intersection(enum_type_names)
    all_types = {}
    all_types.update(message_types_dict)
    all_types.update(enum_types_dict)

    # gather all method definitions and collect all referenced input/output
    # types
    types_referenced, methods_dict = gather_all_methods(native_descriptors)

    # process all directly and indirectly referenced types into JSON schema
    # type definitions
    definitions = generate_definitions(types_referenced, all_types)

    # process all methods and generate the swagger path entries
    paths = generate_paths(methods_dict, definitions)

    # static part
    # last descriptor is assumed to be the top-most one
    root_descriptor = native_descriptors[-1]
    swagger = {
        'swagger': "2.0",
        'info': {
            'title': root_descriptor['name'],
            'version': "version not set"
        },
        'schemes': ["http", "https"],
        'consumes': ["application/json"],
        'produces': ["application/json"],
        'paths': paths,
        'definitions': definitions
    }

    return swagger
+
+
def gather_all_message_types(descriptors):
    """Return a map of fully qualified name -> message type descriptor
    for all top-level and nested message types in the descriptors."""
    return {
        full_name: message_type
        for full_name, message_type in iterate_message_types(descriptors)
    }
+
+
def gather_all_enum_types(descriptors):
    """Return a map of fully qualified name -> enum type descriptor
    for all package-level and message-nested enum types."""
    return {
        full_name: enum_type
        for full_name, enum_type in iterate_enum_types(descriptors)
    }
+
+
def gather_all_methods(descriptors):
    """Collect every http-annotated service method, keyed by its fully
    qualified name, together with the set of input/output type names
    those methods reference.
    :return: (set of referenced type names,
              OrderedDict of full name -> (service, method))
    """
    types_referenced = set()
    methods = OrderedDict()
    for full_name, service, method in iterate_methods(descriptors):
        methods[full_name] = (service, method)
        # protobuf type names carry a leading dot; drop it
        for type_name in (method['input_type'], method['output_type']):
            types_referenced.add(type_name.strip('.'))
    return types_referenced, methods
+
+
def iterate_methods(descriptors):
    """
    Yield (full_name, service, method) for every service method that
    carries an 'http' option; methods without http options are skipped.
    :param descriptors: iterable of native file descriptor dicts
    """
    for descriptor in descriptors:
        package = descriptor['package']
        for service in descriptor.get('service', []):
            service_prefix = package + '.' + service['name']
            for method in service.get('method', []):
                # skip methods that do not have http options;
                # dict.has_key() was removed in Python 3, use 'in' instead
                options = method['options']
                if 'http' in options:
                    full_name = service_prefix + '.' + method['name']
                    yield full_name, service, method
+
+
def iterate_for_type_in(message_types, prefix):
    """Depth-first traversal yielding (full_name, message_type) for each
    given message type and all of its (recursively) nested types; names
    are dot-joined onto the given prefix."""
    for mtype in message_types:
        qualified = '{}.{}'.format(prefix, mtype['name'])
        yield qualified, mtype
        nested = mtype.get('nested_type', [])
        for pair in iterate_for_type_in(nested, qualified):
            yield pair
+
+
def iterate_message_types(descriptors):
    """Yield (full_name, message_type) for every top-level and nested
    message type across all file descriptors."""
    for descriptor in descriptors:
        prefix = descriptor['package']
        top_level = descriptor.get('message_type', [])
        for pair in iterate_for_type_in(top_level, prefix):
            yield pair
+
+
def iterate_enum_types(descriptors):
    """Yield (full_name, enum_type) for enums declared at the package
    level as well as enums nested inside (possibly nested) messages."""
    for descriptor in descriptors:
        package = descriptor['package']
        # package-level enums
        for enum in descriptor.get('enum_type', []):
            yield package + '.' + enum['name'], enum
        # enums declared inside message types, at any nesting depth
        top_level = descriptor.get('message_type', [])
        for parent_name, message_type in iterate_for_type_in(
                top_level, package):
            for enum in message_type.get('enum_type', []):
                yield parent_name + '.' + enum['name'], enum
+
+
def generate_definitions(types_referenced, types):
    """Walk all the referenced types and for each, generate a JSON schema
    definition. These may also refer to other types, so keep the needed
    set up-to-date.
    :param types_referenced: set of fully qualified type names to emit
    :param types: map of full name -> native type descriptor
    :return: dict of full name -> JSON schema definition
    """
    definitions = {}
    wanted = copy(types_referenced)
    while wanted:
        full_name = wanted.pop()
        # renamed locals: the original shadowed the builtin 'type' and
        # rebound the 'types_referenced' parameter inside the loop
        type_descriptor = types[full_name]
        definition, referenced = make_definition(type_descriptor, types)
        definitions[full_name] = definition
        for referenced_name in referenced:
            # dict.has_key() was removed in Python 3; 'not in' works in both
            if referenced_name not in definitions:
                wanted.add(referenced_name)
    return definitions
+
+
def make_definition(type, types):
    """Dispatch to the enum or object definition builder.
    :return: (definition dict, set of referenced type full names)
    """
    is_enum = type['_type'] == 'google.protobuf.EnumDescriptorProto'
    if is_enum:
        # enum definitions never reference other types
        return make_enum_definition(type), set()
    return make_object_definition(type, types)
+
+
def make_enum_definition(type):
    """Map a protobuf enum descriptor to a JSON schema 'string'
    definition carrying an 'enum' value list; the first declared value
    acts as the default."""

    def describe_value(enum_value):
        line = ' - {}'.format(enum_value['name'])
        desc = enum_value.get('_description', '')
        return line + ': {}'.format(desc) if desc else line

    values = type['value']
    names = [entry['name'] for entry in values]
    heading = type.get('_description', '') or type['name']
    value_lines = [describe_value(entry) for entry in values]

    return {
        'type': 'string',
        'enum': names,
        'default': names[0],
        'description': heading + '\nValid values:\n' + '\n'.join(value_lines),
    }
+
+
def make_object_definition(type, types):
    """Build a JSON schema 'object' definition for a protobuf message
    descriptor.
    :param type: native message type descriptor dict
    :param types: map of all known full name -> type descriptors, used by
        make_property() to resolve field types
    :return: (definition dict, set of referenced type full names)
    """

    definition = {
        'type': 'object'
    }

    referenced = set()
    properties = {}
    for field in type.get('field', []):
        field_name, property, referenced_by_field = make_property(field, types)
        properties[field_name] = property
        referenced.update(referenced_by_field)

    if properties:
        definition['properties'] = properties

    # dict.has_key() was removed in Python 3; 'in' works in both
    if '_description' in type:
        definition['description'] = type['_description']

    return definition, referenced
+
+
def make_property(field, types):
    """
    Translate a single protobuf field descriptor into a JSON schema
    property.
    :param field: native field descriptor dict
    :param types: map of all known full name -> type descriptors
    :return: (field name, property dict, set of referenced type names)
    :raises NotImplementedError: for proto2 group fields
    """

    referenced = set()

    repeated = field['label'] == FieldDescriptor.LABEL_REPEATED

    def check_if_map_entry(type_name):
        # protobuf represents map<k, v> fields as a repeated synthetic
        # message type flagged with the 'map_entry' option; field[1] of
        # that synthetic type is the map's value field
        type = types[type_name]
        if type.get('options', {}).get('map_entry', False):
            _, property, __ = make_property(type['field'][1], types)
            return property

    if field['type'] == FieldDescriptor.TYPE_MESSAGE:

        type_name = field['type_name'].strip('.')

        maybe_map_value_type = check_if_map_entry(type_name)
        if maybe_map_value_type:
            # map-entries are inlined as additionalProperties objects
            repeated = False
            property = {
                'type': 'object',
                'additionalProperties': maybe_map_value_type
            }

        elif type_name == 'google.protobuf.Timestamp':
            # time-stamp is mapped back to JSON schema date-time string
            property = {
                'type': 'string',
                'format': 'date-time'
            }

        else:
            # normal nested object field
            property = {
                '$ref': '#/definitions/{}'.format(type_name)
            }
            referenced.add(type_name)

    elif field['type'] == FieldDescriptor.TYPE_ENUM:
        type_name = field['type_name'].strip('.')
        property = {
            '$ref': '#/definitions/{}'.format(type_name)
        }
        referenced.add(type_name)

    elif field['type'] == FieldDescriptor.TYPE_GROUP:
        # proto2 'group' fields are deprecated and not supported here
        raise NotImplementedError()

    else:
        # scalar field: look up the (type, format) pair
        _type, format = TYPE_MAP[field['type']]
        property = {
            'type': _type,
            'format': format
        }

    if repeated:
        property = {
            'type': 'array',
            'items': property
        }

    # dict.has_key() was removed in Python 3; 'in' works in both
    if '_description' in field:
        property['description'] = field['_description']

    return field['name'], property, referenced
+
+
def generate_paths(methods_dict, definitions):
    """
    Generate the swagger 'paths' object from the collected methods.
    :param methods_dict: OrderedDict of full name -> (service, method)
        as produced by gather_all_methods()
    :param definitions: JSON schema definitions map, used to resolve the
        types of path parameters
    :return: dict of path -> {verb: swagger operation entry}
    :raises DuplicateMethodAndPathError: if two methods map to the same
        (verb, path) pair
    :raises InvalidPathArgumentError: if a path template refers to a
        field not present on the method's input type
    """

    paths = {}

    def _iterate():
        # yield the primary http binding plus any additional bindings
        for full_name, (service, method) in methods_dict.items():
            http_option = method['options']['http']
            yield service, method, http_option
            for binding in http_option.get('additional_bindings', []):
                yield service, method, binding

    def prune_path(path):
        """rid '=<stuff>' pattern from path symbolic segments"""
        segments = re_segment.findall(path)
        pruned_segments = []
        for absolute, symbolic in segments:
            if symbolic:
                full_symbol = symbolic[1:-1]
                pruned_symbol = full_symbol.split('=', 2)[0]
                pruned_segments.append('{' + pruned_symbol + '}')
            else:
                pruned_segments.append(absolute)

        return '/' + '/'.join(pruned_segments)

    def lookup_input_type(input_type_name):
        return definitions[input_type_name.strip('.')]

    def lookup_type(input_type, field_name):
        """Resolve a (possibly dotted) field path to its (type, format)."""
        local_field_name, _, rest = field_name.partition('.')
        properties = input_type['properties']
        # dict.has_key() was removed in Python 3; 'not in' works in both
        if local_field_name not in properties:
            raise InvalidPathArgumentError(
                'Input type has no field {}'.format(field_name))
        field = properties[local_field_name]
        if rest:
            field_type = field.get('type', 'object')
            assert field_type == 'object', (
                'Nested field name "%s" refers to field that of type "%s" '
                '(.%s should be nested object field)'
                % (field_name, field_type, local_field_name))
            ref = field['$ref']
            assert ref.startswith('#/definitions/')
            type_name = ref.replace('#/definitions/', '')
            nested_input_type = lookup_input_type(type_name)
            return lookup_type(nested_input_type, rest)
        else:
            return field['type'], field['format']

    def make_entry(service, method, http):
        parameters = []
        verb = None
        # exactly one of the verb keys (or 'custom') selects the binding
        for verb_candidate in ('get', 'delete', 'patch', 'post', 'put'):
            if verb_candidate in http:
                verb, path = verb_candidate, http[verb_candidate]
                break
        if 'custom' in http:
            assert verb is None
            verb = http['custom']['kind']
            path = http['custom']['path']
        assert verb is not None
        path = prune_path(path)

        # for each symbolic segment in path, add a path parameter entry
        input_type = lookup_input_type(method['input_type'])
        for segment in re_path_param.findall(path):
            symbol = segment.split('=')[0]
            _type, format = lookup_type(input_type, symbol)
            parameters.append({
                'in': 'path',
                'name': symbol,
                'required': True,
                'type': _type,
                'format': format
            })

        # (the original code tested this condition twice)
        if 'body' in http:  # TODO validate if body lists fields
            parameters.append({
                'in': 'body',
                'name': 'body',
                'required': True,
                'schema': {'$ref': '#/definitions/{}'.format(
                    method['input_type'].strip('.'))}
            })

        entry = {
            'operationId': method['name'],
            'tags': [service['name']],
            'responses': {
                '200': {  # TODO: code is 201 and 209 in POST/DELETE?
                    # 'unicode' is undefined on Python 3; u'' is the same
                    # value on Python 2 and works on both
                    'description': u'',  # TODO: ever filled by proto?
                    'schema': {
                        '$ref': '#/definitions/{}'.format(
                            method['output_type'].strip('.'))
                    }
                },
                # TODO shall we prefill with standard error (verb specific),
                # such as 400, 403, 404, 409, 509, 500, 503 etc.
            }
        }

        if parameters:
            entry['parameters'] = parameters

        summary, description = extract_summary_and_description(method)
        if summary:
            entry['summary'] = summary
        if description:
            entry['description'] = description

        return path, verb, entry

    for service, method, http in _iterate():
        path, verb, entry = make_entry(service, method, http)
        path_dict = paths.setdefault(path, {})
        if verb in path_dict:
            raise DuplicateMethodAndPathError(
                'There is already a {} method defined for path ({})'.format(
                    verb, path))
        path_dict[verb] = entry

    return paths
+
+
def extract_summary_and_description(obj):
    """
    Break the raw _description field (if present) into a summary line
    and/or a detailed description text as follows:
    * if the text is a single line (not counting white-space), it is a
      summary and there is no detailed description.
    * if the text starts with a non-empty line followed by an empty line
      followed by at least one non-empty line, then the first line is
      the summary and the lines after the empty line are the description.
    * in all other cases the text is considered a description and no
      summary is generated.
    """
    assert isinstance(obj, dict)
    summary = None
    description = None
    text = obj.get('_description', '')
    if text:
        parts = text.split('\n', 2)
        while len(parts) < 3:  # pad so we can always unpack three parts
            parts.append('')
        first, separator, remainder = parts
        if separator.strip():
            # second line is non-blank: the whole text is the description
            description = text
        else:
            summary = first
            if remainder.strip():
                description = remainder

    return summary, description
+
+
# Mapping from protobuf scalar field types to (JSON schema type, format)
# pairs used by make_property(). Note that 64-bit integer types map to
# JSON 'string' rather than 'integer' — presumably to avoid precision
# loss in JavaScript consumers (TODO confirm against the intended
# swagger consumers).
TYPE_MAP = {
    FieldDescriptor.TYPE_BOOL: ('boolean', 'boolean'),
    FieldDescriptor.TYPE_BYTES: ('string', 'byte'),
    FieldDescriptor.TYPE_DOUBLE: ('number', 'double'),
    FieldDescriptor.TYPE_ENUM: ('string', 'string'),
    FieldDescriptor.TYPE_FIXED32: ('integer', 'int64'),
    FieldDescriptor.TYPE_FIXED64: ('string', 'uint64'),
    FieldDescriptor.TYPE_FLOAT: ('number', 'float'),
    FieldDescriptor.TYPE_INT32: ('integer', 'int32'),
    FieldDescriptor.TYPE_INT64: ('string', 'int64'),
    FieldDescriptor.TYPE_SFIXED32: ('integer', 'int32'),
    FieldDescriptor.TYPE_SFIXED64: ('string', 'int64'),
    FieldDescriptor.TYPE_STRING: ('string', 'string'),
    FieldDescriptor.TYPE_SINT32: ('integer', 'int32'),
    FieldDescriptor.TYPE_SINT64: ('string', 'int64'),
    FieldDescriptor.TYPE_UINT32: ('integer', 'int64'),
    FieldDescriptor.TYPE_UINT64: ('string', 'uint64'),
    # FieldDescriptor.TYPE_MESSAGE: handled structurally in make_property()
    # FieldDescriptor.TYPE_GROUP: deliberately unsupported (proto2 legacy)
}