From b588b0b7889a8e25cb47b02586abef0ddedd5752 Mon Sep 17 00:00:00 2001 From: parkma99 Date: Thu, 4 May 2023 17:58:37 +0800 Subject: [PATCH 1/3] Port test in union.rs to sqllogic --- datafusion/core/tests/sql/union.rs | 15 --------------- .../core/tests/sqllogictests/test_files/union.slt | 10 ++++++++++ 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/datafusion/core/tests/sql/union.rs b/datafusion/core/tests/sql/union.rs index cca9c23e67338..a1f4dcffa3135 100644 --- a/datafusion/core/tests/sql/union.rs +++ b/datafusion/core/tests/sql/union.rs @@ -123,21 +123,6 @@ async fn test_union_upcast_types() -> Result<()> { let actual_logical_plan: Vec<&str> = formatted_logical_plan.trim().lines().collect(); assert_eq!(expected_logical_plan, actual_logical_plan, "\n\nexpected:\n\n{expected_logical_plan:#?}\nactual:\n\n{actual_logical_plan:#?}\n\n"); - let actual = execute_to_batches(&ctx, sql).await; - - let expected = vec![ - "+----+------------+", - "| c1 | c9 |", - "+----+------------+", - "| c | 4268716378 |", - "| e | 4229654142 |", - "| d | 4216440507 |", - "| e | 4144173353 |", - "| b | 4076864659 |", - "+----+------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) } diff --git a/datafusion/core/tests/sqllogictests/test_files/union.slt b/datafusion/core/tests/sqllogictests/test_files/union.slt index 8a13dfe36f273..e26ed11197260 100644 --- a/datafusion/core/tests/sqllogictests/test_files/union.slt +++ b/datafusion/core/tests/sqllogictests/test_files/union.slt @@ -191,6 +191,16 @@ UNION ALL 3 Alice 3 John +# union_upcast_types +query TI +SELECT c1, c9 FROM aggregate_test_100 UNION ALL SELECT c1, c3 FROM aggregate_test_100 ORDER BY c9 DESC LIMIT 5 +---- +c 4268716378 +e 4229654142 +d 4216440507 +e 4144173353 +b 4076864659 + ######## # Clean up after the test ######## From a0bfa96afcecd576d436305f2889eae59394adb1 Mon Sep 17 00:00:00 2001 From: parkma99 Date: Fri, 5 May 2023 08:08:01 +0800 Subject: [PATCH 2/3] port rest tests in union.rs --- datafusion/core/tests/sql/mod.rs | 1 - datafusion/core/tests/sql/union.rs | 167 ------------------ .../tests/sqllogictests/test_files/union.slt | 160 +++++++++++++++++ 3 files changed, 160 insertions(+), 168 deletions(-) delete mode 100644 datafusion/core/tests/sql/union.rs diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 8c7f50c9a6494..e1e8077b8a08b 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -99,7 +99,6 @@ pub mod references; pub mod select; pub mod timestamp; pub mod udf; -pub mod union; pub mod wildcard; pub mod window; diff --git a/datafusion/core/tests/sql/union.rs b/datafusion/core/tests/sql/union.rs deleted file mode 100644 index a1f4dcffa3135..0000000000000 --- a/datafusion/core/tests/sql/union.rs +++ /dev/null @@ -1,167 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use super::*; - -#[tokio::test] -async fn union_with_except_input() -> Result<()> { - let ctx = create_union_context()?; - let sql = "( - SELECT name FROM t1 - EXCEPT - SELECT name FROM t2 - ) - UNION ALL - ( - SELECT name FROM t2 - EXCEPT - SELECT name FROM t1 - )"; - let msg = format!("Creating logical plan for '{sql}'"); - let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); - let plan = dataframe.into_optimized_plan()?; - let expected = vec![ - "Explain [plan_type:Utf8, plan:Utf8]", - " Union [name:UInt8;N]", - " LeftAnti Join: t1.name = t2.name [name:UInt8;N]", - " Aggregate: groupBy=[[t1.name]], aggr=[[]] [name:UInt8;N]", - " TableScan: t1 projection=[name] [name:UInt8;N]", - " TableScan: t2 projection=[name] [name:UInt8;N]", - " LeftAnti Join: t2.name = t1.name [name:UInt8;N]", - " Aggregate: groupBy=[[t2.name]], aggr=[[]] [name:UInt8;N]", - " TableScan: t2 projection=[name] [name:UInt8;N]", - " TableScan: t1 projection=[name] [name:UInt8;N]", - ]; - - let formatted = plan.display_indent_schema().to_string(); - let actual: Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - Ok(()) -} - -#[tokio::test] -async fn union_with_type_coercion() -> Result<()> { - let ctx = create_union_context()?; - let sql = "( - SELECT id, name FROM t1 - EXCEPT - SELECT id, name FROM t2 - ) - UNION ALL - ( - SELECT id, name FROM t2 - EXCEPT - SELECT id, name FROM t1 - )"; - let msg = format!("Creating logical plan for '{sql}'"); - let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); - let plan = dataframe.into_optimized_plan()?; - let expected = vec![ - "Explain [plan_type:Utf8, plan:Utf8]", - " Union [id:Int32;N, name:UInt8;N]", - " LeftAnti Join: t1.id = CAST(t2.id AS Int32), t1.name = t2.name [id:Int32;N, name:UInt8;N]", - " Aggregate: groupBy=[[t1.id, t1.name]], aggr=[[]] [id:Int32;N, name:UInt8;N]", - " TableScan: t1 projection=[id, name] [id:Int32;N, name:UInt8;N]", - " TableScan: t2 projection=[id, name] [id:UInt8;N, name:UInt8;N]", - " Projection: CAST(t2.id AS Int32) AS id, t2.name [id:Int32;N, name:UInt8;N]", - " LeftAnti Join: CAST(t2.id AS Int32) = t1.id, t2.name = t1.name [id:UInt8;N, name:UInt8;N]", - " Aggregate: groupBy=[[t2.id, t2.name]], aggr=[[]] [id:UInt8;N, name:UInt8;N]", - " TableScan: t2 projection=[id, name] [id:UInt8;N, name:UInt8;N]", - " TableScan: t1 projection=[id, name] [id:Int32;N, name:UInt8;N]", - ]; - let formatted = plan.display_indent_schema().to_string(); - let actual: Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - Ok(()) -} - -#[tokio::test] -async fn test_union_upcast_types() -> Result<()> { - let config = SessionConfig::new() - .with_repartition_windows(false) - .with_target_partitions(1); - let ctx = SessionContext::with_config(config); - register_aggregate_csv(&ctx).await?; - let sql = "SELECT c1, c9 FROM aggregate_test_100 - UNION ALL - SELECT c1, c3 FROM aggregate_test_100 - ORDER BY c9 DESC LIMIT 5"; - let msg = format!("Creating logical plan for '{sql}'"); - let dataframe = ctx.sql(sql).await.expect(&msg); - - let expected_logical_plan = vec![ - "Limit: skip=0, fetch=5 [c1:Utf8, c9:Int64]", - " Sort: aggregate_test_100.c9 DESC NULLS FIRST [c1:Utf8, c9:Int64]", - " Union [c1:Utf8, c9:Int64]", - " Projection: aggregate_test_100.c1, CAST(aggregate_test_100.c9 AS Int64) AS c9 [c1:Utf8, c9:Int64]", - " TableScan: aggregate_test_100 [c1:Utf8, c2:UInt32, c3:Int8, c4:Int16, c5:Int32, c6:Int64, c7:UInt8, c8:UInt16, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]", - " Projection: aggregate_test_100.c1, CAST(aggregate_test_100.c3 AS Int64) AS c9 [c1:Utf8, c9:Int64]", - " TableScan: aggregate_test_100 [c1:Utf8, c2:UInt32, c3:Int8, c4:Int16, c5:Int32, c6:Int64, c7:UInt8, c8:UInt16, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]", - ]; - let formatted_logical_plan = - dataframe.logical_plan().display_indent_schema().to_string(); - let actual_logical_plan: Vec<&str> = formatted_logical_plan.trim().lines().collect(); - assert_eq!(expected_logical_plan, actual_logical_plan, "\n\nexpected:\n\n{expected_logical_plan:#?}\nactual:\n\n{actual_logical_plan:#?}\n\n"); - - Ok(()) -} - -#[tokio::test] -async fn union_with_hash_aggregate() -> Result<()> { - let ctx = create_union_context()?; - let sql = "select count(*) from ( - select distinct name from t1 - union all - select distinct name from t2 - ) group by name"; - - let dataframe = ctx.sql(sql).await.unwrap(); - let plan = dataframe.into_optimized_plan().unwrap(); - let plan = ctx.state().create_physical_plan(&plan).await.unwrap(); - let formatted = displayable(plan.as_ref()).indent().to_string(); - let actual: Vec<&str> = formatted.trim().lines().collect(); - - let expected = vec![ - "ProjectionExec: expr=[COUNT(UInt8(1))@1 as COUNT(UInt8(1))]", - " AggregateExec: mode=Single, gby=[name@0 as name], aggr=[COUNT(UInt8(1))]", - " InterleaveExec", - " AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([Column { name: \"name\", index: 0 }], 4), input_partitions=4", - " RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1", - " AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]", - " MemoryExec: partitions=1, partition_sizes=[1]", - " AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([Column { name: \"name\", index: 0 }], 4), input_partitions=4", - " RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1", - " AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]", - " MemoryExec: partitions=1, partition_sizes=[1]", - ]; - - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - Ok(()) -} diff --git a/datafusion/core/tests/sqllogictests/test_files/union.slt b/datafusion/core/tests/sqllogictests/test_files/union.slt index e26ed11197260..2bd11dac733ab 100644 --- a/datafusion/core/tests/sqllogictests/test_files/union.slt +++ b/datafusion/core/tests/sqllogictests/test_files/union.slt @@ -175,6 +175,61 @@ Alice John # union_with_type_coercion +query TT +explain +( + SELECT id, name FROM t1 + EXCEPT + SELECT id, name FROM t2 +) +UNION ALL +( + SELECT id, name FROM t2 + EXCEPT + SELECT id, name FROM t1 +) +---- +logical_plan +Union + LeftAnti Join: t1.id = CAST(t2.id AS Int32), t1.name = t2.name + Aggregate: groupBy=[[t1.id, t1.name]], aggr=[[]] + TableScan: t1 projection=[id, name] + TableScan: t2 projection=[id, name] + Projection: CAST(t2.id AS Int32) AS id, t2.name + LeftAnti Join: CAST(t2.id AS Int32) = t1.id, t2.name = t1.name + Aggregate: groupBy=[[t2.id, t2.name]], aggr=[[]] + TableScan: t2 projection=[id, name] + TableScan: t1 projection=[id, name] +physical_plan +UnionExec + ProjectionExec: expr=[id@0 as id, name@1 as name] + CoalesceBatchesExec: target_batch_size=8192 + HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(Column { name: "id", index: 0 }, Column { name: "CAST(t2.id AS Int32)", index: 2 }), (Column { name: "name", index: 1 }, Column { name: "name", index: 1 })] + AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[] + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=Hash([Column { name: "id", index: 0 }, Column { name: "name", index: 1 }], 4), input_partitions=4 + AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[] + MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=Hash([Column { name: "CAST(t2.id AS Int32)", index: 2 }, Column { name: "name", index: 1 }], 4), input_partitions=4 + ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32) as CAST(t2.id AS Int32)] + MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] + ProjectionExec: expr=[CAST(id@0 AS Int32) as id, name@1 as name] + ProjectionExec: expr=[id@0 as id, name@1 as name] + CoalesceBatchesExec: target_batch_size=8192 + HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(Column { name: "CAST(t2.id AS Int32)", index: 2 }, Column { name: "id", index: 0 }), (Column { name: "name", index: 1 }, Column { name: "name", index: 1 })] + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=Hash([Column { name: "CAST(t2.id AS Int32)", index: 2 }, Column { name: "name", index: 1 }], 4), input_partitions=4 + ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32) as CAST(t2.id AS Int32)] + AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[] + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=Hash([Column { name: "id", index: 0 }, Column { name: "name", index: 1 }], 4), input_partitions=4 + AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[] + MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=Hash([Column { name: "id", index: 0 }, Column { name: "name", index: 1 }], 4), input_partitions=4 + MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] + query IT rowsort ( SELECT id, name FROM t1 @@ -191,7 +246,79 @@ UNION ALL 3 Alice 3 John +# union_with_except_input +query TT +explain +( + SELECT name FROM t1 + EXCEPT + SELECT name FROM t2 +) +UNION ALL +( + SELECT name FROM t2 + EXCEPT + SELECT name FROM t1 +) +---- +logical_plan +Union + LeftAnti Join: t1.name = t2.name + Aggregate: groupBy=[[t1.name]], aggr=[[]] + TableScan: t1 projection=[name] + TableScan: t2 projection=[name] + LeftAnti Join: t2.name = t1.name + Aggregate: groupBy=[[t2.name]], aggr=[[]] + TableScan: t2 projection=[name] + TableScan: t1 projection=[name] +physical_plan + InterleaveExec + CoalesceBatchesExec: target_batch_size=8192 + HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(Column { name: "name", index: 0 }, Column { name: "name", index: 0 })] + AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[] + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4 + AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[] + MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4 + MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] + CoalesceBatchesExec: target_batch_size=8192 + HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(Column { name: "name", index: 0 }, Column { name: "name", index: 0 })] + AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[] + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4 + AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[] + MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4 + MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] + # union_upcast_types +query TT +explain SELECT c1, c9 FROM aggregate_test_100 UNION ALL SELECT c1, c3 FROM aggregate_test_100 ORDER BY c9 DESC LIMIT 5 +---- +logical_plan +Limit: skip=0, fetch=5 + Sort: aggregate_test_100.c9 DESC NULLS FIRST, fetch=5 + Union + Projection: aggregate_test_100.c1, CAST(aggregate_test_100.c9 AS Int64) AS c9 + TableScan: aggregate_test_100 projection=[c1, c9] + Projection: aggregate_test_100.c1, CAST(aggregate_test_100.c3 AS Int64) AS c9 + TableScan: aggregate_test_100 projection=[c1, c3] +physical_plan +GlobalLimitExec: skip=0, fetch=5 + SortPreservingMergeExec: [c9@1 DESC] + UnionExec + SortExec: expr=[c9@1 DESC] + ProjectionExec: expr=[c1@0 as c1, CAST(c9@1 AS Int64) as c9] + RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 + CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c9] + SortExec: expr=[c9@1 DESC] + ProjectionExec: expr=[c1@0 as c1, CAST(c3@1 AS Int64) as c9] + RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 + CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c3] + query TI SELECT c1, c9 FROM aggregate_test_100 UNION ALL SELECT c1, c3 FROM aggregate_test_100 ORDER BY c9 DESC LIMIT 5 ---- @@ -201,6 +328,39 @@ d 4216440507 e 4144173353 b 4076864659 +# union_with_hash_aggregate +query TT +explain +SELECT count(*) FROM ( + SELECT distinct name FROM t1 + UNION ALL + SELECT distinct name FROM t2 +) GROUP BY name +---- +logical_plan +Projection: COUNT(UInt8(1)) + Aggregate: groupBy=[[t1.name]], aggr=[[COUNT(UInt8(1))]] + Union + Aggregate: groupBy=[[t1.name]], aggr=[[]] + TableScan: t1 projection=[name] + Aggregate: groupBy=[[t2.name]], aggr=[[]] + TableScan: t2 projection=[name] +physical_plan +ProjectionExec: expr=[COUNT(UInt8(1))@1 as COUNT(UInt8(1))] + AggregateExec: mode=Single, gby=[name@0 as name], aggr=[COUNT(UInt8(1))] + InterleaveExec + AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[] + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4 + AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[] + MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] + AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[] + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4 + AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[] + MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] + + ######## # Clean up after the test ######## From 6858daa0629d139504813c568c9555468c93173c Mon Sep 17 00:00:00 2001 From: parkma99 Date: Fri, 5 May 2023 20:18:17 +0800 Subject: [PATCH 3/3] fix clippy error and slt error --- datafusion/core/tests/sql/mod.rs | 23 ------------------- .../tests/sqllogictests/test_files/union.slt | 4 ++-- 2 files changed, 2 insertions(+), 25 deletions(-) diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index e1e8077b8a08b..ba2329839f4b4 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -749,29 +749,6 @@ fn create_sort_merge_join_datatype_context() -> Result { Ok(ctx) } -fn create_union_context() -> Result { - let ctx = SessionContext::with_config( - SessionConfig::new() - .with_target_partitions(4) - .with_batch_size(4096), - ); - let t1_schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, true), - Field::new("name", DataType::UInt8, true), - ])); - let t1_data = RecordBatch::new_empty(t1_schema); - ctx.register_batch("t1", t1_data)?; - - let t2_schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::UInt8, true), - Field::new("name", DataType::UInt8, true), - ])); - let t2_data = RecordBatch::new_empty(t2_schema); - ctx.register_batch("t2", t2_data)?; - - Ok(ctx) -} - fn create_nested_loop_join_context() -> Result { let ctx = SessionContext::with_config( SessionConfig::new() diff --git a/datafusion/core/tests/sqllogictests/test_files/union.slt b/datafusion/core/tests/sqllogictests/test_files/union.slt index 2bd11dac733ab..4860627cffa81 100644 --- a/datafusion/core/tests/sqllogictests/test_files/union.slt +++ b/datafusion/core/tests/sqllogictests/test_files/union.slt @@ -313,11 +313,11 @@ GlobalLimitExec: skip=0, fetch=5 SortExec: expr=[c9@1 DESC] ProjectionExec: expr=[c1@0 as c1, CAST(c9@1 AS Int64) as c9] RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c9] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], has_header=true SortExec: expr=[c9@1 DESC] ProjectionExec: expr=[c1@0 as c1, CAST(c3@1 AS Int64) as c9] RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c3] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], has_header=true query TI SELECT c1, c9 FROM aggregate_test_100 UNION ALL SELECT c1, c3 FROM aggregate_test_100 ORDER BY c9 DESC LIMIT 5