15 changes: 9 additions & 6 deletions datafusion/datasource-parquet/src/opener.rs
@@ -18,6 +18,7 @@
//! [`ParquetOpener`] for opening Parquet files

use crate::page_filter::PagePruningAccessPlanFilter;
use crate::row_filter::build_projection_read_plan;
use crate::row_group_filter::RowGroupAccessPlanFilter;
use crate::{
ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
@@ -59,13 +60,13 @@ use datafusion_execution::parquet_encryption::EncryptionFactory;
use futures::{Stream, StreamExt, ready};
use log::debug;
use parquet::DecodeResult;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use parquet::arrow::arrow_reader::{
ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy,
};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};

/// Implements [`FileOpener`] for a parquet file
@@ -583,12 +584,14 @@ impl FileOpener for ParquetOpener {
// metrics from the arrow reader itself
let arrow_reader_metrics = ArrowReaderMetrics::enabled();

let indices = projection.column_indices();
let mask =
ProjectionMask::roots(reader_metadata.parquet_schema(), indices.clone());
let read_plan = build_projection_read_plan(
projection.expr_iter(),
&physical_file_schema,
reader_metadata.parquet_schema(),
);

let decoder = builder
.with_projection(mask)
.with_projection(read_plan.projection_mask)
.with_metrics(arrow_reader_metrics.clone())
.build()?;

@@ -601,7 +604,7 @@
// Rebase column indices to match the narrowed stream schema.
// The projection expressions have indices based on physical_file_schema,
// but the stream only contains the columns selected by the ProjectionMask.
let stream_schema = Arc::new(physical_file_schema.project(&indices)?);
let stream_schema = read_plan.projected_schema;
let replace_schema = stream_schema != output_schema;
let projection = projection
.try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;
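The index-rebasing comment above is worth making concrete. A minimal standalone sketch (illustration only, not code from this PR): the same column name resolves to different indices against the file schema and the narrowed stream schema, which is why the projection expressions are remapped with reassign_expr_columns.

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_physical_expr::expressions::Column;

fn main() -> datafusion_common::Result<()> {
    // File schema: "id" is column 0, "s" is column 1.
    let file_schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("s", DataType::Int32, false),
    ]);
    // After the ProjectionMask selects only "s", the stream contains a
    // single column, so "s" now lives at index 0.
    let stream_schema = Schema::new(vec![Field::new("s", DataType::Int32, false)]);

    let before = Column::new_with_schema("s", &file_schema)?;
    let after = Column::new_with_schema("s", &stream_schema)?;
    assert_eq!(before.index(), 1);
    assert_eq!(after.index(), 0);
    Ok(())
}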
251 changes: 246 additions & 5 deletions datafusion/datasource-parquet/src/row_filter.rs
@@ -83,7 +83,7 @@ use datafusion_common::cast::as_boolean_array;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_physical_expr::ScalarFunctionExpr;
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
use datafusion_physical_expr::{PhysicalExpr, split_conjunction};

use datafusion_physical_plan::metrics;
@@ -424,10 +424,26 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
.first()
.and_then(|a| a.as_any().downcast_ref::<Column>())
{
// for Map columns, get_field performs a runtime key lookup rather than a
// schema-level field access, so the entire Map column must be read;
// we skip the struct field optimization and defer to normal Column traversal
let is_map_column = self
.file_schema
.index_of(column.name())
.ok()
.map(|idx| {
matches!(
self.file_schema.field(idx).data_type(),
DataType::Map(_, _)
)
})
.unwrap_or(false);

let return_type = func.return_type();

if !DataType::is_nested(return_type)
|| self.is_nested_type_supported(return_type)
if !is_map_column
&& (!DataType::is_nested(return_type)
|| self.is_nested_type_supported(return_type))
{
// try to resolve all field name arguments to string literals
// if any argument is not a string literal, we cannot determine the exact
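A standalone sketch (illustration only, not code from this PR) of why the Map case above must defer to whole-column reads: every row of a Map column may carry a different key set, so get_field's key lookup happens at runtime and no fixed Parquet leaf corresponds to any particular key.

use arrow::array::{Int32Builder, MapBuilder, StringBuilder};

fn main() -> Result<(), arrow::error::ArrowError> {
    let mut b = MapBuilder::new(None, StringBuilder::new(), Int32Builder::new());
    // row 0: {"a": 1}
    b.keys().append_value("a");
    b.values().append_value(1);
    b.append(true)?;
    // row 1: {"b": 2} -- a different key set entirely
    b.keys().append_value("b");
    b.values().append_value(2);
    b.append(true)?;
    let map = b.finish();

    // Finding key "a" means scanning the key/value pairs of every row, so a
    // schema-level projection cannot narrow the read: all keys and all
    // values of the Map column must be decoded.
    assert_eq!(map.len(), 2);
    Ok(())
}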
@@ -579,6 +595,136 @@ pub(crate) fn build_parquet_read_plan(
)))
}

/// Builds a unified [`ParquetReadPlan`] for a set of projection expressions
///
/// Unlike [`build_parquet_read_plan`] (which is used for filter pushdown and
/// returns `None` when an expression references unsupported nested types or
/// missing columns), this function always succeeds. It collects every column
/// that *can* be resolved in the file and produces a leaf-level projection
/// mask. Columns missing from the file are silently skipped since the projection
/// layer handles those by inserting nulls.
Comment on lines +604 to +605 (Contributor):
If we never expect to be called with columns missing from the file, shouldn't we error if we are?

pub(crate) fn build_projection_read_plan(
exprs: impl IntoIterator<Item = Arc<dyn PhysicalExpr>>,
file_schema: &Schema,
schema_descr: &SchemaDescriptor,
) -> ParquetReadPlan {
// fast path: if every expression is a plain Column reference, skip all
// struct analysis and use root-level projection directly
let exprs = exprs.into_iter().collect::<Vec<_>>();
let all_plain_columns = exprs
.iter()
.all(|e| e.as_any().downcast_ref::<Column>().is_some());

if all_plain_columns {
let mut root_indices: Vec<usize> = exprs
.iter()
.map(|e| e.as_any().downcast_ref::<Column>().unwrap().index())
.collect();
root_indices.sort_unstable();
root_indices.dedup();

let projection_mask =
ProjectionMask::roots(schema_descr, root_indices.iter().copied());
let projected_schema = Arc::new(
file_schema
.project(&root_indices)
.expect("valid column indices"),
);

return ParquetReadPlan {
projection_mask,
projected_schema,
};
}

// secondary fast path: if the schema has no struct columns, we can skip
// PushdownChecker traversal and use root-level projection
let has_struct_columns = file_schema
.fields()
.iter()
.any(|f| matches!(f.data_type(), DataType::Struct(_)));

if !has_struct_columns {
let mut root_indices = exprs
.into_iter()
.flat_map(|e| collect_columns(&e).into_iter().map(|col| col.index()))
.collect::<Vec<_>>();

root_indices.sort_unstable();
root_indices.dedup();

let projection_mask =
ProjectionMask::roots(schema_descr, root_indices.iter().copied());

let projected_schema = Arc::new(
file_schema
.project(&root_indices)
.expect("valid column indices"),
);

return ParquetReadPlan {
projection_mask,
projected_schema,
};
}

let mut all_root_indices = Vec::new();
let mut all_struct_accesses = Vec::new();

for expr in exprs {
let mut checker = PushdownChecker::new(file_schema, true);
let _ = expr.visit(&mut checker);
let columns = checker.into_sorted_columns();

all_root_indices.extend_from_slice(&columns.required_columns);
all_struct_accesses.extend(columns.struct_field_accesses);
}

all_root_indices.sort_unstable();
all_root_indices.dedup();

// when no struct field accesses were found, fall back to root-level projection
// to match the performance of the simple path
if all_struct_accesses.is_empty() {
let projection_mask =
ProjectionMask::roots(schema_descr, all_root_indices.iter().copied());
let projected_schema = Arc::new(
file_schema
.project(&all_root_indices)
.expect("valid column indices"),
);

return ParquetReadPlan {
projection_mask,
projected_schema,
};
}

let leaf_indices = {
let mut out =
leaf_indices_for_roots(all_root_indices.iter().copied(), schema_descr);
let struct_leaf_indices =
resolve_struct_field_leaves(&all_struct_accesses, file_schema, schema_descr);

out.extend_from_slice(&struct_leaf_indices);
out.sort_unstable();
out.dedup();

out
};

let projection_mask =
ProjectionMask::leaves(schema_descr, leaf_indices.iter().copied());

let projected_schema =
build_filter_schema(file_schema, &all_root_indices, &all_struct_accesses);

ParquetReadPlan {
projection_mask,
projected_schema,
}
}
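The root/leaf split in this function is the crux: the fast paths project whole root columns, while struct field accesses require leaf-level selection. A standalone sketch of the difference (illustration only; it assumes ArrowSchemaConverter from the parquet crate for building a SchemaDescriptor):

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Fields, Schema};
use parquet::arrow::{ArrowSchemaConverter, ProjectionMask};

fn main() -> parquet::errors::Result<()> {
    // Schema: id (Int32), s (Struct{value: Int32, label: Utf8})
    // Parquet leaves: id=0, s.value=1, s.label=2
    let struct_fields: Fields = vec![
        Arc::new(Field::new("value", DataType::Int32, false)),
        Arc::new(Field::new("label", DataType::Utf8, false)),
    ]
    .into();
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("s", DataType::Struct(struct_fields), false),
    ]);
    let descr = ArrowSchemaConverter::new().convert(&schema)?;

    // Root index 1 ("s") selects both of its leaves (s.value and s.label)...
    let by_root = ProjectionMask::roots(&descr, [1]);
    // ...while leaf index 1 selects only s.value.
    let by_leaf = ProjectionMask::leaves(&descr, [1]);
    assert_ne!(by_root, by_leaf);
    Ok(())
}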

fn leaf_indices_for_roots<I>(
root_indices: I,
schema_descr: &SchemaDescriptor,
@@ -654,6 +800,8 @@ fn build_filter_schema(
regular_indices: &[usize],
struct_field_accesses: &[StructFieldAccess],
) -> SchemaRef {
let regular_set: BTreeSet<usize> = regular_indices.iter().copied().collect();

let all_indices = regular_indices
.iter()
.copied()
@@ -669,6 +817,15 @@
.map(|&idx| {
let field = file_schema.field(idx);

// if this column appears as a regular (whole-column) reference,
// keep the full type
//
// Pruning is only valid when the column is accessed exclusively
// through struct field accesses
if regular_set.contains(&idx) {
return Arc::new(field.clone());
}

// collect all field paths that access this root struct column
let field_paths = struct_field_accesses
.iter()
@@ -683,7 +840,6 @@
.collect::<Vec<_>>();

if field_paths.is_empty() {
// its a regular column - use the full type
return Arc::new(field.clone());
}

@@ -696,7 +852,10 @@
})
.collect::<Vec<_>>();

Arc::new(Schema::new(fields))
Arc::new(Schema::new_with_metadata(
fields,
file_schema.metadata().clone(),
))
}

fn prune_struct_type(dt: &DataType, paths: &[&[String]]) -> DataType {
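The body of prune_struct_type is collapsed in this diff. As a rough single-level sketch of the idea (illustration only; the real function also walks nested paths), pruning keeps just the accessed subfields of a struct type:

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Fields};

// Simplified: keep only the named top-level subfields of a struct.
fn prune_one_level(dt: &DataType, keep: &[&str]) -> DataType {
    match dt {
        DataType::Struct(fields) => DataType::Struct(
            fields
                .iter()
                .filter(|f| keep.contains(&f.name().as_str()))
                .cloned()
                .collect::<Fields>(),
        ),
        other => other.clone(),
    }
}

fn main() {
    let s = DataType::Struct(Fields::from(vec![
        Arc::new(Field::new("value", DataType::Int32, false)),
        Arc::new(Field::new("label", DataType::Utf8, false)),
    ]));
    // Accessing only s.value narrows the type to Struct{value: Int32}.
    let pruned = prune_one_level(&s, &["value"]);
    let expected = DataType::Struct(Fields::from(vec![Arc::new(Field::new(
        "value",
        DataType::Int32,
        false,
    ))]));
    assert_eq!(pruned, expected);
}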
@@ -958,6 +1117,8 @@ mod test {
use parquet::file::reader::{FileReader, SerializedFileReader};
use tempfile::NamedTempFile;

use datafusion_physical_expr::expressions::Column as PhysicalColumn;

// List predicates used by the decoder should be accepted for pushdown
#[test]
fn test_filter_candidate_builder_supports_list_types() {
@@ -1814,6 +1975,86 @@
assert_eq!(file_metrics.pushdown_rows_matched.value(), 2);
}

#[test]
fn projection_read_plan_preserves_full_struct() {
// Schema: id (Int32), s (Struct{value: Int32, label: Utf8})
// Parquet leaves: id=0, s.value=1, s.label=2
let struct_fields: Fields = vec![
Arc::new(Field::new("value", DataType::Int32, false)),
Arc::new(Field::new("label", DataType::Utf8, false)),
]
.into();

let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("s", DataType::Struct(struct_fields.clone()), false),
]));

let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(StructArray::new(
struct_fields,
vec![
Arc::new(Int32Array::from(vec![10, 20, 30])) as _,
Arc::new(StringArray::from(vec!["a", "b", "c"])) as _,
],
None,
)),
],
)
.unwrap();

let file = NamedTempFile::new().expect("temp file");
let mut writer =
ArrowWriter::try_new(file.reopen().unwrap(), Arc::clone(&schema), None)
.expect("writer");
writer.write(&batch).expect("write batch");
writer.close().expect("close writer");

let reader_file = file.reopen().expect("reopen file");
let builder = ParquetRecordBatchReaderBuilder::try_new(reader_file)
.expect("reader builder");
let metadata = builder.metadata().clone();
let file_schema = builder.schema().clone();
let schema_descr = metadata.file_metadata().schema_descr();

// Simulate SELECT * output projection: Column("id") and Column("s")
// Plus a get_field(s, 'value') expression from the pushed-down filter
let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(PhysicalColumn::new("id", 0)),
Arc::new(PhysicalColumn::new("s", 1)),
logical2physical(
&get_field().call(vec![
col("s"),
Expr::Literal(ScalarValue::Utf8(Some("value".to_string())), None),
]),
&file_schema,
),
];

let read_plan = build_projection_read_plan(exprs, &file_schema, schema_descr);

// The projected schema must have the FULL struct type because Column("s")
// is in the projection. It should NOT be narrowed to Struct{value: Int32}.
let s_field = read_plan.projected_schema.field_with_name("s").unwrap();
assert_eq!(
s_field.data_type(),
&DataType::Struct(
vec![
Arc::new(Field::new("value", DataType::Int32, false)),
Arc::new(Field::new("label", DataType::Utf8, false)),
]
.into()
),
);

// all 3 Parquet leaves should be in the projection mask
let expected_mask = ProjectionMask::leaves(schema_descr, [0, 1, 2]);
assert_eq!(read_plan.projection_mask, expected_mask);
}

/// Sanity check that the given expression could be evaluated against the given schema without any errors.
/// This will fail if the expression references columns that are not in the schema or if the types of the columns are incompatible, etc.
fn check_expression_can_evaluate_against_schema(