Merged
4 changes: 4 additions & 0 deletions .github/workflows/beam_PostCommit_Python.yml
@@ -78,6 +78,9 @@ jobs:
- name: Setup environment
uses: ./.github/actions/setup-environment-action
with:
java-version: |
21
11
python-version: ${{ matrix.python_version }}
- name: Install docker compose
run: |
@@ -94,6 +97,7 @@ jobs:
with:
gradle-command: :python${{steps.set_py_ver_clean.outputs.py_ver_clean}}PostCommit
arguments: |
-Pjava21Home=$JAVA_HOME_21_X64 \
-PuseWheelDistribution \
-PpythonVersion=${{ matrix.python_version }} \
env:
5 changes: 4 additions & 1 deletion .github/workflows/beam_Release_NightlySnapshot.yml
@@ -61,7 +61,9 @@ jobs:
- name: Setup environment
uses: ./.github/actions/setup-environment-action
with:
java-version: default
java-version: |
21
11
disable-cache: true
- name: Auth on snapshot repository
run: |
@@ -78,6 +80,7 @@
- name: run Publish script
run: |
./gradlew publish --max-workers=8 -Ppublishing -PskipCheckerFramework \
-Pjava21Home=$JAVA_HOME_21_X64 \
--continue -Dorg.gradle.jvmargs=-Xms2g -Dorg.gradle.jvmargs=-Xmx6g \
-Dorg.gradle.vfs.watch=false -Pdocker-pull-licenses \
-Dorg.gradle.internal.http.connectionTimeout=60000 \
6 changes: 4 additions & 2 deletions .github/workflows/build_release_candidate.yml
@@ -66,7 +66,9 @@ jobs:
uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: '11'
java-version: |
21
11
- name: Import GPG key
id: import_gpg
uses: crazy-max/ghaction-import-gpg@111c56156bcc6918c056dbef52164cfa583dc549
@@ -92,7 +94,7 @@
- name: Configure git
run: git config credential.helper store
- name: Stage Java Artifacts into Maven
run: ./gradlew publish -Psigning.gnupg.keyName=${{steps.import_gpg.outputs.fingerprint}} -PisRelease --no-daemon --no-parallel
run: ./gradlew publish -Psigning.gnupg.keyName=${{steps.import_gpg.outputs.fingerprint}} -PisRelease -Pjava21Home=$JAVA_HOME_21_X64 --no-daemon --no-parallel


stage_java_source:
6 changes: 6 additions & 0 deletions runners/flink/job-server/flink_job_server.gradle
@@ -277,6 +277,12 @@ createCrossLanguageValidatesRunnerTask(
],
)

shadowJar {
manifest {
attributes(["Multi-Release": true])
}
}

// miniCluster jar starts an embedded Flink cluster intended for use in testing.
tasks.register("miniCluster", Jar) {
dependsOn shadowJar
22 changes: 22 additions & 0 deletions sdks/python/apache_beam/io/external/xlang_debeziumio_it_test.py
@@ -16,6 +16,7 @@
#

import logging
import subprocess
import unittest

from apache_beam.io.debezium import DriverClassName
@@ -24,6 +25,7 @@
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.utils import subprocess_server

# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
@@ -34,12 +36,32 @@
NUM_RECORDS = 1


def _disable_debezium_test():
# disable if run on <Java17
try:
java = subprocess_server.JavaHelper.get_java()
result = subprocess.run([java, '-version'],
check=True,
capture_output=True,
text=True)
version_line = result.stderr.splitlines()[0]
# Example output: openjdk version "21.0.6" 2025-01-21
version = version_line.split()[2].strip('\"')
if int(version.split(".")[0]) < 17:
return True
except: # pylint: disable=bare-except
return False


@unittest.skipIf(
PostgresContainer is None, 'testcontainers package is not installed')
@unittest.skipIf(
TestPipeline().get_pipeline_options().view_as(StandardOptions).runner
is None,
'Do not run this test on precommit suites.')
@unittest.skipIf(
_disable_debezium_test(),
'Debezium test requires Java17+ in PATH or JAVA_HOME')
class CrossLanguageDebeziumIOTest(unittest.TestCase):
def setUp(self):
self.username = 'debezium'
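For reference, the skip guard added above relies on `java -version` writing its banner to stderr; the quoted token on the first line is the runtime version (e.g. `openjdk version "21.0.6" 2025-01-21`). Below is a standalone sketch of that probe with an extra branch for legacy `1.8.x` version strings; the helper name is illustrative and not part of the PR.

    import subprocess

    from apache_beam.utils import subprocess_server


    def java_major_version():
        """Returns the major version of the JVM that JavaHelper resolves."""
        java = subprocess_server.JavaHelper.get_java()
        result = subprocess.run([java, '-version'],
                                check=True, capture_output=True, text=True)
        # First stderr line, e.g.: openjdk version "21.0.6" 2025-01-21
        quoted = result.stderr.splitlines()[0].split()[2].strip('"')
        parts = quoted.split('.')
        # Pre-Java-9 runtimes report "1.8.0_392"; the major is the second field.
        return int(parts[1]) if parts[0] == '1' else int(parts[0])
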
10 changes: 8 additions & 2 deletions sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py
@@ -40,6 +40,7 @@
from apache_beam.testing.util import equal_to
from apache_beam.transforms.userstate import BagStateSpec
from apache_beam.transforms.userstate import CombiningValueStateSpec
from apache_beam.utils import subprocess_server

NUM_RECORDS = 1000

@@ -220,8 +221,13 @@ def local_kafka_service(self, local_kafka_jar_file):
zookeeper_port = str(self.get_open_port())
kafka_server = None
try:
kafka_server = subprocess.Popen(
['java', '-jar', local_kafka_jar_file, kafka_port, zookeeper_port])
kafka_server = subprocess.Popen([
subprocess_server.JavaHelper.get_java(),
'-jar',
local_kafka_jar_file,
kafka_port,
zookeeper_port
])
time.sleep(3)
yield kafka_port
finally:
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/io/kafka.py
@@ -45,7 +45,8 @@
pipeline.

* Install Java runtime in the computer from where the pipeline is constructed
and make sure that 'java' command is available.
and make sure that 'java' command is available or set JAVA_HOME environment
variable.

In this option, Python SDK will either download (for released Beam version) or
build (when running from a Beam Git clone) an expansion service jar and use
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/io/kinesis.py
@@ -44,7 +44,8 @@
pipeline.

* Install Java runtime in the computer from where the pipeline is constructed
and make sure that 'java' command is available.
and make sure that 'java' command is available or set JAVA_HOME environment
variable.

In this option, Python SDK will either download (for released Beam version) or
build (when running from a Beam Git clone) a expansion service jar and use
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/io/snowflake.py
@@ -39,7 +39,8 @@
before running the Beam pipeline.

* Install Java runtime in the computer from where the pipeline is constructed
and make sure that 'java' command is available.
and make sure that 'java' command is available or set JAVA_HOME environment
variable.

In this option, Python SDK will either download (for released Beam version) or
build (when running from a Beam Git clone) a expansion service jar and use
@@ -33,6 +33,7 @@
from apache_beam.runners.portability import job_server
from apache_beam.runners.portability import portable_runner
from apache_beam.runners.portability import portable_runner_test
from apache_beam.utils import subprocess_server

_LOGGER = logging.getLogger(__name__)

@@ -97,7 +98,7 @@ def _subprocess_command(cls, job_port, expansion_port):

try:
return [
'java',
subprocess_server.JavaHelper.get_java(),
'-jar',
cls.samza_job_server_jar,
'--artifacts-dir',
@@ -29,6 +29,7 @@
from apache_beam.runners.portability import job_server
from apache_beam.runners.portability import portable_runner
from apache_beam.runners.portability import portable_runner_test
from apache_beam.utils import subprocess_server

# Run as
#
@@ -98,7 +99,7 @@ def _subprocess_command(cls, job_port, expansion_port):

try:
return [
'java',
subprocess_server.JavaHelper.get_java(),
'-Dbeam.spark.test.reuseSparkContext=true',
'-jar',
cls.spark_job_server_jar,
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/transforms/external.py
@@ -1154,7 +1154,8 @@ def _maybe_use_transform_service(provided_service=None, options=None):
return provided_service

def is_java_available():
cmd = ['java', '--version']
java = subprocess_server.JavaHelper.get_java()
cmd = [java, '--version']

try:
subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/transforms/external_java.py
@@ -32,6 +32,7 @@
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder
from apache_beam.utils import subprocess_server

# Protect against environments where apitools library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
@@ -67,7 +68,8 @@ def __enter__(self):

# Start the java server and wait for it to be ready.
if jar:
self._server = subprocess.Popen(['java', '-jar', jar, str(port)])
java = subprocess_server.JavaHelper.get_java()
self._server = subprocess.Popen([java, '-jar', jar, str(port)])

address = 'localhost:%s' % str(port)

4 changes: 3 additions & 1 deletion sdks/python/apache_beam/transforms/sql_test.py
@@ -33,6 +33,7 @@
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.sql import SqlTransform
from apache_beam.utils import subprocess_server

SimpleRow = typing.NamedTuple(
"SimpleRow", [("id", int), ("str", str), ("flt", float)])
@@ -74,7 +75,8 @@ class SqlTransformTest(unittest.TestCase):
def _disable_zetasql_test():
# disable if run on Java8 which is no longer supported by ZetaSQL
try:
result = subprocess.run(['java', '-version'],
java = subprocess_server.JavaHelper.get_java()
result = subprocess.run([java, '-version'],
check=True,
capture_output=True,
text=True)
16 changes: 11 additions & 5 deletions sdks/python/apache_beam/utils/subprocess_server.py
@@ -113,6 +113,16 @@ def get(self, *key):
return self._cache[key].obj


class JavaHelper:
@classmethod
def get_java(cls):
java_path = 'java'
java_home = os.environ.get('JAVA_HOME')
if java_home:
java_path = os.path.join(java_home, 'bin', 'java')
return java_path


class SubprocessServer(object):
"""An abstract base class for running GRPC Servers as an external process.

@@ -274,11 +284,7 @@ def __init__(
java_arguments,
classpath=None,
cache_dir=None):
java_path = 'java'
java_home = os.environ.get('JAVA_HOME')
if java_home:
java_path = os.path.join(java_home, 'bin', 'java')
self._java_path = java_path
self._java_path = JavaHelper.get_java()
if classpath:
# java -jar ignores the classpath, so we make a new jar that embeds
# the requested classpath.
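The new JavaHelper in the first hunk above centralizes the lookup that the constructor in the second hunk previously did inline: prefer $JAVA_HOME/bin/java, otherwise fall back to the bare `java` command. A minimal usage sketch for call sites that used to hard-code 'java' (illustrative only; the jar path and port are placeholders):

    import subprocess

    from apache_beam.utils import subprocess_server

    # Resolve the JVM once; works with either JAVA_HOME or a `java` on PATH.
    java = subprocess_server.JavaHelper.get_java()

    # Launch an arbitrary jar with it, as the updated tests and runners now do.
    server = subprocess.Popen([java, '-jar', '/tmp/expansion-service.jar', '8097'])
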
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/yaml/yaml_provider.py
@@ -385,7 +385,8 @@ def __init__(

def available(self):
# pylint: disable=subprocess-run-check
trial = subprocess.run(['which', 'java'], capture_output=True)
trial = subprocess.run(['which', subprocess_server.JavaHelper.get_java()],
Review comment from @Abacn (PR author), Jun 12, 2025: there was an edge case here; if only JAVA_HOME is provided, the external transform expansion should work, but the old `which java` check assumed Java was not available.

capture_output=True)
if trial.returncode == 0:
return True
else:
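The inline review note above explains the motivation: the old code ran `which java`, so an environment that only sets JAVA_HOME looked as if Java were unavailable. A hedged sketch of an equivalent availability check written against the resolved path rather than PATH (an illustrative alternative, not the code the PR merged):

    import os
    import shutil

    from apache_beam.utils import subprocess_server


    def java_available():
        java = subprocess_server.JavaHelper.get_java()
        if os.path.isabs(java):
            # JAVA_HOME was set; just confirm the binary exists and is executable.
            return os.access(java, os.X_OK)
        # Otherwise fall back to a PATH lookup, like `which java`.
        return shutil.which(java) is not None
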
19 changes: 18 additions & 1 deletion sdks/python/test-suites/portable/common.gradle
@@ -369,11 +369,24 @@ project.tasks.register("prismExamples") {
}

project.tasks.register("postCommitPy${pythonVersionSuffix}IT") {
String fork_java_home = null
String fork_java_version = currentJavaVersion
// DebeziumIO needs Java17+
if (JavaVersion.current() < JavaVersion.VERSION_17) {
if (project.hasProperty("java17Home")) {
fork_java_version = 'java17'
fork_java_home = project.getProperty("java17Home")
} else if (project.hasProperty("java21Home")) {
fork_java_version = 'java21'
fork_java_home = project.getProperty("java21Home")
}
}

dependsOn = [
'setupVirtualenv',
'installGcpTest',
":runners:flink:${latestFlinkVersion}:job-server:shadowJar",
":sdks:java:container:${currentJavaVersion}:docker",
":sdks:java:container:${fork_java_version}:docker",
':sdks:java:testing:kafka-service:buildTestKafkaServiceJar',
':sdks:java:io:expansion-service:shadowJar',
':sdks:java:io:google-cloud-platform:expansion-service:shadowJar',
@@ -412,7 +425,11 @@ project.tasks.register("postCommitPy${pythonVersionSuffix}IT") {
"pipeline_opts": pipelineOpts.join(" "),
])
def kafkaJar = project(":sdks:java:testing:kafka-service:").buildTestKafkaServiceJar.archivePath

exec {
if (fork_java_home != null) {
environment "JAVA_HOME", fork_java_home
}
environment "LOCAL_KAFKA_JAR", kafkaJar
executable 'sh'
args '-c', ". ${envdir}/bin/activate && ${pythonRootDir}/scripts/run_integration_test.sh $cmdArgs"