3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/mod.rs
@@ -122,7 +122,8 @@ mod tests {

         let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
         let source = ParquetSource::default()
-            .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}));
+            .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}))
+            .unwrap();
         let base_conf = FileScanConfigBuilder::new(
             ObjectStoreUrl::local_filesystem(),
             schema,
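The mod.rs hunk above shows the user-visible effect of this PR: with_schema_adapter_factory is now fallible, so callers must handle a Result. A minimal caller-side sketch, assuming the import paths below; the helper name attach_adapter is illustrative and not part of this PR:

```rust
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;

// Hypothetical helper: attach a schema adapter factory to any FileSource.
// After this change `with_schema_adapter_factory` returns
// `Result<Arc<dyn FileSource>>`, so `?` propagates the error instead of
// the `.unwrap()` used in the test hunk above.
fn attach_adapter(
    source: Arc<dyn FileSource>,
    factory: Arc<dyn SchemaAdapterFactory>,
) -> Result<Arc<dyn FileSource>> {
    source.with_schema_adapter_factory(factory)
}
```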
16 changes: 14 additions & 2 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -20,8 +20,8 @@ use std::sync::Arc;
 
 use crate::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
 use crate::error::Result;
+use datafusion_datasource::as_file_source;
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
-use datafusion_datasource::{as_file_source, impl_schema_adapter_methods};
 
 use arrow::buffer::Buffer;
 use arrow::datatypes::SchemaRef;
@@ -99,7 +99,19 @@ impl FileSource for ArrowSource {
         "arrow"
     }
 
-    impl_schema_adapter_methods!();
+    fn with_schema_adapter_factory(
+        &self,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Result<Arc<dyn FileSource>> {
+        Ok(Arc::new(Self {
+            schema_adapter_factory: Some(schema_adapter_factory),
+            ..self.clone()
+        }))
+    }
+
+    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
+        self.schema_adapter_factory.clone()
+    }
 }
 
 /// The struct arrow that implements `[FileOpener]` trait

Contributor Author: Instead of using the impl_schema_adapter_methods macro, I replicated the code in several places. While there is more duplicated code, I think it is clearer because what is happening is now explicit and doesn't require a macro (even though impl_schema_adapter_methods was super well documented). See the sketch below.
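Since the impl_schema_adapter_methods macro is deleted in this PR, a hypothetical reconstruction of roughly what it expanded to may help reviewers compare. This is inferred from the explicit methods above and from the mod.rs hunk (which suggests the old setter returned the Arc directly rather than a Result); it is not copied from the real definition in datafusion_datasource:

```rust
// Hypothetical reconstruction, for illustration only. The macro generated
// the two schema-adapter trait methods for any FileSource struct that has
// a `schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>` field
// and implements Clone. Signatures are inferred, not quoted.
macro_rules! impl_schema_adapter_methods {
    () => {
        fn with_schema_adapter_factory(
            &self,
            schema_adapter_factory: std::sync::Arc<dyn SchemaAdapterFactory>,
        ) -> std::sync::Arc<dyn FileSource> {
            std::sync::Arc::new(Self {
                schema_adapter_factory: Some(schema_adapter_factory),
                ..self.clone()
            })
        }

        fn schema_adapter_factory(
            &self,
        ) -> Option<std::sync::Arc<dyn SchemaAdapterFactory>> {
            self.schema_adapter_factory.clone()
        }
    };
}
```

Expanding the methods inline costs some duplication, but it lets each source declare its own signature explicitly, which is what allows the fallible Result return type introduced here.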
@@ -17,6 +17,7 @@

 //! Integration test for schema adapter factory functionality
 
+use std::any::Any;
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use datafusion::datasource::object_store::ObjectStoreUrl;
@@ -258,3 +259,187 @@ fn test_schema_adapter_preservation() {
    // Verify the schema adapter factory is present in the file source
    assert!(config.source().schema_adapter_factory().is_some());
}

Contributor Author: This test is moved without change from datafusion/core/tests/test_adapter_updated.rs.

/// A test source for testing schema adapters
#[derive(Debug, Clone)]
struct TestSource {
    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}

impl TestSource {
    fn new() -> Self {
        Self {
            schema_adapter_factory: None,
        }
    }
}

impl FileSource for TestSource {
    fn file_type(&self) -> &str {
        "test"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn create_file_opener(
        &self,
        _store: Arc<dyn ObjectStore>,
        _conf: &FileScanConfig,
        _index: usize,
    ) -> Arc<dyn FileOpener> {
        unimplemented!("Not needed for this test")
    }

    fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
        Arc::new(self.clone())
    }

    fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
        Arc::new(self.clone())
    }

    fn with_projection(&self, _projection: &FileScanConfig) -> Arc<dyn FileSource> {
        Arc::new(self.clone())
    }

    fn with_statistics(&self, _statistics: Statistics) -> Arc<dyn FileSource> {
        Arc::new(self.clone())
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        unimplemented!("Not needed for this test")
    }

    fn statistics(&self) -> Result<Statistics, DataFusionError> {
        Ok(Statistics::default())
    }

    fn with_schema_adapter_factory(
        &self,
        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
    ) -> Result<Arc<dyn FileSource>> {
        Ok(Arc::new(Self {
            schema_adapter_factory: Some(schema_adapter_factory),
        }))
    }

    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
        self.schema_adapter_factory.clone()
    }
}

/// A test schema adapter factory
#[derive(Debug)]
struct TestSchemaAdapterFactory {}

impl SchemaAdapterFactory for TestSchemaAdapterFactory {
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        _table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter> {
        Box::new(TestSchemaAdapter {
            table_schema: projected_table_schema,
        })
    }
}

/// A test schema adapter implementation
#[derive(Debug)]
struct TestSchemaAdapter {
    table_schema: SchemaRef,
}

impl SchemaAdapter for TestSchemaAdapter {
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
        let field = self.table_schema.field(index);
        file_schema.fields.find(field.name()).map(|(i, _)| i)
    }

    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
        let mut projection = Vec::with_capacity(file_schema.fields().len());
        for (file_idx, file_field) in file_schema.fields().iter().enumerate() {
            if self.table_schema.fields().find(file_field.name()).is_some() {
                projection.push(file_idx);
            }
        }

        Ok((Arc::new(TestSchemaMapping {}), projection))
    }
}

/// A test schema mapper implementation
#[derive(Debug)]
struct TestSchemaMapping {}

impl SchemaMapper for TestSchemaMapping {
    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
        // For testing, just return the original batch
        Ok(batch)
    }

    fn map_column_statistics(
        &self,
        stats: &[ColumnStatistics],
    ) -> Result<Vec<ColumnStatistics>> {
        // For testing, just return the input statistics
        Ok(stats.to_vec())
    }
}

#[test]
fn test_schema_adapter() {
    // This test verifies the functionality of the SchemaAdapter and SchemaAdapterFactory
    // components used in DataFusion's file sources.
    //
    // The test specifically checks:
    // 1. Creating and attaching a schema adapter factory to a file source
    // 2. Creating a schema adapter using the factory
    // 3. The schema adapter's ability to map column indices between a table schema and a file schema
    // 4. The schema adapter's ability to create a projection that selects only the columns
    //    from the file schema that are present in the table schema
    //
    // Schema adapters are used when the schema of data in files doesn't exactly match
    // the schema expected by the query engine, allowing for field mapping and data transformation.

    // Create a test schema
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, true),
    ]));

    // Create a file schema
    let file_schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, true),
        Field::new("extra", DataType::Int64, true),
    ]);

    // Create a TestSource
    let source = TestSource::new();
    assert!(source.schema_adapter_factory().is_none());

    // Add a schema adapter factory
    let factory = Arc::new(TestSchemaAdapterFactory {});
    let source_with_adapter = source.with_schema_adapter_factory(factory).unwrap();
    assert!(source_with_adapter.schema_adapter_factory().is_some());

    // Create a schema adapter
    let adapter_factory = source_with_adapter.schema_adapter_factory().unwrap();
    let adapter =
        adapter_factory.create(Arc::clone(&table_schema), Arc::clone(&table_schema));

    // Test mapping column index
    assert_eq!(adapter.map_column_index(0, &file_schema), Some(0));
    assert_eq!(adapter.map_column_index(1, &file_schema), Some(1));

    // Test creating schema mapper
    let (_mapper, projection) = adapter.map_schema(&file_schema).unwrap();
    assert_eq!(projection, vec![0, 1]);
}
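As a follow-up to the test above, a sketch of how the (mapper, projection) pair returned by map_schema is typically consumed: project the file batch down to the matching columns, then let the SchemaMapper adapt it. This is a generic illustration under assumed import paths, not DataFusion's actual reader code:

```rust
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_datasource::schema_adapter::SchemaMapper;

// Generic illustration: keep only the file columns named in `projection`
// (arrow's `RecordBatch::project`), then let the mapper reorder/cast them
// to match the table schema.
fn adapt_batch(
    mapper: &Arc<dyn SchemaMapper>,
    projection: &[usize],
    file_batch: RecordBatch,
) -> Result<RecordBatch> {
    let projected = file_batch.project(projection)?;
    mapper.map_batch(projected)
}
```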
19 changes: 15 additions & 4 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
@@ -23,9 +23,8 @@ use datafusion_common::{config::ConfigOptions, internal_err, Result, Statistics}
 use datafusion_datasource::{
     file::FileSource, file_meta::FileMeta, file_scan_config::FileScanConfig,
     file_scan_config::FileScanConfigBuilder, file_stream::FileOpenFuture,
-    file_stream::FileOpener, impl_schema_adapter_methods,
-    schema_adapter::DefaultSchemaAdapterFactory, schema_adapter::SchemaAdapterFactory,
-    source::DataSourceExec, PartitionedFile,
+    file_stream::FileOpener, schema_adapter::DefaultSchemaAdapterFactory,
+    schema_adapter::SchemaAdapterFactory, source::DataSourceExec, PartitionedFile,
 };
 use datafusion_physical_expr::conjunction;
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
@@ -232,7 +231,19 @@ impl FileSource for TestSource {
         }
     }
 
-    impl_schema_adapter_methods!();
+    fn with_schema_adapter_factory(
+        &self,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Result<Arc<dyn FileSource>> {
+        Ok(Arc::new(Self {
+            schema_adapter_factory: Some(schema_adapter_factory),
+            ..self.clone()
+        }))
+    }
+
+    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
+        self.schema_adapter_factory.clone()
+    }
 }

 #[derive(Debug, Clone)]