Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/lance/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ uuid.workspace = true
arrow.workspace = true
# TODO: use datafusion sub-modules to reduce build size?
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-functions.workspace = true
datafusion-physical-expr.workspace = true
datafusion-physical-plan.workspace = true
Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ pub mod udtf;
pub mod updater;
mod utils;
mod write;
mod blob_stream;

use self::builder::DatasetBuilder;
use self::cleanup::RemovalStats;
Expand Down
223 changes: 223 additions & 0 deletions rust/lance/src/dataset/blob_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
// rust/lance/src/dataset/blob_stream.rs

use arrow_array::{
ArrayRef, RecordBatch, StructArray, UInt64Array,
};
use arrow_array::builder::LargeBinaryBuilder;
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{DataFusionError, Result as DFResult};
use datafusion_physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use futures::{Stream, StreamExt};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use std::path::PathBuf;
use std::collections::HashSet;
use datafusion::common::HashMap;

use crate::dataset::Dataset;
use lance_core::utils::address::RowAddress;

/// Wrap `inner` in a [`ResolvedBlobStream`] when the dataset schema contains
/// blob columns; otherwise return the stream unchanged.
///
/// The wrapper materializes each blob descriptor struct (position/size) into
/// the actual blob bytes as batches flow through.
pub fn wrap_blob_stream_if_needed(
    inner: SendableRecordBatchStream,
    dataset: Arc<Dataset>,
) -> SendableRecordBatchStream {
    // A single `any` scan is enough to decide; collecting the matching fields
    // into a Vec just to call `is_empty` allocates for nothing
    // (clippy::needless_collect).
    let has_blob_fields = dataset.schema().fields.iter().any(|field| field.is_blob());

    if has_blob_fields {
        Box::pin(ResolvedBlobStream::new(inner, dataset))
    } else {
        // No blob fields → return original stream
        inner
    }
}

/// Resolve blob column by reading from disk based on position/size metadata.
pub fn resolve_blob_column(
batch: &RecordBatch,
dataset: &Dataset,
blob_field_name: &str,
blob_field_id: u32,
row_id_col: &UInt64Array,
) -> DFResult<ArrayRef> {
let blob_struct = batch
.column_by_name(blob_field_name)
.ok_or_else(|| DataFusionError::Execution("blob column missing".to_string()))?
.as_any()
.downcast_ref::<StructArray>()
.ok_or_else(|| DataFusionError::Execution("blob is not a struct".to_string()))?;

let position_array = blob_struct
.column_by_name("position")
.ok_or_else(|| DataFusionError::Execution("position field missing".to_string()))?
.as_any()
.downcast_ref::<UInt64Array>()
.ok_or_else(|| DataFusionError::Execution("position is not UInt64".to_string()))?;

let size_array = blob_struct
.column_by_name("size")
.ok_or_else(|| DataFusionError::Execution("size field missing".to_string()))?
.as_any()
.downcast_ref::<UInt64Array>()
.ok_or_else(|| DataFusionError::Execution("size is not UInt64".to_string()))?;

let mut blobs = Vec::with_capacity(batch.num_rows());

for i in 0..batch.num_rows() {
let row_id = row_id_col.value(i);
let position = position_array.value(i);
let size = size_array.value(i) as usize;

let frag_id = RowAddress::from(row_id).fragment_id();
let frag = dataset
.get_fragment(frag_id as usize)
.ok_or_else(|| DataFusionError::Execution("fragment not found".to_string()))?;

let data_file = frag
.data_file_for_field(blob_field_id)
.ok_or_else(|| DataFusionError::Execution("blob data file not found".to_string()))?;

let path_str = dataset.data_dir().child(data_file.path.as_str()).to_string();
let local_path = PathBuf::from("/".to_owned() + &path_str);

let mut file = File::open(&local_path)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
file.seek(SeekFrom::Start(position))
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let mut buffer = vec![0; size];
file.read_exact(&mut buffer)
Comment on lines +83 to +95
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Blob resolver bypasses object store

When materializing blob columns, the code constructs a local filesystem path and reads with std::fs::File. This ignores the dataset’s configured object store and only works for local POSIX paths; datasets stored on S3, GCS, or any non-local store will fail during compaction because the files cannot be opened. It also performs blocking I/O inside the async stream. The blob bytes should be fetched through the dataset’s object_store (similar to BlobFile elsewhere) rather than direct File access.

Useful? React with 👍 / 👎.

.map_err(|e| DataFusionError::External(Box::new(e)))?;

blobs.push(buffer);
}

let mut builder = LargeBinaryBuilder::with_capacity(blobs.len(), blobs.iter().map(Vec::len).sum());
for blob in blobs {
builder.append_value(blob);
}
let binary_array = builder.finish();

Ok(Arc::new(binary_array))
}

