4 changes: 1 addition & 3 deletions datafusion/core/src/physical_optimizer/repartition.rs
@@ -814,8 +814,8 @@ mod tests {
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"SortExec: [c1@0 ASC]",
"ProjectionExec: expr=[c1@0 as c1]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[c1@0 as c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];

@@ -846,10 +846,8 @@ mod tests {

let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
// Expect repartition on the input to the sort (as it can benefit from additional parallelism)
"SortExec: [c1@0 ASC]",
"ProjectionExec: expr=[c1@0 as c1]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];

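The expected-plan changes above fall out of the physical optimizer asking each operator whether it benefits from extra parallelism before inserting a RepartitionExec. Below is a minimal sketch of that decision under simplified assumptions: maybe_repartition is an illustrative name, and DataFusion's actual rule in repartition.rs handles more cases (the session's target_partitions config, ordering requirements, and so on).

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};

fn maybe_repartition(
    plan: Arc<dyn ExecutionPlan>,
    target_partitions: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
    // Ask the operator itself; after this change a rename/reorder-only
    // ProjectionExec answers `false`, so no RepartitionExec lands below it.
    let benefits = plan.benefits_from_input_partitioning();
    let children = plan
        .children()
        .into_iter()
        .map(|child| {
            // Recurse first so the whole tree is considered.
            let child = maybe_repartition(child, target_partitions)?;
            if benefits
                && child.output_partitioning().partition_count() < target_partitions
            {
                // Fan the child's rows out across round-robin partitions.
                Ok(Arc::new(RepartitionExec::try_new(
                    child,
                    Partitioning::RoundRobinBatch(target_partitions),
                )?) as Arc<dyn ExecutionPlan>)
            } else {
                Ok(child)
            }
        })
        .collect::<Result<Vec<_>>>()?;
    if children.is_empty() {
        Ok(plan) // leaf operator, e.g. ParquetExec or MemoryExec
    } else {
        plan.with_new_children(children)
    }
}

With this gate in place the repartition slips below the column-only projection (first hunk above) or disappears entirely (second hunk), which is exactly what the updated expected plans record.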
55 changes: 55 additions & 0 deletions datafusion/core/src/physical_plan/projection.rs
@@ -220,6 +220,16 @@ impl ExecutionPlan for ProjectionExec {
        )?))
    }

    fn benefits_from_input_partitioning(&self) -> bool {
        let all_column_expr = self
            .expr
            .iter()
            .all(|(e, _)| e.as_any().downcast_ref::<Column>().is_some());
        // If every expression is a bare column reference, this projection only
        // reorders or renames columns; it does no per-row computation, so it
        // gains nothing from extra input partitions and we return false.
        !all_column_expr
    }

    fn execute(
        &self,
        partition: usize,
@@ -384,8 +394,21 @@ mod tests {
    use crate::scalar::ScalarValue;
    use crate::test::{self};
    use crate::test_util;
    use datafusion_expr::Operator;
    use datafusion_physical_expr::expressions::binary;
    use futures::future;

    // Create a binary expression without type coercion. Used here when we
    // deliberately do not coerce the expressions to valid types, so executing
    // the resulting plan can fail even though planning succeeded.
    fn binary_simple(
        l: Arc<dyn PhysicalExpr>,
        op: Operator,
        r: Arc<dyn PhysicalExpr>,
        input_schema: &Schema,
    ) -> Arc<dyn PhysicalExpr> {
        binary(l, op, r, input_schema).unwrap()
    }

    #[tokio::test]
    async fn project_first_column() -> Result<()> {
        let session_ctx = SessionContext::new();
@@ -425,6 +448,38 @@
        Ok(())
    }

    #[tokio::test]
    async fn project_input_not_partitioning() -> Result<()> {
        let schema = test_util::aggr_test_schema();

        let partitions = 4;
        let csv = test::scan_partitioned_csv(partitions)?;

        // pick column c1 and name it column c1 in the output schema
        let projection =
            ProjectionExec::try_new(vec![(col("c1", &schema)?, "c1".to_string())], csv)?;
        assert!(!projection.benefits_from_input_partitioning());
        Ok(())
    }

    #[tokio::test]
    async fn project_input_partitioning() -> Result<()> {
        let schema = test_util::aggr_test_schema();

        let partitions = 4;
        let csv = test::scan_partitioned_csv(partitions)?;

        // compute c2 + c9: a projection that does real per-row work
        let c2 = col("c2", &schema).unwrap();
        let c9 = col("c9", &schema).unwrap();
        let c2_plus_c9 = binary_simple(c2, Operator::Plus, c9, &schema);

        let projection =
            ProjectionExec::try_new(vec![(c2_plus_c9, "c2 + c9".to_string())], csv)?;

        assert!(projection.benefits_from_input_partitioning());
        Ok(())
    }

    #[tokio::test]
    async fn project_no_column() -> Result<()> {
        let session_ctx = SessionContext::new();
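Stated on its own, the new check is just a downcast test over the projected expression list. Here is a self-contained sketch mirroring the two tests above; is_column_only_projection is an illustrative name, not an API of the crate.

use std::sync::Arc;

use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Column};
use datafusion_physical_expr::PhysicalExpr;

// True when every projected expression is a bare column reference, i.e. the
// projection only renames or reorders its input columns.
fn is_column_only_projection(exprs: &[(Arc<dyn PhysicalExpr>, String)]) -> bool {
    exprs
        .iter()
        .all(|(e, _)| e.as_any().downcast_ref::<Column>().is_some())
}

fn main() {
    // `SELECT c1 AS c1`: a rename/reorder, so no benefit from repartitioning.
    let rename: Vec<(Arc<dyn PhysicalExpr>, String)> =
        vec![(Arc::new(Column::new("c1", 0)), "c1".to_string())];
    assert!(is_column_only_projection(&rename));

    // `SELECT c2 + c9`: per-row computation that extra partitions can spread
    // across cores, so the projection does benefit.
    let compute: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![(
        Arc::new(BinaryExpr::new(
            Arc::new(Column::new("c2", 0)),
            Operator::Plus,
            Arc::new(Column::new("c9", 1)),
        )),
        "c2 + c9".to_string(),
    )];
    assert!(!is_column_only_projection(&compute));
}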
8 changes: 4 additions & 4 deletions datafusion/core/tests/sql/explain_analyze.rs
@@ -649,13 +649,13 @@ async fn test_physical_plan_display_indent_multi_children() {
" HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"c1\", index: 0 }, Column { name: \"c2\", index: 0 })]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 9000), input_partitions=9000",
" ProjectionExec: expr=[c1@0 as c1]",
" RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
" RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
" ProjectionExec: expr=[c1@0 as c1]",
" CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([Column { name: \"c2\", index: 0 }], 9000), input_partitions=9000",
" ProjectionExec: expr=[c1@0 as c2]",
" RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
" RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
" ProjectionExec: expr=[c1@0 as c2]",
" CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1]",
];

75 changes: 33 additions & 42 deletions datafusion/core/tests/sql/joins.rs
@@ -1991,21 +1991,19 @@ async fn left_semi_join() -> Result<()> {
" MemoryExec: partitions=1, partition_sizes=[1]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2), input_partitions=2",
" ProjectionExec: expr=[t2_id@0 as t2_id]",
" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" ProjectionExec: expr=[t2_id@0 as t2_id]",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
} else {
vec![
"SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
" SortExec: [t1_id@0 ASC NULLS LAST]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name]",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
"SortExec: [t1_id@0 ASC NULLS LAST]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name]",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
" MemoryExec: partitions=1, partition_sizes=[1]",
" ProjectionExec: expr=[t2_id@0 as t2_id]",
" MemoryExec: partitions=1, partition_sizes=[1]",
" ProjectionExec: expr=[t2_id@0 as t2_id]",
" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
};
let formatted = displayable(physical_plan.as_ref()).indent().to_string();
@@ -2078,14 +2076,12 @@ async fn left_semi_join() -> Result<()> {
]
} else {
vec![
"SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
" SortExec: [t1_id@0 ASC NULLS LAST]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name]",
" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
" MemoryExec: partitions=1, partition_sizes=[1]",
" MemoryExec: partitions=1, partition_sizes=[1]",
"SortExec: [t1_id@0 ASC NULLS LAST]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name]",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
" MemoryExec: partitions=1, partition_sizes=[1]",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
};
let formatted = displayable(physical_plan.as_ref()).indent().to_string();
@@ -2275,14 +2271,12 @@ async fn right_semi_join() -> Result<()> {
]
} else {
vec![
"SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
" SortExec: [t1_id@0 ASC NULLS LAST]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]",
" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 1 }, op: NotEq, right: Column { name: \"t1_name\", index: 0 } }",
" MemoryExec: partitions=1, partition_sizes=[1]",
" MemoryExec: partitions=1, partition_sizes=[1]",
"SortExec: [t1_id@0 ASC NULLS LAST]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 1 }, op: NotEq, right: Column { name: \"t1_name\", index: 0 } }",
" MemoryExec: partitions=1, partition_sizes=[1]",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
};
let formatted = displayable(physical_plan.as_ref()).indent().to_string();
@@ -2323,14 +2317,12 @@ async fn right_semi_join() -> Result<()> {
]
} else {
vec![
"SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
" SortExec: [t1_id@0 ASC NULLS LAST]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]",
" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 0 }, op: NotEq, right: Column { name: \"t1_name\", index: 1 } }",
" MemoryExec: partitions=1, partition_sizes=[1]",
" MemoryExec: partitions=1, partition_sizes=[1]",
"SortExec: [t1_id@0 ASC NULLS LAST]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 0 }, op: NotEq, right: Column { name: \"t1_name\", index: 1 } }",
" MemoryExec: partitions=1, partition_sizes=[1]",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
};
let formatted = displayable(physical_plan.as_ref()).indent().to_string();
@@ -2658,14 +2650,13 @@ async fn left_side_expr_key_inner_join() -> Result<()> {
vec![
"ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]",
" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + UInt32(11)\", index: 2 }, Column { name: \"t2_id\", index: 0 })]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as t1.t1_id + UInt32(11)]",
" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
" MemoryExec: partitions=1, partition_sizes=[1]",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + UInt32(11)\", index: 2 }, Column { name: \"t2_id\", index: 0 })]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as t1.t1_id + UInt32(11)]",
" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
};
let formatted = displayable(physical_plan.as_ref()).indent().to_string();
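For reference, every expected vector in these tests is compared against the plan rendered by displayable(...).indent(), one node per line, as the repeated `let formatted = ...` lines above show. A sketch of that comparison follows; assert_plan_eq is an illustrative helper, not a DataFusion API.

use datafusion::physical_plan::{displayable, ExecutionPlan};

// Render the optimized plan with one node per line, then compare it against
// the expected vector so a reordered RepartitionExec/ProjectionExec shows up
// as a readable diff in the assertion message.
fn assert_plan_eq(plan: &dyn ExecutionPlan, expected: &[&str]) {
    let formatted = displayable(plan).indent().to_string();
    let actual: Vec<&str> = formatted.trim().lines().collect();
    assert_eq!(
        expected,
        &actual[..],
        "expected:\n{:#?}\nactual:\n{:#?}",
        expected,
        actual
    );
}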