# blob: f647bc9dd19116d760f23c7494287c0de5402fe5 [file] [log] [blame]
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Test framework and mocking helpers.
import unittest
from mock import patch
import mock
import os, sys
# Add the directory three levels up (relative to the current working
# directory) to the module search path so the imports below can resolve.
sys.path.append("../../..")
# Absolute path of the test_config.yaml located next to this test file.
config = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/test_config.yaml")
from xosconfig import Config
# Config is initialised before the synchronizer modules are imported.
# NOTE(review): presumably those modules read Config at import time -- confirm
# before reordering these statements.
Config.init(config, 'synchronizer-config-schema.yaml')
import synchronizers.new_base.modelaccessor
import synchronizers.new_base.model_policies.model_policy_tenantwithcontainer
import model_policy_vsgtenant
from model_policy_vsgtenant import VSGTenantPolicy
from synchronizers.new_base.model_policies.model_policy_tenantwithcontainer import LeastLoadedNodeScheduler
# NOTE(review): this dict is not referenced anywhere else in the visible file.
MockObjectStores = {}
class MockObjectList:
    """Small in-memory collection offering a queryset-like read API."""

    # Class-level default; __init__ binds a per-instance list when this is None.
    item_list = None

    def __init__(self, initial=None):
        """Create the collection, using ``initial`` as its backing list if truthy."""
        self.id_counter = 0
        if initial:
            self.item_list = initial
            return
        if self.item_list is None:
            self.item_list = []

    def get_items(self):
        """Return the backing list itself (not a copy)."""
        return self.item_list

    def count(self):
        """Return the number of stored items."""
        stored = self.get_items()
        return len(stored)

    def first(self):
        """Return the first stored item; raises IndexError when empty."""
        stored = self.get_items()
        return stored[0]

    def all(self):
        """Return every stored item (the backing list)."""
        return self.get_items()

    def filter(self, **kwargs):
        """Return the items whose attributes equal every given keyword value.

        With no keywords the backing list itself is returned.
        """
        matches = self.get_items()
        for attr_name, wanted in kwargs.items():
            matches = [obj for obj in matches if getattr(obj, attr_name) == wanted]
        return matches

    def get(self, **kwargs):
        """Return the first item matching ``kwargs``; raise Exception if none match."""
        matches = self.filter(**kwargs)
        if matches:
            return matches[0]
        raise Exception("No objects matching %s" % str(kwargs))
class MockObjectStore(MockObjectList):
    """Object list that can also persist objects, assigning ids on first save."""

    def save(self, o):
        """Insert ``o`` into the store, or replace the stored entry with the same id.

        An object whose ``id`` attribute is missing or None is treated as
        unsaved and receives the next free id. An id of 0 counts as assigned:
        the counter starts at 0, so 0 is the first id this store hands out.

        :param o: object to store; its ``id`` attribute is set if unassigned.
        """
        items = self.get_items()

        if getattr(o, "id", None) is None:
            # Advance the counter past every id already present so that a new
            # id never collides with an existing entry. Entries without an id
            # are skipped (comparing None to an int fails on Python 3).
            for item in items:
                if item.id is not None and item.id >= self.id_counter:
                    self.id_counter = item.id + 1
            o.id = self.id_counter
            self.id_counter += 1

        for index, item in enumerate(items):
            if item.id == o.id:
                # Replace the slot in the list; rebinding the loop variable
                # alone would leave the stored entry untouched.
                items[index] = o
                break
        else:
            # No entry with this id exists yet.
            items.append(o)
class MockObject(object):
    """Base class for mock models; keyword arguments become instance attributes."""

    # Subclasses point this at their object store; None means "no store".
    objects = None
    id = None

    def __init__(self, **kwargs):
        """Set every keyword argument as an attribute on the instance."""
        for field_name in kwargs:
            setattr(self, field_name, kwargs[field_name])

    @property
    def self_content_type_id(self):
        """Name of the instance's class, used as its content type identifier."""
        return type(self).__name__

    def save(self):
        """Hand the instance to the class's object store, if one is set."""
        store = self.objects
        if not store:
            return
        store.save(self)

    def delete(self):
        """No-op; present so tests can patch and assert on it."""
def get_MockObjectStore(x):
    """Instantiate the module-level class named ``Mock<x>Objects``.

    Raises KeyError if no such name exists in this module's globals.
    """
    store_class_name = "Mock%sObjects" % x
    store_class = globals()[store_class_name]
    return store_class()
