From 017da7bac3e844ef7391aabbcbaf86c9c99af968 Mon Sep 17 00:00:00 2001
From: Abbass MAROUNI
Date: Mon, 29 Aug 2016 13:28:46 +0200
Subject: [PATCH] [BEAM-313] Provide a context for SparkRunner

---
 .../runners/spark/SparkPipelineOptions.java     |  13 ++
 .../apache/beam/runners/spark/SparkRunner.java  |  16 +-
 .../runners/spark/ProvidedSparkContextTest.java | 138 ++++++++++++++++++
 3 files changed, 164 insertions(+), 3 deletions(-)
 create mode 100644 runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index be4f7f06a9bc..db6b75cb4a03 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -18,11 +18,14 @@
 package org.apache.beam.runners.spark;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.spark.api.java.JavaSparkContext;
 
 /**
  * Spark runner pipeline options.
  */
@@ -49,4 +52,14 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
   @Default.Boolean(true)
   Boolean getEnableSparkSinks();
   void setEnableSparkSinks(Boolean enableSparkSinks);
+
+  @Description("Whether the Spark runner should be initialized with a provided Spark context")
+  @Default.Boolean(false)
+  boolean getUsesProvidedSparkContext();
+  void setUsesProvidedSparkContext(boolean value);
+
+  @Description("Provided Java Spark Context")
+  @JsonIgnore
+  JavaSparkContext getProvidedSparkContext();
+  void setProvidedSparkContext(JavaSparkContext jsc);
 }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index fa85a2e25e26..9f1a83996fb7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -143,9 +143,19 @@ private SparkRunner(SparkPipelineOptions options) {
   public EvaluationResult run(Pipeline pipeline) {
     try {
       LOG.info("Executing pipeline using the SparkRunner.");
-      JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions.getSparkMaster(),
-          mOptions.getAppName());
-
+      JavaSparkContext jsc;
+      if (mOptions.getUsesProvidedSparkContext()) {
+        LOG.info("Using a provided Spark Context");
+        jsc = mOptions.getProvidedSparkContext();
+        if (jsc == null || jsc.sc().isStopped()) {
+          LOG.error("The provided Spark context "
+              + jsc + " was not created or was stopped");
+          throw new RuntimeException("The provided Spark context was not created or was stopped");
+        }
+      } else {
+        LOG.info("Creating a new Spark Context");
+        jsc = SparkContextFactory.getSparkContext(mOptions.getSparkMaster(), mOptions.getAppName());
+      }
       if (mOptions.isStreaming()) {
         SparkPipelineTranslator translator =
             new StreamingTransformTranslator.Translator(new TransformTranslator.Translator());
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
new file mode 100644
index 000000000000..cbc5976b9542
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.runners.spark.examples.WordCount;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.Test;
+
+/**
+ * Provided Spark Context tests.
+ */
+public class ProvidedSparkContextTest {
+  private static final String[] WORDS_ARRAY = {
+      "hi there", "hi", "hi sue bob",
+      "hi sue", "", "bob hi"};
+  private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
+  private static final Set<String> EXPECTED_COUNT_SET =
+      ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
+  private static final String PROVIDED_CONTEXT_EXCEPTION =
+      "The provided Spark context was not created or was stopped";
+
+  /**
+   * Provide a context and call pipeline run.
+   * @throws Exception
+   */
+  @Test
+  public void testWithProvidedContext() throws Exception {
+    JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context");
+
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    options.setRunner(SparkRunner.class);
+    options.setUsesProvidedSparkContext(true);
+    options.setProvidedSparkContext(jsc);
+
+    Pipeline p = Pipeline.create(options);
+    PCollection<String> inputWords =
+        p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
+    PCollection<String> output = inputWords.apply(new WordCount.CountWords())
+        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
+
+    PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
+
+    // Run the pipeline on the provided context
+    p.run();
+
+    jsc.stop();
+  }
+
+  /**
+   * Provide a null context and check that the pipeline run fails.
+   * @throws Exception
+   */
+  @Test
+  public void testWithNullContext() throws Exception {
+    JavaSparkContext jsc = null;
+
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    options.setRunner(SparkRunner.class);
+    options.setUsesProvidedSparkContext(true);
+    options.setProvidedSparkContext(jsc);
+
+    Pipeline p = Pipeline.create(options);
+    PCollection<String> inputWords =
+        p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
+    PCollection<String> output = inputWords.apply(new WordCount.CountWords())
+        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
+
+    PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
+
+    try {
+      p.run();
+      fail("Should throw an exception when the provided Spark context is null");
+    } catch (RuntimeException e) {
+      assertTrue(e.getMessage().contains(PROVIDED_CONTEXT_EXCEPTION));
+    }
+  }
+
+  /**
+   * A SparkRunner with a stopped provided Spark context cannot run pipelines.
+   * @throws Exception
+   */
+  @Test
+  public void testWithStoppedProvidedContext() throws Exception {
+    JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context");
+    // Stop the provided Spark context directly
+    jsc.stop();
+
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    options.setRunner(SparkRunner.class);
+    options.setUsesProvidedSparkContext(true);
+    options.setProvidedSparkContext(jsc);
+
+    Pipeline p = Pipeline.create(options);
+    PCollection<String> inputWords =
+        p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
+    PCollection<String> output = inputWords.apply(new WordCount.CountWords())
+        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
+
+    PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
+
+    try {
+      p.run();
+      fail("Should throw an exception when the provided Spark context is stopped");
+    } catch (RuntimeException e) {
+      assertTrue(e.getMessage().contains(PROVIDED_CONTEXT_EXCEPTION));
+    }
+  }
+}
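Usage note (illustrative, not part of the patch): the sketch below shows how a caller could hand an existing JavaSparkContext to the runner once this change is in. Only SparkPipelineOptions, SparkRunner, and the two new option setters come from the diff above; the class name, the Create.of input, and the pipeline itself are made up for the example.

    package org.apache.beam.runners.spark;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.spark.api.java.JavaSparkContext;

    /** Hypothetical driver for the provided-context options added above. */
    public class ProvidedContextExample {
      public static void main(String[] args) {
        // The caller owns the context's lifecycle: create it up front...
        JavaSparkContext jsc = new JavaSparkContext("local[*]", "Shared_Context");

        SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
        options.setRunner(SparkRunner.class);
        // Opt in; otherwise the runner creates its own context from
        // sparkMaster/appName exactly as before this patch.
        options.setUsesProvidedSparkContext(true);
        // Must be a live context: run() throws a RuntimeException if it is
        // null or already stopped.
        options.setProvidedSparkContext(jsc);

        Pipeline p = Pipeline.create(options);
        p.apply(Create.of("hello", "provided", "context"));
        p.run();

        // ...and stop it when done, as the tests above do.
        jsc.stop();
      }
    }

Judging from the diff, the runner only validates and uses the provided context; stopping it, or reusing it across several pipeline runs, stays with the caller.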