Skip to content
Closed
250 changes: 242 additions & 8 deletions zarr/_storage/store.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import sys
from collections.abc import MutableMapping
from string import ascii_letters, digits
from typing import Any, List, Optional, Union

from zarr.meta import Metadata2
from zarr.meta import Metadata2, Metadata3, _default_entry_point_metadata_v3
from zarr.util import normalize_storage_path

# v2 store keys
Expand Down Expand Up @@ -131,6 +133,169 @@ def rmdir(self, path: str = "") -> None:
_rmdir_from_keys(self, path)


class StoreV3(BaseStore):
_store_version = 3
_metadata_class = Metadata3

@staticmethod
def _valid_key(key: str) -> bool:
"""
Verify that a key conforms to the specification.

A key is any string containing only character in the range a-z, A-Z,
0-9, or in the set /.-_ it will return True if that's the case, False
otherwise.

In addition, in spec v3, keys can only start with the prefix meta/,
data/ or be exactly zarr.json and should not end with /. This should
not be exposed to the user, and is a store implementation detail, so
this method will raise a ValueError in that case.
"""
if sys.version_info > (3, 7):
if not key.isascii():
return False
if set(key) - set(ascii_letters + digits + "/.-_"):
return False

if (
not key.startswith("data/")
and (not key.startswith("meta/"))
and (not key == "zarr.json")
):
raise ValueError("keys starts with unexpected value: `{}`".format(key))

if key.endswith('/'):
raise ValueError("keys may not end in /")

return True

def list_prefix(self, prefix):
if prefix.startswith('/'):
raise ValueError("prefix must not begin with /")
# TODO: force prefix to end with /?
return [k for k in self.list() if k.startswith(prefix)]

def erase(self, key):
self.__delitem__(key)

def erase_prefix(self, prefix):
assert prefix.endswith("/")

if prefix == "/":
all_keys = self.list()
else:
all_keys = self.list_prefix(prefix)
for key in all_keys:
self.erase(key)

def list_dir(self, prefix):
"""
Note: carefully test this with trailing/leading slashes
"""
if prefix: # allow prefix = "" ?
assert prefix.endswith("/")

all_keys = self.list_prefix(prefix)
len_prefix = len(prefix)
keys = []
prefixes = []
for k in all_keys:
trail = k[len_prefix:]
if "/" not in trail:
keys.append(prefix + trail)
else:
prefixes.append(prefix + trail.split("/", maxsplit=1)[0] + "/")
return keys, list(set(prefixes))

def list(self):
if hasattr(self, 'keys'):
return list(self.keys())
raise NotImplementedError(
"The list method has not been implemented for this store type."
)

# TODO: Remove listdir? This method is just to match the current V2 stores
# The v3 spec mentions: list, list_dir, list_prefix
def listdir(self, path: str = ""):
if path and not path.endswith("/"):
path = path + "/"
keys, prefixes = self.list_dir(path)
prefixes = [p[len(path):].rstrip("/") for p in prefixes]
keys = [k[len(path):] for k in keys]
return keys + prefixes

# TODO: rmdir here is identical to the rmdir on Store so could potentially
# move to BaseStore instead.
def rmdir(self, path: str = "") -> None:
if not self.is_erasable():
raise NotImplementedError(
f'{type(self)} is not erasable, cannot call "rmdir"'
) # pragma: no cover
path = normalize_storage_path(path)
_rmdir_from_keys(self, path)

def __contains__(self, key):
# TODO: re-enable this check?
# if not key.startswith(("meta/", "data/")):
# raise ValueError(
# f'Key must start with either "meta/" or "data/". '
# f'Got {key}'
# )
return key in self.list()

def clear(self):
"""Remove all items from store."""
self.erase_prefix("/")

def __eq__(self, other):
from zarr.storage import KVStoreV3 # avoid circular import
if isinstance(other, KVStoreV3):
return self._mutable_mapping == other._mutable_mapping
else:
return NotImplemented

@staticmethod
def _ensure_store(store):
"""
We want to make sure internally that zarr stores are always a class
with a specific interface derived from ``Store``, which is slightly
different than ``MutableMapping``.

We'll do this conversion in a few places automatically
"""
from zarr.storage import KVStoreV3 # avoid circular import
if store is None:
return None
elif isinstance(store, StoreV3):
return store
elif isinstance(store, MutableMapping):
return KVStoreV3(store)
else:
for attr in [
"keys",
"values",
"get",
"__setitem__",
"__getitem__",
"__delitem__",
"__contains__",
]:
if not hasattr(store, attr):
break
else:
return KVStoreV3(store)

