From fd4f9bbb515c694e77186c5df9b2260388b08a03 Mon Sep 17 00:00:00 2001 From: Vitaly Terentyev Date: Mon, 9 Jun 2025 12:44:10 +0400 Subject: [PATCH 1/2] Skip Samza and Spark runner tests --- .../fn_api_runner/fn_runner_test.py | 48 +++++++++++++------ .../portability/portable_runner_test.py | 32 ++++++++----- 2 files changed, 54 insertions(+), 26 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index aafa088ceb10..250970437a10 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -827,13 +827,17 @@ def timer_callback(self, t=beam.DoFn.TimestampParam): assert_that(actual, equal_to(expected)) def test_pardo_et_timer_with_no_firing(self): - if type(self) in [FnApiRunnerTest, - FnApiRunnerTestWithGrpc, - FnApiRunnerTestWithGrpcAndMultiWorkers, - FnApiRunnerTestWithDisabledCaching, - FnApiRunnerTestWithMultiWorkers, - FnApiRunnerTestWithBundleRepeat, - FnApiRunnerTestWithBundleRepeatAndMultiWorkers]: + if type(self).__name__ in { + 'FnApiRunnerTest', + 'FnApiRunnerTestWithGrpc', + 'FnApiRunnerTestWithGrpcAndMultiWorkers', + 'FnApiRunnerTestWithDisabledCaching', + 'FnApiRunnerTestWithMultiWorkers', + 'FnApiRunnerTestWithBundleRepeat', + 'FnApiRunnerTestWithBundleRepeatAndMultiWorkers', + 'SamzaRunnerTest', + 'SparkRunnerTest' + }: raise unittest.SkipTest("https://github.com/apache/beam/issues/35168") # The timer will not fire. It is initially set to T + 10, but then it is @@ -842,13 +846,17 @@ def test_pardo_et_timer_with_no_firing(self): self._run_pardo_et_timer_test(5, 10, True, True, []) def test_pardo_et_timer_with_no_reset(self): - if type(self) in [FnApiRunnerTest, - FnApiRunnerTestWithGrpc, - FnApiRunnerTestWithGrpcAndMultiWorkers, - FnApiRunnerTestWithDisabledCaching, - FnApiRunnerTestWithMultiWorkers, - FnApiRunnerTestWithBundleRepeat, - FnApiRunnerTestWithBundleRepeatAndMultiWorkers]: + if type(self).__name__ in { + 'FnApiRunnerTest', + 'FnApiRunnerTestWithGrpc', + 'FnApiRunnerTestWithGrpcAndMultiWorkers', + 'FnApiRunnerTestWithDisabledCaching', + 'FnApiRunnerTestWithMultiWorkers', + 'FnApiRunnerTestWithBundleRepeat', + 'FnApiRunnerTestWithBundleRepeatAndMultiWorkers', + 'SamzaRunnerTest', + 'SparkRunnerTest' + }: raise unittest.SkipTest("https://github.com/apache/beam/issues/35168") # The timer will not fire. It is initially set to T + 10, and then it is @@ -856,6 +864,18 @@ def test_pardo_et_timer_with_no_reset(self): self._run_pardo_et_timer_test(5, 10, False, True, []) def test_pardo_et_timer_with_no_reset_and_no_clear(self): + if type(self).__name__ in { + 'FnApiRunnerTest', + 'FnApiRunnerTestWithGrpc', + 'FnApiRunnerTestWithGrpcAndMultiWorkers', + 'FnApiRunnerTestWithDisabledCaching', + 'FnApiRunnerTestWithMultiWorkers', + 'FnApiRunnerTestWithBundleRepeat', + 'FnApiRunnerTestWithBundleRepeatAndMultiWorkers', + 'SamzaRunnerTest', + 'SparkRunnerTest' + }: + raise unittest.SkipTest("https://github.com/apache/beam/issues/35168") # The timer will fire at T + 10. After the timer is set, it is never # cleared or set again. self._run_pardo_et_timer_test(5, 10, False, False, ["fired"]) diff --git a/sdks/python/apache_beam/runners/portability/portable_runner_test.py b/sdks/python/apache_beam/runners/portability/portable_runner_test.py index e128b6a73e4c..31293a4d43ec 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner_test.py @@ -223,22 +223,30 @@ def test_draining_sdf_with_sdf_initiated_checkpointing(self): raise unittest.SkipTest("Portable runners don't support drain yet.") def test_pardo_et_timer_with_no_firing(self): - if type(self) in [PortableRunnerTest, - PortableRunnerTestWithSubprocesses, - PortableRunnerTestWithSubprocessesAndMultiWorkers, - PortableRunnerTestWithExternalEnv, - PortableRunnerTestWithLocalDocker, - PortableRunnerOptimizedWithoutFusion]: + if type(self).__name__ in { + 'PortableRunnerTest', + 'PortableRunnerTestWithSubprocesses', + 'PortableRunnerTestWithSubprocessesAndMultiWorkers', + 'PortableRunnerTestWithExternalEnv', + 'PortableRunnerTestWithLocalDocker', + 'PortableRunnerOptimizedWithoutFusion', + 'SamzaRunnerTest', + 'SparkRunnerTest' + }: raise unittest.SkipTest("https://github.com/apache/beam/issues/35168") super().test_pardo_et_timer_with_no_firing() def test_pardo_et_timer_with_no_reset(self): - if type(self) in [PortableRunnerTest, - PortableRunnerTestWithSubprocesses, - PortableRunnerTestWithSubprocessesAndMultiWorkers, - PortableRunnerTestWithExternalEnv, - PortableRunnerTestWithLocalDocker, - PortableRunnerOptimizedWithoutFusion]: + if type(self).__name__ in { + 'PortableRunnerTest', + 'PortableRunnerTestWithSubprocesses', + 'PortableRunnerTestWithSubprocessesAndMultiWorkers', + 'PortableRunnerTestWithExternalEnv', + 'PortableRunnerTestWithLocalDocker', + 'PortableRunnerOptimizedWithoutFusion', + 'SamzaRunnerTest', + 'SparkRunnerTest' + }: raise unittest.SkipTest("https://github.com/apache/beam/issues/35168") super().test_pardo_et_timer_with_no_reset() From 3a7a8c6037f8814c1248628130608983a123f07b Mon Sep 17 00:00:00 2001 From: Vitaly Terentyev Date: Mon, 9 Jun 2025 17:04:23 +0400 Subject: [PATCH 2/2] Fix formatting --- .../fn_api_runner/fn_runner_test.py | 60 +++++++++---------- 1 file changed, 27 insertions(+), 33 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index 250970437a10..626d672e7e46 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -827,17 +827,15 @@ def timer_callback(self, t=beam.DoFn.TimestampParam): assert_that(actual, equal_to(expected)) def test_pardo_et_timer_with_no_firing(self): - if type(self).__name__ in { - 'FnApiRunnerTest', - 'FnApiRunnerTestWithGrpc', - 'FnApiRunnerTestWithGrpcAndMultiWorkers', - 'FnApiRunnerTestWithDisabledCaching', - 'FnApiRunnerTestWithMultiWorkers', - 'FnApiRunnerTestWithBundleRepeat', - 'FnApiRunnerTestWithBundleRepeatAndMultiWorkers', - 'SamzaRunnerTest', - 'SparkRunnerTest' - }: + if type(self).__name__ in {'FnApiRunnerTest', + 'FnApiRunnerTestWithGrpc', + 'FnApiRunnerTestWithGrpcAndMultiWorkers', + 'FnApiRunnerTestWithDisabledCaching', + 'FnApiRunnerTestWithMultiWorkers', + 'FnApiRunnerTestWithBundleRepeat', + 'FnApiRunnerTestWithBundleRepeatAndMultiWorkers', + 'SamzaRunnerTest', + 'SparkRunnerTest'}: raise unittest.SkipTest("https://github.com/apache/beam/issues/35168") # The timer will not fire. It is initially set to T + 10, but then it is @@ -846,17 +844,15 @@ def test_pardo_et_timer_with_no_firing(self): self._run_pardo_et_timer_test(5, 10, True, True, []) def test_pardo_et_timer_with_no_reset(self): - if type(self).__name__ in { - 'FnApiRunnerTest', - 'FnApiRunnerTestWithGrpc', - 'FnApiRunnerTestWithGrpcAndMultiWorkers', - 'FnApiRunnerTestWithDisabledCaching', - 'FnApiRunnerTestWithMultiWorkers', - 'FnApiRunnerTestWithBundleRepeat', - 'FnApiRunnerTestWithBundleRepeatAndMultiWorkers', - 'SamzaRunnerTest', - 'SparkRunnerTest' - }: + if type(self).__name__ in {'FnApiRunnerTest', + 'FnApiRunnerTestWithGrpc', + 'FnApiRunnerTestWithGrpcAndMultiWorkers', + 'FnApiRunnerTestWithDisabledCaching', + 'FnApiRunnerTestWithMultiWorkers', + 'FnApiRunnerTestWithBundleRepeat', + 'FnApiRunnerTestWithBundleRepeatAndMultiWorkers', + 'SamzaRunnerTest', + 'SparkRunnerTest'}: raise unittest.SkipTest("https://github.com/apache/beam/issues/35168") # The timer will not fire. It is initially set to T + 10, and then it is @@ -864,17 +860,15 @@ def test_pardo_et_timer_with_no_reset(self): self._run_pardo_et_timer_test(5, 10, False, True, []) def test_pardo_et_timer_with_no_reset_and_no_clear(self): - if type(self).__name__ in { - 'FnApiRunnerTest', - 'FnApiRunnerTestWithGrpc', - 'FnApiRunnerTestWithGrpcAndMultiWorkers', - 'FnApiRunnerTestWithDisabledCaching', - 'FnApiRunnerTestWithMultiWorkers', - 'FnApiRunnerTestWithBundleRepeat', - 'FnApiRunnerTestWithBundleRepeatAndMultiWorkers', - 'SamzaRunnerTest', - 'SparkRunnerTest' - }: + if type(self).__name__ in {'FnApiRunnerTest', + 'FnApiRunnerTestWithGrpc', + 'FnApiRunnerTestWithGrpcAndMultiWorkers', + 'FnApiRunnerTestWithDisabledCaching', + 'FnApiRunnerTestWithMultiWorkers', + 'FnApiRunnerTestWithBundleRepeat', + 'FnApiRunnerTestWithBundleRepeatAndMultiWorkers', + 'SamzaRunnerTest', + 'SparkRunnerTest'}: raise unittest.SkipTest("https://github.com/apache/beam/issues/35168") # The timer will fire at T + 10. After the timer is set, it is never # cleared or set again.