import pytest
import numpy as np

from zarr.v3.group import AsyncGroup, Group, GroupMetadata
from zarr.v3.store import LocalStore, StorePath
from zarr.v3.config import RuntimeConfiguration


@pytest.fixture
def store_path(tmpdir):
    """Return a ``StorePath`` over a ``LocalStore`` rooted at pytest's ``tmpdir``."""
    store = LocalStore(str(tmpdir))
    p = StorePath(store)
    return p


def test_group(store_path) -> None:
    """Exercise the synchronous ``Group`` wrapper end to end.

    Builds a ``Group`` around a hand-constructed ``AsyncGroup``, creates nested
    child groups and an array, then checks that children can be looked up again
    and that attribute updates are visible both on the object that made them
    and on a freshly looked-up group.
    """

    agroup = AsyncGroup(
        metadata=GroupMetadata(),
        store_path=store_path,
        runtime_configuration=RuntimeConfiguration(),
    )
    group = Group(agroup)

    # the sync wrapper exposes the very same metadata object as the async group
    assert agroup.metadata is group.metadata

    # create two groups
    foo = group.create_group("foo")
    bar = foo.create_group("bar", attributes={"baz": "qux"})

    # create an array from the "bar" group
    data = np.arange(0, 4 * 4, dtype="uint16").reshape((4, 4))
    arr = bar.create_array(
        "baz", shape=data.shape, dtype=data.dtype, chunk_shape=(2, 2), exists_ok=True
    )
    arr[:] = data

    # check the array: looking it up by name must compare equal to the created one
    assert arr == bar["baz"]
    assert arr.shape == data.shape
    assert arr.dtype == data.dtype

    # TODO: update this once the array api settles down
    # assert arr.chunk_shape == (2, 2)

    # look "bar" up again through its parent; the attributes given at creation are there
    bar2 = foo["bar"]
    assert dict(bar2.attrs) == {"baz": "qux"}

    # update a group's attributes
    bar2.attrs.update({"name": "bar"})
    # the update is immediately visible through bar2.attrs
    assert dict(bar2.attrs) == {"baz": "qux", "name": "bar"}

    # and the attrs were modified in the store
    bar3 = foo["bar"]
    assert dict(bar3.attrs) == {"baz": "qux", "name": "bar"}
async def open_auto_async(
    store: StoreLike,
    runtime_configuration_: RuntimeConfiguration = RuntimeConfiguration(),
) -> Union[Array, ArrayV2, Group]:
    """Open whatever node lives at ``store``: an array if possible, else a group.

    Parameters
    ----------
    store
        Anything ``make_store_path`` accepts.
    runtime_configuration_
        Runtime configuration forwarded to the opened node.

    Returns
    -------
    The opened array, or a synchronous ``Group`` wrapping the opened group when
    opening as an array raised ``KeyError``.
    """
    # Imported locally because only ``Group`` is imported at module level here.
    from zarr.v3.group import AsyncGroup

    store_path = make_store_path(store)
    try:
        # NOTE(review): this assumes ``Array.open`` is awaitable and signals a
        # missing array with ``KeyError`` - confirm against zarr/v3/array.py.
        return await Array.open(store_path, runtime_configuration=runtime_configuration_)
    except KeyError:
        # ``Group.open`` is a synchronous classmethod and cannot be awaited, so
        # open the async group and wrap it in the synchronous facade instead.
        async_group = await AsyncGroup.open(
            store_path, runtime_configuration=runtime_configuration_
        )
        return Group(async_group)


def open_auto(
    store: StoreLike,
    runtime_configuration_: RuntimeConfiguration = RuntimeConfiguration(),
) -> Union[Array, ArrayV2, Group]:
    """Blocking counterpart of :func:`open_auto_async`.

    Runs the coroutine on ``runtime_configuration_.asyncio_loop`` and returns
    its result.
    """
    return _sync(
        open_auto_async(store, runtime_configuration_),
        runtime_configuration_.asyncio_loop,
    )