/// A stream that resolves blob columns from on-disk storage.
pub struct ResolvedBlobStream {
    // Upstream stream whose batches still carry blob descriptor structs.
    inner: SendableRecordBatchStream,
    // Dataset used to locate fragments and data files when materializing blobs.
    dataset: Arc<Dataset>,
    // Input schema with each blob struct column replaced by LargeBinary.
    output_schema: SchemaRef,
    // (column name, field id) for every blob field in the dataset schema.
    blob_field_name_to_id: Vec<(String, u32)>,
}

impl ResolvedBlobStream {
/// Create a new [`ResolvedBlobStream`] that automatically resolves the **first** blob column
/// found in the dataset schema.
///
/// # Panics
/// - If no blob field is found in the dataset schema
/// - If more than one blob field is found (currently unsupported)
/// - If the input stream does not contain the blob field
pub fn new(
inner: SendableRecordBatchStream,
dataset: Arc<Dataset>,
) -> Self {
// 🔍 自动查找所有 blob 字段
let blob_field_name_to_id: Vec<(String, u32)> = dataset
.schema()
.fields
.iter()
.filter(|field| field.is_blob())
.map(|f| (f.name.clone(), f.id as u32))
.collect();

let input_schema = inner.schema();
let blob_names: HashSet<&String> = blob_field_name_to_id.iter().map(|(n, _)| n).collect();

for (name, _) in &blob_field_name_to_id {
if input_schema.column_with_name(name).is_none() {
panic!("Input schema missing blob field: {}", name);
}
}

let fields: Vec<Field> = input_schema
.fields()
.iter()
.map(|f| {
if blob_names.contains(f.name()) {
Field::new(f.name(), DataType::LargeBinary, f.is_nullable())
Comment on lines +139 to +153
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P0 Badge Fix blob name lookup compilation error

The new ResolvedBlobStream::new creates a HashSet<&String> and later calls blob_names.contains(f.name()) where f.name() yields &str. HashSet::contains requires the same borrowed type as its keys, but &String does not implement Borrow<str>, so the call does not compile. As written this module will fail to build. Store owned Strings in the set (e.g. HashSet<String>) or convert the lookup to compare against a String.

Useful? React with 👍 / 👎.

} else {
f.as_ref().clone()
}
})
.collect();
let output_schema = Arc::new(Schema::new(fields));

Self {
inner,
dataset,
output_schema,
blob_field_name_to_id,
}
}
}

impl Stream for ResolvedBlobStream {
    type Item = DFResult<RecordBatch>;

    /// Pull the next batch from the inner stream and replace every blob
    /// descriptor column with its materialized `LargeBinary` bytes.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.inner.poll_next_unpin(cx) {
            Poll::Ready(Some(Ok(batch))) => {
                // `_rowid` is required to locate the fragment each row lives in.
                let row_ids = batch
                    .column_by_name("_rowid")
                    .ok_or_else(|| DataFusionError::Execution("must have _rowid".to_string()))?
                    .as_any()
                    .downcast_ref::<UInt64Array>()
                    .ok_or_else(|| DataFusionError::Execution("_rowid is not UInt64".to_string()))?;

                // Materialize every blob column of this batch.
                let mut resolved_blobs: HashMap<String, ArrayRef> = HashMap::new();
                for (name, field_id) in &self.blob_field_name_to_id {
                    let resolved = resolve_blob_column(
                        &batch,
                        &self.dataset,
                        name,
                        *field_id,
                        row_ids,
                    )?;
                    resolved_blobs.insert(name.clone(), resolved);
                }

                // Swap resolved arrays into place; non-blob columns pass through.
                let mut new_columns = Vec::with_capacity(batch.num_columns());
                for (i, col) in batch.columns().iter().enumerate() {
                    let field_name = batch.schema_ref().field(i).name();
                    if let Some(resolved) = resolved_blobs.get(field_name) {
                        new_columns.push(resolved.clone());
                    } else {
                        new_columns.push(col.clone());
                    }
                }

                let new_batch = RecordBatch::try_new(self.output_schema.clone(), new_columns)
                    .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;

                Poll::Ready(Some(Ok(new_batch)))
            }
            // Propagate inner errors unchanged. Flattening them into an
            // Execution error via `format!("{:?}", e)` would destroy the
            // original DataFusionError variant and its source chain.
            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}

