From 3da3298fd6422aee31376088b8673f9ec22fcb3e Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 16 Dec 2025 23:24:07 +0800 Subject: [PATCH 1/9] feat(blob_v2): add Python API for Blob v2 --- python/python/lance/__init__.py | 6 +- python/python/lance/blob.py | 171 +++++++++++++++++++++++++ python/python/lance/lance/__init__.pyi | 14 ++ python/python/tests/test_blob.py | 35 +++++ python/src/dataset.rs | 14 ++ python/src/dataset/blob.rs | 22 ++++ 6 files changed, 261 insertions(+), 1 deletion(-) diff --git a/python/python/lance/__init__.py b/python/python/lance/__init__.py index aa05c70286d..ff2870b6d1a 100644 --- a/python/python/lance/__init__.py +++ b/python/python/lance/__init__.py @@ -9,7 +9,7 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Union from . import io, log -from .blob import BlobColumn, BlobFile +from .blob import Blob, BlobArray, BlobColumn, BlobFile, blob_array, blob_field from .dataset import ( DataStatistics, FieldStatistics, @@ -51,8 +51,12 @@ __all__ = [ + "Blob", + "BlobArray", "BlobColumn", "BlobFile", + "blob_array", + "blob_field", "DatasetBasePath", "DataStatistics", "FieldStatistics", diff --git a/python/python/lance/blob.py b/python/python/lance/blob.py index cf2c9ef3118..a89c8924062 100644 --- a/python/python/lance/blob.py +++ b/python/python/lance/blob.py @@ -2,6 +2,8 @@ # SPDX-FileCopyrightText: Copyright The Lance Authors import io +from dataclasses import dataclass +from typing import Any from typing import IO, Iterator, Optional, Union import pyarrow as pa @@ -9,6 +11,155 @@ from .lance import LanceBlobFile +@dataclass(frozen=True, slots=True) +class Blob: + """ + A logical blob value for writing Lance blob columns. + + A blob can be represented either by inlined bytes or by an external URI. 
+ """ + + data: Optional[bytes] = None + uri: Optional[str] = None + + def __post_init__(self) -> None: + if self.data is not None and self.uri is not None: + raise ValueError("Blob cannot have both data and uri") + if self.uri == "": + raise ValueError("Blob uri cannot be empty") + + @staticmethod + def bytes(data: Union[bytes, bytearray, memoryview]) -> "Blob": + return Blob(data=bytes(data)) + + @staticmethod + def uri(uri: str) -> "Blob": + if uri == "": + raise ValueError("Blob uri cannot be empty") + return Blob(uri=uri) + + @staticmethod + def empty() -> "Blob": + return Blob(data=b"") + + +class BlobType(pa.ExtensionType): + """ + A PyArrow extension type for Lance blob columns. + + This is the "logical" type users write. Lance will store it in a compact + descriptor format, and reads will return descriptors by default. + """ + + def __init__(self) -> None: + storage_type = pa.struct( + [ + pa.field("data", pa.large_binary(), nullable=True), + pa.field("uri", pa.utf8(), nullable=True), + ] + ) + pa.ExtensionType.__init__(self, storage_type, "lance.blob.v2") + + def __arrow_ext_serialize__(self) -> bytes: + return b"" + + @classmethod + def __arrow_ext_deserialize__( + cls, storage_type: pa.DataType, serialized: bytes + ) -> "BlobType": + return BlobType() + + def __arrow_ext_class__(self): + return BlobArray + + def __reduce__(self): + # Workaround to ensure pickle works in earlier versions of PyArrow + # https://github.com/apache/arrow/issues/35599 + return type(self).__arrow_ext_deserialize__, ( + self.storage_type, + self.__arrow_ext_serialize__(), + ) + + +try: + pa.register_extension_type(BlobType()) +except pa.ArrowKeyError: + # Already registered in this interpreter. + pass + + +class BlobArray(pa.ExtensionArray): + """ + A PyArrow extension array for Lance blob columns. + + Construct with :meth:`from_pylist` or use :func:`blob_array`. 
+ """ + + @classmethod + def from_pylist(cls, values: list[Any]) -> "BlobArray": + data_values: list[Optional[bytes]] = [] + uri_values: list[Optional[str]] = [] + null_mask: list[bool] = [] + + for v in values: + if v is None: + data_values.append(None) + uri_values.append(None) + null_mask.append(True) + continue + + if isinstance(v, Blob): + data_values.append(v.data) + uri_values.append(v.uri) + null_mask.append(False) + continue + + if isinstance(v, str): + if v == "": + raise ValueError("Blob uri cannot be empty") + data_values.append(None) + uri_values.append(v) + null_mask.append(False) + continue + + if isinstance(v, (bytes, bytearray, memoryview)): + data_values.append(bytes(v)) + uri_values.append(None) + null_mask.append(False) + continue + + raise TypeError( + "BlobArray values must be bytes-like, str (URI), Blob, or None; " + f"got {type(v)}" + ) + + data_arr = pa.array(data_values, type=pa.large_binary()) + uri_arr = pa.array(uri_values, type=pa.utf8()) + storage = pa.StructArray.from_arrays( + [data_arr, uri_arr], names=["data", "uri"], mask=null_mask + ) + return pa.ExtensionArray.from_storage(BlobType(), storage) # type: ignore[return-value] + + +def blob_array(values: list[Any]) -> BlobArray: + """ + Construct a blob array from Python values. 
+ + Each value must be one of: + - bytes-like: inline bytes + - str: an external URI + - Blob: explicit inline/uri/empty + - None: null + """ + + return BlobArray.from_pylist(values) + + +def blob_field(name: str, *, nullable: bool = True) -> pa.Field: + """Construct an Arrow field for a Lance blob column.""" + return pa.field(name, BlobType(), nullable=nullable) + + class BlobIterator: def __init__(self, binary_iter: Iterator[pa.BinaryScalar]): self.binary_iter = binary_iter @@ -95,6 +246,26 @@ def size(self) -> int: """ return self.inner.size() + @property + def kind(self) -> str: + """Returns the blob storage kind (inline / packed / dedicated / external).""" + return self.inner.kind() + + @property + def uri(self) -> Optional[str]: + """Returns the blob URI for external blobs.""" + return self.inner.uri() + + @property + def position(self) -> int: + """Returns the byte offset within the backing file (inline/packed).""" + return self.inner.position() + + @property + def data_path(self) -> str: + """Returns the object-store path of the backing file.""" + return self.inner.data_path() + def readall(self) -> bytes: return self.inner.readall() diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi index 6e5e40fc23b..426f0cf173c 100644 --- a/python/python/lance/lance/__init__.pyi +++ b/python/python/lance/lance/__init__.pyi @@ -189,6 +189,10 @@ class LanceBlobFile: def seek(self, offset: int): ... def tell(self) -> int: ... def size(self) -> int: ... + def kind(self) -> str: ... + def uri(self) -> Optional[str]: ... + def position(self) -> int: ... + def data_path(self) -> str: ... def readall(self) -> bytes: ... def read_into(self, b: bytearray) -> int: ... @@ -274,6 +278,16 @@ class _Dataset: row_indices: List[int], blob_column: str, ) -> List[LanceBlobFile]: ... + def take_blobs_by_addresses( + self, + row_addresses: List[int], + blob_column: str, + ) -> List[LanceBlobFile]: ... 
+ def take_blobs_by_indices( + self, + row_indices: List[int], + blob_column: str, + ) -> List[LanceBlobFile]: ... def take_scan( self, row_slices: Iterable[Tuple[int, int]], diff --git a/python/python/tests/test_blob.py b/python/python/tests/test_blob.py index 79ea97414fa..4699e0a7a54 100644 --- a/python/python/tests/test_blob.py +++ b/python/python/tests/test_blob.py @@ -255,3 +255,38 @@ def test_take_deleted_blob(tmp_path, dataset_with_blobs): def test_scan_blob(tmp_path, dataset_with_blobs): ds = dataset_with_blobs.scanner(filter="idx = 2").to_table() assert ds.num_rows == 1 + + +def test_blob_extension_write_inline(tmp_path): + table = pa.table({"blob": lance.blob_array([b"foo", b"bar"])}) + ds = lance.write_dataset( + table, tmp_path / "test_ds_v2", data_storage_version="2.2" + ) + + desc = ds.to_table(columns=["blob"]).column("blob").chunk(0) + assert pa.types.is_struct(desc.type) + assert [f.name for f in desc.type] == ["kind", "position", "size", "blob_id", "blob_uri"] + + blobs = ds.take_blobs("blob", indices=[0, 1]) + assert blobs[0].kind == "inline" + assert blobs[0].uri is None + with blobs[0] as f: + assert f.read() == b"foo" + + +def test_blob_extension_write_external(tmp_path): + blob_path = tmp_path / "external_blob.bin" + blob_path.write_bytes(b"hello") + uri = blob_path.as_uri() + + table = pa.table({"blob": lance.blob_array([uri])}) + ds = lance.write_dataset( + table, tmp_path / "test_ds_v2_external", data_storage_version="2.2" + ) + + blob = ds.take_blobs("blob", indices=[0])[0] + assert blob.kind == "external" + assert blob.uri == uri + assert blob.size() == 5 + with blob as f: + assert f.read() == b"hello" diff --git a/python/src/dataset.rs b/python/src/dataset.rs index c056f222485..334596e7eee 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -1199,6 +1199,20 @@ impl Dataset { Ok(blobs.into_iter().map(LanceBlobFile::from).collect()) } + fn take_blobs_by_addresses( + self_: PyRef<'_, Self>, + row_addresses: Vec, + 
blob_column: &str, + ) -> PyResult> { + let blobs = rt() + .block_on( + Some(self_.py()), + self_.ds.take_blobs_by_addresses(&row_addresses, blob_column), + )? + .infer_error()?; + Ok(blobs.into_iter().map(LanceBlobFile::from).collect()) + } + fn take_blobs_by_indices( self_: PyRef<'_, Self>, row_indices: Vec, diff --git a/python/src/dataset/blob.rs b/python/src/dataset/blob.rs index 12d0f362fea..b0e23b859c1 100644 --- a/python/src/dataset/blob.rs +++ b/python/src/dataset/blob.rs @@ -22,6 +22,7 @@ use pyo3::{ }; use lance::dataset::BlobFile as InnerBlobFile; +use lance_core::datatypes::BlobKind; use crate::{error::PythonErrorExt, rt}; @@ -56,6 +57,27 @@ impl LanceBlobFile { self.inner.size() } + pub fn kind(&self) -> &'static str { + match self.inner.kind() { + BlobKind::Inline => "inline", + BlobKind::Packed => "packed", + BlobKind::Dedicated => "dedicated", + BlobKind::External => "external", + } + } + + pub fn uri(&self) -> Option { + self.inner.uri().map(|v| v.to_string()) + } + + pub fn position(&self) -> u64 { + self.inner.position() + } + + pub fn data_path(&self) -> String { + self.inner.data_path().to_string() + } + pub fn readall<'a>(&'a self, py: Python<'a>) -> PyResult> { let inner = self.inner.clone(); let data = rt().block_on(Some(py), inner.read())?.infer_error()?; From 96d4575a1143142b27e8aae85bb97fc6b768bd8b Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 16 Dec 2025 23:26:12 +0800 Subject: [PATCH 2/9] Cleanup --- python/python/lance/lance/__init__.pyi | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi index 426f0cf173c..76e5834168e 100644 --- a/python/python/lance/lance/__init__.pyi +++ b/python/python/lance/lance/__init__.pyi @@ -278,16 +278,6 @@ class _Dataset: row_indices: List[int], blob_column: str, ) -> List[LanceBlobFile]: ... - def take_blobs_by_addresses( - self, - row_addresses: List[int], - blob_column: str, - ) -> List[LanceBlobFile]: ... 
- def take_blobs_by_indices( - self, - row_indices: List[int], - blob_column: str, - ) -> List[LanceBlobFile]: ... def take_scan( self, row_slices: Iterable[Tuple[int, int]], From 3ae958bd072e283ac29672bcf744a4ce8d25cedd Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 16 Dec 2025 23:38:25 +0800 Subject: [PATCH 3/9] Fix merge error --- python/src/dataset.rs | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 334596e7eee..c056f222485 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -1199,20 +1199,6 @@ impl Dataset { Ok(blobs.into_iter().map(LanceBlobFile::from).collect()) } - fn take_blobs_by_addresses( - self_: PyRef<'_, Self>, - row_addresses: Vec, - blob_column: &str, - ) -> PyResult> { - let blobs = rt() - .block_on( - Some(self_.py()), - self_.ds.take_blobs_by_addresses(&row_addresses, blob_column), - )? - .infer_error()?; - Ok(blobs.into_iter().map(LanceBlobFile::from).collect()) - } - fn take_blobs_by_indices( self_: PyRef<'_, Self>, row_indices: Vec, From 0f33df3898ed61ed89f13b5240bda6fe90d8de4d Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 16 Dec 2025 23:44:08 +0800 Subject: [PATCH 4/9] cleanup not needed APIs --- python/python/lance/blob.py | 27 +++----------------------- python/python/lance/lance/__init__.pyi | 4 ---- python/python/tests/test_blob.py | 16 +++++++-------- python/src/dataset/blob.rs | 22 --------------------- 4 files changed, 11 insertions(+), 58 deletions(-) diff --git a/python/python/lance/blob.py b/python/python/lance/blob.py index a89c8924062..aeca45828df 100644 --- a/python/python/lance/blob.py +++ b/python/python/lance/blob.py @@ -3,8 +3,7 @@ import io from dataclasses import dataclass -from typing import Any -from typing import IO, Iterator, Optional, Union +from typing import IO, Any, Iterator, Optional, Union import pyarrow as pa @@ -29,11 +28,11 @@ def __post_init__(self) -> None: raise ValueError("Blob uri cannot be empty") @staticmethod - def 
bytes(data: Union[bytes, bytearray, memoryview]) -> "Blob": + def from_bytes(data: Union[bytes, bytearray, memoryview]) -> "Blob": return Blob(data=bytes(data)) @staticmethod - def uri(uri: str) -> "Blob": + def from_uri(uri: str) -> "Blob": if uri == "": raise ValueError("Blob uri cannot be empty") return Blob(uri=uri) @@ -246,26 +245,6 @@ def size(self) -> int: """ return self.inner.size() - @property - def kind(self) -> str: - """Returns the blob storage kind (inline / packed / dedicated / external).""" - return self.inner.kind() - - @property - def uri(self) -> Optional[str]: - """Returns the blob URI for external blobs.""" - return self.inner.uri() - - @property - def position(self) -> int: - """Returns the byte offset within the backing file (inline/packed).""" - return self.inner.position() - - @property - def data_path(self) -> str: - """Returns the object-store path of the backing file.""" - return self.inner.data_path() - def readall(self) -> bytes: return self.inner.readall() diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi index 76e5834168e..6e5e40fc23b 100644 --- a/python/python/lance/lance/__init__.pyi +++ b/python/python/lance/lance/__init__.pyi @@ -189,10 +189,6 @@ class LanceBlobFile: def seek(self, offset: int): ... def tell(self) -> int: ... def size(self) -> int: ... - def kind(self) -> str: ... - def uri(self) -> Optional[str]: ... - def position(self) -> int: ... - def data_path(self) -> str: ... def readall(self) -> bytes: ... def read_into(self, b: bytearray) -> int: ... 
diff --git a/python/python/tests/test_blob.py b/python/python/tests/test_blob.py index 4699e0a7a54..8274f18a31e 100644 --- a/python/python/tests/test_blob.py +++ b/python/python/tests/test_blob.py @@ -259,17 +259,19 @@ def test_scan_blob(tmp_path, dataset_with_blobs): def test_blob_extension_write_inline(tmp_path): table = pa.table({"blob": lance.blob_array([b"foo", b"bar"])}) - ds = lance.write_dataset( - table, tmp_path / "test_ds_v2", data_storage_version="2.2" - ) + ds = lance.write_dataset(table, tmp_path / "test_ds_v2", data_storage_version="2.2") desc = ds.to_table(columns=["blob"]).column("blob").chunk(0) assert pa.types.is_struct(desc.type) - assert [f.name for f in desc.type] == ["kind", "position", "size", "blob_id", "blob_uri"] + assert [f.name for f in desc.type] == [ + "kind", + "position", + "size", + "blob_id", + "blob_uri", + ] blobs = ds.take_blobs("blob", indices=[0, 1]) - assert blobs[0].kind == "inline" - assert blobs[0].uri is None with blobs[0] as f: assert f.read() == b"foo" @@ -285,8 +287,6 @@ def test_blob_extension_write_external(tmp_path): ) blob = ds.take_blobs("blob", indices=[0])[0] - assert blob.kind == "external" - assert blob.uri == uri assert blob.size() == 5 with blob as f: assert f.read() == b"hello" diff --git a/python/src/dataset/blob.rs b/python/src/dataset/blob.rs index b0e23b859c1..12d0f362fea 100644 --- a/python/src/dataset/blob.rs +++ b/python/src/dataset/blob.rs @@ -22,7 +22,6 @@ use pyo3::{ }; use lance::dataset::BlobFile as InnerBlobFile; -use lance_core::datatypes::BlobKind; use crate::{error::PythonErrorExt, rt}; @@ -57,27 +56,6 @@ impl LanceBlobFile { self.inner.size() } - pub fn kind(&self) -> &'static str { - match self.inner.kind() { - BlobKind::Inline => "inline", - BlobKind::Packed => "packed", - BlobKind::Dedicated => "dedicated", - BlobKind::External => "external", - } - } - - pub fn uri(&self) -> Option { - self.inner.uri().map(|v| v.to_string()) - } - - pub fn position(&self) -> u64 { - 
self.inner.position() - } - - pub fn data_path(&self) -> String { - self.inner.data_path().to_string() - } - pub fn readall<'a>(&'a self, py: Python<'a>) -> PyResult> { let inner = self.inner.clone(); let data = rt().block_on(Some(py), inner.read())?.infer_error()?; From 3ac0ee6ca6c02244c16ba62395d4d0ed895c3a5e Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 18 Dec 2025 12:01:59 +0800 Subject: [PATCH 5/9] Fix tests --- python/python/lance/blob.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/python/lance/blob.py b/python/python/lance/blob.py index aeca45828df..574b65a0048 100644 --- a/python/python/lance/blob.py +++ b/python/python/lance/blob.py @@ -134,8 +134,9 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray": data_arr = pa.array(data_values, type=pa.large_binary()) uri_arr = pa.array(uri_values, type=pa.utf8()) + mask_arr = pa.array(null_mask, type=pa.bool_()) storage = pa.StructArray.from_arrays( - [data_arr, uri_arr], names=["data", "uri"], mask=null_mask + [data_arr, uri_arr], names=["data", "uri"], mask=mask_arr ) return pa.ExtensionArray.from_storage(BlobType(), storage) # type: ignore[return-value] From e836c2536059189c8415a2ed132192b7921932f5 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 18 Dec 2025 16:17:04 +0800 Subject: [PATCH 6/9] Don't project blob v2 columns --- python/python/tests/test_blob.py | 7 ------- rust/lance-core/src/datatypes/field.rs | 6 ++++++ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/python/python/tests/test_blob.py b/python/python/tests/test_blob.py index 8274f18a31e..48d724fbf15 100644 --- a/python/python/tests/test_blob.py +++ b/python/python/tests/test_blob.py @@ -263,13 +263,6 @@ def test_blob_extension_write_inline(tmp_path): desc = ds.to_table(columns=["blob"]).column("blob").chunk(0) assert pa.types.is_struct(desc.type) - assert [f.name for f in desc.type] == [ - "kind", - "position", - "size", - "blob_id", - "blob_uri", - ] blobs = ds.take_blobs("blob", indices=[0, 
1]) with blobs[0] as f: diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index 7f0bc0327db..46f62f84f93 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -682,6 +682,12 @@ impl Field { Ok(self.clone()) } (DataType::Struct(_), DataType::Struct(_)) => { + // Blob v2 columns are special: they can have different struct layouts + // (logical input vs. descriptor struct). We treat blob v2 structs as opaque + // during schema set operations (union/subtract). + if self.is_blob() { + return Ok(self.clone()); + } let mut fields = vec![]; for other_field in other.children.iter() { let Some(child) = self.child(&other_field.name) else { From b2ed9018430551f475809ab7a2339b95418eb3ea Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 18 Dec 2025 16:49:21 +0800 Subject: [PATCH 7/9] Fix --- python/python/lance/blob.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/python/lance/blob.py b/python/python/lance/blob.py index 574b65a0048..d23e6299f49 100644 --- a/python/python/lance/blob.py +++ b/python/python/lance/blob.py @@ -10,7 +10,7 @@ from .lance import LanceBlobFile -@dataclass(frozen=True, slots=True) +@dataclass(frozen=True) class Blob: """ A logical blob value for writing Lance blob columns. From 5f353d00e8ff8a3b559202bf2453550fe89f3901 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 18 Dec 2025 17:09:19 +0800 Subject: [PATCH 8/9] Ignore tf warnings --- python/pyproject.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/pyproject.toml b/python/pyproject.toml index bffb76c33d7..c020410921d 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -108,6 +108,9 @@ markers = [ filterwarnings = [ 'error::FutureWarning', 'error::DeprecationWarning', + # TensorFlow import can emit NumPy deprecation FutureWarnings in some environments. + # We keep FutureWarnings as errors generally, but ignore this known-noisy import-time warning. 
+    'ignore:.*`np\.object` will be defined as the corresponding NumPy scalar\..*:FutureWarning',
     # Boto3
     'ignore:.*datetime\.datetime\.utcnow\(\) is deprecated.*:DeprecationWarning',
     # Pandas 2.2 on Python 2.12

From 3fd42ead39b9bd5f6c683333f1b712bc14f0acf7 Mon Sep 17 00:00:00 2001
From: Xuanwo
Date: Thu, 18 Dec 2025 22:25:33 +0800
Subject: [PATCH 9/9] Polish comment

---
 rust/lance-core/src/datatypes/field.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs
index 46f62f84f93..156e78c77e9 100644
--- a/rust/lance-core/src/datatypes/field.rs
+++ b/rust/lance-core/src/datatypes/field.rs
@@ -683,8 +683,8 @@ impl Field {
             }
             (DataType::Struct(_), DataType::Struct(_)) => {
                 // Blob v2 columns are special: they can have different struct layouts
-                // (logical input vs. descriptor struct). We treat blob v2 structs as opaque
-                // during schema set operations (union/subtract).
+                // (logical input vs. descriptor struct). We treat blob v2 structs like primitive
+                // fields (e.g. a binary column) during schema set operations (union/subtract).
                 if self.is_blob() {
                     return Ok(self.clone());
                 }