[BEAM-14334] Remove remaining forkEvery 1 from all Spark tests and stop mixing unit tests with runner validations. #17662
```diff
@@ -48,6 +48,17 @@ configurations {
   examplesJavaIntegrationTest
 }
 
+def sparkTestProperties(overrides = [:]) {
+  def defaults = ["--runner": "TestSparkRunner"]
+  [
+    "spark.sql.shuffle.partitions": "4",
+    "spark.ui.enabled"            : "false",
+    "spark.ui.showConsoleProgress": "false",
+    "beamTestPipelineOptions"     :
+        JsonOutput.toJson((defaults + overrides).collect { k, v -> "$k=$v" })
+  ]
+}
+
 def hadoopVersions = [
   "285" : "2.8.5",
   "292" : "2.9.2",
```
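To see what the new helper actually produces, here is a standalone Groovy sketch (the helper body is copied from the diff; the asserts are illustrative and rely on `JsonOutput`'s compact output, which separates array elements with commas and no spaces):

```groovy
import groovy.json.JsonOutput

// Copied from the diff: merges default pipeline options with caller
// overrides and packages everything as test-JVM system properties.
def sparkTestProperties(overrides = [:]) {
  def defaults = ["--runner": "TestSparkRunner"]
  [
    "spark.sql.shuffle.partitions": "4",
    "spark.ui.enabled"            : "false",
    "spark.ui.showConsoleProgress": "false",
    "beamTestPipelineOptions"     :
        JsonOutput.toJson((defaults + overrides).collect { k, v -> "$k=$v" })
  ]
}

// No overrides: only the default runner ends up in the pipeline options.
assert sparkTestProperties()["beamTestPipelineOptions"] ==
    '["--runner=TestSparkRunner"]'

// An override with a new key is appended after the defaults.
assert sparkTestProperties(["--enableSparkMetricSinks": "false"])["beamTestPipelineOptions"] ==
    '["--runner=TestSparkRunner","--enableSparkMetricSinks=false"]'
```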
```diff
@@ -94,14 +105,7 @@ if (copySourceBase) {
 }
 
 test {
-  systemProperty "spark.sql.shuffle.partitions", "4"
-  systemProperty "spark.ui.enabled", "false"
-  systemProperty "spark.ui.showConsoleProgress", "false"
-  systemProperty "beamTestPipelineOptions", """[
-    "--runner=TestSparkRunner",
-    "--streaming=false",
-    "--enableSparkMetricSinks=true"
-  ]"""
+  systemProperties sparkTestProperties()
   systemProperty "log4j.configuration", "log4j-test.properties"
   // Change log level to debug:
   // systemProperty "org.slf4j.simpleLogger.defaultLogLevel", "debug"
```
```diff
@@ -113,10 +117,6 @@ test {
   }
 
   maxParallelForks 4
-  useJUnit {
-    excludeCategories "org.apache.beam.runners.spark.StreamingTest"
-    excludeCategories "org.apache.beam.runners.spark.UsesCheckpointRecovery"
-  }
 
   // easily re-run all tests (to deal with flaky tests / SparkContext leaks)
   if(project.hasProperty("rerun-tests")) { outputs.upToDateWhen {false} }
```
Contributor: Why was it removed?

Member (Author): Previously these unit tests were excluded during `test`.
```diff
@@ -218,29 +218,17 @@ def validatesRunnerBatch = tasks.register("validatesRunnerBatch", Test) {
   group = "Verification"
   // Disable gradle cache
   outputs.upToDateWhen { false }
-  def pipelineOptions = JsonOutput.toJson([
-    "--runner=TestSparkRunner",
-    "--streaming=false",
-    "--enableSparkMetricSinks=false",
-  ])
-  systemProperty "beamTestPipelineOptions", pipelineOptions
-  systemProperty "beam.spark.test.reuseSparkContext", "true"
-  systemProperty "spark.ui.enabled", "false"
-  systemProperty "spark.ui.showConsoleProgress", "false"
+  systemProperties sparkTestProperties(["--enableSparkMetricSinks": "false"])
 
   classpath = configurations.validatesRunner
   testClassesDirs = files(
     project(":sdks:java:core").sourceSets.test.output.classesDirs,
     project(":runners:core-java").sourceSets.test.output.classesDirs,
   )
-  testClassesDirs += files(project.sourceSets.test.output.classesDirs)
```
Member (Author): All unit tests should be run as part of the `test` task.
```diff
-  // Only one SparkContext may be running in a JVM (SPARK-2243)
-  forkEvery 1
   maxParallelForks 4
   useJUnit {
     includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
-    includeCategories 'org.apache.beam.runners.spark.UsesCheckpointRecovery'
```
Member (Author): Unit test!

Contributor: What do you mean?

Member (Author): As said above, tests of this custom category are normal unit tests and are already run during `test`.
```diff
     // Should be run only in a properly configured SDK harness environment
     excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
     excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
```
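For readers following the `forkEvery` discussion: on Gradle's `Test` task, `forkEvery 1` starts a fresh JVM for every test class, which guarantees at most one `SparkContext` per JVM (SPARK-2243) at the cost of a JVM startup per class. A minimal sketch of the removed pattern, under a hypothetical task name:

```groovy
// Hypothetical task name; illustrates the pattern this PR removes.
tasks.register("isolatedSparkTests", Test) {
  // Fork a new test JVM for every test class, so each class gets
  // its own SparkContext (only one may exist per JVM, SPARK-2243)...
  forkEvery 1
  // ...while still allowing up to 4 such JVMs to run concurrently.
  maxParallelForks 4
}
```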
```diff
@@ -293,15 +281,7 @@ tasks.register("validatesStructuredStreamingRunnerBatch", Test) {
   group = "Verification"
   // Disable gradle cache
   outputs.upToDateWhen { false }
-  def pipelineOptions = JsonOutput.toJson([
-    "--runner=SparkStructuredStreamingRunner",
-    "--testMode=true",
-    "--streaming=false",
-  ])
-  systemProperty "beamTestPipelineOptions", pipelineOptions
-  systemProperty "spark.sql.shuffle.partitions", "4"
-  systemProperty "spark.ui.enabled", "false"
-  systemProperty "spark.ui.showConsoleProgress", "false"
+  systemProperties sparkTestProperties(["--runner": "SparkStructuredStreamingRunner", "--testMode": "true"])
 
   classpath = configurations.validatesRunner
   testClassesDirs = files(
```
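Note how this call overrides the default runner: Groovy's map `+` operator lets right-hand entries win on key collisions, so `defaults + overrides` inside the helper replaces `TestSparkRunner` with the structured-streaming runner. A standalone sketch of that merge:

```groovy
// Right-hand entries replace left-hand ones on key collision.
def defaults  = ["--runner": "TestSparkRunner"]
def overrides = ["--runner": "SparkStructuredStreamingRunner", "--testMode": "true"]

assert (defaults + overrides) == [
    "--runner"  : "SparkStructuredStreamingRunner",
    "--testMode": "true"
]
```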
```diff
@@ -310,8 +290,6 @@ tasks.register("validatesStructuredStreamingRunnerBatch", Test) {
   )
   testClassesDirs += files(project.sourceSets.test.output.classesDirs)
 
-  // Only one SparkContext may be running in a JVM (SPARK-2243)
-  forkEvery 1
   maxParallelForks 4
   // Increase memory heap in order to avoid OOM errors
   jvmArgs '-Xmx7g'
```
```diff
@@ -373,27 +351,18 @@ createJavaExamplesArchetypeValidationTask(type: 'Quickstart', runner: 'Spark')
 
 tasks.register("hadoopVersionsTest") {
   group = "Verification"
-  def taskNames = hadoopVersions.keySet().stream()
-      .map { num -> "hadoopVersion${num}Test" }
-      .collect(Collectors.toList())
-  dependsOn taskNames
+  dependsOn hadoopVersions.collect { k, v -> "hadoopVersion${k}Test" }
 }
```
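The replaced stream pipeline and the new `Map.collect` one-liner yield the same task names; a standalone sketch with a trimmed-down `hadoopVersions` map:

```groovy
import java.util.stream.Collectors

def hadoopVersions = ["285": "2.8.5", "292": "2.9.2"]

// Before: Java streams over the key set.
def viaStreams = hadoopVersions.keySet().stream()
    .map { num -> "hadoopVersion${num}Test" }
    .collect(Collectors.toList())

// After: Groovy's Map.collect, whose two-argument closure
// receives each (key, value) pair.
def viaCollect = hadoopVersions.collect { k, v -> "hadoopVersion${k}Test" }

assert viaStreams == viaCollect
// toString() normalizes the GStrings for a plain-string comparison.
assert viaCollect*.toString() == ["hadoopVersion285Test", "hadoopVersion292Test"]
```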
```diff
 
 tasks.register("examplesIntegrationTest", Test) {
   group = "Verification"
   // Disable gradle cache
   outputs.upToDateWhen { false }
-  def pipelineOptions = JsonOutput.toJson([
-    "--runner=TestSparkRunner",
-    "--enableSparkMetricSinks=true",
-    "--tempLocation=${tempLocation}",
-    "--tempRoot=${tempLocation}",
-    "--project=${gcpProject}",
-  ])
-  systemProperty "beamTestPipelineOptions", pipelineOptions
-  systemProperty "beam.spark.test.reuseSparkContext", "true"
-  systemProperty "spark.ui.enabled", "false"
-  systemProperty "spark.ui.showConsoleProgress", "false"
+  systemProperties sparkTestProperties([
+    "--tempLocation": "${tempLocation}",
+    "--tempRoot"    : "${tempLocation}",
+    "--project"     : "${gcpProject}"
+  ])
 
   include '**/*IT.class'
   maxParallelForks 4
```
```diff
@@ -414,19 +383,10 @@ hadoopVersions.each { kv ->
     group = "Verification"
     description = "Runs Spark tests with Hadoop version $kv.value"
     classpath = configurations."hadoopVersion$kv.key" + sourceSets.test.runtimeClasspath
-    systemProperty "beam.spark.test.reuseSparkContext", "true"
-    systemProperty "spark.sql.shuffle.partitions", "4"
-    systemProperty "spark.ui.enabled", "false"
-    systemProperty "spark.ui.showConsoleProgress", "false"
-    systemProperty "beamTestPipelineOptions", """[
-      "--runner=TestSparkRunner",
-      "--streaming=false",
-      "--enableSparkMetricSinks=true"
-    ]"""
-    // Only one SparkContext may be running in a JVM (SPARK-2243)
-    forkEvery 1
-    maxParallelForks 4
+    systemProperties sparkTestProperties()
 
     include "**/*Test.class"
+    maxParallelForks 4
     useJUnit {
       excludeCategories "org.apache.beam.runners.spark.StreamingTest"
       excludeCategories "org.apache.beam.runners.spark.UsesCheckpointRecovery"
```
Contributor: Is it just refactoring (extract method)?

Member (Author): yes 👍