CORD-1385: Autogenerate validation code

Change-Id: I8dda8f78482b382cd5d9c5397070266324d4fab9
diff --git a/lib/xos-genx/tests/general_security_test.py b/lib/xos-genx/tests/general_security_test.py
index f2dd857..d5f3e79 100644
--- a/lib/xos-genx/tests/general_security_test.py
+++ b/lib/xos-genx/tests/general_security_test.py
@@ -1,12 +1,11 @@
 import unittest
 from xosgenx.generator import XOSGenerator
 from helpers import FakeArgs, XProtoTestHelpers
-import pdb
 
-"""The function below is for eliminating warnings arising due to the missing policy_output_0,
+"""The function below is for eliminating warnings arising due to the missing policy_output_enforcer,
 which is generated and loaded dynamically.
 """
-def policy_output_0(x, y):
+def policy_output_enforcer(x, y):
     raise Exception("Security enforcer not generated. Test failed.")
     return False
 
@@ -16,7 +15,7 @@
 """
 class XProtoSecurityTest(unittest.TestCase):
     def setUp(self):
-        self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_test(proto.policies.test_policy, None, '0') }}")
+        self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_test('output', proto.policies.test_policy, None, '0') }}")
 
     def test_constant(self):
         xproto = \
@@ -32,12 +31,12 @@
         exec(output) # This loads the generated function, which should look like this:
 
         """
-        def policy_output_0(obj, ctx):
+        def policy_output_enforcer(obj, ctx):
             i1 = True
             return i1
         """
 
-        verdict = policy_output_0({}, {})
+        verdict = policy_output_enforcer({}, {})
         self.assertTrue(verdict)
 
     def test_equal(self):
@@ -55,7 +54,7 @@
         exec(output) # This loads the generated function, which should look like this:
 
         """
-        def policy_output_0(obj, ctx):
+        def policy_output_enforcer(obj, ctx):
             i1 = (ctx.user == obj.user)
             return i1
         """
@@ -65,7 +64,7 @@
         ctx = FakeArgs()
 	ctx.user = 1
 
-        verdict = policy_output_0(obj, ctx)
+        verdict = policy_output_enforcer(obj, ctx)
 
     def test_bin(self):
         xproto = \
@@ -81,7 +80,7 @@
         exec(output) # This loads the generated function, which should look like this:
 
         """
-	def policy_output_0(obj, ctx):
+	def policy_output_enforcer(obj, ctx):
 	    i2 = (ctx.is_admin == True)
 	    i3 = (obj.empty == True)
 	    i1 = (i2 or i3)
@@ -94,7 +93,7 @@
 	ctx = FakeArgs()
 	ctx.is_admin = True
 
-        verdict = policy_output_0(obj, ctx)
+        verdict = policy_output_enforcer(obj, ctx)
 
         self.assertTrue(verdict)
 
@@ -112,12 +111,12 @@
         exec(output) # This loads the generated function, which should look like this:
 
         """
-	def policy_output_0(obj, ctx):
+	def policy_output_enforcer(obj, ctx):
 	    i1 = Privilege.objects.filter(object_id=obj.id)
     	    return i1
         """
 
-        self.assertTrue(policy_output_0 is not None)
+        self.assertTrue(policy_output_enforcer is not None)
 	
     def test_python(self):
         xproto = \
@@ -131,13 +130,13 @@
         exec(output) # This loads the generated function, which should look like this:
 
         """
-        def policy_output_0(obj, ctx):
+        def policy_output_enforcer(obj, ctx):
             i2 = ('jack' in ['the', 'box'])
             i1 = (i2 == False)
             return i1
         """
 
-        self.assertTrue(policy_output_0({}, {}) is True)
+        self.assertTrue(policy_output_enforcer({}, {}) is True)
 
     def test_forall(self):
         # This one we only parse
@@ -146,15 +145,13 @@
     policy test_policy < forall Credential: Credential.obj_id = obj_id >
 """
 
-        target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_test(proto.policies.test_policy, None, '0') }}")
-
         args = FakeArgs()
         args.inputs = xproto
