Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions python/python/benchmarks/test_random_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
# specifically for random access scans

tab = pq.read_table("~/lineitemsf1.snappy.parquet")
dsv1 = lance.write_dataset(tab, "/tmp/lineitem.lancev1", use_legacy_format=True)
dsv2 = lance.write_dataset(tab, "/tmp/lineitem.lancev2", use_legacy_format=False)
dsv1 = lance.write_dataset(tab, "/tmp/lineitem.lancev1", data_storage_version="2.0")
dsv2 = lance.write_dataset(tab, "/tmp/lineitem.lancev2", data_storage_version="2.1")

dsv1 = lance.dataset("/tmp/lineitem.lancev1")
dsv2 = lance.dataset("/tmp/lineitem.lancev2")
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-encoding/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ strum = { workspace =true, features = ["derive"] }
tokio.workspace = true
tracing.workspace = true
xxhash-rust = { version = "0.8.15", features = ["xxh3"] }
bytemuck = "1.14"
bytemuck = { version = "1.14", features = ["extern_crate_alloc"] }
byteorder.workspace = true
lz4 = { version = "1", optional = true }
zstd = { version = "0.13", optional = true }
Expand Down
40 changes: 28 additions & 12 deletions rust/lance-encoding/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{ops::Deref, panic::RefUnwindSafe, ptr::NonNull, sync::Arc};
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer};
use lance_core::{utils::bit::is_pwr_two, Error, Result};
use snafu::location;
use std::borrow::Cow;

/// A copy-on-write byte buffer.
///
Expand Down Expand Up @@ -108,18 +109,6 @@ impl LanceBuffer {
}
}

/// Convert a buffer into a bytes::Bytes object
///
/// This convert is zero cost.
pub fn into_bytes(self) -> bytes::Bytes {
self.0.into_vec::<u8>().unwrap().into()
}

/// Creates an owned copy of the buffer, will always involve a full copy of the bytes
pub fn to_owned(&self) -> Self {
Self(Buffer::from_vec(self.0.to_vec()))
}

/// Make an owned copy of the buffer (always does a copy of the data)
pub fn deep_copy(&self) -> Self {
Self(Buffer::from_vec(self.0.to_vec()))
Expand Down Expand Up @@ -179,6 +168,33 @@ impl LanceBuffer {
}
}

/// Reinterprets the buffer as a typed slice `&[T]`, borrowing when possible.
///
/// Unlike [`borrow_to_typed_slice`], this returns a `Cow<'_, [T]>` rather than
/// an owned buffer, which avoids the cost of Arc creation and destruction.
/// That is helpful when the caller only needs a transient view of the data
/// and will drop it without reusing it.
///
/// Caller should decide which way to use based on their own needs.
///
/// If the underlying bytes are not properly aligned for `T`, this involves a
/// copy of the data into an owned, aligned `Vec`.
///
/// Note: doing this sort of re-interpretation generally makes assumptions
/// about the endianness of the data. Lance does not support big-endian
/// machines so this is safe. However, if we end up supporting big-endian
/// machines in the future, then any use of this method will need to be
/// carefully reviewed.
pub fn borrow_to_typed_view<T: ArrowNativeType + bytemuck::Pod>(&self) -> Cow<'_, [T]> {
    let elem_size = std::mem::size_of::<T>();
    if self.len() % elem_size != 0 {
        panic!("attempt to view data type of size {} but we have {} bytes which isn't evenly divisible", elem_size, self.len());
    }

    let bytes = self.0.as_slice();
    if bytes.as_ptr().align_offset(std::mem::align_of::<T>()) == 0 {
        // Properly aligned: zero-copy reinterpretation of the raw bytes.
        Cow::Borrowed(bytemuck::cast_slice(bytes))
    } else {
        // Misaligned: pod_collect_to_vec copies into a freshly aligned Vec.
        Cow::Owned(bytemuck::pod_collect_to_vec(bytes))
    }
}
Comment thread
Xuanwo marked this conversation as resolved.

