diff --git a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortablePipelineOptions.java b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortablePipelineOptions.java index 33ba8b1cbbd0..3713b8d1cdfd 100644 --- a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortablePipelineOptions.java +++ b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortablePipelineOptions.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.reference.testing; import com.google.auto.service.AutoService; +import org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; @@ -32,11 +33,10 @@ public interface TestPortablePipelineOptions extends TestPipelineOptions, PortablePipelineOptions { @Required - @Description( - "Fully qualified class name of TestJobServiceDriver capable of managing the JobService.") - Class getJobServerDriver(); + @Description("Fully qualified class name of a JobServerDriver subclass.") + Class getJobServerDriver(); - void setJobServerDriver(Class jobServerDriver); + void setJobServerDriver(Class jobServerDriver); @Description("String containing comma separated arguments for the JobServer.") @Default.InstanceFactory(DefaultJobServerConfigFactory.class) diff --git a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortableRunner.java b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortableRunner.java index d7295f2e934e..349241916e1f 100644 --- a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortableRunner.java +++ b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortableRunner.java @@ -19,7 +19,8 @@ import static org.hamcrest.MatcherAssert.assertThat; -import java.lang.reflect.InvocationTargetException; +import java.io.IOException; +import org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver; import org.apache.beam.runners.reference.PortableRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; @@ -37,13 +38,8 @@ * {@link TestPortableRunner} is a pipeline runner that wraps a {@link PortableRunner} when running * tests against the {@link TestPipeline}. * - *

This runner requires a JobServerDriver with following methods. - * - *

+ *

This runner requires a {@link JobServerDriver} subclass with the following factory method: + * public static JobServerDriver fromParams(String[] args) * * @see TestPipeline */ @@ -64,8 +60,8 @@ public PipelineResult run(Pipeline pipeline) { TestPortablePipelineOptions testPortablePipelineOptions = options.as(TestPortablePipelineOptions.class); String jobServerHostPort; - Object jobServerDriver; - Class jobServerDriverClass = testPortablePipelineOptions.getJobServerDriver(); + JobServerDriver jobServerDriver; + Class jobServerDriverClass = testPortablePipelineOptions.getJobServerDriver(); String[] parameters = testPortablePipelineOptions.getJobServerConfig(); try { jobServerDriver = @@ -73,9 +69,9 @@ public PipelineResult run(Pipeline pipeline) { .fromFactoryMethod("fromParams") .withArg(String[].class, parameters) .build(); - jobServerHostPort = (String) jobServerDriverClass.getMethod("start").invoke(jobServerDriver); - } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { - throw new IllegalArgumentException(e); + jobServerHostPort = jobServerDriver.start(); + } catch (IOException e) { + throw new RuntimeException("Failed to start job server", e); } try { @@ -87,14 +83,7 @@ public PipelineResult run(Pipeline pipeline) { assertThat("Pipeline did not succeed.", result.waitUntilFinish(), Matchers.is(State.DONE)); return result; } finally { - try { - jobServerDriverClass.getMethod("stop").invoke(jobServerDriver); - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - LOG.error( - String.format( - "Provided JobServiceDriver %s does not implement stop().", jobServerDriverClass), - e); - } + jobServerDriver.stop(); } } }