diff --git a/datafusion/src/datasource/file_format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs
index 7976be7913c8a..8ac6253af794f 100644
--- a/datafusion/src/datasource/file_format/parquet.rs
+++ b/datafusion/src/datasource/file_format/parquet.rs
@@ -37,6 +37,7 @@ use parquet::file::statistics::Statistics as ParquetStatistics;
 use super::FileFormat;
 use super::PhysicalPlanConfig;
 use crate::arrow::datatypes::{DataType, Field};
+use crate::datasource::flatten_schema;
 use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
 use crate::datasource::{create_max_min_accs, get_col_stats};
 use crate::error::DataFusionError;
@@ -258,8 +259,9 @@ fn fetch_statistics(object_reader: Arc<dyn ObjectReader>) -> Result<Statistics>
     let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
     let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
     let schema = arrow_reader.get_schema()?;
-    let num_fields = schema.fields().len();
-    let fields = schema.fields().to_vec();
+    let flat_schema = flatten_schema(&schema);
+    let num_fields = flat_schema.len();
+    let fields = flat_schema;
     let meta_data = arrow_reader.get_metadata();
 
     let mut num_rows = 0;
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index 9f4f77f7ea285..bf7982005fdaf 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -24,6 +24,7 @@ pub mod listing;
 pub mod memory;
 pub mod object_store;
 
+use arrow::datatypes::{DataType, Field};
 use futures::Stream;
 
 pub use self::datasource::{TableProvider, TableType};
