From 48a8e6bf244ca70dda11ad8da2b00b62cb689535 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Sun, 12 Jan 2025 23:49:32 +0800 Subject: [PATCH 1/4] fix: incorrect NATURAL/USING JOIN schema --- datafusion/expr/src/utils.rs | 39 +++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index b1e36e02925b0..47ada0c478ef2 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -379,14 +379,12 @@ fn get_exprs_except_skipped( } } -/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s. -pub fn expand_wildcard( - schema: &DFSchema, - plan: &LogicalPlan, - wildcard_options: Option<&WildcardOptions>, -) -> Result> { +/// For each column specified in the USING JOIN condition, the JOIN plan outputs it twice +/// (once for each join side), but an unqualified wildcard should include it only once. +/// This function returns the columns that should be excluded. +fn exclude_using_columns(plan: &LogicalPlan) -> Result> { let using_columns = plan.using_columns()?; - let mut columns_to_skip = using_columns + let excluded = using_columns .into_iter() // For each USING JOIN condition, only expand to one of each join column in projection .flat_map(|cols| { @@ -407,6 +405,16 @@ pub fn expand_wildcard( .collect::>() }) .collect::>(); + Ok(excluded) +} + +/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s. +pub fn expand_wildcard( + schema: &DFSchema, + plan: &LogicalPlan, + wildcard_options: Option<&WildcardOptions>, +) -> Result> { + let mut columns_to_skip = exclude_using_columns(plan)?; let excluded_columns = if let Some(WildcardOptions { exclude: opt_exclude, except: opt_except, @@ -705,25 +713,20 @@ pub fn exprlist_to_fields<'a>( .map(|e| match e { Expr::Wildcard { qualifier, options } => match qualifier { None => { - let excluded: Vec = get_excluded_columns( + let mut excluded = exclude_using_columns(plan)?; + excluded.extend(get_excluded_columns( options.exclude.as_ref(), options.except.as_ref(), wildcard_schema, None, - )? - .into_iter() - .map(|c| c.flat_name()) - .collect(); + )?); Ok::<_, DataFusionError>( wildcard_schema - .field_names() .iter() - .enumerate() - .filter(|(_, s)| !excluded.contains(s)) - .map(|(i, _)| wildcard_schema.qualified_field(i)) - .map(|(qualifier, f)| { - (qualifier.cloned(), Arc::new(f.to_owned())) + .filter(|(q, f)| { + !excluded.contains(&Column::new(q.cloned(), f.name())) }) + .map(|(q, f)| (q.cloned(), Arc::clone(f))) .collect::>(), ) } From 7b2cdd2ce947e1a694c9db7ea5d4575267f05ef4 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Mon, 13 Jan 2025 11:18:58 +0800 Subject: [PATCH 2/4] Add test --- datafusion/expr/src/utils.rs | 20 ++++++++---------- datafusion/sql/tests/sql_integration.rs | 28 +++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 11 deletions(-) diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 47ada0c478ef2..8cb02edfabebd 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -35,8 +35,8 @@ use datafusion_common::tree_node::{ }; use datafusion_common::utils::get_at_indices; use datafusion_common::{ - internal_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, - DataFusionError, HashMap, Result, TableReference, + internal_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, HashMap, + Result, TableReference, }; use indexmap::IndexSet; @@ -720,15 +720,13 @@ pub fn exprlist_to_fields<'a>( wildcard_schema, None, )?); - Ok::<_, DataFusionError>( - wildcard_schema - .iter() - .filter(|(q, f)| { - !excluded.contains(&Column::new(q.cloned(), f.name())) - }) - .map(|(q, f)| (q.cloned(), Arc::clone(f))) - .collect::>(), - ) + Ok(wildcard_schema + .iter() + .filter(|(q, f)| { + !excluded.contains(&Column::new(q.cloned(), f.name())) + }) + .map(|(q, f)| (q.cloned(), Arc::clone(f))) + .collect::>()) } Some(qualifier) => { let excluded: Vec = get_excluded_columns( diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index b93060988d205..aaf5b73ead0b0 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -4552,3 +4552,31 @@ fn test_error_message_invalid_window_aggregate_function_signature() { "Error during planning: sum does not support zero arguments", ); } + +// Test issue: https://github.com/apache/datafusion/issues/14058 +// Select with wildcard over a USING/NATURAL JOIN should deduplicate condition columns. +#[test] +fn test_using_join_wildcard_schema() { + let sql = "SELECT * FROM orders o1 JOIN orders o2 USING (order_id)"; + let plan = logical_plan(sql).unwrap(); + let count = plan + .schema() + .iter() + .filter(|(_, f)| f.name() == "order_id") + .count(); + // Only one order_id column + assert_eq!(count, 1); + + let sql = "SELECT * FROM orders o1 NATURAL JOIN orders o2"; + let plan = logical_plan(sql).unwrap(); + // Only columns from one join side should be present + let expected_fields = vec![ + "o1.order_id".to_string(), + "o1.customer_id".to_string(), + "o1.o_item_id".to_string(), + "o1.qty".to_string(), + "o1.price".to_string(), + "o1.delivered".to_string(), + ]; + assert_eq!(plan.schema().field_names(), expected_fields); +} From efca7531b890002c6b3982dd73bb8297cda32f32 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Mon, 13 Jan 2025 14:42:21 +0800 Subject: [PATCH 3/4] Simplify exclude_using_columns --- datafusion/expr/src/utils.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 8cb02edfabebd..049926fb0bcd6 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -393,16 +393,14 @@ fn exclude_using_columns(plan: &LogicalPlan) -> Result> { // qualified column cols.sort(); let mut out_column_names: HashSet = HashSet::new(); - cols.into_iter() - .filter_map(|c| { - if out_column_names.contains(&c.name) { - Some(c) - } else { - out_column_names.insert(c.name); - None - } - }) - .collect::>() + cols.into_iter().filter_map(move |c| { + if out_column_names.contains(&c.name) { + Some(c) + } else { + out_column_names.insert(c.name); + None + } + }) }) .collect::>(); Ok(excluded) From 789e9f9fc8b0b3c90e1a449252b796d8685dcf37 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Tue, 14 Jan 2025 10:24:53 +0800 Subject: [PATCH 4/4] Add more tests --- datafusion/sql/tests/sql_integration.rs | 46 +++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index aaf5b73ead0b0..24b585d2a5e75 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -4579,4 +4579,50 @@ fn test_using_join_wildcard_schema() { "o1.delivered".to_string(), ]; assert_eq!(plan.schema().field_names(), expected_fields); + + // Reproducible example of issue #14058 + let sql = "WITH t1 AS (SELECT 1 AS id, 'a' AS value1), + t2 AS (SELECT 1 AS id, 'x' AS value2) + SELECT * FROM t1 NATURAL JOIN t2"; + let plan = logical_plan(sql).unwrap(); + assert_eq!( + plan.schema().field_names(), + [ + "t1.id".to_string(), + "t1.value1".to_string(), + "t2.value2".to_string() + ] + ); + + // Multiple joins + let sql = "WITH t1 AS (SELECT 1 AS a, 1 AS b), + t2 AS (SELECT 1 AS a, 2 AS c), + t3 AS (SELECT 1 AS c, 2 AS d) + SELECT * FROM t1 NATURAL JOIN t2 RIGHT JOIN t3 USING (c)"; + let plan = logical_plan(sql).unwrap(); + assert_eq!( + plan.schema().field_names(), + [ + "t1.a".to_string(), + "t1.b".to_string(), + "t2.c".to_string(), + "t3.d".to_string() + ] + ); + + // Subquery + let sql = "WITH t1 AS (SELECT 1 AS a, 1 AS b), + t2 AS (SELECT 1 AS a, 2 AS c), + t3 AS (SELECT 1 AS c, 2 AS d) + SELECT * FROM (SELECT * FROM t1 LEFT JOIN t2 USING(a)) NATURAL JOIN t3"; + let plan = logical_plan(sql).unwrap(); + assert_eq!( + plan.schema().field_names(), + [ + "t1.a".to_string(), + "t1.b".to_string(), + "t2.c".to_string(), + "t3.d".to_string() + ] + ); }