From d6891400fd7df94beec55df4a6caf8eb9b2e9610 Mon Sep 17 00:00:00 2001
From: Andrei Ionescu
Date: Thu, 2 Dec 2021 20:36:54 +0200
Subject: [PATCH 1/2] Fix index out of bounds for stats on nested fields

Closes #1383
---
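Note: Parquet tracks statistics per *leaf* column, while `Schema::fields()`
returns only the top-level fields. For a nested schema the per-column
statistics loop therefore indexes past the end of the per-field vectors
(`null_counts`, `max_values`, `min_values`). A minimal sketch of the
mismatch, using the same arrow `Schema`/`Field` API as the change below:

    use arrow::datatypes::{DataType, Field, Schema};

    fn main() {
        // One top-level field ...
        let schema = Schema::new(vec![Field::new(
            "nested",
            DataType::Struct(vec![
                Field::new("a", DataType::Int32, false),
                Field::new("b", DataType::Utf8, false),
            ]),
            false,
        )]);
        assert_eq!(1, schema.fields().len());
        // ... but two Parquet leaf columns ("nested.a" and "nested.b"), so a
        // `null_counts` vector sized with `fields().len()` is indexed past
        // its end on the second column -- the out of bounds from #1383.
    }
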
 .../src/datasource/file_format/parquet.rs | 11 ++++--
 datafusion/src/datasource/mod.rs          | 38 ++++++++++++++++---
 2 files changed, 40 insertions(+), 9 deletions(-)

diff --git a/datafusion/src/datasource/file_format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs
index 7976be7913c8a..b9b851b1bdf76 100644
--- a/datafusion/src/datasource/file_format/parquet.rs
+++ b/datafusion/src/datasource/file_format/parquet.rs
@@ -37,6 +37,7 @@ use parquet::file::statistics::Statistics as ParquetStatistics;
 use super::FileFormat;
 use super::PhysicalPlanConfig;
 use crate::arrow::datatypes::{DataType, Field};
+use crate::datasource::flatten_schema;
 use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
 use crate::datasource::{create_max_min_accs, get_col_stats};
 use crate::error::DataFusionError;
@@ -123,7 +124,7 @@ impl FileFormat for ParquetFormat {
 fn summarize_min_max(
     max_values: &mut Vec<Option<MaxAccumulator>>,
     min_values: &mut Vec<Option<MinAccumulator>>,
-    fields: &[Field],
+    fields: &Vec<&Field>,
     i: usize,
     stat: &ParquetStatistics,
 ) {
@@ -258,8 +259,9 @@ fn fetch_statistics(object_reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
     let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
     let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
     let schema = arrow_reader.get_schema()?;
-    let num_fields = schema.fields().len();
-    let fields = schema.fields().to_vec();
+    let flat_schema = flatten_schema(&schema);
+    let num_fields = flat_schema.len();
+    let fields = flat_schema.clone();
     let meta_data = arrow_reader.get_metadata();
 
     let mut num_rows = 0;
@@ -278,6 +280,9 @@ fn fetch_statistics(object_reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
         .iter()
         .flat_map(|c| c.statistics().map(|stats| stats.null_count()));
 
+    let cols_vec: Vec<u64> = columns_null_counts.clone().collect();
+    let cols_vec_num = cols_vec.len();
+
     for (i, cnt) in columns_null_counts.enumerate() {
         null_counts[i] += cnt as usize
     }
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index 9f4f77f7ea285..269b4e2865383 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -24,6 +24,7 @@ pub mod listing;
 pub mod memory;
 pub mod object_store;
 
+use arrow::datatypes::{DataType, Field};
 use futures::Stream;
 
 pub use self::datasource::{TableProvider, TableType};
@@ -48,8 +49,9 @@ pub async fn get_statistics_with_limit(
 ) -> Result<(Vec<PartitionedFile>, Statistics)> {
     let mut result_files = vec![];
 
+    let flat_schema = flatten_schema(&file_schema);
     let mut total_byte_size = 0;
-    let mut null_counts = vec![0; file_schema.fields().len()];
+    let mut null_counts = vec![0; flat_schema.len()];
     let mut has_statistics = false;
     let (mut max_values, mut min_values) = create_max_min_accs(&file_schema);
@@ -160,13 +162,12 @@ impl std::fmt::Display for PartitionedFile {
 fn create_max_min_accs(
     schema: &Schema,
 ) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
-    let max_values: Vec<Option<MaxAccumulator>> = schema
-        .fields()
+    let flat_schema = flatten_schema(schema);
+    let max_values: Vec<Option<MaxAccumulator>> = flat_schema
         .iter()
         .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
         .collect::<Vec<_>>();
-    let min_values: Vec<Option<MinAccumulator>> = schema
-        .fields()
+    let min_values: Vec<Option<MinAccumulator>> = flat_schema
         .iter()
         .map(|field| MinAccumulator::try_new(field.data_type()).ok())
         .collect::<Vec<_>>();
@@ -179,7 +180,8 @@ fn get_col_stats(
     max_values: &mut Vec<Option<MaxAccumulator>>,
     min_values: &mut Vec<Option<MinAccumulator>>,
 ) -> Vec<ColumnStatistics> {
-    (0..schema.fields().len())
+    let flat_schema = flatten_schema(schema);
+    (0..flat_schema.len())
         .map(|i| {
             let max_value = match &max_values[i] {
                 Some(max_value) => max_value.evaluate().ok(),
@@ -198,3 +200,27 @@ fn get_col_stats(
         })
         .collect()
 }
+
+fn flatten_schema(schema: &Schema) -> Vec<&Field> {
+    fn fetch_children(field: &Field) -> Vec<&Field> {
+        let mut collected_fields: Vec<&Field> = vec![];
+        let data_type = field.data_type();
+        match data_type {
+            DataType::Struct(fields) | DataType::Union(fields) => collected_fields
+                .extend(fields.iter().map(|f| fetch_children(f)).flatten()),
+            DataType::List(f)
+            | DataType::LargeList(f)
+            | DataType::FixedSizeList(f, _)
+            | DataType::Map(f, _) => collected_fields.extend(fetch_children(f)),
+            _ => collected_fields.push(field),
+        }
+        collected_fields
+    }
+    let top_level_fields = schema.fields();
+    let flatten = top_level_fields
+        .iter()
+        .map(|f| fetch_children(f))
+        .flatten()
+        .collect();
+    flatten
+}
From a25fd496e27625d48162a59f677642733f441d5b Mon Sep 17 00:00:00 2001
From: Andrei Ionescu
Date: Fri, 3 Dec 2021 21:47:15 +0200
Subject: [PATCH 2/2] Add tests for schema flattening and for counting the
 total number of leaf fields

---
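Note: `flatten_schema` now returns owned `Field`s whose names carry the full
dotted path to each leaf (e.g. `nested1.str1`), hence `fetch_children` takes
its `Field` by value, and the cheaper `total_number_of_fields` is used where
only the leaf count matters. A standalone sketch of the renaming recursion
(struct case only; `leaf_names` is a hypothetical stand-in for the private
`flatten_schema`):

    use arrow::datatypes::{DataType, Field};

    fn leaf_names(field: &Field, out: &mut Vec<String>) {
        match field.data_type() {
            DataType::Struct(children) => {
                for c in children {
                    // Re-create the child with the parent's name as a dotted
                    // prefix, then recurse on the renamed field.
                    let qualified = Field::new(
                        &format!("{}.{}", field.name(), c.name()),
                        c.data_type().clone(),
                        c.is_nullable(),
                    );
                    leaf_names(&qualified, out);
                }
            }
            _ => out.push(field.name().to_string()),
        }
    }

    fn main() {
        let nested = Field::new(
            "nested",
            DataType::Struct(vec![Field::new("a", DataType::Int32, false)]),
            false,
        );
        let mut names = vec![];
        leaf_names(&nested, &mut names);
        assert_eq!(vec!["nested.a".to_string()], names);
    }
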
 .../src/datasource/file_format/parquet.rs |   5 +-
 datafusion/src/datasource/mod.rs          | 146 ++++++++++++++++--
 2 files changed, 137 insertions(+), 14 deletions(-)

diff --git a/datafusion/src/datasource/file_format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs
index b9b851b1bdf76..8ac6253af794f 100644
--- a/datafusion/src/datasource/file_format/parquet.rs
+++ b/datafusion/src/datasource/file_format/parquet.rs
@@ -124,7 +124,7 @@ impl FileFormat for ParquetFormat {
 fn summarize_min_max(
     max_values: &mut Vec<Option<MaxAccumulator>>,
     min_values: &mut Vec<Option<MinAccumulator>>,
-    fields: &Vec<&Field>,
+    fields: &Vec<Field>,
     i: usize,
     stat: &ParquetStatistics,
 ) {
@@ -280,9 +280,6 @@ fn fetch_statistics(object_reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
         .iter()
         .flat_map(|c| c.statistics().map(|stats| stats.null_count()));
 
-    let cols_vec: Vec<u64> = columns_null_counts.clone().collect();
-    let cols_vec_num = cols_vec.len();
-
     for (i, cnt) in columns_null_counts.enumerate() {
         null_counts[i] += cnt as usize
     }
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index 269b4e2865383..bf7982005fdaf 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -49,9 +49,9 @@ pub async fn get_statistics_with_limit(
 ) -> Result<(Vec<PartitionedFile>, Statistics)> {
     let mut result_files = vec![];
 
-    let flat_schema = flatten_schema(&file_schema);
+    let total_num_fields = total_number_of_fields(&file_schema);
     let mut total_byte_size = 0;
-    let mut null_counts = vec![0; flat_schema.len()];
+    let mut null_counts = vec![0; total_num_fields];
     let mut has_statistics = false;
     let (mut max_values, mut min_values) = create_max_min_accs(&file_schema);
@@ -180,8 +180,8 @@ fn get_col_stats(
     max_values: &mut Vec<Option<MaxAccumulator>>,
     min_values: &mut Vec<Option<MinAccumulator>>,
 ) -> Vec<ColumnStatistics> {
-    let flat_schema = flatten_schema(schema);
-    (0..flat_schema.len())
+    let total_num_fields = total_number_of_fields(schema);
+    (0..total_num_fields)
         .map(|i| {
             let max_value = match &max_values[i] {
                 Some(max_value) => max_value.evaluate().ok(),
@@ -201,17 +201,50 @@ fn get_col_stats(
         })
         .collect()
 }
 
-fn flatten_schema(schema: &Schema) -> Vec<&Field> {
-    fn fetch_children(field: &Field) -> Vec<&Field> {
-        let mut collected_fields: Vec<&Field> = vec![];
+fn total_number_of_fields(schema: &Schema) -> usize {
+    fn count_children(field: &Field) -> usize {
+        let mut num_children: usize = 0;
+        match field.data_type() {
+            DataType::Struct(fields) | DataType::Union(fields) => {
+                let counts_arr = fields.iter().map(|f| count_children(f)).collect::<Vec<usize>>();
+                let c: usize = counts_arr.iter().sum();
+                num_children += c
+            },
+            DataType::List(f)
+            | DataType::LargeList(f)
+            | DataType::FixedSizeList(f, _)
+            | DataType::Map(f, _) => {
+                let c: usize = count_children(f);
+                num_children += c
+            },
+            _ => num_children += 1,
+        }
+        num_children
+    }
+    let top_level_fields = schema.fields();
+    let top_level_counts = top_level_fields.iter().map(|i| count_children(i)).collect::<Vec<usize>>();
+    top_level_counts.iter().sum()
+}
+
+fn flatten_schema(schema: &Schema) -> Vec<Field> {
+    fn fetch_children(field: Field) -> Vec<Field> {
+        let mut collected_fields: Vec<Field> = vec![];
         let data_type = field.data_type();
         match data_type {
             DataType::Struct(fields) | DataType::Union(fields) => collected_fields
-                .extend(fields.iter().map(|f| fetch_children(f)).flatten()),
+                .extend(fields.iter().map(|f| {
+                    let full_name = format!("{}.{}", field.name(), f.name());
+                    let f_new = Field::new(&full_name, f.data_type().clone(), f.is_nullable());
+                    fetch_children(f_new)
+                }).flatten()),
             DataType::List(f)
             | DataType::LargeList(f)
             | DataType::FixedSizeList(f, _)
-            | DataType::Map(f, _) => collected_fields.extend(fetch_children(f)),
+            | DataType::Map(f, _) => {
+                let full_name = format!("{}.{}", field.name(), f.name());
+                let f_new = Field::new(&full_name, f.data_type().clone(), f.is_nullable());
+                collected_fields.extend(fetch_children(f_new))
+            },
             _ => collected_fields.push(field),
         }
         collected_fields
@@ -219,8 +252,101 @@ fn flatten_schema(schema: &Schema) -> Vec<Field> {
     let top_level_fields = schema.fields();
     let flatten = top_level_fields
         .iter()
-        .map(|f| fetch_children(f))
+        .map(|f| fetch_children(f.clone()))
         .flatten()
         .collect();
     flatten
 }
+
+#[cfg(test)]
+mod tests {
+    use arrow::datatypes::TimeUnit;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_total_number_of_fields() -> Result<()> {
+        let fields: Vec<Field> = vec![
+            Field::new("id", DataType::Int16, false),
+            Field::new("name", DataType::Utf8, false),
+            Field::new("nested1", DataType::Struct(vec![
+                Field::new("str1", DataType::Utf8, false),
+                Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), false),
+            ]), false),
+            Field::new("nested2", DataType::Struct(vec![
+                Field::new("nested_a", DataType::Struct(vec![
+                    Field::new("another_nested", DataType::Struct(vec![
+                        Field::new("idx", DataType::UInt8, false),
+                        Field::new("no", DataType::UInt8, false),
+                    ]), false),
+                    Field::new("id2", DataType::UInt16, false),
+                ]), false),
+                Field::new("nested_b", DataType::Struct(vec![
+                    Field::new("nested_x", DataType::Struct(vec![
+                        Field::new("nested_y", DataType::Struct(vec![
+                            Field::new("desc", DataType::Utf8, false),
+                        ]), false),
+                    ]), false),
+                ]), false),
+            ]), false),
+        ];
+
+        assert_eq!(8, total_number_of_fields(&Schema::new(fields)));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_total_number_of_fields_empty_struct() -> Result<()> {
+        let fields = vec![
+            Field::new("empty_nested", DataType::Struct(vec![]), false),
+        ];
+
+        assert_eq!(0, total_number_of_fields(&Schema::new(fields)));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_flatten_schema() -> Result<()> {
+        let fields: Vec<Field> = vec![
+            Field::new("id", DataType::Int16, false),
+            Field::new("name", DataType::Utf8, false),
+            Field::new("nested1", DataType::Struct(vec![
+                Field::new("str1", DataType::Utf8, false),
+                Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), false),
+            ]), false),
+            Field::new("nested2", DataType::Struct(vec![
+                Field::new("nested_a", DataType::Struct(vec![
+                    Field::new("another_nested", DataType::Struct(vec![
+                        Field::new("idx", DataType::UInt8, false),
+                        Field::new("no", DataType::UInt8, false),
+                    ]), false),
+                    Field::new("id2", DataType::UInt16, false),
+                ]), false),
+                Field::new("nested_b", DataType::Struct(vec![
+                    Field::new("nested_x", DataType::Struct(vec![
+                        Field::new("nested_y", DataType::Struct(vec![
+                            Field::new("desc", DataType::Utf8, false),
+                        ]), false),
+                    ]), false),
+                ]), false),
+            ]), false),
+        ];
+
+        let flat_schema = flatten_schema(&Schema::new(fields));
+        assert_eq!(8, flat_schema.len());
+        assert_eq!(Field::new("id", DataType::Int16, false), flat_schema[0]);
+        assert_eq!(Field::new("name", DataType::Utf8, false), flat_schema[1]);
+        assert_eq!(Field::new("nested1.str1", DataType::Utf8, false), flat_schema[2]);
+        assert_eq!(Field::new("nested1.ts", DataType::Timestamp(TimeUnit::Millisecond, None), false), flat_schema[3]);
+        assert_eq!(Field::new("nested2.nested_a.another_nested.idx", DataType::UInt8, false), flat_schema[4]);
+        assert_eq!(Field::new("nested2.nested_a.another_nested.no", DataType::UInt8, false), flat_schema[5]);
+        assert_eq!(Field::new("nested2.nested_a.id2", DataType::UInt16, false), flat_schema[6]);
+        assert_eq!(Field::new("nested2.nested_b.nested_x.nested_y.desc", DataType::Utf8, false), flat_schema[7]);
+
+        Ok(())
+    }
+}
\ No newline at end of file
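
For reference, the leaf count that `total_number_of_fields` computes can be
reproduced outside the crate with a small recursive helper (`leaf_count` is a
hypothetical stand-in; the `Union` and `Map` arms are omitted for brevity):

    use arrow::datatypes::{DataType, Field, Schema};

    fn leaf_count(field: &Field) -> usize {
        match field.data_type() {
            // A struct contributes the sum of its children's leaves.
            DataType::Struct(children) => children.iter().map(leaf_count).sum(),
            // List wrappers contribute the leaves of their single child.
            DataType::List(child)
            | DataType::LargeList(child)
            | DataType::FixedSizeList(child, _) => leaf_count(child),
            // Everything else is a leaf column.
            _ => 1,
        }
    }

    fn main() {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int16, false),
            Field::new(
                "nested",
                DataType::Struct(vec![
                    Field::new("a", DataType::Int32, false),
                    Field::new("b", DataType::Utf8, false),
                ]),
                false,
            ),
        ]);
        // id, nested.a, nested.b -> three leaves, matching the tests above.
        assert_eq!(3, schema.fields().iter().map(leaf_count).sum::<usize>());
    }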