From 87a5eabe411ca51269fd1b64272ca0a7de1cbf13 Mon Sep 17 00:00:00 2001 From: minxhe Date: Wed, 15 Jan 2025 00:02:18 -0800 Subject: [PATCH 1/4] fix reshuffle translation --- .../runners/flink/FlinkStreamingTransformTranslators.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index d1224400dfcd..f68a0a2814f2 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -110,6 +110,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; @@ -954,7 +955,11 @@ public void translateNode( DataStream>> inputDataSet = context.getInputDataStream(context.getInput(transform)); - context.setOutputDataStream(context.getOutput(transform), inputDataSet.rebalance()); + DataStream>> outputDataset = + inputDataSet.partitionCustom((Partitioner) (key, numPartitions) -> + Math.abs(key.hashCode()) % numPartitions, input -> input.getValue().getKey()); + + context.setOutputDataStream(context.getOutput(transform), outputDataset); } } From 2833e734949d200a5d695a2f48f511ac762e878f Mon Sep 17 00:00:00 2001 From: minxhe Date: Wed, 15 Jan 2025 00:08:35 -0800 Subject: [PATCH 2/4] update version --- .../groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +- gradle.properties | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 2aa7862baf26..e72202c20511 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -398,7 +398,7 @@ class BeamModulePlugin implements Plugin { // Automatically use the official release version if we are performing a release // otherwise append '-SNAPSHOT' - project.version = '2.45.34' + project.version = '2.45.35' if (isLinkedin(project)) { project.ext.mavenGroupId = 'com.linkedin.beam' } diff --git a/gradle.properties b/gradle.properties index 32eac24ad084..c6bfc0f0b0e8 100644 --- a/gradle.properties +++ b/gradle.properties @@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true # buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy. # To build a custom Beam version make sure you change it in both places, see # https://github.com/apache/beam/issues/21302. -version=2.45.34 -sdk_version=2.45.34 +version=2.45.35 +sdk_version=2.45.35 javaVersion=1.8 From ba80216c25dc2cc3d3816a607296f74784aa2560 Mon Sep 17 00:00:00 2001 From: minxhe Date: Wed, 15 Jan 2025 00:29:50 -0800 Subject: [PATCH 3/4] fix spotless --- .../runners/flink/FlinkStreamingTransformTranslators.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index f68a0a2814f2..cfed5301a996 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -956,8 +956,9 @@ public void translateNode( context.getInputDataStream(context.getInput(transform)); DataStream>> outputDataset = - inputDataSet.partitionCustom((Partitioner) (key, numPartitions) -> - Math.abs(key.hashCode()) % numPartitions, input -> input.getValue().getKey()); + inputDataSet.partitionCustom( + (Partitioner) (key, numPartitions) -> Math.abs(key.hashCode()) % numPartitions, + input -> input.getValue().getKey()); context.setOutputDataStream(context.getOutput(transform), outputDataset); } From 3da0db5ba89236446c630ca8fb7c3e01d18a5c4d Mon Sep 17 00:00:00 2001 From: minxhe Date: Wed, 15 Jan 2025 01:28:32 -0800 Subject: [PATCH 4/4] fix spotbug --- .../beam/runners/flink/FlinkStreamingTransformTranslators.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index cfed5301a996..5deada1da4b6 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -957,7 +957,8 @@ public void translateNode( DataStream>> outputDataset = inputDataSet.partitionCustom( - (Partitioner) (key, numPartitions) -> Math.abs(key.hashCode()) % numPartitions, + (Partitioner) + (key, numPartitions) -> (key.hashCode() & Integer.MAX_VALUE) % numPartitions, input -> input.getValue().getKey()); context.setOutputDataStream(context.getOutput(transform), outputDataset);