- - @abstractproperty - def shape(self) -> Tuple[int, ...]: - """Array dimensions. - - Returns - ------- - tuple of int - array dimensions - """ - ... - - @abstractproperty - def size(self) -> int: - """Number of elements in the array. - - Returns - ------- - int - number of elements in an array. - """ - - @abstractproperty - def attrs(self) -> Dict[str, Any]: - """Array attributes. - - Returns - ------- - dict - user defined attributes - """ - ... - - @abstractproperty - def info(self) -> Any: - """Report some diagnostic information about the array. - - Returns - ------- - out - """ - ... - - -class AsynchronousArray(BaseArray): - """This class can be implemented as a v2 or v3 array""" - - @classmethod - @abstractmethod - async def from_json(cls, zarr_json: Any, store: ReadStore) -> AsynchronousArray: - ... - - @classmethod - @abstractmethod - async def open(cls, store: ReadStore) -> AsynchronousArray: - ... - - @classmethod - @abstractmethod - async def create(cls, store: WriteStore, *, shape, **kwargs) -> AsynchronousArray: - ... - - @abstractmethod - async def getitem(self, selection: Selection): - ... - - @abstractmethod - async def setitem(self, selection: Selection, value: np.ndarray) -> None: - ... - - -class SynchronousArray(BaseArray): - """ - This class can be implemented as a v2 or v3 array - """ - - @classmethod - @abstractmethod - def from_json(cls, zarr_json: Any, store: ReadStore) -> SynchronousArray: - ... - - @classmethod - @abstractmethod - def open(cls, store: ReadStore) -> SynchronousArray: - ... - - @classmethod - @abstractmethod - def create(cls, store: WriteStore, *, shape, **kwargs) -> SynchronousArray: - ... - - @abstractmethod - def __getitem__(self, selection: Selection): # TODO: type as np.ndarray | scalar - ... - - @abstractmethod - def __setitem__(self, selection: Selection, value: np.ndarray) -> None: - ... 
- - # some day ;) - # @property - # def __array_api_version__(self) -> str: - # return "2022.12" diff --git a/zarr/v3/abc/group.py b/zarr/v3/abc/group.py deleted file mode 100644 index 02de819894..0000000000 --- a/zarr/v3/abc/group.py +++ /dev/null @@ -1,86 +0,0 @@ -from __future__ import annotations - -from abc import abstractproperty, ABC -from collections.abc import MutableMapping -from typing import Dict, Any - - -class BaseGroup(ABC): - @abstractproperty - def attrs(self) -> Dict[str, Any]: - """User-defined attributes.""" - ... - - @abstractproperty - def info(self) -> Any: # TODO: type this later - """Return diagnostic information about the group.""" - ... - - -class AsynchronousGroup(BaseGroup): - pass - # TODO: (considering the following api) - # store_path (rename to path?) - # nchildren - number of child groups + arrays - # children (async iterator) - # contains - check if child exists - # getitem - get child - # group_keys (async iterator) - # groups (async iterator) - # array_keys (async iterator) - # arrays (async iterator) - # visit - # visitkeys - # visitvalues - # tree - # create_group - # require_group - # create_groups - # require_groups - # create_dataset - # require_dataset - # create - # empty - # zeros - # ones - # full - # array - # empty_like - # zeros_like - # ones_like - # full_like - # move - - -class SynchronousGroup(BaseGroup, MutableMapping): - # TODO - think about if we want to keep the MutableMapping abstraction or - pass - # store_path (rename to path?) 
class Attributes(MutableMapping[str, Any]):
    """Mutable-mapping facade over the attributes of an array or group.

    Lookups, iteration and ``len`` read ``obj.metadata.attributes`` directly.
    Mutations never touch that mapping in place: they build a modified copy,
    pass it to ``obj.update_attributes`` and keep whatever object that call
    returns as the new target.
    """

    def __init__(self, obj: Union[Array, Group]):
        # key=".zattrs", read_only=False, cache=True, synchronizer=None
        self._obj = obj

    def _stored(self):
        # Attribute mapping currently held by the wrapped object's metadata.
        return self._obj.metadata.attributes

    def __getitem__(self, key):
        return self._stored()[key]

    def __setitem__(self, key, value):
        # Copy-and-extend, then let the wrapped object persist the result.
        updated = {**self._stored(), key: value}
        self._obj = self._obj.update_attributes(updated)

    def __delitem__(self, key):
        remaining = dict(self._stored())
        remaining.pop(key)  # raises KeyError when the key is absent
        self._obj = self._obj.update_attributes(remaining)

    def __iter__(self):
        return iter(self._stored())

    def __len__(self):
        return len(self._stored())
