Merged
19 changes: 19 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,24 @@
# Changelog

## 0.4.4 — 11.5.2026.

### Fixed
- Await `async def` task callables submitted through `AsyncClient` instead of
trying to cache the coroutine object.
- Include immutable referenced global values, including globals referenced only
inside nested code objects, in function hashes so global constants invalidate
cached results when changed.
- Resolve nested `ResultRef` / `AsyncResultRef` values inside containers and
record deduplicated input refs in commit metadata.
- Preserve tuple subclasses while resolving refs and keep dict/frozenset
resolution from creating unhashable container members.
- Preserve awaitable objects returned by task functions instead of awaiting
returned values a second time.
- Include stable immutable built-in globals such as `range`, `slice`, and
`datetime` values in function hashes.
- Raise a clear `cashet[redis]` install error when `RedisStore` or
`AsyncRedisStore` is imported from a base install without the Redis extra.

## 0.4.3 — 1.5.2026.

### Fixed
8 changes: 5 additions & 3 deletions README.md
@@ -426,7 +426,7 @@ async def main():
max_workers=1, # max parallelism for submit_many (default: 1, sequential)
)

def square(x: int) -> int:
async def square(x: int) -> int:
return x * x

ref = await client.submit(square, 5)
@@ -436,7 +436,7 @@ async def main():
asyncio.run(main())
```

`AsyncClient` mirrors `Client` — `submit()`, `submit_many()`, `log()`, `show()`, `get()`, `stats()`, `gc()`, `rm()`, `clear()`, `serve()` are all `async def`. `submit()` returns `AsyncResultRef` with `async load()`. Chain tasks by passing `AsyncResultRef` as an argument.
`AsyncClient` mirrors `Client` — `submit()`, `submit_many()`, `log()`, `show()`, `get()`, `stats()`, `gc()`, `rm()`, `clear()`, `serve()` are all `async def`. `submit()` accepts both sync and async callables, and returns `AsyncResultRef` with `async load()`. Chain tasks by passing `AsyncResultRef` as an argument.

### HTTP Server

@@ -849,6 +849,7 @@ A lazy reference to a stored result. Pass it as an argument to chain tasks:
```python
step1 = client.submit(func_a, input_data)
step2 = client.submit(func_b, step1) # step1 auto-resolves to its output
step3 = client.submit(func_c, {"payload": step2}) # nested refs resolve too
```

`ResultRef` is generic — `submit()` infers the return type from the function annotation:
@@ -940,8 +941,9 @@ client.submit(func, arg1, arg2)

**Key design decisions:**

- **Closure variables are not hashed** and emit a `ClosureWarning` if present. Function identity is source code, defaults, keyword defaults, and referenced helper functions; not arbitrary runtime state. If you need cache invalidation based on a value, pass it as an explicit argument.
- **Closure variables are not hashed** and emit a `ClosureWarning` if present. Function identity is source code, defaults, keyword defaults, immutable referenced globals, and referenced helper functions; not arbitrary runtime state. If you need cache invalidation based on a mutable value, pass it as an explicit argument.
- **Referenced user-defined helper functions are hashed recursively.** If your cached function calls or references a helper from your own project (via `co_names` / `globals`), that helper's source is included in the cache key. Change the helper and the caller's cache invalidates. Builtin and stdlib functions are skipped. This behavior is automatic and invisible — no decorators or imports needed.
- **Nested refs resolve through containers.** `ResultRef` and `AsyncResultRef` values inside lists, tuples, sets, frozensets, and dicts are loaded before execution and recorded as commit input refs.
- **Blobs are deduplicated by content hash.** Identical results share one blob on disk.
- **Source is hashed as an AST.** Comments, docstrings, and whitespace changes don't invalidate the cache.
- **Custom object arguments include their class module and qualname** in the argument hash so same-named classes from different modules do not collide.
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "cashet"
version = "0.4.3"
version = "0.4.4"
description = "A Python memoization cache with Redis, async support, DAG pipelines, and an HTTP server"
readme = "README.md"
license = "MIT"
28 changes: 24 additions & 4 deletions src/cashet/__init__.py
@@ -1,5 +1,6 @@
import logging
import os
from typing import Any

from cashet.async_client import AsyncClient
from cashet.client import Client
@@ -17,10 +18,29 @@
from cashet.store import AsyncSQLiteStore, SQLiteStore

try:
from cashet.redis_store import AsyncRedisStore, RedisStore
except ImportError:
AsyncRedisStore = None # type: ignore[misc,assignment]
RedisStore = None # type: ignore[misc,assignment]
from cashet.redis_store import AsyncRedisStore as _AsyncRedisStore
from cashet.redis_store import RedisStore as _RedisStore
except ImportError as e:
_redis_import_error = e

class _UnavailableAsyncRedisStore:
def __init__(self, *args: object, **kwargs: object) -> None:
raise ImportError(
"AsyncRedisStore requires the redis extra. "
"Install it with `cashet[redis]`."
) from _redis_import_error

class _UnavailableRedisStore:
def __init__(self, *args: object, **kwargs: object) -> None:
raise ImportError(
"RedisStore requires the redis extra. Install it with `cashet[redis]`."
) from _redis_import_error

AsyncRedisStore: Any = _UnavailableAsyncRedisStore
RedisStore: Any = _UnavailableRedisStore
else:
AsyncRedisStore = _AsyncRedisStore
RedisStore = _RedisStore

_log_level = os.environ.get("CASHET_LOG", "").upper()
if _log_level in ("DEBUG", "INFO", "WARNING", "ERROR"):
115 changes: 104 additions & 11 deletions src/cashet/async_executor.py
@@ -7,7 +7,7 @@
import time
import traceback
import weakref
from collections.abc import AsyncGenerator
from collections.abc import AsyncGenerator, Hashable
from datetime import UTC, datetime, timedelta
from typing import Any

@@ -49,6 +49,42 @@ def _is_stale_claim(commit: Commit, ttl: timedelta) -> bool:
return datetime.now(UTC) - commit.claimed_at > ttl


def _contains_resolvable_ref(value: Any, visited: set[int] | None = None) -> bool:
if hasattr(value, "__cashet_ref__"):
return True
if visited is None:
visited = set()
value_id = id(value)
if value_id in visited:
return False
if isinstance(value, dict):
visited.add(value_id)
result = any(_contains_resolvable_ref(v, visited) for v in value.values())
visited.discard(value_id)
return result
if isinstance(value, list | tuple | set | frozenset):
visited.add(value_id)
result = any(_contains_resolvable_ref(item, visited) for item in value)
visited.discard(value_id)
return result
return False


def _rebuild_tuple(value: tuple[Any, ...], resolved_items: list[Any]) -> tuple[Any, ...]:
if all(original is resolved for original, resolved in zip(value, resolved_items, strict=True)):
return value
if type(value) is tuple:
return tuple(resolved_items)
try:
return type(value)(*resolved_items)
except TypeError:
pass
try:
return type(value)(resolved_items)
except TypeError:
return tuple(resolved_items)


class AsyncLocalExecutor:
def __init__(
self, running_ttl: timedelta | None = None, timeout: timedelta | None = None
@@ -201,15 +237,9 @@ async def _heartbeat() -> None:
try:
resolved_args = await self._resolve_args(args)
resolved_kwargs = await self._resolve_kwargs(kwargs)
if effective_timeout is not None:
result = await asyncio.wait_for(
asyncio.to_thread(func, *resolved_args, **resolved_kwargs),
timeout=effective_timeout.total_seconds(),
)
else:
result = await asyncio.to_thread(
func, *resolved_args, **resolved_kwargs
)
result = await self._call_task(
func, resolved_args, resolved_kwargs, effective_timeout
)
output_ref = await self._store_result(result, store, serializer)
commit.output_ref = output_ref
commit.status = TaskStatus.COMPLETED
@@ -260,14 +290,55 @@ async def _heartbeat() -> None:

return commit

async def _resolve_value(self, value: Any) -> Any:
async def _resolve_value(self, value: Any, memo: dict[int, Any] | None = None) -> Any:
async_load = getattr(value, "__cashet_async_load__", None)
if async_load is not None:
return await async_load()
if hasattr(value, "__cashet_ref__") and hasattr(value, "load"):
if inspect.iscoroutinefunction(value.load):
return await value.load()
return await asyncio.to_thread(value.load)
if memo is None:
memo = {}
if not _contains_resolvable_ref(value):
return value
value_id = id(value)
if value_id in memo:
return memo[value_id]
if isinstance(value, dict):
dict_result: dict[Any, Any] = {}
memo[value_id] = dict_result
for k, v in value.items():
dict_result[k] = await self._resolve_value(v, memo)
return dict_result
if isinstance(value, list):
list_result: list[Any] = []
memo[value_id] = list_result
for item in value:
list_result.append(await self._resolve_value(item, memo))
return list_result
if isinstance(value, tuple):
resolved_items = [await self._resolve_value(item, memo) for item in value]
tuple_result = _rebuild_tuple(value, resolved_items)
memo[value_id] = tuple_result
return tuple_result
if isinstance(value, set):
set_result: set[Any] = set()
memo[value_id] = set_result
for item in value:
resolved_item = await self._resolve_value(item, memo)
set_result.add(resolved_item if isinstance(resolved_item, Hashable) else item)
return set_result
if isinstance(value, frozenset):
resolved_items = []
for item in value:
resolved_item = await self._resolve_value(item, memo)
resolved_items.append(
resolved_item if isinstance(resolved_item, Hashable) else item
)
frozenset_result = frozenset(resolved_items)
memo[value_id] = frozenset_result
return frozenset_result
Comment on lines +332 to +341

P2 (Medium): frozenset resolution may fail with unhashable items

When resolving items inside a frozenset, a resolved value may be unhashable (e.g., a list from a ref), causing a TypeError when constructing the frozenset. Add a hashability check before creating the frozenset.

Suggested change
if isinstance(value, frozenset):
resolved_items = [await self._resolve_value(item, memo) for item in value]
frozenset_result = frozenset(resolved_items)
memo[value_id] = frozenset_result
return frozenset_result
if isinstance(value, frozenset):
resolved_items = [await self._resolve_value(item, memo) for item in value]
for item in resolved_items:
try:
hash(item)
except TypeError:
raise TypeError(
f"Cannot resolve frozenset item {item!r} because it is not hashable"
)
frozenset_result = frozenset(resolved_items)
memo[value_id] = frozenset_result
return frozenset_result
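
For comparison, the committed change above falls back to the unresolved item instead of raising. Below is a minimal synchronous sketch of that fallback, not the executor's actual code: `FakeRef` and `resolve_frozenset` are hypothetical names introduced here, and the real method is async and recursive.

```python
from collections.abc import Hashable
from typing import Any


class FakeRef:
    """Hypothetical stand-in for a cashet ref; not the library's class."""

    def __init__(self, value: Any) -> None:
        self._value = value

    def load(self) -> Any:
        return self._value


def resolve_frozenset(items: frozenset[Any]) -> frozenset[Any]:
    resolved = []
    for item in items:
        loaded = item.load() if isinstance(item, FakeRef) else item
        # Keep the unresolved item when the loaded value cannot be a set member.
        resolved.append(loaded if isinstance(loaded, Hashable) else item)
    return frozenset(resolved)


hashable_ref = FakeRef(42)
unhashable_ref = FakeRef([1, 2, 3])  # loads to a list, which is unhashable
result = resolve_frozenset(frozenset({hashable_ref, unhashable_ref, "x"}))
```

One caveat: `isinstance(x, Hashable)` only checks that the type defines `__hash__`, so a tuple containing a list passes the check but still raises `TypeError` when hashed.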

return value

async def _resolve_args(self, args: tuple[Any, ...]) -> tuple[Any, ...]:
@@ -280,6 +351,28 @@ async def _resolve_kwargs(self, kwargs: dict[str, Any]) -> dict[str, Any]:
resolved[k] = await self._resolve_value(v)
return resolved

async def _call_task(
self,
func: Any,
args: tuple[Any, ...],
kwargs: dict[str, Any],
timeout: timedelta | None,
) -> Any:
if timeout is None:
return await self._call_task_body(func, args, kwargs)
async with asyncio.timeout(timeout.total_seconds()):
return await self._call_task_body(func, args, kwargs)

async def _call_task_body(
self,
func: Any,
args: tuple[Any, ...],
kwargs: dict[str, Any],
) -> Any:
if inspect.iscoroutinefunction(func):
return await func(*args, **kwargs)
return await asyncio.to_thread(func, *args, **kwargs)

async def _store_result(
self, result: Any, store: AsyncStore, serializer: Serializer
) -> ObjectRef:
36 changes: 31 additions & 5 deletions src/cashet/dag.py
@@ -18,15 +18,41 @@ def __init__(self, key: int | str) -> None:
self.key = key


def _collect_input_refs(value: Any, refs: list[ObjectRef], visited: set[int]) -> None:
if hasattr(value, "__cashet_ref__"):
refs.append(value.__cashet_ref__())
return
value_id = id(value)
if value_id in visited:
return
if isinstance(value, dict):
visited.add(value_id)
for key, val in value.items():
_collect_input_refs(key, refs, visited)
_collect_input_refs(val, refs, visited)
visited.discard(value_id)
elif isinstance(value, list | tuple | set | frozenset):
visited.add(value_id)
for item in value:
_collect_input_refs(item, refs, visited)
visited.discard(value_id)


def resolve_input_refs(args: tuple[Any, ...], kwargs: dict[str, Any]) -> list[ObjectRef]:
refs: list[ObjectRef] = []
visited: set[int] = set()
for arg in args:
if hasattr(arg, "__cashet_ref__"):
refs.append(arg.__cashet_ref__())
_collect_input_refs(arg, refs, visited)
for val in kwargs.values():
if hasattr(val, "__cashet_ref__"):
refs.append(val.__cashet_ref__())
return refs
_collect_input_refs(val, refs, visited)

P2 (Medium): Input refs are not deduplicated, causing commit hash variability

The _collect_input_refs function appends refs without deduplication. If the same ref appears multiple times across nested containers, the commit hash changes unnecessarily because the duplicate hashes are included. Deduplicate refs by their hash before returning.

Suggested change
_collect_input_refs(val, refs, visited)
return list({ref.hash: ref for ref in refs}.values())
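
The suggested one-liner and an explicit loop can be compared in isolation. This is a small sketch assuming only that a ref exposes a string `hash` attribute; `FakeObjectRef` and `dedupe_by_hash` are hypothetical stand-ins, not cashet types.

```python
from dataclasses import dataclass


@dataclass
class FakeObjectRef:
    """Hypothetical stand-in for ObjectRef; only the `hash` field is assumed."""

    hash: str


def dedupe_by_hash(refs: list[FakeObjectRef]) -> list[FakeObjectRef]:
    seen: set[str] = set()
    unique: list[FakeObjectRef] = []
    for ref in refs:
        if ref.hash in seen:
            continue  # drop later duplicates, keep the first occurrence
        seen.add(ref.hash)
        unique.append(ref)
    return unique


refs = [FakeObjectRef("a"), FakeObjectRef("b"), FakeObjectRef("a")]
unique = dedupe_by_hash(refs)
# Dict-based variant from the suggestion; dicts preserve insertion order.
dict_based = list({ref.hash: ref for ref in refs}.values())
```

Both variants keep first-seen order. The dict version keeps the last ref object seen for each hash, while the loop keeps the first, which only matters if refs with equal hashes can differ in other fields.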

unique_refs: list[ObjectRef] = []
seen: set[str] = set()
for ref in refs:
if ref.hash in seen:
continue
seen.add(ref.hash)
unique_refs.append(ref)
return unique_refs


class AsyncResultRef(Generic[T]):