14 changes: 7 additions & 7 deletions rust/datafusion/benches/aggregate_query_sql.rs
@@ -22,6 +22,7 @@ use criterion::Criterion;
use rand::seq::SliceRandom;
use rand::Rng;
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;

extern crate arrow;
extern crate datafusion;
@@ -38,13 +39,12 @@ use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::execution::context::ExecutionContext;

async fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
let mut rt = Runtime::new().unwrap();

// execute the query
let df = ctx.lock().unwrap().sql(&sql).unwrap();
let results = df.collect().await.unwrap();

// display the relation
for _batch in results {}
rt.block_on(df.collect()).unwrap();
}

fn create_data(size: usize, null_density: f64) -> Vec<Option<f64>> {
@@ -116,8 +116,8 @@ fn create_context(
}

fn criterion_benchmark(c: &mut Criterion) {
let partitions_len = 4;
let array_len = 32768; // 2^15
let partitions_len = 8;
let array_len = 32768 * 2; // 2^16
let batch_size = 2048; // 2^11
let ctx = create_context(partitions_len, array_len, batch_size).unwrap();

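The benchmark above replaces the `async fn query` helper with a plain function that builds a tokio `Runtime` and drives `df.collect()` with `block_on`, since Criterion's `b.iter` takes a synchronous closure. Below is a minimal sketch of that pattern, not the benchmark itself: `collect_stub` and `query_sync` are hypothetical stand-ins for `df.collect()` and the new `query` helper, and the `let mut rt` binding assumes the tokio 0.2-era `block_on(&mut self)` used in this diff.

```rust
use tokio::runtime::Runtime;

// Hypothetical async workload standing in for DataFusion's `df.collect()`.
async fn collect_stub() -> Vec<u32> {
    vec![1, 2, 3]
}

// Synchronous wrapper in the shape of the new `query` helper: create a
// runtime and block on the future, so callers need no async context.
fn query_sync() -> Vec<u32> {
    let mut rt = Runtime::new().unwrap();
    rt.block_on(collect_stub())
}

fn main() {
    assert_eq!(query_sync(), vec![1, 2, 3]);
}
```

Note that the runtime is created inside the helper, so with this change each measured iteration also pays the cost of `Runtime::new()`.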
36 changes: 22 additions & 14 deletions rust/datafusion/benches/math_query_sql.rs
@@ -21,6 +21,8 @@ use criterion::Criterion;

use std::sync::{Arc, Mutex};

use tokio::runtime::Runtime;

extern crate arrow;
extern crate datafusion;

@@ -34,13 +36,12 @@ use datafusion::error::Result;
use datafusion::datasource::MemTable;
use datafusion::execution::context::ExecutionContext;

async fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
let mut rt = Runtime::new().unwrap();

// execute the query
let df = ctx.lock().unwrap().sql(&sql).unwrap();
let results = df.collect().await.unwrap();

// display the relation
for _batch in results {}
rt.block_on(df.collect()).unwrap();
}

fn create_context(
@@ -77,24 +78,31 @@ fn create_context(
}

fn criterion_benchmark(c: &mut Criterion) {
let array_len = 1048576; // 2^20
let batch_size = 512; // 2^9
let ctx = create_context(array_len, batch_size).unwrap();
c.bench_function("sqrt_20_9", |b| {
b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
});

let array_len = 1048576; // 2^20
let batch_size = 4096; // 2^12
let ctx = create_context(array_len, batch_size).unwrap();
c.bench_function("sqrt_20_12", |b| {
let array_len = 1048576; // 2^20
let batch_size = 4096; // 2^12
let ctx = create_context(array_len, batch_size).unwrap();
b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
});

let array_len = 4194304; // 2^22
let batch_size = 4096; // 2^12
let ctx = create_context(array_len, batch_size).unwrap();
c.bench_function("sqrt_22_12", |b| {
let array_len = 4194304; // 2^22
let batch_size = 4096; // 2^12
let ctx = create_context(array_len, batch_size).unwrap();
b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
});

let array_len = 4194304; // 2^22
let batch_size = 16384; // 2^14
let ctx = create_context(array_len, batch_size).unwrap();
c.bench_function("sqrt_22_14", |b| {
let array_len = 4194304; // 2^22
let batch_size = 16384; // 2^14
let ctx = create_context(array_len, batch_size).unwrap();
b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
});
}
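Several hunks above also move `create_context` from the top of `criterion_benchmark` into the corresponding `bench_function` closure. In both placements the setup sits outside the timed `b.iter` loop; the new placement simply scopes each context to the benchmark that uses it. A small Criterion sketch of that layout, with a hypothetical `expensive_setup` standing in for `create_context`:

```rust
use criterion::{criterion_group, criterion_main, Criterion};

// Hypothetical stand-in for `create_context`.
fn expensive_setup(len: usize) -> Vec<u64> {
    (0..len as u64).collect()
}

fn criterion_benchmark(c: &mut Criterion) {
    c.bench_function("sum_2_20", |b| {
        // Built inside the bench_function closure but outside `b.iter`,
        // so it does not count toward the measured time.
        let data = expensive_setup(1 << 20);
        // Only this closure is timed.
        b.iter(|| data.iter().sum::<u64>())
    });
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
```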
17 changes: 8 additions & 9 deletions rust/datafusion/benches/sort_limit_query_sql.rs
@@ -32,13 +32,12 @@ use datafusion::execution::context::ExecutionContext;

use tokio::runtime::Runtime;

async fn run_query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
let mut rt = Runtime::new().unwrap();

// execute the query
let df = ctx.lock().unwrap().sql(&sql).unwrap();
let results = df.collect().await.unwrap();

// display the relation
for _batch in results {}
rt.block_on(df.collect()).unwrap();
}

fn create_context() -> Arc<Mutex<ExecutionContext>> {
@@ -90,7 +89,7 @@ fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("sort_and_limit_by_int", |b| {
let ctx = create_context();
b.iter(|| {
run_query(
query(
ctx.clone(),
"SELECT c1, c13, c6, c10 \
FROM aggregate_test_100 \
@@ -103,7 +102,7 @@ fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("sort_and_limit_by_float", |b| {
let ctx = create_context();
b.iter(|| {
run_query(
query(
ctx.clone(),
"SELECT c1, c13, c12 \
FROM aggregate_test_100 \
@@ -116,7 +115,7 @@ fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("sort_and_limit_lex_by_int", |b| {
let ctx = create_context();
b.iter(|| {
run_query(
query(
ctx.clone(),
"SELECT c1, c13, c6, c10 \
FROM aggregate_test_100 \
@@ -129,7 +128,7 @@ fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("sort_and_limit_lex_by_string", |b| {
let ctx = create_context();
b.iter(|| {
run_query(
query(
ctx.clone(),
"SELECT c1, c13, c6, c10 \
FROM aggregate_test_100 \
2 changes: 1 addition & 1 deletion rust/datafusion/src/execution/context.rs
@@ -332,7 +332,7 @@ impl ExecutionContext {
}
_ => {
// merge into a single partition
let plan = MergeExec::new(plan.clone(), self.state.config.concurrency);
let plan = MergeExec::new(plan.clone());
// MergeExec must produce a single partition
assert_eq!(1, plan.output_partitioning().partition_count());
common::collect(plan.execute(0).await?)
2 changes: 1 addition & 1 deletion rust/datafusion/src/physical_plan/hash_aggregate.rs
@@ -820,7 +820,7 @@ mod tests {
.unwrap();
assert_eq!(*sums, Float64Array::from(vec![2.0, 7.0, 11.0]));

let merge = Arc::new(MergeExec::new(partial_aggregate, 2));
let merge = Arc::new(MergeExec::new(partial_aggregate));

let final_group: Vec<Arc<dyn PhysicalExpr>> =
(0..groups.len()).map(|i| col(&groups[i].1)).collect();
3 changes: 1 addition & 2 deletions rust/datafusion/src/physical_plan/limit.rs
@@ -243,8 +243,7 @@ mod tests {
// input should have 4 partitions
assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

let limit =
GlobalLimitExec::new(Arc::new(MergeExec::new(Arc::new(csv), 2)), 7, 2);
let limit = GlobalLimitExec::new(Arc::new(MergeExec::new(Arc::new(csv))), 7, 2);

// the result should contain 4 batches (one per input partition)
let iter = limit.execute(0).await?;
54 changes: 17 additions & 37 deletions rust/datafusion/src/physical_plan/merge.rs
@@ -32,25 +32,20 @@ use arrow::record_batch::RecordBatch;
use super::SendableRecordBatchReader;

use async_trait::async_trait;
use tokio::task::{self, JoinHandle};
use tokio;

/// Merge execution plan executes partitions in parallel and combines them into a single
/// partition. No guarantees are made about the order of the resulting partition.
#[derive(Debug)]
pub struct MergeExec {
/// Input execution plan
input: Arc<dyn ExecutionPlan>,
/// Maximum number of concurrent threads
concurrency: usize,
}

impl MergeExec {
/// Create a new MergeExec
pub fn new(input: Arc<dyn ExecutionPlan>, max_concurrency: usize) -> Self {
MergeExec {
input,
concurrency: max_concurrency,
}
pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
MergeExec { input }
}
}

@@ -79,10 +74,7 @@ impl ExecutionPlan for MergeExec {
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
match children.len() {
1 => Ok(Arc::new(MergeExec::new(
children[0].clone(),
self.concurrency,
))),
1 => Ok(Arc::new(MergeExec::new(children[0].clone()))),
_ => Err(ExecutionError::General(
"MergeExec wrong number of children".to_string(),
)),
@@ -108,35 +100,23 @@ impl ExecutionPlan for MergeExec {
self.input.execute(0).await
}
_ => {
let partitions_per_thread = (input_partitions / self.concurrency).max(1);
let range: Vec<usize> = (0..input_partitions).collect();
let chunks = range.chunks(partitions_per_thread);

let mut tasks = vec![];
for chunk in chunks {
let chunk = chunk.to_vec();
let input = self.input.clone();
let task: JoinHandle<Result<Vec<Arc<RecordBatch>>>> =
task::spawn(async move {
let mut batches: Vec<Arc<RecordBatch>> = vec![];
for partition in chunk {
let it = input.execute(partition).await?;
common::collect(it).iter().for_each(|b| {
b.iter()
.for_each(|b| batches.push(Arc::new(b.clone())))
});
}
Ok(batches)
});
tasks.push(task);
}
let tasks = (0..input_partitions)
.map(|part_i| {
let input = self.input.clone();
tokio::spawn(async move {
let it = input.execute(part_i).await?;
common::collect(it)
})
})
// this collect *is needed* so that the join below can
// switch between tasks
.collect::<Vec<_>>();

// combine the results from each thread
let mut combined_results: Vec<Arc<RecordBatch>> = vec![];
for task in tasks {
let result = task.await.unwrap()?;
for batch in &result {
combined_results.push(batch.clone());
combined_results.push(Arc::new(batch.clone()));
}
}

Expand Down Expand Up @@ -171,7 +151,7 @@ mod tests {
// input should have 4 partitions
assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

let merge = MergeExec::new(Arc::new(csv), 2);
let merge = MergeExec::new(Arc::new(csv));

// output of MergeExec should have a single partition
assert_eq!(merge.output_partitioning().partition_count(), 1);
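The rewritten `MergeExec::execute` above drops the `concurrency` field and the manual chunking of partitions across a bounded set of threads; instead it spawns one tokio task per input partition, collects the join handles eagerly, and then awaits them. A minimal sketch of that fan-out/fan-in pattern under stated assumptions: `read_partition` is a hypothetical stand-in for `input.execute(i)` plus `common::collect`, and the sketch assumes a tokio runtime with the multi-threaded scheduler enabled.

```rust
use tokio::runtime::Runtime;

// Hypothetical stand-in for executing one input partition and collecting
// its record batches.
async fn read_partition(i: usize) -> Vec<usize> {
    vec![i, i * 10]
}

async fn merge(num_partitions: usize) -> Vec<usize> {
    // Fan out: spawn one task per partition. The eager `collect()` matters,
    // as the diff's comment notes: without it the iterator would be consumed
    // lazily in the loop below, spawning and awaiting one partition at a time
    // instead of letting all tasks run concurrently on the runtime.
    let handles: Vec<_> = (0..num_partitions)
        .map(|i| tokio::spawn(read_partition(i)))
        .collect();

    // Fan in: await each handle and flatten the per-partition results.
    let mut merged = vec![];
    for handle in handles {
        merged.extend(handle.await.unwrap());
    }
    merged
}

fn main() {
    let mut rt = Runtime::new().unwrap();
    let merged = rt.block_on(merge(4));
    assert_eq!(merged.len(), 8);
}
```

Since concurrency is now delegated to tokio's scheduler rather than capped by a `concurrency` argument, the parameter disappears from `MergeExec::new` and from every call site updated in the other files of this diff.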
5 changes: 1 addition & 4 deletions rust/datafusion/src/physical_plan/planner.rs
@@ -117,10 +117,7 @@ impl DefaultPhysicalPlanner {
if child.output_partitioning().partition_count() == 1 {
child.clone()
} else {
Arc::new(MergeExec::new(
child.clone(),
ctx_state.config.concurrency,
))
Arc::new(MergeExec::new(child.clone()))
}
})
.collect(),
2 changes: 1 addition & 1 deletion rust/datafusion/src/physical_plan/sort.rs
@@ -208,7 +208,7 @@ mod tests {
options: SortOptions::default(),
},
],
Arc::new(MergeExec::new(Arc::new(csv), 2)),
Arc::new(MergeExec::new(Arc::new(csv))),
2,
)?);
