diff --git a/Cargo.lock b/Cargo.lock
index d8fafbe18d5..d13af6cffcd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4339,6 +4339,7 @@ dependencies = [
  "criterion",
  "dashmap",
  "datafusion",
+ "datafusion-common",
  "datafusion-expr",
  "datafusion-functions",
  "datafusion-physical-expr",
diff --git a/python/Cargo.lock b/python/Cargo.lock
index 8d2652c035c..d5b2d1c1d33 100644
--- a/python/Cargo.lock
+++ b/python/Cargo.lock
@@ -3837,6 +3837,7 @@ dependencies = [
  "chrono",
  "dashmap",
  "datafusion",
+ "datafusion-common",
  "datafusion-expr",
  "datafusion-functions",
  "datafusion-physical-expr",
diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml
index 44093e2f6fa..7dad47c0f6a 100644
--- a/rust/lance/Cargo.toml
+++ b/rust/lance/Cargo.toml
@@ -61,6 +61,7 @@ uuid.workspace = true
 arrow.workspace = true
 # TODO: use datafusion sub-modules to reduce build size?
 datafusion.workspace = true
+datafusion-common.workspace = true
 datafusion-functions.workspace = true
 datafusion-physical-expr.workspace = true
 datafusion-physical-plan.workspace = true
diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs
index b9e56b0d9c4..233c59426e6 100644
--- a/rust/lance/src/dataset.rs
+++ b/rust/lance/src/dataset.rs
@@ -82,6 +82,7 @@ pub mod udtf;
 pub mod updater;
 mod utils;
 mod write;
+mod blob_stream;
 
 use self::builder::DatasetBuilder;
 use self::cleanup::RemovalStats;
diff --git a/rust/lance/src/dataset/blob_stream.rs b/rust/lance/src/dataset/blob_stream.rs
new file mode 100644
index 00000000000..78b1e907501
--- /dev/null
+++ b/rust/lance/src/dataset/blob_stream.rs
@@ -0,0 +1,243 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+use std::collections::{HashMap, HashSet};
+use std::fs::File;
+use std::io::{Read, Seek, SeekFrom};
+use std::path::PathBuf;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow_array::builder::LargeBinaryBuilder;
+use arrow_array::{ArrayRef, RecordBatch, StructArray, UInt64Array};
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use datafusion_common::{DataFusionError, Result as DFResult};
+use datafusion_physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+use futures::{Stream, StreamExt};
+
+use crate::dataset::Dataset;
+use lance_core::utils::address::RowAddress;
+
+/// Wrap `inner` in a [`ResolvedBlobStream`] if the dataset schema contains
+/// any blob columns; otherwise return the stream unchanged.
+pub fn wrap_blob_stream_if_needed(
+    inner: SendableRecordBatchStream,
+    dataset: Arc<Dataset>,
+) -> SendableRecordBatchStream {
+    let blob_fields: Vec<&lance_core::datatypes::Field> = dataset
+        .schema()
+        .fields
+        .iter()
+        .filter(|field| field.is_blob())
+        .collect();
+
+    if blob_fields.is_empty() {
+        // No blob fields → return the original stream.
+        inner
+    } else {
+        Box::pin(ResolvedBlobStream::new(inner, dataset))
+    }
+}
+
+/// Resolve a blob column by reading raw bytes from disk, using the
+/// position/size metadata stored in the blob struct column.
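+///
+/// A minimal sketch of the expected input, assuming the blob struct layout
+/// read below (`field_id` and `row_ids` are placeholder variables; the byte
+/// offsets are illustrative only):
+///
+/// ```ignore
+/// // blobs: Struct<position: UInt64, size: UInt64>
+/// // row 0 → { position: 0,    size: 1024 }  // bytes [0, 1024) of the data file
+/// // row 1 → { position: 1024, size: 512  }  // bytes [1024, 1536)
+/// let bytes: ArrayRef = resolve_blob_column(&batch, &dataset, "blobs", field_id, &row_ids)?;
+/// ```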
+pub fn resolve_blob_column(
+    batch: &RecordBatch,
+    dataset: &Dataset,
+    blob_field_name: &str,
+    blob_field_id: u32,
+    row_id_col: &UInt64Array,
+) -> DFResult<ArrayRef> {
+    let blob_struct = batch
+        .column_by_name(blob_field_name)
+        .ok_or_else(|| DataFusionError::Execution("blob column missing".to_string()))?
+        .as_any()
+        .downcast_ref::<StructArray>()
+        .ok_or_else(|| DataFusionError::Execution("blob is not a struct".to_string()))?;
+
+    let position_array = blob_struct
+        .column_by_name("position")
+        .ok_or_else(|| DataFusionError::Execution("position field missing".to_string()))?
+        .as_any()
+        .downcast_ref::<UInt64Array>()
+        .ok_or_else(|| DataFusionError::Execution("position is not UInt64".to_string()))?;
+
+    let size_array = blob_struct
+        .column_by_name("size")
+        .ok_or_else(|| DataFusionError::Execution("size field missing".to_string()))?
+        .as_any()
+        .downcast_ref::<UInt64Array>()
+        .ok_or_else(|| DataFusionError::Execution("size is not UInt64".to_string()))?;
+
+    let mut blobs = Vec::with_capacity(batch.num_rows());
+
+    for i in 0..batch.num_rows() {
+        let row_id = row_id_col.value(i);
+        let position = position_array.value(i);
+        let size = size_array.value(i) as usize;
+
+        let frag_id = RowAddress::from(row_id).fragment_id();
+        let frag = dataset
+            .get_fragment(frag_id as usize)
+            .ok_or_else(|| DataFusionError::Execution("fragment not found".to_string()))?;
+
+        let data_file = frag
+            .data_file_for_field(blob_field_id)
+            .ok_or_else(|| DataFusionError::Execution("blob data file not found".to_string()))?;
+
+        // NOTE: assumes the blob data files live on a local filesystem;
+        // object stores are not handled by this path.
+        let path_str = dataset.data_dir().child(data_file.path.as_str()).to_string();
+        let local_path = PathBuf::from("/".to_owned() + &path_str);
+
+        let mut file = File::open(&local_path)
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        file.seek(SeekFrom::Start(position))
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        let mut buffer = vec![0; size];
+        file.read_exact(&mut buffer)
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        blobs.push(buffer);
+    }
+
+    let mut builder =
+        LargeBinaryBuilder::with_capacity(blobs.len(), blobs.iter().map(Vec::len).sum());
+    for blob in blobs {
+        builder.append_value(blob);
+    }
+    let binary_array = builder.finish();
+
+    Ok(Arc::new(binary_array))
+}
+
+/// A stream that resolves blob columns from on-disk storage.
+pub struct ResolvedBlobStream {
+    inner: SendableRecordBatchStream,
+    dataset: Arc<Dataset>,
+    output_schema: SchemaRef,
+    blob_field_name_to_id: Vec<(String, u32)>,
+}
+
+impl ResolvedBlobStream {
+    /// Create a new [`ResolvedBlobStream`] that resolves every blob column
+    /// found in the dataset schema.
+    ///
+    /// # Panics
+    /// If the input stream's schema is missing any of the dataset's blob fields.
+    pub fn new(
+        inner: SendableRecordBatchStream,
+        dataset: Arc<Dataset>,
+    ) -> Self {
+        // Automatically discover every blob field in the dataset schema.
+        let blob_field_name_to_id: Vec<(String, u32)> = dataset
+            .schema()
+            .fields
+            .iter()
+            .filter(|field| field.is_blob())
+            .map(|f| (f.name.clone(), f.id as u32))
+            .collect();
+
+        let input_schema = inner.schema();
+        let blob_names: HashSet<&String> =
+            blob_field_name_to_id.iter().map(|(n, _)| n).collect();
+
+        for (name, _) in &blob_field_name_to_id {
+            if input_schema.column_with_name(name).is_none() {
+                panic!("Input schema missing blob field: {}", name);
+            }
+        }
+
+        // Blob struct columns become LargeBinary columns in the output schema.
+        let fields: Vec<Field> = input_schema
+            .fields()
+            .iter()
+            .map(|f| {
+                if blob_names.contains(f.name()) {
+                    Field::new(f.name(), DataType::LargeBinary, f.is_nullable())
+                } else {
+                    f.as_ref().clone()
+                }
+            })
+            .collect();
+        let output_schema = Arc::new(Schema::new(fields));
+
+        Self {
+            inner,
+            dataset,
+            output_schema,
+            blob_field_name_to_id,
+        }
+    }
+}
+
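+// A minimal end-to-end sketch, assuming a scan that projects the blob struct
+// column together with `_rowid` (`scan_stream` is a placeholder variable):
+//
+//     let stream = wrap_blob_stream_if_needed(scan_stream, dataset.clone());
+//     // Each batch now carries `blobs: LargeBinary` in place of the
+//     // `{position, size}` struct, with the bytes read from the blob files.
+//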
+impl Stream for ResolvedBlobStream {
+    type Item = DFResult<RecordBatch>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        match self.inner.poll_next_unpin(cx) {
+            Poll::Ready(Some(Ok(batch))) => {
+                let row_ids = batch
+                    .column_by_name("_rowid")
+                    .ok_or_else(|| DataFusionError::Execution("must have _rowid".to_string()))?
+                    .as_any()
+                    .downcast_ref::<UInt64Array>()
+                    .ok_or_else(|| {
+                        DataFusionError::Execution("_rowid is not UInt64".to_string())
+                    })?;
+
+                let mut resolved_blobs: HashMap<String, ArrayRef> = HashMap::new();
+                for (name, field_id) in &self.blob_field_name_to_id {
+                    let resolved =
+                        resolve_blob_column(&batch, &self.dataset, name, *field_id, row_ids)?;
+                    resolved_blobs.insert(name.clone(), resolved);
+                }
+
+                let mut new_columns = Vec::with_capacity(batch.num_columns());
+                for (i, col) in batch.columns().iter().enumerate() {
+                    let field_name = batch.schema_ref().field(i).name();
+                    if let Some(resolved) = resolved_blobs.get(field_name) {
+                        new_columns.push(resolved.clone());
+                    } else {
+                        new_columns.push(col.clone());
+                    }
+                }
+
+                let new_batch = RecordBatch::try_new(self.output_schema.clone(), new_columns)
+                    .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
+
+                Poll::Ready(Some(Ok(new_batch)))
+            }
+            // The inner stream already yields `DataFusionError`s; propagate them
+            // unchanged instead of re-wrapping them in an opaque Execution error.
+            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
+            Poll::Ready(None) => Poll::Ready(None),
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+impl RecordBatchStream for ResolvedBlobStream {
+    fn schema(&self) -> SchemaRef {
+        self.output_schema.clone()
+    }
+}
diff --git a/rust/lance/src/dataset/index/frag_reuse.rs b/rust/lance/src/dataset/index/frag_reuse.rs
index 80f1281a297..c6a3115aabb 100644
--- a/rust/lance/src/dataset/index/frag_reuse.rs
+++ b/rust/lance/src/dataset/index/frag_reuse.rs
@@ -1,6 +1,6 @@
 // SPDX-License-Identifier: Apache-2.0
 // SPDX-FileCopyrightText: Copyright The Lance Authors
-
+use std::sync::Arc;
 use crate::dataset::transaction::{Operation, Transaction};
 use crate::index::frag_reuse::{build_frag_reuse_index_metadata, load_frag_reuse_index_details};
 use crate::Dataset;
@@ -150,12 +150,71 @@ fn is_index_remap_caught_up(
 mod tests {
     use super::*;
     use crate::dataset::optimize::{compact_files, remapping, CompactionOptions};
-    use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount};
+    use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount, TestDatasetGenerator};
     use all_asserts::{assert_false, assert_true};
-    use arrow_array::types::{Float32Type, Int32Type};
-    use lance_datagen::Dimension;
+    use arrow_array::types::{Float32Type, Int32Type, UInt64Type};
+    use chrono::TimeDelta;
+    use lance_core::Result;
+    use lance_core::utils::tempfile::TempStrDir;
+    use lance_datagen::{array, BatchCount, Dimension, RowCount};
+    use lance_encoding::version::LanceFileVersion;
     use lance_index::scalar::ScalarIndexParams;
     use lance_index::{DatasetIndexExt, IndexType};
+    use crate::dataset::cleanup::{cleanup_old_versions, CleanupPolicyBuilder};
+    use crate::utils::temporal::utc_now;
+
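+    // The test below compacts a dataset that contains a blob column and then
+    // deletes the pre-compaction versions. The generated input is, roughly
+    // (5 batches × 5 rows; values illustrative):
+    //
+    //     filterme: UInt64 step column → 0, 1, 2, ...
+    //     blobs:    blob struct column, resolved via ResolvedBlobStream
+    //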
+    #[tokio::test]
+    async fn test_compact_blob_files() {
+        let test_dir = TempStrDir::default();
+
+        let data = lance_datagen::gen_batch()
+            .col("filterme", array::step::<UInt64Type>())
+            .col("blobs", array::blob())
+            .into_reader_rows(RowCount::from(5), BatchCount::from(5))
+            .map(|batch| Ok(batch?))
+            .collect::<Result<Vec<_>>>()
+            .unwrap();
+
+        let dataset = Arc::new(
+            TestDatasetGenerator::new(data.clone(), LanceFileVersion::default())
+                .make_hostile(&test_dir)
+                .await,
+        );
+        let mut exclusive_dataset = Arc::try_unwrap(dataset)
+            .expect("This should be the only Arc pointing to the dataset");
+
+        // Compact with index remapping deferred.
+        compact_files(
+            &mut exclusive_dataset,
+            CompactionOptions {
+                target_rows_per_fragment: 2_000,
+                defer_index_remap: true,
+                ..Default::default()
+            },
+            None,
+        )
+        .await
+        .unwrap();
+
+        // Clean up all versions older than one second ago.
+        cleanup_old_versions(
+            &exclusive_dataset,
+            CleanupPolicyBuilder::default()
+                .before_timestamp(utc_now() - TimeDelta::try_seconds(1).unwrap())
+                .delete_unverified(true)
+                .error_if_tagged_old_versions(true)
+                .build(),
+        )
+        .await
+        .unwrap();
+    }
 
     #[tokio::test]
     async fn test_cleanup_frag_reuse_index() {
diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs
index bfa788bb178..e2eeeff421f 100644
--- a/rust/lance/src/dataset/optimize.rs
+++ b/rust/lance/src/dataset/optimize.rs
@@ -108,6 +108,8 @@ use serde::{Deserialize, Serialize};
 use snafu::location;
 use tracing::info;
 
+use crate::dataset::blob_stream::wrap_blob_stream_if_needed;
+
 pub mod remapping;
 
 use crate::index::frag_reuse::build_new_frag_reuse_index;
@@ -691,6 +693,10 @@ async fn rewrite_files(
     let (row_ids_rx, reader) = if needs_remapping {
         scanner.with_row_id();
         let data = SendableRecordBatchStream::from(scanner.try_into_stream().await?);
+
+        let dataset_arc = Arc::new(dataset.clone().into_owned());
+        let data = wrap_blob_stream_if_needed(data, dataset_arc);
+
         let (data_no_row_ids, row_id_rx) =
             make_rowid_capture_stream(data, dataset.manifest.uses_stable_row_ids())?;
         (Some(row_id_rx), data_no_row_ids)