From 914171542a51eadca1bfba5cc76c053584372424 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 12 Mar 2026 17:58:34 +0800 Subject: [PATCH 1/3] Add distributed vector merge benchmark --- rust/lance-datafusion/src/utils.rs | 1 + rust/lance/Cargo.toml | 4 + .../lance/benches/distributed_vector_build.rs | 451 ++++++++++++++++++ rust/lance/src/io/exec/knn.rs | 28 +- 4 files changed, 482 insertions(+), 2 deletions(-) create mode 100644 rust/lance/benches/distributed_vector_build.rs diff --git a/rust/lance-datafusion/src/utils.rs b/rust/lance-datafusion/src/utils.rs index f6f28f766f0..c40232bb3d5 100644 --- a/rust/lance-datafusion/src/utils.rs +++ b/rust/lance-datafusion/src/utils.rs @@ -242,5 +242,6 @@ pub const ROWS_SCANNED_METRIC: &str = "rows_scanned"; pub const TASK_WAIT_TIME_METRIC: &str = "task_wait_time"; pub const DELTAS_SEARCHED_METRIC: &str = "deltas_searched"; pub const PARTITIONS_SEARCHED_METRIC: &str = "partitions_searched"; +pub const FIND_PARTITIONS_CALLS_METRIC: &str = "find_partitions_calls"; pub const SCALAR_INDEX_SEARCH_TIME_METRIC: &str = "search_time"; pub const SCALAR_INDEX_SER_TIME_METRIC: &str = "ser_time"; diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index 02fb37d2cc0..cf4118820e6 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -182,6 +182,10 @@ harness = false name = "vector_throughput" harness = false +[[bench]] +name = "distributed_vector_build" +harness = false + [[bench]] name = "mem_wal_write" harness = false diff --git a/rust/lance/benches/distributed_vector_build.rs b/rust/lance/benches/distributed_vector_build.rs new file mode 100644 index 00000000000..179810648d5 --- /dev/null +++ b/rust/lance/benches/distributed_vector_build.rs @@ -0,0 +1,451 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::fs; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use arrow_array::{ArrayRef, FixedSizeListArray, RecordBatch, RecordBatchIterator}; +use 
arrow_array::{cast::AsArray, types::Float32Type}; +use arrow_schema::{DataType, Field, FieldRef, Schema as ArrowSchema}; +use criterion::{BatchSize, BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; +#[cfg(target_os = "linux")] +use pprof::criterion::{Output, PProfProfiler}; +use serde::Serialize; +use uuid::Uuid; + +use lance::dataset::{Dataset, WriteMode, WriteParams}; +use lance::index::vector::{VectorIndexParams, ivf::finalize_distributed_merge}; +use lance_arrow::FixedSizeListArrayExt; +use lance_index::vector::kmeans::{KMeansParams, train_kmeans}; +use lance_index::{ + DatasetIndexExt, IndexType, + vector::{ivf::IvfBuildParams, pq::PQBuildParams}, +}; +use lance_linalg::distance::DistanceType; +use lance_testing::datagen::generate_random_array; +use tokio::runtime::Runtime; + +const NUM_FRAGMENTS: usize = 128; +const ROWS_PER_FRAGMENT: usize = 1024; +const DIM: i32 = 128; +const NUM_SUB_VECTORS: usize = 16; +const NUM_BITS: usize = 8; +const MAX_ITERS: usize = 20; +const SAMPLE_RATE: usize = 8; + +#[derive(Clone, Copy, Debug)] +struct BenchCase { + num_shards: usize, + num_partitions: usize, +} + +impl BenchCase { + fn label(&self) -> String { + format!( + "pq_shards_{}_partitions_{}", + self.num_shards, self.num_partitions + ) + } +} + +#[derive(Clone, Debug)] +struct MergeFixture { + index_dir: PathBuf, + partial_aux_bytes: u64, + partial_dir_count: usize, +} + +#[derive(Debug, Serialize)] +struct CaseMetadata { + label: String, + num_shards: usize, + num_partitions: usize, + partial_dir_count: usize, + partial_aux_bytes: u64, + partial_aux_bytes_per_shard: u64, + total_rows: usize, + rows_per_shard: usize, +} + +fn dataset_root() -> PathBuf { + std::env::temp_dir().join(format!( + "lance_bench_distributed_build_{}_{}_{}", + NUM_FRAGMENTS, ROWS_PER_FRAGMENT, DIM + )) +} + +fn dataset_uri() -> String { + format!("file://{}", dataset_root().display()) +} + +fn workspace_root() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + 
.ancestors() + .nth(2) + .unwrap() + .to_path_buf() +} + +fn criterion_group_root() -> PathBuf { + workspace_root() + .join("target") + .join("criterion") + .join("distributed_merge_only_ivf_pq") +} + +fn bench_cases() -> [BenchCase; 6] { + [ + BenchCase { + num_shards: 8, + num_partitions: 256, + }, + BenchCase { + num_shards: 32, + num_partitions: 256, + }, + BenchCase { + num_shards: 128, + num_partitions: 256, + }, + BenchCase { + num_shards: 8, + num_partitions: 1024, + }, + BenchCase { + num_shards: 32, + num_partitions: 1024, + }, + BenchCase { + num_shards: 128, + num_partitions: 1024, + }, + ] +} + +fn fixture_uuid(bench_case: BenchCase) -> Uuid { + Uuid::from_u128( + 0x733a_0000_0000_0000_0000_0000_0000_0000 + | ((bench_case.num_shards as u128) << 64) + | bench_case.num_partitions as u128, + ) +} + +fn working_uuid(bench_case: BenchCase) -> Uuid { + Uuid::from_u128( + 0x733b_0000_0000_0000_0000_0000_0000_0000 + | ((bench_case.num_shards as u128) << 64) + | bench_case.num_partitions as u128, + ) +} + +fn create_batches() -> (Arc<ArrowSchema>, Vec<RecordBatch>) { + let schema = Arc::new(ArrowSchema::new(vec![Field::new( + "vector", + DataType::FixedSizeList( + FieldRef::new(Field::new("item", DataType::Float32, true)), + DIM, + ), + false, + )])); + + let batches = (0..NUM_FRAGMENTS) + .map(|_| { + RecordBatch::try_new( + schema.clone(), + vec![Arc::new( + FixedSizeListArray::try_new_from_values( + generate_random_array(ROWS_PER_FRAGMENT * DIM as usize), + DIM, + ) + .unwrap(), + )], + ) + .unwrap() + }) + .collect::<Vec<_>>(); + + (schema, batches) +} + +async fn create_or_open_dataset() -> Dataset { + let uri = dataset_uri(); + if let Ok(dataset) = Dataset::open(&uri).await + && dataset.get_fragments().len() == NUM_FRAGMENTS + { + return dataset; + } + + let dataset_path = dataset_root(); + if dataset_path.exists() { + fs::remove_dir_all(&dataset_path).unwrap(); + } + + let (schema, batches) = create_batches(); + let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
+ let write_params = WriteParams { + max_rows_per_file: ROWS_PER_FRAGMENT, + max_rows_per_group: ROWS_PER_FRAGMENT, + mode: WriteMode::Overwrite, + ..Default::default() + }; + + let dataset = Dataset::write(reader, &uri, Some(write_params)) + .await + .unwrap(); + assert_eq!(dataset.get_fragments().len(), NUM_FRAGMENTS); + dataset +} + +async fn train_shared_ivf_pq( + dataset: &Dataset, + num_partitions: usize, +) -> (IvfBuildParams, PQBuildParams) { + let batch = dataset + .scan() + .project(&["vector".to_string()]) + .unwrap() + .try_into_batch() + .await + .unwrap(); + let vectors = batch.column_by_name("vector").unwrap().as_fixed_size_list(); + let dim = vectors.value_length() as usize; + let values = vectors.values().as_primitive::<Float32Type>(); + + let kmeans = train_kmeans::<Float32Type>( + values, + KMeansParams::new(None, MAX_ITERS as u32, 1, DistanceType::L2), + dim, + num_partitions, + SAMPLE_RATE, + ) + .unwrap(); + + let centroids = Arc::new( + FixedSizeListArray::try_new_from_values( + kmeans.centroids.as_primitive::<Float32Type>().clone(), + dim as i32, + ) + .unwrap(), + ); + let mut ivf_params = IvfBuildParams::try_with_centroids(num_partitions, centroids).unwrap(); + ivf_params.max_iters = MAX_ITERS; + ivf_params.sample_rate = SAMPLE_RATE; + + let mut pq_train_params = PQBuildParams::new(NUM_SUB_VECTORS, NUM_BITS); + pq_train_params.max_iters = MAX_ITERS; + pq_train_params.sample_rate = SAMPLE_RATE; + + let pq = pq_train_params.build(vectors, DistanceType::L2).unwrap(); + let codebook: ArrayRef = Arc::new(pq.codebook.values().as_primitive::<Float32Type>().clone()); + + let mut pq_params = PQBuildParams::with_codebook(NUM_SUB_VECTORS, NUM_BITS, codebook); + pq_params.max_iters = MAX_ITERS; + pq_params.sample_rate = SAMPLE_RATE; + + (ivf_params, pq_params) +} + +fn contiguous_fragment_groups(dataset: &Dataset, num_shards: usize) -> Vec<Vec<u32>> { + assert_eq!(NUM_FRAGMENTS % num_shards, 0); + let fragments = dataset.get_fragments(); + let group_size = fragments.len() / num_shards; + fragments +
.chunks(group_size) + .map(|group| { + group + .iter() + .map(|frag| frag.id() as u32) + .collect::<Vec<_>>() + }) + .collect() +} + +async fn build_partial_fixture(dataset: &mut Dataset, bench_case: BenchCase) -> MergeFixture { + let fixture_uuid = fixture_uuid(bench_case); + let index_dir = dataset_root() + .join("_indices") + .join(fixture_uuid.to_string()); + + if has_partial_dirs(&index_dir) { + return MergeFixture { + partial_aux_bytes: sum_partial_auxiliary_bytes(&index_dir), + partial_dir_count: count_partial_dirs(&index_dir), + index_dir, + }; + } + + if index_dir.exists() { + fs::remove_dir_all(&index_dir).unwrap(); + } + + let fragment_groups = contiguous_fragment_groups(dataset, bench_case.num_shards); + let (ivf_params, pq_params) = train_shared_ivf_pq(dataset, bench_case.num_partitions).await; + let params = VectorIndexParams::with_ivf_pq_params(DistanceType::L2, ivf_params, pq_params); + + for fragments in fragment_groups { + let mut builder = dataset.create_index_builder(&["vector"], IndexType::Vector, &params); + builder = builder + .name("distributed_merge_only".to_string()) + .fragments(fragments) + .index_uuid(fixture_uuid.to_string()); + builder.execute_uncommitted().await.unwrap(); + } + + MergeFixture { + partial_aux_bytes: sum_partial_auxiliary_bytes(&index_dir), + partial_dir_count: count_partial_dirs(&index_dir), + index_dir, + } +} + +fn has_partial_dirs(index_dir: &Path) -> bool { + fs::read_dir(index_dir) + .ok() + .into_iter() + .flatten() + .flatten() + .any(|entry| { + entry.file_type().map(|t| t.is_dir()).unwrap_or(false) + && entry.file_name().to_string_lossy().starts_with("partial_") + }) +} + +fn count_partial_dirs(index_dir: &Path) -> usize { + fs::read_dir(index_dir) + .unwrap() + .flatten() + .filter(|entry| { + entry.file_type().map(|t| t.is_dir()).unwrap_or(false) + && entry.file_name().to_string_lossy().starts_with("partial_") + }) + .count() +} + +fn sum_partial_auxiliary_bytes(index_dir: &Path) -> u64 { + fs::read_dir(index_dir) +
.unwrap() + .flatten() + .filter(|entry| { + entry.file_type().map(|t| t.is_dir()).unwrap_or(false) + && entry.file_name().to_string_lossy().starts_with("partial_") + }) + .map(|entry| entry.path().join("auxiliary.idx")) + .filter_map(|path| fs::metadata(path).ok()) + .map(|metadata| metadata.len()) + .sum() +} + +fn copy_dir_recursive(source: &Path, target: &Path) { + fs::create_dir_all(target).unwrap(); + for entry in fs::read_dir(source).unwrap().flatten() { + let source_path = entry.path(); + let target_path = target.join(entry.file_name()); + let file_type = entry.file_type().unwrap(); + if file_type.is_dir() { + copy_dir_recursive(&source_path, &target_path); + } else { + fs::copy(&source_path, &target_path).unwrap(); + } + } +} + +fn prepare_iteration_target(source: &Path, target: &Path) { + if target.exists() { + fs::remove_dir_all(target).unwrap(); + } + copy_dir_recursive(source, target); +} + +fn write_case_metadata(fixtures: &[(BenchCase, MergeFixture)]) { + let output_dir = criterion_group_root(); + fs::create_dir_all(&output_dir).unwrap(); + let metadata = fixtures + .iter() + .map(|(bench_case, fixture)| CaseMetadata { + label: bench_case.label(), + num_shards: bench_case.num_shards, + num_partitions: bench_case.num_partitions, + partial_dir_count: fixture.partial_dir_count, + partial_aux_bytes: fixture.partial_aux_bytes, + partial_aux_bytes_per_shard: fixture.partial_aux_bytes + / fixture.partial_dir_count as u64, + total_rows: NUM_FRAGMENTS * ROWS_PER_FRAGMENT, + rows_per_shard: (NUM_FRAGMENTS * ROWS_PER_FRAGMENT) / bench_case.num_shards, + }) + .collect::<Vec<_>>(); + let payload = serde_json::to_vec_pretty(&metadata).unwrap(); + fs::write(output_dir.join("case_metadata.json"), payload).unwrap(); +} + +fn bench_distributed_merge_only(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let mut dataset = rt.block_on(create_or_open_dataset()); + let mut fixtures = Vec::new(); + + for bench_case in bench_cases() { + fixtures.push(( + bench_case, +
rt.block_on(build_partial_fixture(&mut dataset, bench_case)), + )); + } + write_case_metadata(&fixtures); + + let dataset = Arc::new(dataset); + let mut group = c.benchmark_group("distributed_merge_only_ivf_pq"); + group.sample_size(10); + + for (bench_case, fixture) in fixtures { + let target_uuid = working_uuid(bench_case); + let target_index_dir = dataset.indices_dir().child(target_uuid.to_string()); + let target_index_dir_fs = dataset_root() + .join("_indices") + .join(target_uuid.to_string()); + let source_index_dir_fs = fixture.index_dir.clone(); + + group.throughput(Throughput::Bytes(fixture.partial_aux_bytes)); + group.bench_with_input( + BenchmarkId::new("finalize_only", bench_case.label()), + &bench_case, + |b, _| { + let dataset = dataset.clone(); + let target_index_dir = target_index_dir.clone(); + let target_index_dir_fs = target_index_dir_fs.clone(); + let source_index_dir_fs = source_index_dir_fs.clone(); + b.iter_batched( + || prepare_iteration_target(&source_index_dir_fs, &target_index_dir_fs), + |_| { + rt.block_on(finalize_distributed_merge( + dataset.object_store(), + &target_index_dir, + Some(IndexType::IvfPq), + )) + .unwrap(); + }, + BatchSize::PerIteration, + ); + }, + ); + } + + group.finish(); +} + +#[cfg(target_os = "linux")] +criterion_group!( + name = benches; + config = Criterion::default() + .significance_level(0.1) + .sample_size(10) + .with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); + targets = bench_distributed_merge_only +); + +#[cfg(not(target_os = "linux"))] +criterion_group!( + name = benches; + config = Criterion::default().significance_level(0.1).sample_size(10); + targets = bench_distributed_merge_only +); + +criterion_main!(benches); diff --git a/rust/lance/src/io/exec/knn.rs b/rust/lance/src/io/exec/knn.rs index ab7589ce1a8..f70521f3bbd 100644 --- a/rust/lance/src/io/exec/knn.rs +++ b/rust/lance/src/io/exec/knn.rs @@ -39,8 +39,8 @@ use lance_core::ROW_ID; use lance_core::utils::futures::FinallyStreamExt; 
use lance_core::{ROW_ID_FIELD, utils::tokio::get_num_compute_intensive_cpus}; use lance_datafusion::utils::{ - DELTAS_SEARCHED_METRIC, ExecutionPlanMetricsSetExt, PARTITIONS_RANKED_METRIC, - PARTITIONS_SEARCHED_METRIC, + DELTAS_SEARCHED_METRIC, ExecutionPlanMetricsSetExt, FIND_PARTITIONS_CALLS_METRIC, + PARTITIONS_RANKED_METRIC, PARTITIONS_SEARCHED_METRIC, }; use lance_index::prefilter::PreFilter; use lance_index::vector::{ @@ -68,6 +68,7 @@ pub struct AnnPartitionMetrics { index_metrics: IndexMetrics, partitions_ranked: Count, deltas_searched: Count, + find_partitions_calls: Count, baseline_metrics: BaselineMetrics, } @@ -77,6 +78,7 @@ impl AnnPartitionMetrics { index_metrics: IndexMetrics::new(metrics, partition), partitions_ranked: metrics.new_count(PARTITIONS_RANKED_METRIC, partition), deltas_searched: metrics.new_count(DELTAS_SEARCHED_METRIC, partition), + find_partitions_calls: metrics.new_count(FIND_PARTITIONS_CALLS_METRIC, partition), baseline_metrics: BaselineMetrics::new(metrics, partition), } } @@ -508,6 +510,7 @@ impl ExecutionPlan for ANNIvfPartitionExec { }; metrics.partitions_ranked.add(index.total_partitions()); + metrics.find_partitions_calls.add(1); let (partitions, dist_q_c) = index.find_partitions(&query).map_err(|e| { DataFusionError::Execution(format!("Failed to find partitions: {}", e)) @@ -1352,6 +1355,7 @@ mod tests { use arrow_schema::{Field as ArrowField, Schema as ArrowSchema}; use lance_core::utils::tempfile::TempStrDir; use lance_datafusion::exec::{ExecutionStatsCallback, ExecutionSummaryCounts}; + use lance_datafusion::utils::FIND_PARTITIONS_CALLS_METRIC; use lance_datagen::{BatchCount, RowCount, array}; use lance_index::optimize::OptimizeOptions; use lance_index::vector::ivf::IvfBuildParams; @@ -1766,6 +1770,10 @@ mod tests { if get_num_compute_intensive_cpus() <= 32 { assert!(*stats.all_counts.get(PARTITIONS_SEARCHED_METRIC).unwrap() < 100 * num_deltas); } + assert_eq!( + stats.all_counts.get(FIND_PARTITIONS_CALLS_METRIC).unwrap(), 
+ &(num_deltas) + ); } #[rstest] @@ -1802,6 +1810,10 @@ mod tests { stats.all_counts.get(PARTITIONS_SEARCHED_METRIC).unwrap(), &(10 * num_deltas) ); + assert_eq!( + stats.all_counts.get(FIND_PARTITIONS_CALLS_METRIC).unwrap(), + &(num_deltas) + ); } #[rstest] @@ -1841,6 +1853,10 @@ mod tests { stats.all_counts.get(PARTITIONS_RANKED_METRIC).unwrap(), &(100 * num_deltas) ); + assert_eq!( + stats.all_counts.get(FIND_PARTITIONS_CALLS_METRIC).unwrap(), + &(num_deltas) + ); } } @@ -1876,6 +1892,10 @@ mod tests { stats.all_counts.get(PARTITIONS_SEARCHED_METRIC).unwrap(), &(10 * num_deltas) ); + assert_eq!( + stats.all_counts.get(FIND_PARTITIONS_CALLS_METRIC).unwrap(), + &(num_deltas) + ); assert_eq!(results.num_rows(), 20); // 15 of the results come from beyond the closest 10 partitions and these will have infinite @@ -1947,6 +1967,10 @@ mod tests { stats.all_counts.get(PARTITIONS_SEARCHED_METRIC).unwrap(), &(100 * num_deltas) ); + assert_eq!( + stats.all_counts.get(FIND_PARTITIONS_CALLS_METRIC).unwrap(), + &(num_deltas) + ); assert_eq!(results.num_rows(), 10000); } } From d2fd23569bf19563550d7ad69c0202fdfe459fbc Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 26 Mar 2026 03:45:18 +0800 Subject: [PATCH 2/3] perf: measure find_partitions elapsed time --- rust/lance-datafusion/src/exec.rs | 10 ++++++ rust/lance-datafusion/src/utils.rs | 10 +++++- rust/lance/src/io/exec/knn.rs | 56 ++++++++++++++---------------- 3 files changed, 46 insertions(+), 30 deletions(-) diff --git a/rust/lance-datafusion/src/exec.rs b/rust/lance-datafusion/src/exec.rs index 5eb83b8c19e..b3a98275853 100644 --- a/rust/lance-datafusion/src/exec.rs +++ b/rust/lance-datafusion/src/exec.rs @@ -489,6 +489,9 @@ pub struct ExecutionSummaryCounts { /// Additional metrics for more detailed statistics. These are subject to change in the future /// and should only be used for debugging purposes. pub all_counts: HashMap, + /// Additional time metrics for more detailed statistics, stored in nanoseconds. 
+ /// These are subject to change in the future and should only be used for debugging purposes. + pub all_times: HashMap, } pub fn collect_execution_metrics(node: &dyn ExecutionPlan, counts: &mut ExecutionSummaryCounts) { @@ -510,6 +513,13 @@ pub fn collect_execution_metrics(node: &dyn ExecutionPlan, counts: &mut Executio } } } + for (metric_name, time) in metrics.iter_times() { + let existing = counts + .all_times + .entry(metric_name.as_ref().to_string()) + .or_insert(0); + *existing += time.value(); + } // Include gauge-based I/O metrics (some nodes record I/O as gauges) for (metric_name, gauge) in metrics.iter_gauges() { match metric_name.as_ref() { diff --git a/rust/lance-datafusion/src/utils.rs b/rust/lance-datafusion/src/utils.rs index c40232bb3d5..3e1d2db2d79 100644 --- a/rust/lance-datafusion/src/utils.rs +++ b/rust/lance-datafusion/src/utils.rs @@ -155,6 +155,7 @@ pub fn reader_to_stream(batches: Box) -> SendableR pub trait MetricsExt { fn find_count(&self, name: &str) -> Option; fn iter_counts(&self) -> impl Iterator, &Count)>; + fn iter_times(&self) -> impl Iterator, &Time)>; fn iter_gauges(&self) -> impl Iterator, &Gauge)>; } @@ -179,6 +180,13 @@ impl MetricsExt for MetricsSet { }) } + fn iter_times(&self) -> impl Iterator, &Time)> { + self.iter().filter_map(|m| match m.value() { + MetricValue::Time { name, time } => Some((name, time)), + _ => None, + }) + } + fn iter_gauges(&self) -> impl Iterator, &Gauge)> { self.iter().filter_map(|m| match m.value() { MetricValue::Gauge { name, gauge } => Some((name, gauge)), @@ -242,6 +250,6 @@ pub const ROWS_SCANNED_METRIC: &str = "rows_scanned"; pub const TASK_WAIT_TIME_METRIC: &str = "task_wait_time"; pub const DELTAS_SEARCHED_METRIC: &str = "deltas_searched"; pub const PARTITIONS_SEARCHED_METRIC: &str = "partitions_searched"; -pub const FIND_PARTITIONS_CALLS_METRIC: &str = "find_partitions_calls"; +pub const FIND_PARTITIONS_ELAPSED_METRIC: &str = "find_partitions_elapsed"; pub const 
SCALAR_INDEX_SEARCH_TIME_METRIC: &str = "search_time"; pub const SCALAR_INDEX_SER_TIME_METRIC: &str = "ser_time"; diff --git a/rust/lance/src/io/exec/knn.rs b/rust/lance/src/io/exec/knn.rs index f70521f3bbd..ac512aeba67 100644 --- a/rust/lance/src/io/exec/knn.rs +++ b/rust/lance/src/io/exec/knn.rs @@ -32,14 +32,14 @@ use datafusion::{ physical_plan::metrics::MetricsSet, }; use datafusion_physical_expr::{Distribution, EquivalenceProperties}; -use datafusion_physical_plan::metrics::{BaselineMetrics, Count}; +use datafusion_physical_plan::metrics::{BaselineMetrics, Count, Time}; use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt, future, stream}; use itertools::Itertools; use lance_core::ROW_ID; use lance_core::utils::futures::FinallyStreamExt; use lance_core::{ROW_ID_FIELD, utils::tokio::get_num_compute_intensive_cpus}; use lance_datafusion::utils::{ - DELTAS_SEARCHED_METRIC, ExecutionPlanMetricsSetExt, FIND_PARTITIONS_CALLS_METRIC, + DELTAS_SEARCHED_METRIC, ExecutionPlanMetricsSetExt, FIND_PARTITIONS_ELAPSED_METRIC, PARTITIONS_RANKED_METRIC, PARTITIONS_SEARCHED_METRIC, }; use lance_index::prefilter::PreFilter; @@ -68,7 +68,7 @@ pub struct AnnPartitionMetrics { index_metrics: IndexMetrics, partitions_ranked: Count, deltas_searched: Count, - find_partitions_calls: Count, + find_partitions_elapsed: Time, baseline_metrics: BaselineMetrics, } @@ -78,7 +78,7 @@ impl AnnPartitionMetrics { index_metrics: IndexMetrics::new(metrics, partition), partitions_ranked: metrics.new_count(PARTITIONS_RANKED_METRIC, partition), deltas_searched: metrics.new_count(DELTAS_SEARCHED_METRIC, partition), - find_partitions_calls: metrics.new_count(FIND_PARTITIONS_CALLS_METRIC, partition), + find_partitions_elapsed: metrics.new_time(FIND_PARTITIONS_ELAPSED_METRIC, partition), baseline_metrics: BaselineMetrics::new(metrics, partition), } } @@ -510,11 +510,13 @@ impl ExecutionPlan for ANNIvfPartitionExec { }; metrics.partitions_ranked.add(index.total_partitions()); - 
metrics.find_partitions_calls.add(1); - let (partitions, dist_q_c) = index.find_partitions(&query).map_err(|e| { - DataFusionError::Execution(format!("Failed to find partitions: {}", e)) - })?; + let (partitions, dist_q_c) = { + let _timer = metrics.find_partitions_elapsed.timer(); + index.find_partitions(&query).map_err(|e| { + DataFusionError::Execution(format!("Failed to find partitions: {}", e)) + })? + }; let mut part_list_builder = ListBuilder::new(UInt32Builder::new()) .with_field(Field::new("item", DataType::UInt32, false)); @@ -1355,7 +1357,7 @@ mod tests { use arrow_schema::{Field as ArrowField, Schema as ArrowSchema}; use lance_core::utils::tempfile::TempStrDir; use lance_datafusion::exec::{ExecutionStatsCallback, ExecutionSummaryCounts}; - use lance_datafusion::utils::FIND_PARTITIONS_CALLS_METRIC; + use lance_datafusion::utils::FIND_PARTITIONS_ELAPSED_METRIC; use lance_datagen::{BatchCount, RowCount, array}; use lance_index::optimize::OptimizeOptions; use lance_index::vector::ivf::IvfBuildParams; @@ -1735,6 +1737,17 @@ mod tests { } } + fn assert_find_partitions_elapsed_recorded(stats: &ExecutionSummaryCounts) { + assert!( + stats + .all_times + .get(FIND_PARTITIONS_ELAPSED_METRIC) + .copied() + .unwrap_or_default() + > 0 + ); + } + #[rstest] #[tokio::test] async fn test_no_max_nprobes(#[values(1, 20)] num_deltas: usize) { @@ -1770,10 +1783,7 @@ mod tests { if get_num_compute_intensive_cpus() <= 32 { assert!(*stats.all_counts.get(PARTITIONS_SEARCHED_METRIC).unwrap() < 100 * num_deltas); } - assert_eq!( - stats.all_counts.get(FIND_PARTITIONS_CALLS_METRIC).unwrap(), - &(num_deltas) - ); + assert_find_partitions_elapsed_recorded(&stats); } #[rstest] @@ -1810,10 +1820,7 @@ mod tests { stats.all_counts.get(PARTITIONS_SEARCHED_METRIC).unwrap(), &(10 * num_deltas) ); - assert_eq!( - stats.all_counts.get(FIND_PARTITIONS_CALLS_METRIC).unwrap(), - &(num_deltas) - ); + assert_find_partitions_elapsed_recorded(&stats); } #[rstest] @@ -1853,10 +1860,7 @@ mod tests { 
stats.all_counts.get(PARTITIONS_RANKED_METRIC).unwrap(), &(100 * num_deltas) ); - assert_eq!( - stats.all_counts.get(FIND_PARTITIONS_CALLS_METRIC).unwrap(), - &(num_deltas) - ); + assert_find_partitions_elapsed_recorded(&stats); } } @@ -1892,10 +1896,7 @@ mod tests { stats.all_counts.get(PARTITIONS_SEARCHED_METRIC).unwrap(), &(10 * num_deltas) ); - assert_eq!( - stats.all_counts.get(FIND_PARTITIONS_CALLS_METRIC).unwrap(), - &(num_deltas) - ); + assert_find_partitions_elapsed_recorded(&stats); assert_eq!(results.num_rows(), 20); // 15 of the results come from beyond the closest 10 partitions and these will have infinite @@ -1967,10 +1968,7 @@ mod tests { stats.all_counts.get(PARTITIONS_SEARCHED_METRIC).unwrap(), &(100 * num_deltas) ); - assert_eq!( - stats.all_counts.get(FIND_PARTITIONS_CALLS_METRIC).unwrap(), - &(num_deltas) - ); + assert_find_partitions_elapsed_recorded(&stats); assert_eq!(results.num_rows(), 10000); } } From 04f3688ff454ee0a3bd8a83452fcb63ec8ea4c75 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 26 Mar 2026 16:36:51 +0800 Subject: [PATCH 3/3] bench: use public finalize API in distributed merge benchmark --- rust/lance/benches/distributed_vector_build.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/rust/lance/benches/distributed_vector_build.rs b/rust/lance/benches/distributed_vector_build.rs index 179810648d5..831ee8c8299 100644 --- a/rust/lance/benches/distributed_vector_build.rs +++ b/rust/lance/benches/distributed_vector_build.rs @@ -15,7 +15,7 @@ use serde::Serialize; use uuid::Uuid; use lance::dataset::{Dataset, WriteMode, WriteParams}; -use lance::index::vector::{VectorIndexParams, ivf::finalize_distributed_merge}; +use lance::index::vector::VectorIndexParams; use lance_arrow::FixedSizeListArrayExt; use lance_index::vector::kmeans::{KMeansParams, train_kmeans}; use lance_index::{ @@ -397,7 +397,6 @@ fn bench_distributed_merge_only(c: &mut Criterion) { for (bench_case, fixture) in fixtures { let target_uuid 
= working_uuid(bench_case); - let target_index_dir = dataset.indices_dir().child(target_uuid.to_string()); let target_index_dir_fs = dataset_root() .join("_indices") .join(target_uuid.to_string()); @@ -409,16 +408,15 @@ fn bench_distributed_merge_only(c: &mut Criterion) { &bench_case, |b, _| { let dataset = dataset.clone(); - let target_index_dir = target_index_dir.clone(); let target_index_dir_fs = target_index_dir_fs.clone(); let source_index_dir_fs = source_index_dir_fs.clone(); b.iter_batched( || prepare_iteration_target(&source_index_dir_fs, &target_index_dir_fs), |_| { - rt.block_on(finalize_distributed_merge( - dataset.object_store(), - &target_index_dir, - Some(IndexType::IvfPq), + rt.block_on(dataset.merge_index_metadata( + &target_uuid.to_string(), + IndexType::IvfPq, + None, )) .unwrap(); },