diff --git a/datafusion/core/src/catalog/listing_schema.rs b/datafusion/core/src/catalog/listing_schema.rs index b39c3693c7068..50f733c3236ff 100644 --- a/datafusion/core/src/catalog/listing_schema.rs +++ b/datafusion/core/src/catalog/listing_schema.rs @@ -145,6 +145,7 @@ impl ListingSchemaProvider { definition: None, file_compression_type: CompressionTypeVariant::UNCOMPRESSED, order_exprs: vec![], + unbounded: false, options: Default::default(), }, ) diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 44595e5122c1e..bcfb4f3866626 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -133,15 +133,7 @@ impl TableProviderFactory for ListingTableFactory { }; // look for 'infinite' as an option - let infinite_source = match cmd.options.get("infinite_source").map(|s| s.as_str()) - { - None => false, - Some("true") => true, - Some("false") => false, - Some(value) => { - return Err(DataFusionError::Plan(format!("Unknown value for infinite_source: {value}. Expected 'true' or 'false'"))); - } - }; + let infinite_source = cmd.unbounded; let options = ListingOptions::new(file_format) .with_collect_stat(state.config().collect_statistics()) @@ -208,6 +200,7 @@ mod tests { file_compression_type: CompressionTypeVariant::UNCOMPRESSED, definition: None, order_exprs: vec![], + unbounded: false, options: HashMap::new(), }; let table_provider = factory.create(&state, &cmd).await.unwrap(); diff --git a/datafusion/core/tests/sqllogictests/test_files/create_external_table.slt b/datafusion/core/tests/sqllogictests/test_files/create_external_table.slt index a0784ff96def8..ce6fdfeee00cb 100644 --- a/datafusion/core/tests/sqllogictests/test_files/create_external_table.slt +++ b/datafusion/core/tests/sqllogictests/test_files/create_external_table.slt @@ -99,3 +99,7 @@ CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV WITH HEAD ROW LOCATION 'foo.csv'; # Missing `anything` in WITH clause statement error DataFusion error: SQL error: ParserError\("Expected HEADER, found: LOCATION"\) CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV WITH LOCATION 'foo.csv'; + +# Unrecognized random clause +statement error DataFusion error: SQL error: ParserError\("Unexpected token FOOBAR"\) +CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV FOOBAR BARBAR BARFOO LOCATION 'foo.csv'; diff --git a/datafusion/core/tests/sqllogictests/test_files/ddl.slt b/datafusion/core/tests/sqllogictests/test_files/ddl.slt index afb009f89f3c6..901a1525f1e4b 100644 --- a/datafusion/core/tests/sqllogictests/test_files/ddl.slt +++ b/datafusion/core/tests/sqllogictests/test_files/ddl.slt @@ -711,10 +711,9 @@ DROP SCHEMA empty_schema; # external table with infinite source statement ok -CREATE external table t(c1 integer, c2 integer, c3 integer) +CREATE UNBOUNDED external table t(c1 integer, c2 integer, c3 integer) STORED as CSV WITH HEADER ROW -OPTIONS('infinite_source' 'true') LOCATION 'tests/data/empty.csv'; # should see infinite_source=true in the explain @@ -733,7 +732,6 @@ statement ok CREATE external table t(c1 integer, c2 integer, c3 integer) STORED as CSV WITH HEADER ROW -OPTIONS('infinite_source' 'false') LOCATION 'tests/data/empty.csv'; # expect to see no infinite_source in the explain @@ -745,20 +743,3 @@ physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/te statement ok drop table t; - - -# error conditions -statement error DataFusion error: Error during planning: Unknown value for infinite_source: FALSE\. Expected 'true' or 'false' -CREATE external table t(c1 integer, c2 integer, c3 integer) -STORED as CSV -WITH HEADER ROW -OPTIONS('infinite_source' 'FALSE') -LOCATION 'tests/data/empty.csv'; - -# error conditions -statement error DataFusion error: Error during planning: Unknown value for infinite_source: g\. Expected 'true' or 'false' -CREATE external table t(c1 integer, c2 integer, c3 integer) -STORED as CSV -WITH HEADER ROW -OPTIONS('infinite_source' 'g') -LOCATION 'tests/data/empty.csv'; diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt b/datafusion/core/tests/sqllogictests/test_files/groupby.slt index b9d2543e11b09..729210022046f 100644 --- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt +++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt @@ -1928,7 +1928,7 @@ SELECT DISTINCT + col1 FROM tab2 AS cor0 GROUP BY cor0.col1 # a,b,c column. Column a has cardinality 2, column b has cardinality 4. # Column c has cardinality 100 (unique entries). Column d has cardinality 5. statement ok -CREATE EXTERNAL TABLE annotated_data_infinite2 ( +CREATE UNBOUNDED EXTERNAL TABLE annotated_data_infinite2 ( a0 INTEGER, a INTEGER, b INTEGER, @@ -1938,7 +1938,6 @@ CREATE EXTERNAL TABLE annotated_data_infinite2 ( STORED AS CSV WITH HEADER ROW WITH ORDER (a ASC, b ASC, c ASC) -OPTIONS('infinite_source' 'true') LOCATION 'tests/data/window_2.csv'; # test_window_agg_sort diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt index 80423a5fe2e9b..b15bc90fb13bd 100644 --- a/datafusion/core/tests/sqllogictests/test_files/window.slt +++ b/datafusion/core/tests/sqllogictests/test_files/window.slt @@ -2451,7 +2451,7 @@ LOCATION 'tests/data/window_1.csv' # Source is CsvExec which is ordered by ts column. # Infinite source statement ok -CREATE EXTERNAL TABLE annotated_data_infinite ( +CREATE UNBOUNDED EXTERNAL TABLE annotated_data_infinite ( ts INTEGER, inc_col INTEGER, desc_col INTEGER, @@ -2459,7 +2459,6 @@ CREATE EXTERNAL TABLE annotated_data_infinite ( STORED AS CSV WITH HEADER ROW WITH ORDER (ts ASC) -OPTIONS('infinite_source' 'true') LOCATION 'tests/data/window_1.csv'; # test_source_sorted_aggregate @@ -2866,7 +2865,7 @@ LOCATION 'tests/data/window_2.csv'; # a,b,c column. Column a has cardinality 2, column b has cardinality 4. # Column c has cardinality 100 (unique entries). Column d has cardinality 5. statement ok -CREATE EXTERNAL TABLE annotated_data_infinite2 ( +CREATE UNBOUNDED EXTERNAL TABLE annotated_data_infinite2 ( a0 INTEGER, a INTEGER, b INTEGER, @@ -2876,7 +2875,6 @@ CREATE EXTERNAL TABLE annotated_data_infinite2 ( STORED AS CSV WITH HEADER ROW WITH ORDER (a ASC, b ASC, c ASC) -OPTIONS('infinite_source' 'true') LOCATION 'tests/data/window_2.csv'; diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs index eeaa1c5ddbd47..9ddafea3b6bef 100644 --- a/datafusion/expr/src/logical_plan/ddl.rs +++ b/datafusion/expr/src/logical_plan/ddl.rs @@ -192,6 +192,8 @@ pub struct CreateExternalTable { pub order_exprs: Vec, /// File compression type (GZIP, BZIP2, XZ, ZSTD) pub file_compression_type: CompressionTypeVariant, + /// Whether the table is an infinite streams + pub unbounded: bool, /// Table(provider) specific options pub options: HashMap, } @@ -211,6 +213,7 @@ impl Hash for CreateExternalTable { self.definition.hash(state); self.file_compression_type.hash(state); self.order_exprs.hash(state); + self.unbounded.hash(state); self.options.len().hash(state); // HashMap is not hashable } } diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 08e6360aa30ec..47b54d1a3bca6 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -184,6 +184,7 @@ message CreateExternalTableNode { string definition = 9; string file_compression_type = 10; repeated LogicalExprNode order_exprs = 13; + bool unbounded = 14; map options = 11; } diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 79f4f06d37d8b..10936b8afa2c5 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -3493,6 +3493,9 @@ impl serde::Serialize for CreateExternalTableNode { if !self.order_exprs.is_empty() { len += 1; } + if self.unbounded { + len += 1; + } if !self.options.is_empty() { len += 1; } @@ -3530,6 +3533,9 @@ impl serde::Serialize for CreateExternalTableNode { if !self.order_exprs.is_empty() { struct_ser.serialize_field("orderExprs", &self.order_exprs)?; } + if self.unbounded { + struct_ser.serialize_field("unbounded", &self.unbounded)?; + } if !self.options.is_empty() { struct_ser.serialize_field("options", &self.options)?; } @@ -3560,6 +3566,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { "fileCompressionType", "order_exprs", "orderExprs", + "unbounded", "options", ]; @@ -3576,6 +3583,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { Definition, FileCompressionType, OrderExprs, + Unbounded, Options, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -3609,6 +3617,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { "definition" => Ok(GeneratedField::Definition), "fileCompressionType" | "file_compression_type" => Ok(GeneratedField::FileCompressionType), "orderExprs" | "order_exprs" => Ok(GeneratedField::OrderExprs), + "unbounded" => Ok(GeneratedField::Unbounded), "options" => Ok(GeneratedField::Options), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } @@ -3640,6 +3649,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { let mut definition__ = None; let mut file_compression_type__ = None; let mut order_exprs__ = None; + let mut unbounded__ = None; let mut options__ = None; while let Some(k) = map.next_key()? { match k { @@ -3709,6 +3719,12 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { } order_exprs__ = Some(map.next_value()?); } + GeneratedField::Unbounded => { + if unbounded__.is_some() { + return Err(serde::de::Error::duplicate_field("unbounded")); + } + unbounded__ = Some(map.next_value()?); + } GeneratedField::Options => { if options__.is_some() { return Err(serde::de::Error::duplicate_field("options")); @@ -3731,6 +3747,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { definition: definition__.unwrap_or_default(), file_compression_type: file_compression_type__.unwrap_or_default(), order_exprs: order_exprs__.unwrap_or_default(), + unbounded: unbounded__.unwrap_or_default(), options: options__.unwrap_or_default(), }) } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 0736b750842a4..aaac764cf95ec 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -293,6 +293,8 @@ pub struct CreateExternalTableNode { pub file_compression_type: ::prost::alloc::string::String, #[prost(message, repeated, tag = "13")] pub order_exprs: ::prost::alloc::vec::Vec, + #[prost(bool, tag = "14")] + pub unbounded: bool, #[prost(map = "string, string", tag = "11")] pub options: ::std::collections::HashMap< ::prost::alloc::string::String, diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 8d68a958b2ddb..54eb04c7e8eb0 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -527,6 +527,7 @@ impl AsLogicalPlan for LogicalPlanNode { if_not_exists: create_extern_table.if_not_exists, file_compression_type: CompressionTypeVariant::from_str(&create_extern_table.file_compression_type).map_err(|_| DataFusionError::NotImplemented(format!("Unsupported file compression type {}", create_extern_table.file_compression_type)))?, definition, + unbounded: create_extern_table.unbounded, options: create_extern_table.options.clone(), }))) } @@ -1184,6 +1185,7 @@ impl AsLogicalPlan for LogicalPlanNode { definition, file_compression_type, order_exprs, + unbounded, options, }, )) => Ok(protobuf::LogicalPlanNode { @@ -1203,6 +1205,7 @@ impl AsLogicalPlan for LogicalPlanNode { .collect::, to_proto::Error>>()?, definition: definition.clone().unwrap_or_default(), file_compression_type: file_compression_type.to_string(), + unbounded: *unbounded, options: options.clone(), }, )), diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs index b1116af3cfea0..a70868fa2f48c 100644 --- a/datafusion/sql/src/parser.rs +++ b/datafusion/sql/src/parser.rs @@ -89,6 +89,8 @@ pub struct CreateExternalTable { pub if_not_exists: bool, /// File compression type (GZIP, BZIP2, XZ) pub file_compression_type: CompressionTypeVariant, + /// Infinite streams? + pub unbounded: bool, /// Table(provider) specific options pub options: HashMap, } @@ -245,7 +247,10 @@ impl<'a> DFParser<'a> { /// Parse a SQL `CREATE` statement handling `CREATE EXTERNAL TABLE` pub fn parse_create(&mut self) -> Result { if self.parser.parse_keyword(Keyword::EXTERNAL) { - self.parse_create_external_table() + self.parse_create_external_table(false) + } else if self.parser.parse_keyword(Keyword::UNBOUNDED) { + self.parser.expect_keyword(Keyword::EXTERNAL)?; + self.parse_create_external_table(true) } else { Ok(Statement::Statement(Box::from(self.parser.parse_create()?))) } @@ -396,7 +401,10 @@ impl<'a> DFParser<'a> { }) } - fn parse_create_external_table(&mut self) -> Result { + fn parse_create_external_table( + &mut self, + unbounded: bool, + ) -> Result { self.parser.expect_keyword(Keyword::TABLE)?; let if_not_exists = self.parser @@ -427,39 +435,72 @@ impl<'a> DFParser<'a> { } loop { - if self.parser.parse_keyword(Keyword::STORED) { - self.parser.expect_keyword(Keyword::AS)?; - ensure_not_set(&builder.file_type, "STORED AS")?; - builder.file_type = Some(self.parse_file_format()?); - } else if self.parser.parse_keyword(Keyword::LOCATION) { - ensure_not_set(&builder.location, "LOCATION")?; - builder.location = Some(self.parser.parse_literal_string()?); - } else if self.parser.parse_keyword(Keyword::WITH) { - if self.parser.parse_keyword(Keyword::ORDER) { - ensure_not_set(&builder.order_exprs, "WITH ORDER")?; - builder.order_exprs = Some(self.parse_order_by_exprs()?); - } else { - self.parser.expect_keyword(Keyword::HEADER)?; - self.parser.expect_keyword(Keyword::ROW)?; - ensure_not_set(&builder.has_header, "WITH HEADER ROW")?; - builder.has_header = Some(true); + if let Some(keyword) = self.parser.parse_one_of_keywords(&[ + Keyword::STORED, + Keyword::LOCATION, + Keyword::WITH, + Keyword::DELIMITER, + Keyword::COMPRESSION, + Keyword::PARTITIONED, + Keyword::OPTIONS, + ]) { + match keyword { + Keyword::STORED => { + self.parser.expect_keyword(Keyword::AS)?; + ensure_not_set(&builder.file_type, "STORED AS")?; + builder.file_type = Some(self.parse_file_format()?); + } + Keyword::LOCATION => { + ensure_not_set(&builder.location, "LOCATION")?; + builder.location = Some(self.parser.parse_literal_string()?); + } + Keyword::WITH => { + if self.parser.parse_keyword(Keyword::ORDER) { + ensure_not_set(&builder.order_exprs, "WITH ORDER")?; + builder.order_exprs = Some(self.parse_order_by_exprs()?); + } else { + self.parser.expect_keyword(Keyword::HEADER)?; + self.parser.expect_keyword(Keyword::ROW)?; + ensure_not_set(&builder.has_header, "WITH HEADER ROW")?; + builder.has_header = Some(true); + } + } + Keyword::DELIMITER => { + ensure_not_set(&builder.delimiter, "DELIMITER")?; + builder.delimiter = Some(self.parse_delimiter()?); + } + Keyword::COMPRESSION => { + self.parser.expect_keyword(Keyword::TYPE)?; + ensure_not_set( + &builder.file_compression_type, + "COMPRESSION TYPE", + )?; + builder.file_compression_type = + Some(self.parse_file_compression_type()?); + } + Keyword::PARTITIONED => { + self.parser.expect_keyword(Keyword::BY)?; + ensure_not_set(&builder.table_partition_cols, "PARTITIONED BY")?; + builder.table_partition_cols = Some(self.parse_partitions()?); + } + Keyword::OPTIONS => { + ensure_not_set(&builder.options, "OPTIONS")?; + builder.options = Some(self.parse_options()?); + } + _ => { + unreachable!() + } } - } else if self.parser.parse_keyword(Keyword::DELIMITER) { - ensure_not_set(&builder.delimiter, "DELIMITER")?; - builder.delimiter = Some(self.parse_delimiter()?); - } else if self.parser.parse_keyword(Keyword::COMPRESSION) { - self.parser.expect_keyword(Keyword::TYPE)?; - ensure_not_set(&builder.file_compression_type, "COMPRESSION TYPE")?; - builder.file_compression_type = Some(self.parse_file_compression_type()?); - } else if self.parser.parse_keyword(Keyword::PARTITIONED) { - self.parser.expect_keyword(Keyword::BY)?; - ensure_not_set(&builder.table_partition_cols, "PARTITIONED BY")?; - builder.table_partition_cols = Some(self.parse_partitions()?) - } else if self.parser.parse_keyword(Keyword::OPTIONS) { - ensure_not_set(&builder.options, "OPTIONS")?; - builder.options = Some(self.parse_options()?); } else { - break; + let token = self.parser.next_token(); + if token == Token::EOF || token == Token::SemiColon { + break; + } else { + return Err(ParserError::ParserError(format!( + "Unexpected token {}", + token + ))); + } } } @@ -488,6 +529,7 @@ impl<'a> DFParser<'a> { file_compression_type: builder .file_compression_type .unwrap_or(CompressionTypeVariant::UNCOMPRESSED), + unbounded, options: builder.options.unwrap_or(HashMap::new()), }; Ok(Statement::CreateExternalTable(create)) @@ -610,6 +652,44 @@ mod tests { order_exprs: vec![], if_not_exists: false, file_compression_type: UNCOMPRESSED, + unbounded: false, + options: HashMap::new(), + }); + expect_parse_ok(sql, expected)?; + + // positive case: leading space + let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv' "; + let expected = Statement::CreateExternalTable(CreateExternalTable { + name: "t".into(), + columns: vec![make_column_def("c1", DataType::Int(None))], + file_type: "CSV".to_string(), + has_header: false, + delimiter: ',', + location: "foo.csv".into(), + table_partition_cols: vec![], + order_exprs: vec![], + if_not_exists: false, + file_compression_type: UNCOMPRESSED, + unbounded: false, + options: HashMap::new(), + }); + expect_parse_ok(sql, expected)?; + + // positive case: leading space + semicolon + let sql = + "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv' ;"; + let expected = Statement::CreateExternalTable(CreateExternalTable { + name: "t".into(), + columns: vec![make_column_def("c1", DataType::Int(None))], + file_type: "CSV".to_string(), + has_header: false, + delimiter: ',', + location: "foo.csv".into(), + table_partition_cols: vec![], + order_exprs: vec![], + if_not_exists: false, + file_compression_type: UNCOMPRESSED, + unbounded: false, options: HashMap::new(), }); expect_parse_ok(sql, expected)?; @@ -628,6 +708,7 @@ mod tests { order_exprs: vec![], if_not_exists: false, file_compression_type: UNCOMPRESSED, + unbounded: false, options: HashMap::new(), }); expect_parse_ok(sql, expected)?; @@ -646,6 +727,7 @@ mod tests { order_exprs: vec![], if_not_exists: false, file_compression_type: UNCOMPRESSED, + unbounded: false, options: HashMap::new(), }); expect_parse_ok(sql, expected)?; @@ -667,6 +749,7 @@ mod tests { order_exprs: vec![], if_not_exists: false, file_compression_type: UNCOMPRESSED, + unbounded: false, options: HashMap::new(), }); expect_parse_ok(sql, expected)?; @@ -693,6 +776,7 @@ mod tests { file_compression_type: CompressionTypeVariant::from_str( file_compression_type, )?, + unbounded: false, options: HashMap::new(), }); expect_parse_ok(sql, expected)?; @@ -711,6 +795,7 @@ mod tests { order_exprs: vec![], if_not_exists: false, file_compression_type: UNCOMPRESSED, + unbounded: false, options: HashMap::new(), }); expect_parse_ok(sql, expected)?; @@ -728,6 +813,7 @@ mod tests { order_exprs: vec![], if_not_exists: false, file_compression_type: UNCOMPRESSED, + unbounded: false, options: HashMap::new(), }); expect_parse_ok(sql, expected)?; @@ -745,6 +831,7 @@ mod tests { order_exprs: vec![], if_not_exists: false, file_compression_type: UNCOMPRESSED, + unbounded: false, options: HashMap::new(), }); expect_parse_ok(sql, expected)?; @@ -763,6 +850,7 @@ mod tests { order_exprs: vec![], if_not_exists: true, file_compression_type: UNCOMPRESSED, + unbounded: false, options: HashMap::new(), }); expect_parse_ok(sql, expected)?; @@ -786,6 +874,7 @@ mod tests { order_exprs: vec![], if_not_exists: false, file_compression_type: UNCOMPRESSED, + unbounded: false, options: HashMap::from([("k1".into(), "v1".into())]), }); expect_parse_ok(sql, expected)?; @@ -804,6 +893,7 @@ mod tests { order_exprs: vec![], if_not_exists: false, file_compression_type: UNCOMPRESSED, + unbounded: false, options: HashMap::from([ ("k1".into(), "v1".into()), ("k2".into(), "v2".into()), @@ -851,6 +941,7 @@ mod tests { }], if_not_exists: false, file_compression_type: UNCOMPRESSED, + unbounded: false, options: HashMap::new(), }); expect_parse_ok(sql, expected)?; @@ -890,6 +981,7 @@ mod tests { ], if_not_exists: false, file_compression_type: UNCOMPRESSED, + unbounded: false, options: HashMap::new(), }); expect_parse_ok(sql, expected)?; @@ -925,13 +1017,14 @@ mod tests { }], if_not_exists: false, file_compression_type: UNCOMPRESSED, + unbounded: false, options: HashMap::new(), }); expect_parse_ok(sql, expected)?; // Most complete CREATE EXTERNAL TABLE statement possible let sql = " - CREATE EXTERNAL TABLE IF NOT EXISTS t (c1 int, c2 float) + CREATE UNBOUNDED EXTERNAL TABLE IF NOT EXISTS t (c1 int, c2 float) STORED AS PARQUET DELIMITER '*' WITH HEADER ROW @@ -969,6 +1062,7 @@ mod tests { }], if_not_exists: true, file_compression_type: CompressionTypeVariant::ZSTD, + unbounded: true, options: HashMap::from([ ("ROW_GROUP_SIZE".into(), "1024".into()), ("TRUNCATE".into(), "NO".into()), diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 16d3c6c793092..64c59eee87fd3 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -588,6 +588,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { if_not_exists, file_compression_type, order_exprs, + unbounded, options, } = statement; @@ -629,6 +630,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { definition, file_compression_type, order_exprs: ordered_exprs, + unbounded, options, }, )))