From c75ac61cfa51210e55f63ab552ee86e5f0b9a986 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 6 Mar 2021 12:02:37 +0100 Subject: [PATCH 1/4] Add support for extra columns statistics --- rust/datafusion/src/datasource/datasource.rs | 8 ++++++- rust/datafusion/src/datasource/memory.rs | 23 ++++++++++++++++---- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/rust/datafusion/src/datasource/datasource.rs b/rust/datafusion/src/datasource/datasource.rs index 5e9cb0044d3..21488fa9e64 100644 --- a/rust/datafusion/src/datasource/datasource.rs +++ b/rust/datafusion/src/datasource/datasource.rs @@ -20,10 +20,10 @@ use std::any::Any; use std::sync::Arc; -use crate::arrow::datatypes::SchemaRef; use crate::error::Result; use crate::logical_plan::Expr; use crate::physical_plan::ExecutionPlan; +use crate::{arrow::datatypes::SchemaRef, scalar::ScalarValue}; /// This table statistics are estimates. /// It can not be used directly in the precise compute @@ -41,6 +41,12 @@ pub struct Statistics { pub struct ColumnStatistics { /// Number of null values on column pub null_count: Option, + /// Minimum value of column + pub max_value: Option, + /// Maximum value of column + pub min_value: Option, + /// Number of distinct values + pub distinct_count: Option, } /// Indicates whether and how a filter expression can be handled by a diff --git a/rust/datafusion/src/datasource/memory.rs b/rust/datafusion/src/datasource/memory.rs index 0fafa0f6925..51caa2eaba0 100644 --- a/rust/datafusion/src/datasource/memory.rs +++ b/rust/datafusion/src/datasource/memory.rs @@ -71,6 +71,9 @@ fn calculate_statistics( .iter() .map(|null_count| ColumnStatistics { null_count: Some(*null_count), + distinct_count: None, + max_value: None, + min_value: None, }) .collect(), ); @@ -248,16 +251,28 @@ mod tests { provider.statistics().column_statistics, Some(vec![ ColumnStatistics { - null_count: Some(0) + null_count: Some(0), + max_value: None, + min_value: None, + distinct_count: None, }, 
ColumnStatistics { - null_count: Some(0) + null_count: Some(0), + max_value: None, + min_value: None, + distinct_count: None, }, ColumnStatistics { - null_count: Some(0) + null_count: Some(0), + max_value: None, + min_value: None, + distinct_count: None, }, ColumnStatistics { - null_count: Some(2) + null_count: Some(2), + max_value: None, + min_value: None, + distinct_count: None, }, ]) ); From f47365e15e59b6005bcf913e3bf7e6bc2433cd09 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 7 Mar 2021 11:42:34 +0100 Subject: [PATCH 2/4] Add null counts to parquet reader stats --- rust/datafusion/src/physical_plan/parquet.rs | 33 +++++++++++++++++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/rust/datafusion/src/physical_plan/parquet.rs b/rust/datafusion/src/physical_plan/parquet.rs index 348a924040a..ee01a4135e7 100644 --- a/rust/datafusion/src/physical_plan/parquet.rs +++ b/rust/datafusion/src/physical_plan/parquet.rs @@ -62,7 +62,7 @@ use tokio::{ }; use tokio_stream::wrappers::ReceiverStream; -use crate::datasource::datasource::Statistics; +use crate::datasource::datasource::{ColumnStatistics, Statistics}; use async_trait::async_trait; use futures::stream::{Stream, StreamExt}; @@ -151,6 +151,8 @@ impl ParquetExec { let chunks = split_files(&filenames, max_concurrency); let mut num_rows = 0; let mut total_byte_size = 0; + let mut null_counts = Vec::new(); + for chunk in chunks { let filenames: Vec = chunk.iter().map(|x| x.to_string()).collect(); for filename in &filenames { @@ -160,19 +162,42 @@ impl ParquetExec { let meta_data = arrow_reader.get_metadata(); // collect all the unique schemas in this data set let schema = arrow_reader.get_schema()?; + let num_fields = schema.fields().len(); if schemas.is_empty() || schema != schemas[0] { schemas.push(schema); + null_counts = vec![0; num_fields] } - for i in 0..meta_data.num_row_groups() { - let row_group_meta = meta_data.row_group(i); + for row_group_meta in meta_data.row_groups() { num_rows 
+= row_group_meta.num_rows(); total_byte_size += row_group_meta.total_byte_size(); + + // Currently assumes every Parquet file has same schema + // https://issues.apache.org/jira/browse/ARROW-11017 + let columns_null_counts = row_group_meta + .columns() + .iter() + .flat_map(|c| c.statistics().map(|stats| stats.null_count())); + + for (i, cnt) in columns_null_counts.enumerate() { + null_counts[i] += cnt + } } } + + let column_stats = null_counts + .iter() + .map(|null_count| ColumnStatistics { + null_count: Some(*null_count as usize), + max_value: None, + min_value: None, + distinct_count: None, + }) + .collect(); + let statistics = Statistics { num_rows: Some(num_rows as usize), total_byte_size: Some(total_byte_size as usize), - column_statistics: None, + column_statistics: Some(column_stats), }; partitions.push(ParquetPartition { filenames, From 36ef58646aaa9eb505c0050958f3fe26d49bd13b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 6 Mar 2021 13:43:17 +0100 Subject: [PATCH 3/4] Fix comments --- rust/datafusion/src/datasource/datasource.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/datasource/datasource.rs b/rust/datafusion/src/datasource/datasource.rs index 21488fa9e64..7fe9f6af151 100644 --- a/rust/datafusion/src/datasource/datasource.rs +++ b/rust/datafusion/src/datasource/datasource.rs @@ -41,9 +41,9 @@ pub struct Statistics { pub struct ColumnStatistics { /// Number of null values on column pub null_count: Option, - /// Minimum value of column - pub max_value: Option, /// Maximum value of column + pub max_value: Option, + /// Minimum value of column pub min_value: Option, /// Number of distinct values pub distinct_count: Option, From 0b9f3fab9cd2f7d8f471cae99c4b11c4be2f307f Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 7 Mar 2021 13:37:40 +0100 Subject: [PATCH 4/4] Add null stats to parquet table --- rust/datafusion/src/physical_plan/parquet.rs | 29 +++++++++++++++++++- 1 
file changed, 28 insertions(+), 1 deletion(-) diff --git a/rust/datafusion/src/physical_plan/parquet.rs b/rust/datafusion/src/physical_plan/parquet.rs index ee01a4135e7..99b7933e990 100644 --- a/rust/datafusion/src/physical_plan/parquet.rs +++ b/rust/datafusion/src/physical_plan/parquet.rs @@ -252,6 +252,8 @@ impl ParquetExec { // sum the statistics let mut num_rows: Option = None; let mut total_byte_size: Option = None; + let mut null_counts: Vec = vec![0; schema.fields().len()]; + let mut has_null_counts = false; for part in &partitions { if let Some(n) = part.statistics.num_rows { num_rows = Some(num_rows.unwrap_or(0) + n) @@ -259,11 +261,36 @@ impl ParquetExec { if let Some(n) = part.statistics.total_byte_size { total_byte_size = Some(total_byte_size.unwrap_or(0) + n) } + if let Some(x) = &part.statistics.column_statistics { + let part_nulls: Vec> = + x.iter().map(|c| c.null_count).collect(); + has_null_counts = true; + + for &i in projection.iter() { + null_counts[i] = part_nulls[i].unwrap_or(0); + } + } } + let column_stats = if has_null_counts { + Some( + null_counts + .iter() + .map(|null_count| ColumnStatistics { + null_count: Some(*null_count), + distinct_count: None, + max_value: None, + min_value: None, + }) + .collect(), + ) + } else { + None + }; + let statistics = Statistics { num_rows, total_byte_size, - column_statistics: None, + column_statistics: column_stats, }; Self { partitions,