From 11fe6b5d584e8d235c90a6f6da946d1e985a63a5 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 26 Jul 2021 11:20:22 -0400 Subject: [PATCH 01/17] Improve SQLMetric APIs, port existing metrics --- .../src/physical_optimizer/repartition.rs | 13 +- datafusion/src/physical_plan/analyze.rs | 8 + datafusion/src/physical_plan/display.rs | 69 ++- .../src/physical_plan/hash_aggregate.rs | 33 +- datafusion/src/physical_plan/hash_join.rs | 100 ++-- datafusion/src/physical_plan/metrics.rs | 461 ++++++++++++++++++ .../src/physical_plan/metrics/wrappers.rs | 105 ++++ datafusion/src/physical_plan/mod.rs | 111 +---- datafusion/src/physical_plan/parquet.rs | 126 +++-- datafusion/src/physical_plan/repartition.rs | 90 ++-- datafusion/src/physical_plan/sort.rs | 45 +- datafusion/tests/parquet_pruning.rs | 67 +-- datafusion/tests/sql.rs | 2 + 13 files changed, 886 insertions(+), 344 deletions(-) create mode 100644 datafusion/src/physical_plan/metrics.rs create mode 100644 datafusion/src/physical_plan/metrics/wrappers.rs diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs index 4504c81daa06d..4b1a536b2f124 100644 --- a/datafusion/src/physical_optimizer/repartition.rs +++ b/datafusion/src/physical_optimizer/repartition.rs @@ -110,24 +110,25 @@ mod tests { use super::*; use crate::datasource::datasource::Statistics; - use crate::physical_plan::parquet::{ - ParquetExec, ParquetExecMetrics, ParquetPartition, - }; + use crate::physical_plan::metrics::SharedMetricsSet; + use crate::physical_plan::parquet::{ParquetExec, ParquetPartition}; use crate::physical_plan::projection::ProjectionExec; #[test] fn added_repartition_to_single_partition() -> Result<()> { let schema = Arc::new(Schema::empty()); + let metrics = Arc::new(SharedMetricsSet::new()); let parquet_project = ProjectionExec::try_new( vec![], Arc::new(ParquetExec::new( vec![ParquetPartition::new( vec!["x".to_string()], Statistics::default(), + metrics.clone(), )], schema, None, - ParquetExecMetrics::new(), + metrics, None, 2048, None, @@ -154,6 +155,7 @@ mod tests { #[test] fn repartition_deepest_node() -> Result<()> { let schema = Arc::new(Schema::empty()); + let metrics = Arc::new(SharedMetricsSet::new()); let parquet_project = ProjectionExec::try_new( vec![], Arc::new(ProjectionExec::try_new( @@ -162,10 +164,11 @@ mod tests { vec![ParquetPartition::new( vec!["x".to_string()], Statistics::default(), + metrics.clone(), )], schema, None, - ParquetExecMetrics::new(), + metrics, None, 2048, None, diff --git a/datafusion/src/physical_plan/analyze.rs b/datafusion/src/physical_plan/analyze.rs index 36726ad3a7b48..d0125579ace25 100644 --- a/datafusion/src/physical_plan/analyze.rs +++ b/datafusion/src/physical_plan/analyze.rs @@ -164,6 +164,14 @@ impl ExecutionPlan for AnalyzeExec { // Verbose output // TODO make this more sophisticated if verbose { + type_builder.append_value("Plan with Full Metrics").unwrap(); + + let annotated_plan = + DisplayableExecutionPlan::with_full_metrics(captured_input.as_ref()) + .indent() + .to_string(); + plan_builder.append_value(annotated_plan).unwrap(); + type_builder.append_value("Output Rows").unwrap(); plan_builder.append_value(total_rows.to_string()).unwrap(); diff --git a/datafusion/src/physical_plan/display.rs b/datafusion/src/physical_plan/display.rs index e251e4ea53db1..5ff99e5f9704f 100644 --- a/datafusion/src/physical_plan/display.rs +++ b/datafusion/src/physical_plan/display.rs @@ -35,8 +35,8 @@ pub enum DisplayFormatType { /// Wraps an `ExecutionPlan` with various ways to display this plan pub struct DisplayableExecutionPlan<'a> { inner: &'a dyn ExecutionPlan, - /// whether to show metrics or not - with_metrics: bool, + /// How to show metrics + show_metrics: ShowMetrics, } impl<'a> DisplayableExecutionPlan<'a> { @@ -45,16 +45,27 @@ impl<'a> DisplayableExecutionPlan<'a> { pub fn new(inner: &'a dyn ExecutionPlan) -> Self { Self { inner, - with_metrics: false, + show_metrics: ShowMetrics::None, } } /// Create a wrapper around an [`'ExecutionPlan'] which can be - /// pretty printed in a variety of ways + /// pretty printed in a variety of ways that also shows aggregated + /// metrics pub fn with_metrics(inner: &'a dyn ExecutionPlan) -> Self { Self { inner, - with_metrics: true, + show_metrics: ShowMetrics::Aggregated, + } + } + + /// Create a wrapper around an [`'ExecutionPlan'] which can be + /// pretty printed in a variety of ways that also shows all low + /// level metrics + pub fn with_full_metrics(inner: &'a dyn ExecutionPlan) -> Self { + Self { + inner, + show_metrics: ShowMetrics::Full, } } @@ -71,7 +82,7 @@ impl<'a> DisplayableExecutionPlan<'a> { pub fn indent(&self) -> impl fmt::Display + 'a { struct Wrapper<'a> { plan: &'a dyn ExecutionPlan, - with_metrics: bool, + show_metrics: ShowMetrics, } impl<'a> fmt::Display for Wrapper<'a> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { @@ -80,18 +91,30 @@ impl<'a> DisplayableExecutionPlan<'a> { t, f, indent: 0, - with_metrics: self.with_metrics, + show_metrics: self.show_metrics, }; accept(self.plan, &mut visitor) } } Wrapper { plan: self.inner, - with_metrics: self.with_metrics, + show_metrics: self.show_metrics, } } } +#[derive(Debug, Clone, Copy)] +enum ShowMetrics { + /// Do not show any metrics + None, + + /// Show aggregrated metrics across partition + Aggregated, + + /// Show full per-partition metrics + Full, +} + /// Formats plans with a single line per node. struct IndentVisitor<'a, 'b> { /// How to format each node @@ -100,8 +123,8 @@ struct IndentVisitor<'a, 'b> { f: &'a mut fmt::Formatter<'b>, /// Indent size indent: usize, - /// whether to show metrics or not - with_metrics: bool, + /// How to show metrics + show_metrics: ShowMetrics, } impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> { @@ -112,16 +135,22 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> { ) -> std::result::Result { write!(self.f, "{:indent$}", "", indent = self.indent * 2)?; plan.fmt_as(self.t, self.f)?; - if self.with_metrics { - write!( - self.f, - ", metrics=[{}]", - plan.metrics() - .iter() - .map(|(k, v)| format!("{}={:?}", k, v.value)) - .collect::>() - .join(", ") - )?; + match self.show_metrics { + ShowMetrics::None => {} + ShowMetrics::Aggregated => { + if let Some(metrics) = plan.metrics() { + write!(self.f, ", metrics=[{}]", metrics.aggregate_by_partition())?; + } else { + write!(self.f, ", metrics=[]")?; + } + } + ShowMetrics::Full => { + if let Some(metrics) = plan.metrics() { + write!(self.f, ", metrics=[{}]", metrics)?; + } else { + write!(self.f, ", metrics=[]")?; + } + } } writeln!(self.f)?; self.indent += 1; diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs index 1c07f61f10cd5..683edbb6ac0e2 100644 --- a/datafusion/src/physical_plan/hash_aggregate.rs +++ b/datafusion/src/physical_plan/hash_aggregate.rs @@ -27,13 +27,12 @@ use futures::{ stream::{Stream, StreamExt}, Future, }; -use hashbrown::HashMap; use crate::error::{DataFusionError, Result}; use crate::physical_plan::hash_utils::create_hashes; use crate::physical_plan::{ Accumulator, AggregateExpr, DisplayFormatType, Distribution, ExecutionPlan, - Partitioning, PhysicalExpr, SQLMetric, + Partitioning, PhysicalExpr, }; use crate::scalar::ScalarValue; @@ -51,6 +50,8 @@ use pin_project_lite::pin_project; use async_trait::async_trait; +use super::metrics::wrappers::Count; +use super::metrics::{MetricBuilder, MetricsSet, SharedMetricsSet}; use super::{expressions::Column, RecordBatchStream, SendableRecordBatchStream}; /// Hash aggregate modes @@ -86,8 +87,8 @@ pub struct HashAggregateExec { /// same as input.schema() but for the final aggregate it will be the same as the input /// to the partial aggregate input_schema: SchemaRef, - /// Metric to track number of output rows - output_rows: Arc, + /// Execution Metrics + metrics: SharedMetricsSet, } fn create_schema( @@ -136,8 +137,6 @@ impl HashAggregateExec { let schema = Arc::new(schema); - let output_rows = SQLMetric::counter(); - Ok(HashAggregateExec { mode, group_expr, @@ -145,7 +144,7 @@ impl HashAggregateExec { input, schema, input_schema, - output_rows, + metrics: SharedMetricsSet::new(), }) } @@ -209,6 +208,8 @@ impl ExecutionPlan for HashAggregateExec { let input = self.input.execute(partition).await?; let group_expr = self.group_expr.iter().map(|x| x.0.clone()).collect(); + let output_rows = MetricBuilder::new(&self.metrics).output_rows(partition); + if self.group_expr.is_empty() { Ok(Box::pin(HashAggregateStream::new( self.mode, @@ -223,7 +224,7 @@ impl ExecutionPlan for HashAggregateExec { group_expr, self.aggr_expr.clone(), input, - self.output_rows.clone(), + output_rows, ))) } } @@ -246,10 +247,8 @@ impl ExecutionPlan for HashAggregateExec { } } - fn metrics(&self) -> HashMap { - let mut metrics = HashMap::new(); - metrics.insert("outputRows".to_owned(), (*self.output_rows).clone()); - metrics + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) } fn fmt_as( @@ -317,7 +316,7 @@ pin_project! { #[pin] output: futures::channel::oneshot::Receiver>, finished: bool, - output_rows: Arc, + output_rows: Count, } } @@ -526,7 +525,7 @@ impl GroupedHashAggregateStream { group_expr: Vec>, aggr_expr: Vec>, input: SendableRecordBatchStream, - output_rows: Arc, + output_rows: Count, ) -> Self { let (tx, rx) = futures::channel::oneshot::channel(); @@ -1072,9 +1071,9 @@ mod tests { assert_batches_sorted_eq!(&expected, &result); - let metrics = merged_aggregate.metrics(); - let output_rows = metrics.get("outputRows").unwrap(); - assert_eq!(3, output_rows.value()); + let metrics = merged_aggregate.metrics().unwrap(); + let output_rows = metrics.output_rows().unwrap(); + assert_eq!(3, output_rows); Ok(()) } diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 99708249fc6a7..eb607c17d05b2 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -35,7 +35,6 @@ use std::{time::Instant, vec}; use async_trait::async_trait; use futures::{Stream, StreamExt, TryStreamExt}; -use hashbrown::HashMap; use tokio::sync::Mutex; use arrow::array::Array; @@ -51,12 +50,15 @@ use arrow::array::{ use hashbrown::raw::RawTable; -use super::expressions::Column; use super::hash_utils::create_hashes; use super::{ coalesce_partitions::CoalescePartitionsExec, hash_utils::{build_join_schema, check_join_is_valid, JoinOn}, }; +use super::{ + expressions::Column, + metrics::{self, MetricBuilder, MetricsSet, SharedMetricsSet}, +}; use crate::error::{DataFusionError, Result}; use crate::logical_plan::JoinType; @@ -65,7 +67,7 @@ use super::{ SendableRecordBatchStream, }; use crate::physical_plan::coalesce_batches::concat_batches; -use crate::physical_plan::{PhysicalExpr, SQLMetric}; +use crate::physical_plan::PhysicalExpr; use log::debug; use std::fmt; @@ -111,33 +113,45 @@ pub struct HashJoinExec { random_state: RandomState, /// Partitioning mode to use mode: PartitionMode, - /// Metrics - metrics: Arc, + /// Execution metrics + metrics: SharedMetricsSet, } /// Metrics for HashJoinExec #[derive(Debug)] struct HashJoinMetrics { /// Total time for joining probe-side batches to the build-side batches - join_time: Arc, + join_time: metrics::wrappers::Time, /// Number of batches consumed by this operator - input_batches: Arc, + input_batches: metrics::wrappers::Count, /// Number of rows consumed by this operator - input_rows: Arc, + input_rows: metrics::wrappers::Count, /// Number of batches produced by this operator - output_batches: Arc, + output_batches: metrics::wrappers::Count, /// Number of rows produced by this operator - output_rows: Arc, + output_rows: metrics::wrappers::Count, } impl HashJoinMetrics { - fn new() -> Self { + pub fn new(partition: usize, metrics: &SharedMetricsSet) -> Self { + let join_time = MetricBuilder::new(metrics).subset_time("join_time", partition); + + let input_batches = + MetricBuilder::new(metrics).counter("input_batches", partition); + + let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition); + + let output_batches = + MetricBuilder::new(metrics).counter("output_rows", partition); + + let output_rows = MetricBuilder::new(metrics).output_rows(partition); + Self { - join_time: SQLMetric::time_nanos(), - input_batches: SQLMetric::counter(), - input_rows: SQLMetric::counter(), - output_batches: SQLMetric::counter(), - output_rows: SQLMetric::counter(), + join_time, + input_batches, + input_rows, + output_batches, + output_rows, } } } @@ -187,7 +201,7 @@ impl HashJoinExec { build_side: Arc::new(Mutex::new(None)), random_state, mode: partition_mode, - metrics: Arc::new(HashJoinMetrics::new()), + metrics: SharedMetricsSet::new(), }) } @@ -425,7 +439,7 @@ impl ExecutionPlan for HashJoinExec { column_indices, self.random_state.clone(), visited_left_side, - self.metrics.clone(), + HashJoinMetrics::new(partition, &self.metrics), ))) } @@ -445,20 +459,8 @@ impl ExecutionPlan for HashJoinExec { } } - fn metrics(&self) -> HashMap { - let mut metrics = HashMap::new(); - metrics.insert("joinTime".to_owned(), (*self.metrics.join_time).clone()); - metrics.insert( - "inputBatches".to_owned(), - (*self.metrics.input_batches).clone(), - ); - metrics.insert("inputRows".to_owned(), (*self.metrics.input_rows).clone()); - metrics.insert( - "outputBatches".to_owned(), - (*self.metrics.output_batches).clone(), - ); - metrics.insert("outputRows".to_owned(), (*self.metrics.output_rows).clone()); - metrics + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) } } @@ -522,7 +524,7 @@ struct HashJoinStream { /// There is nothing to process anymore and left side is processed in case of left join is_exhausted: bool, /// Metrics - metrics: Arc, + join_metrics: HashJoinMetrics, } #[allow(clippy::too_many_arguments)] @@ -537,7 +539,7 @@ impl HashJoinStream { column_indices: Vec, random_state: RandomState, visited_left_side: Vec, - metrics: Arc, + join_metrics: HashJoinMetrics, ) -> Self { HashJoinStream { schema, @@ -550,7 +552,7 @@ impl HashJoinStream { random_state, visited_left_side, is_exhausted: false, - metrics, + join_metrics, } } } @@ -876,7 +878,7 @@ impl Stream for HashJoinStream { .poll_next_unpin(cx) .map(|maybe_batch| match maybe_batch { Some(Ok(batch)) => { - let start = Instant::now(); + let timer = self.join_metrics.join_time.timer(); let result = build_batch( &batch, &self.left_data, @@ -887,14 +889,12 @@ impl Stream for HashJoinStream { &self.column_indices, &self.random_state, ); - self.metrics.input_batches.add(1); - self.metrics.input_rows.add(batch.num_rows()); + self.join_metrics.input_batches.add(1); + self.join_metrics.input_rows.add(batch.num_rows()); if let Ok((ref batch, ref left_side)) = result { - self.metrics - .join_time - .add(start.elapsed().as_millis() as usize); - self.metrics.output_batches.add(1); - self.metrics.output_rows.add(batch.num_rows()); + timer.done(); + self.join_metrics.output_batches.add(1); + self.join_metrics.output_rows.add(batch.num_rows()); match self.join_type { JoinType::Left @@ -911,7 +911,7 @@ impl Stream for HashJoinStream { Some(result.map(|x| x.0)) } other => { - let start = Instant::now(); + let timer = self.join_metrics.join_time.timer(); // For the left join, produce rows for unmatched rows match self.join_type { JoinType::Left @@ -928,16 +928,14 @@ impl Stream for HashJoinStream { self.join_type != JoinType::Semi, ); if let Ok(ref batch) = result { - self.metrics.input_batches.add(1); - self.metrics.input_rows.add(batch.num_rows()); + self.join_metrics.input_batches.add(1); + self.join_metrics.input_rows.add(batch.num_rows()); if let Ok(ref batch) = result { - self.metrics - .join_time - .add(start.elapsed().as_millis() as usize); - self.metrics.output_batches.add(1); - self.metrics.output_rows.add(batch.num_rows()); + self.join_metrics.output_batches.add(1); + self.join_metrics.output_rows.add(batch.num_rows()); } } + timer.done(); self.is_exhausted = true; return Some(result); } diff --git a/datafusion/src/physical_plan/metrics.rs b/datafusion/src/physical_plan/metrics.rs new file mode 100644 index 0000000000000..8028210d01259 --- /dev/null +++ b/datafusion/src/physical_plan/metrics.rs @@ -0,0 +1,461 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Metrics for recording information about execution + +pub mod wrappers; + +use std::{ + fmt::{Debug, Display}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, +}; + +use hashbrown::HashMap; + +use self::wrappers::{Count, Time}; + +/// Structure for constructing metrics, counters, timers, etc +pub struct MetricBuilder<'a> { + /// Location that the metric created by this builder will be added do + metrics: &'a SharedMetricsSet, + + /// optional partition number + partition: Option, + + /// arbitrary name=value pairs identifiying this metric + labels: Vec