diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 3ddf1c85e241b..3c079ba4db3ed 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -36,9 +36,10 @@ use datafusion_common::{
 };
 use datafusion_datasource::{
     compute_all_files_statistics,
+    file::FileSource,
     file_groups::FileGroup,
     file_scan_config::{FileScanConfig, FileScanConfigBuilder},
-    schema_adapter::DefaultSchemaAdapterFactory,
+    schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory},
 };
 use datafusion_execution::{
     cache::{cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache},
@@ -55,10 +56,11 @@ use object_store::ObjectStore;
 use std::{any::Any, collections::HashMap, str::FromStr, sync::Arc};
 
 /// Indicates the source of the schema for a [`ListingTable`]
 // PartialEq required for assert_eq! in tests
-#[derive(Debug, Clone, Copy, PartialEq)]
+#[derive(Debug, Clone, Copy, PartialEq, Default)]
 pub enum SchemaSource {
     /// Schema is not yet set (initial state)
-    None,
+    #[default]
+    Unset,
     /// Schema was inferred from first table_path
     Inferred,
     /// Schema was specified explicitly via with_schema
@@ -67,8 +69,20 @@ pub enum SchemaSource {
 
 /// Configuration for creating a [`ListingTable`]
 ///
+/// # Schema Evolution Support
 ///
-#[derive(Debug, Clone)]
+/// This configuration supports schema evolution through the optional
+/// [`SchemaAdapterFactory`]. You might want to override the default factory when you need:
+///
+/// - **Custom type coercion**: custom logic for converting between different
+///   Arrow data types (e.g., Int32 ↔ Int64, Utf8 ↔ LargeUtf8)
+/// - **Column mapping**: mapping columns with a legacy name to a new name
+/// - **Custom handling of missing columns**: by default missing columns are
+///   filled with nulls, but you may want to fill them with a default value
+///   such as `0` or `""` instead
+///
+/// If not specified, a [`DefaultSchemaAdapterFactory`] will be used, which
+/// handles basic schema compatibility cases.
+///
+#[derive(Debug, Clone, Default)]
 pub struct ListingTableConfig {
     /// Paths on the `ObjectStore` for creating `ListingTable`.
     /// They should share the same schema and object store.
@@ -83,17 +97,16 @@ pub struct ListingTableConfig {
     pub options: Option<ListingOptions>,
     /// Tracks the source of the schema information
     schema_source: SchemaSource,
+    /// Optional [`SchemaAdapterFactory`] for creating schema adapters
+    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
 }
 
 impl ListingTableConfig {
     /// Creates new [`ListingTableConfig`] for reading the specified URL
     pub fn new(table_path: ListingTableUrl) -> Self {
-        let table_paths = vec![table_path];
         Self {
-            table_paths,
-            file_schema: None,
-            options: None,
-            schema_source: SchemaSource::None,
+            table_paths: vec![table_path],
+            ..Default::default()
         }
     }
 
@@ -103,9 +116,7 @@ impl ListingTableConfig {
     pub fn new_with_multi_paths(table_paths: Vec<ListingTableUrl>) -> Self {
         Self {
             table_paths,
-            file_schema: None,
-            options: None,
-            schema_source: SchemaSource::None,
+            ..Default::default()
         }
     }
 
@@ -123,12 +134,38 @@ impl ListingTableConfig {
     ///
     /// If the schema is provided, it must contain only the fields in the file
    /// without the table partitioning columns.
+    ///
+    /// # Example: Specifying Table Schema
+    /// ```rust
+    /// # use std::sync::Arc;
+    /// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl};
+    /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
+    /// # use arrow::datatypes::{Schema, Field, DataType};
+    /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
+    /// # let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
+    /// let schema = Arc::new(Schema::new(vec![
+    ///     Field::new("id", DataType::Int64, false),
+    ///     Field::new("name", DataType::Utf8, true),
+    /// ]));
+    ///
+    /// let config = ListingTableConfig::new(table_paths)
+    ///     .with_listing_options(listing_options) // Set options first
+    ///     .with_schema(schema);                  // Then set schema
+    /// ```
     pub fn with_schema(self, schema: SchemaRef) -> Self {
+        // Note: We preserve existing options state, but downstream code may expect
+        // options to be set. Consider calling with_listing_options() or infer_options()
+        // before operations that require options to be present.
+        debug_assert!(
+            self.options.is_some() || cfg!(test),
+            "ListingTableConfig::with_schema called without options set. \
+             Consider calling with_listing_options() or infer_options() first to avoid panics in downstream code."
+        );
+
         Self {
-            table_paths: self.table_paths,
             file_schema: Some(schema),
-            options: self.options,
             schema_source: SchemaSource::Specified,
+            ..self
         }
     }
 
@@ -136,12 +173,33 @@ impl ListingTableConfig {
     ///
     /// If not provided, format and other options are inferred via
     /// [`Self::infer_options`].
+    ///
+    /// # Example: Configuring Parquet Files with Custom Options
+    /// ```rust
+    /// # use std::sync::Arc;
+    /// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl};
+    /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
+    /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
+    /// let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
+    ///     .with_file_extension(".parquet")
+    ///     .with_collect_stat(true);
+    ///
+    /// let config = ListingTableConfig::new(table_paths)
+    ///     .with_listing_options(options); // Configure file format and options
+    /// ```
     pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
+        // Note: This method properly sets options, but be aware that downstream
+        // methods like infer_schema() and try_new() require both schema and options
+        // to be set to function correctly.
+        debug_assert!(
+            !self.table_paths.is_empty() || cfg!(test),
+            "ListingTableConfig::with_listing_options called without table_paths set. \
+             Consider calling new() or new_with_multi_paths() first to establish table paths."
+        );
+
         Self {
-            table_paths: self.table_paths,
-            file_schema: self.file_schema,
             options: Some(listing_options),
-            schema_source: self.schema_source,
+            ..self
         }
     }
 
@@ -222,6 +280,7 @@ impl ListingTableConfig {
             file_schema: self.file_schema,
             options: Some(listing_options),
             schema_source: self.schema_source,
+            schema_adapter_factory: self.schema_adapter_factory,
         })
     }
 
@@ -240,6 +299,7 @@ impl ListingTableConfig {
             file_schema,
             options: _,
             schema_source,
+            schema_adapter_factory,
         } = self;
 
         let (schema, new_schema_source) = match file_schema {
@@ -261,6 +321,7 @@ impl ListingTableConfig {
                     file_schema: Some(schema),
                     options: Some(options),
                     schema_source: new_schema_source,
+                    schema_adapter_factory,
                 })
             }
             None => internal_err!("No `ListingOptions` set for inferring schema"),
@@ -302,11 +363,58 @@ impl ListingTableConfig {
                     file_schema: self.file_schema,
                     options: Some(options),
                     schema_source: self.schema_source,
+                    schema_adapter_factory: self.schema_adapter_factory,
                 })
             }
             None => config_err!("No `ListingOptions` set for inferring schema"),
         }
     }
+
+    /// Set the [`SchemaAdapterFactory`] for the [`ListingTable`]
+    ///
+    /// The schema adapter factory is used to create schema adapters that can
+    /// handle schema evolution and type conversions when reading files with
+    /// different schemas than the table schema.
+    ///
+    /// If not provided, a default schema adapter factory will be used.
+    ///
+    /// # Example: Custom Schema Adapter for Type Coercion
+    /// ```rust
+    /// # use std::sync::Arc;
+    /// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl};
+    /// # use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, SchemaAdapter};
+    /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
+    /// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType};
+    /// #
+    /// # #[derive(Debug)]
+    /// # struct MySchemaAdapterFactory;
+    /// # impl SchemaAdapterFactory for MySchemaAdapterFactory {
+    /// #     fn create(&self, _projected_table_schema: SchemaRef, _file_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+    /// #         unimplemented!()
+    /// #     }
+    /// # }
+    /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
+    /// # let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
+    /// # let table_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
+    /// let config = ListingTableConfig::new(table_paths)
+    ///     .with_listing_options(listing_options)
+    ///     .with_schema(table_schema)
+    ///     .with_schema_adapter_factory(Arc::new(MySchemaAdapterFactory));
+    /// ```
+    pub fn with_schema_adapter_factory(
+        self,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Self {
+        Self {
+            schema_adapter_factory: Some(schema_adapter_factory),
+            ..self
+        }
+    }
+
+    /// Get the [`SchemaAdapterFactory`] for this configuration
+    pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn SchemaAdapterFactory>> {
+        self.schema_adapter_factory.as_ref()
+    }
 }
 
 /// Options for creating a [`ListingTable`]
@@ -801,6 +909,8 @@ pub struct ListingTable {
     collected_statistics: FileStatisticsCache,
     constraints: Constraints,
     column_defaults: HashMap<String, Expr>,
+    /// Optional [`SchemaAdapterFactory`] for creating schema adapters
+    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
 }
 
 impl ListingTable {
@@ -841,6 +951,7 @@ impl ListingTable {
             collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
             constraints: Constraints::default(),
             column_defaults: HashMap::new(),
+            schema_adapter_factory: config.schema_adapter_factory,
         };
 
         Ok(table)
@@ -894,6 +1005,70 @@ impl ListingTable {
         self.schema_source
     }
 
+    /// Set the [`SchemaAdapterFactory`] for this [`ListingTable`]
+    ///
+    /// The schema adapter factory is used to create schema adapters that can
+    /// handle schema evolution and type conversions when reading files with
+    /// different schemas than the table schema.
+    ///
+    /// # Example: Adding Schema Evolution Support
+    /// ```rust
+    /// # use std::sync::Arc;
+    /// # use datafusion::datasource::listing::{ListingTable, ListingTableConfig, ListingOptions, ListingTableUrl};
+    /// # use datafusion::datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapter};
+    /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
+    /// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType};
+    /// # let table_path = ListingTableUrl::parse("file:///path/to/data").unwrap();
+    /// # let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
+    /// # let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
+    /// # let config = ListingTableConfig::new(table_path).with_listing_options(options).with_schema(schema);
+    /// # let table = ListingTable::try_new(config).unwrap();
+    /// let table_with_evolution = table
+    ///     .with_schema_adapter_factory(Arc::new(DefaultSchemaAdapterFactory));
+    /// ```
+    /// See [`ListingTableConfig::with_schema_adapter_factory`] for an example of
+    /// providing a custom [`SchemaAdapterFactory`].
+    pub fn with_schema_adapter_factory(
+        self,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Self {
+        Self {
+            schema_adapter_factory: Some(schema_adapter_factory),
+            ..self
+        }
+    }
+
+    /// Get the [`SchemaAdapterFactory`] for this table
+    pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn SchemaAdapterFactory>> {
+        self.schema_adapter_factory.as_ref()
+    }
+
+    /// Creates a schema adapter for mapping between file and table schemas
+    ///
+    /// Uses the configured schema adapter factory if available, otherwise falls back
+    /// to the default implementation.
+    fn create_schema_adapter(&self) -> Box<dyn SchemaAdapter> {
+        let table_schema = self.schema();
+        match &self.schema_adapter_factory {
+            Some(factory) => {
+                factory.create_with_projected_schema(Arc::clone(&table_schema))
+            }
+            None => DefaultSchemaAdapterFactory::from_schema(Arc::clone(&table_schema)),
+        }
+    }
+
+    /// Creates a file source and applies the schema adapter factory, if one is set
+    fn create_file_source_with_schema_adapter(&self) -> Result<Arc<dyn FileSource>> {
+        let mut source = self.options.format.file_source();
+        // Apply the schema adapter to the source if available.
+        //
+        // The source will use this SchemaAdapter to adapt data batches as they flow up the plan.
+        // Note: ListingTable also creates a SchemaAdapter in `scan()`, but that one is only used
+        // to adapt collected statistics.
+        if let Some(factory) = &self.schema_adapter_factory {
+            source = source.with_schema_adapter_factory(Arc::clone(factory))?;
+        }
+        Ok(source)
+    }
+
     /// If file_sort_order is specified, creates the appropriate physical expressions
     fn try_create_output_ordering(&self) -> Result<Vec<LexOrdering>> {
         create_ordering(&self.table_schema, &self.options.file_sort_order)
@@ -1002,6 +1177,8 @@ impl TableProvider for ListingTable {
             return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
         };
 
+        let file_source = self.create_file_source_with_schema_adapter()?;
+
         // create the execution plan
         self.options
             .format
@@ -1010,7 +1187,7 @@ impl TableProvider for ListingTable {
                 FileScanConfigBuilder::new(
                     object_store_url,
                     Arc::clone(&self.file_schema),
-                    self.options.format.file_source(),
+                    file_source,
                 )
                 .with_file_groups(partitioned_file_lists)
                 .with_constraints(self.constraints.clone())
@@ -1169,8 +1346,10 @@ impl ListingTable {
             self.options.collect_stat,
             inexact_stats,
         )?;
-        let (schema_mapper, _) = DefaultSchemaAdapterFactory::from_schema(self.schema())
-            .map_schema(self.file_schema.as_ref())?;
+
+        let schema_adapter = self.create_schema_adapter();
+        let (schema_mapper, _) = schema_adapter.map_schema(self.file_schema.as_ref())?;
+
         stats.column_statistics =
             schema_mapper.map_column_statistics(&stats.column_statistics)?;
         file_groups.iter_mut().try_for_each(|file_group| {
@@ -1320,15 +1499,21 @@ mod tests {
         assert_contains, stats::Precision,
         test_util::{batches_to_string, datafusion_test_data},
-        ScalarValue,
+        ColumnStatistics, ScalarValue,
+    };
+    use datafusion_datasource::schema_adapter::{
+        SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
     };
     use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
     use datafusion_physical_expr::PhysicalSortExpr;
     use datafusion_physical_plan::{collect, ExecutionPlanProperties};
+    use rstest::rstest;
     use std::io::Write;
     use tempfile::TempDir;
     use url::Url;
 
+    const DUMMY_NULL_COUNT: Precision<usize> = Precision::Exact(42);
+
     /// Creates a test schema with standard field types used in tests
     fn create_test_schema() -> SchemaRef {
         Arc::new(Schema::new(vec![
@@ -1364,7 +1549,7 @@ mod tests {
 
         // Test default schema source
         let config = ListingTableConfig::new(table_path.clone());
-        assert_eq!(config.schema_source(), SchemaSource::None);
+        assert_eq!(config.schema_source(), SchemaSource::Unset);
 
         // Test schema source after setting a schema explicitly
         let provided_schema = create_test_schema();
@@ -1375,7 +1560,7 @@ mod tests {
         let format = CsvFormat::default();
         let options = ListingOptions::new(Arc::new(format));
         let config_with_options = config.with_listing_options(options.clone());
-        assert_eq!(config_with_options.schema_source(), SchemaSource::None);
+        assert_eq!(config_with_options.schema_source(), SchemaSource::Unset);
 
         let config_with_inferred = config_with_options.infer_schema(&ctx.state()).await?;
         assert_eq!(config_with_inferred.schema_source(), SchemaSource::Inferred);
@@ -2553,4 +2738,262 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_statistics_mapping_with_custom_factory() -> Result<()> {
+        let ctx = SessionContext::new();
+        let table = create_test_listing_table_with_json_and_adapter(
+            &ctx,
+            false,
+            // NullStatsAdapterFactory sets column_statistics null_count to DUMMY_NULL_COUNT
+            Arc::new(NullStatsAdapterFactory {}),
+        )?;
+
+        let (groups, stats) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
+
+        assert_eq!(stats.column_statistics[0].null_count, DUMMY_NULL_COUNT);
+        for g in groups {
+            if let Some(s) = g.file_statistics(None) {
+                assert_eq!(s.column_statistics[0].null_count, DUMMY_NULL_COUNT);
+            }
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_statistics_mapping_with_default_factory() -> Result<()> {
+        let ctx = SessionContext::new();
+
+        // Create a table without providing a custom schema adapter factory.
+        // This should fall back to using DefaultSchemaAdapterFactory.
+        let path = "table/file.json";
+        register_test_store(&ctx, &[(path, 10)]);
+
+        let format = JsonFormat::default();
+        let opt = ListingOptions::new(Arc::new(format)).with_collect_stat(false);
+        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
+        let table_path = ListingTableUrl::parse("test:///table/").unwrap();
+
+        let config = ListingTableConfig::new(table_path)
+            .with_listing_options(opt)
+            .with_schema(Arc::new(schema));
+        // Note: NOT calling .with_schema_adapter_factory() to test default behavior
+
+        let table = ListingTable::try_new(config)?;
+
+        // Verify that no custom schema adapter factory is set
+        assert!(table.schema_adapter_factory().is_none());
+
+        // The scan should work correctly with the default schema adapter
+        let scan_result = table.scan(&ctx.state(), None, &[], None).await;
+        assert!(
+            scan_result.is_ok(),
+            "Scan should succeed with default schema adapter"
+        );
+
+        // Verify that the default adapter handles basic schema compatibility
+        let (groups, _stats) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
+        assert!(
+            !groups.is_empty(),
+            "Should list files successfully with default adapter"
+        );
+
+        Ok(())
+    }
+
+    #[rstest]
+    #[case(MapSchemaError::TypeIncompatible, "Cannot map incompatible types")]
+    #[case(MapSchemaError::GeneralFailure, "Schema adapter mapping failed")]
+    #[case(
+        MapSchemaError::InvalidProjection,
+        "Invalid projection in schema mapping"
+    )]
+    #[tokio::test]
+    async fn test_schema_adapter_map_schema_errors(
+        #[case] error_type: MapSchemaError,
+        #[case] expected_error_msg: &str,
+    ) -> Result<()> {
+        let ctx = SessionContext::new();
+        let table = create_test_listing_table_with_json_and_adapter(
+            &ctx,
+            false,
+            Arc::new(FailingMapSchemaAdapterFactory { error_type }),
+        )?;
+
+        // The error should bubble up from the scan operation when schema mapping fails
+        let scan_result = table.scan(&ctx.state(), None, &[], None).await;
+
+        assert!(scan_result.is_err());
+        let error_msg = scan_result.unwrap_err().to_string();
+        assert!(
+            error_msg.contains(expected_error_msg),
+            "Expected error containing '{expected_error_msg}', got: {error_msg}"
+        );
+
+        Ok(())
+    }
+
+    // Test that errors during file listing also bubble up correctly
+    #[tokio::test]
+    async fn test_schema_adapter_error_during_file_listing() -> Result<()> {
+        let ctx = SessionContext::new();
+        let table = create_test_listing_table_with_json_and_adapter(
+            &ctx,
+            true,
+            Arc::new(FailingMapSchemaAdapterFactory {
+                error_type: MapSchemaError::TypeIncompatible,
+            }),
+        )?;
+
+        // The error should bubble up from list_files_for_scan when collecting statistics
+        let list_result = table.list_files_for_scan(&ctx.state(), &[], None).await;
+
+        assert!(list_result.is_err());
+        let error_msg = list_result.unwrap_err().to_string();
+        assert!(
+            error_msg.contains("Cannot map incompatible types"),
+            "Expected type incompatibility error during file listing, got: {error_msg}"
+        );
+
+        Ok(())
+    }
+
+    #[derive(Debug, Copy, Clone)]
+    enum MapSchemaError {
+        TypeIncompatible,
+        GeneralFailure,
+        InvalidProjection,
+    }
+
+    #[derive(Debug)]
+    struct FailingMapSchemaAdapterFactory {
+        error_type: MapSchemaError,
+    }
+
+    impl SchemaAdapterFactory for FailingMapSchemaAdapterFactory {
+        fn create(
+            &self,
+            projected_table_schema: SchemaRef,
+            _table_schema: SchemaRef,
+        ) -> Box<dyn SchemaAdapter> {
+            Box::new(FailingMapSchemaAdapter {
+                schema: projected_table_schema,
+                error_type: self.error_type,
+            })
+        }
+    }
+
+    #[derive(Debug)]
+    struct FailingMapSchemaAdapter {
+        schema: SchemaRef,
+        error_type: MapSchemaError,
+    }
+
+    impl SchemaAdapter for FailingMapSchemaAdapter {
+        fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+            let field = self.schema.field(index);
+            file_schema.fields.find(field.name()).map(|(i, _)| i)
+        }
+
+        fn map_schema(
+            &self,
+            _file_schema: &Schema,
+        ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+            // Always fail with different error types based on the configured error_type
+            match self.error_type {
+                MapSchemaError::TypeIncompatible => {
+                    plan_err!(
+                        "Cannot map incompatible types: Boolean cannot be cast to Utf8"
+                    )
+                }
+                MapSchemaError::GeneralFailure => {
+                    plan_err!("Schema adapter mapping failed due to internal error")
+                }
+                MapSchemaError::InvalidProjection => {
+                    plan_err!("Invalid projection in schema mapping: column index out of bounds")
+                }
+            }
+        }
+    }
+
+    #[derive(Debug)]
+    struct NullStatsAdapterFactory;
+
+    impl SchemaAdapterFactory for NullStatsAdapterFactory {
+        fn create(
+            &self,
+            projected_table_schema: SchemaRef,
+            _table_schema: SchemaRef,
+        ) -> Box<dyn SchemaAdapter> {
+            Box::new(NullStatsAdapter {
+                schema: projected_table_schema,
+            })
+        }
+    }
+
+    #[derive(Debug)]
+    struct NullStatsAdapter {
+        schema: SchemaRef,
+    }
+
+    impl SchemaAdapter for NullStatsAdapter {
+        fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+            let field = self.schema.field(index);
+            file_schema.fields.find(field.name()).map(|(i, _)| i)
+        }
+
+        fn map_schema(
+            &self,
+            file_schema: &Schema,
+        ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+            let projection = (0..file_schema.fields().len()).collect();
+            Ok((Arc::new(NullStatsMapper {}), projection))
+        }
+    }
+
+    #[derive(Debug)]
+    struct NullStatsMapper;
+
+    impl SchemaMapper for NullStatsMapper {
+        fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
+            Ok(batch)
+        }
+
+        fn map_column_statistics(
+            &self,
+            stats: &[ColumnStatistics],
+        ) -> Result<Vec<ColumnStatistics>> {
+            Ok(stats
+                .iter()
+                .map(|s| {
+                    let mut s = s.clone();
+                    s.null_count = DUMMY_NULL_COUNT;
+                    s
+                })
+                .collect())
+        }
+    }
+
+    /// Helper function to create a test ListingTable with JSON format and a custom schema adapter factory
+    fn create_test_listing_table_with_json_and_adapter(
+        ctx: &SessionContext,
+        collect_stat: bool,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Result<ListingTable> {
+        let path = "table/file.json";
+        register_test_store(ctx, &[(path, 10)]);
+
+        let format = JsonFormat::default();
+        let opt = ListingOptions::new(Arc::new(format)).with_collect_stat(collect_stat);
+        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
+        let table_path = ListingTableUrl::parse("test:///table/").unwrap();
+
+        let config = ListingTableConfig::new(table_path)
+            .with_listing_options(opt)
+            .with_schema(Arc::new(schema))
+            .with_schema_adapter_factory(schema_adapter_factory);
+
+        ListingTable::try_new(config)
+    }
 }
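The config docs above mention filling missing columns with a default such as `0` or `""` rather than null. As a rough illustration of what such a factory could look like against the `SchemaAdapter`/`SchemaMapper` traits exercised by these tests — `FillDefaultsAdapterFactory` and all other names below are hypothetical sketches, not part of this diff, and the pass-through arm assumes file columns that are present already match the table types:

```rust
use std::sync::Arc;

use arrow::array::{new_null_array, ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::{ColumnStatistics, Result};
use datafusion_datasource::schema_adapter::{
    SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
};

/// Hypothetical factory that back-fills columns missing from a file with
/// defaults (0 for Int64, null otherwise) instead of plain nulls.
#[derive(Debug)]
struct FillDefaultsAdapterFactory;

impl SchemaAdapterFactory for FillDefaultsAdapterFactory {
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        _table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter> {
        Box::new(FillDefaultsAdapter {
            table_schema: projected_table_schema,
        })
    }
}

#[derive(Debug)]
struct FillDefaultsAdapter {
    table_schema: SchemaRef,
}

impl SchemaAdapter for FillDefaultsAdapter {
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
        let field = self.table_schema.field(index);
        file_schema.fields.find(field.name()).map(|(i, _)| i)
    }

    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
        // Read every file column; the mapper reorders and back-fills by name.
        let projection = (0..file_schema.fields().len()).collect();
        let mapper = FillDefaultsMapper {
            table_schema: Arc::clone(&self.table_schema),
        };
        Ok((Arc::new(mapper), projection))
    }
}

#[derive(Debug)]
struct FillDefaultsMapper {
    table_schema: SchemaRef,
}

impl SchemaMapper for FillDefaultsMapper {
    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
        let columns: Vec<ArrayRef> = self
            .table_schema
            .fields()
            .iter()
            .map(|field| match batch.column_by_name(field.name()) {
                // Present in the file: pass through (assumes matching type)
                Some(col) => Arc::clone(col),
                // Missing Int64 column: back-fill with zeros
                None if field.data_type() == &DataType::Int64 => {
                    Arc::new(Int64Array::from(vec![0i64; batch.num_rows()])) as ArrayRef
                }
                // Anything else: fall back to nulls
                None => new_null_array(field.data_type(), batch.num_rows()),
            })
            .collect();
        Ok(RecordBatch::try_new(Arc::clone(&self.table_schema), columns)?)
    }

    fn map_column_statistics(
        &self,
        stats: &[ColumnStatistics],
    ) -> Result<Vec<ColumnStatistics>> {
        // File statistics no longer describe the back-filled output exactly.
        Ok(stats
            .iter()
            .map(|_| ColumnStatistics::new_unknown())
            .collect())
    }
}
```

The projection requests every file column; reordering and back-filling happen by name inside `map_batch`, mirroring the structure of the `NullStatsAdapter` test above.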
diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs
index b43041c8d14db..5e743a3f0c233 100644
--- a/datafusion/datasource/src/schema_adapter.rs
+++ b/datafusion/datasource/src/schema_adapter.rs
@@ -57,6 +57,17 @@ pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
         projected_table_schema: SchemaRef,
         table_schema: SchemaRef,
     ) -> Box<dyn SchemaAdapter>;
+
+    /// Create a [`SchemaAdapter`] using only the projected table schema.
+    ///
+    /// This is a convenience method for cases where the table schema and the
+    /// projected table schema are the same.
+    fn create_with_projected_schema(
+        &self,
+        projected_table_schema: SchemaRef,
+    ) -> Box<dyn SchemaAdapter> {
+        self.create(Arc::clone(&projected_table_schema), projected_table_schema)
+    }
 }
 
 /// Creates [`SchemaMapper`]s to map file-level [`RecordBatch`]es to a table
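End to end, the two halves of this patch compose as follows. A minimal, hypothetical usage sketch (the path, schema, and table name are illustrative, not from the diff): the factory is attached via `ListingTableConfig::with_schema_adapter_factory`, and at scan time `ListingTable::create_schema_adapter` invokes `create_with_projected_schema` on whichever factory is registered.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::schema_adapter::DefaultSchemaAdapterFactory;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn register_with_adapter(ctx: &SessionContext) -> Result<()> {
    // Hypothetical table location and schema
    let table_path = ListingTableUrl::parse("file:///data/events/")?;
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("payload", DataType::Utf8, true),
    ]));
    let options = ListingOptions::new(Arc::new(ParquetFormat::default()));

    let config = ListingTableConfig::new(table_path)
        .with_listing_options(options)
        .with_schema(schema)
        // Any SchemaAdapterFactory works here; DefaultSchemaAdapterFactory is
        // also what the table falls back to when none is set.
        .with_schema_adapter_factory(Arc::new(DefaultSchemaAdapterFactory));

    let table = ListingTable::try_new(config)?;
    ctx.register_table("events", Arc::new(table))?;
    Ok(())
}
```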