blob: 766e102ee253474c6f030d49b17bc53d8403cec7 [file] [log] [blame]
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from xosgenx.generator import XOSProcessor, XOSProcessorArgs
from helpers import XProtoTestHelpers
"""The function below is for eliminating warnings arising due to the missing policy_output_enforcer,
which is generated and loaded dynamically.
"""
def policy_output_enforcer(x, y):
raise Exception("Security enforcer not generated. Test failed.")
return False
"""
The tests below use the Python code target to generate
Python security policies, set up an appropriate environment and execute the Python.
The security policies here deliberately made complex in order to stress the processor.
"""
class XProtoXOSSecurityTest(unittest.TestCase):
def setUp(self):
self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_test('output',proto.policies.test_policy, None, '0') }}")
"""
This is the security policy for controllers
"""
def test_controller_policy(self):
xproto = \
"""
policy test_policy < ctx.user.is_admin | exists Privilege: Privilege.accessor_id = ctx.user.id & Privilege.object_type = "Deployment" & Privilege.permission = "role:admin" & Privilege.object_id = obj.id >
"""
args = XOSProcessorArgs()
args.inputs = xproto
args.target = self.target
output = XOSProcessor.process(args)
exec(output) # This loads the generated function, which should look like this:
"""
def policy_output_enforcer(obj, ctx):
i2 = ctx.user.is_admin
i3 = Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(object_type='Deployment'), Q(permission='role:admin'), Q(object_id=obj.id))[0]
i1 = (i2 or i3)
return i1
"""
# FIXME: Test this policy by executing it
self.assertTrue(policy_output_enforcer is not None)
"""
This is the security policy for ControllerNetworks
"""
def test_controller_network_policy(self):
xproto = \
"""
policy test_policy <
ctx.user.is_admin
| (exists Privilege:
Privilege.accessor_id = ctx.user.id
& Privilege.accessor_type = "User"
& Privilege.object_type = "Slice"
& Privilege.object_id = obj.owner.id)
| (exists Privilege:
Privilege.accessor_id = ctx.user.id
& Privilege.accessor_type = "User"
& Privilege.object_type = "Site"
& Privilege.object_id = obj.owner.site.id
& Privilege.permission = "role:admin") >
"""
args = XOSProcessorArgs()
args.inputs = xproto
args.target = self.target
output = XOSProcessor.process(args)
exec(output) # This loads the generated function, which should look like this:
"""
def policy_output_enforcer(obj, ctx):
i2 = ctx.user.is_admin
i4 = Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Slice'), Q(object_id=obj.owner.id))[0]
i5 = Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Site'), Q(object_id=obj.owner.site.id), Q(permission='role:admin'))[0]
i3 = (i4 or i5)
i1 = (i2 or i3)
return i1
"""
# FIXME: Test this policy by executing it
self.assertTrue(policy_output_enforcer is not None)
"""
This is the security policy for Slices
"""
def test_slice_policy(self):
xproto = \
"""
policy site_policy <
ctx.user.is_admin
| (ctx.write_access -> exists Privilege: Privilege.object_type = "Site" & Privilege.object_id = obj.id & Privilege.accessor_id = ctx.user.id & Privilege.permission_id = "role:admin") >
policy test_policy <
ctx.user.is_admin
| (*site_policy(site)
& ((exists Privilege:
Privilege.accessor_id = ctx.user.id
& Privilege.accessor_type = "User"
& Privilege.object_type = "Slice"
& Privilege.object_id = obj.id
& (ctx.write_access->Privilege.permission="role:admin"))
| (exists Privilege:
Privilege.accessor_id = ctx.user.id
& Privilege.accessor_type = "User"
& Privilege.object_type = "Site"
& Privilege.object_id = obj.site.id
& Privilege.permission = "role:admin"))
)>
"""
args = XOSProcessorArgs()
args.inputs = xproto
args.target = self.target
output = XOSProcessor.process(args)
exec(output) # This loads the generated function, which should look like this:
"""
def policy_output_enforcer(obj, ctx):
i2 = ctx.user.is_admin
i4 = policy_site_policy_enforcer(obj.site, ctx)
i10 = ctx.write_access
i11 = (not (not Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Slice'), Q(object_id=obj.id), Q(permission='role:admin'))))
i8 = (i10 and i11)
i14 = ctx.write_access
i12 = (not i14)
i13 = (not (not Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Slice'), Q(object_id=obj.id))))
i9 = (i12 and i13)
i6 = (i8 or i9)
i7 = (not (not Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Site'), Q(object_id=obj.site.id), Q(permission='role:admin'))))
i5 = (i6 or i7)
i3 = (i4 and i5)
i1 = (i2 or i3)
return i1
"""
# FIXME: Test this policy by executing it
self.assertTrue(policy_output_enforcer is not None)
"""
This is the security policy for Users
"""
def test_user_policy(self):
xproto = \
"""
policy test_policy <
ctx.user.is_admin
| ctx.user.id = obj.id
| (exists Privilege:
Privilege.accessor_id = ctx.user.id
& Privilege.accessor_type = "User"
& Privilege.permission = "role:admin"
& Privilege.object_type = "Site"
& Privilege.object_id = ctx.user.site.id) >
"""
args = XOSProcessorArgs()
args.inputs = xproto
args.target = self.target
output = XOSProcessor.process(args)
exec(output) # This loads the generated function, which should look like this:
"""
def policy_output_enforcer(obj, ctx):
i2 = ctx.user.is_admin
i4 = (ctx.user.id == obj.id)
i5 = Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(permission='role:admin'), Q(object_type='Site'), Q(object_id=ctx.user.site.id))[0]
i3 = (i4 or i5)
i1 = (i2 or i3)
return i1
"""
# FIXME: Test this policy by executing it
self.assertTrue(policy_output_enforcer is not None)
if __name__ == '__main__':
unittest.main()