blob: 78f6e191d53b82a0038f1e66084fd8ec676d397a [file] [log] [blame]
#!/usr/bin/env python3
# SPDX-FileCopyrightText: © 2020 Open Networking Foundation <support@opennetworking.org>
# SPDX-License-Identifier: Apache-2.0
from __future__ import absolute_import
import argparse
import json
import logging
import urllib.request
import urllib.parse
import base64
# Create the shared logger used by every function in this script.
# basicConfig() sets up the root logger with the default handler so that
# records from the "cg" logger are actually emitted.
logging.basicConfig()
logger = logging.getLogger("cg")
# Global dict of jsonpath expressions -> compiled jsonpath parsers, as
# reparsing expressions in each loop results in 100x longer execution time.
# NOTE(review): nothing in the visible code reads or writes this dict -
# confirm whether it is still needed.
gjpaths = {}
# Cached list of (name, value) header tuples sent with every Gerrit request.
# Starts empty and is rebound in the __main__ section once the settings
# (and therefore the Gerrit credentials) have been loaded.
gerrit_headers = []
# Cache of Gerrit group name -> group "id", to prevent multiple lookups of
# the same group.
gerrit_groups = {}
def parse_crowdgroups_args():
    """
    Build the command-line parser and return the parsed arguments.

    The returned namespace carries two attributes:
      settings - the JSON settings file, already opened for reading
      debug    - True when --debug was given on the command line
    """
    cli = argparse.ArgumentParser(description="Crowd-Gerrit group sync")

    # required positional argument: the settings file
    cli.add_argument(
        "settings",
        help="JSON settings file",
        type=argparse.FileType("r"),
    )

    # optional switch
    cli.add_argument(
        "--debug",
        help="Print additional debugging information",
        action="store_true",
    )

    namespace = cli.parse_args()
    return namespace
def basic_auth_header(username, password):
    """
    Build an HTTP Basic authentication header.

    Returns an ("Authorization", "Basic <token>") tuple, where <token> is the
    base64 encoding of the UTF-8 bytes of "username:password".
    """
    raw_credentials = ("%s:%s" % (username, password)).encode("utf-8")
    token = base64.b64encode(raw_credentials).decode("utf-8")
    return ("Authorization", "Basic " + token)
def json_api_get(url, headers, data=None, trim_prefix=False, allow_failure=False):
    """
    Call a JSON API endpoint and return the decoded response.

    Args:
        url: full URL of the endpoint.
        headers: iterable of (name, value) tuples added to the request.
        data: optional JSON-serializable payload. When truthy, the request is
            sent as a POST with a JSON body; otherwise it is a plain GET.
        trim_prefix: remove Gerrit's ")]}'" magic prefix before decoding.
        allow_failure: when True, an HTTP error status or an undecodable body
            is treated as expected and yields None without being logged.

    Returns:
        The decoded JSON value, or None when the request failed or the body
        could not be decoded.
    """
    # if data included, encode it as JSON
    if data:
        payload = json.dumps(data).encode("utf-8")
        request = urllib.request.Request(url, data=payload, method="POST")
        request.add_header("Content-Type", "application/json; charset=UTF-8")
    else:
        request = urllib.request.Request(url)

    # add headers tuples
    for header in headers:
        request.add_header(*header)

    try:
        response = urllib.request.urlopen(request)
    except urllib.error.HTTPError:
        # asking for data that doesn't exist results in a 404, just return
        # nothing (quietly, if the caller said failure is acceptable)
        if not allow_failure:
            logger.exception("Server encountered an HTTPError at URL: '%s'", url)
        return None
    except urllib.error.URLError:
        logger.exception("An URLError occurred at URL: '%s'", url)
        return None

    # read the body inside a context manager so the connection is always closed
    with response:
        jsondata = response.read()
    logger.debug("API response: %s", jsondata)

    # optionally remove the gerrit "magic prefix" - docs:
    # https://gerrit-review.googlesource.com/Documentation/rest-api.html#output
    # Only the exact prefix is removed; bytes.lstrip() would instead strip any
    # run of those characters. The newline that follows the prefix is
    # whitespace and is ignored by the JSON decoder.
    magic_prefix = b")]}'"
    if trim_prefix and jsondata.startswith(magic_prefix):
        jsondata = jsondata[len(magic_prefix):]

    # docs: https://docs.python.org/3/library/json.html
    try:
        decoded = json.loads(jsondata)
    except json.decoder.JSONDecodeError:
        # allow return of no data
        if not allow_failure:
            logger.exception("Unable to decode JSON")
        return None

    logger.debug("JSON decoded: %s", decoded)
    return decoded
