blob: 19ebf1aacf059fa5718b92165b4420f05da27462 [file] [log] [blame]
# Copyright 2020-present Open Networking Foundation
# Original copyright 2020-present ADTRAN, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
# This is free and unencumbered software released into the public domain
# by its author, Ben Hodgson <ben@benhodgson.com>.
#
# Anyone is free to copy, modify, publish, use, compile, sell, or
# distribute this software, either in source code form or as a compiled
# binary, for any purpose, commercial or non-commercial, and by any
# means.
#
# In jurisdictions that recognise copyright laws, the author or authors
# of this software dedicate any and all copyright interest in the
# software to the public domain. We make this dedication for the benefit
# of the public at large and to the detriment of our heirs and
# successors. We intend this dedication to be an overt act of
# relinquishment in perpetuity of all present and future rights to this
# software under copyright law.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# For more information, please refer to <http://unlicense.org/>
40
41
# -*- coding:utf-8 -*-

# copied from https://github.com/kaporzhu/protobuf-to-dict
# all credits to this script go to Kapor Zhu (kapor.zhu@gmail.com)
#
# Comments:
# - need a fix for bug: "Use enum_label when setting the default value if use_enum_labels is true" (line 95)
# - try to convert timestamps to a human readable format
50
51import base64
52
53import six
54from datetime import datetime
55
56from google.protobuf.message import Message
57from google.protobuf.descriptor import FieldDescriptor
58
59
# Names exported by ``from <module> import *``.
__all__ = [
    "protobuf_to_dict",
    "TYPE_CALLABLE_MAP",
    "dict_to_protobuf",
    "REVERSE_TYPE_CALLABLE_MAP",
]
62
63
# Reserved dictionary key under which extension fields are grouped; inside
# that sub-dictionary each extension is keyed by its field number as a string.
EXTENSION_CONTAINER = '___X'
65
66
# Default converters used by protobuf_to_dict(): maps every scalar
# ``FieldDescriptor.TYPE_*`` id to the callable applied to a field value.
# 64-bit integer types use ``six.integer_types[1]`` when not running on
# Python 3 and plain ``int`` otherwise.  Message fields are not listed here;
# they are handled recursively by _get_field_value_adaptor().
TYPE_CALLABLE_MAP = {
    FieldDescriptor.TYPE_DOUBLE: float,
    FieldDescriptor.TYPE_FLOAT: float,
    FieldDescriptor.TYPE_INT32: int,
    FieldDescriptor.TYPE_INT64: int if six.PY3 else six.integer_types[1],
    FieldDescriptor.TYPE_UINT32: int,
    FieldDescriptor.TYPE_UINT64: int if six.PY3 else six.integer_types[1],
    FieldDescriptor.TYPE_SINT32: int,
    FieldDescriptor.TYPE_SINT64: int if six.PY3 else six.integer_types[1],
    FieldDescriptor.TYPE_FIXED32: int,
    FieldDescriptor.TYPE_FIXED64: int if six.PY3 else six.integer_types[1],
    FieldDescriptor.TYPE_SFIXED32: int,
    FieldDescriptor.TYPE_SFIXED64: int if six.PY3 else six.integer_types[1],
    FieldDescriptor.TYPE_BOOL: bool,
    FieldDescriptor.TYPE_STRING: six.text_type,
    FieldDescriptor.TYPE_BYTES: six.binary_type,
    FieldDescriptor.TYPE_ENUM: int,
}
85
86
def repeated(type_callable):
    """Lift a single-value converter so that it applies to a whole sequence.

    :param type_callable: callable that converts one field value.
    :return: a callable that takes an iterable of values and returns a new
        list with ``type_callable`` applied to each element, in order.
    """
    def convert_all(value_list):
        return [type_callable(value) for value in value_list]

    return convert_all
