CORD-1570: Several bug fixes, expanded unit tests for security refactoring

Change-Id: Ied8dca916d3c22a252f6de38a65ef1b20c9d639d
diff --git a/lib/xos-genx/tests/general_validation_test.py b/lib/xos-genx/tests/general_validation_test.py
index c1e6820..15cdb56 100644
--- a/lib/xos-genx/tests/general_validation_test.py
+++ b/lib/xos-genx/tests/general_validation_test.py
@@ -108,7 +108,7 @@
     def test_bin(self):
         xproto = \
 """
-    policy output < (ctx.is_admin = True | obj.empty = True) & False>
+    policy output < (ctx.is_admin = True | obj.empty = True) | False>
 """
 
         args = FakeArgs()
@@ -128,10 +128,10 @@
         """
 
         obj = FakeArgs()
-	obj.empty = True
+	obj.empty = False
 
 	ctx = FakeArgs()
-	ctx.is_admin = True
+	ctx.is_admin = False
 
         with self.assertRaises(Exception):
             verdict = policy_output_validator(obj, ctx)
diff --git a/lib/xos-genx/tests/optimize_test.py b/lib/xos-genx/tests/optimize_test.py
index ce15bf4..af40575 100644
--- a/lib/xos-genx/tests/optimize_test.py
+++ b/lib/xos-genx/tests/optimize_test.py
@@ -1,70 +1,78 @@
 import unittest
 from xosgenx.jinja2_extensions.fol2 import FOL2Python
-import pdb
+import vimpdb
 
 class XProtoOptimizeTest(unittest.TestCase):
     def setUp(self):
         self.f2p = FOL2Python()
+        self.maxDiff=None
 
     def test_constant(self):
         input = 'True'
-        output = self.f2p.hoist_constants(input)
+        output = self.f2p.hoist_outer(input)
         self.assertEqual(output, input)
 
     def test_exists(self):
-        input = {'exists': ['x',{'|':['x','y']}]}
+        input = {'exists': ['X',{'|':['X.foo','y']}]}
 
-        output = self.f2p.hoist_constants(input)
-        expected = {'|': ['y', {'exists': ['x', 'x']}]}
+        output = self.f2p.hoist_outer(input)
+        expected = {'|': ['y', {'&': [{'not': 'y'}, {'exists': ['X', 'X.foo']}]}]}
         self.assertEqual(output, expected)
         
-    def test_forall(self):
-        input = {'forall': ['x',{'|':['x','y']}]}
+    def test_exists_implies(self):
+        input = {'exists': ['Foo', {'&': [{'=': ('Foo.a', '1')}, {'->': ['write_access', {'=': ('Foo.b', '1')}]}]}]}
 
-        output = self.f2p.hoist_constants(input)
-        expected = {'|': ['y', {'forall': ['x', 'x']}]}
+        output = self.f2p.hoist_outer(input)
+        expected = {'|': [{'&': ['write_access', {'exists': ['Foo', {'&': [{'=': ['Foo.a', '1']}, {'=': ['Foo.b', '1']}]}]}]}, {'&': [{'not': 'write_access'}, {'exists': ['Foo', {'=': ['Foo.a', '1']}]}]}]}
+        self.assertEqual(output, expected)
+
+    def test_forall(self):
+        input = {'forall': ['X',{'|':['X.foo','y']}]}
+
+        output = self.f2p.hoist_outer(input)
+        expected = {'|': ['y', {'&': [{'not': 'y'}, {'forall': ['X', 'X.foo']}]}]}
         self.assertEqual(output, expected)
 
     def test_exists_embedded(self):
-        input = {'|':['True',{'exists': ['x',{'|':['x','y']}]}]}
+        input = {'&':['True',{'exists': ['X',{'|':['X.foo','y']}]}]}
 
-        output = self.f2p.hoist_constants(input)
-        expected = {'|': ['True', {'|': ['y', {'exists': ['x', 'x']}]}]}
+        output = self.f2p.hoist_outer(input)
+        expected = {'|': ['y', {'&': [{'not': 'y'}, {'exists': ['X', 'X.foo']}]}]}
         self.assertEqual(output, expected)
     
     def test_exists_equals(self):
-        input = {'|':['True',{'exists': ['x',{'|':['x',{'=':['y','z']}]}]}]}
+        input = {'&':['True',{'exists': ['X',{'|':['X.foo',{'=':['y','z']}]}]}]}
 
-        output = self.f2p.hoist_constants(input)
-        expected = {'|': ['True', {'|': [{'=': ['y', 'z']}, {'exists': ['x', 'x']}]}]}
+        output = self.f2p.hoist_outer(input)
+        expected = {'|': [{'=': ['y', 'z']}, {'&': [{'not': {'=': ['y', 'z']}}, {'exists': ['X', 'X.foo']}]}]}
         self.assertEqual(output, expected)
 
     def test_exists_nested_constant(self):
-        input = {'|':['True',{'exists': ['x',{'|':['y',{'=':['y','x']}]}]}]}
+        input = {'&':['True',{'exists': ['X',{'|':['y',{'=':['y','X.foo']}]}]}]}
 
-        output = self.f2p.hoist_constants(input)
-        expected = {'|': ['True', {'|': ['y', {'exists': ['x', {'=': ['y', 'x']}]}]}]}
+        output = self.f2p.hoist_outer(input)
+        expected = {'|': ['y', {'&': [{'not': 'y'}, {'exists': ['X', {'=': ['False', 'X.foo']}]}]}]}
         self.assertEqual(output, expected)
 
     def test_exists_nested(self):
-        input = {'exists': ['x',{'exists':['y',{'=':['y','x']}]}]}
+        input = {'exists': ['X',{'exists':['Y',{'=':['Y.foo','X.foo']}]}]}
 
-        output = self.f2p.hoist_constants(input)
-        expected = {'exists': ['x', {'exists': ['y', {'=': ['y', 'x']}]}]}
-        self.assertEqual(output, expected)
+        output = self.f2p.hoist_outer(input)
+        expected = input
+        self.assertEqual(input, output)
 
     def test_exists_nested2(self):
-        input = {'exists': ['x',{'exists':['y',{'=':['z','y']}]}]}
+        input = {'exists': ['X',{'exists':['Y',{'=':['Z','Y']}]}]}
 
-        output = self.f2p.hoist_constants(input)
-        expected = {'exists': ['y', {'=': ['z', 'y']}]}
+        output = self.f2p.hoist_outer(input)
+        expected = {'exists': ['Y', {'=': ['Z', 'Y']}]}
         self.assertEqual(output, expected)
 
     def test_exists_nested3(self):
-        input = {'exists': ['x',{'exists':['y',{'=':['z','x']}]}]}
+        input = {'exists': ['X',{'exists':['Y',{'=':['Z','X']}]}]}
 
-        output = self.f2p.hoist_constants(input)
-        expected = {'exists': ['x', {'=': ['z', 'x']}]}
+        output = self.f2p.hoist_outer(input)
+        expected = {'exists': ['X', {'=': ['Z', 'X']}]}
         self.assertEqual(output, expected)
 
 
diff --git a/lib/xos-genx/tests/xos_security_test.py b/lib/xos-genx/tests/xos_security_test.py
index 8cdd746..e6d2c6c 100644
--- a/lib/xos-genx/tests/xos_security_test.py
+++ b/lib/xos-genx/tests/xos_security_test.py
@@ -1,6 +1,8 @@
 import unittest
 from xosgenx.generator import XOSGenerator
 from helpers import FakeArgs, XProtoTestHelpers
+import pdb
+import mock
 
 """The function below is for eliminating warnings arising due to the missing policy_output_enforcer,
 which is generated and loaded dynamically.