def crowd_users_list(crowd_settings, groupname):
    """
    Return the direct members of a Crowd group.

    Args:
        crowd_settings: dict holding the Crowd "url", "username" and
            "password".
        groupname: name of the Crowd group to query.

    Returns:
        dict mapping each member's email address to their Crowd username.
    """
    # percent-encode the group name so characters such as spaces, '&' or '#'
    # cannot corrupt the query string
    url = "%s/rest/usermanagement/1/group/user/direct?groupname=%s&expand=user" % (
        crowd_settings["url"],
        urllib.parse.quote(groupname, safe=""),
    )
    crowd_headers = [
        basic_auth_header(crowd_settings["username"], crowd_settings["password"]),
        ("Accept", "application/json"),
    ]
    cout = json_api_get(url, crowd_headers)

    # NOTE: json_api_get() returns None when the request fails; indexing it
    # below then raises, which aborts the sync for this group.
    return {user["email"]: user["name"] for user in cout["users"]}
def gerrit_group_id_by_name(gerrit_settings, groupname):
    """
    Return the Gerrit group ID for a group name.

    Note - this is the "id" not the "group_id" in the dict.

    Results are stored in the module-level gerrit_groups cache, so each group
    is only looked up once per run.

    Args:
        gerrit_settings: dict holding the Gerrit "url".
        groupname: name of the Gerrit group.

    Returns:
        The value of the "id" field of the group entry returned by Gerrit.
    """
    # read from cache if found
    if groupname not in gerrit_groups:
        # percent-encode the name so spaces, '&', '#', etc. stay inside the
        # "m" query parameter
        gurl = "%s/groups/?m=%s" % (
            gerrit_settings["url"],
            urllib.parse.quote(groupname, safe=""),
        )
        gout = json_api_get(gurl, gerrit_headers, None, True)
        # the response is keyed by group name; pick the exact match
        gerrit_groups[groupname] = gout[groupname]["id"]
        logger.debug(
            "Group: '%s' has group-id: '%s'", groupname, gerrit_groups[groupname]
        )
    return gerrit_groups[groupname]
def gerrit_users_list(gerrit_settings, groupname):
    """
    Return the members of a Gerrit group, keyed by email address.

    Each value is a dict with the member's "_account_id" and "username";
    either is None when the field is missing from Gerrit's response.
    """
    group_id = gerrit_group_id_by_name(gerrit_settings, groupname)
    members_url = "%s/groups/%s/members" % (gerrit_settings["url"], group_id)
    members = json_api_get(members_url, gerrit_headers, None, True)
    return {
        member["email"]: {
            "_account_id": member.get("_account_id"),
            "username": member.get("username"),
        }
        for member in members
    }
def gerrit_check_crowd_id(gerrit_settings, aid, crowdusername):
    """
    Check whether a Gerrit account is linked to a given Crowd user.

    Returns `aid` when one of the account's external ids has an "identity"
    equal to the Crowd OpenID URL of `crowdusername`, otherwise None.
    """
    ids_url = "%s/accounts/%s/external_ids" % (gerrit_settings["url"], aid)
    external_ids = json_api_get(ids_url, gerrit_headers, None, True)
    expected_identity = (
        "https://crowd.opennetworking.org/openidserver/users/%s" % crowdusername
    )
    if any(entry["identity"] == expected_identity for entry in external_ids):
        return aid
    return None
def gerrit_find_account_id(gerrit_settings, email, crowdusername):
    """
    Return the Gerrit account ID registered for an email address.

    Args:
        gerrit_settings: dict holding the Gerrit "url".
        email: email address to search for.
        crowdusername: the corresponding Crowd username. Not used by the
            lookup itself; accepted so callers can unpack (email, username)
            tuples straight into this function.

    Returns:
        The "_account_id" of the single matching account, or None when the
        lookup fails, no account matches, or more than one account matches.
    """
    # percent-encode the address: a literal '+' in a query string would be
    # decoded as a space, and '&' or '#' would truncate the query
    gurl = "%s/accounts/?q=email:%s" % (
        gerrit_settings["url"],
        urllib.parse.quote(email, safe=""),
    )
    gout = json_api_get(gurl, gerrit_headers, None, True, True)

    # allow_failure=True means json_api_get() may hand back None
    if gout is None:
        logger.warning("Email: '%s' could not be looked up in Gerrit", email)
        return None

    # check for users with multiple email addresses
    if len(gout) > 1:
        logger.warning("Email: '%s' register for more than one Gerrit account", email)
        return None

    # may not have an account, return None if no account
    if not gout:
        logger.warning("Email: '%s' doesn't have a Gerrit account", email)
        return None

    account_id = gout[0]["_account_id"]
    logger.debug("Email: '%s' has account-id: '%s'", email, account_id)
    return account_id
