From a1b77445b5c7d0c60415a9db8d943d6ede9da6b1 Mon Sep 17 00:00:00 2001 From: Sela Date: Fri, 22 Apr 2016 14:33:46 +0300 Subject: [PATCH 1/6] Remove specific registrar classes and service files --- .../SparkPipelineOptionsRegistrar.java | 31 ------------------ .../SparkPipelineRunnerRegistrar.java | 31 ------------------ ...parkStreamingPipelineOptionsRegistrar.java | 32 ------------------- ...aflow.sdk.options.PipelineOptionsRegistrar | 17 ---------- ...taflow.sdk.runners.PipelineRunnerRegistrar | 16 ---------- 5 files changed, 127 deletions(-) delete mode 100644 runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsRegistrar.java delete mode 100644 runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineRunnerRegistrar.java delete mode 100644 runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsRegistrar.java delete mode 100644 runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar delete mode 100644 runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsRegistrar.java deleted file mode 100644 index c882d7b84820..000000000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsRegistrar.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.translation; - -import com.google.common.collect.ImmutableList; -import org.apache.beam.runners.spark.SparkPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsRegistrar; - -public class SparkPipelineOptionsRegistrar implements PipelineOptionsRegistrar { - @Override - public Iterable> getPipelineOptions() { - return ImmutableList.>of(SparkPipelineOptions.class); - } -} diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineRunnerRegistrar.java deleted file mode 100644 index 38993fb8543e..000000000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineRunnerRegistrar.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.translation; - -import com.google.common.collect.ImmutableList; -import org.apache.beam.runners.spark.SparkPipelineRunner; -import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; - -public class SparkPipelineRunnerRegistrar implements PipelineRunnerRegistrar { - @Override - public Iterable>> getPipelineRunners() { - return ImmutableList.>>of(SparkPipelineRunner.class); - } -} diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsRegistrar.java deleted file mode 100644 index 2e3509837e9d..000000000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsRegistrar.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.spark.translation.streaming; - -import com.google.common.collect.ImmutableList; -import org.apache.beam.runners.spark.SparkStreamingPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsRegistrar; - -public class SparkStreamingPipelineOptionsRegistrar implements PipelineOptionsRegistrar { - - @Override - public Iterable> getPipelineOptions() { - return ImmutableList.>of(SparkStreamingPipelineOptions - .class); - } -} diff --git a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar deleted file mode 100644 index e4a3a737425b..000000000000 --- a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar +++ /dev/null @@ -1,17 +0,0 @@ -# -# Copyright 2014 Cloudera Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -org.apache.beam.runners.spark.translation.SparkPipelineOptionsRegistrar -org.apache.beam.runners.spark.translation.streaming.SparkStreamingPipelineOptionsRegistrar \ No newline at end of file diff --git a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar deleted file mode 100644 index 7949db444cad..000000000000 --- a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar +++ /dev/null @@ -1,16 +0,0 @@ -# -# Copyright 2014 Cloudera Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -org.apache.beam.runners.spark.translation.SparkPipelineRunnerRegistrar \ No newline at end of file From 3f662069448bc38f03b735a8152e78181c469c54 Mon Sep 17 00:00:00 2001 From: Sela Date: Fri, 22 Apr 2016 14:34:53 +0300 Subject: [PATCH 2/6] Add dependency on AutoService --- runners/spark/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index f12d8a6a3c17..5ccaec5b6bc1 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -70,6 +70,12 @@ guava ${guava.version} + + com.google.auto.service + auto-service + 1.0-rc2 + true + org.apache.beam java-sdk-all From 5860a3c0762fe4f7a04eac4f7344fcd4b561c2da Mon Sep 17 00:00:00 2001 From: Sela Date: Fri, 22 Apr 2016 14:35:19 +0300 Subject: [PATCH 3/6] Add SparkRunnerRegistrar as a runner specific registrar that uses AutoService --- .../runners/spark/SparkRunnerRegistrar.java | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java new file mode 100644 index 000000000000..30142f9966fb --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableList; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsRegistrar; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; + +/** + * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the + * {@link SparkPipelineRunner}. + * + * {@link AutoService} will register Spark's implementations of the {@link PipelineRunner} + * and {@link PipelineOptions} as available pipeline runner services. + */ +public final class SparkRunnerRegistrar { + private SparkRunnerRegistrar() {} + + /** + * Registers the {@link SparkPipelineRunner}. + */ + @AutoService(PipelineRunnerRegistrar.class) + public static class Runner implements PipelineRunnerRegistrar { + @Override + public Iterable>> getPipelineRunners() { + return ImmutableList.>>of(SparkPipelineRunner.class); + } + } + + /** + * Registers the {@link SparkPipelineOptions} and {@link SparkStreamingPipelineOptions}. + */ + @AutoService(PipelineOptionsRegistrar.class) + public static class Options implements PipelineOptionsRegistrar { + @Override + public Iterable> getPipelineOptions() { + return ImmutableList.>of( + SparkPipelineOptions.class, + SparkStreamingPipelineOptions.class); + } + } +} From bd65f2687b601794ea29efff330f9bdec1e62bce Mon Sep 17 00:00:00 2001 From: Sela Date: Fri, 22 Apr 2016 14:36:33 +0300 Subject: [PATCH 4/6] Add a unit test for the SparkRunnerRegistrar --- .../spark/SparkRunnerRegistrarTest.java | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java new file mode 100644 index 000000000000..d51403ffbf7b --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.beam.sdk.options.PipelineOptionsRegistrar; +import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; +import org.junit.Test; + +import java.util.ServiceLoader; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Test {@link SparkRunnerRegistrar}. + */ +public class SparkRunnerRegistrarTest { + @Test + public void testOptions() { + assertEquals( + ImmutableList.of(SparkPipelineOptions.class, SparkStreamingPipelineOptions.class), + new SparkRunnerRegistrar.Options().getPipelineOptions()); + } + + @Test + public void testRunners() { + assertEquals(ImmutableList.of(SparkPipelineRunner.class), + new SparkRunnerRegistrar.Runner().getPipelineRunners()); + } + + @Test + public void testServiceLoaderForOptions() { + for (PipelineOptionsRegistrar registrar : + Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) { + if (registrar instanceof SparkRunnerRegistrar.Options) { + return; + } + } + fail("Expected to find " + SparkRunnerRegistrar.Options.class); + } + + @Test + public void testServiceLoaderForRunner() { + for (PipelineRunnerRegistrar registrar : + Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class).iterator())) { + if (registrar instanceof SparkRunnerRegistrar.Runner) { + return; + } + } + fail("Expected to find " + SparkRunnerRegistrar.Runner.class); + } +} From eb65c1565867ffae38033628cdd4b3b029535908 Mon Sep 17 00:00:00 2001 From: Sela Date: Thu, 21 Apr 2016 22:48:14 +0300 Subject: [PATCH 5/6] Update README according to dataflow->beam package rename --- runners/spark/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/runners/spark/README.md b/runners/spark/README.md index 1d75b3519fc0..5b2e73232ec3 100644 --- a/runners/spark/README.md +++ b/runners/spark/README.md @@ -93,7 +93,7 @@ Switch to the Spark runner directory: Then run the [word count example][wc] from the SDK using a single threaded Spark instance in local mode: - mvn exec:exec -DmainClass=com.google.cloud.dataflow.examples.WordCount \ + mvn exec:exec -DmainClass=org.apache.beam.examples.WordCount \ -Dinput=/tmp/kinglear.txt -Doutput=/tmp/out -Drunner=SparkPipelineRunner \ -DsparkMaster=local @@ -104,7 +104,7 @@ Check the output by running: __Note: running examples using `mvn exec:exec` only works for Spark local mode at the moment. See the next section for how to run on a cluster.__ -[wc]: https://github.com/apache/incubator-beam/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/WordCount.java +[wc]: https://github.com/apache/incubator-beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java ## Running on a Cluster Spark Beam pipelines can be run on a cluster using the `spark-submit` command. @@ -117,7 +117,7 @@ Then run the word count example using Spark submit with the `yarn-client` master (`yarn-cluster` works just as well): spark-submit \ - --class com.google.cloud.dataflow.examples.WordCount \ + --class org.apache.beam.examples.WordCount \ --master yarn-client \ target/spark-runner-*-spark-app.jar \ --inputFile=kinglear.txt --output=out --runner=SparkPipelineRunner --sparkMaster=yarn-client From c92a522870b39d0cf96ff6551d0d9cf0135f1595 Mon Sep 17 00:00:00 2001 From: Sela Date: Sat, 23 Apr 2016 00:32:25 +0300 Subject: [PATCH 6/6] Annotate class with RunWith and some whitespace fix ups --- .../beam/runners/spark/SparkRunnerRegistrarTest.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java index d51403ffbf7b..3643bacee230 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java @@ -23,6 +23,8 @@ import org.apache.beam.sdk.options.PipelineOptionsRegistrar; import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; import java.util.ServiceLoader; @@ -32,12 +34,13 @@ /** * Test {@link SparkRunnerRegistrar}. */ +@RunWith(JUnit4.class) public class SparkRunnerRegistrarTest { @Test public void testOptions() { assertEquals( - ImmutableList.of(SparkPipelineOptions.class, SparkStreamingPipelineOptions.class), - new SparkRunnerRegistrar.Options().getPipelineOptions()); + ImmutableList.of(SparkPipelineOptions.class, SparkStreamingPipelineOptions.class), + new SparkRunnerRegistrar.Options().getPipelineOptions()); } @Test @@ -51,7 +54,7 @@ public void testServiceLoaderForOptions() { for (PipelineOptionsRegistrar registrar : Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) { if (registrar instanceof SparkRunnerRegistrar.Options) { - return; + return; } } fail("Expected to find " + SparkRunnerRegistrar.Options.class); @@ -62,7 +65,7 @@ public void testServiceLoaderForRunner() { for (PipelineRunnerRegistrar registrar : Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class).iterator())) { if (registrar instanceof SparkRunnerRegistrar.Runner) { - return; + return; } } fail("Expected to find " + SparkRunnerRegistrar.Runner.class);