89
90
def enum_label_name(field, value):
    """Return the symbolic label of an enum value.

    :param field: descriptor of an enum field; its ``enum_type.values_by_number``
        mapping is used for the lookup.
    :param value: the enum number, or anything ``int()`` can convert to it.
    :return: the label name, or the integer itself when the enum declares no
        label for that number, so that a single unrecognised value does not
        abort the conversion of the whole message with a bare ``KeyError``.
    """
    number = int(value)
    enum_value = field.enum_type.values_by_number.get(number)
    if enum_value is None:
        return number
    return enum_value.name
93
94
def _is_map_entry(field):
    """Tell whether *field* is a protobuf map field.

    A field qualifies when it is message-typed and the options of its message
    type have ``map_entry`` set.

    :param field: the field descriptor to inspect.
    :return: a truthy value for map fields, a falsy one otherwise.
    """
    if field.type != FieldDescriptor.TYPE_MESSAGE:
        return False
    entry_type = field.message_type
    return entry_type.has_options and entry_type.GetOptions().map_entry
99
100
def protobuf_to_dict(pb, type_callable_map=TYPE_CALLABLE_MAP,
                     use_enum_labels=False,
                     including_default_value_fields=False,
                     human_readable_timestamps=False):
    """Convert a protobuf message into a plain Python dictionary.

    Only the fields reported by ``pb.ListFields()`` are emitted unless
    ``including_default_value_fields`` is set.  Extension fields are grouped
    under the ``EXTENSION_CONTAINER`` key, keyed by field number as a string.

    :param pb: the protobuf message instance to convert.
    :param dict type_callable_map: mapping of ``FieldDescriptor.TYPE_*`` ids to
        the callable used to convert a value of that type.
    :param bool use_enum_labels: emit enum values as label names instead of
        numbers.
    :param bool including_default_value_fields: also emit fields that are not
        set, using their default value.  Singular message fields and members
        of a oneof are never defaulted.
    :param bool human_readable_timestamps: render the
        ``google.protobuf.Timestamp.seconds`` field as a
        ``'%Y-%m-%d %H:%M:%S.%f'`` string instead of a number.
    :return: a dictionary keyed by field name.
    :raises TypeError: if a field's type has no entry in ``type_callable_map``.
    """
    result_dict = {}
    extensions = {}
    for field, value in pb.ListFields():
        if _is_map_entry(field):
            value_field = field.message_type.fields_by_name['value']
            # Forward every option, including human_readable_timestamps, so
            # message-valued map entries are rendered like any other
            # sub-message.
            convert_value = _get_field_value_adaptor(
                pb, value_field, type_callable_map,
                use_enum_labels, including_default_value_fields,
                human_readable_timestamps)
            result_dict[field.name] = dict(
                (k, convert_value(v)) for k, v in value.items())
            continue

        type_callable = _get_field_value_adaptor(pb, field, type_callable_map,
                                                 use_enum_labels,
                                                 including_default_value_fields,
                                                 human_readable_timestamps)
        if field.label == FieldDescriptor.LABEL_REPEATED:
            type_callable = repeated(type_callable)

        if field.is_extension:
            extensions[str(field.number)] = type_callable(value)
            continue

        if (human_readable_timestamps and
                field.full_name == 'google.protobuf.Timestamp.seconds'):
            # fromtimestamp() is called without a tz argument, so the text is
            # expressed in the local timezone of the running process.
            result_dict[field.name] = datetime.fromtimestamp(
                type_callable(value)).strftime('%Y-%m-%d %H:%M:%S.%f')
        else:
            result_dict[field.name] = type_callable(value)

    # Serialize default value if including_default_value_fields is True.
    if including_default_value_fields:
        for field in pb.DESCRIPTOR.fields:
            is_repeated = field.label == FieldDescriptor.LABEL_REPEATED
            # Singular message fields and oneof fields will not be affected.
            if field.containing_oneof or (
                    not is_repeated and
                    field.cpp_type == FieldDescriptor.CPPTYPE_MESSAGE):
                continue
            if field.name in result_dict:
                # Skip the field which has been serialized already.
                continue
            if _is_map_entry(field):
                result_dict[field.name] = {}
            elif is_repeated:
                # Hand out a fresh list rather than the descriptor's own
                # default_value object, so callers may mutate the result.
                result_dict[field.name] = []
            elif use_enum_labels and field.type == FieldDescriptor.TYPE_ENUM:
                result_dict[field.name] = enum_label_name(field, field.default_value)
            else:
                result_dict[field.name] = field.default_value

    if extensions:
        result_dict[EXTENSION_CONTAINER] = extensions
    return result_dict
