diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 99f44f9c6ce..048e231d76b 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -370,16 +370,16 @@ def analyze_plan( >>> builder = builder.when_matched_update_all().when_not_matched_insert_all() >>> analysis = builder.analyze_plan(new_data) >>> print(analysis) # doctest: +ELLIPSIS - MergeInsert: on=[id], ..., metrics=[..., bytes_written=..., ...], cumulative_cpu=... - CoalescePartitionsExec, metrics=[output_rows=..., elapsed_compute=...], cumulative_cpu=... - ProjectionExec: expr=[_rowid@1 as _rowid, ...], metrics=[...], cumulative_cpu=... - ProjectionExec: expr=[id@2 IS NOT NULL as __common_expr_1, ...], metrics=[...], cumulative_cpu=... - CoalesceBatchesExec: ..., metrics=[...], cumulative_cpu=... - HashJoinExec: mode=CollectLeft, join_type=Right, ... - CooperativeExec, metrics=[], cumulative_cpu=... - LanceRead: ..., metrics=[..., bytes_read=..., ...], cumulative_cpu=... + MergeInsert: elapsed=..., on=[id], ..., metrics=[..., bytes_written=..., ...] + CoalescePartitionsExec, elapsed=..., metrics=[output_rows=..., elapsed_compute=...] + ProjectionExec: elapsed=..., expr=[_rowid@1 as _rowid, ...], metrics=[...] + ProjectionExec: elapsed=..., expr=[id@2 IS NOT NULL as __common_expr_1, ...], metrics=[...] + CoalesceBatchesExec: elapsed=..., ..., metrics=[...] + HashJoinExec: elapsed=..., mode=CollectLeft, join_type=Right, ... + CooperativeExec, elapsed=..., metrics=[] + LanceRead: elapsed=..., ..., metrics=[..., bytes_read=..., ...] RepartitionExec: ... - StreamingTableExec: ..., metrics=[], ... + StreamingTableExec: ..., metrics=[] The two key parts of the plan analysis are LanceRead and MergeInsert. LanceRead scans join keys and columns in conditions. 
MergeInsert writes diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 9ddb6db6881..2e3d8250837 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -1048,7 +1048,9 @@ def test_analyze_vector_search(tmp_path: Path): plan = dataset.scanner( nearest={"column": "vector", "k": 10, "q": [1.0, 1.0]} ).analyze_plan() - assert "KNNVectorDistance: metric=l2, metrics=[output_rows=10" in plan + assert "KNNVectorDistance:" in plan + assert "metric=l2" in plan + assert "output_rows=10" in plan def test_get_fragments(tmp_path: Path): diff --git a/rust/lance-datafusion/src/exec.rs b/rust/lance-datafusion/src/exec.rs index 1bac7466700..513b195010d 100644 --- a/rust/lance-datafusion/src/exec.rs +++ b/rust/lance-datafusion/src/exec.rs @@ -10,6 +10,8 @@ use std::{ time::Duration, }; +use chrono::{DateTime, Utc}; + use arrow_array::RecordBatch; use arrow_schema::Schema as ArrowSchema; use datafusion::{ @@ -26,6 +28,7 @@ use datafusion::{ analyze::AnalyzeExec, display::DisplayableExecutionPlan, execution_plan::{Boundedness, CardinalityEffect, EmissionType}, + metrics::MetricValue, stream::RecordBatchStreamAdapter, streaming::PartitionStream, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, @@ -566,23 +569,72 @@ pub fn format_plan(plan: Arc<dyn ExecutionPlan>) -> String { /// A visitor which calculates additional metrics for all the plans. 
struct CalculateVisitor { highest_index: usize, - index_to_cumulative_cpu: HashMap<usize, usize>, + index_to_elapsed: HashMap<usize, Duration>, + } + + /// Result of calculating metrics for a subtree + struct SubtreeMetrics { + min_start: Option<DateTime<Utc>>, + max_end: Option<DateTime<Utc>>, } + impl CalculateVisitor { - fn calculate_cumulative_cpu(&mut self, plan: &Arc<dyn ExecutionPlan>) -> usize { + fn calculate_metrics(&mut self, plan: &Arc<dyn ExecutionPlan>) -> SubtreeMetrics { self.highest_index += 1; let plan_index = self.highest_index; - let elapsed_cpu: usize = match plan.metrics() { - Some(metrics) => metrics.elapsed_compute().unwrap_or_default(), - None => 0, - }; - let mut cumulative_cpu = elapsed_cpu; + + // Get timestamps for this node + let (mut min_start, mut max_end) = Self::node_timerange(plan); + + // Accumulate from children for child in plan.children() { - cumulative_cpu += self.calculate_cumulative_cpu(child); + let child_metrics = self.calculate_metrics(child); + min_start = Self::min_option(min_start, child_metrics.min_start); + max_end = Self::max_option(max_end, child_metrics.max_end); } - self.index_to_cumulative_cpu - .insert(plan_index, cumulative_cpu); - cumulative_cpu + + // Calculate wall clock duration for this subtree (only if we have timestamps) + let elapsed = match (min_start, max_end) { + (Some(start), Some(end)) => Some((end - start).to_std().unwrap_or_default()), + _ => None, + }; + + if let Some(e) = elapsed { + self.index_to_elapsed.insert(plan_index, e); + } + + SubtreeMetrics { min_start, max_end } + } + + fn node_timerange( + plan: &Arc<dyn ExecutionPlan>, + ) -> (Option<DateTime<Utc>>, Option<DateTime<Utc>>) { + let Some(metrics) = plan.metrics() else { + return (None, None); + }; + let min_start = metrics + .iter() + .filter_map(|m| match m.value() { + MetricValue::StartTimestamp(ts) => ts.value(), + _ => None, + }) + .min(); + let max_end = metrics + .iter() + .filter_map(|m| match m.value() { + MetricValue::EndTimestamp(ts) => ts.value(), + _ => None, + }) + .max(); + (min_start, max_end) + } + + fn min_option(a: Option<DateTime<Utc>>, b: Option<DateTime<Utc>>) -> Option<DateTime<Utc>> { + [a, 
b].into_iter().flatten().min() + } + + fn max_option(a: Option<DateTime<Utc>>, b: Option<DateTime<Utc>>) -> Option<DateTime<Utc>> { + [a, b].into_iter().flatten().max() } } @@ -600,7 +652,27 @@ pub fn format_plan(plan: Arc<dyn ExecutionPlan>) -> String { ) -> std::fmt::Result { self.highest_index += 1; write!(f, "{:indent$}", "", indent = self.indent * 2)?; - plan.fmt_as(datafusion::physical_plan::DisplayFormatType::Verbose, f)?; + + // Format the plan description + let displayable = + datafusion::physical_plan::display::DisplayableExecutionPlan::new(plan.as_ref()); + let plan_str = displayable.one_line().to_string(); + let plan_str = plan_str.trim(); + + // Write operator with elapsed time inserted after the name + match calcs.index_to_elapsed.get(&self.highest_index) { + Some(elapsed) => match plan_str.find(": ") { + Some(i) => write!( + f, + "{}: elapsed={elapsed:?}, {}", + &plan_str[..i], + &plan_str[i + 2..] + )?, + None => write!(f, "{plan_str}, elapsed={elapsed:?}")?, + }, + None => write!(f, "{plan_str}")?, + } + if let Some(metrics) = plan.metrics() { let metrics = metrics .aggregate_by_name() @@ -611,12 +683,6 @@ pub fn format_plan(plan: Arc<dyn ExecutionPlan>) -> String { } else { write!(f, ", metrics=[]")?; } - let cumulative_cpu = calcs - .index_to_cumulative_cpu - .get(&self.highest_index) - .unwrap(); - let cumulative_cpu_duration = Duration::from_nanos((*cumulative_cpu) as u64); - write!(f, ", cumulative_cpu={cumulative_cpu_duration:?}")?; writeln!(f)?; self.indent += 1; for child in plan.children() { @@ -634,9 +700,9 @@ pub fn format_plan(plan: Arc<dyn ExecutionPlan>) -> String { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { let mut calcs = CalculateVisitor { highest_index: 0, - index_to_cumulative_cpu: HashMap::new(), + index_to_elapsed: HashMap::new(), }; - calcs.calculate_cumulative_cpu(&self.plan); + calcs.calculate_metrics(&self.plan); let mut prints = PrintVisitor { highest_index: 0, indent: 0, diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index cd50f01226b..45df4e3581e 
100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -4312,9 +4312,9 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n ); // Also validate the full string structure with pattern matching - let expected_pattern = "[...MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_not_matched_by_source=Keep, metrics=...bytes_written=...num_deleted_rows=0, num_files_written=...num_inserted_rows=1, num_updated_rows=1], cumulative_cpu=... + let expected_pattern = "[...MergeInsert: elapsed=..., on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_not_matched_by_source=Keep, metrics=...bytes_written=...num_deleted_rows=0, num_files_written=...num_inserted_rows=1, num_updated_rows=1] ... - StreamingTableExec: partition_sizes=1, projection=[id, name], metrics=[], cumulative_cpu=...]"; + StreamingTableExec: partition_sizes=1, projection=[id, name], metrics=[]...]"; assert_string_matches(&analysis, expected_pattern).unwrap(); assert!(analysis.contains("bytes_written")); assert!(analysis.contains("num_files_written"));