/// Concatenates multiple buffers into a single buffer, consuming the input buffers
///
/// If there is only one buffer, it will be returned as is
Expand Down
9 changes: 3 additions & 6 deletions rust/lance-encoding/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ use arrow_array::{
types::{ArrowDictionaryKeyType, UInt16Type, UInt32Type, UInt64Type, UInt8Type},
Array, ArrayRef, OffsetSizeTrait, UInt64Array,
};
use arrow_buffer::{
ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer,
};
use arrow_buffer::{ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer};
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::DataType;
use lance_arrow::DataTypeExt;
Expand Down Expand Up @@ -228,12 +226,11 @@ impl<T: OffsetSizeTrait> VariableWidthDataBlockBuilder<T> {
}
}

impl<T: OffsetSizeTrait> DataBlockBuilderImpl for VariableWidthDataBlockBuilder<T> {
impl<T: OffsetSizeTrait + bytemuck::Pod> DataBlockBuilderImpl for VariableWidthDataBlockBuilder<T> {
fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
let block = data_block.as_variable_width_ref().unwrap();
assert!(block.bits_per_offset == T::get_byte_width() as u8 * 8);

let offsets: ScalarBuffer<T> = block.offsets.clone().borrow_to_typed_slice();
Comment thread
Xuanwo marked this conversation as resolved.
let offsets = block.offsets.borrow_to_typed_view::<T>();
Comment thread
Xuanwo marked this conversation as resolved.

let start_offset = offsets[selection.start as usize];
let end_offset = offsets[selection.end as usize];
Expand Down
20 changes: 18 additions & 2 deletions rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1437,7 +1437,15 @@ impl BatchDecodeStream {
let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
let task = async move {
let next_task = next_task?;
next_task.into_batch(emitted_batch_size_warning)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we were going to do the spawn fix by replacing the existing spawn with a spawn_cpu call? Looks like we are still introducing a new spawn call?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I misunderstood your previous comments. The ReadBatchTask contains a future, but spawn_cpu only accepts a blocking function. Are you suggesting we add a spawn_async_cpu function for our CPU runtime?

Copy link
Copy Markdown
Collaborator Author

@Xuanwo Xuanwo Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've rethought this and I'm sure we can remove the spawn inside wrap_with_row_id_and_delete, which speeds up our perf a bit.

But the spawn inside into_stream should be kept, as it allows the decode task to start as soon as it's created instead of waiting until it is polled.

Benchmarking V2_0 Filtered Scan (10000 limit): Collecting 100 samples in estimated 6.1V2_0 Filtered Scan (10000 limit)
                        time:   [1.2100 ms 1.2133 ms 1.2164 ms]
                        change: [-0.5410% -0.0909% +0.3994%] (p = 0.72 > 0.05)
                        No change in performance detected.
Found 7 outliers among 100 measurements (7.00%)
  2 (2.00%) low mild
  5 (5.00%) high mild

Benchmarking V2_0 Random Take 5 rows: Collecting 100 samples in estimated 5.2221 s (76V2_0 Random Take 5 rows time:   [68.316 µs 68.599 µs 68.855 µs]
                        change: [-1.9455% -1.5034% -1.0910%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 4 outliers among 100 measurements (4.00%)
  1 (1.00%) low severe
  3 (3.00%) high mild

Benchmarking V2_1 (FSST) Filtered Scan (10000 limit): Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.7s, enable flat sampling, or reduce sample count to 60.
Benchmarking V2_1 (FSST) Filtered Scan (10000 limit): Collecting 100 samples in estimaV2_1 (FSST) Filtered Scan (10000 limit)
                        time:   [1.3157 ms 1.3198 ms 1.3241 ms]
                        change: [-2.7881% -2.4058% -1.9991%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 4 outliers among 100 measurements (4.00%)
  4 (4.00%) high mild

Benchmarking V2_1 (FSST) Random Take 5 rows: Collecting 100 samples in estimated 5.279V2_1 (FSST) Random Take 5 rows
                        time:   [64.018 µs 64.162 µs 64.311 µs]
                        change: [-2.7584% -2.3465% -1.9349%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 6 outliers among 100 measurements (6.00%)
  3 (3.00%) high mild
  3 (3.00%) high severe

Benchmarking V2_1 (FSST disabled) Filtered Scan (10000 limit): Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.1s, enable flat sampling, or reduce sample count to 60.
Benchmarking V2_1 (FSST disabled) Filtered Scan (10000 limit): Collecting 100 samples V2_1 (FSST disabled) Filtered Scan (10000 limit)
                        time:   [1.2192 ms 1.2230 ms 1.2270 ms]
                        change: [-3.0966% -2.7663% -2.4295%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 4 outliers among 100 measurements (4.00%)
  2 (2.00%) low mild
  2 (2.00%) high mild

Benchmarking V2_1 (FSST disabled) Random Take 5 rows: Collecting 100 samples in estimaV2_1 (FSST disabled) Random Take 5 rows
                        time:   [63.852 µs 63.967 µs 64.110 µs]
                        change: [-4.0087% -3.6134% -3.2627%] (p = 0.00 < 0.05)
                        Performance has improved.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I'm not entirely sure I agree but I don't want to go back and forth too much. We can merge this and revisit later (I still want to get rid of some of the I/O tasks) if you would like.

I think we will also want a more complex benchmark, we could use one of the more compute intensive TPC-H queries.

We will also need to add support for FilteredReadThreadingMode::MultiplePartitions in the Lance table provider.

The goal should be that one thread task does decoding and filtering. This way when we reach the filtering stage, the data is already in the CPU cache. If we put a spawn here then the decoding will happen on one thread task and the filtering on another. This means we will have to transfer the data between main memory.

Copy link
Copy Markdown
Collaborator Author

@Xuanwo Xuanwo Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tracked in #5242

I agree with most of your comments. The blocker here is that the change set might be bigger than we expected. Let's revisit this part as a follow-up.

// Real decode work happens inside into_batch, which can block the current
// thread for a long time. By spawning it as a new task, we allow Tokio's
// worker threads to keep making progress.
tokio::spawn(async move { next_task.into_batch(emitted_batch_size_warning) })
.await
.map_err(|err| Error::Wrapped {
error: err.into(),
location: location!(),
})?
};
(task, num_rows)
});
Expand Down Expand Up @@ -1760,7 +1768,15 @@ impl StructuralBatchDecodeStream {
let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
let task = async move {
let next_task = next_task?;
next_task.into_batch(emitted_batch_size_warning)
// Real decode work happens inside into_batch, which can block the current
// thread for a long time. By spawning it as a new task, we allow Tokio's
// worker threads to keep making progress.
tokio::spawn(async move { next_task.into_batch(emitted_batch_size_warning) })
.await
.map_err(|err| Error::Wrapped {
error: err.into(),
location: location!(),
})?
};
(task, num_rows)
});
Expand Down
18 changes: 7 additions & 11 deletions rust/lance-table/src/utils/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use lance_core::{
ROW_LAST_UPDATED_AT_VERSION_FIELD,
};
use lance_io::ReadBatchParams;
use tracing::{instrument, Instrument};
use tracing::instrument;

use crate::rowids::RowIdSequence;

Expand Down Expand Up @@ -379,16 +379,12 @@ pub fn wrap_with_row_id_and_delete(
let this_offset = offset;
let num_rows = batch_task.num_rows;
offset += num_rows;
let task = batch_task.task;
tokio::spawn(
async move {
let batch = task.await?;
apply_row_id_and_deletes(batch, this_offset, fragment_id, config.as_ref())
}
.in_current_span(),
)
.map(|join_wrapper| join_wrapper.unwrap())
.boxed()
batch_task
.task
.map(move |batch| {
apply_row_id_and_deletes(batch?, this_offset, fragment_id, config.as_ref())
})
.boxed()
})
.boxed()
}
Expand Down
4 changes: 4 additions & 0 deletions rust/lance/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -155,5 +155,9 @@ harness = false
name = "take"
harness = false

[[bench]]
name = "random_access"
harness = false

[lints]
workspace = true
167 changes: 167 additions & 0 deletions rust/lance/benches/random_access.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::sync::Arc;

use arrow_array::{Float64Array, Int64Array, RecordBatch, RecordBatchIterator, StringArray};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use criterion::{criterion_group, criterion_main, Criterion};
use lance::dataset::{Dataset, ProjectionRequest, WriteParams};
use lance_file::version::LanceFileVersion;
use std::collections::HashMap;
use tokio::runtime::Runtime;
use uuid::Uuid;

const TOTAL_ROWS: usize = 500_000;
const BATCH_SIZE: usize = 1024;
const LIMIT: i64 = 10_000;
const SHIP_MODES: [&str; 5] = ["FOB", "RAIL", "AIR", "MAIL", "TRUCK"];
const ROW_IDS: [u64; 5] = [1, 40, 100, 130, 200];

/// Entry point: prepares one dataset per format configuration, then runs the
/// filtered-scan and random-take benchmarks against each of them.
fn bench_random_access(c: &mut Criterion) {
    let runtime = Runtime::new().expect("failed to build tokio runtime");

    // Build every dataset up front so benchmark timing never includes setup.
    let v2_0 = runtime.block_on(prepare_dataset(LanceFileVersion::V2_0, true));
    let v2_1_fsst = runtime.block_on(prepare_dataset(LanceFileVersion::V2_1, true));
    let v2_1_plain = runtime.block_on(prepare_dataset(LanceFileVersion::V2_1, false));

    for (dataset, label) in [
        (v2_0, "V2_0"),
        (v2_1_fsst, "V2_1 (FSST)"),
        (v2_1_plain, "V2_1 (FSST disabled)"),
    ] {
        benchmark_dataset(&runtime, c, dataset, label);
    }
}

/// Runs both benchmark flavours (filtered scan and random take) for a single
/// dataset, sharing it via `Arc` between the two.
fn benchmark_dataset(rt: &Runtime, c: &mut Criterion, dataset: Dataset, label: &str) {
    let shared = Arc::new(dataset);
    bench_filtered_scan(rt, c, Arc::clone(&shared), label);
    bench_random_take(rt, c, shared, label);
}

fn bench_filtered_scan(rt: &Runtime, c: &mut Criterion, dataset: Arc<Dataset>, label: &str) {
let bench_name = format!("{label} Filtered Scan ({LIMIT} limit)");
c.bench_function(&bench_name, |b| {
let dataset = dataset.clone();
b.to_async(rt).iter(move || {
let dataset = dataset.clone();
async move {
let batch = dataset
.scan()
.filter("l_shipmode = 'FOB'")
.expect("failed to apply filter")
.limit(Some(LIMIT), None)
.expect("failed to set limit")
.try_into_batch()
.await
.expect("scan execution failed");
assert_eq!(batch.num_rows(), LIMIT as usize);
}
});
});
}

fn bench_random_take(rt: &Runtime, c: &mut Criterion, dataset: Arc<Dataset>, label: &str) {
let bench_name = format!("{label} Random Take {} rows", ROW_IDS.len());
let projection = Arc::new(dataset.schema().clone());
c.bench_function(&bench_name, |b| {
let dataset = dataset.clone();
let projection = projection.clone();
b.to_async(rt).iter(move || {
let dataset = dataset.clone();
let projection = projection.clone();
async move {
let batch = dataset
.take_rows(&ROW_IDS, ProjectionRequest::Schema(projection.clone()))
.await
.expect("take_rows failed");
assert_eq!(batch.num_rows(), ROW_IDS.len());
}
});
});
}

/// Builds a non-nullable Utf8 field whose metadata disables compression
/// (and therefore FSST) for the column.
fn utf8_field_without_fsst(name: &str) -> Field {
    let metadata = HashMap::from([(
        "lance-encoding:compression".to_string(),
        "none".to_string(),
    )]);
    Field::new(name, DataType::Utf8, false).with_metadata(metadata)
}

/// Chooses the Utf8 field flavour: FSST stays enabled only when requested AND
/// the file version is at least V2.1; everything else opts out via metadata.
fn utf8_field_for(version: LanceFileVersion, enable_fsst: bool, name: &str) -> Field {
    let fsst_available = version >= LanceFileVersion::V2_1;
    if enable_fsst && fsst_available {
        Field::new(name, DataType::Utf8, false)
    } else {
        utf8_field_without_fsst(name)
    }
}

/// Writes an in-memory, lineitem-like dataset using the requested file
/// version, returning the freshly written `Dataset`.
async fn prepare_dataset(version: LanceFileVersion, enable_fsst: bool) -> Dataset {
    let schema = Arc::new(ArrowSchema::new(vec![
        Field::new("l_orderkey", DataType::Int64, false),
        utf8_field_for(version, enable_fsst, "l_shipmode"),
        Field::new("l_extendedprice", DataType::Float64, false),
        utf8_field_for(version, enable_fsst, "l_comment"),
    ]));

    let reader = RecordBatchIterator::new(
        generate_batches(schema.clone()).into_iter().map(Ok),
        schema,
    );

    // Unique in-memory URI so repeated runs never collide with one another.
    let uri = format!(
        "memory://random-access-{}-{}",
        version_label(version),
        Uuid::new_v4()
    );

    let params = WriteParams {
        data_storage_version: Some(version),
        ..Default::default()
    };

    Dataset::write(reader, uri.as_str(), Some(params))
        .await
        .expect("failed to write dataset")
}

/// Produces deterministic record batches totalling `TOTAL_ROWS` rows, each
/// batch holding at most `BATCH_SIZE` rows.
fn generate_batches(schema: Arc<ArrowSchema>) -> Vec<RecordBatch> {
    (0..TOTAL_ROWS)
        .step_by(BATCH_SIZE)
        .map(|start| {
            let end = usize::min(start + BATCH_SIZE, TOTAL_ROWS);

            // Order key is simply the absolute row index.
            let order_key = Int64Array::from_iter_values((start as i64)..(end as i64));
            // Ship modes cycle through the fixed list, keyed by row index.
            let ship_mode = StringArray::from_iter_values(
                (start..end).map(|idx| SHIP_MODES[idx % SHIP_MODES.len()].to_string()),
            );
            // Price is a deterministic function of the row index.
            let extended_price = Float64Array::from_iter_values(
                (start..end).map(|idx| (idx % 10_000) as f64 * 1.5 + 42.0),
            );
            let comment = StringArray::from_iter_values(
                (start..end).map(|idx| format!("Shipment comment #{idx}")),
            );

            RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(order_key),
                    Arc::new(ship_mode),
                    Arc::new(extended_price),
                    Arc::new(comment),
                ],
            )
            .expect("failed to build record batch")
        })
        .collect()
}

/// Short, URI-friendly tag for a file version (used in the memory:// URI).
fn version_label(version: LanceFileVersion) -> &'static str {
    if version == LanceFileVersion::V2_0 {
        "v2_0"
    } else if version == LanceFileVersion::V2_1 {
        "v2_1"
    } else {
        "other"
    }
}

// Register the benchmark entry point with criterion's harness and generate
// the binary's `main` (Cargo.toml sets `harness = false` for this bench).
criterion_group!(benches, bench_random_access);
criterion_main!(benches);
Loading