CORD-1392: Autogenerate security policies

Change-Id: Ia183f2a84b27923802e62435c82b20b50fb3fcee
diff --git a/lib/xos-genx/tests/general_security_test.py b/lib/xos-genx/tests/general_security_test.py
new file mode 100644
index 0000000..f2dd857
--- /dev/null
+++ b/lib/xos-genx/tests/general_security_test.py
@@ -0,0 +1,165 @@
+import unittest
+from xosgenx.generator import XOSGenerator
+from helpers import FakeArgs, XProtoTestHelpers
+import pdb
+
+"""The function below is for eliminating warnings arising due to the missing policy_output_0,
+which is generated and loaded dynamically.
+"""
+def policy_output_0(x, y):
+    raise Exception("Security enforcer not generated. Test failed.")
+    return False
+
+"""
+The tests below use the Python code target to generate 
+Python security policies, set up an appropriate environment and execute the Python.
+"""
+class XProtoSecurityTest(unittest.TestCase):
+    def setUp(self):
+        self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_test(proto.policies.test_policy, None, '0') }}")
+
+    def test_constant(self):
+        xproto = \
+"""
+    policy test_policy < True >
+"""
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+
+        output = XOSGenerator.generate(args)
+
+        exec(output) # This loads the generated function, which should look like this:
+
+        """
+        def policy_output_0(obj, ctx):
+            i1 = True
+            return i1
+        """
+
+        verdict = policy_output_0({}, {})
+        self.assertTrue(verdict)
+
+    def test_equal(self):
+        xproto = \
+"""
+    policy test_policy < ctx.user = obj.user >
+"""
+
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+
+        output = XOSGenerator.generate(args)
+
+        exec(output) # This loads the generated function, which should look like this:
+
+        """
+        def policy_output_0(obj, ctx):
+            i1 = (ctx.user == obj.user)
+            return i1
+        """
+
+        obj = FakeArgs()
+	obj.user = 1
+        ctx = FakeArgs()
+	ctx.user = 1
+
+        verdict = policy_output_0(obj, ctx)
+
+    def test_bin(self):
+        xproto = \
+"""
+    policy test_policy < ctx.is_admin = True | obj.empty = True>
+"""
+
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+
+        output = XOSGenerator.generate(args)
+        exec(output) # This loads the generated function, which should look like this:
+
+        """
+	def policy_output_0(obj, ctx):
+	    i2 = (ctx.is_admin == True)
+	    i3 = (obj.empty == True)
+	    i1 = (i2 or i3)
+	    return i1
+        """
+
+        obj = FakeArgs()
+	obj.empty = True
+
+	ctx = FakeArgs()
+	ctx.is_admin = True
+
+        verdict = policy_output_0(obj, ctx)
+
+        self.assertTrue(verdict)
+
+        
+    def test_exists(self):
+        xproto = \
+"""
+    policy test_policy < exists Privilege: Privilege.object_id = obj.id >
+"""
+	args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+
+        output = XOSGenerator.generate(args)
+        exec(output) # This loads the generated function, which should look like this:
+
+        """
+	def policy_output_0(obj, ctx):
+	    i1 = Privilege.objects.filter(object_id=obj.id)
+    	    return i1
+        """
+
+        self.assertTrue(policy_output_0 is not None)
+	
+    def test_python(self):
+        xproto = \
+"""
+    policy test_policy < {{ "jack" in ["the", "box"] }} = False >
+"""
+	args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+        output = XOSGenerator.generate(args)
+        exec(output) # This loads the generated function, which should look like this:
+
+        """
+        def policy_output_0(obj, ctx):
+            i2 = ('jack' in ['the', 'box'])
+            i1 = (i2 == False)
+            return i1
+        """
+
+        self.assertTrue(policy_output_0({}, {}) is True)
+
+    def test_forall(self):
+        # This one we only parse
+        xproto = \
+"""
+    policy test_policy < forall Credential: Credential.obj_id = obj_id >
+"""
+
+        target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_test(proto.policies.test_policy, None, '0') }}")
+
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = target
+
+        output = XOSGenerator.generate(args)
+        """
+        def policy_output_0(obj, ctx):
+            i2 = Credential.objects.filter((~ Q(obj_id=obj_id)))[0]
+            i1 = (not i2)
+            return i1
+        """
+        exec(output)
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/lib/xos-genx/tests/optimize_test.py b/lib/xos-genx/tests/optimize_test.py
new file mode 100644
index 0000000..ce15bf4
--- /dev/null
+++ b/lib/xos-genx/tests/optimize_test.py
@@ -0,0 +1,74 @@
+import unittest
+from xosgenx.jinja2_extensions.fol2 import FOL2Python
+import pdb
+
+class XProtoOptimizeTest(unittest.TestCase):
+    def setUp(self):
+        self.f2p = FOL2Python()
+
+    def test_constant(self):
+        input = 'True'
+        output = self.f2p.hoist_constants(input)
+        self.assertEqual(output, input)
+
+    def test_exists(self):
+        input = {'exists': ['x',{'|':['x','y']}]}
+
+        output = self.f2p.hoist_constants(input)
+        expected = {'|': ['y', {'exists': ['x', 'x']}]}
+        self.assertEqual(output, expected)
+        
+    def test_forall(self):
+        input = {'forall': ['x',{'|':['x','y']}]}
+
+        output = self.f2p.hoist_constants(input)
+        expected = {'|': ['y', {'forall': ['x', 'x']}]}
+        self.assertEqual(output, expected)
+
+    def test_exists_embedded(self):
+        input = {'|':['True',{'exists': ['x',{'|':['x','y']}]}]}
+
+        output = self.f2p.hoist_constants(input)
+        expected = {'|': ['True', {'|': ['y', {'exists': ['x', 'x']}]}]}
+        self.assertEqual(output, expected)
+    
+    def test_exists_equals(self):
+        input = {'|':['True',{'exists': ['x',{'|':['x',{'=':['y','z']}]}]}]}
+
+        output = self.f2p.hoist_constants(input)
+        expected = {'|': ['True', {'|': [{'=': ['y', 'z']}, {'exists': ['x', 'x']}]}]}
+        self.assertEqual(output, expected)
+
+    def test_exists_nested_constant(self):
+        input = {'|':['True',{'exists': ['x',{'|':['y',{'=':['y','x']}]}]}]}
+
+        output = self.f2p.hoist_constants(input)
+        expected = {'|': ['True', {'|': ['y', {'exists': ['x', {'=': ['y', 'x']}]}]}]}
+        self.assertEqual(output, expected)
+
+    def test_exists_nested(self):
+        input = {'exists': ['x',{'exists':['y',{'=':['y','x']}]}]}
+
+        output = self.f2p.hoist_constants(input)
+        expected = {'exists': ['x', {'exists': ['y', {'=': ['y', 'x']}]}]}
+        self.assertEqual(output, expected)
+
+    def test_exists_nested2(self):
+        input = {'exists': ['x',{'exists':['y',{'=':['z','y']}]}]}
+
+        output = self.f2p.hoist_constants(input)
+        expected = {'exists': ['y', {'=': ['z', 'y']}]}
+        self.assertEqual(output, expected)
+
+    def test_exists_nested3(self):
+        input = {'exists': ['x',{'exists':['y',{'=':['z','x']}]}]}
+
+        output = self.f2p.hoist_constants(input)
+        expected = {'exists': ['x', {'=': ['z', 'x']}]}
+        self.assertEqual(output, expected)
+
+
+if __name__ == '__main__':
+    unittest.main()
+
+
diff --git a/lib/xos-genx/tests/policy_test.py b/lib/xos-genx/tests/policy_test.py
index 5f1dca0..fcfed8a 100644
--- a/lib/xos-genx/tests/policy_test.py
+++ b/lib/xos-genx/tests/policy_test.py
@@ -1,7 +1,6 @@
 import unittest
 from xosgenx.generator import XOSGenerator
 from helpers import FakeArgs, XProtoTestHelpers