raise ValueError(
"v3 stores must be subclasses of StoreV3, "
"if your store exposes the MutableMapping interface wrap it in "
f"Zarr.storage.KVStoreV3. Got {store}"
)


# allow MutableMapping for backwards compatibility
StoreLike = Union[BaseStore, MutableMapping]


def _path_to_prefix(path: Optional[str]) -> str:
# assume path already normalized
if path:
Expand All @@ -140,17 +305,49 @@ def _path_to_prefix(path: Optional[str]) -> str:
return prefix


# TODO: Should this return default metadata or raise an Error if zarr.json
# is absent?
def _get_hierarchy_metadata(store=None):
meta = _default_entry_point_metadata_v3
if store is not None:
version = getattr(store, '_store_version', 2)
if version < 3:
raise ValueError("zarr.json hierarchy metadata not stored for "
f"zarr v{version} stores")
if 'zarr.json' in store:
meta = store._metadata_class.decode_hierarchy_metadata(store['zarr.json'])
return meta


def _rename_from_keys(store: BaseStore, src_path: str, dst_path: str) -> None:
# assume path already normalized
src_prefix = _path_to_prefix(src_path)
dst_prefix = _path_to_prefix(dst_path)
for key in list(store.keys()):
if key.startswith(src_prefix):
new_key = dst_prefix + key.lstrip(src_prefix)
store[new_key] = store.pop(key)


def _rmdir_from_keys(store: Union[BaseStore, MutableMapping], path: Optional[str] = None) -> None:
version = getattr(store, '_store_version', 2)
if version == 2:
root_prefixes = ['']
elif version == 3:
root_prefixes = ['meta/root/', 'data/root/']
for root_prefix in root_prefixes:
_src_prefix = root_prefix + src_prefix
_dst_prefix = root_prefix + dst_prefix
for key in list(store.keys()):
if key.startswith(_src_prefix):
new_key = _dst_prefix + key.lstrip(_src_prefix)
store[new_key] = store.pop(key)
if version == 3:
sfx = _get_hierarchy_metadata(store)['metadata_key_suffix']
_src_array_json = 'meta/root/' + src_prefix[:-1] + '.array' + sfx
if _src_array_json in store:
new_key = 'meta/root/' + dst_prefix[:-1] + '.array' + sfx
store[new_key] = store.pop(_src_array_json)
_src_group_json = 'meta/root/' + src_prefix[:-1] + '.group' + sfx
if _src_group_json in store:
new_key = 'meta/root/' + dst_prefix[:-1] + '.group' + sfx
store[new_key] = store.pop(_src_group_json)


def _rmdir_from_keys(store: StoreLike, path: Optional[str] = None) -> None:
# assume path already normalized
prefix = _path_to_prefix(path)
for key in list(store.keys()):
Expand All @@ -168,3 +365,40 @@ def _listdir_from_keys(store: BaseStore, path: Optional[str] = None) -> List[str
child = suffix.split('/')[0]
children.add(child)
return sorted(children)


def _prefix_to_array_key(store: StoreLike, prefix: str) -> str:
if getattr(store, "_store_version", 2) == 3:
if prefix:
sfx = _get_hierarchy_metadata(store)['metadata_key_suffix']
key = "meta/root/" + prefix.rstrip("/") + ".array" + sfx
else:
raise ValueError("prefix must be supplied to get a v3 array key")
else:
key = prefix + array_meta_key
return key


def _prefix_to_group_key(store: StoreLike, prefix: str) -> str:
if getattr(store, "_store_version", 2) == 3:
if prefix:
sfx = _get_hierarchy_metadata(store)['metadata_key_suffix']
key = "meta/root/" + prefix.rstrip('/') + ".group" + sfx
else:
raise ValueError("prefix must be supplied to get a v3 group key")
else:
key = prefix + group_meta_key
return key


def _prefix_to_attrs_key(store: StoreLike, prefix: str) -> str:
if getattr(store, "_store_version", 2) == 3:
# for v3, attributes are stored in the array metadata
sfx = _get_hierarchy_metadata(store)['metadata_key_suffix']
if prefix:
key = "meta/root/" + prefix.rstrip('/') + ".array" + sfx
else:
raise ValueError("prefix must be supplied to get a v3 array key")
else:
key = prefix + attrs_key
return key
Loading