VOL-1254 Revert "VOL-1162 Delete old revs and old transaction branches"
This reverts commit 7211c70f6b044667daa5affe1a3441085734d065.
Change-Id: Idf7a1c31dcb20c4949f650f1dc0323ddde72b754
diff --git a/tests/utests/voltha/core/config/test_config.py b/tests/utests/voltha/core/config/test_config.py
index 08ecf87..556cdd9 100644
--- a/tests/utests/voltha/core/config/test_config.py
+++ b/tests/utests/voltha/core/config/test_config.py
@@ -72,9 +72,107 @@
hash = self.node.latest.hash
self.assertEqual(self.node.revisions, [hash])
+ def test_update(self):
+ hash0 = self.node.latest.hash
+ self.node.update('/', copy(self.other))
+ hash1 = self.node.latest.hash
+ self.assertEqual(len(self.node.revisions), 2)
+ self.assertNotEqual(hash0, hash1)
+ self.assertEqual(self.node.latest.data, VolthaInstance(instance_id='other'))
+
def test_update_with_bad_data(self):
self.assertRaises(ValueError, self.node.update, '/', Adapter())
+ def test_many_simple_updates(self):
+ n = 1000
+ for i in xrange(n):
+ self.node.update('/', VolthaInstance(instance_id='id%d' % i))
+ self.node.update('/', self.other)
+ self.assertEqual(len(self.node.revisions), 1002)
+ self.assertEqual(self.node.latest.data, self.other)
+
+ def test_retrieve_by_rev_hash(self):
+ n = 1000
+ for i in xrange(n):
+ self.node.update('/', VolthaInstance(instance_id='id%d' % i))
+ self.node.update('/', self.other)
+ hashes = self.node.revisions
+ self.assertEqual(self.node[hashes[0]].data, self.empty)
+ self.assertEqual(self.node[hashes[10]].data, VolthaInstance(instance_id='id9'))
+ self.assertEqual(self.node[hashes[-1]].data, self.other)
+
+ def test_diffs(self):
+ self.node.update('/', self.other)
+ self.assertEqual(self.node.diff(self.node.latest.hash).patch, [])
+ hashes = self.node.revisions
+ self.assertEqual(self.node.diff(hashes[0]).patch, [
+ dict(op='replace', path='/instance_id', value='other')
+ ])
+ self.assertEqual(self.node.diff(hashes[0], hashes[1]).patch, [
+ dict(op='replace', path='/instance_id', value='other')
+ ])
+ self.assertEqual(self.node.diff(hashes[1], hashes[0]).patch, [
+ dict(op='replace', path='/instance_id', value='')
+ ])
+ self.assertEqual(self.node.diff(hashes[1], hashes[1]).patch, [])
+
+ def test_tagging(self):
+ self.node.tag('original')
+ hash1 = self.node.latest.hash
+
+ # add a bunch of changes
+ for a in xrange(10):
+ self.node.update('/', VolthaInstance(instance_id=str(a)))
+ hash2 = self.node.latest.hash
+
+ # apply tag to latest
+ self.node.tag('latest')
+
+ # apply another tag to latest
+ self.node.tag('other')
+
+ # apply tag to specific rev hash
+ self.node.tag('yetmore', hash2)
+
+ # invalid hash
+ self.assertRaises(KeyError, self.node.tag, 'sometag', 'badhash')
+
+ # retrieve data based on tag
+ self.assertEqual(self.node.by_tag('original').hash, hash1)
+ self.assertEqual(self.node.by_tag('latest').hash, hash2)
+ self.assertEqual(self.node.by_tag('other').hash, hash2)
+ self.assertEqual(self.node.by_tag('yetmore').hash, hash2)
+
+ # generate diff from tags
+ self.assertEqual(self.node.diff_by_tag('original', 'latest').patch, [
+ dict(op='replace', path='/instance_id', value='9')
+ ])
+
+ # move tags to given revision
+ self.node.tag('original', self.node.revisions[2])
+ self.node.tag('latest', self.node.revisions[9])
+
+ # add another tag
+ self.node.tag('another', self.node.revisions[7])
+
+ # list tags
+ self.assertEqual(self.node.tags,
+ ['another', 'latest', 'original', 'other', 'yetmore'])
+
+ # delete a tag
+ self.node.delete_tag('another')
+ self.node.delete_tags('yetmore', 'other')
+ self.assertEqual(self.node.tags, ['latest', 'original'])
+
+ # prune untagged revisions from revision list
+ self.node.prune_untagged()
+ self.assertEqual(len(self.node.revisions), 3) # latest is always kept
+
+ # retrieve and compare working tagged revs
+ self.assertEqual(self.node.diff_by_tag('original', 'latest').patch, [
+ dict(op='replace', path='/instance_id', value='8')
+ ])
+
class DeepTestsBase(TestCase):
"""Shared test class for test using a simple node tree"""
@@ -118,6 +216,19 @@
def test_deep_get(self):
self.assertEqual(self.node.get(deep=True), self.base_deep)
+ def test_top_level_update(self):
+ # test that top-level update retains children
+ self.node.update('/', VolthaInstance(version='1.2.3'))
+ hash_new = self.node.latest.hash
+ self.assertNotEqual(self.hash_orig, hash_new)
+ self.assertEqual(self.node.get(
+ hash=self.hash_orig, deep=1), self.base_deep)
+ latest = self.node.get(deep=1)
+ self.assertNotEqual(latest, self.base_deep)
+ self.assertEqual(len(latest.adapters), 5)
+ self.assertEqual(len(latest.logical_devices), 0)
+ self.assertEqual(latest.version, '1.2.3')
+
def test_path_based_get_access(self):
self.assertEqual(self.node.get(path='/'), self.node.get())
self.assertEqual(self.node.get(path='/health'), self.health)
@@ -131,6 +242,42 @@
adapter = self.node.get(path='/adapters/3')
self.assertEqual(adapter.id, '3')
+ def test_deep_update_non_container(self):
+
+ self.node.update('/health', HealthStatus(state=HealthStatus.HEALTHY))
+
+ # root hash is now different
+ hash_new = self.node.latest.hash
+ self.assertNotEqual(self.hash_orig, hash_new)
+
+ # original tree is still intact
+ orig = self.node.get(hash=self.hash_orig, deep=1)
+ self.assertEqual(orig, self.base_deep)
+ self.assertEqual(orig.health.state, HealthStatus.DYING)
+
+ # but the latest contains the change
+ new = self.node.get(deep=1)
+ self.assertNotEqual(new, orig)
+ self.assertEqual(new.health.state, HealthStatus.HEALTHY)
+
+ def test_deep_update_container(self):
+
+ self.node.update('/adapters/0', Adapter(id='0', version='new'))
+
+ # root hash is now different
+ hash_new = self.node.latest.hash
+ self.assertNotEqual(self.hash_orig, hash_new)
+
+ # original tree is still intact
+ orig = self.node.get(hash=self.hash_orig, deep=1)
+ self.assertEqual(orig, self.base_deep)
+ self.assertEqual(orig.adapters[0].id, '0')
+
+ # but the new tree contains the change
+ new = self.node.get(deep=1)
+ self.assertNotEqual(new, orig)
+ self.assertEqual(new.adapters[0].version, 'new')
+
def test_update_handle_invalid_paths(self):
self.assertRaises(KeyError, self.node.update, 'foo', None)
self.assertRaises(KeyError, self.node.update, '/foo', None)
@@ -148,6 +295,14 @@
self.assertRaises(
ValueError, self.node.update, '/adapters/1', Adapter(id='changed'))
+ def test_add_node(self):
+ new = Adapter(id='new')
+ self.node.add('/adapters', new)
+ self.assertNotEqual(self.node.latest.hash, self.hash_orig)
+ self.assertEqual(len(self.node.get('/adapters')), 6)
+ self.assertEqual(
+ len(self.node.get('/adapters', hash=self.hash_orig)), 5)
+ self.assertEqual(self.node.get('/adapters/new'), new)
def test_add_handle_invalid_cases(self):
# invalid paths
@@ -163,6 +318,13 @@
self.assertRaises(
ValueError, self.node.add, '/adapters', Adapter(id='1'))
+ def test_remove_node(self):
+ self.node.remove('/adapters/3')
+ self.assertNotEqual(self.node.latest.hash, self.hash_orig)
+ self.assertEqual(len(self.node.get('/adapters')), 4)
+ self.assertEqual(
+ len(self.node.get('/adapters', hash=self.hash_orig)), 5)
+ self.assertRaises(KeyError, self.node.get, '/adapters/3')
def test_remove_handle_invalid_cases(self):
# invalid paths
@@ -174,10 +336,119 @@
# cannot add to non-container nodes
self.assertRaises(ValueError, self.node.remove, '/health')
+ def test_pruning_after_shallow_change(self):
+
+ self.node.update('/', VolthaInstance(version='10.1'))
+
+ # sanity check
+ self.assertEqual(len(self.node.revisions), 2)
+
+ # prune
+ self.node.prune_untagged()
+
+ self.assertEqual(len(self.node.revisions), 1)
+
+ # we can nevertheless access the whole tree
+ new = self.node.get('/', deep=1)
+ self.assertEqual(new.adapters, self.base_deep.adapters)
+ self.assertEqual(new.version, '10.1')
+
+ def test_pruning_after_deep_change(self):
+
+ self.node.update('/adapters/3', Adapter(id='3', version='changed'))
+
+ # sanity check
+ self.assertEqual(len(self.node.revisions), 2)
+
+ # prune
+ self.node.prune_untagged()
+
+ self.assertEqual(len(self.node.revisions), 1)
+
+ # we can nevertheless access the whole tree
+ new = self.node.get('/', deep=1)
+ self.assertEqual(len(new.adapters), 5)
+ self.assertEqual(new.adapters[2], self.base_deep.adapters[2])
+ self.assertEqual(new.adapters[3].version, 'changed')
+
class TestPruningPerformance(DeepTestsBase):
+ def test_repeated_prunning_keeps_memory_stable(self):
+ # The auto-pruning feature of the config system is that leaf nodes
+ # in the config tree that are no longer in use are pruned from memory,
+ # once they revision is removed from root using the .prune_untagged()
+ # method. This test is to verify that.
+ n = 1000
+
+ seed(0) # makes things consistently random
+
+ # this should be the number of nodes in the VolthaInstance tree
+ self.assertLess(rev_count(), 20)
+ print; print_metrics()
+
+ def mk_change():
+ key = str(randint(0, 4))
+ path = '/adapters/' + key
+ adapter = self.node.get(path)
+ adapter.version = 'v{}'.format(randint(0, 100000))
+ self.node.update(path, adapter)
+
+ # first we perform many changes without pruning
+ for i in xrange(n):
+ mk_change()
+
+ # at this point we shall have more than 2*n revs laying around
+ self.assertGreater(rev_count(), 2 * n)
+ print_metrics()
+
+ # prune now
+ self.node.prune_untagged()
+
+ # at this point the rev count shall fall back to the original
+ self.assertLess(rev_count(), 15)
+ print_metrics()
+
+ # no make an additional set of modifications while constantly pruning
+ for i in xrange(n):
+ mk_change()
+ self.node.prune_untagged()
+
+ # the rev count should not have increased
+ self.assertLess(rev_count(), 15)
+ print_metrics()
+
+ def test_churn_efficiency(self):
+ # Two config revisions that hash to the same hash value also share the
+ # same in-memory object. So if the same config node goes through churn
+ # (like flip-flopping fields), we don't eat up memory unnecessarily.
+ # This test is to verify that behavior.
+
+ n = 1000
+ modulo = 2
+
+ self.assertEqual(rev_count(), 14)
+ print_metrics()
+
+ def mk_change(seq):
+ # make change module of the sequence number so we periodically
+ # return to the same config
+ path = '/adapters/3'
+ adapter = self.node.get(path)
+ adapter.version = 'v{}'.format(seq % modulo)
+ self.node.update(path, adapter)
+
+ # make n changes back and forth
+ for i in xrange(n):
+ _tmp_rc = rev_count()
+ mk_change(i)
+
+ _tmp_rc = rev_count()
+ # verify that the node count did not increase significantly, yet we
+ # have access to all ditinct revisions
+ self.assertEqual(rev_count(), 20)
+ print_metrics()
def test_strict_read_only(self):
# it shall not be possible to change a read-only field
@@ -491,7 +762,264 @@
self.assertEqual(len(self.node._branches.keys()), 1)
self.assertEqual(len(adapter_node._branches.keys()), 1)
+ def test_transaction_commit(self):
+ """Once committed, changes become latest"""
+ proxy = self.node.get_proxy('/')
+ _latest_root_rev = self.node._branches[None].latest
+ adapter_node = _latest_root_rev._children['adapters'][2].node
+ tx = proxy.open_transaction()
+
+ # publicly visible value before change
+ path = '/adapters/2'
+ self.assertEqual(proxy.get(path).config.log_level, 3)
+ self.assertEqual(self.node.get(path).config.log_level, 3)
+
+ # make the change, but not commit yet
+ self.make_change(tx, path, 'config.log_level', 4)
+ self.assertEqual(proxy.get(path).config.log_level, 3)
+ self.assertEqual(self.node.get(path).config.log_level, 3)
+
+ # commit the change
+ tx.commit()
+ self.assertNotEqual(self.node.latest.hash, self.hash_orig)
+ self.assertEqual(proxy.get('/adapters/2').config.log_level, 4)
+ self.assertEqual(len(self.node._branches.keys()), 1)
+ self.assertEqual(len(adapter_node._branches.keys()), 1)
+ self.assertEqual(proxy.get(path).config.log_level, 4)
+ self.assertEqual(self.node.get(path).config.log_level, 4)
+
+ def test_collision_detection(self):
+ """Correctly detect transaction collision and abort the 2nd tx"""
+
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ path = '/adapters/2'
+ self.make_change(tx1, path, 'config.log_level', 0)
+
+ # make another tx before tx1 is committed
+ tx2 = proxy.open_transaction()
+ print tx2._txid
+ self.make_change(tx2, path, 'config.log_level', 4)
+
+ # commit first
+ print tx1._txid
+ tx1.commit()
+
+ # committing 2nd one should fail
+ self.assertRaises(MergeConflictException, tx2.commit)
+ self.check_no_tx_branches()
+
+ def test_nonconfliciting_changes(self):
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx2 = proxy.open_transaction()
+ self.make_change(tx1, '/adapters/1', 'config.log_level', 1)
+ self.make_change(tx2, '/adapters/2', 'config.log_level', 2)
+ tx1.commit()
+ tx2.commit()
+ self.assertEqual(self.log_levels(), {
+ '0': 3, '1': 1, '2': 2, '3': 3, '4': 3
+ })
+
+ def test_additive_changes(self):
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx1.add('/adapters', Adapter(id='new'))
+ tx1.add('/adapters', Adapter(id='new2'))
+ self.assertEqual(len(proxy.get('/adapters')), 5)
+ self.assertEqual(len(self.node.get('/adapters')), 5)
+ self.assertEqual(len(tx1.get('/adapters')), 7)
+ tx1.commit()
+ self.assertEqual(len(proxy.get('/adapters')), 7)
+ self.assertEqual(len(self.node.get('/adapters')), 7)
+ self.assertEqual(self.log_levels().keys(),
+ ['0', '1', '2', '3', '4', 'new', 'new2'])
+
+ def test_remove_changes(self):
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx1.remove('/adapters/2')
+ tx1.remove('/adapters/4')
+ self.assertEqual(len(proxy.get('/adapters')), 5)
+ self.assertEqual(len(self.node.get('/adapters')), 5)
+ self.assertEqual(len(tx1.get('/adapters')), 3)
+ tx1.commit()
+ self.assertEqual(len(proxy.get('/adapters')), 3)
+ self.assertEqual(len(self.node.get('/adapters')), 3)
+ self.assertEqual(self.log_levels().keys(), ['0', '1', '3'])
+
+ def test_mixed_add_remove_update_changes(self):
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ self.make_change(tx1, '/adapters/2', 'config.log_level', 2)
+ tx1.remove('/adapters/0')
+ tx1.add('/adapters', Adapter(id='new'))
+ tx1.remove('/adapters/4')
+ tx1.add('/adapters', Adapter(id='new2'))
+ tx1.add('/adapters', Adapter(id='new3'))
+ self.assertEqual(len(proxy.get('/adapters')), 5)
+ self.assertEqual(len(self.node.get('/adapters')), 5)
+ self.assertEqual(len(tx1.get('/adapters')), 6)
+ tx1.commit()
+ self.assertEqual(len(proxy.get('/adapters')), 6)
+ self.assertEqual(len(self.node.get('/adapters')), 6)
+ self.assertEqual(self.log_levels(), {
+ '1': 3, '2': 2, '3': 3, 'new': 0, 'new2': 0, 'new3': 0
+ })
+
+ def test_compatible_updates(self):
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx2 = proxy.open_transaction()
+ tx3 = proxy.open_transaction()
+ tx4 = proxy.open_transaction()
+ tx5 = proxy.open_transaction()
+ tx1.update('/health', HealthStatus(state=HealthStatus.OVERLOADED))
+ self.make_change(tx2, '/adapters/1', 'version', '42')
+ self.make_change(tx3, '/adapters/2', 'config.log_level', 2)
+ self.make_change(tx4, '/adapters/1', 'version', '42')
+ self.make_change(tx5, '/adapters/1', 'version', '422')
+ tx1.commit()
+ tx2.commit()
+ tx3.commit()
+ tx4.commit()
+ self.assertRaises(MergeConflictException, tx5.commit)
+
+ # verify outcome
+ self.assertEqual(self.node.get('/health').state, 1)
+ self.assertEqual(self.node.get('/', deep=1).adapters[1].version, '42')
+ self.assertEqual(self.log_levels(), {
+ '0': 3, '1': 3, '2': 2, '3': 3, '4': 3
+ })
+
+ def test_conflciting_updates(self):
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx2 = proxy.open_transaction()
+ tx3 = proxy.open_transaction()
+ tx1.update('/health', HealthStatus(state=HealthStatus.OVERLOADED))
+ self.make_change(tx2, '/adapters/1', 'version', '42')
+ self.make_change(tx3, '/adapters/1', 'config.log_level', 2)
+ tx1.commit()
+ tx2.commit()
+ self.assertRaises(MergeConflictException, tx3.commit)
+
+ # verify outcome
+ self.assertEqual(self.node.get('/health').state, 1)
+ self.assertEqual(self.node.get('/', deep=1).adapters[1].version, '42')
+ self.assertEqual(self.log_levels(), {
+ '0': 3, '1': 3, '2': 3, '3': 3, '4': 3
+ })
+
+ def test_compatible_adds(self):
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx2 = proxy.open_transaction()
+ tx3 = proxy.open_transaction()
+ tx1.add('/adapters', Adapter(id='new1'))
+ tx2.add('/adapters', Adapter(id='new2'))
+ tx3.add('/adapters', Adapter(id='new3'))
+ tx1.commit()
+ tx2.commit()
+ tx3.commit()
+ self.assertEqual(self.log_levels().keys(), [
+ '0', '1', '2', '3', '4', 'new1', 'new2', 'new3'
+ ])
+
+ def test_colliding_adds(self):
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx2 = proxy.open_transaction()
+ tx3 = proxy.open_transaction()
+ tx4 = proxy.open_transaction()
+ tx1.add('/adapters', Adapter(id='new1'))
+ tx2.add('/adapters', Adapter(id='new2'))
+ tx3.add('/adapters', Adapter(id='new1', version='foobar'))
+ tx4.add('/adapters', Adapter(id='new1'))
+ tx1.commit()
+ tx2.commit()
+ self.assertRaises(MergeConflictException, tx3.commit)
+ tx4.commit() # is fine since it added the same data
+ self.assertEqual(self.log_levels().keys(), [
+ '0', '1', '2', '3', '4', 'new1', 'new2'
+ ])
+
+ def test_compatible_removes(self):
+ # removes are always compatible with each other
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx2 = proxy.open_transaction()
+ tx3 = proxy.open_transaction()
+ tx1.remove('/adapters/0')
+ tx2.remove('/adapters/3')
+ tx3.remove('/adapters/0')
+ tx1.commit()
+ tx2.commit()
+ tx3.commit()
+ self.assertEqual(self.log_levels().keys(), ['1', '2', '4'])
+
+ def test_update_remove_conflict(self):
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx2 = proxy.open_transaction()
+ tx3 = proxy.open_transaction()
+ self.make_change(tx1, '/adapters/0', 'version', '42')
+ tx1.remove('/adapters/1')
+ self.make_change(tx2, '/adapters/1', 'version', '13')
+ tx3.remove('/adapters/0')
+ tx1.commit()
+ self.assertRaises(MergeConflictException, tx2.commit)
+ self.assertRaises(MergeConflictException, tx3.commit)
+ self.assertEqual(self.log_levels().keys(), ['0', '2', '3', '4'])
+
+ def test_compatible_update_remove_mix(self):
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx2 = proxy.open_transaction()
+ tx3 = proxy.open_transaction()
+ self.make_change(tx1, '/adapters/0', 'version', '42')
+ tx1.remove('/adapters/1')
+ self.make_change(tx2, '/adapters/2', 'version', '13')
+ tx3.remove('/adapters/3')
+ tx1.commit()
+ tx2.commit()
+ tx3.commit()
+ self.assertEqual(self.log_levels().keys(), ['0', '2', '4'])
+
+ def test_update_add_mix(self):
+ # at same nodes updates are always compatible with adds
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx2 = proxy.open_transaction()
+ tx3 = proxy.open_transaction()
+ self.make_change(tx1, '/adapters/0', 'config.log_level', 4)
+ self.make_change(tx1, '/adapters/2', 'config.log_level', 4)
+ tx2.add('/adapters', Adapter(id='new1'))
+ tx3.add('/adapters', Adapter(id='new2'))
+ tx1.commit()
+ tx2.commit()
+ tx3.commit()
+ self.assertEqual(self.log_levels().keys(), [
+ '0', '1', '2', '3', '4', 'new1', 'new2'
+ ])
+
+ def test_remove_add_mix(self):
+ # at same node, adds are always compatible with removes
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx2 = proxy.open_transaction()
+ tx3 = proxy.open_transaction()
+ tx1.remove('/adapters/0')
+ tx2.add('/adapters', Adapter(id='new1'))
+ tx3.add('/adapters', Adapter(id='new2'))
+ tx1.remove('/adapters/4')
+ tx1.commit()
+ tx2.commit()
+ tx3.commit()
+ self.assertEqual(self.log_levels().keys(), [
+ '1', '2', '3', 'new1', 'new2'
+ ])
def make_complex_changes(self):
@@ -542,7 +1070,38 @@
# - tx5 conflicts with tx3
return tx1, tx2, tx3, tx4, tx5
+ def test_complex_changes_seq1(self):
+ tx1, tx2, tx3, tx4, tx5 = self.make_complex_changes()
+ tx1.commit()
+ tx2.commit()
+ tx3.commit()
+ self.assertRaises(MergeConflictException, tx4.commit)
+ self.assertRaises(MergeConflictException, tx5.commit)
+ self.assertEqual(self.log_levels(), {
+ '0': 1, '1': 1, '3': 0, '4': 3, 'new1': 0, 'new2': 0
+ })
+ def test_complex_changes_seq2(self):
+ tx1, tx2, tx3, tx4, tx5 = self.make_complex_changes()
+ tx5.commit()
+ tx4.commit()
+ self.assertRaises(MergeConflictException, tx3.commit)
+ tx2.commit()
+ self.assertRaises(MergeConflictException, tx1.commit)
+ self.assertEqual(self.log_levels(), {
+ '0': 0, '1': 4, '2': 3, '3': 0, '4': 3, 'new2': 0
+ })
+
+ def test_complex_changes_seq3(self):
+ tx1, tx2, tx3, tx4, tx5 = self.make_complex_changes()
+ tx4.commit()
+ tx3.commit()
+ tx2.commit()
+ self.assertRaises(MergeConflictException, tx1.commit)
+ self.assertRaises(MergeConflictException, tx5.commit)
+ self.assertEqual(self.log_levels(), {
+ '0': 0, '1': 1, '2': 3, '3': 0, '4': 3, 'new2': 0
+ })
def test_canceling_adds(self):
proxy = self.node.get_proxy('/')
@@ -552,10 +1111,116 @@
tx.cancel()
self.assertEqual(self.log_levels().keys(), ['0', '1', '2', '3', '4'])
+ def test_nested_adds(self):
+ self.node.add('/logical_devices', LogicalDevice(id='0'))
+ self.node.add('/logical_devices', LogicalDevice(id='1'))
+ proxy0 = self.node.get_proxy('/logical_devices/0')
+ proxy1 = self.node.get_proxy('/logical_devices/1')
+ tx0 = proxy0.open_transaction()
+ tx1 = proxy1.open_transaction()
+
+ tx0.add('/ports', LogicalPort(
+ id='0', ofp_port=ofp_port(port_no=0, name='/0')))
+ tx0.add('/ports', LogicalPort(
+ id='1', ofp_port=ofp_port(port_no=1, name='/1')))
+ tx1.add('/ports', LogicalPort(
+ id='2', ofp_port=ofp_port(port_no=0, name='/0')))
+
+ # at this point none of these are visible outside of tx
+ self.assertEqual(len(proxy0.get('/', deep=1).ports), 0)
+ self.assertEqual(len(proxy1.get('/', deep=1).ports), 0)
+
+ tx0.commit()
+ self.assertEqual(len(proxy0.get('/', deep=1).ports), 2)
+ self.assertEqual(len(proxy1.get('/', deep=1).ports), 0)
+
+ tx1.commit()
+ self.assertEqual(len(proxy0.get('/', deep=1).ports), 2)
+ self.assertEqual(len(proxy1.get('/', deep=1).ports), 1)
+
+ def test_nested_removes(self):
+ self.node.add('/logical_devices', LogicalDevice(id='0'))
+ proxy0 = self.node.get_proxy('/logical_devices/0')
+
+ # add some ports to a device
+ tx0 = proxy0.open_transaction()
+ for i in xrange(10):
+ tx0.add('/ports', LogicalPort(
+ id=str(i), ofp_port=ofp_port(port_no=i, name='/{}'.format(i))))
+ # self.assertRaises(ValueError, tx0.add, '/ports', LogicalPort(id='1'))
+ tx0.commit()
+
+ # now to the removal
+
+ tx0 = proxy0.open_transaction()
+ tx0.remove('/ports/0')
+ tx0.remove('/ports/5')
+
+ tx1 = proxy0.open_transaction()
+ tx1.remove('/ports/2')
+ tx1.remove('/ports/7')
+
+ tx0.commit()
+ tx1.commit()
+
+ port_ids = [
+ p.ofp_port.port_no for p
+ in self.node.get(deep=1).logical_devices[0].ports
+ ]
+ self.assertEqual(port_ids, [1, 3, 4, 6, 8, 9])
# TODO need more tests to hammer out potential issues with transactions \
# on nested nodes
+ def test_transactions_defer_post_op_callbacks(self):
+
+ proxy = self.node.get_proxy('/')
+
+ pre_update = Mock()
+ post_update = Mock()
+ pre_add = Mock()
+ post_add = Mock()
+ pre_remove = Mock()
+ post_remove = Mock()
+
+ proxy.register_callback(CallbackType.PRE_UPDATE, pre_update)
+ proxy.register_callback(CallbackType.POST_UPDATE, post_update)
+ proxy.register_callback(CallbackType.PRE_ADD, pre_add)
+ proxy.register_callback(CallbackType.POST_ADD, post_add)
+ proxy.register_callback(CallbackType.PRE_REMOVE, pre_remove)
+ proxy.register_callback(CallbackType.POST_REMOVE, post_remove)
+
+ tx = proxy.open_transaction()
+
+ # make some changes of each type
+ v = tx.get('/')
+ v.version = '42'
+ tx.update('/', v)
+ ad = tx.get('/adapters/1')
+ tx.remove('/adapters/1')
+ ld = LogicalDevice(id='1')
+ tx.add('/logical_devices', ld)
+
+ # each pre_* should have been called exactly once, but none of the
+ # post_* callbacks have been called yet
+ pre_update.assert_called_once_with(v)
+ pre_add.assert_called_once_with(ld)
+ pre_remove.assert_called_once_with(ad)
+ post_update.assert_not_called()
+ post_add.assert_not_called()
+ post_remove.assert_not_called()
+
+ # once we commit, we shall get the other callbacks
+ tx.commit()
+ post_update.assert_called_once_with(v)
+ post_add.assert_called_once_with(ld)
+ # OperationContext(
+ # data=ld,
+ # field_name='logical_devices',
+ # child_key='1'
+ # ))
+ post_remove.assert_called_once_with(ad)
+
if __name__ == '__main__':
main()
diff --git a/tests/utests/voltha/core/config/test_persistence.py b/tests/utests/voltha/core/config/test_persistence.py
new file mode 100644
index 0000000..a9a871c
--- /dev/null
+++ b/tests/utests/voltha/core/config/test_persistence.py
@@ -0,0 +1,109 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from copy import copy
+from random import randint, seed
+from time import time
+from unittest import main, TestCase
+import json
+
+from voltha.core.config.config_root import ConfigRoot
+from voltha.protos.openflow_13_pb2 import ofp_desc
+from voltha.protos.voltha_pb2 import VolthaInstance, HealthStatus, Adapter, \
+ AdapterConfig, LogicalDevice
+
+
+n_adapters = 1000
+n_logical_nodes = 1000
+
+
+class TestPersistence(TestCase):
+
+ def pump_some_data(self, node):
+ seed(0)
+ node.update('/', VolthaInstance(
+ instance_id='1',
+ version='42',
+ log_level=1
+ ))
+ node.update('/health', HealthStatus(state=HealthStatus.OVERLOADED))
+ for i in xrange(n_adapters):
+ node.add('/adapters', Adapter(
+ id=str(i),
+ vendor='cord',
+ version=str(randint(1, 10)),
+ config=AdapterConfig(
+ log_level=0
+ )
+ ))
+ for i in xrange(n_logical_nodes):
+ node.add('/logical_devices', LogicalDevice(
+ id=str(i),
+ datapath_id=randint(1, 100000),
+ desc=ofp_desc(
+ mfr_desc='foo',
+ hw_desc='bar',
+ sw_desc='zoo'
+ )
+ ))
+
+ def test_inmemory_kv_store(self):
+ t0 = [time()]
+ def pt(msg=''):
+ t1 = time()
+ print '%20.8f ms - %s' % (1000 * (t1 - t0[0]), msg)
+ t0[0] = t1
+
+ kv_store = dict()
+
+ # create node and pump data
+ node = ConfigRoot(VolthaInstance(), kv_store=kv_store)
+ node.tag('original')
+ pt('init')
+ self.pump_some_data(node)
+ pt('pump')
+ node.tag('pumped')
+
+ # check that content of kv_store looks ok
+ size1 = len(kv_store)
+ self.assertEqual(size1, 14 + 3 * (n_adapters + n_logical_nodes))
+
+ # this should actually drop if we pune
+ node.prune_untagged()
+ pt('prunning')
+
+ size2 = len(kv_store)
+ self.assertEqual(size2, 7 + 2 * (1 + 1 + n_adapters + n_logical_nodes) + 2)
+ all_latest_data = node.get('/', deep=1)
+ pt('deep get')
+
+ # save dict so that deleting the node will not wipe it
+ latest_hash = node.latest.hash
+ kv_store = copy(kv_store)
+ pt('copy kv store')
+ del node
+ pt('delete node')
+ # self.assertEqual(size2, 1 + 2 * (1 + 1 + n_adapters + n_logical_nodes))
+
+ self.assertEqual(json.loads(kv_store['root'])['latest'], latest_hash)
+ # recreate tree from persistence
+ node = ConfigRoot.load(VolthaInstance, kv_store)
+ pt('load from kv store')
+ self.assertEqual(node.get('/', deep=1), all_latest_data)
+ pt('deep get')
+ self.assertEqual(latest_hash, node.latest.hash)
+ self.assertEqual(node.tags, ['original', 'pumped'])
+
+
+if __name__ == '__main__':
+ main()
diff --git a/voltha/core/config/config_node.py b/voltha/core/config/config_node.py
index b858aef..ab73484 100644
--- a/voltha/core/config/config_node.py
+++ b/voltha/core/config/config_node.py
@@ -303,17 +303,6 @@
# data already saved in the db (with that hash) to be erased
if rev.hash not in branch._revs:
branch._revs[rev.hash] = rev
- # Removing old revs (only keep latest)
- old_revs_hash = []
- for hash in branch._revs:
- if hash != rev.hash:
- old_revs_hash.append(hash)
- for hash in old_revs_hash:
- try:
- del branch._revs[hash]
- log.debug('removing rev from branch', rev_hash=hash)
- except Exception as e:
- log.error('delete rev error', error=e)
if not branch._latest or rev.hash != branch._latest.hash:
branch._latest = rev
diff --git a/voltha/core/config/config_root.py b/voltha/core/config/config_root.py
index 1e5016e..4b1006d 100644
--- a/voltha/core/config/config_root.py
+++ b/voltha/core/config/config_root.py
@@ -82,12 +82,6 @@
try:
self._merge_txbranch(txid)
- log.debug('branches of this node', branches=self._branches.keys())
- # Remove transaction branch if default is there
- if None in self._branches:
- self.del_txbranch(txid)
- del self._branches[txid]
-
finally:
self.execute_deferred_callbacks()