diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 0d25fb969840..51c0572f0b46 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -39,6 +39,7 @@
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.io.CloseableIterable;
@@ -144,7 +145,8 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
   private Schema lazySchema() {
     if (schema == null) {
       if (requestedSchema != null) {
-        this.schema = SparkSchemaUtil.prune(table.schema(), requestedSchema);
+        // the projection should include all columns that will be returned, including those only used in filters
+        this.schema = SparkSchemaUtil.prune(table.schema(), requestedSchema, filterExpression(), caseSensitive);
       } else {
         this.schema = table.schema();
       }
@@ -152,6 +154,13 @@ private Schema lazySchema() {
     return schema;
   }
 
+  private Expression filterExpression() {
+    if (filterExpressions != null) {
+      return filterExpressions.stream().reduce(Expressions.alwaysTrue(), Expressions::and);
+    }
+    return Expressions.alwaysTrue();
+  }
+
   private StructType lazyType() {
     if (type == null) {
       this.type = SparkSchemaUtil.convert(lazySchema());
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index ec9aa706ef09..8e240a1ce565 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -95,51 +95,33 @@ Iterator<InternalRow> open(FileScanTask task) {
     InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
 
     // schema or rows returned by readers
-    Schema finalSchema = expectedSchema;
     PartitionSpec spec = task.spec();
     Set<Integer> idColumns = spec.identitySourceIds();
+    Schema partitionSchema = TypeUtil.select(expectedSchema, idColumns);
+    boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();
 
-    // schema needed for the projection and filtering
-    StructType sparkType = SparkSchemaUtil.convert(finalSchema);
-    Schema requiredSchema = SparkSchemaUtil.prune(tableSchema, sparkType, task.residual(), caseSensitive);
-    boolean hasJoinedPartitionColumns = !idColumns.isEmpty();
-    boolean hasExtraFilterColumns = requiredSchema.columns().size() != finalSchema.columns().size();
-
-    Schema iterSchema;
-    Iterator<InternalRow> iter;
-
-    if (hasJoinedPartitionColumns) {
+    if (projectsIdentityPartitionColumns) {
       if (SUPPORTS_CONSTANTS.contains(file.format())) {
-        iterSchema = requiredSchema;
-        iter = open(task, requiredSchema, PartitionUtil.constantsMap(task, RowDataReader::convertConstant));
-      } else {
-        // schema used to read data files
-        Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
-        Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
-        PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
-        JoinedRow joined = new JoinedRow();
+        return open(task, expectedSchema, PartitionUtil.constantsMap(task, RowDataReader::convertConstant));
+      }
 
-        InternalRow partition = convertToRow.apply(file.partition());
-        joined.withRight(partition);
+      // schema used to read data files
+      Schema readSchema = TypeUtil.selectNot(expectedSchema, idColumns);
+      PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
+      JoinedRow joined = new JoinedRow();
 
-        // create joined rows and project from the joined schema to the final schema
-        iterSchema = TypeUtil.join(readSchema, partitionSchema);
-        iter = Iterators.transform(open(task, readSchema, ImmutableMap.of()), joined::withLeft);
-      }
-    } else if (hasExtraFilterColumns) {
-      // add projection to the final schema
-      iterSchema = requiredSchema;
-      iter = open(task, requiredSchema, ImmutableMap.of());
-    } else {
-      // return the base iterator
-      iterSchema = finalSchema;
-      iter = open(task, finalSchema, ImmutableMap.of());
+      // create joined rows and project from the joined schema to the final schema
+      Schema joinedSchema = TypeUtil.join(readSchema, partitionSchema);
+      InternalRow partition = convertToRow.apply(file.partition());
+      joined.withRight(partition);
+
+      return Iterators.transform(
+          Iterators.transform(open(task, readSchema, ImmutableMap.of()), joined::withLeft),
+          APPLY_PROJECTION.bind(projection(expectedSchema, joinedSchema))::invoke);
    }
 
-    // TODO: remove the projection by reporting the iterator's schema back to Spark
-    return Iterators.transform(
-        iter,
-        APPLY_PROJECTION.bind(projection(finalSchema, iterSchema))::invoke);
+    // return the base iterator
+    return open(task, expectedSchema, ImmutableMap.of());
   }
 
   private Iterator<InternalRow> open(FileScanTask task, Schema readSchema, Map<Integer, ?> idToConstant) {
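
Note on the Reader.java change: the new filterExpression() helper folds whatever filters Spark pushed down into a single Iceberg Expression, and lazySchema() passes that expression to SparkSchemaUtil.prune so that columns referenced only by filters remain in the projected schema. The following sketch (not part of the patch) shows the same reduction in isolation; the column names "id" and "ts" and the class name FilterFoldSketch are hypothetical, while the Expressions factory methods are the public org.apache.iceberg.expressions API.

    // Sketch: folding pushed-down filters into one Expression, as Reader.filterExpression() does.
    import java.util.Arrays;
    import java.util.List;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.Expressions;

    public class FilterFoldSketch {
      public static void main(String[] args) {
        // hypothetical filters pushed down by Spark
        List<Expression> pushed = Arrays.asList(
            Expressions.equal("id", 5),
            Expressions.greaterThan("ts", 100));

        // AND the pushed filters together, starting from alwaysTrue()
        // so an empty filter list still yields a valid expression
        Expression combined = pushed.stream().reduce(Expressions.alwaysTrue(), Expressions::and);

        // combined is equivalent to: true AND id = 5 AND ts > 100; passing it to
        // SparkSchemaUtil.prune keeps "id" and "ts" in the projection even when
        // Spark's requested schema does not select them
        System.out.println(combined);
      }
    }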