diff --git a/docs/api/storage.rst b/docs/api/storage.rst
index 4321837449..092b6cf54b 100644
--- a/docs/api/storage.rst
+++ b/docs/api/storage.rst
@@ -39,6 +39,10 @@ Storage (``zarr.storage``)
 
 .. autoclass:: ConsolidatedMetadataStore
 
+.. autoclass:: LRUChunkCache
+
+    .. automethod:: invalidate
+
 .. autofunction:: init_array
 .. autofunction:: init_group
 .. autofunction:: contains_array
diff --git a/docs/tutorial.rst b/docs/tutorial.rst
index 411ce0a163..9a14413218 100644
--- a/docs/tutorial.rst
+++ b/docs/tutorial.rst
@@ -863,10 +863,34 @@ store. E.g.::
     b'Hello from the cloud!'
     0.0009490990014455747
 
-If you are still experiencing poor performance with distributed/cloud storage,
-please raise an issue on the GitHub issue tracker with any profiling data you
-can provide, as there may be opportunities to optimise further either within
-Zarr or within the mapping interface to the storage.
+The above :class:`zarr.storage.LRUStoreCache` wraps any Zarr storage class and stores
+encoded chunks, so every time the cache is accessed the chunk still has to be decoded.
+For cases where decoding is computationally expensive, Zarr also provides
+:class:`zarr.storage.LRUChunkCache`, which stores decoded chunks, e.g.::
+
+    >>> import zarr
+    >>> from numcodecs import LZMA
+    >>> import numpy as np
+    >>> store = zarr.MemoryStore()
+    >>> z = zarr.array(np.random.randn(1000000).reshape(1000,1000), chunks=(100,100),
+    ...                store=store, compressor=LZMA())
+    >>> from timeit import timeit
+    >>> # data access without cache
+    ... timeit('z[:]', number=1, globals=globals())  # doctest: +SKIP
+    0.6703157789888792
+    >>> z_with_cache = zarr.Array(store=store, chunk_cache=zarr.LRUChunkCache(max_size=None))
+    >>> # first data access about the same as without cache
+    ... timeit('z_with_cache[:]', number=1, globals=globals())  # doctest: +SKIP
+    0.681269913999131
+    >>> # second time accesses the decoded chunks in the cache
+    ... timeit('z_with_cache[:]', number=1, globals=globals())  # doctest: +SKIP
+    0.007617925992235541
+
+
+If you are still experiencing poor performance with distributed/cloud storage, please
+raise an issue on the GitHub issue tracker with any profiling data you can provide, as
+there may be opportunities to optimise further either within Zarr or within the mapping
+interface to the storage.
 
 IO with ``fsspec``
 ~~~~~~~~~~~~~~~~~~
diff --git a/zarr/__init__.py b/zarr/__init__.py
index 4d2c992dbf..0cea9efa1b 100644
--- a/zarr/__init__.py
+++ b/zarr/__init__.py
@@ -14,7 +14,7 @@
 from zarr.storage import (ABSStore, DBMStore, DictStore, DirectoryStore, KVStore,
                           LMDBStore, LRUStoreCache, MemoryStore, MongoDBStore,
                           NestedDirectoryStore, RedisStore, SQLiteStore,
-                          TempStore, ZipStore)
+                          TempStore, ZipStore, LRUChunkCache)
 from zarr.sync import ProcessSynchronizer, ThreadSynchronizer
 from zarr.version import version as __version__
diff --git a/zarr/core.py b/zarr/core.py
index e5b2045160..a59799cd77 100644
--- a/zarr/core.py
+++ b/zarr/core.py
@@ -87,6 +87,15 @@ class Array:
         read and decompressed when possible.
 
         .. versionadded:: 2.7
+    chunk_cache : MutableMapping, optional
+        Mapping to store decoded chunks for caching. Can be used in repeated
+        chunk access scenarios when decoding of data is computationally
+        expensive.
+        NOTE: using the write cache feature with object arrays (i.e. when the
+        array dtype is 'object' and the array is written to with a chunk_cache
+        provided) may result in a slight slowdown, as some dtypes, like
+        VLenArray, have to go through an encode-decode round trip before
+        having the correct dtype.
write_empty_chunks : bool, optional If True, all chunks will be stored regardless of their contents. If @@ -169,6 +178,7 @@ def __init__( cache_metadata=True, cache_attrs=True, partial_decompress=False, + chunk_cache=None, write_empty_chunks=True, zarr_version=None, meta_array=None, @@ -199,6 +209,7 @@ def __init__( self._cache_metadata = cache_metadata self._is_view = False self._partial_decompress = partial_decompress + self._chunk_cache = chunk_cache self._write_empty_chunks = write_empty_chunks if meta_array is not None: self._meta_array = np.empty_like(meta_array, shape=()) @@ -941,19 +952,33 @@ def _get_basic_selection_zd(self, selection, out=None, fields=None): if selection not in ((), (Ellipsis,)): err_too_many_indices(selection, ()) - try: - # obtain encoded data for chunk - ckey = self._chunk_key((0,)) - cdata = self.chunk_store[ckey] + # obtain key for chunk + ckey = self._chunk_key((0,)) - except KeyError: - # chunk not initialized - chunk = np.zeros_like(self._meta_array, shape=(), dtype=self._dtype) - if self._fill_value is not None: - chunk.fill(self._fill_value) + # setup variable to hold decoded chunk + chunk = None - else: - chunk = self._decode_chunk(cdata) + # check for cached chunk + if self._chunk_cache is not None: + chunk = self._chunk_cache.get(ckey) + + if chunk is None: + try: + # obtain encoded data for chunk + cdata = self.chunk_store[ckey] + + except KeyError: + # chunk not initialized + chunk = np.zeros_like(self._meta_array, shape=(), dtype=self._dtype) + if self._fill_value is not None: + chunk.fill(self._fill_value) + + else: + chunk = self._decode_chunk(cdata) + + # cache decoded chunk + if self._chunk_cache is not None: + self._chunk_cache[ckey] = chunk # handle fields if fields: @@ -1752,6 +1777,10 @@ def _set_basic_selection_zd(self, selection, value, fields=None): # remove chunk if write_empty_chunks is false and it only contains the fill value if (not self.write_empty_chunks) and all_equal(self.fill_value, chunk): + # invalidate value in cache + if self._chunk_cache is not None: + if ckey in self._chunk_cache: + del self._chunk_cache[ckey] try: del self.chunk_store[ckey] return @@ -1763,6 +1792,12 @@ def _set_basic_selection_zd(self, selection, value, fields=None): cdata = self._encode_chunk(chunk) self.chunk_store[ckey] = cdata + if self._chunk_cache is not None: + # ensure cached chunk has been round tripped through encode-decode if dtype=object + if self.dtype == object: + chunk = self._decode_chunk(cdata) + self._chunk_cache[ckey] = chunk + def _set_basic_selection_nd(self, selection, value, fields=None): # implementation of __setitem__ for array with at least one dimension @@ -1822,6 +1857,7 @@ def _set_selection(self, indexer, value, fields=None): # put data self._chunk_setitem(chunk_coords, chunk_selection, chunk_value, fields=fields) + else: lchunk_coords, lchunk_selection, lout_selection = zip(*indexer) chunk_values = [] @@ -1844,6 +1880,18 @@ def _set_selection(self, indexer, value, fields=None): self._chunk_setitems(lchunk_coords, lchunk_selection, chunk_values, fields=fields) + def _select_and_set_out(self, fields, chunk, chunk_selection, drop_axes, + out, out_selection): + # select data from chunk + if fields: + chunk = chunk[fields] + tmp = chunk[chunk_selection] + if drop_axes: + tmp = np.squeeze(tmp, axis=drop_axes) + + # store selected data in output + out[out_selection] = tmp + def _process_chunk( self, out, @@ -1853,6 +1901,7 @@ def _process_chunk( out_is_ndarray, fields, out_selection, + ckey, partial_read_decode=False, ): """Take 
binary data from storage and fill output array""" @@ -1919,16 +1968,12 @@ def _process_chunk( except ArrayIndexError: cdata = cdata.read_full() chunk = self._decode_chunk(cdata) + if self._chunk_cache is not None: + # cache the decoded chunk + self._chunk_cache[ckey] = chunk - # select data from chunk - if fields: - chunk = chunk[fields] - tmp = chunk[chunk_selection] - if drop_axes: - tmp = np.squeeze(tmp, axis=drop_axes) - - # store selected data in output - out[out_selection] = tmp + self._select_and_set_out(fields, chunk, chunk_selection, drop_axes, + out, out_selection) def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection, drop_axes=None, fields=None): @@ -1961,22 +2006,38 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection, # obtain key for chunk ckey = self._chunk_key(chunk_coords) - try: - # obtain compressed data for chunk - cdata = self.chunk_store[ckey] + # setup variable to hold decoded chunk + chunk = None - except KeyError: - # chunk not initialized - if self._fill_value is not None: - if fields: - fill_value = self._fill_value[fields] - else: - fill_value = self._fill_value - out[out_selection] = fill_value + # check for cached chunk + if self._chunk_cache is not None: + try: + chunk = self._chunk_cache[ckey] + self._select_and_set_out(fields, chunk, chunk_selection, + drop_axes, out, out_selection) + except KeyError: + pass - else: - self._process_chunk(out, cdata, chunk_selection, drop_axes, - out_is_ndarray, fields, out_selection) + if chunk is None: + + try: + # obtain compressed data for chunk + cdata = self.chunk_store[ckey] + + except KeyError: + # chunk not initialized + if self._fill_value is not None: + if fields: + fill_value = self._fill_value[fields] + else: + fill_value = self._fill_value + out[out_selection] = fill_value + return + + else: + self._process_chunk(out, cdata, chunk_selection, drop_axes, + out_is_ndarray, fields, out_selection, + ckey) def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection, drop_axes=None, fields=None): @@ -2020,6 +2081,7 @@ def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection, out_is_ndarray, fields, out_select, + ckey, partial_read_decode=partial_read_decode, ) else: @@ -2159,6 +2221,13 @@ def _process_for_setitem(self, ckey, chunk_selection, value, fields=None): else: chunk[chunk_selection] = value + # cache the chunk + if self._chunk_cache is not None: + # ensure cached chunk has been round tripped through encode-decode if dtype=object + if self.dtype == object: + chunk = self._decode_chunk(self._encode_chunk(chunk)) + self._chunk_cache[ckey] = np.copy(chunk) + return chunk def _chunk_key(self, chunk_coords): @@ -2502,6 +2571,13 @@ def _resize_nosync(self, *args): except KeyError: # chunk not initialized pass + if self._chunk_cache is not None: + try: + del self._chunk_cache[key] + except KeyError: + # chunk not cached + pass + old_cdata_shape_working_list[idx_cdata] = min(val_old_cdata, val_new_cdata) def append(self, data, axis=0): diff --git a/zarr/creation.py b/zarr/creation.py index 3414a0158a..1015dc0cba 100644 --- a/zarr/creation.py +++ b/zarr/creation.py @@ -20,7 +20,7 @@ def create(shape, chunks=True, dtype=None, compressor='default', fill_value=0, order='C', store=None, synchronizer=None, overwrite=False, path=None, chunk_store=None, filters=None, cache_metadata=True, cache_attrs=True, read_only=False, - object_codec=None, dimension_separator=None, write_empty_chunks=True, + object_codec=None, dimension_separator=None, 
chunk_cache=None, write_empty_chunks=True,
            *, zarr_version=None, meta_array=None, **kwargs):
     """Create an array.
 
@@ -53,6 +53,15 @@ def create(shape, chunks=True, dtype=None, compressor='default',
     chunk_store : MutableMapping, optional
         Separate storage for chunks. If not provided, `store` will be used
         for storage of both chunks and metadata.
+    chunk_cache : MutableMapping, optional
+        Mapping to store decoded chunks for caching. Can be used in repeated
+        chunk access scenarios when decoding of data is computationally
+        expensive.
+        NOTE: using the write cache feature with object arrays (i.e. when the
+        array dtype is 'object' and the array is written to with a chunk_cache
+        provided) may result in a slight slowdown, as some dtypes, like
+        VLenArray, have to go through an encode-decode round trip before
+        having the correct dtype.
     filters : sequence of Codecs, optional
         Sequence of filters to use to encode chunk data prior to compression.
     cache_metadata : bool, optional
@@ -174,7 +183,8 @@ def create(shape, chunks=True, dtype=None, compressor='default',
 
     # instantiate array
     z = Array(store, path=path, chunk_store=chunk_store, synchronizer=synchronizer,
               cache_metadata=cache_metadata, cache_attrs=cache_attrs, read_only=read_only,
-              write_empty_chunks=write_empty_chunks, meta_array=meta_array)
+              chunk_cache=chunk_cache, write_empty_chunks=write_empty_chunks,
+              meta_array=meta_array)
 
     return z
 
@@ -411,6 +421,7 @@ def open_array(
     chunk_store=None,
     storage_options=None,
     partial_decompress=False,
+    chunk_cache=None,
     write_empty_chunks=True,
     *,
     zarr_version=None,
@@ -478,6 +489,16 @@ def open_array(
         non-fill-value data are stored, at the expense of overhead associated
         with checking the data of each chunk.
 
+        .. versionadded:: 2.7
+    chunk_cache : MutableMapping, optional
+        Mapping to store decoded chunks for caching. Can be used in repeated
+        chunk access scenarios when decoding of data is computationally
+        expensive.
+        NOTE: using the write cache feature with object arrays (i.e. when the
+        array dtype is 'object' and the array is written to with a chunk_cache
+        provided) may result in a slight slowdown, as some dtypes, like
+        VLenArray, have to go through an encode-decode round trip before
+        having the correct dtype.
    ..
versionadded:: 2.11 zarr_version : {None, 2, 3}, optional @@ -598,7 +619,8 @@ def open_array( # instantiate array z = Array(store, read_only=read_only, synchronizer=synchronizer, cache_metadata=cache_metadata, cache_attrs=cache_attrs, path=path, - chunk_store=chunk_store, write_empty_chunks=write_empty_chunks) + chunk_store=chunk_store, chunk_cache=chunk_cache, + write_empty_chunks=write_empty_chunks) return z diff --git a/zarr/hierarchy.py b/zarr/hierarchy.py index 8131cb71aa..8a97a033c1 100644 --- a/zarr/hierarchy.py +++ b/zarr/hierarchy.py @@ -121,7 +121,7 @@ class Group(MutableMapping): """ def __init__(self, store, path=None, read_only=False, chunk_store=None, - cache_attrs=True, synchronizer=None, zarr_version=None, *, + cache_attrs=True, synchronizer=None, chunk_cache=None, zarr_version=None, *, meta_array=None): store: BaseStore = _normalize_store_arg(store, zarr_version=zarr_version) if zarr_version is None: @@ -134,6 +134,7 @@ def __init__(self, store, path=None, read_only=False, chunk_store=None, chunk_store: BaseStore = _normalize_store_arg(chunk_store, zarr_version=zarr_version) self._store = store self._chunk_store = chunk_store + self._chunk_cache = chunk_cache self._path = normalize_storage_path(path) if self._path: self._key_prefix = self._path + '/' @@ -435,14 +436,14 @@ def __getitem__(self, item): path = self._item_path(item) if contains_array(self._store, path): return Array(self._store, read_only=self._read_only, path=path, - chunk_store=self._chunk_store, + chunk_store=self._chunk_store, chunk_cache=self._chunk_cache, synchronizer=self._synchronizer, cache_attrs=self.attrs.cache, zarr_version=self._version, meta_array=self._meta_array) elif contains_group(self._store, path, explicit_only=True): return Group(self._store, read_only=self._read_only, path=path, - chunk_store=self._chunk_store, cache_attrs=self.attrs.cache, - synchronizer=self._synchronizer, zarr_version=self._version, - meta_array=self._meta_array) + chunk_store=self._chunk_store, chunk_cache=self._chunk_cache, + cache_attrs=self.attrs.cache, synchronizer=self._synchronizer, + zarr_version=self._version, meta_array=self._meta_array) elif self._version == 3: implicit_group = meta_root + path + '/' # non-empty folder in the metadata path implies an implicit group @@ -546,6 +547,7 @@ def groups(self): path=path, read_only=self._read_only, chunk_store=self._chunk_store, + chunk_cache=self._chunk_cache, cache_attrs=self.attrs.cache, synchronizer=self._synchronizer, zarr_version=self._version) @@ -861,7 +863,8 @@ def _create_group_nosync(self, name, overwrite=False): return Group(self._store, path=path, read_only=self._read_only, chunk_store=self._chunk_store, cache_attrs=self.attrs.cache, - synchronizer=self._synchronizer, zarr_version=self._version) + synchronizer=self._synchronizer, chunk_cache=self._chunk_cache, + zarr_version=self._version) def create_groups(self, *names, **kwargs): """Convenience method to create multiple groups in a single call.""" @@ -905,7 +908,8 @@ def _require_group_nosync(self, name, overwrite=False): return Group(self._store, path=path, read_only=self._read_only, chunk_store=self._chunk_store, cache_attrs=self.attrs.cache, - synchronizer=self._synchronizer, zarr_version=self._version) + synchronizer=self._synchronizer, chunk_cache=self._chunk_cache, + zarr_version=self._version) def require_groups(self, *names): """Convenience method to require multiple groups in a single call.""" @@ -1034,7 +1038,7 @@ def _require_dataset_nosync(self, name, shape, dtype=None, exact=False, a = 
Array(self._store, path=path, read_only=self._read_only, chunk_store=self._chunk_store, synchronizer=synchronizer, cache_metadata=cache_metadata, cache_attrs=cache_attrs, - meta_array=self._meta_array) + chunk_cache=self._chunk_cache, meta_array=self._meta_array) shape = normalize_shape(shape) if shape != a.shape: raise TypeError('shape do not match existing array; expected {}, got {}' @@ -1223,7 +1227,7 @@ def _normalize_store_arg(store, *, storage_options=None, mode="r", def group(store=None, overwrite=False, chunk_store=None, - cache_attrs=True, synchronizer=None, path=None, *, zarr_version=None): + cache_attrs=True, synchronizer=None, path=None, chunk_cache=None, *, zarr_version=None): """Create a group. Parameters @@ -1292,7 +1296,8 @@ def group(store=None, overwrite=False, chunk_store=None, def open_group(store=None, mode='a', cache_attrs=True, synchronizer=None, path=None, - chunk_store=None, storage_options=None, *, zarr_version=None, meta_array=None): + chunk_store=None, storage_options=None, chunk_cache=None, *, zarr_version=None, + meta_array=None): """Open a group using file-mode-like semantics. Parameters @@ -1395,4 +1400,4 @@ def open_group(store=None, mode='a', cache_attrs=True, synchronizer=None, path=N return Group(store, read_only=read_only, cache_attrs=cache_attrs, synchronizer=synchronizer, path=path, chunk_store=chunk_store, - zarr_version=zarr_version, meta_array=meta_array) + chunk_cache=chunk_cache, zarr_version=zarr_version, meta_array=meta_array) diff --git a/zarr/storage.py b/zarr/storage.py index f5459990ba..249fd416a0 100644 --- a/zarr/storage.py +++ b/zarr/storage.py @@ -2280,7 +2280,75 @@ def __len__(self): return self.db.stat()['entries'] -class LRUStoreCache(Store): +class LRUMappingCache(MutableMapping): + """Abstract base class for Mapping Cache + """ + + def __init__(self, max_size): + self._max_size = max_size + self._current_size = 0 + self._values_cache = OrderedDict() + self._mutex = Lock() + self.hits = self.misses = 0 + + def __len__(self): + return len(self._keys()) + + def __iter__(self): + return self.keys() + + def keys(self): + with self._mutex: + return iter(self._keys()) + + def _keys(self): + raise NotImplementedError # pragma: no cover + + def _pop_value(self): + # remove the first value from the cache, as this will be the least recently + # used value + _, v = self._values_cache.popitem(last=False) + return v + + def _accommodate_value(self, value_size): + if self._max_size is None: + return + # ensure there is enough space in the cache for a new value + while self._current_size + value_size > self._max_size: + v = self._pop_value() + self._current_size -= buffer_size(v) + + def _cache_value(self, key, value): + # cache a value + value_size = buffer_size(value) + # check size of the value against max size, as if the value itself exceeds max + # size then we are never going to cache it + if self._max_size is None or value_size <= self._max_size: + self._accommodate_value(value_size) + self._values_cache[key] = value + self._current_size += value_size + + def invalidate_values(self): + """Clear the values cache.""" + with self._mutex: + self._values_cache.clear() + + def _invalidate_value(self, key): + if key in self._values_cache: + value = self._values_cache.pop(key) + self._current_size -= buffer_size(value) + + def __getitem__(self, key): + raise NotImplementedError # pragma: no cover + + def __setitem__(self, key, value): + raise NotImplementedError # pragma: no cover + + def __delitem__(self, key): + raise NotImplementedError # 
pragma: no cover + + +class LRUStoreCache(Store, LRUMappingCache): """Storage class that implements a least-recently-used (LRU) cache layer over some other store. Intended primarily for use with stores that can be slow to access, e.g., remote stores that require network communication to store and @@ -2318,6 +2386,7 @@ class LRUStoreCache(Store): """ def __init__(self, store: StoreLike, max_size: int): + super(LRUStoreCache, self).__init__(max_size) self._store: BaseStore = BaseStore._ensure_store(store) self._max_size = max_size self._current_size = 0 @@ -2339,12 +2408,6 @@ def __setstate__(self, state): self.misses) = state self._mutex = Lock() - def __len__(self): - return len(self._keys()) - - def __iter__(self): - return self.keys() - def __contains__(self, key): with self._mutex: if self._contains_cache is None: @@ -2355,10 +2418,6 @@ def clear(self): self._store.clear() self.invalidate() - def keys(self): - with self._mutex: - return iter(self._keys()) - def _keys(self): if self._keys_cache is None: self._keys_cache = list(self._store.keys()) @@ -2407,11 +2466,6 @@ def invalidate(self): self._invalidate_keys() self._current_size = 0 - def invalidate_values(self): - """Clear the values cache.""" - with self._mutex: - self._values_cache.clear() - def invalidate_keys(self): """Clear the keys cache.""" with self._mutex: @@ -2422,11 +2476,6 @@ def _invalidate_keys(self): self._contains_cache = None self._listdir_cache.clear() - def _invalidate_value(self, key): - if key in self._values_cache: - value = self._values_cache.pop(key) - self._current_size -= buffer_size(value) - def __getitem__(self, key): try: # first try to obtain the value from the cache @@ -2892,3 +2941,108 @@ def getsize(self, path): def listdir(self, path): return listdir(self.meta_store, path) + + +class LRUChunkCache(LRUMappingCache): + """Class that implements a least-recently-used (LRU) cache for array chunks. + Intended primarily for use with stores that can be slow to access, e.g., remote stores that + require network communication to store and retrieve data, and/or arrays where decompression + of data is computationally expensive. + + Parameters + ---------- + max_size : int + The maximum size that the cache may grow to, in number of bytes. Provide `None` + if you would like the cache to have unlimited size. + + Examples + -------- + The example below uses a dict store to store the encoded array and uses LRUChunkCache to + store decoded chunks:: + + >>> import zarr + >>> from numcodecs import LZMA + >>> import numpy as np + >>> store = zarr.MemoryStore() + >>> z = zarr.array(np.random.randn(1000000).reshape(1000,1000), chunks=(100,100), + ... store=store, compressor=LZMA()) + >>> from timeit import timeit + >>> # data access without cache + ... timeit('z[:]', number=1, globals=globals()) # doctest: +SKIP + 0.6703157789888792 + >>> z_with_cache = zarr.Array(store=store, chunk_cache=zarr.LRUChunkCache(max_size=None)) + >>> # first data access about the same as without cache + ... timeit('z_with_cache[:]', number=1, globals=globals()) # doctest: +SKIP + 0.681269913999131 + >>> # second time accesses the decoded chunks in the cache + ... 
timeit('z_with_cache[:]', number=1, globals=globals()) # doctest: +SKIP + 0.007617925992235541 + + """ + + def __init__(self, max_size): + super(LRUChunkCache, self).__init__(max_size) + + def __getstate__(self): + return (self._max_size, self._current_size, + self._values_cache, self.hits, + self.misses) + + def __setstate__(self, state): + (self._max_size, self._current_size, + self._values_cache, self.hits, + self.misses) = state + self._mutex = Lock() + + def __contains__(self, key): + with self._mutex: + return key in self._keys() + + def clear(self): + self.invalidate() + + def _keys(self): + return self._values_cache.keys() + + def values(self): + return self._values_cache.values() + + def items(self): + return self._values_cache.items() + + def invalidate(self): + """Completely clear the cache.""" + with self._mutex: + self._values_cache.clear() + + def close(self): + pass + + def __getitem__(self, key): + try: + # try to obtain the value from the cache + with self._mutex: + value = self._values_cache[key] + # cache hit if no KeyError is raised + self.hits += 1 + # treat the end as most recently used + self._values_cache.move_to_end(key) + + except KeyError: + # cache miss + with self._mutex: + self.misses += 1 + raise KeyError + + return value + + def __setitem__(self, key, value): + with self._mutex: + self._invalidate_value(key) + self._cache_value(key, value) + + def __delitem__(self, key): + if key not in self._values_cache: + raise KeyError + with self._mutex: + self._invalidate_value(key) diff --git a/zarr/tests/test_core.py b/zarr/tests/test_core.py index e32026e662..b206f32951 100644 --- a/zarr/tests/test_core.py +++ b/zarr/tests/test_core.py @@ -31,6 +31,7 @@ FSStore, KVStore, LMDBStore, + LRUChunkCache, LRUStoreCache, NestedDirectoryStore, SQLiteStore, @@ -2514,6 +2515,103 @@ def test_store_has_bytes_values(self): pass +class TestArrayWithLRUChunkCache(TestArray): + + @staticmethod + def create_array(read_only=False, **kwargs): + store = dict() + kwargs.setdefault('compressor', Zlib(level=1)) + cache_metadata = kwargs.pop('cache_metadata', True) + cache_attrs = kwargs.pop('cache_attrs', True) + write_empty_chunks = kwargs.pop('write_empty_chunks', True) + + init_array(store, **kwargs) + return Array(store, read_only=read_only, cache_metadata=cache_metadata, + cache_attrs=cache_attrs, write_empty_chunks=write_empty_chunks, + chunk_cache=LRUChunkCache(max_size=None)) + + @staticmethod + def create_array_with_cache(read_only=False, **kwargs): + store = dict() + kwargs.setdefault('compressor', Zlib(level=1)) + cache_metadata = kwargs.pop('cache_metadata', True) + cache_attrs = kwargs.pop('cache_attrs', True) + write_empty_chunks = kwargs.pop('write_empty_chunks', True) + init_array(store, **kwargs) + cache = LRUChunkCache(max_size=None) + return Array(store, read_only=read_only, cache_metadata=cache_metadata, + cache_attrs=cache_attrs, write_empty_chunks=write_empty_chunks, + chunk_cache=cache), cache + + def test_hit_miss(self): + a = np.arange(100).reshape((10, 10)) + z, cache = self.create_array_with_cache(shape=a.shape, chunks=(10, 1), dtype=a.dtype) + + # test write cache + z[:] = a + assert cache.misses == 0 and cache.hits == 0 + z[:] + assert cache.misses == 0 and cache.hits == 10 + + cache.clear() + cache.misses = 0 + cache.hits = 0 + + # test read cache + assert cache.misses == 0 and cache.hits == 0 + z[:] + assert cache.misses == 10 and cache.hits == 0 + z[:] + assert cache.misses == 10 and cache.hits == 10 + + # noinspection PyStatementEffect + def 
test_array_0d_with_object_arrays(self): + # test behaviour for array with 0 dimensions + + # setup + a = np.zeros((), dtype=object) + z = self.create_array(shape=(), dtype=a.dtype, fill_value=0, object_codec=Pickle()) + + # check properties + assert a.ndim == z.ndim + assert a.shape == z.shape + assert a.size == z.size + assert a.dtype == z.dtype + assert a.nbytes == z.nbytes + with pytest.raises(TypeError): + len(z) + assert () == z.chunks + assert 1 == z.nchunks + assert (1,) == z.cdata_shape + # compressor always None - no point in compressing a single value + assert z.compressor is None + + # check __getitem__ + b = z[...] + assert isinstance(b, np.ndarray) + assert a.shape == b.shape + assert a.dtype == b.dtype + assert_array_equal(a, np.array(z)) + assert_array_equal(a, z[...]) + assert a[()] == z[()] + with pytest.raises(IndexError): + z[0] + with pytest.raises(IndexError): + z[:] + + # check __setitem__ + z[...] = 42 + assert 42 == z[()] + z[()] = 43 + assert 43 == z[()] + with pytest.raises(IndexError): + z[0] = 42 + with pytest.raises(IndexError): + z[:] = 42 + with pytest.raises(ValueError): + z[...] = np.array([1, 2, 3]) + + fsspec_mapper_kwargs = { "check": True, "create": True, diff --git a/zarr/tests/test_storage.py b/zarr/tests/test_storage.py index 39d4b5988d..86a89fa3cf 100644 --- a/zarr/tests/test_storage.py +++ b/zarr/tests/test_storage.py @@ -27,7 +27,7 @@ from zarr.n5 import N5Store, N5FSStore, N5_FORMAT, n5_attrs_key from zarr.storage import (ABSStore, ConsolidatedMetadataStore, DBMStore, DictStore, DirectoryStore, KVStore, LMDBStore, - LRUStoreCache, MemoryStore, MongoDBStore, + LRUStoreCache, LRUChunkCache, MemoryStore, MongoDBStore, NestedDirectoryStore, RedisStore, SQLiteStore, Store, TempStore, ZipStore, array_meta_key, atexit_rmglob, atexit_rmtree, @@ -107,8 +107,8 @@ def test_deprecated_listdir_nosotre(): listdir(store) -class StoreTests: - """Abstract store tests.""" +class MutableMappingStoreTests(object): + """Abstract Mutable Mapping Tests""" version = 2 root = '' @@ -117,10 +117,6 @@ def create_store(self, **kwargs): # pragma: no cover # implement in sub-class raise NotImplementedError - def test_context_manager(self): - with self.create_store(): - pass - def test_get_set_del_contains(self): store = self.create_store() @@ -311,6 +307,14 @@ def test_getsize(self): store.close() + +class StoreTests(MutableMappingStoreTests): + """Abstract store tests.""" + + def test_context_manager(self): + with self.create_store(): + pass + # noinspection PyStatementEffect def test_hierarchy(self): # setup @@ -1989,15 +1993,15 @@ def create_store(self, **kwargs): return store -class TestLRUStoreCache(StoreTests): +class CacheTests: CountingClass = CountingDict - LRUStoreClass = LRUStoreCache - def create_store(self, **kwargs): - # wrapper therefore no dimension_separator argument - skip_if_nested_chunks(**kwargs) - return self.LRUStoreClass(dict(), max_size=2**27) + def create_store(self): # pragma: no cover + raise NotImplementedError + + def create_cache(self, store, max_size=None): # pragma: no cover + raise NotImplementedError def test_cache_values_no_max_size(self): @@ -2013,7 +2017,7 @@ def test_cache_values_no_max_size(self): assert 1 == store.counter['__setitem__', bar_key] # setup cache - cache = self.LRUStoreClass(store, max_size=None) + cache = self.create_cache(store) assert 0 == cache.hits assert 0 == cache.misses @@ -2052,19 +2056,6 @@ def test_cache_values_no_max_size(self): assert 3 == store.counter['__getitem__', foo_key] assert 2 == 
store.counter['__setitem__', foo_key] - # test __delitem__ - del cache[foo_key] - with pytest.raises(KeyError): - # noinspection PyStatementEffect - cache[foo_key] - with pytest.raises(KeyError): - # noinspection PyStatementEffect - store[foo_key] - - # verify other keys untouched - assert 0 == store.counter['__getitem__', bar_key] - assert 1 == store.counter['__setitem__', bar_key] - def test_cache_values_with_max_size(self): # setup store @@ -2076,7 +2067,7 @@ def test_cache_values_with_max_size(self): assert 0 == store.counter['__getitem__', foo_key] assert 0 == store.counter['__getitem__', bar_key] # setup cache - can only hold one item - cache = self.LRUStoreClass(store, max_size=5) + cache = self.create_cache(store, max_size=5) assert 0 == cache.hits assert 0 == cache.misses @@ -2123,7 +2114,7 @@ def test_cache_values_with_max_size(self): assert 0 == store.counter['__getitem__', foo_key] assert 0 == store.counter['__getitem__', bar_key] # setup cache - can hold two items - cache = self.LRUStoreClass(store, max_size=6) + cache = self.create_cache(store, max_size=6) assert 0 == cache.hits assert 0 == cache.misses @@ -2163,6 +2154,40 @@ def test_cache_values_with_max_size(self): assert 4 == cache.hits assert 2 == cache.misses + +class TestLRUStoreCache(StoreTests, CacheTests): + + LRUStoreClass = LRUStoreCache + + def create_store(self, **kwargs): + # wrapper therefore no dimension_separator argument + skip_if_nested_chunks(**kwargs) + return self.LRUStoreClass(dict(), max_size=2**27) + + def create_cache(self, store, max_size=None): + return self.LRUStoreClass(store=store, max_size=max_size) + + def test_delitem(self): + # setup store + store = self.CountingClass() + store['foo'] = b'xxx' + store['bar'] = b'yyy' + + # setup cache + cache = self.create_cache(store) + + del cache['foo'] + with pytest.raises(KeyError): + # noinspection PyStatementEffect + cache['foo'] + with pytest.raises(KeyError): + # noinspection PyStatementEffect + store['foo'] + + # verify other keys untouched + assert 0 == store.counter['__getitem__', 'bar'] + assert 1 == store.counter['__setitem__', 'bar'] + def test_cache_keys(self): # setup @@ -2175,7 +2200,7 @@ def test_cache_keys(self): assert 0 == store.counter['__contains__', foo_key] assert 0 == store.counter['__iter__'] assert 0 == store.counter['keys'] - cache = self.LRUStoreClass(store, max_size=None) + cache = self.create_cache(store) # keys should be cached on first call keys = sorted(cache.keys()) @@ -2225,6 +2250,70 @@ def test_cache_keys(self): assert 1 == store.counter['__iter__'] +class TestLRUChunkCache(MutableMappingStoreTests, CacheTests): + + # mock test object that will act as both the cache and the array + class MockChunkCacheArray(object): + + def __init__(self, chunk_cache, store): + self.chunk_cache = chunk_cache + self._store = store + self.hits = 0 + self.misses = 0 + + def __setitem__(self, key, value): + self._store[key] = value + self.chunk_cache[key] = value + self._reset_hits_misses() + + def __getitem__(self, item): + try: + value = self.chunk_cache[item] + except KeyError: + value = self._store[item] + self.chunk_cache[item] = value + self._reset_hits_misses() + return value + + def __delitem__(self, key): + self.chunk_cache.__delitem__(key) + + def _reset_hits_misses(self): + self.hits = self.chunk_cache.hits + self.misses = self.chunk_cache.misses + + def invalidate(self): + self.chunk_cache.invalidate() + + def invalidate_values(self): + self.chunk_cache.invalidate_values() + + def create_store(self): + return 
LRUChunkCache(max_size=2**27) + + def create_cache(self, store, max_size=None): + return self.MockChunkCacheArray(LRUChunkCache(max_size=max_size), store=store) + + def test_delitem(self): + # setup store + store = self.CountingClass() + store['foo'] = b'xxx' + store['bar'] = b'yyy' + + # setup cache + cache = self.create_cache(store) + + cache['foo'] + del cache['foo'] + with pytest.raises(KeyError): + # noinspection PyStatementEffect + cache.chunk_cache['foo'] + + # verify other keys untouched + assert 0 == store.counter['__getitem__', 'bar'] + assert 1 == store.counter['__setitem__', 'bar'] + + def test_getsize(): store = KVStore(dict()) store['foo'] = b'aaa'
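Taken together, the pieces above can be exercised with a short, self-contained sketch.
It assumes only the API introduced in this patch (``LRUChunkCache(max_size=...)`` with
its ``hits``/``misses`` counters and ``invalidate()``, plus the ``chunk_cache`` argument
to ``Array``); the array shape, chunk size and ``max_size`` are illustrative, and the
exact hit/miss counts depend on the store and on which read path the array takes::

    import numpy as np
    import zarr
    from zarr.storage import LRUChunkCache

    # encoded chunks live in an ordinary in-memory store
    store = zarr.MemoryStore()
    z = zarr.array(np.arange(1_000_000).reshape(1000, 1000),
                   chunks=(100, 100), store=store)

    # cache sized to hold roughly two decoded (100, 100) int64 chunks
    # (each decoded chunk is 100 * 100 * 8 = 80_000 bytes)
    cache = LRUChunkCache(max_size=200_000)
    z_cached = zarr.Array(store=store, chunk_cache=cache)

    z_cached[0:100, 0:100]    # first read: chunk is decoded and placed in the cache
    z_cached[0:100, 0:100]    # repeat read can be served from the cached, decoded chunk
    z_cached[0:100, 100:300]  # as the cache grows past max_size, the least recently
                              # used chunks are evicted first
    print(cache.hits, cache.misses)

    cache.invalidate()        # drop all cached chunks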