From 951657c1239eae8e9fbc499e2a68174d15085f55 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Wed, 13 Apr 2016 09:43:49 -0700 Subject: [PATCH] Update Spark Test to use more standard Pipeline Running --- .../translation/TransformTranslatorTest.java | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java index f07fa0b9c6fa..216504873473 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java @@ -18,22 +18,18 @@ package org.apache.beam.runners.spark.translation; -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Collections; -import java.util.List; - -import com.google.api.client.repackaged.com.google.common.base.Joiner; -import com.google.common.base.Charsets; import org.apache.beam.runners.spark.SparkPipelineRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.values.PCollection; + +import com.google.api.client.repackaged.com.google.common.base.Joiner; +import com.google.common.base.Charsets; + import org.apache.commons.io.FileUtils; import org.junit.Assert; import org.junit.Before; @@ -41,6 +37,13 @@ import org.junit.Test; import org.junit.rules.TestName; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; + /** * A test for the transforms registered in TransformTranslator. * Builds a regular Dataflow pipeline with each of the mapped @@ -89,11 +92,13 @@ public void testTextIOReadAndWriteTransforms() throws IOException { } private String runPipeline(String name, PipelineRunner runner) { - Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); + PipelineOptions options = PipelineOptionsFactory.create(); + options.setRunner((Class>) runner.getClass()); + Pipeline p = Pipeline.create(options); String outFile = Joiner.on(File.separator).join(testDataDirName, "test_text_out_" + name); PCollection lines = p.apply(TextIO.Read.from("src/test/resources/test_text.txt")); lines.apply(TextIO.Write.to(outFile)); - runner.run(p); + p.run(); return outFile; } }