37 commits
ad51f3f
WIP
Dandandan Mar 30, 2021
0cfa2c6
WIP
Dandandan Mar 30, 2021
a30afc3
Add test
Dandandan Mar 31, 2021
823cf54
WIP
Dandandan Mar 31, 2021
c2f4de8
WIP
Dandandan Mar 31, 2021
065abf4
WIP
Dandandan Mar 31, 2021
69d1bd9
WIP
Dandandan Mar 31, 2021
4ce8ec6
Fix memec
Dandandan Mar 31, 2021
9180921
Merge branch 'master' into reparition-opt
Dandandan Mar 31, 2021
97e071d
Fix test
Dandandan Mar 31, 2021
556779e
Fmt
Dandandan Mar 31, 2021
dfc8b6c
Reorganize
Dandandan Mar 31, 2021
5c9cadf
Reorganize
Dandandan Mar 31, 2021
24c4941
Fix
Dandandan Mar 31, 2021
0a50e91
Reorganize
Dandandan Mar 31, 2021
4595484
Update tests expectations
Dandandan Apr 1, 2021
bd83b96
Update tests expectations
Dandandan Apr 1, 2021
5f05dae
Update tests expectations
Dandandan Apr 1, 2021
3aa5bb1
Add CoalesceBatches / AddMergeExec as optimizers
Dandandan Apr 1, 2021
2262c39
Fix tests
Dandandan Apr 1, 2021
beb5863
Merge remote-tracking branch 'upstream/master' into reparition-opt
Dandandan Apr 1, 2021
15009ab
Docs, test
Dandandan Apr 2, 2021
7aafebd
Merge remote-tracking branch 'upstream/master' into reparition-opt
Dandandan Apr 2, 2021
86f8dda
Fmt
Dandandan Apr 2, 2021
691b354
Exclude empty exec from repartition optimizer
Dandandan Apr 2, 2021
e5558d5
Add method to add physical optimizer rule as well
Dandandan Apr 2, 2021
c8dd45e
Merge remote-tracking branch 'upstream/master' into reparition-opt
Dandandan Apr 3, 2021
fb2183b
Disable rule for concurrency of 1
Dandandan Apr 3, 2021
bf434a9
Change to Distribution::SinglePartition
Dandandan Apr 5, 2021
d9c7a2c
Use derive(Debug)
Dandandan Apr 5, 2021
1cad621
Revert debug implementation
Dandandan Apr 5, 2021
c6da67d
Merge remote-tracking branch 'upstream/master' into reparition-opt
Dandandan Apr 7, 2021
c674970
Merge branch 'reparition-opt' of github.com:Dandandan/arrow into repa…
Dandandan Apr 7, 2021
6bf6a72
Merge remote-tracking branch 'upstream/master' into reparition-opt
Dandandan Apr 7, 2021
40fc828
Merge remote-tracking branch 'upstream/master' into reparition-opt
Dandandan Apr 8, 2021
2a1e53f
Fix topkexec
Dandandan Apr 8, 2021
4bb4dc5
Remove print
Dandandan Apr 8, 2021
5 changes: 1 addition & 4 deletions rust/datafusion/src/datasource/parquet.rs
@@ -120,7 +120,7 @@ mod tests {
let exec = table.scan(&projection, 2, &[], None)?;
let stream = exec.execute(0).await?;

let count = stream
let _ = stream
.map(|batch| {
let batch = batch.unwrap();
assert_eq!(11, batch.num_columns());
@@ -129,9 +129,6 @@ mod tests {
.fold(0, |acc, _| async move { acc + 1i32 })
.await;

// we should have seen 4 batches of 2 rows
assert_eq!(4, count);

// test metadata
assert_eq!(table.statistics().num_rows, Some(8));
assert_eq!(table.statistics().total_byte_size, Some(671));
70 changes: 26 additions & 44 deletions rust/datafusion/src/execution/context.rs
@@ -22,6 +22,7 @@ use crate::{
information_schema::CatalogWithInformationSchema,
},
optimizer::hash_build_probe_order::HashBuildProbeOrder,
physical_optimizer::optimizer::PhysicalOptimizerRule,
};
use log::debug;
use std::fs;
@@ -56,6 +57,10 @@ use crate::optimizer::filter_push_down::FilterPushDown;
use crate::optimizer::limit_push_down::LimitPushDown;
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::projection_push_down::ProjectionPushDown;
use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::merge_exec::AddMergeExec;
use crate::physical_optimizer::repartition::Repartition;

use crate::physical_plan::csv::CsvReadOptions;
use crate::physical_plan::planner::DefaultPhysicalPlanner;
use crate::physical_plan::udf::ScalarUDF;
@@ -605,6 +610,8 @@ pub struct ExecutionConfig {
pub batch_size: usize,
/// Responsible for optimizing a logical plan
optimizers: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
/// Responsible for optimizing a physical execution plan
pub physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
/// Responsible for planning `LogicalPlan`s, and `ExecutionPlan`
query_planner: Arc<dyn QueryPlanner + Send + Sync>,
/// Default catalog name for table resolution
@@ -634,6 +641,11 @@ impl ExecutionConfig {
Arc::new(HashBuildProbeOrder::new()),
Arc::new(LimitPushDown::new()),
],
physical_optimizers: vec![
Arc::new(Repartition::new()),
Arc::new(CoalesceBatches::new()),
Arc::new(AddMergeExec::new()),
],
query_planner: Arc::new(DefaultQueryPlanner {}),
default_catalog: "datafusion".to_owned(),
default_schema: "public".to_owned(),
@@ -677,6 +689,15 @@ impl ExecutionConfig {
self
}

/// Adds a new [`PhysicalOptimizerRule`]
pub fn add_physical_optimizer_rule(
mut self,
optimizer_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
) -> Self {
self.physical_optimizers.push(optimizer_rule);
self
}

/// Selects a name for the default catalog and schema
pub fn with_default_catalog_and_schema(
mut self,
@@ -835,15 +856,6 @@ mod tests {
let partition_count = 4;
let results = execute("SELECT c1, c2 FROM test", partition_count).await?;

// there should be one batch per partition
assert_eq!(results.len(), partition_count);

// each batch should contain 2 columns and 10 rows with correct field names
for batch in &results {
assert_eq!(batch.num_columns(), 2);
assert_eq!(batch.num_rows(), 10);
}

let expected = vec![
"+----+----+",
"| c1 | c2 |",
@@ -953,21 +965,14 @@ mod tests {
println!("{:?}", physical_plan);

let results = collect_partitioned(physical_plan).await?;
assert_eq!(results.len(), partition_count);

// there should be a total of 2 batches with 20 rows because the where clause filters
// out results from 2 partitions

// note that the order of partitions is not deterministic
let mut num_batches = 0;
let mut num_rows = 0;
for partition in &results {
for batch in partition {
num_batches += 1;
num_rows += batch.num_rows();
}
}
assert_eq!(2, num_batches);
assert_eq!(20, num_rows);

let results: Vec<RecordBatch> = results.into_iter().flatten().collect();
@@ -1039,9 +1044,7 @@ mod tests {
assert_eq!("c2", physical_plan.schema().field(0).name().as_str());

let batches = collect(physical_plan).await?;
assert_eq!(4, batches.len());
assert_eq!(1, batches[0].num_columns());
assert_eq!(10, batches[0].num_rows());
assert_eq!(40, batches.iter().map(|x| x.num_rows()).sum::<usize>());

Ok(())
}
@@ -2017,27 +2020,15 @@ mod tests {
// register each partition as well as the top level dir
let csv_read_option = CsvReadOptions::new().schema(&schema);
ctx.register_csv("part0", &format!("{}/part-0.csv", out_dir), csv_read_option)?;
ctx.register_csv("part1", &format!("{}/part-1.csv", out_dir), csv_read_option)?;
ctx.register_csv("part2", &format!("{}/part-2.csv", out_dir), csv_read_option)?;
ctx.register_csv("part3", &format!("{}/part-3.csv", out_dir), csv_read_option)?;
ctx.register_csv("allparts", &out_dir, csv_read_option)?;

let part0 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part0").await?;
let part1 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part1").await?;
let part2 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part2").await?;
let part3 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part3").await?;
let allparts = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM allparts").await?;

let part0_count: usize = part0.iter().map(|batch| batch.num_rows()).sum();
let part1_count: usize = part1.iter().map(|batch| batch.num_rows()).sum();
let part2_count: usize = part2.iter().map(|batch| batch.num_rows()).sum();
let part3_count: usize = part3.iter().map(|batch| batch.num_rows()).sum();
let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum();

assert_eq!(part0_count, 10);
assert_eq!(part1_count, 10);
assert_eq!(part2_count, 10);
assert_eq!(part3_count, 10);
assert_eq!(part0[0].schema(), allparts[0].schema());

assert_eq!(allparts_count, 40);

Ok(())
@@ -2064,21 +2055,12 @@ mod tests {
ctx.register_parquet("allparts", &out_dir)?;

let part0 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part0").await?;
let part1 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part1").await?;
let part2 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part2").await?;
let part3 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part3").await?;
let allparts = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM allparts").await?;

let part0_count: usize = part0.iter().map(|batch| batch.num_rows()).sum();
let part1_count: usize = part1.iter().map(|batch| batch.num_rows()).sum();
let part2_count: usize = part2.iter().map(|batch| batch.num_rows()).sum();
let part3_count: usize = part3.iter().map(|batch| batch.num_rows()).sum();
let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum();

assert_eq!(part0_count, 10);
assert_eq!(part1_count, 10);
assert_eq!(part2_count, 10);
assert_eq!(part3_count, 10);
assert_eq!(part0[0].schema(), allparts[0].schema());

assert_eq!(allparts_count, 40);

Ok(())
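As a usage sketch of the new add_physical_optimizer_rule API above: NoOpRule is a hypothetical rule invented here for illustration, the module paths follow the imports shown in this diff, and ExecutionContext::with_config is assumed as the constructor.

    use std::sync::Arc;

    use datafusion::error::Result;
    use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
    use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
    use datafusion::physical_plan::ExecutionPlan;

    /// A hypothetical rule that returns the plan unchanged, showing the trait shape.
    struct NoOpRule {}

    impl PhysicalOptimizerRule for NoOpRule {
        fn optimize(
            &self,
            plan: Arc<dyn ExecutionPlan>,
            _config: &ExecutionConfig,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            // No rewrite: hand the plan back untouched.
            Ok(plan)
        }

        fn name(&self) -> &str {
            "no_op"
        }
    }

    fn main() {
        // Custom rules are pushed after the defaults (Repartition,
        // CoalesceBatches, AddMergeExec), so they run last.
        let config = ExecutionConfig::new().add_physical_optimizer_rule(Arc::new(NoOpRule {}));
        let _ctx = ExecutionContext::with_config(config);
    }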
1 change: 1 addition & 0 deletions rust/datafusion/src/lib.rs
@@ -193,6 +193,7 @@ pub mod error;
pub mod execution;
pub mod logical_plan;
pub mod optimizer;
pub mod physical_optimizer;
pub mod physical_plan;
pub mod prelude;
pub mod scalar;
1 change: 0 additions & 1 deletion rust/datafusion/src/optimizer/mod.rs
@@ -24,5 +24,4 @@ pub mod hash_build_probe_order;
pub mod limit_push_down;
pub mod optimizer;
pub mod projection_push_down;

pub mod utils;
88 changes: 88 additions & 0 deletions rust/datafusion/src/physical_optimizer/coalesce_batches.rs
@@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! CoalesceBatches optimizer that groups rows from small batches
//! into bigger batches to avoid the overhead of processing many small batches

use super::optimizer::PhysicalOptimizerRule;
use crate::{
error::Result,
physical_plan::{
coalesce_batches::CoalesceBatchesExec, filter::FilterExec,
hash_join::HashJoinExec, repartition::RepartitionExec,
},
};
use std::sync::Arc;

/// Optimizer that introduces CoalesceBatchesExec to avoid overhead with small batches
pub struct CoalesceBatches {}

impl CoalesceBatches {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {}
}
}
impl PhysicalOptimizerRule for CoalesceBatches {
fn optimize(
&self,
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
config: &crate::execution::context::ExecutionConfig,
) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
// wrap operators in CoalesceBatches to avoid lots of tiny batches when we have
// highly selective filters
let children = plan
.children()
.iter()
.map(|child| self.optimize(child.clone(), config))
.collect::<Result<Vec<_>>>()?;
Comment on lines +46 to +52
Contributor

I realize you are just moving code around so this comment is outside the context of this PR....

However, I wonder if it would be more performant to do the coalescing directly in the filter kernel code -- the way coalesce is written today requires copying the (filtered) output into a different (coalesced) array

I think @ritchie46 had some code that allowed incrementally building up output in several chunks as part of polars which may be relevant

I think this code is good, but I wanted to plant a seed 🌱 for future optimizations

Contributor Author

@Dandandan Dandandan Apr 1, 2021

I think that might be a useful direction indeed!

I think indeed it can be more efficient in some cases for nodes to write to mutable buffers than to produce smaller batches and concatenate them afterwards, although currently it does not seem to me like it would be an enormous performance improvement, based on what I saw in profiling info.

Probably not something in the scope of this PR indeed as it's already getting pretty big.

Some other notes:

  • In this PR I think I had to create the physical optimizer abstraction, as otherwise I felt the planner would become unmaintainable. The planning and optimization are now separated and not in the same pass as before (I was actually a bit confused about how it worked before!)
  • Currently I added the AddMergeExec as an optimization pass, as that is how it worked in the code before; however, it feels a bit off as an optimization pass. But I will probably keep it like that for now.


let plan_any = plan.as_any();
//TODO we should do this in a more generic way either by wrapping all operators
// or having an API so that operators can declare when their inputs or outputs
// need to be wrapped in a coalesce batches operator.
// See https://issues.apache.org/jira/browse/ARROW-11068
let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
|| plan_any.downcast_ref::<HashJoinExec>().is_some()
|| plan_any.downcast_ref::<RepartitionExec>().is_some();

//TODO we should also do this for HashAggregateExec but we need to update tests
// as part of this work - see https://issues.apache.org/jira/browse/ARROW-11068
// || plan_any.downcast_ref::<HashAggregateExec>().is_some();

if plan.children().is_empty() {
// leaf node, children cannot be replaced
Ok(plan.clone())
} else {
let plan = plan.with_new_children(children)?;
Ok(if wrap_in_coalesce {
//TODO we should add specific configuration settings for coalescing batches and
// we should do that once https://issues.apache.org/jira/browse/ARROW-11059 is
// implemented. For now, we choose half the configured batch size to avoid copies
// when a small number of rows are removed from a batch
let target_batch_size = config.batch_size / 2;
Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size))
} else {
plan.clone()
})
}
}

fn name(&self) -> &str {
"coalesce_batches"
}
}
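To make the half-batch-size heuristic in the rule above concrete, a tiny standalone sketch of the arithmetic (the 4096 value is hypothetical, not a DataFusion default):

    /// Mirrors the rule's choice: half the configured batch size, so a batch
    /// that loses a few rows to a selective operator is not copied again.
    fn target_batch_size(configured_batch_size: usize) -> usize {
        configured_batch_size / 2
    }

    fn main() {
        // With a configured batch_size of 4096, FilterExec, HashJoinExec and
        // RepartitionExec outputs are coalesced toward 2048-row batches.
        assert_eq!(target_batch_size(4096), 2048);
    }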
74 changes: 74 additions & 0 deletions rust/datafusion/src/physical_optimizer/merge_exec.rs
@@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! AddMergeExec adds a MergeExec to merge plans
//! with more than one partition into a single partition
//! when a node needs a single partition
use super::optimizer::PhysicalOptimizerRule;
use crate::{
error::Result,
physical_plan::{merge::MergeExec, Distribution},
};
use std::sync::Arc;

/// Introduces MergeExec
pub struct AddMergeExec {}

impl AddMergeExec {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {}
}
}

impl PhysicalOptimizerRule for AddMergeExec {
fn optimize(
&self,
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
config: &crate::execution::context::ExecutionConfig,
) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
if plan.children().is_empty() {
// leaf node, children cannot be replaced
Ok(plan.clone())
} else {
let children = plan
.children()
.iter()
.map(|child| self.optimize(child.clone(), config))
.collect::<Result<Vec<_>>>()?;
match plan.required_child_distribution() {
Distribution::UnspecifiedDistribution => plan.with_new_children(children),
Distribution::SinglePartition => plan.with_new_children(
children
.iter()
.map(|child| {
if child.output_partitioning().partition_count() == 1 {
child.clone()
} else {
Arc::new(MergeExec::new(child.clone()))
}
})
.collect(),
),
}
}
}

fn name(&self) -> &str {
"add_merge_exec"
}
}
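The per-child decision the rule makes can be summarized in a standalone sketch; the boolean and the count stand in for matching Distribution::SinglePartition and for output_partitioning().partition_count() in the real code above.

    /// Sketch of the rule's per-child decision: insert a MergeExec only when
    /// the parent requires a single partition and the child produces several.
    fn needs_merge(requires_single_partition: bool, child_partition_count: usize) -> bool {
        requires_single_partition && child_partition_count > 1
    }

    fn main() {
        assert!(needs_merge(true, 8)); // wrap the child in MergeExec
        assert!(!needs_merge(true, 1)); // already a single partition: keep as-is
        assert!(!needs_merge(false, 8)); // UnspecifiedDistribution: keep as-is
    }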
24 changes: 24 additions & 0 deletions rust/datafusion/src/physical_optimizer/mod.rs
@@ -0,0 +1,24 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! This module contains a query optimizer that operates on a physical plan
//! and applies optimization rules to it, such as "Repartition".

pub mod coalesce_batches;
pub mod merge_exec;
pub mod optimizer;
pub mod repartition;
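Not shown in this diff is the driver that applies these rules; a hedged sketch of its assumed shape follows (the real planner code may differ; the key point is that rules run in registration order over the whole plan, and physical_optimizers is public per the ExecutionConfig change above).

    use std::sync::Arc;

    use datafusion::error::Result;
    use datafusion::execution::context::ExecutionConfig;
    use datafusion::physical_plan::ExecutionPlan;

    // Assumed driver shape, not code from this PR: each rule rewrites the
    // full plan in turn, in the order they appear in config.physical_optimizers
    // (Repartition, CoalesceBatches, AddMergeExec, then user-added rules).
    fn optimize_physical_plan(
        mut plan: Arc<dyn ExecutionPlan>,
        config: &ExecutionConfig,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        for rule in &config.physical_optimizers {
            plan = rule.optimize(plan, config)?;
        }
        Ok(plan)
    }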