diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
index b6e040ca3c54..269ebde66916 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
@@ -45,7 +45,7 @@ public static FlinkJobInvoker create(FlinkJobServerDriver.FlinkServerConfigurati
private final FlinkJobServerDriver.FlinkServerConfiguration serverConfig;
- private FlinkJobInvoker(FlinkJobServerDriver.FlinkServerConfiguration serverConfig) {
+ protected FlinkJobInvoker(FlinkJobServerDriver.FlinkServerConfiguration serverConfig) {
super("flink-runner-job-invoker");
this.serverConfig = serverConfig;
}
@@ -90,7 +90,7 @@ protected JobInvocation invokeWithExecutor(
invocationId, retrievalToken, executorService, pipeline, flinkOptions, pipelineRunner);
}
- static JobInvocation createJobInvocation(
+ protected JobInvocation createJobInvocation(
String invocationId,
String retrievalToken,
ListeningExecutorService executorService,
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 0c283d864373..f89a28097b4e 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -19,7 +19,6 @@
import javax.annotation.Nullable;
import org.apache.beam.runners.fnexecution.ServerFactory;
-import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
import org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.io.FileSystems;
@@ -66,7 +65,7 @@ public static void main(String[] args) throws Exception {
options.as(GcsOptions.class).setGcsUploadBufferSizeBytes(1024 * 1024);
// Register standard file systems.
FileSystems.setDefaultPipelineOptions(options);
- fromParams(args).run();
+ fromConfig(parseArgs(args)).run();
}
private static void printUsage(CmdLineParser parser) {
@@ -76,7 +75,7 @@ private static void printUsage(CmdLineParser parser) {
System.err.println();
}
- public static FlinkJobServerDriver fromParams(String[] args) {
+ public static FlinkServerConfiguration parseArgs(String[] args) {
FlinkServerConfiguration configuration = new FlinkServerConfiguration();
CmdLineParser parser = new CmdLineParser(configuration);
try {
@@ -86,33 +85,40 @@ public static FlinkJobServerDriver fromParams(String[] args) {
printUsage(parser);
throw new IllegalArgumentException("Unable to parse command line arguments.", e);
}
-
- return fromConfig(configuration);
+ return configuration;
}
public static FlinkJobServerDriver fromConfig(FlinkServerConfiguration configuration) {
return create(
configuration,
createJobServerFactory(configuration),
- createArtifactServerFactory(configuration));
+ createArtifactServerFactory(configuration),
+ () -> FlinkJobInvoker.create(configuration));
+ }
+
+ public static FlinkJobServerDriver fromConfig(
+ FlinkServerConfiguration configuration, JobInvokerFactory jobInvokerFactory) {
+ return create(
+ configuration,
+ createJobServerFactory(configuration),
+ createArtifactServerFactory(configuration),
+ jobInvokerFactory);
}
- public static FlinkJobServerDriver create(
+ private static FlinkJobServerDriver create(
FlinkServerConfiguration configuration,
ServerFactory jobServerFactory,
- ServerFactory artifactServerFactory) {
- return new FlinkJobServerDriver(configuration, jobServerFactory, artifactServerFactory);
+ ServerFactory artifactServerFactory,
+ JobInvokerFactory jobInvokerFactory) {
+ return new FlinkJobServerDriver(
+ configuration, jobServerFactory, artifactServerFactory, jobInvokerFactory);
}
private FlinkJobServerDriver(
FlinkServerConfiguration configuration,
ServerFactory jobServerFactory,
- ServerFactory artifactServerFactory) {
- super(configuration, jobServerFactory, artifactServerFactory);
- }
-
- @Override
- protected JobInvoker createJobInvoker() {
- return FlinkJobInvoker.create((FlinkServerConfiguration) configuration);
+ ServerFactory artifactServerFactory,
+ JobInvokerFactory jobInvokerFactory) {
+ super(configuration, jobServerFactory, artifactServerFactory, jobInvokerFactory);
}
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java
new file mode 100644
index 000000000000..04be5fea2e03
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.io.File;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.environment.ProcessManager;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineResult;
+import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.flink.api.common.time.Deadline;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flink job entry point to launch a Beam pipeline by executing an external SDK driver program.
+ *
+ * <p>Designed for a non-interactive Flink REST client and a container with the Beam job server
+ * jar and SDK client (for example, when using the FlinkK8sOperator). In the future it would be
+ * possible to support driver program execution in a separate (sidecar) container by introducing a
+ * client environment abstraction similar to the one that exists for SDK workers.
+ *
+ * <p>Using this entry point eliminates the need to build jar files with materialized pipeline
+ * protos offline. It allows the driver program to access the actual execution environment and
+ * services, on par with code executed by SDK workers.
+ *
+ * <p>The entry point starts the job server and provides its endpoint to the driver program.
+ *
+ * <p>The external driver program constructs the Beam pipeline and submits it to the job service.
+ *
+ * <p>The job service defers execution of the pipeline to the plan environment and returns the
+ * "detached" status to the driver program.
+ *
+ * <p>Upon arrival of the job invocation, the entry point executes the runner, which prepares
+ * ("executes") the Flink job through the plan environment.
+ *
+ * <p>Finally, Flink launches the job.
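+ *
+ * <p>A minimal invocation sketch (the entry point class and {@code --driver-cmd} flag are real;
+ * the jar path and driver command below are illustrative assumptions only):
+ *
+ * <pre>{@code
+ * flink run -c org.apache.beam.runners.flink.FlinkPortableClientEntryPoint \
+ *     /path/to/beam-runners-flink-job-server.jar \
+ *     --driver-cmd "python my_pipeline.py --runner=PortableRunner"
+ * }</pre>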
+ */
+public class FlinkPortableClientEntryPoint {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkPortableClientEntryPoint.class);
+ private static final String JOB_ENDPOINT_FLAG = "--job_endpoint";
+ private static final Duration JOB_INVOCATION_TIMEOUT = Duration.ofSeconds(30);
+ private static final Duration JOB_SERVICE_STARTUP_TIMEOUT = Duration.ofSeconds(30);
+
+ private final String driverCmd;
+ private FlinkJobServerDriver jobServer;
+ private Thread jobServerThread;
+ private DetachedJobInvokerFactory jobInvokerFactory;
+ private int jobPort = 0; // pick any free port
+
+ public FlinkPortableClientEntryPoint(String driverCmd) {
+ Preconditions.checkState(
+ !driverCmd.contains(JOB_ENDPOINT_FLAG),
+ "Driver command must not contain " + JOB_ENDPOINT_FLAG);
+ this.driverCmd = driverCmd;
+ }
+
+ /** Main method to be called standalone or by Flink (CLI or REST API). */
+ public static void main(String[] args) throws Exception {
+ LOG.info("entry points args: {}", Arrays.asList(args));
+ EntryPointConfiguration configuration = parseArgs(args);
+ FlinkPortableClientEntryPoint runner =
+ new FlinkPortableClientEntryPoint(configuration.driverCmd);
+ try {
+ runner.startJobService();
+ runner.runDriverProgram();
+ } catch (Exception e) {
+ throw new RuntimeException(String.format("Job %s failed.", configuration.driverCmd), e);
+ } finally {
+ LOG.info("Stopping job service");
+ runner.stopJobService();
+ }
+ LOG.info("Job submitted successfully.");
+ }
+
+ private static class EntryPointConfiguration {
+ @Option(
+ name = "--driver-cmd",
+ required = true,
+ usage =
+ "Command that launches the Python driver program. "
+ + "(The job service endpoint will be appended as --job_endpoint=localhost:.)")
+ private String driverCmd;
+ }
+
+ private static EntryPointConfiguration parseArgs(String[] args) {
+ EntryPointConfiguration configuration = new EntryPointConfiguration();
+ CmdLineParser parser = new CmdLineParser(configuration);
+ try {
+ parser.parseArgument(args);
+ } catch (CmdLineException e) {
+ LOG.error("Unable to parse command line arguments.", e);
+ parser.printUsage(System.err);
+ throw new IllegalArgumentException("Unable to parse command line arguments.", e);
+ }
+ return configuration;
+ }
+
+ private void startJobService() throws Exception {
+ jobInvokerFactory = new DetachedJobInvokerFactory();
+ jobServer =
+ FlinkJobServerDriver.fromConfig(
+ FlinkJobServerDriver.parseArgs(
+ new String[] {"--job-port=" + jobPort, "--artifact-port=0", "--expansion-port=0"}),
+ jobInvokerFactory);
+ jobServerThread = new Thread(jobServer);
+ jobServerThread.start();
+
+ Deadline deadline = Deadline.fromNow(JOB_SERVICE_STARTUP_TIMEOUT);
+ while (jobServer.getJobServerUrl() == null && deadline.hasTimeLeft()) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException interruptEx) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(interruptEx);
+ }
+ }
+
+ if (!jobServerThread.isAlive()) {
+ throw new IllegalStateException("Job service thread is not alive");
+ }
+
+ if (jobServer.getJobServerUrl() == null) {
+ String msg =
+ String.format(
+ "Timeout of %s waiting for job service to start.", JOB_SERVICE_STARTUP_TIMEOUT);
+ throw new TimeoutException(msg);
+ }
+ }
+
+ private void runDriverProgram() throws Exception {
+ ProcessManager processManager = ProcessManager.create();
+ String executable = "bash";
+ List<String> args =
+ ImmutableList.of(
+ "-c",
+ String.format("%s %s=%s", driverCmd, JOB_ENDPOINT_FLAG, jobServer.getJobServerUrl()));
+ String processId = "client1";
+ File outputFile = File.createTempFile("beam-driver-program", ".log");
+
+ try {
+ final ProcessManager.RunningProcess driverProcess =
+ processManager.startProcess(processId, executable, args, System.getenv(), outputFile);
+ driverProcess.isAliveOrThrow();
+ LOG.info("Started driver program");
+
+ // await effect of the driver program submitting the job
+ jobInvokerFactory.executeDetachedJob();
+ } catch (Exception e) {
+ try {
+ processManager.stopProcess(processId);
+ } catch (Exception processKillException) {
+ e.addSuppressed(processKillException);
+ }
+ byte[] output = Files.readAllBytes(outputFile.toPath());
+ String msg =
+ String.format(
+ "Failed to start job with driver program: %s %s output: %s",
+ executable, args, new String(output, Charset.defaultCharset()));
+ throw new RuntimeException(msg, e);
+ }
+ }
+
+ private void stopJobService() throws InterruptedException {
+ if (jobServer != null) {
+ jobServer.stop();
+ }
+ if (jobServerThread != null) {
+ jobServerThread.interrupt();
+ jobServerThread.join();
+ }
+ }
+
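+ /**
+ * Factory whose invoker hands pipeline execution over from the job service thread to the entry
+ * point thread: the job service only records the pipeline and job info and returns a detached
+ * result, while {@link #executeDetachedJob()} later runs the actual pipeline runner in Flink's
+ * plan environment.
+ */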
+ private class DetachedJobInvokerFactory implements FlinkJobServerDriver.JobInvokerFactory {
+
+ private CountDownLatch latch = new CountDownLatch(1);
+ private volatile PortablePipelineRunner actualPipelineRunner;
+ private volatile RunnerApi.Pipeline pipeline;
+ private volatile JobInfo jobInfo;
+
+ private PortablePipelineRunner handoverPipelineRunner =
+ new PortablePipelineRunner() {
+ @Override
+ public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) {
+ DetachedJobInvokerFactory.this.pipeline = pipeline;
+ DetachedJobInvokerFactory.this.jobInfo = jobInfo;
+ LOG.info("Pipeline execution handover for {}", jobInfo.jobId());
+ latch.countDown();
+ return new FlinkPortableRunnerResult.Detached();
+ }
+ };
+
+ @Override
+ public JobInvoker create() {
+ return new FlinkJobInvoker(
+ (FlinkJobServerDriver.FlinkServerConfiguration) jobServer.configuration) {
+ @Override
+ protected JobInvocation createJobInvocation(
+ String invocationId,
+ String retrievalToken,
+ ListeningExecutorService executorService,
+ RunnerApi.Pipeline pipeline,
+ FlinkPipelineOptions flinkOptions,
+ PortablePipelineRunner pipelineRunner) {
+ // Replace the pipeline runner to hand over execution to the entry point thread.
+ actualPipelineRunner = pipelineRunner;
+ return super.createJobInvocation(
+ invocationId,
+ retrievalToken,
+ executorService,
+ pipeline,
+ flinkOptions,
+ handoverPipelineRunner);
+ }
+ };
+ }
+
+ private void executeDetachedJob() throws Exception {
+ long timeoutSeconds = JOB_INVOCATION_TIMEOUT.getSeconds();
+ if (latch.await(timeoutSeconds, TimeUnit.SECONDS)) {
+ actualPipelineRunner.run(pipeline, jobInfo);
+ } else {
+ throw new TimeoutException(
+ String.format("Timeout of %s seconds waiting for job submission.", timeoutSeconds));
+ }
+ }
+ }
+}
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
index 1e345d053c2b..b1a1bd3100af 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink;
+import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -50,8 +51,8 @@ public void testConfigurationDefaults() {
@Test
public void testConfigurationFromArgs() {
- FlinkJobServerDriver driver =
- FlinkJobServerDriver.fromParams(
+ FlinkJobServerDriver.FlinkServerConfiguration config =
+ FlinkJobServerDriver.parseArgs(
new String[] {
"--job-host=test",
"--job-port",
@@ -63,8 +64,6 @@ public void testConfigurationFromArgs() {
"--flink-master-url=jobmanager",
"--clean-artifacts-per-job=false",
});
- FlinkJobServerDriver.FlinkServerConfiguration config =
- (FlinkJobServerDriver.FlinkServerConfiguration) driver.configuration;
assertThat(config.getHost(), is("test"));
assertThat(config.getPort(), is(42));
assertThat(config.getArtifactPort(), is(43));
@@ -91,8 +90,9 @@ public void testJobServerDriver() throws Exception {
try {
System.setErr(newErr);
driver =
- FlinkJobServerDriver.fromParams(
- new String[] {"--job-port=0", "--artifact-port=0", "--expansion-port=0"});
+ FlinkJobServerDriver.fromConfig(
+ FlinkJobServerDriver.parseArgs(
+ new String[] {"--job-port=0", "--artifact-port=0", "--expansion-port=0"}));
driverThread = new Thread(driver);
driverThread.start();
boolean success = false;
@@ -107,6 +107,8 @@ public void testJobServerDriver() throws Exception {
Thread.sleep(100);
}
}
+ assertThat(driver.getJobServerUrl(), is(not(nullValue())));
+ assertThat(baos.toString(Charsets.UTF_8.name()), containsString(driver.getJobServerUrl()));
assertThat(driverThread.isAlive(), is(true));
} catch (Throwable t) {
// restore to print exception
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
index 4315e621e394..2ea9160aade8 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
@@ -209,13 +209,14 @@ private JobID executePortable(Pipeline pipeline) throws Exception {
FlinkPipelineOptions pipelineOptions = pipeline.getOptions().as(FlinkPipelineOptions.class);
try {
JobInvocation jobInvocation =
- FlinkJobInvoker.createJobInvocation(
- "id",
- "none",
- executorService,
- pipelineProto,
- pipelineOptions,
- new FlinkPipelineRunner(pipelineOptions, null, Collections.emptyList()));
+ FlinkJobInvoker.create(null)
+ .createJobInvocation(
+ "id",
+ "none",
+ executorService,
+ pipelineProto,
+ pipelineOptions,
+ new FlinkPipelineRunner(pipelineOptions, null, Collections.emptyList()));
jobInvocation.start();
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
index 10db32cc16e2..18bf64ef3c89 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
@@ -142,14 +142,15 @@ public void process(ProcessContext ctxt) {
// execute the pipeline
JobInvocation jobInvocation =
- FlinkJobInvoker.createJobInvocation(
- "fakeId",
- "fakeRetrievalToken",
- flinkJobExecutor,
- pipelineProto,
- options.as(FlinkPipelineOptions.class),
- new FlinkPipelineRunner(
- options.as(FlinkPipelineOptions.class), null, Collections.emptyList()));
+ FlinkJobInvoker.create(null)
+ .createJobInvocation(
+ "fakeId",
+ "fakeRetrievalToken",
+ flinkJobExecutor,
+ pipelineProto,
+ options.as(FlinkPipelineOptions.class),
+ new FlinkPipelineRunner(
+ options.as(FlinkPipelineOptions.class), null, Collections.emptyList()));
jobInvocation.start();
while (jobInvocation.getState() != Enum.DONE) {
Thread.sleep(1000);
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
index 91f243bfe3b5..9ba0721d530a 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
@@ -196,14 +196,15 @@ private void performStateUpdates(
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
JobInvocation jobInvocation =
- FlinkJobInvoker.createJobInvocation(
- "id",
- "none",
- flinkJobExecutor,
- pipelineProto,
- options.as(FlinkPipelineOptions.class),
- new FlinkPipelineRunner(
- options.as(FlinkPipelineOptions.class), null, Collections.emptyList()));
+ FlinkJobInvoker.create(null)
+ .createJobInvocation(
+ "id",
+ "none",
+ flinkJobExecutor,
+ pipelineProto,
+ options.as(FlinkPipelineOptions.class),
+ new FlinkPipelineRunner(
+ options.as(FlinkPipelineOptions.class), null, Collections.emptyList()));
jobInvocation.start();
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
index 9cddcd636e2a..669cc510cd3e 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
@@ -183,14 +183,15 @@ public void onTimer(
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline);
JobInvocation jobInvocation =
- FlinkJobInvoker.createJobInvocation(
- "id",
- "none",
- flinkJobExecutor,
- pipelineProto,
- options.as(FlinkPipelineOptions.class),
- new FlinkPipelineRunner(
- options.as(FlinkPipelineOptions.class), null, Collections.emptyList()));
+ FlinkJobInvoker.create(null)
+ .createJobInvocation(
+ "id",
+ "none",
+ flinkJobExecutor,
+ pipelineProto,
+ options.as(FlinkPipelineOptions.class),
+ new FlinkPipelineRunner(
+ options.as(FlinkPipelineOptions.class), null, Collections.emptyList()));
jobInvocation.start();
while (jobInvocation.getState() != Enum.DONE) {
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourcePortableTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourcePortableTest.java
index 40d621f45336..88c2a8d0968a 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourcePortableTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourcePortableTest.java
@@ -104,14 +104,15 @@ public void testExecution() throws Exception {
// execute the pipeline
JobInvocation jobInvocation =
- FlinkJobInvoker.createJobInvocation(
- "fakeId",
- "fakeRetrievalToken",
- flinkJobExecutor,
- pipelineProto,
- options.as(FlinkPipelineOptions.class),
- new FlinkPipelineRunner(
- options.as(FlinkPipelineOptions.class), null, Collections.emptyList()));
+ FlinkJobInvoker.create(null)
+ .createJobInvocation(
+ "fakeId",
+ "fakeRetrievalToken",
+ flinkJobExecutor,
+ pipelineProto,
+ options.as(FlinkPipelineOptions.class),
+ new FlinkPipelineRunner(
+ options.as(FlinkPipelineOptions.class), null, Collections.emptyList()));
jobInvocation.start();
while (jobInvocation.getState() != Enum.DONE) {
Thread.sleep(100);
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java
index e5864f9b449f..65fcdf2a3a2b 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java
@@ -38,6 +38,9 @@
public class ProcessManager {
private static final Logger LOG = LoggerFactory.getLogger(ProcessManager.class);
+ /** A symbolic file to indicate that we want to inherit I/O of the parent process. */
+ public static final File INHERIT_IO_FILE = new File("_inherit_io_unused_filename_");
+
/** For debugging purposes, we inherit I/O of processes. */
private static final boolean INHERIT_IO = LOG.isDebugEnabled();
@@ -63,7 +66,7 @@ private ProcessManager() {
this.processes = Collections.synchronizedMap(new HashMap<>());
}
- static class RunningProcess {
+ public static class RunningProcess {
private Process process;
RunningProcess(Process process) {
@@ -71,7 +74,7 @@ static class RunningProcess {
}
/** Checks if the underlying process is still running. */
- void isAliveOrThrow() throws IllegalStateException {
+ public void isAliveOrThrow() throws IllegalStateException {
if (!process.isAlive()) {
throw new IllegalStateException("Process died with exit code " + process.exitValue());
}
@@ -106,27 +109,41 @@ RunningProcess startProcess(String id, String command, List<String> args) throws
*/
public RunningProcess startProcess(
String id, String command, List<String> args, Map<String, String> env) throws IOException {
+ final File outputFile;
+ if (INHERIT_IO) {
+ LOG.debug(
+ "==> DEBUG enabled: Inheriting stdout/stderr of process (adjustable in ProcessManager)");
+ outputFile = INHERIT_IO_FILE;
+ } else {
+ // Pipe stdout and stderr to /dev/null to avoid blocking the process due to filled PIPE
+ // buffer
+ if (System.getProperty("os.name", "").startsWith("Windows")) {
+ outputFile = new File("nul");
+ } else {
+ outputFile = new File("/dev/null");
+ }
+ }
+ return startProcess(id, command, args, env, outputFile);
+ }
+
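+ /**
+ * Forks a process with the given command, arguments, and environment, redirecting the combined
+ * stdout/stderr of the process to {@code outputFile}. Passing {@link #INHERIT_IO_FILE} inherits
+ * the I/O of the parent process instead of redirecting it to a file.
+ */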
+ public RunningProcess startProcess(
+ String id, String command, List<String> args, Map<String, String> env, File outputFile)
+ throws IOException {
checkNotNull(id, "Process id must not be null");
checkNotNull(command, "Command must not be null");
checkNotNull(args, "Process args must not be null");
checkNotNull(env, "Environment map must not be null");
+ checkNotNull(outputFile, "Output redirect file must not be null");
ProcessBuilder pb =
new ProcessBuilder(ImmutableList.<String>builder().add(command).addAll(args).build());
pb.environment().putAll(env);
- if (INHERIT_IO) {
- LOG.debug(
- "==> DEBUG enabled: Inheriting stdout/stderr of process (adjustable in ProcessManager)");
+ if (INHERIT_IO_FILE.equals(outputFile)) {
pb.inheritIO();
} else {
pb.redirectErrorStream(true);
- // Pipe stdout and stderr to /dev/null to avoid blocking the process due to filled PIPE buffer
- if (System.getProperty("os.name", "").startsWith("Windows")) {
- pb.redirectOutput(new File("nul"));
- } else {
- pb.redirectOutput(new File("/dev/null"));
- }
+ pb.redirectOutput(outputFile);
}
LOG.debug("Attempting to start process with command: {}", pb.command());
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
index 0c5bf9428cd9..d0061d2ac460 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
@@ -40,18 +40,21 @@ public abstract class JobServerDriver implements Runnable {
private final ServerFactory jobServerFactory;
private final ServerFactory artifactServerFactory;
+ private final JobInvokerFactory jobInvokerFactory;
private volatile GrpcFnServer<InMemoryJobService> jobServer;
private volatile GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingServer;
private volatile ExpansionServer expansionServer;
- protected abstract JobInvoker createJobInvoker();
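+ /** Factory for the {@link JobInvoker} that the job service uses to launch submitted pipelines. */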
+ public interface JobInvokerFactory {
+ JobInvoker create();
+ }
protected InMemoryJobService createJobService() throws IOException {
artifactStagingServer = createArtifactStagingService();
expansionServer = createExpansionService();
- JobInvoker invoker = createJobInvoker();
+ JobInvoker invoker = jobInvokerFactory.create();
return InMemoryJobService.create(
artifactStagingServer.getApiServiceDescriptor(),
this::createSessionToken,
@@ -130,10 +133,17 @@ protected static ServerFactory createArtifactServerFactory(ServerConfiguration c
protected JobServerDriver(
ServerConfiguration configuration,
ServerFactory jobServerFactory,
- ServerFactory artifactServerFactory) {
+ ServerFactory artifactServerFactory,
+ JobInvokerFactory jobInvokerFactory) {
this.configuration = configuration;
this.jobServerFactory = jobServerFactory;
this.artifactServerFactory = artifactServerFactory;
+ this.jobInvokerFactory = jobInvokerFactory;
+ }
+
+ // Can be used to discover the address of the job server and whether it is ready.
+ public String getJobServerUrl() {
+ return (jobServer != null) ? jobServer.getApiServiceDescriptor().getUrl() : null;
}
// This method is executed by TestPortableRunner via Reflection
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/ProcessManagerTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/ProcessManagerTest.java
index 39efeef92793..d0c02c663280 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/ProcessManagerTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/ProcessManagerTest.java
@@ -19,10 +19,17 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collections;
import org.junit.Test;
@@ -99,4 +106,55 @@ public void testEnvironmentVariables() throws IOException, InterruptedException
assertThat(process.getUnderlyingProcess().exitValue(), is(1));
processManager.stopProcess("1");
}
+
+ @Test
+ public void testRedirectOutput() throws IOException, InterruptedException {
+ File outputFile = File.createTempFile("beam-redirect-output-", "");
+ outputFile.deleteOnExit();
+ ProcessManager processManager = ProcessManager.create();
+ ProcessManager.RunningProcess process =
+ processManager.startProcess(
+ "1",
+ "bash",
+ Arrays.asList("-c", "echo 'testing123'"),
+ Collections.emptyMap(),
+ outputFile);
+ for (int i = 0; i < 10 && process.getUnderlyingProcess().isAlive(); i++) {
+ Thread.sleep(100);
+ }
+ processManager.stopProcess("1");
+ byte[] output = Files.readAllBytes(outputFile.toPath());
+ assertNotNull(output);
+ String outputStr = new String(output, Charset.defaultCharset());
+ assertThat(outputStr, containsString("testing123"));
+ }
+
+ @Test
+ public void testInheritIO() throws IOException, InterruptedException {
+ final PrintStream oldOut = System.out;
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream newOut = new PrintStream(baos);
+ try {
+ System.setOut(newOut);
+ ProcessManager processManager = ProcessManager.create();
+ ProcessManager.RunningProcess process =
+ processManager.startProcess(
+ "1",
+ "bash",
+ Arrays.asList("-c", "echo 'testing123' 1>&2;"),
+ Collections.emptyMap(),
+ ProcessManager.INHERIT_IO_FILE);
+ for (int i = 0; i < 10 && process.getUnderlyingProcess().isAlive(); i++) {
+ Thread.sleep(100);
+ }
+ processManager.stopProcess("1");
+ } finally {
+ System.setOut(oldOut);
+ }
+ // TODO: this doesn't work as inherit IO bypasses System.out/err
+ // the output instead appears in the console
+ // String outputStr = new String(baos.toByteArray(), Charset.defaultCharset());
+ // assertThat(outputStr, containsString("testing123"));
+ assertFalse(ProcessManager.INHERIT_IO_FILE.exists());
+ }
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobServerDriver.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobServerDriver.java
index f0302f141534..301cf48bb408 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobServerDriver.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobServerDriver.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.spark;
import org.apache.beam.runners.fnexecution.ServerFactory;
-import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
import org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.io.FileSystems;
@@ -33,11 +32,6 @@
/** Driver program that starts a job server for the Spark runner. */
public class SparkJobServerDriver extends JobServerDriver {
- @Override
- protected JobInvoker createJobInvoker() {
- return SparkJobInvoker.create((SparkServerConfiguration) configuration);
- }
-
private static final Logger LOG = LoggerFactory.getLogger(SparkJobServerDriver.class);
/** Spark runner-specific Configuration for the jobServer. */
@@ -100,6 +94,10 @@ private SparkJobServerDriver(
SparkServerConfiguration configuration,
ServerFactory jobServerFactory,
ServerFactory artifactServerFactory) {
- super(configuration, jobServerFactory, artifactServerFactory);
+ super(
+ configuration,
+ jobServerFactory,
+ artifactServerFactory,
+ () -> SparkJobInvoker.create(configuration));
}
}