156
157
def _get_field_value_adaptor(pb, field, type_callable_map=TYPE_CALLABLE_MAP,
                             use_enum_labels=False,
                             including_default_value_fields=False,
                             human_readable_timestamps=False):
    """Pick the callable that converts one value of *field* for dict output.

    :param pb: the message owning the field; only used to name the class in
        the error message.
    :param field: descriptor of the field whose values will be converted.
    :param dict type_callable_map: mapping of ``FieldDescriptor.TYPE_*`` ids
        to converter callables.
    :param bool use_enum_labels: convert enum values to their label names.
    :param bool including_default_value_fields: forwarded to
        ``protobuf_to_dict`` for sub-messages.
    :param bool human_readable_timestamps: forwarded to ``protobuf_to_dict``
        for sub-messages.
    :return: a one-argument callable.
    :raises TypeError: if the field type is neither a message, a labelled enum
        nor a key of ``type_callable_map``.
    """
    if field.type == FieldDescriptor.TYPE_MESSAGE:
        # recursively encode protobuf sub-message
        return lambda sub_message: protobuf_to_dict(
            sub_message, type_callable_map=type_callable_map,
            use_enum_labels=use_enum_labels,
            including_default_value_fields=including_default_value_fields,
            human_readable_timestamps=human_readable_timestamps
        )

    if use_enum_labels and field.type == FieldDescriptor.TYPE_ENUM:
        return lambda value: enum_label_name(field, value)

    if field.type in type_callable_map:
        return type_callable_map[field.type]

    raise TypeError("Field %s.%s has unrecognised type id %d" % (
        pb.__class__.__name__, field.name, field.type))
179
180
# Default converters used by dict_to_protobuf(): maps ``FieldDescriptor.TYPE_*``
# ids to callables applied to singular scalar input values before they are set
# on the message.  Empty by default, i.e. values are assigned unchanged.
REVERSE_TYPE_CALLABLE_MAP = {}
183
184
def dict_to_protobuf(pb_klass_or_instance, values, type_callable_map=REVERSE_TYPE_CALLABLE_MAP, strict=True, ignore_none=False):
    """Populates a protobuf model from a dictionary.

    :param pb_klass_or_instance: a protobuf message class, or a protobuf instance
    :type pb_klass_or_instance: a type or instance of a subclass of google.protobuf.message.Message
    :param dict values: a dictionary of values. Repeated and nested values are
        fully supported.
    :param dict type_callable_map: a mapping of protobuf types to callables for setting
        values on the target instance.
    :param bool strict: complain if keys in the map are not fields on the message.
    :param bool ignore_none: ignore None-values of fields, treat them as empty field
    :return: the populated message; the instance that was passed in, or a new
        instance when a class was given.
    """
    if isinstance(pb_klass_or_instance, Message):
        instance = pb_klass_or_instance
    else:
        # A class was given: build an empty message to fill in.
        instance = pb_klass_or_instance()
    return _dict_to_protobuf(instance, values, type_callable_map, strict, ignore_none)
