# blob: 08ecf877a0352efc9cb66cb983e12546ca530c04 [file] [log] [blame]
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import OrderedDict
from copy import copy
import resource
from random import randint, seed
from time import time
from unittest import main, TestCase
import gc
from google.protobuf.json_format import MessageToDict
from mock import Mock
from simplejson import dumps
from common.event_bus import EventBusClient
from voltha.core.config.config_proxy import CallbackType
from voltha.core.config.config_rev import _rev_cache
from voltha.core.config.config_root import ConfigRoot, MergeConflictException
from voltha.core.config.config_txn import ClosedTransactionError
from voltha.protos import third_party
from voltha.protos.events_pb2 import ConfigEvent, ConfigEventType
from voltha.protos.openflow_13_pb2 import ofp_port
from voltha.protos.voltha_pb2 import VolthaInstance, Adapter, HealthStatus, \
AdapterConfig, LogicalDevice, LogicalPort
def memusage():
    """Return ``ru_maxrss`` for the current process.

    The figure comes straight from ``resource.getrusage`` for
    ``RUSAGE_SELF``; no unit conversion is applied here.
    """
    usage = resource.getrusage(resource.RUSAGE_SELF)
    return usage.ru_maxrss
def rev_count():
    """Return the number of entries currently held in ``_rev_cache``."""
    cached_revisions = len(_rev_cache)
    return cached_revisions
def probe():
    """Take one measurement sample.

    Returns a 3-tuple of, in order: the current ``time()`` value, the
    result of ``memusage()`` divided by 1024 twice, and ``rev_count()``.
    """
    timestamp = time()
    scaled_memory = memusage() / 1024. / 1024
    revisions = rev_count()
    return timestamp, scaled_memory, revisions
def print_metrics():
    """Print one ``probe()`` sample as three right-aligned columns."""
    sample = probe()
    # A single parenthesised argument prints identically under the
    # Python 2 print statement.
    print('%20f %20f %8d' % sample)
class TestConfigNodeShallow(TestCase):
    """Checks on a ConfigRoot that wraps an empty VolthaInstance."""

    def setUp(self):
        # Two reference messages plus the node under test, rebuilt for
        # every test method.
        self.empty = VolthaInstance()
        self.other = VolthaInstance(instance_id='other')
        self.node = ConfigRoot(VolthaInstance())

    def test_init(self):
        # Passes as long as setUp() managed to build the fixture.
        pass

    def test_immutability(self):
        # The stored data starts out equal to an empty message and is still
        # equal to a fresh empty message after self.empty has been mutated.
        # NOTE(review): the node is built from a separate VolthaInstance(),
        # not from self.empty, so this cannot detect aliasing of the
        # constructor argument -- confirm that is the intent.
        stored_before = self.node.latest.data
        self.assertEqual(stored_before, self.empty)

        self.empty.instance_id = 'overwritten id'

        pristine = VolthaInstance()
        self.assertEqual(self.node.latest.data, pristine)

    def test_retrieve_latest(self):
        # The only revision listed on a fresh node is the latest one.
        self.assertEqual(self.node.latest.data, self.empty)
        latest_hash = self.node.latest.hash
        expected_revisions = [latest_hash]
        self.assertEqual(self.node.revisions, expected_revisions)

    def test_update_with_bad_data(self):
        # Updating the root with a message of another type is rejected.
        wrong_type = Adapter()
        self.assertRaises(ValueError, self.node.update, '/', wrong_type)
class DeepTestsBase(TestCase):
    """Shared fixture for tests that work on a small, populated node tree"""

    def setUp(self):
        # Collect garbage and empty the shared revision cache before the
        # fixture is built.
        gc.collect()
        _rev_cache.clear()

        self.health = HealthStatus(state=HealthStatus.DYING)
        self.base_shallow = VolthaInstance(instance_id='1')

        # The deep variant is a copy of the shallow message with a health
        # state and five adapters (ids '0'..'4', log level 3) filled in.
        deep = copy(self.base_shallow)
        deep.health.state = HealthStatus.DYING  # same state as self.health
        for index in xrange(5):
            template = Adapter(
                id=str(index),
                config=AdapterConfig(log_level=3),
            )
            deep.adapters.add().MergeFrom(template)
        self.base_deep = deep

        self.node = ConfigRoot(self.base_deep)
        # Remembered so tests can tell whether the root revision changed.
        self.hash_orig = self.node.latest.hash

    def tearDown(self):
        # Drop this test's reference to the tree.
        del self.node
