diff --git a/changes/3366.feature.md b/changes/3366.feature.md
new file mode 100644
index 0000000000..550c9f4436
--- /dev/null
+++ b/changes/3366.feature.md
@@ -0,0 +1 @@
+Adds `zarr.experimental.cache_store.CacheStore`, a `Store` that implements caching by combining two other `Store` instances. See the [docs page](https://zarr.readthedocs.io/en/latest/user-guide/experimental#cachestore) for more information about this feature.
\ No newline at end of file
diff --git a/docs/user-guide/experimental.md b/docs/user-guide/experimental.md
new file mode 100644
index 0000000000..aead2dedab
--- /dev/null
+++ b/docs/user-guide/experimental.md
@@ -0,0 +1,272 @@
+# Experimental features
+
+This section contains documentation for experimental Zarr Python features. The features described here are exciting and potentially useful, but also volatile -- we might change them at any time. Take this into account if you consider depending on these features.
+
+## `CacheStore`
+
+Zarr Python 3.1.4 adds `zarr.experimental.cache_store.CacheStore`, which provides a dual-store caching implementation
+that can be wrapped around any Zarr store to improve performance for repeated data access.
+This is particularly useful when working with remote stores (e.g., S3, HTTP) where network
+latency can significantly impact data access speed.
+
+The CacheStore implements a cache that uses a separate Store instance as the cache backend,
+providing persistent caching capabilities with time-based expiration, size-based eviction,
+and flexible cache storage options. It automatically evicts the least recently used items
+when the cache reaches its maximum size.
+
+Because the `CacheStore` uses an ordinary Zarr `Store` object as the caching layer, you can reuse the data stored in the cache later.
+
+> **Note:** The CacheStore is a wrapper store that maintains compatibility with the full
+> `zarr.abc.store.Store` API while adding transparent caching functionality.
+
+## Basic Usage
+
+Creating a CacheStore requires both a source store and a cache store. The cache store
+can be any Store implementation, providing flexibility in cache persistence:
+
+```python exec="true" session="experimental" source="above" result="ansi"
+import zarr
+from zarr.storage import LocalStore
+import numpy as np
+from tempfile import mkdtemp
+from zarr.experimental.cache_store import CacheStore
+
+# Create a local store and a separate cache store
+local_store_path = mkdtemp(suffix='.zarr')
+source_store = LocalStore(local_store_path)
+cache_store = zarr.storage.MemoryStore() # In-memory cache
+cached_store = CacheStore(
+    store=source_store,
+    cache_store=cache_store,
+    max_size=256*1024*1024 # 256MB cache
+)
+
+# Create an array using the cached store
+zarr_array = zarr.zeros((100, 100), chunks=(10, 10), dtype='f8', store=cached_store, mode='w')
+
+# Write some data to force chunk creation
+zarr_array[:] = np.random.random((100, 100))
+```
+
+The dual-store architecture allows you to use different store types for source and cache,
+such as a remote store for source data and a local store for persistent caching.
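+
+For example, you can pair a remote source store with an on-disk cache so that chunks fetched over the network are kept locally between sessions. The sketch below is not executed as part of the docs build and assumes an S3 bucket reachable through `zarr.storage.FsspecStore`; the bucket URL and the anonymous-access option are placeholders for your own configuration:
+
+```python
+from tempfile import mkdtemp
+
+import zarr
+from zarr.storage import FsspecStore, LocalStore
+from zarr.experimental.cache_store import CacheStore
+
+# Hypothetical remote, read-only source store (requires an fsspec backend such as s3fs)
+remote_store = FsspecStore.from_url(
+    "s3://example-bucket/data.zarr",
+    storage_options={"anon": True},
+    read_only=True,
+)
+
+# On-disk cache that persists between sessions
+persistent_cache = LocalStore(mkdtemp(suffix='-zarr-cache'))
+
+cached_store = CacheStore(
+    store=remote_store,
+    cache_store=persistent_cache,
+    max_size=512*1024*1024 # 512MB local cache
+)
+
+# Repeated reads of the same chunks are served from the local cache
+group = zarr.open_group(store=cached_store, mode='r')
+```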
+ +## Performance Benefits + +The CacheStore provides significant performance improvements for repeated data access: + +```python exec="true" session="experimental" source="above" result="ansi" +import time + +# Benchmark reading with cache +start = time.time() +for _ in range(100): + _ = zarr_array[:] +elapsed_cache = time.time() - start + +# Compare with direct store access (without cache) +zarr_array_nocache = zarr.open(local_store_path, mode='r') +start = time.time() +for _ in range(100): + _ = zarr_array_nocache[:] +elapsed_nocache = time.time() - start + +# Cache provides speedup for repeated access +speedup = elapsed_nocache / elapsed_cache +``` + +Cache effectiveness is particularly pronounced with repeated access to the same data chunks. + + +## Cache Configuration + +The CacheStore can be configured with several parameters: + +**max_size**: Controls the maximum size of cached data in bytes + +```python exec="true" session="experimental" source="above" result="ansi" +# 256MB cache with size limit +cache = CacheStore( + store=source_store, + cache_store=cache_store, + max_size=256*1024*1024 +) + +# Unlimited cache size (use with caution) +cache = CacheStore( + store=source_store, + cache_store=cache_store, + max_size=None +) +``` + +**max_age_seconds**: Controls time-based cache expiration + +```python exec="true" session="experimental" source="above" result="ansi" +# Cache expires after 1 hour +cache = CacheStore( + store=source_store, + cache_store=cache_store, + max_age_seconds=3600 +) + +# Cache never expires +cache = CacheStore( + store=source_store, + cache_store=cache_store, + max_age_seconds="infinity" +) +``` + +**cache_set_data**: Controls whether written data is cached + +```python exec="true" session="experimental" source="above" result="ansi" +# Cache data when writing (default) +cache = CacheStore( + store=source_store, + cache_store=cache_store, + cache_set_data=True +) + +# Don't cache written data (read-only cache) +cache = CacheStore( + store=source_store, + cache_store=cache_store, + cache_set_data=False +) +``` + +## Cache Statistics + +The CacheStore provides statistics to monitor cache performance and state: + +```python exec="true" session="experimental" source="above" result="ansi" +# Access some data to generate cache activity +data = zarr_array[0:50, 0:50] # First access - cache miss +data = zarr_array[0:50, 0:50] # Second access - cache hit + +# Get comprehensive cache information +info = cached_store.cache_info() +print(info['cache_store_type']) # e.g., 'MemoryStore' +print(info['max_age_seconds']) +print(info['max_size']) +print(info['current_size']) +print(info['tracked_keys']) +print(info['cached_keys']) +print(info['cache_set_data']) +``` + +The `cache_info()` method returns a dictionary with detailed information about the cache state. + +## Cache Management + +The CacheStore provides methods for manual cache management: + +```python exec="true" session="experimental" source="above" result="ansi" +# Clear all cached data and tracking information +import asyncio +asyncio.run(cached_store.clear_cache()) + +# Check cache info after clearing +info = cached_store.cache_info() +assert info['tracked_keys'] == 0 +assert info['current_size'] == 0 +``` + +The `clear_cache()` method is an async method that clears both the cache store +(if it supports the `clear` method) and all internal tracking data. + +## Best Practices + +1. **Choose appropriate cache store**: Use MemoryStore for fast temporary caching or LocalStore for persistent caching +2. 
**Size the cache appropriately**: Set `max_size` based on available storage and expected data access patterns +3. **Use with remote stores**: The cache provides the most benefit when wrapping slow remote stores +4. **Monitor cache statistics**: Use `cache_info()` to tune cache size and access patterns +5. **Consider data locality**: Group related data accesses together to improve cache efficiency +6. **Set appropriate expiration**: Use `max_age_seconds` for time-sensitive data or "infinity" for static data + +## Working with Different Store Types + +The CacheStore can wrap any store that implements the `zarr.abc.store.Store` interface +and use any store type for the cache backend: + +### Local Store with Memory Cache + +```python exec="true" session="experimental-memory-cache" source="above" result="ansi" +from zarr.storage import LocalStore, MemoryStore +from zarr.experimental.cache_store import CacheStore +from tempfile import mkdtemp + +local_store_path = mkdtemp(suffix='.zarr') +source_store = LocalStore(local_store_path) +cache_store = MemoryStore() +cached_store = CacheStore( + store=source_store, + cache_store=cache_store, + max_size=128*1024*1024 +) +``` + +### Memory Store with Persistent Cache + +```python exec="true" session="experimental-local-cache" source="above" result="ansi" +from tempfile import mkdtemp +from zarr.storage import MemoryStore, LocalStore +from zarr.experimental.cache_store import CacheStore + +memory_store = MemoryStore() +local_store_path = mkdtemp(suffix='.zarr') +persistent_cache = LocalStore(local_store_path) +cached_store = CacheStore( + store=memory_store, + cache_store=persistent_cache, + max_size=256*1024*1024 +) +``` + +The dual-store architecture provides flexibility in choosing the best combination +of source and cache stores for your specific use case. + +## Examples from Real Usage + +Here's a complete example demonstrating cache effectiveness: + +```python exec="true" session="experimental-final" source="above" result="ansi" +import numpy as np +import time +from tempfile import mkdtemp +import zarr +import zarr.storage +from zarr.experimental.cache_store import CacheStore + +# Create test data with dual-store cache +local_store_path = mkdtemp(suffix='.zarr') +source_store = zarr.storage.LocalStore(local_store_path) +cache_store = zarr.storage.MemoryStore() +cached_store = CacheStore( + store=source_store, + cache_store=cache_store, + max_size=256*1024*1024 +) +zarr_array = zarr.zeros((100, 100), chunks=(10, 10), dtype='f8', store=cached_store, mode='w') +zarr_array[:] = np.random.random((100, 100)) + +# Demonstrate cache effectiveness with repeated access +start = time.time() +data = zarr_array[20:30, 20:30] # First access (cache miss) +first_access = time.time() - start + +start = time.time() +data = zarr_array[20:30, 20:30] # Second access (cache hit) +second_access = time.time() - start + +# Check cache statistics +info = cached_store.cache_info() +assert info['cached_keys'] > 0 # Should have cached keys +assert info['current_size'] > 0 # Should have cached data +print(f"Cache contains {info['cached_keys']} keys with {info['current_size']} bytes") +``` + +This example shows how the CacheStore can significantly reduce access times for repeated +data reads, particularly important when working with remote data sources. The dual-store +architecture allows for flexible cache persistence and management. 
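+
+## Monitoring Cache Hits and Misses
+
+Besides `cache_info()`, the `CacheStore` also exposes a `cache_stats()` method reporting hit, miss, and eviction counters together with the overall hit rate. The sketch below (a standalone variant of the example above, not executed as part of the docs build) passes `cache_set_data=False` so that written data is not cached and the first read therefore registers as a miss:
+
+```python
+import numpy as np
+import zarr
+import zarr.storage
+from tempfile import mkdtemp
+from zarr.experimental.cache_store import CacheStore
+
+source_store = zarr.storage.LocalStore(mkdtemp(suffix='.zarr'))
+cached_store = CacheStore(
+    store=source_store,
+    cache_store=zarr.storage.MemoryStore(),
+    cache_set_data=False  # written data is not cached, so the first read is a miss
+)
+
+zarr_array = zarr.zeros((100, 100), chunks=(10, 10), dtype='f8', store=cached_store, mode='w')
+zarr_array[:] = np.random.random((100, 100))
+
+_ = zarr_array[0:10, 0:10]  # first read of these chunks: cache miss
+_ = zarr_array[0:10, 0:10]  # repeated read: served from the cache
+
+stats = cached_store.cache_stats()
+print(stats['hits'], stats['misses'], stats['evictions'])
+print(f"hit rate: {stats['hit_rate']:.2f}")
+```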
diff --git a/mkdocs.yml b/mkdocs.yml index 53b8eef7d4..4c7a1a4df2 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -25,6 +25,7 @@ nav: - user-guide/extending.md - user-guide/gpu.md - user-guide/consolidated_metadata.md + - user-guide/experimental.md - API Reference: - api/index.md - api/array.md diff --git a/pyproject.toml b/pyproject.toml index 6164f69382..d72eef9dbc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -399,6 +399,7 @@ filterwarnings = [ "ignore:Unclosed client session >> import zarr + >>> from zarr.storage import MemoryStore + >>> from zarr.experimental.cache_store import CacheStore + >>> + >>> # Create a cached store + >>> source_store = MemoryStore() + >>> cache_store = MemoryStore() + >>> cached_store = CacheStore( + ... store=source_store, + ... cache_store=cache_store, + ... max_age_seconds=60, + ... max_size=1024*1024 + ... ) + >>> + >>> # Use it like any other store + >>> array = zarr.create(shape=(100,), store=cached_store) + >>> array[:] = 42 + + """ + + _cache: Store + max_age_seconds: int | Literal["infinity"] + max_size: int | None + key_insert_times: dict[str, float] + cache_set_data: bool + _cache_order: OrderedDict[str, None] # Track access order for LRU + _current_size: int # Track current cache size + _key_sizes: dict[str, int] # Track size of each cached key + _lock: asyncio.Lock + _hits: int # Cache hit counter + _misses: int # Cache miss counter + _evictions: int # Cache eviction counter + + def __init__( + self, + store: Store, + *, + cache_store: Store, + max_age_seconds: int | str = "infinity", + max_size: int | None = None, + key_insert_times: dict[str, float] | None = None, + cache_set_data: bool = True, + ) -> None: + super().__init__(store) + + if not cache_store.supports_deletes: + msg = ( + f"The provided cache store {cache_store} does not support deletes. " + "The cache_store must support deletes for CacheStore to function properly." + ) + raise ValueError(msg) + + self._cache = cache_store + # Validate and set max_age_seconds + if isinstance(max_age_seconds, str): + if max_age_seconds != "infinity": + raise ValueError("max_age_seconds string value must be 'infinity'") + self.max_age_seconds = "infinity" + else: + self.max_age_seconds = max_age_seconds + self.max_size = max_size + if key_insert_times is None: + self.key_insert_times = {} + else: + self.key_insert_times = key_insert_times + + self.cache_set_data = cache_set_data + self._cache_order = OrderedDict() + self._current_size = 0 + self._key_sizes = {} + self._lock = asyncio.Lock() + self._hits = 0 + self._misses = 0 + self._evictions = 0 + + def _is_key_fresh(self, key: str) -> bool: + """Check if a cached key is still fresh based on max_age_seconds. + + Uses monotonic time for accurate elapsed time measurement. + """ + if self.max_age_seconds == "infinity": + return True + now = time.monotonic() + elapsed = now - self.key_insert_times.get(key, 0) + return elapsed < self.max_age_seconds + + async def _accommodate_value(self, value_size: int) -> None: + """Ensure there is enough space in the cache for a new value. + + Must be called while holding self._lock. + """ + if self.max_size is None: + return + + # Remove least recently used items until we have enough space + while self._current_size + value_size > self.max_size and self._cache_order: + # Get the least recently used key (first in OrderedDict) + lru_key = next(iter(self._cache_order)) + await self._evict_key(lru_key) + + async def _evict_key(self, key: str) -> None: + """Evict a key from the cache. 
+ + Must be called while holding self._lock. + Updates size tracking atomically with deletion. + """ + try: + key_size = self._key_sizes.get(key, 0) + + # Delete from cache store + await self._cache.delete(key) + + # Update tracking after successful deletion + self._remove_from_tracking(key) + self._current_size = max(0, self._current_size - key_size) + self._evictions += 1 + + logger.debug("_evict_key: evicted key %s, freed %d bytes", key, key_size) + except Exception: + logger.exception("_evict_key: failed to evict key %s", key) + raise # Re-raise to signal eviction failure + + async def _cache_value(self, key: str, value: Buffer) -> None: + """Cache a value with size tracking. + + This method holds the lock for the entire operation to ensure atomicity. + """ + value_size = len(value) + + # Check if value exceeds max size + if self.max_size is not None and value_size > self.max_size: + logger.warning( + "_cache_value: value size %d exceeds max_size %d, skipping cache", + value_size, + self.max_size, + ) + return + + async with self._lock: + # If key already exists, subtract old size first + if key in self._key_sizes: + old_size = self._key_sizes[key] + self._current_size -= old_size + logger.debug("_cache_value: updating existing key %s, old size %d", key, old_size) + + # Make room for the new value (this calls _evict_key_locked internally) + await self._accommodate_value(value_size) + + # Update tracking atomically + self._cache_order[key] = None # OrderedDict to track access order + self._current_size += value_size + self._key_sizes[key] = value_size + self.key_insert_times[key] = time.monotonic() + + logger.debug("_cache_value: cached key %s with size %d bytes", key, value_size) + + async def _update_access_order(self, key: str) -> None: + """Update the access order for LRU tracking.""" + if key in self._cache_order: + async with self._lock: + # Move to end (most recently used) + self._cache_order.move_to_end(key) + + def _remove_from_tracking(self, key: str) -> None: + """Remove a key from all tracking structures. + + Must be called while holding self._lock. 
+ """ + self._cache_order.pop(key, None) + self.key_insert_times.pop(key, None) + self._key_sizes.pop(key, None) + + async def _get_try_cache( + self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None + ) -> Buffer | None: + """Try to get data from cache first, falling back to source store.""" + maybe_cached_result = await self._cache.get(key, prototype, byte_range) + if maybe_cached_result is not None: + logger.debug("_get_try_cache: key %s found in cache (HIT)", key) + self._hits += 1 + # Update access order for LRU + await self._update_access_order(key) + return maybe_cached_result + else: + logger.debug( + "_get_try_cache: key %s not found in cache (MISS), fetching from store", key + ) + self._misses += 1 + maybe_fresh_result = await super().get(key, prototype, byte_range) + if maybe_fresh_result is None: + # Key doesn't exist in source store + await self._cache.delete(key) + async with self._lock: + self._remove_from_tracking(key) + else: + # Cache the newly fetched value + await self._cache.set(key, maybe_fresh_result) + await self._cache_value(key, maybe_fresh_result) + return maybe_fresh_result + + async def _get_no_cache( + self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None + ) -> Buffer | None: + """Get data directly from source store and update cache.""" + self._misses += 1 + maybe_fresh_result = await super().get(key, prototype, byte_range) + if maybe_fresh_result is None: + # Key doesn't exist in source, remove from cache and tracking + await self._cache.delete(key) + async with self._lock: + self._remove_from_tracking(key) + else: + logger.debug("_get_no_cache: key %s found in store, setting in cache", key) + await self._cache.set(key, maybe_fresh_result) + await self._cache_value(key, maybe_fresh_result) + return maybe_fresh_result + + async def get( + self, + key: str, + prototype: BufferPrototype, + byte_range: ByteRequest | None = None, + ) -> Buffer | None: + """ + Retrieve data from the store, using cache when appropriate. + + Parameters + ---------- + key : str + The key to retrieve + prototype : BufferPrototype + Buffer prototype for creating the result buffer + byte_range : ByteRequest, optional + Byte range to retrieve + + Returns + ------- + Buffer | None + The retrieved data, or None if not found + """ + if not self._is_key_fresh(key): + logger.debug("get: key %s is not fresh, fetching from store", key) + return await self._get_no_cache(key, prototype, byte_range) + else: + logger.debug("get: key %s is fresh, trying cache", key) + return await self._get_try_cache(key, prototype, byte_range) + + async def set(self, key: str, value: Buffer) -> None: + """ + Store data in the underlying store and optionally in cache. + + Parameters + ---------- + key : str + The key to store under + value : Buffer + The data to store + """ + logger.debug("set: setting key %s in store", key) + await super().set(key, value) + if self.cache_set_data: + logger.debug("set: setting key %s in cache", key) + await self._cache.set(key, value) + await self._cache_value(key, value) + else: + logger.debug("set: deleting key %s from cache", key) + await self._cache.delete(key) + async with self._lock: + self._remove_from_tracking(key) + + async def delete(self, key: str) -> None: + """ + Delete data from both the underlying store and cache. 
+ + Parameters + ---------- + key : str + The key to delete + """ + logger.debug("delete: deleting key %s from store", key) + await super().delete(key) + logger.debug("delete: deleting key %s from cache", key) + await self._cache.delete(key) + async with self._lock: + self._remove_from_tracking(key) + + def cache_info(self) -> dict[str, Any]: + """Return information about the cache state.""" + return { + "cache_store_type": type(self._cache).__name__, + "max_age_seconds": "infinity" + if self.max_age_seconds == "infinity" + else self.max_age_seconds, + "max_size": self.max_size, + "current_size": self._current_size, + "cache_set_data": self.cache_set_data, + "tracked_keys": len(self.key_insert_times), + "cached_keys": len(self._cache_order), + } + + def cache_stats(self) -> dict[str, Any]: + """Return cache performance statistics.""" + total_requests = self._hits + self._misses + hit_rate = self._hits / total_requests if total_requests > 0 else 0.0 + return { + "hits": self._hits, + "misses": self._misses, + "evictions": self._evictions, + "total_requests": total_requests, + "hit_rate": hit_rate, + } + + async def clear_cache(self) -> None: + """Clear all cached data and tracking information.""" + # Clear the cache store if it supports clear + if hasattr(self._cache, "clear"): + await self._cache.clear() + + # Reset tracking + async with self._lock: + self.key_insert_times.clear() + self._cache_order.clear() + self._key_sizes.clear() + self._current_size = 0 + logger.debug("clear_cache: cleared all cache data") + + def __repr__(self) -> str: + """Return string representation of the cache store.""" + return ( + f"{self.__class__.__name__}(" + f"store={self._store!r}, " + f"cache_store={self._cache!r}, " + f"max_age_seconds={self.max_age_seconds}, " + f"max_size={self.max_size}, " + f"current_size={self._current_size}, " + f"cached_keys={len(self._cache_order)})" + ) diff --git a/tests/test_experimental/test_cache_store.py b/tests/test_experimental/test_cache_store.py new file mode 100644 index 0000000000..d4a45f78f1 --- /dev/null +++ b/tests/test_experimental/test_cache_store.py @@ -0,0 +1,864 @@ +""" +Tests for the dual-store cache implementation. 
+""" + +import asyncio +import time + +import pytest + +from zarr.abc.store import Store +from zarr.core.buffer.core import default_buffer_prototype +from zarr.core.buffer.cpu import Buffer as CPUBuffer +from zarr.experimental.cache_store import CacheStore +from zarr.storage import MemoryStore + + +class TestCacheStore: + """Test the dual-store cache implementation.""" + + @pytest.fixture + def source_store(self) -> MemoryStore: + """Create a source store with some test data.""" + return MemoryStore() + + @pytest.fixture + def cache_store(self) -> MemoryStore: + """Create an empty cache store.""" + return MemoryStore() + + @pytest.fixture + def cached_store(self, source_store: Store, cache_store: Store) -> CacheStore: + """Create a cached store instance.""" + return CacheStore(source_store, cache_store=cache_store, key_insert_times={}) + + async def test_basic_caching(self, cached_store: CacheStore, source_store: Store) -> None: + """Test basic cache functionality.""" + # Store some data + test_data = CPUBuffer.from_bytes(b"test data") + await cached_store.set("test_key", test_data) + + # Verify it's in both stores + assert await source_store.exists("test_key") + assert await cached_store._cache.exists("test_key") + + # Retrieve and verify caching works + result = await cached_store.get("test_key", default_buffer_prototype()) + assert result is not None + assert result.to_bytes() == b"test data" + + async def test_cache_miss_and_population( + self, cached_store: CacheStore, source_store: Store + ) -> None: + """Test cache miss and subsequent population.""" + # Put data directly in source store (bypassing cache) + test_data = CPUBuffer.from_bytes(b"source data") + await source_store.set("source_key", test_data) + + # First access should miss cache but populate it + result = await cached_store.get("source_key", default_buffer_prototype()) + assert result is not None + assert result.to_bytes() == b"source data" + + # Verify data is now in cache + assert await cached_store._cache.exists("source_key") + + async def test_cache_expiration(self) -> None: + """Test cache expiration based on max_age_seconds.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore( + source_store, + cache_store=cache_store, + max_age_seconds=1, # 1 second expiration + key_insert_times={}, + ) + + # Store data + test_data = CPUBuffer.from_bytes(b"expiring data") + await cached_store.set("expire_key", test_data) + + # Should be fresh initially (if _is_key_fresh method exists) + if hasattr(cached_store, "_is_key_fresh"): + assert cached_store._is_key_fresh("expire_key") + + # Wait for expiration + await asyncio.sleep(1.1) + + # Should now be stale + assert not cached_store._is_key_fresh("expire_key") + else: + # Skip freshness check if method doesn't exist + await asyncio.sleep(1.1) + # Just verify the data is still accessible + result = await cached_store.get("expire_key", default_buffer_prototype()) + assert result is not None + + async def test_cache_set_data_false(self, source_store: Store, cache_store: Store) -> None: + """Test behavior when cache_set_data=False.""" + cached_store = CacheStore( + source_store, cache_store=cache_store, cache_set_data=False, key_insert_times={} + ) + + test_data = CPUBuffer.from_bytes(b"no cache data") + await cached_store.set("no_cache_key", test_data) + + # Data should be in source but not cache + assert await source_store.exists("no_cache_key") + assert not await cache_store.exists("no_cache_key") + + async def 
test_delete_removes_from_both_stores(self, cached_store: CacheStore) -> None: + """Test that delete removes from both source and cache.""" + test_data = CPUBuffer.from_bytes(b"delete me") + await cached_store.set("delete_key", test_data) + + # Verify in both stores + assert await cached_store._store.exists("delete_key") + assert await cached_store._cache.exists("delete_key") + + # Delete + await cached_store.delete("delete_key") + + # Verify removed from both + assert not await cached_store._store.exists("delete_key") + assert not await cached_store._cache.exists("delete_key") + + async def test_exists_checks_source_store( + self, cached_store: CacheStore, source_store: Store + ) -> None: + """Test that exists() checks the source store (source of truth).""" + # Put data directly in source + test_data = CPUBuffer.from_bytes(b"exists test") + await source_store.set("exists_key", test_data) + + # Should exist even though not in cache + assert await cached_store.exists("exists_key") + + async def test_list_operations(self, cached_store: CacheStore, source_store: Store) -> None: + """Test listing operations delegate to source store.""" + # Add some test data + test_data = CPUBuffer.from_bytes(b"list test") + await cached_store.set("list/item1", test_data) + await cached_store.set("list/item2", test_data) + await cached_store.set("other/item3", test_data) + + # Test list_dir + list_items = [key async for key in cached_store.list_dir("list/")] + assert len(list_items) >= 2 # Should include our items + + # Test list_prefix + prefix_items = [key async for key in cached_store.list_prefix("list/")] + assert len(prefix_items) >= 2 + + async def test_stale_cache_refresh(self) -> None: + """Test that stale cache entries are refreshed from source.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore( + source_store, cache_store=cache_store, max_age_seconds=1, key_insert_times={} + ) + + # Store initial data + old_data = CPUBuffer.from_bytes(b"old data") + await cached_store.set("refresh_key", old_data) + + # Wait for expiration + await asyncio.sleep(1.1) + + # Update source store directly (simulating external update) + new_data = CPUBuffer.from_bytes(b"new data") + await source_store.set("refresh_key", new_data) + + # Access should refresh from source when cache is stale + result = await cached_store.get("refresh_key", default_buffer_prototype()) + assert result is not None + assert result.to_bytes() == b"new data" + + async def test_infinity_max_age(self, cached_store: CacheStore) -> None: + """Test that 'infinity' max_age means cache never expires.""" + # Skip test if _is_key_fresh method doesn't exist + if not hasattr(cached_store, "_is_key_fresh"): + pytest.skip("_is_key_fresh method not implemented") + + test_data = CPUBuffer.from_bytes(b"eternal data") + await cached_store.set("eternal_key", test_data) + + # Should always be fresh + assert cached_store._is_key_fresh("eternal_key") + + # Even after time passes + await asyncio.sleep(0.1) + assert cached_store._is_key_fresh("eternal_key") + + async def test_cache_returns_cached_data_for_performance( + self, cached_store: CacheStore, source_store: Store + ) -> None: + """Test that cache returns cached data for performance, even if not in source.""" + # Skip test if key_insert_times attribute doesn't exist + if not hasattr(cached_store, "key_insert_times"): + pytest.skip("key_insert_times attribute not implemented") + + # Put data in cache but not source (simulates orphaned cache entry) + test_data = 
CPUBuffer.from_bytes(b"orphaned data") + await cached_store._cache.set("orphan_key", test_data) + cached_store.key_insert_times["orphan_key"] = time.monotonic() + + # Cache should return data for performance (no source verification) + result = await cached_store.get("orphan_key", default_buffer_prototype()) + assert result is not None + assert result.to_bytes() == b"orphaned data" + + # Cache entry should remain (performance optimization) + assert await cached_store._cache.exists("orphan_key") + assert "orphan_key" in cached_store.key_insert_times + + async def test_cache_coherency_through_expiration(self) -> None: + """Test that cache coherency is managed through cache expiration, not source verification.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore( + source_store, + cache_store=cache_store, + max_age_seconds=1, # Short expiration for coherency + ) + + # Add data to both stores + test_data = CPUBuffer.from_bytes(b"original data") + await cached_store.set("coherency_key", test_data) + + # Remove from source (simulating external deletion) + await source_store.delete("coherency_key") + + # Cache should still return cached data (performance optimization) + result = await cached_store.get("coherency_key", default_buffer_prototype()) + assert result is not None + assert result.to_bytes() == b"original data" + + # Wait for cache expiration + await asyncio.sleep(1.1) + + # Now stale cache should be refreshed from source + result = await cached_store.get("coherency_key", default_buffer_prototype()) + assert result is None # Key no longer exists in source + + async def test_cache_info(self, cached_store: CacheStore) -> None: + """Test cache_info method returns correct information.""" + # Test initial state + info = cached_store.cache_info() + + # Check all expected keys are present + expected_keys = { + "cache_store_type", + "max_age_seconds", + "max_size", + "current_size", + "cache_set_data", + "tracked_keys", + "cached_keys", + } + assert set(info.keys()) == expected_keys + + # Check initial values + assert info["cache_store_type"] == "MemoryStore" + assert info["max_age_seconds"] == "infinity" + assert info["max_size"] is None # Default unlimited + assert info["current_size"] == 0 + assert info["cache_set_data"] is True + assert info["tracked_keys"] == 0 + assert info["cached_keys"] == 0 + + # Add some data and verify tracking + test_data = CPUBuffer.from_bytes(b"test data for cache info") + await cached_store.set("info_test_key", test_data) + + # Check updated info + updated_info = cached_store.cache_info() + assert updated_info["tracked_keys"] == 1 + assert updated_info["cached_keys"] == 1 + assert updated_info["current_size"] > 0 # Should have some size now + + async def test_cache_info_with_max_size(self) -> None: + """Test cache_info with max_size configuration.""" + source_store = MemoryStore() + cache_store = MemoryStore() + + # Create cache with specific max_size and max_age + cached_store = CacheStore( + source_store, + cache_store=cache_store, + max_size=1024, + max_age_seconds=300, + key_insert_times={}, + ) + + info = cached_store.cache_info() + assert info["max_size"] == 1024 + assert info["max_age_seconds"] == 300 + assert info["current_size"] == 0 + + async def test_clear_cache(self, cached_store: CacheStore) -> None: + """Test clear_cache method clears all cache data and tracking.""" + # Add some test data + test_data1 = CPUBuffer.from_bytes(b"test data 1") + test_data2 = CPUBuffer.from_bytes(b"test data 2") + + await 
cached_store.set("clear_test_1", test_data1) + await cached_store.set("clear_test_2", test_data2) + + # Verify data is cached + info_before = cached_store.cache_info() + assert info_before["tracked_keys"] == 2 + assert info_before["cached_keys"] == 2 + assert info_before["current_size"] > 0 + + # Verify data exists in cache + assert await cached_store._cache.exists("clear_test_1") + assert await cached_store._cache.exists("clear_test_2") + + # Clear the cache + await cached_store.clear_cache() + + # Verify cache is cleared + info_after = cached_store.cache_info() + assert info_after["tracked_keys"] == 0 + assert info_after["cached_keys"] == 0 + assert info_after["current_size"] == 0 + + # Verify data is removed from cache store (if it supports clear) + if hasattr(cached_store._cache, "clear"): + # If cache store supports clear, all data should be gone + assert not await cached_store._cache.exists("clear_test_1") + assert not await cached_store._cache.exists("clear_test_2") + + # Verify data still exists in source store + assert await cached_store._store.exists("clear_test_1") + assert await cached_store._store.exists("clear_test_2") + + async def test_max_age_infinity(self) -> None: + """Test cache with infinite max age.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore(source_store, cache_store=cache_store, max_age_seconds="infinity") + + # Add data and verify it never expires + test_data = CPUBuffer.from_bytes(b"test data") + await cached_store.set("test_key", test_data) + + # Even after time passes, key should be fresh + assert cached_store._is_key_fresh("test_key") + + async def test_max_age_numeric(self) -> None: + """Test cache with numeric max age.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore( + source_store, + cache_store=cache_store, + max_age_seconds=1, # 1 second + ) + + # Add data + test_data = CPUBuffer.from_bytes(b"test data") + await cached_store.set("test_key", test_data) + + # Key should be fresh initially + assert cached_store._is_key_fresh("test_key") + + # Manually set old timestamp to test expiration + cached_store.key_insert_times["test_key"] = time.monotonic() - 2 # 2 seconds ago + + # Key should now be stale + assert not cached_store._is_key_fresh("test_key") + + async def test_cache_set_data_disabled(self) -> None: + """Test cache behavior when cache_set_data is False.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore(source_store, cache_store=cache_store, cache_set_data=False) + + # Set data + test_data = CPUBuffer.from_bytes(b"test data") + await cached_store.set("test_key", test_data) + + # Data should be in source but not in cache + assert await source_store.exists("test_key") + assert not await cache_store.exists("test_key") + + # Cache info should show no cached data + info = cached_store.cache_info() + assert info["cache_set_data"] is False + assert info["cached_keys"] == 0 + + async def test_eviction_with_max_size(self) -> None: + """Test LRU eviction when max_size is exceeded.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore( + source_store, + cache_store=cache_store, + max_size=100, # Small cache size + ) + + # Add data that exceeds cache size + small_data = CPUBuffer.from_bytes(b"a" * 40) # 40 bytes + medium_data = CPUBuffer.from_bytes(b"b" * 40) # 40 bytes + large_data = CPUBuffer.from_bytes(b"c" * 40) # 40 bytes (would exceed 100 byte limit) + + # Set first two items + await 
cached_store.set("key1", small_data) + await cached_store.set("key2", medium_data) + + # Cache should have 2 items + info = cached_store.cache_info() + assert info["cached_keys"] == 2 + assert info["current_size"] == 80 + + # Add third item - should trigger eviction of first item + await cached_store.set("key3", large_data) + + # Cache should still have items but first one may be evicted + info = cached_store.cache_info() + assert info["current_size"] <= 100 + + async def test_value_exceeds_max_size(self) -> None: + """Test behavior when a single value exceeds max_size.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore( + source_store, + cache_store=cache_store, + max_size=50, # Small cache size + ) + + # Try to cache data larger than max_size + large_data = CPUBuffer.from_bytes(b"x" * 100) # 100 bytes > 50 byte limit + await cached_store.set("large_key", large_data) + + # Data should be in source but not cached + assert await source_store.exists("large_key") + info = cached_store.cache_info() + assert info["cached_keys"] == 0 + assert info["current_size"] == 0 + + async def test_get_nonexistent_key(self) -> None: + """Test getting a key that doesn't exist in either store.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore(source_store, cache_store=cache_store) + + # Try to get nonexistent key + result = await cached_store.get("nonexistent", default_buffer_prototype()) + assert result is None + + # Should not create any cache entries + info = cached_store.cache_info() + assert info["cached_keys"] == 0 + + async def test_delete_both_stores(self) -> None: + """Test that delete removes from both source and cache stores.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore(source_store, cache_store=cache_store) + + # Add data + test_data = CPUBuffer.from_bytes(b"test data") + await cached_store.set("test_key", test_data) + + # Verify it's in both stores + assert await source_store.exists("test_key") + assert await cache_store.exists("test_key") + + # Delete + await cached_store.delete("test_key") + + # Verify it's removed from both + assert not await source_store.exists("test_key") + assert not await cache_store.exists("test_key") + + # Verify tracking is updated + info = cached_store.cache_info() + assert info["cached_keys"] == 0 + + async def test_invalid_max_age_seconds(self) -> None: + """Test that invalid max_age_seconds values raise ValueError.""" + source_store = MemoryStore() + cache_store = MemoryStore() + + with pytest.raises(ValueError, match="max_age_seconds string value must be 'infinity'"): + CacheStore(source_store, cache_store=cache_store, max_age_seconds="invalid") + + async def test_unlimited_cache_size(self) -> None: + """Test behavior when max_size is None (unlimited).""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore( + source_store, + cache_store=cache_store, + max_size=None, # Unlimited cache + ) + + # Add large amounts of data + for i in range(10): + large_data = CPUBuffer.from_bytes(b"x" * 1000) # 1KB each + await cached_store.set(f"large_key_{i}", large_data) + + # All should be cached since there's no size limit + info = cached_store.cache_info() + assert info["cached_keys"] == 10 + assert info["current_size"] == 10000 # 10 * 1000 bytes + + async def test_evict_key_exception_handling(self) -> None: + """Test exception handling in _evict_key method.""" + source_store = MemoryStore() + cache_store = 
MemoryStore() + cached_store = CacheStore(source_store, cache_store=cache_store, max_size=100) + + # Add some data + test_data = CPUBuffer.from_bytes(b"test data") + await cached_store.set("test_key", test_data) + + # Manually corrupt the tracking to trigger exception + # Remove from one structure but not others to create inconsistency + del cached_store._cache_order["test_key"] + + # Try to evict - should handle the KeyError gracefully + await cached_store._evict_key("test_key") + + # Should still work and not crash + info = cached_store.cache_info() + assert isinstance(info, dict) + + async def test_get_no_cache_delete_tracking(self) -> None: + """Test _get_no_cache when key doesn't exist and needs cleanup.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore(source_store, cache_store=cache_store) + + # First, add key to cache tracking but not to source + test_data = CPUBuffer.from_bytes(b"test data") + await cache_store.set("phantom_key", test_data) + await cached_store._cache_value("phantom_key", test_data) + + # Verify it's in tracking + assert "phantom_key" in cached_store._cache_order + assert "phantom_key" in cached_store.key_insert_times + + # Now try to get it - since it's not in source, should clean up tracking + result = await cached_store._get_no_cache("phantom_key", default_buffer_prototype()) + assert result is None + + # Should have cleaned up tracking + assert "phantom_key" not in cached_store._cache_order + assert "phantom_key" not in cached_store.key_insert_times + + async def test_accommodate_value_no_max_size(self) -> None: + """Test _accommodate_value early return when max_size is None.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore( + source_store, + cache_store=cache_store, + max_size=None, # No size limit + ) + + # This should return early without doing anything + await cached_store._accommodate_value(1000000) # Large value + + # Should not affect anything since max_size is None + info = cached_store.cache_info() + assert info["current_size"] == 0 + + async def test_concurrent_set_operations(self) -> None: + """Test that concurrent set operations don't corrupt cache size tracking.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore(source_store, cache_store=cache_store, max_size=1000) + + # Create 10 concurrent set operations + async def set_data(key: str) -> None: + data = CPUBuffer.from_bytes(b"x" * 50) + await cached_store.set(key, data) + + # Run concurrently + await asyncio.gather(*[set_data(f"key_{i}") for i in range(10)]) + + info = cached_store.cache_info() + # Expected: 10 keys * 50 bytes = 500 bytes + assert info["cached_keys"] == 10 + assert info["current_size"] == 500 # WOULD FAIL due to race condition + + async def test_concurrent_eviction_race(self) -> None: + """Test concurrent evictions don't corrupt size tracking.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore(source_store, cache_store=cache_store, max_size=200) + + # Fill cache to near capacity + data = CPUBuffer.from_bytes(b"x" * 80) + await cached_store.set("key1", data) + await cached_store.set("key2", data) + + # Now trigger two concurrent sets that both need to evict + async def set_large(key: str) -> None: + large_data = CPUBuffer.from_bytes(b"y" * 100) + await cached_store.set(key, large_data) + + await asyncio.gather(set_large("key3"), set_large("key4")) + + info = cached_store.cache_info() + # Size should be consistent 
with tracked keys + assert info["current_size"] <= 200 # Might pass + # But verify actual cache store size matches tracking + total_size = sum(cached_store._key_sizes.get(k, 0) for k in cached_store._cache_order) + assert total_size == info["current_size"] # WOULD FAIL + + async def test_concurrent_get_and_evict(self) -> None: + """Test get operations during eviction don't cause corruption.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore(source_store, cache_store=cache_store, max_size=100) + + # Setup + data = CPUBuffer.from_bytes(b"x" * 40) + await cached_store.set("key1", data) + await cached_store.set("key2", data) + + # Concurrent: read key1 while adding key3 (triggers eviction) + async def read_key() -> None: + for _ in range(100): + await cached_store.get("key1", default_buffer_prototype()) + + async def write_key() -> None: + for i in range(10): + new_data = CPUBuffer.from_bytes(b"y" * 40) + await cached_store.set(f"new_{i}", new_data) + + await asyncio.gather(read_key(), write_key()) + + # Verify consistency + info = cached_store.cache_info() + assert info["current_size"] <= 100 + assert len(cached_store._cache_order) == len(cached_store._key_sizes) + + async def test_eviction_actually_deletes_from_cache_store(self) -> None: + """Test that eviction removes keys from cache_store, not just tracking.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore(source_store, cache_store=cache_store, max_size=100) + + # Add data that will be evicted + data1 = CPUBuffer.from_bytes(b"x" * 60) + data2 = CPUBuffer.from_bytes(b"y" * 60) + + await cached_store.set("key1", data1) + + # Verify key1 is in cache_store + assert await cache_store.exists("key1") + + # Add key2, which should evict key1 + await cached_store.set("key2", data2) + + # Check tracking - key1 should be removed + assert "key1" not in cached_store._cache_order + assert "key1" not in cached_store._key_sizes + + # CRITICAL: key1 should also be removed from cache_store + assert not await cache_store.exists("key1"), ( + "Evicted key still exists in cache_store! _evict_key doesn't actually delete." + ) + + # But key1 should still exist in source store + assert await source_store.exists("key1") + + async def test_eviction_no_orphaned_keys(self) -> None: + """Test that eviction doesn't leave orphaned keys in cache_store.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore(source_store, cache_store=cache_store, max_size=150) + + # Add multiple keys that will cause evictions + for i in range(10): + data = CPUBuffer.from_bytes(b"x" * 60) + await cached_store.set(f"key_{i}", data) + + # Check tracking + info = cached_store.cache_info() + tracked_keys = info["cached_keys"] + + # Count actual keys in cache_store + actual_keys = 0 + async for _ in cache_store.list(): + actual_keys += 1 + + # Cache store should have same number of keys as tracking + assert actual_keys == tracked_keys, ( + f"Cache store has {actual_keys} keys but tracking shows {tracked_keys}. " + f"Eviction doesn't delete from cache_store!" 
+ ) + + async def test_size_accounting_with_key_updates(self) -> None: + """Test that updating the same key replaces size instead of accumulating.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore(source_store, cache_store=cache_store, max_size=500) + + # Set initial value + data1 = CPUBuffer.from_bytes(b"x" * 100) + await cached_store.set("same_key", data1) + + info1 = cached_store.cache_info() + assert info1["current_size"] == 100 + + # Update with different size + data2 = CPUBuffer.from_bytes(b"y" * 200) + await cached_store.set("same_key", data2) + + info2 = cached_store.cache_info() + + # Should be 200, not 300 (update replaces, doesn't accumulate) + assert info2["current_size"] == 200, ( + f"Expected size 200 but got {info2['current_size']}. " + "Updating same key should replace, not accumulate." + ) + + async def test_all_tracked_keys_exist_in_cache_store(self) -> None: + """Test invariant: all keys in tracking should exist in cache_store.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore(source_store, cache_store=cache_store, max_size=500) + + # Add some data + for i in range(5): + data = CPUBuffer.from_bytes(b"x" * 50) + await cached_store.set(f"key_{i}", data) + + # Every key in tracking should exist in cache_store + for key in cached_store._cache_order: + assert await cache_store.exists(key), ( + f"Key '{key}' is tracked but doesn't exist in cache_store" + ) + + # Every key in _key_sizes should exist in cache_store + for key in cached_store._key_sizes: + assert await cache_store.exists(key), ( + f"Key '{key}' has size tracked but doesn't exist in cache_store" + ) + + # Additional coverage tests for 100% coverage + + async def test_cache_store_requires_delete_support(self) -> None: + """Test that CacheStore validates cache_store supports deletes.""" + from unittest.mock import MagicMock + + # Create a mock store that doesn't support deletes + source_store = MemoryStore() + cache_store = MagicMock() + cache_store.supports_deletes = False + + with pytest.raises(ValueError, match="does not support deletes"): + CacheStore(store=source_store, cache_store=cache_store) + + async def test_evict_key_exception_handling_with_real_error( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Test _evict_key exception handling when deletion fails.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore(store=source_store, cache_store=cache_store, max_size=100) + + # Set up a key in tracking + buffer = CPUBuffer.from_bytes(b"test data") + await cached_store.set("test_key", buffer) + + # Mock the cache delete to raise an exception + async def failing_delete(key: str) -> None: + raise RuntimeError("Simulated cache deletion failure") + + monkeypatch.setattr(cache_store, "delete", failing_delete) + + # Attempt to evict should raise the exception + with pytest.raises(RuntimeError, match="Simulated cache deletion failure"): + async with cached_store._lock: + await cached_store._evict_key("test_key") + + async def test_cache_stats_method(self) -> None: + """Test cache_stats method returns correct statistics.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore(store=source_store, cache_store=cache_store, max_size=1000) + + # Initially, stats should be zero + stats = cached_store.cache_stats() + assert stats["hits"] == 0 + assert stats["misses"] == 0 + assert stats["evictions"] == 0 + assert stats["total_requests"] == 0 + assert 
stats["hit_rate"] == 0.0 + + # Perform some operations + buffer = CPUBuffer.from_bytes(b"x" * 100) + + # Write to source store directly to avoid affecting stats + await source_store.set("key1", buffer) + + # First get is a miss (not in cache yet) + result1 = await cached_store.get("key1", default_buffer_prototype()) + assert result1 is not None + + # Second get is a hit (now in cache) + result2 = await cached_store.get("key1", default_buffer_prototype()) + assert result2 is not None + + stats = cached_store.cache_stats() + assert stats["hits"] == 1 + assert stats["misses"] == 1 + assert stats["total_requests"] == 2 + assert stats["hit_rate"] == 0.5 + + async def test_cache_stats_with_evictions(self) -> None: + """Test cache_stats tracks evictions correctly.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore( + store=source_store, + cache_store=cache_store, + max_size=150, # Small size to force eviction + ) + + # Add items that will trigger eviction + buffer1 = CPUBuffer.from_bytes(b"x" * 100) + buffer2 = CPUBuffer.from_bytes(b"y" * 100) + + await cached_store.set("key1", buffer1) + await cached_store.set("key2", buffer2) # Should evict key1 + + stats = cached_store.cache_stats() + assert stats["evictions"] == 1 + + def test_repr_method(self) -> None: + """Test __repr__ returns useful string representation.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore( + store=source_store, cache_store=cache_store, max_age_seconds=60, max_size=1024 + ) + + repr_str = repr(cached_store) + + # Check that repr contains key information + assert "CacheStore" in repr_str + assert "max_age_seconds=60" in repr_str + assert "max_size=1024" in repr_str + assert "current_size=0" in repr_str + assert "cached_keys=0" in repr_str + + async def test_cache_stats_zero_division_protection(self) -> None: + """Test cache_stats handles zero requests correctly.""" + source_store = MemoryStore() + cache_store = MemoryStore() + cached_store = CacheStore(store=source_store, cache_store=cache_store) + + # With no requests, hit_rate should be 0.0 (not NaN or error) + stats = cached_store.cache_stats() + assert stats["hit_rate"] == 0.0 + assert stats["total_requests"] == 0