-        args.target = target
+        args.target = self.target
 
         output = XOSGenerator.generate(args)
         """
-        def policy_output_0(obj, ctx):
+        def policy_output_enforcer(obj, ctx):
             i2 = Credential.objects.filter((~ Q(obj_id=obj_id)))[0]
             i1 = (not i2)
             return i1
diff --git a/lib/xos-genx/tests/general_validation_test.py b/lib/xos-genx/tests/general_validation_test.py
new file mode 100644
index 0000000..4134fe1
--- /dev/null
+++ b/lib/xos-genx/tests/general_validation_test.py
@@ -0,0 +1,202 @@
+import unittest
+from xosgenx.generator import XOSGenerator
+from helpers import FakeArgs, XProtoTestHelpers
+import pdb
+
+"""The function below is for eliminating warnings arising due to the missing policy_output_validator,
+which is generated and loaded dynamically.
+"""
+def policy_output_validator(x, y):
+    # Placeholder only: exec() in each test is expected to shadow this name.
+    raise Exception("Validator not generated. Test failed.")
+
+"""
+The tests below use the Python code target to generate 
+Python validation policies, set up an appropriate environment and execute the Python.
+"""
+class XProtoGeneralValidationTest(unittest.TestCase):
+    def setUp(self):
+        self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_validator('output', proto.policies.test_policy, None, 'Necessary Failure') }}")
+
+    def test_constant(self):
+        xproto = \
+"""
+    policy test_policy < False >
+"""
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+
+        output = XOSGenerator.generate(args)
+
+        exec(output) # This loads the generated function, which should look like this:
+
+        """
+        def policy_output_validator(obj, ctx):
+            i1 = False
+            if (not i1):
+                raise Exception('Necessary Failure')
+        """
+
+        with self.assertRaises(Exception):
+           policy_output_validator({}, {})
+    
+    def test_equal(self):
+        # Identical users violate "not (ctx.user = obj.user)": expect a raise.
+        xproto = \
+"""
+    policy test_policy < not (ctx.user = obj.user) >
+"""
+
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+
+        output = XOSGenerator.generate(args)
+
+        exec(output) # This loads the generated function, which should look like this:
+
+        """
+        def policy_output_validator(obj, ctx):
+            i2 = (ctx.user == obj.user)
+            i1 = (not i2)
+            if (not i1):
+                raise Exception('Necessary Failure')
+        """
+
+        obj = FakeArgs()
+        obj.user = 1
+        ctx = FakeArgs()
+        ctx.user = 1
+        with self.assertRaises(Exception):
+            policy_output_validator(obj, ctx)
+
+    def test_not_equal(self):
+        # Complement of test_equal: differing users satisfy the policy.
+        xproto = \
+"""
+    policy test_policy < not (ctx.user = obj.user) >
+"""
+
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+
+        output = XOSGenerator.generate(args)
+
+        exec(output) # This loads the generated function, which should look like this:
+
+        """
+        def policy_output_validator(obj, ctx):
+            i2 = (ctx.user == obj.user)
+            i1 = (not i2)
+            if (not i1):
+                raise Exception('Necessary Failure')
+        """
+
+        obj = FakeArgs()
+        obj.user = 1
+        ctx = FakeArgs()
+        ctx.user = 2
+        # Must not raise: the policy holds, and the module-level stub would raise.
+        policy_output_validator(obj, ctx)
+
+    def test_bin(self):
+        # "& False" makes the policy unsatisfiable, so validation must fail.
+        xproto = \
+"""
+    policy test_policy < (ctx.is_admin = True | obj.empty = True) & False>
+"""
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+
+        output = XOSGenerator.generate(args)
+        exec(output) # This loads the generated function, which should look roughly like this:
+
+        """
+        def policy_output_validator(obj, ctx):
+            i4 = (ctx.is_admin == True)
+            i5 = (obj.empty == True)
+            i2 = (i4 or i5)
+            i3 = False
+            i1 = (i2 and i3)
+            if (not i1):
+                raise Exception('Necessary Failure')
+        """
+
+        obj = FakeArgs()
+        obj.empty = True
+        ctx = FakeArgs()
+        ctx.is_admin = True
+        with self.assertRaises(Exception):
+            policy_output_validator(obj, ctx)
+
+        
+    def test_exists(self):
+        # Generation-only check: Privilege and Q are not defined in this module.
+        xproto = \
+"""
+    policy test_policy < exists Privilege: Privilege.object_id = obj.id >
+"""
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+
+        output = XOSGenerator.generate(args)
+        exec(output) # This loads the generated function, which should look like this:
+        """
+        def policy_output_validator(obj, ctx):
+            i1 = Privilege.objects.filter(Q(object_id=obj.id))[0]
+            if (not i1):
+                raise Exception('Necessary Failure')
+        """
+
+        self.assertIn('policy_output_validator', output)
+	
+    def test_python(self):
+        # 'jack' is not in the list, so "... = True" is False and must raise.
+        xproto = \
+"""
+    policy test_policy < {{ "jack" in ["the", "box"] }} = True >
+"""
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+        output = XOSGenerator.generate(args)
+        exec(output) # This loads the generated function, which should look like this:
+
+        """
+        def policy_output_validator(obj, ctx):
+            i2 = ('jack' in ['the', 'box'])
+            i1 = (i2 == True)
+            if (not i1):
+                raise Exception('Necessary Failure')
+        """
+        with self.assertRaises(Exception):
+            policy_output_validator({}, {})
+
+    def test_forall(self):
+        # This one we only parse
+        xproto = \
+"""
+    policy test_policy < forall Credential: Credential.obj_id = obj_id >
+"""
+
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+
+        output = XOSGenerator.generate(args)
+        # Expected shape, by analogy with the other validator tests:
+        """
+        def policy_output_validator(obj, ctx):
+            i2 = Credential.objects.filter((~ Q(obj_id=obj_id)))[0]
+            i1 = (not i2)
+            if (not i1):
+                raise Exception('Necessary Failure')
+        """
+        self.assertIn('policy_output_validator', output)
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/lib/xos-genx/tests/policy_test.py b/lib/xos-genx/tests/policy_test.py
index fcfed8a..25c307e 100644
--- a/lib/xos-genx/tests/policy_test.py
+++ b/lib/xos-genx/tests/policy_test.py
@@ -1,6 +1,7 @@
 import unittest
 from xosgenx.generator import XOSGenerator
 from helpers import FakeArgs, XProtoTestHelpers
+import pdb
 
 """
 The tests below convert the policy logic expression
