Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/kognic/auth/credentials.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from dataclasses import dataclass


@dataclass
class ApiCredentials:
client_id: str
client_secret: str
email: str
user_id: int
issuer: str
name: str = "API Credentials"
129 changes: 80 additions & 49 deletions src/kognic/auth/credentials_parser.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import json
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Union

from kognic.auth.credentials import ApiCredentials
from kognic.auth.internal import credentials_store

ANY_AUTH_TYPE = Union[str, os.PathLike, tuple, "ApiCredentials", dict, None]
Expand All @@ -17,23 +17,15 @@
]


@dataclass
class ApiCredentials:
client_id: str
client_secret: str
email: str
user_id: int
issuer: str


def parse_credentials(path: Union[str, os.PathLike, dict]) -> ApiCredentials:
if isinstance(path, dict):
credentials = path
else:
absolute_path = Path(path).expanduser().resolve()
try:
credentials = json.loads(Path(path).read_text())
credentials = json.loads(absolute_path.read_text())
except FileNotFoundError:
raise FileNotFoundError(f"Could not find Api Credentials file at {path}") from None
raise FileNotFoundError(f"Could not find API Credentials file at {path}") from None

if not isinstance(credentials, dict):
raise AttributeError(f"Could not json dict from {path}")
Expand All @@ -43,61 +35,76 @@ def parse_credentials(path: Union[str, os.PathLike, dict]) -> ApiCredentials:
raise KeyError(f"Missing key {k} in credentials file")

return ApiCredentials(
client_id=credentials.get("clientId"),
client_secret=credentials.get("clientSecret"),
email=credentials.get("email"),
user_id=credentials.get("userId"),
issuer=credentials.get("issuer"),
client_id=credentials["clientId"],
client_secret=credentials["clientSecret"],
email=credentials["email"],
user_id=credentials["userId"],
issuer=credentials["issuer"],
name=credentials.get("name", "API Credentials"),
)


def get_credentials_from_env() -> tuple[Optional[str], Optional[str]]:
"""
Deprecated
:return:
"""
creds = get_credentials_from_system()
if creds:
return creds.client_id, creds.client_secret
return None, None


def _anonymous_credentials(client_id: str, client_secret: str) -> ApiCredentials:
return ApiCredentials(
client_id=client_id,
client_secret=client_secret,
email="",
user_id=0,
issuer="",
name="",
)


def get_credentials_from_system() -> Optional[ApiCredentials]:
creds = os.getenv("KOGNIC_CREDENTIALS")
if creds:
client_credentials = parse_credentials(creds)
return client_credentials.client_id, client_credentials.client_secret
return parse_credentials(creds)

client_id = os.getenv("KOGNIC_CLIENT_ID")
client_secret = os.getenv("KOGNIC_CLIENT_SECRET")

if client_id and client_secret:
return client_id, client_secret
return _anonymous_credentials(client_id, client_secret)

keyring_creds = credentials_store.load_credentials()
if keyring_creds:
return keyring_creds.client_id, keyring_creds.client_secret

return client_id, client_secret
return keyring_creds

return None

def resolve_credentials(
auth: ANY_AUTH_TYPE = None, client_id: Optional[str] = None, client_secret: Optional[str] = None
) -> tuple[Optional[str], Optional[str]]:
has_credentials_tuple = client_id is not None and client_secret is not None

if has_credentials_tuple:
if auth is not None:
raise ValueError("Choose either auth or client_id+client_secret")
def resolve_any_credentials(auth: ANY_AUTH_TYPE) -> ApiCredentials:
"""
Resolve credentials from a variety of input types
:param auth:
:return:
"""

