From ddbf54e38703a996d960badb2f10a93f8f9f2359 Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Mon, 24 Mar 2025 17:51:50 +0100 Subject: [PATCH 1/9] Fix duplicate unqualified field name issue --- datafusion/expr/src/logical_plan/builder.rs | 21 ++++++++++++------- .../src/equivalence/projection.rs | 6 +++--- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 91a871d52e9ad..3df99d23dfe8a 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1470,17 +1470,24 @@ impl ValuesFields { pub fn change_redundant_column(fields: &Fields) -> Vec { let mut name_map = HashMap::new(); + let mut seen: HashSet = HashSet::new(); + fields .into_iter() .map(|field| { - let counter = name_map.entry(field.name().to_string()).or_insert(0); - *counter += 1; - if *counter > 1 { - let new_name = format!("{}:{}", field.name(), *counter - 1); - Field::new(new_name, field.data_type().clone(), field.is_nullable()) - } else { - field.as_ref().clone() + let base_name = field.name(); + let count = name_map.entry(base_name.clone()).or_insert(0); + let mut new_name = base_name.clone(); + + // Loop until we find a name that hasn't been used + while seen.contains(&new_name) { + *count += 1; + new_name = format!("{}:{}", base_name, count); } + + seen.insert(new_name.clone()); + + Field::new(&new_name, field.data_type().clone(), field.is_nullable()) }) .collect() } diff --git a/datafusion/physical-expr/src/equivalence/projection.rs b/datafusion/physical-expr/src/equivalence/projection.rs index 035678fbf1f39..873b8d04a883f 100644 --- a/datafusion/physical-expr/src/equivalence/projection.rs +++ b/datafusion/physical-expr/src/equivalence/projection.rs @@ -66,9 +66,9 @@ impl ProjectionMapping { let idx = col.index(); let matching_input_field = input_schema.field(idx); if col.name() != matching_input_field.name() { - return internal_err!("Input 
field name {} does not match with the projection expression {}", - matching_input_field.name(),col.name()) - } + let fixed_col = Column::new(matching_input_field.name(), idx); + return Ok(Transformed::yes(Arc::new(fixed_col))) + } let matching_input_column = Column::new(matching_input_field.name(), idx); Ok(Transformed::yes(Arc::new(matching_input_column))) From 8d6ac438f8fa97f435c3ab135328ea90a477ccc1 Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Tue, 25 Mar 2025 12:30:06 +0100 Subject: [PATCH 2/9] Adjust Projection Properly --- datafusion/physical-expr/src/equivalence/projection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/equivalence/projection.rs b/datafusion/physical-expr/src/equivalence/projection.rs index 873b8d04a883f..4ebe6ae9a956e 100644 --- a/datafusion/physical-expr/src/equivalence/projection.rs +++ b/datafusion/physical-expr/src/equivalence/projection.rs @@ -66,7 +66,7 @@ impl ProjectionMapping { let idx = col.index(); let matching_input_field = input_schema.field(idx); if col.name() != matching_input_field.name() { - let fixed_col = Column::new(matching_input_field.name(), idx); + let fixed_col = Column::new(col.name(), idx); return Ok(Transformed::yes(Arc::new(fixed_col))) } let matching_input_column = From 36645ef32ebdf514ca0632126b743c19e07ab7ea Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Wed, 26 Mar 2025 10:20:53 +0100 Subject: [PATCH 3/9] Add reproducer plan --- .../testdata/test_plans/multiple_joins.json | 510 ++++++++++++++++++ 1 file changed, 510 insertions(+) create mode 100644 datafusion/substrait/tests/testdata/test_plans/multiple_joins.json diff --git a/datafusion/substrait/tests/testdata/test_plans/multiple_joins.json b/datafusion/substrait/tests/testdata/test_plans/multiple_joins.json new file mode 100644 index 0000000000000..cdfd13fa845fb --- /dev/null +++ b/datafusion/substrait/tests/testdata/test_plans/multiple_joins.json @@ -0,0 +1,510 @@ +{ + "relations": [{ + "root": { 
+ "input": { + "project": { + "common": { + "emit": { + "outputMapping": [8, 9, 10, 11] + } + }, + "input": { + "join": { + "common": { + "direct": { + } + }, + "left": { + "join": { + "common": { + "direct": { + } + }, + "left": { + "join": { + "common": { + "direct": { + } + }, + "left": { + "aggregate": { + "common": { + "direct": { + } + }, + "input": { + "read": { + "common": { + "direct": { + } + }, + "baseSchema": { + "names": ["id"], + "struct": { + "types": [{ + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }], + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "virtualTable": { + "values": [{ + "fields": [{ + "i64": "1", + "nullable": true, + "typeVariationReference": 0 + }] + }, { + "fields": [{ + "i64": "2", + "nullable": true, + "typeVariationReference": 0 + }] + }] + } + } + }, + "groupings": [{ + "groupingExpressions": [{ + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": { + } + } + }], + "expressionReferences": [] + }], + "measures": [{ + "measure": { + "functionReference": 0, + "args": [], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT", + "outputType": { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "invocation": "AGGREGATION_INVOCATION_ALL", + "arguments": [], + "options": [] + } + }], + "groupingExpressions": [] + } + }, + "right": { + "aggregate": { + "common": { + "direct": { + } + }, + "input": { + "read": { + "common": { + "direct": { + } + }, + "baseSchema": { + "names": ["id", "category"], + "struct": { + "types": [{ + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, { + "string": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }], + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "virtualTable": { + "values": [{ + "fields": [{ + "i64": "1", + "nullable": 
true, + "typeVariationReference": 0 + }, { + "string": "info", + "nullable": true, + "typeVariationReference": 0 + }] + }, { + "fields": [{ + "i64": "2", + "nullable": true, + "typeVariationReference": 0 + }, { + "string": "low", + "nullable": true, + "typeVariationReference": 0 + }] + }] + } + } + }, + "groupings": [{ + "groupingExpressions": [{ + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": { + } + } + }, { + "selection": { + "directReference": { + "structField": { + "field": 1 + } + }, + "rootReference": { + } + } + }], + "expressionReferences": [] + }], + "measures": [], + "groupingExpressions": [] + } + }, + "expression": { + "scalarFunction": { + "functionReference": 1, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [{ + "value": { + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": { + } + } + } + }, { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 2 + } + }, + "rootReference": { + } + } + } + }], + "options": [] + } + }, + "type": "JOIN_TYPE_LEFT" + } + }, + "right": { + "aggregate": { + "common": { + "direct": { + } + }, + "input": { + "read": { + "common": { + "direct": { + } + }, + "baseSchema": { + "names": ["id"], + "struct": { + "types": [{ + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }], + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "virtualTable": { + "values": [{ + "fields": [{ + "i64": "1", + "nullable": true, + "typeVariationReference": 0 + }] + }, { + "fields": [{ + "i64": "2", + "nullable": true, + "typeVariationReference": 0 + }] + }] + } + } + }, + "groupings": [{ + "groupingExpressions": [{ + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": { + } + } + }], + "expressionReferences": [] + }], 
+ "measures": [{ + "measure": { + "functionReference": 0, + "args": [], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT", + "outputType": { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "invocation": "AGGREGATION_INVOCATION_ALL", + "arguments": [], + "options": [] + } + }], + "groupingExpressions": [] + } + }, + "expression": { + "scalarFunction": { + "functionReference": 1, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [{ + "value": { + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": { + } + } + } + }, { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 4 + } + }, + "rootReference": { + } + } + } + }], + "options": [] + } + }, + "type": "JOIN_TYPE_LEFT" + } + }, + "right": { + "aggregate": { + "common": { + "direct": { + } + }, + "input": { + "read": { + "common": { + "direct": { + } + }, + "baseSchema": { + "names": ["id"], + "struct": { + "types": [{ + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }], + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "virtualTable": { + "values": [{ + "fields": [{ + "i64": "1", + "nullable": true, + "typeVariationReference": 0 + }] + }, { + "fields": [{ + "i64": "2", + "nullable": true, + "typeVariationReference": 0 + }] + }] + } + } + }, + "groupings": [{ + "groupingExpressions": [{ + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": { + } + } + }], + "expressionReferences": [] + }], + "measures": [{ + "measure": { + "functionReference": 0, + "args": [], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT", + "outputType": { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "invocation": "AGGREGATION_INVOCATION_ALL", + 
"arguments": [], + "options": [] + } + }], + "groupingExpressions": [] + } + }, + "expression": { + "scalarFunction": { + "functionReference": 1, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [{ + "value": { + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": { + } + } + } + }, { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 6 + } + }, + "rootReference": { + } + } + } + }], + "options": [] + } + }, + "type": "JOIN_TYPE_LEFT" + } + }, + "expressions": [{ + "selection": { + "directReference": { + "structField": { + "field": 1 + } + }, + "rootReference": { + } + } + }, { + "selection": { + "directReference": { + "structField": { + "field": 3 + } + }, + "rootReference": { + } + } + }, { + "selection": { + "directReference": { + "structField": { + "field": 5 + } + }, + "rootReference": { + } + } + }, { + "selection": { + "directReference": { + "structField": { + "field": 7 + } + }, + "rootReference": { + } + } + }] + } + }, + "names": ["count_first", "category", "count_second", "count_third"] + } + }], + "expectedTypeUrls": [] +} \ No newline at end of file From a9d99492801fa48b007b28ba2e485bd8b7594935 Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Thu, 27 Mar 2025 09:25:12 +0100 Subject: [PATCH 4/9] Adjust comment --- datafusion/substrait/src/logical_plan/consumer.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 61f3379735c7d..1442267d3dbb6 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -1835,8 +1835,7 @@ fn requalify_sides_if_needed( }) }) { // These names have no connection to the original plan, but they'll make the columns - // (mostly) unique. 
There may be cases where this still causes duplicates, if either left - // or right side itself contains duplicate names with different qualifiers. + // (mostly) unique. Ok(( left.alias(TableReference::bare("left"))?, right.alias(TableReference::bare("right"))?, From c3c6abbf5f093d7a05cd8c51d0cb9580db179659 Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Wed, 26 Mar 2025 11:15:24 +0100 Subject: [PATCH 5/9] Set metadata to be the same as well --- datafusion/expr/src/logical_plan/builder.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 3df99d23dfe8a..7206ea9026555 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1487,7 +1487,9 @@ pub fn change_redundant_column(fields: &Fields) -> Vec { seen.insert(new_name.clone()); - Field::new(&new_name, field.data_type().clone(), field.is_nullable()) + let mut modified_field = Field::new(&new_name, field.data_type().clone(), field.is_nullable()); + modified_field.set_metadata(field.metadata().clone()); + modified_field }) .collect() } From 63cd54d7ad4be35952c530323ddafedb3eaf99d3 Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Wed, 26 Mar 2025 12:01:41 +0100 Subject: [PATCH 6/9] Fix substrait reproducer + Add test case --- .../tests/cases/consumer_integration.rs | 28 +++++++++++++++++++ .../testdata/test_plans/multiple_joins.json | 28 ++++++++++++++++++- 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/datafusion/substrait/tests/cases/consumer_integration.rs b/datafusion/substrait/tests/cases/consumer_integration.rs index 1f1a15abb837c..947fb7293c6b4 100644 --- a/datafusion/substrait/tests/cases/consumer_integration.rs +++ b/datafusion/substrait/tests/cases/consumer_integration.rs @@ -483,6 +483,34 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_multiple_joins() -> Result<()> { + let plan_str = 
test_plan_to_string("multiple_joins.json").await?; + println!("{}", plan_str); + assert_eq!( + plan_str, + "Projection: left.count(Int64(1)) AS count_first, left.category, left.count(Int64(1)):1 AS count_second, right.count(Int64(1)) AS count_third\ + \n Left Join: left.id = right.id\ + \n SubqueryAlias: left\ + \n Left Join: left.id = right.id\ + \n SubqueryAlias: left\ + \n Left Join: left.id = right.id\ + \n SubqueryAlias: left\ + \n Aggregate: groupBy=[[id]], aggr=[[count(Int64(1))]]\ + \n Values: (Int64(1)), (Int64(2))\ + \n SubqueryAlias: right\ + \n Aggregate: groupBy=[[id, category]], aggr=[[]]\ + \n Values: (Int64(1), Utf8(\"info\")), (Int64(2), Utf8(\"low\"))\ + \n SubqueryAlias: right\ + \n Aggregate: groupBy=[[id]], aggr=[[count(Int64(1))]]\ + \n Values: (Int64(1)), (Int64(2))\ + \n SubqueryAlias: right\ + \n Aggregate: groupBy=[[id]], aggr=[[count(Int64(1))]]\ + \n Values: (Int64(1)), (Int64(2))" + ); + Ok(()) + } + #[tokio::test] async fn test_select_window_count() -> Result<()> { let plan_str = test_plan_to_string("select_window_count.substrait.json").await?; diff --git a/datafusion/substrait/tests/testdata/test_plans/multiple_joins.json b/datafusion/substrait/tests/testdata/test_plans/multiple_joins.json index cdfd13fa845fb..e88cce648da7c 100644 --- a/datafusion/substrait/tests/testdata/test_plans/multiple_joins.json +++ b/datafusion/substrait/tests/testdata/test_plans/multiple_joins.json @@ -1,4 +1,24 @@ { + "extensionUris": [{ + "extensionUriAnchor": 1, + "uri": "/functions_aggregate_generic.yaml" + }, { + "extensionUriAnchor": 2, + "uri": "/functions_comparison.yaml" + }], + "extensions": [{ + "extensionFunction": { + "extensionUriReference": 1, + "functionAnchor": 0, + "name": "count:" + } + }, { + "extensionFunction": { + "extensionUriReference": 2, + "functionAnchor": 1, + "name": "equal:any_any" + } + }], "relations": [{ "root": { "input": { @@ -506,5 +526,11 @@ "names": ["count_first", "category", "count_second", "count_third"] } }], - 
"expectedTypeUrls": [] + "expectedTypeUrls": [], + "version": { + "majorNumber": 0, + "minorNumber": 52, + "patchNumber": 0, + "gitHash": "" + } } \ No newline at end of file From 2f925ce1bb9e230817f694172b0d383990a39768 Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Wed, 26 Mar 2025 16:06:18 +0100 Subject: [PATCH 7/9] Format --- datafusion/expr/src/logical_plan/builder.rs | 3 ++- datafusion/physical-expr/src/equivalence/projection.rs | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 7206ea9026555..79e2e2680ea34 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1487,7 +1487,8 @@ pub fn change_redundant_column(fields: &Fields) -> Vec { seen.insert(new_name.clone()); - let mut modified_field = Field::new(&new_name, field.data_type().clone(), field.is_nullable()); + let mut modified_field = + Field::new(&new_name, field.data_type().clone(), field.is_nullable()); modified_field.set_metadata(field.metadata().clone()); modified_field }) diff --git a/datafusion/physical-expr/src/equivalence/projection.rs b/datafusion/physical-expr/src/equivalence/projection.rs index 4ebe6ae9a956e..d10243fbab452 100644 --- a/datafusion/physical-expr/src/equivalence/projection.rs +++ b/datafusion/physical-expr/src/equivalence/projection.rs @@ -22,7 +22,7 @@ use crate::PhysicalExpr; use arrow::datatypes::SchemaRef; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::{internal_err, Result}; +use datafusion_common::Result; /// Stores the mapping between source expressions and target expressions for a /// projection. 
@@ -67,7 +67,7 @@ impl ProjectionMapping { let matching_input_field = input_schema.field(idx); if col.name() != matching_input_field.name() { let fixed_col = Column::new(col.name(), idx); - return Ok(Transformed::yes(Arc::new(fixed_col))) + return Ok(Transformed::yes(Arc::new(fixed_col))); } let matching_input_column = Column::new(matching_input_field.name(), idx); From 926fcb7f0e2902f618c2209ad21c6f1c4c1c37ed Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Tue, 1 Apr 2025 12:15:03 +0200 Subject: [PATCH 8/9] Add explanation comment --- datafusion/expr/src/logical_plan/builder.rs | 8 ++++++++ datafusion/substrait/tests/cases/consumer_integration.rs | 1 - 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 79e2e2680ea34..d6880ead1ca29 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1468,6 +1468,14 @@ impl ValuesFields { } } +// `name_map` tracks a mapping between a field name and the number of appearances of that field. +// +// Some field names might already come to this function with the count (number of times it appeared) +// as a suffix, e.g. id:1, so there's still a chance of name collisions, for example, +// if these three fields are passed to this function: "col:1", "col" and "col", the function +// would rename them to -> col:1, col, col:1 causing a later error when building the DFSchema. +// That's why we need the `seen` set, so the fields are always unique. 
+// pub fn change_redundant_column(fields: &Fields) -> Vec { let mut name_map = HashMap::new(); let mut seen: HashSet = HashSet::new(); diff --git a/datafusion/substrait/tests/cases/consumer_integration.rs b/datafusion/substrait/tests/cases/consumer_integration.rs index 947fb7293c6b4..b80c52a7457b6 100644 --- a/datafusion/substrait/tests/cases/consumer_integration.rs +++ b/datafusion/substrait/tests/cases/consumer_integration.rs @@ -486,7 +486,6 @@ mod tests { #[tokio::test] async fn test_multiple_joins() -> Result<()> { let plan_str = test_plan_to_string("multiple_joins.json").await?; - println!("{}", plan_str); assert_eq!( plan_str, "Projection: left.count(Int64(1)) AS count_first, left.category, left.count(Int64(1)):1 AS count_second, right.count(Int64(1)) AS count_third\ From 97ef82052e1236668d4320e3108a4a1125dd28de Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Tue, 1 Apr 2025 12:34:39 +0200 Subject: [PATCH 9/9] Add test case to change_redundant_column --- datafusion/expr/src/logical_plan/builder.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index d6880ead1ca29..2f6eaf1c052f9 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -2748,10 +2748,13 @@ mod tests { let t1_field_1 = Field::new("a", DataType::Int32, false); let t2_field_1 = Field::new("a", DataType::Int32, false); let t2_field_3 = Field::new("a", DataType::Int32, false); + let t2_field_4 = Field::new("a:1", DataType::Int32, false); let t1_field_2 = Field::new("b", DataType::Int32, false); let t2_field_2 = Field::new("b", DataType::Int32, false); - let field_vec = vec![t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3]; + let field_vec = vec![ + t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4, + ]; let remove_redundant = change_redundant_column(&Fields::from(field_vec)); assert_eq!( @@ -2762,6 
+2765,7 @@ mod tests { Field::new("b", DataType::Int32, false), Field::new("b:1", DataType::Int32, false), Field::new("a:2", DataType::Int32, false), + Field::new("a:1:1", DataType::Int32, false), ] ); Ok(())