From 8ee80511d4b3c2e977eacacef2746950085e542d Mon Sep 17 00:00:00 2001 From: Kyle Weaver Date: Mon, 28 Oct 2019 18:20:21 -0700 Subject: [PATCH] [BEAM-8509] TestPortableRunner uses JobServerDriver interface Unfortunately, due to Java language limitations we cannot define fromParams as an abstract static method. But at least start and stop are enforced by the type system now. --- .../testing/TestPortablePipelineOptions.java | 8 ++--- .../reference/testing/TestPortableRunner.java | 31 ++++++------------- 2 files changed, 14 insertions(+), 25 deletions(-) diff --git a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortablePipelineOptions.java b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortablePipelineOptions.java index 33ba8b1cbbd0..3713b8d1cdfd 100644 --- a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortablePipelineOptions.java +++ b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortablePipelineOptions.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.reference.testing; import com.google.auto.service.AutoService; +import org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; @@ -32,11 +33,10 @@ public interface TestPortablePipelineOptions extends TestPipelineOptions, PortablePipelineOptions { @Required - @Description( - "Fully qualified class name of TestJobServiceDriver capable of managing the JobService.") - Class getJobServerDriver(); + @Description("Fully qualified class name of a JobServerDriver subclass.") + Class getJobServerDriver(); - void setJobServerDriver(Class jobServerDriver); + void setJobServerDriver(Class jobServerDriver); @Description("String containing comma separated arguments for the JobServer.") @Default.InstanceFactory(DefaultJobServerConfigFactory.class) diff --git a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortableRunner.java b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortableRunner.java index d7295f2e934e..349241916e1f 100644 --- a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortableRunner.java +++ b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/TestPortableRunner.java @@ -19,7 +19,8 @@ import static org.hamcrest.MatcherAssert.assertThat; -import java.lang.reflect.InvocationTargetException; +import java.io.IOException; +import org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver; import org.apache.beam.runners.reference.PortableRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; @@ -37,13 +38,8 @@ * {@link TestPortableRunner} is a pipeline runner that wraps a {@link PortableRunner} when running * tests against the {@link TestPipeline}. * - *

This runner requires a JobServerDriver with following methods. - * - *

    - *
  • public static Object fromParams(String... params) - *
  • public String start() // Start JobServer and returns the JobServer host and port. - *
  • public void stop() // Stop the JobServer and free all resources. - *
+ *

This runner requires a {@link JobServerDriver} subclass with the following factory method: + * public static JobServerDriver fromParams(String[] args) * * @see TestPipeline */ @@ -64,8 +60,8 @@ public PipelineResult run(Pipeline pipeline) { TestPortablePipelineOptions testPortablePipelineOptions = options.as(TestPortablePipelineOptions.class); String jobServerHostPort; - Object jobServerDriver; - Class jobServerDriverClass = testPortablePipelineOptions.getJobServerDriver(); + JobServerDriver jobServerDriver; + Class jobServerDriverClass = testPortablePipelineOptions.getJobServerDriver(); String[] parameters = testPortablePipelineOptions.getJobServerConfig(); try { jobServerDriver = @@ -73,9 +69,9 @@ public PipelineResult run(Pipeline pipeline) { .fromFactoryMethod("fromParams") .withArg(String[].class, parameters) .build(); - jobServerHostPort = (String) jobServerDriverClass.getMethod("start").invoke(jobServerDriver); - } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { - throw new IllegalArgumentException(e); + jobServerHostPort = jobServerDriver.start(); + } catch (IOException e) { + throw new RuntimeException("Failed to start job server", e); } try { @@ -87,14 +83,7 @@ public PipelineResult run(Pipeline pipeline) { assertThat("Pipeline did not succeed.", result.waitUntilFinish(), Matchers.is(State.DONE)); return result; } finally { - try { - jobServerDriverClass.getMethod("stop").invoke(jobServerDriver); - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - LOG.error( - String.format( - "Provided JobServiceDriver %s does not implement stop().", jobServerDriverClass), - e); - } + jobServerDriver.stop(); } } }