diff --git a/benchmarks/src/bin/parquet.rs b/benchmarks/src/bin/parquet.rs
index 7ddc18c07abd0..589f967a6df09 100644
--- a/benchmarks/src/bin/parquet.rs
+++ b/benchmarks/src/bin/parquet.rs
@@ -325,7 +325,7 @@ async fn exec_sort(
 ) -> Result<(usize, std::time::Duration)> {
     let start = Instant::now();
     let scan = test_file.create_scan(None).await?;
-    let exec = Arc::new(SortExec::try_new(expr.to_owned(), scan, None)?);
+    let exec = Arc::new(SortExec::new(expr.to_owned(), scan));
     let task_ctx = ctx.task_ctx();
     let result = collect(exec, task_ctx).await?;
     let elapsed = start.elapsed();
diff --git a/datafusion/core/benches/sort.rs b/datafusion/core/benches/sort.rs
index 907392bc3e341..4045702d6308e 100644
--- a/datafusion/core/benches/sort.rs
+++ b/datafusion/core/benches/sort.rs
@@ -189,7 +189,7 @@ impl BenchCase {
         let exec = MemoryExec::try_new(partitions, schema, None).unwrap();
         let exec =
-            SortExec::new_with_partitioning(sort.clone(), Arc::new(exec), true, None);
+            SortExec::new(sort.clone(), Arc::new(exec)).with_preserve_partitioning(true);
         let plan = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
         Self {
@@ -211,7 +211,7 @@ impl BenchCase {
         let exec = MemoryExec::try_new(partitions, schema, None).unwrap();
         let exec = Arc::new(CoalescePartitionsExec::new(Arc::new(exec)));
-        let plan = Arc::new(SortExec::try_new(sort, exec, None).unwrap());
+        let plan = Arc::new(SortExec::new(sort, exec));
         Self {
             runtime,
@@ -231,7 +231,7 @@ impl BenchCase {
         let sort = make_sort_exprs(schema.as_ref());
         let exec = MemoryExec::try_new(partitions, schema, None).unwrap();
-        let exec = SortExec::new_with_partitioning(sort, Arc::new(exec), true, None);
+        let exec = SortExec::new(sort, Arc::new(exec)).with_preserve_partitioning(true);
         let plan = Arc::new(CoalescePartitionsExec::new(Arc::new(exec)));
         Self {
diff --git a/datafusion/core/src/physical_optimizer/global_sort_selection.rs b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
index e29735e741f50..9466297d24d00 100644
--- a/datafusion/core/src/physical_optimizer/global_sort_selection.rs
+++ b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
@@ -60,12 +60,12 @@ impl PhysicalOptimizerRule for GlobalSortSelection {
                         && !sort_exec.preserve_partitioning()
                         && (sort_exec.fetch().is_some() || config.optimizer.repartition_sorts)
                     {
-                        let sort = SortExec::new_with_partitioning(
+                        let sort = SortExec::new(
                             sort_exec.expr().to_vec(),
-                            sort_exec.input().clone(),
-                            true,
-                            sort_exec.fetch(),
-                        );
+                            sort_exec.input().clone()
+                        )
+                        .with_fetch(sort_exec.fetch())
+                        .with_preserve_partitioning(true);
                         let global_sort: Arc<dyn ExecutionPlan> =
                             Arc::new(SortPreservingMergeExec::new(
                                 sort_exec.expr().to_vec(),
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 6c2d5c93482c5..63fd2eb75529b 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -462,12 +462,9 @@ mod tests {
             expr: col("c1", &schema()).unwrap(),
             options: SortOptions::default(),
         }];
-        Arc::new(SortExec::new_with_partitioning(
-            sort_exprs,
-            input,
-            preserve_partitioning,
-            None,
-        ))
+        let new_sort = SortExec::new(sort_exprs, input)
+            .with_preserve_partitioning(preserve_partitioning);
+        Arc::new(new_sort)
     }
 
     fn projection_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 4817c01e5c413..ebede0c98a3d7 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -1210,12 +1210,10 @@ mod tests {
             sort_expr("non_nullable_col", &schema),
         ];
         let repartition_exec = repartition_exec(spm);
-        let sort2 = Arc::new(SortExec::new_with_partitioning(
-            sort_exprs.clone(),
-            repartition_exec,
-            true,
-            None,
-        )) as _;
+        let sort2 = Arc::new(
+            SortExec::new(sort_exprs.clone(), repartition_exec)
+                .with_preserve_partitioning(true),
+        ) as _;
         let spm2 = sort_preserving_merge_exec(sort_exprs, sort2);
         let physical_plan = aggregate_exec(spm2);
@@ -1253,12 +1251,9 @@ mod tests {
         let sort_exprs = vec![sort_expr("non_nullable_col", &schema)];
         // let sort = sort_exec(sort_exprs.clone(), union);
-        let sort = Arc::new(SortExec::new_with_partitioning(
-            sort_exprs.clone(),
-            union,
-            true,
-            None,
-        )) as _;
+        let sort = Arc::new(
+            SortExec::new(sort_exprs.clone(), union).with_preserve_partitioning(true),
+        ) as _;
         let spm = sort_preserving_merge_exec(sort_exprs, sort);
         let filter = filter_exec(
@@ -2264,12 +2259,8 @@ mod tests {
         let window = bounded_window_exec("nullable_col", sort_exprs.clone(), memory_exec);
         let repartition = repartition_exec(window);
-        let orig_plan = Arc::new(SortExec::new_with_partitioning(
-            sort_exprs,
-            repartition,
-            false,
-            None,
-        )) as Arc<dyn ExecutionPlan>;
+        let orig_plan =
+            Arc::new(SortExec::new(sort_exprs, repartition)) as Arc<dyn ExecutionPlan>;
         let mut plan = orig_plan.clone();
         let rules = vec![
@@ -2305,12 +2296,10 @@ mod tests {
         let repartition = repartition_exec(coalesce_partitions);
         let sort_exprs = vec![sort_expr("nullable_col", &schema)];
         // Add local sort
-        let sort = Arc::new(SortExec::new_with_partitioning(
-            sort_exprs.clone(),
-            repartition,
-            true,
-            None,
-        )) as _;
+        let sort = Arc::new(
+            SortExec::new(sort_exprs.clone(), repartition)
+                .with_preserve_partitioning(true),
+        ) as _;
         let spm = sort_preserving_merge_exec(sort_exprs.clone(), sort);
         let sort = sort_exec(sort_exprs, spm);
@@ -2362,7 +2351,7 @@ mod tests {
         input: Arc<dyn ExecutionPlan>,
     ) -> Arc<dyn ExecutionPlan> {
         let sort_exprs = sort_exprs.into_iter().collect();
-        Arc::new(SortExec::try_new(sort_exprs, input, None).unwrap())
+        Arc::new(SortExec::new(sort_exprs, input))
     }
 
     fn sort_preserving_merge_exec(
diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs
index 2fa833bb7e9e0..06bef0fbda53e 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -66,10 +66,12 @@ pub fn add_sort_above(
     if !ordering_satisfy(node.output_ordering(), Some(&sort_expr), || {
         node.equivalence_properties()
     }) {
+        let new_sort = SortExec::new(sort_expr, node.clone());
+
         *node = Arc::new(if node.output_partitioning().partition_count() > 1 {
-            SortExec::new_with_partitioning(sort_expr, node.clone(), true, None)
+            new_sort.with_preserve_partitioning(true)
         } else {
-            SortExec::try_new(sort_expr, node.clone(), None)?
+            new_sort
        }) as _
     }
     Ok(())
 }
diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index 7fb67d758f3da..42cd8fada96d6 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -485,7 +485,7 @@ mod tests {
             options: SortOptions::default(),
         }];
         let memory_exec = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) as _;
-        let sort_exec = Arc::new(SortExec::try_new(sort_expr.clone(), memory_exec, None)?)
+        let sort_exec = Arc::new(SortExec::new(sort_expr.clone(), memory_exec))
             as Arc<dyn ExecutionPlan>;
         let memory_exec2 = Arc::new(MemoryExec::try_new(&[], schema, None)?) as _;
         // memory_exec2 doesn't have output ordering
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 8c3f61b01ec7a..01efa2f7964cf 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -821,7 +821,9 @@ impl DefaultPhysicalPlanner {
                         )),
                     })
                     .collect::<Result<Vec<_>>>()?;
-                Ok(Arc::new(SortExec::try_new(sort_expr, physical_input, *fetch)?))
+                let new_sort = SortExec::new(sort_expr, physical_input)
+                    .with_fetch(*fetch);
+                Ok(Arc::new(new_sort))
             }
             LogicalPlan::Join(Join {
                 left,
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index c3fc06206ca15..62271c587025d 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -619,7 +619,10 @@ fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
     Ok(())
 }
 
-/// External Sort execution plan
+/// Sort execution plan.
+///
+/// This operator supports sorting datasets that are larger than the
+/// memory allotted by the memory manager, by spilling to disk.
 #[derive(Debug)]
 pub struct SortExec {
     /// Input schema
@@ -628,7 +631,8 @@ pub struct SortExec {
     expr: Vec<PhysicalSortExpr>,
     /// Containing all metrics set created during sort
     metrics_set: CompositeMetricsSet,
-    /// Preserve partitions of input plan
+    /// Preserve partitions of input plan. If false, the input partitions
+    /// will be sorted and merged into a single output partition.
     preserve_partitioning: bool,
     /// Fetch highest/lowest n results
     fetch: Option<usize>,
 }
@@ -636,34 +640,65 @@ impl SortExec {
     /// Create a new sort execution plan
+    #[deprecated(since = "22.0.0", note = "use `new` and `with_fetch`")]
     pub fn try_new(
         expr: Vec<PhysicalSortExpr>,
         input: Arc<dyn ExecutionPlan>,
         fetch: Option<usize>,
     ) -> Result<Self> {
-        Ok(Self::new_with_partitioning(expr, input, false, fetch))
+        Ok(Self::new(expr, input).with_fetch(fetch))
     }
-    /// Whether this `SortExec` preserves partitioning of the children
-    pub fn preserve_partitioning(&self) -> bool {
-        self.preserve_partitioning
+    /// Create a new sort execution plan that produces a single,
+    /// sorted output partition.
+    pub fn new(expr: Vec<PhysicalSortExpr>, input: Arc<dyn ExecutionPlan>) -> Self {
+        Self {
+            expr,
+            input,
+            metrics_set: CompositeMetricsSet::new(),
+            preserve_partitioning: false,
+            fetch: None,
+        }
     }
     /// Create a new sort execution plan with the option to preserve
     /// the partitioning of the input plan
+    #[deprecated(
+        since = "22.0.0",
+        note = "use `new`, `with_fetch` and `with_preserve_partitioning` instead"
+    )]
     pub fn new_with_partitioning(
         expr: Vec<PhysicalSortExpr>,
         input: Arc<dyn ExecutionPlan>,
         preserve_partitioning: bool,
         fetch: Option<usize>,
     ) -> Self {
-        Self {
-            expr,
-            input,
-            metrics_set: CompositeMetricsSet::new(),
-            preserve_partitioning,
-            fetch,
-        }
+        Self::new(expr, input)
+            .with_fetch(fetch)
+            .with_preserve_partitioning(preserve_partitioning)
+    }
+
+    /// Whether this `SortExec` preserves partitioning of the children
+    pub fn preserve_partitioning(&self) -> bool {
+        self.preserve_partitioning
+    }
+
+    /// Specify the partitioning behavior of this sort exec
+    ///
+    /// If `preserve_partitioning` is true, sorts each partition
+    /// individually, producing one sorted stream for each input partition.
+    ///
+    /// If `preserve_partitioning` is false, sorts and merges all
+    /// input partitions producing a single, sorted partition.
+    pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
+        self.preserve_partitioning = preserve_partitioning;
+        self
+    }
+
+    /// Set a limit on the number of rows returned (fetch highest/lowest n results)
+    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
+        self.fetch = fetch;
+        self
     }
     /// Input schema
@@ -702,7 +737,7 @@ impl ExecutionPlan for SortExec {
     /// Specifies whether this plan generates an infinite stream of records.
     /// If the plan does not support pipelining, but it its input(s) are
-    /// infinite, returns an error to indicate this. 
+    /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         if children[0] {
             Err(DataFusionError::Plan(
@@ -743,12 +778,11 @@ impl ExecutionPlan for SortExec {
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(SortExec::new_with_partitioning(
-            self.expr.clone(),
-            children[0].clone(),
-            self.preserve_partitioning,
-            self.fetch,
-        )))
+        let new_sort = SortExec::new(self.expr.clone(), children[0].clone())
+            .with_fetch(self.fetch)
+            .with_preserve_partitioning(self.preserve_partitioning);
+
+        Ok(Arc::new(new_sort))
     }
     fn execute(
@@ -933,7 +967,7 @@ mod tests {
         let csv = test::scan_partitioned_csv(partitions)?;
         let schema = csv.schema();
-        let sort_exec = Arc::new(SortExec::try_new(
+        let sort_exec = Arc::new(SortExec::new(
             vec![
                 // c1 string column
                 PhysicalSortExpr {
@@ -952,8 +986,7 @@ mod tests {
                 },
             ],
             Arc::new(CoalescePartitionsExec::new(csv)),
-            None,
-        )?);
+        ));
         let result = collect(sort_exec, task_ctx).await?;
@@ -993,7 +1026,7 @@ mod tests {
         let csv = test::scan_partitioned_csv(partitions)?;
         let schema = csv.schema();
-        let sort_exec = Arc::new(SortExec::try_new(
+        let sort_exec = Arc::new(SortExec::new(
             vec![
                 // c1 string column
                 PhysicalSortExpr {
@@ -1012,8 +1045,7 @@ mod tests {
                 },
             ],
             Arc::new(CoalescePartitionsExec::new(csv)),
-            None,
-        )?);
+        ));
         let task_ctx = session_ctx.task_ctx();
         let result = collect(sort_exec.clone(), task_ctx).await?;
@@ -1077,27 +1109,29 @@ mod tests {
         let csv = test::scan_partitioned_csv(partitions)?;
         let schema = csv.schema();
-        let sort_exec = Arc::new(SortExec::try_new(
-            vec![
-                // c1 string column
-                PhysicalSortExpr {
-                    expr: col("c1", &schema)?,
-                    options: SortOptions::default(),
-                },
-                // c2 uin32 column
-                PhysicalSortExpr {
-                    expr: col("c2", &schema)?,
-                    options: SortOptions::default(),
-                },
-                // c7 uin8 column
-                PhysicalSortExpr {
-                    expr: col("c7", &schema)?,
-                    options: SortOptions::default(),
-                },
-            ],
-            Arc::new(CoalescePartitionsExec::new(csv)),
-            fetch,
-        )?);
+        let sort_exec = Arc::new(
+            SortExec::new(
+                vec![
+                    // c1 string column
+                    PhysicalSortExpr {
+                        expr: col("c1", &schema)?,
+                        options: SortOptions::default(),
+                    },
+                    // c2 uin32 column
+                    PhysicalSortExpr {
+                        expr: col("c2", &schema)?,
+                        options: SortOptions::default(),
+                    },
+                    // c7 uin8 column
+                    PhysicalSortExpr {
+                        expr: col("c7", &schema)?,
+                        options: SortOptions::default(),
+                    },
+                ],
+                Arc::new(CoalescePartitionsExec::new(csv)),
+            )
+            .with_fetch(fetch),
+        );
         let task_ctx = session_ctx.task_ctx();
         let result = collect(sort_exec.clone(), task_ctx).await?;
@@ -1135,14 +1169,13 @@ mod tests {
         let input =
             Arc::new(MemoryExec::try_new(&[vec![batch]], schema.clone(), None).unwrap());
-        let sort_exec = Arc::new(SortExec::try_new(
+        let sort_exec = Arc::new(SortExec::new(
             vec![PhysicalSortExpr {
                 expr: col("field_name", &schema)?,
                 options: SortOptions::default(),
             }],
             input,
-            None,
-        )?);
+        ));
 
         let result: Vec<RecordBatch> = collect(sort_exec, task_ctx).await?;
@@ -1197,7 +1230,7 @@ mod tests {
             ],
         )?;
-        let sort_exec = Arc::new(SortExec::try_new(
+        let sort_exec = Arc::new(SortExec::new(
             vec![
                 PhysicalSortExpr {
                     expr: col("a", &schema)?,
@@ -1215,8 +1248,7 @@ mod tests {
                 },
             ],
             Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None)?),
-            None,
-        )?);
+        ));
         assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
         assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());
@@ -1277,14 +1309,13 @@ mod tests {
         let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
         let refs = blocking_exec.refs();
-        let sort_exec = Arc::new(SortExec::try_new(
+        let sort_exec = Arc::new(SortExec::new(
             vec![PhysicalSortExpr {
                 expr: col("a", &schema)?,
                 options: SortOptions::default(),
             }],
             blocking_exec,
-            None,
-        )?);
+        ));
 
         let fut = collect(sort_exec, task_ctx);
         let mut fut = fut.boxed();
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 14204ef3b4b55..a0aa29f77079a 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -980,12 +980,8 @@ mod tests {
         sort: Vec<PhysicalSortExpr>,
         context: Arc<TaskContext>,
     ) -> RecordBatch {
-        let sort_exec = Arc::new(SortExec::new_with_partitioning(
-            sort.clone(),
-            input,
-            true,
-            None,
-        ));
+        let sort_exec =
+            Arc::new(SortExec::new(sort.clone(), input).with_preserve_partitioning(true));
         sorted_merge(sort_exec, sort, context).await
     }
@@ -995,7 +991,7 @@ mod tests {
         context: Arc<TaskContext>,
     ) -> RecordBatch {
         let merge = Arc::new(CoalescePartitionsExec::new(src));
-        let sort_exec = Arc::new(SortExec::try_new(sort, merge, None).unwrap());
+        let sort_exec = Arc::new(SortExec::new(sort, merge));
         let mut result = collect(sort_exec, context).await.unwrap();
         assert_eq!(result.len(), 1);
         result.remove(0)
     }
diff --git a/datafusion/core/tests/order_spill_fuzz.rs b/datafusion/core/tests/order_spill_fuzz.rs
index 42a7d58d2785e..fdf69ec0d1c84 100644
--- a/datafusion/core/tests/order_spill_fuzz.rs
+++ b/datafusion/core/tests/order_spill_fuzz.rs
@@ -74,7 +74,7 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) {
     }];
     let exec = MemoryExec::try_new(&input, schema, None).unwrap();
-    let sort = Arc::new(SortExec::try_new(sort, Arc::new(exec), None).unwrap());
+    let sort = Arc::new(SortExec::new(sort, Arc::new(exec)));
     let runtime_config = RuntimeConfig::new()
         .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)));
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 7711b3c9617ae..5bf82e423c684 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -613,12 +613,11 @@ impl AsExecutionPlan for PhysicalPlanNode {
                 } else {
                     Some(sort.fetch as usize)
                 };
-                Ok(Arc::new(SortExec::new_with_partitioning(
-                    exprs,
-                    input,
-                    sort.preserve_partitioning,
-                    fetch,
-                )))
+                let new_sort = SortExec::new(exprs, input)
+                    .with_fetch(fetch)
+                    .with_preserve_partitioning(sort.preserve_partitioning);
+
+                Ok(Arc::new(new_sort))
             }
             PhysicalPlanType::SortPreservingMerge(sort) => {
                 let input: Arc<dyn ExecutionPlan> =
@@ -1438,11 +1437,10 @@ mod roundtrip_tests {
                 },
             },
         ];
-        roundtrip_test(Arc::new(SortExec::try_new(
+        roundtrip_test(Arc::new(SortExec::new(
            sort_exprs,
            Arc::new(EmptyExec::new(false, schema)),
-            None,
-        )?))
+        )))
     }
 
     #[test]
@@ -1467,19 +1465,15 @@ mod roundtrip_tests {
            },
        ];
-        roundtrip_test(Arc::new(SortExec::new_with_partitioning(
+        roundtrip_test(Arc::new(SortExec::new(
            sort_exprs.clone(),
            Arc::new(EmptyExec::new(false, schema.clone())),
-            false,
-            None,
        )))?;
 
-        roundtrip_test(Arc::new(SortExec::new_with_partitioning(
-            sort_exprs,
-            Arc::new(EmptyExec::new(false, schema)),
-            true,
-            None,
-        )))
+        roundtrip_test(Arc::new(
+            SortExec::new(sort_exprs, Arc::new(EmptyExec::new(false, schema)))
+                .with_preserve_partitioning(true),
+        ))
     }
 
     #[test]
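
Migration sketch (illustrative only, not part of the patch above): it shows how a call site that used the now-deprecated `SortExec::try_new` or `SortExec::new_with_partitioning` constructors can be rewritten against the builder-style API introduced in this diff. The helper name, the fetch value of 10, and the exact import paths are assumptions made for the example.

use std::sync::Arc;

use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical helper: sort each input partition independently and keep
/// only the first 10 rows of each sorted partition.
fn top10_per_partition_sort(
    sort_exprs: Vec<PhysicalSortExpr>,
    input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
    // Before this patch:
    //   SortExec::new_with_partitioning(sort_exprs, input, true, Some(10))
    // After this patch, the same plan is built with one constructor plus
    // builder-style setters:
    let sort = SortExec::new(sort_exprs, input)
        .with_fetch(Some(10))              // fetch only the first 10 sorted rows
        .with_preserve_partitioning(true); // sort each partition independently
    Arc::new(sort)
}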