diff --git a/python/pyproject.toml b/python/pyproject.toml index bffb76c33d7..c020410921d 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -108,6 +108,9 @@ markers = [ filterwarnings = [ 'error::FutureWarning', 'error::DeprecationWarning', + # TensorFlow import can emit NumPy deprecation FutureWarnings in some environments. + # We keep FutureWarnings as errors generally, but ignore this known-noisy import-time warning. + 'ignore:.*`np\.object` will be defined as the corresponding NumPy scalar\..*:FutureWarning', # Boto3 'ignore:.*datetime\.datetime\.utcnow\(\) is deprecated.*:DeprecationWarning', # Pandas 2.2 on Python 2.12 diff --git a/python/python/lance/__init__.py b/python/python/lance/__init__.py index aa05c70286d..ff2870b6d1a 100644 --- a/python/python/lance/__init__.py +++ b/python/python/lance/__init__.py @@ -9,7 +9,7 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Union from . import io, log -from .blob import BlobColumn, BlobFile +from .blob import Blob, BlobArray, BlobColumn, BlobFile, blob_array, blob_field from .dataset import ( DataStatistics, FieldStatistics, @@ -51,8 +51,12 @@ __all__ = [ + "Blob", + "BlobArray", "BlobColumn", "BlobFile", + "blob_array", + "blob_field", "DatasetBasePath", "DataStatistics", "FieldStatistics", diff --git a/python/python/lance/blob.py b/python/python/lance/blob.py index cf2c9ef3118..d23e6299f49 100644 --- a/python/python/lance/blob.py +++ b/python/python/lance/blob.py @@ -2,13 +2,164 @@ # SPDX-FileCopyrightText: Copyright The Lance Authors import io -from typing import IO, Iterator, Optional, Union +from dataclasses import dataclass +from typing import IO, Any, Iterator, Optional, Union import pyarrow as pa from .lance import LanceBlobFile +@dataclass(frozen=True) +class Blob: + """ + A logical blob value for writing Lance blob columns. + + A blob can be represented either by inlined bytes or by an external URI.
+ """ + + data: Optional[bytes] = None + uri: Optional[str] = None + + def __post_init__(self) -> None: + if self.data is not None and self.uri is not None: + raise ValueError("Blob cannot have both data and uri") + if self.uri == "": + raise ValueError("Blob uri cannot be empty") + + @staticmethod + def from_bytes(data: Union[bytes, bytearray, memoryview]) -> "Blob": + return Blob(data=bytes(data)) + + @staticmethod + def from_uri(uri: str) -> "Blob": + if uri == "": + raise ValueError("Blob uri cannot be empty") + return Blob(uri=uri) + + @staticmethod + def empty() -> "Blob": + return Blob(data=b"") + + +class BlobType(pa.ExtensionType): + """ + A PyArrow extension type for Lance blob columns. + + This is the "logical" type users write. Lance will store it in a compact + descriptor format, and reads will return descriptors by default. + """ + + def __init__(self) -> None: + storage_type = pa.struct( + [ + pa.field("data", pa.large_binary(), nullable=True), + pa.field("uri", pa.utf8(), nullable=True), + ] + ) + pa.ExtensionType.__init__(self, storage_type, "lance.blob.v2") + + def __arrow_ext_serialize__(self) -> bytes: + return b"" + + @classmethod + def __arrow_ext_deserialize__( + cls, storage_type: pa.DataType, serialized: bytes + ) -> "BlobType": + return BlobType() + + def __arrow_ext_class__(self): + return BlobArray + + def __reduce__(self): + # Workaround to ensure pickle works in earlier versions of PyArrow + # https://github.com/apache/arrow/issues/35599 + return type(self).__arrow_ext_deserialize__, ( + self.storage_type, + self.__arrow_ext_serialize__(), + ) + + +try: + pa.register_extension_type(BlobType()) +except pa.ArrowKeyError: + # Already registered in this interpreter. + pass + + +class BlobArray(pa.ExtensionArray): + """ + A PyArrow extension array for Lance blob columns. + + Construct with :meth:`from_pylist` or use :func:`blob_array`. 
+ """ + + @classmethod + def from_pylist(cls, values: list[Any]) -> "BlobArray": + data_values: list[Optional[bytes]] = [] + uri_values: list[Optional[str]] = [] + null_mask: list[bool] = [] + + for v in values: + if v is None: + data_values.append(None) + uri_values.append(None) + null_mask.append(True) + continue + + if isinstance(v, Blob): + data_values.append(v.data) + uri_values.append(v.uri) + null_mask.append(False) + continue + + if isinstance(v, str): + if v == "": + raise ValueError("Blob uri cannot be empty") + data_values.append(None) + uri_values.append(v) + null_mask.append(False) + continue + + if isinstance(v, (bytes, bytearray, memoryview)): + data_values.append(bytes(v)) + uri_values.append(None) + null_mask.append(False) + continue + + raise TypeError( + "BlobArray values must be bytes-like, str (URI), Blob, or None; " + f"got {type(v)}" + ) + + data_arr = pa.array(data_values, type=pa.large_binary()) + uri_arr = pa.array(uri_values, type=pa.utf8()) + mask_arr = pa.array(null_mask, type=pa.bool_()) + storage = pa.StructArray.from_arrays( + [data_arr, uri_arr], names=["data", "uri"], mask=mask_arr + ) + return pa.ExtensionArray.from_storage(BlobType(), storage) # type: ignore[return-value] + + +def blob_array(values: list[Any]) -> BlobArray: + """ + Construct a blob array from Python values. 
+ + Each value must be one of: + - bytes-like: inline bytes + - str: an external URI + - Blob: explicit inline/uri/empty + - None: null + """ + + return BlobArray.from_pylist(values) + + +def blob_field(name: str, *, nullable: bool = True) -> pa.Field: + """Construct an Arrow field for a Lance blob column.""" + return pa.field(name, BlobType(), nullable=nullable) + + class BlobIterator: def __init__(self, binary_iter: Iterator[pa.BinaryScalar]): self.binary_iter = binary_iter diff --git a/python/python/tests/test_blob.py b/python/python/tests/test_blob.py index 79ea97414fa..48d724fbf15 100644 --- a/python/python/tests/test_blob.py +++ b/python/python/tests/test_blob.py @@ -255,3 +255,31 @@ def test_take_deleted_blob(tmp_path, dataset_with_blobs): def test_scan_blob(tmp_path, dataset_with_blobs): ds = dataset_with_blobs.scanner(filter="idx = 2").to_table() assert ds.num_rows == 1 + + +def test_blob_extension_write_inline(tmp_path): + table = pa.table({"blob": lance.blob_array([b"foo", b"bar"])}) + ds = lance.write_dataset(table, tmp_path / "test_ds_v2", data_storage_version="2.2") + + desc = ds.to_table(columns=["blob"]).column("blob").chunk(0) + assert pa.types.is_struct(desc.type) + + blobs = ds.take_blobs("blob", indices=[0, 1]) + with blobs[0] as f: + assert f.read() == b"foo" + + +def test_blob_extension_write_external(tmp_path): + blob_path = tmp_path / "external_blob.bin" + blob_path.write_bytes(b"hello") + uri = blob_path.as_uri() + + table = pa.table({"blob": lance.blob_array([uri])}) + ds = lance.write_dataset( + table, tmp_path / "test_ds_v2_external", data_storage_version="2.2" + ) + + blob = ds.take_blobs("blob", indices=[0])[0] + assert blob.size() == 5 + with blob as f: + assert f.read() == b"hello" diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index 7f0bc0327db..156e78c77e9 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -682,6 +682,12 @@ impl Field { 
Ok(self.clone()) } (DataType::Struct(_), DataType::Struct(_)) => { + // Blob v2 columns are special: they can have different struct layouts + // (logical input vs. descriptor struct). We treat blob v2 structs like primitive + // fields (e.g. a binary column) during schema set operations (union/subtract). + if self.is_blob() { + return Ok(self.clone()); + } let mut fields = vec![]; for other_field in other.children.iter() { let Some(child) = self.child(&other_field.name) else {