diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java index 35b32385fee9..6d113f40b5ea 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.flink; -import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.apache.beam.sdk.util.construction.resources.PipelineResources.detectClassPathResourcesToStage; import java.util.UUID; @@ -32,10 +31,14 @@ import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Struct; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListeningExecutorService; +import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** Job Invoker for the {@link FlinkRunner}. */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) public class FlinkJobInvoker extends JobInvoker { private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class); @@ -54,7 +57,7 @@ protected FlinkJobInvoker(FlinkJobServerDriver.FlinkServerConfiguration serverCo protected JobInvocation invokeWithExecutor( RunnerApi.Pipeline pipeline, Struct options, - String retrievalToken, + @Nullable String retrievalToken, ListeningExecutorService executorService) { // TODO: How to make Java/Python agree on names of keys and their values? @@ -71,22 +74,20 @@ protected JobInvocation invokeWithExecutor( PortablePipelineOptions portableOptions = flinkOptions.as(PortablePipelineOptions.class); - ClassLoader thisClassLoader = - checkStateNotNull( - FlinkJobInvoker.class.getClassLoader(), - "FlinkJobInvoker class loader is null - this means it was loaded by the bootstrap classloader, which should be impossible"); - PortablePipelineRunner pipelineRunner; if (Strings.isNullOrEmpty(portableOptions.getOutputExecutablePath())) { pipelineRunner = new FlinkPipelineRunner( flinkOptions, serverConfig.getFlinkConfDir(), - detectClassPathResourcesToStage(thisClassLoader, flinkOptions)); + detectClassPathResourcesToStage( + FlinkJobInvoker.class.getClassLoader(), flinkOptions)); } else { pipelineRunner = new PortablePipelineJarCreator(FlinkPipelineRunner.class); } + flinkOptions.setRunner(null); + LOG.info("Invoking job {} with pipeline runner {}", invocationId, pipelineRunner); return createJobInvocation( invocationId, retrievalToken, executorService, pipeline, flinkOptions, pipelineRunner); diff --git a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvoker.java b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvoker.java index 6a5a051705e1..815a39b16113 100644 --- a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvoker.java +++ b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvoker.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import javax.annotation.Nullable; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Struct; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListeningExecutorService; @@ -38,11 +39,11 @@ public abstract class JobInvoker { protected abstract JobInvocation invokeWithExecutor( RunnerApi.Pipeline pipeline, Struct options, - String retrievalToken, + @Nullable String retrievalToken, ListeningExecutorService executorService) throws IOException; - JobInvocation invoke(RunnerApi.Pipeline pipeline, Struct options, String retrievalToken) + JobInvocation invoke(RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken) throws IOException { return invokeWithExecutor(pipeline, options, retrievalToken, this.executorService); }