-
-
Notifications
You must be signed in to change notification settings - Fork 373
MongoDB and Redis stores #372
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
8ef5dc1
e3e2c2e
d60aaab
ace251c
ecf18f7
92a4d71
efa9ccd
6f68451
9bbbde6
20ef384
a8f31cf
5ddc193
0abcc1a
b339c09
8b8d289
1cac5eb
b8e2d23
4db7e14
31a9af3
9f5d02b
ac6827e
8b35eb8
f8d3f03
1abeba7
4bbbeba
f9481b8
0188a60
1af4446
ca6b8a4
eb4564b
4f59451
5ab54c0
c386c72
f9dfc06
8d4a8e2
349a885
bfaea1c
d4ad363
cfcff9d
09869f5
cdc2656
c97204c
6c295ee
b47b244
eb1ce7a
e0ccee1
533326a
d549199
fbcc86f
88bb03e
531eeff
add3a0d
9dce652
ef5bc7d
dcd79e5
9be1557
20dd25a
e2988be
44e8850
7ff0f1a
06107ce
3821e16
881b051
5de3e05
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,5 @@ | ||
| # These packages are currently not available on Windows. | ||
| bsddb3==6.2.6 | ||
| lmdb==0.94 | ||
| redis==3.0.1 | ||
| pymongo==3.7.1 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,7 +37,7 @@ | |
| normalize_storage_path, buffer_size, | ||
| normalize_fill_value, nolock, normalize_dtype) | ||
| from zarr.meta import encode_array_metadata, encode_group_metadata | ||
| from zarr.compat import PY2, OrderedDict_move_to_end | ||
| from zarr.compat import PY2, OrderedDict_move_to_end, binary_type | ||
| from numcodecs.registry import codec_registry | ||
| from numcodecs.compat import ensure_bytes, ensure_contiguous_ndarray | ||
| from zarr.errors import (err_contains_group, err_contains_array, err_bad_compressor, | ||
|
|
@@ -2084,6 +2084,188 @@ def clear(self): | |
| ) | ||
|
|
||
|
|
||
class MongoDBStore(MutableMapping):
    """Storage class using MongoDB.

    .. note:: This is an experimental feature.

    Requires the `pymongo <https://api.mongodb.com/python/current/>`_
    package to be installed.

    Each item is stored as one document of the form
    ``{'key': <key>, 'value': <bytes>}`` in the configured collection.

    Parameters
    ----------
    database : string
        Name of database
    collection : string
        Name of collection
    **kwargs
        Keyword arguments passed through to the `pymongo.MongoClient` function.
        Connection settings such as ``host`` must be given here by keyword; the
        positional arguments of this class are `database` and `collection`.

    Examples
    --------
    Store a single array::

        >>> import zarr
        >>> store = zarr.MongoDBStore(host='localhost')
        >>> z = zarr.zeros((10, 10), chunks=(5, 5), store=store, overwrite=True)
        >>> z[...] = 42
        >>> store.close()

    Store a group::

        >>> store = zarr.MongoDBStore(host='localhost')
        >>> root = zarr.group(store=store, overwrite=True)
        >>> foo = root.create_group('foo')
        >>> bar = foo.zeros('bar', shape=(10, 10), chunks=(5, 5))
        >>> bar[...] = 42
        >>> store.close()

    Notes
    -----
    The maximum chunksize in MongoDB documents is 16 MB.

    """

    # Field names used inside each stored document.
    _key = 'key'
    _value = 'value'

    def __init__(self, database='mongodb_zarr', collection='zarr_collection',
                 **kwargs):
        # Imported lazily so that pymongo is only required when this store is used.
        import pymongo

        # Kept verbatim so the store can be rebuilt when unpickled.
        self._database = database
        self._collection = collection
        self._kwargs = kwargs

        self.client = pymongo.MongoClient(**self._kwargs)
        self.db = self.client.get_database(self._database)
        self.collection = self.db.get_collection(self._collection)

    def __getitem__(self, key):
        """Return the bytes stored under `key`; raise KeyError if absent."""
        doc = self.collection.find_one({self._key: key})
        if doc is None:
            raise KeyError(key)
        # NOTE(review): the explicit cast to a plain binary string was added
        # because it reportedly fixed a problem decoding JSON metadata read
        # back from MongoDB; the exact type pymongo hands back here was not
        # settled in review.
        return binary_type(doc[self._value])

    def __setitem__(self, key, value):
        """Store `value` (coerced to bytes) under `key`, replacing any old value."""
        value = ensure_bytes(value)
        # upsert=True inserts the document when no document has this key yet.
        self.collection.replace_one({self._key: key},
                                    {self._key: key, self._value: value},
                                    upsert=True)

    def __delitem__(self, key):
        """Delete `key`; raise KeyError if no document had that key."""
        result = self.collection.delete_many({self._key: key})
        # Only a delete that matched nothing means the key was missing. If
        # several documents shared the key they have all been removed, which
        # is a successful delete, not an error.
        if result.deleted_count == 0:
            raise KeyError(key)

    def __iter__(self):
        """Iterate over all keys in the collection."""
        # Project onto the key field so that chunk payloads are not
        # transferred just to enumerate keys.
        for doc in self.collection.find({}, projection={self._key: True}):
            yield doc[self._key]

    def __len__(self):
        """Return the number of documents in the collection."""
        return self.collection.count_documents({})

    def __getstate__(self):
        # Only the constructor arguments are pickled, not the live client.
        return self._database, self._collection, self._kwargs

    def __setstate__(self, state):
        # Rebuild the client by re-running the constructor.
        database, collection, kwargs = state
        self.__init__(database=database, collection=collection, **kwargs)

    def close(self):
        """Cleanup client resources and disconnect from MongoDB."""
        self.client.close()

    def clear(self):
        """Remove all items from store."""
        self.collection.delete_many({})
