Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/core/src/catalog/listing_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ impl ListingSchemaProvider {
definition: None,
file_compression_type: CompressionTypeVariant::UNCOMPRESSED,
order_exprs: vec![],
unbounded: false,
options: Default::default(),
},
)
Expand Down
11 changes: 2 additions & 9 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,7 @@ impl TableProviderFactory for ListingTableFactory {
};

// determine whether the table is an unbounded (infinite) source
let infinite_source = match cmd.options.get("infinite_source").map(|s| s.as_str())
{
None => false,
Some("true") => true,
Some("false") => false,
Some(value) => {
return Err(DataFusionError::Plan(format!("Unknown value for infinite_source: {value}. Expected 'true' or 'false'")));
}
};
let infinite_source = cmd.unbounded;

let options = ListingOptions::new(file_format)
.with_collect_stat(state.config().collect_statistics())
Expand Down Expand Up @@ -208,6 +200,7 @@ mod tests {
file_compression_type: CompressionTypeVariant::UNCOMPRESSED,
definition: None,
order_exprs: vec![],
unbounded: false,
options: HashMap::new(),
};
let table_provider = factory.create(&state, &cmd).await.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,7 @@ CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV WITH HEAD ROW LOCATION 'foo.csv';
# Missing `anything` in WITH clause
statement error DataFusion error: SQL error: ParserError\("Expected HEADER, found: LOCATION"\)
CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV WITH LOCATION 'foo.csv';

# Unrecognized random clause
statement error DataFusion error: SQL error: ParserError\("Unexpected token FOOBAR"\)
CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV FOOBAR BARBAR BARFOO LOCATION 'foo.csv';
21 changes: 1 addition & 20 deletions datafusion/core/tests/sqllogictests/test_files/ddl.slt
Original file line number Diff line number Diff line change
Expand Up @@ -711,10 +711,9 @@ DROP SCHEMA empty_schema;

# external table with infinite source
statement ok
CREATE external table t(c1 integer, c2 integer, c3 integer)
CREATE UNBOUNDED external table t(c1 integer, c2 integer, c3 integer)
STORED as CSV
WITH HEADER ROW
OPTIONS('infinite_source' 'true')
LOCATION 'tests/data/empty.csv';

# should see infinite_source=true in the explain
Expand All @@ -733,7 +732,6 @@ statement ok
CREATE external table t(c1 integer, c2 integer, c3 integer)
STORED as CSV
WITH HEADER ROW
OPTIONS('infinite_source' 'false')
LOCATION 'tests/data/empty.csv';

# expect to see no infinite_source in the explain
Expand All @@ -745,20 +743,3 @@ physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/te

statement ok
drop table t;


# error conditions
statement error DataFusion error: Error during planning: Unknown value for infinite_source: FALSE\. Expected 'true' or 'false'
CREATE external table t(c1 integer, c2 integer, c3 integer)
STORED as CSV
WITH HEADER ROW
OPTIONS('infinite_source' 'FALSE')
LOCATION 'tests/data/empty.csv';

