Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,23 @@ impl TableProviderFactory for ListingTableFactory {
Some(cmd.order_exprs.clone())
};

// look for 'infinite' as an option
let infinite_source = match cmd.options.get("infinite_source").map(|s| s.as_str())
{
None => false,
Some("true") => true,
Some("false") => false,
Some(value) => {
return Err(DataFusionError::Plan(format!("Unknown value for infinite_source: {value}. Expected 'true' or 'false'")));
}
};

let options = ListingOptions::new(file_format)
.with_collect_stat(state.config().collect_statistics())
.with_file_extension(file_extension)
.with_target_partitions(state.config().target_partitions())
.with_table_partition_cols(table_partition_cols)
.with_infinite_source(infinite_source)
.with_file_sort_order(file_sort_order);

let table_path = ListingTableUrl::parse(&cmd.location)?;
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_plan/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,10 @@ impl Display for FileScanConfig {
write!(f, ", limit={}", limit)?;
}

if self.infinite_source {
write!(f, ", infinite_source=true")?;
}

if let Some(orders) = ordering {
if !orders.is_empty() {
write!(f, ", output_ordering={}", OutputOrderingDisplay(&orders))?;
Expand Down
60 changes: 59 additions & 1 deletion datafusion/core/tests/sqllogictests/test_files/ddl.slt
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ NULL
statement ok
drop table foo;

# craete csv table with empty csv file
# create csv table with empty csv file
statement ok
CREATE EXTERNAL TABLE empty STORED AS CSV WITH HEADER ROW LOCATION 'tests/data/empty.csv';

Expand Down Expand Up @@ -665,3 +665,61 @@ CREATE SCHEMA empty_schema;

statement ok
DROP SCHEMA empty_schema;

##########
# creating external CSV tables with an infinite marking
##########

# external table with infinite source
statement ok
CREATE external table t(c1 integer, c2 integer, c3 integer)
STORED as CSV
WITH HEADER ROW
OPTIONS('infinite_source' 'true')
LOCATION 'tests/data/empty.csv';

# should see infinite_source=true in the explain
query TT
explain select c1 from t;
----
logical_plan TableScan: t projection=[c1]
physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/empty.csv]]}, projection=[c1], infinite_source=true, has_header=true

statement ok
drop table t;


# external table without explicit non infinite source
statement ok
CREATE external table t(c1 integer, c2 integer, c3 integer)
STORED as CSV
WITH HEADER ROW
OPTIONS('infinite_source' 'false')
LOCATION 'tests/data/empty.csv';

# expect to see no infinite_source in the explain
query TT
explain select c1 from t;
----
logical_plan TableScan: t projection=[c1]
physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/empty.csv]]}, projection=[c1], has_header=true

statement ok
drop table t;


# error conditions
statement error DataFusion error: Error during planning: Unknown value for infinite_source: FALSE\. Expected 'true' or 'false'
CREATE external table t(c1 integer, c2 integer, c3 integer)
STORED as CSV
WITH HEADER ROW
OPTIONS('infinite_source' 'FALSE')
LOCATION 'tests/data/empty.csv';

# error conditions
statement error DataFusion error: Error during planning: Unknown value for infinite_source: g\. Expected 'true' or 'false'
CREATE external table t(c1 integer, c2 integer, c3 integer)
STORED as CSV
WITH HEADER ROW
OPTIONS('infinite_source' 'g')
LOCATION 'tests/data/empty.csv';