From 60fee6ccf17eafd6e4e5750f18336cfbb4f3794c Mon Sep 17 00:00:00 2001 From: Akarys Shorabek Date: Mon, 14 Apr 2025 19:46:35 +0500 Subject: [PATCH 1/2] Revert PR #33713 --- .../beam/runners/flink/FlinkJobInvoker.java | 16 ++++++++-------- .../beam/runners/jobsubmission/JobInvoker.java | 5 +++-- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java index 35b32385fee9..180687c674c4 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.flink; -import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.apache.beam.sdk.util.construction.resources.PipelineResources.detectClassPathResourcesToStage; import java.util.UUID; @@ -32,10 +31,14 @@ import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Struct; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListeningExecutorService; +import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** Job Invoker for the {@link FlinkRunner}. */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) public class FlinkJobInvoker extends JobInvoker { private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class); @@ -54,7 +57,7 @@ protected FlinkJobInvoker(FlinkJobServerDriver.FlinkServerConfiguration serverCo protected JobInvocation invokeWithExecutor( RunnerApi.Pipeline pipeline, Struct options, - String retrievalToken, + @Nullable String retrievalToken, ListeningExecutorService executorService) { // TODO: How to make Java/Python agree on names of keys and their values? @@ -71,22 +74,19 @@ protected JobInvocation invokeWithExecutor( PortablePipelineOptions portableOptions = flinkOptions.as(PortablePipelineOptions.class); - ClassLoader thisClassLoader = - checkStateNotNull( - FlinkJobInvoker.class.getClassLoader(), - "FlinkJobInvoker class loader is null - this means it was loaded by the bootstrap classloader, which should be impossible"); - PortablePipelineRunner pipelineRunner; if (Strings.isNullOrEmpty(portableOptions.getOutputExecutablePath())) { pipelineRunner = new FlinkPipelineRunner( flinkOptions, serverConfig.getFlinkConfDir(), - detectClassPathResourcesToStage(thisClassLoader, flinkOptions)); + detectClassPathResourcesToStage(FlinkJobInvoker.class.getClassLoader(), flinkOptions)); } else { pipelineRunner = new PortablePipelineJarCreator(FlinkPipelineRunner.class); } + flinkOptions.setRunner(null); + LOG.info("Invoking job {} with pipeline runner {}", invocationId, pipelineRunner); return createJobInvocation( invocationId, retrievalToken, executorService, pipeline, flinkOptions, pipelineRunner); diff --git a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvoker.java b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvoker.java index 6a5a051705e1..815a39b16113 100644 --- a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvoker.java +++ b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvoker.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import javax.annotation.Nullable; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Struct; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListeningExecutorService; @@ -38,11 +39,11 @@ public abstract class JobInvoker { protected abstract JobInvocation invokeWithExecutor( RunnerApi.Pipeline pipeline, Struct options, - String retrievalToken, + @Nullable String retrievalToken, ListeningExecutorService executorService) throws IOException; - JobInvocation invoke(RunnerApi.Pipeline pipeline, Struct options, String retrievalToken) + JobInvocation invoke(RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken) throws IOException { return invokeWithExecutor(pipeline, options, retrievalToken, this.executorService); } From 8308c6ba2cfbc7084907a60978a5af3283f387f4 Mon Sep 17 00:00:00 2001 From: Akarys Shorabek Date: Mon, 14 Apr 2025 20:03:56 +0500 Subject: [PATCH 2/2] Run SpotlessJavaApply --- .../java/org/apache/beam/runners/flink/FlinkJobInvoker.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java index 180687c674c4..6d113f40b5ea 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java @@ -37,7 +37,7 @@ /** Job Invoker for the {@link FlinkRunner}. */ @SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) + "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public class FlinkJobInvoker extends JobInvoker { private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class); @@ -80,7 +80,8 @@ protected JobInvocation invokeWithExecutor( new FlinkPipelineRunner( flinkOptions, serverConfig.getFlinkConfDir(), - detectClassPathResourcesToStage(FlinkJobInvoker.class.getClassLoader(), flinkOptions)); + detectClassPathResourcesToStage( + FlinkJobInvoker.class.getClassLoader(), flinkOptions)); } else { pipelineRunner = new PortablePipelineJarCreator(FlinkPipelineRunner.class); }