From 7dd8982f0830d9a65de3caebad19a6aa5b8bee9a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 5 Apr 2023 16:58:29 -0400 Subject: [PATCH 1/5] Clean up SortExec creation and add doc comments --- benchmarks/src/bin/parquet.rs | 2 +- datafusion/core/benches/merge.rs | 2 +- datafusion/core/benches/sort.rs | 12 +-- .../global_sort_selection.rs | 10 +- .../src/physical_optimizer/repartition.rs | 9 +- .../physical_optimizer/sort_enforcement.rs | 40 +++----- .../core/src/physical_optimizer/utils.rs | 6 +- datafusion/core/src/physical_plan/common.rs | 2 +- datafusion/core/src/physical_plan/planner.rs | 2 +- .../core/src/physical_plan/sorts/sort.rs | 99 +++++++++++++------ .../sorts/sort_preserving_merge.rs | 11 +-- datafusion/core/tests/order_spill_fuzz.rs | 2 +- datafusion/proto/src/physical_plan/mod.rs | 27 +++-- 13 files changed, 120 insertions(+), 104 deletions(-) diff --git a/benchmarks/src/bin/parquet.rs b/benchmarks/src/bin/parquet.rs index 7ddc18c07abd0..9731ee012f3ff 100644 --- a/benchmarks/src/bin/parquet.rs +++ b/benchmarks/src/bin/parquet.rs @@ -325,7 +325,7 @@ async fn exec_sort( ) -> Result<(usize, std::time::Duration)> { let start = Instant::now(); let scan = test_file.create_scan(None).await?; - let exec = Arc::new(SortExec::try_new(expr.to_owned(), scan, None)?); + let exec = Arc::new(SortExec::new(expr.to_owned(), scan, None)); let task_ctx = ctx.task_ctx(); let result = collect(exec, task_ctx).await?; let elapsed = start.elapsed(); diff --git a/datafusion/core/benches/merge.rs b/datafusion/core/benches/merge.rs index f1c4736039f9c..05f3108457778 100644 --- a/datafusion/core/benches/merge.rs +++ b/datafusion/core/benches/merge.rs @@ -276,7 +276,7 @@ impl MergeBenchCase { let projection = None; let exec = Arc::new(MemoryExec::try_new(partitions, schema, projection).unwrap()); - let sort_exec = SortExec::try_new(sort.to_owned(), exec, None).unwrap(); + let sort_exec = SortExec::new(sort.to_owned(), exec, None); let plan = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(sort_exec))); Self { diff --git a/datafusion/core/benches/sort.rs b/datafusion/core/benches/sort.rs index 0507a9308a289..a8e96bcb8d1d8 100644 --- a/datafusion/core/benches/sort.rs +++ b/datafusion/core/benches/sort.rs @@ -201,7 +201,7 @@ impl SortBenchCase { let projection = None; let exec = MemoryExec::try_new(partitions, schema, projection).unwrap(); let exec = Arc::new(CoalescePartitionsExec::new(Arc::new(exec))); - let plan = Arc::new(SortExec::try_new(sort, exec, None).unwrap()); + let plan = Arc::new(SortExec::new(sort, exec, None)); Self { runtime, @@ -250,12 +250,10 @@ impl SortBenchCasePreservePartitioning { let projection = None; let exec = MemoryExec::try_new(partitions, schema, projection).unwrap(); - let plan = Arc::new(SortExec::new_with_partitioning( - sort, - Arc::new(exec), - true, - None, - )); + let new_sort = + SortExec::new(sort, Arc::new(exec), None).with_preserve_partitioning(true); + + let plan = Arc::new(new_sort); Self { runtime, diff --git a/datafusion/core/src/physical_optimizer/global_sort_selection.rs b/datafusion/core/src/physical_optimizer/global_sort_selection.rs index e29735e741f50..03e5bf6c42984 100644 --- a/datafusion/core/src/physical_optimizer/global_sort_selection.rs +++ b/datafusion/core/src/physical_optimizer/global_sort_selection.rs @@ -59,13 +59,9 @@ impl PhysicalOptimizerRule for GlobalSortSelection { // It's already preserving the partitioning so that it can be regarded as a local sort && !sort_exec.preserve_partitioning() && (sort_exec.fetch().is_some() || 
config.optimizer.repartition_sorts) - { - let sort = SortExec::new_with_partitioning( - sort_exec.expr().to_vec(), - sort_exec.input().clone(), - true, - sort_exec.fetch(), - ); + { + let sort = SortExec::from_other(sort_exec) + .with_preserve_partitioning(true); let global_sort: Arc = Arc::new(SortPreservingMergeExec::new( sort_exec.expr().to_vec(), diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 6c2d5c93482c5..a56e49ec91958 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -462,12 +462,9 @@ mod tests { expr: col("c1", &schema()).unwrap(), options: SortOptions::default(), }]; - Arc::new(SortExec::new_with_partitioning( - sort_exprs, - input, - preserve_partitioning, - None, - )) + let new_sort = SortExec::new(sort_exprs, input, None) + .with_preserve_partitioning(preserve_partitioning); + Arc::new(new_sort) } fn projection_exec(input: Arc) -> Arc { diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 7428c339dccce..aeaed48eb5b23 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -1220,12 +1220,10 @@ mod tests { sort_expr("non_nullable_col", &schema), ]; let repartition_exec = repartition_exec(spm); - let sort2 = Arc::new(SortExec::new_with_partitioning( - sort_exprs.clone(), - repartition_exec, - true, - None, - )) as _; + let sort2 = Arc::new( + SortExec::new(sort_exprs.clone(), repartition_exec, None) + .with_preserve_partitioning(true), + ) as _; let spm2 = sort_preserving_merge_exec(sort_exprs, sort2); let physical_plan = aggregate_exec(spm2); @@ -1263,12 +1261,10 @@ mod tests { let sort_exprs = vec![sort_expr("non_nullable_col", &schema)]; // let sort = sort_exec(sort_exprs.clone(), union); - let sort = Arc::new(SortExec::new_with_partitioning( - sort_exprs.clone(), - union, - true, - None, - )) as _; + let sort = Arc::new( + SortExec::new(sort_exprs.clone(), union, None) + .with_preserve_partitioning(true), + ) as _; let spm = sort_preserving_merge_exec(sort_exprs, sort); let filter = filter_exec( @@ -2274,12 +2270,8 @@ mod tests { let window = bounded_window_exec("nullable_col", sort_exprs.clone(), memory_exec); let repartition = repartition_exec(window); - let orig_plan = Arc::new(SortExec::new_with_partitioning( - sort_exprs, - repartition, - false, - None, - )) as Arc; + let orig_plan = Arc::new(SortExec::new(sort_exprs, repartition, None)) + as Arc; let mut plan = orig_plan.clone(); let rules = vec![ @@ -2315,12 +2307,10 @@ mod tests { let repartition = repartition_exec(coalesce_partitions); let sort_exprs = vec![sort_expr("nullable_col", &schema)]; // Add local sort - let sort = Arc::new(SortExec::new_with_partitioning( - sort_exprs.clone(), - repartition, - true, - None, - )) as _; + let sort = Arc::new( + SortExec::new(sort_exprs.clone(), repartition, None) + .with_preserve_partitioning(true), + ) as _; let spm = sort_preserving_merge_exec(sort_exprs.clone(), sort); let sort = sort_exec(sort_exprs, spm); @@ -2372,7 +2362,7 @@ mod tests { input: Arc, ) -> Arc { let sort_exprs = sort_exprs.into_iter().collect(); - Arc::new(SortExec::try_new(sort_exprs, input, None).unwrap()) + Arc::new(SortExec::new(sort_exprs, input, None)) } fn sort_preserving_merge_exec( diff --git a/datafusion/core/src/physical_optimizer/utils.rs 
b/datafusion/core/src/physical_optimizer/utils.rs
index 2fa833bb7e9e0..d5f8c6e7968b8 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -66,10 +66,12 @@ pub fn add_sort_above(
     if !ordering_satisfy(node.output_ordering(), Some(&sort_expr), || {
         node.equivalence_properties()
     }) {
+        let new_sort = SortExec::new(sort_expr, node.clone(), None);
+
         *node = Arc::new(if node.output_partitioning().partition_count() > 1 {
-            SortExec::new_with_partitioning(sort_expr, node.clone(), true, None)
+            new_sort.with_preserve_partitioning(true)
         } else {
-            SortExec::try_new(sort_expr, node.clone(), None)?
+            new_sort
         }) as _
     }
     Ok(())
diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index 7fb67d758f3da..cda31e1891634 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -485,7 +485,7 @@ mod tests {
             options: SortOptions::default(),
         }];
         let memory_exec = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) as _;
-        let sort_exec = Arc::new(SortExec::try_new(sort_expr.clone(), memory_exec, None)?)
+        let sort_exec = Arc::new(SortExec::new(sort_expr.clone(), memory_exec, None))
             as Arc<dyn ExecutionPlan>;
         let memory_exec2 = Arc::new(MemoryExec::try_new(&[], schema, None)?) as _;
         // memory_exec2 doesn't have output ordering
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 8c3f61b01ec7a..6dd82043bdfbf 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -821,7 +821,7 @@ impl DefaultPhysicalPlanner {
                         )),
                     })
                     .collect::<Result<Vec<_>>>()?;
-                Ok(Arc::new(SortExec::try_new(sort_expr, physical_input, *fetch)?))
+                Ok(Arc::new(SortExec::new(sort_expr, physical_input, *fetch)))
             }
             LogicalPlan::Join(Join {
                 left,
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index c3fc06206ca15..0ab0206e0dbfb 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -619,7 +619,10 @@ fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
     Ok(())
 }
 
-/// External Sort execution plan
+/// Sort execution plan.
+///
+/// This operator supports sorting datasets that are larger than the
+/// memory allotted by the memory manager, by spilling to disk.
 #[derive(Debug)]
 pub struct SortExec {
     /// Input schema
     pub(crate) input: Arc<dyn ExecutionPlan>,
     /// Sort expressions
     expr: Vec<PhysicalSortExpr>,
     /// Containing all metrics set created during sort
     metrics_set: CompositeMetricsSet,
-    /// Preserve partitions of input plan
+    /// Preserve partitions of input plan. If false, the input partitions
+    /// will be sorted and merged into a single output partition.
     preserve_partitioning: bool,
     /// Fetch highest/lowest n results
     fetch: Option<usize>,
 }
 
 impl SortExec {
     /// Create a new sort execution plan
+    #[deprecated(since = "22.0.0", note = "please use `new`")]
     pub fn try_new(
         expr: Vec<PhysicalSortExpr>,
         input: Arc<dyn ExecutionPlan>,
         fetch: Option<usize>,
     ) -> Result<Self> {
-        Ok(Self::new_with_partitioning(expr, input, false, fetch))
+        Ok(Self::new(expr, input, fetch))
     }
 
-    /// Whether this `SortExec` preserves partitioning of the children
-    pub fn preserve_partitioning(&self) -> bool {
-        self.preserve_partitioning
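Reviewer note: the new doc comment above promises that `SortExec` can sort inputs larger than the memory manager's allotment by spilling to disk. For readers who want to see that knob, here is a minimal sketch of wiring up a bounded memory pool, mirroring what `order_spill_fuzz.rs` does later in this series; the import paths and context-construction calls are assumptions for the DataFusion version this PR targets, not part of the diff:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::memory_pool::GreedyMemoryPool;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::{SessionConfig, SessionContext};

/// Build a context whose memory manager allots at most `pool_size` bytes;
/// a `SortExec` running under it spills to disk once sorting exceeds that.
fn spilling_context(pool_size: usize) -> Result<SessionContext> {
    let runtime_config = RuntimeConfig::new()
        .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)));
    let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
    Ok(SessionContext::with_config_rt(SessionConfig::new(), runtime))
}
```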
+    /// Create a new sort execution plan that produces a single,
+    /// sorted output partition.
+    pub fn new(
+        expr: Vec<PhysicalSortExpr>,
+        input: Arc<dyn ExecutionPlan>,
+        fetch: Option<usize>,
+    ) -> Self {
+        Self {
+            expr,
+            input,
+            metrics_set: CompositeMetricsSet::new(),
+            preserve_partitioning: false,
+            fetch,
+        }
     }
 
     /// Create a new sort execution plan with the option to preserve
     /// the partitioning of the input plan
+    #[deprecated(
+        since = "22.0.0",
+        note = "please use `new` and `with_preserve_partitioning` instead"
+    )]
     pub fn new_with_partitioning(
         expr: Vec<PhysicalSortExpr>,
         input: Arc<dyn ExecutionPlan>,
         preserve_partitioning: bool,
         fetch: Option<usize>,
     ) -> Self {
+        Self::new(expr, input, fetch).with_preserve_partitioning(preserve_partitioning)
+    }
+
+    /// Create a new sort execution plan that is the same as `sort_exec`
+    /// but does not share a metrics set.
+    pub fn from_other(sort_exec: &SortExec) -> Self {
         Self {
-            expr,
-            input,
+            expr: sort_exec.expr().to_vec(),
+            input: sort_exec.input().clone(),
             metrics_set: CompositeMetricsSet::new(),
-            preserve_partitioning,
-            fetch,
+            preserve_partitioning: sort_exec.preserve_partitioning(),
+            fetch: sort_exec.fetch(),
         }
     }
 
+    /// Whether this `SortExec` preserves partitioning of the children
+    pub fn preserve_partitioning(&self) -> bool {
+        self.preserve_partitioning
+    }
+
+    /// Specify the partitioning behavior of this sort exec
+    ///
+    /// If `preserve_partitioning` is true, sorts each partition
+    /// individually, producing one sorted stream for each input partition.
+    ///
+    /// If `preserve_partitioning` is false, sorts and merges all
+    /// input partitions producing a single, sorted partition.
+    pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
+        self.preserve_partitioning = preserve_partitioning;
+        self
+    }
+
     /// Input schema
     pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
         &self.input
     }
@@ -702,7 +745,7 @@ impl ExecutionPlan for SortExec {
     /// Specifies whether this plan generates an infinite stream of records.
     /// If the plan does not support pipelining, but its input(s) are
-    /// infinite, returns an error to indicate this.
+    /// infinite, returns an error to indicate this.
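Reviewer note: to make the two modes of the setter above concrete, the following sketch (not part of the diff; it uses the patch-1 signatures, where `new` still takes a `fetch` argument) contrasts the default single-partition behavior with the builder-style replacement for the deprecated `new_with_partitioning`:

```rust
use std::sync::Arc;

use datafusion::physical_plan::{sorts::sort::SortExec, ExecutionPlan};
use datafusion_physical_expr::PhysicalSortExpr;

/// Global sort: all input partitions are merged into one sorted partition.
fn global_sort(
    expr: Vec<PhysicalSortExpr>,
    input: Arc<dyn ExecutionPlan>,
) -> SortExec {
    SortExec::new(expr, input, None)
}

/// Local sort: each input partition is sorted independently, so the number
/// of output partitions matches the input. Replaces the deprecated
/// `SortExec::new_with_partitioning(expr, input, true, None)`.
fn local_sort(
    expr: Vec<PhysicalSortExpr>,
    input: Arc<dyn ExecutionPlan>,
) -> SortExec {
    SortExec::new(expr, input, None).with_preserve_partitioning(true)
}
```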
fn unbounded_output(&self, children: &[bool]) -> Result { if children[0] { Err(DataFusionError::Plan( @@ -743,12 +786,10 @@ impl ExecutionPlan for SortExec { self: Arc, children: Vec>, ) -> Result> { - Ok(Arc::new(SortExec::new_with_partitioning( - self.expr.clone(), - children[0].clone(), - self.preserve_partitioning, - self.fetch, - ))) + let new_sort = SortExec::new(self.expr.clone(), children[0].clone(), self.fetch) + .with_preserve_partitioning(self.preserve_partitioning); + + Ok(Arc::new(new_sort)) } fn execute( @@ -933,7 +974,7 @@ mod tests { let csv = test::scan_partitioned_csv(partitions)?; let schema = csv.schema(); - let sort_exec = Arc::new(SortExec::try_new( + let sort_exec = Arc::new(SortExec::new( vec![ // c1 string column PhysicalSortExpr { @@ -953,7 +994,7 @@ mod tests { ], Arc::new(CoalescePartitionsExec::new(csv)), None, - )?); + )); let result = collect(sort_exec, task_ctx).await?; @@ -993,7 +1034,7 @@ mod tests { let csv = test::scan_partitioned_csv(partitions)?; let schema = csv.schema(); - let sort_exec = Arc::new(SortExec::try_new( + let sort_exec = Arc::new(SortExec::new( vec![ // c1 string column PhysicalSortExpr { @@ -1013,7 +1054,7 @@ mod tests { ], Arc::new(CoalescePartitionsExec::new(csv)), None, - )?); + )); let task_ctx = session_ctx.task_ctx(); let result = collect(sort_exec.clone(), task_ctx).await?; @@ -1077,7 +1118,7 @@ mod tests { let csv = test::scan_partitioned_csv(partitions)?; let schema = csv.schema(); - let sort_exec = Arc::new(SortExec::try_new( + let sort_exec = Arc::new(SortExec::new( vec![ // c1 string column PhysicalSortExpr { @@ -1097,7 +1138,7 @@ mod tests { ], Arc::new(CoalescePartitionsExec::new(csv)), fetch, - )?); + )); let task_ctx = session_ctx.task_ctx(); let result = collect(sort_exec.clone(), task_ctx).await?; @@ -1135,14 +1176,14 @@ mod tests { let input = Arc::new(MemoryExec::try_new(&[vec![batch]], schema.clone(), None).unwrap()); - let sort_exec = Arc::new(SortExec::try_new( + let sort_exec = Arc::new(SortExec::new( vec![PhysicalSortExpr { expr: col("field_name", &schema)?, options: SortOptions::default(), }], input, None, - )?); + )); let result: Vec = collect(sort_exec, task_ctx).await?; @@ -1197,7 +1238,7 @@ mod tests { ], )?; - let sort_exec = Arc::new(SortExec::try_new( + let sort_exec = Arc::new(SortExec::new( vec![ PhysicalSortExpr { expr: col("a", &schema)?, @@ -1216,7 +1257,7 @@ mod tests { ], Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None)?), None, - )?); + )); assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type()); assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type()); @@ -1277,14 +1318,14 @@ mod tests { let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1)); let refs = blocking_exec.refs(); - let sort_exec = Arc::new(SortExec::try_new( + let sort_exec = Arc::new(SortExec::new( vec![PhysicalSortExpr { expr: col("a", &schema)?, options: SortOptions::default(), }], blocking_exec, None, - )?); + )); let fut = collect(sort_exec, task_ctx); let mut fut = fut.boxed(); diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs index 14204ef3b4b55..6a649e98534fe 100644 --- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs @@ -980,12 +980,9 @@ mod tests { sort: Vec, context: Arc, ) -> RecordBatch { - let sort_exec = Arc::new(SortExec::new_with_partitioning( - sort.clone(), - input, - true, - 
None, - )); + let sort_exec = Arc::new( + SortExec::new(sort.clone(), input, None).with_preserve_partitioning(true), + ); sorted_merge(sort_exec, sort, context).await } @@ -995,7 +992,7 @@ mod tests { context: Arc, ) -> RecordBatch { let merge = Arc::new(CoalescePartitionsExec::new(src)); - let sort_exec = Arc::new(SortExec::try_new(sort, merge, None).unwrap()); + let sort_exec = Arc::new(SortExec::new(sort, merge, None)); let mut result = collect(sort_exec, context).await.unwrap(); assert_eq!(result.len(), 1); result.remove(0) diff --git a/datafusion/core/tests/order_spill_fuzz.rs b/datafusion/core/tests/order_spill_fuzz.rs index 42a7d58d2785e..b116a385d06c4 100644 --- a/datafusion/core/tests/order_spill_fuzz.rs +++ b/datafusion/core/tests/order_spill_fuzz.rs @@ -74,7 +74,7 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) { }]; let exec = MemoryExec::try_new(&input, schema, None).unwrap(); - let sort = Arc::new(SortExec::try_new(sort, Arc::new(exec), None).unwrap()); + let sort = Arc::new(SortExec::new(sort, Arc::new(exec), None)); let runtime_config = RuntimeConfig::new() .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size))); diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 7711b3c9617ae..999534ed5cba2 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -613,12 +613,10 @@ impl AsExecutionPlan for PhysicalPlanNode { } else { Some(sort.fetch as usize) }; - Ok(Arc::new(SortExec::new_with_partitioning( - exprs, - input, - sort.preserve_partitioning, - fetch, - ))) + let new_sort = SortExec::new(exprs, input, fetch) + .with_preserve_partitioning(sort.preserve_partitioning); + + Ok(Arc::new(new_sort)) } PhysicalPlanType::SortPreservingMerge(sort) => { let input: Arc = @@ -1438,11 +1436,11 @@ mod roundtrip_tests { }, }, ]; - roundtrip_test(Arc::new(SortExec::try_new( + roundtrip_test(Arc::new(SortExec::new( sort_exprs, Arc::new(EmptyExec::new(false, schema)), None, - )?)) + ))) } #[test] @@ -1467,19 +1465,16 @@ mod roundtrip_tests { }, ]; - roundtrip_test(Arc::new(SortExec::new_with_partitioning( + roundtrip_test(Arc::new(SortExec::new( sort_exprs.clone(), Arc::new(EmptyExec::new(false, schema.clone())), - false, None, )))?; - roundtrip_test(Arc::new(SortExec::new_with_partitioning( - sort_exprs, - Arc::new(EmptyExec::new(false, schema)), - true, - None, - ))) + roundtrip_test(Arc::new( + SortExec::new(sort_exprs, Arc::new(EmptyExec::new(false, schema)), None) + .with_preserve_partitioning(true), + )) } #[test] From 2308d8c805587b537e324c637208c19f5e492370 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 5 Apr 2023 17:02:54 -0400 Subject: [PATCH 2/5] Reduce API surface --- .../src/physical_optimizer/global_sort_selection.rs | 9 ++++++--- datafusion/core/src/physical_plan/sorts/sort.rs | 12 ------------ 2 files changed, 6 insertions(+), 15 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/global_sort_selection.rs b/datafusion/core/src/physical_optimizer/global_sort_selection.rs index 03e5bf6c42984..3d77897d4a0af 100644 --- a/datafusion/core/src/physical_optimizer/global_sort_selection.rs +++ b/datafusion/core/src/physical_optimizer/global_sort_selection.rs @@ -59,9 +59,12 @@ impl PhysicalOptimizerRule for GlobalSortSelection { // It's already preserving the partitioning so that it can be regarded as a local sort && !sort_exec.preserve_partitioning() && (sort_exec.fetch().is_some() || config.optimizer.repartition_sorts) - { - let sort = 
SortExec::from_other(sort_exec) - .with_preserve_partitioning(true); + { + let sort = SortExec::new( + sort_exec.expr().to_vec(), + sort_exec.input().clone(), + sort_exec.fetch(), + ).with_preserve_partitioning(true); let global_sort: Arc = Arc::new(SortPreservingMergeExec::new( sort_exec.expr().to_vec(), diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 0ab0206e0dbfb..1d5a56fa909d1 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -680,18 +680,6 @@ impl SortExec { Self::new(expr, input, fetch).with_preserve_partitioning(preserve_partitioning) } - /// Create a new sort execution plan that is the same as `sort_exec` - /// but does not share a metrics set. - pub fn from_other(sort_exec: &SortExec) -> Self { - Self { - expr: sort_exec.expr().to_vec(), - input: sort_exec.input().clone(), - metrics_set: CompositeMetricsSet::new(), - preserve_partitioning: sort_exec.preserve_partitioning(), - fetch: sort_exec.fetch(), - } - } - /// Whether this `SortExec` preserves partitioning of the children pub fn preserve_partitioning(&self) -> bool { self.preserve_partitioning From b157f47860a4e98e04d97275a0b4e3ed084de00a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 6 Apr 2023 17:19:22 -0400 Subject: [PATCH 3/5] restore sort bench --- datafusion/core/benches/sort.rs | 674 ++++++++++++++++---------------- 1 file changed, 331 insertions(+), 343 deletions(-) diff --git a/datafusion/core/benches/sort.rs b/datafusion/core/benches/sort.rs index a8e96bcb8d1d8..907392bc3e341 100644 --- a/datafusion/core/benches/sort.rs +++ b/datafusion/core/benches/sort.rs @@ -15,24 +15,77 @@ // specific language governing permissions and limitations // under the License. -//! Adapted from merge benchmark. Primary difference is that the input data is not ordered. +//! Benchmarks for Merge and sort performance +//! +//! Each benchmark: +//! 1. Creates a list of tuples (sorted if necessary) +//! +//! 2. Divides those tuples across some number of streams of [`RecordBatch`] +//! preserving any ordering +//! +//! 3. Times how long it takes for a given sort plan to process the input +//! +//! Pictorially: +//! +//! ``` +//! Rows are randomly +//! divided into separate +//! RecordBatch "streams", +//! ┌────┐ ┌────┐ ┌────┐ preserving the order ┌────┐ ┌────┐ ┌────┐ +//! │ │ │ │ │ │ │ │ │ │ │ │ +//! │ │ │ │ │ │ ──────────────┐ │ │ │ │ │ │ +//! │ │ │ │ │ │ └─────────────▶ │ C1 │ │... │ │ CN │ +//! │ │ │ │ │ │ ───────────────┐ │ │ │ │ │ │ +//! │ │ │ │ │ │ ┌┼─────────────▶ │ │ │ │ │ │ +//! │ │ │ │ │ │ ││ │ │ │ │ │ │ +//! │ │ │ │ │ │ ││ └────┘ └────┘ └────┘ +//! │ │ │ │ │ │ ││ ┌────┐ ┌────┐ ┌────┐ +//! │ │ │ │ │ │ │└───────────────▶│ │ │ │ │ │ +//! │ │ │ │ │ │ │ │ │ │ │ │ │ +//! │ │ │ │ │ │ ... │ │ C1 │ │... │ │ CN │ +//! │ │ │ │ │ │ ──────────────┘ │ │ │ │ │ │ +//! │ │ │ │ │ │ ┌──────────────▶ │ │ │ │ │ │ +//! │ C1 │ │... │ │ CN │ │ │ │ │ │ │ │ +//! │ │ │ │ │ │───────────────┐│ └────┘ └────┘ └────┘ +//! │ │ │ │ │ │ ││ +//! │ │ │ │ │ │ ││ +//! │ │ │ │ │ │ ││ ... +//! │ │ │ │ │ │ ────────────┼┼┐ +//! │ │ │ │ │ │ │││ +//! │ │ │ │ │ │ │││ ┌────┐ ┌────┐ ┌────┐ +//! │ │ │ │ │ │ ──────────────┼┘│ │ │ │ │ │ │ +//! │ │ │ │ │ │ │ │ │ │ │ │ │ │ +//! │ │ │ │ │ │ │ │ │ C1 │ │... │ │ CN │ +//! │ │ │ │ │ │ └─┼────────────▶ │ │ │ │ │ │ +//! │ │ │ │ │ │ │ │ │ │ │ │ │ +//! │ │ │ │ │ │ └─────────────▶ │ │ │ │ │ │ +//! └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ +//! Input RecordBatch NUM_STREAMS input +//! 
Columns 1..N RecordBatches +//! INPUT_SIZE sorted rows (still INPUT_SIZE total +//! ~10% duplicates rows) +//! ``` + use std::sync::Arc; use arrow::array::DictionaryArray; use arrow::datatypes::Int32Type; use arrow::{ - array::{Float64Array, Int64Array, StringArray, UInt64Array}, - compute::{self, SortOptions, TakeOptions}, + array::{Float64Array, Int64Array, StringArray}, + compute::SortOptions, datatypes::Schema, record_batch::RecordBatch, }; -/// Benchmarks for SortExec +/// Benchmarks for SortPreservingMerge stream use criterion::{criterion_group, criterion_main, Criterion}; -use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::{ execution::context::TaskContext, - physical_plan::{memory::MemoryExec, sorts::sort::SortExec, ExecutionPlan}, + physical_plan::{ + memory::MemoryExec, sorts::sort_preserving_merge::SortPreservingMergeExec, + ExecutionPlan, + }, prelude::SessionContext, }; use datafusion_physical_expr::{expressions::col, PhysicalSortExpr}; @@ -41,145 +94,62 @@ use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; use tokio::runtime::Runtime; -use lazy_static::lazy_static; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; /// Total number of streams to divide each input into /// models 8 partition plan (should it be 16??) -const NUM_STREAMS: u64 = 8; +const NUM_STREAMS: usize = 8; + +/// The size of each batch within each stream +const BATCH_SIZE: usize = 1024; /// Total number of input rows to generate const INPUT_SIZE: u64 = 100000; -// cases: -// * physical sort expr (X, Y Z, NULLS FIRST, ASC) (not parameterized) -// -// streams of distinct values -// streams with 10% duplicated values (within each stream, and across streams) -// These cases are intended to model important usecases in TPCH -// parameters: -// -// Input schemas -lazy_static! 
{ - static ref I64_STREAMS: Vec> = i64_streams(); - static ref F64_STREAMS: Vec> = f64_streams(); - - static ref UTF8_LOW_CARDINALITY_STREAMS: Vec> = utf8_low_cardinality_streams(); - static ref UTF8_HIGH_CARDINALITY_STREAMS: Vec> = utf8_high_cardinality_streams(); - - static ref DICTIONARY_STREAMS: Vec> = dictionary_streams(); - static ref DICTIONARY_TUPLE_STREAMS: Vec> = dictionary_tuple_streams(); - static ref MIXED_DICTIONARY_TUPLE_STREAMS: Vec> = mixed_dictionary_tuple_streams(); - // * (string(low), string(low), string(high)) -- tpch q1 + iox - static ref UTF8_TUPLE_STREAMS: Vec> = utf8_tuple_streams(); - // * (f64, string, string, int) -- tpch q2 - static ref MIXED_TUPLE_STREAMS: Vec> = mixed_tuple_streams(); - -} +type PartitionedBatches = Vec>; fn criterion_benchmark(c: &mut Criterion) { - c.bench_function("sort i64", |b| { - let case = SortBenchCase::new(&I64_STREAMS); - - b.iter(move || case.run()) - }); - c.bench_function("sort i64 preserve partitioning", |b| { - let case = SortBenchCasePreservePartitioning::new(&I64_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("sort f64", |b| { - let case = SortBenchCase::new(&F64_STREAMS); - - b.iter(move || case.run()) - }); - c.bench_function("sort f64 preserve partitioning", |b| { - let case = SortBenchCasePreservePartitioning::new(&F64_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("sort utf8 low cardinality", |b| { - let case = SortBenchCase::new(&UTF8_LOW_CARDINALITY_STREAMS); - - b.iter(move || case.run()) - }); - c.bench_function("sort utf8 low cardinality preserve partitioning", |b| { - let case = SortBenchCasePreservePartitioning::new(&UTF8_LOW_CARDINALITY_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("sort utf8 high cardinality", |b| { - let case = SortBenchCase::new(&UTF8_HIGH_CARDINALITY_STREAMS); - - b.iter(move || case.run()) - }); - c.bench_function("sort utf8 high cardinality preserve partitioning", |b| { - let case = SortBenchCasePreservePartitioning::new(&UTF8_HIGH_CARDINALITY_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("sort utf8 tuple", |b| { - let case = SortBenchCase::new(&UTF8_TUPLE_STREAMS); - - b.iter(move || case.run()) - }); - c.bench_function("sort utf8 tuple preserve partitioning", |b| { - let case = SortBenchCasePreservePartitioning::new(&UTF8_TUPLE_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("sort utf8 dictionary", |b| { - let case = SortBenchCase::new(&DICTIONARY_STREAMS); - - b.iter(move || case.run()) - }); - c.bench_function("sort utf8 dictionary preserve partitioning", |b| { - let case = SortBenchCasePreservePartitioning::new(&DICTIONARY_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("sort utf8 dictionary tuple", |b| { - let case = SortBenchCase::new(&DICTIONARY_TUPLE_STREAMS); - b.iter(move || case.run()) - }); - c.bench_function("sort utf8 dictionary tuple preserve partitioning", |b| { - let case = SortBenchCasePreservePartitioning::new(&DICTIONARY_TUPLE_STREAMS); - b.iter(move || case.run()) - }); - - c.bench_function("sort mixed utf8 dictionary tuple", |b| { - let case = SortBenchCase::new(&MIXED_DICTIONARY_TUPLE_STREAMS); - b.iter(move || case.run()) - }); - - c.bench_function( - "sort mixed utf8 dictionary tuple preserve partitioning", - |b| { - let case = - SortBenchCasePreservePartitioning::new(&MIXED_DICTIONARY_TUPLE_STREAMS); + let cases: Vec<(&str, &dyn Fn(bool) -> PartitionedBatches)> = vec![ + ("i64", &i64_streams), + ("f64", &f64_streams), + ("utf8 low 
cardinality", &utf8_low_cardinality_streams), + ("utf8 high cardinality", &utf8_high_cardinality_streams), + ("utf8 tuple", &utf8_tuple_streams), + ("utf8 dictionary", &dictionary_streams), + ("utf8 dictionary tuple", &dictionary_tuple_streams), + ("mixed dictionary tuple", &mixed_dictionary_tuple_streams), + ("mixed tuple", &mixed_tuple_streams), + ]; + + for (name, f) in cases { + c.bench_function(&format!("merge sorted {name}"), |b| { + let data = f(true); + let case = BenchCase::merge_sorted(&data); b.iter(move || case.run()) - }, - ); + }); - c.bench_function("sort mixed tuple", |b| { - let case = SortBenchCase::new(&MIXED_TUPLE_STREAMS); + c.bench_function(&format!("sort merge {name}"), |b| { + let data = f(false); + let case = BenchCase::sort_merge(&data); + b.iter(move || case.run()) + }); - b.iter(move || case.run()) - }); - c.bench_function("sort mixed tuple preserve partitioning", |b| { - let case = SortBenchCasePreservePartitioning::new(&MIXED_TUPLE_STREAMS); + c.bench_function(&format!("sort {name}"), |b| { + let data = f(false); + let case = BenchCase::sort(&data); + b.iter(move || case.run()) + }); - b.iter(move || case.run()) - }); + c.bench_function(&format!("sort partitioned {name}"), |b| { + let data = f(false); + let case = BenchCase::sort_partitioned(&data); + b.iter(move || case.run()) + }); + } } -/// Encapsulates running a test case where input partitioning is not preserved. -struct SortBenchCase { +/// Encapsulates running each test case +struct BenchCase { runtime: Runtime, task_ctx: Arc, @@ -187,10 +157,10 @@ struct SortBenchCase { plan: Arc, } -impl SortBenchCase { +impl BenchCase { /// Prepare to run a benchmark that merges the specified - /// partitions (streams) together using all keyes - fn new(partitions: &[Vec]) -> Self { + /// pre-sorted partitions (streams) together using all keys + fn merge_sorted(partitions: &[Vec]) -> Self { let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); @@ -198,10 +168,8 @@ impl SortBenchCase { let schema = partitions[0][0].schema(); let sort = make_sort_exprs(schema.as_ref()); - let projection = None; - let exec = MemoryExec::try_new(partitions, schema, projection).unwrap(); - let exec = Arc::new(CoalescePartitionsExec::new(Arc::new(exec))); - let plan = Arc::new(SortExec::new(sort, exec, None)); + let exec = MemoryExec::try_new(partitions, schema, None).unwrap(); + let plan = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec))); Self { runtime, @@ -210,37 +178,30 @@ impl SortBenchCase { } } - /// runs the specified plan to completion, draining all input and - /// panic'ing on error - fn run(&self) { - let plan = Arc::clone(&self.plan); - let task_ctx = Arc::clone(&self.task_ctx); + /// Test SortExec in "partitioned" mode followed by a SortPreservingMerge + fn sort_merge(partitions: &[Vec]) -> Self { + let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); - assert_eq!(plan.output_partitioning().partition_count(), 1); + let schema = partitions[0][0].schema(); + let sort = make_sort_exprs(schema.as_ref()); - self.runtime.block_on(async move { - let mut stream = plan.execute(0, task_ctx).unwrap(); - while let Some(b) = stream.next().await { - b.expect("unexpected execution error"); - } - }) - } -} -/// Encapsulates running a test case where input partitioning is not preserved. 
-struct SortBenchCasePreservePartitioning { - runtime: Runtime, - task_ctx: Arc, + let exec = MemoryExec::try_new(partitions, schema, None).unwrap(); + let exec = + SortExec::new_with_partitioning(sort.clone(), Arc::new(exec), true, None); + let plan = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec))); - // The plan to run - plan: Arc, - partition_count: usize, -} + Self { + runtime, + task_ctx, + plan, + } + } -impl SortBenchCasePreservePartitioning { - /// Prepare to run a benchmark that merges the specified - /// partitions (streams) together using all keyes - fn new(partitions: &[Vec]) -> Self { - let partition_count = partitions.len(); + /// Test SortExec in "partitioned" mode which sorts the input streams + /// individually into some number of output streams + fn sort(partitions: &[Vec]) -> Self { let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); @@ -248,18 +209,35 @@ impl SortBenchCasePreservePartitioning { let schema = partitions[0][0].schema(); let sort = make_sort_exprs(schema.as_ref()); - let projection = None; - let exec = MemoryExec::try_new(partitions, schema, projection).unwrap(); - let new_sort = - SortExec::new(sort, Arc::new(exec), None).with_preserve_partitioning(true); + let exec = MemoryExec::try_new(partitions, schema, None).unwrap(); + let exec = Arc::new(CoalescePartitionsExec::new(Arc::new(exec))); + let plan = Arc::new(SortExec::try_new(sort, exec, None).unwrap()); + + Self { + runtime, + task_ctx, + plan, + } + } - let plan = Arc::new(new_sort); + /// Test SortExec in "partitioned" mode which sorts the input streams + /// individually into some number of output streams + fn sort_partitioned(partitions: &[Vec]) -> Self { + let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + + let schema = partitions[0][0].schema(); + let sort = make_sort_exprs(schema.as_ref()); + + let exec = MemoryExec::try_new(partitions, schema, None).unwrap(); + let exec = SortExec::new_with_partitioning(sort, Arc::new(exec), true, None); + let plan = Arc::new(CoalescePartitionsExec::new(Arc::new(exec))); Self { runtime, task_ctx, plan, - partition_count, } } @@ -269,17 +247,12 @@ impl SortBenchCasePreservePartitioning { let plan = Arc::clone(&self.plan); let task_ctx = Arc::clone(&self.task_ctx); - assert_eq!( - plan.output_partitioning().partition_count(), - self.partition_count - ); + assert_eq!(plan.output_partitioning().partition_count(), 1); self.runtime.block_on(async move { - for i in 0..self.partition_count { - let mut stream = plan.execute(i, task_ctx.clone()).unwrap(); - while let Some(b) = stream.next().await { - b.expect("unexpected execution error"); - } + let mut stream = plan.execute(0, task_ctx).unwrap(); + while let Some(b) = stream.next().await { + b.expect("unexpected execution error"); } }) } @@ -298,51 +271,58 @@ fn make_sort_exprs(schema: &Schema) -> Vec { } /// Create streams of int64 (where approximately 1/3 values is repeated) -fn i64_streams() -> Vec> { - let array: Int64Array = DataGenerator::new().i64_values().into_iter().collect(); - - let batch = RecordBatch::try_from_iter(vec![("i64", Arc::new(array) as _)]).unwrap(); +fn i64_streams(sorted: bool) -> PartitionedBatches { + let mut values = DataGenerator::new().i64_values(); + if sorted { + values.sort_unstable(); + } - split_batch(batch) + split_tuples(values, |v| { + let array = 
Int64Array::from(v); + RecordBatch::try_from_iter(vec![("i64", Arc::new(array) as _)]).unwrap() + }) } /// Create streams of f64 (where approximately 1/3 values are repeated) /// with the same distribution as i64_streams -fn f64_streams() -> Vec> { - let array: Float64Array = DataGenerator::new().f64_values().into_iter().collect(); - let batch = RecordBatch::try_from_iter(vec![("f64", Arc::new(array) as _)]).unwrap(); +fn f64_streams(sorted: bool) -> PartitionedBatches { + let mut values = DataGenerator::new().f64_values(); + if sorted { + values.sort_unstable_by(|a, b| a.total_cmp(b)); + } - split_batch(batch) + split_tuples(values, |v| { + let array = Float64Array::from(v); + RecordBatch::try_from_iter(vec![("f64", Arc::new(array) as _)]).unwrap() + }) } /// Create streams of random low cardinality utf8 values -fn utf8_low_cardinality_streams() -> Vec> { - let array: StringArray = DataGenerator::new() - .utf8_low_cardinality_values() - .into_iter() - .collect(); - - let batch = - RecordBatch::try_from_iter(vec![("utf_low", Arc::new(array) as _)]).unwrap(); - - split_batch(batch) +fn utf8_low_cardinality_streams(sorted: bool) -> PartitionedBatches { + let mut values = DataGenerator::new().utf8_low_cardinality_values(); + if sorted { + values.sort_unstable(); + } + split_tuples(values, |v| { + let array: StringArray = v.into_iter().collect(); + RecordBatch::try_from_iter(vec![("utf_low", Arc::new(array) as _)]).unwrap() + }) } /// Create streams of high cardinality (~ no duplicates) utf8 values -fn utf8_high_cardinality_streams() -> Vec> { - let array: StringArray = DataGenerator::new() - .utf8_high_cardinality_values() - .into_iter() - .collect(); - - let batch = - RecordBatch::try_from_iter(vec![("utf_high", Arc::new(array) as _)]).unwrap(); - - split_batch(batch) +fn utf8_high_cardinality_streams(sorted: bool) -> PartitionedBatches { + let mut values = DataGenerator::new().utf8_high_cardinality_values(); + if sorted { + values.sort_unstable(); + } + split_tuples(values, |v| { + let array: StringArray = v.into_iter().collect(); + RecordBatch::try_from_iter(vec![("utf_high", Arc::new(array) as _)]).unwrap() + }) } /// Create a batch of (utf8_low, utf8_low, utf8_high) -fn utf8_tuple_streams() -> Vec> { +fn utf8_tuple_streams(sorted: bool) -> PartitionedBatches { let mut gen = DataGenerator::new(); // need to sort by the combined key, so combine them together @@ -353,27 +333,29 @@ fn utf8_tuple_streams() -> Vec> { .zip(gen.utf8_high_cardinality_values().into_iter()) .collect(); - tuples.sort_unstable(); - - let (tuples, utf8_high): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - let (utf8_low1, utf8_low2): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - - let utf8_high: StringArray = utf8_high.into_iter().collect(); - let utf8_low1: StringArray = utf8_low1.into_iter().collect(); - let utf8_low2: StringArray = utf8_low2.into_iter().collect(); - - let batch = RecordBatch::try_from_iter(vec![ - ("utf_low1", Arc::new(utf8_low1) as _), - ("utf_low2", Arc::new(utf8_low2) as _), - ("utf_high", Arc::new(utf8_high) as _), - ]) - .unwrap(); + if sorted { + tuples.sort_unstable(); + } - split_batch(batch) + split_tuples(tuples, |tuples| { + let (tuples, utf8_high): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + let (utf8_low1, utf8_low2): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + + let utf8_high: StringArray = utf8_high.into_iter().collect(); + let utf8_low1: StringArray = utf8_low1.into_iter().collect(); + let utf8_low2: StringArray = utf8_low2.into_iter().collect(); + + 
RecordBatch::try_from_iter(vec![ + ("utf_low1", Arc::new(utf8_low1) as _), + ("utf_low2", Arc::new(utf8_low2) as _), + ("utf_high", Arc::new(utf8_high) as _), + ]) + .unwrap() + }) } /// Create a batch of (f64, utf8_low, utf8_low, i64) -fn mixed_tuple_streams() -> Vec> { +fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches { let mut gen = DataGenerator::new(); // need to sort by the combined key, so combine them together @@ -384,43 +366,50 @@ fn mixed_tuple_streams() -> Vec> { .zip(gen.utf8_low_cardinality_values().into_iter()) .zip(gen.i64_values().into_iter()) .collect(); - tuples.sort_unstable(); - - let (tuples, i64_values): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - let (tuples, utf8_low2): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - let (f64_values, utf8_low1): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - - let f64_values: Float64Array = f64_values.into_iter().map(|v| v as f64).collect(); - let utf8_low1: StringArray = utf8_low1.into_iter().collect(); - let utf8_low2: StringArray = utf8_low2.into_iter().collect(); - let i64_values: Int64Array = i64_values.into_iter().collect(); - - let batch = RecordBatch::try_from_iter(vec![ - ("f64", Arc::new(f64_values) as _), - ("utf_low1", Arc::new(utf8_low1) as _), - ("utf_low2", Arc::new(utf8_low2) as _), - ("i64", Arc::new(i64_values) as _), - ]) - .unwrap(); - - split_batch(batch) + + if sorted { + tuples.sort_unstable(); + } + + split_tuples(tuples, |tuples| { + let (tuples, i64_values): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + let (tuples, utf8_low2): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + let (f64_values, utf8_low1): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + + let f64_values: Float64Array = f64_values.into_iter().map(|v| v as f64).collect(); + + let utf8_low1: StringArray = utf8_low1.into_iter().collect(); + let utf8_low2: StringArray = utf8_low2.into_iter().collect(); + let i64_values: Int64Array = i64_values.into_iter().collect(); + + RecordBatch::try_from_iter(vec![ + ("f64", Arc::new(f64_values) as _), + ("utf_low1", Arc::new(utf8_low1) as _), + ("utf_low2", Arc::new(utf8_low2) as _), + ("i64", Arc::new(i64_values) as _), + ]) + .unwrap() + }) } /// Create a batch of (utf8_dict) -fn dictionary_streams() -> Vec> { +fn dictionary_streams(sorted: bool) -> PartitionedBatches { let mut gen = DataGenerator::new(); - let values = gen.utf8_low_cardinality_values(); - let dictionary: DictionaryArray = - values.iter().map(Option::as_deref).collect(); + let mut values = gen.utf8_low_cardinality_values(); + if sorted { + values.sort_unstable(); + } - let batch = - RecordBatch::try_from_iter(vec![("dict", Arc::new(dictionary) as _)]).unwrap(); + split_tuples(values, |v| { + let dictionary: DictionaryArray = + v.iter().map(Option::as_deref).collect(); - split_batch(batch) + RecordBatch::try_from_iter(vec![("dict", Arc::new(dictionary) as _)]).unwrap() + }) } /// Create a batch of (utf8_dict, utf8_dict, utf8_dict) -fn dictionary_tuple_streams() -> Vec> { +fn dictionary_tuple_streams(sorted: bool) -> PartitionedBatches { let mut gen = DataGenerator::new(); let mut tuples: Vec<_> = gen .utf8_low_cardinality_values() @@ -428,27 +417,30 @@ fn dictionary_tuple_streams() -> Vec> { .zip(gen.utf8_low_cardinality_values()) .zip(gen.utf8_low_cardinality_values()) .collect(); - tuples.sort_unstable(); - let (tuples, c): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - let (a, b): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - - let a: DictionaryArray = a.iter().map(Option::as_deref).collect(); - let b: 
DictionaryArray = b.iter().map(Option::as_deref).collect(); - let c: DictionaryArray = c.iter().map(Option::as_deref).collect(); - - let batch = RecordBatch::try_from_iter(vec![ - ("a", Arc::new(a) as _), - ("b", Arc::new(b) as _), - ("c", Arc::new(c) as _), - ]) - .unwrap(); + if sorted { + tuples.sort_unstable(); + } - split_batch(batch) + split_tuples(tuples, |tuples| { + let (tuples, c): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + let (a, b): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + + let a: DictionaryArray = a.iter().map(Option::as_deref).collect(); + let b: DictionaryArray = b.iter().map(Option::as_deref).collect(); + let c: DictionaryArray = c.iter().map(Option::as_deref).collect(); + + RecordBatch::try_from_iter(vec![ + ("a", Arc::new(a) as _), + ("b", Arc::new(b) as _), + ("c", Arc::new(c) as _), + ]) + .unwrap() + }) } /// Create a batch of (utf8_dict, utf8_dict, utf8_dict, i64) -fn mixed_dictionary_tuple_streams() -> Vec> { +fn mixed_dictionary_tuple_streams(sorted: bool) -> PartitionedBatches { let mut gen = DataGenerator::new(); let mut tuples: Vec<_> = gen .utf8_low_cardinality_values() @@ -457,26 +449,29 @@ fn mixed_dictionary_tuple_streams() -> Vec> { .zip(gen.utf8_low_cardinality_values()) .zip(gen.i64_values()) .collect(); - tuples.sort_unstable(); - - let (tuples, d): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - let (tuples, c): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - let (a, b): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - - let a: DictionaryArray = a.iter().map(Option::as_deref).collect(); - let b: DictionaryArray = b.iter().map(Option::as_deref).collect(); - let c: DictionaryArray = c.iter().map(Option::as_deref).collect(); - let d: Int64Array = d.into_iter().collect(); - - let batch = RecordBatch::try_from_iter(vec![ - ("a", Arc::new(a) as _), - ("b", Arc::new(b) as _), - ("c", Arc::new(c) as _), - ("d", Arc::new(d) as _), - ]) - .unwrap(); - - split_batch(batch) + + if sorted { + tuples.sort_unstable(); + } + + split_tuples(tuples, |tuples| { + let (tuples, d): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + let (tuples, c): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + let (a, b): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + + let a: DictionaryArray = a.iter().map(Option::as_deref).collect(); + let b: DictionaryArray = b.iter().map(Option::as_deref).collect(); + let c: DictionaryArray = c.iter().map(Option::as_deref).collect(); + let d: Int64Array = d.into_iter().collect(); + + RecordBatch::try_from_iter(vec![ + ("a", Arc::new(a) as _), + ("b", Arc::new(b) as _), + ("c", Arc::new(c) as _), + ("d", Arc::new(d) as _), + ]) + .unwrap() + }) } /// Encapsulates creating data for this test @@ -491,11 +486,18 @@ impl DataGenerator { } } - /// Create an array of i64 unsorted values (where approximately 1/3 values is repeated) + /// Create an array of i64 sorted values (where approximately 1/3 values is repeated) fn i64_values(&mut self) -> Vec { - (0..INPUT_SIZE) + let mut vec: Vec<_> = (0..INPUT_SIZE) .map(|_| self.rng.gen_range(0..INPUT_SIZE as i64)) - .collect() + .collect(); + + vec.sort_unstable(); + + // 6287 distinct / 10000 total + //let num_distinct = vec.iter().collect::>().len(); + //println!("{} distinct / {} total", num_distinct, vec.len()); + vec } /// Create an array of f64 sorted values (with same distribution of `i64_values`) @@ -510,21 +512,27 @@ impl DataGenerator { .collect::>(); // pick from the 100 strings randomly - (0..INPUT_SIZE) + let mut input = (0..INPUT_SIZE) .map(|_| { let idx = 
self.rng.gen_range(0..strings.len()); let s = Arc::clone(&strings[idx]); Some(s) }) - .collect::>() + .collect::>(); + + input.sort_unstable(); + input } - /// Create values of high cardinality (~ no duplicates) utf8 values + /// Create sorted values of high cardinality (~ no duplicates) utf8 values fn utf8_high_cardinality_values(&mut self) -> Vec> { // make random strings - (0..INPUT_SIZE) + let mut input = (0..INPUT_SIZE) .map(|_| Some(self.random_string())) - .collect::>() + .collect::>(); + + input.sort_unstable(); + input } fn random_string(&mut self) -> String { @@ -537,56 +545,36 @@ impl DataGenerator { } } -/// Splits the `input_batch` randomly into `NUM_STREAMS` approximately evenly sorted streams -fn split_batch(input_batch: RecordBatch) -> Vec> { +/// Splits the `input` tuples randomly into batches of `BATCH_SIZE` distributed across +/// `NUM_STREAMS` partitions, preserving any ordering +/// +/// `f` is function that takes a list of tuples and produces a [`RecordBatch`] +fn split_tuples(input: Vec, f: F) -> PartitionedBatches +where + F: Fn(Vec) -> RecordBatch, +{ // figure out which inputs go where let mut rng = StdRng::seed_from_u64(1337); - // randomly assign rows to streams - let stream_assignments = (0..input_batch.num_rows()) - .map(|_| rng.gen_range(0..NUM_STREAMS)) - .collect(); - - // split the inputs into streams - (0..NUM_STREAMS) - .map(|stream| { - // make a "stream" of 1 record batch - vec![take_columns(&input_batch, &stream_assignments, stream)] - }) - .collect::>() -} - -/// returns a record batch that contains all there values where -/// stream_assignment[i] = stream (aka this is the equivalent of -/// calling take(indicies) where indicies[i] == stream_index) -fn take_columns( - input_batch: &RecordBatch, - stream_assignments: &UInt64Array, - stream: u64, -) -> RecordBatch { - // find just the indicies needed from record batches to extract - let stream_indices: UInt64Array = stream_assignments - .iter() - .enumerate() - .filter_map(|(idx, stream_idx)| { - if stream_idx.unwrap() == stream { - Some(idx as u64) - } else { - None + let mut outputs: Vec>> = (0..NUM_STREAMS).map(|_| Vec::new()).collect(); + + for i in input { + let stream_idx = rng.gen_range(0..NUM_STREAMS); + let stream = &mut outputs[stream_idx]; + match stream.last_mut() { + Some(x) if x.len() < BATCH_SIZE => x.push(i), + _ => { + let mut v = Vec::with_capacity(BATCH_SIZE); + v.push(i); + stream.push(v) } - }) - .collect(); - - let options = Some(TakeOptions { check_bounds: true }); - - // now, get the columns from each array - let new_columns = input_batch - .columns() - .iter() - .map(|array| compute::take(array, &stream_indices, options.clone()).unwrap()) - .collect(); + } + } - RecordBatch::try_new(input_batch.schema(), new_columns).unwrap() + outputs + .into_iter() + .map(|stream| stream.into_iter().map(&f).collect()) + .collect() } criterion_group!(benches, criterion_benchmark); From 4eed69cacb0f107f9a6ccb07e18f23f35b119da7 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 6 Apr 2023 17:24:47 -0400 Subject: [PATCH 4/5] fix benchmark --- datafusion/core/benches/sort.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/core/benches/sort.rs b/datafusion/core/benches/sort.rs index 907392bc3e341..1f06cce242ee4 100644 --- a/datafusion/core/benches/sort.rs +++ b/datafusion/core/benches/sort.rs @@ -188,8 +188,8 @@ impl BenchCase { let sort = make_sort_exprs(schema.as_ref()); let exec = MemoryExec::try_new(partitions, schema, None).unwrap(); - let exec = - 
SortExec::new_with_partitioning(sort.clone(), Arc::new(exec), true, None); + let exec = SortExec::new(sort.clone(), Arc::new(exec), None) + .with_preserve_partitioning(true); let plan = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec))); Self { @@ -211,7 +211,7 @@ impl BenchCase { let exec = MemoryExec::try_new(partitions, schema, None).unwrap(); let exec = Arc::new(CoalescePartitionsExec::new(Arc::new(exec))); - let plan = Arc::new(SortExec::try_new(sort, exec, None).unwrap()); + let plan = Arc::new(SortExec::new(sort, exec, None)); Self { runtime, @@ -231,7 +231,8 @@ impl BenchCase { let sort = make_sort_exprs(schema.as_ref()); let exec = MemoryExec::try_new(partitions, schema, None).unwrap(); - let exec = SortExec::new_with_partitioning(sort, Arc::new(exec), true, None); + let exec = + SortExec::new(sort, Arc::new(exec), None).with_preserve_partitioning(true); let plan = Arc::new(CoalescePartitionsExec::new(Arc::new(exec))); Self { From 8c5bee4f123c50ffbcad99ce790f5282bbbc2c89 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 6 Apr 2023 17:32:23 -0400 Subject: [PATCH 5/5] Add with_fetch --- benchmarks/src/bin/parquet.rs | 2 +- datafusion/core/benches/sort.rs | 9 +-- .../global_sort_selection.rs | 7 +- .../src/physical_optimizer/repartition.rs | 2 +- .../physical_optimizer/sort_enforcement.rs | 13 ++-- .../core/src/physical_optimizer/utils.rs | 2 +- datafusion/core/src/physical_plan/common.rs | 2 +- datafusion/core/src/physical_plan/planner.rs | 4 +- .../core/src/physical_plan/sorts/sort.rs | 76 ++++++++++--------- .../sorts/sort_preserving_merge.rs | 7 +- datafusion/core/tests/order_spill_fuzz.rs | 2 +- datafusion/proto/src/physical_plan/mod.rs | 7 +- 12 files changed, 67 insertions(+), 66 deletions(-) diff --git a/benchmarks/src/bin/parquet.rs b/benchmarks/src/bin/parquet.rs index 9731ee012f3ff..589f967a6df09 100644 --- a/benchmarks/src/bin/parquet.rs +++ b/benchmarks/src/bin/parquet.rs @@ -325,7 +325,7 @@ async fn exec_sort( ) -> Result<(usize, std::time::Duration)> { let start = Instant::now(); let scan = test_file.create_scan(None).await?; - let exec = Arc::new(SortExec::new(expr.to_owned(), scan, None)); + let exec = Arc::new(SortExec::new(expr.to_owned(), scan)); let task_ctx = ctx.task_ctx(); let result = collect(exec, task_ctx).await?; let elapsed = start.elapsed(); diff --git a/datafusion/core/benches/sort.rs b/datafusion/core/benches/sort.rs index 1f06cce242ee4..4045702d6308e 100644 --- a/datafusion/core/benches/sort.rs +++ b/datafusion/core/benches/sort.rs @@ -188,8 +188,8 @@ impl BenchCase { let sort = make_sort_exprs(schema.as_ref()); let exec = MemoryExec::try_new(partitions, schema, None).unwrap(); - let exec = SortExec::new(sort.clone(), Arc::new(exec), None) - .with_preserve_partitioning(true); + let exec = + SortExec::new(sort.clone(), Arc::new(exec)).with_preserve_partitioning(true); let plan = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec))); Self { @@ -211,7 +211,7 @@ impl BenchCase { let exec = MemoryExec::try_new(partitions, schema, None).unwrap(); let exec = Arc::new(CoalescePartitionsExec::new(Arc::new(exec))); - let plan = Arc::new(SortExec::new(sort, exec, None)); + let plan = Arc::new(SortExec::new(sort, exec)); Self { runtime, @@ -231,8 +231,7 @@ impl BenchCase { let sort = make_sort_exprs(schema.as_ref()); let exec = MemoryExec::try_new(partitions, schema, None).unwrap(); - let exec = - SortExec::new(sort, Arc::new(exec), None).with_preserve_partitioning(true); + let exec = SortExec::new(sort, 
Arc::new(exec)).with_preserve_partitioning(true); let plan = Arc::new(CoalescePartitionsExec::new(Arc::new(exec))); Self { diff --git a/datafusion/core/src/physical_optimizer/global_sort_selection.rs b/datafusion/core/src/physical_optimizer/global_sort_selection.rs index 3d77897d4a0af..9466297d24d00 100644 --- a/datafusion/core/src/physical_optimizer/global_sort_selection.rs +++ b/datafusion/core/src/physical_optimizer/global_sort_selection.rs @@ -62,9 +62,10 @@ impl PhysicalOptimizerRule for GlobalSortSelection { { let sort = SortExec::new( sort_exec.expr().to_vec(), - sort_exec.input().clone(), - sort_exec.fetch(), - ).with_preserve_partitioning(true); + sort_exec.input().clone() + ) + .with_fetch(sort_exec.fetch()) + .with_preserve_partitioning(true); let global_sort: Arc = Arc::new(SortPreservingMergeExec::new( sort_exec.expr().to_vec(), diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index a56e49ec91958..63fd2eb75529b 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -462,7 +462,7 @@ mod tests { expr: col("c1", &schema()).unwrap(), options: SortOptions::default(), }]; - let new_sort = SortExec::new(sort_exprs, input, None) + let new_sort = SortExec::new(sort_exprs, input) .with_preserve_partitioning(preserve_partitioning); Arc::new(new_sort) } diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 571f3f846b0fb..ebede0c98a3d7 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -1211,7 +1211,7 @@ mod tests { ]; let repartition_exec = repartition_exec(spm); let sort2 = Arc::new( - SortExec::new(sort_exprs.clone(), repartition_exec, None) + SortExec::new(sort_exprs.clone(), repartition_exec) .with_preserve_partitioning(true), ) as _; let spm2 = sort_preserving_merge_exec(sort_exprs, sort2); @@ -1252,8 +1252,7 @@ mod tests { let sort_exprs = vec![sort_expr("non_nullable_col", &schema)]; // let sort = sort_exec(sort_exprs.clone(), union); let sort = Arc::new( - SortExec::new(sort_exprs.clone(), union, None) - .with_preserve_partitioning(true), + SortExec::new(sort_exprs.clone(), union).with_preserve_partitioning(true), ) as _; let spm = sort_preserving_merge_exec(sort_exprs, sort); @@ -2260,8 +2259,8 @@ mod tests { let window = bounded_window_exec("nullable_col", sort_exprs.clone(), memory_exec); let repartition = repartition_exec(window); - let orig_plan = Arc::new(SortExec::new(sort_exprs, repartition, None)) - as Arc; + let orig_plan = + Arc::new(SortExec::new(sort_exprs, repartition)) as Arc; let mut plan = orig_plan.clone(); let rules = vec![ @@ -2298,7 +2297,7 @@ mod tests { let sort_exprs = vec![sort_expr("nullable_col", &schema)]; // Add local sort let sort = Arc::new( - SortExec::new(sort_exprs.clone(), repartition, None) + SortExec::new(sort_exprs.clone(), repartition) .with_preserve_partitioning(true), ) as _; let spm = sort_preserving_merge_exec(sort_exprs.clone(), sort); @@ -2352,7 +2351,7 @@ mod tests { input: Arc, ) -> Arc { let sort_exprs = sort_exprs.into_iter().collect(); - Arc::new(SortExec::new(sort_exprs, input, None)) + Arc::new(SortExec::new(sort_exprs, input)) } fn sort_preserving_merge_exec( diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs index d5f8c6e7968b8..06bef0fbda53e 100644 
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -66,7 +66,7 @@ pub fn add_sort_above(
     if !ordering_satisfy(node.output_ordering(), Some(&sort_expr), || {
         node.equivalence_properties()
     }) {
-        let new_sort = SortExec::new(sort_expr, node.clone(), None);
+        let new_sort = SortExec::new(sort_expr, node.clone());
 
         *node = Arc::new(if node.output_partitioning().partition_count() > 1 {
             new_sort.with_preserve_partitioning(true)
diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index cda31e1891634..42cd8fada96d6 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -485,7 +485,7 @@ mod tests {
             options: SortOptions::default(),
         }];
         let memory_exec = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) as _;
-        let sort_exec = Arc::new(SortExec::new(sort_expr.clone(), memory_exec, None))
+        let sort_exec = Arc::new(SortExec::new(sort_expr.clone(), memory_exec))
             as Arc<dyn ExecutionPlan>;
         let memory_exec2 = Arc::new(MemoryExec::try_new(&[], schema, None)?) as _;
         // memory_exec2 doesn't have output ordering
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 6dd82043bdfbf..01efa2f7964cf 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -821,7 +821,9 @@ impl DefaultPhysicalPlanner {
                         )),
                     })
                     .collect::<Result<Vec<_>>>()?;
-                Ok(Arc::new(SortExec::new(sort_expr, physical_input, *fetch)))
+                let new_sort = SortExec::new(sort_expr, physical_input)
+                    .with_fetch(*fetch);
+                Ok(Arc::new(new_sort))
             }
             LogicalPlan::Join(Join {
                 left,
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 1d5a56fa909d1..62271c587025d 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -640,28 +640,24 @@ pub struct SortExec {
 impl SortExec {
     /// Create a new sort execution plan
-    #[deprecated(since = "22.0.0", note = "please use `new`")]
+    #[deprecated(since = "22.0.0", note = "use `new` and `with_fetch`")]
     pub fn try_new(
         expr: Vec<PhysicalSortExpr>,
         input: Arc<dyn ExecutionPlan>,
         fetch: Option<usize>,
     ) -> Result<Self> {
-        Ok(Self::new(expr, input, fetch))
+        Ok(Self::new(expr, input).with_fetch(fetch))
     }
 
     /// Create a new sort execution plan that produces a single,
     /// sorted output partition.
-    pub fn new(
-        expr: Vec<PhysicalSortExpr>,
-        input: Arc<dyn ExecutionPlan>,
-        fetch: Option<usize>,
-    ) -> Self {
+    pub fn new(expr: Vec<PhysicalSortExpr>, input: Arc<dyn ExecutionPlan>) -> Self {
         Self {
             expr,
             input,
             metrics_set: CompositeMetricsSet::new(),
             preserve_partitioning: false,
-            fetch,
+            fetch: None,
         }
     }
 
@@ -669,15 +665,17 @@ impl SortExec {
     /// the partitioning of the input plan
     #[deprecated(
         since = "22.0.0",
-        note = "please use `new` and `with_preserve_partioning` instead"
+        note = "use `new`, `with_fetch` and `with_preserve_partitioning` instead"
     )]
     pub fn new_with_partitioning(
         expr: Vec<PhysicalSortExpr>,
         input: Arc<dyn ExecutionPlan>,
         preserve_partitioning: bool,
         fetch: Option<usize>,
     ) -> Self {
-        Self::new(expr, input, fetch).with_preserve_partitioning(preserve_partitioning)
+        Self::new(expr, input)
+            .with_fetch(fetch)
+            .with_preserve_partitioning(preserve_partitioning)
     }
 
     /// Whether this `SortExec` preserves partitioning of the children
@@ -697,6 +695,12 @@ impl SortExec {
         self
     }
 
+    /// Set a limit (`fetch`) on the number of rows this sort returns
+    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
+        self.fetch = fetch;
+        self
+    }
+
     /// Input schema
     pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
         &self.input
@@ -774,7 +778,8 @@ impl ExecutionPlan for SortExec {
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let new_sort = SortExec::new(self.expr.clone(), children[0].clone(), self.fetch)
+        let new_sort = SortExec::new(self.expr.clone(), children[0].clone())
+            .with_fetch(self.fetch)
             .with_preserve_partitioning(self.preserve_partitioning);
 
         Ok(Arc::new(new_sort))
@@ -981,7 +986,6 @@ mod tests {
                 },
             ],
             Arc::new(CoalescePartitionsExec::new(csv)),
-            None,
         ));
 
         let result = collect(sort_exec, task_ctx).await?;
@@ -1041,7 +1045,6 @@ mod tests {
                 },
             ],
             Arc::new(CoalescePartitionsExec::new(csv)),
-            None,
         ));
 
         let task_ctx = session_ctx.task_ctx();
@@ -1106,27 +1109,29 @@ mod tests {
         let csv = test::scan_partitioned_csv(partitions)?;
         let schema = csv.schema();
 
-        let sort_exec = Arc::new(SortExec::new(
-            vec![
-                // c1 string column
-                PhysicalSortExpr {
-                    expr: col("c1", &schema)?,
-                    options: SortOptions::default(),
-                },
-                // c2 uin32 column
-                PhysicalSortExpr {
-                    expr: col("c2", &schema)?,
-                    options: SortOptions::default(),
-                },
-                // c7 uin8 column
-                PhysicalSortExpr {
-                    expr: col("c7", &schema)?,
-                    options: SortOptions::default(),
-                },
-            ],
-            Arc::new(CoalescePartitionsExec::new(csv)),
-            fetch,
-        ));
+        let sort_exec = Arc::new(
+            SortExec::new(
+                vec![
+                    // c1 string column
+                    PhysicalSortExpr {
+                        expr: col("c1", &schema)?,
+                        options: SortOptions::default(),
+                    },
+                    // c2 uint32 column
+                    PhysicalSortExpr {
+                        expr: col("c2", &schema)?,
+                        options: SortOptions::default(),
+                    },
+                    // c7 uint8 column
+                    PhysicalSortExpr {
+                        expr: col("c7", &schema)?,
+                        options: SortOptions::default(),
+                    },
+                ],
+                Arc::new(CoalescePartitionsExec::new(csv)),
+            )
+            .with_fetch(fetch),
+        );
 
         let task_ctx = session_ctx.task_ctx();
         let result = collect(sort_exec.clone(), task_ctx).await?;
@@ -1170,7 +1175,6 @@ mod tests {
                 options: SortOptions::default(),
             }],
             input,
-            None,
         ));
 
         let result: Vec<RecordBatch> = collect(sort_exec, task_ctx).await?;
@@ -1244,7 +1248,6 @@ mod tests {
                 },
             ],
             Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None)?),
-            None,
         ));
 
         assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
@@ -1312,7 +1315,6 @@ mod tests {
                 options: SortOptions::default(),
             }],
             blocking_exec,
-            None,
         ));
 
         let fut = collect(sort_exec, task_ctx);
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 6a649e98534fe..a0aa29f77079a 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -980,9 +980,8 @@ mod tests {
         sort: Vec<PhysicalSortExpr>,
         context: Arc<TaskContext>,
     ) -> RecordBatch {
-        let sort_exec = Arc::new(
-            SortExec::new(sort.clone(), input, None).with_preserve_partitioning(true),
-        );
+        let sort_exec =
+            Arc::new(SortExec::new(sort.clone(), input).with_preserve_partitioning(true));
         sorted_merge(sort_exec, sort, context).await
     }
 
@@ -992,7 +991,7 @@ mod tests {
         context: Arc<TaskContext>,
     ) -> RecordBatch {
         let merge = Arc::new(CoalescePartitionsExec::new(src));
-        let sort_exec = Arc::new(SortExec::new(sort, merge, None));
+        let sort_exec = Arc::new(SortExec::new(sort, merge));
         let mut result = collect(sort_exec, context).await.unwrap();
         assert_eq!(result.len(), 1);
         result.remove(0)
diff --git a/datafusion/core/tests/order_spill_fuzz.rs b/datafusion/core/tests/order_spill_fuzz.rs
index b116a385d06c4..fdf69ec0d1c84 100644
--- a/datafusion/core/tests/order_spill_fuzz.rs
+++ b/datafusion/core/tests/order_spill_fuzz.rs
@@ -74,7 +74,7 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) {
         }];
         let exec = MemoryExec::try_new(&input, schema, None).unwrap();
-        let sort = Arc::new(SortExec::new(sort, Arc::new(exec), None));
+        let sort = Arc::new(SortExec::new(sort, Arc::new(exec)));
 
         let runtime_config = RuntimeConfig::new()
             .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)));
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 999534ed5cba2..5bf82e423c684 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -613,7 +613,8 @@ impl AsExecutionPlan for PhysicalPlanNode {
                 } else {
                     Some(sort.fetch as usize)
                 };
-                let new_sort = SortExec::new(exprs, input, fetch)
+                let new_sort = SortExec::new(exprs, input)
+                    .with_fetch(fetch)
                     .with_preserve_partitioning(sort.preserve_partitioning);
 
                 Ok(Arc::new(new_sort))
@@ -1439,7 +1440,6 @@ mod roundtrip_tests {
         roundtrip_test(Arc::new(SortExec::new(
             sort_exprs,
             Arc::new(EmptyExec::new(false, schema)),
-            None,
         )))
     }
 
@@ -1468,11 +1468,10 @@ mod roundtrip_tests {
         roundtrip_test(Arc::new(SortExec::new(
             sort_exprs.clone(),
             Arc::new(EmptyExec::new(false, schema.clone())),
-            None,
        )))?;
 
         roundtrip_test(Arc::new(
-            SortExec::new(sort_exprs, Arc::new(EmptyExec::new(false, schema)), None)
+            SortExec::new(sort_exprs, Arc::new(EmptyExec::new(false, schema)))
                 .with_preserve_partitioning(true),
         ))
     }
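
Note for reviewers: taken together, this series replaces SortExec's multi-argument
constructors with an infallible `new` plus builder-style setters. A minimal sketch of
a call site after this change; the helper name `top_k_sort`, the column name "a", and
the fetch value 10 are illustrative only, and the import paths are given per the
datafusion 22 crate layout rather than taken from this patch:

    use std::sync::Arc;

    use datafusion::arrow::compute::SortOptions;
    use datafusion::physical_expr::{expressions::col, PhysicalSortExpr};
    use datafusion::physical_plan::{sorts::sort::SortExec, ExecutionPlan};

    /// Sort `input` by its "a" column, returning only the first 10 rows
    /// and sorting each input partition independently.
    fn top_k_sort(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        let sort_exprs = vec![PhysicalSortExpr {
            expr: col("a", &input.schema()).unwrap(),
            options: SortOptions::default(),
        }];

        // Before this series (both now deprecated):
        //   SortExec::try_new(sort_exprs, input, Some(10))?
        //   SortExec::new_with_partitioning(sort_exprs, input, true, Some(10))
        // After: one constructor, with optional behavior layered on.
        let sort = SortExec::new(sort_exprs, input)
            .with_fetch(Some(10))
            .with_preserve_partitioning(true);

        Arc::new(sort)
    }

The builder style keeps `new` infallible and lets future options extend the API the
same way `with_fetch` does here, without growing every constructor signature; that is
why `try_new` and `new_with_partitioning` are deprecated rather than amended.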