|
|
||
|
|
||
class RedisStore(MutableMapping):
    """Storage class using Redis.

    .. note:: This is an experimental feature.

    Requires the `redis <https://redis-py.readthedocs.io/>`_
    package to be installed.

    Every item is stored under the Redis key ``'<prefix>:<key>'``.

    Parameters
    ----------
    prefix : string
        Name of prefix for Redis keys
    **kwargs
        Keyword arguments passed through to the `redis.Redis` function.

    Examples
    --------
    Store a single array::

        >>> import zarr
        >>> store = zarr.RedisStore(port=6379)
        >>> z = zarr.zeros((10, 10), chunks=(5, 5), store=store, overwrite=True)
        >>> z[...] = 42

    Store a group::

        >>> store = zarr.RedisStore(port=6379)
        >>> root = zarr.group(store=store, overwrite=True)
        >>> foo = root.create_group('foo')
        >>> bar = foo.zeros('bar', shape=(10, 10), chunks=(5, 5))
        >>> bar[...] = 42

    """

    def __init__(self, prefix='zarr', **kwargs):
        # Imported lazily so that redis is only required when this store is used.
        import redis

        # Kept verbatim so the store can be rebuilt when unpickled.
        self._prefix = prefix
        self._kwargs = kwargs

        self.client = redis.Redis(**kwargs)

    def _key(self, key):
        """Return the Redis key name used to store `key`."""
        return '{prefix}:{key}'.format(prefix=self._prefix, key=key)

    def __getitem__(self, key):
        """Return the value stored under `key`; raise KeyError if absent."""
        value = self.client.get(self._key(key))
        if value is None:
            # Report the caller's key (as __delitem__ does), not the
            # internal prefixed Redis key.
            raise KeyError(key)
        return value

    def __setitem__(self, key, value):
        """Store `value` (coerced to bytes) under `key`."""
        value = ensure_bytes(value)
        self.client.set(self._key(key), value)

    def __delitem__(self, key):
        """Delete `key`; raise KeyError if it was not present."""
        count = self.client.delete(self._key(key))
        if not count:
            raise KeyError(key)

    def keylist(self):
        """Return a list of all keys under this store's prefix, prefix removed."""
        offset = len(self._key(''))  # length of prefix
        return [key[offset:].decode('utf-8')
                for key in self.client.keys(self._key('*'))]

    def keys(self):
        for key in self.keylist():
            yield key

    def __iter__(self):
        for key in self.keys():
            yield key

    def __len__(self):
        return len(self.keylist())

    def __getstate__(self):
        # Only the constructor arguments are pickled, not the live client.
        return self._prefix, self._kwargs

    def __setstate__(self, state):
        # Rebuild the client by re-running the constructor.
        prefix, kwargs = state
        self.__init__(prefix=prefix, **kwargs)

    def clear(self):
        """Remove every key stored under this store's prefix."""
        names = self.client.keys(self._key('*'))
        # One DEL for all names instead of one round trip per key; a key that
        # disappeared in the meantime is simply not counted rather than
        # raising KeyError. Skipped when there is nothing to delete because
        # DEL needs at least one name.
        if names:
            self.client.delete(*names)
