Skip to content

Commit 7209ffc

Browse files
jorgecarleitao authored and kszucs committed
ARROW-10292: [Rust] [DataFusion] Simplify merge
Currently, `MergeExec` uses `tokio::spawn` to parallelize the work, by calling `tokio::spawn` once per logical thread. However, `tokio::spawn` returns a task / future, which the `tokio` runtime will then schedule on its thread pool. Therefore, there is no need to limit the number of tasks to the number of logical threads, as tokio's runtime itself is responsible for that work. In particular, since we are using [`rt-threaded`](https://docs.rs/tokio/0.2.22/tokio/runtime/index.html#threaded-scheduler), tokio already declares a thread pool from the number of logical threads available.

This PR removes the coupling, in `MergeExec`, between the number of logical threads (`max_concurrency`) and the number of created tasks.

I observe no change in performance:

<details>
<summary>Benchmark results</summary>

```
Switched to branch 'simplify_merge'
Your branch is up to date with 'origin/simplify_merge'.
   Compiling datafusion v2.0.0-SNAPSHOT (/Users/jorgecarleitao/projects/arrow/rust/datafusion)
    Finished bench [optimized] target(s) in 38.02s
     Running /Users/jorgecarleitao/projects/arrow/rust/target/release/deps/aggregate_query_sql-5241a705a1ff29ae
Gnuplot not found, using plotters backend
aggregate_query_no_group_by 15 12
                        time:   [715.17 us 722.60 us 730.19 us]
                        change: [-8.3167% -5.2253% -2.2675%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 3 outliers among 100 measurements (3.00%)
  1 (1.00%) high mild
  2 (2.00%) high severe

aggregate_query_group_by 15 12
                        time:   [5.6538 ms 5.6695 ms 5.6892 ms]
                        change: [+0.1012% +0.5308% +0.9913%] (p = 0.02 < 0.05)
                        Change within noise threshold.
Found 10 outliers among 100 measurements (10.00%)
  4 (4.00%) high mild
  6 (6.00%) high severe

aggregate_query_group_by_with_filter 15 12
                        time:   [2.6598 ms 2.6665 ms 2.6751 ms]
                        change: [-0.5532% -0.1446% +0.2679%] (p = 0.51 > 0.05)
                        No change in performance detected.
Found 7 outliers among 100 measurements (7.00%)
  3 (3.00%) high mild
  4 (4.00%) high severe
```

</details>

Closes #8453 from jorgecarleitao/simplify_merge

Authored-by: Jorge C. Leitao <jorgecarleitao@gmail.com>
Signed-off-by: Jorge C. Leitao <jorgecarleitao@gmail.com>
1 parent 34533b6 commit 7209ffc

File tree

6 files changed

+22
-46
lines changed

6 files changed

+22
-46
lines changed

