From 3a8d185914ca718b1310f1dd7a79b16ab9433c12 Mon Sep 17 00:00:00 2001 From: regadas Date: Wed, 15 Jan 2025 06:39:40 -0500 Subject: [PATCH 1/2] Reapply "Add support for Iceberg table identifiers with special characters (#33293)" (#33575) This reverts commit bb2e0ad37bf521aebaf4839cf989855ae062f2c9. --- .../IO_Iceberg_Integration_Tests.json | 2 +- sdks/java/io/iceberg/build.gradle | 4 +- .../sdk/io/iceberg/AppendFilesToTables.java | 3 +- .../beam/sdk/io/iceberg/FileWriteResult.java | 5 ++- .../IcebergReadSchemaTransformProvider.java | 3 +- .../sdk/io/iceberg/IcebergScanConfig.java | 7 +++- .../beam/sdk/io/iceberg/IcebergUtils.java | 17 ++++++++ .../iceberg/OneTableDynamicDestinations.java | 4 +- .../iceberg/PortableIcebergDestinations.java | 3 +- .../sdk/io/iceberg/IcebergIOReadTest.java | 24 +++++++++-- .../beam/sdk/io/iceberg/IcebergUtilsTest.java | 40 +++++++++++++++++++ 11 files changed, 94 insertions(+), 18 deletions(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 7ab7bcd9a9c6..b73af5e61a43 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 2 + "modification": 1 } diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index 41e12921e6f8..1775dfc5b77b 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -55,8 +55,10 @@ dependencies { implementation "org.apache.iceberg:iceberg-api:$iceberg_version" implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version" implementation "org.apache.iceberg:iceberg-orc:$iceberg_version" - implementation library.java.hadoop_common runtimeOnly "org.apache.iceberg:iceberg-gcp:$iceberg_version" + implementation library.java.hadoop_common + implementation library.java.jackson_core + implementation library.java.jackson_databind testImplementation project(":sdks:java:managed") testImplementation library.java.hadoop_client diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java index fed72a381d5e..72220faf3004 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java @@ -47,7 +47,6 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.slf4j.Logger; @@ -133,7 +132,7 @@ public void processElement( return; } - Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey())); + Table table = getCatalog().loadTable(IcebergUtils.parseTableIdentifier(element.getKey())); // vast majority of the time, we will simply append data files. 
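      // Editor's sketch (illustrative only, not part of this patch): the new
      // IcebergUtils.parseTableIdentifier, added below in this commit, accepts both
      // identifier forms, so these two calls resolve to the same table:
      //   IcebergUtils.parseTableIdentifier("{\"namespace\": [\"default\"], \"name\": \"table\"}");
      //   IcebergUtils.parseTableIdentifier("default.table");
      // Only the JSON form can carry special characters such as '.' inside a name.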
// in the rare case we get a batch that contains multiple partition specs, we will group diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java index bf00bf8519fc..d58ac8696d37 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java @@ -25,6 +25,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.catalog.TableIdentifierParser; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @AutoValue @@ -41,7 +42,7 @@ abstract class FileWriteResult { @SchemaIgnore public TableIdentifier getTableIdentifier() { if (cachedTableIdentifier == null) { - cachedTableIdentifier = TableIdentifier.parse(getTableIdentifierString()); + cachedTableIdentifier = IcebergUtils.parseTableIdentifier(getTableIdentifierString()); } return cachedTableIdentifier; } @@ -67,7 +68,7 @@ abstract static class Builder { @SchemaIgnore public Builder setTableIdentifier(TableIdentifier tableId) { - return setTableIdentifierString(tableId.toString()); + return setTableIdentifierString(TableIdentifierParser.toJson(tableId)); } public abstract FileWriteResult build(); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java index d44149fda08e..951442e2c95f 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java @@ -31,7 +31,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; -import org.apache.iceberg.catalog.TableIdentifier; /** * SchemaTransform implementation for {@link IcebergIO#readRows}. 
Reads records from Iceberg and @@ -86,7 +85,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { .getPipeline() .apply( IcebergIO.readRows(configuration.getIcebergCatalog()) - .from(TableIdentifier.parse(configuration.getTable()))); + .from(IcebergUtils.parseTableIdentifier(configuration.getTable()))); return PCollectionRowTuple.of(OUTPUT_TAG, output); } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java index 60372b172af7..640283d83c2e 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java @@ -23,6 +23,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.catalog.TableIdentifierParser; import org.apache.iceberg.expressions.Expression; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; @@ -51,7 +52,9 @@ public enum ScanType { public Table getTable() { if (cachedTable == null) { cachedTable = - getCatalogConfig().catalog().loadTable(TableIdentifier.parse(getTableIdentifier())); + getCatalogConfig() + .catalog() + .loadTable(IcebergUtils.parseTableIdentifier(getTableIdentifier())); } return cachedTable; } @@ -126,7 +129,7 @@ public abstract static class Builder { public abstract Builder setTableIdentifier(String tableIdentifier); public Builder setTableIdentifier(TableIdentifier tableIdentifier) { - return this.setTableIdentifier(tableIdentifier.toString()); + return this.setTableIdentifier(TableIdentifierParser.toJson(tableIdentifier)); } public Builder setTableIdentifier(String... names) { diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index ef19a5881366..bd2f743172dc 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -19,6 +19,9 @@ import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import java.nio.ByteBuffer; import java.time.LocalDate; import java.time.LocalDateTime; @@ -36,6 +39,8 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.catalog.TableIdentifierParser; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Type; @@ -47,6 +52,9 @@ /** Utilities for converting between Beam and Iceberg types, made public for user's convenience. 
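 * <p>{@code parseTableIdentifier} first tries the Iceberg JSON representation of a table
 * identifier and falls back to {@code TableIdentifier.parse} for plain dot-separated names.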
*/ public class IcebergUtils { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private IcebergUtils() {} private static final Map BEAM_TYPES_TO_ICEBERG_TYPES = @@ -506,4 +514,13 @@ private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType // LocalDateTime, LocalDate, LocalTime return icebergValue; } + + public static TableIdentifier parseTableIdentifier(String table) { + try { + JsonNode jsonNode = OBJECT_MAPPER.readTree(table); + return TableIdentifierParser.fromJson(jsonNode); + } catch (JsonProcessingException e) { + return TableIdentifier.parse(table); + } + } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java index 861a8ad198a8..be810aa20a13 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java @@ -41,7 +41,7 @@ class OneTableDynamicDestinations implements DynamicDestinations, Externalizable @VisibleForTesting TableIdentifier getTableIdentifier() { if (tableId == null) { - tableId = TableIdentifier.parse(checkStateNotNull(tableIdString)); + tableId = IcebergUtils.parseTableIdentifier(checkStateNotNull(tableIdString)); } return tableId; } @@ -86,6 +86,6 @@ public void writeExternal(ObjectOutput out) throws IOException { @Override public void readExternal(ObjectInput in) throws IOException { tableIdString = in.readUTF(); - tableId = TableIdentifier.parse(tableIdString); + tableId = IcebergUtils.parseTableIdentifier(tableIdString); } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java index 47f661bba3f8..58f70463bc76 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java @@ -24,7 +24,6 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.catalog.TableIdentifier; import org.checkerframework.checker.nullness.qual.Nullable; class PortableIcebergDestinations implements DynamicDestinations { @@ -73,7 +72,7 @@ public String getTableStringIdentifier(ValueInSingleWindow element) { @Override public IcebergDestination instantiateDestination(String dest) { return IcebergDestination.builder() - .setTableIdentifier(TableIdentifier.parse(dest)) + .setTableIdentifier(IcebergUtils.parseTableIdentifier(dest)) .setTableCreateConfig(null) .setFileFormat(FileFormat.fromString(fileFormat)) .build(); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java index 0406ff31e61e..1c3f9b53f31a 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java @@ -23,6 +23,8 @@ import java.io.File; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.UUID; @@ -68,11 
+70,11 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@RunWith(JUnit4.class) +@RunWith(Parameterized.class) public class IcebergIOReadTest { private static final Logger LOG = LoggerFactory.getLogger(IcebergIOReadTest.class); @@ -83,6 +85,21 @@ public class IcebergIOReadTest { @Rule public TestPipeline testPipeline = TestPipeline.create(); + @Parameterized.Parameters + public static Collection data() { + return Arrays.asList( + new Object[][] { + {String.format("{\"namespace\": [\"default\"], \"name\": \"%s\"}", tableId())}, + {String.format("default.%s", tableId())}, + }); + } + + public static String tableId() { + return "table" + Long.toString(UUID.randomUUID().hashCode(), 16); + } + + @Parameterized.Parameter public String tableStringIdentifier; + static class PrintRow extends DoFn { @ProcessElement @@ -94,8 +111,7 @@ public void process(@Element Row row, OutputReceiver output) throws Excepti @Test public void testSimpleScan() throws Exception { - TableIdentifier tableId = - TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16)); + TableIdentifier tableId = IcebergUtils.parseTableIdentifier(tableStringIdentifier); Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA); final Schema schema = IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java index 134f05c34bfb..918c6b1146ee 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java @@ -19,11 +19,13 @@ import static org.apache.beam.sdk.io.iceberg.IcebergUtils.TypeAndMaxId; import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamFieldTypeToIcebergFieldType; +import static org.apache.beam.sdk.io.iceberg.IcebergUtils.parseTableIdentifier; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import java.math.BigDecimal; @@ -32,6 +34,7 @@ import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; import org.apache.beam.sdk.schemas.Schema; @@ -49,6 +52,7 @@ import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; /** Test class for {@link IcebergUtils}. 
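 * Includes parameterized {@code TableIdentifierParseTests} covering the JSON form, the
 * dot-separated form, and a malformed JSON input that is expected to fail.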
*/ @RunWith(Enclosed.class) @@ -802,4 +806,40 @@ public void testStructIcebergSchemaToBeamSchema() { assertEquals(BEAM_SCHEMA_STRUCT, convertedBeamSchema); } } + + @RunWith(Parameterized.class) + public static class TableIdentifierParseTests { + + @Parameterized.Parameters + public static Collection data() { + return Arrays.asList( + new Object[][] { + { + "{\"namespace\": [\"dogs\", \"owners.and.handlers\"], \"name\": \"food\"}", + "dogs.owners.and.handlers.food", + true + }, + {"dogs.owners.and.handlers.food", "dogs.owners.and.handlers.food", true}, + {"{\"name\": \"food\"}", "food", true}, + {"{\"table_name\": \"food\"}", "{\"table_name\": \"food\"}", false}, + }); + } + + @Parameterized.Parameter public String input; + + @Parameterized.Parameter(1) + public String expected; + + @Parameterized.Parameter(2) + public boolean shouldSucceed; + + @Test + public void test() { + if (shouldSucceed) { + assertEquals(expected, parseTableIdentifier(input).toString()); + } else { + assertThrows(IllegalArgumentException.class, () -> parseTableIdentifier(input)); + } + } + } } From b526c901d638399934d4f972b59b885646d492ae Mon Sep 17 00:00:00 2001 From: regadas Date: Thu, 16 Jan 2025 15:58:28 -0500 Subject: [PATCH 2/2] Avoid TableIdentifier json parsing --- .../sdk/io/iceberg/AppendFilesToTables.java | 30 +++--- .../sdk/io/iceberg/AssignDestinations.java | 22 +++-- .../sdk/io/iceberg/DynamicDestinations.java | 4 +- .../beam/sdk/io/iceberg/FileWriteResult.java | 23 ++++- .../sdk/io/iceberg/IcebergScanConfig.java | 17 ++-- .../sdk/io/iceberg/IcebergWriteResult.java | 7 +- .../IcebergWriteSchemaTransformProvider.java | 18 ++-- .../iceberg/OneTableDynamicDestinations.java | 20 ++-- .../iceberg/PortableIcebergDestinations.java | 17 ++-- .../iceberg/SerializableTableIdentifier.java | 93 +++++++++++++++++++ .../sdk/io/iceberg/TableIdentifierCoder.java | 58 ++++++++++++ .../TableIdentifierRowInterpolator.java | 50 ++++++++++ .../io/iceberg/WriteGroupedRowsToFiles.java | 11 ++- .../sdk/io/iceberg/WriteToDestinations.java | 20 ++-- .../io/iceberg/WriteUngroupedRowsToFiles.java | 24 ++--- .../sdk/io/iceberg/IcebergIOWriteTest.java | 22 +++-- ...ebergWriteSchemaTransformProviderTest.java | 68 ++++++++------ 17 files changed, 380 insertions(+), 124 deletions(-) create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableTableIdentifier.java create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableIdentifierCoder.java create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableIdentifierRowInterpolator.java diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java index 72220faf3004..96f001c4f38e 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.UUID; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Metrics; @@ -47,13 +46,15 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; import 
org.apache.iceberg.io.FileIO; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; class AppendFilesToTables - extends PTransform, PCollection>> { + extends PTransform< + PCollection, PCollection>> { private static final Logger LOG = LoggerFactory.getLogger(AppendFilesToTables.class); private final IcebergCatalogConfig catalogConfig; private final String manifestFilePrefix; @@ -64,28 +65,31 @@ class AppendFilesToTables } @Override - public PCollection> expand(PCollection writtenFiles) { + public PCollection> expand( + PCollection writtenFiles) { // Apply any sharded writes and flatten everything for catalog updates return writtenFiles .apply( "Key metadata updates by table", WithKeys.of( - new SerializableFunction() { + new SerializableFunction() { @Override - public String apply(FileWriteResult input) { - return input.getTableIdentifier().toString(); + public TableIdentifier apply(FileWriteResult input) { + return input.getTableIdentifier(); } })) + .setCoder(KvCoder.of(TableIdentifierCoder.of(), FileWriteResult.CODER)) .apply("Group metadata updates by table", GroupByKey.create()) .apply( "Append metadata updates to tables", ParDo.of(new AppendFilesToTablesDoFn(catalogConfig, manifestFilePrefix))) - .setCoder(KvCoder.of(StringUtf8Coder.of(), SnapshotInfo.CODER)); + .setCoder(KvCoder.of(TableIdentifierCoder.of(), SnapshotInfo.CODER)); } private static class AppendFilesToTablesDoFn - extends DoFn>, KV> { + extends DoFn< + KV>, KV> { private final Counter snapshotsCreated = Metrics.counter(AppendFilesToTables.class, "snapshotsCreated"); private final Distribution committedDataFileByteSize = @@ -122,17 +126,17 @@ private boolean containsMultiplePartitionSpecs(Iterable fileWri @ProcessElement public void processElement( - @Element KV> element, - OutputReceiver> out, + @Element KV> element, + OutputReceiver> out, BoundedWindow window) throws IOException { - String tableStringIdentifier = element.getKey(); + TableIdentifier tableIdentifier = element.getKey(); Iterable fileWriteResults = element.getValue(); if (!fileWriteResults.iterator().hasNext()) { return; } - Table table = getCatalog().loadTable(IcebergUtils.parseTableIdentifier(element.getKey())); + Table table = getCatalog().loadTable(tableIdentifier); // vast majority of the time, we will simply append data files. 
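      // Editor's sketch (illustrative only, not part of this patch): with the new
      // TableIdentifierCoder, TableIdentifier is used directly as the grouping key above.
      // A structural round trip, using Beam's CoderUtils helpers:
      //   TableIdentifier id =
      //       TableIdentifier.of(Namespace.of("dogs", "owners.and.handlers"), "food");
      //   byte[] bytes = CoderUtils.encodeToByteArray(TableIdentifierCoder.of(), id);
      //   TableIdentifier back = CoderUtils.decodeFromByteArray(TableIdentifierCoder.of(), bytes);
      //   // back.equals(id) -- namespace levels are encoded as a list, so '.' survives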
// in the rare case we get a batch that contains multiple partition specs, we will group @@ -145,7 +149,7 @@ public void processElement( } Snapshot snapshot = table.currentSnapshot(); - LOG.info("Created new snapshot for table '{}': {}", tableStringIdentifier, snapshot); + LOG.info("Created new snapshot for table '{}': {}", tableIdentifier, snapshot); snapshotsCreated.inc(); out.outputWithTimestamp( KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp()); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinations.java index 9aba3d830234..704255a2eeb8 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinations.java @@ -19,7 +19,6 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.RowCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -29,6 +28,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.apache.iceberg.catalog.TableIdentifier; import org.joda.time.Instant; /** @@ -37,7 +37,8 @@ *
<p>
The output record will have the format { dest: ..., data: ...} where the dest field has the * assigned metadata and the data field has the original row. */ -class AssignDestinations extends PTransform, PCollection>> { +class AssignDestinations + extends PTransform, PCollection>> { private final DynamicDestinations dynamicDestinations; @@ -46,27 +47,30 @@ public AssignDestinations(DynamicDestinations dynamicDestinations) { } @Override - public PCollection> expand(PCollection input) { + public PCollection> expand(PCollection input) { return input .apply( ParDo.of( - new DoFn>() { + new DoFn>() { @ProcessElement public void processElement( @Element Row element, BoundedWindow window, PaneInfo paneInfo, @Timestamp Instant timestamp, - OutputReceiver> out) { - String tableIdentifier = - dynamicDestinations.getTableStringIdentifier( - ValueInSingleWindow.of(element, timestamp, window, paneInfo)); + OutputReceiver> out) { + TableIdentifier tableIdentifier = + dynamicDestinations + .getTableIdentifier( + ValueInSingleWindow.of(element, timestamp, window, paneInfo)) + .toTableIdentifier(); Row data = dynamicDestinations.getData(element); out.output(KV.of(tableIdentifier, data)); } })) .setCoder( - KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema()))); + KvCoder.of( + TableIdentifierCoder.of(), RowCoder.of(dynamicDestinations.getDataSchema()))); } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/DynamicDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/DynamicDestinations.java index 0185758c8aeb..e55de38d9622 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/DynamicDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/DynamicDestinations.java @@ -29,9 +29,9 @@ public interface DynamicDestinations extends Serializable { Row getData(Row element); - IcebergDestination instantiateDestination(String destination); + IcebergDestination instantiateDestination(TableIdentifier destination); - String getTableStringIdentifier(ValueInSingleWindow element); + SerializableTableIdentifier getTableIdentifier(ValueInSingleWindow element); static DynamicDestinations singleTable(TableIdentifier tableId, Schema inputSchema) { return new OneTableDynamicDestinations(tableId, inputSchema); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java index d58ac8696d37..cf1788c93223 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java @@ -20,29 +20,42 @@ import com.google.auto.value.AutoValue; import java.util.Map; import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.annotations.SchemaIgnore; import org.apache.iceberg.DataFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.catalog.TableIdentifierParser; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @AutoValue @DefaultSchema(AutoValueSchema.class) abstract class FileWriteResult { + public static final 
SchemaCoder CODER; + + static { + try { + SchemaRegistry registry = SchemaRegistry.createDefault(); + CODER = registry.getSchemaCoder(FileWriteResult.class); + + } catch (NoSuchSchemaException e) { + throw new RuntimeException(e); + } + } private transient @MonotonicNonNull TableIdentifier cachedTableIdentifier; private transient @MonotonicNonNull DataFile cachedDataFile; - abstract String getTableIdentifierString(); + abstract SerializableTableIdentifier getSerializableTableIdentifier(); abstract SerializableDataFile getSerializableDataFile(); @SchemaIgnore public TableIdentifier getTableIdentifier() { if (cachedTableIdentifier == null) { - cachedTableIdentifier = IcebergUtils.parseTableIdentifier(getTableIdentifierString()); + cachedTableIdentifier = getSerializableTableIdentifier().toTableIdentifier(); } return cachedTableIdentifier; } @@ -62,13 +75,13 @@ public static Builder builder() { @AutoValue.Builder abstract static class Builder { - abstract Builder setTableIdentifierString(String tableIdString); + abstract Builder setSerializableTableIdentifier(SerializableTableIdentifier tableId); abstract Builder setSerializableDataFile(SerializableDataFile dataFile); @SchemaIgnore public Builder setTableIdentifier(TableIdentifier tableId) { - return setTableIdentifierString(TableIdentifierParser.toJson(tableId)); + return setSerializableTableIdentifier(SerializableTableIdentifier.of(tableId)); } public abstract FileWriteResult build(); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java index 640283d83c2e..9dd9afa66062 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java @@ -23,7 +23,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.catalog.TableIdentifierParser; import org.apache.iceberg.expressions.Expression; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; @@ -46,15 +45,17 @@ public enum ScanType { public abstract IcebergCatalogConfig getCatalogConfig(); @Pure - public abstract String getTableIdentifier(); + abstract SerializableTableIdentifier getSerializableTableIdentifier(); + + @Pure + public TableIdentifier getTableIdentifier() { + return getSerializableTableIdentifier().toTableIdentifier(); + } @Pure public Table getTable() { if (cachedTable == null) { - cachedTable = - getCatalogConfig() - .catalog() - .loadTable(IcebergUtils.parseTableIdentifier(getTableIdentifier())); + cachedTable = getCatalogConfig().catalog().loadTable(getTableIdentifier()); } return cachedTable; } @@ -126,10 +127,10 @@ public abstract static class Builder { public abstract Builder setCatalogConfig(IcebergCatalogConfig catalog); - public abstract Builder setTableIdentifier(String tableIdentifier); + abstract Builder setSerializableTableIdentifier(SerializableTableIdentifier tableIdentifier); public Builder setTableIdentifier(TableIdentifier tableIdentifier) { - return this.setTableIdentifier(TableIdentifierParser.toJson(tableIdentifier)); + return this.setSerializableTableIdentifier(SerializableTableIdentifier.of(tableIdentifier)); } public Builder setTableIdentifier(String... 
names) { diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteResult.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteResult.java index 8e2549b5dadb..1dd00a91e07d 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteResult.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteResult.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.catalog.TableIdentifier; public final class IcebergWriteResult implements POutput { @@ -35,13 +36,13 @@ public final class IcebergWriteResult implements POutput { private final Pipeline pipeline; - private final PCollection> snapshots; + private final PCollection> snapshots; - public PCollection> getSnapshots() { + public PCollection> getSnapshots() { return snapshots; } - IcebergWriteResult(Pipeline pipeline, PCollection> snapshots) { + IcebergWriteResult(Pipeline pipeline, PCollection> snapshots) { this.pipeline = pipeline; this.snapshots = snapshots; } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java index 6aa830e7fbc6..862db4e9f056 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.catalog.TableIdentifier; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -58,13 +59,16 @@ public class IcebergWriteSchemaTransformProvider static final String SNAPSHOTS_TAG = "snapshots"; static final Schema OUTPUT_SCHEMA = - Schema.builder().addStringField("table").addFields(SnapshotInfo.SCHEMA.getFields()).build(); + Schema.builder() + .addRowField("table", SerializableTableIdentifier.SCHEMA) + .addFields(SnapshotInfo.SCHEMA.getFields()) + .build(); @Override public String description() { return "Writes Beam Rows to Iceberg.\n" + "Returns a PCollection representing the snapshots produced in the process, with the following schema:\n" - + "{\"table\" (str), \"operation\" (str), \"summary\" (map[str, str]), \"manifestListLocation\" (str)}"; + + "{\"table\" {\"namespace\" (list[str]), \"tableName\" (str)}, \"operation\" (str), \"summary\" (map[str, str]), \"manifestListLocation\" (str)}"; } @DefaultSchema(AutoValueSchema.class) @@ -184,7 +188,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { IcebergIO.writeRows(configuration.getIcebergCatalog()) .to( new PortableIcebergDestinations( - configuration.getTable(), + IcebergUtils.parseTableIdentifier(configuration.getTable()), FileFormat.PARQUET.toString(), rows.getSchema(), configuration.getDrop(), @@ -209,13 +213,15 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } @VisibleForTesting - static class SnapshotToRow extends SimpleFunction, Row> { + static class SnapshotToRow extends SimpleFunction, Row> { @Override - public Row apply(KV input) { + public Row apply(KV input) 
{ + SerializableTableIdentifier tableIdentifier = + SerializableTableIdentifier.of(input.getKey()); SnapshotInfo snapshot = input.getValue(); return Row.withSchema(OUTPUT_SCHEMA) - .addValue(input.getKey()) + .addValue(tableIdentifier.toRow()) .addValues(snapshot.toRow().getValues()) .build(); } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java index be810aa20a13..86245a0432f9 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java @@ -33,7 +33,7 @@ class OneTableDynamicDestinations implements DynamicDestinations, Externalizable { // TableId represented as String for serializability - private transient @MonotonicNonNull String tableIdString; + private transient @MonotonicNonNull SerializableTableIdentifier tableIdString; private transient @MonotonicNonNull TableIdentifier tableId; private transient @MonotonicNonNull Schema rowSchema; @@ -41,13 +41,13 @@ class OneTableDynamicDestinations implements DynamicDestinations, Externalizable @VisibleForTesting TableIdentifier getTableIdentifier() { if (tableId == null) { - tableId = IcebergUtils.parseTableIdentifier(checkStateNotNull(tableIdString)); + tableId = checkStateNotNull(tableIdString).toTableIdentifier(); } return tableId; } OneTableDynamicDestinations(TableIdentifier tableId, Schema rowSchema) { - this.tableIdString = tableId.toString(); + this.tableIdString = SerializableTableIdentifier.of(tableId); this.rowSchema = rowSchema; } @@ -62,12 +62,12 @@ public Row getData(Row element) { } @Override - public String getTableStringIdentifier(ValueInSingleWindow element) { + public SerializableTableIdentifier getTableIdentifier(ValueInSingleWindow element) { return checkStateNotNull(tableIdString); } @Override - public IcebergDestination instantiateDestination(String unused) { + public IcebergDestination instantiateDestination(TableIdentifier unused) { return IcebergDestination.builder() .setTableIdentifier(getTableIdentifier()) .setTableCreateConfig(null) @@ -80,12 +80,16 @@ public OneTableDynamicDestinations() {} @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeUTF(checkStateNotNull(tableIdString)); + out.writeObject(checkStateNotNull(tableIdString)); } @Override public void readExternal(ObjectInput in) throws IOException { - tableIdString = in.readUTF(); - tableId = IcebergUtils.parseTableIdentifier(tableIdString); + try { + tableIdString = (SerializableTableIdentifier) in.readObject(); + tableId = tableIdString.toTableIdentifier(); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java index 58f70463bc76..502fd013c56d 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java @@ -20,25 +20,25 @@ import java.util.List; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.util.RowFilter; -import org.apache.beam.sdk.util.RowStringInterpolator; import org.apache.beam.sdk.values.Row; import 
org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.catalog.TableIdentifier; import org.checkerframework.checker.nullness.qual.Nullable; class PortableIcebergDestinations implements DynamicDestinations { private final RowFilter rowFilter; - private final RowStringInterpolator interpolator; + private final TableIdentifierRowInterpolator interpolator; private final String fileFormat; public PortableIcebergDestinations( - String destinationTemplate, + TableIdentifier destinationTemplate, String fileFormat, Schema inputSchema, @Nullable List fieldsToDrop, @Nullable List fieldsToKeep, @Nullable String onlyField) { - interpolator = new RowStringInterpolator(destinationTemplate, inputSchema); + interpolator = new TableIdentifierRowInterpolator(destinationTemplate, inputSchema); RowFilter rf = new RowFilter(inputSchema); if (fieldsToDrop != null) { @@ -65,14 +65,15 @@ public Row getData(Row element) { } @Override - public String getTableStringIdentifier(ValueInSingleWindow element) { - return interpolator.interpolate(element); + public SerializableTableIdentifier getTableIdentifier(ValueInSingleWindow element) { + TableIdentifier tableIdentifier = interpolator.interpolate(element); + return SerializableTableIdentifier.of(tableIdentifier); } @Override - public IcebergDestination instantiateDestination(String dest) { + public IcebergDestination instantiateDestination(TableIdentifier dest) { return IcebergDestination.builder() - .setTableIdentifier(IcebergUtils.parseTableIdentifier(dest)) + .setTableIdentifier(dest) .setTableCreateConfig(null) .setFileFormat(FileFormat.fromString(fileFormat)) .build(); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableTableIdentifier.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableTableIdentifier.java new file mode 100644 index 000000000000..c3d257bee392 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableTableIdentifier.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaIgnore; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; + +@AutoValue +@DefaultSchema(AutoValueSchema.class) +abstract class SerializableTableIdentifier implements Serializable { + private static final SerializableFunction TO_ROW_FUNCTION; + private static final SerializableFunction FROM_ROW_FUNCTION; + static final Schema SCHEMA; + + static { + try { + SchemaRegistry registry = SchemaRegistry.createDefault(); + SCHEMA = registry.getSchema(SerializableTableIdentifier.class); + TO_ROW_FUNCTION = registry.getToRowFunction(SerializableTableIdentifier.class); + FROM_ROW_FUNCTION = registry.getFromRowFunction(SerializableTableIdentifier.class); + } catch (NoSuchSchemaException e) { + throw new RuntimeException(e); + } + } + + Row toRow() { + return TO_ROW_FUNCTION.apply(this); + } + + static SerializableTableIdentifier fromRow(Row row) { + return FROM_ROW_FUNCTION.apply(row); + } + + abstract List getNamespace(); + + abstract String getTableName(); + + static SerializableTableIdentifier of(TableIdentifier tableIdentifier) { + List namespace = Arrays.asList(tableIdentifier.namespace().levels()); + String tableName = tableIdentifier.name(); + + return SerializableTableIdentifier.builder() + .setNamespace(namespace) + .setTableName(tableName) + .build(); + } + + @SchemaIgnore + TableIdentifier toTableIdentifier() { + String[] levels = getNamespace().toArray(new String[0]); + return TableIdentifier.of(Namespace.of(levels), getTableName()); + } + + public static Builder builder() { + return new AutoValue_SerializableTableIdentifier.Builder(); + } + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setNamespace(List namespace); + + abstract Builder setTableName(String tableName); + + abstract SerializableTableIdentifier build(); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableIdentifierCoder.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableIdentifierCoder.java new file mode 100644 index 000000000000..0f7a710f8d37 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableIdentifierCoder.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; + +public final class TableIdentifierCoder extends AtomicCoder { + private static final StringUtf8Coder NAME_CODER = StringUtf8Coder.of(); + private static final ListCoder NAMESPACE_CODER = ListCoder.of(StringUtf8Coder.of()); + private static final TableIdentifierCoder INSTANCE = new TableIdentifierCoder(); + + public static TableIdentifierCoder of() { + return INSTANCE; + } + + @Override + public void encode(TableIdentifier value, OutputStream outStream) throws IOException { + NAMESPACE_CODER.encode(Arrays.asList(value.namespace().levels()), outStream); + NAME_CODER.encode(value.name(), outStream); + } + + @Override + public TableIdentifier decode(InputStream inStream) throws IOException { + List levels = NAMESPACE_CODER.decode(inStream); + Namespace namespace = Namespace.of(levels.toArray(new String[0])); + String name = NAME_CODER.decode(inStream); + return TableIdentifier.of(namespace, name); + } + + @Override + public boolean consistentWithEquals() { + return true; + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableIdentifierRowInterpolator.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableIdentifierRowInterpolator.java new file mode 100644 index 000000000000..be7144083b8d --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableIdentifierRowInterpolator.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.util.RowStringInterpolator; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; + +public class TableIdentifierRowInterpolator implements Serializable { + + private final List namespaceInterpolators; + private final RowStringInterpolator nameInterpolator; + + public TableIdentifierRowInterpolator(TableIdentifier tableIdentifier, Schema inputSchema) { + namespaceInterpolators = + Arrays.stream(tableIdentifier.namespace().levels()) + .map(l -> new RowStringInterpolator(l, inputSchema)) + .collect(Collectors.toList()); + nameInterpolator = new RowStringInterpolator(tableIdentifier.name(), inputSchema); + } + + public TableIdentifier interpolate(ValueInSingleWindow element) { + String[] levels = + namespaceInterpolators.stream().map(i -> i.interpolate(element)).toArray(String[]::new); + String name = nameInterpolator.interpolate(element); + return TableIdentifier.of(Namespace.of(levels), name); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java index 6a61aafbe8b9..cf68a0479978 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java @@ -30,11 +30,12 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; class WriteGroupedRowsToFiles extends PTransform< - PCollection, Iterable>>, PCollection> { + PCollection, Iterable>>, PCollection> { private static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 29); // 512mb @@ -53,7 +54,7 @@ class WriteGroupedRowsToFiles @Override public PCollection expand( - PCollection, Iterable>> input) { + PCollection, Iterable>> input) { return input.apply( ParDo.of( new WriteGroupedRowsToFilesDoFn( @@ -61,7 +62,7 @@ public PCollection expand( } private static class WriteGroupedRowsToFilesDoFn - extends DoFn, Iterable>, FileWriteResult> { + extends DoFn, Iterable>, FileWriteResult> { private final DynamicDestinations dynamicDestinations; private final IcebergCatalogConfig catalogConfig; @@ -90,12 +91,12 @@ private org.apache.iceberg.catalog.Catalog getCatalog() { @ProcessElement public void processElement( ProcessContext c, - @Element KV, Iterable> element, + @Element KV, Iterable> element, BoundedWindow window, PaneInfo pane) throws Exception { - String tableIdentifier = element.getKey().getKey(); + TableIdentifier tableIdentifier = element.getKey().getKey(); IcebergDestination destination = dynamicDestinations.instantiateDestination(tableIdentifier); WindowedValue windowedDestination = WindowedValue.of(destination, window.maxTimestamp(), window, pane); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java index 
fb3bf43f3515..7b9443718872 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java @@ -23,7 +23,6 @@ import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.RowCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.GroupIntoBatches; @@ -38,10 +37,12 @@ import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.iceberg.catalog.TableIdentifier; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; -class WriteToDestinations extends PTransform>, IcebergWriteResult> { +class WriteToDestinations + extends PTransform>, IcebergWriteResult> { // Used for auto-sharding in streaming. Limits number of records per batch/file private static final int FILE_TRIGGERING_RECORD_COUNT = 500_000; @@ -65,7 +66,7 @@ class WriteToDestinations extends PTransform>, Icebe } @Override - public IcebergWriteResult expand(PCollection> input) { + public IcebergWriteResult expand(PCollection> input) { // Write records to files PCollection writtenFiles = input.isBounded().equals(PCollection.IsBounded.UNBOUNDED) @@ -73,31 +74,31 @@ public IcebergWriteResult expand(PCollection> input) { : writeUntriggered(input); // Commit files to tables - PCollection> snapshots = + PCollection> snapshots = writtenFiles.apply(new AppendFilesToTables(catalogConfig, filePrefix)); return new IcebergWriteResult(input.getPipeline(), snapshots); } - private PCollection writeTriggered(PCollection> input) { + private PCollection writeTriggered(PCollection> input) { checkArgumentNotNull( triggeringFrequency, "Streaming pipelines must set a triggering frequency."); // Group records into batches to avoid writing thousands of small files - PCollection, Iterable>> groupedRecords = + PCollection, Iterable>> groupedRecords = input .apply("WindowIntoGlobal", Window.into(new GlobalWindows())) // We rely on GroupIntoBatches to group and parallelize records properly, // respecting our thresholds for number of records and bytes per batch. // Each output batch will be written to a file. 
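            // Editor's note (annotation, not part of this patch): after withShardedKey(),
            // keys are ShardedKey<TableIdentifier>, so batching stays per destination table;
            // a batch closes at FILE_TRIGGERING_RECORD_COUNT records,
            // FILE_TRIGGERING_BYTE_COUNT bytes, or the user-supplied triggering frequency,
            // whichever comes first.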
.apply( - GroupIntoBatches.ofSize(FILE_TRIGGERING_RECORD_COUNT) + GroupIntoBatches.ofSize(FILE_TRIGGERING_RECORD_COUNT) .withByteSize(FILE_TRIGGERING_BYTE_COUNT) .withMaxBufferingDuration(checkArgumentNotNull(triggeringFrequency)) .withShardedKey()) .setCoder( KvCoder.of( - org.apache.beam.sdk.util.ShardedKey.Coder.of(StringUtf8Coder.of()), + org.apache.beam.sdk.util.ShardedKey.Coder.of(TableIdentifierCoder.of()), IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema())))); return groupedRecords @@ -115,7 +116,8 @@ private PCollection writeTriggered(PCollection> .discardingFiredPanes()); } - private PCollection writeUntriggered(PCollection> input) { + private PCollection writeUntriggered( + PCollection> input) { Preconditions.checkArgument( triggeringFrequency == null, "Triggering frequency is only applicable for streaming pipelines."); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java index 80bf962283f0..f1fbc74f968b 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java @@ -24,7 +24,6 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.RowCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -47,6 +46,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; @@ -56,7 +56,7 @@ * written via another method. */ class WriteUngroupedRowsToFiles - extends PTransform>, WriteUngroupedRowsToFiles.Result> { + extends PTransform>, WriteUngroupedRowsToFiles.Result> { /** * Maximum number of writers that will be created per bundle. 
Any elements requiring more writers @@ -68,8 +68,8 @@ class WriteUngroupedRowsToFiles private static final TupleTag WRITTEN_FILES_TAG = new TupleTag<>("writtenFiles"); private static final TupleTag WRITTEN_ROWS_TAG = new TupleTag("writtenRows") {}; - private static final TupleTag, Row>> SPILLED_ROWS_TAG = - new TupleTag, Row>>("spilledRows") {}; + private static final TupleTag, Row>> SPILLED_ROWS_TAG = + new TupleTag, Row>>("spilledRows") {}; private final String filePrefix; private final DynamicDestinations dynamicDestinations; @@ -85,7 +85,7 @@ class WriteUngroupedRowsToFiles } @Override - public Result expand(PCollection> input) { + public Result expand(PCollection> input) { PCollectionTuple resultTuple = input.apply( @@ -110,7 +110,7 @@ public Result expand(PCollection> input) { .get(SPILLED_ROWS_TAG) .setCoder( KvCoder.of( - ShardedKey.Coder.of(StringUtf8Coder.of()), + ShardedKey.Coder.of(TableIdentifierCoder.of()), RowCoder.of(dynamicDestinations.getDataSchema())))); } @@ -122,14 +122,14 @@ static class Result implements POutput { private final Pipeline pipeline; private final PCollection writtenRows; - private final PCollection, Row>> spilledRows; + private final PCollection, Row>> spilledRows; private final PCollection writtenFiles; private Result( Pipeline pipeline, PCollection writtenFiles, PCollection writtenRows, - PCollection, Row>> spilledRows) { + PCollection, Row>> spilledRows) { this.pipeline = pipeline; this.writtenFiles = writtenFiles; this.writtenRows = writtenRows; @@ -140,7 +140,7 @@ public PCollection getWrittenRows() { return writtenRows; } - public PCollection, Row>> getSpilledRows() { + public PCollection, Row>> getSpilledRows() { return spilledRows; } @@ -182,7 +182,7 @@ public void finishSpecifyingOutput( * */ private static class WriteUngroupedRowsToFilesDoFn - extends DoFn, FileWriteResult> { + extends DoFn, FileWriteResult> { // When we spill records, shard the output keys to prevent hotspots. 
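  // Editor's sketch (illustrative only, not part of this patch): a spilled element is
  // emitted with a composite key along the lines of
  //   KV.of(ShardedKey.of(destination, shard % SPILLED_RECORD_SHARDING_FACTOR), row)
  // so a single hot table fans out across up to 10 grouping keys.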
private static final int SPILLED_RECORD_SHARDING_FACTOR = 10; @@ -224,12 +224,12 @@ public void startBundle() { @ProcessElement public void processElement( - @Element KV element, + @Element KV element, BoundedWindow window, PaneInfo pane, MultiOutputReceiver out) throws Exception { - String dest = element.getKey(); + TableIdentifier dest = element.getKey(); Row data = element.getValue(); IcebergDestination destination = dynamicDestinations.instantiateDestination(dest); WindowedValue windowedDestination = diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java index 87a543a439ec..1285489119f9 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java @@ -140,15 +140,17 @@ public Row getData(Row element) { } @Override - public String getTableStringIdentifier(ValueInSingleWindow element) { + public SerializableTableIdentifier getTableIdentifier(ValueInSingleWindow element) { long tableNumber = element.getValue().getInt64("id") / 3 + 1; - return String.format("default.table%s-%s", tableNumber, salt); + String tableIdentifierStr = String.format("default.table%s-%s", tableNumber, salt); + TableIdentifier tableIdentifier = TableIdentifier.parse(tableIdentifierStr); + return SerializableTableIdentifier.of(tableIdentifier); } @Override - public IcebergDestination instantiateDestination(String dest) { + public IcebergDestination instantiateDestination(TableIdentifier dest) { return IcebergDestination.builder() - .setTableIdentifier(TableIdentifier.parse(dest)) + .setTableIdentifier(dest) .setFileFormat(FileFormat.PARQUET) .build(); } @@ -240,15 +242,17 @@ public Row getData(Row element) { } @Override - public String getTableStringIdentifier(ValueInSingleWindow element) { + public SerializableTableIdentifier getTableIdentifier(ValueInSingleWindow element) { long tableNumber = element.getValue().getInt64("id") % numDestinations; - return String.format("default.table%s-%s", tableNumber, salt); + String tableIdentifierStr = String.format("default.table%s-%s", tableNumber, salt); + TableIdentifier tableIdentifier = TableIdentifier.parse(tableIdentifierStr); + return SerializableTableIdentifier.of(tableIdentifier); } @Override - public IcebergDestination instantiateDestination(String dest) { + public IcebergDestination instantiateDestination(TableIdentifier dest) { return IcebergDestination.builder() - .setTableIdentifier(TableIdentifier.parse(dest)) + .setTableIdentifier(dest) .setFileFormat(FileFormat.PARQUET) .build(); } @@ -345,7 +349,7 @@ public void testStreamingWrite() { .advanceProcessingTime(Duration.standardSeconds(5)) .advanceWatermarkToInfinity(); - PCollection> output = + PCollection> output = testPipeline .apply("Stream Records", stream) .apply( diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java index 9834547c4741..2912a1e4be9a 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import 
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
index 9834547c4741..2912a1e4be9a 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
@@ -32,6 +32,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.managed.Managed;
 import org.apache.beam.sdk.schemas.Schema;
@@ -53,6 +54,8 @@
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Streams;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
@@ -102,7 +105,8 @@ public void testBuildTransformWithRow() {
 
   @Test
   public void testSimpleAppend() {
-    String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
+    TableIdentifier identifier =
+        TableIdentifier.parse("default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16));
 
     Map<String, String> properties = new HashMap<>();
     properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
@@ -110,7 +114,7 @@ public void testSimpleAppend() {
 
     Configuration config =
         Configuration.builder()
-            .setTable(identifier)
+            .setTable(identifier.toString())
            .setCatalogName("name")
             .setCatalogProperties(properties)
             .build();
@@ -133,8 +137,7 @@ public void testSimpleAppend() {
 
     testPipeline.run().waitUntilFinish();
 
-    TableIdentifier tableId = TableIdentifier.parse(identifier);
-    Table table = warehouse.loadTable(tableId);
+    Table table = warehouse.loadTable(identifier);
 
     List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());
 
@@ -143,7 +146,8 @@ public void testSimpleAppend() {
 
   @Test
   public void testWriteUsingManagedTransform() {
-    String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
+    TableIdentifier identifier =
+        TableIdentifier.parse("default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16));
 
     String yamlConfig =
         String.format(
@@ -167,7 +171,7 @@ public void testWriteUsingManagedTransform() {
 
     testPipeline.run().waitUntilFinish();
 
-    Table table = warehouse.loadTable(TableIdentifier.parse(identifier));
+    Table table = warehouse.loadTable(identifier);
     List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());
     assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray()));
   }
@@ -256,15 +260,20 @@ private void writeToDynamicDestinationsAndFilter(@Nullable String operation, boo
     Instant first = new Instant(0);
     Instant second = first.plus(Duration.standardDays(1));
     Instant third = second.plus(Duration.standardDays(1));
-    String identifier0 =
-        interpolator.interpolate(
-            ValueInSingleWindow.of(rows.get(0), first, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
-    String identifier1 =
-        interpolator.interpolate(
-            ValueInSingleWindow.of(rows.get(1), second, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
-    String identifier2 =
-        interpolator.interpolate(
-            ValueInSingleWindow.of(rows.get(2), third, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+
+    List<Instant> instants = Lists.newArrayList(first, second, third);
+
+    List<TableIdentifier> identifiers =
+        Streams.zip(
+                rows.stream(),
+                instants.stream(),
+                (row, instant) -> {
+                  return ValueInSingleWindow.of(
+                      row, instant, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+                })
+            .map(interpolator::interpolate)
+            .map(TableIdentifier::parse)
+            .collect(Collectors.toList());
 
     org.apache.iceberg.Schema icebergSchema =
         IcebergUtils.beamSchemaToIcebergSchema(filter.outputSchema());
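The hunk above folds three copy-pasted interpolate calls into one positional zip of rows and timestamps. For reference, the general shape of Guava's Streams.zip (plain com.google.common shown here; the test uses Beam's vendored copy):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;
    import com.google.common.collect.Streams;

    public class ZipSketch {
      public static void main(String[] args) {
        List<String> rows = Arrays.asList("r0", "r1", "r2");
        List<Long> instants = Arrays.asList(0L, 86_400_000L, 172_800_000L);

        // Pair elements positionally, then map each pair, mirroring the
        // row/instant pairing in the test.
        List<String> windowed =
            Streams.zip(rows.stream(), instants.stream(), (r, t) -> r + "@" + t)
                .collect(Collectors.toList());

        System.out.println(windowed); // [r0@0, r1@86400000, r2@172800000]
      }
    }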
@@ -301,13 +310,15 @@ private void writeToDynamicDestinationsAndFilter(@Nullable String operation, boo
 
     PAssert.that(result)
         .satisfies(
-            new VerifyOutputs(Arrays.asList(identifier0, identifier1, identifier2), "append"));
+            new VerifyOutputs(
+                Arrays.asList(identifiers.get(0), identifiers.get(1), identifiers.get(2)),
+                "append"));
 
     testPipeline.run().waitUntilFinish();
 
-    Table table0 = warehouse.loadTable(TableIdentifier.parse(identifier0));
-    Table table1 = warehouse.loadTable(TableIdentifier.parse(identifier1));
-    Table table2 = warehouse.loadTable(TableIdentifier.parse(identifier2));
+    Table table0 = warehouse.loadTable(identifiers.get(0));
+    Table table1 = warehouse.loadTable(identifiers.get(1));
+    Table table2 = warehouse.loadTable(identifiers.get(2));
     List<Record> table0Records = ImmutableList.copyOf(IcebergGenerics.read(table0).build());
     List<Record> table1Records = ImmutableList.copyOf(IcebergGenerics.read(table1).build());
     List<Record> table2Records = ImmutableList.copyOf(IcebergGenerics.read(table2).build());
@@ -352,19 +363,21 @@ public void testStreamToDynamicDestinationsAndKeepFields() {
   }
 
   private static class VerifyOutputs implements SerializableFunction<Iterable<Row>, Void> {
-    private final List<String> tableIds;
+    private final List<SerializableTableIdentifier> tableIds;
     private final String operation;
 
-    public VerifyOutputs(List<String> identifier, String operation) {
-      this.tableIds = identifier;
+    public VerifyOutputs(List<TableIdentifier> identifier, String operation) {
+      this.tableIds =
+          identifier.stream().map(SerializableTableIdentifier::of).collect(Collectors.toList());
       this.operation = operation;
     }
 
     @Override
     public Void apply(Iterable<Row> input) {
       Row row = input.iterator().next();
-
-      assertThat(tableIds, Matchers.hasItem(row.getString("table")));
+      TableIdentifier tableIdentifier =
+          SerializableTableIdentifier.fromRow(row.getRow("table")).toTableIdentifier();
+      assertThat(tableIds, Matchers.hasItem(SerializableTableIdentifier.of(tableIdentifier)));
       assertEquals(operation, row.getString("operation"));
       return null;
     }
@@ -405,13 +418,14 @@ public void testWritePartitionedData() {
             .hour("h_datetime")
             .hour("h_datetime_tz")
             .build();
-    String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
+    TableIdentifier identifier =
+        TableIdentifier.parse("default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16));
 
-    warehouse.createTable(TableIdentifier.parse(identifier), icebergSchema, spec);
+    warehouse.createTable(identifier, icebergSchema, spec);
     Map<String, Object> config =
         ImmutableMap.of(
             "table",
-            identifier,
+            identifier.toString(),
             "catalog_properties",
             ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location));
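testWritePartitionedData above creates the table with an explicit PartitionSpec before writing. A condensed sketch of how such a spec is declared against an Iceberg schema; the field ids and names are illustrative, not the test's full schema:

    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    public class PartitionSpecSketch {
      public static void main(String[] args) {
        Schema schema =
            new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.required(2, "h_datetime", Types.TimestampType.withoutZone()));

        // Hour-granularity partitioning on the timestamp column, matching the
        // test's .hour("h_datetime") builder call.
        PartitionSpec spec = PartitionSpec.builderFor(schema).hour("h_datetime").build();
        System.out.println(spec);
      }
    }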