From 11ba2b9c865e4bb78790056e0cd9fa94abe000f5 Mon Sep 17 00:00:00 2001
From: Thomas Groh
Date: Mon, 11 Apr 2016 13:58:59 -0700
Subject: [PATCH] Remove Spark(Streaming)PipelineOptionsFactory

Pipeline authors should generally not use any runner-specific classes;
instead, they should select the runner and the appropriate configuration
through the PipelineOptionsFactory.fromArgs() method. The runner can then
obtain the appropriately typed PipelineOptions class as required and do
any necessary validation. Failing this, authors should use the provided
PipelineOptions#as() method to acquire the appropriately typed options.

If required, users should construct SparkPipelineOptions via
PipelineOptionsFactory.as(SparkPipelineOptions.class).
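As a minimal sketch of the recommended construction (the command-line
flags shown here are illustrative, not prescribed by this patch):

    // Preferred: let PipelineOptionsFactory parse the runner and its
    // configuration from the program arguments.
    String[] args = {"--runner=SparkPipelineRunner", "--sparkMaster=local[2]"};
    SparkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(SparkPipelineOptions.class);

    // Failing that, construct the typed options directly and set the runner.
    SparkPipelineOptions direct =
        PipelineOptionsFactory.as(SparkPipelineOptions.class);
    direct.setRunner(SparkPipelineRunner.class);

    // The streaming options follow the same pattern.
    SparkStreamingPipelineOptions streaming =
        PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
    streaming.setRunner(SparkPipelineRunner.class);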
---
 .../runners/spark/SparkPipelineRunner.java     |  6 ++--
 .../SparkPipelineOptionsFactory.java           | 31 -------------------
 .../SparkStreamingPipelineOptionsFactory.java  | 31 -------------------
 .../apache/beam/runners/spark/DeDupTest.java   | 16 +++++-----
 .../beam/runners/spark/EmptyInputTest.java     | 15 +++++----
 .../runners/spark/SimpleWordCountTest.java     | 18 ++++++-----
 .../apache/beam/runners/spark/TfIdfTest.java   | 11 ++++---
 .../spark/coders/WritableCoderTest.java        |  1 +
 .../runners/spark/io/AvroPipelineTest.java     | 22 +++++++------
 .../beam/runners/spark/io/NumShardsTest.java   | 26 +++++++++-------
 .../hadoop/HadoopFileFormatPipelineTest.java   |  7 +++--
 .../spark/io/hadoop/ShardNameBuilderTest.java  |  1 +
 .../translation/CombineGloballyTest.java       | 14 ++++++---
 .../spark/translation/CombinePerKeyTest.java   | 12 ++++---
 .../spark/translation/DoFnOutputTest.java      |  8 +++--
 .../translation/MultiOutputWordCountTest.java  | 10 +++---
 .../spark/translation/SerializationTest.java   | 27 +++++++++-------
 .../spark/translation/SideEffectsTest.java     | 10 +++---
 ...ory.java => SparkPipelineOptionsTest.java}  |  8 +++--
 .../translation/TransformTranslatorTest.java   | 20 ++++++------
 .../translation/WindowedWordCountTest.java     | 10 +++---
 .../streaming/FlattenStreamingTest.java        | 13 +++++---
 .../streaming/KafkaStreamingTest.java          | 24 ++++++++------
 .../SimpleStreamingWordCountTest.java          | 20 +++++++-----
 .../streaming/utils/EmbeddedKafkaCluster.java  | 11 ++++---
 .../streaming/utils/PAssertStreaming.java      |  1 +
 26 files changed, 182 insertions(+), 191 deletions(-)
 delete mode 100644 runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsFactory.java
 delete mode 100644 runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsFactory.java
 rename runners/spark/src/test/java/org/apache/beam/runners/spark/translation/{TestSparkPipelineOptionsFactory.java => SparkPipelineOptionsTest.java} (81%)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index 99da74ffdbfd..2b33e7acc965 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -21,7 +21,6 @@
 import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.runners.spark.translation.SparkPipelineEvaluator;
-import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
 import org.apache.beam.runners.spark.translation.SparkProcessContext;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
@@ -30,6 +29,7 @@
 import org.apache.beam.runners.spark.translation.streaming.StreamingWindowPipelineDetector;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformTreeNode;
@@ -40,6 +40,7 @@
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
+
 import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.streaming.Duration;
@@ -85,7 +86,8 @@ public final class SparkPipelineRunner extends PipelineRunner
    * @return A pipeline runner with default options.
    */
   public static SparkPipelineRunner create() {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    options.setRunner(SparkPipelineRunner.class);
     return new SparkPipelineRunner(options);
   }
 
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsFactory.java
deleted file mode 100644
index fe89ee399c08..000000000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-
-public final class SparkPipelineOptionsFactory {
-  private SparkPipelineOptionsFactory() {
-  }
-
-  public static SparkPipelineOptions create() {
-    return PipelineOptionsFactory.as(SparkPipelineOptions.class);
-  }
-}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsFactory.java
deleted file mode 100644
index 4abf8160ee46..000000000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.translation.streaming;
-
-import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-
-public final class SparkStreamingPipelineOptionsFactory {
-
-  private SparkStreamingPipelineOptionsFactory() {
-  }
-
-  public static SparkStreamingPipelineOptions create() {
-    return PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
-  }
-}
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
index d13404cd9649..0b48bed095f2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
@@ -18,20 +18,22 @@
 package org.apache.beam.runners.spark;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-
-import com.google.common.collect.ImmutableSet;
-import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.RemoveDuplicates;
 import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.ImmutableSet;
+
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
 /**
  * A test based on {@code DeDupExample} from the SDK.
 */
@@ -46,7 +48,7 @@ public class DeDupTest {
 
   @Test
   public void testRun() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
     PCollection input = p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of());
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
index eeca10f778b6..f9b00ccf2a6c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
@@ -20,24 +20,27 @@
 import static org.junit.Assert.assertEquals;
 
-import java.util.Collections;
-import java.util.List;
-
-import com.google.common.collect.Iterables;
-import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.Iterables;
+
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.List;
+
 public class EmptyInputTest {
 
   @Test
   public void test() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
     List empty = Collections.emptyList();
     PCollection inputWords = p.apply(Create.of(empty)).setCoder(StringUtf8Coder.of());
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index c413b3fe060f..faa4dbfd80bc 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -18,15 +18,9 @@
 package org.apache.beam.runners.spark;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import com.google.common.collect.ImmutableSet;
-import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
@@ -37,8 +31,16 @@
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.ImmutableSet;
+
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+
 public class SimpleWordCountTest {
   private static final String[] WORDS_ARRAY = {
     "hi there", "hi", "hi sue bob",
@@ -49,7 +51,7 @@ public class SimpleWordCountTest {
 
   @Test
   public void testRun() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
     PCollection inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
index 4e80fe93d84e..00c4657f5f16 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
@@ -18,9 +18,6 @@
 package org.apache.beam.runners.spark;
 
-import java.net.URI;
-import java.util.Arrays;
-
 import org.apache.beam.examples.complete.TfIdf;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringDelegateCoder;
@@ -31,8 +28,12 @@
 import org.apache.beam.sdk.transforms.RemoveDuplicates;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+
 import org.junit.Test;
 
+import java.net.URI;
+import java.util.Arrays;
+
 /**
  * A test based on {@code TfIdf} from the SDK.
 */
@@ -40,7 +41,9 @@ public class TfIdfTest {
 
   @Test
   public void testTfIdf() throws Exception {
-    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
+    SparkPipelineOptions opts = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    opts.setRunner(SparkPipelineRunner.class);
+    Pipeline pipeline = Pipeline.create(opts);
 
     pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
 
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java
index 538fd97a8bb0..f2bd4d3e76fa 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.spark.coders;
 
 import org.apache.beam.sdk.testing.CoderProperties;
+
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.junit.Test;
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index f8f96245dab3..693e2c65f5b0 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -20,13 +20,16 @@
 import static org.junit.Assert.assertEquals;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.List;
+import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.values.PCollection;
 
 import com.google.common.collect.Lists;
 import com.google.common.io.Resources;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileWriter;
@@ -34,17 +37,16 @@
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.values.PCollection;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+
 public class AvroPipelineTest {
 
   private File inputFile;
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 37a7d54cc024..85eeabd822e3 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -21,32 +21,34 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.Sets;
-import com.google.common.io.Files;
 import org.apache.beam.examples.WordCount;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
-import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
 public class NumShardsTest {
 
   private static final String[] WORDS_ARRAY = {
@@ -67,7 +69,7 @@ public void setUp() throws IOException {
 
   @Test
   public void testText() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
     PCollection inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
index b7df7391f3ba..0c8c6fc89eb8 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -20,9 +20,6 @@
 import static org.junit.Assert.assertEquals;
 
-import java.io.File;
-import java.io.IOException;
-
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.runners.spark.coders.WritableCoder;
@@ -31,6 +28,7 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
@@ -46,6 +44,9 @@
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
+import java.io.IOException;
+
 public class HadoopFileFormatPipelineTest {
 
   private File inputFile;
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
index eecfd581b13f..55991a420a66 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
@@ -23,6 +23,7 @@
 import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate;
 import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount;
 import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardNumber;
+
 import static org.junit.Assert.assertEquals;
 
 import org.junit.Test;
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
index bd90aae28472..a6446738f74c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
@@ -20,20 +20,23 @@
 import static org.junit.Assert.assertEquals;
 
-import java.util.Arrays;
-import java.util.List;
-
-import com.google.common.collect.Iterables;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.Iterables;
+
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.List;
+
 public class CombineGloballyTest {
 
   private static final String[] WORDS_ARRAY = {
@@ -43,7 +46,8 @@ public class CombineGloballyTest {
 
   @Test
   public void test() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
     PCollection inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
     PCollection output = inputWords.apply(Combine.globally(new WordMerger()));
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
index 6cf93300c543..4e0bc5d3c7a0 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
@@ -18,11 +18,6 @@
 package org.apache.beam.runners.spark.translation;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.ImmutableList;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
@@ -37,9 +32,16 @@
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.ImmutableList;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 public class CombinePerKeyTest {
 
   private static final List WORDS =
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
index 80b5fc5903af..ca97a96590d7 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
@@ -18,23 +18,25 @@
 package org.apache.beam.runners.spark.translation;
 
-import java.io.Serializable;
-
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
+
 import org.junit.Test;
 
+import java.io.Serializable;
+
 public class DoFnOutputTest implements Serializable {
 
   @Test
   public void test() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkPipelineRunner.class);
     Pipeline pipeline = Pipeline.create(options);
 
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
index 1450a8febf6f..6a862c9350fe 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
@@ -18,10 +18,6 @@
 package org.apache.beam.runners.spark.translation;
 
-import java.util.Set;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
@@ -47,9 +43,15 @@
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Set;
+
 public class MultiOutputWordCountTest {
 
   private static final TupleTag upper = new TupleTag<>();
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
index 2237ea226416..75d3fb2d2e94 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
@@ -18,17 +18,6 @@
 package org.apache.beam.runners.spark.translation;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
@@ -36,6 +25,7 @@
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
@@ -46,8 +36,21 @@
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
 import org.junit.Test;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+
 public class SerializationTest {
 
   public static class StringHolder { // not serializable
@@ -117,7 +120,7 @@ public StringHolder apply(String s) {
 
   @Test
   public void testRun() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
     PCollection inputWords =
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
index b85d9350cbbe..14abbfc978ee 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
@@ -22,20 +22,22 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.Serializable;
-import java.net.URI;
-
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringDelegateCoder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.Serializable;
+import java.net.URI;
+
 public class SideEffectsTest implements Serializable {
 
   static class UserException extends RuntimeException {
@@ -43,7 +45,7 @@ static class UserException extends RuntimeException {
 
   @Test
   public void test() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkPipelineRunner.class);
     Pipeline pipeline = Pipeline.create(options);
 
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TestSparkPipelineOptionsFactory.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
similarity index 81%
rename from runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TestSparkPipelineOptionsFactory.java
rename to runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
index 9cace83ed294..bf18486d599e 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TestSparkPipelineOptionsFactory.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
@@ -19,19 +19,21 @@
 package org.apache.beam.runners.spark.translation;
 
 import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
 import org.junit.Assert;
 import org.junit.Test;
 
-public class TestSparkPipelineOptionsFactory {
+public class SparkPipelineOptionsTest {
   @Test
   public void testDefaultCreateMethod() {
-    SparkPipelineOptions actualOptions = SparkPipelineOptionsFactory.create();
+    SparkPipelineOptions actualOptions = PipelineOptionsFactory.as(SparkPipelineOptions.class);
     Assert.assertEquals("local[1]", actualOptions.getSparkMaster());
   }
 
   @Test
   public void testSettingCustomOptions() {
-    SparkPipelineOptions actualOptions = SparkPipelineOptionsFactory.create();
+    SparkPipelineOptions actualOptions = PipelineOptionsFactory.as(SparkPipelineOptions.class);
     actualOptions.setSparkMaster("spark://207.184.161.138:7077");
     Assert.assertEquals("spark://207.184.161.138:7077", actualOptions.getSparkMaster());
   }
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
index f07fa0b9c6fa..de4a5d23471e 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
@@ -18,15 +18,6 @@
 package org.apache.beam.runners.spark.translation;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Collections;
-import java.util.List;
-
-import com.google.api.client.repackaged.com.google.common.base.Joiner;
-import com.google.common.base.Charsets;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
@@ -34,6 +25,10 @@
 import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.values.PCollection;
+
+import com.google.api.client.repackaged.com.google.common.base.Joiner;
+import com.google.common.base.Charsets;
+
 import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
 import org.junit.Before;
@@ -41,6 +36,13 @@
 import org.junit.Test;
 import org.junit.rules.TestName;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+
 /**
  * A test for the transforms registered in TransformTranslator.
  * Builds a regular Dataflow pipeline with each of the mapped
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
index 5134b5c237ee..0db8913fb1d0 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
@@ -18,10 +18,6 @@
 package org.apache.beam.runners.spark.translation;
 
-import java.util.Arrays;
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SimpleWordCountTest;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
@@ -34,9 +30,15 @@
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.ImmutableList;
+
 import org.joda.time.Duration;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.List;
+
 public class WindowedWordCountTest {
   private static final String[] WORDS_ARRAY = {
     "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index 17f7fbd19afb..9152d72b9131 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -17,10 +17,6 @@
 */
 package org.apache.beam.runners.spark.translation.streaming;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
@@ -28,6 +24,7 @@
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.View;
@@ -35,9 +32,14 @@
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
+
 import org.joda.time.Duration;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
 /**
  * Test Flatten (union) implementation for streaming.
 */
@@ -57,7 +59,8 @@ public class FlattenStreamingTest {
 
   @Test
   public void testRun() throws Exception {
-    SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create();
+    SparkStreamingPipelineOptions options =
+        PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
     options.setAppName(this.getClass().getSimpleName());
     options.setRunner(SparkPipelineRunner.class);
     options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index a0c8a4e3e505..e1ff227d5d0e 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -17,15 +17,6 @@
 */
 package org.apache.beam.runners.spark.translation.streaming;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import kafka.serializer.StringDecoder;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
@@ -35,6 +26,7 @@
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -43,6 +35,10 @@
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
@@ -52,6 +48,13 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import kafka.serializer.StringDecoder;
 /**
  * Test Kafka as input.
 */
@@ -92,7 +95,8 @@ public static void init() throws IOException {
   @Test
   public void testRun() throws Exception {
     // test read from Kafka
-    SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create();
+    SparkStreamingPipelineOptions options =
+        PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
     options.setAppName(this.getClass().getSimpleName());
     options.setRunner(SparkPipelineRunner.class);
     options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index de554e2dfee4..ef224daea1ba 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -17,12 +17,6 @@
 */
 package org.apache.beam.runners.spark.translation.streaming;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-import com.google.common.collect.ImmutableSet;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SimpleWordCountTest;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
@@ -31,14 +25,23 @@
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.ImmutableSet;
+
 import org.joda.time.Duration;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
 public class SimpleStreamingWordCountTest {
 
   private static final String[] WORDS_ARRAY = {
@@ -51,7 +54,8 @@ public class SimpleStreamingWordCountTest {
 
   @Test
   public void testRun() throws Exception {
-    SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create();
+    SparkStreamingPipelineOptions options =
+        PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
     options.setAppName(this.getClass().getSimpleName());
     options.setRunner(SparkPipelineRunner.class);
     options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval
@@ -72,4 +76,4 @@ public void testRun() throws Exception {
 
     PAssertStreaming.assertNoFailures(res);
   }
-}
\ No newline at end of file
+}
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
index 21c8115bebe2..2ade467e10a2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
@@ -17,6 +17,12 @@
 */
 package org.apache.beam.runners.spark.translation.streaming.utils;
 
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -31,11 +37,6 @@
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import kafka.utils.Time;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * https://gist.github.com/fjavieralba/7930018
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
index 3f507980bc29..041cc5061a39 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.spark.translation.streaming.utils;
 
 import org.apache.beam.runners.spark.EvaluationResult;
+
 import org.junit.Assert;
 
 /**