From 53b0fcf9b1e8c4e62adfb0ccf959e811250765bf Mon Sep 17 00:00:00 2001
From: Mike Seddon
Date: Sat, 10 Apr 2021 13:50:30 +1000
Subject: [PATCH 1/2] statistics by partition

---
 rust/datafusion/src/datasource/datasource.rs  |  48 +++++++-
 rust/datafusion/src/datasource/empty.rs       |  11 +-
 rust/datafusion/src/datasource/memory.rs      |  94 ++++++---
 rust/datafusion/src/datasource/parquet.rs     |   8 +-
 .../src/optimizer/hash_build_probe_order.rs   |  33 ++++--
 .../src/physical_optimizer/repartition.rs     |  20 ++--
 rust/datafusion/src/physical_plan/parquet.rs  | 111 ++----------
 7 files changed, 134 insertions(+), 191 deletions(-)

diff --git a/rust/datafusion/src/datasource/datasource.rs b/rust/datafusion/src/datasource/datasource.rs
index e2b07336486..5a61cf367e8 100644
--- a/rust/datafusion/src/datasource/datasource.rs
+++ b/rust/datafusion/src/datasource/datasource.rs
@@ -25,18 +25,58 @@ use crate::logical_plan::Expr;
 use crate::physical_plan::ExecutionPlan;
 use crate::{arrow::datatypes::SchemaRef, scalar::ScalarValue};
 
-/// This table statistics are estimates.
+/// These table statistics are estimates.
 /// It can not be used directly in the precise compute
 #[derive(Debug, Clone, Default)]
 pub struct Statistics {
+    /// The partition level statistics
+    pub partition_statistics: Vec<PartitionStatistics>,
+}
+
+impl Statistics {
     /// The number of table rows
+    pub fn num_rows(&self) -> Option<usize> {
+        self.partition_statistics.iter().map(|ps| ps.num_rows).sum()
+    }
+
+    /// total bytes of the table rows
+    pub fn total_byte_size(&self) -> Option<usize> {
+        self.partition_statistics
+            .iter()
+            .map(|ps| ps.total_byte_size)
+            .sum()
+    }
+
+    /// Statistics on a column level for the table
+    pub fn column_statistics(&self) -> Option<Vec<ColumnStatistics>> {
+        self.partition_statistics
+            .iter()
+            .map(|ps| {
+                ps.clone().column_statistics.map(|cs| ColumnStatistics {
+                    null_count: cs.iter().map(|cs| cs.null_count).sum(),
+                    distinct_count: None,
+                    max_value: None,
+                    min_value: None,
+                })
+            })
+            .collect()
+    }
+}
+
+/// These partition statistics are estimates about the partition
+#[derive(Clone, Default, Debug, PartialEq)]
+pub struct PartitionStatistics {
+    /// The filename for this partition
+    pub filename: Option<String>,
+    /// The number of partition rows
     pub num_rows: Option<usize>,
-    /// total byte of the table rows
+    /// total bytes of the partition rows
    pub total_byte_size: Option<usize>,
-    /// Statistics on a column level
+    /// Statistics on a column level for this partition
     pub column_statistics: Option<Vec<ColumnStatistics>>,
 }
-/// This table statistics are estimates about column
+
+/// These statistics are estimates about a column
 #[derive(Clone, Debug, PartialEq)]
 pub struct ColumnStatistics {
     /// Number of null values on column
diff --git a/rust/datafusion/src/datasource/empty.rs b/rust/datafusion/src/datasource/empty.rs
index e6140cdb8de..55766469b73 100644
--- a/rust/datafusion/src/datasource/empty.rs
+++ b/rust/datafusion/src/datasource/empty.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
 
 use arrow::datatypes::*;
 
-use crate::datasource::datasource::Statistics;
+use crate::datasource::datasource::{PartitionStatistics, Statistics};
 use crate::datasource::TableProvider;
 use crate::error::Result;
 use crate::logical_plan::Expr;
@@ -72,9 +72,12 @@ impl TableProvider for EmptyTable {
 
     fn statistics(&self) -> Statistics {
         Statistics {
-            num_rows: Some(0),
-            total_byte_size: Some(0),
-            column_statistics: None,
+            partition_statistics: vec![PartitionStatistics {
+                filename: None,
+                num_rows: Some(0),
+                total_byte_size: Some(0),
+                column_statistics: None,
+            }],
        }
     }
 }
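With this change, table-level totals are derived by summing the per-partition
estimates. As a usage sketch (not part of the patch; the filenames and counts
below are made up):

    use datafusion::datasource::datasource::{PartitionStatistics, Statistics};

    fn main() {
        // Two hypothetical partitions; table totals are the sums of the parts.
        let stats = Statistics {
            partition_statistics: vec![
                PartitionStatistics {
                    filename: Some("part-0.parquet".to_string()),
                    num_rows: Some(100),
                    total_byte_size: Some(1024),
                    column_statistics: None,
                },
                PartitionStatistics {
                    filename: Some("part-1.parquet".to_string()),
                    num_rows: Some(50),
                    total_byte_size: Some(512),
                    column_statistics: None,
                },
            ],
        };
        assert_eq!(stats.num_rows(), Some(150));
        assert_eq!(stats.total_byte_size(), Some(1536));
    }
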
diff --git a/rust/datafusion/src/datasource/memory.rs b/rust/datafusion/src/datasource/memory.rs
index af404808702..9190a27ccec 100644
--- a/rust/datafusion/src/datasource/memory.rs
+++ b/rust/datafusion/src/datasource/memory.rs
@@ -34,7 +34,7 @@ use crate::physical_plan::common;
 use crate::physical_plan::memory::MemoryExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::{
-    datasource::datasource::Statistics,
+    datasource::datasource::{PartitionStatistics, Statistics},
     physical_plan::{repartition::RepartitionExec, Partitioning},
 };
 
@@ -48,40 +48,32 @@ pub struct MemTable {
 }
 
 // Calculates statistics based on partitions
-fn calculate_statistics(
-    schema: &SchemaRef,
-    partitions: &[Vec<RecordBatch>],
-) -> Statistics {
-    let num_rows: usize = partitions
+fn calculate_statistics(partitions: &[Vec<RecordBatch>]) -> Statistics {
+    let partition_statistics = partitions
         .iter()
-        .flat_map(|batches| batches.iter().map(RecordBatch::num_rows))
-        .sum();
-
-    let mut null_count: Vec<usize> = vec![0; schema.fields().len()];
-    for partition in partitions.iter() {
-        for batch in partition {
-            for (i, array) in batch.columns().iter().enumerate() {
-                null_count[i] += array.null_count();
-            }
-        }
-    }
-
-    let column_statistics = Some(
-        null_count
-            .iter()
-            .map(|null_count| ColumnStatistics {
-                null_count: Some(*null_count),
-                distinct_count: None,
-                max_value: None,
-                min_value: None,
+        .flat_map(|batches| {
+            batches.iter().map(|batch| PartitionStatistics {
+                filename: None,
+                num_rows: Some(batch.num_rows()),
+                total_byte_size: None,
+                column_statistics: Some(
+                    batch
+                        .columns()
+                        .iter()
+                        .map(|array| ColumnStatistics {
+                            null_count: Some(array.null_count()),
+                            distinct_count: None,
+                            max_value: None,
+                            min_value: None,
+                        })
+                        .collect::<Vec<ColumnStatistics>>(),
+                ),
             })
-            .collect(),
-    );
+        })
+        .collect::<Vec<PartitionStatistics>>();
 
     Statistics {
-        num_rows: Some(num_rows),
-        total_byte_size: None,
-        column_statistics,
+        partition_statistics,
     }
 }
 
@@ -93,7 +85,7 @@ impl MemTable {
             .flatten()
             .all(|batches| schema.contains(&batches.schema()))
         {
-            let statistics = calculate_statistics(&schema, &partitions);
+            let statistics = calculate_statistics(&partitions);
             debug!("MemTable statistics: {:?}", statistics);
 
             Ok(Self {
@@ -247,35 +239,15 @@ mod tests {
 
         let provider = MemTable::try_new(schema, vec![vec![batch]])?;
 
-        assert_eq!(provider.statistics().num_rows, Some(3));
+        assert_eq!(provider.statistics().num_rows(), Some(3));
         assert_eq!(
-            provider.statistics().column_statistics,
-            Some(vec![
-                ColumnStatistics {
-                    null_count: Some(0),
-                    max_value: None,
-                    min_value: None,
-                    distinct_count: None,
-                },
-                ColumnStatistics {
-                    null_count: Some(0),
-                    max_value: None,
-                    min_value: None,
-                    distinct_count: None,
-                },
-                ColumnStatistics {
-                    null_count: Some(0),
-                    max_value: None,
-                    min_value: None,
-                    distinct_count: None,
-                },
-                ColumnStatistics {
-                    null_count: Some(2),
-                    max_value: None,
-                    min_value: None,
-                    distinct_count: None,
-                },
-            ])
+            provider.statistics().column_statistics(),
+            Some(vec![ColumnStatistics {
+                null_count: Some(2),
+                max_value: None,
+                min_value: None,
+                distinct_count: None,
+            },])
        );
 
         // scan with projection
@@ -465,7 +437,7 @@ mod tests {
         let batch1 = it.next().await.unwrap()?;
         assert_eq!(3, batch1.schema().fields().len());
         assert_eq!(3, batch1.num_columns());
-        assert_eq!(provider.statistics().num_rows, Some(6));
+        assert_eq!(provider.statistics().num_rows(), Some(6));
 
         Ok(())
     }
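Note how the updated MemTable test expects a single ColumnStatistics entry
where the old test expected four: the new `column_statistics()` accessor folds
each partition's per-column statistics into one entry whose null_count is
summed across that partition's columns, so the result has one element per
partition rather than one per column. A minimal sketch of that behaviour with
hand-built statistics (values are hypothetical, not from the patch):

    use datafusion::datasource::datasource::{
        ColumnStatistics, PartitionStatistics, Statistics,
    };

    fn main() {
        // One hypothetical partition with two columns holding 0 and 2 nulls.
        let stats = Statistics {
            partition_statistics: vec![PartitionStatistics {
                filename: None,
                num_rows: Some(3),
                total_byte_size: None,
                column_statistics: Some(vec![
                    ColumnStatistics {
                        null_count: Some(0),
                        distinct_count: None,
                        max_value: None,
                        min_value: None,
                    },
                    ColumnStatistics {
                        null_count: Some(2),
                        distinct_count: None,
                        max_value: None,
                        min_value: None,
                    },
                ]),
            }],
        };
        // One entry per partition, null counts summed across its columns.
        let cols = stats.column_statistics().unwrap();
        assert_eq!(cols.len(), 1);
        assert_eq!(cols[0].null_count, Some(2));
    }
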
diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs
index 30e47df5f64..0919b820ccc 100644
--- a/rust/datafusion/src/datasource/parquet.rs
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -48,7 +48,9 @@ impl ParquetTable {
         Ok(Self {
             path: path.to_string(),
             schema,
-            statistics: parquet_exec.statistics().to_owned(),
+            statistics: Statistics {
+                partition_statistics: parquet_exec.partitions().to_owned(),
+            },
             max_concurrency,
         })
     }
@@ -130,8 +132,8 @@ mod tests {
             .await;
 
         // test metadata
-        assert_eq!(table.statistics().num_rows, Some(8));
-        assert_eq!(table.statistics().total_byte_size, Some(671));
+        assert_eq!(table.statistics().num_rows(), Some(8));
+        assert_eq!(table.statistics().total_byte_size(), Some(671));
 
         Ok(())
     }
diff --git a/rust/datafusion/src/optimizer/hash_build_probe_order.rs b/rust/datafusion/src/optimizer/hash_build_probe_order.rs
index f44050f0b72..8879861847a 100644
--- a/rust/datafusion/src/optimizer/hash_build_probe_order.rs
+++ b/rust/datafusion/src/optimizer/hash_build_probe_order.rs
@@ -1,5 +1,5 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
+// Licensed to the Apache Software Found filename: (), num_rows: (), total_byte_size: (), column_statistics: ()ation (ASF) under one
+// or more c total_byte_size: (), column_statistics: ()ontri, total_byte_size: (), column_statistics: ()butor license agreements. See the NOTICE file
 // distributed with this work for additional information
 // regarding copyright ownership. The ASF licenses this file
 // to you under the Apache License, Version 2.0 (the
@@ -39,7 +39,7 @@ pub struct HashBuildProbeOrder {}
 // Gets exact number of rows, if known by the statistics of the underlying
 fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
     match logical_plan {
-        LogicalPlan::TableScan { source, .. } => source.statistics().num_rows,
+        LogicalPlan::TableScan { source, .. } => source.statistics().num_rows(),
         LogicalPlan::EmptyRelation {
             produce_one_row, ..
         } => {
@@ -187,13 +187,14 @@ mod tests {
     use std::sync::Arc;
 
     use crate::{
-        datasource::{datasource::Statistics, TableProvider},
+        datasource::datasource::{PartitionStatistics, Statistics},
+        datasource::TableProvider,
         logical_plan::{DFSchema, Expr},
         test::*,
     };
 
     struct TestTableProvider {
-        num_rows: usize,
+        partition_statistics: Vec<PartitionStatistics>,
     }
 
     impl TableProvider for TestTableProvider {
@@ -215,9 +216,7 @@ mod tests {
         }
         fn statistics(&self) -> crate::datasource::datasource::Statistics {
             Statistics {
-                num_rows: Some(self.num_rows),
-                total_byte_size: None,
-                column_statistics: None,
+                partition_statistics: self.partition_statistics.clone(),
             }
         }
     }
@@ -236,7 +235,14 @@ mod tests {
         let lp_left = LogicalPlan::TableScan {
             table_name: "left".to_string(),
             projection: None,
-            source: Arc::new(TestTableProvider { num_rows: 1000 }),
+            source: Arc::new(TestTableProvider {
+                partition_statistics: vec![PartitionStatistics {
+                    filename: None,
+                    num_rows: Some(1000),
+                    total_byte_size: None,
+                    column_statistics: None,
+                }],
+            }),
             projected_schema: Arc::new(DFSchema::empty()),
             filters: vec![],
             limit: None,
@@ -245,7 +251,14 @@ mod tests {
         let lp_right = LogicalPlan::TableScan {
             table_name: "right".to_string(),
             projection: None,
-            source: Arc::new(TestTableProvider { num_rows: 100 }),
+            source: Arc::new(TestTableProvider {
+                partition_statistics: vec![PartitionStatistics {
+                    filename: None,
+                    num_rows: Some(100),
+                    total_byte_size: None,
+                    column_statistics: None,
+                }],
+            }),
             projected_schema: Arc::new(DFSchema::empty()),
             filters: vec![],
             limit: None,
diff --git a/rust/datafusion/src/physical_optimizer/repartition.rs b/rust/datafusion/src/physical_optimizer/repartition.rs
index 82f46f9cbbb..7427d5d55f6 100644
--- a/rust/datafusion/src/physical_optimizer/repartition.rs
+++ b/rust/datafusion/src/physical_optimizer/repartition.rs
@@ -106,8 +106,8 @@ mod tests {
     use arrow::datatypes::Schema;
 
     use super::*;
-    use crate::datasource::datasource::Statistics;
-    use crate::physical_plan::parquet::{ParquetExec, ParquetPartition};
+    use crate::datasource::datasource::PartitionStatistics;
+    use crate::physical_plan::parquet::ParquetExec;
     use crate::physical_plan::projection::ProjectionExec;
 
     #[test]
@@ -115,9 +115,11 @@ mod tests {
         let parquet_project = ProjectionExec::try_new(
             vec![],
             Arc::new(ParquetExec::new(
-                vec![ParquetPartition {
-                    filenames: vec!["x".to_string()],
-                    statistics: Statistics::default(),
+                vec![PartitionStatistics {
+                    filename: Some("x".to_string()),
+                    num_rows: None,
+                    total_byte_size: None,
+                    column_statistics: None,
                 }],
                 Schema::empty(),
                 None,
@@ -151,9 +153,11 @@ mod tests {
             Arc::new(ProjectionExec::try_new(
                 vec![],
                 Arc::new(ParquetExec::new(
-                    vec![ParquetPartition {
-                        filenames: vec!["x".to_string()],
-                        statistics: Statistics::default(),
+                    vec![PartitionStatistics {
+                        filename: Some("x".to_string()),
+                        num_rows: None,
+                        total_byte_size: None,
+                        column_statistics: None,
                     }],
                     Schema::empty(),
                     None,
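With `num_rows` now a method, the optimizer's `get_num_rows` keeps its "exact
count or None" contract through the summation: `Option<usize>` implements
`Sum`, so one partition with an unknown count makes the table count unknown.
A small sketch of the propagation the build/probe reordering relies on
(hypothetical values, not from the patch):

    use datafusion::datasource::datasource::{PartitionStatistics, Statistics};

    fn main() {
        let stats = Statistics {
            partition_statistics: vec![
                PartitionStatistics {
                    filename: None,
                    num_rows: Some(1000),
                    total_byte_size: None,
                    column_statistics: None,
                },
                PartitionStatistics {
                    filename: None,
                    num_rows: None, // unknown partition
                    total_byte_size: None,
                    column_statistics: None,
                },
            ],
        };
        // Any unknown partition poisons the total, so the optimizer never
        // reorders a join based on a partial count.
        assert_eq!(stats.num_rows(), None);
    }
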
diff --git a/rust/datafusion/src/physical_plan/parquet.rs b/rust/datafusion/src/physical_plan/parquet.rs
index d41d6968fee..d911a912d5e 100644
--- a/rust/datafusion/src/physical_plan/parquet.rs
+++ b/rust/datafusion/src/physical_plan/parquet.rs
@@ -65,7 +65,7 @@ use tokio::{
 };
 use tokio_stream::wrappers::ReceiverStream;
 
-use crate::datasource::datasource::{ColumnStatistics, Statistics};
+use crate::datasource::datasource::{ColumnStatistics, PartitionStatistics};
 use async_trait::async_trait;
 use futures::stream::{Stream, StreamExt};
 
@@ -73,38 +73,19 @@ use futures::stream::{Stream, StreamExt};
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
     /// Parquet partitions to read
-    partitions: Vec<ParquetPartition>,
+    partitions: Vec<PartitionStatistics>,
     /// Schema after projection is applied
     schema: SchemaRef,
     /// Projection for which columns to load
     projection: Vec<usize>,
     /// Batch size
     batch_size: usize,
-    /// Statistics for the data set (sum of statistics for all partitions)
-    statistics: Statistics,
     /// Optional predicate builder
     predicate_builder: Option<RowGroupPredicateBuilder>,
     /// Optional limit of the number of rows
     limit: Option<usize>,
 }
 
-/// Represents one partition of a Parquet data set and this currently means one Parquet file.
-///
-/// In the future it would be good to support subsets of files based on ranges of row groups
-/// so that we can better parallelize reads of large files across available cores (see
-/// [ARROW-10995](https://issues.apache.org/jira/browse/ARROW-10995)).
-///
-/// We may also want to support reading Parquet files that are partitioned based on a key and
-/// in this case we would want this partition struct to represent multiple files for a given
-/// partition key (see [ARROW-11019](https://issues.apache.org/jira/browse/ARROW-11019)).
-#[derive(Debug, Clone)]
-pub struct ParquetPartition {
-    /// The Parquet filename for this partition
-    pub filenames: Vec<String>,
-    /// Statistics for this partition
-    pub statistics: Statistics,
-}
-
 impl ParquetExec {
     /// Create a new Parquet reader execution plan based on the specified Parquet filename or
     /// directory containing Parquet files
@@ -209,16 +190,13 @@ impl ParquetExec {
                 })
                 .collect();
 
-            let statistics = Statistics {
+            // remove files that are not needed in case of limit
+            filenames.truncate(total_files);
+            partitions.push(PartitionStatistics {
+                filename: filenames.first().cloned(),
                 num_rows: Some(num_rows as usize),
                 total_byte_size: Some(total_byte_size as usize),
                 column_statistics: Some(column_stats),
-            };
-            // remove files that are not needed in case of limit
-            filenames.truncate(total_files);
-            partitions.push(ParquetPartition {
-                filenames,
-                statistics,
             });
             if limit_exhausted {
                 break;
@@ -252,7 +230,7 @@ impl ParquetExec {
 
     /// Create a new Parquet reader execution plan with provided partitions and schema
     pub fn new(
-        partitions: Vec<ParquetPartition>,
+        partition_statistics: Vec<PartitionStatistics>,
         schema: Schema,
         projection: Option<Vec<usize>>,
         predicate_builder: Option<RowGroupPredicateBuilder>,
@@ -271,62 +249,18 @@
                 .collect(),
         );
 
-        // sum the statistics
-        let mut num_rows: Option<usize> = None;
-        let mut total_byte_size: Option<usize> = None;
-        let mut null_counts: Vec<usize> = vec![0; schema.fields().len()];
-        let mut has_null_counts = false;
-        for part in &partitions {
-            if let Some(n) = part.statistics.num_rows {
-                num_rows = Some(num_rows.unwrap_or(0) + n)
-            }
-            if let Some(n) = part.statistics.total_byte_size {
-                total_byte_size = Some(total_byte_size.unwrap_or(0) + n)
-            }
-            if let Some(x) = &part.statistics.column_statistics {
-                let part_nulls: Vec<Option<usize>> =
-                    x.iter().map(|c| c.null_count).collect();
-                has_null_counts = true;
-
-                for &i in projection.iter() {
-                    null_counts[i] = part_nulls[i].unwrap_or(0);
-                }
-            }
-        }
-        let column_stats = if has_null_counts {
-            Some(
-                null_counts
-                    .iter()
-                    .map(|null_count| ColumnStatistics {
-                        null_count: Some(*null_count),
-                        distinct_count: None,
-                        max_value: None,
-                        min_value: None,
-                    })
-                    .collect(),
-            )
-        } else {
-            None
-        };
-
-        let statistics = Statistics {
-            num_rows,
-            total_byte_size,
-            column_statistics: column_stats,
-        };
         Self {
-            partitions,
+            partitions: partition_statistics,
             schema: Arc::new(projected_schema),
             projection,
             predicate_builder,
             batch_size,
-            statistics,
             limit,
         }
     }
 
     /// Parquet partitions to read
-    pub fn partitions(&self) -> &[ParquetPartition] {
+    pub fn partitions(&self) -> &[PartitionStatistics] {
         &self.partitions
     }
 
@@ -339,31 +273,6 @@ impl ParquetExec {
     pub fn batch_size(&self) -> usize {
         self.batch_size
     }
-
-    /// Statistics for the data set (sum of statistics for all partitions)
-    pub fn statistics(&self) -> &Statistics {
-        &self.statistics
-    }
-}
-
-impl ParquetPartition {
-    /// Create a new parquet partition
-    pub fn new(filenames: Vec<String>, statistics: Statistics) -> Self {
-        Self {
-            filenames,
-            statistics,
-        }
-    }
-
-    /// The Parquet filename for this partition
-    pub fn filenames(&self) -> &[String] {
-        &self.filenames
-    }
-
-    /// Statistics for this partition
-    pub fn statistics(&self) -> &Statistics {
-        &self.statistics
-    }
 }
 
 #[derive(Debug, Clone)]
@@ -854,7 +763,7 @@ impl ExecutionPlan for ParquetExec {
             Receiver<ArrowResult<RecordBatch>>,
         ) = channel(2);
 
-        let filenames = self.partitions[partition].filenames.clone();
+        let filenames = vec![self.partitions[partition].filename.clone().unwrap()];
         let projection = self.projection.clone();
         let predicate_builder = self.predicate_builder.clone();
         let batch_size = self.batch_size;

From 1b7830f731c661038811e014ccde2690a7a94846 Mon Sep 17 00:00:00 2001
From: Mike Seddon
Date: Sat, 10 Apr 2021 14:06:56 +1000
Subject: [PATCH 2/2] fix accidentally modified license

---
 rust/datafusion/src/optimizer/hash_build_probe_order.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/rust/datafusion/src/optimizer/hash_build_probe_order.rs b/rust/datafusion/src/optimizer/hash_build_probe_order.rs
index 8879861847a..c84e1711e34 100644
--- a/rust/datafusion/src/optimizer/hash_build_probe_order.rs
+++ b/rust/datafusion/src/optimizer/hash_build_probe_order.rs
@@ -1,5 +1,5 @@
-// Licensed to the Apache Software Found filename: (), num_rows: (), total_byte_size: (), column_statistics: ()ation (ASF) under one
-// or more c total_byte_size: (), column_statistics: ()ontri, total_byte_size: (), column_statistics: ()butor license agreements. See the NOTICE file
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
 // distributed with this work for additional information
 // regarding copyright ownership. The ASF licenses this file
 // to you under the Apache License, Version 2.0 (the
@@ -13,7 +13,7 @@
 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 // KIND, either express or implied. See the License for the
 // specific language governing permissions and limitations
-// under the License
+// under the License.
 
 //! Optimizer rule to switch build and probe order of hash join
 //! based on statistics of a `TableProvider`. If the number of
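After the series, `ParquetExec` carries one `PartitionStatistics` per
partition and the separate `ParquetPartition` type is gone. A construction
sketch following the constructor shown in the hunks above (the filename and
counts are hypothetical; treat the argument order as read from the patch, not
as reference documentation):

    use arrow::datatypes::Schema;
    use datafusion::datasource::datasource::PartitionStatistics;
    use datafusion::physical_plan::parquet::ParquetExec;

    fn main() {
        let exec = ParquetExec::new(
            vec![PartitionStatistics {
                // execute() unwraps filename, so hand-built partitions that
                // will actually be run must provide Some(..) here.
                filename: Some("part-0.parquet".to_string()),
                num_rows: Some(8),
                total_byte_size: Some(671),
                column_statistics: None,
            }],
            Schema::empty(),
            None, // projection: read all columns
            None, // no row-group predicate
            2048, // batch size
            None, // no limit
        );
        assert_eq!(exec.partitions().len(), 1);
    }

One behavioural wrinkle worth noting: the single `filename: Option<String>`
replaces the old `filenames: Vec<String>`, and `execute` now calls
`filename.clone().unwrap()`, so a partition without a filename panics at
execution time rather than scanning nothing.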