From 98272a3d8c9195e214be80eda56bd7292bcd7f4c Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 6 Nov 2025 17:42:29 +0800 Subject: [PATCH 1/8] add bench cases for rust Signed-off-by: Xuanwo --- rust/lance/Cargo.toml | 4 + rust/lance/benches/random_access.rs | 159 ++++++++++++++++++++++++++++ 2 files changed, 163 insertions(+) create mode 100644 rust/lance/benches/random_access.rs diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index 56770ed306b..d92c9cb7532 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -154,5 +154,9 @@ harness = false name = "take" harness = false +[[bench]] +name = "random_access" +harness = false + [lints] workspace = true diff --git a/rust/lance/benches/random_access.rs b/rust/lance/benches/random_access.rs new file mode 100644 index 00000000000..e8d0f9e1a6f --- /dev/null +++ b/rust/lance/benches/random_access.rs @@ -0,0 +1,159 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::sync::Arc; + +use arrow_array::{Float64Array, Int64Array, RecordBatch, RecordBatchIterator, StringArray}; +use arrow_schema::{DataType, Field, Schema as ArrowSchema}; +use criterion::{criterion_group, criterion_main, Criterion}; +use lance::dataset::{Dataset, ProjectionRequest, WriteParams}; +use lance_file::version::LanceFileVersion; +use std::collections::HashMap; +use tokio::runtime::Runtime; +use uuid::Uuid; + +const TOTAL_ROWS: usize = 500_000; +const BATCH_SIZE: usize = 1024; +const LIMIT: i64 = 10_000; +const SHIP_MODES: [&str; 5] = ["FOB", "RAIL", "AIR", "MAIL", "TRUCK"]; +const ROW_IDS: [u64; 5] = [1, 40, 100, 130, 200]; + +fn bench_random_access(c: &mut Criterion) { + let runtime = Runtime::new().expect("failed to build tokio runtime"); + + let dataset_v2_0 = runtime.block_on(prepare_dataset(LanceFileVersion::V2_0)); + let dataset_v2_1 = runtime.block_on(prepare_dataset(LanceFileVersion::V2_1)); + + // benchmark_dataset(&runtime, c, dataset_v2_0, "V2_0"); + benchmark_dataset(&runtime, c, dataset_v2_1, "V2_1"); +} + +fn benchmark_dataset(rt: &Runtime, c: &mut Criterion, dataset: Dataset, label: &str) { + let dataset = Arc::new(dataset); + bench_filtered_scan(rt, c, dataset.clone(), label); + // bench_random_take(rt, c, dataset, label); +} + +fn bench_filtered_scan(rt: &Runtime, c: &mut Criterion, dataset: Arc, label: &str) { + let bench_name = format!("{label} Filtered Scan ({LIMIT} limit)"); + c.bench_function(&bench_name, |b| { + let dataset = dataset.clone(); + b.to_async(rt).iter(move || { + let dataset = dataset.clone(); + async move { + let batch = dataset + .scan() + .filter("l_shipmode = 'FOB'") + .expect("failed to apply filter") + .limit(Some(LIMIT), None) + .expect("failed to set limit") + .try_into_batch() + .await + .expect("scan execution failed"); + assert_eq!(batch.num_rows(), LIMIT as usize); + } + }); + }); +} + +fn bench_random_take(rt: &Runtime, c: &mut Criterion, dataset: Arc, label: &str) { + let bench_name = format!("{label} Random Take {} rows", ROW_IDS.len()); + let projection = Arc::new(dataset.schema().clone()); + c.bench_function(&bench_name, |b| { + let dataset = dataset.clone(); + let projection = projection.clone(); + b.to_async(rt).iter(move || { + let dataset = dataset.clone(); + let projection = projection.clone(); + async move { + let batch = dataset + .take_rows(&ROW_IDS, ProjectionRequest::Schema(projection.clone())) + .await + .expect("take_rows failed"); + assert_eq!(batch.num_rows(), ROW_IDS.len()); + } + }); + }); +} + +fn utf8_field_without_fsst(name: &str) -> Field { + let mut metadata = HashMap::new(); + metadata.insert("lance-encoding:compression".to_string(), "none".to_string()); + Field::new(name, DataType::Utf8, false).with_metadata(metadata) +} + +async fn prepare_dataset(version: LanceFileVersion) -> Dataset { + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("l_orderkey", DataType::Int64, false), + // Field::new("l_shipmode", DataType::Utf8, false), + utf8_field_without_fsst("l_shipmode"), + Field::new("l_extendedprice", DataType::Float64, false), + // Field::new("l_comment", DataType::Utf8, false), + utf8_field_without_fsst("l_comment"), + ])); + + let batches = generate_batches(schema.clone()); + let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema); + + let params = WriteParams { + data_storage_version: Some(version), + ..Default::default() + }; + + let uri = format!( + "memory://random-access-{}-{}", + version_label(version), + Uuid::new_v4() + ); + + Dataset::write(reader, uri.as_str(), Some(params)) + .await + .expect("failed to write dataset") +} + +fn generate_batches(schema: Arc) -> Vec { + let mut batches = Vec::with_capacity((TOTAL_ROWS + BATCH_SIZE - 1) / BATCH_SIZE); + let mut start = 0usize; + + while start < TOTAL_ROWS { + let end = usize::min(start + BATCH_SIZE, TOTAL_ROWS); + let order_key = Int64Array::from_iter_values((start as i64)..(end as i64)); + let ship_mode = StringArray::from_iter_values( + (start..end).map(|idx| SHIP_MODES[idx % SHIP_MODES.len()].to_string()), + ); + let extended_price = Float64Array::from_iter_values((start..end).map(|idx| { + let base = (idx % 10_000) as f64; + base * 1.5 + 42.0 + })); + let comment = StringArray::from_iter_values( + (start..end).map(|idx| format!("Shipment comment #{idx}")), + ); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(order_key), + Arc::new(ship_mode), + Arc::new(extended_price), + Arc::new(comment), + ], + ) + .expect("failed to build record batch"); + + batches.push(batch); + start = end; + } + + batches +} + +fn version_label(version: LanceFileVersion) -> &'static str { + match version { + LanceFileVersion::V2_0 => "v2_0", + LanceFileVersion::V2_1 => "v2_1", + _ => "other", + } +} + +criterion_group!(benches, bench_random_access); +criterion_main!(benches); From dd9050cffd697243eab2728823fae00b95bd8faf Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 6 Nov 2025 17:42:54 +0800 Subject: [PATCH 2/8] compare between 2.0 and 2.1 Signed-off-by: Xuanwo --- python/python/benchmarks/test_random_access.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/python/benchmarks/test_random_access.py b/python/python/benchmarks/test_random_access.py index b13a81070c3..14dcddb299b 100644 --- a/python/python/benchmarks/test_random_access.py +++ b/python/python/benchmarks/test_random_access.py @@ -10,8 +10,8 @@ # specifically for random access scans tab = pq.read_table("~/lineitemsf1.snappy.parquet") -dsv1 = lance.write_dataset(tab, "/tmp/lineitem.lancev1", use_legacy_format=True) -dsv2 = lance.write_dataset(tab, "/tmp/lineitem.lancev2", use_legacy_format=False) +dsv1 = lance.write_dataset(tab, "/tmp/lineitem.lancev1", data_storage_version="2.0") +dsv2 = lance.write_dataset(tab, "/tmp/lineitem.lancev2", data_storage_version="2.1") dsv1 = lance.dataset("/tmp/lineitem.lancev1") dsv2 = lance.dataset("/tmp/lineitem.lancev2") From 2951c3a7d78d5bb4e49cfa4d1b8a7252f68a95dc Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 6 Nov 2025 17:51:56 +0800 Subject: [PATCH 3/8] Make clippy happy Signed-off-by: Xuanwo --- rust/lance/benches/random_access.rs | 30 ++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/rust/lance/benches/random_access.rs b/rust/lance/benches/random_access.rs index e8d0f9e1a6f..7ead6c2045c 100644 --- a/rust/lance/benches/random_access.rs +++ b/rust/lance/benches/random_access.rs @@ -21,17 +21,19 @@ const ROW_IDS: [u64; 5] = [1, 40, 100, 130, 200]; fn bench_random_access(c: &mut Criterion) { let runtime = Runtime::new().expect("failed to build tokio runtime"); - let dataset_v2_0 = runtime.block_on(prepare_dataset(LanceFileVersion::V2_0)); - let dataset_v2_1 = runtime.block_on(prepare_dataset(LanceFileVersion::V2_1)); + let dataset_v2_0 = runtime.block_on(prepare_dataset(LanceFileVersion::V2_0, true)); + let dataset_v2_1_fsst = runtime.block_on(prepare_dataset(LanceFileVersion::V2_1, true)); + let dataset_v2_1_no_fsst = runtime.block_on(prepare_dataset(LanceFileVersion::V2_1, false)); - // benchmark_dataset(&runtime, c, dataset_v2_0, "V2_0"); - benchmark_dataset(&runtime, c, dataset_v2_1, "V2_1"); + benchmark_dataset(&runtime, c, dataset_v2_0, "V2_0"); + benchmark_dataset(&runtime, c, dataset_v2_1_fsst, "V2_1 (FSST)"); + benchmark_dataset(&runtime, c, dataset_v2_1_no_fsst, "V2_1 (FSST disabled)"); } fn benchmark_dataset(rt: &Runtime, c: &mut Criterion, dataset: Dataset, label: &str) { let dataset = Arc::new(dataset); bench_filtered_scan(rt, c, dataset.clone(), label); - // bench_random_take(rt, c, dataset, label); + bench_random_take(rt, c, dataset, label); } fn bench_filtered_scan(rt: &Runtime, c: &mut Criterion, dataset: Arc, label: &str) { @@ -82,14 +84,20 @@ fn utf8_field_without_fsst(name: &str) -> Field { Field::new(name, DataType::Utf8, false).with_metadata(metadata) } -async fn prepare_dataset(version: LanceFileVersion) -> Dataset { +fn utf8_field_for(version: LanceFileVersion, enable_fsst: bool, name: &str) -> Field { + if enable_fsst && version >= LanceFileVersion::V2_1 { + Field::new(name, DataType::Utf8, false) + } else { + utf8_field_without_fsst(name) + } +} + +async fn prepare_dataset(version: LanceFileVersion, enable_fsst: bool) -> Dataset { let schema = Arc::new(ArrowSchema::new(vec![ Field::new("l_orderkey", DataType::Int64, false), - // Field::new("l_shipmode", DataType::Utf8, false), - utf8_field_without_fsst("l_shipmode"), + utf8_field_for(version, enable_fsst, "l_shipmode"), Field::new("l_extendedprice", DataType::Float64, false), - // Field::new("l_comment", DataType::Utf8, false), - utf8_field_without_fsst("l_comment"), + utf8_field_for(version, enable_fsst, "l_comment"), ])); let batches = generate_batches(schema.clone()); @@ -112,7 +120,7 @@ async fn prepare_dataset(version: LanceFileVersion) -> Dataset { } fn generate_batches(schema: Arc) -> Vec { - let mut batches = Vec::with_capacity((TOTAL_ROWS + BATCH_SIZE - 1) / BATCH_SIZE); + let mut batches = Vec::with_capacity(TOTAL_ROWS.div_ceil(BATCH_SIZE)); let mut start = 0usize; while start < TOTAL_ROWS { From 1990809a9c523932720db778253ac719f99f4f8e Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 6 Nov 2025 18:41:27 +0800 Subject: [PATCH 4/8] optimize Signed-off-by: Xuanwo --- rust/lance-encoding/src/data.rs | 68 +++++++++++++++++++++--------- rust/lance-encoding/src/decoder.rs | 5 ++- 2 files changed, 52 insertions(+), 21 deletions(-) diff --git a/rust/lance-encoding/src/data.rs b/rust/lance-encoding/src/data.rs index 3e495979f24..b097df3fc4d 100644 --- a/rust/lance-encoding/src/data.rs +++ b/rust/lance-encoding/src/data.rs @@ -25,9 +25,7 @@ use arrow_array::{ types::{ArrowDictionaryKeyType, UInt16Type, UInt32Type, UInt64Type, UInt8Type}, Array, ArrayRef, OffsetSizeTrait, UInt64Array, }; -use arrow_buffer::{ - ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer, -}; +use arrow_buffer::{ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer}; use arrow_data::{ArrayData, ArrayDataBuilder}; use arrow_schema::DataType; use lance_arrow::DataTypeExt; @@ -233,25 +231,57 @@ impl DataBlockBuilderImpl for VariableWidthDataBlockBuilder< let block = data_block.as_variable_width_ref().unwrap(); assert!(block.bits_per_offset == T::get_byte_width() as u8 * 8); - let offsets: ScalarBuffer = block.offsets.clone().borrow_to_typed_slice(); + let selection_start = selection.start as usize; + let selection_end = selection.end as usize; + if selection_start == selection_end { + return; + } - let start_offset = offsets[selection.start as usize]; - let end_offset = offsets[selection.end as usize]; - let mut previous_len = self.bytes.len(); + let offsets_bytes = block.offsets.as_ref(); + let offset_size = std::mem::size_of::(); + + // TODO: refactor this into a better public API for buffer. + fn read_offset(bytes: &[u8], stride: usize, index: usize) -> usize { + let start = index * stride; + let end = start + stride; + let chunk = &bytes[start..end]; + match stride { + 4 => { + let arr: [u8; 4] = chunk + .try_into() + .expect("invalid 32-bit offset slice length"); + u32::from_le_bytes(arr) as usize + } + 8 => { + let arr: [u8; 8] = chunk + .try_into() + .expect("invalid 64-bit offset slice length"); + u64::from_le_bytes(arr) as usize + } + _ => unreachable!("unsupported offset width"), + } + } - self.bytes - .extend_from_slice(&block.data[start_offset.as_usize()..end_offset.as_usize()]); + let start_offset = read_offset(offsets_bytes, offset_size, selection_start); + let end_offset = read_offset(offsets_bytes, offset_size, selection_end); - self.offsets.extend( - offsets[selection.start as usize..selection.end as usize] - .iter() - .zip(&offsets[selection.start as usize + 1..=selection.end as usize]) - .map(|(¤t, &next)| { - let this_value_len = next - current; - previous_len += this_value_len.as_usize(); - T::from_usize(previous_len).unwrap() - }), - ); + let bytes_to_copy = end_offset - start_offset; + self.bytes.reserve(bytes_to_copy); + self.offsets.reserve(selection_end - selection_start); + + let previous_len = self.bytes.len(); + self.bytes + .extend_from_slice(&block.data[start_offset..end_offset]); + + let mut running_len = previous_len; + let mut prev_offset = start_offset; + for idx in selection_start + 1..=selection_end { + let next_offset = read_offset(offsets_bytes, offset_size, idx); + running_len += next_offset - prev_offset; + self.offsets + .push(T::from_usize(running_len).expect("offset overflow")); + prev_offset = next_offset; + } } fn finish(self: Box) -> DataBlock { diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 50468277bda..8b5a4594f41 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -227,6 +227,7 @@ use lance_arrow::DataTypeExt; use lance_core::cache::LanceCache; use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD}; use lance_core::utils::futures::FinallyStreamExt; +use lance_core::utils::tokio::spawn_cpu; use log::{debug, trace, warn}; use snafu::location; use tokio::sync::mpsc::error::SendError; @@ -1437,7 +1438,7 @@ impl BatchDecodeStream { let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone(); let task = async move { let next_task = next_task?; - next_task.into_batch(emitted_batch_size_warning) + spawn_cpu(move || next_task.into_batch(emitted_batch_size_warning)).await }; (task, num_rows) }); @@ -1760,7 +1761,7 @@ impl StructuralBatchDecodeStream { let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone(); let task = async move { let next_task = next_task?; - next_task.into_batch(emitted_batch_size_warning) + spawn_cpu(move || next_task.into_batch(emitted_batch_size_warning)).await }; (task, num_rows) }); From 19d7264f8c91aedc60f17bb5e3aee9b8ab373972 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 7 Nov 2025 18:43:15 +0800 Subject: [PATCH 5/8] Use tokio spawn instead Signed-off-by: Xuanwo --- rust/lance-encoding/src/decoder.rs | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 8b5a4594f41..70730d21371 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -227,7 +227,6 @@ use lance_arrow::DataTypeExt; use lance_core::cache::LanceCache; use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD}; use lance_core::utils::futures::FinallyStreamExt; -use lance_core::utils::tokio::spawn_cpu; use log::{debug, trace, warn}; use snafu::location; use tokio::sync::mpsc::error::SendError; @@ -1438,7 +1437,15 @@ impl BatchDecodeStream { let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone(); let task = async move { let next_task = next_task?; - spawn_cpu(move || next_task.into_batch(emitted_batch_size_warning)).await + // Real decode work happens inside into_batch, which can block the current + // thread for a long time. By spawning it as a new task, we allow Tokio's + // worker threads to keep making progress. + tokio::spawn(async move { next_task.into_batch(emitted_batch_size_warning) }) + .await + .map_err(|err| Error::Wrapped { + error: err.into(), + location: location!(), + })? }; (task, num_rows) }); @@ -1761,7 +1768,15 @@ impl StructuralBatchDecodeStream { let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone(); let task = async move { let next_task = next_task?; - spawn_cpu(move || next_task.into_batch(emitted_batch_size_warning)).await + // Real decode work happens inside into_batch, which can block the current + // thread for a long time. By spawning it as a new task, we allow Tokio's + // worker threads to keep making progress. + tokio::spawn(async move { next_task.into_batch(emitted_batch_size_warning) }) + .await + .map_err(|err| Error::Wrapped { + error: err.into(), + location: location!(), + })? }; (task, num_rows) }); From 1bd887f743ad0712c327082eedfc92b1142d12dd Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sat, 8 Nov 2025 03:13:53 +0800 Subject: [PATCH 6/8] Let's rock Signed-off-by: Xuanwo --- rust/lance-encoding/Cargo.toml | 2 +- rust/lance-encoding/src/buffer.rs | 41 ++++++++++++++----- rust/lance-encoding/src/data.rs | 67 ++++++++----------------------- 3 files changed, 48 insertions(+), 62 deletions(-) diff --git a/rust/lance-encoding/Cargo.toml b/rust/lance-encoding/Cargo.toml index 27278667a6f..2e233b170ca 100644 --- a/rust/lance-encoding/Cargo.toml +++ b/rust/lance-encoding/Cargo.toml @@ -38,7 +38,7 @@ strum = { workspace =true, features = ["derive"] } tokio.workspace = true tracing.workspace = true xxhash-rust = { version = "0.8.15", features = ["xxh3"] } -bytemuck = "1.14" +bytemuck = { version = "1.14", features = ["extern_crate_alloc"] } byteorder.workspace = true lz4 = { version = "1", optional = true } zstd = { version = "0.13", optional = true } diff --git a/rust/lance-encoding/src/buffer.rs b/rust/lance-encoding/src/buffer.rs index 6808f9f07b7..6912ffe8c4a 100644 --- a/rust/lance-encoding/src/buffer.rs +++ b/rust/lance-encoding/src/buffer.rs @@ -5,9 +5,10 @@ use std::{ops::Deref, panic::RefUnwindSafe, ptr::NonNull, sync::Arc}; -use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer}; +use arrow_buffer::{ArrowNativeType, Buffer, ScalarBuffer}; use lance_core::{utils::bit::is_pwr_two, Error, Result}; use snafu::location; +use std::borrow::Cow; /// A copy-on-write byte buffer. /// @@ -115,11 +116,6 @@ impl LanceBuffer { self.0.into_vec::().unwrap().into() } - /// Creates an owned copy of the buffer, will always involve a full copy of the bytes - pub fn to_owned(&self) -> Self { - Self(Buffer::from_vec(self.0.to_vec())) - } - /// Make an owned copy of the buffer (always does a copy of the data) pub fn deep_copy(&self) -> Self { Self(Buffer::from_vec(self.0.to_vec())) @@ -171,11 +167,34 @@ impl LanceBuffer { if is_aligned { ScalarBuffer::::from(self.clone().into_buffer()) } else { - let num_values = self.len() / std::mem::size_of::(); - let vec = Vec::::with_capacity(num_values); - let mut bytes = MutableBuffer::from(vec); - bytes.extend_from_slice(self); - ScalarBuffer::::from(Buffer::from(bytes)) + ScalarBuffer::::from(self.deep_copy().into_buffer()) + } + } + + /// Reinterprets a LanceBuffer into a &[T] + /// + /// Unlike [`borrow_to_typed_slice`], this function returns a `Cow<'_, [T]>` instead of an owned + /// buffer. It saves the cost of Arc creation and destruction, which can be really helpful when + /// we borrow data and just drop it without reusing it. + /// + /// Caller should decide which way to use based on their own needs. + /// + /// If the underlying buffer is not properly aligned, this will involve a copy of the data + /// + /// Note: doing this sort of re-interpretation generally makes assumptions about the endianness + /// of the data. Lance does not support big-endian machines so this is safe. However, if we end + /// up supporting big-endian machines in the future, then any use of this method will need to be + /// carefully reviewed. + pub fn borrow_to_typed_view(&self) -> Cow<'_, [T]> { + let align = std::mem::align_of::(); + if self.len() % std::mem::size_of::() != 0 { + panic!("attempt to view data type of size {} but we have {} bytes which isn't evenly divisible", std::mem::size_of::(), self.len()); + } + + if self.as_ptr().align_offset(align) == 0 { + Cow::Borrowed(bytemuck::cast_slice(&self.0)) + } else { + Cow::Owned(bytemuck::cast_vec(self.0.to_vec())) } } diff --git a/rust/lance-encoding/src/data.rs b/rust/lance-encoding/src/data.rs index b097df3fc4d..32144ad07e9 100644 --- a/rust/lance-encoding/src/data.rs +++ b/rust/lance-encoding/src/data.rs @@ -226,62 +226,29 @@ impl VariableWidthDataBlockBuilder { } } -impl DataBlockBuilderImpl for VariableWidthDataBlockBuilder { +impl DataBlockBuilderImpl for VariableWidthDataBlockBuilder { fn append(&mut self, data_block: &DataBlock, selection: Range) { let block = data_block.as_variable_width_ref().unwrap(); assert!(block.bits_per_offset == T::get_byte_width() as u8 * 8); + let offsets = block.offsets.borrow_to_typed_view::(); - let selection_start = selection.start as usize; - let selection_end = selection.end as usize; - if selection_start == selection_end { - return; - } - - let offsets_bytes = block.offsets.as_ref(); - let offset_size = std::mem::size_of::(); - - // TODO: refactor this into a better public API for buffer. - fn read_offset(bytes: &[u8], stride: usize, index: usize) -> usize { - let start = index * stride; - let end = start + stride; - let chunk = &bytes[start..end]; - match stride { - 4 => { - let arr: [u8; 4] = chunk - .try_into() - .expect("invalid 32-bit offset slice length"); - u32::from_le_bytes(arr) as usize - } - 8 => { - let arr: [u8; 8] = chunk - .try_into() - .expect("invalid 64-bit offset slice length"); - u64::from_le_bytes(arr) as usize - } - _ => unreachable!("unsupported offset width"), - } - } - - let start_offset = read_offset(offsets_bytes, offset_size, selection_start); - let end_offset = read_offset(offsets_bytes, offset_size, selection_end); + let start_offset = offsets[selection.start as usize]; + let end_offset = offsets[selection.end as usize]; + let mut previous_len = self.bytes.len(); - let bytes_to_copy = end_offset - start_offset; - self.bytes.reserve(bytes_to_copy); - self.offsets.reserve(selection_end - selection_start); - - let previous_len = self.bytes.len(); self.bytes - .extend_from_slice(&block.data[start_offset..end_offset]); - - let mut running_len = previous_len; - let mut prev_offset = start_offset; - for idx in selection_start + 1..=selection_end { - let next_offset = read_offset(offsets_bytes, offset_size, idx); - running_len += next_offset - prev_offset; - self.offsets - .push(T::from_usize(running_len).expect("offset overflow")); - prev_offset = next_offset; - } + .extend_from_slice(&block.data[start_offset.as_usize()..end_offset.as_usize()]); + + self.offsets.extend( + offsets[selection.start as usize..selection.end as usize] + .iter() + .zip(&offsets[selection.start as usize + 1..=selection.end as usize]) + .map(|(¤t, &next)| { + let this_value_len = next - current; + previous_len += this_value_len.as_usize(); + T::from_usize(previous_len).unwrap() + }), + ); } fn finish(self: Box) -> DataBlock { From 98d6c07bd5d8efc190c3314f1631c6cd1aa467c6 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sat, 8 Nov 2025 12:47:28 +0800 Subject: [PATCH 7/8] Fix buffer Signed-off-by: Xuanwo --- rust/lance-encoding/src/buffer.rs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/rust/lance-encoding/src/buffer.rs b/rust/lance-encoding/src/buffer.rs index 6912ffe8c4a..d9d32bb0794 100644 --- a/rust/lance-encoding/src/buffer.rs +++ b/rust/lance-encoding/src/buffer.rs @@ -5,7 +5,7 @@ use std::{ops::Deref, panic::RefUnwindSafe, ptr::NonNull, sync::Arc}; -use arrow_buffer::{ArrowNativeType, Buffer, ScalarBuffer}; +use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer}; use lance_core::{utils::bit::is_pwr_two, Error, Result}; use snafu::location; use std::borrow::Cow; @@ -109,13 +109,6 @@ impl LanceBuffer { } } - /// Convert a buffer into a bytes::Bytes object - /// - /// This convert is zero cost. - pub fn into_bytes(self) -> bytes::Bytes { - self.0.into_vec::().unwrap().into() - } - /// Make an owned copy of the buffer (always does a copy of the data) pub fn deep_copy(&self) -> Self { Self(Buffer::from_vec(self.0.to_vec())) @@ -167,7 +160,11 @@ impl LanceBuffer { if is_aligned { ScalarBuffer::::from(self.clone().into_buffer()) } else { - ScalarBuffer::::from(self.deep_copy().into_buffer()) + let num_values = self.len() / std::mem::size_of::(); + let vec = Vec::::with_capacity(num_values); + let mut bytes = MutableBuffer::from(vec); + bytes.extend_from_slice(self); + ScalarBuffer::::from(Buffer::from(bytes)) } } @@ -194,7 +191,7 @@ impl LanceBuffer { if self.as_ptr().align_offset(align) == 0 { Cow::Borrowed(bytemuck::cast_slice(&self.0)) } else { - Cow::Owned(bytemuck::cast_vec(self.0.to_vec())) + Cow::Owned(bytemuck::pod_collect_to_vec(self.0.as_slice())) } } From ed34990489c18b695fe5dfdb3063153372266431 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 14 Nov 2025 17:49:53 +0800 Subject: [PATCH 8/8] Remove spawn in `wrap_with_row_id_and_delete` Signed-off-by: Xuanwo --- rust/lance-table/src/utils/stream.rs | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/rust/lance-table/src/utils/stream.rs b/rust/lance-table/src/utils/stream.rs index 97aa3151cd9..e0ee403749c 100644 --- a/rust/lance-table/src/utils/stream.rs +++ b/rust/lance-table/src/utils/stream.rs @@ -17,7 +17,7 @@ use lance_core::{ ROW_LAST_UPDATED_AT_VERSION_FIELD, }; use lance_io::ReadBatchParams; -use tracing::{instrument, Instrument}; +use tracing::instrument; use crate::rowids::RowIdSequence; @@ -379,16 +379,12 @@ pub fn wrap_with_row_id_and_delete( let this_offset = offset; let num_rows = batch_task.num_rows; offset += num_rows; - let task = batch_task.task; - tokio::spawn( - async move { - let batch = task.await?; - apply_row_id_and_deletes(batch, this_offset, fragment_id, config.as_ref()) - } - .in_current_span(), - ) - .map(|join_wrapper| join_wrapper.unwrap()) - .boxed() + batch_task + .task + .map(move |batch| { + apply_row_id_and_deletes(batch?, this_offset, fragment_id, config.as_ref()) + }) + .boxed() }) .boxed() }