Initial take on persistence support via k/v stores
These are the initial mechanisms for persistence, supported
by a k/v store. In the initial testing the k/v store
is mocked by a simple (in-memory) dict, but the concept
will carry to external, replicated, distributed k/v
store options, such as cassandra, redis, or even
consul for small-scale deployments.
Change-Id: I83d2bbb327c4516fbc15d1d9979a1e89d3e7a7e7
diff --git a/tests/utests/voltha/core/config/__init__.py b/tests/utests/voltha/core/config/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/utests/voltha/core/config/__init__.py
diff --git a/tests/utests/voltha/core/test_config.py b/tests/utests/voltha/core/config/test_config.py
similarity index 98%
rename from tests/utests/voltha/core/test_config.py
rename to tests/utests/voltha/core/config/test_config.py
index 7baa3fe..d76e511 100644
--- a/tests/utests/voltha/core/test_config.py
+++ b/tests/utests/voltha/core/config/test_config.py
@@ -3,8 +3,7 @@
import resource
from random import randint, seed
from time import time
-from unittest import TestCase
-from unittest import main
+from unittest import main, TestCase
import gc
@@ -68,15 +67,15 @@
self.assertRaises(ValueError, self.node.update, '/', Adapter())
def test_many_simple_updates(self):
- n = 100
+ n = 1000
for i in xrange(n):
self.node.update('/', Voltha(instance_id='id%d' % i))
self.node.update('/', self.other)
- self.assertEqual(len(self.node.revisions), 102)
+ self.assertEqual(len(self.node.revisions), 1002)
self.assertEqual(self.node.latest.data, self.other)
def test_retrieve_by_rev_hash(self):
- n = 100
+ n = 1000
for i in xrange(n):
self.node.update('/', Voltha(instance_id='id%d' % i))
self.node.update('/', self.other)
@@ -369,7 +368,7 @@
seed(0) # makes things consistently random
# this should be the number of nodes in the Voltha tree
- self.assertLess(rev_count(), 10)
+ self.assertLess(rev_count(), 20)
print; print_metrics()
def mk_change():
@@ -391,7 +390,7 @@
self.node.prune_untagged()
# at this point the rev count shall fall back to the original
- self.assertLess(rev_count(), 10)
+ self.assertLess(rev_count(), 15)
print_metrics()
# no make an additional set of modifications while constantly pruning
@@ -400,7 +399,7 @@
self.node.prune_untagged()
# the rev count should not have increased
- self.assertLess(rev_count(), 10)
+ self.assertLess(rev_count(), 15)
print_metrics()
def test_churn_efficiency(self):
@@ -412,7 +411,7 @@
n = 1000
modulo = 2
- self.assertEqual(rev_count(), 7)
+ self.assertEqual(rev_count(), 14)
print_metrics()
def mk_change(seq):
@@ -431,7 +430,7 @@
_tmp_rc = rev_count()
# verify that the node count did not increase significantly, yet we
# have access to all ditinct revisions
- self.assertEqual(rev_count(), 11)
+ self.assertEqual(rev_count(), 20)
print_metrics()
def test_strict_read_only(self):
diff --git a/tests/utests/voltha/core/config/test_persistence.py b/tests/utests/voltha/core/config/test_persistence.py
new file mode 100644
index 0000000..ce2dec5
--- /dev/null
+++ b/tests/utests/voltha/core/config/test_persistence.py
@@ -0,0 +1,88 @@
+from copy import copy
+from random import randint, seed
+from time import time
+from unittest import main, TestCase
+
+from voltha.core.config.config_root import ConfigRoot
+from voltha.protos.openflow_13_pb2 import ofp_desc
+from voltha.protos.voltha_pb2 import Voltha, HealthStatus, Adapter, \
+ AdapterConfig, LogicalDevice
+
+
+n_adapters = 100
+n_logical_nodes = 100
+
+
+class TestPersistence(TestCase):
+
+ def pump_some_data(self, node):
+ seed(0)
+ node.update('/', Voltha(
+ instance_id='1',
+ version='42',
+ log_level=1
+ ))
+ node.update('/health', HealthStatus(state=HealthStatus.OVERLOADED))
+ for i in xrange(n_adapters):
+ node.add('/adapters', Adapter(
+ id=str(i),
+ vendor='cord',
+ version=str(randint(1, 10)),
+ config=AdapterConfig(
+ log_level=0
+ )
+ ))
+ for i in xrange(n_logical_nodes):
+ node.add('/logical_devices', LogicalDevice(
+ id=str(i),
+ datapath_id=randint(1, 100000),
+ desc=ofp_desc(
+ mfr_desc='foo',
+ hw_desc='bar',
+ sw_desc='zoo'
+ )
+ ))
+
+ def test_inmemory_kv_store(self):
+ t0 = [time()]
+ def pt(msg=''):
+ t1 = time()
+ print '%20.8f ms - %s' % (1000 * (t1 - t0[0]), msg)
+ t0[0] = t1
+
+ kv_store = dict()
+
+ # create node and pump data
+ node = ConfigRoot(Voltha(), kv_store=kv_store)
+ pt('init')
+ self.pump_some_data(node)
+ pt('pump')
+
+ # check that content of kv_store looks ok
+ size1 = len(kv_store)
+ self.assertEqual(size1, 10 + 3 * (n_adapters + n_logical_nodes))
+
+ # this should actually drop if we prune
+ node.prune_untagged()
+ pt('prunning')
+
+ size2 = len(kv_store)
+ self.assertEqual(size2, 3 + 2 * (1 + 1 + n_adapters + n_logical_nodes))
+ all_latest_data = node.get('/', deep=1)
+ pt('deep get')
+
+ # save dict so that deleting the node will not wipe it
+ kv_store = copy(kv_store)
+ pt('copy kv store')
+ del node
+ pt('delete node')
+ # self.assertEqual(size2, 1 + 2 * (1 + 1 + n_adapters + n_logical_nodes))
+
+ # recreate tree from persistence
+ node = ConfigRoot.load(Voltha, kv_store)
+ pt('load from kv store')
+ self.assertEqual(node.get('/', deep=1), all_latest_data)
+ pt('deep get')
+
+if __name__ == '__main__':
+ main()
diff --git a/voltha/core/config/config_branch.py b/voltha/core/config/config_branch.py
index 468f56e..f74d4c4 100644
--- a/voltha/core/config/config_branch.py
+++ b/voltha/core/config/config_branch.py
@@ -20,8 +20,7 @@
"""
from collections import OrderedDict
-
-from common.utils.ordered_weakvalue_dict import OrderedWeakValueDict
+from weakref import WeakValueDictionary
class ConfigBranch(object):
@@ -39,7 +38,7 @@
self._node = node
self._txid = txid
self._origin = origin
- self._revs = OrderedWeakValueDict() if auto_prune else OrderedDict()
+ self._revs = WeakValueDictionary() if auto_prune else OrderedDict()
self._latest = origin
def __getitem__(self, hash):
diff --git a/voltha/core/config/config_node.py b/voltha/core/config/config_node.py
index 9f7f498..6df8b4c 100644
--- a/voltha/core/config/config_node.py
+++ b/voltha/core/config/config_node.py
@@ -24,6 +24,7 @@
from voltha.core.config.config_proxy import CallbackType, ConfigProxy
from voltha.core.config.config_rev import is_proto_message, children_fields, \
ConfigRevision, access_rights
+from voltha.core.config.config_rev_persisted import PersistedConfigRevision
from voltha.protos import third_party
from voltha.protos import meta_pb2
@@ -59,23 +60,37 @@
this node.
"""
__slots__ = (
+ '_root', # ref to root node
'_type', # node type, as __class__ of protobuf message
'_branches', # dict of transaction branches and a default (committed)
# branch
'_tags', # dict of tag-name to ref of ConfigRevision
'_proxy', # ref to proxy observer or None if no proxy assigned
+ '_auto_prune'
)
- def __init__(self, initial_data, auto_prune=True, txid=None):
- assert is_proto_message(initial_data)
- self._type = initial_data.__class__
+ def __init__(self, root, initial_data, auto_prune=True, txid=None):
+ self._root = root
self._branches = {}
self._tags = {}
self._proxy = None
+ self._auto_prune = auto_prune
- self._initialize(copy(initial_data), auto_prune, txid)
+ if isinstance(initial_data, type):
+ self._type = initial_data
+ elif is_proto_message(initial_data):
+ self._type = initial_data.__class__
+ self._initialize(copy(initial_data), txid)
+ else:
+ raise NotImplementedError()
- def _initialize(self, data, auto_prune, txid):
+ def _mknode(self, *args, **kw):
+ return ConfigNode(self._root, *args, **kw)
+
+ def _mkrev(self, *args, **kw):
+ return self._root.mkrev(*args, **kw)
+
+ def _initialize(self, data, txid):
# separate external children data away from locally stored data
# based on child_node annotations in protobuf
children = {}
@@ -85,24 +100,22 @@
if field.key:
children[field_name] = od = OrderedDict()
for v in field_value:
- rev = ConfigNode(v, txid=txid).latest
+ rev = self._mknode(v, txid=txid).latest
key = getattr(v, field.key)
if key in od:
raise ValueError('Duplicate key "{}"'.format(key))
od[key] = rev
else:
children[field_name] = [
- ConfigNode(v, txid=txid).latest for v in field_value]
+ self._mknode(v, txid=txid).latest for v in field_value]
else:
children[field_name] = [
- ConfigNode(field_value, txid=txid).latest]
+ self._mknode(field_value, txid=txid).latest]
data.ClearField(field_name)
-
- branch = ConfigBranch(self, auto_prune=auto_prune)
- rev = ConfigRevision(branch, data, children)
- branch._latest = rev
- branch._revs[rev.hash] = rev
+ branch = ConfigBranch(self, auto_prune=self._auto_prune)
+ rev = self._mkrev(branch, data, children)
+ self._make_latest(branch, rev)
self._branches[txid] = branch
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ accessors ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -298,7 +311,7 @@
key = getattr(data, field.key)
if key in children_od:
raise ValueError('Duplicate key "{}"'.format(key))
- child_rev = ConfigNode(data).latest
+ child_rev = self._mknode(data).latest
children_od[key] = child_rev
rev = rev.update_children(name, children_od, branch)
self._make_latest(branch, rev,
@@ -416,7 +429,7 @@
announcements = []
def _get_od_changes(lst1, lst2):
- assert isinstance(lst2, OrderedDict)
+ assert isinstance(lst2, dict)
added_keys = [k for k in lst2.iterkeys() if k not in lst1]
removed_keys = [k for k in lst1.iterkeys() if k not in lst2]
changed_keys = [k for k in lst1.iterkeys()
@@ -424,7 +437,7 @@
return added_keys, removed_keys, changed_keys
def _get_changes(lst1, lst2):
- if isinstance(lst1, OrderedDict):
+ if isinstance(lst1, dict):
return _get_od_changes(lst1, lst2)
assert isinstance(lst1, list)
assert isinstance(lst2, list)
@@ -507,7 +520,7 @@
# we cannot add if it has been added and is different
if to_add in dst_added:
# this can happen only to keyed containers
- assert isinstance(src_list, OrderedDict)
+ assert isinstance(src_list, dict)
if src_list[to_add].hash != dst_list[to_add].hash:
raise MergeConflictException(
'Cannot add because it has been added and '
@@ -644,3 +657,16 @@
if self._proxy.exclusive:
raise ValueError('Node is already owned exclusively')
return self._proxy
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~ Persistence loading ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def load_latest(self, latest_hash):
+
+ root = self._root
+ kv_store = root._kv_store
+
+ branch = ConfigBranch(self, self._auto_prune)
+ rev = PersistedConfigRevision.load(
+ branch, kv_store, self._type, latest_hash)
+ self._make_latest(branch, rev)
+ self._branches[None] = branch
diff --git a/voltha/core/config/config_rev.py b/voltha/core/config/config_rev.py
index 35f0ef5..411ede2 100644
--- a/voltha/core/config/config_rev.py
+++ b/voltha/core/config/config_rev.py
@@ -22,9 +22,7 @@
"""
import weakref
-from collections import OrderedDict
from copy import copy
-from time import time
from hashlib import md5
from google.protobuf.descriptor import Descriptor
@@ -61,21 +59,42 @@
class _ChildType(object):
"""Used to store key metadata about child_node fields in protobuf messages.
"""
- __slots__ = ('_is_container', '_key', '_key_from_str')
- def __init__(self, is_container, key=None, key_from_str=None):
+ __slots__ = (
+ '_module',
+ '_type',
+ '_is_container',
+ '_key',
+ '_key_from_str'
+ )
+
+ def __init__(self, module, type, is_container,
+ key=None, key_from_str=None):
+ self._module = module
+ self._type = type
self._is_container = is_container
self._key = key
self._key_from_str = key_from_str
+
@property
def is_container(self):
return self._is_container
+
@property
def key(self):
return self._key
+
@property
def key_from_str(self):
return self._key_from_str
+ @property
+ def module(self):
+ return self._module
+
+ @property
+ def type(self):
+ return self._type
+
def children_fields(cls):
"""
@@ -87,20 +106,27 @@
path substring back to the key.
"""
names = _children_fields_cache.get(cls)
+
if names is None:
names = {}
+
for field in cls.DESCRIPTOR.fields:
+
if field.has_options:
options = field.GetOptions()
+
if options.HasExtension(meta_pb2.child_node):
is_container = field.label == 3
meta = options.Extensions[meta_pb2.child_node]
key_from_str = None
+
if meta.key:
key_field = field.message_type.fields_by_name[meta.key]
key_type = key_field.type
+
if key_type == key_field.TYPE_STRING:
key_from_str = lambda s: s
+
elif key_type in (
key_field.TYPE_FIXED32,
key_field.TYPE_FIXED64,
@@ -113,11 +139,21 @@
key_field.TYPE_UINT32,
key_field.TYPE_UINT64):
key_from_str = lambda s: int(s)
+
else:
raise NotImplementedError()
- names[field.name] = _ChildType(is_container, meta.key,
- key_from_str)
+
+ field_class = field.message_type._concrete_class
+ names[field.name] = _ChildType(
+ module=field_class.__module__,
+ type=field_class.__name__,
+ is_container=is_container,
+ key=meta.key,
+ key_from_str=key_from_str
+ )
+
_children_fields_cache[cls] = names
+
return names
@@ -157,11 +193,14 @@
detriments.
"""
- __slots__ = ('_data', '_ts', '_hash')
+ __slots__ = (
+ '_data',
+ '_hash',
+ '__weakref__'
+ )
def __init__(self, data):
self._data = data
- self._ts = time()
self._hash = self._hash_data(data)
@property
@@ -169,10 +208,6 @@
return self._data
@property
- def ts(self):
- return self._ts
-
- @property
def hash(self):
return self._hash
@@ -180,12 +215,15 @@
def _hash_data(data):
"""Hash function to be used to track version changes of config nodes"""
if isinstance(data, (dict, list)):
- signature = dumps(data)
+ to_hash = dumps(data)
elif is_proto_message(data):
- signature = message_to_json_concise(data)
+ to_hash = ':'.join((
+ data.__class__.__module__,
+ data.__class__.__name__,
+ data.SerializeToString()))
else:
- signature = str(hash(data))
- return md5(signature).hexdigest()[:12]
+ to_hash = str(hash(data))
+ return md5(to_hash).hexdigest()[:12]
class ConfigRevision(object):
@@ -209,24 +247,26 @@
self._branch = branch
self._config = ConfigDataRevision(data)
self._children = children
- self._update_metainfo()
+ self._finalize()
- def _update_metainfo(self):
+ def _finalize(self):
self._hash = self._hash_content()
if self._hash not in _rev_cache:
_rev_cache[self._hash] = self
+ if self._config._hash not in _rev_cache:
+ _rev_cache[self._config._hash] = self._config
+ else:
+ self._config = _rev_cache[self._config._hash] # re-use!
def _hash_content(self):
# hash is derived from config hash and hashes of all children
- m = md5('' if self._config is None else self._config.hash)
+ m = md5('' if self._config is None else self._config._hash)
if self._children is not None:
for children in self._children.itervalues():
- if isinstance(children, OrderedDict):
- for child in children.itervalues():
- m.update(child.hash)
+ if isinstance(children, dict):
+ m.update(''.join(c._hash for c in children.itervalues()))
else:
- for child in children:
- m.update(child.hash)
+ m.update(''.join(c._hash for c in children))
return m.hexdigest()[:12]
@property
@@ -274,7 +314,11 @@
def update_data(self, data, branch):
"""Return a NEW revision which is updated for the modified data"""
- return ConfigRevision(branch, data=data, children=self._children)
+ new_rev = copy(self)
+ new_rev._branch = branch
+ new_rev._config = self._config.__class__(data)
+ new_rev._finalize()
+ return new_rev
def update_children(self, name, children, branch):
"""Return a NEW revision which is updated for the modified children"""
@@ -283,7 +327,7 @@
new_rev = copy(self)
new_rev._branch = branch
new_rev._children = new_children
- new_rev._update_metainfo()
+ new_rev._finalize()
return new_rev
def update_all_children(self, children, branch):
@@ -291,5 +335,5 @@
new_rev = copy(self)
new_rev._branch = branch
new_rev._children = children
- new_rev._update_metainfo()
+ new_rev._finalize()
return new_rev
diff --git a/voltha/core/config/config_rev_persisted.py b/voltha/core/config/config_rev_persisted.py
new file mode 100644
index 0000000..f1fad1c
--- /dev/null
+++ b/voltha/core/config/config_rev_persisted.py
@@ -0,0 +1,143 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A config rev object that persists itself
+"""
+from bz2 import compress, decompress
+from collections import OrderedDict
+
+import structlog
+from simplejson import dumps, loads
+
+from voltha.core.config.config_rev import ConfigRevision, children_fields
+
+log = structlog.get_logger()
+
+
+class PersistedConfigRevision(ConfigRevision):
+
+ compress = False
+
+ __slots__ = ('_kv_store',)
+
+ def __init__(self, branch, data, children=None):
+ self._kv_store = branch._node._root.kv_store
+ super(PersistedConfigRevision, self).__init__(branch, data, children)
+
+ def _finalize(self):
+ super(PersistedConfigRevision, self)._finalize()
+ self.store()
+
+ def __del__(self):
+ try:
+ if self._config.__weakref__ is None:
+ del self._kv_store[self._config._hash]
+ assert self.__weakref__ is None
+ del self._kv_store[self._hash]
+ except Exception, e:
+ # this should never happen
+ log.exception('del-error', hash=hash, e=e)
+
+ def store(self):
+ # crude serialization of children hash and config data hash
+ if self._hash in self._kv_store:
+ return
+
+ self.store_config()
+
+ children_lists = {}
+ for field_name, children in self._children.iteritems():
+ if isinstance(children, list):
+ lst = [rev.hash for rev in children]
+ else:
+ lst = [rev.hash for rev in children.itervalues()]
+ children_lists[field_name] = lst
+
+ data = dict(
+ children=children_lists,
+ config=self._config._hash
+ )
+ blob = dumps(data)
+ if self.compress:
+ blob = compress(blob)
+
+ self._kv_store[self._hash] = blob
+
+ @classmethod
+ def load(cls, branch, kv_store, msg_cls, hash):
+ blob = kv_store[hash]
+ if cls.compress:
+ blob = decompress(blob)
+ data = loads(blob)
+
+ config_hash = data['config']
+ config_data = cls.load_config(kv_store, msg_cls, config_hash)
+
+ children_list = data['children']
+ assembled_children = {}
+ node = branch._node
+ for field_name, meta in children_fields(msg_cls).iteritems():
+ child_msg_cls = tmp_cls_loader(meta.module, meta.type)
+ if meta.key:
+ # we need to assemble an ordered dict using the key
+ lst = OrderedDict()
+ for child_hash in children_list[field_name]:
+ child_node = node._mknode(child_msg_cls)
+ child_node.load_latest(child_hash)
+ child_rev = child_node.latest
+ key = getattr(child_rev.data, meta.key)
+ lst[key] = child_rev
+ else:
+ lst = []
+ for child_hash in children_list[field_name]:
+ child_node = node._mknode(child_msg_cls)
+ child_node.load_latest(child_hash)
+ child_rev = child_node.latest
+ lst.append(child_rev)
+
+ assembled_children[field_name] = lst
+
+ rev = cls(branch, config_data, assembled_children)
+ return rev
+
+ def store_config(self):
+ if self._config._hash in self._kv_store:
+ return
+
+ # crude serialization of config data
+ blob = self._config._data.SerializeToString()
+ if self.compress:
+ blob = compress(blob)
+ self._kv_store[self._config._hash] = blob
+
+ @classmethod
+ def load_config(cls, kv_store, msg_cls, config_hash):
+ blob = kv_store[config_hash]
+ if cls.compress:
+ blob = decompress(blob)
+
+ # TODO use a loader later on
+ data = msg_cls()
+ data.ParseFromString(blob)
+ return data
+
+
+def tmp_cls_loader(module_name, cls_name):
+ # TODO this shall be generalized
+ from voltha.protos import voltha_pb2, health_pb2, adapter_pb2, \
+ logical_layer_pb2, openflow_13_pb2
+ return getattr(locals()[module_name], cls_name)
diff --git a/voltha/core/config/config_root.py b/voltha/core/config/config_root.py
index 92e50cc..c229a42 100644
--- a/voltha/core/config/config_root.py
+++ b/voltha/core/config/config_root.py
@@ -16,8 +16,11 @@
from uuid import uuid4
import structlog
+from simplejson import dumps, loads
from voltha.core.config.config_node import ConfigNode, MergeConflictException
+from voltha.core.config.config_rev import ConfigRevision
+from voltha.core.config.config_rev_persisted import PersistedConfigRevision
log = structlog.get_logger()
@@ -26,11 +29,33 @@
__slots__ = (
'_dirty_nodes', # holds set of modified nodes per transaction branch
+ '_kv_store',
+ '_loading',
+ '_rev_cls'
)
- def __init__(self, initial_data):
- super(ConfigRoot, self).__init__(initial_data, False)
+ def __init__(self, initial_data, kv_store=None, rev_cls=ConfigRevision):
+ self._kv_store = kv_store
self._dirty_nodes = {}
+ self._loading = False
+ if kv_store is not None and \
+ not issubclass(rev_cls, PersistedConfigRevision):
+ rev_cls = PersistedConfigRevision
+ self._rev_cls = rev_cls
+ super(ConfigRoot, self).__init__(self, initial_data, False)
+
+ @property
+ def kv_store(self):
+ if self._loading:
+ # provide fake store for storing things
+ # TODO this shall be a fake_dict providing noop for all relevant
+ # operations
+ return dict()
+ else:
+ return self._kv_store
+
+ def mkrev(self, *args, **kw):
+ return self._rev_cls(*args, **kw)
def mk_txbranch(self):
txid = uuid4().hex[:12]
@@ -94,3 +119,40 @@
else:
return super(ConfigRoot, self).remove(path)
+ # ~~~~~~~~~~~~~~~~ Persistence related ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ @classmethod
+ def load(cls, root_msg_cls, kv_store):
+ # need to use a fake kv store during the initial load so as not to
+ # override our real kv store
+ fake_kv_store = dict() # shall use more efficient mock dict
+ root = cls(root_msg_cls(), kv_store=fake_kv_store,
+ rev_cls=PersistedConfigRevision)
+ # we can install the real store now
+ root._kv_store = kv_store
+ root.load_from_persistence(root_msg_cls)
+ return root
+
+ def _make_latest(self, branch, *args, **kw):
+ super(ConfigRoot, self)._make_latest(branch, *args, **kw)
+ # only persist the committed branch
+ if self._kv_store is not None and branch._txid is None:
+ root_data = dict(
+ latest=branch._latest._hash,
+ tags=dict((k, v._hash) for k, v in self._tags.iteritems())
+ )
+ blob = dumps(root_data)
+ self._kv_store['root'] = blob
+
+ def load_from_persistence(self, root_msg_cls):
+ self._loading = True
+ blob = self._kv_store['root']
+ root_data = loads(blob)
+
+ for tag, hash in root_data['tags'].iteritems():
+ raise NotImplementedError()
+
+ self.load_latest(root_data['latest'])
+
+ self._loading = False
+