Source code for httpx_gracedb.auth
#
# Copyright (C) 2019-2025 Leo P. Singer <leo.singer@ligo.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
from os import R_OK, access
from pathlib import Path
from urllib.parse import urlparse
from httpx._client import BaseClient
from httpx._types import AuthTypes
from igwn_auth_utils.scitokens import default_bearer_token_file
from safe_netrc import netrc
from .scitoken import SciTokenAuth, SciTokenReloadingAuth
__all__ = ("ClientAuthMixin",)
def _find_username_password(url):
host = urlparse(url).hostname
try:
result = netrc().authenticators(host)
except IOError:
result = None
if result is not None:
username, _, password = result
result = (username, password)
return result
def _find_token():
path = default_bearer_token_file()
if access(path, R_OK):
return path
[docs]
class ClientAuthMixin(BaseClient):
"""A mixin for :class:`httpx.Client` to add support for all GraceDB
authentication mechanisms.
Parameters
----------
url
GraceDB Client URL.
token
Filename for SciTokens bearer token.
username
Username for basic auth.
password
Password for basic auth.
force_noauth
If true, then do not use any authentication at all.
fail_if_noauth
If true, then raise an exception if authentication credentials are
not provided.
auth_reload
If true, then automatically reload the authentication before it
expires.
auth_reload_timeout
Reload the authentication this many seconds before it expires.
kwargs
Extra arguments passed to :class:`httpx.Client`.
Notes
-----
When a new Client instance is created, the following sources of
authentication are tried, in order:
1. If the :obj:`force_noauth` keyword argument is true, then perform no
authentication at all.
2. If the :obj:`token` keyword argument is provided, then use SciTokens
bearer token authentication.
3. If the :obj:`username` and :obj:`password` keyword arguments are
provided, then use basic auth.
4. Look for a SciTokens bearer token in:
a. the environment variable :envvar:`BEARER_TOKEN_FILE`
b. the file :file:`$XDG_RUNTIME_DIR/bt_u{UID}`, where :samp:`{UID}`
is your numeric user ID, if the file exists and is readable
5. Read the netrc file [1]_ located at :file:`~/.netrc`, or at the path
stored in the environment variable :envvar:`NETRC`, and look for a
username and password matching the hostname in the URL.
6. If the :obj:`fail_if_noauth` keyword argument is true, and no
authentication source was found, then raise a :class:`ValueError`.
References
----------
.. [1] The .netrc file.
https://www.gnu.org/software/inetutils/manual/html_node/The-_002enetrc-file.html
""" # noqa: E501
def __init__(
self,
url: str | None = None,
token: str | Path | None = None,
username: str | None = None,
password: str | None = None,
force_noauth: bool = False,
fail_if_noauth: bool = False,
auth_reload: bool = False,
auth_reload_timeout: float = 300,
**kwargs,
):
auth: AuthTypes | None = None
# Argument validation
if fail_if_noauth and force_noauth:
raise ValueError("Must not set both force_noauth and fail_if_noauth.")
if (username is None) ^ (password is None):
raise ValueError("Must provide username and password, or neither.")
if force_noauth:
pass
elif token is not None:
pass
elif username is not None and password is not None:
auth = (username, password)
elif (token := _find_token()) is not None:
pass
elif (default_basic_auth := _find_username_password(url)) is not None:
auth = default_basic_auth
elif fail_if_noauth:
raise ValueError("No authentication credentials found.")
if token is not None:
if auth_reload:
auth = SciTokenReloadingAuth(token, reload_timeout=auth_reload_timeout)
else:
auth = SciTokenAuth(token)
super().__init__(auth=auth, **kwargs)