diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 34afe75a6ea20..01d8ea6eac81c 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -132,11 +132,23 @@ impl TableProviderFactory for ListingTableFactory { Some(cmd.order_exprs.clone()) }; + // look for 'infinite_source' as an option + let infinite_source = match cmd.options.get("infinite_source").map(|s| s.as_str()) + { + None => false, + Some("true") => true, + Some("false") => false, + Some(value) => { + return Err(DataFusionError::Plan(format!("Unknown value for infinite_source: {value}. Expected 'true' or 'false'"))); + } + }; + let options = ListingOptions::new(file_format) .with_collect_stat(state.config().collect_statistics()) .with_file_extension(file_extension) .with_target_partitions(state.config().target_partitions()) .with_table_partition_cols(table_partition_cols) + .with_infinite_source(infinite_source) .with_file_sort_order(file_sort_order); let table_path = ListingTableUrl::parse(&cmd.location)?; diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index 6591d8346558e..40c1618cda4ed 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -248,6 +248,10 @@ impl Display for FileScanConfig { write!(f, ", limit={}", limit)?; } + if self.infinite_source { + write!(f, ", infinite_source=true")?; + } + if let Some(orders) = ordering { if !orders.is_empty() { write!(f, ", output_ordering={}", OutputOrderingDisplay(&orders))?; diff --git a/datafusion/core/tests/sqllogictests/test_files/ddl.slt b/datafusion/core/tests/sqllogictests/test_files/ddl.slt index 856994881f61f..2fdb8e7d0108a 100644 --- a/datafusion/core/tests/sqllogictests/test_files/ddl.slt +++ b/datafusion/core/tests/sqllogictests/test_files/ddl.slt @@ -620,7 
+620,7 @@ NULL statement ok drop table foo; -# craete csv table with empty csv file +# create csv table with empty csv file statement ok CREATE EXTERNAL TABLE empty STORED AS CSV WITH HEADER ROW LOCATION 'tests/data/empty.csv'; @@ -665,3 +665,61 @@ CREATE SCHEMA empty_schema; statement ok DROP SCHEMA empty_schema; + +########## +# creating external CSV tables with an infinite marking +########## + +# external table with infinite source +statement ok +CREATE external table t(c1 integer, c2 integer, c3 integer) +STORED as CSV +WITH HEADER ROW +OPTIONS('infinite_source' 'true') +LOCATION 'tests/data/empty.csv'; + +# should see infinite_source=true in the explain +query TT +explain select c1 from t; +---- +logical_plan TableScan: t projection=[c1] +physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/empty.csv]]}, projection=[c1], infinite_source=true, has_header=true + +statement ok +drop table t; + + +# external table with explicit non-infinite source +statement ok +CREATE external table t(c1 integer, c2 integer, c3 integer) +STORED as CSV +WITH HEADER ROW +OPTIONS('infinite_source' 'false') +LOCATION 'tests/data/empty.csv'; + +# expect to see no infinite_source in the explain +query TT +explain select c1 from t; +---- +logical_plan TableScan: t projection=[c1] +physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/empty.csv]]}, projection=[c1], has_header=true + +statement ok +drop table t; + + +# error conditions: the value is case-sensitive, so 'FALSE' is rejected +statement error DataFusion error: Error during planning: Unknown value for infinite_source: FALSE\. Expected 'true' or 'false' +CREATE external table t(c1 integer, c2 integer, c3 integer) +STORED as CSV +WITH HEADER ROW +OPTIONS('infinite_source' 'FALSE') +LOCATION 'tests/data/empty.csv'; + +# error conditions: any value other than 'true' or 'false' is rejected +statement error DataFusion error: Error during planning: Unknown value for infinite_source: g\. 
Expected 'true' or 'false' +CREATE external table t(c1 integer, c2 integer, c3 integer) +STORED as CSV +WITH HEADER ROW +OPTIONS('infinite_source' 'g') +LOCATION 'tests/data/empty.csv';