From 9e01526587a9efbf1b61b5f209b1da1c236eb78e Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 26 Jan 2026 18:46:27 +0800 Subject: [PATCH] feat: add blob handling support for fragment (#5801) This PR will add blob handling support for fragment --- **Parts of this PR were drafted with assistance from Codex (with `gpt-5.2`) and fully reviewed and edited by me. I take full responsibility for all changes.** --- python/python/lance/fragment.py | 12 ++++++++++++ python/python/lance/lance/__init__.pyi | 20 +++++++++++--------- python/python/tests/test_blob.py | 24 ++++++++++++++++++++++++ python/src/fragment.rs | 22 +++++++++++++++++++++- 4 files changed, 68 insertions(+), 10 deletions(-) diff --git a/python/python/lance/fragment.py b/python/python/lance/fragment.py index 78a199c09a6..f8cff450f06 100644 --- a/python/python/lance/fragment.py +++ b/python/python/lance/fragment.py @@ -448,6 +448,9 @@ def scanner( with_row_id: bool = False, with_row_address: bool = False, batch_readahead: int = 16, + blob_handling: Optional[ + Literal["all_binary", "blobs_descriptions", "all_descriptions"] + ] = None, order_by: Optional[List[ColumnOrdering]] = None, ) -> "LanceScanner": """See Dataset::scanner for details""" @@ -468,6 +471,7 @@ def scanner( with_row_id=with_row_id, with_row_address=with_row_address, batch_readahead=batch_readahead, + blob_handling=blob_handling, order_by=order_by, **columns_arg, ) @@ -515,6 +519,9 @@ def to_batches( with_row_id: bool = False, with_row_address: bool = False, batch_readahead: int = 16, + blob_handling: Optional[ + Literal["all_binary", "blobs_descriptions", "all_descriptions"] + ] = None, order_by: Optional[List[ColumnOrdering]] = None, ) -> Iterator[pa.RecordBatch]: return self.scanner( @@ -526,6 +533,7 @@ def to_batches( with_row_id=with_row_id, with_row_address=with_row_address, batch_readahead=batch_readahead, + blob_handling=blob_handling, order_by=order_by, ).to_batches() @@ -537,6 +545,9 @@ def to_table( offset: Optional[int] = None, with_row_id: bool = False, with_row_address: bool = False, + blob_handling: Optional[ + Literal["all_binary", "blobs_descriptions", "all_descriptions"] + ] = None, order_by: Optional[List[ColumnOrdering]] = None, ) -> pa.Table: return self.scanner( @@ -546,6 +557,7 @@ def to_table( offset=offset, with_row_id=with_row_id, with_row_address=with_row_address, + blob_handling=blob_handling, order_by=order_by, ).to_table() diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi index 1573bd6dcff..d976bea8cf4 100644 --- a/python/python/lance/lance/__init__.pyi +++ b/python/python/lance/lance/__init__.pyi @@ -443,15 +443,17 @@ class _Fragment: ) -> pa.RecordBatch: ... def scanner( self, - columns: Optional[List[str]], - columns_with_transform: Optional[List[Tuple[str, str]]], - batch_size: Optional[int], - filter: Optional[str], - limit: Optional[int], - offset: Optional[int], - with_row_id: Optional[bool], - batch_readahead: Optional[int], - **kwargs, + columns: Optional[List[str]] = None, + columns_with_transform: Optional[List[Tuple[str, str]]] = None, + batch_size: Optional[int] = None, + filter: Optional[str] = None, + limit: Optional[int] = None, + offset: Optional[int] = None, + with_row_id: Optional[bool] = None, + with_row_address: Optional[bool] = None, + batch_readahead: Optional[int] = None, + blob_handling: Optional[str] = None, + order_by: Optional[List[Any]] = None, ) -> _Scanner: ... def add_columns_from_reader( self, diff --git a/python/python/tests/test_blob.py b/python/python/tests/test_blob.py index 8a003bfab04..0a681e01c36 100644 --- a/python/python/tests/test_blob.py +++ b/python/python/tests/test_blob.py @@ -69,6 +69,30 @@ def test_scan_blob_as_binary(tmp_path): assert tbl.column("blobs").to_pylist() == values +def test_fragment_scan_blob_as_binary(tmp_path): + values = [b"foo", b"bar", b"baz"] + arr = pa.array(values, pa.large_binary()) + table = pa.table( + [arr], + schema=pa.schema( + [ + pa.field( + "blobs", pa.large_binary(), metadata={"lance-encoding:blob": "true"} + ) + ] + ), + ) + ds = lance.write_dataset(table, tmp_path / "test_ds") + + fragment = ds.get_fragments()[0] + + tbl = fragment.scanner(columns=["blobs"], blob_handling="all_binary").to_table() + assert tbl.column("blobs").to_pylist() == values + + tbl = fragment.to_table(columns=["blobs"], blob_handling="all_binary") + assert tbl.column("blobs").to_pylist() == values + + @pytest.fixture def dataset_with_blobs(tmp_path): values = pa.array([b"foo", b"bar", b"baz"], pa.large_binary()) diff --git a/python/src/fragment.rs b/python/src/fragment.rs index 6c31d5e87a6..9bfdb14beb8 100644 --- a/python/src/fragment.rs +++ b/python/src/fragment.rs @@ -24,6 +24,7 @@ use lance::dataset::scanner::ColumnOrdering; use lance::dataset::transaction::{Operation, Transaction}; use lance::dataset::{InsertBuilder, NewColumnTransform}; use lance::Error; +use lance_core::datatypes::BlobHandling; use lance_io::utils::CachedFileSize; use lance_table::format::{ DataFile, DeletionFile, DeletionFileType, Fragment, RowDatasetVersionMeta, RowIdMeta, @@ -196,7 +197,7 @@ impl FileFragment { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature=(columns=None, columns_with_transform=None, batch_size=None, filter=None, limit=None, offset=None, with_row_id=None, with_row_address=None, batch_readahead=None, order_by=None))] + #[pyo3(signature=(columns=None, columns_with_transform=None, batch_size=None, filter=None, limit=None, offset=None, with_row_id=None, with_row_address=None, batch_readahead=None, blob_handling=None, order_by=None))] fn scanner( self_: PyRef<'_, Self>, columns: Option>, @@ -208,6 +209,7 @@ impl FileFragment { with_row_id: Option, with_row_address: Option, batch_readahead: Option, + blob_handling: Option>, order_by: Option>>, ) -> PyResult { let mut scanner = self_.fragment.scan(); @@ -253,6 +255,24 @@ impl FileFragment { if let Some(batch_readahead) = batch_readahead { scanner.batch_readahead(batch_readahead); } + if let Some(blob_handling) = blob_handling { + let handling = if let Ok(handling) = blob_handling.extract::() { + match handling.as_str() { + "all_binary" => BlobHandling::AllBinary, + "blobs_descriptions" => BlobHandling::BlobsDescriptions, + "all_descriptions" => BlobHandling::AllDescriptions, + other => { + return Err(PyValueError::new_err(format!( + "Invalid blob_handling: {other}. Expected one of: all_binary, blobs_descriptions, all_descriptions" + ))) + } + } + } else { + return Err(PyTypeError::new_err("blob_handling must be a str")); + }; + + scanner.blob_handling(handling); + } if let Some(orderings) = order_by { let col_orderings = Some(orderings.into_iter().map(|co| co.0).collect()); scanner