diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_opener.rs index 96753c8c52608..d02aa9b3088cd 100644 --- a/datafusion-examples/examples/csv_opener.rs +++ b/datafusion-examples/examples/csv_opener.rs @@ -17,7 +17,6 @@ use std::{sync::Arc, vec}; -use datafusion::common::Statistics; use datafusion::{ assert_batches_eq, datasource::{ @@ -58,16 +57,11 @@ async fn main() -> Result<()> { let path = std::path::Path::new(&path).canonicalize()?; - let scan_config = FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_schema: schema.clone(), - file_groups: vec![vec![PartitionedFile::new(path.display().to_string(), 10)]], - statistics: Statistics::new_unknown(&schema), - projection: Some(vec![12, 0]), - limit: Some(5), - table_partition_cols: vec![], - output_ordering: vec![], - }; + let scan_config = + FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone()) + .with_projection(Some(vec![12, 0])) + .with_limit(Some(5)) + .with_file(PartitionedFile::new(path.display().to_string(), 10)); let result = FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new()) diff --git a/datafusion-examples/examples/json_opener.rs b/datafusion-examples/examples/json_opener.rs index ee33f969caa9f..e32fb9b096302 100644 --- a/datafusion-examples/examples/json_opener.rs +++ b/datafusion-examples/examples/json_opener.rs @@ -29,7 +29,6 @@ use datafusion::{ error::Result, physical_plan::metrics::ExecutionPlanMetricsSet, }; -use datafusion_common::Statistics; use futures::StreamExt; use object_store::ObjectStore; @@ -61,16 +60,11 @@ async fn main() -> Result<()> { Arc::new(object_store), ); - let scan_config = FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_schema: schema.clone(), - file_groups: vec![vec![PartitionedFile::new(path.to_string(), 10)]], - statistics: Statistics::new_unknown(&schema), - projection: Some(vec![1, 0]), - limit: Some(5), - table_partition_cols: vec![], - output_ordering: vec![], - }; + let scan_config = + FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone()) + .with_projection(Some(vec![1, 0])) + .with_limit(Some(5)) + .with_file(PartitionedFile::new(path.to_string(), 10)); let result = FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new()) diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 243a91b7437bd..7cc3421ebb480 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -154,16 +154,11 @@ pub(crate) mod test_util { let exec = format .create_physical_plan( state, - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_schema, - file_groups, - statistics, - projection, - limit, - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema) + .with_file_groups(file_groups) + .with_statistics(statistics) + .with_projection(projection) + .with_limit(limit), None, ) .await?; diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index cf70894806a31..746e4b8e3330a 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -805,16 +805,13 @@ impl TableProvider for ListingTable { .format .create_physical_plan( state, - FileScanConfig { - object_store_url, - file_schema: Arc::clone(&self.file_schema), 
- file_groups: partitioned_file_lists, - statistics, - projection: projection.cloned(), - limit, - output_ordering, - table_partition_cols, - }, + FileScanConfig::new(object_store_url, Arc::clone(&self.file_schema)) + .with_file_groups(partitioned_file_lists) + .with_statistics(statistics) + .with_projection(projection.cloned()) + .with_limit(limit) + .with_output_ordering(output_ordering) + .with_table_partition_cols(table_partition_cols), filters.as_ref(), ) .await diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index a8a29e9bbabe4..4bb8f28860ac5 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -271,16 +271,11 @@ mod tests { .infer_schema(&state, &store, &[meta.clone()]) .await?; - let avro_exec = AvroExec::new(FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: vec![vec![meta.into()]], - statistics: Statistics::new_unknown(&file_schema), - file_schema, - projection: Some(vec![0, 1, 2]), - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }); + let avro_exec = AvroExec::new( + FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema) + .with_file(meta.into()) + .with_projection(Some(vec![0, 1, 2])), + ); assert_eq!( avro_exec .properties() @@ -348,16 +343,11 @@ mod tests { // Include the missing column in the projection let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]); - let avro_exec = AvroExec::new(FileScanConfig { - object_store_url, - file_groups: vec![vec![meta.into()]], - statistics: Statistics::new_unknown(&file_schema), - file_schema, - projection, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }); + let avro_exec = AvroExec::new( + FileScanConfig::new(object_store_url, file_schema) + .with_file(meta.into()) + .with_projection(projection), + ); assert_eq!( avro_exec .properties() @@ -422,18 +412,19 @@ mod tests { let mut partitioned_file = PartitionedFile::from(meta); partitioned_file.partition_values = vec![ScalarValue::from("2021-10-26")]; - let avro_exec = AvroExec::new(FileScanConfig { - // select specific columns of the files as well as the partitioning - // column which is supposed to be the last column in the table schema. - projection: Some(vec![0, 1, file_schema.fields().len(), 2]), - object_store_url, - file_groups: vec![vec![partitioned_file]], - statistics: Statistics::new_unknown(&file_schema), - file_schema, - limit: None, - table_partition_cols: vec![Field::new("date", DataType::Utf8, false)], - output_ordering: vec![], - }); + let projection = Some(vec![0, 1, file_schema.fields().len(), 2]); + let avro_exec = AvroExec::new( + FileScanConfig::new(object_store_url, file_schema) + // select specific columns of the files as well as the partitioning + // column which is supposed to be the last column in the table schema. 
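+            // (projection index `file_schema.fields().len()` refers to the
+            // "date" partition column configured below)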
+ .with_projection(projection) + .with_file(partitioned_file) + .with_table_partition_cols(vec![Field::new( + "date", + DataType::Utf8, + false, + )]), + ); assert_eq!( avro_exec .properties() diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index a266b9b014e61..679f6c0109669 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -561,7 +561,7 @@ mod tests { tmp_dir.path(), )?; - let mut config = partitioned_csv_config(file_schema, file_groups)?; + let mut config = partitioned_csv_config(file_schema, file_groups); config.projection = Some(vec![0, 2, 4]); let csv = CsvExec::new( @@ -627,7 +627,7 @@ mod tests { tmp_dir.path(), )?; - let mut config = partitioned_csv_config(file_schema, file_groups)?; + let mut config = partitioned_csv_config(file_schema, file_groups); config.projection = Some(vec![4, 0, 2]); let csv = CsvExec::new( @@ -693,7 +693,7 @@ mod tests { tmp_dir.path(), )?; - let mut config = partitioned_csv_config(file_schema, file_groups)?; + let mut config = partitioned_csv_config(file_schema, file_groups); config.limit = Some(5); let csv = CsvExec::new( @@ -756,7 +756,7 @@ mod tests { tmp_dir.path(), )?; - let mut config = partitioned_csv_config(file_schema, file_groups)?; + let mut config = partitioned_csv_config(file_schema, file_groups); config.limit = Some(5); let csv = CsvExec::new( @@ -809,7 +809,7 @@ mod tests { tmp_dir.path(), )?; - let mut config = partitioned_csv_config(file_schema, file_groups)?; + let mut config = partitioned_csv_config(file_schema, file_groups); // Add partition columns config.table_partition_cols = vec![Field::new("date", DataType::Utf8, false)]; @@ -914,7 +914,7 @@ mod tests { ) .unwrap(); - let config = partitioned_csv_config(file_schema, file_groups).unwrap(); + let config = partitioned_csv_config(file_schema, file_groups); let csv = CsvExec::new( config, true, diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 4de7eb136f22e..f5d3c7a6410d7 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -64,12 +64,41 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue { /// The base configurations to provide when creating a physical plan for /// any given file format. 
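+///
+/// Use [`FileScanConfig::new`] to construct the configuration, then chain the
+/// `with_*` methods below to fill in the optional fields.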
+///
+/// # Example
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow_schema::Schema;
+/// use datafusion::datasource::listing::PartitionedFile;
+/// # use datafusion::datasource::physical_plan::FileScanConfig;
+/// # use datafusion_execution::object_store::ObjectStoreUrl;
+/// # let file_schema = Arc::new(Schema::empty());
+/// // create FileScan config for reading data from file://
+/// let object_store_url = ObjectStoreUrl::local_filesystem();
+/// let config = FileScanConfig::new(object_store_url, file_schema)
+///   .with_limit(Some(1000))            // read only the first 1000 records
+///   .with_projection(Some(vec![2, 3])) // project columns 2 and 3
+///   // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
+///   .with_file(PartitionedFile::new("file1.parquet", 1234))
+///   // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes
+///   // in a single file group
+///   .with_file_group(vec![
+///     PartitionedFile::new("file2.parquet", 56),
+///     PartitionedFile::new("file3.parquet", 78),
+///   ]);
+/// ```
 #[derive(Clone)]
 pub struct FileScanConfig {
     /// Object store URL, used to get an [`ObjectStore`] instance from
     /// [`RuntimeEnv::object_store`]
     ///
+    /// This `ObjectStoreUrl` should be the prefix of the absolute url for files
+    /// as `file://` or `s3://my_bucket`. It should not include the path to the
+    /// file itself. The relevant URL prefix must be registered via
+    /// [`RuntimeEnv::register_object_store`]
+    ///
     /// [`ObjectStore`]: object_store::ObjectStore
+    /// [`RuntimeEnv::register_object_store`]: datafusion_execution::runtime_env::RuntimeEnv::register_object_store
     /// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
     pub object_store_url: ObjectStoreUrl,
     /// Schema before `projection` is applied. It contains the all columns that may
@@ -87,6 +116,7 @@ pub struct FileScanConfig {
     /// sequentially, one after the next.
     pub file_groups: Vec<Vec<PartitionedFile>>,
     /// Estimated overall statistics of the files, taking `filters` into account.
+    /// Defaults to [`Statistics::new_unknown`].
     pub statistics: Statistics,
     /// Columns on which to project the data. Indexes that are higher than the
     /// number of columns of `file_schema` refer to `table_partition_cols`.
@@ -101,6 +131,86 @@
 }
 
 impl FileScanConfig {
+    /// Create a new `FileScanConfig` with default settings for scanning files.
+    ///
+    /// See example on [`FileScanConfig`]
+    ///
+    /// No file groups are added by default. See [`Self::with_file`], [`Self::with_file_group`] and
+    /// [`Self::with_file_groups`].
+    ///
+    /// # Parameters:
+    /// * `object_store_url`: See [`Self::object_store_url`]
+    /// * `file_schema`: See [`Self::file_schema`]
+    pub fn new(object_store_url: ObjectStoreUrl, file_schema: SchemaRef) -> Self {
+        let statistics = Statistics::new_unknown(&file_schema);
+        Self {
+            object_store_url,
+            file_schema,
+            file_groups: vec![],
+            statistics,
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+            output_ordering: vec![],
+        }
+    }
+
+    /// Set the statistics of the files
+    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
+        self.statistics = statistics;
+        self
+    }
+
+    /// Set the projection of the files
+    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
+        self.projection = projection;
+        self
+    }
+
+    /// Set the limit of the files
+    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
+        self.limit = limit;
+        self
+    }
+
+    /// Add a file as a single group
+    ///
+    /// See [Self::file_groups] for more information.
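+    ///
+    /// The file is added as a new single-element file group (this is
+    /// shorthand for calling [`Self::with_file_group`] with a one-element vec).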
+    pub fn with_file(self, file: PartitionedFile) -> Self {
+        self.with_file_group(vec![file])
+    }
+
+    /// Add the file groups
+    ///
+    /// See [Self::file_groups] for more information.
+    pub fn with_file_groups(
+        mut self,
+        mut file_groups: Vec<Vec<PartitionedFile>>,
+    ) -> Self {
+        self.file_groups.append(&mut file_groups);
+        self
+    }
+
+    /// Add a new file group
+    ///
+    /// See [Self::file_groups] for more information
+    pub fn with_file_group(mut self, file_group: Vec<PartitionedFile>) -> Self {
+        self.file_groups.push(file_group);
+        self
+    }
+
+    /// Set the partitioning columns of the files
+    pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
+        self.table_partition_cols = table_partition_cols;
+        self
+    }
+
+    /// Set the output ordering of the files
+    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
+        self.output_ordering = output_ordering;
+        self
+    }
+
     /// Project the schema and the statistics on the given column indices
     pub fn project(&self) -> (SchemaRef, Statistics, Vec<LexOrdering>) {
         if self.projection.is_none() && self.table_partition_cols.is_empty() {
@@ -1117,16 +1227,10 @@ mod tests {
         statistics: Statistics,
         table_partition_cols: Vec<Field>,
     ) -> FileScanConfig {
-        FileScanConfig {
-            file_schema,
-            file_groups: vec![vec![]],
-            limit: None,
-            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-            projection,
-            statistics,
-            table_partition_cols,
-            output_ordering: vec![],
-        }
+        FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), file_schema)
+            .with_projection(projection)
+            .with_statistics(statistics)
+            .with_table_partition_cols(table_partition_cols)
     }
 
     /// Convert partition columns from Vec<(String, DataType)> to Vec<Field>
diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs
index 9732d08c7a1d4..6f354b31ae878 100644
--- a/datafusion/core/src/datasource/physical_plan/file_stream.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -524,7 +524,7 @@ mod tests {
     use crate::test::{make_partition, object_store::register_test_store};
     use arrow_schema::Schema;
-    use datafusion_common::{internal_err, Statistics};
+    use datafusion_common::internal_err;
 
     /// Test `FileOpener` which will simulate errors during file opening or scanning
     #[derive(Default)]
@@ -643,16 +643,12 @@ mod tests {
         let on_error = self.on_error;
 
-        let config = FileScanConfig {
-            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-            statistics: Statistics::new_unknown(&file_schema),
+        let config = FileScanConfig::new(
+            ObjectStoreUrl::parse("test:///").unwrap(),
             file_schema,
-            file_groups: vec![file_group],
-            projection: None,
-            limit: self.limit,
-            table_partition_cols: vec![],
-            output_ordering: vec![],
-        };
+        )
+        .with_file_group(file_group)
+        .with_limit(self.limit);
         let metrics_set = ExecutionPlanMetricsSet::new();
         let file_stream = FileStream::new(&config, 0, self.opener, &metrics_set)
             .unwrap()
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index 4728069f19db9..5e8ba526594c7 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -521,16 +521,9 @@ mod tests {
         prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await;
 
         let exec = NdJsonExec::new(
-            FileScanConfig {
-                object_store_url,
-                file_groups,
-                statistics: Statistics::new_unknown(&file_schema),
-                file_schema,
-                projection: None,
-                limit: Some(3),
-                table_partition_cols: vec![],
-                output_ordering:
vec![], - }, + FileScanConfig::new(object_store_url, file_schema) + .with_file_groups(file_groups) + .with_limit(Some(3)), file_compression_type.to_owned(), ); @@ -599,16 +592,9 @@ mod tests { let missing_field_idx = file_schema.fields.len() - 1; let exec = NdJsonExec::new( - FileScanConfig { - object_store_url, - file_groups, - statistics: Statistics::new_unknown(&file_schema), - file_schema, - projection: None, - limit: Some(3), - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new(object_store_url, file_schema) + .with_file_groups(file_groups) + .with_limit(Some(3)), file_compression_type.to_owned(), ); @@ -646,16 +632,9 @@ mod tests { prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await; let exec = NdJsonExec::new( - FileScanConfig { - object_store_url, - file_groups, - statistics: Statistics::new_unknown(&file_schema), - file_schema, - projection: Some(vec![0, 2]), - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new(object_store_url, file_schema) + .with_file_groups(file_groups) + .with_projection(Some(vec![0, 2])), file_compression_type.to_owned(), ); let inferred_schema = exec.schema(); @@ -698,16 +677,9 @@ mod tests { prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await; let exec = NdJsonExec::new( - FileScanConfig { - object_store_url, - file_groups, - statistics: Statistics::new_unknown(&file_schema), - file_schema, - projection: Some(vec![3, 0, 2]), - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new(object_store_url, file_schema) + .with_file_groups(file_groups) + .with_projection(Some(vec![3, 0, 2])), file_compression_type.to_owned(), ); let inferred_schema = exec.schema(); diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 410413ebd71b4..17cb6a66c7058 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -925,23 +925,16 @@ mod tests { // files with multiple pages let multi_page = page_index_predicate; let (meta, _files) = store_parquet(batches, multi_page).await.unwrap(); - let file_groups = meta.into_iter().map(Into::into).collect(); + let file_group = meta.into_iter().map(Into::into).collect(); // set up predicate (this is normally done by a layer higher up) let predicate = predicate.map(|p| logical2physical(&p, &file_schema)); // prepare the scan let mut parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: vec![file_groups], - statistics: Statistics::new_unknown(&file_schema), - file_schema, - projection, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema) + .with_file_group(file_group) + .with_projection(projection), predicate, None, Default::default(), @@ -1590,16 +1583,8 @@ mod tests { file_schema: SchemaRef, ) -> Result<()> { let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups, - statistics: Statistics::new_unknown(&file_schema), - file_schema, - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema) + .with_file_groups(file_groups), None, None, Default::default(), @@ 
-1700,15 +1685,11 @@ mod tests {
         ]);
 
         let parquet_exec = ParquetExec::new(
-            FileScanConfig {
-                object_store_url,
-                file_groups: vec![vec![partitioned_file]],
-                file_schema: schema.clone(),
-                statistics: Statistics::new_unknown(&schema),
+            FileScanConfig::new(object_store_url, schema.clone())
+                .with_file(partitioned_file)
                 // file has 10 cols so index 12 should be month and 13 should be day
-                projection: Some(vec![0, 1, 2, 12, 13]),
-                limit: None,
-                table_partition_cols: vec![
+                .with_projection(Some(vec![0, 1, 2, 12, 13]))
+                .with_table_partition_cols(vec![
                     Field::new("year", DataType::Utf8, false),
                     Field::new("month", DataType::UInt8, false),
                     Field::new(
@@ -1719,9 +1700,7 @@ mod tests {
                     ),
                     false,
                 ),
-                ],
-                output_ordering: vec![],
-            },
+                ]),
             None,
             None,
             Default::default(),
@@ -1779,17 +1758,10 @@ mod tests {
             extensions: None,
         };
 
+        let file_schema = Arc::new(Schema::empty());
         let parquet_exec = ParquetExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::local_filesystem(),
-                file_groups: vec![vec![partitioned_file]],
-                file_schema: Arc::new(Schema::empty()),
-                statistics: Statistics::new_unknown(&Schema::empty()),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![],
-            },
+            FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
+                .with_file(partitioned_file),
             None,
             None,
             Default::default(),
diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index e41e4dd316479..b93f4012b0935 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -203,7 +203,7 @@ mod tests {
     use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
     use crate::physical_plan::expressions::lit;
     use crate::physical_plan::repartition::RepartitionExec;
-    use crate::physical_plan::{displayable, Partitioning, Statistics};
+    use crate::physical_plan::{displayable, Partitioning};
 
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use datafusion_physical_expr::expressions::{col, Count, Sum};
@@ -246,16 +246,11 @@ mod tests {
     fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
         Arc::new(ParquetExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-                file_schema: schema.clone(),
-                file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
-                statistics: Statistics::new_unknown(schema),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![],
-            },
+            FileScanConfig::new(
+                ObjectStoreUrl::parse("test:///").unwrap(),
+                schema.clone(),
+            )
+            .with_file(PartitionedFile::new("x".to_string(), 100)),
             None,
             None,
             Default::default(),
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index cd84e911d381a..033cec53019d6 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1432,16 +1432,9 @@ pub(crate) mod tests {
     output_ordering: Vec<Vec<PhysicalSortExpr>>,
 ) -> Arc<ParquetExec> {
     Arc::new(ParquetExec::new(
-        FileScanConfig {
-            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-            file_schema: schema(),
-            file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
-            statistics: Statistics::new_unknown(&schema()),
-            projection: None,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering,
-        },
+        FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema())
+            .with_file(PartitionedFile::new("x".to_string(), 100))
+            .with_output_ordering(output_ordering),
         None,
         None,
         Default::default(),
@@ -1457,19 +1450,12 @@ pub(crate) mod tests {
     output_ordering: Vec<Vec<PhysicalSortExpr>>,
 ) -> Arc<ParquetExec> {
     Arc::new(ParquetExec::new(
-        FileScanConfig {
-            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-            file_schema: schema(),
-            file_groups: vec![
+        FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema())
+            .with_file_groups(vec![
                 vec![PartitionedFile::new("x".to_string(), 100)],
                 vec![PartitionedFile::new("y".to_string(), 100)],
-            ],
-            statistics: Statistics::new_unknown(&schema()),
-            projection: None,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering,
-        },
+            ])
+            .with_output_ordering(output_ordering),
         None,
         None,
         Default::default(),
@@ -1482,16 +1468,9 @@ pub(crate) mod tests {
 fn csv_exec_with_sort(output_ordering: Vec<Vec<PhysicalSortExpr>>) -> Arc<CsvExec> {
     Arc::new(CsvExec::new(
-        FileScanConfig {
-            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-            file_schema: schema(),
-            file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
-            statistics: Statistics::new_unknown(&schema()),
-            projection: None,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering,
-        },
+        FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema())
+            .with_file(PartitionedFile::new("x".to_string(), 100))
+            .with_output_ordering(output_ordering),
         false,
         b',',
         b'"',
@@ -1509,19 +1488,12 @@ pub(crate) mod tests {
     output_ordering: Vec<Vec<PhysicalSortExpr>>,
 ) -> Arc<CsvExec> {
     Arc::new(CsvExec::new(
-        FileScanConfig {
-            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-            file_schema: schema(),
-            file_groups: vec![
+        FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema())
+            .with_file_groups(vec![
                 vec![PartitionedFile::new("x".to_string(), 100)],
                 vec![PartitionedFile::new("y".to_string(), 100)],
-            ],
-            statistics: Statistics::new_unknown(&schema()),
-            projection: None,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering,
-        },
+            ])
+            .with_output_ordering(output_ordering),
         false,
         b',',
         b'"',
@@ -3790,19 +3762,11 @@ pub(crate) mod tests {
         let plan = aggregate_exec_with_alias(
             Arc::new(CsvExec::new(
-                FileScanConfig {
-                    object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-                    file_schema: schema(),
-                    file_groups: vec![vec![PartitionedFile::new(
-                        "x".to_string(),
-                        100,
-                    )]],
-                    statistics: Statistics::new_unknown(&schema()),
-                    projection: None,
-                    limit: None,
-                    table_partition_cols: vec![],
-                    output_ordering: vec![],
-                },
+                FileScanConfig::new(
+                    ObjectStoreUrl::parse("test:///").unwrap(),
+                    schema(),
+                )
+                .with_file(PartitionedFile::new("x".to_string(), 100)),
                 false,
                 b',',
                 b'"',
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index fe1290e40774c..a15b9d4fbc874 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -1297,7 +1297,7 @@ mod tests {
     use crate::physical_plan::joins::StreamJoinPartitionMode;
 
     use arrow_schema::{DataType, Field, Schema, SortOptions};
-    use datafusion_common::{JoinType, ScalarValue, Statistics};
+    use datafusion_common::{JoinType, ScalarValue};
     use datafusion_execution::object_store::ObjectStoreUrl;
     use datafusion_execution::{SendableRecordBatchStream, TaskContext};
     use datafusion_expr::{
@@ -1676,16 +1676,12 @@ mod tests {
             Field::new("e", DataType::Int32,
true),
         ]));
         Arc::new(CsvExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-                file_schema: schema.clone(),
-                file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
-                statistics: Statistics::new_unknown(&schema),
-                projection: Some(vec![0, 1, 2, 3, 4]),
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![vec![]],
-            },
+            FileScanConfig::new(
+                ObjectStoreUrl::parse("test:///").unwrap(),
+                schema.clone(),
+            )
+            .with_file(PartitionedFile::new("x".to_string(), 100))
+            .with_projection(Some(vec![0, 1, 2, 3, 4])),
             false,
             0,
             0,
@@ -1702,16 +1698,12 @@ mod tests {
             Field::new("d", DataType::Int32, true),
         ]));
         Arc::new(CsvExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-                file_schema: schema.clone(),
-                file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
-                statistics: Statistics::new_unknown(&schema),
-                projection: Some(vec![3, 2, 1]),
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![vec![]],
-            },
+            FileScanConfig::new(
+                ObjectStoreUrl::parse("test:///").unwrap(),
+                schema.clone(),
+            )
+            .with_file(PartitionedFile::new("x".to_string(), 100))
+            .with_projection(Some(vec![3, 2, 1])),
             false,
             0,
             0,
diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index f69c0df32e8a5..e3ef3b95aa06c 100644
--- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -291,7 +291,7 @@ mod tests {
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use datafusion_common::tree_node::{TransformedResult, TreeNode};
-    use datafusion_common::{Result, Statistics};
+    use datafusion_common::Result;
     use datafusion_execution::object_store::ObjectStoreUrl;
     use datafusion_expr::{JoinType, Operator};
     use datafusion_physical_expr::expressions::{self, col, Column};
@@ -1491,19 +1491,13 @@ mod tests {
         let projection: Vec<usize> = vec![0, 2, 3];
 
         Arc::new(CsvExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-                file_schema: schema.clone(),
-                file_groups: vec![vec![PartitionedFile::new(
-                    "file_path".to_string(),
-                    100,
-                )]],
-                statistics: Statistics::new_unknown(schema),
-                projection: Some(projection),
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![sort_exprs],
-            },
+            FileScanConfig::new(
+                ObjectStoreUrl::parse("test:///").unwrap(),
+                schema.clone(),
+            )
+            .with_file(PartitionedFile::new("file_path".to_string(), 100))
+            .with_projection(Some(projection))
+            .with_output_ordering(vec![sort_exprs]),
             true,
             0,
             b'"',
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs
index 7bc1eeb7c4a5a..4d926847e4653 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -41,7 +41,7 @@ use crate::prelude::{CsvReadOptions, SessionContext};
 
 use arrow_schema::{Schema, SchemaRef, SortOptions};
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::{JoinType, Statistics};
+use datafusion_common::JoinType;
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunctionDefinition};
 use datafusion_physical_expr::expressions::col;
@@ -275,16 +275,8 @@ pub fn sort_preserving_merge_exec(
 /// Create a non sorted parquet exec
 pub fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
     Arc::new(ParquetExec::new(
-        FileScanConfig {
-            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-            file_schema: schema.clone(),
-            file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
-            statistics: Statistics::new_unknown(schema),
-            projection: None,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![],
-        },
+        FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone())
+            .with_file(PartitionedFile::new("x".to_string(), 100)),
         None,
         None,
         Default::default(),
@@ -299,16 +291,9 @@ pub fn parquet_exec_sorted(
     let sort_exprs = sort_exprs.into_iter().collect();
 
     Arc::new(ParquetExec::new(
-        FileScanConfig {
-            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-            file_schema: schema.clone(),
-            file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
-            statistics: Statistics::new_unknown(schema),
-            projection: None,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![sort_exprs],
-        },
+        FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone())
+            .with_file(PartitionedFile::new("x".to_string(), 100))
+            .with_output_ordering(vec![sort_exprs]),
         None,
         None,
         Default::default(),
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 1152c70d43915..b03aaabcad6bf 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -91,7 +91,7 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> Result<Arc<CsvExec>> {
-    let config = partitioned_csv_config(schema, file_groups)?;
+    let config = partitioned_csv_config(schema, file_groups);
 pub fn partitioned_csv_config(
     schema: SchemaRef,
     file_groups: Vec<Vec<PartitionedFile>>,
-) -> Result<FileScanConfig> {
-    Ok(FileScanConfig {
-        object_store_url: ObjectStoreUrl::local_filesystem(),
-        file_schema: schema.clone(),
-        file_groups,
-        statistics: Statistics::new_unknown(&schema),
-        projection: None,
-        limit: None,
-        table_partition_cols: vec![],
-        output_ordering: vec![],
-    })
+) -> FileScanConfig {
+    FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
+        .with_file_groups(file_groups)
 }
 
 pub fn assert_fields_eq(plan: &LogicalPlan, expected: Vec<&str>) {
@@ -283,16 +275,9 @@ pub fn csv_exec_sorted(
     let sort_exprs = sort_exprs.into_iter().collect();
 
     Arc::new(CsvExec::new(
-        FileScanConfig {
-            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-            file_schema: schema.clone(),
-            file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
-            statistics: Statistics::new_unknown(schema),
-            projection: None,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![sort_exprs],
-        },
+        FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone())
+            .with_file(PartitionedFile::new("x".to_string(), 100))
+            .with_output_ordering(vec![sort_exprs]),
         false,
         0,
         0,
@@ -345,16 +330,9 @@ pub fn csv_exec_ordered(
     let sort_exprs = sort_exprs.into_iter().collect();
 
     Arc::new(CsvExec::new(
-        FileScanConfig {
-            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-            file_schema: schema.clone(),
-            file_groups: vec![vec![PartitionedFile::new("file_path".to_string(), 100)]],
-            statistics: Statistics::new_unknown(schema),
-            projection: None,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![sort_exprs],
-        },
+        FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone())
+            .with_file(PartitionedFile::new("file_path".to_string(), 100))
+            .with_output_ordering(vec![sort_exprs]),
         true,
         0,
         b'"',
diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs
index 1d5668c7ec55e..df1d2c6f0999b 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -37,8 +37,6 @@ use crate::physical_plan::metrics::MetricsSet;
 use crate::physical_plan::ExecutionPlan;
 use crate::prelude::{Expr, SessionConfig, SessionContext};
 
-use datafusion_common::Statistics;
-
 use object_store::path::Path;
 use object_store::ObjectMeta;
 use parquet::arrow::ArrowWriter;
@@ -144,22 +142,15 @@ impl TestParquetFile {
         ctx: &SessionContext,
         maybe_filter: Option<Expr>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let scan_config = FileScanConfig {
-            object_store_url: self.object_store_url.clone(),
-            file_schema: self.schema.clone(),
-            file_groups: vec![vec![PartitionedFile {
-                object_meta: self.object_meta.clone(),
-                partition_values: vec![],
-                range: None,
-                statistics: None,
-                extensions: None,
-            }]],
-            statistics: Statistics::new_unknown(&self.schema),
-            projection: None,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![],
-        };
+        let scan_config =
+            FileScanConfig::new(self.object_store_url.clone(), self.schema.clone())
+                .with_file(PartitionedFile {
+                    object_meta: self.object_meta.clone(),
+                    partition_values: vec![],
+                    range: None,
+                    statistics: None,
+                    extensions: None,
+                });
 
         let df_schema = self.schema.clone().to_dfschema_ref()?;
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index e4f4d229c416a..4f50c55c627cd 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -30,8 +30,8 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::{
     FileMeta, FileScanConfig, ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory,
 };
+use datafusion::physical_plan::collect;
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
-use datafusion::physical_plan::{collect, Statistics};
 use datafusion::prelude::SessionContext;
 use datafusion_common::Result;
@@ -63,7 +63,7 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
     let file_schema = batch.schema().clone();
     let (in_memory_object_store, parquet_files_meta) =
         store_parquet_in_memory(vec![batch]).await;
-    let file_groups = parquet_files_meta
+    let file_group = parquet_files_meta
         .into_iter()
         .map(|meta| PartitionedFile {
             object_meta: meta,
@@ -76,17 +76,12 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
 
     // prepare the scan
     let parquet_exec = ParquetExec::new(
-        FileScanConfig {
+        FileScanConfig::new(
             // just any url that doesn't point to in memory object store
-            object_store_url: ObjectStoreUrl::local_filesystem(),
-            file_groups: vec![file_groups],
-            statistics: Statistics::new_unknown(&file_schema),
+            ObjectStoreUrl::local_filesystem(),
             file_schema,
-            projection: None,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![],
-        },
+        )
+        .with_file_group(file_group),
         None,
         None,
         Default::default(),
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index 8f42f21834cc3..2e9cda40c330e 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -27,7 +27,7 @@ use datafusion::execution::context::SessionState;
 use datafusion::physical_plan::metrics::MetricValue;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
-use datafusion_common::{ScalarValue, Statistics, ToDFSchema};
+use datafusion_common::{ScalarValue, ToDFSchema};
 use
datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{col, lit, Expr}; use datafusion_physical_expr::create_physical_expr; @@ -71,17 +71,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec { let predicate = create_physical_expr(&filter, &df_schema, &execution_props).unwrap(); let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url, - file_groups: vec![vec![partitioned_file]], - file_schema: schema.clone(), - statistics: Statistics::new_unknown(&schema), - // file has 10 cols so index 12 should be month - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new(object_store_url, schema).with_file(partitioned_file), Some(predicate), None, Default::default(), diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs index 10c4e8a4c059c..ead2884e43c5d 100644 --- a/datafusion/core/tests/parquet/schema_adapter.rs +++ b/datafusion/core/tests/parquet/schema_adapter.rs @@ -30,7 +30,7 @@ use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{ FileScanConfig, ParquetExec, SchemaAdapter, SchemaAdapterFactory, SchemaMapper, }; -use datafusion::physical_plan::{collect, Statistics}; +use datafusion::physical_plan::collect; use datafusion::prelude::SessionContext; use datafusion::datasource::listing::PartitionedFile; @@ -83,16 +83,8 @@ async fn can_override_schema_adapter() { // prepare the scan let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: vec![vec![partitioned_file]], - statistics: Statistics::new_unknown(&schema), - file_schema: schema, - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema) + .with_file(partitioned_file), None, None, Default::default(), diff --git a/datafusion/core/tests/parquet/schema_coercion.rs b/datafusion/core/tests/parquet/schema_coercion.rs index 88f795d2a4fe1..ac51b4f712015 100644 --- a/datafusion/core/tests/parquet/schema_coercion.rs +++ b/datafusion/core/tests/parquet/schema_coercion.rs @@ -26,7 +26,7 @@ use datafusion::assert_batches_sorted_eq; use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec}; use datafusion::physical_plan::collect; use datafusion::prelude::SessionContext; -use datafusion_common::{Result, Statistics}; +use datafusion_common::Result; use datafusion_execution::object_store::ObjectStoreUrl; use object_store::path::Path; @@ -51,7 +51,7 @@ async fn multi_parquet_coercion() { let batch2 = RecordBatch::try_from_iter(vec![("c2", c2), ("c3", c3)]).unwrap(); let (meta, _files) = store_parquet(vec![batch1, batch2]).await.unwrap(); - let file_groups = meta.into_iter().map(Into::into).collect(); + let file_group = meta.into_iter().map(Into::into).collect(); // cast c1 to utf8, c2 to int32, c3 to float64 let file_schema = Arc::new(Schema::new(vec![ @@ -60,16 +60,8 @@ async fn multi_parquet_coercion() { Field::new("c3", DataType::Float64, true), ])); let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: vec![file_groups], - statistics: Statistics::new_unknown(&file_schema), - file_schema, - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema) + 
.with_file_group(file_group), None, None, Default::default(), @@ -115,7 +107,7 @@ async fn multi_parquet_coercion_projection() { RecordBatch::try_from_iter(vec![("c2", c2), ("c1", c1s), ("c3", c3)]).unwrap(); let (meta, _files) = store_parquet(vec![batch1, batch2]).await.unwrap(); - let file_groups = meta.into_iter().map(Into::into).collect(); + let file_group = meta.into_iter().map(Into::into).collect(); // cast c1 to utf8, c2 to int32, c3 to float64 let file_schema = Arc::new(Schema::new(vec![ @@ -124,16 +116,9 @@ async fn multi_parquet_coercion_projection() { Field::new("c3", DataType::Float64, true), ])); let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: vec![file_groups], - statistics: Statistics::new_unknown(&file_schema), - file_schema, - projection: Some(vec![1, 0, 2]), - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema) + .with_file_group(file_group) + .with_projection(Some(vec![1, 0, 2])), None, None, Default::default(), diff --git a/datafusion/execution/src/object_store.rs b/datafusion/execution/src/object_store.rs index 126f83f7e238e..7697c01d63f22 100644 --- a/datafusion/execution/src/object_store.rs +++ b/datafusion/execution/src/object_store.rs @@ -51,7 +51,14 @@ impl ObjectStoreUrl { Ok(Self { url: parsed }) } - /// An [`ObjectStoreUrl`] for the local filesystem + /// An [`ObjectStoreUrl`] for the local filesystem (`file://`) + /// + /// # Example + /// ``` + /// # use datafusion_execution::object_store::ObjectStoreUrl; + /// let local_fs = ObjectStoreUrl::parse("file://").unwrap(); + /// assert_eq!(local_fs, ObjectStoreUrl::local_filesystem()) + /// ``` pub fn local_filesystem() -> Self { Self::parse("file://").unwrap() } diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs index 50b08e7793f0d..68f8b02b0f090 100644 --- a/datafusion/substrait/src/physical_plan/consumer.rs +++ b/datafusion/substrait/src/physical_plan/consumer.rs @@ -24,7 +24,7 @@ use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec}; use datafusion::error::{DataFusionError, Result}; -use datafusion::physical_plan::{ExecutionPlan, Statistics}; +use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; use async_recursion::async_recursion; @@ -104,16 +104,11 @@ pub async fn from_substrait_rel( file_groups[part_index].push(partitioned_file) } - let mut base_config = FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_schema: Arc::new(Schema::empty()), - file_groups, - statistics: Statistics::new_unknown(&Schema::empty()), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }; + let mut base_config = FileScanConfig::new( + ObjectStoreUrl::local_filesystem(), + Arc::new(Schema::empty()), + ) + .with_file_groups(file_groups); if let Some(MaskExpression { select, .. 
}) = &read.projection {
         if let Some(projection) = &select.as_ref() {
diff --git a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
index 70887e3934919..aca044319406e 100644
--- a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
@@ -23,7 +23,7 @@ use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
 use datafusion::error::Result;
-use datafusion::physical_plan::{displayable, ExecutionPlan, Statistics};
+use datafusion::physical_plan::{displayable, ExecutionPlan};
 use datafusion::prelude::SessionContext;
 use datafusion_substrait::physical_plan::{consumer, producer};
@@ -31,25 +31,20 @@ use substrait::proto::extensions;
 
 #[tokio::test]
 async fn parquet_exec() -> Result<()> {
-    let scan_config = FileScanConfig {
-        object_store_url: ObjectStoreUrl::local_filesystem(),
-        file_schema: Arc::new(Schema::empty()),
-        file_groups: vec![
-            vec![PartitionedFile::new(
-                "file://foo/part-0.parquet".to_string(),
-                123,
-            )],
-            vec![PartitionedFile::new(
-                "file://foo/part-1.parquet".to_string(),
-                123,
-            )],
-        ],
-        statistics: Statistics::new_unknown(&Schema::empty()),
-        projection: None,
-        limit: None,
-        table_partition_cols: vec![],
-        output_ordering: vec![],
-    };
+    let scan_config = FileScanConfig::new(
+        ObjectStoreUrl::local_filesystem(),
+        Arc::new(Schema::empty()),
+    )
+    .with_file_groups(vec![
+        vec![PartitionedFile::new(
+            "file://foo/part-0.parquet".to_string(),
+            123,
+        )],
+        vec![PartitionedFile::new(
+            "file://foo/part-1.parquet".to_string(),
+            123,
+        )],
+    ]);
 
     let parquet_exec: Arc<dyn ExecutionPlan> = Arc::new(ParquetExec::new(
         scan_config,
         None,
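For reviewers who want to try the change outside the test suite, here is a minimal sketch of an end-to-end construction with the new API (this snippet is not part of the patch; the schema, file name, and byte sizes are illustrative placeholders):

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion_execution::object_store::ObjectStoreUrl;

fn example_scan_config() -> FileScanConfig {
    // Schema of the underlying files (two columns, for illustration).
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Utf8, false),
    ]));

    // new() starts from the defaults the old struct literal spelled out by
    // hand: no file groups, unknown statistics, no projection, no limit,
    // no partition columns, no output ordering.
    FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
        .with_projection(Some(vec![1, 0])) // read "c2" then "c1"
        .with_limit(Some(5))
        // a single-file group; the path and 10-byte size are placeholder values
        .with_file(PartitionedFile::new("data.csv".to_string(), 10))
}
```

Each `with_*` method takes `self` by value and returns it, so calls chain without intermediate bindings, mirroring the call sites updated throughout this diff.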