diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java index 99da74ffdbfd..2b33e7acc965 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java @@ -21,7 +21,6 @@ import org.apache.beam.runners.spark.translation.EvaluationContext; import org.apache.beam.runners.spark.translation.SparkContextFactory; import org.apache.beam.runners.spark.translation.SparkPipelineEvaluator; -import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory; import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; import org.apache.beam.runners.spark.translation.SparkProcessContext; import org.apache.beam.runners.spark.translation.TransformTranslator; @@ -30,6 +29,7 @@ import org.apache.beam.runners.spark.translation.streaming.StreamingWindowPipelineDetector; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.runners.TransformTreeNode; @@ -40,6 +40,7 @@ import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; + import org.apache.spark.SparkException; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.streaming.Duration; @@ -85,7 +86,8 @@ public final class SparkPipelineRunner extends PipelineRunner * @return A pipeline runner with default options. */ public static SparkPipelineRunner create() { - SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); + SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); + options.setRunner(SparkPipelineRunner.class); return new SparkPipelineRunner(options); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsFactory.java deleted file mode 100644 index fe89ee399c08..000000000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsFactory.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.translation; - -import org.apache.beam.runners.spark.SparkPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; - -public final class SparkPipelineOptionsFactory { - private SparkPipelineOptionsFactory() { - } - - public static SparkPipelineOptions create() { - return PipelineOptionsFactory.as(SparkPipelineOptions.class); - } -} diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsFactory.java deleted file mode 100644 index 4abf8160ee46..000000000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsFactory.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.spark.translation.streaming; - -import org.apache.beam.runners.spark.SparkStreamingPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; - -public final class SparkStreamingPipelineOptionsFactory { - - private SparkStreamingPipelineOptionsFactory() { - } - - public static SparkStreamingPipelineOptions create() { - return PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class); - } -} diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java index d13404cd9649..0b48bed095f2 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java @@ -18,20 +18,22 @@ package org.apache.beam.runners.spark; -import java.util.Arrays; -import java.util.List; -import java.util.Set; - -import com.google.common.collect.ImmutableSet; -import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.RemoveDuplicates; import org.apache.beam.sdk.values.PCollection; + +import com.google.common.collect.ImmutableSet; + import org.junit.Test; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + /** * A test based on {@code DeDupExample} from the SDK. */ @@ -46,7 +48,7 @@ public class DeDupTest { @Test public void testRun() throws Exception { - SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); + SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); options.setRunner(SparkPipelineRunner.class); Pipeline p = Pipeline.create(options); PCollection input = p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of()); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java index eeca10f778b6..f9b00ccf2a6c 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java @@ -20,24 +20,27 @@ import static org.junit.Assert.assertEquals; -import java.util.Collections; -import java.util.List; - -import com.google.common.collect.Iterables; -import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; + +import com.google.common.collect.Iterables; + import org.junit.Test; +import java.util.Collections; +import java.util.List; + public class EmptyInputTest { @Test public void test() throws Exception { - SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); + SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); + options.setRunner(SparkPipelineRunner.class); Pipeline p = Pipeline.create(options); List empty = Collections.emptyList(); PCollection inputWords = p.apply(Create.of(empty)).setCoder(StringUtf8Coder.of()); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java index c413b3fe060f..faa4dbfd80bc 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java @@ -18,15 +18,9 @@ package org.apache.beam.runners.spark; -import java.util.Arrays; -import java.util.List; -import java.util.Set; -import java.util.regex.Pattern; - -import com.google.common.collect.ImmutableSet; -import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; @@ -37,8 +31,16 @@ import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; + +import com.google.common.collect.ImmutableSet; + import org.junit.Test; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.regex.Pattern; + public class SimpleWordCountTest { private static final String[] WORDS_ARRAY = { "hi there", "hi", "hi sue bob", @@ -49,7 +51,7 @@ public class SimpleWordCountTest { @Test public void testRun() throws Exception { - SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); + SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); options.setRunner(SparkPipelineRunner.class); Pipeline p = Pipeline.create(options); PCollection inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java index 4e80fe93d84e..00c4657f5f16 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java @@ -18,9 +18,6 @@ package org.apache.beam.runners.spark; -import java.net.URI; -import java.util.Arrays; - import org.apache.beam.examples.complete.TfIdf; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringDelegateCoder; @@ -31,8 +28,12 @@ import org.apache.beam.sdk.transforms.RemoveDuplicates; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; + import org.junit.Test; +import java.net.URI; +import java.util.Arrays; + /** * A test based on {@code TfIdf} from the SDK. */ @@ -40,7 +41,9 @@ public class TfIdfTest { @Test public void testTfIdf() throws Exception { - Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create()); + SparkPipelineOptions opts = PipelineOptionsFactory.as(SparkPipelineOptions.class); + opts.setRunner(SparkPipelineRunner.class); + Pipeline pipeline = Pipeline.create(opts); pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java index 538fd97a8bb0..f2bd4d3e76fa 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.spark.coders; import org.apache.beam.sdk.testing.CoderProperties; + import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.junit.Test; diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java index f8f96245dab3..693e2c65f5b0 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java @@ -20,13 +20,16 @@ import static org.junit.Assert.assertEquals; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.List; +import org.apache.beam.runners.spark.EvaluationResult; +import org.apache.beam.runners.spark.SparkPipelineRunner; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.AvroIO; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.values.PCollection; import com.google.common.collect.Lists; import com.google.common.io.Resources; + import org.apache.avro.Schema; import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileWriter; @@ -34,17 +37,16 @@ import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; -import org.apache.beam.runners.spark.EvaluationResult; -import org.apache.beam.runners.spark.SparkPipelineRunner; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.values.PCollection; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.List; + public class AvroPipelineTest { private File inputFile; diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java index 37a7d54cc024..85eeabd822e3 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java @@ -21,32 +21,34 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import java.io.File; -import java.io.FileFilter; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Set; - -import com.google.common.base.Charsets; -import com.google.common.collect.Sets; -import com.google.common.io.Files; import org.apache.beam.examples.WordCount; import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.SparkPipelineRunner; -import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.PCollection; + +import com.google.common.base.Charsets; +import com.google.common.collect.Sets; +import com.google.common.io.Files; + import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + public class NumShardsTest { private static final String[] WORDS_ARRAY = { @@ -67,7 +69,7 @@ public void setUp() throws IOException { @Test public void testText() throws Exception { - SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); + SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); options.setRunner(SparkPipelineRunner.class); Pipeline p = Pipeline.create(options); PCollection inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of()); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java index b7df7391f3ba..0c8c6fc89eb8 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java @@ -20,9 +20,6 @@ import static org.junit.Assert.assertEquals; -import java.io.File; -import java.io.IOException; - import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkPipelineRunner; import org.apache.beam.runners.spark.coders.WritableCoder; @@ -31,6 +28,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; @@ -46,6 +44,9 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; +import java.io.File; +import java.io.IOException; + public class HadoopFileFormatPipelineTest { private File inputFile; diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java index eecfd581b13f..55991a420a66 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java @@ -23,6 +23,7 @@ import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate; import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount; import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardNumber; + import static org.junit.Assert.assertEquals; import org.junit.Test; diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java index bd90aae28472..a6446738f74c 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java @@ -20,20 +20,23 @@ import static org.junit.Assert.assertEquals; -import java.util.Arrays; -import java.util.List; - -import com.google.common.collect.Iterables; import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.SparkPipelineRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; + +import com.google.common.collect.Iterables; + import org.junit.Test; +import java.util.Arrays; +import java.util.List; + public class CombineGloballyTest { private static final String[] WORDS_ARRAY = { @@ -43,7 +46,8 @@ public class CombineGloballyTest { @Test public void test() throws Exception { - SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); + SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); + options.setRunner(SparkPipelineRunner.class); Pipeline p = Pipeline.create(options); PCollection inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of()); PCollection output = inputWords.apply(Combine.globally(new WordMerger())); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java index 6cf93300c543..4e0bc5d3c7a0 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java @@ -18,11 +18,6 @@ package org.apache.beam.runners.spark.translation; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import com.google.common.collect.ImmutableList; import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkPipelineRunner; import org.apache.beam.sdk.Pipeline; @@ -37,9 +32,16 @@ import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; + +import com.google.common.collect.ImmutableList; + import org.junit.Assert; import org.junit.Test; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + public class CombinePerKeyTest { private static final List WORDS = diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java index 80b5fc5903af..ca97a96590d7 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java @@ -18,23 +18,25 @@ package org.apache.beam.runners.spark.translation; -import java.io.Serializable; - import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.SparkPipelineRunner; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; + import org.junit.Test; +import java.io.Serializable; + public class DoFnOutputTest implements Serializable { @Test public void test() throws Exception { - SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); + SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); options.setRunner(SparkPipelineRunner.class); Pipeline pipeline = Pipeline.create(options); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java index 1450a8febf6f..6a862c9350fe 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java @@ -18,10 +18,6 @@ package org.apache.beam.runners.spark.translation; -import java.util.Set; - -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkPipelineRunner; import org.apache.beam.sdk.Pipeline; @@ -47,9 +43,15 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; + import org.junit.Assert; import org.junit.Test; +import java.util.Set; + public class MultiOutputWordCountTest { private static final TupleTag upper = new TupleTag<>(); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java index 2237ea226416..75d3fb2d2e94 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java @@ -18,17 +18,6 @@ package org.apache.beam.runners.spark.translation; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.List; -import java.util.Set; -import java.util.regex.Pattern; - -import com.google.common.base.Function; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.SparkPipelineRunner; @@ -36,6 +25,7 @@ import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; @@ -46,8 +36,21 @@ import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; + import org.junit.Test; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.regex.Pattern; + public class SerializationTest { public static class StringHolder { // not serializable @@ -117,7 +120,7 @@ public StringHolder apply(String s) { @Test public void testRun() throws Exception { - SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); + SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); options.setRunner(SparkPipelineRunner.class); Pipeline p = Pipeline.create(options); PCollection inputWords = diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java index b85d9350cbbe..14abbfc978ee 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java @@ -22,20 +22,22 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import java.io.Serializable; -import java.net.URI; - import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.SparkPipelineRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringDelegateCoder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; + import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.io.Serializable; +import java.net.URI; + public class SideEffectsTest implements Serializable { static class UserException extends RuntimeException { @@ -43,7 +45,7 @@ static class UserException extends RuntimeException { @Test public void test() throws Exception { - SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); + SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); options.setRunner(SparkPipelineRunner.class); Pipeline pipeline = Pipeline.create(options); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TestSparkPipelineOptionsFactory.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java similarity index 81% rename from runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TestSparkPipelineOptionsFactory.java rename to runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java index 9cace83ed294..bf18486d599e 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TestSparkPipelineOptionsFactory.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java @@ -19,19 +19,21 @@ package org.apache.beam.runners.spark.translation; import org.apache.beam.runners.spark.SparkPipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + import org.junit.Assert; import org.junit.Test; -public class TestSparkPipelineOptionsFactory { +public class SparkPipelineOptionsTest { @Test public void testDefaultCreateMethod() { - SparkPipelineOptions actualOptions = SparkPipelineOptionsFactory.create(); + SparkPipelineOptions actualOptions = PipelineOptionsFactory.as(SparkPipelineOptions.class); Assert.assertEquals("local[1]", actualOptions.getSparkMaster()); } @Test public void testSettingCustomOptions() { - SparkPipelineOptions actualOptions = SparkPipelineOptionsFactory.create(); + SparkPipelineOptions actualOptions = PipelineOptionsFactory.as(SparkPipelineOptions.class); actualOptions.setSparkMaster("spark://207.184.161.138:7077"); Assert.assertEquals("spark://207.184.161.138:7077", actualOptions.getSparkMaster()); } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java index f07fa0b9c6fa..de4a5d23471e 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java @@ -18,15 +18,6 @@ package org.apache.beam.runners.spark.translation; -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Collections; -import java.util.List; - -import com.google.api.client.repackaged.com.google.common.base.Joiner; -import com.google.common.base.Charsets; import org.apache.beam.runners.spark.SparkPipelineRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; @@ -34,6 +25,10 @@ import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.values.PCollection; + +import com.google.api.client.repackaged.com.google.common.base.Joiner; +import com.google.common.base.Charsets; + import org.apache.commons.io.FileUtils; import org.junit.Assert; import org.junit.Before; @@ -41,6 +36,13 @@ import org.junit.Test; import org.junit.rules.TestName; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; + /** * A test for the transforms registered in TransformTranslator. * Builds a regular Dataflow pipeline with each of the mapped diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java index 5134b5c237ee..0db8913fb1d0 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java @@ -18,10 +18,6 @@ package org.apache.beam.runners.spark.translation; -import java.util.Arrays; -import java.util.List; - -import com.google.common.collect.ImmutableList; import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SimpleWordCountTest; import org.apache.beam.runners.spark.SparkPipelineRunner; @@ -34,9 +30,15 @@ import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.PCollection; + +import com.google.common.collect.ImmutableList; + import org.joda.time.Duration; import org.junit.Test; +import java.util.Arrays; +import java.util.List; + public class WindowedWordCountTest { private static final String[] WORDS_ARRAY = { "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"}; diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java index 17f7fbd19afb..9152d72b9131 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java @@ -17,10 +17,6 @@ */ package org.apache.beam.runners.spark.translation.streaming; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkPipelineRunner; import org.apache.beam.runners.spark.SparkStreamingPipelineOptions; @@ -28,6 +24,7 @@ import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.View; @@ -35,9 +32,14 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; + import org.joda.time.Duration; import org.junit.Test; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + /** * Test Flatten (union) implementation for streaming. */ @@ -57,7 +59,8 @@ public class FlattenStreamingTest { @Test public void testRun() throws Exception { - SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create(); + SparkStreamingPipelineOptions options = + PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class); options.setAppName(this.getClass().getSimpleName()); options.setRunner(SparkPipelineRunner.class); options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java index a0c8a4e3e505..e1ff227d5d0e 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java @@ -17,15 +17,6 @@ */ package org.apache.beam.runners.spark.translation.streaming; -import java.io.IOException; -import java.util.Collections; -import java.util.Map; -import java.util.Properties; -import java.util.Set; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import kafka.serializer.StringDecoder; import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkPipelineRunner; import org.apache.beam.runners.spark.SparkStreamingPipelineOptions; @@ -35,6 +26,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; @@ -43,6 +35,10 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serializer; @@ -52,6 +48,13 @@ import org.junit.BeforeClass; import org.junit.Test; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import kafka.serializer.StringDecoder; /** * Test Kafka as input. */ @@ -92,7 +95,8 @@ public static void init() throws IOException { @Test public void testRun() throws Exception { // test read from Kafka - SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create(); + SparkStreamingPipelineOptions options = + PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class); options.setAppName(this.getClass().getSimpleName()); options.setRunner(SparkPipelineRunner.class); options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java index de554e2dfee4..ef224daea1ba 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java @@ -17,12 +17,6 @@ */ package org.apache.beam.runners.spark.translation.streaming; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Set; - -import com.google.common.collect.ImmutableSet; import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SimpleWordCountTest; import org.apache.beam.runners.spark.SparkPipelineRunner; @@ -31,14 +25,23 @@ import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.PCollection; + +import com.google.common.collect.ImmutableSet; + import org.joda.time.Duration; import org.junit.Test; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; + public class SimpleStreamingWordCountTest { private static final String[] WORDS_ARRAY = { @@ -51,7 +54,8 @@ public class SimpleStreamingWordCountTest { @Test public void testRun() throws Exception { - SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create(); + SparkStreamingPipelineOptions options = + PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class); options.setAppName(this.getClass().getSimpleName()); options.setRunner(SparkPipelineRunner.class); options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval @@ -72,4 +76,4 @@ public void testRun() throws Exception { PAssertStreaming.assertNoFailures(res); } -} \ No newline at end of file +} diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java index 21c8115bebe2..2ade467e10a2 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java @@ -17,6 +17,12 @@ */ package org.apache.beam.runners.spark.translation.streaming.utils; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -31,11 +37,6 @@ import kafka.server.KafkaConfig; import kafka.server.KafkaServer; import kafka.utils.Time; -import org.apache.zookeeper.server.NIOServerCnxnFactory; -import org.apache.zookeeper.server.ServerCnxnFactory; -import org.apache.zookeeper.server.ZooKeeperServer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * https://gist.github.com/fjavieralba/7930018 diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java index 3f507980bc29..041cc5061a39 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.spark.translation.streaming.utils; import org.apache.beam.runners.spark.EvaluationResult; + import org.junit.Assert; /**