From bd24635205ae7491e30593fe814b58a791693dcd Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Wed, 20 Apr 2022 11:22:53 +0200 Subject: [PATCH 1/5] [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1. --- runners/spark/spark_runner.gradle | 9 +- .../runners/spark/SparkRunnerDebugger.java | 6 +- .../translation/SparkContextFactory.java | 4 + .../apache/beam/runners/spark/CacheTest.java | 25 +-- .../spark/GlobalWatermarkHolderTest.java | 19 +-- .../spark/ProvidedSparkContextTest.java | 94 +++++----- ...Rule.java => SparkContextOptionsRule.java} | 35 ++-- .../beam/runners/spark/SparkContextRule.java | 82 +++++++++ .../runners/spark/SparkPipelineStateTest.java | 161 ++++++------------ .../spark/SparkRunnerDebuggerTest.java | 15 +- .../metrics/sink/SparkMetricsSinkTest.java | 2 - .../SparkRunnerKryoRegistratorTest.java | 128 +++++++------- .../spark/metrics/SparkMetricsPusherTest.java | 3 - .../structuredstreaming/SparkSessionRule.java | 64 +++++++ .../helpers/EncoderHelpersTest.java | 14 +- .../streaming/CreateStreamTest.java | 2 - .../ResumeFromCheckpointStreamingTest.java | 4 - .../SparkCoGroupByKeyStreamingTest.java | 3 - .../streaming/TrackStreamingSourcesTest.java | 23 +-- 19 files changed, 372 insertions(+), 321 deletions(-) rename runners/spark/src/test/java/org/apache/beam/runners/spark/{ReuseSparkContextRule.java => SparkContextOptionsRule.java} (56%) create mode 100644 runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextRule.java create mode 100644 runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SparkSessionRule.java diff --git a/runners/spark/spark_runner.gradle b/runners/spark/spark_runner.gradle index 3344c42fe3f6..e07819b954dd 100644 --- a/runners/spark/spark_runner.gradle +++ b/runners/spark/spark_runner.gradle @@ -113,17 +113,14 @@ test { jvmArgs System.getProperty("beamSurefireArgline") } - // Only one SparkContext may be running in a JVM (SPARK-2243) - forkEvery 1 maxParallelForks 4 useJUnit { excludeCategories "org.apache.beam.runners.spark.StreamingTest" excludeCategories "org.apache.beam.runners.spark.UsesCheckpointRecovery" } - filter { - // BEAM-11653 MetricsSinkTest is failing with Spark 3 - excludeTestsMatching 'org.apache.beam.runners.spark.aggregators.metrics.sink.SparkMetricsSinkTest' - } + + // easily re-run all tests (to deal with flaky tests / SparkContext leaks) + if(project.hasProperty("rerun-tests")) { outputs.upToDateWhen {false} } } dependencies { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java index 65f83f5e8195..0474ca580a44 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeoutException; import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.spark.translation.EvaluationContext; +import org.apache.beam.runners.spark.translation.SparkContextFactory; import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; import org.apache.beam.runners.spark.translation.TransformTranslator; import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator; @@ -86,7 +87,8 @@ public SparkPipelineResult run(Pipeline pipeline) { 
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline); } - JavaSparkContext jsc = new JavaSparkContext("local[1]", "Debug_Pipeline"); + JavaSparkContext jsc = + SparkContextFactory.getSparkContext(pipeline.getOptions().as(SparkPipelineOptions.class)); JavaStreamingContext jssc = new JavaStreamingContext(jsc, new org.apache.spark.streaming.Duration(1000)); @@ -107,7 +109,7 @@ public SparkPipelineResult run(Pipeline pipeline) { pipeline.traverseTopologically(visitor); - jsc.stop(); + SparkContextFactory.stopSparkContext(jsc); String debugString = visitor.getDebugString(); LOG.info("Translated Native Spark pipeline:\n" + debugString); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java index 61cf3afed9ca..173772d3f618 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java @@ -35,7 +35,11 @@ public final class SparkContextFactory { /** * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark * context will be reused for beam pipelines. This property should only be enabled for tests. + * + * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle + * control to not leak your SparkContext. */ + @Deprecated public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext"; // Spark allows only one context for JVM so this can be static. diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java index 8209d4302717..288c49652598 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java @@ -25,11 +25,9 @@ import java.util.List; import org.apache.beam.runners.spark.translation.Dataset; import org.apache.beam.runners.spark.translation.EvaluationContext; -import org.apache.beam.runners.spark.translation.SparkContextFactory; import org.apache.beam.runners.spark.translation.TransformTranslator; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Create.Values; @@ -39,7 +37,7 @@ import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.spark.api.java.JavaSparkContext; +import org.junit.ClassRule; import org.junit.Test; /** Tests of {@link Dataset#cache(String, Coder)}} scenarios. */ @@ -48,13 +46,15 @@ }) public class CacheTest { + @ClassRule public static SparkContextRule contextRule = new SparkContextRule(); + /** * Test checks how the cache candidates map is populated by the runner when evaluating the * pipeline. 
*/ @Test public void cacheCandidatesUpdaterTest() { - SparkPipelineOptions options = createOptions(); + SparkPipelineOptions options = contextRule.createPipelineOptions(TestSparkRunner.class); Pipeline pipeline = Pipeline.create(options); PCollection pCollection = pipeline.apply(Create.of("foo", "bar")); @@ -80,8 +80,8 @@ public void processElement(ProcessContext processContext) { }) .withSideInputs(view)); - JavaSparkContext jsc = SparkContextFactory.getSparkContext(options); - EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options); + EvaluationContext ctxt = + new EvaluationContext(contextRule.getSparkContext(), pipeline, options); SparkRunner.CacheVisitor cacheVisitor = new SparkRunner.CacheVisitor(new TransformTranslator.Translator(), ctxt); pipeline.traverseTopologically(cacheVisitor); @@ -91,15 +91,15 @@ public void processElement(ProcessContext processContext) { @Test public void shouldCacheTest() { - SparkPipelineOptions options = createOptions(); + SparkPipelineOptions options = contextRule.createPipelineOptions(TestSparkRunner.class); options.setCacheDisabled(true); Pipeline pipeline = Pipeline.create(options); Values valuesTransform = Create.of("foo", "bar"); PCollection pCollection = mock(PCollection.class); - JavaSparkContext jsc = SparkContextFactory.getSparkContext(options); - EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options); + EvaluationContext ctxt = + new EvaluationContext(contextRule.getSparkContext(), pipeline, options); ctxt.getCacheCandidates().put(pCollection, 2L); assertFalse(ctxt.shouldCache(valuesTransform, pCollection)); @@ -110,11 +110,4 @@ public void shouldCacheTest() { GroupByKey gbkTransform = GroupByKey.create(); assertFalse(ctxt.shouldCache(gbkTransform, pCollection)); } - - private SparkPipelineOptions createOptions() { - SparkPipelineOptions options = - PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class); - options.setRunner(TestSparkRunner.class); - return options; - } } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java index 7bcff9875db6..a4dc6afd9c45 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java @@ -20,13 +20,12 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; -import org.apache.beam.runners.spark.translation.SparkContextFactory; import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.RegexMatcher; import org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -34,27 +33,21 @@ /** A test suite for the propagation of watermarks in the Spark runner. 
*/ public class GlobalWatermarkHolderTest { + // Watermark holder requires valid SparkEnv + @ClassRule public static SparkContextRule contextRule = new SparkContextRule(); + @Rule public ClearWatermarksRule clearWatermarksRule = new ClearWatermarksRule(); @Rule public ExpectedException thrown = ExpectedException.none(); - @Rule public ReuseSparkContextRule reuseContext = ReuseSparkContextRule.yes(); - - // only needed in-order to get context from the SparkContextFactory. - private static final SparkPipelineOptions options = - PipelineOptionsFactory.create().as(SparkPipelineOptions.class); - private static final String INSTANT_PATTERN = "[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3}Z"; @Test public void testLowHighWatermarksAdvance() { - Instant instant = new Instant(0); // low == high. - SparkContextFactory.getSparkContext(options); - GlobalWatermarkHolder.add( 1, new SparkWatermarks( @@ -98,7 +91,7 @@ public void testLowHighWatermarksAdvance() { @Test public void testSynchronizedTimeMonotonic() { Instant instant = new Instant(0); - SparkContextFactory.getSparkContext(options); + GlobalWatermarkHolder.add( 1, new SparkWatermarks( @@ -119,7 +112,7 @@ public void testSynchronizedTimeMonotonic() { @Test public void testMultiSource() { Instant instant = new Instant(0); - SparkContextFactory.getSparkContext(options); + GlobalWatermarkHolder.add( 1, new SparkWatermarks( diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java index 4a57ade09cb5..e81a48a0b058 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java @@ -18,13 +18,12 @@ package org.apache.beam.runners.spark; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.fail; +import static org.junit.Assert.assertThrows; import org.apache.beam.runners.spark.examples.WordCount; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -32,10 +31,18 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; -import org.apache.spark.api.java.JavaSparkContext; +import org.junit.ClassRule; +import org.junit.FixMethodOrder; import org.junit.Test; +import org.junit.runners.MethodSorters; -/** Provided Spark Context tests. */ +/** + * Provided Spark Context tests. + * + *

Note: These tests are run sequentially ordered by their name to reuse the Spark context and + * speed up testing. + */ +@FixMethodOrder(MethodSorters.NAME_ASCENDING) public class ProvidedSparkContextTest { private static final String[] WORDS_ARRAY = { "hi there", "hi", "hi sue bob", @@ -47,72 +54,51 @@ public class ProvidedSparkContextTest { private static final String PROVIDED_CONTEXT_EXCEPTION = "The provided Spark context was not created or was stopped"; + @ClassRule + public static SparkContextOptionsRule contextRule = + new SparkContextOptionsRule(TestSparkRunner.class); + /** Provide a context and call pipeline run. */ @Test - public void testWithProvidedContext() throws Exception { - JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context"); - testWithValidProvidedContext(jsc); + public void testAWithProvidedContext() throws Exception { + Pipeline p = createPipeline(); + PipelineResult result = p.run(); // Run test from pipeline + result.waitUntilFinish(); + TestPipeline.verifyPAssertsSucceeded(p, result); // A provided context must not be stopped after execution - assertFalse(jsc.sc().isStopped()); - jsc.stop(); + assertFalse(contextRule.getSparkContext().sc().isStopped()); } - /** Provide a context and call pipeline run. */ + /** A SparkRunner with a stopped provided Spark context cannot run pipelines. */ @Test - public void testWithNullContext() throws Exception { - testWithInvalidContext(null); + public void testBWithStoppedProvidedContext() { + // Stop the provided Spark context + contextRule.getSparkContext().sc().stop(); + assertThrows( + PROVIDED_CONTEXT_EXCEPTION, + RuntimeException.class, + () -> createPipeline().run().waitUntilFinish()); } - /** A SparkRunner with a stopped provided Spark context cannot run pipelines. */ + /** Provide a context and call pipeline run. 
*/ @Test - public void testWithStoppedProvidedContext() throws Exception { - JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context"); - // Stop the provided Spark context directly - jsc.stop(); - testWithInvalidContext(jsc); + public void testCWithNullContext() { + contextRule.getOptions().setProvidedSparkContext(null); + assertThrows( + PROVIDED_CONTEXT_EXCEPTION, + RuntimeException.class, + () -> createPipeline().run().waitUntilFinish()); } - private void testWithValidProvidedContext(JavaSparkContext jsc) throws Exception { - SparkContextOptions options = getSparkContextOptions(jsc); - - Pipeline p = Pipeline.create(options); + private Pipeline createPipeline() { + Pipeline p = Pipeline.create(contextRule.getOptions()); PCollection inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of())); PCollection output = inputWords .apply(new WordCount.CountWords()) .apply(MapElements.via(new WordCount.FormatAsTextFn())); - - PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); - // Run test from pipeline - PipelineResult result = p.run(); - - TestPipeline.verifyPAssertsSucceeded(p, result); - } - - private void testWithInvalidContext(JavaSparkContext jsc) { - SparkContextOptions options = getSparkContextOptions(jsc); - - Pipeline p = Pipeline.create(options); - PCollection inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of())); - inputWords - .apply(new WordCount.CountWords()) - .apply(MapElements.via(new WordCount.FormatAsTextFn())); - - try { - p.run().waitUntilFinish(); - fail("Should throw an exception when The provided Spark context is null or stopped"); - } catch (RuntimeException e) { - assert e.getMessage().contains(PROVIDED_CONTEXT_EXCEPTION); - } - } - - private static SparkContextOptions getSparkContextOptions(JavaSparkContext jsc) { - final SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class); - options.setRunner(TestSparkRunner.class); - options.setUsesProvidedSparkContext(true); - options.setProvidedSparkContext(jsc); - options.setEnableSparkMetricSinks(false); - return options; + PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); + return p; } } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContextRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextOptionsRule.java similarity index 56% rename from runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContextRule.java rename to runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextOptionsRule.java index 54b77448f78a..515abd37a32e 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContextRule.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextOptionsRule.java @@ -17,28 +17,31 @@ */ package org.apache.beam.runners.spark; -import org.apache.beam.runners.spark.translation.SparkContextFactory; -import org.junit.rules.ExternalResource; +import javax.annotation.Nullable; +import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.values.KV; -/** Explicitly set {@link org.apache.spark.SparkContext} to be reused (or not) in tests. 
*/ -public class ReuseSparkContextRule extends ExternalResource { +public class SparkContextOptionsRule extends SparkContextRule { - private final boolean reuse; + private Class> runner; + private @Nullable SparkContextOptions contextOptions = null; - private ReuseSparkContextRule(boolean reuse) { - this.reuse = reuse; - } - - public static ReuseSparkContextRule no() { - return new ReuseSparkContextRule(false); - } - - public static ReuseSparkContextRule yes() { - return new ReuseSparkContextRule(true); + public SparkContextOptionsRule( + Class> runner, KV... sparkConfig) { + super(sparkConfig); + this.runner = runner; } @Override protected void before() throws Throwable { - System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, Boolean.toString(reuse)); + super.before(); + contextOptions = createPipelineOptions(runner); + } + + public SparkContextOptions getOptions() { + if (contextOptions == null) { + throw new IllegalStateException("SparkContextOptions not available"); + } + return contextOptions; } } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextRule.java new file mode 100644 index 000000000000..f93a7d56cbc9 --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextRule.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark; + +import static java.util.stream.Collectors.toMap; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.values.KV; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.rules.ExternalResource; + +public class SparkContextRule extends ExternalResource implements Serializable { + private transient SparkConf sparkConf; + private transient @Nullable JavaSparkContext sparkContext = null; + + public SparkContextRule(String sparkMaster, Map sparkConfig) { + sparkConf = new SparkConf(); + sparkConfig.forEach(sparkConf::set); + sparkConf.setMaster(sparkMaster).setAppName(sparkMaster); + } + + public SparkContextRule(KV... sparkConfig) { + this("local", sparkConfig); + } + + public SparkContextRule(String sparkMaster, KV... 
sparkConfig) { + this(sparkMaster, Arrays.stream(sparkConfig).collect(toMap(KV::getKey, KV::getValue))); + } + + public JavaSparkContext getSparkContext() { + if (sparkContext == null) { + throw new IllegalStateException("SparkContext not available"); + } + return sparkContext; + } + + public SparkContextOptions createPipelineOptions( + Class> runner, String... options) { + PipelineOptions opts = PipelineOptionsFactory.fromArgs(options).create(); + opts.as(SparkPipelineOptions.class).setRunner(runner); + return configurePipelineOptions(opts); + } + + public SparkContextOptions configurePipelineOptions(PipelineOptions opts) { + SparkContextOptions ctxOpts = opts.as(SparkContextOptions.class); + ctxOpts.setUsesProvidedSparkContext(true); + ctxOpts.setProvidedSparkContext(sparkContext); + return ctxOpts; + } + + @Override + protected void before() throws Throwable { + sparkContext = new JavaSparkContext(sparkConf); + } + + @Override + protected void after() { + getSparkContext().stop(); + } +} diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java index b48f553d8fc5..28223ca9b644 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java @@ -20,190 +20,135 @@ import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; -import static org.junit.Assert.fail; +import static org.joda.time.Duration.millis; +import static org.junit.Assert.assertThrows; import java.io.Serializable; +import javax.annotation.Nullable; import org.apache.beam.runners.spark.io.CreateStream; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; -import org.junit.Rule; +import org.junit.ClassRule; import org.junit.Test; -import org.junit.rules.TestName; /** This suite tests that various scenarios result in proper states of the pipeline. 
*/ public class SparkPipelineStateTest implements Serializable { - private static class MyCustomException extends RuntimeException { + @ClassRule public static SparkContextRule contextRule = new SparkContextRule(); + private static class MyCustomException extends RuntimeException { MyCustomException(final String message) { super(message); } } - private final transient SparkPipelineOptions options = - PipelineOptionsFactory.as(SparkPipelineOptions.class); - - @Rule public transient TestName testName = new TestName(); - - private static final String FAILED_THE_BATCH_INTENTIONALLY = "Failed the batch intentionally"; - - private ParDo.SingleOutput printParDo(final String prefix) { - return ParDo.of( - new DoFn() { - - @ProcessElement - public void processElement(final ProcessContext c) { - System.out.println(prefix + " " + c.element()); - } - }); - } - - private PTransform> getValues(final SparkPipelineOptions options) { - final boolean doNotSyncWithWatermark = false; - return options.isStreaming() - ? CreateStream.of(StringUtf8Coder.of(), Duration.millis(1), doNotSyncWithWatermark) - .nextBatch("one", "two") - : Create.of("one", "two"); - } - - private SparkPipelineOptions getStreamingOptions() { - options.setRunner(SparkRunner.class); - options.setStreaming(true); - return options; - } - - private SparkPipelineOptions getBatchOptions() { - options.setRunner(SparkRunner.class); - options.setStreaming(false); // explicit because options is reused throughout the test. - return options; + private static class FailAlways extends SimpleFunction { + @Override + public String apply(final String input) { + throw new MyCustomException(FAILED_THE_BATCH_INTENTIONALLY); + } } - private Pipeline getPipeline(final SparkPipelineOptions options) { + private static final String FAILED_THE_BATCH_INTENTIONALLY = "Failed the batch intentionally"; - final Pipeline pipeline = Pipeline.create(options); - final String name = testName.getMethodName() + "(isStreaming=" + options.isStreaming() + ")"; + private Pipeline createPipeline( + boolean isStreaming, @Nullable SimpleFunction mapFun) { + SparkContextOptions options = contextRule.createPipelineOptions(SparkRunner.class); + options.setStreaming(isStreaming); - pipeline.apply(getValues(options)).setCoder(StringUtf8Coder.of()).apply(printParDo(name)); + Pipeline pipeline = Pipeline.create(options); + PTransform> values = + isStreaming + ? 
CreateStream.of(StringUtf8Coder.of(), millis(1), false).nextBatch("one", "two") + : Create.of("one", "two"); + PCollection collection = pipeline.apply(values).setCoder(StringUtf8Coder.of()); + if (mapFun != null) { + collection.apply(MapElements.via(mapFun)); + } return pipeline; } - private void testFailedPipeline(final SparkPipelineOptions options) throws Exception { - - SparkPipelineResult result = null; - - try { - final Pipeline pipeline = Pipeline.create(options); - pipeline - .apply(getValues(options)) - .setCoder(StringUtf8Coder.of()) - .apply( - MapElements.via( - new SimpleFunction() { - - @Override - public String apply(final String input) { - throw new MyCustomException(FAILED_THE_BATCH_INTENTIONALLY); - } - })); - - result = (SparkPipelineResult) pipeline.run(); - result.waitUntilFinish(); - } catch (final Exception e) { - assertThat(e, instanceOf(Pipeline.PipelineExecutionException.class)); - assertThat(e.getCause(), instanceOf(MyCustomException.class)); - assertThat(e.getCause().getMessage(), is(FAILED_THE_BATCH_INTENTIONALLY)); - assertThat(result.getState(), is(PipelineResult.State.FAILED)); - result.cancel(); - return; - } + private void testFailedPipeline(boolean isStreaming) throws Exception { + Pipeline pipeline = createPipeline(isStreaming, new FailAlways()); + SparkPipelineResult result = (SparkPipelineResult) pipeline.run(); - fail("An injected failure did not affect the pipeline as expected."); + PipelineExecutionException e = + assertThrows(PipelineExecutionException.class, () -> result.waitUntilFinish()); + assertThat(e.getCause(), instanceOf(MyCustomException.class)); + assertThat(e.getCause().getMessage(), is(FAILED_THE_BATCH_INTENTIONALLY)); + assertThat(result.getState(), is(PipelineResult.State.FAILED)); + result.cancel(); } - private void testTimeoutPipeline(final SparkPipelineOptions options) throws Exception { - - final Pipeline pipeline = getPipeline(options); - - final SparkPipelineResult result = (SparkPipelineResult) pipeline.run(); - - result.waitUntilFinish(Duration.millis(1)); + private void testTimeoutPipeline(boolean isStreaming) throws Exception { + Pipeline pipeline = createPipeline(isStreaming, null); + SparkPipelineResult result = (SparkPipelineResult) pipeline.run(); + result.waitUntilFinish(millis(1)); assertThat(result.getState(), is(PipelineResult.State.RUNNING)); - result.cancel(); } - private void testCanceledPipeline(final SparkPipelineOptions options) throws Exception { - - final Pipeline pipeline = getPipeline(options); - - final SparkPipelineResult result = (SparkPipelineResult) pipeline.run(); - + private void testCanceledPipeline(boolean isStreaming) throws Exception { + Pipeline pipeline = createPipeline(isStreaming, null); + SparkPipelineResult result = (SparkPipelineResult) pipeline.run(); result.cancel(); - assertThat(result.getState(), is(PipelineResult.State.CANCELLED)); } - private void testRunningPipeline(final SparkPipelineOptions options) throws Exception { - - final Pipeline pipeline = getPipeline(options); - - final SparkPipelineResult result = (SparkPipelineResult) pipeline.run(); - + private void testRunningPipeline(boolean isStreaming) throws Exception { + Pipeline pipeline = createPipeline(isStreaming, null); + SparkPipelineResult result = (SparkPipelineResult) pipeline.run(); assertThat(result.getState(), is(PipelineResult.State.RUNNING)); - result.cancel(); } @Test public void testStreamingPipelineRunningState() throws Exception { - testRunningPipeline(getStreamingOptions()); + testRunningPipeline(true); } @Test 
public void testBatchPipelineRunningState() throws Exception { - testRunningPipeline(getBatchOptions()); + testRunningPipeline(false); } @Test public void testStreamingPipelineCanceledState() throws Exception { - testCanceledPipeline(getStreamingOptions()); + testCanceledPipeline(true); } @Test public void testBatchPipelineCanceledState() throws Exception { - testCanceledPipeline(getBatchOptions()); + testCanceledPipeline(false); } @Test public void testStreamingPipelineFailedState() throws Exception { - testFailedPipeline(getStreamingOptions()); + testFailedPipeline(true); } @Test public void testBatchPipelineFailedState() throws Exception { - testFailedPipeline(getBatchOptions()); + testFailedPipeline(false); } @Test public void testStreamingPipelineTimeoutState() throws Exception { - testTimeoutPipeline(getStreamingOptions()); + testTimeoutPipeline(true); } @Test public void testBatchPipelineTimeoutState() throws Exception { - testTimeoutPipeline(getBatchOptions()); + testTimeoutPipeline(false); } } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java index c9bb83dd0c34..4908b89216eb 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java @@ -27,7 +27,6 @@ import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Distinct; @@ -49,6 +48,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.hamcrest.Matchers; import org.joda.time.Duration; +import org.junit.ClassRule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -57,11 +57,11 @@ @RunWith(JUnit4.class) public class SparkRunnerDebuggerTest { + @ClassRule public static SparkContextRule contextRule = new SparkContextRule("local[1]"); + @Test public void debugBatchPipeline() { - PipelineOptions options = PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class); - options.setRunner(SparkRunnerDebugger.class); - + PipelineOptions options = contextRule.createPipelineOptions(SparkRunnerDebugger.class); Pipeline pipeline = Pipeline.create(options); PCollection lines = @@ -105,11 +105,8 @@ public void debugBatchPipeline() { @Test public void debugStreamingPipeline() { - TestSparkPipelineOptions options = - PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class); - options.setForceStreaming(true); - options.setRunner(SparkRunnerDebugger.class); - + PipelineOptions options = contextRule.createPipelineOptions(SparkRunnerDebugger.class); + options.as(TestSparkPipelineOptions.class).setForceStreaming(true); Pipeline pipeline = Pipeline.create(options); KafkaIO.Read read = diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java index 7439ebfeb726..b1a5e7c0f5c9 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java +++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java @@ -21,7 +21,6 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; -import org.apache.beam.runners.spark.ReuseSparkContextRule; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.StreamingTest; import org.apache.beam.runners.spark.examples.WordCount; @@ -51,7 +50,6 @@ public class SparkMetricsSinkTest { @Rule public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule(); @Rule public final TestPipeline pipeline = TestPipeline.create(); - @Rule public final transient ReuseSparkContextRule noContextResue = ReuseSparkContextRule.no(); private static final ImmutableList WORDS = ImmutableList.of("hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java index 390b127871a4..6ab47de16a4b 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java @@ -22,91 +22,95 @@ import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Registration; -import org.apache.beam.runners.spark.SparkContextOptions; -import org.apache.beam.runners.spark.SparkPipelineOptions; -import org.apache.beam.runners.spark.TestSparkPipelineOptions; +import org.apache.beam.runners.spark.SparkContextRule; import org.apache.beam.runners.spark.TestSparkRunner; import org.apache.beam.runners.spark.io.MicrobatchSource; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; +import org.apache.beam.sdk.values.KV; +import org.junit.ClassRule; import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; -/** Testing of beam registrar. */ +/** + * Testing of beam registrar. Note: There can only be one Spark context at a time. For that reason + * tests requiring a different context have to be forked using separate test classes. 
+ */ @SuppressWarnings({ "rawtypes" // TODO(https://issues.apache.org/jira/browse/BEAM-10556) }) +@RunWith(Enclosed.class) public class SparkRunnerKryoRegistratorTest { - @Test - public void testKryoRegistration() { - SparkConf conf = new SparkConf(); - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.set("spark.kryo.registrator", WrapperKryoRegistrator.class.getName()); - runSimplePipelineWithSparkContext(conf); - assertTrue( - "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is not set", - WrapperKryoRegistrator.wasInitiated); - } + public static class WithKryoSerializer { - @Test - public void testDefaultSerializerNotCallingKryo() { - SparkConf conf = new SparkConf(); - conf.set("spark.kryo.registrator", KryoRegistratorIsNotCalled.class.getName()); - runSimplePipelineWithSparkContext(conf); - } + @ClassRule + public static SparkContextRule contextRule = + new SparkContextRule( + KV.of("spark.serializer", "org.apache.spark.serializer.KryoSerializer"), + KV.of("spark.kryo.registrator", WrapperKryoRegistrator.class.getName())); - private void runSimplePipelineWithSparkContext(SparkConf conf) { - SparkPipelineOptions options = - PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class); - options.setRunner(TestSparkRunner.class); + @Test + public void testKryoRegistration() { + runSimplePipelineWithSparkContextOptions(contextRule); + assertTrue( + "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is not set", + WrapperKryoRegistrator.wasInitiated); + } - conf.set("spark.master", "local"); - conf.setAppName("test"); + /** + * A {@link SparkRunnerKryoRegistrator} that registers an internal class to validate + * KryoSerialization resolution. Use only for test purposes. Needs to be public for + * serialization. + */ + public static class WrapperKryoRegistrator extends SparkRunnerKryoRegistrator { - JavaSparkContext javaSparkContext = new JavaSparkContext(conf); - options.setUsesProvidedSparkContext(true); - options.as(SparkContextOptions.class).setProvidedSparkContext(javaSparkContext); - Pipeline p = Pipeline.create(options); - p.apply(Create.of("a")); // some operation to trigger pipeline construction - p.run().waitUntilFinish(); - javaSparkContext.stop(); - } + static boolean wasInitiated = false; - /** - * A {@link SparkRunnerKryoRegistrator} that fails if called. Use only for test purposes. Needs to - * be public for serialization. - */ - public static class KryoRegistratorIsNotCalled extends SparkRunnerKryoRegistrator { + public WrapperKryoRegistrator() { + wasInitiated = true; + } - @Override - public void registerClasses(Kryo kryo) { - fail( - "Default spark.serializer is JavaSerializer" - + " so spark.kryo.registrator shouldn't be called"); + @Override + public void registerClasses(Kryo kryo) { + super.registerClasses(kryo); + Registration registration = kryo.getRegistration(MicrobatchSource.class); + com.esotericsoftware.kryo.Serializer kryoSerializer = registration.getSerializer(); + assertTrue(kryoSerializer instanceof StatelessJavaSerializer); + } } } - /** - * A {@link SparkRunnerKryoRegistrator} that registers an internal class to validate - * KryoSerialization resolution. Use only for test purposes. Needs to be public for serialization. 
- */ - public static class WrapperKryoRegistrator extends SparkRunnerKryoRegistrator { + public static class WithoutKryoSerializer { + @ClassRule + public static SparkContextRule contextRule = + new SparkContextRule( + KV.of("spark.kryo.registrator", KryoRegistratorIsNotCalled.class.getName())); - static boolean wasInitiated = false; - - public WrapperKryoRegistrator() { - wasInitiated = true; + @Test + public void testDefaultSerializerNotCallingKryo() { + runSimplePipelineWithSparkContextOptions(contextRule); } - @Override - public void registerClasses(Kryo kryo) { - super.registerClasses(kryo); - Registration registration = kryo.getRegistration(MicrobatchSource.class); - com.esotericsoftware.kryo.Serializer kryoSerializer = registration.getSerializer(); - assertTrue(kryoSerializer instanceof StatelessJavaSerializer); + /** + * A {@link SparkRunnerKryoRegistrator} that fails if called. Use only for test purposes. Needs + * to be public for serialization. + */ + public static class KryoRegistratorIsNotCalled extends SparkRunnerKryoRegistrator { + + @Override + public void registerClasses(Kryo kryo) { + fail( + "Default spark.serializer is JavaSerializer" + + " so spark.kryo.registrator shouldn't be called"); + } } } + + private static void runSimplePipelineWithSparkContextOptions(SparkContextRule context) { + Pipeline p = Pipeline.create(context.createPipelineOptions(TestSparkRunner.class)); + p.apply(Create.of("a")); // some operation to trigger pipeline construction + p.run().waitUntilFinish(); + } } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkMetricsPusherTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkMetricsPusherTest.java index bc4f7507ca05..aa7ab616ecd8 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkMetricsPusherTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkMetricsPusherTest.java @@ -21,7 +21,6 @@ import static org.hamcrest.Matchers.is; import org.apache.beam.runners.core.metrics.TestMetricsSink; -import org.apache.beam.runners.spark.ReuseSparkContextRule; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.StreamingTest; import org.apache.beam.runners.spark.io.CreateStream; @@ -52,8 +51,6 @@ public class SparkMetricsPusherTest { private static final Logger LOG = LoggerFactory.getLogger(SparkMetricsPusherTest.class); private static final String COUNTER_NAME = "counter"; - @Rule public final transient ReuseSparkContextRule noContextResue = ReuseSparkContextRule.no(); - @Rule public final TestPipeline pipeline = TestPipeline.create(); private Duration batchDuration() { diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SparkSessionRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SparkSessionRule.java new file mode 100644 index 000000000000..29cbb8941e74 --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SparkSessionRule.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark.structuredstreaming; + +import static java.util.stream.Collectors.toMap; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.values.KV; +import org.apache.spark.sql.SparkSession; +import org.junit.rules.ExternalResource; + +public class SparkSessionRule extends ExternalResource implements Serializable { + private transient SparkSession.Builder builder; + private transient @Nullable SparkSession session = null; + + public SparkSessionRule(String sparkMaster, Map sparkConfig) { + builder = SparkSession.builder(); + sparkConfig.forEach(builder::config); + builder.master(sparkMaster).appName("test"); + } + + public SparkSessionRule(KV... sparkConfig) { + this("local", sparkConfig); + } + + public SparkSessionRule(String sparkMaster, KV... sparkConfig) { + this(sparkMaster, Arrays.stream(sparkConfig).collect(toMap(KV::getKey, KV::getValue))); + } + + public SparkSession getSession() { + if (session == null) { + throw new IllegalStateException("SparkSession not available"); + } + return session; + } + + @Override + protected void before() throws Throwable { + session = builder.getOrCreate(); + } + + @Override + protected void after() { + getSession().stop(); + } +} diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpersTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpersTest.java index 54db4fae1c24..3151a5fe956f 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpersTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpersTest.java @@ -21,9 +21,10 @@ import java.util.Arrays; import java.util.List; +import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.SparkSession; +import org.junit.ClassRule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -32,16 +33,15 @@ @RunWith(JUnit4.class) public class EncoderHelpersTest { + @ClassRule public static SparkSessionRule sessionRule = new SparkSessionRule(); + @Test public void beamCoderToSparkEncoderTest() { - SparkSession sparkSession = - SparkSession.builder() - .appName("beamCoderToSparkEncoderTest") - .master("local[4]") - .getOrCreate(); List data = Arrays.asList(1, 2, 3); Dataset dataset = - sparkSession.createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of())); + sessionRule + .getSession() + .createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of())); assertEquals(data, dataset.collectAsList()); } } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java index 
2a40b45136a9..8fde97456227 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java @@ -29,7 +29,6 @@ import java.io.IOException; import java.io.Serializable; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.beam.runners.spark.ReuseSparkContextRule; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.StreamingTest; import org.apache.beam.runners.spark.io.CreateStream; @@ -86,7 +85,6 @@ public class CreateStreamTest implements Serializable { @Rule public final transient TestPipeline p = TestPipeline.create(); - @Rule public final transient ReuseSparkContextRule noContextResue = ReuseSparkContextRule.no(); @Rule public final transient ExpectedException thrown = ExpectedException.none(); @Test diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index 6c107c474b66..e7f45d99e513 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -30,7 +30,6 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; -import org.apache.beam.runners.spark.ReuseSparkContextRule; import org.apache.beam.runners.spark.SparkPipelineResult; import org.apache.beam.runners.spark.TestSparkPipelineOptions; import org.apache.beam.runners.spark.TestSparkRunner; @@ -84,7 +83,6 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TemporaryFolder; @@ -112,8 +110,6 @@ public class ResumeFromCheckpointStreamingTest implements Serializable { private transient TemporaryFolder temporaryFolder; - @Rule public final transient ReuseSparkContextRule noContextReuse = ReuseSparkContextRule.no(); - @BeforeClass public static void setup() throws IOException { EMBEDDED_ZOOKEEPER.startup(); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SparkCoGroupByKeyStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SparkCoGroupByKeyStreamingTest.java index 407b07ac0d6d..fc4e427e2f30 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SparkCoGroupByKeyStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SparkCoGroupByKeyStreamingTest.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; -import org.apache.beam.runners.spark.ReuseSparkContextRule; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.StreamingTest; import org.apache.beam.runners.spark.io.CreateStream; @@ -53,8 +52,6 @@ public class SparkCoGroupByKeyStreamingTest { private static final TupleTag INPUT1_TAG = new TupleTag<>("input1"); private static final TupleTag INPUT2_TAG = new TupleTag<>("input2"); - @Rule public final transient ReuseSparkContextRule noContextResue = ReuseSparkContextRule.no(); - 
@Rule public final TestPipeline pipeline = TestPipeline.create(); private Duration batchDuration() { diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java index 5ede41aaedaf..1160bfdb8d11 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java @@ -22,17 +22,15 @@ import static org.hamcrest.core.IsEqual.equalTo; import java.util.List; -import org.apache.beam.runners.spark.ReuseSparkContextRule; -import org.apache.beam.runners.spark.SparkPipelineOptions; +import org.apache.beam.runners.spark.SparkContextOptions; +import org.apache.beam.runners.spark.SparkContextRule; import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.runners.spark.io.CreateStream; import org.apache.beam.runners.spark.translation.Dataset; import org.apache.beam.runners.spark.translation.EvaluationContext; -import org.apache.beam.runners.spark.translation.SparkContextFactory; import org.apache.beam.runners.spark.translation.TransformTranslator; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.DoFn; @@ -46,7 +44,7 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.joda.time.Duration; import org.junit.Before; -import org.junit.Rule; +import org.junit.ClassRule; import org.junit.Test; /** @@ -58,10 +56,7 @@ }) public class TrackStreamingSourcesTest { - @Rule public ReuseSparkContextRule reuseContext = ReuseSparkContextRule.yes(); - - private static final transient SparkPipelineOptions options = - PipelineOptionsFactory.create().as(SparkPipelineOptions.class); + @ClassRule public static SparkContextRule sparkContext = new SparkContextRule(); @Before public void before() { @@ -70,8 +65,8 @@ public void before() { @Test public void testTrackSingle() { - options.setRunner(SparkRunner.class); - JavaSparkContext jsc = SparkContextFactory.getSparkContext(options); + SparkContextOptions options = sparkContext.createPipelineOptions(SparkRunner.class); + JavaSparkContext jsc = sparkContext.getSparkContext(); JavaStreamingContext jssc = new JavaStreamingContext( jsc, new org.apache.spark.streaming.Duration(options.getBatchIntervalMillis())); @@ -90,8 +85,8 @@ public void testTrackSingle() { @Test public void testTrackFlattened() { - options.setRunner(SparkRunner.class); - JavaSparkContext jsc = SparkContextFactory.getSparkContext(options); + SparkContextOptions options = sparkContext.createPipelineOptions(SparkRunner.class); + JavaSparkContext jsc = sparkContext.getSparkContext(); JavaStreamingContext jssc = new JavaStreamingContext( jsc, new org.apache.spark.streaming.Duration(options.getBatchIntervalMillis())); @@ -135,7 +130,7 @@ private StreamingSourceTracker( Pipeline pipeline, Class transformClassToAssert, Integer... 
expected) { - this.ctxt = new EvaluationContext(jssc.sparkContext(), pipeline, options, jssc); + this.ctxt = new EvaluationContext(jssc.sparkContext(), pipeline, pipeline.getOptions(), jssc); this.evaluator = new SparkRunner.Evaluator( new StreamingTransformTranslator.Translator(new TransformTranslator.Translator()),

From e2a309555d6090711ecd36974f1ae6c4995cc1d2 Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Thu, 21 Apr 2022 10:13:47 +0200 Subject: [PATCH 2/5] Set provided SparkContext via SparkContextFactory to avoid losing it during a serde roundtrip in TestPipeline --- runners/spark/spark_runner.gradle | 5 - .../translation/SparkContextFactory.java | 114 +++++++++++------- .../apache/beam/runners/spark/CacheTest.java | 4 +- .../spark/ProvidedSparkContextTest.java | 4 +- .../spark/SparkContextOptionsRule.java | 8 +- .../beam/runners/spark/SparkContextRule.java | 17 ++- .../runners/spark/SparkPipelineStateTest.java | 3 +- .../spark/SparkRunnerDebuggerTest.java | 7 +- .../metrics/sink/SparkMetricsSinkTest.java | 11 +- .../SparkRunnerKryoRegistratorTest.java | 3 +- .../streaming/TrackStreamingSourcesTest.java | 8 +- 11 files changed, 107 insertions(+), 77 deletions(-)
diff --git a/runners/spark/spark_runner.gradle b/runners/spark/spark_runner.gradle index e07819b954dd..f5555854c24f 100644 --- a/runners/spark/spark_runner.gradle +++ b/runners/spark/spark_runner.gradle
@@ -94,7 +94,6 @@ if (copySourceBase) { } test { - systemProperty "beam.spark.test.reuseSparkContext", "true" systemProperty "spark.sql.shuffle.partitions", "4" systemProperty "spark.ui.enabled", "false" systemProperty "spark.ui.showConsoleProgress", "false"
@@ -286,10 +285,6 @@ def validatesRunnerStreaming = tasks.register("validatesRunnerStreaming", Test) useJUnit { includeCategories 'org.apache.beam.runners.spark.StreamingTest' } - filter { - // BEAM-11653 MetricsSinkTest is failing with Spark 3 - excludeTestsMatching 'org.apache.beam.runners.spark.aggregators.metrics.sink.SparkMetricsSinkTest' - } } tasks.register("validatesStructuredStreamingRunnerBatch", Test) {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java index 173772d3f618..2c9eb6d59dff 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -17,6 +17,9 @@ */ package org.apache.beam.runners.spark.translation; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; + +import javax.annotation.Nullable; import org.apache.beam.runners.spark.SparkContextOptions; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.coders.SparkRunnerKryoRegistrator;
@@ -25,10 +28,6 @@ import org.apache.spark.api.java.JavaSparkContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** The Spark context factory. */ -@SuppressWarnings({ - "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) -}) public final class SparkContextFactory { private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
@@ -43,66 +42,97 @@ public final class SparkContextFactory { public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext"; // Spark allows only one context for JVM so this can be static.
- private static JavaSparkContext sparkContext; - private static String sparkMaster; - private static boolean usesProvidedSparkContext; + private static @Nullable JavaSparkContext sparkContext; + private static @Nullable String sparkMaster; + + private static boolean hasProvidedSparkContext; private SparkContextFactory() {} + /** + * Set an externally managed {@link JavaSparkContext} that will be used if {@link + * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}. + * + *

A Spark context can also be provided using {@link + * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped + * during serialization, potentially leading to confusing behavior. This is particularly the case + * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}. + */ + public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) { + sparkContext = checkNotNull(providedSparkContext); + hasProvidedSparkContext = true; + sparkMaster = null; + } + + public static synchronized void clearProvidedSparkContext() { + hasProvidedSparkContext = false; + sparkContext = null; + } + public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) { SparkContextOptions contextOptions = options.as(SparkContextOptions.class); - usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext(); - // reuse should be ignored if the context is provided. - if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !usesProvidedSparkContext) { - - // if the context is null or stopped for some reason, re-create it. - if (sparkContext == null || sparkContext.sc().isStopped()) { - sparkContext = createSparkContext(contextOptions); + if (contextOptions.getUsesProvidedSparkContext()) { + JavaSparkContext jsc = contextOptions.getProvidedSparkContext(); + if (jsc != null) { + setProvidedSparkContext(jsc); + } else if (hasProvidedSparkContext) { + jsc = sparkContext; + } + if (jsc == null) { + throw new IllegalStateException( + "No Spark context was provided. Use SparkContextFactory.setProvidedSparkContext to do so."); + } else if (jsc.sc().isStopped()) { + LOG.error("The provided Spark context " + jsc + " was already stopped."); + throw new IllegalStateException("The provided Spark context was already stopped"); + } + LOG.info("Using a provided Spark Context"); + return jsc; + } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) { + // This is highly discouraged as it leaks SparkContexts without any control. + // If the context is null or stopped for some reason, re-create it. + @Nullable JavaSparkContext jsc = sparkContext; + if (jsc == null || jsc.sc().isStopped()) { + sparkContext = jsc = createSparkContext(contextOptions); sparkMaster = options.getSparkMaster(); + hasProvidedSparkContext = false; + } else if (hasProvidedSparkContext) { + throw new IllegalStateException("Usage of provided Spark context is disabled."); } else if (!options.getSparkMaster().equals(sparkMaster)) { - throw new IllegalArgumentException( + throw new IllegalStateException( String.format( "Cannot reuse spark context " + "with different spark master URL.
Existing: %s, requested: %s.", sparkMaster, options.getSparkMaster())); } - return sparkContext; + return jsc; } else { - return createSparkContext(contextOptions); + JavaSparkContext jsc = createSparkContext(contextOptions); + clearProvidedSparkContext(); // any provided context can't be valid anymore + return jsc; } } public static synchronized void stopSparkContext(JavaSparkContext context) { - if (!Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !usesProvidedSparkContext) { + if (!Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !hasProvidedSparkContext) { context.stop(); } } - private static JavaSparkContext createSparkContext(SparkContextOptions contextOptions) { - if (usesProvidedSparkContext) { - LOG.info("Using a provided Spark Context"); - JavaSparkContext jsc = contextOptions.getProvidedSparkContext(); - if (jsc == null || jsc.sc().isStopped()) { - LOG.error("The provided Spark context " + jsc + " was not created or was stopped"); - throw new RuntimeException("The provided Spark context was not created or was stopped"); - } - return jsc; - } else { - LOG.info("Creating a brand new Spark Context."); - SparkConf conf = new SparkConf(); - if (!conf.contains("spark.master")) { - // set master if not set. - conf.setMaster(contextOptions.getSparkMaster()); - } - - if (contextOptions.getFilesToStage() != null && !contextOptions.getFilesToStage().isEmpty()) { - conf.setJars(contextOptions.getFilesToStage().toArray(new String[0])); - } + private static JavaSparkContext createSparkContext(SparkPipelineOptions options) { + LOG.info("Creating a brand new Spark Context."); + SparkConf conf = new SparkConf(); + if (!conf.contains("spark.master")) { + // set master if not set. + conf.setMaster(options.getSparkMaster()); + } - conf.setAppName(contextOptions.getAppName()); - // register immutable collections serializers because the SDK uses them. - conf.set("spark.kryo.registrator", SparkRunnerKryoRegistrator.class.getName()); - return new JavaSparkContext(conf); + if (options.getFilesToStage() != null && !options.getFilesToStage().isEmpty()) { + conf.setJars(options.getFilesToStage().toArray(new String[0])); } + + conf.setAppName(options.getAppName()); + // register immutable collections serializers because the SDK uses them. 
+ conf.set("spark.kryo.registrator", SparkRunnerKryoRegistrator.class.getName()); + return new JavaSparkContext(conf); } } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java index 288c49652598..861e13a4208f 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java @@ -54,7 +54,7 @@ public class CacheTest { */ @Test public void cacheCandidatesUpdaterTest() { - SparkPipelineOptions options = contextRule.createPipelineOptions(TestSparkRunner.class); + SparkPipelineOptions options = contextRule.createPipelineOptions(); Pipeline pipeline = Pipeline.create(options); PCollection pCollection = pipeline.apply(Create.of("foo", "bar")); @@ -91,7 +91,7 @@ public void processElement(ProcessContext processContext) { @Test public void shouldCacheTest() { - SparkPipelineOptions options = contextRule.createPipelineOptions(TestSparkRunner.class); + SparkPipelineOptions options = contextRule.createPipelineOptions(); options.setCacheDisabled(true); Pipeline pipeline = Pipeline.create(options); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java index e81a48a0b058..262eaeceaa58 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java @@ -54,9 +54,7 @@ public class ProvidedSparkContextTest { private static final String PROVIDED_CONTEXT_EXCEPTION = "The provided Spark context was not created or was stopped"; - @ClassRule - public static SparkContextOptionsRule contextRule = - new SparkContextOptionsRule(TestSparkRunner.class); + @ClassRule public static SparkContextOptionsRule contextRule = new SparkContextOptionsRule(); /** Provide a context and call pipeline run. */ @Test diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextOptionsRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextOptionsRule.java index 515abd37a32e..2f424cd7ca40 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextOptionsRule.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextOptionsRule.java @@ -18,24 +18,20 @@ package org.apache.beam.runners.spark; import javax.annotation.Nullable; -import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.values.KV; public class SparkContextOptionsRule extends SparkContextRule { - private Class> runner; private @Nullable SparkContextOptions contextOptions = null; - public SparkContextOptionsRule( - Class> runner, KV... sparkConfig) { + public SparkContextOptionsRule(KV... 
sparkConfig) { super(sparkConfig); - this.runner = runner; } @Override protected void before() throws Throwable { super.before(); - contextOptions = createPipelineOptions(runner); + contextOptions = createPipelineOptions(); } public SparkContextOptions getOptions() { diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextRule.java index f93a7d56cbc9..d3501dc30b24 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextRule.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextRule.java @@ -23,9 +23,9 @@ import java.util.Arrays; import java.util.Map; import javax.annotation.Nullable; -import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.runners.spark.translation.SparkContextFactory; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.KV; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; @@ -56,27 +56,26 @@ public JavaSparkContext getSparkContext() { return sparkContext; } - public SparkContextOptions createPipelineOptions( - Class> runner, String... options) { - PipelineOptions opts = PipelineOptionsFactory.fromArgs(options).create(); - opts.as(SparkPipelineOptions.class).setRunner(runner); - return configurePipelineOptions(opts); + public SparkContextOptions createPipelineOptions() { + return configure(TestPipeline.testingPipelineOptions()); } - public SparkContextOptions configurePipelineOptions(PipelineOptions opts) { + public SparkContextOptions configure(PipelineOptions opts) { SparkContextOptions ctxOpts = opts.as(SparkContextOptions.class); ctxOpts.setUsesProvidedSparkContext(true); - ctxOpts.setProvidedSparkContext(sparkContext); + ctxOpts.setProvidedSparkContext(getSparkContext()); return ctxOpts; } @Override protected void before() throws Throwable { sparkContext = new JavaSparkContext(sparkConf); + SparkContextFactory.setProvidedSparkContext(sparkContext); } @Override protected void after() { + SparkContextFactory.clearProvidedSparkContext(); getSparkContext().stop(); } } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java index 28223ca9b644..47f7dd2c41ae 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java @@ -61,7 +61,8 @@ public String apply(final String input) { private Pipeline createPipeline( boolean isStreaming, @Nullable SimpleFunction mapFun) { - SparkContextOptions options = contextRule.createPipelineOptions(SparkRunner.class); + SparkContextOptions options = contextRule.createPipelineOptions(); + options.setRunner(SparkRunner.class); options.setStreaming(isStreaming); Pipeline pipeline = Pipeline.create(options); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java index 4908b89216eb..91ef5a426401 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java @@ -27,6 +27,7 @@ import 
org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Distinct; @@ -61,7 +62,8 @@ public class SparkRunnerDebuggerTest { @Test public void debugBatchPipeline() { - PipelineOptions options = contextRule.createPipelineOptions(SparkRunnerDebugger.class); + PipelineOptions options = contextRule.configure(PipelineOptionsFactory.create()); + options.setRunner(SparkRunnerDebugger.class); Pipeline pipeline = Pipeline.create(options); PCollection lines = @@ -105,7 +107,8 @@ public void debugBatchPipeline() { @Test public void debugStreamingPipeline() { - PipelineOptions options = contextRule.createPipelineOptions(SparkRunnerDebugger.class); + PipelineOptions options = contextRule.configure(PipelineOptionsFactory.create()); + options.setRunner(SparkRunnerDebugger.class); options.as(TestSparkPipelineOptions.class).setForceStreaming(true); Pipeline pipeline = Pipeline.create(options); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java index b1a5e7c0f5c9..f21168336d02 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; +import org.apache.beam.runners.spark.SparkContextRule; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.StreamingTest; import org.apache.beam.runners.spark.examples.WordCount; @@ -38,6 +39,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; import org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -48,8 +50,13 @@ * streaming modes. 
*/ public class SparkMetricsSinkTest { + @ClassRule public static SparkContextRule contextRule = new SparkContextRule(); + @Rule public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule(); - @Rule public final TestPipeline pipeline = TestPipeline.create(); + + @Rule + public final TestPipeline pipeline = + TestPipeline.fromOptions(contextRule.createPipelineOptions()); private static final ImmutableList WORDS = ImmutableList.of("hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"); @@ -66,7 +73,7 @@ public void testInBatchMode() throws Exception { .apply(new WordCount.CountWords()) .apply(MapElements.via(new WordCount.FormatAsTextFn())); PAssert.that(output).containsInAnyOrder(EXPECTED_COUNTS); - pipeline.run(); + pipeline.run().waitUntilFinish(); assertThat(InMemoryMetrics.valueOf("emptyLines"), is(1d)); } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java index 6ab47de16a4b..841862caf840 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java @@ -23,7 +23,6 @@ import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Registration; import org.apache.beam.runners.spark.SparkContextRule; -import org.apache.beam.runners.spark.TestSparkRunner; import org.apache.beam.runners.spark.io.MicrobatchSource; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.Create; @@ -109,7 +108,7 @@ public void registerClasses(Kryo kryo) { } private static void runSimplePipelineWithSparkContextOptions(SparkContextRule context) { - Pipeline p = Pipeline.create(context.createPipelineOptions(TestSparkRunner.class)); + Pipeline p = Pipeline.create(context.createPipelineOptions()); p.apply(Create.of("a")); // some operation to trigger pipeline construction p.run().waitUntilFinish(); } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java index 1160bfdb8d11..79bc8a0a71a2 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java @@ -22,8 +22,8 @@ import static org.hamcrest.core.IsEqual.equalTo; import java.util.List; -import org.apache.beam.runners.spark.SparkContextOptions; import org.apache.beam.runners.spark.SparkContextRule; +import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.runners.spark.io.CreateStream; import org.apache.beam.runners.spark.translation.Dataset; @@ -65,7 +65,8 @@ public void before() { @Test public void testTrackSingle() { - SparkContextOptions options = sparkContext.createPipelineOptions(SparkRunner.class); + SparkPipelineOptions options = sparkContext.createPipelineOptions(); + options.setRunner(SparkRunner.class); JavaSparkContext jsc = sparkContext.getSparkContext(); JavaStreamingContext jssc = new JavaStreamingContext( @@ -85,7 +86,8 @@ public void testTrackSingle() { @Test public void testTrackFlattened() { - SparkContextOptions options = 
sparkContext.createPipelineOptions(SparkRunner.class); + SparkPipelineOptions options = sparkContext.createPipelineOptions(); + options.setRunner(SparkRunner.class); JavaSparkContext jsc = sparkContext.getSparkContext(); JavaStreamingContext jssc = new JavaStreamingContext( From ae0a52429d29d8b7be4bc826e3c48fd661fc3a51 Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Wed, 11 May 2022 11:05:18 +0200 Subject: [PATCH 3/5] PR feedback --- .../runners/spark/SparkContextOptions.java | 6 ++++++ .../translation/SparkContextFactory.java | 15 +++++++++------ .../spark/ProvidedSparkContextTest.java | 11 ++++++----- .../beam/runners/spark/SparkContextRule.java | 11 ++++++++++- .../runners/spark/SparkPipelineStateTest.java | 19 ++++++++++--------- .../structuredstreaming/SparkSessionRule.java | 11 ++++++++++- 6 files changed, 51 insertions(+), 22 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java index 13ae67878eb2..39caee7e6ba7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java @@ -37,6 +37,12 @@ * which link to Spark dependencies, won't be scanned by {@link PipelineOptions} reflective * instantiation. Note that {@link SparkContextOptions} is not registered with {@link * SparkRunnerRegistrar}. + * + *

Note: It's recommended to use {@link + * org.apache.beam.runners.spark.translation.SparkContextFactory#setProvidedSparkContext(JavaSparkContext)} + * instead of {@link SparkContextOptions#setProvidedSparkContext(JavaSparkContext)} for testing. + * When using @{@link org.apache.beam.sdk.testing.TestPipeline} any provided {@link + * JavaSparkContext} via {@link SparkContextOptions} is dropped. */ public interface SparkContextOptions extends SparkPipelineOptions { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java index 2c9eb6d59dff..116e82c8724a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java @@ -43,7 +43,9 @@ public final class SparkContextFactory { // Spark allows only one context for JVM so this can be static. private static @Nullable JavaSparkContext sparkContext; - private static @Nullable String sparkMaster; + + // Remember spark master if TEST_REUSE_SPARK_CONTEXT is enabled. + private static @Nullable String reusableSparkMaster; private static boolean hasProvidedSparkContext; @@ -61,7 +63,7 @@ private SparkContextFactory() {} public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) { sparkContext = checkNotNull(providedSparkContext); hasProvidedSparkContext = true; - sparkMaster = null; + reusableSparkMaster = null; } public static synchronized void clearProvidedSparkContext() { @@ -93,16 +95,17 @@ public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions @Nullable JavaSparkContext jsc = sparkContext; if (jsc == null || jsc.sc().isStopped()) { sparkContext = jsc = createSparkContext(contextOptions); - sparkMaster = options.getSparkMaster(); + reusableSparkMaster = options.getSparkMaster(); hasProvidedSparkContext = false; } else if (hasProvidedSparkContext) { - throw new IllegalStateException("Usage of provided Spark context is disabled."); - } else if (!options.getSparkMaster().equals(sparkMaster)) { + throw new IllegalStateException( + "Usage of provided Spark context is disabled in SparkPipelineOptions."); + } else if (!options.getSparkMaster().equals(reusableSparkMaster)) { throw new IllegalStateException( String.format( "Cannot reuse spark context " + "with different spark master URL. Existing: %s, requested: %s.", - sparkMaster, options.getSparkMaster())); + reusableSparkMaster, options.getSparkMaster())); } return jsc; } else { diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java index 262eaeceaa58..0ef6bb0e078c 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java @@ -54,7 +54,8 @@ public class ProvidedSparkContextTest { private static final String PROVIDED_CONTEXT_EXCEPTION = "The provided Spark context was not created or was stopped"; - @ClassRule public static SparkContextOptionsRule contextRule = new SparkContextOptionsRule(); + @ClassRule + public static SparkContextOptionsRule contextOptionsRule = new SparkContextOptionsRule(); /** Provide a context and call pipeline run. 
*/ @Test @@ -64,14 +65,14 @@ public void testAWithProvidedContext() throws Exception { result.waitUntilFinish(); TestPipeline.verifyPAssertsSucceeded(p, result); // A provided context must not be stopped after execution - assertFalse(contextRule.getSparkContext().sc().isStopped()); + assertFalse(contextOptionsRule.getSparkContext().sc().isStopped()); } /** A SparkRunner with a stopped provided Spark context cannot run pipelines. */ @Test public void testBWithStoppedProvidedContext() { // Stop the provided Spark context - contextRule.getSparkContext().sc().stop(); + contextOptionsRule.getSparkContext().sc().stop(); assertThrows( PROVIDED_CONTEXT_EXCEPTION, RuntimeException.class, @@ -81,7 +82,7 @@ public void testBWithStoppedProvidedContext() { /** Provide a context and call pipeline run. */ @Test public void testCWithNullContext() { - contextRule.getOptions().setProvidedSparkContext(null); + contextOptionsRule.getOptions().setProvidedSparkContext(null); assertThrows( PROVIDED_CONTEXT_EXCEPTION, RuntimeException.class, @@ -89,7 +90,7 @@ public void testCWithNullContext() { } private Pipeline createPipeline() { - Pipeline p = Pipeline.create(contextRule.getOptions()); + Pipeline p = Pipeline.create(contextOptionsRule.getOptions()); PCollection inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of())); PCollection output = inputWords diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextRule.java index d3501dc30b24..caa7d8f6814b 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextRule.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextRule.java @@ -30,6 +30,8 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.junit.rules.ExternalResource; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; public class SparkContextRule extends ExternalResource implements Serializable { private transient SparkConf sparkConf; @@ -38,7 +40,7 @@ public class SparkContextRule extends ExternalResource implements Serializable { public SparkContextRule(String sparkMaster, Map sparkConfig) { sparkConf = new SparkConf(); sparkConfig.forEach(sparkConf::set); - sparkConf.setMaster(sparkMaster).setAppName(sparkMaster); + sparkConf.setMaster(sparkMaster); } public SparkContextRule(KV... 
sparkConfig) { @@ -67,6 +69,12 @@ public SparkContextOptions configure(PipelineOptions opts) { return ctxOpts; } + @Override + public Statement apply(Statement base, Description description) { + sparkConf.setAppName(description.getDisplayName()); + return super.apply(base, description); + } + @Override protected void before() throws Throwable { sparkContext = new JavaSparkContext(sparkConf); @@ -77,5 +85,6 @@ protected void before() throws Throwable { protected void after() { SparkContextFactory.clearProvidedSparkContext(); getSparkContext().stop(); + sparkContext = null; } } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java index 47f7dd2c41ae..61e111234331 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java @@ -44,8 +44,8 @@ public class SparkPipelineStateTest implements Serializable { @ClassRule public static SparkContextRule contextRule = new SparkContextRule(); - private static class MyCustomException extends RuntimeException { - MyCustomException(final String message) { + private static class CustomException extends RuntimeException { + CustomException(final String message) { super(message); } } @@ -53,7 +53,7 @@ private static class MyCustomException extends RuntimeException { private static class FailAlways extends SimpleFunction { @Override public String apply(final String input) { - throw new MyCustomException(FAILED_THE_BATCH_INTENTIONALLY); + throw new CustomException(FAILED_THE_BATCH_INTENTIONALLY); } } @@ -84,17 +84,18 @@ private void testFailedPipeline(boolean isStreaming) throws Exception { PipelineExecutionException e = assertThrows(PipelineExecutionException.class, () -> result.waitUntilFinish()); - assertThat(e.getCause(), instanceOf(MyCustomException.class)); + assertThat(e.getCause(), instanceOf(CustomException.class)); assertThat(e.getCause().getMessage(), is(FAILED_THE_BATCH_INTENTIONALLY)); assertThat(result.getState(), is(PipelineResult.State.FAILED)); result.cancel(); } - private void testTimeoutPipeline(boolean isStreaming) throws Exception { + private void testWaitUntilFinishedTimeout(boolean isStreaming) throws Exception { Pipeline pipeline = createPipeline(isStreaming, null); SparkPipelineResult result = (SparkPipelineResult) pipeline.run(); result.waitUntilFinish(millis(1)); + // Wait timed out, pipeline is still running assertThat(result.getState(), is(PipelineResult.State.RUNNING)); result.cancel(); } @@ -144,12 +145,12 @@ public void testBatchPipelineFailedState() throws Exception { } @Test - public void testStreamingPipelineTimeoutState() throws Exception { - testTimeoutPipeline(true); + public void testStreamingPipelineWaitTimeout() throws Exception { + testWaitUntilFinishedTimeout(true); } @Test - public void testBatchPipelineTimeoutState() throws Exception { - testTimeoutPipeline(false); + public void testBatchPipelineWaitTimeout() throws Exception { + testWaitUntilFinishedTimeout(false); } } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SparkSessionRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SparkSessionRule.java index 29cbb8941e74..f68df83ac07d 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SparkSessionRule.java +++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SparkSessionRule.java @@ -26,6 +26,8 @@ import org.apache.beam.sdk.values.KV; import org.apache.spark.sql.SparkSession; import org.junit.rules.ExternalResource; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; public class SparkSessionRule extends ExternalResource implements Serializable { private transient SparkSession.Builder builder; @@ -34,7 +36,7 @@ public class SparkSessionRule extends ExternalResource implements Serializable { public SparkSessionRule(String sparkMaster, Map sparkConfig) { builder = SparkSession.builder(); sparkConfig.forEach(builder::config); - builder.master(sparkMaster).appName("test"); + builder.master(sparkMaster); } public SparkSessionRule(KV... sparkConfig) { @@ -52,6 +54,12 @@ public SparkSession getSession() { return session; } + @Override + public Statement apply(Statement base, Description description) { + builder.appName(description.getDisplayName()); + return super.apply(base, description); + } + @Override protected void before() throws Throwable { session = builder.getOrCreate(); @@ -60,5 +68,6 @@ protected void before() throws Throwable { @Override protected void after() { getSession().stop(); + session = null; } } From b042e166028181ba71d23570641d50b135b1b67a Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Wed, 11 May 2022 11:50:49 +0200 Subject: [PATCH 4/5] Add more javadocs --- .../runners/spark/translation/SparkContextFactory.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java index 116e82c8724a..932ede5f9550 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java @@ -36,7 +36,8 @@ public final class SparkContextFactory { * context will be reused for beam pipelines. This property should only be enabled for tests. * * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle - * control to not leak your SparkContext. + * control to not leak your SparkContext without any way to close it. Attempting to create any + * new SparkContext later will fail. */ @Deprecated public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext"; @@ -47,6 +48,7 @@ public final class SparkContextFactory { // Remember spark master if TEST_REUSE_SPARK_CONTEXT is enabled. private static @Nullable String reusableSparkMaster; + // SparkContext is provided by the user instead of simply reused using TEST_REUSE_SPARK_CONTEXT private static boolean hasProvidedSparkContext; private SparkContextFactory() {} @@ -90,7 +92,8 @@ public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions LOG.info("Using a provided Spark Context"); return jsc; } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) { - // This is highly discouraged as it leaks SparkContexts without any control. + // This is highly discouraged as it leaks the SparkContext without any way to close it. + // Attempting to create any new SparkContext later will fail. // If the context is null or stopped for some reason, re-create it. 
@Nullable JavaSparkContext jsc = sparkContext; if (jsc == null || jsc.sc().isStopped()) { From ca71ea790237a89b9e34bb0964fe45391ccb826a Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Thu, 12 May 2022 12:53:36 +0200 Subject: [PATCH 5/5] Addressing final review comments --- .../translation/SparkContextFactory.java | 10 +++- .../SparkRunnerKryoRegistratorTest.java | 59 +++++++------------ 2 files changed, 29 insertions(+), 40 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java index 932ede5f9550..9f9465ccde8f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java @@ -35,9 +35,13 @@ public final class SparkContextFactory { * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark * context will be reused for beam pipelines. This property should only be enabled for tests. * - * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle - * control to not leak your SparkContext without any way to close it. Attempting to create any - * new SparkContext later will fail. + * @deprecated This will leak your SparkContext, any attempt to create a new SparkContext later + * will fail. Please use {@link #setProvidedSparkContext(JavaSparkContext)} / {@link + * #clearProvidedSparkContext()} instead to properly control the lifecycle of your context. + * Alternatively you may also provide a SparkContext using {@link + * SparkContextOptions#setUsesProvidedSparkContext(boolean)} together with {@link + * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)} and close that one + * appropriately. Tests of this module should use {@code SparkContextRule}. 
*/ @Deprecated public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext"; diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java index 841862caf840..fcc7fee27063 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java @@ -17,12 +17,13 @@ */ package org.apache.beam.runners.spark.coders; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Registration; import org.apache.beam.runners.spark.SparkContextRule; +import org.apache.beam.runners.spark.coders.SparkRunnerKryoRegistratorTest.Others.TestKryoRegistrator; import org.apache.beam.runners.spark.io.MicrobatchSource; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.Create; @@ -48,61 +49,45 @@ public static class WithKryoSerializer { public static SparkContextRule contextRule = new SparkContextRule( KV.of("spark.serializer", "org.apache.spark.serializer.KryoSerializer"), - KV.of("spark.kryo.registrator", WrapperKryoRegistrator.class.getName())); + KV.of("spark.kryo.registrator", TestKryoRegistrator.class.getName())); @Test public void testKryoRegistration() { + TestKryoRegistrator.wasInitiated = false; runSimplePipelineWithSparkContextOptions(contextRule); - assertTrue( - "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is not set", - WrapperKryoRegistrator.wasInitiated); - } - - /** - * A {@link SparkRunnerKryoRegistrator} that registers an internal class to validate - * KryoSerialization resolution. Use only for test purposes. Needs to be public for - * serialization. - */ - public static class WrapperKryoRegistrator extends SparkRunnerKryoRegistrator { - - static boolean wasInitiated = false; - - public WrapperKryoRegistrator() { - wasInitiated = true; - } - - @Override - public void registerClasses(Kryo kryo) { - super.registerClasses(kryo); - Registration registration = kryo.getRegistration(MicrobatchSource.class); - com.esotericsoftware.kryo.Serializer kryoSerializer = registration.getSerializer(); - assertTrue(kryoSerializer instanceof StatelessJavaSerializer); - } + assertTrue(TestKryoRegistrator.wasInitiated); } } public static class WithoutKryoSerializer { @ClassRule public static SparkContextRule contextRule = - new SparkContextRule( - KV.of("spark.kryo.registrator", KryoRegistratorIsNotCalled.class.getName())); + new SparkContextRule(KV.of("spark.kryo.registrator", TestKryoRegistrator.class.getName())); @Test public void testDefaultSerializerNotCallingKryo() { + TestKryoRegistrator.wasInitiated = false; runSimplePipelineWithSparkContextOptions(contextRule); + assertFalse(TestKryoRegistrator.wasInitiated); } + } + + // Hide TestKryoRegistrator from the Enclosed JUnit runner + interface Others { + class TestKryoRegistrator extends SparkRunnerKryoRegistrator { - /** - * A {@link SparkRunnerKryoRegistrator} that fails if called. Use only for test purposes. Needs - * to be public for serialization. 
- */ - public static class KryoRegistratorIsNotCalled extends SparkRunnerKryoRegistrator { + static boolean wasInitiated = false; + + public TestKryoRegistrator() { + wasInitiated = true; + } @Override public void registerClasses(Kryo kryo) { - fail( - "Default spark.serializer is JavaSerializer" - + " so spark.kryo.registrator shouldn't be called"); + super.registerClasses(kryo); + // verify serializer for MicrobatchSource + Registration registration = kryo.getRegistration(MicrobatchSource.class); + assertTrue(registration.getSerializer() instanceof StatelessJavaSerializer); } } }
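For illustration, a minimal test wired the way this patch series intends might look as follows. This is a sketch, not part of the patches: the test class name and the trivial Create transform are hypothetical, while SparkContextRule, SparkContextOptions and createPipelineOptions() are the test utilities introduced above.

package org.apache.beam.runners.spark;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.junit.ClassRule;
import org.junit.Test;

public class ProvidedContextUsageTest {

  // Starts a local JavaSparkContext before the test class, registers it via
  // SparkContextFactory.setProvidedSparkContext(...), and clears and stops it afterwards,
  // so no forkEvery 1 is needed to isolate contexts between test classes.
  @ClassRule public static SparkContextRule contextRule = new SparkContextRule();

  @Test
  public void runsOnTheProvidedContext() {
    // createPipelineOptions() returns SparkContextOptions with usesProvidedSparkContext=true
    // and the rule's context already set.
    SparkContextOptions options = contextRule.createPipelineOptions();
    options.setRunner(SparkRunner.class);

    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply(Create.of("a", "b", "c")); // trivial transform, just to build a DAG
    pipeline.run().waitUntilFinish();
    // The runner must not stop a provided context; the rule owns its lifecycle.
  }
}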