# error conditions
statement error DataFusion error: Error during planning: Unknown value for infinite_source: g\. Expected 'true' or 'false'
CREATE external table t(c1 integer, c2 integer, c3 integer)
STORED as CSV
WITH HEADER ROW
OPTIONS('infinite_source' 'g')
LOCATION 'tests/data/empty.csv';
3 changes: 1 addition & 2 deletions datafusion/core/tests/sqllogictests/test_files/groupby.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1928,7 +1928,7 @@ SELECT DISTINCT + col1 FROM tab2 AS cor0 GROUP BY cor0.col1
# a,b,c column. Column a has cardinality 2, column b has cardinality 4.
# Column c has cardinality 100 (unique entries). Column d has cardinality 5.
statement ok
CREATE EXTERNAL TABLE annotated_data_infinite2 (
CREATE UNBOUNDED EXTERNAL TABLE annotated_data_infinite2 (
a0 INTEGER,
a INTEGER,
b INTEGER,
Expand All @@ -1938,7 +1938,6 @@ CREATE EXTERNAL TABLE annotated_data_infinite2 (
STORED AS CSV
WITH HEADER ROW
WITH ORDER (a ASC, b ASC, c ASC)
OPTIONS('infinite_source' 'true')
LOCATION 'tests/data/window_2.csv';

# test_window_agg_sort
Expand Down
6 changes: 2 additions & 4 deletions datafusion/core/tests/sqllogictests/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2451,15 +2451,14 @@ LOCATION 'tests/data/window_1.csv'
# Source is CsvExec which is ordered by ts column.
# Infinite source
statement ok
CREATE EXTERNAL TABLE annotated_data_infinite (
CREATE UNBOUNDED EXTERNAL TABLE annotated_data_infinite (
ts INTEGER,
inc_col INTEGER,
desc_col INTEGER,
)
STORED AS CSV
WITH HEADER ROW
WITH ORDER (ts ASC)
OPTIONS('infinite_source' 'true')
LOCATION 'tests/data/window_1.csv';

# test_source_sorted_aggregate
Expand Down Expand Up @@ -2866,7 +2865,7 @@ LOCATION 'tests/data/window_2.csv';
# a,b,c column. Column a has cardinality 2, column b has cardinality 4.
# Column c has cardinality 100 (unique entries). Column d has cardinality 5.
statement ok
CREATE EXTERNAL TABLE annotated_data_infinite2 (
CREATE UNBOUNDED EXTERNAL TABLE annotated_data_infinite2 (
a0 INTEGER,
a INTEGER,
b INTEGER,
Expand All @@ -2876,7 +2875,6 @@ CREATE EXTERNAL TABLE annotated_data_infinite2 (
STORED AS CSV
WITH HEADER ROW
WITH ORDER (a ASC, b ASC, c ASC)
OPTIONS('infinite_source' 'true')
LOCATION 'tests/data/window_2.csv';


Expand Down
3 changes: 3 additions & 0 deletions datafusion/expr/src/logical_plan/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ pub struct CreateExternalTable {
pub order_exprs: Vec<Expr>,
/// File compression type (GZIP, BZIP2, XZ, ZSTD)
pub file_compression_type: CompressionTypeVariant,
/// Whether the table is an infinite stream
pub unbounded: bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the `fn hash<H: Hasher>(&self, state: &mut H)` function should also use `self.unbounded` during hashing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, I missed that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you

/// Table(provider) specific options
pub options: HashMap<String, String>,
}
Expand All @@ -211,6 +213,7 @@ impl Hash for CreateExternalTable {
self.definition.hash(state);
self.file_compression_type.hash(state);
self.order_exprs.hash(state);
self.unbounded.hash(state);
self.options.len().hash(state); // HashMap is not hashable
}
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ message CreateExternalTableNode {
string definition = 9;
string file_compression_type = 10;
repeated LogicalExprNode order_exprs = 13;
bool unbounded = 14;
map<string, string> options = 11;
}

Expand Down
17 changes: 17 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ impl AsLogicalPlan for LogicalPlanNode {
if_not_exists: create_extern_table.if_not_exists,
file_compression_type: CompressionTypeVariant::from_str(&create_extern_table.file_compression_type).map_err(|_| DataFusionError::NotImplemented(format!("Unsupported file compression type {}", create_extern_table.file_compression_type)))?,
definition,
unbounded: create_extern_table.unbounded,
options: create_extern_table.options.clone(),
})))
}
Expand Down Expand Up @@ -1184,6 +1185,7 @@ impl AsLogicalPlan for LogicalPlanNode {
definition,
file_compression_type,
order_exprs,
unbounded,
options,
},
)) => Ok(protobuf::LogicalPlanNode {
Expand All @@ -1203,6 +1205,7 @@ impl AsLogicalPlan for LogicalPlanNode {
.collect::<Result<Vec<_>, to_proto::Error>>()?,
definition: definition.clone().unwrap_or_default(),
file_compression_type: file_compression_type.to_string(),
unbounded: *unbounded,
options: options.clone(),
},
)),
Expand Down
Loading