Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions datafusion/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use parquet::file::statistics::Statistics as ParquetStatistics;
use super::FileFormat;
use super::PhysicalPlanConfig;
use crate::arrow::datatypes::{DataType, Field};
use crate::datasource::flatten_schema;
use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
use crate::datasource::{create_max_min_accs, get_col_stats};
use crate::error::DataFusionError;
Expand Down Expand Up @@ -123,7 +124,7 @@ impl FileFormat for ParquetFormat {
fn summarize_min_max(
max_values: &mut Vec<Option<MaxAccumulator>>,
min_values: &mut Vec<Option<MinAccumulator>>,
fields: &[Field],
fields: &Vec<Field>,
i: usize,
stat: &ParquetStatistics,
) {
Expand Down Expand Up @@ -258,8 +259,9 @@ fn fetch_statistics(object_reader: Arc<dyn ObjectReader>) -> Result<Statistics>
let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
let schema = arrow_reader.get_schema()?;
let num_fields = schema.fields().len();
let fields = schema.fields().to_vec();
let flat_schema = flatten_schema(&schema);
let num_fields = flat_schema.len();
let fields = flat_schema.clone();
let meta_data = arrow_reader.get_metadata();

let mut num_rows = 0;
Expand Down
164 changes: 158 additions & 6 deletions datafusion/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub mod listing;
pub mod memory;
pub mod object_store;

use arrow::datatypes::{DataType, Field};
use futures::Stream;

pub use self::datasource::{TableProvider, TableType};
Expand All @@ -48,8 +49,9 @@ pub async fn get_statistics_with_limit(
) -> Result<(Vec<PartitionedFile>, Statistics)> {
let mut result_files = vec![];

let total_num_fields = total_number_of_fields(&file_schema);
let mut total_byte_size = 0;
let mut null_counts = vec![0; file_schema.fields().len()];
let mut null_counts = vec![0; total_num_fields];
let mut has_statistics = false;
let (mut max_values, mut min_values) = create_max_min_accs(&file_schema);

Expand Down Expand Up @@ -160,13 +162,12 @@ impl std::fmt::Display for PartitionedFile {
fn create_max_min_accs(
schema: &Schema,
) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
let max_values: Vec<Option<MaxAccumulator>> = schema
.fields()
let flat_schema = flatten_schema(schema);
let max_values: Vec<Option<MaxAccumulator>> = flat_schema
.iter()
.map(|field| MaxAccumulator::try_new(field.data_type()).ok())
.collect::<Vec<_>>();
let min_values: Vec<Option<MinAccumulator>> = schema
.fields()
let min_values: Vec<Option<MinAccumulator>> = flat_schema
.iter()
.map(|field| MinAccumulator::try_new(field.data_type()).ok())
.collect::<Vec<_>>();
Expand All @@ -179,7 +180,8 @@ fn get_col_stats(
max_values: &mut Vec<Option<MaxAccumulator>>,
min_values: &mut Vec<Option<MinAccumulator>>,
) -> Vec<ColumnStatistics> {
(0..schema.fields().len())
let total_num_fields = total_number_of_fields(schema);
(0..total_num_fields)
.map(|i| {
let max_value = match &max_values[i] {
Some(max_value) => max_value.evaluate().ok(),
Expand All @@ -198,3 +200,153 @@ fn get_col_stats(
})
.collect()
}

/// Counts the leaf (primitive) fields in `schema`, recursing into nested
/// types so each primitive column contributes exactly one to the total.
///
/// Structs and unions contribute the sum of their children's leaf counts;
/// lists, large lists, fixed-size lists and maps contribute the leaf count
/// of their single element field. An empty struct therefore contributes 0.
/// The result matches `flatten_schema(schema).len()`.
fn total_number_of_fields(schema: &Schema) -> usize {
    // Leaf count reachable from a single field.
    fn count_children(field: &Field) -> usize {
        match field.data_type() {
            // Multi-child containers: sum the leaves of every child.
            DataType::Struct(fields) | DataType::Union(fields) => {
                fields.iter().map(count_children).sum()
            }
            // Single-child containers: recurse into the element field.
            DataType::List(f)
            | DataType::LargeList(f)
            | DataType::FixedSizeList(f, _)
            | DataType::Map(f, _) => count_children(f),
            // Primitive type: one leaf.
            _ => 1,
        }
    }
    schema.fields().iter().map(count_children).sum()
}

/// Flattens a (possibly nested) schema into a depth-first list of its leaf
/// fields, renaming each leaf with its dotted path — a struct column `a`
/// with child `b` yields a leaf named `"a.b"`.
///
/// The leaf order matches the traversal used by `total_number_of_fields`,
/// so `flatten_schema(s).len() == total_number_of_fields(s)`.
///
/// NOTE(review): each leaf keeps only its own nullability flag; a
/// non-nullable child of a nullable parent still comes out non-nullable —
/// confirm that is the intended semantics for statistics accumulation.
fn flatten_schema(schema: &Schema) -> Vec<Field> {
    // Builds the child field renamed with its parent-qualified dotted path.
    fn qualify(parent: &Field, child: &Field) -> Field {
        let full_name = format!("{}.{}", parent.name(), child.name());
        Field::new(&full_name, child.data_type().clone(), child.is_nullable())
    }
    // Depth-first expansion of one field into its renamed leaf fields.
    fn fetch_children(field: Field) -> Vec<Field> {
        match field.data_type() {
            // Multi-child containers: qualify and recurse into every child.
            DataType::Struct(fields) | DataType::Union(fields) => fields
                .iter()
                .flat_map(|f| fetch_children(qualify(&field, f)))
                .collect(),
            // Single-child containers: qualify and recurse into the element.
            DataType::List(f)
            | DataType::LargeList(f)
            | DataType::FixedSizeList(f, _)
            | DataType::Map(f, _) => fetch_children(qualify(&field, f)),
            // Primitive type: already a leaf, keep as-is.
            _ => vec![field],
        }
    }
    schema
        .fields()
        .iter()
        .cloned()
        .flat_map(fetch_children)
        .collect()
}

#[cfg(test)]
mod tests {
    use arrow::datatypes::TimeUnit;

    use super::*;

    /// Shared fixture: two primitive columns plus two nested struct columns
    /// (up to four levels deep), holding 8 primitive leaves in total.
    fn nested_fields() -> Vec<Field> {
        vec![
            Field::new("id", DataType::Int16, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("nested1", DataType::Struct(vec![
                Field::new("str1", DataType::Utf8, false),
                Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), false),
            ]), false),
            Field::new("nested2", DataType::Struct(vec![
                Field::new("nested_a", DataType::Struct(vec![
                    Field::new("another_nested", DataType::Struct(vec![
                        Field::new("idx", DataType::UInt8, false),
                        Field::new("no", DataType::UInt8, false),
                    ]), false),
                    Field::new("id2", DataType::UInt16, false),
                ]), false),
                Field::new("nested_b", DataType::Struct(vec![
                    Field::new("nested_x", DataType::Struct(vec![
                        Field::new("nested_y", DataType::Struct(vec![
                            Field::new("desc", DataType::Utf8, false),
                        ]), false),
                    ]), false),
                ]), false),
            ]), false),
        ]
    }

    // These tests are fully synchronous (no `await` anywhere), so plain
    // `#[test]` is used instead of spinning up a tokio runtime per test.

    #[test]
    fn test_total_number_of_fields() -> Result<()> {
        // 2 top-level primitives + 2 leaves in nested1 + 4 leaves in nested2.
        assert_eq!(8, total_number_of_fields(&Schema::new(nested_fields())));
        Ok(())
    }

    #[test]
    fn test_total_number_of_fields_empty_struct() -> Result<()> {
        // A struct with no children contains no leaves, so it counts as 0.
        let fields = vec![
            Field::new("empty_nested", DataType::Struct(vec![]), false),
        ];
        assert_eq!(0, total_number_of_fields(&Schema::new(fields)));
        Ok(())
    }

    #[test]
    fn test_flatten_schema() -> Result<()> {
        let flat_schema = flatten_schema(&Schema::new(nested_fields()));
        // One entry per leaf, in depth-first order, named by dotted path.
        assert_eq!(8, flat_schema.len());
        assert_eq!(Field::new("id", DataType::Int16, false), flat_schema[0]);
        assert_eq!(Field::new("name", DataType::Utf8, false), flat_schema[1]);
        assert_eq!(Field::new("nested1.str1", DataType::Utf8, false), flat_schema[2]);
        assert_eq!(Field::new("nested1.ts", DataType::Timestamp(TimeUnit::Millisecond, None), false), flat_schema[3]);
        assert_eq!(Field::new("nested2.nested_a.another_nested.idx", DataType::UInt8, false), flat_schema[4]);
        assert_eq!(Field::new("nested2.nested_a.another_nested.no", DataType::UInt8, false), flat_schema[5]);
        assert_eq!(Field::new("nested2.nested_a.id2", DataType::UInt16, false), flat_schema[6]);
        assert_eq!(Field::new("nested2.nested_b.nested_x.nested_y.desc", DataType::Utf8, false), flat_schema[7]);

        Ok(())
    }
}