From b8d79a909a97e71b03d76811696949a776599741 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Tue, 10 Jun 2025 16:58:34 -0400
Subject: [PATCH 1/6] fix complex types; fix filter schema logic

---
 CHANGES.md                                    |   1 +
 .../beam/sdk/io/iceberg/FilterUtils.java      |  52 ++++++++
 .../apache/beam/sdk/io/iceberg/IcebergIO.java |   2 +-
 .../sdk/io/iceberg/IcebergScanConfig.java     |  67 ++++++++--
 .../beam/sdk/io/iceberg/IcebergUtils.java     |  73 ++++++++++-
 .../beam/sdk/io/iceberg/ReadFromTasks.java    |   5 +-
 .../beam/sdk/io/iceberg/ScanTaskReader.java   |  26 ++--
 .../beam/sdk/io/iceberg/ScanTaskSource.java   |   6 -
 .../beam/sdk/io/iceberg/FilterUtilsTest.java  |  19 +++
 .../sdk/io/iceberg/IcebergIOReadTest.java     |  14 +-
 .../beam/sdk/io/iceberg/IcebergUtilsTest.java | 120 ++++++++++++++++++
 11 files changed, 344 insertions(+), 41 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 0c126e4087e7..413005fb25d9 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -103,6 +103,7 @@
 * [Python] Fixed vLLM leaks connections causing a throughput bottleneck and underutilization of GPU ([35053](https://github.com/apache/beam/pull/35053))
 * (Python) Fixed cloudpickle overwriting class states every time loading a same object of dynamic class ([#35062](https://github.com/apache/beam/issues/35062)).
 * [Python] Fixed pip install apache-beam[interactive] causes crash on google colab ([#35148](https://github.com/apache/beam/pull/35148)).
+* [IcebergIO] Fixed Beam <-> Iceberg conversion logic for arrays of structs and maps of structs ([]()).

 ## Security Fixes
 * Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java
index 74ff86fcafda..4fc4bb9c44c3 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java
@@ -24,9 +24,11 @@
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlBasicCall;
@@ -72,6 +74,56 @@
           .put(SqlKind.OR, Operation.OR)
           .build();

+  /**
+   * Parses a SQL filter expression string and returns a set of all field names referenced within
+   * it.
+   */
+  static Set<String> getReferencedFieldNames(@Nullable String filter) {
+    if (filter == null || filter.trim().isEmpty()) {
+      return new HashSet<>();
+    }
+
+    SqlParser parser = SqlParser.create(filter);
+    try {
+      SqlNode expression = parser.parseExpression();
+      Set<String> fieldNames = new HashSet<>();
+      extractFieldNames(expression, fieldNames);
+      System.out.println("xxx fields in filter: " + fieldNames);
+      return fieldNames;
+    } catch (Exception exception) {
+      throw new RuntimeException(
+          String.format("Encountered an error when parsing filter: '%s'", filter), exception);
+    }
+  }
+
+  private static void extractFieldNames(SqlNode node, Set<String> fieldNames) {
+    if (node instanceof SqlIdentifier) {
+      fieldNames.add(((SqlIdentifier) node).getSimple());
+    } else if (node instanceof SqlBasicCall) {
+      // recursively check operands
+      SqlBasicCall call = (SqlBasicCall) node;
+      for (SqlNode operand : call.getOperandList()) {
+        extractFieldNames(operand, fieldNames);
+      }
+    } else if (node instanceof SqlNodeList) {
+      // For IN clauses, the right-hand side is a SqlNodeList, so iterate through its elements
+      SqlNodeList nodeList = (SqlNodeList) node;
+      for (SqlNode element : nodeList.getList()) {
+        if (element != null) {
+          extractFieldNames(element, fieldNames);
+        }
+      }
+    }
+    // SqlLiteral nodes do not contain field names, so we can ignore them.
+  }
+
+  /**
+   * Parses a SQL filter expression string into an Iceberg {@link Expression} that can be used for
+   * data pruning.
+   *
+   * <p>Note: This utility currently supports only top-level fields within the filter expression.
+   * Nested field references are not supported.
+   */
   static Expression convert(@Nullable String filter, Schema schema) {
     if (filter == null) {
       return Expressions.alwaysTrue();
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
index 6b2ea6721e31..956e45651df7 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
@@ -603,7 +603,7 @@ public PCollection<Row> expand(PBegin input) {
               .setUseCdc(getUseCdc())
               .setKeepFields(getKeep())
               .setDropFields(getDrop())
-              .setFilter(FilterUtils.convert(getFilter(), table.schema()))
+              .setFilterString(getFilter())
               .build();

       scanConfig.validate(table);
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java
index 4edf3512952e..e2f42e3e7cbe 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java
@@ -31,6 +31,7 @@
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -45,6 +46,10 @@
 @AutoValue
 public abstract class IcebergScanConfig implements Serializable {
   private transient @MonotonicNonNull Table cachedTable;
+  private transient org.apache.iceberg.@MonotonicNonNull Schema cachedProjectedSchema;
+  private transient org.apache.iceberg.@MonotonicNonNull Schema cachedRequiredSchema;
+  private transient @MonotonicNonNull Evaluator cachedEvaluator;
+  private transient @MonotonicNonNull Expression cachedFilter;

   public enum ScanType {
     TABLE,
@@ -75,19 +80,56 @@ public Table getTable() {
   @VisibleForTesting
   static org.apache.iceberg.Schema resolveSchema(
       org.apache.iceberg.Schema schema, @Nullable List<String> keep, @Nullable List<String> drop) {
+    return resolveSchema(schema, keep, drop, null);
+  }
+
+  @VisibleForTesting
+  static org.apache.iceberg.Schema resolveSchema(
+      org.apache.iceberg.Schema schema,
+      @Nullable List<String> keep,
+      @Nullable List<String> drop,
+      @Nullable Set<String> fieldsInFilter) {
+    ImmutableList.Builder<String> selectedFieldsBuilder = ImmutableList.builder();
     if (keep != null && !keep.isEmpty()) {
-      schema = schema.select(keep);
+      selectedFieldsBuilder.addAll(keep);
     } else if (drop != null && !drop.isEmpty()) {
       Set<String> fields =
           schema.columns().stream().map(Types.NestedField::name).collect(Collectors.toSet());
       drop.forEach(fields::remove);
-      schema = schema.select(fields);
+      selectedFieldsBuilder.addAll(fields);
     }
-    return schema;
+
+    if (fieldsInFilter != null && !fieldsInFilter.isEmpty()) {
+      fieldsInFilter.stream()
+          .map(f -> schema.caseInsensitiveFindField(f).name())
+          .forEach(selectedFieldsBuilder::add);
+    }
+    ImmutableList<String> selectedFields = selectedFieldsBuilder.build();
+    return selectedFields.isEmpty() ? schema : schema.select(selectedFields);
   }

+  /** Returns the projected Schema after applying column pruning. */
   public org.apache.iceberg.Schema getProjectedSchema() {
-    return resolveSchema(getTable().schema(), getKeepFields(), getDropFields());
+    if (cachedProjectedSchema == null) {
+      cachedProjectedSchema = resolveSchema(getTable().schema(), getKeepFields(), getDropFields());
+    }
+    return cachedProjectedSchema;
+  }
+
+  /**
+   * Returns a Schema that includes explicitly selected fields and fields referenced in the filter
+   * statement.
+   */
+  public org.apache.iceberg.Schema getRequiredSchema() {
+    if (cachedRequiredSchema == null) {
+      cachedRequiredSchema =
+          resolveSchema(
+              getTable().schema(),
+              getKeepFields(),
+              getDropFields(),
+              FilterUtils.getReferencedFieldNames(getFilterString()));
+    }
+    return cachedRequiredSchema;
   }

   @Pure
@@ -98,15 +140,22 @@ public Evaluator getEvaluator() {
       return null;
     }
     if (cachedEvaluator == null) {
-      cachedEvaluator = new Evaluator(getProjectedSchema().asStruct(), filter);
+      cachedEvaluator = new Evaluator(getRequiredSchema().asStruct(), filter);
     }
     return cachedEvaluator;
   }

-  private transient @Nullable Evaluator cachedEvaluator;
+  @Pure
+  @Nullable
+  public Expression getFilter() {
+    if (cachedFilter == null) {
+      cachedFilter = FilterUtils.convert(getFilterString(), getTable().schema());
+    }
+    return cachedFilter;
+  }

   @Pure
-  public abstract @Nullable Expression getFilter();
+  public abstract @Nullable String getFilterString();

   @Pure
   public abstract @Nullable Boolean getCaseSensitive();
@@ -172,7 +221,7 @@
   public static Builder builder() {
     return new AutoValue_IcebergScanConfig.Builder()
         .setScanType(ScanType.TABLE)
-        .setFilter(null)
+        .setFilterString(null)
         .setCaseSensitive(null)
        .setOptions(ImmutableMap.of())
        .setSnapshot(null)
@@ -211,7 +260,7 @@ public Builder setTableIdentifier(String... names) {

     public abstract Builder setSchema(Schema schema);

-    public abstract Builder setFilter(@Nullable Expression filter);
+    public abstract Builder setFilterString(@Nullable String filter);

     public abstract Builder setCaseSensitive(@Nullable Boolean caseSensitive);

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
index 23dbc3c81a00..409f6032798c 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
@@ -18,6 +18,8 @@
 package org.apache.beam.sdk.io.iceberg;

 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

 import java.nio.ByteBuffer;
 import java.time.LocalDate;
@@ -26,10 +28,12 @@
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.util.Preconditions;
@@ -41,6 +45,7 @@
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
+import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.DateTime;
 import org.joda.time.Instant;
@@ -345,10 +350,33 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row
                     copyRowIntoRecord(GenericRecord.create(field.type().asStructType()), row)));
         break;
       case LIST:
-        Optional.ofNullable(value.getArray(name)).ifPresent(list -> rec.setField(name, list));
+        Collection<@NonNull ?> icebergList = value.getArray(name);
+        Type collectionType = ((Types.ListType) field.type()).elementType();
+
+        if (collectionType.isStructType() && icebergList != null) {
+          org.apache.iceberg.Schema innerSchema = collectionType.asStructType().asSchema();
+          icebergList =
+              icebergList.stream()
+                  .map(v -> beamRowToIcebergRecord(innerSchema, (Row) v))
+                  .collect(Collectors.toList());
+        }
+        Optional.ofNullable(icebergList).ifPresent(list -> rec.setField(name, list));
         break;
       case MAP:
-        Optional.ofNullable(value.getMap(name)).ifPresent(v -> rec.setField(name, v));
+        Map<?, ?> icebergMap = value.getMap(name);
+        Type valueType = ((Types.MapType) field.type()).valueType();
+        // recurse on struct types
+        if (valueType.isStructType() && icebergMap != null) {
+          org.apache.iceberg.Schema innerSchema = valueType.asStructType().asSchema();
+
+          ImmutableMap.Builder<Object, Record> newMap = ImmutableMap.builder();
+          for (Map.Entry<?, ?> entry : icebergMap.entrySet()) {
+            Row row = checkStateNotNull(((Row) entry.getValue()));
+            newMap.put(checkStateNotNull(entry.getKey()), beamRowToIcebergRecord(innerSchema, row));
+          }
+          icebergMap = newMap.build();
+        }
+        Optional.ofNullable(icebergMap).ifPresent(v -> rec.setField(name, v));
         break;
     }
   }
@@ -426,10 +454,49 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) {
       case DOUBLE: // Iceberg and Beam both use double
       case STRING: // Iceberg and Beam both use String
       case BOOLEAN: // Iceberg and Beam both use boolean
+        rowBuilder.addValue(icebergValue);
+        break;
       case ARRAY:
       case ITERABLE:
+        checkState(
+            icebergValue instanceof List,
+            "Expected List type for field '%s' but received %s",
+            field.getName(),
+            icebergValue.getClass());
+        List<@NonNull ?> beamList = (List<@NonNull ?>) icebergValue;
+        Schema.FieldType collectionType =
+            checkStateNotNull(field.getType().getCollectionElementType());
+        // recurse on struct types
+        if (collectionType.getTypeName().isCompositeType()) {
+          Schema innerSchema = checkStateNotNull(collectionType.getRowSchema());
+          beamList =
+              beamList.stream()
+                  .map(v -> icebergRecordToBeamRow(innerSchema, (Record) v))
+                  .collect(Collectors.toList());
+        }
+        rowBuilder.addValue(beamList);
+        break;
       case MAP:
-        rowBuilder.addValue(icebergValue);
+        checkState(
+            icebergValue instanceof Map,
+            "Expected Map type for field '%s' but received %s",
+            field.getName(),
+            icebergValue.getClass());
+        Map<?, ?> beamMap = (Map<?, ?>) icebergValue;
+        Schema.FieldType valueType = checkStateNotNull(field.getType().getMapValueType());
+        // recurse on struct types
+        if (valueType.getTypeName().isCompositeType()) {
+          Schema innerSchema = checkStateNotNull(valueType.getRowSchema());
+          ImmutableMap.Builder<Object, Row> newMap = ImmutableMap.builder();
+          for (Map.Entry<?, ?> entry : ((Map<?, ?>) icebergValue).entrySet()) {
+            Record rec = ((Record) entry.getValue());
+            newMap.put(
+                checkStateNotNull(entry.getKey()),
+                icebergRecordToBeamRow(innerSchema, checkStateNotNull(rec)));
+          }
+          beamMap = newMap.build();
+        }
+        rowBuilder.addValue(beamMap);
         break;
       case DATETIME:
         // Iceberg uses a long for micros.
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java
index 366f5565d425..528b89c203bf 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java
@@ -74,10 +74,9 @@ public void process(
       return;
     }
     FileScanTask task = fileScanTasks.get((int) l);
-    org.apache.iceberg.Schema projected = scanConfig.getProjectedSchema();
-    Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(projected);
+    Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(scanConfig.getProjectedSchema());
     try (CloseableIterable<Record> fullIterable =
-        ReadUtils.createReader(task, table, projected)) {
+        ReadUtils.createReader(task, table, scanConfig.getRequiredSchema())) {
       CloseableIterable<Record> reader = ReadUtils.maybeApplyFilter(fullIterable, scanConfig);

       for (Record record : reader) {
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
index c52b39dde1c2..de88b4af2699 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
@@ -50,7 +50,6 @@
 import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
-import org.checkerframework.checker.nullness.qual.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -58,7 +57,6 @@ class ScanTaskReader extends BoundedSource.BoundedReader<Row> {
   private static final Logger LOG = LoggerFactory.getLogger(ScanTaskReader.class);

   private final ScanTaskSource source;
-  private final org.apache.iceberg.Schema project;
   private final Schema beamSchema;
   transient @Nullable FileIO io;

@@ -69,8 +67,7 @@ class ScanTaskReader extends BoundedSource.BoundedReader<Row> {

   public ScanTaskReader(ScanTaskSource source) {
     this.source = source;
-    this.project = source.getSchema();
-    this.beamSchema = icebergSchemaToBeamSchema(project);
+    this.beamSchema = icebergSchemaToBeamSchema(source.getScanConfig().getProjectedSchema());
   }

   @Override
@@ -97,7 +94,7 @@ public boolean advance() throws IOException {
     // This nullness annotation is incorrect, but the most expedient way to work with Iceberg's APIs
     // which are not null-safe.
     @SuppressWarnings("nullness")
-    org.apache.iceberg.@NonNull Schema project = this.project;
+    org.apache.iceberg.Schema requiredSchema = source.getScanConfig().getRequiredSchema();

     @Nullable String nameMapping =
         source.getTable().properties().get(TableProperties.DEFAULT_NAME_MAPPING);
@@ -125,7 +122,8 @@ public boolean advance() throws IOException {
       DataFile file = fileTask.file();
       InputFile input = decryptor.getInputFile(fileTask);
       Map<Integer, ?> idToConstants =
-          ReadUtils.constantsMap(fileTask, IdentityPartitionConverters::convertConstant, project);
+          ReadUtils.constantsMap(
+              fileTask, IdentityPartitionConverters::convertConstant, requiredSchema);

       CloseableIterable<Record> iterable;
       switch (file.format()) {
@@ -134,10 +132,10 @@ public boolean advance() throws IOException {
           ORC.ReadBuilder orcReader =
               ORC.read(input)
                   .split(fileTask.start(), fileTask.length())
-                  .project(project)
+                  .project(requiredSchema)
                   .createReaderFunc(
                       fileSchema ->
-                          GenericOrcReader.buildReader(project, fileSchema, idToConstants))
+                          GenericOrcReader.buildReader(requiredSchema, fileSchema, idToConstants))
                   .filter(fileTask.residual());

           if (nameMapping != null) {
@@ -151,10 +149,11 @@ public boolean advance() throws IOException {
           Parquet.ReadBuilder parquetReader =
               Parquet.read(input)
                   .split(fileTask.start(), fileTask.length())
-                  .project(project)
+                  .project(requiredSchema)
                   .createReaderFunc(
                       fileSchema ->
-                          GenericParquetReaders.buildReader(project, fileSchema, idToConstants))
+                          GenericParquetReaders.buildReader(
+                              requiredSchema, fileSchema, idToConstants))
                   .filter(fileTask.residual());

           if (nameMapping != null) {
@@ -168,9 +167,9 @@ public boolean advance() throws IOException {
           Avro.ReadBuilder avroReader =
               Avro.read(input)
                   .split(fileTask.start(), fileTask.length())
-                  .project(project)
+                  .project(requiredSchema)
                   .createReaderFunc(
-                      fileSchema -> DataReader.create(project, fileSchema, idToConstants));
+                      fileSchema -> DataReader.create(requiredSchema, fileSchema, idToConstants));

           if (nameMapping != null) {
             avroReader.withNameMapping(NameMappingParser.fromJson(nameMapping));
@@ -182,7 +181,8 @@ public boolean advance() throws IOException {
           throw new UnsupportedOperationException("Cannot read format: " + file.format());
       }
       GenericDeleteFilter deleteFilter =
-          new GenericDeleteFilter(checkStateNotNull(io), fileTask, fileTask.schema(), project);
+          new GenericDeleteFilter(
+              checkStateNotNull(io), fileTask, fileTask.schema(), requiredSchema);
       iterable = deleteFilter.filter(iterable);
       iterable = ReadUtils.maybeApplyFilter(iterable, source.getScanConfig());
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskSource.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskSource.java
index 2c92c5572c6d..7eb5730a61ec 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskSource.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskSource.java
@@ -27,7 +27,6 @@
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.checkerframework.dataflow.qual.Pure;

@@ -54,11 +53,6 @@ CombinedScanTask getTask() {
     return task;
   }

-  @Pure
-  Schema getSchema() {
-    return scanConfig.getProjectedSchema();
-  }
-
   @Pure
   IcebergScanConfig getScanConfig() {
     return scanConfig;
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java
index 34e7be619110..ff6383d4d046 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.iceberg;

 import static org.apache.beam.sdk.io.iceberg.FilterUtils.convert;
+import static org.apache.beam.sdk.io.iceberg.FilterUtils.getReferencedFieldNames;
 import static org.apache.iceberg.expressions.Expressions.and;
 import static org.apache.iceberg.expressions.Expressions.equal;
 import static org.apache.iceberg.expressions.Expressions.greaterThan;
@@ -46,11 +47,13 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import org.apache.beam.vendor.calcite.v1_28_0.org.apache.commons.lang3.tuple.Pair;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
@@ -508,6 +511,22 @@ public void testScanFiles() throws IOException {
     assertEquals(expectedFiles, actualFiles.build());
   }

+  @Test
+  public void testReferencedFieldsInFilter() {
+    List<Pair<String, Set<String>>> cases =
+        Arrays.asList(
+            Pair.of("field_1 < 35", Sets.newHashSet("FIELD_1")),
+            Pair.of("\"field_1\" in (1, 2, 3)", Sets.newHashSet("field_1")),
+            Pair.of("field_1 < 35 and \"fiELd_2\" = TRUE", Sets.newHashSet("FIELD_1", "fiELd_2")),
+            Pair.of(
+                "(\"field_1\" < 35 and \"field_2\" = TRUE) or \"field_3\" in ('a', 'b')",
+                Sets.newHashSet("field_1", "field_2", "field_3")));
+
+    for (Pair<String, Set<String>> pair : cases) {
+      assertEquals(pair.getRight(), getReferencedFieldNames(pair.getLeft()));
+    }
+  }
+
   @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

   @Rule public TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default");
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
index 5bfcb1345c37..bb303ea9c305 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
@@ -32,6 +32,7 @@

 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -357,7 +358,6 @@ public void testScanWithFilter() throws Exception {
     TableIdentifier tableId =
         TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
     Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA);
-    final Schema schema = icebergSchemaToBeamSchema(TestFixtures.SCHEMA);

     List<List<Record>> expectedRecords = warehouse.commitData(simpleTable);
@@ -365,24 +365,26 @@
         IcebergIO.readRows(catalogConfig())
             .from(tableId)
             .withFilter(
-                "\"id\" < 10 AND \"id\" >= 2 AND \"data\" <> 'clammy' AND \"data\" <> 'brainy'");
+                "\"id\" < 10 AND \"id\" >= 2 AND \"data\" <> 'clammy' AND \"data\" <> 'brainy'")
+            .keeping(Arrays.asList("id"));

     if (useIncrementalScan) {
       read = read.withCdc().toSnapshot(simpleTable.currentSnapshot().snapshotId());
     }
+    final Schema outputSchema = icebergSchemaToBeamSchema(TestFixtures.SCHEMA.select("id"));
     final List<Row> expectedRows =
         expectedRecords.stream()
             .flatMap(List::stream)
-            .map(record -> IcebergUtils.icebergRecordToBeamRow(schema, record))
             .filter(
-                row -> {
-                  long id = checkStateNotNull(row.getInt64("id"));
-                  String data = checkStateNotNull(row.getString("data"));
+                record -> {
+                  long id = checkStateNotNull((Long) record.getField("id"));
+                  String data = checkStateNotNull((String) record.getField("data"));
                   return id < 10
                       && id >= 2
                       && !Objects.equals(data, "clammy")
                       && !Objects.equals(data, "brainy");
                 })
+            .map(record -> IcebergUtils.icebergRecordToBeamRow(outputSchema, record))
             .collect(Collectors.toList());

     PCollection<Row> output = testPipeline.apply(read).apply(new PrintRow());
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
index e2f3b0f1c036..6b6c98be5cd7 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
@@ -242,6 +242,20 @@ public void testList() {
           Types.ListType.ofRequired(1, Types.StringType.get()),
           list);
     }
+
+    @Test
+    public void testListOfRecords() {
+      Record actual =
+          IcebergUtils.beamRowToIcebergRecord(RECORD_LIST_ICEBERG_SCHEMA, ROW_LIST_OF_ROWS);
+      assertEquals(RECORD_LIST_OF_RECORDS, actual);
+    }
+
+    @Test
+    public void testMapOfRecords() {
+      Record actual =
+          IcebergUtils.beamRowToIcebergRecord(RECORD_MAP_ICEBERG_SCHEMA, ROW_MAP_OF_ROWS);
+      assertEquals(RECORD_MAP_OF_RECORDS, actual);
+    }
   }

   @RunWith(JUnit4.class)
@@ -418,8 +432,114 @@ public void testList() {
           Schema.FieldType.iterable(Schema.FieldType.STRING),
           list);
     }
+
+    @Test
+    public void testListOfRecords() {
+      Row actual =
+          IcebergUtils.icebergRecordToBeamRow(ROW_LIST_BEAM_SCHEMA, RECORD_LIST_OF_RECORDS);
+      assertEquals(ROW_LIST_OF_ROWS, actual);
+    }
+
+    @Test
+    public void testMapOfRecords() {
+      Row actual = IcebergUtils.icebergRecordToBeamRow(ROW_MAP_BEAM_SCHEMA, RECORD_MAP_OF_RECORDS);
+      assertEquals(ROW_MAP_OF_ROWS, actual);
+    }
   }

+  static final Schema NESTED_BEAM_SCHEMA =
+      Schema.builder()
+          .addArrayField("str_list", Schema.FieldType.STRING)
+          .addInt32Field("int")
+          .build();
+  static final Schema ROW_LIST_BEAM_SCHEMA =
+      Schema.builder()
+          .addArrayField("list", Schema.FieldType.row(NESTED_BEAM_SCHEMA))
+          .addBooleanField("bool")
+          .build();
+  static final Schema ROW_MAP_BEAM_SCHEMA =
+      Schema.builder()
+          .addMapField("map", Schema.FieldType.STRING, Schema.FieldType.row(NESTED_BEAM_SCHEMA))
+          .build();
+  static final org.apache.iceberg.Schema NESTED_ICEBERG_SCHEMA =
+      new org.apache.iceberg.Schema(
+          required(4, "str_list", Types.ListType.ofRequired(6, Types.StringType.get())),
+          required(5, "int", Types.IntegerType.get()));
+  static final org.apache.iceberg.Schema RECORD_LIST_ICEBERG_SCHEMA =
+      new org.apache.iceberg.Schema(
+          required(
+              1,
+              "list",
+              Types.ListType.ofRequired(3, Types.StructType.of(NESTED_ICEBERG_SCHEMA.columns()))),
+          required(2, "bool", Types.BooleanType.get()));
+  static final org.apache.iceberg.Schema RECORD_MAP_ICEBERG_SCHEMA =
+      new org.apache.iceberg.Schema(
+          required(
+              1,
+              "map",
+              Types.MapType.ofRequired(
+                  2,
+                  3,
+                  Types.StringType.get(),
+                  Types.StructType.of(NESTED_ICEBERG_SCHEMA.columns()))));
+  static final Record RECORD_LIST_OF_RECORDS =
+      GenericRecord.create(RECORD_LIST_ICEBERG_SCHEMA)
+          .copy(
+              ImmutableMap.of(
+                  "list",
+                  Arrays.asList(
+                      GenericRecord.create(NESTED_ICEBERG_SCHEMA)
+                          .copy(
+                              ImmutableMap.of(
+                                  "str_list", Arrays.asList("a", "b", "c"), "int", 123)),
+                      GenericRecord.create(NESTED_ICEBERG_SCHEMA)
+                          .copy(
+                              ImmutableMap.of(
+                                  "str_list", Arrays.asList("x", "y", "z"), "int", 789))),
+                  "bool",
+                  true));
+  static final Row ROW_LIST_OF_ROWS =
+      Row.withSchema(ROW_LIST_BEAM_SCHEMA)
+          .addValues(
+              Arrays.asList(
+                  Row.withSchema(NESTED_BEAM_SCHEMA)
+                      .addValues(Arrays.asList("a", "b", "c"), 123)
+                      .build(),
+                  Row.withSchema(NESTED_BEAM_SCHEMA)
+                      .addValues(Arrays.asList("x", "y", "z"), 789)
+                      .build()),
+              true)
+          .build();
+  static final Record RECORD_MAP_OF_RECORDS =
+      GenericRecord.create(RECORD_MAP_ICEBERG_SCHEMA)
+          .copy(
+              ImmutableMap.of(
+                  "map",
+                  ImmutableMap.of(
+                      "key_1",
+                      GenericRecord.create(NESTED_ICEBERG_SCHEMA)
+                          .copy(
+                              ImmutableMap.of(
+                                  "str_list", Arrays.asList("a", "b", "c"), "int", 123)),
+                      "key_2",
+                      GenericRecord.create(NESTED_ICEBERG_SCHEMA)
+                          .copy(
+                              ImmutableMap.of(
+                                  "str_list", Arrays.asList("x", "y", "z"), "int", 789)))));
+  static final Row ROW_MAP_OF_ROWS =
+      Row.withSchema(ROW_MAP_BEAM_SCHEMA)
+          .addValues(
+              ImmutableMap.of(
+                  "key_1",
+                  Row.withSchema(NESTED_BEAM_SCHEMA)
+                      .addValues(Arrays.asList("a", "b", "c"), 123)
+                      .build(),
+                  "key_2",
+                  Row.withSchema(NESTED_BEAM_SCHEMA)
+                      .addValues(Arrays.asList("x", "y", "z"), 789)
+                      .build()))
+          .build();
+
   @RunWith(JUnit4.class)
   public static class SchemaTests {
     private static class BeamFieldTypeTestCase {

From c6f201309f015a2a9c31b68fee31d6c9cf65e5c6 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Tue, 10 Jun 2025 17:46:13 -0400
Subject: [PATCH 2/6] update Changes

---
 CHANGES.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGES.md b/CHANGES.md
index 413005fb25d9..ec0099876a19 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -103,7 +103,7 @@
 * [Python] Fixed vLLM leaks connections causing a throughput bottleneck and underutilization of GPU ([35053](https://github.com/apache/beam/pull/35053))
 * (Python) Fixed cloudpickle overwriting class states every time loading a same object of dynamic class ([#35062](https://github.com/apache/beam/issues/35062)).
 * [Python] Fixed pip install apache-beam[interactive] causes crash on google colab ([#35148](https://github.com/apache/beam/pull/35148)).
-* [IcebergIO] Fixed Beam <-> Iceberg conversion logic for arrays of structs and maps of structs ([]()).
+* [IcebergIO] Fixed Beam <-> Iceberg conversion logic for arrays of structs and maps of structs ([#35230](https://github.com/apache/beam/pull/35230)).

 ## Security Fixes
 * Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).

From c1b967b0a2e47cf9478462035a1564094b66a2a7 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Tue, 10 Jun 2025 17:48:03 -0400
Subject: [PATCH 3/6] add missing changes from other PRs

---
 CHANGES.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/CHANGES.md b/CHANGES.md
index ec0099876a19..40f75a1cf017 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -75,6 +75,8 @@
 ## I/Os
 * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* [IcebergIO] Support reading with column pruning ([#34856](https://github.com/apache/beam/pull/34856))
+* [IcebergIO] Support reading with pushdown filtering ([#34827](https://github.com/apache/beam/pull/34827))

 ## New Features / Improvements
 * Adding Google Storage Requests Pays feature (Golang)([#30747](https://github.com/apache/beam/issues/30747)).

From 0aedfec8f379655518dcde073afe6e8252d0b227 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Wed, 11 Jun 2025 10:37:43 -0400
Subject: [PATCH 4/6] trigger ITs

---
 .github/trigger_files/IO_Iceberg_Integration_Tests.json | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 98be2d60cbf9..5d04b2c0a8c7 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,5 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 3,
-  "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
+  "modification": 5
 }

From 49bbb2ce6943930a001ce858982bf59b22a96940 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Wed, 11 Jun 2025 12:19:39 -0400
Subject: [PATCH 5/6] make separate impl for iterable

---
 .../beam/sdk/io/iceberg/FilterUtils.java      |  1 -
 .../beam/sdk/io/iceberg/IcebergUtils.java     | 34 ++++++---
 .../beam/sdk/io/iceberg/IcebergUtilsTest.java | 60 ++++++++-------
 3 files changed, 65 insertions(+), 30 deletions(-)

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java
index 4fc4bb9c44c3..1ff7119196e0 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java
@@ -88,7 +88,6 @@ static Set<String> getReferencedFieldNames(@Nullable String filter) {
       SqlNode expression = parser.parseExpression();
       Set<String> fieldNames = new HashSet<>();
       extractFieldNames(expression, fieldNames);
-      System.out.println("xxx fields in filter: " + fieldNames);
       return fieldNames;
     } catch (Exception exception) {
       throw new RuntimeException(
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
index 409f6032798c..90e6c376637e 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
@@ -28,7 +28,6 @@
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -39,6 +38,7 @@
 import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
@@ -350,15 +350,16 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row
                     copyRowIntoRecord(GenericRecord.create(field.type().asStructType()), row)));
         break;
       case LIST:
-        Collection<@NonNull ?> icebergList = value.getArray(name);
+        Iterable<@NonNull ?> icebergList = value.getIterable(name);
         Type collectionType = ((Types.ListType) field.type()).elementType();

         if (collectionType.isStructType() && icebergList != null) {
           org.apache.iceberg.Schema innerSchema = collectionType.asStructType().asSchema();
-          icebergList =
-              icebergList.stream()
-                  .map(v -> beamRowToIcebergRecord(innerSchema, (Row) v))
-                  .collect(Collectors.toList());
+          ImmutableList.Builder<Record> builder = ImmutableList.builder();
+          for (Row v : (Iterable<Row>) icebergList) {
+            builder.add(beamRowToIcebergRecord(innerSchema, v));
+          }
+          icebergList = builder.build();
         }
         Optional.ofNullable(icebergList).ifPresent(list -> rec.setField(name, list));
         break;
@@ -457,7 +458,6 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) {
         rowBuilder.addValue(icebergValue);
         break;
       case ARRAY:
-      case ITERABLE:
         checkState(
             icebergValue instanceof List,
             "Expected List type for field '%s' but received %s",
@@ -476,6 +476,26 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) {
         }
         rowBuilder.addValue(beamList);
         break;
+      case ITERABLE:
+        checkState(
+            icebergValue instanceof Iterable,
+            "Expected Iterable type for field '%s' but received %s",
+            field.getName(),
+            icebergValue.getClass());
+        Iterable<@NonNull ?> beamIterable = (Iterable<@NonNull ?>) icebergValue;
+        Schema.FieldType iterableCollectionType =
+            checkStateNotNull(field.getType().getCollectionElementType());
+        // recurse on struct types
+        if (iterableCollectionType.getTypeName().isCompositeType()) {
+          Schema innerSchema = checkStateNotNull(iterableCollectionType.getRowSchema());
+          ImmutableList.Builder<Row> builder = ImmutableList.builder();
+          for (Record v : (Iterable<@NonNull Record>) icebergValue) {
+            builder.add(icebergRecordToBeamRow(innerSchema, v));
+          }
+          beamIterable = builder.build();
+        }
+        rowBuilder.addValue(beamIterable);
+        break;
       case MAP:
         checkState(
             icebergValue instanceof Map,
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
index 6b6c98be5cd7..115a6790919e 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
@@ -38,6 +38,7 @@
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.types.Type;
@@ -250,6 +251,13 @@ public void testListOfRecords() {
       assertEquals(RECORD_LIST_OF_RECORDS, actual);
     }

+    @Test
+    public void testIterableOfRecords() {
+      Record actual =
+          IcebergUtils.beamRowToIcebergRecord(RECORD_LIST_ICEBERG_SCHEMA, ROW_ITERABLE_OF_ROWS);
+      assertEquals(RECORD_LIST_OF_RECORDS, actual);
+    }
+
     @Test
     public void testMapOfRecords() {
       Record actual =
@@ -440,6 +448,13 @@ public void testListOfRecords() {
       assertEquals(ROW_LIST_OF_ROWS, actual);
     }

+    @Test
+    public void testIterableOfRecords() {
+      Row actual =
+          IcebergUtils.icebergRecordToBeamRow(ROW_ITERABLE_BEAM_SCHEMA, RECORD_ITERABLE_OF_RECORDS);
+      assertEquals(ROW_ITERABLE_OF_ROWS, actual);
+    }
+
     @Test
     public void testMapOfRecords() {
       Row actual = IcebergUtils.icebergRecordToBeamRow(ROW_MAP_BEAM_SCHEMA, RECORD_MAP_OF_RECORDS);
@@ -457,6 +472,11 @@ public void testMapOfRecords() {
           .addArrayField("list", Schema.FieldType.row(NESTED_BEAM_SCHEMA))
           .addBooleanField("bool")
           .build();
+  static final Schema ROW_ITERABLE_BEAM_SCHEMA =
+      Schema.builder()
+          .addIterableField("list", Schema.FieldType.row(NESTED_BEAM_SCHEMA))
+          .addBooleanField("bool")
+          .build();
   static final Schema ROW_MAP_BEAM_SCHEMA =
       Schema.builder()
           .addMapField("map", Schema.FieldType.STRING, Schema.FieldType.row(NESTED_BEAM_SCHEMA))
@@ -482,33 +502,29 @@ public void testMapOfRecords() {
                   3,
                   Types.StringType.get(),
                   Types.StructType.of(NESTED_ICEBERG_SCHEMA.columns()))));
+  static final List<Record> LIST_OF_RECORDS =
+      Arrays.asList(
+          GenericRecord.create(NESTED_ICEBERG_SCHEMA)
+              .copy(ImmutableMap.of("str_list", Arrays.asList("a", "b", "c"), "int", 123)),
+          GenericRecord.create(NESTED_ICEBERG_SCHEMA)
+              .copy(ImmutableMap.of("str_list", Arrays.asList("x", "y", "z"), "int", 789)));
   static final Record RECORD_LIST_OF_RECORDS =
+      GenericRecord.create(RECORD_LIST_ICEBERG_SCHEMA)
+          .copy(ImmutableMap.of("list", LIST_OF_RECORDS, "bool", true));
+  static final Record RECORD_ITERABLE_OF_RECORDS =
       GenericRecord.create(RECORD_LIST_ICEBERG_SCHEMA)
           .copy(
               ImmutableMap.of(
-                  "list",
-                  Arrays.asList(
-                      GenericRecord.create(NESTED_ICEBERG_SCHEMA)
-                          .copy(
-                              ImmutableMap.of(
-                                  "str_list", Arrays.asList("a", "b", "c"), "int", 123)),
-                      GenericRecord.create(NESTED_ICEBERG_SCHEMA)
-                          .copy(
-                              ImmutableMap.of(
-                                  "str_list", Arrays.asList("x", "y", "z"), "int", 789))),
-                  "bool",
-                  true));
+                  "list", Iterables.unmodifiableIterable(LIST_OF_RECORDS), "bool", true));
+  static final List<Row> LIST_OF_ROWS =
+      Arrays.asList(
+          Row.withSchema(NESTED_BEAM_SCHEMA).addValues(Arrays.asList("a", "b", "c"), 123).build(),
+          Row.withSchema(NESTED_BEAM_SCHEMA).addValues(Arrays.asList("x", "y", "z"), 789).build());
   static final Row ROW_LIST_OF_ROWS =
-      Row.withSchema(ROW_LIST_BEAM_SCHEMA)
-          .addValues(
-              Arrays.asList(
-                  Row.withSchema(NESTED_BEAM_SCHEMA)
-                      .addValues(Arrays.asList("a", "b", "c"), 123)
-                      .build(),
-                  Row.withSchema(NESTED_BEAM_SCHEMA)
-                      .addValues(Arrays.asList("x", "y", "z"), 789)
-                      .build()),
-              true)
+      Row.withSchema(ROW_LIST_BEAM_SCHEMA).addValues(LIST_OF_ROWS, true).build();
+  static final Row ROW_ITERABLE_OF_ROWS =
+      Row.withSchema(ROW_ITERABLE_BEAM_SCHEMA)
+          .addValues(Iterables.unmodifiableIterable(LIST_OF_ROWS), true)
           .build();
   static final Record RECORD_MAP_OF_RECORDS =
       GenericRecord.create(RECORD_MAP_ICEBERG_SCHEMA)
           .copy(

From 377bf504559770f5011c34eeb2af769dd4b98eee Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Wed, 11 Jun 2025 13:56:08 -0400
Subject: [PATCH 6/6] fix

---
 .../beam/sdk/io/iceberg/IcebergScanConfig.java        |  7 +++++--
 .../sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java  | 12 +++++++---
 2 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java
index e2f42e3e7cbe..3829baa43665 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java
@@ -97,6 +97,9 @@ static org.apache.iceberg.Schema resolveSchema(
           schema.columns().stream().map(Types.NestedField::name).collect(Collectors.toSet());
       drop.forEach(fields::remove);
       selectedFieldsBuilder.addAll(fields);
+    } else {
+      // default: include all columns
+      return schema;
     }

     if (fieldsInFilter != null && !fieldsInFilter.isEmpty()) {
@@ -117,8 +120,8 @@ public org.apache.iceberg.Schema getProjectedSchema() {
   }

   /**
-   * Returns a Schema that includes explicitly selected fields and fields referenced in the filter
-   * statement.
+   * Returns a Schema that includes all the fields required for a successful read. This includes
+   * explicitly selected fields and fields referenced in the filter statement.
    */
   public org.apache.iceberg.Schema getRequiredSchema() {
     if (cachedRequiredSchema == null) {
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
index e4b91fa4014a..485440d9fcf8 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
@@ -424,7 +424,7 @@ public void testRead() throws Exception {
   }

   @Test
-  public void testReadAndKeepSomeFields() throws Exception {
+  public void testReadWithColumnPruning_keep() throws Exception {
     Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA);

     List<Row> expectedRows = populateTable(table);
@@ -442,22 +442,28 @@
   }

   @Test
-  public void testReadWithFilter() throws Exception {
+  public void testReadWithFilterAndColumnPruning_keep() throws Exception {
     Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA);

+    List<String> keepFields = Arrays.asList("bool_field", "modulo_5", "str");
+    RowFilter rowFilter = new RowFilter(BEAM_SCHEMA).keep(keepFields);
+
     List<Row> expectedRows =
         populateTable(table).stream()
             .filter(
                 row ->
                     row.getBoolean("bool_field")
                         && (row.getInt32("int_field") < 500 || row.getInt32("modulo_5") == 3))
+            .map(rowFilter::filter)
             .collect(Collectors.toList());

     Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId()));
     config.put("filter", "\"bool_field\" = TRUE AND (\"int_field\" < 500 OR \"modulo_5\" = 3)");
+    config.put("keep", keepFields);

     PCollection<Row> rows =
         pipeline.apply(Managed.read(ICEBERG).withConfig(config)).getSinglePCollection();
+    assertEquals(rowFilter.outputSchema(), rows.getSchema());

     PAssert.that(rows).containsInAnyOrder(expectedRows);
     pipeline.run().waitUntilFinish();
@@ -489,7 +495,7 @@ public void testStreamingReadWithFilter() throws Exception {
   }

   @Test
-  public void testStreamingReadAndDropSomeFields() throws Exception {
+  public void testStreamingReadWithColumnPruning_drop() throws Exception {
     Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA);

     List<Row> expectedRows = populateTable(table);
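
Editor's note: taken together, these patches let an Iceberg read push a SQL-style row filter down to the source and prune columns via "keep"/"drop", while fields referenced only by the filter are still read internally (the "required" schema) and then dropped from the output (the "projected" schema). Below is a minimal usage sketch through the Managed API, mirroring the integration test in the last patch. It is an illustration only, not part of the series; the table name and catalog properties are placeholder assumptions.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class IcebergFilteredReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    Map<String, Object> config = new HashMap<>();
    config.put("table", "default.my_table"); // placeholder table name
    Map<String, String> catalogProps = new HashMap<>();
    catalogProps.put("type", "hadoop"); // placeholder catalog config
    catalogProps.put("warehouse", "gs://my-bucket/warehouse"); // placeholder location
    config.put("catalog_properties", catalogProps);

    // Rows are filtered at the source. Note that "int_field" may appear in the
    // filter without being kept: the read resolves a required schema (selected
    // fields plus filter fields), then prunes the output to the projected schema.
    config.put("filter", "\"bool_field\" = TRUE AND \"int_field\" < 500");
    config.put("keep", Arrays.asList("bool_field", "str"));

    PCollection<Row> rows =
        pipeline.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection();

    pipeline.run().waitUntilFinish();
  }
}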