class MockFlavorObjects(MockObjectStore):
    """Object store behind ``MockFlavor.objects``."""


class MockFlavor(MockObject):
    """Stand-in for the Flavor model."""

    name = None
    objects = get_MockObjectStore("Flavor")
class MockInstanceObjects(MockObjectStore):
    """Object store behind ``MockInstance.objects``."""


class MockInstance(MockObject):
    """Stand-in for the Instance model."""

    name = None
    objects = get_MockObjectStore("Instance")
class MockDeploymentObjects(MockObjectStore):
    """Object store behind ``MockDeployment.objects``."""


class MockDeployment(MockObject):
    """Mock deployment object used as a test fixture."""

    name = None
    objects = get_MockObjectStore("Deployment")
class MockUserObjects(MockObjectStore):
    """Object store behind ``MockUser.objects``."""


class MockUser(MockObject):
    """Mock user object used as a test fixture."""

    email = None
    objects = get_MockObjectStore("User")
class MockSliceObjects(MockObjectStore):
    """Object store behind ``MockSlice.objects``."""


class MockSlice(MockObject):
    """Mock slice object used as a test fixture."""

    name = None
    default_node = None
    networks = None
    objects = get_MockObjectStore("Slice")
class MockNodeObjects(MockObjectStore):
    """Object store behind ``MockNode.objects``."""


class MockNode(MockObject):
    """Stand-in for the Node model."""

    hostname = None
    site_deployment = None
    objects = get_MockObjectStore("Node")
class MockImageObjects(MockObjectStore):
    """Object store behind ``MockImage.objects``."""


class MockImage(MockObject):
    """Mock image object used as a test fixture."""

    name = None
    objects = get_MockObjectStore("Image")
class MockTagObjects(MockObjectStore):
    """Object store behind ``MockTag.objects``."""


class MockTag(MockObject):
    """Stand-in for the Tag model."""

    name = None
    value = None
    objects = get_MockObjectStore("Tag")
class MockNetworkTemplateObjects(MockObjectStore):
    """Object store behind ``MockNetworkTemplate.objects``."""


class MockNetworkTemplate(MockObject):
    """Mock network template object used as a test fixture."""

    name = None
    visibility = None
    objects = get_MockObjectStore("NetworkTemplate")
class MockNetworkParameterTypeObjects(MockObjectStore):
    """Object store behind ``MockNetworkParameterType.objects``."""


class MockNetworkParameterType(MockObject):
    """Stand-in for the NetworkParameterType model."""

    name = None
    objects = get_MockObjectStore("NetworkParameterType")
class MockNetworkParameterObjects(MockObjectStore):
    """Object store behind ``MockNetworkParameter.objects``."""


class MockNetworkParameter(MockObject):
    """Stand-in for the NetworkParameter model."""

    value = None
    parameter_id = None
    objects = get_MockObjectStore("NetworkParameter")
class MockNetworkObjects(MockObjectStore):
    """Object store behind ``MockNetwork.objects``."""


class MockNetwork(MockObject):
    """Mock network object used as a test fixture."""

    name = None
    template = None
    objects = get_MockObjectStore("Network")
class MockPortObjects(MockObjectStore):
    """Object store behind ``MockPort.objects``."""


class MockPort(MockObject):
    """Stand-in for the Port model."""

    name = None
    objects = get_MockObjectStore("Port")

    def set_parameter(self, name, value):
        """No-op; accepts a parameter name and value and discards them."""
class MockVRouterTenantObjects(MockObjectStore):
    """Object store behind ``MockVRouterTenant.objects``."""


class MockVRouterTenant(MockObject):
    """Mock vRouter tenant object used as a test fixture."""

    public_ip = None
    public_mac = None
    address_pool_id = None
    objects = get_MockObjectStore("VRouterTenant")

    def set_attribute(self, name, value):
        """No-op; accepts an attribute name and value and discards them."""
class MockVoltTenantObjects(MockObjectStore):
    """Object store behind ``MockVoltTenant.objects``."""


class MockVoltTenant(MockObject):
    """Mock vOLT tenant object carrying the c_tag / s_tag fields."""

    c_tag = None
    s_tag = None
    objects = get_MockObjectStore("VoltTenant")
class MockVSGServiceObjects(MockObjectStore):
    """Object store behind ``MockVSGService.objects``."""


