From 657a3a2f9706b88a8ddbd71ca9373a368dde5029 Mon Sep 17 00:00:00 2001 From: Daniel Date: Sun, 11 Jan 2026 15:18:57 -0800 Subject: [PATCH 1/4] Make ensure cooperative optimizer idempotent --- .../physical-optimizer/src/ensure_coop.rs | 201 ++++++++++++++++-- 1 file changed, 183 insertions(+), 18 deletions(-) diff --git a/datafusion/physical-optimizer/src/ensure_coop.rs b/datafusion/physical-optimizer/src/ensure_coop.rs index dfa97fc840333..b55647a056bf6 100644 --- a/datafusion/physical-optimizer/src/ensure_coop.rs +++ b/datafusion/physical-optimizer/src/ensure_coop.rs @@ -27,10 +27,10 @@ use crate::PhysicalOptimizerRule; use datafusion_common::Result; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; +use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::coop::CooperativeExec; -use datafusion_physical_plan::execution_plan::{EvaluationType, SchedulingType}; +use datafusion_physical_plan::execution_plan::{EvaluationType}; /// `EnsureCooperative` is a [`PhysicalOptimizerRule`] that inspects the physical plan for /// sub plans that do not participate in cooperative scheduling. The plan is subdivided into sub @@ -67,23 +67,46 @@ impl PhysicalOptimizerRule for EnsureCooperative { plan: Arc, _config: &ConfigOptions, ) -> Result> { - plan.transform_up(|plan| { - let is_leaf = plan.children().is_empty(); - let is_exchange = plan.properties().evaluation_type == EvaluationType::Eager; - if (is_leaf || is_exchange) - && plan.properties().scheduling_type != SchedulingType::Cooperative - { - // Wrap non-cooperative leaves or eager evaluation roots in a cooperative exec to - // ensure the plans they participate in are properly cooperative. - Ok(Transformed::new( - Arc::new(CooperativeExec::new(Arc::clone(&plan))), - true, - TreeNodeRecursion::Continue, - )) - } else { + use std::cell::Cell; + + // Track depth: 0 means not under any CooperativeExec + // Using Cell to allow interior mutability from multiple closures + let coop_depth = Cell::new(0usize); + + plan.transform_down_up( + // Down phase: Track when entering CooperativeExec subtrees + |plan| { + if plan.as_any().downcast_ref::().is_some() { + coop_depth.set(coop_depth.get() + 1); + } Ok(Transformed::no(plan)) - } - }) + }, + // Up phase: Wrap nodes with CooperativeExec if needed, then restore depth + |plan| { + let is_coop_node = + plan.as_any().downcast_ref::().is_some(); + let is_leaf = plan.children().is_empty(); + let is_exchange = + plan.properties().evaluation_type == EvaluationType::Eager; + + // Wrap if: + // 1. Node is a leaf or exchange point + // 2. Node is not already a CooperativeExec + // 3. Not under any CooperativeExec (depth == 0) + if (is_leaf || is_exchange) && !is_coop_node && coop_depth.get() == 0 { + // Note: We don't decrement depth here because this node + // wasn't a CooperativeExec before wrapping + return Ok(Transformed::yes(Arc::new(CooperativeExec::new(plan)))); + } + + // Restore depth when leaving a CooperativeExec node + if is_coop_node { + coop_depth.set(coop_depth.get() - 1); + } + + Ok(Transformed::no(plan)) + }, + ) .map(|t| t.data) } @@ -97,6 +120,7 @@ impl PhysicalOptimizerRule for EnsureCooperative { mod tests { use super::*; use datafusion_common::config::ConfigOptions; + use datafusion_physical_plan::execution_plan::EvaluationType; use datafusion_physical_plan::{displayable, test::scan_partitioned}; use insta::assert_snapshot; @@ -115,4 +139,145 @@ mod tests { DataSourceExec: partitions=1, partition_sizes=[1] "); } + + #[tokio::test] + async fn test_optimizer_is_idempotent() { + // Comprehensive idempotency test: verify f(f(...f(x))) = f(x) + // This test covers: + // 1. Multiple runs on unwrapped plan + // 2. Multiple runs on already-wrapped plan + // 3. No accumulation of CooperativeExec nodes + + let config = ConfigOptions::new(); + let rule = EnsureCooperative::new(); + + // Test 1: Start with unwrapped plan, run multiple times + let unwrapped_plan = scan_partitioned(1); + let mut current = unwrapped_plan; + let mut stable_result = String::new(); + + for run in 1..=5 { + current = rule.optimize(current, &config).unwrap(); + let display = displayable(current.as_ref()).indent(true).to_string(); + + if run == 1 { + stable_result = display.clone(); + assert_eq!(display.matches("CooperativeExec").count(), 1); + } else { + assert_eq!( + display, stable_result, + "Run {run} should match run 1 (idempotent)" + ); + assert_eq!( + display.matches("CooperativeExec").count(), + 1, + "Should always have exactly 1 CooperativeExec, not accumulate" + ); + } + } + + // Test 2: Start with already-wrapped plan, verify no double wrapping + let pre_wrapped = Arc::new(CooperativeExec::new(scan_partitioned(1))); + let result = rule.optimize(pre_wrapped, &config).unwrap(); + let display = displayable(result.as_ref()).indent(true).to_string(); + + assert_eq!( + display.matches("CooperativeExec").count(), + 1, + "Should not double-wrap already cooperative plans" + ); + assert_eq!( + display, stable_result, + "Pre-wrapped plan should produce same result as unwrapped after optimization" + ); + } + + #[tokio::test] + async fn test_selective_wrapping() { + // Test that wrapping is selective: only leaf/eager nodes, not intermediate nodes + // Also verify depth tracking prevents double wrapping in subtrees + use datafusion_physical_expr::expressions::lit; + use datafusion_physical_plan::filter::FilterExec; + + let config = ConfigOptions::new(); + let rule = EnsureCooperative::new(); + + // Case 1: Filter -> Scan (middle node should not be wrapped) + let scan = scan_partitioned(1); + let filter = Arc::new(FilterExec::try_new(lit(true), scan).unwrap()); + let optimized = rule.optimize(filter, &config).unwrap(); + let display = displayable(optimized.as_ref()).indent(true).to_string(); + + assert_eq!(display.matches("CooperativeExec").count(), 1); + assert!(display.contains("FilterExec")); + + // Case 2: Filter -> CoopExec -> Scan (depth tracking prevents double wrap) + let scan2 = scan_partitioned(1); + let wrapped_scan = Arc::new(CooperativeExec::new(scan2)); + let filter2 = Arc::new(FilterExec::try_new(lit(true), wrapped_scan).unwrap()); + let optimized2 = rule.optimize(filter2, &config).unwrap(); + let display2 = displayable(optimized2.as_ref()).indent(true).to_string(); + + assert_eq!(display2.matches("CooperativeExec").count(), 1); + } + + #[tokio::test] + async fn test_multiple_leaf_nodes() { + // When there are multiple leaf nodes, each should be wrapped separately + use datafusion_physical_plan::union::UnionExec; + + let scan1 = scan_partitioned(1); + let scan2 = scan_partitioned(1); + let union = UnionExec::try_new(vec![scan1, scan2]).unwrap(); + + let config = ConfigOptions::new(); + let optimized = EnsureCooperative::new() + .optimize(union as Arc, &config) + .unwrap(); + + let display = displayable(optimized.as_ref()).indent(true).to_string(); + + // Each leaf should have its own CooperativeExec + assert_eq!( + display.matches("CooperativeExec").count(), + 2, + "Each leaf node should be wrapped separately" + ); + assert_eq!( + display.matches("DataSourceExec").count(), + 2, + "Both data sources should be present" + ); + } + + #[tokio::test] + async fn test_eager_exchange_nodes() { + // Test eager evaluation nodes (exchange points like RepartitionExec) + // Should wrap both the eager node and its leaf child, and be idempotent + use datafusion_physical_plan::Partitioning; + use datafusion_physical_plan::repartition::RepartitionExec; + + let scan = scan_partitioned(1); + let repartition = Arc::new( + RepartitionExec::try_new(scan, Partitioning::RoundRobinBatch(4)).unwrap(), + ); + + assert_eq!( + repartition.properties().evaluation_type, + EvaluationType::Eager + ); + + let config = ConfigOptions::new(); + let rule = EnsureCooperative::new(); + + // First run: should wrap both eager node and leaf + let first = rule.optimize(repartition, &config).unwrap(); + let first_display = displayable(first.as_ref()).indent(true).to_string(); + assert_eq!(first_display.matches("CooperativeExec").count(), 2); + + // Idempotency check + let second = rule.optimize(Arc::clone(&first), &config).unwrap(); + let second_display = displayable(second.as_ref()).indent(true).to_string(); + assert_eq!(first_display, second_display); + } } From f43b23f7b9aa998ef67489b2294ff75ea38e53ce Mon Sep 17 00:00:00 2001 From: Daniel Date: Sun, 11 Jan 2026 16:05:19 -0800 Subject: [PATCH 2/4] update wrap condition when up & test --- .../physical-optimizer/src/ensure_coop.rs | 48 +++---------------- 1 file changed, 7 insertions(+), 41 deletions(-) diff --git a/datafusion/physical-optimizer/src/ensure_coop.rs b/datafusion/physical-optimizer/src/ensure_coop.rs index b55647a056bf6..c8e5a8c1050c4 100644 --- a/datafusion/physical-optimizer/src/ensure_coop.rs +++ b/datafusion/physical-optimizer/src/ensure_coop.rs @@ -30,7 +30,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::coop::CooperativeExec; -use datafusion_physical_plan::execution_plan::{EvaluationType}; +use datafusion_physical_plan::execution_plan::{EvaluationType, SchedulingType}; /// `EnsureCooperative` is a [`PhysicalOptimizerRule`] that inspects the physical plan for /// sub plans that do not participate in cooperative scheduling. The plan is subdivided into sub @@ -83,24 +83,22 @@ impl PhysicalOptimizerRule for EnsureCooperative { }, // Up phase: Wrap nodes with CooperativeExec if needed, then restore depth |plan| { - let is_coop_node = - plan.as_any().downcast_ref::().is_some(); + let is_cooperative = + plan.properties().scheduling_type == SchedulingType::Cooperative; let is_leaf = plan.children().is_empty(); let is_exchange = plan.properties().evaluation_type == EvaluationType::Eager; // Wrap if: // 1. Node is a leaf or exchange point - // 2. Node is not already a CooperativeExec + // 2. Node is not already cooperative // 3. Not under any CooperativeExec (depth == 0) - if (is_leaf || is_exchange) && !is_coop_node && coop_depth.get() == 0 { - // Note: We don't decrement depth here because this node - // wasn't a CooperativeExec before wrapping + if (is_leaf || is_exchange) && !is_cooperative && coop_depth.get() == 0 { return Ok(Transformed::yes(Arc::new(CooperativeExec::new(plan)))); } - // Restore depth when leaving a CooperativeExec node - if is_coop_node { + // Restore depth when leaving a CooperativeExec + if plan.as_any().downcast_ref::().is_some() { coop_depth.set(coop_depth.get() - 1); } @@ -120,7 +118,6 @@ impl PhysicalOptimizerRule for EnsureCooperative { mod tests { use super::*; use datafusion_common::config::ConfigOptions; - use datafusion_physical_plan::execution_plan::EvaluationType; use datafusion_physical_plan::{displayable, test::scan_partitioned}; use insta::assert_snapshot; @@ -249,35 +246,4 @@ mod tests { "Both data sources should be present" ); } - - #[tokio::test] - async fn test_eager_exchange_nodes() { - // Test eager evaluation nodes (exchange points like RepartitionExec) - // Should wrap both the eager node and its leaf child, and be idempotent - use datafusion_physical_plan::Partitioning; - use datafusion_physical_plan::repartition::RepartitionExec; - - let scan = scan_partitioned(1); - let repartition = Arc::new( - RepartitionExec::try_new(scan, Partitioning::RoundRobinBatch(4)).unwrap(), - ); - - assert_eq!( - repartition.properties().evaluation_type, - EvaluationType::Eager - ); - - let config = ConfigOptions::new(); - let rule = EnsureCooperative::new(); - - // First run: should wrap both eager node and leaf - let first = rule.optimize(repartition, &config).unwrap(); - let first_display = displayable(first.as_ref()).indent(true).to_string(); - assert_eq!(first_display.matches("CooperativeExec").count(), 2); - - // Idempotency check - let second = rule.optimize(Arc::clone(&first), &config).unwrap(); - let second_display = displayable(second.as_ref()).indent(true).to_string(); - assert_eq!(first_display, second_display); - } } From 7ab0988fc3e9138a0705b996b33005d0f9122dde Mon Sep 17 00:00:00 2001 From: Daniel Date: Mon, 12 Jan 2026 22:23:45 -0800 Subject: [PATCH 3/4] cover eager boundary in transform_down_up --- .../physical-optimizer/src/ensure_coop.rs | 204 ++++++++++++++++-- 1 file changed, 184 insertions(+), 20 deletions(-) diff --git a/datafusion/physical-optimizer/src/ensure_coop.rs b/datafusion/physical-optimizer/src/ensure_coop.rs index c8e5a8c1050c4..4b64b4e2fb9cc 100644 --- a/datafusion/physical-optimizer/src/ensure_coop.rs +++ b/datafusion/physical-optimizer/src/ensure_coop.rs @@ -67,41 +67,55 @@ impl PhysicalOptimizerRule for EnsureCooperative { plan: Arc, _config: &ConfigOptions, ) -> Result> { - use std::cell::Cell; + use std::cell::RefCell; - // Track depth: 0 means not under any CooperativeExec - // Using Cell to allow interior mutability from multiple closures - let coop_depth = Cell::new(0usize); + let ancestry_stack = RefCell::new(Vec::<(SchedulingType, EvaluationType)>::new()); plan.transform_down_up( - // Down phase: Track when entering CooperativeExec subtrees + // Down phase: Push parent properties into the stack |plan| { - if plan.as_any().downcast_ref::().is_some() { - coop_depth.set(coop_depth.get() + 1); - } + let props = plan.properties(); + ancestry_stack + .borrow_mut() + .push((props.scheduling_type, props.evaluation_type)); Ok(Transformed::no(plan)) }, - // Up phase: Wrap nodes with CooperativeExec if needed, then restore depth + // Up phase: Wrap nodes with CooperativeExec if needed |plan| { - let is_cooperative = - plan.properties().scheduling_type == SchedulingType::Cooperative; + + ancestry_stack.borrow_mut().pop(); + + let props = plan.properties(); + let is_cooperative = props.scheduling_type == SchedulingType::Cooperative; let is_leaf = plan.children().is_empty(); - let is_exchange = - plan.properties().evaluation_type == EvaluationType::Eager; + let is_exchange = props.evaluation_type == EvaluationType::Eager; + + let mut is_under_cooperative_context = false; + for (scheduling_type, evaluation_type) in + ancestry_stack.borrow().iter().rev() + { + // If nearest ancestor is cooperative, we are under a cooperative context + if *scheduling_type == SchedulingType::Cooperative { + is_under_cooperative_context = true; + break; + // If nearest ancestor is eager, the cooperative context will be reset + } else if *evaluation_type == EvaluationType::Eager { + is_under_cooperative_context = false; + break; + } + } // Wrap if: // 1. Node is a leaf or exchange point // 2. Node is not already cooperative - // 3. Not under any CooperativeExec (depth == 0) - if (is_leaf || is_exchange) && !is_cooperative && coop_depth.get() == 0 { + // 3. Not under any Cooperative context + if (is_leaf || is_exchange) + && !is_cooperative + && !is_under_cooperative_context + { return Ok(Transformed::yes(Arc::new(CooperativeExec::new(plan)))); } - // Restore depth when leaving a CooperativeExec - if plan.as_any().downcast_ref::().is_some() { - coop_depth.set(coop_depth.get() - 1); - } - Ok(Transformed::no(plan)) }, ) @@ -246,4 +260,154 @@ mod tests { "Both data sources should be present" ); } + + #[tokio::test] + async fn test_eager_evaluation_resets_cooperative_context() { + // Test that cooperative context is reset when encountering an eager evaluation boundary. + use arrow::datatypes::Schema; + use datafusion_common::{Result, internal_err}; + use datafusion_execution::TaskContext; + use datafusion_physical_expr::EquivalenceProperties; + use datafusion_physical_plan::{ + DisplayAs, DisplayFormatType, Partitioning, PlanProperties, + SendableRecordBatchStream, + execution_plan::{Boundedness, EmissionType}, + }; + use std::any::Any; + use std::fmt::Formatter; + + #[derive(Debug)] + struct DummyExec { + name: String, + input: Arc, + scheduling_type: SchedulingType, + evaluation_type: EvaluationType, + properties: PlanProperties, + } + + impl DummyExec { + fn new( + name: &str, + input: Arc, + scheduling_type: SchedulingType, + evaluation_type: EvaluationType, + ) -> Self { + let properties = PlanProperties::new( + EquivalenceProperties::new(Arc::new(Schema::empty())), + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + ) + .with_scheduling_type(scheduling_type) + .with_evaluation_type(evaluation_type); + + Self { + name: name.to_string(), + input, + scheduling_type, + evaluation_type, + properties, + } + } + } + + impl DisplayAs for DummyExec { + fn fmt_as( + &self, + _: DisplayFormatType, + f: &mut Formatter, + ) -> std::fmt::Result { + write!(f, "{}", self.name) + } + } + + impl ExecutionPlan for DummyExec { + fn name(&self) -> &str { + &self.name + } + fn as_any(&self) -> &dyn Any { + self + } + fn properties(&self) -> &PlanProperties { + &self.properties + } + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(DummyExec::new( + &self.name, + Arc::clone(&children[0]), + self.scheduling_type, + self.evaluation_type, + ))) + } + fn execute( + &self, + _: usize, + _: Arc, + ) -> Result { + internal_err!("DummyExec does not support execution") + } + } + + // Build a plan similar to the original test: + // scan -> exch1(NonCoop,Eager) -> CoopExec -> filter -> exch2(Coop,Eager) -> filter + let scan = scan_partitioned(1); + let exch1 = Arc::new(DummyExec::new( + "exch1", + scan, + SchedulingType::NonCooperative, + EvaluationType::Eager, + )); + let coop = Arc::new(CooperativeExec::new(exch1)); + let filter1 = Arc::new(DummyExec::new( + "filter1", + coop, + SchedulingType::NonCooperative, + EvaluationType::Lazy, + )); + let exch2 = Arc::new(DummyExec::new( + "exch2", + filter1, + SchedulingType::Cooperative, + EvaluationType::Eager, + )); + let filter2 = Arc::new(DummyExec::new( + "filter2", + exch2, + SchedulingType::NonCooperative, + EvaluationType::Lazy, + )); + + let config = ConfigOptions::new(); + let optimized = EnsureCooperative::new().optimize(filter2, &config).unwrap(); + + let display = displayable(optimized.as_ref()).indent(true).to_string(); + + // Expected wrapping: + // - Scan (leaf) gets wrapped + // - exch1 (eager+noncoop) keeps its manual CooperativeExec wrapper + // - filter1 is protected by exch2's cooperative context, no extra wrap + // - exch2 (already Cooperative) does NOT get wrapped + // - filter2 (not leaf or eager) does NOT get wrapped + assert_eq!( + display.matches("CooperativeExec").count(), + 2, + "Should have 2 CooperativeExec: one wrapping scan, one wrapping exch1" + ); + + assert_snapshot!(display, @r" + filter2 + exch2 + filter1 + CooperativeExec + exch1 + CooperativeExec + DataSourceExec: partitions=1, partition_sizes=[1] + "); + } } From 653192a74331c4b096217122bbdf0d8dba5007d9 Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 14 Jan 2026 22:09:02 -0800 Subject: [PATCH 4/4] fmt --- datafusion/physical-optimizer/src/ensure_coop.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/physical-optimizer/src/ensure_coop.rs b/datafusion/physical-optimizer/src/ensure_coop.rs index 4b64b4e2fb9cc..5d00d00bce21d 100644 --- a/datafusion/physical-optimizer/src/ensure_coop.rs +++ b/datafusion/physical-optimizer/src/ensure_coop.rs @@ -82,7 +82,6 @@ impl PhysicalOptimizerRule for EnsureCooperative { }, // Up phase: Wrap nodes with CooperativeExec if needed |plan| { - ancestry_stack.borrow_mut().pop(); let props = plan.properties();