@@ -12,6 +14,7 @@
 """
 The tests below use the Python code target to generate 
 Python security policies, set up an appropriate environment and execute the Python.
+The security policies here deliberately made complex in order to stress the processor.
 """
 class XProtoXOSSecurityTest(unittest.TestCase):
     def setUp(self):
@@ -23,7 +26,7 @@
     def test_controller_policy(self):
         xproto = \
 """
-    policy test_policy < ctx.user.is_admin | exists Privilege: Privilege.user_id = ctx.user.id & Privilege.object_type = "Deployment" >
+    policy test_policy < ctx.user.is_admin | exists Privilege: Privilege.accessor_id = ctx.user.id & Privilege.object_type = "Deployment" & Privilege.permission = "role:admin" & Privilege.object_id = obj.id >
 """
         args = FakeArgs()
         args.inputs = xproto
@@ -36,21 +39,127 @@
         """
         def policy_output_enforcer(obj, ctx):
             i2 = ctx.user.is_admin
-            i3 = Privilege.objects.filter(Q(user_id=ctx.user.id), Q(object_type='Deployment'))[0]
+            i3 = Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(object_type='Deployment'), Q(permission='role:admin'), Q(object_id=obj.id))[0]
             i1 = (i2 or i3)
-            return i1
+                return i1
         """
 
         # FIXME: Test this policy by executing it
         self.assertTrue(policy_output_enforcer is not None)
 
     """
-    This is the security policy for controllers
+    This is the security policy for ControllerNetworks
     """
