diff --git a/datafusion/src/datasource/file_format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs
index eedb4c9139292..4afb2f54c3abc 100644
--- a/datafusion/src/datasource/file_format/parquet.rs
+++ b/datafusion/src/datasource/file_format/parquet.rs
@@ -24,7 +24,7 @@ use std::sync::Arc;
 use arrow::datatypes::Schema;
 use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
-use futures::stream::StreamExt;
+use futures::TryStreamExt;
 use parquet::arrow::ArrowReader;
 use parquet::arrow::ParquetFileArrowReader;
 use parquet::errors::ParquetError;
@@ -87,16 +87,15 @@ impl FileFormat for ParquetFormat {
         self
     }
 
-    async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result<SchemaRef> {
-        // We currently get the schema information from the first file rather than do
-        // schema merging and this is a limitation.
-        // See https://issues.apache.org/jira/browse/ARROW-11017
-        let first_file = readers
-            .next()
-            .await
-            .ok_or_else(|| DataFusionError::Plan("No data file found".to_owned()))??;
-        let schema = fetch_schema(first_file)?;
-        Ok(Arc::new(schema))
+    async fn infer_schema(&self, readers: ObjectReaderStream) -> Result<SchemaRef> {
+        let merged_schema = readers
+            .try_fold(Schema::empty(), |acc, reader| async {
+                let next_schema = fetch_schema(reader);
+                Schema::try_merge([acc, next_schema?])
+                    .map_err(DataFusionError::ArrowError)
+            })
+            .await?;
+        Ok(Arc::new(merged_schema))
     }
 
     async fn infer_stats(&self, reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
@@ -367,6 +366,7 @@ mod tests {
     };
 
     use super::*;
+    use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 
     use arrow::array::{
         BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs
index 17abb434fe68b..06b597c8ae7fb 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -44,13 +44,14 @@ use arrow::{
     error::{ArrowError, Result as ArrowResult},
     record_batch::RecordBatch,
 };
-use log::debug;
+use log::{debug, info};
 use parquet::file::{
     metadata::RowGroupMetaData,
     reader::{FileReader, SerializedFileReader},
     statistics::Statistics as ParquetStatistics,
 };
 
+use arrow::array::new_null_array;
 use fmt::Debug;
 use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
 
@@ -214,9 +215,11 @@ impl ExecutionPlan for ParquetExec {
             &self.base_config.table_partition_cols,
         );
 
+        let file_schema_ref = self.base_config().file_schema.clone();
         let join_handle = task::spawn_blocking(move || {
             if let Err(e) = read_partition(
                 object_store.as_ref(),
+                file_schema_ref,
                 partition_index,
                 partition,
                 metrics,
@@ -385,9 +388,33 @@ fn build_row_group_predicate(
     }
 }
 
+// Maps projections expressed against the merged schema (the union of all file schemas)
+// to the corresponding projections on a particular file's schema
+fn map_projections(
+    merged_schema: &Schema,
+    file_schema: &Schema,
+    projections: &[usize],
+) -> Result<Vec<usize>> {
+    let mut mapped: Vec<usize> = vec![];
+    for idx in projections {
+        let field = merged_schema.field(*idx);
+        if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) {
+            if file_schema.field(mapped_idx).data_type() == field.data_type() {
+                mapped.push(mapped_idx)
+            } else {
+                let msg = format!("Failed to map column projection for field {}. Incompatible data types {:?} and {:?}", field.name(), file_schema.field(mapped_idx).data_type(), field.data_type());
+                info!("{}", msg);
+                return Err(DataFusionError::Execution(msg));
+            }
+        }
+    }
+    Ok(mapped)
+}
+
 #[allow(clippy::too_many_arguments)]
 fn read_partition(
     object_store: &dyn ObjectStore,
+    file_schema: SchemaRef,
     partition_index: usize,
     partition: Vec<PartitionedFile>,
     metrics: ExecutionPlanMetricsSet,
@@ -400,6 +427,8 @@
 ) -> Result<()> {
     let mut total_rows = 0;
     'outer: for partitioned_file in partition {
+        debug!("Reading file {}", &partitioned_file.file_meta.path());
+
         let file_metrics = ParquetFileMetrics::new(
             partition_index,
             &*partitioned_file.file_meta.path(),
@@ -417,15 +446,46 @@
             );
             file_reader.filter_row_groups(&row_group_predicate);
         }
+
        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
-        let mut batch_reader = arrow_reader
-            .get_record_reader_by_columns(projection.to_owned(), batch_size)?;
+        let mapped_projections =
+            map_projections(&file_schema, &arrow_reader.get_schema()?, projection)?;
+
+        let mut batch_reader =
+            arrow_reader.get_record_reader_by_columns(mapped_projections, batch_size)?;
         loop {
             match batch_reader.next() {
                 Some(Ok(batch)) => {
+                    let total_cols = &file_schema.fields().len();
+                    let batch_rows = batch.num_rows();
                     total_rows += batch.num_rows();
+
+                    let batch_schema = batch.schema();
+
+                    let mut cols: Vec<ArrayRef> = Vec::with_capacity(*total_cols);
+                    let batch_cols = batch.columns().to_vec();
+
+                    for field_idx in projection {
+                        let merged_field = &file_schema.fields()[*field_idx];
+                        if let Some((batch_idx, _name)) =
+                            batch_schema.column_with_name(merged_field.name().as_str())
+                        {
+                            cols.push(batch_cols[batch_idx].clone());
+                        } else {
+                            cols.push(new_null_array(
+                                merged_field.data_type(),
+                                batch_rows,
+                            ))
+                        }
+                    }
+
+                    let projected_schema = file_schema.clone().project(projection)?;
+
+                    let merged_batch =
+                        RecordBatch::try_new(Arc::new(projected_schema), cols)?;
+
                     let proj_batch = partition_column_projector
-                        .project(batch, &partitioned_file.partition_values);
+                        .project(merged_batch, &partitioned_file.partition_values);
 
                     send_result(&response_tx, proj_batch)?;
                     if limit.map(|l| total_rows >= l).unwrap_or(false) {
@@ -457,22 +517,315 @@
 
 #[cfg(test)]
 mod tests {
-    use crate::datasource::{
-        file_format::{parquet::ParquetFormat, FileFormat},
-        object_store::local::{
-            local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
+    use crate::{
+        assert_batches_sorted_eq,
+        datasource::{
+            file_format::{parquet::ParquetFormat, FileFormat},
+            object_store::local::{
+                local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
+            },
         },
+        physical_plan::collect,
     };
 
     use super::*;
-    use arrow::datatypes::{DataType, Field};
+    use arrow::array::Float32Array;
+    use arrow::{
+        array::{Int64Array, Int8Array, StringArray},
+        datatypes::{DataType, Field},
+    };
     use futures::StreamExt;
     use parquet::{
+        arrow::ArrowWriter,
         basic::Type as PhysicalType,
-        file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
+        file::{
+            metadata::RowGroupMetaData, properties::WriterProperties,
+            statistics::Statistics as ParquetStatistics,
+        },
         schema::types::SchemaDescPtr,
     };
 
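+    // Overview of the merge behaviour exercised by the tests below (a small worked
+    // example): if one file was written with schema [c1: Utf8] and another with
+    // [c1: Utf8, c2: Int64], the inferred file_schema is the merged [c1, c2].
+    // Scanning the first file maps the projection onto that file's own schema via
+    // map_projections and fills the missing c2 column with nulls (new_null_array),
+    // so every output batch matches the merged schema.
+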
+    /// Writes each RecordBatch as an individual parquet file and then
+    /// reads them back in a single scan, returning the collected batches.
+    async fn round_trip_to_parquet(
+        batches: Vec<RecordBatch>,
+        projection: Option<Vec<usize>>,
+        schema: Option<SchemaRef>,
+    ) -> Vec<RecordBatch> {
+        // When vec is dropped, temp files are deleted
+        let files: Vec<_> = batches
+            .into_iter()
+            .map(|batch| {
+                let output = tempfile::NamedTempFile::new().expect("creating temp file");
+
+                let props = WriterProperties::builder().build();
+                let file: std::fs::File = (*output.as_file())
+                    .try_clone()
+                    .expect("cloning file descriptor");
+                let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))
+                    .expect("creating writer");
+
+                writer.write(&batch).expect("Writing batch");
+                writer.close().unwrap();
+                output
+            })
+            .collect();
+
+        let file_names: Vec<_> = files
+            .iter()
+            .map(|t| t.path().to_string_lossy().to_string())
+            .collect();
+
+        // Now, read the files back in
+        let file_groups: Vec<_> = file_names
+            .iter()
+            .map(|name| local_unpartitioned_file(name.clone()))
+            .collect();
+
+        // Infer the schema (if not provided)
+        let file_schema = match schema {
+            Some(provided_schema) => provided_schema,
+            None => ParquetFormat::default()
+                .infer_schema(local_object_reader_stream(file_names))
+                .await
+                .expect("inferring schema"),
+        };
+
+        // prepare the scan
+        let parquet_exec = ParquetExec::new(
+            FileScanConfig {
+                object_store: Arc::new(LocalFileSystem {}),
+                file_groups: vec![file_groups],
+                file_schema,
+                statistics: Statistics::default(),
+                projection,
+                limit: None,
+                table_partition_cols: vec![],
+            },
+            None,
+        );
+
+        let runtime = Arc::new(RuntimeEnv::default());
+        collect(Arc::new(parquet_exec), runtime)
+            .await
+            .expect("reading parquet data")
+    }
+
+    // Add a new column with the specified field name to the RecordBatch
+    fn add_to_batch(
+        batch: &RecordBatch,
+        field_name: &str,
+        array: ArrayRef,
+    ) -> RecordBatch {
+        let mut fields = batch.schema().fields().clone();
+        fields.push(Field::new(field_name, array.data_type().clone(), true));
+        let schema = Arc::new(Schema::new(fields));
+
+        let mut columns = batch.columns().to_vec();
+        columns.push(array);
+        RecordBatch::try_new(schema, columns).expect("error creating record batch")
+    }
+
+    fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
+        columns.into_iter().fold(
+            RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
+            |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()),
+        )
+    }
+
+    #[tokio::test]
+    async fn evolved_schema() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+        // batch1: c1(string)
+        let batch1 = add_to_batch(
+            &RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
+            "c1",
+            c1,
+        );
+
+        // batch2: c1(string) and c2(int64)
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+        let batch2 = add_to_batch(&batch1, "c2", c2);
+
+        // batch3: c1(string) and c3(int8)
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+        let batch3 = add_to_batch(&batch1, "c3", c3);
+
+        // write the batches to parquet files and read them back:
+        let read = round_trip_to_parquet(vec![batch1, batch2, batch3], None, None).await;
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c2 | c3 |",
+            "+-----+----+----+",
+            "|     |    |    |",
+            "|     |    | 20 |",
+            "|     | 2  |    |",
+            "| Foo |    |    |",
+            "| Foo |    | 10 |",
+            "| Foo | 1  |    |",
+            "| bar |    |    |",
+            "| bar |    |    |",
+            "| bar |    |    |",
+            "+-----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_inconsistent_order() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        // batch2: c3(int8), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);
+
+        // write the batches to parquet files and read them back:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None).await;
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c2 | c3 |",
+            "+-----+----+----+",
+            "| Foo | 1  | 10 |",
+            "|     | 2  | 20 |",
+            "| bar |    |    |",
+            "| Foo | 1  | 10 |",
+            "|     | 2  | 20 |",
+            "| bar |    |    |",
+            "+-----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_intersection() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c3(int8)
+        let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);
+
+        // batch2: c3(int8), c2(int64)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
+
+        // write the batches to parquet files and read them back:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None).await;
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c3 | c2 |",
+            "+-----+----+----+",
+            "| Foo | 10 |    |",
+            "|     | 20 |    |",
+            "| bar |    |    |",
+            "|     | 10 | 1  |",
+            "|     | 20 | 2  |",
+            "|     |    |    |",
+            "+-----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_projection() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        let c4: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("baz"), Some("boo"), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        // batch2: c3(int8), c2(int64), c1(string), c4(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1), ("c4", c4)]);
+
+        // write the batches to parquet files and read them back, projecting c1 and c4:
+        let read =
+            round_trip_to_parquet(vec![batch1, batch2], Some(vec![0, 3]), None).await;
+        let expected = vec![
+            "+-----+-----+",
+            "| c1  | c4  |",
+            "+-----+-----+",
+            "| Foo | baz |",
+            "|     | boo |",
+            "| bar |     |",
+            "| Foo |     |",
+            "|     |     |",
+            "| bar |     |",
+            "+-----+-----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_incompatible_types() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        let c4: ArrayRef =
+            Arc::new(Float32Array::from(vec![Some(1.0_f32), Some(2.0_f32), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        // batch2: c3(float32, incompatible with batch1's int8 c3), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c3", c4), ("c2", c2), ("c1", c1)]);
+
+        let schema = Schema::new(vec![
+            Field::new("c1", DataType::Utf8, true),
+            Field::new("c2", DataType::Int64, true),
+            Field::new("c3", DataType::Int8, true),
+        ]);
+
+        // write the batches to parquet files and read them back with the provided schema:
+        let read =
+            round_trip_to_parquet(vec![batch1, batch2], None, Some(Arc::new(schema)))
+                .await;
+
+        // expect only the first batch to be read
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c2 | c3 |",
+            "+-----+----+----+",
+            "| Foo | 1  | 10 |",
+            "|     | 2  | 20 |",
+            "| bar |    |    |",
+            "+-----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
     #[tokio::test]
     async fn parquet_exec_with_projection() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());