# blob: c83648a31ac615705836d7c2c4c544003e8f1b02 [file] [log] [blame]
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" useraccesstest.py
This is a basic REST API permission test. Call it with a hostname, the
username and password of the user under test, and an admin username and
password. It will try to read and update some user and slice objects,
and report if something is broken.
This is not an exhaustive test.
"""
from __future__ import print_function
import inspect
import json
import os
import requests
import sys
import time
from urllib import urlencode
from operator import itemgetter, attrgetter
# The script takes exactly five arguments after its own name; anything else
# prints the usage line and exits.
if len(sys.argv) != 6:
    print(
        "syntax: usertest <hostname> <username> <password> <admin_username> <admin_password>"
    )
    sys.exit(-1)

# Target host plus the credentials of the user whose permissions are tested.
hostname, username, password = sys.argv[1:4]
opencloud_auth = (username, password)
# Admin credentials: used to download the reference data and to create and
# delete the throw-away test user.
admin_auth = tuple(sys.argv[4:6])

# REST endpoints, all rooted at the /xos/ API on port 8000 of the target host.
REST_API = "http://%s:8000/xos/" % hostname
USERS_API = REST_API + "users/"
SLICES_API = REST_API + "slices/"
SITES_API = REST_API + "sites/"
SITEPRIV_API = REST_API + "siteprivileges/"
SLICEPRIV_API = REST_API + "slice_privileges/"
SITEROLE_API = REST_API + "site_roles/"
SLICEROLE_API = REST_API + "slice_roles/"

# in case observer not running, objects won't be purged, so use unique email
TEST_USER_EMAIL = "test" + str(time.time()) + "@test.com"
def fail_unless(x, msg):
    """Report a test failure when ``x`` is falsy.

    Nothing is raised and nothing is returned: a failed check only prints
    ``FAIL (<caller function>:<caller line>)`` followed by ``msg``, so the
    script carries on with the remaining checks.

    Args:
        x: The condition that is expected to hold.
        msg: Text printed after the location when the condition fails.
    """
    if x:
        return
    # Entry 1 of the outer-frame list is the frame that called fail_unless.
    # In that record, position 2 is the line number and position 3 is the
    # function name.
    caller = inspect.getouterframes(inspect.currentframe())[1]
    print("FAIL (%s:%d)" % (caller[3], caller[2]), msg)
# Download the reference copy of every object type using the admin account.
# The permission checks later in the script compare what the test user can
# see against these lists.
# NOTE(review): no_hyperlinks=1 presumably makes related objects come back as
# plain ids (the helpers compare e.g. sitePriv["site"] with user["site"]) --
# confirm against the API.
# Every download is status-checked; previously only users and slices were, so
# a failed sites/privileges/roles request went unreported.
print("downloading objects using admin")

r = requests.get(USERS_API + "?no_hyperlinks=1", auth=admin_auth)
fail_unless(r.status_code == 200, "failed to get users")
allUsers = r.json()

r = requests.get(SLICES_API + "?no_hyperlinks=1", auth=admin_auth)
fail_unless(r.status_code == 200, "failed to get slices")
allSlices = r.json()

r = requests.get(SITES_API + "?no_hyperlinks=1", auth=admin_auth)
fail_unless(r.status_code == 200, "failed to get sites")
allSites = r.json()

r = requests.get(SITEPRIV_API + "?no_hyperlinks=1", auth=admin_auth)
fail_unless(r.status_code == 200, "failed to get site privileges")
allSitePriv = r.json()

r = requests.get(SLICEPRIV_API + "?no_hyperlinks=1", auth=admin_auth)
fail_unless(r.status_code == 200, "failed to get slice privileges")
allSlicePriv = r.json()

r = requests.get(SITEROLE_API + "?no_hyperlinks=1", auth=admin_auth)
fail_unless(r.status_code == 200, "failed to get site roles")
allSiteRole = r.json()

r = requests.get(SLICEROLE_API + "?no_hyperlinks=1", auth=admin_auth)
fail_unless(r.status_code == 200, "failed to get slice roles")
allSliceRole = r.json()
def should_see_user(myself, otherUser):
    """Return True if ``myself`` is expected to be able to see ``otherUser``.

    A user is expected to be visible when the viewer is an admin, when the
    viewer is that same user, or when the viewer holds a site privilege on
    the other user's site whose role is named "pi". Privileges and roles are
    read from the module-level ``allSitePriv`` and ``allSiteRole`` lists.

    Args:
        myself: User record of the viewer (reads "is_admin" and "id").
        otherUser: User record being looked at (reads "id" and "site").

    Returns:
        True or False.
    """
    if myself["is_admin"] or myself["id"] == otherUser["id"]:
        return True
    # Lazily walk the viewer's privileges on the other user's site and look
    # for one whose role record is the "pi" role.
    return any(
        role["role"] == "pi" and role["id"] == priv["role"]
        for priv in allSitePriv
        if priv["user"] == myself["id"] and priv["site"] == otherUser["site"]
        for role in allSiteRole
    )
def should_see_slice(myself, slice):
    """Return True if ``myself`` is expected to be able to see ``slice``.

    A slice is expected to be visible when the viewer is an admin, when the
    viewer holds a site privilege with the "pi" role on the slice's site, or
    when the viewer holds a slice privilege with the "pi" role on the slice
    itself. Privileges and roles are read from the module-level
    ``allSitePriv``, ``allSiteRole``, ``allSlicePriv`` and ``allSliceRole``
    lists.

    Args:
        myself: User record of the viewer (reads "is_admin" and "id").
        slice: Slice record being looked at (reads "id" and "site").

    Returns:
        True or False.
    """
    if myself["is_admin"]:
        return True

    # Site-level access: a "pi" privilege on the site that owns the slice.
    for sitePriv in allSitePriv:
        if sitePriv["user"] == myself["id"] and sitePriv["site"] == slice["site"]:
            if any(
                role["role"] == "pi" and role["id"] == sitePriv["role"]
                for role in allSiteRole
            ):
                return True

    # Slice-level access. The slice id must be read from the slice privilege
    # itself; reading it from the leftover site-privilege loop variable
    # raised KeyError, or UnboundLocalError when there were no site
    # privileges at all.
    for slicePriv in allSlicePriv:
        if slicePriv["user"] == myself["id"] and slicePriv["slice"] == slice["id"]:
            if any(
                role["role"] == "pi" and role["id"] == slicePriv["role"]
                for role in allSliceRole
            ):
                return True

    return False
def flip_phone(user):
    """Toggle ``user["phone"]`` in place between "123" and "456".

    Any value other than "123" (including "456") becomes "123", so repeated
    calls always produce a change that a later update can be checked against.
    """
    user["phone"] = "456" if user["phone"] == "123" else "123"
def flip_desc(slice):
    """Toggle ``slice["description"]`` in place between two fixed texts.

    "some_description" becomes "some_other_description"; every other value
    becomes "some_description".
    """
    slice["description"] = (
        "some_other_description"
        if slice["description"] == "some_description"
        else "some_description"
    )
def delete_user_if_exists(email):
    """Delete the user with the given email, using the admin account.

    Looks the user up by email; if the lookup succeeds and returns at least
    one record, the first record is deleted and a failure is reported through
    ``fail_unless`` when the delete does not answer 200. A failed lookup or
    an empty result is silently treated as "nothing to delete".

    Args:
        email: Email address identifying the user to remove.
    """
    # URL-encode the address (as the other lookups in this script do) so
    # characters such as "+" or "&" cannot corrupt the query string.
    lookup_url = USERS_API + "?" + urlencode({"email": email})
    r = requests.get(lookup_url, auth=admin_auth)
    if r.status_code != 200:
        return

    matches = r.json()
    if not matches:
        return

    r = requests.delete(USERS_API + str(matches[0]["id"]) + "/", auth=admin_auth)
    fail_unless(r.status_code == 200, "failed to delete the test user")
# Summarise how many objects of each kind the admin download returned.
object_counts = tuple(
    len(objects)
    for objects in (allUsers, allSlices, allSites, allSitePriv, allSlicePriv)
)
print(
    " loaded user:%d slice:%d, site:%d, site_priv:%d slice_priv:%d" % object_counts
)
# get our own user record, authenticated as the user under test
own_query = urlencode({"email": username, "no_hyperlinks": "1"})
r = requests.get(USERS_API + "?" + own_query, auth=opencloud_auth)
fail_unless(r.status_code == 200, "failed to get user %s" % username)

# The lookup returns a list; exactly one match is expected and it becomes the
# record that the visibility helpers use for the viewer.
myself = r.json()
fail_unless(len(myself) == 1, "wrong number of results when getting user %s" % username)
myself = myself[0]
# check to see that we see the users we should be able to
# NOTE(review): this listing is requested without no_hyperlinks=1, unlike the
# admin download; should_see_user compares user["site"] with privilege site
# values -- confirm both use the same representation.
r = requests.get(USERS_API, auth=opencloud_auth)
# Report a failed listing instead of silently parsing an error response.
fail_unless(r.status_code == 200, "failed to get users as %s" % username)
myUsers = r.json()

# Everything the test user can see must be something it is allowed to see...
for user in myUsers:
    fail_unless(
        should_see_user(myself, user),
        "saw user %s but we shouldn't have" % user["email"],
    )

# ...and everything it is allowed to see must actually be visible.
# A set gives constant-time membership tests. The loop variable is not named
# "r", so the response object is not overwritten under Python 2, where
# list-comprehension variables leak into the enclosing scope.
myUsersIds = set(visible["id"] for visible in myUsers)
for user in allUsers:
    if should_see_user(myself, user):
        fail_unless(
            user["id"] in myUsersIds,
            "should have seen user %s but didnt" % user["email"],
        )
# toggle the phone number on the users we should be able to
# NOTE: the user-update tests below are currently disabled. They sit inside a
# bare triple-quoted string, which Python evaluates and discards without
# running any of the text. The XXX remarks inside record that the negative
# checks were failing "for the wrong reason".
"""
for user in allUsers:
user = requests.get(USERS_API + str(user["id"]) + "/", auth=admin_auth).json()
flip_phone(user)
r = requests.put(USERS_API + str(user["id"]) +"/", data=user, auth=opencloud_auth)
if should_see_user(myself, user):
fail_unless(r.status_code==200, "failed to change phone number on %s" % user["email"])
else:
# XXX: this is failing, but for the wrong reason
fail_unless(r.status_code!=200, "was able to change phone number on %s but shouldn't have" % user["email"])
# try changing is_staff. We should be able to do it if we're an admin, but not
# otherwise.
for user in allUsers:
user = requests.get(USERS_API + str(user["id"]) + "/", auth=admin_auth).json()
user["is_staff"] = not user["is_staff"]
r = requests.put(USERS_API + str(user["id"]) +"/", data=user, auth=opencloud_auth)
if myself["is_admin"]:
fail_unless(r.status_code==200, "failed to change is_staff on %s" % user["email"])
else:
# XXX: this is failing, but for the wrong reason
fail_unless(r.status_code!=200, "was able to change is_staff on %s but shouldn't have" % user["email"])
# put it back to false, in case we successfully changed it...
user["is_staff"] = False
r = requests.put(USERS_API + str(user["id"]) +"/", data=user, auth=opencloud_auth)
"""
# delete the TEST_USER_EMAIL if it exists
delete_user_if_exists(TEST_USER_EMAIL)

# XXX - enacted and policed should not be required
newUser = {
    "firstname": "test",
    "lastname": "1234",
    "email": TEST_USER_EMAIL,
    "password": "letmein",
    "site": allSites[0]["id"],
    "enacted": "2015-01-01T00:00",
    "policed": "2015-01-01T00:00",
}

# Try to create the user as the account under test. Only admins should
# succeed. A successful create answers 201, the same status the admin create
# below checks for. Comparing against 200 here made the non-admin check pass
# even when the user was created, and made the admin check fail on success.
r = requests.post(USERS_API + "?no_hyperlinks=1", data=newUser, auth=opencloud_auth)
if myself["is_admin"]:
    fail_unless(r.status_code == 201, "failed to create %s" % TEST_USER_EMAIL)
else:
    fail_unless(
        r.status_code != 201,
        "created %s but we shouldn't have been able to" % TEST_USER_EMAIL,
    )

# Start the delete test from a known state, whatever the attempt above did.
delete_user_if_exists(TEST_USER_EMAIL)

# now create it as admin
r = requests.post(USERS_API + "?no_hyperlinks=1", data=newUser, auth=admin_auth)
if r.status_code != 201:
    print(r.text)
fail_unless(r.status_code == 201, "failed to create %s as admin" % TEST_USER_EMAIL)

# Look the new user up to learn its id.
r = requests.get(
    USERS_API + "?" + urlencode({"email": TEST_USER_EMAIL}), auth=admin_auth
)
fail_unless(r.status_code == 200, "failed to get user %s" % TEST_USER_EMAIL)
user = r.json()[0]

# Try to delete it as the account under test. Only admins should succeed.
r = requests.delete(USERS_API + str(user["id"]) + "/", auth=opencloud_auth)
if myself["is_admin"]:
    fail_unless(r.status_code == 200, "failed to delete %s" % TEST_USER_EMAIL)
else:
    fail_unless(
        r.status_code != 200,
        "deleted %s but we shouldn't have been able to" % TEST_USER_EMAIL,
    )
# slice tests
r = requests.get(SLICES_API, auth=opencloud_auth)
# Report a failed listing instead of silently parsing an error response.
fail_unless(r.status_code == 200, "failed to get slices as %s" % username)
mySlices = r.json()

# Everything the test user can see must be something it is allowed to see...
for slice in mySlices:
    fail_unless(
        should_see_slice(myself, slice),
        "saw slice %s but we shouldn't have" % slice["name"],
    )

# ...and everything it is allowed to see must actually be visible.
# The id collection is defined and read under one name; the lookup previously
# read "mySliceIds", which was never defined and raised NameError. The loop
# variable is not named "r", so the response object is not overwritten under
# Python 2.
mySlicesIds = set(visible["id"] for visible in mySlices)
for slice in allSlices:
    if should_see_slice(myself, slice):
        fail_unless(
            slice["id"] in mySlicesIds,
            "should have seen slice %s but didnt" % slice["name"],
        )
# Try to change the description of every slice as the account under test.
# The update must succeed exactly for the slices that account may see.
for slice in allSlices:
    # Re-read the slice as admin so the record sent back is complete.
    detail_url = SLICES_API + str(slice["id"]) + "/"
    slice = requests.get(detail_url, auth=admin_auth).json()
    flip_desc(slice)

    update_url = SLICES_API + str(slice["id"]) + "/"
    r = requests.put(update_url, data=slice, auth=opencloud_auth)
    updated = r.status_code == 200

    if should_see_slice(myself, slice):
        fail_unless(updated, "failed to change desc on %s" % slice["name"])
    else:
        fail_unless(
            not updated,
            "was able to change desc on %s but shouldn't have" % slice["name"],
        )

print("Done.")