From 6562cc744114866e93ce8029a0954d2ab6e5b5db Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 4 Aug 2025 00:06:02 -0500 Subject: [PATCH 1/8] Add ExecutionPlan::reset_state Co-authored-by: Robert Ream --- .../physical-plan/src/execution_plan.rs | 25 +++++ .../physical-plan/src/recursive_query.rs | 5 +- datafusion/physical-plan/src/sorts/sort.rs | 91 ++++++++++++++----- datafusion/sqllogictest/test_files/cte.slt | 25 ++++- 4 files changed, 120 insertions(+), 26 deletions(-) diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 730d4962014dd..d4e0fe82bd127 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -195,6 +195,31 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { children: Vec>, ) -> Result>; + /// Reset any internal state within this [`ExecutionPlan`]. + /// + /// This method is called when an [`ExecutionPlan`] needs to be re-executed, + /// such as in recursive queries. Unlike [`ExecutionPlan::with_new_children`], this method + /// ensures that any stateful components (e.g., [`DynamicFilterPhysicalExpr`]) + /// are reset to their initial state. + /// + /// The default implementation simply calls [`ExecutionPlan::with_new_children`] with the existing children, + /// effectively creating a new instance of the [`ExecutionPlan`] with the same children but without + /// necessarily resetting any internal state. Implementations that require resetting of some + /// internal state should override this method to provide the necessary logic. + /// + /// This method should *not* reset state recursively for children, as it is expected that + /// it will be called from within a walk of the execution plan tree so that it will be called on each child later + /// or was already called on each child. + /// + /// Note to implementers: unlike [`ExecutionPlan::with_new_children`] this method does not accept new children as an argument, + /// thus it is expected that any cached plan properties will remain valid after the reset. + /// + /// [`DynamicFilterPhysicalExpr`]: datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr + fn reset_state(self: Arc) -> Result> { + let children = self.children().into_iter().cloned().collect(); + self.with_new_children(children) + } + /// If supported, attempt to increase the partitioning of this `ExecutionPlan` to /// produce `target_partitions` partitions. /// diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 99b460dfcfdcd..700a9076fecf0 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -372,7 +372,7 @@ fn assign_work_table( } /// Some plans will change their internal states after execution, making them unable to be executed again. -/// This function uses `ExecutionPlan::with_new_children` to fork a new plan with initial states. +/// This function uses [`ExecutionPlan::reset_state`] to reset any internal state within the plan. /// /// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan. /// However, if the data of the left table is derived from the work table, it will become outdated @@ -383,8 +383,7 @@ fn reset_plan_states(plan: Arc) -> Result() { Ok(Transformed::no(plan)) } else { - let new_plan = Arc::clone(&plan) - .with_new_children(plan.children().into_iter().cloned().collect())?; + let new_plan = Arc::clone(&plan).reset_state()?; Ok(Transformed::yes(new_plan)) } }) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 0b7d3977d2707..3eed98781f750 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -905,6 +905,29 @@ impl SortExec { self } + /// Add or reset `self.filter` to a new `DynamicFilterPhysicalExpr`. + fn create_filter(&self) -> Arc { + let children = self + .expr + .iter() + .map(|sort_expr| Arc::clone(&sort_expr.expr)) + .collect::>(); + Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true))) + } + + fn cloned(&self) -> Self { + SortExec { + input: Arc::clone(&self.input), + expr: self.expr.clone(), + metrics_set: self.metrics_set.clone(), + preserve_partitioning: self.preserve_partitioning, + common_sort_prefix: self.common_sort_prefix.clone(), + fetch: self.fetch, + cache: self.cache.clone(), + filter: self.filter.clone(), + } + } + /// Modify how many rows to include in the result /// /// If None, then all rows will be returned, in sorted order. @@ -926,25 +949,13 @@ impl SortExec { } let filter = fetch.is_some().then(|| { // If we already have a filter, keep it. Otherwise, create a new one. - self.filter.clone().unwrap_or_else(|| { - let children = self - .expr - .iter() - .map(|sort_expr| Arc::clone(&sort_expr.expr)) - .collect::>(); - Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true))) - }) + self.filter.clone().unwrap_or_else(|| self.create_filter()) }); - SortExec { - input: Arc::clone(&self.input), - expr: self.expr.clone(), - metrics_set: self.metrics_set.clone(), - preserve_partitioning: self.preserve_partitioning, - common_sort_prefix: self.common_sort_prefix.clone(), - fetch, - cache, - filter, - } + let mut new_sort = self.cloned(); + new_sort.fetch = fetch; + new_sort.cache = cache; + new_sort.filter = filter; + new_sort } /// Input schema @@ -1116,10 +1127,46 @@ impl ExecutionPlan for SortExec { self: Arc, children: Vec>, ) -> Result> { - let mut new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0])) - .with_fetch(self.fetch) - .with_preserve_partitioning(self.preserve_partitioning); - new_sort.filter = self.filter.clone(); + let mut new_sort = self.cloned(); + assert!( + children.len() == 1, + "SortExec should have exactly one child" + ); + new_sort.input = Arc::clone(&children[0]); + // Recompute the properties based on the new input since they may have changed. + let (cache, sort_prefix) = Self::compute_properties( + &new_sort.input, + new_sort.expr.clone(), + new_sort.preserve_partitioning, + ) + .expect(concat!( + "Safety: we had already been calling `compute_properties(...).unwrap()` in `new()` ", + "and it seems to be okay", + "\n", + "We assumed that doing the same thing here directly instead ", + "of calling `new()` (as we did before this commit) is also okay but it's possible that ", + "implementations have drifted and this is no longer safe even if `new()` still works, ", + "for example if `new()` now does something different than just calling `compute_properties(...).unwrap()`", + "\n", + "This is clearly a bug, please report it!" + )); + new_sort.cache = cache; + new_sort.common_sort_prefix = sort_prefix; + + Ok(Arc::new(new_sort)) + } + + fn reset_state(self: Arc) -> Result> { + let children = self.children().into_iter().cloned().collect(); + let new_sort = self.with_new_children(children)?; + let mut new_sort = new_sort + .as_any() + .downcast_ref::() + .expect("cloned 1 lines above this line, we know the type") + .clone(); + // Our dynamic filter and execution metrics are the state we need to reset. + new_sort.filter = Some(new_sort.create_filter()); + new_sort.metrics_set = ExecutionPlanMetricsSet::new(); Ok(Arc::new(new_sort)) } diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt index 32320a06f4fb0..275d65c7e77c9 100644 --- a/datafusion/sqllogictest/test_files/cte.slt +++ b/datafusion/sqllogictest/test_files/cte.slt @@ -996,6 +996,29 @@ physical_plan 08)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 09)------------WorkTableExec: name=numbers +# Test for issue #16998: SortExec shares DynamicFilterPhysicalExpr across multiple executions +# This should return 5 rows but currently returns only 2 due to the bug +query II +with recursive r as ( + select 0 as k, 0 as v + union all + ( + select * + from r + order by v + limit 1 + ) +) +select * +from r +limit 5; +---- +0 0 +0 0 +0 0 +0 0 +0 0 + statement count 0 set datafusion.execution.enable_recursive_ctes = false; @@ -1004,4 +1027,4 @@ explain WITH RECURSIVE numbers AS ( select 1 as n UNION ALL select n + 1 FROM numbers WHERE N < 10 -) select * from numbers; +) select * from numbers; \ No newline at end of file From c4838de91d5b16346ecf091ac830a179b871541f Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 4 Aug 2025 07:44:53 -0500 Subject: [PATCH 2/8] Update datafusion/sqllogictest/test_files/cte.slt --- datafusion/sqllogictest/test_files/cte.slt | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt index 275d65c7e77c9..7938ff6bf8c55 100644 --- a/datafusion/sqllogictest/test_files/cte.slt +++ b/datafusion/sqllogictest/test_files/cte.slt @@ -997,7 +997,6 @@ physical_plan 09)------------WorkTableExec: name=numbers # Test for issue #16998: SortExec shares DynamicFilterPhysicalExpr across multiple executions -# This should return 5 rows but currently returns only 2 due to the bug query II with recursive r as ( select 0 as k, 0 as v From 939171309248f035e6450337111a7ca6e9b5affc Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 4 Aug 2025 16:51:48 -0500 Subject: [PATCH 3/8] Add reference --- .../physical-expr/src/expressions/dynamic_filters.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index a9a4e23233b7c..12d83e4e79466 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -32,6 +32,10 @@ use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash}; /// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it. +/// +/// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also +/// implement `ExecutionPlan::reset_state` to remain compatible with recursive queries and other situations where +/// the same `ExecutionPlan` is reused with different data. #[derive(Debug)] pub struct DynamicFilterPhysicalExpr { /// The original children of this PhysicalExpr, if any. @@ -120,6 +124,10 @@ impl DynamicFilterPhysicalExpr { /// Generally the important bit is that the *leaf children that reference columns /// do not change* since those will be used to determine what columns need to read or projected /// when evaluating the expression. + /// + /// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also + /// implement `ExecutionPlan::reset_state` to remain compatible with recursive queries and other situations where + /// the same `ExecutionPlan` is reused with different data. /// /// [`collect_columns`]: crate::utils::collect_columns pub fn new( From 5425c9bb977122439666f409a35432b59c073ca6 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 4 Aug 2025 16:55:11 -0500 Subject: [PATCH 4/8] fmt --- datafusion/physical-expr/src/expressions/dynamic_filters.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 12d83e4e79466..d4b3180a6fc67 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -32,7 +32,7 @@ use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash}; /// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it. -/// +/// /// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also /// implement `ExecutionPlan::reset_state` to remain compatible with recursive queries and other situations where /// the same `ExecutionPlan` is reused with different data. @@ -124,7 +124,7 @@ impl DynamicFilterPhysicalExpr { /// Generally the important bit is that the *leaf children that reference columns /// do not change* since those will be used to determine what columns need to read or projected /// when evaluating the expression. - /// + /// /// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also /// implement `ExecutionPlan::reset_state` to remain compatible with recursive queries and other situations where /// the same `ExecutionPlan` is reused with different data. From f50d737ea7042eaedd2fe9a2de4fb247b18cc9ce Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 5 Aug 2025 11:58:31 -0500 Subject: [PATCH 5/8] add to upgrade guide --- docs/source/library-user-guide/upgrading.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index eece034ce33e9..62a559ce85a9e 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -100,6 +100,16 @@ This version of DataFusion upgrades the underlying Apache Arrow implementation to version `56.0.0`. See the [release notes](https://github.com/apache/arrow-rs/releases/tag/56.0.0) for more details. +### Added `ExecutionPlan::reset_state` + +In order to fix a bug in DataFusion `49.0.0` where dynamic filters (currently only generated in the precense of a query such as `ORDER BY ... LIMIT ...`) +produced incorrect results in recursive queries, a new method `reset_state` has been added to the `ExecutionPlan` trait. + +Any `ExecutionPlan` that needs to maintain internal state or references to other nodes in the execution plan tree should implement this method to reset that state. +See [#17028] for more details and an example implementation for `SortExec`. + +[#17028]: https://github.com/apache/datafusion/pull/17028 + ## DataFusion `49.0.0` ### `MSRV` updated to 1.85.1 From 5323b11d0518f5e4cd37c4b25fe759e0740d86d9 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 5 Aug 2025 12:38:20 -0500 Subject: [PATCH 6/8] add explain plan, implement in more plans --- .../physical-plan/src/joins/cross_join.rs | 12 ++++++++++ .../physical-plan/src/joins/hash_join.rs | 20 ++++++++++++++++ datafusion/physical-plan/src/sorts/sort.rs | 15 ++---------- datafusion/sqllogictest/test_files/cte.slt | 24 +++++++++++++++++++ 4 files changed, 58 insertions(+), 13 deletions(-) diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index a41e668ab4dab..b191cd4e80598 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -270,6 +270,18 @@ impl ExecutionPlan for CrossJoinExec { ))) } + fn reset_state(self: Arc) -> Result> { + let new_exec = CrossJoinExec { + left: Arc::clone(&self.left), + right: Arc::clone(&self.right), + schema: Arc::clone(&self.schema), + left_fut: Default::default(), // reset the build side! + metrics: ExecutionPlanMetricsSet::default(), + cache: self.cache.clone(), + }; + Ok(Arc::new(new_exec)) + } + fn required_input_distribution(&self) -> Vec { vec![ Distribution::SinglePartition, diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 0a26039462a42..6058b7974e923 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -774,6 +774,26 @@ impl ExecutionPlan for HashJoinExec { )?)) } + fn reset_state(self: Arc) -> Result> { + // Reset the left_fut to allow re-execution + Ok(Arc::new(HashJoinExec { + left: Arc::clone(&self.left), + right: Arc::clone(&self.right), + on: self.on.clone(), + filter: self.filter.clone(), + join_type: self.join_type, + join_schema: Arc::clone(&self.join_schema), + left_fut: OnceAsync::default(), + random_state: self.random_state.clone(), + mode: self.mode, + metrics: ExecutionPlanMetricsSet::new(), + projection: self.projection.clone(), + column_indices: self.column_indices.clone(), + null_equality: self.null_equality, + cache: self.cache.clone(), + })) + } + fn execute( &self, partition: usize, diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 3eed98781f750..dc2a5640f40b9 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1133,23 +1133,12 @@ impl ExecutionPlan for SortExec { "SortExec should have exactly one child" ); new_sort.input = Arc::clone(&children[0]); - // Recompute the properties based on the new input since they may have changed. + // Recompute the properties based on the new input since they may have changed let (cache, sort_prefix) = Self::compute_properties( &new_sort.input, new_sort.expr.clone(), new_sort.preserve_partitioning, - ) - .expect(concat!( - "Safety: we had already been calling `compute_properties(...).unwrap()` in `new()` ", - "and it seems to be okay", - "\n", - "We assumed that doing the same thing here directly instead ", - "of calling `new()` (as we did before this commit) is also okay but it's possible that ", - "implementations have drifted and this is no longer safe even if `new()` still works, ", - "for example if `new()` now does something different than just calling `compute_properties(...).unwrap()`", - "\n", - "This is clearly a bug, please report it!" - )); + )?; new_sort.cache = cache; new_sort.common_sort_prefix = sort_prefix; diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt index 7938ff6bf8c55..bf668f5729cf4 100644 --- a/datafusion/sqllogictest/test_files/cte.slt +++ b/datafusion/sqllogictest/test_files/cte.slt @@ -1018,6 +1018,30 @@ limit 5; 0 0 0 0 +query TT +explain analyze +with recursive r as ( + select 0 as k, 0 as v + union all + ( + select * + from r + order by v + limit 1 + ) +) +select * +from r +limit 5; +---- +Plan with Metrics +01)GlobalLimitExec: skip=0, fetch=5, metrics=[output_rows=5, elapsed_compute=33.167µs] +02)--RecursiveQueryExec: name=r, is_distinct=false, metrics=[output_rows=0, elapsed_compute=1ns] +03)----ProjectionExec: expr=[0 as k, 0 as v], metrics=[output_rows=1, elapsed_compute=12.042µs] +04)------PlaceholderRowExec, metrics=[] +05)----SortExec: TopK(fetch=1), expr=[v@1 ASC NULLS LAST], preserve_partitioning=[false], metrics=[] +06)------WorkTableExec: name=r, metrics=[] + statement count 0 set datafusion.execution.enable_recursive_ctes = false; From 9073b6a0b3e9aa0999d3e34072a03891608ca7a6 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 5 Aug 2025 12:41:18 -0500 Subject: [PATCH 7/8] fmt --- datafusion/physical-plan/src/joins/cross_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index b191cd4e80598..b8ea6330a1e2e 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -275,7 +275,7 @@ impl ExecutionPlan for CrossJoinExec { left: Arc::clone(&self.left), right: Arc::clone(&self.right), schema: Arc::clone(&self.schema), - left_fut: Default::default(), // reset the build side! + left_fut: Default::default(), // reset the build side! metrics: ExecutionPlanMetricsSet::default(), cache: self.cache.clone(), }; From 73b50536570cbf87f95bbebd4952b8017dfe25b6 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 5 Aug 2025 13:00:32 -0500 Subject: [PATCH 8/8] only explain --- datafusion/sqllogictest/test_files/cte.slt | 27 ++++++++++++++-------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt index bf668f5729cf4..5f8fd1a0b5efd 100644 --- a/datafusion/sqllogictest/test_files/cte.slt +++ b/datafusion/sqllogictest/test_files/cte.slt @@ -1019,7 +1019,7 @@ limit 5; 0 0 query TT -explain analyze +explain with recursive r as ( select 0 as k, 0 as v union all @@ -1034,13 +1034,22 @@ select * from r limit 5; ---- -Plan with Metrics -01)GlobalLimitExec: skip=0, fetch=5, metrics=[output_rows=5, elapsed_compute=33.167µs] -02)--RecursiveQueryExec: name=r, is_distinct=false, metrics=[output_rows=0, elapsed_compute=1ns] -03)----ProjectionExec: expr=[0 as k, 0 as v], metrics=[output_rows=1, elapsed_compute=12.042µs] -04)------PlaceholderRowExec, metrics=[] -05)----SortExec: TopK(fetch=1), expr=[v@1 ASC NULLS LAST], preserve_partitioning=[false], metrics=[] -06)------WorkTableExec: name=r, metrics=[] +logical_plan +01)SubqueryAlias: r +02)--Limit: skip=0, fetch=5 +03)----RecursiveQuery: is_distinct=false +04)------Projection: Int64(0) AS k, Int64(0) AS v +05)--------EmptyRelation +06)------Sort: r.v ASC NULLS LAST, fetch=1 +07)--------Projection: r.k, r.v +08)----------TableScan: r +physical_plan +01)GlobalLimitExec: skip=0, fetch=5 +02)--RecursiveQueryExec: name=r, is_distinct=false +03)----ProjectionExec: expr=[0 as k, 0 as v] +04)------PlaceholderRowExec +05)----SortExec: TopK(fetch=1), expr=[v@1 ASC NULLS LAST], preserve_partitioning=[false] +06)------WorkTableExec: name=r statement count 0 set datafusion.execution.enable_recursive_ctes = false; @@ -1050,4 +1059,4 @@ explain WITH RECURSIVE numbers AS ( select 1 as n UNION ALL select n + 1 FROM numbers WHERE N < 10 -) select * from numbers; \ No newline at end of file +) select * from numbers;