From 009902130596f38605ca6a6166696b6825cdb73f Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Tue, 16 Dec 2025 21:38:26 -0800 Subject: [PATCH 1/4] feat: track cumulative wall time in analyze plan This adds a cumulative_wall metric to execution stats. This is useful for more clearly understanding the execution times experienced by the user. Unlike existing timers, these include IO. When IO is performed across children in parallel, the computed time range is based on the collective minimum and maximum bounds of the children. --- rust/lance-datafusion/src/exec.rs | 82 +++++++++++++++++++- rust/lance/src/dataset/write/merge_insert.rs | 4 +- 2 files changed, 80 insertions(+), 6 deletions(-) diff --git a/rust/lance-datafusion/src/exec.rs b/rust/lance-datafusion/src/exec.rs index 1bac7466700..e7d117ee0d4 100644 --- a/rust/lance-datafusion/src/exec.rs +++ b/rust/lance-datafusion/src/exec.rs @@ -10,6 +10,8 @@ use std::{ time::Duration, }; +use chrono::{DateTime, Utc}; + use arrow_array::RecordBatch; use arrow_schema::Schema as ArrowSchema; use datafusion::{ @@ -26,6 +28,7 @@ use datafusion::{ analyze::AnalyzeExec, display::DisplayableExecutionPlan, execution_plan::{Boundedness, CardinalityEffect, EmissionType}, + metrics::MetricValue, stream::RecordBatchStreamAdapter, streaming::PartitionStream, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, @@ -567,22 +570,87 @@ pub fn format_plan(plan: Arc) -> String { struct CalculateVisitor { highest_index: usize, index_to_cumulative_cpu: HashMap, + index_to_cumulative_wall: HashMap, + } + + /// Result of calculating metrics for a subtree + struct SubtreeMetrics { + cumulative_cpu: usize, + min_start: Option>, + max_end: Option>, } + impl CalculateVisitor { - fn calculate_cumulative_cpu(&mut self, plan: &Arc) -> usize { + fn calculate_metrics(&mut self, plan: &Arc) -> SubtreeMetrics { self.highest_index += 1; let plan_index = self.highest_index; + + // Get CPU time for this node let elapsed_cpu: usize = match plan.metrics() { Some(metrics) => metrics.elapsed_compute().unwrap_or_default(), None => 0, }; + + // Get timestamps for this node + let (mut min_start, mut max_end) = Self::node_timerange(plan); + + // Accumulate from children let mut cumulative_cpu = elapsed_cpu; for child in plan.children() { - cumulative_cpu += self.calculate_cumulative_cpu(child); + let child_metrics = self.calculate_metrics(child); + cumulative_cpu += child_metrics.cumulative_cpu; + min_start = Self::min_option(min_start, child_metrics.min_start); + max_end = Self::max_option(max_end, child_metrics.max_end); } + + // Calculate wall clock duration for this subtree + let wall_duration = match (min_start, max_end) { + (Some(start), Some(end)) => (end - start).to_std().unwrap_or_default(), + _ => Duration::ZERO, + }; + self.index_to_cumulative_cpu .insert(plan_index, cumulative_cpu); - cumulative_cpu + self.index_to_cumulative_wall + .insert(plan_index, wall_duration); + + SubtreeMetrics { + cumulative_cpu, + min_start, + max_end, + } + } + + fn node_timerange( + plan: &Arc, + ) -> (Option>, Option>) { + let metrics = match plan.metrics() { + Some(m) => m, + None => return (None, None), + }; + let min_start = metrics + .iter() + .filter_map(|m| match m.value() { + MetricValue::StartTimestamp(ts) => ts.value(), + _ => None, + }) + .min(); + let max_end = metrics + .iter() + .filter_map(|m| match m.value() { + MetricValue::EndTimestamp(ts) => ts.value(), + _ => None, + }) + .max(); + (min_start, max_end) + } + + fn min_option(a: Option>, b: Option>) -> Option> { + [a, b].into_iter().flatten().min() + } + + fn max_option(a: Option>, b: Option>) -> Option> { + [a, b].into_iter().flatten().max() } } @@ -617,6 +685,11 @@ pub fn format_plan(plan: Arc) -> String { .unwrap(); let cumulative_cpu_duration = Duration::from_nanos((*cumulative_cpu) as u64); write!(f, ", cumulative_cpu={cumulative_cpu_duration:?}")?; + let cumulative_wall = calcs + .index_to_cumulative_wall + .get(&self.highest_index) + .unwrap(); + write!(f, ", cumulative_wall={cumulative_wall:?}")?; writeln!(f)?; self.indent += 1; for child in plan.children() { @@ -635,8 +708,9 @@ pub fn format_plan(plan: Arc) -> String { let mut calcs = CalculateVisitor { highest_index: 0, index_to_cumulative_cpu: HashMap::new(), + index_to_cumulative_wall: HashMap::new(), }; - calcs.calculate_cumulative_cpu(&self.plan); + calcs.calculate_metrics(&self.plan); let mut prints = PrintVisitor { highest_index: 0, indent: 0, diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index cd50f01226b..2f6f15291c8 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -4312,9 +4312,9 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n ); // Also validate the full string structure with pattern matching - let expected_pattern = "[...MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_not_matched_by_source=Keep, metrics=...bytes_written=...num_deleted_rows=0, num_files_written=...num_inserted_rows=1, num_updated_rows=1], cumulative_cpu=... + let expected_pattern = "[...MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_not_matched_by_source=Keep, metrics=...bytes_written=...num_deleted_rows=0, num_files_written=...num_inserted_rows=1, num_updated_rows=1], cumulative_cpu=..., cumulative_wall=... ... - StreamingTableExec: partition_sizes=1, projection=[id, name], metrics=[], cumulative_cpu=...]"; + StreamingTableExec: partition_sizes=1, projection=[id, name], metrics=[], cumulative_cpu=..., cumulative_wall=...]"; assert_string_matches(&analysis, expected_pattern).unwrap(); assert!(analysis.contains("bytes_written")); assert!(analysis.contains("num_files_written")); From b1dd51c7431048de7514454908609e2efc427029 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Wed, 17 Dec 2025 05:44:10 -0800 Subject: [PATCH 2/4] remove cumulative_cpu, reorder formatting --- python/python/tests/test_dataset.py | 4 +- rust/lance-datafusion/src/exec.rs | 69 ++++++++------------ rust/lance/src/dataset/write/merge_insert.rs | 4 +- 3 files changed, 33 insertions(+), 44 deletions(-) diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 9ddb6db6881..2e3d8250837 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -1048,7 +1048,9 @@ def test_analyze_vector_search(tmp_path: Path): plan = dataset.scanner( nearest={"column": "vector", "k": 10, "q": [1.0, 1.0]} ).analyze_plan() - assert "KNNVectorDistance: metric=l2, metrics=[output_rows=10" in plan + assert "KNNVectorDistance:" in plan + assert "metric=l2" in plan + assert "output_rows=10" in plan def test_get_fragments(tmp_path: Path): diff --git a/rust/lance-datafusion/src/exec.rs b/rust/lance-datafusion/src/exec.rs index e7d117ee0d4..d292ed174f5 100644 --- a/rust/lance-datafusion/src/exec.rs +++ b/rust/lance-datafusion/src/exec.rs @@ -569,13 +569,11 @@ pub fn format_plan(plan: Arc) -> String { /// A visitor which calculates additional metrics for all the plans. struct CalculateVisitor { highest_index: usize, - index_to_cumulative_cpu: HashMap, - index_to_cumulative_wall: HashMap, + index_to_elapsed: HashMap, } /// Result of calculating metrics for a subtree struct SubtreeMetrics { - cumulative_cpu: usize, min_start: Option>, max_end: Option>, } @@ -585,48 +583,34 @@ pub fn format_plan(plan: Arc) -> String { self.highest_index += 1; let plan_index = self.highest_index; - // Get CPU time for this node - let elapsed_cpu: usize = match plan.metrics() { - Some(metrics) => metrics.elapsed_compute().unwrap_or_default(), - None => 0, - }; - // Get timestamps for this node let (mut min_start, mut max_end) = Self::node_timerange(plan); // Accumulate from children - let mut cumulative_cpu = elapsed_cpu; for child in plan.children() { let child_metrics = self.calculate_metrics(child); - cumulative_cpu += child_metrics.cumulative_cpu; min_start = Self::min_option(min_start, child_metrics.min_start); max_end = Self::max_option(max_end, child_metrics.max_end); } - // Calculate wall clock duration for this subtree - let wall_duration = match (min_start, max_end) { - (Some(start), Some(end)) => (end - start).to_std().unwrap_or_default(), - _ => Duration::ZERO, + // Calculate wall clock duration for this subtree (only if we have timestamps) + let elapsed = match (min_start, max_end) { + (Some(start), Some(end)) => Some((end - start).to_std().unwrap_or_default()), + _ => None, }; - self.index_to_cumulative_cpu - .insert(plan_index, cumulative_cpu); - self.index_to_cumulative_wall - .insert(plan_index, wall_duration); - - SubtreeMetrics { - cumulative_cpu, - min_start, - max_end, + if let Some(e) = elapsed { + self.index_to_elapsed.insert(plan_index, e); } + + SubtreeMetrics { min_start, max_end } } fn node_timerange( plan: &Arc, ) -> (Option>, Option>) { - let metrics = match plan.metrics() { - Some(m) => m, - None => return (None, None), + let Some(metrics) = plan.metrics() else { + return (None, None); }; let min_start = metrics .iter() @@ -668,7 +652,22 @@ pub fn format_plan(plan: Arc) -> String { ) -> std::fmt::Result { self.highest_index += 1; write!(f, "{:indent$}", "", indent = self.indent * 2)?; - plan.fmt_as(datafusion::physical_plan::DisplayFormatType::Verbose, f)?; + + // Format the plan description + let displayable = + datafusion::physical_plan::display::DisplayableExecutionPlan::new(plan.as_ref()); + let plan_str = displayable.one_line().to_string(); + let plan_str = plan_str.trim(); + + // Write operator with elapsed time inserted after the name + match calcs.index_to_elapsed.get(&self.highest_index) { + Some(elapsed) => match plan_str.find(": ") { + Some(i) => write!(f, "{}: elapsed={elapsed:?}, {}", &plan_str[..i], &plan_str[i + 2..])?, + None => write!(f, "{plan_str}, elapsed={elapsed:?}")?, + }, + None => write!(f, "{plan_str}")?, + } + if let Some(metrics) = plan.metrics() { let metrics = metrics .aggregate_by_name() @@ -679,17 +678,6 @@ pub fn format_plan(plan: Arc) -> String { } else { write!(f, ", metrics=[]")?; } - let cumulative_cpu = calcs - .index_to_cumulative_cpu - .get(&self.highest_index) - .unwrap(); - let cumulative_cpu_duration = Duration::from_nanos((*cumulative_cpu) as u64); - write!(f, ", cumulative_cpu={cumulative_cpu_duration:?}")?; - let cumulative_wall = calcs - .index_to_cumulative_wall - .get(&self.highest_index) - .unwrap(); - write!(f, ", cumulative_wall={cumulative_wall:?}")?; writeln!(f)?; self.indent += 1; for child in plan.children() { @@ -707,8 +695,7 @@ pub fn format_plan(plan: Arc) -> String { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { let mut calcs = CalculateVisitor { highest_index: 0, - index_to_cumulative_cpu: HashMap::new(), - index_to_cumulative_wall: HashMap::new(), + index_to_elapsed: HashMap::new(), }; calcs.calculate_metrics(&self.plan); let mut prints = PrintVisitor { diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 2f6f15291c8..45df4e3581e 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -4312,9 +4312,9 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n ); // Also validate the full string structure with pattern matching - let expected_pattern = "[...MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_not_matched_by_source=Keep, metrics=...bytes_written=...num_deleted_rows=0, num_files_written=...num_inserted_rows=1, num_updated_rows=1], cumulative_cpu=..., cumulative_wall=... + let expected_pattern = "[...MergeInsert: elapsed=..., on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_not_matched_by_source=Keep, metrics=...bytes_written=...num_deleted_rows=0, num_files_written=...num_inserted_rows=1, num_updated_rows=1] ... - StreamingTableExec: partition_sizes=1, projection=[id, name], metrics=[], cumulative_cpu=..., cumulative_wall=...]"; + StreamingTableExec: partition_sizes=1, projection=[id, name], metrics=[]...]"; assert_string_matches(&analysis, expected_pattern).unwrap(); assert!(analysis.contains("bytes_written")); assert!(analysis.contains("num_files_written")); From 91c1e0f11ff2d92ddce0ec8b30a882ff84361e7e Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Wed, 17 Dec 2025 06:56:11 -0800 Subject: [PATCH 3/4] lint --- rust/lance-datafusion/src/exec.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/rust/lance-datafusion/src/exec.rs b/rust/lance-datafusion/src/exec.rs index d292ed174f5..513b195010d 100644 --- a/rust/lance-datafusion/src/exec.rs +++ b/rust/lance-datafusion/src/exec.rs @@ -662,7 +662,12 @@ pub fn format_plan(plan: Arc) -> String { // Write operator with elapsed time inserted after the name match calcs.index_to_elapsed.get(&self.highest_index) { Some(elapsed) => match plan_str.find(": ") { - Some(i) => write!(f, "{}: elapsed={elapsed:?}, {}", &plan_str[..i], &plan_str[i + 2..])?, + Some(i) => write!( + f, + "{}: elapsed={elapsed:?}, {}", + &plan_str[..i], + &plan_str[i + 2..] + )?, None => write!(f, "{plan_str}, elapsed={elapsed:?}")?, }, None => write!(f, "{plan_str}")?, From 0b37c1fa0c026522696f25b5b5a95a326c869694 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Wed, 17 Dec 2025 07:14:26 -0800 Subject: [PATCH 4/4] Fix doctest --- python/python/lance/dataset.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 99f44f9c6ce..048e231d76b 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -370,16 +370,16 @@ def analyze_plan( >>> builder = builder.when_matched_update_all().when_not_matched_insert_all() >>> analysis = builder.analyze_plan(new_data) >>> print(analysis) # doctest: +ELLIPSIS - MergeInsert: on=[id], ..., metrics=[..., bytes_written=..., ...], cumulative_cpu=... - CoalescePartitionsExec, metrics=[output_rows=..., elapsed_compute=...], cumulative_cpu=... - ProjectionExec: expr=[_rowid@1 as _rowid, ...], metrics=[...], cumulative_cpu=... - ProjectionExec: expr=[id@2 IS NOT NULL as __common_expr_1, ...], metrics=[...], cumulative_cpu=... - CoalesceBatchesExec: ..., metrics=[...], cumulative_cpu=... - HashJoinExec: mode=CollectLeft, join_type=Right, ... - CooperativeExec, metrics=[], cumulative_cpu=... - LanceRead: ..., metrics=[..., bytes_read=..., ...], cumulative_cpu=... + MergeInsert: elapsed=..., on=[id], ..., metrics=[..., bytes_written=..., ...] + CoalescePartitionsExec, elapsed=..., metrics=[output_rows=..., elapsed_compute=...] + ProjectionExec: elapsed=..., expr=[_rowid@1 as _rowid, ...], metrics=[...] + ProjectionExec: elapsed=..., expr=[id@2 IS NOT NULL as __common_expr_1, ...], metrics=[...] + CoalesceBatchesExec: elapsed=..., ..., metrics=[...] + HashJoinExec: elapsed=..., mode=CollectLeft, join_type=Right, ... + CooperativeExec, elapsed=..., metrics=[] + LanceRead: elapsed=..., ..., metrics=[..., bytes_read=..., ...] RepartitionExec: ... - StreamingTableExec: ..., metrics=[], ... + StreamingTableExec: ..., metrics=[] The two key parts of the plan analysis are LanceRead and MergeInsert. LanceRead scans join keys and columns in conditions. MergeInsert writes