8 changes: 7 additions & 1 deletion rust/datafusion/src/datasource/datasource.rs
@@ -20,10 +20,10 @@
use std::any::Any;
use std::sync::Arc;

-use crate::arrow::datatypes::SchemaRef;
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::ExecutionPlan;
+use crate::{arrow::datatypes::SchemaRef, scalar::ScalarValue};

/// These table statistics are estimates.
/// They cannot be used directly in precise computations
@@ -41,6 +41,12 @@ pub struct Statistics {
pub struct ColumnStatistics {
/// Number of null values on column
pub null_count: Option<usize>,
+/// Maximum value of column
+pub max_value: Option<ScalarValue>,
+/// Minimum value of column
+pub min_value: Option<ScalarValue>,
+/// Number of distinct values
+pub distinct_count: Option<usize>,
}

/// Indicates whether and how a filter expression can be handled by a
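The new fields extend `ColumnStatistics` so per-column min/max and distinct-count estimates can travel alongside the existing null count. As a rough illustration (the values and the `main` wrapper are invented, not from this PR), a fully-populated instance for an `Int32` column could look like:

```rust
use datafusion::datasource::datasource::ColumnStatistics;
use datafusion::scalar::ScalarValue;

fn main() {
    // Hypothetical values, for illustration only. Every field is an
    // Option, so any statistic that is unknown simply stays None.
    let stats = ColumnStatistics {
        null_count: Some(3),
        max_value: Some(ScalarValue::Int32(Some(1024))),
        min_value: Some(ScalarValue::Int32(Some(-7))),
        distinct_count: Some(42),
    };
    println!("nulls: {:?}, distinct: {:?}", stats.null_count, stats.distinct_count);
}
```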
23 changes: 19 additions & 4 deletions rust/datafusion/src/datasource/memory.rs
@@ -71,6 +71,9 @@ fn calculate_statistics(
.iter()
.map(|null_count| ColumnStatistics {
null_count: Some(*null_count),
+distinct_count: None,
+max_value: None,
+min_value: None,
})
.collect(),
);
@@ -248,16 +251,28 @@ mod tests {
provider.statistics().column_statistics,
Some(vec![
ColumnStatistics {
-null_count: Some(0)
+null_count: Some(0),
+max_value: None,
+min_value: None,
+distinct_count: None,
},
ColumnStatistics {
-null_count: Some(0)
+null_count: Some(0),
+max_value: None,
+min_value: None,
+distinct_count: None,
},
ColumnStatistics {
-null_count: Some(0)
+null_count: Some(0),
+max_value: None,
+min_value: None,
+distinct_count: None,
},
ColumnStatistics {
-null_count: Some(2)
+null_count: Some(2),
+max_value: None,
+min_value: None,
+distinct_count: None,
},
])
);
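For context, `calculate_statistics` in this file derives per-column null counts from the in-memory batches before mapping them into `ColumnStatistics`. A minimal standalone sketch of that counting step (assuming the arrow crate; this is not DataFusion's exact code) might be:

```rust
use arrow::array::Array;
use arrow::record_batch::RecordBatch;

/// Sum per-column null counts across a set of record batches.
fn column_null_counts(batches: &[RecordBatch], num_fields: usize) -> Vec<usize> {
    let mut null_counts = vec![0usize; num_fields];
    for batch in batches {
        for (i, column) in batch.columns().iter().enumerate() {
            // Array::null_count is cheap: Arrow tracks it in array metadata.
            null_counts[i] += column.null_count();
        }
    }
    null_counts
}
```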
62 changes: 57 additions & 5 deletions rust/datafusion/src/physical_plan/parquet.rs
@@ -62,7 +62,7 @@ use tokio::{
};
use tokio_stream::wrappers::ReceiverStream;

-use crate::datasource::datasource::Statistics;
+use crate::datasource::datasource::{ColumnStatistics, Statistics};
use async_trait::async_trait;
use futures::stream::{Stream, StreamExt};

@@ -151,6 +151,8 @@ impl ParquetExec {
let chunks = split_files(&filenames, max_concurrency);
let mut num_rows = 0;
let mut total_byte_size = 0;
+let mut null_counts = Vec::new();

for chunk in chunks {
let filenames: Vec<String> = chunk.iter().map(|x| x.to_string()).collect();
for filename in &filenames {
@@ -160,19 +162,42 @@
let meta_data = arrow_reader.get_metadata();
// collect all the unique schemas in this data set
let schema = arrow_reader.get_schema()?;
+let num_fields = schema.fields().len();
if schemas.is_empty() || schema != schemas[0] {
schemas.push(schema);
+null_counts = vec![0; num_fields]
}
-for i in 0..meta_data.num_row_groups() {
-let row_group_meta = meta_data.row_group(i);
+for row_group_meta in meta_data.row_groups() {
num_rows += row_group_meta.num_rows();
total_byte_size += row_group_meta.total_byte_size();

+// Currently assumes every Parquet file has same schema
+// https://issues.apache.org/jira/browse/ARROW-11017
+let columns_null_counts = row_group_meta
+.columns()
+.iter()
+.flat_map(|c| c.statistics().map(|stats| stats.null_count()));

+for (i, cnt) in columns_null_counts.enumerate() {
+null_counts[i] += cnt
+}
}
}

+let column_stats = null_counts
+.iter()
+.map(|null_count| ColumnStatistics {
+null_count: Some(*null_count as usize),
+max_value: None,
+min_value: None,
+distinct_count: None,
+})
+.collect();

let statistics = Statistics {
num_rows: Some(num_rows as usize),
total_byte_size: Some(total_byte_size as usize),
-column_statistics: None,
+column_statistics: Some(column_stats),
};
partitions.push(ParquetPartition {
filenames,
@@ -227,18 +252,45 @@ impl ParquetExec {
// sum the statistics
let mut num_rows: Option<usize> = None;
let mut total_byte_size: Option<usize> = None;
+let mut null_counts: Vec<usize> = vec![0; schema.fields().len()];
+let mut has_null_counts = false;
for part in &partitions {
if let Some(n) = part.statistics.num_rows {
num_rows = Some(num_rows.unwrap_or(0) + n)
}
if let Some(n) = part.statistics.total_byte_size {
total_byte_size = Some(total_byte_size.unwrap_or(0) + n)
}
+if let Some(x) = &part.statistics.column_statistics {
+let part_nulls: Vec<Option<usize>> =
+x.iter().map(|c| c.null_count).collect();
+has_null_counts = true;

+for &i in projection.iter() {
+null_counts[i] += part_nulls[i].unwrap_or(0);
+}
+}
}
+let column_stats = if has_null_counts {
+Some(
+null_counts
+.iter()
+.map(|null_count| ColumnStatistics {
+null_count: Some(*null_count),
+distinct_count: None,
+max_value: None,
+min_value: None,
+})
+.collect(),
+)
+} else {
+None
+};

let statistics = Statistics {
num_rows,
total_byte_size,
-column_statistics: None,
+column_statistics: column_stats,
};
Self {
partitions,
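The null counts gathered in the first hunk come straight from the column-chunk statistics in each row group's footer metadata, so no data pages are read. A minimal sketch of that idea using the parquet crate directly (not DataFusion's exact code; `file_null_counts` is a made-up helper name) could be:

```rust
use std::fs::File;

use parquet::errors::Result;
use parquet::file::reader::{FileReader, SerializedFileReader};

/// Sum per-column null counts over all row groups of one Parquet file,
/// using only footer metadata (no data pages are decoded).
fn file_null_counts(path: &str) -> Result<Vec<u64>> {
    let reader = SerializedFileReader::new(File::open(path)?)?;
    let metadata = reader.metadata();
    let num_columns = metadata.file_metadata().schema_descr().num_columns();
    let mut null_counts = vec![0u64; num_columns];
    for row_group in metadata.row_groups() {
        for (i, column) in row_group.columns().iter().enumerate() {
            // Statistics are optional in the Parquet format; a chunk
            // without them contributes nothing to the count.
            if let Some(stats) = column.statistics() {
                null_counts[i] += stats.null_count();
            }
        }
    }
    Ok(null_counts)
}
```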
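When the per-partition statistics are combined in the second hunk, null counts accumulate the same way `num_rows` and `total_byte_size` do. A simplified standalone sketch of that merge step (hypothetical types and function name, not the exact DataFusion code) is:

```rust
/// Merge per-partition, per-column null counts (None = unknown)
/// into table-level totals, treating unknown counts as zero.
fn merge_null_counts(partitions: &[Vec<Option<usize>>], num_fields: usize) -> Vec<usize> {
    let mut totals = vec![0usize; num_fields];
    for part in partitions {
        for (i, count) in part.iter().enumerate() {
            totals[i] += count.unwrap_or(0);
        }
    }
    totals
}
```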