From 40fcbe1506c7e64597f9354aec143b436645ed0f Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 28 Feb 2025 10:25:04 -0500
Subject: [PATCH 1/2] Minor: use FileScanConfig builder API

---
 .../tests/cases/roundtrip_physical_plan.rs    | 80 +++++++------------
 1 file changed, 30 insertions(+), 50 deletions(-)

diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index a8ee213653086..ab0bb39b83cb1 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -738,33 +738,23 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
     let mut options = TableParquetOptions::new();
     options.global.pushdown_filters = true;
 
-    let source = Arc::new(
+    let file_source = Arc::new(
         ParquetSource::new(options).with_predicate(Arc::clone(&file_schema), predicate),
     );
 
-    let scan_config = FileScanConfig {
-        object_store_url: ObjectStoreUrl::local_filesystem(),
-        file_schema,
-        file_groups: vec![vec![PartitionedFile::new(
-            "/path/to/file.parquet".to_string(),
-            1024,
-        )]],
-        constraints: Constraints::empty(),
-        statistics: Statistics {
-            num_rows: Precision::Inexact(100),
-            total_byte_size: Precision::Inexact(1024),
-            column_statistics: Statistics::unknown_column(&Arc::new(Schema::new(vec![
-                Field::new("col", DataType::Utf8, false),
-            ]))),
-        },
-        projection: None,
-        limit: None,
-        table_partition_cols: vec![],
-        output_ordering: vec![],
-        file_compression_type: FileCompressionType::UNCOMPRESSED,
-        new_lines_in_values: false,
-        file_source: source,
-    };
+    let scan_config =
+        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema, file_source)
+            .with_file_groups(vec![vec![PartitionedFile::new(
+                "/path/to/file.parquet".to_string(),
+                1024,
+            )]])
+            .with_statistics(Statistics {
+                num_rows: Precision::Inexact(100),
+                total_byte_size: Precision::Inexact(1024),
+                column_statistics: Statistics::unknown_column(&Arc::new(Schema::new(
+                    vec![Field::new("col", DataType::Utf8, false)],
+                ))),
+            });
 
     roundtrip_test(scan_config.build())
 }
@@ -777,9 +767,9 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> {
         vec![wrap_partition_value_in_dict(ScalarValue::Int64(Some(0)))];
     let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
 
-    let source = Arc::new(ParquetSource::default());
+    let file_source = Arc::new(ParquetSource::default());
     let scan_config =
-        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, source)
+        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, file_source)
             .with_projection(Some(vec![0, 1]))
             .with_file_group(vec![file_group])
             .with_table_partition_cols(vec![Field::new(
@@ -801,34 +791,24 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> {
         inner: Arc::new(Column::new("col", 1)),
     });
 
-    let source = Arc::new(
+    let file_source = Arc::new(
         ParquetSource::default()
             .with_predicate(Arc::clone(&file_schema), custom_predicate_expr),
     );
 
-    let scan_config = FileScanConfig {
-        object_store_url: ObjectStoreUrl::local_filesystem(),
-        file_schema,
-        file_groups: vec![vec![PartitionedFile::new(
-            "/path/to/file.parquet".to_string(),
-            1024,
-        )]],
-        constraints: Constraints::empty(),
-        statistics: Statistics {
-            num_rows: Precision::Inexact(100),
-            total_byte_size: Precision::Inexact(1024),
-            column_statistics: Statistics::unknown_column(&Arc::new(Schema::new(vec![
-                Field::new("col", DataType::Utf8, false),
-            ]))),
-        },
-        projection: None,
-        limit: None,
-        table_partition_cols: vec![],
-        output_ordering: vec![],
-        file_compression_type: FileCompressionType::UNCOMPRESSED,
-        new_lines_in_values: false,
-        file_source: source,
-    };
+    let scan_config =
+        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema, file_source)
+            .with_file_groups(vec![vec![PartitionedFile::new(
+                "/path/to/file.parquet".to_string(),
+                1024,
+            )]])
+            .with_statistics(Statistics {
+                num_rows: Precision::Inexact(100),
+                total_byte_size: Precision::Inexact(1024),
+                column_statistics: Statistics::unknown_column(&Arc::new(Schema::new(
+                    vec![Field::new("col", DataType::Utf8, false)],
+                ))),
+            });
 
     #[derive(Debug, Clone, Eq)]
     struct CustomPredicateExpr {

From b8363d9d41ffe9dcdb824c7f121fd79a055e5b1e Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 28 Feb 2025 10:30:32 -0500
Subject: [PATCH 2/2] moar

---
 .../tests/cases/roundtrip_physical_plan.rs    | 33 ++++++++-----------
 1 file changed, 13 insertions(+), 20 deletions(-)

diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index ab0bb39b83cb1..b5bfef99a6f39 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -41,7 +41,6 @@ use datafusion::arrow::compute::kernels::sort::SortOptions;
 use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema};
 use datafusion::datasource::empty::EmptyTable;
 use datafusion::datasource::file_format::csv::CsvSink;
-use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
 use datafusion::datasource::file_format::json::JsonSink;
 use datafusion::datasource::file_format::parquet::ParquetSink;
 use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
@@ -95,7 +94,7 @@ use datafusion_common::file_options::json_writer::JsonWriterOptions;
 use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::stats::Precision;
 use datafusion_common::{
-    internal_err, not_impl_err, Constraints, DataFusionError, Result, UnnestOptions,
+    internal_err, not_impl_err, DataFusionError, Result, UnnestOptions,
 };
 use datafusion_expr::{
     Accumulator, AccumulatorFactoryFunction, AggregateUDF, ColumnarValue, ScalarUDF,
@@ -1588,24 +1587,18 @@ async fn roundtrip_projection_source() -> Result<()> {
 
     let statistics = Statistics::new_unknown(&schema);
 
-    let source = ParquetSource::default().with_statistics(statistics.clone());
-    let scan_config = FileScanConfig {
-        object_store_url: ObjectStoreUrl::local_filesystem(),
-        file_groups: vec![vec![PartitionedFile::new(
-            "/path/to/file.parquet".to_string(),
-            1024,
-        )]],
-        constraints: Constraints::empty(),
-        statistics,
-        file_schema: schema.clone(),
-        projection: Some(vec![0, 1, 2]),
-        limit: None,
-        table_partition_cols: vec![],
-        output_ordering: vec![],
-        file_compression_type: FileCompressionType::UNCOMPRESSED,
-        new_lines_in_values: false,
-        file_source: source,
-    };
+    let file_source = ParquetSource::default().with_statistics(statistics.clone());
+    let scan_config = FileScanConfig::new(
+        ObjectStoreUrl::local_filesystem(),
+        schema.clone(),
+        file_source,
+    )
+    .with_file_groups(vec![vec![PartitionedFile::new(
+        "/path/to/file.parquet".to_string(),
+        1024,
+    )]])
+    .with_statistics(statistics)
+    .with_projection(Some(vec![0, 1, 2]));
 
     let filter = Arc::new(
         FilterExec::try_new(
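
Both patches apply one pattern: a `FileScanConfig { .. }` struct literal becomes
`FileScanConfig::new` plus chained `with_*` calls, so fields the literal had to
spell out explicitly (`limit: None`, `output_ordering: vec![]`, and so on) can
simply be omitted. Below is a minimal standalone sketch of that builder usage,
using only calls that appear in the diff; the import paths are assumptions
based on the DataFusion crate layout around the time of this change, not taken
from the patches themselves.

    use std::sync::Arc;

    // Assumed import paths; the diff only shows the call sites.
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::datasource::listing::PartitionedFile;
    use datafusion::datasource::object_store::ObjectStoreUrl;
    use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};

    // Hypothetical helper for illustration, not part of the patched tests.
    fn example_scan_config() -> FileScanConfig {
        let schema =
            Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
        let file_source = Arc::new(ParquetSource::default());

        // Required pieces (object store URL, schema, file source) go through
        // `new`; everything else is opt-in via `with_*` methods, which each
        // consume and return the config so calls can be chained.
        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, file_source)
            .with_file_groups(vec![vec![PartitionedFile::new(
                "/path/to/file.parquet".to_string(),
                1024,
            )]])
            .with_projection(Some(vec![0]))
    }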