diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index ff0812ce4937d..9705225c24c7b 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -27,7 +27,10 @@ pub(crate) mod test_util {
     use crate::test::object_store::local_unpartitioned_file;
 
-    /// Writes `batches` to a temporary parquet file
+    /// Writes each `batch` to at least one temporary parquet file
+    ///
+    /// For example, if `batches` contains 2 batches, the function will create
+    /// 2 temporary files, each containing the contents of one batch
     ///
     /// If multi_page is set to `true`, the parquet file(s) are written
     /// with 2 rows per data page (used to test page filtering and
@@ -52,7 +55,7 @@
             }
         }
 
-        // we need the tmp files to be sorted as some tests rely on the how the returning files are ordered
+        // we need the tmp files to be sorted as some tests rely on the returned file ordering
         // https://github.com/apache/datafusion/pull/6629
         let tmp_files = {
            let mut tmp_files: Vec<_> = (0..batches.len())
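
The one-file-per-batch behavior documented above can be sketched standalone. This is a minimal illustration, not the helper's actual implementation; it assumes the `arrow`, `parquet`, and `tempfile` crates, and the function name `write_one_file_per_batch` is hypothetical:

```rust
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use tempfile::NamedTempFile;

/// Illustrative sketch: write each batch to its own temporary parquet file
fn write_one_file_per_batch(batches: Vec<RecordBatch>) -> Vec<NamedTempFile> {
    batches
        .into_iter()
        .map(|batch| {
            let file = NamedTempFile::new().expect("create temp file");
            let out = file.reopen().expect("reopen temp file");
            // One writer (and therefore one parquet file) per batch
            let mut writer = ArrowWriter::try_new(out, batch.schema(), None)
                .expect("create parquet writer");
            writer.write(&batch).expect("write batch");
            writer.close().expect("close writer");
            file
        })
        .collect()
}
```
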
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index fb4eb13db1cf8..8dee79ad61b23 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -96,7 +96,11 @@ mod tests {
     #[derive(Debug, Default)]
     struct RoundTrip {
         projection: Option<Vec<usize>>,
-        schema: Option<SchemaRef>,
+        /// Optional logical table schema to use when reading the parquet files
+        ///
+        /// If `None`, the logical schema to use will be inferred from the
+        /// original data via [`Schema::try_merge`]
+        table_schema: Option<SchemaRef>,
         predicate: Option<Expr>,
         pushdown_predicate: bool,
         page_index_predicate: bool,
@@ -113,8 +117,11 @@
             self
         }
 
-        fn with_schema(mut self, schema: SchemaRef) -> Self {
-            self.schema = Some(schema);
+        /// Specify the table schema.
+        ///
+        /// See [`Self::table_schema`] for more details
+        fn with_table_schema(mut self, schema: SchemaRef) -> Self {
+            self.table_schema = Some(schema);
             self
         }
@@ -146,12 +153,12 @@
             self.round_trip(batches).await.batches
         }
 
-        fn build_file_source(&self, file_schema: SchemaRef) -> Arc<dyn FileSource> {
+        fn build_file_source(&self, table_schema: SchemaRef) -> Arc<dyn FileSource> {
             // set up predicate (this is normally done by a layer higher up)
             let predicate = self
                 .predicate
                 .as_ref()
-                .map(|p| logical2physical(p, &file_schema));
+                .map(|p| logical2physical(p, &table_schema));
 
             let mut source = ParquetSource::default();
             if let Some(predicate) = predicate {
@@ -178,7 +185,7 @@
                 source = source.with_bloom_filter_on_read(false);
             }
 
-            source.with_schema(Arc::clone(&file_schema))
+            source.with_schema(Arc::clone(&table_schema))
         }
 
         fn build_parquet_exec(
@@ -199,8 +206,14 @@
         }
 
         /// run the test, returning the `RoundTripResult`
+        ///
+        /// Each input batch is written into one or more parquet files (and thus
+        /// the files could potentially have different schemas). The resulting
+        /// parquet files are then read back and the filters are applied to the data
        async fn round_trip(&self, batches: Vec<RecordBatch>) -> RoundTripResult {
-            let file_schema = match &self.schema {
+            // If table_schema is not set, we need to merge the schemas of the
+            // input batches to get a unified schema.
+            let table_schema = match &self.table_schema {
                 Some(schema) => schema,
                 None => &Arc::new(
                     Schema::try_merge(
@@ -209,7 +222,6 @@
                     .unwrap(),
                 ),
             };
-            let file_schema = Arc::clone(file_schema);
             // If testing with page_index_predicate, write parquet
             // files with multiple pages
             let multi_page = self.page_index_predicate;
@@ -217,9 +229,9 @@
             let file_group: FileGroup = meta.into_iter().map(Into::into).collect();
 
             // build a ParquetExec to return the results
-            let parquet_source = self.build_file_source(file_schema.clone());
+            let parquet_source = self.build_file_source(Arc::clone(table_schema));
             let parquet_exec = self.build_parquet_exec(
-                file_schema.clone(),
+                Arc::clone(table_schema),
                 file_group.clone(),
                 Arc::clone(&parquet_source),
             );
@@ -229,9 +241,9 @@
                 false,
                 // use a new ParquetSource to avoid sharing execution metrics
                 self.build_parquet_exec(
-                    file_schema.clone(),
+                    Arc::clone(table_schema),
                     file_group.clone(),
-                    self.build_file_source(file_schema.clone()),
+                    self.build_file_source(Arc::clone(table_schema)),
                 ),
                 Arc::new(Schema::new(vec![
                     Field::new("plan_type", DataType::Utf8, true),
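
For context on the fallback above: `Schema::try_merge` (from the `arrow` crate) unions the fields of the per-batch schemas into one table schema. A minimal sketch, with illustrative field names:

```rust
use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    let batch1_schema = Schema::new(vec![Field::new("c1", DataType::Utf8, true)]);
    let batch2_schema = Schema::new(vec![Field::new("c2", DataType::Int64, true)]);
    // try_merge unions the fields; it returns an error if the same field name
    // appears with incompatible types (exercised by the error test further below)
    let table_schema = Schema::try_merge(vec![batch1_schema, batch2_schema]).unwrap();
    assert_eq!(table_schema.fields().len(), 2); // c1 and c2
}
```
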
@@ -304,7 +316,7 @@
         // Thus this predicate will come back as false.
         let filter = col("c2").eq(lit(1_i32));
         let rt = RoundTrip::new()
-            .with_schema(table_schema.clone())
+            .with_table_schema(table_schema.clone())
             .with_predicate(filter.clone())
             .with_pushdown_predicate()
             .round_trip(vec![batch.clone()])
@@ -323,7 +335,7 @@
         // If we excplicitly allow nulls the rest of the predicate should work
         let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
         let rt = RoundTrip::new()
-            .with_schema(table_schema.clone())
+            .with_table_schema(table_schema.clone())
             .with_predicate(filter.clone())
             .with_pushdown_predicate()
             .round_trip(vec![batch.clone()])
@@ -362,7 +374,7 @@
         // Thus this predicate will come back as false.
         let filter = col("c2").eq(lit("abc"));
         let rt = RoundTrip::new()
-            .with_schema(table_schema.clone())
+            .with_table_schema(table_schema.clone())
             .with_predicate(filter.clone())
             .with_pushdown_predicate()
             .round_trip(vec![batch.clone()])
@@ -381,7 +393,7 @@
         // If we excplicitly allow nulls the rest of the predicate should work
         let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
         let rt = RoundTrip::new()
-            .with_schema(table_schema.clone())
+            .with_table_schema(table_schema.clone())
             .with_predicate(filter.clone())
             .with_pushdown_predicate()
             .round_trip(vec![batch.clone()])
@@ -424,7 +436,7 @@
         // Thus this predicate will come back as false.
         let filter = col("c2").eq(lit("abc"));
         let rt = RoundTrip::new()
-            .with_schema(table_schema.clone())
+            .with_table_schema(table_schema.clone())
             .with_predicate(filter.clone())
             .with_pushdown_predicate()
             .round_trip(vec![batch.clone()])
@@ -443,7 +455,7 @@
         // If we excplicitly allow nulls the rest of the predicate should work
         let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
         let rt = RoundTrip::new()
-            .with_schema(table_schema.clone())
+            .with_table_schema(table_schema.clone())
             .with_predicate(filter.clone())
             .with_pushdown_predicate()
             .round_trip(vec![batch.clone()])
@@ -486,7 +498,7 @@ mod tests {
         // Thus this predicate will come back as false.
         let filter = col("c2").eq(lit("abc"));
         let rt = RoundTrip::new()
-            .with_schema(table_schema.clone())
+            .with_table_schema(table_schema.clone())
             .with_predicate(filter.clone())
             .with_pushdown_predicate()
             .round_trip(vec![batch.clone()])
@@ -505,7 +517,7 @@
         // If we excplicitly allow nulls the rest of the predicate should work
         let filter = col("c2").is_null().and(col("c3").eq(lit(7_i32)));
         let rt = RoundTrip::new()
-            .with_schema(table_schema.clone())
+            .with_table_schema(table_schema.clone())
             .with_predicate(filter.clone())
             .with_pushdown_predicate()
             .round_trip(vec![batch.clone()])
@@ -553,7 +565,7 @@
             .and(col("c3").eq(lit(10_i32)).or(col("c2").is_null()));
 
         let rt = RoundTrip::new()
-            .with_schema(table_schema.clone())
+            .with_table_schema(table_schema.clone())
             .with_predicate(filter.clone())
             .with_pushdown_predicate()
             .round_trip(vec![batch.clone()])
@@ -583,7 +595,7 @@
             .or(col("c3").gt(lit(20_i32)).and(col("c2").is_null()));
 
         let rt = RoundTrip::new()
-            .with_schema(table_schema)
+            .with_table_schema(table_schema)
             .with_predicate(filter.clone())
             .with_pushdown_predicate()
             .round_trip(vec![batch])
@@ -871,13 +883,15 @@
             Arc::new(StringViewArray::from(vec![Some("foo"), Some("bar")]));
         let batch = create_batch(vec![("c1", c1.clone())]);
 
-        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, false)]));
+        // Table schema is Utf8 but file schema is StringView
+        let table_schema =
+            Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, false)]));
 
         // Predicate should prune all row groups
         let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("aaa".to_string()))));
         let rt = RoundTrip::new()
             .with_predicate(filter)
-            .with_schema(schema.clone())
+            .with_table_schema(table_schema.clone())
             .round_trip(vec![batch.clone()])
             .await;
         // There should be no predicate evaluation errors
@@ -890,7 +904,7 @@
         let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("foo".to_string()))));
         let rt = RoundTrip::new()
             .with_predicate(filter)
-            .with_schema(schema)
+            .with_table_schema(table_schema)
             .round_trip(vec![batch])
             .await;
         // There should be no predicate evaluation errors
@@ -912,14 +926,14 @@
         let c1: ArrayRef = Arc::new(Int8Array::from(vec![Some(1), Some(2)]));
         let batch = create_batch(vec![("c1", c1.clone())]);
 
-        let schema =
+        let table_schema =
             Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt64, false)]));
 
         // Predicate should prune all row groups
         let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(5))));
         let rt = RoundTrip::new()
             .with_predicate(filter)
-            .with_schema(schema.clone())
+            .with_table_schema(table_schema.clone())
             .round_trip(vec![batch.clone()])
             .await;
         // There should be no predicate evaluation errors
@@ -931,7 +945,7 @@
         let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(1))));
         let rt = RoundTrip::new()
             .with_predicate(filter)
-            .with_schema(schema)
+            .with_table_schema(table_schema)
             .round_trip(vec![batch])
             .await;
         // There should be no predicate evaluation errors
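
The two tests above (Utf8 table schema over StringView files, UInt64 table schema over Int8 files) rely on file columns being adapted to the logical table schema. DataFusion does this through its schema-mapping machinery; the underlying idea can be sketched with arrow's `cast` kernel. This is a simplification of that machinery, not the actual adapter code:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int8Array};
use arrow::compute::cast;
use arrow::datatypes::DataType;

fn main() {
    // The file column was written as Int8, but the table schema says UInt64
    let file_col: ArrayRef = Arc::new(Int8Array::from(vec![Some(1), Some(2)]));
    // Adapt the physical file data to the logical table type
    let table_col = cast(&file_col, &DataType::UInt64).unwrap();
    assert_eq!(table_col.data_type(), &DataType::UInt64);
}
```
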
@@ -1183,7 +1197,7 @@
         // batch2: c3(int8), c2(int64), c1(string), c4(string)
         let batch2 = create_batch(vec![("c3", c4), ("c2", c2), ("c1", c1)]);
 
-        let schema = Schema::new(vec![
+        let table_schema = Schema::new(vec![
             Field::new("c1", DataType::Utf8, true),
             Field::new("c2", DataType::Int64, true),
             Field::new("c3", DataType::Int8, true),
@@ -1191,7 +1205,7 @@
         // read/write them files:
         let read = RoundTrip::new()
-            .with_schema(Arc::new(schema))
+            .with_table_schema(Arc::new(table_schema))
             .round_trip_to_batches(vec![batch1, batch2])
             .await;
         assert_contains!(read.unwrap_err().to_string(),
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 0788ee87bad7a..f93792b7facc5 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -232,9 +232,15 @@ pub struct FileScanConfig {
 pub struct FileScanConfigBuilder {
     object_store_url: ObjectStoreUrl,
     /// Table schema before any projections or partition columns are applied.
-    /// This schema is used to read the files, but is **not** necessarily the schema of the physical files.
-    /// Rather this is the schema that the physical file schema will be mapped onto, and the schema that the
+    ///
+    /// This schema is used to read the files, but is **not** necessarily the
+    /// schema of the physical files. Rather, this is the schema that the
+    /// physical file schema will be mapped onto, and the schema that the
     /// [`DataSourceExec`] will return.
+    ///
+    /// This is usually the same as the table schema specified by the
+    /// `TableProvider`, minus any partition columns.
+    ///
+    /// This would probably be better named `table_schema`
     file_schema: SchemaRef,
     file_source: Arc<dyn FileSource>,
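
To make the `file_schema` vs. table schema distinction above concrete: the builder's schema is the provider's table schema with partition columns stripped, since partition values come from the file paths rather than the file contents. A minimal sketch; the `date` partition column is a made-up example, not taken from the PR:

```rust
use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // Table schema as a TableProvider might report it
    let table_schema = Schema::new(vec![
        Field::new("c1", DataType::Utf8, true),
        Field::new("date", DataType::Utf8, false), // hive-style partition column
    ]);
    // The builder's `file_schema`: the table schema minus partition columns
    let file_schema = Schema::new(
        table_schema
            .fields()
            .iter()
            .filter(|f| f.name() != "date")
            .cloned()
            .collect::<Vec<_>>(),
    );
    assert_eq!(file_schema.fields().len(), 1); // only "c1" is read from files
}
```
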