From 46f713345c36bc71c3a53baedca412e9c52807a6 Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Sun, 11 Oct 2020 19:07:04 +0200
Subject: [PATCH 1/3] Fixed benchmarks

---
 .../datafusion/benches/aggregate_query_sql.rs | 14 ++++----
 rust/datafusion/benches/math_query_sql.rs     | 36 +++++++++++--------
 .../benches/sort_limit_query_sql.rs           | 17 +++++----
 3 files changed, 37 insertions(+), 30 deletions(-)

diff --git a/rust/datafusion/benches/aggregate_query_sql.rs b/rust/datafusion/benches/aggregate_query_sql.rs
index 547bf9e5d3c..bbb692d329e 100644
--- a/rust/datafusion/benches/aggregate_query_sql.rs
+++ b/rust/datafusion/benches/aggregate_query_sql.rs
@@ -22,6 +22,7 @@ use criterion::Criterion;
 use rand::seq::SliceRandom;
 use rand::Rng;
 use std::sync::{Arc, Mutex};
+use tokio::runtime::Runtime;
 
 extern crate arrow;
 extern crate datafusion;
@@ -38,13 +39,12 @@ use datafusion::datasource::MemTable;
 use datafusion::error::Result;
 use datafusion::execution::context::ExecutionContext;
 
-async fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
+fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
+    let mut rt = Runtime::new().unwrap();
+
     // execute the query
     let df = ctx.lock().unwrap().sql(&sql).unwrap();
-    let results = df.collect().await.unwrap();
-
-    // display the relation
-    for _batch in results {}
+    rt.block_on(df.collect()).unwrap();
 }
 
 fn create_data(size: usize, null_density: f64) -> Vec<Option<f64>> {
@@ -116,8 +116,8 @@ fn create_context(
 }
 
 fn criterion_benchmark(c: &mut Criterion) {
-    let partitions_len = 4;
-    let array_len = 32768; // 2^15
+    let partitions_len = 8;
+    let array_len = 32768 * 2; // 2^16
     let batch_size = 2048; // 2^11
     let ctx = create_context(partitions_len, array_len, batch_size).unwrap();
 
diff --git a/rust/datafusion/benches/math_query_sql.rs b/rust/datafusion/benches/math_query_sql.rs
index b7e08106ff6..65f613b6cdd 100644
--- a/rust/datafusion/benches/math_query_sql.rs
+++ b/rust/datafusion/benches/math_query_sql.rs
@@ -21,6 +21,8 @@ use criterion::Criterion;
 
 use std::sync::{Arc, Mutex};
 
+use tokio::runtime::Runtime;
+
 extern crate arrow;
 extern crate datafusion;
@@ -34,13 +36,12 @@ use datafusion::error::Result;
 
 use datafusion::datasource::MemTable;
 use datafusion::execution::context::ExecutionContext;
 
-async fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
+fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
+    let mut rt = Runtime::new().unwrap();
+
     // execute the query
     let df = ctx.lock().unwrap().sql(&sql).unwrap();
-    let results = df.collect().await.unwrap();
-
-    // display the relation
-    for _batch in results {}
+    rt.block_on(df.collect()).unwrap();
 }
 
 fn create_context(
@@ -77,24 +78,31 @@ fn create_context(
 }
 
 fn criterion_benchmark(c: &mut Criterion) {
+    let array_len = 1048576; // 2^20
+    let batch_size = 512; // 2^9
+    let ctx = create_context(array_len, batch_size).unwrap();
+    c.bench_function("sqrt_20_9", |b| {
+        b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
+    });
+
+    let array_len = 1048576; // 2^20
+    let batch_size = 4096; // 2^12
+    let ctx = create_context(array_len, batch_size).unwrap();
     c.bench_function("sqrt_20_12", |b| {
-        let array_len = 1048576; // 2^20
-        let batch_size = 4096; // 2^12
-        let ctx = create_context(array_len, batch_size).unwrap();
         b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
     });
 
+    let array_len = 4194304; // 2^22
+    let batch_size = 4096; // 2^12
+    let ctx = create_context(array_len, batch_size).unwrap();
     c.bench_function("sqrt_22_12", |b| {
-        let array_len = 4194304; // 2^22
-        let batch_size = 4096; // 2^12
-        let ctx = create_context(array_len, batch_size).unwrap();
         b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
     });
 
+    let array_len = 4194304; // 2^22
+    let batch_size = 16384; // 2^14
+    let ctx = create_context(array_len, batch_size).unwrap();
     c.bench_function("sqrt_22_14", |b| {
-        let array_len = 4194304; // 2^22
-        let batch_size = 16384; // 2^14
-        let ctx = create_context(array_len, batch_size).unwrap();
         b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
     });
 }
diff --git a/rust/datafusion/benches/sort_limit_query_sql.rs b/rust/datafusion/benches/sort_limit_query_sql.rs
index 1b2f1621c67..02440046b99 100644
--- a/rust/datafusion/benches/sort_limit_query_sql.rs
+++ b/rust/datafusion/benches/sort_limit_query_sql.rs
@@ -32,13 +32,12 @@ use datafusion::execution::context::ExecutionContext;
 
 use tokio::runtime::Runtime;
 
-async fn run_query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
+fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
+    let mut rt = Runtime::new().unwrap();
+
     // execute the query
     let df = ctx.lock().unwrap().sql(&sql).unwrap();
-    let results = df.collect().await.unwrap();
-
-    // display the relation
-    for _batch in results {}
+    rt.block_on(df.collect()).unwrap();
 }
 
 fn create_context() -> Arc<Mutex<ExecutionContext>> {
@@ -90,7 +89,7 @@ fn criterion_benchmark(c: &mut Criterion) {
     c.bench_function("sort_and_limit_by_int", |b| {
         let ctx = create_context();
         b.iter(|| {
-            run_query(
+            query(
                 ctx.clone(),
                 "SELECT c1, c13, c6, c10 \
                  FROM aggregate_test_100 \
@@ -103,7 +102,7 @@ fn criterion_benchmark(c: &mut Criterion) {
     c.bench_function("sort_and_limit_by_float", |b| {
         let ctx = create_context();
         b.iter(|| {
-            run_query(
+            query(
                 ctx.clone(),
                 "SELECT c1, c13, c12 \
                  FROM aggregate_test_100 \
@@ -116,7 +115,7 @@ fn criterion_benchmark(c: &mut Criterion) {
     c.bench_function("sort_and_limit_lex_by_int", |b| {
         let ctx = create_context();
         b.iter(|| {
-            run_query(
+            query(
                 ctx.clone(),
                 "SELECT c1, c13, c6, c10 \
                  FROM aggregate_test_100 \
@@ -129,7 +128,7 @@ fn criterion_benchmark(c: &mut Criterion) {
     c.bench_function("sort_and_limit_lex_by_string", |b| {
         let ctx = create_context();
         b.iter(|| {
-            run_query(
+            query(
                 ctx.clone(),
                 "SELECT c1, c13, c6, c10 \
                  FROM aggregate_test_100 \

From d6a68c1c946fce1c6ed5900beaecb1d7b8246f13 Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Tue, 13 Oct 2020 05:25:13 +0200
Subject: [PATCH 2/3] Simplified code.

---
 rust/datafusion/src/physical_plan/merge.rs | 38 ++++++++--------------
 1 file changed, 13 insertions(+), 25 deletions(-)

diff --git a/rust/datafusion/src/physical_plan/merge.rs b/rust/datafusion/src/physical_plan/merge.rs
index 02243bc7cc6..5376e160a7d 100644
--- a/rust/datafusion/src/physical_plan/merge.rs
+++ b/rust/datafusion/src/physical_plan/merge.rs
@@ -32,7 +32,7 @@ use arrow::record_batch::RecordBatch;
 
 use super::SendableRecordBatchReader;
 use async_trait::async_trait;
-use tokio::task::{self, JoinHandle};
+use tokio;
 
 /// Merge execution plan executes partitions in parallel and combines them into a single
 /// partition. No guarantees are made about the order of the resulting partition.
@@ -108,35 +108,23 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let partitions_per_thread = (input_partitions / self.concurrency).max(1);
-                let range: Vec<usize> = (0..input_partitions).collect();
-                let chunks = range.chunks(partitions_per_thread);
-
-                let mut tasks = vec![];
-                for chunk in chunks {
-                    let chunk = chunk.to_vec();
-                    let input = self.input.clone();
-                    let task: JoinHandle<Result<Vec<Arc<RecordBatch>>>> =
-                        task::spawn(async move {
-                            let mut batches: Vec<Arc<RecordBatch>> = vec![];
-                            for partition in chunk {
-                                let it = input.execute(partition).await?;
-                                common::collect(it).iter().for_each(|b| {
-                                    b.iter()
-                                        .for_each(|b| batches.push(Arc::new(b.clone())))
-                                });
-                            }
-                            Ok(batches)
-                        });
-                    tasks.push(task);
-                }
+                let tasks = (0..input_partitions)
+                    .map(|part_i| {
+                        let input = self.input.clone();
+                        tokio::spawn(async move {
+                            let it = input.execute(part_i).await?;
+                            common::collect(it)
+                        })
+                    })
+                    // this collect *is needed* so that the join below can
+                    // switch between tasks
+                    .collect::<Vec<_>>();
 
-                // combine the results from each thread
                 let mut combined_results: Vec<Arc<RecordBatch>> = vec![];
                 for task in tasks {
                     let result = task.await.unwrap()?;
                     for batch in &result {
-                        combined_results.push(batch.clone());
+                        combined_results.push(Arc::new(batch.clone()));
                     }
                 }

From b89ff96a2c5fd1b9a31a7c3953deb78078260d4b Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Tue, 13 Oct 2020 06:04:38 +0200
Subject: [PATCH 3/3] Removed unused parameter.

---
 rust/datafusion/src/execution/context.rs     |  2 +-
 .../src/physical_plan/hash_aggregate.rs      |  2 +-
 rust/datafusion/src/physical_plan/limit.rs   |  3 +--
 rust/datafusion/src/physical_plan/merge.rs   | 16 ++++------------
 rust/datafusion/src/physical_plan/planner.rs |  5 +----
 rust/datafusion/src/physical_plan/sort.rs    |  2 +-
 6 files changed, 9 insertions(+), 21 deletions(-)

diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index eabc779e49d..5d46d2e54de 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -332,7 +332,7 @@ impl ExecutionContext {
             }
             _ => {
                 // merge into a single partition
-                let plan = MergeExec::new(plan.clone(), self.state.config.concurrency);
+                let plan = MergeExec::new(plan.clone());
                 // MergeExec must produce a single partition
                 assert_eq!(1, plan.output_partitioning().partition_count());
                 common::collect(plan.execute(0).await?)
diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs b/rust/datafusion/src/physical_plan/hash_aggregate.rs
index 5f4fe9876b7..80e2ec3cba3 100644
--- a/rust/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/rust/datafusion/src/physical_plan/hash_aggregate.rs
@@ -820,7 +820,7 @@ mod tests {
             .unwrap();
         assert_eq!(*sums, Float64Array::from(vec![2.0, 7.0, 11.0]));
 
-        let merge = Arc::new(MergeExec::new(partial_aggregate, 2));
+        let merge = Arc::new(MergeExec::new(partial_aggregate));
 
         let final_group: Vec<Arc<dyn PhysicalExpr>> =
             (0..groups.len()).map(|i| col(&groups[i].1)).collect();
diff --git a/rust/datafusion/src/physical_plan/limit.rs b/rust/datafusion/src/physical_plan/limit.rs
index 8c0e563b031..753cbf7bdbf 100644
--- a/rust/datafusion/src/physical_plan/limit.rs
+++ b/rust/datafusion/src/physical_plan/limit.rs
@@ -243,8 +243,7 @@ mod tests {
         // input should have 4 partitions
        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);
 
-        let limit =
-            GlobalLimitExec::new(Arc::new(MergeExec::new(Arc::new(csv), 2)), 7, 2);
+        let limit = GlobalLimitExec::new(Arc::new(MergeExec::new(Arc::new(csv))), 7, 2);
 
         // the result should contain 4 batches (one per input partition)
         let iter = limit.execute(0).await?;
diff --git a/rust/datafusion/src/physical_plan/merge.rs b/rust/datafusion/src/physical_plan/merge.rs
index 5376e160a7d..7ce737c9910 100644
--- a/rust/datafusion/src/physical_plan/merge.rs
+++ b/rust/datafusion/src/physical_plan/merge.rs
@@ -40,17 +40,12 @@ use tokio;
 pub struct MergeExec {
     /// Input execution plan
     input: Arc<dyn ExecutionPlan>,
-    /// Maximum number of concurrent threads
-    concurrency: usize,
 }
 
 impl MergeExec {
     /// Create a new MergeExec
-    pub fn new(input: Arc<dyn ExecutionPlan>, max_concurrency: usize) -> Self {
-        MergeExec {
-            input,
-            concurrency: max_concurrency,
-        }
+    pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
+        MergeExec { input }
     }
 }
 
@@ -79,10 +74,7 @@ impl ExecutionPlan for MergeExec {
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         match children.len() {
-            1 => Ok(Arc::new(MergeExec::new(
-                children[0].clone(),
-                self.concurrency,
-            ))),
+            1 => Ok(Arc::new(MergeExec::new(children[0].clone()))),
             _ => Err(ExecutionError::General(
                 "MergeExec wrong number of children".to_string(),
             )),
@@ -159,7 +151,7 @@ mod tests {
         // input should have 4 partitions
         assert_eq!(csv.output_partitioning().partition_count(), num_partitions);
 
-        let merge = MergeExec::new(Arc::new(csv), 2);
+        let merge = MergeExec::new(Arc::new(csv));
 
         // output of MergeExec should have a single partition
         assert_eq!(merge.output_partitioning().partition_count(), 1);
diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs
index bdaf79c7b2c..c4ae2dc6853 100644
--- a/rust/datafusion/src/physical_plan/planner.rs
+++ b/rust/datafusion/src/physical_plan/planner.rs
@@ -117,10 +117,7 @@ impl DefaultPhysicalPlanner {
                         if child.output_partitioning().partition_count() == 1 {
                             child.clone()
                         } else {
-                            Arc::new(MergeExec::new(
-                                child.clone(),
-                                ctx_state.config.concurrency,
-                            ))
+                            Arc::new(MergeExec::new(child.clone()))
                         }
                     })
                     .collect(),
diff --git a/rust/datafusion/src/physical_plan/sort.rs b/rust/datafusion/src/physical_plan/sort.rs
index 3ddfa183117..7c00cc5cb50 100644
--- a/rust/datafusion/src/physical_plan/sort.rs
+++ b/rust/datafusion/src/physical_plan/sort.rs
@@ -208,7 +208,7 @@ mod tests {
                     options: SortOptions::default(),
                 },
             ],
-            Arc::new(MergeExec::new(Arc::new(csv), 2)),
+            Arc::new(MergeExec::new(Arc::new(csv))),
            2,
        )?);
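
The pattern adopted by PATCH 2 above (spawn one Tokio task per input partition, then join the handles in order and concatenate the results) can be exercised outside DataFusion. Below is a minimal, self-contained sketch of that spawn-and-join idea; it is not part of the patch, and `fetch_partition`, the fixed partition count, and the `u64` element type are hypothetical stand-ins for `ExecutionPlan::execute` plus `common::collect`. It assumes a Tokio dependency with the multi-threaded runtime enabled.

// Standalone sketch (not part of the patch): spawn-and-join over partitions.
// `fetch_partition` is a hypothetical stand-in for executing one partition.
use tokio::runtime::Runtime;

async fn fetch_partition(partition: usize) -> Result<Vec<u64>, String> {
    // pretend each partition yields a couple of values
    Ok(vec![partition as u64, partition as u64 + 10])
}

fn main() {
    let mut rt = Runtime::new().unwrap();
    let merged: Result<Vec<u64>, String> = rt.block_on(async {
        // spawn every task up front; collecting the handles eagerly is what
        // lets all partitions run concurrently while they are awaited below
        let tasks: Vec<_> = (0..4)
            .map(|part_i| tokio::spawn(fetch_partition(part_i)))
            .collect();

        let mut combined = vec![];
        for task in tasks {
            // first unwrap handles the JoinError, `?` propagates the task error
            combined.extend(task.await.unwrap()?);
        }
        Ok(combined)
    });
    println!("{:?}", merged);
}

Awaiting inside the `map` instead of collecting the handles first would serialize the partitions, which is the same reason the patch keeps the `.collect::<Vec<_>>()` call before the join loop.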