|
|
||
|
|
||
| class ConsolidatedMetadataStore(MutableMapping): | ||
| """A layer over other storage, where the metadata has been consolidated into | ||
| a single key. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,8 +20,8 @@ | |
| DirectoryStore, ZipStore, init_group, group_meta_key, | ||
| getsize, migrate_1to2, TempStore, atexit_rmtree, | ||
| NestedDirectoryStore, default_compressor, DBMStore, | ||
| LMDBStore, SQLiteStore, atexit_rmglob, LRUStoreCache, | ||
| ConsolidatedMetadataStore) | ||
| LMDBStore, SQLiteStore, MongoDBStore, RedisStore, | ||
| atexit_rmglob, LRUStoreCache, ConsolidatedMetadataStore) | ||
| from zarr.meta import (decode_array_metadata, encode_array_metadata, ZARR_FORMAT, | ||
| decode_group_metadata, encode_group_metadata) | ||
| from zarr.compat import PY2 | ||
|
|
@@ -900,6 +900,29 @@ def test_context_manager(self): | |
| except ImportError: # pragma: no cover | ||
| sqlite3 = None | ||
|
|
||
# Probe for pymongo and a reachable MongoDB server; ``pymongo`` is rebound to
# None when either is missing so the MongoDB tests can be skipped.
try:
    import pymongo
    from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
    try:
        client = pymongo.MongoClient(host='127.0.0.1',
                                     serverSelectionTimeoutMS=1e3)
        try:
            # Forces a round trip to the server.
            client.server_info()
        finally:
            # The probe client is not used again; close it so the test
            # module does not hold a connection open.
            client.close()
    except (ConnectionFailure, ServerSelectionTimeoutError):  # pragma: no cover
        pymongo = None
except ImportError:  # pragma: no cover
    pymongo = None
jhamman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
# Probe for redis-py and a reachable Redis server; ``redis`` is rebound to
# None when either is missing so the Redis tests can be skipped.
try:
    import redis
    try:
        rs = redis.Redis("localhost", port=6379)
        rs.ping()
    # Referenced through the package rather than imported by name, so the
    # builtin ConnectionError is not shadowed for the rest of this module.
    except redis.ConnectionError:  # pragma: no cover
        redis = None
except ImportError:  # pragma: no cover
    redis = None
jhamman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
|
|
||
| @unittest.skipIf(sqlite3 is None, 'python built without sqlite') | ||
| class TestSQLiteStore(StoreTests, unittest.TestCase): | ||
|
|
@@ -930,6 +953,29 @@ def test_pickle(self): | |
| pickle.dumps(store) | ||
|
|
||
|
|
||
@unittest.skipIf(pymongo is None, 'test requires pymongo')
class TestMongoDBStore(StoreTests, unittest.TestCase):
    """Exercise ``MongoDBStore`` through the inherited ``StoreTests`` cases."""

    def create_store(self):
        """Return an emptied ``MongoDBStore`` on the local test collection."""
        mongo_store = MongoDBStore(host='127.0.0.1', database='zarr_tests',
                                   collection='zarr_tests')
        mongo_store.clear()  # start with an empty store
        return mongo_store
|
|
||
|
|
||
@unittest.skipIf(redis is None, 'test requires redis')
class TestRedisStore(StoreTests, unittest.TestCase):
    """Exercise ``RedisStore`` through the inherited ``StoreTests`` cases."""

    def create_store(self):
        """Return an emptied ``RedisStore`` connected to the local server."""
        # TODO: this is the default host for Redis on Travis,
        # we probably want to generalize this though
        redis_store = RedisStore(host='localhost', port=6379)
        redis_store.clear()  # start with an empty store
        return redis_store
|
|
||
|
|
||
| class TestLRUStoreCache(StoreTests, unittest.TestCase): | ||
|
|
||
| def create_store(self): | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.