@@ -48,8 +49,9 @@ pub async fn get_statistics_with_limit(
 ) -> Result<(Vec<PartitionedFile>, Statistics)> {
     let mut result_files = vec![];
 
+    let total_num_fields = total_number_of_fields(&file_schema);
     let mut total_byte_size = 0;
-    let mut null_counts = vec![0; file_schema.fields().len()];
+    let mut null_counts = vec![0; total_num_fields];
     let mut has_statistics = false;
     let (mut max_values, mut min_values) = create_max_min_accs(&file_schema);
 
@@ -160,13 +162,12 @@ impl std::fmt::Display for PartitionedFile {
 fn create_max_min_accs(
     schema: &Schema,
 ) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
-    let max_values: Vec<Option<MaxAccumulator>> = schema
-        .fields()
+    let flat_schema = flatten_schema(schema);
+    let max_values: Vec<Option<MaxAccumulator>> = flat_schema
         .iter()
         .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
         .collect::<Vec<Option<MaxAccumulator>>>();
-    let min_values: Vec<Option<MinAccumulator>> = schema
-        .fields()
+    let min_values: Vec<Option<MinAccumulator>> = flat_schema
         .iter()
         .map(|field| MinAccumulator::try_new(field.data_type()).ok())
         .collect::<Vec<Option<MinAccumulator>>>();
@@ -179,7 +180,8 @@ fn get_col_stats(
     max_values: &mut Vec<Option<MaxAccumulator>>,
     min_values: &mut Vec<Option<MinAccumulator>>,
 ) -> Vec<ColumnStatistics> {
-    (0..schema.fields().len())
+    let total_num_fields = total_number_of_fields(schema);
+    (0..total_num_fields)
         .map(|i| {
             let max_value = match &max_values[i] {
                 Some(max_value) => max_value.evaluate().ok(),
@@ -198,3 +200,180 @@ fn get_col_stats(
         })
         .collect()
 }
+
+/// Counts the leaf (primitive) fields of `schema`, recursing through nested
+/// types, so per-column statistics vectors can be sized by leaf column.
+fn total_number_of_fields(schema: &Schema) -> usize {
+    // Number of leaf fields reachable from `field`.
+    fn count_leaves(field: &Field) -> usize {
+        match field.data_type() {
+            // A struct/union contributes the leaves of all of its children.
+            DataType::Struct(fields) | DataType::Union(fields) => {
+                fields.iter().map(count_leaves).sum()
+            }
+            // List-like types wrap exactly one child field.
+            DataType::List(f)
+            | DataType::LargeList(f)
+            | DataType::FixedSizeList(f, _)
+            | DataType::Map(f, _) => count_leaves(f),
+            // Any other type is itself a leaf.
+            _ => 1,
+        }
+    }
+    schema.fields().iter().map(count_leaves).sum()
+}
+
+/// Flattens `schema` into its leaf fields, renaming every leaf with the
+/// dot-separated path of its ancestors (e.g. `parent.child.leaf`).
+fn flatten_schema(schema: &Schema) -> Vec<Field> {
+    // Builds a copy of `child` whose name is qualified by its parent's name.
+    fn qualified(parent: &Field, child: &Field) -> Field {
+        Field::new(
+            &format!("{}.{}", parent.name(), child.name()),
+            child.data_type().clone(),
+            child.is_nullable(),
+        )
+    }
+    // Depth-first walk pushing leaf fields into `out`.
+    fn collect_leaves(field: Field, out: &mut Vec<Field>) {
+        match field.data_type() {
+            DataType::Struct(children) | DataType::Union(children) => {
+                for child in children {
+                    collect_leaves(qualified(&field, child), out);
+                }
+            }
+            DataType::List(child)
+            | DataType::LargeList(child)
+            | DataType::FixedSizeList(child, _)
+            | DataType::Map(child, _) => {
+                collect_leaves(qualified(&field, child), out);
+            }
+            _ => out.push(field),
+        }
+    }
+    let mut flat = Vec::with_capacity(schema.fields().len());
+    for field in schema.fields() {
+        collect_leaves(field.clone(), &mut flat);
+    }
+    flat
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::datatypes::TimeUnit;
+
+    use super::*;
+
+    /// Schema shared by the tests: two plain leaves plus nested structs,
+    /// eight leaf fields in total.
+    fn nested_test_fields() -> Vec<Field> {
+        vec![
+            Field::new("id", DataType::Int16, false),
+            Field::new("name", DataType::Utf8, false),
+            Field::new(
+                "nested1",
+                DataType::Struct(vec![
+                    Field::new("str1", DataType::Utf8, false),
+                    Field::new(
+                        "ts",
+                        DataType::Timestamp(TimeUnit::Millisecond, None),
+                        false,
+                    ),
+                ]),
+                false,
+            ),
+            Field::new(
+                "nested2",
+                DataType::Struct(vec![
+                    Field::new(
+                        "nested_a",
+                        DataType::Struct(vec![
+                            Field::new(
+                                "another_nested",
+                                DataType::Struct(vec![
+                                    Field::new("idx", DataType::UInt8, false),
+                                    Field::new("no", DataType::UInt8, false),
+                                ]),
+                                false,
+                            ),
+                            Field::new("id2", DataType::UInt16, false),
+                        ]),
+                        false,
+                    ),
+                    Field::new(
+                        "nested_b",
+                        DataType::Struct(vec![
+                            Field::new(
+                                "nested_x",
+                                DataType::Struct(vec![
+                                    Field::new(
+                                        "nested_y",
+                                        DataType::Struct(vec![Field::new(
+                                            "desc",
+                                            DataType::Utf8,
+                                            false,
+                                        )]),
+                                        false,
+                                    ),
+                                ]),
+                                false,
+                            ),
+                        ]),
+                        false,
+                    ),
+                ]),
+                false,
+            ),
+        ]
+    }
+
+    #[test]
+    fn test_total_number_of_fields() {
+        let schema = Schema::new(nested_test_fields());
+        assert_eq!(8, total_number_of_fields(&schema));
+    }
+
+    #[test]
+    fn test_total_number_of_fields_empty_struct() {
+        // A struct with no children contributes no leaf columns.
+        let fields = vec![Field::new("empty_nested", DataType::Struct(vec![]), false)];
+        assert_eq!(0, total_number_of_fields(&Schema::new(fields)));
+    }
+
+    #[test]
+    fn test_flatten_schema() {
+        let flat = flatten_schema(&Schema::new(nested_test_fields()));
+        assert_eq!(8, flat.len());
+        assert_eq!(Field::new("id", DataType::Int16, false), flat[0]);
+        assert_eq!(Field::new("name", DataType::Utf8, false), flat[1]);
+        assert_eq!(Field::new("nested1.str1", DataType::Utf8, false), flat[2]);
+        assert_eq!(
+            Field::new(
+                "nested1.ts",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                false
+            ),
+            flat[3]
+        );
+        assert_eq!(
+            Field::new("nested2.nested_a.another_nested.idx", DataType::UInt8, false),
+            flat[4]
+        );
+        assert_eq!(
+            Field::new("nested2.nested_a.another_nested.no", DataType::UInt8, false),
+            flat[5]
+        );
+        assert_eq!(
+            Field::new("nested2.nested_a.id2", DataType::UInt16, false),
+            flat[6]
+        );
+        assert_eq!(
+            Field::new(
+                "nested2.nested_b.nested_x.nested_y.desc",
+                DataType::Utf8,
+                false
+            ),
+            flat[7]
+        );
+    }
+}