From 5628320665a67ec00830c1183b469f36b53bb4f3 Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Thu, 20 Jan 2022 08:30:22 -0500 Subject: [PATCH 01/13] Handle merging of evolved schemas in ParquetExec --- .../src/datasource/file_format/parquet.rs | 38 +++-- .../src/physical_plan/file_format/parquet.rs | 134 +++++++++++++++++- parquet-testing | 2 +- 3 files changed, 162 insertions(+), 12 deletions(-) diff --git a/datafusion/src/datasource/file_format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs index eedb4c9139292..415f697c5fc4f 100644 --- a/datafusion/src/datasource/file_format/parquet.rs +++ b/datafusion/src/datasource/file_format/parquet.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use arrow::datatypes::Schema; use arrow::datatypes::SchemaRef; use async_trait::async_trait; -use futures::stream::StreamExt; +use futures::TryStreamExt; use parquet::arrow::ArrowReader; use parquet::arrow::ParquetFileArrowReader; use parquet::errors::ParquetError; @@ -87,16 +87,23 @@ impl FileFormat for ParquetFormat { self } - async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result { + async fn infer_schema(&self, readers: ObjectReaderStream) -> Result { // We currently get the schema information from the first file rather than do // schema merging and this is a limitation. // See https://issues.apache.org/jira/browse/ARROW-11017 - let first_file = readers - .next() - .await - .ok_or_else(|| DataFusionError::Plan("No data file found".to_owned()))??; - let schema = fetch_schema(first_file)?; - Ok(Arc::new(schema)) + let merged_schema = readers + .try_fold(Schema::empty(), |acc, reader| async { + let next_schema = fetch_schema(reader); + Schema::try_merge([acc, next_schema?]) + .map_err(|e| DataFusionError::ArrowError(e)) + }) + .await?; + // let first_file = readers + // .next() + // .await + // .ok_or_else(|| DataFusionError::Plan("No data file found".to_owned()))??; + // let schema = fetch_schema(first_file)?; + Ok(Arc::new(merged_schema)) } async fn infer_stats(&self, reader: Arc) -> Result { @@ -367,6 +374,7 @@ mod tests { }; use super::*; + use crate::datasource::listing::ListingOptions; use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use arrow::array::{ BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array, @@ -374,6 +382,20 @@ mod tests { }; use futures::StreamExt; + #[tokio::test] + async fn test_merge_schema() -> Result<()> { + let testdata = crate::test_util::parquet_test_data(); + let filename = format!("{}/{}", testdata, "schema_evolution"); + let opt = ListingOptions::new(Arc::new(ParquetFormat::default())); + let schema = opt + .infer_schema(Arc::new(LocalFileSystem {}), &filename) + .await?; + + assert_eq!(schema.fields().len(), 5); + + Ok(()) + } + #[tokio::test] async fn read_small_batches() -> Result<()> { let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new().with_batch_size(2))?); diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index 17abb434fe68b..c6eb34db22016 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -51,6 +51,7 @@ use parquet::file::{ statistics::Statistics as ParquetStatistics, }; +use arrow::array::new_null_array; use fmt::Debug; use parquet::arrow::{ArrowReader, ParquetFileArrowReader}; @@ -214,9 +215,11 @@ impl ExecutionPlan for ParquetExec { &self.base_config.table_partition_cols, ); + let file_schema_ref = self.base_config().file_schema.clone(); let join_handle = 
task::spawn_blocking(move || {
if let Err(e) = read_partition(
object_store.as_ref(),
+ file_schema_ref,
partition_index,
partition,
metrics,
@@ -385,9 +388,35 @@ fn build_row_group_predicate(
}
}
+// Map projections from the schema which merges all file schemas to projections on a particular
+// file
+fn map_projections(
+ merged_schema: &Schema,
+ file_schema: &Schema,
+ projections: &[usize],
+) -> Vec<usize> {
+ if merged_schema.fields().len() == file_schema.fields().len() {
+ projections.to_vec()
+ } else {
+ let mut mapped: Vec<usize> = vec![];
+ for idx in projections {
+ let field = merged_schema.field(*idx);
+ if let Ok(file_field) = file_schema.field_with_name(field.name().as_str()) {
+ if file_field.data_type() == field.data_type() {
+ if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) {
+ mapped.push(mapped_idx)
+ }
+ }
+ }
+ }
+ mapped
+ }
+}
+
#[allow(clippy::too_many_arguments)]
fn read_partition(
object_store: &dyn ObjectStore,
+ file_schema: SchemaRef,
partition_index: usize,
partition: Vec<PartitionedFile>,
metrics: ExecutionPlanMetricsSet,
@@ -400,6 +429,8 @@ fn read_partition(
) -> Result<()> {
let mut total_rows = 0;
'outer: for partitioned_file in partition {
+ debug!("Reading file {}", &partitioned_file.file_meta.path());
+
let file_metrics = ParquetFileMetrics::new(
partition_index,
&*partitioned_file.file_meta.path(),
@@ -417,15 +448,49 @@ fn read_partition(
);
file_reader.filter_row_groups(&row_group_predicate);
}
+
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
- let mut batch_reader = arrow_reader .get_record_reader_by_columns(projection.to_owned(), batch_size)?;
+ let mapped_projections =
+ map_projections(&file_schema, &arrow_reader.get_schema()?, projection);
+
+ let mut batch_reader =
+ arrow_reader.get_record_reader_by_columns(mapped_projections, batch_size)?;
loop {
match batch_reader.next() {
Some(Ok(batch)) => {
+ let total_cols = &file_schema.fields().len();
+ let batch_rows = batch.num_rows();
total_rows += batch.num_rows();
+
+ let batch_schema = batch.schema();
+
+ let merged_batch = if batch.columns().len() < projection.len() {
+ let mut cols: Vec<ArrayRef> = Vec::with_capacity(*total_cols);
+ let batch_cols = batch.columns().to_vec();
+
+ for field_idx in projection {
+ let merged_field = &file_schema.fields()[*field_idx];
+ if let Some((batch_idx, _name)) = batch_schema .column_with_name(merged_field.name().as_str()) {
+ cols.push(batch_cols[batch_idx].clone());
+ } else {
+ cols.push(new_null_array(
+ merged_field.data_type(),
+ batch_rows,
+ ))
+ }
+ }
+
+ let projected_schema = file_schema.clone().project(projection)?;
+
+ RecordBatch::try_new(Arc::new(projected_schema), cols)?
+ } else { + batch + }; + let proj_batch = partition_column_projector - .project(batch, &partitioned_file.partition_values); + .project(merged_batch, &partitioned_file.partition_values); send_result(&response_tx, proj_batch)?; if limit.map(|l| total_rows >= l).unwrap_or(false) { @@ -473,6 +538,69 @@ mod tests { schema::types::SchemaDescPtr, }; + #[tokio::test] + async fn parquet_exec_with_evolved_schema() -> Result<()> { + let runtime = Arc::new(RuntimeEnv::default()); + let testdata = crate::test_util::parquet_test_data(); + let part1 = format!("{}/schema_evolution/part1.parquet", testdata); + let part2 = format!("{}/schema_evolution/part2.parquet", testdata); + let part3 = format!("{}/schema_evolution/part3.parquet", testdata); + let parquet_exec = ParquetExec::new( + FileScanConfig { + object_store: Arc::new(LocalFileSystem {}), + file_groups: vec![vec![ + local_unpartitioned_file(part1.clone()), + local_unpartitioned_file(part2.clone()), + local_unpartitioned_file(part3.clone()), + ]], + file_schema: ParquetFormat::default() + .infer_schema(local_object_reader_stream(vec![part1, part2, part3])) + .await?, + statistics: Statistics::default(), + projection: Some(vec![0, 1, 4]), + limit: None, + table_partition_cols: vec![], + }, + None, + ); + + let mut results = parquet_exec.execute(0, runtime).await?; + let batch1 = results.next().await.unwrap()?; + + assert_eq!(8, batch1.num_rows()); + assert_eq!(3, batch1.num_columns()); + + let schema = batch1.schema(); + let field_names: Vec<&str> = + schema.fields().iter().map(|f| f.name().as_str()).collect(); + assert_eq!(vec!["id", "bool_col", "string_col"], field_names); + + let batch2 = results.next().await.unwrap()?; + + assert_eq!(8, batch2.num_rows()); + assert_eq!(3, batch2.num_columns()); + + let schema = batch2.schema(); + let field_names: Vec<&str> = + schema.fields().iter().map(|f| f.name().as_str()).collect(); + assert_eq!(vec!["id", "bool_col", "string_col"], field_names); + + let batch3 = results.next().await.unwrap()?; + + assert_eq!(8, batch3.num_rows()); + assert_eq!(3, batch3.num_columns()); + + let schema = batch3.schema(); + let field_names: Vec<&str> = + schema.fields().iter().map(|f| f.name().as_str()).collect(); + assert_eq!(vec!["id", "bool_col", "string_col"], field_names); + + let batch = results.next().await; + assert!(batch.is_none()); + + Ok(()) + } + #[tokio::test] async fn parquet_exec_with_projection() -> Result<()> { let runtime = Arc::new(RuntimeEnv::default()); diff --git a/parquet-testing b/parquet-testing index ddd898958803c..972fda30d5562 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit ddd898958803cb89b7156c6350584d1cda0fe8de +Subproject commit 972fda30d5562074458a1d1f731e7044992581ca From 98a5ad4241ca83374c0df2bfaf0f25ffa60afdd3 Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Thu, 20 Jan 2022 08:30:22 -0500 Subject: [PATCH 02/13] Handle merging of evolved schemas in ParquetExec --- datafusion/src/datasource/file_format/parquet.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/datafusion/src/datasource/file_format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs index 415f697c5fc4f..a330fcbfe489b 100644 --- a/datafusion/src/datasource/file_format/parquet.rs +++ b/datafusion/src/datasource/file_format/parquet.rs @@ -88,9 +88,6 @@ impl FileFormat for ParquetFormat { } async fn infer_schema(&self, readers: ObjectReaderStream) -> Result { - // We currently get the schema information from the first file rather than do - // schema merging and this is a 
limitation. - // See https://issues.apache.org/jira/browse/ARROW-11017 let merged_schema = readers .try_fold(Schema::empty(), |acc, reader| async { let next_schema = fetch_schema(reader); @@ -98,11 +95,6 @@ impl FileFormat for ParquetFormat { .map_err(|e| DataFusionError::ArrowError(e)) }) .await?; - // let first_file = readers - // .next() - // .await - // .ok_or_else(|| DataFusionError::Plan("No data file found".to_owned()))??; - // let schema = fetch_schema(first_file)?; Ok(Arc::new(merged_schema)) } From 33ef068c12e3aee63ec142deff6eef2f0d7577f5 Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Thu, 20 Jan 2022 15:00:50 -0500 Subject: [PATCH 03/13] Linting fix --- datafusion/src/datasource/file_format/parquet.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/datasource/file_format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs index a330fcbfe489b..ab8121d38c829 100644 --- a/datafusion/src/datasource/file_format/parquet.rs +++ b/datafusion/src/datasource/file_format/parquet.rs @@ -92,7 +92,7 @@ impl FileFormat for ParquetFormat { .try_fold(Schema::empty(), |acc, reader| async { let next_schema = fetch_schema(reader); Schema::try_merge([acc, next_schema?]) - .map_err(|e| DataFusionError::ArrowError(e)) + .map_err(DataFusionError::ArrowError) }) .await?; Ok(Arc::new(merged_schema)) From e81f1df6c8e62c3295b876cfff0bdab2ae06bd49 Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Fri, 21 Jan 2022 06:00:33 -0500 Subject: [PATCH 04/13] Avoid unnecessary search by field name --- datafusion/src/physical_plan/file_format/parquet.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index c6eb34db22016..fee0f9a5c9fcf 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -401,11 +401,9 @@ fn map_projections( let mut mapped: Vec = vec![]; for idx in projections { let field = merged_schema.field(*idx); - if let Ok(file_field) = file_schema.field_with_name(field.name().as_str()) { - if file_field.data_type() == field.data_type() { - if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) { - mapped.push(mapped_idx) - } + if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) { + if file_schema.field(mapped_idx).data_type() == field.data_type() { + mapped.push(mapped_idx) } } } From ad335edc16f6750be88b25c5dd92246dbc3bd854 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 21 Jan 2022 12:27:05 -0500 Subject: [PATCH 05/13] Add round trip parquet testing --- .../src/physical_plan/file_format/parquet.rs | 112 +++++++++++++++++- 1 file changed, 107 insertions(+), 5 deletions(-) diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index fee0f9a5c9fcf..0c0a78cd3a513 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -520,24 +520,126 @@ fn read_partition( #[cfg(test)] mod tests { - use crate::datasource::{ + use crate::{datasource::{ file_format::{parquet::ParquetFormat, FileFormat}, object_store::local::{ local_object_reader_stream, local_unpartitioned_file, LocalFileSystem, }, - }; + }, assert_batches_sorted_eq, physical_plan::collect}; use super::*; - use arrow::datatypes::{DataType, Field}; + use arrow::{datatypes::{DataType, Field}, array::{StringArray, Int8Array, Int64Array}}; use 
futures::StreamExt; use parquet::{ basic::Type as PhysicalType, - file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics}, - schema::types::SchemaDescPtr, + file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics, properties::WriterProperties}, + schema::types::SchemaDescPtr, arrow::ArrowWriter, }; + + /// writes each RecordBatch as an individual parquet file and then + /// reads it back in to the named location. + async fn round_trip_to_parquet(batches: Vec, projection: Option>) -> Vec { + // When vec is dropped, temp files are deleted + let files: Vec<_> = batches.into_iter() + .map(|batch| { + let output = tempfile::NamedTempFile::new().expect("creating temp file"); + + let props = WriterProperties::builder().build(); + let file: std::fs::File = (*output.as_file()).try_clone().expect("cloning file descriptor"); + let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).expect("creating writer"); + + writer.write(&batch).expect("Writing batch"); + writer.close().unwrap(); + output + }) + .collect(); + + let file_names: Vec<_> = files.iter() + .map(|t| t.path().to_string_lossy().to_string()) + .collect(); + + // Now, read the files back in + let file_groups: Vec<_> = file_names.iter() + .map(|name| local_unpartitioned_file(name.clone())) + .collect(); + + // Infer the schema + let file_schema = ParquetFormat::default() + .infer_schema(local_object_reader_stream(file_names)) + .await + .expect("inferring schema"); + + // prepare the scan + let parquet_exec = ParquetExec::new( + FileScanConfig { + object_store: Arc::new(LocalFileSystem {}), + file_groups: vec![file_groups], + file_schema, + statistics: Statistics::default(), + projection, + limit: None, + table_partition_cols: vec![], + }, + None, + ); + + let runtime = Arc::new(RuntimeEnv::default()); + collect(Arc::new(parquet_exec) , runtime) + .await + .expect("reading parquet data") + } + + // Add a new column with the specified field name to the RecordBatch + fn add_to_batch(batch: &RecordBatch, field_name: &str, array: ArrayRef) -> RecordBatch { + let mut fields = batch.schema().fields().clone(); + fields.push(Field::new(field_name, array.data_type().clone(), true)); + let schema = Arc::new(Schema::new(fields)); + + let mut columns = batch.columns().to_vec(); + columns.push(array); + RecordBatch::try_new(schema, columns).expect("error; creating record batch") + } + + #[tokio::test] + async fn evolved_schema() { + let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); + // batch1: c1(string) + let batch1 = add_to_batch(&RecordBatch::new_empty(Arc::new(Schema::new(vec![]))), "c1", c1); + + // batch2: c1(string) and c2(int64) + let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); + let batch2 = add_to_batch(&batch1, "c2", c2); + + // batch3: c1(string) and c3(int8) + let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None])); + let batch3 = add_to_batch(&batch1, "c3", c3); + + // read/write them files: + let read = round_trip_to_parquet(vec![batch1, batch2, batch3], None).await; + let expected = vec![ + "+-----+----+----+", + "| c1 | c2 | c3 |", + "+-----+----+----+", + "| | | |", + "| | | 20 |", + "| | 2 | |", + "| Foo | | |", + "| Foo | | 10 |", + "| Foo | 1 | |", + "| bar | | |", + "| bar | | |", + "| bar | | |", + "+-----+----+----+", + ]; + assert_batches_sorted_eq!(expected, &read); + } + + #[tokio::test] async fn parquet_exec_with_evolved_schema() -> Result<()> { + + let runtime = 
Arc::new(RuntimeEnv::default()); let testdata = crate::test_util::parquet_test_data(); let part1 = format!("{}/schema_evolution/part1.parquet", testdata); From ccee675fd8b017ea7ed830460b69e9629266eb2a Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Fri, 21 Jan 2022 15:15:56 -0500 Subject: [PATCH 06/13] PR comments: 1. Add additional test cases 2. Map projected column indexes in all cases 3. Raise an explicit error in the case where there a conflict between a file schema and the merged schema. --- .../src/physical_plan/file_format/parquet.rs | 346 ++++++++++++------ 1 file changed, 235 insertions(+), 111 deletions(-) diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index 0c0a78cd3a513..dcf28cf7b4583 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -44,7 +44,7 @@ use arrow::{ error::{ArrowError, Result as ArrowResult}, record_batch::RecordBatch, }; -use log::debug; +use log::{debug, error}; use parquet::file::{ metadata::RowGroupMetaData, reader::{FileReader, SerializedFileReader}, @@ -62,6 +62,7 @@ use tokio::{ use crate::execution::runtime_env::RuntimeEnv; use async_trait::async_trait; +use parquet::errors::ParquetError; use super::PartitionColumnProjector; @@ -394,21 +395,21 @@ fn map_projections( merged_schema: &Schema, file_schema: &Schema, projections: &[usize], -) -> Vec { - if merged_schema.fields().len() == file_schema.fields().len() { - projections.to_vec() - } else { - let mut mapped: Vec = vec![]; - for idx in projections { - let field = merged_schema.field(*idx); - if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) { - if file_schema.field(mapped_idx).data_type() == field.data_type() { - mapped.push(mapped_idx) - } +) -> Result> { + let mut mapped: Vec = vec![]; + for idx in projections { + let field = merged_schema.field(*idx); + if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) { + if file_schema.field(mapped_idx).data_type() == field.data_type() { + mapped.push(mapped_idx) + } else { + let msg = format!("Failed to map column projection for field {}. 
Incompatible data types {:?} and {:?}", field.name(), file_schema.field(mapped_idx).data_type(), field.data_type()); + error!("{}", msg); + return Err(DataFusionError::ParquetError(ParquetError::General(msg))); } } - mapped } + Ok(mapped) } #[allow(clippy::too_many_arguments)] @@ -449,7 +450,7 @@ fn read_partition( let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); let mapped_projections = - map_projections(&file_schema, &arrow_reader.get_schema()?, projection); + map_projections(&file_schema, &arrow_reader.get_schema()?, projection)?; let mut batch_reader = arrow_reader.get_record_reader_by_columns(mapped_projections, batch_size)?; @@ -462,30 +463,27 @@ fn read_partition( let batch_schema = batch.schema(); - let merged_batch = if batch.columns().len() < projection.len() { - let mut cols: Vec = Vec::with_capacity(*total_cols); - let batch_cols = batch.columns().to_vec(); - - for field_idx in projection { - let merged_field = &file_schema.fields()[*field_idx]; - if let Some((batch_idx, _name)) = batch_schema - .column_with_name(merged_field.name().as_str()) - { - cols.push(batch_cols[batch_idx].clone()); - } else { - cols.push(new_null_array( - merged_field.data_type(), - batch_rows, - )) - } + let mut cols: Vec = Vec::with_capacity(*total_cols); + let batch_cols = batch.columns().to_vec(); + + for field_idx in projection { + let merged_field = &file_schema.fields()[*field_idx]; + if let Some((batch_idx, _name)) = + batch_schema.column_with_name(merged_field.name().as_str()) + { + cols.push(batch_cols[batch_idx].clone()); + } else { + cols.push(new_null_array( + merged_field.data_type(), + batch_rows, + )) } + } - let projected_schema = file_schema.clone().project(projection)?; + let projected_schema = file_schema.clone().project(projection)?; - RecordBatch::try_new(Arc::new(projected_schema), cols)? - } else { - batch - }; + let merged_batch = + RecordBatch::try_new(Arc::new(projected_schema), cols)?; let proj_batch = partition_column_projector .project(merged_batch, &partitioned_file.partition_values); @@ -520,34 +518,52 @@ fn read_partition( #[cfg(test)] mod tests { - use crate::{datasource::{ - file_format::{parquet::ParquetFormat, FileFormat}, - object_store::local::{ - local_object_reader_stream, local_unpartitioned_file, LocalFileSystem, + use crate::{ + assert_batches_sorted_eq, + datasource::{ + file_format::{parquet::ParquetFormat, FileFormat}, + object_store::local::{ + local_object_reader_stream, local_unpartitioned_file, LocalFileSystem, + }, }, - }, assert_batches_sorted_eq, physical_plan::collect}; + physical_plan::collect, + }; use super::*; - use arrow::{datatypes::{DataType, Field}, array::{StringArray, Int8Array, Int64Array}}; + use arrow::{ + array::{Int64Array, Int8Array, StringArray}, + datatypes::{DataType, Field}, + }; use futures::StreamExt; use parquet::{ + arrow::ArrowWriter, basic::Type as PhysicalType, - file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics, properties::WriterProperties}, - schema::types::SchemaDescPtr, arrow::ArrowWriter, + file::{ + metadata::RowGroupMetaData, properties::WriterProperties, + statistics::Statistics as ParquetStatistics, + }, + schema::types::SchemaDescPtr, }; - /// writes each RecordBatch as an individual parquet file and then /// reads it back in to the named location. 
- async fn round_trip_to_parquet(batches: Vec, projection: Option>) -> Vec { + async fn round_trip_to_parquet( + batches: Vec, + projection: Option>, + schema: Option, + ) -> Vec { // When vec is dropped, temp files are deleted - let files: Vec<_> = batches.into_iter() + let files: Vec<_> = batches + .into_iter() .map(|batch| { let output = tempfile::NamedTempFile::new().expect("creating temp file"); let props = WriterProperties::builder().build(); - let file: std::fs::File = (*output.as_file()).try_clone().expect("cloning file descriptor"); - let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).expect("creating writer"); + let file: std::fs::File = (*output.as_file()) + .try_clone() + .expect("cloning file descriptor"); + let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)) + .expect("creating writer"); writer.write(&batch).expect("Writing batch"); writer.close().unwrap(); @@ -555,20 +571,25 @@ mod tests { }) .collect(); - let file_names: Vec<_> = files.iter() + let file_names: Vec<_> = files + .iter() .map(|t| t.path().to_string_lossy().to_string()) .collect(); // Now, read the files back in - let file_groups: Vec<_> = file_names.iter() + let file_groups: Vec<_> = file_names + .iter() .map(|name| local_unpartitioned_file(name.clone())) .collect(); - // Infer the schema - let file_schema = ParquetFormat::default() - .infer_schema(local_object_reader_stream(file_names)) - .await - .expect("inferring schema"); + // Infer the schema (if not provided) + let file_schema = match schema { + Some(provided_schema) => provided_schema, + None => ParquetFormat::default() + .infer_schema(local_object_reader_stream(file_names)) + .await + .expect("inferring schema"), + }; // prepare the scan let parquet_exec = ParquetExec::new( @@ -585,13 +606,17 @@ mod tests { ); let runtime = Arc::new(RuntimeEnv::default()); - collect(Arc::new(parquet_exec) , runtime) - .await - .expect("reading parquet data") + collect(Arc::new(parquet_exec), runtime) + .await + .expect("reading parquet data") } // Add a new column with the specified field name to the RecordBatch - fn add_to_batch(batch: &RecordBatch, field_name: &str, array: ArrayRef) -> RecordBatch { + fn add_to_batch( + batch: &RecordBatch, + field_name: &str, + array: ArrayRef, + ) -> RecordBatch { let mut fields = batch.schema().fields().clone(); fields.push(Field::new(field_name, array.data_type().clone(), true)); let schema = Arc::new(Schema::new(fields)); @@ -601,11 +626,23 @@ mod tests { RecordBatch::try_new(schema, columns).expect("error; creating record batch") } + fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch { + columns.into_iter().fold( + RecordBatch::new_empty(Arc::new(Schema::new(vec![]))), + |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()), + ) + } + #[tokio::test] async fn evolved_schema() { - let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); + let c1: ArrayRef = + Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); // batch1: c1(string) - let batch1 = add_to_batch(&RecordBatch::new_empty(Arc::new(Schema::new(vec![]))), "c1", c1); + let batch1 = add_to_batch( + &RecordBatch::new_empty(Arc::new(Schema::new(vec![]))), + "c1", + c1, + ); // batch2: c1(string) and c2(int64) let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); @@ -616,7 +653,7 @@ mod tests { let batch3 = add_to_batch(&batch1, "c3", c3); // read/write them files: - let read = round_trip_to_parquet(vec![batch1, batch2, batch3], 
None).await; + let read = round_trip_to_parquet(vec![batch1, batch2, batch3], None, None).await; let expected = vec![ "+-----+----+----+", "| c1 | c2 | c3 |", @@ -635,70 +672,157 @@ mod tests { assert_batches_sorted_eq!(expected, &read); } + #[tokio::test] + async fn evolved_schema_inconsistent_order() { + let c1: ArrayRef = + Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); + + let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); + + let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None])); + + // batch1: c1(string), c2(int64), c3(int8) + let batch1 = create_batch(vec![ + ("c1", c1.clone()), + ("c2", c2.clone()), + ("c3", c3.clone()), + ]); + + // batch2: c3(int8), c2(int64), c1(string) + let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]); + + // read/write them files: + let read = round_trip_to_parquet(vec![batch1, batch2], None, None).await; + let expected = vec![ + "+-----+----+----+", + "| c1 | c2 | c3 |", + "+-----+----+----+", + "| Foo | 1 | 10 |", + "| | 2 | 20 |", + "| bar | | |", + "| Foo | 1 | 10 |", + "| | 2 | 20 |", + "| bar | | |", + "+-----+----+----+", + ]; + assert_batches_sorted_eq!(expected, &read); + } #[tokio::test] - async fn parquet_exec_with_evolved_schema() -> Result<()> { + async fn evolved_schema_intersection() { + let c1: ArrayRef = + Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); + let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); - let runtime = Arc::new(RuntimeEnv::default()); - let testdata = crate::test_util::parquet_test_data(); - let part1 = format!("{}/schema_evolution/part1.parquet", testdata); - let part2 = format!("{}/schema_evolution/part2.parquet", testdata); - let part3 = format!("{}/schema_evolution/part3.parquet", testdata); - let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store: Arc::new(LocalFileSystem {}), - file_groups: vec![vec![ - local_unpartitioned_file(part1.clone()), - local_unpartitioned_file(part2.clone()), - local_unpartitioned_file(part3.clone()), - ]], - file_schema: ParquetFormat::default() - .infer_schema(local_object_reader_stream(vec![part1, part2, part3])) - .await?, - statistics: Statistics::default(), - projection: Some(vec![0, 1, 4]), - limit: None, - table_partition_cols: vec![], - }, - None, - ); + let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None])); - let mut results = parquet_exec.execute(0, runtime).await?; - let batch1 = results.next().await.unwrap()?; + // batch1: c1(string), c2(int64), c3(int8) + let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]); - assert_eq!(8, batch1.num_rows()); - assert_eq!(3, batch1.num_columns()); + // batch2: c3(int8), c2(int64), c1(string) + let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]); - let schema = batch1.schema(); - let field_names: Vec<&str> = - schema.fields().iter().map(|f| f.name().as_str()).collect(); - assert_eq!(vec!["id", "bool_col", "string_col"], field_names); + // read/write them files: + let read = round_trip_to_parquet(vec![batch1, batch2], None, None).await; + let expected = vec![ + "+-----+----+----+", + "| c1 | c3 | c2 |", + "+-----+----+----+", + "| Foo | 10 | |", + "| | 20 | |", + "| bar | | |", + "| | 10 | 1 |", + "| | 20 | 2 |", + "| | | |", + "+-----+----+----+", + ]; + assert_batches_sorted_eq!(expected, &read); + } - let batch2 = results.next().await.unwrap()?; + #[tokio::test] + async fn evolved_schema_projection() { + let c1: ArrayRef = + 
Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); - assert_eq!(8, batch2.num_rows()); - assert_eq!(3, batch2.num_columns()); + let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); - let schema = batch2.schema(); - let field_names: Vec<&str> = - schema.fields().iter().map(|f| f.name().as_str()).collect(); - assert_eq!(vec!["id", "bool_col", "string_col"], field_names); + let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None])); - let batch3 = results.next().await.unwrap()?; + let c4: ArrayRef = + Arc::new(StringArray::from(vec![Some("baz"), Some("boo"), None])); - assert_eq!(8, batch3.num_rows()); - assert_eq!(3, batch3.num_columns()); + // batch1: c1(string), c2(int64), c3(int8) + let batch1 = create_batch(vec![ + ("c1", c1.clone()), + ("c2", c2.clone()), + ("c3", c3.clone()), + ]); - let schema = batch3.schema(); - let field_names: Vec<&str> = - schema.fields().iter().map(|f| f.name().as_str()).collect(); - assert_eq!(vec!["id", "bool_col", "string_col"], field_names); + // batch2: c3(int8), c2(int64), c1(string), c4(string) + let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1), ("c4", c4)]); - let batch = results.next().await; - assert!(batch.is_none()); + // read/write them files: + let read = + round_trip_to_parquet(vec![batch1, batch2], Some(vec![0, 3]), None).await; + let expected = vec![ + "+-----+-----+", + "| c1 | c4 |", + "+-----+-----+", + "| Foo | baz |", + "| | boo |", + "| bar | |", + "| Foo | |", + "| | |", + "| bar | |", + "+-----+-----+", + ]; + assert_batches_sorted_eq!(expected, &read); + } - Ok(()) + #[tokio::test] + async fn evolved_schema_incompatible_types() { + let c1: ArrayRef = + Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); + + let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); + + let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None])); + + let c4: ArrayRef = + Arc::new(StringArray::from(vec![Some("baz"), Some("boo"), None])); + + // batch1: c1(string), c2(int64), c3(int8) + let batch1 = create_batch(vec![ + ("c1", c1.clone()), + ("c2", c2.clone()), + ("c3", c3.clone()), + ]); + + // batch2: c3(int8), c2(int64), c1(string), c4(string) + let batch2 = create_batch(vec![("c3", c4), ("c2", c2), ("c1", c1)]); + + let schema = Schema::new(vec![ + Field::new("c1", DataType::Utf8, true), + Field::new("c2", DataType::Int64, true), + Field::new("c3", DataType::Int8, true), + ]); + + // read/write them files: + let read = + round_trip_to_parquet(vec![batch1, batch2], None, Some(Arc::new(schema))) + .await; + // expect on the first batch to be read + let expected = vec![ + "+-----+----+----+", + "| c1 | c2 | c3 |", + "+-----+----+----+", + "| Foo | 1 | 10 |", + "| | 2 | 20 |", + "| bar | | |", + "+-----+----+----+", + ]; + assert_batches_sorted_eq!(expected, &read); } #[tokio::test] From 012bdbcc989bb1fcdd2cb9411c189e8595316193 Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Fri, 21 Jan 2022 15:24:15 -0500 Subject: [PATCH 07/13] Remove redundant test case and revert submodule changes --- datafusion/src/datasource/file_format/parquet.rs | 14 -------------- parquet-testing | 2 +- 2 files changed, 1 insertion(+), 15 deletions(-) diff --git a/datafusion/src/datasource/file_format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs index ab8121d38c829..0f65eecec62d7 100644 --- a/datafusion/src/datasource/file_format/parquet.rs +++ b/datafusion/src/datasource/file_format/parquet.rs @@ -374,20 +374,6 @@ mod tests { }; use 
futures::StreamExt; - #[tokio::test] - async fn test_merge_schema() -> Result<()> { - let testdata = crate::test_util::parquet_test_data(); - let filename = format!("{}/{}", testdata, "schema_evolution"); - let opt = ListingOptions::new(Arc::new(ParquetFormat::default())); - let schema = opt - .infer_schema(Arc::new(LocalFileSystem {}), &filename) - .await?; - - assert_eq!(schema.fields().len(), 5); - - Ok(()) - } - #[tokio::test] async fn read_small_batches() -> Result<()> { let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new().with_batch_size(2))?); diff --git a/parquet-testing b/parquet-testing index 972fda30d5562..ddd898958803c 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit 972fda30d5562074458a1d1f731e7044992581ca +Subproject commit ddd898958803cb89b7156c6350584d1cda0fe8de From 8acae80d624f6803078a1a1eced17a6e4c30c17d Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Fri, 21 Jan 2022 15:41:32 -0500 Subject: [PATCH 08/13] Linting --- datafusion/src/datasource/file_format/parquet.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/datasource/file_format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs index 0f65eecec62d7..f170310d0eb22 100644 --- a/datafusion/src/datasource/file_format/parquet.rs +++ b/datafusion/src/datasource/file_format/parquet.rs @@ -366,7 +366,7 @@ mod tests { }; use super::*; - use crate::datasource::listing::ListingOptions; + use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use arrow::array::{ BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array, From c0593bb9de5c49e492fda0b09c4df4ca0e5ef3ec Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Fri, 21 Jan 2022 16:05:02 -0500 Subject: [PATCH 09/13] Linting --- datafusion/src/datasource/file_format/parquet.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/datasource/file_format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs index f170310d0eb22..4afb2f54c3abc 100644 --- a/datafusion/src/datasource/file_format/parquet.rs +++ b/datafusion/src/datasource/file_format/parquet.rs @@ -366,7 +366,7 @@ mod tests { }; use super::*; - + use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use arrow::array::{ BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array, From 4a88f4fa38d9179e6d7f1506c851464cb2c1172c Mon Sep 17 00:00:00 2001 From: Dan Harris <1327726+thinkharderdev@users.noreply.github.com> Date: Sat, 22 Jan 2022 09:56:43 -0500 Subject: [PATCH 10/13] Using a info message (as depending on the usecase this may signal a user error rather than something to investigate) Using a DataFusionError rather than one from Parquet (the rationale being that this error comes from DataFusion, and is not related to being able to read the parquet files) Co-authored-by: Andrew Lamb --- datafusion/src/physical_plan/file_format/parquet.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index dcf28cf7b4583..5455aba00d90a 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -404,8 +404,8 @@ fn map_projections( mapped.push(mapped_idx) } else { let msg = format!("Failed to map column projection for field {}. 
Incompatible data types {:?} and {:?}", field.name(), file_schema.field(mapped_idx).data_type(), field.data_type()); - error!("{}", msg); - return Err(DataFusionError::ParquetError(ParquetError::General(msg))); + info!("{}", msg); + return Err(DataFusionError::Execution(msg)); } } } From 3e67937c41e222432a7d2bb74cd4cc34b032bd09 Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Sat, 22 Jan 2022 10:08:03 -0500 Subject: [PATCH 11/13] PR comments: Clarify incompatible schema test --- .../src/physical_plan/file_format/parquet.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index 5455aba00d90a..833c0699d9ed6 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -44,7 +44,7 @@ use arrow::{ error::{ArrowError, Result as ArrowResult}, record_batch::RecordBatch, }; -use log::{debug, error}; +use log::{debug, info}; use parquet::file::{ metadata::RowGroupMetaData, reader::{FileReader, SerializedFileReader}, @@ -62,7 +62,7 @@ use tokio::{ use crate::execution::runtime_env::RuntimeEnv; use async_trait::async_trait; -use parquet::errors::ParquetError; + use super::PartitionColumnProjector; @@ -530,6 +530,7 @@ mod tests { }; use super::*; + use arrow::array::{Float32Array}; use arrow::{ array::{Int64Array, Int8Array, StringArray}, datatypes::{DataType, Field}, @@ -789,8 +790,11 @@ mod tests { let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None])); - let c4: ArrayRef = - Arc::new(StringArray::from(vec![Some("baz"), Some("boo"), None])); + let c4: ArrayRef = Arc::new(Float32Array::from(vec![ + Some(1.0 as f32), + Some(2.0 as f32), + None, + ])); // batch1: c1(string), c2(int64), c3(int8) let batch1 = create_batch(vec![ @@ -812,7 +816,8 @@ mod tests { let read = round_trip_to_parquet(vec![batch1, batch2], None, Some(Arc::new(schema))) .await; - // expect on the first batch to be read + + // expect only the first batch to be read let expected = vec![ "+-----+----+----+", "| c1 | c2 | c3 |", From ae5ae7ef7115032539fc86fa0fa682acb457e4ff Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Sat, 22 Jan 2022 10:14:44 -0500 Subject: [PATCH 12/13] Linting --- datafusion/src/physical_plan/file_format/parquet.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index 833c0699d9ed6..fe34a8b6b63f8 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -63,7 +63,6 @@ use tokio::{ use crate::execution::runtime_env::RuntimeEnv; use async_trait::async_trait; - use super::PartitionColumnProjector; /// Execution plan for scanning one or more Parquet partitions @@ -530,7 +529,7 @@ mod tests { }; use super::*; - use arrow::array::{Float32Array}; + use arrow::array::Float32Array; use arrow::{ array::{Int64Array, Int8Array, StringArray}, datatypes::{DataType, Field}, From f95008051b573881d58f49c97280f958ccee801c Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Sat, 22 Jan 2022 12:09:21 -0500 Subject: [PATCH 13/13] Clippy --- datafusion/src/physical_plan/file_format/parquet.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index fe34a8b6b63f8..06b597c8ae7fb 100644 --- 
a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -789,11 +789,8 @@ mod tests {
let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
- let c4: ArrayRef = Arc::new(Float32Array::from(vec![
- Some(1.0 as f32),
- Some(2.0 as f32),
- None,
- ]));
+ let c4: ArrayRef =
+ Arc::new(Float32Array::from(vec![Some(1.0_f32), Some(2.0_f32), None]));
// batch1: c1(string), c2(int64), c3(int8)
let batch1 = create_batch(vec![