From 4e8acc0834732cc148ca2cb4071ebf21c2c6e030 Mon Sep 17 00:00:00 2001 From: jto Date: Fri, 11 Aug 2023 16:22:06 +0200 Subject: [PATCH 1/4] Flink input split size change --- .../wrappers/SourceInputFormat.java | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java index 7e1835eac721..439a3db90c66 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java @@ -23,6 +23,7 @@ import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil; import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.FileBasedSource; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -40,8 +41,8 @@ /** Wrapper for executing a {@link Source} as a Flink {@link InputFormat}. */ @SuppressWarnings({ - "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) - "nullness" // TODO(https://github.com/apache/beam/issues/20497) + "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) + "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public class SourceInputFormat extends RichInputFormat, SourceInputSplit> { private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class); @@ -108,12 +109,26 @@ public float getAverageRecordWidth() { return null; } + private long getDesiredSizeBytes(int numSplits) throws Exception { + long totalSize = initialSource.getEstimatedSizeBytes(options); + long defaultSplitSize = totalSize / numSplits; + if(initialSource instanceof FileBasedSource) { + // Most of the time parallelism is < number of files in source. + // Each file becomes a unique split which commonly create skew. + // This limits the size of splits to 128Mb to reduce skew. + return Math.min(defaultSplitSize, 128 * 1024 * 1024); + } else { + return defaultSplitSize; + } + } + @Override @SuppressWarnings("unchecked") public SourceInputSplit[] createInputSplits(int numSplits) throws IOException { try { - long desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits; + long desiredSizeBytes = getDesiredSizeBytes(numSplits); List> shards = initialSource.split(desiredSizeBytes, options); + int numShards = shards.size(); SourceInputSplit[] sourceInputSplits = new SourceInputSplit[numShards]; for (int i = 0; i < numShards; i++) { From 40d20937065c905c115b03ce9d41dcb89f5439f6 Mon Sep 17 00:00:00 2001 From: jto Date: Thu, 17 Aug 2023 10:45:12 +0200 Subject: [PATCH 2/4] Add Flink split size option --- .../beam/runners/flink/FlinkPipelineOptions.java | 6 ++++++ .../flink/translation/wrappers/SourceInputFormat.java | 10 ++++++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index 2f32ab2c2ea6..f12584b16df8 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -313,6 +313,12 @@ public interface FlinkPipelineOptions void setFlinkConfDir(String confDir); + @Description("Set the maximum size of input split when data is read from a filesystem.") + @Default.Long(128 * 1024 * 1024) + Long getFileInputSplitMaxSizeBytes(); + + void setFileInputSplitMaxSizeBytes(Long inputSplitMaxSizeBytes); + static FlinkPipelineOptions defaults() { return PipelineOptionsFactory.as(FlinkPipelineOptions.class); } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java index 439a3db90c66..44da03f8c193 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.List; import org.apache.beam.runners.core.construction.SerializablePipelineOptions; +import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil; import org.apache.beam.sdk.io.BoundedSource; @@ -41,8 +42,8 @@ /** Wrapper for executing a {@link Source} as a Flink {@link InputFormat}. */ @SuppressWarnings({ - "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) - "nullness" // TODO(https://github.com/apache/beam/issues/20497) + "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) + "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public class SourceInputFormat extends RichInputFormat, SourceInputSplit> { private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class); @@ -112,11 +113,12 @@ public float getAverageRecordWidth() { private long getDesiredSizeBytes(int numSplits) throws Exception { long totalSize = initialSource.getEstimatedSizeBytes(options); long defaultSplitSize = totalSize / numSplits; - if(initialSource instanceof FileBasedSource) { + if (initialSource instanceof FileBasedSource) { + long maxSplitSize = options.as(FlinkPipelineOptions.class).getFileInputSplitMaxSizeBytes(); // Most of the time parallelism is < number of files in source. // Each file becomes a unique split which commonly create skew. // This limits the size of splits to 128Mb to reduce skew. - return Math.min(defaultSplitSize, 128 * 1024 * 1024); + return Math.min(defaultSplitSize, maxSplitSize); } else { return defaultSplitSize; } From 9579fe03970bc267004f107340afbbf3820949d2 Mon Sep 17 00:00:00 2001 From: jto Date: Mon, 28 Aug 2023 16:44:10 +0200 Subject: [PATCH 3/4] [Flink] Set default max input split size to -1 (disabled) --- .../apache/beam/runners/flink/FlinkPipelineOptions.java | 8 ++++---- .../flink/translation/wrappers/SourceInputFormat.java | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index f12584b16df8..8d007072b250 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -313,11 +313,11 @@ public interface FlinkPipelineOptions void setFlinkConfDir(String confDir); - @Description("Set the maximum size of input split when data is read from a filesystem.") - @Default.Long(128 * 1024 * 1024) - Long getFileInputSplitMaxSizeBytes(); + @Description("Set the maximum size of input split when data is read from a filesystem. -1 implies no max size.") + @Default.Long(-1) + Long getFileInputSplitMaxSizeMB(); - void setFileInputSplitMaxSizeBytes(Long inputSplitMaxSizeBytes); + void setFileInputSplitMaxSizeMB(Long inputSplitMaxSizeMB); static FlinkPipelineOptions defaults() { return PipelineOptionsFactory.as(FlinkPipelineOptions.class); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java index 44da03f8c193..c9502228901e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java @@ -113,12 +113,12 @@ public float getAverageRecordWidth() { private long getDesiredSizeBytes(int numSplits) throws Exception { long totalSize = initialSource.getEstimatedSizeBytes(options); long defaultSplitSize = totalSize / numSplits; - if (initialSource instanceof FileBasedSource) { - long maxSplitSize = options.as(FlinkPipelineOptions.class).getFileInputSplitMaxSizeBytes(); + long maxSplitSize = options.as(FlinkPipelineOptions.class).getFileInputSplitMaxSizeMB(); + if (initialSource instanceof FileBasedSource && maxSplitSize > 0) { // Most of the time parallelism is < number of files in source. // Each file becomes a unique split which commonly create skew. - // This limits the size of splits to 128Mb to reduce skew. - return Math.min(defaultSplitSize, maxSplitSize); + // This limits the size of splits to reduce skew. + return Math.min(defaultSplitSize, maxSplitSize * 1024 * 1024); } else { return defaultSplitSize; } From 19f9423c2bf6ca42133bb650fd4c247331a8774f Mon Sep 17 00:00:00 2001 From: jto Date: Mon, 28 Aug 2023 18:41:48 +0200 Subject: [PATCH 4/4] Fix NPE in tests --- .../apache/beam/runners/flink/FlinkPipelineOptions.java | 7 ++++--- .../flink/translation/wrappers/SourceInputFormat.java | 5 ++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index 8d007072b250..1e01514fe8b6 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -313,11 +313,12 @@ public interface FlinkPipelineOptions void setFlinkConfDir(String confDir); - @Description("Set the maximum size of input split when data is read from a filesystem. -1 implies no max size.") - @Default.Long(-1) + @Description( + "Set the maximum size of input split when data is read from a filesystem. 0 implies no max size.") + @Default.Long(0) Long getFileInputSplitMaxSizeMB(); - void setFileInputSplitMaxSizeMB(Long inputSplitMaxSizeMB); + void setFileInputSplitMaxSizeMB(Long fileInputSplitMaxSizeMB); static FlinkPipelineOptions defaults() { return PipelineOptionsFactory.as(FlinkPipelineOptions.class); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java index c9502228901e..a1b8bced7a1d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java @@ -113,7 +113,10 @@ public float getAverageRecordWidth() { private long getDesiredSizeBytes(int numSplits) throws Exception { long totalSize = initialSource.getEstimatedSizeBytes(options); long defaultSplitSize = totalSize / numSplits; - long maxSplitSize = options.as(FlinkPipelineOptions.class).getFileInputSplitMaxSizeMB(); + long maxSplitSize = 0; + if (options != null) { + maxSplitSize = options.as(FlinkPipelineOptions.class).getFileInputSplitMaxSizeMB(); + } if (initialSource instanceof FileBasedSource && maxSplitSize > 0) { // Most of the time parallelism is < number of files in source. // Each file becomes a unique split which commonly create skew.