impl RecordBatchStream for ResolvedBlobStream {
fn schema(&self) -> SchemaRef {
self.output_schema.clone()
}
}
58 changes: 54 additions & 4 deletions rust/lance/src/dataset/index/frag_reuse.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{sync::Arc};
use crate::dataset::transaction::{Operation, Transaction};
use crate::index::frag_reuse::{build_frag_reuse_index_metadata, load_frag_reuse_index_details};
use crate::Dataset;
Expand Down Expand Up @@ -150,12 +150,62 @@ fn is_index_remap_caught_up(
mod tests {
use super::*;
use crate::dataset::optimize::{compact_files, remapping, CompactionOptions};
use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount};
use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount, TestDatasetGenerator};
use all_asserts::{assert_false, assert_true};
use arrow_array::types::{Float32Type, Int32Type};
use lance_datagen::Dimension;
use arrow_array::types::{Float32Type, Int32Type, UInt64Type};
use chrono::TimeDelta;
use lance_core::Result;
use lance_core::utils::tempfile::TempStrDir;
use lance_datagen::{array, BatchCount, Dimension, RowCount};
use lance_encoding::version::LanceFileVersion;
use lance_index::scalar::ScalarIndexParams;
use lance_index::{DatasetIndexExt, IndexType};
use crate::dataset::cleanup::{cleanup_old_versions, CleanupPolicyBuilder};
use crate::utils::temporal::utc_now;

#[tokio::test]
async fn test_compact_blob_files() {
    let test_dir = TempStrDir::default();

    // 5 batches × 5 rows with a step column (for filtering) and a blob column.
    let data = lance_datagen::gen_batch()
        .col("filterme", array::step::<UInt64Type>())
        .col("blobs", array::blob())
        .into_reader_rows(RowCount::from(5), BatchCount::from(5))
        .map(|batch| Ok(batch?))
        .collect::<Result<Vec<_>>>()
        .unwrap();

    // Own the dataset directly: wrapping in Arc::new only to immediately
    // Arc::try_unwrap it added a needless panic path and no sharing.
    let mut dataset = TestDatasetGenerator::new(data.clone(), LanceFileVersion::default())
        .make_hostile(&test_dir)
        .await;

    // Compact with deferred index remap so the blob columns get rewritten.
    compact_files(
        &mut dataset,
        CompactionOptions {
            target_rows_per_fragment: 2_000,
            defer_index_remap: true,
            ..Default::default()
        },
        None,
    )
    .await
    .unwrap();

    // Cleaning up all old versions must succeed against the compacted data.
    cleanup_old_versions(
        &dataset,
        CleanupPolicyBuilder::default()
            .before_timestamp(utc_now() - TimeDelta::try_seconds(1).unwrap())
            .delete_unverified(true)
            .error_if_tagged_old_versions(true)
            .build(),
    )
    .await
    .unwrap();
}

#[tokio::test]
async fn test_cleanup_frag_reuse_index() {
Expand Down
6 changes: 6 additions & 0 deletions rust/lance/src/dataset/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ use serde::{Deserialize, Serialize};
use snafu::location;
use tracing::info;

use crate::dataset::blob_stream::wrap_blob_stream_if_needed;

pub mod remapping;

use crate::index::frag_reuse::build_new_frag_reuse_index;
Expand Down Expand Up @@ -691,6 +693,10 @@ async fn rewrite_files(
let (row_ids_rx, reader) = if needs_remapping {
scanner.with_row_id();
let data = SendableRecordBatchStream::from(scanner.try_into_stream().await?);

let dataset_arc = Arc::new(dataset.clone().into_owned());
let data = wrap_blob_stream_if_needed(data, dataset_arc);

let (data_no_row_ids, row_id_rx) =
make_rowid_capture_stream(data, dataset.manifest.uses_stable_row_ids())?;
(Some(row_id_rx), data_no_row_ids)
Expand Down
Loading