CORD-1385: Autogenerate validation code
Change-Id: I8dda8f78482b382cd5d9c5397070266324d4fab9
diff --git a/lib/xos-genx/tests/general_security_test.py b/lib/xos-genx/tests/general_security_test.py
index f2dd857..d5f3e79 100644
--- a/lib/xos-genx/tests/general_security_test.py
+++ b/lib/xos-genx/tests/general_security_test.py
@@ -1,12 +1,11 @@
import unittest
from xosgenx.generator import XOSGenerator
from helpers import FakeArgs, XProtoTestHelpers
-import pdb
-"""The function below is for eliminating warnings arising due to the missing policy_output_0,
+"""The function below is for eliminating warnings arising due to the missing policy_output_enforcer,
which is generated and loaded dynamically.
"""
-def policy_output_0(x, y):
+def policy_output_enforcer(x, y):
raise Exception("Security enforcer not generated. Test failed.")
return False
@@ -16,7 +15,7 @@
"""
class XProtoSecurityTest(unittest.TestCase):
def setUp(self):
- self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_test(proto.policies.test_policy, None, '0') }}")
+ self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_test('output', proto.policies.test_policy, None, '0') }}")
def test_constant(self):
xproto = \
@@ -32,12 +31,12 @@
exec(output) # This loads the generated function, which should look like this:
"""
- def policy_output_0(obj, ctx):
+ def policy_output_enforcer(obj, ctx):
i1 = True
return i1
"""
- verdict = policy_output_0({}, {})
+ verdict = policy_output_enforcer({}, {})
self.assertTrue(verdict)
def test_equal(self):
@@ -55,7 +54,7 @@
exec(output) # This loads the generated function, which should look like this:
"""
- def policy_output_0(obj, ctx):
+ def policy_output_enforcer(obj, ctx):
i1 = (ctx.user == obj.user)
return i1
"""
@@ -65,7 +64,7 @@
ctx = FakeArgs()
ctx.user = 1
- verdict = policy_output_0(obj, ctx)
+ verdict = policy_output_enforcer(obj, ctx)
def test_bin(self):
xproto = \
@@ -81,7 +80,7 @@
exec(output) # This loads the generated function, which should look like this:
"""
- def policy_output_0(obj, ctx):
+ def policy_output_enforcer(obj, ctx):
i2 = (ctx.is_admin == True)
i3 = (obj.empty == True)
i1 = (i2 or i3)
@@ -94,7 +93,7 @@
ctx = FakeArgs()
ctx.is_admin = True
- verdict = policy_output_0(obj, ctx)
+ verdict = policy_output_enforcer(obj, ctx)
self.assertTrue(verdict)
@@ -112,12 +111,12 @@
exec(output) # This loads the generated function, which should look like this:
"""
- def policy_output_0(obj, ctx):
+ def policy_output_enforcer(obj, ctx):
i1 = Privilege.objects.filter(object_id=obj.id)
return i1
"""
- self.assertTrue(policy_output_0 is not None)
+ self.assertTrue(policy_output_enforcer is not None)
def test_python(self):
xproto = \
@@ -131,13 +130,13 @@
exec(output) # This loads the generated function, which should look like this:
"""
- def policy_output_0(obj, ctx):
+ def policy_output_enforcer(obj, ctx):
i2 = ('jack' in ['the', 'box'])
i1 = (i2 == False)
return i1
"""
- self.assertTrue(policy_output_0({}, {}) is True)
+ self.assertTrue(policy_output_enforcer({}, {}) is True)
def test_forall(self):
# This one we only parse
@@ -146,15 +145,13 @@
policy test_policy < forall Credential: Credential.obj_id = obj_id >
"""
- target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_test(proto.policies.test_policy, None, '0') }}")
-
args = FakeArgs()
args.inputs = xproto
- args.target = target
+ args.target = self.target
output = XOSGenerator.generate(args)
"""
- def policy_output_0(obj, ctx):
+ def policy_output_enforcer(obj, ctx):
i2 = Credential.objects.filter((~ Q(obj_id=obj_id)))[0]
i1 = (not i2)
return i1
diff --git a/lib/xos-genx/tests/general_validation_test.py b/lib/xos-genx/tests/general_validation_test.py
new file mode 100644
index 0000000..4134fe1
--- /dev/null
+++ b/lib/xos-genx/tests/general_validation_test.py
@@ -0,0 +1,202 @@
+import unittest
+from xosgenx.generator import XOSGenerator
+from helpers import FakeArgs, XProtoTestHelpers
+import pdb
+
+"""The function below is for eliminating warnings arising due to the missing policy_output_validator,
+which is generated and loaded dynamically.
+"""
+def policy_output_validator(x, y):
+ raise Exception("Validator not generated. Test failed.")
+ return False
+
+"""
+The tests below use the Python code target to generate
+Python validation policies, set up an appropriate environment and execute the Python.
+"""
+class XProtoGeneralValidationTest(unittest.TestCase):
+ def setUp(self):
+ self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_validator('output', proto.policies.test_policy, None, 'Necessary Failure') }}")
+
+ def test_constant(self):
+ xproto = \
+"""
+ policy test_policy < False >
+"""
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i1 = False
+ if (not i1):
+ raise Exception('Necessary Failure')
+ """
+
+ with self.assertRaises(Exception):
+ policy_output_validator({}, {})
+
+ def test_equal(self):
+ xproto = \
+"""
+ policy test_policy < not (ctx.user = obj.user) >
+"""
+
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i2 = (ctx.user == obj.user)
+ i1 = (not i2)
+ if (not i1):
+ raise Exception('Necessary Failure')
+ """
+
+ obj = FakeArgs()
+ obj.user = 1
+ ctx = FakeArgs()
+ ctx.user = 1
+
+ with self.assertRaises(Exception):
+ policy_output_validator(obj, ctx)
+
+ def test_equal(self):
+ xproto = \
+"""
+ policy test_policy < not (ctx.user = obj.user) >
+"""
+
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i2 = (ctx.user == obj.user)
+ i1 = (not i2)
+ if (not i1):
+ raise Exception('Necessary Failure')
+ """
+
+ obj = FakeArgs()
+ obj.user = 1
+ ctx = FakeArgs()
+ ctx.user = 1
+
+ with self.assertRaises(Exception):
+ policy_output_validator(obj, ctx)
+
+ def test_bin(self):
+ xproto = \
+"""
+ policy test_policy < (ctx.is_admin = True | obj.empty = True) & False>
+"""
+
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i2 = (ctx.is_admin == True)
+ i3 = (obj.empty == True)
+ i1 = (i2 or i3)
+ if (not i1):
+ raise Exception('Necessary Failure')
+ """
+
+ obj = FakeArgs()
+ obj.empty = True
+
+ ctx = FakeArgs()
+ ctx.is_admin = True
+
+ with self.assertRaises(Exception):
+ verdict = policy_output_validator(obj, ctx)
+
+
+ def test_exists(self):
+ xproto = \
+"""
+ policy test_policy < exists Privilege: Privilege.object_id = obj.id >
+"""
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i1 = Privilege.objects.filter(Q(object_id=obj.id))[0]
+ if (not i1):
+ raise Exception('Necessary Failure')
+ """
+
+ self.assertTrue(policy_output_validator is not None)
+
+ def test_python(self):
+ xproto = \
+"""
+ policy test_policy < {{ "jack" in ["the", "box"] }} = True >
+"""
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+ output = XOSGenerator.generate(args)
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i2 = ('jack' in ['the', 'box'])
+ i1 = (i2 == True)
+ if (not i1):
+ raise Exception('Necessary Failure')
+ """
+
+ with self.assertRaises(Exception):
+ self.assertTrue(policy_output_validator({}, {}) is True)
+
+ def test_forall(self):
+ # This one we only parse
+ xproto = \
+"""
+ policy test_policy < forall Credential: Credential.obj_id = obj_id >
+"""
+
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+
+ """
+ def policy_output_enforcer(obj, ctx):
+ i2 = Credential.objects.filter((~ Q(obj_id=obj_id)))[0]
+ i1 = (not i2)
+ return i1
+ """
+
+ self.assertIn('policy_output_validator', output)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/lib/xos-genx/tests/policy_test.py b/lib/xos-genx/tests/policy_test.py
index fcfed8a..25c307e 100644
--- a/lib/xos-genx/tests/policy_test.py
+++ b/lib/xos-genx/tests/policy_test.py
@@ -1,6 +1,7 @@
import unittest
from xosgenx.generator import XOSGenerator
from helpers import FakeArgs, XProtoTestHelpers
+import pdb
"""
The tests below convert the policy logic expression
@@ -42,6 +43,26 @@
output = XOSGenerator.generate(args).replace('t','T')
self.assertTrue(eval(output))
+ def test_function_term(self):
+ xproto = \
+"""
+ policy slice_user < slice.user.compute_is_admin() >
+"""
+
+ target = XProtoTestHelpers.write_tmp_target("{{ proto.policies.slice_user }}")
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = target
+
+ output = XOSGenerator.generate(args)
+
+ slice = FakeArgs()
+ slice.user = FakeArgs()
+ slice.user.compute_is_admin = lambda: True
+
+ expr = eval(output)
+ self.assertTrue(expr)
+
def test_term(self):
xproto = \
"""
@@ -147,7 +168,29 @@
self.assertFalse(eval(expr))
-
+ def test_implies(self):
+ xproto = \
+"""
+ policy implies < obj.name -> obj.creator >
+"""
+ target = XProtoTestHelpers.write_tmp_target("{{ proto.policies.implies }}")
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = target
+
+ output = XOSGenerator.generate(args)
+
+ slice = FakeArgs()
+ slice.is_admin = False
+ obj = FakeArgs()
+ obj.name = 'Thing 1'
+ obj.creator = None
+
+ (op, operands), = eval(output).items()
+ expr = 'not ' + op.join(operands).replace('->',' or ')
+
+ self.assertFalse(eval(expr))
+
def test_exists(self):
xproto = \
"""
diff --git a/lib/xos-genx/tests/test_cli.py b/lib/xos-genx/tests/test_cli.py
index af21253..6e76073 100644
--- a/lib/xos-genx/tests/test_cli.py
+++ b/lib/xos-genx/tests/test_cli.py
@@ -36,4 +36,4 @@
self.assertEqual(actual_args.output, expected_args.output)
if __name__ == '__main__':
- unittest.main()
\ No newline at end of file
+ unittest.main()
diff --git a/lib/xos-genx/tests/xos_security_test.py b/lib/xos-genx/tests/xos_security_test.py
index 30a7193..8cdd746 100644
--- a/lib/xos-genx/tests/xos_security_test.py
+++ b/lib/xos-genx/tests/xos_security_test.py
@@ -1,12 +1,11 @@
import unittest
from xosgenx.generator import XOSGenerator
from helpers import FakeArgs, XProtoTestHelpers
-import pdb
-"""The function below is for eliminating warnings arising due to the missing policy_output_0,
+"""The function below is for eliminating warnings arising due to the missing policy_output_enforcer,
which is generated and loaded dynamically.
"""
-def policy_output_0(x, y):
+def policy_output_enforcer(x, y):
raise Exception("Security enforcer not generated. Test failed.")
return False
@@ -16,7 +15,7 @@
"""
class XProtoXOSSecurityTest(unittest.TestCase):
def setUp(self):
- self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_test(proto.policies.test_policy, None, '0') }}")
+ self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_test('output',proto.policies.test_policy, None, '0') }}")
"""
This is the security policy for controllers
@@ -35,7 +34,7 @@
exec(output) # This loads the generated function, which should look like this:
"""
- def policy_output_0(obj, ctx):
+ def policy_output_enforcer(obj, ctx):
i2 = ctx.user.is_admin
i3 = Privilege.objects.filter(Q(user_id=ctx.user.id), Q(object_type='Deployment'))[0]
i1 = (i2 or i3)
@@ -43,7 +42,7 @@
"""
# FIXME: Test this policy by executing it
- self.assertTrue(policy_output_0 is not None)
+ self.assertTrue(policy_output_enforcer is not None)
"""
This is the security policy for controllers
@@ -62,7 +61,7 @@
exec(output) # This loads the generated function, which should look like this:
"""
- def policy_output_0(obj, ctx):
+ def policy_output_enforcer(obj, ctx):
i2 = ctx.user.is_admin
i3 = Privilege.objects.filter(Q(user_id=ctx.user.id), Q(object_type='Deployment'))[0]
i1 = (i2 or i3)
@@ -70,7 +69,7 @@
"""
# FIXME: Test this policy by executing it
- self.assertTrue(policy_output_0 is not None)
+ self.assertTrue(policy_output_enforcer is not None)
if __name__ == '__main__':
diff --git a/lib/xos-genx/tests/xos_validation_test.py b/lib/xos-genx/tests/xos_validation_test.py
new file mode 100644
index 0000000..20ba5da
--- /dev/null
+++ b/lib/xos-genx/tests/xos_validation_test.py
@@ -0,0 +1,237 @@
+import unittest
+from xosgenx.generator import XOSGenerator
+from helpers import FakeArgs, XProtoTestHelpers
+
+"""The function below is for eliminating warnings arising due to the missing policy_output_validator,
+which is generated and loaded dynamically.
+"""
+def policy_output_validator(x, y):
+ raise Exception("Validator not generated. Test failed.")
+ return False
+
+"""
+The tests below use the Python code target to generate
+Python validation policies, set up an appropriate environment and execute the Python.
+"""
+class XProtoXOSModelValidationTest(unittest.TestCase):
+ def setUp(self):
+ self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_validator('output', proto.policies.test_policy, None, 'Necessary Failure') }}")
+
+ def test_instance_container(self):
+ xproto = \
+"""
+ policy test_policy < (obj.isolation = "container" | obj.isolation = "container_vm" ) -> (obj.image.kind = "container") >
+"""
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+
+ obj = FakeArgs()
+ obj.isolation = 'container'
+ obj.kind = 'not a container'
+
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i4 = (obj.isolation == 'container')
+ i5 = (self.isolation == 'container_vm')
+ i2 = (i4 or i5)
+ i3 = (obj.image.kind == 'container')
+ i1 = (i2 or i3)
+ return i1
+ """
+
+ with self.assertRaises(Exception):
+ policy_output_validator(obj, {})
+
+ def test_slice_name_validation(self):
+ xproto = \
+"""
+ policy test_policy < not obj.id -> {{ obj.name.startswith(obj.site.login_base) }} >
+"""
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+
+ obj = FakeArgs()
+ obj.isolation = 'container'
+ obj.kind = 'not a container'
+
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i3 = obj.id
+ i4 = obj.name.startswith(obj.site.login_base)
+ i2 = ((not i3) or i4)
+ i1 = (not i2)
+ if (not i1):
+ raise ValidationError('Necessary Failure')
+ """
+
+ with self.assertRaises(Exception):
+ policy_output_validator(obj, {})
+ def test_equal(self):
+ xproto = \
+"""
+ policy test_policy < not (ctx.user = obj.user) >
+"""
+
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i2 = (ctx.user == obj.user)
+ i1 = (not i2)
+ if (not i1):
+ raise Exception('Necessary Failure')
+ """
+
+ obj = FakeArgs()
+ obj.user = 1
+ ctx = FakeArgs()
+ ctx.user = 1
+
+ with self.assertRaises(Exception):
+ policy_output_validator(obj, ctx)
+
+ def test_equal(self):
+ xproto = \
+"""
+ policy test_policy < not (ctx.user = obj.user) >
+"""
+
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i2 = (ctx.user == obj.user)
+ i1 = (not i2)
+ if (not i1):
+ raise Exception('Necessary Failure')
+ """
+
+ obj = FakeArgs()
+ obj.user = 1
+ ctx = FakeArgs()
+ ctx.user = 1
+
+ with self.assertRaises(Exception):
+ policy_output_validator(obj, ctx)
+
+ def test_bin(self):
+ xproto = \
+"""
+ policy test_policy < (ctx.is_admin = True | obj.empty = True) & False>
+"""
+
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i2 = (ctx.is_admin == True)
+ i3 = (obj.empty == True)
+ i1 = (i2 or i3)
+ if (not i1):
+ raise Exception('Necessary Failure')
+ """
+
+ obj = FakeArgs()
+ obj.empty = True
+
+ ctx = FakeArgs()
+ ctx.is_admin = True
+
+ with self.assertRaises(Exception):
+ verdict = policy_output_validator(obj, ctx)
+
+
+ def test_exists(self):
+ xproto = \
+"""
+ policy test_policy < exists Privilege: Privilege.object_id = obj.id >
+"""
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i1 = Privilege.objects.filter(Q(object_id=obj.id))[0]
+ if (not i1):
+ raise Exception('Necessary Failure')
+ """
+
+ self.assertTrue(policy_output_validator is not None)
+
+ def test_python(self):
+ xproto = \
+"""
+ policy test_policy < {{ "jack" in ["the", "box"] }} = True >
+"""
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+ output = XOSGenerator.generate(args)
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_validator(obj, ctx):
+ i2 = ('jack' in ['the', 'box'])
+ i1 = (i2 == True)
+ if (not i1):
+ raise Exception('Necessary Failure')
+ """
+
+ with self.assertRaises(Exception):
+ self.assertTrue(policy_output_validator({}, {}) is True)
+
+ def test_forall(self):
+ # This one we only parse
+ xproto = \
+"""
+ policy test_policy < forall Credential: Credential.obj_id = obj_id >
+"""
+
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+
+ """
+ def policy_output_enforcer(obj, ctx):
+ i2 = Credential.objects.filter((~ Q(obj_id=obj_id)))[0]
+ i1 = (not i2)
+ return i1
+ """
+
+ self.assertIn('policy_output_validator', output)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/lib/xos-genx/xosgenx/jinja2_extensions/django.py b/lib/xos-genx/xosgenx/jinja2_extensions/django.py
index 9c52fbd..eaf21d1 100644
--- a/lib/xos-genx/xosgenx/jinja2_extensions/django.py
+++ b/lib/xos-genx/xosgenx/jinja2_extensions/django.py
@@ -1,3 +1,6 @@
+from base import *
+import pdb
+
def django_content_type_string(xptags):
# Check possibility of KeyError in caller
content_type = xptags['content_type']
@@ -137,3 +140,9 @@
output_dict['related_name'] = '%r'%dport
return format_options_string(output_dict)
+
+def xproto_validations(options):
+ try:
+ return [map(str.strip, validation.split(':')) for validation in unquote(options['validators']).split(',')]
+ except KeyError:
+ return []
diff --git a/lib/xos-genx/xosgenx/jinja2_extensions/fol2.py b/lib/xos-genx/xosgenx/jinja2_extensions/fol2.py
index a929a9c..336373f 100644
--- a/lib/xos-genx/xosgenx/jinja2_extensions/fol2.py
+++ b/lib/xos-genx/xosgenx/jinja2_extensions/fol2.py
@@ -2,10 +2,11 @@
import ast
import random
import string
+import jinja2
from plyxproto.parser import *
import pdb
-BINOPS = ['|', '&', '=>']
+BINOPS = ['|', '&', '->']
QUANTS = ['exists', 'forall']
@@ -89,13 +90,14 @@
or_expr = ' or '
and_expr = ' and '
- if k == '=':
+ if k in ['=','in']:
v = [self.format_term_for_query(
model, term, django=django) for term in v]
if django:
- operator = ' = '
+ operator_map = {'=':' = ','in':'__in'}
else:
- operator = ' == '
+ operator_map = {'=':' == ','in':'in'}
+ operator = operator_map[k]
return [q_bracket % operator.join(v)]
elif k == '|':
components = [self.fol_to_python_filter(
@@ -105,7 +107,7 @@
components = [self.fol_to_python_filter(
model, x, django=django).pop() for x in v]
return [and_expr.join(components)]
- elif k == '=>':
+ elif k == '->':
components = [self.fol_to_python_filter(
model, x, django=django).pop() for x in v]
return ['~%s | %s' % (components[0], components[1])]
@@ -137,10 +139,10 @@
return {'hoist': ['const', fol], 'result': 'True'}
else:
return {'hoist': [], 'result': fol}
- elif k == '=':
+ elif k in ['=', 'in']:
lhs, rhs = v
if not lhs.startswith(var) and not rhs.startswith(var):
- return {'hoist': ['=', fol], 'result': 'True'} # XXX
+ return {'hoist': [k, fol], 'result': 'True'} # XXX
else:
return {'hoist': [], 'result': fol}
elif k in BINOPS:
@@ -148,7 +150,7 @@
rlhs = self.hoist_constants(lhs, var)
rrhs = self.hoist_constants(rhs, var)
- if rlhs['hoist'] and rrhs['hoist']:
+ if rlhs['hoist'] and rrhs['hoist'] and rlhs['result']=='True' and llhs['result']=='True':
return {'hoist': ['=', fol], 'result': 'True'}
elif rlhs['hoist']:
return {'hoist': [k, lhs], 'result': rhs}
@@ -196,6 +198,26 @@
else:
return fol
+ def gen_validation_function(self, fol, policy_name, message, tag):
+ if not tag:
+ tag = gen_random_string()
+
+ policy_function_name = 'policy_%(policy_name)s_%(random_string)s' % {
+ 'policy_name': policy_name, 'random_string': tag}
+ self.verdict_next()
+ function_str = """
+def %(fn_name)s(obj, ctx):
+ if not %(vvar)s: raise ValidationError("%(message)s")
+ """ % {'fn_name': policy_function_name, 'vvar': self.verdict_variable, 'message': message}
+
+ function_ast = self.str_to_ast(function_str)
+ policy_code = self.gen_test(fol, self.verdict_variable)
+
+
+ function_ast.body = [policy_code] + function_ast.body
+
+ return function_ast
+
def gen_test_function(self, fol, policy_name, tag):
if not tag:
tag = gen_random_string()
@@ -213,7 +235,7 @@
function_ast.body = [policy_code] + function_ast.body
- return astunparse.unparse(function_ast)
+ return function_ast
def gen_test(self, fol, verdict_var, bindings=None):
if isinstance(fol, str):
@@ -221,7 +243,35 @@
(k, v), = fol.items()
- if k == '=':
+ if k == 'python':
+ try:
+ expr_ast = self.str_to_ast(v)
+ except SyntaxError:
+ raise PolicyException('Syntax error in %s' % v)
+
+ if not isinstance(expr_ast, ast.Expr):
+ raise PolicyException(
+ '%s is not an expression' % expr_ast)
+
+ assignment_str = """
+%(verdict_var)s = (%(escape_expr)s)
+ """ % {'verdict_var': self.verdict_variable, 'escape_expr': v}
+
+ assignment_ast = self.str_to_ast(assignment_str)
+ return assignment_ast
+ elif k == 'not':
+ top_vvar = self.verdict_variable
+ self.verdict_next()
+ sub_vvar = self.verdict_variable
+ block = self.gen_test(v, sub_vvar)
+ assignment_str = """
+%(verdict_var)s = not (%(subvar)s)
+ """ % {'verdict_var': top_vvar, 'subvar': sub_vvar}
+
+ assignment_ast = self.str_to_ast(assignment_str)
+
+ return ast.Module(body=[block, assignment_ast])
+ elif k in ['=','in']:
# This is the simplest case, we don't recurse further
# To use terms that are not simple variables, use
# the Python escape, e.g. {{ slice.creator is not None }}
@@ -259,18 +309,23 @@
except TypeError:
pass
+ if k=='=':
+ operator='=='
+ elif k=='in':
+ operator='in'
+
comparison_str = """
-%(verdict_var)s = (%(lhs)s == %(rhs)s)
- """ % {'verdict_var': verdict_var, 'lhs': lhs, 'rhs': rhs}
+%(verdict_var)s = (%(lhs)s %(operator)s %(rhs)s)
+ """ % {'verdict_var': verdict_var, 'lhs': lhs, 'rhs': rhs, 'operator':operator}
comparison_ast = self.str_to_ast(comparison_str)
-
combined_ast = ast.Module(body=assignments + [comparison_ast])
+
return combined_ast
elif k in BINOPS:
lhs, rhs = v
- top_vvar = self.verdict_variable
+ top_vvar = verdict_var
self.verdict_next()
lvar = self.verdict_variable
@@ -286,7 +341,7 @@
binop = 'and'
elif k == '|':
binop = 'or'
- elif k == '=>':
+ elif k == '->':
binop = 'or'
invert = 'not'
@@ -360,12 +415,24 @@
return ast.Module(body=[python_ast, negate_ast])
-def xproto_fol_to_python_test(fol, model, tag=None):
+def xproto_fol_to_python_test(policy, fol, model, tag=None):
+ if isinstance(fol, jinja2.Undefined):
+ raise Exception('Could not find policy:', policy)
+
f2p = FOL2Python()
fol = f2p.hoist_constants(fol)
- a = f2p.gen_test_function(fol, 'output', tag)
- return a
+ a = f2p.gen_test_function(fol, policy, tag='enforcer')
+ return astunparse.unparse(a)
+def xproto_fol_to_python_validator(policy, fol, model, message, tag=None):
+ if isinstance(fol, jinja2.Undefined):
+ raise Exception('Could not find policy:', policy)
+
+ f2p = FOL2Python()
+ fol = f2p.hoist_constants(fol)
+ a = f2p.gen_validation_function(fol, policy, message, tag='validator')
+
+ return astunparse.unparse(a)
def main():
while True:
@@ -374,7 +441,7 @@
fol_parser = yacc.yacc(module=FOLParser(), start='goal')
val = fol_parser.parse(inp, lexer=fol_lexer)
- a = xproto_fol_to_python_test(val, 'output')
+ a = xproto_fol_to_python_test('pol', val, 'output', 'Test')
print a
diff --git a/xos/core/models/core.xproto b/xos/core/models/core.xproto
index 1a77eee..4bae7ec 100644
--- a/xos/core/models/core.xproto
+++ b/xos/core/models/core.xproto
@@ -12,7 +12,7 @@
required bool backend_need_reap = 7 [default = False];
required string backend_status = 8 [default = "0 - Provisioning in progress", max_length = 1024, null = True];
required bool deleted = 9 [default = False];
- required bool write_protect = 10 [default = False];
+ required bool write_protect = 11 [default = False];
required bool lazy_blocked = 11 [default = False];
required bool no_sync = 12 [default = False];
required bool no_policy = 13 [default = False];
@@ -224,8 +224,17 @@
required manytoone deployment->Deployment:imagedeployments = 2 [db_index = True, null = False, blank = False];
}
+policy instance_name < obj.name = obj.slice.name >
+policy instance_creator < obj.creator >
+policy instance_isolation < (obj.isolation = "container" | self.isolation = "container_vm" ) -> (obj.image.kind = "container") >
+policy instance_isolation_parent < obj.isolation -> obj.parent >
+policy instance_isolation_vm < (obj.isolation = "vm") -> (obj.image.kind = "vm") >
+policy instance_isolation_container_vm < (obj.isolation = "container_vm") -> (obj.image.kind = "vm") >
+policy instance_creator_privilege < not (obj.slice.creator = obj.creator) -> exists Privilege:Privilege.object_id = obj.slice.id & Privilege.accessor_id = obj.creator.id & Privilege.object_type = "Slice" >
message Instance (XOSBase) {
+ option validators = "instance_name:Instance name set improperly, instance_creator:Instance has no creator, instance_isolation: Container instance must use container image, instance_isolation_parent:Container-vm instance must have a parent, instance_isolation_vm: VM Instance must use VM image, instance_isolation_container_vm:Parent field can only be set on Container-vm instances, instance_creator_privilege: instance creator has no privileges on slice";
+
optional string instance_id = 1 [max_length = 200, content_type = "stripped", blank = True, help_text = "Nova instance id", null = True, db_index = False];
optional string instance_uuid = 2 [max_length = 200, content_type = "stripped", blank = True, help_text = "Nova instance uuid", null = True, db_index = False];
required string name = 3 [max_length = 200, content_type = "stripped", blank = False, help_text = "Instance name", null = False, db_index = False];
@@ -273,11 +282,14 @@
required string description = 2 [db_index = False, max_length = 1024, null = False, blank = False];
}
+policy network_slice_policy < (obj.slice in obj.network.permitted_slices.all()) | (obj.slice = obj.network.owner) | (not obj.network.permit_all_slices) >
message NetworkSlice (XOSBase) {
+ option validators = "network_slice_policy:Slice is not allowed to connect to networks";
required manytoone network->Network:networkslices = 1 [db_index = True, null = False, blank = False, unique_with = "slice"];
required manytoone slice->Slice:networkslices = 2 [db_index = True, null = False, blank = False];
}
+
message NetworkTemplate (XOSBase) {
required string name = 1 [db_index = False, max_length = 32, null = False, blank = False];
optional string description = 2 [db_index = False, max_length = 1024, null = True, blank = True];
@@ -298,7 +310,11 @@
required string name = 1 [max_length = 200, content_type = "stripped", blank = False, help_text = "label name", null = False, db_index = False];
required manytomany node->Node/NodeLabel_node:nodelabels = 2 [db_index = False, null = False, blank = True];
}
+
+policy port_policy < (obj.instance.slice in obj.network.permitted_slices.all()) | (obj.instance.slice = obj.network.owner) | (not obj.network.permit_all_slices) >
+
message Port (XOSBase) {
+ option validators = "port_policy:Slice is not allowed to connect to network";
required manytoone network->Network:links = 1 [db_index = True, null = False, blank = False, unique_with = "instance"];
optional manytoone instance->Instance:ports = 2 [db_index = True, null = True, blank = True];
optional string ip = 3 [max_length = 39, content_type = "ip", blank = True, help_text = "Instance ip address", null = True, db_index = False];
@@ -398,8 +414,14 @@
required string role = 1 [choices = "(('admin', 'Admin'), ('pi', 'PI'), ('tech', 'Tech'), ('billing', 'Billing'))", max_length = 30, content_type = "stripped", blank = False, null = False, db_index = False];
}
+policy slice_name < not obj.id -> {{ obj.name.startswith(obj.site.login_base) }} >
+policy slice_name_length_and_no_spaces < {{ len(obj.site.login_base) + 1 < len(obj.name) and ' ' not in obj.name }} >
+policy slice_has_creator < obj.creator >
+
message Slice (XOSBase) {
+ option validators = "slice_name:Slice name must begin with site login_base, slice_name_length_and_no_spaces:Slice name too short or contains spaces, slice_has_creator:Slice has no creator";
option plural = "Slices";
+
required string name = 1 [max_length = 80, content_type = "stripped", blank = False, help_text = "The Name of the Slice", null = False, db_index = False];
required bool enabled = 2 [help_text = "Status for this Slice", default = True, null = False, db_index = False, blank = True];
required string description = 4 [help_text = "High level description of the slice and expected activities", max_length = 1024, null = False, db_index = False, blank = True, varchar = True];
@@ -502,3 +524,4 @@
required string name = 1 [max_length = 200, content_type = "stripped", blank = False, help_text = "Name of the GUI Extensions", null = False, db_index = False];
required string files = 2 [max_length = 1024, content_type = "stripped", blank = False, help_text = "List of comma separated file composing the view", null = False, db_index = False];
}
+