Skip to content

Commit

Permalink
Multi-VO: Add support in new token implementation rucio#6406
Browse files Browse the repository at this point in the history
  • Loading branch information
dchristidis committed Jul 23, 2024
1 parent 4bf5318 commit 6a48267
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 35 deletions.
65 changes: 35 additions & 30 deletions lib/rucio/core/oidc.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,24 +118,30 @@ def _token_cache_set(key: str, value: str) -> None:
REGION.set(key, value)


def request_token(audience: str, scope: str, use_cache: bool = True) -> Optional[str]:
def request_token(
audience: str,
scope: str,
vo: str = 'def',
use_cache: bool = True
) -> Optional[str]:
"""Request a token from the provider.
Return ``None`` if the configuration was not loaded properly or the request
was unsuccessful.
"""
if not all([OIDC_CLIENT_ID, OIDC_CLIENT_SECRET, OIDC_PROVIDER_ENDPOINT]):
if OIDC_CONFIGURATION_RUN or not __load_oidc_configuration():
return None
if not OIDC_CONFIGURATION_RUN:
__load_oidc_configuration()
if not all(vo in var for var in [OIDC_CLIENT_IDS, OIDC_CLIENT_SECRETS, OIDC_PROVIDER_ENDPOINTS]):
return None

key = hashlib.md5(f'audience={audience};scope={scope}'.encode()).hexdigest()
key = hashlib.md5(f'audience={audience};scope={scope};vo={vo}'.encode()).hexdigest()

if use_cache and (token := _token_cache_get(key)):
return token