-    def _test_controller_network_policy(self):
+    def test_controller_network_policy(self):
         xproto = \
 """
-    policy test_policy < ctx.user.is_admin | exists Slice: forall ctx.networks: ctx.networks.owner.id = Slice.id >
+    policy test_policy < 
+         ctx.user.is_admin
+         | (exists Privilege:
+             Privilege.accessor_id = ctx.user.id
+             & Privilege.accessor_type = "User"
+             & Privilege.object_type = "Slice"
+             & Privilege.object_id = obj.owner.id)
+         | (exists Privilege:
+             Privilege.accessor_id = ctx.user.id
+             & Privilege.accessor_type = "User"
+             & Privilege.object_type = "Site"
+             & Privilege.object_id = obj.owner.site.id
+             & Privilege.permission = "role:admin") >
+"""
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+
+        output = XOSGenerator.generate(args)
+        exec(output) # This loads the generated function, which should look like this:
+
+        """
+        def policy_output_enforcer(obj, ctx):
+            i2 = ctx.user.is_admin
+            i4 = Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Slice'), Q(object_id=obj.owner.id))[0]
+            i5 = Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Site'), Q(object_id=obj.owner.site.id), Q(permission='role:admin'))[0]
+            i3 = (i4 or i5)
+            i1 = (i2 or i3)
+            return i1
+        """
+
+        # FIXME: Test this policy by executing it
+        self.assertTrue(policy_output_enforcer is not None)
+
+    """
+    This is the security policy for Slices
+    """
+    def test_slice_policy(self):
+        xproto = \
+"""
+   policy site_policy <
+            ctx.user.is_admin
+            | (ctx.write_access -> exists Privilege: Privilege.object_type = "Site" & Privilege.object_id = obj.id & Privilege.accessor_id = ctx.user.id & Privilege.permission_id = "role:admin") >
+
+   policy test_policy <
+         ctx.user.is_admin
+         | (*site_policy(site)
+         & ((exists Privilege:
+             Privilege.accessor_id = ctx.user.id
+             & Privilege.accessor_type = "User"
+             & Privilege.object_type = "Slice"
+             & Privilege.object_id = obj.id
+             & (ctx.write_access->Privilege.permission="role:admin"))
+           | (exists Privilege:
+             Privilege.accessor_id = ctx.user.id
+             & Privilege.accessor_type = "User"
+             & Privilege.object_type = "Site"
+             & Privilege.object_id = obj.site.id
+             & Privilege.permission = "role:admin"))
+            )>
+    
+"""
+        args = FakeArgs()
+        args.inputs = xproto
+        args.target = self.target
+
+        output = XOSGenerator.generate(args)
+
+        exec(output) # This loads the generated function, which should look like this:
+
+        """
+        def policy_output_enforcer(obj, ctx):
+	    i2 = ctx.user.is_admin
+	    i4 = policy_site_policy_enforcer(obj.site, ctx)
+	    i10 = ctx.write_access
+	    i11 = (not (not Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Slice'), Q(object_id=obj.id), Q(permission='role:admin'))))
+	    i8 = (i10 and i11)
+	    i14 = ctx.write_access
+	    i12 = (not i14)
+	    i13 = (not (not Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Slice'), Q(object_id=obj.id))))
+	    i9 = (i12 and i13)
+	    i6 = (i8 or i9)
+	    i7 = (not (not Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Site'), Q(object_id=obj.site.id), Q(permission='role:admin'))))
+	    i5 = (i6 or i7)
+	    i3 = (i4 and i5)
+	    i1 = (i2 or i3)
+	    return i1
+        """
+
+        # FIXME: Test this policy by executing it
+        self.assertTrue(policy_output_enforcer is not None)
+
+    """
+    This is the security policy for Users
+    """
+    def test_user_policy(self):
+        xproto = \
+"""
+    policy test_policy <
+         ctx.user.is_admin
+         | ctx.user.id = obj.id
+         | (exists Privilege: 
+             Privilege.accessor_id = ctx.user.id
+             & Privilege.accessor_type = "User"
+             & Privilege.permission = "role:admin"
+             & Privilege.object_type = "Site"
+             & Privilege.object_id = ctx.user.site.id) >
 """
         args = FakeArgs()
         args.inputs = xproto
