diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs index ed45230c0fe..30e47df5f64 100644 --- a/rust/datafusion/src/datasource/parquet.rs +++ b/rust/datafusion/src/datasource/parquet.rs @@ -120,7 +120,7 @@ mod tests { let exec = table.scan(&projection, 2, &[], None)?; let stream = exec.execute(0).await?; - let count = stream + let _ = stream .map(|batch| { let batch = batch.unwrap(); assert_eq!(11, batch.num_columns()); @@ -129,9 +129,6 @@ mod tests { .fold(0, |acc, _| async move { acc + 1i32 }) .await; - // we should have seen 4 batches of 2 rows - assert_eq!(4, count); - // test metadata assert_eq!(table.statistics().num_rows, Some(8)); assert_eq!(table.statistics().total_byte_size, Some(671)); diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 4c419d983a6..c31817872b0 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -22,6 +22,7 @@ use crate::{ information_schema::CatalogWithInformationSchema, }, optimizer::hash_build_probe_order::HashBuildProbeOrder, + physical_optimizer::optimizer::PhysicalOptimizerRule, }; use log::debug; use std::fs; @@ -56,6 +57,10 @@ use crate::optimizer::filter_push_down::FilterPushDown; use crate::optimizer::limit_push_down::LimitPushDown; use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::projection_push_down::ProjectionPushDown; +use crate::physical_optimizer::coalesce_batches::CoalesceBatches; +use crate::physical_optimizer::merge_exec::AddMergeExec; +use crate::physical_optimizer::repartition::Repartition; + use crate::physical_plan::csv::CsvReadOptions; use crate::physical_plan::planner::DefaultPhysicalPlanner; use crate::physical_plan::udf::ScalarUDF; @@ -605,6 +610,8 @@ pub struct ExecutionConfig { pub batch_size: usize, /// Responsible for optimizing a logical plan optimizers: Vec>, + /// Responsible for optimizing a physical execution plan + pub 
physical_optimizers: Vec>, /// Responsible for planning `LogicalPlan`s, and `ExecutionPlan` query_planner: Arc, /// Default catalog name for table resolution @@ -634,6 +641,11 @@ impl ExecutionConfig { Arc::new(HashBuildProbeOrder::new()), Arc::new(LimitPushDown::new()), ], + physical_optimizers: vec![ + Arc::new(Repartition::new()), + Arc::new(CoalesceBatches::new()), + Arc::new(AddMergeExec::new()), + ], query_planner: Arc::new(DefaultQueryPlanner {}), default_catalog: "datafusion".to_owned(), default_schema: "public".to_owned(), @@ -677,6 +689,15 @@ impl ExecutionConfig { self } + /// Adds a new [`PhysicalOptimizerRule`] + pub fn add_physical_optimizer_rule( + mut self, + optimizer_rule: Arc, + ) -> Self { + self.physical_optimizers.push(optimizer_rule); + self + } + /// Selects a name for the default catalog and schema pub fn with_default_catalog_and_schema( mut self, @@ -835,15 +856,6 @@ mod tests { let partition_count = 4; let results = execute("SELECT c1, c2 FROM test", partition_count).await?; - // there should be one batch per partition - assert_eq!(results.len(), partition_count); - - // each batch should contain 2 columns and 10 rows with correct field names - for batch in &results { - assert_eq!(batch.num_columns(), 2); - assert_eq!(batch.num_rows(), 10); - } - let expected = vec![ "+----+----+", "| c1 | c2 |", @@ -953,21 +965,14 @@ mod tests { println!("{:?}", physical_plan); let results = collect_partitioned(physical_plan).await?; - assert_eq!(results.len(), partition_count); - - // there should be a total of 2 batches with 20 rows because the where clause filters - // out results from 2 partitions // note that the order of partitions is not deterministic - let mut num_batches = 0; let mut num_rows = 0; for partition in &results { for batch in partition { - num_batches += 1; num_rows += batch.num_rows(); } } - assert_eq!(2, num_batches); assert_eq!(20, num_rows); let results: Vec = results.into_iter().flatten().collect(); @@ -1039,9 +1044,7 @@ mod 
tests { assert_eq!("c2", physical_plan.schema().field(0).name().as_str()); let batches = collect(physical_plan).await?; - assert_eq!(4, batches.len()); - assert_eq!(1, batches[0].num_columns()); - assert_eq!(10, batches[0].num_rows()); + assert_eq!(40, batches.iter().map(|x| x.num_rows()).sum::()); Ok(()) } @@ -2017,27 +2020,15 @@ mod tests { // register each partition as well as the top level dir let csv_read_option = CsvReadOptions::new().schema(&schema); ctx.register_csv("part0", &format!("{}/part-0.csv", out_dir), csv_read_option)?; - ctx.register_csv("part1", &format!("{}/part-1.csv", out_dir), csv_read_option)?; - ctx.register_csv("part2", &format!("{}/part-2.csv", out_dir), csv_read_option)?; - ctx.register_csv("part3", &format!("{}/part-3.csv", out_dir), csv_read_option)?; ctx.register_csv("allparts", &out_dir, csv_read_option)?; let part0 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part0").await?; - let part1 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part1").await?; - let part2 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part2").await?; - let part3 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part3").await?; let allparts = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM allparts").await?; - let part0_count: usize = part0.iter().map(|batch| batch.num_rows()).sum(); - let part1_count: usize = part1.iter().map(|batch| batch.num_rows()).sum(); - let part2_count: usize = part2.iter().map(|batch| batch.num_rows()).sum(); - let part3_count: usize = part3.iter().map(|batch| batch.num_rows()).sum(); let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum(); - assert_eq!(part0_count, 10); - assert_eq!(part1_count, 10); - assert_eq!(part2_count, 10); - assert_eq!(part3_count, 10); + assert_eq!(part0[0].schema(), allparts[0].schema()); + assert_eq!(allparts_count, 40); Ok(()) @@ -2064,21 +2055,12 @@ mod tests { ctx.register_parquet("allparts", &out_dir)?; let part0 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM 
part0").await?; - let part1 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part1").await?; - let part2 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part2").await?; - let part3 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part3").await?; let allparts = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM allparts").await?; - let part0_count: usize = part0.iter().map(|batch| batch.num_rows()).sum(); - let part1_count: usize = part1.iter().map(|batch| batch.num_rows()).sum(); - let part2_count: usize = part2.iter().map(|batch| batch.num_rows()).sum(); - let part3_count: usize = part3.iter().map(|batch| batch.num_rows()).sum(); let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum(); - assert_eq!(part0_count, 10); - assert_eq!(part1_count, 10); - assert_eq!(part2_count, 10); - assert_eq!(part3_count, 10); + assert_eq!(part0[0].schema(), allparts[0].schema()); + assert_eq!(allparts_count, 40); Ok(()) diff --git a/rust/datafusion/src/lib.rs b/rust/datafusion/src/lib.rs index 2733430cbe9..44a8a686a49 100644 --- a/rust/datafusion/src/lib.rs +++ b/rust/datafusion/src/lib.rs @@ -193,6 +193,7 @@ pub mod error; pub mod execution; pub mod logical_plan; pub mod optimizer; +pub mod physical_optimizer; pub mod physical_plan; pub mod prelude; pub mod scalar; diff --git a/rust/datafusion/src/optimizer/mod.rs b/rust/datafusion/src/optimizer/mod.rs index 87dc62da2a3..dc59b64ff46 100644 --- a/rust/datafusion/src/optimizer/mod.rs +++ b/rust/datafusion/src/optimizer/mod.rs @@ -24,5 +24,4 @@ pub mod hash_build_probe_order; pub mod limit_push_down; pub mod optimizer; pub mod projection_push_down; - pub mod utils; diff --git a/rust/datafusion/src/physical_optimizer/coalesce_batches.rs b/rust/datafusion/src/physical_optimizer/coalesce_batches.rs new file mode 100644 index 00000000000..9af8911062d --- /dev/null +++ b/rust/datafusion/src/physical_optimizer/coalesce_batches.rs @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more 
contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! CoalesceBatches optimizer that groups rows from small batches together +//! into bigger batches to avoid overhead with small batches + +use super::optimizer::PhysicalOptimizerRule; +use crate::{ + error::Result, + physical_plan::{ + coalesce_batches::CoalesceBatchesExec, filter::FilterExec, + hash_join::HashJoinExec, repartition::RepartitionExec, + }, +}; +use std::sync::Arc; + +/// Optimizer that introduces CoalesceBatchesExec to avoid overhead with small batches +pub struct CoalesceBatches {} + +impl CoalesceBatches { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} +impl PhysicalOptimizerRule for CoalesceBatches { + fn optimize( + &self, + plan: Arc, + config: &crate::execution::context::ExecutionConfig, + ) -> Result> { + // wrap operators in CoalesceBatches to avoid lots of tiny batches when we have + // highly selective filters + let children = plan + .children() + .iter() + .map(|child| self.optimize(child.clone(), config)) + .collect::>>()?; + + let plan_any = plan.as_any(); + //TODO we should do this in a more generic way either by wrapping all operators + // or having an API so that operators can declare when their inputs or outputs + // need to be wrapped in a coalesce batches operator. 
+ // See https://issues.apache.org/jira/browse/ARROW-11068 + let wrap_in_coalesce = plan_any.downcast_ref::().is_some() + || plan_any.downcast_ref::().is_some() + || plan_any.downcast_ref::().is_some(); + + //TODO we should also do this for HashAggregateExec but we need to update tests + // as part of this work - see https://issues.apache.org/jira/browse/ARROW-11068 + // || plan_any.downcast_ref::().is_some(); + + if plan.children().is_empty() { + // leaf node, children cannot be replaced + Ok(plan.clone()) + } else { + let plan = plan.with_new_children(children)?; + Ok(if wrap_in_coalesce { + //TODO we should add specific configuration settings for coalescing batches and + // we should do that once https://issues.apache.org/jira/browse/ARROW-11059 is + // implemented. For now, we choose half the configured batch size to avoid copies + // when a small number of rows are removed from a batch + let target_batch_size = config.batch_size / 2; + Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size)) + } else { + plan.clone() + }) + } + } + + fn name(&self) -> &str { + "coalesce_batches" + } +} diff --git a/rust/datafusion/src/physical_optimizer/merge_exec.rs b/rust/datafusion/src/physical_optimizer/merge_exec.rs new file mode 100644 index 00000000000..255d1bc2458 --- /dev/null +++ b/rust/datafusion/src/physical_optimizer/merge_exec.rs @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! AddMergeExec adds MergeExec to merge plans +//! with more partitions into one partition when the node +//! needs a single partition +use super::optimizer::PhysicalOptimizerRule; +use crate::{ + error::Result, + physical_plan::{merge::MergeExec, Distribution}, +}; +use std::sync::Arc; + +/// Introduces MergeExec +pub struct AddMergeExec {} + +impl AddMergeExec { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} + +impl PhysicalOptimizerRule for AddMergeExec { + fn optimize( + &self, + plan: Arc, + config: &crate::execution::context::ExecutionConfig, + ) -> Result> { + if plan.children().is_empty() { + // leaf node, children cannot be replaced + Ok(plan.clone()) + } else { + let children = plan + .children() + .iter() + .map(|child| self.optimize(child.clone(), config)) + .collect::>>()?; + match plan.required_child_distribution() { + Distribution::UnspecifiedDistribution => plan.with_new_children(children), + Distribution::SinglePartition => plan.with_new_children( + children + .iter() + .map(|child| { + if child.output_partitioning().partition_count() == 1 { + child.clone() + } else { + Arc::new(MergeExec::new(child.clone())) + } + }) + .collect(), + ), + } + } + } + + fn name(&self) -> &str { + "add_merge_exec" + } +} diff --git a/rust/datafusion/src/physical_optimizer/mod.rs b/rust/datafusion/src/physical_optimizer/mod.rs new file mode 100644 index 00000000000..eca63db9f3d --- /dev/null +++ b/rust/datafusion/src/physical_optimizer/mod.rs @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or 
more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains a query optimizer that operates against a physical plan and applies +//! rules to a physical plan, such as "Repartition". + +pub mod coalesce_batches; +pub mod merge_exec; +pub mod optimizer; +pub mod repartition; diff --git a/rust/datafusion/src/physical_optimizer/optimizer.rs b/rust/datafusion/src/physical_optimizer/optimizer.rs new file mode 100644 index 00000000000..e2f40ae9540 --- /dev/null +++ b/rust/datafusion/src/physical_optimizer/optimizer.rs @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Physical optimizer traits + +use std::sync::Arc; + +use crate::{ + error::Result, execution::context::ExecutionConfig, physical_plan::ExecutionPlan, +}; + +/// `PhysicalOptimizerRule` transforms one [`ExecutionPlan`] into another which +/// computes the same results, but in a potentially more efficient +/// way. +pub trait PhysicalOptimizerRule { + /// Rewrite `plan` to an optimized form + fn optimize( + &self, + plan: Arc, + config: &ExecutionConfig, + ) -> Result>; + + /// A human readable name for this optimizer rule + fn name(&self) -> &str; +} diff --git a/rust/datafusion/src/physical_optimizer/repartition.rs b/rust/datafusion/src/physical_optimizer/repartition.rs new file mode 100644 index 00000000000..82f46f9cbbb --- /dev/null +++ b/rust/datafusion/src/physical_optimizer/repartition.rs @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Repartition optimizer that introduces repartition nodes to increase the level of parallelism available +use std::sync::Arc; + +use super::optimizer::PhysicalOptimizerRule; +use crate::physical_plan::{ + empty::EmptyExec, repartition::RepartitionExec, ExecutionPlan, +}; +use crate::physical_plan::{Distribution, Partitioning::*}; +use crate::{error::Result, execution::context::ExecutionConfig}; + +/// Optimizer that introduces repartition to introduce more parallelism in the plan +pub struct Repartition {} + +impl Repartition { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} + +fn optimize_concurrency( + concurrency: usize, + requires_single_partition: bool, + plan: Arc, +) -> Result> { + // Recurse into children bottom-up (added nodes should be as deep as possible) + + let new_plan = if plan.children().is_empty() { + // leaf node - don't replace children + plan.clone() + } else { + let children = plan + .children() + .iter() + .map(|child| { + optimize_concurrency( + concurrency, + plan.required_child_distribution() == Distribution::SinglePartition, + child.clone(), + ) + }) + .collect::>()?; + plan.with_new_children(children)? 
+ }; + + let perform_repartition = match new_plan.output_partitioning() { + // Apply when underlying node has less than `concurrency` amount of concurrency + RoundRobinBatch(x) => x < concurrency, + UnknownPartitioning(x) => x < concurrency, + // we don't want to introduce partitioning after hash partitioning + // as the plan will likely depend on this + Hash(_, _) => false, + }; + + // TODO: EmptyExec causes failures with RepartitionExec + // But also not very useful to include + let is_empty_exec = plan.as_any().downcast_ref::().is_some(); + + if perform_repartition && !requires_single_partition && !is_empty_exec { + Ok(Arc::new(RepartitionExec::try_new( + new_plan, + RoundRobinBatch(concurrency), + )?)) + } else { + Ok(new_plan) + } +} + +impl PhysicalOptimizerRule for Repartition { + fn optimize( + &self, + plan: Arc, + config: &ExecutionConfig, + ) -> Result> { + // Don't run optimizer if concurrency == 1 + if config.concurrency == 1 { + Ok(plan) + } else { + optimize_concurrency(config.concurrency, true, plan) + } + } + + fn name(&self) -> &str { + "repartition" + } +} +#[cfg(test)] +mod tests { + use arrow::datatypes::Schema; + + use super::*; + use crate::datasource::datasource::Statistics; + use crate::physical_plan::parquet::{ParquetExec, ParquetPartition}; + use crate::physical_plan::projection::ProjectionExec; + + #[test] + fn added_repartition_to_single_partition() -> Result<()> { + let parquet_project = ProjectionExec::try_new( + vec![], + Arc::new(ParquetExec::new( + vec![ParquetPartition { + filenames: vec!["x".to_string()], + statistics: Statistics::default(), + }], + Schema::empty(), + None, + None, + 2048, + None, + )), + )?; + + let optimizer = Repartition {}; + + let optimized = optimizer.optimize( + Arc::new(parquet_project), + &ExecutionConfig::new().with_concurrency(10), + )?; + + assert_eq!( + optimized.children()[0] + .output_partitioning() + .partition_count(), + 10 + ); + + Ok(()) + } + + #[test] + fn repartition_deepest_node() -> 
Result<()> { + let parquet_project = ProjectionExec::try_new( + vec![], + Arc::new(ProjectionExec::try_new( + vec![], + Arc::new(ParquetExec::new( + vec![ParquetPartition { + filenames: vec!["x".to_string()], + statistics: Statistics::default(), + }], + Schema::empty(), + None, + None, + 2048, + None, + )), + )?), + )?; + + let optimizer = Repartition {}; + + let optimized = optimizer.optimize( + Arc::new(parquet_project), + &ExecutionConfig::new().with_concurrency(10), + )?; + + // RepartitionExec is added to deepest node + assert!(optimized.children()[0] + .as_any() + .downcast_ref::() + .is_none()); + assert!(optimized.children()[0].children()[0] + .as_any() + .downcast_ref::() + .is_some()); + + Ok(()) + } +} diff --git a/rust/datafusion/src/physical_plan/mod.rs b/rust/datafusion/src/physical_plan/mod.rs index 3d549ec9323..d529e98f75d 100644 --- a/rust/datafusion/src/physical_plan/mod.rs +++ b/rust/datafusion/src/physical_plan/mod.rs @@ -150,7 +150,7 @@ impl Partitioning { } /// Distribution schemes -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum Distribution { /// Unspecified distribution UnspecifiedDistribution, diff --git a/rust/datafusion/src/physical_plan/parquet.rs b/rust/datafusion/src/physical_plan/parquet.rs index fce85e36074..d41d6968fee 100644 --- a/rust/datafusion/src/physical_plan/parquet.rs +++ b/rust/datafusion/src/physical_plan/parquet.rs @@ -100,9 +100,9 @@ pub struct ParquetExec { #[derive(Debug, Clone)] pub struct ParquetPartition { /// The Parquet filename for this partition - filenames: Vec, + pub filenames: Vec, /// Statistics for this partition - statistics: Statistics, + pub statistics: Statistics, } impl ParquetExec { diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index 0ab67c41a57..f9279ae48f0 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -29,19 +29,17 @@ use crate::logical_plan::{ DFSchema, 
Expr, LogicalPlan, Operator, Partitioning as LogicalPartitioning, PlanType, StringifiedPlan, UserDefinedLogicalNode, }; -use crate::physical_plan::coalesce_batches::CoalesceBatchesExec; use crate::physical_plan::explain::ExplainExec; +use crate::physical_plan::expressions; use crate::physical_plan::expressions::{CaseExpr, Column, Literal, PhysicalSortExpr}; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; use crate::physical_plan::hash_join::HashJoinExec; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; -use crate::physical_plan::merge::MergeExec; use crate::physical_plan::projection::ProjectionExec; use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sort::SortExec; use crate::physical_plan::udf; -use crate::physical_plan::{expressions, Distribution}; use crate::physical_plan::{hash_utils, Partitioning}; use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, PhysicalPlanner}; use crate::prelude::JoinType; @@ -52,6 +50,7 @@ use arrow::compute::can_cast_types; use arrow::compute::SortOptions; use arrow::datatypes::{Schema, SchemaRef}; use expressions::col; +use log::debug; /// This trait exposes the ability to plan an [`ExecutionPlan`] out of a [`LogicalPlan`]. 
pub trait ExtensionPlanner { @@ -104,66 +103,21 @@ impl DefaultPhysicalPlanner { Self { extension_planners } } - /// Create a physical plan from a logical plan + /// Optimize a physical plan fn optimize_plan( &self, plan: Arc, ctx_state: &ExecutionContextState, ) -> Result> { - let children = plan - .children() - .iter() - .map(|child| self.optimize_plan(child.clone(), ctx_state)) - .collect::>>()?; - - if children.is_empty() { - // leaf node, children cannot be replaced - Ok(plan.clone()) - } else { - // wrap operators in CoalesceBatches to avoid lots of tiny batches when we have - // highly selective filters - let plan_any = plan.as_any(); - //TODO we should do this in a more generic way either by wrapping all operators - // or having an API so that operators can declare when their inputs or outputs - // need to be wrapped in a coalesce batches operator. - // See https://issues.apache.org/jira/browse/ARROW-11068 - let wrap_in_coalesce = plan_any.downcast_ref::().is_some() - || plan_any.downcast_ref::().is_some() - || plan_any.downcast_ref::().is_some(); - - //TODO we should also do this for HashAggregateExec but we need to update tests - // as part of this work - see https://issues.apache.org/jira/browse/ARROW-11068 - // || plan_any.downcast_ref::().is_some(); - - let plan = if wrap_in_coalesce { - //TODO we should add specific configuration settings for coalescing batches and - // we should do that once https://issues.apache.org/jira/browse/ARROW-11059 is - // implemented. 
For now, we choose half the configured batch size to avoid copies - // when a small number of rows are removed from a batch - let target_batch_size = ctx_state.config.batch_size / 2; - Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size)) - } else { - plan.clone() - }; - - let children = plan.children().clone(); - - match plan.required_child_distribution() { - Distribution::UnspecifiedDistribution => plan.with_new_children(children), - Distribution::SinglePartition => plan.with_new_children( - children - .iter() - .map(|child| { - if child.output_partitioning().partition_count() == 1 { - child.clone() - } else { - Arc::new(MergeExec::new(child.clone())) - } - }) - .collect(), - ), - } + let optimizers = &ctx_state.config.physical_optimizers; + debug!("Physical plan:\n{:?}", plan); + + let mut new_plan = plan; + for optimizer in optimizers { + new_plan = optimizer.optimize(new_plan, &ctx_state.config)?; } + debug!("Optimized physical plan:\n{:?}", new_plan); + Ok(new_plan) } /// Create a physical plan from a logical plan @@ -189,7 +143,7 @@ impl DefaultPhysicalPlanner { .. } => { // Initially need to perform the aggregate and then merge the partitions - let input_exec = self.create_physical_plan(input, ctx_state)?; + let input_exec = self.create_initial_plan(input, ctx_state)?; let input_schema = input_exec.schema(); let physical_input_schema = input_exec.as_ref().schema(); let logical_input_schema = input.as_ref().schema(); @@ -245,7 +199,7 @@ impl DefaultPhysicalPlanner { )?)) } LogicalPlan::Projection { input, expr, .. } => { - let input_exec = self.create_physical_plan(input, ctx_state)?; + let input_exec = self.create_initial_plan(input, ctx_state)?; let input_schema = input.as_ref().schema(); let runtime_expr = expr .iter() @@ -265,7 +219,7 @@ impl DefaultPhysicalPlanner { LogicalPlan::Filter { input, predicate, .. 
} => { - let input = self.create_physical_plan(input, ctx_state)?; + let input = self.create_initial_plan(input, ctx_state)?; let input_schema = input.as_ref().schema(); let runtime_expr = self.create_physical_expr(predicate, &input_schema, ctx_state)?; @@ -274,7 +228,7 @@ impl DefaultPhysicalPlanner { LogicalPlan::Union { inputs, .. } => { let physical_plans = inputs .iter() - .map(|input| self.create_physical_plan(input, ctx_state)) + .map(|input| self.create_initial_plan(input, ctx_state)) .collect::>>()?; Ok(Arc::new(UnionExec::new(physical_plans))) } @@ -282,7 +236,7 @@ impl DefaultPhysicalPlanner { input, partitioning_scheme, } => { - let input = self.create_physical_plan(input, ctx_state)?; + let input = self.create_initial_plan(input, ctx_state)?; let input_schema = input.schema(); let physical_partitioning = match partitioning_scheme { LogicalPartitioning::RoundRobinBatch(n) => { @@ -304,7 +258,7 @@ impl DefaultPhysicalPlanner { )?)) } LogicalPlan::Sort { expr, input, .. } => { - let input = self.create_physical_plan(input, ctx_state)?; + let input = self.create_initial_plan(input, ctx_state)?; let input_schema = input.as_ref().schema(); let sort_expr = expr @@ -338,8 +292,8 @@ impl DefaultPhysicalPlanner { join_type, .. } => { - let left = self.create_physical_plan(left, ctx_state)?; - let right = self.create_physical_plan(right, ctx_state)?; + let left = self.create_initial_plan(left, ctx_state)?; + let right = self.create_initial_plan(right, ctx_state)?; let physical_join_type = match join_type { JoinType::Inner => hash_utils::JoinType::Inner, JoinType::Left => hash_utils::JoinType::Left, @@ -383,7 +337,7 @@ impl DefaultPhysicalPlanner { ))), LogicalPlan::Limit { input, n, .. 
} => { let limit = *n; - let input = self.create_physical_plan(input, ctx_state)?; + let input = self.create_initial_plan(input, ctx_state)?; // GlobalLimitExec requires a single partition for input let input = if input.output_partitioning().partition_count() == 1 { @@ -411,7 +365,7 @@ impl DefaultPhysicalPlanner { stringified_plans, schema, } => { - let input = self.create_physical_plan(plan, ctx_state)?; + let input = self.create_initial_plan(plan, ctx_state)?; let mut stringified_plans = stringified_plans .iter() @@ -435,7 +389,7 @@ impl DefaultPhysicalPlanner { let inputs = node .inputs() .into_iter() - .map(|input_plan| self.create_physical_plan(input_plan, ctx_state)) + .map(|input_plan| self.create_initial_plan(input_plan, ctx_state)) .collect::>>()?; let maybe_plan = self.extension_planners.iter().try_fold( diff --git a/rust/datafusion/tests/user_defined_plan.rs b/rust/datafusion/tests/user_defined_plan.rs index 4f43c46c717..aae5c597d82 100644 --- a/rust/datafusion/tests/user_defined_plan.rs +++ b/rust/datafusion/tests/user_defined_plan.rs @@ -353,7 +353,7 @@ impl ExecutionPlan for TopKExec { } fn required_child_distribution(&self) -> Distribution { - Distribution::UnspecifiedDistribution + Distribution::SinglePartition } fn children(&self) -> Vec> { @@ -471,12 +471,10 @@ impl Stream for TopKReader { return Poll::Ready(None); } // this aggregates and thus returns a single RecordBatch. - self.done = true; // take this as immutable let k = self.k; let schema = self.schema(); - let top_values = self .input .as_mut() @@ -508,7 +506,10 @@ impl Stream for TopKReader { }); let mut top_values = Box::pin(top_values.into_stream()); - top_values.poll_next_unpin(cx) + top_values.poll_next_unpin(cx).map(|batch| { + self.done = true; + batch + }) } }