Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/_incydr_cli/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ def make_context(self, info_name, args, parent=None, **extra):
return super().make_context(info_name, args, parent=parent, **extra)

def invoke(self, ctx):
settings = IncydrSettings(url="", api_client_id="", api_client_secret="")
settings = IncydrSettings(
url="temp value for logging initialization",
api_client_id="temp value for logging initialization",
api_client_secret="temp value for logging initialization",
)
try:
return super().invoke(ctx)
except click.UsageError as err:
Expand Down
24 changes: 24 additions & 0 deletions src/_incydr_sdk/core/auth.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from typing import Optional

import requests
from pydantic import SecretStr
from requests import Session
from requests.auth import AuthBase
from requests.auth import HTTPBasicAuth

from _incydr_sdk.core.models import AuthResponse
from _incydr_sdk.core.models import RefreshTokenAuthResponse


class APIClientAuth(AuthBase):
Expand All @@ -32,3 +34,25 @@ def __call__(self, request):
token = self.token_response.access_token.get_secret_value()
request.headers["Authorization"] = f"Bearer {token}"
return request


class RefreshTokenAuth(AuthBase):
def __init__(self, session: Session, refresh_url: str, refresh_token: str):
self.session = session
self.refresh_url = refresh_url
self.refresh_token = SecretStr(refresh_token)
self.token_response: Optional[RefreshTokenAuthResponse] = None

def refresh(self):
auth_body = {"refreshToken": self.refresh_token.get_secret_value()}
r = requests.post(self.refresh_url, json=auth_body)
r.raise_for_status()
self.token_response = RefreshTokenAuthResponse.parse_response(r)
self.refresh_token = self.token_response.refreshToken.tokenValue

def __call__(self, request):
if self.token_response is None or self.token_response.accessToken.expired:
self.refresh()
token = self.token_response.accessToken.tokenValue.get_secret_value()
request.headers["Authorization"] = f"Bearer {token}"
return request
40 changes: 19 additions & 21 deletions src/_incydr_sdk/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import logging
from collections import deque

import pydantic
from requests_toolbelt import user_agent
from requests_toolbelt.sessions import BaseUrlSession

Expand All @@ -15,12 +14,12 @@
from _incydr_sdk.audit_log.client import AuditLogClient
from _incydr_sdk.cases.client import CasesClient
from _incydr_sdk.core.auth import APIClientAuth
from _incydr_sdk.core.auth import RefreshTokenAuth
from _incydr_sdk.core.settings import IncydrSettings
from _incydr_sdk.customer.client import CustomerClient
from _incydr_sdk.departments.client import DepartmentsClient
from _incydr_sdk.devices.client import DevicesClient
from _incydr_sdk.directory_groups.client import DirectoryGroupsClient
from _incydr_sdk.exceptions import AuthMissingError
from _incydr_sdk.file_events.client import FileEventsClient
from _incydr_sdk.files.client import FilesClient
from _incydr_sdk.legal_hold.client import LegalHoldClient
Expand Down Expand Up @@ -60,31 +59,30 @@ def __init__(
skip_auth: bool = False,
**settings_kwargs,
):
try:
self._settings = IncydrSettings(
url=url,
api_client_id=api_client_id,
api_client_secret=api_client_secret,
**settings_kwargs,
)
except pydantic.ValidationError as err:
auth_keys = {"api_client_id", "api_client_secret", "url"}
error_keys = {e["loc"][0] for e in err.errors()}
if auth_keys & error_keys:
raise AuthMissingError(err)
else:
raise
self._settings = IncydrSettings(
url=url,
api_client_id=api_client_id,
api_client_secret=api_client_secret,
**settings_kwargs,
)
self._request_history = deque(maxlen=self._settings.max_response_history)

self._session = BaseUrlSession(base_url=self._settings.url)
self._session.headers["User-Agent"] = (
self._settings.user_agent_prefix or ""
) + _base_user_agent
self._session.auth = APIClientAuth(
session=self._session,
api_client_id=self._settings.api_client_id,
api_client_secret=self._settings.api_client_secret,
)
if self._settings.refresh_token and self._settings.refresh_url:
self._session.auth = RefreshTokenAuth(
session=self._session,
refresh_url=self._settings.refresh_url,
refresh_token=self._settings.refresh_token.get_secret_value(),
)
else:
self._session.auth = APIClientAuth(
session=self._session,
api_client_id=self._settings.api_client_id,
api_client_secret=self._settings.api_client_secret,
)

