datafusion/datasource/src/schema_adapter.rs (292 changes: 265 additions & 27 deletions)
@@ -23,7 +23,7 @@

 use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions};
 use arrow::compute::{can_cast_types, cast};
-use arrow::datatypes::{Schema, SchemaRef};
+use arrow::datatypes::{Field, Schema, SchemaRef};
 use datafusion_common::{plan_err, ColumnStatistics};
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -225,6 +225,25 @@ pub(crate) struct DefaultSchemaAdapter {
     projected_table_schema: SchemaRef,
 }

+/// Checks if a file field can be cast to a table field
+///
+/// Returns Ok(true) if casting is possible, or an error explaining why casting is not possible
+pub(crate) fn can_cast_field(
+    file_field: &Field,
+    table_field: &Field,
+) -> datafusion_common::Result<bool> {
+    if can_cast_types(file_field.data_type(), table_field.data_type()) {
+        Ok(true)
+    } else {
+        plan_err!(
+            "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
+            file_field.name(),
+            file_field.data_type(),
+            table_field.data_type()
+        )
+    }
+}

 impl SchemaAdapter for DefaultSchemaAdapter {
     /// Map a column index in the table schema to a column index in a particular
     /// file schema
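
As a quick illustration of the check this new helper wraps: `can_cast_field` is `pub(crate)`, so the sketch below exercises `arrow::compute::can_cast_types` directly, which is the same predicate the helper turns into a `plan_err!` on failure. The type pairs mirror the ones used in this PR's tests.

```rust
use arrow::compute::can_cast_types;
use arrow::datatypes::DataType;

fn main() {
    // Widening numeric casts are allowed, so the adapter maps such columns.
    assert!(can_cast_types(&DataType::Int32, &DataType::Float64));
    // There is no Binary -> Decimal128 cast kernel; can_cast_field reports
    // this as a plan error rather than returning Ok(false).
    assert!(!can_cast_types(&DataType::Binary, &DataType::Decimal128(10, 2)));
}
```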
@@ -248,40 +267,53 @@ impl SchemaAdapter for DefaultSchemaAdapter {
         &self,
         file_schema: &Schema,
     ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
-        let mut projection = Vec::with_capacity(file_schema.fields().len());
-        let mut field_mappings = vec![None; self.projected_table_schema.fields().len()];
-
-        for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
-            if let Some((table_idx, table_field)) =
-                self.projected_table_schema.fields().find(file_field.name())
-            {
-                match can_cast_types(file_field.data_type(), table_field.data_type()) {
-                    true => {
-                        field_mappings[table_idx] = Some(projection.len());
-                        projection.push(file_idx);
-                    }
-                    false => {
-                        return plan_err!(
-                            "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
-                            file_field.name(),
-                            file_field.data_type(),
-                            table_field.data_type()
-                        )
-                    }
-                }
-            }
-        }
+        let (field_mappings, projection) = create_field_mapping(
+            file_schema,
+            &self.projected_table_schema,
+            can_cast_field,
+        )?;
[Review comment on lines +270 to +274 — @kosiew (Contributor Author), May 20, 2025:
refactor into a helper function so that we can re-use in later PRs for deep-nested SchemaAdapter]

         Ok((
-            Arc::new(SchemaMapping {
-                projected_table_schema: Arc::clone(&self.projected_table_schema),
+            Arc::new(SchemaMapping::new(
+                Arc::clone(&self.projected_table_schema),
                 field_mappings,
-            }),
+            )),
             projection,
         ))
     }
 }

+/// Helper function that creates field mappings between file schema and table schema
+///
+/// Maps columns from the file schema to their corresponding positions in the table schema,
+/// applying type compatibility checking via the provided predicate function.
+///
+/// Returns field mappings (for column reordering) and a projection (for field selection).
+pub(crate) fn create_field_mapping<F>(
+    file_schema: &Schema,
+    projected_table_schema: &SchemaRef,
+    can_map_field: F,
+) -> datafusion_common::Result<(Vec<Option<usize>>, Vec<usize>)>
+where
+    F: Fn(&Field, &Field) -> datafusion_common::Result<bool>,
+{
+    let mut projection = Vec::with_capacity(file_schema.fields().len());
+    let mut field_mappings = vec![None; projected_table_schema.fields().len()];
+
+    for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
+        if let Some((table_idx, table_field)) =
+            projected_table_schema.fields().find(file_field.name())
+        {
+            if can_map_field(file_field, table_field)? {
+                field_mappings[table_idx] = Some(projection.len());
+                projection.push(file_idx);
+            }
+        }
+    }
+
+    Ok((field_mappings, projection))
+}

 /// The SchemaMapping struct holds a mapping from the file schema to the table
 /// schema and any necessary type conversions.
 ///
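
The payoff of the refactor is that `create_field_mapping` takes the compatibility check as a closure, so later adapters can swap in their own predicate — the reuse the review comment anticipates. A minimal sketch under those assumptions (the predicate here is hypothetical, and the helper is `pub(crate)`, so this only compiles inside `datafusion-datasource`):

```rust
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};

fn demo() -> datafusion_common::Result<()> {
    // Table wants (a: Int32, b: Utf8); the file supplies (b: Utf8, a: Int64).
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, true),
    ]));
    let file_schema = Schema::new(vec![
        Field::new("b", DataType::Utf8, true),
        Field::new("a", DataType::Int64, true),
    ]);

    // A stricter predicate than can_cast_field: only map exact type matches.
    let strict = |file: &Field, table: &Field| Ok(file.data_type() == table.data_type());
    let (field_mappings, projection) =
        create_field_mapping(&file_schema, &table_schema, strict)?;

    // Only "b" is mapped: table column 0 (a) has no source, table column 1 (b)
    // reads projected column 0, and only file column 0 is projected.
    assert_eq!(field_mappings, vec![None, Some(0)]);
    assert_eq!(projection, vec![0]);
    Ok(())
}
```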
@@ -304,6 +336,21 @@ pub struct SchemaMapping {
     field_mappings: Vec<Option<usize>>,
 }

+impl SchemaMapping {
+    /// Creates a new SchemaMapping instance
+    ///
+    /// Initializes the field mappings needed to transform file data to the projected table schema
+    pub fn new(
+        projected_table_schema: SchemaRef,
+        field_mappings: Vec<Option<usize>>,
+    ) -> Self {
+        Self {
+            projected_table_schema,
+            field_mappings,
+        }
+    }
+}

 impl SchemaMapper for SchemaMapping {
     /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and
     /// conversions.
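
End to end, `map_schema` builds the mapping plus projection and `map_batch` applies the casts and null-fills missing columns. A hedged sketch from the public API — it assumes `DefaultSchemaAdapterFactory::from_schema` and the `datafusion_datasource` crate path, neither of which appears in this diff:

```rust
use std::sync::Arc;
use arrow::array::{Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;

fn demo() -> datafusion_common::Result<()> {
    // Table expects (a: Int32, c: Utf8); the file only has (a: Int64).
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("c", DataType::Utf8, true),
    ]));
    let file_schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);

    let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::clone(&table_schema));
    let (mapper, projection) = adapter.map_schema(&file_schema)?;
    assert_eq!(projection, vec![0]); // read only column "a" from the file

    let batch = RecordBatch::try_new(
        Arc::new(file_schema),
        vec![Arc::new(Int64Array::from(vec![1, 2]))],
    )?;
    // map_batch casts a: Int64 -> Int32 and fills "c" with nulls.
    let mapped = mapper.map_batch(batch)?;
    assert_eq!(mapped.schema(), table_schema);
    Ok(())
}
```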
@@ -462,4 +509,195 @@ mod tests {
         assert_eq!(table_col_stats[0], ColumnStatistics::new_unknown(),);
         assert_eq!(table_col_stats[1], ColumnStatistics::new_unknown(),);
     }

+    #[test]
+    fn test_can_cast_field() {
+        // Same type should work
+        let from_field = Field::new("col", DataType::Int32, true);
+        let to_field = Field::new("col", DataType::Int32, true);
+        assert!(can_cast_field(&from_field, &to_field).unwrap());
+
+        // Casting Int32 to Float64 is allowed
+        let from_field = Field::new("col", DataType::Int32, true);
+        let to_field = Field::new("col", DataType::Float64, true);
+        assert!(can_cast_field(&from_field, &to_field).unwrap());
+
+        // Casting Float64 to Utf8 should work (converts to string)
+        let from_field = Field::new("col", DataType::Float64, true);
+        let to_field = Field::new("col", DataType::Utf8, true);
+        assert!(can_cast_field(&from_field, &to_field).unwrap());
+
+        // Binary to Decimal128 is not supported - this is an example of a cast that should fail
+        // Note: we use Binary here because casts such as Utf8 -> Int32 are ones Arrow actually supports
+        let from_field = Field::new("col", DataType::Binary, true);
+        let to_field = Field::new("col", DataType::Decimal128(10, 2), true);
+        let result = can_cast_field(&from_field, &to_field);
+        assert!(result.is_err());
+        let error_msg = result.unwrap_err().to_string();
+        assert!(error_msg.contains("Cannot cast file schema field col"));
+    }

+    #[test]
+    fn test_create_field_mapping() {
+        // Define the table schema
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Utf8, true),
+            Field::new("c", DataType::Float64, true),
+        ]));
+
+        // Define file schema: different order, missing column c, and b has a different type
+        let file_schema = Schema::new(vec![
+            Field::new("b", DataType::Float64, true), // Different type but castable to Utf8
+            Field::new("a", DataType::Int32, true),   // Same type
+            Field::new("d", DataType::Boolean, true), // Not in table schema
+        ]);
+
+        // Custom can_map_field function that allows all mappings for testing
+        let allow_all = |_: &Field, _: &Field| Ok(true);
+
+        // Test field mapping
+        let (field_mappings, projection) =
+            create_field_mapping(&file_schema, &table_schema, allow_all).unwrap();
+
+        // Expected:
+        // - field_mappings[0] (a) reads projected column 1
+        // - field_mappings[1] (b) reads projected column 0
+        // - field_mappings[2] (c) is None (not in file)
+        assert_eq!(field_mappings, vec![Some(1), Some(0), None]);
+        assert_eq!(projection, vec![0, 1]); // Projecting file columns b, a
+
+        // Test with a failing mapper
+        let fails_all = |_: &Field, _: &Field| Ok(false);
+        let (field_mappings, projection) =
+            create_field_mapping(&file_schema, &table_schema, fails_all).unwrap();
+
+        // Should have no mappings or projections if all cast checks fail
+        assert_eq!(field_mappings, vec![None, None, None]);
+        assert_eq!(projection, Vec::<usize>::new());
+
+        // Test with an error-producing mapper
+        let error_mapper = |_: &Field, _: &Field| plan_err!("Test error");
+        let result = create_field_mapping(&file_schema, &table_schema, error_mapper);
+        assert!(result.is_err());
+        assert!(result.unwrap_err().to_string().contains("Test error"));
+    }

+    #[test]
+    fn test_schema_mapping_new() {
+        // Define the projected table schema
+        let projected_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Utf8, true),
+        ]));
+
+        // Define field mappings from table to file
+        let field_mappings = vec![Some(1), Some(0)];
+
+        // Create SchemaMapping manually
+        let mapping =
+            SchemaMapping::new(Arc::clone(&projected_schema), field_mappings.clone());
+
+        // Check that fields were set correctly
+        assert_eq!(*mapping.projected_table_schema, *projected_schema);
+        assert_eq!(mapping.field_mappings, field_mappings);
+
+        // Test with a batch to ensure it works properly
+        let batch = RecordBatch::try_new(
+            Arc::new(Schema::new(vec![
+                Field::new("b_file", DataType::Utf8, true),
+                Field::new("a_file", DataType::Int32, true),
+            ])),
+            vec![
+                Arc::new(arrow::array::StringArray::from(vec!["hello", "world"])),
+                Arc::new(arrow::array::Int32Array::from(vec![1, 2])),
+            ],
+        )
+        .unwrap();
+
+        // Test that map_batch works with our manually created mapping
+        let mapped_batch = mapping.map_batch(batch).unwrap();
+
+        // Verify the mapped batch has the correct schema and data
+        assert_eq!(*mapped_batch.schema(), *projected_schema);
+        assert_eq!(mapped_batch.num_columns(), 2);
+        assert_eq!(mapped_batch.column(0).len(), 2); // a column
+        assert_eq!(mapped_batch.column(1).len(), 2); // b column
+    }

+    #[test]
+    fn test_map_schema_error_path() {
+        // Define the table schema
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Utf8, true),
+            Field::new("c", DataType::Decimal128(10, 2), true), // Decimal has stricter cast rules
+        ]));
+
+        // Define file schema with an incompatible type for column c
+        let file_schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Float64, true), // Different but castable
+            Field::new("c", DataType::Binary, true),  // Not castable to Decimal128
+        ]);
+
+        // Create DefaultSchemaAdapter
+        let adapter = DefaultSchemaAdapter {
+            projected_table_schema: Arc::clone(&table_schema),
+        };
+
+        // map_schema should error due to incompatible types
+        let result = adapter.map_schema(&file_schema);
+        assert!(result.is_err());
+        let error_msg = result.unwrap_err().to_string();
+        assert!(error_msg.contains("Cannot cast file schema field c"));
+    }

+    #[test]
+    fn test_map_schema_happy_path() {
+        // Define the table schema
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Utf8, true),
+            Field::new("c", DataType::Decimal128(10, 2), true),
+        ]));
+
+        // Create DefaultSchemaAdapter
+        let adapter = DefaultSchemaAdapter {
+            projected_table_schema: Arc::clone(&table_schema),
+        };
+
+        // Define a compatible file schema (missing column c)
+        let compatible_file_schema = Schema::new(vec![
+            Field::new("a", DataType::Int64, true), // Can be cast to Int32
+            Field::new("b", DataType::Float64, true), // Can be cast to Utf8
+        ]);
+
+        // Test successful schema mapping
+        let (mapper, projection) = adapter.map_schema(&compatible_file_schema).unwrap();
+
+        // Verify field_mappings and projection were created correctly
+        assert_eq!(projection, vec![0, 1]); // Projecting a and b
+
+        // Verify the SchemaMapping works with actual data
+        let file_batch = RecordBatch::try_new(
+            Arc::new(compatible_file_schema.clone()),
+            vec![
+                Arc::new(arrow::array::Int64Array::from(vec![100, 200])),
+                Arc::new(arrow::array::Float64Array::from(vec![1.5, 2.5])),
+            ],
+        )
+        .unwrap();
+
+        let mapped_batch = mapper.map_batch(file_batch).unwrap();
+
+        // Verify correct schema mapping
+        assert_eq!(*mapped_batch.schema(), *table_schema);
+        assert_eq!(mapped_batch.num_columns(), 3); // a, b, c
+
+        // Column c should be null since it wasn't in the file schema
+        let c_array = mapped_batch.column(2);
+        assert_eq!(c_array.len(), 2);
+        assert_eq!(c_array.null_count(), 2);
+    }
 }