try:
response = requests.post(url=OIDC_PROVIDER_ENDPOINT,
auth=(OIDC_CLIENT_ID, OIDC_CLIENT_SECRET),
response = requests.post(url=OIDC_PROVIDER_ENDPOINTS[vo],
auth=(OIDC_CLIENT_IDS[vo], OIDC_CLIENT_SECRETS[vo]),
data={'grant_type': 'client_credentials',
'audience': audience,
'scope': scope})
Expand Down Expand Up @@ -203,9 +209,9 @@ def __get_rucio_oidc_clients(keytimeout: int = 43200) -> tuple[dict, dict]:
OIDC_CLIENTS = {}
OIDC_ADMIN_CLIENTS = {}
# New-style token support.
OIDC_CLIENT_ID = ''
OIDC_CLIENT_SECRET = ''
OIDC_PROVIDER_ENDPOINT = ''
OIDC_CLIENT_IDS = {}
OIDC_CLIENT_SECRETS = {}
OIDC_PROVIDER_ENDPOINTS = {}
OIDC_CONFIGURATION_RUN = False


Expand All @@ -225,40 +231,39 @@ def __initialize_oidc_clients() -> None:
pass


def __load_oidc_configuration() -> bool:
def __load_oidc_configuration() -> None:
"""Load the configuration for the new-style token support."""
global OIDC_CLIENT_ID, OIDC_CLIENT_SECRET, OIDC_PROVIDER_ENDPOINT, OIDC_CONFIGURATION_RUN
global OIDC_CLIENT_IDS, OIDC_CLIENT_SECRETS, OIDC_PROVIDER_ENDPOINTS, OIDC_CONFIGURATION_RUN

OIDC_CONFIGURATION_RUN = True

if not IDPSECRETS:
logging.error('Configuration option "idpsecrets" in section "oidc" is not set')
return False
if not ADMIN_ISSUER_ID:
logging.error('Configuration option "admin_issuer" in section "oidc" is not set')
return False
return

issuers = {}
try:
with open(IDPSECRETS) as f:
data = json.load(f)
OIDC_CLIENT_ID = data[ADMIN_ISSUER_ID]['client_id']
OIDC_CLIENT_SECRET = data[ADMIN_ISSUER_ID]['client_secret']
issuer = data[ADMIN_ISSUER_ID]['issuer']
for vo in data:
OIDC_CLIENT_IDS[vo] = data[vo]['client_id']
OIDC_CLIENT_SECRETS[vo] = data[vo]['client_secret']
issuers[vo] = data[vo]['issuer']
except Exception:
logging.error('Failed to parse configuration file "%s"', IDPSECRETS,
exc_info=True)
return False
try:
oidc_discover_url = urljoin(issuer, '.well-known/openid-configuration')
response = requests.get(oidc_discover_url)
response.raise_for_status()
payload = response.json()
OIDC_PROVIDER_ENDPOINT = payload['token_endpoint']
except (requests.HTTPError, requests.JSONDecodeError, KeyError):
logging.error('Failed to discover token endpoint', exc_info=True)
return False
return

return True
for vo in issuers:
try:
oidc_discover_url = urljoin(issuers[vo], '.well-known/openid-configuration')
response = requests.get(oidc_discover_url)
response.raise_for_status()
payload = response.json()
OIDC_PROVIDER_ENDPOINTS[vo] = payload['token_endpoint']
except (requests.HTTPError, requests.JSONDecodeError, KeyError):
logging.error('Failed to discover token endpoint for vo "%s"', vo,
exc_info=True)


def __get_init_oidc_client(token_object: models.Token = None, token_type: str = None, **kwargs) -> dict[Any, Any]:
Expand Down
3 changes: 2 additions & 1 deletion lib/rucio/daemons/reaper/reaper.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,8 @@ def _run_once(
# FIXME: At the time of writing, StoRM requires `storage.read`
# in order to perform a stat operation.
scope = determine_scope_for_rse(rse.id, scopes=['storage.modify', 'storage.read'])
auth_token = request_token(audience, scope)
vo = rse.columns['vo']
auth_token = request_token(audience, scope, vo)
if auth_token:
logger(logging.INFO, 'Using a token to delete on RSE %s', rse.name)
prot = rsemgr.create_protocol(rse.info, 'delete', scheme=scheme, auth_token=auth_token, logger=logger)
Expand Down
4 changes: 3 additions & 1 deletion lib/rucio/transfertool/bittorrent_driver_qbittorrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,12 @@ def make_driver(cls: "type[QBittorrentDriver]", rse: "RseData", logger: types.Lo
if not address:
return None

vo = rse.columns['vo']

url = urlparse(address)
token = None
if url.scheme.lower() == 'https':
token = request_token(audience=url.hostname, scope='qbittorrent_admin')
token = request_token(audience=url.hostname, scope='qbittorrent_admin', vo=vo)
else:
logging.debug(f'{cls.external_name} will not try token authentication. Requires HTTPS.')

Expand Down
8 changes: 5 additions & 3 deletions lib/rucio/transfertool/fts3.py
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ def __init__(self,
if oidc_support:
fts_hostname = urlparse(external_host).hostname
if fts_hostname is not None:
token = request_token(audience=fts_hostname, scope='fts')
token = request_token(audience=fts_hostname, scope='fts', vo=vo or 'def')
if token is not None:
self.logger(logging.INFO, 'Using a token to authenticate with FTS instance %s', fts_hostname)
self.token = token
Expand Down Expand Up @@ -1028,13 +1028,15 @@ def _file_from_transfer(self, transfer: "DirectTransfer", job_params: dict[str,
for source in transfer.sources:
src_audience = determine_audience_for_rse(rse_id=source.rse.id)
src_scope = determine_scope_for_rse(rse_id=source.rse.id, scopes=['storage.read'], extra_scopes=['offline_access'])
t_file['source_tokens'].append(request_token(src_audience, src_scope))
src_vo = source.rse.columns['vo']
t_file['source_tokens'].append(request_token(src_audience, src_scope, src_vo))

dst_audience = determine_audience_for_rse(transfer.dst.rse.id)
# FIXME: At the time of writing, StoRM requires `storage.read` in
# order to perform a stat operation.
dst_scope = determine_scope_for_rse(transfer.dst.rse.id, scopes=['storage.modify', 'storage.read'], extra_scopes=['offline_access'])
t_file['destination_tokens'] = [request_token(dst_audience, dst_scope)]
dst_vo = transfer.dst.rse.columns['vo']
t_file['destination_tokens'] = [request_token(dst_audience, dst_scope, dst_vo)]

if isinstance(self.scitags_exp_id, int):
activity_id = self.scitags_activity_ids.get(rws.activity)
Expand Down

0 comments on commit 6a48267

Please sign in to comment.