class TestConfigNodeDeep(DeepTestsBase):
    """Read access and argument validation on the populated tree."""

    def test_init(self):
        # Passes as long as setUp() managed to build the fixture.
        pass

    def test_reject_duplicate_keys(self):
        # Five adapters sharing one id cannot be turned into a tree.
        clones = [Adapter(id='same') for _ in xrange(5)]
        data = VolthaInstance(instance_id='42', adapters=clones)
        self.assertRaises(ValueError, ConfigRoot, data)

    def test_shallow_get(self):
        # Every default (non-deep) read path yields the shallow message.
        expected = self.base_shallow
        self.assertEqual(self.node.latest.data, expected)
        self.assertEqual(self.node.get(), expected)
        self.assertEqual(self.node.get(hash=self.hash_orig), expected)

    def test_deep_get(self):
        full_tree = self.node.get(deep=True)
        self.assertEqual(full_tree, self.base_deep)

    def test_path_based_get_access(self):
        # '/' addresses the root itself; '/health' the health child.
        self.assertEqual(self.node.get(path='/'), self.node.get())
        self.assertEqual(self.node.get(path='/health'), self.health)

    def test_path_list_retrieval(self):
        adapters = self.node.get(path='/adapters')
        self.assertEqual(len(adapters), 5)
        third = adapters[2]
        self.assertEqual(third.id, '2')

    def test_indexing_into_containers(self):
        adapter = self.node.get(path='/adapters/3')
        self.assertEqual(adapter.id, '3')

    def test_update_handle_invalid_paths(self):
        # (expected exception, path) pairs, checked in this order.
        update = self.node.update
        cases = (
            (KeyError, 'foo'),
            (KeyError, '/foo'),
            (KeyError, '/health/foo'),
            (ValueError, '/adapters'),
            (KeyError, '/adapters/foo'),
            (KeyError, '/adapters/1/foo'),
        )
        for expected_error, path in cases:
            self.assertRaises(expected_error, update, path, None)

    def test_update_handle_invalid_type(self):
        # Each path is given a message of a type it does not hold.
        cases = (
            ('/', Adapter()),
            ('/health', Adapter()),
            ('/adapters/1', VolthaInstance()),
        )
        for path, wrong_type in cases:
            self.assertRaises(ValueError, self.node.update, path, wrong_type)

    def test_update_handle_key_change_attempt(self):
        # An update may not change the id of the addressed adapter.
        renamed = Adapter(id='changed')
        self.assertRaises(
            ValueError, self.node.update, '/adapters/1', renamed)

    def test_add_handle_invalid_cases(self):
        add = self.node.add

        # invalid paths
        for path in ('foo', '/foo', '/adapters/foo'):
            self.assertRaises(KeyError, add, path, None)

        # cannot add to non-container nodes
        for path in ('/health', '/adapters/1'):
            self.assertRaises(ValueError, add, path, None)

        # cannot add to container data with duplicate key
        duplicate = Adapter(id='1')
        self.assertRaises(ValueError, add, '/adapters', duplicate)

    def test_remove_handle_invalid_cases(self):
        remove = self.node.remove

        # invalid paths
        bad_paths = ('foo', '/foo', '/adapters/foo', '/adapters/1/id')
        for path in bad_paths:
            self.assertRaises(KeyError, remove, path)

        # cannot remove a non-container node
        self.assertRaises(ValueError, remove, '/health')
class TestPruningPerformance(DeepTestsBase):
    """Strict-mode update checks.

    NOTE(review): despite the class name, the only test visible here is
    about strict updates of read-only fields.
    """

    def test_strict_read_only(self):
        # it shall not be possible to change a read-only field
        attempts = (
            ('/', VolthaInstance(version='foo')),
            ('/adapters/1', Adapter(version='foo')),
        )
        for path, change in attempts:
            self.assertRaises(
                ValueError, self.node.update, path, change, strict=True)
