Merged
org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java

@@ -19,7 +19,7 @@
 package org.apache.iceberg.flink.sink;
 
 import java.io.IOException;
-import java.util.List;
+import java.util.Set;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FileFormat;
@@ -56,7 +56,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
       long targetFileSize,
       Schema schema,
       RowType flinkSchema,
-      List<Integer> equalityFieldIds,
+      Set<Integer> equalityFieldIds,
       boolean upsert) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
     this.schema = schema;
org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java

@@ -18,13 +18,13 @@
  */
 package org.apache.iceberg.flink.sink;
 
-import java.util.List;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.flink.RowDataWrapper;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.StructLikeWrapper;
 import org.apache.iceberg.util.StructProjection;
@@ -33,7 +33,8 @@
  * Create a {@link KeySelector} to shuffle by equality fields, to ensure same equality fields record
  * will be emitted to same writer in order.
  */
-class EqualityFieldKeySelector implements KeySelector<RowData, Integer> {
+@Internal
+public class EqualityFieldKeySelector implements KeySelector<RowData, Integer> {
 
   private final Schema schema;
   private final RowType flinkSchema;
@@ -43,10 +44,11 @@ class EqualityFieldKeySelector implements KeySelector<RowData, Integer> {
   private transient StructProjection structProjection;
   private transient StructLikeWrapper structLikeWrapper;
 
-  EqualityFieldKeySelector(Schema schema, RowType flinkSchema, List<Integer> equalityFieldIds) {
+  public EqualityFieldKeySelector(
+      Schema schema, RowType flinkSchema, Set<Integer> equalityFieldIds) {
     this.schema = schema;
     this.flinkSchema = flinkSchema;
-    this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
+    this.deleteSchema = TypeUtil.select(schema, equalityFieldIds);
   }
 
   /**
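Aside (not part of the diff): now that the selector is public and marked @Internal, code outside org.apache.iceberg.flink.sink can construct it directly with a Set of equality field ids. A minimal sketch under assumed inputs; the two-column schema and the KeySelectorSketch class are hypothetical, not from this PR:

import java.util.Set;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.sink.EqualityFieldKeySelector;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;

public class KeySelectorSketch {
  public static void main(String[] args) {
    // Hypothetical schema: field id 1 ("id") is the equality field.
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));
    RowType flinkType = FlinkSchemaUtil.convert(schema);
    Set<Integer> equalityFieldIds = Sets.newHashSet(1);

    // The constructor now takes the ids as a Set; the former List overload
    // copied them into a Set internally via Sets.newHashSet.
    EqualityFieldKeySelector selector =
        new EqualityFieldKeySelector(schema, flinkType, equalityFieldIds);

    // In a job, dataStream.keyBy(selector) routes rows with equal key fields
    // to the same writer subtask.
    System.out.println(selector.getClass().getSimpleName());
  }
}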
org/apache/iceberg/flink/sink/FlinkSink.java

@@ -407,7 +407,7 @@ private DataStreamSink<Void> chainIcebergOperators() {
       flinkWriteConf = new FlinkWriteConf(table, writeOptions, readableConfig);
 
       // Find out the equality field id list based on the user-provided equality field column names.
-      List<Integer> equalityFieldIds =
+      Set<Integer> equalityFieldIds =
           SinkUtil.checkAndGetEqualityFieldIds(table, equalityFieldColumns);
 
       RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
@@ -510,7 +510,7 @@ private SingleOutputStreamOperator<Void> appendCommitter(
   private SingleOutputStreamOperator<FlinkWriteResult> appendWriter(
       DataStream<RowData> input,
       RowType flinkRowType,
-      List<Integer> equalityFieldIds,
+      Set<Integer> equalityFieldIds,
       int writerParallelism) {
     // Validate the equality fields and partition fields if we enable the upsert mode.
     if (flinkWriteConf.upsertMode()) {
@@ -561,7 +561,7 @@ private SingleOutputStreamOperator<FlinkWriteResult> appendWriter(
 
   private DataStream<RowData> distributeDataStream(
       DataStream<RowData> input,
-      List<Integer> equalityFieldIds,
+      Set<Integer> equalityFieldIds,
       RowType flinkRowType,
       int writerParallelism) {
     DistributionMode writeMode = flinkWriteConf.distributionMode();
@@ -697,7 +697,7 @@ static IcebergStreamWriter<RowData> createStreamWriter(
       SerializableSupplier<Table> tableSupplier,
       FlinkWriteConf flinkWriteConf,
       RowType flinkRowType,
-      List<Integer> equalityFieldIds) {
+      Set<Integer> equalityFieldIds) {
     Preconditions.checkArgument(tableSupplier != null, "Iceberg table supplier shouldn't be null");
 
     Table initTable = tableSupplier.get();
org/apache/iceberg/flink/sink/IcebergSink.java

@@ -31,6 +31,7 @@
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.function.Function;
 import org.apache.flink.annotation.Experimental;
@@ -151,7 +152,7 @@ public class IcebergSink
   private final RowType flinkRowType;
   private final SerializableSupplier<Table> tableSupplier;
   private final transient FlinkWriteConf flinkWriteConf;
-  private final List<Integer> equalityFieldIds;
+  private final Set<Integer> equalityFieldIds;
   private final boolean upsertMode;
   private final FileFormat dataFileFormat;
   private final long targetDataFileSize;
@@ -162,7 +163,7 @@ public class IcebergSink
   private final transient FlinkMaintenanceConfig flinkMaintenanceConfig;
 
   private final Table table;
-  private final List<String> equalityFieldColumns = null;
+  private final Set<String> equalityFieldColumns = null;
 
   private IcebergSink(
       TableLoader tableLoader,
@@ -173,7 +174,7 @@ private IcebergSink(
       RowType flinkRowType,
       SerializableSupplier<Table> tableSupplier,
       FlinkWriteConf flinkWriteConf,
-      List<Integer> equalityFieldIds,
+      Set<Integer> equalityFieldIds,
       String branch,
       boolean overwriteMode,
       FlinkMaintenanceConfig flinkMaintenanceConfig) {
@@ -605,7 +606,7 @@ IcebergSink build() {
       boolean overwriteMode = flinkWriteConf.overwriteMode();
 
       // Validate the equality fields and partition fields if we enable the upsert mode.
-      List<Integer> equalityFieldIds =
+      Set<Integer> equalityFieldIds =
           SinkUtil.checkAndGetEqualityFieldIds(table, equalityFieldColumns);
 
       if (flinkWriteConf.upsertMode()) {
org/apache/iceberg/flink/sink/PartitionKeySelector.java

@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
@@ -31,15 +32,16 @@
  * wrote by only one task. That will reduce lots of small files in partitioned fanout write policy
 * for {@link FlinkSink}.
 */
-class PartitionKeySelector implements KeySelector<RowData, String> {
+@Internal
+public class PartitionKeySelector implements KeySelector<RowData, String> {
 
   private final Schema schema;
   private final PartitionKey partitionKey;
   private final RowType flinkSchema;
 
   private transient RowDataWrapper rowDataWrapper;
 
-  PartitionKeySelector(PartitionSpec spec, Schema schema, RowType flinkSchema) {
+  public PartitionKeySelector(PartitionSpec spec, Schema schema, RowType flinkSchema) {
     this.schema = schema;
     this.partitionKey = new PartitionKey(spec, schema);
     this.flinkSchema = flinkSchema;
org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java

@@ -20,8 +20,8 @@
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FileFormat;
@@ -49,7 +49,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
       long targetFileSize,
       Schema schema,
       RowType flinkSchema,
-      List<Integer> equalityFieldIds,
+      Set<Integer> equalityFieldIds,
       boolean upsert) {
     super(
         spec,
org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java

@@ -18,8 +18,9 @@
  */
 package org.apache.iceberg.flink.sink;
 
-import java.util.List;
+import java.util.Collection;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Supplier;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
@@ -48,7 +49,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
   private final PartitionSpec spec;
   private final long targetFileSizeBytes;
   private final FileFormat format;
-  private final List<Integer> equalityFieldIds;
+  private final Set<Integer> equalityFieldIds;
   private final boolean upsert;
   private final FileAppenderFactory<RowData> appenderFactory;
 
@@ -60,7 +61,7 @@ public RowDataTaskWriterFactory(
       long targetFileSizeBytes,
       FileFormat format,
       Map<String, String> writeProperties,
-      List<Integer> equalityFieldIds,
+      Collection<Integer> equalityFieldIds,
       boolean upsert) {
     this(
         () -> table,
@@ -78,7 +79,7 @@ public RowDataTaskWriterFactory(
       long targetFileSizeBytes,
       FileFormat format,
       Map<String, String> writeProperties,
-      List<Integer> equalityFieldIds,
+      Collection<Integer> equalityFieldIds,
       boolean upsert) {
     this(
         tableSupplier,
@@ -98,7 +99,7 @@ public RowDataTaskWriterFactory(
       long targetFileSizeBytes,
       FileFormat format,
       Map<String, String> writeProperties,
-      List<Integer> equalityFieldIds,
+      Collection<Integer> equalityFieldIds,
       boolean upsert,
       Schema schema,
       PartitionSpec spec) {
@@ -117,7 +118,7 @@ public RowDataTaskWriterFactory(
     this.spec = spec;
     this.targetFileSizeBytes = targetFileSizeBytes;
     this.format = format;
-    this.equalityFieldIds = equalityFieldIds;
+    this.equalityFieldIds = equalityFieldIds != null ? Sets.newHashSet(equalityFieldIds) : null;
    this.upsert = upsert;
 
     if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
@@ -137,7 +138,7 @@ public RowDataTaskWriterFactory(
             flinkSchema,
             writeProperties,
             spec,
-            ArrayUtil.toIntArray(equalityFieldIds),
+            ArrayUtil.toPrimitive(equalityFieldIds.toArray(new Integer[0])),
             TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)),
             null);
     } else {
@@ -148,7 +149,7 @@ public RowDataTaskWriterFactory(
             flinkSchema,
             writeProperties,
             spec,
-            ArrayUtil.toIntArray(equalityFieldIds),
+            ArrayUtil.toPrimitive(equalityFieldIds.toArray(new Integer[0])),
             schema,
             null);
     }
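Aside (not part of the diff): the factory now accepts any Collection<Integer> and defensively copies it into a Set, while the appender factories still receive a primitive int[]. An illustrative sketch of both conversions (the class name is hypothetical; ArrayUtil is the same Iceberg helper the diff switches to):

import java.util.Collection;
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.ArrayUtil;

public class EqualityIdConversionSketch {
  public static void main(String[] args) {
    // Any Collection is accepted; the defensive Set copy deduplicates ids and
    // detaches the factory's state from the caller's collection.
    Collection<Integer> fromCaller = ImmutableList.of(1, 2, 2, 3);
    Set<Integer> equalityFieldIds = Sets.newHashSet(fromCaller);

    // ArrayUtil.toIntArray(List) no longer applies to a Set, so the diff
    // converts via toArray(new Integer[0]) plus ArrayUtil.toPrimitive.
    int[] ids = ArrayUtil.toPrimitive(equalityFieldIds.toArray(new Integer[0]));
    System.out.println(ids.length); // 3: the duplicate 2 collapsed
  }
}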
org/apache/iceberg/flink/sink/SinkUtil.java

@@ -24,7 +24,6 @@
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,8 +41,8 @@ private SinkUtil() {}
 
   private static final Logger LOG = LoggerFactory.getLogger(SinkUtil.class);
 
-  static List<Integer> checkAndGetEqualityFieldIds(Table table, List<String> equalityFieldColumns) {
-    List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().identifierFieldIds());
+  static Set<Integer> checkAndGetEqualityFieldIds(Table table, List<String> equalityFieldColumns) {
+    Set<Integer> equalityFieldIds = Sets.newHashSet(table.schema().identifierFieldIds());
     if (equalityFieldColumns != null && !equalityFieldColumns.isEmpty()) {
       Set<Integer> equalityFieldSet = Sets.newHashSetWithExpectedSize(equalityFieldColumns.size());
       for (String column : equalityFieldColumns) {
@@ -63,7 +62,7 @@ static List<Integer> checkAndGetEqualityFieldIds(Table table, List<String> equal
           equalityFieldSet,
           table.schema().identifierFieldIds());
       }
-      equalityFieldIds = Lists.newArrayList(equalityFieldSet);
+      equalityFieldIds = Sets.newHashSet(equalityFieldSet);
     }
     return equalityFieldIds;
   }
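Aside (not part of the diff): Schema.identifierFieldIds() already returns a Set<Integer>, so copying it into a List discarded set semantics for no benefit. A small sketch of the behavioral difference (class name hypothetical):

import java.util.List;
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

public class SetSemanticsSketch {
  public static void main(String[] args) {
    // Stand-in for table.schema().identifierFieldIds().
    Set<Integer> identifierFieldIds = Sets.newHashSet(1, 2);

    List<Integer> asList = Lists.newArrayList(identifierFieldIds);
    asList.add(1); // a List accepts a duplicate field id
    System.out.println(asList.size()); // 3

    Set<Integer> asSet = Sets.newHashSet(identifierFieldIds);
    asSet.add(1); // a Set keeps the ids unique
    System.out.println(asSet.size()); // 2
  }
}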
org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java

@@ -19,7 +19,7 @@
 package org.apache.iceberg.flink.sink;
 
 import java.io.IOException;
-import java.util.List;
+import java.util.Set;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FileFormat;
@@ -41,7 +41,7 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
       long targetFileSize,
       Schema schema,
       RowType flinkSchema,
-      List<Integer> equalityFieldIds,
+      Set<Integer> equalityFieldIds,
       boolean upsert) {
     super(
         spec,
org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java

@@ -18,7 +18,7 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
-import java.util.List;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.DistributionMode;
@@ -37,7 +37,7 @@ public class DynamicRecord {
   private DistributionMode distributionMode;
   private int writeParallelism;
   private boolean upsertMode;
-  @Nullable private List<String> equalityFields;
+  @Nullable private Set<String> equalityFields;
 
   public DynamicRecord(
       TableIdentifier tableIdentifier,
@@ -120,11 +120,11 @@ public void setUpsertMode(boolean upsertMode) {
     this.upsertMode = upsertMode;
   }
 
-  public List<String> equalityFields() {
+  public Set<String> equalityFields() {
     return equalityFields;
   }
 
-  public void setEqualityFields(List<String> equalityFields) {
+  public void setEqualityFields(Set<String> equalityFields) {
     this.equalityFields = equalityFields;
   }
 }
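Aside (not part of the diff): callers of DynamicRecord now supply equality field names as a Set<String> instead of a List<String>. A hypothetical helper showing the updated accessors (only setUpsertMode and setEqualityFields are from the PR):

import org.apache.iceberg.flink.sink.dynamic.DynamicRecord;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

public class DynamicRecordSketch {
  // Hypothetical helper: marks a record for upsert keyed on the "id" column.
  static void markUpsertKeys(DynamicRecord record) {
    record.setUpsertMode(true);
    // Equality fields are an unordered, duplicate-free set of column names.
    record.setEqualityFields(Sets.newHashSet("id"));
  }
}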
org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternal.java

@@ -18,8 +18,8 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
-import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
@@ -37,7 +37,7 @@ class DynamicRecordInternal {
   private int writerKey;
   private RowData rowData;
   private boolean upsertMode;
-  private List<Integer> equalityFieldIds;
+  private Set<Integer> equalityFieldIds;
 
   // Required for serialization instantiation
   DynamicRecordInternal() {}
@@ -50,7 +50,7 @@ class DynamicRecordInternal {
       PartitionSpec spec,
       int writerKey,
       boolean upsertMode,
-      List<Integer> equalityFieldsIds) {
+      Set<Integer> equalityFieldsIds) {
     this.tableName = tableName;
     this.branch = branch;
     this.schema = schema;
@@ -117,11 +117,11 @@ public void setUpsertMode(boolean upsertMode) {
     this.upsertMode = upsertMode;
   }
 
-  public List<Integer> equalityFields() {
+  public Set<Integer> equalityFields() {
     return equalityFieldIds;
   }
 
-  public void setEqualityFieldIds(List<Integer> equalityFieldIds) {
+  public void setEqualityFieldIds(Set<Integer> equalityFieldIds) {
     this.equalityFieldIds = equalityFieldIds;
   }
 