Initial take on persistence support via k/v stores,
This is the initial mechanisms for persistence, supported
by a k/v store. In the initial testing the k/v store
is mcoked by a simple (in-memory) dict, but the concept
will carry to external, replicated, distributed k/v
store options, such as cassandra, redis, or even
consul for small-scale deployments.
Change-Id: I83d2bbb327c4516fbc15d1d9979a1e89d3e7a7e7
diff --git a/voltha/core/config/config_node.py b/voltha/core/config/config_node.py
index 9f7f498..6df8b4c 100644
--- a/voltha/core/config/config_node.py
+++ b/voltha/core/config/config_node.py
@@ -24,6 +24,7 @@
from voltha.core.config.config_proxy import CallbackType, ConfigProxy
from voltha.core.config.config_rev import is_proto_message, children_fields, \
ConfigRevision, access_rights
+from voltha.core.config.config_rev_persisted import PersistedConfigRevision
from voltha.protos import third_party
from voltha.protos import meta_pb2
@@ -59,23 +60,37 @@
this node.
"""
__slots__ = (
+ '_root', # ref to root node
'_type', # node type, as __class__ of protobuf message
'_branches', # dict of transaction branches and a default (committed)
# branch
'_tags', # dict of tag-name to ref of ConfigRevision
'_proxy', # ref to proxy observer or None if no proxy assigned
+ '_auto_prune'
)
- def __init__(self, initial_data, auto_prune=True, txid=None):
- assert is_proto_message(initial_data)
- self._type = initial_data.__class__
+ def __init__(self, root, initial_data, auto_prune=True, txid=None):
+ self._root = root
self._branches = {}
self._tags = {}
self._proxy = None
+ self._auto_prune = auto_prune
- self._initialize(copy(initial_data), auto_prune, txid)
+ if isinstance(initial_data, type):
+ self._type = initial_data
+ elif is_proto_message(initial_data):
+ self._type = initial_data.__class__
+ self._initialize(copy(initial_data), txid)
+ else:
+ raise NotImplementedError()
- def _initialize(self, data, auto_prune, txid):
+ def _mknode(self, *args, **kw):
+ return ConfigNode(self._root, *args, **kw)
+
+ def _mkrev(self, *args, **kw):
+ return self._root.mkrev(*args, **kw)
+
+ def _initialize(self, data, txid):
# separate external children data away from locally stored data
# based on child_node annotations in protobuf
children = {}
@@ -85,24 +100,22 @@
if field.key:
children[field_name] = od = OrderedDict()
for v in field_value:
- rev = ConfigNode(v, txid=txid).latest
+ rev = self._mknode(v, txid=txid).latest
key = getattr(v, field.key)
if key in od:
raise ValueError('Duplicate key "{}"'.format(key))
od[key] = rev
else:
children[field_name] = [
- ConfigNode(v, txid=txid).latest for v in field_value]
+ self._mknode(v, txid=txid).latest for v in field_value]
else:
children[field_name] = [
- ConfigNode(field_value, txid=txid).latest]
+ self._mknode(field_value, txid=txid).latest]
data.ClearField(field_name)
-
- branch = ConfigBranch(self, auto_prune=auto_prune)
- rev = ConfigRevision(branch, data, children)
- branch._latest = rev
- branch._revs[rev.hash] = rev
+ branch = ConfigBranch(self, auto_prune=self._auto_prune)
+ rev = self._mkrev(branch, data, children)
+ self._make_latest(branch, rev)
self._branches[txid] = branch
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ accessors ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -298,7 +311,7 @@
key = getattr(data, field.key)
if key in children_od:
raise ValueError('Duplicate key "{}"'.format(key))
- child_rev = ConfigNode(data).latest
+ child_rev = self._mknode(data).latest
children_od[key] = child_rev
rev = rev.update_children(name, children_od, branch)
self._make_latest(branch, rev,
@@ -416,7 +429,7 @@
announcements = []
def _get_od_changes(lst1, lst2):
- assert isinstance(lst2, OrderedDict)
+ assert isinstance(lst2, dict)
added_keys = [k for k in lst2.iterkeys() if k not in lst1]
removed_keys = [k for k in lst1.iterkeys() if k not in lst2]
changed_keys = [k for k in lst1.iterkeys()
@@ -424,7 +437,7 @@
return added_keys, removed_keys, changed_keys
def _get_changes(lst1, lst2):
- if isinstance(lst1, OrderedDict):
+ if isinstance(lst1, dict):
return _get_od_changes(lst1, lst2)
assert isinstance(lst1, list)
assert isinstance(lst2, list)
@@ -507,7 +520,7 @@
# we cannot add if it has been added and is different
if to_add in dst_added:
# this can happen only to keyed containers
- assert isinstance(src_list, OrderedDict)
+ assert isinstance(src_list, dict)
if src_list[to_add].hash != dst_list[to_add].hash:
raise MergeConflictException(
'Cannot add because it has been added and '
@@ -644,3 +657,16 @@
if self._proxy.exclusive:
raise ValueError('Node is already owned exclusively')
return self._proxy
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~ Persistence loading ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def load_latest(self, latest_hash):
+
+ root = self._root
+ kv_store = root._kv_store
+
+ branch = ConfigBranch(self, self._auto_prune)
+ rev = PersistedConfigRevision.load(
+ branch, kv_store, self._type, latest_hash)
+ self._make_latest(branch, rev)
+ self._branches[None] = branch