diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 2138bd1294b4b..e4d9106c5e297 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -52,7 +52,7 @@ use datafusion_common::config::{CsvOptions, JsonOptions}; use datafusion_common::{ plan_err, Column, DFSchema, DataFusionError, ParamValues, SchemaError, UnnestOptions, }; -use datafusion_expr::{case, is_null, lit, SortExpr}; +use datafusion_expr::{case, is_null, lit, wildcard, SortExpr}; use datafusion_expr::{ utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown, UNNAMED_TABLE, }; @@ -1450,44 +1450,29 @@ impl DataFrame { /// # } /// ``` pub fn with_column(self, name: &str, expr: Expr) -> Result { - let window_func_exprs = find_window_exprs(&[expr.clone()]); - - let (plan, mut col_exists, window_func) = if window_func_exprs.is_empty() { - (self.plan, false, false) - } else { - ( - LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?, - true, - true, - ) - }; - - let new_column = expr.alias(name); - let mut fields: Vec = plan + let col_exists = self + .plan .schema() + .fields() .iter() - .map(|(qualifier, field)| { - if field.name() == name { - col_exists = true; - new_column.clone() - } else if window_func && qualifier.is_none() { - col(Column::from((qualifier, field))).alias(name) - } else { - col(Column::from((qualifier, field))) - } - }) - .collect(); - - if !col_exists { - fields.push(new_column); + .any(|f| f.name() == name); + + if col_exists { + // if `name` matches a field in the schema, we replace it with expr + // ```select * REPLACE expr as name``` + // + // PlannedReplaceSelectItem.items should be a vec of `ReplaceSelectElement` + // however, ReplaceSelectElement.expr is a `sqlparser::ast::Expr` + // I'm unsure how to construct it from here or if it's necessary / possible + let replace = todo!(); // PlannedReplaceSelectItem + let options = WildcardOptions::default().with_replace(replace); + self.select(vec![wildcard_with_options(options)]) + } else { + // otherwise we add a new column + // ```select *, expr as name``` + // LogicalPlan::columnized_output_exprs + self.select(vec![wildcard(), expr.alias(name)]) } - - let project_plan = LogicalPlanBuilder::from(plan).project(fields)?.build()?; - - Ok(DataFrame { - session_state: self.session_state, - plan: project_plan, - }) } /// Rename one column by applying a new projection. This is a no-op if the column to be @@ -2984,19 +2969,23 @@ mod tests { let df_impl = DataFrame::new(ctx.state(), df.plan.clone()); let func = row_number().alias("row_num"); + // This first `with_column` results in a column without a `qualifier` + let df_impl = df_impl.with_column("s", col("c2") + col("c3"))?; + + // This second `with_column` then assigns `"r"` alias to the above column and the window function // Should create an additional column with alias 'r' that has window func results let df = df_impl.with_column("r", func)?.limit(0, Some(2))?; - assert_eq!(4, df.schema().fields().len()); + assert_eq!(5, df.schema().fields().len()); let df_results = df.clone().collect().await?; assert_batches_sorted_eq!( [ - "+----+----+-----+---+", - "| c1 | c2 | c3 | r |", - "+----+----+-----+---+", - "| c | 2 | 1 | 1 |", - "| d | 5 | -40 | 2 |", - "+----+----+-----+---+", + "+----+----+-----+-----+---+", + "| c1 | c2 | c3 | s | r |", + "+----+----+-----+-----+---+", + "| c | 2 | 1 | 3 | 1 |", + "| d | 5 | -40 | -35 | 2 |", + "+----+----+-----+-----+---+", ], &df_results ); @@ -3046,7 +3035,7 @@ mod tests { assert_eq!( "\ - Projection: t1.c1, t2.c1, Boolean(true) AS new_column\ + Projection: *, Boolean(true) AS new_column\ \n Limit: skip=0, fetch=1\ \n Sort: t1.c1 ASC NULLS FIRST\ \n Inner Join: t1.c1 = t2.c1\ @@ -3119,10 +3108,15 @@ mod tests { &df_results ); - let actual_err = df.clone().with_column("new_column", lit(true)).unwrap_err(); - let expected_err = "Error during planning: Projections require unique expression names \ - but the expression \"t1.c1\" at position 0 and \"t1.c1\" at position 1 have the same name. \ - Consider aliasing (\"AS\") one of them."; + // The error now doesn't occur until the `physical` stage. + // I assume there's a way to catch this during hte `logical` stage, so please let me know. + let df_with_new_column = df + .clone() + .with_column("new_column", lit(true)) + .expect("with-column"); + let actual_err = df_with_new_column.collect().await.unwrap_err(); + + let expected_err = "expand_wildcard_rule\ncaused by\nError during planning: Projections require unique expression names but the expression \"t1.c1\" at position 0 and \"t1.c1\" at position 1 have the same name. Consider aliasing (\"AS\") one of them."; assert_eq!(actual_err.strip_backtrace(), expected_err); Ok(())