class TestNodeOwnershipAndHooks(DeepTestsBase):
    """Proxy access to sub-trees and the callbacks registered on proxies."""

    def test_init(self):
        # Passes as long as setUp() managed to build the fixture.
        pass

    def test_passive_ownership(self):
        # grab a proxy for a given node
        proxy = self.node.get_proxy('/health')

        # the value can be read directly through the proxy
        dying = HealthStatus(state=HealthStatus.DYING)
        self.assertEqual(proxy.get(), dying)

        # it can also be updated through the proxy, and doing so produces
        # a new revision of the whole tree
        healthy = HealthStatus(state=HealthStatus.HEALTHY)
        proxy.update('/', healthy)
        self.assertEqual(proxy.get().state, HealthStatus.HEALTHY)
        self.assertNotEqual(self.node.latest.hash, self.hash_orig)

        # access constraints are still enforced
        overloaded = HealthStatus(state=HealthStatus.OVERLOADED)
        self.assertRaises(
            ValueError, proxy.update, '/', overloaded, strict=1)

    def test_exclusivity(self):
        # The exclusive proxy stays bound to a local while the second
        # request is made; that request must be refused.
        exclusive_proxy = self.node.get_proxy('/adapters/1', exclusive=True)
        self.assertRaises(ValueError, self.node.get_proxy, '/adapters/1')

    def test_get_hook(self):
        proxy = self.node.get_proxy('/health')

        # without a callback, get() returns what is stored in the node
        self.assertEqual(proxy.get().state, HealthStatus.DYING)

        # register a callback that rewrites the state before it is returned
        def force_overloaded(msg):
            msg.state = HealthStatus.OVERLOADED
            return msg

        proxy.register_callback(CallbackType.GET, force_overloaded)

        # once registered, the callback can touch up the object
        self.assertEqual(proxy.get().state, HealthStatus.OVERLOADED)

    def test_pre_update_hook(self):
        path = '/adapters/1'
        proxy = self.node.get_proxy(path)

        # before the hook is installed, the change is allowed
        adapter = proxy.get()
        adapter.version = 'foo'
        proxy.update('/', adapter)

        # and for sanity, check that the update made it through
        self.assertEqual(self.node.get(path).version, 'foo')

        # register a hook that rejects all changes
        def reject_everything(msg):
            raise RuntimeError('bully')

        proxy.register_callback(CallbackType.PRE_UPDATE, reject_everything)

        # the rejection applies through the proxy and through the root
        adapter.version = 'bar'
        self.assertRaises(RuntimeError, proxy.update, '/', adapter)
        self.assertRaises(RuntimeError, self.node.update, path, adapter)

    def test_post_update_hook(self):
        proxy = self.node.get_proxy('/adapters/1')
        callback = Mock()
        # Extra positional and keyword arguments given at registration are
        # expected back in the callback invocation, after the data.
        proxy.register_callback(
            CallbackType.POST_UPDATE, callback, 'zizi', 42, x=1, y='baz')

        data = Adapter(id='1', version='zoo')
        proxy.update('/', data)

        callback.assert_called_once_with(data, 'zizi', 42, x=1, y='baz')

    def test_pre_and_post_add_hooks(self):
        proxy = self.node.get_proxy('/')
        before_add = Mock()
        after_add = Mock()
        proxy.register_callback(CallbackType.PRE_ADD, before_add)
        proxy.register_callback(CallbackType.POST_ADD, after_add)

        new_adapter = Adapter(id='99', version='12.2', vendor='ace')
        proxy.add('/adapters', new_adapter)

        before_add.assert_called_with(new_adapter)
        after_add.assert_called_with(new_adapter)

    def test_pre_and_post_remove_hooks(self):
        proxy = self.node.get_proxy('/')
        before_remove = Mock()
        after_remove = Mock()
        proxy.register_callback(CallbackType.PRE_REMOVE, before_remove)
        proxy.register_callback(CallbackType.POST_REMOVE, after_remove)

        # fetched up front so that the callback arguments can be verified
        doomed = proxy.get('/adapters/1')
        proxy.remove('/adapters/1')

        before_remove.assert_called_with(doomed)
        after_remove.assert_called_with(doomed)
class TestEventLogic(DeepTestsBase):
    """Events published on 'model-change-events' for tree changes."""

    def setUp(self):
        super(TestEventLogic, self).setUp()
        # A mock subscriber records whatever is published on the topic.
        self.ebc = EventBusClient()
        self.event_mock = Mock()
        self.ebc.subscribe('model-change-events', self.event_mock)

    def _expected_event(self, event_type, payload):
        """Build the ConfigEvent expected for *payload*.

        The hash is read from the tree's latest revision at call time, so
        this must be called after the change under test has been applied.
        """
        return ConfigEvent(
            type=event_type,
            hash=self.node.latest.hash,
            data=dumps(MessageToDict(payload, True, True))
        )

    def test_add_event(self):
        added = Adapter(id='10', version='zoo')
        self.node.add('/adapters', added)

        expected = self._expected_event(ConfigEventType.add, added)
        self.event_mock.assert_called_once_with(
            'model-change-events', expected)

    def test_remove_event(self):
        # Same content as the adapter with id '1' built by the base fixture.
        removed = Adapter(id='1', config=AdapterConfig(log_level=3))
        self.node.remove('/adapters/1')

        expected = self._expected_event(ConfigEventType.remove, removed)
        self.event_mock.assert_called_once_with(
            'model-change-events', expected)
