diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java index 38062dd1a2c4..58f5a5633e5a 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import org.apache.flink.table.data.RowData; @@ -31,14 +32,23 @@ import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFileFactory; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.Tasks; class PartitionedDeltaWriter extends BaseDeltaTaskWriter { private final PartitionKey partitionKey; - private final Map writers = Maps.newHashMap(); + private final int capacity = 10; + + private final Map writers = + new LinkedHashMap( + (int) Math.ceil(capacity / 0.75f) + 1, 0.75f, true) { + + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > capacity; + } + }; PartitionedDeltaWriter( PartitionSpec spec, diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java index 38062dd1a2c4..58f5a5633e5a 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import org.apache.flink.table.data.RowData; @@ -31,14 +32,23 @@ import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFileFactory; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.Tasks; class PartitionedDeltaWriter extends BaseDeltaTaskWriter { private final PartitionKey partitionKey; - private final Map writers = Maps.newHashMap(); + private final int capacity = 10; + + private final Map writers = + new LinkedHashMap( + (int) Math.ceil(capacity / 0.75f) + 1, 0.75f, true) { + + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > capacity; + } + }; PartitionedDeltaWriter( PartitionSpec spec, diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java index 38062dd1a2c4..58f5a5633e5a 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import org.apache.flink.table.data.RowData; @@ -31,14 +32,23 @@ import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFileFactory; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.Tasks; class PartitionedDeltaWriter extends BaseDeltaTaskWriter { private final PartitionKey partitionKey; - private final Map writers = Maps.newHashMap(); + private final int capacity = 10; + + private final Map writers = + new LinkedHashMap( + (int) Math.ceil(capacity / 0.75f) + 1, 0.75f, true) { + + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > capacity; + } + }; PartitionedDeltaWriter( PartitionSpec spec,