diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index cbc7e65d1c753..9637457370f0b 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1127,12 +1127,12 @@ impl ListingTable { get_files_with_limit(files, limit, self.options.collect_stat).await?; let file_groups = file_group.split_files(self.options.target_partitions); - compute_all_files_statistics( + Ok(compute_all_files_statistics( file_groups, self.schema(), self.options.collect_stat, inexact_stats, - ) + )) } /// Collects statistics for a given partitioned file. diff --git a/datafusion/datasource/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs index a1f966c22f35f..b8d8598c79285 100644 --- a/datafusion/datasource/src/file_groups.rs +++ b/datafusion/datasource/src/file_groups.rs @@ -17,6 +17,7 @@ //! Logic for managing groups of [`PartitionedFile`]s in DataFusion +use crate::statistics::compute_file_group_statistics; use crate::{FileRange, PartitionedFile}; use datafusion_common::Statistics; use itertools::Itertools; @@ -199,11 +200,20 @@ impl FileGroupPartitioner { } // special case when order must be preserved - if self.preserve_order_within_groups { + let repartitioned_groups = if self.preserve_order_within_groups { self.repartition_preserving_order(file_groups) } else { self.repartition_evenly_by_size(file_groups) + }; + + let repartitioned_groups = repartitioned_groups?; + + // Recompute statistics for each file group + let mut groups = Vec::with_capacity(repartitioned_groups.len()); + for file_group in repartitioned_groups { + groups.push(compute_file_group_statistics(file_group, true)); } + Some(groups) } /// Evenly repartition files across partitions by size, ignoring any @@ -351,8 +361,18 @@ impl FileGroupPartitioner { if i == last_group { range_end = file_size as i64; } - target_group - .push(original_file.clone().with_range(range_start, range_end)); + let 
updated_file = + original_file.clone().with_range(range_start, range_end); + let statistics_option = updated_file + .statistics + .as_ref() + .map(|stat| Arc::new(stat.as_ref().clone().to_inexact())); + + if let Some(statistics) = statistics_option { + target_group.push(updated_file.with_statistics(statistics)); + } else { + target_group.push(updated_file); + } range_start = range_end; range_end += range_size; } @@ -525,6 +545,9 @@ impl Ord for ToRepartition { #[cfg(test)] mod test { use super::*; + use datafusion_common::stats::Precision; + use datafusion_common::ScalarValue; + use std::sync::Arc; /// Empty file won't get partitioned #[test] @@ -941,6 +964,138 @@ mod test { assert_partitioned_files(expected, actual); } + #[test] + fn repartition_file_groups_with_statistics() -> datafusion_common::Result<()> { + // Create test files + let mut file1 = pfile("a", 100); + let mut file2 = pfile("b", 50); + + // Create statistics for file groups + let stats1 = Statistics { + num_rows: Precision::Exact(1000), + total_byte_size: Precision::Exact(100), + column_statistics: vec![ + // Just add column statistics for a couple columns + datafusion_common::ColumnStatistics { + null_count: Precision::Exact(10), + max_value: Precision::Exact(ScalarValue::UInt32(Some(100))), + min_value: Precision::Exact(ScalarValue::UInt32(Some(1))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }, + ], + }; + + file1 = file1.with_statistics(Arc::new(stats1.clone())); + + let stats2 = Statistics { + num_rows: Precision::Exact(500), + total_byte_size: Precision::Exact(50), + column_statistics: vec![ + // Just add column statistics for a couple columns + datafusion_common::ColumnStatistics { + null_count: Precision::Exact(5), + max_value: Precision::Exact(ScalarValue::UInt32(Some(200))), + min_value: Precision::Exact(ScalarValue::UInt32(Some(101))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }, + ], + }; + + file2 = 
file2.with_statistics(Arc::new(stats2.clone())); + + let file_groups = vec![ + FileGroup::new(vec![file1]).with_statistics(Arc::new(stats1)), + FileGroup::new(vec![file2]).with_statistics(Arc::new(stats2)), + ]; + + // Verify initial state + assert!(file_groups[0].statistics().is_some()); + assert!(file_groups[1].statistics().is_some()); + + // Repartition files + let repartitioned = FileGroupPartitioner::new() + .with_preserve_order_within_groups(true) + .with_target_partitions(3) + .with_repartition_file_min_size(10) + .repartition_file_groups(&file_groups) + .unwrap(); + + // Verify statistics are present and valid + assert_eq!(repartitioned.len(), 3, "Should have 3 partitions"); + + // Helper function to check statistics are inexact + fn assert_stats_are_inexact(stats: &Statistics) { + assert!(!stats.num_rows.is_exact().unwrap()); + assert!(!stats.total_byte_size.is_exact().unwrap()); + assert!(!stats.column_statistics[0].max_value.is_exact().unwrap()); + } + + for group in repartitioned.iter() { + // Check all files have inexact statistics regardless of group + for file in group.files.iter() { + let stats = file.statistics.as_ref().unwrap(); + assert_stats_are_inexact(stats); + } + + let stats = group.statistics.as_ref().unwrap(); + assert_stats_are_inexact(stats); + } + + // Check the specific statistics for each group (after repartition, each group only has one file, so we don't need to check the partitioned file statistics) + let expected_group_1_statistics = Statistics { + num_rows: Precision::Inexact(1000), + total_byte_size: Precision::Inexact(100), + column_statistics: vec![datafusion_common::ColumnStatistics { + null_count: Precision::Inexact(10), + max_value: Precision::Inexact(ScalarValue::UInt32(Some(100))), + min_value: Precision::Inexact(ScalarValue::UInt32(Some(1))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }], + }; + + let expected_group_2_statistics = Statistics { + num_rows: Precision::Inexact(500), + 
+ total_byte_size: Precision::Inexact(50), + column_statistics: vec![datafusion_common::ColumnStatistics { + null_count: Precision::Inexact(5), + max_value: Precision::Inexact(ScalarValue::UInt32(Some(200))), + min_value: Precision::Inexact(ScalarValue::UInt32(Some(101))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }], + }; + + let expected_group_3_statistics = Statistics { + num_rows: Precision::Inexact(1000), + total_byte_size: Precision::Inexact(100), + column_statistics: vec![datafusion_common::ColumnStatistics { + null_count: Precision::Inexact(10), + max_value: Precision::Inexact(ScalarValue::UInt32(Some(100))), + min_value: Precision::Inexact(ScalarValue::UInt32(Some(1))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }], + }; + + assert_eq!( + repartitioned[0].statistics.as_ref().unwrap(), + &Arc::new(expected_group_1_statistics) + ); + assert_eq!( + repartitioned[1].statistics.as_ref().unwrap(), + &Arc::new(expected_group_2_statistics) + ); + assert_eq!( + repartitioned[2].statistics.as_ref().unwrap(), + &Arc::new(expected_group_3_statistics) + ); + + Ok(()) + } + /// Asserts that the two groups of [`PartitionedFile`] are the same /// (PartitionedFile doesn't implement PartialEq) fn assert_partitioned_files( diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs index 040bf754dd274..f6ceaf05c5178 100644 --- a/datafusion/datasource/src/statistics.rs +++ b/datafusion/datasource/src/statistics.rs @@ -410,23 +410,24 @@ pub async fn get_statistics_with_limit( } /// Generic function to compute statistics across multiple items that have statistics -fn compute_summary_statistics<T, I>( +/// If `items` is empty or all items don't have statistics, it returns `None`. 
+pub fn compute_summary_statistics<T, I>( items: I, - file_schema: &SchemaRef, stats_extractor: impl Fn(&T) -> Option<&Statistics>, -) -> Statistics +) -> Option<Statistics> where I: IntoIterator<Item = T>, { - let size = file_schema.fields().len(); - let mut col_stats_set = vec![ColumnStatistics::default(); size]; + let mut col_stats_set = Vec::new(); let mut num_rows = Precision::<usize>::Absent; let mut total_byte_size = Precision::<usize>::Absent; - for (idx, item) in items.into_iter().enumerate() { + for item in items.into_iter() { if let Some(item_stats) = stats_extractor(&item) { - if idx == 0 { + if col_stats_set.is_empty() { // First item, set values directly + col_stats_set = + vec![ColumnStatistics::default(); item_stats.column_statistics.len()]; num_rows = item_stats.num_rows; total_byte_size = item_stats.total_byte_size; for (index, column_stats) in @@ -458,11 +459,15 @@ where } } - Statistics { + if col_stats_set.is_empty() { + // No statistics available + return None; + } + Some(Statistics { num_rows, total_byte_size, column_statistics: col_stats_set, - } + }) } /// Computes the summary statistics for a group of files(`FileGroup` level's statistics). 
@@ -479,22 +484,24 @@ where /// * `collect_stats` - Whether to collect statistics (if false, returns original file group) /// /// # Returns -/// A new file group with summary statistics attached +/// A new file group with summary statistics attached if there is statistics pub fn compute_file_group_statistics( - file_group: FileGroup, - file_schema: SchemaRef, + mut file_group: FileGroup, collect_stats: bool, -) -> Result<FileGroup> { +) -> FileGroup { if !collect_stats { - return Ok(file_group); + return file_group; } - let statistics = - compute_summary_statistics(file_group.iter(), &file_schema, |file| { - file.statistics.as_ref().map(|stats| stats.as_ref()) - }); + let statistics = compute_summary_statistics(file_group.iter(), |file| { + file.statistics.as_ref().map(|stats| stats.as_ref()) + }); + + if let Some(stats) = statistics { + file_group = file_group.with_statistics(Arc::new(stats)); + } - Ok(file_group.with_statistics(Arc::new(statistics))) + file_group } /// Computes statistics for all files across multiple file groups. 
@@ -519,29 +526,30 @@ pub fn compute_all_files_statistics( file_schema: SchemaRef, collect_stats: bool, inexact_stats: bool, -) -> Result<(Vec<FileGroup>, Statistics)> { +) -> (Vec<FileGroup>, Statistics) { + if !collect_stats { + return (file_groups, Statistics::new_unknown(&file_schema)); + } let mut file_groups_with_stats = Vec::with_capacity(file_groups.len()); // First compute statistics for each file group for file_group in file_groups { - file_groups_with_stats.push(compute_file_group_statistics( - file_group, - Arc::clone(&file_schema), - collect_stats, - )?); + file_groups_with_stats + .push(compute_file_group_statistics(file_group, collect_stats)); } // Then summary statistics across all file groups let mut statistics = - compute_summary_statistics(&file_groups_with_stats, &file_schema, |file_group| { + compute_summary_statistics(&file_groups_with_stats, |file_group| { file_group.statistics() - }); + }) + .unwrap_or(Statistics::new_unknown(&file_schema)); if inexact_stats { statistics = statistics.to_inexact() } - Ok((file_groups_with_stats, statistics)) + (file_groups_with_stats, statistics) } pub fn add_row_stats( @@ -620,18 +628,11 @@ fn set_min_if_lesser( #[cfg(test)] mod tests { use super::*; - use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::ScalarValue; use std::sync::Arc; #[test] fn test_compute_summary_statistics_basic() { - // Create a schema with two columns - let schema = Arc::new(Schema::new(vec![ - Field::new("col1", DataType::Int32, false), - Field::new("col2", DataType::Int32, false), - ])); - // Create items with statistics let stats1 = Statistics { num_rows: Precision::Exact(10), @@ -679,7 +680,7 @@ mod tests { // Call compute_summary_statistics let summary_stats = - compute_summary_statistics(items, &schema, |item| Some(item.as_ref())); + compute_summary_statistics(items, |item| Some(item.as_ref())).unwrap(); // Verify the results assert_eq!(summary_stats.num_rows, Precision::Exact(25)); // 10 + 15 @@ -719,13 +720,6 @@ mod tests { 
#[test] fn test_compute_summary_statistics_mixed_precision() { - // Create a schema with one column - let schema = Arc::new(Schema::new(vec![Field::new( - "col1", - DataType::Int32, - false, - )])); - // Create items with different precision levels let stats1 = Statistics { num_rows: Precision::Exact(10), @@ -754,7 +748,7 @@ mod tests { let items = vec![Arc::new(stats1), Arc::new(stats2)]; let summary_stats = - compute_summary_statistics(items, &schema, |item| Some(item.as_ref())); + compute_summary_statistics(items, |item| Some(item.as_ref())).unwrap(); assert_eq!(summary_stats.num_rows, Precision::Inexact(25)); assert_eq!(summary_stats.total_byte_size, Precision::Inexact(250)); @@ -774,25 +768,11 @@ mod tests { #[test] fn test_compute_summary_statistics_empty() { - let schema = Arc::new(Schema::new(vec![Field::new( - "col1", - DataType::Int32, - false, - )])); - // Empty collection let items: Vec<Arc<Statistics>> = vec![]; - let summary_stats = - compute_summary_statistics(items, &schema, |item| Some(item.as_ref())); + let summary_stats = compute_summary_statistics(items, |item| Some(item.as_ref())); - // Verify default values for empty collection - assert_eq!(summary_stats.num_rows, Precision::Absent); - assert_eq!(summary_stats.total_byte_size, Precision::Absent); - assert_eq!(summary_stats.column_statistics.len(), 1); - assert_eq!( - summary_stats.column_statistics[0].null_count, - Precision::Absent - ); + assert!(summary_stats.is_none()); } }