@@ -42,6 +43,26 @@
         output = XOSGenerator.generate(args).replace('t','T')
         self.assertTrue(eval(output)) 
 
+    def test_function_term(self):
+        xproto = \
+"""
+    policy slice_user < slice.user.compute_is_admin() >
+"""
+
+        target = XProtoTestHelpers.write_tmp_target("{{ proto.policies.slice_user }}")
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = target
+
+        output = XOSGenerator.generate(args)
+       
+        slice = FakeArgs()
+        slice.user = FakeArgs()
+        slice.user.compute_is_admin = lambda: True
+
+        expr = eval(output)
+        self.assertTrue(expr)
+
     def test_term(self):
         xproto = \
 """
@@ -147,7 +168,29 @@
 
         self.assertFalse(eval(expr))
 
-        
+    def test_implies(self):
+        xproto = \
+"""
+    policy implies < obj.name -> obj.creator >
+"""
+        target = XProtoTestHelpers.write_tmp_target("{{ proto.policies.implies }}")
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = target
+
+        output = XOSGenerator.generate(args)
+
+        # A named object without a creator falsifies "name -> creator".
+        obj = FakeArgs()
+        obj.name = 'Thing 1'
+        obj.creator = None
+
+        # Rewrite "a -> b" as "not a or b" so Python can evaluate it against obj.
+        (op, operands), = eval(output).items()
+        expr = 'not ' + op.join(operands).replace('->',' or ')
+
+        self.assertFalse(eval(expr))
+   
     def test_exists(self):
         xproto = \
 """
diff --git a/lib/xos-genx/tests/test_cli.py b/lib/xos-genx/tests/test_cli.py
index af21253..6e76073 100644
--- a/lib/xos-genx/tests/test_cli.py
+++ b/lib/xos-genx/tests/test_cli.py
@@ -36,4 +36,4 @@
             self.assertEqual(actual_args.output, expected_args.output)
 
 if __name__ == '__main__':
-    unittest.main()
\ No newline at end of file
+    unittest.main()
diff --git a/lib/xos-genx/tests/xos_security_test.py b/lib/xos-genx/tests/xos_security_test.py
index 30a7193..8cdd746 100644
--- a/lib/xos-genx/tests/xos_security_test.py
+++ b/lib/xos-genx/tests/xos_security_test.py
@@ -1,12 +1,11 @@
 import unittest
 from xosgenx.generator import XOSGenerator
 from helpers import FakeArgs, XProtoTestHelpers
-import pdb
 
-"""The function below is for eliminating warnings arising due to the missing policy_output_0,
+"""The function below is for eliminating warnings arising due to the missing policy_output_enforcer,
 which is generated and loaded dynamically.
 """
-def policy_output_0(x, y):
+def policy_output_enforcer(x, y):
     raise Exception("Security enforcer not generated. Test failed.")
     return False
 
@@ -16,7 +15,7 @@
 """
 class XProtoXOSSecurityTest(unittest.TestCase):
     def setUp(self):
