60 changes: 49 additions & 11 deletions runners/spark/spark_runner.gradle
@@ -51,6 +51,7 @@ configurations {
def sparkTestProperties(overrides = [:]) {
def defaults = ["--runner": "TestSparkRunner"]
[
"log4j.configuration" : "log4j-test.properties",
"spark.sql.shuffle.partitions": "4",
"spark.ui.enabled" : "false",
"spark.ui.showConsoleProgress": "false",
@@ -106,7 +107,6 @@ if (copySourceBase) {

test {
systemProperties sparkTestProperties()
systemProperty "log4j.configuration", "log4j-test.properties"
// Change log level to debug:
// systemProperty "org.slf4j.simpleLogger.defaultLogLevel", "debug"
// Change log level to debug only for the package and nested packages:
@@ -207,6 +207,11 @@ configurations.all {
exclude group: "com.codahale.metrics", module: "metrics-core"
}

configurations.validatesRunner {
// Exclude to make sure log4j binding is used
exclude group: "org.slf4j", module: "slf4j-simple"
}

hadoopVersions.each { kv ->
configurations."hadoopVersion$kv.key" {
resolutionStrategy {
@@ -260,21 +265,54 @@ def validatesRunnerStreaming = tasks.register("validatesRunnerStreaming", Test)
group = "Verification"
// Disable gradle cache
outputs.upToDateWhen { false }
def pipelineOptions = JsonOutput.toJson([
"--runner=TestSparkRunner",
"--forceStreaming=true",
"--enableSparkMetricSinks=true",
])
systemProperty "beamTestPipelineOptions", pipelineOptions
systemProperties sparkTestProperties(["--enableSparkMetricSinks": "true"])

classpath = configurations.validatesRunner
testClassesDirs += files(project.sourceSets.test.output.classesDirs)
testClassesDirs += files(
project(":sdks:java:core").sourceSets.test.output.classesDirs,
Member:
This change makes sense. I don't understand how it worked before. Was it not actually running the VR tests?

Member (Author):
Yes, in fact VR tests were never run/supported for Spark in streaming mode. I just stumbled on this accidentally when I started looking into bugs related to onWindowExpiration :(
Instead there were some custom tests in the module that try to mimic the VR tests, but they only cover a very small part (and also run as unit tests).

project(":runners:core-java").sourceSets.test.output.classesDirs,
)

// Only one SparkContext may be running in a JVM (SPARK-2243)
forkEvery 1
maxParallelForks 4
useJUnit {
includeCategories 'org.apache.beam.runners.spark.StreamingTest'
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'

filter {
// UNBOUNDED View.CreatePCollectionView not supported
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$BundleInvariantsTests.testWatermarkUpdateMidBundle'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testWindowedSideInputNotPresent'
}

// TestStream using processing time is not supported in Spark
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'

// Exceeds Java heap space
excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'

// Should be run only in a properly configured SDK harness environment
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'

// State and Timers
excludeCategories 'org.apache.beam.sdk.testing.UsesStatefulParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
excludeCategories 'org.apache.beam.sdk.testing.UsesLoopingTimer'
excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'

// Metrics
excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesSystemMetrics'
// SDF
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
// Portability
excludeCategories 'org.apache.beam.sdk.testing.UsesJavaExpansionService'
excludeCategories 'org.apache.beam.sdk.testing.UsesPythonExpansionService'
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
// Ordering
excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderedDelivery'
excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderInBundle'
}
}
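
For context on what the category filters above select (and, as noted in the review thread, on the VR coverage Spark streaming was missing): a ValidatesRunner test is an ordinary JUnit test in the Beam SDK tagged with category marker interfaces. A minimal hypothetical sketch, with the class and test names invented for illustration:

import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

public class ExampleValidatesRunnerTest {

  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  // Picked up by the task above via includeCategories ValidatesRunner; it would be
  // skipped if it also carried an excluded category such as UsesStatefulParDo.
  @Test
  @Category(ValidatesRunner.class)
  public void testSumPerKey() {
    PCollection<KV<String, Integer>> sums =
        pipeline
            .apply(Create.of(KV.of("k", 1), KV.of("k", 2)))
            .apply(Sum.integersPerKey());

    PAssert.that(sums).containsInAnyOrder(KV.of("k", 3));
    pipeline.run().waitUntilFinish();
  }
}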

@@ -153,7 +153,7 @@ public SparkPipelineResult run(final Pipeline pipeline) {
MetricsEnvironment.setMetricsSupported(true);

// visit the pipeline to determine the translation mode
detectTranslationMode(pipeline);
detectTranslationMode(pipeline, pipelineOptions);

// Default to using the primitive versions of Read.Bounded and Read.Unbounded.
// TODO(https://github.com/apache/beam/issues/20530): Use SDF read as default when we address
@@ -273,12 +273,12 @@ public static void initAccumulators(SparkPipelineOptions opts, JavaSparkContext
}

/** Visit the pipeline to determine the translation mode (batch/streaming). */
private void detectTranslationMode(Pipeline pipeline) {
static void detectTranslationMode(Pipeline pipeline, SparkPipelineOptions pipelineOptions) {
TranslationModeDetector detector = new TranslationModeDetector();
pipeline.traverseTopologically(detector);
if (detector.getTranslationMode().equals(TranslationMode.STREAMING)) {
// set streaming mode if it's a streaming pipeline
this.pipelineOptions.setStreaming(true);
pipelineOptions.setStreaming(true);
}
}
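
Making detectTranslationMode static and taking the options as a parameter lets callers resolve the mode without going through a runner instance. A minimal hypothetical sketch of exercising it directly, assuming a test in the same package (the method is package-private) and that GenerateSequence without an upper bound yields an unbounded PCollection:

// Hypothetical sketch; imports from org.apache.beam.sdk.io.GenerateSequence and
// org.junit.Assert are assumed.
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(GenerateSequence.from(0)); // no .to(n), so the PCollection is unbounded

SparkRunner.detectTranslationMode(pipeline, options);
Assert.assertTrue(options.isStreaming());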

@@ -82,7 +82,9 @@ public SparkPipelineResult run(Pipeline pipeline) {
TestSparkPipelineOptions testSparkOptions =
PipelineOptionsValidator.validate(TestSparkPipelineOptions.class, options);

boolean isForceStreaming = testSparkOptions.isForceStreaming();
SparkRunner.detectTranslationMode(pipeline, testSparkOptions);
boolean isStreaming = testSparkOptions.isStreaming();

SparkPipelineResult result = null;

// clear state of Aggregators, Metrics and Watermarks if exists.
@@ -93,7 +95,7 @@ public SparkPipelineResult run(Pipeline pipeline) {
LOG.info("About to run test pipeline {}", options.getJobName());

// if the pipeline was executed in streaming mode, validate aggregators.
if (isForceStreaming) {
if (isStreaming) {
try {
result = delegate.run(pipeline);
awaitWatermarksOrTimeout(testSparkOptions, result);
@@ -59,6 +59,7 @@
import org.apache.beam.runners.spark.util.SparkCompat;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.DoFn;
@@ -170,6 +171,27 @@ public String toNativeString() {
};
}

private static <T> TransformEvaluator<TestStream<T>> createFromTestStream() {
return new TransformEvaluator<TestStream<T>>() {

@Override
public void evaluate(TestStream<T> transform, EvaluationContext context) {
TestDStream<T> dStream = new TestDStream<>(transform, context.getStreamingContext().ssc());
JavaInputDStream<WindowedValue<T>> javaDStream =
new JavaInputDStream<>(dStream, JavaSparkContext$.MODULE$.fakeClassTag());

UnboundedDataset<T> dataset =
new UnboundedDataset<>(javaDStream, Collections.singletonList(dStream.id()));
context.putDataset(transform, dataset);
}

@Override
public String toNativeString() {
return "streamingContext.testStream(...)";
}
};
}

private static <T> TransformEvaluator<CreateStream<T>> createFromQueue() {
return new TransformEvaluator<CreateStream<T>>() {
@Override
@@ -540,10 +562,12 @@ public String toNativeString() {
EVALUATORS.put(PTransformTranslation.COMBINE_GROUPED_VALUES_TRANSFORM_URN, combineGrouped());
EVALUATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, parDo());
EVALUATORS.put(ConsoleIO.Write.Unbound.TRANSFORM_URN, print());
EVALUATORS.put(CreateStream.TRANSFORM_URN, createFromQueue());
EVALUATORS.put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, window());
EVALUATORS.put(PTransformTranslation.FLATTEN_TRANSFORM_URN, flattenPColl());
EVALUATORS.put(PTransformTranslation.RESHUFFLE_URN, reshuffle());
// For testing only
EVALUATORS.put(CreateStream.TRANSFORM_URN, createFromQueue());
EVALUATORS.put(PTransformTranslation.TEST_STREAM_TRANSFORM_URN, createFromTestStream());
}

private static @Nullable TransformEvaluator<?> getTranslator(PTransform<?, ?> transform) {
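
Registering PTransformTranslation.TEST_STREAM_TRANSFORM_URN above is what allows streaming VR tests to run, since many of them drive pipelines with TestStream. A minimal hypothetical sketch of such a pipeline (element and watermark events only; processing-time advances remain unsupported on Spark, hence the UsesTestStreamWithProcessingTime exclusion earlier):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class TestStreamOnSparkExample {
  public static void main(String[] args) {
    // Assumes TestSparkRunner is configured via pipeline options.
    Pipeline pipeline = Pipeline.create();

    TestStream<String> events =
        TestStream.create(StringUtf8Coder.of())
            .addElements(TimestampedValue.of("a", new Instant(0)))
            .advanceWatermarkTo(new Instant(1000))
            .addElements(TimestampedValue.of("b", new Instant(1500)))
            .advanceWatermarkToInfinity();

    PCollection<Long> counts =
        pipeline
            .apply(events)
            .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
            .apply(Count.globally().withoutDefaults());

    // One element per one-second window.
    PAssert.that(counts).containsInAnyOrder(1L, 1L);
    pipeline.run().waitUntilFinish();
  }
}
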
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.spark.translation.streaming;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.TestStream.ElementEvent;
import org.apache.beam.sdk.testing.TestStream.ProcessingTimeEvent;
import org.apache.beam.sdk.testing.TestStream.WatermarkEvent;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.dstream.InputDStream;
import org.joda.time.Instant;
import scala.Option;
import scala.reflect.ClassTag;

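/**
 * An {@link InputDStream} that replays the events of a {@link TestStream} as Spark
 * micro-batches: element events become RDDs of timestamped values in the global window,
 * watermark events are published to the {@link GlobalWatermarkHolder} and followed by an
 * extra empty batch so the watermark can propagate, and processing-time events are
 * rejected as unsupported. Each batch waits until the previous one has been fully
 * watermarked before the next event is emitted.
 */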
public class TestDStream<T> extends InputDStream<WindowedValue<T>> {
private final Coder<WindowedValue<T>> coder;

@SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED") // not intended for use after serialization
private final transient @Nullable List<TestStream.Event<T>> events;

private int currentEventIndex = 0;

private boolean insertEmptyBatch = false;

private long lastValidTimeMs = 0;

private Instant lastWatermark = Instant.EPOCH;

public TestDStream(TestStream<T> test, StreamingContext ssc) {
super(ssc, classTag());
this.coder = WindowedValue.getFullCoder(test.getValueCoder(), GlobalWindow.Coder.INSTANCE);
this.events = test.getEvents();
}

@Override
public Option<RDD<WindowedValue<T>>> compute(Time validTime) {
TestStream.Event<T> event = insertEmptyBatch ? null : nextEvent();

if (event == null) {
insertEmptyBatch = false;
waitForLastBatch(validTime);
return Option.apply(emptyRDD());

} else if (event instanceof ElementEvent) {
waitForLastBatch(validTime);
return Option.apply(buildRdd((ElementEvent<T>) event));

} else if (event instanceof WatermarkEvent) {
waitForLastBatch(validTime);
addWatermark(validTime, (WatermarkEvent<T>) event);
// insert an additional empty batch so watermark can propagate
insertEmptyBatch = true;
return Option.apply(emptyRDD());

} else if (event instanceof ProcessingTimeEvent) {
throw new UnsupportedOperationException(
"Advancing Processing time is not supported by the Spark Runner.");
} else {
throw new IllegalStateException("Unknown event type " + event);
}
}

private void waitForLastBatch(Time validTime) {
while (GlobalWatermarkHolder.getLastWatermarkedBatchTime() < lastValidTimeMs) {
Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
}
lastValidTimeMs = validTime.milliseconds();
}

private @Nullable TestStream.Event<T> nextEvent() {
List<TestStream.Event<T>> events = Preconditions.checkStateNotNull(this.events);
return events.size() > currentEventIndex ? events.get(currentEventIndex++) : null;
}

private void addWatermark(Time time, WatermarkEvent<T> event) {
SparkWatermarks watermarks =
new SparkWatermarks(lastWatermark, event.getWatermark(), new Instant(time.milliseconds()));
lastWatermark = event.getWatermark();
GlobalWatermarkHolder.add(id(), watermarks);
}

@Override
public void start() {}

@Override
public void stop() {}

private RDD<WindowedValue<T>> emptyRDD() {
return ssc().sparkContext().emptyRDD(classTag());
}

private RDD<WindowedValue<T>> buildRdd(ElementEvent<T> event) {
List<byte[]> binaryData = new ArrayList<>();
for (TimestampedValue<T> elem : event.getElements()) {
WindowedValue<T> wv =
WindowedValue.timestampedValueInGlobalWindow(elem.getValue(), elem.getTimestamp());
binaryData.add(CoderHelpers.toByteArray(wv, coder));
}

return new JavaSparkContext(ssc().sparkContext())
.parallelize(binaryData)
.map(CoderHelpers.fromByteFunction(coder))
.rdd();
}

private static <T> ClassTag<WindowedValue<T>> classTag() {
return JavaSparkContext$.MODULE$.fakeClassTag();
}
}
@@ -109,7 +109,7 @@ public void debugBatchPipeline() {
public void debugStreamingPipeline() {
PipelineOptions options = contextRule.configure(PipelineOptionsFactory.create());
options.setRunner(SparkRunnerDebugger.class);
options.as(TestSparkPipelineOptions.class).setForceStreaming(true);
options.as(TestSparkPipelineOptions.class).setStreaming(true);
Pipeline pipeline = Pipeline.create(options);

KafkaIO.Read<String, String> read =
@@ -86,7 +86,7 @@ public void testInBatchMode() throws Exception {
@Category(StreamingTest.class)
@Test
public void testInStreamingMode() throws Exception {
pipeline.getOptions().as(TestSparkPipelineOptions.class).setForceStreaming(true);
pipeline.getOptions().as(TestSparkPipelineOptions.class).setStreaming(true);
assertThat(InMemoryMetrics.valueOf("emptyLines"), is(nullValue()));

Instant instant = new Instant(0);
@@ -69,7 +69,7 @@ public void init() {
@Category(StreamingTest.class)
@Test
public void testInStreamingMode() throws Exception {
pipeline.getOptions().as(TestSparkPipelineOptions.class).setForceStreaming(true);
pipeline.getOptions().as(TestSparkPipelineOptions.class).setStreaming(true);

Instant instant = new Instant(0);
CreateStream<Integer> source =
@@ -529,7 +529,7 @@ public void process(ProcessContext context) {

private static PipelineOptions streamingOptions() {
PipelineOptions options = TestPipeline.testingPipelineOptions();
options.as(TestSparkPipelineOptions.class).setForceStreaming(true);
options.as(TestSparkPipelineOptions.class).setStreaming(true);
return options;
}
}
@@ -279,7 +279,7 @@ private SparkPipelineResult run(Optional<Instant> stopWatermarkOption, int expec
options.setExpectedAssertions(expectedAssertions);
options.setRunner(TestSparkRunner.class);
options.setEnableSparkMetricSinks(false);
options.setForceStreaming(true);
options.setStreaming(true);
options.setCheckpointDir(temporaryFolder.getRoot().getPath());
// timeout is per execution so it can be injected by the caller.
if (stopWatermarkOption.isPresent()) {
@@ -170,7 +170,7 @@ public void testInStreamingMode() throws Exception {

private static PipelineOptions streamingOptions() {
PipelineOptions options = TestPipeline.testingPipelineOptions();
options.as(TestSparkPipelineOptions.class).setForceStreaming(true);
options.as(TestSparkPipelineOptions.class).setStreaming(true);
return options;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void testUnboundedSourceMetrics() {

private static PipelineOptions streamingOptions() {
PipelineOptions options = TestPipeline.testingPipelineOptions();
options.as(TestSparkPipelineOptions.class).setForceStreaming(true);
options.as(TestSparkPipelineOptions.class).setStreaming(true);
return options;
}
}
Loading