blob: ab73484a0a0c4d97d695d59b274dc524dfa2dbea [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from copy import copy
from jsonpatch import JsonPatch
from jsonpatch import make_patch
from common.utils.json_format import MessageToDict
from voltha.core.config.config_branch import ConfigBranch
from voltha.core.config.config_event_bus import ConfigEventBus
from voltha.core.config.config_proxy import CallbackType, ConfigProxy
from voltha.core.config.config_rev import is_proto_message, children_fields, \
ConfigRevision, access_rights
from voltha.core.config.config_rev_persisted import PersistedConfigRevision
from voltha.core.config.merge_3way import merge_3way
from voltha.protos import third_party
from voltha.protos import meta_pb2
import structlog
log = structlog.get_logger()
def message_to_dict(m):
return MessageToDict(m, True, True, False)
def check_access_violation(new_msg, old_msg):
"""Raise ValueError if attempt is made to change a read-only field"""
access_map = access_rights(new_msg.__class__)
violated_fields = []
for field_name, access in access_map.iteritems():
if access == meta_pb2.READ_ONLY:
if getattr(new_msg, field_name) != getattr(old_msg, field_name):
violated_fields.append(field_name)
if violated_fields:
raise ValueError('Cannot change read-only field(s) %s' %
', '.join('"%s"' % f for f in violated_fields))
def find_rev_by_key(revs, keyname, value):
for i, rev in enumerate(revs):
if getattr(rev._config._data, keyname) == value:
return i, rev
raise KeyError('key {}={} not found'.format(keyname, value))
class ConfigNode(object):
"""
Represents a configuration node which can hold a number of revisions
of the configuration for this node.
When the configuration changes, the new version is appended to the
node.
Initial data must be a protobuf message and it will determine the type of
this node.
"""
__slots__ = (
'_root', # ref to root node
'_type', # node type, as __class__ of protobuf message
'_branches', # dict of transaction branches and a default (committed)
# branch
'_tags', # dict of tag-name to ref of ConfigRevision
'_proxy', # ref to proxy observer or None if no proxy assigned
'_event_bus', # ref to event_bus or None if no event bus is assigned
'_auto_prune'
)
def __init__(self, root, initial_data, auto_prune=True, txid=None):
self._root = root
self._branches = {}
self._tags = {}
self._proxy = None
self._event_bus = None
self._auto_prune = auto_prune
if isinstance(initial_data, type):
self._type = initial_data
elif is_proto_message(initial_data):
self._type = initial_data.__class__
copied_data = initial_data.__class__()
copied_data.CopyFrom(initial_data)
self._initialize(copied_data, txid)
else:
raise NotImplementedError()
def _mknode(self, *args, **kw):
return ConfigNode(self._root, *args, **kw)
def _mkrev(self, *args, **kw):
return self._root.mkrev(*args, **kw)
def _initialize(self, data, txid):
# separate external children data away from locally stored data
# based on child_node annotations in protobuf
children = {}
for field_name, field in children_fields(self._type).iteritems():
field_value = getattr(data, field_name)
if field.is_container:
if field.key:
keys_seen = set()
children[field_name] = lst = []
for v in field_value:
rev = self._mknode(v, txid=txid).latest
key = getattr(v, field.key)
if key in keys_seen:
raise ValueError('Duplicate key "{}"'.format(key))
lst.append(rev)
keys_seen.add(key)
else:
children[field_name] = [
self._mknode(v, txid=txid).latest for v in field_value]
else:
children[field_name] = [
self._mknode(field_value, txid=txid).latest]
data.ClearField(field_name)
branch = ConfigBranch(self, auto_prune=self._auto_prune)
rev = self._mkrev(branch, data, children)
self._make_latest(branch, rev)
self._branches[txid] = branch
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ accessors ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# these convenience short-cuts only work for the committed branch
@property
def revisions(self):
return [r._hash for r in self._branches[None]._revs.itervalues()]
@property
def latest(self):
return self._branches[None]._latest
def __getitem__(self, hash):
return self._branches[None]._revs[hash]
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ get operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def get(self, path=None, hash=None, depth=0, deep=False, txid=None):
# depth preparation
if deep:
depth = -1
# path preparation
path = '' if path is None else path
while path.startswith('/'):
path = path[1:]
# determine branch; if lookup fails, it is ok to use default branch
branch = self._branches.get(txid, None) or self._branches[None]
# determine rev
if hash is not None:
rev = branch._revs[hash]
else:
rev = branch.latest
return self._get(rev, path, depth)
def _get(self, rev, path, depth):
if not path:
return self._do_get(rev, depth)
# ... otherwise
name, _, path = path.partition('/')
field = children_fields(self._type)[name]
if field.is_container:
if field.key:
children = rev._children[name]
if path:
# need to escalate further
key, _, path = path.partition('/')
key = field.key_from_str(key)
_, child_rev = find_rev_by_key(children, field.key, key)
child_node = child_rev.node
return child_node._get(child_rev, path, depth)
else:
# we are the node of interest
response = []
for child_rev in children:
child_node = child_rev.node
value = child_node._do_get(child_rev, depth)
response.append(value)
return response
else:
if path:
raise LookupError(
'Cannot index into container with no key defined')
response = []
for child_rev in rev._children[name]:
child_node = child_rev.node
value = child_node._do_get(child_rev, depth)
response.append(value)
return response
else:
child_rev = rev._children[name][0]
child_node = child_rev.node
return child_node._get(child_rev, path, depth)
def _do_get(self, rev, depth):
msg = rev.get(depth)
if self._proxy is not None:
msg = self._proxy.invoke_callbacks(CallbackType.GET, msg)
return msg
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~ update operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def update(self, path, data, strict=False, txid=None, mk_branch=None):
while path.startswith('/'):
path = path[1:]
try:
branch = self._branches[txid]
except KeyError:
branch = mk_branch(self)
if not path:
return self._do_update(branch, data, strict)
rev = branch._latest # change is always made to the latest
name, _, path = path.partition('/')
field = children_fields(self._type)[name]
if field.is_container:
if not path:
raise ValueError('Cannot update a list')
if field.key:
key, _, path = path.partition('/')
key = field.key_from_str(key)
children = copy(rev._children[name])
idx, child_rev = find_rev_by_key(children, field.key, key)
child_node = child_rev.node
# chek if deep copy will work better
new_child_rev = child_node.update(
path, data, strict, txid, mk_branch)
if new_child_rev.hash == child_rev.hash:
# When the new_child_rev goes out of scope,
# it's destructor gets invoked as it is not being
# referred by any other data structures. To prevent
# this to trigger the hash it is holding from being
# erased in the db, its hash is set to None. If the
# new_child_rev object is pointing at the same address
# as the child_rev address then do not clear the hash
if new_child_rev != child_rev:
log.debug('clear-hash',
hash=new_child_rev.hash, object_ref=new_child_rev)
new_child_rev.clear_hash()
return branch._latest
if getattr(new_child_rev.data, field.key) != key:
raise ValueError('Cannot change key field')
children[idx] = new_child_rev
rev = rev.update_children(name, children, branch)
self._make_latest(branch, rev)
return rev
else:
raise ValueError('Cannot index into container with no keys')
else:
child_rev = rev._children[name][0]
child_node = child_rev.node
new_child_rev = child_node.update(
path, data, strict, txid, mk_branch)
rev = rev.update_children(name, [new_child_rev], branch)
self._make_latest(branch, rev)
return rev
def _do_update(self, branch, data, strict):
if not isinstance(data, self._type):
raise ValueError(
'"{}" is not a valid data type for this node'.format(
data.__class__.__name__))
self._test_no_children(data)
if self._proxy is not None:
self._proxy.invoke_callbacks(CallbackType.PRE_UPDATE, data)
if branch._latest.data != data:
if strict:
# check if attempt is made to change read-only field
check_access_violation(data, branch._latest.data)
rev = branch._latest.update_data(data, branch)
self._make_latest(branch, rev,
((CallbackType.POST_UPDATE, rev.data),))
return rev
else:
return branch._latest
def _make_latest(self, branch, rev, change_announcements=()):
# Update the latest branch only when the hash between the previous
# data and the new rev is different, otherwise this will trigger the
# data already saved in the db (with that hash) to be erased
if rev.hash not in branch._revs:
branch._revs[rev.hash] = rev
if not branch._latest or rev.hash != branch._latest.hash:
branch._latest = rev
# announce only if this is main branch
if change_announcements and branch._txid is None:
if self._proxy is not None:
for change_type, data in change_announcements:
# since the callback may operate on the config tree,
# we have to defer the execution of the callbacks till
# the change is propagated to the root, then root will
# call the callbacks
self._root.enqueue_callback(
self._proxy.invoke_callbacks,
change_type,
data,
proceed_on_errors=1,
)
for change_type, data in change_announcements:
self._root.enqueue_notification_callback(
self._mk_event_bus().advertise,
change_type,
data,
hash=rev.hash
)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ add operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def add(self, path, data, txid=None, mk_branch=None):
while path.startswith('/'):
path = path[1:]
if not path:
raise ValueError('Cannot add to non-container node')
try:
branch = self._branches[txid]
except KeyError:
branch = mk_branch(self)
rev = branch._latest # change is always made to latest
name, _, path = path.partition('/')
field = children_fields(self._type)[name]
if field.is_container:
if not path:
# we do need to add a new child to the field
if field.key:
if self._proxy is not None:
self._proxy.invoke_callbacks(
CallbackType.PRE_ADD, data)
children = copy(rev._children[name])
key = getattr(data, field.key)
try:
find_rev_by_key(children, field.key, key)
except KeyError:
pass
else:
raise ValueError('Duplicate key "{}"'.format(key))
child_rev = self._mknode(data).latest
children.append(child_rev)
rev = rev.update_children(name, children, branch)
self._make_latest(branch, rev,
((CallbackType.POST_ADD, data),))
return rev
else:
# adding to non-keyed containers not implemented yet
raise ValueError('Cannot add to non-keyed container')
else:
if field.key:
# need to escalate
key, _, path = path.partition('/')
key = field.key_from_str(key)
children = copy(rev._children[name])
idx, child_rev = find_rev_by_key(children, field.key, key)
child_node = child_rev.node
new_child_rev = child_node.add(path, data, txid, mk_branch)
children[idx] = new_child_rev
rev = rev.update_children(name, children, branch)
self._make_latest(branch, rev)
return rev
else:
raise ValueError(
'Cannot index into container with no keys')
else:
raise ValueError('Cannot add to non-container field')
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~ remove operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def remove(self, path, txid=None, mk_branch=None):
while path.startswith('/'):
path = path[1:]
if not path:
raise ValueError('Cannot remove from non-container node')
try:
branch = self._branches[txid]
except KeyError:
branch = mk_branch(self)
rev = branch._latest # change is always made to latest
name, _, path = path.partition('/')
field = children_fields(self._type)[name]
if field.is_container:
if not path:
raise ValueError("Cannot remove without a key")
if field.key:
key, _, path = path.partition('/')
key = field.key_from_str(key)
if path:
# need to escalate
children = copy(rev._children[name])
idx, child_rev = find_rev_by_key(children, field.key, key)
child_node = child_rev.node
new_child_rev = child_node.remove(path, txid, mk_branch)
children[idx] = new_child_rev
rev = rev.update_children(name, children, branch)
self._make_latest(branch, rev)
return rev
else:
# need to remove from this very node
children = copy(rev._children[name])
idx, child_rev = find_rev_by_key(children, field.key, key)
if self._proxy is not None:
data = child_rev.data
self._proxy.invoke_callbacks(
CallbackType.PRE_REMOVE, data)
post_anno = ((CallbackType.POST_REMOVE, data),)
else:
post_anno = ((CallbackType.POST_REMOVE, child_rev.data),)
del children[idx]
rev = rev.update_children(name, children, branch)
self._make_latest(branch, rev, post_anno)
return rev
else:
raise ValueError('Cannot remove from non-keyed container')
else:
raise ValueError('Cannot remove non-conatiner field')
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Branching ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _mk_txbranch(self, txid):
branch_point = self._branches[None].latest
branch = ConfigBranch(self, txid, branch_point)
self._branches[txid] = branch
return branch
def _del_txbranch(self, txid):
del self._branches[txid]
def _merge_txbranch(self, txid, dry_run=False):
"""
Make latest in branch to be latest in the common branch, but only
if no conflict is detected. Conflict is where the txbranch branch
point no longer matches the latest in the default branch. This has
to be verified recursively.
"""
def merge_child(child_rev):
child_branch = child_rev._branch
if child_branch._txid == txid:
child_rev = child_branch._node._merge_txbranch(txid, dry_run)
return child_rev
src_branch = self._branches[txid]
dst_branch = self._branches[None]
fork_rev = src_branch.origin # rev from which src branch was made
src_rev = src_branch.latest # head rev of source branch
dst_rev = dst_branch.latest # head rev of target branch
rev, changes = merge_3way(
fork_rev, src_rev, dst_rev, merge_child, dry_run)
if not dry_run:
self._make_latest(dst_branch, rev, change_announcements=changes)
del self._branches[txid]
return rev
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Diff utility ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def diff(self, hash1, hash2=None, txid=None):
branch = self._branches[txid]
rev1 = branch[hash1]
rev2 = branch[hash2] if hash2 else branch._latest
if rev1.hash == rev2.hash:
return JsonPatch([])
else:
dict1 = message_to_dict(rev1.data)
dict2 = message_to_dict(rev2.data)
return make_patch(dict1, dict2)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~ Tagging utility ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def tag(self, tag, hash=None):
branch = self._branches[None] # tag only what has been committed
rev = branch._latest if hash is None else branch._revs[hash]
self._tags[tag] = rev
self.persist_tags()
return self
@property
def tags(self):
return sorted(self._tags.iterkeys())
def by_tag(self, tag):
"""
Return revision based on tag
:param tag: previously registered tag value
:return: revision object
"""
return self._tags[tag]
def diff_by_tag(self, tag1, tag2):
return self.diff(self._tags[tag1].hash, self._tags[tag2].hash)
def delete_tag(self, tag):
del self._tags[tag]
self.persist_tags()
def delete_tags(self, *tags):
for tag in tags:
del self._tags[tag]
self.persist_tags()
def prune_untagged(self):
branch = self._branches[None]
keep = set(rev.hash for rev in self._tags.itervalues())
keep.add(branch._latest.hash)
for hash in branch._revs.keys():
if hash not in keep:
del branch._revs[hash]
return self
def persist_tags(self):
"""
Persist tag information to the backend
"""
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Internals ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _test_no_children(self, data):
for field_name, field in children_fields(self._type).items():
field_value = getattr(data, field_name)
if field.is_container:
if len(field_value):
raise NotImplementedError(
'Cannot update external children')
else:
if data.HasField(field_name):
raise NotImplementedError(
'Cannot update externel children')
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~ Node proxy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def get_proxy(self, path, exclusive=False):
return self._get_proxy(path, self, path, exclusive)
def _get_proxy(self, path, root, full_path, exclusive):
while path.startswith('/'):
path = path[1:]
if not path:
return self._mk_proxy(root, full_path, exclusive)
# need to escalate
rev = self._branches[None]._latest
name, _, path = path.partition('/')
field = children_fields(self._type)[name]
if field.is_container:
if not path:
raise ValueError('Cannot proxy a container field')
if field.key:
key, _, path = path.partition('/')
key = field.key_from_str(key)
children = rev._children[name]
_, child_rev = find_rev_by_key(children, field.key, key)
child_node = child_rev.node
return child_node._get_proxy(path, root, full_path, exclusive)
raise ValueError('Cannot index into container with no keys')
else:
child_rev = rev._children[name][0]
child_node = child_rev.node
return child_node._get_proxy(path, root, full_path, exclusive)
def _mk_proxy(self, root, full_path, exclusive):
if self._proxy is None:
self._proxy = ConfigProxy(root, self, full_path, exclusive)
else:
if self._proxy.exclusive:
raise ValueError('Node is already owned exclusively')
return self._proxy
def _mk_event_bus(self):
if self._event_bus is None:
self._event_bus = ConfigEventBus()
return self._event_bus
# ~~~~~~~~~~~~~~~~~~~~~~~~ Persistence loading ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def load_latest(self, latest_hash):
root = self._root
kv_store = root._kv_store
branch = ConfigBranch(node=self, auto_prune=self._auto_prune)
rev = PersistedConfigRevision.load(
branch, kv_store, self._type, latest_hash)
self._make_latest(branch, rev)
self._branches[None] = branch