From cdc713f51e28917ddec85ca209a93ae46e471ec6 Mon Sep 17 00:00:00 2001 From: Norman Rzepka Date: Wed, 8 May 2024 22:16:35 +0200 Subject: [PATCH 01/11] adds wrapper codecs for the v2 codec pipeline --- src/zarr/array_v2.py | 165 ++++++----------------------------------- src/zarr/codecs/_v2.py | 105 ++++++++++++++++++++++++++ src/zarr/metadata.py | 18 ++++- 3 files changed, 143 insertions(+), 145 deletions(-) create mode 100644 src/zarr/codecs/_v2.py diff --git a/src/zarr/array_v2.py b/src/zarr/array_v2.py index 8c2cd3faec..32ced92639 100644 --- a/src/zarr/array_v2.py +++ b/src/zarr/array_v2.py @@ -8,20 +8,10 @@ import numcodecs import numpy as np -from numcodecs.compat import ensure_bytes, ensure_ndarray - -from zarr.common import ( - ZARRAY_JSON, - ZATTRS_JSON, - BytesLike, - ChunkCoords, - Selection, - SliceSelection, - concurrent_map, - to_thread, -) + +from zarr.common import JSON, ZARRAY_JSON, ZATTRS_JSON, ChunkCoords, Selection, concurrent_map from zarr.config import RuntimeConfiguration -from zarr.indexing import BasicIndexer, all_chunk_coords, is_total_slice +from zarr.indexing import BasicIndexer, all_chunk_coords from zarr.metadata import ArrayV2Metadata from zarr.store import StoreLike, StorePath, make_store_path from zarr.sync import sync @@ -53,7 +43,7 @@ async def set(self, value: np.ndarray): @dataclass(frozen=True) class ArrayV2: metadata: ArrayV2Metadata - attributes: Optional[Dict[str, Any]] + attributes: Optional[Dict[str, JSON]] store_path: StorePath runtime_configuration: RuntimeConfiguration @@ -68,9 +58,9 @@ async def create_async( dimension_separator: Literal[".", "/"] = ".", fill_value: Optional[Union[None, int, float]] = None, order: Literal["C", "F"] = "C", - filters: Optional[List[Dict[str, Any]]] = None, - compressor: Optional[Dict[str, Any]] = None, - attributes: Optional[Dict[str, Any]] = None, + filters: Optional[List[Dict[str, JSON]]] = None, + compressor: Optional[Dict[str, JSON]] = None, + attributes: Optional[Dict[str, JSON]] = None, exists_ok: bool = False, runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), ) -> ArrayV2: @@ -236,12 +226,18 @@ async def get_async(self, selection: Selection): ) # reading chunks and decoding them - await concurrent_map( + await self.metadata.codecs.read_batch( [ - (chunk_coords, chunk_selection, out_selection, out) + ( + self.store_path / self._encode_chunk_key(chunk_coords), + self.metadata.get_chunk_spec(chunk_coords), + chunk_selection, + out_selection, + ) for chunk_coords, chunk_selection, out_selection in indexer ], - self._read_chunk, + out, + self.runtime_configuration, ) if out.shape: @@ -249,51 +245,6 @@ async def get_async(self, selection: Selection): else: return out[()] - async def _read_chunk( - self, - chunk_coords: ChunkCoords, - chunk_selection: SliceSelection, - out_selection: SliceSelection, - out: np.ndarray, - ): - store_path = self.store_path / self._encode_chunk_key(chunk_coords) - - chunk_array = await self._decode_chunk(await store_path.get()) - if chunk_array is not None: - tmp = chunk_array[chunk_selection] - out[out_selection] = tmp - else: - out[out_selection] = self.metadata.fill_value - - async def _decode_chunk(self, chunk_bytes: Optional[BytesLike]) -> Optional[np.ndarray]: - if chunk_bytes is None: - return None - - if self.metadata.compressor is not None: - compressor = numcodecs.get_codec(self.metadata.compressor) - chunk_array = ensure_ndarray(await to_thread(compressor.decode, chunk_bytes)) - else: - chunk_array = ensure_ndarray(chunk_bytes) - - # ensure correct dtype 
- if str(chunk_array.dtype) != self.metadata.dtype: - chunk_array = chunk_array.view(self.metadata.dtype) - - # apply filters in reverse order - if self.metadata.filters is not None: - for filter_metadata in self.metadata.filters[::-1]: - filter = numcodecs.get_codec(filter_metadata) - chunk_array = await to_thread(filter.decode, chunk_array) - - # ensure correct chunk shape - if chunk_array.shape != self.metadata.chunks: - chunk_array = chunk_array.reshape( - self.metadata.chunks, - order=self.metadata.order, - ) - - return chunk_array - def __setitem__(self, selection: Selection, value: np.ndarray) -> None: sync(self.set_async(selection, value), self.runtime_configuration.asyncio_loop) @@ -319,94 +270,20 @@ async def set_async(self, selection: Selection, value: np.ndarray) -> None: value = value.astype(self.metadata.dtype, order="A") # merging with existing data and encoding chunks - await concurrent_map( + await self.metadata.codecs.write_batch( [ ( - value, - chunk_shape, - chunk_coords, + self.store_path / self._encode_chunk_key(chunk_coords), + self.metadata.get_chunk_spec(chunk_coords), chunk_selection, out_selection, ) for chunk_coords, chunk_selection, out_selection in indexer ], - self._write_chunk, + value, + self.runtime_configuration, ) - async def _write_chunk( - self, - value: np.ndarray, - chunk_shape: ChunkCoords, - chunk_coords: ChunkCoords, - chunk_selection: SliceSelection, - out_selection: SliceSelection, - ): - store_path = self.store_path / self._encode_chunk_key(chunk_coords) - - if is_total_slice(chunk_selection, chunk_shape): - # write entire chunks - if np.isscalar(value): - chunk_array = np.empty( - chunk_shape, - dtype=self.metadata.dtype, - order=self.metadata.order, - ) - chunk_array.fill(value) - else: - chunk_array = value[out_selection] - await self._write_chunk_to_store(store_path, chunk_array) - - else: - # writing partial chunks - # read chunk first - tmp = await self._decode_chunk(await store_path.get()) - - # merge new value - if tmp is None: - chunk_array = np.empty( - chunk_shape, - dtype=self.metadata.dtype, - order=self.metadata.order, - ) - chunk_array.fill(self.metadata.fill_value) - else: - chunk_array = tmp.copy( - order=self.metadata.order, - ) # make a writable copy - chunk_array[chunk_selection] = value[out_selection] - - await self._write_chunk_to_store(store_path, chunk_array) - - async def _write_chunk_to_store(self, store_path: StorePath, chunk_array: np.ndarray): - chunk_bytes: Optional[BytesLike] - if np.all(chunk_array == self.metadata.fill_value): - # chunks that only contain fill_value will be removed - await store_path.delete() - else: - chunk_bytes = await self._encode_chunk(chunk_array) - if chunk_bytes is None: - await store_path.delete() - else: - await store_path.set(chunk_bytes) - - async def _encode_chunk(self, chunk_array: np.ndarray) -> Optional[BytesLike]: - chunk_array = chunk_array.ravel(order=self.metadata.order) - - if self.metadata.filters is not None: - for filter_metadata in self.metadata.filters: - filter = numcodecs.get_codec(filter_metadata) - chunk_array = await to_thread(filter.encode, chunk_array) - - if self.metadata.compressor is not None: - compressor = numcodecs.get_codec(self.metadata.compressor) - if not chunk_array.flags.c_contiguous and not chunk_array.flags.f_contiguous: - chunk_array = chunk_array.copy(order="A") - encoded_chunk_bytes = ensure_bytes(await to_thread(compressor.encode, chunk_array)) - else: - encoded_chunk_bytes = ensure_bytes(chunk_array) - - return encoded_chunk_bytes - def 
_encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: chunk_identifier = self.metadata.dimension_separator.join(map(str, chunk_coords)) return "0" if chunk_identifier == "" else chunk_identifier diff --git a/src/zarr/codecs/_v2.py b/src/zarr/codecs/_v2.py new file mode 100644 index 0000000000..143a84019d --- /dev/null +++ b/src/zarr/codecs/_v2.py @@ -0,0 +1,105 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Literal +import numpy as np + +from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec +from zarr.common import JSON, ArraySpec, BytesLike, to_thread +from zarr.config import RuntimeConfiguration + +import numcodecs +from numcodecs.compat import ensure_bytes, ensure_ndarray + + +@dataclass(frozen=True) +class V2Compressor(ArrayBytesCodec): + compressor: dict[str, JSON] | None + + is_fixed_size = False + + async def decode( + self, + chunk_bytes: BytesLike, + chunk_spec: ArraySpec, + _runtime_configuration: RuntimeConfiguration, + ) -> np.ndarray: + if chunk_bytes is None: + return None + + if self.compressor is not None: + compressor = numcodecs.get_codec(self.compressor) + chunk_array = ensure_ndarray(await to_thread(compressor.decode, chunk_bytes)) + else: + chunk_array = ensure_ndarray(chunk_bytes) + + # ensure correct dtype + if str(chunk_array.dtype) != chunk_spec.dtype: + chunk_array = chunk_array.view(chunk_spec.dtype) + + return chunk_array + + async def encode( + self, + chunk_array: np.ndarray, + _chunk_spec: ArraySpec, + _runtime_configuration: RuntimeConfiguration, + ) -> BytesLike | None: + if self.compressor is not None: + compressor = numcodecs.get_codec(self.compressor) + if not chunk_array.flags.c_contiguous and not chunk_array.flags.f_contiguous: + chunk_array = chunk_array.copy(order="A") + encoded_chunk_bytes = ensure_bytes(await to_thread(compressor.encode, chunk_array)) + else: + encoded_chunk_bytes = ensure_bytes(chunk_array) + + return encoded_chunk_bytes + + def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int: + raise NotImplementedError + + +@dataclass(frozen=True) +class V2Filters(ArrayArrayCodec): + filters: list[dict[str, JSON]] + order: Literal["C", "F"] + + is_fixed_size = False + + async def decode( + self, + chunk_array: np.ndarray, + chunk_spec: ArraySpec, + _runtime_configuration: RuntimeConfiguration, + ) -> np.ndarray: + # apply filters in reverse order + if self.filters is not None: + for filter_metadata in self.filters[::-1]: + filter = numcodecs.get_codec(filter_metadata) + chunk_array = await to_thread(filter.decode, chunk_array) + + # ensure correct chunk shape + if chunk_array.shape != chunk_spec.shape: + chunk_array = chunk_array.reshape( + chunk_spec.shape, + order=self.order, + ) + + return chunk_array + + async def encode( + self, + chunk_array: np.ndarray, + _chunk_spec: ArraySpec, + _runtime_configuration: RuntimeConfiguration, + ) -> np.ndarray | None: + chunk_array = chunk_array.ravel(order=self.order) + + for filter_metadata in self.filters: + filter = numcodecs.get_codec(filter_metadata) + chunk_array = await to_thread(filter.encode, chunk_array) + + return chunk_array + + def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int: + raise NotImplementedError diff --git a/src/zarr/metadata.py b/src/zarr/metadata.py index 300b3637ed..01bd757993 100644 --- a/src/zarr/metadata.py +++ b/src/zarr/metadata.py @@ -8,6 +8,7 @@ from zarr.chunk_grids import ChunkGrid, RegularChunkGrid from zarr.chunk_key_encodings import 
ChunkKeyEncoding, parse_separator +from zarr.codecs._v2 import V2Compressor, V2Filters if TYPE_CHECKING: @@ -269,7 +270,7 @@ def __init__( chunks_parsed = parse_shapelike(chunks) compressor_parsed = parse_compressor(compressor) order_parsed = parse_indexing_order(order) - dimension_separator_parsed = parse_separator(order) + dimension_separator_parsed = parse_separator(dimension_separator) filters_parsed = parse_filters(filters) fill_value_parsed = parse_fill_value(fill_value) attributes_parsed = parse_attributes(attributes) @@ -291,6 +292,14 @@ def __init__( def ndim(self) -> int: return len(self.shape) + @property + def codecs(self) -> CodecPipeline: + from zarr.codecs.pipeline.hybrid import HybridCodecPipeline + + return HybridCodecPipeline.from_list( + [V2Filters(self.filters or [], self.order), V2Compressor(self.compressor)] + ) + def to_bytes(self) -> bytes: def _json_convert(o): if isinstance(o, np.dtype): @@ -308,6 +317,13 @@ def from_dict(cls, data: Dict[str, Any]) -> ArrayV2Metadata: _ = parse_zarr_format_v2(data.pop("zarr_format")) return cls(**data) + def get_chunk_spec(self, _chunk_coords: ChunkCoords) -> ArraySpec: + return ArraySpec( + shape=self.chunks, + dtype=self.dtype, + fill_value=self.fill_value, + ) + def parse_dimension_names(data: Any) -> Tuple[str, ...] | None: if data is None: From 3958c74bd6e80a3971b71b96b9a51f28d5edbba4 Mon Sep 17 00:00:00 2001 From: Norman Rzepka Date: Thu, 9 May 2024 11:12:24 +0200 Subject: [PATCH 02/11] encode_chunk_key --- src/zarr/array.py | 9 +++------ src/zarr/array_v2.py | 10 +++------- src/zarr/metadata.py | 7 +++++++ 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/zarr/array.py b/src/zarr/array.py index 853ca2b777..4716312fcd 100644 --- a/src/zarr/array.py +++ b/src/zarr/array.py @@ -210,8 +210,7 @@ async def getitem(self, selection: Selection) -> np.ndarray: await self.codecs.read_batch( [ ( - self.store_path - / self.metadata.chunk_key_encoding.encode_chunk_key(chunk_coords), + self.store_path / self.metadata.encode_chunk_key(chunk_coords), self.metadata.get_chunk_spec(chunk_coords), chunk_selection, out_selection, @@ -256,8 +255,7 @@ async def setitem(self, selection: Selection, value: np.ndarray) -> None: await self.codecs.write_batch( [ ( - self.store_path - / self.metadata.chunk_key_encoding.encode_chunk_key(chunk_coords), + self.store_path / self.metadata.encode_chunk_key(chunk_coords), self.metadata.get_chunk_spec(chunk_coords), chunk_selection, out_selection, @@ -277,7 +275,6 @@ async def resize( # Remove all chunks outside of the new shape assert isinstance(self.metadata.chunk_grid, RegularChunkGrid) chunk_shape = self.metadata.chunk_grid.chunk_shape - chunk_key_encoding = self.metadata.chunk_key_encoding old_chunk_coords = set(all_chunk_coords(self.metadata.shape, chunk_shape)) new_chunk_coords = set(all_chunk_coords(new_shape, chunk_shape)) @@ -288,7 +285,7 @@ async def _delete_key(key: str) -> None: await concurrent_map( [ - (chunk_key_encoding.encode_chunk_key(chunk_coords),) + (self.metadata.encode_chunk_key(chunk_coords),) for chunk_coords in old_chunk_coords.difference(new_chunk_coords) ], _delete_key, diff --git a/src/zarr/array_v2.py b/src/zarr/array_v2.py index 32ced92639..f6ab7071b8 100644 --- a/src/zarr/array_v2.py +++ b/src/zarr/array_v2.py @@ -229,7 +229,7 @@ async def get_async(self, selection: Selection): await self.metadata.codecs.read_batch( [ ( - self.store_path / self._encode_chunk_key(chunk_coords), + self.store_path / self.metadata.encode_chunk_key(chunk_coords), 
self.metadata.get_chunk_spec(chunk_coords), chunk_selection, out_selection, @@ -273,7 +273,7 @@ async def set_async(self, selection: Selection, value: np.ndarray) -> None: await self.metadata.codecs.write_batch( [ ( - self.store_path / self._encode_chunk_key(chunk_coords), + self.store_path / self.metadata.encode_chunk_key(chunk_coords), self.metadata.get_chunk_spec(chunk_coords), chunk_selection, out_selection, @@ -284,10 +284,6 @@ async def set_async(self, selection: Selection, value: np.ndarray) -> None: self.runtime_configuration, ) - def _encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: - chunk_identifier = self.metadata.dimension_separator.join(map(str, chunk_coords)) - return "0" if chunk_identifier == "" else chunk_identifier - async def resize_async(self, new_shape: ChunkCoords) -> ArrayV2: assert len(new_shape) == len(self.metadata.shape) new_metadata = replace(self.metadata, shape=new_shape) @@ -302,7 +298,7 @@ async def _delete_key(key: str) -> None: await concurrent_map( [ - (self._encode_chunk_key(chunk_coords),) + (self.metadata.encode_chunk_key(chunk_coords),) for chunk_coords in old_chunk_coords.difference(new_chunk_coords) ], _delete_key, diff --git a/src/zarr/metadata.py b/src/zarr/metadata.py index 01bd757993..b6a91a731c 100644 --- a/src/zarr/metadata.py +++ b/src/zarr/metadata.py @@ -195,6 +195,9 @@ def get_chunk_spec(self, _chunk_coords: ChunkCoords) -> ArraySpec: fill_value=self.fill_value, ) + def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: + return self.chunk_key_encoding.encode_chunk_key(chunk_coords) + def to_bytes(self) -> bytes: def _json_convert(o): if isinstance(o, np.dtype): @@ -324,6 +327,10 @@ def get_chunk_spec(self, _chunk_coords: ChunkCoords) -> ArraySpec: fill_value=self.fill_value, ) + def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: + chunk_identifier = self.dimension_separator.join(map(str, chunk_coords)) + return "0" if chunk_identifier == "" else chunk_identifier + def parse_dimension_names(data: Any) -> Tuple[str, ...] 
| None: if data is None: From ce726a0c46a2ed17ac5c367ead21fad6929d4faa Mon Sep 17 00:00:00 2001 From: Norman Rzepka Date: Thu, 9 May 2024 20:40:11 +0200 Subject: [PATCH 03/11] refactor ArrayV2 away --- src/zarr/__init__.py | 3 +- src/zarr/abc/codec.py | 21 +- src/zarr/abc/store.py | 37 ++- src/zarr/array.py | 273 ++++++++++------ src/zarr/array_v2.py | 404 ------------------------ src/zarr/attributes.py | 19 +- src/zarr/chunk_grids.py | 12 +- src/zarr/codecs/pipeline/batched.py | 11 +- src/zarr/codecs/pipeline/core.py | 3 +- src/zarr/codecs/pipeline/hybrid.py | 7 +- src/zarr/codecs/pipeline/interleaved.py | 8 +- src/zarr/codecs/sharding.py | 11 +- src/zarr/group.py | 21 +- src/zarr/indexing.py | 18 +- src/zarr/metadata.py | 33 +- 15 files changed, 292 insertions(+), 589 deletions(-) delete mode 100644 src/zarr/array_v2.py diff --git a/src/zarr/__init__.py b/src/zarr/__init__.py index 9ae9dc54c4..e7a629002a 100644 --- a/src/zarr/__init__.py +++ b/src/zarr/__init__.py @@ -4,7 +4,6 @@ import zarr.codecs # noqa: F401 from zarr.array import Array, AsyncArray # noqa: F401 -from zarr.array_v2 import ArrayV2 from zarr.config import RuntimeConfiguration # noqa: F401 from zarr.group import AsyncGroup, Group # noqa: F401 from zarr.metadata import runtime_configuration # noqa: F401 @@ -33,7 +32,7 @@ async def open_auto_async( def open_auto( store: StoreLike, runtime_configuration_: RuntimeConfiguration = RuntimeConfiguration(), -) -> Union[Array, ArrayV2, Group]: +) -> Union[Array, Group]: object = _sync( open_auto_async(store, runtime_configuration_), runtime_configuration_.asyncio_loop, diff --git a/src/zarr/abc/codec.py b/src/zarr/abc/codec.py index 8f2ce9bcae..1d6c22b471 100644 --- a/src/zarr/abc/codec.py +++ b/src/zarr/abc/codec.py @@ -7,14 +7,13 @@ Callable, Iterable, Optional, - Protocol, Tuple, TypeVar, - runtime_checkable, ) import numpy as np from zarr.abc.metadata import Metadata +from zarr.abc.store import ByteGetter, ByteSetter from zarr.common import ArraySpec, concurrent_map @@ -42,24 +41,6 @@ async def wrap( return wrap -@runtime_checkable -class ByteGetter(Protocol): - async def get( - self, byte_range: Optional[Tuple[int, Optional[int]]] = None - ) -> Optional[BytesLike]: ... - - -@runtime_checkable -class ByteSetter(Protocol): - async def get( - self, byte_range: Optional[Tuple[int, Optional[int]]] = None - ) -> Optional[BytesLike]: ... - - async def set(self, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None) -> None: ... - - async def delete(self) -> None: ... - - class Codec(Metadata): is_fixed_size: bool diff --git a/src/zarr/abc/store.py b/src/zarr/abc/store.py index d92f8d4e2e..7172167383 100644 --- a/src/zarr/abc/store.py +++ b/src/zarr/abc/store.py @@ -1,7 +1,9 @@ from abc import abstractmethod, ABC from collections.abc import AsyncGenerator -from typing import List, Tuple, Optional +from typing import List, Protocol, Tuple, Optional, runtime_checkable + +from zarr.common import BytesLike class Store(ABC): @@ -61,13 +63,13 @@ def supports_writes(self) -> bool: ... @abstractmethod - async def set(self, key: str, value: bytes) -> None: + async def set(self, key: str, value: BytesLike) -> None: """Store a (key, value) pair. Parameters ---------- key : str - value : bytes + value : BytesLike """ ... @@ -88,12 +90,12 @@ def supports_partial_writes(self) -> bool: ... 
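
[This hunk relocates the ByteGetter/ByteSetter protocols into abc/store.py (see the additions at the end of this file) and pairs them with a set_or_delete helper. The protocols are runtime-checkable and structural, so any object with matching async methods satisfies them. A minimal in-memory sketch, assuming this series is applied — the MemoryByteSetter name is illustrative, not part of the patch:]

    import asyncio
    from zarr.abc.store import set_or_delete

    class MemoryByteSetter:
        # hypothetical in-memory ByteSetter; stores one value, ignores byte ranges
        def __init__(self) -> None:
            self.value: bytes | None = None

        async def get(self, byte_range=None):
            return self.value

        async def set(self, value, byte_range=None) -> None:
            self.value = value

        async def delete(self) -> None:
            self.value = None

    setter = MemoryByteSetter()
    asyncio.run(set_or_delete(setter, b"chunk-bytes"))  # value given: stored
    asyncio.run(set_or_delete(setter, None))            # None: key deleted
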
@abstractmethod - async def set_partial_values(self, key_start_values: List[Tuple[str, int, bytes]]) -> None: + async def set_partial_values(self, key_start_values: List[Tuple[str, int, BytesLike]]) -> None: """Store values at a given key, starting at byte range_start. Parameters ---------- - key_start_values : list[tuple[str, int, bytes]] + key_start_values : list[tuple[str, int, BytesLike]] set of key, range_start, values triples, a key may occur multiple times with different range_starts, range_starts (considering the length of the respective values) must not specify overlapping ranges for the same key @@ -145,3 +147,28 @@ def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: AsyncGenerator[str, None] """ ... + + +@runtime_checkable +class ByteGetter(Protocol): + async def get( + self, byte_range: Optional[Tuple[int, Optional[int]]] = None + ) -> Optional[BytesLike]: ... + + +@runtime_checkable +class ByteSetter(Protocol): + async def get( + self, byte_range: Optional[Tuple[int, Optional[int]]] = None + ) -> Optional[BytesLike]: ... + + async def set(self, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None) -> None: ... + + async def delete(self) -> None: ... + + +async def set_or_delete(byte_setter: ByteSetter, value: BytesLike | None) -> None: + if value is None: + await byte_setter.delete() + else: + await byte_setter.set(value) diff --git a/src/zarr/array.py b/src/zarr/array.py index 4716312fcd..9158fdaaf0 100644 --- a/src/zarr/array.py +++ b/src/zarr/array.py @@ -11,45 +11,53 @@ # 2. Do we really need runtime_configuration? Specifically, the asyncio_loop seems problematic +from asyncio import gather from dataclasses import dataclass, replace import json -from typing import Any, Dict, Iterable, Literal, Optional, Tuple, Union +from typing import Any, Iterable, Literal import numpy as np from zarr.abc.codec import Codec +from zarr.abc.store import set_or_delete # from zarr.array_v2 import ArrayV2 +from zarr.attributes import Attributes from zarr.codecs import BytesCodec from zarr.common import ( + JSON, ZARR_JSON, + ZARRAY_JSON, + ZATTRS_JSON, ChunkCoords, Selection, concurrent_map, ) from zarr.config import RuntimeConfiguration -from zarr.indexing import BasicIndexer, all_chunk_coords +from zarr.indexing import BasicIndexer from zarr.chunk_grids import RegularChunkGrid from zarr.chunk_key_encodings import DefaultChunkKeyEncoding, V2ChunkKeyEncoding -from zarr.metadata import ArrayMetadata +from zarr.metadata import ArrayMetadata, ArrayV2Metadata from zarr.store import StoreLike, StorePath, make_store_path from zarr.sync import sync -def parse_array_metadata(data: Any) -> ArrayMetadata: - if isinstance(data, ArrayMetadata): +def parse_array_metadata(data: Any) -> ArrayMetadata | ArrayV2Metadata: + if isinstance(data, ArrayMetadata | ArrayV2Metadata): return data elif isinstance(data, dict): - return ArrayMetadata.from_dict(data) - else: - raise TypeError + if data["zarr_format"] == 3: + return ArrayMetadata.from_dict(data) + elif data["zarr_format"] == 2: + return ArrayV2Metadata.from_dict(data) + raise TypeError @dataclass(frozen=True) class AsyncArray: - metadata: ArrayMetadata + metadata: ArrayMetadata | ArrayV2Metadata store_path: StorePath runtime_configuration: RuntimeConfiguration @@ -59,7 +67,7 @@ def codecs(self): def __init__( self, - metadata: ArrayMetadata, + metadata: ArrayMetadata | ArrayV2Metadata, store_path: StorePath, runtime_configuration: RuntimeConfiguration, ): @@ -75,16 +83,15 @@ async def create( store: StoreLike, *, shape: ChunkCoords, - 
dtype: Union[str, np.dtype], + dtype: str | np.dtype, chunk_shape: ChunkCoords, - fill_value: Optional[Any] = None, - chunk_key_encoding: Union[ - Tuple[Literal["default"], Literal[".", "/"]], - Tuple[Literal["v2"], Literal[".", "/"]], - ] = ("default", "/"), - codecs: Optional[Iterable[Union[Codec, Dict[str, Any]]]] = None, - dimension_names: Optional[Iterable[str]] = None, - attributes: Optional[Dict[str, Any]] = None, + fill_value: Any | None = None, + chunk_key_encoding: ( + tuple[Literal["default"], Literal[".", "/"]] | tuple[Literal["v2"], Literal[".", "/"]] + ) = ("default", "/"), + codecs: Iterable[Codec | dict[str, JSON]] | None = None, + dimension_names: Iterable[str] | None = None, + attributes: dict[str, JSON] | None = None, runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), exists_ok: bool = False, ) -> AsyncArray: @@ -117,19 +124,63 @@ async def create( runtime_configuration = runtime_configuration or RuntimeConfiguration() array = cls( - metadata=metadata, - store_path=store_path, - runtime_configuration=runtime_configuration, + metadata=metadata, store_path=store_path, runtime_configuration=runtime_configuration ) - await array._save_metadata() + await array._save_metadata(metadata) + return array + + @classmethod + async def create_v2( + cls, + store: StoreLike, + *, + shape: ChunkCoords, + dtype: np.dtype, + chunks: ChunkCoords, + dimension_separator: Literal[".", "/"] = ".", + fill_value: None | int | float = None, + order: Literal["C", "F"] = "C", + filters: list[dict[str, JSON]] | None = None, + compressor: dict[str, JSON] | None = None, + attributes: dict[str, JSON] | None = None, + exists_ok: bool = False, + runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + ) -> AsyncArray: + import numcodecs + + store_path = make_store_path(store) + if not exists_ok: + assert not await (store_path / ZARRAY_JSON).exists() + + metadata = ArrayV2Metadata( + shape=shape, + dtype=np.dtype(dtype), + chunks=chunks, + order=order, + dimension_separator=dimension_separator, + fill_value=0 if fill_value is None else fill_value, + compressor=( + numcodecs.get_codec(compressor).get_config() if compressor is not None else None + ), + filters=( + [numcodecs.get_codec(filter).get_config() for filter in filters] + if filters is not None + else None + ), + attributes=attributes, + ) + array = cls( + metadata=metadata, store_path=store_path, runtime_configuration=runtime_configuration + ) + await array._save_metadata(metadata) return array @classmethod def from_dict( cls, store_path: StorePath, - data: Dict[str, Any], + data: dict[str, JSON], runtime_configuration: RuntimeConfiguration, ) -> AsyncArray: metadata = ArrayMetadata.from_dict(data) @@ -142,34 +193,60 @@ def from_dict( async def open( cls, store: StoreLike, + zarr_format: Literal[2, 3, None] = 3, runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), ) -> AsyncArray: store_path = make_store_path(store) - zarr_json_bytes = await (store_path / ZARR_JSON).get() - assert zarr_json_bytes is not None - return cls.from_dict( - store_path, - json.loads(zarr_json_bytes), - runtime_configuration=runtime_configuration, - ) - @classmethod - async def open_auto( - cls, - store: StoreLike, - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> AsyncArray: # TODO: Union[AsyncArray, ArrayV2] - store_path = make_store_path(store) - v3_metadata_bytes = await (store_path / ZARR_JSON).get() - if v3_metadata_bytes is not None: - return cls.from_dict( - store_path, - 
json.loads(v3_metadata_bytes), - runtime_configuration=runtime_configuration or RuntimeConfiguration(), + if zarr_format == 2: + zarray_bytes, zattrs_bytes = await gather( + (store_path / ZARRAY_JSON).get(), (store_path / ZATTRS_JSON).get() + ) + if zarray_bytes is None: + raise KeyError(store_path) # filenotfounderror? + elif zarr_format == 3: + zarr_json_bytes = await (store_path / ZARR_JSON).get() + if zarr_json_bytes is None: + raise KeyError(store_path) # filenotfounderror? + elif zarr_format is None: + zarr_json_bytes, zarray_bytes, zattrs_bytes = await gather( + (store_path / ZARR_JSON).get(), + (store_path / ZARRAY_JSON).get(), + (store_path / ZATTRS_JSON).get(), + ) + if zarr_json_bytes is not None and zarray_bytes is not None: + # TODO: revisit this exception type + # alternatively, we could warn and favor v3 + raise ValueError("Both zarr.json and .zarray objects exist") + if zarr_json_bytes is None and zarray_bytes is None: + raise KeyError(store_path) # filenotfounderror? + # set zarr_format based on which keys were found + if zarr_json_bytes is not None: + zarr_format = 3 + else: + zarr_format = 2 + else: + raise ValueError(f"unexpected zarr_format: {zarr_format}") + + if zarr_format == 2: + # V2 arrays are comprised of a .zarray and .zattrs objects + assert zarray_bytes is not None + zarray_dict = json.loads(zarray_bytes) + zattrs_dict = json.loads(zattrs_bytes) if zattrs_bytes is not None else {} + zarray_dict["attributes"] = zattrs_dict + return cls( + store_path=store_path, + metadata=ArrayV2Metadata.from_dict(zarray_dict), + runtime_configuration=runtime_configuration, ) else: - raise ValueError("no v2 support yet") - # return await ArrayV2.open(store_path) + # V3 arrays are comprised of a zarr.json object + assert zarr_json_bytes is not None + return cls( + store_path=store_path, + metadata=ArrayMetadata.from_dict(json.loads(zarr_json_bytes)), + runtime_configuration=runtime_configuration, + ) @property def ndim(self) -> int: @@ -187,16 +264,11 @@ def size(self) -> int: def dtype(self) -> np.dtype: return self.metadata.dtype - @property - def attrs(self) -> dict: - return self.metadata.attributes - async def getitem(self, selection: Selection) -> np.ndarray: - assert isinstance(self.metadata.chunk_grid, RegularChunkGrid) indexer = BasicIndexer( selection, shape=self.metadata.shape, - chunk_shape=self.metadata.chunk_grid.chunk_shape, + chunk_grid=self.metadata.chunk_grid, ) # setup output array @@ -226,16 +298,16 @@ async def getitem(self, selection: Selection) -> np.ndarray: else: return out[()] - async def _save_metadata(self) -> None: - await (self.store_path / ZARR_JSON).set(self.metadata.to_bytes()) + async def _save_metadata(self, metadata: ArrayMetadata | ArrayV2Metadata) -> None: + to_save = metadata.to_bytes() + awaitables = [set_or_delete(self.store_path / key, value) for key, value in to_save.items()] + await gather(*awaitables) async def setitem(self, selection: Selection, value: np.ndarray) -> None: - assert isinstance(self.metadata.chunk_grid, RegularChunkGrid) - chunk_shape = self.metadata.chunk_grid.chunk_shape indexer = BasicIndexer( selection, shape=self.metadata.shape, - chunk_shape=chunk_shape, + chunk_grid=self.metadata.chunk_grid, ) sel_shape = indexer.shape @@ -273,10 +345,8 @@ async def resize( new_metadata = replace(self.metadata, shape=new_shape) # Remove all chunks outside of the new shape - assert isinstance(self.metadata.chunk_grid, RegularChunkGrid) - chunk_shape = self.metadata.chunk_grid.chunk_shape - old_chunk_coords = 
set(all_chunk_coords(self.metadata.shape, chunk_shape)) - new_chunk_coords = set(all_chunk_coords(new_shape, chunk_shape)) + old_chunk_coords = set(self.metadata.chunk_grid.all_chunk_coords(self.metadata.shape)) + new_chunk_coords = set(self.metadata.chunk_grid.all_chunk_coords(new_shape)) if delete_outside_chunks: @@ -293,14 +363,14 @@ async def _delete_key(key: str) -> None: ) # Write new metadata - await (self.store_path / ZARR_JSON).set(new_metadata.to_bytes()) + await self._save_metadata(new_metadata) return replace(self, metadata=new_metadata) - async def update_attributes(self, new_attributes: Dict[str, Any]) -> AsyncArray: + async def update_attributes(self, new_attributes: dict[str, JSON]) -> AsyncArray: new_metadata = replace(self.metadata, attributes=new_attributes) # Write new metadata - await (self.store_path / ZARR_JSON).set(new_metadata.to_bytes()) + await self._save_metadata(new_metadata) return replace(self, metadata=new_metadata) def __repr__(self): @@ -320,16 +390,15 @@ def create( store: StoreLike, *, shape: ChunkCoords, - dtype: Union[str, np.dtype], + dtype: str | np.dtype, chunk_shape: ChunkCoords, - fill_value: Optional[Any] = None, - chunk_key_encoding: Union[ - Tuple[Literal["default"], Literal[".", "/"]], - Tuple[Literal["v2"], Literal[".", "/"]], - ] = ("default", "/"), - codecs: Optional[Iterable[Union[Codec, Dict[str, Any]]]] = None, - dimension_names: Optional[Iterable[str]] = None, - attributes: Optional[Dict[str, Any]] = None, + fill_value: Any | None = None, + chunk_key_encoding: ( + tuple[Literal["default"], Literal[".", "/"]] | tuple[Literal["v2"], Literal[".", "/"]] + ) = ("default", "/"), + codecs: Iterable[Codec | dict[str, JSON]] | None = None, + dimension_names: Iterable[str] | None = None, + attributes: dict[str, JSON] | None = None, runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), exists_ok: bool = False, ) -> Array: @@ -351,11 +420,47 @@ def create( ) return cls(async_array) + @classmethod + def create_v2( + cls, + store: StoreLike, + *, + shape: ChunkCoords, + dtype: np.dtype, + chunks: ChunkCoords, + dimension_separator: Literal[".", "/"] = ".", + fill_value: None | int | float = None, + order: Literal["C", "F"] = "C", + filters: list[dict[str, JSON]] | None = None, + compressor: dict[str, JSON] | None = None, + attributes: dict[str, JSON] | None = None, + exists_ok: bool = False, + runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + ) -> Array: + async_array = sync( + AsyncArray.create_v2( + store=store, + shape=shape, + dtype=dtype, + chunks=chunks, + dimension_separator=dimension_separator, + fill_value=fill_value, + order=order, + compressor=compressor, + filters=filters, + attributes=attributes, + runtime_configuration=runtime_configuration, + exists_ok=exists_ok, + ), + runtime_configuration.asyncio_loop, + ) + return cls(async_array) + @classmethod def from_dict( cls, store_path: StorePath, - data: Dict[str, Any], + data: dict[str, JSON], runtime_configuration: RuntimeConfiguration, ) -> Array: async_array = AsyncArray.from_dict( @@ -375,18 +480,6 @@ def open( ) return cls(async_array) - @classmethod - def open_auto( - cls, - store: StoreLike, - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> Array: # TODO: Union[Array, ArrayV2]: - async_array = sync( - AsyncArray.open_auto(store, runtime_configuration), - runtime_configuration.asyncio_loop, - ) - return cls(async_array) - @property def ndim(self) -> int: return self._async_array.ndim @@ -404,11 +497,11 @@ def 
dtype(self) -> np.dtype: return self._async_array.dtype @property - def attrs(self) -> dict: - return self._async_array.attrs + def attrs(self) -> Attributes: + return Attributes(self) @property - def metadata(self) -> ArrayMetadata: + def metadata(self) -> ArrayMetadata | ArrayV2Metadata: return self._async_array.metadata @property @@ -435,7 +528,7 @@ def resize(self, new_shape: ChunkCoords) -> Array: ) ) - def update_attributes(self, new_attributes: Dict[str, Any]) -> Array: + def update_attributes(self, new_attributes: dict[str, JSON]) -> Array: return type(self)( sync( self._async_array.update_attributes(new_attributes), diff --git a/src/zarr/array_v2.py b/src/zarr/array_v2.py deleted file mode 100644 index f6ab7071b8..0000000000 --- a/src/zarr/array_v2.py +++ /dev/null @@ -1,404 +0,0 @@ -from __future__ import annotations - -import asyncio -from dataclasses import dataclass, replace -import json -from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union - -import numcodecs -import numpy as np - - -from zarr.common import JSON, ZARRAY_JSON, ZATTRS_JSON, ChunkCoords, Selection, concurrent_map -from zarr.config import RuntimeConfiguration -from zarr.indexing import BasicIndexer, all_chunk_coords -from zarr.metadata import ArrayV2Metadata -from zarr.store import StoreLike, StorePath, make_store_path -from zarr.sync import sync - -if TYPE_CHECKING: - from zarr.array import Array - - -@dataclass(frozen=True) -class _AsyncArrayProxy: - array: ArrayV2 - - def __getitem__(self, selection: Selection) -> _AsyncArraySelectionProxy: - return _AsyncArraySelectionProxy(self.array, selection) - - -@dataclass(frozen=True) -class _AsyncArraySelectionProxy: - array: ArrayV2 - selection: Selection - - async def get(self) -> np.ndarray: - return await self.array.get_async(self.selection) - - async def set(self, value: np.ndarray): - return await self.array.set_async(self.selection, value) - - -@dataclass(frozen=True) -class ArrayV2: - metadata: ArrayV2Metadata - attributes: Optional[Dict[str, JSON]] - store_path: StorePath - runtime_configuration: RuntimeConfiguration - - @classmethod - async def create_async( - cls, - store: StoreLike, - *, - shape: ChunkCoords, - dtype: np.dtype, - chunks: ChunkCoords, - dimension_separator: Literal[".", "/"] = ".", - fill_value: Optional[Union[None, int, float]] = None, - order: Literal["C", "F"] = "C", - filters: Optional[List[Dict[str, JSON]]] = None, - compressor: Optional[Dict[str, JSON]] = None, - attributes: Optional[Dict[str, JSON]] = None, - exists_ok: bool = False, - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> ArrayV2: - store_path = make_store_path(store) - if not exists_ok: - assert not await (store_path / ZARRAY_JSON).exists() - - metadata = ArrayV2Metadata( - shape=shape, - dtype=np.dtype(dtype), - chunks=chunks, - order=order, - dimension_separator=dimension_separator, - fill_value=0 if fill_value is None else fill_value, - compressor=( - numcodecs.get_codec(compressor).get_config() if compressor is not None else None - ), - filters=( - [numcodecs.get_codec(filter).get_config() for filter in filters] - if filters is not None - else None - ), - ) - array = cls( - metadata=metadata, - store_path=store_path, - attributes=attributes, - runtime_configuration=runtime_configuration, - ) - await array._save_metadata() - return array - - @classmethod - def create( - cls, - store: StoreLike, - *, - shape: ChunkCoords, - dtype: np.dtype, - chunks: ChunkCoords, - dimension_separator: Literal[".", "/"] = ".", - 
fill_value: Optional[Union[None, int, float]] = None, - order: Literal["C", "F"] = "C", - filters: Optional[List[Dict[str, Any]]] = None, - compressor: Optional[Dict[str, Any]] = None, - attributes: Optional[Dict[str, Any]] = None, - exists_ok: bool = False, - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> ArrayV2: - return sync( - cls.create_async( - store, - shape=shape, - dtype=dtype, - chunks=chunks, - order=order, - dimension_separator=dimension_separator, - fill_value=0 if fill_value is None else fill_value, - compressor=compressor, - filters=filters, - attributes=attributes, - exists_ok=exists_ok, - runtime_configuration=runtime_configuration, - ), - runtime_configuration.asyncio_loop, - ) - - @classmethod - async def open_async( - cls, - store: StoreLike, - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> ArrayV2: - store_path = make_store_path(store) - zarray_bytes, zattrs_bytes = await asyncio.gather( - (store_path / ZARRAY_JSON).get(), - (store_path / ZATTRS_JSON).get(), - ) - assert zarray_bytes is not None - return cls.from_dict( - store_path, - zarray_json=json.loads(zarray_bytes), - zattrs_json=json.loads(zattrs_bytes) if zattrs_bytes is not None else None, - runtime_configuration=runtime_configuration, - ) - - @classmethod - def open( - cls, - store: StoreLike, - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> ArrayV2: - return sync( - cls.open_async(store, runtime_configuration), - runtime_configuration.asyncio_loop, - ) - - @classmethod - def from_dict( - cls, - store_path: StorePath, - zarray_json: Any, - zattrs_json: Optional[Any], - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> ArrayV2: - metadata = ArrayV2Metadata.from_dict(zarray_json) - out = cls( - store_path=store_path, - metadata=metadata, - attributes=zattrs_json, - runtime_configuration=runtime_configuration, - ) - out._validate_metadata() - return out - - async def _save_metadata(self) -> None: - self._validate_metadata() - - await (self.store_path / ZARRAY_JSON).set(self.metadata.to_bytes()) - if self.attributes is not None and len(self.attributes) > 0: - await (self.store_path / ZATTRS_JSON).set( - json.dumps(self.attributes).encode(), - ) - else: - await (self.store_path / ZATTRS_JSON).delete() - - def _validate_metadata(self) -> None: - assert len(self.metadata.shape) == len( - self.metadata.chunks - ), "`chunks` and `shape` need to have the same number of dimensions." 
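
[Everything this deleted class did now routes through Array/AsyncArray: creation via the new create_v2 classmethods and reading via open(zarr_format=2). A usage sketch against the API added earlier in this patch — parameter values are illustrative, and it assumes a string path is accepted as a StoreLike, as elsewhere in this series:]

    import numpy as np
    from zarr.array import Array

    arr = Array.create_v2(
        "data/example_v2.zarr",      # any StoreLike
        shape=(100, 100),
        dtype=np.dtype("uint16"),
        chunks=(10, 10),
        dimension_separator="/",
        compressor={"id": "blosc", "cname": "zstd", "clevel": 5, "shuffle": 1},
    )
    arr[:10, :10] = 42               # encoded through V2Filters/V2Compressor
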
- - @property - def ndim(self) -> int: - return len(self.metadata.shape) - - @property - def shape(self) -> ChunkCoords: - return self.metadata.shape - - @property - def dtype(self) -> np.dtype: - return self.metadata.dtype - - @property - def async_(self) -> _AsyncArrayProxy: - return _AsyncArrayProxy(self) - - def __getitem__(self, selection: Selection): - return sync(self.get_async(selection), self.runtime_configuration.asyncio_loop) - - async def get_async(self, selection: Selection): - indexer = BasicIndexer( - selection, - shape=self.metadata.shape, - chunk_shape=self.metadata.chunks, - ) - - # setup output array - out = np.zeros( - indexer.shape, - dtype=self.metadata.dtype, - order=self.metadata.order, - ) - - # reading chunks and decoding them - await self.metadata.codecs.read_batch( - [ - ( - self.store_path / self.metadata.encode_chunk_key(chunk_coords), - self.metadata.get_chunk_spec(chunk_coords), - chunk_selection, - out_selection, - ) - for chunk_coords, chunk_selection, out_selection in indexer - ], - out, - self.runtime_configuration, - ) - - if out.shape: - return out - else: - return out[()] - - def __setitem__(self, selection: Selection, value: np.ndarray) -> None: - sync(self.set_async(selection, value), self.runtime_configuration.asyncio_loop) - - async def set_async(self, selection: Selection, value: np.ndarray) -> None: - chunk_shape = self.metadata.chunks - indexer = BasicIndexer( - selection, - shape=self.metadata.shape, - chunk_shape=chunk_shape, - ) - - sel_shape = indexer.shape - - # check value shape - if np.isscalar(value): - # setting a scalar value - pass - else: - if not hasattr(value, "shape"): - value = np.asarray(value, self.metadata.dtype) - assert value.shape == sel_shape - if value.dtype != self.metadata.dtype: - value = value.astype(self.metadata.dtype, order="A") - - # merging with existing data and encoding chunks - await self.metadata.codecs.write_batch( - [ - ( - self.store_path / self.metadata.encode_chunk_key(chunk_coords), - self.metadata.get_chunk_spec(chunk_coords), - chunk_selection, - out_selection, - ) - for chunk_coords, chunk_selection, out_selection in indexer - ], - value, - self.runtime_configuration, - ) - - async def resize_async(self, new_shape: ChunkCoords) -> ArrayV2: - assert len(new_shape) == len(self.metadata.shape) - new_metadata = replace(self.metadata, shape=new_shape) - - # Remove all chunks outside of the new shape - chunk_shape = self.metadata.chunks - old_chunk_coords = set(all_chunk_coords(self.metadata.shape, chunk_shape)) - new_chunk_coords = set(all_chunk_coords(new_shape, chunk_shape)) - - async def _delete_key(key: str) -> None: - await (self.store_path / key).delete() - - await concurrent_map( - [ - (self.metadata.encode_chunk_key(chunk_coords),) - for chunk_coords in old_chunk_coords.difference(new_chunk_coords) - ], - _delete_key, - ) - - # Write new metadata - await (self.store_path / ZARRAY_JSON).set(new_metadata.to_bytes()) - return replace(self, metadata=new_metadata) - - def resize(self, new_shape: ChunkCoords) -> ArrayV2: - return sync(self.resize_async(new_shape), self.runtime_configuration.asyncio_loop) - - async def convert_to_v3_async(self) -> Array: - from sys import byteorder as sys_byteorder - - from zarr.abc.codec import Codec - from zarr.array import Array - from zarr.common import ZARR_JSON - from zarr.chunk_grids import RegularChunkGrid - from zarr.chunk_key_encodings import V2ChunkKeyEncoding - from zarr.metadata import ArrayMetadata, DataType - - from zarr.codecs import ( - BloscCodec, - 
BloscShuffle, - BytesCodec, - GzipCodec, - TransposeCodec, - ) - - data_type = DataType.from_dtype(self.metadata.dtype) - endian: Literal["little", "big"] - if self.metadata.dtype.byteorder == "=": - endian = sys_byteorder - elif self.metadata.dtype.byteorder == ">": - endian = "big" - else: - endian = "little" - - assert ( - self.metadata.filters is None or len(self.metadata.filters) == 0 - ), "Filters are not supported by v3." - - codecs: List[Codec] = [] - - if self.metadata.order == "F": - codecs.append(TransposeCodec(order=tuple(reversed(range(self.metadata.ndim))))) - codecs.append(BytesCodec(endian=endian)) - - if self.metadata.compressor is not None: - v2_codec = numcodecs.get_codec(self.metadata.compressor).get_config() - assert v2_codec["id"] in ( - "blosc", - "gzip", - ), "Only blosc and gzip are supported by v3." - if v2_codec["id"] == "blosc": - codecs.append( - BloscCodec( - typesize=data_type.byte_count, - cname=v2_codec["cname"], - clevel=v2_codec["clevel"], - shuffle=BloscShuffle.from_int(v2_codec.get("shuffle", 0)), - blocksize=v2_codec.get("blocksize", 0), - ) - ) - elif v2_codec["id"] == "gzip": - codecs.append(GzipCodec(level=v2_codec.get("level", 5))) - - new_metadata = ArrayMetadata( - shape=self.metadata.shape, - chunk_grid=RegularChunkGrid(chunk_shape=self.metadata.chunks), - data_type=data_type, - fill_value=0 if self.metadata.fill_value is None else self.metadata.fill_value, - chunk_key_encoding=V2ChunkKeyEncoding(separator=self.metadata.dimension_separator), - codecs=codecs, - attributes=self.attributes or {}, - dimension_names=None, - ) - - new_metadata_bytes = new_metadata.to_bytes() - await (self.store_path / ZARR_JSON).set(new_metadata_bytes) - - return Array.from_dict( - store_path=self.store_path, - data=json.loads(new_metadata_bytes), - runtime_configuration=self.runtime_configuration, - ) - - async def update_attributes_async(self, new_attributes: Dict[str, Any]) -> ArrayV2: - await (self.store_path / ZATTRS_JSON).set(json.dumps(new_attributes).encode()) - return replace(self, attributes=new_attributes) - - def update_attributes(self, new_attributes: Dict[str, Any]) -> ArrayV2: - return sync( - self.update_attributes_async(new_attributes), - self.runtime_configuration.asyncio_loop, - ) - - def convert_to_v3(self) -> Array: - return sync(self.convert_to_v3_async(), loop=self.runtime_configuration.asyncio_loop) - - def __repr__(self): - return f"" diff --git a/src/zarr/attributes.py b/src/zarr/attributes.py index 8086e18d7b..e6b26309f2 100644 --- a/src/zarr/attributes.py +++ b/src/zarr/attributes.py @@ -1,32 +1,35 @@ from __future__ import annotations + from collections.abc import MutableMapping -from typing import TYPE_CHECKING, Any, Union +from typing import TYPE_CHECKING, Iterator + +from zarr.common import JSON if TYPE_CHECKING: from zarr.group import Group from zarr.array import Array -class Attributes(MutableMapping[str, Any]): - def __init__(self, obj: Union[Array, Group]): +class Attributes(MutableMapping[str, JSON]): + def __init__(self, obj: Array | Group): # key=".zattrs", read_only=False, cache=True, synchronizer=None self._obj = obj - def __getitem__(self, key): + def __getitem__(self, key: str) -> JSON: return self._obj.metadata.attributes[key] - def __setitem__(self, key, value): + def __setitem__(self, key: str, value: JSON) -> None: new_attrs = dict(self._obj.metadata.attributes) new_attrs[key] = value self._obj = self._obj.update_attributes(new_attrs) - def __delitem__(self, key): + def __delitem__(self, key: str) -> None: new_attrs = 
dict(self._obj.metadata.attributes) del new_attrs[key] self._obj = self._obj.update_attributes(new_attrs) - def __iter__(self): + def __iter__(self) -> Iterator[str]: return iter(self._obj.metadata.attributes) - def __len__(self): + def __len__(self) -> int: return len(self._obj.metadata.attributes) diff --git a/src/zarr/chunk_grids.py b/src/zarr/chunk_grids.py index 73557f6e4b..16c0df9174 100644 --- a/src/zarr/chunk_grids.py +++ b/src/zarr/chunk_grids.py @@ -1,5 +1,6 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, Dict +import itertools +from typing import TYPE_CHECKING, Any, Dict, Iterator from dataclasses import dataclass from zarr.abc.metadata import Metadata @@ -10,6 +11,7 @@ parse_named_configuration, parse_shapelike, ) +from zarr.indexing import _ceildiv if TYPE_CHECKING: from typing_extensions import Self @@ -27,6 +29,9 @@ def from_dict(cls, data: Dict[str, JSON]) -> ChunkGrid: return RegularChunkGrid.from_dict(data) raise ValueError(f"Unknown chunk grid. Got {name_parsed}.") + def all_chunk_coords(self, array_shape: ChunkCoords) -> Iterator[ChunkCoords]: + raise NotImplementedError + @dataclass(frozen=True) class RegularChunkGrid(ChunkGrid): @@ -45,3 +50,8 @@ def from_dict(cls, data: Dict[str, Any]) -> Self: def to_dict(self) -> Dict[str, JSON]: return {"name": "regular", "configuration": {"chunk_shape": list(self.chunk_shape)}} + + def all_chunk_coords(self, array_shape: ChunkCoords) -> Iterator[ChunkCoords]: + return itertools.product( + *(range(0, _ceildiv(s, c)) for s, c in zip(array_shape, self.chunk_shape)) + ) diff --git a/src/zarr/codecs/pipeline/batched.py b/src/zarr/codecs/pipeline/batched.py index 5c72785a70..bb9f3459a9 100644 --- a/src/zarr/codecs/pipeline/batched.py +++ b/src/zarr/codecs/pipeline/batched.py @@ -5,8 +5,6 @@ from dataclasses import dataclass from zarr.abc.codec import ( - ByteGetter, - ByteSetter, Codec, ArrayArrayCodec, ArrayBytesCodec, @@ -14,6 +12,7 @@ ArrayBytesCodecPartialEncodeMixin, BytesBytesCodec, ) +from zarr.abc.store import ByteGetter, ByteSetter, set_or_delete from zarr.codecs.pipeline.core import CodecPipeline from zarr.common import concurrent_map from zarr.indexing import is_total_slice @@ -267,17 +266,11 @@ def _merge_chunk_array( runtime_configuration, ) - async def _write_key(byte_setter: ByteSetter, chunk_bytes: Optional[BytesLike]) -> None: - if chunk_bytes is None: - await byte_setter.delete() - else: - await byte_setter.set(chunk_bytes) - await concurrent_map( [ (byte_setter, chunk_bytes) for chunk_bytes, (byte_setter, _, _, _) in zip(chunk_bytes_batch, batch_info) ], - _write_key, + set_or_delete, runtime_configuration.concurrency, ) diff --git a/src/zarr/codecs/pipeline/core.py b/src/zarr/codecs/pipeline/core.py index caaebcbf97..9543692c72 100644 --- a/src/zarr/codecs/pipeline/core.py +++ b/src/zarr/codecs/pipeline/core.py @@ -11,11 +11,10 @@ ArrayBytesCodec, ArrayBytesCodecPartialDecodeMixin, ArrayBytesCodecPartialEncodeMixin, - ByteGetter, - ByteSetter, BytesBytesCodec, Codec, ) +from zarr.abc.store import ByteGetter, ByteSetter from zarr.abc.metadata import Metadata from zarr.codecs.registry import get_codec_class from zarr.common import parse_named_configuration diff --git a/src/zarr/codecs/pipeline/hybrid.py b/src/zarr/codecs/pipeline/hybrid.py index 0b8bae3a45..6f6ecc7770 100644 --- a/src/zarr/codecs/pipeline/hybrid.py +++ b/src/zarr/codecs/pipeline/hybrid.py @@ -5,11 +5,8 @@ import numpy as np from dataclasses import dataclass -from zarr.abc.codec import ( - ByteGetter, - ByteSetter, - 
Codec, -) +from zarr.abc.codec import Codec +from zarr.abc.store import ByteGetter, ByteSetter from zarr.codecs.pipeline.batched import BatchedCodecPipeline from zarr.codecs.pipeline.core import CodecPipeline from zarr.common import concurrent_map diff --git a/src/zarr/codecs/pipeline/interleaved.py b/src/zarr/codecs/pipeline/interleaved.py index 1f82efb44a..9d530e497d 100644 --- a/src/zarr/codecs/pipeline/interleaved.py +++ b/src/zarr/codecs/pipeline/interleaved.py @@ -5,14 +5,13 @@ from dataclasses import dataclass from zarr.abc.codec import ( - ByteGetter, - ByteSetter, ArrayArrayCodec, ArrayBytesCodec, ArrayBytesCodecPartialDecodeMixin, ArrayBytesCodecPartialEncodeMixin, BytesBytesCodec, ) +from zarr.abc.store import ByteGetter, ByteSetter, set_or_delete from zarr.codecs.pipeline.core import CodecPipeline from zarr.common import concurrent_map from zarr.indexing import is_total_slice @@ -260,10 +259,7 @@ async def _write_chunk_to_store(chunk_array: np.ndarray) -> None: chunk_bytes = await self.encode_single( chunk_array, chunk_spec, runtime_configuration ) - if chunk_bytes is None: - await byte_setter.delete() - else: - await byte_setter.set(chunk_bytes) + await set_or_delete(byte_setter, chunk_bytes) if is_total_slice(chunk_selection, chunk_spec.shape): # write entire chunks diff --git a/src/zarr/codecs/sharding.py b/src/zarr/codecs/sharding.py index 8da250b9af..d618e410ae 100644 --- a/src/zarr/codecs/sharding.py +++ b/src/zarr/codecs/sharding.py @@ -7,13 +7,12 @@ import numpy as np from zarr.abc.codec import ( - ByteGetter, - ByteSetter, Codec, ArrayBytesCodec, ArrayBytesCodecPartialDecodeMixin, ArrayBytesCodecPartialEncodeMixin, ) +from zarr.abc.store import ByteGetter, ByteSetter from zarr.codecs.bytes import BytesCodec from zarr.codecs.crc32c_ import Crc32cCodec from zarr.codecs.pipeline import CodecPipeline, InterleavedCodecPipeline @@ -392,7 +391,7 @@ async def decode( indexer = BasicIndexer( tuple(slice(0, s) for s in shard_shape), shape=shard_shape, - chunk_shape=chunk_shape, + chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape), ) # setup output array @@ -439,7 +438,7 @@ async def decode_partial( indexer = BasicIndexer( selection, shape=shard_shape, - chunk_shape=chunk_shape, + chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape), ) # setup output array @@ -505,7 +504,7 @@ async def encode( BasicIndexer( tuple(slice(0, s) for s in shard_shape), shape=shard_shape, - chunk_shape=chunk_shape, + chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape), ) ) @@ -550,7 +549,7 @@ async def encode_partial( BasicIndexer( selection, shape=shard_shape, - chunk_shape=chunk_shape, + chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape), ) ) diff --git a/src/zarr/group.py b/src/zarr/group.py index 4da059c814..a6c98ff3dc 100644 --- a/src/zarr/group.py +++ b/src/zarr/group.py @@ -13,6 +13,7 @@ Literal, AsyncIterator, ) +from zarr.abc.store import set_or_delete from zarr.abc.metadata import Metadata from zarr.array import AsyncArray, Array @@ -49,13 +50,15 @@ class GroupMetadata(Metadata): node_type: Literal["group"] = field(default="group", init=False) # todo: rename this, since it doesn't return bytes - def to_bytes(self) -> dict[str, bytes]: + def to_bytes(self) -> dict[str, bytes | None]: if self.zarr_format == 3: return {ZARR_JSON: json.dumps(self.to_dict()).encode()} else: return { ZGROUP_JSON: json.dumps({"zarr_format": 2}).encode(), - ZATTRS_JSON: json.dumps(self.attributes).encode(), + ZATTRS_JSON: json.dumps(self.attributes).encode() + if len(self.attributes) > 0 + else None, } def 
__init__(self, attributes: dict[str, Any] | None = None, zarr_format: Literal[2, 3] = 3): @@ -248,7 +251,7 @@ async def delitem(self, key: str) -> None: async def _save_metadata(self) -> None: to_save = self.metadata.to_bytes() - awaitables = [(self.store_path / key).set(value) for key, value in to_save.items()] + awaitables = [set_or_delete(self.store_path / key, value) for key, value in to_save.items()] await asyncio.gather(*awaitables) @property @@ -281,15 +284,7 @@ async def update_attributes(self, new_attributes: dict[str, Any]): self.metadata.attributes.update(new_attributes) # Write new metadata - to_save = self.metadata.to_bytes() - if self.metadata.zarr_format == 2: - # only save the .zattrs object - await (self.store_path / ZATTRS_JSON).set(to_save[ZATTRS_JSON]) - else: - await (self.store_path / ZARR_JSON).set(to_save[ZARR_JSON]) - - self.metadata.attributes.clear() - self.metadata.attributes.update(new_attributes) + await self._save_metadata() return self @@ -461,7 +456,7 @@ async def update_attributes_async(self, new_attributes: dict[str, Any]) -> Group # Write new metadata to_save = new_metadata.to_bytes() - awaitables = [(self.store_path / key).set(value) for key, value in to_save.items()] + awaitables = [set_or_delete(self.store_path / key, value) for key, value in to_save.items()] await asyncio.gather(*awaitables) async_group = replace(self._async_group, metadata=new_metadata) diff --git a/src/zarr/indexing.py b/src/zarr/indexing.py index 9f324eb5ea..8e7cd95430 100644 --- a/src/zarr/indexing.py +++ b/src/zarr/indexing.py @@ -2,10 +2,13 @@ import itertools import math -from typing import Iterator, List, NamedTuple, Optional, Tuple +from typing import TYPE_CHECKING, Iterator, List, NamedTuple, Optional, Tuple from zarr.common import ChunkCoords, Selection, SliceSelection, product +if TYPE_CHECKING: + from zarr.chunk_grids import ChunkGrid + def _ensure_tuple(v: Selection) -> SliceSelection: if not isinstance(v, tuple): @@ -131,13 +134,18 @@ def __init__( self, selection: Selection, shape: Tuple[int, ...], - chunk_shape: Tuple[int, ...], + chunk_grid: ChunkGrid, ): + from zarr.chunk_grids import RegularChunkGrid + + assert isinstance( + chunk_grid, RegularChunkGrid + ), "Only regular chunk grid is supported, currently." 
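        # (Illustrative sketch, not part of the patch:) callers now pass the
        # grid object rather than a raw chunk shape, e.g. the sharding codec
        # above constructs
        #   BasicIndexer(selection, shape=shard_shape,
        #                chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape))
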
# setup per-dimension indexers self.dim_indexers = [ _SliceDimIndexer(dim_sel, dim_len, dim_chunk_len) for dim_sel, dim_len, dim_chunk_len in zip( - _ensure_selection(selection, shape), shape, chunk_shape + _ensure_selection(selection, shape), shape, chunk_grid.chunk_shape ) ] self.shape = tuple(s.nitems for s in self.dim_indexers) @@ -202,7 +210,3 @@ def is_total_slice(item: Selection, shape: ChunkCoords) -> bool: ) else: raise TypeError("expected slice or tuple of slices, found %r" % item) - - -def all_chunk_coords(shape: ChunkCoords, chunk_shape: ChunkCoords) -> Iterator[ChunkCoords]: - return itertools.product(*(range(0, _ceildiv(s, c)) for s, c in zip(shape, chunk_shape))) diff --git a/src/zarr/metadata.py b/src/zarr/metadata.py index b6a91a731c..7c6b199fb8 100644 --- a/src/zarr/metadata.py +++ b/src/zarr/metadata.py @@ -21,6 +21,9 @@ from zarr.common import ( JSON, + ZARR_JSON, + ZARRAY_JSON, + ZATTRS_JSON, ArraySpec, ChunkCoords, parse_dtype, @@ -198,7 +201,7 @@ def get_chunk_spec(self, _chunk_coords: ChunkCoords) -> ArraySpec: def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: return self.chunk_key_encoding.encode_chunk_key(chunk_coords) - def to_bytes(self) -> bytes: + def to_bytes(self) -> dict[str, bytes]: def _json_convert(o): if isinstance(o, np.dtype): return str(o) @@ -210,10 +213,7 @@ def _json_convert(o): return o.get_config() raise TypeError - return json.dumps( - self.to_dict(), - default=_json_convert, - ).encode() + return {ZARR_JSON: json.dumps(self.to_dict(), default=_json_convert).encode()} @classmethod def from_dict(cls, data: Dict[str, Any]) -> ArrayMetadata: @@ -242,14 +242,14 @@ def to_dict(self) -> Dict[str, Any]: @dataclass(frozen=True) class ArrayV2Metadata(Metadata): shape: ChunkCoords - chunks: ChunkCoords + chunk_grid: RegularChunkGrid dtype: np.dtype[Any] fill_value: Union[None, int, float] = 0 order: Literal["C", "F"] = "C" filters: Optional[List[Dict[str, Any]]] = None dimension_separator: Literal[".", "/"] = "." 
compressor: Optional[Dict[str, Any]] = None - attributes: Optional[Dict[str, Any]] = cast(Dict[str, Any], field(default_factory=dict)) + attributes: Dict[str, Any] = cast(Dict[str, Any], field(default_factory=dict)) zarr_format: Literal[2] = field(init=False, default=2) def __init__( @@ -280,7 +280,7 @@ def __init__( object.__setattr__(self, "shape", shape_parsed) object.__setattr__(self, "data_type", data_type_parsed) - object.__setattr__(self, "chunks", chunks_parsed) + object.__setattr__(self, "chunks", RegularChunkGrid(chunk_shape=chunks_parsed)) object.__setattr__(self, "compressor", compressor_parsed) object.__setattr__(self, "order", order_parsed) object.__setattr__(self, "dimension_separator", dimension_separator_parsed) @@ -295,6 +295,10 @@ def __init__( def ndim(self) -> int: return len(self.shape) + @property + def chunks(self) -> ChunkCoords: + return self.chunk_grid.chunk_shape + @property def codecs(self) -> CodecPipeline: from zarr.codecs.pipeline.hybrid import HybridCodecPipeline @@ -303,7 +307,7 @@ def codecs(self) -> CodecPipeline: [V2Filters(self.filters or [], self.order), V2Compressor(self.compressor)] ) - def to_bytes(self) -> bytes: + def to_bytes(self) -> dict[str, bytes | None]: def _json_convert(o): if isinstance(o, np.dtype): if o.fields is None: @@ -312,7 +316,14 @@ def _json_convert(o): return o.descr raise TypeError - return json.dumps(self.to_dict(), default=_json_convert).encode() + zarray_dict = self.to_dict() + assert isinstance(zarray_dict, dict) + zattrs_dict = zarray_dict.pop("attributes", {}) + assert isinstance(zattrs_dict, dict) + return { + ZARRAY_JSON: json.dumps(zarray_dict, default=_json_convert).encode(), + ZATTRS_JSON: json.dumps(zattrs_dict).encode() if len(zattrs_dict) > 0 else None, + } @classmethod def from_dict(cls, data: Dict[str, Any]) -> ArrayV2Metadata: @@ -322,7 +333,7 @@ def from_dict(cls, data: Dict[str, Any]) -> ArrayV2Metadata: def get_chunk_spec(self, _chunk_coords: ChunkCoords) -> ArraySpec: return ArraySpec( - shape=self.chunks, + shape=self.chunk_grid.chunk_shape, dtype=self.dtype, fill_value=self.fill_value, ) From a81226eb10387108ed9f09c6ecef16c8c5dd81f5 Mon Sep 17 00:00:00 2001 From: Norman Rzepka Date: Thu, 9 May 2024 21:21:06 +0200 Subject: [PATCH 04/11] empty zattrs --- src/zarr/group.py | 6 ++---- src/zarr/metadata.py | 4 ++-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/zarr/group.py b/src/zarr/group.py index a6c98ff3dc..fe89d51551 100644 --- a/src/zarr/group.py +++ b/src/zarr/group.py @@ -50,15 +50,13 @@ class GroupMetadata(Metadata): node_type: Literal["group"] = field(default="group", init=False) # todo: rename this, since it doesn't return bytes - def to_bytes(self) -> dict[str, bytes | None]: + def to_bytes(self) -> dict[str, bytes]: if self.zarr_format == 3: return {ZARR_JSON: json.dumps(self.to_dict()).encode()} else: return { ZGROUP_JSON: json.dumps({"zarr_format": 2}).encode(), - ZATTRS_JSON: json.dumps(self.attributes).encode() - if len(self.attributes) > 0 - else None, + ZATTRS_JSON: json.dumps(self.attributes).encode(), } def __init__(self, attributes: dict[str, Any] | None = None, zarr_format: Literal[2, 3] = 3): diff --git a/src/zarr/metadata.py b/src/zarr/metadata.py index 7c6b199fb8..799955d915 100644 --- a/src/zarr/metadata.py +++ b/src/zarr/metadata.py @@ -307,7 +307,7 @@ def codecs(self) -> CodecPipeline: [V2Filters(self.filters or [], self.order), V2Compressor(self.compressor)] ) - def to_bytes(self) -> dict[str, bytes | None]: + def to_bytes(self) -> dict[str, bytes]: def 
_json_convert(o): if isinstance(o, np.dtype): if o.fields is None: @@ -322,7 +322,7 @@ def _json_convert(o): assert isinstance(zattrs_dict, dict) return { ZARRAY_JSON: json.dumps(zarray_dict, default=_json_convert).encode(), - ZATTRS_JSON: json.dumps(zattrs_dict).encode() if len(zattrs_dict) > 0 else None, + ZATTRS_JSON: json.dumps(zattrs_dict).encode(), } @classmethod From 2a64c696518f908c227af53bed42a036eb8a62d2 Mon Sep 17 00:00:00 2001 From: Norman Rzepka Date: Thu, 9 May 2024 21:39:51 +0200 Subject: [PATCH 05/11] Apply suggestions from code review Co-authored-by: Davis Bennett --- src/zarr/__init__.py | 2 +- src/zarr/abc/store.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/zarr/__init__.py b/src/zarr/__init__.py index e7a629002a..b961c85607 100644 --- a/src/zarr/__init__.py +++ b/src/zarr/__init__.py @@ -32,7 +32,7 @@ async def open_auto_async( def open_auto( store: StoreLike, runtime_configuration_: RuntimeConfiguration = RuntimeConfiguration(), -) -> Union[Array, Group]: +) -> Array | Group: object = _sync( open_auto_async(store, runtime_configuration_), runtime_configuration_.asyncio_loop, diff --git a/src/zarr/abc/store.py b/src/zarr/abc/store.py index 7172167383..636175f339 100644 --- a/src/zarr/abc/store.py +++ b/src/zarr/abc/store.py @@ -90,7 +90,7 @@ def supports_partial_writes(self) -> bool: ... @abstractmethod - async def set_partial_values(self, key_start_values: List[Tuple[str, int, BytesLike]]) -> None: + async def set_partial_values(self, key_start_values: list[tuple[str, int, BytesLike]]) -> None: """Store values at a given key, starting at byte range_start. Parameters From 69414dac8410af83d5f0be04a3501eacb89f9822 Mon Sep 17 00:00:00 2001 From: Norman Rzepka Date: Sat, 11 May 2024 08:41:58 +0200 Subject: [PATCH 06/11] unify ArrayMetadata --- src/zarr/abc/codec.py | 4 +- src/zarr/array.py | 26 ++++---- src/zarr/codecs/_v2.py | 14 ++-- src/zarr/codecs/pipeline/core.py | 4 +- src/zarr/codecs/sharding.py | 4 +- src/zarr/metadata.py | 106 ++++++++++++++++++++++++------- 6 files changed, 109 insertions(+), 49 deletions(-) diff --git a/src/zarr/abc/codec.py b/src/zarr/abc/codec.py index cd04f71357..a9e86b289d 100644 --- a/src/zarr/abc/codec.py +++ b/src/zarr/abc/codec.py @@ -11,7 +11,7 @@ if TYPE_CHECKING: from typing_extensions import Self from zarr.common import ArraySpec, BytesLike, SliceSelection - from zarr.metadata import ArrayMetadata + from zarr.metadata import ArrayV3Metadata from zarr.config import RuntimeConfiguration @@ -28,7 +28,7 @@ def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec: def evolve(self, array_spec: ArraySpec) -> Self: return self - def validate(self, array_metadata: ArrayMetadata) -> None: + def validate(self, array_metadata: ArrayV3Metadata) -> None: pass diff --git a/src/zarr/array.py b/src/zarr/array.py index 9158fdaaf0..a70d583584 100644 --- a/src/zarr/array.py +++ b/src/zarr/array.py @@ -39,17 +39,17 @@ from zarr.indexing import BasicIndexer from zarr.chunk_grids import RegularChunkGrid from zarr.chunk_key_encodings import DefaultChunkKeyEncoding, V2ChunkKeyEncoding -from zarr.metadata import ArrayMetadata, ArrayV2Metadata +from zarr.metadata import ArrayMetadata, ArrayV3Metadata, ArrayV2Metadata from zarr.store import StoreLike, StorePath, make_store_path from zarr.sync import sync -def parse_array_metadata(data: Any) -> ArrayMetadata | ArrayV2Metadata: - if isinstance(data, ArrayMetadata | ArrayV2Metadata): +def parse_array_metadata(data: Any) -> ArrayMetadata: + if isinstance(data, ArrayMetadata): 
return data elif isinstance(data, dict): if data["zarr_format"] == 3: - return ArrayMetadata.from_dict(data) + return ArrayV3Metadata.from_dict(data) elif data["zarr_format"] == 2: return ArrayV2Metadata.from_dict(data) raise TypeError @@ -57,7 +57,7 @@ def parse_array_metadata(data: Any) -> ArrayMetadata | ArrayV2Metadata: @dataclass(frozen=True) class AsyncArray: - metadata: ArrayMetadata | ArrayV2Metadata + metadata: ArrayMetadata store_path: StorePath runtime_configuration: RuntimeConfiguration @@ -67,7 +67,7 @@ def codecs(self): def __init__( self, - metadata: ArrayMetadata | ArrayV2Metadata, + metadata: ArrayMetadata, store_path: StorePath, runtime_configuration: RuntimeConfiguration, ): @@ -107,7 +107,7 @@ async def create( else: fill_value = 0 - metadata = ArrayMetadata( + metadata = ArrayV3Metadata( shape=shape, data_type=dtype, chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape), @@ -183,7 +183,7 @@ def from_dict( data: dict[str, JSON], runtime_configuration: RuntimeConfiguration, ) -> AsyncArray: - metadata = ArrayMetadata.from_dict(data) + metadata = ArrayV3Metadata.from_dict(data) async_array = cls( metadata=metadata, store_path=store_path, runtime_configuration=runtime_configuration ) @@ -244,7 +244,7 @@ async def open( assert zarr_json_bytes is not None return cls( store_path=store_path, - metadata=ArrayMetadata.from_dict(json.loads(zarr_json_bytes)), + metadata=ArrayV3Metadata.from_dict(json.loads(zarr_json_bytes)), runtime_configuration=runtime_configuration, ) @@ -298,7 +298,7 @@ async def getitem(self, selection: Selection) -> np.ndarray: else: return out[()] - async def _save_metadata(self, metadata: ArrayMetadata | ArrayV2Metadata) -> None: + async def _save_metadata(self, metadata: ArrayMetadata) -> None: to_save = metadata.to_bytes() awaitables = [set_or_delete(self.store_path / key, value) for key, value in to_save.items()] await gather(*awaitables) @@ -342,7 +342,7 @@ async def resize( self, new_shape: ChunkCoords, delete_outside_chunks: bool = True ) -> AsyncArray: assert len(new_shape) == len(self.metadata.shape) - new_metadata = replace(self.metadata, shape=new_shape) + new_metadata = self.metadata.update_shape(new_shape) # Remove all chunks outside of the new shape old_chunk_coords = set(self.metadata.chunk_grid.all_chunk_coords(self.metadata.shape)) @@ -367,7 +367,7 @@ async def _delete_key(key: str) -> None: return replace(self, metadata=new_metadata) async def update_attributes(self, new_attributes: dict[str, JSON]) -> AsyncArray: - new_metadata = replace(self.metadata, attributes=new_attributes) + new_metadata = self.metadata.update_attributes(new_attributes) # Write new metadata await self._save_metadata(new_metadata) @@ -501,7 +501,7 @@ def attrs(self) -> Attributes: return Attributes(self) @property - def metadata(self) -> ArrayMetadata | ArrayV2Metadata: + def metadata(self) -> ArrayMetadata: return self._async_array.metadata @property diff --git a/src/zarr/codecs/_v2.py b/src/zarr/codecs/_v2.py index 143a84019d..31766182a1 100644 --- a/src/zarr/codecs/_v2.py +++ b/src/zarr/codecs/_v2.py @@ -4,7 +4,7 @@ from typing import Literal import numpy as np -from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec +from zarr.codecs.mixins import ArrayArrayCodecBatchMixin, ArrayBytesCodecBatchMixin from zarr.common import JSON, ArraySpec, BytesLike, to_thread from zarr.config import RuntimeConfiguration @@ -13,12 +13,12 @@ @dataclass(frozen=True) -class V2Compressor(ArrayBytesCodec): +class V2Compressor(ArrayBytesCodecBatchMixin): compressor: dict[str, JSON] 
| None is_fixed_size = False - async def decode( + async def decode_single( self, chunk_bytes: BytesLike, chunk_spec: ArraySpec, @@ -39,7 +39,7 @@ async def decode( return chunk_array - async def encode( + async def encode_single( self, chunk_array: np.ndarray, _chunk_spec: ArraySpec, @@ -60,13 +60,13 @@ def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) @dataclass(frozen=True) -class V2Filters(ArrayArrayCodec): +class V2Filters(ArrayArrayCodecBatchMixin): filters: list[dict[str, JSON]] order: Literal["C", "F"] is_fixed_size = False - async def decode( + async def decode_single( self, chunk_array: np.ndarray, chunk_spec: ArraySpec, @@ -87,7 +87,7 @@ async def decode( return chunk_array - async def encode( + async def encode_single( self, chunk_array: np.ndarray, _chunk_spec: ArraySpec, diff --git a/src/zarr/codecs/pipeline/core.py b/src/zarr/codecs/pipeline/core.py index 9543692c72..d2fa488199 100644 --- a/src/zarr/codecs/pipeline/core.py +++ b/src/zarr/codecs/pipeline/core.py @@ -22,7 +22,7 @@ if TYPE_CHECKING: from typing import Iterator, List, Optional, Tuple, Union from typing_extensions import Self - from zarr.metadata import ArrayMetadata + from zarr.metadata import ArrayV3Metadata from zarr.config import RuntimeConfiguration from zarr.common import JSON, ArraySpec, BytesLike, SliceSelection @@ -131,7 +131,7 @@ def __iter__(self) -> Iterator[Codec]: for bb_codec in self.bytes_bytes_codecs: yield bb_codec - def validate(self, array_metadata: ArrayMetadata) -> None: + def validate(self, array_metadata: ArrayV3Metadata) -> None: for codec in self: codec.validate(array_metadata) diff --git a/src/zarr/codecs/sharding.py b/src/zarr/codecs/sharding.py index 9d00a5d179..712d680c1d 100644 --- a/src/zarr/codecs/sharding.py +++ b/src/zarr/codecs/sharding.py @@ -33,7 +33,7 @@ morton_order_iter, ) from zarr.metadata import ( - ArrayMetadata, + ArrayV3Metadata, runtime_configuration as make_runtime_configuration, parse_codecs, ) @@ -358,7 +358,7 @@ def evolve(self, array_spec: ArraySpec) -> Self: return replace(self, codecs=evolved_codecs) return self - def validate(self, array_metadata: ArrayMetadata) -> None: + def validate(self, array_metadata: ArrayV3Metadata) -> None: if len(self.chunk_shape) != array_metadata.ndim: raise ValueError( "The shard's `chunk_shape` and array's `shape` need to have the " diff --git a/src/zarr/metadata.py b/src/zarr/metadata.py index 799955d915..ff08a0df05 100644 --- a/src/zarr/metadata.py +++ b/src/zarr/metadata.py @@ -1,7 +1,8 @@ from __future__ import annotations +from abc import ABC, abstractmethod, abstractproperty from enum import Enum -from typing import TYPE_CHECKING, cast, Dict, Iterable, Any -from dataclasses import dataclass, field +from typing import TYPE_CHECKING, cast, Iterable +from dataclasses import dataclass, field, replace import json import numpy as np import numpy.typing as npt @@ -12,7 +13,8 @@ if TYPE_CHECKING: - from typing import Any, Literal, Union, List, Optional, Tuple + from typing import Any, Literal + from typing_extensions import Self from zarr.codecs import CodecPipeline @@ -34,7 +36,7 @@ def runtime_configuration( - order: Literal["C", "F"], concurrency: Optional[int] = None + order: Literal["C", "F"], concurrency: int | None = None ) -> RuntimeConfiguration: return RuntimeConfiguration(order=order, concurrency=concurrency) @@ -113,16 +115,62 @@ def from_dtype(cls, dtype: np.dtype[Any]) -> DataType: return DataType[dtype_to_data_type[dtype.str]] +class ArrayMetadata(ABC, Metadata): + @abstractproperty + 
def dtype(self) -> np.dtype[Any]: + pass + + @abstractproperty + def ndim(self) -> int: + pass + + @abstractproperty + def shape(self) -> ChunkCoords: + pass + + @abstractproperty + def chunk_grid(self) -> ChunkGrid: + pass + + @abstractproperty + def codecs(self) -> CodecPipeline: + pass + + @abstractproperty + def attributes(self) -> dict[str, JSON]: + pass + + @abstractmethod + def get_chunk_spec(self, _chunk_coords: ChunkCoords) -> ArraySpec: + pass + + @abstractmethod + def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: + pass + + @abstractmethod + def to_bytes(self) -> dict[str, bytes]: + pass + + @abstractmethod + def update_shape(self, shape: ChunkCoords) -> Self: + pass + + @abstractmethod + def update_attributes(self, attributes: dict[str, JSON]) -> Self: + pass + + @dataclass(frozen=True) -class ArrayMetadata(Metadata): +class ArrayV3Metadata(ArrayMetadata): shape: ChunkCoords data_type: np.dtype[Any] chunk_grid: ChunkGrid chunk_key_encoding: ChunkKeyEncoding fill_value: Any codecs: CodecPipeline - attributes: Dict[str, Any] = field(default_factory=dict) - dimension_names: Optional[Tuple[str, ...]] = None + attributes: dict[str, Any] = field(default_factory=dict) + dimension_names: tuple[str, ...] | None = None zarr_format: Literal[3] = field(default=3, init=False) node_type: Literal["array"] = field(default="array", init=False) @@ -216,7 +264,7 @@ def _json_convert(o): return {ZARR_JSON: json.dumps(self.to_dict(), default=_json_convert).encode()} @classmethod - def from_dict(cls, data: Dict[str, Any]) -> ArrayMetadata: + def from_dict(cls, data: dict[str, Any]) -> ArrayV3Metadata: # check that the zarr_format attribute is correct _ = parse_zarr_format_v3(data.pop("zarr_format")) # check that the node_type attribute is correct @@ -226,7 +274,7 @@ def from_dict(cls, data: Dict[str, Any]) -> ArrayMetadata: return cls(**data, dimension_names=dimension_names) - def to_dict(self) -> Dict[str, Any]: + def to_dict(self) -> dict[str, Any]: out_dict = super().to_dict() if not isinstance(out_dict, dict): @@ -238,18 +286,24 @@ def to_dict(self) -> Dict[str, Any]: out_dict.pop("dimension_names") return out_dict + def update_shape(self, shape: ChunkCoords) -> Self: + return replace(self, shape=shape) + + def update_attributes(self, attributes: dict[str, JSON]) -> Self: + return replace(self, attributes=attributes) + @dataclass(frozen=True) -class ArrayV2Metadata(Metadata): +class ArrayV2Metadata(ArrayMetadata): shape: ChunkCoords chunk_grid: RegularChunkGrid dtype: np.dtype[Any] - fill_value: Union[None, int, float] = 0 + fill_value: None | int | float = 0 order: Literal["C", "F"] = "C" - filters: Optional[List[Dict[str, Any]]] = None + filters: list[dict[str, Any]] | None = None dimension_separator: Literal[".", "/"] = "." - compressor: Optional[Dict[str, Any]] = None - attributes: Dict[str, Any] = cast(Dict[str, Any], field(default_factory=dict)) + compressor: dict[str, Any] | None = None + attributes: dict[str, Any] = cast(dict[str, Any], field(default_factory=dict)) zarr_format: Literal[2] = field(init=False, default=2) def __init__( @@ -261,9 +315,9 @@ def __init__( fill_value: Any, order: Literal["C", "F"], dimension_separator: Literal[".", "/"] = ".", - compressor: Optional[Dict[str, Any]] = None, - filters: Optional[List[Dict[str, Any]]] = None, - attributes: Optional[Dict[str, JSON]] = None, + compressor: dict[str, Any] | None = None, + filters: list[dict[str, Any]] | None = None, + attributes: dict[str, JSON] | None = None, ): """ Metadata for a Zarr version 2 array. 
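
Both metadata classes implement update_shape and update_attributes by way of dataclasses.replace, which copies a frozen dataclass with selected fields swapped out instead of mutating it in place. A minimal, self-contained sketch of that pattern (the class and field names below are illustrative stand-ins, not the zarr classes):

    from dataclasses import dataclass, field, replace

    @dataclass(frozen=True)
    class Meta:
        shape: tuple[int, ...]
        attributes: dict[str, object] = field(default_factory=dict)

        def update_shape(self, shape: tuple[int, ...]) -> "Meta":
            # replace() builds a new frozen instance; the original is untouched
            return replace(self, shape=shape)

    m = Meta(shape=(8, 8))
    m2 = m.update_shape((16, 16))
    assert m.shape == (8, 8) and m2.shape == (16, 16)

Returning fresh instances keeps the metadata objects immutable and safe to share across concurrent tasks, which is why the abstract base mandates these methods rather than exposing setters.
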
@@ -326,7 +380,7 @@ def _json_convert(o): } @classmethod - def from_dict(cls, data: Dict[str, Any]) -> ArrayV2Metadata: + def from_dict(cls, data: dict[str, Any]) -> ArrayV2Metadata: # check that the zarr_format attribute is correct _ = parse_zarr_format_v2(data.pop("zarr_format")) return cls(**data) @@ -342,8 +396,14 @@ def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: chunk_identifier = self.dimension_separator.join(map(str, chunk_coords)) return "0" if chunk_identifier == "" else chunk_identifier + def update_shape(self, shape: ChunkCoords) -> Self: + return replace(self, shape=shape) + + def update_attributes(self, attributes: dict[str, JSON]) -> Self: + return replace(self, attributes=attributes) + -def parse_dimension_names(data: Any) -> Tuple[str, ...] | None: +def parse_dimension_names(data: Any) -> tuple[str, ...] | None: if data is None: return data if isinstance(data, Iterable) and all([isinstance(x, str) for x in data]): @@ -353,11 +413,11 @@ def parse_dimension_names(data: Any) -> Tuple[str, ...] | None: # todo: real validation -def parse_attributes(data: Any) -> Dict[str, JSON]: +def parse_attributes(data: Any) -> dict[str, JSON]: if data is None: return {} - data_json = cast(Dict[str, JSON], data) + data_json = cast(dict[str, JSON], data) return data_json @@ -384,7 +444,7 @@ def parse_node_type_array(data: Any) -> Literal["array"]: # todo: real validation -def parse_filters(data: Any) -> List[Codec]: +def parse_filters(data: Any) -> list[Codec]: return data @@ -403,7 +463,7 @@ def parse_v2_metadata(data: ArrayV2Metadata) -> ArrayV2Metadata: return data -def parse_codecs(data: Iterable[Union[Codec, JSON]]) -> CodecPipeline: +def parse_codecs(data: Iterable[Codec | JSON]) -> CodecPipeline: from zarr.codecs.pipeline.hybrid import HybridCodecPipeline if not isinstance(data, Iterable): From 05fbafd78de040b51267588fcb8683ef3a155b97 Mon Sep 17 00:00:00 2001 From: Norman Rzepka Date: Mon, 13 May 2024 21:03:30 +0200 Subject: [PATCH 07/11] abstract ArrayMetadata --- src/zarr/array.py | 2 +- src/zarr/metadata.py | 64 +++++++++++++++++++++++--------------------- tests/v3/test_v2.py | 27 +++++++++++++++++++ 3 files changed, 62 insertions(+), 31 deletions(-) create mode 100644 tests/v3/test_v2.py diff --git a/src/zarr/array.py b/src/zarr/array.py index a70d583584..7e5a077732 100644 --- a/src/zarr/array.py +++ b/src/zarr/array.py @@ -63,7 +63,7 @@ class AsyncArray: @property def codecs(self): - return self.metadata.codecs + return self.metadata.codec_pipeline def __init__( self, diff --git a/src/zarr/metadata.py b/src/zarr/metadata.py index ff08a0df05..278f42dd68 100644 --- a/src/zarr/metadata.py +++ b/src/zarr/metadata.py @@ -1,7 +1,7 @@ from __future__ import annotations -from abc import ABC, abstractmethod, abstractproperty +from abc import ABC, abstractmethod from enum import Enum -from typing import TYPE_CHECKING, cast, Iterable +from typing import TYPE_CHECKING, Any, cast, Iterable from dataclasses import dataclass, field, replace import json import numpy as np @@ -13,7 +13,7 @@ if TYPE_CHECKING: - from typing import Any, Literal + from typing import Literal from typing_extensions import Self from zarr.codecs import CodecPipeline @@ -115,29 +115,25 @@ def from_dtype(cls, dtype: np.dtype[Any]) -> DataType: return DataType[dtype_to_data_type[dtype.str]] -class ArrayMetadata(ABC, Metadata): - @abstractproperty +@dataclass(frozen=True, kw_only=True) +class ArrayMetadata(Metadata, ABC): + shape: ChunkCoords + chunk_grid: ChunkGrid + attributes: dict[str, JSON] + + @property + 
@abstractmethod def dtype(self) -> np.dtype[Any]: pass - @abstractproperty + @property + @abstractmethod def ndim(self) -> int: pass - @abstractproperty - def shape(self) -> ChunkCoords: - pass - - @abstractproperty - def chunk_grid(self) -> ChunkGrid: - pass - - @abstractproperty - def codecs(self) -> CodecPipeline: - pass - - @abstractproperty - def attributes(self) -> dict[str, JSON]: + @property + @abstractmethod + def codec_pipeline(self) -> CodecPipeline: pass @abstractmethod @@ -161,7 +157,7 @@ def update_attributes(self, attributes: dict[str, JSON]) -> Self: pass -@dataclass(frozen=True) +@dataclass(frozen=True, kw_only=True) class ArrayV3Metadata(ArrayMetadata): shape: ChunkCoords data_type: np.dtype[Any] @@ -236,6 +232,10 @@ def dtype(self) -> np.dtype[Any]: def ndim(self) -> int: return len(self.shape) + @property + def codec_pipeline(self) -> CodecPipeline: + return self.codecs + def get_chunk_spec(self, _chunk_coords: ChunkCoords) -> ArraySpec: assert isinstance( self.chunk_grid, RegularChunkGrid @@ -293,17 +293,17 @@ def update_attributes(self, attributes: dict[str, JSON]) -> Self: return replace(self, attributes=attributes) -@dataclass(frozen=True) +@dataclass(frozen=True, kw_only=True) class ArrayV2Metadata(ArrayMetadata): shape: ChunkCoords chunk_grid: RegularChunkGrid - dtype: np.dtype[Any] + data_type: np.dtype[Any] fill_value: None | int | float = 0 order: Literal["C", "F"] = "C" - filters: list[dict[str, Any]] | None = None + filters: list[dict[str, JSON]] | None = None dimension_separator: Literal[".", "/"] = "." - compressor: dict[str, Any] | None = None - attributes: dict[str, Any] = cast(dict[str, Any], field(default_factory=dict)) + compressor: dict[str, JSON] | None = None + attributes: dict[str, JSON] = cast(dict[str, JSON], field(default_factory=dict)) zarr_format: Literal[2] = field(init=False, default=2) def __init__( @@ -315,8 +315,8 @@ def __init__( fill_value: Any, order: Literal["C", "F"], dimension_separator: Literal[".", "/"] = ".", - compressor: dict[str, Any] | None = None, - filters: list[dict[str, Any]] | None = None, + compressor: dict[str, JSON] | None = None, + filters: list[dict[str, JSON]] | None = None, attributes: dict[str, JSON] | None = None, ): """ @@ -334,7 +334,7 @@ def __init__( object.__setattr__(self, "shape", shape_parsed) object.__setattr__(self, "data_type", data_type_parsed) - object.__setattr__(self, "chunks", RegularChunkGrid(chunk_shape=chunks_parsed)) + object.__setattr__(self, "chunk_grid", RegularChunkGrid(chunk_shape=chunks_parsed)) object.__setattr__(self, "compressor", compressor_parsed) object.__setattr__(self, "order", order_parsed) object.__setattr__(self, "dimension_separator", dimension_separator_parsed) @@ -349,12 +349,16 @@ def __init__( def ndim(self) -> int: return len(self.shape) + @property + def dtype(self) -> np.dtype[Any]: + return self.data_type + @property def chunks(self) -> ChunkCoords: return self.chunk_grid.chunk_shape @property - def codecs(self) -> CodecPipeline: + def codec_pipeline(self) -> CodecPipeline: from zarr.codecs.pipeline.hybrid import HybridCodecPipeline return HybridCodecPipeline.from_list( diff --git a/tests/v3/test_v2.py b/tests/v3/test_v2.py new file mode 100644 index 0000000000..bd3ee49ec9 --- /dev/null +++ b/tests/v3/test_v2.py @@ -0,0 +1,27 @@ +from typing import Iterator +import numpy as np +import pytest + +from zarr.abc.store import Store +from zarr.array import Array +from zarr.store import StorePath, MemoryStore + + +@pytest.fixture +def store() -> Iterator[Store]: + yield 
StorePath(MemoryStore())
+
+
+def test_simple(store: Store):
+    data = np.arange(0, 256, dtype="uint16").reshape((16, 16))
+
+    a = Array.create_v2(
+        store / "simple_v2",
+        shape=data.shape,
+        chunks=(16, 16),
+        dtype=data.dtype,
+        fill_value=0,
+    )
+
+    a[:, :] = data
+    assert np.array_equal(data, a[:, :])

From 1c6f2e9d9e38cdb01cab776b0504d0d8161f5eef Mon Sep 17 00:00:00 2001
From: Norman Rzepka
Date: Mon, 13 May 2024 21:32:26 +0200
Subject: [PATCH 08/11] unified Array.create

---
 src/zarr/array.py   | 197 ++++++++++++++++++++++++++++++++------------
 tests/v3/test_v2.py |   3 +-
 2 files changed, 147 insertions(+), 53 deletions(-)

diff --git a/src/zarr/array.py b/src/zarr/array.py
index 7e5a077732..d2eb702929 100644
--- a/src/zarr/array.py
+++ b/src/zarr/array.py
@@ -38,7 +38,7 @@
 from zarr.indexing import BasicIndexer
 from zarr.chunk_grids import RegularChunkGrid
-from zarr.chunk_key_encodings import DefaultChunkKeyEncoding, V2ChunkKeyEncoding
+from zarr.chunk_key_encodings import ChunkKeyEncoding, DefaultChunkKeyEncoding, V2ChunkKeyEncoding
 from zarr.metadata import ArrayMetadata, ArrayV3Metadata, ArrayV2Metadata
 from zarr.store import StoreLike, StorePath, make_store_path
 from zarr.sync import sync
@@ -82,20 +82,120 @@ async def create(
         cls,
         store: StoreLike,
         *,
+        # v2 and v3
+        shape: ChunkCoords,
+        dtype: str | np.dtype,
+        zarr_format: Literal[2, 3] = 3,
+        fill_value: Any | None = None,
+        attributes: dict[str, JSON] | None = None,
+        # v3 only
+        chunk_shape: ChunkCoords | None = None,
+        chunk_key_encoding: (
+            ChunkKeyEncoding
+            | tuple[Literal["default"], Literal[".", "/"]]
+            | tuple[Literal["v2"], Literal[".", "/"]]
+            | None
+        ) = None,
+        codecs: Iterable[Codec | dict[str, JSON]] | None = None,
+        dimension_names: Iterable[str] | None = None,
+        # v2 only
+        chunks: ChunkCoords | None = None,
+        dimension_separator: Literal[".", "/"] | None = None,
+        order: Literal["C", "F"] | None = None,
+        filters: list[dict[str, JSON]] | None = None,
+        compressor: dict[str, JSON] | None = None,
+        # runtime
+        runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
+        exists_ok: bool = False,
+    ) -> AsyncArray:
+        store_path = make_store_path(store)
+
+        if chunk_shape is None:
+            if chunks is None:
+                raise ValueError("Either chunk_shape or chunks needs to be provided.")
+            chunk_shape = chunks
+        elif chunks is not None:
+            raise ValueError("Only one of chunk_shape or chunks may be provided.")
+
+        if zarr_format == 3:
+            if dimension_separator is not None:
+                raise ValueError(
+                    "dimension_separator cannot be used for arrays with version 3. Use chunk_key_encoding instead."
+                )
+            if order is not None:
+                raise ValueError(
+                    "order cannot be used for arrays with version 3. Use a transpose codec instead."
+                )
+            if filters is not None:
+                raise ValueError(
+                    "filters cannot be used for arrays with version 3. Use array-to-array codecs instead."
+                )
+            if compressor is not None:
+                raise ValueError(
+                    "compressor cannot be used for arrays with version 3. Use bytes-to-bytes codecs instead."
+                )
+            return await cls._create_v3(
+                store_path,
+                shape=shape,
+                dtype=dtype,
+                chunk_shape=chunk_shape,
+                fill_value=fill_value,
+                chunk_key_encoding=chunk_key_encoding,
+                codecs=codecs,
+                dimension_names=dimension_names,
+                attributes=attributes,
+                runtime_configuration=runtime_configuration,
+                exists_ok=exists_ok,
+            )
+        elif zarr_format == 2:
+            if codecs is not None:
+                raise ValueError(
+                    "codecs cannot be used for arrays with version 2. Use filters and compressor instead."
+                )
+            if chunk_key_encoding is not None:
+                raise ValueError(
+                    "chunk_key_encoding cannot be used for arrays with version 2. Use dimension_separator instead."
+                )
+            if dimension_names is not None:
+                raise ValueError("dimension_names cannot be used for arrays with version 2.")
+            return await cls._create_v2(
+                store_path,
+                shape=shape,
+                dtype=dtype,
+                chunks=chunk_shape,
+                dimension_separator=dimension_separator,
+                fill_value=fill_value,
+                order=order,
+                filters=filters,
+                compressor=compressor,
+                attributes=attributes,
+                runtime_configuration=runtime_configuration,
+                exists_ok=exists_ok,
+            )
+        else:
+            raise ValueError(f"Unsupported zarr_format. Got: {zarr_format}")
+
+    @classmethod
+    async def _create_v3(
+        cls,
+        store_path: StorePath,
+        *,
         shape: ChunkCoords,
         dtype: str | np.dtype,
         chunk_shape: ChunkCoords,
         fill_value: Any | None = None,
         chunk_key_encoding: (
-            tuple[Literal["default"], Literal[".", "/"]] | tuple[Literal["v2"], Literal[".", "/"]]
-        ) = ("default", "/"),
+            ChunkKeyEncoding
+            | tuple[Literal["default"], Literal[".", "/"]]
+            | tuple[Literal["v2"], Literal[".", "/"]]
+            | None
+        ) = None,
         codecs: Iterable[Codec | dict[str, JSON]] | None = None,
         dimension_names: Iterable[str] | None = None,
         attributes: dict[str, JSON] | None = None,
         runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
         exists_ok: bool = False,
     ) -> AsyncArray:
-        store_path = make_store_path(store)
         if not exists_ok:
             assert not await (store_path / ZARR_JSON).exists()
@@ -107,15 +207,20 @@ async def create(
         else:
             fill_value = 0
 
+        if chunk_key_encoding is None:
+            chunk_key_encoding = ("default", "/")
+        if isinstance(chunk_key_encoding, tuple):
+            chunk_key_encoding = (
+                V2ChunkKeyEncoding(separator=chunk_key_encoding[1])
+                if chunk_key_encoding[0] == "v2"
+                else DefaultChunkKeyEncoding(separator=chunk_key_encoding[1])
+            )
+
         metadata = ArrayV3Metadata(
             shape=shape,
             data_type=dtype,
             chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape),
-            chunk_key_encoding=(
-                V2ChunkKeyEncoding(separator=chunk_key_encoding[1])
-                if chunk_key_encoding[0] == "v2"
-                else DefaultChunkKeyEncoding(separator=chunk_key_encoding[1])
-            ),
+            chunk_key_encoding=chunk_key_encoding,
             fill_value=fill_value,
             codecs=codecs,
             dimension_names=tuple(dimension_names) if dimension_names else None,
@@ -131,16 +236,16 @@ async def create(
 
         return array
 
     @classmethod
-    async def create_v2(
+    async def _create_v2(
         cls,
-        store: StoreLike,
+        store_path: StorePath,
         *,
         shape: ChunkCoords,
         dtype: np.dtype,
         chunks: ChunkCoords,
-        dimension_separator: Literal[".", "/"] = ".",
+        dimension_separator: Literal[".", "/"] | None = None,
         fill_value: None | int | float = None,
-        order: Literal["C", "F"] = "C",
+        order: Literal["C", "F"] | None = None,
         filters: list[dict[str, JSON]] | None = None,
         compressor: dict[str, JSON] | None = None,
         attributes: dict[str, JSON] | None = None,
@@ -149,10 +254,15 @@ async def create_v2(
     ) -> AsyncArray:
         import numcodecs
 
-        store_path = make_store_path(store)
         if not exists_ok:
             assert not await (store_path / ZARRAY_JSON).exists()
 
+        if order is None:
+            order = "C"
+
+        if dimension_separator is None:
+            dimension_separator = "."
+ metadata = ArrayV2Metadata( shape=shape, dtype=np.dtype(dtype), @@ -389,16 +499,29 @@ def create( cls, store: StoreLike, *, + # v2 and v3 shape: ChunkCoords, dtype: str | np.dtype, - chunk_shape: ChunkCoords, + zarr_format: Literal[2, 3] = 3, fill_value: Any | None = None, + attributes: dict[str, JSON] | None = None, + # v3 only + chunk_shape: ChunkCoords | None = None, chunk_key_encoding: ( - tuple[Literal["default"], Literal[".", "/"]] | tuple[Literal["v2"], Literal[".", "/"]] - ) = ("default", "/"), + ChunkKeyEncoding + | tuple[Literal["default"], Literal[".", "/"]] + | tuple[Literal["v2"], Literal[".", "/"]] + | None + ) = None, codecs: Iterable[Codec | dict[str, JSON]] | None = None, dimension_names: Iterable[str] | None = None, - attributes: dict[str, JSON] | None = None, + # v2 only + chunks: ChunkCoords | None = None, + dimension_separator: Literal[".", "/"] | None = None, + order: Literal["C", "F"] | None = None, + filters: list[dict[str, JSON]] | None = None, + compressor: dict[str, JSON] | None = None, + # runtime runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), exists_ok: bool = False, ) -> Array: @@ -407,48 +530,18 @@ def create( store=store, shape=shape, dtype=dtype, - chunk_shape=chunk_shape, + zarr_format=zarr_format, + attributes=attributes, fill_value=fill_value, + chunk_shape=chunk_shape, chunk_key_encoding=chunk_key_encoding, codecs=codecs, dimension_names=dimension_names, - attributes=attributes, - runtime_configuration=runtime_configuration, - exists_ok=exists_ok, - ), - runtime_configuration.asyncio_loop, - ) - return cls(async_array) - - @classmethod - def create_v2( - cls, - store: StoreLike, - *, - shape: ChunkCoords, - dtype: np.dtype, - chunks: ChunkCoords, - dimension_separator: Literal[".", "/"] = ".", - fill_value: None | int | float = None, - order: Literal["C", "F"] = "C", - filters: list[dict[str, JSON]] | None = None, - compressor: dict[str, JSON] | None = None, - attributes: dict[str, JSON] | None = None, - exists_ok: bool = False, - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> Array: - async_array = sync( - AsyncArray.create_v2( - store=store, - shape=shape, - dtype=dtype, chunks=chunks, dimension_separator=dimension_separator, - fill_value=fill_value, order=order, - compressor=compressor, filters=filters, - attributes=attributes, + compressor=compressor, runtime_configuration=runtime_configuration, exists_ok=exists_ok, ), diff --git a/tests/v3/test_v2.py b/tests/v3/test_v2.py index bd3ee49ec9..5b831b1bb0 100644 --- a/tests/v3/test_v2.py +++ b/tests/v3/test_v2.py @@ -15,8 +15,9 @@ def store() -> Iterator[Store]: def test_simple(store: Store): data = np.arange(0, 256, dtype="uint16").reshape((16, 16)) - a = Array.create_v2( + a = Array.create( store / "simple_v2", + zarr_format=2, shape=data.shape, chunks=(16, 16), dtype=data.dtype, From bd150844e3d1659f02f654ce17c2168edd47d263 Mon Sep 17 00:00:00 2001 From: Norman Rzepka Date: Mon, 13 May 2024 22:15:03 +0200 Subject: [PATCH 09/11] use zarr.config for batch_size --- src/zarr/codecs/pipeline/hybrid.py | 6 ++---- src/zarr/config.py | 8 +++++++- tests/v3/test_config.py | 6 +++++- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/src/zarr/codecs/pipeline/hybrid.py b/src/zarr/codecs/pipeline/hybrid.py index d19f597c62..d90b83c0dd 100644 --- a/src/zarr/codecs/pipeline/hybrid.py +++ b/src/zarr/codecs/pipeline/hybrid.py @@ -17,8 +17,6 @@ from typing_extensions import Self from zarr.common import ArraySpec, BytesLike, SliceSelection 
-DEFAULT_BATCH_SIZE = 1000
-
 T = TypeVar("T")
 
@@ -32,7 +30,7 @@ def batched(iterable: Iterable[T], n: int) -> Iterable[Tuple[T, ...]]:
 
 @dataclass(frozen=True)
 class HybridCodecPipeline(CodecPipeline):
-    batch_size: int  # TODO: There needs to be a way of specifying this from the user code
+    batch_size: int
     batched_codec_pipeline: BatchedCodecPipeline
 
     @classmethod
@@ -43,7 +41,7 @@ def from_list(cls, codecs: List[Codec], *, batch_size: Optional[int] = None) ->
             array_array_codecs=array_array_codecs,
             array_bytes_codec=array_bytes_codec,
             bytes_bytes_codecs=bytes_bytes_codecs,
-            batch_size=batch_size or DEFAULT_BATCH_SIZE,
+            batch_size=batch_size or config.get("codec_pipeline.batch_size"),
             batched_codec_pipeline=BatchedCodecPipeline(
                 array_array_codecs=array_array_codecs,
                 array_bytes_codec=array_bytes_codec,
diff --git a/src/zarr/config.py b/src/zarr/config.py
index e546cb1c23..b0afe71e87 100644
--- a/src/zarr/config.py
+++ b/src/zarr/config.py
@@ -6,7 +6,13 @@
 
 config = Config(
     "zarr",
-    defaults=[{"array": {"order": "C"}, "async": {"concurrency": None, "timeout": None}}],
+    defaults=[
+        {
+            "array": {"order": "C"},
+            "async": {"concurrency": None, "timeout": None},
+            "codec_pipeline": {"batch_size": 1000},
+        }
+    ],
 )
 
diff --git a/tests/v3/test_config.py b/tests/v3/test_config.py
index 43acdec5fa..8c5a7d6f59 100644
--- a/tests/v3/test_config.py
+++ b/tests/v3/test_config.py
@@ -4,7 +4,11 @@
 def test_config_defaults_set():
     # regression test for available defaults
     assert config.defaults == [
-        {"array": {"order": "C"}, "async": {"concurrency": None, "timeout": None}}
+        {
+            "array": {"order": "C"},
+            "async": {"concurrency": None, "timeout": None},
+            "codec_pipeline": {"batch_size": 1000},
+        }
     ]
     assert config.get("array.order") == "C"

From c0c320e6130698e7b756d6c46d7fcbe8280f5dc7 Mon Sep 17 00:00:00 2001
From: Norman Rzepka
Date: Tue, 14 May 2024 08:24:08 +0200
Subject: [PATCH 10/11] Update __init__.py

Co-authored-by: Joe Hamman
---
 src/zarr/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/zarr/__init__.py b/src/zarr/__init__.py
index d77f939725..0d34ba2381 100644
--- a/src/zarr/__init__.py
+++ b/src/zarr/__init__.py
@@ -17,7 +17,7 @@
 assert not __version__.startswith("0.0.0")
 
 
-async def open_auto_async(store: StoreLike) -> Union[AsyncArray, AsyncGroup]:
+async def open_auto_async(store: StoreLike) -> AsyncArray | AsyncGroup:
     store_path = make_store_path(store)
     try:
         return await AsyncArray.open(store_path)

From fbbec548b842b3a8a635fee31a65136c97f4ef59 Mon Sep 17 00:00:00 2001
From: Norman Rzepka
Date: Tue, 14 May 2024 13:48:04 +0200
Subject: [PATCH 11/11] ruff

---
 src/zarr/__init__.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/src/zarr/__init__.py b/src/zarr/__init__.py
index 0d34ba2381..ed94254afa 100644
--- a/src/zarr/__init__.py
+++ b/src/zarr/__init__.py
@@ -1,7 +1,5 @@
 from __future__ import annotations
 
-from typing import Union
-
 import zarr.codecs  # noqa: F401
 from zarr.array import Array, AsyncArray  # noqa: F401
 from zarr.config import config  # noqa: F401
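
Taken together, patches 08 and 09 make Array.create the single entry point for both zarr formats and route the codec-pipeline batch size through zarr.config. A minimal usage sketch, assuming an in-memory store and the module layout shown in these diffs (the import paths and donfig's set() semantics are assumptions, not part of the patches):

    import numpy as np

    from zarr.array import Array
    from zarr.config import config
    from zarr.store import MemoryStore, StorePath

    store = StorePath(MemoryStore())
    data = np.arange(0, 256, dtype="uint16").reshape((16, 16))

    # v2 array: chunks, dimension_separator, filters and compressor apply
    a2 = Array.create(
        store / "simple_v2",
        zarr_format=2,
        shape=data.shape,
        chunks=(16, 16),
        dtype=data.dtype,
        fill_value=0,
    )
    a2[:, :] = data

    # v3 array: chunk_shape, chunk_key_encoding, codecs and dimension_names apply
    a3 = Array.create(
        store / "simple_v3",
        zarr_format=3,
        shape=data.shape,
        chunk_shape=(16, 16),
        dtype=data.dtype,
        fill_value=0,
    )
    a3[:, :] = data

    # patch 09 reads the batch size from zarr.config at pipeline construction,
    # so it can be overridden (assuming donfig's set(), which backs zarr.config):
    config.set({"codec_pipeline.batch_size": 4})

Mixing arguments across formats, for example passing codecs together with zarr_format=2, raises a ValueError rather than being silently ignored.
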