From 4254749bf103c4bb6f68e316768c0aa46d9f7df0 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 5 May 2016 15:11:07 -0700 Subject: [PATCH] Configure RunnableOnService tests for Spark runner, batch mode --- runners/spark/pom.xml | 112 ++++++++++++------ .../runners/spark/SparkRunnerRegistrar.java | 3 +- .../spark/TestSparkPipelineRunner.java | 77 ++++++++++++ .../spark/SparkRunnerRegistrarTest.java | 2 +- 4 files changed, 155 insertions(+), 39 deletions(-) create mode 100644 runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index e7d08342f1dc..747464e9cf09 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -37,6 +37,62 @@ 1.6.1 + + + jacoco + + + + org.jacoco + jacoco-maven-plugin + + + + + + + + runnable-on-service-tests + false + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + runnable-on-service-tests + + org.apache.beam.sdk.testing.RunnableOnService + none + true + + org.apache.beam:java-sdk-all + + + org.apache.beam.sdk.io.BoundedReadFromUnboundedSourceTest + + + + [ + "--runner=org.apache.beam.runners.spark.TestSparkPipelineRunner", + "--streaming=false" + ] + + true + + + + + + + + + + + org.apache.spark @@ -122,6 +178,25 @@ org.apache.beam beam-runners-direct-java 0.2.0-incubating-SNAPSHOT + + + + + org.apache.beam + beam-sdks-java-core + tests + test + + + org.slf4j + slf4j-jdk14 + + + + + + org.mockito + mockito-all test @@ -237,41 +312,4 @@ - - - jacoco - - - - org.jacoco - jacoco-maven-plugin - - - - - - - disable-runnable-on-service-tests - - true - - - - - org.apache.maven.plugins - maven-surefire-plugin - - - runnable-on-service-tests - - true - - - - - - - - - diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java index 9537ec6efc9e..baa2241a5515 100644 --- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java @@ -43,7 +43,8 @@ private SparkRunnerRegistrar() {} public static class Runner implements PipelineRunnerRegistrar { @Override public Iterable>> getPipelineRunners() { - return ImmutableList.>>of(SparkPipelineRunner.class); + return ImmutableList.>>of( + SparkPipelineRunner.class, TestSparkPipelineRunner.class); } } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java new file mode 100644 index 000000000000..d11d1c15cad0 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+/**
+ * A {@link PipelineRunner} for running the SDK's {@code RunnableOnService} tests on Spark.
+ *
+ * <p>This runner adds no behavior of its own: {@code apply} and {@code run} are forwarded
+ * unchanged to a {@link SparkPipelineRunner} built from the same options. It is selected by
+ * passing {@code --runner=org.apache.beam.runners.spark.TestSparkPipelineRunner}, for example:
+ *
+ * <pre>{@code
+ * PipelineOptions options = [logic for options creation]
+ * Pipeline p = [logic for pipeline creation]
+ * EvaluationResult result = TestSparkPipelineRunner.fromOptions(options).run(p);
+ * }</pre>
+ */
+public final class TestSparkPipelineRunner extends PipelineRunner<EvaluationResult> {
+
+  /** The runner that every call on this class is forwarded to. */
+  private final SparkPipelineRunner delegate;
+
+  /** Not public; instances are obtained through {@link #fromOptions}. */
+  private TestSparkPipelineRunner(SparkPipelineOptions options) {
+    this.delegate = SparkPipelineRunner.fromOptions(options);
+  }
+
+  /**
+   * Creates a test runner from the given options.
+   *
+   * @param options pipeline options, validated here as {@link SparkPipelineOptions}
+   * @return a runner that delegates to a {@link SparkPipelineRunner} built from them
+   */
+  public static TestSparkPipelineRunner fromOptions(PipelineOptions options) {
+    // Default options suffice to set it up as a test runner
+    SparkPipelineOptions sparkOptions =
+        PipelineOptionsValidator.validate(SparkPipelineOptions.class, options);
+    return new TestSparkPipelineRunner(sparkOptions);
+  }
+
+  /** Applies {@code transform} to {@code input} through the delegate runner. */
+  @Override
+  public <OutputT extends POutput, InputT extends PInput>
+      OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
+    return delegate.apply(transform, input);
+  }
+
+  /** Runs {@code pipeline} on the delegate runner and returns its result. */
+  @Override
+  public EvaluationResult run(Pipeline pipeline) {
+    return delegate.run(pipeline);
+  }
+}
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
index 88f4a06d50cb..d2e57aaeac78 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
@@ -47,7 +47,7 @@ public void testOptions() {
 
   @Test
   public void testRunners() {
-    assertEquals(ImmutableList.of(SparkPipelineRunner.class),
+    assertEquals(ImmutableList.of(SparkPipelineRunner.class, TestSparkPipelineRunner.class),
         new SparkRunnerRegistrar.Runner().getPipelineRunners());
   }