-        self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_test(proto.policies.test_policy, None, '0') }}")
+        self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_test('output',proto.policies.test_policy, None, '0') }}")
 
     """
     This is the security policy for controllers
@@ -35,7 +34,7 @@
         exec(output) # This loads the generated function, which should look like this:
 
         """
-        def policy_output_0(obj, ctx):
+        def policy_output_enforcer(obj, ctx):
             i2 = ctx.user.is_admin
             i3 = Privilege.objects.filter(Q(user_id=ctx.user.id), Q(object_type='Deployment'))[0]
             i1 = (i2 or i3)
@@ -43,7 +42,7 @@
         """
 
         # FIXME: Test this policy by executing it
-        self.assertTrue(policy_output_0 is not None)
+        self.assertTrue(policy_output_enforcer is not None)
 
     """
     This is the security policy for controllers
@@ -62,7 +61,7 @@
         exec(output) # This loads the generated function, which should look like this:
 
         """
-        def policy_output_0(obj, ctx):
+        def policy_output_enforcer(obj, ctx):
             i2 = ctx.user.is_admin
             i3 = Privilege.objects.filter(Q(user_id=ctx.user.id), Q(object_type='Deployment'))[0]
             i1 = (i2 or i3)
@@ -70,7 +69,7 @@
         """
 
         # FIXME: Test this policy by executing it
-        self.assertTrue(policy_output_0 is not None)
+        self.assertTrue(policy_output_enforcer is not None)
 
 
 if __name__ == '__main__':
