From 0beee3ff5457106a2f1d39527cd85ff5d142eb11 Mon Sep 17 00:00:00 2001 From: gong Date: Mon, 27 Mar 2023 17:30:24 +0800 Subject: [PATCH 1/2] Flink: resolve writers object of PartitionedDeltaWriter will cause OOM when partition number is big --- .../iceberg/flink/sink/PartitionedDeltaWriter.java | 13 +++++++++++-- .../iceberg/flink/sink/PartitionedDeltaWriter.java | 13 +++++++++++-- .../iceberg/flink/sink/PartitionedDeltaWriter.java | 13 +++++++++++-- 3 files changed, 33 insertions(+), 6 deletions(-) diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java index 38062dd1a2c4..27dd56d1f619 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import org.apache.flink.table.data.RowData; @@ -31,14 +32,22 @@ import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFileFactory; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.Tasks; class PartitionedDeltaWriter extends BaseDeltaTaskWriter { private final PartitionKey partitionKey; - private final Map writers = Maps.newHashMap(); + private final int capacity = 10; + + private final Map writers = new LinkedHashMap( + (int) Math.ceil(capacity / 0.75f) + 1, 0.75f, true) { + + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > capacity; + } + }; PartitionedDeltaWriter( PartitionSpec spec, diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java index 38062dd1a2c4..27dd56d1f619 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import org.apache.flink.table.data.RowData; @@ -31,14 +32,22 @@ import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFileFactory; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.Tasks; class PartitionedDeltaWriter extends BaseDeltaTaskWriter { private final PartitionKey partitionKey; - private final Map writers = Maps.newHashMap(); + private final int capacity = 10; + + private final Map writers = new LinkedHashMap( + (int) Math.ceil(capacity / 0.75f) + 1, 0.75f, true) { + + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > capacity; + } + }; PartitionedDeltaWriter( PartitionSpec spec, diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java index 38062dd1a2c4..27dd56d1f619 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import org.apache.flink.table.data.RowData; @@ -31,14 +32,22 @@ import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFileFactory; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.Tasks; class PartitionedDeltaWriter extends BaseDeltaTaskWriter { private final PartitionKey partitionKey; - private final Map writers = Maps.newHashMap(); + private final int capacity = 10; + + private final Map writers = new LinkedHashMap( + (int) Math.ceil(capacity / 0.75f) + 1, 0.75f, true) { + + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > capacity; + } + }; PartitionedDeltaWriter( PartitionSpec spec, From 75ddd171933e76d78523bc25f1806305735e834f Mon Sep 17 00:00:00 2001 From: gong Date: Mon, 27 Mar 2023 17:37:08 +0800 Subject: [PATCH 2/2] Fix code style --- .../iceberg/flink/sink/PartitionedDeltaWriter.java | 13 +++++++------ .../iceberg/flink/sink/PartitionedDeltaWriter.java | 13 +++++++------ .../iceberg/flink/sink/PartitionedDeltaWriter.java | 13 +++++++------ 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java index 27dd56d1f619..58f5a5633e5a 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java @@ -40,14 +40,15 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter { private final int capacity = 10; - private final Map writers = new LinkedHashMap( + private final Map writers = + new LinkedHashMap( (int) Math.ceil(capacity / 0.75f) + 1, 0.75f, true) { - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - return size() > capacity; - } - }; + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > capacity; + } + }; PartitionedDeltaWriter( PartitionSpec spec, diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java index 27dd56d1f619..58f5a5633e5a 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java @@ -40,14 +40,15 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter { private final int capacity = 10; - private final Map writers = new LinkedHashMap( + private final Map writers = + new LinkedHashMap( (int) Math.ceil(capacity / 0.75f) + 1, 0.75f, true) { - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - return size() > capacity; - } - }; + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > capacity; + } + }; PartitionedDeltaWriter( PartitionSpec spec, diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java index 27dd56d1f619..58f5a5633e5a 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java @@ -40,14 +40,15 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter { private final int capacity = 10; - private final Map writers = new LinkedHashMap( + private final Map writers = + new LinkedHashMap( (int) Math.ceil(capacity / 0.75f) + 1, 0.75f, true) { - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - return size() > capacity; - } - }; + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > capacity; + } + }; PartitionedDeltaWriter( PartitionSpec spec,