diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 2522ae3050000..1dbb801c9309e 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -18,6 +18,7 @@ //! [`ParquetOpener`] for opening Parquet files use crate::page_filter::PagePruningAccessPlanFilter; +use crate::row_filter::build_projection_read_plan; use crate::row_group_filter::RowGroupAccessPlanFilter; use crate::{ ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory, @@ -59,13 +60,13 @@ use datafusion_execution::parquet_encryption::EncryptionFactory; use futures::{Stream, StreamExt, ready}; use log::debug; use parquet::DecodeResult; +use parquet::arrow::ParquetRecordBatchStreamBuilder; use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; use parquet::arrow::arrow_reader::{ ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy, }; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder}; -use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader}; /// Implements [`FileOpener`] for a parquet file @@ -583,12 +584,14 @@ impl FileOpener for ParquetOpener { // metrics from the arrow reader itself let arrow_reader_metrics = ArrowReaderMetrics::enabled(); - let indices = projection.column_indices(); - let mask = - ProjectionMask::roots(reader_metadata.parquet_schema(), indices.clone()); + let read_plan = build_projection_read_plan( + projection.expr_iter(), + &physical_file_schema, + reader_metadata.parquet_schema(), + ); let decoder = builder - .with_projection(mask) + .with_projection(read_plan.projection_mask) .with_metrics(arrow_reader_metrics.clone()) .build()?; @@ -601,7 +604,7 @@ impl FileOpener for ParquetOpener { // Rebase column indices to match the narrowed stream schema. 
// The projection expressions have indices based on physical_file_schema, // but the stream only contains the columns selected by the ProjectionMask. - let stream_schema = Arc::new(physical_file_schema.project(&indices)?); + let stream_schema = read_plan.projected_schema; let replace_schema = stream_schema != output_schema; let projection = projection .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?; diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index d120f743fa1d5..67b65321d9bff 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -83,7 +83,7 @@ use datafusion_common::cast::as_boolean_array; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_physical_expr::ScalarFunctionExpr; use datafusion_physical_expr::expressions::{Column, Literal}; -use datafusion_physical_expr::utils::reassign_expr_columns; +use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns}; use datafusion_physical_expr::{PhysicalExpr, split_conjunction}; use datafusion_physical_plan::metrics; @@ -424,10 +424,26 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> { .first() .and_then(|a| a.as_any().downcast_ref::()) { + // for Map columns, get_field performs a runtime key lookup rather than a + // schema-level field access so the entire Map column must be read, + // so we skip the struct field optimization and defer to normal Column traversal + let is_map_column = self + .file_schema + .index_of(column.name()) + .ok() + .map(|idx| { + matches!( + self.file_schema.field(idx).data_type(), + DataType::Map(_, _) + ) + }) + .unwrap_or(false); + + let return_type = func.return_type(); - if !DataType::is_nested(return_type) - || self.is_nested_type_supported(return_type) + if !is_map_column + && (!DataType::is_nested(return_type) + || self.is_nested_type_supported(return_type)) { // try to resolve all 
field name arguments to strinrg literals // if any argument is not a string literal, we can not determine the exact @@ -579,6 +595,136 @@ pub(crate) fn build_parquet_read_plan( ))) } +/// Builds a unified [`ParquetReadPlan`] for a set of projection expressions +/// +/// Unlike [`build_parquet_read_plan`] (which is used for filter pushdown and +/// returns `None` when an expression references unsupported nested types or +/// missing columns), this function always succeeds. It collects every column +/// that *can* be resolved in the file and produces a leaf-level projection +/// mask. Columns missing from the file are silently skipped since the projection +/// layer handles those by inserting nulls. +pub(crate) fn build_projection_read_plan( + exprs: impl IntoIterator>, + file_schema: &Schema, + schema_descr: &SchemaDescriptor, +) -> ParquetReadPlan { + // fast path: if every expression is a plain Column reference, skip all + // struct analysis and use root-level projection directly + let exprs = exprs.into_iter().collect::>(); + let all_plain_columns = exprs + .iter() + .all(|e| e.as_any().downcast_ref::().is_some()); + + if all_plain_columns { + let mut root_indices: Vec = exprs + .iter() + .map(|e| e.as_any().downcast_ref::().unwrap().index()) + .collect(); + root_indices.sort_unstable(); + root_indices.dedup(); + + let projection_mask = + ProjectionMask::roots(schema_descr, root_indices.iter().copied()); + let projected_schema = Arc::new( + file_schema + .project(&root_indices) + .expect("valid column indices"), + ); + + return ParquetReadPlan { + projection_mask, + projected_schema, + }; + } + + // secondary fast path: if the schema has no struct columns, we can skip + // PushdownChecker traversal and use root-level projection + let has_struct_columns = file_schema + .fields() + .iter() + .any(|f| matches!(f.data_type(), DataType::Struct(_))); + + if !has_struct_columns { + let mut root_indices = exprs + .into_iter() + .flat_map(|e| 
collect_columns(&e).into_iter().map(|col| col.index())) + .collect::>(); + + root_indices.sort_unstable(); + root_indices.dedup(); + + let projection_mask = + ProjectionMask::roots(schema_descr, root_indices.iter().copied()); + + let projected_schema = Arc::new( + file_schema + .project(&root_indices) + .expect("valid column indices"), + ); + + return ParquetReadPlan { + projection_mask, + projected_schema, + }; + } + + let mut all_root_indices = Vec::new(); + let mut all_struct_accesses = Vec::new(); + + for expr in exprs { + let mut checker = PushdownChecker::new(file_schema, true); + let _ = expr.visit(&mut checker); + let columns = checker.into_sorted_columns(); + + all_root_indices.extend_from_slice(&columns.required_columns); + all_struct_accesses.extend(columns.struct_field_accesses); + } + + all_root_indices.sort_unstable(); + all_root_indices.dedup(); + + // when no struct field accesses were found, fall back to root-level projection + // to match the performance of the simple path + if all_struct_accesses.is_empty() { + let projection_mask = + ProjectionMask::roots(schema_descr, all_root_indices.iter().copied()); + let projected_schema = Arc::new( + file_schema + .project(&all_root_indices) + .expect("valid column indices"), + ); + + return ParquetReadPlan { + projection_mask, + projected_schema, + }; + } + + let leaf_indices = { + let mut out = + leaf_indices_for_roots(all_root_indices.iter().copied(), schema_descr); + let struct_leaf_indices = + resolve_struct_field_leaves(&all_struct_accesses, file_schema, schema_descr); + + out.extend_from_slice(&struct_leaf_indices); + out.sort_unstable(); + out.dedup(); + + out + }; + + let projection_mask = + ProjectionMask::leaves(schema_descr, leaf_indices.iter().copied()); + + let projected_schema = + build_filter_schema(file_schema, &all_root_indices, &all_struct_accesses); + + ParquetReadPlan { + projection_mask, + projected_schema, + } +} + fn leaf_indices_for_roots( root_indices: I, schema_descr: 
&SchemaDescriptor, @@ -654,6 +800,8 @@ fn build_filter_schema( regular_indices: &[usize], struct_field_accesses: &[StructFieldAccess], ) -> SchemaRef { + let regular_set: BTreeSet = regular_indices.iter().copied().collect(); + let all_indices = regular_indices .iter() .copied() @@ -669,6 +817,15 @@ fn build_filter_schema( .map(|&idx| { let field = file_schema.field(idx); + // if this column appears as a regular (whole-column) reference, + // keep the full type + // + // Pruning is only valid when the column is accessed exclusively + // through struct field accesses + if regular_set.contains(&idx) { + return Arc::new(field.clone()); + } + // collect all field paths that access this root struct column let field_paths = struct_field_accesses .iter() @@ -683,7 +840,6 @@ fn build_filter_schema( .collect::>(); if field_paths.is_empty() { - // its a regular column - use the full type return Arc::new(field.clone()); } @@ -696,7 +852,10 @@ fn build_filter_schema( }) .collect::>(); - Arc::new(Schema::new(fields)) + Arc::new(Schema::new_with_metadata( + fields, + file_schema.metadata().clone(), + )) } fn prune_struct_type(dt: &DataType, paths: &[&[String]]) -> DataType { @@ -958,6 +1117,8 @@ mod test { use parquet::file::reader::{FileReader, SerializedFileReader}; use tempfile::NamedTempFile; + use datafusion_physical_expr::expressions::Column as PhysicalColumn; + // List predicates used by the decoder should be accepted for pushdown #[test] fn test_filter_candidate_builder_supports_list_types() { @@ -1814,6 +1975,86 @@ mod test { assert_eq!(file_metrics.pushdown_rows_matched.value(), 2); } + #[test] + fn projection_read_plan_preserves_full_struct() { + // Schema: id (Int32), s (Struct{value: Int32, label: Utf8}) + // Parquet leaves: id=0, s.value=1, s.label=2 + let struct_fields: Fields = vec![ + Arc::new(Field::new("value", DataType::Int32, false)), + Arc::new(Field::new("label", DataType::Utf8, false)), + ] + .into(); + + let schema = Arc::new(Schema::new(vec![ + 
Field::new("id", DataType::Int32, false), + Field::new("s", DataType::Struct(struct_fields.clone()), false), + ])); + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StructArray::new( + struct_fields, + vec![ + Arc::new(Int32Array::from(vec![10, 20, 30])) as _, + Arc::new(StringArray::from(vec!["a", "b", "c"])) as _, + ], + None, + )), + ], + ) + .unwrap(); + + let file = NamedTempFile::new().expect("temp file"); + let mut writer = + ArrowWriter::try_new(file.reopen().unwrap(), Arc::clone(&schema), None) + .expect("writer"); + writer.write(&batch).expect("write batch"); + writer.close().expect("close writer"); + + let reader_file = file.reopen().expect("reopen file"); + let builder = ParquetRecordBatchReaderBuilder::try_new(reader_file) + .expect("reader builder"); + let metadata = builder.metadata().clone(); + let file_schema = builder.schema().clone(); + let schema_descr = metadata.file_metadata().schema_descr(); + + // Simulate SELECT * output projection: Column("id") and Column("s") + // Plus a get_field(s, 'value') expression from the pushed-down filter + let exprs: Vec> = vec![ + Arc::new(PhysicalColumn::new("id", 0)), + Arc::new(PhysicalColumn::new("s", 1)), + logical2physical( + &get_field().call(vec![ + col("s"), + Expr::Literal(ScalarValue::Utf8(Some("value".to_string())), None), + ]), + &file_schema, + ), + ]; + + let read_plan = build_projection_read_plan(exprs, &file_schema, schema_descr); + + // The projected schema must have the FULL struct type because Column("s") + // is in the projection. It should NOT be narrowed to Struct{value: Int32}. 
+ let s_field = read_plan.projected_schema.field_with_name("s").unwrap(); + assert_eq!( + s_field.data_type(), + &DataType::Struct( + vec![ + Arc::new(Field::new("value", DataType::Int32, false)), + Arc::new(Field::new("label", DataType::Utf8, false)), + ] + .into() + ), + ); + + // all 3 Parquet leaves should be in the projection mask + let expected_mask = ProjectionMask::leaves(schema_descr, [0, 1, 2]); + assert_eq!(read_plan.projection_mask, expected_mask,); + } + /// Sanity check that the given expression could be evaluated against the given schema without any errors. /// This will fail if the expression references columns that are not in the schema or if the types of the columns are incompatible, etc. fn check_expression_can_evaluate_against_schema( diff --git a/datafusion/sqllogictest/test_files/projection_pushdown.slt b/datafusion/sqllogictest/test_files/projection_pushdown.slt index 1735b1fb41bb3..777f1e00ed312 100644 --- a/datafusion/sqllogictest/test_files/projection_pushdown.slt +++ b/datafusion/sqllogictest/test_files/projection_pushdown.slt @@ -1994,9 +1994,83 @@ WHERE COALESCE(get_field(s, 'f1'), get_field(s, 'f2')) = 1; ---- 1 +##################### +# Section 8: SELECT * with struct field filter +##################### + +# When SELECT * includes the full struct but the filter only accesses a +# sub-field (e.g. s['id']), the leaf-level projection must not narrow the +# struct schema in the output. Previously build_projection_read_plan would +# produce a schema with Struct("id": Int32) while the data still contained +# Struct("id": Int32, "value": Utf8), causing an ArrowError. + +# 8.1: SELECT * with equality filter on struct sub-field +query I? +SELECT * FROM simple_struct WHERE s['value'] = 100; +---- +1 {value: 100, label: alpha} + +# 8.2: Explicit SELECT of whole struct with struct sub-field filter +query ? 
+SELECT s FROM simple_struct WHERE s['value'] = 100; +---- +{value: 100, label: alpha} + +# 8.3: Whole struct + sub-field projection + sub-field filter +query I?I +SELECT s['value'], s, id FROM simple_struct WHERE s['value'] = 100; +---- +100 {value: 100, label: alpha} 1 + +# 8.4: Whole struct in output, filter on a different sub-field than projected +query ?T +SELECT s, s['label'] FROM simple_struct WHERE s['value'] > 200; +---- +{value: 300, label: delta} delta +{value: 250, label: epsilon} epsilon + +# 8.5: Filter references both sub-fields, output includes whole struct +query I? +SELECT id, s FROM simple_struct WHERE s['value'] > 100 AND s['label'] = 'beta'; +---- +2 {value: 200, label: beta} + +# 8.6: Only sub-field projection with sub-field filter (no whole struct — should prune) +query II +SELECT id, s['value'] FROM simple_struct WHERE s['value'] = 100; +---- +1 100 + +# 8.7: Nested struct — whole struct output with deeply nested field filter +query I? +SELECT * FROM nested_struct WHERE nested['outer']['inner'] > 15; +---- +2 {outer: {inner: 20, name: two}, extra: y} +3 {outer: {inner: 30, name: three}, extra: z} + +# 8.8: Nested struct — explicit whole struct select with sibling field filter +query ? +SELECT nested FROM nested_struct WHERE nested['extra'] = 'y'; +---- +{outer: {inner: 20, name: two}, extra: y} + +# 8.9: Nullable struct — whole struct output with sub-field filter +query ? +SELECT s FROM nullable_struct WHERE s['value'] > 100; +---- +{value: 150, label: gamma} +{value: 250, label: epsilon} + +# 8.10: Struct sub-field filter combined with top-level column filter +query ?I +SELECT s, id FROM simple_struct WHERE s['value'] > 100 AND id < 4; +---- +{value: 200, label: beta} 2 +{value: 150, label: gamma} 3 + # Config reset -# The SLT runner sets `target_partitions` to 4 instead of using the default, so +# The SLT runner sets `target_partitions` to 4 instead of using the default, so # reset it explicitly. 
statement ok SET datafusion.execution.target_partitions = 4;