From 318b187aa78f6331cb36e39c941fefc514abc98a Mon Sep 17 00:00:00 2001 From: Alistair Miles Date: Sat, 30 Dec 2017 22:44:18 +0000 Subject: [PATCH 1/9] initial work on lru store cache --- zarr/storage.py | 123 ++++++++++++++++++++++++++++++++++++- zarr/tests/test_storage.py | 10 ++- 2 files changed, 131 insertions(+), 2 deletions(-) diff --git a/zarr/storage.py b/zarr/storage.py index 4d25e6a634..17f777963d 100644 --- a/zarr/storage.py +++ b/zarr/storage.py @@ -7,7 +7,7 @@ """ from __future__ import absolute_import, print_function, division -from collections import MutableMapping +from collections import MutableMapping, OrderedDict import os import tempfile import zipfile @@ -1676,3 +1676,124 @@ def __iter__(self): def __len__(self): return self.db.stat()['entries'] + + +class LRUStoreCache(MutableMapping): + + def __init__(self, store, max_size): + self._store = store + self._max_size = max_size + self._current_size = 0 + self._keys_cache = None + self._listdir_cache = dict() + self._values_cache = OrderedDict() + self._mutex = Lock() + self.hits = self.misses = 0 + + def __getstate__(self): + return (self._store, self._max_size, self._current_size, self._keys_cache, + self._values_cache, self.hits, self.misses) + + def __setstate__(self, state): + (self._store, self._max_size, self._current_size, self._keys_cache, + self._values_cache, self.hits, self.misses) = state + self._mutex = Lock() + + def __len__(self): + return len(self._store) + + def __iter__(self): + return self.keys() + + def keys(self): + with self._mutex: + if self._keys_cache is None: + self._keys_cache = list(self._store.keys()) + return iter(self._keys_cache) + + def listdir(self, path): + with self._mutex: + try: + return self._listdir_cache[path] + except KeyError: + listing = listdir(self._store, path) + self._listdir_cache[path] = listing + return listing + + def _pop_value(self): + # remove the first value from the cache, as this will be the least recently + # used value + _, v = self._values_cache.popitem(last=False) + return v + + def _accommodate_value(self, value_size): + if self._max_size is None: + return + # ensure there is enough space in the cache for a new value + while self._current_size + value_size > self._max_size: + v = self._pop_value() + self._current_size -= buffer_size(v) + + def _cache_value(self, key, value): + # cache a value + value_size = buffer_size(value) + # check size of the value against max size, as if the value itself exceeds max + # size then we are never going to cache it + if self._max_size is None or value_size <= self._max_size: + self._accommodate_value(value_size) + self._values_cache[key] = value + self._current_size += value_size + + def clear_values(self): + with self._mutex: + self._values_cache.clear() + + def clear_keys(self): + with self._mutex: + self._keys_cache = None + self._listdir_cache.clear() + + def _clear_value(self, key): + if key in self._values_cache: + value = self._values_cache.pop(key) + self._current_size -= buffer_size(value) + + def __getitem__(self, key): + try: + # first try to obtain the value from the cache + with self._mutex: + value = self._values_cache[key] + # cache hit if no KeyError is raised + self.hits += 1 + # treat the end as most recently used + self._values_cache.move_to_end(key) + + except KeyError: + # cache miss, retrieve value from the store + value = self._store[key] + with self._mutex: + self.misses += 1 + # need to check if key is not in the cache, as it may have been cached + # while we were retrieving the value from the store + 
if key not in self._values_cache: + self._cache_value(key, value) + + return value + + def __setitem__(self, key, value): + self._store[key] = value + with self._mutex: + # clear keys + self._keys_cache = None + # clear value + self._clear_value(key) + # cache new value + self._cache_value(key, value) + + def __delitem__(self, key): + del self._store[key] + with self._mutex: + # clear keys + self._keys_cache = None + # clear value + self._clear_value(key) diff --git a/zarr/tests/test_storage.py b/zarr/tests/test_storage.py index d5c7c1e067..23cda15fc8 100644 --- a/zarr/tests/test_storage.py +++ b/zarr/tests/test_storage.py @@ -20,7 +20,7 @@ DirectoryStore, ZipStore, init_group, group_meta_key, getsize, migrate_1to2, TempStore, atexit_rmtree, NestedDirectoryStore, default_compressor, DBMStore, - LMDBStore, atexit_rmglob) + LMDBStore, atexit_rmglob, LRUStoreCache) from zarr.meta import (decode_array_metadata, encode_array_metadata, ZARR_FORMAT, decode_group_metadata, encode_group_metadata) from zarr.compat import PY2 @@ -846,6 +846,14 @@ def test_context_manager(self): eq(2, len(store)) +class TestLRUStoreCache(StoreTests, unittest.TestCase): + + def create_store(self): + return LRUStoreCache(dict(), max_size=2**27) + + # TODO test caching + + def test_getsize(): store = dict() store['foo'] = b'aaa' From 228924aeb535d6927715afd29dc84f6680235d1d Mon Sep 17 00:00:00 2001 From: Alistair Miles Date: Sat, 30 Dec 2017 23:48:22 +0000 Subject: [PATCH 2/9] add store cache tests; work through getsize failures --- zarr/storage.py | 68 +++++++++++++---------- zarr/tests/test_core.py | 13 ++++- zarr/tests/test_hierarchy.py | 11 +++- zarr/tests/test_storage.py | 102 ++++++++++++++++++++++++++--------- zarr/tests/util.py | 31 +++++++++++ zarr/util.py | 2 +- 6 files changed, 171 insertions(+), 56 deletions(-) create mode 100644 zarr/tests/util.py diff --git a/zarr/storage.py b/zarr/storage.py index 17f777963d..7381c96273 100644 --- a/zarr/storage.py +++ b/zarr/storage.py @@ -143,18 +143,23 @@ def getsize(store, path=None): return store.getsize(path) elif isinstance(store, dict): # compute from size of values - prefix = _path_to_prefix(path) - size = 0 - for k in listdir(store, path): - try: - v = store[prefix + k] - except KeyError: - pass - else: + if path in store: + v = store[path] + size = buffer_size(v) + else: + members = listdir(store, path) + prefix = _path_to_prefix(path) + size = 0 + for k in members: try: - size += buffer_size(v) - except TypeError: - return -1 + v = store[prefix + k] + except KeyError: + pass + else: + try: + size += buffer_size(v) + except TypeError: + return -1 return size else: return -1 @@ -610,16 +615,21 @@ def getsize(self, path=None): path = normalize_storage_path(path) # obtain value to return size of - value = self.root + value = None if path: try: parent, key = self._get_parent(path) value = parent[key] except KeyError: - err_path_not_found(path) + pass + else: + value = self.root # obtain size of value - if isinstance(value, self.cls): + if value is None: + return 0 + + elif isinstance(value, self.cls): # total size for directory size = 0 for v in value.values(): @@ -629,6 +639,7 @@ def getsize(self, path=None): except TypeError: return -1 return size + else: try: return buffer_size(value) @@ -843,7 +854,7 @@ def getsize(self, path=None): size += os.path.getsize(child_fs_path) return size else: - err_path_not_found(path) + return 0 def clear(self): shutil.rmtree(self.path) @@ -1230,7 +1241,7 @@ def getsize(self, path=None): info = self.zf.getinfo(path) return 
info.compress_size except KeyError: - err_path_not_found(path) + return 0 else: return 0 @@ -1692,11 +1703,11 @@ def __init__(self, store, max_size): def __getstate__(self): return (self._store, self._max_size, self._current_size, self._keys_cache, - self._values_cache, self.hits, self.misses) + self._listdir_cache, self._values_cache, self.hits, self.misses) def __setstate__(self, state): (self._store, self._max_size, self._current_size, self._keys_cache, - self._values_cache, self.hits, self.misses) = state + self._listdir_cache, self._values_cache, self.hits, self.misses) = state self._mutex = Lock() def __len__(self): @@ -1711,7 +1722,7 @@ def keys(self): self._keys_cache = list(self._store.keys()) return iter(self._keys_cache) - def listdir(self, path): + def listdir(self, path=None): with self._mutex: try: return self._listdir_cache[path] @@ -1720,6 +1731,9 @@ def listdir(self, path): self._listdir_cache[path] = listing return listing + def getsize(self, path=None): + return getsize(self._store, path=path) + def _pop_value(self): # remove the first value from the cache, as this will be the least recently # used value @@ -1750,8 +1764,11 @@ def clear_values(self): def clear_keys(self): with self._mutex: - self._keys_cache = None - self._listdir_cache.clear() + self._clear_keys() + + def _clear_keys(self): + self._keys_cache = None + self._listdir_cache.clear() def _clear_value(self, key): if key in self._values_cache: @@ -1783,17 +1800,12 @@ def __getitem__(self, key): def __setitem__(self, key, value): self._store[key] = value with self._mutex: - # clear keys - self._keys_cache = None - # clear value + self._clear_keys() self._clear_value(key) - # cache new value self._cache_value(key, value) def __delitem__(self, key): del self._store[key] with self._mutex: - # clear keys - self._keys_cache = None - # clear value + self._clear_keys() self._clear_value(key) diff --git a/zarr/tests/test_core.py b/zarr/tests/test_core.py index c50328422e..3867befdec 100644 --- a/zarr/tests/test_core.py +++ b/zarr/tests/test_core.py @@ -18,7 +18,8 @@ from zarr.storage import (DirectoryStore, init_array, init_group, NestedDirectoryStore, - DBMStore, LMDBStore, atexit_rmtree, atexit_rmglob) + DBMStore, LMDBStore, atexit_rmtree, atexit_rmglob, + LRUStoreCache) from zarr.core import Array from zarr.errors import PermissionError from zarr.compat import PY2, text_type, binary_type @@ -1615,3 +1616,13 @@ def test_cache_metadata(self): def test_object_arrays_danger(self): # skip this one as it only works if metadata are cached pass + + +class TestArrayWithStoreCache(TestArray): + + @staticmethod + def create_array(read_only=False, **kwargs): + store = LRUStoreCache(dict(), max_size=None) + kwargs.setdefault('compressor', Zlib(level=1)) + init_array(store, **kwargs) + return Array(store, read_only=read_only) diff --git a/zarr/tests/test_hierarchy.py b/zarr/tests/test_hierarchy.py index 2273568a0e..e19472d214 100644 --- a/zarr/tests/test_hierarchy.py +++ b/zarr/tests/test_hierarchy.py @@ -20,7 +20,8 @@ from zarr.storage import (DictStore, DirectoryStore, ZipStore, init_group, init_array, array_meta_key, group_meta_key, atexit_rmtree, - NestedDirectoryStore, DBMStore, LMDBStore, atexit_rmglob) + NestedDirectoryStore, DBMStore, LMDBStore, atexit_rmglob, + LRUStoreCache) from zarr.core import Array from zarr.compat import PY2, text_type from zarr.hierarchy import Group, group, open_group @@ -945,6 +946,14 @@ def test_chunk_store(self): eq(expect, actual) +class TestGroupWithStoreCache(TestGroup): + + @staticmethod + 
def create_store(): + store = LRUStoreCache(dict(), max_size=None) + return store, None + + def test_group(): # test the group() convenience function diff --git a/zarr/tests/test_storage.py b/zarr/tests/test_storage.py index 23cda15fc8..760635d453 100644 --- a/zarr/tests/test_storage.py +++ b/zarr/tests/test_storage.py @@ -14,6 +14,7 @@ from numpy.testing import assert_array_equal, assert_array_almost_equal from nose import SkipTest from nose.tools import assert_raises, eq_ as eq, assert_is_none +import pytest from zarr.storage import (init_array, array_meta_key, attrs_key, DictStore, @@ -27,6 +28,7 @@ from zarr.codecs import Zlib, Blosc, BZ2 from zarr.errors import PermissionError from zarr.hierarchy import group +from zarr.tests.util import CountingDict class StoreTests(object): @@ -139,23 +141,23 @@ def test_pickle(self): def test_getsize(self): store = self.create_store() - if hasattr(store, 'getsize'): - eq(0, store.getsize()) + if isinstance(store, dict) or hasattr(store, 'getsize'): + eq(0, getsize(store)) store['foo'] = b'x' - eq(1, store.getsize()) - eq(1, store.getsize('foo')) + eq(1, getsize(store)) + eq(1, getsize(store, 'foo')) store['bar'] = b'yy' - eq(3, store.getsize()) - eq(2, store.getsize('bar')) + eq(3, getsize(store)) + eq(2, getsize(store, 'bar')) store['baz'] = bytearray(b'zzz') - eq(6, store.getsize()) - eq(3, store.getsize('baz')) + eq(6, getsize(store)) + eq(3, getsize(store, 'baz')) store['quux'] = array.array('B', b'zzzz') - eq(10, store.getsize()) - eq(4, store.getsize('quux')) + eq(10, getsize(store)) + eq(4, getsize(store, 'quux')) store['spong'] = np.frombuffer(b'zzzzz', dtype='u1') - eq(15, store.getsize()) - eq(5, store.getsize('spong')) + eq(15, getsize(store)) + eq(5, getsize(store, 'spong')) # noinspection PyStatementEffect def test_hierarchy(self): @@ -197,18 +199,13 @@ def test_hierarchy(self): eq(6, store.getsize('c/e')) eq(3, store.getsize('c/e/f')) eq(3, store.getsize('c/e/g')) - with assert_raises(ValueError): - store.getsize('x') - with assert_raises(ValueError): - store.getsize('a/x') - with assert_raises(ValueError): - store.getsize('c/x') - with assert_raises(ValueError): - store.getsize('c/x/y') - with assert_raises(ValueError): - store.getsize('c/d/y') - with assert_raises(ValueError): - store.getsize('c/d/y/z') + # non-existent paths + eq(0, store.getsize('x')) + eq(0, store.getsize('a/x')) + eq(0, store.getsize('c/x')) + eq(0, store.getsize('c/x/y')) + eq(0, store.getsize('c/d/y')) + eq(0, store.getsize('c/d/y/z')) # test listdir (optional) if hasattr(store, 'listdir'): @@ -851,7 +848,62 @@ class TestLRUStoreCache(StoreTests, unittest.TestCase): def create_store(self): return LRUStoreCache(dict(), max_size=2**27) - # TODO test caching + def test_cache_values(self): + + # setup store + store = CountingDict() + store['foo'] = b'xxx' + store['bar'] = b'yyy' + assert 0 == store.counter['__getitem__', 'foo'] + assert 1 == store.counter['__setitem__', 'foo'] + assert 0 == store.counter['__getitem__', 'bar'] + assert 1 == store.counter['__setitem__', 'bar'] + + # setup cache + cache = LRUStoreCache(store, max_size=None) + assert 0 == cache.hits + assert 0 == cache.misses + + # test first __getitem__, cache miss + assert b'xxx' == cache['foo'] + assert 1 == store.counter['__getitem__', 'foo'] + assert 1 == store.counter['__setitem__', 'foo'] + assert 0 == cache.hits + assert 1 == cache.misses + + # test second __getitem__, cache hit + assert b'xxx' == cache['foo'] + assert 1 == store.counter['__getitem__', 'foo'] + assert 1 == 
store.counter['__setitem__', 'foo'] + assert 1 == cache.hits + assert 1 == cache.misses + + # test __setitem__, __getitem__ + cache['foo'] = b'zzz' + assert 1 == store.counter['__getitem__', 'foo'] + assert 2 == store.counter['__setitem__', 'foo'] + # should be a cache hit + assert b'zzz' == cache['foo'] + assert 1 == store.counter['__getitem__', 'foo'] + assert 2 == store.counter['__setitem__', 'foo'] + assert 2 == cache.hits + assert 1 == cache.misses + + # test __delitem__ + del cache['foo'] + with pytest.raises(KeyError): + # noinspection PyStatementEffect + cache['foo'] + with pytest.raises(KeyError): + # noinspection PyStatementEffect + store['foo'] + + # verify other keys untouched + assert 0 == store.counter['__getitem__', 'bar'] + assert 1 == store.counter['__setitem__', 'bar'] + + # TODO test max size + # TODO test key caching def test_getsize(): diff --git a/zarr/tests/util.py b/zarr/tests/util.py new file mode 100644 index 0000000000..5fcee7ea64 --- /dev/null +++ b/zarr/tests/util.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, print_function, division +import collections + + +class CountingDict(collections.MutableMapping): + + def __init__(self): + self.wrapped = dict() + self.counter = collections.Counter() + + def __len__(self): + return len(self.wrapped) + + def __iter__(self): + return iter(self.wrapped) + + def __contains__(self, item): + return item in self.wrapped + + def __getitem__(self, item): + self.counter['__getitem__', item] += 1 + return self.wrapped[item] + + def __setitem__(self, key, value): + self.counter['__setitem__', key] += 1 + self.wrapped[key] = value + + def __delitem__(self, key): + self.counter['__delitem__', key] += 1 + del self.wrapped[key] diff --git a/zarr/util.py b/zarr/util.py index 3b43e3a0d6..b74671247b 100644 --- a/zarr/util.py +++ b/zarr/util.py @@ -318,7 +318,7 @@ def buffer_size(v): return v.buffer_info()[1] * v.itemsize else: # pragma: py2 no cover v = memoryview(v) - return reduce(operator.mul, v.shape) * v.itemsize + return reduce(operator.mul, v.shape, 1) * v.itemsize def info_text_report(items): From 5e5efc5122e6d994b5d405f3760d2c895b0fb32d Mon Sep 17 00:00:00 2001 From: Alistair Miles Date: Sun, 31 Dec 2017 00:03:13 +0000 Subject: [PATCH 3/9] py2 compat --- zarr/compat.py | 7 +++++++ zarr/storage.py | 8 ++++---- zarr/util.py | 5 ++++- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/zarr/compat.py b/zarr/compat.py index 38ab245f58..9be3384123 100644 --- a/zarr/compat.py +++ b/zarr/compat.py @@ -16,9 +16,16 @@ class PermissionError(Exception): pass + def OrderedDict_move_to_end(od, key): + od[key] = od.pop(key) + + else: # pragma: py2 no cover text_type = str binary_type = bytes from functools import reduce PermissionError = PermissionError + + def OrderedDict_move_to_end(od, key): + od.move_to_end(key) diff --git a/zarr/storage.py b/zarr/storage.py index 7381c96273..86419a8125 100644 --- a/zarr/storage.py +++ b/zarr/storage.py @@ -28,10 +28,10 @@ normalize_storage_path, buffer_size, normalize_fill_value, nolock, normalize_dtype) from zarr.meta import encode_array_metadata, encode_group_metadata -from zarr.compat import PY2, binary_type +from zarr.compat import PY2, binary_type, OrderedDict_move_to_end from numcodecs.registry import codec_registry -from zarr.errors import (err_contains_group, err_contains_array, err_path_not_found, - err_bad_compressor, err_fspath_exists_notdir, err_read_only) +from zarr.errors import (err_contains_group, err_contains_array, err_bad_compressor, + 
err_fspath_exists_notdir, err_read_only) array_meta_key = '.zarray' @@ -1783,7 +1783,7 @@ def __getitem__(self, key): # cache hit if no KeyError is raised self.hits += 1 # treat the end as most recently used - self._values_cache.move_to_end(key) + OrderedDict_move_to_end(self._values_cache, key) except KeyError: # cache miss, retrieve value from the store diff --git a/zarr/util.py b/zarr/util.py index b74671247b..aedff4ab94 100644 --- a/zarr/util.py +++ b/zarr/util.py @@ -318,7 +318,10 @@ def buffer_size(v): return v.buffer_info()[1] * v.itemsize else: # pragma: py2 no cover v = memoryview(v) - return reduce(operator.mul, v.shape, 1) * v.itemsize + if v.shape: + return reduce(operator.mul, v.shape) * v.itemsize + else: + return v.itemsize def info_text_report(items): From efbec722f3f21c1cd20510c6581ebb7d7d1f789d Mon Sep 17 00:00:00 2001 From: Alistair Miles Date: Sun, 31 Dec 2017 14:58:28 +0000 Subject: [PATCH 4/9] add test with max size --- zarr/tests/test_storage.py | 98 +++++++++++++++++++++++++++++++++++++- 1 file changed, 97 insertions(+), 1 deletion(-) diff --git a/zarr/tests/test_storage.py b/zarr/tests/test_storage.py index 760635d453..f1a9a5ae50 100644 --- a/zarr/tests/test_storage.py +++ b/zarr/tests/test_storage.py @@ -848,7 +848,7 @@ class TestLRUStoreCache(StoreTests, unittest.TestCase): def create_store(self): return LRUStoreCache(dict(), max_size=2**27) - def test_cache_values(self): + def test_cache_values_no_max_size(self): # setup store store = CountingDict() @@ -903,6 +903,102 @@ def test_cache_values(self): assert 1 == store.counter['__setitem__', 'bar'] # TODO test max size + def test_cache_values_with_max_size(self): + + # setup store + store = CountingDict() + store['foo'] = b'xxx' + store['bar'] = b'yyy' + assert 0 == store.counter['__getitem__', 'foo'] + assert 0 == store.counter['__getitem__', 'bar'] + # setup cache - can only hold one item + cache = LRUStoreCache(store, max_size=5) + assert 0 == cache.hits + assert 0 == cache.misses + + # test first 'foo' __getitem__, cache miss + assert b'xxx' == cache['foo'] + assert 1 == store.counter['__getitem__', 'foo'] + assert 0 == cache.hits + assert 1 == cache.misses + + # test second 'foo' __getitem__, cache hit + assert b'xxx' == cache['foo'] + assert 1 == store.counter['__getitem__', 'foo'] + assert 1 == cache.hits + assert 1 == cache.misses + + # test first 'bar' __getitem__, cache miss + assert b'yyy' == cache['bar'] + assert 1 == store.counter['__getitem__', 'bar'] + assert 1 == cache.hits + assert 2 == cache.misses + + # test second 'bar' __getitem__, cache hit + assert b'yyy' == cache['bar'] + assert 1 == store.counter['__getitem__', 'bar'] + assert 2 == cache.hits + assert 2 == cache.misses + + # test 'foo' __getitem__, should have been evicted, cache miss + assert b'xxx' == cache['foo'] + assert 2 == store.counter['__getitem__', 'foo'] + assert 2 == cache.hits + assert 3 == cache.misses + + # test 'bar' __getitem__, should have been evicted, cache miss + assert b'yyy' == cache['bar'] + assert 2 == store.counter['__getitem__', 'bar'] + assert 2 == cache.hits + assert 4 == cache.misses + + # setup store + store = CountingDict() + store['foo'] = b'xxx' + store['bar'] = b'yyy' + assert 0 == store.counter['__getitem__', 'foo'] + assert 0 == store.counter['__getitem__', 'bar'] + # setup cache - can hold two items + cache = LRUStoreCache(store, max_size=6) + assert 0 == cache.hits + assert 0 == cache.misses + + # test first 'foo' __getitem__, cache miss + assert b'xxx' == cache['foo'] + assert 1 == 
store.counter['__getitem__', 'foo'] + assert 0 == cache.hits + assert 1 == cache.misses + + # test second 'foo' __getitem__, cache hit + assert b'xxx' == cache['foo'] + assert 1 == store.counter['__getitem__', 'foo'] + assert 1 == cache.hits + assert 1 == cache.misses + + # test first 'bar' __getitem__, cache miss + assert b'yyy' == cache['bar'] + assert 1 == store.counter['__getitem__', 'bar'] + assert 1 == cache.hits + assert 2 == cache.misses + + # test second 'bar' __getitem__, cache hit + assert b'yyy' == cache['bar'] + assert 1 == store.counter['__getitem__', 'bar'] + assert 2 == cache.hits + assert 2 == cache.misses + + # test 'foo' __getitem__, should still be cached + assert b'xxx' == cache['foo'] + assert 1 == store.counter['__getitem__', 'foo'] + assert 3 == cache.hits + assert 2 == cache.misses + + # test 'bar' __getitem__, should still be cached + assert b'yyy' == cache['bar'] + assert 1 == store.counter['__getitem__', 'bar'] + assert 4 == cache.hits + assert 2 == cache.misses + # TODO test key caching From dfe1fa746ecf026304f7c2d0fdd794b4dcf808bd Mon Sep 17 00:00:00 2001 From: Alistair Miles Date: Mon, 1 Jan 2018 23:36:55 +0000 Subject: [PATCH 5/9] add tests for keys caching --- zarr/storage.py | 21 +++++++++--- zarr/tests/test_storage.py | 65 ++++++++++++++++++++++++++++++++++++-- zarr/tests/util.py | 7 ++++ 3 files changed, 87 insertions(+), 6 deletions(-) diff --git a/zarr/storage.py b/zarr/storage.py index 86419a8125..29011fa6f7 100644 --- a/zarr/storage.py +++ b/zarr/storage.py @@ -1696,6 +1696,7 @@ def __init__(self, store, max_size): self._max_size = max_size self._current_size = 0 self._keys_cache = None + self._contains_cache = None self._listdir_cache = dict() self._values_cache = OrderedDict() self._mutex = Lock() @@ -1703,11 +1704,13 @@ def __init__(self, store, max_size): def __getstate__(self): return (self._store, self._max_size, self._current_size, self._keys_cache, - self._listdir_cache, self._values_cache, self.hits, self.misses) + self._contains_cache, self._listdir_cache, self._values_cache, self.hits, + self.misses) def __setstate__(self, state): (self._store, self._max_size, self._current_size, self._keys_cache, - self._listdir_cache, self._values_cache, self.hits, self.misses) = state + self._contains_cache, self._listdir_cache, self._values_cache, self.hits, + self.misses) = state self._mutex = Lock() def __len__(self): @@ -1716,10 +1719,19 @@ def __len__(self): def __iter__(self): return self.keys() + def __contains__(self, key): + with self._mutex: + if self._contains_cache is None: + self._contains_cache = set(self._keys()) + return key in self._contains_cache + def keys(self): with self._mutex: - if self._keys_cache is None: - self._keys_cache = list(self._store.keys()) + return self._keys() + + def _keys(self): + if self._keys_cache is None: + self._keys_cache = list(self._store.keys()) return iter(self._keys_cache) def listdir(self, path=None): @@ -1768,6 +1780,7 @@ def clear_keys(self): def _clear_keys(self): self._keys_cache = None + self._contains_cache = None self._listdir_cache.clear() def _clear_value(self, key): diff --git a/zarr/tests/test_storage.py b/zarr/tests/test_storage.py index f1a9a5ae50..4bb7a7de1b 100644 --- a/zarr/tests/test_storage.py +++ b/zarr/tests/test_storage.py @@ -889,6 +889,12 @@ def test_cache_values_no_max_size(self): assert 2 == cache.hits assert 1 == cache.misses + # manually clear all cached values + cache.clear_values() + assert b'zzz' == cache['foo'] + assert 2 == store.counter['__getitem__', 'foo'] + assert 2 
== store.counter['__setitem__', 'foo'] + # test __delitem__ del cache['foo'] with pytest.raises(KeyError): @@ -902,7 +908,6 @@ def test_cache_values_no_max_size(self): assert 0 == store.counter['__getitem__', 'bar'] assert 1 == store.counter['__setitem__', 'bar'] - # TODO test max size def test_cache_values_with_max_size(self): # setup store @@ -999,7 +1004,63 @@ def test_cache_values_with_max_size(self): assert 4 == cache.hits assert 2 == cache.misses - # TODO test key caching + def test_cache_keys(self): + + # setup + store = CountingDict() + store['foo'] = b'xxx' + store['bar'] = b'yyy' + assert 0 == store.counter['__contains__', 'foo'] + assert 0 == store.counter['__iter__'] + assert 0 == store.counter['keys'] + cache = LRUStoreCache(store, max_size=None) + + # keys should be cached on first call + keys = sorted(cache.keys()) + assert keys == ['bar', 'foo'] + assert 1 == store.counter['keys'] + # keys should now be cached + assert keys == sorted(cache.keys()) + assert 1 == store.counter['keys'] + assert 'foo' in cache + assert 0 == store.counter['__contains__', 'foo'] + assert keys == sorted(cache) + assert 0 == store.counter['__iter__'] + assert 1 == store.counter['keys'] + + # cache should be cleared if store is modified - crude but simple for now + cache['baz'] = b'zzz' + keys = sorted(cache.keys()) + assert keys == ['bar', 'baz', 'foo'] + assert 2 == store.counter['keys'] + # keys should now be cached + assert keys == sorted(cache.keys()) + assert 2 == store.counter['keys'] + + # manually clear keys + cache.clear_keys() + keys = sorted(cache.keys()) + assert keys == ['bar', 'baz', 'foo'] + assert 3 == store.counter['keys'] + assert 0 == store.counter['__contains__', 'foo'] + assert 0 == store.counter['__iter__'] + cache.clear_keys() + keys = sorted(cache) + assert keys == ['bar', 'baz', 'foo'] + assert 4 == store.counter['keys'] + assert 0 == store.counter['__contains__', 'foo'] + assert 0 == store.counter['__iter__'] + cache.clear_keys() + assert 'foo' in cache + assert 5 == store.counter['keys'] + assert 0 == store.counter['__contains__', 'foo'] + assert 0 == store.counter['__iter__'] + + # check these would get counted if called directly + assert 'foo' in store + assert 1 == store.counter['__contains__', 'foo'] + assert keys == sorted(store) + assert 1 == store.counter['__iter__'] def test_getsize(): diff --git a/zarr/tests/util.py b/zarr/tests/util.py index 5fcee7ea64..def9c61132 100644 --- a/zarr/tests/util.py +++ b/zarr/tests/util.py @@ -10,12 +10,19 @@ def __init__(self): self.counter = collections.Counter() def __len__(self): + self.counter['__len__'] += 1 return len(self.wrapped) + def keys(self): + self.counter['keys'] += 1 + return self.wrapped.keys() + def __iter__(self): + self.counter['__iter__'] += 1 return iter(self.wrapped) def __contains__(self, item): + self.counter['__contains__', item] += 1 return item in self.wrapped def __getitem__(self, item): From 24f175f4e36bba5b2ea6a99163d3854ff93e22e4 Mon Sep 17 00:00:00 2001 From: Alistair Miles Date: Tue, 2 Jan 2018 09:05:51 +0000 Subject: [PATCH 6/9] API docs for LRUStoreCache --- docs/api/storage.rst | 11 +++++++++ zarr/__init__.py | 2 +- zarr/storage.py | 56 ++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 64 insertions(+), 5 deletions(-) diff --git a/docs/api/storage.rst b/docs/api/storage.rst index 04b0c899fd..a90009bfdb 100644 --- a/docs/api/storage.rst +++ b/docs/api/storage.rst @@ -21,6 +21,17 @@ Storage (``zarr.storage``) .. automethod:: close .. automethod:: flush +.. 
autoclass:: LRUStoreCache
+
+    .. automethod:: clear_values
+    .. automethod:: clear_keys
+
 .. autofunction:: init_array
 .. autofunction:: init_group
+.. autofunction:: contains_array
+.. autofunction:: contains_group
+.. autofunction:: listdir
+.. autofunction:: rmdir
+.. autofunction:: getsize
+.. autofunction:: rename
 .. autofunction:: migrate_1to2
diff --git a/zarr/__init__.py b/zarr/__init__.py
index 89068f6dde..f0fd7a1c80 100644
--- a/zarr/__init__.py
+++ b/zarr/__init__.py
@@ -7,7 +7,7 @@
 from zarr.creation import (empty, zeros, ones, full, array, empty_like, zeros_like,
                            ones_like, full_like, open_array, open_like, create)
 from zarr.storage import (DictStore, DirectoryStore, ZipStore, TempStore,
-                          NestedDirectoryStore, DBMStore, LMDBStore)
+                          NestedDirectoryStore, DBMStore, LMDBStore, LRUStoreCache)
 from zarr.hierarchy import group, open_group, Group
 from zarr.sync import ThreadSynchronizer, ProcessSynchronizer
 from zarr.codecs import *
diff --git a/zarr/storage.py b/zarr/storage.py
index 29011fa6f7..cd1c92645e 100644
--- a/zarr/storage.py
+++ b/zarr/storage.py
@@ -5,6 +5,15 @@
 :mod:`collections` module in the Python standard library can be used as a Zarr array
 store, as long as it accepts string (str) keys and bytes values.
 
+In addition to the :class:`MutableMapping` interface, store classes may also implement
+optional methods `listdir` (list members of a "directory") and `rmdir` (remove all
+members of a "directory"). These methods should be implemented if the store class is
+aware of the hierarchical organisation of resources within the store and can provide
+efficient implementations. If these methods are not available, Zarr will fall back to
+slower implementations that work via the :class:`MutableMapping` interface. Store
+classes may also optionally implement a `rename` method (rename all members under a
+given path) and a `getsize` method (return the size in bytes of a given value).
+
 """
 from __future__ import absolute_import, print_function, division
 from collections import MutableMapping, OrderedDict
@@ -80,7 +89,9 @@ def _rmdir_from_keys(store, path=None):
 
 
 def rmdir(store, path=None):
-    """Remove all items under the given path."""
+    """Remove all items under the given path. If `store` provides a `rmdir` method,
+    it will be called; otherwise Zarr falls back to an implementation via the
+    `MutableMapping` interface."""
     path = normalize_storage_path(path)
     if hasattr(store, 'rmdir'):
         # pass through
@@ -101,7 +112,9 @@ def _rename_from_keys(store, src_path, dst_path):
 
 
 def rename(store, src_path, dst_path):
-    """Rename all items under the given path."""
+    """Rename all items under the given path. If `store` provides a `rename` method,
+    it will be called; otherwise Zarr falls back to an implementation via the
+    `MutableMapping` interface."""
     src_path = normalize_storage_path(src_path)
     dst_path = normalize_storage_path(dst_path)
     if hasattr(store, 'rename'):
@@ -125,7 +138,9 @@ def _listdir_from_keys(store, path=None):
 
 
 def listdir(store, path=None):
-    """Obtain a directory listing for the given path."""
+    """Obtain a directory listing for the given path. If `store` provides a `listdir`
+    method, it will be called; otherwise Zarr falls back to an implementation via the
+    `MutableMapping` interface."""
    path = normalize_storage_path(path)
    if hasattr(store, 'listdir'):
        # pass through
@@ -136,7 +151,8 @@ def listdir(store, path=None):
 
 
 def getsize(store, path=None):
-    """Compute size of stored items for a given path."""
+    """Compute size of stored items for a given path. 
If `store` provides a `getsize` + method, this will be called, otherwise will return -1.""" path = normalize_storage_path(path) if hasattr(store, 'getsize'): # pass through @@ -868,6 +884,7 @@ def atexit_rmtree(path, rmtree(path) +# noinspection PyShadowingNames def atexit_rmglob(path, glob=glob.glob, isdir=os.path.isdir, @@ -1690,6 +1707,35 @@ def __len__(self): class LRUStoreCache(MutableMapping): + """Storage class that implements a least-recently-used (LRU) cache layer over + some other store. Intended primarily for use with stores that can be slow to + access, e.g., remote stores that require network communication to store and + retrieve data. + + Parameters + ---------- + store : MutableMapping + The store containing the actual data to be cached. + max_size : int + The maximum size that the cache may grow to, in number of bytes. Provide `None` + if you would like the cache to have unlimited size. + + Examples + -------- + The example below wraps an S3 store with an LRU cache:: + + >>> import s3fs + >>> import zarr + >>> s3 = s3fs.S3FileSystem(anon=True, client_kwargs=dict(region_name='eu-west-2')) + >>> store = s3fs.S3Map(root='zarr-demo/store', s3=s3, check=False) + >>> cache = zarr.LRUStoreCache(store, max_size=2**28) + >>> root = zarr.group(store=cache) + >>> z = root['foo/bar/baz'] + >>> from timeit import timeit + >>> timeit('print(z.tostring())', number=1) # first time is relatively slow + >>> timeit('print(z.tostring())', number=1) # second time is fast, uses cache + + """ def __init__(self, store, max_size): self._store = store @@ -1771,10 +1817,12 @@ def _cache_value(self, key, value): self._current_size += value_size def clear_values(self): + """Clear the values cache.""" with self._mutex: self._values_cache.clear() def clear_keys(self): + """Clear the keys cache.""" with self._mutex: self._clear_keys() From 9d58243cc7a8e8f1c31b004a302d64573333ce77 Mon Sep 17 00:00:00 2001 From: Alistair Miles Date: Tue, 2 Jan 2018 11:20:18 +0000 Subject: [PATCH 7/9] fix API docs for lru store cache --- docs/api/storage.rst | 5 +++-- zarr/storage.py | 44 ++++++++++++++++++++++++++------------ zarr/tests/test_storage.py | 16 ++++++++------ 3 files changed, 43 insertions(+), 22 deletions(-) diff --git a/docs/api/storage.rst b/docs/api/storage.rst index a90009bfdb..2365359fa9 100644 --- a/docs/api/storage.rst +++ b/docs/api/storage.rst @@ -23,8 +23,9 @@ Storage (``zarr.storage``) .. autoclass:: LRUStoreCache - .. automethod:: clear_values - .. automethod:: clear_keys + .. automethod:: invalidate + .. automethod:: invalidate_values + .. automethod:: invalidate_keys .. autofunction:: init_array .. autofunction:: init_group diff --git a/zarr/storage.py b/zarr/storage.py index cd1c92645e..7a5273c044 100644 --- a/zarr/storage.py +++ b/zarr/storage.py @@ -1732,8 +1732,14 @@ class LRUStoreCache(MutableMapping): >>> root = zarr.group(store=cache) >>> z = root['foo/bar/baz'] >>> from timeit import timeit - >>> timeit('print(z.tostring())', number=1) # first time is relatively slow - >>> timeit('print(z.tostring())', number=1) # second time is fast, uses cache + >>> # first data access is relatively slow, retrieved from store + ... timeit('print(z[:].tostring())', number=1, globals=globals()) # doctest: +SKIP + b'Hello from the cloud!' + 0.1081731989979744 + >>> # second data access is faster, uses cache + ... timeit('print(z[:].tostring())', number=1, globals=globals()) # doctest: +SKIP + b'Hello from the cloud!' 
+ 0.0009490990014455747 """ @@ -1760,7 +1766,7 @@ def __setstate__(self, state): self._mutex = Lock() def __len__(self): - return len(self._store) + return len(self._keys()) def __iter__(self): return self.keys() @@ -1771,14 +1777,18 @@ def __contains__(self, key): self._contains_cache = set(self._keys()) return key in self._contains_cache + def clear(self): + self._store.clear() + self.invalidate() + def keys(self): with self._mutex: - return self._keys() + return iter(self._keys()) def _keys(self): if self._keys_cache is None: self._keys_cache = list(self._store.keys()) - return iter(self._keys_cache) + return self._keys_cache def listdir(self, path=None): with self._mutex: @@ -1816,22 +1826,28 @@ def _cache_value(self, key, value): self._values_cache[key] = value self._current_size += value_size - def clear_values(self): + def invalidate(self): + """Completely clear the cache.""" + with self._mutex: + self._values_cache.clear() + self._invalidate_keys() + + def invalidate_values(self): """Clear the values cache.""" with self._mutex: self._values_cache.clear() - def clear_keys(self): + def invalidate_keys(self): """Clear the keys cache.""" with self._mutex: - self._clear_keys() + self._invalidate_keys() - def _clear_keys(self): + def _invalidate_keys(self): self._keys_cache = None self._contains_cache = None self._listdir_cache.clear() - def _clear_value(self, key): + def _invalidate_value(self, key): if key in self._values_cache: value = self._values_cache.pop(key) self._current_size -= buffer_size(value) @@ -1861,12 +1877,12 @@ def __getitem__(self, key): def __setitem__(self, key, value): self._store[key] = value with self._mutex: - self._clear_keys() - self._clear_value(key) + self._invalidate_keys() + self._invalidate_value(key) self._cache_value(key, value) def __delitem__(self, key): del self._store[key] with self._mutex: - self._clear_keys() - self._clear_value(key) + self._invalidate_keys() + self._invalidate_value(key) diff --git a/zarr/tests/test_storage.py b/zarr/tests/test_storage.py index 4bb7a7de1b..ec8443e778 100644 --- a/zarr/tests/test_storage.py +++ b/zarr/tests/test_storage.py @@ -889,11 +889,15 @@ def test_cache_values_no_max_size(self): assert 2 == cache.hits assert 1 == cache.misses - # manually clear all cached values - cache.clear_values() + # manually invalidate all cached values + cache.invalidate_values() assert b'zzz' == cache['foo'] assert 2 == store.counter['__getitem__', 'foo'] assert 2 == store.counter['__setitem__', 'foo'] + cache.invalidate() + assert b'zzz' == cache['foo'] + assert 3 == store.counter['__getitem__', 'foo'] + assert 2 == store.counter['__setitem__', 'foo'] # test __delitem__ del cache['foo'] @@ -1037,20 +1041,20 @@ def test_cache_keys(self): assert keys == sorted(cache.keys()) assert 2 == store.counter['keys'] - # manually clear keys - cache.clear_keys() + # manually invalidate keys + cache.invalidate_keys() keys = sorted(cache.keys()) assert keys == ['bar', 'baz', 'foo'] assert 3 == store.counter['keys'] assert 0 == store.counter['__contains__', 'foo'] assert 0 == store.counter['__iter__'] - cache.clear_keys() + cache.invalidate_keys() keys = sorted(cache) assert keys == ['bar', 'baz', 'foo'] assert 4 == store.counter['keys'] assert 0 == store.counter['__contains__', 'foo'] assert 0 == store.counter['__iter__'] - cache.clear_keys() + cache.invalidate_keys() assert 'foo' in cache assert 5 == store.counter['keys'] assert 0 == store.counter['__contains__', 'foo'] From 4bf5f16a1e3af2ba57aed3834f1bc75fc7189907 Mon Sep 17 00:00:00 2001 From: 
Alistair Miles
Date: Tue, 2 Jan 2018 11:24:22 +0000
Subject: [PATCH 8/9] document changes

---
 docs/release.rst | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/docs/release.rst b/docs/release.rst
index 8317026660..0067c51721 100644
--- a/docs/release.rst
+++ b/docs/release.rst
@@ -127,6 +127,11 @@ Enhancements
 * **Added support for ``datetime64`` and ``timedelta64`` data types**; :issue:`85`,
   :issue:`215`.
 
+* **New LRUStoreCache class**. The :class:`zarr.storage.LRUStoreCache` class provides
+  a means to cache data locally in memory from a store that may be slow to access,
+  e.g., a store that retrieves data from a remote server via the network;
+  :issue:`223`.
+
 Bug fixes
 ~~~~~~~~~

From af0f272d488c207c817d59b849123d77684c7948 Mon Sep 17 00:00:00 2001
From: Alistair Miles
Date: Tue, 2 Jan 2018 11:41:59 +0000
Subject: [PATCH 9/9] add tutorial docs on cloud storage performance [ci skip]

---
 docs/tutorial.rst | 34 ++++++++++++++++++++++++++++++++++
 1 file changed, 34 insertions(+)

diff --git a/docs/tutorial.rst b/docs/tutorial.rst
index e28b816edb..43a98cf115 100644
--- a/docs/tutorial.rst
+++ b/docs/tutorial.rst
@@ -729,6 +729,9 @@ group (requires `lmdb `_ to be installed)::
     >>> z[:] = 42
     >>> store.close()
 
+Distributed/cloud storage
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
 It is also possible to use distributed storage systems. The Dask project has
 implementations of the ``MutableMapping`` interface for Amazon S3 (`S3Map
 `_), Hadoop
@@ -767,6 +770,37 @@ Here is an example using S3Map to read an array created previously::
     >>> z[:].tostring()
     b'Hello from the cloud!'
 
+Note that retrieving data from a remote service via the network can be significantly
+slower than retrieving data from a local file system, depending on network latency and
+bandwidth between the client and server systems. If you are experiencing poor
+performance, there are several things you can try. One option is to increase the array
+chunk size, which will reduce the number of chunks and hence the number of network
+round-trips required to retrieve the data for an array, reducing the impact of network
+latency. Another option is to try to increase the compression ratio by changing
+compression options or trying a different compressor, which will reduce the impact of
+limited network bandwidth. As of version 2.2, Zarr also provides the
+:class:`zarr.storage.LRUStoreCache` class, which can be used to implement a local
+in-memory cache layer over a remote store. E.g.::
+
+    >>> s3 = s3fs.S3FileSystem(anon=True, client_kwargs=dict(region_name='eu-west-2'))
+    >>> store = s3fs.S3Map(root='zarr-demo/store', s3=s3, check=False)
+    >>> cache = zarr.LRUStoreCache(store, max_size=2**28)
+    >>> root = zarr.group(store=cache)
+    >>> z = root['foo/bar/baz']
+    >>> from timeit import timeit
+    >>> # first data access is relatively slow, retrieved from store
+    ... timeit('print(z[:].tostring())', number=1, globals=globals())  # doctest: +SKIP
+    b'Hello from the cloud!'
+    0.1081731989979744
+    >>> # second data access is faster, uses cache
+    ... timeit('print(z[:].tostring())', number=1, globals=globals())  # doctest: +SKIP
+    b'Hello from the cloud!'
+    0.0009490990014455747
+
+If you are still experiencing poor performance with distributed/cloud storage, please
+raise an issue on the GitHub issue tracker, including any profiling data you can
+provide, as there may be opportunities to optimise further, either within Zarr or
+within the mapping interface to the storage.
 
 .. _tutorial_strings:
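
The S3 example above requires network access. For a self-contained way to observe the
cache behaviour introduced in this series, the following minimal sketch drives the same
``LRUStoreCache`` API against a purely local store. It assumes the zarr 2.2-era
top-level API (``zarr.array``, ``zarr.Array``) and the ``hits``/``misses`` counters
added by these patches; the ``SlowStore`` class is a hypothetical stand-in for a
high-latency remote store and is not part of Zarr::

    import time
    import zarr
    from zarr.storage import LRUStoreCache

    class SlowStore(dict):
        """Hypothetical stand-in for a remote store: sleeps on every read."""
        def __getitem__(self, key):
            time.sleep(0.01)  # simulate one network round-trip per read
            return dict.__getitem__(self, key)

    store = SlowStore()
    # populate an array directly through the slow store, bypassing any cache
    z = zarr.array(range(10000), chunks=1000, store=store)

    # re-open the same array through an LRU cache layer with a 1 MiB budget
    cache = LRUStoreCache(store, max_size=2**20)
    zc = zarr.Array(cache)

    t0 = time.time()
    zc[:]  # first read: every chunk is fetched from the slow store
    t1 = time.time()
    zc[:]  # second read: chunks are served from the in-memory cache
    t2 = time.time()
    print('first read: %.3fs, second read: %.3fs' % (t1 - t0, t2 - t1))
    print('hits: %s, misses: %s' % (cache.hits, cache.misses))

On the first read every chunk is a cache miss and pays the simulated round-trip
latency; on the second read all chunks are served from memory, which is the same effect
the tutorial demonstrates against S3. Because ``max_size`` comfortably exceeds the
total size of the chunks here, nothing is evicted; shrinking ``max_size`` below the
combined chunk size would exercise the LRU eviction path instead.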