CORD-1385: Autogenerate validation code
Change-Id: I8dda8f78482b382cd5d9c5397070266324d4fab9
diff --git a/lib/xos-genx/tests/general_security_test.py b/lib/xos-genx/tests/general_security_test.py
index f2dd857..d5f3e79 100644
--- a/lib/xos-genx/tests/general_security_test.py
+++ b/lib/xos-genx/tests/general_security_test.py
@@ -1,12 +1,11 @@
import unittest
from xosgenx.generator import XOSGenerator
from helpers import FakeArgs, XProtoTestHelpers
-import pdb
-"""The function below is for eliminating warnings arising due to the missing policy_output_0,
+"""The function below is for eliminating warnings arising due to the missing policy_output_enforcer,
which is generated and loaded dynamically.
"""
-def policy_output_0(x, y):
+def policy_output_enforcer(x, y):
raise Exception("Security enforcer not generated. Test failed.")
return False
@@ -16,7 +15,7 @@
"""
class XProtoSecurityTest(unittest.TestCase):
def setUp(self):
- self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_test(proto.policies.test_policy, None, '0') }}")
+ self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_test('output', proto.policies.test_policy, None, '0') }}")
def test_constant(self):
xproto = \
@@ -32,12 +31,12 @@
exec(output) # This loads the generated function, which should look like this:
"""
- def policy_output_0(obj, ctx):
+ def policy_output_enforcer(obj, ctx):
i1 = True
return i1
"""
- verdict = policy_output_0({}, {})
+ verdict = policy_output_enforcer({}, {})
self.assertTrue(verdict)
def test_equal(self):
@@ -55,7 +54,7 @@
exec(output) # This loads the generated function, which should look like this:
"""
- def policy_output_0(obj, ctx):
+ def policy_output_enforcer(obj, ctx):
i1 = (ctx.user == obj.user)
return i1
"""
@@ -65,7 +64,7 @@
ctx = FakeArgs()
ctx.user = 1
- verdict = policy_output_0(obj, ctx)
+ verdict = policy_output_enforcer(obj, ctx)
def test_bin(self):
xproto = \
@@ -81,7 +80,7 @@
exec(output) # This loads the generated function, which should look like this:
"""
- def policy_output_0(obj, ctx):
+ def policy_output_enforcer(obj, ctx):
i2 = (ctx.is_admin == True)
i3 = (obj.empty == True)
i1 = (i2 or i3)
@@ -94,7 +93,7 @@
ctx = FakeArgs()
ctx.is_admin = True
- verdict = policy_output_0(obj, ctx)
+ verdict = policy_output_enforcer(obj, ctx)
self.assertTrue(verdict)
@@ -112,12 +111,12 @@
exec(output) # This loads the generated function, which should look like this:
"""
- def policy_output_0(obj, ctx):
+ def policy_output_enforcer(obj, ctx):
i1 = Privilege.objects.filter(object_id=obj.id)
return i1
"""
- self.assertTrue(policy_output_0 is not None)
+ self.assertTrue(policy_output_enforcer is not None)
def test_python(self):
xproto = \
@@ -131,13 +130,13 @@
exec(output) # This loads the generated function, which should look like this:
"""
- def policy_output_0(obj, ctx):
+ def policy_output_enforcer(obj, ctx):
i2 = ('jack' in ['the', 'box'])
i1 = (i2 == False)
return i1
"""
- self.assertTrue(policy_output_0({}, {}) is True)
+ self.assertTrue(policy_output_enforcer({}, {}) is True)
def test_forall(self):
# This one we only parse
@@ -146,15 +145,13 @@
policy test_policy < forall Credential: Credential.obj_id = obj_id >
"""
- target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_test(proto.policies.test_policy, None, '0') }}")
-
args = FakeArgs()
args.inputs = xproto
- args.target = target
+ args.target = self.target
output = XOSGenerator.generate(args)
"""
- def policy_output_0(obj, ctx):
+ def policy_output_enforcer(obj, ctx):
i2 = Credential.objects.filter((~ Q(obj_id=obj_id)))[0]
i1 = (not i2)
return i1
diff --git a/lib/xos-genx/tests/general_validation_test.py b/lib/xos-genx/tests/general_validation_test.py
new file mode 100644
index 0000000..4134fe1
--- /dev/null
+++ b/lib/xos-genx/tests/general_validation_test.py
@@ -0,0 +1,202 @@
+import unittest
+from xosgenx.generator import XOSGenerator
+from helpers import FakeArgs, XProtoTestHelpers
+import pdb
+
+"""The function below is for eliminating warnings arising due to the missing policy_output_validator,
+which is generated and loaded dynamically.
+"""
+def policy_output_validator(x, y):
+ raise Exception("Validator not generated. Test failed.")
+ return False
+
+"""
+The tests below use the Python code target to generate
+Python validation policies, set up an appropriate environment and execute the Python.
+"""
+class XProtoGeneralValidationTest(unittest.TestCase):
+ def setUp(self):
+ self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_validator('output', proto.policies.test_policy, None, 'Necessary Failure') }}")
+
+ def test_constant(self):
+ xproto = \
+"""
+ policy test_policy < False >
+"""
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i1 = False
+ if (not i1):
+ raise Exception('Necessary Failure')
+ """
+
+ with self.assertRaises(Exception):
+ policy_output_validator({}, {})
+
+ def test_equal(self):
+ xproto = \
+"""
+ policy test_policy < not (ctx.user = obj.user) >
+"""
+
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i2 = (ctx.user == obj.user)
+ i1 = (not i2)
+ if (not i1):
+ raise Exception('Necessary Failure')
+ """
+
+ obj = FakeArgs()
+ obj.user = 1
+ ctx = FakeArgs()
+ ctx.user = 1
+
+ with self.assertRaises(Exception):
+ policy_output_validator(obj, ctx)
+
+ def test_equal(self):
+ xproto = \
+"""
+ policy test_policy < not (ctx.user = obj.user) >
+"""
+
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i2 = (ctx.user == obj.user)
+ i1 = (not i2)
+ if (not i1):
+ raise Exception('Necessary Failure')
+ """
+
+ obj = FakeArgs()
+ obj.user = 1
+ ctx = FakeArgs()
+ ctx.user = 1
+
+ with self.assertRaises(Exception):
+ policy_output_validator(obj, ctx)
+
+ def test_bin(self):
+ xproto = \
+"""
+ policy test_policy < (ctx.is_admin = True | obj.empty = True) & False>
+"""
+
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i2 = (ctx.is_admin == True)
+ i3 = (obj.empty == True)
+ i1 = (i2 or i3)
+ if (not i1):
+ raise Exception('Necessary Failure')
+ """
+
+ obj = FakeArgs()
+ obj.empty = True
+
+ ctx = FakeArgs()
+ ctx.is_admin = True
+
+ with self.assertRaises(Exception):
+ verdict = policy_output_validator(obj, ctx)
+
+
+ def test_exists(self):
+ xproto = \
+"""
+ policy test_policy < exists Privilege: Privilege.object_id = obj.id >
+"""
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i1 = Privilege.objects.filter(Q(object_id=obj.id))[0]
+ if (not i1):
+ raise Exception('Necessary Failure')
+ """
+
+ self.assertTrue(policy_output_validator is not None)
+
+ def test_python(self):
+ xproto = \
+"""
+ policy test_policy < {{ "jack" in ["the", "box"] }} = True >
+"""
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+ output = XOSGenerator.generate(args)
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i2 = ('jack' in ['the', 'box'])
+ i1 = (i2 == True)
+ if (not i1):
+ raise Exception('Necessary Failure')
+ """
+
+ with self.assertRaises(Exception):
+ self.assertTrue(policy_output_validator({}, {}) is True)
+
+ def test_forall(self):
+ # This one we only parse
+ xproto = \
+"""
+ policy test_policy < forall Credential: Credential.obj_id = obj_id >
+"""
+
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+
+ """
+ def policy_output_enforcer(obj, ctx):
+ i2 = Credential.objects.filter((~ Q(obj_id=obj_id)))[0]
+ i1 = (not i2)
+ return i1
+ """
+
+ self.assertIn('policy_output_validator', output)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/lib/xos-genx/tests/policy_test.py b/lib/xos-genx/tests/policy_test.py
index fcfed8a..25c307e 100644
--- a/lib/xos-genx/tests/policy_test.py
+++ b/lib/xos-genx/tests/policy_test.py
@@ -1,6 +1,7 @@
import unittest
from xosgenx.generator import XOSGenerator
from helpers import FakeArgs, XProtoTestHelpers
+import pdb
"""
The tests below convert the policy logic expression
@@ -42,6 +43,26 @@
output = XOSGenerator.generate(args).replace('t','T')
self.assertTrue(eval(output))
+ def test_function_term(self):
+ xproto = \
+"""
+ policy slice_user < slice.user.compute_is_admin() >
+"""
+
+ target = XProtoTestHelpers.write_tmp_target("{{ proto.policies.slice_user }}")
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = target
+
+ output = XOSGenerator.generate(args)
+
+ slice = FakeArgs()
+ slice.user = FakeArgs()
+ slice.user.compute_is_admin = lambda: True
+
+ expr = eval(output)
+ self.assertTrue(expr)
+
def test_term(self):
xproto = \
"""
@@ -147,7 +168,29 @@
self.assertFalse(eval(expr))
-
+ def test_implies(self):
+ xproto = \
+"""
+ policy implies < obj.name -> obj.creator >
+"""
+ target = XProtoTestHelpers.write_tmp_target("{{ proto.policies.implies }}")
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = target
+
+ output = XOSGenerator.generate(args)
+
+ slice = FakeArgs()
+ slice.is_admin = False
+ obj = FakeArgs()
+ obj.name = 'Thing 1'
+ obj.creator = None
+
+ (op, operands), = eval(output).items()
+ expr = 'not ' + op.join(operands).replace('->',' or ')
+
+ self.assertFalse(eval(expr))
+
def test_exists(self):
xproto = \
"""
diff --git a/lib/xos-genx/tests/test_cli.py b/lib/xos-genx/tests/test_cli.py
index af21253..6e76073 100644
--- a/lib/xos-genx/tests/test_cli.py
+++ b/lib/xos-genx/tests/test_cli.py
@@ -36,4 +36,4 @@
self.assertEqual(actual_args.output, expected_args.output)
if __name__ == '__main__':
- unittest.main()
\ No newline at end of file
+ unittest.main()
diff --git a/lib/xos-genx/tests/xos_security_test.py b/lib/xos-genx/tests/xos_security_test.py
index 30a7193..8cdd746 100644
--- a/lib/xos-genx/tests/xos_security_test.py
+++ b/lib/xos-genx/tests/xos_security_test.py
@@ -1,12 +1,11 @@
import unittest
from xosgenx.generator import XOSGenerator
from helpers import FakeArgs, XProtoTestHelpers
-import pdb
-"""The function below is for eliminating warnings arising due to the missing policy_output_0,
+"""The function below is for eliminating warnings arising due to the missing policy_output_enforcer,
which is generated and loaded dynamically.
"""
-def policy_output_0(x, y):
+def policy_output_enforcer(x, y):
raise Exception("Security enforcer not generated. Test failed.")
return False
@@ -16,7 +15,7 @@
"""
class XProtoXOSSecurityTest(unittest.TestCase):
def setUp(self):
- self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_test(proto.policies.test_policy, None, '0') }}")
+ self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_test('output',proto.policies.test_policy, None, '0') }}")
"""
This is the security policy for controllers
@@ -35,7 +34,7 @@
exec(output) # This loads the generated function, which should look like this:
"""
- def policy_output_0(obj, ctx):
+ def policy_output_enforcer(obj, ctx):
i2 = ctx.user.is_admin
i3 = Privilege.objects.filter(Q(user_id=ctx.user.id), Q(object_type='Deployment'))[0]
i1 = (i2 or i3)
@@ -43,7 +42,7 @@
"""
# FIXME: Test this policy by executing it
- self.assertTrue(policy_output_0 is not None)
+ self.assertTrue(policy_output_enforcer is not None)
"""
This is the security policy for controllers
@@ -62,7 +61,7 @@
exec(output) # This loads the generated function, which should look like this:
"""
- def policy_output_0(obj, ctx):
+ def policy_output_enforcer(obj, ctx):
i2 = ctx.user.is_admin
i3 = Privilege.objects.filter(Q(user_id=ctx.user.id), Q(object_type='Deployment'))[0]
i1 = (i2 or i3)
@@ -70,7 +69,7 @@
"""
# FIXME: Test this policy by executing it
- self.assertTrue(policy_output_0 is not None)
+ self.assertTrue(policy_output_enforcer is not None)
if __name__ == '__main__':
diff --git a/lib/xos-genx/tests/xos_validation_test.py b/lib/xos-genx/tests/xos_validation_test.py
new file mode 100644
index 0000000..20ba5da
--- /dev/null
+++ b/lib/xos-genx/tests/xos_validation_test.py
@@ -0,0 +1,237 @@
+import unittest
+from xosgenx.generator import XOSGenerator
+from helpers import FakeArgs, XProtoTestHelpers
+
+"""The function below is for eliminating warnings arising due to the missing policy_output_validator,
+which is generated and loaded dynamically.
+"""
+def policy_output_validator(x, y):
+ raise Exception("Validator not generated. Test failed.")
+ return False
+
+"""
+The tests below use the Python code target to generate
+Python validation policies, set up an appropriate environment and execute the Python.
+"""
+class XProtoXOSModelValidationTest(unittest.TestCase):
+ def setUp(self):
+ self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_validator('output', proto.policies.test_policy, None, 'Necessary Failure') }}")
+
+ def test_instance_container(self):
+ xproto = \
+"""
+ policy test_policy < (obj.isolation = "container" | obj.isolation = "container_vm" ) -> (obj.image.kind = "container") >
+"""
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+
+ obj = FakeArgs()
+ obj.isolation = 'container'
+ obj.kind = 'not a container'
+
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i4 = (obj.isolation == 'container')
+ i5 = (self.isolation == 'container_vm')
+ i2 = (i4 or i5)
+ i3 = (obj.image.kind == 'container')
+ i1 = (i2 or i3)
+ return i1
+ """
+
+ with self.assertRaises(Exception):
+ policy_output_validator(obj, {})
+
+ def test_slice_name_validation(self):
+ xproto = \
+"""
+ policy test_policy < not obj.id -> {{ obj.name.startswith(obj.site.login_base) }} >
+"""
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+
+ obj = FakeArgs()
+ obj.isolation = 'container'
+ obj.kind = 'not a container'
+
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i3 = obj.id
+ i4 = obj.name.startswith(obj.site.login_base)
+ i2 = ((not i3) or i4)
+ i1 = (not i2)
+ if (not i1):
+ raise ValidationError('Necessary Failure')
+ """
+
+ with self.assertRaises(Exception):
+ policy_output_validator(obj, {})
+ def test_equal(self):
+ xproto = \
+"""
+ policy test_policy < not (ctx.user = obj.user) >
+"""
+
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i2 = (ctx.user == obj.user)
+ i1 = (not i2)
+ if (not i1):
+ raise Exception('Necessary Failure')
+ """
+
+ obj = FakeArgs()
+ obj.user = 1
+ ctx = FakeArgs()
+ ctx.user = 1
+
+ with self.assertRaises(Exception):
+ policy_output_validator(obj, ctx)
+
+ def test_equal(self):
+ xproto = \
+"""
+ policy test_policy < not (ctx.user = obj.user) >
+"""
+
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i2 = (ctx.user == obj.user)
+ i1 = (not i2)
+ if (not i1):
+ raise Exception('Necessary Failure')
+ """
+
+ obj = FakeArgs()
+ obj.user = 1
+ ctx = FakeArgs()
+ ctx.user = 1
+
+ with self.assertRaises(Exception):
+ policy_output_validator(obj, ctx)
+
+ def test_bin(self):
+ xproto = \
+"""
+ policy test_policy < (ctx.is_admin = True | obj.empty = True) & False>
+"""
+
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i2 = (ctx.is_admin == True)
+ i3 = (obj.empty == True)
+ i1 = (i2 or i3)
+ if (not i1):
+ raise Exception('Necessary Failure')
+ """
+
+ obj = FakeArgs()
+ obj.empty = True
+
+ ctx = FakeArgs()
+ ctx.is_admin = True
+
+ with self.assertRaises(Exception):
+ verdict = policy_output_validator(obj, ctx)
+
+
+ def test_exists(self):
+ xproto = \
+"""
+ policy test_policy < exists Privilege: Privilege.object_id = obj.id >
+"""
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i1 = Privilege.objects.filter(Q(object_id=obj.id))[0]
+ if (not i1):
+ raise Exception('Necessary Failure')
+ """
+
+ self.assertTrue(policy_output_validator is not None)
+
+ def test_python(self):
+ xproto = \
+"""
+ policy test_policy < {{ "jack" in ["the", "box"] }} = True >
+"""
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+ output = XOSGenerator.generate(args)
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i2 = ('jack' in ['the', 'box'])
+ i1 = (i2 == True)
+ if (not i1):
+ raise Exception('Necessary Failure')
+ """
+
+ with self.assertRaises(Exception):
+ self.assertTrue(policy_output_validator({}, {}) is True)
+
+ def test_forall(self):
+ # This one we only parse
+ xproto = \
+"""
+ policy test_policy < forall Credential: Credential.obj_id = obj_id >
+"""
+
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+
+ """
+ def policy_output_enforcer(obj, ctx):
+ i2 = Credential.objects.filter((~ Q(obj_id=obj_id)))[0]
+ i1 = (not i2)
+ return i1
+ """
+
+ self.assertIn('policy_output_validator', output)
+
+if __name__ == '__main__':
+ unittest.main()