diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 9ad5cf995932d..3ddf1c85e241b 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -17,44 +17,53 @@ //! The table implementation. -use std::collections::HashMap; -use std::{any::Any, str::FromStr, sync::Arc}; - -use super::helpers::{expr_applicable_for_cols, pruned_partition_list}; -use super::{ListingTableUrl, PartitionedFile}; -use crate::datasource::{ - create_ordering, - file_format::{file_compression_type::FileCompressionType, FileFormat}, - physical_plan::FileSinkConfig, +use super::{ + helpers::{expr_applicable_for_cols, pruned_partition_list}, + ListingTableUrl, PartitionedFile, +}; +use crate::{ + datasource::file_format::{file_compression_type::FileCompressionType, FileFormat}, + datasource::{create_ordering, physical_plan::FileSinkConfig}, + execution::context::SessionState, }; -use crate::execution::context::SessionState; - use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef}; use arrow_schema::Schema; +use async_trait::async_trait; use datafusion_catalog::{Session, TableProvider}; -use datafusion_common::stats::Precision; use datafusion_common::{ config_datafusion_err, config_err, internal_err, plan_err, project_schema, - Constraints, DataFusionError, Result, SchemaExt, + stats::Precision, Constraints, DataFusionError, Result, SchemaExt, }; -use datafusion_datasource::compute_all_files_statistics; -use datafusion_datasource::file_groups::FileGroup; -use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; -use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory; -use datafusion_execution::cache::{ - cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache, +use datafusion_datasource::{ + compute_all_files_statistics, + file_groups::FileGroup, + file_scan_config::{FileScanConfig, FileScanConfigBuilder}, + schema_adapter::DefaultSchemaAdapterFactory, +}; +use datafusion_execution::{ + cache::{cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache}, + config::SessionConfig, +}; +use datafusion_expr::{ + dml::InsertOp, Expr, SortExpr, TableProviderFilterPushDown, TableType, }; -use datafusion_execution::config::SessionConfig; -use datafusion_expr::dml::InsertOp; -use datafusion_expr::{Expr, SortExpr, TableProviderFilterPushDown, TableType}; use datafusion_physical_expr_common::sort_expr::LexOrdering; -use datafusion_physical_plan::empty::EmptyExec; -use datafusion_physical_plan::{ExecutionPlan, Statistics}; - -use async_trait::async_trait; +use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics}; use futures::{future, stream, Stream, StreamExt, TryStreamExt}; use itertools::Itertools; use object_store::ObjectStore; +use std::{any::Any, collections::HashMap, str::FromStr, sync::Arc}; +/// Indicates the source of the schema for a [`ListingTable`] +// PartialEq required for assert_eq! 
in tests
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum SchemaSource {
+    /// Schema is not yet set (initial state)
+    None,
+    /// Schema was inferred from first table_path
+    Inferred,
+    /// Schema was specified explicitly via with_schema
+    Specified,
+}
 
 /// Configuration for creating a [`ListingTable`]
 ///
@@ -72,6 +81,8 @@ pub struct ListingTableConfig {
     ///
     /// See details on [`ListingTableConfig::with_listing_options`]
     pub options: Option<ListingOptions>,
+    /// Tracks the source of the schema information
+    schema_source: SchemaSource,
 }
 
 impl ListingTableConfig {
@@ -82,6 +93,7 @@ impl ListingTableConfig {
             table_paths,
             file_schema: None,
             options: None,
+            schema_source: SchemaSource::None,
         }
     }
 
@@ -93,8 +105,14 @@ impl ListingTableConfig {
             table_paths,
             file_schema: None,
             options: None,
+            schema_source: SchemaSource::None,
         }
     }
+
+    /// Returns the source of the schema for this configuration
+    pub fn schema_source(&self) -> SchemaSource {
+        self.schema_source
+    }
     /// Set the `schema` for the overall [`ListingTable`]
     ///
     /// [`ListingTable`] will automatically coerce, when possible, the schema
@@ -110,6 +128,7 @@ impl ListingTableConfig {
             table_paths: self.table_paths,
             file_schema: Some(schema),
             options: self.options,
+            schema_source: SchemaSource::Specified,
         }
     }
 
@@ -122,6 +141,7 @@ impl ListingTableConfig {
             table_paths: self.table_paths,
             file_schema: self.file_schema,
             options: Some(listing_options),
+            schema_source: self.schema_source,
         }
     }
 
@@ -201,6 +221,7 @@ impl ListingTableConfig {
             table_paths: self.table_paths,
             file_schema: self.file_schema,
             options: Some(listing_options),
+            schema_source: self.schema_source,
         })
     }
 
@@ -214,16 +235,32 @@ impl ListingTableConfig {
     pub async fn infer_schema(self, state: &dyn Session) -> Result<Self> {
         match self.options {
             Some(options) => {
-                let schema = if let Some(url) = self.table_paths.first() {
-                    options.infer_schema(state, url).await?
-                } else {
-                    Arc::new(Schema::empty())
+                let ListingTableConfig {
+                    table_paths,
+                    file_schema,
+                    options: _,
+                    schema_source,
+                } = self;
+
+                let (schema, new_schema_source) = match file_schema {
+                    Some(schema) => (schema, schema_source), // Keep existing source if schema exists
+                    None => {
+                        if let Some(url) = table_paths.first() {
+                            (
+                                options.infer_schema(state, url).await?,
+                                SchemaSource::Inferred,
+                            )
+                        } else {
+                            (Arc::new(Schema::empty()), SchemaSource::Inferred)
+                        }
+                    }
                 };
 
                 Ok(Self {
-                    table_paths: self.table_paths,
+                    table_paths,
                     file_schema: Some(schema),
                     options: Some(options),
+                    schema_source: new_schema_source,
                 })
             }
             None => internal_err!("No `ListingOptions` set for inferring schema"),
@@ -264,6 +301,7 @@ impl ListingTableConfig {
                 table_paths: self.table_paths,
                 file_schema: self.file_schema,
                 options: Some(options),
+                schema_source: self.schema_source,
             })
         }
         None => config_err!("No `ListingOptions` set for inferring schema"),
@@ -756,6 +794,8 @@ pub struct ListingTable {
     /// - Partition columns are derived from directory paths (not stored in files)
     /// - These are columns like "year=2022/month=01" in paths like `/data/year=2022/month=01/file.parquet`
     table_schema: SchemaRef,
+    /// Indicates how the schema was derived (inferred or explicitly specified)
+    schema_source: SchemaSource,
     options: ListingOptions,
     definition: Option<String>,
     collected_statistics: FileStatisticsCache,
@@ -768,6 +808,9 @@ impl ListingTable {
     ///
     /// See documentation and example on [`ListingTable`] and [`ListingTableConfig`]
     pub fn try_new(config: ListingTableConfig) -> Result<Self> {
+        // Extract schema_source before moving other parts of the config
+        let schema_source = config.schema_source();
+
         let file_schema = config
             .file_schema
             .ok_or_else(|| DataFusionError::Internal("No schema provided.".into()))?;
@@ -792,6 +835,7 @@ impl ListingTable {
             table_paths: config.table_paths,
             file_schema,
             table_schema,
+            schema_source,
             options,
             definition: None,
             collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
@@ -845,6 +889,11 @@ impl ListingTable {
         &self.options
     }
 
+    /// Get the schema source
+    pub fn schema_source(&self) -> SchemaSource {
+        self.schema_source
+    }
+
    /// If file_sort_order is specified, creates the appropriate physical expressions
    fn try_create_output_ordering(&self) -> Result<Vec<LexOrdering>> {
        create_ordering(&self.table_schema, &self.options.file_sort_order)
@@ -1252,30 +1301,114 @@ async fn get_files_with_limit(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::datasource::file_format::csv::CsvFormat;
-    use crate::datasource::file_format::json::JsonFormat;
     #[cfg(feature = "parquet")]
     use crate::datasource::file_format::parquet::ParquetFormat;
-    use crate::datasource::{provider_as_source, DefaultTableSource, MemTable};
-    use crate::execution::options::ArrowReadOptions;
     use crate::prelude::*;
-    use crate::test::columns;
-    use crate::test::object_store::{
-        ensure_head_concurrency, make_test_store_and_state, register_test_store,
+    use crate::{
+        datasource::{
+            file_format::csv::CsvFormat, file_format::json::JsonFormat,
+            provider_as_source, DefaultTableSource, MemTable,
+        },
+        execution::options::ArrowReadOptions,
+        test::{
+            columns, object_store::ensure_head_concurrency,
+            object_store::make_test_store_and_state, object_store::register_test_store,
+        },
+    };
+    use arrow::{compute::SortOptions, record_batch::RecordBatch};
+    use datafusion_common::{
+        assert_contains,
+        stats::Precision,
+        test_util::{batches_to_string, datafusion_test_data},
+        ScalarValue,
     };
-
-    use arrow::compute::SortOptions;
-    use arrow::record_batch::RecordBatch;
-    use datafusion_common::stats::Precision;
-    use datafusion_common::test_util::batches_to_string;
-    use datafusion_common::{assert_contains, ScalarValue};
     use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
     use datafusion_physical_expr::PhysicalSortExpr;
     use datafusion_physical_plan::{collect, ExecutionPlanProperties};
-
+    use std::io::Write;
     use tempfile::TempDir;
     use url::Url;
 
+    /// Creates a test schema with standard field types used in tests
+    fn create_test_schema() -> SchemaRef {
+        Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Float32, true),
+            Field::new("c2", DataType::Float64, true),
+            Field::new("c3", DataType::Boolean, true),
+            Field::new("c4", DataType::Utf8, true),
+        ]))
+    }
+
+    /// Helper function to generate test file paths with given prefix, count, and optional start index
+    fn generate_test_files(prefix: &str, count: usize) -> Vec<String> {
+        generate_test_files_with_start(prefix, count, 0)
+    }
+
+    /// Helper function to generate test file paths with given prefix, count, and start index
+    fn generate_test_files_with_start(
+        prefix: &str,
+        count: usize,
+        start_index: usize,
+    ) -> Vec<String> {
+        (start_index..start_index + count)
+            .map(|i| format!("{prefix}/file{i}"))
+            .collect()
+    }
+
+    #[tokio::test]
+    async fn test_schema_source_tracking_comprehensive() -> Result<()> {
+        let ctx = SessionContext::new();
+        let testdata = datafusion_test_data();
+        let filename = format!("{testdata}/aggregate_simple.csv");
+        let table_path = ListingTableUrl::parse(filename).unwrap();
+
+        // Test default schema source
+        let config = ListingTableConfig::new(table_path.clone());
+        assert_eq!(config.schema_source(), SchemaSource::None);
+
+        // Test schema source after setting a schema explicitly
+        let provided_schema = create_test_schema();
+        let config_with_schema = config.clone().with_schema(provided_schema.clone());
+        assert_eq!(config_with_schema.schema_source(), SchemaSource::Specified);
+
+        // Test schema source after inferring schema
+        let format = CsvFormat::default();
+        let options = ListingOptions::new(Arc::new(format));
+        let config_with_options = config.with_listing_options(options.clone());
+        assert_eq!(config_with_options.schema_source(), SchemaSource::None);
+
+        let config_with_inferred = config_with_options.infer_schema(&ctx.state()).await?;
+        assert_eq!(config_with_inferred.schema_source(), SchemaSource::Inferred);
+
+        // Test schema preservation through operations
+        let config_with_schema_and_options = config_with_schema
+            .clone()
+            .with_listing_options(options.clone());
+        assert_eq!(
+            config_with_schema_and_options.schema_source(),
+            SchemaSource::Specified
+        );
+
+        // Make sure inferred schema doesn't override specified schema
+        let config_with_schema_and_infer = config_with_schema_and_options
+            .clone()
+            .infer(&ctx.state())
+            .await?;
+        assert_eq!(
+            config_with_schema_and_infer.schema_source(),
+            SchemaSource::Specified
+        );
+
+        // Verify sources in actual ListingTable objects
+        let table_specified = ListingTable::try_new(config_with_schema_and_options)?;
+        assert_eq!(table_specified.schema_source(), SchemaSource::Specified);
+
+        let table_inferred = ListingTable::try_new(config_with_inferred)?;
+        assert_eq!(table_inferred.schema_source(), SchemaSource::Inferred);
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn read_single_file() -> Result<()> {
         let ctx = SessionContext::new_with_config(
@@ -1305,85 +1438,6 @@ mod tests {
         Ok(())
     }
 
-    #[cfg(feature = "parquet")]
-    #[tokio::test]
-    async fn do_not_load_table_stats_by_default() ->
Result<()> { - use crate::datasource::file_format::parquet::ParquetFormat; - - let testdata = crate::test_util::parquet_test_data(); - let filename = format!("{}/{}", testdata, "alltypes_plain.parquet"); - let table_path = ListingTableUrl::parse(filename).unwrap(); - - let ctx = SessionContext::new(); - let state = ctx.state(); - - let opt = ListingOptions::new(Arc::new(ParquetFormat::default())); - let schema = opt.infer_schema(&state, &table_path).await?; - let config = ListingTableConfig::new(table_path.clone()) - .with_listing_options(opt) - .with_schema(schema); - let table = ListingTable::try_new(config)?; - - let exec = table.scan(&state, None, &[], None).await?; - assert_eq!(exec.partition_statistics(None)?.num_rows, Precision::Absent); - // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 - assert_eq!( - exec.partition_statistics(None)?.total_byte_size, - Precision::Absent - ); - - let opt = ListingOptions::new(Arc::new(ParquetFormat::default())) - .with_collect_stat(true); - let schema = opt.infer_schema(&state, &table_path).await?; - let config = ListingTableConfig::new(table_path) - .with_listing_options(opt) - .with_schema(schema); - let table = ListingTable::try_new(config)?; - - let exec = table.scan(&state, None, &[], None).await?; - assert_eq!( - exec.partition_statistics(None)?.num_rows, - Precision::Exact(8) - ); - // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 - assert_eq!( - exec.partition_statistics(None)?.total_byte_size, - Precision::Exact(671) - ); - - Ok(()) - } - - #[cfg(feature = "parquet")] - #[tokio::test] - async fn load_table_stats_when_no_stats() -> Result<()> { - use crate::datasource::file_format::parquet::ParquetFormat; - - let testdata = crate::test_util::parquet_test_data(); - let filename = format!("{}/{}", testdata, "alltypes_plain.parquet"); - let table_path = ListingTableUrl::parse(filename).unwrap(); - - let ctx = SessionContext::new(); - let state = ctx.state(); - - let opt = ListingOptions::new(Arc::new(ParquetFormat::default())) - .with_collect_stat(false); - let schema = opt.infer_schema(&state, &table_path).await?; - let config = ListingTableConfig::new(table_path) - .with_listing_options(opt) - .with_schema(schema); - let table = ListingTable::try_new(config)?; - - let exec = table.scan(&state, None, &[], None).await?; - assert_eq!(exec.partition_statistics(None)?.num_rows, Precision::Absent); - assert_eq!( - exec.partition_statistics(None)?.total_byte_size, - Precision::Absent - ); - - Ok(()) - } - #[cfg(feature = "parquet")] #[tokio::test] async fn test_try_create_output_ordering() { @@ -1514,263 +1568,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_assert_list_files_for_scan_grouping() -> Result<()> { - // more expected partitions than files - assert_list_files_for_scan_grouping( - &[ - "bucket/key-prefix/file0", - "bucket/key-prefix/file1", - "bucket/key-prefix/file2", - "bucket/key-prefix/file3", - "bucket/key-prefix/file4", - ], - "test:///bucket/key-prefix/", - 12, - 5, - Some(""), - ) - .await?; - - // as many expected partitions as files - assert_list_files_for_scan_grouping( - &[ - "bucket/key-prefix/file0", - "bucket/key-prefix/file1", - "bucket/key-prefix/file2", - "bucket/key-prefix/file3", - ], - "test:///bucket/key-prefix/", - 4, - 4, - Some(""), - ) - .await?; - - // more files as expected partitions - assert_list_files_for_scan_grouping( - &[ - "bucket/key-prefix/file0", - "bucket/key-prefix/file1", - "bucket/key-prefix/file2", - "bucket/key-prefix/file3", - 
"bucket/key-prefix/file4", - ], - "test:///bucket/key-prefix/", - 2, - 2, - Some(""), - ) - .await?; - - // no files => no groups - assert_list_files_for_scan_grouping( - &[], - "test:///bucket/key-prefix/", - 2, - 0, - Some(""), - ) - .await?; - - // files that don't match the prefix - assert_list_files_for_scan_grouping( - &[ - "bucket/key-prefix/file0", - "bucket/key-prefix/file1", - "bucket/other-prefix/roguefile", - ], - "test:///bucket/key-prefix/", - 10, - 2, - Some(""), - ) - .await?; - - // files that don't match the prefix or the default file extention - assert_list_files_for_scan_grouping( - &[ - "bucket/key-prefix/file0.json", - "bucket/key-prefix/file1.parquet", - "bucket/other-prefix/roguefile.json", - ], - "test:///bucket/key-prefix/", - 10, - 1, - None, - ) - .await?; - Ok(()) - } - - #[tokio::test] - async fn test_assert_list_files_for_multi_path() -> Result<()> { - // more expected partitions than files - assert_list_files_for_multi_paths( - &[ - "bucket/key1/file0", - "bucket/key1/file1", - "bucket/key1/file2", - "bucket/key2/file3", - "bucket/key2/file4", - "bucket/key3/file5", - ], - &["test:///bucket/key1/", "test:///bucket/key2/"], - 12, - 5, - Some(""), - ) - .await?; - - // as many expected partitions as files - assert_list_files_for_multi_paths( - &[ - "bucket/key1/file0", - "bucket/key1/file1", - "bucket/key1/file2", - "bucket/key2/file3", - "bucket/key2/file4", - "bucket/key3/file5", - ], - &["test:///bucket/key1/", "test:///bucket/key2/"], - 5, - 5, - Some(""), - ) - .await?; - - // more files as expected partitions - assert_list_files_for_multi_paths( - &[ - "bucket/key1/file0", - "bucket/key1/file1", - "bucket/key1/file2", - "bucket/key2/file3", - "bucket/key2/file4", - "bucket/key3/file5", - ], - &["test:///bucket/key1/"], - 2, - 2, - Some(""), - ) - .await?; - - // no files => no groups - assert_list_files_for_multi_paths(&[], &["test:///bucket/key1/"], 2, 0, Some("")) - .await?; - - // files that don't match the prefix - assert_list_files_for_multi_paths( - &[ - "bucket/key1/file0", - "bucket/key1/file1", - "bucket/key1/file2", - "bucket/key2/file3", - "bucket/key2/file4", - "bucket/key3/file5", - ], - &["test:///bucket/key3/"], - 2, - 1, - Some(""), - ) - .await?; - - // files that don't match the prefix or the default file ext - assert_list_files_for_multi_paths( - &[ - "bucket/key1/file0.json", - "bucket/key1/file1.csv", - "bucket/key1/file2.json", - "bucket/key2/file3.csv", - "bucket/key2/file4.json", - "bucket/key3/file5.csv", - ], - &["test:///bucket/key1/", "test:///bucket/key3/"], - 2, - 2, - None, - ) - .await?; - Ok(()) - } - - #[tokio::test] - async fn test_assert_list_files_for_exact_paths() -> Result<()> { - // more expected partitions than files - assert_list_files_for_exact_paths( - &[ - "bucket/key1/file0", - "bucket/key1/file1", - "bucket/key1/file2", - "bucket/key2/file3", - "bucket/key2/file4", - ], - 12, - 5, - Some(""), - ) - .await?; - - // more files than meta_fetch_concurrency (32) - let files: Vec = - (0..64).map(|i| format!("bucket/key1/file{i}")).collect(); - // Collect references to each string - let file_refs: Vec<&str> = files.iter().map(|s| s.as_str()).collect(); - assert_list_files_for_exact_paths(file_refs.as_slice(), 5, 5, Some("")).await?; - - // as many expected partitions as files - assert_list_files_for_exact_paths( - &[ - "bucket/key1/file0", - "bucket/key1/file1", - "bucket/key1/file2", - "bucket/key2/file3", - "bucket/key2/file4", - ], - 5, - 5, - Some(""), - ) - .await?; - - // more files as expected partitions - 
assert_list_files_for_exact_paths( - &[ - "bucket/key1/file0", - "bucket/key1/file1", - "bucket/key1/file2", - "bucket/key2/file3", - "bucket/key2/file4", - ], - 2, - 2, - Some(""), - ) - .await?; - - // no files => no groups - assert_list_files_for_exact_paths(&[], 2, 0, Some("")).await?; - - // files that don't match the default file ext - assert_list_files_for_exact_paths( - &[ - "bucket/key1/file0.json", - "bucket/key1/file1.csv", - "bucket/key1/file2.json", - "bucket/key2/file3.csv", - "bucket/key2/file4.json", - "bucket/key3/file5.csv", - ], - 2, - 2, - None, - ) - .await?; - Ok(()) - } - async fn load_table( ctx: &SessionContext, name: &str, @@ -1873,10 +1670,10 @@ mod tests { .execution .meta_fetch_concurrency; let expected_concurrency = files.len().min(meta_fetch_concurrency); - let head_blocking_store = ensure_head_concurrency(store, expected_concurrency); + let head_concurrency_store = ensure_head_concurrency(store, expected_concurrency); let url = Url::parse("test://").unwrap(); - ctx.register_object_store(&url, head_blocking_store.clone()); + ctx.register_object_store(&url, head_concurrency_store.clone()); let format = JsonFormat::default(); @@ -1903,80 +1700,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_insert_into_append_new_json_files() -> Result<()> { - let mut config_map: HashMap = HashMap::new(); - config_map.insert("datafusion.execution.batch_size".into(), "10".into()); - config_map.insert( - "datafusion.execution.soft_max_rows_per_output_file".into(), - "10".into(), - ); - helper_test_append_new_files_to_table( - JsonFormat::default().get_ext(), - FileCompressionType::UNCOMPRESSED, - Some(config_map), - 2, - ) - .await?; - Ok(()) - } - - #[tokio::test] - async fn test_insert_into_append_new_csv_files() -> Result<()> { - let mut config_map: HashMap = HashMap::new(); - config_map.insert("datafusion.execution.batch_size".into(), "10".into()); - config_map.insert( - "datafusion.execution.soft_max_rows_per_output_file".into(), - "10".into(), - ); - helper_test_append_new_files_to_table( - CsvFormat::default().get_ext(), - FileCompressionType::UNCOMPRESSED, - Some(config_map), - 2, - ) - .await?; - Ok(()) - } - - #[cfg(feature = "parquet")] - #[tokio::test] - async fn test_insert_into_append_2_new_parquet_files_defaults() -> Result<()> { - let mut config_map: HashMap = HashMap::new(); - config_map.insert("datafusion.execution.batch_size".into(), "10".into()); - config_map.insert( - "datafusion.execution.soft_max_rows_per_output_file".into(), - "10".into(), - ); - helper_test_append_new_files_to_table( - ParquetFormat::default().get_ext(), - FileCompressionType::UNCOMPRESSED, - Some(config_map), - 2, - ) - .await?; - Ok(()) - } - - #[cfg(feature = "parquet")] - #[tokio::test] - async fn test_insert_into_append_1_new_parquet_files_defaults() -> Result<()> { - let mut config_map: HashMap = HashMap::new(); - config_map.insert("datafusion.execution.batch_size".into(), "20".into()); - config_map.insert( - "datafusion.execution.soft_max_rows_per_output_file".into(), - "20".into(), - ); - helper_test_append_new_files_to_table( - ParquetFormat::default().get_ext(), - FileCompressionType::UNCOMPRESSED, - Some(config_map), - 1, - ) - .await?; - Ok(()) - } - #[tokio::test] async fn test_insert_into_sql_csv_defaults() -> Result<()> { helper_test_insert_into_sql("csv", FileCompressionType::UNCOMPRESSED, "", None) @@ -2452,4 +2175,382 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn infer_preserves_provided_schema() -> Result<()> { + let ctx = SessionContext::new(); + + 
let testdata = datafusion_test_data(); + let filename = format!("{testdata}/aggregate_simple.csv"); + let table_path = ListingTableUrl::parse(filename).unwrap(); + + let provided_schema = create_test_schema(); + + let config = + ListingTableConfig::new(table_path).with_schema(Arc::clone(&provided_schema)); + + let config = config.infer(&ctx.state()).await?; + + assert_eq!(*config.file_schema.unwrap(), *provided_schema); + + Ok(()) + } + + #[tokio::test] + async fn test_listing_table_config_with_multiple_files_comprehensive() -> Result<()> { + let ctx = SessionContext::new(); + + // Create test files with different schemas + let tmp_dir = TempDir::new()?; + let file_path1 = tmp_dir.path().join("file1.csv"); + let file_path2 = tmp_dir.path().join("file2.csv"); + + // File 1: c1,c2,c3 + let mut file1 = std::fs::File::create(&file_path1)?; + writeln!(file1, "c1,c2,c3")?; + writeln!(file1, "1,2,3")?; + writeln!(file1, "4,5,6")?; + + // File 2: c1,c2,c3,c4 + let mut file2 = std::fs::File::create(&file_path2)?; + writeln!(file2, "c1,c2,c3,c4")?; + writeln!(file2, "7,8,9,10")?; + writeln!(file2, "11,12,13,14")?; + + // Parse paths + let table_path1 = ListingTableUrl::parse(file_path1.to_str().unwrap())?; + let table_path2 = ListingTableUrl::parse(file_path2.to_str().unwrap())?; + + // Create format and options + let format = CsvFormat::default().with_has_header(true); + let options = ListingOptions::new(Arc::new(format)); + + // Test case 1: Infer schema using first file's schema + let config1 = ListingTableConfig::new_with_multi_paths(vec![ + table_path1.clone(), + table_path2.clone(), + ]) + .with_listing_options(options.clone()); + let config1 = config1.infer_schema(&ctx.state()).await?; + assert_eq!(config1.schema_source(), SchemaSource::Inferred); + + // Verify schema matches first file + let schema1 = config1.file_schema.as_ref().unwrap().clone(); + assert_eq!(schema1.fields().len(), 3); + assert_eq!(schema1.field(0).name(), "c1"); + assert_eq!(schema1.field(1).name(), "c2"); + assert_eq!(schema1.field(2).name(), "c3"); + + // Test case 2: Use specified schema with 3 columns + let schema_3cols = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Utf8, true), + Field::new("c2", DataType::Utf8, true), + Field::new("c3", DataType::Utf8, true), + ])); + + let config2 = ListingTableConfig::new_with_multi_paths(vec![ + table_path1.clone(), + table_path2.clone(), + ]) + .with_schema(schema_3cols) + .with_listing_options(options.clone()); + let config2 = config2.infer_schema(&ctx.state()).await?; + assert_eq!(config2.schema_source(), SchemaSource::Specified); + + // Verify that the schema is still the one we specified (3 columns) + let schema2 = config2.file_schema.as_ref().unwrap().clone(); + assert_eq!(schema2.fields().len(), 3); + assert_eq!(schema2.field(0).name(), "c1"); + assert_eq!(schema2.field(1).name(), "c2"); + assert_eq!(schema2.field(2).name(), "c3"); + + // Test case 3: Use specified schema with 4 columns + let schema_4cols = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Utf8, true), + Field::new("c2", DataType::Utf8, true), + Field::new("c3", DataType::Utf8, true), + Field::new("c4", DataType::Utf8, true), + ])); + + let config3 = ListingTableConfig::new_with_multi_paths(vec![ + table_path1.clone(), + table_path2.clone(), + ]) + .with_schema(schema_4cols) + .with_listing_options(options.clone()); + let config3 = config3.infer_schema(&ctx.state()).await?; + assert_eq!(config3.schema_source(), SchemaSource::Specified); + + // Verify that the schema is still the one we 
specified (4 columns) + let schema3 = config3.file_schema.as_ref().unwrap().clone(); + assert_eq!(schema3.fields().len(), 4); + assert_eq!(schema3.field(0).name(), "c1"); + assert_eq!(schema3.field(1).name(), "c2"); + assert_eq!(schema3.field(2).name(), "c3"); + assert_eq!(schema3.field(3).name(), "c4"); + + // Test case 4: Verify order matters when inferring schema + let config4 = ListingTableConfig::new_with_multi_paths(vec![ + table_path2.clone(), + table_path1.clone(), + ]) + .with_listing_options(options); + let config4 = config4.infer_schema(&ctx.state()).await?; + + // Should use first file's schema, which now has 4 columns + let schema4 = config4.file_schema.as_ref().unwrap().clone(); + assert_eq!(schema4.fields().len(), 4); + assert_eq!(schema4.field(0).name(), "c1"); + assert_eq!(schema4.field(1).name(), "c2"); + assert_eq!(schema4.field(2).name(), "c3"); + assert_eq!(schema4.field(3).name(), "c4"); + + Ok(()) + } + + #[tokio::test] + async fn test_list_files_configurations() -> Result<()> { + // Define common test cases as (description, files, paths, target_partitions, expected_partitions, file_ext) + let test_cases = vec![ + // Single path cases + ( + "Single path, more partitions than files", + generate_test_files("bucket/key-prefix", 5), + vec!["test:///bucket/key-prefix/"], + 12, + 5, + Some(""), + ), + ( + "Single path, equal partitions and files", + generate_test_files("bucket/key-prefix", 4), + vec!["test:///bucket/key-prefix/"], + 4, + 4, + Some(""), + ), + ( + "Single path, more files than partitions", + generate_test_files("bucket/key-prefix", 5), + vec!["test:///bucket/key-prefix/"], + 2, + 2, + Some(""), + ), + // Multi path cases + ( + "Multi path, more partitions than files", + { + let mut files = generate_test_files("bucket/key1", 3); + files.extend(generate_test_files_with_start("bucket/key2", 2, 3)); + files.extend(generate_test_files_with_start("bucket/key3", 1, 5)); + files + }, + vec!["test:///bucket/key1/", "test:///bucket/key2/"], + 12, + 5, + Some(""), + ), + // No files case + ( + "No files", + vec![], + vec!["test:///bucket/key-prefix/"], + 2, + 0, + Some(""), + ), + // Exact path cases + ( + "Exact paths test", + { + let mut files = generate_test_files("bucket/key1", 3); + files.extend(generate_test_files_with_start("bucket/key2", 2, 3)); + files + }, + vec![ + "test:///bucket/key1/file0", + "test:///bucket/key1/file1", + "test:///bucket/key1/file2", + "test:///bucket/key2/file3", + "test:///bucket/key2/file4", + ], + 12, + 5, + Some(""), + ), + ]; + + // Run each test case + for (test_name, files, paths, target_partitions, expected_partitions, file_ext) in + test_cases + { + println!("Running test: {test_name}"); + + if files.is_empty() { + // Test empty files case + assert_list_files_for_multi_paths( + &[], + &paths, + target_partitions, + expected_partitions, + file_ext, + ) + .await?; + } else if paths.len() == 1 { + // Test using single path API + let file_refs: Vec<&str> = files.iter().map(|s| s.as_str()).collect(); + assert_list_files_for_scan_grouping( + &file_refs, + paths[0], + target_partitions, + expected_partitions, + file_ext, + ) + .await?; + } else if paths[0].contains("test:///bucket/key") { + // Test using multi path API + let file_refs: Vec<&str> = files.iter().map(|s| s.as_str()).collect(); + assert_list_files_for_multi_paths( + &file_refs, + &paths, + target_partitions, + expected_partitions, + file_ext, + ) + .await?; + } else { + // Test using exact path API for specific cases + let file_refs: Vec<&str> = files.iter().map(|s| 
s.as_str()).collect(); + assert_list_files_for_exact_paths( + &file_refs, + target_partitions, + expected_partitions, + file_ext, + ) + .await?; + } + } + + Ok(()) + } + + #[cfg(feature = "parquet")] + #[tokio::test] + async fn test_table_stats_behaviors() -> Result<()> { + use crate::datasource::file_format::parquet::ParquetFormat; + + let testdata = crate::test_util::parquet_test_data(); + let filename = format!("{}/{}", testdata, "alltypes_plain.parquet"); + let table_path = ListingTableUrl::parse(filename).unwrap(); + + let ctx = SessionContext::new(); + let state = ctx.state(); + + // Test 1: Default behavior - stats not collected + let opt_default = ListingOptions::new(Arc::new(ParquetFormat::default())); + let schema_default = opt_default.infer_schema(&state, &table_path).await?; + let config_default = ListingTableConfig::new(table_path.clone()) + .with_listing_options(opt_default) + .with_schema(schema_default); + let table_default = ListingTable::try_new(config_default)?; + + let exec_default = table_default.scan(&state, None, &[], None).await?; + assert_eq!( + exec_default.partition_statistics(None)?.num_rows, + Precision::Absent + ); + + // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 + assert_eq!( + exec_default.partition_statistics(None)?.total_byte_size, + Precision::Absent + ); + + // Test 2: Explicitly disable stats + let opt_disabled = ListingOptions::new(Arc::new(ParquetFormat::default())) + .with_collect_stat(false); + let schema_disabled = opt_disabled.infer_schema(&state, &table_path).await?; + let config_disabled = ListingTableConfig::new(table_path.clone()) + .with_listing_options(opt_disabled) + .with_schema(schema_disabled); + let table_disabled = ListingTable::try_new(config_disabled)?; + + let exec_disabled = table_disabled.scan(&state, None, &[], None).await?; + assert_eq!( + exec_disabled.partition_statistics(None)?.num_rows, + Precision::Absent + ); + assert_eq!( + exec_disabled.partition_statistics(None)?.total_byte_size, + Precision::Absent + ); + + // Test 3: Explicitly enable stats + let opt_enabled = ListingOptions::new(Arc::new(ParquetFormat::default())) + .with_collect_stat(true); + let schema_enabled = opt_enabled.infer_schema(&state, &table_path).await?; + let config_enabled = ListingTableConfig::new(table_path) + .with_listing_options(opt_enabled) + .with_schema(schema_enabled); + let table_enabled = ListingTable::try_new(config_enabled)?; + + let exec_enabled = table_enabled.scan(&state, None, &[], None).await?; + assert_eq!( + exec_enabled.partition_statistics(None)?.num_rows, + Precision::Exact(8) + ); + // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 + assert_eq!( + exec_enabled.partition_statistics(None)?.total_byte_size, + Precision::Exact(671) + ); + + Ok(()) + } + + #[tokio::test] + async fn test_insert_into_parameterized() -> Result<()> { + let test_cases = vec![ + // (file_format, batch_size, soft_max_rows, expected_files) + ("json", 10, 10, 2), + ("csv", 10, 10, 2), + #[cfg(feature = "parquet")] + ("parquet", 10, 10, 2), + #[cfg(feature = "parquet")] + ("parquet", 20, 20, 1), + ]; + + for (format, batch_size, soft_max_rows, expected_files) in test_cases { + println!("Testing insert with format: {format}, batch_size: {batch_size}, expected files: {expected_files}"); + + let mut config_map = HashMap::new(); + config_map.insert( + "datafusion.execution.batch_size".into(), + batch_size.to_string(), + ); + config_map.insert( + "datafusion.execution.soft_max_rows_per_output_file".into(), 
+ soft_max_rows.to_string(), + ); + + let file_extension = match format { + "json" => JsonFormat::default().get_ext(), + "csv" => CsvFormat::default().get_ext(), + #[cfg(feature = "parquet")] + "parquet" => ParquetFormat::default().get_ext(), + _ => unreachable!("Unsupported format"), + }; + + helper_test_append_new_files_to_table( + file_extension, + FileCompressionType::UNCOMPRESSED, + Some(config_map), + expected_files, + ) + .await?; + } + + Ok(()) + } }
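Usage note (not part of the diff above): a minimal sketch, written in the style of the new tests, of how code might branch on the schema provenance this change starts tracking. It assumes the same test-module imports exercised above (SessionContext, CsvFormat, ListingOptions, ListingTableConfig, ListingTable, ListingTableUrl, SchemaSource, datafusion_test_data); the test name and println! messages are illustrative only.

    // Illustrative sketch (not part of this PR); reuses the test-module imports above.
    #[tokio::test]
    async fn schema_source_usage_sketch() -> Result<()> {
        let ctx = SessionContext::new();
        let testdata = datafusion_test_data();
        let table_path =
            ListingTableUrl::parse(format!("{testdata}/aggregate_simple.csv"))?;
        let options = ListingOptions::new(Arc::new(CsvFormat::default()));

        // Without an explicit schema, infer_schema() reads one from the first file
        // and records that provenance on the config.
        let config = ListingTableConfig::new(table_path)
            .with_listing_options(options)
            .infer_schema(&ctx.state())
            .await?;

        // Callers can branch on where the schema came from.
        match config.schema_source() {
            SchemaSource::Specified => println!("schema was provided by the caller"),
            SchemaSource::Inferred => println!("schema was inferred from the data"),
            SchemaSource::None => println!("schema has not been resolved yet"),
        }

        // The resulting table reports the same provenance.
        let table = ListingTable::try_new(config)?;
        assert_eq!(table.schema_source(), SchemaSource::Inferred);
        Ok(())
    }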