From 6da99ee0e60d5a579a925d04f96916bad00427f0 Mon Sep 17 00:00:00 2001 From: Jiashu-Hu Date: Mon, 10 Mar 2025 23:35:27 -0500 Subject: [PATCH 1/5] fixed PushDownFilter bug [15047] by adding a new branch to match to prevent this specific situation --- datafusion/optimizer/src/push_down_filter.rs | 169 ++++++++++++++++++- 1 file changed, 168 insertions(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 6b408521c5cf9..f0f6cf39267a3 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -774,11 +774,19 @@ impl OptimizerRule for PushDownFilter { let plan_schema = Arc::clone(plan.schema()); - let LogicalPlan::Filter(mut filter) = plan else { + let LogicalPlan::Filter(mut filter) = plan.clone() else { return Ok(Transformed::no(plan)); }; match Arc::unwrap_or_clone(filter.input) { + // This prevents the Filter from being removed when the extension node has no children, + // so we return the original Filter unchanged. + LogicalPlan::Extension(extension_plan) + if extension_plan.node.inputs().is_empty() => + { + filter.input = Arc::new(LogicalPlan::Extension(extension_plan)); + Ok(Transformed::no(LogicalPlan::Filter(filter))) + } LogicalPlan::Filter(child_filter) => { let parents_predicates = split_conjunction_owned(filter.predicate); @@ -3786,4 +3794,163 @@ Projection: a, b \n TableScan: test"; assert_optimized_plan_eq(plan, expected_after) } + + #[test] + fn test_push_down_filter_to_user_defined_node() -> Result<()> { + // Define a custom user-defined logical node + #[derive(Debug, Hash, Eq, PartialEq)] + struct TestUserNode { + schema: DFSchemaRef, + } + + impl PartialOrd for TestUserNode { + fn partial_cmp(&self, _other: &Self) -> Option { + None + } + } + + impl TestUserNode { + fn new() -> Self { + let schema = Arc::new( + DFSchema::new_with_metadata( + vec![(None, Field::new("a", DataType::Int64, false).into())], + Default::default(), + ) + .unwrap(), + ); + + Self { schema } + } + } + + impl UserDefinedLogicalNodeCore for TestUserNode { + fn name(&self) -> &str { + "test_node" + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![] + } + + fn schema(&self) -> &DFSchemaRef { + &self.schema + } + + fn expressions(&self) -> Vec { + vec![] + } + + fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "TestUserNode") + } + + fn with_exprs_and_inputs( + &self, + exprs: Vec, + inputs: Vec, + ) -> Result { + assert!(exprs.is_empty()); + assert!(inputs.is_empty()); + Ok(Self { + schema: self.schema.clone(), + }) + } + } + + // Create a node and build a plan with a filter + let node = LogicalPlan::Extension(Extension { + node: Arc::new(TestUserNode::new()), + }); + + let plan = LogicalPlanBuilder::from(node).filter(lit(false))?.build()?; + + // Check the original plan format (not part of the test assertions) + let expected_before = "Filter: Boolean(false)\ + \n TestUserNode"; + assert_eq!(format!("{plan}"), expected_before); + + // Check that the filter is pushed down to the user-defined node + let expected_after = "Filter: Boolean(false)\n TestUserNode"; + assert_optimized_plan_eq(plan, expected_after) + } + + #[test] + fn test_push_down_filter_to_user_defined_node_2() -> Result<()> { + // Define a custom user-defined logical node + #[derive(Debug, Hash, Eq, PartialEq)] + struct TestUserNode { + schema: DFSchemaRef, + } + + impl PartialOrd for TestUserNode { + fn partial_cmp(&self, _other: &Self) -> Option { + None + } + } + + impl TestUserNode { + fn new() -> Self { + let schema = Arc::new( + DFSchema::new_with_metadata( + vec![(None, Field::new("a", DataType::Int64, false).into())], + Default::default(), + ) + .unwrap(), + ); + + Self { schema } + } + } + + impl UserDefinedLogicalNodeCore for TestUserNode { + fn name(&self) -> &str { + "test_node" + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![] + } + + fn schema(&self) -> &DFSchemaRef { + &self.schema + } + + fn expressions(&self) -> Vec { + vec![] + } + + fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "TestUserNode") + } + + fn with_exprs_and_inputs( + &self, + exprs: Vec, + inputs: Vec, + ) -> Result { + assert!(exprs.is_empty()); + assert!(inputs.is_empty()); + Ok(Self { + schema: self.schema.clone(), + }) + } + } + + // Create a node and build a plan with a filter + let node = LogicalPlan::Extension(Extension { + node: Arc::new(TestUserNode::new()), + }); + + let plan = LogicalPlanBuilder::from(node).filter(lit(false))?.build()?; + + // Check the original plan format (not part of the test assertions) + let expected_before = "Filter: Boolean(false)\ + \n TestUserNode"; + assert_eq!(format!("{plan}"), expected_before); + + // Check that the filter is NOT pushed down and removed from the plan + // The filter should remain in place since the node has no children to push to + let expected_after = "Filter: Boolean(false)\n TestUserNode"; + assert_optimized_plan_eq(plan, expected_after) + } } From 5c8dc907a6da9ace32f0e70366347ba51231fde7 Mon Sep 17 00:00:00 2001 From: Jiashu-Hu Date: Tue, 11 Mar 2025 00:11:40 -0500 Subject: [PATCH 2/5] improved syntax as request by CICL process --- datafusion/optimizer/src/push_down_filter.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index f0f6cf39267a3..30a497185a6dd 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -3852,7 +3852,7 @@ Projection: a, b assert!(exprs.is_empty()); assert!(inputs.is_empty()); Ok(Self { - schema: self.schema.clone(), + schema: Arc::clone(&self.schema), }) } } @@ -3931,7 +3931,7 @@ Projection: a, b assert!(exprs.is_empty()); assert!(inputs.is_empty()); Ok(Self { - schema: self.schema.clone(), + schema: Arc::clone(&self.schema), }) } } From cd57d6e515b073859f8c1471fec4f6865e42925a Mon Sep 17 00:00:00 2001 From: Jiashu-Hu Date: Tue, 11 Mar 2025 14:49:56 -0500 Subject: [PATCH 3/5] moved check empty node logic into LogicalPlan::Extension(extensioon_plan) --- datafusion/optimizer/src/push_down_filter.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 30a497185a6dd..7bc1bcf456882 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -779,14 +779,6 @@ impl OptimizerRule for PushDownFilter { }; match Arc::unwrap_or_clone(filter.input) { - // This prevents the Filter from being removed when the extension node has no children, - // so we return the original Filter unchanged. - LogicalPlan::Extension(extension_plan) - if extension_plan.node.inputs().is_empty() => - { - filter.input = Arc::new(LogicalPlan::Extension(extension_plan)); - Ok(Transformed::no(LogicalPlan::Filter(filter))) - } LogicalPlan::Filter(child_filter) => { let parents_predicates = split_conjunction_owned(filter.predicate); @@ -1148,6 +1140,12 @@ impl OptimizerRule for PushDownFilter { }) } LogicalPlan::Extension(extension_plan) => { + // This check prevents the Filter from being removed when the extension node has no children, + // so we return the original Filter unchanged. + if extension_plan.node.inputs().is_empty() { + filter.input = Arc::new(LogicalPlan::Extension(extension_plan)); + return Ok(Transformed::no(LogicalPlan::Filter(filter))); + } let prevent_cols = extension_plan.node.prevent_predicate_push_down_columns(); From b0a842c0a95449bbf4a3fec11448768d0e099502 Mon Sep 17 00:00:00 2001 From: Jiashu-Hu Date: Tue, 11 Mar 2025 15:12:05 -0500 Subject: [PATCH 4/5] removed unecessary clone --- datafusion/optimizer/src/push_down_filter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 7bc1bcf456882..4866af5e1feaf 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -774,7 +774,7 @@ impl OptimizerRule for PushDownFilter { let plan_schema = Arc::clone(plan.schema()); - let LogicalPlan::Filter(mut filter) = plan.clone() else { + let LogicalPlan::Filter(mut filter) = plan else { return Ok(Transformed::no(plan)); }; From f7d2169e2f1674c841b3b499d156a6abadcc0fa1 Mon Sep 17 00:00:00 2001 From: Jiashu-Hu Date: Tue, 11 Mar 2025 15:22:19 -0500 Subject: [PATCH 5/5] removed unecessary test --- datafusion/optimizer/src/push_down_filter.rs | 80 -------------------- 1 file changed, 80 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 4866af5e1feaf..0dbb78a2680ec 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -3871,84 +3871,4 @@ Projection: a, b let expected_after = "Filter: Boolean(false)\n TestUserNode"; assert_optimized_plan_eq(plan, expected_after) } - - #[test] - fn test_push_down_filter_to_user_defined_node_2() -> Result<()> { - // Define a custom user-defined logical node - #[derive(Debug, Hash, Eq, PartialEq)] - struct TestUserNode { - schema: DFSchemaRef, - } - - impl PartialOrd for TestUserNode { - fn partial_cmp(&self, _other: &Self) -> Option { - None - } - } - - impl TestUserNode { - fn new() -> Self { - let schema = Arc::new( - DFSchema::new_with_metadata( - vec![(None, Field::new("a", DataType::Int64, false).into())], - Default::default(), - ) - .unwrap(), - ); - - Self { schema } - } - } - - impl UserDefinedLogicalNodeCore for TestUserNode { - fn name(&self) -> &str { - "test_node" - } - - fn inputs(&self) -> Vec<&LogicalPlan> { - vec![] - } - - fn schema(&self) -> &DFSchemaRef { - &self.schema - } - - fn expressions(&self) -> Vec { - vec![] - } - - fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "TestUserNode") - } - - fn with_exprs_and_inputs( - &self, - exprs: Vec, - inputs: Vec, - ) -> Result { - assert!(exprs.is_empty()); - assert!(inputs.is_empty()); - Ok(Self { - schema: Arc::clone(&self.schema), - }) - } - } - - // Create a node and build a plan with a filter - let node = LogicalPlan::Extension(Extension { - node: Arc::new(TestUserNode::new()), - }); - - let plan = LogicalPlanBuilder::from(node).filter(lit(false))?.build()?; - - // Check the original plan format (not part of the test assertions) - let expected_before = "Filter: Boolean(false)\ - \n TestUserNode"; - assert_eq!(format!("{plan}"), expected_before); - - // Check that the filter is NOT pushed down and removed from the plan - // The filter should remain in place since the node has no children to push to - let expected_after = "Filter: Boolean(false)\n TestUserNode"; - assert_optimized_plan_eq(plan, expected_after) - } }