diff --git a/pyproject.toml b/pyproject.toml index 96a884b737..7f8c23f2b6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -106,6 +106,7 @@ build.hooks.vcs.version-file = "src/zarr/_version.py" [tool.hatch.envs.test] dependencies = [ "numpy~={matrix:numpy}", + "universal_pathlib" ] extra-dependencies = [ "coverage", diff --git a/src/zarr/store/remote.py b/src/zarr/store/remote.py index db826f456d..15051334e9 100644 --- a/src/zarr/store/remote.py +++ b/src/zarr/store/remote.py @@ -25,6 +25,7 @@ class RemoteStore(Store): supports_listing: bool = True _fs: AsyncFileSystem + _url: str path: str allowed_exceptions: tuple[type[Exception], ...] @@ -51,9 +52,10 @@ def __init__( """ super().__init__(mode=mode) - if isinstance(url, str): - self._fs, self.path = fsspec.url_to_fs(url, **storage_options) + self._url = url.rstrip("/") + self._fs, _path = fsspec.url_to_fs(url, **storage_options) + self.path = _path.rstrip("/") elif hasattr(url, "protocol") and hasattr(url, "fs"): # is UPath-like - but without importing if storage_options: @@ -61,8 +63,11 @@ def __init__( "If constructed with a UPath object, no additional " "storage_options are allowed" ) - self.path = url.path - self._fs = url._fs + # n.b. 
UPath returns the url and path attributes with a trailing /, at least for s3 + # that trailing / must be removed to compose with the store interface + self._url = str(url).rstrip("/") + self.path = url.path.rstrip("/") + self._fs = url.fs else: raise ValueError("URL not understood, %s", url) self.allowed_exceptions = allowed_exceptions @@ -71,10 +76,10 @@ def __init__( raise TypeError("FileSystem needs to support async operations") def __str__(self) -> str: - return f"Remote fsspec store: {type(self._fs).__name__} , {self.path}" + return f"{self._url}" def __repr__(self) -> str: - return f"<RemoteStore({type(self._fs).__name__} , {self.path})>" + return f"<RemoteStore({str(self)})>" async def get( self, diff --git a/tests/v3/test_store/test_remote.py b/tests/v3/test_store/test_remote.py index 936cf206d9..98206d427f 100644 --- a/tests/v3/test_store/test_remote.py +++ b/tests/v3/test_store/test_remote.py @@ -2,9 +2,11 @@ import fsspec import pytest +from upath import UPath from zarr.buffer import Buffer, default_buffer_prototype from zarr.store import RemoteStore +from zarr.sync import sync from zarr.testing.store import StoreTests s3fs = pytest.importorskip("s3fs") @@ -16,7 +18,7 @@ test_bucket_name = "test" secure_bucket_name = "test-secure" port = 5555 -endpoint_uri = f"http://127.0.0.1:{port}/" +endpoint_url = f"http://127.0.0.1:{port}/" @pytest.fixture(scope="module") @@ -40,18 +42,33 @@ def get_boto3_client(): # NB: we use the sync botocore client for setup session = Session() - return session.create_client("s3", endpoint_url=endpoint_uri) + return session.create_client("s3", endpoint_url=endpoint_url) @pytest.fixture(autouse=True, scope="function") def s3(s3_base): + """ + Quoting Martin Durant: + pytest-asyncio creates a new event loop for each async test. + When an async-mode s3fs instance is made from async, it will be assigned to the loop from + which it is made. That means that if you use s3fs again from a subsequent test, + you will have the same identical instance, but be running on a different loop - which fails. 
+ + For the rest: it's very convenient to clean up the state of the store between tests, + make sure we start off blank each time. + + https://github.com/zarr-developers/zarr-python/pull/1785#discussion_r1634856207 + """ client = get_boto3_client() client.create_bucket(Bucket=test_bucket_name, ACL="public-read") s3fs.S3FileSystem.clear_instance_cache() - s3 = s3fs.S3FileSystem(anon=False, client_kwargs={"endpoint_url": endpoint_uri}) + s3 = s3fs.S3FileSystem(anon=False, client_kwargs={"endpoint_url": endpoint_url}) + session = sync(s3.set_session()) s3.invalidate_cache() yield s3 - requests.post(f"{endpoint_uri}/moto-api/reset") + requests.post(f"{endpoint_url}/moto-api/reset") + client.close() + sync(session.close()) # ### end from s3fs ### # @@ -65,7 +82,7 @@ async def alist(it): async def test_basic(): - store = RemoteStore(f"s3://{test_bucket_name}", mode="w", endpoint_url=endpoint_uri, anon=False) + store = RemoteStore(f"s3://{test_bucket_name}", mode="w", endpoint_url=endpoint_url, anon=False) assert not await alist(store.list()) assert not await store.exists("foo") data = b"hello" @@ -81,31 +98,51 @@ async def test_basic(): class TestRemoteStoreS3(StoreTests[RemoteStore]): store_cls = RemoteStore - @pytest.fixture(scope="function") - def store_kwargs(self) -> dict[str, str | bool]: - return { - "mode": "w", - "endpoint_url": endpoint_uri, - "anon": False, - "url": f"s3://{test_bucket_name}", - } + @pytest.fixture(scope="function", params=("use_upath", "use_str")) + def store_kwargs(self, request) -> dict[str, str | bool]: + url = f"s3://{test_bucket_name}" + anon = False + mode = "w" + if request.param == "use_upath": + return {"mode": mode, "url": UPath(url, endpoint_url=endpoint_url, anon=anon)} + elif request.param == "use_str": + return {"url": url, "mode": mode, "anon": anon, "endpoint_url": endpoint_url} + + raise AssertionError @pytest.fixture(scope="function") def store(self, store_kwargs: dict[str, str | bool]) -> RemoteStore: - self._fs, _ = 
fsspec.url_to_fs(asynchronous=False, **store_kwargs) - out = self.store_cls(asynchronous=True, **store_kwargs) + url = store_kwargs["url"] + mode = store_kwargs["mode"] + if isinstance(url, UPath): + out = self.store_cls(url=url, mode=mode) + else: + endpoint_url = store_kwargs["endpoint_url"] + out = self.store_cls(url=url, asynchronous=True, mode=mode, endpoint_url=endpoint_url) return out def get(self, store: RemoteStore, key: str) -> Buffer: - return Buffer.from_bytes(self._fs.cat(f"{store.path}/{key}")) + # make a new, synchronous instance of the filesystem because this test is run in sync code + fs, _ = fsspec.url_to_fs( + url=store._url, + asynchronous=False, + anon=store._fs.anon, + endpoint_url=store._fs.endpoint_url, + ) + return Buffer.from_bytes(fs.cat(f"{store.path}/{key}")) def set(self, store: RemoteStore, key: str, value: Buffer) -> None: - self._fs.write_bytes(f"{store.path}/{key}", value.to_bytes()) + # make a new, synchronous instance of the filesystem because this test is run in sync code + fs, _ = fsspec.url_to_fs( + url=store._url, + asynchronous=False, + anon=store._fs.anon, + endpoint_url=store._fs.endpoint_url, + ) + fs.write_bytes(f"{store.path}/{key}", value.to_bytes()) def test_store_repr(self, store: RemoteStore) -> None: - rep = str(store) - assert "fsspec" in rep - assert store.path in rep + assert str(store) == f"s3://{test_bucket_name}" def test_store_supports_writes(self, store: RemoteStore) -> None: assert True