BlockingDataflowPipelineRunnerTest.java
@@ -31,6 +31,7 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.TestDataflowPipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.util.MonitoringUtil;
import org.apache.beam.sdk.util.NoopPathValidator;
import org.apache.beam.sdk.util.TestCredential;
@@ -209,7 +210,7 @@ private BlockingDataflowPipelineRunner createMockRunner(DataflowPipelineJob job)
@Test
public void testJobDoneComplete() throws Exception {
createMockRunner(createMockJob("testJobDone-projectId", "testJobDone-jobId", State.DONE))
.run(DirectPipeline.createForTest());
.run(TestPipeline.create());
expectedLogs.verifyInfo("Job finished with status DONE");
}

@@ -223,7 +224,7 @@ public void testFailedJobThrowsException() throws Exception {
expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
JobIdMatcher.expectJobId("testFailedJob-jobId")));
createMockRunner(createMockJob("testFailedJob-projectId", "testFailedJob-jobId", State.FAILED))
.run(DirectPipeline.createForTest());
.run(TestPipeline.create());
}

/**
@@ -236,8 +237,8 @@ public void testCancelledJobThrowsException() throws Exception {
expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
JobIdMatcher.expectJobId("testCancelledJob-jobId")));
createMockRunner(
createMockJob("testCancelledJob-projectId", "testCancelledJob-jobId", State.CANCELLED))
.run(DirectPipeline.createForTest());
createMockJob("testCancelledJob-projectId", "testCancelledJob-jobId", State.CANCELLED))
.run(TestPipeline.create());
}

/**
@@ -256,7 +257,7 @@ public void testUpdatedJobThrowsException() throws Exception {
DataflowPipelineJob replacedByJob =
createMockJob("testUpdatedJob-projectId", "testUpdatedJob-replacedByJobId", State.DONE);
when(job.getReplacedByJob()).thenReturn(replacedByJob);
createMockRunner(job).run(DirectPipeline.createForTest());
createMockRunner(job).run(TestPipeline.create());
}

/**
@@ -269,8 +270,8 @@ public void testUpdatedJobThrowsException() throws Exception {
public void testUnknownJobThrowsException() throws Exception {
expectedThrown.expect(IllegalStateException.class);
createMockRunner(
createMockJob("testUnknownJob-projectId", "testUnknownJob-jobId", State.UNKNOWN))
.run(DirectPipeline.createForTest());
createMockJob("testUnknownJob-projectId", "testUnknownJob-jobId", State.UNKNOWN))
.run(TestPipeline.create());
}

/**
@@ -283,9 +284,8 @@ public void testNullJobThrowsException() throws Exception {
expectedThrown.expect(DataflowServiceException.class);
expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
JobIdMatcher.expectJobId("testNullJob-jobId")));
createMockRunner(
createMockJob("testNullJob-projectId", "testNullJob-jobId", null))
.run(DirectPipeline.createForTest());
createMockRunner(createMockJob("testNullJob-projectId", "testNullJob-jobId", null))
.run(TestPipeline.create());
}

@Test
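Every change in this file is the same substitution: tests previously drove the mock runner with DirectPipeline.createForTest(), and now use TestPipeline.create(). A minimal sketch of the new idiom, mirroring the hunks above (test and job names are illustrative, not from this PR):

  @Test
  public void testJobStateIsLogged() throws Exception {
    // TestPipeline builds its pipeline from test options instead of
    // hard-wiring the direct runner, removing the DirectPipeline dependency.
    createMockRunner(createMockJob("example-projectId", "example-jobId", State.DONE))
        .run(TestPipeline.create());
    expectedLogs.verifyInfo("Job finished with status DONE");
  }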
TransformTranslatorTest.java
@@ -18,29 +18,28 @@

package org.apache.beam.runners.spark.translation;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;

import org.apache.beam.runners.spark.SparkPipelineRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.values.PCollection;

import com.google.api.client.repackaged.com.google.common.base.Joiner;
import com.google.common.base.Charsets;

import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;

/**
@@ -50,22 +49,7 @@
* executed in Spark.
*/
public class TransformTranslatorTest {

@Rule
public TestName name = new TestName();

private DirectPipelineRunner directRunner;
private SparkPipelineRunner sparkRunner;
private String testDataDirName;

@Before public void init() throws IOException {
sparkRunner = SparkPipelineRunner.create();
directRunner = DirectPipelineRunner.createForTest();
testDataDirName = Joiner.on(File.separator).join("target", "test-data", name.getMethodName())
+ File.separator;
FileUtils.deleteDirectory(new File(testDataDirName));
new File(testDataDirName).mkdirs();
}
@Rule public TemporaryFolder tmp = new TemporaryFolder();

/**
* Builds a simple pipeline with TextIO.Read and TextIO.Write, runs the pipeline
@@ -74,8 +58,8 @@ public class TransformTranslatorTest {
*/
@Test
public void testTextIOReadAndWriteTransforms() throws IOException {
String directOut = runPipeline("direct", directRunner);
String sparkOut = runPipeline("spark", sparkRunner);
String directOut = runPipeline(DirectPipelineRunner.class);
String sparkOut = runPipeline(SparkPipelineRunner.class);

List<String> directOutput =
Files.readAllLines(Paths.get(directOut + "-00000-of-00001"), Charsets.UTF_8);
@@ -84,18 +68,17 @@ public void testTextIOReadAndWriteTransforms() throws IOException {
Files.readAllLines(Paths.get(sparkOut + "-00000-of-00001"), Charsets.UTF_8);

// sort output to get a stable result (PCollections are not ordered)
Collections.sort(directOutput);
Collections.sort(sparkOutput);

Assert.assertArrayEquals(directOutput.toArray(), sparkOutput.toArray());
assertThat(sparkOutput, containsInAnyOrder(directOutput.toArray()));
}

private String runPipeline(String name, PipelineRunner<?> runner) {
Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
String outFile = Joiner.on(File.separator).join(testDataDirName, "test_text_out_" + name);
private String runPipeline(Class<? extends PipelineRunner<?>> runner) throws IOException {
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(runner);
Pipeline p = Pipeline.create(options);
File outFile = tmp.newFile();
PCollection<String> lines = p.apply(TextIO.Read.from("src/test/resources/test_text.txt"));
lines.apply(TextIO.Write.to(outFile));
runner.run(p);
return outFile;
lines.apply(TextIO.Write.to(outFile.getAbsolutePath()));
p.run();
return outFile.getAbsolutePath();
}
}
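Two idioms change in this file: TemporaryFolder replaces the hand-rolled target/test-data directory bookkeeping, and the helper no longer calls runner.run(p) on a runner instance — it registers the runner class on the options and lets the pipeline resolve it. The essential shape, condensed from runPipeline above:

  PipelineOptions options = PipelineOptionsFactory.create();
  options.setRunner(SparkPipelineRunner.class); // any PipelineRunner class
  Pipeline p = Pipeline.create(options);
  // ... apply transforms ...
  p.run(); // instantiates the configured runner and executes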
DirectPipelineOptions.java
@@ -18,18 +18,17 @@
package org.apache.beam.sdk.options;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.runners.DirectPipeline;
import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.values.PCollection;

import com.fasterxml.jackson.annotation.JsonIgnore;

/**
* Options that can be used to configure the {@link DirectPipeline}.
* Options that can be used to configure the {@link DirectPipelineRunner}.
*/
public interface DirectPipelineOptions extends
ApplicationNameOptions, BigQueryOptions, GcsOptions, GcpOptions,
PipelineOptions, StreamingOptions {
public interface DirectPipelineOptions
extends ApplicationNameOptions, BigQueryOptions, GcsOptions, GcpOptions, PipelineOptions,
StreamingOptions {

/**
* The random seed to use for pseudorandom behaviors in the {@link DirectPipelineRunner}.
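Since DirectPipelineOptions extends the option interfaces listed in the declaration, one handle obtained from the factory exposes all of their setters. A brief sketch (property values are illustrative):

  DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class);
  options.setAppName("ExampleApp"); // from ApplicationNameOptions
  options.setStreaming(false);      // from StreamingOptions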

This file was deleted.

DirectPipelineRunner.java
@@ -34,8 +34,6 @@
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.DirectPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -59,7 +57,6 @@
import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.TestCredential;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.common.Counter;
@@ -194,18 +191,6 @@ public static DirectPipelineRunner fromOptions(PipelineOptions options) {
return new DirectPipelineRunner(directOptions);
}

/**
* Constructs a runner with default properties for testing.
*
* @return The newly created runner.
*/
public static DirectPipelineRunner createForTest() {
DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class);
options.setStableUniqueNames(CheckEnabled.ERROR);
options.setGcpCredential(new TestCredential());
return new DirectPipelineRunner(options);
}

/**
* Enable runtime testing to verify that all functions and {@link Coder}
* instances can be serialized.
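Tests that depended on the removed createForTest() can reproduce its configuration through the public factory; this sketch is assembled only from the deleted body above:

  DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class);
  options.setStableUniqueNames(CheckEnabled.ERROR); // fail fast on duplicate transform names
  options.setGcpCredential(new TestCredential());   // no real GCP auth in tests
  DirectPipelineRunner runner = DirectPipelineRunner.fromOptions(options);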
AvroIOGeneratedClassTest.java
@@ -23,8 +23,8 @@

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.runners.DirectPipeline;
import org.apache.beam.sdk.runners.DirectPipelineRunner.EvaluationResults;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

@@ -137,12 +137,11 @@ <T> void runTestRead(AvroIO.Read.Bound<T> read, String expectedName, T[] expecte
throws Exception {
generateAvroFile(generateAvroObjects());

DirectPipeline p = DirectPipeline.createForTest();
TestPipeline p = TestPipeline.create();
PCollection<T> output = p.apply(read);
EvaluationResults results = p.run();
PAssert.that(output).containsInAnyOrder(expectedOutput);
p.run();
assertEquals(expectedName, output.getName());
assertThat(results.getPCollection(output),
containsInAnyOrder(expectedOutput));
}
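The assertion moves into the pipeline itself: PAssert attaches the expectation to the PCollection and p.run() verifies it, rather than fishing materialized results out of the direct runner's EvaluationResults. Condensed from the rewritten method:

  PCollection<T> output = p.apply(read);
  PAssert.that(output).containsInAnyOrder(expectedOutput);
  p.run(); // expectations are checked during execution, on any runner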

@Test
@@ -257,7 +256,7 @@ <T> void runTestWrite(AvroIO.Write.Bound<T> write, String expectedName)
throws Exception {
AvroGeneratedUser[] users = generateAvroObjects();

DirectPipeline p = DirectPipeline.createForTest();
TestPipeline p = TestPipeline.create();
@SuppressWarnings("unchecked")
PCollection<T> input = p.apply(Create.of(Arrays.asList((T[]) users))
.withCoder((Coder<T>) AvroCoder.of(AvroGeneratedUser.class)));
AvroIOTest.java
@@ -26,7 +26,6 @@
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.AvroIO.Write.Bound;
import org.apache.beam.sdk.runners.DirectPipeline;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
@@ -116,7 +115,7 @@ public boolean equals(Object other) {

@Test
public void testAvroIOWriteAndReadASingleFile() throws Throwable {
DirectPipeline p = DirectPipeline.createForTest();
TestPipeline p = TestPipeline.create();
List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
new GenericClass(5, "bar"));
File outputFile = tmpFolder.newFile("output.avro");
@@ -127,7 +126,7 @@ public void testAvroIOWriteAndReadASingleFile() throws Throwable {
.withSchema(GenericClass.class));
p.run();

p = DirectPipeline.createForTest();
p = TestPipeline.create();
PCollection<GenericClass> input = p
.apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClass.class));

@@ -179,7 +178,7 @@ public boolean equals(Object other) {
*/
@Test
public void testAvroIOWriteAndReadSchemaUpgrade() throws Throwable {
DirectPipeline p = DirectPipeline.createForTest();
TestPipeline p = TestPipeline.create();
List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
new GenericClass(5, "bar"));
File outputFile = tmpFolder.newFile("output.avro");
@@ -192,7 +191,7 @@

List<GenericClassV2> expected = ImmutableList.of(new GenericClassV2(3, "hi", null),
new GenericClassV2(5, "bar", null));
p = DirectPipeline.createForTest();
p = TestPipeline.create();
PCollection<GenericClassV2> input = p
.apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClassV2.class));

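Note that the write-and-read tests execute two pipelines in sequence — one TestPipeline writes the Avro file and a second, freshly created one reads it back rather than reusing the first. Schematically, condensed from testAvroIOWriteAndReadASingleFile:

  TestPipeline writePipeline = TestPipeline.create();
  writePipeline
      .apply(Create.of(values).withCoder(AvroCoder.of(GenericClass.class)))
      .apply(AvroIO.Write.to(outputFile.getAbsolutePath()).withSchema(GenericClass.class));
  writePipeline.run();

  TestPipeline readPipeline = TestPipeline.create();
  PCollection<GenericClass> readBack = readPipeline
      .apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClass.class));
  PAssert.that(readBack).containsInAnyOrder(values);
  readPipeline.run();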