diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index aafa088ceb10..626d672e7e46 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -827,13 +827,15 @@ def timer_callback(self, t=beam.DoFn.TimestampParam): assert_that(actual, equal_to(expected)) def test_pardo_et_timer_with_no_firing(self): - if type(self) in [FnApiRunnerTest, - FnApiRunnerTestWithGrpc, - FnApiRunnerTestWithGrpcAndMultiWorkers, - FnApiRunnerTestWithDisabledCaching, - FnApiRunnerTestWithMultiWorkers, - FnApiRunnerTestWithBundleRepeat, - FnApiRunnerTestWithBundleRepeatAndMultiWorkers]: + if type(self).__name__ in {'FnApiRunnerTest', + 'FnApiRunnerTestWithGrpc', + 'FnApiRunnerTestWithGrpcAndMultiWorkers', + 'FnApiRunnerTestWithDisabledCaching', + 'FnApiRunnerTestWithMultiWorkers', + 'FnApiRunnerTestWithBundleRepeat', + 'FnApiRunnerTestWithBundleRepeatAndMultiWorkers', + 'SamzaRunnerTest', + 'SparkRunnerTest'}: raise unittest.SkipTest("https://github.com/apache/beam/issues/35168") # The timer will not fire. It is initially set to T + 10, but then it is @@ -842,13 +844,15 @@ def test_pardo_et_timer_with_no_firing(self): self._run_pardo_et_timer_test(5, 10, True, True, []) def test_pardo_et_timer_with_no_reset(self): - if type(self) in [FnApiRunnerTest, - FnApiRunnerTestWithGrpc, - FnApiRunnerTestWithGrpcAndMultiWorkers, - FnApiRunnerTestWithDisabledCaching, - FnApiRunnerTestWithMultiWorkers, - FnApiRunnerTestWithBundleRepeat, - FnApiRunnerTestWithBundleRepeatAndMultiWorkers]: + if type(self).__name__ in {'FnApiRunnerTest', + 'FnApiRunnerTestWithGrpc', + 'FnApiRunnerTestWithGrpcAndMultiWorkers', + 'FnApiRunnerTestWithDisabledCaching', + 'FnApiRunnerTestWithMultiWorkers', + 'FnApiRunnerTestWithBundleRepeat', + 'FnApiRunnerTestWithBundleRepeatAndMultiWorkers', + 'SamzaRunnerTest', + 'SparkRunnerTest'}: raise unittest.SkipTest("https://github.com/apache/beam/issues/35168") # The timer will not fire. It is initially set to T + 10, and then it is @@ -856,6 +860,16 @@ def test_pardo_et_timer_with_no_reset(self): self._run_pardo_et_timer_test(5, 10, False, True, []) def test_pardo_et_timer_with_no_reset_and_no_clear(self): + if type(self).__name__ in {'FnApiRunnerTest', + 'FnApiRunnerTestWithGrpc', + 'FnApiRunnerTestWithGrpcAndMultiWorkers', + 'FnApiRunnerTestWithDisabledCaching', + 'FnApiRunnerTestWithMultiWorkers', + 'FnApiRunnerTestWithBundleRepeat', + 'FnApiRunnerTestWithBundleRepeatAndMultiWorkers', + 'SamzaRunnerTest', + 'SparkRunnerTest'}: + raise unittest.SkipTest("https://github.com/apache/beam/issues/35168") # The timer will fire at T + 10. After the timer is set, it is never # cleared or set again. self._run_pardo_et_timer_test(5, 10, False, False, ["fired"]) diff --git a/sdks/python/apache_beam/runners/portability/portable_runner_test.py b/sdks/python/apache_beam/runners/portability/portable_runner_test.py index e128b6a73e4c..31293a4d43ec 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner_test.py @@ -223,22 +223,30 @@ def test_draining_sdf_with_sdf_initiated_checkpointing(self): raise unittest.SkipTest("Portable runners don't support drain yet.") def test_pardo_et_timer_with_no_firing(self): - if type(self) in [PortableRunnerTest, - PortableRunnerTestWithSubprocesses, - PortableRunnerTestWithSubprocessesAndMultiWorkers, - PortableRunnerTestWithExternalEnv, - PortableRunnerTestWithLocalDocker, - PortableRunnerOptimizedWithoutFusion]: + if type(self).__name__ in { + 'PortableRunnerTest', + 'PortableRunnerTestWithSubprocesses', + 'PortableRunnerTestWithSubprocessesAndMultiWorkers', + 'PortableRunnerTestWithExternalEnv', + 'PortableRunnerTestWithLocalDocker', + 'PortableRunnerOptimizedWithoutFusion', + 'SamzaRunnerTest', + 'SparkRunnerTest' + }: raise unittest.SkipTest("https://github.com/apache/beam/issues/35168") super().test_pardo_et_timer_with_no_firing() def test_pardo_et_timer_with_no_reset(self): - if type(self) in [PortableRunnerTest, - PortableRunnerTestWithSubprocesses, - PortableRunnerTestWithSubprocessesAndMultiWorkers, - PortableRunnerTestWithExternalEnv, - PortableRunnerTestWithLocalDocker, - PortableRunnerOptimizedWithoutFusion]: + if type(self).__name__ in { + 'PortableRunnerTest', + 'PortableRunnerTestWithSubprocesses', + 'PortableRunnerTestWithSubprocessesAndMultiWorkers', + 'PortableRunnerTestWithExternalEnv', + 'PortableRunnerTestWithLocalDocker', + 'PortableRunnerOptimizedWithoutFusion', + 'SamzaRunnerTest', + 'SparkRunnerTest' + }: raise unittest.SkipTest("https://github.com/apache/beam/issues/35168") super().test_pardo_et_timer_with_no_reset()