diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 32fbf03b580f8..c3ab50fd430c7 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -545,12 +545,11 @@ impl DataSink for CsvSink {
                 // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files
                 let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
                 for part_idx in 0..num_partitions {
-                    let header = true;
+                    let header = self.has_header;
                     let builder = WriterBuilder::new().with_delimiter(self.delimiter);
                     let serializer = CsvSerializer::new()
                         .with_builder(builder)
                         .with_header(header);
-                    serializers.push(Box::new(serializer));
                     let file_path = base_path
                         .prefix()
                         .child(format!("/{}_{}.csv", write_id, part_idx));
@@ -567,6 +566,8 @@ impl DataSink for CsvSink {
                         object_store.clone(),
                     )
                     .await?;
+
+                    serializers.push(Box::new(serializer));
                     writers.push(writer);
                 }
             }
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 4f2387ad0dfa4..d4e2c4aafeb54 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -217,6 +217,22 @@ pub enum ListingTableInsertMode {
     ///Throw an error if insert into is attempted on this table
     Error,
 }
+
+impl FromStr for ListingTableInsertMode {
+    type Err = DataFusionError;
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let s_lower = s.to_lowercase();
+        match s_lower.as_str() {
+            "append_to_file" => Ok(ListingTableInsertMode::AppendToFile),
+            "append_new_files" => Ok(ListingTableInsertMode::AppendNewFiles),
+            "error" => Ok(ListingTableInsertMode::Error),
+            _ => Err(DataFusionError::Plan(format!(
+                "Unknown or unsupported insert mode {s}. Supported options are \
+                append_to_file, append_new_files, and error."
+            ))),
+        }
+    }
+}
 /// Options for creating a [`ListingTable`]
 #[derive(Clone, Debug)]
 pub struct ListingOptions {
@@ -1607,6 +1623,124 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_insert_into_sql_csv_defaults() -> Result<()> {
+        helper_test_insert_into_sql(
+            "csv",
+            FileCompressionType::UNCOMPRESSED,
+            "OPTIONS (insert_mode 'append_new_files')",
+            None,
+        )
+        .await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_insert_into_sql_csv_defaults_header_row() -> Result<()> {
+        helper_test_insert_into_sql(
+            "csv",
+            FileCompressionType::UNCOMPRESSED,
+            "WITH HEADER ROW \
+            OPTIONS (insert_mode 'append_new_files')",
+            None,
+        )
+        .await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_insert_into_sql_json_defaults() -> Result<()> {
+        helper_test_insert_into_sql(
+            "json",
+            FileCompressionType::UNCOMPRESSED,
+            "OPTIONS (insert_mode 'append_new_files')",
+            None,
+        )
+        .await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_insert_into_sql_parquet_defaults() -> Result<()> {
+        helper_test_insert_into_sql(
+            "parquet",
+            FileCompressionType::UNCOMPRESSED,
+            "",
+            None,
+        )
+        .await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_insert_into_sql_parquet_session_overrides() -> Result<()> {
+        let mut config_map: HashMap<String, String> = HashMap::new();
+        config_map.insert(
+            "datafusion.execution.parquet.compression".into(),
+            "zstd(5)".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.dictionary_enabled".into(),
+            "false".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.dictionary_page_size_limit".into(),
+            "100".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.statistics_enabled".into(),
+            "none".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.max_statistics_size".into(),
+            "10".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.max_row_group_size".into(),
+            "5".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.created_by".into(),
+            "datafusion test".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.column_index_truncate_length".into(),
+            "50".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.data_page_row_count_limit".into(),
+            "50".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.bloom_filter_enabled".into(),
+            "true".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.bloom_filter_fpp".into(),
+            "0.01".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.bloom_filter_ndv".into(),
+            "1000".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.writer_version".into(),
+            "2.0".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.write_batch_size".into(),
+            "5".into(),
+        );
+        helper_test_insert_into_sql(
+            "parquet",
+            FileCompressionType::UNCOMPRESSED,
+            "",
+            Some(config_map),
+        )
+        .await?;
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_insert_into_append_new_parquet_files_session_overrides() -> Result<()> {
         let mut config_map: HashMap<String, String> = HashMap::new();
@@ -2096,4 +2230,64 @@ mod tests {
         // Return Ok if the function
         Ok(())
     }
+
+    /// tests insert into with end to end sql
+    /// create external table + insert into statements
+    async fn helper_test_insert_into_sql(
+        file_type: &str,
+        // TODO test with create statement options such as compression
+        _file_compression_type: FileCompressionType,
+        external_table_options: &str,
+        session_config_map: Option<HashMap<String, String>>,
+    ) -> Result<()> {
+        // Create the initial context
+        let session_ctx = match session_config_map {
+            Some(cfg) => {
+                let config = SessionConfig::from_string_hash_map(cfg)?;
+                SessionContext::with_config(config)
+            }
+            None => SessionContext::new(),
+        };
+
+        // create table
+        let tmp_dir = TempDir::new()?;
+        let tmp_path = tmp_dir.into_path();
+        let str_path = tmp_path.to_str().expect("Temp path should convert to &str");
+        session_ctx
+            .sql(&format!(
+                "create external table foo(a varchar, b varchar, c int) \
+                stored as {file_type} \
+                location '{str_path}' \
+                {external_table_options}"
+            ))
+            .await?
+            .collect()
+            .await?;
+
+        // insert data
+        session_ctx.sql("insert into foo values ('foo', 'bar', 1),('foo', 'bar', 2), ('foo', 'bar', 3)")
+            .await?
+            .collect()
+            .await?;
+
+        // check count
+        let batches = session_ctx
+            .sql("select * from foo")
+            .await?
+            .collect()
+            .await?;
+
+        let expected = vec![
+            "+-----+-----+---+",
+            "| a   | b   | c |",
+            "+-----+-----+---+",
+            "| foo | bar | 1 |",
+            "| foo | bar | 2 |",
+            "| foo | bar | 3 |",
+            "+-----+-----+---+",
+        ];
+        assert_batches_eq!(expected, &batches);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index fdd696172b1c2..3f2dcf627e3d3 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -40,6 +40,8 @@ use crate::datasource::provider::TableProviderFactory;
 use crate::datasource::TableProvider;
 use crate::execution::context::SessionState;
 
+use super::listing::ListingTableInsertMode;
+
 /// A `TableProviderFactory` capable of creating new `ListingTable`s
 pub struct ListingTableFactory {}
 
@@ -131,13 +133,26 @@ impl TableProviderFactory for ListingTableFactory {
         // look for 'infinite' as an option
         let infinite_source = cmd.unbounded;
 
+        let explicit_insert_mode = cmd.options.get("insert_mode");
+        let insert_mode = match explicit_insert_mode {
+            Some(mode) => ListingTableInsertMode::from_str(mode),
+            None => match file_type {
+                FileType::CSV => Ok(ListingTableInsertMode::AppendToFile),
+                FileType::PARQUET => Ok(ListingTableInsertMode::AppendNewFiles),
+                FileType::AVRO => Ok(ListingTableInsertMode::AppendNewFiles),
+                FileType::JSON => Ok(ListingTableInsertMode::AppendToFile),
+                FileType::ARROW => Ok(ListingTableInsertMode::AppendNewFiles),
+            },
+        }?;
+
         let options = ListingOptions::new(file_format)
             .with_collect_stat(state.config().collect_statistics())
             .with_file_extension(file_extension)
             .with_target_partitions(state.config().target_partitions())
             .with_table_partition_cols(table_partition_cols)
             .with_infinite_source(infinite_source)
-            .with_file_sort_order(cmd.order_exprs.clone());
+            .with_file_sort_order(cmd.order_exprs.clone())
+            .with_insert_mode(insert_mode);
 
         let table_path = ListingTableUrl::parse(&cmd.location)?;
         let resolved_schema = match provided_schema {
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index ad66640efa14c..16036defda949 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -607,11 +607,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             options,
         } = statement;
 
-        // semantic checks
-        if file_type == "PARQUET" && !columns.is_empty() {
-            plan_err!("Column definitions can not be specified for PARQUET files.")?;
-        }
-
         if file_type != "CSV"
             && file_type != "JSON"
             && file_compression_type != CompressionTypeVariant::UNCOMPRESSED
diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs
index eef9093947fb9..accb6ec9ced55 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -1796,11 +1796,15 @@ fn create_external_table_with_compression_type() {
 #[test]
 fn create_external_table_parquet() {
     let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS PARQUET LOCATION 'foo.parquet'";
-    let err = logical_plan(sql).expect_err("query should have failed");
-    assert_eq!(
-        "Plan(\"Column definitions can not be specified for PARQUET files.\")",
-        format!("{err:?}")
-    );
+    let expected = "CreateExternalTable: Bare { table: \"t\" }";
+    quick_test(sql, expected);
+}
+
+#[test]
+fn create_external_table_parquet_sort_order() {
+    let sql = "create external table foo(a varchar, b varchar, c timestamp) stored as parquet location '/tmp/foo' with order (c)";
+    let expected = "CreateExternalTable: Bare { table: \"foo\" }";
+    quick_test(sql, expected);
 }
 
 #[test]