25 changes: 25 additions & 0 deletions datafusion/common/src/stats.rs
@@ -317,6 +317,31 @@ impl Statistics {
         }
     }
 
+    /// Calculates `total_byte_size` based on the schema and `num_rows`.
+    /// If any of the columns has a non-primitive width, `total_byte_size` is set to inexact.
+    pub fn calculate_total_byte_size(&mut self, schema: &Schema) {
+        let mut row_size = Some(0);
+        for field in schema.fields() {
+            match field.data_type().primitive_width() {
+                Some(width) => {
+                    row_size = row_size.map(|s| s + width);
+                }
+                None => {
+                    row_size = None;
+                    break;
+                }
+            }
+        }
+        match row_size {
+            None => {
+                self.total_byte_size = self.total_byte_size.to_inexact();
+            }
+            Some(size) => {
+                self.total_byte_size = self.num_rows.multiply(&Precision::Exact(size));
+            }
+        }
+    }
+
     /// Returns an unbounded `ColumnStatistics` for each field in the schema.
     pub fn unknown_column(schema: &Schema) -> Vec<ColumnStatistics> {
         schema
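To make the new helper's two branches concrete, here is a minimal sketch of how it behaves. The schemas and numbers are illustrative, not from the PR, and it assumes the paths `datafusion_common::stats::{Precision, Statistics}` and arrow's `DataType::primitive_width` as used in the diff above:

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::stats::{Precision, Statistics};

    fn main() {
        // All-primitive schema: Int32 (4 bytes) + Date32 (4 bytes) = 8 bytes per row.
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("date", DataType::Date32, false),
        ]);
        let mut stats = Statistics::new_unknown(&schema);
        stats.num_rows = Precision::Exact(2);
        stats.calculate_total_byte_size(&schema);
        // 2 rows × 8 bytes = 16, and Exact × Exact stays Exact.
        assert_eq!(stats.total_byte_size, Precision::Exact(16));

        // Utf8 has no primitive width, so any existing estimate is only downgraded.
        let schema = Schema::new(vec![Field::new("name", DataType::Utf8, false)]);
        let mut stats = Statistics::new_unknown(&schema);
        stats.total_byte_size = Precision::Exact(671);
        stats.calculate_total_byte_size(&schema);
        assert_eq!(stats.total_byte_size, Precision::Inexact(671));
    }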
5 changes: 2 additions & 3 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -724,7 +724,7 @@ mod tests {
         // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
         assert_eq!(
             exec.partition_statistics(None)?.total_byte_size,
-            Precision::Exact(671)
+            Precision::Absent,
         );
 
         Ok(())
@@ -770,10 +770,9 @@ mod tests {
             exec.partition_statistics(None)?.num_rows,
             Precision::Exact(8)
         );
-        // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
         assert_eq!(
             exec.partition_statistics(None)?.total_byte_size,
-            Precision::Exact(671)
+            Precision::Absent,
         );
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
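A short sketch of why these expectations become `Precision::Absent` rather than merely inexact (hypothetical schema; per the comments in the file_statistics.rs changes below, the real test file contains variable-length string columns): `Statistics::new_unknown` starts `total_byte_size` at `Absent`, and a variable-width column can only downgrade an existing estimate, so `Absent` stays `Absent`.

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::stats::{Precision, Statistics};

    fn main() {
        // A string column has no fixed width in the Arrow representation.
        assert_eq!(DataType::Utf8.primitive_width(), None);

        // With no prior estimate, "make it inexact" has nothing to downgrade:
        // Absent.to_inexact() is still Absent.
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("string_col", DataType::Utf8, false),
        ]);
        let mut stats = Statistics::new_unknown(&schema); // total_byte_size: Absent
        stats.num_rows = Precision::Exact(8);
        stats.calculate_total_byte_size(&schema);
        assert_eq!(stats.total_byte_size, Precision::Absent);
    }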
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
@@ -257,7 +257,7 @@ mod tests {
         );
         assert_eq!(
             exec.partition_statistics(None)?.total_byte_size,
-            Precision::Exact(671)
+            Precision::Absent,
         );
 
         Ok(())
@@ -1397,7 +1397,7 @@ mod tests {
         // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
         assert_eq!(
             exec_enabled.partition_statistics(None)?.total_byte_size,
-            Precision::Exact(671)
+            Precision::Absent,
         );
 
         Ok(())
13 changes: 7 additions & 6 deletions datafusion/core/tests/parquet/file_statistics.rs
@@ -128,8 +128,9 @@ async fn load_table_stats_with_session_level_cache() {
     );
     assert_eq!(
         exec1.partition_statistics(None).unwrap().total_byte_size,
-        // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
-        Precision::Exact(671),
+        // Byte size is absent because we cannot estimate the output size
+        // of the Arrow data since there are variable length columns.
+        Precision::Absent,
     );
     assert_eq!(get_static_cache_size(&state1), 1);
 
@@ -143,8 +144,8 @@
     );
     assert_eq!(
         exec2.partition_statistics(None).unwrap().total_byte_size,
-        // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
-        Precision::Exact(671),
+        // Absent because the data contains variable length columns
+        Precision::Absent,
     );
     assert_eq!(get_static_cache_size(&state2), 1);
 
@@ -158,8 +159,8 @@
     );
     assert_eq!(
         exec3.partition_statistics(None).unwrap().total_byte_size,
-        // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
-        Precision::Exact(671),
+        // Absent because the data contains variable length columns
+        Precision::Absent,
     );
     // List same file no increase
     assert_eq!(get_static_cache_size(&state1), 1);
49 changes: 27 additions & 22 deletions datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -67,7 +67,7 @@ mod test {
     /// - Each partition has an "id" column (INT) with the following values:
     ///   - First partition: [3, 4]
     ///   - Second partition: [1, 2]
-    /// - Each row is 110 bytes in size
+    /// - Each partition has 16 bytes total (Int32 id: 4 bytes × 2 rows + Date32 date: 4 bytes × 2 rows)
     ///
     /// @param create_table_sql Optional parameter to set the create table SQL
     /// @param target_partition Optional parameter to set the target partitions
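The updated 16-byte figure in this doc comment can be checked mechanically; a tiny sketch of the arithmetic, using arrow's `primitive_width` just as the new `calculate_total_byte_size` does:

    use arrow::datatypes::DataType;

    fn main() {
        // Int32 and Date32 are both 4-byte primitives.
        let row_width: usize = [DataType::Int32, DataType::Date32]
            .iter()
            .map(|dt| dt.primitive_width().expect("fixed-width type"))
            .sum();
        assert_eq!(row_width, 8);
        // Two rows per partition => 16 bytes per partition.
        assert_eq!(2 * row_width, 16);
    }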
@@ -215,9 +215,9 @@ mod test {
             .map(|idx| scan.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
         let expected_statistic_partition_1 =
-            create_partition_statistics(2, 110, 3, 4, true);
+            create_partition_statistics(2, 16, 3, 4, true);
         let expected_statistic_partition_2 =
-            create_partition_statistics(2, 110, 1, 2, true);
+            create_partition_statistics(2, 16, 1, 2, true);
         // Check the statistics of each partition
         assert_eq!(statistics.len(), 2);
         assert_eq!(statistics[0], expected_statistic_partition_1);
@@ -277,8 +277,7 @@ mod test {
         let statistics = (0..sort_exec.output_partitioning().partition_count())
             .map(|idx| sort_exec.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
-        let expected_statistic_partition =
-            create_partition_statistics(4, 220, 1, 4, true);
+        let expected_statistic_partition = create_partition_statistics(4, 32, 1, 4, true);
         assert_eq!(statistics.len(), 1);
         assert_eq!(statistics[0], expected_statistic_partition);
         // Check the statistics_by_partition with real results
@@ -292,9 +291,9 @@
             SortExec::new(ordering.into(), scan_2).with_preserve_partitioning(true),
         );
         let expected_statistic_partition_1 =
-            create_partition_statistics(2, 110, 3, 4, true);
+            create_partition_statistics(2, 16, 3, 4, true);
         let expected_statistic_partition_2 =
-            create_partition_statistics(2, 110, 1, 2, true);
+            create_partition_statistics(2, 16, 1, 2, true);
         let statistics = (0..sort_exec.output_partitioning().partition_count())
             .map(|idx| sort_exec.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
@@ -366,9 +365,9 @@ mod test {
         // Check that we have 4 partitions (2 from each scan)
         assert_eq!(statistics.len(), 4);
         let expected_statistic_partition_1 =
-            create_partition_statistics(2, 110, 3, 4, true);
+            create_partition_statistics(2, 16, 3, 4, true);
         let expected_statistic_partition_2 =
-            create_partition_statistics(2, 110, 1, 2, true);
+            create_partition_statistics(2, 16, 1, 2, true);
         // Verify first partition (from first scan)
         assert_eq!(statistics[0], expected_statistic_partition_1);
         // Verify second partition (from first scan)
@@ -418,7 +417,7 @@ mod test {

         let expected_stats = Statistics {
             num_rows: Precision::Inexact(4),
-            total_byte_size: Precision::Inexact(220),
+            total_byte_size: Precision::Inexact(32),
             column_statistics: vec![
                 ColumnStatistics::new_unknown(),
                 ColumnStatistics::new_unknown(),

Review thread on the total_byte_size line:

[Contributor Author] I'm trying to understand why these numbers changed. I think the issue is that the way the row sizes are calculated when the statistics are initially computed doesn't match how the row sizes are calculated by ProjectionExprs. I'm guessing they're measuring different things (Parquet row size vs. Arrow row size?). Unfortunately, the Statistics struct doesn't have much in the way of documentation.

[Contributor Author] Indeed, this is the case:

    total_byte_size += row_group_meta.total_byte_size() as usize;

So the initial collection just adds up the sizes of the row groups (in Parquet). But presumably we want the size of the data returned by the scan, not the size of the scanned input itself. So this is wrong; it should be doing the same calculation as ProjectionExprs.

[Contributor Author] I'm going to include a fix in this PR so that the original collection matches 😄

[alamb, Contributor, Dec 4, 2025] I agree that using "bytes of Arrow data" sounds better than "bytes of Parquet-encoded data". I would have thought the difference is due to your fixing this (BTW, I think you could mark this PR as fixing that issue as well. Edit: I see you did!):

    // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
    total_byte_size: statistics.total_byte_size,
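To illustrate the mismatch discussed in the thread: the removed code summed Parquet row-group sizes, which measure the encoded file data rather than the Arrow batches the scan emits. A sketch of reading that quantity with the parquet crate (the file path and variable names here are made up):

    use std::fs::File;
    use parquet::file::reader::{FileReader, SerializedFileReader};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Hypothetical file path, for illustration only.
        let reader = SerializedFileReader::new(File::open("data.parquet")?)?;
        // What the old code summed: Parquet's per-row-group byte size
        // (encoded data, unrelated to Arrow's in-memory row width).
        let parquet_bytes: i64 = reader
            .metadata()
            .row_groups()
            .iter()
            .map(|rg| rg.total_byte_size())
            .sum();
        println!("parquet-encoded bytes: {parquet_bytes}");
        // The new code instead derives num_rows × fixed Arrow row width
        // via Statistics::calculate_total_byte_size (see stats.rs above).
        Ok(())
    }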
@@ -462,7 +461,7 @@ mod test {
         // Check that we have 2 partitions
         assert_eq!(statistics.len(), 2);
         let mut expected_statistic_partition_1 =
-            create_partition_statistics(8, 48400, 1, 4, true);
+            create_partition_statistics(8, 512, 1, 4, true);
         expected_statistic_partition_1
             .column_statistics
             .push(ColumnStatistics {
@@ -473,7 +472,7 @@
                 distinct_count: Precision::Absent,
             });
         let mut expected_statistic_partition_2 =
-            create_partition_statistics(8, 48400, 1, 4, true);
+            create_partition_statistics(8, 512, 1, 4, true);
         expected_statistic_partition_2
             .column_statistics
             .push(ColumnStatistics {
@@ -501,9 +500,9 @@ mod test {
         let coalesce_batches: Arc<dyn ExecutionPlan> =
             Arc::new(CoalesceBatchesExec::new(scan, 2));
         let expected_statistic_partition_1 =
-            create_partition_statistics(2, 110, 3, 4, true);
+            create_partition_statistics(2, 16, 3, 4, true);
         let expected_statistic_partition_2 =
-            create_partition_statistics(2, 110, 1, 2, true);
+            create_partition_statistics(2, 16, 1, 2, true);
         let statistics = (0..coalesce_batches.output_partitioning().partition_count())
             .map(|idx| coalesce_batches.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
@@ -525,8 +524,7 @@
         let scan = create_scan_exec_with_statistics(None, Some(2)).await;
         let coalesce_partitions: Arc<dyn ExecutionPlan> =
             Arc::new(CoalescePartitionsExec::new(scan));
-        let expected_statistic_partition =
-            create_partition_statistics(4, 220, 1, 4, true);
+        let expected_statistic_partition = create_partition_statistics(4, 32, 1, 4, true);
         let statistics = (0..coalesce_partitions.output_partitioning().partition_count())
             .map(|idx| coalesce_partitions.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
@@ -575,8 +573,7 @@ mod test {
             .map(|idx| global_limit.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
         assert_eq!(statistics.len(), 1);
-        let expected_statistic_partition =
-            create_partition_statistics(2, 110, 3, 4, true);
+        let expected_statistic_partition = create_partition_statistics(2, 16, 3, 4, true);
         assert_eq!(statistics[0], expected_statistic_partition);
         Ok(())
     }
@@ -627,7 +624,11 @@ mod test {

         let expected_p0_statistics = Statistics {
             num_rows: Precision::Inexact(2),
-            total_byte_size: Precision::Inexact(110),
+            // Each row produces 8 bytes of data:
+            // - id column: Int32 (4 bytes) × 2 rows = 8 bytes
+            // - id + 1 column: Int32 (4 bytes) × 2 rows = 8 bytes
+            // AggregateExec cannot yet derive byte sizes for the COUNT(c) column
+            total_byte_size: Precision::Inexact(16),
             column_statistics: vec![
                 ColumnStatistics {
                     null_count: Precision::Absent,
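The `Inexact(16)` here follows from how Precision arithmetic propagates exactness; a sketch, assuming `Precision::multiply` keeps exact-times-exact exact and downgrades otherwise, as the stats.rs change relies on:

    use datafusion_common::stats::Precision;

    fn main() {
        // Exact row count × exact row width stays exact (the plain scan case)...
        assert_eq!(
            Precision::Exact(2_usize).multiply(&Precision::Exact(8)),
            Precision::Exact(16)
        );
        // ...but AggregateExec only has an inexact row count, so the
        // derived byte size is inexact too.
        assert_eq!(
            Precision::Inexact(2_usize).multiply(&Precision::Exact(8)),
            Precision::Inexact(16)
        );
    }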
@@ -645,7 +646,11 @@

         let expected_p1_statistics = Statistics {
             num_rows: Precision::Inexact(2),
-            total_byte_size: Precision::Inexact(110),
+            // Each row produces 8 bytes of data:
+            // - id column: Int32 (4 bytes) × 2 rows = 8 bytes
+            // - id + 1 column: Int32 (4 bytes) × 2 rows = 8 bytes
+            // AggregateExec cannot yet derive byte sizes for the COUNT(c) column
+            total_byte_size: Precision::Inexact(16),
             column_statistics: vec![
                 ColumnStatistics {
                     null_count: Precision::Absent,
@@ -851,7 +856,7 @@ mod test {

         let expected_stats = Statistics {
             num_rows: Precision::Inexact(1),
-            total_byte_size: Precision::Inexact(73),
+            total_byte_size: Precision::Inexact(10),
             column_statistics: vec![
                 ColumnStatistics::new_unknown(),
                 ColumnStatistics::new_unknown(),
@@ -955,7 +960,7 @@ mod test {

         let expected_stats = Statistics {
             num_rows: Precision::Inexact(2),
-            total_byte_size: Precision::Inexact(110),
+            total_byte_size: Precision::Inexact(16),
             column_statistics: vec![
                 ColumnStatistics::new_unknown(),
                 ColumnStatistics::new_unknown(),
4 changes: 1 addition & 3 deletions datafusion/datasource-parquet/src/metadata.rs
@@ -247,10 +247,8 @@ impl<'a> DFParquetMetadata<'a> {
         let mut statistics = Statistics::new_unknown(table_schema);
         let mut has_statistics = false;
         let mut num_rows = 0_usize;
-        let mut total_byte_size = 0_usize;
         for row_group_meta in row_groups_metadata {
             num_rows += row_group_meta.num_rows() as usize;
-            total_byte_size += row_group_meta.total_byte_size() as usize;
 
             if !has_statistics {
                 has_statistics = row_group_meta
@@ -260,7 +258,7 @@
             }
         }
         statistics.num_rows = Precision::Exact(num_rows);
-        statistics.total_byte_size = Precision::Exact(total_byte_size);
+        statistics.calculate_total_byte_size(table_schema);
 
         let file_metadata = metadata.file_metadata();
         let mut file_schema = parquet_to_arrow_schema(
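Taken together, the new collection flow amounts to the following (a condensed sketch written as a free function; the real code lives in `DFParquetMetadata` and the names mirror the diff above):

    use arrow::datatypes::Schema;
    use datafusion_common::stats::{Precision, Statistics};
    use parquet::file::metadata::RowGroupMetaData;

    /// Row counts still come exactly from the Parquet metadata, but the
    /// byte size is now derived from the Arrow schema instead.
    fn collect_statistics(
        table_schema: &Schema,
        row_groups_metadata: &[RowGroupMetaData],
    ) -> Statistics {
        let mut statistics = Statistics::new_unknown(table_schema);
        let num_rows: usize = row_groups_metadata
            .iter()
            .map(|rg| rg.num_rows() as usize)
            .sum();
        statistics.num_rows = Precision::Exact(num_rows);
        // Replaces the old sum of rg.total_byte_size() (Parquet-encoded size).
        statistics.calculate_total_byte_size(table_schema);
        statistics
    }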