From fd5f325c2e376f1ea248dbf893e14cd1d225ce6a Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 6 Feb 2023 09:33:17 -0500
Subject: [PATCH] Minor: Begin porting some window tests to sqllogictests

---
 datafusion/core/tests/sql/window.rs           | 414 ------
 .../tests/sqllogictests/test_files/window.slt | 400 +++++++++++++++++
 2 files changed, 400 insertions(+), 414 deletions(-)
 create mode 100644 datafusion/core/tests/sqllogictests/test_files/window.slt

diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs
index 0d39b9433b6e8..6ce718f561325 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -20,420 +20,6 @@ use ::parquet::arrow::arrow_writer::ArrowWriter;
 use ::parquet::file::properties::WriterProperties;
 use datafusion::execution::options::ReadOptions;
 
-/// for window functions without order by the first, last, and nth function call does not make sense
-#[tokio::test]
-async fn csv_query_window_with_empty_over() -> Result<()> {
-    let ctx = SessionContext::new();
-    register_aggregate_csv(&ctx).await?;
-    let sql = "select \
-        c9, \
-        count(c5) over () as count1, \
-        max(c5) over () as max1, \
-        min(c5) over () as min1 \
-        from aggregate_test_100 \
-        order by c9 \
-        limit 5";
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-----------+--------+------------+-------------+",
-        "| c9 | count1 | max1 | min1 |",
-        "+-----------+--------+------------+-------------+",
-        "| 28774375 | 100 | 2143473091 | -2141999138 |",
-        "| 63044568 | 100 | 2143473091 | -2141999138 |",
-        "| 141047417 | 100 | 2143473091 | -2141999138 |",
-        "| 141680161 | 100 | 2143473091 | -2141999138 |",
-        "| 145294611 | 100 | 2143473091 | -2141999138 |",
-        "+-----------+--------+------------+-------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-    Ok(())
-}
-
-/// for window functions without order by the first, last, and nth function call does not make sense
-#[tokio::test]
-async fn csv_query_window_with_partition_by() -> Result<()> {
-    let ctx = SessionContext::new();
-    register_aggregate_csv(&ctx).await?;
-    let sql = "select \
-        c9, \
-        sum(cast(c4 as Int)) over (partition by c3) as sum1, \
-        avg(cast(c4 as Int)) over (partition by c3) as avg1, \
-        count(cast(c4 as Int)) over (partition by c3) as count1, \
-        max(cast(c4 as Int)) over (partition by c3) as max1, \
-        min(cast(c4 as Int)) over (partition by c3) as min1 \
-        from aggregate_test_100 \
-        order by c9 \
-        limit 5";
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-----------+--------+----------+--------+--------+--------+",
-        "| c9 | sum1 | avg1 | count1 | max1 | min1 |",
-        "+-----------+--------+----------+--------+--------+--------+",
-        "| 28774375 | -16110 | -16110 | 1 | -16110 | -16110 |",
-        "| 63044568 | 3917 | 3917 | 1 | 3917 | 3917 |",
-        "| 141047417 | -38455 | -19227.5 | 2 | -16974 | -21481 |",
-        "| 141680161 | -1114 | -1114 | 1 | -1114 | -1114 |",
-        "| 145294611 | 15673 | 15673 | 1 | 15673 | 15673 |",
-        "+-----------+--------+----------+--------+--------+--------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-    Ok(())
-}
-
-#[tokio::test]
-async fn csv_query_window_with_order_by() -> Result<()> {
-    let ctx = SessionContext::new();
-    register_aggregate_csv(&ctx).await?;
-    let sql = "select \
-        c9, \
-        sum(c5) over (order by c9) as sum1, \
-        avg(c5) over (order by c9) as avg1, \
-        count(c5) over (order by c9) as count1, \
-        max(c5) over (order by c9) as max1, \
-        min(c5) over (order by c9) as min1, \
-        first_value(c5) over (order by c9) as fv1, \
-        last_value(c5) over (order by c9) as lv1, \
-        nth_value(c5, 2) over (order by c9) as nv1 \
-        from aggregate_test_100 \
-        order by c9 \
-        limit 5";
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-----------+-------------+--------------------+--------+-----------+-------------+----------+-------------+------------+",
-        "| c9 | sum1 | avg1 | count1 | max1 | min1 | fv1 | lv1 | nv1 |",
-        "+-----------+-------------+--------------------+--------+-----------+-------------+----------+-------------+------------+",
-        "| 28774375 | 61035129 | 61035129 | 1 | 61035129 | 61035129 | 61035129 | 61035129 | |",
-        "| 63044568 | -47938237 | -23969118.5 | 2 | 61035129 | -108973366 | 61035129 | -108973366 | -108973366 |",
-        "| 141047417 | 575165281 | 191721760.33333334 | 3 | 623103518 | -108973366 | 61035129 | 623103518 | -108973366 |",
-        "| 141680161 | -1352462829 | -338115707.25 | 4 | 623103518 | -1927628110 | 61035129 | -1927628110 | -108973366 |",
-        "| 145294611 | -3251637940 | -650327588 | 5 | 623103518 | -1927628110 | 61035129 | -1899175111 | -108973366 |",
-        "+-----------+-------------+--------------------+--------+-----------+-------------+----------+-------------+------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-    Ok(())
-}
-
-#[tokio::test]
-async fn csv_query_window_with_partition_by_order_by() -> Result<()> {
-    let ctx = SessionContext::new();
-    register_aggregate_csv(&ctx).await?;
-    let sql = "select \
-        c9, \
-        sum(c5) over (partition by c4 order by c9) as sum1, \
-        avg(c5) over (partition by c4 order by c9) as avg1, \
-        count(c5) over (partition by c4 order by c9) as count1, \
-        max(c5) over (partition by c4 order by c9) as max1, \
-        min(c5) over (partition by c4 order by c9) as min1, \
-        first_value(c5) over (partition by c4 order by c9) as fv1, \
-        last_value(c5) over (partition by c4 order by c9) as lv1, \
-        nth_value(c5, 2) over (partition by c4 order by c9) as nv1 \
-        from aggregate_test_100 \
-        order by c9 \
-        limit 5";
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-----------+-------------+-------------+--------+-------------+-------------+-------------+-------------+-----+",
-        "| c9 | sum1 | avg1 | count1 | max1 | min1 | fv1 | lv1 | nv1 |",
-        "+-----------+-------------+-------------+--------+-------------+-------------+-------------+-------------+-----+",
-        "| 28774375 | 61035129 | 61035129 | 1 | 61035129 | 61035129 | 61035129 | 61035129 | |",
-        "| 63044568 | -108973366 | -108973366 | 1 | -108973366 | -108973366 | -108973366 | -108973366 | |",
-        "| 141047417 | 623103518 | 623103518 | 1 | 623103518 | 623103518 | 623103518 | 623103518 | |",
-        "| 141680161 | -1927628110 | -1927628110 | 1 | -1927628110 | -1927628110 | -1927628110 | -1927628110 | |",
-        "| 145294611 | -1899175111 | -1899175111 | 1 | -1899175111 | -1899175111 | -1899175111 | -1899175111 | |",
-        "+-----------+-------------+-------------+--------+-------------+-------------+-------------+-------------+-----+",
-    ];
-    assert_batches_eq!(expected, &actual);
-    Ok(())
-}
-
-#[tokio::test]
-async fn window() -> Result<()> {
-    let results = execute_with_partition(
-        "SELECT \
-        c1, \
-        c2, \
-        SUM(c2) OVER () as sum1, \
-        COUNT(c2) OVER () as count1, \
-        MAX(c2) OVER () as max1, \
-        MIN(c2) OVER () as min1, \
-        AVG(c2) OVER () as avg1 \
-        FROM test \
-        ORDER BY c1, c2 \
-        LIMIT 5",
-        4,
-    )
-    .await?;
-    // result in one batch, although e.g. having 2 batches do not change
-    // result semantics, having a len=1 assertion upfront keeps surprises
-    // at bay
-    assert_eq!(results.len(), 1);
-
-    let expected = vec![
-        "+----+----+------+--------+------+------+------+",
-        "| c1 | c2 | sum1 | count1 | max1 | min1 | avg1 |",
-        "+----+----+------+--------+------+------+------+",
-        "| 0 | 1 | 220 | 40 | 10 | 1 | 5.5 |",
-        "| 0 | 2 | 220 | 40 | 10 | 1 | 5.5 |",
-        "| 0 | 3 | 220 | 40 | 10 | 1 | 5.5 |",
-        "| 0 | 4 | 220 | 40 | 10 | 1 | 5.5 |",
-        "| 0 | 5 | 220 | 40 | 10 | 1 | 5.5 |",
-        "+----+----+------+--------+------+------+------+",
-    ];
-
-    // window function shall respect ordering
-    assert_batches_eq!(expected, &results);
-    Ok(())
-}
-
-#[tokio::test]
-async fn window_order_by() -> Result<()> {
-    let results = execute_with_partition(
-        "SELECT \
-        c1, \
-        c2, \
-        ROW_NUMBER() OVER (ORDER BY c1, c2) as rn1, \
-        FIRST_VALUE(c2) OVER (ORDER BY c1, c2) as fv1, \
-        LAST_VALUE(c2) OVER (ORDER BY c1, c2) as lv1, \
-        NTH_VALUE(c2, 2) OVER (ORDER BY c1, c2) as nv1, \
-        SUM(c2) OVER (ORDER BY c1, c2) as sum1, \
-        COUNT(c2) OVER (ORDER BY c1, c2) as count1, \
-        MAX(c2) OVER (ORDER BY c1, c2) as max1, \
-        MIN(c2) OVER (ORDER BY c1, c2) as min1, \
-        AVG(c2) OVER (ORDER BY c1, c2) as avg1 \
-        FROM test \
-        ORDER BY c1, c2 \
-        LIMIT 5",
-        4,
-    )
-    .await?;
-    // result in one batch, although e.g. having 2 batches do not change
-    // result semantics, having a len=1 assertion upfront keeps surprises
-    // at bay
-    assert_eq!(results.len(), 1);
-
-    let expected = vec![
-        "+----+----+-----+-----+-----+-----+------+--------+------+------+------+",
-        "| c1 | c2 | rn1 | fv1 | lv1 | nv1 | sum1 | count1 | max1 | min1 | avg1 |",
-        "+----+----+-----+-----+-----+-----+------+--------+------+------+------+",
-        "| 0 | 1 | 1 | 1 | 1 | | 1 | 1 | 1 | 1 | 1 |",
-        "| 0 | 2 | 2 | 1 | 2 | 2 | 3 | 2 | 2 | 1 | 1.5 |",
-        "| 0 | 3 | 3 | 1 | 3 | 2 | 6 | 3 | 3 | 1 | 2 |",
-        "| 0 | 4 | 4 | 1 | 4 | 2 | 10 | 4 | 4 | 1 | 2.5 |",
-        "| 0 | 5 | 5 | 1 | 5 | 2 | 15 | 5 | 5 | 1 | 3 |",
-        "+----+----+-----+-----+-----+-----+------+--------+------+------+------+",
-    ];
-
-    // window function shall respect ordering
-    assert_batches_eq!(expected, &results);
-    Ok(())
-}
-
-#[tokio::test]
-async fn window_partition_by() -> Result<()> {
-    let results = execute_with_partition(
-        "SELECT \
-        c1, \
-        c2, \
-        SUM(c2) OVER (PARTITION BY c2) as sum1, \
-        COUNT(c2) OVER (PARTITION BY c2) as count1, \
-        MAX(c2) OVER (PARTITION BY c2) as max1, \
-        MIN(c2) OVER (PARTITION BY c2) as min1, \
-        AVG(c2) OVER (PARTITION BY c2) as avg1 \
-        FROM test \
-        ORDER BY c1, c2 \
-        LIMIT 5",
-        4,
-    )
-    .await?;
-
-    let expected = vec![
-        "+----+----+------+--------+------+------+------+",
-        "| c1 | c2 | sum1 | count1 | max1 | min1 | avg1 |",
-        "+----+----+------+--------+------+------+------+",
-        "| 0 | 1 | 4 | 4 | 1 | 1 | 1 |",
-        "| 0 | 2 | 8 | 4 | 2 | 2 | 2 |",
-        "| 0 | 3 | 12 | 4 | 3 | 3 | 3 |",
-        "| 0 | 4 | 16 | 4 | 4 | 4 | 4 |",
-        "| 0 | 5 | 20 | 4 | 5 | 5 | 5 |",
-        "+----+----+------+--------+------+------+------+",
-    ];
-
-    // window function shall respect ordering
-    assert_batches_eq!(expected, &results);
-    Ok(())
-}
-
-#[tokio::test]
-async fn window_partition_by_order_by() -> Result<()> {
-    let results = execute_with_partition(
-        "SELECT \
-        c1, \
-        c2, \
-        ROW_NUMBER() OVER (PARTITION BY c2 ORDER BY c1) as rn1, \
-        FIRST_VALUE(c2 + c1) OVER (PARTITION BY c2 ORDER BY c1) as fv1, \
-        LAST_VALUE(c2 + c1) OVER (PARTITION BY c2 ORDER BY c1) as lv1, \
-        NTH_VALUE(c2 + c1, 1) OVER (PARTITION BY c2 ORDER BY c1) as nv1, \
-        SUM(c2) OVER (PARTITION BY c2 ORDER BY c1) as sum1, \
-        COUNT(c2) OVER (PARTITION BY c2 ORDER BY c1) as count1, \
-        MAX(c2) OVER (PARTITION BY c2 ORDER BY c1) as max1, \
-        MIN(c2) OVER (PARTITION BY c2 ORDER BY c1) as min1, \
-        AVG(c2) OVER (PARTITION BY c2 ORDER BY c1) as avg1 \
-        FROM test \
-        ORDER BY c1, c2 \
-        LIMIT 5",
-        4,
-    )
-    .await?;
-
-    let expected = vec![
-        "+----+----+-----+-----+-----+-----+------+--------+------+------+------+",
-        "| c1 | c2 | rn1 | fv1 | lv1 | nv1 | sum1 | count1 | max1 | min1 | avg1 |",
-        "+----+----+-----+-----+-----+-----+------+--------+------+------+------+",
-        "| 0 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 |",
-        "| 0 | 2 | 1 | 2 | 2 | 2 | 2 | 1 | 2 | 2 | 2 |",
-        "| 0 | 3 | 1 | 3 | 3 | 3 | 3 | 1 | 3 | 3 | 3 |",
-        "| 0 | 4 | 1 | 4 | 4 | 4 | 4 | 1 | 4 | 4 | 4 |",
-        "| 0 | 5 | 1 | 5 | 5 | 5 | 5 | 1 | 5 | 5 | 5 |",
-        "+----+----+-----+-----+-----+-----+------+--------+------+------+------+",
-    ];
-
-    // window function shall respect ordering
-    assert_batches_eq!(expected, &results);
-    Ok(())
-}
-
-#[tokio::test]
-async fn window_expr_eliminate() -> Result<()> {
-    let ctx = SessionContext::new();
-
-    // window expr is not referenced anywhere, eliminate it.
-    let sql = "WITH _sample_data AS (
-                SELECT 1 as a, 'aa' AS b
-                UNION ALL
-                SELECT 3 as a, 'aa' AS b
-                UNION ALL
-                SELECT 5 as a, 'bb' AS b
-                UNION ALL
-                SELECT 7 as a, 'bb' AS b
-              ), _data2 AS (
-                SELECT
-                  row_number() OVER (PARTITION BY s.b ORDER BY s.a) AS seq,
-                  s.a,
-                  s.b
-                FROM _sample_data s
-              )
-              SELECT d.b, MAX(d.a) AS max_a
-              FROM _data2 d
-              GROUP BY d.b
-              ORDER BY d.b;";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan().unwrap();
-    let expected = vec![
-        "Explain [plan_type:Utf8, plan:Utf8]",
-        "  Sort: d.b ASC NULLS LAST [b:Utf8, max_a:Int64;N]",
-        "    Projection: d.b, MAX(d.a) AS max_a [b:Utf8, max_a:Int64;N]",
-        "      Aggregate: groupBy=[[d.b]], aggr=[[MAX(d.a)]] [b:Utf8, MAX(d.a):Int64;N]",
-        "        SubqueryAlias: d [a:Int64, b:Utf8]",
-        "          SubqueryAlias: _data2 [a:Int64, b:Utf8]",
-        "            Projection: s.a, s.b [a:Int64, b:Utf8]",
-        "              SubqueryAlias: s [a:Int64, b:Utf8]",
-        "                SubqueryAlias: _sample_data [a:Int64, b:Utf8]",
-        "                  Union [a:Int64, b:Utf8]",
-        "                    Projection: Int64(1) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]",
-        "                      EmptyRelation []",
-        "                    Projection: Int64(3) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]",
-        "                      EmptyRelation []",
-        "                    Projection: Int64(5) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]",
-        "                      EmptyRelation []",
-        "                    Projection: Int64(7) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]",
-        "                      EmptyRelation []",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+----+-------+",
-        "| b | max_a |",
-        "+----+-------+",
-        "| aa | 3 |",
-        "| bb | 7 |",
-        "+----+-------+",
-    ];
-
-    assert_batches_eq!(expected, &results);
-
-    // window expr is referenced by the output, keep it
-    let sql = "WITH _sample_data AS (
-                SELECT 1 as a, 'aa' AS b
-                UNION ALL
-                SELECT 3 as a, 'aa' AS b
-                UNION ALL
-                SELECT 5 as a, 'bb' AS b
-                UNION ALL
-                SELECT 7 as a, 'bb' AS b
-              ), _data2 AS (
-                SELECT
-                  row_number() OVER (PARTITION BY s.b ORDER BY s.a) AS seq,
-                  s.a,
-                  s.b
-                FROM _sample_data s
-              )
-              SELECT d.b, MAX(d.a) AS max_a, max(d.seq)
-              FROM _data2 d
-              GROUP BY d.b
-              ORDER BY d.b;";
-
-    let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan().unwrap();
-    let expected = vec![
-        "Explain [plan_type:Utf8, plan:Utf8]",
-        "  Sort: d.b ASC NULLS LAST [b:Utf8, max_a:Int64;N, MAX(d.seq):UInt64;N]",
-        "    Projection: d.b, MAX(d.a) AS max_a, MAX(d.seq) [b:Utf8, max_a:Int64;N, MAX(d.seq):UInt64;N]",
-        "      Aggregate: groupBy=[[d.b]], aggr=[[MAX(d.a), MAX(d.seq)]] [b:Utf8, MAX(d.a):Int64;N, MAX(d.seq):UInt64;N]",
-        "        SubqueryAlias: d [seq:UInt64;N, a:Int64, b:Utf8]",
-        "          SubqueryAlias: _data2 [seq:UInt64;N, a:Int64, b:Utf8]",
-        "            Projection: ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS seq, s.a, s.b [seq:UInt64;N, a:Int64, b:Utf8]",
-        "              WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [a:Int64, b:Utf8, ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N]",
-        "                SubqueryAlias: s [a:Int64, b:Utf8]",
-        "                  SubqueryAlias: _sample_data [a:Int64, b:Utf8]",
-        "                    Union [a:Int64, b:Utf8]",
-        "                      Projection: Int64(1) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]",
-        "                        EmptyRelation []",
-        "                      Projection: Int64(3) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]",
-        "                        EmptyRelation []",
-        "                      Projection: Int64(5) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]",
-        "                        EmptyRelation []",
-        "                      Projection: Int64(7) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]",
-        "                        EmptyRelation []",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+----+-------+------------+",
-        "| b | max_a | MAX(d.seq) |",
-        "+----+-------+------------+",
-        "| aa | 3 | 2 |",
-        "| bb | 7 | 2 |",
-        "+----+-------+------------+",
-    ];
-
-    assert_batches_eq!(expected, &results);
-    Ok(())
-}
-
 #[tokio::test]
 async fn window_in_expression() -> Result<()> {
     let ctx = SessionContext::new();
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt
new file mode 100644
index 0000000000000..d10946e5da4f1
--- /dev/null
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -0,0 +1,400 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+statement ok
+CREATE EXTERNAL TABLE aggregate_test_100 (
+  c1 VARCHAR NOT NULL,
+  c2 TINYINT NOT NULL,
+  c3 SMALLINT NOT NULL,
+  c4 SMALLINT,
+  c5 INT,
+  c6 BIGINT NOT NULL,
+  c7 SMALLINT NOT NULL,
+  c8 INT NOT NULL,
+  c9 BIGINT UNSIGNED NOT NULL,
+  c10 VARCHAR NOT NULL,
+  c11 FLOAT NOT NULL,
+  c12 DOUBLE NOT NULL,
+  c13 VARCHAR NOT NULL
+)
+STORED AS CSV
+WITH HEADER ROW
+LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+
+### This is the same table as
+### execute_with_partition with 4 partitions
+statement ok
+CREATE EXTERNAL TABLE test (c1 int, c2 bigint, c3 boolean)
+STORED AS CSV LOCATION 'tests/data/partitioned_csv';
+
+
+# for window functions without order by the first, last, and nth function call does not make sense
+# csv_query_window_with_empty_over
+query IIII
+select
+c9,
+count(c5) over () as count1,
+max(c5) over () as max1,
+min(c5) over () as min1
+from aggregate_test_100
+order by c9
+limit 5
+----
+28774375 100 2143473091 -2141999138
+63044568 100 2143473091 -2141999138
+141047417 100 2143473091 -2141999138
+141680161 100 2143473091 -2141999138
+145294611 100 2143473091 -2141999138
+
+# for window functions without order by the first, last, and nth function call does not make sense
+# csv_query_window_with_partition_by
+query IIIIII
+select
+c9,
+sum(cast(c4 as Int)) over (partition by c3) as sum1,
+avg(cast(c4 as Int)) over (partition by c3) as avg1,
+count(cast(c4 as Int)) over (partition by c3) as count1,
+max(cast(c4 as Int)) over (partition by c3) as max1,
+min(cast(c4 as Int)) over (partition by c3) as min1
+from aggregate_test_100
+order by c9
+limit 5
+----
+28774375 -16110 -16110 1 -16110 -16110
+63044568 3917 3917 1 3917 3917
+141047417 -38455 -19227.5 2 -16974 -21481
+141680161 -1114 -1114 1 -1114 -1114
+145294611 15673 15673 1 15673 15673
+
+
+
+# async fn csv_query_window_with_order_by
+query IIIIII
+select
+c9,
+sum(c5) over (order by c9) as sum1,
+avg(c5) over (order by c9) as avg1,
+count(c5) over (order by c9) as count1,
+max(c5) over (order by c9) as max1,
+min(c5) over (order by c9) as min1,
+first_value(c5) over (order by c9) as fv1,
+last_value(c5) over (order by c9) as lv1,
+nth_value(c5, 2) over (order by c9) as nv1
+from aggregate_test_100
+order by c9
+limit 5
+----
+28774375 61035129 61035129 1 61035129 61035129 61035129 61035129 NULL
+63044568 -47938237 -23969118.5 2 61035129 -108973366 61035129 -108973366 -108973366
+141047417 575165281 191721760.33333334 3 623103518 -108973366 61035129 623103518 -108973366
+141680161 -1352462829 -338115707.25 4 623103518 -1927628110 61035129 -1927628110 -108973366
+145294611 -3251637940 -650327588 5 623103518 -1927628110 61035129 -1899175111 -108973366
+
+# csv_query_window_with_partition_by_order_by
+query IIIIII
+select
+  c9,
+  sum(c5) over (partition by c4 order by c9) as sum1,
+  avg(c5) over (partition by c4 order by c9) as avg1,
+  count(c5) over (partition by c4 order by c9) as count1,
+  max(c5) over (partition by c4 order by c9) as max1,
+  min(c5) over (partition by c4 order by c9) as min1,
+  first_value(c5) over (partition by c4 order by c9) as fv1,
+  last_value(c5) over (partition by c4 order by c9) as lv1,
+  nth_value(c5, 2) over (partition by c4 order by c9) as nv1
+from aggregate_test_100
+order by c9
+limit 5
+----
+28774375 61035129 61035129 1 61035129 61035129 61035129 61035129 NULL
+63044568 -108973366 -108973366 1 -108973366 -108973366 -108973366 -108973366 NULL
+141047417 623103518 623103518 1 623103518 623103518 623103518 623103518 NULL
+141680161 -1927628110 -1927628110 1 -1927628110 -1927628110 -1927628110 -1927628110 NULL
+145294611 -1899175111 -1899175111 1 -1899175111 -1899175111 -1899175111 -1899175111 NULL
+
+# window()
+query IIIIII
+SELECT
+c1,
+c2,
+SUM(c2) OVER () as sum1,
+COUNT(c2) OVER () as count1,
+MAX(c2) OVER () as max1,
+MIN(c2) OVER () as min1,
+AVG(c2) OVER () as avg1
+FROM test
+ORDER BY c1, c2
+LIMIT 5
+----
+0 0 220 44 10 0 5
+0 1 220 44 10 0 5
+0 2 220 44 10 0 5
+0 3 220 44 10 0 5
+0 4 220 44 10 0 5
+
+
+# window_order_by
+query IIIIII
+SELECT
+c1,
+c2,
+ROW_NUMBER() OVER (ORDER BY c1, c2) as rn1,
+FIRST_VALUE(c2) OVER (ORDER BY c1, c2) as fv1,
+LAST_VALUE(c2) OVER (ORDER BY c1, c2) as lv1,
+NTH_VALUE(c2, 2) OVER (ORDER BY c1, c2) as nv1,
+SUM(c2) OVER (ORDER BY c1, c2) as sum1,
+COUNT(c2) OVER (ORDER BY c1, c2) as count1,
+MAX(c2) OVER (ORDER BY c1, c2) as max1,
+MIN(c2) OVER (ORDER BY c1, c2) as min1,
+AVG(c2) OVER (ORDER BY c1, c2) as avg1
+FROM test
+ORDER BY c1, c2
+LIMIT 5
+----
+0 0 1 0 0 NULL 0 1 0 0 0
+0 1 2 0 1 1 1 2 1 0 0.5
+0 2 3 0 2 1 3 3 2 0 1
+0 3 4 0 3 1 6 4 3 0 1.5
+0 4 5 0 4 1 10 5 4 0 2
+
+# window_partition_by
+query IIIIII
+SELECT
+c1,
+c2,
+SUM(c2) OVER (PARTITION BY c2) as sum1,
+COUNT(c2) OVER (PARTITION BY c2) as count1,
+MAX(c2) OVER (PARTITION BY c2) as max1,
+MIN(c2) OVER (PARTITION BY c2) as min1,
+AVG(c2) OVER (PARTITION BY c2) as avg1
+FROM test
+ORDER BY c1, c2
+LIMIT 5
+----
+0 0 0 4 0 0 0
+0 1 4 4 1 1 1
+0 2 8 4 2 2 2
+0 3 12 4 3 3 3
+0 4 16 4 4 4 4
+
+query IIIIIIIIII
+SELECT
+c1,
+c2,
+ROW_NUMBER() OVER (PARTITION BY c2 ORDER BY c1) as rn1,
+FIRST_VALUE(c2 + c1) OVER (PARTITION BY c2 ORDER BY c1) as fv1,
+LAST_VALUE(c2 + c1) OVER (PARTITION BY c2 ORDER BY c1) as lv1,
+NTH_VALUE(c2 + c1, 1) OVER (PARTITION BY c2 ORDER BY c1) as nv1,
+SUM(c2) OVER (PARTITION BY c2 ORDER BY c1) as sum1,
+COUNT(c2) OVER (PARTITION BY c2 ORDER BY c1) as count1,
+MAX(c2) OVER (PARTITION BY c2 ORDER BY c1) as max1,
+MIN(c2) OVER (PARTITION BY c2 ORDER BY c1) as min1,
+AVG(c2) OVER (PARTITION BY c2 ORDER BY c1) as avg1
+FROM test
+ORDER BY c1, c2
+LIMIT 5
+----
+0 0 1 0 0 0 0 1 0 0 0
+0 1 1 1 1 1 1 1 1 1 1
+0 2 1 2 2 2 2 1 2 2 2
+0 3 1 3 3 3 3 1 3 3 3
+0 4 1 4 4 4 4 1 4 4 4
+
+
+#####
+# window_expr_eliminate()
+#####
+
+# window expr is not referenced anywhere, eliminate it.
+query ??
+EXPLAIN
+WITH _sample_data AS (
+  SELECT 1 as a, 'aa' AS b
+  UNION ALL
+  SELECT 3 as a, 'aa' AS b
+  UNION ALL
+  SELECT 5 as a, 'bb' AS b
+  UNION ALL
+  SELECT 7 as a, 'bb' AS b
+  ), _data2 AS (
+  SELECT
+    row_number() OVER (PARTITION BY s.b ORDER BY s.a) AS seq,
+    s.a,
+    s.b
+  FROM _sample_data s
+  )
+  SELECT d.b, MAX(d.a) AS max_a
+  FROM _data2 d
+  GROUP BY d.b
+  ORDER BY d.b;
+----
+logical_plan
+Sort: d.b ASC NULLS LAST
+  Projection: d.b, MAX(d.a) AS max_a
+    Aggregate: groupBy=[[d.b]], aggr=[[MAX(d.a)]]
+      SubqueryAlias: d
+        SubqueryAlias: _data2
+          Projection: s.a, s.b
+            SubqueryAlias: s
+              SubqueryAlias: _sample_data
+                Union
+                  Projection: Int64(1) AS a, Utf8("aa") AS b
+                    EmptyRelation
+                  Projection: Int64(3) AS a, Utf8("aa") AS b
+                    EmptyRelation
+                  Projection: Int64(5) AS a, Utf8("bb") AS b
+                    EmptyRelation
+                  Projection: Int64(7) AS a, Utf8("bb") AS b
+                    EmptyRelation
+physical_plan
+SortPreservingMergeExec: [b@0 ASC NULLS LAST]
+  SortExec: [b@0 ASC NULLS LAST]
+    ProjectionExec: expr=[b@0 as b, MAX(d.a)@1 as max_a]
+      AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[MAX(d.a)]
+        CoalesceBatchesExec: target_batch_size=8192
+          RepartitionExec: partitioning=Hash([Column { name: "b", index: 0 }], 4), input_partitions=4
+            AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[MAX(d.a)]
+              ProjectionExec: expr=[a@0 as a, b@1 as b]
+                UnionExec
+                  ProjectionExec: expr=[1 as a, aa as b]
+                    EmptyExec: produce_one_row=true
+                  ProjectionExec: expr=[3 as a, aa as b]
+                    EmptyExec: produce_one_row=true
+                  ProjectionExec: expr=[5 as a, bb as b]
+                    EmptyExec: produce_one_row=true
+                  ProjectionExec: expr=[7 as a, bb as b]
+                    EmptyExec: produce_one_row=true
+
+# Check actual result:
+query ?I
+WITH _sample_data AS (
+  SELECT 1 as a, 'aa' AS b
+  UNION ALL
+  SELECT 3 as a, 'aa' AS b
+  UNION ALL
+  SELECT 5 as a, 'bb' AS b
+  UNION ALL
+  SELECT 7 as a, 'bb' AS b
+  ), _data2 AS (
+  SELECT
+    row_number() OVER (PARTITION BY s.b ORDER BY s.a) AS seq,
+    s.a,
+    s.b
+  FROM _sample_data s
+  )
+  SELECT d.b, MAX(d.a) AS max_a
+  FROM _data2 d
+  GROUP BY d.b
+  ORDER BY d.b;
+----
+aa 3
+bb 7
+
+# window expr is referenced by the output, keep it
+query ??
+EXPLAIN
+WITH _sample_data AS (
+  SELECT 1 as a, 'aa' AS b
+  UNION ALL
+  SELECT 3 as a, 'aa' AS b
+  UNION ALL
+  SELECT 5 as a, 'bb' AS b
+  UNION ALL
+  SELECT 7 as a, 'bb' AS b
+  ), _data2 AS (
+  SELECT
+    row_number() OVER (PARTITION BY s.b ORDER BY s.a) AS seq,
+    s.a,
+    s.b
+  FROM _sample_data s
+  )
+  SELECT d.b, MAX(d.a) AS max_a, max(d.seq)
+  FROM _data2 d
+  GROUP BY d.b
+  ORDER BY d.b
+----
+logical_plan
+Sort: d.b ASC NULLS LAST
+  Projection: d.b, MAX(d.a) AS max_a, MAX(d.seq)
+    Aggregate: groupBy=[[d.b]], aggr=[[MAX(d.a), MAX(d.seq)]]
+      SubqueryAlias: d
+        SubqueryAlias: _data2
+          Projection: ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS seq, s.a, s.b
+            WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+              SubqueryAlias: s
+                SubqueryAlias: _sample_data
+                  Union
+                    Projection: Int64(1) AS a, Utf8("aa") AS b
+                      EmptyRelation
+                    Projection: Int64(3) AS a, Utf8("aa") AS b
+                      EmptyRelation
+                    Projection: Int64(5) AS a, Utf8("bb") AS b
+                      EmptyRelation
+                    Projection: Int64(7) AS a, Utf8("bb") AS b
+                      EmptyRelation
+physical_plan
+SortPreservingMergeExec: [b@0 ASC NULLS LAST]
+  SortExec: [b@0 ASC NULLS LAST]
+    ProjectionExec: expr=[b@0 as b, MAX(d.a)@1 as max_a, MAX(d.seq)@2 as MAX(d.seq)]
+      AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[MAX(d.a), MAX(d.seq)]
+        AggregateExec: mode=Partial, gby=[b@2 as b], aggr=[MAX(d.a), MAX(d.seq)]
+          ProjectionExec: expr=[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as seq, a@0 as a, b@1 as b]
+            BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]
+              SortExec: [b@1 ASC NULLS LAST,a@0 ASC NULLS LAST]
+                CoalesceBatchesExec: target_batch_size=8192
+                  RepartitionExec: partitioning=Hash([Column { name: "b", index: 1 }], 4), input_partitions=4
+                    UnionExec
+                      ProjectionExec: expr=[1 as a, aa as b]
+                        EmptyExec: produce_one_row=true
+                      ProjectionExec: expr=[3 as a, aa as b]
+                        EmptyExec: produce_one_row=true
+                      ProjectionExec: expr=[5 as a, bb as b]
+                        EmptyExec: produce_one_row=true
+                      ProjectionExec: expr=[7 as a, bb as b]
+                        EmptyExec: produce_one_row=true
+
+
+
+
+
+
+
+# check actual result
+
+query ?II
+WITH _sample_data AS (
+  SELECT 1 as a, 'aa' AS b
+  UNION ALL
+  SELECT 3 as a, 'aa' AS b
+  UNION ALL
+  SELECT 5 as a, 'bb' AS b
+  UNION ALL
+  SELECT 7 as a, 'bb' AS b
+  ), _data2 AS (
+  SELECT
+    row_number() OVER (PARTITION BY s.b ORDER BY s.a) AS seq,
+    s.a,
+    s.b
+  FROM _sample_data s
+  )
+  SELECT d.b, MAX(d.a) AS max_a, max(d.seq)
+  FROM _data2 d
+  GROUP BY d.b
+  ORDER BY d.b
+----
+aa 3 2
+bb 7 2