From fd634ab015141bca609476912a13bc1c9fab3865 Mon Sep 17 00:00:00 2001
From: Tarun Annapareddy
Date: Mon, 30 Jun 2025 22:18:39 -0700
Subject: [PATCH 1/4] [IcebergIO] Create tables with Iceberg table properties

---
 .../IO_Iceberg_Integration_Tests.json         |  2 +-
 .../io/iceberg/IcebergTableCreateConfig.java  |  6 +++
 .../IcebergWriteSchemaTransformProvider.java  |  8 +++
 .../iceberg/PortableIcebergDestinations.java  |  5 ++
 .../sdk/io/iceberg/RecordWriterManager.java   |  6 ++-
 ...ebergWriteSchemaTransformProviderTest.java | 52 +++++++++++++++++++
 .../iceberg/catalog/IcebergCatalogBaseIT.java | 28 ++++++++++
 .../content/en/documentation/io/managed-io.md | 15 ++++++
 8 files changed, 120 insertions(+), 2 deletions(-)

diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 37dd25bf9029..5d04b2c0a8c7 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 3
+  "modification": 5
 }

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergTableCreateConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergTableCreateConfig.java
index 9070cdfec1b8..706d9eea4ebb 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergTableCreateConfig.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergTableCreateConfig.java
@@ -19,6 +19,7 @@
 
 import com.google.auto.value.AutoValue;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.iceberg.PartitionSpec;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -40,6 +41,9 @@ public PartitionSpec getPartitionSpec() {
   @Pure
   public abstract @Nullable List<String> getPartitionFields();
 
+  @Pure
+  public abstract @Nullable Map<String, String> getTableProperties();
+
   @Pure
   public static Builder builder() {
     return new AutoValue_IcebergTableCreateConfig.Builder();
@@ -51,6 +55,8 @@ public abstract static class Builder {
 
     public abstract Builder setPartitionFields(@Nullable List<String> partitionFields);
 
+    public abstract Builder setTableProperties(@Nullable Map<String, String> tableProperties);
+
     @Pure
     public abstract IcebergTableCreateConfig build();
   }

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
index 48fcfe536746..fb0d63092964 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
@@ -124,6 +124,11 @@ public static Builder builder() {
             + "For more information on partition transforms, please visit https://iceberg.apache.org/spec/#partition-transforms.")
     public abstract @Nullable List<String> getPartitionFields();
 
+    @SchemaFieldDescription(
+        "Iceberg table properties to be set on the table when it is created,"
+            + "For more information on table properties, please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.")
+    public abstract @Nullable Map<String, String> getTableProperties();
+
     @AutoValue.Builder
     public abstract static class Builder {
       public abstract Builder setTable(String table);
@@ -144,6 +149,8 @@ public abstract static class Builder {
 
       public abstract Builder setPartitionFields(List<String> partitionFields);
 
+      public abstract Builder setTableProperties(Map<String, String> tableProperties);
+
       public abstract Configuration build();
     }
 
@@ -209,6 +216,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
                 FileFormat.PARQUET.toString(),
                 rows.getSchema(),
                 configuration.getPartitionFields(),
+                configuration.getTableProperties(),
                 configuration.getDrop(),
                 configuration.getKeep(),
                 configuration.getOnly()));

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java
index 2893f2f82c0b..161c82b654d4 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java
@@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.iceberg;
 
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.util.RowFilter;
 import org.apache.beam.sdk.util.RowStringInterpolator;
@@ -33,17 +34,20 @@ class PortableIcebergDestinations implements DynamicDestinations {
 
   private final String fileFormat;
   private final @Nullable List<String> partitionFields;
+  private final @Nullable Map<String, String> tableProperties;
 
   public PortableIcebergDestinations(
       String destinationTemplate,
       String fileFormat,
       Schema inputSchema,
       @Nullable List<String> partitionFields,
+      @Nullable Map<String, String> tableProperties,
       @Nullable List<String> fieldsToDrop,
       @Nullable List<String> fieldsToKeep,
       @Nullable String onlyField) {
     this.interpolator = new RowStringInterpolator(destinationTemplate, inputSchema);
     this.partitionFields = partitionFields;
+    this.tableProperties = tableProperties;
 
     RowFilter rf = new RowFilter(inputSchema);
     if (fieldsToDrop != null) {
@@ -82,6 +86,7 @@ public IcebergDestination instantiateDestination(String dest) {
             IcebergTableCreateConfig.builder()
                 .setSchema(getDataSchema())
                 .setPartitionFields(partitionFields)
+                .setTableProperties(tableProperties)
                 .build())
         .setFileFormat(FileFormat.fromString(fileFormat))
         .build();

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
index bcad457187d0..1c8a352382d3 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
@@ -293,6 +293,10 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema
     @Nullable IcebergTableCreateConfig createConfig = destination.getTableCreateConfig();
     PartitionSpec partitionSpec =
         createConfig != null ? createConfig.getPartitionSpec() : PartitionSpec.unpartitioned();
+    Map<String, String> tableProperties =
+        (createConfig != null && createConfig.getTableProperties() != null)
+            ? createConfig.getTableProperties()
+            : Maps.newHashMap();
 
     synchronized (TABLE_CACHE) {
       // Create namespace if it does not exist yet
@@ -316,7 +320,7 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema
       } catch (NoSuchTableException e) {
         // Otherwise, create the table
         org.apache.iceberg.Schema tableSchema = IcebergUtils.beamSchemaToIcebergSchema(dataSchema);
         try {
-          table = catalog.createTable(identifier, tableSchema, partitionSpec);
+          table = catalog.createTable(identifier, tableSchema, partitionSpec, tableProperties);
           LOG.info(
               "Created Iceberg table '{}' with schema: {}\n, partition spec: {}",
               identifier,

diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
index 47713b16d1a1..8b9f4c986f7b 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
@@ -543,4 +543,56 @@ public void testWriteCreateTableWithPartitionSpec() {
     Table table = warehouse.loadTable(TableIdentifier.parse(identifier));
     assertEquals(expectedSpec, table.spec());
   }
+
+  @Test
+  public void testWriteCreateTableWithTableProperties() {
+    String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
+    Schema schema = Schema.builder().addStringField("str").addInt32Field("int").build();
+
+    // Use real Iceberg table property keys
+    Map<String, String> tableProperties =
+        ImmutableMap.of(
+            "write.format.default", "orc",
+            "commit.retry.num-retries", "5",
+            "read.split.target-size", "134217728");
+
+    Map<String, Object> config =
+        ImmutableMap.of(
+            "table",
+            identifier,
+            "catalog_properties",
+            ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location),
+            "table_properties",
+            tableProperties);
+
+    List<Row> rows = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      Row row = Row.withSchema(schema).addValues("str_" + i, i).build();
+      rows.add(row);
+    }
+
+    PCollection<Row> result =
+        testPipeline
+            .apply("Records To Add", Create.of(rows))
+            .setRowSchema(schema)
+            .apply(Managed.write(Managed.ICEBERG).withConfig(config))
+            .get(SNAPSHOTS_TAG);
+
+    PAssert.that(result)
+        .satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append"));
+    testPipeline.run().waitUntilFinish();
+
+    // Read back and check records are correct
+    Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions());
+    PCollection<Row> readRows =
+        p.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection();
+    PAssert.that(readRows).containsInAnyOrder(rows);
+    p.run().waitUntilFinish();
+
+    Table table = warehouse.loadTable(TableIdentifier.parse(identifier));
+    // Assert that the table properties are set on the Iceberg table
+    assertEquals("orc", table.properties().get("write.format.default"));
+    assertEquals("5", table.properties().get("commit.retry.num-retries"));
+    assertEquals("134217728", table.properties().get("read.split.target-size"));
+  }
 }

diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
index b0d0d159b096..f242938b34c7 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
@@ -668,6 +668,11 @@ public void testStreamingWrite() throws IOException {
     Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId()));
     config.put("triggering_frequency_seconds", 4);
     config.put("partition_fields", Arrays.asList("bool_field", "modulo_5"));
+    // Add table properties for testing
+    Map<String, String> tableProperties = new HashMap<>();
+    tableProperties.put("write.format.default", "parquet");
+    tableProperties.put("commit.retry.num-retries", "3");
+    config.put("table_properties", tableProperties);
 
     // create elements from longs in range [0, 1000)
     PCollection<Row> input =
@@ -687,6 +692,8 @@ public void testStreamingWrite() throws IOException {
     List<Record> returnedRecords = readRecords(table);
     assertThat(
         returnedRecords, containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray()));
+    assertEquals("parquet", table.properties().get("write.format.default"));
+    assertEquals("3", table.properties().get("commit.retry.num-retries"));
   }
 
   @Test
@@ -971,4 +978,25 @@ public void runReadBetween(boolean useSnapshotBoundary, boolean streaming) throw
     PAssert.that(rows).containsInAnyOrder(expectedRows);
     pipeline.run().waitUntilFinish();
   }
+
+  @Test
+  public void testWriteWithTableProperties() throws IOException {
+    Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId()));
+    Map<String, String> tableProperties = new HashMap<>();
+    tableProperties.put("write.format.default", "parquet");
+    tableProperties.put("commit.retry.num-retries", "3");
+    config.put("table_properties", tableProperties);
+    PCollection<Row> input = pipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA);
+    input.apply(Managed.write(ICEBERG).withConfig(config));
+    pipeline.run().waitUntilFinish();
+
+    Table table = catalog.loadTable(TableIdentifier.parse(tableId()));
+    // Read back and check records are correct
+    List<Record> returnedRecords = readRecords(table);
+    assertThat(
+        returnedRecords, containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray()));
+    // Assert that the table properties are set on the Iceberg table
+    assertEquals("parquet", table.properties().get("write.format.default"));
+    assertEquals("3", table.properties().get("commit.retry.num-retries"));
+  }
 }

diff --git a/website/www/site/content/en/documentation/io/managed-io.md b/website/www/site/content/en/documentation/io/managed-io.md
index 882e1da3e5e5..88f80ed21832 100644
--- a/website/www/site/content/en/documentation/io/managed-io.md
+++ b/website/www/site/content/en/documentation/io/managed-io.md
@@ -98,6 +98,8 @@ and Beam SQL is invoked via the Managed API under the hood.
 only (str)
 partition_fields (list[str])
 triggering_frequency_seconds (int32)
+table_properties (map[str, str])
@@ -431,6 +433,19 @@ For more information on partition transforms, please visit https://iceberg.apach
 
       For a streaming pipeline, sets the frequency at which snapshots are produced.
 
+
+
+      table_properties
+
+
+      map[str, str]
+
+
+      Table Properties set while creating Iceberg Table.
+
+      For more information on table properties, please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties
+
+

From ee74a3e1dd1a7bd97713aa81c5530afa553c3019 Mon Sep 17 00:00:00 2001
From: Tarun Annapareddy
Date: Mon, 30 Jun 2025 22:18:39 -0700
Subject: [PATCH 2/4] [IcebergIO] Create tables with Iceberg table properties

---
 .../sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java | 5 +++--
 .../org/apache/beam/sdk/io/iceberg/RecordWriterManager.java | 1 +
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
index fb0d63092964..71c898b00444 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
@@ -125,8 +125,9 @@ public static Builder builder() {
     public abstract @Nullable List<String> getPartitionFields();
 
     @SchemaFieldDescription(
-        "Iceberg table properties to be set on the table when it is created,"
-            + "For more information on table properties, please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.")
+        "Iceberg table properties to be set on the table when it is created.\n"
+            + "For more information on table properties,"
+            + " please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.")
     public abstract @Nullable Map<String, String> getTableProperties();
 
     @AutoValue.Builder

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
index 1c8a352382d3..941efe67e1fd 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
@@ -293,6 +293,7 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema
     @Nullable IcebergTableCreateConfig createConfig = destination.getTableCreateConfig();
     PartitionSpec partitionSpec =
         createConfig != null ? createConfig.getPartitionSpec() : PartitionSpec.unpartitioned();
+    // Ensure table properties are not null; Iceberg's createTable does not accept null values.
     Map<String, String> tableProperties =
         (createConfig != null && createConfig.getTableProperties() != null)
            ? createConfig.getTableProperties()

From a7b77ad096ba5b963e66dc0e7e38fdd3b3bf6876 Mon Sep 17 00:00:00 2001
From: Tarun Annapareddy
Date: Mon, 30 Jun 2025 22:18:39 -0700
Subject: [PATCH 3/4] [IcebergIO] Create tables with Iceberg table properties

---
 .../IO_Iceberg_Integration_Tests.json         |  2 +-
 CHANGES.md                                    |  1 +
 .../sdk/io/iceberg/IcebergCatalogConfig.java  | 17 ++++--
 .../sdk/io/iceberg/RecordWriterManager.java   |  8 +--
 ...ebergWriteSchemaTransformProviderTest.java | 55 ++++++++++++++++++-
 .../sdk/io/iceberg/TestDataWarehouse.java     | 13 ++++-
 .../content/en/documentation/io/managed-io.md |  2 +-
 7 files changed, 85 insertions(+), 13 deletions(-)

diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 5d04b2c0a8c7..7ab7bcd9a9c6 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 5
+  "modification": 2
 }

diff --git a/CHANGES.md b/CHANGES.md
index 5cde77cfebf8..86680d5ba762 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -75,6 +75,7 @@
 
 * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 * Add pip-based install support for JupyterLab Sidepanel extension ([#35397](https://github.com/apache/beam/issues/#35397)).
+* [IcebergIO] Create tables with specified table properties ([#35496](https://github.com/apache/beam/pull/35496))
 
 ## Breaking Changes

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
index 7929d028bcdc..141f1a7c2f38 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
@@ -84,17 +84,26 @@ public org.apache.iceberg.catalog.Catalog catalog() {
   }
 
   public void createTable(
-      String tableIdentifier, Schema tableSchema, @Nullable List<String> partitionFields) {
+      String tableIdentifier,
+      Schema tableSchema,
+      @Nullable List<String> partitionFields,
+      @Nullable Map<String, String> tableProperties) {
     TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier);
     org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(tableSchema);
     PartitionSpec icebergSpec = PartitionUtils.toPartitionSpec(partitionFields, tableSchema);
     try {
-      catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec);
+      catalog()
+          .createTable(
+              icebergIdentifier,
+              icebergSchema,
+              icebergSpec,
+              tableProperties == null ? Maps.newHashMap() : tableProperties);
       LOG.info(
-          "Created table '{}' with schema: {}\n, partition spec: {}",
+          "Created table '{}' with schema: {}\n, partition spec: {}, table properties: {}",
           icebergIdentifier,
           icebergSchema,
-          icebergSpec);
+          icebergSpec,
+          tableProperties);
     } catch (AlreadyExistsException e) {
       throw new TableAlreadyExistsException(e);
     }

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
index 941efe67e1fd..b1e8a825601d 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
@@ -293,9 +293,8 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema
     @Nullable IcebergTableCreateConfig createConfig = destination.getTableCreateConfig();
     PartitionSpec partitionSpec =
         createConfig != null ? createConfig.getPartitionSpec() : PartitionSpec.unpartitioned();
-    // Ensure table properties are not null; Iceberg's createTable does not accept null values.
     Map<String, String> tableProperties =
-        (createConfig != null && createConfig.getTableProperties() != null)
+        createConfig != null && createConfig.getTableProperties() != null
            ? createConfig.getTableProperties()
            : Maps.newHashMap();
 
@@ -323,10 +322,11 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema
       try {
         table = catalog.createTable(identifier, tableSchema, partitionSpec, tableProperties);
         LOG.info(
-            "Created Iceberg table '{}' with schema: {}\n, partition spec: {}",
+            "Created Iceberg table '{}' with schema: {}\n, partition spec: {}, table properties: {}",
             identifier,
             tableSchema,
-            partitionSpec);
+            partitionSpec,
+            tableProperties);
       } catch (AlreadyExistsException ignored) {
         // race condition: another worker already created this table
         table = catalog.loadTable(identifier);

diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
index 8b9f4c986f7b..7028a394d2fd 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
@@ -545,7 +545,7 @@ public void testWriteCreateTableWithPartitionSpec() {
   }
 
   @Test
-  public void testWriteCreateTableWithTableProperties() {
+  public void testWriteCreateTableWithTablePropertiesSpec() {
     String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
     Schema schema = Schema.builder().addStringField("str").addInt32Field("int").build();
 
@@ -595,4 +595,57 @@ public void testWriteCreateTableWithTableProperties() {
     assertEquals("5", table.properties().get("commit.retry.num-retries"));
     assertEquals("134217728", table.properties().get("read.split.target-size"));
   }
+
+  @Test
+  public void testWriteCreateTableWithTableProperties() {
+    String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
+    Schema schema = Schema.builder().addStringField("str").addInt32Field("int").build();
+    org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(schema);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> tableProperties =
+        ImmutableMap.of(
+            "write.format.default", "orc",
+            "commit.retry.num-retries", "5",
+            "read.split.target-size", "134217728");
+
+    // Create the table with properties
+    warehouse.createTable(TableIdentifier.parse(identifier), icebergSchema, spec, tableProperties);
+
+    List<Row> rows = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      Row row = Row.withSchema(schema).addValues("str_" + i, i).build();
+      rows.add(row);
+    }
+
+    Map<String, Object> config =
+        ImmutableMap.of(
+            "table",
+            identifier,
+            "catalog_properties",
+            ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location));
+
+    PCollection<Row> result =
+        testPipeline
+            .apply("Records To Add", Create.of(rows))
+            .setRowSchema(schema)
+            .apply(Managed.write(Managed.ICEBERG).withConfig(config))
+            .get(SNAPSHOTS_TAG);
+
+    PAssert.that(result)
+        .satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append"));
+    testPipeline.run().waitUntilFinish();
+
+    // Read back and check records are correct
+    Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions());
+    PCollection<Row> readRows =
+        p.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection();
+    PAssert.that(readRows).containsInAnyOrder(rows);
+    p.run().waitUntilFinish();
+
+    Table table = warehouse.loadTable(TableIdentifier.parse(identifier));
+    // Assert that the table properties are set on the Iceberg table
+    assertEquals("orc", table.properties().get("write.format.default"));
+    assertEquals("5", table.properties().get("commit.retry.num-retries"));
+    assertEquals("134217728", table.properties().get("read.split.target-size"));
+  }
 }

diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java
index 04f3751b9fbb..61eba3f6ff88 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java
@@ -163,13 +163,22 @@ public DataFile writeRecords(
   }
 
   public Table createTable(TableIdentifier tableId, Schema schema) {
-    return createTable(tableId, schema, null);
+    return createTable(tableId, schema, null, null);
   }
 
   public Table createTable(
       TableIdentifier tableId, Schema schema, @Nullable PartitionSpec partitionSpec) {
     someTableHasBeenCreated = true;
-    return catalog.createTable(tableId, schema, partitionSpec);
+    return catalog.createTable(tableId, schema, partitionSpec, null);
+  }
+
+  public Table createTable(
+      TableIdentifier tableId,
+      Schema schema,
+      @Nullable PartitionSpec partitionSpec,
+      @Nullable Map<String, String> tableProperties) {
+    someTableHasBeenCreated = true;
+    return catalog.createTable(tableId, schema, partitionSpec, tableProperties);
   }
 
   public Catalog.TableBuilder buildTable(TableIdentifier tableId, Schema schema) {

diff --git a/website/www/site/content/en/documentation/io/managed-io.md b/website/www/site/content/en/documentation/io/managed-io.md
index 88f80ed21832..6ccb9f7f57cc 100644
--- a/website/www/site/content/en/documentation/io/managed-io.md
+++ b/website/www/site/content/en/documentation/io/managed-io.md
@@ -443,7 +443,7 @@ For more information on partition transforms, please visit https://iceberg.apach
       Table Properties set while creating Iceberg Table.
 
-      For more information on table properties, please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties
+For more information on table properties, please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties

From 5458a46ff69b38d4ee040ce227edb0fa612b8179 Mon Sep 17 00:00:00 2001
From: Tarun Annapareddy
Date: Mon, 30 Jun 2025 22:18:39 -0700
Subject: [PATCH 4/4] [IcebergIO] Create tables with Iceberg table properties

---
 .../sdk/io/iceberg/IcebergCatalogConfig.java  | 17 ++++-------------
 .../content/en/documentation/io/managed-io.md | 18 ++++++++----------
 2 files changed, 12 insertions(+), 23 deletions(-)

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
index 141f1a7c2f38..7929d028bcdc 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
@@ -84,26 +84,17 @@ public org.apache.iceberg.catalog.Catalog catalog() {
   }
 
   public void createTable(
-      String tableIdentifier,
-      Schema tableSchema,
-      @Nullable List<String> partitionFields,
-      @Nullable Map<String, String> tableProperties) {
+      String tableIdentifier, Schema tableSchema, @Nullable List<String> partitionFields) {
     TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier);
     org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(tableSchema);
     PartitionSpec icebergSpec = PartitionUtils.toPartitionSpec(partitionFields, tableSchema);
     try {
-      catalog()
-          .createTable(
-              icebergIdentifier,
-              icebergSchema,
-              icebergSpec,
-              tableProperties == null ? Maps.newHashMap() : tableProperties);
+      catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec);
       LOG.info(
-          "Created table '{}' with schema: {}\n, partition spec: {}, table properties: {}",
+          "Created table '{}' with schema: {}\n, partition spec: {}",
           icebergIdentifier,
           icebergSchema,
-          icebergSpec,
-          tableProperties);
+          icebergSpec);
     } catch (AlreadyExistsException e) {
       throw new TableAlreadyExistsException(e);
     }

diff --git a/website/www/site/content/en/documentation/io/managed-io.md b/website/www/site/content/en/documentation/io/managed-io.md
index 6ccb9f7f57cc..53631d279381 100644
--- a/website/www/site/content/en/documentation/io/managed-io.md
+++ b/website/www/site/content/en/documentation/io/managed-io.md
@@ -97,9 +97,8 @@ and Beam SQL is invoked via the Managed API under the hood.
 keep (list[str])
 only (str)
 partition_fields (list[str])
+table_properties (map[str, str])
 triggering_frequency_seconds (int32)
-table_properties (map[str, str])
@@ -424,26 +423,25 @@ For more information on partition transforms, please visit https://iceberg.apach
 
-      triggering_frequency_seconds
+      table_properties
 
-      int32
+      map[str, str]
 
-      For a streaming pipeline, sets the frequency at which snapshots are produced.
+      Iceberg table properties to be set on the table when it is created.
+For more information on table properties, please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.
 
-      table_properties
+      triggering_frequency_seconds
 
-      int32
+      int32
 
-      Table Properties set while creating Iceberg Table.
-
-For more information on table properties, please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties
+      For a streaming pipeline, sets the frequency at which snapshots are produced.
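
With the series applied, a Managed Iceberg write accepts `table_properties` alongside the existing config keys. Below is a minimal usage sketch, not taken from the patches themselves: the table identifier, warehouse path, and property values are illustrative placeholders, and the config shape mirrors `testWriteCreateTableWithTableProperties` in patch 1.

```java
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class IcebergTablePropertiesExample {
  public static void main(String[] args) {
    Schema schema = Schema.builder().addStringField("str").addInt32Field("int").build();

    // "table_properties" is forwarded to Catalog.createTable() only when the
    // sink has to create the destination table itself.
    Map<String, Object> config =
        Map.of(
            "table", "default.my_table", // illustrative identifier
            "catalog_properties",
                Map.of("type", "hadoop", "warehouse", "/tmp/warehouse"), // illustrative catalog
            "table_properties",
                Map.of(
                    "write.format.default", "orc",
                    "commit.retry.num-retries", "5"));

    Pipeline pipeline = Pipeline.create();
    PCollection<Row> rows =
        pipeline
            .apply(Create.of(Row.withSchema(schema).addValues("str_0", 0).build()))
            .setRowSchema(schema);
    rows.apply(Managed.write(Managed.ICEBERG).withConfig(config));
    pipeline.run().waitUntilFinish();
  }
}
```

Any Iceberg-recognized key from the configuration page can appear in the map; the connector does not interpret the entries, it simply hands them to the catalog at table-creation time.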
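One semantic worth noting, pinned down by patch 3's `testWriteCreateTableWithTableProperties` and visible in the `getOrCreateTable` change: properties are applied only on the create path. If the table already exists, a write neither re-applies nor overrides them. A hedged sketch of that behavior directly against the Iceberg catalog API (the `HadoopCatalog` setup, names, and paths are illustrative, not from the patches):

```java
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;

public class CreateTimeOnlyProperties {
  public static void main(String[] args) {
    // Local Hadoop catalog for illustration; other Iceberg catalogs behave the same.
    HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "/tmp/warehouse");
    TableIdentifier id = TableIdentifier.parse("default.my_table");
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "str", Types.StringType.get()),
            Types.NestedField.required(2, "int", Types.IntegerType.get()));

    // Pre-create the table with its own properties, as patch 3's test does.
    catalog.createTable(
        id, schema, PartitionSpec.unpartitioned(), Map.of("write.format.default", "orc"));

    // ... a Managed Iceberg write to default.my_table would run here; since the
    // table already exists, any table_properties in its config are ignored ...

    Table table = catalog.loadTable(id);
    // Still "orc": the sink only passes properties on the createTable() path.
    System.out.println(table.properties().get("write.format.default"));
  }
}
```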