diff --git a/datafusion/core/tests/physical_optimizer/pushdown_sort.rs b/datafusion/core/tests/physical_optimizer/pushdown_sort.rs index d6fd4d8d00ae..200025ef64c5 100644 --- a/datafusion/core/tests/physical_optimizer/pushdown_sort.rs +++ b/datafusion/core/tests/physical_optimizer/pushdown_sort.rs @@ -256,7 +256,8 @@ fn test_prefix_match_through_transparent_nodes() { #[test] fn test_no_prefix_match_wrong_direction() { - // Test that prefix matching does NOT work if the direction is wrong + // Test that when the requested sort [a DESC] matches a prefix of the source's + // natural ordering [a DESC, b ASC], the Sort is eliminated (Exact pushdown). let schema = schema(); // Source has [a DESC, b ASC] ordering @@ -265,7 +266,7 @@ fn test_no_prefix_match_wrong_direction() { let source_ordering = LexOrdering::new(vec![a.clone().reverse(), b]).unwrap(); let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]); - // Request [a DESC] - same direction as source, NOT a reverse prefix + // Request [a DESC] - same direction as source prefix, Sort should be eliminated let same_direction = LexOrdering::new(vec![a.clone().reverse()]).unwrap(); let plan = sort_exec(same_direction, source); @@ -278,8 +279,7 @@ fn test_no_prefix_match_wrong_direction() { - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet output: Ok: - - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] - - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet " ); } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 1355eff5b9f1..dc8daf2bd4c7 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ 
b/datafusion/datasource-parquet/src/source.rs @@ -743,19 +743,17 @@ impl FileSource for ParquetSource { /// /// With both pieces of information, ParquetSource can decide what optimizations to apply. /// - /// # Phase 1 Behavior (Current) - /// Returns `Inexact` when reversing the row group scan order would help satisfy the - /// requested ordering. We still need a Sort operator at a higher level because: - /// - We only reverse row group read order, not rows within row groups - /// - This provides approximate ordering that benefits limit pushdown - /// - /// # Phase 2 (Future) - /// Could return `Exact` when we can guarantee perfect ordering through techniques like: - /// - File reordering based on statistics - /// - Detecting already-sorted data - /// This would allow removing the Sort operator entirely. + /// # Behavior + /// - Returns `Exact` when the file's natural ordering (from Parquet metadata) already + /// satisfies the requested ordering. This allows the Sort operator to be eliminated + /// if the files within each group are also non-overlapping (checked by FileScanConfig). + /// - Returns `Inexact` when reversing the row group scan order would help satisfy the + /// requested ordering. We still need a Sort operator at a higher level because: + /// - We only reverse row group read order, not rows within row groups + /// - This provides approximate ordering that benefits limit pushdown /// /// # Returns + /// - `Exact`: The file's natural ordering satisfies the request (within-file ordering guaranteed) /// - `Inexact`: Created an optimized source (e.g., reversed scan) that approximates the order /// - `Unsupported`: Cannot optimize for this ordering fn try_pushdown_sort( @@ -767,6 +765,16 @@ impl FileSource for ParquetSource { return Ok(SortOrderPushdownResult::Unsupported); } + // Check if the natural (non-reversed) ordering already satisfies the request. 
+ // Parquet metadata guarantees within-file ordering, so if the ordering matches + // we can return Exact. FileScanConfig will verify that files within each group + // are non-overlapping before declaring the entire scan as Exact. + if eq_properties.ordering_satisfy(order.iter().cloned())? { + return Ok(SortOrderPushdownResult::Exact { + inner: Arc::new(self.clone()) as Arc, + }); + } + // Build new equivalence properties with the reversed ordering. // This allows us to check if the reversed ordering satisfies the request // by leveraging: @@ -811,11 +819,6 @@ impl FileSource for ParquetSource { Ok(SortOrderPushdownResult::Inexact { inner: Arc::new(new_source) as Arc, }) - - // TODO Phase 2: Add support for other optimizations: - // - File reordering based on min/max statistics - // - Detection of exact ordering (return Exact to remove Sort operator) - // - Partial sort pushdown for prefix matches } fn apply_expressions( diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 524e091381c4..a07e0f0e6918 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -901,17 +901,28 @@ impl DataSource for FileScanConfig { match pushdown_result { SortOrderPushdownResult::Exact { inner } => { - Ok(SortOrderPushdownResult::Exact { - inner: self.rebuild_with_source(inner, true, order)?, - }) + let config = self.rebuild_with_source(inner, true, order)?; + // rebuild_with_source keeps output_ordering only if all groups are non-overlapping. + // If it cleared output_ordering, files overlap despite within-file ordering, + // so we downgrade to Inexact. 
+ if config.output_ordering.is_empty() { + Ok(SortOrderPushdownResult::Inexact { + inner: Arc::new(config), + }) + } else { + Ok(SortOrderPushdownResult::Exact { + inner: Arc::new(config), + }) + } } SortOrderPushdownResult::Inexact { inner } => { Ok(SortOrderPushdownResult::Inexact { - inner: self.rebuild_with_source(inner, false, order)?, + inner: Arc::new(self.rebuild_with_source(inner, false, order)?), }) } SortOrderPushdownResult::Unsupported => { - Ok(SortOrderPushdownResult::Unsupported) + // FileSource can't optimize, but we can still sort files by statistics + self.try_sort_file_groups_by_statistics(order) } } } @@ -937,6 +948,16 @@ impl DataSource for FileScanConfig { } } +/// Result of sorting files within groups by their min/max statistics. +struct SortedFileGroups { + /// The file groups with files reordered within each group. + file_groups: Vec, + /// Whether any group's files were actually reordered. + any_reordered: bool, + /// Whether every group was positively confirmed as non-overlapping. + all_non_overlapping: bool, +} + impl FileScanConfig { /// Returns only the output orderings that are validated against actual /// file group statistics. 
@@ -1243,7 +1264,7 @@ impl FileScanConfig { new_file_source: Arc, is_exact: bool, order: &[PhysicalSortExpr], - ) -> Result> { + ) -> Result { let mut new_config = self.clone(); // Reverse file order (within each group) if the caller is requesting a reversal of this @@ -1283,13 +1304,183 @@ impl FileScanConfig { new_config.file_source = new_file_source; - // Phase 1: Clear output_ordering for Inexact - // (we're only reversing row groups, not guaranteeing perfect ordering) - if !is_exact { + // Sort files within groups by statistics when not reversing + let all_non_overlapping = if !reverse_file_groups { + if let Some(sort_order) = LexOrdering::new(order.iter().cloned()) { + let projected_schema = new_config.projected_schema()?; + let projection_indices = new_config + .file_source + .projection() + .as_ref() + .and_then(|p| ordered_column_indices_from_projection(p)); + let result = Self::sort_files_within_groups_by_statistics( + &new_config.file_groups, + &sort_order, + &projected_schema, + projection_indices.as_deref(), + ); + new_config.file_groups = result.file_groups; + result.all_non_overlapping + } else { + false + } + } else { + // When reversing, check if reversed groups are non-overlapping + if let Some(sort_order) = LexOrdering::new(order.iter().cloned()) { + let projected_schema = new_config.projected_schema()?; + let projection_indices = new_config + .file_source + .projection() + .as_ref() + .and_then(|p| ordered_column_indices_from_projection(p)); + let result = Self::sort_files_within_groups_by_statistics( + &new_config.file_groups, + &sort_order, + &projected_schema, + projection_indices.as_deref(), + ); + result.all_non_overlapping + } else { + false + } + }; + + if is_exact && all_non_overlapping { + // Truly exact: within-file ordering guaranteed and files are non-overlapping + // Keep output_ordering as-is + } else { + // Either not exact or files overlap: clear output_ordering new_config.output_ordering = vec![]; } - Ok(Arc::new(new_config)) + 
Ok(new_config) + } + + /// Sort files within each existing file group by their min/max statistics. + /// + /// No files are moved between groups (parallelism/group composition unchanged). + /// Groups where statistics are unavailable are kept as-is. + fn sort_files_within_groups_by_statistics( + file_groups: &[FileGroup], + sort_order: &LexOrdering, + projected_schema: &SchemaRef, + projection_indices: Option<&[usize]>, + ) -> SortedFileGroups { + let mut any_reordered = false; + // Track how many groups we have positively confirmed as non-overlapping. + // Default-safe: if any group is skipped without confirmation, the count + // won't match and we'll conservatively keep the SortExec. + let mut confirmed_non_overlapping: usize = 0; + let mut new_groups = Vec::with_capacity(file_groups.len()); + + for group in file_groups { + if group.len() <= 1 { + // Single-file or empty groups are trivially non-overlapping and already sorted + new_groups.push(group.clone()); + confirmed_non_overlapping += 1; + continue; + } + + let files: Vec<_> = group.iter().collect(); + + // Try to build statistics for this group + let statistics = match MinMaxStatistics::new_from_files( + sort_order, + projected_schema, + projection_indices, + files.iter().copied(), + ) { + Ok(stats) => stats, + Err(e) => { + log::trace!( + "Cannot sort file group by statistics: {e}. Keeping original order." 
+ ); + new_groups.push(group.clone()); + continue; + } + }; + + // Get indices sorted by min values + let sorted_indices = statistics.min_values_sorted(); + + // Check if already in order + let already_sorted = sorted_indices + .iter() + .enumerate() + .all(|(pos, (idx, _))| pos == *idx); + + let sorted_group: FileGroup = if already_sorted { + group.clone() + } else { + any_reordered = true; + sorted_indices + .iter() + .map(|(idx, _)| files[*idx].clone()) + .collect() + }; + + // Check non-overlapping on the sorted group + let sorted_files: Vec<_> = sorted_group.iter().collect(); + let is_non_overlapping = match MinMaxStatistics::new_from_files( + sort_order, + projected_schema, + projection_indices, + sorted_files.iter().copied(), + ) { + Ok(stats) => stats.is_sorted(), + Err(_) => false, + }; + + if is_non_overlapping { + confirmed_non_overlapping += 1; + } + + new_groups.push(sorted_group); + } + + SortedFileGroups { + file_groups: new_groups, + any_reordered, + all_non_overlapping: confirmed_non_overlapping == file_groups.len(), + } + } + + /// Try to sort file groups by statistics as a best-effort optimization. + /// + /// Used when `FileSource::try_pushdown_sort` returns `Unsupported`. Since there is no + /// within-file ordering guarantee, even non-overlapping groups can only produce `Inexact`. 
+ fn try_sort_file_groups_by_statistics( + &self, + order: &[PhysicalSortExpr], + ) -> Result>> { + let Some(sort_order) = LexOrdering::new(order.iter().cloned()) else { + return Ok(SortOrderPushdownResult::Unsupported); + }; + + let projected_schema = self.projected_schema()?; + let projection_indices = self + .file_source + .projection() + .as_ref() + .and_then(|p| ordered_column_indices_from_projection(p)); + + let result = Self::sort_files_within_groups_by_statistics( + &self.file_groups, + &sort_order, + &projected_schema, + projection_indices.as_deref(), + ); + + if !result.any_reordered { + return Ok(SortOrderPushdownResult::Unsupported); + } + + let mut new_config = self.clone(); + new_config.file_groups = result.file_groups; + new_config.output_ordering = vec![]; + Ok(SortOrderPushdownResult::Inexact { + inner: Arc::new(new_config), + }) } } @@ -2602,4 +2793,462 @@ mod tests { Ok(()) } + + /// Helper: create a PartitionedFile with Float64 min/max statistics for one column + fn make_file_with_stats(name: &str, min: f64, max: f64) -> PartitionedFile { + PartitionedFile::new(name.to_string(), 1024).with_statistics(Arc::new( + Statistics { + num_rows: Precision::Exact(100), + total_byte_size: Precision::Exact(1024), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(0), + min_value: Precision::Exact(ScalarValue::Float64(Some(min))), + max_value: Precision::Exact(ScalarValue::Float64(Some(max))), + ..Default::default() + }], + }, + )) + } + + /// Helper: create an ExactSortPushdownSource that always returns Exact + #[derive(Clone)] + struct ExactSortPushdownSource { + metrics: ExecutionPlanMetricsSet, + table_schema: TableSchema, + } + + impl ExactSortPushdownSource { + fn new(table_schema: TableSchema) -> Self { + Self { + metrics: ExecutionPlanMetricsSet::new(), + table_schema, + } + } + } + + impl FileSource for ExactSortPushdownSource { + fn create_file_opener( + &self, + _object_store: Arc, + _base_config: &FileScanConfig, + 
_partition: usize, + ) -> Result> { + unimplemented!() + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn table_schema(&self) -> &TableSchema { + &self.table_schema + } + + fn with_batch_size(&self, _batch_size: usize) -> Arc { + Arc::new(self.clone()) + } + + fn metrics(&self) -> &ExecutionPlanMetricsSet { + &self.metrics + } + + fn file_type(&self) -> &str { + "mock_exact" + } + + fn try_pushdown_sort( + &self, + _order: &[PhysicalSortExpr], + _eq_properties: &EquivalenceProperties, + ) -> Result>> { + Ok(SortOrderPushdownResult::Exact { + inner: Arc::new(self.clone()) as Arc, + }) + } + } + + #[test] + fn sort_pushdown_unsupported_source_files_get_sorted() -> Result<()> { + // MockSource returns Unsupported, but files in reverse order should get sorted + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + + // Files in reverse order (high min first) + let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("file3", 20.0, 30.0), + make_file_with_stats("file1", 0.0, 9.0), + make_file_with_stats("file2", 10.0, 19.0), + ])]; + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact result, got {result:?}"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + let files = pushed_config.file_groups[0].files(); + assert_eq!(files[0].object_meta.location.as_ref(), "file1"); + assert_eq!(files[1].object_meta.location.as_ref(), "file2"); + assert_eq!(files[2].object_meta.location.as_ref(), "file3"); + // output_ordering 
should be cleared (Unsupported source, no within-file guarantee) + assert!(pushed_config.output_ordering.is_empty()); + Ok(()) + } + + #[test] + fn sort_pushdown_unsupported_source_already_sorted() -> Result<()> { + // Files already in order -> should return Unsupported (no reordering needed) + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + + let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("file1", 0.0, 9.0), + make_file_with_stats("file2", 10.0, 19.0), + make_file_with_stats("file3", 20.0, 30.0), + ])]; + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + assert!( + matches!(result, SortOrderPushdownResult::Unsupported), + "Expected Unsupported since files were already in order" + ); + Ok(()) + } + + #[test] + fn sort_pushdown_unsupported_source_single_file_groups() -> Result<()> { + // Single-file groups -> trivially sorted, should return Unsupported + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + + let file_groups = vec![ + FileGroup::new(vec![make_file_with_stats("file1", 0.0, 9.0)]), + FileGroup::new(vec![make_file_with_stats("file2", 10.0, 19.0)]), + ]; + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + assert!( + matches!(result, 
SortOrderPushdownResult::Unsupported), + "Expected Unsupported for single-file groups" + ); + Ok(()) + } + + #[test] + fn sort_pushdown_unsupported_source_descending_sort() -> Result<()> { + // Descending sort should reverse file order + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + + let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("file1", 0.0, 9.0), + make_file_with_stats("file3", 20.0, 30.0), + make_file_with_stats("file2", 10.0, 19.0), + ])]; + + let sort_expr = PhysicalSortExpr::new( + Arc::new(Column::new("a", 0)), + arrow::compute::SortOptions { + descending: true, + nulls_first: true, + }, + ); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact result"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + let files = pushed_config.file_groups[0].files(); + // Descending order: highest min first + assert_eq!(files[0].object_meta.location.as_ref(), "file3"); + assert_eq!(files[1].object_meta.location.as_ref(), "file2"); + assert_eq!(files[2].object_meta.location.as_ref(), "file1"); + Ok(()) + } + + #[test] + fn sort_pushdown_unsupported_source_multiple_groups() -> Result<()> { + // Multiple groups, each independently sorted + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + + let file_groups = vec![ + FileGroup::new(vec![ + make_file_with_stats("file_b", 10.0, 19.0), + make_file_with_stats("file_a", 
0.0, 9.0), + ]), + FileGroup::new(vec![ + make_file_with_stats("file_d", 30.0, 39.0), + make_file_with_stats("file_c", 20.0, 29.0), + ]), + ]; + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact result"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + // Group 1: sorted + let files0 = pushed_config.file_groups[0].files(); + assert_eq!(files0[0].object_meta.location.as_ref(), "file_a"); + assert_eq!(files0[1].object_meta.location.as_ref(), "file_b"); + // Group 2: sorted + let files1 = pushed_config.file_groups[1].files(); + assert_eq!(files1[0].object_meta.location.as_ref(), "file_c"); + assert_eq!(files1[1].object_meta.location.as_ref(), "file_d"); + Ok(()) + } + + #[test] + fn sort_pushdown_unsupported_source_partial_statistics() -> Result<()> { + // One group has stats, one doesn't -> sorted group + unchanged group + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + + let file_groups = vec![ + // Group with stats (out of order) + FileGroup::new(vec![ + make_file_with_stats("file_b", 10.0, 19.0), + make_file_with_stats("file_a", 0.0, 9.0), + ]), + // Group without stats + FileGroup::new(vec![ + PartitionedFile::new("file_d".to_string(), 1024), + PartitionedFile::new("file_c".to_string(), 1024), + ]), + ]; + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + 
.build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact result"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + // Group 1: sorted by stats + let files0 = pushed_config.file_groups[0].files(); + assert_eq!(files0[0].object_meta.location.as_ref(), "file_a"); + assert_eq!(files0[1].object_meta.location.as_ref(), "file_b"); + // Group 2: unchanged (no stats) + let files1 = pushed_config.file_groups[1].files(); + assert_eq!(files1[0].object_meta.location.as_ref(), "file_d"); + assert_eq!(files1[1].object_meta.location.as_ref(), "file_c"); + Ok(()) + } + + #[test] + fn sort_pushdown_inexact_source_with_statistics_sorting() -> Result<()> { + // InexactSortPushdownSource + out-of-order files -> files get sorted + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(InexactSortPushdownSource::new(table_schema)); + + let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("file2", 10.0, 19.0), + make_file_with_stats("file1", 0.0, 9.0), + ])]; + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact result"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + let files = pushed_config.file_groups[0].files(); + assert_eq!(files[0].object_meta.location.as_ref(), "file1"); + assert_eq!(files[1].object_meta.location.as_ref(), "file2"); + assert!(pushed_config.output_ordering.is_empty()); + Ok(()) + } + + 
#[test] + fn sort_pushdown_exact_source_non_overlapping_returns_exact() -> Result<()> { + // ExactSortPushdownSource + non-overlapping files -> Exact + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(ExactSortPushdownSource::new(table_schema)); + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + + // Non-overlapping files already in order + let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("file1", 0.0, 9.0), + make_file_with_stats("file2", 10.0, 19.0), + make_file_with_stats("file3", 20.0, 30.0), + ])]; + + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .with_output_ordering(vec![ + LexOrdering::new(vec![sort_expr.clone()]).unwrap(), + ]) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Exact { inner } = result else { + panic!("Expected Exact result, got {result:?}"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + assert!( + !pushed_config.output_ordering.is_empty(), + "output_ordering should be preserved for Exact" + ); + Ok(()) + } + + #[test] + fn sort_pushdown_exact_source_overlapping_downgraded_to_inexact() -> Result<()> { + // ExactSortPushdownSource + overlapping files -> downgrade to Inexact + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(ExactSortPushdownSource::new(table_schema)); + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + + // Overlapping files + let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("file1", 0.0, 15.0), + make_file_with_stats("file2", 10.0, 25.0), + 
make_file_with_stats("file3", 20.0, 30.0), + ])]; + + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .with_output_ordering(vec![ + LexOrdering::new(vec![sort_expr.clone()]).unwrap(), + ]) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact (downgraded from Exact), got {result:?}"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + assert!( + pushed_config.output_ordering.is_empty(), + "output_ordering should be cleared for downgraded Inexact" + ); + Ok(()) + } + + #[test] + fn sort_pushdown_exact_source_non_overlapping_out_of_order_returns_exact() + -> Result<()> { + // ExactSortPushdownSource + non-overlapping files out of order -> sorted, Exact + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(ExactSortPushdownSource::new(table_schema)); + + let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + + // Non-overlapping but out of order + let file_groups = vec![FileGroup::new(vec![ + make_file_with_stats("file3", 20.0, 30.0), + make_file_with_stats("file1", 0.0, 9.0), + make_file_with_stats("file2", 10.0, 19.0), + ])]; + + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .with_output_ordering(vec![ + LexOrdering::new(vec![sort_expr.clone()]).unwrap(), + ]) + .build(); + + let result = config.try_pushdown_sort(&[sort_expr])?; + let SortOrderPushdownResult::Exact { inner } = result else { + panic!("Expected Exact result, got {result:?}"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + // Files should be sorted + let files = 
pushed_config.file_groups[0].files(); + assert_eq!(files[0].object_meta.location.as_ref(), "file1"); + assert_eq!(files[1].object_meta.location.as_ref(), "file2"); + assert_eq!(files[2].object_meta.location.as_ref(), "file3"); + assert!( + !pushed_config.output_ordering.is_empty(), + "output_ordering should be preserved for Exact" + ); + Ok(()) + } } diff --git a/datafusion/physical-optimizer/src/pushdown_sort.rs b/datafusion/physical-optimizer/src/pushdown_sort.rs index 1fa15492d2a9..65687b0f9f34 100644 --- a/datafusion/physical-optimizer/src/pushdown_sort.rs +++ b/datafusion/physical-optimizer/src/pushdown_sort.rs @@ -19,7 +19,7 @@ //! //! This optimizer attempts to push sort requirements down through the execution plan //! tree to data sources that can natively handle them (e.g., by scanning files in -//! reverse order). +//! reverse order or reordering files based on statistics). //! //! ## How it works //! @@ -35,19 +35,20 @@ //! - `Inexact`: Keep Sort but use optimized input (enables early termination for TopK) //! - `Unsupported`: No change //! -//! ## Current capabilities (Phase 1) +//! ## Capabilities //! -//! - Reverse scan optimization: when required sort is the reverse of the data source's -//! natural ordering, enable reverse scanning (reading row groups in reverse order) -//! - Supports prefix matching: if data has ordering [A DESC, B ASC] and query needs -//! [A ASC], reversing gives [A ASC, B DESC] which satisfies the requirement +//! - **Reverse scan**: when required sort is the reverse of the data source's natural +//! ordering, enable reverse scanning (reading row groups in reverse order) +//! - **Prefix matching**: if data has ordering [A DESC, B ASC] and query needs [A ASC], +//! reversing gives [A ASC, B DESC] which satisfies the requirement +//! - **File reordering by statistics**: files within each group are sorted by their +//! min/max statistics to approximate the requested ordering, improving TopK early +//! termination +//! 
- **Sort elimination**: when a `FileSource` guarantees within-file ordering (e.g. +//! Parquet with matching metadata) and files within each group are non-overlapping, +//! `Exact` is returned and the `SortExec` is removed entirely //! -//! TODO Issue: -//! ## Future enhancements (Phase 2), -//! -//! - File reordering based on statistics -//! - Return `Exact` when files are known to be perfectly sorted -//! - Complete Sort elimination when ordering is guaranteed +//! See also: use crate::PhysicalOptimizerRule; use datafusion_common::Result; diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index 99f26b66d458..daab61266a92 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -1632,3 +1632,386 @@ DROP TABLE agg_expr_parquet; statement ok SET datafusion.optimizer.enable_sort_pushdown = true; + + +# Test 4: Sort elimination with non-overlapping multi-file groups +# When files are non-overlapping and each file is internally sorted, +# the optimizer can eliminate the SortExec entirely. 
+ +statement ok +SET datafusion.execution.target_partitions = 1; + +statement ok +SET datafusion.execution.collect_statistics = true; + +# Create 3 parquet files with non-overlapping ranges for column `id`: +# file_a: id 1-3 (min=1, max=3) +# file_b: id 4-6 (min=4, max=6) +# file_c: id 7-10 (min=7, max=10) +statement ok +CREATE TABLE t4_src_a(id INT, value INT) AS VALUES (1, 100), (2, 200), (3, 300); + +statement ok +CREATE TABLE t4_src_b(id INT, value INT) AS VALUES (4, 400), (5, 500), (6, 600); + +statement ok +CREATE TABLE t4_src_c(id INT, value INT) AS VALUES (7, 700), (8, 800), (9, 900), (10, 1000); + +query I +COPY (SELECT * FROM t4_src_a ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/t4_nonoverlap/file_a.parquet'; +---- +3 + +query I +COPY (SELECT * FROM t4_src_b ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/t4_nonoverlap/file_b.parquet'; +---- +3 + +query I +COPY (SELECT * FROM t4_src_c ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/t4_nonoverlap/file_c.parquet'; +---- +4 + +statement ok +CREATE EXTERNAL TABLE t4_sorted(id INT, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/t4_nonoverlap/' +WITH ORDER (id ASC); + +# Test 4.1: ORDER BY id ASC → files are non-overlapping and within-file ordering +# is guaranteed (ParquetSource returns Exact). After statistics-based sorting, +# files are in [a, b, c] order. The SortExec should be ELIMINATED. +# Plan should show just DataSourceExec with output_ordering=[id@0 ASC ...]. 
+query TT +EXPLAIN SELECT * FROM t4_sorted ORDER BY id ASC; +---- +logical_plan +01)Sort: t4_sorted.id ASC NULLS LAST +02)--TableScan: t4_sorted projection=[id, value] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/t4_nonoverlap/file_a.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/t4_nonoverlap/file_b.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/t4_nonoverlap/file_c.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + +# Test 4.2: Verify result correctness +query II +SELECT * FROM t4_sorted ORDER BY id ASC; +---- +1 100 +2 200 +3 300 +4 400 +5 500 +6 600 +7 700 +8 800 +9 900 +10 1000 + +# Test 4.3: ORDER BY id ASC LIMIT 3 → Same elimination but with limit pushdown +query TT +EXPLAIN SELECT * FROM t4_sorted ORDER BY id ASC LIMIT 3; +---- +logical_plan +01)Sort: t4_sorted.id ASC NULLS LAST, fetch=3 +02)--TableScan: t4_sorted projection=[id, value] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/t4_nonoverlap/file_a.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/t4_nonoverlap/file_b.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/t4_nonoverlap/file_c.parquet]]}, projection=[id, value], limit=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + +query II +SELECT * FROM t4_sorted ORDER BY id ASC LIMIT 3; +---- +1 100 +2 200 +3 300 + +# Test 4.4: ORDER BY id DESC LIMIT 3 → Reverse case. +# Since this is the Inexact (reverse_row_groups) path, SortExec stays +# but reverse_row_groups=true should appear. 
+query TT +EXPLAIN SELECT * FROM t4_sorted ORDER BY id DESC LIMIT 3; +---- +logical_plan +01)Sort: t4_sorted.id DESC NULLS FIRST, fetch=3 +02)--TableScan: t4_sorted projection=[id, value] +physical_plan +01)SortExec: TopK(fetch=3), expr=[id@0 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/t4_nonoverlap/file_c.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/t4_nonoverlap/file_b.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/t4_nonoverlap/file_a.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ], reverse_row_groups=true + +query II +SELECT * FROM t4_sorted ORDER BY id DESC LIMIT 3; +---- +10 1000 +9 900 +8 800 + +# Cleanup Test 4 +statement ok +DROP TABLE t4_src_a; + +statement ok +DROP TABLE t4_src_b; + +statement ok +DROP TABLE t4_src_c; + +statement ok +DROP TABLE t4_sorted; + + +# Test 5: Sort pushdown with overlapping multi-file groups +# When files overlap, the optimizer uses statistics-based file sorting +# as a best-effort optimization. SortExec stays (Inexact) to ensure +# correct final ordering. 
+ +# Create 3 parquet files with overlapping ranges: +# file_x: id range [1, 5] +# file_y: id range [3, 8] +# file_z: id range [6, 10] +statement ok +CREATE TABLE t5_src_x(id INT, value INT) AS VALUES (1, 10), (2, 20), (3, 30), (4, 40), (5, 50); + +statement ok +CREATE TABLE t5_src_y(id INT, value INT) AS VALUES (3, 31), (4, 41), (5, 51), (6, 61), (7, 71), (8, 81); + +statement ok +CREATE TABLE t5_src_z(id INT, value INT) AS VALUES (6, 62), (7, 72), (8, 82), (9, 92), (10, 102); + +query I +COPY (SELECT * FROM t5_src_x ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/t5_overlap/file_x.parquet'; +---- +5 + +query I +COPY (SELECT * FROM t5_src_y ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/t5_overlap/file_y.parquet'; +---- +6 + +query I +COPY (SELECT * FROM t5_src_z ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/t5_overlap/file_z.parquet'; +---- +5 + +statement ok +CREATE EXTERNAL TABLE t5_overlap(id INT, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/t5_overlap/' +WITH ORDER (id ASC); + +# Test 5.1: ORDER BY id DESC, value DESC LIMIT 5 → The requested sort +# doesn't exactly reverse the file ordering (two columns vs one), so the +# Unsupported→statistics fallback path activates. Files are sorted by +# statistics in DESC order [z, y, x]. SortExec stays (Inexact). 
+query TT +EXPLAIN SELECT * FROM t5_overlap ORDER BY id DESC, value DESC LIMIT 5; +---- +logical_plan +01)Sort: t5_overlap.id DESC NULLS FIRST, t5_overlap.value DESC NULLS FIRST, fetch=5 +02)--TableScan: t5_overlap projection=[id, value] +physical_plan +01)SortExec: TopK(fetch=5), expr=[id@0 DESC, value@1 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/t5_overlap/file_z.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/t5_overlap/file_y.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/t5_overlap/file_x.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ] + +# Test 5.2: Verify result correctness with DESC on overlapping files +query II +SELECT * FROM t5_overlap ORDER BY id DESC, value DESC LIMIT 5; +---- +10 102 +9 92 +8 82 +8 81 +7 72 + +# Cleanup Test 5 +statement ok +DROP TABLE t5_src_x; + +statement ok +DROP TABLE t5_src_y; + +statement ok +DROP TABLE t5_src_z; + +statement ok +DROP TABLE t5_overlap; + + +# Test 6: File ordering with reverse scan on non-overlapping files +# Create 3 non-overlapping files with mixed filesystem ordering. +# When requesting DESC order, the reverse path reverses the file listing +# order and applies reverse_row_groups=true. Results should be correct +# regardless of filesystem naming order. 
+ +statement ok +CREATE TABLE t6_src_high(id INT, value INT) AS VALUES (7, 700), (8, 800), (9, 900), (10, 1000); + +statement ok +CREATE TABLE t6_src_mid(id INT, value INT) AS VALUES (4, 400), (5, 500), (6, 600); + +statement ok +CREATE TABLE t6_src_low(id INT, value INT) AS VALUES (1, 100), (2, 200), (3, 300); + +# Write files so that alphabetical order puts high-ID file first +query I +COPY (SELECT * FROM t6_src_high ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/t6_reorder/aaa_high.parquet'; +---- +4 + +query I +COPY (SELECT * FROM t6_src_mid ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/t6_reorder/bbb_mid.parquet'; +---- +3 + +query I +COPY (SELECT * FROM t6_src_low ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/t6_reorder/ccc_low.parquet'; +---- +3 + +statement ok +CREATE EXTERNAL TABLE t6_reorder(id INT, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/t6_reorder/' +WITH ORDER (id ASC); + +# Test 6.1: ORDER BY id DESC LIMIT 3 → Reverse scan with non-overlapping files. +# The reverse path reverses file listing order and applies reverse_row_groups=true. +# SortExec stays (Inexact) to ensure correct ordering across files. 
+query TT +EXPLAIN SELECT * FROM t6_reorder ORDER BY id DESC LIMIT 3; +---- +logical_plan +01)Sort: t6_reorder.id DESC NULLS FIRST, fetch=3 +02)--TableScan: t6_reorder projection=[id, value] +physical_plan +01)SortExec: TopK(fetch=3), expr=[id@0 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/t6_reorder/ccc_low.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/t6_reorder/bbb_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/t6_reorder/aaa_high.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ], reverse_row_groups=true + +query II +SELECT * FROM t6_reorder ORDER BY id DESC LIMIT 3; +---- +10 1000 +9 900 +8 800 + +# Test 6.2: ORDER BY id DESC (no limit) → Full reverse scan +query II +SELECT * FROM t6_reorder ORDER BY id DESC; +---- +10 1000 +9 900 +8 800 +7 700 +6 600 +5 500 +4 400 +3 300 +2 200 +1 100 + +# Cleanup Test 6 +statement ok +DROP TABLE t6_src_high; + +statement ok +DROP TABLE t6_src_mid; + +statement ok +DROP TABLE t6_src_low; + +statement ok +DROP TABLE t6_reorder; + + +# Test 7: Multi-group case with varying target_partitions +# Reuse non-overlapping files and test with target_partitions=2 +# so files get split across multiple groups. 
+ +statement ok +CREATE TABLE t7_src_a(id INT, value INT) AS VALUES (1, 100), (2, 200), (3, 300); + +statement ok +CREATE TABLE t7_src_b(id INT, value INT) AS VALUES (4, 400), (5, 500), (6, 600); + +statement ok +CREATE TABLE t7_src_c(id INT, value INT) AS VALUES (7, 700), (8, 800), (9, 900), (10, 1000); + +query I +COPY (SELECT * FROM t7_src_a ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/t7_multi/file_a.parquet'; +---- +3 + +query I +COPY (SELECT * FROM t7_src_b ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/t7_multi/file_b.parquet'; +---- +3 + +query I +COPY (SELECT * FROM t7_src_c ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/t7_multi/file_c.parquet'; +---- +4 + +# Test 7.1: With target_partitions=2, files get split across 2 groups. +# Each group's files are sorted by statistics. Plan should show +# SortPreservingMergeExec with output_ordering (Sort eliminated within +# each group, merge across groups). +statement ok +SET datafusion.execution.target_partitions = 2; + +statement ok +CREATE EXTERNAL TABLE t7_multi(id INT, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/t7_multi/' +WITH ORDER (id ASC); + +query TT +EXPLAIN SELECT * FROM t7_multi ORDER BY id ASC; +---- +logical_plan +01)Sort: t7_multi.id ASC NULLS LAST +02)--TableScan: t7_multi projection=[id, value] +physical_plan +01)SortPreservingMergeExec: [id@0 ASC NULLS LAST] +02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/t7_multi/file_a.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/t7_multi/file_b.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/t7_multi/file_c.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + +# Verify correctness +query II +SELECT * FROM t7_multi ORDER BY id ASC; +---- +1 100 +2 200 +3 300 +4 400 +5 500 +6 600 +7 700 +8 800 +9 900 +10 1000 + +# Cleanup 
Test 7 +statement ok +DROP TABLE t7_src_a; + +statement ok +DROP TABLE t7_src_b; + +statement ok +DROP TABLE t7_src_c; + +statement ok +DROP TABLE t7_multi; + +# Restore the suite-wide settings used by the rest of this file +# (target_partitions=4, collect_statistics=true) +statement ok +SET datafusion.execution.target_partitions = 4; + +statement ok +SET datafusion.execution.collect_statistics = true;