From 690b75a00ad62d9707ab75b84d7f4f10726f40c8 Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Wed, 27 Jul 2022 17:00:33 +0200 Subject: [PATCH 1/4] Support VR test including TestStream for Spark runner in streaming mode (#22472). --- runners/spark/spark_runner.gradle | 66 ++++++-- .../beam/runners/spark/TestSparkRunner.java | 121 ++++++++++++++- .../StreamingTransformTranslator.java | 26 +++- .../translation/streaming/TestDStream.java | 143 ++++++++++++++++++ .../spark/SparkRunnerDebuggerTest.java | 2 +- .../metrics/sink/SparkMetricsSinkTest.java | 2 +- .../spark/metrics/SparkMetricsPusherTest.java | 2 +- .../streaming/CreateStreamTest.java | 2 +- .../ResumeFromCheckpointStreamingTest.java | 2 +- .../SparkCoGroupByKeyStreamingTest.java | 2 +- .../streaming/StreamingSourceMetricsTest.java | 2 +- .../beam/sdk/testing/TestStreamTest.java | 3 + 12 files changed, 353 insertions(+), 20 deletions(-) create mode 100644 runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/TestDStream.java diff --git a/runners/spark/spark_runner.gradle b/runners/spark/spark_runner.gradle index 420c13d86146..5b83004cf844 100644 --- a/runners/spark/spark_runner.gradle +++ b/runners/spark/spark_runner.gradle @@ -51,6 +51,7 @@ configurations { def sparkTestProperties(overrides = [:]) { def defaults = ["--runner": "TestSparkRunner"] [ + "log4j.configuration" : "log4j-test.properties", "spark.sql.shuffle.partitions": "4", "spark.ui.enabled" : "false", "spark.ui.showConsoleProgress": "false", @@ -106,7 +107,6 @@ if (copySourceBase) { test { systemProperties sparkTestProperties() - systemProperty "log4j.configuration", "log4j-test.properties" // Change log level to debug: // systemProperty "org.slf4j.simpleLogger.defaultLogLevel", "debug" // Change log level to debug only for the package and nested packages: @@ -207,6 +207,11 @@ configurations.all { exclude group: "com.codahale.metrics", module: "metrics-core" } +configurations.validatesRunner { + // Exclude to make sure log4j binding is used + exclude group: "org.slf4j", module: "slf4j-simple" +} + hadoopVersions.each { kv -> configurations."hadoopVersion$kv.key" { resolutionStrategy { @@ -260,21 +265,60 @@ def validatesRunnerStreaming = tasks.register("validatesRunnerStreaming", Test) group = "Verification" // Disable gradle cache outputs.upToDateWhen { false } - def pipelineOptions = JsonOutput.toJson([ - "--runner=TestSparkRunner", - "--forceStreaming=true", - "--enableSparkMetricSinks=true", - ]) - systemProperty "beamTestPipelineOptions", pipelineOptions + systemProperties sparkTestProperties(["--enableSparkMetricSinks": "true", "--forceStreaming": "true"]) classpath = configurations.validatesRunner - testClassesDirs += files(project.sourceSets.test.output.classesDirs) + testClassesDirs += files( + project(":sdks:java:core").sourceSets.test.output.classesDirs, + project(":runners:core-java").sourceSets.test.output.classesDirs, + ) - // Only one SparkContext may be running in a JVM (SPARK-2243) - forkEvery 1 maxParallelForks 4 useJUnit { - includeCategories 'org.apache.beam.runners.spark.StreamingTest' + includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner' + + filter { + // translation to UNBOUNDED read doesn't work for empty input + excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testEmptyFlattenAsSideInput' + excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenPCollectionsEmpty' + excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenPCollectionsEmptyThenParDo' + } + 
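For context on how these Gradle properties reach the individual tests: sparkTestProperties merges the "--runner": "TestSparkRunner" default with the given overrides, and the task exposes the result as JVM system properties, which, judging by the removed systemProperty lines, includes the beamTestPipelineOptions JSON that used to be built by hand with JsonOutput.toJson. Inside a test JVM, Beam's TestPipeline parses that property again. A minimal sketch of that consumer side, assuming the validatesRunnerStreaming task above launched the JVM; the class name ShowStreamingTestOptions is illustrative only and not part of this patch:

import org.apache.beam.runners.spark.TestSparkPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline;

// Illustrative only: shows what a ValidatesRunner test sees when launched by the
// validatesRunnerStreaming task above.
public class ShowStreamingTestOptions {
  public static void main(String[] args) {
    // Parses the JSON passed via the -DbeamTestPipelineOptions system property.
    PipelineOptions options = TestPipeline.testingPipelineOptions();
    TestSparkPipelineOptions spark = options.as(TestSparkPipelineOptions.class);
    System.out.println("runner         = " + options.getRunner().getSimpleName());
    System.out.println("forceStreaming = " + spark.isForceStreaming());
    System.out.println("streaming      = " + spark.isStreaming());
  }
}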
+ // TestStream using processing time is not supported in Spark + excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime' + // CreatePCollectionView not supported in streaming mode + excludeCategories 'org.apache.beam.sdk.testing.UsesSideInputs' + // BOUNDED, not applicable in streaming mode + excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse' + + // Exceeds Java heap space + excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB' + + // Should be run only in a properly configured SDK harness environment + excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment' + excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging' + + // State and Timers + excludeCategories 'org.apache.beam.sdk.testing.UsesStatefulParDo' + excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo' + excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap' + excludeCategories 'org.apache.beam.sdk.testing.UsesLoopingTimer' + excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration' + excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState' + + // Metrics + excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics' + excludeCategories 'org.apache.beam.sdk.testing.UsesSystemMetrics' + // SDF + excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo' + excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo' + // Portability + excludeCategories 'org.apache.beam.sdk.testing.UsesJavaExpansionService' + excludeCategories 'org.apache.beam.sdk.testing.UsesPythonExpansionService' + excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer' + // Ordering + excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderedDelivery' + excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderInBundle' } } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index 646781574c51..2648ae547c03 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.spark; +import static org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior.DO_NOT_ENTER_TRANSFORM; +import static org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -24,16 +26,33 @@ import java.io.File; import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.beam.runners.core.construction.ReplacementOutputs; +import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource; import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.stateful.SparkTimerInternals; import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.io.Read; import 
org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.runners.PTransformOverride; +import org.apache.beam.sdk.runners.PTransformOverrideFactory; +import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles; import org.apache.commons.io.FileUtils; import org.joda.time.Duration; @@ -83,6 +102,8 @@ public SparkPipelineResult run(Pipeline pipeline) { PipelineOptionsValidator.validate(TestSparkPipelineOptions.class, options); boolean isForceStreaming = testSparkOptions.isForceStreaming(); + boolean isStreaming = testSparkOptions.isStreaming(); + SparkPipelineResult result = null; // clear state of Aggregators, Metrics and Watermarks if exists. @@ -93,8 +114,26 @@ public SparkPipelineResult run(Pipeline pipeline) { LOG.info("About to run test pipeline {}", options.getJobName()); // if the pipeline was executed in streaming mode, validate aggregators. - if (isForceStreaming) { + if (isStreaming || isForceStreaming) { try { + testSparkOptions.setStreaming(true); + + if (isForceStreaming) { + FindBoundedReads boundedReadsVisitor = new FindBoundedReads(); + pipeline.traverseTopologically(boundedReadsVisitor); + + if (!boundedReadsVisitor.boundedReads.isEmpty()) { + // replace BOUNDED read with UNBOUNDED to force streaming + pipeline.replaceAll( + ImmutableList.of( + PTransformOverride.of( + app -> boundedReadsVisitor.contains(app.getTransform()), + new UnboundedReadFromBoundedSourceOverrideFactory<>()))); + // force UNBOUNDED PCollections + pipeline.traverseTopologically(new SetUnbounded()); + } + } + result = delegate.run(pipeline); awaitWatermarksOrTimeout(testSparkOptions, result); result.stop(); @@ -153,4 +192,84 @@ private static void awaitWatermarksOrTimeout( } while ((timeoutMillis -= batchDurationMillis) > 0 && globalWatermark.isBefore(stopPipelineWatermark)); } + + /** + * Override factory to replace {@link Read.Unbounded} with {@link UnboundedReadFromBoundedSource} + * to force streaming mode. + */ + private static class UnboundedReadFromBoundedSourceOverrideFactory + implements PTransformOverrideFactory, Read.Bounded> { + + @Override + public PTransformReplacement> getReplacementTransform( + AppliedPTransform, Read.Bounded> transform) { + return PTransformReplacement.of( + transform.getPipeline().begin(), + new UnboundedReadFromBoundedSource<>(transform.getTransform().getSource())); + } + + @Override + public Map, ReplacementOutput> mapOutputs( + Map, PCollection> outputs, PCollection newOutput) { + return ReplacementOutputs.singleton(outputs, newOutput); + } + } + + private static boolean isBoundedRead(PTransform transform) { + return transform != null && transform instanceof Read.Bounded; + } + + /** Gathers all bounded test inputs of {@link Read.Bounded} except for usages in PAssert. 
*/ + private static class FindBoundedReads extends PipelineVisitor.Defaults { + private final Set> boundedReads = new HashSet<>(); + + @Override + public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { + if (node.getFullName().contains("PAssert$")) { + return DO_NOT_ENTER_TRANSFORM; + } else if (isBoundedRead(node.getTransform())) { + boundedReads.add((Read.Bounded) node.getTransform()); + return DO_NOT_ENTER_TRANSFORM; + } + return ENTER_TRANSFORM; + } + + boolean contains(PTransform transform) { + return boundedReads.contains(transform); + } + } + + /** + * Sets all {@link PCollection}s to {@link IsBounded#UNBOUNDED} excepts for outputs of remaining + * {@link Read.Bounded} (used in PAssert) and descendants. + */ + private static class SetUnbounded extends PipelineVisitor.Defaults { + private final Set> excludes = new HashSet<>(); + + @Override + public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { + if (isBoundedRead(node.getTransform())) { + // Output of Read.Bounded (used in PAssert) has to remain BOUNDED + setIsBounded(node.getOutputs(), IsBounded.BOUNDED); + excludes.addAll(node.getOutputs().values()); + return DO_NOT_ENTER_TRANSFORM; + } else { + visitPrimitiveTransform(node); + return ENTER_TRANSFORM; + } + } + + @Override + public void visitPrimitiveTransform(TransformHierarchy.Node node) { + // Force UNBOUNDED PCollections + setIsBounded(node.getInputs(), IsBounded.UNBOUNDED); + setIsBounded(node.getOutputs(), IsBounded.UNBOUNDED); + } + + private void setIsBounded(Map, PCollection> map, IsBounded isBounded) { + map.values().stream() + .filter(pc -> !excludes.contains(pc)) + .forEach(in -> in.setIsBoundedInternal(isBounded)); + } + } } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index 077b646697f8..8ad6da2a7a75 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -59,6 +59,7 @@ import org.apache.beam.runners.spark.util.SparkCompat; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.DoFn; @@ -170,6 +171,27 @@ public String toNativeString() { }; } + private static TransformEvaluator> createFromTestStream() { + return new TransformEvaluator>() { + + @Override + public void evaluate(TestStream transform, EvaluationContext context) { + TestDStream dStream = new TestDStream<>(transform, context.getStreamingContext().ssc()); + JavaInputDStream> javaDStream = + new JavaInputDStream<>(dStream, JavaSparkContext$.MODULE$.fakeClassTag()); + + UnboundedDataset dataset = + new UnboundedDataset<>(javaDStream, Collections.singletonList(dStream.id())); + context.putDataset(transform, dataset); + } + + @Override + public String toNativeString() { + return "streamingContext.testStream(...)"; + } + }; + } + private static TransformEvaluator> createFromQueue() { return new TransformEvaluator>() { @Override @@ -540,10 +562,12 @@ public String toNativeString() { EVALUATORS.put(PTransformTranslation.COMBINE_GROUPED_VALUES_TRANSFORM_URN, 
combineGrouped()); EVALUATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, parDo()); EVALUATORS.put(ConsoleIO.Write.Unbound.TRANSFORM_URN, print()); - EVALUATORS.put(CreateStream.TRANSFORM_URN, createFromQueue()); EVALUATORS.put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, window()); EVALUATORS.put(PTransformTranslation.FLATTEN_TRANSFORM_URN, flattenPColl()); EVALUATORS.put(PTransformTranslation.RESHUFFLE_URN, reshuffle()); + // For testing only + EVALUATORS.put(CreateStream.TRANSFORM_URN, createFromQueue()); + EVALUATORS.put(PTransformTranslation.TEST_STREAM_TRANSFORM_URN, createFromTestStream()); } private static @Nullable TransformEvaluator getTranslator(PTransform transform) { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/TestDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/TestDStream.java new file mode 100644 index 000000000000..04fb58aa9888 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/TestDStream.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.spark.translation.streaming; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.beam.runners.spark.coders.CoderHelpers; +import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; +import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.testing.TestStream.ElementEvent; +import org.apache.beam.sdk.testing.TestStream.ProcessingTimeEvent; +import org.apache.beam.sdk.testing.TestStream.WatermarkEvent; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.util.Preconditions; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.JavaSparkContext$; +import org.apache.spark.rdd.RDD; +import org.apache.spark.streaming.StreamingContext; +import org.apache.spark.streaming.Time; +import org.apache.spark.streaming.dstream.InputDStream; +import org.joda.time.Instant; +import scala.Option; +import scala.reflect.ClassTag; + +public class TestDStream extends InputDStream> { + private final Coder> coder; + + @SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED") // not intended for use after serialization + private final transient List> events; + + private int event = 0; + + private boolean insertEmptyBatch = false; + + private long lastMillis = 0; + + private Instant lastWatermark = Instant.EPOCH; + + public TestDStream(TestStream test, StreamingContext ssc) { + super(ssc, classTag()); + this.coder = WindowedValue.getFullCoder(test.getValueCoder(), GlobalWindow.Coder.INSTANCE); + this.events = test.getEvents(); + } + + @Override + public Option>> compute(Time validTime) { + Preconditions.checkStateNotNull(events); + + TestStream.Event event = insertEmptyBatch ? null : nextEvent(); + + if (event == null) { + insertEmptyBatch = false; + waitForLastBatch(validTime); + return Option.apply(emptyRDD()); + + } else if (event instanceof ElementEvent) { + waitForLastBatch(validTime); + return Option.apply(buildRdd((ElementEvent) event)); + + } else if (event instanceof WatermarkEvent) { + waitForLastBatch(validTime); + addWatermark(validTime, (WatermarkEvent) event); + // insert an additional empty batch so watermark can propagate + insertEmptyBatch = true; + return Option.apply(emptyRDD()); + + } else if (event instanceof ProcessingTimeEvent) { + throw new UnsupportedOperationException( + "Advancing Processing time is not supported by the Spark Runner."); + } else { + throw new IllegalStateException("Unknown event type " + event); + } + } + + private void waitForLastBatch(Time validTime) { + while (GlobalWatermarkHolder.getLastWatermarkedBatchTime() < lastMillis) { + Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS); + } + lastMillis = validTime.milliseconds(); + } + + private @Nullable TestStream.Event nextEvent() { + return events.size() > event ? 
events.get(event++) : null; + } + + private void addWatermark(Time time, WatermarkEvent event) { + SparkWatermarks watermarks = + new SparkWatermarks(lastWatermark, event.getWatermark(), new Instant(time.milliseconds())); + lastWatermark = event.getWatermark(); + GlobalWatermarkHolder.add(id(), watermarks); + } + + @Override + public void start() {} + + @Override + public void stop() {} + + private RDD> emptyRDD() { + return ssc().sparkContext().emptyRDD(classTag()); + } + + private RDD> buildRdd(ElementEvent event) { + List binaryData = new ArrayList<>(); + for (TimestampedValue elem : event.getElements()) { + WindowedValue wv = + WindowedValue.timestampedValueInGlobalWindow(elem.getValue(), elem.getTimestamp()); + binaryData.add(CoderHelpers.toByteArray(wv, coder)); + } + + return new JavaSparkContext(ssc().sparkContext()) + .parallelize(binaryData) + .map(CoderHelpers.fromByteFunction(coder)) + .rdd(); + } + + private static ClassTag> classTag() { + return JavaSparkContext$.MODULE$.fakeClassTag(); + } +} diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java index 91ef5a426401..843d303bda87 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java @@ -109,7 +109,7 @@ public void debugBatchPipeline() { public void debugStreamingPipeline() { PipelineOptions options = contextRule.configure(PipelineOptionsFactory.create()); options.setRunner(SparkRunnerDebugger.class); - options.as(TestSparkPipelineOptions.class).setForceStreaming(true); + options.as(TestSparkPipelineOptions.class).setStreaming(true); Pipeline pipeline = Pipeline.create(options); KafkaIO.Read read = diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java index edf164b26b8d..c632a9800a8b 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java @@ -86,7 +86,7 @@ public void testInBatchMode() throws Exception { @Category(StreamingTest.class) @Test public void testInStreamingMode() throws Exception { - pipeline.getOptions().as(TestSparkPipelineOptions.class).setForceStreaming(true); + pipeline.getOptions().as(TestSparkPipelineOptions.class).setStreaming(true); assertThat(InMemoryMetrics.valueOf("emptyLines"), is(nullValue())); Instant instant = new Instant(0); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkMetricsPusherTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkMetricsPusherTest.java index deb7e0cd5dcc..dbd569d89d15 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkMetricsPusherTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkMetricsPusherTest.java @@ -69,7 +69,7 @@ public void init() { @Category(StreamingTest.class) @Test public void testInStreamingMode() throws Exception { - pipeline.getOptions().as(TestSparkPipelineOptions.class).setForceStreaming(true); + pipeline.getOptions().as(TestSparkPipelineOptions.class).setStreaming(true); Instant instant = new Instant(0); 
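The TestDStream above is the piece that makes TestStream work on the Spark runner in streaming mode: each ElementEvent becomes one micro-batch, each WatermarkEvent is published through GlobalWatermarkHolder and followed by an empty batch so the watermark can propagate, and ProcessingTimeEvents are rejected (hence the UsesTestStreamWithProcessingTime exclusion in the Gradle task). As a rough usage sketch, not part of this patch, this is the kind of test pipeline it enables; the class name TestStreamOnSparkSketch and the element values are illustrative only:

import org.apache.beam.runners.spark.TestSparkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Instant;

public class TestStreamOnSparkSketch {
  public static void main(String[] args) {
    PipelineOptions options = TestPipeline.testingPipelineOptions();
    // Mirrors the surrounding test changes: plain streaming mode, no forceStreaming.
    options.as(TestSparkPipelineOptions.class).setStreaming(true);
    Pipeline p = Pipeline.create(options);

    TestStream<Integer> stream =
        TestStream.create(VarIntCoder.of())
            .addElements(1, 2, 3)                  // first micro-batch (ElementEvent)
            .advanceWatermarkTo(new Instant(1000)) // WatermarkEvent, then an empty batch
            .addElements(4, 5)                     // second micro-batch
            .advanceWatermarkToInfinity();         // final watermark closes the global window

    PCollection<Integer> doubled =
        p.apply(stream)
            .apply(MapElements.into(TypeDescriptors.integers()).via((Integer x) -> x * 2));

    PAssert.that(doubled).containsInAnyOrder(2, 4, 6, 8, 10);
    p.run().waitUntilFinish();
  }
}

Nothing Spark-specific is needed on the test side beyond streaming mode: the translator maps TEST_STREAM_TRANSFORM_URN to createFromTestStream(), which wraps the events in a TestDStream.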
CreateStream source = diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java index d4721cd3d823..a3d7724e4363 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java @@ -529,7 +529,7 @@ public void process(ProcessContext context) { private static PipelineOptions streamingOptions() { PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.as(TestSparkPipelineOptions.class).setForceStreaming(true); + options.as(TestSparkPipelineOptions.class).setStreaming(true); return options; } } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index bf197b3eeb7a..a7618aad61b4 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -279,7 +279,7 @@ private SparkPipelineResult run(Optional stopWatermarkOption, int expec options.setExpectedAssertions(expectedAssertions); options.setRunner(TestSparkRunner.class); options.setEnableSparkMetricSinks(false); - options.setForceStreaming(true); + options.setStreaming(true); options.setCheckpointDir(temporaryFolder.getRoot().getPath()); // timeout is per execution so it can be injected by the caller. 
if (stopWatermarkOption.isPresent()) { diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SparkCoGroupByKeyStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SparkCoGroupByKeyStreamingTest.java index 39203d602645..ac66be043969 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SparkCoGroupByKeyStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SparkCoGroupByKeyStreamingTest.java @@ -170,7 +170,7 @@ public void testInStreamingMode() throws Exception { private static PipelineOptions streamingOptions() { PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.as(TestSparkPipelineOptions.class).setForceStreaming(true); + options.as(TestSparkPipelineOptions.class).setStreaming(true); return options; } } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java index ac377cf596b0..493ffd8acfcf 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java @@ -92,7 +92,7 @@ public void testUnboundedSourceMetrics() { private static PipelineOptions streamingOptions() { PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.as(TestSparkPipelineOptions.class).setForceStreaming(true); + options.as(TestSparkPipelineOptions.class).setStreaming(true); return options; } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java index 6a785acce3c3..dc8686ac1727 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java @@ -105,6 +105,9 @@ public void testLateDataAccumulating() { .advanceWatermarkTo(instant.plus(Duration.standardMinutes(6))) // These elements are late but within the allowed lateness .addElements(TimestampedValue.of(4L, instant), TimestampedValue.of(5L, instant)) + .advanceWatermarkTo(instant.plus(Duration.standardMinutes(10))) + // FIXME: Without advancing the watermark once more the (lower) input watermark remains + // at 6 mins, but data in [0,5 min) won't be considered late until it passes 10 mins. 
.advanceWatermarkTo(instant.plus(Duration.standardMinutes(20))) // These elements are droppably late .addElements( From 9eaf52e7b9bd2847edfe8a3a64e64c822938a18e Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Tue, 13 Sep 2022 14:42:47 +0200 Subject: [PATCH 2/4] Review feedback --- .../spark/translation/streaming/TestDStream.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/TestDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/TestDStream.java index 04fb58aa9888..ec4937d439c5 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/TestDStream.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/TestDStream.java @@ -49,13 +49,13 @@ public class TestDStream extends InputDStream> { private final Coder> coder; @SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED") // not intended for use after serialization - private final transient List> events; + private final transient @Nullable List> events; - private int event = 0; + private int currentEventIndex = 0; private boolean insertEmptyBatch = false; - private long lastMillis = 0; + private long lastValidTimeMs = 0; private Instant lastWatermark = Instant.EPOCH; @@ -67,8 +67,6 @@ public TestDStream(TestStream test, StreamingContext ssc) { @Override public Option>> compute(Time validTime) { - Preconditions.checkStateNotNull(events); - TestStream.Event event = insertEmptyBatch ? null : nextEvent(); if (event == null) { @@ -96,14 +94,15 @@ public Option>> compute(Time validTime) { } private void waitForLastBatch(Time validTime) { - while (GlobalWatermarkHolder.getLastWatermarkedBatchTime() < lastMillis) { + while (GlobalWatermarkHolder.getLastWatermarkedBatchTime() < lastValidTimeMs) { Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS); } - lastMillis = validTime.milliseconds(); + lastValidTimeMs = validTime.milliseconds(); } private @Nullable TestStream.Event nextEvent() { - return events.size() > event ? events.get(event++) : null; + List> events = Preconditions.checkStateNotNull(this.events); + return events.size() > currentEventIndex ? events.get(currentEventIndex++) : null; } private void addWatermark(Time time, WatermarkEvent event) { From 480560c187603a1ebc148e02bebfcf7ba0c90959 Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Wed, 14 Sep 2022 12:20:34 +0200 Subject: [PATCH 3/4] Detect batch/streaming mode based on Pipeline in TestSparkRunner --- .../java/org/apache/beam/runners/spark/SparkRunner.java | 6 +++--- .../java/org/apache/beam/runners/spark/TestSparkRunner.java | 2 ++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index 2fe92bb29f1b..f3558f633829 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -153,7 +153,7 @@ public SparkPipelineResult run(final Pipeline pipeline) { MetricsEnvironment.setMetricsSupported(true); // visit the pipeline to determine the translation mode - detectTranslationMode(pipeline); + detectTranslationMode(pipeline, pipelineOptions); // Default to using the primitive versions of Read.Bounded and Read.Unbounded. 
// TODO(https://github.com/apache/beam/issues/20530): Use SDF read as default when we address @@ -273,12 +273,12 @@ public static void initAccumulators(SparkPipelineOptions opts, JavaSparkContext } /** Visit the pipeline to determine the translation mode (batch/streaming). */ - private void detectTranslationMode(Pipeline pipeline) { + static void detectTranslationMode(Pipeline pipeline, SparkPipelineOptions pipelineOptions) { TranslationModeDetector detector = new TranslationModeDetector(); pipeline.traverseTopologically(detector); if (detector.getTranslationMode().equals(TranslationMode.STREAMING)) { // set streaming mode if it's a streaming pipeline - this.pipelineOptions.setStreaming(true); + pipelineOptions.setStreaming(true); } } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index 2648ae547c03..9b89c7c51d79 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -101,6 +101,8 @@ public SparkPipelineResult run(Pipeline pipeline) { TestSparkPipelineOptions testSparkOptions = PipelineOptionsValidator.validate(TestSparkPipelineOptions.class, options); + SparkRunner.detectTranslationMode(pipeline, testSparkOptions); + boolean isForceStreaming = testSparkOptions.isForceStreaming(); boolean isStreaming = testSparkOptions.isStreaming(); From b35dbaa92d662a50e59ddebe4b77f1dbb4a60bb7 Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Wed, 14 Sep 2022 13:51:15 +0200 Subject: [PATCH 4/4] Don't force streaming by replacing bounded sources with unbounded reads for Spark VR tests --- runners/spark/spark_runner.gradle | 14 +- .../beam/runners/spark/TestSparkRunner.java | 121 +----------------- 2 files changed, 5 insertions(+), 130 deletions(-) diff --git a/runners/spark/spark_runner.gradle b/runners/spark/spark_runner.gradle index 5b83004cf844..14a433162fb6 100644 --- a/runners/spark/spark_runner.gradle +++ b/runners/spark/spark_runner.gradle @@ -265,7 +265,7 @@ def validatesRunnerStreaming = tasks.register("validatesRunnerStreaming", Test) group = "Verification" // Disable gradle cache outputs.upToDateWhen { false } - systemProperties sparkTestProperties(["--enableSparkMetricSinks": "true", "--forceStreaming": "true"]) + systemProperties sparkTestProperties(["--enableSparkMetricSinks": "true"]) classpath = configurations.validatesRunner testClassesDirs += files( @@ -278,25 +278,19 @@ def validatesRunnerStreaming = tasks.register("validatesRunnerStreaming", Test) includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner' filter { - // translation to UNBOUNDED read doesn't work for empty input - excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testEmptyFlattenAsSideInput' - excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenPCollectionsEmpty' - excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenPCollectionsEmptyThenParDo' + // UNBOUNDED View.CreatePCollectionView not supported + excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$BundleInvariantsTests.testWatermarkUpdateMidBundle' + excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testWindowedSideInputNotPresent' } // TestStream using processing time is not supported in Spark excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime' - // CreatePCollectionView not supported in streaming mode 
- excludeCategories 'org.apache.beam.sdk.testing.UsesSideInputs' - // BOUNDED, not applicable in streaming mode - excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse' // Exceeds Java heap space excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB' // Should be run only in a properly configured SDK harness environment excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment' - excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging' // State and Timers excludeCategories 'org.apache.beam.sdk.testing.UsesStatefulParDo' diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index 9b89c7c51d79..bd9baeacfc49 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -17,8 +17,6 @@ */ package org.apache.beam.runners.spark; -import static org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior.DO_NOT_ENTER_TRANSFORM; -import static org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -26,33 +24,16 @@ import java.io.File; import java.io.IOException; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; import java.util.concurrent.TimeUnit; -import org.apache.beam.runners.core.construction.ReplacementOutputs; -import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource; import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.stateful.SparkTimerInternals; import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineRunner; -import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; -import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.runners.PTransformOverride; -import org.apache.beam.sdk.runners.PTransformOverrideFactory; -import org.apache.beam.sdk.runners.TransformHierarchy; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollection.IsBounded; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles; import org.apache.commons.io.FileUtils; import org.joda.time.Duration; @@ -102,8 +83,6 @@ public SparkPipelineResult run(Pipeline pipeline) { PipelineOptionsValidator.validate(TestSparkPipelineOptions.class, options); SparkRunner.detectTranslationMode(pipeline, testSparkOptions); - - boolean isForceStreaming = testSparkOptions.isForceStreaming(); boolean isStreaming = testSparkOptions.isStreaming(); SparkPipelineResult result = null; @@ -116,26 +95,8 @@ public SparkPipelineResult run(Pipeline pipeline) { LOG.info("About to run test pipeline {}", 
options.getJobName()); // if the pipeline was executed in streaming mode, validate aggregators. - if (isStreaming || isForceStreaming) { + if (isStreaming) { try { - testSparkOptions.setStreaming(true); - - if (isForceStreaming) { - FindBoundedReads boundedReadsVisitor = new FindBoundedReads(); - pipeline.traverseTopologically(boundedReadsVisitor); - - if (!boundedReadsVisitor.boundedReads.isEmpty()) { - // replace BOUNDED read with UNBOUNDED to force streaming - pipeline.replaceAll( - ImmutableList.of( - PTransformOverride.of( - app -> boundedReadsVisitor.contains(app.getTransform()), - new UnboundedReadFromBoundedSourceOverrideFactory<>()))); - // force UNBOUNDED PCollections - pipeline.traverseTopologically(new SetUnbounded()); - } - } - result = delegate.run(pipeline); awaitWatermarksOrTimeout(testSparkOptions, result); result.stop(); @@ -194,84 +155,4 @@ private static void awaitWatermarksOrTimeout( } while ((timeoutMillis -= batchDurationMillis) > 0 && globalWatermark.isBefore(stopPipelineWatermark)); } - - /** - * Override factory to replace {@link Read.Unbounded} with {@link UnboundedReadFromBoundedSource} - * to force streaming mode. - */ - private static class UnboundedReadFromBoundedSourceOverrideFactory - implements PTransformOverrideFactory, Read.Bounded> { - - @Override - public PTransformReplacement> getReplacementTransform( - AppliedPTransform, Read.Bounded> transform) { - return PTransformReplacement.of( - transform.getPipeline().begin(), - new UnboundedReadFromBoundedSource<>(transform.getTransform().getSource())); - } - - @Override - public Map, ReplacementOutput> mapOutputs( - Map, PCollection> outputs, PCollection newOutput) { - return ReplacementOutputs.singleton(outputs, newOutput); - } - } - - private static boolean isBoundedRead(PTransform transform) { - return transform != null && transform instanceof Read.Bounded; - } - - /** Gathers all bounded test inputs of {@link Read.Bounded} except for usages in PAssert. */ - private static class FindBoundedReads extends PipelineVisitor.Defaults { - private final Set> boundedReads = new HashSet<>(); - - @Override - public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { - if (node.getFullName().contains("PAssert$")) { - return DO_NOT_ENTER_TRANSFORM; - } else if (isBoundedRead(node.getTransform())) { - boundedReads.add((Read.Bounded) node.getTransform()); - return DO_NOT_ENTER_TRANSFORM; - } - return ENTER_TRANSFORM; - } - - boolean contains(PTransform transform) { - return boundedReads.contains(transform); - } - } - - /** - * Sets all {@link PCollection}s to {@link IsBounded#UNBOUNDED} excepts for outputs of remaining - * {@link Read.Bounded} (used in PAssert) and descendants. 
- */ - private static class SetUnbounded extends PipelineVisitor.Defaults { - private final Set> excludes = new HashSet<>(); - - @Override - public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { - if (isBoundedRead(node.getTransform())) { - // Output of Read.Bounded (used in PAssert) has to remain BOUNDED - setIsBounded(node.getOutputs(), IsBounded.BOUNDED); - excludes.addAll(node.getOutputs().values()); - return DO_NOT_ENTER_TRANSFORM; - } else { - visitPrimitiveTransform(node); - return ENTER_TRANSFORM; - } - } - - @Override - public void visitPrimitiveTransform(TransformHierarchy.Node node) { - // Force UNBOUNDED PCollections - setIsBounded(node.getInputs(), IsBounded.UNBOUNDED); - setIsBounded(node.getOutputs(), IsBounded.UNBOUNDED); - } - - private void setIsBounded(Map, PCollection> map, IsBounded isBounded) { - map.values().stream() - .filter(pc -> !excludes.contains(pc)) - .forEach(in -> in.setIsBoundedInternal(isBounded)); - } - } }
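Taken together, patches 3 and 4 replace the forced-streaming machinery with plain mode detection: SparkRunner.detectTranslationMode(pipeline, options) walks the pipeline and switches the options to streaming when it finds unbounded input (a TestStream or an unbounded source), and TestSparkRunner simply calls it before delegating. Below is a rough, self-contained sketch of what such a detector does, under the assumption that the real TranslationModeDetector keys off unbounded PCollections; the class name UnboundedOutputDetector is illustrative, the actual implementation lives in SparkRunner:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;

// Illustrative sketch, not the actual SparkRunner code: flips to streaming as soon
// as the pipeline produces an UNBOUNDED PCollection.
public class UnboundedOutputDetector extends Pipeline.PipelineVisitor.Defaults {
  private boolean streaming = false;

  @Override
  public void visitValue(PValue value, TransformHierarchy.Node producer) {
    if (value instanceof PCollection
        && ((PCollection<?>) value).isBounded() == PCollection.IsBounded.UNBOUNDED) {
      streaming = true;
    }
  }

  public boolean isStreaming() {
    return streaming;
  }

  // Usage: pipeline.traverseTopologically(detector);
  //        if (detector.isStreaming()) { options.setStreaming(true); }
}

With detection in place, TestSparkRunner only needs testSparkOptions.isStreaming() to decide whether to call awaitWatermarksOrTimeout after delegating, which is exactly what the final hunk above keeps.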