diff --git a/runners/spark/spark_runner.gradle b/runners/spark/spark_runner.gradle
index 420c13d86146..14a433162fb6 100644
--- a/runners/spark/spark_runner.gradle
+++ b/runners/spark/spark_runner.gradle
@@ -51,6 +51,7 @@ configurations {
 def sparkTestProperties(overrides = [:]) {
   def defaults = ["--runner": "TestSparkRunner"]
   [
+    "log4j.configuration"         : "log4j-test.properties",
     "spark.sql.shuffle.partitions": "4",
     "spark.ui.enabled"            : "false",
     "spark.ui.showConsoleProgress": "false",
@@ -106,7 +107,6 @@ if (copySourceBase) {
 
 test {
   systemProperties sparkTestProperties()
-  systemProperty "log4j.configuration", "log4j-test.properties"
   // Change log level to debug:
   // systemProperty "org.slf4j.simpleLogger.defaultLogLevel", "debug"
   // Change log level to debug only for the package and nested packages:
@@ -207,6 +207,11 @@ configurations.all {
   exclude group: "com.codahale.metrics", module: "metrics-core"
 }
 
+configurations.validatesRunner {
+  // Exclude to make sure log4j binding is used
+  exclude group: "org.slf4j", module: "slf4j-simple"
+}
+
 hadoopVersions.each { kv ->
   configurations."hadoopVersion$kv.key" {
     resolutionStrategy {
@@ -260,21 +265,54 @@ def validatesRunnerStreaming = tasks.register("validatesRunnerStreaming", Test)
   group = "Verification"
   // Disable gradle cache
   outputs.upToDateWhen { false }
-  def pipelineOptions = JsonOutput.toJson([
-    "--runner=TestSparkRunner",
-    "--forceStreaming=true",
-    "--enableSparkMetricSinks=true",
-  ])
-  systemProperty "beamTestPipelineOptions", pipelineOptions
+  systemProperties sparkTestProperties(["--enableSparkMetricSinks": "true"])
 
   classpath = configurations.validatesRunner
-  testClassesDirs += files(project.sourceSets.test.output.classesDirs)
+  testClassesDirs += files(
+    project(":sdks:java:core").sourceSets.test.output.classesDirs,
+    project(":runners:core-java").sourceSets.test.output.classesDirs,
+  )
 
-  // Only one SparkContext may be running in a JVM (SPARK-2243)
-  forkEvery 1
   maxParallelForks 4
   useJUnit {
-    includeCategories 'org.apache.beam.runners.spark.StreamingTest'
+    includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+
+    filter {
+      // UNBOUNDED View.CreatePCollectionView not supported
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$BundleInvariantsTests.testWatermarkUpdateMidBundle'
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testWindowedSideInputNotPresent'
+    }
+
+    // TestStream using processing time is not supported in Spark
+    excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
+
+    // Exceeds Java heap space
+    excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
+
+    // Should be run only in a properly configured SDK harness environment
+    excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
+
+    // State and Timers
+    excludeCategories 'org.apache.beam.sdk.testing.UsesStatefulParDo'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesLoopingTimer'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
+
+    // Metrics
+    excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesSystemMetrics'
+    // SDF
+    excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
+    // Portability
+    excludeCategories 'org.apache.beam.sdk.testing.UsesJavaExpansionService'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesPythonExpansionService'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
+    // Ordering
+    excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderedDelivery'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderInBundle'
   }
 }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 2fe92bb29f1b..f3558f633829 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -153,7 +153,7 @@ public SparkPipelineResult run(final Pipeline pipeline) {
     MetricsEnvironment.setMetricsSupported(true);
 
     // visit the pipeline to determine the translation mode
-    detectTranslationMode(pipeline);
+    detectTranslationMode(pipeline, pipelineOptions);
 
     // Default to using the primitive versions of Read.Bounded and Read.Unbounded.
     // TODO(https://github.com/apache/beam/issues/20530): Use SDF read as default when we address
@@ -273,12 +273,12 @@ public static void initAccumulators(SparkPipelineOptions opts, JavaSparkContext
   }
 
   /** Visit the pipeline to determine the translation mode (batch/streaming). */
-  private void detectTranslationMode(Pipeline pipeline) {
+  static void detectTranslationMode(Pipeline pipeline, SparkPipelineOptions pipelineOptions) {
     TranslationModeDetector detector = new TranslationModeDetector();
     pipeline.traverseTopologically(detector);
     if (detector.getTranslationMode().equals(TranslationMode.STREAMING)) {
      // set streaming mode if it's a streaming pipeline
-      this.pipelineOptions.setStreaming(true);
+      pipelineOptions.setStreaming(true);
    }
  }
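Note (illustration only, not part of the patch): making detectTranslationMode static and passing the options explicitly lets TestSparkRunner reuse it to derive streaming mode from the pipeline graph instead of the removed --forceStreaming flag. The detection relies on Beam's pipeline visitor API; below is a minimal sketch of that pattern, where SimpleTranslationModeDetector is a hypothetical stand-in for SparkRunner's TranslationModeDetector:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.runners.TransformHierarchy;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PValue;

    // Flips to streaming as soon as any transform produces an UNBOUNDED PCollection.
    class SimpleTranslationModeDetector extends Pipeline.PipelineVisitor.Defaults {
      private boolean streaming = false;

      @Override
      public void visitValue(PValue value, TransformHierarchy.Node producer) {
        if (value instanceof PCollection
            && ((PCollection<?>) value).isBounded() == PCollection.IsBounded.UNBOUNDED) {
          streaming = true;
        }
      }

      boolean isStreaming() {
        return streaming;
      }
    }

    // Usage, mirroring the new static helper:
    //   SimpleTranslationModeDetector detector = new SimpleTranslationModeDetector();
    //   pipeline.traverseTopologically(detector);
    //   if (detector.isStreaming()) { options.setStreaming(true); }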
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 646781574c51..bd9baeacfc49 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -82,7 +82,9 @@ public SparkPipelineResult run(Pipeline pipeline) {
     TestSparkPipelineOptions testSparkOptions =
         PipelineOptionsValidator.validate(TestSparkPipelineOptions.class, options);
 
-    boolean isForceStreaming = testSparkOptions.isForceStreaming();
+    SparkRunner.detectTranslationMode(pipeline, testSparkOptions);
+    boolean isStreaming = testSparkOptions.isStreaming();
+
     SparkPipelineResult result = null;
 
     // clear state of Aggregators, Metrics and Watermarks if exists.
@@ -93,7 +95,7 @@ public SparkPipelineResult run(Pipeline pipeline) {
     LOG.info("About to run test pipeline {}", options.getJobName());
 
     // if the pipeline was executed in streaming mode, validate aggregators.
-    if (isForceStreaming) {
+    if (isStreaming) {
       try {
         result = delegate.run(pipeline);
         awaitWatermarksOrTimeout(testSparkOptions, result);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 077b646697f8..8ad6da2a7a75 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -59,6 +59,7 @@
 import org.apache.beam.runners.spark.util.SparkCompat;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -170,6 +171,27 @@ public String toNativeString() {
     };
   }
 
+  private static <T> TransformEvaluator<TestStream<T>> createFromTestStream() {
+    return new TransformEvaluator<TestStream<T>>() {
+
+      @Override
+      public void evaluate(TestStream<T> transform, EvaluationContext context) {
+        TestDStream<T> dStream = new TestDStream<>(transform, context.getStreamingContext().ssc());
+        JavaInputDStream<WindowedValue<T>> javaDStream =
+            new JavaInputDStream<>(dStream, JavaSparkContext$.MODULE$.fakeClassTag());
+
+        UnboundedDataset<T> dataset =
+            new UnboundedDataset<>(javaDStream, Collections.singletonList(dStream.id()));
+        context.putDataset(transform, dataset);
+      }
+
+      @Override
+      public String toNativeString() {
+        return "streamingContext.testStream(...)";
+      }
+    };
+  }
+
   private static <T> TransformEvaluator<CreateStream<T>> createFromQueue() {
     return new TransformEvaluator<CreateStream<T>>() {
       @Override
@@ -540,10 +562,12 @@ public String toNativeString() {
     EVALUATORS.put(PTransformTranslation.COMBINE_GROUPED_VALUES_TRANSFORM_URN, combineGrouped());
     EVALUATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, parDo());
     EVALUATORS.put(ConsoleIO.Write.Unbound.TRANSFORM_URN, print());
-    EVALUATORS.put(CreateStream.TRANSFORM_URN, createFromQueue());
     EVALUATORS.put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, window());
     EVALUATORS.put(PTransformTranslation.FLATTEN_TRANSFORM_URN, flattenPColl());
     EVALUATORS.put(PTransformTranslation.RESHUFFLE_URN, reshuffle());
+    // For testing only
+    EVALUATORS.put(CreateStream.TRANSFORM_URN, createFromQueue());
+    EVALUATORS.put(PTransformTranslation.TEST_STREAM_TRANSFORM_URN, createFromTestStream());
   }
 
   private static @Nullable TransformEvaluator<?> getTranslator(PTransform<?, ?> transform) {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/TestDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/TestDStream.java
new file mode 100644
index 000000000000..ec4937d439c5
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/TestDStream.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation.streaming;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.TestStream.ElementEvent;
+import org.apache.beam.sdk.testing.TestStream.ProcessingTimeEvent;
+import org.apache.beam.sdk.testing.TestStream.WatermarkEvent;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.JavaSparkContext$;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.streaming.StreamingContext;
+import org.apache.spark.streaming.Time;
+import org.apache.spark.streaming.dstream.InputDStream;
+import org.joda.time.Instant;
+import scala.Option;
+import scala.reflect.ClassTag;
+
+public class TestDStream<T> extends InputDStream<WindowedValue<T>> {
+  private final Coder<WindowedValue<T>> coder;
+
+  @SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED") // not intended for use after serialization
+  private final transient @Nullable List<TestStream.Event<T>> events;
+
+  private int currentEventIndex = 0;
+
+  private boolean insertEmptyBatch = false;
+
+  private long lastValidTimeMs = 0;
+
+  private Instant lastWatermark = Instant.EPOCH;
+
+  public TestDStream(TestStream<T> test, StreamingContext ssc) {
+    super(ssc, classTag());
+    this.coder = WindowedValue.getFullCoder(test.getValueCoder(), GlobalWindow.Coder.INSTANCE);
+    this.events = test.getEvents();
+  }
+
+  @Override
+  public Option<RDD<WindowedValue<T>>> compute(Time validTime) {
+    TestStream.Event<T> event = insertEmptyBatch ? null : nextEvent();
+
+    if (event == null) {
+      insertEmptyBatch = false;
+      waitForLastBatch(validTime);
+      return Option.apply(emptyRDD());
+
+    } else if (event instanceof ElementEvent) {
+      waitForLastBatch(validTime);
+      return Option.apply(buildRdd((ElementEvent<T>) event));
+
+    } else if (event instanceof WatermarkEvent) {
+      waitForLastBatch(validTime);
+      addWatermark(validTime, (WatermarkEvent<T>) event);
+      // insert an additional empty batch so watermark can propagate
+      insertEmptyBatch = true;
+      return Option.apply(emptyRDD());
+
+    } else if (event instanceof ProcessingTimeEvent) {
+      throw new UnsupportedOperationException(
+          "Advancing Processing time is not supported by the Spark Runner.");
+    } else {
+      throw new IllegalStateException("Unknown event type " + event);
+    }
+  }
+
+  private void waitForLastBatch(Time validTime) {
+    while (GlobalWatermarkHolder.getLastWatermarkedBatchTime() < lastValidTimeMs) {
+      Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+    }
+    lastValidTimeMs = validTime.milliseconds();
+  }
+
+  private @Nullable TestStream.Event<T> nextEvent() {
+    List<TestStream.Event<T>> events = Preconditions.checkStateNotNull(this.events);
+    return events.size() > currentEventIndex ? events.get(currentEventIndex++) : null;
+  }
+
+  private void addWatermark(Time time, WatermarkEvent<T> event) {
+    SparkWatermarks watermarks =
+        new SparkWatermarks(lastWatermark, event.getWatermark(), new Instant(time.milliseconds()));
+    lastWatermark = event.getWatermark();
+    GlobalWatermarkHolder.add(id(), watermarks);
+  }
+
+  @Override
+  public void start() {}
+
+  @Override
+  public void stop() {}
+
+  private RDD<WindowedValue<T>> emptyRDD() {
+    return ssc().sparkContext().emptyRDD(classTag());
+  }
+
+  private RDD<WindowedValue<T>> buildRdd(ElementEvent<T> event) {
+    List<byte[]> binaryData = new ArrayList<>();
+    for (TimestampedValue<T> elem : event.getElements()) {
+      WindowedValue<T> wv =
+          WindowedValue.timestampedValueInGlobalWindow(elem.getValue(), elem.getTimestamp());
+      binaryData.add(CoderHelpers.toByteArray(wv, coder));
+    }
+
+    return new JavaSparkContext(ssc().sparkContext())
+        .parallelize(binaryData)
+        .map(CoderHelpers.fromByteFunction(coder))
+        .rdd();
+  }
+
+  private static <T> ClassTag<WindowedValue<T>> classTag() {
+    return JavaSparkContext$.MODULE$.fakeClassTag();
+  }
+}
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index 91ef5a426401..843d303bda87 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -109,7 +109,7 @@ public void debugBatchPipeline() {
   public void debugStreamingPipeline() {
     PipelineOptions options = contextRule.configure(PipelineOptionsFactory.create());
     options.setRunner(SparkRunnerDebugger.class);
-    options.as(TestSparkPipelineOptions.class).setForceStreaming(true);
+    options.as(TestSparkPipelineOptions.class).setStreaming(true);
 
     Pipeline pipeline = Pipeline.create(options);
 
     KafkaIO.Read<String, String> read =
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java
index edf164b26b8d..c632a9800a8b 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java
@@ -86,7 +86,7 @@ public void testInBatchMode() throws Exception {
   @Category(StreamingTest.class)
   @Test
   public void testInStreamingMode() throws Exception {
-    pipeline.getOptions().as(TestSparkPipelineOptions.class).setForceStreaming(true);
+    pipeline.getOptions().as(TestSparkPipelineOptions.class).setStreaming(true);
     assertThat(InMemoryMetrics.valueOf("emptyLines"), is(nullValue()));
 
     Instant instant = new Instant(0);
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkMetricsPusherTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkMetricsPusherTest.java
index deb7e0cd5dcc..dbd569d89d15 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkMetricsPusherTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkMetricsPusherTest.java
@@ -69,7 +69,7 @@ public void init() {
   @Category(StreamingTest.class)
   @Test
   public void testInStreamingMode() throws Exception {
-    pipeline.getOptions().as(TestSparkPipelineOptions.class).setForceStreaming(true);
+    pipeline.getOptions().as(TestSparkPipelineOptions.class).setStreaming(true);
 
     Instant instant = new Instant(0);
     CreateStream<Integer> source =
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
index d4721cd3d823..a3d7724e4363 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
@@ -529,7 +529,7 @@ public void process(ProcessContext context) {
 
   private static PipelineOptions streamingOptions() {
     PipelineOptions options = TestPipeline.testingPipelineOptions();
-    options.as(TestSparkPipelineOptions.class).setForceStreaming(true);
+    options.as(TestSparkPipelineOptions.class).setStreaming(true);
     return options;
   }
 }
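Note (illustration only, not part of the patch): with TEST_STREAM_TRANSFORM_URN now registered in EVALUATORS, an ordinary TestStream pipeline runs on TestSparkRunner, and detectTranslationMode switches it to streaming automatically. A minimal, self-contained sketch follows; it assumes beamTestPipelineOptions selects TestSparkRunner, and TestStreamOnSparkSketch is a hypothetical class name:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.testing.PAssert;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.testing.TestStream;
    import org.apache.beam.sdk.transforms.Sum;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TimestampedValue;
    import org.joda.time.Instant;

    public class TestStreamOnSparkSketch {
      public static void main(String[] args) {
        // Reads -DbeamTestPipelineOptions, which is expected to name TestSparkRunner.
        PipelineOptions options = TestPipeline.testingPipelineOptions();
        Pipeline p = Pipeline.create(options);

        TestStream<Long> stream =
            TestStream.create(VarLongCoder.of())
                .addElements(
                    TimestampedValue.of(1L, new Instant(0)),
                    TimestampedValue.of(2L, new Instant(0)))
                // Advancing to infinity closes the global window so PAssert can fire.
                .advanceWatermarkToInfinity();

        PCollection<Long> sums = p.apply(stream).apply(Sum.longsGlobally());
        PAssert.that(sums).containsInAnyOrder(3L);

        p.run().waitUntilFinish();
      }
    }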
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index bf197b3eeb7a..a7618aad61b4 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -279,7 +279,7 @@ private SparkPipelineResult run(Optional<Instant> stopWatermarkOption, int expec
     options.setExpectedAssertions(expectedAssertions);
     options.setRunner(TestSparkRunner.class);
     options.setEnableSparkMetricSinks(false);
-    options.setForceStreaming(true);
+    options.setStreaming(true);
     options.setCheckpointDir(temporaryFolder.getRoot().getPath());
     // timeout is per execution so it can be injected by the caller.
     if (stopWatermarkOption.isPresent()) {
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SparkCoGroupByKeyStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SparkCoGroupByKeyStreamingTest.java
index 39203d602645..ac66be043969 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SparkCoGroupByKeyStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SparkCoGroupByKeyStreamingTest.java
@@ -170,7 +170,7 @@ public void testInStreamingMode() throws Exception {
 
   private static PipelineOptions streamingOptions() {
     PipelineOptions options = TestPipeline.testingPipelineOptions();
-    options.as(TestSparkPipelineOptions.class).setForceStreaming(true);
+    options.as(TestSparkPipelineOptions.class).setStreaming(true);
     return options;
   }
 }
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
index ac377cf596b0..493ffd8acfcf 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
@@ -92,7 +92,7 @@ public void testUnboundedSourceMetrics() {
 
   private static PipelineOptions streamingOptions() {
     PipelineOptions options = TestPipeline.testingPipelineOptions();
-    options.as(TestSparkPipelineOptions.class).setForceStreaming(true);
+    options.as(TestSparkPipelineOptions.class).setStreaming(true);
     return options;
   }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index 6a785acce3c3..dc8686ac1727 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -105,6 +105,9 @@ public void testLateDataAccumulating() {
             .advanceWatermarkTo(instant.plus(Duration.standardMinutes(6)))
             // These elements are late but within the allowed lateness
             .addElements(TimestampedValue.of(4L, instant), TimestampedValue.of(5L, instant))
+            .advanceWatermarkTo(instant.plus(Duration.standardMinutes(10)))
+            // FIXME: Without advancing the watermark once more the (lower) input watermark remains
+            // at 6 mins, but data in [0,5 min) won't be considered late until it passes 10 mins.
             .advanceWatermarkTo(instant.plus(Duration.standardMinutes(20)))
             // These elements are droppably late
             .addElements(