diff --git a/datafusion/src/physical_plan/coalesce_partitions.rs b/datafusion/src/physical_plan/coalesce_partitions.rs index 0d2cc899ebf6b..329edcb31d65f 100644 --- a/datafusion/src/physical_plan/coalesce_partitions.rs +++ b/datafusion/src/physical_plan/coalesce_partitions.rs @@ -97,8 +97,6 @@ impl ExecutionPlan for CoalescePartitionsExec { } async fn execute(&self, partition: usize) -> Result { - let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); - // CoalescePartitionsExec produces a single partition if 0 != partition { return Err(DataFusionError::Internal(format!( @@ -113,10 +111,16 @@ impl ExecutionPlan for CoalescePartitionsExec { "CoalescePartitionsExec requires at least one input partition".to_owned(), )), 1 => { - // bypass any threading if there is a single partition + // bypass any threading / metrics if there is a single partition self.input.execute(0).await } _ => { + let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); + // record the (very) minimal work done so that + // elapsed_compute is not reported as 0 + let elapsed_compute = baseline_metrics.elapsed_compute().clone(); + let _timer = elapsed_compute.timer(); + // use a stream that allows each sender to put in at // least one result in an attempt to maximize // parallelism. diff --git a/datafusion/src/physical_plan/union.rs b/datafusion/src/physical_plan/union.rs index f30cd575f997d..a2f5952b80909 100644 --- a/datafusion/src/physical_plan/union.rs +++ b/datafusion/src/physical_plan/union.rs @@ -23,13 +23,18 @@ use std::{any::Any, sync::Arc}; -use arrow::datatypes::SchemaRef; +use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; +use futures::StreamExt; use super::{ - ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, + metrics::{ExecutionPlanMetricsSet, MetricsSet}, + ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, }; -use crate::{error::Result, physical_plan::expressions}; +use crate::{ + error::Result, + physical_plan::{expressions, metrics::BaselineMetrics}, +}; use async_trait::async_trait; /// UNION ALL execution plan @@ -37,12 +42,17 @@ use async_trait::async_trait; pub struct UnionExec { /// Input execution plan inputs: Vec>, + /// Execution metrics + metrics: ExecutionPlanMetricsSet, } impl UnionExec { /// Create a new UnionExec pub fn new(inputs: Vec>) -> Self { - UnionExec { inputs } + UnionExec { + inputs, + metrics: ExecutionPlanMetricsSet::new(), + } } } @@ -82,11 +92,18 @@ impl ExecutionPlan for UnionExec { } async fn execute(&self, mut partition: usize) -> Result { + let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); + // record the tiny amount of work done in this function so + // elapsed_compute is reported as non zero + let elapsed_compute = baseline_metrics.elapsed_compute().clone(); + let _timer = elapsed_compute.timer(); // record on drop + // find partition to execute for input in self.inputs.iter() { // Calculate whether partition belongs to the current partition if partition < input.output_partitioning().partition_count() { - return input.execute(partition).await; + let stream = input.execute(partition).await?; + return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics))); } else { partition -= input.output_partitioning().partition_count(); } @@ -110,6 +127,10 @@ impl ExecutionPlan for UnionExec { } } + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + fn statistics(&self) -> Statistics { self.inputs .iter() @@ -119,6 +140,40 @@ impl ExecutionPlan for UnionExec { } } +/// Stream wrapper that records `BaselineMetrics` for a particular +/// partition +struct ObservedStream { + inner: SendableRecordBatchStream, + baseline_metrics: BaselineMetrics, +} + +impl ObservedStream { + fn new(inner: SendableRecordBatchStream, baseline_metrics: BaselineMetrics) -> Self { + Self { + inner, + baseline_metrics, + } + } +} + +impl RecordBatchStream for ObservedStream { + fn schema(&self) -> arrow::datatypes::SchemaRef { + self.inner.schema() + } +} + +impl futures::Stream for ObservedStream { + type Item = arrow::error::Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let poll = self.inner.poll_next_unpin(cx); + self.baseline_metrics.record_poll(poll) + } +} + fn col_stats_union( mut left: ColumnStatistics, right: ColumnStatistics, diff --git a/datafusion/src/physical_plan/windows/window_agg_exec.rs b/datafusion/src/physical_plan/windows/window_agg_exec.rs index 0524adc7073a9..03307be8addd7 100644 --- a/datafusion/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/src/physical_plan/windows/window_agg_exec.rs @@ -18,6 +18,9 @@ //! Stream and channel implementations for window function expressions. use crate::error::{DataFusionError, Result}; +use crate::physical_plan::metrics::{ + BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, +}; use crate::physical_plan::{ common, ColumnStatistics, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr, @@ -30,7 +33,7 @@ use arrow::{ }; use async_trait::async_trait; use futures::stream::Stream; -use futures::Future; +use futures::FutureExt; use pin_project_lite::pin_project; use std::any::Any; use std::pin::Pin; @@ -48,6 +51,8 @@ pub struct WindowAggExec { schema: SchemaRef, /// Schema before the window input_schema: SchemaRef, + /// Execution metrics + metrics: ExecutionPlanMetricsSet, } impl WindowAggExec { @@ -59,11 +64,12 @@ impl WindowAggExec { ) -> Result { let schema = create_schema(&input_schema, &window_expr)?; let schema = Arc::new(schema); - Ok(WindowAggExec { + Ok(Self { input, window_expr, schema, input_schema, + metrics: ExecutionPlanMetricsSet::new(), }) } @@ -140,6 +146,7 @@ impl ExecutionPlan for WindowAggExec { self.schema.clone(), self.window_expr.clone(), input, + BaselineMetrics::new(&self.metrics, partition), )); Ok(stream) } @@ -163,6 +170,10 @@ impl ExecutionPlan for WindowAggExec { Ok(()) } + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + fn statistics(&self) -> Statistics { let input_stat = self.input.statistics(); let win_cols = self.window_expr.len(); @@ -214,6 +225,7 @@ pin_project! { #[pin] output: futures::channel::oneshot::Receiver>, finished: bool, + baseline_metrics: BaselineMetrics, } } @@ -223,19 +235,24 @@ impl WindowAggStream { schema: SchemaRef, window_expr: Vec>, input: SendableRecordBatchStream, + baseline_metrics: BaselineMetrics, ) -> Self { let (tx, rx) = futures::channel::oneshot::channel(); let schema_clone = schema.clone(); + let elapsed_compute = baseline_metrics.elapsed_compute().clone(); tokio::spawn(async move { let schema = schema_clone.clone(); - let result = WindowAggStream::process(input, window_expr, schema).await; + let result = + WindowAggStream::process(input, window_expr, schema, elapsed_compute) + .await; tx.send(result) }); Self { + schema, output: rx, finished: false, - schema, + baseline_metrics, } } @@ -243,11 +260,16 @@ impl WindowAggStream { input: SendableRecordBatchStream, window_expr: Vec>, schema: SchemaRef, + elapsed_compute: crate::physical_plan::metrics::Time, ) -> ArrowResult { let input_schema = input.schema(); let batches = common::collect(input) .await .map_err(DataFusionError::into_arrow_external_error)?; + + // record compute time on drop + let _timer = elapsed_compute.timer(); + let batch = common::combine_batches(&batches, input_schema.clone())?; if let Some(batch) = batch { // calculate window cols @@ -267,18 +289,31 @@ impl WindowAggStream { impl Stream for WindowAggStream { type Item = ArrowResult; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let poll = self.poll_next_inner(cx); + self.baseline_metrics.record_poll(poll) + } +} + +impl WindowAggStream { + #[inline] + fn poll_next_inner( + self: &mut Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { if self.finished { return Poll::Ready(None); } // is the output ready? - let this = self.project(); - let output_poll = this.output.poll(cx); + let output_poll = self.output.poll_unpin(cx); match output_poll { Poll::Ready(result) => { - *this.finished = true; + self.finished = true; // check for error in receiving channel and unwrap actual result let result = match result { Err(e) => Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 89163390b19e8..1cc903bf81966 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -2302,9 +2302,9 @@ async fn csv_explain_analyze() { let formatted = normalize_for_explain(&formatted); // Only test basic plumbing and try to avoid having to change too - // many things - let needle = - "CoalescePartitionsExec, metrics=[output_rows=5, elapsed_compute=NOT RECORDED"; + // many things. explain_analyze_baseline_metrics covers the values + // in greater depth + let needle = "CoalescePartitionsExec, metrics=[output_rows=5, elapsed_compute="; assert_contains!(&formatted, needle); let verbose_needle = "Output Rows"; @@ -2352,13 +2352,17 @@ async fn explain_analyze_baseline_metrics() { register_aggregate_csv_by_sql(&mut ctx).await; // a query with as many operators as we have metrics for let sql = "EXPLAIN ANALYZE \ - select count(*) from \ - (SELECT count(*), c1 \ - FROM aggregate_test_100 \ - WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434' \ - GROUP BY c1 \ - ORDER BY c1) \ - LIMIT 1"; + SELECT count(*) as cnt FROM \ + (SELECT count(*), c1 \ + FROM aggregate_test_100 \ + WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434' \ + GROUP BY c1 \ + ORDER BY c1 ) \ + UNION ALL \ + SELECT 1 as cnt \ + UNION ALL \ + SELECT lead(c1, 1) OVER () as cnt FROM (select 1 as c1) \ + LIMIT 3"; println!("running query: {}", sql); let plan = ctx.create_logical_plan(sql).unwrap(); let plan = ctx.optimize(&plan).unwrap(); @@ -2368,11 +2372,6 @@ async fn explain_analyze_baseline_metrics() { println!("Query Output:\n\n{}", formatted); let formatted = normalize_for_explain(&formatted); - assert_metrics!( - &formatted, - "CoalescePartitionsExec", - "metrics=[output_rows=5, elapsed_compute=NOT RECORDED" - ); assert_metrics!( &formatted, "HashAggregateExec: mode=Partial, gby=[]", @@ -2385,7 +2384,7 @@ async fn explain_analyze_baseline_metrics() { ); assert_metrics!( &formatted, - "SortExec: [c1@0 ASC]", + "SortExec: [c1@1 ASC]", "metrics=[output_rows=5, elapsed_compute=" ); assert_metrics!( @@ -2395,9 +2394,14 @@ async fn explain_analyze_baseline_metrics() { ); assert_metrics!( &formatted, - "GlobalLimitExec: limit=1, ", + "GlobalLimitExec: limit=3, ", "metrics=[output_rows=1, elapsed_compute=" ); + assert_metrics!( + &formatted, + "LocalLimitExec: limit=3", + "metrics=[output_rows=3, elapsed_compute=" + ); assert_metrics!( &formatted, "ProjectionExec: expr=[COUNT(UInt8(1))", @@ -2408,6 +2412,21 @@ async fn explain_analyze_baseline_metrics() { "CoalesceBatchesExec: target_batch_size=4096", "metrics=[output_rows=5, elapsed_compute" ); + assert_metrics!( + &formatted, + "CoalescePartitionsExec", + "metrics=[output_rows=5, elapsed_compute=" + ); + assert_metrics!( + &formatted, + "UnionExec", + "metrics=[output_rows=3, elapsed_compute=" + ); + assert_metrics!( + &formatted, + "WindowAggExec", + "metrics=[output_rows=1, elapsed_compute=" + ); fn expected_to_have_metrics(plan: &dyn ExecutionPlan) -> bool { use datafusion::physical_plan; @@ -2416,9 +2435,13 @@ async fn explain_analyze_baseline_metrics() { || plan.as_any().downcast_ref::().is_some() // CoalescePartitionsExec doesn't do any work so is not included || plan.as_any().downcast_ref::().is_some() + || plan.as_any().downcast_ref::().is_some() + || plan.as_any().downcast_ref::().is_some() || plan.as_any().downcast_ref::().is_some() || plan.as_any().downcast_ref::().is_some() - || plan.as_any().downcast_ref::().is_some() + || plan.as_any().downcast_ref::().is_some() + || plan.as_any().downcast_ref::().is_some() + || plan.as_any().downcast_ref::().is_some() } // Validate that the recorded elapsed compute time was more than