From c4ba7b7db53a07e7ad8cea8a774701ef49209769 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 19 Jan 2026 17:16:22 +0800 Subject: [PATCH 01/36] Update DML filter extraction and delete tests Enhance DML filter extraction to incorporate table scan filters, split conjunctions, and ensure expressions are de-duplicated after stripping qualifiers. Implement exact filter pushdown handling in the delete test provider, and add a new DELETE plan test to validate that table scan filters propagate into delete_from. --- datafusion/core/src/physical_planner.rs | 28 +++++++-- .../custom_sources_cases/dml_planning.rs | 58 ++++++++++++++++++- 2 files changed, 80 insertions(+), 6 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 2715ad98202cb..9cd15cc66ecf3 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -18,7 +18,7 @@ //! Planner for [`LogicalPlan`] to [`ExecutionPlan`] use std::borrow::Cow; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use crate::datasource::file_format::file_type_to_format; @@ -1916,15 +1916,33 @@ fn extract_dml_filters(input: &Arc) -> Result> { let mut filters = Vec::new(); input.apply(|node| { - if let LogicalPlan::Filter(filter) = node { - // Split AND predicates into individual expressions - filters.extend(split_conjunction(&filter.predicate).into_iter().cloned()); + match node { + LogicalPlan::Filter(filter) => { + // Split AND predicates into individual expressions + filters.extend(split_conjunction(&filter.predicate).into_iter().cloned()); + } + LogicalPlan::TableScan(TableScan { filters: scan_filters, .. }) => { + for filter in scan_filters { + filters.extend(split_conjunction(filter).into_iter().cloned()); + } + } + _ => {} } Ok(TreeNodeRecursion::Continue) })?; // Strip table qualifiers from column references - filters.into_iter().map(strip_column_qualifiers).collect() + let mut seen = HashSet::new(); + let mut deduped = Vec::new(); + + for filter in filters { + let filter = strip_column_qualifiers(filter)?; + if seen.insert(filter.clone()) { + deduped.push(filter); + } + } + + Ok(deduped) } /// Strip table qualifiers from column references in an expression. diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index 84cf97710a902..7593669bff205 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -25,7 +25,10 @@ use async_trait::async_trait; use datafusion::datasource::{TableProvider, TableType}; use datafusion::error::Result; use datafusion::execution::context::SessionContext; -use datafusion::logical_expr::Expr; +use datafusion::logical_expr::{ + Expr, LogicalPlan, TableProviderFilterPushDown, TableScan, +}; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_catalog::Session; use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::empty::EmptyExec; @@ -34,6 +37,7 @@ use datafusion_physical_plan::empty::EmptyExec; struct CaptureDeleteProvider { schema: SchemaRef, received_filters: Arc>>>, + filter_pushdown: TableProviderFilterPushDown, } impl CaptureDeleteProvider { @@ -41,6 +45,18 @@ impl CaptureDeleteProvider { Self { schema, received_filters: Arc::new(Mutex::new(None)), + filter_pushdown: TableProviderFilterPushDown::Unsupported, + } + } + + fn new_with_filter_pushdown( + schema: SchemaRef, + filter_pushdown: TableProviderFilterPushDown, + ) -> Self { + Self { + schema, + received_filters: Arc::new(Mutex::new(None)), + filter_pushdown, } } @@ -91,6 +107,13 @@ impl TableProvider for CaptureDeleteProvider { Field::new("count", DataType::UInt64, false), ]))))) } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> Result> { + Ok(vec![self.filter_pushdown.clone(); filters.len()]) + } } /// A TableProvider that captures filters and assignments passed to update(). @@ -246,6 +269,39 @@ async fn test_delete_complex_expr() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_delete_filter_pushdown_extracts_table_scan_filters() -> Result<()> { + let provider = Arc::new(CaptureDeleteProvider::new_with_filter_pushdown( + test_schema(), + TableProviderFilterPushDown::Exact, + )); + let ctx = SessionContext::new(); + ctx.register_table("t", Arc::clone(&provider) as Arc)?; + + let df = ctx.sql("DELETE FROM t WHERE id = 1").await?; + let optimized_plan = df.clone().into_optimized_plan()?; + + let mut scan_filters = Vec::new(); + optimized_plan.apply(|node| { + if let LogicalPlan::TableScan(TableScan { filters, .. }) = node { + scan_filters.extend(filters.clone()); + } + Ok(TreeNodeRecursion::Continue) + })?; + + assert_eq!(scan_filters.len(), 1); + assert!(scan_filters[0].to_string().contains("id")); + + df.collect().await?; + + let filters = provider + .captured_filters() + .expect("filters should be captured"); + assert_eq!(filters.len(), 1); + assert!(filters[0].to_string().contains("id")); + Ok(()) +} + #[tokio::test] async fn test_update_assignments() -> Result<()> { let provider = Arc::new(CaptureUpdateProvider::new(test_schema())); From c2fd0f81e0de2fe2fe16e2937a63f170d427608d Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 19 Jan 2026 17:29:49 +0800 Subject: [PATCH 02/36] docs: clarify filter extraction includes TableScan and deduplication --- datafusion/core/src/physical_planner.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 9cd15cc66ecf3..e93723a742922 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1907,10 +1907,11 @@ fn get_physical_expr_pair( } /// Extract filter predicates from a DML input plan (DELETE/UPDATE). -/// Walks the logical plan tree and collects Filter predicates, -/// splitting AND conjunctions into individual expressions. +/// Walks the logical plan tree and collects Filter predicates and any filters +/// pushed down into TableScan nodes, splitting AND conjunctions into individual expressions. /// Column qualifiers are stripped so expressions can be evaluated against -/// the TableProvider's schema. +/// the TableProvider's schema. Deduplicates filters to avoid passing the same +/// predicate twice when filters appear in both Filter and TableScan nodes. /// fn extract_dml_filters(input: &Arc) -> Result> { let mut filters = Vec::new(); @@ -1931,7 +1932,10 @@ fn extract_dml_filters(input: &Arc) -> Result> { Ok(TreeNodeRecursion::Continue) })?; - // Strip table qualifiers from column references + // Strip table qualifiers from column references and deduplicate. + // Deduplication is necessary because filters may appear in both Filter nodes + // and TableScan.filters when the optimizer pushes some predicates down. + // We deduplicate by (unqualified) expression to avoid passing the same filter twice. let mut seen = HashSet::new(); let mut deduped = Vec::new(); From 7ad302d5593bb62eb921e9e176d72d94f12883e2 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 19 Jan 2026 17:34:37 +0800 Subject: [PATCH 03/36] test: add compound filter deduplication test with pushdown --- datafusion/core/src/physical_planner.rs | 5 ++- .../custom_sources_cases/dml_planning.rs | 38 ++++++++++++++++++- 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index e93723a742922..c182d8b7a0350 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1922,7 +1922,10 @@ fn extract_dml_filters(input: &Arc) -> Result> { // Split AND predicates into individual expressions filters.extend(split_conjunction(&filter.predicate).into_iter().cloned()); } - LogicalPlan::TableScan(TableScan { filters: scan_filters, .. }) => { + LogicalPlan::TableScan(TableScan { + filters: scan_filters, + .. + }) => { for filter in scan_filters { filters.extend(split_conjunction(filter).into_iter().cloned()); } diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index 7593669bff205..b4b3d14d0e3e5 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -28,8 +28,8 @@ use datafusion::execution::context::SessionContext; use datafusion::logical_expr::{ Expr, LogicalPlan, TableProviderFilterPushDown, TableScan, }; -use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_catalog::Session; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::empty::EmptyExec; @@ -302,6 +302,42 @@ async fn test_delete_filter_pushdown_extracts_table_scan_filters() -> Result<()> Ok(()) } +#[tokio::test] +async fn test_delete_compound_filters_with_pushdown() -> Result<()> { + let provider = Arc::new(CaptureDeleteProvider::new_with_filter_pushdown( + test_schema(), + TableProviderFilterPushDown::Exact, + )); + let ctx = SessionContext::new(); + ctx.register_table("t", Arc::clone(&provider) as Arc)?; + + ctx.sql("DELETE FROM t WHERE id = 1 AND status = 'active'") + .await? + .collect() + .await?; + + let filters = provider + .captured_filters() + .expect("filters should be captured"); + // Should receive both filters, not deduplicate valid separate predicates + assert_eq!( + filters.len(), + 2, + "compound filters should not be over-suppressed" + ); + + let filter_strs: Vec = filters.iter().map(|f| f.to_string()).collect(); + assert!( + filter_strs.iter().any(|s| s.contains("id")), + "should contain id filter" + ); + assert!( + filter_strs.iter().any(|s| s.contains("status")), + "should contain status filter" + ); + Ok(()) +} + #[tokio::test] async fn test_update_assignments() -> Result<()> { let provider = Arc::new(CaptureUpdateProvider::new(test_schema())); From 0b71180ed7fd0e8d9ddc79b7e1b11849ef9d372c Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 19 Jan 2026 17:37:01 +0800 Subject: [PATCH 04/36] refactor: use functional style for filter deduplication --- datafusion/core/src/physical_planner.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index c182d8b7a0350..f2e0a09b31dfb 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1939,15 +1939,14 @@ fn extract_dml_filters(input: &Arc) -> Result> { // Deduplication is necessary because filters may appear in both Filter nodes // and TableScan.filters when the optimizer pushes some predicates down. // We deduplicate by (unqualified) expression to avoid passing the same filter twice. - let mut seen = HashSet::new(); - let mut deduped = Vec::new(); - - for filter in filters { - let filter = strip_column_qualifiers(filter)?; - if seen.insert(filter.clone()) { - deduped.push(filter); - } - } + let mut seen_filters = HashSet::new(); + let deduped = filters + .into_iter() + .map(strip_column_qualifiers) + .collect::>>()? + .into_iter() + .filter(|f| seen_filters.insert(f.clone())) + .collect(); Ok(deduped) } From b3240bfc6cf50f476c430f7864153f8d6e38d518 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 20 Jan 2026 12:00:59 +0800 Subject: [PATCH 05/36] refactor: enhance filter extraction logic for various logical plans with explicit variant handling --- datafusion/core/src/physical_planner.rs | 31 ++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index f2e0a09b31dfb..72bbd19e35127 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1930,7 +1930,36 @@ fn extract_dml_filters(input: &Arc) -> Result> { filters.extend(split_conjunction(filter).into_iter().cloned()); } } - _ => {} + // Plans without filter information + LogicalPlan::EmptyRelation(_) + | LogicalPlan::Values(_) + | LogicalPlan::DescribeTable(_) + | LogicalPlan::Explain(_) + | LogicalPlan::Analyze(_) + | LogicalPlan::Distinct(_) + | LogicalPlan::Extension(_) + | LogicalPlan::Statement(_) + | LogicalPlan::Dml(_) + | LogicalPlan::Ddl(_) + | LogicalPlan::Copy(_) + | LogicalPlan::Unnest(_) + | LogicalPlan::RecursiveQuery(_) => { + // No filters to extract from leaf/meta plans + } + // Plans with inputs (may contain filters in children) + LogicalPlan::Projection(_) + | LogicalPlan::SubqueryAlias(_) + | LogicalPlan::Limit(_) + | LogicalPlan::Sort(_) + | LogicalPlan::Union(_) + | LogicalPlan::Join(_) + | LogicalPlan::Repartition(_) + | LogicalPlan::Aggregate(_) + | LogicalPlan::Window(_) + | LogicalPlan::Subquery(_) => { + // Filter information may appear in child nodes; continue traversal + // to extract filters from Filter/TableScan nodes deeper in the plan + } } Ok(TreeNodeRecursion::Continue) })?; From 5fcb927f6bea3f7a60cb796c8aceda757d9f2124 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 20 Jan 2026 12:05:23 +0800 Subject: [PATCH 06/36] feat: add UPDATE tests , filter pushdown support to CaptureUpdateProvider and corresponding tests --- .../custom_sources_cases/dml_planning.rs | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index b4b3d14d0e3e5..ba57471c4f6b2 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -122,6 +122,7 @@ struct CaptureUpdateProvider { schema: SchemaRef, received_filters: Arc>>>, received_assignments: Arc>>>, + filter_pushdown: TableProviderFilterPushDown, } impl CaptureUpdateProvider { @@ -130,6 +131,19 @@ impl CaptureUpdateProvider { schema, received_filters: Arc::new(Mutex::new(None)), received_assignments: Arc::new(Mutex::new(None)), + filter_pushdown: TableProviderFilterPushDown::Unsupported, + } + } + + fn new_with_filter_pushdown( + schema: SchemaRef, + filter_pushdown: TableProviderFilterPushDown, + ) -> Self { + Self { + schema, + received_filters: Arc::new(Mutex::new(None)), + received_assignments: Arc::new(Mutex::new(None)), + filter_pushdown, } } @@ -186,6 +200,13 @@ impl TableProvider for CaptureUpdateProvider { Field::new("count", DataType::UInt64, false), ]))))) } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> Result> { + Ok(vec![self.filter_pushdown.clone(); filters.len()]) + } } fn test_schema() -> SchemaRef { @@ -361,6 +382,41 @@ async fn test_update_assignments() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_update_filter_pushdown_extracts_table_scan_filters() -> Result<()> { + let provider = Arc::new(CaptureUpdateProvider::new_with_filter_pushdown( + test_schema(), + TableProviderFilterPushDown::Exact, + )); + let ctx = SessionContext::new(); + ctx.register_table("t", Arc::clone(&provider) as Arc)?; + + let df = ctx.sql("UPDATE t SET value = 100 WHERE id = 1").await?; + let optimized_plan = df.clone().into_optimized_plan()?; + + // Verify that the optimizer pushed down the filter into TableScan + let mut scan_filters = Vec::new(); + optimized_plan.apply(|node| { + if let LogicalPlan::TableScan(TableScan { filters, .. }) = node { + scan_filters.extend(filters.clone()); + } + Ok(TreeNodeRecursion::Continue) + })?; + + assert_eq!(scan_filters.len(), 1); + assert!(scan_filters[0].to_string().contains("id")); + + // Execute the UPDATE and verify filters were extracted and passed to update() + df.collect().await?; + + let filters = provider + .captured_filters() + .expect("filters should be captured"); + assert_eq!(filters.len(), 1); + assert!(filters[0].to_string().contains("id")); + Ok(()) +} + #[tokio::test] async fn test_unsupported_table_delete() -> Result<()> { let schema = test_schema(); From 98be1b5e03530aba6ffc3d9658be3e33667b4993 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 20 Jan 2026 12:08:14 +0800 Subject: [PATCH 07/36] test: add test for DELETE with mixed filter locations in CaptureDeleteProvider --- .../custom_sources_cases/dml_planning.rs | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index ba57471c4f6b2..f0f0aef817865 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -359,6 +359,47 @@ async fn test_delete_compound_filters_with_pushdown() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_delete_mixed_filter_locations() -> Result<()> { + // Test mixed-location filters: some in Filter node, some in TableScan.filters + // This happens when provider uses TableProviderFilterPushDown::Inexact, + // meaning it can push down some predicates but not others. + let provider = Arc::new(CaptureDeleteProvider::new_with_filter_pushdown( + test_schema(), + TableProviderFilterPushDown::Inexact, + )); + let ctx = SessionContext::new(); + ctx.register_table("t", Arc::clone(&provider) as Arc)?; + + // Execute DELETE with compound WHERE clause + ctx.sql("DELETE FROM t WHERE id = 1 AND status = 'active'") + .await? + .collect() + .await?; + + // Verify that both predicates are extracted and passed to delete_from(), + // even though they may be split between Filter node and TableScan.filters + let filters = provider + .captured_filters() + .expect("filters should be captured"); + assert_eq!( + filters.len(), + 2, + "should extract both predicates (union of Filter and TableScan.filters)" + ); + + let filter_strs: Vec = filters.iter().map(|f| f.to_string()).collect(); + assert!( + filter_strs.iter().any(|s| s.contains("id")), + "should contain id filter" + ); + assert!( + filter_strs.iter().any(|s| s.contains("status")), + "should contain status filter" + ); + Ok(()) +} + #[tokio::test] async fn test_update_assignments() -> Result<()> { let provider = Arc::new(CaptureUpdateProvider::new(test_schema())); From b66edf4774834e332bcdff85eb58f848b65a8a70 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 20 Jan 2026 12:20:47 +0800 Subject: [PATCH 08/36] feat: enhance DELETE filter extraction to scope to target table only --- datafusion/core/src/physical_planner.rs | 19 ++++++++--- .../custom_sources_cases/dml_planning.rs | 34 +++++++++++++++++++ 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 72bbd19e35127..c0a4b2242ceb7 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -613,7 +613,7 @@ impl DefaultPhysicalPlanner { if let Some(provider) = target.as_any().downcast_ref::() { - let filters = extract_dml_filters(input)?; + let filters = extract_dml_filters(input, &table_name.to_string())?; provider .table_provider .delete_from(session_state, filters) @@ -639,7 +639,7 @@ impl DefaultPhysicalPlanner { { // For UPDATE, the assignments are encoded in the projection of input // We pass the filters and let the provider handle the projection - let filters = extract_dml_filters(input)?; + let filters = extract_dml_filters(input, &table_name.to_string())?; // Extract assignments from the projection in input plan let assignments = extract_update_assignments(input)?; provider @@ -1913,7 +1913,10 @@ fn get_physical_expr_pair( /// the TableProvider's schema. Deduplicates filters to avoid passing the same /// predicate twice when filters appear in both Filter and TableScan nodes. /// -fn extract_dml_filters(input: &Arc) -> Result> { +fn extract_dml_filters( + input: &Arc, + target_table_name: &str, +) -> Result> { let mut filters = Vec::new(); input.apply(|node| { @@ -1923,11 +1926,17 @@ fn extract_dml_filters(input: &Arc) -> Result> { filters.extend(split_conjunction(&filter.predicate).into_iter().cloned()); } LogicalPlan::TableScan(TableScan { + table_name, filters: scan_filters, .. }) => { - for filter in scan_filters { - filters.extend(split_conjunction(filter).into_iter().cloned()); + // Only extract filters from the target table scan. + // This prevents incorrect filter extraction in UPDATE...FROM scenarios + // where multiple table scans may have filters. + if table_name.to_string() == target_table_name { + for filter in scan_filters { + filters.extend(split_conjunction(filter).into_iter().cloned()); + } } } // Plans without filter information diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index f0f0aef817865..bf5ddba9a16d1 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -484,3 +484,37 @@ async fn test_unsupported_table_update() -> Result<()> { assert!(result.is_err() || result.unwrap().collect().await.is_err()); Ok(()) } + +#[tokio::test] +async fn test_delete_target_table_scoping() -> Result<()> { + // Test that DELETE only extracts filters from the target table, + // not from other tables (important for DELETE...FROM safety) + let target_provider = Arc::new(CaptureDeleteProvider::new_with_filter_pushdown( + test_schema(), + TableProviderFilterPushDown::Exact, + )); + let ctx = SessionContext::new(); + ctx.register_table( + "target_t", + Arc::clone(&target_provider) as Arc, + )?; + + // For now, we test single-table DELETE (UPDATE...FROM is not yet supported) + // But this validates that the scoping logic is correct + let df = ctx.sql("DELETE FROM target_t WHERE id > 5").await?; + df.collect().await?; + + let filters = target_provider + .captured_filters() + .expect("filters should be captured"); + assert_eq!(filters.len(), 1); + assert!( + filters[0].to_string().contains("id"), + "Filter should be for id column" + ); + assert!( + filters[0].to_string().contains("5"), + "Filter should contain the value 5" + ); + Ok(()) +} From cfc945d586e3daa9932b6f7710954f1675c639b3 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 20 Jan 2026 12:25:36 +0800 Subject: [PATCH 09/36] feat: add validation and stripping of table qualifiers in DML filters --- datafusion/core/src/physical_planner.rs | 47 +++++++++++++++++-- .../custom_sources_cases/dml_planning.rs | 32 +++++++++++++ 2 files changed, 74 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index c0a4b2242ceb7..a952be9df4c8c 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1973,14 +1973,15 @@ fn extract_dml_filters( Ok(TreeNodeRecursion::Continue) })?; - // Strip table qualifiers from column references and deduplicate. - // Deduplication is necessary because filters may appear in both Filter nodes - // and TableScan.filters when the optimizer pushes some predicates down. - // We deduplicate by (unqualified) expression to avoid passing the same filter twice. + // Validate that all filters reference only the target table, then strip qualifiers + // and deduplicate. This ensures: + // 1. No cross-table predicate contamination (safety check) + // 2. Qualifiers stripped for TableProvider compatibility + // 3. Duplicates removed (from Filter nodes + TableScan.filters) let mut seen_filters = HashSet::new(); let deduped = filters .into_iter() - .map(strip_column_qualifiers) + .map(|f| validate_and_strip_qualifiers(&f, target_table_name)) .collect::>>()? .into_iter() .filter(|f| seen_filters.insert(f.clone())) @@ -1989,6 +1990,42 @@ fn extract_dml_filters( Ok(deduped) } +/// Validate that a filter expression only references columns from the target table, +/// then strip table qualifiers from column references. +/// +/// This ensures that DML filters don't inadvertently reference columns from other +/// tables (important for UPDATE...FROM scenarios where multiple tables may be involved). +/// +/// # Arguments +/// * `expr` - The filter expression to validate +/// * `target_table_name` - The name of the target table for the DML operation +/// +/// # Returns +/// * `Ok(expr_with_qualifiers_stripped)` if all columns belong to the target table or are unqualified +/// * `Err(plan_err)` if any column references a different table +fn validate_and_strip_qualifiers(expr: &Expr, target_table_name: &str) -> Result { + // First, validate that all column references belong to the target table + let col_refs = expr.column_refs(); + + for col_ref in col_refs { + if let Some(table_qualifier) = &col_ref.relation { + // Column is qualified; verify it matches the target table + if table_qualifier.to_string() != target_table_name { + return plan_err!( + "DELETE/UPDATE filter references column from non-target table: {}.{}. \ + Only columns from table '{}' are allowed in DML filters.", + table_qualifier, + col_ref.name, + target_table_name + ); + } + } + } + + // All columns validated; now strip qualifiers for passing to TableProvider + strip_column_qualifiers(expr.clone()) +} + /// Strip table qualifiers from column references in an expression. /// This is needed because DML filter expressions contain qualified column names /// (e.g., "table.column") but the TableProvider's schema only has simple names. diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index bf5ddba9a16d1..f41e5b2b94f4c 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -518,3 +518,35 @@ async fn test_delete_target_table_scoping() -> Result<()> { ); Ok(()) } + +#[tokio::test] +async fn test_delete_qualifier_stripping_and_validation() -> Result<()> { + // Test that filter qualifiers are properly stripped and validated + // Unqualified predicates should work fine + let provider = Arc::new(CaptureDeleteProvider::new_with_filter_pushdown( + test_schema(), + TableProviderFilterPushDown::Exact, + )); + let ctx = SessionContext::new(); + ctx.register_table("t", Arc::clone(&provider) as Arc)?; + + // Execute DELETE with unqualified column reference + // (After parsing, the planner adds qualifiers, but our validation should accept them) + let df = ctx.sql("DELETE FROM t WHERE id = 1").await?; + df.collect().await?; + + let filters = provider + .captured_filters() + .expect("filters should be captured"); + assert!(!filters.is_empty(), "Should have extracted filter"); + + // The filter should have been stripped of qualifiers for TableProvider + // Verify it contains the column reference (without qualifier prefix) + let filter_str = filters[0].to_string(); + assert!( + filter_str.contains("id") || filter_str.contains("1"), + "Filter should reference id column or the value 1, got: {}", + filter_str + ); + Ok(()) +} From 599f0b14c81f3cc8fb9bf030c79b307e9db3e028 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 20 Jan 2026 12:36:26 +0800 Subject: [PATCH 10/36] fix: correct string interpolation in filter validation assertion --- datafusion/core/tests/custom_sources_cases/dml_planning.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index f41e5b2b94f4c..9ce575ff174ee 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -545,8 +545,7 @@ async fn test_delete_qualifier_stripping_and_validation() -> Result<()> { let filter_str = filters[0].to_string(); assert!( filter_str.contains("id") || filter_str.contains("1"), - "Filter should reference id column or the value 1, got: {}", - filter_str + "Filter should reference id column or the value 1, got: {filter_str}" ); Ok(()) } From 2ad307c6894166e451c29099ff63999be560065b Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 20 Jan 2026 13:35:06 +0800 Subject: [PATCH 11/36] Refine DML filter extraction logic Update DML filter extraction to utilize TableReference matching, omit non-target predicates, and remove qualifiers post-filtering and deduplication. Pass TableReference targets into DML filter extraction for DELETE and UPDATE planning paths. --- datafusion/core/src/physical_planner.rs | 68 +++++++++---------------- 1 file changed, 24 insertions(+), 44 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index a952be9df4c8c..d965a32358406 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -84,7 +84,7 @@ use datafusion_expr::expr::{ }; use datafusion_expr::expr_rewriter::unnormalize_cols; use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary; -use datafusion_expr::utils::split_conjunction; +use datafusion_expr::utils::{expr_to_columns, split_conjunction}; use datafusion_expr::{ Analyze, BinaryExpr, DescribeTable, DmlStatement, Explain, ExplainFormat, Extension, FetchType, Filter, JoinType, Operator, RecursiveQuery, SkipType, StringifiedPlan, @@ -613,7 +613,7 @@ impl DefaultPhysicalPlanner { if let Some(provider) = target.as_any().downcast_ref::() { - let filters = extract_dml_filters(input, &table_name.to_string())?; + let filters = extract_dml_filters(input, &table_name)?; provider .table_provider .delete_from(session_state, filters) @@ -639,7 +639,7 @@ impl DefaultPhysicalPlanner { { // For UPDATE, the assignments are encoded in the projection of input // We pass the filters and let the provider handle the projection - let filters = extract_dml_filters(input, &table_name.to_string())?; + let filters = extract_dml_filters(input, &table_name)?; // Extract assignments from the projection in input plan let assignments = extract_update_assignments(input)?; provider @@ -1915,7 +1915,7 @@ fn get_physical_expr_pair( /// fn extract_dml_filters( input: &Arc, - target_table_name: &str, + target: &TableReference, ) -> Result> { let mut filters = Vec::new(); @@ -1923,7 +1923,11 @@ fn extract_dml_filters( match node { LogicalPlan::Filter(filter) => { // Split AND predicates into individual expressions - filters.extend(split_conjunction(&filter.predicate).into_iter().cloned()); + for predicate in split_conjunction(&filter.predicate) { + if predicate_is_on_target(predicate, target)? { + filters.push(predicate.clone()); + } + } } LogicalPlan::TableScan(TableScan { table_name, @@ -1933,7 +1937,7 @@ fn extract_dml_filters( // Only extract filters from the target table scan. // This prevents incorrect filter extraction in UPDATE...FROM scenarios // where multiple table scans may have filters. - if table_name.to_string() == target_table_name { + if table_name.resolved_eq(target) { for filter in scan_filters { filters.extend(split_conjunction(filter).into_iter().cloned()); } @@ -1973,15 +1977,14 @@ fn extract_dml_filters( Ok(TreeNodeRecursion::Continue) })?; - // Validate that all filters reference only the target table, then strip qualifiers - // and deduplicate. This ensures: - // 1. No cross-table predicate contamination (safety check) + // Strip qualifiers and deduplicate. This ensures: + // 1. Only target-table predicates are retained from Filter nodes // 2. Qualifiers stripped for TableProvider compatibility // 3. Duplicates removed (from Filter nodes + TableScan.filters) let mut seen_filters = HashSet::new(); let deduped = filters .into_iter() - .map(|f| validate_and_strip_qualifiers(&f, target_table_name)) + .map(strip_column_qualifiers) .collect::>>()? .into_iter() .filter(|f| seen_filters.insert(f.clone())) @@ -1990,40 +1993,17 @@ fn extract_dml_filters( Ok(deduped) } -/// Validate that a filter expression only references columns from the target table, -/// then strip table qualifiers from column references. -/// -/// This ensures that DML filters don't inadvertently reference columns from other -/// tables (important for UPDATE...FROM scenarios where multiple tables may be involved). -/// -/// # Arguments -/// * `expr` - The filter expression to validate -/// * `target_table_name` - The name of the target table for the DML operation -/// -/// # Returns -/// * `Ok(expr_with_qualifiers_stripped)` if all columns belong to the target table or are unqualified -/// * `Err(plan_err)` if any column references a different table -fn validate_and_strip_qualifiers(expr: &Expr, target_table_name: &str) -> Result { - // First, validate that all column references belong to the target table - let col_refs = expr.column_refs(); - - for col_ref in col_refs { - if let Some(table_qualifier) = &col_ref.relation { - // Column is qualified; verify it matches the target table - if table_qualifier.to_string() != target_table_name { - return plan_err!( - "DELETE/UPDATE filter references column from non-target table: {}.{}. \ - Only columns from table '{}' are allowed in DML filters.", - table_qualifier, - col_ref.name, - target_table_name - ); - } - } - } - - // All columns validated; now strip qualifiers for passing to TableProvider - strip_column_qualifiers(expr.clone()) +/// Determine whether a predicate references only columns from the target table. +fn predicate_is_on_target(expr: &Expr, target: &TableReference) -> Result { + let mut columns = HashSet::new(); + expr_to_columns(expr, &mut columns)?; + + Ok(columns.iter().all(|column| { + column + .relation + .as_ref() + .is_none_or(|relation| relation.resolved_eq(target)) + })) } /// Strip table qualifiers from column references in an expression. From f1663f1f6ae15da36a721324f156a0b6d2b2c9f3 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 20 Jan 2026 13:35:35 +0800 Subject: [PATCH 12/36] Enhance filter handling for UPDATE ... FROM queries Add helper to detect table-qualified column references in captured filters for supporting UPDATE ... FROM assertions. Implement tests to verify that TableScan filters are properly extracted into DML update filters and ensure non-target predicates are dropped while retaining target filters during UPDATE ... FROM operations. --- .../custom_sources_cases/dml_planning.rs | 104 ++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index 9ce575ff174ee..3dc54e4ed0f4a 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -30,6 +30,7 @@ use datafusion::logical_expr::{ }; use datafusion_catalog::Session; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; +use datafusion_common::TableReference; use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::empty::EmptyExec; @@ -217,6 +218,19 @@ fn test_schema() -> SchemaRef { ])) } +fn expr_has_table_reference(expr: &Expr, table: &str) -> Result { + let reference = TableReference::bare(table); + expr.exists(|node| { + Ok(matches!( + node, + Expr::Column(column) + if column.relation.as_ref().is_some_and(|relation| { + relation.resolved_eq(&reference) + }) + )) + }) +} + #[tokio::test] async fn test_delete_single_filter() -> Result<()> { let provider = Arc::new(CaptureDeleteProvider::new(test_schema())); @@ -458,6 +472,45 @@ async fn test_update_filter_pushdown_extracts_table_scan_filters() -> Result<()> Ok(()) } +#[tokio::test] +async fn test_update_filter_pushdown_passes_table_scan_filters() -> Result<()> { + let provider = Arc::new(CaptureUpdateProvider::new_with_filter_pushdown( + test_schema(), + TableProviderFilterPushDown::Exact, + )); + let ctx = SessionContext::new(); + ctx.register_table("t", Arc::clone(&provider) as Arc)?; + + let df = ctx + .sql("UPDATE t SET value = 42 WHERE status = 'ready'") + .await?; + let optimized_plan = df.clone().into_optimized_plan()?; + + let mut scan_filters = Vec::new(); + optimized_plan.apply(|node| { + if let LogicalPlan::TableScan(TableScan { filters, .. }) = node { + scan_filters.extend(filters.clone()); + } + Ok(TreeNodeRecursion::Continue) + })?; + + assert!( + !scan_filters.is_empty(), + "expected filter pushdown to populate TableScan filters" + ); + + df.collect().await?; + + let filters = provider + .captured_filters() + .expect("filters should be captured"); + assert!( + !filters.is_empty(), + "expected filters extracted from TableScan during UPDATE" + ); + Ok(()) +} + #[tokio::test] async fn test_unsupported_table_delete() -> Result<()> { let schema = test_schema(); @@ -519,6 +572,57 @@ async fn test_delete_target_table_scoping() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_update_from_drops_non_target_predicates() -> Result<()> { + let target_provider = Arc::new(CaptureUpdateProvider::new_with_filter_pushdown( + test_schema(), + TableProviderFilterPushDown::Exact, + )); + let ctx = SessionContext::new(); + ctx.register_table( + "t1", + Arc::clone(&target_provider) as Arc, + )?; + + let source_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("status", DataType::Utf8, true), + ])); + let source_table = datafusion::datasource::empty::EmptyTable::new(source_schema); + ctx.register_table("t2", Arc::new(source_table))?; + + ctx.sql( + "UPDATE t1 SET value = 1 FROM t2 \ + WHERE t1.id = t2.id AND t2.status = 'active' AND t1.value > 10", + ) + .await? + .collect() + .await?; + + let filters = target_provider + .captured_filters() + .expect("filters should be captured"); + assert!( + !filters.is_empty(), + "expected target predicates extracted from UPDATE ... FROM" + ); + + let has_t2_reference = filters.iter().try_fold(false, |found, expr| { + expr_has_table_reference(expr, "t2").map(|has_ref| found || has_ref) + })?; + assert!( + !has_t2_reference, + "filters should only include target-table predicates" + ); + + let filter_strs: Vec = filters.iter().map(|f| f.to_string()).collect(); + assert!( + filter_strs.iter().any(|s| s.contains("value")), + "expected target-table predicate to be retained" + ); + Ok(()) +} + #[tokio::test] async fn test_delete_qualifier_stripping_and_validation() -> Result<()> { // Test that filter qualifiers are properly stripped and validated From 234785ef08bbd196a70bc89f64772da6698ae68e Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 20 Jan 2026 14:11:39 +0800 Subject: [PATCH 13/36] refactor: improve comments and streamline table registration in DML tests --- .../custom_sources_cases/dml_planning.rs | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index 3dc54e4ed0f4a..8fd28ca14275f 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -29,8 +29,8 @@ use datafusion::logical_expr::{ Expr, LogicalPlan, TableProviderFilterPushDown, TableScan, }; use datafusion_catalog::Session; -use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::TableReference; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::empty::EmptyExec; @@ -552,8 +552,8 @@ async fn test_delete_target_table_scoping() -> Result<()> { Arc::clone(&target_provider) as Arc, )?; - // For now, we test single-table DELETE (UPDATE...FROM is not yet supported) - // But this validates that the scoping logic is correct + // For now, we test single-table DELETE + // and validate that the scoping logic is correct let df = ctx.sql("DELETE FROM target_t WHERE id > 5").await?; df.collect().await?; @@ -579,10 +579,7 @@ async fn test_update_from_drops_non_target_predicates() -> Result<()> { TableProviderFilterPushDown::Exact, )); let ctx = SessionContext::new(); - ctx.register_table( - "t1", - Arc::clone(&target_provider) as Arc, - )?; + ctx.register_table("t1", Arc::clone(&target_provider) as Arc)?; let source_schema = Arc::new(Schema::new(vec![ Field::new("id", DataType::Int32, false), @@ -644,9 +641,20 @@ async fn test_delete_qualifier_stripping_and_validation() -> Result<()> { .expect("filters should be captured"); assert!(!filters.is_empty(), "Should have extracted filter"); - // The filter should have been stripped of qualifiers for TableProvider - // Verify it contains the column reference (without qualifier prefix) + // Verify qualifiers are stripped: check that Column expressions have no qualifier + let has_qualified_column = filters[0] + .exists(|expr| Ok(matches!(expr, Expr::Column(col) if col.relation.is_some())))?; + assert!( + !has_qualified_column, + "Filter should have unqualified columns after stripping" + ); + + // Also verify the string representation doesn't contain table qualifiers let filter_str = filters[0].to_string(); + assert!( + !filter_str.contains("t.id"), + "Filter should not contain qualified column reference, got: {filter_str}" + ); assert!( filter_str.contains("id") || filter_str.contains("1"), "Filter should reference id column or the value 1, got: {filter_str}" From 96671c732f280ce3f2bc8d95f9d2248ce5d5b222 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 20 Jan 2026 14:32:13 +0800 Subject: [PATCH 14/36] =?UTF-8?q?refactor:=20-=20[=20]=20Replace=20dedupli?= =?UTF-8?q?cation's=20`.collect()=20=E2=86=92=20.into=5Fiter()`=20with=20s?= =?UTF-8?q?ingle-pass=20`try=5Ffold`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- datafusion/core/src/physical_planner.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index d965a32358406..4c14819348a38 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1981,16 +1981,17 @@ fn extract_dml_filters( // 1. Only target-table predicates are retained from Filter nodes // 2. Qualifiers stripped for TableProvider compatibility // 3. Duplicates removed (from Filter nodes + TableScan.filters) + // + // Deduplication is necessary because filters may appear in both Filter nodes + // and TableScan.filters when the optimizer performs partial (Inexact) pushdown. let mut seen_filters = HashSet::new(); - let deduped = filters - .into_iter() - .map(strip_column_qualifiers) - .collect::>>()? - .into_iter() - .filter(|f| seen_filters.insert(f.clone())) - .collect(); - - Ok(deduped) + filters.into_iter().try_fold(Vec::new(), |mut deduped, filter| { + let unqualified = strip_column_qualifiers(filter)?; + if seen_filters.insert(unqualified.clone()) { + deduped.push(unqualified); + } + Ok(deduped) + }) } /// Determine whether a predicate references only columns from the target table. From 1aeb486bf0d658d3f9d0eb184184e4d344f9f5c7 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 20 Jan 2026 14:33:17 +0800 Subject: [PATCH 15/36] refactor: enhance DML filter extraction logic with detailed documentation and improved handling for UPDATE...FROM queries --- datafusion/core/src/physical_planner.rs | 36 ++++++++++++++++++------- 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 4c14819348a38..db9c4109cd73a 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1907,11 +1907,25 @@ fn get_physical_expr_pair( } /// Extract filter predicates from a DML input plan (DELETE/UPDATE). +/// /// Walks the logical plan tree and collects Filter predicates and any filters /// pushed down into TableScan nodes, splitting AND conjunctions into individual expressions. -/// Column qualifiers are stripped so expressions can be evaluated against -/// the TableProvider's schema. Deduplicates filters to avoid passing the same -/// predicate twice when filters appear in both Filter and TableScan nodes. +/// +/// For UPDATE...FROM queries involving multiple tables, this function only extracts predicates +/// that reference the target table. Filters from source table scans are excluded to prevent +/// incorrect filter semantics. +/// +/// Column qualifiers are stripped so expressions can be evaluated against the TableProvider's +/// schema. Deduplication is performed because filters may appear in both Filter nodes and +/// TableScan.filters when the optimizer performs partial (Inexact) filter pushdown. +/// +/// # Parameters +/// - `input`: The logical plan tree to extract filters from (typically a DELETE or UPDATE plan) +/// - `target`: The target table reference to scope filter extraction (prevents multi-table filter leakage) +/// +/// # Returns +/// A vector of unqualified filter expressions that can be passed to the TableProvider for execution. +/// Returns an empty vector if no applicable filters are found. /// fn extract_dml_filters( input: &Arc, @@ -1985,13 +1999,15 @@ fn extract_dml_filters( // Deduplication is necessary because filters may appear in both Filter nodes // and TableScan.filters when the optimizer performs partial (Inexact) pushdown. let mut seen_filters = HashSet::new(); - filters.into_iter().try_fold(Vec::new(), |mut deduped, filter| { - let unqualified = strip_column_qualifiers(filter)?; - if seen_filters.insert(unqualified.clone()) { - deduped.push(unqualified); - } - Ok(deduped) - }) + filters + .into_iter() + .try_fold(Vec::new(), |mut deduped, filter| { + let unqualified = strip_column_qualifiers(filter)?; + if seen_filters.insert(unqualified.clone()) { + deduped.push(unqualified); + } + Ok(deduped) + }) } /// Determine whether a predicate references only columns from the target table. From 6d7749b2f7eaf48d40843e2015405c448b079ab1 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 20 Jan 2026 14:34:16 +0800 Subject: [PATCH 16/36] Optimize `predicate_is_on_target` to short-circuit on first mismatch --- datafusion/core/src/physical_planner.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index db9c4109cd73a..f31eeb6e67a34 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -2015,11 +2015,12 @@ fn predicate_is_on_target(expr: &Expr, target: &TableReference) -> Result let mut columns = HashSet::new(); expr_to_columns(expr, &mut columns)?; - Ok(columns.iter().all(|column| { + // Short-circuit on first mismatch: returns false if any column references a different table + Ok(!columns.iter().any(|column| { column .relation .as_ref() - .is_none_or(|relation| relation.resolved_eq(target)) + .is_some_and(|relation| !relation.resolved_eq(target)) })) } From 7aca79221188a74d7073cde201eaaeead040e46e Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 20 Jan 2026 14:35:42 +0800 Subject: [PATCH 17/36] Add error context when qualifier stripping fails --- datafusion/core/src/physical_planner.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index f31eeb6e67a34..a74efb9fb5f10 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -2002,7 +2002,12 @@ fn extract_dml_filters( filters .into_iter() .try_fold(Vec::new(), |mut deduped, filter| { - let unqualified = strip_column_qualifiers(filter)?; + let unqualified = strip_column_qualifiers(filter).map_err(|e| { + e.context(format!( + "Failed to strip column qualifiers for DML filter on table '{}'", + target + )) + })?; if seen_filters.insert(unqualified.clone()) { deduped.push(unqualified); } From 290abd7c444527fc60181ee1da2f6b23e97e875b Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 20 Jan 2026 14:39:08 +0800 Subject: [PATCH 18/36] clippy fix --- datafusion/core/src/physical_planner.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index a74efb9fb5f10..aaf8b90c2898b 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -613,7 +613,7 @@ impl DefaultPhysicalPlanner { if let Some(provider) = target.as_any().downcast_ref::() { - let filters = extract_dml_filters(input, &table_name)?; + let filters = extract_dml_filters(input, table_name)?; provider .table_provider .delete_from(session_state, filters) @@ -639,7 +639,7 @@ impl DefaultPhysicalPlanner { { // For UPDATE, the assignments are encoded in the projection of input // We pass the filters and let the provider handle the projection - let filters = extract_dml_filters(input, &table_name)?; + let filters = extract_dml_filters(input, table_name)?; // Extract assignments from the projection in input plan let assignments = extract_update_assignments(input)?; provider @@ -2004,8 +2004,7 @@ fn extract_dml_filters( .try_fold(Vec::new(), |mut deduped, filter| { let unqualified = strip_column_qualifiers(filter).map_err(|e| { e.context(format!( - "Failed to strip column qualifiers for DML filter on table '{}'", - target + "Failed to strip column qualifiers for DML filter on table '{target}'" )) })?; if seen_filters.insert(unqualified.clone()) { From bc5bb4c95b13e264b45e2b14c079d07675c5ccf4 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 20 Jan 2026 16:03:15 +0800 Subject: [PATCH 19/36] Add per-filter pushdown coverage --- .../custom_sources_cases/dml_planning.rs | 95 +++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index 8fd28ca14275f..0473b64ec4ac7 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -39,6 +39,7 @@ struct CaptureDeleteProvider { schema: SchemaRef, received_filters: Arc>>>, filter_pushdown: TableProviderFilterPushDown, + per_filter_pushdown: Option>, } impl CaptureDeleteProvider { @@ -47,6 +48,7 @@ impl CaptureDeleteProvider { schema, received_filters: Arc::new(Mutex::new(None)), filter_pushdown: TableProviderFilterPushDown::Unsupported, + per_filter_pushdown: None, } } @@ -58,6 +60,19 @@ impl CaptureDeleteProvider { schema, received_filters: Arc::new(Mutex::new(None)), filter_pushdown, + per_filter_pushdown: None, + } + } + + fn new_with_per_filter_pushdown( + schema: SchemaRef, + per_filter_pushdown: Vec, + ) -> Self { + Self { + schema, + received_filters: Arc::new(Mutex::new(None)), + filter_pushdown: TableProviderFilterPushDown::Unsupported, + per_filter_pushdown: Some(per_filter_pushdown), } } @@ -113,6 +128,12 @@ impl TableProvider for CaptureDeleteProvider { &self, filters: &[&Expr], ) -> Result> { + if let Some(per_filter) = &self.per_filter_pushdown { + if per_filter.len() == filters.len() { + return Ok(per_filter.clone()); + } + } + Ok(vec![self.filter_pushdown.clone(); filters.len()]) } } @@ -124,6 +145,7 @@ struct CaptureUpdateProvider { received_filters: Arc>>>, received_assignments: Arc>>>, filter_pushdown: TableProviderFilterPushDown, + per_filter_pushdown: Option>, } impl CaptureUpdateProvider { @@ -133,6 +155,7 @@ impl CaptureUpdateProvider { received_filters: Arc::new(Mutex::new(None)), received_assignments: Arc::new(Mutex::new(None)), filter_pushdown: TableProviderFilterPushDown::Unsupported, + per_filter_pushdown: None, } } @@ -145,6 +168,20 @@ impl CaptureUpdateProvider { received_filters: Arc::new(Mutex::new(None)), received_assignments: Arc::new(Mutex::new(None)), filter_pushdown, + per_filter_pushdown: None, + } + } + + fn new_with_per_filter_pushdown( + schema: SchemaRef, + per_filter_pushdown: Vec, + ) -> Self { + Self { + schema, + received_filters: Arc::new(Mutex::new(None)), + received_assignments: Arc::new(Mutex::new(None)), + filter_pushdown: TableProviderFilterPushDown::Unsupported, + per_filter_pushdown: Some(per_filter_pushdown), } } @@ -206,6 +243,12 @@ impl TableProvider for CaptureUpdateProvider { &self, filters: &[&Expr], ) -> Result> { + if let Some(per_filter) = &self.per_filter_pushdown { + if per_filter.len() == filters.len() { + return Ok(per_filter.clone()); + } + } + Ok(vec![self.filter_pushdown.clone(); filters.len()]) } } @@ -414,6 +457,58 @@ async fn test_delete_mixed_filter_locations() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_delete_per_filter_pushdown_mixed_locations() -> Result<()> { + // Force per-filter pushdown decisions to exercise mixed locations in one query. + // First predicate is pushed down (Exact), second stays as residual (Unsupported). + let provider = Arc::new(CaptureDeleteProvider::new_with_per_filter_pushdown( + test_schema(), + vec![ + TableProviderFilterPushDown::Exact, + TableProviderFilterPushDown::Unsupported, + ], + )); + + let ctx = SessionContext::new(); + ctx.register_table("t", Arc::clone(&provider) as Arc)?; + + let df = ctx + .sql("DELETE FROM t WHERE id = 1 AND status = 'active'") + .await?; + let optimized_plan = df.clone().into_optimized_plan()?; + + // Only the first predicate should be pushed to TableScan.filters. + let mut scan_filters = Vec::new(); + optimized_plan.apply(|node| { + if let LogicalPlan::TableScan(TableScan { filters, .. }) = node { + scan_filters.extend(filters.clone()); + } + Ok(TreeNodeRecursion::Continue) + })?; + assert_eq!(scan_filters.len(), 1); + assert!(scan_filters[0].to_string().contains("id")); + + // Both predicates should still reach the provider (union + dedup behavior). + df.collect().await?; + + let filters = provider + .captured_filters() + .expect("filters should be captured"); + assert_eq!(filters.len(), 2); + + let filter_strs: Vec = filters.iter().map(|f| f.to_string()).collect(); + assert!( + filter_strs.iter().any(|s| s.contains("id")), + "should contain pushed-down id filter" + ); + assert!( + filter_strs.iter().any(|s| s.contains("status")), + "should contain residual status filter" + ); + + Ok(()) +} + #[tokio::test] async fn test_update_assignments() -> Result<()> { let provider = Arc::new(CaptureUpdateProvider::new(test_schema())); From 84f77cf8a5451db6e9fb830ada22b19f082b6e04 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 20 Jan 2026 16:09:38 +0800 Subject: [PATCH 20/36] cargo fmt --- .../core/tests/custom_sources_cases/dml_planning.rs | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index 0473b64ec4ac7..b4a53d198cf04 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -172,19 +172,6 @@ impl CaptureUpdateProvider { } } - fn new_with_per_filter_pushdown( - schema: SchemaRef, - per_filter_pushdown: Vec, - ) -> Self { - Self { - schema, - received_filters: Arc::new(Mutex::new(None)), - received_assignments: Arc::new(Mutex::new(None)), - filter_pushdown: TableProviderFilterPushDown::Unsupported, - per_filter_pushdown: Some(per_filter_pushdown), - } - } - fn captured_filters(&self) -> Option> { self.received_filters.lock().unwrap().clone() } From cbd3b2ef41785ead4b58f00a80a5d7f7340be0ee Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 20 Jan 2026 16:15:20 +0800 Subject: [PATCH 21/36] clippy fix Refactor filter pushdown logic in CaptureDeleteProvider and CaptureUpdateProvider --- .../tests/custom_sources_cases/dml_planning.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index b4a53d198cf04..e93f1a4c33b25 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -128,10 +128,10 @@ impl TableProvider for CaptureDeleteProvider { &self, filters: &[&Expr], ) -> Result> { - if let Some(per_filter) = &self.per_filter_pushdown { - if per_filter.len() == filters.len() { - return Ok(per_filter.clone()); - } + if let Some(per_filter) = &self.per_filter_pushdown + && per_filter.len() == filters.len() + { + return Ok(per_filter.clone()); } Ok(vec![self.filter_pushdown.clone(); filters.len()]) @@ -230,10 +230,10 @@ impl TableProvider for CaptureUpdateProvider { &self, filters: &[&Expr], ) -> Result> { - if let Some(per_filter) = &self.per_filter_pushdown { - if per_filter.len() == filters.len() { - return Ok(per_filter.clone()); - } + if let Some(per_filter) = &self.per_filter_pushdown + && per_filter.len() == filters.len() + { + return Ok(per_filter.clone()); } Ok(vec![self.filter_pushdown.clone(); filters.len()]) From 34f802cf971778ceacffa675b592d3b0e96ac2ca Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 22 Jan 2026 16:08:59 +0800 Subject: [PATCH 22/36] Use t2-only column in test_update_from_drops_non_target_predicates Address review feedback from ethan-tyler: since t2 shares column names with t1, the 'no t2 references' check can false-negative after qualifier stripping. Add a t2-only column 'src_only' to detect leakage reliably. --- datafusion/core/tests/custom_sources_cases/dml_planning.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index e93f1a4c33b25..4d4d76e32b4b5 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -666,13 +666,15 @@ async fn test_update_from_drops_non_target_predicates() -> Result<()> { let source_schema = Arc::new(Schema::new(vec![ Field::new("id", DataType::Int32, false), Field::new("status", DataType::Utf8, true), + // t2-only column to avoid false negatives after qualifier stripping + Field::new("src_only", DataType::Utf8, true), ])); let source_table = datafusion::datasource::empty::EmptyTable::new(source_schema); ctx.register_table("t2", Arc::new(source_table))?; ctx.sql( "UPDATE t1 SET value = 1 FROM t2 \ - WHERE t1.id = t2.id AND t2.status = 'active' AND t1.value > 10", + WHERE t1.id = t2.id AND t2.src_only = 'active' AND t1.value > 10", ) .await? .collect() From 7ad749ec9247a241cd447e4b36d2b39e3ee703a2 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 22 Jan 2026 16:11:04 +0800 Subject: [PATCH 23/36] Enhance DML filter extraction to enforce fail-closed behavior on non-target predicates Replace the permissive predicate_is_on_target check with a two-pass approach: 1. First pass: collect all allowed table references (target + aliases) 2. Second pass: extract filters while enforcing fail-closed behavior Changes: - Rename predicate_is_on_target to predicate_is_on_allowed_refs - Accept a HashSet of allowed references instead of a single target - Invert logic: use .all() instead of .any() to verify all columns are allowed - Return error when any conjunct references non-target tables - Update test to expect failure on mixed-target predicates This prevents silent filter dropping and ensures DML operations fail explicitly when predicates reference multiple tables, avoiding incorrect execution. --- datafusion/core/src/physical_planner.rs | 53 ++++++++++++++----- .../custom_sources_cases/dml_planning.rs | 45 ++++++---------- 2 files changed, 57 insertions(+), 41 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index aaf8b90c2898b..3fbd30e55a027 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1931,17 +1931,39 @@ fn extract_dml_filters( input: &Arc, target: &TableReference, ) -> Result> { + // First pass: collect all allowed table references for the target, including aliases. + let mut allowed_refs: HashSet = HashSet::new(); + allowed_refs.insert(target.clone()); + + input.apply(|node| { + if let LogicalPlan::TableScan(TableScan { table_name, .. }) = node { + if table_name.resolved_eq(target) { + allowed_refs.insert(table_name.clone()); + } + } + Ok(TreeNodeRecursion::Continue) + })?; + + // Second pass: extract filters while enforcing fail-closed behavior on non-target predicates let mut filters = Vec::new(); input.apply(|node| { match node { LogicalPlan::Filter(filter) => { - // Split AND predicates into individual expressions - for predicate in split_conjunction(&filter.predicate) { - if predicate_is_on_target(predicate, target)? { - filters.push(predicate.clone()); - } + let conjuncts = split_conjunction(&filter.predicate); + // If any conjunct is not on the target (incl. alias), fail closed + let any_non_target = conjuncts + .iter() + .try_fold(false, |acc, pred| { + predicate_is_on_allowed_refs(pred, &allowed_refs) + .map(|on_target| acc || !on_target) + })?; + if any_non_target { + return exec_err!( + "DML predicate references non-target tables; mixed-target predicates are not supported for fast-path DML" + ); } + filters.extend(conjuncts.into_iter().cloned()); } LogicalPlan::TableScan(TableScan { table_name, @@ -2014,17 +2036,22 @@ fn extract_dml_filters( }) } -/// Determine whether a predicate references only columns from the target table. -fn predicate_is_on_target(expr: &Expr, target: &TableReference) -> Result { +/// Determine whether a predicate references only columns from the target table +/// or any of its aliases observed in the logical plan. +fn predicate_is_on_allowed_refs( + expr: &Expr, + allowed_refs: &HashSet, +) -> Result { let mut columns = HashSet::new(); expr_to_columns(expr, &mut columns)?; - // Short-circuit on first mismatch: returns false if any column references a different table - Ok(!columns.iter().any(|column| { - column - .relation - .as_ref() - .is_some_and(|relation| !relation.resolved_eq(target)) + Ok(columns.iter().all(|column| { + match &column.relation { + None => true, // Unqualified columns are allowed (single-target scenarios) + Some(relation) => allowed_refs + .iter() + .any(|allowed| relation.resolved_eq(allowed)), + } })) } diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index 4d4d76e32b4b5..02ac7c1af6870 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -655,7 +655,7 @@ async fn test_delete_target_table_scoping() -> Result<()> { } #[tokio::test] -async fn test_update_from_drops_non_target_predicates() -> Result<()> { +async fn test_update_from_rejects_mixed_target_predicates() -> Result<()> { let target_provider = Arc::new(CaptureUpdateProvider::new_with_filter_pushdown( test_schema(), TableProviderFilterPushDown::Exact, @@ -672,35 +672,24 @@ async fn test_update_from_drops_non_target_predicates() -> Result<()> { let source_table = datafusion::datasource::empty::EmptyTable::new(source_schema); ctx.register_table("t2", Arc::new(source_table))?; - ctx.sql( - "UPDATE t1 SET value = 1 FROM t2 \ - WHERE t1.id = t2.id AND t2.src_only = 'active' AND t1.value > 10", - ) - .await? - .collect() - .await?; - - let filters = target_provider - .captured_filters() - .expect("filters should be captured"); - assert!( - !filters.is_empty(), - "expected target predicates extracted from UPDATE ... FROM" - ); - - let has_t2_reference = filters.iter().try_fold(false, |found, expr| { - expr_has_table_reference(expr, "t2").map(|has_ref| found || has_ref) - })?; - assert!( - !has_t2_reference, - "filters should only include target-table predicates" - ); - - let filter_strs: Vec = filters.iter().map(|f| f.to_string()).collect(); + let df = ctx + .sql( + "UPDATE t1 SET value = 1 FROM t2 \ + WHERE t1.id = t2.id AND t2.src_only = 'active' AND t1.value > 10", + ) + .await?; + let res = df.collect().await; assert!( - filter_strs.iter().any(|s| s.contains("value")), - "expected target-table predicate to be retained" + res.is_err(), + "expected fail-closed on mixed-target predicates" ); + if let Err(e) = res { + let msg = e.to_string(); + assert!( + msg.contains("mixed-target predicates") || msg.contains("non-target tables"), + "unexpected error message: {msg}" + ); + } Ok(()) } From de794b6bd0f3c10e10bdd7e1e7c85fcaf15be640 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 22 Jan 2026 18:41:18 +0800 Subject: [PATCH 24/36] Revert "Enhance DML filter extraction to enforce fail-closed behavior on non-target predicates" This reverts commit 7ad749ec9247a241cd447e4b36d2b39e3ee703a2. --- datafusion/core/src/physical_planner.rs | 53 +++++-------------- .../custom_sources_cases/dml_planning.rs | 45 ++++++++++------ 2 files changed, 41 insertions(+), 57 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 3fbd30e55a027..aaf8b90c2898b 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1931,39 +1931,17 @@ fn extract_dml_filters( input: &Arc, target: &TableReference, ) -> Result> { - // First pass: collect all allowed table references for the target, including aliases. - let mut allowed_refs: HashSet = HashSet::new(); - allowed_refs.insert(target.clone()); - - input.apply(|node| { - if let LogicalPlan::TableScan(TableScan { table_name, .. }) = node { - if table_name.resolved_eq(target) { - allowed_refs.insert(table_name.clone()); - } - } - Ok(TreeNodeRecursion::Continue) - })?; - - // Second pass: extract filters while enforcing fail-closed behavior on non-target predicates let mut filters = Vec::new(); input.apply(|node| { match node { LogicalPlan::Filter(filter) => { - let conjuncts = split_conjunction(&filter.predicate); - // If any conjunct is not on the target (incl. alias), fail closed - let any_non_target = conjuncts - .iter() - .try_fold(false, |acc, pred| { - predicate_is_on_allowed_refs(pred, &allowed_refs) - .map(|on_target| acc || !on_target) - })?; - if any_non_target { - return exec_err!( - "DML predicate references non-target tables; mixed-target predicates are not supported for fast-path DML" - ); + // Split AND predicates into individual expressions + for predicate in split_conjunction(&filter.predicate) { + if predicate_is_on_target(predicate, target)? { + filters.push(predicate.clone()); + } } - filters.extend(conjuncts.into_iter().cloned()); } LogicalPlan::TableScan(TableScan { table_name, @@ -2036,22 +2014,17 @@ fn extract_dml_filters( }) } -/// Determine whether a predicate references only columns from the target table -/// or any of its aliases observed in the logical plan. -fn predicate_is_on_allowed_refs( - expr: &Expr, - allowed_refs: &HashSet, -) -> Result { +/// Determine whether a predicate references only columns from the target table. +fn predicate_is_on_target(expr: &Expr, target: &TableReference) -> Result { let mut columns = HashSet::new(); expr_to_columns(expr, &mut columns)?; - Ok(columns.iter().all(|column| { - match &column.relation { - None => true, // Unqualified columns are allowed (single-target scenarios) - Some(relation) => allowed_refs - .iter() - .any(|allowed| relation.resolved_eq(allowed)), - } + // Short-circuit on first mismatch: returns false if any column references a different table + Ok(!columns.iter().any(|column| { + column + .relation + .as_ref() + .is_some_and(|relation| !relation.resolved_eq(target)) })) } diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index 02ac7c1af6870..4d4d76e32b4b5 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -655,7 +655,7 @@ async fn test_delete_target_table_scoping() -> Result<()> { } #[tokio::test] -async fn test_update_from_rejects_mixed_target_predicates() -> Result<()> { +async fn test_update_from_drops_non_target_predicates() -> Result<()> { let target_provider = Arc::new(CaptureUpdateProvider::new_with_filter_pushdown( test_schema(), TableProviderFilterPushDown::Exact, @@ -672,24 +672,35 @@ async fn test_update_from_rejects_mixed_target_predicates() -> Result<()> { let source_table = datafusion::datasource::empty::EmptyTable::new(source_schema); ctx.register_table("t2", Arc::new(source_table))?; - let df = ctx - .sql( - "UPDATE t1 SET value = 1 FROM t2 \ - WHERE t1.id = t2.id AND t2.src_only = 'active' AND t1.value > 10", - ) - .await?; - let res = df.collect().await; + ctx.sql( + "UPDATE t1 SET value = 1 FROM t2 \ + WHERE t1.id = t2.id AND t2.src_only = 'active' AND t1.value > 10", + ) + .await? + .collect() + .await?; + + let filters = target_provider + .captured_filters() + .expect("filters should be captured"); assert!( - res.is_err(), - "expected fail-closed on mixed-target predicates" + !filters.is_empty(), + "expected target predicates extracted from UPDATE ... FROM" + ); + + let has_t2_reference = filters.iter().try_fold(false, |found, expr| { + expr_has_table_reference(expr, "t2").map(|has_ref| found || has_ref) + })?; + assert!( + !has_t2_reference, + "filters should only include target-table predicates" + ); + + let filter_strs: Vec = filters.iter().map(|f| f.to_string()).collect(); + assert!( + filter_strs.iter().any(|s| s.contains("value")), + "expected target-table predicate to be retained" ); - if let Err(e) = res { - let msg = e.to_string(); - assert!( - msg.contains("mixed-target predicates") || msg.contains("non-target tables"), - "unexpected error message: {msg}" - ); - } Ok(()) } From b34c612e99c1e2d4d7064c5c465795ff349409f6 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 22 Jan 2026 18:38:09 +0800 Subject: [PATCH 25/36] Enhance DML filter extraction to support aliases for target table references --- datafusion/core/src/physical_planner.rs | 41 ++++++++++++++++++++----- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index aaf8b90c2898b..35a88315552a5 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1932,13 +1932,27 @@ fn extract_dml_filters( target: &TableReference, ) -> Result> { let mut filters = Vec::new(); + let mut allowed_refs = vec![target.clone()]; + + // First pass: collect any alias references to the target table + input.apply(|node| { + if let LogicalPlan::SubqueryAlias(alias) = node { + // Check if this alias points to the target table + if let LogicalPlan::TableScan(scan) = alias.input.as_ref() { + if scan.table_name.resolved_eq(target) { + allowed_refs.push(TableReference::bare(alias.alias.to_string())); + } + } + } + Ok(TreeNodeRecursion::Continue) + })?; input.apply(|node| { match node { LogicalPlan::Filter(filter) => { // Split AND predicates into individual expressions for predicate in split_conjunction(&filter.predicate) { - if predicate_is_on_target(predicate, target)? { + if predicate_is_on_target_multi(predicate, &allowed_refs)? { filters.push(predicate.clone()); } } @@ -2014,17 +2028,28 @@ fn extract_dml_filters( }) } -/// Determine whether a predicate references only columns from the target table. -fn predicate_is_on_target(expr: &Expr, target: &TableReference) -> Result { +/// Determine whether a predicate references only columns from the target table +/// or its aliases. +/// +/// Columns may be qualified with the target table name or any of its aliases. +/// Unqualified columns are also accepted as they implicitly belong to the target table. +fn predicate_is_on_target_multi( + expr: &Expr, + allowed_refs: &[TableReference], +) -> Result { let mut columns = HashSet::new(); expr_to_columns(expr, &mut columns)?; - // Short-circuit on first mismatch: returns false if any column references a different table + // Short-circuit on first mismatch: returns false if any column references a table not in allowed_refs. + // Columns are accepted if: + // 1. They are unqualified (no relation specified), OR + // 2. Their relation matches one of the allowed table references using resolved equality Ok(!columns.iter().any(|column| { - column - .relation - .as_ref() - .is_some_and(|relation| !relation.resolved_eq(target)) + column.relation.as_ref().is_some_and(|relation| { + !allowed_refs + .iter() + .any(|allowed| relation.resolved_eq(allowed)) + }) })) } From 1f0b0959d77ef089471219c691a7486a1f3942a4 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 22 Jan 2026 19:40:55 +0800 Subject: [PATCH 26/36] clippy fix --- datafusion/core/src/physical_planner.rs | 11 ++++---- .../custom_sources_cases/dml_planning.rs | 25 ++++++++----------- 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 92132a64ed913..22f4e54846e9b 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1965,13 +1965,12 @@ fn extract_dml_filters( // First pass: collect any alias references to the target table input.apply(|node| { - if let LogicalPlan::SubqueryAlias(alias) = node { + if let LogicalPlan::SubqueryAlias(alias) = node // Check if this alias points to the target table - if let LogicalPlan::TableScan(scan) = alias.input.as_ref() { - if scan.table_name.resolved_eq(target) { - allowed_refs.push(TableReference::bare(alias.alias.to_string())); - } - } + && let LogicalPlan::TableScan(scan) = alias.input.as_ref() + && scan.table_name.resolved_eq(target) + { + allowed_refs.push(TableReference::bare(alias.alias.to_string())); } Ok(TreeNodeRecursion::Continue) })?; diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index e52d3a1b128f0..ba49a058c81ba 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -24,20 +24,13 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use async_trait::async_trait; use datafusion::datasource::{TableProvider, TableType}; use datafusion::error::Result; -<<<<<<< HEAD -use datafusion::execution::context::SessionContext; +use datafusion::execution::context::{SessionConfig, SessionContext}; use datafusion::logical_expr::{ Expr, LogicalPlan, TableProviderFilterPushDown, TableScan, }; use datafusion_catalog::Session; -use datafusion_common::TableReference; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; -======= -use datafusion::execution::context::{SessionConfig, SessionContext}; -use datafusion::logical_expr::Expr; -use datafusion_catalog::Session; -use datafusion_common::ScalarValue; ->>>>>>> main +use datafusion_common::{ScalarValue, TableReference}; use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::empty::EmptyExec; @@ -587,7 +580,6 @@ async fn test_update_assignments() -> Result<()> { } #[tokio::test] -<<<<<<< HEAD async fn test_update_filter_pushdown_extracts_table_scan_filters() -> Result<()> { let provider = Arc::new(CaptureUpdateProvider::new_with_filter_pushdown( test_schema(), @@ -658,7 +650,10 @@ async fn test_update_filter_pushdown_passes_table_scan_filters() -> Result<()> { !filters.is_empty(), "expected filters extracted from TableScan during UPDATE" ); -======= + Ok(()) +} + +#[tokio::test] async fn test_truncate_calls_provider() -> Result<()> { let provider = Arc::new(CaptureTruncateProvider::new(test_schema())); let config = SessionConfig::new().set( @@ -677,7 +672,6 @@ async fn test_truncate_calls_provider() -> Result<()> { "truncate() should be called on the TableProvider" ); ->>>>>>> main Ok(()) } @@ -709,7 +703,6 @@ async fn test_unsupported_table_update() -> Result<()> { } #[tokio::test] -<<<<<<< HEAD async fn test_delete_target_table_scoping() -> Result<()> { // Test that DELETE only extracts filters from the target table, // not from other tables (important for DELETE...FROM safety) @@ -832,7 +825,10 @@ async fn test_delete_qualifier_stripping_and_validation() -> Result<()> { filter_str.contains("id") || filter_str.contains("1"), "Filter should reference id column or the value 1, got: {filter_str}" ); -======= + Ok(()) +} + +#[tokio::test] async fn test_unsupported_table_truncate() -> Result<()> { let schema = test_schema(); let ctx = SessionContext::new(); @@ -844,6 +840,5 @@ async fn test_unsupported_table_truncate() -> Result<()> { assert!(result.is_err() || result.unwrap().collect().await.is_err()); ->>>>>>> main Ok(()) } From 20a454cf9d594210960ac30fb0bfc7dad5fb8f6e Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 22 Jan 2026 22:33:25 +0800 Subject: [PATCH 27/36] Add tests for updates from another table with and without aliases --- datafusion/sqllogictest/test_files/update.slt | 43 +++++++++++++++++-- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/datafusion/sqllogictest/test_files/update.slt b/datafusion/sqllogictest/test_files/update.slt index a652ae7633e44..a1928d241f8ae 100644 --- a/datafusion/sqllogictest/test_files/update.slt +++ b/datafusion/sqllogictest/test_files/update.slt @@ -81,8 +81,22 @@ physical_plan 01)CooperativeExec 02)--DmlResultExec: rows_affected=0 +# test update from other table with actual data statement ok -create table t3(a int, b varchar, c double, d int); +insert into t1 values (1, 'zoo', 2.0, 10), (2, 'qux', 3.0, 20), (3, 'bar', 4.0, 30); + +statement ok +insert into t2 values (1, 'updated_b', 5.0, 40), (2, 'updated_b2', 2.5, 50), (4, 'updated_b3', 1.5, 60); + +statement ok +update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b > 'foo' and t2.c > 1.0; + +query ITRI +select * from t1 order by a; +---- +1 updated_b 1 1 +2 updated_b2 2 1 +3 bar 4 30 # set from multiple tables, DataFusion only supports from one table query error DataFusion error: Error during planning: Multiple tables in UPDATE SET FROM not yet supported @@ -96,10 +110,33 @@ logical_plan 01)Dml: op=[Update] table=[t1] 02)--Projection: t.a AS a, t2.b AS b, CAST(t.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d 03)----Filter: t.a = t2.a AND t.b > CAST(Utf8("foo") AS Utf8View) AND t2.c > Float64(1) -04)------Cross Join: +04)------Cross Join: 05)--------SubqueryAlias: t 06)----------TableScan: t1 07)--------TableScan: t2 physical_plan 01)CooperativeExec -02)--DmlResultExec: rows_affected=0 +02)--DmlResultExec: rows_affected=1 + +# test update with table alias with actual data +statement ok +delete from t1; + +statement ok +delete from t2; + +statement ok +insert into t1 values (1, 'zebra', 1.5, 5), (2, 'wolf', 2.0, 10), (3, 'apple', 3.5, 15); + +statement ok +insert into t2 values (1, 'new_val', 2.0, 100), (2, 'new_val2', 1.5, 200); + +statement ok +update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 'foo' and t2.c > 1.0; + +query ITRI +select * from t1 order by a; +---- +1 new_val 1 1 +2 new_val2 2 1 +3 apple 3.5 15 From 3e0898fc311cca9224da9237bc2f4b0976bc4635 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 23 Jan 2026 12:08:33 +0800 Subject: [PATCH 28/36] Update tests to reflect unsupported UPDATE ... FROM syntax for TableProvider-backed tables --- datafusion/sqllogictest/test_files/update.slt | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/datafusion/sqllogictest/test_files/update.slt b/datafusion/sqllogictest/test_files/update.slt index a1928d241f8ae..0223eaef6d1cd 100644 --- a/datafusion/sqllogictest/test_files/update.slt +++ b/datafusion/sqllogictest/test_files/update.slt @@ -88,15 +88,11 @@ insert into t1 values (1, 'zoo', 2.0, 10), (2, 'qux', 3.0, 20), (3, 'bar', 4.0, statement ok insert into t2 values (1, 'updated_b', 5.0, 40), (2, 'updated_b2', 2.5, 50), (4, 'updated_b3', 1.5, 60); -statement ok +statement error DataFusion error: Error during planning: UPDATE ... FROM is not supported for TableProvider-backed tables update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b > 'foo' and t2.c > 1.0; -query ITRI +query error DataFusion error: Error during planning: UPDATE ... FROM is not supported for TableProvider-backed tables select * from t1 order by a; ----- -1 updated_b 1 1 -2 updated_b2 2 1 -3 bar 4 30 # set from multiple tables, DataFusion only supports from one table query error DataFusion error: Error during planning: Multiple tables in UPDATE SET FROM not yet supported @@ -131,12 +127,8 @@ insert into t1 values (1, 'zebra', 1.5, 5), (2, 'wolf', 2.0, 10), (3, 'apple', 3 statement ok insert into t2 values (1, 'new_val', 2.0, 100), (2, 'new_val2', 1.5, 200); -statement ok +statement error DataFusion error: Error during planning: UPDATE ... FROM is not supported for TableProvider-backed tables update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 'foo' and t2.c > 1.0; -query ITRI +query error DataFusion error: Error during planning: UPDATE ... FROM is not supported for TableProvider-backed tables select * from t1 order by a; ----- -1 new_val 1 1 -2 new_val2 2 1 -3 apple 3.5 15 From 4afb705a186e590ea527c73da9de6a3c63d06754 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 23 Jan 2026 12:21:36 +0800 Subject: [PATCH 29/36] Implement error handling for unsupported UPDATE ... FROM syntax --- .../custom_sources_cases/dml_planning.rs | 40 ++++++----------- datafusion/sql/src/statement.rs | 8 ++++ datafusion/sqllogictest/test_files/update.slt | 43 +++++-------------- 3 files changed, 32 insertions(+), 59 deletions(-) diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index ba49a058c81ba..2f80b27ac010b 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -738,6 +738,8 @@ async fn test_delete_target_table_scoping() -> Result<()> { #[tokio::test] async fn test_update_from_drops_non_target_predicates() -> Result<()> { + // UPDATE ... FROM is currently unsupported due to design limitations + // See FIX_UPDATE.md and FIX_UPDATE_2.md for details let target_provider = Arc::new(CaptureUpdateProvider::new_with_filter_pushdown( test_schema(), TableProviderFilterPushDown::Exact, @@ -754,34 +756,20 @@ async fn test_update_from_drops_non_target_predicates() -> Result<()> { let source_table = datafusion::datasource::empty::EmptyTable::new(source_schema); ctx.register_table("t2", Arc::new(source_table))?; - ctx.sql( - "UPDATE t1 SET value = 1 FROM t2 \ - WHERE t1.id = t2.id AND t2.src_only = 'active' AND t1.value > 10", - ) - .await? - .collect() - .await?; + let result = ctx + .sql( + "UPDATE t1 SET value = 1 FROM t2 \ + WHERE t1.id = t2.id AND t2.src_only = 'active' AND t1.value > 10", + ) + .await; - let filters = target_provider - .captured_filters() - .expect("filters should be captured"); - assert!( - !filters.is_empty(), - "expected target predicates extracted from UPDATE ... FROM" - ); - - let has_t2_reference = filters.iter().try_fold(false, |found, expr| { - expr_has_table_reference(expr, "t2").map(|has_ref| found || has_ref) - })?; - assert!( - !has_t2_reference, - "filters should only include target-table predicates" - ); - - let filter_strs: Vec = filters.iter().map(|f| f.to_string()).collect(); + // Verify UPDATE ... FROM is rejected with appropriate error + assert!(result.is_err()); + let err = result.unwrap_err(); assert!( - filter_strs.iter().any(|s| s.contains("value")), - "expected target-table predicate to be retained" + err.to_string().contains("UPDATE ... FROM is not supported"), + "Expected 'UPDATE ... FROM is not supported' error, got: {}", + err ); Ok(()) } diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 4981db5537a74..512ce22c1183c 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -1070,6 +1070,14 @@ impl SqlToRel<'_, S> { plan_err!("Multiple tables in UPDATE SET FROM not yet supported")?; } let update_from = from_clauses.and_then(|mut f| f.pop()); + + // UPDATE ... FROM is currently unsupported due to design limitations + // See FIX_UPDATE.md and FIX_UPDATE_2.md for details + // Qualifier stripping breaks source column references, causing incorrect behavior + if update_from.is_some() { + return not_impl_err!("UPDATE ... FROM is not supported"); + } + if returning.is_some() { plan_err!("Update-returning clause not yet supported")?; } diff --git a/datafusion/sqllogictest/test_files/update.slt b/datafusion/sqllogictest/test_files/update.slt index 0223eaef6d1cd..d5008bf9b7649 100644 --- a/datafusion/sqllogictest/test_files/update.slt +++ b/datafusion/sqllogictest/test_files/update.slt @@ -67,19 +67,9 @@ logical_plan physical_plan_error This feature is not implemented: Physical plan does not support logical expression ScalarSubquery() # set from other table -query TT +# UPDATE ... FROM is currently unsupported +query error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported explain update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b > 'foo' and t2.c > 1.0; ----- -logical_plan -01)Dml: op=[Update] table=[t1] -02)--Projection: t1.a AS a, t2.b AS b, CAST(t2.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d -03)----Filter: t1.a = t2.a AND t1.b > CAST(Utf8("foo") AS Utf8View) AND t2.c > Float64(1) -04)------Cross Join: -05)--------TableScan: t1 -06)--------TableScan: t2 -physical_plan -01)CooperativeExec -02)--DmlResultExec: rows_affected=0 # test update from other table with actual data statement ok @@ -88,31 +78,20 @@ insert into t1 values (1, 'zoo', 2.0, 10), (2, 'qux', 3.0, 20), (3, 'bar', 4.0, statement ok insert into t2 values (1, 'updated_b', 5.0, 40), (2, 'updated_b2', 2.5, 50), (4, 'updated_b3', 1.5, 60); -statement error DataFusion error: Error during planning: UPDATE ... FROM is not supported for TableProvider-backed tables +# UPDATE ... FROM is currently unsupported - qualifier stripping breaks source column references +# causing assignments like 'b = t2.b' to resolve to target table's 'b' instead of source table's 'b' +# See FIX_UPDATE.md and FIX_UPDATE_2.md for details +statement error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b > 'foo' and t2.c > 1.0; -query error DataFusion error: Error during planning: UPDATE ... FROM is not supported for TableProvider-backed tables -select * from t1 order by a; - # set from multiple tables, DataFusion only supports from one table query error DataFusion error: Error during planning: Multiple tables in UPDATE SET FROM not yet supported explain update t1 set b = t2.b, c = t3.a, d = 1 from t2, t3 where t1.a = t2.a and t1.a = t3.a; # test table alias -query TT +# UPDATE ... FROM is currently unsupported +query error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported explain update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 'foo' and t2.c > 1.0; ----- -logical_plan -01)Dml: op=[Update] table=[t1] -02)--Projection: t.a AS a, t2.b AS b, CAST(t.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d -03)----Filter: t.a = t2.a AND t.b > CAST(Utf8("foo") AS Utf8View) AND t2.c > Float64(1) -04)------Cross Join: -05)--------SubqueryAlias: t -06)----------TableScan: t1 -07)--------TableScan: t2 -physical_plan -01)CooperativeExec -02)--DmlResultExec: rows_affected=1 # test update with table alias with actual data statement ok @@ -127,8 +106,6 @@ insert into t1 values (1, 'zebra', 1.5, 5), (2, 'wolf', 2.0, 10), (3, 'apple', 3 statement ok insert into t2 values (1, 'new_val', 2.0, 100), (2, 'new_val2', 1.5, 200); -statement error DataFusion error: Error during planning: UPDATE ... FROM is not supported for TableProvider-backed tables +# UPDATE ... FROM is currently unsupported +statement error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 'foo' and t2.c > 1.0; - -query error DataFusion error: Error during planning: UPDATE ... FROM is not supported for TableProvider-backed tables -select * from t1 order by a; From e0e198afdf856f41599dd25d3e279597bd9eaabc Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 23 Jan 2026 12:25:00 +0800 Subject: [PATCH 30/36] Remove unused function expr_has_table_reference from dml_planning.rs --- .../core/tests/custom_sources_cases/dml_planning.rs | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index 2f80b27ac010b..80eaa4b77a4c8 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -308,19 +308,6 @@ fn test_schema() -> SchemaRef { ])) } -fn expr_has_table_reference(expr: &Expr, table: &str) -> Result { - let reference = TableReference::bare(table); - expr.exists(|node| { - Ok(matches!( - node, - Expr::Column(column) - if column.relation.as_ref().is_some_and(|relation| { - relation.resolved_eq(&reference) - }) - )) - }) -} - #[tokio::test] async fn test_delete_single_filter() -> Result<()> { let provider = Arc::new(CaptureDeleteProvider::new(test_schema())); From 1bd888bfba834f908d08ef9bf9dd3ae993e3721a Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 23 Jan 2026 13:15:11 +0800 Subject: [PATCH 31/36] Update comments to clarify the unsupported status of UPDATE ... FROM syntax --- datafusion/core/tests/custom_sources_cases/dml_planning.rs | 4 ++-- datafusion/sql/src/statement.rs | 5 ++--- datafusion/sqllogictest/test_files/update.slt | 4 +++- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index 80eaa4b77a4c8..2aed6ee5b411e 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -725,8 +725,8 @@ async fn test_delete_target_table_scoping() -> Result<()> { #[tokio::test] async fn test_update_from_drops_non_target_predicates() -> Result<()> { - // UPDATE ... FROM is currently unsupported due to design limitations - // See FIX_UPDATE.md and FIX_UPDATE_2.md for details + // UPDATE ... FROM is currently not working + // TODO fix https://github.com/apache/datafusion/issues/19950 let target_provider = Arc::new(CaptureUpdateProvider::new_with_filter_pushdown( test_schema(), TableProviderFilterPushDown::Exact, diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 512ce22c1183c..6956af70a80da 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -1071,9 +1071,8 @@ impl SqlToRel<'_, S> { } let update_from = from_clauses.and_then(|mut f| f.pop()); - // UPDATE ... FROM is currently unsupported due to design limitations - // See FIX_UPDATE.md and FIX_UPDATE_2.md for details - // Qualifier stripping breaks source column references, causing incorrect behavior + // UPDATE ... FROM is currently not working + // TODO fix https://github.com/apache/datafusion/issues/19950 if update_from.is_some() { return not_impl_err!("UPDATE ... FROM is not supported"); } diff --git a/datafusion/sqllogictest/test_files/update.slt b/datafusion/sqllogictest/test_files/update.slt index d5008bf9b7649..a4e88066104fe 100644 --- a/datafusion/sqllogictest/test_files/update.slt +++ b/datafusion/sqllogictest/test_files/update.slt @@ -80,7 +80,7 @@ insert into t2 values (1, 'updated_b', 5.0, 40), (2, 'updated_b2', 2.5, 50), (4, # UPDATE ... FROM is currently unsupported - qualifier stripping breaks source column references # causing assignments like 'b = t2.b' to resolve to target table's 'b' instead of source table's 'b' -# See FIX_UPDATE.md and FIX_UPDATE_2.md for details +# TODO fix https://github.com/apache/datafusion/issues/19950 statement error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b > 'foo' and t2.c > 1.0; @@ -90,6 +90,7 @@ explain update t1 set b = t2.b, c = t3.a, d = 1 from t2, t3 where t1.a = t2.a an # test table alias # UPDATE ... FROM is currently unsupported +# TODO fix https://github.com/apache/datafusion/issues/19950 query error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported explain update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 'foo' and t2.c > 1.0; @@ -107,5 +108,6 @@ statement ok insert into t2 values (1, 'new_val', 2.0, 100), (2, 'new_val2', 1.5, 200); # UPDATE ... FROM is currently unsupported +# TODO fix https://github.com/apache/datafusion/issues/19950 statement error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 'foo' and t2.c > 1.0; From 354cff3828837cf4b72c9414ff57ce4d056c3680 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 23 Jan 2026 13:18:52 +0800 Subject: [PATCH 32/36] Remove unused import of TableReference in dml_planning.rs --- datafusion/core/tests/custom_sources_cases/dml_planning.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index 2aed6ee5b411e..eb5a83f721807 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -29,8 +29,8 @@ use datafusion::logical_expr::{ Expr, LogicalPlan, TableProviderFilterPushDown, TableScan, }; use datafusion_catalog::Session; +use datafusion_common::ScalarValue; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; -use datafusion_common::{ScalarValue, TableReference}; use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::empty::EmptyExec; From 4698c5f0d7bc44c3c54d6a29953c7f3083b841f4 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 23 Jan 2026 13:29:11 +0800 Subject: [PATCH 33/36] Add TODO comments for fixing unsupported UPDATE ... FROM syntax --- datafusion/core/tests/custom_sources_cases/dml_planning.rs | 1 + datafusion/sqllogictest/test_files/update.slt | 1 + 2 files changed, 2 insertions(+) diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index eb5a83f721807..3195c63118d9e 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -751,6 +751,7 @@ async fn test_update_from_drops_non_target_predicates() -> Result<()> { .await; // Verify UPDATE ... FROM is rejected with appropriate error + // TODO fix https://github.com/apache/datafusion/issues/19950 assert!(result.is_err()); let err = result.unwrap_err(); assert!( diff --git a/datafusion/sqllogictest/test_files/update.slt b/datafusion/sqllogictest/test_files/update.slt index a4e88066104fe..37f6e7513970a 100644 --- a/datafusion/sqllogictest/test_files/update.slt +++ b/datafusion/sqllogictest/test_files/update.slt @@ -68,6 +68,7 @@ physical_plan_error This feature is not implemented: Physical plan does not supp # set from other table # UPDATE ... FROM is currently unsupported +# TODO fix https://github.com/apache/datafusion/issues/19950 query error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported explain update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b > 'foo' and t2.c > 1.0; From 64be88c495851be43bb0ff6df9e20978d6ee4133 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 23 Jan 2026 15:12:13 +0800 Subject: [PATCH 34/36] clippy fix --- datafusion/core/tests/custom_sources_cases/dml_planning.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index 3195c63118d9e..8ba2980bd339d 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -756,8 +756,7 @@ async fn test_update_from_drops_non_target_predicates() -> Result<()> { let err = result.unwrap_err(); assert!( err.to_string().contains("UPDATE ... FROM is not supported"), - "Expected 'UPDATE ... FROM is not supported' error, got: {}", - err + "Expected 'UPDATE ... FROM is not supported' error, got: {err}" ); Ok(()) } From e9bf8f3fe0b2fe46d499103a7be8fb35cda1083d Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 27 Jan 2026 09:49:57 +0800 Subject: [PATCH 35/36] Replace plan_err with not_impl_err for unsupported multiple tables in UPDATE SET FROM --- datafusion/sql/src/statement.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 6956af70a80da..b086e89f3e9ac 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -1067,7 +1067,9 @@ impl SqlToRel<'_, S> { }); // TODO: support multiple tables in UPDATE SET FROM if from_clauses.as_ref().is_some_and(|f| f.len() > 1) { - plan_err!("Multiple tables in UPDATE SET FROM not yet supported")?; + not_impl_err!( + "Multiple tables in UPDATE SET FROM not yet supported" + )?; } let update_from = from_clauses.and_then(|mut f| f.pop()); From d901ee3ed4f31329219aa9cba75e748120548300 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 27 Jan 2026 10:57:23 +0800 Subject: [PATCH 36/36] Update error messages for unsupported UPDATE ... FROM syntax in tests --- datafusion/sqllogictest/test_files/update.slt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/sqllogictest/test_files/update.slt b/datafusion/sqllogictest/test_files/update.slt index 37f6e7513970a..1cd2b626e3b8e 100644 --- a/datafusion/sqllogictest/test_files/update.slt +++ b/datafusion/sqllogictest/test_files/update.slt @@ -86,13 +86,13 @@ statement error DataFusion error: This feature is not implemented: UPDATE ... FR update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b > 'foo' and t2.c > 1.0; # set from multiple tables, DataFusion only supports from one table -query error DataFusion error: Error during planning: Multiple tables in UPDATE SET FROM not yet supported +statement error DataFusion error: This feature is not implemented: Multiple tables in UPDATE SET FROM not yet supported explain update t1 set b = t2.b, c = t3.a, d = 1 from t2, t3 where t1.a = t2.a and t1.a = t3.a; # test table alias # UPDATE ... FROM is currently unsupported # TODO fix https://github.com/apache/datafusion/issues/19950 -query error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported +statement error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported explain update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 'foo' and t2.c > 1.0; # test update with table alias with actual data