diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 2aa7862baf26..e72202c20511 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -398,7 +398,7 @@ class BeamModulePlugin implements Plugin { // Automatically use the official release version if we are performing a release // otherwise append '-SNAPSHOT' - project.version = '2.45.34' + project.version = '2.45.35' if (isLinkedin(project)) { project.ext.mavenGroupId = 'com.linkedin.beam' } diff --git a/gradle.properties b/gradle.properties index 32eac24ad084..c6bfc0f0b0e8 100644 --- a/gradle.properties +++ b/gradle.properties @@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true # buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy. # To build a custom Beam version make sure you change it in both places, see # https://github.com/apache/beam/issues/21302. -version=2.45.34 -sdk_version=2.45.34 +version=2.45.35 +sdk_version=2.45.35 javaVersion=1.8 diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index d1224400dfcd..5deada1da4b6 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -110,6 +110,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; @@ -954,7 +955,13 @@ public void translateNode( DataStream>> inputDataSet = context.getInputDataStream(context.getInput(transform)); - context.setOutputDataStream(context.getOutput(transform), inputDataSet.rebalance()); + DataStream>> outputDataset = + inputDataSet.partitionCustom( + (Partitioner) + (key, numPartitions) -> (key.hashCode() & Integer.MAX_VALUE) % numPartitions, + input -> input.getValue().getKey()); + + context.setOutputDataStream(context.getOutput(transform), outputDataset); } }