diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index 90890a7d5856..d8a818ff84c4 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -269,6 +269,7 @@ createCrossLanguageValidatesRunnerTask( "--environmentCacheMillis=10000", "--experiments=beam_fn_api", "--parallelism=2", + "--customBeamRequirement=${project.project(":sdks:python").projectDir}/build/apache-beam.tar.gz", ], goScriptOptions: [ "--runner flink", diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 85f8b583c347..8729bc2032ca 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -486,6 +486,7 @@ createCrossLanguageValidatesRunnerTask( "--tempRoot=${dataflowValidatesTempRoot}", "--sdkContainerImage=${dockerJavaImageContainer}:${dockerTag}", "--sdkHarnessContainerImageOverrides=.*python.*,${dockerPythonImageContainer}:${dockerTag}", + "--customBeamRequirement=${project.project(":sdks:python").projectDir}/build/apache-beam.tar.gz", ], pytestOptions: [ "--capture=no", diff --git a/runners/samza/job-server/build.gradle b/runners/samza/job-server/build.gradle index 05f6de392547..7ffb2becd6d0 100644 --- a/runners/samza/job-server/build.gradle +++ b/runners/samza/job-server/build.gradle @@ -243,6 +243,7 @@ createCrossLanguageValidatesRunnerTask( "--jobEndpoint=localhost:${jobPort}", "--environmentCacheMillis=10000", "--experiments=beam_fn_api", + "--customBeamRequirement=${project.project(":sdks:python").projectDir}/build/apache-beam.tar.gz", ], goScriptOptions: [ "--runner samza", diff --git a/runners/spark/job-server/spark_job_server.gradle b/runners/spark/job-server/spark_job_server.gradle index 90109598ed64..7e2deaf6e395 100644 --- a/runners/spark/job-server/spark_job_server.gradle +++ b/runners/spark/job-server/spark_job_server.gradle @@ -294,6 +294,7 @@ createCrossLanguageValidatesRunnerTask( "--jobEndpoint=localhost:${jobPort}", "--environmentCacheMillis=10000", "--experiments=beam_fn_api", + "--customBeamRequirement=${project.project(":sdks:python").projectDir}/build/apache-beam.tar.gz", ], goScriptOptions: [ "--runner spark", diff --git a/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/PythonExternalTransformTest.java b/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/PythonExternalTransformTest.java index a1e1dade5136..f522a4c409f8 100644 --- a/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/PythonExternalTransformTest.java +++ b/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/PythonExternalTransformTest.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.schemas.SchemaTranslation; import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.UsesPythonExpansionService; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Create; @@ -43,6 +44,7 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -50,28 +52,29 @@ @RunWith(JUnit4.class) public class PythonExternalTransformTest implements Serializable { + @Rule public transient TestPipeline testPipeline = TestPipeline.create(); @Test @Category({ValidatesRunner.class, UsesPythonExpansionService.class}) public void trivialPythonTransform() { - Pipeline p = Pipeline.create(); PCollection output = - p.apply(Create.of(KV.of("A", "x"), KV.of("A", "y"), KV.of("B", "z"))) + testPipeline + .apply(Create.of(KV.of("A", "x"), KV.of("A", "y"), KV.of("B", "z"))) .apply( PythonExternalTransform .>, PCollection>>> from("apache_beam.GroupByKey")) .apply(Keys.create()); PAssert.that(output).containsInAnyOrder("A", "B"); - // TODO: Run this on a multi-language supporting runner. + testPipeline.run(); } @Test @Category({ValidatesRunner.class, UsesPythonExpansionService.class}) public void pythonTransformWithDependencies() { - Pipeline p = Pipeline.create(); PCollection output = - p.apply(Create.of("elephant", "mouse", "sheep")) + testPipeline + .apply(Create.of("elephant", "mouse", "sheep")) .apply( PythonExternalTransform., PCollection>from( "apache_beam.Map") @@ -79,7 +82,7 @@ public void pythonTransformWithDependencies() { .withExtraPackages(ImmutableList.of("inflection")) .withOutputCoder(StringUtf8Coder.of())); PAssert.that(output).containsInAnyOrder("elephants", "mice", "sheep"); - // TODO: Run this on a multi-language supporting runner. + testPipeline.run(); } @Test diff --git a/sdks/python/test-suites/direct/xlang/build.gradle b/sdks/python/test-suites/direct/xlang/build.gradle index 3003329aef59..602b633e350f 100644 --- a/sdks/python/test-suites/direct/xlang/build.gradle +++ b/sdks/python/test-suites/direct/xlang/build.gradle @@ -62,6 +62,7 @@ createCrossLanguageValidatesRunnerTask( "--jobEndpoint=localhost:${jobPort}", "--environmentCacheMillis=10000", "--experiments=beam_fn_api", + "--customBeamRequirement=${project.project(":sdks:python").projectDir}/build/apache-beam.tar.gz", ], goScriptOptions: [ "--runner portable",