def gerrit_lookup_account_ids(gerrit_settings, users):
    """
    Resolve user tuples to Gerrit account-ids.

    Each entry of `users` is unpacked as the arguments that follow
    gerrit_settings in gerrit_find_account_id(). Entries for which no
    (truthy) account-id is found are left out of the returned list.
    """
    lookups = (gerrit_find_account_id(gerrit_settings, *entry) for entry in users)
    return [found for found in lookups if found]
def gerrit_add_accounts_to_group(gerrit_settings, groupname, account_ids):
    """
    Add a list of account-ids to the Gerrit group with the given name.
    """
    group_id = gerrit_group_id_by_name(gerrit_settings, groupname)
    # https://gerrit-review.googlesource.com/Documentation/rest-api-groups.html#_add_group_members
    payload = {"members": account_ids}
    endpoint = "%s/groups/%s/members.add" % (gerrit_settings["url"], group_id)
    result = json_api_get(endpoint, gerrit_headers, payload, True)
    logger.debug("output of adding accounts to group: %s", result)
def gerrit_remove_accounts_from_group(gerrit_settings, groupname, account_ids):
    """
    Remove a list of account-ids from the Gerrit group with the given name.
    """
    group_id = gerrit_group_id_by_name(gerrit_settings, groupname)
    # https://gerrit-review.googlesource.com/Documentation/rest-api-groups.html#_remove_group_members
    payload = {"members": account_ids}
    endpoint = "%s/groups/%s/members.delete" % (gerrit_settings["url"], group_id)
    # failure is allowed here, so an error status or an undecodable response
    # body yields None instead of being logged as an error
    # NOTE(review): presumably because Gerrit answers a successful delete
    # with an empty body - confirm against the REST documentation
    result = json_api_get(endpoint, gerrit_headers, payload, True, True)
    logger.debug("output of removing accounts to group: %s", result)
def sync_group(settings, groupname):
    """
    Sync the membership of one group from Crowd to Gerrit.

    Crowd is treated as the source of truth: members found in Crowd but not
    in Gerrit are added to the Gerrit group, and members found in Gerrit but
    not in Crowd are removed from it. Users are matched by email address.

    Args:
        settings: dict with a "crowd" and a "gerrit" settings section.
        groupname: name of the group, identical in Crowd and Gerrit.
    """
    cusers = crowd_users_list(settings["crowd"], groupname)
    gusers = gerrit_users_list(settings["gerrit"], groupname)
    logger.info("Crowd users: %s", cusers)
    logger.info("Gerrit users: %s", gusers)

    # Crowd users whose email is unknown to the Gerrit group must be added;
    # Gerrit members whose email is unknown to Crowd must be removed.
    # Both are dict-membership tests, so this is linear in the group size.
    add_g = [
        (cemail, cuserid) for cemail, cuserid in cusers.items() if cemail not in gusers
    ]
    rem_g = [gemail for gemail in gusers if gemail not in cusers]
    logger.info("Users to add: %s", add_g)
    logger.info("Users to remove: %s", rem_g)

    add_aids = gerrit_lookup_account_ids(settings["gerrit"], add_g)
    logger.info("Account-ids to add: %s", add_aids)

    gerrit_aids = [member["_account_id"] for member in gusers.values()]
    logger.info("All Gerrit account-ids: %s", gerrit_aids)

    # skip accounts that are already members (e.g. listed in the group under
    # a different primary email)
    current_members = set(gerrit_aids)
    aa_nodupes = [aid for aid in add_aids if aid not in current_members]
    logger.info("Duplicates removed: %s", aa_nodupes)
    if aa_nodupes:
        gerrit_add_accounts_to_group(settings["gerrit"], groupname, aa_nodupes)

    remove_aids = [gusers[gemail]["_account_id"] for gemail in rem_g]
    # user with a different primary email could be in both add and remove lists
    wanted = set(add_aids)
    rem_filtered = [aid for aid in remove_aids if aid not in wanted]
    logger.info("Account-ids to remove: %s", rem_filtered)
    if rem_filtered:
        gerrit_remove_accounts_from_group(settings["gerrit"], groupname, rem_filtered)
# Script entry point: parse the CLI, load the settings, then sync each group.
if __name__ == "__main__":
    args = parse_crowdgroups_args()

    # only print log messages if debugging
    logger.setLevel(logging.DEBUG if args.debug else logging.CRITICAL)

    # load settings from JSON file; the context manager closes the file
    # handle that argparse opened
    with args.settings as settings_file:
        settings = json.load(settings_file)

    # global, so this isn't run multiple times
    gerrit_headers = [
        basic_auth_header(
            settings["gerrit"]["username"], settings["gerrit"]["password"]
        ),
    ]

    # sync each group
    for groupname in settings["groups"]:
        sync_group(settings, groupname)