-import pdb
 
 """
 The tests below convert the policy logic expression
@@ -9,6 +8,25 @@
 """
 
 class XProtoPolicyTest(unittest.TestCase):
+    def test_annotation(self):
+        xproto = \
+"""
+    policy true_policy < True >
+
+    message always::true_policy {
+        required int still = 9;
+    }
+"""
+
+        target = XProtoTestHelpers.write_tmp_target("{{ proto.messages.0 }}")
+
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = target
+
+        output = XOSGenerator.generate(args)
+        self.assertIn("true_policy", output)
+
     def test_constant(self):
         xproto = \
 """
@@ -24,6 +42,66 @@
         output = XOSGenerator.generate(args).replace('t','T')
         self.assertTrue(eval(output)) 
 
+    def test_term(self):
+        xproto = \
+"""
+    policy slice_user < slice.user.is_admin >
+"""
+
+        target = XProtoTestHelpers.write_tmp_target("{{ proto.policies.slice_user }}")
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = target
+
+        output = XOSGenerator.generate(args)
+       
+        slice = FakeArgs()
+        slice.user = FakeArgs()
+        slice.user.is_admin = True
+
+        expr = eval(output)
+        self.assertTrue(expr)
+
+    def test_num_constant(self):
+        xproto = \
+"""
+    policy slice_user < slice.user.age = 57 >
+"""
+
+        target = XProtoTestHelpers.write_tmp_target("{{ proto.policies.slice_user }}")
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = target
+
+        output = XOSGenerator.generate(args)
+       
+        slice = FakeArgs()
+        slice.user = FakeArgs()
+        slice.user.is_admin = True
+
+        expr = eval(output)
+        self.assertTrue(expr)
+
+    def test_string_constant(self):
+        xproto = \
+"""
+    policy slice_user < slice.user.email = "admin@opencord.org" >
+"""
+
+        target = XProtoTestHelpers.write_tmp_target("{{ proto.policies.slice_user }}")
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = target
+
+        output = XOSGenerator.generate(args)
+       
+        slice = FakeArgs()
+        slice.user = FakeArgs()
+        slice.user.is_admin = True
+
+        expr = eval(output)
+        self.assertTrue(expr)
+
     def test_equal(self):
         xproto = \
 """
