diff --git a/src/zarr/__init__.py b/src/zarr/__init__.py index 65daae8f6d..00c01560f4 100644 --- a/src/zarr/__init__.py +++ b/src/zarr/__init__.py @@ -1,10 +1,7 @@ from __future__ import annotations -from typing import Union - import zarr.codecs # noqa: F401 from zarr.array import Array, AsyncArray -from zarr.array_v2 import ArrayV2 from zarr.config import config # noqa: F401 from zarr.group import AsyncGroup, Group from zarr.store import ( @@ -18,9 +15,7 @@ assert not __version__.startswith("0.0.0") -async def open_auto_async( - store: StoreLike, -) -> Union[AsyncArray, AsyncGroup]: +async def open_auto_async(store: StoreLike) -> AsyncArray | AsyncGroup: store_path = make_store_path(store) try: return await AsyncArray.open(store_path) @@ -28,9 +23,7 @@ async def open_auto_async( return await AsyncGroup.open(store_path) -def open_auto( - store: StoreLike, -) -> Union[Array, ArrayV2, Group]: +def open_auto(store: StoreLike) -> Array | Group: object = _sync( open_auto_async(store), ) diff --git a/src/zarr/abc/codec.py b/src/zarr/abc/codec.py index 3115b70bc2..a32790d8e1 100644 --- a/src/zarr/abc/codec.py +++ b/src/zarr/abc/codec.py @@ -1,10 +1,11 @@ from __future__ import annotations from abc import abstractmethod -from typing import TYPE_CHECKING, Generic, Iterable, Protocol, TypeVar, runtime_checkable +from typing import TYPE_CHECKING, Generic, Iterable, TypeVar import numpy as np from zarr.abc.metadata import Metadata +from zarr.abc.store import ByteGetter, ByteSetter from zarr.common import BytesLike @@ -14,20 +15,6 @@ from zarr.metadata import ArrayMetadata -@runtime_checkable -class ByteGetter(Protocol): - async def get(self, byte_range: tuple[int, int | None] | None = None) -> BytesLike | None: ... - - -@runtime_checkable -class ByteSetter(Protocol): - async def get(self, byte_range: tuple[int, int | None] | None = None) -> BytesLike | None: ... - - async def set(self, value: BytesLike, byte_range: tuple[int, int] | None = None) -> None: ... - - async def delete(self) -> None: ... - - CodecInput = TypeVar("CodecInput", bound=np.ndarray | BytesLike) CodecOutput = TypeVar("CodecOutput", bound=np.ndarray | BytesLike) diff --git a/src/zarr/abc/store.py b/src/zarr/abc/store.py index 3d9550f733..6cc01681a9 100644 --- a/src/zarr/abc/store.py +++ b/src/zarr/abc/store.py @@ -1,7 +1,8 @@ from abc import abstractmethod, ABC from collections.abc import AsyncGenerator +from typing import List, Protocol, Tuple, Optional, runtime_checkable -from typing import List, Tuple, Optional +from zarr.common import BytesLike class Store(ABC): @@ -61,13 +62,13 @@ def supports_writes(self) -> bool: ... @abstractmethod - async def set(self, key: str, value: bytes) -> None: + async def set(self, key: str, value: BytesLike) -> None: """Store a (key, value) pair. Parameters ---------- key : str - value : bytes + value : BytesLike """ ... @@ -88,12 +89,12 @@ def supports_partial_writes(self) -> bool: ... @abstractmethod - async def set_partial_values(self, key_start_values: List[Tuple[str, int, bytes]]) -> None: + async def set_partial_values(self, key_start_values: list[tuple[str, int, BytesLike]]) -> None: """Store values at a given key, starting at byte range_start. 
Parameters ---------- - key_start_values : list[tuple[str, int, bytes]] + key_start_values : list[tuple[str, int, BytesLike]] set of key, range_start, values triples, a key may occur multiple times with different range_starts, range_starts (considering the length of the respective values) must not specify overlapping ranges for the same key @@ -145,3 +146,28 @@ def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: AsyncGenerator[str, None] """ ... + + +@runtime_checkable +class ByteGetter(Protocol): + async def get( + self, byte_range: Optional[Tuple[int, Optional[int]]] = None + ) -> Optional[BytesLike]: ... + + +@runtime_checkable +class ByteSetter(Protocol): + async def get( + self, byte_range: Optional[Tuple[int, Optional[int]]] = None + ) -> Optional[BytesLike]: ... + + async def set(self, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None) -> None: ... + + async def delete(self) -> None: ... + + +async def set_or_delete(byte_setter: ByteSetter, value: BytesLike | None) -> None: + if value is None: + await byte_setter.delete() + else: + await byte_setter.set(value) diff --git a/src/zarr/array.py b/src/zarr/array.py index 83dc1757da..88fbd01ef9 100644 --- a/src/zarr/array.py +++ b/src/zarr/array.py @@ -10,20 +10,26 @@ # 1. Was splitting the array into two classes really necessary? +from asyncio import gather from dataclasses import dataclass, replace import json -from typing import Any, Dict, Iterable, Literal, Optional, Tuple, Union +from typing import Any, Iterable, Literal import numpy as np import numpy.typing as npt -from zarr.abc.codec import Codec, CodecPipeline +from zarr.abc.codec import Codec +from zarr.abc.store import set_or_delete # from zarr.array_v2 import ArrayV2 +from zarr.attributes import Attributes from zarr.codecs import BytesCodec from zarr.common import ( + JSON, ZARR_JSON, + ZARRAY_JSON, + ZATTRS_JSON, ChunkCoords, Selection, ZarrFormat, @@ -31,10 +37,10 @@ ) from zarr.config import config -from zarr.indexing import BasicIndexer, all_chunk_coords +from zarr.indexing import BasicIndexer from zarr.chunk_grids import RegularChunkGrid -from zarr.chunk_key_encodings import DefaultChunkKeyEncoding, V2ChunkKeyEncoding -from zarr.metadata import ArrayMetadata, parse_indexing_order +from zarr.chunk_key_encodings import ChunkKeyEncoding, DefaultChunkKeyEncoding, V2ChunkKeyEncoding +from zarr.metadata import ArrayMetadata, ArrayV3Metadata, ArrayV2Metadata, parse_indexing_order from zarr.store import StoreLike, StorePath, make_store_path from zarr.sync import sync @@ -43,9 +49,11 @@ def parse_array_metadata(data: Any) -> ArrayMetadata: if isinstance(data, ArrayMetadata): return data elif isinstance(data, dict): - return ArrayMetadata.from_dict(data) - else: - raise TypeError + if data["zarr_format"] == 3: + return ArrayV3Metadata.from_dict(data) + elif data["zarr_format"] == 2: + return ArrayV2Metadata.from_dict(data) + raise TypeError @dataclass(frozen=True) @@ -54,10 +62,6 @@ class AsyncArray: store_path: StorePath order: Literal["C", "F"] - @property - def codecs(self) -> CodecPipeline: - return self.metadata.codecs - def __init__( self, metadata: ArrayMetadata, @@ -76,21 +80,116 @@ async def create( cls, store: StoreLike, *, + # v2 and v3 shape: ChunkCoords, dtype: npt.DTypeLike, - chunk_shape: ChunkCoords, - fill_value: Optional[Any] = None, - chunk_key_encoding: Union[ - Tuple[Literal["default"], Literal[".", "/"]], - Tuple[Literal["v2"], Literal[".", "/"]], - ] = ("default", "/"), - codecs: Optional[Iterable[Union[Codec, Dict[str, Any]]]] = None, 
- dimension_names: Optional[Iterable[str]] = None, - attributes: Optional[Dict[str, Any]] = None, - exists_ok: bool = False, zarr_format: ZarrFormat = 3, + fill_value: Any | None = None, + attributes: dict[str, JSON] | None = None, + # v3 only + chunk_shape: ChunkCoords | None = None, + chunk_key_encoding: ( + ChunkKeyEncoding + | tuple[Literal["default"], Literal[".", "/"]] + | tuple[Literal["v2"], Literal[".", "/"]] + | None + ) = None, + codecs: Iterable[Codec | dict[str, JSON]] | None = None, + dimension_names: Iterable[str] | None = None, + # v2 only + chunks: ChunkCoords | None = None, + dimension_separator: Literal[".", "/"] | None = None, + order: Literal["C", "F"] | None = None, + filters: list[dict[str, JSON]] | None = None, + compressor: dict[str, JSON] | None = None, + # runtime + exists_ok: bool = False, ) -> AsyncArray: store_path = make_store_path(store) + + if chunk_shape is None: + if chunks is None: + raise ValueError("Either chunk_shape or chunks needs to be provided.") + chunk_shape = chunks + elif chunks is not None: + raise ValueError("Only one of chunk_shape or chunks must be provided.") + + if zarr_format == 3: + if dimension_separator is not None: + raise ValueError( + "dimension_separator cannot be used for arrays with version 3. Use chunk_key_encoding instead." + ) + if order is not None: + raise ValueError( + "order cannot be used for arrays with version 3. Use a transpose codec instead." + ) + if filters is not None: + raise ValueError( + "filters cannot be used for arrays with version 3. Use array-to-array codecs instead." + ) + if compressor is not None: + raise ValueError( + "compressor cannot be used for arrays with version 3. Use bytes-to-bytes codecs instead." + ) + return await cls._create_v3( + store_path, + shape=shape, + dtype=dtype, + chunk_shape=chunk_shape, + fill_value=fill_value, + chunk_key_encoding=chunk_key_encoding, + codecs=codecs, + dimension_names=dimension_names, + attributes=attributes, + exists_ok=exists_ok, + ) + elif zarr_format == 2: + if codecs is not None: + raise ValueError( + "codecs cannot be used for arrays with version 2. Use filters and compressor instead." + ) + if chunk_key_encoding is not None: + raise ValueError( + "chunk_key_encoding cannot be used for arrays with version 2. Use dimension_separator instead." + ) + if dimension_names is not None: + raise ValueError("dimension_names cannot be used for arrays with version 2.") + return await cls._create_v2( + store_path, + shape=shape, + dtype=dtype, + chunks=chunk_shape, + dimension_separator=dimension_separator, + fill_value=fill_value, + order=order, + filters=filters, + compressor=compressor, + attributes=attributes, + exists_ok=exists_ok, + ) + else: + raise ValueError(f"Unsupported zarr_format. 
Got: {zarr_format}") + + @classmethod + async def _create_v3( + cls, + store_path: StorePath, + *, + shape: ChunkCoords, + dtype: npt.DTypeLike, + chunk_shape: ChunkCoords, + fill_value: Any | None = None, + chunk_key_encoding: ( + ChunkKeyEncoding + | tuple[Literal["default"], Literal[".", "/"]] + | tuple[Literal["v2"], Literal[".", "/"]] + | None + ) = None, + codecs: Iterable[Codec | dict[str, JSON]] | None = None, + dimension_names: Iterable[str] | None = None, + attributes: dict[str, JSON] | None = None, + exists_ok: bool = False, + ) -> AsyncArray: if not exists_ok: assert not await (store_path / ZARR_JSON).exists() @@ -102,36 +201,86 @@ async def create( else: fill_value = 0 - metadata = ArrayMetadata( - shape=shape, - data_type=dtype, - chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape), - chunk_key_encoding=( + if chunk_key_encoding is None: + chunk_key_encoding = ("default", "/") + if isinstance(chunk_key_encoding, tuple): + chunk_key_encoding = ( V2ChunkKeyEncoding(separator=chunk_key_encoding[1]) if chunk_key_encoding[0] == "v2" else DefaultChunkKeyEncoding(separator=chunk_key_encoding[1]) - ), + ) + + metadata = ArrayV3Metadata( + shape=shape, + data_type=dtype, + chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape), + chunk_key_encoding=chunk_key_encoding, fill_value=fill_value, codecs=codecs, dimension_names=tuple(dimension_names) if dimension_names else None, attributes=attributes or {}, ) - array = cls( - metadata=metadata, - store_path=store_path, - ) + array = cls(metadata=metadata, store_path=store_path) + + await array._save_metadata(metadata) + return array + + @classmethod + async def _create_v2( + cls, + store_path: StorePath, + *, + shape: ChunkCoords, + dtype: npt.DTypeLike, + chunks: ChunkCoords, + dimension_separator: Literal[".", "/"] | None = None, + fill_value: None | int | float = None, + order: Literal["C", "F"] | None = None, + filters: list[dict[str, JSON]] | None = None, + compressor: dict[str, JSON] | None = None, + attributes: dict[str, JSON] | None = None, + exists_ok: bool = False, + ) -> AsyncArray: + import numcodecs + + if not exists_ok: + assert not await (store_path / ZARRAY_JSON).exists() - await array._save_metadata() + if order is None: + order = "C" + + if dimension_separator is None: + dimension_separator = "." 
+ + metadata = ArrayV2Metadata( + shape=shape, + dtype=np.dtype(dtype), + chunks=chunks, + order=order, + dimension_separator=dimension_separator, + fill_value=0 if fill_value is None else fill_value, + compressor=( + numcodecs.get_codec(compressor).get_config() if compressor is not None else None + ), + filters=( + [numcodecs.get_codec(filter).get_config() for filter in filters] + if filters is not None + else None + ), + attributes=attributes, + ) + array = cls(metadata=metadata, store_path=store_path) + await array._save_metadata(metadata) return array @classmethod def from_dict( cls, store_path: StorePath, - data: Dict[str, Any], + data: dict[str, JSON], ) -> AsyncArray: - metadata = ArrayMetadata.from_dict(data) + metadata = parse_array_metadata(data) async_array = cls(metadata=metadata, store_path=store_path) return async_array @@ -139,30 +288,54 @@ def from_dict( async def open( cls, store: StoreLike, + zarr_format: ZarrFormat | None = 3, ) -> AsyncArray: store_path = make_store_path(store) - zarr_json_bytes = await (store_path / ZARR_JSON).get() - assert zarr_json_bytes is not None - return cls.from_dict( - store_path, - json.loads(zarr_json_bytes), - ) - @classmethod - async def open_auto( - cls, - store: StoreLike, - ) -> AsyncArray: # TODO: Union[AsyncArray, ArrayV2] - store_path = make_store_path(store) - v3_metadata_bytes = await (store_path / ZARR_JSON).get() - if v3_metadata_bytes is not None: - return cls.from_dict( - store_path, - json.loads(v3_metadata_bytes), + if zarr_format == 2: + zarray_bytes, zattrs_bytes = await gather( + (store_path / ZARRAY_JSON).get(), (store_path / ZATTRS_JSON).get() ) + if zarray_bytes is None: + raise KeyError(store_path) # filenotfounderror? + elif zarr_format == 3: + zarr_json_bytes = await (store_path / ZARR_JSON).get() + if zarr_json_bytes is None: + raise KeyError(store_path) # filenotfounderror? + elif zarr_format is None: + zarr_json_bytes, zarray_bytes, zattrs_bytes = await gather( + (store_path / ZARR_JSON).get(), + (store_path / ZARRAY_JSON).get(), + (store_path / ZATTRS_JSON).get(), + ) + if zarr_json_bytes is not None and zarray_bytes is not None: + # TODO: revisit this exception type + # alternatively, we could warn and favor v3 + raise ValueError("Both zarr.json and .zarray objects exist") + if zarr_json_bytes is None and zarray_bytes is None: + raise KeyError(store_path) # filenotfounderror? 
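# --- Editorial example, not part of the patch: the new zarr_format hint to open().
# open() accepts zarr_format=3 (the default), 2, or None; None probes for both
# zarr.json and .zarray as implemented in the surrounding branch. Assumes an
# array was previously written under "a2" in v2 format.
from zarr.array import AsyncArray
from zarr.store import MemoryStore, StorePath
from zarr.sync import sync

store = StorePath(MemoryStore())
a2 = sync(AsyncArray.open(store / "a2", zarr_format=2))  # reads .zarray + .zattrs
auto = sync(AsyncArray.open(store / "a2", zarr_format=None))  # probes both key sets
# With zarr_format=None, finding both zarr.json and .zarray raises ValueError.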
+ # set zarr_format based on which keys were found + if zarr_json_bytes is not None: + zarr_format = 3 + else: + zarr_format = 2 + else: + raise ValueError(f"unexpected zarr_format: {zarr_format}") + + if zarr_format == 2: + # V2 arrays are comprised of .zarray and .zattrs objects + assert zarray_bytes is not None + zarray_dict = json.loads(zarray_bytes) + zattrs_dict = json.loads(zattrs_bytes) if zattrs_bytes is not None else {} + zarray_dict["attributes"] = zattrs_dict + return cls(store_path=store_path, metadata=ArrayV2Metadata.from_dict(zarray_dict)) else: - raise ValueError("no v2 support yet") - # return await ArrayV2.open(store_path) + # V3 arrays are comprised of a zarr.json object + assert zarr_json_bytes is not None + return cls( + store_path=store_path, + metadata=ArrayV3Metadata.from_dict(json.loads(zarr_json_bytes)), + ) @property def ndim(self) -> int: @@ -181,15 +354,14 @@ def dtype(self) -> np.dtype[Any]: return self.metadata.dtype @property - def attrs(self) -> dict[str, Any]: + def attrs(self) -> dict[str, JSON]: return self.metadata.attributes async def getitem(self, selection: Selection) -> npt.NDArray[Any]: - assert isinstance(self.metadata.chunk_grid, RegularChunkGrid) indexer = BasicIndexer( selection, shape=self.metadata.shape, - chunk_shape=self.metadata.chunk_grid.chunk_shape, + chunk_grid=self.metadata.chunk_grid, ) # setup output array @@ -200,11 +372,10 @@ async def getitem(self, selection: Selection) -> npt.NDArray[Any]: ) # reading chunks and decoding them - await self.codecs.read( + await self.metadata.codec_pipeline.read( [ ( - self.store_path - / self.metadata.chunk_key_encoding.encode_chunk_key(chunk_coords), + self.store_path / self.metadata.encode_chunk_key(chunk_coords), self.metadata.get_chunk_spec(chunk_coords, self.order), chunk_selection, out_selection, @@ -219,16 +390,16 @@ async def getitem(self, selection: Selection) -> npt.NDArray[Any]: else: return out[()] - async def _save_metadata(self) -> None: - await (self.store_path / ZARR_JSON).set(self.metadata.to_bytes()) + async def _save_metadata(self, metadata: ArrayMetadata) -> None: + to_save = metadata.to_bytes() + awaitables = [set_or_delete(self.store_path / key, value) for key, value in to_save.items()] + await gather(*awaitables) async def setitem(self, selection: Selection, value: np.ndarray) -> None: - assert isinstance(self.metadata.chunk_grid, RegularChunkGrid) - chunk_shape = self.metadata.chunk_grid.chunk_shape indexer = BasicIndexer( selection, shape=self.metadata.shape, - chunk_shape=chunk_shape, + chunk_grid=self.metadata.chunk_grid, ) sel_shape = indexer.shape @@ -245,11 +416,10 @@ async def setitem(self, selection: Selection, value: np.ndarray) -> None: value = value.astype(self.metadata.dtype, order="A") # merging with existing data and encoding chunks - await self.codecs.write( + await self.metadata.codec_pipeline.write( [ ( - self.store_path - / self.metadata.chunk_key_encoding.encode_chunk_key(chunk_coords), + self.store_path / self.metadata.encode_chunk_key(chunk_coords), self.metadata.get_chunk_spec(chunk_coords, self.order), chunk_selection, out_selection, @@ -263,14 +433,11 @@ async def resize( self, new_shape: ChunkCoords, delete_outside_chunks: bool = True ) -> AsyncArray: assert len(new_shape) == len(self.metadata.shape) - new_metadata = replace(self.metadata, shape=new_shape) + new_metadata = self.metadata.update_shape(new_shape) # Remove all chunks outside of the new shape - assert isinstance(self.metadata.chunk_grid, RegularChunkGrid) - chunk_shape = 
self.metadata.chunk_grid.chunk_shape - chunk_key_encoding = self.metadata.chunk_key_encoding - old_chunk_coords = set(all_chunk_coords(self.metadata.shape, chunk_shape)) - new_chunk_coords = set(all_chunk_coords(new_shape, chunk_shape)) + old_chunk_coords = set(self.metadata.chunk_grid.all_chunk_coords(self.metadata.shape)) + new_chunk_coords = set(self.metadata.chunk_grid.all_chunk_coords(new_shape)) if delete_outside_chunks: @@ -279,7 +446,7 @@ async def _delete_key(key: str) -> None: await concurrent_map( [ - (chunk_key_encoding.encode_chunk_key(chunk_coords),) + (self.metadata.encode_chunk_key(chunk_coords),) for chunk_coords in old_chunk_coords.difference(new_chunk_coords) ], _delete_key, @@ -287,14 +454,14 @@ async def _delete_key(key: str) -> None: ) # Write new metadata - await (self.store_path / ZARR_JSON).set(new_metadata.to_bytes()) + await self._save_metadata(new_metadata) return replace(self, metadata=new_metadata) - async def update_attributes(self, new_attributes: Dict[str, Any]) -> AsyncArray: - new_metadata = replace(self.metadata, attributes=new_attributes) + async def update_attributes(self, new_attributes: dict[str, JSON]) -> AsyncArray: + new_metadata = self.metadata.update_attributes(new_attributes) # Write new metadata - await (self.store_path / ZARR_JSON).set(new_metadata.to_bytes()) + await self._save_metadata(new_metadata) return replace(self, metadata=new_metadata) def __repr__(self) -> str: @@ -313,17 +480,29 @@ def create( cls, store: StoreLike, *, + # v2 and v3 shape: ChunkCoords, dtype: npt.DTypeLike, - chunk_shape: ChunkCoords, - fill_value: Optional[Any] = None, - chunk_key_encoding: Union[ - Tuple[Literal["default"], Literal[".", "/"]], - Tuple[Literal["v2"], Literal[".", "/"]], - ] = ("default", "/"), - codecs: Optional[Iterable[Union[Codec, Dict[str, Any]]]] = None, - dimension_names: Optional[Iterable[str]] = None, - attributes: Optional[Dict[str, Any]] = None, + zarr_format: ZarrFormat = 3, + fill_value: Any | None = None, + attributes: dict[str, JSON] | None = None, + # v3 only + chunk_shape: ChunkCoords | None = None, + chunk_key_encoding: ( + ChunkKeyEncoding + | tuple[Literal["default"], Literal[".", "/"]] + | tuple[Literal["v2"], Literal[".", "/"]] + | None + ) = None, + codecs: Iterable[Codec | dict[str, JSON]] | None = None, + dimension_names: Iterable[str] | None = None, + # v2 only + chunks: ChunkCoords | None = None, + dimension_separator: Literal[".", "/"] | None = None, + order: Literal["C", "F"] | None = None, + filters: list[dict[str, JSON]] | None = None, + compressor: dict[str, JSON] | None = None, + # runtime exists_ok: bool = False, ) -> Array: async_array = sync( @@ -331,12 +510,18 @@ def create( store=store, shape=shape, dtype=dtype, - chunk_shape=chunk_shape, + zarr_format=zarr_format, + attributes=attributes, fill_value=fill_value, + chunk_shape=chunk_shape, chunk_key_encoding=chunk_key_encoding, codecs=codecs, dimension_names=dimension_names, - attributes=attributes, + chunks=chunks, + dimension_separator=dimension_separator, + order=order, + filters=filters, + compressor=compressor, exists_ok=exists_ok, ), ) @@ -346,7 +531,7 @@ def create( def from_dict( cls, store_path: StorePath, - data: Dict[str, Any], + data: dict[str, JSON], ) -> Array: async_array = AsyncArray.from_dict(store_path=store_path, data=data) return cls(async_array) @@ -359,16 +544,6 @@ def open( async_array = sync(AsyncArray.open(store)) return cls(async_array) - @classmethod - def open_auto( - cls, - store: StoreLike, - ) -> Array: # TODO: Union[Array, 
ArrayV2]: - async_array = sync( - AsyncArray.open_auto(store), - ) - return cls(async_array) - @property def ndim(self) -> int: return self._async_array.ndim @@ -386,8 +561,8 @@ def dtype(self) -> np.dtype[Any]: return self._async_array.dtype @property - def attrs(self) -> dict[str, Any]: - return self._async_array.attrs + def attrs(self) -> Attributes: + return Attributes(self) @property def metadata(self) -> ArrayMetadata: @@ -418,7 +593,7 @@ def resize(self, new_shape: ChunkCoords) -> Array: ) ) - def update_attributes(self, new_attributes: Dict[str, Any]) -> Array: + def update_attributes(self, new_attributes: dict[str, JSON]) -> Array: return type(self)( sync( self._async_array.update_attributes(new_attributes), diff --git a/src/zarr/array_v2.py b/src/zarr/array_v2.py deleted file mode 100644 index 18251e7db7..0000000000 --- a/src/zarr/array_v2.py +++ /dev/null @@ -1,516 +0,0 @@ -from __future__ import annotations - -import asyncio -from dataclasses import dataclass, replace -import json -from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union - -import numcodecs -import numpy as np - -from numcodecs.compat import ensure_bytes, ensure_ndarray - -from zarr.common import ( - ZARRAY_JSON, - ZATTRS_JSON, - BytesLike, - ChunkCoords, - Selection, - SliceSelection, - concurrent_map, - to_thread, -) -from zarr.indexing import BasicIndexer, all_chunk_coords, is_total_slice -from zarr.metadata import ArrayV2Metadata -from zarr.store import StoreLike, StorePath, make_store_path -from zarr.sync import sync - -if TYPE_CHECKING: - from zarr.array import Array - - -@dataclass(frozen=True) -class _AsyncArrayProxy: - array: ArrayV2 - - def __getitem__(self, selection: Selection) -> _AsyncArraySelectionProxy: - return _AsyncArraySelectionProxy(self.array, selection) - - -@dataclass(frozen=True) -class _AsyncArraySelectionProxy: - array: ArrayV2 - selection: Selection - - async def get(self) -> np.ndarray: - return await self.array.get_async(self.selection) - - async def set(self, value: np.ndarray): - return await self.array.set_async(self.selection, value) - - -@dataclass(frozen=True) -class ArrayV2: - metadata: ArrayV2Metadata - attributes: Optional[Dict[str, Any]] - store_path: StorePath - - @classmethod - async def create_async( - cls, - store: StoreLike, - *, - shape: ChunkCoords, - dtype: np.dtype, - chunks: ChunkCoords, - dimension_separator: Literal[".", "/"] = ".", - fill_value: Optional[Union[None, int, float]] = None, - order: Literal["C", "F"] = "C", - filters: Optional[List[Dict[str, Any]]] = None, - compressor: Optional[Dict[str, Any]] = None, - attributes: Optional[Dict[str, Any]] = None, - exists_ok: bool = False, - ) -> ArrayV2: - store_path = make_store_path(store) - if not exists_ok: - assert not await (store_path / ZARRAY_JSON).exists() - - metadata = ArrayV2Metadata( - shape=shape, - dtype=np.dtype(dtype), - chunks=chunks, - order=order, - dimension_separator=dimension_separator, - fill_value=0 if fill_value is None else fill_value, - compressor=( - numcodecs.get_codec(compressor).get_config() if compressor is not None else None - ), - filters=( - [numcodecs.get_codec(filter).get_config() for filter in filters] - if filters is not None - else None - ), - ) - array = cls( - metadata=metadata, - store_path=store_path, - attributes=attributes, - ) - await array._save_metadata() - return array - - @classmethod - def create( - cls, - store: StoreLike, - *, - shape: ChunkCoords, - dtype: np.dtype, - chunks: ChunkCoords, - dimension_separator: Literal[".", "/"] = ".", 
- fill_value: Optional[Union[None, int, float]] = None, - order: Literal["C", "F"] = "C", - filters: Optional[List[Dict[str, Any]]] = None, - compressor: Optional[Dict[str, Any]] = None, - attributes: Optional[Dict[str, Any]] = None, - exists_ok: bool = False, - ) -> ArrayV2: - return sync( - cls.create_async( - store, - shape=shape, - dtype=dtype, - chunks=chunks, - order=order, - dimension_separator=dimension_separator, - fill_value=0 if fill_value is None else fill_value, - compressor=compressor, - filters=filters, - attributes=attributes, - exists_ok=exists_ok, - ), - ) - - @classmethod - async def open_async( - cls, - store: StoreLike, - ) -> ArrayV2: - store_path = make_store_path(store) - zarray_bytes, zattrs_bytes = await asyncio.gather( - (store_path / ZARRAY_JSON).get(), - (store_path / ZATTRS_JSON).get(), - ) - assert zarray_bytes is not None - return cls.from_dict( - store_path, - zarray_json=json.loads(zarray_bytes), - zattrs_json=json.loads(zattrs_bytes) if zattrs_bytes is not None else None, - ) - - @classmethod - def open( - cls, - store: StoreLike, - ) -> ArrayV2: - return sync( - cls.open_async(store), - ) - - @classmethod - def from_dict( - cls, - store_path: StorePath, - zarray_json: Any, - zattrs_json: Optional[Any], - ) -> ArrayV2: - metadata = ArrayV2Metadata.from_dict(zarray_json) - out = cls( - store_path=store_path, - metadata=metadata, - attributes=zattrs_json, - ) - out._validate_metadata() - return out - - async def _save_metadata(self) -> None: - self._validate_metadata() - - await (self.store_path / ZARRAY_JSON).set(self.metadata.to_bytes()) - if self.attributes is not None and len(self.attributes) > 0: - await (self.store_path / ZATTRS_JSON).set( - json.dumps(self.attributes).encode(), - ) - else: - await (self.store_path / ZATTRS_JSON).delete() - - def _validate_metadata(self) -> None: - assert len(self.metadata.shape) == len( - self.metadata.chunks - ), "`chunks` and `shape` need to have the same number of dimensions." 
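# --- Editorial example, not part of the patch: set_or_delete in action.
# The deleted _save_metadata above wrote .zarray unconditionally and then either
# set or deleted .zattrs by hand; the new set_or_delete helper in zarr.abc.store
# captures that pattern, and the new AsyncArray/AsyncGroup._save_metadata fan out
# over it. save_all below is a hypothetical illustration of that shape:
from asyncio import gather

from zarr.abc.store import set_or_delete
from zarr.store import StorePath


async def save_all(store_path: StorePath, to_save: dict[str, bytes | None]) -> None:
    # a None value deletes the key; bytes values are written as-is
    await gather(*(set_or_delete(store_path / key, value) for key, value in to_save.items()))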
- - @property - def ndim(self) -> int: - return len(self.metadata.shape) - - @property - def shape(self) -> ChunkCoords: - return self.metadata.shape - - @property - def dtype(self) -> np.dtype: - return self.metadata.dtype - - @property - def async_(self) -> _AsyncArrayProxy: - return _AsyncArrayProxy(self) - - def __getitem__(self, selection: Selection): - return sync(self.get_async(selection)) - - async def get_async(self, selection: Selection): - indexer = BasicIndexer( - selection, - shape=self.metadata.shape, - chunk_shape=self.metadata.chunks, - ) - - # setup output array - out = np.zeros( - indexer.shape, - dtype=self.metadata.dtype, - order=self.metadata.order, - ) - - # reading chunks and decoding them - await concurrent_map( - [ - (chunk_coords, chunk_selection, out_selection, out) - for chunk_coords, chunk_selection, out_selection in indexer - ], - self._read_chunk, - ) - - if out.shape: - return out - else: - return out[()] - - async def _read_chunk( - self, - chunk_coords: ChunkCoords, - chunk_selection: SliceSelection, - out_selection: SliceSelection, - out: np.ndarray, - ): - store_path = self.store_path / self._encode_chunk_key(chunk_coords) - - chunk_array = await self._decode_chunk(await store_path.get()) - if chunk_array is not None: - tmp = chunk_array[chunk_selection] - out[out_selection] = tmp - else: - out[out_selection] = self.metadata.fill_value - - async def _decode_chunk(self, chunk_bytes: Optional[BytesLike]) -> Optional[np.ndarray]: - if chunk_bytes is None: - return None - - if self.metadata.compressor is not None: - compressor = numcodecs.get_codec(self.metadata.compressor) - chunk_array = ensure_ndarray(await to_thread(compressor.decode, chunk_bytes)) - else: - chunk_array = ensure_ndarray(chunk_bytes) - - # ensure correct dtype - if str(chunk_array.dtype) != self.metadata.dtype: - chunk_array = chunk_array.view(self.metadata.dtype) - - # apply filters in reverse order - if self.metadata.filters is not None: - for filter_metadata in self.metadata.filters[::-1]: - filter = numcodecs.get_codec(filter_metadata) - chunk_array = await to_thread(filter.decode, chunk_array) - - # ensure correct chunk shape - if chunk_array.shape != self.metadata.chunks: - chunk_array = chunk_array.reshape( - self.metadata.chunks, - order=self.metadata.order, - ) - - return chunk_array - - def __setitem__(self, selection: Selection, value: np.ndarray) -> None: - sync(self.set_async(selection, value)) - - async def set_async(self, selection: Selection, value: np.ndarray) -> None: - chunk_shape = self.metadata.chunks - indexer = BasicIndexer( - selection, - shape=self.metadata.shape, - chunk_shape=chunk_shape, - ) - - sel_shape = indexer.shape - - # check value shape - if np.isscalar(value): - # setting a scalar value - pass - else: - if not hasattr(value, "shape"): - value = np.asarray(value, self.metadata.dtype) - assert value.shape == sel_shape - if value.dtype != self.metadata.dtype: - value = value.astype(self.metadata.dtype, order="A") - - # merging with existing data and encoding chunks - await concurrent_map( - [ - ( - value, - chunk_shape, - chunk_coords, - chunk_selection, - out_selection, - ) - for chunk_coords, chunk_selection, out_selection in indexer - ], - self._write_chunk, - ) - - async def _write_chunk( - self, - value: np.ndarray, - chunk_shape: ChunkCoords, - chunk_coords: ChunkCoords, - chunk_selection: SliceSelection, - out_selection: SliceSelection, - ): - store_path = self.store_path / self._encode_chunk_key(chunk_coords) - - if is_total_slice(chunk_selection, 
chunk_shape): - # write entire chunks - if np.isscalar(value): - chunk_array = np.empty( - chunk_shape, - dtype=self.metadata.dtype, - order=self.metadata.order, - ) - chunk_array.fill(value) - else: - chunk_array = value[out_selection] - await self._write_chunk_to_store(store_path, chunk_array) - - else: - # writing partial chunks - # read chunk first - tmp = await self._decode_chunk(await store_path.get()) - - # merge new value - if tmp is None: - chunk_array = np.empty( - chunk_shape, - dtype=self.metadata.dtype, - order=self.metadata.order, - ) - chunk_array.fill(self.metadata.fill_value) - else: - chunk_array = tmp.copy( - order=self.metadata.order, - ) # make a writable copy - chunk_array[chunk_selection] = value[out_selection] - - await self._write_chunk_to_store(store_path, chunk_array) - - async def _write_chunk_to_store(self, store_path: StorePath, chunk_array: np.ndarray): - chunk_bytes: Optional[BytesLike] - if np.all(chunk_array == self.metadata.fill_value): - # chunks that only contain fill_value will be removed - await store_path.delete() - else: - chunk_bytes = await self._encode_chunk(chunk_array) - if chunk_bytes is None: - await store_path.delete() - else: - await store_path.set(chunk_bytes) - - async def _encode_chunk(self, chunk_array: np.ndarray) -> Optional[BytesLike]: - chunk_array = chunk_array.ravel(order=self.metadata.order) - - if self.metadata.filters is not None: - for filter_metadata in self.metadata.filters: - filter = numcodecs.get_codec(filter_metadata) - chunk_array = await to_thread(filter.encode, chunk_array) - - if self.metadata.compressor is not None: - compressor = numcodecs.get_codec(self.metadata.compressor) - if not chunk_array.flags.c_contiguous and not chunk_array.flags.f_contiguous: - chunk_array = chunk_array.copy(order="A") - encoded_chunk_bytes = ensure_bytes(await to_thread(compressor.encode, chunk_array)) - else: - encoded_chunk_bytes = ensure_bytes(chunk_array) - - return encoded_chunk_bytes - - def _encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: - chunk_identifier = self.metadata.dimension_separator.join(map(str, chunk_coords)) - return "0" if chunk_identifier == "" else chunk_identifier - - async def resize_async(self, new_shape: ChunkCoords) -> ArrayV2: - assert len(new_shape) == len(self.metadata.shape) - new_metadata = replace(self.metadata, shape=new_shape) - - # Remove all chunks outside of the new shape - chunk_shape = self.metadata.chunks - old_chunk_coords = set(all_chunk_coords(self.metadata.shape, chunk_shape)) - new_chunk_coords = set(all_chunk_coords(new_shape, chunk_shape)) - - async def _delete_key(key: str) -> None: - await (self.store_path / key).delete() - - await concurrent_map( - [ - (self._encode_chunk_key(chunk_coords),) - for chunk_coords in old_chunk_coords.difference(new_chunk_coords) - ], - _delete_key, - ) - - # Write new metadata - await (self.store_path / ZARRAY_JSON).set(new_metadata.to_bytes()) - return replace(self, metadata=new_metadata) - - def resize(self, new_shape: ChunkCoords) -> ArrayV2: - return sync(self.resize_async(new_shape)) - - async def convert_to_v3_async(self) -> Array: - from sys import byteorder as sys_byteorder - - from zarr.abc.codec import Codec - from zarr.array import Array - from zarr.common import ZARR_JSON - from zarr.chunk_grids import RegularChunkGrid - from zarr.chunk_key_encodings import V2ChunkKeyEncoding - from zarr.metadata import ArrayMetadata, DataType - - from zarr.codecs import ( - BloscCodec, - BloscShuffle, - BytesCodec, - GzipCodec, - TransposeCodec, - ) 
- - data_type = DataType.from_dtype(self.metadata.dtype) - endian: Literal["little", "big"] - if self.metadata.dtype.byteorder == "=": - endian = sys_byteorder - elif self.metadata.dtype.byteorder == ">": - endian = "big" - else: - endian = "little" - - assert ( - self.metadata.filters is None or len(self.metadata.filters) == 0 - ), "Filters are not supported by v3." - - codecs: List[Codec] = [] - - if self.metadata.order == "F": - codecs.append(TransposeCodec(order=tuple(reversed(range(self.metadata.ndim))))) - codecs.append(BytesCodec(endian=endian)) - - if self.metadata.compressor is not None: - v2_codec = numcodecs.get_codec(self.metadata.compressor).get_config() - assert v2_codec["id"] in ( - "blosc", - "gzip", - ), "Only blosc and gzip are supported by v3." - if v2_codec["id"] == "blosc": - codecs.append( - BloscCodec( - typesize=data_type.byte_count, - cname=v2_codec["cname"], - clevel=v2_codec["clevel"], - shuffle=BloscShuffle.from_int(v2_codec.get("shuffle", 0)), - blocksize=v2_codec.get("blocksize", 0), - ) - ) - elif v2_codec["id"] == "gzip": - codecs.append(GzipCodec(level=v2_codec.get("level", 5))) - - new_metadata = ArrayMetadata( - shape=self.metadata.shape, - chunk_grid=RegularChunkGrid(chunk_shape=self.metadata.chunks), - data_type=data_type, - fill_value=0 if self.metadata.fill_value is None else self.metadata.fill_value, - chunk_key_encoding=V2ChunkKeyEncoding(separator=self.metadata.dimension_separator), - codecs=codecs, - attributes=self.attributes or {}, - dimension_names=None, - ) - - new_metadata_bytes = new_metadata.to_bytes() - await (self.store_path / ZARR_JSON).set(new_metadata_bytes) - - return Array.from_dict( - store_path=self.store_path, - data=json.loads(new_metadata_bytes), - ) - - async def update_attributes_async(self, new_attributes: Dict[str, Any]) -> ArrayV2: - await (self.store_path / ZATTRS_JSON).set(json.dumps(new_attributes).encode()) - return replace(self, attributes=new_attributes) - - def update_attributes(self, new_attributes: Dict[str, Any]) -> ArrayV2: - return sync( - self.update_attributes_async(new_attributes), - ) - - def convert_to_v3(self) -> Array: - return sync(self.convert_to_v3_async()) - - def __repr__(self): - return f"" diff --git a/src/zarr/attributes.py b/src/zarr/attributes.py index 18f6a63a55..e6b26309f2 100644 --- a/src/zarr/attributes.py +++ b/src/zarr/attributes.py @@ -1,21 +1,24 @@ from __future__ import annotations + from collections.abc import MutableMapping -from typing import TYPE_CHECKING, Any, Iterator, Union +from typing import TYPE_CHECKING, Iterator + +from zarr.common import JSON if TYPE_CHECKING: from zarr.group import Group from zarr.array import Array -class Attributes(MutableMapping[str, Any]): - def __init__(self, obj: Union[Array, Group]): +class Attributes(MutableMapping[str, JSON]): + def __init__(self, obj: Array | Group): # key=".zattrs", read_only=False, cache=True, synchronizer=None self._obj = obj - def __getitem__(self, key: str) -> Any: + def __getitem__(self, key: str) -> JSON: return self._obj.metadata.attributes[key] - def __setitem__(self, key: str, value: Any) -> None: + def __setitem__(self, key: str, value: JSON) -> None: new_attrs = dict(self._obj.metadata.attributes) new_attrs[key] = value self._obj = self._obj.update_attributes(new_attrs) diff --git a/src/zarr/chunk_grids.py b/src/zarr/chunk_grids.py index 73557f6e4b..16c0df9174 100644 --- a/src/zarr/chunk_grids.py +++ b/src/zarr/chunk_grids.py @@ -1,5 +1,6 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, 
Dict +import itertools +from typing import TYPE_CHECKING, Any, Dict, Iterator from dataclasses import dataclass from zarr.abc.metadata import Metadata @@ -10,6 +11,7 @@ parse_named_configuration, parse_shapelike, ) +from zarr.indexing import _ceildiv if TYPE_CHECKING: from typing_extensions import Self @@ -27,6 +29,9 @@ def from_dict(cls, data: Dict[str, JSON]) -> ChunkGrid: return RegularChunkGrid.from_dict(data) raise ValueError(f"Unknown chunk grid. Got {name_parsed}.") + def all_chunk_coords(self, array_shape: ChunkCoords) -> Iterator[ChunkCoords]: + raise NotImplementedError + @dataclass(frozen=True) class RegularChunkGrid(ChunkGrid): @@ -45,3 +50,8 @@ def from_dict(cls, data: Dict[str, Any]) -> Self: def to_dict(self) -> Dict[str, JSON]: return {"name": "regular", "configuration": {"chunk_shape": list(self.chunk_shape)}} + + def all_chunk_coords(self, array_shape: ChunkCoords) -> Iterator[ChunkCoords]: + return itertools.product( + *(range(0, _ceildiv(s, c)) for s, c in zip(array_shape, self.chunk_shape)) + ) diff --git a/src/zarr/codecs/_v2.py b/src/zarr/codecs/_v2.py new file mode 100644 index 0000000000..444ae96c5c --- /dev/null +++ b/src/zarr/codecs/_v2.py @@ -0,0 +1,100 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Literal +import numpy as np + +from zarr.codecs.mixins import ArrayArrayCodecBatchMixin, ArrayBytesCodecBatchMixin +from zarr.common import JSON, ArraySpec, BytesLike, to_thread + +import numcodecs +from numcodecs.compat import ensure_bytes, ensure_ndarray + + +@dataclass(frozen=True) +class V2Compressor(ArrayBytesCodecBatchMixin): + compressor: dict[str, JSON] | None + + is_fixed_size = False + + async def decode_single( + self, + chunk_bytes: BytesLike, + chunk_spec: ArraySpec, + ) -> np.ndarray: + if chunk_bytes is None: + return None + + if self.compressor is not None: + compressor = numcodecs.get_codec(self.compressor) + chunk_array = ensure_ndarray(await to_thread(compressor.decode, chunk_bytes)) + else: + chunk_array = ensure_ndarray(chunk_bytes) + + # ensure correct dtype + if str(chunk_array.dtype) != chunk_spec.dtype: + chunk_array = chunk_array.view(chunk_spec.dtype) + + return chunk_array + + async def encode_single( + self, + chunk_array: np.ndarray, + _chunk_spec: ArraySpec, + ) -> BytesLike | None: + if self.compressor is not None: + compressor = numcodecs.get_codec(self.compressor) + if not chunk_array.flags.c_contiguous and not chunk_array.flags.f_contiguous: + chunk_array = chunk_array.copy(order="A") + encoded_chunk_bytes = ensure_bytes(await to_thread(compressor.encode, chunk_array)) + else: + encoded_chunk_bytes = ensure_bytes(chunk_array) + + return encoded_chunk_bytes + + def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int: + raise NotImplementedError + + +@dataclass(frozen=True) +class V2Filters(ArrayArrayCodecBatchMixin): + filters: list[dict[str, JSON]] + order: Literal["C", "F"] + + is_fixed_size = False + + async def decode_single( + self, + chunk_array: np.ndarray, + chunk_spec: ArraySpec, + ) -> np.ndarray: + # apply filters in reverse order + if self.filters is not None: + for filter_metadata in self.filters[::-1]: + filter = numcodecs.get_codec(filter_metadata) + chunk_array = await to_thread(filter.decode, chunk_array) + + # ensure correct chunk shape + if chunk_array.shape != chunk_spec.shape: + chunk_array = chunk_array.reshape( + chunk_spec.shape, + order=self.order, + ) + + return chunk_array + + async def encode_single( + self, + chunk_array: 
np.ndarray, + _chunk_spec: ArraySpec, + ) -> np.ndarray | None: + chunk_array = chunk_array.ravel(order=self.order) + + for filter_metadata in self.filters: + filter = numcodecs.get_codec(filter_metadata) + chunk_array = await to_thread(filter.encode, chunk_array) + + return chunk_array + + def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int: + raise NotImplementedError diff --git a/src/zarr/codecs/sharding.py b/src/zarr/codecs/sharding.py index a46a77e95e..8ea8804d49 100644 --- a/src/zarr/codecs/sharding.py +++ b/src/zarr/codecs/sharding.py @@ -32,10 +32,7 @@ c_order_iter, morton_order_iter, ) -from zarr.metadata import ( - ArrayMetadata, - parse_codecs, -) +from zarr.metadata import ArrayMetadata, parse_codecs if TYPE_CHECKING: from typing import Awaitable, Callable, Dict, Iterator, Optional, Set @@ -394,7 +391,7 @@ async def decode_single( indexer = BasicIndexer( tuple(slice(0, s) for s in shard_shape), shape=shard_shape, - chunk_shape=chunk_shape, + chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape), ) # setup output array @@ -439,7 +436,7 @@ async def decode_partial_single( indexer = BasicIndexer( selection, shape=shard_shape, - chunk_shape=chunk_shape, + chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape), ) # setup output array @@ -503,7 +500,7 @@ async def encode_single( BasicIndexer( tuple(slice(0, s) for s in shard_shape), shape=shard_shape, - chunk_shape=chunk_shape, + chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape), ) ) @@ -546,7 +543,7 @@ async def encode_partial_single( BasicIndexer( selection, shape=shard_shape, - chunk_shape=chunk_shape, + chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape), ) ) diff --git a/src/zarr/group.py b/src/zarr/group.py index f8d57e3fba..5bc124a4f6 100644 --- a/src/zarr/group.py +++ b/src/zarr/group.py @@ -7,22 +7,33 @@ import logging import numpy.typing as npt -if TYPE_CHECKING: - from typing import Any, AsyncGenerator, Literal, Iterable +from zarr.abc.store import set_or_delete from zarr.abc.codec import Codec from zarr.abc.metadata import Metadata from zarr.array import AsyncArray, Array from zarr.attributes import Attributes -from zarr.common import ZARR_JSON, ZARRAY_JSON, ZATTRS_JSON, ZGROUP_JSON, ChunkCoords +from zarr.chunk_key_encodings import ChunkKeyEncoding +from zarr.common import ( + JSON, + ZARR_JSON, + ZARRAY_JSON, + ZATTRS_JSON, + ZGROUP_JSON, + ChunkCoords, + ZarrFormat, +) from zarr.store import StoreLike, StorePath, make_store_path from zarr.sync import SyncMixin, sync from typing import overload +if TYPE_CHECKING: + from typing import Any, AsyncGenerator, Literal, Iterable + logger = logging.getLogger("zarr.group") -def parse_zarr_format(data: Any) -> Literal[2, 3]: +def parse_zarr_format(data: Any) -> ZarrFormat: if data in (2, 3): return data msg = msg = f"Invalid zarr_format. Expected one 2 or 3. Got {data}." 
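# --- Editorial example, not part of the patch: the v2 shim codecs as a pipeline.
# ArrayV2Metadata.codec_pipeline (later in this diff) wires the legacy numcodecs
# configs into the v3 codec machinery through the two shims defined above. The
# delta/zlib configs are ordinary numcodecs config dicts, chosen for illustration.
from zarr.codecs import BatchedCodecPipeline
from zarr.codecs._v2 import V2Compressor, V2Filters

pipeline = BatchedCodecPipeline.from_list(
    [
        V2Filters([{"id": "delta", "dtype": "<i4"}], order="C"),  # array-to-array filters
        V2Compressor({"id": "zlib", "level": 1}),  # array-to-bytes + compression
    ]
)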
@@ -62,7 +73,7 @@ def _parse_async_node(node: AsyncArray | AsyncGroup) -> Array | Group: @dataclass(frozen=True) class GroupMetadata(Metadata): attributes: dict[str, Any] = field(default_factory=dict) - zarr_format: Literal[2, 3] = 3 + zarr_format: ZarrFormat = 3 node_type: Literal["group"] = field(default="group", init=False) # todo: rename this, since it doesn't return bytes @@ -75,7 +86,7 @@ def to_bytes(self) -> dict[str, bytes]: ZATTRS_JSON: json.dumps(self.attributes).encode(), } - def __init__(self, attributes: dict[str, Any] | None = None, zarr_format: Literal[2, 3] = 3): + def __init__(self, attributes: dict[str, Any] | None = None, zarr_format: ZarrFormat = 3): attributes_parsed = parse_attributes(attributes) zarr_format_parsed = parse_zarr_format(zarr_format) @@ -103,7 +114,7 @@ async def create( *, attributes: dict[str, Any] = {}, exists_ok: bool = False, - zarr_format: Literal[2, 3] = 3, + zarr_format: ZarrFormat = 3, ) -> AsyncGroup: store_path = make_store_path(store) if not exists_ok: @@ -245,6 +256,7 @@ async def delitem(self, key: str) -> None: elif self.metadata.zarr_format == 2: await asyncio.gather( (store_path / ZGROUP_JSON).delete(), # TODO: missing_ok=False + (store_path / ZARRAY_JSON).delete(), # TODO: missing_ok=False (store_path / ZATTRS_JSON).delete(), # TODO: missing_ok=True ) else: @@ -252,7 +264,7 @@ async def delitem(self, key: str) -> None: async def _save_metadata(self) -> None: to_save = self.metadata.to_bytes() - awaitables = [(self.store_path / key).set(value) for key, value in to_save.items()] + awaitables = [set_or_delete(self.store_path / key, value) for key, value in to_save.items()] await asyncio.gather(*awaitables) @property @@ -278,13 +290,25 @@ async def create_array( path: str, shape: ChunkCoords, dtype: npt.DTypeLike, - chunk_shape: ChunkCoords, fill_value: Any | None = None, - chunk_key_encoding: tuple[Literal["default"], Literal[".", "/"]] - | tuple[Literal["v2"], Literal[".", "/"]] = ("default", "/"), - codecs: Iterable[Codec | dict[str, Any]] | None = None, + attributes: dict[str, JSON] | None = None, + # v3 only + chunk_shape: ChunkCoords | None = None, + chunk_key_encoding: ( + ChunkKeyEncoding + | tuple[Literal["default"], Literal[".", "/"]] + | tuple[Literal["v2"], Literal[".", "/"]] + | None + ) = None, + codecs: Iterable[Codec | dict[str, JSON]] | None = None, dimension_names: Iterable[str] | None = None, - attributes: dict[str, Any] | None = None, + # v2 only + chunks: ChunkCoords | None = None, + dimension_separator: Literal[".", "/"] | None = None, + order: Literal["C", "F"] | None = None, + filters: list[dict[str, JSON]] | None = None, + compressor: dict[str, JSON] | None = None, + # runtime exists_ok: bool = False, ) -> AsyncArray: return await AsyncArray.create( @@ -297,6 +321,11 @@ async def create_array( codecs=codecs, dimension_names=dimension_names, attributes=attributes, + chunks=chunks, + dimension_separator=dimension_separator, + order=order, + filters=filters, + compressor=compressor, exists_ok=exists_ok, zarr_format=self.metadata.zarr_format, ) @@ -307,15 +336,7 @@ async def update_attributes(self, new_attributes: dict[str, Any]) -> "AsyncGroup self.metadata.attributes.update(new_attributes) # Write new metadata - to_save = self.metadata.to_bytes() - if self.metadata.zarr_format == 2: - # only save the .zattrs object - await (self.store_path / ZATTRS_JSON).set(to_save[ZATTRS_JSON]) - else: - await (self.store_path / ZARR_JSON).set(to_save[ZARR_JSON]) - - self.metadata.attributes.clear() - 
self.metadata.attributes.update(new_attributes) + await self._save_metadata() return self @@ -480,7 +501,7 @@ async def update_attributes_async(self, new_attributes: dict[str, Any]) -> Group # Write new metadata to_save = new_metadata.to_bytes() - awaitables = [(self.store_path / key).set(value) for key, value in to_save.items()] + awaitables = [set_or_delete(self.store_path / key, value) for key, value in to_save.items()] await asyncio.gather(*awaitables) async_group = replace(self._async_group, metadata=new_metadata) diff --git a/src/zarr/indexing.py b/src/zarr/indexing.py index 9f324eb5ea..8e7cd95430 100644 --- a/src/zarr/indexing.py +++ b/src/zarr/indexing.py @@ -2,10 +2,13 @@ import itertools import math -from typing import Iterator, List, NamedTuple, Optional, Tuple +from typing import TYPE_CHECKING, Iterator, List, NamedTuple, Optional, Tuple from zarr.common import ChunkCoords, Selection, SliceSelection, product +if TYPE_CHECKING: + from zarr.chunk_grids import ChunkGrid + def _ensure_tuple(v: Selection) -> SliceSelection: if not isinstance(v, tuple): @@ -131,13 +134,18 @@ def __init__( self, selection: Selection, shape: Tuple[int, ...], - chunk_shape: Tuple[int, ...], + chunk_grid: ChunkGrid, ): + from zarr.chunk_grids import RegularChunkGrid + + assert isinstance( + chunk_grid, RegularChunkGrid + ), "Only regular chunk grids are currently supported." # setup per-dimension indexers self.dim_indexers = [ _SliceDimIndexer(dim_sel, dim_len, dim_chunk_len) for dim_sel, dim_len, dim_chunk_len in zip( - _ensure_selection(selection, shape), shape, chunk_shape + _ensure_selection(selection, shape), shape, chunk_grid.chunk_shape ) ] self.shape = tuple(s.nitems for s in self.dim_indexers) @@ -202,7 +210,3 @@ def is_total_slice(item: Selection, shape: ChunkCoords) -> bool: ) else: raise TypeError("expected slice or tuple of slices, found %r" % item) - - -def all_chunk_coords(shape: ChunkCoords, chunk_shape: ChunkCoords) -> Iterator[ChunkCoords]: - return itertools.product(*(range(0, _ceildiv(s, c)) for s, c in zip(shape, chunk_shape))) diff --git a/src/zarr/metadata.py b/src/zarr/metadata.py index d1e72a7600..4ea5f9bc5a 100644 --- a/src/zarr/metadata.py +++ b/src/zarr/metadata.py @@ -1,24 +1,29 @@ from __future__ import annotations +from abc import ABC, abstractmethod from enum import Enum -from typing import TYPE_CHECKING, cast, Dict, Iterable, Any -from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any, cast, Iterable +from dataclasses import dataclass, field, replace import json import numpy as np import numpy.typing as npt +from zarr.abc.codec import Codec, CodecPipeline +from zarr.abc.metadata import Metadata from zarr.chunk_grids import ChunkGrid, RegularChunkGrid from zarr.chunk_key_encodings import ChunkKeyEncoding, parse_separator +from zarr.codecs._v2 import V2Compressor, V2Filters if TYPE_CHECKING: - from typing import Literal, Union, List, Optional, Tuple + from typing import Literal + from typing_extensions import Self -from zarr.abc.codec import Codec, CodecPipeline -from zarr.abc.metadata import Metadata - from zarr.common import ( JSON, + ZARR_JSON, + ZARRAY_JSON, + ZATTRS_JSON, ArraySpec, ChunkCoords, parse_dtype, @@ -102,16 +107,58 @@ def from_dtype(cls, dtype: np.dtype[Any]) -> DataType: return DataType[dtype_to_data_type[dtype.str]] -@dataclass(frozen=True) -class ArrayMetadata(Metadata): +@dataclass(frozen=True, kw_only=True) +class ArrayMetadata(Metadata, ABC): + shape: ChunkCoords + chunk_grid: ChunkGrid + attributes: dict[str, JSON] + + 
@property + @abstractmethod + def dtype(self) -> np.dtype[Any]: + pass + + @property + @abstractmethod + def ndim(self) -> int: + pass + + @property + @abstractmethod + def codec_pipeline(self) -> CodecPipeline: + pass + + @abstractmethod + def get_chunk_spec(self, _chunk_coords: ChunkCoords, order: Literal["C", "F"]) -> ArraySpec: + pass + + @abstractmethod + def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: + pass + + @abstractmethod + def to_bytes(self) -> dict[str, bytes]: + pass + + @abstractmethod + def update_shape(self, shape: ChunkCoords) -> Self: + pass + + @abstractmethod + def update_attributes(self, attributes: dict[str, JSON]) -> Self: + pass + + +@dataclass(frozen=True, kw_only=True) +class ArrayV3Metadata(ArrayMetadata): shape: ChunkCoords data_type: np.dtype[Any] chunk_grid: ChunkGrid chunk_key_encoding: ChunkKeyEncoding fill_value: Any codecs: CodecPipeline - attributes: Dict[str, Any] = field(default_factory=dict) - dimension_names: Optional[Tuple[str, ...]] = None + attributes: dict[str, Any] = field(default_factory=dict) + dimension_names: tuple[str, ...] | None = None zarr_format: Literal[3] = field(default=3, init=False) node_type: Literal["array"] = field(default="array", init=False) @@ -180,6 +227,10 @@ def dtype(self) -> np.dtype[Any]: def ndim(self) -> int: return len(self.shape) + @property + def codec_pipeline(self) -> CodecPipeline: + return self.codecs + def get_chunk_spec(self, _chunk_coords: ChunkCoords, order: Literal["C", "F"]) -> ArraySpec: assert isinstance( self.chunk_grid, RegularChunkGrid @@ -191,7 +242,10 @@ def get_chunk_spec(self, _chunk_coords: ChunkCoords, order: Literal["C", "F"]) - order=order, ) - def to_bytes(self) -> bytes: + def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: + return self.chunk_key_encoding.encode_chunk_key(chunk_coords) + + def to_bytes(self) -> dict[str, bytes]: def _json_convert(o): if isinstance(o, np.dtype): return str(o) @@ -203,13 +257,10 @@ def _json_convert(o): return o.get_config() raise TypeError - return json.dumps( - self.to_dict(), - default=_json_convert, - ).encode() + return {ZARR_JSON: json.dumps(self.to_dict(), default=_json_convert).encode()} @classmethod - def from_dict(cls, data: Dict[str, Any]) -> ArrayMetadata: + def from_dict(cls, data: dict[str, JSON]) -> ArrayV3Metadata: # check that the zarr_format attribute is correct _ = parse_zarr_format_v3(data.pop("zarr_format")) # check that the node_type attribute is correct @@ -219,7 +270,7 @@ def from_dict(cls, data: Dict[str, Any]) -> ArrayMetadata: return cls(**data, dimension_names=dimension_names) - def to_dict(self) -> Dict[str, Any]: + def to_dict(self) -> dict[str, Any]: out_dict = super().to_dict() if not isinstance(out_dict, dict): @@ -231,18 +282,24 @@ def to_dict(self) -> Dict[str, Any]: out_dict.pop("dimension_names") return out_dict + def update_shape(self, shape: ChunkCoords) -> Self: + return replace(self, shape=shape) + + def update_attributes(self, attributes: dict[str, JSON]) -> Self: + return replace(self, attributes=attributes) + -@dataclass(frozen=True) -class ArrayV2Metadata(Metadata): +@dataclass(frozen=True, kw_only=True) +class ArrayV2Metadata(ArrayMetadata): shape: ChunkCoords - chunks: ChunkCoords - dtype: np.dtype[Any] - fill_value: Union[None, int, float] = 0 + chunk_grid: RegularChunkGrid + data_type: np.dtype[Any] + fill_value: None | int | float = 0 order: Literal["C", "F"] = "C" - filters: Optional[List[Dict[str, Any]]] = None + filters: list[dict[str, JSON]] | None = None dimension_separator: 
Literal[".", "/"] = "." - compressor: Optional[Dict[str, Any]] = None - attributes: Optional[Dict[str, Any]] = cast(Dict[str, Any], field(default_factory=dict)) + compressor: dict[str, JSON] | None = None + attributes: dict[str, JSON] = cast(dict[str, JSON], field(default_factory=dict)) zarr_format: Literal[2] = field(init=False, default=2) def __init__( @@ -254,9 +311,9 @@ def __init__( fill_value: Any, order: Literal["C", "F"], dimension_separator: Literal[".", "/"] = ".", - compressor: Optional[Dict[str, Any]] = None, - filters: Optional[List[Dict[str, Any]]] = None, - attributes: Optional[Dict[str, JSON]] = None, + compressor: dict[str, JSON] | None = None, + filters: list[dict[str, JSON]] | None = None, + attributes: dict[str, JSON] | None = None, ): """ Metadata for a Zarr version 2 array. @@ -266,14 +323,14 @@ def __init__( chunks_parsed = parse_shapelike(chunks) compressor_parsed = parse_compressor(compressor) order_parsed = parse_indexing_order(order) - dimension_separator_parsed = parse_separator(order) + dimension_separator_parsed = parse_separator(dimension_separator) filters_parsed = parse_filters(filters) fill_value_parsed = parse_fill_value(fill_value) attributes_parsed = parse_attributes(attributes) object.__setattr__(self, "shape", shape_parsed) object.__setattr__(self, "data_type", data_type_parsed) - object.__setattr__(self, "chunks", chunks_parsed) + object.__setattr__(self, "chunk_grid", RegularChunkGrid(chunk_shape=chunks_parsed)) object.__setattr__(self, "compressor", compressor_parsed) object.__setattr__(self, "order", order_parsed) object.__setattr__(self, "dimension_separator", dimension_separator_parsed) @@ -288,7 +345,23 @@ def __init__( def ndim(self) -> int: return len(self.shape) - def to_bytes(self) -> bytes: + @property + def dtype(self) -> np.dtype[Any]: + return self.data_type + + @property + def chunks(self) -> ChunkCoords: + return self.chunk_grid.chunk_shape + + @property + def codec_pipeline(self) -> CodecPipeline: + from zarr.codecs import BatchedCodecPipeline + + return BatchedCodecPipeline.from_list( + [V2Filters(self.filters or [], self.order), V2Compressor(self.compressor)] + ) + + def to_bytes(self) -> dict[str, bytes]: def _json_convert(o): if isinstance(o, np.dtype): if o.fields is None: @@ -297,16 +370,54 @@ def _json_convert(o): return o.descr raise TypeError - return json.dumps(self.to_dict(), default=_json_convert).encode() + zarray_dict = self.to_dict() + assert isinstance(zarray_dict, dict) + zattrs_dict = zarray_dict.pop("attributes", {}) + assert isinstance(zattrs_dict, dict) + return { + ZARRAY_JSON: json.dumps(zarray_dict, default=_json_convert).encode(), + ZATTRS_JSON: json.dumps(zattrs_dict).encode(), + } @classmethod - def from_dict(cls, data: Dict[str, Any]) -> ArrayV2Metadata: + def from_dict(cls, data: dict[str, Any]) -> ArrayV2Metadata: # check that the zarr_format attribute is correct _ = parse_zarr_format_v2(data.pop("zarr_format")) return cls(**data) + def to_dict(self) -> JSON: + zarray_dict = super().to_dict() + + assert isinstance(zarray_dict, dict) + + _ = zarray_dict.pop("chunk_grid") + zarray_dict["chunks"] = self.chunk_grid.chunk_shape + + _ = zarray_dict.pop("data_type") + zarray_dict["dtype"] = self.data_type + + return zarray_dict + + def get_chunk_spec(self, _chunk_coords: ChunkCoords, order: Literal["C", "F"]) -> ArraySpec: + return ArraySpec( + shape=self.chunk_grid.chunk_shape, + dtype=self.dtype, + fill_value=self.fill_value, + order=order, + ) + + def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: 
+ chunk_identifier = self.dimension_separator.join(map(str, chunk_coords)) + return "0" if chunk_identifier == "" else chunk_identifier + + def update_shape(self, shape: ChunkCoords) -> Self: + return replace(self, shape=shape) + + def update_attributes(self, attributes: dict[str, JSON]) -> Self: + return replace(self, attributes=attributes) + -def parse_dimension_names(data: Any) -> Tuple[str, ...] | None: +def parse_dimension_names(data: Any) -> tuple[str, ...] | None: if data is None: return data if isinstance(data, Iterable) and all([isinstance(x, str) for x in data]): @@ -316,11 +427,11 @@ def parse_dimension_names(data: Any) -> Tuple[str, ...] | None: # todo: real validation -def parse_attributes(data: Any) -> Dict[str, JSON]: +def parse_attributes(data: Any) -> dict[str, JSON]: if data is None: return {} - data_json = cast(Dict[str, JSON], data) + data_json = cast(dict[str, JSON], data) return data_json @@ -366,7 +477,7 @@ def parse_v2_metadata(data: ArrayV2Metadata) -> ArrayV2Metadata: return data -def parse_codecs(data: Iterable[Union[Codec, JSON]]) -> CodecPipeline: +def parse_codecs(data: Iterable[Codec | JSON]) -> CodecPipeline: from zarr.codecs import BatchedCodecPipeline if not isinstance(data, Iterable): diff --git a/tests/v3/test_group.py b/tests/v3/test_group.py index 710eb3e527..89363373ba 100644 --- a/tests/v3/test_group.py +++ b/tests/v3/test_group.py @@ -233,10 +233,7 @@ def test_asyncgroup_from_dict(store: MemoryStore | LocalStore, data: dict[str, A @pytest.mark.parametrize("store", ("local", "memory"), indirect=["store"]) -@pytest.mark.parametrize( - "zarr_format", - (pytest.param(2, marks=pytest.mark.xfail(reason="V2 arrays cannot be created yet.")), 3), -) +@pytest.mark.parametrize("zarr_format", (2, 3)) async def test_asyncgroup_getitem(store: LocalStore | MemoryStore, zarr_format: ZarrFormat) -> None: """ Create an `AsyncGroup`, then create members of that group, and ensure that we can access those @@ -263,10 +260,7 @@ async def test_asyncgroup_getitem(store: LocalStore | MemoryStore, zarr_format: @pytest.mark.parametrize("store", ("local", "memory"), indirect=["store"]) -@pytest.mark.parametrize( - "zarr_format", - (2, 3), -) +@pytest.mark.parametrize("zarr_format", (2, 3)) async def test_asyncgroup_delitem(store: LocalStore | MemoryStore, zarr_format: ZarrFormat) -> None: agroup = await AsyncGroup.create(store=store, zarr_format=zarr_format) sub_array_path = "sub_array" @@ -315,10 +309,7 @@ async def test_asyncgroup_create_group( @pytest.mark.parametrize("store", ("local", "memory"), indirect=["store"]) -@pytest.mark.parametrize( - "zarr_format", - (pytest.param(2, marks=pytest.mark.xfail(reason="V2 arrays cannot be created yet")), 3), -) +@pytest.mark.parametrize("zarr_format", (2, 3)) async def test_asyncgroup_create_array( store: LocalStore | MemoryStore, zarr_format: ZarrFormat, diff --git a/tests/v3/test_v2.py b/tests/v3/test_v2.py new file mode 100644 index 0000000000..5b831b1bb0 --- /dev/null +++ b/tests/v3/test_v2.py @@ -0,0 +1,28 @@ +from typing import Iterator +import numpy as np +import pytest + +from zarr.abc.store import Store +from zarr.array import Array +from zarr.store import StorePath, MemoryStore + + +@pytest.fixture +def store() -> Iterator[Store]: + yield StorePath(MemoryStore()) + + +def test_simple(store: Store): + data = np.arange(0, 256, dtype="uint16").reshape((16, 16)) + + a = Array.create( + store / "simple_v2", + zarr_format=2, + shape=data.shape, + chunks=(16, 16), + dtype=data.dtype, + fill_value=0, + ) + + a[:, :] = data + assert 
np.array_equal(data, a[:, :])
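# --- Editorial example, not part of the patch: chunk enumeration on the grid.
# all_chunk_coords() moved from zarr.indexing onto ChunkGrid, and key encoding
# now goes through ArrayMetadata.encode_chunk_key(). Continuing from the array
# `a` created in test_simple above:
from zarr.chunk_grids import RegularChunkGrid

grid = RegularChunkGrid(chunk_shape=(8, 8))
assert list(grid.all_chunk_coords((16, 16))) == [(0, 0), (0, 1), (1, 0), (1, 1)]

# v2 keys join coordinates with dimension_separator ("." by default)
assert a.metadata.encode_chunk_key((0, 0)) == "0.0"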