From 68f9d2c47fad50a5a411decb1ad389e5cc7bd79a Mon Sep 17 00:00:00 2001 From: Davis Bennett Date: Wed, 12 Jun 2024 16:08:16 +0200 Subject: [PATCH 1/7] normalize remotestore __str__ method --- src/zarr/store/remote.py | 7 ++++--- tests/v3/test_store/test_remote.py | 4 +--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/zarr/store/remote.py b/src/zarr/store/remote.py index db826f456d..326568f75f 100644 --- a/src/zarr/store/remote.py +++ b/src/zarr/store/remote.py @@ -25,6 +25,7 @@ class RemoteStore(Store): supports_listing: bool = True _fs: AsyncFileSystem + _url: str path: str allowed_exceptions: tuple[type[Exception], ...] @@ -51,7 +52,7 @@ def __init__( """ super().__init__(mode=mode) - + self._url = url if isinstance(url, str): self._fs, self.path = fsspec.url_to_fs(url, **storage_options) elif hasattr(url, "protocol") and hasattr(url, "fs"): @@ -71,10 +72,10 @@ def __init__( raise TypeError("FileSystem needs to support async operations") def __str__(self) -> str: - return f"Remote fsspec store: {type(self._fs).__name__} , {self.path}" + return f"{self._url}" def __repr__(self) -> str: - return f"" + return f"" async def get( self, diff --git a/tests/v3/test_store/test_remote.py b/tests/v3/test_store/test_remote.py index 936cf206d9..a3de617d59 100644 --- a/tests/v3/test_store/test_remote.py +++ b/tests/v3/test_store/test_remote.py @@ -103,9 +103,7 @@ def set(self, store: RemoteStore, key: str, value: Buffer) -> None: self._fs.write_bytes(f"{store.path}/{key}", value.to_bytes()) def test_store_repr(self, store: RemoteStore) -> None: - rep = str(store) - assert "fsspec" in rep - assert store.path in rep + assert str(store) == f"{store._url}" def test_store_supports_writes(self, store: RemoteStore) -> None: assert True From 8fa06e1e0b9d2169b93aeb48d41aa7697b0dc640 Mon Sep 17 00:00:00 2001 From: Davis Bennett Date: Wed, 12 Jun 2024 20:03:43 +0200 Subject: [PATCH 2/7] add UPath tests (failing) --- pyproject.toml | 3 +- src/zarr/store/remote.py | 6 ++- tests/v3/test_store/test_remote.py | 69 ++++++++++++++++++++++-------- 3 files changed, 57 insertions(+), 21 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 96a884b737..f439945a75 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -119,7 +119,8 @@ extra-dependencies = [ "flask-cors", "flask", "requests", - "mypy" + "mypy", + "universal_pathlib" ] features = ["extra"] diff --git a/src/zarr/store/remote.py b/src/zarr/store/remote.py index 326568f75f..27f4565c0e 100644 --- a/src/zarr/store/remote.py +++ b/src/zarr/store/remote.py @@ -52,8 +52,8 @@ def __init__( """ super().__init__(mode=mode) - self._url = url if isinstance(url, str): + self._url = url self._fs, self.path = fsspec.url_to_fs(url, **storage_options) elif hasattr(url, "protocol") and hasattr(url, "fs"): # is UPath-like - but without importing @@ -62,8 +62,10 @@ def __init__( "If constructed with a UPath object, no additional " "storage_options are allowed" ) + + self._url = str(url) self.path = url.path - self._fs = url._fs + self._fs = url.fs else: raise ValueError("URL not understood, %s", url) self.allowed_exceptions = allowed_exceptions diff --git a/tests/v3/test_store/test_remote.py b/tests/v3/test_store/test_remote.py index a3de617d59..16f4820e65 100644 --- a/tests/v3/test_store/test_remote.py +++ b/tests/v3/test_store/test_remote.py @@ -2,6 +2,7 @@ import fsspec import pytest +from upath import UPath from zarr.buffer import Buffer, default_buffer_prototype from zarr.store import RemoteStore @@ -16,7 +17,7 @@ test_bucket_name = "test" secure_bucket_name = "test-secure" port = 5555 -endpoint_uri = f"http://127.0.0.1:{port}/" +endpoint_url = f"http://127.0.0.1:{port}/" @pytest.fixture(scope="module") @@ -40,18 +41,31 @@ def get_boto3_client(): # NB: we use the sync botocore client for setup session = Session() - return session.create_client("s3", endpoint_url=endpoint_uri) + return session.create_client("s3", endpoint_url=endpoint_url) @pytest.fixture(autouse=True, scope="function") def s3(s3_base): + """ + Quoting Martin Durant: + pytest-asyncio creates a new event loop for each async test. + When an async-mode s3fs instance is made from async, it will be assigned to the loop from + which it is made. That means that if you use s3fs again from a subsequent test, + you will have the same identical instance, but be running on a different loop - which fails. + + For the rest: it's very convenient to clean up the state of the store between tests, + make sure we start off blank each time. + + https://github.com/zarr-developers/zarr-python/pull/1785#discussion_r1634856207 + """ client = get_boto3_client() client.create_bucket(Bucket=test_bucket_name, ACL="public-read") s3fs.S3FileSystem.clear_instance_cache() - s3 = s3fs.S3FileSystem(anon=False, client_kwargs={"endpoint_url": endpoint_uri}) + s3 = s3fs.S3FileSystem(anon=False, client_kwargs={"endpoint_url": endpoint_url}) s3.invalidate_cache() yield s3 - requests.post(f"{endpoint_uri}/moto-api/reset") + requests.post(f"{endpoint_url}/moto-api/reset") + client.close() # ### end from s3fs ### # @@ -65,7 +79,7 @@ async def alist(it): async def test_basic(): - store = RemoteStore(f"s3://{test_bucket_name}", mode="w", endpoint_url=endpoint_uri, anon=False) + store = RemoteStore(f"s3://{test_bucket_name}", mode="w", endpoint_url=endpoint_url, anon=False) assert not await alist(store.list()) assert not await store.exists("foo") data = b"hello" @@ -81,29 +95,48 @@ async def test_basic(): class TestRemoteStoreS3(StoreTests[RemoteStore]): store_cls = RemoteStore - @pytest.fixture(scope="function") - def store_kwargs(self) -> dict[str, str | bool]: - return { - "mode": "w", - "endpoint_url": endpoint_uri, - "anon": False, - "url": f"s3://{test_bucket_name}", - } + @pytest.fixture(scope="function", params=(False, True)) + def store_kwargs(self, request) -> dict[str, str | bool]: + url = f"s3://{test_bucket_name}" + anon = False + mode = "w" + if request.param: + return {"mode": mode, "url": UPath(url, endpoint_url=endpoint_url, anon=anon)} + return {"url": url, "mode": mode, "anon": anon, "endpoint_url": endpoint_url} @pytest.fixture(scope="function") def store(self, store_kwargs: dict[str, str | bool]) -> RemoteStore: - self._fs, _ = fsspec.url_to_fs(asynchronous=False, **store_kwargs) - out = self.store_cls(asynchronous=True, **store_kwargs) + url = store_kwargs["url"] + mode = store_kwargs["mode"] + if isinstance(url, UPath): + out = self.store_cls(url=url, mode=mode) + else: + endpoint_url = store_kwargs["endpoint_url"] + out = self.store_cls(url=url, asynchronous=True, mode=mode, endpoint_url=endpoint_url) return out def get(self, store: RemoteStore, key: str) -> Buffer: - return Buffer.from_bytes(self._fs.cat(f"{store.path}/{key}")) + # make a new, synchronous instance of the filesystem because this test is run in sync code + fs, _ = fsspec.url_to_fs( + url=store._url, + asynchronous=False, + anon=store._fs.anon, + endpoint_url=store._fs.endpoint_url, + ) + return Buffer.from_bytes(fs.cat(f"{store.path}/{key}")) def set(self, store: RemoteStore, key: str, value: Buffer) -> None: - self._fs.write_bytes(f"{store.path}/{key}", value.to_bytes()) + # make a new, synchronous instance of the filesystem because this test is run in sync code + fs, _ = fsspec.url_to_fs( + url=store._url, + asynchronous=False, + anon=store._fs.anon, + endpoint_url=store._fs.endpoint_url, + ) + fs.write_bytes(f"{store.path}/{key}", value.to_bytes()) def test_store_repr(self, store: RemoteStore) -> None: - assert str(store) == f"{store._url}" + assert str(store) == f"s3://{test_bucket_name}" def test_store_supports_writes(self, store: RemoteStore) -> None: assert True From b73f5e96ee4f9ac6adbc4bd604d84e955f0cf9bd Mon Sep 17 00:00:00 2001 From: Davis Bennett Date: Wed, 12 Jun 2024 21:30:31 +0200 Subject: [PATCH 3/7] remove trailing path delimiter from ._url property of remotestore --- src/zarr/store/remote.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/zarr/store/remote.py b/src/zarr/store/remote.py index 27f4565c0e..3b07232d80 100644 --- a/src/zarr/store/remote.py +++ b/src/zarr/store/remote.py @@ -53,7 +53,7 @@ def __init__( super().__init__(mode=mode) if isinstance(url, str): - self._url = url + self._url = url.rstrip("/") self._fs, self.path = fsspec.url_to_fs(url, **storage_options) elif hasattr(url, "protocol") and hasattr(url, "fs"): # is UPath-like - but without importing @@ -63,7 +63,7 @@ def __init__( "storage_options are allowed" ) - self._url = str(url) + self._url = str(url).rstrip("/") self.path = url.path self._fs = url.fs else: From 6da5ecf15ff47e5a451b57570479349fda41d1f9 Mon Sep 17 00:00:00 2001 From: Davis Bennett Date: Wed, 12 Jun 2024 21:49:38 +0200 Subject: [PATCH 4/7] remove trailing path delimiter from path attribute in remotestore --- src/zarr/store/remote.py | 8 +++++--- tests/v3/test_store/test_remote.py | 9 ++++++--- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/zarr/store/remote.py b/src/zarr/store/remote.py index 3b07232d80..15051334e9 100644 --- a/src/zarr/store/remote.py +++ b/src/zarr/store/remote.py @@ -54,7 +54,8 @@ def __init__( super().__init__(mode=mode) if isinstance(url, str): self._url = url.rstrip("/") - self._fs, self.path = fsspec.url_to_fs(url, **storage_options) + self._fs, _path = fsspec.url_to_fs(url, **storage_options) + self.path = _path.rstrip("/") elif hasattr(url, "protocol") and hasattr(url, "fs"): # is UPath-like - but without importing if storage_options: @@ -62,9 +63,10 @@ def __init__( "If constructed with a UPath object, no additional " "storage_options are allowed" ) - + # n.b. UPath returns the url and path attributes with a trailing /, at least for s3 + # that trailing / must be removed to compose with the store interface self._url = str(url).rstrip("/") - self.path = url.path + self.path = url.path.rstrip("/") self._fs = url.fs else: raise ValueError("URL not understood, %s", url) diff --git a/tests/v3/test_store/test_remote.py b/tests/v3/test_store/test_remote.py index 16f4820e65..2ac0d45965 100644 --- a/tests/v3/test_store/test_remote.py +++ b/tests/v3/test_store/test_remote.py @@ -95,14 +95,17 @@ async def test_basic(): class TestRemoteStoreS3(StoreTests[RemoteStore]): store_cls = RemoteStore - @pytest.fixture(scope="function", params=(False, True)) + @pytest.fixture(scope="function", params=("use_upath", "use_str")) def store_kwargs(self, request) -> dict[str, str | bool]: url = f"s3://{test_bucket_name}" anon = False mode = "w" - if request.param: + if request.param == "use_upath": return {"mode": mode, "url": UPath(url, endpoint_url=endpoint_url, anon=anon)} - return {"url": url, "mode": mode, "anon": anon, "endpoint_url": endpoint_url} + elif request.param == "use_str": + return {"url": url, "mode": mode, "anon": anon, "endpoint_url": endpoint_url} + else: + raise AssertionError @pytest.fixture(scope="function") def store(self, store_kwargs: dict[str, str | bool]) -> RemoteStore: From 1f57e31d55a1cc90fe9b9fb76b8783b46b74f5b3 Mon Sep 17 00:00:00 2001 From: Davis Bennett Date: Thu, 13 Jun 2024 19:31:24 +0200 Subject: [PATCH 5/7] add upath to test dependencies --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index f439945a75..7f8c23f2b6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -106,6 +106,7 @@ build.hooks.vcs.version-file = "src/zarr/_version.py" [tool.hatch.envs.test] dependencies = [ "numpy~={matrix:numpy}", + "universal_pathlib" ] extra-dependencies = [ "coverage", @@ -119,8 +120,7 @@ extra-dependencies = [ "flask-cors", "flask", "requests", - "mypy", - "universal_pathlib" + "mypy" ] features = ["extra"] From de334ac8a566efb5a44d60b264697fa205c3b1e0 Mon Sep 17 00:00:00 2001 From: Davis Bennett Date: Thu, 13 Jun 2024 21:27:06 +0200 Subject: [PATCH 6/7] more aggressive cleanup in s3 fixture --- tests/v3/test_store/test_remote.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/v3/test_store/test_remote.py b/tests/v3/test_store/test_remote.py index 2ac0d45965..4a45f696e5 100644 --- a/tests/v3/test_store/test_remote.py +++ b/tests/v3/test_store/test_remote.py @@ -6,6 +6,7 @@ from zarr.buffer import Buffer, default_buffer_prototype from zarr.store import RemoteStore +from zarr.sync import sync from zarr.testing.store import StoreTests s3fs = pytest.importorskip("s3fs") @@ -62,10 +63,12 @@ def s3(s3_base): client.create_bucket(Bucket=test_bucket_name, ACL="public-read") s3fs.S3FileSystem.clear_instance_cache() s3 = s3fs.S3FileSystem(anon=False, client_kwargs={"endpoint_url": endpoint_url}) + session = sync(s3.set_session()) s3.invalidate_cache() yield s3 requests.post(f"{endpoint_url}/moto-api/reset") client.close() + sync(session.close()) # ### end from s3fs ### # From 0d1b6e78fe137cbc52144eb2254c67b89a7775ff Mon Sep 17 00:00:00 2001 From: Davis Bennett Date: Thu, 13 Jun 2024 21:35:44 +0200 Subject: [PATCH 7/7] remove redundant elif --- tests/v3/test_store/test_remote.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/v3/test_store/test_remote.py b/tests/v3/test_store/test_remote.py index 4a45f696e5..98206d427f 100644 --- a/tests/v3/test_store/test_remote.py +++ b/tests/v3/test_store/test_remote.py @@ -107,8 +107,8 @@ def store_kwargs(self, request) -> dict[str, str | bool]: return {"mode": mode, "url": UPath(url, endpoint_url=endpoint_url, anon=anon)} elif request.param == "use_str": return {"url": url, "mode": mode, "anon": anon, "endpoint_url": endpoint_url} - else: - raise AssertionError + + raise AssertionError @pytest.fixture(scope="function") def store(self, store_kwargs: dict[str, str | bool]) -> RemoteStore: