From 97b7b4d5926ac94d92e4ea4c7e48eb1f83201fba Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Tue, 20 May 2025 18:43:30 +0800
Subject: [PATCH 1/6] Add field casting utility functions and refactor schema
 mapping logic

---
 datafusion/datasource/src/schema_adapter.rs | 104 +++++++++++++++-----
 1 file changed, 80 insertions(+), 24 deletions(-)

diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs
index bacec7f4f9f00..452b10c91725a 100644
--- a/datafusion/datasource/src/schema_adapter.rs
+++ b/datafusion/datasource/src/schema_adapter.rs
@@ -23,7 +23,7 @@
 use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions};
 use arrow::compute::{can_cast_types, cast};
-use arrow::datatypes::{Schema, SchemaRef};
+use arrow::datatypes::{Field, Schema, SchemaRef};
 use datafusion_common::{plan_err, ColumnStatistics};
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -225,6 +225,25 @@ pub(crate) struct DefaultSchemaAdapter {
     projected_table_schema: SchemaRef,
 }
 
+/// Checks if a file field can be cast to a table field
+///
+/// Returns Ok(true) if casting is possible, or an error explaining why casting is not possible
+pub(crate) fn can_cast_field(
+    file_field: &Field,
+    table_field: &Field,
+) -> datafusion_common::Result<bool> {
+    if can_cast_types(file_field.data_type(), table_field.data_type()) {
+        Ok(true)
+    } else {
+        plan_err!(
+            "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
+            file_field.name(),
+            file_field.data_type(),
+            table_field.data_type()
+        )
+    }
+}
+
 impl SchemaAdapter for DefaultSchemaAdapter {
     /// Map a column index in the table schema to a column index in a particular
     /// file schema
@@ -248,29 +267,11 @@ impl SchemaAdapter for DefaultSchemaAdapter {
         &self,
         file_schema: &Schema,
     ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
-        let mut projection = Vec::with_capacity(file_schema.fields().len());
-        let mut field_mappings = vec![None; self.projected_table_schema.fields().len()];
-
-        for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
-            if let Some((table_idx, table_field)) =
-                self.projected_table_schema.fields().find(file_field.name())
-            {
-                match can_cast_types(file_field.data_type(), table_field.data_type()) {
-                    true => {
-                        field_mappings[table_idx] = Some(projection.len());
-                        projection.push(file_idx);
-                    }
-                    false => {
-                        return plan_err!(
-                            "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
-                            file_field.name(),
-                            file_field.data_type(),
-                            table_field.data_type()
-                        )
-                    }
-                }
-            }
-        }
+        let (field_mappings, projection) = create_field_mapping(
+            file_schema,
+            &self.projected_table_schema,
+            can_cast_field,
+        )?;
 
         Ok((
             Arc::new(SchemaMapping {
@@ -282,6 +283,43 @@ impl SchemaAdapter for DefaultSchemaAdapter {
     }
 }
 
+/// Helper function that creates field mappings between file schema and table schema
+///
+/// # Arguments
+///
+/// * `file_schema` - The schema of the source file
+/// * `projected_table_schema` - The schema that we're mapping to
+/// * `can_map_field` - A closure that determines whether a field from file schema can be mapped to table schema
+///
+/// # Returns
+/// A tuple containing:
+/// * Field mappings from table schema indices to file schema projection indices
+/// * A projection of indices from the file schema
+pub(crate) fn create_field_mapping<F>(
+    file_schema: &Schema,
+    projected_table_schema: &SchemaRef,
+    can_map_field: F,
+) -> datafusion_common::Result<(Vec<Option<usize>>, Vec<usize>)>
+where
+    F: Fn(&Field, &Field) -> datafusion_common::Result<bool>,
+{
+    let mut projection = Vec::with_capacity(file_schema.fields().len());
+    let mut field_mappings = vec![None; projected_table_schema.fields().len()];
+
+    for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
+        if let Some((table_idx, table_field)) =
+            projected_table_schema.fields().find(file_field.name())
+        {
+            if can_map_field(file_field, table_field)? {
+                field_mappings[table_idx] = Some(projection.len());
+                projection.push(file_idx);
+            }
+        }
+    }
+
+    Ok((field_mappings, projection))
+}
+
 /// The SchemaMapping struct holds a mapping from the file schema to the table
 /// schema and any necessary type conversions.
 ///
@@ -304,6 +342,24 @@ pub struct SchemaMapping {
     field_mappings: Vec<Option<usize>>,
 }
 
+impl SchemaMapping {
+    /// Creates a new SchemaMapping instance
+    ///
+    /// # Arguments
+    ///
+    /// * `projected_table_schema` - The schema expected for query results
+    /// * `field_mappings` - Mapping from field index in projected_table_schema to index in file schema
+    pub fn new(
+        projected_table_schema: SchemaRef,
+        field_mappings: Vec<Option<usize>>,
+    ) -> Self {
+        Self {
+            projected_table_schema,
+            field_mappings,
+        }
+    }
+}
+
 impl SchemaMapper for SchemaMapping {
     /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and
     /// conversions.
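A quick usage sketch of how the two new helpers compose (hypothetical caller, made-up schemas; `create_field_mapping` and `can_cast_field` are pub(crate), so this assumes code inside the same crate with the helpers in scope; not part of the patch):

    use std::sync::Arc;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::Result;

    fn map_example() -> Result<()> {
        // Table expects (id: Int64, name: Utf8); the file stores the columns in
        // a different order and uses Int32 for "id", which is castable to Int64.
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, true),
            Field::new("name", DataType::Utf8, true),
        ]));
        let file_schema = Schema::new(vec![
            Field::new("name", DataType::Utf8, true),
            Field::new("id", DataType::Int32, true),
        ]);

        // can_cast_field vets each name-matched pair; create_field_mapping walks
        // the file schema once and records where each table column's data comes from.
        let (field_mappings, projection) =
            create_field_mapping(&file_schema, &table_schema, can_cast_field)?;

        // Table column i is served by projected file column field_mappings[i];
        // projection lists the file columns to read, in file order.
        assert_eq!(field_mappings, vec![Some(1), Some(0)]);
        assert_eq!(projection, vec![0, 1]);
        Ok(())
    }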
From ed95e1d7aacb39e21a2dc55bbbb1a68914696bfb Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Tue, 20 May 2025 18:49:48 +0800
Subject: [PATCH 2/6] Fix tests for field casting and schema mapping
 functionality

---
 datafusion/datasource/src/schema_adapter.rs | 173 ++++++++++++++++++++
 1 file changed, 173 insertions(+)

diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs
index 452b10c91725a..8d28a4eed5a4c 100644
--- a/datafusion/datasource/src/schema_adapter.rs
+++ b/datafusion/datasource/src/schema_adapter.rs
@@ -518,4 +518,177 @@ mod tests {
         assert_eq!(table_col_stats[0], ColumnStatistics::new_unknown(),);
         assert_eq!(table_col_stats[1], ColumnStatistics::new_unknown(),);
     }
+
+    #[test]
+    fn test_can_cast_field() {
+        // Same type should work
+        let from_field = Field::new("col", DataType::Int32, true);
+        let to_field = Field::new("col", DataType::Int32, true);
+        assert!(can_cast_field(&from_field, &to_field).unwrap());
+
+        // Casting Int32 to Float64 is allowed
+        let from_field = Field::new("col", DataType::Int32, true);
+        let to_field = Field::new("col", DataType::Float64, true);
+        assert!(can_cast_field(&from_field, &to_field).unwrap());
+
+        // Casting Float64 to Utf8 should work (converts to string)
+        let from_field = Field::new("col", DataType::Float64, true);
+        let to_field = Field::new("col", DataType::Utf8, true);
+        assert!(can_cast_field(&from_field, &to_field).unwrap());
+
+        // Binary to Decimal128 is not supported - this is an example of a cast that should fail
+        // Note: We use Binary instead of Utf8->Int32 because Arrow actually supports that cast
+        let from_field = Field::new("col", DataType::Binary, true);
+        let to_field = Field::new("col", DataType::Decimal128(10, 2), true);
+        let result = can_cast_field(&from_field, &to_field);
+        assert!(result.is_err());
+        let error_msg = result.unwrap_err().to_string();
+        assert!(error_msg.contains("Cannot cast file schema field col"));
+    }
+
+    #[test]
+    fn test_create_field_mapping() {
+        // Define the table schema
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Utf8, true),
+            Field::new("c", DataType::Float64, true),
+        ]));
+
+        // Define file schema: different order, missing column c, and b has different type
+        let file_schema = Schema::new(vec![
+            Field::new("b", DataType::Float64, true), // Different type but castable to Utf8
+            Field::new("a", DataType::Int32, true),   // Same type
+            Field::new("d", DataType::Boolean, true), // Not in table schema
+        ]);
+
+        // Custom can_map_field function that allows all mappings for testing
+        let allow_all = |_: &Field, _: &Field| Ok(true);
+
+        // Test field mapping
+        let (field_mappings, projection) =
+            create_field_mapping(&file_schema, &table_schema, allow_all).unwrap();
+
+        // Expected:
+        // - field_mappings[0] (a) maps to projection[1]
+        // - field_mappings[1] (b) maps to projection[0]
+        // - field_mappings[2] (c) is None (not in file)
+        assert_eq!(field_mappings, vec![Some(1), Some(0), None]);
+        assert_eq!(projection, vec![0, 1]); // Projecting file columns b, a
+
+        // Test with a failing mapper
+        let fails_all = |_: &Field, _: &Field| Ok(false);
+        let (field_mappings, projection) =
+            create_field_mapping(&file_schema, &table_schema, fails_all).unwrap();
+
+        // Should have no mappings or projections if all cast checks fail
+        assert_eq!(field_mappings, vec![None, None, None]);
+        assert_eq!(projection, Vec::<usize>::new());
+
+        // Test with error-producing mapper
+        let error_mapper = |_: &Field, _: &Field| plan_err!("Test error");
+        let result = create_field_mapping(&file_schema, &table_schema, error_mapper);
+        assert!(result.is_err());
+        assert!(result.unwrap_err().to_string().contains("Test error"));
+    }
+
+    #[test]
+    fn test_schema_mapping_new() {
+        // Define the projected table schema
+        let projected_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Utf8, true),
+        ]));
+
+        // Define field mappings from table to file
+        let field_mappings = vec![Some(1), Some(0)];
+
+        // Create SchemaMapping manually
+        let mapping =
+            SchemaMapping::new(Arc::clone(&projected_schema), field_mappings.clone());
+
+        // Check that fields were set correctly
+        assert_eq!(*mapping.projected_table_schema, *projected_schema);
+        assert_eq!(mapping.field_mappings, field_mappings);
+
+        // Test with a batch to ensure it works properly
+        let batch = RecordBatch::try_new(
+            Arc::new(Schema::new(vec![
+                Field::new("b_file", DataType::Utf8, true),
+                Field::new("a_file", DataType::Int32, true),
+            ])),
+            vec![
+                Arc::new(arrow::array::StringArray::from(vec!["hello", "world"])),
+                Arc::new(arrow::array::Int32Array::from(vec![1, 2])),
+            ],
+        )
+        .unwrap();
+
+        // Test that map_batch works with our manually created mapping
+        let mapped_batch = mapping.map_batch(batch).unwrap();
+
+        // Verify the mapped batch has the correct schema and data
+        assert_eq!(*mapped_batch.schema(), *projected_schema);
+        assert_eq!(mapped_batch.num_columns(), 2);
+        assert_eq!(mapped_batch.column(0).len(), 2); // a column
+        assert_eq!(mapped_batch.column(1).len(), 2); // b column
+    }
+
+    #[test]
+    fn test_map_schema_integration() {
+        // Define the table schema
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Utf8, true),
+            Field::new("c", DataType::Decimal128(10, 2), true), // Use Decimal which has stricter cast rules
+        ]));
+
+        // Define file schema with incompatible type for column c
+        let file_schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Float64, true), // Different but castable
+            Field::new("c", DataType::Binary, true),  // Not castable to Decimal128
+        ]);
+
+        // Create DefaultSchemaAdapter
+        let adapter = DefaultSchemaAdapter {
+            projected_table_schema: Arc::clone(&table_schema),
+        };
+
+        // map_schema should error due to incompatible types
+        let result = adapter.map_schema(&file_schema);
+        assert!(result.is_err());
+        let error_msg = result.unwrap_err().to_string();
+        assert!(error_msg.contains("Cannot cast file schema field c"));
+
+        // Now with compatible types
+        let compatible_file_schema = Schema::new(vec![
+            Field::new("a", DataType::Int64, true), // Can be cast to Int32
+            Field::new("b", DataType::Float64, true), // Can be cast to Utf8
+        ]);
+
+        let (mapper, projection) = adapter.map_schema(&compatible_file_schema).unwrap();
+
+        // Verify field_mappings and projection created correctly
+        assert_eq!(projection, vec![0, 1]); // Projecting a and b
+
+        // Verify the SchemaMapping works
+        let file_batch = RecordBatch::try_new(
+            Arc::new(compatible_file_schema.clone()),
+            vec![
+                Arc::new(arrow::array::Int64Array::from(vec![100, 200])),
+                Arc::new(arrow::array::Float64Array::from(vec![1.5, 2.5])),
+            ],
+        )
+        .unwrap();
+
+        let mapped_batch = mapper.map_batch(file_batch).unwrap();
+        assert_eq!(*mapped_batch.schema(), *table_schema);
+        assert_eq!(mapped_batch.num_columns(), 3); // a, b, c
+
+        // Column c should be null
+        let c_array = mapped_batch.column(2);
+        assert_eq!(c_array.len(), 2);
+        assert_eq!(c_array.null_count(), 2);
+    }
 }
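The failure case in test_can_cast_field leans on Arrow's cast rules, which are broader than one might expect. A small probe of `arrow::compute::can_cast_types` (a sketch; the specific outcomes below are the behavior the tests above assert, not independently documented here) shows why Binary -> Decimal128 was picked as the unsupported pair:

    use arrow::compute::can_cast_types;
    use arrow::datatypes::DataType;

    fn probe_arrow_casts() {
        assert!(can_cast_types(&DataType::Int32, &DataType::Float64)); // numeric widening
        assert!(can_cast_types(&DataType::Float64, &DataType::Utf8)); // formats to string
        // Utf8 -> Int32 is a parsing cast Arrow supports, so it cannot be the failing case:
        assert!(can_cast_types(&DataType::Utf8, &DataType::Int32));
        // The pair the test relies on instead:
        assert!(!can_cast_types(&DataType::Binary, &DataType::Decimal128(10, 2)));
    }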
From bf06eaa10c07e20d07b67bf809dfa44c80058065 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Wed, 21 May 2025 18:53:04 +0800
Subject: [PATCH 3/6] refactor: simplify SchemaMapping instantiation in
 DefaultSchemaAdapter

---
 datafusion/datasource/src/schema_adapter.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs
index 8d28a4eed5a4c..9c4a509ab0eca 100644
--- a/datafusion/datasource/src/schema_adapter.rs
+++ b/datafusion/datasource/src/schema_adapter.rs
@@ -274,10 +274,10 @@ impl SchemaAdapter for DefaultSchemaAdapter {
         )?;
 
         Ok((
-            Arc::new(SchemaMapping {
-                projected_table_schema: Arc::clone(&self.projected_table_schema),
+            Arc::new(SchemaMapping::new(
+                Arc::clone(&self.projected_table_schema),
                 field_mappings,
-            }),
+            )),
             projection,
         ))
     }
From 48bd5aa1dd3ff618d5b18a84cbbb5bb9353dc0e9 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Wed, 21 May 2025 18:56:28 +0800
Subject: [PATCH 4/6] refactor: improve documentation for create_field_mapping
 and SchemaMapping::new functions

---
 datafusion/datasource/src/schema_adapter.rs | 17 ++++-------------
 1 file changed, 4 insertions(+), 13 deletions(-)

diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs
index 9c4a509ab0eca..6952932b89bf3 100644
--- a/datafusion/datasource/src/schema_adapter.rs
+++ b/datafusion/datasource/src/schema_adapter.rs
@@ -285,16 +285,10 @@ impl SchemaAdapter for DefaultSchemaAdapter {
 
 /// Helper function that creates field mappings between file schema and table schema
 ///
-/// # Arguments
+/// Maps columns from the file schema to their corresponding positions in the table schema,
+/// applying type compatibility checking via the provided predicate function.
 ///
-/// * `file_schema` - The schema of the source file
-/// * `projected_table_schema` - The schema that we're mapping to
-/// * `can_map_field` - A closure that determines whether a field from file schema can be mapped to table schema
-///
-/// # Returns
-/// A tuple containing:
-/// * Field mappings from table schema indices to file schema projection indices
-/// * A projection of indices from the file schema
+/// Returns field mappings (for column reordering) and a projection (for field selection).
 pub(crate) fn create_field_mapping<F>(
     file_schema: &Schema,
     projected_table_schema: &SchemaRef,
@@ -345,10 +339,7 @@ pub struct SchemaMapping {
 impl SchemaMapping {
     /// Creates a new SchemaMapping instance
     ///
-    /// # Arguments
-    ///
-    /// * `projected_table_schema` - The schema expected for query results
-    /// * `field_mappings` - Mapping from field index in projected_table_schema to index in file schema
+    /// Initializes the field mappings needed to transform file data to the projected table schema
    pub fn new(
         projected_table_schema: SchemaRef,
         field_mappings: Vec<Option<usize>>,
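The predicate parameter mentioned in the reworked docs is what makes the helper reusable beyond DefaultSchemaAdapter. As a sketch (hypothetical `exact_match_only`, not in the patch), a stricter adapter could refuse all casting and require identical types:

    use arrow::datatypes::Field;
    use datafusion_common::{plan_err, Result};

    // Hypothetical strict predicate: only identical types map, no casting.
    fn exact_match_only(file_field: &Field, table_field: &Field) -> Result<bool> {
        if file_field.data_type() == table_field.data_type() {
            Ok(true)
        } else {
            plan_err!(
                "Field {} has type {:?} in the file but {:?} in the table",
                file_field.name(),
                file_field.data_type(),
                table_field.data_type()
            )
        }
    }

    // Usage mirrors the default path (assuming the helper is in scope):
    // let (mappings, projection) =
    //     create_field_mapping(&file_schema, &table_schema, exact_match_only)?;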
From 42d7f7eb2ca3ef0c4ec4e89a864aa6e636ccd094 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Wed, 21 May 2025 19:14:42 +0800
Subject: [PATCH 5/6] test: rename schema mapping test and add happy path
 scenario

---
 datafusion/datasource/src/schema_adapter.rs | 26 +++++++++++++++++----
 1 file changed, 22 insertions(+), 4 deletions(-)

diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs
index 6952932b89bf3..519be97a81021 100644
--- a/datafusion/datasource/src/schema_adapter.rs
+++ b/datafusion/datasource/src/schema_adapter.rs
@@ -626,7 +626,7 @@ mod tests {
     }
 
     #[test]
-    fn test_map_schema_integration() {
+    fn test_map_schema_error_path() {
         // Define the table schema
         let table_schema = Arc::new(Schema::new(vec![
             Field::new("a", DataType::Int32, true),
@@ -651,19 +651,35 @@ mod tests {
         assert!(result.is_err());
         let error_msg = result.unwrap_err().to_string();
         assert!(error_msg.contains("Cannot cast file schema field c"));
+    }
+
+    #[test]
+    fn test_map_schema_happy_path() {
+        // Define the table schema
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Utf8, true),
+            Field::new("c", DataType::Decimal128(10, 2), true),
+        ]));
 
-        // Now with compatible types
+        // Create DefaultSchemaAdapter
+        let adapter = DefaultSchemaAdapter {
+            projected_table_schema: Arc::clone(&table_schema),
+        };
+
+        // Define compatible file schema (missing column c)
         let compatible_file_schema = Schema::new(vec![
             Field::new("a", DataType::Int64, true), // Can be cast to Int32
             Field::new("b", DataType::Float64, true), // Can be cast to Utf8
         ]);
 
+        // Test successful schema mapping
         let (mapper, projection) = adapter.map_schema(&compatible_file_schema).unwrap();
 
         // Verify field_mappings and projection created correctly
         assert_eq!(projection, vec![0, 1]); // Projecting a and b
 
-        // Verify the SchemaMapping works
+        // Verify the SchemaMapping works with actual data
         let file_batch = RecordBatch::try_new(
             Arc::new(compatible_file_schema.clone()),
             vec![
@@ -674,10 +690,12 @@ mod tests {
         .unwrap();
 
         let mapped_batch = mapper.map_batch(file_batch).unwrap();
+
+        // Verify correct schema mapping
         assert_eq!(*mapped_batch.schema(), *table_schema);
         assert_eq!(mapped_batch.num_columns(), 3); // a, b, c
 
-        // Column c should be null
+        // Column c should be null since it wasn't in the file schema
         let c_array = mapped_batch.column(2);
         assert_eq!(c_array.len(), 2);
         assert_eq!(c_array.null_count(), 2);
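The all-null column c in the happy-path test is not incidental: for table columns with no file counterpart, the mapper fills in a null array of the table's type and row count. A minimal sketch of that behavior, using `arrow::array::new_null_array` (which this file already imports); this illustrates the effect the test asserts, not the literal map_batch body:

    use arrow::array::{new_null_array, Array};
    use arrow::datatypes::DataType;

    fn null_fill_example() {
        // A table column absent from the file becomes an all-null array of the
        // table's type, sized to the batch row count; exactly what the test
        // checks for column "c".
        let c = new_null_array(&DataType::Decimal128(10, 2), 2);
        assert_eq!(c.len(), 2);
        assert_eq!(c.null_count(), 2);
    }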
From 9d93dac1086d3e68ab3c1250ee1986dd42fb5ad3 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Mon, 26 May 2025 16:09:18 +0800
Subject: [PATCH 6/6] trigger ci