| # Copyright 2017-present Open Networking Foundation |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| |
| """ useraccesstest.py |
| |
| This is a basic REST API permission test. Call it with a username and a |
| password, and it will try to read and update some user and slice object, |
| and report if something is broken. |
| |
| This is not an exhaustive test. |
| """ |
| |
| |
| from __future__ import print_function |
| import inspect |
| import json |
| import os |
| import requests |
| import sys |
| import time |
| from urllib import urlencode |
| |
| from operator import itemgetter, attrgetter |
| |
| if len(sys.argv) != 6: |
| print( |
| "syntax: usertest <hostname> <username> <password> <admin_username> <admin_password>" |
| ) |
| sys.exit(-1) |
| |
| hostname = sys.argv[1] |
| username = sys.argv[2] |
| password = sys.argv[3] |
| |
| opencloud_auth = (username, password) |
| admin_auth = (sys.argv[4], sys.argv[5]) |
| |
| REST_API = "http://%s:8000/xos/" % hostname |
| USERS_API = REST_API + "users/" |
| SLICES_API = REST_API + "slices/" |
| SITES_API = REST_API + "sites/" |
| SITEPRIV_API = REST_API + "siteprivileges/" |
| SLICEPRIV_API = REST_API + "slice_privileges/" |
| SITEROLE_API = REST_API + "site_roles/" |
| SLICEROLE_API = REST_API + "slice_roles/" |
| |
| TEST_USER_EMAIL = ( |
| "test" + str(time.time()) + "@test.com" |
| ) # in case observer not running, objects won't be purged, so use unique email |
| |
| |
| def fail_unless(x, msg): |
| if not x: |
| ( |
| frame, |
| filename, |
| line_number, |
| function_name, |
| lines, |
| index, |
| ) = inspect.getouterframes(inspect.currentframe())[1] |
| print("FAIL (%s:%d)" % (function_name, line_number), msg) |
| |
| |
| print("downloading objects using admin") |
| r = requests.get(USERS_API + "?no_hyperlinks=1", auth=admin_auth) |
| fail_unless(r.status_code == 200, "failed to get users") |
| allUsers = r.json() |
| r = requests.get(SLICES_API + "?no_hyperlinks=1", auth=admin_auth) |
| fail_unless(r.status_code == 200, "failed to get slices") |
| allSlices = r.json() |
| r = requests.get(SITES_API + "?no_hyperlinks=1", auth=admin_auth) |
| allSites = r.json() |
| r = requests.get(SITEPRIV_API + "?no_hyperlinks=1", auth=admin_auth) |
| allSitePriv = r.json() |
| r = requests.get(SLICEPRIV_API + "?no_hyperlinks=1", auth=admin_auth) |
| allSlicePriv = r.json() |
| r = requests.get(SITEROLE_API + "?no_hyperlinks=1", auth=admin_auth) |
| allSiteRole = r.json() |
| r = requests.get(SLICEROLE_API + "?no_hyperlinks=1", auth=admin_auth) |
| allSliceRole = r.json() |
| |
| |
| def should_see_user(myself, otherUser): |
| if myself["is_admin"]: |
| return True |
| if myself["id"] == otherUser["id"]: |
| return True |
| for sitePriv in allSitePriv: |
| if (sitePriv["user"] == myself["id"]) and ( |
| sitePriv["site"] == otherUser["site"] |
| ): |
| for role in allSiteRole: |
| if role["role"] == "pi" and role["id"] == sitePriv["role"]: |
| return True |
| return False |
| |
| |
| def should_see_slice(myself, slice): |
| if myself["is_admin"]: |
| return True |
| for sitePriv in allSitePriv: |
| if (sitePriv["user"] == myself["id"]) and (sitePriv["site"] == slice["site"]): |
| for role in allSiteRole: |
| if role["role"] == "pi" and role["id"] == sitePriv["role"]: |
| return True |
| for slicePriv in allSlicePriv: |
| if (slicePriv["user"] == myself["id"]) and (sitePriv["slice"] == slice["id"]): |
| for role in allSliceRole: |
| if role["role"] == "pi" and role["id"] == slicePriv["role"]: |
| return True |
| return False |
| |
| |
| def flip_phone(user): |
| if user["phone"] == "123": |
| user["phone"] = "456" |
| else: |
| user["phone"] = "123" |
| |
| |
| def flip_desc(slice): |
| if slice["description"] == "some_description": |
| slice["description"] = "some_other_description" |
| else: |
| slice["description"] = "some_description" |
| |
| |
| def delete_user_if_exists(email): |
| r = requests.get(USERS_API + "?email=%s" % email, auth=admin_auth) |
| if r.status_code == 200: |
| user = r.json() |
| if len(user) > 0: |
| user = user[0] |
| r = requests.delete(USERS_API + str(user["id"]) + "/", auth=admin_auth) |
| fail_unless(r.status_code == 200, "failed to delete the test user") |
| |
| |
| print( |
| " loaded user:%d slice:%d, site:%d, site_priv:%d slice_priv:%d" |
| % ( |
| len(allUsers), |
| len(allSlices), |
| len(allSites), |
| len(allSitePriv), |
| len(allSlicePriv), |
| ) |
| ) |
| |
| # get our own user record |
| |
| r = requests.get( |
| USERS_API + "?" + urlencode({"email": username, "no_hyperlinks": "1"}), |
| auth=opencloud_auth, |
| ) |
| fail_unless(r.status_code == 200, "failed to get user %s" % username) |
| myself = r.json() |
| fail_unless(len(myself) == 1, "wrong number of results when getting user %s" % username) |
| myself = myself[0] |
| |
| # check to see that we see the users we should be able to |
| |
| r = requests.get(USERS_API, auth=opencloud_auth) |
| myUsers = r.json() |
| for user in myUsers: |
| fail_unless( |
| should_see_user(myself, user), |
| "saw user %s but we shouldn't have" % user["email"], |
| ) |
| myUsersIds = [r["id"] for r in myUsers] |
| for user in allUsers: |
| if should_see_user(myself, user): |
| fail_unless( |
| user["id"] in myUsersIds, |
| "should have seen user %s but didnt" % user["email"], |
| ) |
| |
| # toggle the phone number on the users we should be able to |
| |
| """ |
| for user in allUsers: |
| user = requests.get(USERS_API + str(user["id"]) + "/", auth=admin_auth).json() |
| flip_phone(user) |
| r = requests.put(USERS_API + str(user["id"]) +"/", data=user, auth=opencloud_auth) |
| if should_see_user(myself, user): |
| fail_unless(r.status_code==200, "failed to change phone number on %s" % user["email"]) |
| else: |
| # XXX: this is failing, but for the wrong reason |
| fail_unless(r.status_code!=200, "was able to change phone number on %s but shouldn't have" % user["email"]) |
| |
| # try changing is_staff. We should be able to do it if we're an admin, but not |
| # otherwise. |
| |
| for user in allUsers: |
| user = requests.get(USERS_API + str(user["id"]) + "/", auth=admin_auth).json() |
| user["is_staff"] = not user["is_staff"] |
| r = requests.put(USERS_API + str(user["id"]) +"/", data=user, auth=opencloud_auth) |
| if myself["is_admin"]: |
| fail_unless(r.status_code==200, "failed to change is_staff on %s" % user["email"]) |
| else: |
| # XXX: this is failing, but for the wrong reason |
| fail_unless(r.status_code!=200, "was able to change is_staff on %s but shouldn't have" % user["email"]) |
| |
| # put it back to false, in case we successfully changed it... |
| user["is_staff"] = False |
| r = requests.put(USERS_API + str(user["id"]) +"/", data=user, auth=opencloud_auth) |
| """ |
| |
| # delete the TEST_USER_EMAIL if it exists |
| delete_user_if_exists(TEST_USER_EMAIL) |
| |
| # XXX - enacted and policed should not be required |
| |
| newUser = { |
| "firstname": "test", |
| "lastname": "1234", |
| "email": TEST_USER_EMAIL, |
| "password": "letmein", |
| "site": allSites[0]["id"], |
| "enacted": "2015-01-01T00:00", |
| "policed": "2015-01-01T00:00", |
| } |
| r = requests.post(USERS_API + "?no_hyperlinks=1", data=newUser, auth=opencloud_auth) |
| if myself["is_admin"]: |
| fail_unless(r.status_code == 200, "failed to create %s" % TEST_USER_EMAIL) |
| else: |
| fail_unless( |
| r.status_code != 200, |
| "created %s but we shouldn't have been able to" % TEST_USER_EMAIL, |
| ) |
| |
| delete_user_if_exists(TEST_USER_EMAIL) |
| |
| # now create it as admin |
| r = requests.post(USERS_API + "?no_hyperlinks=1", data=newUser, auth=admin_auth) |
| if r.status_code != 201: |
| print(r.text) |
| fail_unless(r.status_code == 201, "failed to create %s as admin" % TEST_USER_EMAIL) |
| |
| r = requests.get( |
| USERS_API + "?" + urlencode({"email": TEST_USER_EMAIL}), auth=admin_auth |
| ) |
| fail_unless(r.status_code == 200, "failed to get user %s" % TEST_USER_EMAIL) |
| user = r.json()[0] |
| r = requests.delete(USERS_API + str(user["id"]) + "/", auth=opencloud_auth) |
| if myself["is_admin"]: |
| fail_unless(r.status_code == 200, "failed to delete %s" % TEST_USER_EMAIL) |
| else: |
| fail_unless( |
| r.status_code != 200, |
| "deleted %s but we shouldn't have been able to" % TEST_USER_EMAIL, |
| ) |
| |
| # slice tests |
| |
| r = requests.get(SLICES_API, auth=opencloud_auth) |
| mySlices = r.json() |
| |
| for slice in mySlices: |
| fail_unless( |
| should_see_slice(myself, slice), |
| "saw slice %s but we shouldn't have" % slice["name"], |
| ) |
| mySlicesIds = [r["id"] for r in mySlices] |
| for slice in allSlices: |
| if should_see_slice(myself, slice): |
| fail_unless( |
| slice["id"] in mySliceIds, |
| "should have seen slice %s but didnt" % slice["name"], |
| ) |
| |
| for slice in allSlices: |
| slice = requests.get(SLICES_API + str(slice["id"]) + "/", auth=admin_auth).json() |
| flip_desc(slice) |
| r = requests.put( |
| SLICES_API + str(slice["id"]) + "/", data=slice, auth=opencloud_auth |
| ) |
| if should_see_slice(myself, slice): |
| fail_unless(r.status_code == 200, "failed to change desc on %s" % slice["name"]) |
| else: |
| fail_unless( |
| r.status_code != 200, |
| "was able to change desc on %s but shouldn't have" % slice["name"], |
| ) |
| |
| print("Done.") |