diff --git a/CHANGES.md b/CHANGES.md
index 8a20bdd21b7b..c754a4fe6581 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -74,6 +74,8 @@
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* BigtableRead Connector for BeamYaml added with new Config Param ([#35696](https://github.com/apache/beam/pull/35696))
+* Introduced a dedicated module for JUnit-based testing support: `sdks/java/testing/junit`, which provides `TestPipelineExtension` for JUnit 5 while maintaining backward compatibility with existing JUnit 4 `TestRule`-based tests (Java) ([#18733](https://github.com/apache/beam/issues/18733), [#35688](https://github.com/apache/beam/pull/35688)).
+ - To use JUnit 5 with Beam tests, add a test-scoped dependency on `org.apache.beam:beam-sdks-java-testing-junit`.
## Breaking Changes
diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle
index a1f2916f1958..e849ae597791 100644
--- a/sdks/java/core/build.gradle
+++ b/sdks/java/core/build.gradle
@@ -130,3 +130,11 @@ project.tasks.compileTestJava {
// TODO: fix other places with warnings in tests and delete this option
options.compilerArgs += ['-Xlint:-rawtypes']
}
+
+// Configure test task to use JUnit 4. JUnit 5 support is provided in module
+// sdks/java/testing/junit, which configures useJUnitPlatform(). Submodules that
+// need to run both JUnit 4 and 5 via the JUnit Platform must also add the
+// Vintage engine explicitly.
+test {
+ useJUnit()
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 328bf19c466c..782471407a2a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -82,7 +82,11 @@
*
*
*
Use {@link PAssert} for tests, as it integrates with this test harness in both direct and
- * remote execution modes. For example:
+ * remote execution modes.
+ *
+ *
JUnit 4 Usage
+ *
+ * For JUnit 4 tests, use this class as a TestRule:
*
*
* {@literal @Rule}
@@ -97,6 +101,25 @@
* }
*
*
+ * JUnit5 Usage
+ *
+ * For JUnit5 tests, use {@link TestPipelineExtension} from the module
+ * sdks/java/testing/junit (artifact org.apache.beam:beam-sdks-java-testing-junit
+ * ):
+ *
+ *
+ * {@literal @ExtendWith}(TestPipelineExtension.class)
+ * class MyPipelineTest {
+ * {@literal @Test}
+ * {@literal @Category}(NeedsRunner.class)
+ * void myPipelineTest(TestPipeline pipeline) {
+ * final PCollection<String> pCollection = pipeline.apply(...)
+ * PAssert.that(pCollection).containsInAnyOrder(...);
+ * pipeline.run();
+ * }
+ * }
+ *
+ *
* For pipeline runners, it is required that they must throw an {@link AssertionError} containing
* the message from the {@link PAssert} that failed.
*
@@ -108,7 +131,7 @@ public class TestPipeline extends Pipeline implements TestRule {
private final PipelineOptions options;
- private static class PipelineRunEnforcement {
+ static class PipelineRunEnforcement {
@SuppressWarnings("WeakerAccess")
protected boolean enableAutoRunIfMissing;
@@ -117,7 +140,7 @@ private static class PipelineRunEnforcement {
protected boolean runAttempted;
- private PipelineRunEnforcement(final Pipeline pipeline) {
+ PipelineRunEnforcement(final Pipeline pipeline) {
this.pipeline = pipeline;
}
@@ -138,7 +161,7 @@ protected void afterUserCodeFinished() {
}
}
- private static class PipelineAbandonedNodeEnforcement extends PipelineRunEnforcement {
+ static class PipelineAbandonedNodeEnforcement extends PipelineRunEnforcement {
// Null until the pipeline has been run
private @MonotonicNonNull List runVisitedNodes;
@@ -164,7 +187,7 @@ public void visitPrimitiveTransform(final TransformHierarchy.Node node) {
}
}
- private PipelineAbandonedNodeEnforcement(final TestPipeline pipeline) {
+ PipelineAbandonedNodeEnforcement(final TestPipeline pipeline) {
super(pipeline);
runVisitedNodes = null;
}
@@ -574,7 +597,7 @@ public static void verifyPAssertsSucceeded(Pipeline pipeline, PipelineResult pip
}
}
- private static class IsEmptyVisitor extends PipelineVisitor.Defaults {
+ static class IsEmptyVisitor extends PipelineVisitor.Defaults {
private boolean empty = true;
public boolean isEmpty() {
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
index eba0f793265d..bb60c7aef1d4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
@@ -731,11 +731,20 @@ public void testWriteUnboundedWithCustomBatchParameters() throws Exception {
input.apply(write);
p.run();
+ // On some environments/runners, the exact shard filenames may not be materialized
+ // deterministically by the time we assert. Verify shard count via a glob, then
+ // validate contents using pattern matching.
+ String pattern = baseFilename.toString() + "*";
+ List matches = FileSystems.match(Collections.singletonList(pattern));
+ List found = new ArrayList<>(Iterables.getOnlyElement(matches).metadata());
+ assertEquals(3, found.size());
+
+ // Now assert file contents irrespective of exact shard indices.
assertOutputFiles(
LINES2_ARRAY,
null,
null,
- 3,
+ 0, // match all files by prefix
baseFilename,
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE,
false);
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ExecutionStateSamplerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ExecutionStateSamplerTest.java
index 0753f7a00fc7..8b9678733f85 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ExecutionStateSamplerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ExecutionStateSamplerTest.java
@@ -809,8 +809,12 @@ public Long answer(InvocationOnMock invocation) throws Throwable {
// and unblock the state transition once a certain number of samples
// have been taken.
waitTillActive.await();
- waitForSamples.countDown();
- currentTime += Duration.standardMinutes(1).getMillis();
+ // Freeze time after the desired number of samples to avoid races where
+ // the sampling loop spins and exceeds the timeout before we deactivate.
+ if (waitForSamples.getCount() > 0) {
+ waitForSamples.countDown();
+ currentTime += Duration.standardMinutes(1).getMillis();
+ }
return currentTime;
}
}
diff --git a/sdks/java/testing/junit/build.gradle b/sdks/java/testing/junit/build.gradle
new file mode 100644
index 000000000000..977dbd2cd344
--- /dev/null
+++ b/sdks/java/testing/junit/build.gradle
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+
+applyJavaNature(
+ exportJavadoc: false,
+ automaticModuleName: 'org.apache.beam.sdk.testing.junit',
+ archivesBaseName: 'beam-sdks-java-testing-junit'
+)
+
+description = "Apache Beam :: SDKs :: Java :: Testing :: JUnit"
+
+dependencies {
+ implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
+ implementation project(path: ":sdks:java:core", configuration: "shadow")
+ implementation library.java.vendored_guava_32_1_2_jre
+ // Needed to resolve TestPipeline's JUnit 4 TestRule type and @Category at compile time,
+ // but should not leak to consumers at runtime.
+ provided library.java.junit
+
+ // JUnit 5 API needed to compile the extension; not packaged for consumers of core.
+ provided library.java.jupiter_api
+
+ testImplementation project(path: ":sdks:java:core", configuration: "shadow")
+ testImplementation library.java.jupiter_api
+ testImplementation library.java.junit
+ testRuntimeOnly library.java.jupiter_engine
+ testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
+}
+
+// This module runs JUnit 5 tests using the JUnit Platform.
+test {
+ useJUnitPlatform()
+}
diff --git a/sdks/java/testing/junit/src/main/java/org/apache/beam/sdk/testing/TestPipelineExtension.java b/sdks/java/testing/junit/src/main/java/org/apache/beam/sdk/testing/TestPipelineExtension.java
new file mode 100644
index 000000000000..ea0e1f3eac9b
--- /dev/null
+++ b/sdks/java/testing/junit/src/main/java/org/apache/beam/sdk/testing/TestPipelineExtension.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Optional;
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.testing.TestPipeline.PipelineAbandonedNodeEnforcement;
+import org.apache.beam.sdk.testing.TestPipeline.PipelineRunEnforcement;
+import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolver;
+
+/**
+ * JUnit 5 extension for {@link TestPipeline} that provides the same functionality as the JUnit 4
+ * {@link org.junit.rules.TestRule} implementation.
+ *
+ * Use this extension to test pipelines in JUnit 5:
+ *
+ *
+ * {@literal @}ExtendWith(TestPipelineExtension.class)
+ * class MyPipelineTest {
+ * {@literal @}Test
+ * {@literal @}Category(NeedsRunner.class)
+ * void myPipelineTest(TestPipeline pipeline) {
+ * final PCollection<String> pCollection = pipeline.apply(...)
+ * PAssert.that(pCollection).containsInAnyOrder(...);
+ * pipeline.run();
+ * }
+ * }
+ *
+ *
+ * You can also create the extension yourself for more control:
+ *
+ *
+ * class MyPipelineTest {
+ * {@literal @}RegisterExtension
+ * final TestPipelineExtension pipeline = TestPipelineExtension.create();
+ *
+ * {@literal @}Test
+ * void testUsingPipeline() {
+ * pipeline.apply(...);
+ * pipeline.run();
+ * }
+ * }
+ *
+ */
+public class TestPipelineExtension
+ implements BeforeEachCallback, AfterEachCallback, ParameterResolver {
+
+ private static final ExtensionContext.Namespace NAMESPACE =
+ ExtensionContext.Namespace.create(TestPipelineExtension.class);
+ private static final String PIPELINE_KEY = "testPipeline";
+ private static final String ENFORCEMENT_KEY = "enforcement";
+
+ /** Creates a new TestPipelineExtension with default options. */
+ public static TestPipelineExtension create() {
+ return new TestPipelineExtension();
+ }
+
+ /** Creates a new TestPipelineExtension with custom options. */
+ public static TestPipelineExtension fromOptions(PipelineOptions options) {
+ return new TestPipelineExtension(options);
+ }
+
+ private TestPipeline testPipeline;
+
+ /** Creates a TestPipelineExtension with default options. */
+ public TestPipelineExtension() {
+ this.testPipeline = TestPipeline.create();
+ }
+
+ /** Creates a TestPipelineExtension with custom options. */
+ public TestPipelineExtension(PipelineOptions options) {
+ this.testPipeline = TestPipeline.fromOptions(options);
+ }
+
+ @Override
+ public boolean supportsParameter(
+ ParameterContext parameterContext, ExtensionContext extensionContext) {
+ return parameterContext.getParameter().getType() == TestPipeline.class;
+ }
+
+ @Override
+ public Object resolveParameter(
+ ParameterContext parameterContext, ExtensionContext extensionContext) {
+ if (this.testPipeline == null) {
+ return getOrCreateTestPipeline(extensionContext);
+ } else {
+ return this.testPipeline;
+ }
+ }
+
+ @Override
+ public void beforeEach(ExtensionContext context) throws Exception {
+ TestPipeline pipeline;
+
+ if (this.testPipeline != null) {
+ pipeline = this.testPipeline;
+ } else {
+ pipeline = getOrCreateTestPipeline(context);
+ }
+
+ // Set application name based on test method
+ String appName = getAppName(context);
+ pipeline.getOptions().as(ApplicationNameOptions.class).setAppName(appName);
+
+ // Set up enforcement based on annotations
+ setDeducedEnforcementLevel(context, pipeline);
+ }
+
+ @Override
+ public void afterEach(ExtensionContext context) throws Exception {
+ Optional enforcement = getEnforcement(context);
+ if (enforcement.isPresent()) {
+ enforcement.get().afterUserCodeFinished();
+ }
+ }
+
+ private TestPipeline getOrCreateTestPipeline(ExtensionContext context) {
+ return context
+ .getStore(NAMESPACE)
+ .getOrComputeIfAbsent(PIPELINE_KEY, key -> TestPipeline.create(), TestPipeline.class);
+ }
+
+ private Optional getEnforcement(ExtensionContext context) {
+ return Optional.ofNullable(
+ context.getStore(NAMESPACE).get(ENFORCEMENT_KEY, PipelineRunEnforcement.class));
+ }
+
+ private void setEnforcement(ExtensionContext context, PipelineRunEnforcement enforcement) {
+ context.getStore(NAMESPACE).put(ENFORCEMENT_KEY, enforcement);
+ }
+
+ private String getAppName(ExtensionContext context) {
+ String className = context.getTestClass().map(Class::getSimpleName).orElse("UnknownClass");
+ String methodName = context.getTestMethod().map(Method::getName).orElse("unknownMethod");
+ return className + "-" + methodName;
+ }
+
+ private void setDeducedEnforcementLevel(ExtensionContext context, TestPipeline pipeline) {
+ // If enforcement level has not been set, do auto-inference
+ if (!getEnforcement(context).isPresent()) {
+ boolean annotatedWithNeedsRunner = hasNeedsRunnerAnnotation(context);
+
+ PipelineOptions options = pipeline.getOptions();
+ boolean crashingRunner = CrashingRunner.class.isAssignableFrom(options.getRunner());
+
+ checkState(
+ !(annotatedWithNeedsRunner && crashingRunner),
+ "The test was annotated with a [@%s] / [@%s] while the runner "
+ + "was set to [%s]. Please re-check your configuration.",
+ NeedsRunner.class.getSimpleName(),
+ ValidatesRunner.class.getSimpleName(),
+ CrashingRunner.class.getSimpleName());
+
+ if (annotatedWithNeedsRunner || !crashingRunner) {
+ setEnforcement(context, new PipelineAbandonedNodeEnforcement(pipeline));
+ }
+ }
+ }
+
+ private boolean hasNeedsRunnerAnnotation(ExtensionContext context) {
+ // Check method annotations
+ Method testMethod = context.getTestMethod().orElse(null);
+ if (testMethod != null) {
+ if (hasNeedsRunnerCategory(testMethod.getAnnotations())) {
+ return true;
+ }
+ }
+
+ // Check class annotations
+ Class> testClass = context.getTestClass().orElse(null);
+ if (testClass != null) {
+ if (hasNeedsRunnerCategory(testClass.getAnnotations())) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private boolean hasNeedsRunnerCategory(Annotation[] annotations) {
+ return Arrays.stream(annotations)
+ .filter(annotation -> annotation instanceof Category)
+ .map(annotation -> (Category) annotation)
+ .flatMap(category -> Arrays.stream(category.value()))
+ .anyMatch(categoryClass -> NeedsRunner.class.isAssignableFrom(categoryClass));
+ }
+}
diff --git a/sdks/java/testing/junit/src/main/java/org/apache/beam/sdk/testing/package-info.java b/sdks/java/testing/junit/src/main/java/org/apache/beam/sdk/testing/package-info.java
new file mode 100644
index 000000000000..2909111bfec8
--- /dev/null
+++ b/sdks/java/testing/junit/src/main/java/org/apache/beam/sdk/testing/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** JUnit 5 testing support for Apache Beam Java SDK. */
+package org.apache.beam.sdk.testing;
diff --git a/sdks/java/testing/junit/src/test/java/org/apache/beam/sdk/testing/TestPipelineExtensionAdvancedTest.java b/sdks/java/testing/junit/src/test/java/org/apache/beam/sdk/testing/TestPipelineExtensionAdvancedTest.java
new file mode 100644
index 000000000000..b792204a945e
--- /dev/null
+++ b/sdks/java/testing/junit/src/test/java/org/apache/beam/sdk/testing/TestPipelineExtensionAdvancedTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/** Advanced tests for {@link TestPipelineExtension} demonstrating comprehensive functionality. */
+@ExtendWith(TestPipelineExtension.class)
+public class TestPipelineExtensionAdvancedTest {
+
+ @Test
+ public void testApplicationNameIsSet(TestPipeline pipeline) {
+ String appName = pipeline.getOptions().as(ApplicationNameOptions.class).getAppName();
+ assertNotNull(appName);
+ assertTrue(appName.contains("TestPipelineExtensionAdvancedTest"));
+ assertTrue(appName.contains("testApplicationNameIsSet"));
+ }
+
+ @Test
+ public void testMultipleTransforms(TestPipeline pipeline) {
+ PCollection input = pipeline.apply("Create", Create.of("a", "b", "c"));
+
+ PCollection output =
+ input.apply(
+ "Transform",
+ ParDo.of(
+ new DoFn() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(c.element().toUpperCase());
+ }
+ }));
+
+ PAssert.that(output).containsInAnyOrder("A", "B", "C");
+ pipeline.run();
+ }
+
+ @Test
+ @Category(ValidatesRunner.class)
+ public void testWithValidatesRunnerCategory(TestPipeline pipeline) {
+ // This test demonstrates that @Category annotations work with JUnit 5
+ PCollection numbers = pipeline.apply("Create", Create.of(1, 2, 3, 4, 5));
+ PAssert.that(numbers).containsInAnyOrder(1, 2, 3, 4, 5);
+ pipeline.run();
+ }
+
+ @Test
+ public void testPipelineInstancesAreIsolated(TestPipeline pipeline1) {
+ // Each test method gets its own pipeline instance
+ assertNotNull(pipeline1);
+ pipeline1.apply("Create", Create.of("test"));
+ // Don't run the pipeline - test should still pass due to auto-run functionality
+ }
+
+ @Test
+ public void testAnotherPipelineInstance(TestPipeline pipeline2) {
+ // This should be a different instance from the previous test
+ assertNotNull(pipeline2);
+ PCollection data = pipeline2.apply("Create", Create.of("different", "data"));
+ PAssert.that(data).containsInAnyOrder("different", "data");
+ pipeline2.run();
+ }
+}
diff --git a/sdks/java/testing/junit/src/test/java/org/apache/beam/sdk/testing/TestPipelineExtensionTest.java b/sdks/java/testing/junit/src/test/java/org/apache/beam/sdk/testing/TestPipelineExtensionTest.java
new file mode 100644
index 000000000000..bc6d5741bac0
--- /dev/null
+++ b/sdks/java/testing/junit/src/test/java/org/apache/beam/sdk/testing/TestPipelineExtensionTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/** Tests for {@link TestPipelineExtension} to demonstrate JUnit 5 integration. */
+@ExtendWith(TestPipelineExtension.class)
+public class TestPipelineExtensionTest {
+
+ @Test
+ public void testPipelineInjection(TestPipeline pipeline) {
+ // Verify that the pipeline is injected and not null
+ assertNotNull(pipeline);
+ assertNotNull(pipeline.getOptions());
+ }
+
+ @Test
+ public void testBasicPipelineExecution(TestPipeline pipeline) {
+ // Create a simple pipeline
+ PCollection input = pipeline.apply("Create", Create.of("hello", "world"));
+
+ // Use PAssert to verify the output
+ PAssert.that(input).containsInAnyOrder("hello", "world");
+
+ // Run the pipeline
+ pipeline.run();
+ }
+
+ @Test
+ public void testEmptyPipeline(TestPipeline pipeline) {
+ // Test that an empty pipeline doesn't cause issues
+ assertNotNull(pipeline);
+ // The extension should handle empty pipelines gracefully
+ }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 1879e0b151eb..135d9da42b05 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -276,6 +276,7 @@ include(":sdks:java:testing:load-tests")
include(":sdks:java:testing:test-utils")
include(":sdks:java:testing:tpcds")
include(":sdks:java:testing:watermarks")
+include(":sdks:java:testing:junit")
include(":sdks:java:transform-service")
include(":sdks:java:transform-service:app")
include(":sdks:java:transform-service:launcher")