From 41e491ddcde2b6ae4b94c8144729cd4f64548f91 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 15 Feb 2022 15:48:46 -0500 Subject: [PATCH 1/5] Send parquet reader error back --- datafusion/src/physical_plan/file_format/parquet.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index 168a5c07644b2..72170d822ba8d 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -237,7 +237,7 @@ impl ExecutionPlan for ParquetExec { &projection, &pruning_predicate, batch_size, - response_tx, + response_tx.clone(), limit, partition_col_proj, ) { @@ -245,6 +245,12 @@ impl ExecutionPlan for ParquetExec { "Parquet reader thread terminated due to error: {:?} for files: {:?}", e, partition ); + // Send the error back to the main thread. + // + // Ignore error sending (via `.ok()`) because that + // means the receiver has been torn down (and nothing + // cares about the errors anymore) + send_result(&response_tx, Err(e.into())).ok(); } }); From 35c27b89b3c96240a02dd40ba18deea844976e92 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 15 Feb 2022 16:02:00 -0500 Subject: [PATCH 2/5] testi --- .../src/physical_plan/file_format/parquet.rs | 50 +++++++++++++++++-- 1 file changed, 47 insertions(+), 3 deletions(-) diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index 72170d822ba8d..ff0544dd609f6 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -523,11 +523,14 @@ mod tests { assert_batches_sorted_eq, datasource::{ file_format::{parquet::ParquetFormat, FileFormat}, - object_store::local::{ - local_object_reader_stream, local_unpartitioned_file, LocalFileSystem, + object_store::{ + local::{ + local_object_reader_stream, local_unpartitioned_file, LocalFileSystem, + }, + FileMeta, SizedFile, }, }, - physical_plan::collect, + physical_plan::collect, assert_contains, }; use super::*; @@ -928,6 +931,47 @@ mod tests { Ok(()) } + #[tokio::test] + async fn parquet_exec_with_error() -> Result<()> { + let runtime = Arc::new(RuntimeEnv::default()); + let testdata = crate::test_util::parquet_test_data(); + let filename = format!("{}/alltypes_plain.parquet", testdata); + let partitioned_file = PartitionedFile { + file_meta: FileMeta { + sized_file: SizedFile { + size: 1337, + path: "invalid".into(), + }, + last_modified: None, + }, + partition_values: vec![], + }; + + let parquet_exec = ParquetExec::new( + FileScanConfig { + object_store: Arc::new(LocalFileSystem {}), + file_groups: vec![vec![partitioned_file]], + file_schema: ParquetFormat::default() + .infer_schema(local_object_reader_stream(vec![filename])) + .await?, + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + }, + None, + ); + + let mut results = parquet_exec.execute(0, runtime).await?; + let batch = results.next().await.unwrap(); + // invalid file should produce an error to that effect + assert_contains!(batch.unwrap_err().to_string(), + "External error: Parquet error: Arrow: IO error: No such file or directory"); + assert!(results.next().await.is_none()); + + Ok(()) + } + fn parquet_file_metrics() -> ParquetFileMetrics { let metrics = Arc::new(ExecutionPlanMetricsSet::new()); ParquetFileMetrics::new(0, "file.parquet", &metrics) From 711b752ff5ee61e2556dfddb7ed6491b9cd4a363 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 15 Feb 2022 16:04:34 -0500 Subject: [PATCH 3/5] fmt --- datafusion/src/physical_plan/file_format/parquet.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index ff0544dd609f6..f1e80a8ab5deb 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -520,7 +520,7 @@ fn read_partition( #[cfg(test)] mod tests { use crate::{ - assert_batches_sorted_eq, + assert_batches_sorted_eq, assert_contains, datasource::{ file_format::{parquet::ParquetFormat, FileFormat}, object_store::{ @@ -530,7 +530,7 @@ mod tests { FileMeta, SizedFile, }, }, - physical_plan::collect, assert_contains, + physical_plan::collect, }; use super::*; @@ -965,8 +965,10 @@ mod tests { let mut results = parquet_exec.execute(0, runtime).await?; let batch = results.next().await.unwrap(); // invalid file should produce an error to that effect - assert_contains!(batch.unwrap_err().to_string(), - "External error: Parquet error: Arrow: IO error: No such file or directory"); + assert_contains!( + batch.unwrap_err().to_string(), + "External error: Parquet error: Arrow: IO error: No such file or directory" + ); assert!(results.next().await.is_none()); Ok(()) From 4a53954519da211119e0b1afb5857d881a45bf4c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 15 Feb 2022 16:23:57 -0500 Subject: [PATCH 4/5] Fix other test --- .../src/physical_plan/file_format/parquet.rs | 37 ++++++++----------- 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index f1e80a8ab5deb..441c33f1794ca 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -556,7 +556,7 @@ mod tests { batches: Vec, projection: Option>, schema: Option, - ) -> Vec { + ) -> Result> { // When vec is dropped, temp files are deleted let files: Vec<_> = batches .into_iter() @@ -611,9 +611,7 @@ mod tests { ); let runtime = Arc::new(RuntimeEnv::default()); - collect(Arc::new(parquet_exec), runtime) - .await - .expect("reading parquet data") + collect(Arc::new(parquet_exec), runtime).await } // Add a new column with the specified field name to the RecordBatch @@ -658,7 +656,9 @@ mod tests { let batch3 = add_to_batch(&batch1, "c3", c3); // read/write them files: - let read = round_trip_to_parquet(vec![batch1, batch2, batch3], None, None).await; + let read = round_trip_to_parquet(vec![batch1, batch2, batch3], None, None) + .await + .unwrap(); let expected = vec![ "+-----+----+----+", "| c1 | c2 | c3 |", @@ -697,7 +697,9 @@ mod tests { let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]); // read/write them files: - let read = round_trip_to_parquet(vec![batch1, batch2], None, None).await; + let read = round_trip_to_parquet(vec![batch1, batch2], None, None) + .await + .unwrap(); let expected = vec![ "+-----+----+----+", "| c1 | c2 | c3 |", @@ -729,7 +731,9 @@ mod tests { let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]); // read/write them files: - let read = round_trip_to_parquet(vec![batch1, batch2], None, None).await; + let read = round_trip_to_parquet(vec![batch1, batch2], None, None) + .await + .unwrap(); let expected = vec![ "+-----+----+----+", "| c1 | c3 | c2 |", @@ -768,8 +772,9 @@ mod tests { let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1), ("c4", c4)]); // read/write them files: - let read = - round_trip_to_parquet(vec![batch1, batch2], Some(vec![0, 3]), None).await; + let read = round_trip_to_parquet(vec![batch1, batch2], Some(vec![0, 3]), None) + .await + .unwrap(); let expected = vec![ "+-----+-----+", "| c1 | c4 |", @@ -817,18 +822,8 @@ mod tests { let read = round_trip_to_parquet(vec![batch1, batch2], None, Some(Arc::new(schema))) .await; - - // expect only the first batch to be read - let expected = vec![ - "+-----+----+----+", - "| c1 | c2 | c3 |", - "+-----+----+----+", - "| Foo | 1 | 10 |", - "| | 2 | 20 |", - "| bar | | |", - "+-----+----+----+", - ]; - assert_batches_sorted_eq!(expected, &read); + assert_contains!(read.unwrap_err().to_string(), + "Execution error: Failed to map column projection for field c3. Incompatible data types Float32 and Int8"); } #[tokio::test] From e0ff14ec022f9086a49a581810189b040fde72fa Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 15 Feb 2022 16:40:26 -0500 Subject: [PATCH 5/5] fix test on windows --- datafusion/src/physical_plan/file_format/parquet.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index 441c33f1794ca..1ae3012464e87 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -962,7 +962,7 @@ mod tests { // invalid file should produce an error to that effect assert_contains!( batch.unwrap_err().to_string(), - "External error: Parquet error: Arrow: IO error: No such file or directory" + "External error: Parquet error: Arrow: IO error" ); assert!(results.next().await.is_none());