diff --git a/lib/xos-genx/tests/xos_security_test.py b/lib/xos-genx/tests/xos_security_test.py
new file mode 100644
index 0000000..30a7193
--- /dev/null
+++ b/lib/xos-genx/tests/xos_security_test.py
@@ -0,0 +1,77 @@
+import unittest
+from xosgenx.generator import XOSGenerator
+from helpers import FakeArgs, XProtoTestHelpers
+import pdb
+
+"""The function below is for eliminating warnings arising due to the missing policy_output_0,
+which is generated and loaded dynamically.
+"""
+def policy_output_0(x, y):
+    raise Exception("Security enforcer not generated. Test failed.")
+    return False
+
+"""
+The tests below use the Python code target to generate 
+Python security policies, set up an appropriate environment and execute the Python.
+"""
+class XProtoXOSSecurityTest(unittest.TestCase):
+    def setUp(self):
+        self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_test(proto.policies.test_policy, None, '0') }}")
+
+    """
+    This is the security policy for controllers
+    """
+    def test_controller_policy(self):
+        xproto = \
+"""
+    policy test_policy < ctx.user.is_admin | exists Privilege: Privilege.user_id = ctx.user.id & Privilege.object_type = "Deployment" >
+"""
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+
+        output = XOSGenerator.generate(args)
+
+        exec(output) # This loads the generated function, which should look like this:
+
+        """
+        def policy_output_0(obj, ctx):
+            i2 = ctx.user.is_admin
+            i3 = Privilege.objects.filter(Q(user_id=ctx.user.id), Q(object_type='Deployment'))[0]
+            i1 = (i2 or i3)
+            return i1
+        """
+
+        # FIXME: Test this policy by executing it
+        self.assertTrue(policy_output_0 is not None)
+
+    """
+    This is the security policy for controllers
+    """
+    def _test_controller_network_policy(self):
+        xproto = \
+"""
+    policy test_policy < ctx.user.is_admin | exists Slice: forall ctx.networks: ctx.networks.owner.id = Slice.id >
+"""
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+
+        output = XOSGenerator.generate(args)
+
+        exec(output) # This loads the generated function, which should look like this:
+
+        """
+        def policy_output_0(obj, ctx):
+            i2 = ctx.user.is_admin
+            i3 = Privilege.objects.filter(Q(user_id=ctx.user.id), Q(object_type='Deployment'))[0]
+            i1 = (i2 or i3)
+            return i1
+        """
+
+        # FIXME: Test this policy by executing it
+        self.assertTrue(policy_output_0 is not None)
+
+
+if __name__ == '__main__':
+    unittest.main()