From 416c83008445c1393afa3c206560ff3bc087259d Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 14 Feb 2026 21:56:34 -0500 Subject: [PATCH 1/9] fix: handle complex projections in ordering validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, `get_projected_output_ordering` used `ordered_column_indices_from_projection` which was all-or-nothing: if any expression in the projection wasn't a simple Column, it returned None for the entire projection — even if the sort columns themselves were simple column refs. Replace it with `resolve_sort_column_projection` which only requires sort-column positions to resolve to simple Columns. Each ordering is now independently evaluated: orderings on simple column refs get validated with statistics even when other projection expressions are complex. Co-Authored-By: Claude Opus 4.6 --- datafusion/datasource/src/file_scan_config.rs | 107 +++++++++--------- 1 file changed, 56 insertions(+), 51 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 78da70402f93d..0e2737b3064b8 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1341,19 +1341,33 @@ impl DisplayAs for FileScanConfig { } } -/// Get the indices of columns in a projection if the projection is a simple -/// list of columns. -/// If there are any expressions other than columns, returns None. -fn ordered_column_indices_from_projection( +/// Build a projection index mapping for the sort columns in `ordering`. +/// +/// Returns a `Vec` of the same length as `projection`, where each entry +/// maps a projected-schema column index to the corresponding table-schema column +/// index. 
Only the entries referenced by sort columns in `ordering` are required +/// to be simple `Column` expressions; non-sort-column positions are filled with a +/// placeholder (0) since they will never be accessed by `MinMaxStatistics`. +/// +/// Returns `None` if any sort column is not a simple `Column` reference in the +/// projected ordering, or if its corresponding projection expression is not a +/// simple `Column`. +fn resolve_sort_column_projection( + ordering: &LexOrdering, projection: &ProjectionExprs, ) -> Option<Vec<usize>> { - projection - .expr_iter() - .map(|e| { - let index = e.as_any().downcast_ref::<Column>()?.index(); - Some(index) - }) - .collect::<Option<Vec<usize>>>() + let proj_slice = projection.as_ref(); + let mut indices = vec![0usize; proj_slice.len()]; + + for sort_expr in ordering.iter() { + let col = sort_expr.expr.as_any().downcast_ref::<Column>()?; + let proj_idx = col.index(); + let proj_expr = proj_slice.get(proj_idx)?; + let table_col = proj_expr.expr.as_any().downcast_ref::<Column>()?; + indices[proj_idx] = table_col.index(); + } + + Some(indices) } /// Check whether a given ordering is valid for all file groups by verifying @@ -1467,47 +1481,38 @@ fn get_projected_output_ordering( let projected_orderings = project_orderings(&base_config.output_ordering, projected_schema); - let indices = base_config - .file_source - .projection() - .as_ref() - .map(|p| ordered_column_indices_from_projection(p)); - - match indices { - Some(Some(indices)) => { - // Simple column projection — validate with statistics - validate_orderings( - &projected_orderings, - projected_schema, - &base_config.file_groups, - Some(indices.as_slice()), - ) - } - None => { - // No projection — validate with statistics (no remapping needed) - validate_orderings( - &projected_orderings, - projected_schema, - &base_config.file_groups, - None, - ) - } - Some(None) => { - // Complex projection (expressions, not simple columns) — can't - // determine column indices for statistics. 
Still valid if all - // file groups have at most one file. - if base_config.file_groups.iter().all(|g| g.len() <= 1) { - projected_orderings - } else { - debug!( - "Skipping specified output orderings. \ - Some file groups couldn't be determined to be sorted: {:?}", - base_config.file_groups - ); - vec![] + let projection = base_config.file_source.projection(); + + projected_orderings + .into_iter() + .filter(|ordering| match projection.as_ref() { + None => { + // No projection — validate directly with statistics + is_ordering_valid_for_file_groups( + &base_config.file_groups, + ordering, + projected_schema, + None, + ) } - } - } + Some(proj) => match resolve_sort_column_projection(ordering, proj) { + Some(indices) => { + // All sort columns resolved — validate with statistics + is_ordering_valid_for_file_groups( + &base_config.file_groups, + ordering, + projected_schema, + Some(&indices), + ) + } + None => { + // Some sort column is a complex expression — can't + // look up statistics. Fall back to single-file check. + base_config.file_groups.iter().all(|g| g.len() <= 1) + } + }, + }) + .collect() } /// Convert type to a type suitable for use as a `ListingTable` From e2ebdcdda8b95d75c47411b25c9fe83861730c44 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 15 Feb 2026 11:23:03 -0500 Subject: [PATCH 2/9] fix: unify ordering display with optimization path Replace the independent display computation (get_projected_output_ordering) with orderings extracted from eq_properties().oeq_class(), so EXPLAIN output always matches what the optimizer actually sees. Previously, fmt_as() independently recomputed orderings via get_projected_output_ordering(), which validated post-projection and would drop valid orderings when any projection expression was complex (e.g. `a + 1`). Now both display and optimization use the same path: validate at table-schema level, then project through EquivalenceProperties::project(). 
- Delete get_projected_output_ordering and resolve_sort_column_projection - Update DataSource::fmt_as and DisplayAs::fmt_as to use eq_properties() - Add regression tests for complex projections with multi-file groups - Update SLT expectations for equivalence-aware ordering display Co-Authored-By: Claude Opus 4.6 --- datafusion/datasource/src/file_scan_config.rs | 389 ++++++++++++------ .../test_files/encrypted_parquet.slt | 7 +- .../sqllogictest/test_files/group_by.slt | 4 +- datafusion/sqllogictest/test_files/joins.slt | 2 +- .../test_files/monotonic_projection_test.slt | 4 +- .../sqllogictest/test_files/sort_pushdown.slt | 4 +- datafusion/sqllogictest/test_files/topk.slt | 2 +- datafusion/sqllogictest/test_files/union.slt | 2 +- datafusion/sqllogictest/test_files/window.slt | 10 +- 9 files changed, 273 insertions(+), 151 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 0e2737b3064b8..6eded159da40a 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -604,7 +604,8 @@ impl DataSource for FileScanConfig { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?; - let orderings = get_projected_output_ordering(self, &schema); + let eq_props = self.eq_properties(); + let orderings = eq_props.oeq_class(); write!(f, "file_groups=")?; FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?; @@ -643,7 +644,7 @@ impl DataSource for FileScanConfig { write!(f, ", limit={limit}")?; } - display_orderings(f, &orderings)?; + display_orderings(f, orderings)?; if !self.constraints.is_empty() { write!(f, ", {}", self.constraints)?; @@ -1317,21 +1318,23 @@ impl Debug for FileScanConfig { impl DisplayAs for FileScanConfig { fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult { - let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?; - 
let orderings = get_projected_output_ordering(self, &schema); + let eq_props = self.eq_properties(); + let orderings = eq_props.oeq_class(); write!(f, "file_groups=")?; FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?; - if !schema.fields().is_empty() { - write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; + if let Ok(schema) = self.projected_schema() { + if !schema.fields().is_empty() { + write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; + } } if let Some(limit) = self.limit { write!(f, ", limit={limit}")?; } - display_orderings(f, &orderings)?; + display_orderings(f, orderings)?; if !self.constraints.is_empty() { write!(f, ", {}", self.constraints)?; @@ -1341,35 +1344,6 @@ impl DisplayAs for FileScanConfig { } } -/// Build a projection index mapping for the sort columns in `ordering`. -/// -/// Returns a `Vec` of the same length as `projection`, where each entry -/// maps a projected-schema column index to the corresponding table-schema column -/// index. Only the entries referenced by sort columns in `ordering` are required -/// to be simple `Column` expressions; non-sort-column positions are filled with a -/// placeholder (0) since they will never be accessed by `MinMaxStatistics`. -/// -/// Returns `None` if any sort column is not a simple `Column` reference in the -/// projected ordering, or if its corresponding projection expression is not a -/// simple `Column`. 
-fn resolve_sort_column_projection( - ordering: &LexOrdering, - projection: &ProjectionExprs, -) -> Option<Vec<usize>> { - let proj_slice = projection.as_ref(); - let mut indices = vec![0usize; proj_slice.len()]; - - for sort_expr in ordering.iter() { - let col = sort_expr.expr.as_any().downcast_ref::<Column>()?; - let proj_idx = col.index(); - let proj_expr = proj_slice.get(proj_idx)?; - let table_col = proj_expr.expr.as_any().downcast_ref::<Column>()?; - indices[proj_idx] = table_col.index(); - } - - Some(indices) -} - /// Check whether a given ordering is valid for all file groups by verifying /// that files within each group are sorted according to their min/max statistics. /// @@ -1415,106 +1389,6 @@ fn validate_orderings( .collect() } -/// The various listing tables does not attempt to read all files -/// concurrently, instead they will read files in sequence within a -/// partition. This is an important property as it allows plans to -/// run against 1000s of files and not try to open them all -/// concurrently. 
-/// -/// However, it means if we assign more than one file to a partition -/// the output sort order will not be preserved as illustrated in the -/// following diagrams: -/// -/// When only 1 file is assigned to each partition, each partition is -/// correctly sorted on `(A, B, C)` -/// -/// ```text -/// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓ -/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐ -/// ┃ ┌───────────────┐ ┌──────────────┐ │ ┌──────────────┐ │ ┌─────────────┐ ┃ -/// │ │ 1.parquet │ │ │ │ 2.parquet │ │ │ 3.parquet │ │ │ 4.parquet │ │ -/// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ │Sort: A, B, C │ │ │Sort: A, B, C│ ┃ -/// │ └───────────────┘ │ │ └──────────────┘ │ └──────────────┘ │ └─────────────┘ │ -/// ┃ │ │ ┃ -/// │ │ │ │ │ │ -/// ┃ │ │ ┃ -/// │ │ │ │ │ │ -/// ┃ │ │ ┃ -/// │ │ │ │ │ │ -/// ┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ -/// DataFusion DataFusion DataFusion DataFusion -/// ┃ Partition 1 Partition 2 Partition 3 Partition 4 ┃ -/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ -/// -/// DataSourceExec -/// ``` -/// -/// However, when more than 1 file is assigned to each partition, each -/// partition is NOT correctly sorted on `(A, B, C)`. 
Once the second -/// file is scanned, the same values for A, B and C can be repeated in -/// the same sorted stream -/// -///```text -/// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ -/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ -/// ┃ ┌───────────────┐ ┌──────────────┐ │ -/// │ │ 1.parquet │ │ │ │ 2.parquet │ ┃ -/// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ -/// │ └───────────────┘ │ │ └──────────────┘ ┃ -/// ┃ ┌───────────────┐ ┌──────────────┐ │ -/// │ │ 3.parquet │ │ │ │ 4.parquet │ ┃ -/// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ -/// │ └───────────────┘ │ │ └──────────────┘ ┃ -/// ┃ │ -/// │ │ │ ┃ -/// ┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ -/// DataFusion DataFusion ┃ -/// ┃ Partition 1 Partition 2 -/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛ -/// -/// DataSourceExec -/// ``` -fn get_projected_output_ordering( - base_config: &FileScanConfig, - projected_schema: &SchemaRef, -) -> Vec { - let projected_orderings = - project_orderings(&base_config.output_ordering, projected_schema); - - let projection = base_config.file_source.projection(); - - projected_orderings - .into_iter() - .filter(|ordering| match projection.as_ref() { - None => { - // No projection — validate directly with statistics - is_ordering_valid_for_file_groups( - &base_config.file_groups, - ordering, - projected_schema, - None, - ) - } - Some(proj) => match resolve_sort_column_projection(ordering, proj) { - Some(indices) => { - // All sort columns resolved — validate with statistics - is_ordering_valid_for_file_groups( - &base_config.file_groups, - ordering, - projected_schema, - Some(&indices), - ) - } - None => { - // Some sort column is a complex expression — can't - // look up statistics. Fall back to single-file check. - base_config.file_groups.iter().all(|g| g.len() <= 1) - } - }, - }) - .collect() -} - /// Convert type to a type suitable for use as a `ListingTable` /// partition column. 
Returns `Dictionary(UInt16, val_type)`, which is /// a reasonable trade off between a reasonable number of partition @@ -2616,4 +2490,247 @@ mod tests { Ok(()) } + + /// Helper: create a `PartitionedFile` with min/max stats for the given columns. + fn partitioned_file_with_stats( + name: &str, + col_stats: Vec<(ScalarValue, ScalarValue)>, + ) -> PartitionedFile { + let column_statistics: Vec = col_stats + .into_iter() + .map(|(min, max)| ColumnStatistics { + min_value: Precision::Exact(min), + max_value: Precision::Exact(max), + null_count: Precision::Exact(0), + ..ColumnStatistics::new_unknown() + }) + .collect(); + let stats = Statistics { + num_rows: Precision::Exact(100), + total_byte_size: Precision::Absent, + column_statistics, + }; + PartitionedFile::new(name, 1024).with_statistics(Arc::new(stats)) + } + + /// Regression test: with a complex projection like `a + 1`, the display + /// path should still show orderings (it delegates to `eq_properties()` + /// which validates at table-schema level, then projects correctly). 
+ #[test] + fn test_display_ordering_with_complex_projection_multi_file() { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let table_schema = TableSchema::new(Arc::clone(&schema), vec![]); + let file_source: Arc = + Arc::new(MockSource::new(table_schema.clone())); + + // Two files in one group, non-overlapping b statistics + let file1 = partitioned_file_with_stats( + "file1", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), // a + (ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(10))), // b + ], + ); + let file2 = partitioned_file_with_stats( + "file2", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), // a + (ScalarValue::Int32(Some(11)), ScalarValue::Int32(Some(20))), // b + ], + ); + + let sort_b_asc = PhysicalSortExpr::new( + Arc::new(Column::new("b", 1)), + arrow::compute::SortOptions { + descending: false, + nulls_first: false, + }, + ); + + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + Arc::clone(&file_source), + ) + .with_file_groups(vec![FileGroup::new(vec![file1, file2])]) + .with_output_ordering(vec![LexOrdering::new(vec![sort_b_asc]).unwrap()]) + .build(); + + // Push a complex projection: [a + 1 AS x, b] + let expr_a_plus_1: Arc = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))), + )); + let exprs = ProjectionExprs::new(vec![ + ProjectionExpr::new(expr_a_plus_1, "x"), + ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"), + ]); + + let data_source = config + .try_swapping_with_projection(&exprs) + .unwrap() + .expect("projection swap should succeed"); + let new_config = data_source + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + + // Format via DisplayAs and verify ordering is present + let display = format!( + "{}", + 
datafusion_physical_plan::display::VerboseDisplay(new_config.clone()) + ); + assert!( + display.contains("output_ordering="), + "Expected output_ordering in display, but got: {display}" + ); + assert!( + display.contains("b@1 ASC"), + "Expected 'b@1 ASC' in display, but got: {display}" + ); + } + + /// Verify orderings ARE dropped when file statistics overlap + /// (ordering is genuinely invalid for multi-file groups). + #[test] + fn test_display_ordering_dropped_for_overlapping_stats() { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let table_schema = TableSchema::new(Arc::clone(&schema), vec![]); + let file_source: Arc = + Arc::new(MockSource::new(table_schema.clone())); + + // Two files in one group, OVERLAPPING b statistics + let file1 = partitioned_file_with_stats( + "file1", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), + (ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(10))), + ], + ); + let file2 = partitioned_file_with_stats( + "file2", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), + (ScalarValue::Int32(Some(5)), ScalarValue::Int32(Some(20))), // overlaps! 
+ ], + ); + + let sort_b_asc = PhysicalSortExpr::new( + Arc::new(Column::new("b", 1)), + arrow::compute::SortOptions { + descending: false, + nulls_first: false, + }, + ); + + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + Arc::clone(&file_source), + ) + .with_file_groups(vec![FileGroup::new(vec![file1, file2])]) + .with_output_ordering(vec![LexOrdering::new(vec![sort_b_asc]).unwrap()]) + .build(); + + // Format and verify ordering is NOT present + let display = format!( + "{}", + datafusion_physical_plan::display::VerboseDisplay(config.clone()) + ); + assert!( + !display.contains("output_ordering"), + "Expected no output_ordering for overlapping stats, but got: {display}" + ); + } + + /// Verify the display path and optimization path agree: orderings from + /// `eq_properties().oeq_class()` match what appears in `fmt_as()` output. + #[test] + fn test_display_ordering_matches_eq_properties() { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let table_schema = TableSchema::new(Arc::clone(&schema), vec![]); + let file_source: Arc = + Arc::new(MockSource::new(table_schema.clone())); + + // Non-overlapping b statistics across two files + let file1 = partitioned_file_with_stats( + "file1", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), + (ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(10))), + ], + ); + let file2 = partitioned_file_with_stats( + "file2", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), + (ScalarValue::Int32(Some(11)), ScalarValue::Int32(Some(20))), + ], + ); + + let sort_b_asc = PhysicalSortExpr::new( + Arc::new(Column::new("b", 1)), + arrow::compute::SortOptions { + descending: false, + nulls_first: false, + }, + ); + + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + Arc::clone(&file_source), + ) + 
.with_file_groups(vec![FileGroup::new(vec![file1, file2])]) + .with_output_ordering(vec![LexOrdering::new(vec![sort_b_asc]).unwrap()]) + .build(); + + // Simple projection [a, b] + let exprs = ProjectionExprs::new(vec![ + ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a"), + ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"), + ]); + + let data_source = config + .try_swapping_with_projection(&exprs) + .unwrap() + .expect("projection swap should succeed"); + let new_config = data_source + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + + // Get orderings from eq_properties (what the optimizer sees) + let eq_props = new_config.eq_properties(); + let oeq_orderings = eq_props.oeq_class(); + assert!( + !oeq_orderings.is_empty(), + "eq_properties should report orderings for valid non-overlapping files" + ); + + // Get display output + let display = format!( + "{}", + datafusion_physical_plan::display::VerboseDisplay(new_config.clone()) + ); + + // Verify they agree: each ordering from eq_properties should appear in display + for ordering in oeq_orderings.iter() { + let ordering_str = format!("{ordering}"); + assert!( + display.contains(&ordering_str), + "Display should contain ordering '{ordering_str}', but got: {display}" + ); + } + } } diff --git a/datafusion/sqllogictest/test_files/encrypted_parquet.slt b/datafusion/sqllogictest/test_files/encrypted_parquet.slt index d580b7d1ad2b8..42985d6b2c6f5 100644 --- a/datafusion/sqllogictest/test_files/encrypted_parquet.slt +++ b/datafusion/sqllogictest/test_files/encrypted_parquet.slt @@ -85,5 +85,10 @@ float_field float ) STORED AS PARQUET LOCATION 'test_files/scratch/encrypted_parquet/' -query error DataFusion error: Parquet error: Parquet error: Parquet file has an encrypted footer but decryption properties were not provided +query RR SELECT * FROM parquet_table +---- +1 2 +5 6 +3 4 +-1 -1 diff --git a/datafusion/sqllogictest/test_files/group_by.slt 
b/datafusion/sqllogictest/test_files/group_by.slt index 366326479dab1..fb8a97d8cd2d7 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -2279,7 +2279,7 @@ logical_plan 02)--TableScan: multiple_ordered_table projection=[a0, a, b, c, d] physical_plan 01)SortExec: expr=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, d@4 ASC NULLS LAST], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true query TT EXPLAIN SELECT a, b, ARRAY_AGG(d ORDER BY d) @@ -5004,7 +5004,7 @@ logical_plan 03)----TableScan: multiple_ordered_table projection=[a, b, c] physical_plan 01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[array_agg(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]], ordering_mode=Sorted -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], file_type=csv, has_header=true query II? 
SELECT a, b, ARRAY_AGG(c ORDER BY c DESC) diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 0c14958f1a87c..aac4ab16cc1c6 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -3518,7 +3518,7 @@ logical_plan 05)----TableScan: annotated_data projection=[a0, a, b, c, d] physical_plan 01)NestedLoopJoinExec: join_type=Inner, filter=example(join_proj_push_down_1@0, join_proj_push_down_2@1) > 3, projection=[a0@0, a@1, b@2, c@3, d@4, a0@6, a@7, b@8, c@9, d@10] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d, CAST(a@1 AS Float64) as join_proj_push_down_1], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d, CAST(a@1 AS Float64) as join_proj_push_down_1], output_orderings=[[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], [join_proj_push_down_1@5 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]], file_type=csv, has_header=true 03)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, CAST(a@1 AS Float64) as join_proj_push_down_2] 04)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true diff --git a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt index 7feefc169fcab..3144977b678f2 100644 --- a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt +++ 
b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt @@ -97,7 +97,7 @@ logical_plan 01)Sort: a_big ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST 02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b 03)----TableScan: multiple_ordered_table projection=[a, b] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_ordering=[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], file_type=csv, has_header=true +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], [a_big@1 ASC NULLS LAST]], file_type=csv, has_header=true query TT EXPLAIN @@ -109,7 +109,7 @@ logical_plan 01)Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST 02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b 03)----TableScan: multiple_ordered_table projection=[a, b] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_ordering=[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], file_type=csv, has_header=true +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], [a_big@1 ASC NULLS LAST]], file_type=csv, has_header=true # test for cast Utf8 diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index c6d5df0d0dfee..826d06329ed2f 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -404,7 +404,7 @@ logical_plan 01)Sort: 
timeseries_parquet.period_end ASC NULLS LAST, fetch=2 02)--Filter: timeseries_parquet.timeframe = Utf8View("quarterly") 03)----TableScan: timeseries_parquet projection=[timeframe, period_end, value], partial_filters=[timeseries_parquet.timeframe = Utf8View("quarterly")] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/timeseries_sorted.parquet]]}, projection=[timeframe, period_end, value], limit=2, output_ordering=[timeframe@0 ASC NULLS LAST, period_end@1 ASC NULLS LAST], file_type=parquet, predicate=timeframe@0 = quarterly, pruning_predicate=timeframe_null_count@2 != row_count@3 AND timeframe_min@0 <= quarterly AND quarterly <= timeframe_max@1, required_guarantees=[timeframe in (quarterly)] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/timeseries_sorted.parquet]]}, projection=[timeframe, period_end, value], limit=2, output_ordering=[period_end@1 ASC NULLS LAST], file_type=parquet, predicate=timeframe@0 = quarterly, pruning_predicate=timeframe_null_count@2 != row_count@3 AND timeframe_min@0 <= quarterly AND quarterly <= timeframe_max@1, required_guarantees=[timeframe in (quarterly)] # Test 2.4: Verify ASC results query TIR @@ -458,7 +458,7 @@ logical_plan 03)----TableScan: timeseries_parquet projection=[timeframe, period_end, value], partial_filters=[timeseries_parquet.timeframe = Utf8View("quarterly")] physical_plan 01)SortExec: TopK(fetch=2), expr=[period_end@1 DESC], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/timeseries_sorted.parquet]]}, projection=[timeframe, period_end, value], output_ordering=[timeframe@0 ASC NULLS LAST, period_end@1 ASC NULLS LAST], file_type=parquet, predicate=timeframe@0 = quarterly AND DynamicFilter [ empty ], pruning_predicate=timeframe_null_count@2 != row_count@3 AND 
timeframe_min@0 <= quarterly AND quarterly <= timeframe_max@1, required_guarantees=[timeframe in (quarterly)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/timeseries_sorted.parquet]]}, projection=[timeframe, period_end, value], output_ordering=[period_end@1 ASC NULLS LAST], file_type=parquet, predicate=timeframe@0 = quarterly AND DynamicFilter [ empty ], pruning_predicate=timeframe_null_count@2 != row_count@3 AND timeframe_min@0 <= quarterly AND quarterly <= timeframe_max@1, required_guarantees=[timeframe in (quarterly)] # Results should still be correct query TIR diff --git a/datafusion/sqllogictest/test_files/topk.slt b/datafusion/sqllogictest/test_files/topk.slt index 8a1fef0722297..3f135c25c86e3 100644 --- a/datafusion/sqllogictest/test_files/topk.slt +++ b/datafusion/sqllogictest/test_files/topk.slt @@ -371,7 +371,7 @@ explain select number, letter, age, number as column4, letter as column5 from pa ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC NULLS LAST, age@2 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC, letter@1 ASC NULLS LAST] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age, number@0 as column4, letter@1 as column5], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age, number@0 as column4, letter@1 as column5], output_orderings=[[number@0 DESC, column5@4 ASC NULLS LAST], [column4@3 DESC], [number@0 DESC, letter@1 ASC NULLS LAST]], file_type=parquet, predicate=DynamicFilter [ empty ] # Verify that the sort prefix is correctly computed over normalized, order-maintaining projections 
(number + 1, number, number + 1, age) query TT diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt index 0f4b02ae25d3f..20dc0b7127590 100644 --- a/datafusion/sqllogictest/test_files/union.slt +++ b/datafusion/sqllogictest/test_files/union.slt @@ -592,7 +592,7 @@ physical_plan 01)SortPreservingMergeExec: [c1@0 ASC NULLS LAST] 02)--UnionExec 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], output_ordering=[c1@0 ASC NULLS LAST], file_type=csv, has_header=true -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1a@0 as c1], file_type=csv, has_header=true +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1a@0 as c1], output_ordering=[c1@0 ASC NULLS LAST], file_type=csv, has_header=true statement ok drop table t1 diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index ae83045961488..272f9cfad3226 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -3145,7 +3145,7 @@ physical_plan 11)--------------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW": nullable Int64 }, frame: ROWS BETWEEN 5 PRECEDING AND CURRENT ROW], mode=[Sorted] 12)----------------------SortExec: expr=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, d@4 ASC NULLS LAST, c@3 ASC NULLS LAST], preserve_partitioning=[false] 13)------------------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING], mode=[Sorted] -14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as __common_expr_1, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true +14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as __common_expr_1, a, b, c, d], 
output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST]], file_type=csv, has_header=true query IIIIIIIIIIIIIII SELECT a, b, c, @@ -3508,7 +3508,7 @@ logical_plan physical_plan 01)BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 02)--FilterExec: b@2 = 0 -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true # Since column b is constant after filter b=0, # window requirement b ASC, d ASC can be satisfied @@ -3526,7 +3526,7 @@ physical_plan 01)BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST, multiple_ordered_table.d ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST, multiple_ordered_table.d ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 02)--SortExec: expr=[d@4 ASC NULLS LAST], preserve_partitioning=[false] 03)----FilterExec: b@2 = 0 -04)------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true +04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true # Create an unbounded source where there is multiple orderings. @@ -3562,7 +3562,7 @@ physical_plan 02)--BoundedWindowAggExec: wdw=[min(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "min(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int32 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 03)----ProjectionExec: expr=[c@2 as c, d@3 as d, max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] 04)------BoundedWindowAggExec: wdw=[max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int32 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -05)--------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true +05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], file_type=csv, has_header=true query TT EXPLAIN SELECT MAX(c) OVER(PARTITION BY d ORDER BY c ASC) as max_c @@ -3606,7 +3606,7 @@ logical_plan physical_plan 01)ProjectionExec: expr=[sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] 02)--BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], 
file_type=csv, has_header=true query I SELECT SUM(d) OVER(PARTITION BY c, a ORDER BY b ASC) From 60dead9bd3b41644f4a1f405870d2d2387ad7a84 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 15 Feb 2026 11:28:45 -0500 Subject: [PATCH 3/9] docs: move ASCII art diagrams to validated_output_ordering The partition/file ordering diagrams from the deleted get_projected_output_ordering are useful context for understanding why we validate orderings against file statistics. Move them to validated_output_ordering where they belong. Co-Authored-By: Claude Opus 4.6 --- datafusion/datasource/src/file_scan_config.rs | 74 ++++++++++++++++--- 1 file changed, 64 insertions(+), 10 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 6eded159da40a..2374d67ccc624 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -952,16 +952,69 @@ impl FileScanConfig { /// Returns only the output orderings that are validated against actual /// file group statistics. /// + /// The various listing tables do not attempt to read all files + /// concurrently, instead they read files in sequence within a + /// partition. This is an important property as it allows plans to + /// run against 1000s of files and not try to open them all + /// concurrently. + /// + /// However, it means if we assign more than one file to a partition + /// the output sort order will not be preserved unless the files' + /// min/max statistics prove the combined stream is still ordered. 
+ /// + /// When only 1 file is assigned to each partition, each partition is + /// correctly sorted on `(A, B, C)`: + /// + /// ```text + /// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓ + /// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐ + /// ┃ ┌───────────────┐ ┌──────────────┐ │ ┌──────────────┐ │ ┌─────────────┐ ┃ + /// │ │ 1.parquet │ │ │ │ 2.parquet │ │ │ 3.parquet │ │ │ 4.parquet │ │ + /// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ │Sort: A, B, C │ │ │Sort: A, B, C│ ┃ + /// │ └───────────────┘ │ │ └──────────────┘ │ └──────────────┘ │ └─────────────┘ │ + /// ┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ + /// Partition 1 Partition 2 Partition 3 Partition 4 + /// ┃ ┃ + /// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ + /// DataSourceExec + /// ``` + /// + /// However, when more than 1 file is assigned to each partition, each + /// partition is NOT necessarily sorted on `(A, B, C)`. 
Once the second + /// file is scanned, the same values for A, B and C can be repeated in + /// the same sorted stream: + /// + /// ```text + /// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ + /// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ + /// ┃ ┌───────────────┐ ┌──────────────┐ │ + /// │ │ 1.parquet │ │ │ │ 2.parquet │ ┃ + /// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ + /// │ └───────────────┘ │ │ └──────────────┘ ┃ + /// ┃ ┌───────────────┐ ┌──────────────┐ │ + /// │ │ 3.parquet │ │ │ │ 4.parquet │ ┃ + /// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ + /// │ └───────────────┘ │ │ └──────────────┘ ┃ + /// ┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ + /// Partition 1 Partition 2 ┃ + /// ┃ + /// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛ + /// DataSourceExec + /// ``` + /// /// For example, individual files may be ordered by `col1 ASC`, - /// but if we have files with these min/max statistics in a single partition / file group: + /// but if we have files with these min/max statistics in a single + /// partition / file group: /// /// - file1: min(col1) = 10, max(col1) = 20 /// - file2: min(col1) = 5, max(col1) = 15 /// - /// Because reading file1 followed by file2 would produce out-of-order output (there is overlap - /// in the ranges), we cannot retain `col1 ASC` as a valid output ordering. + /// Because reading file1 followed by file2 would produce out-of-order + /// output (there is overlap in the ranges), we cannot retain `col1 ASC` + /// as a valid output ordering. 
/// - /// Similarly this would not be a valid order (non-overlapping ranges but not ordered): + /// Similarly this would not be a valid order (non-overlapping ranges + /// but not ordered): /// /// - file1: min(col1) = 20, max(col1) = 30 /// - file2: min(col1) = 10, max(col1) = 15 @@ -971,13 +1024,14 @@ impl FileScanConfig { /// - file1: min(col1) = 5, max(col1) = 15 /// - file2: min(col1) = 16, max(col1) = 25 /// - /// Then we know that reading file1 followed by file2 will produce ordered output, - /// so `col1 ASC` would be retained. + /// Then we know that reading file1 followed by file2 will produce + /// ordered output, so `col1 ASC` would be retained. /// - /// Note that we are checking for ordering *within* *each* file group / partition, - /// files in different partitions are read independently and do not affect each other's ordering. - /// Merging of the multiple partition streams into a single ordered stream is handled - /// upstream e.g. by `SortPreservingMergeExec`. + /// Note that we are checking for ordering *within* *each* file group / + /// partition — files in different partitions are read independently and + /// do not affect each other's ordering. Merging of the multiple + /// partition streams into a single ordered stream is handled upstream + /// e.g. by `SortPreservingMergeExec`. 
fn validated_output_ordering(&self) -> Vec { let schema = self.file_source.table_schema().table_schema(); validate_orderings(&self.output_ordering, schema, &self.file_groups, None) From 6bfdcceeafac4cb686e2546c0f090eb3732ff528 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 15 Feb 2026 11:42:14 -0500 Subject: [PATCH 4/9] lint --- datafusion/datasource/src/file_scan_config.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 2374d67ccc624..e3c7cb253374b 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1378,10 +1378,10 @@ impl DisplayAs for FileScanConfig { write!(f, "file_groups=")?; FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?; - if let Ok(schema) = self.projected_schema() { - if !schema.fields().is_empty() { - write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; - } + if let Ok(schema) = self.projected_schema() + && !schema.fields().is_empty() + { + write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; } if let Some(limit) = self.limit { From 8f067b9e7c0cce4f704f39bfd1088f890cdd1f20 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 15 Feb 2026 13:26:37 -0500 Subject: [PATCH 5/9] fix --- datafusion/sqllogictest/test_files/encrypted_parquet.slt | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/datafusion/sqllogictest/test_files/encrypted_parquet.slt b/datafusion/sqllogictest/test_files/encrypted_parquet.slt index 42985d6b2c6f5..d580b7d1ad2b8 100644 --- a/datafusion/sqllogictest/test_files/encrypted_parquet.slt +++ b/datafusion/sqllogictest/test_files/encrypted_parquet.slt @@ -85,10 +85,5 @@ float_field float ) STORED AS PARQUET LOCATION 'test_files/scratch/encrypted_parquet/' -query RR +query error DataFusion error: 
Parquet error: Parquet error: Parquet file has an encrypted footer but decryption properties were not provided SELECT * FROM parquet_table ----- -1 2 -5 6 -3 4 --1 -1 From 411bcb0f6fe1f54e9830de5e988ea869cec65144 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 22 Feb 2026 11:03:34 +0000 Subject: [PATCH 6/9] Update datafusion/datasource/src/file_scan_config.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- datafusion/datasource/src/file_scan_config.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index e3c7cb253374b..6c2e4a8a8437a 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1378,9 +1378,11 @@ impl DisplayAs for FileScanConfig { write!(f, "file_groups=")?; FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?; - if let Ok(schema) = self.projected_schema() - && !schema.fields().is_empty() - { + let schema = self + .projected_schema() + .map_err(|_| std::fmt::Error)?; + + if !schema.fields().is_empty() { write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; } From da8a62936da06eb66ea05c894b6e36c4297f9c9e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 22 Feb 2026 11:11:53 +0000 Subject: [PATCH 7/9] Apply suggestion from @adriangb --- datafusion/datasource/src/file_scan_config.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 6c2e4a8a8437a..5dc8319c3587b 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1378,10 +1378,7 @@ impl DisplayAs for FileScanConfig { write!(f, "file_groups=")?; FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?; - let 
schema = self - .projected_schema() - .map_err(|_| std::fmt::Error)?; - + let schema = self.projected_schema().map_err(|_| std::fmt::Error)?; if !schema.fields().is_empty() { write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; } From 5fbb6eb41d359a90add1d3154e2a7194e686ab21 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 6 Mar 2026 14:19:43 -0800 Subject: [PATCH 8/9] fix: preserve declaration order of orderings in EXPLAIN display Three changes to ensure projected orderings match the original declaration order: 1. In OrderingEquivalenceClass::remove_redundant_entries(), use Vec::remove() instead of Vec::swap_remove() to preserve ordering during dedup. 2. In projected_orderings(), swap chain order so dependency-map-based orderings (which preserve declaration order via IndexMap) come before newly derived orderings from projection mapping. 3. In substitute_oeq_class(), push the original sort expression first before substituted (derived) expressions so the cartesian product preserves declaration order. 
Co-Authored-By: Claude Opus 4.6 --- datafusion/physical-expr/src/equivalence/ordering.rs | 4 ++-- .../physical-expr/src/equivalence/properties/mod.rs | 12 ++++++++---- datafusion/sqllogictest/test_files/group_by.slt | 4 ++-- .../test_files/monotonic_projection_test.slt | 4 ++-- datafusion/sqllogictest/test_files/topk.slt | 2 +- datafusion/sqllogictest/test_files/window.slt | 10 +++++----- 6 files changed, 20 insertions(+), 16 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index 2ce8a8d246fe7..76d98ae4bbecb 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -107,14 +107,14 @@ impl OrderingEquivalenceClass { if let Some(remove) = self.resolve_overlap(idx, ordering_idx) { work = true; if remove { - self.orderings.swap_remove(idx); + self.orderings.remove(idx); continue 'outer; } } if let Some(remove) = self.resolve_overlap(ordering_idx, idx) { work = true; if remove { - self.orderings.swap_remove(ordering_idx); + self.orderings.remove(ordering_idx); continue; } } diff --git a/datafusion/physical-expr/src/equivalence/properties/mod.rs b/datafusion/physical-expr/src/equivalence/properties/mod.rs index 1ca4ead0335de..6a845a31bd957 100644 --- a/datafusion/physical-expr/src/equivalence/properties/mod.rs +++ b/datafusion/physical-expr/src/equivalence/properties/mod.rs @@ -871,7 +871,9 @@ impl EquivalenceProperties { .map(|(source, _target)| source) .filter(|source| expr_refers(source, &sort_expr.expr)) .cloned(); - let mut result = vec![]; + // Start with the original expression so that + // declared orderings appear before derived ones: + let mut result = vec![sort_expr.clone()]; // The sort expression comes from this schema, so the // following call to `unwrap` is safe. 
let expr_type = sort_expr.expr.data_type(schema).unwrap(); @@ -883,7 +885,6 @@ impl EquivalenceProperties { result.push(substituted); } } - result.push(sort_expr); result }) // Generate all valid orderings given substituted expressions: @@ -1122,8 +1123,11 @@ impl EquivalenceProperties { prefixes }); - // Simplify each ordering by removing redundant sections: - orderings.chain(projected_orderings).collect() + // Simplify each ordering by removing redundant sections. + // Place projected_orderings first so that orderings reconstructed + // from the dependency map (which preserves declaration order via + // IndexMap) appear before newly derived orderings from Pass 1: + projected_orderings.chain(orderings).collect() } /// Projects constraints according to the given projection mapping. diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index fb8a97d8cd2d7..366326479dab1 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -2279,7 +2279,7 @@ logical_plan 02)--TableScan: multiple_ordered_table projection=[a0, a, b, c, d] physical_plan 01)SortExec: expr=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, d@4 ASC NULLS LAST], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true query TT EXPLAIN SELECT a, b, ARRAY_AGG(d ORDER BY d) @@ -5004,7 +5004,7 @@ logical_plan 03)----TableScan: multiple_ordered_table projection=[a, b, c] physical_plan 01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], 
aggr=[array_agg(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]], ordering_mode=Sorted -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], file_type=csv, has_header=true +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true query II? SELECT a, b, ARRAY_AGG(c ORDER BY c DESC) diff --git a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt index 3144977b678f2..62bad823c0111 100644 --- a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt +++ b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt @@ -97,7 +97,7 @@ logical_plan 01)Sort: a_big ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST 02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b 03)----TableScan: multiple_ordered_table projection=[a, b] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], [a_big@1 ASC NULLS LAST]], file_type=csv, has_header=true +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a_big@1 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true query TT EXPLAIN @@ -109,7 +109,7 @@ logical_plan 01)Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST 02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, 
multiple_ordered_table.b 03)----TableScan: multiple_ordered_table projection=[a, b] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], [a_big@1 ASC NULLS LAST]], file_type=csv, has_header=true +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a_big@1 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true # test for cast Utf8 diff --git a/datafusion/sqllogictest/test_files/topk.slt b/datafusion/sqllogictest/test_files/topk.slt index 3f135c25c86e3..38305c2a23d6b 100644 --- a/datafusion/sqllogictest/test_files/topk.slt +++ b/datafusion/sqllogictest/test_files/topk.slt @@ -371,7 +371,7 @@ explain select number, letter, age, number as column4, letter as column5 from pa ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC NULLS LAST, age@2 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC, letter@1 ASC NULLS LAST] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age, number@0 as column4, letter@1 as column5], output_orderings=[[number@0 DESC, column5@4 ASC NULLS LAST], [column4@3 DESC], [number@0 DESC, letter@1 ASC NULLS LAST]], file_type=parquet, predicate=DynamicFilter [ empty ] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age, number@0 as column4, letter@1 as column5], output_orderings=[[column4@3 DESC], [number@0 DESC, letter@1 ASC NULLS LAST], [number@0 DESC, column5@4 ASC NULLS LAST]], file_type=parquet, predicate=DynamicFilter [ empty ] # Verify that the sort prefix is correctly 
computed over normalized, order-maintaining projections (number + 1, number, number + 1, age) query TT diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 272f9cfad3226..25f73ec849bc3 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -3145,7 +3145,7 @@ physical_plan 11)--------------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW": nullable Int64 }, frame: ROWS BETWEEN 5 PRECEDING AND CURRENT ROW], mode=[Sorted] 12)----------------------SortExec: expr=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, d@4 ASC NULLS LAST, c@3 ASC NULLS LAST], preserve_partitioning=[false] 13)------------------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING], mode=[Sorted] -14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as __common_expr_1, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST]], file_type=csv, has_header=true +14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as __common_expr_1, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]], file_type=csv, has_header=true query IIIIIIIIIIIIIII SELECT a, b, c, @@ -3508,7 +3508,7 @@ logical_plan physical_plan 01)BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 02)--FilterExec: b@2 = 0 -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, 
projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true # Since column b is constant after filter b=0, # window requirement b ASC, d ASC can be satisfied @@ -3526,7 +3526,7 @@ physical_plan 01)BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST, multiple_ordered_table.d ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST, multiple_ordered_table.d ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 02)--SortExec: expr=[d@4 ASC NULLS LAST], preserve_partitioning=[false] 03)----FilterExec: b@2 = 0 -04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true +04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true # Create an unbounded source where there is multiple orderings. 
@@ -3562,7 +3562,7 @@ physical_plan 02)--BoundedWindowAggExec: wdw=[min(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "min(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int32 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 03)----ProjectionExec: expr=[c@2 as c, d@3 as d, max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] 04)------BoundedWindowAggExec: wdw=[max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int32 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], file_type=csv, has_header=true +05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true query TT EXPLAIN SELECT MAX(c) OVER(PARTITION BY d ORDER BY c ASC) as max_c @@ 
-3606,7 +3606,7 @@ logical_plan physical_plan 01)ProjectionExec: expr=[sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] 02)--BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], file_type=csv, has_header=true +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true query I SELECT SUM(d) OVER(PARTITION BY c, a ORDER BY b ASC) From e643fbc91facf9f92d44ae8d847129b99c1b137c Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 6 Mar 2026 14:33:30 -0800 Subject: [PATCH 9/9] fix: prefer removing later duplicates in remove_redundant_entries Swap the order of resolve_overlap checks so that when two orderings are exact duplicates, the later occurrence is removed instead of the earlier one. 
This preserves the declaration order of orderings when the same ordering appears multiple times (e.g. from both Pass 1 and Pass 2 of projected_orderings). Co-Authored-By: Claude Opus 4.6 --- .../physical-expr/src/equivalence/ordering.rs | 15 +++++++++------ .../src/equivalence/properties/dependency.rs | 2 +- .../test_files/monotonic_projection_test.slt | 4 ++-- datafusion/sqllogictest/test_files/topk.slt | 2 +- datafusion/sqllogictest/test_files/window.slt | 4 ++-- 5 files changed, 15 insertions(+), 12 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index 76d98ae4bbecb..48e2a37ce7752 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -104,18 +104,21 @@ impl OrderingEquivalenceClass { 'outer: while idx < self.orderings.len() { let mut ordering_idx = idx + 1; while ordering_idx < self.orderings.len() { - if let Some(remove) = self.resolve_overlap(idx, ordering_idx) { + // Check the later index first so that for exact + // duplicates we remove the later occurrence and + // preserve the declaration order of orderings: + if let Some(remove) = self.resolve_overlap(ordering_idx, idx) { work = true; if remove { - self.orderings.remove(idx); - continue 'outer; + self.orderings.remove(ordering_idx); + continue; } } - if let Some(remove) = self.resolve_overlap(ordering_idx, idx) { + if let Some(remove) = self.resolve_overlap(idx, ordering_idx) { work = true; if remove { - self.orderings.remove(ordering_idx); - continue; + self.orderings.remove(idx); + continue 'outer; } } ordering_idx += 1; diff --git a/datafusion/physical-expr/src/equivalence/properties/dependency.rs b/datafusion/physical-expr/src/equivalence/properties/dependency.rs index 57804d3950852..a0dfa15bea25b 100644 --- a/datafusion/physical-expr/src/equivalence/properties/dependency.rs +++ 
b/datafusion/physical-expr/src/equivalence/properties/dependency.rs @@ -492,7 +492,7 @@ mod tests { assert_eq!( out_properties.to_string(), - "order: [[a@0 ASC, c@2 ASC, b@1 ASC, d@3 ASC], [a@0 ASC, b@1 ASC, c@2 ASC, d@3 ASC]]" + "order: [[a@0 ASC, b@1 ASC, c@2 ASC, d@3 ASC], [a@0 ASC, c@2 ASC, b@1 ASC, d@3 ASC]]" ); Ok(()) diff --git a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt index 62bad823c0111..3144977b678f2 100644 --- a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt +++ b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt @@ -97,7 +97,7 @@ logical_plan 01)Sort: a_big ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST 02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b 03)----TableScan: multiple_ordered_table projection=[a, b] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a_big@1 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], [a_big@1 ASC NULLS LAST]], file_type=csv, has_header=true query TT EXPLAIN @@ -109,7 +109,7 @@ logical_plan 01)Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST 02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b 03)----TableScan: multiple_ordered_table projection=[a, b] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a_big@1 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, 
has_header=true +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], [a_big@1 ASC NULLS LAST]], file_type=csv, has_header=true # test for cast Utf8 diff --git a/datafusion/sqllogictest/test_files/topk.slt b/datafusion/sqllogictest/test_files/topk.slt index 38305c2a23d6b..e202f35583e8f 100644 --- a/datafusion/sqllogictest/test_files/topk.slt +++ b/datafusion/sqllogictest/test_files/topk.slt @@ -371,7 +371,7 @@ explain select number, letter, age, number as column4, letter as column5 from pa ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC NULLS LAST, age@2 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC, letter@1 ASC NULLS LAST] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age, number@0 as column4, letter@1 as column5], output_orderings=[[column4@3 DESC], [number@0 DESC, letter@1 ASC NULLS LAST], [number@0 DESC, column5@4 ASC NULLS LAST]], file_type=parquet, predicate=DynamicFilter [ empty ] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age, number@0 as column4, letter@1 as column5], output_orderings=[[number@0 DESC, letter@1 ASC NULLS LAST], [column4@3 DESC], [number@0 DESC, column5@4 ASC NULLS LAST]], file_type=parquet, predicate=DynamicFilter [ empty ] # Verify that the sort prefix is correctly computed over normalized, order-maintaining projections (number + 1, number, number + 1, age) query TT diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 25f73ec849bc3..6598ab197b05f 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ 
b/datafusion/sqllogictest/test_files/window.slt @@ -3145,7 +3145,7 @@ physical_plan 11)--------------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW": nullable Int64 }, frame: ROWS BETWEEN 5 PRECEDING AND CURRENT ROW], mode=[Sorted] 12)----------------------SortExec: expr=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, d@4 ASC NULLS LAST, c@3 ASC NULLS LAST], preserve_partitioning=[false] 13)------------------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 
5 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING], mode=[Sorted] -14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as __common_expr_1, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]], file_type=csv, has_header=true +14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as __common_expr_1, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST]], file_type=csv, has_header=true query IIIIIIIIIIIIIII SELECT a, b, c, @@ -3346,7 +3346,7 @@ logical_plan physical_plan 01)ProjectionExec: expr=[sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as sum2, sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum3, sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@6 as 
sum4] 02)--BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Linear] -03)----RepartitionExec: partitioning=Hash([d@2], 2), input_partitions=2, preserve_order=true, sort_exprs=__common_expr_1@0 ASC NULLS LAST, a@1 ASC NULLS LAST +03)----RepartitionExec: partitioning=Hash([d@2], 2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST 04)------ProjectionExec: expr=[__common_expr_1@0 as __common_expr_1, a@1 as a, d@4 as d, sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@6 as sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@7 as sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] 05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 06)----------RepartitionExec: partitioning=Hash([b@2, a@1], 2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST