CORD-1568: Support sharing policies by letting one policy invoke another
Change-Id: I87fdb80a65d86b61364abd3ec35961668924c8f5
diff --git a/lib/xos-genx/tests/general_security_test.py b/lib/xos-genx/tests/general_security_test.py
index d5f3e79..1201baa 100644
--- a/lib/xos-genx/tests/general_security_test.py
+++ b/lib/xos-genx/tests/general_security_test.py
@@ -15,12 +15,16 @@
"""
class XProtoSecurityTest(unittest.TestCase):
def setUp(self):
- self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_test('output', proto.policies.test_policy, None, '0') }}")
+ self.target = XProtoTestHelpers.write_tmp_target("""
+{% for name, policy in proto.policies.items() %}
+{{ xproto_fol_to_python_test(name, policy, None, '0') }}
+{% endfor %}
+""")
def test_constant(self):
xproto = \
"""
- policy test_policy < True >
+ policy output < True >
"""
args = FakeArgs()
args.inputs = xproto
@@ -42,7 +46,7 @@
def test_equal(self):
xproto = \
"""
- policy test_policy < ctx.user = obj.user >
+ policy output < ctx.user = obj.user >
"""
args = FakeArgs()
@@ -66,10 +70,46 @@
verdict = policy_output_enforcer(obj, ctx)
+ def test_call_policy(self):
+ xproto = \
+"""
+ policy sub_policy < ctx.user = obj.user >
+ policy output < *sub_policy(child) >
+"""
+
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+
+ exec(output,globals()) # This loads the generated function, which should look like this:
+
+ """
+ def policy_sub_policy_enforcer(obj, ctx):
+ i1 = (ctx.user == obj.user)
+ return i1
+
+ def policy_output_enforcer(obj, ctx):
+ i1 = policy_sub_policy_enforcer(obj.child, ctx)
+ return i1
+ """
+
+ obj = FakeArgs()
+ obj.child = FakeArgs()
+ obj.child.user = 1
+
+ ctx = FakeArgs()
+ ctx.user = 1
+
+ verdict = policy_output_enforcer(obj, ctx)
+ self.assertTrue(verdict)
+
+
def test_bin(self):
xproto = \
"""
- policy test_policy < ctx.is_admin = True | obj.empty = True>
+ policy output < ctx.is_admin = True | obj.empty = True>
"""
args = FakeArgs()
@@ -101,7 +141,7 @@
def test_exists(self):
xproto = \
"""
- policy test_policy < exists Privilege: Privilege.object_id = obj.id >
+ policy output < exists Privilege: Privilege.object_id = obj.id >
"""
args = FakeArgs()
args.inputs = xproto
@@ -121,7 +161,7 @@
def test_python(self):
xproto = \
"""
- policy test_policy < {{ "jack" in ["the", "box"] }} = False >
+ policy output < {{ "jack" in ["the", "box"] }} = False >
"""
args = FakeArgs()
args.inputs = xproto
@@ -142,7 +182,7 @@
# This one we only parse
xproto = \
"""
- policy test_policy < forall Credential: Credential.obj_id = obj_id >
+ policy output < forall Credential: Credential.obj_id = obj_id >
"""
args = FakeArgs()
diff --git a/lib/xos-genx/tests/general_validation_test.py b/lib/xos-genx/tests/general_validation_test.py
index 4134fe1..c1e6820 100644
--- a/lib/xos-genx/tests/general_validation_test.py
+++ b/lib/xos-genx/tests/general_validation_test.py
@@ -16,12 +16,16 @@
"""
class XProtoGeneralValidationTest(unittest.TestCase):
def setUp(self):
- self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_validator('output', proto.policies.test_policy, None, 'Necessary Failure') }}")
+ self.target = XProtoTestHelpers.write_tmp_target("""
+{% for name, policy in proto.policies.items() %}
+{{ xproto_fol_to_python_validator(name, policy, None, 'Necessary Failure') }}
+{% endfor %}
+""")
def test_constant(self):
xproto = \
"""
- policy test_policy < False >
+ policy output < False >
"""
args = FakeArgs()
args.inputs = xproto
@@ -44,7 +48,7 @@
def test_equal(self):
xproto = \
"""
- policy test_policy < not (ctx.user = obj.user) >
+ policy output < not (ctx.user = obj.user) >
"""
args = FakeArgs()
@@ -74,7 +78,7 @@
def test_equal(self):
xproto = \
"""
- policy test_policy < not (ctx.user = obj.user) >
+ policy output < not (ctx.user = obj.user) >
"""
args = FakeArgs()
@@ -104,7 +108,7 @@
def test_bin(self):
xproto = \
"""
- policy test_policy < (ctx.is_admin = True | obj.empty = True) & False>
+ policy output < (ctx.is_admin = True | obj.empty = True) & False>
"""
args = FakeArgs()
@@ -136,7 +140,7 @@
def test_exists(self):
xproto = \
"""
- policy test_policy < exists Privilege: Privilege.object_id = obj.id >
+ policy output < exists Privilege: Privilege.object_id = obj.id >
"""
args = FakeArgs()
args.inputs = xproto
@@ -157,7 +161,7 @@
def test_python(self):
xproto = \
"""
- policy test_policy < {{ "jack" in ["the", "box"] }} = True >
+ policy output < {{ "jack" in ["the", "box"] }} = True >
"""
args = FakeArgs()
args.inputs = xproto
@@ -176,11 +180,48 @@
with self.assertRaises(Exception):
self.assertTrue(policy_output_validator({}, {}) is True)
+ def test_call_policy(self):
+ xproto = \
+"""
+ policy sub_policy < ctx.user = obj.user >
+ policy output < *sub_policy(child) >
+"""
+
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = self.target
+
+ output = XOSGenerator.generate(args)
+
+ exec(output,globals()) # This loads the generated function, which should look like this:
+
+ """
+ def policy_sub_policy_validator(obj, ctx):
+ i1 = (ctx.user == obj.user)
+ if (not i1):
+ raise ValidationError('Necessary Failure')
+
+ def policy_output_validator(obj, ctx):
+ i1 = policy_sub_policy_validator(obj.child, ctx)
+ if (not i1):
+ raise ValidationError('Necessary Failure')
+ """
+
+ obj = FakeArgs()
+ obj.child = FakeArgs()
+ obj.child.user = 1
+
+ ctx = FakeArgs()
+ ctx.user = 1
+
+ with self.assertRaises(Exception):
+ verdict = policy_output_enforcer(obj, ctx)
+
def test_forall(self):
# This one we only parse
xproto = \
"""
- policy test_policy < forall Credential: Credential.obj_id = obj_id >
+ policy output < forall Credential: Credential.obj_id = obj_id >
"""
args = FakeArgs()
diff --git a/lib/xos-genx/tests/policy_test.py b/lib/xos-genx/tests/policy_test.py
index 25c307e..5bc3511 100644
--- a/lib/xos-genx/tests/policy_test.py
+++ b/lib/xos-genx/tests/policy_test.py
@@ -215,6 +215,41 @@
self.assertTrue(eval(expr))
+ def test_policy_function(self):
+ xproto = \
+"""
+ policy slice_policy < exists Privilege: Privilege.object_id = obj.id >
+ policy network_slice_policy < *slice_policy(slice) >
+"""
+
+ target = XProtoTestHelpers.write_tmp_target("{{ proto.policies.network_slice_policy }} ")
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = target
+
+ output = XOSGenerator.generate(args)
+
+ (op, operands), = eval(output).items()
+
+ self.assertIn('slice_policy', operands)
+ self.assertIn('slice', operands)
+
+ def test_policy_missing_function(self):
+ xproto = \
+"""
+ policy slice_policy < exists Privilege: Privilege.object_id = obj.id >
+ policy network_slice_policy < *slice_policyX(slice) >
+"""
+
+ target = XProtoTestHelpers.write_tmp_target("{{ proto.policies.network_slice_policy }} ")
+ args = FakeArgs()
+ args.inputs = xproto
+ args.target = target
+
+ with self.assertRaises(Exception):
+ output = XOSGenerator.generate(args)
+
+
def test_forall(self):
# This one we only parse
xproto = \