logger = logging.getLogger("zarr.group")


@frozen
class GroupMetadata:
    """Metadata of a group node in either zarr format 2 or 3."""

    # User attributes of the group.
    attributes: Dict[str, Any] = field(factory=dict)
    zarr_format: Literal[2, 3] = 3  # field(default=3, validator=validators.in_([2, 3]))
    # Fixed discriminator; not accepted by ``__init__``.
    node_type: Literal["group"] = field(default="group", init=False)

    def to_bytes(self) -> Dict[str, bytes]:
        """Serialize the metadata into the store objects that represent it.

        Returns
        -------
        dict
            Maps a store key to its encoded JSON document: a single
            ``ZARR_JSON`` entry for format 3, or ``ZGROUP_JSON`` plus
            ``ZATTRS_JSON`` for format 2.

        Raises
        ------
        ValueError
            If ``zarr_format`` is neither 2 nor 3.
        """
        if self.zarr_format == 3:
            return {ZARR_JSON: json.dumps(asdict(self)).encode()}
        elif self.zarr_format == 2:
            return {
                # Every value must be encoded JSON so that it can be written to
                # the store and parsed back with ``json.loads``.
                ZGROUP_JSON: json.dumps({"zarr_format": self.zarr_format}).encode(),
                ZATTRS_JSON: json.dumps(self.attributes).encode(),
            }
        else:
            raise ValueError(f"unexpected zarr_format: {self.zarr_format}")

    @classmethod
    def from_json(cls, zarr_json: Any) -> GroupMetadata:
        """Build the metadata from an already-parsed JSON document."""
        return make_cattr().structure(zarr_json, cls)


