#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16from collections import OrderedDict
17from copy import copy
18
19from jsonpatch import JsonPatch
20from jsonpatch import make_patch
21
22from common.utils.json_format import MessageToDict
23from voltha.core.config.config_branch import ConfigBranch
24from voltha.core.config.config_proxy import CallbackType, ConfigProxy
25from voltha.core.config.config_rev import is_proto_message, children_fields, \
26 ConfigRevision, access_rights
27from voltha.protos import third_party
28from voltha.protos import meta_pb2
29
30
class MergeConflictException(Exception):
    """Signals that a transaction branch cannot be merged into the
    committed (default) branch because the two have diverged in a
    conflicting way."""
33
34
def message_to_dict(m):
    """Convert protobuf message `m` into a dict.

    Thin wrapper over `MessageToDict`; the three positional flags
    (True, True, False) are forwarded unchanged and their meaning is
    defined by `common.utils.json_format.MessageToDict`.
    """
    as_dict = MessageToDict(m, True, True, False)
    return as_dict
37
38
def check_access_violation(new_msg, old_msg):
    """Raise ValueError if attempt is made to change a read-only field.

    :param new_msg: message carrying the proposed values; its class is
        used to look up the per-field access rights
    :param old_msg: message carrying the current values
    :raises ValueError: naming every read-only field whose value differs
        between `new_msg` and `old_msg`
    """
    access_map = access_rights(new_msg.__class__)
    # .items() (rather than the Python-2-only .iteritems()) keeps this
    # in line with the other dict iteration in this module and works on
    # any Python version.
    violated_fields = [
        field_name
        for field_name, access in access_map.items()
        if access == meta_pb2.READ_ONLY
        and getattr(new_msg, field_name) != getattr(old_msg, field_name)
    ]
    if violated_fields:
        raise ValueError('Cannot change read-only field(s) %s' %
                         ', '.join('"%s"' % f for f in violated_fields))
