diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java index 593230262fde..2dfb8c467ca1 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java @@ -89,6 +89,7 @@ import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.SerializableSupplier; import org.slf4j.Logger; @@ -162,7 +163,7 @@ public class IcebergSink private final transient FlinkMaintenanceConfig flinkMaintenanceConfig; private final Table table; - private final Set<String> equalityFieldColumns = null; + private final Set<String> equalityFieldColumns; private IcebergSink( TableLoader tableLoader, @@ -176,7 +177,8 @@ private IcebergSink( Set<Integer> equalityFieldIds, String branch, boolean overwriteMode, - FlinkMaintenanceConfig flinkMaintenanceConfig) { + FlinkMaintenanceConfig flinkMaintenanceConfig, + Set<String> equalityFieldColumns) { this.tableLoader = tableLoader; this.snapshotProperties = snapshotProperties; this.uidSuffix = uidSuffix; @@ -198,6 +200,7 @@ private IcebergSink( this.sinkId = UUID.randomUUID().toString(); this.compactMode = flinkWriteConf.compactMode(); this.flinkMaintenanceConfig = flinkMaintenanceConfig; + this.equalityFieldColumns = equalityFieldColumns; } @Override @@ -666,6 +669,9 @@ IcebergSink build() { FlinkMaintenanceConfig flinkMaintenanceConfig = new FlinkMaintenanceConfig(table, writeOptions, readableConfig); + Set<String> equalityFieldColumnsSet = + equalityFieldColumns != null ? 
Sets.newHashSet(equalityFieldColumns) : null; + return new IcebergSink( tableLoader, table, @@ -680,7 +686,8 @@ IcebergSink build() { equalityFieldIds, flinkWriteConf.branch(), overwriteMode, - flinkMaintenanceConfig); + flinkMaintenanceConfig, + equalityFieldColumnsSet); } /** diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2DistributionMode.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2DistributionMode.java index 0feb4cc282d2..89f2c7b0daad 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2DistributionMode.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2DistributionMode.java @@ -562,6 +562,39 @@ public void testRangeDistributionStatisticsMigration() throws Exception { } } + @TestTemplate + public void testHashDistributionWithPartitionNotInEqualityFields() { + assumeThat(partitioned).isTrue(); + + List<Row> rows = createRows(""); + DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO); + + if (isTableSchema) { + IcebergSink.forRow(dataStream, SimpleDataUtil.FLINK_TABLE_SCHEMA) + .table(table) + .tableLoader(tableLoader) + .writeParallelism(writeParallelism) + .distributionMode(DistributionMode.HASH) + .upsert(false) + .equalityFieldColumns(ImmutableList.of("id")) + .append(); + } else { + IcebergSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA) + .table(table) + .tableLoader(tableLoader) + .writeParallelism(writeParallelism) + .distributionMode(DistributionMode.HASH) + .upsert(false) + .equalityFieldColumns(ImmutableList.of("id")) + .append(); + } + + assertThatThrownBy(env::execute) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining( + "In 'hash' distribution mode with equality fields set, source column 'data' of partition field '1000: data: identity(2)' should be included in equality fields: '[id]'"); + } + private BoundedTestSource<Row> 
createRangeDistributionBoundedSource( List<List<Row>> rowsPerCheckpoint) { return new BoundedTestSource<>(rowsPerCheckpoint);