blob: 556cdd950b41a498098661b9d7bbd5601e937f41 [file] [log] [blame]
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import OrderedDict
from copy import copy
import resource
from random import randint, seed
from time import time
from unittest import main, TestCase
import gc
from google.protobuf.json_format import MessageToDict
from mock import Mock
from simplejson import dumps
from common.event_bus import EventBusClient
from voltha.core.config.config_proxy import CallbackType
from voltha.core.config.config_rev import _rev_cache
from voltha.core.config.config_root import ConfigRoot, MergeConflictException
from voltha.core.config.config_txn import ClosedTransactionError
from voltha.protos import third_party
from voltha.protos.events_pb2 import ConfigEvent, ConfigEventType
from voltha.protos.openflow_13_pb2 import ofp_port
from voltha.protos.voltha_pb2 import VolthaInstance, Adapter, HealthStatus, \
AdapterConfig, LogicalDevice, LogicalPort
def memusage():
return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
def rev_count():
return len(_rev_cache)
def probe():
return time(), memusage() / 1024. / 1024, rev_count()
def print_metrics():
print '%20f %20f %8d' % probe()
class TestConfigNodeShallow(TestCase):
def setUp(self):
self.empty = VolthaInstance()
self.other = VolthaInstance(instance_id='other')
self.node = ConfigRoot(VolthaInstance())
def test_init(self):
pass
def test_immutability(self):
self.assertEqual(self.node.latest.data, self.empty)
self.empty.instance_id = 'overwritten id'
self.assertEqual(self.node.latest.data, VolthaInstance())
def test_retrieve_latest(self):
self.assertEqual(self.node.latest.data, self.empty)
hash = self.node.latest.hash
self.assertEqual(self.node.revisions, [hash])
def test_update(self):
hash0 = self.node.latest.hash
self.node.update('/', copy(self.other))
hash1 = self.node.latest.hash
self.assertEqual(len(self.node.revisions), 2)
self.assertNotEqual(hash0, hash1)
self.assertEqual(self.node.latest.data, VolthaInstance(instance_id='other'))
def test_update_with_bad_data(self):
self.assertRaises(ValueError, self.node.update, '/', Adapter())
def test_many_simple_updates(self):
n = 1000
for i in xrange(n):
self.node.update('/', VolthaInstance(instance_id='id%d' % i))
self.node.update('/', self.other)
self.assertEqual(len(self.node.revisions), 1002)
self.assertEqual(self.node.latest.data, self.other)
def test_retrieve_by_rev_hash(self):
n = 1000
for i in xrange(n):
self.node.update('/', VolthaInstance(instance_id='id%d' % i))
self.node.update('/', self.other)
hashes = self.node.revisions
self.assertEqual(self.node[hashes[0]].data, self.empty)
self.assertEqual(self.node[hashes[10]].data, VolthaInstance(instance_id='id9'))
self.assertEqual(self.node[hashes[-1]].data, self.other)
def test_diffs(self):
self.node.update('/', self.other)
self.assertEqual(self.node.diff(self.node.latest.hash).patch, [])
hashes = self.node.revisions
self.assertEqual(self.node.diff(hashes[0]).patch, [
dict(op='replace', path='/instance_id', value='other')
])
self.assertEqual(self.node.diff(hashes[0], hashes[1]).patch, [
dict(op='replace', path='/instance_id', value='other')
])
self.assertEqual(self.node.diff(hashes[1], hashes[0]).patch, [
dict(op='replace', path='/instance_id', value='')
])
self.assertEqual(self.node.diff(hashes[1], hashes[1]).patch, [])
def test_tagging(self):
self.node.tag('original')
hash1 = self.node.latest.hash
# add a bunch of changes
for a in xrange(10):
self.node.update('/', VolthaInstance(instance_id=str(a)))
hash2 = self.node.latest.hash
# apply tag to latest
self.node.tag('latest')
# apply another tag to latest
self.node.tag('other')
# apply tag to specific rev hash
self.node.tag('yetmore', hash2)
# invalid hash
self.assertRaises(KeyError, self.node.tag, 'sometag', 'badhash')
# retrieve data based on tag
self.assertEqual(self.node.by_tag('original').hash, hash1)
self.assertEqual(self.node.by_tag('latest').hash, hash2)
self.assertEqual(self.node.by_tag('other').hash, hash2)
self.assertEqual(self.node.by_tag('yetmore').hash, hash2)
# generate diff from tags
self.assertEqual(self.node.diff_by_tag('original', 'latest').patch, [
dict(op='replace', path='/instance_id', value='9')
])
# move tags to given revision
self.node.tag('original', self.node.revisions[2])
self.node.tag('latest', self.node.revisions[9])
# add another tag
self.node.tag('another', self.node.revisions[7])
# list tags
self.assertEqual(self.node.tags,
['another', 'latest', 'original', 'other', 'yetmore'])
# delete a tag
self.node.delete_tag('another')
self.node.delete_tags('yetmore', 'other')
self.assertEqual(self.node.tags, ['latest', 'original'])
# prune untagged revisions from revision list
self.node.prune_untagged()
self.assertEqual(len(self.node.revisions), 3) # latest is always kept
# retrieve and compare working tagged revs
self.assertEqual(self.node.diff_by_tag('original', 'latest').patch, [
dict(op='replace', path='/instance_id', value='8')
])
class DeepTestsBase(TestCase):
"""Shared test class for test using a simple node tree"""
def setUp(self):
gc.collect()
_rev_cache.clear()
self.health = HealthStatus(state=HealthStatus.DYING)
self.base_shallow = VolthaInstance(instance_id='1')
self.base_deep = copy(self.base_shallow)
self.base_deep.health.state = HealthStatus.DYING # = self.health
for i in xrange(5):
self.base_deep.adapters.add().MergeFrom(Adapter(
id=str(i),
config=AdapterConfig(
log_level=3
)
))
self.node = ConfigRoot(self.base_deep)
self.hash_orig = self.node.latest.hash
def tearDown(self):
del self.node
class TestConfigNodeDeep(DeepTestsBase):
def test_init(self):
pass
def test_reject_duplicate_keys(self):
data = VolthaInstance(
instance_id='42', adapters=[Adapter(id='same') for _ in xrange(5)])
self.assertRaises(ValueError, ConfigRoot, data)
def test_shallow_get(self):
self.assertEqual(self.node.latest.data, self.base_shallow)
self.assertEqual(self.node.get(), self.base_shallow)
self.assertEqual(self.node.get(hash=self.hash_orig), self.base_shallow)
def test_deep_get(self):
self.assertEqual(self.node.get(deep=True), self.base_deep)
def test_top_level_update(self):
# test that top-level update retains children
self.node.update('/', VolthaInstance(version='1.2.3'))
hash_new = self.node.latest.hash
self.assertNotEqual(self.hash_orig, hash_new)
self.assertEqual(self.node.get(
hash=self.hash_orig, deep=1), self.base_deep)
latest = self.node.get(deep=1)
self.assertNotEqual(latest, self.base_deep)
self.assertEqual(len(latest.adapters), 5)
self.assertEqual(len(latest.logical_devices), 0)
self.assertEqual(latest.version, '1.2.3')
def test_path_based_get_access(self):
self.assertEqual(self.node.get(path='/'), self.node.get())
self.assertEqual(self.node.get(path='/health'), self.health)
def test_path_list_retrieval(self):
adapters = self.node.get(path='/adapters')
self.assertEqual(len(adapters), 5)
self.assertEqual(adapters[2].id, '2')
def test_indexing_into_containers(self):
adapter = self.node.get(path='/adapters/3')
self.assertEqual(adapter.id, '3')
def test_deep_update_non_container(self):
self.node.update('/health', HealthStatus(state=HealthStatus.HEALTHY))
# root hash is now different
hash_new = self.node.latest.hash
self.assertNotEqual(self.hash_orig, hash_new)
# original tree is still intact
orig = self.node.get(hash=self.hash_orig, deep=1)
self.assertEqual(orig, self.base_deep)
self.assertEqual(orig.health.state, HealthStatus.DYING)
# but the latest contains the change
new = self.node.get(deep=1)
self.assertNotEqual(new, orig)
self.assertEqual(new.health.state, HealthStatus.HEALTHY)
def test_deep_update_container(self):
self.node.update('/adapters/0', Adapter(id='0', version='new'))
# root hash is now different
hash_new = self.node.latest.hash
self.assertNotEqual(self.hash_orig, hash_new)
# original tree is still intact
orig = self.node.get(hash=self.hash_orig, deep=1)
self.assertEqual(orig, self.base_deep)
self.assertEqual(orig.adapters[0].id, '0')
# but the new tree contains the change
new = self.node.get(deep=1)
self.assertNotEqual(new, orig)
self.assertEqual(new.adapters[0].version, 'new')
def test_update_handle_invalid_paths(self):
self.assertRaises(KeyError, self.node.update, 'foo', None)
self.assertRaises(KeyError, self.node.update, '/foo', None)
self.assertRaises(KeyError, self.node.update, '/health/foo', None)
self.assertRaises(ValueError, self.node.update, '/adapters', None)
self.assertRaises(KeyError, self.node.update, '/adapters/foo', None)
self.assertRaises(KeyError, self.node.update, '/adapters/1/foo', None)
def test_update_handle_invalid_type(self):
self.assertRaises(ValueError, self.node.update, '/', Adapter())
self.assertRaises(ValueError, self.node.update, '/health', Adapter())
self.assertRaises(ValueError, self.node.update, '/adapters/1', VolthaInstance())
def test_update_handle_key_change_attempt(self):
self.assertRaises(
ValueError, self.node.update, '/adapters/1', Adapter(id='changed'))
def test_add_node(self):
new = Adapter(id='new')
self.node.add('/adapters', new)
self.assertNotEqual(self.node.latest.hash, self.hash_orig)
self.assertEqual(len(self.node.get('/adapters')), 6)
self.assertEqual(
len(self.node.get('/adapters', hash=self.hash_orig)), 5)
self.assertEqual(self.node.get('/adapters/new'), new)
def test_add_handle_invalid_cases(self):
# invalid paths
self.assertRaises(KeyError, self.node.add, 'foo', None)
self.assertRaises(KeyError, self.node.add, '/foo', None)
self.assertRaises(KeyError, self.node.add, '/adapters/foo', None)
# cannot add to non-container nodes
self.assertRaises(ValueError, self.node.add, '/health', None)
self.assertRaises(ValueError, self.node.add, '/adapters/1', None)
# cannot add to container data with duplicate key
self.assertRaises(
ValueError, self.node.add, '/adapters', Adapter(id='1'))
def test_remove_node(self):
self.node.remove('/adapters/3')
self.assertNotEqual(self.node.latest.hash, self.hash_orig)
self.assertEqual(len(self.node.get('/adapters')), 4)
self.assertEqual(
len(self.node.get('/adapters', hash=self.hash_orig)), 5)
self.assertRaises(KeyError, self.node.get, '/adapters/3')
def test_remove_handle_invalid_cases(self):
# invalid paths
self.assertRaises(KeyError, self.node.remove, 'foo')
self.assertRaises(KeyError, self.node.remove, '/foo')
self.assertRaises(KeyError, self.node.remove, '/adapters/foo')
self.assertRaises(KeyError, self.node.remove, '/adapters/1/id')
# cannot add to non-container nodes
self.assertRaises(ValueError, self.node.remove, '/health')
def test_pruning_after_shallow_change(self):
self.node.update('/', VolthaInstance(version='10.1'))
# sanity check
self.assertEqual(len(self.node.revisions), 2)
# prune
self.node.prune_untagged()
self.assertEqual(len(self.node.revisions), 1)
# we can nevertheless access the whole tree
new = self.node.get('/', deep=1)
self.assertEqual(new.adapters, self.base_deep.adapters)
self.assertEqual(new.version, '10.1')
def test_pruning_after_deep_change(self):
self.node.update('/adapters/3', Adapter(id='3', version='changed'))
# sanity check
self.assertEqual(len(self.node.revisions), 2)
# prune
self.node.prune_untagged()
self.assertEqual(len(self.node.revisions), 1)
# we can nevertheless access the whole tree
new = self.node.get('/', deep=1)
self.assertEqual(len(new.adapters), 5)
self.assertEqual(new.adapters[2], self.base_deep.adapters[2])
self.assertEqual(new.adapters[3].version, 'changed')
class TestPruningPerformance(DeepTestsBase):
def test_repeated_prunning_keeps_memory_stable(self):
# The auto-pruning feature of the config system is that leaf nodes
# in the config tree that are no longer in use are pruned from memory,
# once they revision is removed from root using the .prune_untagged()
# method. This test is to verify that.
n = 1000
seed(0) # makes things consistently random
# this should be the number of nodes in the VolthaInstance tree
self.assertLess(rev_count(), 20)
print; print_metrics()
def mk_change():
key = str(randint(0, 4))
path = '/adapters/' + key
adapter = self.node.get(path)
adapter.version = 'v{}'.format(randint(0, 100000))
self.node.update(path, adapter)
# first we perform many changes without pruning
for i in xrange(n):
mk_change()
# at this point we shall have more than 2*n revs laying around
self.assertGreater(rev_count(), 2 * n)
print_metrics()
# prune now
self.node.prune_untagged()
# at this point the rev count shall fall back to the original
self.assertLess(rev_count(), 15)
print_metrics()
# no make an additional set of modifications while constantly pruning
for i in xrange(n):
mk_change()
self.node.prune_untagged()
# the rev count should not have increased
self.assertLess(rev_count(), 15)
print_metrics()
def test_churn_efficiency(self):
# Two config revisions that hash to the same hash value also share the
# same in-memory object. So if the same config node goes through churn
# (like flip-flopping fields), we don't eat up memory unnecessarily.
# This test is to verify that behavior.
n = 1000
modulo = 2
self.assertEqual(rev_count(), 14)
print_metrics()
def mk_change(seq):
# make change module of the sequence number so we periodically
# return to the same config
path = '/adapters/3'
adapter = self.node.get(path)
adapter.version = 'v{}'.format(seq % modulo)
self.node.update(path, adapter)
# make n changes back and forth
for i in xrange(n):
_tmp_rc = rev_count()
mk_change(i)
_tmp_rc = rev_count()
# verify that the node count did not increase significantly, yet we
# have access to all ditinct revisions
self.assertEqual(rev_count(), 20)
print_metrics()
def test_strict_read_only(self):
# it shall not be possible to change a read-only field
self.assertRaises(ValueError, self.node.update,
'/', VolthaInstance(version='foo'), strict=True)
self.assertRaises(ValueError, self.node.update,
'/adapters/1', Adapter(version='foo'), strict=True)
class TestNodeOwnershipAndHooks(DeepTestsBase):
def test_init(self):
pass
def test_passive_ownership(self):
# grab a proxy for a given node
proxy = self.node.get_proxy('/health')
# able to read the value directly using this proxy
self.assertEqual(proxy.get(), HealthStatus(state=HealthStatus.DYING))
# able to update value directly using this proxy, but the whole tree
# updates
proxy.update('/', HealthStatus(state=HealthStatus.HEALTHY))
self.assertEqual(proxy.get().state, HealthStatus.HEALTHY)
self.assertNotEqual(self.node.latest.hash, self.hash_orig)
# access constraints are still enforced
self.assertRaises(
ValueError, proxy.update,
'/', HealthStatus(state=HealthStatus.OVERLOADED), strict=1)
def test_exclusivity(self):
proxy = self.node.get_proxy('/adapters/1', exclusive=True)
self.assertRaises(ValueError, self.node.get_proxy, '/adapters/1')
def test_get_hook(self):
proxy = self.node.get_proxy('/health')
# getting health without callback just returns what's stored in node
self.assertEqual(proxy.get().state, HealthStatus.DYING)
# register callback
def get_health_callback(msg):
msg.state = HealthStatus.OVERLOADED
return msg
proxy.register_callback(CallbackType.GET, get_health_callback)
# once registered, callback can touch up object
self.assertEqual(proxy.get().state, HealthStatus.OVERLOADED)
def test_pre_update_hook(self):
proxy = self.node.get_proxy('/adapters/1')
# before hook, change is allowed
adapter = proxy.get()
adapter.version = 'foo'
proxy.update('/', adapter)
# and for sanity, check if update made it through
self.assertEqual(self.node.get('/adapters/1').version, 'foo')
# regsiter hook that rejects all changes
def bully(msg):
raise RuntimeError('bully')
proxy.register_callback(CallbackType.PRE_UPDATE, bully)
# test that rejection applies
adapter.version = 'bar'
self.assertRaises(RuntimeError, proxy.update, '/', adapter)
self.assertRaises(RuntimeError, self.node.update, '/adapters/1', adapter)
def test_post_update_hook(self):
proxy = self.node.get_proxy('/adapters/1')
callback = Mock()
proxy.register_callback(CallbackType.POST_UPDATE, callback,
'zizi', 42, x=1, y='baz')
data = Adapter(id='1', version='zoo')
proxy.update('/', data)
callback.assert_called_once_with(data, 'zizi', 42, x=1, y='baz')
def test_pre_and_post_add_hooks(self):
proxy = self.node.get_proxy('/')
pre_callback = Mock()
post_callback = Mock()
proxy.register_callback(CallbackType.PRE_ADD, pre_callback)
proxy.register_callback(CallbackType.POST_ADD, post_callback)
new_adapter = Adapter(id='99', version='12.2', vendor='ace')
proxy.add('/adapters', new_adapter)
pre_callback.assert_called_with(new_adapter)
post_callback.assert_called_with(new_adapter)
def test_pre_and_post_remove_hooks(self):
proxy = self.node.get_proxy('/')
pre_callback = Mock()
post_callback = Mock()
proxy.register_callback(CallbackType.PRE_REMOVE, pre_callback)
proxy.register_callback(CallbackType.POST_REMOVE, post_callback)
adapter = proxy.get('/adapters/1') # so that we can verify callback
proxy.remove('/adapters/1')
pre_callback.assert_called_with(adapter)
post_callback.assert_called_with(adapter)
class TestEventLogic(DeepTestsBase):
def setUp(self):
super(TestEventLogic, self).setUp()
self.ebc = EventBusClient()
self.event_mock = Mock()
self.ebc.subscribe('model-change-events', self.event_mock)
def test_add_event(self):
data = Adapter(id='10', version='zoo')
self.node.add('/adapters', data)
event = ConfigEvent(
type=ConfigEventType.add,
hash=self.node.latest.hash,
data=dumps(MessageToDict(data, True, True))
)
self.event_mock.assert_called_once_with('model-change-events', event)
def test_remove_event(self):
data = Adapter(
id='1',
config=AdapterConfig(
log_level=3
)
)
self.node.remove('/adapters/1')
event = ConfigEvent(
type=ConfigEventType.remove,
hash=self.node.latest.hash,
data=dumps(MessageToDict(data, True, True))
)
self.event_mock.assert_called_once_with('model-change-events', event)
class TestTransactionalLogic(DeepTestsBase):
def make_change(self, tx, path, attr_name, new_value):
data = o = tx.get(path)
rest = attr_name
while 1:
subfield, _, rest = rest.partition('.')
if rest:
o = getattr(o, subfield)
attr_name = rest
else:
setattr(o, attr_name, new_value)
break
tx.update(path, data)
def check_no_tx_branches(self):
visited = set()
def check_node(n):
if n not in visited:
self.assertEqual(n._branches.keys(), [None])
for rev in n._branches[None]._revs.itervalues():
for children in rev._children.itervalues():
if isinstance(children, OrderedDict):
children = children.itervalues()
for child_rev in children:
child_node = child_rev.node
check_node(child_node)
visited.add(n)
check_node(self.node)
def log_levels(self):
return OrderedDict(
(a.id, a.config.log_level)
for a in self.node.get('/adapters', deep=1))
def tearDown(self):
self.check_no_tx_branches()
super(TestTransactionalLogic, self).tearDown()
def test_transaction_isolation(self):
"""
Test that changes made in a transaction mode are not visible to others
"""
proxy = self.node.get_proxy('/')
# look under the hood to verify that branches are added
# recursively
_latest_root_rev = self.node._branches[None].latest
adapter_node = _latest_root_rev._children['adapters'][2].node
self.assertEqual(len(self.node._branches.keys()), 1)
self.assertEqual(len(adapter_node._branches.keys()), 1)
tx = proxy.open_transaction()
self.assertEqual(len(self.node._branches.keys()), 2)
self.assertEqual(len(adapter_node._branches.keys()), 1)
path = '/adapters/2'
self.make_change(tx, path, 'config.log_level', 0)
self.assertEqual(len(self.node._branches.keys()), 2)
self.assertEqual(len(adapter_node._branches.keys()), 2)
# verify that reading from the transaction exposes the change
self.assertEqual(tx.get(path).config.log_level, 0)
# but that reading from the proxy or directly from tree does not
self.assertEqual(self.node.latest.hash, self.hash_orig)
self.assertEqual(proxy.get(path).config.log_level, 3)
self.assertEqual(self.node.get(path).config.log_level, 3)
tx.cancel()
def test_cannot_reuse_tx(self):
proxy = self.node.get_proxy('/')
tx = proxy.open_transaction()
tx.cancel()
self.assertRaises(ClosedTransactionError, tx.get, '/')
self.assertRaises(ClosedTransactionError, tx.add, '/', None)
self.assertRaises(ClosedTransactionError, tx.remove, '/')
def test_multiple_concurrent_transactions(self):
"""
Test that two transactions can make independent changes, without
affecting each other, until committed.
"""
proxy1 = self.node.get_proxy('/')
proxy2 = self.node.get_proxy('/')
tx1 = proxy1.open_transaction()
tx2 = proxy1.open_transaction()
tx3 = proxy2.open_transaction()
path = '/adapters/2'
self.make_change(tx1, path, 'config.log_level', 0)
# the other transaction does not see the change
self.assertEqual(tx2.get(path).config.log_level, 3)
self.assertEqual(tx3.get(path).config.log_level, 3)
self.assertEqual(proxy1.get(path).config.log_level, 3)
self.assertEqual(proxy2.get(path).config.log_level, 3)
# we can attempt to make change in other txs
self.make_change(tx2, path, 'config.log_level', 1)
self.make_change(tx3, path, 'config.log_level', 2)
# each can see its own tree, but no one else can see theirs
self.assertEqual(tx1.get(path).config.log_level, 0)
self.assertEqual(tx2.get(path).config.log_level, 1)
self.assertEqual(tx3.get(path).config.log_level, 2)
self.assertEqual(proxy1.get(path).config.log_level, 3)
self.assertEqual(proxy2.get(path).config.log_level, 3)
self.assertEqual(self.node.latest.hash, self.hash_orig)
tx1.cancel()
tx2.cancel()
tx3.cancel()
def test_transaction_canceling(self):
"""After abort, transaction is no longer stored"""
proxy = self.node.get_proxy('/')
# look under the hood to verify that branches are added
# recursively
_latest_root_rev = self.node._branches[None].latest
adapter_node = _latest_root_rev._children['adapters'][2].node
self.assertEqual(len(self.node._branches.keys()), 1)
self.assertEqual(len(adapter_node._branches.keys()), 1)
tx = proxy.open_transaction()
self.assertEqual(len(self.node._branches.keys()), 2)
self.assertEqual(len(adapter_node._branches.keys()), 1)
self.make_change(tx, '/adapters/2', 'config.log_level', 4)
self.assertEqual(len(self.node._branches.keys()), 2)
self.assertEqual(len(adapter_node._branches.keys()), 2)
del tx
self.assertEqual(len(self.node._branches.keys()), 1)
self.assertEqual(len(adapter_node._branches.keys()), 1)
def test_transaction_explitic_canceling(self):
"""After abort, transaction is no longer stored"""
proxy = self.node.get_proxy('/')
# look under the hood to verify that branches are added
# recursively
_latest_root_rev = self.node._branches[None].latest
adapter_node = _latest_root_rev._children['adapters'][2].node
self.assertEqual(len(self.node._branches.keys()), 1)
self.assertEqual(len(adapter_node._branches.keys()), 1)
tx = proxy.open_transaction()
self.assertEqual(len(self.node._branches.keys()), 2)
self.assertEqual(len(adapter_node._branches.keys()), 1)
self.make_change(tx, '/adapters/2', 'config.log_level', 4)
self.assertEqual(len(self.node._branches.keys()), 2)
self.assertEqual(len(adapter_node._branches.keys()), 2)
tx.cancel()
self.assertEqual(len(self.node._branches.keys()), 1)
self.assertEqual(len(adapter_node._branches.keys()), 1)
def test_transaction_commit(self):
"""Once committed, changes become latest"""
proxy = self.node.get_proxy('/')
_latest_root_rev = self.node._branches[None].latest
adapter_node = _latest_root_rev._children['adapters'][2].node
tx = proxy.open_transaction()
# publicly visible value before change
path = '/adapters/2'
self.assertEqual(proxy.get(path).config.log_level, 3)
self.assertEqual(self.node.get(path).config.log_level, 3)
# make the change, but not commit yet
self.make_change(tx, path, 'config.log_level', 4)
self.assertEqual(proxy.get(path).config.log_level, 3)
self.assertEqual(self.node.get(path).config.log_level, 3)
# commit the change
tx.commit()
self.assertNotEqual(self.node.latest.hash, self.hash_orig)
self.assertEqual(proxy.get('/adapters/2').config.log_level, 4)
self.assertEqual(len(self.node._branches.keys()), 1)
self.assertEqual(len(adapter_node._branches.keys()), 1)
self.assertEqual(proxy.get(path).config.log_level, 4)
self.assertEqual(self.node.get(path).config.log_level, 4)
def test_collision_detection(self):
"""Correctly detect transaction collision and abort the 2nd tx"""
proxy = self.node.get_proxy('/')
tx1 = proxy.open_transaction()
path = '/adapters/2'
self.make_change(tx1, path, 'config.log_level', 0)
# make another tx before tx1 is committed
tx2 = proxy.open_transaction()
print tx2._txid
self.make_change(tx2, path, 'config.log_level', 4)
# commit first
print tx1._txid
tx1.commit()
# committing 2nd one should fail
self.assertRaises(MergeConflictException, tx2.commit)
self.check_no_tx_branches()
def test_nonconfliciting_changes(self):
proxy = self.node.get_proxy('/')
tx1 = proxy.open_transaction()
tx2 = proxy.open_transaction()
self.make_change(tx1, '/adapters/1', 'config.log_level', 1)
self.make_change(tx2, '/adapters/2', 'config.log_level', 2)
tx1.commit()
tx2.commit()
self.assertEqual(self.log_levels(), {
'0': 3, '1': 1, '2': 2, '3': 3, '4': 3
})
def test_additive_changes(self):
proxy = self.node.get_proxy('/')
tx1 = proxy.open_transaction()
tx1.add('/adapters', Adapter(id='new'))
tx1.add('/adapters', Adapter(id='new2'))
self.assertEqual(len(proxy.get('/adapters')), 5)
self.assertEqual(len(self.node.get('/adapters')), 5)
self.assertEqual(len(tx1.get('/adapters')), 7)
tx1.commit()
self.assertEqual(len(proxy.get('/adapters')), 7)
self.assertEqual(len(self.node.get('/adapters')), 7)
self.assertEqual(self.log_levels().keys(),
['0', '1', '2', '3', '4', 'new', 'new2'])
def test_remove_changes(self):
proxy = self.node.get_proxy('/')
tx1 = proxy.open_transaction()
tx1.remove('/adapters/2')
tx1.remove('/adapters/4')
self.assertEqual(len(proxy.get('/adapters')), 5)
self.assertEqual(len(self.node.get('/adapters')), 5)
self.assertEqual(len(tx1.get('/adapters')), 3)
tx1.commit()
self.assertEqual(len(proxy.get('/adapters')), 3)
self.assertEqual(len(self.node.get('/adapters')), 3)
self.assertEqual(self.log_levels().keys(), ['0', '1', '3'])
def test_mixed_add_remove_update_changes(self):
proxy = self.node.get_proxy('/')
tx1 = proxy.open_transaction()
self.make_change(tx1, '/adapters/2', 'config.log_level', 2)
tx1.remove('/adapters/0')
tx1.add('/adapters', Adapter(id='new'))
tx1.remove('/adapters/4')
tx1.add('/adapters', Adapter(id='new2'))
tx1.add('/adapters', Adapter(id='new3'))
self.assertEqual(len(proxy.get('/adapters')), 5)
self.assertEqual(len(self.node.get('/adapters')), 5)
self.assertEqual(len(tx1.get('/adapters')), 6)
tx1.commit()
self.assertEqual(len(proxy.get('/adapters')), 6)
self.assertEqual(len(self.node.get('/adapters')), 6)
self.assertEqual(self.log_levels(), {
'1': 3, '2': 2, '3': 3, 'new': 0, 'new2': 0, 'new3': 0
})
def test_compatible_updates(self):
proxy = self.node.get_proxy('/')
tx1 = proxy.open_transaction()
tx2 = proxy.open_transaction()
tx3 = proxy.open_transaction()
tx4 = proxy.open_transaction()
tx5 = proxy.open_transaction()
tx1.update('/health', HealthStatus(state=HealthStatus.OVERLOADED))
self.make_change(tx2, '/adapters/1', 'version', '42')
self.make_change(tx3, '/adapters/2', 'config.log_level', 2)
self.make_change(tx4, '/adapters/1', 'version', '42')
self.make_change(tx5, '/adapters/1', 'version', '422')
tx1.commit()
tx2.commit()
tx3.commit()
tx4.commit()
self.assertRaises(MergeConflictException, tx5.commit)
# verify outcome
self.assertEqual(self.node.get('/health').state, 1)
self.assertEqual(self.node.get('/', deep=1).adapters[1].version, '42')
self.assertEqual(self.log_levels(), {
'0': 3, '1': 3, '2': 2, '3': 3, '4': 3
})
def test_conflciting_updates(self):
proxy = self.node.get_proxy('/')
tx1 = proxy.open_transaction()
tx2 = proxy.open_transaction()
tx3 = proxy.open_transaction()
tx1.update('/health', HealthStatus(state=HealthStatus.OVERLOADED))
self.make_change(tx2, '/adapters/1', 'version', '42')
self.make_change(tx3, '/adapters/1', 'config.log_level', 2)
tx1.commit()
tx2.commit()
self.assertRaises(MergeConflictException, tx3.commit)
# verify outcome
self.assertEqual(self.node.get('/health').state, 1)
self.assertEqual(self.node.get('/', deep=1).adapters[1].version, '42')
self.assertEqual(self.log_levels(), {
'0': 3, '1': 3, '2': 3, '3': 3, '4': 3
})
def test_compatible_adds(self):
proxy = self.node.get_proxy('/')
tx1 = proxy.open_transaction()
tx2 = proxy.open_transaction()
tx3 = proxy.open_transaction()
tx1.add('/adapters', Adapter(id='new1'))
tx2.add('/adapters', Adapter(id='new2'))
tx3.add('/adapters', Adapter(id='new3'))
tx1.commit()
tx2.commit()
tx3.commit()
self.assertEqual(self.log_levels().keys(), [
'0', '1', '2', '3', '4', 'new1', 'new2', 'new3'
])
def test_colliding_adds(self):
proxy = self.node.get_proxy('/')
tx1 = proxy.open_transaction()
tx2 = proxy.open_transaction()
tx3 = proxy.open_transaction()
tx4 = proxy.open_transaction()
tx1.add('/adapters', Adapter(id='new1'))
tx2.add('/adapters', Adapter(id='new2'))
tx3.add('/adapters', Adapter(id='new1', version='foobar'))
tx4.add('/adapters', Adapter(id='new1'))
tx1.commit()
tx2.commit()
self.assertRaises(MergeConflictException, tx3.commit)
tx4.commit() # is fine since it added the same data
self.assertEqual(self.log_levels().keys(), [
'0', '1', '2', '3', '4', 'new1', 'new2'
])
def test_compatible_removes(self):
# removes are always compatible with each other
proxy = self.node.get_proxy('/')
tx1 = proxy.open_transaction()
tx2 = proxy.open_transaction()
tx3 = proxy.open_transaction()
tx1.remove('/adapters/0')
tx2.remove('/adapters/3')
tx3.remove('/adapters/0')
tx1.commit()
tx2.commit()
tx3.commit()
self.assertEqual(self.log_levels().keys(), ['1', '2', '4'])
def test_update_remove_conflict(self):
proxy = self.node.get_proxy('/')
tx1 = proxy.open_transaction()
tx2 = proxy.open_transaction()
tx3 = proxy.open_transaction()
self.make_change(tx1, '/adapters/0', 'version', '42')
tx1.remove('/adapters/1')
self.make_change(tx2, '/adapters/1', 'version', '13')
tx3.remove('/adapters/0')
tx1.commit()
self.assertRaises(MergeConflictException, tx2.commit)
self.assertRaises(MergeConflictException, tx3.commit)
self.assertEqual(self.log_levels().keys(), ['0', '2', '3', '4'])
def test_compatible_update_remove_mix(self):
proxy = self.node.get_proxy('/')
tx1 = proxy.open_transaction()
tx2 = proxy.open_transaction()
tx3 = proxy.open_transaction()
self.make_change(tx1, '/adapters/0', 'version', '42')
tx1.remove('/adapters/1')
self.make_change(tx2, '/adapters/2', 'version', '13')
tx3.remove('/adapters/3')
tx1.commit()
tx2.commit()
tx3.commit()
self.assertEqual(self.log_levels().keys(), ['0', '2', '4'])
def test_update_add_mix(self):
# at same nodes updates are always compatible with adds
proxy = self.node.get_proxy('/')
tx1 = proxy.open_transaction()
tx2 = proxy.open_transaction()
tx3 = proxy.open_transaction()
self.make_change(tx1, '/adapters/0', 'config.log_level', 4)
self.make_change(tx1, '/adapters/2', 'config.log_level', 4)
tx2.add('/adapters', Adapter(id='new1'))
tx3.add('/adapters', Adapter(id='new2'))
tx1.commit()
tx2.commit()
tx3.commit()
self.assertEqual(self.log_levels().keys(), [
'0', '1', '2', '3', '4', 'new1', 'new2'
])
def test_remove_add_mix(self):
# at same node, adds are always compatible with removes
proxy = self.node.get_proxy('/')
tx1 = proxy.open_transaction()
tx2 = proxy.open_transaction()
tx3 = proxy.open_transaction()
tx1.remove('/adapters/0')
tx2.add('/adapters', Adapter(id='new1'))
tx3.add('/adapters', Adapter(id='new2'))
tx1.remove('/adapters/4')
tx1.commit()
tx2.commit()
tx3.commit()
self.assertEqual(self.log_levels().keys(), [
'1', '2', '3', 'new1', 'new2'
])
def make_complex_changes(self):
# Plan:
# Have two root proxies and two proxies on specific adapters
# Make several transactions, including conflicting ones
# Check as much as possible in terms of expected operations
proxy1 = self.node.get_proxy('/')
proxy2 = self.node.get_proxy('/')
proxy3 = self.node.get_proxy('/adapters/0')
proxy4 = self.node.get_proxy('/adapters/1')
tx1 = proxy1.open_transaction()
tx2 = proxy1.open_transaction()
tx3 = proxy2.open_transaction()
tx4 = proxy3.open_transaction()
tx5 = proxy4.open_transaction()
# Make multiple changes via tx1
self.make_change(tx1, '/adapters/0', 'config.log_level', 1)
tx1.add('/adapters', Adapter(id='new1'))
tx1.remove('/adapters/2')
# Make a non-conflicting change from tx2
self.make_change(tx2, '/adapters/3', 'config.log_level', 0)
# Make some conflicting changes via tx3 now
self.make_change(tx3, '/adapters/1', 'config.log_level', 1)
# Make some changes via leaf proxies
my_adapter = tx4.get('/')
my_adapter.version = 'zulu'
my_adapter.config.log_level = 0
tx4.update('/', my_adapter)
# Make some changes via leaf proxies
my_adapter = tx5.get('/')
my_adapter.version = 'brand new'
my_adapter.config.log_level = 4
tx5.update('/', my_adapter)
# Make some more changes on tx2
tx2.add('/adapters', Adapter(id='new2'))
# Conflicts:
# - tx4 conflicts with tx0
# - tx5 conflicts with tx3
return tx1, tx2, tx3, tx4, tx5
def test_complex_changes_seq1(self):
tx1, tx2, tx3, tx4, tx5 = self.make_complex_changes()
tx1.commit()
tx2.commit()
tx3.commit()
self.assertRaises(MergeConflictException, tx4.commit)
self.assertRaises(MergeConflictException, tx5.commit)
self.assertEqual(self.log_levels(), {
'0': 1, '1': 1, '3': 0, '4': 3, 'new1': 0, 'new2': 0
})
def test_complex_changes_seq2(self):
tx1, tx2, tx3, tx4, tx5 = self.make_complex_changes()
tx5.commit()
tx4.commit()
self.assertRaises(MergeConflictException, tx3.commit)
tx2.commit()
self.assertRaises(MergeConflictException, tx1.commit)
self.assertEqual(self.log_levels(), {
'0': 0, '1': 4, '2': 3, '3': 0, '4': 3, 'new2': 0
})
def test_complex_changes_seq3(self):
tx1, tx2, tx3, tx4, tx5 = self.make_complex_changes()
tx4.commit()
tx3.commit()
tx2.commit()
self.assertRaises(MergeConflictException, tx1.commit)
self.assertRaises(MergeConflictException, tx5.commit)
self.assertEqual(self.log_levels(), {
'0': 0, '1': 1, '2': 3, '3': 0, '4': 3, 'new2': 0
})
def test_canceling_adds(self):
proxy = self.node.get_proxy('/')
tx = proxy.open_transaction()
tx.add('/adapters', Adapter(id='new'))
tx.add('/adapters', Adapter(id='new2'))
tx.cancel()
self.assertEqual(self.log_levels().keys(), ['0', '1', '2', '3', '4'])
def test_nested_adds(self):
self.node.add('/logical_devices', LogicalDevice(id='0'))
self.node.add('/logical_devices', LogicalDevice(id='1'))
proxy0 = self.node.get_proxy('/logical_devices/0')
proxy1 = self.node.get_proxy('/logical_devices/1')
tx0 = proxy0.open_transaction()
tx1 = proxy1.open_transaction()
tx0.add('/ports', LogicalPort(
id='0', ofp_port=ofp_port(port_no=0, name='/0')))
tx0.add('/ports', LogicalPort(
id='1', ofp_port=ofp_port(port_no=1, name='/1')))
tx1.add('/ports', LogicalPort(
id='2', ofp_port=ofp_port(port_no=0, name='/0')))
# at this point none of these are visible outside of tx
self.assertEqual(len(proxy0.get('/', deep=1).ports), 0)
self.assertEqual(len(proxy1.get('/', deep=1).ports), 0)
tx0.commit()
self.assertEqual(len(proxy0.get('/', deep=1).ports), 2)
self.assertEqual(len(proxy1.get('/', deep=1).ports), 0)
tx1.commit()
self.assertEqual(len(proxy0.get('/', deep=1).ports), 2)
self.assertEqual(len(proxy1.get('/', deep=1).ports), 1)
def test_nested_removes(self):
self.node.add('/logical_devices', LogicalDevice(id='0'))
proxy0 = self.node.get_proxy('/logical_devices/0')
# add some ports to a device
tx0 = proxy0.open_transaction()
for i in xrange(10):
tx0.add('/ports', LogicalPort(
id=str(i), ofp_port=ofp_port(port_no=i, name='/{}'.format(i))))
# self.assertRaises(ValueError, tx0.add, '/ports', LogicalPort(id='1'))
tx0.commit()
# now to the removal
tx0 = proxy0.open_transaction()
tx0.remove('/ports/0')
tx0.remove('/ports/5')
tx1 = proxy0.open_transaction()
tx1.remove('/ports/2')
tx1.remove('/ports/7')
tx0.commit()
tx1.commit()
port_ids = [
p.ofp_port.port_no for p
in self.node.get(deep=1).logical_devices[0].ports
]
self.assertEqual(port_ids, [1, 3, 4, 6, 8, 9])
# TODO need more tests to hammer out potential issues with transactions \
# on nested nodes
def test_transactions_defer_post_op_callbacks(self):
proxy = self.node.get_proxy('/')
pre_update = Mock()
post_update = Mock()
pre_add = Mock()
post_add = Mock()
pre_remove = Mock()
post_remove = Mock()
proxy.register_callback(CallbackType.PRE_UPDATE, pre_update)
proxy.register_callback(CallbackType.POST_UPDATE, post_update)
proxy.register_callback(CallbackType.PRE_ADD, pre_add)
proxy.register_callback(CallbackType.POST_ADD, post_add)
proxy.register_callback(CallbackType.PRE_REMOVE, pre_remove)
proxy.register_callback(CallbackType.POST_REMOVE, post_remove)
tx = proxy.open_transaction()
# make some changes of each type
v = tx.get('/')
v.version = '42'
tx.update('/', v)
ad = tx.get('/adapters/1')
tx.remove('/adapters/1')
ld = LogicalDevice(id='1')
tx.add('/logical_devices', ld)
# each pre_* should have been called exactly once, but none of the
# post_* callbacks have been called yet
pre_update.assert_called_once_with(v)
pre_add.assert_called_once_with(ld)
pre_remove.assert_called_once_with(ad)
post_update.assert_not_called()
post_add.assert_not_called()
post_remove.assert_not_called()
# once we commit, we shall get the other callbacks
tx.commit()
post_update.assert_called_once_with(v)
post_add.assert_called_once_with(ld)
# OperationContext(
# data=ld,
# field_name='logical_devices',
# child_key='1'
# ))
post_remove.assert_called_once_with(ad)
if __name__ == '__main__':
main()