Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.SerializableSupplier;
import org.slf4j.Logger;
Expand Down Expand Up @@ -162,7 +163,7 @@ public class IcebergSink
private final transient FlinkMaintenanceConfig flinkMaintenanceConfig;

private final Table table;
private final Set<String> equalityFieldColumns = null;
private final Set<String> equalityFieldColumns;

private IcebergSink(
TableLoader tableLoader,
Expand All @@ -176,7 +177,8 @@ private IcebergSink(
Set<Integer> equalityFieldIds,
String branch,
boolean overwriteMode,
FlinkMaintenanceConfig flinkMaintenanceConfig) {
FlinkMaintenanceConfig flinkMaintenanceConfig,
Set<String> equalityFieldColumns) {
this.tableLoader = tableLoader;
this.snapshotProperties = snapshotProperties;
this.uidSuffix = uidSuffix;
Expand All @@ -198,6 +200,7 @@ private IcebergSink(
this.sinkId = UUID.randomUUID().toString();
this.compactMode = flinkWriteConf.compactMode();
this.flinkMaintenanceConfig = flinkMaintenanceConfig;
this.equalityFieldColumns = equalityFieldColumns;
}

@Override
Expand Down Expand Up @@ -666,6 +669,9 @@ IcebergSink build() {
FlinkMaintenanceConfig flinkMaintenanceConfig =
new FlinkMaintenanceConfig(table, writeOptions, readableConfig);

Set<String> equalityFieldColumnsSet =
equalityFieldColumns != null ? Sets.newHashSet(equalityFieldColumns) : null;

return new IcebergSink(
tableLoader,
table,
Expand All @@ -680,7 +686,8 @@ IcebergSink build() {
equalityFieldIds,
flinkWriteConf.branch(),
overwriteMode,
flinkMaintenanceConfig);
flinkMaintenanceConfig,
equalityFieldColumnsSet);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,39 @@ public void testRangeDistributionStatisticsMigration() throws Exception {
}
}

// Verifies that a HASH-distributed IcebergSink fails fast when a partition source
// column is missing from the user-supplied equality fields: the table is
// partitioned on 'data', but only 'id' is declared as an equality field.
// NOTE(review): the failure is asserted at job execution time (env::execute),
// not at sink build time — confirm this matches where the sink performs the
// partition-vs-equality-fields validation.
@TestTemplate
public void testHashDistributionWithPartitionNotInEqualityFields() {
  // The validation under test only applies to partitioned tables.
  assumeThat(partitioned).isTrue();

  List<Row> rows = createRows("");
  DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);

  // The two branches are identical except for the schema constant handed to
  // forRow(). They are kept separate because FLINK_TABLE_SCHEMA and FLINK_SCHEMA
  // presumably have different static types and resolve to different forRow
  // overloads — TODO confirm before deduplicating.
  if (isTableSchema) {
    IcebergSink.forRow(dataStream, SimpleDataUtil.FLINK_TABLE_SCHEMA)
        .table(table)
        .tableLoader(tableLoader)
        .writeParallelism(writeParallelism)
        .distributionMode(DistributionMode.HASH)
        .upsert(false)
        .equalityFieldColumns(ImmutableList.of("id"))
        .append();
  } else {
    IcebergSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
        .table(table)
        .tableLoader(tableLoader)
        .writeParallelism(writeParallelism)
        .distributionMode(DistributionMode.HASH)
        .upsert(false)
        .equalityFieldColumns(ImmutableList.of("id"))
        .append();
  }

  // The misconfiguration surfaces when the job runs; the message pins both the
  // offending partition field and the configured equality fields.
  assertThatThrownBy(env::execute)
      .isInstanceOf(IllegalStateException.class)
      .hasMessageContaining(
          "In 'hash' distribution mode with equality fields set, source column 'data' of partition field '1000: data: identity(2)' should be included in equality fields: '[id]'");
}

private BoundedTestSource<Row> createRangeDistributionBoundedSource(
List<List<Row>> rowsPerCheckpoint) {
return new BoundedTestSource<>(rowsPerCheckpoint);
Expand Down