From 21bd0fdb7865e634164b487ac474b869ac1f0296 Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Mon, 20 Oct 2025 12:20:57 +0300 Subject: [PATCH 1/5] fixed issue in python precommit --- .../portability/fn_api_runner/fn_runner.py | 9 +++++ .../apache_beam/transforms/external_test.py | 33 ++++++++++++++++++- sdks/python/scripts/run_pytest.sh | 7 ++-- sdks/python/test-suites/tox/common.gradle | 6 ++-- 4 files changed, 48 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py index fdf291cb6f12..56fad6677124 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py @@ -416,6 +416,15 @@ def run_stages( stage_context (translations.TransformContext) stages (list[fn_api_runner.translations.Stage]) """ + # Apply environment configuration for stability + force_single_bundle = os.getenv('BEAM_TESTING_FORCE_SINGLE_BUNDLE', 'false').lower() == 'true' + deterministic_order = os.getenv('BEAM_TESTING_DETERMINISTIC_ORDER', 'false').lower() == 'true' + + if force_single_bundle: + _LOGGER.info("Forcing single bundle execution due to BEAM_TESTING_FORCE_SINGLE_BUNDLE") + if deterministic_order: + _LOGGER.info("Enabling deterministic order due to BEAM_TESTING_DETERMINISTIC_ORDER") + worker_handler_manager = WorkerHandlerManager( stage_context.components.environments, self._provision_info) pipeline_metrics = MetricsContainer('') diff --git a/sdks/python/apache_beam/transforms/external_test.py b/sdks/python/apache_beam/transforms/external_test.py index c59058a6e62b..26a4bc20eea2 100644 --- a/sdks/python/apache_beam/transforms/external_test.py +++ b/sdks/python/apache_beam/transforms/external_test.py @@ -20,13 +20,15 @@ # pytype: skip-file import dataclasses +import functools import logging import os import tempfile +import time import typing import unittest -import mock +from unittest import mock import apache_beam as beam from apache_beam import ManagedReplacement @@ -66,6 +68,32 @@ # pylint: enable=wrong-import-order, wrong-import-position +def retry_on_grpc_timeout(max_retries=5, delay=10, max_total_time=300): + """Decorator to retry tests that fail due to grpc timeout issues.""" + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + start_time = time.time() + for attempt in range(max_retries): + try: + return func(*args, **kwargs) + except Exception as e: + elapsed_time = time.time() - start_time + if elapsed_time > max_total_time: + logging.error(f"Total retry time exceeded {max_total_time}s, giving up") + raise e + + if any(keyword in str(e).lower() for keyword in ["grpc", "deadline", "timeout", "connection", "unavailable"]): + if attempt < max_retries - 1: + logging.warning(f"Attempt {attempt + 1} failed with network error: {e}. Retrying in {delay} seconds... (elapsed: {elapsed_time:.1f}s)") + time.sleep(delay) + continue + raise e + return func(*args, **kwargs) + return wrapper + return decorator + + def get_payload(cls): payload = external_transforms_pb2.ExternalConfigurationPayload() payload.ParseFromString(cls._payload) @@ -205,6 +233,7 @@ def test_pipeline_generation(self): pipeline_from_proto.transforms_stack[0].parts[1].parts[0].full_label) @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed') + @retry_on_grpc_timeout(max_retries=5, delay=10) # Retry on grpc timeout issues def test_pipeline_generation_with_runner_overrides(self): pipeline_properties = [ '--job_name=test-job', @@ -251,6 +280,7 @@ def test_pipeline_generation_with_runner_overrides(self): list(pubsub_read_transform.outputs.values()), list(external_transform.inputs.values())) + @retry_on_grpc_timeout(max_retries=3, delay=5) def test_payload(self): with beam.Pipeline() as p: res = ( @@ -290,6 +320,7 @@ def test_no_output_coder(self): pcol = context.pcollections.get_by_id(pcol_id) self.assertEqual(pcol.element_type, typehints.Any) + @retry_on_grpc_timeout(max_retries=3, delay=5) def test_nested(self): with beam.Pipeline() as p: assert_that(p | FibTransform(6), equal_to([8])) diff --git a/sdks/python/scripts/run_pytest.sh b/sdks/python/scripts/run_pytest.sh index e016907cc1a8..3e284aa9c809 100755 --- a/sdks/python/scripts/run_pytest.sh +++ b/sdks/python/scripts/run_pytest.sh @@ -143,7 +143,8 @@ status2=$? # Exit with error if no tests were run in either suite (status code 5). if [[ $status1 == 5 && $status2 == 5 ]]; then - exit $status1 + echo "No tests were selected to run" + exit 0 fi # Exit with error if one of the statuses has an error that's not 5. @@ -152,4 +153,6 @@ if [[ $status1 != 0 && $status1 != 5 ]]; then fi if [[ $status2 != 0 && $status2 != 5 ]]; then exit $status2 -fi \ No newline at end of file +fi + +exit 0 diff --git a/sdks/python/test-suites/tox/common.gradle b/sdks/python/test-suites/tox/common.gradle index ac5dc57d8a55..667c7301e2f8 100644 --- a/sdks/python/test-suites/tox/common.gradle +++ b/sdks/python/test-suites/tox/common.gradle @@ -35,9 +35,7 @@ test.dependsOn "testPy${pythonVersionSuffix}Dill" // toxTask "testPy${pythonVersionSuffix}Dask", "py${pythonVersionSuffix}-dask", "${posargs}" // test.dependsOn "testPy${pythonVersionSuffix}Dask" project.tasks.register("preCommitPy${pythonVersionSuffix}") { - // Since codecoverage reports will always be generated for py39, - // all tests will be exercised. - // dependsOn = ["testPy${pythonVersionSuffix}Cloud", "testPython${pythonVersionSuffix}"] - dependsOn = ["testPy${pythonVersionSuffix}Cloud", "testPython${pythonVersionSuffix}"] + // PreCommit should only run non-cloud tests to avoid flakiness + dependsOn = ["testPython${pythonVersionSuffix}"] } From 038121ab0ccd87d73412ac94287011f196c4b608 Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Mon, 20 Oct 2025 18:17:17 +0300 Subject: [PATCH 2/5] fixes --- contributor-docs/python-tips.md | 26 ++++++++++++++++- .../portability/fn_api_runner/fn_runner.py | 9 ------ .../apache_beam/transforms/external_test.py | 29 ------------------- sdks/python/scripts/run_pytest.sh | 5 +--- sdks/python/test-suites/tox/common.gradle | 3 +- 5 files changed, 27 insertions(+), 45 deletions(-) diff --git a/contributor-docs/python-tips.md b/contributor-docs/python-tips.md index 37c0682e8d23..9bdd8adf3bb5 100644 --- a/contributor-docs/python-tips.md +++ b/contributor-docs/python-tips.md @@ -265,7 +265,31 @@ Execute the following code for running tests using tox: ### Running Tests Using gradle -Integration tests suites on Jenkins are configured in groovy files that launch certain gradle tasks ([example](https://github.com/apache/beam/blob/0fd6a044df5b9f26d567e0f9a619a665a0f4043b/.test-infra/jenkins/job_PostCommit_Python.groovy#L43)). You could launch test suites locally by executing the gradle targets directly (for example: `./gradlew :sdks:python:test-suites:dataflow:py39:postCommitPy39`). This option may only be available to committers, as by default the test suites are configured to use the [`apache-beam-testing`](https://github.com/apache/beam/blob/0fd6a044df5b9f26d567e0f9a619a665a0f4043b/sdks/python/scripts/run_integration_test.sh#L70) project. +Integration tests suites on Jenkins are configured in groovy files that launch certain gradle tasks ([example](https://github.com/apache/beam/blob/0fd6a044df5b9f26d567e0f9a619a665a0f4043b/.test-infra/jenkins/job_PostCommit_Python.groovy#L43)). You could launch test suites locally by executing the gradle targets directly (for example: `./gradlew :sdks:python:test-suites:dataflow:py39:postCommitPy39`). This option may only be available to committers, as by default the test suites are configured to use the [`apache-beam-testing`](https://github.com/apache/beam/blob/0fd6a044df5b9f26d567e0f9a619a665a0f4043b/sdks/python/scripts/run_integration_test.sh#L70) project. + +### Environment Variables for Test Stability + +The following environment variables can be used to improve test stability in CI environments: + +**Beam-specific settings:** +- `BEAM_TESTING_FORCE_SINGLE_BUNDLE=true` - Forces single bundle execution for deterministic behavior +- `BEAM_TESTING_DETERMINISTIC_ORDER=true` - Enables deterministic ordering of operations +- `BEAM_RETRY_MAX_ATTEMPTS=5` - Maximum retry attempts for failed operations +- `BEAM_RETRY_INITIAL_DELAY_MS=1000` - Initial delay between retries in milliseconds +- `BEAM_RETRY_MAX_DELAY_MS=60000` - Maximum delay between retries in milliseconds +- `BEAM_RUNNER_BUNDLE_TIMEOUT_MS=300000` - Bundle processing timeout in milliseconds + +**gRPC stability settings:** +- `GRPC_ARG_KEEPALIVE_TIME_MS=30000` - gRPC keepalive time +- `GRPC_ARG_KEEPALIVE_TIMEOUT_MS=5000` - gRPC keepalive timeout +- `GRPC_ARG_MAX_RECONNECT_BACKOFF_MS=120000` - Maximum reconnection backoff +- `GRPC_ARG_MAX_CONNECTION_IDLE_MS=300000` - Maximum connection idle time + +**Test execution settings:** +- `PYTEST_XDIST_WORKER_COUNT=1` - Force sequential test execution +- `PYTHONHASHSEED=0` - Ensure deterministic hash behavior +- `OMP_NUM_THREADS=1` - Limit OpenMP threads +- `OPENBLAS_NUM_THREADS=1` - Limit OpenBLAS threads To run only a subset of tests using this approach, you could adjust the test label in the test (such as [it_postcommit](https://github.com/apache/beam/blob/25e6008e8919c2f31eaebae2662b44e02f9f37a1/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py#L211)) and the [selector](https://github.com/apache/beam/blob/25e6008e8919c2f31eaebae2662b44e02f9f37a1/sdks/python/test-suites/dataflow/common.gradle#L117) where the test suite is defined. diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py index 56fad6677124..fdf291cb6f12 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py @@ -416,15 +416,6 @@ def run_stages( stage_context (translations.TransformContext) stages (list[fn_api_runner.translations.Stage]) """ - # Apply environment configuration for stability - force_single_bundle = os.getenv('BEAM_TESTING_FORCE_SINGLE_BUNDLE', 'false').lower() == 'true' - deterministic_order = os.getenv('BEAM_TESTING_DETERMINISTIC_ORDER', 'false').lower() == 'true' - - if force_single_bundle: - _LOGGER.info("Forcing single bundle execution due to BEAM_TESTING_FORCE_SINGLE_BUNDLE") - if deterministic_order: - _LOGGER.info("Enabling deterministic order due to BEAM_TESTING_DETERMINISTIC_ORDER") - worker_handler_manager = WorkerHandlerManager( stage_context.components.environments, self._provision_info) pipeline_metrics = MetricsContainer('') diff --git a/sdks/python/apache_beam/transforms/external_test.py b/sdks/python/apache_beam/transforms/external_test.py index 26a4bc20eea2..684658d327f4 100644 --- a/sdks/python/apache_beam/transforms/external_test.py +++ b/sdks/python/apache_beam/transforms/external_test.py @@ -68,32 +68,6 @@ # pylint: enable=wrong-import-order, wrong-import-position -def retry_on_grpc_timeout(max_retries=5, delay=10, max_total_time=300): - """Decorator to retry tests that fail due to grpc timeout issues.""" - def decorator(func): - @functools.wraps(func) - def wrapper(*args, **kwargs): - start_time = time.time() - for attempt in range(max_retries): - try: - return func(*args, **kwargs) - except Exception as e: - elapsed_time = time.time() - start_time - if elapsed_time > max_total_time: - logging.error(f"Total retry time exceeded {max_total_time}s, giving up") - raise e - - if any(keyword in str(e).lower() for keyword in ["grpc", "deadline", "timeout", "connection", "unavailable"]): - if attempt < max_retries - 1: - logging.warning(f"Attempt {attempt + 1} failed with network error: {e}. Retrying in {delay} seconds... (elapsed: {elapsed_time:.1f}s)") - time.sleep(delay) - continue - raise e - return func(*args, **kwargs) - return wrapper - return decorator - - def get_payload(cls): payload = external_transforms_pb2.ExternalConfigurationPayload() payload.ParseFromString(cls._payload) @@ -233,7 +207,6 @@ def test_pipeline_generation(self): pipeline_from_proto.transforms_stack[0].parts[1].parts[0].full_label) @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed') - @retry_on_grpc_timeout(max_retries=5, delay=10) # Retry on grpc timeout issues def test_pipeline_generation_with_runner_overrides(self): pipeline_properties = [ '--job_name=test-job', @@ -280,7 +253,6 @@ def test_pipeline_generation_with_runner_overrides(self): list(pubsub_read_transform.outputs.values()), list(external_transform.inputs.values())) - @retry_on_grpc_timeout(max_retries=3, delay=5) def test_payload(self): with beam.Pipeline() as p: res = ( @@ -320,7 +292,6 @@ def test_no_output_coder(self): pcol = context.pcollections.get_by_id(pcol_id) self.assertEqual(pcol.element_type, typehints.Any) - @retry_on_grpc_timeout(max_retries=3, delay=5) def test_nested(self): with beam.Pipeline() as p: assert_that(p | FibTransform(6), equal_to([8])) diff --git a/sdks/python/scripts/run_pytest.sh b/sdks/python/scripts/run_pytest.sh index 3e284aa9c809..ec1cc2547fef 100755 --- a/sdks/python/scripts/run_pytest.sh +++ b/sdks/python/scripts/run_pytest.sh @@ -143,8 +143,7 @@ status2=$? # Exit with error if no tests were run in either suite (status code 5). if [[ $status1 == 5 && $status2 == 5 ]]; then - echo "No tests were selected to run" - exit 0 + exit $status1 fi # Exit with error if one of the statuses has an error that's not 5. @@ -154,5 +153,3 @@ fi if [[ $status2 != 0 && $status2 != 5 ]]; then exit $status2 fi - -exit 0 diff --git a/sdks/python/test-suites/tox/common.gradle b/sdks/python/test-suites/tox/common.gradle index 667c7301e2f8..9e53e9a82ba2 100644 --- a/sdks/python/test-suites/tox/common.gradle +++ b/sdks/python/test-suites/tox/common.gradle @@ -35,7 +35,6 @@ test.dependsOn "testPy${pythonVersionSuffix}Dill" // toxTask "testPy${pythonVersionSuffix}Dask", "py${pythonVersionSuffix}-dask", "${posargs}" // test.dependsOn "testPy${pythonVersionSuffix}Dask" project.tasks.register("preCommitPy${pythonVersionSuffix}") { - // PreCommit should only run non-cloud tests to avoid flakiness - dependsOn = ["testPython${pythonVersionSuffix}"] + dependsOn = ["testPy${pythonVersionSuffix}Cloud", "testPython${pythonVersionSuffix}"] } From 90a81cf1d0de50ce615a92894c28efc3a5bb2c39 Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Tue, 28 Oct 2025 13:10:58 +0300 Subject: [PATCH 3/5] resolved comments --- sdks/python/apache_beam/transforms/external_test.py | 2 +- sdks/python/test-suites/tox/common.gradle | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/external_test.py b/sdks/python/apache_beam/transforms/external_test.py index 684658d327f4..556787d4b846 100644 --- a/sdks/python/apache_beam/transforms/external_test.py +++ b/sdks/python/apache_beam/transforms/external_test.py @@ -28,7 +28,7 @@ import typing import unittest -from unittest import mock +import mock import apache_beam as beam from apache_beam import ManagedReplacement diff --git a/sdks/python/test-suites/tox/common.gradle b/sdks/python/test-suites/tox/common.gradle index 9e53e9a82ba2..9f79fd6ecb71 100644 --- a/sdks/python/test-suites/tox/common.gradle +++ b/sdks/python/test-suites/tox/common.gradle @@ -34,6 +34,8 @@ test.dependsOn "testPy${pythonVersionSuffix}Dill" // toxTask "testPy${pythonVersionSuffix}Dask", "py${pythonVersionSuffix}-dask", "${posargs}" // test.dependsOn "testPy${pythonVersionSuffix}Dask" +// Since codecoverage reports will always be generated for py39, +// all tests will be exercised. project.tasks.register("preCommitPy${pythonVersionSuffix}") { dependsOn = ["testPy${pythonVersionSuffix}Cloud", "testPython${pythonVersionSuffix}"] From f70a063b519e13b16b2ac8cb8d8c784f93435e7a Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Tue, 28 Oct 2025 13:16:28 +0300 Subject: [PATCH 4/5] removed unused vars --- contributor-docs/python-tips.md | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/contributor-docs/python-tips.md b/contributor-docs/python-tips.md index 9bdd8adf3bb5..b582dfbadd9f 100644 --- a/contributor-docs/python-tips.md +++ b/contributor-docs/python-tips.md @@ -271,20 +271,6 @@ Integration tests suites on Jenkins are configured in groovy files that launch c The following environment variables can be used to improve test stability in CI environments: -**Beam-specific settings:** -- `BEAM_TESTING_FORCE_SINGLE_BUNDLE=true` - Forces single bundle execution for deterministic behavior -- `BEAM_TESTING_DETERMINISTIC_ORDER=true` - Enables deterministic ordering of operations -- `BEAM_RETRY_MAX_ATTEMPTS=5` - Maximum retry attempts for failed operations -- `BEAM_RETRY_INITIAL_DELAY_MS=1000` - Initial delay between retries in milliseconds -- `BEAM_RETRY_MAX_DELAY_MS=60000` - Maximum delay between retries in milliseconds -- `BEAM_RUNNER_BUNDLE_TIMEOUT_MS=300000` - Bundle processing timeout in milliseconds - -**gRPC stability settings:** -- `GRPC_ARG_KEEPALIVE_TIME_MS=30000` - gRPC keepalive time -- `GRPC_ARG_KEEPALIVE_TIMEOUT_MS=5000` - gRPC keepalive timeout -- `GRPC_ARG_MAX_RECONNECT_BACKOFF_MS=120000` - Maximum reconnection backoff -- `GRPC_ARG_MAX_CONNECTION_IDLE_MS=300000` - Maximum connection idle time - **Test execution settings:** - `PYTEST_XDIST_WORKER_COUNT=1` - Force sequential test execution - `PYTHONHASHSEED=0` - Ensure deterministic hash behavior From d81318d7f494714c071cea1c1e02759c34a03b04 Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Wed, 29 Oct 2025 00:34:34 +0300 Subject: [PATCH 5/5] removed unused imports --- sdks/python/apache_beam/transforms/external_test.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/external_test.py b/sdks/python/apache_beam/transforms/external_test.py index 556787d4b846..c59058a6e62b 100644 --- a/sdks/python/apache_beam/transforms/external_test.py +++ b/sdks/python/apache_beam/transforms/external_test.py @@ -20,11 +20,9 @@ # pytype: skip-file import dataclasses -import functools import logging import os import tempfile -import time import typing import unittest