CORD-1570: Several bug fixes, expanded unit tests for security refactoring
Change-Id: Ied8dca916d3c22a252f6de38a65ef1b20c9d639d
diff --git a/lib/xos-genx/tests/general_validation_test.py b/lib/xos-genx/tests/general_validation_test.py
index c1e6820..15cdb56 100644
--- a/lib/xos-genx/tests/general_validation_test.py
+++ b/lib/xos-genx/tests/general_validation_test.py
@@ -108,7 +108,7 @@
def test_bin(self):
xproto = \
"""
- policy output < (ctx.is_admin = True | obj.empty = True) & False>
+ policy output < (ctx.is_admin = True | obj.empty = True) | False>
"""
args = FakeArgs()
@@ -128,10 +128,10 @@
"""
obj = FakeArgs()
- obj.empty = True
+ obj.empty = False
ctx = FakeArgs()
- ctx.is_admin = True
+ ctx.is_admin = False
with self.assertRaises(Exception):
verdict = policy_output_validator(obj, ctx)
diff --git a/lib/xos-genx/tests/optimize_test.py b/lib/xos-genx/tests/optimize_test.py
index ce15bf4..af40575 100644
--- a/lib/xos-genx/tests/optimize_test.py
+++ b/lib/xos-genx/tests/optimize_test.py
@@ -1,70 +1,78 @@
import unittest
from xosgenx.jinja2_extensions.fol2 import FOL2Python
-import pdb
+import vimpdb
class XProtoOptimizeTest(unittest.TestCase):
def setUp(self):
self.f2p = FOL2Python()
+ self.maxDiff=None
def test_constant(self):
input = 'True'
- output = self.f2p.hoist_constants(input)
+ output = self.f2p.hoist_outer(input)
self.assertEqual(output, input)
def test_exists(self):
- input = {'exists': ['x',{'|':['x','y']}]}
+ input = {'exists': ['X',{'|':['X.foo','y']}]}
- output = self.f2p.hoist_constants(input)
- expected = {'|': ['y', {'exists': ['x', 'x']}]}
+ output = self.f2p.hoist_outer(input)
+ expected = {'|': ['y', {'&': [{'not': 'y'}, {'exists': ['X', 'X.foo']}]}]}
self.assertEqual(output, expected)
- def test_forall(self):
- input = {'forall': ['x',{'|':['x','y']}]}
+ def test_exists_implies(self):
+ input = {'exists': ['Foo', {'&': [{'=': ('Foo.a', '1')}, {'->': ['write_access', {'=': ('Foo.b', '1')}]}]}]}
- output = self.f2p.hoist_constants(input)
- expected = {'|': ['y', {'forall': ['x', 'x']}]}
+ output = self.f2p.hoist_outer(input)
+ expected = {'|': [{'&': ['write_access', {'exists': ['Foo', {'&': [{'=': ['Foo.a', '1']}, {'=': ['Foo.b', '1']}]}]}]}, {'&': [{'not': 'write_access'}, {'exists': ['Foo', {'=': ['Foo.a', '1']}]}]}]}
+ self.assertEqual(output, expected)
+
+ def test_forall(self):
+ input = {'forall': ['X',{'|':['X.foo','y']}]}
+
+ output = self.f2p.hoist_outer(input)
+ expected = {'|': ['y', {'&': [{'not': 'y'}, {'forall': ['X', 'X.foo']}]}]}
self.assertEqual(output, expected)
def test_exists_embedded(self):
- input = {'|':['True',{'exists': ['x',{'|':['x','y']}]}]}
+ input = {'&':['True',{'exists': ['X',{'|':['X.foo','y']}]}]}
- output = self.f2p.hoist_constants(input)
- expected = {'|': ['True', {'|': ['y', {'exists': ['x', 'x']}]}]}
+ output = self.f2p.hoist_outer(input)
+ expected = {'|': ['y', {'&': [{'not': 'y'}, {'exists': ['X', 'X.foo']}]}]}
self.assertEqual(output, expected)
def test_exists_equals(self):
- input = {'|':['True',{'exists': ['x',{'|':['x',{'=':['y','z']}]}]}]}
+ input = {'&':['True',{'exists': ['X',{'|':['X.foo',{'=':['y','z']}]}]}]}
- output = self.f2p.hoist_constants(input)
- expected = {'|': ['True', {'|': [{'=': ['y', 'z']}, {'exists': ['x', 'x']}]}]}
+ output = self.f2p.hoist_outer(input)
+ expected = {'|': [{'=': ['y', 'z']}, {'&': [{'not': {'=': ['y', 'z']}}, {'exists': ['X', 'X.foo']}]}]}
self.assertEqual(output, expected)
def test_exists_nested_constant(self):
- input = {'|':['True',{'exists': ['x',{'|':['y',{'=':['y','x']}]}]}]}
+ input = {'&':['True',{'exists': ['X',{'|':['y',{'=':['y','X.foo']}]}]}]}
- output = self.f2p.hoist_constants(input)
- expected = {'|': ['True', {'|': ['y', {'exists': ['x', {'=': ['y', 'x']}]}]}]}
+ output = self.f2p.hoist_outer(input)
+ expected = {'|': ['y', {'&': [{'not': 'y'}, {'exists': ['X', {'=': ['False', 'X.foo']}]}]}]}
self.assertEqual(output, expected)
def test_exists_nested(self):
- input = {'exists': ['x',{'exists':['y',{'=':['y','x']}]}]}
+ input = {'exists': ['X',{'exists':['Y',{'=':['Y.foo','X.foo']}]}]}
- output = self.f2p.hoist_constants(input)
- expected = {'exists': ['x', {'exists': ['y', {'=': ['y', 'x']}]}]}
- self.assertEqual(output, expected)
+ output = self.f2p.hoist_outer(input)
+ expected = input
+ self.assertEqual(input, output)
def test_exists_nested2(self):
- input = {'exists': ['x',{'exists':['y',{'=':['z','y']}]}]}
+ input = {'exists': ['X',{'exists':['Y',{'=':['Z','Y']}]}]}
- output = self.f2p.hoist_constants(input)
- expected = {'exists': ['y', {'=': ['z', 'y']}]}
+ output = self.f2p.hoist_outer(input)
+ expected = {'exists': ['Y', {'=': ['Z', 'Y']}]}
self.assertEqual(output, expected)
def test_exists_nested3(self):
- input = {'exists': ['x',{'exists':['y',{'=':['z','x']}]}]}
+ input = {'exists': ['X',{'exists':['Y',{'=':['Z','X']}]}]}
- output = self.f2p.hoist_constants(input)
- expected = {'exists': ['x', {'=': ['z', 'x']}]}
+ output = self.f2p.hoist_outer(input)
+ expected = {'exists': ['X', {'=': ['Z', 'X']}]}
self.assertEqual(output, expected)
diff --git a/lib/xos-genx/tests/xos_security_test.py b/lib/xos-genx/tests/xos_security_test.py
index 8cdd746..e6d2c6c 100644
--- a/lib/xos-genx/tests/xos_security_test.py
+++ b/lib/xos-genx/tests/xos_security_test.py
@@ -1,6 +1,8 @@
import unittest
from xosgenx.generator import XOSGenerator
from helpers import FakeArgs, XProtoTestHelpers
+import pdb
+import mock
"""The function below is for eliminating warnings arising due to the missing policy_output_enforcer,
which is generated and loaded dynamically.
@@ -12,6 +14,7 @@
"""
The tests below use the Python code target to generate
Python security policies, set up an appropriate environment and execute the Python.
+The security policies here deliberately made complex in order to stress the processor.
"""
class XProtoXOSSecurityTest(unittest.TestCase):
def setUp(self):
@@ -23,7 +26,7 @@
def test_controller_policy(self):
xproto = \
"""
- policy test_policy < ctx.user.is_admin | exists Privilege: Privilege.user_id = ctx.user.id & Privilege.object_type = "Deployment" >
+ policy test_policy < ctx.user.is_admin | exists Privilege: Privilege.accessor_id = ctx.user.id & Privilege.object_type = "Deployment" & Privilege.permission = "role:admin" & Privilege.object_id = obj.id >
"""
args = FakeArgs()
args.inputs = xproto
@@ -36,21 +39,127 @@
"""
def policy_output_enforcer(obj, ctx):
i2 = ctx.user.is_admin
- i3 = Privilege.objects.filter(Q(user_id=ctx.user.id), Q(object_type='Deployment'))[0]
+ i3 = Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(object_type='Deployment'), Q(permission='role:admin'), Q(object_id=obj.id))[0]
i1 = (i2 or i3)
- return i1
+ return i1
"""
# FIXME: Test this policy by executing it
self.assertTrue(policy_output_enforcer is not None)
"""
- This is the security policy for controllers
+ This is the security policy for ControllerNetworks
"""
- def _test_controller_network_policy(self):
+ def test_controller_network_policy(self):
xproto = \
"""
- policy test_policy < ctx.user.is_admin | exists Slice: forall ctx.networks: ctx.networks.owner.id = Slice.id >
+ policy test_policy <
+ ctx.user.is_admin
+ | (exists Privilege:
+ Privilege.accessor_id = ctx.user.id
+ & Privilege.accessor_type = "User"
+ & Privilege.object_type = "Slice"
+ & Privilege.object_id = obj.owner.id)
+ | (exists Privilege:
+ Privilege.accessor_id = ctx.user.id
+ & Privilege.accessor_type = "User"
+ & Privilege.object_type = "Site"
+ & Privilege.object_id = obj.owner.site.id
+ & Privilege.permission = "role:admin") >
+"""
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_enforcer(obj, ctx):
+ i2 = ctx.user.is_admin
+ i4 = Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Slice'), Q(object_id=obj.owner.id))[0]
+ i5 = Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Site'), Q(object_id=obj.owner.site.id), Q(permission='role:admin'))[0]
+ i3 = (i4 or i5)
+ i1 = (i2 or i3)
+ return i1
+ """
+
+ # FIXME: Test this policy by executing it
+ self.assertTrue(policy_output_enforcer is not None)
+
+ """
+ This is the security policy for Slices
+ """
+ def test_slice_policy(self):
+ xproto = \
+"""
+ policy site_policy <
+ ctx.user.is_admin
+ | (ctx.write_access -> exists Privilege: Privilege.object_type = "Site" & Privilege.object_id = obj.id & Privilege.accessor_id = ctx.user.id & Privilege.permission_id = "role:admin") >
+
+ policy test_policy <
+ ctx.user.is_admin
+ | (*site_policy(site)
+ & ((exists Privilege:
+ Privilege.accessor_id = ctx.user.id
+ & Privilege.accessor_type = "User"
+ & Privilege.object_type = "Slice"
+ & Privilege.object_id = obj.id
+ & (ctx.write_access->Privilege.permission="role:admin"))
+ | (exists Privilege:
+ Privilege.accessor_id = ctx.user.id
+ & Privilege.accessor_type = "User"
+ & Privilege.object_type = "Site"
+ & Privilege.object_id = obj.site.id
+ & Privilege.permission = "role:admin"))
+ )>
+
+"""
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_enforcer(obj, ctx):
+ i2 = ctx.user.is_admin
+ i4 = policy_site_policy_enforcer(obj.site, ctx)
+ i10 = ctx.write_access
+ i11 = (not (not Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Slice'), Q(object_id=obj.id), Q(permission='role:admin'))))
+ i8 = (i10 and i11)
+ i14 = ctx.write_access
+ i12 = (not i14)
+ i13 = (not (not Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Slice'), Q(object_id=obj.id))))
+ i9 = (i12 and i13)
+ i6 = (i8 or i9)
+ i7 = (not (not Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Site'), Q(object_id=obj.site.id), Q(permission='role:admin'))))
+ i5 = (i6 or i7)
+ i3 = (i4 and i5)
+ i1 = (i2 or i3)
+ return i1
+ """
+
+ # FIXME: Test this policy by executing it
+ self.assertTrue(policy_output_enforcer is not None)
+
+ """
+ This is the security policy for Users
+ """
+ def test_user_policy(self):
+ xproto = \
+"""
+ policy test_policy <
+ ctx.user.is_admin
+ | ctx.user.id = obj.id
+ | (exists Privilege:
+ Privilege.accessor_id = ctx.user.id
+ & Privilege.accessor_type = "User"
+ & Privilege.permission = "role:admin"
+ & Privilege.object_type = "Site"
+ & Privilege.object_id = ctx.user.site.id) >
"""
args = FakeArgs()
args.inputs = xproto
@@ -63,7 +172,9 @@
"""
def policy_output_enforcer(obj, ctx):
i2 = ctx.user.is_admin
- i3 = Privilege.objects.filter(Q(user_id=ctx.user.id), Q(object_type='Deployment'))[0]
+ i4 = (ctx.user.id == obj.id)
+ i5 = Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(permission='role:admin'), Q(object_type='Site'), Q(object_id=ctx.user.site.id))[0]
+ i3 = (i4 or i5)
i1 = (i2 or i3)
return i1
"""
@@ -71,6 +182,5 @@
# FIXME: Test this policy by executing it
self.assertTrue(policy_output_enforcer is not None)
-
if __name__ == '__main__':
unittest.main()
diff --git a/lib/xos-genx/tests/xos_validation_test.py b/lib/xos-genx/tests/xos_validation_test.py
index 20ba5da..7e70ede 100644
--- a/lib/xos-genx/tests/xos_validation_test.py
+++ b/lib/xos-genx/tests/xos_validation_test.py
@@ -76,35 +76,6 @@
with self.assertRaises(Exception):
policy_output_validator(obj, {})
- def test_equal(self):
- xproto = \
-"""
- policy test_policy < not (ctx.user = obj.user) >
-"""
-
- args = FakeArgs()
- args.inputs = xproto
- args.target = self.target
-
- output = XOSGenerator.generate(args)
-
- exec(output) # This loads the generated function, which should look like this:
-
- """
- def policy_output_validator(obj, ctx):
- i2 = (ctx.user == obj.user)
- i1 = (not i2)
- if (not i1):
- raise Exception('Necessary Failure')
- """
-
- obj = FakeArgs()
- obj.user = 1
- ctx = FakeArgs()
- ctx.user = 1
-
- with self.assertRaises(Exception):
- policy_output_validator(obj, ctx)
def test_equal(self):
xproto = \
@@ -139,7 +110,7 @@
def test_bin(self):
xproto = \
"""
- policy test_policy < (ctx.is_admin = True | obj.empty = True) & False>
+ policy test_policy < not (ctx.is_admin = True | obj.empty = True) | False>
"""
args = FakeArgs()