@frozen
class AsyncGroup:
    """Asynchronous handle on a group stored at ``store_path``."""

    metadata: GroupMetadata
    store_path: StorePath
    runtime_configuration: RuntimeConfiguration

    @classmethod
    async def create(
        cls,
        store: StoreLike,
        *,
        attributes: Optional[Dict[str, Any]] = None,
        exists_ok: bool = False,
        zarr_format: Literal[2, 3] = 3,  # field(default=3, validator=validators.in_([2, 3])),
        runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
    ) -> AsyncGroup:
        """Create a group at ``store`` and write its metadata.

        Unless ``exists_ok`` is true, asserts that the metadata object for the
        requested ``zarr_format`` does not exist yet.
        """
        store_path = make_store_path(store)
        if not exists_ok:
            if zarr_format == 3:
                assert not await (store_path / ZARR_JSON).exists_async()
            elif zarr_format == 2:
                assert not await (store_path / ZGROUP_JSON).exists_async()
        group = cls(
            metadata=GroupMetadata(attributes=attributes or {}, zarr_format=zarr_format),
            store_path=store_path,
            runtime_configuration=runtime_configuration,
        )
        await group._save_metadata()
        return group

    @classmethod
    async def open(
        cls,
        store: StoreLike,
        runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
        zarr_format: Literal[2, 3] = 3,
    ) -> AsyncGroup:
        """Open the group at ``store``.

        Missing metadata objects are tolerated (implicit groups): defaults for
        the requested ``zarr_format`` are used in their place.

        Raises
        ------
        ValueError
            If ``zarr_format`` is neither 2 nor 3.
        """
        store_path = make_store_path(store)

        # TODO: consider trying to autodiscover the zarr-format here
        if zarr_format == 3:
            # V3 groups are comprised of a zarr.json object
            # (it is optional in the case of implicit groups)
            zarr_json_bytes = await (store_path / ZARR_JSON).get_async()
            zarr_json = (
                json.loads(zarr_json_bytes) if zarr_json_bytes is not None else {"zarr_format": 3}
            )

        elif zarr_format == 2:
            # V2 groups are comprised of a .zgroup and .zattrs objects
            # (both are optional in the case of implicit groups)
            zgroup_bytes, zattrs_bytes = await asyncio.gather(
                (store_path / ZGROUP_JSON).get_async(), (store_path / ZATTRS_JSON).get_async()
            )
            # Each object holds one JSON document, so it is decoded exactly once.
            zgroup = json.loads(zgroup_bytes) if zgroup_bytes is not None else {"zarr_format": 2}
            zattrs = json.loads(zattrs_bytes) if zattrs_bytes is not None else {}
            zarr_json = {**zgroup, "attributes": zattrs}
        else:
            raise ValueError(f"unexpected zarr_format: {zarr_format}")
        return cls.from_json(store_path, zarr_json, runtime_configuration)

    @classmethod
    def from_json(
        cls,
        store_path: StorePath,
        zarr_json: Any,
        runtime_configuration: RuntimeConfiguration,
    ) -> AsyncGroup:
        """Build a group handle from an already-parsed metadata document."""
        group = cls(
            metadata=GroupMetadata.from_json(zarr_json),
            store_path=store_path,
            runtime_configuration=runtime_configuration,
        )
        return group

    async def getitem(
        self,
        key: str,
    ) -> Union[AsyncArray, AsyncGroup]:
        """Return the child array or group stored under ``key``.

        A child without any metadata object is treated as an implicit group and
        a warning is logged.

        Raises
        ------
        ValueError
            If this group's ``zarr_format`` is neither 2 nor 3, or a v3 child
            declares a ``node_type`` other than ``"group"`` or ``"array"``.
        """

        store_path = self.store_path / key

        if self.metadata.zarr_format == 3:
            zarr_json_bytes = await (store_path / ZARR_JSON).get_async()
            if zarr_json_bytes is None:
                # implicit group?
                logger.warning("group at %s is an implicit group", store_path)
                zarr_json = {
                    "zarr_format": self.metadata.zarr_format,
                    "node_type": "group",
                    "attributes": {},
                }
            else:
                zarr_json = json.loads(zarr_json_bytes)
            node_type = zarr_json["node_type"]
            if node_type == "group":
                return type(self).from_json(store_path, zarr_json, self.runtime_configuration)
            if node_type == "array":
                return AsyncArray.from_json(
                    store_path, zarr_json, runtime_configuration=self.runtime_configuration
                )
            # Fail loudly instead of falling through and returning None.
            raise ValueError(f"unexpected node_type: {node_type}")
        elif self.metadata.zarr_format == 2:
            # Q: how do we like optimistically fetching .zgroup, .zarray, and .zattrs?
            # This guarantees that we will always make at least one extra request to the store
            zgroup_bytes, zarray_bytes, zattrs_bytes = await asyncio.gather(
                (store_path / ZGROUP_JSON).get_async(),
                (store_path / ZARRAY_JSON).get_async(),
                (store_path / ZATTRS_JSON).get_async(),
            )

            # unpack the zarray, if this is None then we must be opening a group
            zarray = json.loads(zarray_bytes) if zarray_bytes else None
            # unpack the zattrs, this can be None if no attrs were written
            zattrs = json.loads(zattrs_bytes) if zattrs_bytes is not None else {}

            if zarray is not None:
                # TODO: update this once the V2 array support is part of the primary array class
                # Hand over the merged document so the attributes are not dropped.
                zarr_json = {**zarray, "attributes": zattrs}
                return AsyncArray.from_json(
                    store_path, zarr_json, runtime_configuration=self.runtime_configuration
                )
            else:
                if zgroup_bytes is None:
                    # implicit group?
                    logger.warning("group at %s is an implicit group", store_path)
                zgroup = (
                    json.loads(zgroup_bytes)
                    if zgroup_bytes is not None
                    else {"zarr_format": self.metadata.zarr_format}
                )
                zarr_json = {**zgroup, "attributes": zattrs}
                return type(self).from_json(store_path, zarr_json, self.runtime_configuration)
        else:
            raise ValueError(f"unexpected zarr_format: {self.metadata.zarr_format}")

    async def _save_metadata(self) -> None:
        """Write every object produced by ``metadata.to_bytes()`` concurrently."""
        to_save = self.metadata.to_bytes()
        awaitables = [(self.store_path / key).set_async(value) for key, value in to_save.items()]
        await asyncio.gather(*awaitables)

    @property
    def attrs(self):
        """The attribute dict held by the metadata."""
        return self.metadata.attributes

    @property
    def info(self):
        # NOTE(review): ``GroupMetadata`` declares no ``info`` field, so this
        # lookup raises AttributeError as written - confirm the intended source.
        return self.metadata.info

    async def create_group(self, path: str, **kwargs) -> AsyncGroup:
        """Create a child group at ``path``; ``kwargs`` go to :meth:`create`.

        The child inherits this group's runtime configuration and zarr format
        unless they are passed explicitly.
        """
        runtime_configuration = kwargs.pop("runtime_configuration", self.runtime_configuration)
        # Keep the hierarchy in one format so ``getitem`` can find the child.
        kwargs.setdefault("zarr_format", self.metadata.zarr_format)
        return await type(self).create(
            self.store_path / path,
            runtime_configuration=runtime_configuration,
            **kwargs,
        )

    async def create_array(self, path: str, **kwargs) -> AsyncArray:
        """Create a child array at ``path``; ``kwargs`` go to ``AsyncArray.create``."""
        runtime_configuration = kwargs.pop("runtime_configuration", self.runtime_configuration)
        return await AsyncArray.create(
            self.store_path / path,
            runtime_configuration=runtime_configuration,
            **kwargs,
        )

    async def update_attributes(self, new_attributes: Dict[str, Any]):
        """Replace the group's attributes with ``new_attributes`` and persist them.

        Returns ``self``; the metadata's attribute dict is updated in place.
        """
        # Snapshot first: the caller may pass the live attribute dict itself,
        # which would be emptied by the ``clear()`` below.
        replacement = dict(new_attributes)

        # metadata.attributes is "frozen" so we simply clear and update the dict
        self.metadata.attributes.clear()
        self.metadata.attributes.update(replacement)

        # Write new metadata
        to_save = self.metadata.to_bytes()
        if self.metadata.zarr_format == 2:
            # only save the .zattrs object
            await (self.store_path / ZATTRS_JSON).set_async(to_save[ZATTRS_JSON])
        else:
            await (self.store_path / ZARR_JSON).set_async(to_save[ZARR_JSON])

        return self

    def __repr__(self):
        return f"<AsyncGroup {self.store_path}>"

    # --- not implemented yet ------------------------------------------------

    async def nchildren(self) -> int:
        raise NotImplementedError

    async def children(self) -> AsyncIterator[Union[AsyncArray, AsyncGroup]]:
        raise NotImplementedError

    async def contains(self, child: str) -> bool:
        raise NotImplementedError

    async def group_keys(self) -> AsyncIterator[str]:
        raise NotImplementedError

    async def groups(self) -> AsyncIterator[AsyncGroup]:
        raise NotImplementedError

    async def array_keys(self) -> AsyncIterator[str]:
        raise NotImplementedError

    async def arrays(self) -> AsyncIterator[AsyncArray]:
        raise NotImplementedError

    async def tree(self, expand=False, level=None) -> Any:
        raise NotImplementedError

    async def empty(self, **kwargs) -> AsyncArray:
        raise NotImplementedError

    async def zeros(self, **kwargs) -> AsyncArray:
        raise NotImplementedError

    async def ones(self, **kwargs) -> AsyncArray:
        raise NotImplementedError

    async def full(self, **kwargs) -> AsyncArray:
        raise NotImplementedError

    async def empty_like(self, prototype: AsyncArray, **kwargs) -> AsyncArray:
        raise NotImplementedError

    async def zeros_like(self, prototype: AsyncArray, **kwargs) -> AsyncArray:
        raise NotImplementedError

    async def ones_like(self, prototype: AsyncArray, **kwargs) -> AsyncArray:
        raise NotImplementedError

    async def full_like(self, prototype: AsyncArray, **kwargs) -> AsyncArray:
        raise NotImplementedError

    async def move(self, source: str, dest: str) -> None:
        raise NotImplementedError


