Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.sdk.util.construction.resources.PipelineResources.detectClassPathResourcesToStage;

import java.util.UUID;
Expand All @@ -31,14 +32,10 @@
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListeningExecutorService;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Job Invoker for the {@link FlinkRunner}. */
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class FlinkJobInvoker extends JobInvoker {
private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class);

Expand All @@ -57,7 +54,7 @@ protected FlinkJobInvoker(FlinkJobServerDriver.FlinkServerConfiguration serverCo
protected JobInvocation invokeWithExecutor(
RunnerApi.Pipeline pipeline,
Struct options,
@Nullable String retrievalToken,
String retrievalToken,
ListeningExecutorService executorService) {

// TODO: How to make Java/Python agree on names of keys and their values?
Expand All @@ -74,20 +71,22 @@ protected JobInvocation invokeWithExecutor(

PortablePipelineOptions portableOptions = flinkOptions.as(PortablePipelineOptions.class);

ClassLoader thisClassLoader =
checkStateNotNull(
FlinkJobInvoker.class.getClassLoader(),
"FlinkJobInvoker class loader is null - this means it was loaded by the bootstrap classloader, which should be impossible");

PortablePipelineRunner pipelineRunner;
if (Strings.isNullOrEmpty(portableOptions.getOutputExecutablePath())) {
pipelineRunner =
new FlinkPipelineRunner(
flinkOptions,
serverConfig.getFlinkConfDir(),
detectClassPathResourcesToStage(
FlinkJobInvoker.class.getClassLoader(), flinkOptions));
detectClassPathResourcesToStage(thisClassLoader, flinkOptions));
} else {
pipelineRunner = new PortablePipelineJarCreator(FlinkPipelineRunner.class);
}

flinkOptions.setRunner(null);

LOG.info("Invoking job {} with pipeline runner {}", invocationId, pipelineRunner);
return createJobInvocation(
invocationId, retrievalToken, executorService, pipeline, flinkOptions, pipelineRunner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListeningExecutorService;
Expand All @@ -39,11 +38,11 @@ public abstract class JobInvoker {
protected abstract JobInvocation invokeWithExecutor(
RunnerApi.Pipeline pipeline,
Struct options,
@Nullable String retrievalToken,
String retrievalToken,
ListeningExecutorService executorService)
throws IOException;

JobInvocation invoke(RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken)
JobInvocation invoke(RunnerApi.Pipeline pipeline, Struct options, String retrievalToken)
throws IOException {
return invokeWithExecutor(pipeline, options, retrievalToken, this.executorService);
}
Expand Down
Loading