DataflowPipelineOptions.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.options;
 
-import org.apache.beam.sdk.runners.DataflowPipeline;
+import org.apache.beam.sdk.runners.DataflowPipelineRunner;
 
 import com.google.common.base.MoreObjects;
 
@@ -27,14 +27,14 @@
 import org.joda.time.format.DateTimeFormatter;
 
 /**
- * Options that can be used to configure the {@link DataflowPipeline}.
+ * Options that can be used to configure the {@link DataflowPipelineRunner}.
  */
 @Description("Options that configure the Dataflow pipeline.")
-public interface DataflowPipelineOptions extends
-    PipelineOptions, GcpOptions, ApplicationNameOptions, DataflowPipelineDebugOptions,
-    DataflowPipelineWorkerPoolOptions, BigQueryOptions,
-    GcsOptions, StreamingOptions, CloudDebuggerOptions, DataflowWorkerLoggingOptions,
-    DataflowProfilingOptions, PubsubOptions {
+public interface DataflowPipelineOptions
+    extends PipelineOptions, GcpOptions, ApplicationNameOptions, DataflowPipelineDebugOptions,
+        DataflowPipelineWorkerPoolOptions, BigQueryOptions, GcsOptions, StreamingOptions,
+        CloudDebuggerOptions, DataflowWorkerLoggingOptions, DataflowProfilingOptions,
+        PubsubOptions {
 
   @Description("Project id. Required when running a Dataflow in the cloud. "
       + "See https://cloud.google.com/storage/docs/projects for further details.")

This file was deleted.
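With the DataflowPipeline convenience class gone, pipelines that target Dataflow are built by naming the runner on the options and then calling the generic factory. A minimal sketch of the replacement pattern, pieced together from calls this diff already uses (the project id and GCS paths are placeholders, not values from this PR):

DataflowPipelineOptions options =
    PipelineOptionsFactory.as(DataflowPipelineOptions.class);
// Replaces DataflowPipeline.create(options): the runner is now chosen here.
options.setRunner(DataflowPipelineRunner.class);
options.setProject("my-project-id");            // placeholder
options.setTempLocation("gs://my-bucket/temp"); // placeholder

Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object"))
    .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object"));
p.run();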

DataflowPipelineRegistrar.java
@@ -26,8 +26,8 @@
 import com.google.common.collect.ImmutableList;
 
 /**
- * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for
- * the {@link DataflowPipeline}.
+ * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for the
+ * {@link DataflowPipelineRunner}.
  */
 public class DataflowPipelineRegistrar {
   private DataflowPipelineRegistrar() { }
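The registrar's body is unchanged and not shown here. As a hedged sketch of the shape such a registrar usually takes (the @AutoService wiring and inner class name are assumptions, not confirmed by this diff; only the ImmutableList import is visible above):

@AutoService(PipelineRunnerRegistrar.class)
public static class Runner implements PipelineRunnerRegistrar {
  @Override
  public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
    // Exposes the runner class so it can typically be selected by name,
    // e.g. via --runner=DataflowPipelineRunner.
    return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
        DataflowPipelineRunner.class);
  }
}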
DataflowPipelineRunnerTest.java
@@ -125,7 +125,7 @@
 import java.util.Map;
 
 /**
- * Tests for DataflowPipelineRunner.
+ * Tests for the {@link DataflowPipelineRunner}.
  */
 @RunWith(JUnit4.class)
 public class DataflowPipelineRunnerTest {
@@ -143,9 +143,10 @@ private static void assertValidJob(Job job) {
     assertNull(job.getCurrentState());
   }
 
-  private DataflowPipeline buildDataflowPipeline(DataflowPipelineOptions options) {
+  private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) {
     options.setStableUniqueNames(CheckEnabled.ERROR);
-    DataflowPipeline p = DataflowPipeline.create(options);
+    options.setRunner(DataflowPipelineRunner.class);
+    Pipeline p = Pipeline.create(options);
 
     p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object"))
         .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object"));
@@ -212,6 +213,7 @@ private DataflowPipelineOptions buildPipelineOptions() throws IOException {
   private DataflowPipelineOptions buildPipelineOptions(
       ArgumentCaptor<Job> jobCaptor) throws IOException {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
     options.setProject(PROJECT_ID);
     options.setTempLocation("gs://somebucket/some/path");
     // Set FILES_PROPERTY to empty to prevent a default value calculated from classpath.
@@ -227,8 +229,8 @@ public void testRun() throws IOException {
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
 
     DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
-    DataflowPipeline p = buildDataflowPipeline(options);
-    DataflowPipelineJob job = p.run();
+    Pipeline p = buildDataflowPipeline(options);
+    DataflowPipelineJob job = (DataflowPipelineJob) p.run();
     assertEquals("newid", job.getJobId());
     assertValidJob(jobCaptor.getValue());
   }
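The new casts are needed because Pipeline.run() is declared against the generic result type, while these assertions use Dataflow-specific accessors such as getJobId(). A sketch of the same call site with the intermediate made explicit (the PipelineResult step is illustrative; the tests cast directly):

// Only the DataflowPipelineRunner is expected to produce a DataflowPipelineJob.
PipelineResult result = p.run();
DataflowPipelineJob job = (DataflowPipelineJob) result;
String jobId = job.getJobId(); // Dataflow-specific accessor used by the assertions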
@@ -246,7 +248,7 @@ public void testRunReturnDifferentRequestId() throws IOException {
     resultJob.setClientRequestId("different_request_id");
     when(mockRequest.execute()).thenReturn(resultJob);
 
-    DataflowPipeline p = buildDataflowPipeline(options);
+    Pipeline p = buildDataflowPipeline(options);
     try {
       p.run();
       fail("Expected DataflowJobAlreadyExistsException");
@@ -265,8 +267,8 @@ public void testUpdate() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
     options.setUpdate(true);
     options.setJobName("oldJobName");
-    DataflowPipeline p = buildDataflowPipeline(options);
-    DataflowPipelineJob job = p.run();
+    Pipeline p = buildDataflowPipeline(options);
+    DataflowPipelineJob job = (DataflowPipelineJob) p.run();
     assertEquals("newid", job.getJobId());
     assertValidJob(jobCaptor.getValue());
   }
@@ -279,7 +281,7 @@ public void testUpdateNonExistentPipeline() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();
     options.setUpdate(true);
     options.setJobName("badJobName");
-    DataflowPipeline p = buildDataflowPipeline(options);
+    Pipeline p = buildDataflowPipeline(options);
     p.run();
   }

@@ -298,7 +300,7 @@ public void testUpdateAlreadyUpdatedPipeline() throws IOException {
     resultJob.setClientRequestId("different_request_id");
     when(mockRequest.execute()).thenReturn(resultJob);
 
-    DataflowPipeline p = buildDataflowPipeline(options);
+    Pipeline p = buildDataflowPipeline(options);
 
     thrown.expect(DataflowJobAlreadyUpdatedException.class);
     thrown.expect(new TypeSafeMatcher<DataflowJobAlreadyUpdatedException>() {
@@ -348,9 +350,9 @@ public void testRunWithFiles() throws IOException {
     options.setGcsUtil(mockGcsUtil);
     options.setGcpCredential(new TestCredential());
 
-    DataflowPipeline p = buildDataflowPipeline(options);
+    Pipeline p = buildDataflowPipeline(options);
 
-    DataflowPipelineJob job = p.run();
+    DataflowPipelineJob job = (DataflowPipelineJob) p.run();
     assertEquals("newid", job.getJobId());
 
     Job workflowJob = jobCaptor.getValue();
@@ -750,23 +752,24 @@ public void testTransformTranslatorMissing() throws IOException {
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
 
     DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
-    DataflowPipeline p = DataflowPipeline.create(options);
+    Pipeline p = Pipeline.create(options);
 
     p.apply(Create.of(Arrays.asList(1, 2, 3)))
         .apply(new TestTransform());
 
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage(Matchers.containsString("no translator registered"));
     DataflowPipelineTranslator.fromOptions(options)
-        .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList());
+        .translate(
+            p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
     assertValidJob(jobCaptor.getValue());
   }
 
   @Test
   public void testTransformTranslator() throws IOException {
     // Test that we can provide a custom translation
     DataflowPipelineOptions options = buildPipelineOptions();
-    DataflowPipeline p = DataflowPipeline.create(options);
+    Pipeline p = Pipeline.create(options);
     TestTransform transform = new TestTransform();
 
     p.apply(Create.of(Arrays.asList(1, 2, 3)).withCoder(BigEndianIntegerCoder.of()))
@@ -793,7 +796,7 @@ public void translate(
     });
 
     translator.translate(
-        p, p.getRunner(), Collections.<DataflowPackage>emptyList());
+        p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
     assertTrue(transform.translated);
   }
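Likewise, p.getRunner() is typed as the generic runner, so the translator call sites now downcast. A defensive variant for a caller that cannot guarantee which runner the options selected (the instanceof guard is illustrative; the tests above cast unconditionally):

PipelineRunner<?> runner = p.getRunner();
if (runner instanceof DataflowPipelineRunner) {
  // Translation is only meaningful for the Dataflow runner.
  DataflowPipelineTranslator.fromOptions(options)
      .translate(p, (DataflowPipelineRunner) runner,
          Collections.<DataflowPackage>emptyList());
}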

@@ -828,7 +831,7 @@ public void visitValue(PValue value, TransformTreeNode producer) {
   @Test
   public void testApplyIsScopedToExactClass() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();
-    DataflowPipeline p = DataflowPipeline.create(options);
+    Pipeline p = Pipeline.create(options);
 
     Create.TimestampedValues<String> transform =
         Create.timestamped(Arrays.asList(TimestampedValue.of("TestString", Instant.now())));

This file was deleted.