@frozen
class Group(SyncMixin):
    """Blocking facade over an :class:`AsyncGroup`.

    Instance methods run the matching coroutine through ``SyncMixin._sync``.
    """

    _async_group: AsyncGroup
    _sync_configuration: SyncConfiguration = field(init=True, default=SyncConfiguration())

    @classmethod
    def create(
        cls,
        store: StoreLike,
        *,
        attributes: Optional[Dict[str, Any]] = None,
        exists_ok: bool = False,
        zarr_format: Literal[2, 3] = 3,
        runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
    ) -> Group:
        """Create a group at ``store`` and return its blocking wrapper."""
        # ``_sync`` is an instance method and no instance exists yet, so the
        # module-level ``sync`` helper is used directly.
        from zarr.v3.sync import sync

        obj = sync(
            AsyncGroup.create(
                store,
                attributes=attributes,
                exists_ok=exists_ok,
                zarr_format=zarr_format,
                runtime_configuration=runtime_configuration,
            ),
            loop=runtime_configuration.asyncio_loop,
        )

        return cls(obj)

    @classmethod
    def open(
        cls,
        store: StoreLike,
        runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
        zarr_format: Literal[2, 3] = 3,
    ) -> Group:
        """Open the group at ``store`` and return its blocking wrapper."""
        # See ``create`` for why ``sync`` is called directly here.
        from zarr.v3.sync import sync

        obj = sync(
            AsyncGroup.open(store, runtime_configuration, zarr_format),
            loop=runtime_configuration.asyncio_loop,
        )
        return cls(obj)

    def __getitem__(self, path: str) -> Union[Array, Group]:
        obj = self._sync(self._async_group.getitem(path))
        if isinstance(obj, AsyncArray):
            return Array(obj)
        else:
            return Group(obj)

    def __delitem__(self, key):
        raise NotImplementedError

    def __iter__(self):
        raise NotImplementedError

    def __len__(self):
        raise NotImplementedError

    def __setitem__(self, key, value):
        """__setitem__ is not supported in v3"""
        raise NotImplementedError

    @property
    def metadata(self) -> GroupMetadata:
        return self._async_group.metadata

    @property
    def attrs(self) -> Attributes:
        return Attributes(self)

    @property
    def info(self):
        return self._async_group.info

    def update_attributes(self, new_attributes: Dict[str, Any]):
        """Replace and persist the group's attributes; returns ``self``."""
        self._sync(self._async_group.update_attributes(new_attributes))
        return self

    @property
    def nchildren(self) -> int:
        # Call the method: ``_sync`` expects the coroutine, not the function.
        return self._sync(self._async_group.nchildren())

    @property
    def children(self) -> List[Union[Array, Group]]:
        _children = self._sync_iter(self._async_group.children)
        return [Array(obj) if isinstance(obj, AsyncArray) else Group(obj) for obj in _children]

    def __contains__(self, child) -> bool:
        return self._sync(self._async_group.contains(child))

    def group_keys(self) -> List[str]:
        return self._sync_iter(self._async_group.group_keys)

    def groups(self) -> List[Group]:
        # TODO: in v2 this was a generator that return key: Group
        return [Group(obj) for obj in self._sync_iter(self._async_group.groups)]

    def array_keys(self) -> List[str]:
        return self._sync_iter(self._async_group.array_keys)

    def arrays(self) -> List[Array]:
        return [Array(obj) for obj in self._sync_iter(self._async_group.arrays)]

    def tree(self, expand=False, level=None) -> Any:
        return self._sync(self._async_group.tree(expand=expand, level=level))

    def create_group(self, name: str, **kwargs) -> Group:
        return Group(self._sync(self._async_group.create_group(name, **kwargs)))

    def create_array(self, name: str, **kwargs) -> Array:
        return Array(self._sync(self._async_group.create_array(name, **kwargs)))

    def empty(self, **kwargs) -> Array:
        return Array(self._sync(self._async_group.empty(**kwargs)))

    def zeros(self, **kwargs) -> Array:
        return Array(self._sync(self._async_group.zeros(**kwargs)))

    def ones(self, **kwargs) -> Array:
        return Array(self._sync(self._async_group.ones(**kwargs)))

    def full(self, **kwargs) -> Array:
        return Array(self._sync(self._async_group.full(**kwargs)))

    def empty_like(self, prototype: AsyncArray, **kwargs) -> Array:
        return Array(self._sync(self._async_group.empty_like(prototype, **kwargs)))

    def zeros_like(self, prototype: AsyncArray, **kwargs) -> Array:
        return Array(self._sync(self._async_group.zeros_like(prototype, **kwargs)))

    def ones_like(self, prototype: AsyncArray, **kwargs) -> Array:
        return Array(self._sync(self._async_group.ones_like(prototype, **kwargs)))

    def full_like(self, prototype: AsyncArray, **kwargs) -> Array:
        return Array(self._sync(self._async_group.full_like(prototype, **kwargs)))

    def move(self, source: str, dest: str) -> None:
        return self._sync(self._async_group.move(source, dest))
