From a675bf1984ef935a7767f4b4efd9a5f15caf5b8e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 14 Sep 2021 14:38:50 -0400 Subject: [PATCH 1/2] Add metrics for Limit and Projection, and CoalesceBatches --- .../src/physical_plan/coalesce_batches.rs | 38 +++++++++-- datafusion/src/physical_plan/limit.rs | 63 ++++++++++++++++--- datafusion/src/physical_plan/projection.rs | 46 +++++++++----- datafusion/tests/sql.rs | 35 ++++++++--- 4 files changed, 143 insertions(+), 39 deletions(-) diff --git a/datafusion/src/physical_plan/coalesce_batches.rs b/datafusion/src/physical_plan/coalesce_batches.rs index aee9aea71ddb0..7397493c3a740 100644 --- a/datafusion/src/physical_plan/coalesce_batches.rs +++ b/datafusion/src/physical_plan/coalesce_batches.rs @@ -37,7 +37,8 @@ use async_trait::async_trait; use futures::stream::{Stream, StreamExt}; use log::debug; -use super::Statistics; +use super::metrics::{BaselineMetrics, MetricsSet}; +use super::{metrics::ExecutionPlanMetricsSet, Statistics}; /// CoalesceBatchesExec combines small batches into larger batches for more efficient use of /// vectorized processing by upstream operators. @@ -47,6 +48,8 @@ pub struct CoalesceBatchesExec { input: Arc, /// Minimum number of rows for coalesces batches target_batch_size: usize, + /// Execution metrics + metrics: ExecutionPlanMetricsSet, } impl CoalesceBatchesExec { @@ -55,6 +58,7 @@ impl CoalesceBatchesExec { Self { input, target_batch_size, + metrics: ExecutionPlanMetricsSet::new(), } } @@ -115,6 +119,7 @@ impl ExecutionPlan for CoalesceBatchesExec { buffer: Vec::new(), buffered_rows: 0, is_closed: false, + baseline_metrics: BaselineMetrics::new(&self.metrics, partition), })) } @@ -134,6 +139,10 @@ impl ExecutionPlan for CoalesceBatchesExec { } } + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + fn statistics(&self) -> Statistics { self.input.statistics() } @@ -152,6 +161,8 @@ struct CoalesceBatchesStream { buffered_rows: usize, /// Whether the stream has finished returning all of its data or not is_closed: bool, + /// Execution metrics + baseline_metrics: BaselineMetrics, } impl Stream for CoalesceBatchesStream { @@ -161,6 +172,26 @@ impl Stream for CoalesceBatchesStream { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { + let poll = self.poll_next_inner(cx); + self.baseline_metrics.record_poll(poll) + } + + fn size_hint(&self) -> (usize, Option) { + // we can't predict the size of incoming batches so re-use the size hint from the input + self.input.size_hint() + } +} + +impl CoalesceBatchesStream { + fn poll_next_inner( + self: &mut Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + // Get a clone (uses same underlying atomic) as self gets borrowed below + let cloned_time = self.baseline_metrics.elapsed_compute().clone(); + // records time on drop + let _timer = cloned_time.timer(); + if self.is_closed { return Poll::Ready(None); } @@ -221,11 +252,6 @@ impl Stream for CoalesceBatchesStream { } } } - - fn size_hint(&self) -> (usize, Option) { - // we can't predict the size of incoming batches so re-use the size hint from the input - self.input.size_hint() - } } impl RecordBatchStream for CoalesceBatchesStream { diff --git a/datafusion/src/physical_plan/limit.rs b/datafusion/src/physical_plan/limit.rs index 792b8f50e1d66..ccd719f324683 100644 --- a/datafusion/src/physical_plan/limit.rs +++ b/datafusion/src/physical_plan/limit.rs @@ -35,7 +35,10 @@ use arrow::datatypes::SchemaRef; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; -use super::{RecordBatchStream, SendableRecordBatchStream, Statistics}; +use super::{ + metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, + RecordBatchStream, SendableRecordBatchStream, Statistics, +}; use async_trait::async_trait; @@ -46,12 +49,18 @@ pub struct GlobalLimitExec { input: Arc, /// Maximum number of rows to return limit: usize, + /// Execution metrics + metrics: ExecutionPlanMetricsSet, } impl GlobalLimitExec { /// Create a new GlobalLimitExec pub fn new(input: Arc, limit: usize) -> Self { - GlobalLimitExec { input, limit } + GlobalLimitExec { + input, + limit, + metrics: ExecutionPlanMetricsSet::new(), + } } /// Input execution plan @@ -120,8 +129,13 @@ impl ExecutionPlan for GlobalLimitExec { )); } + let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); let stream = self.input.execute(0).await?; - Ok(Box::pin(LimitStream::new(stream, self.limit))) + Ok(Box::pin(LimitStream::new( + stream, + self.limit, + baseline_metrics, + ))) } fn fmt_as( @@ -136,6 +150,10 @@ impl ExecutionPlan for GlobalLimitExec { } } + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + fn statistics(&self) -> Statistics { let input_stats = self.input.statistics(); match input_stats { @@ -165,12 +183,18 @@ pub struct LocalLimitExec { input: Arc, /// Maximum number of rows to return limit: usize, + /// Execution metrics + metrics: ExecutionPlanMetricsSet, } impl LocalLimitExec { /// Create a new LocalLimitExec partition pub fn new(input: Arc, limit: usize) -> Self { - Self { input, limit } + Self { + input, + limit, + metrics: ExecutionPlanMetricsSet::new(), + } } /// Input execution plan @@ -219,8 +243,13 @@ impl ExecutionPlan for LocalLimitExec { } async fn execute(&self, partition: usize) -> Result { + let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); let stream = self.input.execute(partition).await?; - Ok(Box::pin(LimitStream::new(stream, self.limit))) + Ok(Box::pin(LimitStream::new( + stream, + self.limit, + baseline_metrics, + ))) } fn fmt_as( @@ -235,6 +264,10 @@ impl ExecutionPlan for LocalLimitExec { } } + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + fn statistics(&self) -> Statistics { let input_stats = self.input.statistics(); match input_stats { @@ -280,20 +313,29 @@ struct LimitStream { schema: SchemaRef, // the current number of rows which have been produced current_len: usize, + /// Execution time metrics + baseline_metrics: BaselineMetrics, } impl LimitStream { - fn new(input: SendableRecordBatchStream, limit: usize) -> Self { + fn new( + input: SendableRecordBatchStream, + limit: usize, + baseline_metrics: BaselineMetrics, + ) -> Self { let schema = input.schema(); Self { limit, input: Some(input), schema, current_len: 0, + baseline_metrics, } } fn stream_limit(&mut self, batch: RecordBatch) -> Option { + // records time on drop + let _timer = self.baseline_metrics.elapsed_compute().timer(); if self.current_len == self.limit { self.input = None; // clear input so it can be dropped early None @@ -316,14 +358,16 @@ impl Stream for LimitStream { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - match &mut self.input { + let poll = match &mut self.input { Some(input) => input.poll_next_unpin(cx).map(|x| match x { Some(Ok(batch)) => Ok(self.stream_limit(batch)).transpose(), other => other, }), // input has been cleared None => Poll::Ready(None), - } + }; + + self.baseline_metrics.record_poll(poll) } } @@ -394,7 +438,8 @@ mod tests { // limit of six needs to consume the entire first record batch // (5 rows) and 1 row from the second (1 row) - let limit_stream = LimitStream::new(Box::pin(input), 6); + let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let limit_stream = LimitStream::new(Box::pin(input), 6, baseline_metrics); assert_eq!(index.value(), 0); let results = collect(Box::pin(limit_stream)).await.unwrap(); diff --git a/datafusion/src/physical_plan/projection.rs b/datafusion/src/physical_plan/projection.rs index 97ff83edd2fce..f24726123f9df 100644 --- a/datafusion/src/physical_plan/projection.rs +++ b/datafusion/src/physical_plan/projection.rs @@ -34,6 +34,7 @@ use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; use super::expressions::Column; +use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::{RecordBatchStream, SendableRecordBatchStream, Statistics}; use async_trait::async_trait; @@ -49,6 +50,8 @@ pub struct ProjectionExec { schema: SchemaRef, /// The input plan input: Arc, + /// Execution metrics + metrics: ExecutionPlanMetricsSet, } impl ProjectionExec { @@ -76,6 +79,7 @@ impl ProjectionExec { expr, schema, input: input.clone(), + metrics: ExecutionPlanMetricsSet::new(), }) } @@ -131,6 +135,7 @@ impl ExecutionPlan for ProjectionExec { schema: self.schema.clone(), expr: self.expr.iter().map(|x| x.0.clone()).collect(), input: self.input.execute(partition).await?, + baseline_metrics: BaselineMetrics::new(&self.metrics, partition), })) } @@ -159,6 +164,10 @@ impl ExecutionPlan for ProjectionExec { } } + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + fn statistics(&self) -> Statistics { stats_projection( self.input.statistics(), @@ -194,20 +203,20 @@ fn stats_projection( } } -fn batch_project( - batch: &RecordBatch, - expressions: &[Arc], - schema: &SchemaRef, -) -> ArrowResult { - expressions - .iter() - .map(|expr| expr.evaluate(batch)) - .map(|r| r.map(|v| v.into_array(batch.num_rows()))) - .collect::>>() - .map_or_else( - |e| Err(DataFusionError::into_arrow_external_error(e)), - |arrays| RecordBatch::try_new(schema.clone(), arrays), - ) +impl ProjectionStream { + fn batch_project(&self, batch: &RecordBatch) -> ArrowResult { + // records time on drop + let _timer = self.baseline_metrics.elapsed_compute().timer(); + self.expr + .iter() + .map(|expr| expr.evaluate(batch)) + .map(|r| r.map(|v| v.into_array(batch.num_rows()))) + .collect::>>() + .map_or_else( + |e| Err(DataFusionError::into_arrow_external_error(e)), + |arrays| RecordBatch::try_new(self.schema.clone(), arrays), + ) + } } /// Projection iterator @@ -215,6 +224,7 @@ struct ProjectionStream { schema: SchemaRef, expr: Vec>, input: SendableRecordBatchStream, + baseline_metrics: BaselineMetrics, } impl Stream for ProjectionStream { @@ -224,10 +234,12 @@ impl Stream for ProjectionStream { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - self.input.poll_next_unpin(cx).map(|x| match x { - Some(Ok(batch)) => Some(batch_project(&batch, &self.expr, &self.schema)), + let poll = self.input.poll_next_unpin(cx).map(|x| match x { + Some(Ok(batch)) => Some(self.batch_project(&batch)), other => other, - }) + }); + + self.baseline_metrics.record_poll(poll) } fn size_hint(&self) -> (usize, Option) { diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index eaf988fba0cf3..eff434d544971 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -2361,13 +2361,15 @@ async fn explain_analyze_baseline_metrics() { FROM aggregate_test_100 \ WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434' \ GROUP BY c1 \ - ORDER BY c1)"; + ORDER BY c1) \ + LIMIT 1"; println!("running query: {}", sql); let plan = ctx.create_logical_plan(sql).unwrap(); let plan = ctx.optimize(&plan).unwrap(); let physical_plan = ctx.create_physical_plan(&plan).unwrap(); let results = collect(physical_plan.clone()).await.unwrap(); let formatted = arrow::util::pretty::pretty_format_batches(&results).unwrap(); + println!("Query Output:\n\n{}", formatted); let formatted = normalize_for_explain(&formatted); assert_metrics!( @@ -2395,14 +2397,33 @@ async fn explain_analyze_baseline_metrics() { "FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434", "metrics=[output_rows=99, elapsed_compute=" ); + assert_metrics!( + &formatted, + "GlobalLimitExec: limit=1, ", + "metrics=[output_rows=1, elapsed_compute=" + ); + assert_metrics!( + &formatted, + "ProjectionExec: expr=[COUNT(UInt8(1))", + "metrics=[output_rows=1, elapsed_compute=" + ); + assert_metrics!( + &formatted, + "CoalesceBatchesExec: target_batch_size=4096", + "metrics=[output_rows=5, elapsed_compute" + ); fn expected_to_have_metrics(plan: &dyn ExecutionPlan) -> bool { - use datafusion::physical_plan::{ - hash_aggregate::HashAggregateExec, sort::SortExec, - }; - - plan.as_any().downcast_ref::().is_some() - || plan.as_any().downcast_ref::().is_some() + use datafusion::physical_plan; + + plan.as_any().downcast_ref::().is_some() + || plan.as_any().downcast_ref::().is_some() + // CoalescePartitionsExec doesn't do any work so is not included + || plan.as_any().downcast_ref::().is_some() + || plan.as_any().downcast_ref::().is_some() + || plan.as_any().downcast_ref::().is_some() + || plan.as_any().downcast_ref::().is_some() + || plan.as_any().downcast_ref::().is_some() } // Validate that the recorded elapsed compute time was more than From fe26d4acb4aba81e94829e67ca172721debdbba3 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 14 Sep 2021 16:29:42 -0400 Subject: [PATCH 2/2] remove duplication --- datafusion/tests/sql.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index eff434d544971..90173be6ac642 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -2419,7 +2419,6 @@ async fn explain_analyze_baseline_metrics() { plan.as_any().downcast_ref::().is_some() || plan.as_any().downcast_ref::().is_some() // CoalescePartitionsExec doesn't do any work so is not included - || plan.as_any().downcast_ref::().is_some() || plan.as_any().downcast_ref::().is_some() || plan.as_any().downcast_ref::().is_some() || plan.as_any().downcast_ref::().is_some()