From f7793604ce4af8d108dda66c8dadad68b3996401 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 27 Jun 2025 18:44:35 +0800 Subject: [PATCH 01/16] Add schema adapter factory and related code into ListingTableConfig --- .../core/src/datasource/listing/table.rs | 336 ++++++++++++++++-- 1 file changed, 316 insertions(+), 20 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 3ddf1c85e241b..ecf5c18cc1602 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -36,9 +36,10 @@ use datafusion_common::{ }; use datafusion_datasource::{ compute_all_files_statistics, + file::FileSource, file_groups::FileGroup, file_scan_config::{FileScanConfig, FileScanConfigBuilder}, - schema_adapter::DefaultSchemaAdapterFactory, + schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory}, }; use datafusion_execution::{ cache::{cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache}, @@ -55,9 +56,10 @@ use object_store::ObjectStore; use std::{any::Any, collections::HashMap, str::FromStr, sync::Arc}; /// Indicates the source of the schema for a [`ListingTable`] // PartialEq required for assert_eq! in tests -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq, Default)] pub enum SchemaSource { /// Schema is not yet set (initial state) + #[default] None, /// Schema was inferred from first table_path Inferred, @@ -68,7 +70,7 @@ pub enum SchemaSource { /// Configuration for creating a [`ListingTable`] /// /// -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct ListingTableConfig { /// Paths on the `ObjectStore` for creating `ListingTable`. /// They should share the same schema and object store. @@ -83,17 +85,16 @@ pub struct ListingTableConfig { pub options: Option, /// Tracks the source of the schema information schema_source: SchemaSource, + /// Optional [`SchemaAdapterFactory`] for creating schema adapters + schema_adapter_factory: Option>, } impl ListingTableConfig { /// Creates new [`ListingTableConfig`] for reading the specified URL pub fn new(table_path: ListingTableUrl) -> Self { - let table_paths = vec![table_path]; Self { - table_paths, - file_schema: None, - options: None, - schema_source: SchemaSource::None, + table_paths: vec![table_path], + ..Default::default() } } @@ -103,9 +104,7 @@ impl ListingTableConfig { pub fn new_with_multi_paths(table_paths: Vec) -> Self { Self { table_paths, - file_schema: None, - options: None, - schema_source: SchemaSource::None, + ..Default::default() } } @@ -125,10 +124,9 @@ impl ListingTableConfig { /// without the table partitioning columns. pub fn with_schema(self, schema: SchemaRef) -> Self { Self { - table_paths: self.table_paths, file_schema: Some(schema), - options: self.options, schema_source: SchemaSource::Specified, + ..self } } @@ -138,10 +136,8 @@ impl ListingTableConfig { /// [`Self::infer_options`]. pub fn with_listing_options(self, listing_options: ListingOptions) -> Self { Self { - table_paths: self.table_paths, - file_schema: self.file_schema, options: Some(listing_options), - schema_source: self.schema_source, + ..self } } @@ -222,6 +218,7 @@ impl ListingTableConfig { file_schema: self.file_schema, options: Some(listing_options), schema_source: self.schema_source, + schema_adapter_factory: self.schema_adapter_factory, }) } @@ -240,6 +237,7 @@ impl ListingTableConfig { file_schema, options: _, schema_source, + schema_adapter_factory, } = self; let (schema, new_schema_source) = match file_schema { @@ -261,6 +259,7 @@ impl ListingTableConfig { file_schema: Some(schema), options: Some(options), schema_source: new_schema_source, + schema_adapter_factory, }) } None => internal_err!("No `ListingOptions` set for inferring schema"), @@ -302,11 +301,34 @@ impl ListingTableConfig { file_schema: self.file_schema, options: Some(options), schema_source: self.schema_source, + schema_adapter_factory: self.schema_adapter_factory, }) } None => config_err!("No `ListingOptions` set for inferring schema"), } } + + /// Set the [`SchemaAdapterFactory`] for the [`ListingTable`] + /// + /// The schema adapter factory is used to create schema adapters that can + /// handle schema evolution and type conversions when reading files with + /// different schemas than the table schema. + /// + /// If not provided, a default schema adapter factory will be used. + pub fn with_schema_adapter_factory( + self, + schema_adapter_factory: Arc, + ) -> Self { + Self { + schema_adapter_factory: Some(schema_adapter_factory), + ..self + } + } + + /// Get the [`SchemaAdapterFactory`] for this configuration + pub fn schema_adapter_factory(&self) -> Option<&Arc> { + self.schema_adapter_factory.as_ref() + } } /// Options for creating a [`ListingTable`] @@ -801,6 +823,8 @@ pub struct ListingTable { collected_statistics: FileStatisticsCache, constraints: Constraints, column_defaults: HashMap, + /// Optional [`SchemaAdapterFactory`] for creating schema adapters + schema_adapter_factory: Option>, } impl ListingTable { @@ -841,6 +865,7 @@ impl ListingTable { collected_statistics: Arc::new(DefaultFileStatisticsCache::default()), constraints: Constraints::default(), column_defaults: HashMap::new(), + schema_adapter_factory: config.schema_adapter_factory, }; Ok(table) @@ -894,6 +919,50 @@ impl ListingTable { self.schema_source } + /// Set the [`SchemaAdapterFactory`] for this [`ListingTable`] + /// + /// The schema adapter factory is used to create schema adapters that can + /// handle schema evolution and type conversions when reading files with + /// different schemas than the table schema. + pub fn with_schema_adapter_factory( + self, + schema_adapter_factory: Arc, + ) -> Self { + Self { + schema_adapter_factory: Some(schema_adapter_factory), + ..self + } + } + + /// Get the [`SchemaAdapterFactory`] for this table + pub fn schema_adapter_factory(&self) -> Option<&Arc> { + self.schema_adapter_factory.as_ref() + } + + /// Creates a schema adapter for mapping between file and table schemas + /// + /// Uses the configured schema adapter factory if available, otherwise falls back + /// to the default implementation. + fn create_schema_adapter(&self) -> Box { + let table_schema = self.schema(); + match &self.schema_adapter_factory { + Some(factory) => { + factory.create(Arc::clone(&table_schema), Arc::clone(&table_schema)) + } + None => DefaultSchemaAdapterFactory::from_schema(Arc::clone(&table_schema)), + } + } + + /// Creates a file source and applies schema adapter factory if available + fn create_file_source_with_schema_adapter(&self) -> Result> { + let mut source = self.options.format.file_source(); + // Apply schema adapter to source if available + if let Some(factory) = &self.schema_adapter_factory { + source = source.with_schema_adapter_factory(Arc::clone(factory))?; + } + Ok(source) + } + /// If file_sort_order is specified, creates the appropriate physical expressions fn try_create_output_ordering(&self) -> Result> { create_ordering(&self.table_schema, &self.options.file_sort_order) @@ -1002,6 +1071,8 @@ impl TableProvider for ListingTable { return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty())))); }; + let file_source = self.create_file_source_with_schema_adapter()?; + // create the execution plan self.options .format @@ -1010,7 +1081,7 @@ impl TableProvider for ListingTable { FileScanConfigBuilder::new( object_store_url, Arc::clone(&self.file_schema), - self.options.format.file_source(), + file_source, ) .with_file_groups(partitioned_file_lists) .with_constraints(self.constraints.clone()) @@ -1169,8 +1240,10 @@ impl ListingTable { self.options.collect_stat, inexact_stats, )?; - let (schema_mapper, _) = DefaultSchemaAdapterFactory::from_schema(self.schema()) - .map_schema(self.file_schema.as_ref())?; + + let schema_adapter = self.create_schema_adapter(); + let (schema_mapper, _) = schema_adapter.map_schema(self.file_schema.as_ref())?; + stats.column_statistics = schema_mapper.map_column_statistics(&stats.column_statistics)?; file_groups.iter_mut().try_for_each(|file_group| { @@ -1320,15 +1393,21 @@ mod tests { assert_contains, stats::Precision, test_util::{batches_to_string, datafusion_test_data}, - ScalarValue, + ColumnStatistics, ScalarValue, + }; + use datafusion_datasource::schema_adapter::{ + SchemaAdapter, SchemaAdapterFactory, SchemaMapper, }; use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator}; use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_plan::{collect, ExecutionPlanProperties}; + use rstest::rstest; use std::io::Write; use tempfile::TempDir; use url::Url; + const DUMMY_NULL_COUNT: Precision = Precision::Exact(42); + /// Creates a test schema with standard field types used in tests fn create_test_schema() -> SchemaRef { Arc::new(Schema::new(vec![ @@ -2553,4 +2632,221 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_statistics_mapping_with_custom_factory() -> Result<()> { + let ctx = SessionContext::new(); + let table = create_test_listing_table_with_json_and_adapter( + &ctx, + false, + // NullStatsAdapterFactory sets column_statistics null_count to DUMMY_NULL_COUNT + Arc::new(NullStatsAdapterFactory {}), + )?; + + let (groups, stats) = table.list_files_for_scan(&ctx.state(), &[], None).await?; + + assert_eq!(stats.column_statistics[0].null_count, DUMMY_NULL_COUNT); + for g in groups { + if let Some(s) = g.file_statistics(None) { + assert_eq!(s.column_statistics[0].null_count, DUMMY_NULL_COUNT); + } + } + + Ok(()) + } + + #[rstest] + #[case(MapSchemaError::TypeIncompatible, "Cannot map incompatible types")] + #[case(MapSchemaError::GeneralFailure, "Schema adapter mapping failed")] + #[case( + MapSchemaError::InvalidProjection, + "Invalid projection in schema mapping" + )] + #[tokio::test] + async fn test_schema_adapter_map_schema_errors( + #[case] error_type: MapSchemaError, + #[case] expected_error_msg: &str, + ) -> Result<()> { + let ctx = SessionContext::new(); + let table = create_test_listing_table_with_json_and_adapter( + &ctx, + false, + Arc::new(FailingMapSchemaAdapterFactory { error_type }), + )?; + + // The error should bubble up from the scan operation when schema mapping fails + let scan_result = table.scan(&ctx.state(), None, &[], None).await; + + assert!(scan_result.is_err()); + let error_msg = scan_result.unwrap_err().to_string(); + assert!( + error_msg.contains(expected_error_msg), + "Expected error containing '{expected_error_msg}', got: {error_msg}" + ); + + Ok(()) + } + + // Test that errors during file listing also bubble up correctly + #[tokio::test] + async fn test_schema_adapter_error_during_file_listing() -> Result<()> { + let ctx = SessionContext::new(); + let table = create_test_listing_table_with_json_and_adapter( + &ctx, + true, + Arc::new(FailingMapSchemaAdapterFactory { + error_type: MapSchemaError::TypeIncompatible, + }), + )?; + + // The error should bubble up from list_files_for_scan when collecting statistics + let list_result = table.list_files_for_scan(&ctx.state(), &[], None).await; + + assert!(list_result.is_err()); + let error_msg = list_result.unwrap_err().to_string(); + assert!( + error_msg.contains("Cannot map incompatible types"), + "Expected type incompatibility error during file listing, got: {error_msg}" + ); + + Ok(()) + } + + #[derive(Debug, Copy, Clone)] + enum MapSchemaError { + TypeIncompatible, + GeneralFailure, + InvalidProjection, + } + + #[derive(Debug)] + struct FailingMapSchemaAdapterFactory { + error_type: MapSchemaError, + } + + impl SchemaAdapterFactory for FailingMapSchemaAdapterFactory { + fn create( + &self, + projected_table_schema: SchemaRef, + _table_schema: SchemaRef, + ) -> Box { + Box::new(FailingMapSchemaAdapter { + schema: projected_table_schema, + error_type: self.error_type, + }) + } + } + + #[derive(Debug)] + struct FailingMapSchemaAdapter { + schema: SchemaRef, + error_type: MapSchemaError, + } + + impl SchemaAdapter for FailingMapSchemaAdapter { + fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option { + let field = self.schema.field(index); + file_schema.fields.find(field.name()).map(|(i, _)| i) + } + + fn map_schema( + &self, + _file_schema: &Schema, + ) -> Result<(Arc, Vec)> { + // Always fail with different error types based on the configured error_type + match self.error_type { + MapSchemaError::TypeIncompatible => { + plan_err!( + "Cannot map incompatible types: Boolean cannot be cast to Utf8" + ) + } + MapSchemaError::GeneralFailure => { + plan_err!("Schema adapter mapping failed due to internal error") + } + MapSchemaError::InvalidProjection => { + plan_err!("Invalid projection in schema mapping: column index out of bounds") + } + } + } + } + + #[derive(Debug)] + struct NullStatsAdapterFactory; + + impl SchemaAdapterFactory for NullStatsAdapterFactory { + fn create( + &self, + projected_table_schema: SchemaRef, + _table_schema: SchemaRef, + ) -> Box { + Box::new(NullStatsAdapter { + schema: projected_table_schema, + }) + } + } + + #[derive(Debug)] + struct NullStatsAdapter { + schema: SchemaRef, + } + + impl SchemaAdapter for NullStatsAdapter { + fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option { + let field = self.schema.field(index); + file_schema.fields.find(field.name()).map(|(i, _)| i) + } + + fn map_schema( + &self, + file_schema: &Schema, + ) -> Result<(Arc, Vec)> { + let projection = (0..file_schema.fields().len()).collect(); + Ok((Arc::new(NullStatsMapper {}), projection)) + } + } + + #[derive(Debug)] + struct NullStatsMapper; + + impl SchemaMapper for NullStatsMapper { + fn map_batch(&self, batch: RecordBatch) -> Result { + Ok(batch) + } + + fn map_column_statistics( + &self, + stats: &[ColumnStatistics], + ) -> Result> { + Ok(stats + .iter() + .map(|s| { + let mut s = s.clone(); + s.null_count = DUMMY_NULL_COUNT; + s + }) + .collect()) + } + } + + /// Helper function to create a test ListingTable with JSON format and custom schema adapter factory + fn create_test_listing_table_with_json_and_adapter( + ctx: &SessionContext, + collect_stat: bool, + schema_adapter_factory: Arc, + ) -> Result { + let path = "table/file.json"; + register_test_store(ctx, &[(path, 10)]); + + let format = JsonFormat::default(); + let opt = ListingOptions::new(Arc::new(format)).with_collect_stat(collect_stat); + let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]); + let table_path = ListingTableUrl::parse("test:///table/").unwrap(); + + let config = ListingTableConfig::new(table_path) + .with_listing_options(opt) + .with_schema(Arc::new(schema)) + .with_schema_adapter_factory(schema_adapter_factory); + + ListingTable::try_new(config) + } } From cf35911efdbe236bd22d637b7fd79d8ab86561bd Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 27 Jun 2025 19:32:20 +0800 Subject: [PATCH 02/16] refactor: rename `None` to `Unset` in `SchemaSource` and add debug assertions in `ListingTableConfig` methods --- .../core/src/datasource/listing/table.rs | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index ecf5c18cc1602..e3ed690a5f28b 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -60,7 +60,7 @@ use std::{any::Any, collections::HashMap, str::FromStr, sync::Arc}; pub enum SchemaSource { /// Schema is not yet set (initial state) #[default] - None, + Unset, /// Schema was inferred from first table_path Inferred, /// Schema was specified explicitly via with_schema @@ -123,6 +123,15 @@ impl ListingTableConfig { /// If the schema is provided, it must contain only the fields in the file /// without the table partitioning columns. pub fn with_schema(self, schema: SchemaRef) -> Self { + // Note: We preserve existing options state, but downstream code may expect + // options to be set. Consider calling with_listing_options() or infer_options() + // before operations that require options to be present. + debug_assert!( + self.options.is_some() || cfg!(test), + "ListingTableConfig::with_schema called without options set. \ + Consider calling with_listing_options() or infer_options() first to avoid panics in downstream code." + ); + Self { file_schema: Some(schema), schema_source: SchemaSource::Specified, @@ -135,6 +144,15 @@ impl ListingTableConfig { /// If not provided, format and other options are inferred via /// [`Self::infer_options`]. pub fn with_listing_options(self, listing_options: ListingOptions) -> Self { + // Note: This method properly sets options, but be aware that downstream + // methods like infer_schema() and try_new() require both schema and options + // to be set to function correctly. + debug_assert!( + self.table_paths.len() > 0 || cfg!(test), + "ListingTableConfig::with_listing_options called without table_paths set. \ + Consider calling new() or new_with_multi_paths() first to establish table paths." + ); + Self { options: Some(listing_options), ..self From fe07c22d914b2066dfb0028ca1e54801e63cd1f2 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 27 Jun 2025 19:33:53 +0800 Subject: [PATCH 03/16] docs: enhance documentation for ListingTableConfig with schema evolution support details --- datafusion/core/src/datasource/listing/table.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index e3ed690a5f28b..e595ebca2edcc 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -69,6 +69,23 @@ pub enum SchemaSource { /// Configuration for creating a [`ListingTable`] /// +/// # Schema Evolution Support +/// +/// This configuration supports schema evolution through the optional +/// [`SchemaAdapterFactory`]. You might want to override the default factory when: +/// +/// - **Reading files with evolving schemas**: When your data files have been written +/// over time with different but compatible schemas (e.g., added columns, renamed fields) +/// - **Type coercion requirements**: When you need custom logic for converting between +/// different Arrow data types (e.g., Int32 ↔ Int64, Utf8 ↔ LargeUtf8) +/// - **Column mapping**: When files have different column names or ordering than +/// your expected table schema +/// - **Backwards compatibility**: When newer table schemas need to read older file +/// formats gracefully +/// +/// If not specified, a [`DefaultSchemaAdapterFactory`] will be used, which handles +/// basic schema compatibility cases. +/// /// #[derive(Debug, Clone, Default)] pub struct ListingTableConfig { From b2dfb98d11a527f7035292f6670007787eaff0f8 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 27 Jun 2025 20:43:26 +0800 Subject: [PATCH 04/16] docs: clarify schema adapter layering in create_file_source_with_schema_adapter method --- datafusion/core/src/datasource/listing/table.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index e595ebca2edcc..aed6b6b3c5b05 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -992,6 +992,14 @@ impl ListingTable { fn create_file_source_with_schema_adapter(&self) -> Result> { let mut source = self.options.format.file_source(); // Apply schema adapter to source if available + // + // NOTE: This may layer the ListingTable's schema adapter factory on top of any + // existing factory that the file source already has. The composition semantics are: + // 1. The file format's existing adapter (if any) handles format-specific schema mapping + // 2. Our adapter handles table-level schema evolution requirements + // + // This layering is intentional but may need adjustment if the underlying source + // already handles the same schema evolution cases we're trying to address. if let Some(factory) = &self.schema_adapter_factory { source = source.with_schema_adapter_factory(Arc::clone(factory))?; } From 1db3b88147030c39dcc00694e01cfd3c53042284 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 27 Jun 2025 20:45:20 +0800 Subject: [PATCH 05/16] test: add unit test for default schema adapter factory in ListingTable --- .../core/src/datasource/listing/table.rs | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index aed6b6b3c5b05..3c13a58ba99c2 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -2698,6 +2698,47 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_statistics_mapping_with_default_factory() -> Result<()> { + let ctx = SessionContext::new(); + + // Create a table without providing a custom schema adapter factory + // This should fall back to using DefaultSchemaAdapterFactory + let path = "table/file.json"; + register_test_store(&ctx, &[(path, 10)]); + + let format = JsonFormat::default(); + let opt = ListingOptions::new(Arc::new(format)).with_collect_stat(false); + let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]); + let table_path = ListingTableUrl::parse("test:///table/").unwrap(); + + let config = ListingTableConfig::new(table_path) + .with_listing_options(opt) + .with_schema(Arc::new(schema)); + // Note: NOT calling .with_schema_adapter_factory() to test default behavior + + let table = ListingTable::try_new(config)?; + + // Verify that no custom schema adapter factory is set + assert!(table.schema_adapter_factory().is_none()); + + // The scan should work correctly with the default schema adapter + let scan_result = table.scan(&ctx.state(), None, &[], None).await; + assert!( + scan_result.is_ok(), + "Scan should succeed with default schema adapter" + ); + + // Verify that the default adapter handles basic schema compatibility + let (groups, _stats) = table.list_files_for_scan(&ctx.state(), &[], None).await?; + assert!( + !groups.is_empty(), + "Should list files successfully with default adapter" + ); + + Ok(()) + } + #[rstest] #[case(MapSchemaError::TypeIncompatible, "Cannot map incompatible types")] #[case(MapSchemaError::GeneralFailure, "Schema adapter mapping failed")] From b1bf1c2ccebbd537e945aa9996510b5e72877439 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 27 Jun 2025 20:56:58 +0800 Subject: [PATCH 06/16] docs: add examples for schema evolution and configuration in ListingTableConfig --- .../core/src/datasource/listing/table.rs | 104 +++++++++++++++++- 1 file changed, 103 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 3c13a58ba99c2..35168b47090de 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -60,7 +60,7 @@ use std::{any::Any, collections::HashMap, str::FromStr, sync::Arc}; pub enum SchemaSource { /// Schema is not yet set (initial state) #[default] - Unset, + None, /// Schema was inferred from first table_path Inferred, /// Schema was specified explicitly via with_schema @@ -86,6 +86,42 @@ pub enum SchemaSource { /// If not specified, a [`DefaultSchemaAdapterFactory`] will be used, which handles /// basic schema compatibility cases. /// +/// # Complete Example: Schema Evolution Setup +/// ```rust +/// # use std::sync::Arc; +/// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl}; +/// # use datafusion::datasource::file_format::parquet::ParquetFormat; +/// # use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, SchemaAdapter}; +/// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef}; +/// # +/// # // Custom schema adapter for handling schema evolution +/// # struct EvolutionSchemaAdapterFactory; +/// # impl SchemaAdapterFactory for EvolutionSchemaAdapterFactory { +/// # fn create(&self, projected_table_schema: SchemaRef, file_schema: SchemaRef) -> Box { +/// # unimplemented!("Custom schema adapter implementation") +/// # } +/// # } +/// # +/// # let table_path = ListingTableUrl::parse("file:///path/to/data").unwrap(); +/// +/// // Define expected table schema (what queries will see) +/// let table_schema = Arc::new(Schema::new(vec![ +/// Field::new("id", DataType::Int64, false), +/// Field::new("name", DataType::Utf8, true), +/// Field::new("created_at", DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None), true), +/// ])); +/// +/// // Configure file format options +/// let options = ListingOptions::new(Arc::new(ParquetFormat::default())) +/// .with_file_extension(".parquet") +/// .with_collect_stat(true); +/// +/// // Build configuration with schema evolution support +/// let config = ListingTableConfig::new(table_path) +/// .with_listing_options(options) +/// .with_schema(table_schema) +/// .with_schema_adapter_factory(Arc::new(EvolutionSchemaAdapterFactory)); +/// ``` /// #[derive(Debug, Clone, Default)] pub struct ListingTableConfig { @@ -139,6 +175,23 @@ impl ListingTableConfig { /// /// If the schema is provided, it must contain only the fields in the file /// without the table partitioning columns. + /// + /// # Example: Specifying Table Schema + /// ```rust + /// # use std::sync::Arc; + /// # use datafusion::datasource::listing::ListingTableConfig; + /// # use arrow::datatypes::{Schema, Field, DataType}; + /// # let table_paths = vec![]; + /// # let listing_options = unimplemented!(); + /// let schema = Arc::new(Schema::new(vec![ + /// Field::new("id", DataType::Int64, false), + /// Field::new("name", DataType::Utf8, true), + /// ])); + /// + /// let config = ListingTableConfig::new(table_paths) + /// .with_listing_options(listing_options) // Set options first + /// .with_schema(schema); // Then set schema + /// ``` pub fn with_schema(self, schema: SchemaRef) -> Self { // Note: We preserve existing options state, but downstream code may expect // options to be set. Consider calling with_listing_options() or infer_options() @@ -160,6 +213,20 @@ impl ListingTableConfig { /// /// If not provided, format and other options are inferred via /// [`Self::infer_options`]. + /// + /// # Example: Configuring Parquet Files with Custom Options + /// ```rust + /// # use std::sync::Arc; + /// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions}; + /// # use datafusion::datasource::file_format::parquet::ParquetFormat; + /// # let table_paths = vec![]; + /// let options = ListingOptions::new(Arc::new(ParquetFormat::default())) + /// .with_file_extension(".parquet") + /// .with_collect_stat(true); + /// + /// let config = ListingTableConfig::new(table_paths) + /// .with_listing_options(options); // Configure file format and options + /// ``` pub fn with_listing_options(self, listing_options: ListingOptions) -> Self { // Note: This method properly sets options, but be aware that downstream // methods like infer_schema() and try_new() require both schema and options @@ -350,6 +417,24 @@ impl ListingTableConfig { /// different schemas than the table schema. /// /// If not provided, a default schema adapter factory will be used. + /// + /// # Example: Custom Schema Adapter for Type Coercion + /// ```rust + /// # use std::sync::Arc; + /// # use datafusion::datasource::listing::ListingTableConfig; + /// # use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, SchemaAdapter}; + /// # use arrow::datatypes::SchemaRef; + /// # struct MySchemaAdapterFactory; + /// # impl SchemaAdapterFactory for MySchemaAdapterFactory { + /// # fn create(&self, _projected_table_schema: SchemaRef, _file_schema: SchemaRef) -> Box { + /// # unimplemented!() + /// # } + /// # } + /// let config = ListingTableConfig::new(table_paths) + /// .with_listing_options(listing_options) + /// .with_schema(table_schema) + /// .with_schema_adapter_factory(Arc::new(MySchemaAdapterFactory)); + /// ``` pub fn with_schema_adapter_factory( self, schema_adapter_factory: Arc, @@ -959,6 +1044,23 @@ impl ListingTable { /// The schema adapter factory is used to create schema adapters that can /// handle schema evolution and type conversions when reading files with /// different schemas than the table schema. + /// + /// # Example: Adding Schema Evolution Support + /// ```rust + /// # use std::sync::Arc; + /// # use datafusion::datasource::listing::ListingTable; + /// # use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, SchemaAdapter}; + /// # use arrow::datatypes::SchemaRef; + /// # struct EvolutionAdapterFactory; + /// # impl SchemaAdapterFactory for EvolutionAdapterFactory { + /// # fn create(&self, _projected_table_schema: SchemaRef, _file_schema: SchemaRef) -> Box { + /// # unimplemented!() + /// # } + /// # } + /// # let table = ListingTable::try_new(config).unwrap(); + /// let table_with_evolution = table + /// .with_schema_adapter_factory(Arc::new(EvolutionAdapterFactory)); + /// ``` pub fn with_schema_adapter_factory( self, schema_adapter_factory: Arc, From 86933ebe529faa94b279b2ca2aac8894ecad9690 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 27 Jun 2025 20:57:27 +0800 Subject: [PATCH 07/16] refactor: rename `None` to `Unset` in `SchemaSource` for clarity in schema state representation --- datafusion/core/src/datasource/listing/table.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 35168b47090de..7d691a71a36fa 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -60,7 +60,7 @@ use std::{any::Any, collections::HashMap, str::FromStr, sync::Arc}; pub enum SchemaSource { /// Schema is not yet set (initial state) #[default] - None, + Unset, /// Schema was inferred from first table_path Inferred, /// Schema was specified explicitly via with_schema @@ -1588,7 +1588,7 @@ mod tests { // Test default schema source let config = ListingTableConfig::new(table_path.clone()); - assert_eq!(config.schema_source(), SchemaSource::None); + assert_eq!(config.schema_source(), SchemaSource::Unset); // Test schema source after setting a schema explicitly let provided_schema = create_test_schema(); @@ -1599,7 +1599,7 @@ mod tests { let format = CsvFormat::default(); let options = ListingOptions::new(Arc::new(format)); let config_with_options = config.with_listing_options(options.clone()); - assert_eq!(config_with_options.schema_source(), SchemaSource::None); + assert_eq!(config_with_options.schema_source(), SchemaSource::Unset); let config_with_inferred = config_with_options.infer_schema(&ctx.state()).await?; assert_eq!(config_with_inferred.schema_source(), SchemaSource::Inferred); From d28e6156dc69c27ea58775b5803b5603abc6465d Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Sat, 28 Jun 2025 10:11:54 +0800 Subject: [PATCH 08/16] fix: improve assertion for table_paths in ListingTableConfig to use is_empty() for clarity --- datafusion/core/src/datasource/listing/table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 7d691a71a36fa..c91390ec43803 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -232,7 +232,7 @@ impl ListingTableConfig { // methods like infer_schema() and try_new() require both schema and options // to be set to function correctly. debug_assert!( - self.table_paths.len() > 0 || cfg!(test), + !self.table_paths.is_empty() || cfg!(test), "ListingTableConfig::with_listing_options called without table_paths set. \ Consider calling new() or new_with_multi_paths() first to establish table paths." ); From cb75d37f5156b8ef325c19eb00f42c26f61fe1b5 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Sat, 28 Jun 2025 10:26:51 +0800 Subject: [PATCH 09/16] docs: update examples in ListingTableConfig to include ListingTableUrl for clarity --- .../core/src/datasource/listing/table.rs | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index c91390ec43803..974a3bbe9b3fb 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -95,6 +95,7 @@ pub enum SchemaSource { /// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef}; /// # /// # // Custom schema adapter for handling schema evolution +/// # #[derive(Debug)] /// # struct EvolutionSchemaAdapterFactory; /// # impl SchemaAdapterFactory for EvolutionSchemaAdapterFactory { /// # fn create(&self, projected_table_schema: SchemaRef, file_schema: SchemaRef) -> Box { @@ -179,10 +180,11 @@ impl ListingTableConfig { /// # Example: Specifying Table Schema /// ```rust /// # use std::sync::Arc; - /// # use datafusion::datasource::listing::ListingTableConfig; + /// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl}; + /// # use datafusion::datasource::file_format::parquet::ParquetFormat; /// # use arrow::datatypes::{Schema, Field, DataType}; - /// # let table_paths = vec![]; - /// # let listing_options = unimplemented!(); + /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap(); + /// # let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default())); /// let schema = Arc::new(Schema::new(vec![ /// Field::new("id", DataType::Int64, false), /// Field::new("name", DataType::Utf8, true), @@ -217,9 +219,9 @@ impl ListingTableConfig { /// # Example: Configuring Parquet Files with Custom Options /// ```rust /// # use std::sync::Arc; - /// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions}; + /// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl}; /// # use datafusion::datasource::file_format::parquet::ParquetFormat; - /// # let table_paths = vec![]; + /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap(); /// let options = ListingOptions::new(Arc::new(ParquetFormat::default())) /// .with_file_extension(".parquet") /// .with_collect_stat(true); @@ -421,15 +423,21 @@ impl ListingTableConfig { /// # Example: Custom Schema Adapter for Type Coercion /// ```rust /// # use std::sync::Arc; - /// # use datafusion::datasource::listing::ListingTableConfig; + /// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl}; /// # use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, SchemaAdapter}; - /// # use arrow::datatypes::SchemaRef; + /// # use datafusion::datasource::file_format::parquet::ParquetFormat; + /// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType}; + /// # + /// # #[derive(Debug)] /// # struct MySchemaAdapterFactory; /// # impl SchemaAdapterFactory for MySchemaAdapterFactory { /// # fn create(&self, _projected_table_schema: SchemaRef, _file_schema: SchemaRef) -> Box { /// # unimplemented!() /// # } /// # } + /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap(); + /// # let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default())); + /// # let table_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])); /// let config = ListingTableConfig::new(table_paths) /// .with_listing_options(listing_options) /// .with_schema(table_schema) @@ -1048,15 +1056,21 @@ impl ListingTable { /// # Example: Adding Schema Evolution Support /// ```rust /// # use std::sync::Arc; - /// # use datafusion::datasource::listing::ListingTable; + /// # use datafusion::datasource::listing::{ListingTable, ListingTableConfig, ListingOptions, ListingTableUrl}; /// # use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, SchemaAdapter}; - /// # use arrow::datatypes::SchemaRef; + /// # use datafusion::datasource::file_format::parquet::ParquetFormat; + /// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType}; + /// # #[derive(Debug)] /// # struct EvolutionAdapterFactory; /// # impl SchemaAdapterFactory for EvolutionAdapterFactory { /// # fn create(&self, _projected_table_schema: SchemaRef, _file_schema: SchemaRef) -> Box { /// # unimplemented!() /// # } /// # } + /// # let table_path = ListingTableUrl::parse("file:///path/to/data").unwrap(); + /// # let options = ListingOptions::new(Arc::new(ParquetFormat::default())); + /// # let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])); + /// # let config = ListingTableConfig::new(table_path).with_listing_options(options).with_schema(schema); /// # let table = ListingTable::try_new(config).unwrap(); /// let table_with_evolution = table /// .with_schema_adapter_factory(Arc::new(EvolutionAdapterFactory)); From 5c719c2fe1c5ffdaea5f10e59a6025ec1fdc080c Mon Sep 17 00:00:00 2001 From: kosiew Date: Thu, 3 Jul 2025 11:55:09 +0800 Subject: [PATCH 10/16] Update datafusion/core/src/datasource/listing/table.rs Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> --- datafusion/core/src/datasource/listing/table.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 974a3bbe9b3fb..11c1004b47d5b 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -74,14 +74,10 @@ pub enum SchemaSource { /// This configuration supports schema evolution through the optional /// [`SchemaAdapterFactory`]. You might want to override the default factory when: /// -/// - **Reading files with evolving schemas**: When your data files have been written -/// over time with different but compatible schemas (e.g., added columns, renamed fields) /// - **Type coercion requirements**: When you need custom logic for converting between /// different Arrow data types (e.g., Int32 ↔ Int64, Utf8 ↔ LargeUtf8) -/// - **Column mapping**: When files have different column names or ordering than -/// your expected table schema -/// - **Backwards compatibility**: When newer table schemas need to read older file -/// formats gracefully +/// - **Column mapping**: You need to map columns with a legacy name to a new name +/// - **Custom handling of missing columns**: By default they are filled in with nulls, but you may e.g. want to fill them in with `0` or `""`. /// /// If not specified, a [`DefaultSchemaAdapterFactory`] will be used, which handles /// basic schema compatibility cases. From 362767c6f34981944e278d134cd6afb8d81ee73f Mon Sep 17 00:00:00 2001 From: kosiew Date: Thu, 3 Jul 2025 11:55:09 +0800 Subject: [PATCH 11/16] Amend SchemaAdapterFactory documentation to be more specific Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> --- datafusion/core/src/datasource/listing/table.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 974a3bbe9b3fb..1ab1e5efe432e 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -72,16 +72,12 @@ pub enum SchemaSource { /// # Schema Evolution Support /// /// This configuration supports schema evolution through the optional -/// [`SchemaAdapterFactory`]. You might want to override the default factory when: +/// [`SchemaAdapterFactory`]. You might want to override the default factory when you need: /// -/// - **Reading files with evolving schemas**: When your data files have been written -/// over time with different but compatible schemas (e.g., added columns, renamed fields) /// - **Type coercion requirements**: When you need custom logic for converting between /// different Arrow data types (e.g., Int32 ↔ Int64, Utf8 ↔ LargeUtf8) -/// - **Column mapping**: When files have different column names or ordering than -/// your expected table schema -/// - **Backwards compatibility**: When newer table schemas need to read older file -/// formats gracefully +/// - **Column mapping**: You need to map columns with a legacy name to a new name +/// - **Custom handling of missing columns**: By default they are filled in with nulls, but you may e.g. want to fill them in with `0` or `""`. /// /// If not specified, a [`DefaultSchemaAdapterFactory`] will be used, which handles /// basic schema compatibility cases. From a300a3001d2e7d600b7790065dd5118c39412ba1 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 3 Jul 2025 12:33:15 +0800 Subject: [PATCH 12/16] move custom schema adapter example to schema_adapter.rs --- .../core/src/datasource/listing/table.rs | 38 ------------------ datafusion/datasource/src/schema_adapter.rs | 40 +++++++++++++++++++ 2 files changed, 40 insertions(+), 38 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 1ab1e5efe432e..f62a3b31bd316 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -82,44 +82,6 @@ pub enum SchemaSource { /// If not specified, a [`DefaultSchemaAdapterFactory`] will be used, which handles /// basic schema compatibility cases. /// -/// # Complete Example: Schema Evolution Setup -/// ```rust -/// # use std::sync::Arc; -/// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl}; -/// # use datafusion::datasource::file_format::parquet::ParquetFormat; -/// # use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, SchemaAdapter}; -/// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef}; -/// # -/// # // Custom schema adapter for handling schema evolution -/// # #[derive(Debug)] -/// # struct EvolutionSchemaAdapterFactory; -/// # impl SchemaAdapterFactory for EvolutionSchemaAdapterFactory { -/// # fn create(&self, projected_table_schema: SchemaRef, file_schema: SchemaRef) -> Box { -/// # unimplemented!("Custom schema adapter implementation") -/// # } -/// # } -/// # -/// # let table_path = ListingTableUrl::parse("file:///path/to/data").unwrap(); -/// -/// // Define expected table schema (what queries will see) -/// let table_schema = Arc::new(Schema::new(vec![ -/// Field::new("id", DataType::Int64, false), -/// Field::new("name", DataType::Utf8, true), -/// Field::new("created_at", DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None), true), -/// ])); -/// -/// // Configure file format options -/// let options = ListingOptions::new(Arc::new(ParquetFormat::default())) -/// .with_file_extension(".parquet") -/// .with_collect_stat(true); -/// -/// // Build configuration with schema evolution support -/// let config = ListingTableConfig::new(table_path) -/// .with_listing_options(options) -/// .with_schema(table_schema) -/// .with_schema_adapter_factory(Arc::new(EvolutionSchemaAdapterFactory)); -/// ``` -/// #[derive(Debug, Clone, Default)] pub struct ListingTableConfig { /// Paths on the `ObjectStore` for creating `ListingTable`. diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs index b43041c8d14db..7b0ce315a053f 100644 --- a/datafusion/datasource/src/schema_adapter.rs +++ b/datafusion/datasource/src/schema_adapter.rs @@ -211,6 +211,46 @@ impl DefaultSchemaAdapterFactory { } } +/// # // Custom schema adapter for handling schema evolution +/// # #[derive(Debug)] +/// # Complete Example: Schema Evolution Setup +/// ```rust +/// # use std::sync::Arc; +/// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl}; +/// # use datafusion::datasource::file_format::parquet::ParquetFormat; +/// # use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, SchemaAdapter}; +/// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef}; +/// # +/// # // Custom schema adapter for handling schema evolution +/// # #[derive(Debug)] +/// # struct EvolutionSchemaAdapterFactory; +/// # impl SchemaAdapterFactory for EvolutionSchemaAdapterFactory { +/// # fn create(&self, projected_table_schema: SchemaRef, file_schema: SchemaRef) -> Box { +/// # unimplemented!("Custom schema adapter implementation") +/// # } +/// # } +/// # +/// # let table_path = ListingTableUrl::parse("file:///path/to/data").unwrap(); +/// +/// // Define expected table schema (what queries will see) +/// let table_schema = Arc::new(Schema::new(vec![ +/// Field::new("id", DataType::Int64, false), +/// Field::new("name", DataType::Utf8, true), +/// Field::new("created_at", DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None), true), +/// ])); +/// +/// // Configure file format options +/// let options = ListingOptions::new(Arc::new(ParquetFormat::default())) +/// .with_file_extension(".parquet") +/// .with_collect_stat(true); +/// +/// // Build configuration with schema evolution support +/// let config = ListingTableConfig::new(table_path) +/// .with_listing_options(options) +/// .with_schema(table_schema) +/// .with_schema_adapter_factory(Arc::new(EvolutionSchemaAdapterFactory)); +/// ``` +/// impl SchemaAdapterFactory for DefaultSchemaAdapterFactory { fn create( &self, From dd77340745e3b81a6b93500cf9029a5dc296c3c7 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 3 Jul 2025 12:33:52 +0800 Subject: [PATCH 13/16] Revert "move custom schema adapter example to schema_adapter.rs" This reverts commit a300a3001d2e7d600b7790065dd5118c39412ba1. --- .../core/src/datasource/listing/table.rs | 38 ++++++++++++++++++ datafusion/datasource/src/schema_adapter.rs | 40 ------------------- 2 files changed, 38 insertions(+), 40 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index f62a3b31bd316..1ab1e5efe432e 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -82,6 +82,44 @@ pub enum SchemaSource { /// If not specified, a [`DefaultSchemaAdapterFactory`] will be used, which handles /// basic schema compatibility cases. /// +/// # Complete Example: Schema Evolution Setup +/// ```rust +/// # use std::sync::Arc; +/// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl}; +/// # use datafusion::datasource::file_format::parquet::ParquetFormat; +/// # use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, SchemaAdapter}; +/// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef}; +/// # +/// # // Custom schema adapter for handling schema evolution +/// # #[derive(Debug)] +/// # struct EvolutionSchemaAdapterFactory; +/// # impl SchemaAdapterFactory for EvolutionSchemaAdapterFactory { +/// # fn create(&self, projected_table_schema: SchemaRef, file_schema: SchemaRef) -> Box { +/// # unimplemented!("Custom schema adapter implementation") +/// # } +/// # } +/// # +/// # let table_path = ListingTableUrl::parse("file:///path/to/data").unwrap(); +/// +/// // Define expected table schema (what queries will see) +/// let table_schema = Arc::new(Schema::new(vec![ +/// Field::new("id", DataType::Int64, false), +/// Field::new("name", DataType::Utf8, true), +/// Field::new("created_at", DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None), true), +/// ])); +/// +/// // Configure file format options +/// let options = ListingOptions::new(Arc::new(ParquetFormat::default())) +/// .with_file_extension(".parquet") +/// .with_collect_stat(true); +/// +/// // Build configuration with schema evolution support +/// let config = ListingTableConfig::new(table_path) +/// .with_listing_options(options) +/// .with_schema(table_schema) +/// .with_schema_adapter_factory(Arc::new(EvolutionSchemaAdapterFactory)); +/// ``` +/// #[derive(Debug, Clone, Default)] pub struct ListingTableConfig { /// Paths on the `ObjectStore` for creating `ListingTable`. diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs index 7b0ce315a053f..b43041c8d14db 100644 --- a/datafusion/datasource/src/schema_adapter.rs +++ b/datafusion/datasource/src/schema_adapter.rs @@ -211,46 +211,6 @@ impl DefaultSchemaAdapterFactory { } } -/// # // Custom schema adapter for handling schema evolution -/// # #[derive(Debug)] -/// # Complete Example: Schema Evolution Setup -/// ```rust -/// # use std::sync::Arc; -/// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl}; -/// # use datafusion::datasource::file_format::parquet::ParquetFormat; -/// # use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, SchemaAdapter}; -/// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef}; -/// # -/// # // Custom schema adapter for handling schema evolution -/// # #[derive(Debug)] -/// # struct EvolutionSchemaAdapterFactory; -/// # impl SchemaAdapterFactory for EvolutionSchemaAdapterFactory { -/// # fn create(&self, projected_table_schema: SchemaRef, file_schema: SchemaRef) -> Box { -/// # unimplemented!("Custom schema adapter implementation") -/// # } -/// # } -/// # -/// # let table_path = ListingTableUrl::parse("file:///path/to/data").unwrap(); -/// -/// // Define expected table schema (what queries will see) -/// let table_schema = Arc::new(Schema::new(vec![ -/// Field::new("id", DataType::Int64, false), -/// Field::new("name", DataType::Utf8, true), -/// Field::new("created_at", DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None), true), -/// ])); -/// -/// // Configure file format options -/// let options = ListingOptions::new(Arc::new(ParquetFormat::default())) -/// .with_file_extension(".parquet") -/// .with_collect_stat(true); -/// -/// // Build configuration with schema evolution support -/// let config = ListingTableConfig::new(table_path) -/// .with_listing_options(options) -/// .with_schema(table_schema) -/// .with_schema_adapter_factory(Arc::new(EvolutionSchemaAdapterFactory)); -/// ``` -/// impl SchemaAdapterFactory for DefaultSchemaAdapterFactory { fn create( &self, From 2c022b4408c399268a0239cc030949e7c3083feb Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 3 Jul 2025 13:20:16 +0800 Subject: [PATCH 14/16] docs: Remove redundant complete example for schema evolution setup in ListingTableConfig --- .../core/src/datasource/listing/table.rs | 38 ------------------- 1 file changed, 38 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 1ab1e5efe432e..f62a3b31bd316 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -82,44 +82,6 @@ pub enum SchemaSource { /// If not specified, a [`DefaultSchemaAdapterFactory`] will be used, which handles /// basic schema compatibility cases. /// -/// # Complete Example: Schema Evolution Setup -/// ```rust -/// # use std::sync::Arc; -/// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl}; -/// # use datafusion::datasource::file_format::parquet::ParquetFormat; -/// # use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, SchemaAdapter}; -/// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef}; -/// # -/// # // Custom schema adapter for handling schema evolution -/// # #[derive(Debug)] -/// # struct EvolutionSchemaAdapterFactory; -/// # impl SchemaAdapterFactory for EvolutionSchemaAdapterFactory { -/// # fn create(&self, projected_table_schema: SchemaRef, file_schema: SchemaRef) -> Box { -/// # unimplemented!("Custom schema adapter implementation") -/// # } -/// # } -/// # -/// # let table_path = ListingTableUrl::parse("file:///path/to/data").unwrap(); -/// -/// // Define expected table schema (what queries will see) -/// let table_schema = Arc::new(Schema::new(vec![ -/// Field::new("id", DataType::Int64, false), -/// Field::new("name", DataType::Utf8, true), -/// Field::new("created_at", DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None), true), -/// ])); -/// -/// // Configure file format options -/// let options = ListingOptions::new(Arc::new(ParquetFormat::default())) -/// .with_file_extension(".parquet") -/// .with_collect_stat(true); -/// -/// // Build configuration with schema evolution support -/// let config = ListingTableConfig::new(table_path) -/// .with_listing_options(options) -/// .with_schema(table_schema) -/// .with_schema_adapter_factory(Arc::new(EvolutionSchemaAdapterFactory)); -/// ``` -/// #[derive(Debug, Clone, Default)] pub struct ListingTableConfig { /// Paths on the `ObjectStore` for creating `ListingTable`. From 69ab53e50c485662f3b2f97fd30983421e9e776a Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 3 Jul 2025 13:56:16 +0800 Subject: [PATCH 15/16] feat: Enhance SchemaAdapterFactory with create_with_projected_schema method for convenience --- .../core/src/datasource/listing/table.rs | 22 +++++++++---------- datafusion/datasource/src/schema_adapter.rs | 11 ++++++++++ 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index f62a3b31bd316..c58deb8446304 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -74,10 +74,14 @@ pub enum SchemaSource { /// This configuration supports schema evolution through the optional /// [`SchemaAdapterFactory`]. You might want to override the default factory when you need: /// +/// - **Reading files with evolving schemas**: When your data files have been written +/// over time with different but compatible schemas (e.g., added columns, renamed fields) /// - **Type coercion requirements**: When you need custom logic for converting between /// different Arrow data types (e.g., Int32 ↔ Int64, Utf8 ↔ LargeUtf8) -/// - **Column mapping**: You need to map columns with a legacy name to a new name -/// - **Custom handling of missing columns**: By default they are filled in with nulls, but you may e.g. want to fill them in with `0` or `""`. +/// - **Column mapping**: When files have different column names or ordering than +/// your expected table schema +/// - **Backwards compatibility**: When newer table schemas need to read older file +/// formats gracefully /// /// If not specified, a [`DefaultSchemaAdapterFactory`] will be used, which handles /// basic schema compatibility cases. @@ -1015,24 +1019,18 @@ impl ListingTable { /// ```rust /// # use std::sync::Arc; /// # use datafusion::datasource::listing::{ListingTable, ListingTableConfig, ListingOptions, ListingTableUrl}; - /// # use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, SchemaAdapter}; + /// # use datafusion::datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapter}; /// # use datafusion::datasource::file_format::parquet::ParquetFormat; /// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType}; - /// # #[derive(Debug)] - /// # struct EvolutionAdapterFactory; - /// # impl SchemaAdapterFactory for EvolutionAdapterFactory { - /// # fn create(&self, _projected_table_schema: SchemaRef, _file_schema: SchemaRef) -> Box { - /// # unimplemented!() - /// # } - /// # } /// # let table_path = ListingTableUrl::parse("file:///path/to/data").unwrap(); /// # let options = ListingOptions::new(Arc::new(ParquetFormat::default())); /// # let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])); /// # let config = ListingTableConfig::new(table_path).with_listing_options(options).with_schema(schema); /// # let table = ListingTable::try_new(config).unwrap(); /// let table_with_evolution = table - /// .with_schema_adapter_factory(Arc::new(EvolutionAdapterFactory)); + /// .with_schema_adapter_factory(Arc::new(DefaultSchemaAdapterFactory)); /// ``` + /// See [`ListingTableConfig::with_schema_adapter_factory`] for an example of custom SchemaAdapterFactory. pub fn with_schema_adapter_factory( self, schema_adapter_factory: Arc, @@ -1056,7 +1054,7 @@ impl ListingTable { let table_schema = self.schema(); match &self.schema_adapter_factory { Some(factory) => { - factory.create(Arc::clone(&table_schema), Arc::clone(&table_schema)) + factory.create_with_projected_schema(Arc::clone(&table_schema)) } None => DefaultSchemaAdapterFactory::from_schema(Arc::clone(&table_schema)), } diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs index b43041c8d14db..5e743a3f0c233 100644 --- a/datafusion/datasource/src/schema_adapter.rs +++ b/datafusion/datasource/src/schema_adapter.rs @@ -57,6 +57,17 @@ pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static { projected_table_schema: SchemaRef, table_schema: SchemaRef, ) -> Box; + + /// Create a [`SchemaAdapter`] using only the projected table schema. + /// + /// This is a convenience method for cases where the table schema and the + /// projected table schema are the same. + fn create_with_projected_schema( + &self, + projected_table_schema: SchemaRef, + ) -> Box { + self.create(Arc::clone(&projected_table_schema), projected_table_schema) + } } /// Creates [`SchemaMapper`]s to map file-level [`RecordBatch`]es to a table From cc8657874c164d4aaaf3a1da4d98bce708eb517a Mon Sep 17 00:00:00 2001 From: kosiew Date: Fri, 4 Jul 2025 14:28:58 +0800 Subject: [PATCH 16/16] Simplify comment for applying listing table schema adapter to source Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> --- datafusion/core/src/datasource/listing/table.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index a597b343778c9..3c079ba4db3ed 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1061,13 +1061,8 @@ impl ListingTable { let mut source = self.options.format.file_source(); // Apply schema adapter to source if available // - // NOTE: This may layer the ListingTable's schema adapter factory on top of any - // existing factory that the file source already has. The composition semantics are: - // 1. The file format's existing adapter (if any) handles format-specific schema mapping - // 2. Our adapter handles table-level schema evolution requirements - // - // This layering is intentional but may need adjustment if the underlying source - // already handles the same schema evolution cases we're trying to address. + // The source will use this SchemaAdapter to adapt data batches as they flow up the plan. + // Note: ListingTable also creates a SchemaAdapter in `scan()` but that is only used to adapt collected statistics. if let Some(factory) = &self.schema_adapter_factory { source = source.with_schema_adapter_factory(Arc::clone(factory))?; }