@@ -63,7 +172,9 @@
         """
         def policy_output_enforcer(obj, ctx):
             i2 = ctx.user.is_admin
-            i3 = Privilege.objects.filter(Q(user_id=ctx.user.id), Q(object_type='Deployment'))[0]
+            i4 = (ctx.user.id == obj.id)
+            i5 = Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(permission='role:admin'), Q(object_type='Site'), Q(object_id=ctx.user.site.id))[0]
+            i3 = (i4 or i5)
             i1 = (i2 or i3)
             return i1
         """
@@ -71,6 +182,5 @@
         # FIXME: Test this policy by executing it
         self.assertTrue(policy_output_enforcer is not None)
 
-
 if __name__ == '__main__':
     unittest.main()
diff --git a/lib/xos-genx/tests/xos_validation_test.py b/lib/xos-genx/tests/xos_validation_test.py
index 20ba5da..7e70ede 100644
--- a/lib/xos-genx/tests/xos_validation_test.py
+++ b/lib/xos-genx/tests/xos_validation_test.py
@@ -76,35 +76,6 @@
 
         with self.assertRaises(Exception):
            policy_output_validator(obj, {})
-    def test_equal(self):
-        xproto = \
-"""
-    policy test_policy < not (ctx.user = obj.user) >
-"""
-
-        args = FakeArgs()
-        args.inputs = xproto
-        args.target = self.target
-
-        output = XOSGenerator.generate(args)
-
-        exec(output) # This loads the generated function, which should look like this:
-
-        """
-        def policy_output_validator(obj, ctx):
-            i2 = (ctx.user == obj.user)
-            i1 = (not i2)
-            if (not i1):
-                raise Exception('Necessary Failure')
-        """
-
-        obj = FakeArgs()
-	obj.user = 1
-        ctx = FakeArgs()
-	ctx.user = 1
-
-        with self.assertRaises(Exception):
-           policy_output_validator(obj, ctx)
 
     def test_equal(self):
         xproto = \
@@ -139,7 +110,7 @@
     def test_bin(self):
         xproto = \
 """
-    policy test_policy < (ctx.is_admin = True | obj.empty = True) & False>
+    policy test_policy < not (ctx.is_admin = True | obj.empty = True) | False>
 """
 
         args = FakeArgs()