diff --git a/python/python/benchmarks/test_random_access.py b/python/python/benchmarks/test_random_access.py index b13a81070c3..14dcddb299b 100644 --- a/python/python/benchmarks/test_random_access.py +++ b/python/python/benchmarks/test_random_access.py @@ -10,8 +10,8 @@ # specifically for random access scans tab = pq.read_table("~/lineitemsf1.snappy.parquet") -dsv1 = lance.write_dataset(tab, "/tmp/lineitem.lancev1", use_legacy_format=True) -dsv2 = lance.write_dataset(tab, "/tmp/lineitem.lancev2", use_legacy_format=False) +dsv1 = lance.write_dataset(tab, "/tmp/lineitem.lancev1", data_storage_version="2.0") +dsv2 = lance.write_dataset(tab, "/tmp/lineitem.lancev2", data_storage_version="2.1") dsv1 = lance.dataset("/tmp/lineitem.lancev1") dsv2 = lance.dataset("/tmp/lineitem.lancev2") diff --git a/rust/lance-encoding/Cargo.toml b/rust/lance-encoding/Cargo.toml index 27278667a6f..2e233b170ca 100644 --- a/rust/lance-encoding/Cargo.toml +++ b/rust/lance-encoding/Cargo.toml @@ -38,7 +38,7 @@ strum = { workspace =true, features = ["derive"] } tokio.workspace = true tracing.workspace = true xxhash-rust = { version = "0.8.15", features = ["xxh3"] } -bytemuck = "1.14" +bytemuck = { version = "1.14", features = ["extern_crate_alloc"] } byteorder.workspace = true lz4 = { version = "1", optional = true } zstd = { version = "0.13", optional = true } diff --git a/rust/lance-encoding/src/buffer.rs b/rust/lance-encoding/src/buffer.rs index 6808f9f07b7..d9d32bb0794 100644 --- a/rust/lance-encoding/src/buffer.rs +++ b/rust/lance-encoding/src/buffer.rs @@ -8,6 +8,7 @@ use std::{ops::Deref, panic::RefUnwindSafe, ptr::NonNull, sync::Arc}; use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer}; use lance_core::{utils::bit::is_pwr_two, Error, Result}; use snafu::location; +use std::borrow::Cow; /// A copy-on-write byte buffer. /// @@ -108,18 +109,6 @@ impl LanceBuffer { } } - /// Convert a buffer into a bytes::Bytes object - /// - /// This convert is zero cost. 
- pub fn into_bytes(self) -> bytes::Bytes { - self.0.into_vec::<u8>().unwrap().into() - } - - /// Creates an owned copy of the buffer, will always involve a full copy of the bytes - pub fn to_owned(&self) -> Self { - Self(Buffer::from_vec(self.0.to_vec())) - } - /// Make an owned copy of the buffer (always does a copy of the data) pub fn deep_copy(&self) -> Self { Self(Buffer::from_vec(self.0.to_vec())) } @@ -179,6 +168,33 @@ impl LanceBuffer { } } + /// Reinterprets a LanceBuffer into a &[T] + /// + /// Unlike [`borrow_to_typed_slice`], this function returns a `Cow<'_, [T]>` instead of an owned + /// buffer. It saves the cost of Arc creation and destruction, which can be really helpful when + /// we borrow data and just drop it without reusing it. + /// + /// Caller should decide which way to use based on their own needs. + /// + /// If the underlying buffer is not properly aligned, this will involve a copy of the data + /// + /// Note: doing this sort of re-interpretation generally makes assumptions about the endianness + /// of the data. Lance does not support big-endian machines so this is safe. However, if we end + /// up supporting big-endian machines in the future, then any use of this method will need to be + /// carefully reviewed. 
+ pub fn borrow_to_typed_view<T: bytemuck::Pod>(&self) -> Cow<'_, [T]> { + let align = std::mem::align_of::<T>(); + if self.len() % std::mem::size_of::<T>() != 0 { + panic!("attempt to view data type of size {} but we have {} bytes which isn't evenly divisible", std::mem::size_of::<T>(), self.len()); + } + + if self.as_ptr().align_offset(align) == 0 { + Cow::Borrowed(bytemuck::cast_slice(&self.0)) + } else { + Cow::Owned(bytemuck::pod_collect_to_vec(self.0.as_slice())) + } + } + /// Concatenates multiple buffers into a single buffer, consuming the input buffers /// /// If there is only one buffer, it will be returned as is diff --git a/rust/lance-encoding/src/data.rs b/rust/lance-encoding/src/data.rs index 3e495979f24..32144ad07e9 100644 --- a/rust/lance-encoding/src/data.rs +++ b/rust/lance-encoding/src/data.rs @@ -25,9 +25,7 @@ use arrow_array::{ types::{ArrowDictionaryKeyType, UInt16Type, UInt32Type, UInt64Type, UInt8Type}, Array, ArrayRef, OffsetSizeTrait, UInt64Array, }; -use arrow_buffer::{ - ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer, -}; +use arrow_buffer::{ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer}; use arrow_data::{ArrayData, ArrayDataBuilder}; use arrow_schema::DataType; use lance_arrow::DataTypeExt; @@ -228,12 +226,11 @@ impl VariableWidthDataBlockBuilder { } } -impl<T: ArrowNativeType> DataBlockBuilderImpl for VariableWidthDataBlockBuilder<T> { +impl<T: ArrowNativeType + bytemuck::Pod> DataBlockBuilderImpl for VariableWidthDataBlockBuilder<T> { fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) { let block = data_block.as_variable_width_ref().unwrap(); assert!(block.bits_per_offset == T::get_byte_width() as u8 * 8); - - let offsets: ScalarBuffer<T> = block.offsets.clone().borrow_to_typed_slice(); + let offsets = block.offsets.borrow_to_typed_view::<T>(); let start_offset = offsets[selection.start as usize]; let end_offset = offsets[selection.end as usize]; diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 50468277bda..70730d21371 
100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -1437,7 +1437,15 @@ impl BatchDecodeStream { let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone(); let task = async move { let next_task = next_task?; - next_task.into_batch(emitted_batch_size_warning) + // Real decode work happens inside into_batch, which can block the current + // thread for a long time. By spawning it as a new task, we allow Tokio's + // worker threads to keep making progress. + tokio::spawn(async move { next_task.into_batch(emitted_batch_size_warning) }) + .await + .map_err(|err| Error::Wrapped { + error: err.into(), + location: location!(), + })? }; (task, num_rows) }); @@ -1760,7 +1768,15 @@ impl StructuralBatchDecodeStream { let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone(); let task = async move { let next_task = next_task?; - next_task.into_batch(emitted_batch_size_warning) + // Real decode work happens inside into_batch, which can block the current + // thread for a long time. By spawning it as a new task, we allow Tokio's + // worker threads to keep making progress. + tokio::spawn(async move { next_task.into_batch(emitted_batch_size_warning) }) + .await + .map_err(|err| Error::Wrapped { + error: err.into(), + location: location!(), + })? 
}; (task, num_rows) }); diff --git a/rust/lance-table/src/utils/stream.rs b/rust/lance-table/src/utils/stream.rs index 97aa3151cd9..e0ee403749c 100644 --- a/rust/lance-table/src/utils/stream.rs +++ b/rust/lance-table/src/utils/stream.rs @@ -17,7 +17,7 @@ use lance_core::{ ROW_LAST_UPDATED_AT_VERSION_FIELD, }; use lance_io::ReadBatchParams; -use tracing::{instrument, Instrument}; +use tracing::instrument; use crate::rowids::RowIdSequence; @@ -379,16 +379,12 @@ pub fn wrap_with_row_id_and_delete( let this_offset = offset; let num_rows = batch_task.num_rows; offset += num_rows; - let task = batch_task.task; - tokio::spawn( - async move { - let batch = task.await?; - apply_row_id_and_deletes(batch, this_offset, fragment_id, config.as_ref()) - } - .in_current_span(), - ) - .map(|join_wrapper| join_wrapper.unwrap()) - .boxed() + batch_task + .task + .map(move |batch| { + apply_row_id_and_deletes(batch?, this_offset, fragment_id, config.as_ref()) + }) + .boxed() }) .boxed() } diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index 44093e2f6fa..045d54f289c 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -155,5 +155,9 @@ harness = false name = "take" harness = false +[[bench]] +name = "random_access" +harness = false + [lints] workspace = true diff --git a/rust/lance/benches/random_access.rs b/rust/lance/benches/random_access.rs new file mode 100644 index 00000000000..7ead6c2045c --- /dev/null +++ b/rust/lance/benches/random_access.rs @@ -0,0 +1,167 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::sync::Arc; + +use arrow_array::{Float64Array, Int64Array, RecordBatch, RecordBatchIterator, StringArray}; +use arrow_schema::{DataType, Field, Schema as ArrowSchema}; +use criterion::{criterion_group, criterion_main, Criterion}; +use lance::dataset::{Dataset, ProjectionRequest, WriteParams}; +use lance_file::version::LanceFileVersion; +use std::collections::HashMap; +use 
tokio::runtime::Runtime; +use uuid::Uuid; + +const TOTAL_ROWS: usize = 500_000; +const BATCH_SIZE: usize = 1024; +const LIMIT: i64 = 10_000; +const SHIP_MODES: [&str; 5] = ["FOB", "RAIL", "AIR", "MAIL", "TRUCK"]; +const ROW_IDS: [u64; 5] = [1, 40, 100, 130, 200]; + +fn bench_random_access(c: &mut Criterion) { + let runtime = Runtime::new().expect("failed to build tokio runtime"); + + let dataset_v2_0 = runtime.block_on(prepare_dataset(LanceFileVersion::V2_0, true)); + let dataset_v2_1_fsst = runtime.block_on(prepare_dataset(LanceFileVersion::V2_1, true)); + let dataset_v2_1_no_fsst = runtime.block_on(prepare_dataset(LanceFileVersion::V2_1, false)); + + benchmark_dataset(&runtime, c, dataset_v2_0, "V2_0"); + benchmark_dataset(&runtime, c, dataset_v2_1_fsst, "V2_1 (FSST)"); + benchmark_dataset(&runtime, c, dataset_v2_1_no_fsst, "V2_1 (FSST disabled)"); +} + +fn benchmark_dataset(rt: &Runtime, c: &mut Criterion, dataset: Dataset, label: &str) { + let dataset = Arc::new(dataset); + bench_filtered_scan(rt, c, dataset.clone(), label); + bench_random_take(rt, c, dataset, label); +} + +fn bench_filtered_scan(rt: &Runtime, c: &mut Criterion, dataset: Arc<Dataset>, label: &str) { + let bench_name = format!("{label} Filtered Scan ({LIMIT} limit)"); + c.bench_function(&bench_name, |b| { + let dataset = dataset.clone(); + b.to_async(rt).iter(move || { + let dataset = dataset.clone(); + async move { + let batch = dataset + .scan() + .filter("l_shipmode = 'FOB'") + .expect("failed to apply filter") + .limit(Some(LIMIT), None) + .expect("failed to set limit") + .try_into_batch() + .await + .expect("scan execution failed"); + assert_eq!(batch.num_rows(), LIMIT as usize); + } + }); + }); +} + +fn bench_random_take(rt: &Runtime, c: &mut Criterion, dataset: Arc<Dataset>, label: &str) { + let bench_name = format!("{label} Random Take {} rows", ROW_IDS.len()); + let projection = Arc::new(dataset.schema().clone()); + c.bench_function(&bench_name, |b| { + let dataset = dataset.clone(); + let projection = 
projection.clone(); + b.to_async(rt).iter(move || { + let dataset = dataset.clone(); + let projection = projection.clone(); + async move { + let batch = dataset + .take_rows(&ROW_IDS, ProjectionRequest::Schema(projection.clone())) + .await + .expect("take_rows failed"); + assert_eq!(batch.num_rows(), ROW_IDS.len()); + } + }); + }); +} + +fn utf8_field_without_fsst(name: &str) -> Field { + let mut metadata = HashMap::new(); + metadata.insert("lance-encoding:compression".to_string(), "none".to_string()); + Field::new(name, DataType::Utf8, false).with_metadata(metadata) +} + +fn utf8_field_for(version: LanceFileVersion, enable_fsst: bool, name: &str) -> Field { + if enable_fsst && version >= LanceFileVersion::V2_1 { + Field::new(name, DataType::Utf8, false) + } else { + utf8_field_without_fsst(name) + } +} + +async fn prepare_dataset(version: LanceFileVersion, enable_fsst: bool) -> Dataset { + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("l_orderkey", DataType::Int64, false), + utf8_field_for(version, enable_fsst, "l_shipmode"), + Field::new("l_extendedprice", DataType::Float64, false), + utf8_field_for(version, enable_fsst, "l_comment"), + ])); + + let batches = generate_batches(schema.clone()); + let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema); + + let params = WriteParams { + data_storage_version: Some(version), + ..Default::default() + }; + + let uri = format!( + "memory://random-access-{}-{}", + version_label(version), + Uuid::new_v4() + ); + + Dataset::write(reader, uri.as_str(), Some(params)) + .await + .expect("failed to write dataset") +} + +fn generate_batches(schema: Arc<ArrowSchema>) -> Vec<RecordBatch> { + let mut batches = Vec::with_capacity(TOTAL_ROWS.div_ceil(BATCH_SIZE)); + let mut start = 0usize; + + while start < TOTAL_ROWS { + let end = usize::min(start + BATCH_SIZE, TOTAL_ROWS); + let order_key = Int64Array::from_iter_values((start as i64)..(end as i64)); + let ship_mode = StringArray::from_iter_values( + (start..end).map(|idx| 
SHIP_MODES[idx % SHIP_MODES.len()].to_string()), + ); + let extended_price = Float64Array::from_iter_values((start..end).map(|idx| { + let base = (idx % 10_000) as f64; + base * 1.5 + 42.0 + })); + let comment = StringArray::from_iter_values( + (start..end).map(|idx| format!("Shipment comment #{idx}")), + ); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(order_key), + Arc::new(ship_mode), + Arc::new(extended_price), + Arc::new(comment), + ], + ) + .expect("failed to build record batch"); + + batches.push(batch); + start = end; + } + + batches +} + +fn version_label(version: LanceFileVersion) -> &'static str { + match version { + LanceFileVersion::V2_0 => "v2_0", + LanceFileVersion::V2_1 => "v2_1", + _ => "other", + } +} + +criterion_group!(benches, bench_random_access); +criterion_main!(benches);