From ffbda4c11684f095a2b6387dd62f2da4ed21fd31 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 11 Sep 2024 07:33:11 -0400 Subject: [PATCH 1/5] Check for window functions already in schema by str match instead of if any are lacking a qualifier --- datafusion/core/src/dataframe/mod.rs | 43 +++++++++++++++++----------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 2138bd1294b4b..22620f22335dd 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1452,28 +1452,31 @@ impl DataFrame { pub fn with_column(self, name: &str, expr: Expr) -> Result { let window_func_exprs = find_window_exprs(&[expr.clone()]); - let (plan, mut col_exists, window_func) = if window_func_exprs.is_empty() { - (self.plan, false, false) + let ( window_fn_str, plan) = if window_func_exprs.is_empty() { + (None, self.plan) } else { ( - LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?, - true, - true, + Some(window_func_exprs[0].to_string()), + LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)? ) }; + let mut col_exists = false; let new_column = expr.alias(name); let mut fields: Vec = plan .schema() .iter() - .map(|(qualifier, field)| { + .filter_map(|(qualifier, field)| { if field.name() == name { col_exists = true; - new_column.clone() - } else if window_func && qualifier.is_none() { - col(Column::from((qualifier, field))).alias(name) + Some(new_column.clone()) } else { - col(Column::from((qualifier, field))) + let e = col(Column::from((qualifier, field))); + let match_window_fn = window_fn_str.as_ref().map(|s| s == &e.to_string()).unwrap_or(false); + match match_window_fn { + true => None, + false => Some(e), + } } }) .collect(); @@ -2984,19 +2987,25 @@ mod tests { let df_impl = DataFrame::new(ctx.state(), df.plan.clone()); let func = row_number().alias("row_num"); + // This first `with_column` results in a column without a `qualifier` + let df_impl = df_impl.with_column("s", col("c2") + col("c3"))?; + + // This second `with_column` then assigns `"r"` alias to the above column and the window function // Should create an additional column with alias 'r' that has window func results let df = df_impl.with_column("r", func)?.limit(0, Some(2))?; - assert_eq!(4, df.schema().fields().len()); + + df.clone().show().await?; + assert_eq!(5, df.schema().fields().len()); let df_results = df.clone().collect().await?; assert_batches_sorted_eq!( [ - "+----+----+-----+---+", - "| c1 | c2 | c3 | r |", - "+----+----+-----+---+", - "| c | 2 | 1 | 1 |", - "| d | 5 | -40 | 2 |", - "+----+----+-----+---+", + "+----+----+-----+-----+---+", + "| c1 | c2 | c3 | s | r |", + "+----+----+-----+-----+---+", + "| c | 2 | 1 | 3 | 1 |", + "| d | 5 | -40 | -35 | 2 |", + "+----+----+-----+-----+---+", ], &df_results ); From 910985d00e9142859c755d40ffb146100a3ebea0 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 11 Sep 2024 09:47:06 -0400 Subject: [PATCH 2/5] cargo fmt --- datafusion/core/src/dataframe/mod.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 22620f22335dd..092e91f6d0e15 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1452,12 +1452,12 @@ impl DataFrame { pub fn with_column(self, name: &str, expr: Expr) -> Result { let window_func_exprs = find_window_exprs(&[expr.clone()]); - let ( window_fn_str, plan) = if window_func_exprs.is_empty() { + let (window_fn_str, plan) = if window_func_exprs.is_empty() { (None, self.plan) } else { ( Some(window_func_exprs[0].to_string()), - LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)? + LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?, ) }; @@ -1472,7 +1472,10 @@ impl DataFrame { Some(new_column.clone()) } else { let e = col(Column::from((qualifier, field))); - let match_window_fn = window_fn_str.as_ref().map(|s| s == &e.to_string()).unwrap_or(false); + let match_window_fn = window_fn_str + .as_ref() + .map(|s| s == &e.to_string()) + .unwrap_or(false); match match_window_fn { true => None, false => Some(e), @@ -2987,7 +2990,7 @@ mod tests { let df_impl = DataFrame::new(ctx.state(), df.plan.clone()); let func = row_number().alias("row_num"); - // This first `with_column` results in a column without a `qualifier` + // This first `with_column` results in a column without a `qualifier` let df_impl = df_impl.with_column("s", col("c2") + col("c3"))?; // This second `with_column` then assigns `"r"` alias to the above column and the window function From 51f9f630403333d9652876efda330e2f39892c72 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 11 Sep 2024 11:52:46 -0400 Subject: [PATCH 3/5] Update with single liner to make code easier to read --- datafusion/core/src/dataframe/mod.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 092e91f6d0e15..b46af22fdbc6b 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1472,14 +1472,7 @@ impl DataFrame { Some(new_column.clone()) } else { let e = col(Column::from((qualifier, field))); - let match_window_fn = window_fn_str - .as_ref() - .map(|s| s == &e.to_string()) - .unwrap_or(false); - match match_window_fn { - true => None, - false => Some(e), - } + window_fn_str.as_ref().filter(|s| *s == &e.to_string()).is_none().then_some(e) } }) .collect(); From 3cf08eaa8af7843d23bcbfdb52f8bd98aa868309 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 11 Sep 2024 11:54:13 -0400 Subject: [PATCH 4/5] Update test documentation --- datafusion/core/src/dataframe/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index b46af22fdbc6b..50d8e14aa7149 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -2974,7 +2974,8 @@ mod tests { Ok(()) } - // Test issue: https://github.com/apache/datafusion/issues/11982 + // Test issues: https://github.com/apache/datafusion/issues/11982 + // and https://github.com/apache/datafusion/issues/12425 // Window function was creating unwanted projection when using with_column() method. #[tokio::test] async fn test_window_function_with_column() -> Result<()> { @@ -2986,8 +2987,7 @@ mod tests { // This first `with_column` results in a column without a `qualifier` let df_impl = df_impl.with_column("s", col("c2") + col("c3"))?; - // This second `with_column` then assigns `"r"` alias to the above column and the window function - // Should create an additional column with alias 'r' that has window func results + // This second `with_column` should only alias `func` as `"r"` let df = df_impl.with_column("r", func)?.limit(0, Some(2))?; df.clone().show().await?; From 62e7e6a4181bb207f6f968fcd7d525ac438a4d5d Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 11 Sep 2024 12:38:16 -0400 Subject: [PATCH 5/5] Cargo fmt --- datafusion/core/src/dataframe/mod.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 50d8e14aa7149..e7aa1172a8540 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1472,7 +1472,11 @@ impl DataFrame { Some(new_column.clone()) } else { let e = col(Column::from((qualifier, field))); - window_fn_str.as_ref().filter(|s| *s == &e.to_string()).is_none().then_some(e) + window_fn_str + .as_ref() + .filter(|s| *s == &e.to_string()) + .is_none() + .then_some(e) } }) .collect();