Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ tasks.register("javaPreCommit") {
dependsOn(":sdks:java:managed:build")
dependsOn(":sdks:java:testing:expansion-service:build")
dependsOn(":sdks:java:testing:jpms-tests:build")
dependsOn(":sdks:java:testing:junit:build")
dependsOn(":sdks:java:testing:load-tests:build")
dependsOn(":sdks:java:testing:nexmark:build")
dependsOn(":sdks:java:testing:test-utils:build")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -131,7 +133,7 @@ public class TestPipeline extends Pipeline implements TestRule {

private final PipelineOptions options;

static class PipelineRunEnforcement {
private static class PipelineRunEnforcement {

@SuppressWarnings("WeakerAccess")
protected boolean enableAutoRunIfMissing;
Expand All @@ -140,7 +142,7 @@ static class PipelineRunEnforcement {

protected boolean runAttempted;

PipelineRunEnforcement(final Pipeline pipeline) {
private PipelineRunEnforcement(final Pipeline pipeline) {
this.pipeline = pipeline;
}

Expand All @@ -161,7 +163,7 @@ protected void afterUserCodeFinished() {
}
}

static class PipelineAbandonedNodeEnforcement extends PipelineRunEnforcement {
private static class PipelineAbandonedNodeEnforcement extends PipelineRunEnforcement {

// Null until the pipeline has been run
private @MonotonicNonNull List<TransformHierarchy.Node> runVisitedNodes;
Expand All @@ -187,7 +189,7 @@ public void visitPrimitiveTransform(final TransformHierarchy.Node node) {
}
}

PipelineAbandonedNodeEnforcement(final TestPipeline pipeline) {
private PipelineAbandonedNodeEnforcement(final TestPipeline pipeline) {
super(pipeline);
runVisitedNodes = null;
}
Expand Down Expand Up @@ -296,6 +298,13 @@ public static TestPipeline create() {
return fromOptions(testingPipelineOptions());
}

/** */
static TestPipeline createWithEnforcement() {
TestPipeline p = create();

return p;
}

public static TestPipeline fromOptions(PipelineOptions options) {
return new TestPipeline(options);
}
Expand All @@ -310,49 +319,55 @@ public PipelineOptions getOptions() {
return this.options;
}

@Override
public Statement apply(final Statement statement, final Description description) {
return new Statement() {
// package private for JUnit5 TestPipelineExtension
void setDeducedEnforcementLevel(Collection<Annotation> annotations) {
// if the enforcement level has not been set by the user do auto-inference
if (!enforcement.isPresent()) {

private void setDeducedEnforcementLevel() {
// if the enforcement level has not been set by the user do auto-inference
if (!enforcement.isPresent()) {
final boolean annotatedWithNeedsRunner =
FluentIterable.from(annotations)
.filter(Annotations.Predicates.isAnnotationOfType(Category.class))
.anyMatch(Annotations.Predicates.isCategoryOf(NeedsRunner.class, true));

final boolean annotatedWithNeedsRunner =
FluentIterable.from(description.getAnnotations())
.filter(Annotations.Predicates.isAnnotationOfType(Category.class))
.anyMatch(Annotations.Predicates.isCategoryOf(NeedsRunner.class, true));
final boolean crashingRunner = CrashingRunner.class.isAssignableFrom(options.getRunner());

final boolean crashingRunner = CrashingRunner.class.isAssignableFrom(options.getRunner());
checkState(
!(annotatedWithNeedsRunner && crashingRunner),
"The test was annotated with a [@%s] / [@%s] while the runner "
+ "was set to [%s]. Please re-check your configuration.",
NeedsRunner.class.getSimpleName(),
ValidatesRunner.class.getSimpleName(),
CrashingRunner.class.getSimpleName());

checkState(
!(annotatedWithNeedsRunner && crashingRunner),
"The test was annotated with a [@%s] / [@%s] while the runner "
+ "was set to [%s]. Please re-check your configuration.",
NeedsRunner.class.getSimpleName(),
ValidatesRunner.class.getSimpleName(),
CrashingRunner.class.getSimpleName());
enableAbandonedNodeEnforcement(annotatedWithNeedsRunner || !crashingRunner);
}
}

enableAbandonedNodeEnforcement(annotatedWithNeedsRunner || !crashingRunner);
}
}
// package private for JUnit5 TestPipelineExtension
void afterUserCodeFinished() {
enforcement.get().afterUserCodeFinished();
}

@Override
public Statement apply(final Statement statement, final Description description) {
return new Statement() {

@Override
public void evaluate() throws Throwable {
options.as(ApplicationNameOptions.class).setAppName(getAppName(description));

setDeducedEnforcementLevel();
setDeducedEnforcementLevel(description.getAnnotations());

// statement.evaluate() essentially runs the user code contained in the unit test at hand.
// Exceptions thrown during the execution of the user's test code will propagate here,
// unless the user explicitly handles them with a "catch" clause in his code. If the
// exception is handled by a user's "catch" clause, is does not interrupt the flow and
// exception is handled by a user's "catch" clause, it does not interrupt the flow, and
// we move on to invoking the configured enforcements.
// If the user does not handle a thrown exception, it will propagate here and interrupt
// the flow, preventing the enforcement(s) from being activated.
// The motivation for this is avoiding enforcements over faulty pipelines.
statement.evaluate();
enforcement.get().afterUserCodeFinished();
afterUserCodeFinished();
}
};
}
Expand Down Expand Up @@ -597,7 +612,7 @@ public static void verifyPAssertsSucceeded(Pipeline pipeline, PipelineResult pip
}
}

static class IsEmptyVisitor extends PipelineVisitor.Defaults {
private static class IsEmptyVisitor extends PipelineVisitor.Defaults {
private boolean empty = true;

public boolean isEmpty() {
Expand Down
3 changes: 1 addition & 2 deletions sdks/java/testing/junit/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
plugins { id 'org.apache.beam.module' }

applyJavaNature(
exportJavadoc: false,
automaticModuleName: 'org.apache.beam.sdk.testing.junit',
archivesBaseName: 'beam-sdks-java-testing-junit'
)
Expand All @@ -33,11 +32,11 @@ dependencies {
// Needed to resolve TestPipeline's JUnit 4 TestRule type and @Category at compile time,
// but should not leak to consumers at runtime.
provided library.java.junit
permitUnusedDeclared(library.java.junit)

// JUnit 5 API needed to compile the extension; not packaged for consumers of core.
provided library.java.jupiter_api

testImplementation project(path: ":sdks:java:core", configuration: "shadow")
testImplementation library.java.jupiter_api
testImplementation library.java.junit
testRuntimeOnly library.java.jupiter_engine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,15 @@
*/
package org.apache.beam.sdk.testing;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Optional;
import java.util.Collection;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline.PipelineAbandonedNodeEnforcement;
import org.apache.beam.sdk.testing.TestPipeline.PipelineRunEnforcement;
import org.junit.experimental.categories.Category;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
Expand Down Expand Up @@ -86,16 +84,16 @@ public static TestPipelineExtension fromOptions(PipelineOptions options) {
return new TestPipelineExtension(options);
}

private TestPipeline testPipeline;
private @Nullable PipelineOptions options;

/** Creates a TestPipelineExtension with default options. */
public TestPipelineExtension() {
this.testPipeline = TestPipeline.create();
this.options = null;
}

/** Creates a TestPipelineExtension with custom options. */
public TestPipelineExtension(PipelineOptions options) {
this.testPipeline = TestPipeline.fromOptions(options);
this.options = options;
}

@Override
Expand All @@ -107,52 +105,38 @@ public boolean supportsParameter(
@Override
public Object resolveParameter(
ParameterContext parameterContext, ExtensionContext extensionContext) {
if (this.testPipeline == null) {
return getOrCreateTestPipeline(extensionContext);
} else {
return this.testPipeline;
}
return getOrCreateTestPipeline(extensionContext);
}

@Override
public void beforeEach(ExtensionContext context) throws Exception {
TestPipeline pipeline;

if (this.testPipeline != null) {
pipeline = this.testPipeline;
} else {
pipeline = getOrCreateTestPipeline(context);
}
public void beforeEach(ExtensionContext context) {
TestPipeline pipeline = getOrCreateTestPipeline(context);

// Set application name based on test method
String appName = getAppName(context);
pipeline.getOptions().as(ApplicationNameOptions.class).setAppName(appName);

// Set up enforcement based on annotations
setDeducedEnforcementLevel(context, pipeline);
pipeline.setDeducedEnforcementLevel(getAnnotations(context));
}

@Override
public void afterEach(ExtensionContext context) throws Exception {
Optional<PipelineRunEnforcement> enforcement = getEnforcement(context);
if (enforcement.isPresent()) {
enforcement.get().afterUserCodeFinished();
}
public void afterEach(ExtensionContext context) {
TestPipeline pipeline = getRequiredTestPipeline(context);
pipeline.afterUserCodeFinished();
}

private TestPipeline getOrCreateTestPipeline(ExtensionContext context) {
return context
.getStore(NAMESPACE)
.getOrComputeIfAbsent(PIPELINE_KEY, key -> TestPipeline.create(), TestPipeline.class);
}

private Optional<PipelineRunEnforcement> getEnforcement(ExtensionContext context) {
return Optional.ofNullable(
context.getStore(NAMESPACE).get(ENFORCEMENT_KEY, PipelineRunEnforcement.class));
.getOrComputeIfAbsent(
PIPELINE_KEY,
key -> options == null ? TestPipeline.create() : TestPipeline.fromOptions(options),
TestPipeline.class);
}

private void setEnforcement(ExtensionContext context, PipelineRunEnforcement enforcement) {
context.getStore(NAMESPACE).put(ENFORCEMENT_KEY, enforcement);
private TestPipeline getRequiredTestPipeline(ExtensionContext context) {
return checkNotNull(context.getStore(NAMESPACE).get(PIPELINE_KEY, TestPipeline.class));
}

private String getAppName(ExtensionContext context) {
Expand All @@ -161,53 +145,10 @@ private String getAppName(ExtensionContext context) {
return className + "-" + methodName;
}

private void setDeducedEnforcementLevel(ExtensionContext context, TestPipeline pipeline) {
// If enforcement level has not been set, do auto-inference
if (!getEnforcement(context).isPresent()) {
boolean annotatedWithNeedsRunner = hasNeedsRunnerAnnotation(context);

PipelineOptions options = pipeline.getOptions();
boolean crashingRunner = CrashingRunner.class.isAssignableFrom(options.getRunner());

checkState(
!(annotatedWithNeedsRunner && crashingRunner),
"The test was annotated with a [@%s] / [@%s] while the runner "
+ "was set to [%s]. Please re-check your configuration.",
NeedsRunner.class.getSimpleName(),
ValidatesRunner.class.getSimpleName(),
CrashingRunner.class.getSimpleName());

if (annotatedWithNeedsRunner || !crashingRunner) {
setEnforcement(context, new PipelineAbandonedNodeEnforcement(pipeline));
}
}
}

private boolean hasNeedsRunnerAnnotation(ExtensionContext context) {
// Check method annotations
Method testMethod = context.getTestMethod().orElse(null);
if (testMethod != null) {
if (hasNeedsRunnerCategory(testMethod.getAnnotations())) {
return true;
}
}

// Check class annotations
Class<?> testClass = context.getTestClass().orElse(null);
if (testClass != null) {
if (hasNeedsRunnerCategory(testClass.getAnnotations())) {
return true;
}
}

return false;
}

private boolean hasNeedsRunnerCategory(Annotation[] annotations) {
return Arrays.stream(annotations)
.filter(annotation -> annotation instanceof Category)
.map(annotation -> (Category) annotation)
.flatMap(category -> Arrays.stream(category.value()))
.anyMatch(categoryClass -> NeedsRunner.class.isAssignableFrom(categoryClass));
private static Collection<Annotation> getAnnotations(ExtensionContext context) {
ImmutableList.Builder<Annotation> builder = ImmutableList.builder();
context.getTestMethod().ifPresent(testMethod -> builder.add(testMethod.getAnnotations()));
context.getTestClass().ifPresent(testClass -> builder.add(testClass.getAnnotations()));
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.Serializable;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
Expand All @@ -31,7 +32,7 @@

/** Advanced tests for {@link TestPipelineExtension} demonstrating comprehensive functionality. */
@ExtendWith(TestPipelineExtension.class)
public class TestPipelineExtensionAdvancedTest {
public class TestPipelineExtensionAdvancedTest implements Serializable {

@Test
public void testApplicationNameIsSet(TestPipeline pipeline) {
Expand Down Expand Up @@ -72,7 +73,7 @@ public void testWithValidatesRunnerCategory(TestPipeline pipeline) {
@Test
public void testPipelineInstancesAreIsolated(TestPipeline pipeline1) {
// Each test method gets its own pipeline instance
assertNotNull(pipeline1);
pipeline1.enableAutoRunIfMissing(true);
pipeline1.apply("Create", Create.of("test"));
// Don't run the pipeline - test should still pass due to auto-run functionality
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
*/
package org.apache.beam.sdk.testing;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.junit.jupiter.api.Test;
Expand All @@ -33,6 +35,9 @@ public void testPipelineInjection(TestPipeline pipeline) {
// Verify that the pipeline is injected and not null
assertNotNull(pipeline);
assertNotNull(pipeline.getOptions());
assertEquals(
"TestPipelineExtensionTest-testPipelineInjection",
pipeline.getOptions().as(ApplicationNameOptions.class).getAppName());
}

@Test
Expand Down
Loading