Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.dataflow.sdk.options;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.cloud.dataflow.sdk.options.Validation.Required;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar;
Expand Down Expand Up @@ -1391,7 +1393,10 @@ private static ListMultimap<String, String> parseCommandLine(
* split up each string on ','.
*
* <p>We special case the "runner" option. It is mapped to the class of the {@link PipelineRunner}
* based off of the {@link PipelineRunner}s simple class name or fully qualified class name.
* based off of the {@link PipelineRunner PipelineRunners} simple class name. If the provided
* runner name is not registered via a {@link PipelineRunnerRegistrar}, we attempt to obtain the
* class that the name represents using {@link Class#forName(String)} and use the result class if
* it subclasses {@link PipelineRunner}.
*
* <p>If strict parsing is enabled, unknown options or options that cannot be converted to
* the expected java type using an {@link ObjectMapper} will be ignored.
Expand Down Expand Up @@ -1442,10 +1447,26 @@ public boolean apply(@Nullable String input) {
JavaType type = MAPPER.getTypeFactory().constructType(method.getGenericReturnType());
if ("runner".equals(entry.getKey())) {
String runner = Iterables.getOnlyElement(entry.getValue());
Preconditions.checkArgument(SUPPORTED_PIPELINE_RUNNERS.containsKey(runner),
"Unknown 'runner' specified '%s', supported pipeline runners %s",
runner, Sets.newTreeSet(SUPPORTED_PIPELINE_RUNNERS.keySet()));
convertedOptions.put("runner", SUPPORTED_PIPELINE_RUNNERS.get(runner));
if (SUPPORTED_PIPELINE_RUNNERS.containsKey(runner)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update the method comment.

In PipelineOptions it is clear what we are supposed to support but from this method description I take it as we only support registered runners and we can select it either by the simple or fully qualified names.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

We only supported registered runners, and it looks like we only ever supported the simple names; this fixes that as well.

convertedOptions.put("runner", SUPPORTED_PIPELINE_RUNNERS.get(runner));
} else {
try {
Class<?> runnerClass = Class.forName(runner);
checkArgument(
PipelineRunner.class.isAssignableFrom(runnerClass),
"Class '%s' does not implement PipelineRunner. Supported pipeline runners %s",
runner,
Sets.newTreeSet(SUPPORTED_PIPELINE_RUNNERS.keySet()));
convertedOptions.put("runner", runnerClass);
} catch (ClassNotFoundException e) {
String msg =
String.format(
"Unknown 'runner' specified '%s', supported pipeline runners %s",
runner,
Sets.newTreeSet(SUPPORTED_PIPELINE_RUNNERS.keySet()));
throw new IllegalArgumentException(msg, e);
}
}
} else if ((returnType.isArray() && (SIMPLE_TYPES.contains(returnType.getComponentType())
|| returnType.getComponentType().isEnum()))
|| Collection.class.isAssignableFrom(returnType)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
import com.google.cloud.dataflow.sdk.testing.RestoreSystemProperties;
import com.google.common.collect.ArrayListMultimap;
Expand Down Expand Up @@ -824,6 +828,14 @@ public void testSettingRunner() {
assertEquals(BlockingDataflowPipelineRunner.class, options.getRunner());
}

@Test
public void testSettingRunnerFullName() {
String[] args =
new String[] {String.format("--runner=%s", DataflowPipelineRunner.class.getName())};
PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
assertEquals(opts.getRunner(), DataflowPipelineRunner.class);
}

@Test
public void testSettingUnknownRunner() {
String[] args = new String[] {"--runner=UnknownRunner"};
Expand All @@ -834,6 +846,30 @@ public void testSettingUnknownRunner() {
PipelineOptionsFactory.fromArgs(args).create();
}

private static class ExampleTestRunner extends PipelineRunner<PipelineResult> {
@Override
public PipelineResult run(Pipeline pipeline) {
return null;
}
}

@Test
public void testSettingRunnerCanonicalClassNameNotInSupportedExists() {
String[] args = new String[] {String.format("--runner=%s", ExampleTestRunner.class.getName())};
PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
assertEquals(opts.getRunner(), ExampleTestRunner.class);
}

@Test
public void testSettingRunnerCanonicalClassNameNotInSupportedNotPipelineRunner() {
String[] args = new String[] {"--runner=java.lang.String"};
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("does not implement PipelineRunner");
expectedException.expectMessage("java.lang.String");

PipelineOptionsFactory.fromArgs(args).create();
}

@Test
public void testUsingArgumentWithUnknownPropertyIsNotAllowed() {
String[] args = new String[] {"--unknownProperty=value"};
Expand Down