diff --git a/runners/spark/README.md b/runners/spark/README.md index 1d75b3519fc0..5b2e73232ec3 100644 --- a/runners/spark/README.md +++ b/runners/spark/README.md @@ -93,7 +93,7 @@ Switch to the Spark runner directory: Then run the [word count example][wc] from the SDK using a single threaded Spark instance in local mode: - mvn exec:exec -DmainClass=com.google.cloud.dataflow.examples.WordCount \ + mvn exec:exec -DmainClass=org.apache.beam.examples.WordCount \ -Dinput=/tmp/kinglear.txt -Doutput=/tmp/out -Drunner=SparkPipelineRunner \ -DsparkMaster=local @@ -104,7 +104,7 @@ Check the output by running: __Note: running examples using `mvn exec:exec` only works for Spark local mode at the moment. See the next section for how to run on a cluster.__ -[wc]: https://github.com/apache/incubator-beam/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/WordCount.java +[wc]: https://github.com/apache/incubator-beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java ## Running on a Cluster Spark Beam pipelines can be run on a cluster using the `spark-submit` command. @@ -117,7 +117,7 @@ Then run the word count example using Spark submit with the `yarn-client` master (`yarn-cluster` works just as well): spark-submit \ - --class com.google.cloud.dataflow.examples.WordCount \ + --class org.apache.beam.examples.WordCount \ --master yarn-client \ target/spark-runner-*-spark-app.jar \ --inputFile=kinglear.txt --output=out --runner=SparkPipelineRunner --sparkMaster=yarn-client diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index f12d8a6a3c17..5ccaec5b6bc1 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -70,6 +70,12 @@ guava ${guava.version} + + com.google.auto.service + auto-service + 1.0-rc2 + true + org.apache.beam java-sdk-all diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java new file mode 100644 index 000000000000..30142f9966fb --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableList; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsRegistrar; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; + +/** + * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the + * {@link SparkPipelineRunner}. + * + * {@link AutoService} will register Spark's implementations of the {@link PipelineRunner} + * and {@link PipelineOptions} as available pipeline runner services. + */ +public final class SparkRunnerRegistrar { + private SparkRunnerRegistrar() {} + + /** + * Registers the {@link SparkPipelineRunner}. + */ + @AutoService(PipelineRunnerRegistrar.class) + public static class Runner implements PipelineRunnerRegistrar { + @Override + public Iterable>> getPipelineRunners() { + return ImmutableList.>>of(SparkPipelineRunner.class); + } + } + + /** + * Registers the {@link SparkPipelineOptions} and {@link SparkStreamingPipelineOptions}. + */ + @AutoService(PipelineOptionsRegistrar.class) + public static class Options implements PipelineOptionsRegistrar { + @Override + public Iterable> getPipelineOptions() { + return ImmutableList.>of( + SparkPipelineOptions.class, + SparkStreamingPipelineOptions.class); + } + } +} diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsRegistrar.java deleted file mode 100644 index c882d7b84820..000000000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsRegistrar.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.translation; - -import com.google.common.collect.ImmutableList; -import org.apache.beam.runners.spark.SparkPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsRegistrar; - -public class SparkPipelineOptionsRegistrar implements PipelineOptionsRegistrar { - @Override - public Iterable> getPipelineOptions() { - return ImmutableList.>of(SparkPipelineOptions.class); - } -} diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineRunnerRegistrar.java deleted file mode 100644 index 38993fb8543e..000000000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineRunnerRegistrar.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.translation; - -import com.google.common.collect.ImmutableList; -import org.apache.beam.runners.spark.SparkPipelineRunner; -import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; - -public class SparkPipelineRunnerRegistrar implements PipelineRunnerRegistrar { - @Override - public Iterable>> getPipelineRunners() { - return ImmutableList.>>of(SparkPipelineRunner.class); - } -} diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsRegistrar.java deleted file mode 100644 index 2e3509837e9d..000000000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsRegistrar.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.spark.translation.streaming; - -import com.google.common.collect.ImmutableList; -import org.apache.beam.runners.spark.SparkStreamingPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsRegistrar; - -public class SparkStreamingPipelineOptionsRegistrar implements PipelineOptionsRegistrar { - - @Override - public Iterable> getPipelineOptions() { - return ImmutableList.>of(SparkStreamingPipelineOptions - .class); - } -} diff --git a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar deleted file mode 100644 index e4a3a737425b..000000000000 --- a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar +++ /dev/null @@ -1,17 +0,0 @@ -# -# Copyright 2014 Cloudera Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -org.apache.beam.runners.spark.translation.SparkPipelineOptionsRegistrar -org.apache.beam.runners.spark.translation.streaming.SparkStreamingPipelineOptionsRegistrar \ No newline at end of file diff --git a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar deleted file mode 100644 index 7949db444cad..000000000000 --- a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar +++ /dev/null @@ -1,16 +0,0 @@ -# -# Copyright 2014 Cloudera Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -org.apache.beam.runners.spark.translation.SparkPipelineRunnerRegistrar \ No newline at end of file diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java new file mode 100644 index 000000000000..3643bacee230 --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.beam.sdk.options.PipelineOptionsRegistrar; +import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.ServiceLoader; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Test {@link SparkRunnerRegistrar}. + */ +@RunWith(JUnit4.class) +public class SparkRunnerRegistrarTest { + @Test + public void testOptions() { + assertEquals( + ImmutableList.of(SparkPipelineOptions.class, SparkStreamingPipelineOptions.class), + new SparkRunnerRegistrar.Options().getPipelineOptions()); + } + + @Test + public void testRunners() { + assertEquals(ImmutableList.of(SparkPipelineRunner.class), + new SparkRunnerRegistrar.Runner().getPipelineRunners()); + } + + @Test + public void testServiceLoaderForOptions() { + for (PipelineOptionsRegistrar registrar : + Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) { + if (registrar instanceof SparkRunnerRegistrar.Options) { + return; + } + } + fail("Expected to find " + SparkRunnerRegistrar.Options.class); + } + + @Test + public void testServiceLoaderForRunner() { + for (PipelineRunnerRegistrar registrar : + Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class).iterator())) { + if (registrar instanceof SparkRunnerRegistrar.Runner) { + return; + } + } + fail("Expected to find " + SparkRunnerRegistrar.Runner.class); + } +}