From 681e72236c855977a46d37198ecbb76e1ce181e7 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 15 Dec 2025 20:35:08 +0800 Subject: [PATCH 1/3] refactor: expose take_blobs_by_addresses to python --- python/python/lance/dataset.py | 5 ++- python/python/lance/lance/__init__.pyi | 10 ++++++ python/python/tests/test_blob.py | 43 ++++++++++++++++++++++++++ python/src/dataset.rs | 20 ++++++++++-- rust/lance/src/dataset.rs | 2 +- 5 files changed, 74 insertions(+), 6 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 87bbfefe422..d4d60cf511b 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -1640,12 +1640,11 @@ def take_blobs( if ids is not None: lance_blob_files = self._ds.take_blobs(ids, blob_column) elif addresses is not None: - # ROW ids and Row address are the same until stable ROW ID is implemented. - lance_blob_files = self._ds.take_blobs(addresses, blob_column) + lance_blob_files = self._ds.take_blobs_by_addresses(addresses, blob_column) elif indices is not None: lance_blob_files = self._ds.take_blobs_by_indices(indices, blob_column) else: - raise ValueError("Either ids or indices must be specified") + raise ValueError("Either ids, addresses, or indices must be specified") return [BlobFile(lance_blob_file) for lance_blob_file in lance_blob_files] def head(self, num_rows, **kwargs): diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi index f0cf1243d61..6e5e40fc23b 100644 --- a/python/python/lance/lance/__init__.pyi +++ b/python/python/lance/lance/__init__.pyi @@ -260,6 +260,16 @@ class _Dataset: columns_with_transform: Optional[List[Tuple[str, str]]] = None, ) -> pa.RecordBatch: ... def take_blobs( + self, + row_ids: List[int], + blob_column: str, + ) -> List[LanceBlobFile]: ... + def take_blobs_by_addresses( + self, + row_addresses: List[int], + blob_column: str, + ) -> List[LanceBlobFile]: ... + def take_blobs_by_indices( self, row_indices: List[int], blob_column: str, diff --git a/python/python/tests/test_blob.py b/python/python/tests/test_blob.py index 54c53485329..c86e271e6d8 100644 --- a/python/python/tests/test_blob.py +++ b/python/python/tests/test_blob.py @@ -110,6 +110,49 @@ def test_blob_files_by_address(dataset_with_blobs): assert f.read() == expected +def test_blob_files_by_address_with_stable_row_ids(tmp_path): + table = pa.table( + { + "blobs": pa.array([b"foo"], pa.large_binary()), + "idx": pa.array([0], pa.uint64()), + }, + schema=pa.schema( + [ + pa.field( + "blobs", pa.large_binary(), metadata={"lance-encoding:blob": "true"} + ), + pa.field("idx", pa.uint64()), + ] + ), + ) + ds = lance.write_dataset( + table, + tmp_path / "test_ds", + enable_stable_row_ids=True, + ) + + ds.insert( + pa.table( + { + "blobs": pa.array([b"bar"], pa.large_binary()), + "idx": pa.array([1], pa.uint64()), + }, + schema=table.schema, + ) + ) + + t = ds.to_table(columns=["idx"], with_row_address=True) + # Pick a row from the second fragment. With stable row ids enabled, the row address + # is not a valid row id, so calling the row-id based API would fail. + row_idx = t.column("idx").to_pylist().index(1) + addr = t.column("_rowaddr").to_pylist()[row_idx] + + blobs = ds.take_blobs("blobs", addresses=[addr]) + assert len(blobs) == 1 + with blobs[0] as f: + assert f.read() == b"bar" + + def test_blob_by_indices(tmp_path, dataset_with_blobs): indices = [0, 4] blobs = dataset_with_blobs.take_blobs("blobs", indices=indices) diff --git a/python/src/dataset.rs b/python/src/dataset.rs index ade2b4516ca..5daaf58e82f 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -1174,13 +1174,29 @@ impl Dataset { fn take_blobs( self_: PyRef<'_, Self>, - row_indices: Vec, + row_ids: Vec, + blob_column: &str, + ) -> PyResult> { + let blobs = rt() + .block_on( + Some(self_.py()), + self_.ds.take_blobs(&row_ids, blob_column), + )? + .infer_error()?; + Ok(blobs.into_iter().map(LanceBlobFile::from).collect()) + } + + fn take_blobs_by_addresses( + self_: PyRef<'_, Self>, + row_addresses: Vec, blob_column: &str, ) -> PyResult> { let blobs = rt() .block_on( Some(self_.py()), - self_.ds.take_blobs(&row_indices, blob_column), + self_ + .ds + .take_blobs_by_addresses(&row_addresses, blob_column), )? .infer_error()?; Ok(blobs.into_iter().map(LanceBlobFile::from).collect()) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 1079a72d600..547dfaf8b9e 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1484,7 +1484,7 @@ impl Dataset { TakeBuilder::try_new_from_ids(self.clone(), row_ids.to_vec(), projection.into()) } - /// Take [BlobFile] by row ids (row address). + /// Take [BlobFile] by row IDs. pub async fn take_blobs( self: &Arc, row_ids: &[u64], From 5e01f400c7b7a5527ed6ae10dab67c2f15207bbb Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 16 Dec 2025 16:37:46 +0800 Subject: [PATCH 2/3] Format code --- python/src/dataset.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 5daaf58e82f..c056f222485 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -1178,10 +1178,7 @@ impl Dataset { blob_column: &str, ) -> PyResult> { let blobs = rt() - .block_on( - Some(self_.py()), - self_.ds.take_blobs(&row_ids, blob_column), - )? + .block_on(Some(self_.py()), self_.ds.take_blobs(&row_ids, blob_column))? .infer_error()?; Ok(blobs.into_iter().map(LanceBlobFile::from).collect()) } From 8aa02386abc75acbf6c6dbfad20b950ee286c182 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 16 Dec 2025 17:38:06 +0800 Subject: [PATCH 3/3] Remove not needed comments statement --- python/python/tests/test_blob.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/python/tests/test_blob.py b/python/python/tests/test_blob.py index c86e271e6d8..79ea97414fa 100644 --- a/python/python/tests/test_blob.py +++ b/python/python/tests/test_blob.py @@ -142,8 +142,6 @@ def test_blob_files_by_address_with_stable_row_ids(tmp_path): ) t = ds.to_table(columns=["idx"], with_row_address=True) - # Pick a row from the second fragment. With stable row ids enabled, the row address - # is not a valid row id, so calling the row-id based API would fail. row_idx = t.column("idx").to_pylist().index(1) addr = t.column("_rowaddr").to_pylist()[row_idx]