evolve, frozen - -from zarr.v3.array_v2 import ArrayV2 -from zarr.v3.common import ZARRAY_JSON, ZATTRS_JSON, ZGROUP_JSON, make_cattr -from zarr.v3.metadata import RuntimeConfiguration -from zarr.v3.store import StoreLike, StorePath, make_store_path -from zarr.v3.sync import sync - -if TYPE_CHECKING: - from zarr.v3.group import Group - - -@frozen -class GroupV2Metadata: - zarr_format: Literal[2] = 2 - - def to_bytes(self) -> bytes: - return json.dumps(asdict(self)).encode() - - @classmethod - def from_json(cls, zarr_json: Any) -> GroupV2Metadata: - return make_cattr().structure(zarr_json, cls) - - -@frozen -class GroupV2: - metadata: GroupV2Metadata - store_path: StorePath - runtime_configuration: RuntimeConfiguration - attributes: Optional[Dict[str, Any]] = None - - @classmethod - async def create_async( - cls, - store: StoreLike, - *, - attributes: Optional[Dict[str, Any]] = None, - exists_ok: bool = False, - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> GroupV2: - store_path = make_store_path(store) - if not exists_ok: - assert not await (store_path / ZGROUP_JSON).exists_async() - group = cls( - metadata=GroupV2Metadata(), - attributes=attributes, - store_path=store_path, - runtime_configuration=runtime_configuration, - ) - await group._save_metadata() - return group - - @classmethod - def create( - cls, - store: StoreLike, - *, - attributes: Optional[Dict[str, Any]] = None, - exists_ok: bool = False, - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> GroupV2: - return sync( - cls.create_async( - store, - attributes=attributes, - exists_ok=exists_ok, - runtime_configuration=runtime_configuration, - ), - runtime_configuration.asyncio_loop if runtime_configuration else None, - ) - - @classmethod - async def open_async( - cls, - store: StoreLike, - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> GroupV2: - store_path = make_store_path(store) - zgroup_bytes = await (store_path / 
ZGROUP_JSON).get_async() - assert zgroup_bytes is not None - zattrs_bytes = await (store_path / ZATTRS_JSON).get_async() - metadata = json.loads(zgroup_bytes) - attributes = json.loads(zattrs_bytes) if zattrs_bytes is not None else None - - return cls.from_json( - store_path, - metadata, - runtime_configuration, - attributes, - ) - - @classmethod - def open( - cls, - store_path: StorePath, - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> GroupV2: - return sync( - cls.open_async(store_path, runtime_configuration), - runtime_configuration.asyncio_loop, - ) - - @classmethod - def from_json( - cls, - store_path: StorePath, - zarr_json: Any, - runtime_configuration: RuntimeConfiguration, - attributes: Optional[Dict[str, Any]] = None, - ) -> GroupV2: - group = cls( - metadata=GroupV2Metadata.from_json(zarr_json), - store_path=store_path, - runtime_configuration=runtime_configuration, - attributes=attributes, - ) - return group - - @staticmethod - async def open_or_array( - store: StoreLike, - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> Union[ArrayV2, GroupV2]: - store_path = make_store_path(store) - zgroup_bytes, zattrs_bytes = await asyncio.gather( - (store_path / ZGROUP_JSON).get_async(), - (store_path / ZATTRS_JSON).get_async(), - ) - attributes = json.loads(zattrs_bytes) if zattrs_bytes is not None else None - if zgroup_bytes is not None: - return GroupV2.from_json( - store_path, json.loads(zgroup_bytes), runtime_configuration, attributes - ) - zarray_bytes = await (store_path / ZARRAY_JSON).get_async() - if zarray_bytes is not None: - return ArrayV2.from_json( - store_path, json.loads(zarray_bytes), attributes, runtime_configuration - ) - raise KeyError - - async def _save_metadata(self) -> None: - await (self.store_path / ZGROUP_JSON).set_async(self.metadata.to_bytes()) - if self.attributes is not None and len(self.attributes) > 0: - await (self.store_path / ZATTRS_JSON).set_async( - 
json.dumps(self.attributes).encode(), - ) - else: - await (self.store_path / ZATTRS_JSON).delete_async() - - async def get_async(self, path: str) -> Union[ArrayV2, GroupV2]: - return await self.__class__.open_or_array( - self.store_path / path, self.runtime_configuration - ) - - def __getitem__(self, path: str) -> Union[ArrayV2, GroupV2]: - return sync(self.get_async(path), self.runtime_configuration.asyncio_loop) - - async def create_group_async(self, path: str, **kwargs) -> GroupV2: - runtime_configuration = kwargs.pop("runtime_configuration", self.runtime_configuration) - return await self.__class__.create_async( - self.store_path / path, - runtime_configuration=runtime_configuration, - **kwargs, - ) - - def create_group(self, path: str, **kwargs) -> GroupV2: - return sync(self.create_group_async(path), self.runtime_configuration.asyncio_loop) - - async def create_array_async(self, path: str, **kwargs) -> ArrayV2: - runtime_configuration = kwargs.pop("runtime_configuration", self.runtime_configuration) - return await ArrayV2.create_async( - self.store_path / path, - runtime_configuration=runtime_configuration, - **kwargs, - ) - - def create_array(self, path: str, **kwargs) -> ArrayV2: - return sync( - self.create_array_async(path, **kwargs), - self.runtime_configuration.asyncio_loop, - ) - - async def convert_to_v3_async(self) -> Group: - from zarr.v3.common import ZARR_JSON - from zarr.v3.group import Group, GroupMetadata - - new_metadata = GroupMetadata(attributes=self.attributes or {}) - new_metadata_bytes = new_metadata.to_bytes() - - await (self.store_path / ZARR_JSON).set_async(new_metadata_bytes) - - return Group.from_json( - store_path=self.store_path, - zarr_json=json.loads(new_metadata_bytes), - runtime_configuration=self.runtime_configuration, - ) - - async def update_attributes_async(self, new_attributes: Dict[str, Any]) -> GroupV2: - await (self.store_path / ZATTRS_JSON).set_async(json.dumps(new_attributes).encode()) - return evolve(self, 
class SyncMixin:
    """Give a class blocking access to coroutines.

    The host class must provide ``_sync_configuration``; its ``asyncio_loop``
    is the loop handed to ``sync``.
    """

    _sync_configuration: SyncConfiguration

    def _sync(self, coroutine: Coroutine):  # TODO: type this
        """Run ``coroutine`` through ``sync`` on the configured loop and return its result."""
        # TODO: refactor this to to take *args and **kwargs and pass those to the method
        # this should allow us to better type the sync wrapper
        return sync(coroutine, loop=self._sync_configuration.asyncio_loop)

    def _sync_iter(self, func: Any, *args, **kwargs) -> List[Any]:  # TODO: type this
        """Collect everything yielded by an async iterator into a list.

        ``func`` is called as ``func(*args, **kwargs)`` and the result is
        consumed with ``async for``.
        """

        async def iter_to_list() -> List[Any]:
            # TODO: replace with generators so we don't materialize the entire iterator at once
            return [item async for item in func(*args, **kwargs)]

        # Call the helper: ``_sync`` needs the coroutine object it produces,
        # not the function itself.
        return self._sync(iter_to_list())