From 4f186ef3908163220aa4f2b72683a460335ff27c Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Wed, 11 Sep 2024 02:38:06 -0500 Subject: [PATCH 1/5] add test for with_column bug Ref: https://github.com/apache/datafusion/issues/12425 --- datafusion/core/src/dataframe/mod.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 2138bd1294b4b..fea0083c05928 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -2984,19 +2984,23 @@ mod tests { let df_impl = DataFrame::new(ctx.state(), df.plan.clone()); let func = row_number().alias("row_num"); + // This first `with_column` results in a column without a `qualifier` + let df_impl = df_impl.with_column("s", col("c2") + col("c3"))?; + + // This second `with_column` then assigns `"r"` alias to the above column and the window function // Should create an additional column with alias 'r' that has window func results let df = df_impl.with_column("r", func)?.limit(0, Some(2))?; - assert_eq!(4, df.schema().fields().len()); + assert_eq!(5, df.schema().fields().len()); let df_results = df.clone().collect().await?; assert_batches_sorted_eq!( [ - "+----+----+-----+---+", - "| c1 | c2 | c3 | r |", - "+----+----+-----+---+", - "| c | 2 | 1 | 1 |", - "| d | 5 | -40 | 2 |", - "+----+----+-----+---+", + "+----+----+-----+-----+---+", + "| c1 | c2 | c3 | s | r |", + "+----+----+-----+-----+---+", + "| c | 2 | 1 | 3 | 1 |", + "| d | 5 | -40 | -35 | 2 |", + "+----+----+-----+-----+---+", ], &df_results ); From 66a17e5117c0e0a90bd6267cc1236404ac2d8cfa Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Wed, 11 Sep 2024 02:39:07 -0500 Subject: [PATCH 2/5] convert DataFrame::with_column into API sugar My mental model for `with_column` is sugar for `df.select([wildcard(), expr.alias(name)])`. The bug test-case passes with this implementation, but a few other test cases fail. Those test cases resolve around ambiguity in column names, which I *think* I can solve by using `WildcardOptions`. --- datafusion/core/src/dataframe/mod.rs | 41 ++-------------------------- 1 file changed, 2 insertions(+), 39 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index fea0083c05928..b498a3c9f77d1 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -52,7 +52,7 @@ use datafusion_common::config::{CsvOptions, JsonOptions}; use datafusion_common::{ plan_err, Column, DFSchema, DataFusionError, ParamValues, SchemaError, UnnestOptions, }; -use datafusion_expr::{case, is_null, lit, SortExpr}; +use datafusion_expr::{case, is_null, lit, wildcard, SortExpr}; use datafusion_expr::{ utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown, UNNAMED_TABLE, }; @@ -1450,44 +1450,7 @@ impl DataFrame { /// # } /// ``` pub fn with_column(self, name: &str, expr: Expr) -> Result { - let window_func_exprs = find_window_exprs(&[expr.clone()]); - - let (plan, mut col_exists, window_func) = if window_func_exprs.is_empty() { - (self.plan, false, false) - } else { - ( - LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?, - true, - true, - ) - }; - - let new_column = expr.alias(name); - let mut fields: Vec = plan - .schema() - .iter() - .map(|(qualifier, field)| { - if field.name() == name { - col_exists = true; - new_column.clone() - } else if window_func && qualifier.is_none() { - col(Column::from((qualifier, field))).alias(name) - } else { - col(Column::from((qualifier, field))) - } - }) - .collect(); - - if !col_exists { - fields.push(new_column); - } - - let project_plan = LogicalPlanBuilder::from(plan).project(fields)?.build()?; - - Ok(DataFrame { - session_state: self.session_state, - plan: project_plan, - }) + self.select(vec![wildcard(), expr.alias(name)]) } /// Rename one column by applying a new projection. This is a no-op if the column to be From 7bebf934347575eba207dc07ffa29b0e7aa45b82 Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Wed, 11 Sep 2024 11:12:22 -0500 Subject: [PATCH 3/5] update expected logical_plan for with_column_join_same_columns The sugared version uses a wildcard to express the previous columns --- datafusion/core/src/dataframe/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index b498a3c9f77d1..8822f80fac118 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -3013,7 +3013,7 @@ mod tests { assert_eq!( "\ - Projection: t1.c1, t2.c1, Boolean(true) AS new_column\ + Projection: *, Boolean(true) AS new_column\ \n Limit: skip=0, fetch=1\ \n Sort: t1.c1 ASC NULLS FIRST\ \n Inner Join: t1.c1 = t2.c1\ From 979594ff1f03d53c294e9d3a8424966ee7b09048 Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Wed, 11 Sep 2024 11:35:45 -0500 Subject: [PATCH 4/5] update with_column_self_join test The ambiguous projection now does not occur until the `physical` stage. I assume there's a way to expand the wildcard and catch it during the `logical` stage, but I haven't found it yet. --- datafusion/core/src/dataframe/mod.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 8822f80fac118..d8a2ea01dd95d 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -3086,10 +3086,15 @@ mod tests { &df_results ); - let actual_err = df.clone().with_column("new_column", lit(true)).unwrap_err(); - let expected_err = "Error during planning: Projections require unique expression names \ - but the expression \"t1.c1\" at position 0 and \"t1.c1\" at position 1 have the same name. \ - Consider aliasing (\"AS\") one of them."; + // The error now doesn't occur until the `physical` stage. + // I assume there's a way to catch this during hte `logical` stage, so please let me know. + let df_with_new_column = df + .clone() + .with_column("new_column", lit(true)) + .expect("with-column"); + let actual_err = df_with_new_column.collect().await.unwrap_err(); + + let expected_err = "expand_wildcard_rule\ncaused by\nError during planning: Projections require unique expression names but the expression \"t1.c1\" at position 0 and \"t1.c1\" at position 1 have the same name. Consider aliasing (\"AS\") one of them."; assert_eq!(actual_err.strip_backtrace(), expected_err); Ok(()) From 96974f4c446b983ce6cc4b47689f4614d717198e Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Wed, 11 Sep 2024 13:38:23 -0500 Subject: [PATCH 5/5] update with_column to what I think is the full semantic equivalent using wildcardoptions There's still a TODO because I'm not sure **how** to construct the WildcardOptions, but this captures the semantic behavior. --- datafusion/core/src/dataframe/mod.rs | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index d8a2ea01dd95d..e4d9106c5e297 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1450,7 +1450,29 @@ impl DataFrame { /// # } /// ``` pub fn with_column(self, name: &str, expr: Expr) -> Result { - self.select(vec![wildcard(), expr.alias(name)]) + let col_exists = self + .plan + .schema() + .fields() + .iter() + .any(|f| f.name() == name); + + if col_exists { + // if `name` matches a field in the schema, we replace it with expr + // ```select * REPLACE expr as name``` + // + // PlannedReplaceSelectItem.items should be a vec of `ReplaceSelectElement` + // however, ReplaceSelectElement.expr is a `sqlparser::ast::Expr` + // I'm unsure how to construct it from here or if it's necessary / possible + let replace = todo!(); // PlannedReplaceSelectItem + let options = WildcardOptions::default().with_replace(replace); + self.select(vec![wildcard_with_options(options)]) + } else { + // otherwise we add a new column + // ```select *, expr as name``` + // LogicalPlan::columnized_output_exprs + self.select(vec![wildcard(), expr.alias(name)]) + } } /// Rename one column by applying a new projection. This is a no-op if the column to be