diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 7ab7bcd9a9c6..37dd25bf9029 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 2
+  "modification": 3
 }
diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json b/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json
index 5abe02fc09c7..3a009261f4f9 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 1
+  "modification": 2
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java
index a3187de05cb6..4ea2d8d8b176 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java
@@ -22,11 +22,13 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
@@ -212,6 +214,11 @@ public Row filter(Row row) {
     return Preconditions.checkNotNull(copyWithNewSchema(row, outputSchema()));
   }
 
+  /** Applies {@link #filter(Row)} to each Row in a list. */
+  public List<Row> filter(List<Row> rows) {
+    return rows.stream().map(this::filter).collect(Collectors.toList());
+  }
+
   /** Returns the output {@link Row}'s {@link Schema}. */
   public Schema outputSchema() {
     return transformedSchema != null ? transformedSchema : rowSchema;
@@ -374,6 +381,14 @@ static Schema dropFields(Schema schema, List<String> fieldsToDrop) {
         newFieldsList.set(newFieldsList.indexOf(fieldToRemove), modifiedField);
       }
     }
+
+    // re-order based on original schema's ordering
+    Map<String, Integer> indexMap = new HashMap<>();
+    for (int i = 0; i < schema.getFieldCount(); i++) {
+      indexMap.put(schema.getField(i).getName(), i);
+    }
+    newFieldsList.sort(Comparator.comparingInt(f -> checkStateNotNull(indexMap.get(f.getName()))));
+
     return new Schema(newFieldsList);
   }
 
@@ -416,6 +431,13 @@ static Schema keepFields(Schema schema, List<String> fieldsToKeep) {
       newFieldsList.add(fieldToKeep);
     }
 
+    // re-order based on original schema's ordering
+    Map<String, Integer> indexMap = new HashMap<>();
+    for (int i = 0; i < schema.getFieldCount(); i++) {
+      indexMap.put(schema.getField(i).getName(), i);
+    }
+    newFieldsList.sort(Comparator.comparingInt(f -> checkStateNotNull(indexMap.get(f.getName()))));
+
     return new Schema(newFieldsList);
   }
 }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/CreateReadTasksDoFn.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/CreateReadTasksDoFn.java
index 895563e960ad..d4b4b6ecdbc7 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/CreateReadTasksDoFn.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/CreateReadTasksDoFn.java
@@ -78,7 +78,11 @@ public void process(
     }
     LOG.info("Planning to scan snapshot {}", toSnapshot);
 
-    IncrementalAppendScan scan = table.newIncrementalAppendScan().toSnapshot(toSnapshot);
+    IncrementalAppendScan scan =
+        table
+            .newIncrementalAppendScan()
+            .toSnapshot(toSnapshot)
+            .project(scanConfig.getProjectedSchema());
     if (fromSnapshot != null) {
       scan = scan.fromSnapshotExclusive(fromSnapshot);
     }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java
index 0064b49475d0..53d0e587f58f 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java
@@ -115,7 +115,9 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
             .fromTimestamp(configuration.getFromTimestamp())
             .toTimestamp(configuration.getToTimestamp())
             .withStartingStrategy(strategy)
-            .streaming(configuration.getStreaming());
+            .streaming(configuration.getStreaming())
+            .keeping(configuration.getKeep())
+            .dropping(configuration.getDrop());
 
     @Nullable Integer pollIntervalSeconds = configuration.getPollIntervalSeconds();
     if (pollIntervalSeconds != null) {
@@ -177,6 +179,14 @@ static Builder builder() {
             "The interval at which to poll for new snapshots. Defaults to 60 seconds.")
     abstract @Nullable Integer getPollIntervalSeconds();
 
+    @SchemaFieldDescription(
+        "A subset of column names to read exclusively. If null or empty, all columns will be read.")
+    abstract @Nullable List<String> getKeep();
+
+    @SchemaFieldDescription(
+        "A subset of column names to exclude from reading. If null or empty, all columns will be read.")
+    abstract @Nullable List<String> getDrop();
+
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setTable(String table);
@@ -201,6 +211,10 @@ abstract static class Builder {
 
       abstract Builder setStreaming(Boolean streaming);
 
+      abstract Builder setKeep(List<String> keep);
+
+      abstract Builder setDrop(List<String> drop);
+
       abstract Configuration build();
     }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
index efeb9a97587f..9c34f0651ba9 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
@@ -603,6 +603,10 @@ public enum StartingStrategy {
 
     abstract @Nullable Duration getPollInterval();
 
+    abstract @Nullable List<String> getKeep();
+
+    abstract @Nullable List<String> getDrop();
+
     abstract Builder toBuilder();
 
     @AutoValue.Builder
@@ -627,6 +631,10 @@ abstract static class Builder {
 
      abstract Builder setPollInterval(@Nullable Duration triggeringFrequency);
 
+      abstract Builder setKeep(@Nullable List<String> fields);
+
+      abstract Builder setDrop(@Nullable List<String> fields);
+
      abstract ReadRows build();
     }
@@ -666,6 +674,14 @@ public ReadRows withStartingStrategy(@Nullable StartingStrategy strategy) {
       return toBuilder().setStartingStrategy(strategy).build();
     }
 
+    public ReadRows keeping(@Nullable List<String> keep) {
+      return toBuilder().setKeep(keep).build();
+    }
+
+    public ReadRows dropping(@Nullable List<String> drop) {
+      return toBuilder().setDrop(drop).build();
+    }
+
     @Override
     public PCollection<Row> expand(PBegin input) {
       TableIdentifier tableId =
@@ -687,6 +703,8 @@ public PCollection<Row> expand(PBegin input) {
               .setStreaming(getStreaming())
               .setPollInterval(getPollInterval())
               .setUseCdc(getUseCdc())
+              .setKeepFields(getKeep())
+              .setDropFields(getDrop())
               .build();
 
       scanConfig.validate(table);
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
index 35500676ae2f..b4eaf0800f0d 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
@@ -93,7 +93,9 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
             .getPipeline()
             .apply(
                 IcebergIO.readRows(configuration.getIcebergCatalog())
-                    .from(TableIdentifier.parse(configuration.getTable())));
+                    .from(TableIdentifier.parse(configuration.getTable()))
+                    .keeping(configuration.getKeep())
+                    .dropping(configuration.getDrop()));
 
     return PCollectionRowTuple.of(OUTPUT_TAG, output);
   }
@@ -121,6 +123,14 @@ static Builder builder() {
     @Nullable
     abstract Map<String, String> getConfigProperties();
 
+    @SchemaFieldDescription(
+        "A subset of column names to read exclusively. If null or empty, all columns will be read.")
+    abstract @Nullable List<String> getKeep();
+
+    @SchemaFieldDescription(
+        "A subset of column names to exclude from reading. If null or empty, all columns will be read.")
+    abstract @Nullable List<String> getDrop();
+
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setTable(String table);
@@ -131,6 +141,10 @@ abstract static class Builder {
 
       abstract Builder setConfigProperties(Map<String, String> confProperties);
 
+      abstract Builder setKeep(List<String> keep);
+
+      abstract Builder setDrop(List<String> drop);
+
      abstract Configuration build();
     }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java
index ff5e4c736244..5c3b0471c4be 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java
@@ -18,11 +18,15 @@
 package org.apache.beam.sdk.io.iceberg;
 
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.hadoop.util.Sets.newHashSet;
 
 import com.google.auto.value.AutoValue;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.io.iceberg.IcebergIO.ReadRows.StartingStrategy;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
@@ -31,6 +35,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.types.Types;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.checkerframework.dataflow.qual.Pure;
@@ -66,6 +71,24 @@ public Table getTable() {
 
   @Pure
   public abstract Schema getSchema();
 
+  @VisibleForTesting
+  static org.apache.iceberg.Schema resolveSchema(
+      org.apache.iceberg.Schema schema, @Nullable List<String> keep, @Nullable List<String> drop) {
+    if (keep != null && !keep.isEmpty()) {
+      schema = schema.select(keep);
+    } else if (drop != null && !drop.isEmpty()) {
+      Set<String> fields =
+          schema.columns().stream().map(Types.NestedField::name).collect(Collectors.toSet());
+      drop.forEach(fields::remove);
+      schema = schema.select(fields);
+    }
+    return schema;
+  }
+
+  public org.apache.iceberg.Schema getProjectedSchema() {
+    return resolveSchema(getTable().schema(), getKeepFields(), getDropFields());
+  }
+
   @Pure
   public abstract @Nullable Expression getFilter();
@@ -123,6 +146,12 @@ public Table getTable() {
   @Pure
   public abstract @Nullable String getBranch();
 
+  @Pure
+  public abstract @Nullable List<String> getKeepFields();
+
+  @Pure
+  public abstract @Nullable List<String> getDropFields();
+
   @Pure
   public static Builder builder() {
     return new AutoValue_IcebergScanConfig.Builder()
@@ -204,6 +233,10 @@ public Builder setTableIdentifier(String... names) {
 
     public abstract Builder setBranch(@Nullable String branch);
 
+    public abstract Builder setKeepFields(@Nullable List<String> fields);
+
+    public abstract Builder setDropFields(@Nullable List<String> fields);
+
     public abstract IcebergScanConfig build();
   }
@@ -211,6 +244,28 @@ public Builder setTableIdentifier(String... names) {
   abstract Builder toBuilder();
 
   void validate(Table table) {
+    @Nullable List<String> keep = getKeepFields();
+    @Nullable List<String> drop = getDropFields();
+    if (keep != null || drop != null) {
+      checkArgument(
+          keep == null || drop == null, error("only one of 'keep' or 'drop' can be set."));
+
+      Set<String> fieldsSpecified;
+      String param;
+      if (keep != null) {
+        param = "keep";
+        fieldsSpecified = newHashSet(checkNotNull(keep));
+      } else { // drop != null
+        param = "drop";
+        fieldsSpecified = newHashSet(checkNotNull(drop));
+      }
+      table.schema().columns().forEach(nf -> fieldsSpecified.remove(nf.name()));
+
+      checkArgument(
+          fieldsSpecified.isEmpty(),
+          error(String.format("'%s' specifies unknown field(s): %s", param, fieldsSpecified)));
+    }
+
     // TODO(#34168, ahmedabu98): fill these gaps for the existing batch source
     if (!getUseCdc()) {
       List<String> invalidOptions = new ArrayList<>();
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IncrementalScanSource.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IncrementalScanSource.java
index 0b3ba3498162..4df3eecb18e5 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IncrementalScanSource.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IncrementalScanSource.java
@@ -70,7 +70,7 @@ public PCollection<Row> expand(PBegin input) {
         .setCoder(KvCoder.of(ReadTaskDescriptor.getCoder(), ReadTask.getCoder()))
         .apply(Redistribute.arbitrarily())
         .apply("Read Rows From Tasks", ParDo.of(new ReadFromTasks(scanConfig)))
-        .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema()));
+        .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(scanConfig.getProjectedSchema()));
   }
 
   /** Continuously watches for new snapshots. */
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java
index ec5a11e9377c..050c549638c8 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java
@@ -23,6 +23,7 @@
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.values.KV;
@@ -73,9 +74,11 @@ public void process(
         return;
       }
       FileScanTask task = fileScanTasks.get((int) l);
-      try (CloseableIterable<Record> reader = ReadUtils.createReader(task, table)) {
+      org.apache.iceberg.Schema projected = scanConfig.getProjectedSchema();
+      Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(projected);
+      try (CloseableIterable<Record> reader = ReadUtils.createReader(task, table, projected)) {
         for (Record record : reader) {
-          Row row = IcebergUtils.icebergRecordToBeamRow(scanConfig.getSchema(), record);
+          Row row = IcebergUtils.icebergRecordToBeamRow(beamSchema, record);
           out.output(row);
         }
       }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java
index d42005e96cb4..827d17f7819d 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -67,7 +68,7 @@ public class ReadUtils {
           "parquet.read.support.class",
           "parquet.crypto.factory.class");
 
-  static ParquetReader<Record> createReader(FileScanTask task, Table table) {
+  static ParquetReader<Record> createReader(FileScanTask task, Table table, Schema schema) {
     String filePath = task.file().path().toString();
     InputFile inputFile;
     try (FileIO io = table.io()) {
@@ -100,11 +101,11 @@ static ParquetReader<Record> createReader(FileScanTask task, Table table) {
 
     return new ParquetReader<>(
         inputFile,
-        table.schema(),
+        schema,
         optionsBuilder.build(),
         // TODO(ahmedabu98): Implement a Parquet-to-Beam Row reader, bypassing conversion to Iceberg
         // Record
-        fileSchema -> GenericParquetReaders.buildReader(table.schema(), fileSchema, idToConstants),
+        fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema, idToConstants),
         mapping,
         task.residual(),
         false,
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanSource.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanSource.java
index 949faaa65eac..19218b85b63c 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanSource.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanSource.java
@@ -48,7 +48,7 @@ public ScanSource(IcebergScanConfig scanConfig) {
 
   private TableScan getTableScan() {
     Table table = scanConfig.getTable();
-    TableScan tableScan = table.newScan().project(table.schema());
+    TableScan tableScan = table.newScan().project(scanConfig.getProjectedSchema());
 
     if (scanConfig.getFilter() != null) {
       tableScan = tableScan.filter(scanConfig.getFilter());
@@ -115,7 +115,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
 
   @Override
   public Coder<Row> getOutputCoder() {
-    return RowCoder.of(scanConfig.getSchema());
+    return RowCoder.of(IcebergUtils.icebergSchemaToBeamSchema(scanConfig.getProjectedSchema()));
   }
 
   @Override
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
index c4bc8072e114..5f63ff14cca9 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
@@ -70,7 +70,7 @@ class ScanTaskReader extends BoundedSource.BoundedReader<Row> {
   public ScanTaskReader(ScanTaskSource source) {
     this.source = source;
     this.project = source.getSchema();
-    this.beamSchema = icebergSchemaToBeamSchema(source.getSchema());
+    this.beamSchema = icebergSchemaToBeamSchema(project);
   }
 
   @Override
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskSource.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskSource.java
index 7d4b05d221f6..55c10f2096b6 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskSource.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskSource.java
@@ -56,7 +56,7 @@ CombinedScanTask getTask() {
 
   @Pure
   Schema getSchema() {
-    return getTable().schema();
+    return scanConfig.getProjectedSchema();
   }
 
   @Override
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
index fe92480ab2ce..ddd16c24009b 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
@@ -17,18 +17,20 @@
  */
 package org.apache.beam.sdk.io.iceberg;
 
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.beam.sdk.io.iceberg.IcebergScanConfig.resolveSchema;
 import static org.apache.beam.sdk.io.iceberg.IcebergUtils.icebergSchemaToBeamSchema;
 import static org.apache.beam.sdk.io.iceberg.TestFixtures.createRecord;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -43,6 +45,7 @@
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.RowFilter;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
@@ -71,6 +74,8 @@
 import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.StringType;
+import org.apache.iceberg.types.Types.StructType;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -103,7 +108,7 @@ public class IcebergIOReadTest {
 
   @Parameters
   public static Iterable<Object[]> data() {
-    return Arrays.asList(new Object[][] {{false}, {true}});
+    return asList(new Object[][] {{false}, {true}});
   }
 
   // TODO(#34168, ahmedabu98): Update tests when we close feature gaps between regular and cdc
@@ -202,6 +207,69 @@ public void testFailWhenPollIntervalIsSetOnBatchRead() {
     read.expand(PBegin.in(testPipeline));
   }
 
+  @Test
+  public void testFailWhenDropAndKeepAreSet() {
+    TableIdentifier tableId = TableIdentifier.of("default", testName.getMethodName());
+    warehouse.createTable(tableId, TestFixtures.SCHEMA);
+    IcebergIO.ReadRows read =
+        IcebergIO.readRows(catalogConfig())
+            .from(tableId)
+            .keeping(asList("a"))
+            .dropping(asList("b"))
+            .withPollInterval(Duration.standardSeconds(5));
+
+    if (useIncrementalScan) {
+      read = read.withCdc();
+    }
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid source configuration: only one of 'keep' or 'drop' can be set");
+    read.expand(PBegin.in(testPipeline));
+  }
+
+  @Test
+  public void testFailWhenFilteringUnknownFields() {
+    TableIdentifier tableId = TableIdentifier.of("default", testName.getMethodName());
+    warehouse.createTable(tableId, TestFixtures.SCHEMA);
+    IcebergIO.ReadRows read =
+        IcebergIO.readRows(catalogConfig())
+            .from(tableId)
+            .keeping(asList("id", "unknown"))
+            .withPollInterval(Duration.standardSeconds(5));
+
+    if (useIncrementalScan) {
+      read = read.withCdc();
+    }
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Invalid source configuration: 'keep' specifies unknown field(s): [unknown]");
+    read.expand(PBegin.in(testPipeline));
+  }
+
+  @Test
+  public void testProjectedSchema() {
+    org.apache.iceberg.Schema original =
+        new org.apache.iceberg.Schema(
+            required(1, "a", StringType.get()),
+            required(2, "b", StructType.of(required(5, "b.a", StringType.get()))),
+            required(3, "c", StringType.get()),
+            required(4, "d", StringType.get()));
+
+    org.apache.iceberg.Schema projectDrop = resolveSchema(original, null, asList("a", "c"));
+    org.apache.iceberg.Schema expectedDrop =
+        new org.apache.iceberg.Schema(
+            required(2, "b", StructType.of(required(5, "b.a", StringType.get()))),
+            required(4, "d", StringType.get()));
+    assertTrue(projectDrop.sameSchema(expectedDrop));
+
+    org.apache.iceberg.Schema projectKeep = resolveSchema(original, asList("a", "c"), null);
+    org.apache.iceberg.Schema expectedKeep =
+        new org.apache.iceberg.Schema(
+            required(1, "a", StringType.get()), required(3, "c", StringType.get()));
+    assertTrue(projectKeep.sameSchema(expectedKeep));
+  }
+
   @Test
   public void testSimpleScan() throws Exception {
     TableIdentifier tableId =
@@ -234,18 +302,65 @@ public void testSimpleScan() throws Exception {
     testPipeline.run();
   }
 
+  @Test
+  public void testScanSelectedFields() throws Exception {
+    TableIdentifier tableId =
+        TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
+    Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA);
+    final Schema schema = icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
+
+    List<List<Record>> expectedRecords = warehouse.commitData(simpleTable);
+
+    IcebergIO.ReadRows read = IcebergIO.readRows(catalogConfig()).from(tableId);
+
+    if (useIncrementalScan) {
+      read = read.withCdc().toSnapshot(simpleTable.currentSnapshot().snapshotId());
+    }
+
+    final List<Row> originalRows =
+        expectedRecords.stream()
+            .flatMap(List::stream)
+            .map(record -> IcebergUtils.icebergRecordToBeamRow(schema, record))
+            .collect(Collectors.toList());
+
+    // test keep fields
+    read = read.keeping(singletonList("id"));
+    PCollection<Row> outputKeep =
+        testPipeline.apply("keep", read).apply("print keep", new PrintRow());
+    RowFilter keepFilter = new RowFilter(schema).keep(singletonList("id"));
+    PAssert.that(outputKeep)
+        .satisfies(
+            (Iterable<Row> rows) -> {
+              assertThat(rows, containsInAnyOrder(keepFilter.filter(originalRows).toArray()));
+              return null;
+            });
+
+    // test drop fields
+    read = read.keeping(null).dropping(singletonList("id"));
+    PCollection<Row> outputDrop =
+        testPipeline.apply("drop", read).apply("print drop", new PrintRow());
+    RowFilter dropFilter = new RowFilter(schema).drop(singletonList("id"));
+    PAssert.that(outputDrop)
+        .satisfies(
+            (Iterable<Row> rows) -> {
+              assertThat(rows, containsInAnyOrder(dropFilter.filter(originalRows).toArray()));
+              return null;
+            });
+
+    testPipeline.run();
+  }
+
   @Test
   public void testReadSchemaWithRandomlyOrderedIds() throws IOException {
     TableIdentifier tableId = TableIdentifier.of("default", testName.getMethodName());
     org.apache.iceberg.Schema nestedSchema =
         new org.apache.iceberg.Schema(
-            required(3, "b.a", Types.IntegerType.get()),
-            required(4, "b.b", Types.StringType.get()));
+            required(3, "b.a", Types.IntegerType.get()), required(4, "b.b", StringType.get()));
     org.apache.iceberg.Schema schema =
         new org.apache.iceberg.Schema(
             required(1, "a", Types.IntegerType.get()),
-            required(2, "b", Types.StructType.of(nestedSchema.columns())),
-            required(5, "c", Types.StringType.get()));
+            required(2, "b", StructType.of(nestedSchema.columns())),
+            required(5, "c", StringType.get()));
 
     // hadoop catalog will re-order by breadth-first ordering
     Table simpleTable = warehouse.createTable(tableId, schema);
@@ -267,7 +382,7 @@ public void testReadSchemaWithRandomlyOrderedIds() throws IOException {
     Row expectedRow =
         Row.withSchema(icebergSchemaToBeamSchema(schema)).addValues(nestedRow, 1, "sss").build();
 
-    DataFile file = warehouse.writeRecords("file1.parquet", schema, Collections.singletonList(rec));
+    DataFile file = warehouse.writeRecords("file1.parquet", schema, singletonList(rec));
     simpleTable.newFastAppend().appendFile(file).commit();
 
     IcebergIO.ReadRows read = IcebergIO.readRows(catalogConfig()).from(tableId);
@@ -294,7 +409,7 @@ public void testIdentityColumnScan() throws Exception {
     String identityColumnName = "identity";
     String identityColumnValue = "some-value";
 
-    simpleTable.updateSchema().addColumn(identityColumnName, Types.StringType.get()).commit();
+    simpleTable.updateSchema().addColumn(identityColumnName, StringType.get()).commit();
     simpleTable.updateSpec().addField(identityColumnName).commit();
 
     PartitionSpec spec = simpleTable.spec();
@@ -591,7 +706,7 @@ public static GenericRecord avroGenericRecord(
   }
 
   @SuppressWarnings("unchecked")
-  public static Record icebergGenericRecord(Types.StructType type, Map<String, Object> values) {
+  public static Record icebergGenericRecord(StructType type, Map<String, Object> values) {
     org.apache.iceberg.data.GenericRecord record =
         org.apache.iceberg.data.GenericRecord.create(type);
     for (Types.NestedField field : type.fields()) {
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ReadUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ReadUtilsTest.java
index 5efe6cd76a72..73a0fd19e893 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ReadUtilsTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ReadUtilsTest.java
@@ -81,7 +81,8 @@ public void testCreateReader() throws IOException {
     for (FileScanTask fileScanTask : combinedScanTask.tasks()) {
       String fileName = Iterables.getLast(Splitter.on("/").split(fileScanTask.file().path()));
       List<Record> recordsRead = new ArrayList<>();
-      try (ParquetReader<Record> reader = ReadUtils.createReader(fileScanTask, simpleTable)) {
+      try (ParquetReader<Record> reader =
+          ReadUtils.createReader(fileScanTask, simpleTable, simpleTable.schema())) {
         reader.forEach(recordsRead::add);
       }
 
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
index a6b4df69f42b..30425a75b0da 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
@@ -423,6 +423,24 @@ public void testRead() throws Exception {
     pipeline.run().waitUntilFinish();
   }
 
+  @Test
+  public void testReadAndKeepSomeFields() throws Exception {
+    Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA);
+
+    List<Row> expectedRows = populateTable(table);
+
+    List<String> fieldsToKeep = Arrays.asList("row", "str", "modulo_5", "nullable_long");
+    RowFilter rowFilter = new RowFilter(BEAM_SCHEMA).keep(fieldsToKeep);
+
+    Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId()));
+    config.put("keep", fieldsToKeep);
+
+    PCollection<Row> rows =
+        pipeline.apply(Managed.read(ICEBERG).withConfig(config)).getSinglePCollection();
+    PAssert.that(rows).containsInAnyOrder(rowFilter.filter(expectedRows));
+    pipeline.run().waitUntilFinish();
+  }
+
   @Test
   public void testStreamingRead() throws Exception {
     Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA);
@@ -441,6 +459,28 @@ public void testStreamingRead() throws Exception {
     pipeline.run().waitUntilFinish();
   }
 
+  @Test
+  public void testStreamingReadAndDropSomeFields() throws Exception {
+    Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA);
+
+    List<Row> expectedRows = populateTable(table);
+
+    List<String> fieldsToDrop = Arrays.asList("row", "str", "modulo_5", "nullable_long");
+    RowFilter rowFilter = new RowFilter(BEAM_SCHEMA).drop(fieldsToDrop);
+
+    Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId()));
+    config.put("streaming", true);
+    config.put("to_snapshot", table.currentSnapshot().snapshotId());
+    config.put("drop", fieldsToDrop);
+
+    PCollection<Row> rows =
+        pipeline.apply(Managed.read(ICEBERG_CDC).withConfig(config)).getSinglePCollection();
+
+    assertThat(rows.isBounded(), equalTo(UNBOUNDED));
+    PAssert.that(rows).containsInAnyOrder(rowFilter.filter(expectedRows));
+    pipeline.run().waitUntilFinish();
+  }
+
   @Test
   public void testBatchReadBetweenSnapshots() throws Exception {
     runReadBetween(true, false);
   }
diff --git a/website/www/site/content/en/documentation/io/managed-io.md b/website/www/site/content/en/documentation/io/managed-io.md
index 9d4b98b82570..9ccb81e325e4 100644
--- a/website/www/site/content/en/documentation/io/managed-io.md
+++ b/website/www/site/content/en/documentation/io/managed-io.md
@@ -50,8 +50,10 @@ manual updates or user intervention required!)
       catalog_name (str)<br>
      catalog_properties (map[str, str])<br>
      config_properties (map[str, str])<br>
+     drop (list[str])<br>
      from_snapshot (int64)<br>
      from_timestamp (int64)<br>
+     keep (list[str])<br>
      poll_interval_seconds (int32)<br>
      starting_strategy (str)<br>
      streaming (boolean)<br>
@@ -69,6 +71,8 @@ manual updates or user intervention required!)
      catalog_name (str)<br>
      catalog_properties (map[str, str])<br>
      config_properties (map[str, str])<br>
+     drop (list[str])<br>
+     keep (list[str])<br>
      table (str)<br>
@@ -180,6 +184,17 @@ manual updates or user intervention required!)
      Properties passed to the Hadoop Configuration.
+
+
+      drop
+
+
+      list[str]
+
+
+      A subset of column names to exclude from reading. If null or empty, all columns will be read.
+
+
      from_snapshot
@@ -202,6 +217,17 @@ manual updates or user intervention required!)
      Starts reading from the first snapshot (inclusive) that was created after this timestamp (in milliseconds).
+
+
+      keep
+
+
+      list[str]
+
+
+      A subset of column names to read exclusively. If null or empty, all columns will be read.
+
+
      poll_interval_seconds
@@ -413,6 +439,28 @@ manual updates or user intervention required!)
      Properties passed to the Hadoop Configuration.
+
+
+      drop
+
+
+      list[str]
+
+
+      A subset of column names to exclude from reading. If null or empty, all columns will be read.
+
+
+
+
+      keep
+
+
+      list[str]
+
+
+      A subset of column names to read exclusively. If null or empty, all columns will be read.
+
+
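
---

For reference, a minimal usage sketch of the `keeping()`/`dropping()` options that this patch adds to `IcebergIO.ReadRows`. It is not part of the diff; the catalog name, warehouse path, table name, and column names are placeholder assumptions, and the catalog builder calls mirror how the tests in this patch construct an `IcebergCatalogConfig`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
import org.apache.beam.sdk.io.iceberg.IcebergIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.catalog.TableIdentifier;

public class ColumnProjectionSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // Placeholder catalog configuration (a local Hadoop catalog is assumed here).
    Map<String, String> catalogProps = new HashMap<>();
    catalogProps.put("type", "hadoop");
    catalogProps.put("warehouse", "file:///tmp/warehouse");
    IcebergCatalogConfig catalog =
        IcebergCatalogConfig.builder()
            .setCatalogName("my_catalog")
            .setCatalogProperties(catalogProps)
            .build();

    // Read only "id" and "name"; the source projects the Iceberg scan to these columns,
    // so dropped columns are never materialized into Beam Rows.
    PCollection<Row> kept =
        pipeline.apply(
            "keep",
            IcebergIO.readRows(catalog)
                .from(TableIdentifier.parse("db.my_table"))
                .keeping(Arrays.asList("id", "name")));

    // Alternatively, read everything except "metadata". Per the validation added in
    // IcebergScanConfig, only one of keep/drop may be set on a single read.
    PCollection<Row> dropped =
        pipeline.apply(
            "drop",
            IcebergIO.readRows(catalog)
                .from(TableIdentifier.parse("db.my_table"))
                .dropping(Arrays.asList("metadata")));

    pipeline.run();
  }
}
```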
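The same projection is exposed through Managed I/O via the new `keep`/`drop` config entries documented in managed-io.md. Below is a hedged sketch of that path, mirroring the integration tests in this patch; the table, catalog, and column names are placeholders.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class ManagedIcebergProjectionSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    Map<String, Object> config = new HashMap<>();
    config.put("table", "db.my_table");           // placeholder table
    config.put("catalog_name", "my_catalog");     // placeholder catalog
    Map<String, String> catalogProps = new HashMap<>();
    catalogProps.put("type", "hadoop");
    catalogProps.put("warehouse", "file:///tmp/warehouse");
    config.put("catalog_properties", catalogProps);

    // Only one of "keep" or "drop" may be set; unknown column names are rejected
    // when the source configuration is validated.
    config.put("keep", Arrays.asList("id", "name"));

    PCollection<Row> rows =
        pipeline.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection();

    pipeline.run();
  }
}
```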