2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 3
"modification": 2
}
1 change: 1 addition & 0 deletions CHANGES.md
@@ -75,6 +75,7 @@

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Add pip-based install support for JupyterLab Sidepanel extension ([#35397](https://github.com/apache/beam/issues/35397)).
* [IcebergIO] Create tables with specified table properties ([#35496](https://github.com/apache/beam/pull/35496)).
* Milvus enrichment handler added (Python) ([#35216](https://github.com/apache/beam/pull/35216)).
  Beam now supports Milvus enrichment handler capabilities for vector, keyword,
  and hybrid search operations.
@@ -19,6 +19,7 @@

import com.google.auto.value.AutoValue;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.iceberg.PartitionSpec;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -40,6 +41,9 @@ public PartitionSpec getPartitionSpec() {
@Pure
public abstract @Nullable List<String> getPartitionFields();

@Pure
public abstract @Nullable Map<String, String> getTableProperties();

@Pure
public static Builder builder() {
return new AutoValue_IcebergTableCreateConfig.Builder();
@@ -51,6 +55,8 @@ public abstract static class Builder {

public abstract Builder setPartitionFields(@Nullable List<String> partitionFields);

public abstract Builder setTableProperties(@Nullable Map<String, String> tableProperties);

@Pure
public abstract IcebergTableCreateConfig build();
}
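For orientation, a minimal sketch of how this builder ends up being invoked (mirroring the PortableIcebergDestinations change later in this diff); the schema, partition field, and property values below are illustrative assumptions, not code from this PR:

```java
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;

// Hypothetical values for illustration; any Beam schema, partition fields,
// and Iceberg table properties could be supplied here.
Schema schema = Schema.builder().addStringField("str").addInt32Field("int").build();

IcebergTableCreateConfig createConfig =
    IcebergTableCreateConfig.builder()
        .setSchema(schema)
        .setPartitionFields(List.of("str"))
        .setTableProperties(Map.of("write.format.default", "orc"))
        .build();
```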
@@ -124,6 +124,12 @@ public static Builder builder() {
+ "For more information on partition transforms, please visit https://iceberg.apache.org/spec/#partition-transforms.")
public abstract @Nullable List<String> getPartitionFields();

@SchemaFieldDescription(
"Iceberg table properties to be set on the table when it is created.\n"
+ "For more information on table properties,"
+ " please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.")
public abstract @Nullable Map<String, String> getTableProperties();

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setTable(String table);
Expand All @@ -144,6 +150,8 @@ public abstract static class Builder {

public abstract Builder setPartitionFields(List<String> partitionFields);

public abstract Builder setTableProperties(Map<String, String> tableProperties);

public abstract Configuration build();
}

@@ -209,6 +217,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
FileFormat.PARQUET.toString(),
rows.getSchema(),
configuration.getPartitionFields(),
configuration.getTableProperties(),
configuration.getDrop(),
configuration.getKeep(),
configuration.getOnly()));
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.iceberg;

import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.util.RowFilter;
import org.apache.beam.sdk.util.RowStringInterpolator;
@@ -33,17 +34,20 @@ class PortableIcebergDestinations implements DynamicDestinations {
private final String fileFormat;

private final @Nullable List<String> partitionFields;
private final @Nullable Map<String, String> tableProperties;

public PortableIcebergDestinations(
String destinationTemplate,
String fileFormat,
Schema inputSchema,
@Nullable List<String> partitionFields,
@Nullable Map<String, String> tableProperties,
@Nullable List<String> fieldsToDrop,
@Nullable List<String> fieldsToKeep,
@Nullable String onlyField) {
this.interpolator = new RowStringInterpolator(destinationTemplate, inputSchema);
this.partitionFields = partitionFields;
this.tableProperties = tableProperties;
RowFilter rf = new RowFilter(inputSchema);

if (fieldsToDrop != null) {
@@ -82,6 +86,7 @@ public IcebergDestination instantiateDestination(String dest) {
IcebergTableCreateConfig.builder()
.setSchema(getDataSchema())
.setPartitionFields(partitionFields)
.setTableProperties(tableProperties)
.build())
.setFileFormat(FileFormat.fromString(fileFormat))
.build();
@@ -293,6 +293,10 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema
@Nullable IcebergTableCreateConfig createConfig = destination.getTableCreateConfig();
PartitionSpec partitionSpec =
createConfig != null ? createConfig.getPartitionSpec() : PartitionSpec.unpartitioned();
Map<String, String> tableProperties =
createConfig != null && createConfig.getTableProperties() != null
? createConfig.getTableProperties()
: Maps.newHashMap();

synchronized (TABLE_CACHE) {
// Create namespace if it does not exist yet
@@ -316,12 +320,13 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema
} catch (NoSuchTableException e) { // Otherwise, create the table
org.apache.iceberg.Schema tableSchema = IcebergUtils.beamSchemaToIcebergSchema(dataSchema);
try {
- table = catalog.createTable(identifier, tableSchema, partitionSpec);
+ table = catalog.createTable(identifier, tableSchema, partitionSpec, tableProperties);
LOG.info(
- "Created Iceberg table '{}' with schema: {}\n, partition spec: {}",
+ "Created Iceberg table '{}' with schema: {}\n, partition spec: {}, table properties: {}",
identifier,
tableSchema,
- partitionSpec);
+ partitionSpec,
+ tableProperties);
} catch (AlreadyExistsException ignored) {
// race condition: another worker already created this table
table = catalog.loadTable(identifier);
@@ -543,4 +543,109 @@ public void testWriteCreateTableWithPartitionSpec() {
Table table = warehouse.loadTable(TableIdentifier.parse(identifier));
assertEquals(expectedSpec, table.spec());
}

@Test
public void testWriteCreateTableWithTablePropertiesSpec() {
String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
Schema schema = Schema.builder().addStringField("str").addInt32Field("int").build();

// Use real Iceberg table property keys
Map<String, String> tableProperties =
ImmutableMap.of(
"write.format.default", "orc",
"commit.retry.num-retries", "5",
"read.split.target-size", "134217728");

Map<String, Object> config =
ImmutableMap.of(
"table",
identifier,
"catalog_properties",
ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location),
"table_properties",
tableProperties);

List<Row> rows = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Row row = Row.withSchema(schema).addValues("str_" + i, i).build();
rows.add(row);
}

PCollection<Row> result =
testPipeline
.apply("Records To Add", Create.of(rows))
.setRowSchema(schema)
.apply(Managed.write(Managed.ICEBERG).withConfig(config))
.get(SNAPSHOTS_TAG);

PAssert.that(result)
.satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append"));
testPipeline.run().waitUntilFinish();

// Read back and check records are correct
Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions());
PCollection<Row> readRows =
p.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection();
PAssert.that(readRows).containsInAnyOrder(rows);
p.run().waitUntilFinish();

Table table = warehouse.loadTable(TableIdentifier.parse(identifier));
// Assert that the table properties are set on the Iceberg table
assertEquals("orc", table.properties().get("write.format.default"));
assertEquals("5", table.properties().get("commit.retry.num-retries"));
assertEquals("134217728", table.properties().get("read.split.target-size"));
}

@Test
public void testWriteCreateTableWithTableProperties() {
String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
Schema schema = Schema.builder().addStringField("str").addInt32Field("int").build();
org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(schema);
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> tableProperties =
ImmutableMap.of(
"write.format.default", "orc",
"commit.retry.num-retries", "5",
"read.split.target-size", "134217728");

// Create the table with properties
warehouse.createTable(TableIdentifier.parse(identifier), icebergSchema, spec, tableProperties);

List<Row> rows = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Row row = Row.withSchema(schema).addValues("str_" + i, i).build();
rows.add(row);
}

Map<String, Object> config =
ImmutableMap.of(
"table",
identifier,
"catalog_properties",
ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location));

PCollection<Row> result =
testPipeline
.apply("Records To Add", Create.of(rows))
.setRowSchema(schema)
.apply(Managed.write(Managed.ICEBERG).withConfig(config))
.get(SNAPSHOTS_TAG);

PAssert.that(result)
.satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append"));
testPipeline.run().waitUntilFinish();

// Read back and check records are correct
Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions());
PCollection<Row> readRows =
p.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection();
PAssert.that(readRows).containsInAnyOrder(rows);
p.run().waitUntilFinish();

Table table = warehouse.loadTable(TableIdentifier.parse(identifier));
// Assert that the table properties are set on the Iceberg table
assertEquals("orc", table.properties().get("write.format.default"));
assertEquals("5", table.properties().get("commit.retry.num-retries"));
assertEquals("134217728", table.properties().get("read.split.target-size"));
}
}
@@ -163,13 +163,22 @@ public DataFile writeRecords(
}

public Table createTable(TableIdentifier tableId, Schema schema) {
- return createTable(tableId, schema, null);
+ return createTable(tableId, schema, null, null);
}

public Table createTable(
TableIdentifier tableId, Schema schema, @Nullable PartitionSpec partitionSpec) {
someTableHasBeenCreated = true;
- return catalog.createTable(tableId, schema, partitionSpec);
+ return catalog.createTable(tableId, schema, partitionSpec, null);
}

public Table createTable(
TableIdentifier tableId,
Schema schema,
@Nullable PartitionSpec partitionSpec,
@Nullable Map<String, String> tableProperties) {
someTableHasBeenCreated = true;
return catalog.createTable(tableId, schema, partitionSpec, tableProperties);
}

public Catalog.TableBuilder buildTable(TableIdentifier tableId, Schema schema) {
@@ -668,6 +668,11 @@ public void testStreamingWrite() throws IOException {
Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId()));
config.put("triggering_frequency_seconds", 4);
config.put("partition_fields", Arrays.asList("bool_field", "modulo_5"));
// Add table properties for testing
Map<String, String> tableProperties = new HashMap<>();
tableProperties.put("write.format.default", "parquet");
tableProperties.put("commit.retry.num-retries", "3");
config.put("table_properties", tableProperties);

// create elements from longs in range [0, 1000)
PCollection<Row> input =
@@ -687,6 +692,8 @@ public void testStreamingWrite() throws IOException {
List<Record> returnedRecords = readRecords(table);
assertThat(
returnedRecords, containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray()));
assertEquals("parquet", table.properties().get("write.format.default"));
assertEquals("3", table.properties().get("commit.retry.num-retries"));
}

@Test
@@ -971,4 +978,25 @@ public void runReadBetween(boolean useSnapshotBoundary, boolean streaming) throw
PAssert.that(rows).containsInAnyOrder(expectedRows);
pipeline.run().waitUntilFinish();
}

@Test
public void testWriteWithTableProperties() throws IOException {
Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId()));
Map<String, String> tableProperties = new HashMap<>();
tableProperties.put("write.format.default", "parquet");
tableProperties.put("commit.retry.num-retries", "3");
config.put("table_properties", tableProperties);
PCollection<Row> input = pipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA);
input.apply(Managed.write(ICEBERG).withConfig(config));
pipeline.run().waitUntilFinish();

Table table = catalog.loadTable(TableIdentifier.parse(tableId()));
// Read back and check records are correct
List<Record> returnedRecords = readRecords(table);
assertThat(
returnedRecords, containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray()));
// Assert that the table properties are set on the Iceberg table
assertEquals("parquet", table.properties().get("write.format.default"));
assertEquals("3", table.properties().get("commit.retry.num-retries"));
}
}
13 changes: 13 additions & 0 deletions website/www/site/content/en/documentation/io/managed-io.md
@@ -97,6 +97,7 @@ and Beam SQL is invoked via the Managed API under the hood.
keep (<code>list[<span style="color: green;">str</span>]</code>)<br>
only (<code style="color: green">str</code>)<br>
partition_fields (<code>list[<span style="color: green;">str</span>]</code>)<br>
table_properties (<code>map[<span style="color: green;">str</span>, <span style="color: green;">str</span>]</code>)<br>
triggering_frequency_seconds (<code style="color: #f54251">int32</code>)<br>
</td>
</tr>
@@ -420,6 +421,18 @@ and Beam SQL is invoked via the Managed API under the hood.
For more information on partition transforms, please visit https://iceberg.apache.org/spec/#partition-transforms.
</td>
</tr>
<tr>
<td>
table_properties
</td>
<td>
<code>map[<span style="color: green;">str</span>, <span style="color: green;">str</span>]</code>
</td>
<td>
Iceberg table properties to be set on the table when it is created.
For more information on table properties, please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.
</td>
</tr>
<tr>
<td>
triggering_frequency_seconds
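To complement the new documentation entry, here is a minimal end-to-end sketch modeled on the tests in this PR. The table name, warehouse path, and property values are illustrative assumptions, and per the field description above the properties only take effect when the connector itself creates the table:

```java
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;

public class IcebergTablePropertiesExample {
  public static void main(String[] args) {
    Schema schema = Schema.builder().addStringField("str").addInt32Field("int").build();

    // The "table", "catalog_properties", and "table_properties" keys match the
    // managed config exercised by this PR's tests; the values are assumptions.
    Map<String, Object> config =
        Map.of(
            "table", "default.my_table",
            "catalog_properties", Map.of("type", "hadoop", "warehouse", "/tmp/warehouse"),
            "table_properties",
                Map.of(
                    "write.format.default", "orc",
                    "commit.retry.num-retries", "5"));

    Pipeline pipeline = Pipeline.create();
    pipeline
        .apply(Create.of(Row.withSchema(schema).addValues("str_0", 0).build()))
        .setRowSchema(schema)
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));
    pipeline.run().waitUntilFinish();
  }
}
```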