Major rework of gRPC handling (do not merge yet)
Includes the following changes:
* Refactored proto files
- separation of logical devices vs devices
- common flow related message types moved to openflow_13
- most RPC is defined in voltha.proto now
* Expanded RPC definitions to cover now most of what we
need (a few device provisioning RPCs are still missing)
* Reworked RPC handlers to work with new config tree
* Implemented test cases for all existing RPCs, tested via
chameleon's REST service
* Did away with the OrderedDict internal representation
in the config nodes (3x performance boost on bulk
add, and negligible penalty in other ops)
* Refactored transaction merge handling to align with
new structures
Change-Id: I3740ec13b8296943b307782e86e6b596af78140e
diff --git a/voltha/core/config/config_node.py b/voltha/core/config/config_node.py
index 6df8b4c..969ba8e 100644
--- a/voltha/core/config/config_node.py
+++ b/voltha/core/config/config_node.py
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-from collections import OrderedDict
from copy import copy
from jsonpatch import JsonPatch
@@ -25,14 +24,11 @@
from voltha.core.config.config_rev import is_proto_message, children_fields, \
ConfigRevision, access_rights
from voltha.core.config.config_rev_persisted import PersistedConfigRevision
+from voltha.core.config.merge_3way import merge_3way
from voltha.protos import third_party
from voltha.protos import meta_pb2
-class MergeConflictException(Exception):
- pass
-
-
def message_to_dict(m):
return MessageToDict(m, True, True, False)
@@ -50,6 +46,13 @@
', '.join('"%s"' % f for f in violated_fields))
+def find_rev_by_key(revs, keyname, value):
+ for i, rev in enumerate(revs):
+ if getattr(rev._config._data, keyname) == value:
+ return i, rev
+ raise KeyError('key {}={} not found'.format(keyname, value))
+
+
class ConfigNode(object):
"""
Represents a configuration node which can hold a number of revisions
@@ -80,7 +83,9 @@
self._type = initial_data
elif is_proto_message(initial_data):
self._type = initial_data.__class__
- self._initialize(copy(initial_data), txid)
+ copied_data = initial_data.__class__()
+ copied_data.CopyFrom(initial_data)
+ self._initialize(copied_data, txid)
else:
raise NotImplementedError()
@@ -98,13 +103,15 @@
field_value = getattr(data, field_name)
if field.is_container:
if field.key:
- children[field_name] = od = OrderedDict()
+ keys_seen = set()
+ children[field_name] = lst = []
for v in field_value:
rev = self._mknode(v, txid=txid).latest
key = getattr(v, field.key)
- if key in od:
+ if key in keys_seen:
raise ValueError('Duplicate key "{}"'.format(key))
- od[key] = rev
+ lst.append(rev)
+ keys_seen.add(key)
else:
children[field_name] = [
self._mknode(v, txid=txid).latest for v in field_value]
@@ -166,17 +173,18 @@
field = children_fields(self._type)[name]
if field.is_container:
if field.key:
- children_od = rev._children[name]
+ children = rev._children[name]
if path:
# need to escalate further
key, _, path = path.partition('/')
- child_rev = children_od[field.key_from_str(key)]
+ key = field.key_from_str(key)
+ _, child_rev = find_rev_by_key(children, field.key, key)
child_node = child_rev.node
return child_node._get(child_rev, path, depth)
else:
# we are the node of interest
response = []
- for child_rev in children_od.itervalues():
+ for child_rev in children:
child_node = child_rev.node
value = child_node._do_get(child_rev, depth)
response.append(value)
@@ -226,8 +234,8 @@
if field.key:
key, _, path = path.partition('/')
key = field.key_from_str(key)
- children_od = copy(rev._children[name])
- child_rev = children_od[key]
+ children = copy(rev._children[name])
+ idx, child_rev = find_rev_by_key(children, field.key, key)
child_node = child_rev.node
new_child_rev = child_node.update(
path, data, strict, txid, mk_branch)
@@ -236,8 +244,8 @@
return branch._latest
if getattr(new_child_rev.data, field.key) != key:
raise ValueError('Cannot change key field')
- children_od[key] = new_child_rev
- rev = rev.update_children(name, children_od, branch)
+ children[idx] = new_child_rev
+ rev = rev.update_children(name, children, branch)
self._make_latest(branch, rev)
return rev
else:
@@ -307,13 +315,17 @@
if self._proxy is not None:
self._proxy.invoke_callbacks(
CallbackType.PRE_ADD, data)
- children_od = copy(rev._children[name])
+ children = copy(rev._children[name])
key = getattr(data, field.key)
- if key in children_od:
+ try:
+ find_rev_by_key(children, field.key, key)
+ except KeyError:
+ pass
+ else:
raise ValueError('Duplicate key "{}"'.format(key))
child_rev = self._mknode(data).latest
- children_od[key] = child_rev
- rev = rev.update_children(name, children_od, branch)
+ children.append(child_rev)
+ rev = rev.update_children(name, children, branch)
self._make_latest(branch, rev,
((CallbackType.POST_ADD, data),))
return rev
@@ -325,12 +337,12 @@
# need to escalate
key, _, path = path.partition('/')
key = field.key_from_str(key)
- children_od = copy(rev._children[name])
- child_rev = children_od[key]
+ children = copy(rev._children[name])
+ idx, child_rev = find_rev_by_key(children, field.key, key)
child_node = child_rev.node
new_child_rev = child_node.add(path, data, txid, mk_branch)
- children_od[key] = new_child_rev
- rev = rev.update_children(name, children_od, branch)
+ children[idx] = new_child_rev
+ rev = rev.update_children(name, children, branch)
self._make_latest(branch, rev)
return rev
else:
@@ -363,26 +375,27 @@
key = field.key_from_str(key)
if path:
# need to escalate
- children_od = copy(rev._children[name])
- child_rev = children_od[key]
+ children = copy(rev._children[name])
+ idx, child_rev = find_rev_by_key(children, field.key, key)
child_node = child_rev.node
new_child_rev = child_node.remove(path, txid, mk_branch)
- children_od[key] = new_child_rev
- rev = rev.update_children(name, children_od, branch)
+ children[idx] = new_child_rev
+ rev = rev.update_children(name, children, branch)
self._make_latest(branch, rev)
return rev
else:
# need to remove from this very node
- children_od = copy(rev._children[name])
+ children = copy(rev._children[name])
+ idx, child_rev = find_rev_by_key(children, field.key, key)
if self._proxy is not None:
- data = children_od[field.key_from_str(key)].data
+ data = child_rev.data
self._proxy.invoke_callbacks(
CallbackType.PRE_REMOVE, data)
post_anno = ((CallbackType.POST_REMOVE, data),)
else:
post_anno = ()
- del children_od[field.key_from_str(key)]
- rev = rev.update_children(name, children_od, branch)
+ del children[idx]
+ rev = rev.update_children(name, children, branch)
self._make_latest(branch, rev, post_anno)
return rev
else:
@@ -401,14 +414,6 @@
def _del_txbranch(self, txid):
del self._branches[txid]
- # def can_txbranch_be_merged(self, txid):
- # try:
- # self._merge_txbranch(txid, dry_run=True)
- # except MergeConflictException:
- # return False
- # else:
- # return True
-
def _merge_txbranch(self, txid, dry_run=False):
"""
Make latest in branch to be latest in the common branch, but only
@@ -417,73 +422,12 @@
to be verified recursively.
"""
- """
- A transaction branch can be merged only if none of the following
- happened with the master branch since the fork rev:
- - the local data was changed both in the incoming node and in the
- default branch since the branch point, and they differ now
- - both branches changed the same children nodes in any way (local or
- deep)
- """
-
- announcements = []
-
- def _get_od_changes(lst1, lst2):
- assert isinstance(lst2, dict)
- added_keys = [k for k in lst2.iterkeys() if k not in lst1]
- removed_keys = [k for k in lst1.iterkeys() if k not in lst2]
- changed_keys = [k for k in lst1.iterkeys()
- if k in lst2 and lst1[k].hash != lst2[k].hash]
- return added_keys, removed_keys, changed_keys
-
- def _get_changes(lst1, lst2):
- if isinstance(lst1, dict):
- return _get_od_changes(lst1, lst2)
- assert isinstance(lst1, list)
- assert isinstance(lst2, list)
- set1 = set(lst1)
- set2 = set(lst2)
- added = set2.difference(set1)
- removed = set1.difference(set2)
- changed = set() # no such thing in plain (unkeyed) lists
- return added, removed, changed
-
- def _escalate(child_rev):
+ def merge_child(child_rev):
child_branch = child_rev._branch
if child_branch._txid == txid:
child_rev = child_branch._node._merge_txbranch(txid, dry_run)
return child_rev
- def _escalate_list(src_list):
- if isinstance(src_list, list):
- lst = []
- for child_rev in src_list:
- lst.append(_escalate(child_rev))
- return lst
- else: # OrderedDict
- od = OrderedDict()
- for key, child_rev in src_list.iteritems():
- od[key] = _escalate(child_rev)
- return od
-
- def _add(dst, rev_or_key, src):
- if isinstance(dst, list):
- dst.append(_escalate(rev_or_key))
- announcements.append((CallbackType.POST_ADD, rev_or_key.data))
- else: # OrderedDict key, data is in lst
- rev = src[rev_or_key]
- dst[rev_or_key] = _escalate(rev)
- announcements.append((CallbackType.POST_ADD, rev.data))
-
- def _remove(dst, rev_or_key):
- if isinstance(dst, list):
- dst.remove(rev_or_key)
- announcements.append((CallbackType.POST_REMOVE, rev_or_key))
- else:
- rev = dst[rev_or_key]
- del dst[rev_or_key]
- announcements.append((CallbackType.POST_REMOVE, rev.data))
-
src_branch = self._branches[txid]
dst_branch = self._branches[None]
@@ -491,66 +435,14 @@
src_rev = src_branch.latest # head rev of source branch
dst_rev = dst_branch.latest # head rev of target branch
- # deal with config data first
- if dst_rev._config is fork_rev._config:
- # no change in master, accept src if different
- config_changed = dst_rev._config != src_rev._config
- else:
- if dst_rev._config.hash != src_rev._config.hash:
- raise MergeConflictException('Config collision')
- config_changed = True
-
- new_children = copy(dst_rev._children)
- for field_name, field in children_fields(self._type).iteritems():
- fork_list = fork_rev._children[field_name]
- src_list = src_rev._children[field_name]
- dst_list = dst_rev._children[field_name]
- if 0: #dst_list == fork_list:
- # no change in master, accept src if different
- if src_list != fork_list:
- new_children[field_name] = _escalate_list(src_list)
- else:
- src_added, src_removed, src_changed = _get_changes(
- fork_list, src_list)
- dst_added, dst_removed, dst_changed = _get_changes(
- fork_list, dst_list)
-
- lst = copy(new_children[field_name])
- for to_add in src_added:
- # we cannot add if it has been added and is different
- if to_add in dst_added:
- # this can happen only to keyed containers
- assert isinstance(src_list, dict)
- if src_list[to_add].hash != dst_list[to_add].hash:
- raise MergeConflictException(
- 'Cannot add because it has been added and '
- 'different'
- )
- _add(lst, to_add, src_list)
- for to_remove in src_removed:
- # we cannot remove if it has changed in dst
- if to_remove in dst_changed:
- raise MergeConflictException(
- 'Cannot remove because it has changed')
- if to_remove not in dst_removed:
- _remove(lst, to_remove)
- for to_change in src_changed:
- # we cannot change if it was removed in dst
- if to_change in dst_removed:
- raise MergeConflictException(
- 'Cannot change because it has been removed')
- # change can only be in keyed containers (OrderedDict)
- lst[to_change] = _escalate(src_list[to_change])
- new_children[field_name] = lst
+ rev, changes = merge_3way(
+ fork_rev, src_rev, dst_rev, merge_child, dry_run)
if not dry_run:
- rev = src_rev if config_changed else dst_rev
- rev = rev.update_all_children(new_children, dst_branch)
- if config_changed:
- announcements.append((CallbackType.POST_UPDATE, rev.data))
- self._make_latest(dst_branch, rev, announcements)
+ self._make_latest(dst_branch, rev, change_announcements=changes)
del self._branches[txid]
- return rev
+
+ return rev
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Diff utility ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -638,8 +530,9 @@
raise ValueError('Cannot proxy a container field')
if field.key:
key, _, path = path.partition('/')
- children_od = rev._children[name]
- child_rev = children_od[key]
+ key = field.key_from_str(key)
+ children = rev._children[name]
+ _, child_rev = find_rev_by_key(children, field.key, key)
child_node = child_rev.node
return child_node._get_proxy(path, root, full_path, exclusive)
diff --git a/voltha/core/config/config_proxy.py b/voltha/core/config/config_proxy.py
index e4c6245..0769a94 100644
--- a/voltha/core/config/config_proxy.py
+++ b/voltha/core/config/config_proxy.py
@@ -21,6 +21,22 @@
log = structlog.get_logger()
+class OperationContext(object):
+ def __init__(self, path=None, data=None, field_name=None, child_key=None):
+ self.path = path
+ self._data = data
+ self.field_name = field_name
+ self.child_key = child_key
+ @property
+ def data(self):
+ return self._data
+ def update(self, data):
+ self._data = data
+ return self
+ def __repr__(self):
+ return 'OperationContext({})'.format(self.__dict__)
+
+
class CallbackType(Enum):
# GET hooks are called after the data is retrieved and can be used to
@@ -43,6 +59,10 @@
PRE_REMOVE = 6
POST_REMOVE = 7
+ # Bulk list change due to transaction commit that changed items in
+ # non-keyed container fields
+ POST_LISTCHANGE = 8
+
class ConfigProxy(object):
"""
@@ -115,16 +135,16 @@
# ~~~~~~~~~~~~~~~~~~~~~ Callback dispatch ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- def invoke_callbacks(self, callback_type, msg, proceed_on_errors=False):
+ def invoke_callbacks(self, callback_type, context, proceed_on_errors=False):
lst = self._callbacks.get(callback_type, [])
for callback, args, kw in lst:
try:
- msg = callback(msg, *args, **kw)
+ context = callback(context, *args, **kw)
except Exception, e:
if proceed_on_errors:
log.exception(
'call-back-error', callback_type=callback_type,
- msg=msg, e=e)
+ context=context, e=e)
else:
raise
- return msg
+ return context
diff --git a/voltha/core/config/config_rev.py b/voltha/core/config/config_rev.py
index 411ede2..dc0eb5e 100644
--- a/voltha/core/config/config_rev.py
+++ b/voltha/core/config/config_rev.py
@@ -263,10 +263,8 @@
m = md5('' if self._config is None else self._config._hash)
if self._children is not None:
for children in self._children.itervalues():
- if isinstance(children, dict):
- m.update(''.join(c._hash for c in children.itervalues()))
- else:
- m.update(''.join(c._hash for c in children))
+ assert isinstance(children, list)
+ m.update(''.join(c._hash for c in children))
return m.hexdigest()[:12]
@property
@@ -291,17 +289,15 @@
branch nodes. If depth is < 0, this results in a fully exhaustive
"complete config".
"""
- data = copy(self._config.data)
+ orig_data = self._config.data
+ data = orig_data.__class__()
+ data.CopyFrom(orig_data)
if depth:
# collect children
cfields = children_fields(self.type).iteritems()
for field_name, field in cfields:
if field.is_container:
- if field.key:
- children = self._children[field_name].itervalues()
- else:
- children = self._children[field_name]
- for rev in children:
+ for rev in self._children[field_name]:
child_data = rev.get(depth=depth - 1)
child_data_holder = getattr(data, field_name).add()
child_data_holder.MergeFrom(child_data)
@@ -322,7 +318,7 @@
def update_children(self, name, children, branch):
"""Return a NEW revision which is updated for the modified children"""
- new_children = copy(self._children)
+ new_children = self._children.copy()
new_children[name] = children
new_rev = copy(self)
new_rev._branch = branch
diff --git a/voltha/core/config/config_rev_persisted.py b/voltha/core/config/config_rev_persisted.py
index f1fad1c..d3983a1 100644
--- a/voltha/core/config/config_rev_persisted.py
+++ b/voltha/core/config/config_rev_persisted.py
@@ -18,7 +18,6 @@
A config rev object that persists itself
"""
from bz2 import compress, decompress
-from collections import OrderedDict
import structlog
from simplejson import dumps, loads
@@ -61,11 +60,8 @@
children_lists = {}
for field_name, children in self._children.iteritems():
- if isinstance(children, list):
- lst = [rev.hash for rev in children]
- else:
- lst = [rev.hash for rev in children.itervalues()]
- children_lists[field_name] = lst
+ hashes = [rev.hash for rev in children]
+ children_lists[field_name] = hashes
data = dict(
children=children_lists,
@@ -92,25 +88,13 @@
node = branch._node
for field_name, meta in children_fields(msg_cls).iteritems():
child_msg_cls = tmp_cls_loader(meta.module, meta.type)
- if meta.key:
- # we need to assemble an ordered dict using the key
- lst = OrderedDict()
- for child_hash in children_list[field_name]:
- child_node = node._mknode(child_msg_cls)
- child_node.load_latest(child_hash)
- child_rev = child_node.latest
- key = getattr(child_rev.data, meta.key)
- lst[key] = child_rev
- else:
- lst = []
- for child_hash in children_list[field_name]:
- child_node = node._mknode(child_msg_cls)
- child_node.load_latest(child_hash)
- child_rev = child_node.latest
- lst.append(child_rev)
-
- assembled_children[field_name] = lst
-
+ children = []
+ for child_hash in children_list[field_name]:
+ child_node = node._mknode(child_msg_cls)
+ child_node.load_latest(child_hash)
+ child_rev = child_node.latest
+ children.append(child_rev)
+ assembled_children[field_name] = children
rev = cls(branch, config_data, assembled_children)
return rev
@@ -139,5 +123,5 @@
def tmp_cls_loader(module_name, cls_name):
# TODO this shall be generalized
from voltha.protos import voltha_pb2, health_pb2, adapter_pb2, \
- logical_layer_pb2, openflow_13_pb2
+ logical_device_pb2, device_pb2, openflow_13_pb2
return getattr(locals()[module_name], cls_name)
diff --git a/voltha/core/config/config_root.py b/voltha/core/config/config_root.py
index c229a42..6b45a90 100644
--- a/voltha/core/config/config_root.py
+++ b/voltha/core/config/config_root.py
@@ -18,9 +18,10 @@
import structlog
from simplejson import dumps, loads
-from voltha.core.config.config_node import ConfigNode, MergeConflictException
+from voltha.core.config.config_node import ConfigNode
from voltha.core.config.config_rev import ConfigRevision
from voltha.core.config.config_rev_persisted import PersistedConfigRevision
+from voltha.core.config.merge_3way import MergeConflictException
log = structlog.get_logger()
diff --git a/voltha/core/config/merge_3way.py b/voltha/core/config/merge_3way.py
new file mode 100644
index 0000000..be87f5c
--- /dev/null
+++ b/voltha/core/config/merge_3way.py
@@ -0,0 +1,267 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+3-way merge function for config rev objects.
+"""
+from collections import OrderedDict
+from copy import copy
+
+from voltha.core.config.config_proxy import CallbackType, OperationContext
+from voltha.core.config.config_rev import children_fields
+
+
+class MergeConflictException(Exception):
+ pass
+
+
+def merge_3way(fork_rev, src_rev, dst_rev, merge_child_func, dry_run=False):
+ """
+ Attempt to merge src_rev into dst_rev but taking into account what have
+ changed in both revs since the last known common point, the fork_rev.
+ In case of conflict, raise a MergeConflictException(). If dry run is True,
+ don't actually perform the merge, but detect potential conflicts.
+
+ This function recurses into all children nodes stored under the rev and
+ performs the merge if the children is also part of a transaction branch.
+
+ :param fork_rev: Point of forking (last known common state between branches)
+ :param src_rev: Latest rev from which we merge to dst_rev
+ :param dst_rev: Target (destination) rev
+ :param merge_child_func: To run a potential merge in all children that
+ may need merge (determined from the local changes)
+ :param dry_run: If True, do not perform the merge, but detect merge
+ conflicts.
+ :return: The new dst_rev (a new rev instance) the list of changes that
+ occurred in this node or any of its children as part of this merge.
+ """
+
+ # to collect change tuples of (<callback-type>, <op-context>)
+ changes = []
+
+ class AnalyzeChanges(object):
+ def __init__(self, lst1, lst2, keyname):
+ self.keymap1 = OrderedDict((getattr(rev._config._data, keyname), i)
+ for i, rev in enumerate(lst1))
+ self.keymap2 = OrderedDict((getattr(rev._config._data, keyname), i)
+ for i, rev in enumerate(lst2))
+ self.added_keys = [
+ k for k in self.keymap2.iterkeys() if k not in self.keymap1]
+ self.removed_keys = [
+ k for k in self.keymap1.iterkeys() if k not in self.keymap2]
+ self.changed_keys = [
+ k for k in self.keymap1.iterkeys()
+ if k in self.keymap2 and
+ lst1[self.keymap1[k]]._hash != lst2[self.keymap2[k]]._hash
+ ]
+
+ # Note: there are a couple of special cases that can be optimized
+ # for later on. But since premature optimization is a bad idea, we
+ # defer them.
+
+ # deal with config data first
+ if dst_rev._config is fork_rev._config:
+ # no change in master, accept src if different
+ config_changed = dst_rev._config != src_rev._config
+ else:
+ if dst_rev._config.hash != src_rev._config.hash:
+ raise MergeConflictException('Config collision')
+ config_changed = True
+
+ # now to the external children fields
+ new_children = dst_rev._children.copy()
+ _children_fields = children_fields(fork_rev.data.__class__)
+
+ for field_name, field in _children_fields.iteritems():
+
+ fork_list = fork_rev._children[field_name]
+ src_list = src_rev._children[field_name]
+ dst_list = dst_rev._children[field_name]
+
+ if dst_list == src_list:
+ # we do not need to change the dst, however we still need
+ # to complete the branch purging in child nodes so not
+ # to leave dangling branches around
+ [merge_child_func(rev) for rev in src_list]
+ continue
+
+ if not field.key:
+ # If the list is not keyed, we really should not merge. We merely
+ # check for collision, i.e., if both changed (and not same)
+ if dst_list == fork_list:
+ # dst branch did not change since fork
+
+ assert src_list != fork_list, 'We should not be here otherwise'
+
+ # the incoming (src) rev changed, and we have to apply it
+ new_children[field_name] = [
+ merge_child_func(rev) for rev in src_list]
+
+ if field.is_container:
+ changes.append((CallbackType.POST_LISTCHANGE,
+ OperationContext(field_name=field_name)))
+
+ else:
+ if src_list != fork_list:
+ raise MergeConflictException(
+ 'Cannot merge because single child node or un-keyed'
+ 'children list has changed')
+
+ else:
+
+ if dst_list == fork_list:
+ # Destination did not change
+
+ # We need to analyze only the changes on the incoming rev
+ # since fork
+ src = AnalyzeChanges(fork_list, src_list, field.key)
+
+ new_list = copy(src_list) # we start from the source list
+
+ for key in src.added_keys:
+ idx = src.keymap2[key]
+ new_rev = merge_child_func(new_list[idx])
+ new_list[idx] = new_rev
+ changes.append(
+ (CallbackType.POST_ADD,
+ new_rev.data))
+ # OperationContext(
+ # field_name=field_name,
+ # child_key=key,
+ # data=new_rev.data)))
+
+ for key in src.removed_keys:
+ old_rev = fork_list[src.keymap1[key]]
+ changes.append((
+ CallbackType.POST_REMOVE,
+ old_rev.data))
+ # OperationContext(
+ # field_name=field_name,
+ # child_key=key,
+ # data=old_rev.data)))
+
+ for key in src.changed_keys:
+ idx = src.keymap2[key]
+ new_rev = merge_child_func(new_list[idx])
+ new_list[idx] = new_rev
+ # updated child gets its own change event
+
+ new_children[field_name] = new_list
+
+ else:
+
+ # For keyed fields we can really investigate what has been
+ # added, removed, or changed in both branches and do a
+ # fine-grained collision detection and merge
+
+ src = AnalyzeChanges(fork_list, src_list, field.key)
+ dst = AnalyzeChanges(fork_list, dst_list, field.key)
+
+ new_list = copy(dst_list) # this time we start with the dst
+
+ for key in src.added_keys:
+ # we cannot add if it has been added and is different
+ if key in dst.added_keys:
+ # it has been added to both, we need to check if
+ # they are the same
+ child_dst_rev = dst_list[dst.keymap2[key]]
+ child_src_rev = src_list[src.keymap2[key]]
+ if child_dst_rev.hash == child_src_rev.hash:
+ # they match, so we do not need to change the
+ # dst list, but we still need to purge the src
+ # branch
+ merge_child_func(child_dst_rev)
+ else:
+ raise MergeConflictException(
+ 'Cannot add because it has been added and '
+ 'different'
+ )
+ else:
+ # this is a brand new key, need to add it
+ new_rev = merge_child_func(src_list[src.keymap2[key]])
+ new_list.append(new_rev)
+ changes.append((
+ CallbackType.POST_ADD,
+ new_rev.data))
+ # OperationContext(
+ # field_name=field_name,
+ # child_key=key,
+ # data=new_rev.data)))
+
+ for key in src.changed_keys:
+ # we cannot change if it was removed in dst
+ if key in dst.removed_keys:
+ raise MergeConflictException(
+ 'Cannot change because it has been removed')
+
+ # if it changed in dst as well, we need to check if they
+ # match (same change)
+ elif key in dst.changed_keys:
+ child_dst_rev = dst_list[dst.keymap2[key]]
+ child_src_rev = src_list[src.keymap2[key]]
+ if child_dst_rev.hash == child_src_rev.hash:
+ # they match, so we do not need to change the
+ # dst list, but we still need to purge the src
+ # branch
+ merge_child_func(child_src_rev)
+ elif child_dst_rev._config.hash != child_src_rev._config.hash:
+ raise MergeConflictException(
+ 'Cannot update because it has been changed and '
+ 'different'
+ )
+ else:
+ new_rev = merge_child_func(
+ src_list[src.keymap2[key]])
+ new_list[dst.keymap2[key]] = new_rev
+ # no announcement for child update
+
+ else:
+ # it only changed in src branch
+ new_rev = merge_child_func(src_list[src.keymap2[key]])
+ new_list[dst.keymap2[key]] = new_rev
+ # no announcement for child update
+
+ for key in reversed(src.removed_keys): # we go from highest
+ # index to lowest
+
+ # we cannot remove if it has changed in dst
+ if key in dst.changed_keys:
+ raise MergeConflictException(
+ 'Cannot remove because it has changed')
+
+ # if it has not been removed yet from dst, then remove it
+ if key not in dst.removed_keys:
+ dst_idx = dst.keymap2[key]
+ old_rev = new_list.pop(dst_idx)
+ changes.append((
+ CallbackType.POST_REMOVE,
+ old_rev.data))
+ # OperationContext(
+ # field_name=field_name,
+ # child_key=key,
+ # data=old_rev.data)))
+
+ new_children[field_name] = new_list
+
+ if not dry_run:
+ rev = src_rev if config_changed else dst_rev
+ rev = rev.update_all_children(new_children, dst_rev._branch)
+ if config_changed:
+ changes.append((CallbackType.POST_UPDATE, rev.data))
+ return rev, changes
+
+ else:
+ return None, None