elif isinstance(auth, tuple):
if isinstance(auth, tuple):
if len(auth) != 2:
raise ValueError("Credentials tuple must be tuple of (client_id, client_secret)")
client_id, client_secret = auth
creds = _anonymous_credentials(*auth)
elif isinstance(auth, ApiCredentials):
client_id = auth.client_id
client_secret = auth.client_secret
creds = auth
elif isinstance(auth, dict):
creds = parse_credentials(auth)
client_id = creds.client_id
client_secret = creds.client_secret
elif isinstance(auth, (str, os.PathLike)):
path = str(auth)
if path.startswith("keyring://"):
profile = path[len("keyring://") :]
keyring_creds = credentials_store.load_credentials(profile)
if keyring_creds:
client_id, client_secret = keyring_creds.client_id, keyring_creds.client_secret
else:
creds = credentials_store.load_credentials(profile)
if not creds:
raise ValueError(
f"No credentials found in keyring for profile '{profile}'. "
f"Run 'kognic-auth credentials put <file> --env {profile}' to store them."
Expand All @@ -106,19 +113,43 @@ def resolve_credentials(
raise ValueError(f"Bad auth credentials file, must be json: {path}")
else:
creds = parse_credentials(auth)
client_id = creds.client_id
client_secret = creds.client_secret
elif auth is not None:
else:
# unreasonable type, but we want to be defensive in case of user error
raise ValueError(f"Unsupported auth type: {type(auth)}")

if not client_id and not client_secret:
client_id, client_secret = get_credentials_from_env()
return creds

return client_id, client_secret

def resolve_credentials(
auth: ANY_AUTH_TYPE = None, client_id: Optional[str] = None, client_secret: Optional[str] = None
) -> tuple[Optional[str], Optional[str]]:
"""
Resolve credentials from either an auth input (which can be a variety of types)
or from explicit client_id and client_secret parameters.
Falls back to environment variables if neither are provided.
:param auth:
:param client_id:
:param client_secret:
:return:
"""
has_credentials_tuple = client_id is not None and client_secret is not None

if has_credentials_tuple:
if auth is not None:
raise ValueError("Choose either auth or client_id+client_secret")
return client_id, client_secret
elif auth is not None:
creds = resolve_any_credentials(auth)
return creds.client_id, creds.client_secret

creds = get_credentials_from_system()
if creds:
return creds.client_id, creds.client_secret
return None, None


if __name__ == "__main__":
client_id, client_secret = get_credentials_from_env()
# Avoid printing secrets; only indicate that credentials were loaded.
print(f"Loaded credentials for client_id={client_id!r}")
creds = get_credentials_from_system()
if creds:
# Avoid printing secrets; only indicate that credentials were loaded.
print(f"Loaded credentials for client_id={creds.client_id!r}")
4 changes: 3 additions & 1 deletion src/kognic/auth/httpx/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import os
from typing import TYPE_CHECKING, Any, Callable, Optional, Union

from kognic.auth.credentials_parser import ANY_AUTH_TYPE

if TYPE_CHECKING:
from typing import Self

Expand Down Expand Up @@ -63,7 +65,7 @@ class BaseAsyncApiClient(HttpxAuthAsyncClient):
def __init__(
self,
*,
auth: Optional[Union[str, os.PathLike, tuple]] = None,
auth: ANY_AUTH_TYPE = None,
auth_host: str = DEFAULT_HOST,
auth_token_endpoint: str = DEFAULT_TOKEN_ENDPOINT_RELPATH,
client_name: Optional[str] = "auto",
Expand Down
3 changes: 2 additions & 1 deletion src/kognic/auth/internal/credentials_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
from kognic.auth.credentials_parser import ApiCredentials
from kognic.auth.credentials import ApiCredentials

SERVICE_NAME = "kognic-credentials"
DEFAULT_PROFILE = "default"
Expand Down Expand Up @@ -60,6 +60,7 @@ def save_credentials(creds: ApiCredentials, profile: str = DEFAULT_PROFILE) -> N
"email": creds.email,
"userId": creds.user_id,
"issuer": creds.issuer,
"name": creds.name,
}
kr.set_password(SERVICE_NAME, profile, json.dumps(data))
log.debug("Saved credentials to keyring for profile=%s", profile)
Expand Down
4 changes: 2 additions & 2 deletions src/kognic/auth/requests/auth_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from kognic.auth import DEFAULT_HOST, DEFAULT_TOKEN_ENDPOINT_RELPATH
from kognic.auth.base.auth_client import AuthClient
from kognic.auth.credentials_parser import resolve_credentials
from kognic.auth.credentials_parser import ANY_AUTH_TYPE, resolve_credentials

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -52,7 +52,7 @@ class RequestsAuthSession(AuthClient):
def __init__(
self,
*,
auth=None,
auth: ANY_AUTH_TYPE = None,
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
host: str = DEFAULT_HOST,
Expand Down
4 changes: 2 additions & 2 deletions src/kognic/auth/requests/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def send_request(req, *args, **kwargs):

def create_session(
*,
auth: Optional[Union[str, os.PathLike, tuple]] = None,
auth: ANY_AUTH_TYPE = None,
auth_host: str = DEFAULT_HOST,
auth_token_endpoint: str = DEFAULT_TOKEN_ENDPOINT_RELPATH,
client_name: Optional[str] = None,
Expand Down Expand Up @@ -237,7 +237,7 @@ class BaseApiClient:
def __init__(
self,
*,
auth: Optional[Union[str, os.PathLike, tuple]] = None,
auth: ANY_AUTH_TYPE = None,
auth_host: str = DEFAULT_HOST,
auth_token_endpoint: str = DEFAULT_TOKEN_ENDPOINT_RELPATH,
client_name: Optional[str] = "auto",
Expand Down
14 changes: 5 additions & 9 deletions tests/test_credentials_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def test_no_env_vars_returns_none(self, _):
@patch(
"kognic.auth.credentials_parser.credentials_store.load_credentials",
return_value=ApiCredentials(
client_id="kr_id", client_secret="kr_secret", email="a@b.com", user_id=1, issuer="i"
client_id="kr_id", client_secret="kr_secret", email="a@b.com", user_id=1, issuer="i", name="name"
),
)
def test_falls_back_to_keyring(self, _):
Expand All @@ -91,12 +91,6 @@ def test_client_id_and_secret_env_vars(self):
self.assertEqual(client_id, "env_id")
self.assertEqual(client_secret, "env_secret")

@patch.dict(os.environ, {"KOGNIC_CLIENT_ID": "env_id"}, clear=True)
def test_only_client_id_returns_none_secret(self):
client_id, client_secret = get_credentials_from_env()
self.assertEqual(client_id, "env_id")
self.assertIsNone(client_secret)

def test_kognic_credentials_file(self):
import tempfile

Expand Down Expand Up @@ -164,7 +158,8 @@ def test_falls_back_to_env(self):
self.assertEqual(client_secret, "env_secret")

@patch.dict(os.environ, {}, clear=True)
def test_no_credentials_returns_none(self):
@patch("kognic.auth.credentials_parser.credentials_store.load_credentials", return_value=None)
def test_no_credentials_returns_none(self, _):
client_id, client_secret = resolve_credentials()
self.assertIsNone(client_id)
self.assertIsNone(client_secret)
Expand All @@ -181,6 +176,7 @@ def test_auth_api_credentials(self):
email="a@b.com",
user_id=1,
issuer="issuer",
name="name",
)
client_id, client_secret = resolve_credentials(auth=creds)
self.assertEqual(client_id, "id")
Expand All @@ -198,7 +194,7 @@ def test_auth_dict(self):
@patch(
"kognic.auth.credentials_parser.credentials_store.load_credentials",
return_value=ApiCredentials(
client_id="kr_id", client_secret="kr_secret", email="a@b.com", user_id=1, issuer="i"
client_id="kr_id", client_secret="kr_secret", email="a@b.com", user_id=1, issuer="i", name="name"
),
)
def test_auth_keyring_uri(self, mock_load):
Expand Down
1 change: 1 addition & 0 deletions tests/test_credentials_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"email": "test@kognic.com",
"userId": 1,
"issuer": "auth.kognic.test",
"name": "API Credentials",
}

FULL_CREDS = ApiCredentials(
Expand Down