Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ use datafusion_physical_expr::{
use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_common::stats::Precision;
use datafusion_datasource::add_row_stats;
use datafusion_datasource::compute_all_files_statistics;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
Expand Down Expand Up @@ -1230,7 +1229,7 @@ async fn get_files_with_limit(
file_stats.num_rows
} else {
// For subsequent files, accumulate the counts
add_row_stats(num_rows, file_stats.num_rows)
num_rows.add(&file_stats.num_rows)
};
}
}
Expand Down
2 changes: 2 additions & 0 deletions datafusion/datasource/src/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ use file_meta::FileMeta;
use futures::{Stream, StreamExt};
use object_store::{path::Path, ObjectMeta};
use object_store::{GetOptions, GetRange, ObjectStore};
// Remove when add_row_stats is removed
#[allow(deprecated)]
pub use statistics::add_row_stats;
pub use statistics::compute_all_files_statistics;
use std::ops::Range;
Expand Down
96 changes: 14 additions & 82 deletions datafusion/datasource/src/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
//! respect to the required sort order. See [`MinMaxStatistics`]

use futures::{Stream, StreamExt};
use std::mem;
use std::sync::Arc;

use crate::file_groups::FileGroup;
Expand All @@ -34,7 +33,6 @@ use arrow::{
row::{Row, Rows},
};
use datafusion_common::stats::Precision;
use datafusion_common::ScalarValue;
use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError, Result};
use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
Expand Down Expand Up @@ -357,10 +355,9 @@ pub async fn get_statistics_with_limit(
// counts across all the files in question. If any file does not
// provide any information or provides an inexact value, we demote
// the statistic precision to inexact.
num_rows = add_row_stats(file_stats.num_rows, num_rows);
num_rows = num_rows.add(&file_stats.num_rows);

total_byte_size =
add_row_stats(file_stats.total_byte_size, total_byte_size);
total_byte_size = total_byte_size.add(&file_stats.total_byte_size);

for (file_col_stats, col_stats) in file_stats
.column_statistics
Expand All @@ -375,10 +372,10 @@ pub async fn get_statistics_with_limit(
distinct_count: _,
} = file_col_stats;

col_stats.null_count = add_row_stats(*file_nc, col_stats.null_count);
set_max_if_greater(file_max, &mut col_stats.max_value);
set_min_if_lesser(file_min, &mut col_stats.min_value);
col_stats.sum_value = file_sum.add(&col_stats.sum_value);
col_stats.null_count = col_stats.null_count.add(file_nc);
col_stats.max_value = col_stats.max_value.max(file_max);
col_stats.min_value = col_stats.min_value.min(file_min);
col_stats.sum_value = col_stats.sum_value.add(file_sum);
}

// If the number of rows exceeds the limit, we can stop processing
Expand Down Expand Up @@ -441,19 +438,19 @@ where
}

// Accumulate statistics for subsequent items
num_rows = add_row_stats(item_stats.num_rows, num_rows);
total_byte_size = add_row_stats(item_stats.total_byte_size, total_byte_size);
num_rows = num_rows.add(&item_stats.num_rows);
total_byte_size = total_byte_size.add(&item_stats.total_byte_size);

for (item_col_stats, col_stats) in item_stats
.column_statistics
.iter()
.zip(col_stats_set.iter_mut())
{
col_stats.null_count =
add_row_stats(item_col_stats.null_count, col_stats.null_count);
set_max_if_greater(&item_col_stats.max_value, &mut col_stats.max_value);
set_min_if_lesser(&item_col_stats.min_value, &mut col_stats.min_value);
col_stats.sum_value = item_col_stats.sum_value.add(&col_stats.sum_value);
// `add` yields the combined value instead of updating the receiver (every
// sibling statistic below assigns its result), so store it back; otherwise
// the accumulated null count is silently dropped.
col_stats.null_count = col_stats.null_count.add(&item_col_stats.null_count);
col_stats.max_value = col_stats.max_value.max(&item_col_stats.max_value);
col_stats.min_value = col_stats.min_value.min(&item_col_stats.min_value);
col_stats.sum_value = col_stats.sum_value.add(&item_col_stats.sum_value);
}
}
}
Expand Down Expand Up @@ -545,77 +542,12 @@ pub fn compute_all_files_statistics(
Ok((file_groups_with_stats, statistics))
}

#[deprecated(since = "47.0.0", note = "Use Statistics::add")]
pub fn add_row_stats(
file_num_rows: Precision<usize>,
num_rows: Precision<usize>,
) -> Precision<usize> {
match (file_num_rows, &num_rows) {
(Precision::Absent, _) => num_rows.to_inexact(),
(lhs, Precision::Absent) => lhs.to_inexact(),
(lhs, rhs) => lhs.add(rhs),
}
}

/// If the given value is numerically greater than the original maximum value,
/// update the maximum value in place with appropriate exactness information.
fn set_max_if_greater(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code duplicates `Precision::min` and `Precision::max`, so we can use those methods and avoid the duplication.

max_nominee: &Precision<ScalarValue>,
max_value: &mut Precision<ScalarValue>,
) {
match (&max_value, max_nominee) {
(Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => {
*max_value = max_nominee.clone();
}
(Precision::Exact(val1), Precision::Inexact(val2))
| (Precision::Inexact(val1), Precision::Inexact(val2))
| (Precision::Inexact(val1), Precision::Exact(val2))
if val1 < val2 =>
{
*max_value = max_nominee.clone().to_inexact();
}
(Precision::Exact(_), Precision::Absent) => {
let exact_max = mem::take(max_value);
*max_value = exact_max.to_inexact();
}
(Precision::Absent, Precision::Exact(_)) => {
*max_value = max_nominee.clone().to_inexact();
}
(Precision::Absent, Precision::Inexact(_)) => {
*max_value = max_nominee.clone();
}
_ => {}
}
}

/// If the given value is numerically lesser than the original minimum value,
/// update the minimum value in place with appropriate exactness information.
fn set_min_if_lesser(
min_nominee: &Precision<ScalarValue>,
min_value: &mut Precision<ScalarValue>,
) {
match (&min_value, min_nominee) {
(Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => {
*min_value = min_nominee.clone();
}
(Precision::Exact(val1), Precision::Inexact(val2))
| (Precision::Inexact(val1), Precision::Inexact(val2))
| (Precision::Inexact(val1), Precision::Exact(val2))
if val1 > val2 =>
{
*min_value = min_nominee.clone().to_inexact();
}
(Precision::Exact(_), Precision::Absent) => {
let exact_min = mem::take(min_value);
*min_value = exact_min.to_inexact();
}
(Precision::Absent, Precision::Exact(_)) => {
*min_value = min_nominee.clone().to_inexact();
}
(Precision::Absent, Precision::Inexact(_)) => {
*min_value = min_nominee.clone();
}
_ => {}
}
file_num_rows.add(&num_rows)
}

#[cfg(test)]
Expand Down