From 7ebc33c505deeb308ef973dc7fa0aaf48ca79399 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 22 Jan 2025 10:37:26 -0500 Subject: [PATCH] Fix incorrect nullness in FlinkJobInvoker and JobInvoker --- .../beam/runners/flink/FlinkJobInvoker.java | 17 ++++++++--------- .../beam/runners/jobsubmission/JobInvoker.java | 5 ++--- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java index bbb3cc67ca4a..0d04f1aa06ac 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.flink; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.apache.beam.sdk.util.construction.resources.PipelineResources.detectClassPathResourcesToStage; import java.util.UUID; @@ -31,14 +32,10 @@ import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Struct; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListeningExecutorService; -import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** Job Invoker for the {@link FlinkRunner}. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public class FlinkJobInvoker extends JobInvoker { private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class); @@ -57,7 +54,7 @@ protected FlinkJobInvoker(FlinkJobServerDriver.FlinkServerConfiguration serverCo protected JobInvocation invokeWithExecutor( RunnerApi.Pipeline pipeline, Struct options, - @Nullable String retrievalToken, + String retrievalToken, ListeningExecutorService executorService) { // TODO: How to make Java/Python agree on names of keys and their values? @@ -74,20 +71,22 @@ protected JobInvocation invokeWithExecutor( PortablePipelineOptions portableOptions = flinkOptions.as(PortablePipelineOptions.class); + ClassLoader thisClassLoader = + checkStateNotNull( + FlinkJobInvoker.class.getClassLoader(), + "FlinkJobInvoker class loader is null - this means it was loaded by the bootstrap classloader, which should be impossible"); + PortablePipelineRunner pipelineRunner; if (Strings.isNullOrEmpty(portableOptions.getOutputExecutablePath())) { pipelineRunner = new FlinkPipelineRunner( flinkOptions, serverConfig.getFlinkConfDir(), - detectClassPathResourcesToStage( - FlinkJobInvoker.class.getClassLoader(), flinkOptions)); + detectClassPathResourcesToStage(thisClassLoader, flinkOptions)); } else { pipelineRunner = new PortablePipelineJarCreator(FlinkPipelineRunner.class); } - flinkOptions.setRunner(null); - LOG.info("Invoking job {} with pipeline runner {}", invocationId, pipelineRunner); return createJobInvocation( invocationId, retrievalToken, executorService, pipeline, flinkOptions, pipelineRunner); diff --git a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvoker.java b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvoker.java index 24f82e752b63..98dc4f5443b5 100644 --- a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvoker.java +++ b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvoker.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; -import javax.annotation.Nullable; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Struct; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListeningExecutorService; @@ -39,11 +38,11 @@ public abstract class JobInvoker { protected abstract JobInvocation invokeWithExecutor( RunnerApi.Pipeline pipeline, Struct options, - @Nullable String retrievalToken, + String retrievalToken, ListeningExecutorService executorService) throws IOException; - JobInvocation invoke(RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken) + JobInvocation invoke(RunnerApi.Pipeline pipeline, Struct options, String retrievalToken) throws IOException { return invokeWithExecutor(pipeline, options, retrievalToken, this.executorService); }