From 18cb824ca3a4e2e537289f76fb52b6829d7d52f0 Mon Sep 17 00:00:00 2001
From: Thomas Groh
Date: Thu, 14 Apr 2016 16:12:23 -0700
Subject: [PATCH] Remove the DirectPipeline class

Users who wish to use the DirectPipelineRunner should do so by creating a
new Pipeline with the runner set in the PipelineOptions.
---
 .../BlockingDataflowPipelineRunnerTest.java   | 20 +++----
 .../translation/TransformTranslatorTest.java  | 51 ++++++-----------
 .../sdk/options/DirectPipelineOptions.java    |  9 ++-
 .../beam/sdk/runners/DirectPipeline.java      | 56 -------------------
 .../sdk/runners/DirectPipelineRunner.java     | 15 -----
 .../beam/sdk/io/AvroIOGeneratedClassTest.java | 13 ++---
 .../org/apache/beam/sdk/io/AvroIOTest.java    |  9 ++-
 .../sdk/runners/DirectPipelineRunnerTest.java | 29 ++++++----
 .../beam/sdk/runners/DirectPipelineTest.java  | 35 ------------
 .../beam/sdk/runners/TransformTreeTest.java   |  7 ++-
 .../transforms/ApproximateQuantilesTest.java  |  5 +-
 11 files changed, 66 insertions(+), 183 deletions(-)
 delete mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipeline.java
 delete mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineTest.java
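Note (illustration only, not part of the diff below): a minimal sketch of the replacement pattern described above, mirroring the test updates in this patch; the class name DirectRunnerExample is a placeholder.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.runners.DirectPipelineRunner;
    import org.apache.beam.sdk.transforms.Create;

    public class DirectRunnerExample {
      public static void main(String[] args) {
        // Configure the runner through the options instead of constructing a DirectPipeline.
        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(DirectPipelineRunner.class);

        // Build and run the pipeline exactly as before.
        Pipeline p = Pipeline.create(options);
        p.apply(Create.of("a", "b", "c"));
        p.run();
      }
    }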
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
index 67ecdbedaeb4..ae504ed22d09 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
@@ -31,6 +31,7 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.TestDataflowPipelineOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.util.MonitoringUtil;
 import org.apache.beam.sdk.util.NoopPathValidator;
 import org.apache.beam.sdk.util.TestCredential;
@@ -209,7 +210,7 @@ private BlockingDataflowPipelineRunner createMockRunner(DataflowPipelineJob job)
   @Test
   public void testJobDoneComplete() throws Exception {
     createMockRunner(createMockJob("testJobDone-projectId", "testJobDone-jobId", State.DONE))
-        .run(DirectPipeline.createForTest());
+        .run(TestPipeline.create());
     expectedLogs.verifyInfo("Job finished with status DONE");
   }
 
@@ -223,7 +224,7 @@ public void testFailedJobThrowsException() throws Exception {
     expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
         JobIdMatcher.expectJobId("testFailedJob-jobId")));
     createMockRunner(createMockJob("testFailedJob-projectId", "testFailedJob-jobId", State.FAILED))
-        .run(DirectPipeline.createForTest());
+        .run(TestPipeline.create());
   }
 
   /**
@@ -236,8 +237,8 @@ public void testCancelledJobThrowsException() throws Exception {
     expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
         JobIdMatcher.expectJobId("testCancelledJob-jobId")));
     createMockRunner(
-        createMockJob("testCancelledJob-projectId", "testCancelledJob-jobId", State.CANCELLED))
-        .run(DirectPipeline.createForTest());
+            createMockJob("testCancelledJob-projectId", "testCancelledJob-jobId", State.CANCELLED))
+        .run(TestPipeline.create());
   }
 
   /**
@@ -256,7 +257,7 @@ public void testUpdatedJobThrowsException() throws Exception {
     DataflowPipelineJob replacedByJob =
         createMockJob("testUpdatedJob-projectId", "testUpdatedJob-replacedByJobId", State.DONE);
     when(job.getReplacedByJob()).thenReturn(replacedByJob);
-    createMockRunner(job).run(DirectPipeline.createForTest());
+    createMockRunner(job).run(TestPipeline.create());
   }
 
   /**
@@ -269,8 +270,8 @@ public void testUpdatedJobThrowsException() throws Exception {
   public void testUnknownJobThrowsException() throws Exception {
     expectedThrown.expect(IllegalStateException.class);
     createMockRunner(
-        createMockJob("testUnknownJob-projectId", "testUnknownJob-jobId", State.UNKNOWN))
-        .run(DirectPipeline.createForTest());
+            createMockJob("testUnknownJob-projectId", "testUnknownJob-jobId", State.UNKNOWN))
+        .run(TestPipeline.create());
   }
 
   /**
@@ -283,9 +284,8 @@ public void testNullJobThrowsException() throws Exception {
     expectedThrown.expect(DataflowServiceException.class);
     expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
         JobIdMatcher.expectJobId("testNullJob-jobId")));
-    createMockRunner(
-        createMockJob("testNullJob-projectId", "testNullJob-jobId", null))
-        .run(DirectPipeline.createForTest());
+    createMockRunner(createMockJob("testNullJob-projectId", "testNullJob-jobId", null))
+        .run(TestPipeline.create());
   }
 
   @Test
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
index de4a5d23471e..4ef26d308715 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
@@ -18,29 +18,28 @@
 
 package org.apache.beam.runners.spark.translation;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.values.PCollection;
 
-import com.google.api.client.repackaged.com.google.common.base.Joiner;
 import com.google.common.base.Charsets;
 
-import org.apache.commons.io.FileUtils;
-import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TestName;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -50,22 +49,7 @@
  * executed in Spark.
  */
 public class TransformTranslatorTest {
-
-  @Rule
-  public TestName name = new TestName();
-
-  private DirectPipelineRunner directRunner;
-  private SparkPipelineRunner sparkRunner;
-  private String testDataDirName;
-
-  @Before public void init() throws IOException {
-    sparkRunner = SparkPipelineRunner.create();
-    directRunner = DirectPipelineRunner.createForTest();
-    testDataDirName = Joiner.on(File.separator).join("target", "test-data", name.getMethodName())
-        + File.separator;
-    FileUtils.deleteDirectory(new File(testDataDirName));
-    new File(testDataDirName).mkdirs();
-  }
+  @Rule public TemporaryFolder tmp = new TemporaryFolder();
 
   /**
    * Builds a simple pipeline with TextIO.Read and TextIO.Write, runs the pipeline
@@ -74,8 +58,8 @@ public class TransformTranslatorTest {
    */
   @Test
   public void testTextIOReadAndWriteTransforms() throws IOException {
-    String directOut = runPipeline("direct", directRunner);
-    String sparkOut = runPipeline("spark", sparkRunner);
+    String directOut = runPipeline(DirectPipelineRunner.class);
+    String sparkOut = runPipeline(SparkPipelineRunner.class);
 
     List<String> directOutput =
         Files.readAllLines(Paths.get(directOut + "-00000-of-00001"), Charsets.UTF_8);
@@ -84,18 +68,17 @@ public void testTextIOReadAndWriteTransforms() throws IOException {
         Files.readAllLines(Paths.get(sparkOut + "-00000-of-00001"), Charsets.UTF_8);
 
     // sort output to get a stable result (PCollections are not ordered)
-    Collections.sort(directOutput);
-    Collections.sort(sparkOutput);
-
-    Assert.assertArrayEquals(directOutput.toArray(), sparkOutput.toArray());
+    assertThat(sparkOutput, containsInAnyOrder(directOutput.toArray()));
   }
 
-  private String runPipeline(String name, PipelineRunner runner) {
-    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
-    String outFile = Joiner.on(File.separator).join(testDataDirName, "test_text_out_" + name);
+  private String runPipeline(Class<? extends PipelineRunner<?>> runner) throws IOException {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(runner);
+    Pipeline p = Pipeline.create(options);
+    File outFile = tmp.newFile();
     PCollection<String> lines = p.apply(TextIO.Read.from("src/test/resources/test_text.txt"));
-    lines.apply(TextIO.Write.to(outFile));
-    runner.run(p);
-    return outFile;
+    lines.apply(TextIO.Write.to(outFile.getAbsolutePath()));
+    p.run();
+    return outFile.getAbsolutePath();
   }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java
index 718948eac833..4cdc0cae2040 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java
@@ -18,18 +18,17 @@
 package org.apache.beam.sdk.options;
 
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.runners.DirectPipeline;
 import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.values.PCollection;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 
 /**
- * Options that can be used to configure the {@link DirectPipeline}.
+ * Options that can be used to configure the {@link DirectPipelineRunner}.
  */
-public interface DirectPipelineOptions extends
-    ApplicationNameOptions, BigQueryOptions, GcsOptions, GcpOptions,
-    PipelineOptions, StreamingOptions {
+public interface DirectPipelineOptions
+    extends ApplicationNameOptions, BigQueryOptions, GcsOptions, GcpOptions, PipelineOptions,
+    StreamingOptions {
 
   /**
    * The random seed to use for pseudorandom behaviors in the {@link DirectPipelineRunner}.
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipeline.java
deleted file mode 100644
index 45f7647b2e3f..000000000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipeline.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.DirectPipelineOptions;
-
-/**
- * A {@link DirectPipeline} is a {@link Pipeline} that returns
- * {@link DirectPipelineRunner.EvaluationResults} when it is
- * {@link org.apache.beam.sdk.Pipeline#run()}.
- */
-public class DirectPipeline extends Pipeline {
-
-  /**
-   * Creates and returns a new DirectPipeline instance for tests.
-   */
-  public static DirectPipeline createForTest() {
-    DirectPipelineRunner runner = DirectPipelineRunner.createForTest();
-    return new DirectPipeline(runner, runner.getPipelineOptions());
-  }
-
-  private DirectPipeline(DirectPipelineRunner runner, DirectPipelineOptions options) {
-    super(runner, options);
-  }
-
-  @Override
-  public DirectPipelineRunner.EvaluationResults run() {
-    return (DirectPipelineRunner.EvaluationResults) super.run();
-  }
-
-  @Override
-  public DirectPipelineRunner getRunner() {
-    return (DirectPipelineRunner) super.getRunner();
-  }
-
-  @Override
-  public String toString() {
-    return "DirectPipeline#" + hashCode();
-  }
-}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java
index 198d04e8a665..3cb970300c1d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java
@@ -34,8 +34,6 @@
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.DirectPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -59,7 +57,6 @@
 import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
 import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
 import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.TestCredential;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.common.Counter;
@@ -194,18 +191,6 @@ public static DirectPipelineRunner fromOptions(PipelineOptions options) {
     return new DirectPipelineRunner(directOptions);
   }
 
-  /**
-   * Constructs a runner with default properties for testing.
-   *
-   * @return The newly created runner.
-   */
-  public static DirectPipelineRunner createForTest() {
-    DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class);
-    options.setStableUniqueNames(CheckEnabled.ERROR);
-    options.setGcpCredential(new TestCredential());
-    return new DirectPipelineRunner(options);
-  }
-
   /**
    * Enable runtime testing to verify that all functions and {@link Coder}
    * instances can be serialized.
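Note (illustration only, not part of the diff below): the test updates that follow drop the DirectPipeline.createForTest()/EvaluationResults pattern in favor of TestPipeline and PAssert. A rough sketch of that migration, assuming JUnit 4; ExampleTest and its test method are placeholders.

    import org.apache.beam.sdk.testing.PAssert;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;

    import org.junit.Test;

    public class ExampleTest {
      @Test
      public void testElementsPassThrough() {
        TestPipeline p = TestPipeline.create();
        PCollection<Integer> output = p.apply(Create.of(1, 2, 3));

        // Declare the expected contents before running, instead of inspecting
        // EvaluationResults returned by DirectPipeline#run().
        PAssert.that(output).containsInAnyOrder(1, 2, 3);
        p.run();
      }
    }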
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
index f32a4203734c..f757b4e1736f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
@@ -23,8 +23,8 @@
 
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.runners.DirectPipeline;
-import org.apache.beam.sdk.runners.DirectPipelineRunner.EvaluationResults;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -137,12 +137,11 @@ <T> void runTestRead(AvroIO.Read.Bound<T> read, String expectedName, T[] expecte
       throws Exception {
     generateAvroFile(generateAvroObjects());
 
-    DirectPipeline p = DirectPipeline.createForTest();
+    TestPipeline p = TestPipeline.create();
     PCollection<T> output = p.apply(read);
-    EvaluationResults results = p.run();
+    PAssert.that(output).containsInAnyOrder(expectedOutput);
+    p.run();
     assertEquals(expectedName, output.getName());
-    assertThat(results.getPCollection(output),
-        containsInAnyOrder(expectedOutput));
   }
 
   @Test
@@ -257,7 +256,7 @@ <T> void runTestWrite(AvroIO.Write.Bound<T> write, String expectedName)
       throws Exception {
     AvroGeneratedUser[] users = generateAvroObjects();
 
-    DirectPipeline p = DirectPipeline.createForTest();
+    TestPipeline p = TestPipeline.create();
     @SuppressWarnings("unchecked")
     PCollection<T> input = p.apply(Create.of(Arrays.asList((T[]) users))
         .withCoder((Coder<T>) AvroCoder.of(AvroGeneratedUser.class)));
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 08f146f3d924..57312c0d9106 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -26,7 +26,6 @@
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.io.AvroIO.Write.Bound;
-import org.apache.beam.sdk.runners.DirectPipeline;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -116,7 +115,7 @@ public boolean equals(Object other) {
 
   @Test
   public void testAvroIOWriteAndReadASingleFile() throws Throwable {
-    DirectPipeline p = DirectPipeline.createForTest();
+    TestPipeline p = TestPipeline.create();
     List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
 
@@ -127,7 +126,7 @@ public void testAvroIOWriteAndReadASingleFile() throws Throwable {
         .withSchema(GenericClass.class));
     p.run();
 
-    p = DirectPipeline.createForTest();
+    p = TestPipeline.create();
     PCollection<GenericClass> input = p
         .apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClass.class));
 
@@ -179,7 +178,7 @@ public boolean equals(Object other) {
    */
   @Test
   public void testAvroIOWriteAndReadSchemaUpgrade() throws Throwable {
-    DirectPipeline p = DirectPipeline.createForTest();
+    TestPipeline p = TestPipeline.create();
     List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
 
@@ -192,7 +191,7 @@ public void testAvroIOWriteAndReadSchemaUpgrade() throws Throwable {
 
     List<GenericClassV2> expected = ImmutableList.of(new GenericClassV2(3, "hi", null),
         new GenericClassV2(5, "bar", null));
 
-    p = DirectPipeline.createForTest();
+    p = TestPipeline.create();
     PCollection<GenericClassV2> input = p
         .apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClassV2.class));
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
index 2f5272b5c828..ae3b4e037cf6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
@@ -98,16 +98,17 @@ public void processElement(DoFn.ProcessContext c) throws Except
 
   @Test
   public void testCoderException() {
-    DirectPipeline pipeline = DirectPipeline.createForTest();
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(DirectPipelineRunner.class);
+    Pipeline p = Pipeline.create(options);
 
-    pipeline
-        .apply("CreateTestData", Create.of(42))
+    p.apply("CreateTestData", Create.of(42))
         .apply("CrashDuringCoding", ParDo.of(new HelloDoFn()))
         .setCoder(new CrashingCoder());
 
-    expectedException.expect(RuntimeException.class);
-    expectedException.expectCause(isA(CoderException.class));
-    pipeline.run();
+    expectedException.expect(RuntimeException.class);
+    expectedException.expectCause(isA(CoderException.class));
+    p.run();
   }
 
   @Test
@@ -119,7 +120,9 @@ public void testDirectPipelineOptions() {
   @Test
   public void testTextIOWriteWithDefaultShardingStrategy() throws Exception {
     String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "output");
-    Pipeline p = DirectPipeline.createForTest();
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(DirectPipelineRunner.class);
+    Pipeline p = Pipeline.create(options);
     String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" };
     p.apply(Create.of(expectedElements))
         .apply(TextIO.Write.to(prefix).withSuffix("txt"));
@@ -139,7 +142,9 @@ public void testTextIOWriteWithDefaultShardingStrategy() throws Exception {
   public void testTextIOWriteWithLimitedNumberOfShards() throws Exception {
     final int numShards = 3;
     String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "shardedOutput");
-    Pipeline p = DirectPipeline.createForTest();
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(DirectPipelineRunner.class);
+    Pipeline p = Pipeline.create(options);
     String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" };
     p.apply(Create.of(expectedElements))
         .apply(TextIO.Write.to(prefix).withNumShards(numShards).withSuffix("txt"));
@@ -164,7 +169,9 @@ public void testTextIOWriteWithLimitedNumberOfShards() throws Exception {
   @Test
   public void testAvroIOWriteWithDefaultShardingStrategy() throws Exception {
     String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "output");
-    Pipeline p = DirectPipeline.createForTest();
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(DirectPipelineRunner.class);
+    Pipeline p = Pipeline.create(options);
     String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" };
     p.apply(Create.of(expectedElements))
         .apply(AvroIO.Write.withSchema(String.class).to(prefix).withSuffix(".avro"));
@@ -186,7 +193,9 @@ public void testAvroIOWriteWithDefaultShardingStrategy() throws Exception {
   public void testAvroIOWriteWithLimitedNumberOfShards() throws Exception {
     final int numShards = 3;
     String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "shardedOutput");
-    Pipeline p = DirectPipeline.createForTest();
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(DirectPipelineRunner.class);
+    Pipeline p = Pipeline.create(options);
     String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" };
     p.apply(Create.of(expectedElements))
         .apply(AvroIO.Write.withSchema(String.class).to(prefix)
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineTest.java
deleted file mode 100644
index 9829ebd64c59..000000000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link DirectPipeline}. */
-@RunWith(JUnit4.class)
-public class DirectPipelineTest {
-  @Test
-  public void testToString() {
-    DirectPipeline pipeline = DirectPipeline.createForTest();
-    assertEquals("DirectPipeline#" + pipeline.hashCode(),
-        pipeline.toString());
-  }
-}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index d926ac59fd22..7690d2ba88dc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -30,6 +30,7 @@
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -116,7 +117,7 @@ public void testCompositeCapture() throws Exception {
     File inputFile = tmpFolder.newFile();
     File outputFile = tmpFolder.newFile();
 
-    Pipeline p = DirectPipeline.createForTest();
+    Pipeline p = TestPipeline.create();
 
     p.apply(TextIO.Read.named("ReadMyFile").from(inputFile.getPath()))
         .apply(Sample.any(10))
@@ -173,7 +174,7 @@ public void visitValue(PValue value, TransformTreeNode producer) {
 
   @Test(expected = IllegalStateException.class)
   public void testOutputChecking() throws Exception {
-    Pipeline p = DirectPipeline.createForTest();
+    Pipeline p = TestPipeline.create();
 
     p.apply(new InvalidCompositeTransform());
 
@@ -183,7 +184,7 @@ public void testOutputChecking() throws Exception {
 
   @Test
   public void testMultiGraphSetup() {
-    Pipeline p = DirectPipeline.createForTest();
+    Pipeline p = TestPipeline.create();
 
     PCollection<Integer> input = p.begin()
         .apply(Create.of(1, 2, 3));
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
index 6d62e08e6d5b..6bc5c1e92407 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
@@ -25,7 +25,6 @@
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.runners.DirectPipeline;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.ApproximateQuantiles.ApproximateQuantilesCombineFn;
@@ -69,7 +68,7 @@ public PCollection<KV<String, Integer>> createInputTable(Pipeline p) {
 
   @Test
   public void testQuantilesGlobally() {
-    DirectPipeline p = DirectPipeline.createForTest();
+    TestPipeline p = TestPipeline.create();
 
     PCollection<Integer> input = intRangeCollection(p, 101);
     PCollection<List<Integer>> quantiles =
@@ -82,7 +81,7 @@ public void testQuantilesGlobally() {
 
   @Test
   public void testQuantilesGobally_comparable() {
-    DirectPipeline p = DirectPipeline.createForTest();
+    TestPipeline p = TestPipeline.create();
 
     PCollection<Integer> input = intRangeCollection(p, 101);
     PCollection<List<Integer>> quantiles =