VOL-1451 Initial checkin of openonu build
Produced docker container capable of building and running
openonu/brcm_openonci_onu. Copied over current onu code
and resolved all imports by copying into the local source tree.
Change-Id: Ib9785d37afc65b7d32ecf74aee2456352626e2b6
diff --git a/python/core/config/__init__.py b/python/core/config/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/python/core/config/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/core/config/config_backend.py b/python/core/config/config_backend.py
new file mode 100644
index 0000000..d906348
--- /dev/null
+++ b/python/core/config/config_backend.py
@@ -0,0 +1,289 @@
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from consul import Consul, ConsulException
+from common.utils.asleep import asleep
+from requests import ConnectionError
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+import etcd3
+import structlog
+
+
+class ConsulStore(object):
+ """ Config kv store for consul with a cache for quicker subsequent reads
+
+ TODO: This will block the reactor. Should either change
+ whole call stack to yield or put the put/delete transactions into a
+ queue to write later with twisted. Will need a transaction
+ log to ensure we don't lose anything.
+ Making the whole callstack yield is troublesome because other tasks can
+ come in on the side and start modifying things which could be bad.
+ """
+
+ CONNECT_RETRY_INTERVAL_SEC = 1
+ RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
+
+ def __init__(self, host, port, path_prefix):
+
+ self.log = structlog.get_logger()
+ self._consul = Consul(host=host, port=port)
+ self.host = host
+ self.port = port
+ self._path_prefix = path_prefix
+ self._cache = {}
+ self.retries = 0
+
+ def make_path(self, key):
+ return '{}/{}'.format(self._path_prefix, key)
+
+ def __getitem__(self, key):
+ if key in self._cache:
+ return self._cache[key]
+ value = self._kv_get(self.make_path(key))
+ if value is not None:
+ # consul turns empty strings to None, so we do the reverse here
+ self._cache[key] = value['Value'] or ''
+ return value['Value'] or ''
+ else:
+ raise KeyError(key)
+
+ def __contains__(self, key):
+ if key in self._cache:
+ return True
+ value = self._kv_get(self.make_path(key))
+ if value is not None:
+ self._cache[key] = value['Value']
+ return True
+ else:
+ return False
+
+ def __setitem__(self, key, value):
+ try:
+ assert isinstance(value, basestring)
+ self._cache[key] = value
+ self._kv_put(self.make_path(key), value)
+ except Exception, e:
+ self.log.exception('cannot-set-item', e=e)
+
+ def __delitem__(self, key):
+ self._cache.pop(key, None)
+ self._kv_delete(self.make_path(key))
+
+ @inlineCallbacks
+ def _backoff(self, msg):
+ wait_time = self.RETRY_BACKOFF[min(self.retries,
+ len(self.RETRY_BACKOFF) - 1)]
+ self.retries += 1
+ self.log.error(msg, retry_in=wait_time)
+ yield asleep(wait_time)
+
+ def _redo_consul_connection(self):
+ self._consul = Consul(host=self.host, port=self.port)
+ self._cache.clear()
+
+ def _clear_backoff(self):
+ if self.retries:
+ self.log.info('reconnected-to-consul', after_retries=self.retries)
+ self.retries = 0
+
+ def _get_consul(self):
+ return self._consul
+
+ # Proxy methods for consul with retry support
+ def _kv_get(self, *args, **kw):
+ return self._retry('GET', *args, **kw)
+
+ def _kv_put(self, *args, **kw):
+ return self._retry('PUT', *args, **kw)
+
+ def _kv_delete(self, *args, **kw):
+ return self._retry('DELETE', *args, **kw)
+
+ def _retry(self, operation, *args, **kw):
+ while 1:
+ try:
+ consul = self._get_consul()
+ self.log.debug('consul', consul=consul, operation=operation,
+ args=args)
+ if operation == 'GET':
+ index, result = consul.kv.get(*args, **kw)
+ elif operation == 'PUT':
+ result = consul.kv.put(*args, **kw)
+ elif operation == 'DELETE':
+ result = consul.kv.delete(*args, **kw)
+ else:
+ # Default case - consider operation as a function call
+ result = operation(*args, **kw)
+ self._clear_backoff()
+ break
+ except ConsulException, e:
+ self.log.exception('consul-not-up', e=e)
+ self._backoff('consul-not-up')
+ except ConnectionError, e:
+ self.log.exception('cannot-connect-to-consul', e=e)
+ self._backoff('cannot-connect-to-consul')
+ except Exception, e:
+ self.log.exception(e)
+ self._backoff('unknown-error')
+ self._redo_consul_connection()
+
+ return result
+
+
+class EtcdStore(object):
+ """ Config kv store for etcd with a cache for quicker subsequent reads
+
+ TODO: This will block the reactor. Should either change
+ whole call stack to yield or put the put/delete transactions into a
+ queue to write later with twisted. Will need a transaction
+ log to ensure we don't lose anything.
+ Making the whole callstack yield is troublesome because other tasks can
+ come in on the side and start modifying things which could be bad.
+ """
+
+ CONNECT_RETRY_INTERVAL_SEC = 1
+ RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
+
+ def __init__(self, host, port, path_prefix):
+
+ self.log = structlog.get_logger()
+ self._etcd = etcd3.client(host=host, port=port)
+ self.host = host
+ self.port = port
+ self._path_prefix = path_prefix
+ self._cache = {}
+ self.retries = 0
+
+ def make_path(self, key):
+ return '{}/{}'.format(self._path_prefix, key)
+
+ def __getitem__(self, key):
+ if key in self._cache:
+ return self._cache[key]
+ (value, meta) = self._kv_get(self.make_path(key))
+ if value is not None:
+ self._cache[key] = value
+ return value
+ else:
+ raise KeyError(key)
+
+ def __contains__(self, key):
+ if key in self._cache:
+ return True
+ (value, meta) = self._kv_get(self.make_path(key))
+ if value is not None:
+ self._cache[key] = value
+ return True
+ else:
+ return False
+
+ def __setitem__(self, key, value):
+ try:
+ assert isinstance(value, basestring)
+ self._cache[key] = value
+ self._kv_put(self.make_path(key), value)
+ except Exception, e:
+ self.log.exception('cannot-set-item', e=e)
+
+ def __delitem__(self, key):
+ self._cache.pop(key, None)
+ self._kv_delete(self.make_path(key))
+
+ @inlineCallbacks
+ def _backoff(self, msg):
+ wait_time = self.RETRY_BACKOFF[min(self.retries,
+ len(self.RETRY_BACKOFF) - 1)]
+ self.retries += 1
+ self.log.error(msg, retry_in=wait_time)
+ yield asleep(wait_time)
+
+ def _redo_etcd_connection(self):
+ self._etcd = etcd3.client(host=self.host, port=self.port)
+ self._cache.clear()
+
+ def _clear_backoff(self):
+ if self.retries:
+ self.log.info('reconnected-to-etcd', after_retries=self.retries)
+ self.retries = 0
+
+ def _get_etcd(self):
+ return self._etcd
+
+ # Proxy methods for etcd with retry support
+ def _kv_get(self, *args, **kw):
+ return self._retry('GET', *args, **kw)
+
+ def _kv_put(self, *args, **kw):
+ return self._retry('PUT', *args, **kw)
+
+ def _kv_delete(self, *args, **kw):
+ return self._retry('DELETE', *args, **kw)
+
+ def _retry(self, operation, *args, **kw):
+
+ # etcd data sometimes contains non-utf8 sequences, replace
+ self.log.debug('backend-op',
+ operation=operation,
+ args=map(lambda x : unicode(x,'utf8','replace'), args),
+ kw=kw)
+
+ while 1:
+ try:
+ etcd = self._get_etcd()
+ self.log.debug('etcd', etcd=etcd, operation=operation,
+ args=map(lambda x : unicode(x,'utf8','replace'), args))
+ if operation == 'GET':
+ (value, meta) = etcd.get(*args, **kw)
+ result = (value, meta)
+ elif operation == 'PUT':
+ result = etcd.put(*args, **kw)
+ elif operation == 'DELETE':
+ result = etcd.delete(*args, **kw)
+ else:
+ # Default case - consider operation as a function call
+ result = operation(*args, **kw)
+ self._clear_backoff()
+ break
+ except Exception, e:
+ self.log.exception(e)
+ self._backoff('unknown-error-with-etcd')
+ self._redo_etcd_connection()
+
+ return result
+
+
+def load_backend(store_id, store_prefix, args):
+ """ Return the kv store backend based on the command line arguments
+ """
+
+ def load_consul_store():
+ instance_core_store_prefix = '{}/{}'.format(store_prefix, store_id)
+
+ host, port = args.consul.split(':', 1)
+ return ConsulStore(host, int(port), instance_core_store_prefix)
+
+ def load_etcd_store():
+ instance_core_store_prefix = '{}/{}'.format(store_prefix, store_id)
+
+ host, port = args.etcd.split(':', 1)
+ return EtcdStore(host, int(port), instance_core_store_prefix)
+
+ loaders = {
+ 'none': lambda: None,
+ 'consul': load_consul_store,
+ 'etcd': load_etcd_store
+ }
+
+ return loaders[args.backend]()
diff --git a/python/core/config/config_branch.py b/python/core/config/config_branch.py
new file mode 100644
index 0000000..207818b
--- /dev/null
+++ b/python/core/config/config_branch.py
@@ -0,0 +1,53 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Class to hold revisions, latest revision, etc., for a config node, used
+for the active committed revisions or revisions part of a transaction.
+"""
+
+from collections import OrderedDict
+from weakref import WeakValueDictionary
+
+
+class ConfigBranch(object):
+
+ __slots__ = (
+ '_node', # ref to node
+ '_txid', # txid for this branch (None for the committed branch)
+ '_origin', # _latest at time of branching on default branch
+ '_revs', # dict of rev-hash to ref of ConfigRevision
+ '_latest', # ref to latest committed ConfigRevision
+ '__weakref__'
+ )
+
+ def __init__(self, node, txid=None, origin=None, auto_prune=True):
+ self._node = node
+ self._txid = txid
+ self._origin = origin
+ self._revs = WeakValueDictionary() if auto_prune else OrderedDict()
+ self._latest = origin
+
+ def __getitem__(self, hash):
+ return self._revs[hash]
+
+ @property
+ def latest(self):
+ return self._latest
+
+ @property
+ def origin(self):
+ return self._origin
diff --git a/python/core/config/config_event_bus.py b/python/core/config/config_event_bus.py
new file mode 100644
index 0000000..e56d77a
--- /dev/null
+++ b/python/core/config/config_event_bus.py
@@ -0,0 +1,66 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import structlog
+from enum import Enum
+from google.protobuf.json_format import MessageToDict
+from google.protobuf.message import Message
+from simplejson import dumps
+
+from common.event_bus import EventBusClient
+from voltha.core.config.config_proxy import CallbackType
+from voltha.protos import third_party
+from voltha.protos.events_pb2 import ConfigEvent, ConfigEventType
+
+IGNORED_CALLBACKS = [CallbackType.PRE_ADD, CallbackType.GET,
+ CallbackType.POST_LISTCHANGE, CallbackType.PRE_REMOVE,
+ CallbackType.PRE_UPDATE]
+
+log = structlog.get_logger()
+
+class ConfigEventBus(object):
+
+ __slots__ = (
+ '_event_bus_client', # The event bus client used to publish events.
+ '_topic' # the topic to publish to
+ )
+
+ def __init__(self):
+ self._event_bus_client = EventBusClient()
+ self._topic = 'model-change-events'
+
+ def advertise(self, type, data, hash=None):
+ if type in IGNORED_CALLBACKS:
+ log.info('Ignoring event {} with data {}'.format(type, data))
+ return
+
+ if type is CallbackType.POST_ADD:
+ kind = ConfigEventType.add
+ elif type is CallbackType.POST_REMOVE:
+ kind = ConfigEventType.remove
+ else:
+ kind = ConfigEventType.update
+
+ if isinstance(data, Message):
+ msg = dumps(MessageToDict(data, True, True))
+ else:
+ msg = data
+
+ event = ConfigEvent(
+ type=kind,
+ hash=hash,
+ data=msg
+ )
+
+ self._event_bus_client.publish(self._topic, event)
+
diff --git a/python/core/config/config_node.py b/python/core/config/config_node.py
new file mode 100644
index 0000000..ab73484
--- /dev/null
+++ b/python/core/config/config_node.py
@@ -0,0 +1,617 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from copy import copy
+
+from jsonpatch import JsonPatch
+from jsonpatch import make_patch
+
+from common.utils.json_format import MessageToDict
+from voltha.core.config.config_branch import ConfigBranch
+from voltha.core.config.config_event_bus import ConfigEventBus
+from voltha.core.config.config_proxy import CallbackType, ConfigProxy
+from voltha.core.config.config_rev import is_proto_message, children_fields, \
+ ConfigRevision, access_rights
+from voltha.core.config.config_rev_persisted import PersistedConfigRevision
+from voltha.core.config.merge_3way import merge_3way
+from voltha.protos import third_party
+from voltha.protos import meta_pb2
+
+import structlog
+
+log = structlog.get_logger()
+
+def message_to_dict(m):
+ return MessageToDict(m, True, True, False)
+
+
+def check_access_violation(new_msg, old_msg):
+ """Raise ValueError if attempt is made to change a read-only field"""
+ access_map = access_rights(new_msg.__class__)
+ violated_fields = []
+ for field_name, access in access_map.iteritems():
+ if access == meta_pb2.READ_ONLY:
+ if getattr(new_msg, field_name) != getattr(old_msg, field_name):
+ violated_fields.append(field_name)
+ if violated_fields:
+ raise ValueError('Cannot change read-only field(s) %s' %
+ ', '.join('"%s"' % f for f in violated_fields))
+
+
+def find_rev_by_key(revs, keyname, value):
+ for i, rev in enumerate(revs):
+ if getattr(rev._config._data, keyname) == value:
+ return i, rev
+ raise KeyError('key {}={} not found'.format(keyname, value))
+
+
+class ConfigNode(object):
+ """
+ Represents a configuration node which can hold a number of revisions
+ of the configuration for this node.
+ When the configuration changes, the new version is appended to the
+ node.
+ Initial data must be a protobuf message and it will determine the type of
+ this node.
+ """
+ __slots__ = (
+ '_root', # ref to root node
+ '_type', # node type, as __class__ of protobuf message
+ '_branches', # dict of transaction branches and a default (committed)
+ # branch
+ '_tags', # dict of tag-name to ref of ConfigRevision
+ '_proxy', # ref to proxy observer or None if no proxy assigned
+ '_event_bus', # ref to event_bus or None if no event bus is assigned
+ '_auto_prune'
+ )
+
+ def __init__(self, root, initial_data, auto_prune=True, txid=None):
+ self._root = root
+ self._branches = {}
+ self._tags = {}
+ self._proxy = None
+ self._event_bus = None
+ self._auto_prune = auto_prune
+
+ if isinstance(initial_data, type):
+ self._type = initial_data
+ elif is_proto_message(initial_data):
+ self._type = initial_data.__class__
+ copied_data = initial_data.__class__()
+ copied_data.CopyFrom(initial_data)
+ self._initialize(copied_data, txid)
+ else:
+ raise NotImplementedError()
+
+ def _mknode(self, *args, **kw):
+ return ConfigNode(self._root, *args, **kw)
+
+ def _mkrev(self, *args, **kw):
+ return self._root.mkrev(*args, **kw)
+
+ def _initialize(self, data, txid):
+ # separate external children data away from locally stored data
+ # based on child_node annotations in protobuf
+ children = {}
+ for field_name, field in children_fields(self._type).iteritems():
+ field_value = getattr(data, field_name)
+ if field.is_container:
+ if field.key:
+ keys_seen = set()
+ children[field_name] = lst = []
+ for v in field_value:
+ rev = self._mknode(v, txid=txid).latest
+ key = getattr(v, field.key)
+ if key in keys_seen:
+ raise ValueError('Duplicate key "{}"'.format(key))
+ lst.append(rev)
+ keys_seen.add(key)
+ else:
+ children[field_name] = [
+ self._mknode(v, txid=txid).latest for v in field_value]
+ else:
+ children[field_name] = [
+ self._mknode(field_value, txid=txid).latest]
+ data.ClearField(field_name)
+
+ branch = ConfigBranch(self, auto_prune=self._auto_prune)
+ rev = self._mkrev(branch, data, children)
+ self._make_latest(branch, rev)
+ self._branches[txid] = branch
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ accessors ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ # these convenience short-cuts only work for the committed branch
+
+ @property
+ def revisions(self):
+ return [r._hash for r in self._branches[None]._revs.itervalues()]
+
+ @property
+ def latest(self):
+ return self._branches[None]._latest
+
+ def __getitem__(self, hash):
+ return self._branches[None]._revs[hash]
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ get operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def get(self, path=None, hash=None, depth=0, deep=False, txid=None):
+
+ # depth preparation
+ if deep:
+ depth = -1
+
+ # path preparation
+ path = '' if path is None else path
+ while path.startswith('/'):
+ path = path[1:]
+
+ # determine branch; if lookup fails, it is ok to use default branch
+ branch = self._branches.get(txid, None) or self._branches[None]
+
+ # determine rev
+ if hash is not None:
+ rev = branch._revs[hash]
+ else:
+ rev = branch.latest
+
+ return self._get(rev, path, depth)
+
+ def _get(self, rev, path, depth):
+
+ if not path:
+ return self._do_get(rev, depth)
+
+ # ... otherwise
+ name, _, path = path.partition('/')
+ field = children_fields(self._type)[name]
+ if field.is_container:
+ if field.key:
+ children = rev._children[name]
+ if path:
+ # need to escalate further
+ key, _, path = path.partition('/')
+ key = field.key_from_str(key)
+ _, child_rev = find_rev_by_key(children, field.key, key)
+ child_node = child_rev.node
+ return child_node._get(child_rev, path, depth)
+ else:
+ # we are the node of interest
+ response = []
+ for child_rev in children:
+ child_node = child_rev.node
+ value = child_node._do_get(child_rev, depth)
+ response.append(value)
+ return response
+ else:
+ if path:
+ raise LookupError(
+ 'Cannot index into container with no key defined')
+ response = []
+ for child_rev in rev._children[name]:
+ child_node = child_rev.node
+ value = child_node._do_get(child_rev, depth)
+ response.append(value)
+ return response
+ else:
+ child_rev = rev._children[name][0]
+ child_node = child_rev.node
+ return child_node._get(child_rev, path, depth)
+
+ def _do_get(self, rev, depth):
+ msg = rev.get(depth)
+ if self._proxy is not None:
+ msg = self._proxy.invoke_callbacks(CallbackType.GET, msg)
+ return msg
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ update operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def update(self, path, data, strict=False, txid=None, mk_branch=None):
+
+ while path.startswith('/'):
+ path = path[1:]
+
+ try:
+ branch = self._branches[txid]
+ except KeyError:
+ branch = mk_branch(self)
+
+ if not path:
+ return self._do_update(branch, data, strict)
+
+ rev = branch._latest # change is always made to the latest
+ name, _, path = path.partition('/')
+ field = children_fields(self._type)[name]
+ if field.is_container:
+ if not path:
+ raise ValueError('Cannot update a list')
+ if field.key:
+ key, _, path = path.partition('/')
+ key = field.key_from_str(key)
+ children = copy(rev._children[name])
+ idx, child_rev = find_rev_by_key(children, field.key, key)
+ child_node = child_rev.node
+ # chek if deep copy will work better
+ new_child_rev = child_node.update(
+ path, data, strict, txid, mk_branch)
+ if new_child_rev.hash == child_rev.hash:
+ # When the new_child_rev goes out of scope,
+ # it's destructor gets invoked as it is not being
+ # referred by any other data structures. To prevent
+ # this to trigger the hash it is holding from being
+ # erased in the db, its hash is set to None. If the
+ # new_child_rev object is pointing at the same address
+ # as the child_rev address then do not clear the hash
+ if new_child_rev != child_rev:
+ log.debug('clear-hash',
+ hash=new_child_rev.hash, object_ref=new_child_rev)
+ new_child_rev.clear_hash()
+ return branch._latest
+ if getattr(new_child_rev.data, field.key) != key:
+ raise ValueError('Cannot change key field')
+ children[idx] = new_child_rev
+ rev = rev.update_children(name, children, branch)
+ self._make_latest(branch, rev)
+ return rev
+ else:
+ raise ValueError('Cannot index into container with no keys')
+
+ else:
+ child_rev = rev._children[name][0]
+ child_node = child_rev.node
+ new_child_rev = child_node.update(
+ path, data, strict, txid, mk_branch)
+ rev = rev.update_children(name, [new_child_rev], branch)
+ self._make_latest(branch, rev)
+ return rev
+
+ def _do_update(self, branch, data, strict):
+ if not isinstance(data, self._type):
+ raise ValueError(
+ '"{}" is not a valid data type for this node'.format(
+ data.__class__.__name__))
+ self._test_no_children(data)
+ if self._proxy is not None:
+ self._proxy.invoke_callbacks(CallbackType.PRE_UPDATE, data)
+
+ if branch._latest.data != data:
+ if strict:
+ # check if attempt is made to change read-only field
+ check_access_violation(data, branch._latest.data)
+ rev = branch._latest.update_data(data, branch)
+ self._make_latest(branch, rev,
+ ((CallbackType.POST_UPDATE, rev.data),))
+ return rev
+ else:
+ return branch._latest
+
+ def _make_latest(self, branch, rev, change_announcements=()):
+ # Update the latest branch only when the hash between the previous
+ # data and the new rev is different, otherwise this will trigger the
+ # data already saved in the db (with that hash) to be erased
+ if rev.hash not in branch._revs:
+ branch._revs[rev.hash] = rev
+
+ if not branch._latest or rev.hash != branch._latest.hash:
+ branch._latest = rev
+
+ # announce only if this is main branch
+ if change_announcements and branch._txid is None:
+
+ if self._proxy is not None:
+ for change_type, data in change_announcements:
+ # since the callback may operate on the config tree,
+ # we have to defer the execution of the callbacks till
+ # the change is propagated to the root, then root will
+ # call the callbacks
+ self._root.enqueue_callback(
+ self._proxy.invoke_callbacks,
+ change_type,
+ data,
+ proceed_on_errors=1,
+ )
+
+ for change_type, data in change_announcements:
+ self._root.enqueue_notification_callback(
+ self._mk_event_bus().advertise,
+ change_type,
+ data,
+ hash=rev.hash
+ )
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ add operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def add(self, path, data, txid=None, mk_branch=None):
+ while path.startswith('/'):
+ path = path[1:]
+ if not path:
+ raise ValueError('Cannot add to non-container node')
+
+ try:
+ branch = self._branches[txid]
+ except KeyError:
+ branch = mk_branch(self)
+
+ rev = branch._latest # change is always made to latest
+ name, _, path = path.partition('/')
+ field = children_fields(self._type)[name]
+ if field.is_container:
+ if not path:
+ # we do need to add a new child to the field
+ if field.key:
+ if self._proxy is not None:
+ self._proxy.invoke_callbacks(
+ CallbackType.PRE_ADD, data)
+ children = copy(rev._children[name])
+ key = getattr(data, field.key)
+ try:
+ find_rev_by_key(children, field.key, key)
+ except KeyError:
+ pass
+ else:
+ raise ValueError('Duplicate key "{}"'.format(key))
+ child_rev = self._mknode(data).latest
+ children.append(child_rev)
+ rev = rev.update_children(name, children, branch)
+ self._make_latest(branch, rev,
+ ((CallbackType.POST_ADD, data),))
+ return rev
+ else:
+ # adding to non-keyed containers not implemented yet
+ raise ValueError('Cannot add to non-keyed container')
+ else:
+ if field.key:
+ # need to escalate
+ key, _, path = path.partition('/')
+ key = field.key_from_str(key)
+ children = copy(rev._children[name])
+ idx, child_rev = find_rev_by_key(children, field.key, key)
+ child_node = child_rev.node
+ new_child_rev = child_node.add(path, data, txid, mk_branch)
+ children[idx] = new_child_rev
+ rev = rev.update_children(name, children, branch)
+ self._make_latest(branch, rev)
+ return rev
+ else:
+ raise ValueError(
+ 'Cannot index into container with no keys')
+ else:
+ raise ValueError('Cannot add to non-container field')
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ remove operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def remove(self, path, txid=None, mk_branch=None):
+ while path.startswith('/'):
+ path = path[1:]
+ if not path:
+ raise ValueError('Cannot remove from non-container node')
+
+ try:
+ branch = self._branches[txid]
+ except KeyError:
+ branch = mk_branch(self)
+
+ rev = branch._latest # change is always made to latest
+ name, _, path = path.partition('/')
+ field = children_fields(self._type)[name]
+ if field.is_container:
+ if not path:
+ raise ValueError("Cannot remove without a key")
+ if field.key:
+ key, _, path = path.partition('/')
+ key = field.key_from_str(key)
+ if path:
+ # need to escalate
+ children = copy(rev._children[name])
+ idx, child_rev = find_rev_by_key(children, field.key, key)
+ child_node = child_rev.node
+ new_child_rev = child_node.remove(path, txid, mk_branch)
+ children[idx] = new_child_rev
+ rev = rev.update_children(name, children, branch)
+ self._make_latest(branch, rev)
+ return rev
+ else:
+ # need to remove from this very node
+ children = copy(rev._children[name])
+ idx, child_rev = find_rev_by_key(children, field.key, key)
+ if self._proxy is not None:
+ data = child_rev.data
+ self._proxy.invoke_callbacks(
+ CallbackType.PRE_REMOVE, data)
+ post_anno = ((CallbackType.POST_REMOVE, data),)
+ else:
+ post_anno = ((CallbackType.POST_REMOVE, child_rev.data),)
+ del children[idx]
+ rev = rev.update_children(name, children, branch)
+ self._make_latest(branch, rev, post_anno)
+ return rev
+ else:
+ raise ValueError('Cannot remove from non-keyed container')
+ else:
+ raise ValueError('Cannot remove non-conatiner field')
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Branching ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def _mk_txbranch(self, txid):
+ branch_point = self._branches[None].latest
+ branch = ConfigBranch(self, txid, branch_point)
+ self._branches[txid] = branch
+ return branch
+
+ def _del_txbranch(self, txid):
+ del self._branches[txid]
+
+ def _merge_txbranch(self, txid, dry_run=False):
+ """
+ Make latest in branch to be latest in the common branch, but only
+ if no conflict is detected. Conflict is where the txbranch branch
+ point no longer matches the latest in the default branch. This has
+ to be verified recursively.
+ """
+
+ def merge_child(child_rev):
+ child_branch = child_rev._branch
+ if child_branch._txid == txid:
+ child_rev = child_branch._node._merge_txbranch(txid, dry_run)
+ return child_rev
+
+ src_branch = self._branches[txid]
+ dst_branch = self._branches[None]
+
+ fork_rev = src_branch.origin # rev from which src branch was made
+ src_rev = src_branch.latest # head rev of source branch
+ dst_rev = dst_branch.latest # head rev of target branch
+
+ rev, changes = merge_3way(
+ fork_rev, src_rev, dst_rev, merge_child, dry_run)
+
+ if not dry_run:
+ self._make_latest(dst_branch, rev, change_announcements=changes)
+ del self._branches[txid]
+
+ return rev
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Diff utility ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def diff(self, hash1, hash2=None, txid=None):
+ branch = self._branches[txid]
+ rev1 = branch[hash1]
+ rev2 = branch[hash2] if hash2 else branch._latest
+ if rev1.hash == rev2.hash:
+ return JsonPatch([])
+ else:
+ dict1 = message_to_dict(rev1.data)
+ dict2 = message_to_dict(rev2.data)
+ return make_patch(dict1, dict2)
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ Tagging utility ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def tag(self, tag, hash=None):
+ branch = self._branches[None] # tag only what has been committed
+ rev = branch._latest if hash is None else branch._revs[hash]
+ self._tags[tag] = rev
+ self.persist_tags()
+ return self
+
+ @property
+ def tags(self):
+ return sorted(self._tags.iterkeys())
+
+ def by_tag(self, tag):
+ """
+ Return revision based on tag
+ :param tag: previously registered tag value
+ :return: revision object
+ """
+ return self._tags[tag]
+
+ def diff_by_tag(self, tag1, tag2):
+ return self.diff(self._tags[tag1].hash, self._tags[tag2].hash)
+
+ def delete_tag(self, tag):
+ del self._tags[tag]
+ self.persist_tags()
+
+ def delete_tags(self, *tags):
+ for tag in tags:
+ del self._tags[tag]
+ self.persist_tags()
+
+ def prune_untagged(self):
+ branch = self._branches[None]
+ keep = set(rev.hash for rev in self._tags.itervalues())
+ keep.add(branch._latest.hash)
+ for hash in branch._revs.keys():
+ if hash not in keep:
+ del branch._revs[hash]
+ return self
+
+ def persist_tags(self):
+ """
+ Persist tag information to the backend
+ """
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Internals ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def _test_no_children(self, data):
+ for field_name, field in children_fields(self._type).items():
+ field_value = getattr(data, field_name)
+ if field.is_container:
+ if len(field_value):
+ raise NotImplementedError(
+ 'Cannot update external children')
+ else:
+ if data.HasField(field_name):
+ raise NotImplementedError(
+ 'Cannot update externel children')
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ Node proxy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def get_proxy(self, path, exclusive=False):
+ return self._get_proxy(path, self, path, exclusive)
+
+ def _get_proxy(self, path, root, full_path, exclusive):
+ while path.startswith('/'):
+ path = path[1:]
+ if not path:
+ return self._mk_proxy(root, full_path, exclusive)
+
+ # need to escalate
+ rev = self._branches[None]._latest
+ name, _, path = path.partition('/')
+ field = children_fields(self._type)[name]
+ if field.is_container:
+ if not path:
+ raise ValueError('Cannot proxy a container field')
+ if field.key:
+ key, _, path = path.partition('/')
+ key = field.key_from_str(key)
+ children = rev._children[name]
+ _, child_rev = find_rev_by_key(children, field.key, key)
+ child_node = child_rev.node
+ return child_node._get_proxy(path, root, full_path, exclusive)
+
+ raise ValueError('Cannot index into container with no keys')
+
+ else:
+ child_rev = rev._children[name][0]
+ child_node = child_rev.node
+ return child_node._get_proxy(path, root, full_path, exclusive)
+
+ def _mk_proxy(self, root, full_path, exclusive):
+ if self._proxy is None:
+ self._proxy = ConfigProxy(root, self, full_path, exclusive)
+ else:
+ if self._proxy.exclusive:
+ raise ValueError('Node is already owned exclusively')
+ return self._proxy
+
+ def _mk_event_bus(self):
+ if self._event_bus is None:
+ self._event_bus = ConfigEventBus()
+ return self._event_bus
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~ Persistence loading ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def load_latest(self, latest_hash):
+
+ root = self._root
+ kv_store = root._kv_store
+
+ branch = ConfigBranch(node=self, auto_prune=self._auto_prune)
+ rev = PersistedConfigRevision.load(
+ branch, kv_store, self._type, latest_hash)
+ self._make_latest(branch, rev)
+ self._branches[None] = branch
diff --git a/python/core/config/config_proxy.py b/python/core/config/config_proxy.py
new file mode 100644
index 0000000..57d8150
--- /dev/null
+++ b/python/core/config/config_proxy.py
@@ -0,0 +1,155 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from enum import Enum
+
+from voltha.core.config.config_txn import ConfigTransaction
+
+log = structlog.get_logger()
+
+
+class OperationContext(object):
+ def __init__(self, path=None, data=None, field_name=None, child_key=None):
+ self.path = path
+ self._data = data
+ self.field_name = field_name
+ self.child_key = child_key
+ @property
+ def data(self):
+ return self._data
+ def update(self, data):
+ self._data = data
+ return self
+ def __repr__(self):
+ return 'OperationContext({})'.format(self.__dict__)
+
+
+class CallbackType(Enum):
+
+ # GET hooks are called after the data is retrieved and can be used to
+ # augment the data (they should only augment fields marked as REAL_TIME
+ GET = 1
+
+ # PRE_UPDATE hooks are called before the change is made and are supposed
+ # to be used to reject the data by raising an exception. If they don't,
+ # the change will be applied.
+ PRE_UPDATE = 2
+
+ # POST_UPDATE hooks are called after the update has occurred and can
+ # be used to deal with the change. For instance, an adapter can use the
+ # callback to trigger the south-bound configuration
+ POST_UPDATE = 3
+
+ # These behave similarly to the update callbacks as described above.
+ PRE_ADD = 4
+ POST_ADD = 5
+ PRE_REMOVE = 6
+ POST_REMOVE = 7
+
+ # Bulk list change due to transaction commit that changed items in
+ # non-keyed container fields
+ POST_LISTCHANGE = 8
+
+
+class ConfigProxy(object):
+ """
+ Allows an entity to look at a sub-tree and see it as it was the whole tree
+ """
+ __slots__ = (
+ '_root',
+ '_node',
+ '_path',
+ '_exclusive',
+ '_callbacks'
+ )
+
+ def __init__(self, root, node, path, exclusive):
+ self._root = root
+ self._node = node
+ self._exclusive = exclusive
+ self._path = path # full path to proxied node
+ self._callbacks = {} # call back type -> list of callbacks
+
+ @property
+ def exclusive(self):
+ return self._exclusive
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~ CRUD handlers ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def get(self, path='/', depth=None, deep=None, txid=None):
+ return self._node.get(path, depth=depth, deep=deep, txid=txid)
+
+ def update(self, path, data, strict=False, txid=None):
+ assert path.startswith('/')
+ full_path = self._path if path == '/' else self._path + path
+ return self._root.update(full_path, data, strict, txid=txid)
+
+ def add(self, path, data, txid=None):
+ assert path.startswith('/')
+ full_path = self._path if path == '/' else self._path + path
+ return self._root.add(full_path, data, txid=txid)
+
+ def remove(self, path, txid=None):
+ assert path.startswith('/')
+ full_path = self._path if path == '/' else self._path + path
+ return self._root.remove(full_path, txid=txid)
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~ Transaction support ~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def open_transaction(self):
+ """Open a new transaction"""
+ txid = self._root.mk_txbranch()
+ return ConfigTransaction(self, txid)
+
+ def commit_transaction(self, txid):
+ """
+ If having an open transaction, commit it now. Will raise exception
+ if conflict is detected. Either way, transaction will be deleted.
+ """
+ self._root.fold_txbranch(txid)
+
+ def cancel_transaction(self, txid):
+ """
+ Cancel current transaction if we are in a transaction. Always succeeds.
+ """
+ self._root.del_txbranch(txid)
+
+ # ~~~~~~~~~~~~~~~~~~~~~~ Callbacks registrations ~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def register_callback(self, callback_type, callback, *args, **kw):
+ lst = self._callbacks.setdefault(callback_type, [])
+ lst.append((callback, args, kw))
+
+ def unregister_callback(self, callback_type, callback, *args, **kw):
+ lst = self._callbacks.setdefault(callback_type, [])
+ if (callback, args, kw) in lst:
+ lst.remove((callback, args, kw))
+
+ # ~~~~~~~~~~~~~~~~~~~~~ Callback dispatch ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def invoke_callbacks(self, callback_type, context, proceed_on_errors=False):
+ lst = self._callbacks.get(callback_type, [])
+ for callback, args, kw in lst:
+ try:
+ context = callback(context, *args, **kw)
+ except Exception, e:
+ if proceed_on_errors:
+ log.exception(
+ 'call-back-error', callback_type=callback_type,
+ context=context, e=e)
+ else:
+ raise
+ return context
diff --git a/python/core/config/config_rev.py b/python/core/config/config_rev.py
new file mode 100644
index 0000000..8bfac18
--- /dev/null
+++ b/python/core/config/config_rev.py
@@ -0,0 +1,342 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Immutable classes to store config revision information arranged in a tree.
+
+Immutability cannot be enforced in Python, so anyoen working with these
+classes directly must obey the rules.
+"""
+
+import weakref
+from copy import copy
+from hashlib import md5
+
+from google.protobuf.descriptor import Descriptor
+from simplejson import dumps
+
+from common.utils.json_format import MessageToJson
+from voltha.protos import third_party
+from voltha.protos import meta_pb2
+
+import structlog
+
+log = structlog.get_logger()
+
+def is_proto_message(o):
+ """
+ Return True if object o appears to be a protobuf message; False otherwise.
+ """
+ # use a somewhat empirical approach to decide if something looks like
+ # a protobuf message
+ return isinstance(getattr(o, 'DESCRIPTOR', None), Descriptor)
+
+
+def message_to_json_concise(m):
+ """
+ Return the most concise string representation of a protobuf. Good for
+ things where size matters (e.g., generating hash).
+ """
+ return MessageToJson(m, False, True, False)
+
+
+_rev_cache = weakref.WeakValueDictionary() # cache of config revs
+
+
+_children_fields_cache = {} # to memoize externally stored field name info
+
+
+class _ChildType(object):
+ """Used to store key metadata about child_node fields in protobuf messages.
+ """
+ __slots__ = (
+ '_module',
+ '_type',
+ '_is_container',
+ '_key',
+ '_key_from_str'
+ )
+
+ def __init__(self, module, type, is_container,
+ key=None, key_from_str=None):
+ self._module = module
+ self._type = type
+ self._is_container = is_container
+ self._key = key
+ self._key_from_str = key_from_str
+
+ @property
+ def is_container(self):
+ return self._is_container
+
+ @property
+ def key(self):
+ return self._key
+
+ @property
+ def key_from_str(self):
+ return self._key_from_str
+
+ @property
+ def module(self):
+ return self._module
+
+ @property
+ def type(self):
+ return self._type
+
+
+def children_fields(cls):
+ """
+ Return a map of externally stored fields for this protobuf message type.
+ What is stored as branch node is determined by the "child_node"
+ annotation in the protobuf definitions.
+ With each external field, we store if the field is a container, if a
+ container is keyed (indexed), and what is the function that converts
+ path substring back to the key.
+ """
+ names = _children_fields_cache.get(cls)
+
+ if names is None:
+ names = {}
+
+ for field in cls.DESCRIPTOR.fields:
+
+ if field.has_options:
+ options = field.GetOptions()
+
+ if options.HasExtension(meta_pb2.child_node):
+ is_container = field.label == 3
+ meta = options.Extensions[meta_pb2.child_node]
+ key_from_str = None
+
+ if meta.key:
+ key_field = field.message_type.fields_by_name[meta.key]
+ key_type = key_field.type
+
+ if key_type == key_field.TYPE_STRING:
+ key_from_str = lambda s: s
+
+ elif key_type in (
+ key_field.TYPE_FIXED32,
+ key_field.TYPE_FIXED64,
+ key_field.TYPE_INT32,
+ key_field.TYPE_INT64,
+ key_field.TYPE_SFIXED32,
+ key_field.TYPE_SFIXED64,
+ key_field.TYPE_SINT32,
+ key_field.TYPE_SINT64,
+ key_field.TYPE_UINT32,
+ key_field.TYPE_UINT64):
+ key_from_str = lambda s: int(s)
+
+ else:
+ raise NotImplementedError()
+
+ field_class = field.message_type._concrete_class
+ names[field.name] = _ChildType(
+ module=field_class.__module__,
+ type=field_class.__name__,
+ is_container=is_container,
+ key=meta.key,
+ key_from_str=key_from_str
+ )
+
+ _children_fields_cache[cls] = names
+
+ return names
+
+
+_access_right_cache = {} # to memoize field access right restrictions
+
+
+def access_rights(cls):
+ """
+ Determine the access rights for each field and cache these maps for
+ fast retrieval.
+ """
+ access_map = _access_right_cache.get(cls)
+ if access_map is None:
+ access_map = {}
+ for field in cls.DESCRIPTOR.fields:
+ if field.has_options:
+ options = field.GetOptions()
+ if options.HasExtension(meta_pb2.access):
+ access = options.Extensions[meta_pb2.access]
+ access_map[field.name] = access
+ _access_right_cache[cls] = access_map
+ return access_map
+
+
+class ConfigDataRevision(object):
+ """
+ Holds a specific snapshot of the local configuration for config node.
+ It shall be treated as an immutable object, although in Python this is
+ very difficult to enforce!
+ As such, we can compute a unique hash based on the config data which
+ can be used to establish equivalence. It also has a time-stamp to track
+ changes.
+
+ This object must be treated as immutable, including its nested config data.
+ This is very important. The entire config module depends on hashes
+ we create over the data, so altering the data can lead to unpredictable
+ detriments.
+ """
+
+ __slots__ = (
+ '_data',
+ '_hash',
+ '__weakref__'
+ )
+
+ def __init__(self, data):
+ self._data = data
+ self._hash = self._hash_data(data)
+
+ @property
+ def data(self):
+ return self._data
+
+ @property
+ def hash(self):
+ return self._hash
+
+ @staticmethod
+ def _hash_data(data):
+ """Hash function to be used to track version changes of config nodes"""
+ if isinstance(data, (dict, list)):
+ to_hash = dumps(data, sort_keys=True)
+ elif is_proto_message(data):
+ to_hash = ':'.join((
+ data.__class__.__module__,
+ data.__class__.__name__,
+ data.SerializeToString()))
+ else:
+ to_hash = str(hash(data))
+ return md5(to_hash).hexdigest()[:12]
+
+
+class ConfigRevision(object):
+ """
+ Holds not only the local config data, but also the external children
+ reference lists, per field name.
+ Recall that externally stored fields are those marked "child_node" in
+ the protobuf definition.
+ This object must be treated as immutable, including its config data.
+ """
+
+ __slots__ = (
+ '_config',
+ '_children',
+ '_hash',
+ '_branch',
+ '__weakref__'
+ )
+
+ def __init__(self, branch, data, children=None):
+ self._branch = branch
+ self._config = ConfigDataRevision(data)
+ self._children = children
+ self._finalize()
+
+ def _finalize(self):
+ self._hash = self._hash_content()
+ if self._hash not in _rev_cache:
+ _rev_cache[self._hash] = self
+ if self._config._hash not in _rev_cache:
+ _rev_cache[self._config._hash] = self._config
+ else:
+ self._config = _rev_cache[self._config._hash] # re-use!
+
+ def _hash_content(self):
+ # hash is derived from config hash and hashes of all children
+ m = md5('' if self._config is None else self._config._hash)
+ if self._children is not None:
+ for child_field in sorted(self._children.keys()):
+ children = self._children[child_field]
+ assert isinstance(children, list)
+ m.update(''.join(c._hash for c in children))
+ return m.hexdigest()[:12]
+
+ @property
+ def hash(self):
+ return self._hash
+
+ @property
+ def data(self):
+ return None if self._config is None else self._config.data
+
+ @property
+ def node(self):
+ return self._branch._node
+
+ @property
+ def type(self):
+ return self._config.data.__class__
+
+ def clear_hash(self):
+ self._hash = None
+
+ def get(self, depth):
+ """
+ Get config data of node. If depth > 0, recursively assemble the
+ branch nodes. If depth is < 0, this results in a fully exhaustive
+ "complete config".
+ """
+ orig_data = self._config.data
+ data = orig_data.__class__()
+ data.CopyFrom(orig_data)
+ if depth:
+ # collect children
+ cfields = children_fields(self.type).iteritems()
+ for field_name, field in cfields:
+ if field.is_container:
+ for rev in self._children[field_name]:
+ child_data = rev.get(depth=depth - 1)
+ child_data_holder = getattr(data, field_name).add()
+ child_data_holder.MergeFrom(child_data)
+ else:
+ rev = self._children[field_name][0]
+ child_data = rev.get(depth=depth - 1)
+ child_data_holder = getattr(data, field_name)
+ child_data_holder.MergeFrom(child_data)
+ return data
+
+ def update_data(self, data, branch):
+ """Return a NEW revision which is updated for the modified data"""
+ new_rev = copy(self)
+ new_rev._branch = branch
+ new_rev._config = self._config.__class__(data)
+ new_rev._finalize()
+ return new_rev
+
+ def update_children(self, name, children, branch):
+ """Return a NEW revision which is updated for the modified children"""
+ new_children = self._children.copy()
+ new_children[name] = children
+ new_rev = copy(self)
+ new_rev._branch = branch
+ new_rev._children = new_children
+ new_rev._finalize()
+ return new_rev
+
+ def update_all_children(self, children, branch):
+ """Return a NEW revision which is updated for all children entries"""
+ new_rev = copy(self)
+ new_rev._branch = branch
+ new_rev._children = children
+ new_rev._finalize()
+ return new_rev
diff --git a/python/core/config/config_rev_persisted.py b/python/core/config/config_rev_persisted.py
new file mode 100644
index 0000000..8b25b82
--- /dev/null
+++ b/python/core/config/config_rev_persisted.py
@@ -0,0 +1,143 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A config rev object that persists itself
+"""
+from bz2 import compress, decompress
+
+import structlog
+from simplejson import dumps, loads
+
+from voltha.core.config.config_rev import ConfigRevision, children_fields
+
+log = structlog.get_logger()
+
+
+class PersistedConfigRevision(ConfigRevision):
+
+ compress = False
+
+ __slots__ = ('_kv_store',)
+
+ def __init__(self, branch, data, children=None):
+ self._kv_store = branch._node._root._kv_store
+ super(PersistedConfigRevision, self).__init__(branch, data, children)
+
+ def _finalize(self):
+ super(PersistedConfigRevision, self)._finalize()
+ self.store()
+
+ def __del__(self):
+ try:
+ if self._hash:
+ if self._config.__weakref__ is None:
+ if self._config._hash in self._kv_store:
+ del self._kv_store[self._config._hash]
+ assert self.__weakref__ is None
+ if self._hash in self._kv_store:
+ del self._kv_store[self._hash]
+ except Exception, e:
+ # this should never happen
+ log.exception('del-error', hash=self.hash, e=e)
+
+ def store(self):
+
+ try:
+ # crude serialization of children hash and config data hash
+ if self._hash in self._kv_store:
+ return
+
+ self.store_config()
+
+ children_lists = {}
+ for field_name, children in self._children.iteritems():
+ hashes = [rev.hash for rev in children]
+ children_lists[field_name] = hashes
+
+ data = dict(
+ children=children_lists,
+ config=self._config._hash
+ )
+ blob = dumps(data)
+ if self.compress:
+ blob = compress(blob)
+
+ self._kv_store[self._hash] = blob
+
+ except Exception, e:
+ log.exception('store-error', e=e)
+
+ @classmethod
+ def load(cls, branch, kv_store, msg_cls, hash):
+ # Update the branch's config store
+ blob = kv_store[hash]
+ if cls.compress:
+ blob = decompress(blob)
+ data = loads(blob)
+
+ config_hash = data['config']
+ config_data = cls.load_config(kv_store, msg_cls, config_hash)
+
+ children_list = data['children']
+ assembled_children = {}
+ node = branch._node
+ for field_name, meta in children_fields(msg_cls).iteritems():
+ child_msg_cls = tmp_cls_loader(meta.module, meta.type)
+ children = []
+ for child_hash in children_list[field_name]:
+ child_node = node._mknode(child_msg_cls)
+ child_node.load_latest(child_hash)
+ child_rev = child_node.latest
+ children.append(child_rev)
+ assembled_children[field_name] = children
+ rev = cls(branch, config_data, assembled_children)
+ return rev
+
+ def store_config(self):
+ if self._config._hash in self._kv_store:
+ return
+
+ # crude serialization of config data
+ blob = self._config._data.SerializeToString()
+ if self.compress:
+ blob = compress(blob)
+
+ self._kv_store[self._config._hash] = blob
+
+ @classmethod
+ def load_config(cls, kv_store, msg_cls, config_hash):
+ blob = kv_store[config_hash]
+ if cls.compress:
+ blob = decompress(blob)
+
+ # TODO use a loader later on
+ data = msg_cls()
+ data.ParseFromString(blob)
+ return data
+
+
+def tmp_cls_loader(module_name, cls_name):
+ # TODO this shall be generalized
+ from voltha.protos import voltha_pb2, health_pb2, adapter_pb2, \
+ logical_device_pb2, device_pb2, openflow_13_pb2, bbf_fiber_base_pb2, \
+ bbf_fiber_traffic_descriptor_profile_body_pb2, \
+ bbf_fiber_tcont_body_pb2, bbf_fiber_gemport_body_pb2, \
+ bbf_fiber_multicast_gemport_body_pb2, \
+ bbf_fiber_multicast_distribution_set_body_pb2, \
+ omci_mib_db_pb2, \
+ omci_alarm_db_pb2
+ return getattr(locals()[module_name], cls_name)
diff --git a/python/core/config/config_root.py b/python/core/config/config_root.py
new file mode 100644
index 0000000..4b1006d
--- /dev/null
+++ b/python/core/config/config_root.py
@@ -0,0 +1,229 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from uuid import uuid4
+
+import structlog
+from simplejson import dumps, loads
+
+from voltha.core.config.config_node import ConfigNode
+from voltha.core.config.config_rev import ConfigRevision
+from voltha.core.config.config_rev_persisted import PersistedConfigRevision
+from voltha.core.config.merge_3way import MergeConflictException
+
+log = structlog.get_logger()
+
+
+class ConfigRoot(ConfigNode):
+
+ __slots__ = (
+ '_dirty_nodes', # holds set of modified nodes per transaction branch
+ '_kv_store',
+ '_loading',
+ '_rev_cls',
+ '_deferred_callback_queue',
+ '_notification_deferred_callback_queue'
+ )
+
+ def __init__(self, initial_data, kv_store=None, rev_cls=ConfigRevision):
+ self._kv_store = kv_store
+ self._dirty_nodes = {}
+ self._loading = False
+ if kv_store is not None and \
+ not issubclass(rev_cls, PersistedConfigRevision):
+ rev_cls = PersistedConfigRevision
+ self._rev_cls = rev_cls
+ self._deferred_callback_queue = []
+ self._notification_deferred_callback_queue = []
+ super(ConfigRoot, self).__init__(self, initial_data, False)
+
+ @property
+ def kv_store(self):
+ if self._loading:
+ # provide fake store for storing things
+ # TODO this shall be a fake_dict providing noop for all relevant
+ # operations
+ return dict()
+ else:
+ return self._kv_store
+
+ def mkrev(self, *args, **kw):
+ return self._rev_cls(*args, **kw)
+
+ def mk_txbranch(self):
+ txid = uuid4().hex[:12]
+ self._dirty_nodes[txid] = {self}
+ self._mk_txbranch(txid)
+ return txid
+
+ def del_txbranch(self, txid):
+ for dirty_node in self._dirty_nodes[txid]:
+ dirty_node._del_txbranch(txid)
+ del self._dirty_nodes[txid]
+
+ def fold_txbranch(self, txid):
+ try:
+ self._merge_txbranch(txid, dry_run=1)
+ except MergeConflictException:
+ self.del_txbranch(txid)
+ raise
+
+ try:
+ self._merge_txbranch(txid)
+ finally:
+ self.execute_deferred_callbacks()
+
+ # ~~~~~~ Overridden, root-level CRUD methods to handle transactions ~~~~~~~
+
+ def update(self, path, data, strict=None, txid=None, mk_branch=None):
+ assert mk_branch is None
+ self.check_callback_queue()
+ try:
+ if txid is not None:
+ dirtied = self._dirty_nodes[txid]
+
+ def track_dirty(node):
+ dirtied.add(node)
+ return node._mk_txbranch(txid)
+
+ res = super(ConfigRoot, self).update(path, data, strict,
+ txid, track_dirty)
+ else:
+ res = super(ConfigRoot, self).update(path, data, strict)
+ finally:
+ self.execute_deferred_callbacks()
+ return res
+
+ def add(self, path, data, txid=None, mk_branch=None):
+ assert mk_branch is None
+ self.check_callback_queue()
+ try:
+ if txid is not None:
+ dirtied = self._dirty_nodes[txid]
+
+ def track_dirty(node):
+ dirtied.add(node)
+ return node._mk_txbranch(txid)
+
+ res = super(ConfigRoot, self).add(path, data, txid, track_dirty)
+ else:
+ res = super(ConfigRoot, self).add(path, data)
+ finally:
+ self.execute_deferred_callbacks()
+ return res
+
+ def remove(self, path, txid=None, mk_branch=None):
+ assert mk_branch is None
+ self.check_callback_queue()
+ try:
+ if txid is not None:
+ dirtied = self._dirty_nodes[txid]
+
+ def track_dirty(node):
+ dirtied.add(node)
+ return node._mk_txbranch(txid)
+
+ res = super(ConfigRoot, self).remove(path, txid, track_dirty)
+ else:
+ res = super(ConfigRoot, self).remove(path)
+ finally:
+ self.execute_deferred_callbacks()
+ return res
+
+ def check_callback_queue(self):
+ assert len(self._deferred_callback_queue) == 0
+
+ def enqueue_callback(self, func, *args, **kw):
+ self._deferred_callback_queue.append((func, args, kw))
+
+ def enqueue_notification_callback(self, func, *args, **kw):
+ """
+ A separate queue is required for notification. Previously, when the
+ notifications were added to the self._deferred_callback_queue there
+ was a deadlock condition where two callbacks were added (one
+ related to the model change and one for the notification related to
+ that model change). Since the model change requires the
+ self._deferred_callback_queue to be empty then there was a deadlock
+ in that scenario. The simple approach to avoid this problem is to
+ have separate queues for model and notification.
+ TODO: Investigate whether there is a need for the
+ self._deferred_callback_queue to handle multiple model events at the same time
+ :param func: callback function
+ :param args: args
+ :param kw: key-value args
+ :return: None
+ """
+ self._notification_deferred_callback_queue.append((func, args, kw))
+
+ def execute_deferred_callbacks(self):
+ # First process the model-triggered related callbacks
+ while self._deferred_callback_queue:
+ func, args, kw = self._deferred_callback_queue.pop(0)
+ func(*args, **kw)
+
+ # Execute the notification callbacks
+ while self._notification_deferred_callback_queue:
+ func, args, kw = self._notification_deferred_callback_queue.pop(0)
+ func(*args, **kw)
+
+
+ # ~~~~~~~~~~~~~~~~ Persistence related ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ @classmethod
+ def load(cls, root_msg_cls, kv_store):
+ # need to use fake kv store during initial load for not to override
+ # our real k vstore
+ fake_kv_store = dict() # shall use more efficient mock dict
+ root = cls(root_msg_cls(), kv_store=fake_kv_store,
+ rev_cls=PersistedConfigRevision)
+ # we can install the real store now
+ root._kv_store = kv_store
+ root.load_from_persistence(root_msg_cls)
+ return root
+
+ def _make_latest(self, branch, *args, **kw):
+ super(ConfigRoot, self)._make_latest(branch, *args, **kw)
+ # only persist the committed branch
+ if self._kv_store is not None and branch._txid is None:
+ root_data = dict(
+ latest=branch._latest._hash,
+ tags=dict((k, v._hash) for k, v in self._tags.iteritems())
+ )
+ blob = dumps(root_data)
+ self._kv_store['root'] = blob
+
+ def persist_tags(self):
+ if self._kv_store is not None:
+ root_data = loads(self.kv_store['root'])
+ root_data = dict(
+ latest=root_data['latest'],
+ tags=dict((k, v._hash) for k, v in self._tags.iteritems())
+ )
+ blob = dumps(root_data)
+ self._kv_store['root'] = blob
+
+ def load_from_persistence(self, root_msg_cls):
+ self._loading = True
+ blob = self._kv_store['root']
+ root_data = loads(blob)
+
+ for tag, hash in root_data['tags'].iteritems():
+ self.load_latest(hash)
+ self._tags[tag] = self.latest
+
+ self.load_latest(root_data['latest'])
+
+ self._loading = False
+
diff --git a/python/core/config/config_txn.py b/python/core/config/config_txn.py
new file mode 100644
index 0000000..87dfc59
--- /dev/null
+++ b/python/core/config/config_txn.py
@@ -0,0 +1,73 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class ClosedTransactionError(Exception):
+ pass
+
+
+class ConfigTransaction(object):
+
+ __slots__ = (
+ '_proxy',
+ '_txid'
+ )
+
+ def __init__(self, proxy, txid):
+ self._proxy = proxy
+ self._txid = txid
+
+ def __del__(self):
+ if self._txid:
+ try:
+ self.cancel()
+ except:
+ raise
+
+ # ~~~~~~~~~~~~~~~~~~~~ CRUD ops within the transaction ~~~~~~~~~~~~~~~~~~~~
+
+ def get(self, path='/', depth=None, deep=None):
+ if self._txid is None:
+ raise ClosedTransactionError()
+ return self._proxy.get(path, depth=depth, deep=deep, txid=self._txid)
+
+ def update(self, path, data, strict=False):
+ if self._txid is None:
+ raise ClosedTransactionError()
+ return self._proxy.update(path, data, strict, self._txid)
+
+ def add(self, path, data):
+ if self._txid is None:
+ raise ClosedTransactionError()
+ return self._proxy.add(path, data, self._txid)
+
+ def remove(self, path):
+ if self._txid is None:
+ raise ClosedTransactionError()
+ return self._proxy.remove(path, self._txid)
+
+ # ~~~~~~~~~~~~~~~~~~~~ transaction finalization ~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def cancel(self):
+ """Explicitly cancel the transaction"""
+ self._proxy.cancel_transaction(self._txid)
+ self._txid = None
+
+ def commit(self):
+ """Commit all transaction changes"""
+ try:
+ self._proxy.commit_transaction(self._txid)
+ finally:
+ self._txid = None
diff --git a/python/core/config/merge_3way.py b/python/core/config/merge_3way.py
new file mode 100644
index 0000000..5444a6c
--- /dev/null
+++ b/python/core/config/merge_3way.py
@@ -0,0 +1,267 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+3-way merge function for config rev objects.
+"""
+from collections import OrderedDict
+from copy import copy
+
+from voltha.core.config.config_proxy import CallbackType, OperationContext
+from voltha.core.config.config_rev import children_fields
+
+
+class MergeConflictException(Exception):
+ pass
+
+
+def merge_3way(fork_rev, src_rev, dst_rev, merge_child_func, dry_run=False):
+ """
+ Attempt to merge src_rev into dst_rev but taking into account what have
+ changed in both revs since the last known common point, the fork_rev.
+ In case of conflict, raise a MergeConflictException(). If dry run is True,
+ don't actually perform the merge, but detect potential conflicts.
+
+ This function recurses into all children nodes stored under the rev and
+ performs the merge if the children is also part of a transaction branch.
+
+ :param fork_rev: Point of forking (last known common state between branches
+ :param src_rev: Latest rev from which we merge to dst_rev
+ :param dst_rev: Target (destination) rev
+ :param merge_child_fun: To run a potential merge in all children that
+ may need merge (determined from the local changes)
+ :param dry_run: If True, do not perform the merge, but detect merge
+ conflicts.
+ :return: The new dst_rev (a new rev instance) the list of changes that
+ occurred in this node or any of its children as part of this merge.
+ """
+
+ # to collect change tuples of (<callback-type>, <op-context>)
+ changes = []
+
+ class AnalyzeChanges(object):
+ def __init__(self, lst1, lst2, keyname):
+ self.keymap1 = OrderedDict((getattr(rev._config._data, keyname), i)
+ for i, rev in enumerate(lst1))
+ self.keymap2 = OrderedDict((getattr(rev._config._data, keyname), i)
+ for i, rev in enumerate(lst2))
+ self.added_keys = [
+ k for k in self.keymap2.iterkeys() if k not in self.keymap1]
+ self.removed_keys = [
+ k for k in self.keymap1.iterkeys() if k not in self.keymap2]
+ self.changed_keys = [
+ k for k in self.keymap1.iterkeys()
+ if k in self.keymap2 and
+ lst1[self.keymap1[k]]._hash != lst2[self.keymap2[k]]._hash
+ ]
+
+ # Note: there are a couple of special cases that can be optimized
+ # for larer on. But since premature optimization is a bad idea, we
+ # defer them.
+
+ # deal with config data first
+ if dst_rev._config is fork_rev._config:
+ # no change in master, accept src if different
+ config_changed = dst_rev._config != src_rev._config
+ else:
+ if dst_rev._config.hash != src_rev._config.hash:
+ raise MergeConflictException('Config collision')
+ config_changed = True
+
+ # now to the external children fields
+ new_children = dst_rev._children.copy()
+ _children_fields = children_fields(fork_rev.data.__class__)
+
+ for field_name, field in _children_fields.iteritems():
+
+ fork_list = fork_rev._children[field_name]
+ src_list = src_rev._children[field_name]
+ dst_list = dst_rev._children[field_name]
+
+ if dst_list == src_list:
+ # we do not need to change the dst, however we still need
+ # to complete the branch purging in child nodes so not
+ # to leave dangling branches around
+ [merge_child_func(rev) for rev in src_list]
+ continue
+
+ if not field.key:
+ # If the list is not keyed, we really should not merge. We merely
+ # check for collision, i.e., if both changed (and not same)
+ if dst_list == fork_list:
+ # dst branch did not change since fork
+
+ assert src_list != fork_list, 'We should not be here otherwise'
+
+ # the incoming (src) rev changed, and we have to apply it
+ new_children[field_name] = [
+ merge_child_func(rev) for rev in src_list]
+
+ if field.is_container:
+ changes.append((CallbackType.POST_LISTCHANGE,
+ OperationContext(field_name=field_name)))
+
+ else:
+ if src_list != fork_list:
+ raise MergeConflictException(
+ 'Cannot merge because single child node or un-keyed'
+ 'children list has changed')
+
+ else:
+
+ if dst_list == fork_list:
+ # Destination did not change
+
+ # We need to analyze only the changes on the incoming rev
+ # since fork
+ src = AnalyzeChanges(fork_list, src_list, field.key)
+
+ new_list = copy(src_list) # we start from the source list
+
+ for key in src.added_keys:
+ idx = src.keymap2[key]
+ new_rev = merge_child_func(new_list[idx])
+ new_list[idx] = new_rev
+ changes.append(
+ (CallbackType.POST_ADD,
+ new_rev.data))
+ # OperationContext(
+ # field_name=field_name,
+ # child_key=key,
+ # data=new_rev.data)))
+
+ for key in src.removed_keys:
+ old_rev = fork_list[src.keymap1[key]]
+ changes.append((
+ CallbackType.POST_REMOVE,
+ old_rev.data))
+ # OperationContext(
+ # field_name=field_name,
+ # child_key=key,
+ # data=old_rev.data)))
+
+ for key in src.changed_keys:
+ idx = src.keymap2[key]
+ new_rev = merge_child_func(new_list[idx])
+ new_list[idx] = new_rev
+ # updated child gets its own change event
+
+ new_children[field_name] = new_list
+
+ else:
+
+ # For keyed fields we can really investigate what has been
+ # added, removed, or changed in both branches and do a
+ # fine-grained collision detection and merge
+
+ src = AnalyzeChanges(fork_list, src_list, field.key)
+ dst = AnalyzeChanges(fork_list, dst_list, field.key)
+
+ new_list = copy(dst_list) # this time we start with the dst
+
+ for key in src.added_keys:
+ # we cannot add if it has been added and is different
+ if key in dst.added_keys:
+ # it has been added to both, we need to check if
+ # they are the same
+ child_dst_rev = dst_list[dst.keymap2[key]]
+ child_src_rev = src_list[src.keymap2[key]]
+ if child_dst_rev.hash == child_src_rev.hash:
+ # they match, so we do not need to change the
+ # dst list, but we still need to purge the src
+ # branch
+ merge_child_func(child_dst_rev)
+ else:
+ raise MergeConflictException(
+ 'Cannot add because it has been added and '
+ 'different'
+ )
+ else:
+ # this is a brand new key, need to add it
+ new_rev = merge_child_func(src_list[src.keymap2[key]])
+ new_list.append(new_rev)
+ changes.append((
+ CallbackType.POST_ADD,
+ new_rev.data))
+ # OperationContext(
+ # field_name=field_name,
+ # child_key=key,
+ # data=new_rev.data)))
+
+ for key in src.changed_keys:
+ # we cannot change if it was removed in dst
+ if key in dst.removed_keys:
+ raise MergeConflictException(
+ 'Cannot change because it has been removed')
+
+ # if it changed in dst as well, we need to check if they
+ # match (same change
+ elif key in dst.changed_keys:
+ child_dst_rev = dst_list[dst.keymap2[key]]
+ child_src_rev = src_list[src.keymap2[key]]
+ if child_dst_rev.hash == child_src_rev.hash:
+ # they match, so we do not need to change the
+ # dst list, but we still need to purge the src
+ # branch
+ merge_child_func(child_src_rev)
+ elif child_dst_rev._config.hash != child_src_rev._config.hash:
+ raise MergeConflictException(
+ 'Cannot update because it has been changed and '
+ 'different'
+ )
+ else:
+ new_rev = merge_child_func(
+ src_list[src.keymap2[key]])
+ new_list[dst.keymap2[key]] = new_rev
+ # no announcement for child update
+
+ else:
+ # it only changed in src branch
+ new_rev = merge_child_func(src_list[src.keymap2[key]])
+ new_list[dst.keymap2[key]] = new_rev
+ # no announcement for child update
+
+ for key in reversed(src.removed_keys): # we go from highest
+ # index to lowest
+
+ # we cannot remove if it has changed in dst
+ if key in dst.changed_keys:
+ raise MergeConflictException(
+ 'Cannot remove because it has changed')
+
+ # if it has not been removed yet from dst, then remove it
+ if key not in dst.removed_keys:
+ dst_idx = dst.keymap2[key]
+ old_rev = new_list.pop(dst_idx)
+ changes.append((
+ CallbackType.POST_REMOVE,
+ old_rev.data))
+ # OperationContext(
+ # field_name=field_name,
+ # child_key=key,
+ # data=old_rev.data)))
+
+ new_children[field_name] = new_list
+
+ if not dry_run:
+ rev = src_rev if config_changed else dst_rev
+ rev = rev.update_all_children(new_children, dst_rev._branch)
+ if config_changed:
+ changes.append((CallbackType.POST_UPDATE, rev.data))
+ return rev, changes
+
+ else:
+ return None, None