class TestTransactionalLogic(DeepTestsBase):
    """Transactions opened through proxies: isolation and cancellation."""

    # ---- helpers ---------------------------------------------------------

    def make_change(self, tx, path, attr_name, new_value):
        """Set a (possibly dotted) attribute on the object at *path* via *tx*.

        The object is read through the transaction, the attribute named by
        *attr_name* (e.g. 'config.log_level') is set to *new_value*, and
        the modified object is written back with ``tx.update``.
        """
        data = tx.get(path)
        target = data
        remaining = attr_name
        while True:
            head, _, tail = remaining.partition('.')
            if not tail:
                # No further dot: 'remaining' names the attribute to set.
                setattr(target, remaining, new_value)
                break
            target = getattr(target, head)
            remaining = tail
        tx.update(path, data)

    def check_no_tx_branches(self):
        """Assert that every reachable node has only the ``None`` branch."""
        seen = set()

        def walk(node):
            if node in seen:
                return
            self.assertEqual(node._branches.keys(), [None])
            for rev in node._branches[None]._revs.itervalues():
                for kids in rev._children.itervalues():
                    # Children are either held in an OrderedDict, in which
                    # case its values are iterated, or iterated directly.
                    if isinstance(kids, OrderedDict):
                        kids = kids.itervalues()
                    for kid_rev in kids:
                        walk(kid_rev.node)
            seen.add(node)

        walk(self.node)

    def log_levels(self):
        """Return an OrderedDict of adapter id -> configured log level."""
        levels = OrderedDict()
        for adapter in self.node.get('/adapters', deep=1):
            levels[adapter.id] = adapter.config.log_level
        return levels

    def _third_adapter_node(self):
        """Look under the hood for the node behind ``adapters[2]``."""
        latest_root_rev = self.node._branches[None].latest
        return latest_root_rev._children['adapters'][2].node

    def _assert_branch_counts(self, adapter_node, root_count, adapter_count):
        """Check how many branches the root and *adapter_node* carry."""
        self.assertEqual(len(self.node._branches.keys()), root_count)
        self.assertEqual(len(adapter_node._branches.keys()), adapter_count)

    def tearDown(self):
        # Every test must leave the tree without transaction branches.
        self.check_no_tx_branches()
        super(TestTransactionalLogic, self).tearDown()

    # ---- tests -----------------------------------------------------------

    def test_transaction_isolation(self):
        """
        Test that changes made in a transaction mode are not visible to others
        """
        proxy = self.node.get_proxy('/')

        # look under the hood to verify that branches are added
        # recursively
        adapter_node = self._third_adapter_node()
        self._assert_branch_counts(adapter_node, 1, 1)

        tx = proxy.open_transaction()
        self._assert_branch_counts(adapter_node, 2, 1)

        path = '/adapters/2'
        self.make_change(tx, path, 'config.log_level', 0)
        self._assert_branch_counts(adapter_node, 2, 2)

        # reading through the transaction exposes the change
        self.assertEqual(tx.get(path).config.log_level, 0)

        # reading through the proxy or directly from the tree does not
        self.assertEqual(self.node.latest.hash, self.hash_orig)
        self.assertEqual(proxy.get(path).config.log_level, 3)
        self.assertEqual(self.node.get(path).config.log_level, 3)

        tx.cancel()

    def test_cannot_reuse_tx(self):
        # Once cancelled, a transaction refuses further operations.
        proxy = self.node.get_proxy('/')
        tx = proxy.open_transaction()
        tx.cancel()

        self.assertRaises(ClosedTransactionError, tx.get, '/')
        self.assertRaises(ClosedTransactionError, tx.add, '/', None)
        self.assertRaises(ClosedTransactionError, tx.remove, '/')

    def test_multiple_concurrent_transactions(self):
        """
        Test that two transactions can make independent changes, without
        affecting each other, until committed.
        """
        proxy1 = self.node.get_proxy('/')
        proxy2 = self.node.get_proxy('/')

        tx1 = proxy1.open_transaction()
        tx2 = proxy1.open_transaction()
        tx3 = proxy2.open_transaction()

        path = '/adapters/2'

        def level_seen_by(reader):
            return reader.get(path).config.log_level

        self.make_change(tx1, path, 'config.log_level', 0)

        # nobody else sees the change made in tx1
        self.assertEqual(level_seen_by(tx2), 3)
        self.assertEqual(level_seen_by(tx3), 3)
        self.assertEqual(level_seen_by(proxy1), 3)
        self.assertEqual(level_seen_by(proxy2), 3)

        # the other transactions can make their own changes
        self.make_change(tx2, path, 'config.log_level', 1)
        self.make_change(tx3, path, 'config.log_level', 2)

        # each sees its own tree, but no one else can see theirs
        self.assertEqual(level_seen_by(tx1), 0)
        self.assertEqual(level_seen_by(tx2), 1)
        self.assertEqual(level_seen_by(tx3), 2)
        self.assertEqual(level_seen_by(proxy1), 3)
        self.assertEqual(level_seen_by(proxy2), 3)
        self.assertEqual(self.node.latest.hash, self.hash_orig)

        tx1.cancel()
        tx2.cancel()
        tx3.cancel()

    def test_transaction_canceling(self):
        """After abort, transaction is no longer stored"""
        proxy = self.node.get_proxy('/')

        # look under the hood to verify that branches are added
        # recursively
        adapter_node = self._third_adapter_node()
        self._assert_branch_counts(adapter_node, 1, 1)

        tx = proxy.open_transaction()
        self._assert_branch_counts(adapter_node, 2, 1)

        self.make_change(tx, '/adapters/2', 'config.log_level', 4)
        self._assert_branch_counts(adapter_node, 2, 2)

        # Dropping the only reference is expected to remove the branches
        # again, without an explicit cancel().
        del tx
        self._assert_branch_counts(adapter_node, 1, 1)

    def test_transaction_explitic_canceling(self):
        """After abort, transaction is no longer stored"""
        proxy = self.node.get_proxy('/')

        # look under the hood to verify that branches are added
        # recursively
        adapter_node = self._third_adapter_node()
        self._assert_branch_counts(adapter_node, 1, 1)

        tx = proxy.open_transaction()
        self._assert_branch_counts(adapter_node, 2, 1)

        self.make_change(tx, '/adapters/2', 'config.log_level', 4)
        self._assert_branch_counts(adapter_node, 2, 2)

        tx.cancel()
        self._assert_branch_counts(adapter_node, 1, 1)

    def make_complex_changes(self):
        """Open five transactions and apply a mix of changes through them.

        Two root proxies and two adapter-level proxies are used. The five
        transactions are returned, still open, as ``(tx1, ..., tx5)``.
        """
        root_a = self.node.get_proxy('/')
        root_b = self.node.get_proxy('/')
        adapter0_proxy = self.node.get_proxy('/adapters/0')
        adapter1_proxy = self.node.get_proxy('/adapters/1')

        tx1 = root_a.open_transaction()
        tx2 = root_a.open_transaction()
        tx3 = root_b.open_transaction()
        tx4 = adapter0_proxy.open_transaction()
        tx5 = adapter1_proxy.open_transaction()

        # tx1: an update, an addition and a removal
        self.make_change(tx1, '/adapters/0', 'config.log_level', 1)
        tx1.add('/adapters', Adapter(id='new1'))
        tx1.remove('/adapters/2')

        # tx2: touches an adapter no other transaction modifies
        self.make_change(tx2, '/adapters/3', 'config.log_level', 0)

        # tx3: changes adapter 1, which tx5 changes as well
        self.make_change(tx3, '/adapters/1', 'config.log_level', 1)

        # tx4: change adapter 0 through its own (leaf) proxy
        adapter0 = tx4.get('/')
        adapter0.version = 'zulu'
        adapter0.config.log_level = 0
        tx4.update('/', adapter0)

        # tx5: change adapter 1 through its own (leaf) proxy
        adapter1 = tx5.get('/')
        adapter1.version = 'brand new'
        adapter1.config.log_level = 4
        tx5.update('/', adapter1)

        # tx2 again: one more addition
        tx2.add('/adapters', Adapter(id='new2'))

        # Conflicts:
        # - tx4 conflicts with tx1 (both modify /adapters/0)
        # - tx5 conflicts with tx3 (both modify /adapters/1)
        return tx1, tx2, tx3, tx4, tx5

    def test_canceling_adds(self):
        # Additions made in a cancelled transaction leave no trace.
        proxy = self.node.get_proxy('/')
        tx = proxy.open_transaction()
        for new_id in ('new', 'new2'):
            tx.add('/adapters', Adapter(id=new_id))
        tx.cancel()

        remaining_ids = self.log_levels().keys()
        self.assertEqual(remaining_ids, ['0', '1', '2', '3', '4'])
# TODO: add more tests to hammer out potential issues with transactions
# on nested nodes
if __name__ == '__main__':
    # Executed as a script: hand control to unittest's main().
    main()