Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.iceberg.flink.sink;

import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FileFormat;
Expand Down Expand Up @@ -56,7 +56,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
long targetFileSize,
Schema schema,
RowType flinkSchema,
List<Integer> equalityFieldIds,
Set<Integer> equalityFieldIds,
boolean upsert) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
this.schema = schema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
*/
package org.apache.iceberg.flink.sink;

import java.util.List;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.StructLikeWrapper;
import org.apache.iceberg.util.StructProjection;
Expand All @@ -33,7 +33,8 @@
* Create a {@link KeySelector} to shuffle by equality fields, to ensure same equality fields record
* will be emitted to same writer in order.
*/
class EqualityFieldKeySelector implements KeySelector<RowData, Integer> {
@Internal
public class EqualityFieldKeySelector implements KeySelector<RowData, Integer> {

private final Schema schema;
private final RowType flinkSchema;
Expand All @@ -43,10 +44,11 @@ class EqualityFieldKeySelector implements KeySelector<RowData, Integer> {
private transient StructProjection structProjection;
private transient StructLikeWrapper structLikeWrapper;

EqualityFieldKeySelector(Schema schema, RowType flinkSchema, List<Integer> equalityFieldIds) {
public EqualityFieldKeySelector(
Schema schema, RowType flinkSchema, Set<Integer> equalityFieldIds) {
this.schema = schema;
this.flinkSchema = flinkSchema;
this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
this.deleteSchema = TypeUtil.select(schema, equalityFieldIds);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ private DataStreamSink<Void> chainIcebergOperators() {
flinkWriteConf = new FlinkWriteConf(table, writeOptions, readableConfig);

// Find out the equality field id list based on the user-provided equality field column names.
List<Integer> equalityFieldIds =
Set<Integer> equalityFieldIds =
SinkUtil.checkAndGetEqualityFieldIds(table, equalityFieldColumns);

RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
Expand Down Expand Up @@ -524,7 +524,7 @@ private SingleOutputStreamOperator<Void> appendCommitter(
private SingleOutputStreamOperator<FlinkWriteResult> appendWriter(
DataStream<RowData> input,
RowType flinkRowType,
List<Integer> equalityFieldIds,
Set<Integer> equalityFieldIds,
int writerParallelism) {
// Validate the equality fields and partition fields if we enable the upsert mode.
if (flinkWriteConf.upsertMode()) {
Expand Down Expand Up @@ -575,7 +575,7 @@ private SingleOutputStreamOperator<FlinkWriteResult> appendWriter(

private DataStream<RowData> distributeDataStream(
DataStream<RowData> input,
List<Integer> equalityFieldIds,
Set<Integer> equalityFieldIds,
RowType flinkRowType,
int writerParallelism) {
DistributionMode writeMode = flinkWriteConf.distributionMode();
Expand Down Expand Up @@ -711,7 +711,7 @@ static IcebergStreamWriter<RowData> createStreamWriter(
SerializableSupplier<Table> tableSupplier,
FlinkWriteConf flinkWriteConf,
RowType flinkRowType,
List<Integer> equalityFieldIds) {
Set<Integer> equalityFieldIds) {
Preconditions.checkArgument(tableSupplier != null, "Iceberg table supplier shouldn't be null");

Table initTable = tableSupplier.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import org.apache.flink.annotation.Experimental;
Expand Down Expand Up @@ -144,7 +145,7 @@ public class IcebergSink
private final RowType flinkRowType;
private final SerializableSupplier<Table> tableSupplier;
private final transient FlinkWriteConf flinkWriteConf;
private final List<Integer> equalityFieldIds;
private final Set<Integer> equalityFieldIds;
private final boolean upsertMode;
private final FileFormat dataFileFormat;
private final long targetDataFileSize;
Expand All @@ -153,7 +154,7 @@ public class IcebergSink
private final int workerPoolSize;

private final Table table;
private final List<String> equalityFieldColumns = null;
private final Set<String> equalityFieldColumns = null;

private IcebergSink(
TableLoader tableLoader,
Expand All @@ -164,7 +165,7 @@ private IcebergSink(
RowType flinkRowType,
SerializableSupplier<Table> tableSupplier,
FlinkWriteConf flinkWriteConf,
List<Integer> equalityFieldIds,
Set<Integer> equalityFieldIds,
String branch,
boolean overwriteMode) {
this.tableLoader = tableLoader;
Expand Down Expand Up @@ -561,7 +562,7 @@ IcebergSink build() {
boolean overwriteMode = flinkWriteConf.overwriteMode();

// Validate the equality fields and partition fields if we enable the upsert mode.
List<Integer> equalityFieldIds =
Set<Integer> equalityFieldIds =
SinkUtil.checkAndGetEqualityFieldIds(table, equalityFieldColumns);

if (flinkWriteConf.upsertMode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.sink;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
Expand All @@ -31,15 +32,16 @@
* wrote by only one task. That will reduce lots of small files in partitioned fanout write policy
* for {@link FlinkSink}.
*/
class PartitionKeySelector implements KeySelector<RowData, String> {
@Internal
public class PartitionKeySelector implements KeySelector<RowData, String> {

private final Schema schema;
private final PartitionKey partitionKey;
private final RowType flinkSchema;

private transient RowDataWrapper rowDataWrapper;

PartitionKeySelector(PartitionSpec spec, Schema schema, RowType flinkSchema) {
public PartitionKeySelector(PartitionSpec spec, Schema schema, RowType flinkSchema) {
this.schema = schema;
this.partitionKey = new PartitionKey(spec, schema);
this.flinkSchema = flinkSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FileFormat;
Expand Down Expand Up @@ -49,7 +49,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
long targetFileSize,
Schema schema,
RowType flinkSchema,
List<Integer> equalityFieldIds,
Set<Integer> equalityFieldIds,
boolean upsert) {
super(
spec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
*/
package org.apache.iceberg.flink.sink;

import java.util.List;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
Expand Down Expand Up @@ -48,7 +49,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
private final PartitionSpec spec;
private final long targetFileSizeBytes;
private final FileFormat format;
private final List<Integer> equalityFieldIds;
private final Set<Integer> equalityFieldIds;
private final boolean upsert;
private final FileAppenderFactory<RowData> appenderFactory;

Expand All @@ -60,7 +61,7 @@ public RowDataTaskWriterFactory(
long targetFileSizeBytes,
FileFormat format,
Map<String, String> writeProperties,
List<Integer> equalityFieldIds,
Collection<Integer> equalityFieldIds,
boolean upsert) {
this(
() -> table,
Expand All @@ -78,7 +79,7 @@ public RowDataTaskWriterFactory(
long targetFileSizeBytes,
FileFormat format,
Map<String, String> writeProperties,
List<Integer> equalityFieldIds,
Collection<Integer> equalityFieldIds,
boolean upsert) {
this(
tableSupplier,
Expand All @@ -98,7 +99,7 @@ public RowDataTaskWriterFactory(
long targetFileSizeBytes,
FileFormat format,
Map<String, String> writeProperties,
List<Integer> equalityFieldIds,
Collection<Integer> equalityFieldIds,
boolean upsert,
Schema schema,
PartitionSpec spec) {
Expand All @@ -117,7 +118,7 @@ public RowDataTaskWriterFactory(
this.spec = spec;
this.targetFileSizeBytes = targetFileSizeBytes;
this.format = format;
this.equalityFieldIds = equalityFieldIds;
this.equalityFieldIds = equalityFieldIds != null ? Sets.newHashSet(equalityFieldIds) : null;
this.upsert = upsert;

if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
Expand All @@ -137,7 +138,7 @@ public RowDataTaskWriterFactory(
flinkSchema,
writeProperties,
spec,
ArrayUtil.toIntArray(equalityFieldIds),
ArrayUtil.toPrimitive(equalityFieldIds.toArray(new Integer[0])),
TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)),
null);
} else {
Expand All @@ -148,7 +149,7 @@ public RowDataTaskWriterFactory(
flinkSchema,
writeProperties,
spec,
ArrayUtil.toIntArray(equalityFieldIds),
ArrayUtil.toPrimitive(equalityFieldIds.toArray(new Integer[0])),
schema,
null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -42,8 +41,8 @@ private SinkUtil() {}

private static final Logger LOG = LoggerFactory.getLogger(SinkUtil.class);

static List<Integer> checkAndGetEqualityFieldIds(Table table, List<String> equalityFieldColumns) {
List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().identifierFieldIds());
static Set<Integer> checkAndGetEqualityFieldIds(Table table, List<String> equalityFieldColumns) {
Set<Integer> equalityFieldIds = Sets.newHashSet(table.schema().identifierFieldIds());
if (equalityFieldColumns != null && !equalityFieldColumns.isEmpty()) {
Set<Integer> equalityFieldSet = Sets.newHashSetWithExpectedSize(equalityFieldColumns.size());
for (String column : equalityFieldColumns) {
Expand All @@ -63,7 +62,7 @@ static List<Integer> checkAndGetEqualityFieldIds(Table table, List<String> equal
equalityFieldSet,
table.schema().identifierFieldIds());
}
equalityFieldIds = Lists.newArrayList(equalityFieldSet);
equalityFieldIds = Sets.newHashSet(equalityFieldSet);
}
return equalityFieldIds;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.iceberg.flink.sink;

import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FileFormat;
Expand All @@ -41,7 +41,7 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
long targetFileSize,
Schema schema,
RowType flinkSchema,
List<Integer> equalityFieldIds,
Set<Integer> equalityFieldIds,
boolean upsert) {
super(
spec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.iceberg.flink.sink.dynamic;

import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.DistributionMode;
Expand All @@ -37,7 +37,7 @@ public class DynamicRecord {
private DistributionMode distributionMode;
private int writeParallelism;
private boolean upsertMode;
@Nullable private List<String> equalityFields;
@Nullable private Set<String> equalityFields;

public DynamicRecord(
TableIdentifier tableIdentifier,
Expand Down Expand Up @@ -120,11 +120,11 @@ public void setUpsertMode(boolean upsertMode) {
this.upsertMode = upsertMode;
}

public List<String> equalityFields() {
public Set<String> equalityFields() {
return equalityFields;
}

public void setEqualityFields(List<String> equalityFields) {
public void setEqualityFields(Set<String> equalityFields) {
this.equalityFields = equalityFields;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
*/
package org.apache.iceberg.flink.sink.dynamic;

import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
Expand All @@ -37,7 +37,7 @@ class DynamicRecordInternal {
private int writerKey;
private RowData rowData;
private boolean upsertMode;
private List<Integer> equalityFieldIds;
private Set<Integer> equalityFieldIds;

// Required for serialization instantiation
DynamicRecordInternal() {}
Expand All @@ -50,7 +50,7 @@ class DynamicRecordInternal {
PartitionSpec spec,
int writerKey,
boolean upsertMode,
List<Integer> equalityFieldsIds) {
Set<Integer> equalityFieldsIds) {
this.tableName = tableName;
this.branch = branch;
this.schema = schema;
Expand Down Expand Up @@ -117,11 +117,11 @@ public void setUpsertMode(boolean upsertMode) {
this.upsertMode = upsertMode;
}

public List<Integer> equalityFields() {
public Set<Integer> equalityFields() {
return equalityFieldIds;
}

public void setEqualityFieldIds(List<Integer> equalityFieldIds) {
public void setEqualityFieldIds(Set<Integer> equalityFieldIds) {
this.equalityFieldIds = equalityFieldIds;
}

Expand Down
Loading