103 changes: 101 additions & 2 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -39,7 +39,7 @@ mod tests {
use crate::test::object_store::local_unpartitioned_file;
use arrow::array::{
ArrayRef, AsArray, Date64Array, Int32Array, Int64Array, Int8Array, StringArray,
StructArray,
StringViewArray, StructArray,
};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder};
use arrow::record_batch::RecordBatch;
@@ -100,6 +100,7 @@ mod tests {
predicate: Option<Expr>,
pushdown_predicate: bool,
page_index_predicate: bool,
bloom_filters: bool,
}

impl RoundTrip {
Expand Down Expand Up @@ -132,6 +133,11 @@ mod tests {
self
}

fn with_bloom_filters(mut self) -> Self {
self.bloom_filters = true;
self
}

/// run the test, returning only the resulting RecordBatches
async fn round_trip_to_batches(
self,
@@ -156,10 +162,20 @@
source = source
.with_pushdown_filters(true)
.with_reorder_filters(true);
} else {
source = source.with_pushdown_filters(false);
}
Comment on lines +165 to 167

Contributor (author):

This is just to be explicit.

if self.page_index_predicate {
source = source.with_enable_page_index(true);
} else {
source = source.with_enable_page_index(false);
}

if self.bloom_filters {
source = source.with_bloom_filter_on_read(true);
} else {
source = source.with_bloom_filter_on_read(false);
}
Comment on lines +175 to 179

@adriangb (Contributor, author), May 19, 2025:

Otherwise bloom filters will pick up the slack and do pruning even if the types mismatch and stats / page / row pruning fails (bloom filters use &str for both Utf8 and Utf8View, so they don't care). Only one of the tests actually relies on bloom filters.

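Editor's note: a minimal standalone sketch (an illustration, not code from this PR) of why the string encoding does not matter for bloom-filter probes. Both StringArray (Utf8) and StringViewArray (Utf8View) hand back plain &str values, so a probe that hashes the string bytes sees identical input either way; the array contents below are hypothetical.

    use arrow::array::{StringArray, StringViewArray};

    fn main() {
        // Hypothetical values; any strings behave the same way.
        let utf8 = StringArray::from(vec![Some("foo"), Some("bar")]);
        let utf8_view = StringViewArray::from(vec![Some("foo"), Some("bar")]);

        // Both array types expose their elements as `&str`, so the bytes a
        // bloom-filter probe would hash are identical regardless of encoding.
        let a: &str = utf8.value(0);
        let b: &str = utf8_view.value(0);
        assert_eq!(a, b);
        assert_eq!(a.as_bytes(), b.as_bytes());
    }
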
source.with_schema(Arc::clone(&file_schema))
@@ -817,7 +833,7 @@ mod tests {
}

#[tokio::test]
async fn evolved_schema_filter() {
async fn evolved_schema_column_order_filter() {
let c1: ArrayRef =
Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));

@@ -848,6 +864,88 @@
assert_eq!(read.len(), 0);
}

#[tokio::test]
async fn evolved_schema_column_type_filter_strings() {
// The table and filter have a common data type, but the file schema differs
let c1: ArrayRef =
Arc::new(StringViewArray::from(vec![Some("foo"), Some("bar")]));
let batch = create_batch(vec![("c1", c1.clone())]);

let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, false)]));

// Predicate should prune all row groups
let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("aaa".to_string()))));
Member:

I think the filter should be constructed with Utf8View, since it's a logical expr:

    let filter = col("c1").eq(lit(ScalarValue::Utf8View(Some("aaa".to_string()))));

Then the test will fail, because https://github.com/apache/datafusion/pull/16086/files#diff-c8eebe4fb9ee7662170c6ce507ad2be92c0229b3a91a2f4431a9bfc7185a0eb9L148 uses file_schema to convert the logical expr into a physical expr; #16133 will fix that.

Contributor:

Upon more reflection, I think the predicate needs to be created using the logical schema today.

@adriangb, @etseidl and myself are discussing potentially changing that here:

let rt = RoundTrip::new()
.with_predicate(filter)
.with_schema(schema.clone())
.round_trip(vec![batch.clone()])
.await;
// There should be no predicate evaluation errors
let metrics = rt.parquet_exec.metrics().unwrap();
assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 0);
assert_eq!(rt.batches.unwrap().len(), 0);

// Predicate should prune no row groups
let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("foo".to_string()))));
let rt = RoundTrip::new()
.with_predicate(filter)
.with_schema(schema)
.round_trip(vec![batch])
.await;
// There should be no predicate evaluation errors
let metrics = rt.parquet_exec.metrics().unwrap();
assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 0);
let read = rt
.batches
.unwrap()
.iter()
.map(|b| b.num_rows())
.sum::<usize>();
assert_eq!(read, 2, "Expected 2 rows to match the predicate");
}

#[tokio::test]
async fn evolved_schema_column_type_filter_ints() {
// The table and filter have a common data type, but the file schema differs
let c1: ArrayRef = Arc::new(Int8Array::from(vec![Some(1), Some(2)]));
let batch = create_batch(vec![("c1", c1.clone())]);

let schema =
Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt64, false)]));

// Predicate should prune all row groups
let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(5))));
let rt = RoundTrip::new()
.with_predicate(filter)
.with_schema(schema.clone())
.round_trip(vec![batch.clone()])
.await;
// There should be no predicate evaluation errors
let metrics = rt.parquet_exec.metrics().unwrap();
assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
assert_eq!(rt.batches.unwrap().len(), 0);

// Predicate should prune no row groups
let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(1))));
let rt = RoundTrip::new()
.with_predicate(filter)
.with_schema(schema)
.round_trip(vec![batch])
.await;
// There should be no predicate evaluation errors
let metrics = rt.parquet_exec.metrics().unwrap();
assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
let read = rt
.batches
.unwrap()
.iter()
.map(|b| b.num_rows())
.sum::<usize>();
assert_eq!(read, 2, "Expected 2 rows to match the predicate");
}

#[tokio::test]
async fn evolved_schema_disjoint_schema_filter() {
let c1: ArrayRef =
@@ -1630,6 +1728,7 @@ mod tests {
let rt = RoundTrip::new()
.with_predicate(filter.clone())
.with_pushdown_predicate()
.with_bloom_filters()
.round_trip(vec![batch1])
.await;

28 changes: 16 additions & 12 deletions datafusion/datasource-parquet/src/opener.rs
@@ -55,8 +55,9 @@ pub(super) struct ParquetOpener {
pub limit: Option<usize>,
/// Optional predicate to apply during the scan
pub predicate: Option<Arc<dyn PhysicalExpr>>,
/// Schema of the output table
pub table_schema: SchemaRef,
/// Schema of the output table without partition columns.
Contributor:

I verified that when the code changes are reverted, these tests fail:

    cargo test --all-features -p datafusion -- parquet
    ...

    ---- datasource::physical_plan::parquet::tests::evolved_schema_column_type_filter_ints stdout ----

    thread 'datasource::physical_plan::parquet::tests::evolved_schema_column_type_filter_ints' panicked at datafusion/core/src/datasource/physical_plan/parquet.rs:927:9:
    assertion `left == right` failed
      left: 1
     right: 0
    note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

    ---- datasource::physical_plan::parquet::tests::evolved_schema_column_type_filter_strings stdout ----

    thread 'datasource::physical_plan::parquet::tests::evolved_schema_column_type_filter_strings' panicked at datafusion/core/src/datasource/physical_plan/parquet.rs:885:9:
    assertion `left == right` failed
      left: 1
     right: 0

    failures:
        datasource::physical_plan::parquet::tests::evolved_schema_column_type_filter_ints
        datasource::physical_plan::parquet::tests::evolved_schema_column_type_filter_strings

@xudong963 (Member), May 21, 2025:

I think they're related to the PR: #16133

Sorry, I misread @alamb's message.

/// This is the schema we coerce the physical file schema into.
pub logical_file_schema: SchemaRef,
/// Optional hint for how large the initial request to read parquet metadata
/// should be
pub metadata_size_hint: Option<usize>,
@@ -104,13 +105,13 @@ impl FileOpener for ParquetOpener {
let batch_size = self.batch_size;

let projected_schema =
SchemaRef::from(self.table_schema.project(&self.projection)?);
SchemaRef::from(self.logical_file_schema.project(&self.projection)?);
let schema_adapter_factory = Arc::clone(&self.schema_adapter_factory);
let schema_adapter = self
.schema_adapter_factory
.create(projected_schema, Arc::clone(&self.table_schema));
.create(projected_schema, Arc::clone(&self.logical_file_schema));
let predicate = self.predicate.clone();
let table_schema = Arc::clone(&self.table_schema);
let logical_file_schema = Arc::clone(&self.logical_file_schema);
let reorder_predicates = self.reorder_filters;
let pushdown_filters = self.pushdown_filters;
let coerce_int96 = self.coerce_int96;
@@ -141,17 +142,20 @@
.await?;

// Note about schemas: we are actually dealing with **3 different schemas** here:
// - The table schema as defined by the TableProvider. This is what the user sees, what they get when they `SELECT * FROM table`, etc.
// - The "virtual" file schema: this is the table schema minus any hive partition columns and projections. This is what the file schema is coerced to.
// - The table schema as defined by the TableProvider.
// This is what the user sees, what they get when they `SELECT * FROM table`, etc.
// - The logical file schema: this is the table schema minus any hive partition columns and projections.
// This is what the physical file schema is coerced to.
// - The physical file schema: this is the schema as defined by the parquet file. This is what the parquet file actually contains.
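// As a purely hypothetical illustration (added in this writeup, not tied to any particular table):
//   table schema:         (c1: Utf8View, date: Date32)   <- `date` is a hive partition column
//   logical file schema:  (c1: Utf8View)                  <- partition column removed
//   physical file schema: (c1: Utf8)                      <- what the parquet file actually stores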
let mut physical_file_schema = Arc::clone(reader_metadata.schema());

// The schema loaded from the file may not be the same as the
// desired schema (for example if we want to instruct the parquet
// reader to read strings using Utf8View instead). Update if necessary
if let Some(merged) =
apply_file_schema_type_coercions(&table_schema, &physical_file_schema)
{
if let Some(merged) = apply_file_schema_type_coercions(
&logical_file_schema,
&physical_file_schema,
) {
physical_file_schema = Arc::new(merged);
options = options.with_schema(Arc::clone(&physical_file_schema));
reader_metadata = ArrowReaderMetadata::try_new(
@@ -178,7 +182,7 @@
// Build predicates for this specific file
let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
predicate.as_ref(),
&physical_file_schema,
&logical_file_schema,
Contributor:

This is the actual change (from the physical to the logical schema) -- I am calling this out because it took me a while to spot it (at first I thought this was only a name change).

Contributor (author):

Yes, sorry for also changing the names in this PR - it was just too confusing to even work on this PR without giving things a new name. Let me know if you have any suggestions for better names, happy to accept them; naming here seems to be half of the problem. You know what they say, there are 3 hard problems...

Contributor:

No worries -- I think the new names are much clearer to me.

Contributor:

Sorry, coming to this too late, but I'm curious why the logical schema? To me pruning implies a physical-layer kind of thing. I ask because now I'm having issues in #15821 trying to build pruning predicates for columns that don't exist in the physical schema.

This may just mean #15821 is operating at too low a level and needs to operate up here, where we a) know it's parquet and b) have the physical schema.

@alamb @adriangb

Contributor (author):

I would like it to operate at the physical layer, and maybe we can get back there through future work.

My immediate goal in merging this PR was to fix the pretty bad regression I had introduced - a step backwards to avoid running into a wall.

But I am also not sure that we can even make it operate at the physical layer properly: there's a lot of logic that goes into determining what casts to do, and that happens at the logical layer. We'd have to re-create all of it at the physical layer, because if the types of the columns change, everything has to be re-evaluated.

Contributor:

I think the key observation is that the predicates are expressed in terms of the table schema (not the file schema).

Today there is a bunch of logic to translate table schema -> file schema for data and statistics.

So in order to evaluate predicates on the physical schema, we would need to translate the expression as well, which I think is what @adriangb is saying above.

Contributor (author):

Yep, precisely. And that requires re-evaluating cast rules, etc., which is... not easy.

Contributor:

Ok, thanks for the explanation!
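
Editor's note: a rough, self-contained sketch of the translation cost discussed above, using only arrow types and hypothetical schemas (this is an illustration, not DataFusion's actual mechanism). A predicate typed against the logical/table schema can only be evaluated on a file whose physical types differ if a cast is re-derived for every mismatched column, which is the logic the thread says would have to be re-created at the physical layer.

    use arrow::compute::can_cast_types;
    use arrow::datatypes::{DataType, Field, Schema};

    fn main() {
        // Logical (table) schema: what a predicate like `c1 = UInt64(5)` is typed against.
        let logical = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]);
        // Physical file schema: what one particular parquet file actually stores.
        let physical = Schema::new(vec![Field::new("c1", DataType::Int8, false)]);

        // Evaluating the logical predicate directly on physical data would require
        // re-deriving a cast for every column whose types differ.
        for (lf, pf) in logical.fields().iter().zip(physical.fields().iter()) {
            if lf.data_type() != pf.data_type() {
                assert!(can_cast_types(pf.data_type(), lf.data_type()));
                println!(
                    "column {}: the predicate would need CAST({} AS {})",
                    lf.name(),
                    pf.data_type(),
                    lf.data_type()
                );
            }
        }
    }
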

&predicate_creation_errors,
);

@@ -215,7 +219,7 @@
let row_filter = row_filter::build_row_filter(
&predicate,
&physical_file_schema,
&table_schema,
&logical_file_schema,
builder.metadata(),
reorder_predicates,
&file_metrics,
4 changes: 2 additions & 2 deletions datafusion/datasource-parquet/src/row_filter.rs
@@ -426,7 +426,7 @@ fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result<boo
pub fn build_row_filter(
expr: &Arc<dyn PhysicalExpr>,
physical_file_schema: &SchemaRef,
table_schema: &SchemaRef,
logical_file_schema: &SchemaRef,
metadata: &ParquetMetaData,
reorder_predicates: bool,
file_metrics: &ParquetFileMetrics,
@@ -447,7 +447,7 @@
FilterCandidateBuilder::new(
Arc::clone(expr),
Arc::clone(physical_file_schema),
Arc::clone(table_schema),
Arc::clone(logical_file_schema),
Arc::clone(schema_adapter_factory),
)
.build(metadata)
2 changes: 1 addition & 1 deletion datafusion/datasource-parquet/src/source.rs
@@ -481,7 +481,7 @@ impl FileSource for ParquetSource {
.expect("Batch size must set before creating ParquetOpener"),
limit: base_config.limit,
predicate: self.predicate.clone(),
table_schema: Arc::clone(&base_config.file_schema),
logical_file_schema: Arc::clone(&base_config.file_schema),
metadata_size_hint: self.metadata_size_hint,
metrics: self.metrics().clone(),
parquet_file_reader_factory,