From 5f71f9c8357a3ebff17192311a3fcdbfb662b083 Mon Sep 17 00:00:00 2001 From: "zhengzhisheng@bilibili.com" Date: Sun, 9 Nov 2025 19:53:28 +0800 Subject: [PATCH 1/3] support to compact_files with blob --- Cargo.lock | 1 + rust/lance/Cargo.toml | 1 + rust/lance/src/dataset.rs | 1 + rust/lance/src/dataset/blob_stream.rs | 221 +++++++++++++++++++++ rust/lance/src/dataset/index/frag_reuse.rs | 44 +++- rust/lance/src/dataset/optimize.rs | 6 + 6 files changed, 270 insertions(+), 4 deletions(-) create mode 100644 rust/lance/src/dataset/blob_stream.rs diff --git a/Cargo.lock b/Cargo.lock index d8fafbe18d5..d13af6cffcd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4339,6 +4339,7 @@ dependencies = [ "criterion", "dashmap", "datafusion", + "datafusion-common", "datafusion-expr", "datafusion-functions", "datafusion-physical-expr", diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index 44093e2f6fa..7dad47c0f6a 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -61,6 +61,7 @@ uuid.workspace = true arrow.workspace = true # TODO: use datafusion sub-modules to reduce build size? datafusion.workspace = true +datafusion-common.workspace = true datafusion-functions.workspace = true datafusion-physical-expr.workspace = true datafusion-physical-plan.workspace = true diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index b9e56b0d9c4..233c59426e6 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -82,6 +82,7 @@ pub mod udtf; pub mod updater; mod utils; mod write; +mod blob_stream; use self::builder::DatasetBuilder; use self::cleanup::RemovalStats; diff --git a/rust/lance/src/dataset/blob_stream.rs b/rust/lance/src/dataset/blob_stream.rs new file mode 100644 index 00000000000..a64c914cfa1 --- /dev/null +++ b/rust/lance/src/dataset/blob_stream.rs @@ -0,0 +1,221 @@ +// rust/lance/src/dataset/blob_stream.rs + +use arrow_array::{ + ArrayRef, RecordBatch, StructArray, UInt64Array, +}; +use arrow_array::builder::LargeBinaryBuilder; +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use datafusion_common::{DataFusionError, Result as DFResult}; +use datafusion_physical_plan::{RecordBatchStream, SendableRecordBatchStream}; +use futures::{Stream, StreamExt}; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::fs::File; +use std::io::{Read, Seek, SeekFrom}; +use std::path::PathBuf; +use std::collections::HashSet; + +use crate::dataset::Dataset; +use lance_core::utils::address::RowAddress; + +// 🔧 注意:理想情况下应通过 field name 动态获取 ID,此处暂保留硬编码 +const BLOB_STORAGE_FIELD_ID: u32 = 1; + +pub fn wrap_blob_stream_if_needed( + inner: SendableRecordBatchStream, + dataset: Arc, +) -> SendableRecordBatchStream { + let blob_fields: Vec<&lance_core::datatypes::Field> = dataset + .schema() + .fields + .iter() + .filter(|field| field.is_blob()) + .collect(); + + if blob_fields.is_empty() { + // 🟢 No blob fields → return original stream + inner + } else { + Box::pin(ResolvedBlobStream::new(inner, dataset)) + } +} + +/// Resolve blob column by reading from disk based on position/size metadata. +pub fn resolve_blob_column( + batch: &RecordBatch, + dataset: &Dataset, + blob_field_name: &str, + row_id_col: &UInt64Array, +) -> DFResult { + let blob_struct = batch + .column_by_name(blob_field_name) + .ok_or_else(|| DataFusionError::Execution("blob column missing".to_string()))? + .as_any() + .downcast_ref::() + .ok_or_else(|| DataFusionError::Execution("blob is not a struct".to_string()))?; + + let position_array = blob_struct + .column_by_name("position") + .ok_or_else(|| DataFusionError::Execution("position field missing".to_string()))? + .as_any() + .downcast_ref::() + .ok_or_else(|| DataFusionError::Execution("position is not UInt64".to_string()))?; + + let size_array = blob_struct + .column_by_name("size") + .ok_or_else(|| DataFusionError::Execution("size field missing".to_string()))? + .as_any() + .downcast_ref::() + .ok_or_else(|| DataFusionError::Execution("size is not UInt64".to_string()))?; + + let mut blobs = Vec::with_capacity(batch.num_rows()); + + for i in 0..batch.num_rows() { + let row_id = row_id_col.value(i); + let position = position_array.value(i); + let size = size_array.value(i) as usize; + + let frag_id = RowAddress::from(row_id).fragment_id(); + let frag = dataset + .get_fragment(frag_id as usize) + .ok_or_else(|| DataFusionError::Execution("fragment not found".to_string()))?; + + let data_file = frag + .data_file_for_field(BLOB_STORAGE_FIELD_ID) + .ok_or_else(|| DataFusionError::Execution("blob data file not found".to_string()))?; + + let path_str = dataset.data_dir().child(data_file.path.as_str()).to_string(); + let local_path = PathBuf::from("/".to_owned() + &path_str); + + let mut file = File::open(&local_path) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + file.seek(SeekFrom::Start(position)) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + let mut buffer = vec![0; size]; + file.read_exact(&mut buffer) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + blobs.push(buffer); + } + + let mut builder = LargeBinaryBuilder::with_capacity(blobs.len(), blobs.iter().map(Vec::len).sum()); + for blob in blobs { + builder.append_value(blob); + } + let binary_array = builder.finish(); + + Ok(Arc::new(binary_array)) +} + +/// A stream that resolves blob columns from on-disk storage. +pub struct ResolvedBlobStream { + inner: SendableRecordBatchStream, + dataset: Arc, + output_schema: SchemaRef, + blob_field_names: Vec, // 支持多个 +} + +impl ResolvedBlobStream { + /// Create a new [`ResolvedBlobStream`] that automatically resolves the **first** blob column + /// found in the dataset schema. + /// + /// # Panics + /// - If no blob field is found in the dataset schema + /// - If more than one blob field is found (currently unsupported) + /// - If the input stream does not contain the blob field + pub fn new( + inner: SendableRecordBatchStream, + dataset: Arc, + ) -> Self { + // 🔍 自动查找所有 blob 字段 + let blob_fields: Vec<&lance_core::datatypes::Field> = dataset + .schema() + .fields + .iter() + .filter(|field| field.is_blob()) + .collect(); + + let blob_field_names: Vec = blob_fields.iter().map(|f| f.name.clone()).collect(); + + let input_schema = inner.schema(); + // 检查所有 blob 字段是否都在输入流中 + for name in &blob_field_names { + if input_schema.column_with_name(name).is_none() { + panic!("Input schema missing blob field: {}", name); + } + } + // 构建输出 schema:将每个 blob struct 替换为 LargeBinary + let blob_set: HashSet<&String> = blob_field_names.iter().collect(); + let fields: Vec = input_schema + .fields() + .iter() + .map(|f| { + if blob_set.contains(f.name()) { + Field::new(f.name(), DataType::LargeBinary, f.is_nullable()) + } else { + f.as_ref().clone() + } + }) + .collect(); + let output_schema = Arc::new(Schema::new(fields)); + + Self { + inner, + dataset, + output_schema, + blob_field_names, + } + } +} + +impl Stream for ResolvedBlobStream { + type Item = DFResult; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.inner.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(batch))) => { + let row_ids = batch + .column_by_name("_rowid") + .ok_or_else(|| DataFusionError::Execution("must have _rowid".to_string()))? + .as_any() + .downcast_ref::() + .ok_or_else(|| DataFusionError::Execution("_rowid is not UInt64".to_string()))?; + + // Step 1: 预先解析所有 blob 列 + let mut resolved_blobs = std::collections::HashMap::new(); + for name in &self.blob_field_names { + let resolved = resolve_blob_column(&batch, &self.dataset, name, row_ids)?; + resolved_blobs.insert(name.as_str(), resolved); + } + + // Step 2: 构建新列 + let mut new_columns = Vec::with_capacity(batch.num_columns()); + for (i, col) in batch.columns().iter().enumerate() { + let field_name = batch.schema_ref().field(i).name(); + if let Some(resolved) = resolved_blobs.get(field_name.as_str()) { + new_columns.push(resolved.clone()); + } else { + new_columns.push(col.clone()); + } + } + + let new_batch = RecordBatch::try_new(self.output_schema.clone(), new_columns) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?; + + Poll::Ready(Some(Ok(new_batch))) + } + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(DataFusionError::Execution( + format!("inner stream error: {:?}", e), + )))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +impl RecordBatchStream for ResolvedBlobStream { + fn schema(&self) -> SchemaRef { + self.output_schema.clone() + } +} \ No newline at end of file diff --git a/rust/lance/src/dataset/index/frag_reuse.rs b/rust/lance/src/dataset/index/frag_reuse.rs index 80f1281a297..508db553c49 100644 --- a/rust/lance/src/dataset/index/frag_reuse.rs +++ b/rust/lance/src/dataset/index/frag_reuse.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors - +use std::{sync::Arc}; use crate::dataset::transaction::{Operation, Transaction}; use crate::index::frag_reuse::{build_frag_reuse_index_metadata, load_frag_reuse_index_details}; use crate::Dataset; @@ -150,13 +150,49 @@ fn is_index_remap_caught_up( mod tests { use super::*; use crate::dataset::optimize::{compact_files, remapping, CompactionOptions}; - use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount}; + use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount, TestDatasetGenerator}; use all_asserts::{assert_false, assert_true}; - use arrow_array::types::{Float32Type, Int32Type}; - use lance_datagen::Dimension; + use arrow_array::types::{Float32Type, Int32Type, UInt64Type}; + use lance_core::Result; + use lance_core::utils::tempfile::TempStrDir; + use lance_datagen::{array, BatchCount, Dimension, RowCount}; + use lance_encoding::version::LanceFileVersion; use lance_index::scalar::ScalarIndexParams; use lance_index::{DatasetIndexExt, IndexType}; + #[tokio::test] + async fn test_compact_blob_files(){ + let test_dir = TempStrDir::default(); + + let data = lance_datagen::gen_batch() + .col("filterme", array::step::()) + .col("blobs", array::blob()) + .into_reader_rows(RowCount::from(5), BatchCount::from(5)) + .map(|batch| Ok(batch?)) + .collect::>>() + .unwrap(); + + let dataset = Arc::new( + TestDatasetGenerator::new(data.clone(), LanceFileVersion::default()) + .make_hostile(&test_dir) + .await, + ); + let mut exclusive_dataset = Arc::try_unwrap(dataset).expect("This should be the only Arc pointing to the dataset"); + + // Compact and check index not caught up + compact_files( + &mut exclusive_dataset, + CompactionOptions { + target_rows_per_fragment: 2_000, + defer_index_remap: true, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + } + #[tokio::test] async fn test_cleanup_frag_reuse_index() { let mut dataset = lance_datagen::gen_batch() diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index bfa788bb178..e2eeeff421f 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -108,6 +108,8 @@ use serde::{Deserialize, Serialize}; use snafu::location; use tracing::info; +use crate::dataset::blob_stream::wrap_blob_stream_if_needed; + pub mod remapping; use crate::index::frag_reuse::build_new_frag_reuse_index; @@ -691,6 +693,10 @@ async fn rewrite_files( let (row_ids_rx, reader) = if needs_remapping { scanner.with_row_id(); let data = SendableRecordBatchStream::from(scanner.try_into_stream().await?); + + let dataset_arc = Arc::new(dataset.clone().into_owned()); + let data = wrap_blob_stream_if_needed(data, dataset_arc); + let (data_no_row_ids, row_id_rx) = make_rowid_capture_stream(data, dataset.manifest.uses_stable_row_ids())?; (Some(row_id_rx), data_no_row_ids) From 0396f6e269f350f4a69bc868b1cbfc990376f761 Mon Sep 17 00:00:00 2001 From: "zhengzhisheng@bilibili.com" Date: Sun, 9 Nov 2025 20:37:07 +0800 Subject: [PATCH 2/3] unit test to clean up after compact blobs --- python/Cargo.lock | 1 + rust/lance/src/dataset/index/frag_reuse.rs | 14 ++++++++++++++ 2 files changed, 15 insertions(+) diff --git a/python/Cargo.lock b/python/Cargo.lock index 8d2652c035c..d5b2d1c1d33 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -3837,6 +3837,7 @@ dependencies = [ "chrono", "dashmap", "datafusion", + "datafusion-common", "datafusion-expr", "datafusion-functions", "datafusion-physical-expr", diff --git a/rust/lance/src/dataset/index/frag_reuse.rs b/rust/lance/src/dataset/index/frag_reuse.rs index 508db553c49..c6a3115aabb 100644 --- a/rust/lance/src/dataset/index/frag_reuse.rs +++ b/rust/lance/src/dataset/index/frag_reuse.rs @@ -153,12 +153,15 @@ mod tests { use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount, TestDatasetGenerator}; use all_asserts::{assert_false, assert_true}; use arrow_array::types::{Float32Type, Int32Type, UInt64Type}; + use chrono::TimeDelta; use lance_core::Result; use lance_core::utils::tempfile::TempStrDir; use lance_datagen::{array, BatchCount, Dimension, RowCount}; use lance_encoding::version::LanceFileVersion; use lance_index::scalar::ScalarIndexParams; use lance_index::{DatasetIndexExt, IndexType}; + use crate::dataset::cleanup::{cleanup_old_versions, CleanupPolicyBuilder}; + use crate::utils::temporal::utc_now; #[tokio::test] async fn test_compact_blob_files(){ @@ -191,6 +194,17 @@ mod tests { ) .await .unwrap(); + + cleanup_old_versions( + &exclusive_dataset, + CleanupPolicyBuilder::default() + .before_timestamp(utc_now() - TimeDelta::try_seconds(1).unwrap()) + .delete_unverified(true) + .error_if_tagged_old_versions(true) + .build(), + ) + .await + .unwrap(); } #[tokio::test] From a60fc55346d320fb85f23b5f110934123345d2fc Mon Sep 17 00:00:00 2001 From: "zhengzhisheng@bilibili.com" Date: Sun, 9 Nov 2025 21:42:48 +0800 Subject: [PATCH 3/3] update support multi blobs --- rust/lance/src/dataset/blob_stream.rs | 48 ++++++++++++++------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/rust/lance/src/dataset/blob_stream.rs b/rust/lance/src/dataset/blob_stream.rs index a64c914cfa1..78b1e907501 100644 --- a/rust/lance/src/dataset/blob_stream.rs +++ b/rust/lance/src/dataset/blob_stream.rs @@ -15,13 +15,11 @@ use std::fs::File; use std::io::{Read, Seek, SeekFrom}; use std::path::PathBuf; use std::collections::HashSet; +use datafusion::common::HashMap; use crate::dataset::Dataset; use lance_core::utils::address::RowAddress; -// 🔧 注意:理想情况下应通过 field name 动态获取 ID,此处暂保留硬编码 -const BLOB_STORAGE_FIELD_ID: u32 = 1; - pub fn wrap_blob_stream_if_needed( inner: SendableRecordBatchStream, dataset: Arc, @@ -34,7 +32,7 @@ pub fn wrap_blob_stream_if_needed( .collect(); if blob_fields.is_empty() { - // 🟢 No blob fields → return original stream + // No blob fields → return original stream inner } else { Box::pin(ResolvedBlobStream::new(inner, dataset)) @@ -46,6 +44,7 @@ pub fn resolve_blob_column( batch: &RecordBatch, dataset: &Dataset, blob_field_name: &str, + blob_field_id: u32, row_id_col: &UInt64Array, ) -> DFResult { let blob_struct = batch @@ -82,7 +81,7 @@ pub fn resolve_blob_column( .ok_or_else(|| DataFusionError::Execution("fragment not found".to_string()))?; let data_file = frag - .data_file_for_field(BLOB_STORAGE_FIELD_ID) + .data_file_for_field(blob_field_id) .ok_or_else(|| DataFusionError::Execution("blob data file not found".to_string()))?; let path_str = dataset.data_dir().child(data_file.path.as_str()).to_string(); @@ -113,7 +112,7 @@ pub struct ResolvedBlobStream { inner: SendableRecordBatchStream, dataset: Arc, output_schema: SchemaRef, - blob_field_names: Vec, // 支持多个 + blob_field_name_to_id: Vec<(String, u32)>, } impl ResolvedBlobStream { @@ -129,29 +128,28 @@ impl ResolvedBlobStream { dataset: Arc, ) -> Self { // 🔍 自动查找所有 blob 字段 - let blob_fields: Vec<&lance_core::datatypes::Field> = dataset + let blob_field_name_to_id: Vec<(String, u32)> = dataset .schema() .fields .iter() .filter(|field| field.is_blob()) + .map(|f| (f.name.clone(), f.id as u32)) .collect(); - let blob_field_names: Vec = blob_fields.iter().map(|f| f.name.clone()).collect(); - let input_schema = inner.schema(); - // 检查所有 blob 字段是否都在输入流中 - for name in &blob_field_names { + let blob_names: HashSet<&String> = blob_field_name_to_id.iter().map(|(n, _)| n).collect(); + + for (name, _) in &blob_field_name_to_id { if input_schema.column_with_name(name).is_none() { panic!("Input schema missing blob field: {}", name); } } - // 构建输出 schema:将每个 blob struct 替换为 LargeBinary - let blob_set: HashSet<&String> = blob_field_names.iter().collect(); + let fields: Vec = input_schema .fields() .iter() .map(|f| { - if blob_set.contains(f.name()) { + if blob_names.contains(f.name()) { Field::new(f.name(), DataType::LargeBinary, f.is_nullable()) } else { f.as_ref().clone() @@ -164,7 +162,7 @@ impl ResolvedBlobStream { inner, dataset, output_schema, - blob_field_names, + blob_field_name_to_id, } } } @@ -182,18 +180,22 @@ impl Stream for ResolvedBlobStream { .downcast_ref::() .ok_or_else(|| DataFusionError::Execution("_rowid is not UInt64".to_string()))?; - // Step 1: 预先解析所有 blob 列 - let mut resolved_blobs = std::collections::HashMap::new(); - for name in &self.blob_field_names { - let resolved = resolve_blob_column(&batch, &self.dataset, name, row_ids)?; - resolved_blobs.insert(name.as_str(), resolved); + let mut resolved_blobs: HashMap = HashMap::new(); + for (name, field_id) in &self.blob_field_name_to_id { + let resolved = resolve_blob_column( + &batch, + &self.dataset, + name, + *field_id, + row_ids, + )?; + resolved_blobs.insert(name.clone(), resolved); } - // Step 2: 构建新列 let mut new_columns = Vec::with_capacity(batch.num_columns()); for (i, col) in batch.columns().iter().enumerate() { let field_name = batch.schema_ref().field(i).name(); - if let Some(resolved) = resolved_blobs.get(field_name.as_str()) { + if let Some(resolved) = resolved_blobs.get(field_name) { new_columns.push(resolved.clone()); } else { new_columns.push(col.clone()); @@ -218,4 +220,4 @@ impl RecordBatchStream for ResolvedBlobStream { fn schema(&self) -> SchemaRef { self.output_schema.clone() } -} \ No newline at end of file +}