50
51
class ConfigNode(object):
    """
    Represents a configuration node which can hold a number of revisions
    of the configuration for this node.
    When the configuration changes, the new version is appended to the
    node.
    Initial data must be a protobuf message and it will determine the type of
    this node.

    Revisions are grouped into branches: the committed (default) branch is
    stored under the key None in `_branches`; transaction branches are
    stored under their transaction id.
    """
    __slots__ = (
        '_type',      # node type, as __class__ of protobuf message
        '_branches',  # dict of transaction branches and a default
                      # (committed) branch
        '_tags',      # dict of tag-name to ref of ConfigRevision
        '_proxy',     # ref to proxy observer or None if no proxy assigned
    )

    def __init__(self, initial_data, auto_prune=True, txid=None):
        """
        Create a node from a protobuf message.

        :param initial_data: protobuf message; its class becomes the node
            type. A shallow copy is taken before child fields are cleared.
        :param auto_prune: forwarded to the ConfigBranch that is created
        :param txid: key under which the initial branch is registered
            (None means the committed branch)
        """
        assert is_proto_message(initial_data)
        self._type = initial_data.__class__
        self._branches = {}
        self._tags = {}
        self._proxy = None

        self._initialize(copy(initial_data), auto_prune, txid)

    def _initialize(self, data, auto_prune, txid):
        """
        Split `data` into local config and child nodes, and create the
        first revision of this node.

        :raises ValueError: if a keyed container holds a duplicate key
        """
        # separate external children data away from locally stored data
        # based on child_node annotations in protobuf
        #
        # NOTE(review): `.latest` below reads the child's branch stored
        # under None, while the child registers its initial branch under
        # `txid`; with a non-None txid that lookup looks like it would
        # raise KeyError -- confirm whether txid is ever non-None here.
        children = {}
        for field_name, field in children_fields(self._type).items():
            field_value = getattr(data, field_name)
            if field.is_container:
                if field.key:
                    children[field_name] = od = OrderedDict()
                    for v in field_value:
                        rev = ConfigNode(v, txid=txid).latest
                        key = getattr(v, field.key)
                        if key in od:
                            raise ValueError('Duplicate key "{}"'.format(key))
                        od[key] = rev
                else:
                    children[field_name] = [
                        ConfigNode(v, txid=txid).latest for v in field_value]
            else:
                children[field_name] = [
                    ConfigNode(field_value, txid=txid).latest]
            # the child data now lives in its own node(s)
            data.ClearField(field_name)

        branch = ConfigBranch(self, auto_prune=auto_prune)
        rev = ConfigRevision(branch, data, children)
        branch._latest = rev
        branch._revs[rev.hash] = rev
        self._branches[txid] = branch

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ accessors ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # these convenience short-cuts only work for the committed branch

    @property
    def revisions(self):
        """List of the hashes of all revisions kept on the committed
        branch."""
        return [r._hash for r in self._branches[None]._revs.values()]

    @property
    def latest(self):
        """Latest revision of the committed branch."""
        return self._branches[None]._latest

    def __getitem__(self, hash):
        """Return the committed-branch revision with the given hash."""
        return self._branches[None]._revs[hash]

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ get operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def get(self, path=None, hash=None, depth=0, deep=False, txid=None):
        """
        Retrieve the data found at `path`.

        :param path: '/'-separated path relative to this node; None or ''
            addresses this node itself
        :param hash: revision hash to read from; None selects the latest
            revision of the chosen branch
        :param depth: depth passed on to the revision's get()
        :param deep: if true, depth is overridden with -1
        :param txid: transaction id; if no branch exists for it, the
            committed branch is used
        :return: a message, or a list of messages when the path addresses
            a container field
        """
        # depth preparation
        if deep:
            depth = -1

        # path preparation
        path = '' if path is None else path.lstrip('/')

        # determine branch; if lookup fails, it is ok to use default branch
        branch = self._branches.get(txid, None) or self._branches[None]

        # determine rev
        rev = branch.latest if hash is None else branch._revs[hash]

        return self._get(rev, path, depth)

    def _get(self, rev, path, depth):
        """
        Resolve `path` (already stripped of leading slashes) starting at
        revision `rev` of this node.

        :raises LookupError: if the path tries to index into a container
            that has no key defined
        """
        if not path:
            return self._do_get(rev, depth)

        # ... otherwise descend into the named child field
        name, _, path = path.partition('/')
        field = children_fields(self._type)[name]

        if not field.is_container:
            child_rev = rev._children[name][0]
            return child_rev.node._get(child_rev, path, depth)

        if field.key:
            children_od = rev._children[name]
            if path:
                # need to escalate further
                key, _, path = path.partition('/')
                child_rev = children_od[field.key_from_str(key)]
                return child_rev.node._get(child_rev, path, depth)
            # we are the node of interest
            child_revs = children_od.values()
        else:
            if path:
                raise LookupError(
                    'Cannot index into container with no key defined')
            child_revs = rev._children[name]

        return [child_rev.node._do_get(child_rev, depth)
                for child_rev in child_revs]

    def _do_get(self, rev, depth):
        """Read the message from `rev` and, if a proxy is attached, pass
        it through the proxy's GET callbacks."""
        msg = rev.get(depth)
        if self._proxy is not None:
            msg = self._proxy.invoke_callbacks(CallbackType.GET, msg)
        return msg

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ update operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def _branch_for(self, txid, mk_branch):
        """Return the branch registered under `txid`; if there is none,
        return whatever `mk_branch(self)` produces."""
        try:
            return self._branches[txid]
        except KeyError:
            return mk_branch(self)

    def update(self, path, data, strict=False, txid=None, mk_branch=None):
        """
        Replace the data of the node found at `path`.

        :param path: '/'-separated path relative to this node
        :param data: new message for the addressed node
        :param strict: if true, changes to read-only fields are rejected
        :param txid: transaction id selecting the branch to change
        :param mk_branch: callable invoked with this node to create the
            branch when none exists yet for `txid`
        :return: the resulting latest revision of this node on the branch
        :raises ValueError: if the path addresses a container itself, a
            non-keyed container is indexed, or the key field would change
        """
        path = path.lstrip('/')
        branch = self._branch_for(txid, mk_branch)

        if not path:
            return self._do_update(branch, data, strict)

        rev = branch._latest  # change is always made to the latest
        name, _, path = path.partition('/')
        field = children_fields(self._type)[name]

        if not field.is_container:
            child_rev = rev._children[name][0]
            new_child_rev = child_rev.node.update(
                path, data, strict, txid, mk_branch)
            rev = rev.update_children(name, [new_child_rev], branch)
            self._make_latest(branch, rev)
            return rev

        if not path:
            raise ValueError('Cannot update a list')
        if not field.key:
            raise ValueError('Cannot index into container with no keys')

        key, _, path = path.partition('/')
        key = field.key_from_str(key)
        children_od = copy(rev._children[name])
        child_rev = children_od[key]
        new_child_rev = child_rev.node.update(
            path, data, strict, txid, mk_branch)
        if new_child_rev.hash == child_rev.hash:
            # no change, we can return
            return branch._latest
        if getattr(new_child_rev.data, field.key) != key:
            raise ValueError('Cannot change key field')
        children_od[key] = new_child_rev
        rev = rev.update_children(name, children_od, branch)
        self._make_latest(branch, rev)
        return rev

    def _do_update(self, branch, data, strict):
        """
        Apply `data` as the new local config of this node on `branch`.

        :return: the new revision, or the current latest one if `data`
            equals the current data
        :raises ValueError: if `data` is not an instance of the node type,
            or (strict mode) a read-only field would change
        :raises NotImplementedError: if `data` carries child field values
        """
        if not isinstance(data, self._type):
            raise ValueError(
                '"{}" is not a valid data type for this node'.format(
                    data.__class__.__name__))
        self._test_no_children(data)
        if self._proxy is not None:
            self._proxy.invoke_callbacks(CallbackType.PRE_UPDATE, data)

        if branch._latest.data == data:
            return branch._latest

        if strict:
            # check if attempt is made to change read-only field
            check_access_violation(data, branch._latest.data)
        rev = branch._latest.update_data(data, branch)
        self._make_latest(branch, rev,
                          ((CallbackType.POST_UPDATE, rev.data),))
        return rev

    def _make_latest(self, branch, rev, change_announcements=()):
        """
        Make `rev` the head of `branch`, registering it if it is new.

        :param change_announcements: iterable of (callback type, data)
            pairs; they are delivered to the proxy only when the branch
            is the committed one and a proxy is attached
        """
        branch._latest = rev
        if rev.hash not in branch._revs:
            branch._revs[rev.hash] = rev

        # announce only if this is main branch
        if change_announcements and branch._txid is None and \
                self._proxy is not None:
            for change_type, data in change_announcements:
                self._proxy.invoke_callbacks(
                    change_type, data, proceed_on_errors=1)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ add operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def add(self, path, data, txid=None, mk_branch=None):
        """
        Add `data` as a new child to the keyed container found at `path`.

        :param path: '/'-separated path ending in a container field name
        :param data: message to add; its key field determines the key
        :param txid: transaction id selecting the branch to change
        :param mk_branch: callable invoked with this node to create the
            branch when none exists yet for `txid`
        :return: the resulting latest revision of this node on the branch
        :raises ValueError: on an empty path, a non-container or non-keyed
            target, or a duplicate key
        """
        path = path.lstrip('/')
        if not path:
            raise ValueError('Cannot add to non-container node')

        branch = self._branch_for(txid, mk_branch)

        rev = branch._latest  # change is always made to latest
        name, _, path = path.partition('/')
        field = children_fields(self._type)[name]

        if not field.is_container:
            raise ValueError('Cannot add to non-container field')

        if path:
            if not field.key:
                raise ValueError(
                    'Cannot index into container with no keys')
            # need to escalate
            key, _, path = path.partition('/')
            key = field.key_from_str(key)
            children_od = copy(rev._children[name])
            child_rev = children_od[key]
            new_child_rev = child_rev.node.add(path, data, txid, mk_branch)
            children_od[key] = new_child_rev
            rev = rev.update_children(name, children_od, branch)
            self._make_latest(branch, rev)
            return rev

        # we do need to add a new child to the field
        if not field.key:
            # adding to non-keyed containers not implemented yet
            raise ValueError('Cannot add to non-keyed container')

        if self._proxy is not None:
            self._proxy.invoke_callbacks(CallbackType.PRE_ADD, data)
        children_od = copy(rev._children[name])
        key = getattr(data, field.key)
        if key in children_od:
            raise ValueError('Duplicate key "{}"'.format(key))
        children_od[key] = ConfigNode(data).latest
        rev = rev.update_children(name, children_od, branch)
        self._make_latest(branch, rev, ((CallbackType.POST_ADD, data),))
        return rev

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ remove operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def remove(self, path, txid=None, mk_branch=None):
        """
        Remove the child addressed by `path` from its keyed container.

        :param path: '/'-separated path ending in '<container>/<key>'
        :param txid: transaction id selecting the branch to change
        :param mk_branch: callable invoked with this node to create the
            branch when none exists yet for `txid`
        :return: the resulting latest revision of this node on the branch
        :raises ValueError: on an empty path, a missing key, or a
            non-container / non-keyed target
        """
        path = path.lstrip('/')
        if not path:
            raise ValueError('Cannot remove from non-container node')

        branch = self._branch_for(txid, mk_branch)

        rev = branch._latest  # change is always made to latest
        name, _, path = path.partition('/')
        field = children_fields(self._type)[name]

        if not field.is_container:
            raise ValueError('Cannot remove non-conatiner field')
        if not path:
            raise ValueError("Cannot remove without a key")
        if not field.key:
            raise ValueError('Cannot remove from non-keyed container')

        key, _, path = path.partition('/')
        key = field.key_from_str(key)
        children_od = copy(rev._children[name])

        if path:
            # need to escalate
            child_rev = children_od[key]
            new_child_rev = child_rev.node.remove(path, txid, mk_branch)
            children_od[key] = new_child_rev
            rev = rev.update_children(name, children_od, branch)
            self._make_latest(branch, rev)
            return rev

        # need to remove from this very node; `key` is already converted,
        # so it is used directly for the lookups below
        if self._proxy is not None:
            data = children_od[key].data
            self._proxy.invoke_callbacks(CallbackType.PRE_REMOVE, data)
            post_anno = ((CallbackType.POST_REMOVE, data),)
        else:
            post_anno = ()
        del children_od[key]
        rev = rev.update_children(name, children_od, branch)
        self._make_latest(branch, rev, post_anno)
        return rev

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Branching ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def _mk_txbranch(self, txid):
        """Create and register a transaction branch for `txid`, forked
        from the latest committed revision; return the new branch."""
        branch_point = self._branches[None].latest
        branch = ConfigBranch(self, txid, branch_point)
        self._branches[txid] = branch
        return branch

    def _del_txbranch(self, txid):
        """Drop the branch registered under `txid`."""
        del self._branches[txid]

    # def can_txbranch_be_merged(self, txid):
    #     try:
    #         self._merge_txbranch(txid, dry_run=True)
    #     except MergeConflictException:
    #         return False
    #     else:
    #         return True

    def _merge_txbranch(self, txid, dry_run=False):
        """
        Make latest in branch to be latest in the common branch, but only
        if no conflict is detected. Conflict is where the txbranch branch
        point no longer matches the latest in the default branch. This has
        to be verified recursively.

        A transaction branch can be merged only if none of the following
        happened with the master branch since the fork rev:
        - the local data was changed both in the incoming node and in the
          default branch since the branch point, and they differ now
        - both branches changed the same children nodes in any way (local or
          deep)

        :param txid: id of the transaction branch to merge
        :param dry_run: if true, only the conflict checks are performed;
            nothing is committed, the branch is kept, and None is returned
        :return: the new latest committed revision (None on dry run)
        :raises MergeConflictException: if a conflict is detected
        """
        announcements = []

        def _get_od_changes(lst1, lst2):
            # keyed containers: compare by key, detect changes by hash
            assert isinstance(lst2, OrderedDict)
            added_keys = [k for k in lst2 if k not in lst1]
            removed_keys = [k for k in lst1 if k not in lst2]
            changed_keys = [k for k in lst1
                            if k in lst2 and lst1[k].hash != lst2[k].hash]
            return added_keys, removed_keys, changed_keys

        def _get_changes(lst1, lst2):
            if isinstance(lst1, OrderedDict):
                return _get_od_changes(lst1, lst2)
            assert isinstance(lst1, list)
            assert isinstance(lst2, list)
            set1 = set(lst1)
            set2 = set(lst2)
            added = set2.difference(set1)
            removed = set1.difference(set2)
            changed = set()  # no such thing in plain (unkeyed) lists
            return added, removed, changed

        def _escalate(child_rev):
            # revisions living on this transaction's branch are merged
            # recursively; anything else is taken as is
            child_branch = child_rev._branch
            if child_branch._txid == txid:
                child_rev = child_branch._node._merge_txbranch(txid, dry_run)
            return child_rev

        def _add(dst, rev_or_key, src):
            if isinstance(dst, list):
                dst.append(_escalate(rev_or_key))
                announcements.append((CallbackType.POST_ADD, rev_or_key.data))
            else:  # OrderedDict key, data is in lst
                rev = src[rev_or_key]
                dst[rev_or_key] = _escalate(rev)
                announcements.append((CallbackType.POST_ADD, rev.data))

        def _remove(dst, rev_or_key):
            if isinstance(dst, list):
                dst.remove(rev_or_key)
                # announce the removed data (not the revision object), as
                # every other announcement in this class does
                announcements.append(
                    (CallbackType.POST_REMOVE, rev_or_key.data))
            else:
                rev = dst[rev_or_key]
                del dst[rev_or_key]
                announcements.append((CallbackType.POST_REMOVE, rev.data))

        src_branch = self._branches[txid]
        dst_branch = self._branches[None]

        fork_rev = src_branch.origin  # rev from which src branch was made
        src_rev = src_branch.latest  # head rev of source branch
        dst_rev = dst_branch.latest  # head rev of target branch

        # deal with config data first
        if dst_rev._config is fork_rev._config:
            # no change in master, accept src if different
            config_changed = dst_rev._config != src_rev._config
        else:
            if dst_rev._config.hash != src_rev._config.hash:
                raise MergeConflictException('Config collision')
            config_changed = True

        # The children of every field are always reconciled three-way
        # against the fork point (a "master unchanged" fast path used to
        # be stubbed out here with `if 0:` and was never taken).
        new_children = copy(dst_rev._children)
        for field_name in children_fields(self._type):
            fork_list = fork_rev._children[field_name]
            src_list = src_rev._children[field_name]
            dst_list = dst_rev._children[field_name]

            src_added, src_removed, src_changed = _get_changes(
                fork_list, src_list)
            dst_added, dst_removed, dst_changed = _get_changes(
                fork_list, dst_list)

            lst = copy(new_children[field_name])
            for to_add in src_added:
                # we cannot add if it has been added and is different
                if to_add in dst_added:
                    # this can happen only to keyed containers
                    assert isinstance(src_list, OrderedDict)
                    if src_list[to_add].hash != dst_list[to_add].hash:
                        raise MergeConflictException(
                            'Cannot add because it has been added and '
                            'different'
                        )
                _add(lst, to_add, src_list)
            for to_remove in src_removed:
                # we cannot remove if it has changed in dst
                if to_remove in dst_changed:
                    raise MergeConflictException(
                        'Cannot remove because it has changed')
                if to_remove not in dst_removed:
                    _remove(lst, to_remove)
            for to_change in src_changed:
                # we cannot change if it was removed in dst
                if to_change in dst_removed:
                    raise MergeConflictException(
                        'Cannot change because it has been removed')
                # change can only be in keyed containers (OrderedDict)
                lst[to_change] = _escalate(src_list[to_change])
            new_children[field_name] = lst

        if not dry_run:
            rev = src_rev if config_changed else dst_rev
            rev = rev.update_all_children(new_children, dst_branch)
            if config_changed:
                announcements.append((CallbackType.POST_UPDATE, rev.data))
            self._make_latest(dst_branch, rev, announcements)
            del self._branches[txid]
            return rev

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Diff utility ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def diff(self, hash1, hash2=None, txid=None):
        """
        Return a JSON patch turning revision `hash1` into revision
        `hash2` (or into the branch's latest revision if `hash2` is not
        given), both looked up on the branch registered under `txid`.
        """
        branch = self._branches[txid]
        rev1 = branch[hash1]
        rev2 = branch[hash2] if hash2 else branch._latest
        if rev1.hash == rev2.hash:
            return JsonPatch([])

        dict1 = message_to_dict(rev1.data)
        dict2 = message_to_dict(rev2.data)
        return make_patch(dict1, dict2)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ Tagging utility ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def tag(self, tag, hash=None):
        """
        Attach `tag` to a committed revision.

        :param tag: tag name; an existing tag of that name is overwritten
        :param hash: revision hash; None tags the latest revision
        :return: this node, to allow chaining
        """
        branch = self._branches[None]  # tag only what has been committed
        rev = branch._latest if hash is None else branch._revs[hash]
        self._tags[tag] = rev
        return self

    @property
    def tags(self):
        """Sorted list of all registered tag names."""
        return sorted(self._tags)

    def by_tag(self, tag):
        """
        Return revision based on tag
        :param tag: previously registered tag value
        :return: revision object
        """
        return self._tags[tag]

    def diff_by_tag(self, tag1, tag2):
        """Return the JSON patch between the revisions tagged `tag1` and
        `tag2`."""
        return self.diff(self._tags[tag1].hash, self._tags[tag2].hash)

    def delete_tag(self, tag):
        """Remove a single tag; raises KeyError if it is not registered."""
        del self._tags[tag]

    def delete_tags(self, *tags):
        """Remove each of the given tags; raises KeyError on the first
        one that is not registered."""
        for tag in tags:
            del self._tags[tag]

    def prune_untagged(self):
        """
        Drop every committed revision that is neither tagged nor the
        latest one.

        :return: this node, to allow chaining
        """
        branch = self._branches[None]
        keep = set(rev.hash for rev in self._tags.values())
        keep.add(branch._latest.hash)
        # iterate over a snapshot of the keys, since entries are deleted
        # from the dict inside the loop
        for rev_hash in list(branch._revs):
            if rev_hash not in keep:
                del branch._revs[rev_hash]
        return self

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Internals ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def _test_no_children(self, data):
        """
        Verify that `data` carries no values in any child field.

        :raises NotImplementedError: if a container child field is
            non-empty or a non-container child field is set
        """
        for field_name, field in children_fields(self._type).items():
            field_value = getattr(data, field_name)
            if field.is_container:
                if len(field_value):
                    raise NotImplementedError(
                        'Cannot update external children')
            else:
                if data.HasField(field_name):
                    raise NotImplementedError(
                        'Cannot update externel children')

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ Node proxy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def get_proxy(self, path, exclusive=False):
        """
        Return the proxy of the node found at `path`, creating it if the
        node has none yet.

        :param path: '/'-separated path relative to this node
        :param exclusive: passed to the ConfigProxy when one is created
        :raises ValueError: if the path addresses a container field, a
            non-keyed container is indexed, or the node already has an
            exclusive proxy
        """
        return self._get_proxy(path, self, path, exclusive)

    def _get_proxy(self, path, root, full_path, exclusive):
        """Walk `path` on the committed branch down to the target node
        and return its proxy; `root` and `full_path` are handed through
        unchanged to the proxy constructor."""
        path = path.lstrip('/')
        if not path:
            return self._mk_proxy(root, full_path, exclusive)

        # need to escalate
        rev = self._branches[None]._latest
        name, _, path = path.partition('/')
        field = children_fields(self._type)[name]

        if not field.is_container:
            child_rev = rev._children[name][0]
            return child_rev.node._get_proxy(
                path, root, full_path, exclusive)

        if not path:
            raise ValueError('Cannot proxy a container field')
        if not field.key:
            raise ValueError('Cannot index into container with no keys')

        key, _, path = path.partition('/')
        # children are stored under the converted key, so the path
        # segment must be converted the same way as in get/update/remove
        key = field.key_from_str(key)
        child_rev = rev._children[name][key]
        return child_rev.node._get_proxy(path, root, full_path, exclusive)

    def _mk_proxy(self, root, full_path, exclusive):
        """Create this node's proxy if it has none; otherwise return the
        existing one unless it is exclusive, in which case ValueError is
        raised."""
        if self._proxy is None:
            self._proxy = ConfigProxy(root, self, full_path, exclusive)
        else:
            if self._proxy.exclusive:
                raise ValueError('Node is already owned exclusively')
        return self._proxy