def response_hook(response, *args, **kwargs):
level = self._settings.log_level
Expand Down
14 changes: 14 additions & 0 deletions src/_incydr_sdk/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,20 @@ def expired(self):
)


class TokenDetails(Model):
tokenValue: SecretStr
expiresAt: datetime

@property
def expired(self):
return datetime.now(timezone.utc) >= self.expiresAt


class RefreshTokenAuthResponse(ResponseModel):
accessToken: TokenDetails
refreshToken: TokenDetails


class CSVModel(BaseModel):
"""
Pydantic model class enables multiple aliases to be assigned to a single field value. If the field is required
Expand Down
31 changes: 29 additions & 2 deletions src/_incydr_sdk/core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from io import IOBase
from pathlib import Path
from textwrap import indent
from typing import Optional
from typing import Union

from pydantic import Field
Expand All @@ -19,6 +20,8 @@
from rich.logging import RichHandler

from _incydr_sdk.enums import _Enum
from _incydr_sdk.exceptions import AuthMissingError


# capture default displayhook so we can "uninstall" rich
_sys_displayhook = sys.displayhook
Expand Down Expand Up @@ -85,8 +88,8 @@ class IncydrSettings(BaseSettings):
and the Python repl. Defaults to True. env_var=`INCYDR_USE_RICH`
"""

api_client_id: str
api_client_secret: SecretStr
api_client_id: Optional[str] = Field(default=None)
api_client_secret: Optional[SecretStr] = Field(default=None)
url: str
page_size: int = Field(default=100)
max_response_history: int = Field(default=5)
Expand All @@ -96,6 +99,8 @@ class IncydrSettings(BaseSettings):
log_level: Union[int, str] = Field(default=logging.WARNING, validate_default=True)
logger: logging.Logger = Field(default=None)
user_agent_prefix: Union[str] = Field(default="")
refresh_token: Optional[SecretStr] = Field(default=None)
refresh_url: Optional[str] = Field(default=None)

