blob: 290d7cff6137357f91210525df81cf88b8eaa4b8 [file] [log] [blame]
""" useraccesstest.py
This is a basic REST API permission test. Call it with a username and a
password, and it will try to read and update some user and slice object,
and report if something is broken.
This is not an exhaustive test.
"""
import inspect
import json
import os
import requests
import sys
from operator import itemgetter, attrgetter
REST_API="http://node43.princeton.vicci.org:8000/plstackapi/"
USERS_API = REST_API + "users/"
SLICES_API = REST_API + "slices/"
SITES_API = REST_API + "sites/"
SITEPRIV_API = REST_API + "site_privileges/"
SLICEPRIV_API = REST_API + "slice_memberships/"
SITEROLE_API = REST_API + "site_roles/"
SLICEROLE_API = REST_API + "slice_roles/"
TEST_USER_EMAIL = "test1234@test.com"
username = sys.argv[1]
password = sys.argv[2]
opencloud_auth=(username, password)
admin_auth=("scott@onlab.us", "letmein") # admin creds, used to get full set of objects
def fail_unless(x, msg):
if not x:
(frame, filename, line_number, function_name, lines, index) = inspect.getouterframes(inspect.currentframe())[1]
print "FAIL (%s:%d)" % (function_name, line_number), msg
print "downloading objects using admin"
r = requests.get(USERS_API + "?no_hyperlinks=1", auth=admin_auth)
allUsers = r.json()
r = requests.get(SLICES_API + "?no_hyperlinks=1", auth=admin_auth)
allSlices = r.json()
r = requests.get(SITES_API + "?no_hyperlinks=1", auth=admin_auth)
allSites = r.json()
r = requests.get(SITEPRIV_API + "?no_hyperlinks=1", auth=admin_auth)
allSitePriv = r.json()
r = requests.get(SLICEPRIV_API + "?no_hyperlinks=1", auth=admin_auth)
allSlicePriv = r.json()
r = requests.get(SITEROLE_API + "?no_hyperlinks=1", auth=admin_auth)
allSiteRole = r.json()
r = requests.get(SLICEROLE_API + "?no_hyperlinks=1", auth=admin_auth)
allSliceRole = r.json()
def should_see_user(myself, otherUser):
if myself["is_admin"]:
return True
if myself["id"] == otherUser["id"]:
return True
for sitePriv in allSitePriv:
if (sitePriv["user"] == myself["id"]) and (sitePriv["site"] == otherUser["site"]):
for role in allSiteRole:
if role["role"]=="pi" and role["id"] == sitePriv["role"]:
return True
return False
def should_see_slice(myself, slice):
if myself["is_admin"]:
return True
for sitePriv in allSitePriv:
if (sitePriv["user"] == myself["id"]) and (sitePriv["site"] == slice["site"]):
for role in allSiteRole:
if role["role"]=="pi" and role["id"] == sitePriv["role"]:
return True
for slicePriv in allSlicePriv:
if (slicePriv["user"] == myself["id"]) and (sitePriv["slice"] == slice["id"]):
for role in allSliceRole:
if role["role"]=="pi" and role["id"] == slicePriv["role"]:
return True
return False
def flip_phone(user):
if user["phone"] == "123":
user["phone"] = "456"
else:
user["phone"] = "123"
def flip_desc(slice):
if slice["description"] == "some_description":
slice["description"] = "some_other_description"
else:
slice["description"] = "some_description"
def delete_user_if_exists(email):
r = requests.get(USERS_API +"?email=%s" % email, auth=admin_auth)
if r.status_code==200:
user = r.json()
if len(user)>0:
user=user[0]
r = requests.delete(USERS_API + str(user["id"]) + "/", auth=admin_auth)
fail_unless(r.status_code==200, "failed to delete the test user")
print " loaded user:%d slice:%d, site:%d, site_priv:%d slice_priv:%d" % (len(allUsers), len(allSlices), len(allSites), len(allSitePriv), len(allSlicePriv))
# get our own user record
r = requests.get(USERS_API + "?email=%s&no_hyperlinks" % username, auth=opencloud_auth)
fail_unless(r.status_code==200, "failed to get user %s" % username)
myself = r.json()
fail_unless(len(myself)==1, "wrong number of results when getting user %s" % username)
myself = myself[0]
# check to see that we see the users we should be able to
r = requests.get(USERS_API, auth=opencloud_auth)
myUsers = r.json()
for user in myUsers:
fail_unless(should_see_user(myself, user), "saw user %s but we shouldn't have" % user["email"])
myUsersIds = [r["id"] for r in myUsers]
for user in allUsers:
if should_see_user(myself, user):
fail_unless(user["id"] in myUsersIds, "should have seen user %s but didnt" % user["email"])
# toggle the phone number on the users we should be able to
"""
for user in allUsers:
user = requests.get(USERS_API + str(user["id"]) + "/", auth=admin_auth).json()
flip_phone(user)
r = requests.put(USERS_API + str(user["id"]) +"/", data=user, auth=opencloud_auth)
if should_see_user(myself, user):
fail_unless(r.status_code==200, "failed to change phone number on %s" % user["email"])
else:
# XXX: this is failing, but for the wrong reason
fail_unless(r.status_code!=200, "was able to change phone number on %s but shouldn't have" % user["email"])
# try changing is_staff. We should be able to do it if we're an admin, but not
# otherwise.
for user in allUsers:
user = requests.get(USERS_API + str(user["id"]) + "/", auth=admin_auth).json()
user["is_staff"] = not user["is_staff"]
r = requests.put(USERS_API + str(user["id"]) +"/", data=user, auth=opencloud_auth)
if myself["is_admin"]:
fail_unless(r.status_code==200, "failed to change is_staff on %s" % user["email"])
else:
# XXX: this is failing, but for the wrong reason
fail_unless(r.status_code!=200, "was able to change is_staff on %s but shouldn't have" % user["email"])
# put it back to false, in case we successfully changed it...
user["is_staff"] = False
r = requests.put(USERS_API + str(user["id"]) +"/", data=user, auth=opencloud_auth)
"""
# delete the TEST_USER_EMAIL if it exists
delete_user_if_exists(TEST_USER_EMAIL)
newUser = {"firstname": "test", "lastname": "1234", "email": TEST_USER_EMAIL, "password": "letmein"}
r = requests.post(USERS_API, data=newUser, auth=opencloud_auth)
if myself["is_admin"]:
fail_unless(r.status_code==200, "failed to create %s" % TEST_USER_EMAIL)
else:
fail_unless(r.status_code!=200, "created %s but we shouldn't have been able to" % TEST_USER_EMAIL)
delete_user_if_exists(TEST_USER_EMAIL)
sys.exit(-1)
# now create it as admin
r = requests.post(USERS_API, data=newUser, auth=admin_auth)
fail_unless(r.status_code==201, "failed to create %s as admin" % TEST_USER_EMAIL)
user = requests.get(USERS_API +"?email=%s" % TEST_USER_EMAIL, auth=admin_auth).json()[0]
r = requests.delete(USERS_API + str(user["id"]) + "/", auth=opencloud_auth)
if myself["is_admin"]:
fail_unless(r.status_code==200, "failed to delete %s" % TEST_USER_EMAIL)
else:
fail_unless(r.status_code!=200, "deleted %s but we shouldn't have been able to" % TEST_USER_EMAIL)
# slice tests
r = requests.get(SLICES_API, auth=opencloud_auth)
mySlices = r.json()
for slice in mySlices:
fail_unless(should_see_slice(myself, slice), "saw slice %s but we shouldn't have" % slice["name"])
mySlicesIds = [r["id"] for r in mySlices]
for slice in allSlices:
if should_see_slice(myself, slice):
fail_unless(slice["id"] in mySliceIds, "should have seen slice %s but didnt" % slice["name"])
for slice in allSlices:
slice = requests.get(SLICES_API + str(slice["id"]) + "/", auth=admin_auth).json()
flip_desc(slice)
r = requests.put(SLICES_API + str(slice["id"]) +"/", data=slice, auth=opencloud_auth)
if should_see_slice(myself, slice):
fail_unless(r.status_code==200, "failed to change desc on %s" % slice["name"])
else:
fail_unless(r.status_code!=200, "was able to change desc on %s but shouldn't have" % slice["name"])
print "Done."