CORD-1570: Several bug fixes, expanded unit tests for security refactoring
Change-Id: Ied8dca916d3c22a252f6de38a65ef1b20c9d639d
diff --git a/lib/xos-genx/tests/general_validation_test.py b/lib/xos-genx/tests/general_validation_test.py
index c1e6820..15cdb56 100644
--- a/lib/xos-genx/tests/general_validation_test.py
+++ b/lib/xos-genx/tests/general_validation_test.py
@@ -108,7 +108,7 @@
def test_bin(self):
xproto = \
"""
- policy output < (ctx.is_admin = True | obj.empty = True) & False>
+ policy output < (ctx.is_admin = True | obj.empty = True) | False>
"""
args = FakeArgs()
@@ -128,10 +128,10 @@
"""
obj = FakeArgs()
- obj.empty = True
+ obj.empty = False
ctx = FakeArgs()
- ctx.is_admin = True
+ ctx.is_admin = False
with self.assertRaises(Exception):
verdict = policy_output_validator(obj, ctx)
diff --git a/lib/xos-genx/tests/optimize_test.py b/lib/xos-genx/tests/optimize_test.py
index ce15bf4..af40575 100644
--- a/lib/xos-genx/tests/optimize_test.py
+++ b/lib/xos-genx/tests/optimize_test.py
@@ -1,70 +1,78 @@
import unittest
from xosgenx.jinja2_extensions.fol2 import FOL2Python
-import pdb
+import vimpdb
class XProtoOptimizeTest(unittest.TestCase):
def setUp(self):
self.f2p = FOL2Python()
+ self.maxDiff=None
def test_constant(self):
input = 'True'
- output = self.f2p.hoist_constants(input)
+ output = self.f2p.hoist_outer(input)
self.assertEqual(output, input)
def test_exists(self):
- input = {'exists': ['x',{'|':['x','y']}]}
+ input = {'exists': ['X',{'|':['X.foo','y']}]}
- output = self.f2p.hoist_constants(input)
- expected = {'|': ['y', {'exists': ['x', 'x']}]}
+ output = self.f2p.hoist_outer(input)
+ expected = {'|': ['y', {'&': [{'not': 'y'}, {'exists': ['X', 'X.foo']}]}]}
self.assertEqual(output, expected)
- def test_forall(self):
- input = {'forall': ['x',{'|':['x','y']}]}
+ def test_exists_implies(self):
+ input = {'exists': ['Foo', {'&': [{'=': ('Foo.a', '1')}, {'->': ['write_access', {'=': ('Foo.b', '1')}]}]}]}
- output = self.f2p.hoist_constants(input)
- expected = {'|': ['y', {'forall': ['x', 'x']}]}
+ output = self.f2p.hoist_outer(input)
+ expected = {'|': [{'&': ['write_access', {'exists': ['Foo', {'&': [{'=': ['Foo.a', '1']}, {'=': ['Foo.b', '1']}]}]}]}, {'&': [{'not': 'write_access'}, {'exists': ['Foo', {'=': ['Foo.a', '1']}]}]}]}
+ self.assertEqual(output, expected)
+
+ def test_forall(self):
+ input = {'forall': ['X',{'|':['X.foo','y']}]}
+
+ output = self.f2p.hoist_outer(input)
+ expected = {'|': ['y', {'&': [{'not': 'y'}, {'forall': ['X', 'X.foo']}]}]}
self.assertEqual(output, expected)
def test_exists_embedded(self):
- input = {'|':['True',{'exists': ['x',{'|':['x','y']}]}]}
+ input = {'&':['True',{'exists': ['X',{'|':['X.foo','y']}]}]}
- output = self.f2p.hoist_constants(input)
- expected = {'|': ['True', {'|': ['y', {'exists': ['x', 'x']}]}]}
+ output = self.f2p.hoist_outer(input)
+ expected = {'|': ['y', {'&': [{'not': 'y'}, {'exists': ['X', 'X.foo']}]}]}
self.assertEqual(output, expected)
def test_exists_equals(self):
- input = {'|':['True',{'exists': ['x',{'|':['x',{'=':['y','z']}]}]}]}
+ input = {'&':['True',{'exists': ['X',{'|':['X.foo',{'=':['y','z']}]}]}]}
- output = self.f2p.hoist_constants(input)
- expected = {'|': ['True', {'|': [{'=': ['y', 'z']}, {'exists': ['x', 'x']}]}]}
+ output = self.f2p.hoist_outer(input)
+ expected = {'|': [{'=': ['y', 'z']}, {'&': [{'not': {'=': ['y', 'z']}}, {'exists': ['X', 'X.foo']}]}]}
self.assertEqual(output, expected)
def test_exists_nested_constant(self):
- input = {'|':['True',{'exists': ['x',{'|':['y',{'=':['y','x']}]}]}]}
+ input = {'&':['True',{'exists': ['X',{'|':['y',{'=':['y','X.foo']}]}]}]}
- output = self.f2p.hoist_constants(input)
- expected = {'|': ['True', {'|': ['y', {'exists': ['x', {'=': ['y', 'x']}]}]}]}
+ output = self.f2p.hoist_outer(input)
+ expected = {'|': ['y', {'&': [{'not': 'y'}, {'exists': ['X', {'=': ['False', 'X.foo']}]}]}]}
self.assertEqual(output, expected)
def test_exists_nested(self):
- input = {'exists': ['x',{'exists':['y',{'=':['y','x']}]}]}
+ input = {'exists': ['X',{'exists':['Y',{'=':['Y.foo','X.foo']}]}]}
- output = self.f2p.hoist_constants(input)
- expected = {'exists': ['x', {'exists': ['y', {'=': ['y', 'x']}]}]}
- self.assertEqual(output, expected)
+ output = self.f2p.hoist_outer(input)
+ expected = input
+ self.assertEqual(input, output)
def test_exists_nested2(self):
- input = {'exists': ['x',{'exists':['y',{'=':['z','y']}]}]}
+ input = {'exists': ['X',{'exists':['Y',{'=':['Z','Y']}]}]}
- output = self.f2p.hoist_constants(input)
- expected = {'exists': ['y', {'=': ['z', 'y']}]}
+ output = self.f2p.hoist_outer(input)
+ expected = {'exists': ['Y', {'=': ['Z', 'Y']}]}
self.assertEqual(output, expected)
def test_exists_nested3(self):
- input = {'exists': ['x',{'exists':['y',{'=':['z','x']}]}]}
+ input = {'exists': ['X',{'exists':['Y',{'=':['Z','X']}]}]}
- output = self.f2p.hoist_constants(input)
- expected = {'exists': ['x', {'=': ['z', 'x']}]}
+ output = self.f2p.hoist_outer(input)
+ expected = {'exists': ['X', {'=': ['Z', 'X']}]}
self.assertEqual(output, expected)
diff --git a/lib/xos-genx/tests/xos_security_test.py b/lib/xos-genx/tests/xos_security_test.py
index 8cdd746..e6d2c6c 100644
--- a/lib/xos-genx/tests/xos_security_test.py
+++ b/lib/xos-genx/tests/xos_security_test.py
@@ -1,6 +1,8 @@
import unittest
from xosgenx.generator import XOSGenerator
from helpers import FakeArgs, XProtoTestHelpers
+import pdb
+import mock
"""The function below is for eliminating warnings arising due to the missing policy_output_enforcer,
which is generated and loaded dynamically.
@@ -12,6 +14,7 @@
"""
The tests below use the Python code target to generate
Python security policies, set up an appropriate environment and execute the Python.
+The security policies here deliberately made complex in order to stress the processor.
"""
class XProtoXOSSecurityTest(unittest.TestCase):
def setUp(self):
@@ -23,7 +26,7 @@
def test_controller_policy(self):
xproto = \
"""
- policy test_policy < ctx.user.is_admin | exists Privilege: Privilege.user_id = ctx.user.id & Privilege.object_type = "Deployment" >
+ policy test_policy < ctx.user.is_admin | exists Privilege: Privilege.accessor_id = ctx.user.id & Privilege.object_type = "Deployment" & Privilege.permission = "role:admin" & Privilege.object_id = obj.id >
"""
args = FakeArgs()
args.inputs = xproto
@@ -36,21 +39,127 @@
"""
def policy_output_enforcer(obj, ctx):
i2 = ctx.user.is_admin
- i3 = Privilege.objects.filter(Q(user_id=ctx.user.id), Q(object_type='Deployment'))[0]
+ i3 = Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(object_type='Deployment'), Q(permission='role:admin'), Q(object_id=obj.id))[0]
i1 = (i2 or i3)
- return i1
+ return i1
"""
# FIXME: Test this policy by executing it
self.assertTrue(policy_output_enforcer is not None)
"""
- This is the security policy for controllers
+ This is the security policy for ControllerNetworks
"""
- def _test_controller_network_policy(self):
+ def test_controller_network_policy(self):
xproto = \
"""
- policy test_policy < ctx.user.is_admin | exists Slice: forall ctx.networks: ctx.networks.owner.id = Slice.id >
+ policy test_policy <
+ ctx.user.is_admin
+ | (exists Privilege:
+ Privilege.accessor_id = ctx.user.id
+ & Privilege.accessor_type = "User"
+ & Privilege.object_type = "Slice"
+ & Privilege.object_id = obj.owner.id)
+ | (exists Privilege:
+ Privilege.accessor_id = ctx.user.id
+ & Privilege.accessor_type = "User"
+ & Privilege.object_type = "Site"
+ & Privilege.object_id = obj.owner.site.id
+ & Privilege.permission = "role:admin") >
+"""
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_enforcer(obj, ctx):
+ i2 = ctx.user.is_admin
+ i4 = Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Slice'), Q(object_id=obj.owner.id))[0]
+ i5 = Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Site'), Q(object_id=obj.owner.site.id), Q(permission='role:admin'))[0]
+ i3 = (i4 or i5)
+ i1 = (i2 or i3)
+ return i1
+ """
+
+ # FIXME: Test this policy by executing it
+ self.assertTrue(policy_output_enforcer is not None)
+
+ """
+ This is the security policy for Slices
+ """
+ def test_slice_policy(self):
+ xproto = \
+"""
+ policy site_policy <
+ ctx.user.is_admin
+ | (ctx.write_access -> exists Privilege: Privilege.object_type = "Site" & Privilege.object_id = obj.id & Privilege.accessor_id = ctx.user.id & Privilege.permission_id = "role:admin") >
+
+ policy test_policy <
+ ctx.user.is_admin
+ | (*site_policy(site)
+ & ((exists Privilege:
+ Privilege.accessor_id = ctx.user.id
+ & Privilege.accessor_type = "User"
+ & Privilege.object_type = "Slice"
+ & Privilege.object_id = obj.id
+ & (ctx.write_access->Privilege.permission="role:admin"))
+ | (exists Privilege:
+ Privilege.accessor_id = ctx.user.id
+ & Privilege.accessor_type = "User"
+ & Privilege.object_type = "Site"
+ & Privilege.object_id = obj.site.id
+ & Privilege.permission = "role:admin"))
+ )>
+
+"""
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+
+ exec(output) # This loads the generated function, which should look like this:
+
+ """
+ def policy_output_enforcer(obj, ctx):
+ i2 = ctx.user.is_admin
+ i4 = policy_site_policy_enforcer(obj.site, ctx)
+ i10 = ctx.write_access
+ i11 = (not (not Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Slice'), Q(object_id=obj.id), Q(permission='role:admin'))))
+ i8 = (i10 and i11)
+ i14 = ctx.write_access
+ i12 = (not i14)
+ i13 = (not (not Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Slice'), Q(object_id=obj.id))))
+ i9 = (i12 and i13)
+ i6 = (i8 or i9)
+ i7 = (not (not Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Site'), Q(object_id=obj.site.id), Q(permission='role:admin'))))
+ i5 = (i6 or i7)
+ i3 = (i4 and i5)
+ i1 = (i2 or i3)
+ return i1
+ """
+
+ # FIXME: Test this policy by executing it
+ self.assertTrue(policy_output_enforcer is not None)
+
+ """
+ This is the security policy for Users
+ """
+ def test_user_policy(self):
+ xproto = \
+"""
+ policy test_policy <
+ ctx.user.is_admin
+ | ctx.user.id = obj.id
+ | (exists Privilege:
+ Privilege.accessor_id = ctx.user.id
+ & Privilege.accessor_type = "User"
+ & Privilege.permission = "role:admin"
+ & Privilege.object_type = "Site"
+ & Privilege.object_id = ctx.user.site.id) >
"""
args = FakeArgs()
args.inputs = xproto
@@ -63,7 +172,9 @@
"""
def policy_output_enforcer(obj, ctx):
i2 = ctx.user.is_admin
- i3 = Privilege.objects.filter(Q(user_id=ctx.user.id), Q(object_type='Deployment'))[0]
+ i4 = (ctx.user.id == obj.id)
+ i5 = Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(permission='role:admin'), Q(object_type='Site'), Q(object_id=ctx.user.site.id))[0]
+ i3 = (i4 or i5)
i1 = (i2 or i3)
return i1
"""
@@ -71,6 +182,5 @@
# FIXME: Test this policy by executing it
self.assertTrue(policy_output_enforcer is not None)
-
if __name__ == '__main__':
unittest.main()
diff --git a/lib/xos-genx/tests/xos_validation_test.py b/lib/xos-genx/tests/xos_validation_test.py
index 20ba5da..7e70ede 100644
--- a/lib/xos-genx/tests/xos_validation_test.py
+++ b/lib/xos-genx/tests/xos_validation_test.py
@@ -76,35 +76,6 @@
with self.assertRaises(Exception):
policy_output_validator(obj, {})
- def test_equal(self):
- xproto = \
-"""
- policy test_policy < not (ctx.user = obj.user) >
-"""
-
- args = FakeArgs()
- args.inputs = xproto
- args.target = self.target
-
- output = XOSGenerator.generate(args)
-
- exec(output) # This loads the generated function, which should look like this:
-
- """
- def policy_output_validator(obj, ctx):
- i2 = (ctx.user == obj.user)
- i1 = (not i2)
- if (not i1):
- raise Exception('Necessary Failure')
- """
-
- obj = FakeArgs()
- obj.user = 1
- ctx = FakeArgs()
- ctx.user = 1
-
- with self.assertRaises(Exception):
- policy_output_validator(obj, ctx)
def test_equal(self):
xproto = \
@@ -139,7 +110,7 @@
def test_bin(self):
xproto = \
"""
- policy test_policy < (ctx.is_admin = True | obj.empty = True) & False>
+ policy test_policy < not (ctx.is_admin = True | obj.empty = True) | False>
"""
args = FakeArgs()
diff --git a/lib/xos-genx/xosgenx/jinja2_extensions/fol2.py b/lib/xos-genx/xosgenx/jinja2_extensions/fol2.py
index f7af1dc..0c8513a 100644
--- a/lib/xos-genx/xosgenx/jinja2_extensions/fol2.py
+++ b/lib/xos-genx/xosgenx/jinja2_extensions/fol2.py
@@ -4,15 +4,18 @@
import string
import jinja2
from plyxproto.parser import *
-import pdb
BINOPS = ['|', '&', '->']
QUANTS = ['exists', 'forall']
-
class PolicyException(Exception):
pass
+class ConstructNotHandled(Exception):
+ pass
+
+class TrivialPolicy(Exception):
+ pass
class AutoVariable:
def __init__(self, base):
@@ -27,11 +30,9 @@
self.idx += 1
return var
-
def gen_random_string():
return ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(5))
-
class FOL2Python:
def __init__(self, context_map=None):
# This will produce i0, i1, i2 etc.
@@ -114,25 +115,223 @@
""" Convert a single leaf node from a string
to an AST"""
-
def str_to_ast(self, s):
ast_module = ast.parse(s)
return ast_module.body[0]
- def hoist_constants(self, fol, var=None):
+ def reduce_operands(self, operands):
+ if operands[0] in ['True','False']:
+ return (operands[0],operands[1])
+ elif operands[1] in ['True','False']:
+ return (operands[1],operands[0])
+ else:
+ return None
+
+ """ Simplify binops with constants """
+ def simplify_binop(self, binop):
+ (k,v), = binop.items()
+ if k == '->':
+ lhs, rhs = v
+ if lhs == 'True':
+ return rhs
+ elif rhs == 'True':
+ return 'True'
+ elif lhs == 'False':
+ return 'True'
+ elif rhs == 'False':
+ return {'not': lhs}
+
+ var_expr = self.reduce_operands(v)
+
+ if not var_expr: return binop
+ else:
+ constant, var = var_expr
+ if k=='|':
+ if constant=='True':
+ return 'True'
+ elif constant=='False':
+ return var
+ else:
+ raise Exception("Internal error - variable read as constant")
+ elif k=='&':
+ if constant=='True':
+ return var
+ elif constant=='False':
+ return 'False'
+
+ def is_constant(self, var, fol):
+ try:
+ (k, v), = fol.items()
+ except AttributeError:
+ k = 'term'
+ v = fol
+
+ if k in ['python', 'policy']:
+ # Treat as a constant and hoist, since it cannot be quantified
+ return True
+ elif k == 'term':
+ return not v.startswith(var)
+ elif k == 'not':
+ return self.is_constant(var, fol)
+ elif k in ['in', '=']:
+ lhs, rhs = v
+ return self.is_constant(var,lhs) and self.is_constant(var, rhs)
+ elif k in BINOPS:
+ lhs, rhs = v
+ return self.is_constant(lhs, var) and self.is_constant(rhs, var)
+ elif k in QUANTS:
+ is_constant = self.is_constant(var, fol[1])
+ return is_constant
+ else:
+ raise ConstructNotHandled(k)
+
+ def find_constants(self, var, fol, constants):
try:
(k, v), = fol.items()
except AttributeError:
k = 'term'
v = fol
- if k in ['python', 'policy'] :
- # Tainted, don't optimize
- if var:
- return {'hoist': []}
+ if k in ['python', 'policy']:
+ # Treat as a constant and hoist, since it cannot be quantified
+ if fol not in constants:
+ constants.append(fol)
+ return constants
+ elif k == 'term':
+ if not v.startswith(var):
+ constants.append(v)
+ return constants
+ elif k == 'not':
+ return self.find_constants(var, v, constants)
+ elif k in ['in', '=']:
+ lhs, rhs = v
+ if isinstance(lhs, str) and isinstance(rhs, str):
+ if not lhs.startswith(var) and not rhs.startswith(var):
+ constants.append(fol)
+ return constants
else:
- return fol
+ constants = self.find_constants(var, lhs, constants)
+ return self.find_constants(var, rhs, constants)
+ elif k in BINOPS:
+ lhs, rhs = v
+ constants = self.find_constants(var, lhs, constants)
+ constants = self.find_constants(var, rhs, constants)
+ return constants
+ elif k in QUANTS:
+ is_constant = self.is_constant(var, v[1])
+ if is_constant: constants.append(fol)
+ return constants
+ else:
+ raise ConstructNotHandled(k)
+ """ Hoist constants out of quantifiers. Depth-first. """
+ def hoist_outer(self, fol):
+ try:
+ (k, v), = fol.items()
+ except AttributeError:
+ k = 'term'
+ v = fol
+
+ if k in ['python', 'policy']:
+ # Tainted, optimization and distribution not possible
+ return fol
+ elif k == 'term':
+ return fol
+ elif k == 'not':
+ vprime = self.hoist_outer(v)
+ return {'not': vprime}
+ elif k in ['in', '=']:
+ lhs, rhs = v
+ rlhs = self.hoist_outer(lhs)
+ rrhs = self.hoist_outer(rhs)
+ return {k:[rlhs,rrhs]}
+ elif k in BINOPS:
+ lhs, rhs = v
+ rlhs = self.hoist_outer(lhs)
+ rrhs = self.hoist_outer(rhs)
+
+ fol_prime = {k: [rlhs, rrhs]}
+ fol_simplified = self.simplify_binop(fol_prime)
+ return fol_simplified
+ elif k in QUANTS:
+ rexpr = self.hoist_outer(v[1])
+ return self.hoist_quant(k, [v[0],rexpr])
+ else:
+ raise ConstructNotHandled(k)
+
+ def replace_const(self, fol, c, value):
+ if fol == c:
+ return value
+
+ try:
+ (k, v), = fol.items()
+ except AttributeError:
+ k = 'term'
+ v = fol
+
+ if k == 'term':
+ if v == c: return value
+ else: return v
+ elif k == 'not':
+ new_expr = self.replace_const(v, c, value)
+ if new_expr=='True':
+ return 'False'
+ elif new_expr=='False':
+ return 'True'
+ else:
+ return {'not': new_expr}
+ elif k in ['in', '=']:
+ lhs, rhs = v
+ rlhs = self.replace_const(lhs, c, value)
+ rrhs = self.replace_const(rhs, c, value)
+
+ if rlhs==rrhs:
+ return 'True'
+ else:
+ return {k: [rlhs, rrhs]}
+ elif k in BINOPS:
+ lhs, rhs = v
+
+ rlhs = self.replace_const(lhs, c, value)
+ rrhs = self.replace_const(rhs, c, value)
+
+ return self.simplify_binop({k:[rlhs,rrhs]})
+ elif k in QUANTS:
+ var, expr = v
+ new_expr = self.replace_const(expr, c, value)
+ if new_expr in ['True', 'False']:
+ return new_expr
+ else:
+ return {k: [var, new_expr]}
+ else:
+ raise ConstructNotHandled(k)
+
+ def shannon_expand(self, c, fol):
+ lhs = self.replace_const(fol, c, 'True')
+ rhs = self.replace_const(fol, c, 'False')
+ not_c = {'not': c}
+ rlhs = {'&': [c, lhs]}
+ rlhs = self.simplify_binop(rlhs)
+
+ rrhs = {'&': [not_c, rhs]}
+ rrhs = self.simplify_binop(rrhs)
+
+ combined = {'|': [rlhs, rrhs]}
+ return self.simplify_binop(combined)
+
+ def hoist_quant(self, k, expr):
+ var, v = expr
+
+ constants = self.find_constants(var, v, constants=[])
+
+ fol = {k: expr}
+
+ for c in constants:
+ fol = self.shannon_expand(c, fol)
+
+ return fol
+
+ """
if var:
if k == 'term':
if not v.startswith(var):
@@ -197,6 +396,7 @@
return fol
else:
return fol
+ """
def gen_validation_function(self, fol, policy_name, message, tag):
if not tag:
@@ -250,7 +450,7 @@
policy_fn = fn_template % policy_name
call_str = """
%(verdict_var)s = %(policy_fn)s(obj.%(object_name)s, ctx)
- """ % {'verdict_var': self.verdict_variable, 'policy_fn': policy_fn, 'object_name': object_name}
+ """ % {'verdict_var': verdict_var, 'policy_fn': policy_fn, 'object_name': object_name}
call_ast = self.str_to_ast(call_str)
return call_ast
@@ -266,12 +466,12 @@
assignment_str = """
%(verdict_var)s = (%(escape_expr)s)
- """ % {'verdict_var': self.verdict_variable, 'escape_expr': v}
+ """ % {'verdict_var': verdict_var, 'escape_expr': v}
assignment_ast = self.str_to_ast(assignment_str)
return assignment_ast
elif k == 'not':
- top_vvar = self.verdict_variable
+ top_vvar = verdict_var
self.verdict_next()
sub_vvar = self.verdict_variable
block = self.gen_test(fn_template, v, sub_vvar)
@@ -431,8 +631,13 @@
raise Exception('Could not find policy:', policy)
f2p = FOL2Python()
- fol = f2p.hoist_constants(fol)
- a = f2p.gen_test_function(fol, policy, tag='enforcer')
+ fol_reduced = f2p.hoist_outer(fol)
+
+ if fol_reduced in ['True','False'] and fol != fol_reduced:
+ raise TrivialPolicy("Policy %(name)s trivially reduces to %(reduced)s. If this is what you want, replace its contents with %(reduced)s"%{'name':policy, 'reduced':fol_reduced})
+
+ a = f2p.gen_test_function(fol_reduced, policy, tag='enforcer')
+
return astunparse.unparse(a)
def xproto_fol_to_python_validator(policy, fol, model, message, tag=None):
@@ -440,19 +645,30 @@
raise Exception('Could not find policy:', policy)
f2p = FOL2Python()
- fol = f2p.hoist_constants(fol)
- a = f2p.gen_validation_function(fol, policy, message, tag='validator')
+ fol_reduced = f2p.hoist_outer(fol)
+
+ if fol_reduced in ['True','False'] and fol != fol_reduced:
+ raise TrivialPolicy("Policy %(name)s trivially reduces to %(reduced)s. If this is what you want, replace its contents with %(reduced)s"%{'name':policy, 'reduced':fol_reduced})
+
+ a = f2p.gen_validation_function(fol_reduced, policy, message, tag='validator')
return astunparse.unparse(a)
def main():
while True:
- inp = raw_input()
+ inp = ''
+ while True:
+ inp_line = raw_input()
+ if inp_line=='EOF': break
+ else: inp+=inp_line
+
fol_lexer = lex.lex(module=FOLLexer())
fol_parser = yacc.yacc(module=FOLParser(), start='goal')
val = fol_parser.parse(inp, lexer=fol_lexer)
a = xproto_fol_to_python_test('pol', val, 'output', 'Test')
+ #f2p = FOL2Python()
+ #a = f2p.hoist_outer(val)
print a