From 2ade2f5bb7ab42eb4f559612115e11d541d6fb31 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 21 Apr 2025 15:50:56 -0400 Subject: [PATCH 1/5] Initial. --- sdks/python/apache_beam/internal/pickler.py | 4 ++-- sdks/python/apache_beam/pipeline_test.py | 16 ++++++++++--- .../runners/direct/direct_runner_test.py | 24 ++++++++++++------- 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index c577bd3d4a25..256f88c5453f 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -33,9 +33,9 @@ USE_CLOUDPICKLE = 'cloudpickle' USE_DILL = 'dill' -DEFAULT_PICKLE_LIB = USE_DILL -desired_pickle_lib = dill_pickler +DEFAULT_PICKLE_LIB = USE_CLOUDPICKLE +desired_pickle_lib = cloudpickle_pickler def dumps( diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 6480b2db3c86..9a44363a28e8 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -287,9 +287,19 @@ def test_no_wait_until_finish(self, mock_info): def test_auto_unique_labels(self): opts = PipelineOptions(["--auto_unique_labels"]) - with mock.patch.object(uuid, 'uuid4') as mock_uuid_gen: - mock_uuids = [mock.Mock(hex='UUID01XXX'), mock.Mock(hex='UUID02XXX')] - mock_uuid_gen.side_effect = mock_uuids + + mock_uuids = [mock.Mock(hex='UUID01XXX'), mock.Mock(hex='UUID02XXX')] + mock_uuid_gen = mock.Mock(side_effect=mock_uuids) + + original_generate_unique_label = Pipeline._generate_unique_label + + def patched_generate_unique_label(self, transform): + with mock.patch.object(uuid, 'uuid4', return_value=mock_uuid_gen()): + return original_generate_unique_label(self, transform) + + with mock.patch.object(Pipeline, + '_generate_unique_label', + patched_generate_unique_label): with TestPipeline(options=opts) as pipeline: pcoll = pipeline | 'pcoll' >> Create([1, 2, 3]) diff --git a/sdks/python/apache_beam/runners/direct/direct_runner_test.py b/sdks/python/apache_beam/runners/direct/direct_runner_test.py index 1af5f1bc7bea..92116c665b64 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner_test.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner_test.py @@ -41,6 +41,15 @@ from apache_beam.testing import test_pipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.utils import shared + + +class DictWrapper(dict): + pass + + +def acquire_dict(): + return DictWrapper() class DirectPipelineResultTest(unittest.TestCase): @@ -159,18 +168,17 @@ def test_retry_fork_graph(self): # currently does not currently support retries. p = beam.Pipeline(runner='BundleBasedDirectRunner') - # TODO(mariagh): Remove the use of globals from the test. 
- global count_b, count_c # pylint: disable=global-variable-undefined - count_b, count_c = 0, 0 + shared_handler = shared.Shared() + counts = shared_handler.acquire(acquire_dict) + counts['count_b'] = 0 + counts['count_c'] = 0 def f_b(x): - global count_b # pylint: disable=global-variable-undefined - count_b += 1 + shared_handler.acquire(acquire_dict)['count_b'] += 1 raise Exception('exception in f_b') def f_c(x): - global count_c # pylint: disable=global-variable-undefined - count_c += 1 + shared_handler.acquire(acquire_dict)['count_c'] += 1 raise Exception('exception in f_c') names = p | 'CreateNodeA' >> beam.Create(['Ann', 'Joe']) @@ -180,7 +188,7 @@ def f_c(x): with self.assertRaises(Exception): p.run().wait_until_finish() - assert count_b == count_c == 4 + assert counts['count_b'] == counts['count_c'] == 4 def test_no_partial_writeouts(self): class TestTransformEvaluator(_TransformEvaluator): From 192001721039ae3842b666eb466842f52f99f099 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 21 Apr 2025 15:55:09 -0400 Subject: [PATCH 2/5] Run test suites. --- .github/trigger_files/beam_PostCommit_Python.json | 2 +- .../beam_PostCommit_Python_ValidatesContainer_Dataflow.json | 2 +- .../beam_PostCommit_Python_Xlang_Gcp_Dataflow.json | 2 +- .../trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json | 2 +- .../trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index ed7f7f1e3863..0b759d797ee8 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,6 +1,6 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", "https://github.com/apache/beam/pull/32440": "test new datastream runner for batch" - "modification": 9 + "modification": 10 } diff --git a/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json index 1012782ae381..12b91fa89ae4 100644 --- a/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 3 } \ No newline at end of file diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json index bb11e2f79c20..afdc7f7012a8 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 10 + "modification": 11 } diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json index e0266d62f2e0..f1ba03a243ee 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 4 + "modification": 5 } diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json 
b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json index b26833333238..c537844dc84a 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 3 } From d9cca49e1650971263a263b53e1d0016bf523088 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 24 Apr 2025 03:08:05 +0000 Subject: [PATCH 3/5] Allow runner to override default pickler. --- sdks/python/apache_beam/pipeline.py | 13 ++++++++++--- sdks/python/apache_beam/pipeline_test.py | 17 ++++++++++++++++- .../apache_beam/runners/direct/direct_runner.py | 4 ++++ sdks/python/apache_beam/runners/runner.py | 4 ++++ 4 files changed, 34 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index b91d27926c54..eddc868f5450 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -191,9 +191,6 @@ def __init__( FileSystems.set_options(self._options) - pickle_library = self._options.view_as(SetupOptions).pickle_library - pickler.set_library(pickle_library) - if runner is None: runner = self._options.view_as(StandardOptions).runner if runner is None: @@ -228,6 +225,16 @@ def __init__( # Default runner to be used. self.runner = runner + + if (self._options.view_as(SetupOptions).pickle_library == 'default' and + self.runner.default_pickle_library_override() is not None): + logging.info( + "Default pickling library set to : %s.", + runner.default_pickle_library_override()) + self._options.view_as( + SetupOptions).pickle_library = runner.default_pickle_library_override( + ) + pickler.set_library(self._options.view_as(SetupOptions).pickle_library) # Stack of transforms generated by nested apply() calls. The stack will # contain a root node as an enclosing (parent) node for top transforms. 
self.transforms_stack = [ diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 9a44363a28e8..ef128471e9ff 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -62,6 +62,7 @@ from apache_beam.transforms.window import TimestampedValue from apache_beam.utils import windowed_value from apache_beam.utils.timestamp import MIN_TIMESTAMP +from apache_beam.runners.direct import direct_runner class FakeUnboundedSource(SourceBase): @@ -156,6 +157,20 @@ def test_create(self): pcoll3 = pcoll2 | 'do' >> FlatMap(lambda x: [x + 10]) assert_that(pcoll3, equal_to([14, 15, 16]), label='pcoll3') + @mock.patch('logging.info') + def test_runner_overrides_default_pickler(self, mock_info): + with mock.patch.object(direct_runner.SwitchingDirectRunner, + 'default_pickle_library_override') as mock_fn: + mock_fn.return_value = 'dill' + with TestPipeline() as pipeline: + pcoll = pipeline | 'label1' >> Create([1, 2, 3]) + assert_that(pcoll, equal_to([1, 2, 3])) + + from apache_beam.internal import pickler + from apache_beam.internal import dill_pickler + self.assertIs(pickler.desired_pickle_lib, dill_pickler) + mock_info.assert_any_call('Default pickling library set to : %s.', 'dill') + def test_flatmap_builtin(self): with TestPipeline() as pipeline: pcoll = pipeline | 'label1' >> Create([1, 2, 3]) @@ -279,7 +294,7 @@ def test_no_wait_until_finish(self, mock_info): with Pipeline(runner='DirectRunner', options=PipelineOptions(["--no_wait_until_finish"])) as p: _ = p | beam.Create(['test']) - mock_info.assert_called_once_with( + mock_info.assert_any_call( 'Job execution continues without waiting for completion. ' 'Use "wait_until_finish" in PipelineResult to block until finished.') p.result.wait_until_finish() diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index 8b8937653688..8f2d31ed6754 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -66,6 +66,10 @@ class SwitchingDirectRunner(PipelineRunner): which supports streaming execution and certain primitives not yet implemented in the FnApiRunner. """ + def default_pickle_library_override(self): + """Default pickle library, can be overridden by runner implementation.""" + return 'cloudpickle' + def is_fnapi_compatible(self): return BundleBasedDirectRunner.is_fnapi_compatible() diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py index 1030e9fbd8b5..e3b7a9de9483 100644 --- a/sdks/python/apache_beam/runners/runner.py +++ b/sdks/python/apache_beam/runners/runner.py @@ -225,6 +225,10 @@ def check_requirements( beam_runner_api_pb2.TimeDomain.PROCESSING_TIME): raise NotImplementedError(timer.time_domain) + def default_pickle_library_override(self): + """Default pickle library, can be overridden by runner implementation.""" + return None + # FIXME: replace with PipelineState(str, enum.Enum) class PipelineState(object): From 1d4ab9a67dff4597fe63a33c86c32c1357f6d196 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 25 Apr 2025 01:57:52 +0000 Subject: [PATCH 4/5] Move options override to before validation. 
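The override hook lives on the PipelineRunner base class (returning None there) and is consulted before pipeline options are validated. A minimal sketch of how a runner implementation could opt in, assuming a hypothetical MyRunner class that is not part of this change:

    from apache_beam.runners.runner import PipelineRunner

    class MyRunner(PipelineRunner):  # hypothetical, for illustration only
        def default_pickle_library_override(self):
            # Consulted by Pipeline.__init__ only when --pickle_library was
            # left at 'default'; an explicit user choice always wins.
            return 'dill'

        def run_pipeline(self, pipeline, options):
            raise NotImplementedError  # execution is out of scope for this sketch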
--- sdks/python/apache_beam/pipeline.py | 20 ++++++++++--------- sdks/python/apache_beam/pipeline_test.py | 7 ++++--- .../runners/dataflow/dataflow_runner.py | 3 +++ .../runners/direct/direct_runner.py | 4 ---- 4 files changed, 18 insertions(+), 16 deletions(-) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index eddc868f5450..207e5be7f6dd 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -207,6 +207,17 @@ def __init__( 'Runner %s is not a PipelineRunner object or the ' 'name of a registered runner.' % runner) + # Runner can oerride the default runner to be used. + if (self._options.view_as(SetupOptions).pickle_library == 'default' and + runner.default_pickle_library_override() is not None): + logging.info( + "Runner defaulting to pickling library: %s.", + runner.default_pickle_library_override()) + self._options.view_as( + SetupOptions).pickle_library = runner.default_pickle_library_override( + ) + pickler.set_library(self._options.view_as(SetupOptions).pickle_library) + # Validate pipeline options errors = PipelineOptionsValidator(self._options, runner).validate() if errors: @@ -226,15 +237,6 @@ def __init__( # Default runner to be used. self.runner = runner - if (self._options.view_as(SetupOptions).pickle_library == 'default' and - self.runner.default_pickle_library_override() is not None): - logging.info( - "Default pickling library set to : %s.", - runner.default_pickle_library_override()) - self._options.view_as( - SetupOptions).pickle_library = runner.default_pickle_library_override( - ) - pickler.set_library(self._options.view_as(SetupOptions).pickle_library) # Stack of transforms generated by nested apply() calls. The stack will # contain a root node as an enclosing (parent) node for top transforms. 
self.transforms_stack = [ diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index ef128471e9ff..52ecb1a1d575 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -41,6 +41,7 @@ from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.pvalue import AsSingleton from apache_beam.pvalue import TaggedOutput +from apache_beam.runners.runner import PipelineRunner from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to @@ -62,7 +63,6 @@ from apache_beam.transforms.window import TimestampedValue from apache_beam.utils import windowed_value from apache_beam.utils.timestamp import MIN_TIMESTAMP -from apache_beam.runners.direct import direct_runner class FakeUnboundedSource(SourceBase): @@ -159,7 +159,7 @@ def test_create(self): @mock.patch('logging.info') def test_runner_overrides_default_pickler(self, mock_info): - with mock.patch.object(direct_runner.SwitchingDirectRunner, + with mock.patch.object(PipelineRunner, 'default_pickle_library_override') as mock_fn: mock_fn.return_value = 'dill' with TestPipeline() as pipeline: @@ -169,7 +169,8 @@ def test_runner_overrides_default_pickler(self, mock_info): from apache_beam.internal import pickler from apache_beam.internal import dill_pickler self.assertIs(pickler.desired_pickle_lib, dill_pickler) - mock_info.assert_any_call('Default pickling library set to : %s.', 'dill') + mock_info.assert_any_call( + 'Runner defaulting to pickling library: %s.', 'dill') def test_flatmap_builtin(self): with TestPipeline() as pipeline: diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 860326d02d30..349fee3eff26 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -98,6 +98,9 @@ class DataflowRunner(PipelineRunner): def __init__(self, cache=None): self._default_environment = None + def default_pickle_library_override(self): + return 'cloudpickle' + def is_fnapi_compatible(self): return False diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index 8f2d31ed6754..8b8937653688 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -66,10 +66,6 @@ class SwitchingDirectRunner(PipelineRunner): which supports streaming execution and certain primitives not yet implemented in the FnApiRunner. """ - def default_pickle_library_override(self): - """Default pickle library, can be overridden by runner implementation.""" - return 'cloudpickle' - def is_fnapi_compatible(self): return BundleBasedDirectRunner.is_fnapi_compatible() From 4739edfc5328ab85bf017b208beeadbefbe5c78d Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 25 Apr 2025 21:47:19 +0000 Subject: [PATCH 5/5] Fix tests and comments. 
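The pickler tests now run once per pickle library via parameterized. A minimal, self-contained sketch of the pattern (the test class name and payload are illustrative only):

    import unittest
    from parameterized import param, parameterized
    from apache_beam.internal import pickler

    class RoundTripTest(unittest.TestCase):
        @parameterized.expand([
            param(pickle_lib='dill'),
            param(pickle_lib='cloudpickle'),
        ])
        def test_round_trip(self, pickle_lib):
            # Switch the process-wide pickler, then verify a dumps/loads round trip.
            pickler.set_library(pickle_lib)
            data = {'key': [1, 2, 3]}
            self.assertEqual(data, pickler.loads(pickler.dumps(data)))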
--- .../apache_beam/internal/pickler_test.py | 104 +++++++++++++++--- sdks/python/apache_beam/pipeline.py | 4 +- 2 files changed, 92 insertions(+), 16 deletions(-) diff --git a/sdks/python/apache_beam/internal/pickler_test.py b/sdks/python/apache_beam/internal/pickler_test.py index 60fa1e075522..c8c1dd5e5f7a 100644 --- a/sdks/python/apache_beam/internal/pickler_test.py +++ b/sdks/python/apache_beam/internal/pickler_test.py @@ -25,7 +25,11 @@ import types import unittest +from parameterized import param +from parameterized import parameterized + from apache_beam.internal import module_test +from apache_beam.internal import pickler from apache_beam.internal.pickler import dumps from apache_beam.internal.pickler import loads @@ -34,13 +38,24 @@ class PicklerTest(unittest.TestCase): NO_MAPPINGPROXYTYPE = not hasattr(types, "MappingProxyType") - def test_basics(self): + @parameterized.expand([ + param(pickle_lib='dill'), + param(pickle_lib='cloudpickle'), + ]) + def test_basics(self, pickle_lib): + pickler.set_library(pickle_lib) + self.assertEqual([1, 'a', ('z', )], loads(dumps([1, 'a', ('z', )]))) fun = lambda x: 'xyz-%s' % x self.assertEqual('xyz-abc', loads(dumps(fun))('abc')) - def test_lambda_with_globals(self): + @parameterized.expand([ + param(pickle_lib='dill'), + param(pickle_lib='cloudpickle'), + ]) + def test_lambda_with_globals(self, pickle_lib): """Tests that the globals of a function are preserved.""" + pickler.set_library(pickle_lib) # The point of the test is that the lambda being called after unpickling # relies on having the re module being loaded. @@ -48,60 +63,115 @@ def test_lambda_with_globals(self): loads(dumps( module_test.get_lambda_with_globals()))('abc def')) - def test_lambda_with_main_globals(self): + @parameterized.expand([ + param(pickle_lib='dill'), + param(pickle_lib='cloudpickle'), + ]) + def test_lambda_with_main_globals(self, pickle_lib): + pickler.set_library(pickle_lib) self.assertEqual(unittest, loads(dumps(lambda: unittest))()) - def test_lambda_with_closure(self): + @parameterized.expand([ + param(pickle_lib='dill'), + param(pickle_lib='cloudpickle'), + ]) + def test_lambda_with_closure(self, pickle_lib): """Tests that the closure of a function is preserved.""" + pickler.set_library(pickle_lib) self.assertEqual( 'closure: abc', loads(dumps(module_test.get_lambda_with_closure('abc')))()) - def test_class(self): + @parameterized.expand([ + param(pickle_lib='dill'), + param(pickle_lib='cloudpickle'), + ]) + def test_class(self, pickle_lib): """Tests that a class object is pickled correctly.""" + pickler.set_library(pickle_lib) self.assertEqual(['abc', 'def'], loads(dumps(module_test.Xyz))().foo('abc def')) - def test_object(self): + @parameterized.expand([ + param(pickle_lib='dill'), + param(pickle_lib='cloudpickle'), + ]) + def test_object(self, pickle_lib): """Tests that a class instance is pickled correctly.""" + pickler.set_library(pickle_lib) self.assertEqual(['abc', 'def'], loads(dumps(module_test.XYZ_OBJECT)).foo('abc def')) - def test_nested_class(self): + @parameterized.expand([ + param(pickle_lib='dill'), + param(pickle_lib='cloudpickle'), + ]) + def test_nested_class(self, pickle_lib): """Tests that a nested class object is pickled correctly.""" + pickler.set_library(pickle_lib) self.assertEqual( 'X:abc', loads(dumps(module_test.TopClass.NestedClass('abc'))).datum) self.assertEqual( 'Y:abc', loads(dumps(module_test.TopClass.MiddleClass.NestedClass('abc'))).datum) - def test_dynamic_class(self): + @parameterized.expand([ + 
param(pickle_lib='dill'), + param(pickle_lib='cloudpickle'), + ]) + def test_dynamic_class(self, pickle_lib): """Tests that a nested class object is pickled correctly.""" + pickler.set_library(pickle_lib) self.assertEqual( 'Z:abc', loads(dumps(module_test.create_class('abc'))).get()) - def test_generators(self): + @parameterized.expand([ + param(pickle_lib='dill'), + param(pickle_lib='cloudpickle'), + ]) + def test_generators(self, pickle_lib): + pickler.set_library(pickle_lib) with self.assertRaises(TypeError): dumps((_ for _ in range(10))) - def test_recursive_class(self): + @parameterized.expand([ + param(pickle_lib='dill'), + param(pickle_lib='cloudpickle'), + ]) + def test_recursive_class(self, pickle_lib): + pickler.set_library(pickle_lib) self.assertEqual( 'RecursiveClass:abc', loads(dumps(module_test.RecursiveClass('abc').datum))) - def test_pickle_rlock(self): + @parameterized.expand([ + param(pickle_lib='dill'), + param(pickle_lib='cloudpickle'), + ]) + def test_pickle_rlock(self, pickle_lib): + pickler.set_library(pickle_lib) rlock_instance = threading.RLock() rlock_type = type(rlock_instance) self.assertIsInstance(loads(dumps(rlock_instance)), rlock_type) - def test_save_paths(self): + @parameterized.expand([ + param(pickle_lib='dill'), + param(pickle_lib='cloudpickle'), + ]) + def test_save_paths(self, pickle_lib): + pickler.set_library(pickle_lib) f = loads(dumps(lambda x: x)) co_filename = f.__code__.co_filename self.assertTrue(co_filename.endswith('pickler_test.py')) @unittest.skipIf(NO_MAPPINGPROXYTYPE, 'test if MappingProxyType introduced') - def test_dump_and_load_mapping_proxy(self): + @parameterized.expand([ + param(pickle_lib='dill'), + param(pickle_lib='cloudpickle'), + ]) + def test_dump_and_load_mapping_proxy(self, pickle_lib): + pickler.set_library(pickle_lib) self.assertEqual( 'def', loads(dumps(types.MappingProxyType({'abc': 'def'})))['abc']) self.assertEqual( @@ -109,7 +179,11 @@ def test_dump_and_load_mapping_proxy(self): # pylint: disable=exec-used @unittest.skipIf(sys.version_info < (3, 7), 'Python 3.7 or above only') - def test_dataclass(self): + @parameterized.expand([ + param(pickle_lib='dill'), + param(pickle_lib='cloudpickle'), + ]) + def test_dataclass(self, pickle_lib): exec( ''' from apache_beam.internal.module_test import DataClass @@ -141,6 +215,7 @@ def maybe_get_sets_with_different_iteration_orders(self): return set1, set2 def test_best_effort_determinism(self): + pickler.set_library('dill') set1, set2 = self.maybe_get_sets_with_different_iteration_orders() self.assertEqual( dumps(set1, enable_best_effort_determinism=True), @@ -152,6 +227,7 @@ def test_best_effort_determinism(self): self.skipTest('Set iteration orders matched. Test results inconclusive.') def test_disable_best_effort_determinism(self): + pickler.set_library('dill') set1, set2 = self.maybe_get_sets_with_different_iteration_orders() # The test relies on the sets having different iteration orders for the # elements. Iteration order is implementation dependent and undefined, diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 207e5be7f6dd..db03968a237d 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -207,9 +207,9 @@ def __init__( 'Runner %s is not a PipelineRunner object or the ' 'name of a registered runner.' % runner) - # Runner can oerride the default runner to be used. + # Runner can override the default pickler to be used. 
if (self._options.view_as(SetupOptions).pickle_library == 'default' and - runner.default_pickle_library_override() is not None): + runner.default_pickle_library_override()): logging.info( "Runner defaulting to pickling library: %s.", runner.default_pickle_library_override())
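
With the default now resolved per runner, pipelines that depend on dill semantics can still pin the pickler explicitly; the runner override above applies only when --pickle_library is left at 'default'. A minimal usage sketch:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Force dill regardless of what the runner would pick by default.
    options = PipelineOptions(['--pickle_library=dill'])
    with beam.Pipeline(options=options) as p:
        _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x + 1)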