From 73d4703dac5b33733832b403edd240f46a4eeca3 Mon Sep 17 00:00:00 2001 From: Michel Edkrantz Date: Mon, 23 Feb 2026 04:03:38 +0100 Subject: [PATCH 1/4] resolve full ApiCredential, with anonymous credentials when no credentials file --- src/kognic/auth/credentials.py | 11 ++ src/kognic/auth/credentials_parser.py | 121 +++++++++++------- src/kognic/auth/internal/credentials_store.py | 3 +- tests/test_credentials_parser.py | 5 +- 4 files changed, 92 insertions(+), 48 deletions(-) create mode 100644 src/kognic/auth/credentials.py diff --git a/src/kognic/auth/credentials.py b/src/kognic/auth/credentials.py new file mode 100644 index 0000000..4b0ccf7 --- /dev/null +++ b/src/kognic/auth/credentials.py @@ -0,0 +1,11 @@ +from dataclasses import dataclass + + +@dataclass +class ApiCredentials: + client_id: str + client_secret: str + email: str + user_id: int + issuer: str + name: str = "API Credentials" diff --git a/src/kognic/auth/credentials_parser.py b/src/kognic/auth/credentials_parser.py index 1afca20..8afd00a 100644 --- a/src/kognic/auth/credentials_parser.py +++ b/src/kognic/auth/credentials_parser.py @@ -1,9 +1,9 @@ import json import os -from dataclasses import dataclass from pathlib import Path from typing import Optional, Union +from kognic.auth.credentials import ApiCredentials from kognic.auth.internal import credentials_store ANY_AUTH_TYPE = Union[str, os.PathLike, tuple, "ApiCredentials", dict, None] @@ -17,15 +17,6 @@ ] -@dataclass -class ApiCredentials: - client_id: str - client_secret: str - email: str - user_id: int - issuer: str - - def parse_credentials(path: Union[str, os.PathLike, dict]) -> ApiCredentials: if isinstance(path, dict): credentials = path @@ -33,7 +24,7 @@ def parse_credentials(path: Union[str, os.PathLike, dict]) -> ApiCredentials: try: credentials = json.loads(Path(path).read_text()) except FileNotFoundError: - raise FileNotFoundError(f"Could not find Api Credentials file at {path}") from None + raise FileNotFoundError(f"Could not find API Credentials file at {path}") from None if not isinstance(credentials, dict): raise AttributeError(f"Could not json dict from {path}") @@ -43,61 +34,76 @@ def parse_credentials(path: Union[str, os.PathLike, dict]) -> ApiCredentials: raise KeyError(f"Missing key {k} in credentials file") return ApiCredentials( - client_id=credentials.get("clientId"), - client_secret=credentials.get("clientSecret"), - email=credentials.get("email"), - user_id=credentials.get("userId"), - issuer=credentials.get("issuer"), + client_id=credentials["clientId"], + client_secret=credentials["clientSecret"], + email=credentials["email"], + user_id=credentials["userId"], + issuer=credentials["issuer"], + name=credentials.get("name", "API Credentials"), ) def get_credentials_from_env() -> tuple[Optional[str], Optional[str]]: + """ + Deprecated + :return: + """ + creds = get_credentials_from_system() + if creds: + return creds.client_id, creds.client_secret + return None, None + + +def _anonymous_credentials(client_id: str, client_secret: str) -> ApiCredentials: + return ApiCredentials( + client_id=client_id, + client_secret=client_secret, + email="", + user_id=0, + issuer="", + name="", + ) + + +def get_credentials_from_system() -> Optional[ApiCredentials]: creds = os.getenv("KOGNIC_CREDENTIALS") if creds: - client_credentials = parse_credentials(creds) - return client_credentials.client_id, client_credentials.client_secret + return parse_credentials(creds) client_id = os.getenv("KOGNIC_CLIENT_ID") client_secret = os.getenv("KOGNIC_CLIENT_SECRET") if client_id and client_secret: - return client_id, client_secret + return _anonymous_credentials(client_id, client_secret) keyring_creds = credentials_store.load_credentials() if keyring_creds: - return keyring_creds.client_id, keyring_creds.client_secret + return keyring_creds - return client_id, client_secret + return None -def resolve_credentials( - auth: ANY_AUTH_TYPE = None, client_id: Optional[str] = None, client_secret: Optional[str] = None -) -> tuple[Optional[str], Optional[str]]: - has_credentials_tuple = client_id is not None and client_secret is not None +def resolve_any_credentials(auth: ANY_AUTH_TYPE) -> Optional[ApiCredentials]: + """ + Resolve credentials from a variety of input types + :param auth: + :return: + """ - if has_credentials_tuple: - if auth is not None: - raise ValueError("Choose either auth or client_id+client_secret") - - elif isinstance(auth, tuple): + if isinstance(auth, tuple): if len(auth) != 2: raise ValueError("Credentials tuple must be tuple of (client_id, client_secret)") - client_id, client_secret = auth + creds = _anonymous_credentials(*auth) elif isinstance(auth, ApiCredentials): - client_id = auth.client_id - client_secret = auth.client_secret + creds = auth elif isinstance(auth, dict): creds = parse_credentials(auth) - client_id = creds.client_id - client_secret = creds.client_secret elif isinstance(auth, (str, os.PathLike)): path = str(auth) if path.startswith("keyring://"): profile = path[len("keyring://") :] - keyring_creds = credentials_store.load_credentials(profile) - if keyring_creds: - client_id, client_secret = keyring_creds.client_id, keyring_creds.client_secret - else: + creds = credentials_store.load_credentials(profile) + if not creds: raise ValueError( f"No credentials found in keyring for profile '{profile}'. " f"Run 'kognic-auth credentials put --env {profile}' to store them." @@ -106,12 +112,36 @@ def resolve_credentials( raise ValueError(f"Bad auth credentials file, must be json: {path}") else: creds = parse_credentials(auth) - client_id = creds.client_id - client_secret = creds.client_secret - elif auth is not None: + else: # unreasonable type, but we want to be defensive in case of user error raise ValueError(f"Unsupported auth type: {type(auth)}") + return creds + + +def resolve_credentials( + auth: ANY_AUTH_TYPE = None, client_id: Optional[str] = None, client_secret: Optional[str] = None +) -> tuple[Optional[str], Optional[str]]: + """ + Resolve credentials from either an auth input (which can be a variety of types) + or from explicit client_id and client_secret parameters. + Falls back to environment variables if neither are provided. + :param auth: + :param client_id: + :param client_secret: + :return: + """ + has_credentials_tuple = client_id is not None and client_secret is not None + + if has_credentials_tuple: + if auth is not None: + raise ValueError("Choose either auth or client_id+client_secret") + + else: + creds = resolve_any_credentials(auth) + if creds: + return creds.client_id, creds.client_secret + if not client_id and not client_secret: client_id, client_secret = get_credentials_from_env() @@ -119,6 +149,7 @@ def resolve_credentials( if __name__ == "__main__": - client_id, client_secret = get_credentials_from_env() - # Avoid printing secrets; only indicate that credentials were loaded. - print(f"Loaded credentials for client_id={client_id!r}") + creds = get_credentials_from_system() + if creds: + # Avoid printing secrets; only indicate that credentials were loaded. + print(f"Loaded credentials for client_id={creds.client_id!r}") diff --git a/src/kognic/auth/internal/credentials_store.py b/src/kognic/auth/internal/credentials_store.py index b2ae459..d875a53 100644 --- a/src/kognic/auth/internal/credentials_store.py +++ b/src/kognic/auth/internal/credentials_store.py @@ -7,7 +7,7 @@ from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: - from kognic.auth.credentials_parser import ApiCredentials + from kognic.auth.credentials import ApiCredentials SERVICE_NAME = "kognic-credentials" DEFAULT_PROFILE = "default" @@ -60,6 +60,7 @@ def save_credentials(creds: ApiCredentials, profile: str = DEFAULT_PROFILE) -> N "email": creds.email, "userId": creds.user_id, "issuer": creds.issuer, + "name": creds.name, } kr.set_password(SERVICE_NAME, profile, json.dumps(data)) log.debug("Saved credentials to keyring for profile=%s", profile) diff --git a/tests/test_credentials_parser.py b/tests/test_credentials_parser.py index 23d5c51..f734b8e 100644 --- a/tests/test_credentials_parser.py +++ b/tests/test_credentials_parser.py @@ -69,7 +69,7 @@ def test_no_env_vars_returns_none(self, _): @patch( "kognic.auth.credentials_parser.credentials_store.load_credentials", return_value=ApiCredentials( - client_id="kr_id", client_secret="kr_secret", email="a@b.com", user_id=1, issuer="i" + client_id="kr_id", client_secret="kr_secret", email="a@b.com", user_id=1, issuer="i", name="name" ), ) def test_falls_back_to_keyring(self, _): @@ -181,6 +181,7 @@ def test_auth_api_credentials(self): email="a@b.com", user_id=1, issuer="issuer", + name="name", ) client_id, client_secret = resolve_credentials(auth=creds) self.assertEqual(client_id, "id") @@ -198,7 +199,7 @@ def test_auth_dict(self): @patch( "kognic.auth.credentials_parser.credentials_store.load_credentials", return_value=ApiCredentials( - client_id="kr_id", client_secret="kr_secret", email="a@b.com", user_id=1, issuer="i" + client_id="kr_id", client_secret="kr_secret", email="a@b.com", user_id=1, issuer="i", name="name" ), ) def test_auth_keyring_uri(self, mock_load): From 4951d5dced6b9a58d2c1c98c5ab480085d3fd4f2 Mon Sep 17 00:00:00 2001 From: Michel Edkrantz Date: Mon, 23 Feb 2026 15:50:04 +0100 Subject: [PATCH 2/4] auth config fixes --- src/kognic/auth/credentials_parser.py | 8 ++++---- src/kognic/auth/httpx/base_client.py | 4 +++- src/kognic/auth/requests/auth_session.py | 4 ++-- src/kognic/auth/requests/base_client.py | 4 ++-- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/kognic/auth/credentials_parser.py b/src/kognic/auth/credentials_parser.py index 8afd00a..afe18a5 100644 --- a/src/kognic/auth/credentials_parser.py +++ b/src/kognic/auth/credentials_parser.py @@ -21,8 +21,9 @@ def parse_credentials(path: Union[str, os.PathLike, dict]) -> ApiCredentials: if isinstance(path, dict): credentials = path else: + absolute_path = Path(path).expanduser().resolve() try: - credentials = json.loads(Path(path).read_text()) + credentials = json.loads(absolute_path.read_text()) except FileNotFoundError: raise FileNotFoundError(f"Could not find API Credentials file at {path}") from None @@ -83,7 +84,7 @@ def get_credentials_from_system() -> Optional[ApiCredentials]: return None -def resolve_any_credentials(auth: ANY_AUTH_TYPE) -> Optional[ApiCredentials]: +def resolve_any_credentials(auth: ANY_AUTH_TYPE) -> ApiCredentials: """ Resolve credentials from a variety of input types :param auth: @@ -139,8 +140,7 @@ def resolve_credentials( else: creds = resolve_any_credentials(auth) - if creds: - return creds.client_id, creds.client_secret + return creds.client_id, creds.client_secret if not client_id and not client_secret: client_id, client_secret = get_credentials_from_env() diff --git a/src/kognic/auth/httpx/base_client.py b/src/kognic/auth/httpx/base_client.py index e559782..a5ffe7a 100644 --- a/src/kognic/auth/httpx/base_client.py +++ b/src/kognic/auth/httpx/base_client.py @@ -7,6 +7,8 @@ import os from typing import TYPE_CHECKING, Any, Callable, Optional, Union +from kognic.auth.credentials_parser import ANY_AUTH_TYPE + if TYPE_CHECKING: from typing import Self @@ -63,7 +65,7 @@ class BaseAsyncApiClient(HttpxAuthAsyncClient): def __init__( self, *, - auth: Optional[Union[str, os.PathLike, tuple]] = None, + auth: ANY_AUTH_TYPE = None, auth_host: str = DEFAULT_HOST, auth_token_endpoint: str = DEFAULT_TOKEN_ENDPOINT_RELPATH, client_name: Optional[str] = "auto", diff --git a/src/kognic/auth/requests/auth_session.py b/src/kognic/auth/requests/auth_session.py index 58a90d2..2361933 100644 --- a/src/kognic/auth/requests/auth_session.py +++ b/src/kognic/auth/requests/auth_session.py @@ -8,7 +8,7 @@ from kognic.auth import DEFAULT_HOST, DEFAULT_TOKEN_ENDPOINT_RELPATH from kognic.auth.base.auth_client import AuthClient -from kognic.auth.credentials_parser import resolve_credentials +from kognic.auth.credentials_parser import ANY_AUTH_TYPE, resolve_credentials log = logging.getLogger(__name__) @@ -52,7 +52,7 @@ class RequestsAuthSession(AuthClient): def __init__( self, *, - auth=None, + auth: ANY_AUTH_TYPE = None, client_id: Optional[str] = None, client_secret: Optional[str] = None, host: str = DEFAULT_HOST, diff --git a/src/kognic/auth/requests/base_client.py b/src/kognic/auth/requests/base_client.py index a5e8caf..3a5617f 100644 --- a/src/kognic/auth/requests/base_client.py +++ b/src/kognic/auth/requests/base_client.py @@ -96,7 +96,7 @@ def send_request(req, *args, **kwargs): def create_session( *, - auth: Optional[Union[str, os.PathLike, tuple]] = None, + auth: ANY_AUTH_TYPE = None, auth_host: str = DEFAULT_HOST, auth_token_endpoint: str = DEFAULT_TOKEN_ENDPOINT_RELPATH, client_name: Optional[str] = None, @@ -237,7 +237,7 @@ class BaseApiClient: def __init__( self, *, - auth: Optional[Union[str, os.PathLike, tuple]] = None, + auth: ANY_AUTH_TYPE = None, auth_host: str = DEFAULT_HOST, auth_token_endpoint: str = DEFAULT_TOKEN_ENDPOINT_RELPATH, client_name: Optional[str] = "auto", From bd1f423eaf8ae9d07e984a5622302469fed1ae66 Mon Sep 17 00:00:00 2001 From: Michel Edkrantz Date: Mon, 23 Feb 2026 16:10:16 +0100 Subject: [PATCH 3/4] fix --- src/kognic/auth/credentials_parser.py | 12 +++++++----- tests/test_credentials_store.py | 1 + 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/kognic/auth/credentials_parser.py b/src/kognic/auth/credentials_parser.py index afe18a5..cd18c1d 100644 --- a/src/kognic/auth/credentials_parser.py +++ b/src/kognic/auth/credentials_parser.py @@ -138,14 +138,16 @@ def resolve_credentials( if auth is not None: raise ValueError("Choose either auth or client_id+client_secret") - else: + elif auth is not None: creds = resolve_any_credentials(auth) return creds.client_id, creds.client_secret + elif client_id and client_secret: + return client_id, client_secret - if not client_id and not client_secret: - client_id, client_secret = get_credentials_from_env() - - return client_id, client_secret + creds = get_credentials_from_system() + if creds: + return creds.client_id, creds.client_secret + return None, None if __name__ == "__main__": diff --git a/tests/test_credentials_store.py b/tests/test_credentials_store.py index e7017ab..c36df8d 100644 --- a/tests/test_credentials_store.py +++ b/tests/test_credentials_store.py @@ -19,6 +19,7 @@ "email": "test@kognic.com", "userId": 1, "issuer": "auth.kognic.test", + "name": "API Credentials", } FULL_CREDS = ApiCredentials( From ff9a1459f6028b203582c926130ee534a8bd6cdc Mon Sep 17 00:00:00 2001 From: Michel Edkrantz Date: Mon, 23 Feb 2026 16:31:49 +0100 Subject: [PATCH 4/4] Fix resolve_credentials ignoring explicit credentials and add keyring mock to test Co-Authored-By: Claude Sonnet 4.6 --- src/kognic/auth/credentials_parser.py | 4 +--- tests/test_credentials_parser.py | 9 ++------- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/src/kognic/auth/credentials_parser.py b/src/kognic/auth/credentials_parser.py index cd18c1d..ab846b2 100644 --- a/src/kognic/auth/credentials_parser.py +++ b/src/kognic/auth/credentials_parser.py @@ -137,12 +137,10 @@ def resolve_credentials( if has_credentials_tuple: if auth is not None: raise ValueError("Choose either auth or client_id+client_secret") - + return client_id, client_secret elif auth is not None: creds = resolve_any_credentials(auth) return creds.client_id, creds.client_secret - elif client_id and client_secret: - return client_id, client_secret creds = get_credentials_from_system() if creds: diff --git a/tests/test_credentials_parser.py b/tests/test_credentials_parser.py index f734b8e..ed87622 100644 --- a/tests/test_credentials_parser.py +++ b/tests/test_credentials_parser.py @@ -91,12 +91,6 @@ def test_client_id_and_secret_env_vars(self): self.assertEqual(client_id, "env_id") self.assertEqual(client_secret, "env_secret") - @patch.dict(os.environ, {"KOGNIC_CLIENT_ID": "env_id"}, clear=True) - def test_only_client_id_returns_none_secret(self): - client_id, client_secret = get_credentials_from_env() - self.assertEqual(client_id, "env_id") - self.assertIsNone(client_secret) - def test_kognic_credentials_file(self): import tempfile @@ -164,7 +158,8 @@ def test_falls_back_to_env(self): self.assertEqual(client_secret, "env_secret") @patch.dict(os.environ, {}, clear=True) - def test_no_credentials_returns_none(self): + @patch("kognic.auth.credentials_parser.credentials_store.load_credentials", return_value=None) + def test_no_credentials_returns_none(self, _): client_id, client_secret = resolve_credentials() self.assertIsNone(client_id) self.assertIsNone(client_secret)