From ae38a0353de3473017c6736f283cf059881c0b52 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Tue, 10 Jun 2025 17:34:47 -0400 Subject: [PATCH 1/5] Install JDK 21 in release build Support Beam components requiring Java17+ for release workflows They will be compiled with JDK21 with byte code compatibility configured by applyJavaNature(requireJavaVersion) --- .github/workflows/beam_Release_NightlySnapshot.yml | 5 ++++- .github/workflows/build_release_candidate.yml | 6 ++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/.github/workflows/beam_Release_NightlySnapshot.yml b/.github/workflows/beam_Release_NightlySnapshot.yml index ac473472f970..6126cf199b6b 100644 --- a/.github/workflows/beam_Release_NightlySnapshot.yml +++ b/.github/workflows/beam_Release_NightlySnapshot.yml @@ -61,7 +61,9 @@ jobs: - name: Setup environment uses: ./.github/actions/setup-environment-action with: - java-version: default + java-version: | + 21 + 11 disable-cache: true - name: Auth on snapshot repository run: | @@ -78,6 +80,7 @@ jobs: - name: run Publish script run: | ./gradlew publish --max-workers=8 -Ppublishing -PskipCheckerFramework \ + -Pjava21Home=$JAVA_HOME_21_X64 \ --continue -Dorg.gradle.jvmargs=-Xms2g -Dorg.gradle.jvmargs=-Xmx6g \ -Dorg.gradle.vfs.watch=false -Pdocker-pull-licenses \ -Dorg.gradle.internal.http.connectionTimeout=60000 \ diff --git a/.github/workflows/build_release_candidate.yml b/.github/workflows/build_release_candidate.yml index 329e543a9e7b..08fa59d8ceb4 100644 --- a/.github/workflows/build_release_candidate.yml +++ b/.github/workflows/build_release_candidate.yml @@ -66,7 +66,9 @@ jobs: uses: actions/setup-java@v4 with: distribution: 'temurin' - java-version: '11' + java-version: | + 21 + 11 - name: Import GPG key id: import_gpg uses: crazy-max/ghaction-import-gpg@111c56156bcc6918c056dbef52164cfa583dc549 @@ -92,7 +94,7 @@ jobs: - name: Configure git run: git config credential.helper store - name: Stage Java Artifacts into Maven - run: ./gradlew publish -Psigning.gnupg.keyName=${{steps.import_gpg.outputs.fingerprint}} -PisRelease --no-daemon --no-parallel + run: ./gradlew publish -Psigning.gnupg.keyName=${{steps.import_gpg.outputs.fingerprint}} -PisRelease -Pjava21Home=$JAVA_HOME_21_X64 --no-daemon --no-parallel stage_java_source: From a15bedc3d319692c26665b2e2f092d81597a0bb3 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 11 Jun 2025 11:25:56 -0400 Subject: [PATCH 2/5] Use Java 21 for Python PostCommit --- .github/workflows/beam_PostCommit_Python.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/beam_PostCommit_Python.yml b/.github/workflows/beam_PostCommit_Python.yml index 03c76f7253ed..af34428aea0f 100644 --- a/.github/workflows/beam_PostCommit_Python.yml +++ b/.github/workflows/beam_PostCommit_Python.yml @@ -78,6 +78,7 @@ jobs: - name: Setup environment uses: ./.github/actions/setup-environment-action with: + java-version: 21 python-version: ${{ matrix.python_version }} - name: Install docker compose run: | From 85dacdbc41579a8f207f166dcba17ff9fa0e3b4f Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 11 Jun 2025 17:06:22 -0400 Subject: [PATCH 3/5] Honor JAVA_HOME in JavaJarServer checks and tests --- .github/workflows/beam_PostCommit_Python.yml | 5 ++++- runners/flink/job-server/flink_job_server.gradle | 6 ++++++ .../io/external/xlang_kafkaio_it_test.py | 10 ++++++++-- sdks/python/apache_beam/io/kafka.py | 3 ++- sdks/python/apache_beam/io/kinesis.py | 3 ++- sdks/python/apache_beam/io/snowflake.py | 3 ++- .../runners/portability/samza_runner_test.py | 3 ++- .../runners/portability/spark_runner_test.py | 3 ++- sdks/python/apache_beam/transforms/external.py | 3 ++- .../apache_beam/transforms/external_java.py | 4 +++- .../apache_beam/utils/subprocess_server.py | 16 +++++++++++----- sdks/python/apache_beam/yaml/yaml_provider.py | 3 ++- sdks/python/test-suites/portable/common.gradle | 13 +++++++++++++ 13 files changed, 59 insertions(+), 16 deletions(-) diff --git a/.github/workflows/beam_PostCommit_Python.yml b/.github/workflows/beam_PostCommit_Python.yml index af34428aea0f..2a98ccb0efb0 100644 --- a/.github/workflows/beam_PostCommit_Python.yml +++ b/.github/workflows/beam_PostCommit_Python.yml @@ -78,7 +78,9 @@ jobs: - name: Setup environment uses: ./.github/actions/setup-environment-action with: - java-version: 21 + java-version: | + 21 + 11 python-version: ${{ matrix.python_version }} - name: Install docker compose run: | @@ -95,6 +97,7 @@ jobs: with: gradle-command: :python${{steps.set_py_ver_clean.outputs.py_ver_clean}}PostCommit arguments: | + -Pjava21Home=$JAVA_HOME_21_X64 \ -PuseWheelDistribution \ -PpythonVersion=${{ matrix.python_version }} \ env: diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index 1c610477a444..90890a7d5856 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -277,6 +277,12 @@ createCrossLanguageValidatesRunnerTask( ], ) +shadowJar { + manifest { + attributes(["Multi-Release": true]) + } +} + // miniCluster jar starts an embedded Flink cluster intended for use in testing. tasks.register("miniCluster", Jar) { dependsOn shadowJar diff --git a/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py index a7bf686d0642..23178b0ee363 100644 --- a/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py @@ -40,6 +40,7 @@ from apache_beam.testing.util import equal_to from apache_beam.transforms.userstate import BagStateSpec from apache_beam.transforms.userstate import CombiningValueStateSpec +from apache_beam.utils import subprocess_server NUM_RECORDS = 1000 @@ -220,8 +221,13 @@ def local_kafka_service(self, local_kafka_jar_file): zookeeper_port = str(self.get_open_port()) kafka_server = None try: - kafka_server = subprocess.Popen( - ['java', '-jar', local_kafka_jar_file, kafka_port, zookeeper_port]) + kafka_server = subprocess.Popen([ + subprocess_server.JavaHelper.get_java(), + '-jar', + local_kafka_jar_file, + kafka_port, + zookeeper_port + ]) time.sleep(3) yield kafka_port finally: diff --git a/sdks/python/apache_beam/io/kafka.py b/sdks/python/apache_beam/io/kafka.py index 3ed4737cd766..d6be00f0502e 100644 --- a/sdks/python/apache_beam/io/kafka.py +++ b/sdks/python/apache_beam/io/kafka.py @@ -45,7 +45,8 @@ pipeline. * Install Java runtime in the computer from where the pipeline is constructed - and make sure that 'java' command is available. + and make sure that 'java' command is available or set JAVA_HOME environment + variable. In this option, Python SDK will either download (for released Beam version) or build (when running from a Beam Git clone) an expansion service jar and use diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py index a066ec5cb2f0..b4c939703f94 100644 --- a/sdks/python/apache_beam/io/kinesis.py +++ b/sdks/python/apache_beam/io/kinesis.py @@ -44,7 +44,8 @@ pipeline. * Install Java runtime in the computer from where the pipeline is constructed - and make sure that 'java' command is available. + and make sure that 'java' command is available or set JAVA_HOME environment + variable. In this option, Python SDK will either download (for released Beam version) or build (when running from a Beam Git clone) a expansion service jar and use diff --git a/sdks/python/apache_beam/io/snowflake.py b/sdks/python/apache_beam/io/snowflake.py index d789a72001df..7f85af5ac691 100644 --- a/sdks/python/apache_beam/io/snowflake.py +++ b/sdks/python/apache_beam/io/snowflake.py @@ -39,7 +39,8 @@ before running the Beam pipeline. * Install Java runtime in the computer from where the pipeline is constructed - and make sure that 'java' command is available. + and make sure that 'java' command is available or set JAVA_HOME environment + variable. In this option, Python SDK will either download (for released Beam version) or build (when running from a Beam Git clone) a expansion service jar and use diff --git a/sdks/python/apache_beam/runners/portability/samza_runner_test.py b/sdks/python/apache_beam/runners/portability/samza_runner_test.py index e46885c7c96b..6112a50e8511 100644 --- a/sdks/python/apache_beam/runners/portability/samza_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/samza_runner_test.py @@ -33,6 +33,7 @@ from apache_beam.runners.portability import job_server from apache_beam.runners.portability import portable_runner from apache_beam.runners.portability import portable_runner_test +from apache_beam.utils import subprocess_server _LOGGER = logging.getLogger(__name__) @@ -97,7 +98,7 @@ def _subprocess_command(cls, job_port, expansion_port): try: return [ - 'java', + subprocess_server.JavaHelper.get_java(), '-jar', cls.samza_job_server_jar, '--artifacts-dir', diff --git a/sdks/python/apache_beam/runners/portability/spark_runner_test.py b/sdks/python/apache_beam/runners/portability/spark_runner_test.py index 9fc7da2449fc..cf6536be4c2d 100644 --- a/sdks/python/apache_beam/runners/portability/spark_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/spark_runner_test.py @@ -29,6 +29,7 @@ from apache_beam.runners.portability import job_server from apache_beam.runners.portability import portable_runner from apache_beam.runners.portability import portable_runner_test +from apache_beam.utils import subprocess_server # Run as # @@ -98,7 +99,7 @@ def _subprocess_command(cls, job_port, expansion_port): try: return [ - 'java', + subprocess_server.JavaHelper.get_java(), '-Dbeam.spark.test.reuseSparkContext=true', '-jar', cls.spark_job_server_jar, diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index f6d361a9ecac..f0b69a047b7c 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -1154,7 +1154,8 @@ def _maybe_use_transform_service(provided_service=None, options=None): return provided_service def is_java_available(): - cmd = ['java', '--version'] + java = subprocess_server.JavaHelper.get_java() + cmd = [java, '--version'] try: subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) diff --git a/sdks/python/apache_beam/transforms/external_java.py b/sdks/python/apache_beam/transforms/external_java.py index e3984fa8ef20..ebd760f70f7e 100644 --- a/sdks/python/apache_beam/transforms/external_java.py +++ b/sdks/python/apache_beam/transforms/external_java.py @@ -32,6 +32,7 @@ from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder +from apache_beam.utils import subprocess_server # Protect against environments where apitools library is not available. # pylint: disable=wrong-import-order, wrong-import-position @@ -67,7 +68,8 @@ def __enter__(self): # Start the java server and wait for it to be ready. if jar: - self._server = subprocess.Popen(['java', '-jar', jar, str(port)]) + java = subprocess_server.JavaHelper.get_java() + self._server = subprocess.Popen([java, '-jar', jar, str(port)]) address = 'localhost:%s' % str(port) diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py index 85d9286bddd0..babe81d6bde9 100644 --- a/sdks/python/apache_beam/utils/subprocess_server.py +++ b/sdks/python/apache_beam/utils/subprocess_server.py @@ -113,6 +113,16 @@ def get(self, *key): return self._cache[key].obj +class JavaHelper: + @classmethod + def get_java(cls): + java_path = 'java' + java_home = os.environ.get('JAVA_HOME') + if java_home: + java_path = os.path.join(java_home, 'bin', 'java') + return java_path + + class SubprocessServer(object): """An abstract base class for running GRPC Servers as an external process. @@ -274,11 +284,7 @@ def __init__( java_arguments, classpath=None, cache_dir=None): - java_path = 'java' - java_home = os.environ.get('JAVA_HOME') - if java_home: - java_path = os.path.join(java_home, 'bin', 'java') - self._java_path = java_path + self._java_path = JavaHelper.get_java() if classpath: # java -jar ignores the classpath, so we make a new jar that embeds # the requested classpath. diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py index d7c427228a31..7c8114b57706 100755 --- a/sdks/python/apache_beam/yaml/yaml_provider.py +++ b/sdks/python/apache_beam/yaml/yaml_provider.py @@ -385,7 +385,8 @@ def __init__( def available(self): # pylint: disable=subprocess-run-check - trial = subprocess.run(['which', 'java'], capture_output=True) + trial = subprocess.run(['which', subprocess_server.JavaHelper.get_java()], + capture_output=True) if trial.returncode == 0: return True else: diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle index 4211cd009471..a6bff7f4fbd5 100644 --- a/sdks/python/test-suites/portable/common.gradle +++ b/sdks/python/test-suites/portable/common.gradle @@ -412,7 +412,20 @@ project.tasks.register("postCommitPy${pythonVersionSuffix}IT") { "pipeline_opts": pipelineOpts.join(" "), ]) def kafkaJar = project(":sdks:java:testing:kafka-service:").buildTestKafkaServiceJar.archivePath + + String fork_java_home = null + if (JavaVersion.current() < JavaVersion.VERSION_17) { + if (project.hasProperty("java17Home")) { + fork_java_home = project.getProperty("java17Home") + } else if (project.hasProperty("java21Home")) { + fork_java_home = project.getProperty("java21Home") + } + } + exec { + if (fork_java_home != null) { + environment "JAVA_HOME", fork_java_home + } environment "LOCAL_KAFKA_JAR", kafkaJar executable 'sh' args '-c', ". ${envdir}/bin/activate && ${pythonRootDir}/scripts/run_integration_test.sh $cmdArgs" From 1e1e72e141070c885ca962bf7b6f85b9e9aaadec Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Thu, 12 Jun 2025 11:19:39 -0400 Subject: [PATCH 4/5] Disable Debezium test on Java17- --- .../io/external/xlang_debeziumio_it_test.py | 22 +++++++++++++++++ .../python/test-suites/portable/common.gradle | 24 +++++++++++-------- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/io/external/xlang_debeziumio_it_test.py b/sdks/python/apache_beam/io/external/xlang_debeziumio_it_test.py index f343f88ec802..4c883447a0a8 100644 --- a/sdks/python/apache_beam/io/external/xlang_debeziumio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_debeziumio_it_test.py @@ -16,6 +16,7 @@ # import logging +import subprocess import unittest from apache_beam.io.debezium import DriverClassName @@ -24,6 +25,7 @@ from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.utils import subprocess_server # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports try: @@ -34,12 +36,32 @@ NUM_RECORDS = 1 +def _disable_debezium_test(): + # disable if run on Date: Thu, 12 Jun 2025 15:54:06 -0400 Subject: [PATCH 5/5] add example line --- .../apache_beam/io/external/xlang_debeziumio_it_test.py | 2 +- sdks/python/apache_beam/transforms/sql_test.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/external/xlang_debeziumio_it_test.py b/sdks/python/apache_beam/io/external/xlang_debeziumio_it_test.py index 4c883447a0a8..197cc97e840e 100644 --- a/sdks/python/apache_beam/io/external/xlang_debeziumio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_debeziumio_it_test.py @@ -45,7 +45,7 @@ def _disable_debezium_test(): capture_output=True, text=True) version_line = result.stderr.splitlines()[0] - + # Example output: openjdk version "21.0.6" 2025-01-21 version = version_line.split()[2].strip('\"') if int(version.split(".")[0]) < 17: return True diff --git a/sdks/python/apache_beam/transforms/sql_test.py b/sdks/python/apache_beam/transforms/sql_test.py index e020d7ec5998..cf4136436027 100644 --- a/sdks/python/apache_beam/transforms/sql_test.py +++ b/sdks/python/apache_beam/transforms/sql_test.py @@ -33,6 +33,7 @@ from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to from apache_beam.transforms.sql import SqlTransform +from apache_beam.utils import subprocess_server SimpleRow = typing.NamedTuple( "SimpleRow", [("id", int), ("str", str), ("flt", float)]) @@ -74,7 +75,8 @@ class SqlTransformTest(unittest.TestCase): def _disable_zetasql_test(): # disable if run on Java8 which is no longer supported by ZetaSQL try: - result = subprocess.run(['java', '-version'], + java = subprocess_server.JavaHelper.get_java() + result = subprocess.run([java, '-version'], check=True, capture_output=True, text=True)