202
203
def _get_field_mapping(pb, dict_value, strict):
    """Pair every entry of *dict_value* with the matching field of *pb*.

    :param pb: the protobuf message being populated.
    :param dict dict_value: input dictionary; regular fields are keyed by
        name, extensions live under ``EXTENSION_CONTAINER`` keyed by number.
    :param bool strict: raise on keys or extension numbers unknown to *pb*
        instead of skipping them.
    :return: a list of ``(field_descriptor, input_value, current_pb_value)``
        tuples.
    :raises KeyError: in strict mode, for an unknown field name or extension
        number.
    :raises ValueError: if an extension key cannot be converted to ``int``.
    """
    field_mapping = []
    fields_by_name = pb.DESCRIPTOR.fields_by_name
    for key, value in dict_value.items():
        if key == EXTENSION_CONTAINER:
            # Extensions are handled separately below.
            continue
        if key not in fields_by_name:
            if strict:
                raise KeyError("%s does not have a field called %s" % (pb, key))
            continue
        field_mapping.append((fields_by_name[key], value, getattr(pb, key, None)))

    for ext_num, ext_val in dict_value.get(EXTENSION_CONTAINER, {}).items():
        try:
            ext_num = int(ext_num)
        except ValueError:
            raise ValueError("Extension keys must be integers.")
        if ext_num not in pb._extensions_by_number:
            if strict:
                # Report the offending extension number, not the last regular
                # key seen by the loop above.
                raise KeyError("%s does not have a extension with number %s. Perhaps you forgot to import it?" % (pb, ext_num))
            continue
        ext_field = pb._extensions_by_number[ext_num]
        field_mapping.append((ext_field, ext_val, pb.Extensions[ext_field]))

    return field_mapping
230
231
def _dict_to_protobuf(pb, value, type_callable_map, strict, ignore_none):
    """Recursively copy the dictionary *value* into the message *pb*.

    :param pb: the protobuf message instance to populate (modified in place).
    :param dict value: input dictionary for this message.
    :param dict type_callable_map: mapping of ``FieldDescriptor.TYPE_*`` ids to
        callables applied to singular scalar values before assignment.
    :param bool strict: forwarded to ``_get_field_mapping``; raise on unknown
        keys instead of skipping them.
    :param bool ignore_none: skip entries whose input value is ``None``.
    :return: *pb*, for convenience.
    :raises KeyError: for unknown keys in strict mode, or for an enum label
        that the field's enum type does not define.
    """
    fields = _get_field_mapping(pb, value, strict)

    for field, input_value, pb_value in fields:
        if ignore_none and input_value is None:
            continue

        if field.label == FieldDescriptor.LABEL_REPEATED:
            if _is_map_entry(field):
                value_field = field.message_type.fields_by_name['value']
                value_is_message = (
                    value_field.cpp_type == FieldDescriptor.CPPTYPE_MESSAGE)
                value_is_enum = value_field.type == FieldDescriptor.TYPE_ENUM
                map_container = getattr(pb, field.name)
                for map_key, map_value in input_value.items():
                    if value_is_message:
                        _dict_to_protobuf(map_container[map_key], map_value,
                                          type_callable_map, strict, ignore_none)
                    elif value_is_enum and isinstance(map_value, six.string_types):
                        # Accept enum labels in map values too, mirroring the
                        # handling of repeated and singular enum fields.
                        map_container[map_key] = _string_to_enum(value_field, map_value)
                    else:
                        map_container[map_key] = map_value
                continue

            for item in input_value:
                if field.type == FieldDescriptor.TYPE_MESSAGE:
                    m = pb_value.add()
                    _dict_to_protobuf(m, item, type_callable_map, strict, ignore_none)
                elif field.type == FieldDescriptor.TYPE_ENUM and isinstance(item, six.string_types):
                    pb_value.append(_string_to_enum(field, item))
                else:
                    pb_value.append(item)
            continue

        if field.type == FieldDescriptor.TYPE_MESSAGE:
            _dict_to_protobuf(pb_value, input_value, type_callable_map, strict, ignore_none)
            continue

        if field.type in type_callable_map:
            input_value = type_callable_map[field.type](input_value)

        if field.is_extension:
            pb.Extensions[field] = input_value
            continue

        if field.type == FieldDescriptor.TYPE_ENUM and isinstance(input_value, six.string_types):
            input_value = _string_to_enum(field, input_value)

        setattr(pb, field.name, input_value)

    return pb
273
274
275def _string_to_enum(field, input_value):
276 enum_dict = field.enum_type.values_by_name
277 try:
278 input_value = enum_dict[input_value].number
279 except KeyError:
280 raise KeyError("`%s` is not a valid value for field `%s`" % (input_value, field.name))
281 return input_value