rust/datafusion/src/execution/context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ impl ExecutionContext {
332332
}
333333
_ => {
334334
// merge into a single partition
335-
let plan = MergeExec::new(plan.clone(), self.state.config.concurrency);
335+
let plan = MergeExec::new(plan.clone());
336336
// MergeExec must produce a single partition
337337
assert_eq!(1, plan.output_partitioning().partition_count());
338338
common::collect(plan.execute(0).await?)

rust/datafusion/src/physical_plan/hash_aggregate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -810,7 +810,7 @@ mod tests {
810810
.unwrap();
811811
assert_eq!(*sums, Float64Array::from(vec![2.0, 7.0, 11.0]));
812812

813-
let merge = Arc::new(MergeExec::new(partial_aggregate, 2));
813+
let merge = Arc::new(MergeExec::new(partial_aggregate));
814814

815815
let final_group: Vec<Arc<dyn PhysicalExpr>> =
816816
(0..groups.len()).map(|i| col(&groups[i].1)).collect();

rust/datafusion/src/physical_plan/limit.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -243,8 +243,7 @@ mod tests {
243243
// input should have 4 partitions
244244
assert_eq!(csv.output_partitioning().partition_count(), num_partitions);
245245

246-
let limit =
247-
GlobalLimitExec::new(Arc::new(MergeExec::new(Arc::new(csv), 2)), 7, 2);
246+
let limit = GlobalLimitExec::new(Arc::new(MergeExec::new(Arc::new(csv))), 7, 2);
248247

249248
// the result should contain 4 batches (one per input partition)
250249
let iter = limit.execute(0).await?;

rust/datafusion/src/physical_plan/merge.rs

Lines changed: 17 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -32,25 +32,20 @@ use arrow::record_batch::RecordBatch;
3232
use super::SendableRecordBatchReader;
3333

3434
use async_trait::async_trait;
35-
use tokio::task::{self, JoinHandle};
35+
use tokio;
3636

3737
/// Merge execution plan executes partitions in parallel and combines them into a single
3838
/// partition. No guarantees are made about the order of the resulting partition.
3939
#[derive(Debug)]
4040
pub struct MergeExec {
4141
/// Input execution plan
4242
input: Arc<dyn ExecutionPlan>,
43-
/// Maximum number of concurrent threads
44-
concurrency: usize,
4543
}
4644

4745
impl MergeExec {
4846
/// Create a new MergeExec
49-
pub fn new(input: Arc<dyn ExecutionPlan>, max_concurrency: usize) -> Self {
50-
MergeExec {
51-
input,
52-
concurrency: max_concurrency,
53-
}
47+
pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
48+
MergeExec { input }
5449
}
5550
}
5651

@@ -79,10 +74,7 @@ impl ExecutionPlan for MergeExec {
7974
children: Vec<Arc<dyn ExecutionPlan>>,
8075
) -> Result<Arc<dyn ExecutionPlan>> {
8176
match children.len() {
82-
1 => Ok(Arc::new(MergeExec::new(
83-
children[0].clone(),
84-
self.concurrency,
85-
))),
77+
1 => Ok(Arc::new(MergeExec::new(children[0].clone()))),
8678
_ => Err(ExecutionError::General(
8779
"MergeExec wrong number of children".to_string(),
8880
)),
@@ -108,35 +100,23 @@ impl ExecutionPlan for MergeExec {
108100
self.input.execute(0).await
109101
}
110102
_ => {
111-
let partitions_per_thread = (input_partitions / self.concurrency).max(1);
112-
let range: Vec<usize> = (0..input_partitions).collect();
113-
let chunks = range.chunks(partitions_per_thread);
114-
115-
let mut tasks = vec![];
116-
for chunk in chunks {
117-
let chunk = chunk.to_vec();
118-
let input = self.input.clone();
119-
let task: JoinHandle<Result<Vec<Arc<RecordBatch>>>> =
120-
task::spawn(async move {
121-
let mut batches: Vec<Arc<RecordBatch>> = vec![];
122-
for partition in chunk {
123-
let it = input.execute(partition).await?;
124-
common::collect(it).iter().for_each(|b| {
125-
b.iter()
126-
.for_each(|b| batches.push(Arc::new(b.clone())))
127-
});
128-
}
129-
Ok(batches)
130-
});
131-
tasks.push(task);
132-
}
103+
let tasks = (0..input_partitions)
104+
.map(|part_i| {
105+
let input = self.input.clone();
106+
tokio::spawn(async move {
107+
let it = input.execute(part_i).await?;
108+
common::collect(it)
109+
})
110+
})
111+
// this collect *is needed* so that the join below can
112+
// switch between tasks
113+
.collect::<Vec<_>>();
133114

134-
// combine the results from each thread
135115
let mut combined_results: Vec<Arc<RecordBatch>> = vec![];
136116
for task in tasks {
137117
let result = task.await.unwrap()?;
138118
for batch in &result {
139-
combined_results.push(batch.clone());
119+
combined_results.push(Arc::new(batch.clone()));
140120
}
141121
}
142122

@@ -171,7 +151,7 @@ mod tests {
171151
// input should have 4 partitions
172152
assert_eq!(csv.output_partitioning().partition_count(), num_partitions);
173153

174-
let merge = MergeExec::new(Arc::new(csv), 2);
154+
let merge = MergeExec::new(Arc::new(csv));
175155

176156
// output of MergeExec should have a single partition
177157
assert_eq!(merge.output_partitioning().partition_count(), 1);

rust/datafusion/src/physical_plan/planner.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,7 @@ impl DefaultPhysicalPlanner {
117117
if child.output_partitioning().partition_count() == 1 {
118118
child.clone()
119119
} else {
120-
Arc::new(MergeExec::new(
121-
child.clone(),
122-
ctx_state.config.concurrency,
123-
))
120+
Arc::new(MergeExec::new(child.clone()))
124121
}
125122
})
126123
.collect(),

rust/datafusion/src/physical_plan/sort.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ mod tests {
208208
options: SortOptions::default(),
209209
},
210210
],
211-
Arc::new(MergeExec::new(Arc::new(csv), 2)),
211+
Arc::new(MergeExec::new(Arc::new(csv))),
212212
2,
213213
)?);
214214

0 commit comments

Comments
 (0)