model_config = SettingsConfigDict(
env_prefix="incydr_",
Expand Down Expand Up @@ -213,6 +218,28 @@ def _configure_logging(cls, values): # noqa
cls.logger = logger
return values

@model_validator(mode="after")
@classmethod
def _auth_validator(cls, values): # noqa
values_dict = values.dict()

# if we have a refresh token and a refresh url, don't validate api client auth/secret
if values_dict.get("refresh_url") and values_dict.get("refresh_token"):
return values

errors = []

if not values_dict.get("api_client_id"):
errors.append("api_client_id")

if not values_dict.get("api_client_secret"):
errors.append("api_client_secret")

if len(errors) > 0:
raise AuthMissingError(errors)

return values

def _log_response_info(self, response):
self.logger.info(
f"{response.request.method} {response.request.url} status_code={response.status_code}"
Expand Down
15 changes: 4 additions & 11 deletions src/_incydr_sdk/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,17 @@
from pydantic import ValidationError


class IncydrException(Exception):
"""Base class for all Incydr specific exceptions."""

...


class AuthMissingError(IncydrException):
def __init__(self, validation_error: ValidationError):
self.pydantic_error = str(validation_error)
self.errors = validation_error.errors()

@property
def error_keys(self):
return [e["loc"][0] for e in self.errors]
def __init__(self, error_keys):
self.error_keys = error_keys

def __str__(self):
errors_formatted = "\n - ".join(self.error_keys)
return (
f"{self.pydantic_error}\n\n"
f"Missing required authentication variables in environment or in initialization\n\n - {errors_formatted}\n\n"
"Pass required args to the `incydr.Client` or set required values in your environment.\n\n"
"See https://developer.code42.com/sdk/settings for more details."
)
Expand Down
121 changes: 121 additions & 0 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
from pytest_httpserver import HTTPServer

from .conftest import TEST_HOST
from .conftest import TEST_TOKEN
from _incydr_sdk.core.auth import RefreshTokenAuth
from _incydr_sdk.core.models import CSVModel
from _incydr_sdk.core.models import Model
from _incydr_sdk.core.settings import IncydrSettings
from _incydr_sdk.exceptions import AuthMissingError
from incydr import Client


Expand Down Expand Up @@ -122,3 +126,120 @@ class Test(Model):
def test_user_agent(httpserver_auth: HTTPServer):
c = Client()
assert c._session.headers["User-Agent"].startswith("incydrSDK")


@pytest.fixture
def httpserver_refresh_token_auth(httpserver: HTTPServer, monkeypatch):
monkeypatch.setenv("incydr_url", TEST_HOST)
monkeypatch.setenv("incydr_refresh_token", "test_refresh_token")
monkeypatch.setenv("incydr_refresh_url", f"{TEST_HOST}/v1/refresh")

refresh_response = {
"accessToken": {
"tokenValue": TEST_TOKEN,
"expiresAt": "2099-01-01T00:00:00Z",
},
"refreshToken": {
"tokenValue": "new_refresh_token",
"expiresAt": "2099-01-01T00:00:00Z",
},
}
httpserver.expect_request("/v1/refresh", method="POST").respond_with_json(
refresh_response
)
return httpserver


def test_client_init_with_refresh_token_and_refresh_url_uses_refresh_token_auth(
httpserver_refresh_token_auth: HTTPServer,
):
c = Client()
assert isinstance(c._session.auth, RefreshTokenAuth)
assert c.settings.refresh_token.get_secret_value() == "test_refresh_token"
assert c.settings.refresh_url == f"{TEST_HOST}/v1/refresh"


def test_client_init_with_refresh_token_does_not_require_api_client_credentials(
httpserver_refresh_token_auth: HTTPServer,
):
c = Client()
assert c.settings.api_client_id is None
assert c.settings.api_client_secret is None


def test_client_init_with_refresh_token_passed_as_args(
httpserver: HTTPServer, monkeypatch
):
monkeypatch.setenv("incydr_url", TEST_HOST)

refresh_response = {
"accessToken": {
"tokenValue": TEST_TOKEN,
"expiresAt": "2099-01-01T00:00:00Z",
},
"refreshToken": {
"tokenValue": "new_refresh_token",
"expiresAt": "2099-01-01T00:00:00Z",
},
}
httpserver.expect_request("/v1/refresh", method="POST").respond_with_json(
refresh_response
)

c = Client(
refresh_token="arg_refresh_token",
refresh_url=f"{TEST_HOST}/v1/refresh",
)
assert isinstance(c._session.auth, RefreshTokenAuth)
assert c.settings.refresh_token.get_secret_value() == "arg_refresh_token"


def test_settings_with_only_refresh_token_raises_auth_missing_error(monkeypatch):
with pytest.raises(AuthMissingError) as exc_info:
IncydrSettings(
url=TEST_HOST,
refresh_token="test_refresh_token",
_env_file="",
)

assert "api_client_id" in exc_info.value.error_keys
assert "api_client_secret" in exc_info.value.error_keys


def test_settings_with_only_refresh_url_raises_auth_missing_error(monkeypatch):
with pytest.raises(AuthMissingError) as exc_info:
IncydrSettings(
url=TEST_HOST,
refresh_url=f"{TEST_HOST}/v1/refresh",
_env_file="",
)

assert "api_client_id" in exc_info.value.error_keys
assert "api_client_secret" in exc_info.value.error_keys


def test_client_prefers_refresh_token_auth_when_both_auth_methods_provided(
httpserver: HTTPServer, monkeypatch
):
monkeypatch.setenv("incydr_url", TEST_HOST)
monkeypatch.setenv("incydr_api_client_id", "env_id")
monkeypatch.setenv("incydr_api_client_secret", "env_secret")
monkeypatch.setenv("incydr_refresh_token", "test_refresh_token")
monkeypatch.setenv("incydr_refresh_url", f"{TEST_HOST}/v1/refresh")

refresh_response = {
"accessToken": {
"tokenValue": TEST_TOKEN,
"expiresAt": "2099-01-01T00:00:00Z",
},
"refreshToken": {
"tokenValue": "new_refresh_token",
"expiresAt": "2099-01-01T00:00:00Z",
},
}
httpserver.expect_request("/v1/refresh", method="POST").respond_with_json(
refresh_response
)

c = Client()
assert isinstance(c._session.auth, RefreshTokenAuth)