Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.reference.testing;

import com.google.auto.service.AutoService;
import org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
Expand All @@ -32,11 +33,10 @@
public interface TestPortablePipelineOptions extends TestPipelineOptions, PortablePipelineOptions {

@Required
@Description(
"Fully qualified class name of TestJobServiceDriver capable of managing the JobService.")
Class getJobServerDriver();
@Description("Fully qualified class name of a JobServerDriver subclass.")
Class<JobServerDriver> getJobServerDriver();

void setJobServerDriver(Class jobServerDriver);
void setJobServerDriver(Class<JobServerDriver> jobServerDriver);

@Description("String containing comma separated arguments for the JobServer.")
@Default.InstanceFactory(DefaultJobServerConfigFactory.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

import static org.hamcrest.MatcherAssert.assertThat;

import java.lang.reflect.InvocationTargetException;
import java.io.IOException;
import org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver;
import org.apache.beam.runners.reference.PortableRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
Expand All @@ -37,13 +38,8 @@
* {@link TestPortableRunner} is a pipeline runner that wraps a {@link PortableRunner} when running
* tests against the {@link TestPipeline}.
*
* <p>This runner requires a JobServerDriver with following methods.
*
* <ul>
* <li>public static Object fromParams(String... params)
* <li>public String start() // Start JobServer and returns the JobServer host and port.
* <li>public void stop() // Stop the JobServer and free all resources.
* </ul>
* <p>This runner requires a {@link JobServerDriver} subclass with the following factory method:
* <code>public static JobServerDriver fromParams(String[] args)</code>
*
* @see TestPipeline
*/
Expand All @@ -64,18 +60,18 @@ public PipelineResult run(Pipeline pipeline) {
TestPortablePipelineOptions testPortablePipelineOptions =
options.as(TestPortablePipelineOptions.class);
String jobServerHostPort;
Object jobServerDriver;
Class<?> jobServerDriverClass = testPortablePipelineOptions.getJobServerDriver();
JobServerDriver jobServerDriver;
Class<JobServerDriver> jobServerDriverClass = testPortablePipelineOptions.getJobServerDriver();
String[] parameters = testPortablePipelineOptions.getJobServerConfig();
try {
jobServerDriver =
InstanceBuilder.ofType(jobServerDriverClass)
.fromFactoryMethod("fromParams")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should watch out for #9872 because the method name/usage changes there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh didn't see that this was a follow-up to #9911 :)

.withArg(String[].class, parameters)
.build();
jobServerHostPort = (String) jobServerDriverClass.getMethod("start").invoke(jobServerDriver);
} catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
throw new IllegalArgumentException(e);
jobServerHostPort = jobServerDriver.start();
} catch (IOException e) {
throw new RuntimeException("Failed to start job server", e);
}

try {
Expand All @@ -87,14 +83,7 @@ public PipelineResult run(Pipeline pipeline) {
assertThat("Pipeline did not succeed.", result.waitUntilFinish(), Matchers.is(State.DONE));
return result;
} finally {
try {
jobServerDriverClass.getMethod("stop").invoke(jobServerDriver);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
LOG.error(
String.format(
"Provided JobServiceDriver %s does not implement stop().", jobServerDriverClass),
e);
}
jobServerDriver.stop();
}
}
}