class MockVSGService(MockObject):
    """Stand-in for the VSGService model."""

    name = None
    node_label = None
    slices = None
    objects = get_MockObjectStore("VSGService")

    def __init__(self, **kwargs):
        """Delegate to MockObject, which copies ``kwargs`` onto the instance."""
        super(MockVSGService, self).__init__(**kwargs)
class MockVSGTenantObjects(MockObjectStore):
    """Object store behind ``MockVSGTenant.objects``."""


class MockVSGTenant(MockObject):
    """Mock vSG tenant object; the tests pass instances of it to the policy."""

    owner = None
    deleted = False
    instance = None
    creator = None
    volt = None
    # Class-level dict: shared by every instance that does not override it.
    service_specific_attribute = {}
    objects = get_MockObjectStore("VSGTenant")

    def get_image(self):
        """Always return None."""
        return None
class TestModelPolicyVsgTenant(unittest.TestCase):
    """Unit tests for VSGTenantPolicy, run against the in-memory mock models."""

    def setUp(self):
        """Build the shared fixtures and point the policy modules at the mocks."""
        self.policy = VSGTenantPolicy()
        self.user = MockUser(email="testadmin@test.org")
        self.tenant = MockVSGTenant(creator=self.user, id=1)
        self.flavor = MockFlavor(name="m1.small")
        self.npt_ctag = MockNetworkParameterType(name="c_tag", id=1)
        self.npt_stag = MockNetworkParameterType(name="s_tag", id=2)
        self.npt_neutron_port_name = MockNetworkParameterType(name="neutron_port_name", id=3)
        self.node = MockNode(hostname="my.node.com")
        self.slice = MockSlice(name="mysite_test1", default_flavor=self.flavor, default_isolation="vm")
        self.priv_template = MockNetworkTemplate(name="access_network", visibility="private")
        self.priv_network = MockNetwork(name="mysite_test1_private", template=self.priv_template)
        self.image = MockImage(name="trusty-server-multi-nic")
        self.deployment = MockDeployment(name="testdeployment")

        # Replace the model names looked up by the base container policy
        # module with their mock counterparts.
        container_policy = synchronizers.new_base.model_policies.model_policy_tenantwithcontainer
        container_policy.Instance = MockInstance
        container_policy.Flavor = MockFlavor
        container_policy.Tag = MockTag
        container_policy.Node = MockNode

        # Same substitution for the vSG tenant policy module under test.
        model_policy_vsgtenant.Instance = MockInstance
        model_policy_vsgtenant.Flavor = MockFlavor
        model_policy_vsgtenant.Tag = MockTag
        model_policy_vsgtenant.VSGService = MockVSGService
        model_policy_vsgtenant.Node = MockNode
        model_policy_vsgtenant.Port = MockPort
        model_policy_vsgtenant.NetworkParameterType = MockNetworkParameterType
        model_policy_vsgtenant.NetworkParameter = MockNetworkParameter

    @patch.object(VSGTenantPolicy, "manage_container")
    @patch.object(VSGTenantPolicy, "manage_vrouter")
    @patch.object(VSGTenantPolicy, "cleanup_orphans")
    def test_handle_create(self, cleanup_orphans, manage_vrouter, manage_container):
        """handle_create must invoke all three management steps with the tenant."""
        self.policy.handle_create(self.tenant)
        manage_container.assert_called_with(self.tenant)
        manage_vrouter.assert_called_with(self.tenant)
        cleanup_orphans.assert_called_with(self.tenant)

    @patch.object(VSGTenantPolicy, "manage_container")
    @patch.object(VSGTenantPolicy, "manage_vrouter")
    @patch.object(VSGTenantPolicy, "cleanup_orphans")
    def test_handle_update(self, cleanup_orphans, manage_vrouter, manage_container):
        """handle_update must invoke all three management steps with the tenant."""
        # Exercise handle_update itself (previously this called handle_create,
        # duplicating the test above).
        self.policy.handle_update(self.tenant)
        manage_container.assert_called_with(self.tenant)
        manage_vrouter.assert_called_with(self.tenant)
        cleanup_orphans.assert_called_with(self.tenant)

    @patch.object(MockVRouterTenant, "delete")
    def test_handle_delete_vrouter_exist(self, vroutertenant_delete):
        """Deleting a tenant that has a vrouter must delete that vrouter."""
        vrtenant = MockVRouterTenant()
        self.tenant.vrouter = vrtenant
        self.policy.handle_delete(self.tenant)
        vroutertenant_delete.assert_called()

    @patch.object(MockVRouterTenant, "delete")
    def test_handle_delete_vrouter_noexist(self, vroutertenant_delete):
        """Deleting a tenant without a vrouter must not delete any vrouter."""
        self.tenant.vrouter = None
        self.policy.handle_delete(self.tenant)
        vroutertenant_delete.assert_not_called()

    @patch.object(MockVRouterTenantObjects, "get_items")
    @patch.object(MockVRouterTenant, "delete")
    def test_cleanup_orphans(self, vroutertenant_delete, vroutertenant_objects):
        """A vrouter delete must happen when another vrouter references the tenant."""
        vrtenant = MockVRouterTenant(id=1)
        self.tenant.vrouter = vrtenant
        some_other_vrtenant = MockVRouterTenant(id=2, subscriber_tenant_id = self.tenant.id)
        # Configure what the patched get_items() returns, as the other tests do.
        vroutertenant_objects.return_value = [some_other_vrtenant]
        # NOTE(review): this drives handle_delete rather than cleanup_orphans,
        # and setUp does not substitute a VRouterTenant mock into the policy
        # module -- confirm against the policy code whether this test covers
        # the orphan clean-up path at all.
        self.policy.handle_delete(self.tenant)
        vroutertenant_delete.assert_called()

    @patch.object(MockTag, "objects")
    def test_find_instance_for_s_tag_noexist(self, tag_objects):
        """With no matching tag, no instance is found."""
        tag_objects.filter.return_value = []
        instance = self.policy.find_instance_for_s_tag(3)
        self.assertEqual(instance, None)

    @patch.object(MockTag, "objects")
    def test_find_instance_for_s_tag(self, tag_objects):
        """A matching tag yields the instance it is attached to."""
        tagged_instance = MockInstance()
        tag = MockTag(content_object = tagged_instance)
        tag_objects.filter.return_value = [tag]
        instance = self.policy.find_instance_for_s_tag(3)
        self.assertEqual(instance, tagged_instance)

    def test_manage_container_no_volt(self):
        """manage_container must raise when the tenant has no volt."""
        with self.assertRaises(Exception) as e:
            self.policy.manage_container(self.tenant)
        # str() instead of the Python 2-only ``.message`` attribute.
        self.assertEqual(str(e.exception), "This VSG container has no volt")

    @patch.object(VSGTenantPolicy, "find_or_make_instance_for_s_tag")
    @patch.object(MockVSGTenant, "save")
    @patch.object(MockVSGTenant, "volt")
    def test_manage_container_noinstance(self, volt, tenant_save, find_or_make_instance_for_s_tag):
        """A tenant without an instance gets the found/created one and is saved."""
        instance = MockInstance()
        volt.s_tag=222
        volt.c_tag=111
        find_or_make_instance_for_s_tag.return_value = instance
        self.policy.manage_container(self.tenant)
        self.assertEqual(self.tenant.instance, instance)
        tenant_save.assert_called()

    @patch.object(VSGTenantPolicy, "find_or_make_instance_for_s_tag")
    @patch.object(MockVSGTenant, "save")
    @patch.object(MockVSGTenant, "volt")
    def test_manage_container_hasinstance(self, volt, tenant_save, find_or_make_instance_for_s_tag):
        """A tenant that already has an instance is left untouched."""
        instance = MockInstance()
        volt.s_tag=222
        volt.c_tag=111
        self.tenant.instance = instance
        self.policy.manage_container(self.tenant)
        find_or_make_instance_for_s_tag.assert_not_called()
        self.assertEqual(self.tenant.instance, instance)
        tenant_save.assert_not_called()

    @patch.object(VSGTenantPolicy, "find_or_make_instance_for_s_tag")
    @patch.object(MockVSGTenant, "save")
    @patch.object(MockVSGTenant, "volt")
    def test_manage_container_deleted(self, volt, tenant_save, find_or_make_instance_for_s_tag):
        """A deleted tenant is neither given an instance nor saved."""
        self.tenant.deleted = True
        self.policy.manage_container(self.tenant)
        find_or_make_instance_for_s_tag.assert_not_called()
        tenant_save.assert_not_called()

    @patch.object(MockPort, "save")
    @patch.object(MockPort, "objects")
    def test_find_or_make_port_noexist(self, port_objects, port_save):
        """When no port matches, a new one is created and saved."""
        instance = MockInstance(id=123)
        # Use a network mock for the network argument (was a MockInstance).
        network = MockNetwork(id=456)
        port_objects.filter.return_value = []
        port=self.policy.find_or_make_port(instance, network)
        self.assertNotEqual(port, None)
        port_save.assert_called()

    @patch.object(MockPort, "save")
    @patch.object(MockPort, "objects")
    def test_find_or_make_port_exists(self, port_objects, port_save):
        """When a port already matches, it is returned and nothing is saved."""
        someport = MockPort()

        def mock_port_filter(network_id, instance_id):
            if (network_id==456) and (instance_id==123):
                return [someport]
            # An empty result, like a real filter with no matches.
            return []

        instance = MockInstance(id=123)
        network = MockNetwork(id=456)
        port_objects.filter.side_effect = mock_port_filter
        port=self.policy.find_or_make_port(instance, network)
        self.assertEqual(port, someport)
        port_save.assert_not_called()

    @patch.object(MockVSGServiceObjects, "get_items")
    def test_get_lan_network_noexist(self, vsgservice_objects):
        """get_lan_network must raise when the slice has no networks."""
        vsgservice=MockVSGService(name="myvsgservice", id=1, slices=MockObjectList(initial=[self.slice]))
        vsgservice_objects.return_value = [vsgservice]
        self.tenant.owner = vsgservice
        self.slice.networks = MockObjectList()
        with self.assertRaises(Exception) as e:
            self.policy.get_lan_network(self.tenant, None)
        self.assertEqual(str(e.exception), "No lan_network")

    @patch.object(MockVSGServiceObjects, "get_items")
    def test_get_lan_network(self, vsgservice_objects):
        """The single private network of the slice is returned."""
        vsgservice=MockVSGService(name="myvsgservice", id=1, slices=MockObjectList(initial=[self.slice]))
        vsgservice_objects.return_value = [vsgservice]
        self.tenant.owner = vsgservice
        self.slice.networks = MockObjectList([self.priv_network])
        lan_network = self.policy.get_lan_network(self.tenant, None)
        self.assertEqual(lan_network, self.priv_network)

    @patch.object(MockVSGServiceObjects, "get_items")
    def test_get_lan_network_toomany(self, vsgservice_objects):
        """get_lan_network must raise when the slice has two private networks."""
        some_other_network = MockNetwork(name="mysite_test1_private", template=self.priv_template)
        vsgservice=MockVSGService(name="myvsgservice", id=1, slices=MockObjectList(initial=[self.slice]))
        vsgservice_objects.return_value = [vsgservice]
        self.tenant.owner = vsgservice
        self.slice.networks = MockObjectList([self.priv_network, some_other_network])
        with self.assertRaises(Exception) as e:
            self.policy.get_lan_network(self.tenant, None)
        self.assertEqual(str(e.exception), "The vSG slice should only have one non-management private network")

    @patch.object(MockNetworkParameterTypeObjects, "get_items")
    def test_port_set_parameter_noparamexist(self, npt_objects):
        """Setting a parameter that does not exist yet creates it."""
        npt_objects.return_value = [self.npt_stag]
        port = MockPort()
        self.policy.port_set_parameter(port, "s_tag", "123")
        self.assertNotEqual(MockNetworkParameter.objects.all(), [])
        param = MockNetworkParameter.objects.first()
        self.assertEqual(param.value, "123")
        self.assertEqual(param.parameter, self.npt_stag)

    @patch.object(MockNetworkParameterTypeObjects, "get_items")
    @patch.object(MockNetworkParameterObjects, "get_items")
    def test_port_set_parameter_paramexist(self, np_objects, npt_objects):
        """Setting an existing parameter updates it instead of adding another."""
        port = MockPort(id=1)
        np_orig = MockNetworkParameter(parameter_id=self.npt_stag.id, value="456", object_id=port.id, content_type=port.self_content_type_id)
        np_objects.return_value = [np_orig]
        npt_objects.return_value = [self.npt_stag]
        self.policy.port_set_parameter(port, "s_tag", "123")
        self.assertEqual(MockNetworkParameter.objects.count(), 1)
        param = MockNetworkParameter.objects.first()
        self.assertEqual(param.value, "123")

    @patch.object(MockNetworkParameterTypeObjects, "get_items")
    @patch.object(MockNodeObjects, "get_items")
    @patch.object(MockFlavorObjects, "get_items")
    @patch.object(MockVSGServiceObjects, "get_items")
    @patch.object(MockVSGTenant, "volt")
    @patch.object(MockVSGTenant, "save")
    @patch.object(VSGTenantPolicy, "get_image")
    @patch.object(VSGTenantPolicy, "allocate_public_service_instance")
    @patch.object(LeastLoadedNodeScheduler, "pick")
    @patch.object(MockNode, "site_deployment")
    @patch.object(MockInstance, "save")
    @patch.object(MockInstance, "delete")
    @patch.object(VSGTenantPolicy, "port_set_parameter")
    def test_find_or_make_instance_for_s_tag(self, port_set_parameter, instance_delete, instance_save, site_deployment,
                                             pick, get_psi, get_image, tenant_save, volt,
                                             vsgservice_objects, flavor_objects, node_objects, npt_objects):
        """End-to-end check of instance creation for an s-tag with no instance yet."""
        # setup mocks
        vrtenant = MockVRouterTenant(public_ip="1.2.3.4", public_mac="01:02:03:04:05:06")
        vsgservice=MockVSGService(name="myvsgservice", id=1, slices=MockObjectList(initial=[self.slice]))
        vsgservice_objects.return_value = [vsgservice]
        self.tenant.owner = vsgservice
        volt.s_tag=222
        volt.c_tag=111
        get_image.return_value = self.image
        get_psi.return_value = vrtenant
        pick.return_value = (self.node, None)
        site_deployment.deployment = self.deployment
        flavor_objects.return_value=[self.flavor]
        node_objects.return_value=[self.node]
        npt_objects.return_value=[self.npt_stag, self.npt_ctag, self.npt_neutron_port_name]
        self.slice.networks = MockObjectList([self.priv_network])
        # done setup mocks

        # call the function under test
        instance = self.policy.find_or_make_instance_for_s_tag(self.tenant, self.tenant.volt.s_tag)

        # make sure Instance was created
        self.assertNotEqual(instance, None)
        self.assertEqual(instance.creator.email, "testadmin@test.org")
        self.assertEqual(instance.image.name, "trusty-server-multi-nic")
        self.assertEqual(instance.flavor.name, "m1.small")
        self.assertEqual(instance.isolation, "vm")
        self.assertEqual(instance.node.hostname, "my.node.com")
        self.assertEqual(instance.slice.name, "mysite_test1")
        self.assertEqual(instance.parent, None)
        instance_save.assert_called()
        instance_delete.assert_not_called()

        # Access Network Port should have tags to c-tag and s-tag
        port = MockPort.objects.first()
        self.assertEqual(port.instance, instance)
        self.assertEqual(port.network, self.priv_network)
        port_set_parameter.assert_has_calls([mock.call(port, "c_tag", 111),
                                             mock.call(port, "s_tag", 222),
                                             mock.call(port, "neutron_port_name", "stag-222")])

        # The instance should be tagged with the s-tag
        tag = MockTag.objects.get(name="s_tag")
        self.assertEqual(tag.value, "222")
        self.assertEqual(tag.object_id, instance.id)

        # The instance should have a tag pointing to its vrouter
        tag = MockTag.objects.get(name="vm_vrouter_tenant")
        # NOTE(review): the comment above expects the tag to reference the
        # vrouter, yet this asserts inequality. Left unchanged because the
        # format the policy uses for the tag value is not visible here.
        self.assertNotEqual(tag.value, vrtenant.id)
        self.assertEqual(tag.object_id, instance.id)

        # Allocate_public_service_instance should have been called
        get_psi.assert_called()

    @patch.object(VSGTenantPolicy, "allocate_public_service_instance")
    def test_manage_vrouter(self, get_psi):
        """A tenant without a vrouter triggers allocation from the vSG address pool."""
        vrtenant = MockVRouterTenant(public_ip="1.2.3.4", public_mac="01:02:03:04:05:06")
        get_psi.return_value = vrtenant
        self.tenant.vrouter = None
        self.policy.manage_vrouter(self.tenant)
        get_psi.assert_called_with(address_pool_name="addresses_vsg", subscriber_tenant=self.tenant)
# Run the test suite when this file is executed directly.
if '__main__' == __name__:
    unittest.main()