From 59b11bd5f9a87bfdc1b22329e03fd8fe8b723ac9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Jolovi=C4=87?= Date: Mon, 11 May 2026 01:51:50 +0200 Subject: [PATCH 01/16] fix: support async task callables --- src/cashet/async_executor.py | 38 +++++++++++++++++++++++++++--------- tests/test_async_client.py | 7 +++++++ 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/src/cashet/async_executor.py b/src/cashet/async_executor.py index f37c545..45aabe0 100644 --- a/src/cashet/async_executor.py +++ b/src/cashet/async_executor.py @@ -201,15 +201,9 @@ async def _heartbeat() -> None: try: resolved_args = await self._resolve_args(args) resolved_kwargs = await self._resolve_kwargs(kwargs) - if effective_timeout is not None: - result = await asyncio.wait_for( - asyncio.to_thread(func, *resolved_args, **resolved_kwargs), - timeout=effective_timeout.total_seconds(), - ) - else: - result = await asyncio.to_thread( - func, *resolved_args, **resolved_kwargs - ) + result = await self._call_task( + func, resolved_args, resolved_kwargs, effective_timeout + ) output_ref = await self._store_result(result, store, serializer) commit.output_ref = output_ref commit.status = TaskStatus.COMPLETED @@ -280,6 +274,32 @@ async def _resolve_kwargs(self, kwargs: dict[str, Any]) -> dict[str, Any]: resolved[k] = await self._resolve_value(v) return resolved + async def _call_task( + self, + func: Any, + args: tuple[Any, ...], + kwargs: dict[str, Any], + timeout: timedelta | None, + ) -> Any: + if timeout is None: + return await self._call_task_body(func, args, kwargs) + async with asyncio.timeout(timeout.total_seconds()): + return await self._call_task_body(func, args, kwargs) + + async def _call_task_body( + self, + func: Any, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + if inspect.iscoroutinefunction(func): + result = await func(*args, **kwargs) + else: + result = await asyncio.to_thread(func, *args, **kwargs) + if inspect.isawaitable(result): + return await result + return result + async def _store_result( self, result: Any, store: AsyncStore, serializer: Serializer ) -> ObjectRef: diff --git a/tests/test_async_client.py b/tests/test_async_client.py index 04da39a..086eb17 100644 --- a/tests/test_async_client.py +++ b/tests/test_async_client.py @@ -47,6 +47,13 @@ def greet(name: str, greeting: str = "hello") -> str: ref = await async_client.submit(greet, "world", greeting="hi") assert await ref.load() == "hi, world" + async def test_submit_async_function(self, async_client: AsyncClient) -> None: + async def double(x: int) -> int: + return x * 2 + + ref = await async_client.submit(double, 21) + assert await ref.load() == 42 + async def test_async_result_ref_chaining(self, async_client: AsyncClient) -> None: def step1() -> int: return 10 From a197b52c47e6ec95492f245a534954e7adc72a00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Jolovi=C4=87?= Date: Mon, 11 May 2026 01:53:18 +0200 Subject: [PATCH 02/16] fix: include immutable globals in function hashes --- src/cashet/hashing.py | 26 +++++++++++++++++++++++++- tests/test_hashing.py | 13 +++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/src/cashet/hashing.py b/src/cashet/hashing.py index fa00f4b..a24b1b8 100644 --- a/src/cashet/hashing.py +++ b/src/cashet/hashing.py @@ -258,6 +258,25 @@ def _is_user_function(func: types.FunctionType) -> bool: return not _is_stdlib_or_site_path(mod_file) +_HASHED_GLOBAL_TYPES = (type(None), bool, int, float, str, bytes, complex) + + +def _should_hash_global_value(obj: Any, visited: set[int] | None = None) -> bool: + if isinstance(obj, _HASHED_GLOBAL_TYPES): + return True + if visited is None: + visited = set() + obj_id = id(obj) + if obj_id in visited: + return False + if isinstance(obj, tuple | frozenset): + visited.add(obj_id) + result = all(_should_hash_global_value(item, visited) for item in obj) + visited.discard(obj_id) + return result + return False + + def hash_function( func: types.FunctionType, include_deps: bool = True, @@ -299,11 +318,16 @@ def hash_function( except ValueError: pass for name in sorted(func.__code__.co_names): - ref = func.__globals__.get(name) + if name not in func.__globals__: + continue + ref = func.__globals__[name] if isinstance(ref, types.FunctionType) and _is_user_function(ref): dep_hash = hash_function(ref, include_deps=False, visited=visited) if dep_hash: h.update(f"{name}:{dep_hash}".encode()) + elif _should_hash_global_value(ref): + h.update(f"".encode()) + _stable_hash(ref, h) if non_func_closures: names = ", ".join(non_func_closures) warnings.warn( diff --git a/tests/test_hashing.py b/tests/test_hashing.py index 5f2933d..694204a 100644 --- a/tests/test_hashing.py +++ b/tests/test_hashing.py @@ -407,6 +407,19 @@ def test_exec_function_invalidates_on_global_name_change(self, client: Client) - assert ref1.load() == 1 assert ref2.load() == 2 + def test_exec_function_invalidates_on_global_value_change(self, client: Client) -> None: + namespace: dict[str, Any] = {"MULTIPLIER": 2} + exec("def f(x):\n return x * MULTIPLIER", namespace) + func = namespace["f"] + ref1 = client.submit(func, 10) + + namespace["MULTIPLIER"] = 3 + ref2 = client.submit(func, 10) + + assert ref1.hash != ref2.hash + assert ref1.load() == 20 + assert ref2.load() == 30 + def test_lambda_hashes_by_bytecode(self, client: Client) -> None: f = lambda x: x * 3 # noqa: E731 ref1 = client.submit(f, 4) From 8c72e9122f1e278660138ac7a3923a1e1d1f6fac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Jolovi=C4=87?= Date: Mon, 11 May 2026 01:54:46 +0200 Subject: [PATCH 03/16] fix: resolve nested result refs --- src/cashet/async_executor.py | 35 ++++++++++++++++++++++++++++++++++- src/cashet/dag.py | 27 +++++++++++++++++++++++---- tests/test_async_client.py | 14 ++++++++++++++ tests/test_core.py | 15 +++++++++++++++ 4 files changed, 86 insertions(+), 5 deletions(-) diff --git a/src/cashet/async_executor.py b/src/cashet/async_executor.py index 45aabe0..ca52fde 100644 --- a/src/cashet/async_executor.py +++ b/src/cashet/async_executor.py @@ -254,7 +254,7 @@ async def _heartbeat() -> None: return commit - async def _resolve_value(self, value: Any) -> Any: + async def _resolve_value(self, value: Any, visited: set[int] | None = None) -> Any: async_load = getattr(value, "__cashet_async_load__", None) if async_load is not None: return await async_load() @@ -262,6 +262,39 @@ async def _resolve_value(self, value: Any) -> Any: if inspect.iscoroutinefunction(value.load): return await value.load() return await asyncio.to_thread(value.load) + if visited is None: + visited = set() + value_id = id(value) + if value_id in visited: + return value + if isinstance(value, dict): + visited.add(value_id) + resolved = { + await self._resolve_value(k, visited): await self._resolve_value(v, visited) + for k, v in value.items() + } + visited.discard(value_id) + return resolved + if isinstance(value, list): + visited.add(value_id) + resolved = [await self._resolve_value(item, visited) for item in value] + visited.discard(value_id) + return resolved + if isinstance(value, tuple): + visited.add(value_id) + resolved = tuple(await self._resolve_value(item, visited) for item in value) + visited.discard(value_id) + return resolved + if isinstance(value, set): + visited.add(value_id) + resolved = {await self._resolve_value(item, visited) for item in value} + visited.discard(value_id) + return resolved + if isinstance(value, frozenset): + visited.add(value_id) + resolved = frozenset(await self._resolve_value(item, visited) for item in value) + visited.discard(value_id) + return resolved return value async def _resolve_args(self, args: tuple[Any, ...]) -> tuple[Any, ...]: diff --git a/src/cashet/dag.py b/src/cashet/dag.py index 45537f5..2918b48 100644 --- a/src/cashet/dag.py +++ b/src/cashet/dag.py @@ -18,14 +18,33 @@ def __init__(self, key: int | str) -> None: self.key = key +def _collect_input_refs(value: Any, refs: list[ObjectRef], visited: set[int]) -> None: + if hasattr(value, "__cashet_ref__"): + refs.append(value.__cashet_ref__()) + return + value_id = id(value) + if value_id in visited: + return + if isinstance(value, dict): + visited.add(value_id) + for key, val in value.items(): + _collect_input_refs(key, refs, visited) + _collect_input_refs(val, refs, visited) + visited.discard(value_id) + elif isinstance(value, list | tuple | set | frozenset): + visited.add(value_id) + for item in value: + _collect_input_refs(item, refs, visited) + visited.discard(value_id) + + def resolve_input_refs(args: tuple[Any, ...], kwargs: dict[str, Any]) -> list[ObjectRef]: refs: list[ObjectRef] = [] + visited: set[int] = set() for arg in args: - if hasattr(arg, "__cashet_ref__"): - refs.append(arg.__cashet_ref__()) + _collect_input_refs(arg, refs, visited) for val in kwargs.values(): - if hasattr(val, "__cashet_ref__"): - refs.append(val.__cashet_ref__()) + _collect_input_refs(val, refs, visited) return refs diff --git a/tests/test_async_client.py b/tests/test_async_client.py index 086eb17..d446e9a 100644 --- a/tests/test_async_client.py +++ b/tests/test_async_client.py @@ -68,6 +68,20 @@ def step2(x: int) -> int: assert commit is not None assert [ref.hash for ref in commit.input_refs] == [r1.hash] + async def test_nested_async_result_ref_chaining(self, async_client: AsyncClient) -> None: + def step1() -> int: + return 10 + + def step2(payload: dict[str, int]) -> int: + return payload["value"] * 3 + + r1 = await async_client.submit(step1) + r2 = await async_client.submit(step2, {"value": r1}) + assert await r2.load() == 30 + commit = await async_client.show(r2.commit_hash) + assert commit is not None + assert [ref.hash for ref in commit.input_refs] == [r1.hash] + class TestAsyncClientSubmitMany: async def test_submit_many_basic(self, async_client: AsyncClient) -> None: diff --git a/tests/test_core.py b/tests/test_core.py index 0ee4ed4..877082d 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -223,6 +223,21 @@ def double(data: list[int]) -> list[int]: result_ref = client.submit(double, data_ref) assert result_ref.load() == [2, 4, 6] + def test_nested_result_ref_as_input(self, client: Client) -> None: + def gen_data() -> list[int]: + return [1, 2, 3] + + def double(payload: dict[str, list[int]]) -> list[int]: + return [x * 2 for x in payload["data"]] + + data_ref = client.submit(gen_data) + result_ref = client.submit(double, {"data": data_ref}) + assert result_ref.load() == [2, 4, 6] + + commit = client.show(result_ref.commit_hash) + assert commit is not None + assert [ref.hash for ref in commit.input_refs] == [data_ref.hash] + def test_chained_pipeline(self, client: Client) -> None: def step1() -> int: return 10 From abac95dd49190033627073ea4da8ea5cddae9a44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Jolovi=C4=87?= Date: Mon, 11 May 2026 01:55:41 +0200 Subject: [PATCH 04/16] fix: raise helpful Redis extra errors --- src/cashet/__init__.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/cashet/__init__.py b/src/cashet/__init__.py index c1b29b2..baf223a 100644 --- a/src/cashet/__init__.py +++ b/src/cashet/__init__.py @@ -18,9 +18,21 @@ try: from cashet.redis_store import AsyncRedisStore, RedisStore -except ImportError: - AsyncRedisStore = None # type: ignore[misc,assignment] - RedisStore = None # type: ignore[misc,assignment] +except ImportError as e: + _redis_import_error = e + + class AsyncRedisStore: # type: ignore[no-redef] + def __init__(self, *args: object, **kwargs: object) -> None: + raise ImportError( + "AsyncRedisStore requires the redis extra. " + "Install it with `cashet[redis]`." + ) from _redis_import_error + + class RedisStore: # type: ignore[no-redef] + def __init__(self, *args: object, **kwargs: object) -> None: + raise ImportError( + "RedisStore requires the redis extra. Install it with `cashet[redis]`." + ) from _redis_import_error _log_level = os.environ.get("CASHET_LOG", "").upper() if _log_level in ("DEBUG", "INFO", "WARNING", "ERROR"): From ddc51a5094f31a5e69e2dc28f508beae34a03c45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Jolovi=C4=87?= Date: Mon, 11 May 2026 01:58:11 +0200 Subject: [PATCH 05/16] fix: preserve recursive containers while resolving refs --- src/cashet/async_executor.py | 56 ++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/src/cashet/async_executor.py b/src/cashet/async_executor.py index ca52fde..6a925fb 100644 --- a/src/cashet/async_executor.py +++ b/src/cashet/async_executor.py @@ -254,7 +254,7 @@ async def _heartbeat() -> None: return commit - async def _resolve_value(self, value: Any, visited: set[int] | None = None) -> Any: + async def _resolve_value(self, value: Any, memo: dict[int, Any] | None = None) -> Any: async_load = getattr(value, "__cashet_async_load__", None) if async_load is not None: return await async_load() @@ -262,39 +262,39 @@ async def _resolve_value(self, value: Any, visited: set[int] | None = None) -> A if inspect.iscoroutinefunction(value.load): return await value.load() return await asyncio.to_thread(value.load) - if visited is None: - visited = set() + if memo is None: + memo = {} value_id = id(value) - if value_id in visited: - return value + if value_id in memo: + return memo[value_id] if isinstance(value, dict): - visited.add(value_id) - resolved = { - await self._resolve_value(k, visited): await self._resolve_value(v, visited) - for k, v in value.items() - } - visited.discard(value_id) - return resolved + dict_result: dict[Any, Any] = {} + memo[value_id] = dict_result + for k, v in value.items(): + dict_result[await self._resolve_value(k, memo)] = await self._resolve_value(v, memo) + return dict_result if isinstance(value, list): - visited.add(value_id) - resolved = [await self._resolve_value(item, visited) for item in value] - visited.discard(value_id) - return resolved + list_result: list[Any] = [] + memo[value_id] = list_result + for item in value: + list_result.append(await self._resolve_value(item, memo)) + return list_result if isinstance(value, tuple): - visited.add(value_id) - resolved = tuple(await self._resolve_value(item, visited) for item in value) - visited.discard(value_id) - return resolved + resolved_items = [await self._resolve_value(item, memo) for item in value] + tuple_result = tuple(resolved_items) + memo[value_id] = tuple_result + return tuple_result if isinstance(value, set): - visited.add(value_id) - resolved = {await self._resolve_value(item, visited) for item in value} - visited.discard(value_id) - return resolved + set_result: set[Any] = set() + memo[value_id] = set_result + for item in value: + set_result.add(await self._resolve_value(item, memo)) + return set_result if isinstance(value, frozenset): - visited.add(value_id) - resolved = frozenset(await self._resolve_value(item, visited) for item in value) - visited.discard(value_id) - return resolved + resolved_items = [await self._resolve_value(item, memo) for item in value] + frozenset_result = frozenset(resolved_items) + memo[value_id] = frozenset_result + return frozenset_result return value async def _resolve_args(self, args: tuple[Any, ...]) -> tuple[Any, ...]: From 094f8ecad0c591dfc760afda53bef763b93b764c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Jolovi=C4=87?= Date: Mon, 11 May 2026 01:58:18 +0200 Subject: [PATCH 06/16] fix: type Redis extra fallback exports --- src/cashet/__init__.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/cashet/__init__.py b/src/cashet/__init__.py index baf223a..83a7cd4 100644 --- a/src/cashet/__init__.py +++ b/src/cashet/__init__.py @@ -1,5 +1,6 @@ import logging import os +from typing import Any from cashet.async_client import AsyncClient from cashet.client import Client @@ -17,23 +18,30 @@ from cashet.store import AsyncSQLiteStore, SQLiteStore try: - from cashet.redis_store import AsyncRedisStore, RedisStore + from cashet.redis_store import AsyncRedisStore as _AsyncRedisStore + from cashet.redis_store import RedisStore as _RedisStore except ImportError as e: _redis_import_error = e - class AsyncRedisStore: # type: ignore[no-redef] + class _UnavailableAsyncRedisStore: def __init__(self, *args: object, **kwargs: object) -> None: raise ImportError( "AsyncRedisStore requires the redis extra. " "Install it with `cashet[redis]`." ) from _redis_import_error - class RedisStore: # type: ignore[no-redef] + class _UnavailableRedisStore: def __init__(self, *args: object, **kwargs: object) -> None: raise ImportError( "RedisStore requires the redis extra. Install it with `cashet[redis]`." ) from _redis_import_error + AsyncRedisStore: Any = _UnavailableAsyncRedisStore + RedisStore: Any = _UnavailableRedisStore +else: + AsyncRedisStore = _AsyncRedisStore + RedisStore = _RedisStore + _log_level = os.environ.get("CASHET_LOG", "").upper() if _log_level in ("DEBUG", "INFO", "WARNING", "ERROR"): _handler = logging.StreamHandler() From 77f040e76fb9dd69f648f424791976e0d0722568 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Jolovi=C4=87?= Date: Mon, 11 May 2026 01:58:49 +0200 Subject: [PATCH 07/16] style: wrap nested ref resolver assignment --- src/cashet/async_executor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/cashet/async_executor.py b/src/cashet/async_executor.py index 6a925fb..16c0ba3 100644 --- a/src/cashet/async_executor.py +++ b/src/cashet/async_executor.py @@ -271,7 +271,8 @@ async def _resolve_value(self, value: Any, memo: dict[int, Any] | None = None) - dict_result: dict[Any, Any] = {} memo[value_id] = dict_result for k, v in value.items(): - dict_result[await self._resolve_value(k, memo)] = await self._resolve_value(v, memo) + key = await self._resolve_value(k, memo) + dict_result[key] = await self._resolve_value(v, memo) return dict_result if isinstance(value, list): list_result: list[Any] = [] From a74c4392ca17ae9bdada6eb6d877bd361309d98c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Jolovi=C4=87?= Date: Mon, 11 May 2026 01:59:49 +0200 Subject: [PATCH 08/16] chore: bump version to 0.4.4 --- CHANGELOG.md | 12 ++++++++++++ pyproject.toml | 2 +- uv.lock | 37 ++++++++++++++++++++++++++++++++++++- 3 files changed, 49 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c918165..01a3aaa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,17 @@ # Changelog +## 0.4.4 — 11.5.2026. + +### Fixed +- Await `async def` task callables submitted through `AsyncClient` instead of + trying to cache the coroutine object. +- Include immutable referenced global values in function hashes so global + constants invalidate cached results when changed. +- Resolve nested `ResultRef` / `AsyncResultRef` values inside containers and + record them as input refs in commit metadata. +- Raise a clear `cashet[redis]` install error when `RedisStore` or + `AsyncRedisStore` is imported from a base install without the Redis extra. + ## 0.4.3 — 1.5.2026. ### Fixed diff --git a/pyproject.toml b/pyproject.toml index 21c9f11..1819618 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "cashet" -version = "0.4.3" +version = "0.4.4" description = "A Python memoization cache with Redis, async support, DAG pipelines, and an HTTP server" readme = "README.md" license = "MIT" diff --git a/uv.lock b/uv.lock index 43e64f8..c084fac 100644 --- a/uv.lock +++ b/uv.lock @@ -26,7 +26,7 @@ wheels = [ [[package]] name = "cashet" -version = "0.4.0" +version = "0.4.4" source = { editable = "." } dependencies = [ { name = "click" }, @@ -46,6 +46,7 @@ server = [ [package.dev-dependencies] dev = [ + { name = "freezegun" }, { name = "httpx" }, { name = "pyright" }, { name = "pytest" }, @@ -68,6 +69,7 @@ provides-extras = ["redis", "server"] [package.metadata.requires-dev] dev = [ + { name = "freezegun", specifier = ">=1.0" }, { name = "httpx", specifier = ">=0.28.1" }, { name = "pyright", specifier = ">=1.1" }, { name = "pytest", specifier = ">=8.0" }, @@ -228,6 +230,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a4/a5/842ae8f0c08b61d6484b52f99a03510a3a72d23141942d216ebe81fefbce/filelock-3.25.2-py3-none-any.whl", hash = "sha256:ca8afb0da15f229774c9ad1b455ed96e85a81373065fb10446672f64444ddf70", size = 26759, upload-time = "2026-03-11T20:45:37.437Z" }, ] +[[package]] +name = "freezegun" +version = "1.5.5" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "python-dateutil" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/95/dd/23e2f4e357f8fd3bdff613c1fe4466d21bfb00a6177f238079b17f7b1c84/freezegun-1.5.5.tar.gz", hash = "sha256:ac7742a6cc6c25a2c35e9292dfd554b897b517d2dec26891a2e8debf205cb94a", size = 35914, upload-time = "2025-08-09T10:39:08.338Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5e/2e/b41d8a1a917d6581fc27a35d05561037b048e47df50f27f8ac9c7e27a710/freezegun-1.5.5-py3-none-any.whl", hash = "sha256:cd557f4a75cf074e84bc374249b9dd491eaeacd61376b9eb3c423282211619d2", size = 19266, upload-time = "2025-08-09T10:39:06.636Z" }, +] + [[package]] name = "h11" version = "0.16.0" @@ -396,6 +410,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/9d/7a/d968e294073affff457b041c2be9868a40c1c71f4a35fcc1e45e5493067b/pytest_cov-7.1.0-py3-none-any.whl", hash = "sha256:a0461110b7865f9a271aa1b51e516c9a95de9d696734a2f71e3e78f46e1d4678", size = 22876, upload-time = "2026-03-21T20:11:14.438Z" }, ] +[[package]] +name = "python-dateutil" +version = "2.9.0.post0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "six" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/66/c0/0c8b6ad9f17a802ee498c46e004a0eb49bc148f2fd230864601a86dcf6db/python-dateutil-2.9.0.post0.tar.gz", hash = "sha256:37dd54208da7e1cd875388217d5e00ebd4179249f90fb72437e91a35459a0ad3", size = 342432, upload-time = "2024-03-01T18:36:20.211Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ec/57/56b9bcc3c9c6a792fcbaf139543cee77261f3651ca9da0c93f5c1221264b/python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427", size = 229892, upload-time = "2024-03-01T18:36:18.57Z" }, +] + [[package]] name = "redis" version = "7.4.0" @@ -446,6 +472,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/58/ed/dea90a65b7d9e69888890fb14c90d7f51bf0c1e82ad800aeb0160e4bacfd/ruff-0.15.10-py3-none-win_arm64.whl", hash = "sha256:601d1610a9e1f1c2165a4f561eeaa2e2ea1e97f3287c5aa258d3dab8b57c6188", size = 11035607, upload-time = "2026-04-09T14:05:47.593Z" }, ] +[[package]] +name = "six" +version = "1.17.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/94/e7/b2c673351809dca68a0e064b6af791aa332cf192da575fd474ed7d6f16a2/six-1.17.0.tar.gz", hash = "sha256:ff70335d468e7eb6ec65b95b99d3a2836546063f63acc5171de367e834932a81", size = 34031, upload-time = "2024-12-04T17:35:28.174Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b7/ce/149a00dd41f10bc29e5921b496af8b574d8413afcd5e30dfa0ed46c2cc5e/six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274", size = 11050, upload-time = "2024-12-04T17:35:26.475Z" }, +] + [[package]] name = "starlette" version = "1.0.0" From af67a6666cd39a9469c72ad9ce1ba4e789740180 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Jolovi=C4=87?= Date: Mon, 11 May 2026 02:03:17 +0200 Subject: [PATCH 09/16] docs: document 0.4.4 behavior --- README.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index d9b1416..d18162d 100644 --- a/README.md +++ b/README.md @@ -426,7 +426,7 @@ async def main(): max_workers=1, # max parallelism for submit_many (default: 1, sequential) ) - def square(x: int) -> int: + async def square(x: int) -> int: return x * x ref = await client.submit(square, 5) @@ -436,7 +436,7 @@ async def main(): asyncio.run(main()) ``` -`AsyncClient` mirrors `Client` — `submit()`, `submit_many()`, `log()`, `show()`, `get()`, `stats()`, `gc()`, `rm()`, `clear()`, `serve()` are all `async def`. `submit()` returns `AsyncResultRef` with `async load()`. Chain tasks by passing `AsyncResultRef` as an argument. +`AsyncClient` mirrors `Client` — `submit()`, `submit_many()`, `log()`, `show()`, `get()`, `stats()`, `gc()`, `rm()`, `clear()`, `serve()` are all `async def`. `submit()` accepts both sync and async callables, and returns `AsyncResultRef` with `async load()`. Chain tasks by passing `AsyncResultRef` as an argument. ### HTTP Server @@ -849,6 +849,7 @@ A lazy reference to a stored result. Pass it as an argument to chain tasks: ```python step1 = client.submit(func_a, input_data) step2 = client.submit(func_b, step1) # step1 auto-resolves to its output +step3 = client.submit(func_c, {"payload": step2}) # nested refs resolve too ``` `ResultRef` is generic — `submit()` infers the return type from the function annotation: @@ -940,8 +941,9 @@ client.submit(func, arg1, arg2) **Key design decisions:** -- **Closure variables are not hashed** and emit a `ClosureWarning` if present. Function identity is source code, defaults, keyword defaults, and referenced helper functions; not arbitrary runtime state. If you need cache invalidation based on a value, pass it as an explicit argument. +- **Closure variables are not hashed** and emit a `ClosureWarning` if present. Function identity is source code, defaults, keyword defaults, immutable referenced globals, and referenced helper functions; not arbitrary runtime state. If you need cache invalidation based on a mutable value, pass it as an explicit argument. - **Referenced user-defined helper functions are hashed recursively.** If your cached function calls or references a helper from your own project (via `co_names` / `globals`), that helper's source is included in the cache key. Change the helper and the caller's cache invalidates. Builtin and stdlib functions are skipped. This behavior is automatic and invisible — no decorators or imports needed. +- **Nested refs resolve through containers.** `ResultRef` and `AsyncResultRef` values inside lists, tuples, sets, frozensets, and dicts are loaded before execution and recorded as commit input refs. - **Blobs are deduplicated by content hash.** Identical results share one blob on disk. - **Source is hashed as an AST.** Comments, docstrings, and whitespace changes don't invalidate the cache. - **Custom object arguments include their class module and qualname** in the argument hash so same-named classes from different modules do not collide. From b9477115926502041efa0fac33505a40a50cc688 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Jolovi=C4=87?= Date: Mon, 11 May 2026 02:29:46 +0200 Subject: [PATCH 10/16] fix: hash globals referenced by nested code --- src/cashet/hashing.py | 16 +++++++++++++++- tests/test_hashing.py | 13 +++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/src/cashet/hashing.py b/src/cashet/hashing.py index a24b1b8..6d2b7a6 100644 --- a/src/cashet/hashing.py +++ b/src/cashet/hashing.py @@ -277,6 +277,20 @@ def _should_hash_global_value(obj: Any, visited: set[int] | None = None) -> bool return False +def _code_names(code: types.CodeType, visited: set[int] | None = None) -> set[str]: + if visited is None: + visited = set() + code_id = id(code) + if code_id in visited: + return set() + visited.add(code_id) + names = set(code.co_names) + for const in code.co_consts: + if isinstance(const, types.CodeType): + names.update(_code_names(const, visited)) + return names + + def hash_function( func: types.FunctionType, include_deps: bool = True, @@ -317,7 +331,7 @@ def hash_function( non_func_closures.append(name) except ValueError: pass - for name in sorted(func.__code__.co_names): + for name in sorted(_code_names(func.__code__)): if name not in func.__globals__: continue ref = func.__globals__[name] diff --git a/tests/test_hashing.py b/tests/test_hashing.py index 694204a..8293410 100644 --- a/tests/test_hashing.py +++ b/tests/test_hashing.py @@ -420,6 +420,19 @@ def test_exec_function_invalidates_on_global_value_change(self, client: Client) assert ref1.load() == 20 assert ref2.load() == 30 + def test_comprehension_invalidates_on_global_value_change(self, client: Client) -> None: + namespace: dict[str, Any] = {"MULTIPLIER": 2} + exec("def f(xs):\n return [x * MULTIPLIER for x in xs]", namespace) + func = namespace["f"] + ref1 = client.submit(func, [10]) + + namespace["MULTIPLIER"] = 3 + ref2 = client.submit(func, [10]) + + assert ref1.hash != ref2.hash + assert ref1.load() == [20] + assert ref2.load() == [30] + def test_lambda_hashes_by_bytecode(self, client: Client) -> None: f = lambda x: x * 3 # noqa: E731 ref1 = client.submit(f, 4) From 2b2b29d00fae81e06aae02540e30fe17116ec1e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Jolovi=C4=87?= Date: Mon, 11 May 2026 02:30:56 +0200 Subject: [PATCH 11/16] fix: preserve hashable containers during ref resolution --- src/cashet/async_executor.py | 55 ++++++++++++++++++++++++++++++++---- tests/test_core.py | 32 ++++++++++++++++++++- 2 files changed, 80 insertions(+), 7 deletions(-) diff --git a/src/cashet/async_executor.py b/src/cashet/async_executor.py index 16c0ba3..898a9e4 100644 --- a/src/cashet/async_executor.py +++ b/src/cashet/async_executor.py @@ -7,7 +7,7 @@ import time import traceback import weakref -from collections.abc import AsyncGenerator +from collections.abc import AsyncGenerator, Hashable from datetime import UTC, datetime, timedelta from typing import Any @@ -49,6 +49,42 @@ def _is_stale_claim(commit: Commit, ttl: timedelta) -> bool: return datetime.now(UTC) - commit.claimed_at > ttl +def _contains_resolvable_ref(value: Any, visited: set[int] | None = None) -> bool: + if hasattr(value, "__cashet_ref__"): + return True + if visited is None: + visited = set() + value_id = id(value) + if value_id in visited: + return False + if isinstance(value, dict): + visited.add(value_id) + result = any(_contains_resolvable_ref(v, visited) for v in value.values()) + visited.discard(value_id) + return result + if isinstance(value, list | tuple | set | frozenset): + visited.add(value_id) + result = any(_contains_resolvable_ref(item, visited) for item in value) + visited.discard(value_id) + return result + return False + + +def _rebuild_tuple(value: tuple[Any, ...], resolved_items: list[Any]) -> tuple[Any, ...]: + if all(original is resolved for original, resolved in zip(value, resolved_items, strict=True)): + return value + if type(value) is tuple: + return tuple(resolved_items) + try: + return type(value)(*resolved_items) + except TypeError: + pass + try: + return type(value)(resolved_items) + except TypeError: + return tuple(resolved_items) + + class AsyncLocalExecutor: def __init__( self, running_ttl: timedelta | None = None, timeout: timedelta | None = None @@ -264,6 +300,8 @@ async def _resolve_value(self, value: Any, memo: dict[int, Any] | None = None) - return await asyncio.to_thread(value.load) if memo is None: memo = {} + if not _contains_resolvable_ref(value): + return value value_id = id(value) if value_id in memo: return memo[value_id] @@ -271,8 +309,7 @@ async def _resolve_value(self, value: Any, memo: dict[int, Any] | None = None) - dict_result: dict[Any, Any] = {} memo[value_id] = dict_result for k, v in value.items(): - key = await self._resolve_value(k, memo) - dict_result[key] = await self._resolve_value(v, memo) + dict_result[k] = await self._resolve_value(v, memo) return dict_result if isinstance(value, list): list_result: list[Any] = [] @@ -282,17 +319,23 @@ async def _resolve_value(self, value: Any, memo: dict[int, Any] | None = None) - return list_result if isinstance(value, tuple): resolved_items = [await self._resolve_value(item, memo) for item in value] - tuple_result = tuple(resolved_items) + tuple_result = _rebuild_tuple(value, resolved_items) memo[value_id] = tuple_result return tuple_result if isinstance(value, set): set_result: set[Any] = set() memo[value_id] = set_result for item in value: - set_result.add(await self._resolve_value(item, memo)) + resolved_item = await self._resolve_value(item, memo) + set_result.add(resolved_item if isinstance(resolved_item, Hashable) else item) return set_result if isinstance(value, frozenset): - resolved_items = [await self._resolve_value(item, memo) for item in value] + resolved_items = [] + for item in value: + resolved_item = await self._resolve_value(item, memo) + resolved_items.append( + resolved_item if isinstance(resolved_item, Hashable) else item + ) frozenset_result = frozenset(resolved_items) memo[value_id] = frozenset_result return frozenset_result diff --git a/tests/test_core.py b/tests/test_core.py index 877082d..7cfb668 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,7 +1,7 @@ from __future__ import annotations from pathlib import Path -from typing import Any +from typing import Any, NamedTuple import pytest @@ -238,6 +238,36 @@ def double(payload: dict[str, list[int]]) -> list[int]: assert commit is not None assert [ref.hash for ref in commit.input_refs] == [data_ref.hash] + def test_tuple_subclass_arg_preserved_without_refs(self, client: Client) -> None: + class Point(NamedTuple): + x: int + y: int + + def add_point(point: Point) -> int: + return point.x + point.y + + assert client.submit(add_point, Point(1, 2)).load() == 3 + + def test_ref_dict_key_not_resolved_to_unhashable_value(self, client: Client) -> None: + def gen_data() -> list[int]: + return [1, 2, 3] + + def count_keys(payload: dict[object, str]) -> int: + return len(payload) + + data_ref = client.submit(gen_data) + assert client.submit(count_keys, {data_ref: "value"}).load() == 1 + + def test_ref_in_frozenset_stays_hashable_if_result_is_not(self, client: Client) -> None: + def gen_data() -> list[int]: + return [1, 2, 3] + + def count_items(items: frozenset[object]) -> int: + return len(items) + + data_ref = client.submit(gen_data) + assert client.submit(count_items, frozenset({data_ref})).load() == 1 + def test_chained_pipeline(self, client: Client) -> None: def step1() -> int: return 10 From 6a0bf48da5bea904048d1ad89f3af4f3bd850b1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Jolovi=C4=87?= Date: Mon, 11 May 2026 02:31:36 +0200 Subject: [PATCH 12/16] fix: deduplicate nested input refs --- src/cashet/dag.py | 9 ++++++++- tests/test_core.py | 15 +++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/cashet/dag.py b/src/cashet/dag.py index 2918b48..1151f11 100644 --- a/src/cashet/dag.py +++ b/src/cashet/dag.py @@ -45,7 +45,14 @@ def resolve_input_refs(args: tuple[Any, ...], kwargs: dict[str, Any]) -> list[Ob _collect_input_refs(arg, refs, visited) for val in kwargs.values(): _collect_input_refs(val, refs, visited) - return refs + unique_refs: list[ObjectRef] = [] + seen: set[str] = set() + for ref in refs: + if ref.hash in seen: + continue + seen.add(ref.hash) + unique_refs.append(ref) + return unique_refs class AsyncResultRef(Generic[T]): diff --git a/tests/test_core.py b/tests/test_core.py index 7cfb668..840e8f8 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -268,6 +268,21 @@ def count_items(items: frozenset[object]) -> int: data_ref = client.submit(gen_data) assert client.submit(count_items, frozenset({data_ref})).load() == 1 + def test_duplicate_nested_input_refs_are_deduplicated(self, client: Client) -> None: + def gen_data() -> int: + return 10 + + def total(values: list[int]) -> int: + return sum(values) + + data_ref = client.submit(gen_data) + result_ref = client.submit(total, [data_ref, data_ref]) + assert result_ref.load() == 20 + + commit = client.show(result_ref.commit_hash) + assert commit is not None + assert [ref.hash for ref in commit.input_refs] == [data_ref.hash] + def test_chained_pipeline(self, client: Client) -> None: def step1() -> int: return 10 From 83d34f8245ba87181ccc9e0ba54ae00ee3e7397d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Jolovi=C4=87?= Date: Mon, 11 May 2026 02:45:25 +0200 Subject: [PATCH 13/16] docs: update 0.4.4 review fixes --- CHANGELOG.md | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 01a3aaa..689b55e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,10 +5,13 @@ ### Fixed - Await `async def` task callables submitted through `AsyncClient` instead of trying to cache the coroutine object. -- Include immutable referenced global values in function hashes so global - constants invalidate cached results when changed. +- Include immutable referenced global values, including globals referenced only + inside nested code objects, in function hashes so global constants invalidate + cached results when changed. - Resolve nested `ResultRef` / `AsyncResultRef` values inside containers and - record them as input refs in commit metadata. + record deduplicated input refs in commit metadata. +- Preserve tuple subclasses while resolving refs and keep dict/frozenset + resolution from creating unhashable container members. - Raise a clear `cashet[redis]` install error when `RedisStore` or `AsyncRedisStore` is imported from a base install without the Redis extra. From 87bc38a7a056c8c539faadc314a0cc5ed761e3d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Jolovi=C4=87?= Date: Mon, 11 May 2026 03:00:57 +0200 Subject: [PATCH 14/16] fix: preserve awaitable task return values --- src/cashet/async_executor.py | 8 ++------ tests/test_async_client.py | 37 ++++++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 6 deletions(-) diff --git a/src/cashet/async_executor.py b/src/cashet/async_executor.py index 898a9e4..8a975dd 100644 --- a/src/cashet/async_executor.py +++ b/src/cashet/async_executor.py @@ -370,12 +370,8 @@ async def _call_task_body( kwargs: dict[str, Any], ) -> Any: if inspect.iscoroutinefunction(func): - result = await func(*args, **kwargs) - else: - result = await asyncio.to_thread(func, *args, **kwargs) - if inspect.isawaitable(result): - return await result - return result + return await func(*args, **kwargs) + return await asyncio.to_thread(func, *args, **kwargs) async def _store_result( self, result: Any, store: AsyncStore, serializer: Serializer diff --git a/tests/test_async_client.py b/tests/test_async_client.py index d446e9a..5354838 100644 --- a/tests/test_async_client.py +++ b/tests/test_async_client.py @@ -1,6 +1,8 @@ from __future__ import annotations +from collections.abc import Generator from pathlib import Path +from typing import Any import pytest import pytest_asyncio @@ -9,6 +11,17 @@ from cashet.dag import AsyncResultRef, TaskRef +class DeferredResult: + def __init__(self, value: str) -> None: + self.value = value + + def __await__(self) -> Generator[Any, None, str]: + async def run() -> str: + return f"executed:{self.value}" + + return run().__await__() + + @pytest_asyncio.fixture async def async_client(tmp_path: Path) -> AsyncClient: return AsyncClient(store_dir=tmp_path / ".cashet") @@ -54,6 +67,30 @@ async def double(x: int) -> int: ref = await async_client.submit(double, 21) assert await ref.load() == 42 + async def test_submit_sync_function_returning_awaitable_value( + self, async_client: AsyncClient + ) -> None: + def returns_deferred() -> DeferredResult: + return DeferredResult("sync") + + ref = await async_client.submit(returns_deferred) + result = await ref.load() + + assert isinstance(result, DeferredResult) + assert result.value == "sync" + + async def test_submit_async_function_returning_awaitable_value( + self, async_client: AsyncClient + ) -> None: + async def returns_deferred() -> DeferredResult: + return DeferredResult("async") + + ref = await async_client.submit(returns_deferred) + result = await ref.load() + + assert isinstance(result, DeferredResult) + assert result.value == "async" + async def test_async_result_ref_chaining(self, async_client: AsyncClient) -> None: def step1() -> int: return 10 From a8345d40c16238880b2c2c7afa603c52db6b5dc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Jolovi=C4=87?= Date: Mon, 11 May 2026 03:01:39 +0200 Subject: [PATCH 15/16] fix: hash more immutable global constants --- src/cashet/hashing.py | 25 ++++++++++++++++++++++++- tests/test_hashing.py | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/src/cashet/hashing.py b/src/cashet/hashing.py index 6d2b7a6..5cfa771 100644 --- a/src/cashet/hashing.py +++ b/src/cashet/hashing.py @@ -1,6 +1,7 @@ from __future__ import annotations import ast +import datetime as _datetime import hashlib import inspect import io @@ -258,7 +259,21 @@ def _is_user_function(func: types.FunctionType) -> bool: return not _is_stdlib_or_site_path(mod_file) -_HASHED_GLOBAL_TYPES = (type(None), bool, int, float, str, bytes, complex) +_HASHED_GLOBAL_TYPES = ( + type(None), + bool, + int, + float, + str, + bytes, + complex, + range, + _datetime.date, + _datetime.datetime, + _datetime.time, + _datetime.timedelta, + _datetime.timezone, +) def _should_hash_global_value(obj: Any, visited: set[int] | None = None) -> bool: @@ -269,6 +284,14 @@ def _should_hash_global_value(obj: Any, visited: set[int] | None = None) -> bool obj_id = id(obj) if obj_id in visited: return False + if isinstance(obj, slice): + visited.add(obj_id) + result = all( + _should_hash_global_value(item, visited) + for item in (obj.start, obj.stop, obj.step) + ) + visited.discard(obj_id) + return result if isinstance(obj, tuple | frozenset): visited.add(obj_id) result = all(_should_hash_global_value(item, visited) for item in obj) diff --git a/tests/test_hashing.py b/tests/test_hashing.py index 8293410..23d2bd0 100644 --- a/tests/test_hashing.py +++ b/tests/test_hashing.py @@ -1,6 +1,7 @@ from __future__ import annotations import warnings +from datetime import date from typing import Any from cashet import Client @@ -433,6 +434,45 @@ def test_comprehension_invalidates_on_global_value_change(self, client: Client) assert ref1.load() == [20] assert ref2.load() == [30] + def test_exec_function_invalidates_on_range_global_change(self, client: Client) -> None: + namespace: dict[str, Any] = {"WINDOW": range(2)} + exec("def f():\n return list(WINDOW)", namespace) + func = namespace["f"] + ref1 = client.submit(func) + + namespace["WINDOW"] = range(3) + ref2 = client.submit(func) + + assert ref1.hash != ref2.hash + assert ref1.load() == [0, 1] + assert ref2.load() == [0, 1, 2] + + def test_exec_function_invalidates_on_slice_global_change(self, client: Client) -> None: + namespace: dict[str, Any] = {"PART": slice(0, 2)} + exec("def f(xs):\n return xs[PART]", namespace) + func = namespace["f"] + ref1 = client.submit(func, [1, 2, 3]) + + namespace["PART"] = slice(1, 3) + ref2 = client.submit(func, [1, 2, 3]) + + assert ref1.hash != ref2.hash + assert ref1.load() == [1, 2] + assert ref2.load() == [2, 3] + + def test_exec_function_invalidates_on_date_global_change(self, client: Client) -> None: + namespace: dict[str, Any] = {"START": date(2026, 5, 11)} + exec("def f():\n return START.isoformat()", namespace) + func = namespace["f"] + ref1 = client.submit(func) + + namespace["START"] = date(2026, 5, 12) + ref2 = client.submit(func) + + assert ref1.hash != ref2.hash + assert ref1.load() == "2026-05-11" + assert ref2.load() == "2026-05-12" + def test_lambda_hashes_by_bytecode(self, client: Client) -> None: f = lambda x: x * 3 # noqa: E731 ref1 = client.submit(f, 4) From 9f171e27b66837634a845e04c13bb8b2fb808c01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Jolovi=C4=87?= Date: Mon, 11 May 2026 03:01:57 +0200 Subject: [PATCH 16/16] docs: update 0.4.4 reviewer fixes --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 689b55e..d20d4da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,10 @@ record deduplicated input refs in commit metadata. - Preserve tuple subclasses while resolving refs and keep dict/frozenset resolution from creating unhashable container members. +- Preserve awaitable objects returned by task functions instead of awaiting + returned values a second time. +- Include stable immutable built-in globals such as `range`, `slice`, and + `datetime` values in function hashes. - Raise a clear `cashet[redis]` install error when `RedisStore` or `AsyncRedisStore` is imported from a base install without the Redis extra.