From 8affbdbd230ee7d74bb50043db19eaf2e173e658 Mon Sep 17 00:00:00 2001 From: Devan Date: Wed, 14 Aug 2024 22:26:26 -0500 Subject: [PATCH 1/8] fix/11982: resolves projection issue found in with_column window fn usage Signed-off-by: Devan --- .../testing_window_bug_will_delete.rs | 40 +++++++++++++++++++ datafusion/core/src/dataframe/mod.rs | 40 +++++++++++++++++-- datafusion/expr/src/logical_plan/builder.rs | 1 + datafusion/expr/src/logical_plan/plan.rs | 1 + 4 files changed, 79 insertions(+), 3 deletions(-) create mode 100644 datafusion-examples/examples/testing_window_bug_will_delete.rs diff --git a/datafusion-examples/examples/testing_window_bug_will_delete.rs b/datafusion-examples/examples/testing_window_bug_will_delete.rs new file mode 100644 index 0000000000000..04e381e0aa945 --- /dev/null +++ b/datafusion-examples/examples/testing_window_bug_will_delete.rs @@ -0,0 +1,40 @@ +use arrow::array::{Int32Array, RecordBatch}; +use arrow_schema::{DataType, Field, Schema}; +use datafusion::datasource::MemTable; +use datafusion::error::Result; +use datafusion::prelude::SessionContext; +use datafusion_expr::expr::WindowFunction; +use datafusion_expr::{col, BuiltInWindowFunction, Expr, WindowFunctionDefinition}; +use std::sync::Arc; + +#[tokio::main] +async fn main() -> Result<()> { + let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); + + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(Int32Array::from(vec![5, 4, 3, 2, 1]))], + )?; + + let ctx = SessionContext::new(); + + let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch]])?; + ctx.register_table("t", Arc::new(provider))?; + + let df = ctx.table("t").await?; + + let func = Expr::WindowFunction(WindowFunction::new( + WindowFunctionDefinition::BuiltInWindowFunction(BuiltInWindowFunction::RowNumber), + vec![], + )) + .alias("row_num"); + + df.clone() + .select(vec![col("a"), func.clone()])? + .show() + .await?; + + df.with_column("r", func)?.show().await?; + + Ok(()) +} diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 25a8d1c87f004..683359a420278 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1452,12 +1452,14 @@ impl DataFrame { let mut fields: Vec = plan .schema() .iter() - .map(|(qualifier, field)| { + .filter_map(|(qualifier, field)| { + qualifier?; + if field.name() == name { col_exists = true; - new_column.clone() + Some(new_column.clone()) } else { - col(Column::from((qualifier, field))) + Some(col(Column::from((qualifier, field)))) } }) .collect(); @@ -1703,6 +1705,7 @@ mod tests { use arrow::array::{self, Int32Array}; use datafusion_common::{Constraint, Constraints, ScalarValue}; use datafusion_common_runtime::SpawnedTask; + use datafusion_expr::expr::WindowFunction; use datafusion_expr::{ cast, create_udf, expr, lit, BuiltInWindowFunction, ExprFunctionExt, ScalarFunctionImplementation, Volatility, WindowFunctionDefinition, @@ -2373,6 +2376,37 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_window_function_with_column() -> Result<()> { + let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); + + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(Int32Array::from(vec![5, 4, 3, 2, 1]))], + )?; + + let ctx = SessionContext::new(); + + let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch]])?; + ctx.register_table("t", Arc::new(provider))?; + + let df = ctx.table("t").await?; + + let func = Expr::WindowFunction(WindowFunction::new( + WindowFunctionDefinition::BuiltInWindowFunction( + BuiltInWindowFunction::RowNumber, + ), + vec![], + )) + .alias("row_num"); + + let out = df.with_column("r", func)?; + + // Should only output 'a' and 'r' + assert_eq!(2, out.schema().fields().len()); + Ok(()) + } + #[tokio::test] async fn test_distinct() -> Result<()> { let t = test_table().await?; diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 2e53a682854ce..c9c06098375cd 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -353,6 +353,7 @@ impl LogicalPlanBuilder { .window(window_exprs)? .build()?; } + Ok(plan) } /// Apply a projection without alias. diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 2bab6d516a73e..ee41a2730128c 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2195,6 +2195,7 @@ impl Window { .iter() .map(|(q, f)| (q.cloned(), Arc::clone(f))) .collect(); + let input_len = fields.len(); let mut window_fields = fields; let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?; From f039d3e881331ecb432f1df9c01012a598975862 Mon Sep 17 00:00:00 2001 From: Devan Date: Wed, 14 Aug 2024 22:28:11 -0500 Subject: [PATCH 2/8] fix/11982: resolves projection issue found in with_column window fn usage Signed-off-by: Devan --- .../testing_window_bug_will_delete.rs | 40 ------------------- 1 file changed, 40 deletions(-) delete mode 100644 datafusion-examples/examples/testing_window_bug_will_delete.rs diff --git a/datafusion-examples/examples/testing_window_bug_will_delete.rs b/datafusion-examples/examples/testing_window_bug_will_delete.rs deleted file mode 100644 index 04e381e0aa945..0000000000000 --- a/datafusion-examples/examples/testing_window_bug_will_delete.rs +++ /dev/null @@ -1,40 +0,0 @@ -use arrow::array::{Int32Array, RecordBatch}; -use arrow_schema::{DataType, Field, Schema}; -use datafusion::datasource::MemTable; -use datafusion::error::Result; -use datafusion::prelude::SessionContext; -use datafusion_expr::expr::WindowFunction; -use datafusion_expr::{col, BuiltInWindowFunction, Expr, WindowFunctionDefinition}; -use std::sync::Arc; - -#[tokio::main] -async fn main() -> Result<()> { - let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); - - let batch = RecordBatch::try_new( - Arc::new(schema.clone()), - vec![Arc::new(Int32Array::from(vec![5, 4, 3, 2, 1]))], - )?; - - let ctx = SessionContext::new(); - - let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch]])?; - ctx.register_table("t", Arc::new(provider))?; - - let df = ctx.table("t").await?; - - let func = Expr::WindowFunction(WindowFunction::new( - WindowFunctionDefinition::BuiltInWindowFunction(BuiltInWindowFunction::RowNumber), - vec![], - )) - .alias("row_num"); - - df.clone() - .select(vec![col("a"), func.clone()])? - .show() - .await?; - - df.with_column("r", func)?.show().await?; - - Ok(()) -} From ff08f0e982c90e736fa780bfcddfa1459d45a326 Mon Sep 17 00:00:00 2001 From: Devan Date: Wed, 14 Aug 2024 22:29:36 -0500 Subject: [PATCH 3/8] fmt Signed-off-by: Devan --- datafusion/expr/src/logical_plan/builder.rs | 1 - datafusion/expr/src/logical_plan/plan.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index c9c06098375cd..2e53a682854ce 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -353,7 +353,6 @@ impl LogicalPlanBuilder { .window(window_exprs)? .build()?; } - Ok(plan) } /// Apply a projection without alias. diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index ee41a2730128c..2bab6d516a73e 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2195,7 +2195,6 @@ impl Window { .iter() .map(|(q, f)| (q.cloned(), Arc::clone(f))) .collect(); - let input_len = fields.len(); let mut window_fields = fields; let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?; From c697cacd28bdc708527f7f0ae0c52dbaae4ed4ae Mon Sep 17 00:00:00 2001 From: Devan Date: Wed, 14 Aug 2024 23:05:19 -0500 Subject: [PATCH 4/8] fmt Signed-off-by: Devan --- datafusion/core/src/dataframe/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 683359a420278..8d5545b37338c 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1454,7 +1454,6 @@ impl DataFrame { .iter() .filter_map(|(qualifier, field)| { qualifier?; - if field.name() == name { col_exists = true; Some(new_column.clone()) @@ -2402,7 +2401,6 @@ mod tests { let out = df.with_column("r", func)?; - // Should only output 'a' and 'r' assert_eq!(2, out.schema().fields().len()); Ok(()) } From 4cc3cc6676a72bcc4c08a2bcd9429ce60314aaff Mon Sep 17 00:00:00 2001 From: Devan Date: Thu, 15 Aug 2024 13:09:35 -0500 Subject: [PATCH 5/8] refactor to get tests working Signed-off-by: Devan --- datafusion/core/src/dataframe/mod.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 8d5545b37338c..7237d40d6c529 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1441,24 +1441,29 @@ impl DataFrame { /// ``` pub fn with_column(self, name: &str, expr: Expr) -> Result { let window_func_exprs = find_window_exprs(&[expr.clone()]); - let plan = if window_func_exprs.is_empty() { - self.plan + + let (plan, mut col_exists, window_func) = if window_func_exprs.is_empty() { + (self.plan, false, false) } else { - LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)? + ( + LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?, + true, + true, + ) }; let new_column = expr.alias(name); - let mut col_exists = false; let mut fields: Vec = plan .schema() .iter() - .filter_map(|(qualifier, field)| { - qualifier?; + .map(|(qualifier, field)| { if field.name() == name { col_exists = true; - Some(new_column.clone()) + new_column.clone() + } else if window_func && qualifier.is_none() { + col(Column::from((qualifier, field))).alias(name) } else { - Some(col(Column::from((qualifier, field)))) + col(Column::from((qualifier, field))) } }) .collect(); From e236a22f47fb8d648ab8d2b8c6c6ec068b2936fc Mon Sep 17 00:00:00 2001 From: Devan Date: Thu, 15 Aug 2024 22:35:22 -0500 Subject: [PATCH 6/8] change test to use test harness Signed-off-by: Devan --- datafusion/core/src/dataframe/mod.rs | 63 +++++++++++++++------------- 1 file changed, 33 insertions(+), 30 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 7237d40d6c529..a4d33a876f9d3 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -2380,36 +2380,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_window_function_with_column() -> Result<()> { - let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); - - let batch = RecordBatch::try_new( - Arc::new(schema.clone()), - vec![Arc::new(Int32Array::from(vec![5, 4, 3, 2, 1]))], - )?; - - let ctx = SessionContext::new(); - - let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch]])?; - ctx.register_table("t", Arc::new(provider))?; - - let df = ctx.table("t").await?; - - let func = Expr::WindowFunction(WindowFunction::new( - WindowFunctionDefinition::BuiltInWindowFunction( - BuiltInWindowFunction::RowNumber, - ), - vec![], - )) - .alias("row_num"); - - let out = df.with_column("r", func)?; - - assert_eq!(2, out.schema().fields().len()); - Ok(()) - } - #[tokio::test] async fn test_distinct() -> Result<()> { let t = test_table().await?; @@ -2906,6 +2876,39 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_window_function_with_column() -> Result<()> { + let df = test_table().await?.select_columns(&["c1", "c2", "c3"])?; + let ctx = SessionContext::new(); + let df_impl = DataFrame::new(ctx.state(), df.plan.clone()); + let func = Expr::WindowFunction(WindowFunction::new( + WindowFunctionDefinition::BuiltInWindowFunction( + BuiltInWindowFunction::RowNumber, + ), + vec![], + )) + .alias("row_num"); + + // Should create an additional column with alias 'r' that has window func results + let df = df_impl.with_column("r", func)?.limit(0, Some(2))?; + assert_eq!(4, df.schema().fields().len()); + + let df_results = df.clone().collect().await?; + assert_batches_sorted_eq!( + [ + "+----+----+-----+---+", + "| c1 | c2 | c3 | r |", + "+----+----+-----+---+", + "| c | 2 | 1 | 1 |", + "| d | 5 | -40 | 2 |", + "+----+----+-----+---+", + ], + &df_results + ); + + Ok(()) + } + // Test issue: https://github.com/apache/datafusion/issues/7790 // The join operation outputs two identical column names, but they belong to different relations. #[tokio::test] From 79c9b236327169da300e1e221309a282f7bbd159 Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 16 Aug 2024 08:43:25 -0500 Subject: [PATCH 7/8] use row_number method and add comment about test Signed-off-by: Devan --- datafusion/core/src/dataframe/mod.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index a4d33a876f9d3..04101fce2f5d1 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1709,7 +1709,7 @@ mod tests { use arrow::array::{self, Int32Array}; use datafusion_common::{Constraint, Constraints, ScalarValue}; use datafusion_common_runtime::SpawnedTask; - use datafusion_expr::expr::WindowFunction; + use datafusion_expr::window_function::row_number; use datafusion_expr::{ cast, create_udf, expr, lit, BuiltInWindowFunction, ExprFunctionExt, ScalarFunctionImplementation, Volatility, WindowFunctionDefinition, @@ -2876,18 +2876,14 @@ mod tests { Ok(()) } + // Test issue: https://github.com/apache/datafusion/issues/11982 + // Window function was creating unwanted projection when using with_column() method. #[tokio::test] async fn test_window_function_with_column() -> Result<()> { let df = test_table().await?.select_columns(&["c1", "c2", "c3"])?; let ctx = SessionContext::new(); let df_impl = DataFrame::new(ctx.state(), df.plan.clone()); - let func = Expr::WindowFunction(WindowFunction::new( - WindowFunctionDefinition::BuiltInWindowFunction( - BuiltInWindowFunction::RowNumber, - ), - vec![], - )) - .alias("row_num"); + let func = row_number().alias("row_num"); // Should create an additional column with alias 'r' that has window func results let df = df_impl.with_column("r", func)?.limit(0, Some(2))?; From 0dbfb4f12850fc3af49af6f7ac24dd65a8ae9573 Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 16 Aug 2024 08:47:54 -0500 Subject: [PATCH 8/8] add back import Signed-off-by: Devan --- datafusion/core/src/dataframe/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 442dc7caf5a80..760ebd7392e56 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1709,6 +1709,7 @@ mod tests { use arrow::array::{self, Int32Array}; use datafusion_common::{Constraint, Constraints, ScalarValue}; use datafusion_common_runtime::SpawnedTask; + use datafusion_expr::expr::WindowFunction; use datafusion_expr::window_function::row_number; use datafusion_expr::{ cast, create_udf, expr, lit, BuiltInWindowFunction, ExprFunctionExt,