diff --git a/lib/xos-genx/tests/xos_validation_test.py b/lib/xos-genx/tests/xos_validation_test.py
new file mode 100644
index 0000000..20ba5da
--- /dev/null
+++ b/lib/xos-genx/tests/xos_validation_test.py
@@ -0,0 +1,237 @@
+import unittest
+from xosgenx.generator import XOSGenerator
+from helpers import FakeArgs, XProtoTestHelpers
+
+"""The function below is for eliminating warnings arising due to the missing policy_output_validator,
+which is generated and loaded dynamically.
+"""
+def policy_output_validator(x, y):
+    # Placeholder only: exec() in each test is expected to shadow this name.
+    raise Exception("Validator not generated. Test failed.")
+
+"""
+The tests below use the Python code target to generate 
+Python validation policies, set up an appropriate environment and execute the Python.
+"""
+class XProtoXOSModelValidationTest(unittest.TestCase):
+    def setUp(self):
+        self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_validator('output', proto.policies.test_policy, None, 'Necessary Failure') }}")
+
+    def test_instance_container(self):
+        # A container instance whose image is not a container must be rejected.
+        xproto = \
+"""
+    policy test_policy < (obj.isolation = "container" | obj.isolation = "container_vm" ) -> (obj.image.kind = "container") >
+"""
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+        output = XOSGenerator.generate(args)
+
+        obj = FakeArgs()
+        obj.isolation = 'container'
+        obj.image = FakeArgs()  # the policy reads obj.image.kind, not obj.kind
+        obj.image.kind = 'not a container'
+
+        exec(output) # This loads the generated function, which should look roughly like this:
+        """
+        def policy_output_validator(obj, ctx):
+            i4 = (obj.isolation == 'container')
+            i5 = (obj.isolation == 'container_vm')
+            i2 = (i4 or i5)
+            i3 = (obj.image.kind == 'container')
+            i1 = ((not i2) or i3)
+            if (not i1):
+                raise Exception('Necessary Failure')
+        """
+        with self.assertRaises(Exception):
+            policy_output_validator(obj, {})
+    
+    def test_slice_name_validation(self):
+        xproto = \
+"""
+    policy test_policy < not obj.id -> {{ obj.name.startswith(obj.site.login_base) }} >
+"""
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+        output = XOSGenerator.generate(args)
+        # Populate every attribute the policy reads so that the failure comes
+        # from the policy itself rather than from a missing attribute.
+        obj = FakeArgs()
+        obj.id = None
+        obj.name = 'slice'
+        obj.site = FakeArgs()
+        obj.site.login_base = 'mysite'
+        exec(output) # This loads the generated function, which should look like this:
+        """
+        def policy_output_validator(obj, ctx):
+            i3 = obj.id
+            i4 = obj.name.startswith(obj.site.login_base)
+            i2 = ((not i3) or i4)
+            i1 = (not i2)
+            if (not i1):
+                raise ValidationError('Necessary Failure')
+        """
+        with self.assertRaises(Exception):
+            policy_output_validator(obj, {})
+
+    def test_equal(self):
+        # Identical users violate "not (ctx.user = obj.user)": expect a raise.
+        xproto = \
+"""
+    policy test_policy < not (ctx.user = obj.user) >
+"""
+
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+
+        output = XOSGenerator.generate(args)
+
+        exec(output) # This loads the generated function, which should look like this:
+
+        """
+        def policy_output_validator(obj, ctx):
+            i2 = (ctx.user == obj.user)
+            i1 = (not i2)
+            if (not i1):
+                raise Exception('Necessary Failure')
+        """
+
+        obj = FakeArgs()
+        obj.user = 1
+        ctx = FakeArgs()
+        ctx.user = 1
+        with self.assertRaises(Exception):
+            policy_output_validator(obj, ctx)
+
+    def test_not_equal(self):
+        # Complement of test_equal: differing users satisfy the policy.
+        xproto = \
+"""
+    policy test_policy < not (ctx.user = obj.user) >
+"""
+
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+
+        output = XOSGenerator.generate(args)
+
+        exec(output) # This loads the generated function, which should look like this:
+
+        """
+        def policy_output_validator(obj, ctx):
+            i2 = (ctx.user == obj.user)
+            i1 = (not i2)
+            if (not i1):
+                raise Exception('Necessary Failure')
+        """
+
+        obj = FakeArgs()
+        obj.user = 1
+        ctx = FakeArgs()
+        ctx.user = 2
+        # Must not raise: the policy holds, and the module-level stub would raise.
+        policy_output_validator(obj, ctx)
+
+    def test_bin(self):
+        # "& False" makes the policy unsatisfiable, so validation must fail.
+        xproto = \
+"""
+    policy test_policy < (ctx.is_admin = True | obj.empty = True) & False>
+"""
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+
+        output = XOSGenerator.generate(args)
+        exec(output) # This loads the generated function, which should look roughly like this:
+
+        """
+        def policy_output_validator(obj, ctx):
+            i4 = (ctx.is_admin == True)
+            i5 = (obj.empty == True)
+            i2 = (i4 or i5)
+            i3 = False
+            i1 = (i2 and i3)
+            if (not i1):
+                raise Exception('Necessary Failure')
+        """
+
+        obj = FakeArgs()
+        obj.empty = True
+        ctx = FakeArgs()
+        ctx.is_admin = True
+        with self.assertRaises(Exception):
+            policy_output_validator(obj, ctx)
+
+        
+    def test_exists(self):
+        # Generation-only check: Privilege and Q are not defined in this module.
+        xproto = \
+"""
+    policy test_policy < exists Privilege: Privilege.object_id = obj.id >
+"""
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+
+        output = XOSGenerator.generate(args)
+        exec(output) # This loads the generated function, which should look like this:
+        """
+        def policy_output_validator(obj, ctx):
+            i1 = Privilege.objects.filter(Q(object_id=obj.id))[0]
+            if (not i1):
+                raise Exception('Necessary Failure')
+        """
+
+        self.assertIn('policy_output_validator', output)
+	
+    def test_python(self):
+        # 'jack' is not in the list, so "... = True" is False and must raise.
+        xproto = \
+"""
+    policy test_policy < {{ "jack" in ["the", "box"] }} = True >
+"""
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+        output = XOSGenerator.generate(args)
+        exec(output) # This loads the generated function, which should look like this:
+
+        """
+        def policy_output_validator(obj, ctx):
+            i2 = ('jack' in ['the', 'box'])
+            i1 = (i2 == True)
+            if (not i1):
+                raise Exception('Necessary Failure')
+        """
+        with self.assertRaises(Exception):
+            policy_output_validator({}, {})
+
+    def test_forall(self):
+        # This one we only parse
+        xproto = \
+"""
+    policy test_policy < forall Credential: Credential.obj_id = obj_id >
+"""
+
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+
+        output = XOSGenerator.generate(args)
+        # Expected shape, by analogy with the other validator tests:
+        """
+        def policy_output_validator(obj, ctx):
+            i2 = Credential.objects.filter((~ Q(obj_id=obj_id)))[0]
+            i1 = (not i2)
+            if (not i1):
+                raise Exception('Necessary Failure')
+        """
+        self.assertIn('policy_output_validator', output)
+
+if __name__ == '__main__':
+    unittest.main()