From 9f7fcc055cd41e4070d60b8204f331c0987f1d9a Mon Sep 17 00:00:00 2001 From: Armin Primadi Date: Sun, 7 May 2023 19:50:43 +0700 Subject: [PATCH 1/5] Change CsvReadOptions default file_extension to None --- .../core/src/datasource/file_format/options.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index e51edf829e857..855f46a727f52 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -24,7 +24,6 @@ use async_trait::async_trait; use datafusion_common::DataFusionError; use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION; -use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION; use crate::datasource::file_format::file_type::FileCompressionType; use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION; use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION; @@ -59,9 +58,9 @@ pub struct CsvReadOptions<'a> { pub schema: Option<&'a Schema>, /// Max number of rows to read from CSV files for schema inference if needed. Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`. pub schema_infer_max_records: usize, - /// File extension; only files with this extension are selected for data input. - /// Defaults to `FileType::CSV.get_ext().as_str()`. - pub file_extension: &'a str, + /// File extension; only files with this extension are selected for data + /// input. + pub file_extension: Option, /// Partition Columns pub table_partition_cols: Vec<(String, DataType)>, /// File compression type @@ -84,7 +83,7 @@ impl<'a> CsvReadOptions<'a> { schema: None, schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD, delimiter: b',', - file_extension: DEFAULT_CSV_EXTENSION, + file_extension: None, table_partition_cols: vec![], file_compression_type: FileCompressionType::UNCOMPRESSED, infinite: false, @@ -110,8 +109,8 @@ impl<'a> CsvReadOptions<'a> { } /// Specify the file extension for CSV file selection - pub fn file_extension(mut self, file_extension: &'a str) -> Self { - self.file_extension = file_extension; + pub fn file_extension(mut self, file_extension: impl Into) -> Self { + self.file_extension = Some(file_extension.into()); self } @@ -392,7 +391,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> { .with_file_compression_type(self.file_compression_type.to_owned()); ListingOptions::new(Arc::new(file_format)) - .with_file_extension(self.file_extension) + .with_file_extension(self.file_extension.as_deref().unwrap_or("")) .with_target_partitions(config.target_partitions()) .with_table_partition_cols(self.table_partition_cols.clone()) // TODO: Add file sort order into CsvReadOptions and introduce here. From d07be6eb8f145e20ce072dde9c1155fb401620a1 Mon Sep 17 00:00:00 2001 From: Armin Primadi Date: Mon, 8 May 2023 19:49:38 +0700 Subject: [PATCH 2/5] Revert "Change CsvReadOptions default file_extension to None" This reverts commit 9f7fcc055cd41e4070d60b8204f331c0987f1d9a. --- .../core/src/datasource/file_format/options.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 855f46a727f52..e51edf829e857 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -24,6 +24,7 @@ use async_trait::async_trait; use datafusion_common::DataFusionError; use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION; +use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION; use crate::datasource::file_format::file_type::FileCompressionType; use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION; use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION; @@ -58,9 +59,9 @@ pub struct CsvReadOptions<'a> { pub schema: Option<&'a Schema>, /// Max number of rows to read from CSV files for schema inference if needed. Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`. pub schema_infer_max_records: usize, - /// File extension; only files with this extension are selected for data - /// input. - pub file_extension: Option, + /// File extension; only files with this extension are selected for data input. + /// Defaults to `FileType::CSV.get_ext().as_str()`. + pub file_extension: &'a str, /// Partition Columns pub table_partition_cols: Vec<(String, DataType)>, /// File compression type @@ -83,7 +84,7 @@ impl<'a> CsvReadOptions<'a> { schema: None, schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD, delimiter: b',', - file_extension: None, + file_extension: DEFAULT_CSV_EXTENSION, table_partition_cols: vec![], file_compression_type: FileCompressionType::UNCOMPRESSED, infinite: false, @@ -109,8 +110,8 @@ impl<'a> CsvReadOptions<'a> { } /// Specify the file extension for CSV file selection - pub fn file_extension(mut self, file_extension: impl Into) -> Self { - self.file_extension = Some(file_extension.into()); + pub fn file_extension(mut self, file_extension: &'a str) -> Self { + self.file_extension = file_extension; self } @@ -391,7 +392,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> { .with_file_compression_type(self.file_compression_type.to_owned()); ListingOptions::new(Arc::new(file_format)) - .with_file_extension(self.file_extension.as_deref().unwrap_or("")) + .with_file_extension(self.file_extension) .with_target_partitions(config.target_partitions()) .with_table_partition_cols(self.table_partition_cols.clone()) // TODO: Add file sort order into CsvReadOptions and introduce here. From 1aa60ec9dcf2f301f2bfa82d24b23f600d091a66 Mon Sep 17 00:00:00 2001 From: Armin Primadi Date: Mon, 8 May 2023 19:53:18 +0700 Subject: [PATCH 3/5] Fix `ListingTableFactory::create` using invalid file extension --- datafusion/core/src/datasource/listing_table_factory.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 01d8ea6eac81c..fbff7dd6c7756 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -17,6 +17,7 @@ //! Factory for creating ListingTables with default options +use std::path::Path; use std::str::FromStr; use std::sync::Arc; @@ -66,8 +67,11 @@ impl TableProviderFactory for ListingTableFactory { DataFusionError::Execution(format!("Unknown FileType {}", cmd.file_type)) })?; - let file_extension = - file_type.get_ext_with_compression(file_compression_type.to_owned())?; + let file_path: &str = cmd.location.as_ref(); + let file_extension = Path::new(file_path) + .extension() + .and_then(|ext| ext.to_str()) + .unwrap_or(""); let file_format: Arc = match file_type { FileType::CSV => Arc::new( From ccdb49f8d8e850ad05ff5be21f4678d7ab92e280 Mon Sep 17 00:00:00 2001 From: Armin Primadi Date: Mon, 8 May 2023 21:30:16 +0700 Subject: [PATCH 4/5] Adding test and fix get extension function --- .../src/datasource/listing_table_factory.rs | 60 +++++++++++++++++-- 1 file changed, 55 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index fbff7dd6c7756..e216be2f7571f 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -67,11 +67,7 @@ impl TableProviderFactory for ListingTableFactory { DataFusionError::Execution(format!("Unknown FileType {}", cmd.file_type)) })?; - let file_path: &str = cmd.location.as_ref(); - let file_extension = Path::new(file_path) - .extension() - .and_then(|ext| ext.to_str()) - .unwrap_or(""); + let file_extension = get_extension(cmd.location.as_str()); let file_format: Arc = match file_type { FileType::CSV => Arc::new( @@ -168,3 +164,57 @@ impl TableProviderFactory for ListingTableFactory { Ok(Arc::new(table)) } } + +// Get file extension from path +fn get_extension(path: &str) -> String { + let res = Path::new(path) + .extension() + .and_then(|ext| ext.to_str()); + match res { + Some(ext) => format!(".{}", ext), + None => "".to_string(), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::collections::HashMap; + + use crate::execution::context::SessionContext; + use datafusion_common::parsers::CompressionTypeVariant; + use datafusion_common::{DFSchema, OwnedTableReference}; + + #[tokio::test] + async fn test_create_using_non_std_file_ext() { + let csv_file = tempfile::Builder::new() + .prefix("foo") + .suffix(".tbl") + .tempfile() + .unwrap(); + + let factory = ListingTableFactory::new(); + let context = SessionContext::new(); + let state = context.state(); + let name = OwnedTableReference::bare("foo".to_string()); + let cmd = CreateExternalTable { + name, + location: csv_file.path().to_str().unwrap().to_string(), + file_type: "csv".to_string(), + has_header: true, + delimiter: ',', + schema: Arc::new(DFSchema::empty()), + table_partition_cols: vec![], + if_not_exists: false, + file_compression_type: CompressionTypeVariant::UNCOMPRESSED, + definition: None, + order_exprs: vec![], + options: HashMap::new(), + }; + let table_provider = factory.create(&state, &cmd).await.unwrap(); + let listing_table = table_provider.as_any().downcast_ref::().unwrap(); + let listing_options = listing_table.options(); + assert_eq!(".tbl", listing_options.file_extension); + } +} From adfc4201d67f2325edc33fe3830640bc81faae9e Mon Sep 17 00:00:00 2001 From: Armin Primadi Date: Mon, 8 May 2023 21:35:45 +0700 Subject: [PATCH 5/5] Fix cargo fmt --- datafusion/core/src/datasource/listing_table_factory.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index e216be2f7571f..d61235445a3e6 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -167,9 +167,7 @@ impl TableProviderFactory for ListingTableFactory { // Get file extension from path fn get_extension(path: &str) -> String { - let res = Path::new(path) - .extension() - .and_then(|ext| ext.to_str()); + let res = Path::new(path).extension().and_then(|ext| ext.to_str()); match res { Some(ext) => format!(".{}", ext), None => "".to_string(), @@ -213,7 +211,10 @@ mod tests { options: HashMap::new(), }; let table_provider = factory.create(&state, &cmd).await.unwrap(); - let listing_table = table_provider.as_any().downcast_ref::().unwrap(); + let listing_table = table_provider + .as_any() + .downcast_ref::() + .unwrap(); let listing_options = listing_table.options(); assert_eq!(".tbl", listing_options.file_extension); }