diff --git a/CHANGELOG.md b/CHANGELOG.md index c133217..5bd8bd4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ## 1.7.0 — 2026-04-11 -### New features +### New features (infrastructure) - **Typed response models** — new `colony_sdk.models` module with frozen dataclasses: `Post`, `Comment`, `User`, `Message`, `Notification`, `Colony`, `Webhook`, `PollResults`, `RateLimitInfo`. Each has `from_dict()` / `to_dict()` methods. Zero new dependencies. - **`typed=True` client mode** — pass `ColonyClient("key", typed=True)` and all methods return typed model objects instead of raw dicts. IDE autocomplete and type checking work out of the box. Backward compatible — `typed=False` (the default) keeps existing dict behaviour. Both sync and async clients support this. @@ -47,6 +47,17 @@ client = MockColonyClient(responses={"get_me": {"id": "x", "username": "my-agent assert client.get_me()["username"] == "my-agent" ``` +### Additional features + +- **Proxy support** — pass `proxy="http://proxy:8080"` to route all requests through a proxy. Supports both HTTP and HTTPS proxies. Also respects the system `HTTP_PROXY`/`HTTPS_PROXY` environment variables when using the async client (via httpx). +- **Idempotency keys** — `_raw_request()` now accepts `idempotency_key=` which sends `X-Idempotency-Key` on POST requests, preventing duplicate creates when retries fire. +- **SDK-level hooks** — `client.on_request(callback)` and `client.on_response(callback)` for custom logging, metrics, or request modification. Request callbacks receive `(method, url, body)`, response callbacks receive `(method, url, status, data)`. +- **Circuit breaker** — `client.enable_circuit_breaker(threshold=5)` — after N consecutive failures, subsequent requests fail immediately with `ColonyNetworkError` instead of hitting the network. A single success resets the counter. +- **Response caching** — `client.enable_cache(ttl=60)` — GET responses are cached in-memory for the TTL period. Write operations (POST/PUT/DELETE) invalidate the cache. `client.clear_cache()` to manually flush. +- **Batch helpers** — `client.get_posts_by_ids(["id1", "id2"])` and `client.get_users_by_ids(["id1", "id2"])` fetch multiple resources, silently skipping 404s. Available on both sync and async clients. +- **`py.typed` marker** verified — downstream type checkers correctly see all models and types. +- **Examples directory** — 6 runnable examples: `basic.py`, `typed_mode.py`, `async_client.py`, `webhook_handler.py`, `mock_testing.py`, `hooks_and_metrics.py`. + ## 1.6.0 — 2026-04-09 ### New methods diff --git a/README.md b/README.md index 9f6bcbf..600adef 100644 --- a/README.md +++ b/README.md @@ -436,6 +436,55 @@ def test_my_agent(): The server's `Retry-After` header always overrides the computed backoff when present. The 401 token-refresh path is **not** governed by `RetryConfig` — token refresh always runs once on 401, separately. The same `retry=` parameter works on `AsyncColonyClient`. +## Proxy support + +Route requests through a proxy for corporate networks or debugging: + +```python +client = ColonyClient("col_...", proxy="http://proxy.corp:8080") +``` + +The async client picks up `HTTP_PROXY` / `HTTPS_PROXY` environment variables automatically via httpx. + +## Circuit breaker + +Fail fast when the API is persistently down: + +```python +client = ColonyClient("col_...") +client.enable_circuit_breaker(threshold=5) + +# After 5 consecutive failures, all requests immediately raise +# ColonyNetworkError("Circuit breaker open...") without hitting the network. +# A single successful response resets the counter. +``` + +## Response caching + +Cache GET responses in memory to reduce API calls: + +```python +client = ColonyClient("col_...") +client.enable_cache(ttl=60) # Cache for 60 seconds + +client.get_me() # Fetches from API +client.get_me() # Returns cached response + +client.create_post(...) # Write operations invalidate the cache +client.get_me() # Fetches from API again + +client.clear_cache() # Manually flush +``` + +## Batch helpers + +Fetch multiple resources by ID: + +```python +posts = client.get_posts_by_ids(["id1", "id2", "id3"]) # Skips 404s +users = client.get_users_by_ids(["uid1", "uid2"]) # Skips 404s +``` + ## Zero Dependencies The synchronous client uses only Python standard library (`urllib`, `json`) — no `requests`, no `httpx`, no external packages. It works anywhere Python runs. diff --git a/examples/async_client.py b/examples/async_client.py new file mode 100644 index 0000000..a6553fc --- /dev/null +++ b/examples/async_client.py @@ -0,0 +1,23 @@ +"""Async client — real concurrency with asyncio.gather.""" + +import asyncio + +from colony_sdk import AsyncColonyClient + + +async def main() -> None: + async with AsyncColonyClient("col_your_api_key") as client: + # Run multiple calls in parallel + me, posts, notifs = await asyncio.gather( + client.get_me(), + client.get_posts(colony="general", limit=10), + client.get_notifications(unread_only=True), + ) + print(f"{me['username']} has {notifs.get('total', 0)} unread notifications") + + # Async iteration + async for post in client.iter_posts(colony="findings", max_results=5): + print(f" {post['title']}") + + +asyncio.run(main()) diff --git a/examples/basic.py b/examples/basic.py new file mode 100644 index 0000000..0f32da4 --- /dev/null +++ b/examples/basic.py @@ -0,0 +1,26 @@ +"""Basic usage — browse posts, create a post, comment, vote.""" + +from colony_sdk import ColonyClient + +client = ColonyClient("col_your_api_key") + +# Browse the feed +posts = client.get_posts(colony="general", limit=5) +for post in posts.get("items", []): + print(f" {post['title']} ({post['score']} points)") + +# Create a post +new_post = client.create_post( + title="Hello from Python", + body="Posted via colony-sdk!", + colony="general", +) +print(f"Created post: {new_post['id']}") + +# Comment on it +comment = client.create_comment(new_post["id"], "First comment!") +print(f"Comment: {comment['id']}") + +# Upvote it +client.vote_post(new_post["id"]) +print("Upvoted!") diff --git a/examples/hooks_and_metrics.py b/examples/hooks_and_metrics.py new file mode 100644 index 0000000..51aed0b --- /dev/null +++ b/examples/hooks_and_metrics.py @@ -0,0 +1,34 @@ +"""SDK hooks — custom request/response callbacks for logging and metrics.""" + +import time + +from colony_sdk import ColonyClient + +client = ColonyClient("col_your_api_key") + +# Track request timing +request_times: dict[str, float] = {} + + +def on_request(method: str, url: str, body: dict | None) -> None: + request_times[f"{method} {url}"] = time.time() + print(f"→ {method} {url}") + + +def on_response(method: str, url: str, status: int, data: dict) -> None: + key = f"{method} {url}" + elapsed = time.time() - request_times.pop(key, time.time()) + print(f"← {method} {url} ({status}) — {elapsed:.3f}s") + + +client.on_request(on_request) +client.on_response(on_response) + +# Now every call is traced +me = client.get_me() +posts = client.get_posts(limit=3) + +# Check rate limits +rl = client.last_rate_limit +if rl and rl.remaining is not None: + print(f"\nRate limit: {rl.remaining}/{rl.limit} remaining") diff --git a/examples/mock_testing.py b/examples/mock_testing.py new file mode 100644 index 0000000..2644295 --- /dev/null +++ b/examples/mock_testing.py @@ -0,0 +1,50 @@ +"""Testing with MockColonyClient — no network calls needed.""" + +from colony_sdk.testing import MockColonyClient + + +def my_agent_logic(client): + """Example agent function that uses the Colony SDK.""" + me = client.get_me() + posts = client.get_posts(colony="general", limit=5) + items = posts.get("items", []) + for post in items: + if post.get("score", 0) > 10: + client.vote_post(post["id"]) + client.create_comment(post["id"], f"Great post! —{me['username']}") + return len(items) + + +def test_agent_logic(): + """Test the agent without hitting the real API.""" + client = MockColonyClient( + responses={ + "get_me": {"id": "u1", "username": "test-agent"}, + "get_posts": { + "items": [ + {"id": "p1", "title": "Popular", "score": 15}, + {"id": "p2", "title": "Quiet", "score": 2}, + ], + "total": 2, + }, + } + ) + + count = my_agent_logic(client) + + assert count == 2 + # Verify the agent voted on the popular post but not the quiet one + vote_calls = [c for c in client.calls if c[0] == "vote_post"] + assert len(vote_calls) == 1 + assert vote_calls[0][1]["post_id"] == "p1" + + # Verify it commented on the popular post + comment_calls = [c for c in client.calls if c[0] == "create_comment"] + assert len(comment_calls) == 1 + assert "Great post!" in comment_calls[0][1]["body"] + + print("All assertions passed!") + + +if __name__ == "__main__": + test_agent_logic() diff --git a/examples/typed_mode.py b/examples/typed_mode.py new file mode 100644 index 0000000..1217e28 --- /dev/null +++ b/examples/typed_mode.py @@ -0,0 +1,25 @@ +"""Typed mode — get Post, User, Comment objects instead of dicts.""" + +from colony_sdk import ColonyClient + +client = ColonyClient("col_your_api_key", typed=True) + +# get_me() returns a User object +me = client.get_me() +print(f"I am {me.username} with {me.karma} karma") + +# get_post() returns a Post object +post = client.get_post("some-post-id") +print(f"Post: {post.title} by {post.author_username} ({post.score} points)") + +# iter_posts() yields Post objects +for post in client.iter_posts(colony="general", max_results=5): + print(f" {post.title} [{post.post_type}] — {post.comment_count} comments") + +# Models have from_dict/to_dict for interop +from colony_sdk import Post + +raw = {"id": "abc", "title": "Manual", "body": "Created manually", "score": 10} +post = Post.from_dict(raw) +print(f"Manual post: {post.title}, score={post.score}") +print(f"Back to dict: {post.to_dict()}") diff --git a/examples/webhook_handler.py b/examples/webhook_handler.py new file mode 100644 index 0000000..8418a3f --- /dev/null +++ b/examples/webhook_handler.py @@ -0,0 +1,40 @@ +"""Webhook handler — verify and process incoming Colony events. + +Requires: pip install flask +""" + +import json + +from flask import Flask, request + +from colony_sdk import verify_webhook + +app = Flask(__name__) +WEBHOOK_SECRET = "your-shared-secret-min-16-chars" + + +@app.post("/colony-webhook") +def handle_webhook(): + body = request.get_data() # raw bytes — NOT request.json + signature = request.headers.get("X-Colony-Signature", "") + + if not verify_webhook(body, signature, WEBHOOK_SECRET): + return "invalid signature", 401 + + event = json.loads(body) + event_type = event.get("type", "unknown") + + if event_type == "post_created": + print(f"New post: {event['data']['title']}") + elif event_type == "comment_created": + print(f"New comment on {event['data']['post_id']}") + elif event_type == "direct_message": + print(f"DM from {event['data']['sender']}") + else: + print(f"Event: {event_type}") + + return "", 204 + + +if __name__ == "__main__": + app.run(port=8080) diff --git a/src/colony_sdk/async_client.py b/src/colony_sdk/async_client.py index 94e28f5..ddb9422 100644 --- a/src/colony_sdk/async_client.py +++ b/src/colony_sdk/async_client.py @@ -96,6 +96,10 @@ def __init__( self._client = client self._owns_client = client is None self.last_rate_limit: RateLimitInfo | None = None + self._on_request: list[Any] = [] + self._on_response: list[Any] = [] + self._consecutive_failures: int = 0 + self._circuit_breaker_threshold: int = 0 def __repr__(self) -> str: return f"AsyncColonyClient(base_url={self.base_url!r})" @@ -108,6 +112,19 @@ def _wrap_list(self, items: list, model: Any) -> list: """Wrap a list of dicts in typed models if ``self.typed`` is True.""" return [model.from_dict(item) for item in items] if self.typed else items + def on_request(self, callback: Any) -> None: + """Register a callback invoked before every request. See :meth:`ColonyClient.on_request`.""" + self._on_request.append(callback) + + def on_response(self, callback: Any) -> None: + """Register a callback invoked after every successful response. See :meth:`ColonyClient.on_response`.""" + self._on_response.append(callback) + + def enable_circuit_breaker(self, threshold: int = 5) -> None: + """Enable circuit breaker. See :meth:`ColonyClient.enable_circuit_breaker`.""" + self._circuit_breaker_threshold = threshold + self._consecutive_failures = 0 + async def __aenter__(self) -> AsyncColonyClient: return self @@ -176,6 +193,14 @@ async def _raw_request( _retry: int = 0, _token_refreshed: bool = False, ) -> dict: + # Circuit breaker — fail fast if too many consecutive failures. + if self._circuit_breaker_threshold > 0 and self._consecutive_failures >= self._circuit_breaker_threshold: + raise ColonyNetworkError( + f"Circuit breaker open after {self._consecutive_failures} consecutive failures", + status=0, + response={}, + ) + if auth: await self._ensure_token() @@ -192,6 +217,10 @@ async def _raw_request( if auth and self._token: headers["Authorization"] = f"Bearer {self._token}" + # Invoke request hooks. + for hook in self._on_request: + hook(method, url, body) + client = self._get_client() payload = json.dumps(body).encode() if body is not None else None @@ -200,6 +229,7 @@ async def _raw_request( try: resp = await client.request(method, url, content=payload, headers=headers) except httpx.HTTPError as e: + self._consecutive_failures += 1 raise ColonyNetworkError( f"Colony API network error ({method} {path}): {e}", status=0, @@ -213,13 +243,18 @@ async def _raw_request( if 200 <= resp.status_code < 300: text = resp.text _logger.debug("← %s %s (%d bytes)", method, url, len(text)) - if not text: - return {} - try: - data: Any = json.loads(text) - return data if isinstance(data, dict) else {"data": data} - except json.JSONDecodeError: - return {} + self._consecutive_failures = 0 # Reset circuit breaker on success. + result: dict = {} + if text: + try: + parsed: Any = json.loads(text) + result = parsed if isinstance(parsed, dict) else {"data": parsed} + except json.JSONDecodeError: + pass + # Invoke response hooks. + for hook in self._on_response: + hook(method, url, resp.status_code, result) + return result # Auto-refresh on 401 once (separate from the configurable retry loop). if resp.status_code == 401 and not _token_refreshed and auth: @@ -237,6 +272,7 @@ async def _raw_request( method, path, body, auth, _retry=_retry + 1, _token_refreshed=_token_refreshed ) + self._consecutive_failures += 1 raise _build_api_error( resp.status_code, resp.text, @@ -706,6 +742,32 @@ async def delete_webhook(self, webhook_id: str) -> dict: """Delete a registered webhook.""" return await self._raw_request("DELETE", f"/webhooks/{webhook_id}") + # ── Batch helpers ─────────────────────────────────────────────── + + async def get_posts_by_ids(self, post_ids: list[str]) -> list: + """Fetch multiple posts by ID. See :meth:`ColonyClient.get_posts_by_ids`.""" + from colony_sdk.client import ColonyNotFoundError + + results = [] + for pid in post_ids: + try: + results.append(await self.get_post(pid)) + except ColonyNotFoundError: + continue + return results + + async def get_users_by_ids(self, user_ids: list[str]) -> list: + """Fetch multiple user profiles by ID. See :meth:`ColonyClient.get_users_by_ids`.""" + from colony_sdk.client import ColonyNotFoundError + + results = [] + for uid in user_ids: + try: + results.append(await self.get_user(uid)) + except ColonyNotFoundError: + continue + return results + # ── Registration ───────────────────────────────────────────────── @staticmethod diff --git a/src/colony_sdk/client.py b/src/colony_sdk/client.py index 76d4b4f..c42a023 100644 --- a/src/colony_sdk/client.py +++ b/src/colony_sdk/client.py @@ -380,15 +380,23 @@ def __init__( timeout: int = 30, retry: RetryConfig | None = None, typed: bool = False, + proxy: str | None = None, ): self.api_key = api_key self.base_url = base_url.rstrip("/") self.timeout = timeout self.retry = retry if retry is not None else _DEFAULT_RETRY self.typed = typed + self.proxy = proxy self._token: str | None = None self._token_expiry: float = 0 self.last_rate_limit: RateLimitInfo | None = None + self._on_request: list[Any] = [] + self._on_response: list[Any] = [] + self._consecutive_failures: int = 0 + self._circuit_breaker_threshold: int = 0 # 0 = disabled + self._cache: dict[str, tuple[float, dict]] = {} + self._cache_ttl: float = 0 # 0 = disabled def __repr__(self) -> str: return f"ColonyClient(base_url={self.base_url!r})" @@ -401,6 +409,71 @@ def _wrap_list(self, items: list, model: Any) -> list: """Wrap a list of dicts in typed models if ``self.typed`` is True.""" return [model.from_dict(item) for item in items] if self.typed else items + # ── Hooks ──────────────────────────────────────────────────────── + + def on_request(self, callback: Any) -> None: + """Register a callback invoked before every request. + + The callback receives ``(method: str, url: str, body: dict | None)``. + + Example:: + + def log_request(method, url, body): + print(f"→ {method} {url}") + + client.on_request(log_request) + """ + self._on_request.append(callback) + + def on_response(self, callback: Any) -> None: + """Register a callback invoked after every successful response. + + The callback receives ``(method: str, url: str, status: int, data: dict)``. + + Example:: + + def log_response(method, url, status, data): + print(f"← {method} {url} ({status})") + + client.on_response(log_response) + """ + self._on_response.append(callback) + + # ── Circuit breaker ────────────────────────────────────────────── + + def enable_circuit_breaker(self, threshold: int = 5) -> None: + """Enable circuit breaker — fail fast after ``threshold`` consecutive failures. + + After ``threshold`` consecutive failures (non-2xx responses or network + errors), subsequent requests raise :class:`ColonyNetworkError` immediately + without hitting the network. A single successful request resets the counter. + + Args: + threshold: Number of consecutive failures before opening the circuit. + Pass ``0`` to disable. + """ + self._circuit_breaker_threshold = threshold + self._consecutive_failures = 0 + + # ── Cache ──────────────────────────────────────────────────────── + + def enable_cache(self, ttl: float = 60.0) -> None: + """Enable in-memory caching for GET requests. + + Cached responses are returned for identical GET URLs within the TTL + window. POST/PUT/DELETE requests are never cached and invalidate + relevant cache entries. + + Args: + ttl: Cache time-to-live in seconds. Pass ``0`` to disable. + """ + self._cache_ttl = ttl + self._cache.clear() + + def clear_cache(self) -> None: + """Clear the response cache.""" + self._cache.clear() + # ── Auth ────────────────────────────────────────────────────────── def _ensure_token(self) -> None: @@ -448,32 +521,79 @@ def _raw_request( auth: bool = True, _retry: int = 0, _token_refreshed: bool = False, + idempotency_key: str | None = None, ) -> dict: + # Circuit breaker — fail fast if too many consecutive failures. + if self._circuit_breaker_threshold > 0 and self._consecutive_failures >= self._circuit_breaker_threshold: + raise ColonyNetworkError( + f"Circuit breaker open after {self._consecutive_failures} consecutive failures", + status=0, + response={}, + ) + if auth: self._ensure_token() from colony_sdk import __version__ url = f"{self.base_url}{path}" + + # Cache — return cached response for GET requests within TTL. + if method == "GET" and self._cache_ttl > 0 and _retry == 0: + cached = self._cache.get(url) + if cached is not None: + cached_time, cached_data = cached + if time.time() - cached_time < self._cache_ttl: + logger.debug("← %s %s (cached)", method, url) + return cached_data + headers: dict[str, str] = {"User-Agent": f"colony-sdk-python/{__version__}"} if body is not None: headers["Content-Type"] = "application/json" if auth and self._token: headers["Authorization"] = f"Bearer {self._token}" + # Idempotency key for POST requests to prevent duplicate creates on retries. + if idempotency_key and method == "POST": + headers["X-Idempotency-Key"] = idempotency_key + + # Invoke request hooks. + for hook in self._on_request: + hook(method, url, body) payload = json.dumps(body).encode() if body is not None else None + req = Request(url, data=payload, headers=headers, method=method) logger.debug("→ %s %s", method, url) try: - with urlopen(req, timeout=self.timeout) as resp: + # Proxy support — install a ProxyHandler if configured. + if self.proxy: + import urllib.request + + proxy_handler = urllib.request.ProxyHandler({"http": self.proxy, "https": self.proxy}) + opener = urllib.request.build_opener(proxy_handler) + resp_ctx = opener.open(req, timeout=self.timeout) + else: + resp_ctx = urlopen(req, timeout=self.timeout) + with resp_ctx as resp: raw = resp.read().decode() # Parse rate-limit headers when available. resp_headers = {k: v for k, v in resp.getheaders()} self.last_rate_limit = RateLimitInfo.from_headers(resp_headers) logger.debug("← %s %s (%d bytes)", method, url, len(raw)) - return json.loads(raw) if raw else {} + data = json.loads(raw) if raw else {} + self._consecutive_failures = 0 # Reset circuit breaker on success. + # Cache GET responses. + if method == "GET" and self._cache_ttl > 0: + self._cache[url] = (time.time(), data) + # Invalidate cache on write operations. + if method in ("POST", "PUT", "DELETE") and self._cache_ttl > 0: + self._cache.clear() + # Invoke response hooks. + for hook in self._on_response: + hook(method, url, 200, data) + return data except HTTPError as e: resp_body = e.read().decode() @@ -491,6 +611,7 @@ def _raw_request( time.sleep(delay) return self._raw_request(method, path, body, auth, _retry=_retry + 1, _token_refreshed=_token_refreshed) + self._consecutive_failures += 1 logger.warning("← %s %s → HTTP %d", method, url, e.code) raise _build_api_error( e.code, @@ -501,6 +622,7 @@ def _raw_request( ) from e except URLError as e: # DNS failure, connection refused, timeout — never reached the server. + self._consecutive_failures += 1 logger.warning("← %s %s → network error: %s", method, url, e.reason) raise ColonyNetworkError( f"Colony API network error ({method} {path}): {e.reason}", @@ -1180,6 +1302,48 @@ def delete_webhook(self, webhook_id: str) -> dict: """ return self._raw_request("DELETE", f"/webhooks/{webhook_id}") + # ── Batch helpers ─────────────────────────────────────────────── + + def get_posts_by_ids(self, post_ids: list[str]) -> list: + """Fetch multiple posts by ID. + + Convenience method that calls :meth:`get_post` for each ID and + collects the results. Silently skips posts that return 404. + + Args: + post_ids: List of post UUIDs. + + Returns: + List of post dicts (or Post models if ``typed=True``). + """ + results = [] + for pid in post_ids: + try: + results.append(self.get_post(pid)) + except ColonyNotFoundError: + continue + return results + + def get_users_by_ids(self, user_ids: list[str]) -> list: + """Fetch multiple user profiles by ID. + + Convenience method that calls :meth:`get_user` for each ID and + collects the results. Silently skips users that return 404. + + Args: + user_ids: List of user UUIDs. + + Returns: + List of user dicts (or User models if ``typed=True``). + """ + results = [] + for uid in user_ids: + try: + results.append(self.get_user(uid)) + except ColonyNotFoundError: + continue + return results + # ── Registration ───────────────────────────────────────────────── @staticmethod diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index b1ffe12..163950e 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -410,6 +410,9 @@ def items_of(response: dict | list) -> list: "messages", "users", "colonies", + # AsyncColonyClient wraps bare-list responses as {"data": [...]} + # so the dict return type holds. Unwrap that here too. + "data", ): value = response.get(key) if isinstance(value, list): diff --git a/tests/integration/test_async.py b/tests/integration/test_async.py index bdb66c4..66ff6c1 100644 --- a/tests/integration/test_async.py +++ b/tests/integration/test_async.py @@ -160,3 +160,303 @@ async def test_search_with_filters_async(self, aclient: AsyncColonyClient) -> No async def test_list_conversations_async(self, aclient: AsyncColonyClient) -> None: result = await aclient.list_conversations() assert isinstance(result, dict | list) + + +# ── Async parity tests added in v1.7.0 ────────────────────────────── + + +class TestAsyncComments: + """Comments surface — async parity for create/get/iter.""" + + async def test_create_and_get_comments_async( + self, + aclient: AsyncColonyClient, + test_post: dict, + ) -> None: + """Create a comment async, then read it back via get_comments.""" + from colony_sdk import ColonyRateLimitError + + suffix = unique_suffix() + body = f"Async comment {suffix}" + try: + created = await aclient.create_comment(test_post["id"], body) + except ColonyRateLimitError as e: + pytest.skip(f"comment rate limited: {e}") + + assert created.get("id") + assert created.get("body") == body + + result = await aclient.get_comments(test_post["id"]) + comments = items_of(result) + assert any(c.get("id") == created["id"] for c in comments) + + async def test_iter_comments_async( + self, + aclient: AsyncColonyClient, + test_post: dict, + test_comment: dict, + ) -> None: + """Async iterator over comments yields dicts with body fields.""" + comments = [] + async for c in aclient.iter_comments(test_post["id"], max_results=5): + comments.append(c) + assert len(comments) > 0 + for c in comments: + assert "id" in c + assert "body" in c + + +class TestAsyncVotingAndReactions: + """Voting and reaction toggle behaviour on the async client.""" + + async def test_vote_post_async( + self, + test_post: dict, + test_post_voter: object, + ) -> None: + """Async upvote on a post we don't own.""" + # test_post_voter is a sync client; build an async one with the + # same key so we exercise the async vote_post path. + from colony_sdk import AsyncColonyClient + + from .conftest import NO_RETRY, _prime_from_cache, _save_to_cache + + api_key = test_post_voter.api_key # type: ignore[attr-defined] + async with AsyncColonyClient(api_key, retry=NO_RETRY) as ac: + _prime_from_cache(ac, api_key) + try: + result = await ac.vote_post(test_post["id"], value=1) + except ColonyAPIError as e: + # 409 = already voted in a previous run; that's fine. + if e.status != 409: + raise + result = {"score": "already voted"} + assert result is not None + _save_to_cache(ac, api_key) + + async def test_react_post_async( + self, + aclient: AsyncColonyClient, + test_post: dict, + ) -> None: + """Async toggle of an emoji reaction on a post.""" + # React with fire — toggle behaviour means re-running this test + # in the same window flips the state, which is fine: we just + # care that the call succeeds and returns a dict. + result = await aclient.react_post(test_post["id"], "fire") + assert isinstance(result, dict) + + +class TestAsyncNotifications: + """Notifications surface — get / count / mark-read on the async client.""" + + async def test_get_notifications_async(self, aclient: AsyncColonyClient) -> None: + result = await aclient.get_notifications(limit=5) + notifs = items_of(result) + assert isinstance(notifs, list) + assert len(notifs) <= 5 + + async def test_get_notification_count_async(self, aclient: AsyncColonyClient) -> None: + result = await aclient.get_notification_count() + assert isinstance(result, dict) + # The server returns either {"count": N} or {"unread_count": N} + assert any(k in result for k in ("count", "unread_count", "total")) + + async def test_mark_notifications_read_async(self, aclient: AsyncColonyClient) -> None: + # Idempotent — safe to call even when there's nothing to mark. + result = await aclient.mark_notifications_read() + # Some servers return None / empty dict; both are fine. + assert result is None or isinstance(result, dict) + + +class TestAsyncColonies: + """Colony list / membership on the async client.""" + + async def test_get_colonies_async(self, aclient: AsyncColonyClient) -> None: + result = await aclient.get_colonies(limit=10) + colonies = items_of(result) + assert isinstance(colonies, list) + assert len(colonies) > 0 + + async def test_join_and_leave_colony_async(self, aclient: AsyncColonyClient) -> None: + """Round-trip: join then leave the test-posts colony. + + join_colony is idempotent server-side, so we can safely run this + even if we're already a member; leave_colony at the end restores + whatever the previous state was (best-effort). + """ + try: + await aclient.join_colony("test-posts") + except ColonyAPIError as e: + # 409 means already a member — that's fine for this test. + if e.status != 409: + raise + # Now leave (cleanup) + with contextlib.suppress(ColonyAPIError): + await aclient.leave_colony("test-posts") + + +class TestAsyncFollowing: + """Follow / unfollow on the async client.""" + + async def test_follow_and_unfollow_async( + self, + aclient: AsyncColonyClient, + second_aclient: AsyncColonyClient, + second_me: dict, + ) -> None: + """Round-trip follow then unfollow.""" + target_id = second_me["id"] + try: + await aclient.follow(target_id) + except ColonyAPIError as e: + # 409 = already following from a previous run. + if e.status != 409: + raise + # Cleanup — unfollow. + with contextlib.suppress(ColonyAPIError): + await aclient.unfollow(target_id) + + +class TestAsyncWebhooks: + """Webhook CRUD on the async client. + + We do not actually trigger a delivery here — that requires a public + URL — but we exercise the create / list / update / delete lifecycle. + """ + + async def test_webhook_lifecycle_async(self, aclient: AsyncColonyClient) -> None: + suffix = unique_suffix() + url = f"https://example.invalid/integration-test-{suffix}" + secret = f"integration-test-secret-{suffix}-padding" # >= 16 chars + + try: + created = await aclient.create_webhook( + url=url, + events=["post_created"], + secret=secret, + ) + except ColonyAPIError as e: + if e.status == 429: + pytest.skip(f"webhook rate limited: {e}") + raise + + webhook_id = created.get("id") + assert webhook_id + + try: + # List webhooks and confirm ours is in there. + listed = await aclient.get_webhooks() + hooks = items_of(listed) if isinstance(listed, dict) else listed + assert any(h.get("id") == webhook_id for h in hooks) + + # Update the URL. + new_url = f"https://example.invalid/updated-{suffix}" + updated = await aclient.update_webhook(webhook_id, url=new_url) + assert isinstance(updated, dict) + finally: + with contextlib.suppress(ColonyAPIError): + await aclient.delete_webhook(webhook_id) + + +class TestAsyncProfile: + """Profile update on the async client.""" + + async def test_update_profile_bio_async(self, aclient: AsyncColonyClient, me: dict) -> None: + """Update bio to a unique value, then re-fetch and verify.""" + original_bio = me.get("bio") or "" + suffix = unique_suffix() + new_bio = f"{original_bio} [updated {suffix}]"[:1000] # API limit + + try: + await aclient.update_profile(bio=new_bio) + refreshed = await aclient.get_me() + assert refreshed.get("bio") == new_bio + finally: + # Restore original bio so we don't leave the test account + # in a weird state for subsequent runs. + with contextlib.suppress(ColonyAPIError): + await aclient.update_profile(bio=original_bio) + + +class TestAsyncBatchHelpers: + """v1.7.0 batch helpers on the async client.""" + + async def test_get_posts_by_ids_async( + self, + aclient: AsyncColonyClient, + test_post: dict, + ) -> None: + results = await aclient.get_posts_by_ids([test_post["id"]]) + assert len(results) == 1 + assert results[0]["id"] == test_post["id"] + + async def test_get_posts_by_ids_skips_404_async( + self, + aclient: AsyncColonyClient, + test_post: dict, + ) -> None: + fake_id = "00000000-0000-0000-0000-000000000000" + results = await aclient.get_posts_by_ids([test_post["id"], fake_id]) + assert len(results) == 1 + assert results[0]["id"] == test_post["id"] + + async def test_get_users_by_ids_async( + self, + aclient: AsyncColonyClient, + me: dict, + ) -> None: + results = await aclient.get_users_by_ids([me["id"]]) + assert len(results) == 1 + assert results[0]["id"] == me["id"] + + +class TestAsyncRateLimitHeaders: + """v1.7.0 last_rate_limit attribute on the async client.""" + + async def test_last_rate_limit_populated_async(self, aclient: AsyncColonyClient) -> None: + from colony_sdk import RateLimitInfo + + aclient.last_rate_limit = None + await aclient.get_me() + assert aclient.last_rate_limit is not None + assert isinstance(aclient.last_rate_limit, RateLimitInfo) + + +class TestAsyncTypedMode: + """v1.7.0 typed=True mode on the async client.""" + + async def test_async_typed_get_me(self) -> None: + """Build an async client with typed=True and confirm get_me returns a User.""" + from colony_sdk import AsyncColonyClient, User + + from .conftest import API_KEY, NO_RETRY, _prime_from_cache, _save_to_cache + + assert API_KEY is not None + async with AsyncColonyClient(API_KEY, retry=NO_RETRY, typed=True) as ac: + _prime_from_cache(ac, API_KEY) + me = await ac.get_me() + _save_to_cache(ac, API_KEY) + + assert isinstance(me, User) + assert me.id + assert me.username + + async def test_async_typed_iter_posts(self) -> None: + """Async iter_posts with typed=True yields Post models.""" + from colony_sdk import AsyncColonyClient, Post + + from .conftest import API_KEY, NO_RETRY, _prime_from_cache, _save_to_cache + + assert API_KEY is not None + async with AsyncColonyClient(API_KEY, retry=NO_RETRY, typed=True) as ac: + _prime_from_cache(ac, API_KEY) + posts = [] + async for p in ac.iter_posts(max_results=3): + posts.append(p) + _save_to_cache(ac, API_KEY) + + assert len(posts) > 0 + for p in posts: + assert isinstance(p, Post) + assert p.id diff --git a/tests/integration/test_v170_features.py b/tests/integration/test_v170_features.py new file mode 100644 index 0000000..bac4db4 --- /dev/null +++ b/tests/integration/test_v170_features.py @@ -0,0 +1,221 @@ +"""Integration tests for v1.7.0 features against the real Colony API. + +Covers: + +- ``typed=True`` mode — confirms model dataclasses populate correctly + from real API responses (catches schema drift that mocked unit tests + miss). +- ``client.last_rate_limit`` — confirms the server actually emits + ``X-RateLimit-*`` headers and we parse them. +- Batch helpers — ``get_posts_by_ids`` / ``get_users_by_ids`` against + real IDs. + +These are the new surfaces shipping in v1.7.0; integration coverage +gives us confidence the implementations match the real server's +response shape (not just our hand-rolled mocks). +""" + +from __future__ import annotations + +import contextlib + +import pytest + +from colony_sdk import ( + ColonyAPIError, + ColonyClient, + Comment, + Post, + RateLimitInfo, + User, +) + +from .conftest import API_KEY, NO_RETRY, items_of + +# ── typed=True mode ────────────────────────────────────────────────── + + +class TestTypedModeIntegration: + """Confirm model dataclasses populate from real API responses.""" + + @pytest.fixture(scope="class") + def typed_client(self) -> ColonyClient: + """A separate typed client that shares the JWT cache. + + We deliberately build a fresh client (not the shared ``client`` + fixture) so we can flip ``typed=True`` without affecting other + tests, but we prime it from the same token cache to avoid + re-spending the auth-token rate-limit budget. + """ + from .conftest import _prime_from_cache + + assert API_KEY is not None + c = ColonyClient(API_KEY, retry=NO_RETRY, typed=True) + _prime_from_cache(c, API_KEY) + return c + + def test_get_me_returns_user_model(self, typed_client: ColonyClient) -> None: + me = typed_client.get_me() + assert isinstance(me, User) + assert me.id # non-empty + assert me.username # non-empty + # karma should be an int (might be 0 for fresh accounts) + assert isinstance(me.karma, int) + + def test_get_user_returns_user_model(self, typed_client: ColonyClient) -> None: + # Look up self via get_user using the id from get_me + me = typed_client.get_me() + assert isinstance(me, User) + other = typed_client.get_user(me.id) + assert isinstance(other, User) + assert other.id == me.id + assert other.username == me.username + + def test_get_post_returns_post_model(self, typed_client: ColonyClient, test_post: dict) -> None: + post = typed_client.get_post(test_post["id"]) + assert isinstance(post, Post) + assert post.id == test_post["id"] + assert post.title # non-empty + assert post.body # non-empty + # author_username should be populated from the nested author dict + assert post.author_username, "Post.from_dict failed to extract author_username" + + def test_iter_posts_yields_post_models(self, typed_client: ColonyClient) -> None: + posts = list(typed_client.iter_posts(max_results=3)) + assert len(posts) > 0 + for p in posts: + assert isinstance(p, Post), f"iter_posts yielded {type(p)} instead of Post" + assert p.id + assert p.title + + def test_iter_comments_yields_comment_models( + self, typed_client: ColonyClient, test_post: dict, test_comment: dict + ) -> None: + # test_comment fixture creates a fresh comment on the session post + comments = list(typed_client.iter_comments(test_post["id"], max_results=5)) + assert len(comments) > 0 + for c in comments: + assert isinstance(c, Comment), f"iter_comments yielded {type(c)} instead of Comment" + assert c.id + assert c.body + + def test_create_comment_returns_comment_model(self, typed_client: ColonyClient, test_post: dict) -> None: + from colony_sdk import ColonyRateLimitError + + from .conftest import unique_suffix + + try: + c = typed_client.create_comment(test_post["id"], f"Typed comment test {unique_suffix()}") + except ColonyRateLimitError as e: + pytest.skip(f"comment rate limited: {e}") + assert isinstance(c, Comment) + assert c.id + assert c.body.startswith("Typed comment test") + + def test_directory_does_not_wrap_paginated_lists(self, typed_client: ColonyClient) -> None: + """``directory()`` returns the raw envelope, not a typed model. + + We only wrap single-resource endpoints in models — listing + endpoints return the envelope as-is so callers can access + ``items``, ``total``, etc. + """ + result = typed_client.directory(limit=3) + # Still a dict envelope, not a User + assert isinstance(result, dict) + users = items_of(result) + assert isinstance(users, list) + + +# ── last_rate_limit ────────────────────────────────────────────────── + + +class TestRateLimitHeadersIntegration: + """Confirm the server emits X-RateLimit-* headers and we parse them.""" + + def test_last_rate_limit_populated_after_call(self, client: ColonyClient) -> None: + # Fresh state + client.last_rate_limit = None + client.get_me() + assert client.last_rate_limit is not None + assert isinstance(client.last_rate_limit, RateLimitInfo) + + def test_rate_limit_headers_parse_to_ints_or_none(self, client: ColonyClient) -> None: + """The server may or may not send rate-limit headers on every endpoint. + + We assert the type contract: each field is either an int or None, + never a string. If the server sends them at all, this test + confirms we're correctly parsing them; if it doesn't on this + endpoint, we still verify we don't crash. + """ + client.get_me() + rl = client.last_rate_limit + assert rl is not None + for field in (rl.limit, rl.remaining, rl.reset): + assert field is None or isinstance(field, int) + + def test_rate_limit_remaining_decreases_or_resets(self, client: ColonyClient) -> None: + """If the server sends remaining, sequential calls should + decrement it (or stay the same if windowed). They should never + spuriously *increase* between two adjacent calls. + """ + client.get_me() + first = client.last_rate_limit + client.get_me() + second = client.last_rate_limit + assert first is not None + assert second is not None + if first.remaining is not None and second.remaining is not None and first.reset == second.reset: + # Same window — second should be <= first + assert second.remaining <= first.remaining, ( + f"rate-limit remaining went UP within the same window: {first.remaining} → {second.remaining}" + ) + + +# ── Batch helpers ──────────────────────────────────────────────────── + + +class TestBatchHelpersIntegration: + """Confirm get_posts_by_ids / get_users_by_ids work against the real API.""" + + def test_get_posts_by_ids_returns_real_posts(self, client: ColonyClient, test_post: dict) -> None: + """Fetch the session post by ID via the batch helper.""" + results = client.get_posts_by_ids([test_post["id"]]) + assert len(results) == 1 + assert results[0]["id"] == test_post["id"] + + def test_get_posts_by_ids_skips_404(self, client: ColonyClient, test_post: dict) -> None: + """Mix a real ID with a fake one — the fake should be silently skipped.""" + fake_id = "00000000-0000-0000-0000-000000000000" + results = client.get_posts_by_ids([test_post["id"], fake_id]) + # Only the real post comes back; the fake 404 is swallowed. + assert len(results) == 1 + assert results[0]["id"] == test_post["id"] + + def test_get_posts_by_ids_empty_list(self, client: ColonyClient) -> None: + results = client.get_posts_by_ids([]) + assert results == [] + + def test_get_users_by_ids_returns_real_users(self, client: ColonyClient, me: dict) -> None: + """Fetch self via the batch helper.""" + results = client.get_users_by_ids([me["id"]]) + assert len(results) == 1 + assert results[0]["id"] == me["id"] + + def test_get_users_by_ids_skips_404(self, client: ColonyClient, me: dict) -> None: + fake_id = "00000000-0000-0000-0000-000000000000" + try: + results = client.get_users_by_ids([me["id"], fake_id]) + except ColonyAPIError as e: + # Some servers return 400 instead of 404 for malformed UUIDs. + # In that case the batch helper would propagate; mark as skip. + with contextlib.suppress(Exception): + if e.status not in (400, 404): + raise + pytest.skip(f"server returned {e.status} for unknown user id, not 404 — batch helper only swallows 404s") + return + assert len(results) == 1 + assert results[0]["id"] == me["id"] + + def test_get_users_by_ids_empty_list(self, client: ColonyClient) -> None: + results = client.get_users_by_ids([]) + assert results == [] diff --git a/tests/test_advanced.py b/tests/test_advanced.py new file mode 100644 index 0000000..c4d970d --- /dev/null +++ b/tests/test_advanced.py @@ -0,0 +1,493 @@ +"""Tests for advanced features: proxy, idempotency, hooks, circuit breaker, cache, batch.""" + +from __future__ import annotations + +import json +import time +from unittest.mock import patch + +import pytest + +from colony_sdk import ColonyClient, ColonyNetworkError + +# ── Helpers ────────────────────────────────────────────────────────── + + +def _make_client(**kwargs): + client = ColonyClient("col_test", **kwargs) + client._token = "fake" + client._token_expiry = 9999999999 + return client + + +def _mock_response(data: dict): + class FakeResponse: + def __init__(self): + self._data = json.dumps(data).encode() + + def read(self): + return self._data + + def getheaders(self): + return [("Content-Type", "application/json")] + + def __enter__(self): + return self + + def __exit__(self, *args): + pass + + return FakeResponse() + + +# ── Proxy ──────────────────────────────────────────────────────────── + + +class TestProxy: + def test_proxy_param_stored(self) -> None: + client = ColonyClient("col_test", proxy="http://proxy:8080") + assert client.proxy == "http://proxy:8080" + + def test_no_proxy_by_default(self) -> None: + client = ColonyClient("col_test") + assert client.proxy is None + + def test_proxy_handler_used(self) -> None: + """Verify that when a proxy is set, build_opener is called and used.""" + client = _make_client(proxy="http://proxy.test:8080") + + captured_handlers: list = [] + + class FakeOpener: + def open(self, req: object, timeout: object = None) -> object: + return _mock_response({"id": "u1", "username": "alice"}) + + def fake_build_opener(handler: object) -> object: + captured_handlers.append(handler) + return FakeOpener() + + with patch("urllib.request.build_opener", side_effect=fake_build_opener): + result = client.get_me() + + # build_opener was called once with a ProxyHandler + assert len(captured_handlers) == 1 + import urllib.request + + assert isinstance(captured_handlers[0], urllib.request.ProxyHandler) + assert result["username"] == "alice" + + +# ── Hooks ──────────────────────────────────────────────────────────── + + +class TestHooks: + def test_on_request_called(self) -> None: + client = _make_client() + calls: list = [] + client.on_request(lambda method, url, body: calls.append((method, url))) + + with patch("colony_sdk.client.urlopen", return_value=_mock_response({"id": "u1"})): + client.get_me() + + assert len(calls) == 1 + assert calls[0][0] == "GET" + assert "/users/me" in calls[0][1] + + def test_on_response_called(self) -> None: + client = _make_client() + calls: list = [] + client.on_response(lambda method, url, status, data: calls.append((method, status, data))) + + with patch("colony_sdk.client.urlopen", return_value=_mock_response({"id": "u1"})): + client.get_me() + + assert len(calls) == 1 + assert calls[0][1] == 200 + assert calls[0][2]["id"] == "u1" + + def test_multiple_hooks(self) -> None: + client = _make_client() + calls: list = [] + client.on_request(lambda m, u, b: calls.append("hook1")) + client.on_request(lambda m, u, b: calls.append("hook2")) + + with patch("colony_sdk.client.urlopen", return_value=_mock_response({})): + client.get_me() + + assert calls == ["hook1", "hook2"] + + +# ── Circuit Breaker ────────────────────────────────────────────────── + + +class TestCircuitBreaker: + def test_disabled_by_default(self) -> None: + client = _make_client() + assert client._circuit_breaker_threshold == 0 + + def test_enable(self) -> None: + client = _make_client() + client.enable_circuit_breaker(3) + assert client._circuit_breaker_threshold == 3 + assert client._consecutive_failures == 0 + + def test_opens_after_threshold(self) -> None: + client = _make_client() + client.enable_circuit_breaker(2) + client._consecutive_failures = 2 # Simulate 2 failures + + with pytest.raises(ColonyNetworkError, match="Circuit breaker open"): + client.get_me() + + def test_resets_on_success(self) -> None: + client = _make_client() + client.enable_circuit_breaker(5) + client._consecutive_failures = 3 + + with patch("colony_sdk.client.urlopen", return_value=_mock_response({"id": "u1"})): + client.get_me() + + assert client._consecutive_failures == 0 + + def test_disable(self) -> None: + client = _make_client() + client.enable_circuit_breaker(5) + client.enable_circuit_breaker(0) # Disable + client._consecutive_failures = 100 + + # Should not raise even with many failures + with patch("colony_sdk.client.urlopen", return_value=_mock_response({"id": "u1"})): + client.get_me() + + +# ── Cache ──────────────────────────────────────────────────────────── + + +class TestCache: + def test_disabled_by_default(self) -> None: + client = _make_client() + assert client._cache_ttl == 0 + + def test_enable(self) -> None: + client = _make_client() + client.enable_cache(30.0) + assert client._cache_ttl == 30.0 + + def test_get_cached(self) -> None: + client = _make_client() + client.enable_cache(60.0) + call_count = 0 + + def counting_urlopen(*args, **kwargs): + nonlocal call_count + call_count += 1 + return _mock_response({"id": "u1", "username": "agent"}) + + with patch("colony_sdk.client.urlopen", side_effect=counting_urlopen): + result1 = client.get_me() + result2 = client.get_me() # Should be cached + + assert call_count == 1 # Only one real call + assert result1 == result2 + + def test_write_invalidates_cache(self) -> None: + client = _make_client() + client.enable_cache(60.0) + call_count = 0 + + def counting_urlopen(*args, **kwargs): + nonlocal call_count + call_count += 1 + return _mock_response({"id": "p1"}) + + with patch("colony_sdk.client.urlopen", side_effect=counting_urlopen): + client.get_me() # Cached + client.create_post("Title", "Body") # Invalidates cache + client.get_me() # Must fetch again + + assert call_count == 3 + + def test_clear_cache(self) -> None: + client = _make_client() + client.enable_cache(60.0) + + with patch("colony_sdk.client.urlopen", return_value=_mock_response({"id": "u1"})): + client.get_me() + + assert len(client._cache) > 0 + client.clear_cache() + assert len(client._cache) == 0 + + def test_expired_cache_refetches(self) -> None: + client = _make_client() + client.enable_cache(0.01) # 10ms TTL + call_count = 0 + + def counting_urlopen(*args, **kwargs): + nonlocal call_count + call_count += 1 + return _mock_response({"id": "u1"}) + + with patch("colony_sdk.client.urlopen", side_effect=counting_urlopen): + client.get_me() + time.sleep(0.02) # Wait for TTL to expire + client.get_me() # Should refetch + + assert call_count == 2 + + +# ── Batch Helpers ──────────────────────────────────────────────────── + + +class TestBatchHelpers: + def test_get_posts_by_ids(self) -> None: + client = _make_client() + responses = [ + _mock_response({"id": "p1", "title": "Post 1", "body": "B1"}), + _mock_response({"id": "p2", "title": "Post 2", "body": "B2"}), + ] + + with patch("colony_sdk.client.urlopen", side_effect=responses): + results = client.get_posts_by_ids(["p1", "p2"]) + + assert len(results) == 2 + assert results[0]["id"] == "p1" + assert results[1]["id"] == "p2" + + def test_get_posts_by_ids_skips_404(self) -> None: + from urllib.error import HTTPError + + client = _make_client() + + def side_effect(*args, **kwargs): + req = args[0] + if "p2" in req.full_url: + err = HTTPError(req.full_url, 404, "Not Found", {}, None) # type: ignore[arg-type] + err.read = lambda: b'{"detail": "Not found"}' # type: ignore[assignment] + raise err + return _mock_response({"id": "p1", "title": "Post 1", "body": "B1"}) + + with patch("colony_sdk.client.urlopen", side_effect=side_effect): + results = client.get_posts_by_ids(["p1", "p2"]) + + assert len(results) == 1 + + def test_get_users_by_ids(self) -> None: + client = _make_client() + responses = [ + _mock_response({"id": "u1", "username": "alice"}), + _mock_response({"id": "u2", "username": "bob"}), + ] + + with patch("colony_sdk.client.urlopen", side_effect=responses): + results = client.get_users_by_ids(["u1", "u2"]) + + assert len(results) == 2 + assert results[0]["username"] == "alice" + + def test_get_users_by_ids_skips_404(self) -> None: + from urllib.error import HTTPError + + client = _make_client() + + def side_effect(*args, **kwargs): + req = args[0] + if "u2" in req.full_url: + err = HTTPError(req.full_url, 404, "Not Found", {}, None) # type: ignore[arg-type] + err.read = lambda: b'{"detail": "Not found"}' # type: ignore[assignment] + raise err + return _mock_response({"id": "u1", "username": "alice"}) + + with patch("colony_sdk.client.urlopen", side_effect=side_effect): + results = client.get_users_by_ids(["u1", "u2"]) + + assert len(results) == 1 + + +# ── Idempotency ────────────────────────────────────────────────────── + + +class TestIdempotency: + def test_idempotency_key_sent_on_post(self) -> None: + client = _make_client() + captured_headers: dict = {} + + def capture_urlopen(req, **kwargs): + captured_headers.update(dict(req.headers)) + return _mock_response({"id": "p1"}) + + with patch("colony_sdk.client.urlopen", side_effect=capture_urlopen): + client._raw_request("POST", "/posts", body={"title": "T"}, idempotency_key="key-123") + + assert captured_headers.get("X-idempotency-key") == "key-123" + + def test_idempotency_key_not_sent_on_get(self) -> None: + client = _make_client() + captured_headers: dict = {} + + def capture_urlopen(req, **kwargs): + captured_headers.update(dict(req.headers)) + return _mock_response({"id": "u1"}) + + with patch("colony_sdk.client.urlopen", side_effect=capture_urlopen): + client._raw_request("GET", "/users/me", idempotency_key="key-123") + + assert "X-idempotency-key" not in captured_headers + + +# ── py.typed ───────────────────────────────────────────────────────── + + +class TestAsyncCircuitBreaker: + def test_enable(self) -> None: + from colony_sdk import AsyncColonyClient + + client = AsyncColonyClient("col_test") + client.enable_circuit_breaker(3) + assert client._circuit_breaker_threshold == 3 + + @pytest.mark.asyncio + async def test_opens_after_threshold(self) -> None: + from colony_sdk import AsyncColonyClient + + client = AsyncColonyClient("col_test") + client.enable_circuit_breaker(2) + client._consecutive_failures = 2 + + with pytest.raises(ColonyNetworkError, match="Circuit breaker open"): + await client.get_me() + + +class TestAsyncHooks: + def test_register_hooks(self) -> None: + from colony_sdk import AsyncColonyClient + + client = AsyncColonyClient("col_test") + calls: list = [] + client.on_request(lambda m, u, b: calls.append("req")) + client.on_response(lambda m, u, s, d: calls.append("resp")) + assert len(client._on_request) == 1 + assert len(client._on_response) == 1 + + @pytest.mark.asyncio + async def test_on_request_called(self) -> None: + import httpx + + from colony_sdk import AsyncColonyClient + + async def mock_handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(200, json={"id": "u1", "username": "alice"}) + + transport = httpx.MockTransport(mock_handler) + calls: list = [] + async with AsyncColonyClient("col_test", client=httpx.AsyncClient(transport=transport)) as client: + client._token = "fake" + client._token_expiry = 9999999999 + client.on_request(lambda m, u, b: calls.append((m, u))) + await client.get_me() + + assert len(calls) == 1 + assert calls[0][0] == "GET" + assert "/users/me" in calls[0][1] + + @pytest.mark.asyncio + async def test_on_response_called(self) -> None: + import httpx + + from colony_sdk import AsyncColonyClient + + async def mock_handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(200, json={"id": "u1", "username": "alice"}) + + transport = httpx.MockTransport(mock_handler) + calls: list = [] + async with AsyncColonyClient("col_test", client=httpx.AsyncClient(transport=transport)) as client: + client._token = "fake" + client._token_expiry = 9999999999 + client.on_response(lambda m, u, s, d: calls.append((m, s, d))) + await client.get_me() + + assert len(calls) == 1 + assert calls[0][1] == 200 + assert calls[0][2]["username"] == "alice" + + +class TestAsyncBatchHelpers: + @pytest.mark.asyncio + async def test_get_posts_by_ids(self) -> None: + import httpx + + from colony_sdk import AsyncColonyClient + + async def mock_handler(request: httpx.Request) -> httpx.Response: + if "p1" in str(request.url): + return httpx.Response(200, json={"id": "p1", "title": "Post 1"}) + return httpx.Response(200, json={"id": "p2", "title": "Post 2"}) + + transport = httpx.MockTransport(mock_handler) + async with AsyncColonyClient("col_test", client=httpx.AsyncClient(transport=transport)) as client: + client._token = "fake" + client._token_expiry = 9999999999 + results = await client.get_posts_by_ids(["p1", "p2"]) + + assert len(results) == 2 + + @pytest.mark.asyncio + async def test_get_posts_by_ids_skips_404(self) -> None: + import httpx + + from colony_sdk import AsyncColonyClient, RetryConfig + + async def mock_handler(request: httpx.Request) -> httpx.Response: + if "p2" in str(request.url): + return httpx.Response(404, json={"detail": "Not found"}) + return httpx.Response(200, json={"id": "p1", "title": "Post 1"}) + + transport = httpx.MockTransport(mock_handler) + async with AsyncColonyClient( + "col_test", + client=httpx.AsyncClient(transport=transport), + retry=RetryConfig(max_retries=0), + ) as client: + client._token = "fake" + client._token_expiry = 9999999999 + results = await client.get_posts_by_ids(["p1", "p2"]) + + assert len(results) == 1 + assert results[0]["id"] == "p1" + + @pytest.mark.asyncio + async def test_get_users_by_ids_skips_404(self) -> None: + import httpx + + from colony_sdk import AsyncColonyClient + + async def mock_handler(request: httpx.Request) -> httpx.Response: + if "u2" in str(request.url): + return httpx.Response(404, json={"detail": "Not found"}) + return httpx.Response(200, json={"id": "u1", "username": "alice"}) + + from colony_sdk import RetryConfig + + transport = httpx.MockTransport(mock_handler) + async with AsyncColonyClient( + "col_test", + client=httpx.AsyncClient(transport=transport), + retry=RetryConfig(max_retries=0), + ) as client: + client._token = "fake" + client._token_expiry = 9999999999 + results = await client.get_users_by_ids(["u1", "u2"]) + + assert len(results) == 1 + assert results[0]["username"] == "alice" + + +class TestPyTyped: + def test_marker_exists(self) -> None: + import importlib.resources + + # py.typed should be accessible as a package resource + files = importlib.resources.files("colony_sdk") + py_typed = files / "py.typed" + assert py_typed.is_file()